线程间通信

墨蓝 2022-04-11 13:56 397阅读 0赞

一、引言

线程与线程之间不是相互独立的存在,它们彼此之间需要相互通信和协作。最典型的例子就是生产者-消费者问题。下面首先介绍 wait/notify 机制,并对实现该机制的两种方式:synchronized+wait-notify模式和 Lock+Condition 模式进行介绍,作为线程间通信与协作的基础。最后,对 Thread 类中的 join() 方法进行源码分析,并以宿主线程与寄生线程的协作为例进行说明。

二、使用 wait/notify/notifyAll 实现线程间通信的几点重要说明

1、Object是所有类的父类,它有5个方法组成了等待/通知机制的核心:notify()、notifyAll()、wait()、wait(long)和wait(long,int)。在Java中,所有的类都从Object继承而来,因此,所有的类都拥有这些共有方法可供使用。而且,由于他们都被声明为final,因此在子类中不能覆写任何一个方法。

2、在Java中,可以通过配合调用Object对象的 wait() 方法和 notify() 方法或 notifyAll() 方法来实现线程间的通信。上述三个方法均非Thread类中所声明的方法,而是Object类中声明的方法。原因是每个对象都拥有monitor(锁),所以让当前线程等待某个对象的锁,当然应该通过这个对象来操作,而不是用当前线程来操作,因为当前线程可能会等待多个线程的锁,如果通过线程来操作,就非常复杂了。

3、在线程中调用 wait() 方法,将阻塞等待其他线程的通知(其他线程调用 notify() 方法或 notifyAll() 方法),在线程中调用 notify() 方法或 notifyAll() 方法,将通知其他线程从 wait() 方法处返回。

4、wait()、notify()、notifyAll() 使用条件:
  1. 必须放在synchronized代码块中;
  2. 必须要锁住同一个对象,notify()方法才能唤醒等待的线程;
  3. notify()只会随机的唤醒一个处于wait状态的线程;
  4. notifyall()会全部唤醒所有处于wait线程,争夺到时间片的只有一个;
5、wait()方法

该方法用来将当前线程置入休眠状态,直到接到通知或被中断为止。在调用 wait() 之前,线程必须要获得该对象的对象级别锁,即只能在同步方法或同步块中调用 wait() 方法。 进入 wait() 方法后,当前线程释放锁。在从 wait() 返回前,线程与其他线程竞争重新获得锁。如果调用 wait() 时,没有持有适当的锁,则抛出IllegalMonitorStateException,它是RuntimeException的一个子类。

6、notify()

该方法也要在同步方法或同步块中调用,即在调用前,线程也必须要获得该对象的对象级别锁,如果调用 notify() 时没有持有适当的锁,也会抛出IllegalMonitorStateException。

该方法用来通知那些可能等待该对象的对象锁的其他线程。如果有多个线程等待,则线程规划器会随机挑选出其中一个 wait() 状态的线程来发出通知,并使它等待获取该对象的对象锁(notify后,当前线程不会马上释放该对象锁,wait所在的线程并不能马上获取该对象锁,要等到程序退出synchronized代码块后,当前线程才会释放锁,wait所在的线程也才可以获取该对象锁),但不惊动其他同样在等待被该对象notify的线程们。当第一个获得了该对象锁的 wait 线程运行完毕以后,它会释放掉该对象锁,此时如果该对象没有再次使用notify语句,则即便该对象已经空闲,其他wait状态等待的线程由于没有得到该对象的通知,会继续阻塞在wait状态,直到这个对象发出一个notify或notifyAll。这里需要注意:它们等待的是被notify或notifyAll,而不是锁。这与下面的notifyAll()方法执行后的情况不同。

7、notifyAll()

该方法与 notify() 方法的工作方式相同,重要的一点差异是:

notifyAll() 使所有原来在该对象上 wait 的线程统统退出 wait 的状态(即全部被唤醒,不再等待notify或notifyAll,但由于此时还没有获取到该对象锁,因此还不能继续往下执行),变成等待获取该对象上的锁, 一旦该对象锁被释放(notifyAll线程退出调用了notifyAll的synchronized代码块的时候),它们就会去竞争。如果其中一个线程获得了该对象锁,它就会继续往下执行,在它退出synchronized代码块,释放锁后,其他的已经被唤醒的线程将会继续竞争获取该锁,一直进行下去,直到所有被唤醒的线程都执行完毕。

8、wait(long) 和 wait(long,int)

这两个方法是设置等待超时时间的,后者在超值时间上加上ns,精度也难以达到,因此,该方法很少使用。对于前者,如果在等待的线程接到通知或被中断之前,已经超过了指定的毫秒数,则它通过竞争重新获得锁,并从 wait(long) 返回。另外,需要知道,如果设置了超时时间,当 wait() 返回时,我们不能确定它是因为接到了通知还是因为超时而返回的,因为 wait() 方法不会返回任何相关的信息。但一般可以通过设置标志位来判断,在notify之前改变标志位的值,在 wait() 方法后读取该标志位的值来判断,当然为了保证notify不被遗漏,我们还需要另外一个标志位来循环判断是否调用 wait() 方法。

9、小结
  1. a. wait()、notify() notifyAll()方法是 本地方法,并且为 final 方法,无法被重写;
  2. b. 调用某个对象的 wait() 方法能让 当前线程阻塞,并且当前线程必须拥有此对象的monitor(即锁);
  3. c. 调用某个对象的 notify() 方法能够唤醒 一个正在等待这个对象的monitor的线程,如果有多个线程都
  4. 在等待这个对象的monitor,则只能唤醒其中一个线程;
  5. d. 调用notifyAll()方法能够唤醒所有正在等待这个对象的monitor的线程。

三、生产者-消费者模型

线程与线程之间相互通信和协作,最经典的例子就是 生产者-消费者模型:
  
在下面的例子中,虽然两个线程实现了通信,但是凭借 线程B 不断地通过 while语句轮询 来检测某一个条件,这样会导致CPU的浪费。因此,需要一种机制来减少 CPU资源 的浪费,而且还能实现多个线程之间的通信,即 wait/notify 机制 。

  1. //资源类
  2. class MyList {
  3. //临界资源
  4. private volatile List<String> list = new ArrayList<String>();
  5. public void add() {
  6. list.add("abc");
  7. }
  8. public int size() {
  9. return list.size();
  10. }
  11. }
  12. // 线程A
  13. class ThreadA extends Thread {
  14. private MyList list;//临界资源
  15. public ThreadA(MyList list,String name) {
  16. super(name);
  17. this.list = list;
  18. }
  19. @Override
  20. public void run() {
  21. try {
  22. for (int i = 0; i < 3; i++) {
  23. list.add();
  24. System.out.println("添加了" + (i + 1) + "个元素");
  25. Thread.sleep(1000);
  26. }
  27. } catch (InterruptedException e) {
  28. e.printStackTrace();
  29. }
  30. }
  31. }
  32. //线程B
  33. class ThreadB extends Thread {
  34. private MyList list;//临界资源
  35. public ThreadB(MyList list,String name) {
  36. super(name);
  37. this.list = list;
  38. }
  39. @Override
  40. public void run() {
  41. try {
  42. while (true) { // while 语句轮询
  43. if (list.size() == 2) {
  44. System.out.println("==2了,线程b要退出了!");
  45. throw new InterruptedException();
  46. }
  47. }
  48. } catch (InterruptedException e) {
  49. e.printStackTrace();
  50. }
  51. }
  52. }
  53. //测试
  54. public class Test {
  55. public static void main(String[] args) {
  56. MyList service = new MyList();
  57. ThreadA a = new ThreadA(service,"A");
  58. ThreadB b = new ThreadB(service,"B");
  59. a.start();
  60. b.start();
  61. }
  62. }
  63. /* Output(输出结果不唯一): 添加了1个元素 添加了2个元素 ==2了,线程b要退出了! java.lang.InterruptedException at test.ThreadB.run(Test.java:57) 添加了3个元素 *///:~

四、 wait/notify 机制

1、方法调用与线程状态关系

每个锁对象都有两个队列,一个是就绪队列,一个是阻塞队列。就绪队列存储了已就绪(将要竞争锁)的线程,阻塞队列存储了被阻塞的线程。当一个阻塞线程被唤醒后,才会进入就绪队列,进而等待CPU的调度;反之,当一个线程被 wait 后,就会进入阻塞队列,等待被唤醒。
在这里插入图片描述

2、使用举例

  1. public class Test {
  2. public static Object object = new Object();
  3. public static void main(String[] args) throws InterruptedException {
  4. Thread1 thread1 = new Thread1();
  5. Thread2 thread2 = new Thread2();
  6. thread1.start();
  7. Thread.sleep(2000);//main线程休眠2秒
  8. thread2.start();
  9. }
  10. static class Thread1 extends Thread {
  11. @Override
  12. public void run() {
  13. synchronized (object) {
  14. System.out.println("线程" + Thread.currentThread().getName()
  15. + "获取到了锁...");
  16. try {
  17. System.out.println("线程" + Thread.currentThread().getName()
  18. + "阻塞并释放锁...");
  19. object.wait();
  20. } catch (InterruptedException e) {
  21. }
  22. System.out.println("线程" + Thread.currentThread().getName()
  23. + "执行完成...");
  24. }
  25. }
  26. }
  27. static class Thread2 extends Thread {
  28. @Override
  29. public void run() {
  30. synchronized (object) {
  31. System.out.println("线程" + Thread.currentThread().getName()
  32. + "获取到了锁...");
  33. object.notify();
  34. System.out.println("线程" + Thread.currentThread().getName()
  35. + "唤醒了正在wait的线程...");
  36. }
  37. System.out
  38. .println("线程" + Thread.currentThread().getName() + "执行完成...");
  39. }
  40. }
  41. }
  42. /* Output: 线程Thread-0获取到了锁... 线程Thread-0阻塞并释放锁... 线程Thread-1获取到了锁... 线程Thread-1唤醒了正在wait的线程... 线程Thread-1执行完成... 线程Thread-0执行完成... *///:~

3、多个同类型线程的场景(wait 的条件发生变化)

  1. //资源类
  2. class ValueObject {
  3. public static List<String> list = new ArrayList<String>();
  4. }
  5. //元素添加线程
  6. class ThreadAdd extends Thread {
  7. private String lock;
  8. public ThreadAdd(String lock,String name) {
  9. super(name);
  10. this.lock = lock;
  11. }
  12. @Override
  13. public void run() {
  14. synchronized (lock) {
  15. ValueObject.list.add("anyString");
  16. lock.notifyAll(); // 唤醒所有 wait 线程
  17. }
  18. }
  19. }
  20. //元素删除线程
  21. class ThreadSubtract extends Thread {
  22. private String lock;
  23. public ThreadSubtract(String lock,String name) {
  24. super(name);
  25. this.lock = lock;
  26. }
  27. @Override
  28. public void run() {
  29. try {
  30. synchronized (lock) {
  31. if (ValueObject.list.size() == 0) {
  32. System.out.println("wait begin ThreadName=" + Thread.currentThread().getName());
  33. lock.wait();
  34. System.out.println("wait end ThreadName=" + Thread.currentThread().getName());
  35. }
  36. ValueObject.list.remove(0);
  37. System.out.println("list size=" + ValueObject.list.size());
  38. }
  39. } catch (InterruptedException e) {
  40. e.printStackTrace();
  41. }
  42. }
  43. }
  44. //测试类
  45. public class Run {
  46. public static void main(String[] args) throws InterruptedException {
  47. //锁对象
  48. String lock = new String("");
  49. ThreadSubtract subtract1Thread = new ThreadSubtract(lock,"subtract1Thread");
  50. subtract1Thread.start();
  51. ThreadSubtract subtract2Thread = new ThreadSubtract(lock,"subtract2Thread");
  52. subtract2Thread.start();
  53. Thread.sleep(1000);
  54. ThreadAdd addThread = new ThreadAdd(lock,"addThread");
  55. addThread.start();
  56. }
  57. }/* Output: wait begin ThreadName=subtract1Thread wait begin ThreadName=subtract2Thread wait end ThreadName=subtract2Thread list size=0 wait end ThreadName=subtract1Thread Exception in thread "subtract1Thread" java.lang.IndexOutOfBoundsException: Index: 0, Size: 0 at java.util.ArrayList.rangeCheck(Unknown Source) at java.util.ArrayList.remove(Unknown Source) at test.ThreadSubtract.run(Run.java:49) *///:~

当 线程subtract1Thread 被唤醒后,将从 wait 处继续执行。但由于 线程subtract2Thread 先获取到锁得到运行,导致 线程subtract1Thread 的 wait 的条件发生变化(不再满足),而 线程subtract1Thread 却毫无所知,导致异常产生。
像这种有多个相同类型的线程场景,为防止 wait 的条件发生变化而导致的线程异常终止,我们在阻塞线程被唤醒的同时还必须对 wait 的条件进行额外的检查,如下所示:

  1. //元素删除线程
  2. class ThreadSubtract extends Thread {
  3. private String lock;
  4. public ThreadSubtract(String lock,String name) {
  5. super(name);
  6. this.lock = lock;
  7. }
  8. @Override
  9. public void run() {
  10. try {
  11. synchronized (lock) {
  12. while (ValueObject.list.size() == 0) { //将 if 改成 while
  13. System.out.println("wait begin ThreadName=" + Thread.currentThread().getName());
  14. lock.wait();
  15. System.out.println("wait end ThreadName=" + Thread.currentThread().getName());
  16. }
  17. ValueObject.list.remove(0);
  18. System.out.println("list size=" + ValueObject.list.size());
  19. }
  20. } catch (InterruptedException e) {
  21. e.printStackTrace();
  22. }
  23. }
  24. }/* Output: wait begin ThreadName=subtract1Thread wait begin ThreadName=subtract2Thread wait end ThreadName=subtract2Thread list size=0 wait end ThreadName=subtract1Thread wait begin ThreadName=subtract1Thread *///:~

只需将 线程类ThreadSubtract 的 run()方法中的 if 条件 改为 while 条件 即可。

四、 Condition

Condition 是在java 1.5中出现的,它用来替代传统的 Object 的wait()/notify()实现线程间的协作,它的使用依赖于 Lock。Condition、Lock 和 Thread 三者之间的关系如下图所示。相比使用Object的wait()/notify(),使用Condition的await()/signal()这种方式能够更加安全和高效地实现线程间协作。Condition是个接口,基本的方法就是await()和signal()方法。Condition依赖于Lock接口,生成一个Condition的基本代码是lock.newCondition() 。 必须要注意的是,Condition 的 await()/signal() 使用都必须在lock保护之内,也就是说,必须在lock.lock()和lock.unlock之间才可以使用。事实上,Conditon的await()/signal() 与 Object的wait()/notify() 有着天然的对应关系:

Conditon中的await()对应Object的wait();
Condition中的signal()对应Object的notify();
Condition中的signalAll()对应Object的notifyAll()。
在这里插入图片描述

使用Condition往往比使用传统的通知等待机制(Object的wait()/notify())要更灵活、高效,例如,我们可以使用多个Condition实现通知部分线程:

  1. // 线程 A
  2. class ThreadA extends Thread {
  3. private MyService service;
  4. public ThreadA(MyService service) {
  5. super();
  6. this.service = service;
  7. }
  8. @Override
  9. public void run() {
  10. service.awaitA();
  11. }
  12. }
  13. // 线程 B
  14. class ThreadB extends Thread {
  15. private MyService service;
  16. public ThreadB(MyService service) {
  17. super();
  18. this.service = service;
  19. }
  20. @Override
  21. public void run() {
  22. service.awaitB();
  23. }
  24. }
  25. class MyService {
  26. private Lock lock = new ReentrantLock();
  27. // 使用多个Condition实现通知部分线程
  28. public Condition conditionA = lock.newCondition();
  29. public Condition conditionB = lock.newCondition();
  30. public void awaitA() {
  31. lock.lock();
  32. try {
  33. System.out.println("begin awaitA时间为" + System.currentTimeMillis()
  34. + " ThreadName=" + Thread.currentThread().getName());
  35. conditionA.await();
  36. System.out.println(" end awaitA时间为" + System.currentTimeMillis()
  37. + " ThreadName=" + Thread.currentThread().getName());
  38. } catch (InterruptedException e) {
  39. e.printStackTrace();
  40. } finally {
  41. lock.unlock();
  42. }
  43. }
  44. public void awaitB() {
  45. lock.lock();
  46. try {
  47. System.out.println("begin awaitB时间为" + System.currentTimeMillis()
  48. + " ThreadName=" + Thread.currentThread().getName());
  49. conditionB.await();
  50. System.out.println(" end awaitB时间为" + System.currentTimeMillis()
  51. + " ThreadName=" + Thread.currentThread().getName());
  52. } catch (InterruptedException e) {
  53. e.printStackTrace();
  54. } finally {
  55. lock.unlock();
  56. }
  57. }
  58. public void signalAll_A() {
  59. try {
  60. lock.lock();
  61. System.out.println(" signalAll_A时间为" + System.currentTimeMillis()
  62. + " ThreadName=" + Thread.currentThread().getName());
  63. conditionA.signalAll();
  64. } finally {
  65. lock.unlock();
  66. }
  67. }
  68. public void signalAll_B() {
  69. try {
  70. lock.lock();
  71. System.out.println(" signalAll_B时间为" + System.currentTimeMillis()
  72. + " ThreadName=" + Thread.currentThread().getName());
  73. conditionB.signalAll();
  74. } finally {
  75. lock.unlock();
  76. }
  77. }
  78. }
  79. // 测试
  80. public class Run {
  81. public static void main(String[] args) throws InterruptedException {
  82. MyService service = new MyService();
  83. ThreadA a = new ThreadA(service);
  84. a.setName("A");
  85. a.start();
  86. ThreadB b = new ThreadB(service);
  87. b.setName("B");
  88. b.start();
  89. Thread.sleep(3000);
  90. service.signalAll_A();
  91. }
  92. }

输出结果如下图所示,我们可以看到只有线程A被唤醒,线程B仍然阻塞。
在这里插入图片描述
实际上,Condition 实现了一种分组机制,将所有对临界资源进行访问的线程进行分组,以便实现线程间更精细化的协作,例如通知部分线程。我们可以从上面例子的输出结果看出,只有conditionA范围内的线程A被唤醒,而conditionB范围内的线程B仍然阻塞。

六、 生产者-消费者模型

等待/通知机制 最经典的应用就是 生产者-消费者模型。下面以多生产者-多消费者问题为背景,分别运用两种模式 synchronized+wait-notify 模式和 Lock+Condition 模式实现 wait-notify 机制。

传统实现方式
  1. //资源类
  2. class MyStack {
  3. // 共享队列
  4. private List list = new ArrayList();
  5. // 生产
  6. public synchronized void push() {
  7. try {
  8. while (list.size() == 1) { // 多个生产者
  9. System.out.println("队列已满,线程 "
  10. + Thread.currentThread().getName() + " 呈wait状态...");
  11. this.wait();
  12. }
  13. list.add("anyString=" + Math.random());
  14. System.out.println("线程 " + Thread.currentThread().getName()
  15. + " 生产了,队列已满...");
  16. this.notifyAll(); // 防止生产者仅通知生产者
  17. } catch (InterruptedException e) {
  18. e.printStackTrace();
  19. }
  20. }
  21. // 消费
  22. public synchronized String pop() {
  23. String returnValue = "";
  24. try {
  25. while (list.size() == 0) { // 多个消费者
  26. System.out.println("队列已空,线程 "
  27. + Thread.currentThread().getName() + " 呈wait状态...");
  28. this.wait();
  29. }
  30. returnValue = "" + list.get(0);
  31. list.remove(0);
  32. System.out.println("线程 " + Thread.currentThread().getName()
  33. + " 消费了,队列已空...");
  34. this.notifyAll(); // 防止消费者仅通知消费者
  35. } catch (InterruptedException e) {
  36. e.printStackTrace();
  37. }
  38. return returnValue;
  39. }
  40. }
  41. //生产者
  42. class P_Thread extends Thread {
  43. private MyStack myStack;
  44. public P_Thread(MyStack myStack,String name) {
  45. super(name);
  46. this.myStack = myStack;
  47. }
  48. public void pushService() {
  49. myStack.push();
  50. }
  51. @Override
  52. public void run() {
  53. while (true) {
  54. myStack.push();
  55. }
  56. }
  57. }
  58. //消费者
  59. class C_Thread extends Thread {
  60. private MyStack myStack;
  61. public C_Thread(MyStack myStack,String name) {
  62. super(name);
  63. this.myStack = myStack;
  64. }
  65. @Override
  66. public void run() {
  67. while (true) {
  68. myStack.pop();
  69. }
  70. }
  71. }
  72. //测试类
  73. public class Run {
  74. public static void main(String[] args) throws InterruptedException {
  75. MyStack myStack = new MyStack();
  76. P_Thread pThread1 = new P_Thread(myStack, "P1");
  77. P_Thread pThread2 = new P_Thread(myStack, "P2");
  78. P_Thread pThread3 = new P_Thread(myStack, "P3");
  79. P_Thread pThread4 = new P_Thread(myStack, "P4");
  80. P_Thread pThread5 = new P_Thread(myStack, "P5");
  81. P_Thread pThread6 = new P_Thread(myStack, "P6");
  82. pThread1.start();
  83. pThread2.start();
  84. pThread3.start();
  85. pThread4.start();
  86. pThread5.start();
  87. pThread6.start();
  88. C_Thread cThread1 = new C_Thread(myStack, "C1");
  89. C_Thread cThread2 = new C_Thread(myStack, "C2");
  90. C_Thread cThread3 = new C_Thread(myStack, "C3");
  91. C_Thread cThread4 = new C_Thread(myStack, "C4");
  92. C_Thread cThread5 = new C_Thread(myStack, "C5");
  93. C_Thread cThread6 = new C_Thread(myStack, "C6");
  94. C_Thread cThread7 = new C_Thread(myStack, "C7");
  95. C_Thread cThread8 = new C_Thread(myStack, "C8");
  96. cThread1.start();
  97. cThread2.start();
  98. cThread3.start();
  99. cThread4.start();
  100. cThread5.start();
  101. cThread6.start();
  102. cThread7.start();
  103. cThread8.start();
  104. }
  105. }/* Output: 线程 P1 生产了,队列已满... 队列已满,线程 P1 呈wait状态... 线程 C5 消费了,队列已空... 队列已空,线程 C5 呈wait状态... 队列已空,线程 C8 呈wait状态... 队列已空,线程 C2 呈wait状态... 队列已空,线程 C7 呈wait状态... 队列已空,线程 C4 呈wait状态... 队列已空,线程 C6 呈wait状态... 队列已空,线程 C3 呈wait状态... 队列已空,线程 C1 呈wait状态... 线程 P6 生产了,队列已满... 队列已满,线程 P6 呈wait状态... 队列已满,线程 P5 呈wait状态... 队列已满,线程 P4 呈wait状态... ... *///:~

对于生产者-消费者问题,有两个要点需要注意:

在多个同类型线程(多个生产者线程或者消费者线程)的场景中,为防止wait的条件发生变化而导致线程异常终止,我们在阻塞线程被唤醒的同时还必须对wait的条件进行额外的检查,即 使用 while 循环代替 if 条件;

在多个同类型线程(多个生产者线程或者消费者线程)的场景中,为防止生产者(消费者)唤醒生产者(消费者),保证生产者和消费者互相唤醒,需要 使用 notify 替代 notifyAll.

使用 Condition 实现方式
  1. // 线程A
  2. class MyThreadA extends Thread {
  3. private MyService myService;
  4. public MyThreadA(MyService myService, String name) {
  5. super(name);
  6. this.myService = myService;
  7. }
  8. @Override
  9. public void run() {
  10. while (true)
  11. myService.set();
  12. }
  13. }
  14. // 线程B
  15. class MyThreadB extends Thread {
  16. private MyService myService;
  17. public MyThreadB(MyService myService, String name) {
  18. super(name);
  19. this.myService = myService;
  20. }
  21. @Override
  22. public void run() {
  23. while (true)
  24. myService.get();
  25. }
  26. }
  27. // 资源类
  28. class MyService {
  29. private ReentrantLock lock = new ReentrantLock();
  30. private Condition conditionA = lock.newCondition(); // 生产线程
  31. private Condition conditionB = lock.newCondition(); // 消费线程
  32. private boolean hasValue = false;
  33. public void set() {
  34. try {
  35. lock.lock();
  36. while (hasValue == true) {
  37. System.out.println("[生产线程] " + " 线程"
  38. + Thread.currentThread().getName() + " await...");
  39. conditionA.await();
  40. }
  41. System.out.println("[生产中] " + " 线程" + Thread.currentThread().getName() + " 生产★");
  42. Thread.sleep(1000);
  43. hasValue = true;
  44. System.out.println("线程" + Thread.currentThread().getName() + " 生产完毕...");
  45. System.out.println("[唤醒所有消费线程] " + " 线程"
  46. + Thread.currentThread().getName() + "...");
  47. conditionB.signalAll();
  48. } catch (InterruptedException e) {
  49. e.printStackTrace();
  50. } finally {
  51. lock.unlock();
  52. }
  53. }
  54. public void get() {
  55. try {
  56. lock.lock();
  57. while (hasValue == false) {
  58. System.out.println("[消费线程] " + " 线程"
  59. + Thread.currentThread().getName() + " await...");
  60. conditionB.await();
  61. }
  62. System.out.println("[消费中] " + " 线程"
  63. + Thread.currentThread().getName() + " 消费☆");
  64. Thread.sleep(1000);
  65. System.out.println("线程" + Thread.currentThread().getName() + " 消费完毕...");
  66. hasValue = false;
  67. System.out.println("[唤醒所有生产线程] " + " 线程"
  68. + Thread.currentThread().getName() + "...");
  69. conditionA.signalAll();
  70. } catch (InterruptedException e) {
  71. e.printStackTrace();
  72. } finally {
  73. lock.unlock();
  74. }
  75. }
  76. }
  77. public class Run {
  78. public static void main(String[] args) throws InterruptedException {
  79. MyService service = new MyService();
  80. MyThreadA[] threadA = new MyThreadA[10];
  81. MyThreadB[] threadB = new MyThreadB[10];
  82. for (int i = 0; i < 10; i++) {
  83. threadA[i] = new MyThreadA(service, "ThreadA-" + i);
  84. threadB[i] = new MyThreadB(service, "ThreadB-" + i);
  85. threadA[i].start();
  86. threadB[i].start();
  87. }
  88. }
  89. }/* Output: [生产中] 线程ThreadA-0 生产★ 线程ThreadA-0 生产完毕... [唤醒所有消费线程] 线程ThreadA-0... [生产线程] 线程ThreadA-0 await... [消费中] 线程ThreadB-0 消费☆ 线程ThreadB-0 消费完毕... [唤醒所有生产线程] 线程ThreadB-0... [消费线程] 线程ThreadB-0 await... [生产中] 线程ThreadA-1 生产★ 线程ThreadA-1 生产完毕... [唤醒所有消费线程] 线程ThreadA-1... [生产线程] 线程ThreadA-1 await... [消费中] 线程ThreadB-1 消费☆ 线程ThreadB-1 消费完毕... [唤醒所有生产线程] 线程ThreadB-1... [消费线程] 线程ThreadB-1 await... [生产中] 线程ThreadA-2 生产★ 线程ThreadA-2 生产完毕... [唤醒所有消费线程] 线程ThreadA-2... ... *///:~

发表评论

表情:
评论列表 (有 0 条评论,397人围观)

还没有评论,来说两句吧...

相关阅读

    相关 线通信ThreadLocal

    ThreadLocal,即线程变量,是一个以ThreadLocal对象为键、任意对象为值的存储结构。这 个结构被附带在线程上,也就是说一个线程可以根据一个ThreadLocal

    相关 线通信

    为什么需要线程通讯 线程是操作系统调度的最小单位,有自己的栈空间,可以按照既定的代码逐步的执行,但是如果每个线程间都孤立的运行,那就会造资源浪费。所以在现实中,我们需要这

    相关 Java线通信

    上述例题无条件的阻塞了其他线程异步访问某个方法。Java对象中隐式管程的应用是很强大的,但是你可以通过进程间通信达到更微妙的境界。这在Java中是尤为简单的。 像前面

    相关 线通信

    线程和线程之间不是独立的个体,它们彼此之间可以互相通信和协作。 线程通信就是在线程之间传递信息,保证他们能够协同工作。在线程间进行通信后,系统之间的交互性会更强大,在大大提高

    相关 线通信

    一、引言 线程与线程之间不是相互独立的存在,它们彼此之间需要相互通信和协作。最典型的例子就是生产者-消费者问题。下面首先介绍 wait/notify 机制,并对实现该机制

    相关 线通信

    注意: 必须在同步方法中使用wait和notify方法,因为执行wait和notify的前提条件是必须持有同步方法(或块)的monitor的所有权,否则将会抛出异常