在这篇文章中,我们将为您详细介绍python并发编程非阻塞IO模型的内容,并且讨论关于python非阻塞多线程的相关问题。此外,我们还会涉及一些关于java并发编程-Lock与synchronized
在这篇文章中,我们将为您详细介绍python 并发编程 非阻塞IO模型的内容,并且讨论关于python非阻塞多线程的相关问题。此外,我们还会涉及一些关于java 并发编程 - Lock 与 synchronized、Java 并发编程 - 并发难点及解决方法、java 并发编程 - 线程与进程、java 并发编程 - 线程的 join () 示例的知识,以帮助您更全面地了解这个主题。
本文目录一览:- python 并发编程 非阻塞IO模型(python非阻塞多线程)
- java 并发编程 - Lock 与 synchronized
- Java 并发编程 - 并发难点及解决方法
- java 并发编程 - 线程与进程
- java 并发编程 - 线程的 join () 示例
python 并发编程 非阻塞IO模型(python非阻塞多线程)
非阻塞IO(non-blocking IO)
Linux下,可以通过设置socket使其变为non-blocking。当对一个non-blocking socket执行读操作时,流程是这个样子:
从图中可以看出,当用户进程发出read操作时,如果kernel中的数据还没有准备好,那么它并不会block用户进程,而是立刻返回一个error。从用户进程角度讲 ,它发起一个read操作后,并不需要等待,而是马上就得到了一个结果。用户进程判断结果是一个error时,它就知道数据还没有准备好,于是用户就可以在本次到下次再发起read询问的时间间隔内做其他事情,或者直接再次发送read操作。一旦kernel中的数据准备好了,并且又再次收到了用户进程的system call,那么它马上就将数据拷贝到了用户内存(这一阶段仍然是阻塞的,这段是本地拷贝,copy data ),然后返回。
也就是说非阻塞的recvform系统调用调用之后,进程并没有被阻塞,内核马上返回给进程,如果数据还没准备好,
此时会返回一个error。进程在返回之后,可以干点别的事情,然后再发起recvform系统调用。重复上面的过程,
循环往复的进行recvform系统调用。这个过程通常被称之为轮询。轮询检查内核数据,直到数据准备好,再拷贝数据到进程,
进行数据处理。需要注意,拷贝数据整个过程,进程仍然是属于阻塞的状态。
所以,在非阻塞式IO中,用户进程其实是需要不断的主动询问kernel操作系统内存 数据准备好了没有。
非阻塞IO示例
设置socket接口为 非阻塞IO接口
默认是True 为阻塞
server.setblocking(False)
处理一下这个异常
BlockingIOError: [WinError 10035] 无法立即完成一个非阻止性套接字操作。
from socket import * server = socket(AF_INET,SOCK_STREAM) server.bind((‘127.0.0.1‘,8000)) server.listen(5) # 设置socket接口为 非阻塞IO接口 # 默认是True 为阻塞 server.setblocking(False) print("starting...") while True: try: conn,addr = server.accept() print(addr) except BlockingIOError: print("干其他的工作") server.close()
执行结果,如上面的图,一直返回error消息
starting...
干其他的工作
干其他的工作
干其他的工作
干其他的工作
服务端 可以与 多个客户端建立连接,实现服务端可以不停的建立连接
from socket import * server = socket(AF_INET,8000)) server.listen(5) # 设置socket接口为 非阻塞IO接口 # 默认是True 为阻塞 server.setblocking(False) r_list = [] print("starting...") while True: try: conn,addr = server.accept() r_list.append(conn) print(r_list) except BlockingIOError: pass server.close()
起三个客户端与服务端建立连接
r_list 存着所有建立的连接
有连接来,就建立连接,没有连接来,就抛出异常
实现IO非阻塞 并发 多个连接
from socket import * server = socket(AF_INET,addr = server.accept() r_list.append(conn) print(r_list) except BlockingIOError: # 定义删除连接列表 del_rlist = [] for conn in r_list: try: data = conn.recv(1024) # 收空数据时候 if not data: del_rlist.append(conn) continue conn.send(data.upper()) # 没有连接,抛出异常,就结束这次循环,继续 except BlockingIOError: continue # 套接字出现异常,客户端单方面连接断开 except Exception: conn.close() del_rlist.append(conn) break # 结束上面循环之后,循环del_list 连接元素 删除连接 for conn in del_rlist: del_rlist.remove(conn) server.close()
BUG:send也是IO阻塞接口
当send在数据量过大时候,也会阻塞。
send操作是,把应用程序把数据发送到操作系统缓存区里,而操作系统缓存区空间也是有限的。缓存区也会满了,后面还有数据需要发送,那只能等缓存区清掉数据,有空间了,才能发送数据。所以在这里缓存区满了,就阻塞。
修改后服务端的代码 可以自己检测IO,遇到IO切换单个线程的其他任务,去运行,实现单线程并发
from socket import * server = socket(AF_INET,8000)) server.listen(5) # 设置socket接口为 非阻塞IO接口 # 默认是True 为阻塞 server.setblocking(False) r_list = [] w_list = [] print("starting...") while True: try: conn,addr = server.accept() r_list.append(conn) print(r_list) except BlockingIOError: # 收消息 # 定义删除连接列表 del_rlist = [] for conn in r_list: try: data = conn.recv(1024) # 收空数据时候 if not data: del_rlist.append(conn) continue ‘‘‘加入元祖 元祖有两个元素 1.存放套接字连接 2.准备要发送的的数据 ‘‘‘ w_list.append((conn,data.upper())) # 没有连接,抛出异常,就结束这次循环,继续 except BlockingIOError: continue # 套接字出现异常,客户端单方面连接断开 except Exception: conn.close() del_rlist.append(conn) break # 发消息 # 用于 发成功数据后,删除套接字连接的列表 del_wlist = [] for item in w_list: try: conn = item[0] data = item[1] conn.send(data) # 发成功后,从列表删除连接 del_wlist.append(item) # send 有可能出现异常 没发完情况 except BlockingIOError: pass # 结束上面循环之后,循环del_wlist 连接元素 删除连接 for item in del_wlist: del_wlist.remove(item) # 结束上面循环之后,循环del_rlist 连接元素 删除连接 for conn in del_rlist: del_rlist.remove(conn) server.close()
这就是非阻塞IO
但是非阻塞IO模型绝不被推荐。
我们不能否则其优点:能够在等待任务完成的时间里干其他活了(包括提交其他任务,也就是 “后台” 可以有多个任务在“”同时“”执行)。
干其他活时候,有可能来新的连接,新的连接来了,不能及时响应与该新的连接,建立连接。所以会导致问题:数据不会及时响应
但是也难掩其缺点:
1. 循环调用recv()将大幅度推高cpu占用率;这也是我们在代码中留一句time.sleep(2)的原因,否则在低配主机下极容易出现卡机情况
2. 任务完成的响应延迟增大了,因为每过一段时间才去轮询一次read操作,而任务可能在两次轮询之间的任意时间完成。
这会导致整体数据吞吐量的降低。
3.死循环While True会导致cpu的无用的耗用、占用
此外,在这个方案中recv()更多的是起到检测“操作是否完成”的作用,实际操作系统提供了更为高效的检测“操作是否完成“作用的接口,例如select()多路复用模式,可以一次检测多个连接是否活跃
java 并发编程 - Lock 与 synchronized
1. ReentrantLock 拥有与 Synchronized 相同的并发性和内存语义,此外还多了 锁投票,定时锁等候和中断锁等候。
线程A和B都要获取对象O的锁定,假设A获取了对象O锁,B将等待A释放对O的锁定,
如果使用 synchronized ,如果A不释放,B将一直等下去,不能被中断
如果 使用ReentrantLock,如果A不释放,可以使B在等待了足够长的时间以后,中断等待,而干别的事情
ReentrantLock 获取锁定有三种方式:
a) lock(), 如果获取了锁立即返回,如果别的线程持有锁,当前线程则一直处于休眠状态,直到获取锁
b) tryLock(), 如果获取了锁立即返回 true,如果别的线程正持有锁,立即返回 false;
c) tryLock(long timeout,TimeUnit unit), 如果获取了锁定立即返回 true,如果别的线程正持有锁,会等待参数给定的时间,在等待的过程中,如果获取了锁定,就返回 true,如果等待超时,返回 false;
d) lockInterruptibly: 如果获取了锁定立即返回,如果没有获取锁定,当前线程处于休眠状态,直到或者锁定,或者当前线程被别的线程中断
2. synchronized 是在 JVM 层面上实现的,不但可以通过一些监控工具监控 synchronized 的锁定,而且在代码执行时出现异常,JVM 会自动释放锁定;但是使用 Lock 则不行,lock 是通过代码实现的,要保证锁定一定会被释放,就必须将 unLock () 放到 finally {} 中。
3. (下面内容是转载 http://zzhonghe.iteye.com/blog/826162)
5.0 的多线程任务包对于同步的性能方面有了很大的改进,在原有 synchronized 关键字的基础上,又增加了 ReentrantLock,以及各种 Atomic 类。了解其性能的优劣程度,有助与我们在特定的情形下做出正确的选择。
synchronized: 在资源竞争不是很激烈的情况下,偶尔有同步的情形下,synchronized 是很合适的。原因在于,编译程序通常会尽可能的进行优化 synchronize,另外可读性非常好,不管用没用过 5.0 多线程包的程序员都能理解。
ReentrantLock: 提供了多样化的同步,比如有时间限制的同步,可以被 Interrupt 的同步(synchronized 的同步是不能 Interrupt 的)等。在资源竞争不激烈的情形下,性能稍微比 synchronized 差点点。但是当同步非常激烈的时候,synchronized 的性能一下子能下降好几十倍。而 ReentrantLock 确还能维持常态。
Atomic: 和上面的类似,不激烈情况下,性能比 synchronized 略逊,而激烈的时候,也能维持常态。激烈的时候,Atomic 的性能会优于 ReentrantLock 一倍左右。但是其有一个缺点,就是只能同步一个值,一段代码中只能出现一个 Atomic 的变量,多于一个同步无效。因为他不能在多个 Atomic 之间同步。
所以,我们写同步的时候,优先考虑 synchronized,如果有特殊需要,再进一步优化。ReentrantLock 和 Atomic 灵活性和伸缩性更好,但是如果用的不好,不仅不能提高性能,还可能带来灾难。
先贴测试结果,再贴代码.
(其中:Atomic 测试代码不准确,一个同步中只能有 1 个 Actomic,这里用了 2 个,但是这里的测试只看速度)
==========================
round:100000 thread:5
Sync = 35301694
Lock = 56255753
Atom = 43467535
==========================
round:200000 thread:10
Sync = 110514604
Lock = 204235455
Atom = 170535361
==========================
round:300000 thread:15
Sync = 253123791
Lock = 448577123
Atom = 362797227
==========================
round:400000 thread:20
Sync = 16562148262
Lock = 846454786
Atom = 667947183
==========================
round:500000 thread:25
Sync = 26932301731
Lock = 1273354016
Atom = 982564544
代码如下:
package test.thread;
import static java.lang.System.out;
import java.util.Random;
import java.util.concurrent.BrokenBarrierException;
import java.util.concurrent.CyclicBarrier;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.locks.ReentrantLock;
public class TestSyncMethods
{
public static void test(int round,int threadNum,CyclicBarrier cyclicBarrier)
{
new SyncTest("Sync",round,threadNum,cyclicBarrier).testTime();
new LockTest("Lock",round,threadNum,cyclicBarrier).testTime();
new AtomicTest("Atom",round,threadNum,cyclicBarrier).testTime();
}
public static void main(String args[])
{
for(int i=0;i<5;i++){
int round=100000*(i+1);
int threadNum=5*(i+1);
CyclicBarrier cb=new CyclicBarrier(threadNum*2+1);
out.println("==========================");
out.println("round:"+round+" thread:"+threadNum);
test(round,threadNum,cb);
}
}
}
class SyncTest extends TestTemplate
{
public SyncTest(String _id,int _round,int _threadNum,CyclicBarrier _cb)
{
super( _id, _round, _threadNum, _cb);
}
@Override
/**
* synchronized关键字不在方法签名里面,所以不涉及重载问题
*/
synchronized long getValue()
{
return super.countValue;
}
@Override
synchronized void sumValue()
{
super.countValue+=preInit[index++%round];
}
}
class LockTest extends TestTemplate{
ReentrantLock lock=new ReentrantLock();
public LockTest(String _id,int _round,int _threadNum,CyclicBarrier _cb){
super( _id, _round, _threadNum, _cb);
}
/**
* synchronized关键字不在方法签名里面,所以不涉及重载问题
*/
@Override
long getValue() {
try{
lock.lock();
return super.countValue;
}finally{
lock.unlock();
}
}
@Override
void sumValue() {
try{
lock.lock();
super.countValue+=preInit[index++%round];
}finally{
lock.unlock();
}
}
}
class AtomicTest extends TestTemplate{
public AtomicTest(String _id,int _round,int _threadNum,CyclicBarrier _cb){
super( _id, _round, _threadNum, _cb);
}
@Override
/**
* synchronized关键字不在方法签名里面,所以不涉及重载问题
*/
long getValue() {
return super.countValueAtmoic.get();
}
@Override
void sumValue() {
super.countValueAtmoic.addAndGet(super.preInit[indexAtomic.get()%round]);
}
}
abstract class TestTemplate{
private String id;
protected int round;
private int threadNum;
protected long countValue;
protected AtomicLong countValueAtmoic=new AtomicLong(0);
protected int[] preInit;
protected int index;
protected AtomicInteger indexAtomic=new AtomicInteger(0);
Random r=new Random(47);
//任务栅栏,同批任务,先到达wait的任务挂起,一直等到全部任务到达制定的wait地点后,才能全部唤醒,继续执行
private CyclicBarrier cb;
public TestTemplate(String _id,int _round,int _threadNum,CyclicBarrier _cb){
this.id=_id;
this.round=_round;
this.threadNum=_threadNum;
cb=_cb;
preInit=new int[round];
for(int i=0;i<preInit.length;i++){
preInit[i]=r.nextInt(100);
}
}
abstract void sumValue();
/*
* 对long的操作是非原子的,原子操作只针对32位
* long是64位,底层操作的时候分2个32位读写,因此不是线程安全
*/
abstract long getValue();
public void testTime(){
ExecutorService se=Executors.newCachedThreadPool();
long start=System.nanoTime();
//同时开启2*ThreadNum个数的读写线程
for(int i=0;i<threadNum;i++)
{
se.execute(new Runnable()
{
public void run()
{
for(int i=0;i<round;i++){
sumValue();
}
//每个线程执行完同步方法后就等待
try
{
cb.await();
} catch (InterruptedException e)
{
// TODO Auto-generated catch block
e.printStackTrace();
} catch (BrokenBarrierException e)
{
// TODO Auto-generated catch block
e.printStackTrace();
}
}
});
se.execute(new Runnable()
{
public void run()
{
getValue();
try
{
//每个线程执行完同步方法后就等待
cb.await();
} catch (InterruptedException e)
{
// TODO Auto-generated catch block
e.printStackTrace();
} catch (BrokenBarrierException e)
{
// TODO Auto-generated catch block
e.printStackTrace();
}
}
});
}
try
{
//当前统计线程也wait,所以CyclicBarrier的初始值是threadNum*2+1
cb.await();
} catch (InterruptedException e)
{
// TODO Auto-generated catch block
e.printStackTrace();
} catch (BrokenBarrierException e)
{
// TODO Auto-generated catch block
e.printStackTrace();
}
//所有线程执行完成之后,才会跑到这一步
long duration=System.nanoTime()-start;
out.println(id+" = "+duration);
}
}
synchronized 的使用场景:
两个线程,一个打印 1-100 的奇数,一个打印 1-100 的偶数;要求:线程 1 打印 5 个之后,线程 2 开始打印,线程 2 打印 5 个之后,线程 1 再开始打印,以此循环。
public class Test { //state==1表示线程1开始打印,state==2表示线程2开始打印 private static int state = 1; private static int num1 = 1; private static int num2 = 2; public static void main(String[] args) { final Test t = new Test(); new Thread(new Runnable() { @Override public void run() { while(num1<100){ //两个线程都用t对象作为锁,保证每个交替期间只有一个线程在打印 synchronized (t) { // 如果state!=1, 说明此时尚未轮到线程1打印, 线程1将调用t的wait()方法, 直到下次被唤醒 if(state!=1){ try { t.wait(); } catch (InterruptedException e) { e.printStackTrace(); } } // 当state=1时, 轮到线程1打印5次数字 for(int j=0; j<5; j++){ System.out.println(num1); num1 += 2; } // 线程1打印完成后, 将state赋值为2, 表示接下来将轮到线程2打印 state = 2; // notifyAll()方法唤醒在t上wait的线程2, 同时线程1将退出同步代码块, 释放t锁 t.notifyAll(); } } } }).start(); new Thread(new Runnable() { @Override public void run() { while(num2<100){ synchronized (t) { if(state!=2){ try { t.wait(); } catch (InterruptedException e) { e.printStackTrace(); } } for(int j=0; j<5; j++){ System.out.println(num2); num2 += 2; } state = 1; t.notifyAll(); } } } }).start(); } }
Java 并发编程 - 并发难点及解决方法
Java 并发编程 - 并发难点及解决方法
前言
本文的目的是有一篇视角不一样的文章,通过技术推理,思维演进的方式来理解技术问题以及解决方案,相关的问题和方案存在着一定的通用性;适用于了解大部分基础的读者,用来从宏观视角思考,需要了解线程的基础操作,cas,aqs 等基础,本文不会介绍具体类的适用,实现的源码分析等重复性工作。此文从包含重要基础概念,与少量实现解析,会从源代码以及书籍中抽取核心概念,实现范式概括。阅读需要一定的前提条件,也可以作为部分书籍的所以然快速补充;
不才之处还请不吝赐教,不足之处会持续补充,不明之处欢迎留言;未经允许,禁止转载.
并发难点
顺序性
因为多个 CPU 执行多个线程,所以谁先谁后就很重要,比如 CPU1 否则读取,CPUB 负责计算,那么计算要保证在读取之后.
就好像你买菜,你媳妇做饭,那么做饭这件事要等待买菜完成,但是两件事是不同的两个人用不同的环境实现的.
现实世界的解决方案就是:你媳妇等你买菜,你买完菜了通知你媳妇可以开始做饭了。这就需要你媳妇等待做饭,你通知她做饭. 此为等待通知机制
可见性
因为 CPU 读取数据的时候有缓存,写出数据的时候有缓冲区,所以 CPU1 读写完的数据并不一定能被 CPU2 看到结果.
就好像你媳妇给你和你老弟发短信去买白菜,如果你们俩不带电话,记在脑子里 (缓存数据), 那么你媳妇此时改变主意了,要买黄瓜了 (其他线程修改数据), 你就不知道。所以需要带上电话,你媳妇改变注意的时候,你俩就不知道,就会买错菜.
现实世界的解决方案就是,你们带上电话,你媳妇改变主意了给你们再发信息,此为广播通知 , 你们收到消息,知道改变主意了,在采购的时候重新查看一下短信,获取最新的购买需求,此为缓存一致.
原子性
因为多个 CPU 执行多个线程,所以无法保证一个数据的读写只能被一个 CPU 的一个线程执行,中间可能有其他 CPU 的其他线程干预.
就好像你媳妇做好的菜饭,需要放到橱柜,此时菜能吃,但是要防止你那个调皮的儿子来吐口水,如果吐了口水,则不能吃了.
现实世界的解决方案就是,给你儿子锁屋子里,但是锁太大了 (总线锁), 妨碍了你儿子玩你,所以锁了橱柜 (缓存锁), 你儿子仅仅不能碰那些大米饭.
解决方法
对某个资源加锁来保证原子性
多个操作者操作一个资源,需要对资源上锁,处理完再解锁。这样可以保证数据原子操作,就好像互联网常用的分布式锁 ,CPU 也会有总线锁 / 缓存锁.
锁定的是公共资源
从现实世界来说,我们在使用公共资源的时候,通常都会独占某公共资源;但是你使用你的胳膊,嗓子,眼睛的时候,这是你的私有资源,不会影响别人,除非别人想摘掉你的胳膊,否则你的私有资源使用与别人无关.
注意锁的粒度
就好像你把你儿子关起来,还是把饭菜锁起来一样,锁需要有一个粒度,很明显的是,在入库的时候我们不会让整个系统其他所有入库操作都禁止,而是当前操作的那个仓库被锁住,想想商城里的公共卫生间不会因为有人进入卫生间,而锁住整个商场不让其他人进出.
同样,CPU 在上锁的时候有两个选择①将整个 CPU 操作的总线锁定,②将会涉及到的缓存锁定。很明显锁定缓存让锁的粒度更小,其他 CPU 受到的影响也小.
因此我们要知道,锁的粒度会对系统性能有很大影响.
用比较和替换 (CAS) 来保证原子性
其实计算机中的数据只有读写两种操作,原子性操作就是保证你读取和写入之间没有别人写入。我们可以在写入的时候比较一下,如果是从当初你读取的值写入到当前值,则是中间没有操作入过,否则就是有操作写入过。这就是 cas 操作,比较后替换.
就好像你听见儿子说他饿了 (读取儿子的状态), 你去拿来奶喂它 (写入充饥的数据), 如果你去拿奶的时候你小舅子来照顾过你的儿子给他奶了,你再喂他,他可能就吃多了撑得慌,为了避免这种情况,你需要喂他之前再问问你儿子 "小家伙,你还饿么?" 如果跟之前的信息 (饿) 不一致,则放弃喂奶.
用排队等待解决顺序性
多个操作的顺序乱序,可以让他们排队操作,此时需要等待通知来实现。首先要锁定这个资源,然后争抢使用这个资源的操作需要排队.
等待的一定是个公共资源
并且你只会在你所等待的公共资源的队列上排队;就好像你在烧烤店门口排队,那么你等待的资源就是烧烤店,而等待卫生间的人不会到烧烤店门口排队,所以你们俩之前不竞争,而是排队等待卫生间的那些人存在竞同一资源。换言之:这个世界上肯定是你占用了某资源才造成排队,比如公共电话亭,比如公共卫生间.
被动等待
就好像很多人去银行排队取钱,银行会给你一个号 (排队), 到你了用喇叭喊你 (广播通知), 你听见广播了 (唤醒) 就去柜台,在你拿到排队号的时候,你没得选,保安要求你必须排队等待。这是被动的等待,由 Jvm 给我们提供的 shynchronized 关键字.
主动等待
但是你在取钱的时候发现银行现在没钱了,那么你有两个选择,①放弃取钱,②等待总行来送钱。此时你会重新排队,但是队列不一样,是一个等待拿钱就走的队列,那么此时进入等待是你主动的,在运钞车来的时候,会通知等着钱的这个队列的人,给他们现金然后让他们走人。由 jdk 为我们提供的 Object.wait 和 Object.notify 方法.
一定要获取锁才能主动等待
因为你一定要独占了这个资源,才有能力分辨出这个资源是否符合你操作的需要,否则别人也操作这个资源,你的判断可能就不准确.
就好像在卫生间门口排队,自然就是等待,而你进入卫生间才会发现里面没有纸,此时你已经获取了卫生间的使用权 (关门,上锁) 才能发现,那么你很可能让那些不需要纸就能上厕所的人先用,所以你出来了并重新排队 (wait). 所以你看,主动等待的时候会释放锁.
由此可见排队等待必须在某个资源上排队,也是唤醒排队某个资源的等待队列,所以 synchronized 关键字要对某个对象加锁,Object.wait 和 Object.notify 也是针对某个对象上的队列.
让缓存失效保证可见性
每个操作之间有缓存,需要通过广播让缓存失效,失效之后要重新获取下最新数据。比如通过 MQ 发布一个主题消息,通知其他机器某个缓存失效了,订阅主题的机器收到消息之后将本地的缓存标记失效,这是分布式系统上的,从 CPU 微观来说,连接 MQ 的网线是 CPU 的总线,类似的机制实现就是 CPU 缓存一致性.
我们最好不用 (也最好不要) 每次失效都重新读取数据,因为如果在时间点 1->2 之间失效 100 次,那么则将重新获取 100 次数据,而这 100 次获取数据都不一定会被读取一次。就好像你 10 台机器上都有用户信息,但是不一定每台的用户信息都会被读取.(此为惰性缓存)
宏观来看
如果你不了解什么是微服务,不知道 zookeeper, 不知道分布式锁,不知道消息队列服务器,可以不看这些.
你完全可以将上面这些过程想象成很多个微服务实例下,拥有自己的本地缓存,在操作某个缓存数据的时候,会在 zookeeper 上加一个分布式锁,处理完成的时候会在 mq 上某个主题发布消息,通知缓存被修改,其他实例收到消息后,将本地缓存标记失效,在下次读取数据操作的时候,重新加载缓存.
这个时候你就发现,原本的 CPU 总线换成了网线传输数据,原本 CPU 的 1,2,3 级缓存变成了微服务的本地缓存,原本的缓存锁变成了分布式锁.
但是这还不够,有的时候用户拿到数据等了很久才提交,所以我们的数据上会有一个版本号,每次提交增加版本号,每次写回数据库的时候比较当前数据库版本和用户提交的版本,这就是 cpu 上的 cas 操作.
如何实现
比较替换的实现
在 CPU 层面为我们提供了 cmpxchg 指令来实现,这是个 IA-32 的汇编指令.
缓存加锁的实现
在 CPU 层面为我们提供了 lock 指令前缀来实现,这个指令会锁定总线 / 缓存,然后独享数据的去修改,并且修改完之后会将当前 CPU 缓冲区写会主存。这是个 IA-32 的汇编指令.
缓存失效的实现
在 CPU 层面为我们提供了总线广播监听机制,这个机制可以监听缓存的失效,并将其标记失效,下次看到失效的缓存则从主存读取,这个机制的学名为 MESI (缓存一致性协议).
线程等待的实现
等待有两种方式,一个是一直等待,一个是超时等待,一直等待直接将线程保存起来,等到有唤醒的时候再调度执行就行。超时等待则需要一个定时器.
定时器的具体实现是 OS 层面的东西,比较复杂,但是现在很成熟,同时有硬件层面的支持。硬件层面有一个固定时钟和计数器,每一个时钟周期将计数器递减,计数器为 0 的时候触发 CPU 中断,这看上去很消耗 CPU, 因为每次 CPU 的周期都需要检查。也就是说操作系统级别的等待是硬件级别的操作,所以消耗是很小很小的.
其实无论如何都离不开不断检查,从现实世界来说,如果我们想等待到某时间点,唯一的办法 (不考虑闹钟等) 就是不断的检查当前时间和任务列表。不可能有其他办法。考虑其实等待到某时间点和等待一段时间其实都是一样的,所以只需要一个定时检查的计时器就可以了。这就是为什么一个线程能等待一段时间.
- 等到某时间点 = 当前时间 + 需要等待的一段时间
- 等待一段时间 = 最终时间点 - 当前时
排队的实现思路
有了计时器,我们在加上排队,就可以实现排队 - 等待 - 通知 , 排队的实现需要一个队列,不管你是基于链表,还是数组,总之需要一个排队的地方,现实世界中有排队的地方,就有插队的人,不同的是看管理队列的人允不允许插队而已.
此时有两种实现思路,公平排队和非公平排队.
- 非公平排队就好像排队吃饭,门口拥堵了一大堆人,老板每次有空桌出来叫一个人,谁挤进去就是谁,这样挤得快的人马上进去吃饭,是不公平的。因为可能有人等了半天,就因为一不小心走神,就被你捷足先登了.
- 公平排队拿到号码之后老板按号叫人,到你了你就进去,这样一定公平,因为一定是有序的。谁先来谁先进,但是同时会牺牲排队的效率,因为你可能昨晚没睡好,叫你进去的时候你磨蹭半天没反应,这就浪费时间了,而且老板还要维护这样一堆的号码牌.
Java 层面的实现
Jvm 级别
首先 Java 的线程跟 OS 线程是 1:1 映射关系,所以底层会有 OS 的系统调用库来实现线程创建,销毁,启动,等待操作,然后封装成 Thread 类操作。提供 synchronized 实现对一个对象资源的锁定操作,提供 volatile 实现对 CPU 缓存的锁定和缓冲区 flush 的操作.
Jdk 级别
然后在 JDK 层面,为我们提供了 Object.wait 和 Object.notify 方法与 synchronized 配合,来实现排队 - 等待 - 通知机制.
CPU 级别的 CAS 操作,jdk 提供了 UNSAFE 类来操作,这个类里面有很多 CAS 操作的方法.
JUC 级别
最后在 JUC 包内,为我们提供了 LockSupport 类,让我们可以直接操作一个线程进入等待,和唤醒线程;有了 LockSupport 类,便可以向 OS 操作线程一样去实现高级的排队 - 等待 - 通知 - 唤醒等操作,于是提供了 AQS 线程同步框架.
有了 AQS 线程同步框架,便可以利用将线程排队 - 等待这些操作,实现非常灵活的 Lock 类.
编程实现范式
普通等待通知
// 获得锁,判断条件是否满足,不满足,则等待
// 一个线程也能在没有被通知、中断或超时的情况下唤醒,也即所谓的“虚假唤醒”,虽然这点在实践中很少发生,应用应该检测导致线程唤醒的条件,并在条件不满足的情况下继续等待,以此来防止这一点。
synchronized (obj) {
while (<condition does not hold>)
obj.wait(timeout);
}
// 摘录来自: jdk 源代码 wait方法注释
普通等待通知且支持超时
// 获得锁,计算结束时间,循环判断条件是否满足,等待剩余时间时间,被唤醒之后检查条件,并且重新计算等待时间
public synchronized Object get(long mills) throws InterruptedException {
long future = System.currentTimeMillis() + mills;
long remaining = mills;
// 当超时大于0并且result返回值不满足要求
while ((result == null) && remaining > 0) {
wait(remaining);
remaining = future - System.currentTimeMillis();
}
return result;
}
// 摘录来自: 方腾飞,魏鹏,程晓明 著. “Java并发编程的艺术 (Java核心技术系列)。” Apple Books.
具体的实现类
Object 类
Object 类提供了线程的基础等待通知实现方法,这些方法要求在 synchronized 关键字范围内的代码段执行,即:获取了 jvm 提供的锁.
之所以定义在 Object 方法中,一部分原因因为 synchronized 锁是在某个对象头加锁,换言之就是锁定某个对象资源。然后进行排队等待等调度,之所以这样设计,是因为必须要有一个资源上加锁之后才能进入等待队列,所以你需要锁定这个对象,然后调用这个对象上的 wait-notify, 来实现某个资源的锁定和等待通知机制.
- wait 方法需要先获取对象的锁,因为 wait 需要先释放持有的锁,然后进入该对象的等待队列。
- notify 方法需要先获取对象的锁,因为 notify 方法会在锁的对象对应的等待队列唤醒(一个或全部)线程。
Thread 类
- static currentThread 获取当前线程
- static yield 建议调度器让出 CPU
- static sleep 让当前线程睡眠
- interrupt 设置线程的中断标志位为 true
- static interrupted 判断线程中断标识位,并且清除标志位,换言之:如果调用两次该方法,第二次一定返回 false
- isInterrupted 判断线程中断标识位,但是不会影响标志位状态.
- isAlive 表示当前线程启动了,但是没有运行结束
Thread.join 实现原理
该方法是一个同步方法,使用一个 this.wait 的循环来实现当前线程的等待,while 的条件是当前线程还存活,wait 导致调用线程的等待,在 join 的线程结束的时候会调用 this.notifyAll 来唤醒当前等待的线程,这个方法是 jvm 负责调用。源代码中有一句话:不建议在程序中的线程实例中使用 wait,notify 或者 notifyAll.
It is recommended that applications not use wait, notify, or notifyAll on Thread instances.
ThreadLocal 类
作用
ThreadLocal 可以在当前线程存储一个变量。核心原理是在当前线程对象下创建一个 map, 然后将当前 ThreadLocal 的实例作为 key 存储变量.
关键点
- ThreadLocal 跟 Thrad 类都属于
java.lang
, 所以默认的成员变量可以互相访问. - 在 Thread 类中存放一个 threadLocals 属性,该属性是 ThreadLocal 下的静态内部类.
- ThreadLocal 在调用
set,get
方法的时候,会去初始化这个属性. - ThreadLocal 将自己的 this 作为 key 保存自己想要存放的变量到 threadLocal 属性.
安全隐患 如果线程没有被销毁,那么线程内部的 threadLocals 变量将会一直保持引用,无法回收,如果调用 ThreadLocal#set 方法设置的变量没有调用 remove 方法清理,则一直保持在 ThreadLocalMap 中,此时保持的值处于一个无法被回收的状态.
LockSupport 类
LockSupport 类提供了 park 系列和 unpark 系列方,内部调用 UNSAFE 类实现,UNSAFE 类由 JVM 在本地 C++ 中实现,具体实现与平台有关,park 方法在 Linux 下使用的是系统方法 pthread_cond_wait 实现;unpark 方法在 Linux 下是使用 pthread_cond_signal 实现的.
在 park (Object) 的时候,dump 线程会发现由 wait 的对象信息,是存储在 Thread.parkBlocker 字段上,通过 UNSAFE.objectFieldOffset 写到线程对象上的.
AQS 线程同步器
线程同步器实现了一个线程排队等待的框架,线程排队等待的实现离不开两点实现:①创建一个排队的线程,因为需要前后节节点的快速获取,此队列通常使用链表实现;②利用线程的等待通知机制将队列中的线程阻塞住.Java 中的线程和 OS 的线程是 1:1 映射的,线程的等待利用的是 OS 的系统调用实现,封装成了 Jdk 中的 LockSupport 类;这个机制保证了线程的有序性.
线程同步器还提供了一种原子性争抢资源的能力,他内部存储了一个状态标志,用 volatile 修饰,是一个数字类型,利用 cas 操作进行修改操作,如果操作成功,表示这个操作期间没有其他线程竞争,或者其他线程都竞争失败,此时其他线程发现自己竞争失败则进入等待队列,竞争成功的线程得以真正的执行,执行结束的时候再次修改这个状态表示,此时因为 volatile 的语义特性,最后这个修改操作会保证数据被 (所有 CPU 缓冲区) 写回主存,这个操作间接的保证了线程的原子性,可见性.
线程同步器提供了独占和共享的两组获取锁和释放锁的抽象方法
- tryAcquire 返回 true 表示获取到了锁,false 表示没有获取到锁,并且会进入自旋状态.
- tryAcquireShared 返回负数表示获取失败,0 表示并发量耗尽 (不能再获取了), 正数表示获取成功,还能继续获取,(但是正数本身的值不做意义)
线程等待实现
实现线程等待在一个死循环中无限尝试获取锁,直到成功或者被中断,在执行过程中有可能会被 LockSupport.park 方法暂停掉,在首节点执行完毕的时候会唤醒排在其之后的第一个可运行节点,节点被唤醒之后马上重新在死循环中争抢锁.
总的来说,就是在死循环中处理等待通知机制.
独占锁的处理
- 线程首先尝试获取资源,如果获取失败,则使用 cas 操作加入等待队列;如果获取成功,则标记为首节点.
- 获取失败的线程进入资源申请的死循环,每次循环首先检查,如果前驱节点是首结点,则尝试 CAS 获取锁,如果不是,则调用 LockSupport.park 进入等待状态.
- 首节点的任务处理完毕之后会释放锁,在释放锁的时候唤醒下一个可执行的节点的线程.
- 唤醒的线程如果处于次 (前驱是首节点) 节点,此时线程应该处于一个自旋获取锁的状态,此时将快速的获取锁,否则处于一个等待状态,则恢复开始再次获取资源或进入等待.
共享锁的处理
初始一个令牌桶,桶中包含多个锁,锁的数量影响并发数量,,每次获取锁,则从桶中拿走一个 (或者多个) 令牌,每次释放,则从桶中放回锁,每次取出和释放的过程使用 cas 操作,保证原子性,则允许通过的线程是:桶大小 (锁总数)/ 桶粒度 (每次获取锁数)
Condition 等待条件
Condition 在自己内部又做了一个队列,在 wait 的时候将当前线程放入队列,在 notify 的时候从队列唤醒,因为 condition 要求在调用 wait-notify 的时候必须获取锁,所以内部在 wait 和 notify 的时候无需 cas 操作.
如何保证可见性
unlock 操作会 cas 修改一个 volatile 变量,而 volatile 变量的修改会运行一条 LOCK
指令,该指令会锁缓存 (或者总线) 保证其他 CPU 不会并发访问数据,并且写完数据之后会立即将这颗 CPU 的将缓冲区数据刷入主存,同时
并发工具类
CountDownLatch
利用 AQS 实现,比如 5 个线程,初始设置 state=5; 每次结束一个线程 (countDown) 做减法: state=state-1;await 等待方法则是从申请 1 个值,但是在 tryAcquireShared 方法中实现的是如果状态不是 0, 则不允许,如果是 0, 则允许.
Semaphore
利用 AQS 实现,类似 CountDownLatch, 初始化一个令牌桶,每次申请 N 个令牌桶,不够申请则等待。区别是 Semaphore 只要桶内数量足够申请就行,CountDownLatch 要求必须只剩下 0 个.
CyclicBarrier
利用 ReentrantLock 以及 ReentrantLock#newCondition , 内部使用一个 int count 做计数,每次 await 一个线程的时候,加锁,做 count--, 如果 count 是 0, 则重新开始,重新开始的时候 condition.notifyall; 如果不是 0, 则 condition.await (此时会释放锁);
原子操作类
大多数原子类,都是利用一个 Unsafe 类和一个 volatile 变量包装的。执行原子更新的时候适用 cas 方式来执行更新.
Unsafe 类底层实现依赖于汇编指令 cmpxchg 实现原子性操作.
但是 lazySet 却是使用 UNSAFE.putOrderedInt 方法执行的,底层虚拟机调用的是 C++ 方法的 SET_FIELD_VOLATILE(obj, offset, jlong, x);
仅仅是实现了 volatile 的语义操作,该操作导致修改后不是立即可见的.
计数器类
核心实现是创建一个 base 值,做基础累加,在 cas 操作 base 值失败之后,根据 CPU 数量创建多个 Cell, 将计算的逻辑分散到各个 cell 中,此时每个 CPU 最多操作自身的一个 cell, 几乎是资源独占的模式工作,所以累加效率会大有提高,但是读取值的时候需要将所有 cell 以及 base 值,读取速度可能变慢.
就好像是一个分桶排序的算法,因为一个桶放了太多数排序太慢,而分为多个桶来计算;或者说数据库表太大,则分成多个表.
其实我们的程序大多数都是写多读少的
并发编程锁
Java 中锁的实现都是基于 AQS 实现的,具体一个锁的实现会选择实现 AQS 的独占锁或者共享锁的一种,在初始化的时候初始化一个 state, 独占锁通常为 1, 共享锁可能是大于 1, 获取锁则 acquire (1), 释放锁则 release (1).
ReentrantLock
ReentrantLock 实现了初始 state=0 的一个 AQS, 每次 acquire 则增加 1,release 则减少 1
重入实现
acquire 的时候如果当前线程是获取到锁的线程,则会再次增加 1, 来实现重入获取,release 在发现 state=0 的时候,则将当前获取锁的线程标记为 null, 来实现重入释放.state 不是 0, 且当前线程没有持有锁,则不允许获取锁.
公平与非公平实现
公平锁在请求锁的时候会判断是不是有一个前驱节点在排队,如果有则放弃 cas 操作 (tryAcquire 返回 false), 非公平锁则不判断直接进行 cas 操作.
判断前驱节点在排队的方法是查看前首节点 (当前执行的节点) 的下一节点不是当前线程,换言之:当前线程不是队列中排队第二的待执行线程.
ReentrantReadWriteLock
读写锁实现了 AQS 的共享锁和独占锁两种模式,原理是将一个 int 值,按位分割成高 16 位来标记读,和低 16 位来标记写。内部有一个 int exclusiveCount (int c) 用来计算独占锁的数量,一个 int sharedCount (int c) 用来计算共享锁的数量,
写锁的获取在有读锁已被获取的情况下进行等待,没有读的时候跟 ReentrantLock 类似;读锁在没有写锁的情况下就可以用 cas 尝试更新 state 了,失败则放入等待队列,用死循环来等待.
写锁重入的实现跟 ReentrantLock 类似,只不过只能支持低 16 位最大数次重入,读锁的重入利用 ThreadLocale 来存放重入次数.
写锁获取
tryAcquire 方法注释说:
- 如果读状态不是 0, 或者写锁 (独占锁) 已经被获取,单不是当前线程,则直接失败 ()
- 如果写锁的重入次数太大,(16 位存放不下) 则抛出 error (为什么是 error, 不是 exception 呢?)
- 其他情况下,这个线程将会执行类似 ReentrantLock 的重入锁获取机制
读锁获取
tryAcquireShared 方法注释说:
-
如果独占锁数量不是 0, 并且获取的线程不是当前线程,则获取失败
-
其他情况下判断是否需要进入队列 (此处用于处理公平不公平实现), 如果不需要,则尝试 CAS 更新 state
-
在上一步失败的情况下。进入一个循环重试的状态,死循环尝试获取锁,直到成功.
锁降级
根据读锁的获取流程可以看出,获取读锁的时候,如果当前线程已经获取了独占的写锁,也可以获取读锁,此被称为锁降级. 但是写锁获取的时候,如果已经有读锁了,直接放弃,意味着无法锁升级.
锁降级是位了写完数据之后马上可以读取到数据,并且在独占的写锁中获取读锁,然后释放写锁,保证了在获取读锁的过程中不会有其他线程写入数据,保证自己写完之后看到的数据是最新的.
总结
我们讨论了 Java 并发编程的基础,从其面临的困难,以及如何解决这些困难的方案,到方案的具体实现。本文的基础章节很啰嗦,但是他很重要,那些高级的工具类,以及 lock 都是如何实现的并发包,但是基础章节却是为何要这么实现,以及如何思考到这些东西的思维演进。有了这些基础,我们可以很容易的实现线程池,阻塞队列,等高级工具.
- 并发编程的难点,这些难点在现实世界的样子
- 这些难点在现实世界以及计算机中的解决方案
- 这些解决方案的实现思路
- 这些思路的具体实现
- 这些具体实现的高级工具
java 并发编程 - 线程与进程
线程概念
线程就是程序中单独顺序的流控制。
线程本身不能运行,它只能用于程序中。
说明:线程是程序内的顺序控制流,只能使用分配给程序的资源和环境。
进程
进程:执行中的程序。
程序是静态的概念,进程是动态的概念。
一个进程可以包含一个或多个线程。
一个进程至少要包含一个线程。
线程与进程的区别
多个进程的内部数据和状态都是完全独立的,而多线程是共享一块内存空间和一组系统资源,有可能互相影响。
线程本身的数据通常只有寄存器数据,以及一个程序执行时使用的堆栈,所以线程的切换负担比进程切换的负担要小。
多线程程序比多进程程序需要更少的管理费用。
进程是重量级的任务,需要分配给它们独立的地址空间,进程间通信是昂贵和受限的,进程间的转换也是很需要花费的。
另一方面,线程是轻量级的选手,它们共享相同的地址空间并且共同分享同一个进程,线程间的通信是便宜的,线程间的转换也是低成本的。
单线程
单个程序中只有一个线程就是单线程。
当程序启动运行时,就自动产生一个线程,主方法 main 就在这个主线程上运行。我们的程序都是由线程来执行的。
多线程
多线程指在单个程序中可以同时运行多个不同的线程执行不同的任务。
多线程编程的目的,就是 “最大限度地利用 CPU 资源”,当某一线程的处理不需要占用 CPU 而只和 IO 等资源打交道时,让需要占用 CPU 的其他线程有机会获得 CPU 资源。从根本上说,这就是多线程编程的最终目的。
一个程序实现多个代码同时交替运行就需要产生多个线程。
CPU 随机地抽出时间,让我们的程序一会做这件事情,一会做另外的事情。
从宏观角度来看,多个线程在同时执行(宏观并行),但是微观上来看,处理器的个数决定了某一个时刻可以同时运行的最大线程数,如单核 CPU 某一时刻只能有一个线程在执行(微观串行),双核的 CPU 在某一个时刻,最多可以运行两个线程,可以做到微观并行。
Java 中的多线程
同其他大多数编程语言不同,Java 内置支持多线程编程 (Multithreaded Programming)。
多线程程序包含两条或两条以上并发运行的部分,程序中每个这样的部分都叫做一个线程(Thread)。每个线程都有独立的执行路径,因此多线程是多任务处理的一种特殊形式。
多任务处理被所有的现代操作系统所支持。然而,多任务处理有两种截然不同的类型:基于进程的和基于线程的。
1. 基于进程的多任务处理是更熟悉的形式。进程(process)本质上是一个执行的程序。因此基于进程的多任务处理的特点是允许你的计算机同时运行两个或更多的程序。
举例来说,基于进程的多任务处理使你在运用文本编辑器的时候可以同时运行 Java 编译器。
在基于进程的多任务处理中,程序是调度程序所分派的最小代码单位。
2. 而在基于线程(thread-based)的多任务处理环境中,线程是最小的执行单位。
这意味着一个程序可以同时执行两个或者多个任务的功能。
例如,一个文本编辑器可以在打印的同时格式化文本。
Java 线程模型
Java 多线程的优点就在于取消了主循环 / 轮询机制。一个线程可以暂停而不影响程序的其他部分。
多线程允许活的循环在每一帧间隙中沉睡一秒而不暂停整个系统。
线程组
所有线程都隶属于一个线程组。那可以是一个默认线程组,也可以是一个创建线程时明确指定的组。
说明:
在创建之初,线程被限制到一个组里,而且不能改变到一个不同的组。
若创建多个线程而不指定一个组,它们就会与创建它的线程属于同一个组。
java 并发编程 - 线程的 join () 示例
Java7 API: http://docs.oracle.com/javase/7/docs/api/java/lang/Thread.html#join()
public final void join()
throws InterruptedException
Waits for this thread to die.
An invocation of this method behaves in exactly the same way as the invocation
join(0)
Throws:
InterruptedException - if any thread has interrupted the current thread. The interrupted status of the current thread is cleared when this exception is thrown.
public final void join(long millis)
throws InterruptedException
Waits at most millis milliseconds for this thread to die. A timeout of 0 means to wait forever.
This implementation uses a loop of this.wait calls conditioned on this.isAlive. As a thread terminates the this.notifyAll method is invoked. It is recommended that applications not use wait, notify, or notifyAll on Thread instances.
Parameters:
millis - the time to wait in milliseconds
Throws:
IllegalArgumentException - if the value of millis is negative
InterruptedException - if any thread has interrupted the current thread. The interrupted status of the current thread is cleared when this exception is thrown.
public final void join(long millis,
int nanos)
throws InterruptedException
Waits at most millis milliseconds plus nanos nanoseconds for this thread to die.
This implementation uses a loop of this.wait calls conditioned on this.isAlive. As a thread terminates the this.notifyAll method is invoked. It is recommended that applications not use wait, notify, or notifyAll on Thread instances.
Parameters:
millis - the time to wait in milliseconds
nanos - 0-999999 additional nanoseconds to wait
Throws:
IllegalArgumentException - if the value of millis is negative, or the value of nanos is not in the range 0-999999
InterruptedException - if any thread has interrupted the current thread. The interrupted status of the current thread is cleared when this exception is thrown.
Java Tourial: http://docs.oracle.com/javase/tutorial/essential/concurrency/join.html
Joins
The join method allows one thread to wait for the completion of another. If t is a Thread object whose thread is currently executing,
t.join();
causes the current thread to pause execution until t''s thread terminates. Overloads of join allow the programmer to specify a waiting period. However, as with sleep, join is dependent on the OS for timing, so you should not assume that join will wait exactly as long as you specify.
Like sleep, join responds to an interrupt by exiting with an InterruptedException.
用处
t.jion () 使得当前线程暂停,直到线程 t 终止,才继续执行。
如此,在多线程时,可以确保线程的执行顺序。
代码 1:
public class CustomThread1 extends Thread
{
public CustomThread1()
{
super("CustomThread1");
}
@Override
public void run()
{
String threadName = Thread.currentThread().getName();
System.out.println(threadName + " start.");
try
{
for (int i = 0; i < 5; i++)
{
Thread.sleep(1000);
System.out.println(threadName + " loop at " + i);
}
System.out.println(threadName + " end.");
} catch (Exception e)
{
System.out.println("Exception from " + threadName + ".run");
}
}
}
public class CustomThread2 extends Thread
{
CustomThread1 t1;
public CustomThread2(CustomThread1 t1)
{
super("CustomThread2");
this.t1 = t1;
}
@Override
public void run()
{
String threadName = Thread.currentThread().getName();
System.out.println(threadName + " start.");
try
{
System.out.println("t1 join again");
t1.join();
System.out.println("t1 joined again");
System.out.println(threadName + " end.");
} catch (Exception e)
{
System.out.println("Exception from " + threadName + ".run");
}
}
}
public class JoinTestDemo
{
public static void main(String[] args)
{
String threadName = Thread.currentThread().getName();
System.out.println(threadName + " start.");
CustomThread1 t1 = new CustomThread1();
CustomThread2 t2 = new CustomThread2(t1);
try
{
t1.start();
Thread.sleep(3000);
System.out.println("t1 jion");
t1.join();
System.out.println("t1 jioned");
t2.start();
System.out.println("t2 jion");
t2.join();
System.out.println("t2 jioned");
} catch (Exception e)
{
System.out.println("Exception from main");
}
System.out.println(threadName + " end !");
}
}
代码 1 运行结果:
main start.
CustomThread1 start.
CustomThread1 loop at 0
CustomThread1 loop at 1
t1 jion
CustomThread1 loop at 2
CustomThread1 loop at 3
CustomThread1 loop at 4
CustomThread1 end.
t1 jioned
t2 jion
CustomThread2 start.
t1 join again
t1 joined again
CustomThread2 end.
t2 jioned
main end !
代码 2:
仅仅把代码 1 中 JoinTestDemo 类中 main 方法的 try 快修改为如下:
t1.start();
Thread.sleep(3000);
//System.out.println("t1 jion");
//t1.join();
//System.out.println("t1 jioned");
t2.start();
//System.out.println("t2 jion");
//t2.join();
//System.out.println("t2 jioned");
代码 2 运行结果:
main start.
CustomThread1 start.
CustomThread1 loop at 0
CustomThread1 loop at 1
CustomThread1 loop at 2
main end !
CustomThread2 start.
t1 join again
CustomThread1 loop at 3
CustomThread1 loop at 4
CustomThread1 end.
t1 joined again
CustomThread2 end.
参考资料:http://blog.csdn.net/bzwm/article/details/3881392
今天的关于python 并发编程 非阻塞IO模型和python非阻塞多线程的分享已经结束,谢谢您的关注,如果想了解更多关于java 并发编程 - Lock 与 synchronized、Java 并发编程 - 并发难点及解决方法、java 并发编程 - 线程与进程、java 并发编程 - 线程的 join () 示例的相关知识,请在本站进行查询。
本文标签: