Java多线程 二 线程间通信

红太狼 2021-11-01 08:32 522阅读 0赞

线程间通信:

多个线程在处理同一资源,但是

等待唤醒机制

涉及的方法:

1、wait() 让线程处于冻结状态,被wait的线程会被存储到线程池中。

2、notify() 唤醒线程池中的一个线程(任意)

3、notifyAll() 唤醒线程池中的所有线程、、

这些方法都必须定义在同步中,

因为这些方法是用于操作线程状态的方法。

必须明确到底操作的那个锁上的线程。

为什么操作线程的方法wait notify notifyAll定义在了Object中。

因为这些方法是监视器方法,监视器其实就是锁。

锁可以是任意的对象,任意的对象调用的方法一定定义在Object类中。

  1. package com.pzq.thread;
  2. public class SyncDemo3 {
  3. public static void main(String[] args) {
  4. Resource resource = new Resource();
  5. Input input = new Input(resource);
  6. OutPut outPut = new OutPut(resource);
  7. Thread threadInput = new Thread(input);
  8. Thread threadOuput = new Thread(outPut);
  9. threadInput.start();
  10. threadOuput.start();
  11. }
  12. }
  13. class Input implements Runnable {
  14. Resource r;
  15. public Input(Resource r) {
  16. this.r = r;
  17. }
  18. @Override
  19. public void run() {
  20. int x = 0;
  21. while (true) {
  22. synchronized (r) {
  23. if (r.flag) {
  24. try {
  25. r.wait();
  26. } catch (InterruptedException e) {
  27. e.printStackTrace();
  28. }
  29. }
  30. if (x == 0) {
  31. r.name = "mike";
  32. r.sex = "nan";
  33. } else {
  34. r.name = "丽丽";
  35. r.sex = "女女女女女女女";
  36. }
  37. r.flag = true;
  38. r.notify();
  39. }
  40. x = (x + 1) % 2;
  41. }
  42. }
  43. }
  44. class OutPut implements Runnable {
  45. Resource r;
  46. public OutPut(Resource r) {
  47. this.r = r;
  48. }
  49. @Override
  50. public void run() {
  51. while (true) {
  52. synchronized (r) {
  53. if (!r.flag) {
  54. try {
  55. r.wait();
  56. } catch (InterruptedException e) {
  57. e.printStackTrace();
  58. }
  59. }
  60. System.out.println(r.name + "......" + r.sex);
  61. r.flag = false;
  62. r.notify();
  63. }
  64. }
  65. }
  66. }
  67. class Resource {
  68. String name;
  69. String sex;
  70. boolean flag = false;
  71. public Resource() {
  72. }
  73. public Resource(String name, String sex) {
  74. this.name = name;
  75. this.sex = sex;
  76. }
  77. }

生产者消费者

代码优化,将同步方法封装到属性当中

  1. package com.pzq.thread;
  2. public class SyncDemo3 {
  3. public static void main(String[] args) {
  4. Resource resource = new Resource();
  5. Input input = new Input(resource);
  6. OutPut outPut = new OutPut(resource);
  7. Thread threadInput = new Thread(input);
  8. Thread threadOuput = new Thread(outPut);
  9. threadInput.start();
  10. threadOuput.start();
  11. }
  12. }
  13. class Input implements Runnable {
  14. Resource r;
  15. public Input(Resource r) {
  16. this.r = r;
  17. }
  18. @Override
  19. public void run() {
  20. int x = 0;
  21. while (true) {
  22. if (x == 0) {
  23. r.set("mike", "nan");
  24. } else {
  25. r.set("丽丽", "=========女");
  26. }
  27. x = (x + 1) % 2;
  28. }
  29. }
  30. }
  31. class OutPut implements Runnable {
  32. Resource r;
  33. public OutPut(Resource r) {
  34. this.r = r;
  35. }
  36. @Override
  37. public void run() {
  38. while (true) {
  39. r.out();
  40. }
  41. }
  42. }
  43. class Resource {
  44. private String name;
  45. private String sex;
  46. private boolean flag = false;
  47. public Resource() {
  48. }
  49. public synchronized void set(String name, String sex) {
  50. if (flag) {
  51. try {
  52. this.wait();
  53. } catch (InterruptedException e) {
  54. e.printStackTrace();
  55. }
  56. }
  57. this.name = name;
  58. this.sex = sex;
  59. flag = true;
  60. this.notify();
  61. }
  62. public synchronized void out() {
  63. if (!flag) {
  64. try {
  65. this.wait();
  66. } catch (InterruptedException e) {
  67. e.printStackTrace();
  68. }
  69. }
  70. System.out.println(this.name + "....." + this.sex);
  71. flag = false;
  72. this.notify();
  73. }
  74. }

单生产单消费

  1. package com.pzq.thread;
  2. public class SyncDemo4 {
  3. public static void main(String[] args) {
  4. Resource2 resource = new Resource2();
  5. Producter input = new Producter(resource);
  6. Customer outPut = new Customer(resource);
  7. Thread threadInput = new Thread(input);
  8. Thread threadOuput = new Thread(outPut);
  9. threadInput.start();
  10. threadOuput.start();
  11. }
  12. }
  13. class Producter implements Runnable {
  14. Resource2 r;
  15. public Producter(Resource2 r) {
  16. this.r = r;
  17. }
  18. @Override
  19. public void run() {
  20. while (true) {
  21. r.set("烤鸭");
  22. }
  23. }
  24. }
  25. class Customer implements Runnable {
  26. Resource2 r;
  27. public Customer(Resource2 r) {
  28. this.r = r;
  29. }
  30. @Override
  31. public void run() {
  32. while (true) {
  33. r.out();
  34. }
  35. }
  36. }
  37. class Resource2 {
  38. private String name;
  39. private boolean flag = false;
  40. private int count = 1;
  41. public Resource2() {
  42. }
  43. public synchronized void set(String name) {
  44. if (flag) {
  45. try {
  46. this.wait();
  47. } catch (InterruptedException e) {
  48. e.printStackTrace();
  49. }
  50. }
  51. this.name = name + count;
  52. count++;
  53. System.out.println(Thread.currentThread().getName() + "生产者+++" + this.name);
  54. flag = true;
  55. this.notify();
  56. }
  57. public synchronized void out() {
  58. if (!flag) {
  59. try {
  60. this.wait();
  61. } catch (InterruptedException e) {
  62. e.printStackTrace();
  63. }
  64. }
  65. System.out.println(Thread.currentThread().getName() + "消费者" + "=================" + this.name);
  66. flag = false;
  67. this.notify();
  68. }
  69. }

多生产者多消费者

if判断标记,只有一次,会导致不该运行的线程运行了。出现了数据错误的情况。

while判断标记,解决了线程获取执行权后,是否要运行

notify只能唤醒一个线程,如果本方唤醒了本方,就没有意义了。而且 while判断标记+notify会导致死锁。

notifyAll解决了,本方线程一定会唤醒对方线程。

  1. package com.pzq.thread;
  2. public class SyncDemo4 {
  3. public static void main(String[] args) {
  4. Resource2 resource = new Resource2();
  5. Producter input = new Producter(resource);
  6. Customer outPut = new Customer(resource);
  7. Thread threadInput1 = new Thread(input);
  8. Thread threadInput2 = new Thread(input);
  9. Thread threadInput3 = new Thread(input);
  10. Thread threadOuput1 = new Thread(outPut);
  11. Thread threadOuput2 = new Thread(outPut);
  12. Thread threadOuput3 = new Thread(outPut);
  13. Thread threadOuput4 = new Thread(outPut);
  14. Thread threadOuput5 = new Thread(outPut);
  15. threadInput1.start();
  16. threadInput2.start();
  17. threadInput3.start();
  18. threadOuput1.start();
  19. threadOuput2.start();
  20. threadOuput3.start();
  21. threadOuput4.start();
  22. threadOuput5.start();
  23. }
  24. }
  25. class Producter implements Runnable {
  26. Resource2 r;
  27. public Producter(Resource2 r) {
  28. this.r = r;
  29. }
  30. @Override
  31. public void run() {
  32. while (true) {
  33. r.set("烤鸭");
  34. }
  35. }
  36. }
  37. class Customer implements Runnable {
  38. Resource2 r;
  39. public Customer(Resource2 r) {
  40. this.r = r;
  41. }
  42. @Override
  43. public void run() {
  44. while (true) {
  45. r.out();
  46. }
  47. }
  48. }
  49. class Resource2 {
  50. private String name;
  51. private boolean flag = false;
  52. private int count = 1;
  53. public Resource2() {
  54. }
  55. public synchronized void set(String name) {
  56. while (flag) {
  57. try {
  58. this.wait();
  59. } catch (InterruptedException e) {
  60. e.printStackTrace();
  61. }
  62. }
  63. this.name = name + count;
  64. count++;
  65. System.out.println(Thread.currentThread().getName() + "生产者+++" + this.name);
  66. flag = true;
  67. this.notifyAll();
  68. }
  69. public synchronized void out() {
  70. while (!flag) {
  71. try {
  72. this.wait();
  73. } catch (InterruptedException e) {
  74. e.printStackTrace();
  75. }
  76. }
  77. System.out.println(Thread.currentThread().getName() + "消费者" + "=================" + this.name);
  78. flag = false;
  79. this.notifyAll();
  80. }
  81. }

多生产者多消费者 1.5新特性

jdk1.5以后将同步和锁封装成了对象。

并将操作锁的隐式方式定义到了该对象中,

将隐式动作变成了显示动作。

Lock接口:出现了替代同步代码块或者同步函数。将同步的隐式锁操作变成现实锁操作。

同时更为灵活,可以一个锁上加上多组监视器。

lock() 获取锁

unlock():释放锁,通常需要定义finally代码块中

Condition接口:出现替代了Object中的wait notify notifyAll方法。

  将这些监视器方法单独进行了封装,变成Condition监视器对象。

  可以任意锁进行组合。

await();

signal();

singnalAll

  1. package com.pzq.thread;
  2. import java.util.concurrent.locks.Condition;
  3. import java.util.concurrent.locks.Lock;
  4. import java.util.concurrent.locks.ReentrantLock;
  5. public class SyncDemo4 {
  6. public static void main(String[] args) {
  7. Resource2 resource = new Resource2();
  8. Producter input = new Producter(resource);
  9. Customer outPut = new Customer(resource);
  10. Thread threadInput1 = new Thread(input);
  11. Thread threadInput2 = new Thread(input);
  12. Thread threadInput3 = new Thread(input);
  13. Thread threadOuput1 = new Thread(outPut);
  14. Thread threadOuput2 = new Thread(outPut);
  15. Thread threadOuput3 = new Thread(outPut);
  16. Thread threadOuput4 = new Thread(outPut);
  17. Thread threadOuput5 = new Thread(outPut);
  18. threadInput1.start();
  19. threadInput2.start();
  20. threadInput3.start();
  21. threadOuput1.start();
  22. threadOuput2.start();
  23. threadOuput3.start();
  24. threadOuput4.start();
  25. threadOuput5.start();
  26. }
  27. }
  28. class Producter implements Runnable {
  29. Resource2 r;
  30. public Producter(Resource2 r) {
  31. this.r = r;
  32. }
  33. @Override
  34. public void run() {
  35. while (true) {
  36. r.set("烤鸭");
  37. }
  38. }
  39. }
  40. class Customer implements Runnable {
  41. Resource2 r;
  42. public Customer(Resource2 r) {
  43. this.r = r;
  44. }
  45. @Override
  46. public void run() {
  47. while (true) {
  48. r.out();
  49. }
  50. }
  51. }
  52. class Resource2 {
  53. private String name;
  54. private boolean flag = false;
  55. private int count = 1;
  56. // 创建一个锁对象
  57. Lock lock = new ReentrantLock();
  58. // 通过已有的锁获取两组监视器,一组监视生产者,一组监视消费者
  59. Condition con_producter = lock.newCondition();
  60. Condition con_customer = lock.newCondition();
  61. public Resource2() {
  62. }
  63. public void set(String name) {
  64. lock.lock();
  65. try {
  66. while (flag) {
  67. try {
  68. con_producter.await();
  69. } catch (InterruptedException e) {
  70. e.printStackTrace();
  71. }
  72. }
  73. this.name = name + count;
  74. count++;
  75. System.out.println(Thread.currentThread().getName() + "生产者+++" + this.name);
  76. flag = true;
  77. con_customer.signal();
  78. } finally {
  79. lock.unlock();
  80. }
  81. }
  82. public void out() {
  83. lock.lock();
  84. try {
  85. while (!flag) {
  86. try {
  87. con_customer.await();
  88. } catch (InterruptedException e) {
  89. e.printStackTrace();
  90. }
  91. }
  92. System.out.println(Thread.currentThread().getName() + "消费者" + "=================" + this.name);
  93. flag = false;
  94. con_producter.signal();
  95. } finally {
  96. lock.unlock();
  97. }
  98. }
  99. }

定义一个容器的生产者消费者

  1. package com.pzq.thread;
  2. import java.util.concurrent.locks.Condition;
  3. import java.util.concurrent.locks.Lock;
  4. import java.util.concurrent.locks.ReentrantLock;
  5. public class BoundBuffer {
  6. public static void main(String[] args) {
  7. Resources resources = new Resources();
  8. Products products = new Products(resources);
  9. Customers customers = new Customers(resources);
  10. new Thread(products).start();
  11. new Thread(products).start();
  12. new Thread(products).start();
  13. new Thread(customers).start();
  14. new Thread(customers).start();
  15. new Thread(customers).start();
  16. }
  17. }
  18. class Customers implements Runnable {
  19. Resources rs;
  20. public Customers(Resources rs) {
  21. this.rs = rs;
  22. }
  23. @Override
  24. public void run() {
  25. while (true) {
  26. try {
  27. rs.take();
  28. Thread.sleep(1000);
  29. } catch (InterruptedException e) {
  30. e.printStackTrace();
  31. }
  32. }
  33. }
  34. }
  35. class Products implements Runnable {
  36. Resources rs;
  37. int count = 1;
  38. public Products(Resources rs) {
  39. this.rs = rs;
  40. }
  41. @Override
  42. public void run() {
  43. while (true) {
  44. try {
  45. rs.put("烤鸭");
  46. Thread.sleep(1000);
  47. } catch (InterruptedException e) {
  48. e.printStackTrace();
  49. }
  50. }
  51. }
  52. }
  53. class Resources {
  54. Lock lock = new ReentrantLock();
  55. Condition notEmpty = lock.newCondition();
  56. Condition notFull = lock.newCondition();
  57. Object[] buffer = new Object[100];
  58. int putptr, takeptr, count;
  59. public void put(Object x) throws InterruptedException {
  60. lock.lock();
  61. try {
  62. while (count == buffer.length) {
  63. notFull.await();
  64. }
  65. buffer[putptr] = x;
  66. if (++putptr == buffer.length) {
  67. putptr = 0;
  68. }
  69. ++count;
  70. System.out.println("-------" + Thread.currentThread().getName() + "生产了---" + (x.toString() + putptr));
  71. notEmpty.signal();
  72. } finally {
  73. lock.unlock();
  74. }
  75. }
  76. public void take() throws InterruptedException {
  77. lock.lock();
  78. // Object x = null;
  79. try {
  80. while (count == 0) {
  81. notEmpty.await();
  82. }
  83. Object x = buffer[takeptr];
  84. if (++takeptr == buffer.length) {
  85. takeptr = 0;
  86. }
  87. --count;
  88. System.out.println("#######" + Thread.currentThread().getName() + "消费了####" + (x.toString() + takeptr));
  89. notFull.signal();
  90. } finally {
  91. lock.unlock();
  92. }
  93. // return x;
  94. }
  95. }

wait和sleep区别

1、wait可以指定时间也可以不指定。

  sleep必须指定时间

2、在同步中时,对CPU的执行权和锁的处理不同

  wait:释放执行权,释放锁

  sleep:释放执行权,不释放锁

停止线程

1、run方法结束:线程中的任务结束了,线程就停止了。

2、可以使用interrupt()方法将线程从冻结状态强制回鹘到运行状态中,让线程具备cpu的执行资格

  1. 当强制动作发生了InterruputdException,设置标记。
  2. package com.pzq.thread;
  3. public class SyncDemo5 {
  4. public static void main(String[] args) {
  5. StopRunnable stop = new StopRunnable();
  6. Thread thread1 = new Thread(stop);
  7. Thread thread2 = new Thread(stop);
  8. thread1.start();
  9. thread2.start();
  10. thread1.interrupt();
  11. thread2.interrupt();
  12. }
  13. }
  14. class StopRunnable implements Runnable {
  15. boolean flag = true;
  16. @Override
  17. public synchronized void run() {
  18. while (flag) {
  19. try {
  20. wait();
  21. } catch (InterruptedException e) {
  22. System.out.println(Thread.currentThread().getName() + " , " + e);
  23. setFlagFalse();
  24. }
  25. System.out.println(Thread.currentThread().getName() + " ..... ");
  26. }
  27. }
  28. public void setFlagFalse() {
  29. flag = false;
  30. }
  31. }

线程中常见方法

setDaemon: 将该线程标记为守护线程(后台线程)或用户线程,当正在运行的线程都是守护线程时java虚拟机退出。该方法必须在启动线程前调用。

      该方法首先调用线程的 checkAccess方法,且不带任何参数。这可能抛出SecurityException(当前线程中)

join 调用join时说明线程要申请加入进来运行。主线程会将执行权释放出来。等待thread1执行完。只要thread1执行完主线程就可以执行。

  1. //临时加入一个线程运算时可以用到join方法
  2. package com.pzq.thread;
  3. public class JoinDemo {
  4. public static void main(String[] args) {
  5. TestRun testRun = new TestRun();
  6. Thread thread1 = new Thread(testRun);
  7. Thread thread2 = new Thread(testRun);
  8. thread1.start();
  9. try {
  10. thread1.join(); //thread1线程要申请加入进来运行。主线程会将执行权释放出来。等待thread1执行完。只要thread1执行完主线程就可以执行。
  11. //临时加入一个线程运算时可以用到join方法
  12. } catch (InterruptedException e) {
  13. e.printStackTrace();
  14. }
  15. thread2.start();
  16. for (int x = 0; x < 50; x++) {
  17. System.out.println(Thread.currentThread().getName() + " ... " + x);
  18. }
  19. }
  20. }
  21. class TestRun implements Runnable {
  22. @Override
  23. public void run() {
  24. for (int i = 0; i < 50; i++) {
  25. System.out.println(Thread.currentThread().getName() + " ... " + i);
  26. }
  27. }
  28. }

setPriority 设置线程优先级

yield:释放自己的执行权

转载于:https://www.cnblogs.com/qiangge-python/p/11067391.html

发表评论

表情:
评论列表 (有 0 条评论,522人围观)

还没有评论,来说两句吧...

相关阅读