线程间通信
一、引言
线程与线程之间不是相互独立的存在,它们彼此之间需要相互通信和协作。最典型的例子就是生产者-消费者问题。下面首先介绍 wait/notify 机制,并对实现该机制的两种方式:synchronized+wait-notify模式和 Lock+Condition 模式进行介绍,作为线程间通信与协作的基础。最后,对 Thread 类中的 join() 方法进行源码分析,并以宿主线程与寄生线程的协作为例进行说明。
二、使用 wait/notify/notifyAll 实现线程间通信的几点重要说明
1、Object是所有类的父类,它有5个方法组成了等待/通知机制的核心:notify()、notifyAll()、wait()、wait(long)和wait(long,int)。在Java中,所有的类都从Object继承而来,因此,所有的类都拥有这些共有方法可供使用。而且,由于他们都被声明为final,因此在子类中不能覆写任何一个方法。
2、在Java中,可以通过配合调用Object对象的 wait() 方法和 notify() 方法或 notifyAll() 方法来实现线程间的通信。上述三个方法均非Thread类中所声明的方法,而是Object类中声明的方法。原因是每个对象都拥有monitor(锁),所以让当前线程等待某个对象的锁,当然应该通过这个对象来操作,而不是用当前线程来操作,因为当前线程可能会等待多个线程的锁,如果通过线程来操作,就非常复杂了。
3、在线程中调用 wait() 方法,将阻塞等待其他线程的通知(其他线程调用 notify() 方法或 notifyAll() 方法),在线程中调用 notify() 方法或 notifyAll() 方法,将通知其他线程从 wait() 方法处返回。
4、wait()、notify()、notifyAll() 使用条件:
必须放在synchronized代码块中;
必须要锁住同一个对象,notify()方法才能唤醒等待的线程;
notify()只会随机的唤醒一个处于wait状态的线程;
notifyall()会全部唤醒所有处于wait线程,争夺到时间片的只有一个;
5、wait()方法
该方法用来将当前线程置入休眠状态,直到接到通知或被中断为止。在调用 wait() 之前,线程必须要获得该对象的对象级别锁,即只能在同步方法或同步块中调用 wait() 方法。 进入 wait() 方法后,当前线程释放锁。在从 wait() 返回前,线程与其他线程竞争重新获得锁。如果调用 wait() 时,没有持有适当的锁,则抛出IllegalMonitorStateException,它是RuntimeException的一个子类。
6、notify()
该方法也要在同步方法或同步块中调用,即在调用前,线程也必须要获得该对象的对象级别锁,如果调用 notify() 时没有持有适当的锁,也会抛出IllegalMonitorStateException。
该方法用来通知那些可能等待该对象的对象锁的其他线程。如果有多个线程等待,则线程规划器会随机挑选出其中一个 wait() 状态的线程来发出通知,并使它等待获取该对象的对象锁(notify后,当前线程不会马上释放该对象锁,wait所在的线程并不能马上获取该对象锁,要等到程序退出synchronized代码块后,当前线程才会释放锁,wait所在的线程也才可以获取该对象锁),但不惊动其他同样在等待被该对象notify的线程们。当第一个获得了该对象锁的 wait 线程运行完毕以后,它会释放掉该对象锁,此时如果该对象没有再次使用notify语句,则即便该对象已经空闲,其他wait状态等待的线程由于没有得到该对象的通知,会继续阻塞在wait状态,直到这个对象发出一个notify或notifyAll。这里需要注意:它们等待的是被notify或notifyAll,而不是锁。这与下面的notifyAll()方法执行后的情况不同。
7、notifyAll()
该方法与 notify() 方法的工作方式相同,重要的一点差异是:
notifyAll() 使所有原来在该对象上 wait 的线程统统退出 wait 的状态(即全部被唤醒,不再等待notify或notifyAll,但由于此时还没有获取到该对象锁,因此还不能继续往下执行),变成等待获取该对象上的锁, 一旦该对象锁被释放(notifyAll线程退出调用了notifyAll的synchronized代码块的时候),它们就会去竞争。如果其中一个线程获得了该对象锁,它就会继续往下执行,在它退出synchronized代码块,释放锁后,其他的已经被唤醒的线程将会继续竞争获取该锁,一直进行下去,直到所有被唤醒的线程都执行完毕。
8、wait(long) 和 wait(long,int)
这两个方法是设置等待超时时间的,后者在超值时间上加上ns,精度也难以达到,因此,该方法很少使用。对于前者,如果在等待的线程接到通知或被中断之前,已经超过了指定的毫秒数,则它通过竞争重新获得锁,并从 wait(long) 返回。另外,需要知道,如果设置了超时时间,当 wait() 返回时,我们不能确定它是因为接到了通知还是因为超时而返回的,因为 wait() 方法不会返回任何相关的信息。但一般可以通过设置标志位来判断,在notify之前改变标志位的值,在 wait() 方法后读取该标志位的值来判断,当然为了保证notify不被遗漏,我们还需要另外一个标志位来循环判断是否调用 wait() 方法。
9、小结
a. wait()、notify() 和 notifyAll()方法是 本地方法,并且为 final 方法,无法被重写;
b. 调用某个对象的 wait() 方法能让 当前线程阻塞,并且当前线程必须拥有此对象的monitor(即锁);
c. 调用某个对象的 notify() 方法能够唤醒 一个正在等待这个对象的monitor的线程,如果有多个线程都
在等待这个对象的monitor,则只能唤醒其中一个线程;
d. 调用notifyAll()方法能够唤醒所有正在等待这个对象的monitor的线程。
三、生产者-消费者模型
线程与线程之间相互通信和协作,最经典的例子就是 生产者-消费者模型:
在下面的例子中,虽然两个线程实现了通信,但是凭借 线程B 不断地通过 while语句轮询 来检测某一个条件,这样会导致CPU的浪费。因此,需要一种机制来减少 CPU资源 的浪费,而且还能实现多个线程之间的通信,即 wait/notify 机制 。
//资源类
class MyList {
//临界资源
private volatile List<String> list = new ArrayList<String>();
public void add() {
list.add("abc");
}
public int size() {
return list.size();
}
}
// 线程A
class ThreadA extends Thread {
private MyList list;//临界资源
public ThreadA(MyList list,String name) {
super(name);
this.list = list;
}
@Override
public void run() {
try {
for (int i = 0; i < 3; i++) {
list.add();
System.out.println("添加了" + (i + 1) + "个元素");
Thread.sleep(1000);
}
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
//线程B
class ThreadB extends Thread {
private MyList list;//临界资源
public ThreadB(MyList list,String name) {
super(name);
this.list = list;
}
@Override
public void run() {
try {
while (true) { // while 语句轮询
if (list.size() == 2) {
System.out.println("==2了,线程b要退出了!");
throw new InterruptedException();
}
}
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
//测试
public class Test {
public static void main(String[] args) {
MyList service = new MyList();
ThreadA a = new ThreadA(service,"A");
ThreadB b = new ThreadB(service,"B");
a.start();
b.start();
}
}
/* Output(输出结果不唯一): 添加了1个元素 添加了2个元素 ==2了,线程b要退出了! java.lang.InterruptedException at test.ThreadB.run(Test.java:57) 添加了3个元素 *///:~
四、 wait/notify 机制
1、方法调用与线程状态关系
每个锁对象都有两个队列,一个是就绪队列,一个是阻塞队列。就绪队列存储了已就绪(将要竞争锁)的线程,阻塞队列存储了被阻塞的线程。当一个阻塞线程被唤醒后,才会进入就绪队列,进而等待CPU的调度;反之,当一个线程被 wait 后,就会进入阻塞队列,等待被唤醒。
2、使用举例
public class Test {
public static Object object = new Object();
public static void main(String[] args) throws InterruptedException {
Thread1 thread1 = new Thread1();
Thread2 thread2 = new Thread2();
thread1.start();
Thread.sleep(2000);//main线程休眠2秒
thread2.start();
}
static class Thread1 extends Thread {
@Override
public void run() {
synchronized (object) {
System.out.println("线程" + Thread.currentThread().getName()
+ "获取到了锁...");
try {
System.out.println("线程" + Thread.currentThread().getName()
+ "阻塞并释放锁...");
object.wait();
} catch (InterruptedException e) {
}
System.out.println("线程" + Thread.currentThread().getName()
+ "执行完成...");
}
}
}
static class Thread2 extends Thread {
@Override
public void run() {
synchronized (object) {
System.out.println("线程" + Thread.currentThread().getName()
+ "获取到了锁...");
object.notify();
System.out.println("线程" + Thread.currentThread().getName()
+ "唤醒了正在wait的线程...");
}
System.out
.println("线程" + Thread.currentThread().getName() + "执行完成...");
}
}
}
/* Output: 线程Thread-0获取到了锁... 线程Thread-0阻塞并释放锁... 线程Thread-1获取到了锁... 线程Thread-1唤醒了正在wait的线程... 线程Thread-1执行完成... 线程Thread-0执行完成... *///:~
3、多个同类型线程的场景(wait 的条件发生变化)
//资源类
class ValueObject {
public static List<String> list = new ArrayList<String>();
}
//元素添加线程
class ThreadAdd extends Thread {
private String lock;
public ThreadAdd(String lock,String name) {
super(name);
this.lock = lock;
}
@Override
public void run() {
synchronized (lock) {
ValueObject.list.add("anyString");
lock.notifyAll(); // 唤醒所有 wait 线程
}
}
}
//元素删除线程
class ThreadSubtract extends Thread {
private String lock;
public ThreadSubtract(String lock,String name) {
super(name);
this.lock = lock;
}
@Override
public void run() {
try {
synchronized (lock) {
if (ValueObject.list.size() == 0) {
System.out.println("wait begin ThreadName=" + Thread.currentThread().getName());
lock.wait();
System.out.println("wait end ThreadName=" + Thread.currentThread().getName());
}
ValueObject.list.remove(0);
System.out.println("list size=" + ValueObject.list.size());
}
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
//测试类
public class Run {
public static void main(String[] args) throws InterruptedException {
//锁对象
String lock = new String("");
ThreadSubtract subtract1Thread = new ThreadSubtract(lock,"subtract1Thread");
subtract1Thread.start();
ThreadSubtract subtract2Thread = new ThreadSubtract(lock,"subtract2Thread");
subtract2Thread.start();
Thread.sleep(1000);
ThreadAdd addThread = new ThreadAdd(lock,"addThread");
addThread.start();
}
}/* Output: wait begin ThreadName=subtract1Thread wait begin ThreadName=subtract2Thread wait end ThreadName=subtract2Thread list size=0 wait end ThreadName=subtract1Thread Exception in thread "subtract1Thread" java.lang.IndexOutOfBoundsException: Index: 0, Size: 0 at java.util.ArrayList.rangeCheck(Unknown Source) at java.util.ArrayList.remove(Unknown Source) at test.ThreadSubtract.run(Run.java:49) *///:~
当 线程subtract1Thread 被唤醒后,将从 wait 处继续执行。但由于 线程subtract2Thread 先获取到锁得到运行,导致 线程subtract1Thread 的 wait 的条件发生变化(不再满足),而 线程subtract1Thread 却毫无所知,导致异常产生。
像这种有多个相同类型的线程场景,为防止 wait 的条件发生变化而导致的线程异常终止,我们在阻塞线程被唤醒的同时还必须对 wait 的条件进行额外的检查,如下所示:
//元素删除线程
class ThreadSubtract extends Thread {
private String lock;
public ThreadSubtract(String lock,String name) {
super(name);
this.lock = lock;
}
@Override
public void run() {
try {
synchronized (lock) {
while (ValueObject.list.size() == 0) { //将 if 改成 while
System.out.println("wait begin ThreadName=" + Thread.currentThread().getName());
lock.wait();
System.out.println("wait end ThreadName=" + Thread.currentThread().getName());
}
ValueObject.list.remove(0);
System.out.println("list size=" + ValueObject.list.size());
}
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}/* Output: wait begin ThreadName=subtract1Thread wait begin ThreadName=subtract2Thread wait end ThreadName=subtract2Thread list size=0 wait end ThreadName=subtract1Thread wait begin ThreadName=subtract1Thread *///:~
只需将 线程类ThreadSubtract 的 run()方法中的 if 条件 改为 while 条件 即可。
四、 Condition
Condition 是在java 1.5中出现的,它用来替代传统的 Object 的wait()/notify()实现线程间的协作,它的使用依赖于 Lock。Condition、Lock 和 Thread 三者之间的关系如下图所示。相比使用Object的wait()/notify(),使用Condition的await()/signal()这种方式能够更加安全和高效地实现线程间协作。Condition是个接口,基本的方法就是await()和signal()方法。Condition依赖于Lock接口,生成一个Condition的基本代码是lock.newCondition() 。 必须要注意的是,Condition 的 await()/signal() 使用都必须在lock保护之内,也就是说,必须在lock.lock()和lock.unlock之间才可以使用。事实上,Conditon的await()/signal() 与 Object的wait()/notify() 有着天然的对应关系:
Conditon中的await()对应Object的wait();
Condition中的signal()对应Object的notify();
Condition中的signalAll()对应Object的notifyAll()。
使用Condition往往比使用传统的通知等待机制(Object的wait()/notify())要更灵活、高效,例如,我们可以使用多个Condition实现通知部分线程:
// 线程 A
class ThreadA extends Thread {
private MyService service;
public ThreadA(MyService service) {
super();
this.service = service;
}
@Override
public void run() {
service.awaitA();
}
}
// 线程 B
class ThreadB extends Thread {
private MyService service;
public ThreadB(MyService service) {
super();
this.service = service;
}
@Override
public void run() {
service.awaitB();
}
}
class MyService {
private Lock lock = new ReentrantLock();
// 使用多个Condition实现通知部分线程
public Condition conditionA = lock.newCondition();
public Condition conditionB = lock.newCondition();
public void awaitA() {
lock.lock();
try {
System.out.println("begin awaitA时间为" + System.currentTimeMillis()
+ " ThreadName=" + Thread.currentThread().getName());
conditionA.await();
System.out.println(" end awaitA时间为" + System.currentTimeMillis()
+ " ThreadName=" + Thread.currentThread().getName());
} catch (InterruptedException e) {
e.printStackTrace();
} finally {
lock.unlock();
}
}
public void awaitB() {
lock.lock();
try {
System.out.println("begin awaitB时间为" + System.currentTimeMillis()
+ " ThreadName=" + Thread.currentThread().getName());
conditionB.await();
System.out.println(" end awaitB时间为" + System.currentTimeMillis()
+ " ThreadName=" + Thread.currentThread().getName());
} catch (InterruptedException e) {
e.printStackTrace();
} finally {
lock.unlock();
}
}
public void signalAll_A() {
try {
lock.lock();
System.out.println(" signalAll_A时间为" + System.currentTimeMillis()
+ " ThreadName=" + Thread.currentThread().getName());
conditionA.signalAll();
} finally {
lock.unlock();
}
}
public void signalAll_B() {
try {
lock.lock();
System.out.println(" signalAll_B时间为" + System.currentTimeMillis()
+ " ThreadName=" + Thread.currentThread().getName());
conditionB.signalAll();
} finally {
lock.unlock();
}
}
}
// 测试
public class Run {
public static void main(String[] args) throws InterruptedException {
MyService service = new MyService();
ThreadA a = new ThreadA(service);
a.setName("A");
a.start();
ThreadB b = new ThreadB(service);
b.setName("B");
b.start();
Thread.sleep(3000);
service.signalAll_A();
}
}
输出结果如下图所示,我们可以看到只有线程A被唤醒,线程B仍然阻塞。
实际上,Condition 实现了一种分组机制,将所有对临界资源进行访问的线程进行分组,以便实现线程间更精细化的协作,例如通知部分线程。我们可以从上面例子的输出结果看出,只有conditionA范围内的线程A被唤醒,而conditionB范围内的线程B仍然阻塞。
六、 生产者-消费者模型
等待/通知机制 最经典的应用就是 生产者-消费者模型。下面以多生产者-多消费者问题为背景,分别运用两种模式 synchronized+wait-notify 模式和 Lock+Condition 模式实现 wait-notify 机制。
传统实现方式
//资源类
class MyStack {
// 共享队列
private List list = new ArrayList();
// 生产
public synchronized void push() {
try {
while (list.size() == 1) { // 多个生产者
System.out.println("队列已满,线程 "
+ Thread.currentThread().getName() + " 呈wait状态...");
this.wait();
}
list.add("anyString=" + Math.random());
System.out.println("线程 " + Thread.currentThread().getName()
+ " 生产了,队列已满...");
this.notifyAll(); // 防止生产者仅通知生产者
} catch (InterruptedException e) {
e.printStackTrace();
}
}
// 消费
public synchronized String pop() {
String returnValue = "";
try {
while (list.size() == 0) { // 多个消费者
System.out.println("队列已空,线程 "
+ Thread.currentThread().getName() + " 呈wait状态...");
this.wait();
}
returnValue = "" + list.get(0);
list.remove(0);
System.out.println("线程 " + Thread.currentThread().getName()
+ " 消费了,队列已空...");
this.notifyAll(); // 防止消费者仅通知消费者
} catch (InterruptedException e) {
e.printStackTrace();
}
return returnValue;
}
}
//生产者
class P_Thread extends Thread {
private MyStack myStack;
public P_Thread(MyStack myStack,String name) {
super(name);
this.myStack = myStack;
}
public void pushService() {
myStack.push();
}
@Override
public void run() {
while (true) {
myStack.push();
}
}
}
//消费者
class C_Thread extends Thread {
private MyStack myStack;
public C_Thread(MyStack myStack,String name) {
super(name);
this.myStack = myStack;
}
@Override
public void run() {
while (true) {
myStack.pop();
}
}
}
//测试类
public class Run {
public static void main(String[] args) throws InterruptedException {
MyStack myStack = new MyStack();
P_Thread pThread1 = new P_Thread(myStack, "P1");
P_Thread pThread2 = new P_Thread(myStack, "P2");
P_Thread pThread3 = new P_Thread(myStack, "P3");
P_Thread pThread4 = new P_Thread(myStack, "P4");
P_Thread pThread5 = new P_Thread(myStack, "P5");
P_Thread pThread6 = new P_Thread(myStack, "P6");
pThread1.start();
pThread2.start();
pThread3.start();
pThread4.start();
pThread5.start();
pThread6.start();
C_Thread cThread1 = new C_Thread(myStack, "C1");
C_Thread cThread2 = new C_Thread(myStack, "C2");
C_Thread cThread3 = new C_Thread(myStack, "C3");
C_Thread cThread4 = new C_Thread(myStack, "C4");
C_Thread cThread5 = new C_Thread(myStack, "C5");
C_Thread cThread6 = new C_Thread(myStack, "C6");
C_Thread cThread7 = new C_Thread(myStack, "C7");
C_Thread cThread8 = new C_Thread(myStack, "C8");
cThread1.start();
cThread2.start();
cThread3.start();
cThread4.start();
cThread5.start();
cThread6.start();
cThread7.start();
cThread8.start();
}
}/* Output: 线程 P1 生产了,队列已满... 队列已满,线程 P1 呈wait状态... 线程 C5 消费了,队列已空... 队列已空,线程 C5 呈wait状态... 队列已空,线程 C8 呈wait状态... 队列已空,线程 C2 呈wait状态... 队列已空,线程 C7 呈wait状态... 队列已空,线程 C4 呈wait状态... 队列已空,线程 C6 呈wait状态... 队列已空,线程 C3 呈wait状态... 队列已空,线程 C1 呈wait状态... 线程 P6 生产了,队列已满... 队列已满,线程 P6 呈wait状态... 队列已满,线程 P5 呈wait状态... 队列已满,线程 P4 呈wait状态... ... *///:~
对于生产者-消费者问题,有两个要点需要注意:
在多个同类型线程(多个生产者线程或者消费者线程)的场景中,为防止wait的条件发生变化而导致线程异常终止,我们在阻塞线程被唤醒的同时还必须对wait的条件进行额外的检查,即 使用 while 循环代替 if 条件;
在多个同类型线程(多个生产者线程或者消费者线程)的场景中,为防止生产者(消费者)唤醒生产者(消费者),保证生产者和消费者互相唤醒,需要 使用 notify 替代 notifyAll.
使用 Condition 实现方式
// 线程A
class MyThreadA extends Thread {
private MyService myService;
public MyThreadA(MyService myService, String name) {
super(name);
this.myService = myService;
}
@Override
public void run() {
while (true)
myService.set();
}
}
// 线程B
class MyThreadB extends Thread {
private MyService myService;
public MyThreadB(MyService myService, String name) {
super(name);
this.myService = myService;
}
@Override
public void run() {
while (true)
myService.get();
}
}
// 资源类
class MyService {
private ReentrantLock lock = new ReentrantLock();
private Condition conditionA = lock.newCondition(); // 生产线程
private Condition conditionB = lock.newCondition(); // 消费线程
private boolean hasValue = false;
public void set() {
try {
lock.lock();
while (hasValue == true) {
System.out.println("[生产线程] " + " 线程"
+ Thread.currentThread().getName() + " await...");
conditionA.await();
}
System.out.println("[生产中] " + " 线程" + Thread.currentThread().getName() + " 生产★");
Thread.sleep(1000);
hasValue = true;
System.out.println("线程" + Thread.currentThread().getName() + " 生产完毕...");
System.out.println("[唤醒所有消费线程] " + " 线程"
+ Thread.currentThread().getName() + "...");
conditionB.signalAll();
} catch (InterruptedException e) {
e.printStackTrace();
} finally {
lock.unlock();
}
}
public void get() {
try {
lock.lock();
while (hasValue == false) {
System.out.println("[消费线程] " + " 线程"
+ Thread.currentThread().getName() + " await...");
conditionB.await();
}
System.out.println("[消费中] " + " 线程"
+ Thread.currentThread().getName() + " 消费☆");
Thread.sleep(1000);
System.out.println("线程" + Thread.currentThread().getName() + " 消费完毕...");
hasValue = false;
System.out.println("[唤醒所有生产线程] " + " 线程"
+ Thread.currentThread().getName() + "...");
conditionA.signalAll();
} catch (InterruptedException e) {
e.printStackTrace();
} finally {
lock.unlock();
}
}
}
public class Run {
public static void main(String[] args) throws InterruptedException {
MyService service = new MyService();
MyThreadA[] threadA = new MyThreadA[10];
MyThreadB[] threadB = new MyThreadB[10];
for (int i = 0; i < 10; i++) {
threadA[i] = new MyThreadA(service, "ThreadA-" + i);
threadB[i] = new MyThreadB(service, "ThreadB-" + i);
threadA[i].start();
threadB[i].start();
}
}
}/* Output: [生产中] 线程ThreadA-0 生产★ 线程ThreadA-0 生产完毕... [唤醒所有消费线程] 线程ThreadA-0... [生产线程] 线程ThreadA-0 await... [消费中] 线程ThreadB-0 消费☆ 线程ThreadB-0 消费完毕... [唤醒所有生产线程] 线程ThreadB-0... [消费线程] 线程ThreadB-0 await... [生产中] 线程ThreadA-1 生产★ 线程ThreadA-1 生产完毕... [唤醒所有消费线程] 线程ThreadA-1... [生产线程] 线程ThreadA-1 await... [消费中] 线程ThreadB-1 消费☆ 线程ThreadB-1 消费完毕... [唤醒所有生产线程] 线程ThreadB-1... [消费线程] 线程ThreadB-1 await... [生产中] 线程ThreadA-2 生产★ 线程ThreadA-2 生产完毕... [唤醒所有消费线程] 线程ThreadA-2... ... *///:~
还没有评论,来说两句吧...