多线程学习笔记(二)多线程之线程通信

太过爱你忘了你带给我的痛 2023-02-15 10:05 128阅读 0赞

引言

对于一个人而言,一生中会经历生、老、病、死等不同的状态,正所谓水无常形,兵无常势,反观线程启动后亦会有不同状态,线程所经历的状态比较固定,共有六种状态,一个线程从从创建到死亡过程,可能会经历中间的四种状态,也可能朝生夕死,线程的生命周期示意图如下所示。
在这里插入图片描述

线程与其状态

下文是有关线程在计算机中的六种状态,当一个线程加载如计算机中时,通常会有六种状态:new、terminated、timed waiting、running、blocked、wait,这六种状态的关系如下:
在这里插入图片描述
线程的创建和消亡以及运行状态就不再赘述。

timed waiting

timed waiting状态为 计时等待状态,当我们调用Thread.sleep(xxx)时就可以进入计时等待状态例如Thread.sleep(20)表示该线程进入20ms的休眠等待状态,等20ms结束后,该线程就自动唤醒并争夺锁对象(即争夺cpu资源拥有权),如果争夺失败则会进入阻塞状态,等待下一轮争夺。
对于如何进入timedwaiting 具体的例子如下所示。

  1. public class SleepTest {
  2. public static int j=200;
  3. public static Object lock=new Object();;
  4. public static void main(String[] args) throws InterruptedException {
  5. //Object lock = new Object();
  6. new Thread(){
  7. @Override
  8. public void run() {
  9. super.setName("赵丽颖");
  10. synchronized (lock){
  11. for (int i = 0; i < 50; i++) {
  12. j--;
  13. System.out.println("这是第:"+i+"--个"+Thread.currentThread().getName()+"线程"+":"+j);
  14. }
  15. }
  16. try {
  17. Thread.sleep(10);
  18. } catch (InterruptedException e) {
  19. e.printStackTrace();
  20. }
  21. }
  22. }.start();
  23. new Thread(new Runnable() {
  24. @Override
  25. public void run() {
  26. synchronized (lock){
  27. for (int i = 0; i < 50; i++) {
  28. j++;
  29. System.out.println("这是第:"+i+"--个"+Thread.currentThread().getName()+"线程"+":"+j);
  30. }
  31. }
  32. try {
  33. Thread.sleep(100);
  34. } catch (InterruptedException e) {
  35. e.printStackTrace();
  36. }
  37. }
  38. },"地里热巴").start();
  39. Thread.sleep(100);
  40. System.out.println("----------"+Thread.currentThread().getName()+":----------");
  41. }
  42. }

运行结果如下:
在这里插入图片描述
上述例子分别采用了”实现”Runnable接口和继承Thread类来实现多线程程序,并在其中使用同步代码块作为同步技术,简单的解决了线程安全的问题。

waiting

无限等待状态即waiting,waiting状态并不是一个线程的操作,它体现的是多个线程间之间的通信,可以理解为多个线程之间的协作关系,多个线程会争取锁,同时相互之间又存在协作关系。当多个线程协作时,如A,B线程,如果A线程在Runnable(可运行)状态中调用了wait()方法那么A线程就进入
了Waiting(无限等待)状态,同时失去了同步锁。假如这个时候B线程获取到了同步锁,在运行状态中调用了notify()方法,那么就会将无限等待的A线程唤醒。注意是唤醒,如果获取到锁对象,那么A线程唤醒后就进入Runnable(可运行)状态;如果没有获取锁对象,那么就进入到Blocked(锁阻塞状态)。

blocked

对于blocked而言,多个线程中竞争锁对象失败者都会进入阻塞状态并等待新一轮抢夺cpu资源的机会。

线程之间的相互通信

对于线程之间的相互通信,这个问题比较宽泛,多线程之间通信总体来说能分为 共享内存机制和消息通信机制,下文先介绍多线程的内存共享性通讯机制(以java为例)。

内存共享机制

内存共享性顾名思义就是两个线程共享某一块内存区域,通过共享内存区域的内容达到线程通信的目的。共享内存这种方式比较常见,我们经常会设置一个共享变量。然后多个线程去操作同一个共享变量。从而达到线程通讯的目的。
在这里插入图片描述
在内存共享机制中常见的有 轮询模式和同步模式

轮询模式

首先来探讨一下第一种内存共享性通讯机制轮询模式,轮询模式比较简单,如有A,B两个线程,我们的需求是先让程序A执行完毕后触发标志位,如果标志位符合条件再让线程B执行,这就要求线程B不断轮询表示位是否符合条件,如果符合条件则开始执行B线程的程序,否则不执行,这样就简单的实现了线程之间的信号传递,下面给出小猿的案例,假定线程A每计数5次就让flag变为true,线程B不断轮询标志位,当符合条件时再触发B线程中的程序。
定义的标志类:

  1. package com.itheima.demo01.polling;
  2. public class MyFlag {
  3. private boolean flag ;
  4. public MyFlag(boolean flag) {
  5. this.flag = flag;
  6. }
  7. public MyFlag() {
  8. }
  9. public boolean isFlag() {
  10. return flag;
  11. }
  12. public void setFlag(boolean flag) {
  13. this.flag = flag;
  14. }
  15. }

线程A类实现了runnable接口,显然实现了接口中的run方法。

  1. public class MyThreadA implements Runnable{
  2. private volatile MyFlag flag = new MyFlag();
  3. public MyThreadA(MyFlag flag) {
  4. this.flag = flag;
  5. }
  6. public MyFlag getFlag() {
  7. return flag;
  8. }
  9. public void setFlag(MyFlag flag) {
  10. this.flag = flag;
  11. }
  12. @Override
  13. public void run() {
  14. System.out.println("-----------"+Thread.currentThread().getName()+"正在执行------------------");
  15. while (true){
  16. if(!flag.isFlag()){
  17. for (int i = 1; i < 20; i++) {
  18. System.out.println("----------------"+i+"-----------------");
  19. if(i%5==0){
  20. flag.setFlag(true);
  21. System.out.println(flag.isFlag());
  22. }else {
  23. flag.setFlag(false);
  24. System.out.println(flag.isFlag());
  25. }
  26. try {
  27. Thread.sleep(500);
  28. } catch (InterruptedException e) {
  29. e.printStackTrace();
  30. }
  31. }
  32. }
  33. }
  34. // flag.setFlag(true);
  35. // System.out.println("-----------"+Thread.currentThread().getName()+"要退出了------------------");
  36. }
  37. }

线程B类实现了继承了Thread类,重写了父类中的run方法。

  1. public class MyThreadB extends Thread {
  2. private volatile MyFlag flag;
  3. public MyFlag getFlag() {
  4. return flag;
  5. }
  6. public void setFlag(MyFlag flag) {
  7. this.flag = flag;
  8. }
  9. public MyThreadB(MyFlag flag) {
  10. this.flag = flag;
  11. }
  12. @Override
  13. public void run() {
  14. super.setName("MyThreadB");
  15. while (true){
  16. if(flag.isFlag()){
  17. System.out.println("-----------"+Thread.currentThread().getName()+"将要开始了-------------------");
  18. try {
  19. throw new InterruptedException("线程B需要执行了");
  20. } catch (InterruptedException e) {
  21. e.printStackTrace();
  22. }finally {
  23. System.out.println("-----------"+Thread.currentThread().getName()+"将要结束了-------------------");
  24. }
  25. try {
  26. Thread.sleep(1000);
  27. } catch (InterruptedException e) {
  28. e.printStackTrace();
  29. }
  30. }
  31. }
  32. }
  33. }

最后给出测试类

  1. public class DemoTestPolling {
  2. public static void main(String[] args) {
  3. MyFlag flag = new MyFlag(false);
  4. new Thread(new MyThreadA(flag),"MyThreadA").start();
  5. new MyThreadB(flag).start();
  6. }
  7. }

此处要注意 MyFlag 定义的flag 一定要用 volatile关键字修饰,这一点是非常关键的,否则就达不到我们想要的效果,其中的原因是Java内存模型规定所有的变量都是存在主存当中,每个线程都有自己的工作内存。线程对变量的所有操作都必须在工作内存中进行,而不能直接对主存进行操作。并且每个线程不能访问其他线程的工作内存。变量的值何时从线程的工作内存写回主存,但是假如线程A没有及时更新主内存,那么线程B 就会产生脏读现象,所以为了保证内存操作的原子性,就必须用volatile关键字修饰,保证线程内存和主内存的数据同步。
最后小猿运行的结果如下:
在这里插入图片描述
在这里插入图片描述

同步机制

同步机制也是内存共享中的一种,目前已被广泛应用于解决线程安全问题,同步机制包含同步代码块,同步方法和lock锁机制。首先,小猿来介绍一下同步代码块的用法。

同步代码块

直接上案例,
定义买票线程类

  1. public class RunnableImpl implements Runnable{
  2. //定义一个多个线程共享的票源
  3. private int ticket = 1000;
  4. //创建一个锁对象
  5. Object obj = new Object();
  6. //设置线程任务:卖票
  7. @Override
  8. public void run() {
  9. //使用死循环,让卖票操作重复执行
  10. while(true){
  11. //同步代码块
  12. synchronized (obj){
  13. //先判断票是否存在
  14. if(ticket>0){
  15. //提高安全问题出现的概率,让程序睡眠
  16. try {
  17. Thread.sleep(10);
  18. } catch (InterruptedException e) {
  19. e.printStackTrace();
  20. }
  21. //票存在,卖票 ticket--
  22. System.out.println(Thread.currentThread().getName()+"-->正在卖第"+ticket+"张票");
  23. ticket--;
  24. }
  25. }
  26. }
  27. }
  28. }

测试类

  1. public class Demo01Ticket {
  2. public static void main(String[] args) {
  3. //创建Runnable接口的实现类对象
  4. RunnableImpl runnable = new RunnableImpl();
  5. Thread thread1 = new Thread(runnable,"窗口一");
  6. Thread thread2 = new Thread(runnable,"窗口二");
  7. Thread thread3 = new Thread(runnable,"窗口三");
  8. thread1.start();
  9. thread2.start();
  10. thread3.start();
  11. }
  12. }

测试结果
在这里插入图片描述在这里插入图片描述
上述案例中同步代码块的锁采用的线程自定义的obj类并解决了多个线程访问ticket而产生的线程的安全问题,

同步方法

案例还是不同窗口买票,这次采用的同步方法来解决线程安全问题
定义的线程类

  1. public class RunnableImpl implements Runnable{
  2. //定义一个多个线程共享的票源
  3. private static int ticket = 50;
  4. private int ticket2=50;
  5. //设置线程任务:卖票
  6. @Override
  7. public void run() {
  8. System.out.println("this:"+this);
  9. //使用死循环,让卖票操作重复执行
  10. while(true){
  11. payTicketStatic();
  12. //payTicket();
  13. }
  14. }
  15. /*
  16. 静态的同步方法
  17. 锁对象是谁?
  18. 不能是this
  19. this是创建对象之后产生的,静态方法优先于对象
  20. 静态方法的锁对象是本类的class属性-->class文件对象(反射)
  21. */
  22. /* public static *//*synchronized*//* void payTicketStatic(){
  23. synchronized (RunnableImpl.class){
  24. //先判断票是否存在
  25. if(ticket>0){
  26. //提高安全问题出现的概率,让程序睡眠
  27. try {
  28. Thread.sleep(100);
  29. } catch (InterruptedException e) {
  30. e.printStackTrace();
  31. }
  32. //票存在,卖票 ticket--
  33. System.out.println(Thread.currentThread().getName()+"-->正在卖第"+ticket+"张票");
  34. ticket--;
  35. }
  36. }
  37. }*/
  38. public static synchronized void payTicketStatic() {
  39. //先判断票是否存在
  40. if (ticket > 0) {
  41. //票存在,卖票 ticket--
  42. System.out.println(Thread.currentThread().getName() + "-->正在卖第" + ticket + "张票");
  43. ticket--;
  44. //提高安全问题出现的概率,让程序睡眠
  45. try {
  46. Thread.sleep(10);
  47. } catch (InterruptedException e) {
  48. e.printStackTrace();
  49. }
  50. }
  51. }
  52. /*
  53. 定义一个同步方法
  54. 同步方法也会把方法内部的代码锁住
  55. 只让一个线程执行
  56. 同步方法的锁对象是谁?
  57. 就是实现类对象 new RunnableImpl()
  58. 也是就是this
  59. */
  60. /*public *//*synchronized*//* void payTicket(){
  61. synchronized (this){
  62. //先判断票是否存在
  63. if(ticket>0){
  64. //提高安全问题出现的概率,让程序睡眠
  65. try {
  66. Thread.sleep(10);
  67. } catch (InterruptedException e) {
  68. e.printStackTrace();
  69. }
  70. //票存在,卖票 ticket--
  71. System.out.println(Thread.currentThread().getName()+"-->正在卖第"+ticket+"张票");
  72. ticket--;
  73. }
  74. }
  75. }*/
  76. public synchronized void payTicket() {
  77. //先判断票是否存在
  78. if (ticket2 > 0) {
  79. //提高安全问题出现的概率,让程序睡眠
  80. //票存在,卖票 ticket--
  81. System.out.println(Thread.currentThread().getName() + "-->正在卖第" + ticket2 + "张票");
  82. ticket2--;
  83. try {
  84. Thread.sleep(20);
  85. } catch (InterruptedException e) {
  86. e.printStackTrace();
  87. }
  88. }
  89. }
  90. }

测试类

  1. public class Demo01Ticket {
  2. public static void main(String[] args) {
  3. // Runnable runnable=new RunnableImpl();
  4. //创建Thread类对象,构造方法中传递Runnable接口的实现类对象
  5. Thread t0 = new Thread(new RunnableImpl(),"窗口一");
  6. Thread t1 = new Thread(new RunnableImpl(),"窗口二");
  7. Thread t2 = new Thread(new RunnableImpl(),"窗口三");
  8. //调用start方法开启多线程
  9. t0.start();
  10. t1.start();
  11. t2.start();
  12. }
  13. }

在这里插入图片描述
此处需要说明的是 同步方法中静态方法的锁对象是该类(即RunnableImpl.class),而普通同步方法的锁对象就是该对象自己 。下面图解一下这两者的区别。
在这里插入图片描述
从上图中可以反映出静态同步方法和同步方法之间的区别。

lock锁

lock锁是另外一种重要的同步机制之一,采用lock所,首先需要穿件Lock锁对象 new ReentrantLock(),然后采用lock对象的成员方法lock()和unlock()夹住代码,从而解决被夹代码的线程安全问题,

  1. public class RunnableImpl implements Runnable{
  2. //定义一个多个线程共享的票源
  3. private int ticket = 1000;
  4. //1.在成员位置创建一个ReentrantLock对象
  5. Lock l = new ReentrantLock();
  6. //设置线程任务:卖票
  7. @Override
  8. public void run() {
  9. //使用死循环,让卖票操作重复执行
  10. while(true){
  11. //2.在可能会出现安全问题的代码前调用Lock接口中的方法lock获取锁
  12. l.lock();
  13. //先判断票是否存在
  14. if(ticket>0){
  15. //提高安全问题出现的概率,让程序睡眠
  16. try {
  17. Thread.sleep(10);
  18. //票存在,卖票 ticket--
  19. System.out.println(Thread.currentThread().getName()+"-->正在卖第"+ticket+"张票");
  20. ticket--;
  21. } catch (InterruptedException e) {
  22. e.printStackTrace();
  23. }finally {
  24. //3.在可能会出现安全问题的代码后调用Lock接口中的方法unlock释放锁
  25. l.unlock();//无论程序是否异常,都会把锁释放
  26. }
  27. }
  28. }
  29. }
  30. /*//设置线程任务:卖票
  31. @Override
  32. public void run() {
  33. //使用死循环,让卖票操作重复执行
  34. while(true){
  35. //2.在可能会出现安全问题的代码前调用Lock接口中的方法lock获取锁
  36. l.lock();
  37. //先判断票是否存在
  38. if(ticket>0){
  39. //提高安全问题出现的概率,让程序睡眠
  40. try {
  41. Thread.sleep(10);
  42. } catch (InterruptedException e) {
  43. e.printStackTrace();
  44. }
  45. //票存在,卖票 ticket--
  46. System.out.println(Thread.currentThread().getName()+"-->正在卖第"+ticket+"张票");
  47. ticket--;
  48. }
  49. //3.在可能会出现安全问题的代码后调用Lock接口中的方法unlock释放锁
  50. l.unlock();
  51. }
  52. }*/
  53. }

测试类

  1. public class Demo01Ticket {
  2. public static void main(String[] args) {
  3. //创建Runnable接口的实现类对象
  4. RunnableImpl run = new RunnableImpl();
  5. //创建Thread类对象,构造方法中传递Runnable接口的实现类对象
  6. Thread t0 = new Thread(run);
  7. Thread t1 = new Thread(run);
  8. Thread t2 = new Thread(run);
  9. //调用start方法开启多线程
  10. t0.start();
  11. t1.start();
  12. t2.start();
  13. }
  14. }

运行结果
在这里插入图片描述
在这里插入图片描述
运行达到案例预期效果。

消息通信机制

消息传递方式采取的是线程之间的直接通信,不同的线程之间通过显式的发送消息来达到交互目的。消息传递最有名的方式应该是actor模型了。在这种模型下,一切都是actor,所有的actor之间的通信都必须通过传递消息才能达到。每个actor都有一个收件箱(消息队列)用来保存收到其他actor传递来的消息。actor自己也可以给自己发送消息,当然这种情况是比较特殊的,比价少见。

在这里插入图片描述

等待/通知机制

在java中,Wait/Notify(NotifyAll)是一个非常常见的消息通信机制,这种机制有点像嵌入式中的中断机制,下文就着重探讨案例,案例要求如下:现有有个中断、中断处理器,CPU三个角色,当中断发起时,首先经过中断处理器处理,若是不可屏蔽中断则直接交由cpu处理,若是可屏蔽中断则现先由断处理器处理,然后再交由cpu处理,需要用代码简单的模拟一下这个过程,案例代码如下。
中断类

  1. public class Interruption {
  2. private String interruptType;
  3. private String treatment;
  4. private boolean interruptFlag=false;
  5. public Interruption(String interruptType, String treatment, boolean interruptFlag) {
  6. this.interruptType = interruptType;
  7. this.treatment = treatment;
  8. this.interruptFlag = interruptFlag;
  9. }
  10. public Interruption(String interruptType) {
  11. this.interruptType = interruptType;
  12. }
  13. public Interruption() {
  14. }
  15. public String getInterruptType() {
  16. return interruptType;
  17. }
  18. public void setInterruptType(String interruptType) {
  19. this.interruptType = interruptType;
  20. }
  21. public String getTreatment() {
  22. return treatment;
  23. }
  24. public void setTreatment(String treatment) {
  25. this.treatment = treatment;
  26. }
  27. public boolean isInterruptFlag() {
  28. return interruptFlag;
  29. }
  30. public void setInterruptFlag(boolean interruptFlag) {
  31. this.interruptFlag = interruptFlag;
  32. }
  33. @Override
  34. public String toString() {
  35. return "Interruption{" +
  36. "interruptType='" + interruptType + '\'' +
  37. ", treatment='" + treatment + '\'' +
  38. ", interruptFlag=" + interruptFlag +
  39. '}';
  40. }
  41. }

中断处理器类

  1. //extends thread
  2. public class InterruptHandler extends Thread{
  3. private Interruption interruption;
  4. public InterruptHandler(Interruption interruption,String name) {
  5. super(name);
  6. this.interruption = interruption;
  7. }
  8. public InterruptHandler(Interruption interruption) {
  9. this.interruption = interruption;
  10. }
  11. @Override
  12. public void run(){
  13. int count =0;
  14. //模拟总中断开关寄存器相关动作
  15. while (true){
  16. synchronized (interruption){
  17. //now there are some interrupt message in interruptHandler
  18. if(interruption.isInterruptFlag()){
  19. try {
  20. interruption.wait();
  21. } catch (InterruptedException e) {
  22. e.printStackTrace();
  23. }
  24. }
  25. //interruptHandler is empty
  26. System.out.println("interruptHandler is empty,start produce some interrupt");
  27. if(count%2==0){
  28. interruption.setInterruptType("MI interruption");
  29. interruption.setTreatment("response delay");
  30. }else {
  31. interruption.setInterruptType("NMI interruption");
  32. interruption.setTreatment("response immediately");
  33. }
  34. count++;
  35. System.out.println(interruption.getInterruptType()+"is readyCPU processing should be :" +
  36. interruption.getTreatment());
  37. try {
  38. Thread.sleep(2000);
  39. } catch (InterruptedException e) {
  40. e.printStackTrace();
  41. }
  42. interruption.setInterruptFlag(true);
  43. interruption.notify();
  44. }
  45. }
  46. }
  47. }

CPU类

  1. public class CPU implements Runnable{
  2. private Interruption interruption;
  3. private String threadName;
  4. public CPU(Interruption interruption, String threadName) {
  5. this.interruption = interruption;
  6. this.threadName = threadName;
  7. }
  8. public CPU(Interruption interruption) {
  9. this.interruption = interruption;
  10. }
  11. @Override
  12. public void run() {
  13. while (true){
  14. synchronized (interruption){
  15. if(!interruption.isInterruptFlag()){
  16. try {
  17. interruption.wait();
  18. } catch (InterruptedException e) {
  19. e.printStackTrace();
  20. }
  21. }
  22. System.out.println("CPU is processing"+interruption.getInterruptType()+"and will"+
  23. interruption.getTreatment());
  24. try {
  25. Thread.sleep(1000);
  26. } catch (InterruptedException e) {
  27. e.printStackTrace();
  28. }
  29. System.out.println("finished");
  30. System.out.println("----------------------------------------");
  31. interruption.setInterruptFlag(false);
  32. interruption.notify();
  33. }
  34. }
  35. }
  36. }

测试类

  1. public class DemoTest {
  2. public static void main(String[] args) {
  3. Interruption interruption = new Interruption();
  4. InterruptHandler interruptHandler = new InterruptHandler(interruption,"interruptionHandler");
  5. interruptHandler.start();
  6. CPU cpu = new CPU(interruption);
  7. new Thread(cpu,"cpu").start();
  8. }
  9. }

运行结果
在这里插入图片描述
运行达到案例预期效果。

管道通信机制

案例

消息制造者A则需要产生消息且将消息放入管道队列中,然后打印放入队列中的消息,消息消费者提取管道队里中的消息,并将提取的消息打印到控制台。

方法一

通过PipedInputStream和PipedOutputStream来实现线程之间的通信,手先由ThreadWrite线程产生消息放入队列中,然后又ThreadRead线程来消费消息,直接上代码。
WriteData类

  1. public class WriteData {
  2. public void writeMethod(PipedOutputStream out){
  3. try {
  4. System.out.println("write :");
  5. int maxSize =50;
  6. StringBuffer outData1=new StringBuffer();
  7. String outData =null;
  8. for(int i=0;i<maxSize;i++){
  9. outData=""+(i+1);
  10. out.write(outData.getBytes());
  11. if(i==0){
  12. outData1.append("["+(i+1)+",");
  13. }else if(i==maxSize-1){
  14. outData1.append(i+1+ "]"+"\n");
  15. }else {
  16. if((i+1)%5==0){
  17. outData1.append(i+1+"]"+"\n");
  18. outData1.append("[");
  19. }else {
  20. outData1.append(i+1+ ",");
  21. }
  22. }
  23. }
  24. String s = outData1.toString();
  25. System.out.println(s);
  26. System.out.println();
  27. out.close();
  28. } catch (Exception e) {
  29. // TODO: handle exception
  30. }
  31. }
  32. }

ReadData类

  1. public class ReadData {
  2. public void readMethod(PipedInputStream inputStream){
  3. try {
  4. System.out.println("read:");
  5. byte[] byteArray=new byte[128];
  6. int readLength = inputStream.read(byteArray);
  7. while (readLength != -1){
  8. String newData = new String(byteArray, 0, readLength);
  9. System.out.println(newData);
  10. readLength=inputStream.read(byteArray);
  11. }
  12. System.out.println();
  13. inputStream.close();
  14. } catch (IOException e) {
  15. e.printStackTrace();
  16. }
  17. }
  18. }

ThreadWrite类

  1. public class ThreadWrite implements Runnable{
  2. WriteData writeData;
  3. PipedOutputStream outputStream;
  4. public ThreadWrite(WriteData writeData,PipedOutputStream outputStream){
  5. this.writeData = writeData;
  6. this.outputStream = outputStream;
  7. }
  8. @Override
  9. public void run() {
  10. writeData.writeMethod(outputStream);
  11. }
  12. }

ThreadRead

  1. public class ThreadRead extends Thread{
  2. ReadData readData;
  3. PipedInputStream inputStream;
  4. public ThreadRead(ReadData readData, PipedInputStream inputStream) {
  5. this.readData = readData;
  6. this.inputStream = inputStream;
  7. }
  8. @Override
  9. public void run() {
  10. readData.readMethod(inputStream);
  11. }
  12. }

测试类

  1. public class TestDemo1 {
  2. public static void main(String[] args) {
  3. try {
  4. WriteData writeData=new WriteData();
  5. ReadData readData=new ReadData();
  6. PipedInputStream input=new PipedInputStream();
  7. PipedOutputStream out=new PipedOutputStream();
  8. out.connect(input);
  9. ThreadRead threadRead=new ThreadRead(readData, input);
  10. threadRead.start();
  11. Thread.sleep(2000);
  12. ThreadWrite threadWriteImpl=new ThreadWrite(writeData, out);
  13. Thread threadWrite = new Thread(threadWriteImpl,"threadWrite");
  14. threadWrite.start();
  15. } catch (Exception e) {
  16. e.printStackTrace();
  17. }
  18. }
  19. }

测试结果
在这里插入图片描述
测试结果非常完美。

方法二

案例二则采用PipedReader和PipedWriter类做管道通讯,方法二与方法本质的区别就是InputStream、OutputStrean和Reader、Writer的区别,代码实现如下.
WriteData类

  1. public class WriteData {
  2. public void writeData(PipedWriter writer){
  3. int maxSize =50;
  4. String outData=null;
  5. StringBuffer outData1= new StringBuffer();
  6. System.out.println("write:");
  7. try {
  8. for (int i = 0; i < maxSize; i++) {
  9. outData=""+(i+1);
  10. writer.write(outData);
  11. if(i==0){
  12. outData1.append("["+(i+1)+",");
  13. }else if(i==maxSize-1){
  14. outData1.append(i+1+"]"+"\n");
  15. }else {
  16. if((i+1)%5==0){
  17. outData1.append(i+1+"]"+"\n");
  18. }else {
  19. outData1.append(i+1+",");
  20. }
  21. }
  22. }
  23. String s = outData1.toString();
  24. System.out.println(s);
  25. writer.close();
  26. }catch (IOException e){
  27. e.printStackTrace();
  28. }
  29. }
  30. }

ReadData类

  1. public class ReadData {
  2. public void readMethod(PipedReader read){
  3. try {
  4. char [] byteArray = new char[64];
  5. int readLength = read.read(byteArray);
  6. System.out.println("read:");
  7. while (readLength !=-1){
  8. String newData = new String(byteArray, 0, readLength);
  9. System.out.print(newData);
  10. readLength = read.read(byteArray);
  11. }
  12. System.out.println();
  13. read.close();
  14. } catch (IOException e) {
  15. e.printStackTrace();
  16. }
  17. }
  18. }

ThreadWriter类

  1. public class ThreadWriter extends Thread {
  2. WriteData out;
  3. PipedWriter writer;
  4. public ThreadWriter(WriteData out, PipedWriter writer) {
  5. this.out = out;
  6. this.writer = writer;
  7. }
  8. @Override
  9. public void run() {
  10. out.writeData(writer);
  11. }
  12. }

测试类

  1. public class DemoTest2 {
  2. public static void main(String[] args) {
  3. try {
  4. WriteData writeData = new WriteData();
  5. ReadData readData = new ReadData();
  6. PipedWriter pipedWriter = new PipedWriter();
  7. PipedReader pipedReader = new PipedReader();
  8. pipedWriter.connect(pipedReader);
  9. ThreadWriter threadWriter = new ThreadWriter(writeData,pipedWriter);
  10. threadWriter.start();
  11. Thread.sleep(2000);
  12. ThreadReader threadReader = new ThreadReader(readData, pipedReader);
  13. Thread threadReader1 = new Thread(threadReader, "threadReader");
  14. threadReader1.start();
  15. } catch (InterruptedException | IOException e) {
  16. e.printStackTrace();
  17. }
  18. }
  19. }

通过方法一和方法二的对比,发现两种方法都基本一致,最大的区别在于消息生产者将消息放入管道队列的方式,结果如下。
在这里插入图片描述
PipeOutputStream写入管道队列之前需要先将写入对象转换成二进制,而PipedWriter则不需要转换,直接写入即可。
测试结果:
在这里插入图片描述

总结

总之多线程之间的通信方式可以分为两种机制,内存共享机制和消息传递机制,其中内存共享机制中,常见的有同步机制和轮询机制,而同步机制中常见有同步方法,同步代码块,lock锁;而消息传递机制中常见的有等待/通知机制和管道机制,这些通信方式有各自的应用场合,从而使得多线程通信更加丰富多姿,更加完善,小猿在此恳请各位同学指正批评。

发表评论

表情:
评论列表 (有 0 条评论,128人围观)

还没有评论,来说两句吧...

相关阅读

    相关 线12/线通信

    线程通信的例子:使用两个线程打印1-100。线程1,线程2交替打印 前置知识: 1. wait(); 一旦执行此方法,当前线程就进入阻塞状态,并释放同步监视器。

    相关 Java 线-线通信

    最近,美美非常的爱吃栗子,剥栗子却有些麻烦,这个任务理所当然的交给了帅帅,每一次,帅帅都会把热气腾腾的栗子剥好,然后放进一个盘子里,而美美每次都会从盘子里拿一个栗子吃: !

    相关 线(3)- 线通信

    线程之间的通信: 多个线程在处理同一个资源,但是处理的动作(线程的任务)却不相同。通过一定的手段使各个线程能有效的利用资源。而这种手段即—— 等待唤醒机制。 等待唤醒机...