多线程学习--线程通信

客官°小女子只卖身不卖艺 2024-02-19 14:35 164阅读 0赞

线程彼此通信会涉及到等待/通知,其最经典的案例就是“生产者/消费者模式”。原理都是基于wait/notify的。

  1. 一生产一消费:
  2. //消费者
  3. class Consumer {
  4. private String lock;
  5. public Consumer(String lock) {
  6. super();
  7. this.lock = lock;
  8. }
  9. public void getValue() {
  10. try {
  11. synchronized (lock) {
  12. if (ValueObject.value.equals("")) {
  13. lock.wait();
  14. }
  15. System.out.println("get的值是" + ValueObject.value);
  16. ValueObject.value = "";
  17. lock.notify();
  18. }
  19. } catch (InterruptedException e) {
  20. e.printStackTrace();
  21. }
  22. }
  23. }
  24. //生产者
  25. class Producer {
  26. private String lock;
  27. public Producer(String lock) {
  28. super();
  29. this.lock = lock;
  30. }
  31. public void setValue() {
  32. try {
  33. synchronized (lock) {
  34. if (!ValueObject.value.equals("")) {
  35. lock.wait();
  36. }
  37. String value = System.currentTimeMillis() + "_" + System.nanoTime();
  38. System.out.println("set的值是" + value);
  39. ValueObject.value = value;
  40. lock.notify();
  41. }
  42. } catch (InterruptedException e) {
  43. e.printStackTrace();
  44. }
  45. }
  46. }
  47. public class test {
  48. public static void main(String[] args) {
  49. String lock = new String("");
  50. Producer producer = new Producer(lock);
  51. Consumer consumer = new Consumer(lock);
  52. new Thread(()->{
  53. while (true) {
  54. producer.setValue();
  55. }
  56. },"生产者").start();
  57. new Thread(()->{
  58. while (true) {
  59. consumer.getValue();
  60. }
  61. },"消费者").start();
  62. }
  63. }

结果:

  1. set的值是1544676746771_186857371485669
  2. get的值是1544676746771_186857371485669
  3. set的值是1544676746771_186857371495584
  4. get的值是1544676746771_186857371495584
  5. set的值是1544676746771_186857371515716
  6. get的值是1544676746771_186857371515716
  7. 。。。。。。。。。。。。。。。。。。。。。

假死:notify() 每次只随机唤醒一个等待线程,无法保证被唤醒的是异类线程(被唤醒的可能是同类线程)

  /**
   * Holder for the single value exchanged between producers and consumers.
   * The empty string means "no value available".
   */
  class ValueObject {
      /** The shared slot; starts out empty. */
      public static String value = "";
  }
  4. class Producer {
  5. private String lock;
  6. public Producer(String lock) {
  7. super();
  8. this.lock = lock;
  9. }
  10. public void setValue() {
  11. try {
  12. synchronized (lock) {
  13. while (!ValueObject.value.equals("")) {
  14. System.out.println("生产者 " + Thread.currentThread().getName() + " WAITING了★");
  15. lock.wait();
  16. }
  17. System.out.println("生产者 " + Thread.currentThread().getName() + " RUNNABLE了");
  18. String value = System.currentTimeMillis() + "_" + System.nanoTime();
  19. ValueObject.value = value;
  20. lock.notify();
  21. }
  22. } catch (InterruptedException e) {
  23. e.printStackTrace();
  24. }
  25. }
  26. }
  27. class Consumer {
  28. private String lock;
  29. public Consumer(String lock) {
  30. super();
  31. this.lock = lock;
  32. }
  33. public void getValue() {
  34. try {
  35. synchronized (lock) {
  36. while (ValueObject.value.equals("")) {
  37. System.out.println("消费者 " + Thread.currentThread().getName() + " WAITING了☆");
  38. lock.wait();
  39. }
  40. System.out.println("消费者 " + Thread.currentThread().getName() + " RUNNABLE了");
  41. ValueObject.value = "";
  42. lock.notify();
  43. }
  44. } catch (InterruptedException e) {
  45. e.printStackTrace();
  46. }
  47. }
  48. }
  49. class ThreadP extends Thread {
  50. private Producer p;
  51. public ThreadP(Producer p) {
  52. super();
  53. this.p = p;
  54. }
  55. @Override
  56. public void run() {
  57. while (true) {
  58. p.setValue();
  59. }
  60. }
  61. }
  62. class ThreadC extends Thread {
  63. private Consumer consumer;
  64. public ThreadC(Consumer consumer) {
  65. super();
  66. this.consumer = consumer;
  67. }
  68. @Override
  69. public void run() {
  70. while (true) {
  71. consumer.getValue();
  72. }
  73. }
  74. }
  75. public class Run8_allWait {
  76. public static void main(String[] args) throws InterruptedException {
  77. String lock = new String("");
  78. Producer producer = new Producer(lock);
  79. Consumer consumer = new Consumer(lock);
  80. ThreadP[] pThread = new ThreadP[2];
  81. ThreadC[] cThread = new ThreadC[2];
  82. for (int i = 0; i < 2; i++) {
  83. pThread[i] = new ThreadP(producer);
  84. pThread[i].setName("生产者" + (i + 1));
  85. cThread[i] = new ThreadC(consumer);
  86. cThread[i].setName("消费者" + (i + 1));
  87. pThread[i].start();
  88. cThread[i].start();
  89. }
  90. Thread.sleep(5000);
  91. //获取当前活动的线程
  92. Thread[] threads = new Thread[Thread.currentThread().getThreadGroup().activeCount()];
  93. Thread.currentThread().getThreadGroup().enumerate(threads);
  94. for (int i = 0; i < threads.length; i++) {
  95. System.out.println(threads[i].getName() + " " + threads[i].getState());
  96. }
  97. }
  98. }

结果:

  1. 生产者 生产者1 RUNNABLE
  2. 生产者 生产者1 WAITING了★
  3. 生产者 生产者2 WAITING了★
  4. 消费者 消费者1 RUNNABLE
  5. 消费者 消费者1 WAITING了☆
  6. 生产者 生产者1 RUNNABLE
  7. 生产者 生产者1 WAITING了★
  8. 生产者 生产者2 WAITING了★
  9. 消费者 消费者2 RUNNABLE
  10. 消费者 消费者2 WAITING了☆
  11. 消费者 消费者1 WAITING了☆
  12. main RUNNABLE
  13. Monitor Ctrl-Break RUNNABLE
  14. 生产者1 WAITING
  15. 消费者1 WAITING
  16. 生产者2 WAITING
  17. 消费者2 WAITING

分析:使用 JDK 自带的工具 jps 和 jstack <pid> 分析线程问题

jps:

2018121313265596.png

jstack 7444:

watermark_type_ZmFuZ3poZW5naGVpdGk_shadow_10_text_aHR0cHM6Ly9ibG9nLmNzZG4ubmV0L3dlaXhpbl80MzU0OTU3OA_size_16_color_FFFFFF_t_70

watermark_type_ZmFuZ3poZW5naGVpdGk_shadow_10_text_aHR0cHM6Ly9ibG9nLmNzZG4ubmV0L3dlaXhpbl80MzU0OTU3OA_size_16_color_FFFFFF_t_70 1

线程通过等待和唤醒进行通信是成功的,但 notify() 无法保证唤醒的是异类线程:可能出现“生产者”唤醒“生产者”、“消费者”唤醒“消费者”的情况,被唤醒的同类线程发现条件不满足后又继续等待,最终所有线程都处于 WAITING 状态,程序呈“假死状态”。

解决方法:notify()换成notifyAll().

  1. 一生产与多消费:
  /**
   * One-slot stack shared by producer and consumer threads.
   *
   * <p>This version guards {@code wait()} with {@code if} and wakes a single
   * waiter with {@code notify()}. The article uses it to show a woken consumer
   * reaching {@code list.get(0)} on an empty list, so both choices are kept.
   */
  class MyStack {
      private List list = new ArrayList();

      /** Adds one random number, waiting first if the stack already holds an element. */
      public synchronized void push() {
          try {
              if (list.size() == 1) {
                  System.out.println("push操作中的: " + Thread.currentThread().getName() + " 线程呈wait状态");
                  wait();
              }
              list.add(Math.random());
              notify();
              System.out.println("push = " + list.size());
          } catch (InterruptedException interrupted) {
              interrupted.printStackTrace();
          }
      }

      /**
       * Removes the first element, waiting first if the stack is empty.
       *
       * @return the removed element followed by a space and the calling thread's
       *         name, or the empty string if the wait was interrupted
       */
      public synchronized String pop() {
          try {
              if (list.isEmpty()) {
                  System.out.println("pop操作中的: " + Thread.currentThread().getName() + " 线程呈wait状态");
                  wait();
              }
              String taken = list.get(0) + " " + Thread.currentThread().getName();
              list.remove(0);
              notify();
              System.out.println("pop = " + list.size() + " Mystack的pop方法中 线程" + Thread.currentThread().getName());
              return taken;
          } catch (InterruptedException interrupted) {
              interrupted.printStackTrace();
              return "";
          }
      }
  }
  34. class Producer {
  35. private MyStack myStack;
  36. public Producer(MyStack myStack) {
  37. super();
  38. this.myStack = myStack;
  39. }
  40. public void pushService() {
  41. myStack.push();
  42. }
  43. }
  44. class Consumer {
  45. private MyStack myStack;
  46. public Consumer(MyStack myStack) {
  47. super();
  48. this.myStack = myStack;
  49. }
  50. public void popService() {
  51. System.out.println("pop = " + myStack.pop() + " Consumer的popService方法中打印pop返回值");
  52. }
  53. }
  54. class P_Thread extends Thread {
  55. private Producer p;
  56. public P_Thread(Producer p) {
  57. super();
  58. this.p = p;
  59. }
  60. @Override
  61. public void run() {
  62. while (true) {
  63. p.pushService();
  64. }
  65. }
  66. }
  67. class C_Thread extends Thread {
  68. private Consumer c;
  69. public C_Thread(Consumer c) {
  70. super();
  71. this.c = c;
  72. }
  73. @Override
  74. public void run() {
  75. while (true) {
  76. c.popService();
  77. }
  78. }
  79. }
  80. public class Run9_oneP_manyC {
  81. public static void main(String[] args) {
  82. MyStack myStack = new MyStack();
  83. Producer p = new Producer(myStack);
  84. Consumer c1 = new Consumer(myStack);
  85. Consumer c2 = new Consumer(myStack);
  86. Consumer c3 = new Consumer(myStack);
  87. Consumer c4 = new Consumer(myStack);
  88. Consumer c5 = new Consumer(myStack);
  89. P_Thread p_thread = new P_Thread(p);
  90. p_thread.start();
  91. C_Thread c_thread1 = new C_Thread(c1);
  92. C_Thread c_thread2 = new C_Thread(c2);
  93. C_Thread c_thread3 = new C_Thread(c3);
  94. C_Thread c_thread4 = new C_Thread(c4);
  95. C_Thread c_thread5 = new C_Thread(c5);
  96. c_thread1.start();
  97. c_thread2.start();
  98. c_thread3.start();
  99. c_thread4.start();
  100. c_thread5.start();
  101. }
  102. }

结果:

  1. "C:\Program Files\Java\jdk1.8.0_181\bin\java.exe" "-javaagent:C:\Program Files\JetBrains\IntelliJ IDEA 2018.2.4\lib\idea_rt.jar=51167:C:\Program Files\JetBrains\IntelliJ IDEA 2018.2.4\bin" -Dfile.encoding=UTF-8 -classpath "C:\Program Files\Java\jdk1.8.0_181\jre\lib\charsets.jar;C:\Program Files\Java\jdk1.8.0_181\jre\lib\deploy.jar;C:\Program Files\Java\jdk1.8.0_181\jre\lib\ext\access-bridge-64.jar;C:\Program Files\Java\jdk1.8.0_181\jre\lib\ext\cldrdata.jar;C:\Program Files\Java\jdk1.8.0_181\jre\lib\ext\dnsns.jar;C:\Program Files\Java\jdk1.8.0_181\jre\lib\ext\jaccess.jar;C:\Program Files\Java\jdk1.8.0_181\jre\lib\ext\jfxrt.jar;C:\Program Files\Java\jdk1.8.0_181\jre\lib\ext\localedata.jar;C:\Program Files\Java\jdk1.8.0_181\jre\lib\ext\nashorn.jar;C:\Program Files\Java\jdk1.8.0_181\jre\lib\ext\sunec.jar;C:\Program Files\Java\jdk1.8.0_181\jre\lib\ext\sunjce_provider.jar;C:\Program Files\Java\jdk1.8.0_181\jre\lib\ext\sunmscapi.jar;C:\Program Files\Java\jdk1.8.0_181\jre\lib\ext\sunpkcs11.jar;C:\Program Files\Java\jdk1.8.0_181\jre\lib\ext\zipfs.jar;C:\Program Files\Java\jdk1.8.0_181\jre\lib\javaws.jar;C:\Program Files\Java\jdk1.8.0_181\jre\lib\jce.jar;C:\Program Files\Java\jdk1.8.0_181\jre\lib\jfr.jar;C:\Program Files\Java\jdk1.8.0_181\jre\lib\jfxswt.jar;C:\Program Files\Java\jdk1.8.0_181\jre\lib\jsse.jar;C:\Program Files\Java\jdk1.8.0_181\jre\lib\management-agent.jar;C:\Program Files\Java\jdk1.8.0_181\jre\lib\plugin.jar;C:\Program Files\Java\jdk1.8.0_181\jre\lib\resources.jar;C:\Program Files\Java\jdk1.8.0_181\jre\lib\rt.jar;C:\Users\zhw\IdeaProjects\java-learning\java-multithread\target\classes" com.brianway.learning.java.multithread.communication.example9.Run9_oneP_manyC
  2. Exception in thread "Thread-1" Exception in thread "Thread-2" java.lang.IndexOutOfBoundsException: Index: 0, Size: 0
  3. push = 1
  4. at java.util.ArrayList.rangeCheck(ArrayList.java:657)
  5. push操作中的: Thread-0 线程呈wait状态
  6. pop = 0 Mystackpop方法中 线程Thread-3
  7. at java.util.ArrayList.get(ArrayList.java:433)
  8. pop = 0.859192167402362 Thread-3 ConsumerpopService方法中打印pop返回值
  9. pop操作中的: Thread-2 线程呈wait状态
  10. pop操作中的: Thread-1 线程呈wait状态
  11. at com.brianway.learning.java.multithread.communication.example9.MyStack.pop(Run9_oneP_manyC.java:41)
  12. pop操作中的: Thread-3 线程呈wait状态
  13. push = 1
  14. at com.brianway.learning.java.multithread.communication.example9.Consumer.popService(Run9_oneP_manyC.java:73)
  15. push操作中的: Thread-0 线程呈wait状态
  16. pop = 0 Mystackpop方法中 线程Thread-5
  17. at com.brianway.learning.java.multithread.communication.example9.C_Thread.run(C_Thread.java:17)
  18. pop = 0.381657285316201 Thread-5 ConsumerpopService方法中打印pop返回值
  19. pop操作中的: Thread-4 线程呈wait状态
  20. java.lang.IndexOutOfBoundsException: Index: 0, Size: 0
  21. pop操作中的: Thread-5 线程呈wait状态
  22. at java.util.ArrayList.rangeCheck(ArrayList.java:657)
  23. at java.util.ArrayList.get(ArrayList.java:433)
  24. at com.brianway.learning.java.multithread.communication.example9.MyStack.pop(Run9_oneP_manyC.java:41)
  25. at com.brianway.learning.java.multithread.communication.example9.Consumer.popService(Run9_oneP_manyC.java:73)
  26. at com.brianway.learning.java.multithread.communication.example9.C_Thread.run(C_Thread.java:17)

原因:

  1. 使用 if 判断存在弊端:线程被唤醒后不会重新检查条件。多个呈 wait 状态的消费者线程被相继唤醒后,后醒来的线程在 list 已经为空的情况下仍然继续执行 list.get(0),从而抛出 java.lang.IndexOutOfBoundsException。
  2. 解决办法:

将 if 改为 while:线程被唤醒后会重新检查条件,条件不满足时继续等待,从而避免上述异常。但仅改为 while 会带来新的问题——假死,因此还需要同时将 notify() 换成 notifyAll()。

  /**
   * One-slot stack shared by producer and consumer threads. Both operations
   * re-check their condition in a {@code while} loop after every wake-up and
   * wake all waiters with {@code notifyAll()}.
   */
  class MyStack {
      /** Backing storage; typed instead of a raw {@code List}. */
      private final List<Double> list = new ArrayList<>();

      /** Adds one random number, waiting while the stack already holds an element. */
      public synchronized void push() {
          try {
              while (list.size() == 1) {
                  System.out.println("push操作中的: " + Thread.currentThread().getName() + " 线程呈wait状态");
                  this.wait();
              }
              list.add(Math.random());
              this.notifyAll();
              System.out.println("push = " + list.size());
          } catch (InterruptedException e) {
              // An interrupt abandons this push: the trace is printed and the method returns.
              e.printStackTrace();
          }
      }

      /**
       * Removes the first element, waiting while the stack is empty.
       *
       * @return the removed element followed by a space and the calling thread's
       *         name, or the empty string if the wait was interrupted
       */
      public synchronized String pop() {
          String returnValue = "";
          try {
              while (list.isEmpty()) {
                  System.out.println("pop操作中的: " + Thread.currentThread().getName() + " 线程呈wait状态");
                  this.wait();
              }
              returnValue = list.get(0) + " " + Thread.currentThread().getName();
              list.remove(0);
              this.notifyAll();
              System.out.println("pop = " + list.size() + " Mystack的pop方法中 线程" + Thread.currentThread().getName());
          } catch (InterruptedException e) {
              // An interrupt abandons this pop: the empty string is returned.
              e.printStackTrace();
          }
          return returnValue;
      }
  }
  33. 多生产与一消费:notifyAll()代替notify(),将一直运行下去
  /**
   * One-slot stack shared by producer and consumer threads. Both operations
   * re-check their condition in a {@code while} loop after every wake-up and
   * wake all waiters with {@code notifyAll()}.
   */
  class MyStack {
      /** Backing storage; typed instead of a raw {@code List}. */
      private final List<Double> list = new ArrayList<>();

      /** Adds one random number, waiting while the stack already holds an element. */
      public synchronized void push() {
          try {
              while (list.size() == 1) {
                  System.out.println("push操作中的: " + Thread.currentThread().getName() + " 线程呈wait状态");
                  this.wait();
              }
              list.add(Math.random());
              this.notifyAll();
              System.out.println("push = " + list.size());
          } catch (InterruptedException e) {
              // An interrupt abandons this push: the trace is printed and the method returns.
              e.printStackTrace();
          }
      }

      /**
       * Removes the first element, waiting while the stack is empty.
       *
       * @return the removed element followed by a space and the calling thread's
       *         name, or the empty string if the wait was interrupted
       */
      public synchronized String pop() {
          String returnValue = "";
          try {
              while (list.isEmpty()) {
                  System.out.println("pop操作中的: " + Thread.currentThread().getName() + " 线程呈wait状态");
                  this.wait();
              }
              returnValue = list.get(0) + " " + Thread.currentThread().getName();
              list.remove(0);
              this.notifyAll();
              System.out.println("pop = " + list.size() + " Mystack的pop方法中 线程" + Thread.currentThread().getName());
          } catch (InterruptedException e) {
              // An interrupt abandons this pop: the empty string is returned.
              e.printStackTrace();
          }
          return returnValue;
      }
  }
  66. class Producer {
  67. private MyStack myStack;
  68. public Producer(MyStack myStack) {
  69. super();
  70. this.myStack = myStack;
  71. }
  72. public void pushService() {
  73. myStack.push();
  74. }
  75. }
  76. class Consumer {
  77. private MyStack myStack;
  78. public Consumer(MyStack myStack) {
  79. super();
  80. this.myStack = myStack;
  81. }
  82. public void popService() {
  83. System.out.println("pop = " + myStack.pop() + " Consumer的popService方法中打印pop返回值");
  84. }
  85. }
  86. class C_Thread extends Thread {
  87. private Consumer c;
  88. public C_Thread(Consumer c) {
  89. super();
  90. this.c = c;
  91. }
  92. @Override
  93. public void run() {
  94. while (true) {
  95. c.popService();
  96. }
  97. }
  98. }
  99. class P_Thread extends Thread {
  100. private Producer p;
  101. public P_Thread(Producer p) {
  102. super();
  103. this.p = p;
  104. }
  105. @Override
  106. public void run() {
  107. while (true) {
  108. p.pushService();
  109. }
  110. }
  111. }
  112. public class test{
  113. public static void main(String[] args) {
  114. MyStack myStack = new MyStack();
  115. int pNum = 6;
  116. Producer[] producers = new Producer[pNum];
  117. P_Thread[] p_threads = new P_Thread[pNum];
  118. for (int i = 0; i < pNum; i++) {
  119. producers[i] = new Producer(myStack);
  120. }
  121. for (int i = 0; i < pNum; i++) {
  122. p_threads[i] = new P_Thread(producers[i]);
  123. p_threads[i].start();
  124. }
  125. Consumer c = new Consumer(myStack);
  126. C_Thread c_thread = new C_Thread(c);
  127. c_thread.start();
  128. }
  129. }

多生产与多消费: 一直运行下去

  /**
   * One-slot stack shared by producer and consumer threads. Both operations
   * re-check their condition in a {@code while} loop after every wake-up and
   * wake all waiters with {@code notifyAll()}.
   */
  class MyStack {
      /** Backing storage; typed instead of a raw {@code List}. */
      private final List<Double> list = new ArrayList<>();

      /** Adds one random number, waiting while the stack already holds an element. */
      public synchronized void push() {
          try {
              while (list.size() == 1) {
                  System.out.println("push操作中的: " + Thread.currentThread().getName() + " 线程呈wait状态");
                  this.wait();
              }
              list.add(Math.random());
              this.notifyAll();
              System.out.println("push = " + list.size());
          } catch (InterruptedException e) {
              // An interrupt abandons this push: the trace is printed and the method returns.
              e.printStackTrace();
          }
      }

      /**
       * Removes the first element, waiting while the stack is empty.
       *
       * @return the removed element followed by a space and the calling thread's
       *         name, or the empty string if the wait was interrupted
       */
      public synchronized String pop() {
          String returnValue = "";
          try {
              while (list.isEmpty()) {
                  System.out.println("pop操作中的: " + Thread.currentThread().getName() + " 线程呈wait状态");
                  this.wait();
              }
              returnValue = list.get(0) + " " + Thread.currentThread().getName();
              list.remove(0);
              this.notifyAll();
              System.out.println("pop = " + list.size() + " Mystack的pop方法中 线程" + Thread.currentThread().getName());
          } catch (InterruptedException e) {
              // An interrupt abandons this pop: the empty string is returned.
              e.printStackTrace();
          }
          return returnValue;
      }
  }
  33. class Producer {
  34. private MyStack myStack;
  35. public Producer(MyStack myStack) {
  36. super();
  37. this.myStack = myStack;
  38. }
  39. public void pushService() {
  40. myStack.push();
  41. }
  42. }
  43. class Consumer {
  44. private MyStack myStack;
  45. public Consumer(MyStack myStack) {
  46. super();
  47. this.myStack = myStack;
  48. }
  49. public void popService() {
  50. System.out.println("pop = " + myStack.pop() + " Consumer的popService方法中打印pop返回值");
  51. }
  52. }
  53. class C_Thread extends Thread {
  54. private Consumer c;
  55. public C_Thread(Consumer c) {
  56. super();
  57. this.c = c;
  58. }
  59. @Override
  60. public void run() {
  61. while (true) {
  62. c.popService();
  63. }
  64. }
  65. }
  66. class P_Thread extends Thread {
  67. private Producer p;
  68. public P_Thread(Producer p) {
  69. super();
  70. this.p = p;
  71. }
  72. @Override
  73. public void run() {
  74. while (true) {
  75. p.pushService();
  76. }
  77. }
  78. }
  79. public class test{
  80. public static void main(String[] args) {
  81. MyStack myStack = new MyStack();
  82. int pNum = 6;
  83. Producer[] producers = new Producer[pNum];
  84. P_Thread[] p_threads = new P_Thread[pNum];
  85. for (int i = 0; i < pNum; i++) {
  86. producers[i] = new Producer(myStack);
  87. }
  88. for (int i = 0; i < pNum; i++) {
  89. p_threads[i] = new P_Thread(producers[i]);
  90. p_threads[i].start();
  91. }
  92. int cNum = 8;
  93. Consumer[] consumers = new Consumer[cNum];
  94. C_Thread[] c_threads = new C_Thread[cNum];
  95. for (int i = 0; i < cNum; i++) {
  96. consumers[i] = new Consumer(myStack);
  97. }
  98. for (int i = 0; i < cNum; i++) {
  99. c_threads[i] = new C_Thread(consumers[i]);
  100. c_threads[i].start();
  101. }
  102. }
  103. }

发表评论

表情:
评论列表 (有 0 条评论,164人围观)

还没有评论,来说两句吧...

相关阅读

    相关 线12/线通信

    线程通信的例子:使用两个线程打印1-100。线程1,线程2交替打印 前置知识: 1. wait(); 一旦执行此方法,当前线程就进入阻塞状态,并释放同步监视器。

    相关 Java 线-线通信

    最近,美美非常的爱吃栗子,剥栗子却有些麻烦,这个任务理所当然的交给了帅帅,每一次,帅帅都会把热气腾腾的栗子剥好,然后放进一个盘子里,而美美每次都会从盘子里拿一个栗子吃: !

    相关 线(3)- 线通信

    线程之间的通信: 多个线程在处理同一个资源,但是处理的动作(线程的任务)却不相同。通过一定的手段使各个线程能有效的利用资源。而这种手段即—— 等待唤醒机制。 等待唤醒机...