Java 多线程(三) 线程通信

╰半橙微兮° 2022-03-02 00:58 403阅读 0赞

前言

在前一章我们介绍了线程中较为重要的几个关键字synchronizedvolatile.synchronized关键字主要是用于标示线程的同步关系与锁.volatile主要是用于将线程内的局部变量与进程总变量之间的交互关系.

本文我们将介绍下线程之间的相互通信.本章主要包括如下的几个部分的内容:

  • wait()方法与notify()方法
  • join()方法
  • 经典的生产者&消费者实现
  • ThreadLocal类的使用

正文

wait()方法与notify()方法

在说明wait()与notify()方法之前,我们先说下等待/通知机制.
有如下的两个线程:

  • A 线程执行 -》线程挂起 -》线程接受通知
  • B 线程执行 -》线程通知A线程 -》 线程运行结束
    其实这特别像我们生活中的一些场景.
    小A: 小B,明天7点叫我起床.然后A睡觉了,也就是进行挂起,进行等待.
    小B: 做自己的事情,到B点的时候.通知A进行起床.
    在设计模式内,这是通知者模式的一种反向运用.

OK,我们扯回原来的话题.那么,上述的等待/通知 机制在Java内是如何实现的?
在Java内的锁Lock的概念并未设计出之前,我们是通过wait()/notify()的方式进行实现的.这个做法有个缺点,就是无法准确的通知到某个线程,只能大概的通知.

  1. class WaitThread extends Thread{
  2. private Object lock;
  3. public WaitThread(Object lock){
  4. this.lock = lock;
  5. }
  6. public void run(){
  7. System.out.println("Thread:"+Thread.currentThread().getName()+"Wait begin."+System.currentTimeMillis());
  8. synchronized (lock) {
  9. try {
  10. lock.wait();
  11. } catch (InterruptedException e) {
  12. e.printStackTrace();
  13. }
  14. }
  15. System.out.println("Thread:"+Thread.currentThread().getName()+"Wait end."+System.currentTimeMillis());
  16. }
  17. }
  18. class NotifyThread extends Thread{
  19. private Object lock;
  20. public NotifyThread(Object lock){
  21. this.lock = lock;
  22. }
  23. public void run(){
  24. System.out.println("Thread:"+Thread.currentThread().getName()+"Notify begin."+System.currentTimeMillis());
  25. synchronized (lock) {
  26. try {
  27. lock.notify();
  28. Thread.sleep(2000);
  29. } catch (InterruptedException e) {
  30. e.printStackTrace();
  31. }
  32. }
  33. System.out.println("Thread:"+Thread.currentThread().getName()+"Notify end."+System.currentTimeMillis());
  34. }
  35. }
  36. public class WaitNotifyThread {
  37. public static void main(String[] args) {
  38. Object lock = new Object();
  39. Thread threadA = new WaitThread(lock);
  40. Thread threadB = new NotifyThread(lock);
  41. threadA.start();
  42. threadB.start();
  43. }
  44. }
  45. //Thread:Thread-0Wait begin.1552965862322
  46. //Thread:Thread-1Notify begin.1552965862322
  47. //Thread:Thread-1Notify end.1552965864327
  48. //Thread:Thread-0Wait end.1552965864327

根据上述例子,我们可以知道如下几个特性:

  • 两个线程之间要想通信,那么就需要使用同一个对象(类比锁对象)进行依托.像上文中的Object lock就是如此.
  • 此外,当线程在运行wait()notfiy()的时候,必须要获取线程的锁.
  • 最后,wait()notify()方法是Object类的接口,也就是任何对象都可以使用wait()/notify()方法进行线程的通信.
  • PS: 根据上面的输出可以得知,当运行wait()方法时,线程立即释放当前的对象锁.(也就是wait()方法下方的代码不会再进行执行下去).当运行notify()方法时,线程不会立即释放锁,而是等当前的notify()的线程执行完成后,再进行释放.
Wait / Notify Tips
  • 当线程wait()之后,调用interrupt()方法将其关闭时,会导致抛出异常;
  • notify()方法一次只能通知一个线程;notifyall()的方法可以通知所有的线程.
  • 可以使用wait(long)时间段的时间,来解决死锁和长时间未获得通知的问题.
生产者 / 消费者模式

所谓的生产者/消费者模式,简单来说就是一方发送数据,一方接收数据.我们同样可以使用wait()/notify()进行模拟实现.

  1. class ConsumerThread extends Thread{
  2. private Object lock;
  3. public ConsumerThread(Object lock){
  4. this.lock = lock;
  5. }
  6. public void consume(){
  7. synchronized (lock) {
  8. try {
  9. while ("".equals(ProducerConsumerThread.msg)) {
  10. lock.wait();
  11. }
  12. System.out.println("Thread:"+Thread.currentThread().getName()+"Msg "+ ProducerConsumerThread.msg);
  13. ProducerConsumerThread.msg = "";
  14. lock.notify();
  15. } catch (InterruptedException e) {
  16. // TODO Auto-generated catch block
  17. e.printStackTrace();
  18. }
  19. }
  20. }
  21. public void run(){
  22. while(true){
  23. consume();
  24. }
  25. }
  26. }
  27. class ProducerThread extends Thread{
  28. private Object lock;
  29. public ProducerThread(Object lock){
  30. this.lock = lock;
  31. }
  32. public void produce(){
  33. synchronized (lock) {
  34. try {
  35. while (!"".equals(ProducerConsumerThread.msg)) {
  36. lock.wait();
  37. }
  38. System.out.println("Thread:"+Thread.currentThread().getName()+"Msg "+ ProducerConsumerThread.msg);
  39. ProducerConsumerThread.msg = "MSG"+System.currentTimeMillis();
  40. lock.notify();
  41. } catch (InterruptedException e) {
  42. // TODO Auto-generated catch block
  43. e.printStackTrace();
  44. }
  45. }
  46. }
  47. public void run(){
  48. while(true){
  49. produce();
  50. }
  51. }
  52. }
  53. public class ProducerConsumerThread {
  54. public static String msg = "";
  55. public static void main(String[] args) {
  56. Object lock = new Object();
  57. Thread producerThread = new ProducerThread(lock);
  58. Thread consumerThread = new ConsumerThread(lock);
  59. producerThread.start();
  60. try {
  61. Thread.sleep(1000);
  62. } catch (InterruptedException e) {
  63. e.printStackTrace();
  64. }
  65. consumerThread.start();
  66. }
  67. }
  68. //Thread:Thread-0Msg
  69. //Thread:Thread-1Msg MSG1552968574301
  70. //Thread:Thread-0Msg
  71. //Thread:Thread-1Msg MSG1552968574301
  72. //Thread:Thread-0Msg
  73. //Thread:Thread-1Msg MSG1552968574301
  74. //...

在本实例中,我们使用了String对象作为了承载消息的载体.值得注意的是,控制锁通信的变量与消息的载体尽量不要使用同一个.

多对多消息
当遇到多对多的消息时候,我们通常采用notifyAll()方法,唤醒所有的线程.

  • List消息体实现
    我们还可以通过List消息体进行实现.但是值得注意的是List类型的实现类,无论是ArrayList类型还是LinkedList类型都是线程不安全的.我们可以使用对象锁,进行处理.
    这种直接使用synchronized关键字的做法是非常低效率的.后面我们在讲解JDK1.5LinkedBlockedQueue的时候讲解下使用RententLock锁与CAS机制来对于这块的部分进行优化.

    import java.util.ArrayList;
    import java.util.List;

    class ConsumerListThread extends Thread{

    1. private Object lock;
    2. public ConsumerListThread(Object lock){
    3. this.lock = lock;
    4. }
    5. public void consume(){
    6. synchronized (lock) {
    7. try {
    8. while(ProducerConsumerListThread.list.size() == ProducerConsumerListThread.index){
    9. lock.wait();
    10. }
    11. // 获取对象锁
    12. synchronized(ProducerConsumerListThread.list){
    13. String message = ProducerConsumerListThread.list.get(ProducerConsumerListThread.index++);
    14. System.out.println("Thread:"+Thread.currentThread().getName()+"Consume Msg "+ message);
    15. }
    16. lock.notify();
    17. } catch (InterruptedException e) {
    18. // TODO Auto-generated catch block
    19. e.printStackTrace();
    20. }
    21. }
    22. }
    23. public void run(){
    24. while(true){
    25. consume();
    26. }
    27. }

    }
    class ProducerListThread extends Thread{

    1. private Object lock;
    2. public ProducerListThread(Object lock){
    3. this.lock = lock;
    4. }
    5. public void produce(){
    6. synchronized (lock) {
    7. try {
    8. // offer index - 标识末尾 说明
    9. while(ProducerConsumerListThread.list.size() != ProducerConsumerListThread.index){
    10. lock.wait();
    11. }
    12. // 获取对象锁
    13. synchronized(ProducerConsumerListThread.list){
    14. String message = "MSG:"+System.currentTimeMillis();
    15. ProducerConsumerListThread.list.add(message);
    16. System.out.println("Thread: "+Thread.currentThread().getName()+"Produce Msg "+ message);
    17. }
    18. lock.notify();
    19. } catch (InterruptedException e) {
    20. // TODO Auto-generated catch block
    21. e.printStackTrace();
    22. }
    23. }
    24. }
    25. public void run(){
    26. while(true){
    27. produce();
    28. }
    29. }

    }

    public class ProducerConsumerListThread {

    1. public static List<String> list = new ArrayList<>();
    2. public static int index = 0;
    3. public static void main(String[] args) {
    4. Object lock = new Object();
    5. Thread producerThread = new ProducerListThread(lock);
    6. Thread consumerThread = new ConsumerListThread(lock);
    7. producerThread.start();
    8. try {
    9. Thread.sleep(1000);
    10. } catch (InterruptedException e) {
    11. e.printStackTrace();
    12. }
    13. consumerThread.start();
    14. }

    }
    //Thread: Thread-0Produce Msg MSG:1552977964484
    //Thread:Thread-1Consume Msg MSG:1552977964484
    //Thread: Thread-0Produce Msg MSG:1552977964484
    //Thread:Thread-1Consume Msg MSG:1552977964484
    //Thread: Thread-0Produce Msg MSG:1552977964484
    //Thread:Thread-1Consume Msg MSG:1552977964484

输出的结果如上所示.

join()方法

在现实生活中,我们通常需要等待某样东西的结果再进行运算.比如:医生需要对于病人的病情诊断前,通常需要得到病人的检测结果(检测A/检测B/检测C).这种场景使用Java的join()方法即可实现.(当然在JDK1.5之后,JUC包中还提供了CountDownLaunch/CyclicBarrier/Semaphore类优化以及简便完成了这部分的内容.这个我们在后续的章节中会进行讲解.)

  1. class SonThread extends Thread{
  2. public void run(){
  3. for(int i=0;i<10;i++){
  4. synchronized(JoinThread.num){
  5. JoinThread.num = JoinThread.num+1;
  6. // System.out.println(JoinThread.num);
  7. }
  8. }
  9. }
  10. }
  11. public class JoinThread{
  12. public static Integer num=0;
  13. public static void main(String []args){
  14. Thread threadA = new SonThread();
  15. Thread threadB = new SonThread();
  16. try {
  17. threadA.start();
  18. threadB.start();
  19. threadA.join();
  20. threadB.join();
  21. } catch (InterruptedException e) {
  22. e.printStackTrace();
  23. }
  24. System.out.println("Num = "+num);
  25. }
  26. }
  27. // Num = 20
  • join()方法可以使当前线程在 join()方法完成后再进行执行.
  • wait()方法类似,join()也有join(long)这样的接口.防止等待的时间过长,或者发生死锁的情况.
  • wait()方法类似, 在当前线程join()时候,被interrupt()中断时.也会抛出InterruptedException异常.
  • join()wait()方法一致,在方法运行后,都会释放当前线程所包含的锁.sleep()方法是不会释放锁的.(这在前文已经有所提及了.)
  • 在使用join(long)的接口的时候,需要特别注意锁的释放与争抢的问题.

    public final synchronized void join(long millis)

    1. throws InterruptedException {
    2. long base = System.currentTimeMillis();
    3. long now = 0;
    4. if (millis < 0) {
    5. throw new IllegalArgumentException("timeout value is negative");
    6. }
    7. if (millis == 0) {
    8. while (isAlive()) {
    9. wait(0);
    10. }
    11. } else {
    12. while (isAlive()) {
    13. long delay = millis - now;
    14. if (delay <= 0) {
    15. break;
    16. }
    17. wait(delay);
    18. now = System.currentTimeMillis() - base;
    19. }
    20. }
    21. }

多线程中的join方法——源码分析

ThreadLocal类的使用

根据第二章可以知道,当我们访问同一个变量时候,通常需要加入synchronized关键字以获取锁.但是,如果我们需要在每个线程内都具有自己独自的变量的时候,应当如何解决呢? 不用担心,Java为我们提供了ThreadLocal对象以用于存储线程变量.

ThreadLocal类的使用非常简单,我们可以将其当作一个集合.对于ThreadLocal有时操作集合对象,有时操作普通对象.其实例皆如下所示:

  1. import java.util.HashMap;
  2. class ThreadLocalThread extends Thread{
  3. public void run(){
  4. System.out.println("Thread:"+Thread.currentThread().getName()+"Begin Set "+ ThreadLocalDemo.threadlocalInteger.get());
  5. // set
  6. ThreadLocalDemo.threadlocalInteger.set(456);
  7. System.out.println("Thread:"+Thread.currentThread().getName()+"After Set "+ ThreadLocalDemo.threadlocalInteger.get());
  8. // set2
  9. ThreadLocalDemo.threadlocalHashMap.set(new HashMap());
  10. ThreadLocalDemo.threadlocalHashMap.get().put("hello", "sonThread");
  11. System.out.println("Thread:"+Thread.currentThread().getName()+"Hash Map After Set "+ ThreadLocalDemo.threadlocalHashMap.get().get("hello"));
  12. }
  13. }
  14. public class ThreadLocalDemo {
  15. // 1. 普通对象
  16. public static ThreadLocal<Integer> threadlocalInteger = new ThreadLocal<>();
  17. // 2. 集合对象
  18. public static ThreadLocal<HashMap> threadlocalHashMap = new ThreadLocal<>();
  19. public static void main(String[] args) {
  20. threadlocalInteger.set(123);
  21. threadlocalHashMap.set(new HashMap());
  22. threadlocalHashMap.get().put("hello", "main");
  23. ThreadLocalThread sonThread = new ThreadLocalThread();
  24. try {
  25. Thread.sleep(2000);
  26. } catch (InterruptedException e) {
  27. e.printStackTrace();
  28. }
  29. sonThread.start();
  30. System.out.println("Thread:"+Thread.currentThread().getName()+"After Set "+ ThreadLocalDemo.threadlocalInteger.get());
  31. System.out.println("Thread:"+Thread.currentThread().getName()+"Hash Map After Set "+ ThreadLocalDemo.threadlocalHashMap.get().get("hello"));
  32. }
  33. }
  34. //Thread:mainAfter Set 123
  35. //Thread:Thread-0Begin Set null
  36. //Thread:mainHash Map After Set main
  37. //Thread:Thread-0After Set 456
  38. //Thread:Thread-0Hash Map After Set sonThread

预期结果如上所示.可以看到,变量在两个线程内各不相同.对于不论是简单数据类型还是集合类型都是如此.

  • ThreadLocal的自动初始化(重写initialValuele类)
    不知道大家是否注意到上述的输出中的这句话:Thread:Thread-0Begin Set null.这是因为ThreadLocal在进行初始化操作的时候是null的.我们在使用之前,需要先进行赋值.
    解决办法1.使用之前,需要先进行赋值. 2.重写ThreadLocal的初始化方法.(个人不是特别推荐 因为后期有可能维护稍微有点困难.)

    class ThreadLocalExt extends ThreadLocal{

    1. protected Object initialValue(){
    2. return "First Object";
    3. }

    }

  • 继承类 InheritableThreadLocal

    import java.util.HashMap;

    class InheritThreadLocalThread extends Thread{

    1. public void run(){
    2. System.out.println("Thread:"+Thread.currentThread().getName()+"Parent Set "+ InheritThreadLocalDemo.threadlocalInteger.get());
    3. InheritThreadLocalDemo.threadlocalInteger.set(77777);
    4. System.out.println("Thread:"+Thread.currentThread().getName()+"After Son Set "+ InheritThreadLocalDemo.threadlocalInteger.get());
    5. }

    }
    public class InheritThreadLocalDemo {

    1. // 1. 普通对象
    2. public static InheritableThreadLocal<Integer> threadlocalInteger = new InheritableThreadLocal<>();
    3. public static void main(String[] args) {
    4. threadlocalInteger.set(4396);
    5. Thread sonThread = new InheritThreadLocalThread();
    6. try {
    7. Thread.sleep(2000);
    8. sonThread.start();
    9. sonThread.join();
    10. } catch (InterruptedException e) {
    11. e.printStackTrace();
    12. }
    13. System.out.println("Thread:"+Thread.currentThread().getName()+"Final"+ InheritThreadLocalDemo.threadlocalInteger.get());
    14. }

    }
    //Thread:Thread-0Parent Set 4396
    //Thread:Thread-0After Son Set 77777
    //Thread:mainFinal4396

    //Thread:Thread-0Parent Set 4396
    //Thread:Thread-0After Son Set 77777
    //Thread:Thread-0Parent Set parentThread
    //Thread:Thread-0After Son Set sonThread
    //Thread:mainFinal4396
    //Thread:mainParent Final Set sonThread

由上述可以得知: 1. 子线程继承父线程的InheritableThreadLocal中的值. 2. 子线程拥有自己的独立空间, 子线程更改后,父线程并不会受到影响. 3. 对于集合类型来说,父和子其实使用的是同一块空间.(个人理解这其实是和软拷贝有关系.)

继承后的同时,可以重写protected Object childValue(Object parentValue) 方法进行自定义配置.但个人不是特别建议这么使用.因为这会破坏使用的运行逻辑.

  • 注: ThreadLocal其实维护的是ThreadName - Value的一个HashMap,因为线程不具有重名的特性,所以并不会出现线程冲突的情况.

Tips

  • java.lang.IllegalMonitorStateException
    在使用wait()方法内获取对象的锁 / 当前线程无法获取某个对象的锁
    并发 错误 java.lang.IllegalMonitorStateException: current thread not owner 分析

Reference

[1]. 多线程中的join方法——源码分析
[2]. 一个优雅的threadLocal工具类
[3]. Java 多线程编程核心技术
[4]. Java并发编程的艺术

发表评论

表情:
评论列表 (有 0 条评论,403人围观)

还没有评论,来说两句吧...

相关阅读

    相关 Java线04——线通信

    1 线程通信机制 线程通信指的是不同线程之间可以交换一些实时的数据信息。 线程是操作系统中的独立个体,但这些个体如果不经过特殊处理就不能成为一个整体,线程间的通信就成为

    相关 线12/线通信

    线程通信的例子:使用两个线程打印1-100。线程1,线程2交替打印 前置知识: 1. wait(); 一旦执行此方法,当前线程就进入阻塞状态,并释放同步监视器。

    相关 Java 线-线通信

    最近,美美非常的爱吃栗子,剥栗子却有些麻烦,这个任务理所当然的交给了帅帅,每一次,帅帅都会把热气腾腾的栗子剥好,然后放进一个盘子里,而美美每次都会从盘子里拿一个栗子吃: !

    相关 线(3)- 线通信

    线程之间的通信: 多个线程在处理同一个资源,但是处理的动作(线程的任务)却不相同。通过一定的手段使各个线程能有效的利用资源。而这种手段即—— 等待唤醒机制。 等待唤醒机...