Java并发编程的艺术-Java并发编程基础

小鱼儿 2024-04-17 05:42 155阅读 0赞

第4章 Java并发编程基础

​ Java从诞生开始就明智地选择了内置对多线程的支持,这使得Java语言相比同一时期的其他语言具有明显的优势。线程作为操作系统调度的最小单元,多个线程能够同时执行,这将显著提升程序性能,在多核环境中表现得更加明显。但是,过多地创建线程和对线程的不当管理也容易造成问题。本章将着重介绍Java并发编程的基础知识,从启动一个线程到线程间不同的通信方式,最后通过简单的线程池示例以及应用(简单的Web服务器)来串联本章所介绍的内容。

4.1 线程简介

4.1.1 什么是线程

​ 现代操作系统在运行一个程序时,会为其创建一个进程。例如,启动一个Java程序,操作系统就会创建一个Java进程。现代操作系统调度的最小单元是线程,也叫轻量级进程(LightWeight Process),在一个进程里可以创建多个线程,这些线程都拥有各自的计数器、堆栈和局部变量等属性,并且能够访问共享的内存变量。处理器在这些线程上高速切换,让使用者感觉到这些线程在同时执行。
​ 一个Java程序从main()方法开始执行,然后按照既定的代码逻辑执行,看似没有其他线程参与,但实际上Java程序天生就是多线程程序,因为执行main()方法的是一个名称为main的线程。下面使用JMX来查看一个普通的Java程序包含哪些线程,如代码清单4-1所示。

  1. 代码清单4-1 MultiThread.java
  2. public class MultiThread{
  3. public static void main(String[] args) {
  4. // 获取Java线程管理MXBean
  5. ThreadMXBean threadMXBean = ManagementFactory.getThreadMXBean();
  6. // 不需要获取同步的monitor和synchronizer信息,仅获取线程和线程堆栈信息
  7. ThreadInfo[] threadInfos = threadMXBean.dumpAllThreads(false, false);
  8. // 遍历线程信息,仅打印线程ID和线程名称信息
  9. for (ThreadInfo threadInfo : threadInfos) {
  10. System.out.println("[" + threadInfo.getThreadId() + "] " + threadInfo.
  11. getThreadName());
  12. }
  13. }
  14. }

输出如下所示(输出内容可能不同):

  1. [4] Signal Dispatcher  // 分发处理发送给JVM信号的线程
  2. [3] Finalizer     // 调用对象finalize方法的线程
  3. [2] Reference Handler // 清除Reference的线程
  4. [1] main       // main线程,用户程序入口

​ 可以看到,Java程序的运行不仅仅是main()方法的运行,而是main线程和多个其他线程的同时运行。

4.1.2 为什么使用多线程

​ 执行一个简单的“Hello,World!”,却启动了那么多的“无关”线程,是不是把简单的问题复杂化了?当然不是,因为正确使用多线程,总是能够给开发人员带来显著的好处,而使用多线程的原因主要有以下几点。
​ (1)更多的处理器核心
​ 随着处理器上的核心数量越来越多,以及超线程技术的广泛运用,现在大多数计算机都比以往更加擅长并行计算,而处理器性能的提升方式,也从更高的主频向更多的核心发展。如何利用好处理器上的多个核心也成了现在的主要问题。线程是大多数操作系统调度的基本单元,一个程序作为一个进程来运行,程序运行过程中能够创建多个线程,而一个线程在一个时刻只能运行在一个处理器核心上。试想一下,一个单线程程序在运行时只能使用一个处理器核心,那么再多的处理器核心加入也无法显著提升该程序的执行效率。相反,如果该程序使用多线程技术,将计算逻辑分配到多个处理器核心上,就会显著减少程序的处理时间,并且随着更多处理器核心的加入而变得更有效率。
​ (2)更快的响应时间
​ 有时我们会编写一些较为复杂的代码(这里的复杂不是说复杂的算法,而是复杂的业务逻辑),例如,一笔订单的创建,它包括插入订单数据、生成订单快照、发送邮件通知卖家和记录货品销售数量等。用户从单击“订购”按钮开始,就要等待这些操作全部完成才能看到订购成功的结果。但是这么多业务操作,如何能够让其更快地完成呢?在上面的场景中,可以使用多线程技术,即将数据一致性不强的操作派发给其他线程处理(也可以使用消息队列),如生成订单快照、发送邮件等。这样做的好处是响应用户请求的线程能够尽可能快地处理完成,缩短了响应时间,提升了用户体验。
​ (3)更好的编程模型
​ Java为多线程编程提供了良好、考究并且一致的编程模型,使开发人员能够更加专注于问题的解决,即为所遇到的问题建立合适的模型,而不是绞尽脑汁地考虑如何将其多线程化。一旦开发人员建立好了模型,稍做修改总是能够方便地映射到Java提供的多线程编程模型上。

4.1.3 线程优先级

​ 现代操作系统基本采用时分的形式调度运行的线程,操作系统会分出一个个时间片,线程会分配到若干时间片,当线程的时间片用完了就会发生线程调度,并等待着下次分配。线程分配到的时间片多少也就决定了线程使用处理器资源的多少,而线程优先级就是决定线程需要多或者少分配一些处理器资源的线程属性。
​ 在Java线程中,通过一个整型成员变量priority来控制优先级,优先级的范围从1~10,在线程构建的时候可以通过setPriority(int)方法来修改优先级,默认优先级是5,优先级高的线程分配时间片的数量要多于优先级低的线程。设置线程优先级时,针对频繁阻塞(休眠或者I/O操作)的线程需要设置较高优先级,而偏重计算(需要较多CPU时间或者偏运算)的线程则设置较低的优先级,确保处理器不会被独占。在不同的JVM以及操作系统上,线程规划会存在差异,有些操作系统甚至会忽略对线程优先级的设定,示例如代码清单4-2所示。

  1. 代码清单4-2 Priority.java
  2. public class Priority {
  3. private static volatile boolean notStart = true;
  4. private static volatile boolean notEnd = true;
  5. public static void main(String[] args) throws Exception {
  6. List<Job> jobs = new ArrayList<Job>();
  7. for (int i = 0; i < 10; i++) {
  8. int priority = i < 5 Thread.MIN_PRIORITY : Thread.MAX_PRIORITY;
  9. Job job = new Job(priority);
  10. jobs.add(job);
  11. Thread thread = new Thread(job, "Thread:" + i);
  12. thread.setPriority(priority);
  13. thread.start();
  14. }
  15. notStart = false;
  16. TimeUnit.SECONDS.sleep(10);
  17. notEnd = false;
  18. for (Job job : jobs) {
  19. System.out.println("Job Priority : " + job.priority + ",
  20. Count : " + job.jobCount);
  21. }
  22. }
  23. static class Job implements Runnable {
  24. private int priority;
  25. private long jobCount;
  26. public Job(int priority) {
  27. this.priority = priority;
  28. }
  29. public void run() {
  30. while (notStart) {
  31. Thread.yield();
  32. }
  33. while (notEnd) {
  34. Thread.yield();
  35. jobCount++;
  36. }
  37. }
  38. }
  39. }

​ 运行该示例,在笔者机器上对应的输出如下。

  1. Job Priority : 1, Count : 1259592
  2. Job Priority : 1, Count : 1260717
  3. Job Priority : 1, Count : 1264510
  4. Job Priority : 1, Count : 1251897
  5. Job Priority : 1, Count : 1264060
  6. Job Priority : 10, Count : 1256938
  7. Job Priority : 10, Count : 1267663
  8. Job Priority : 10, Count : 1260637
  9. Job Priority : 10, Count : 1261705
  10. Job Priority : 10, Count : 1259967

​ 从输出可以看到线程优先级没有生效,优先级1和优先级10的Job计数的结果非常相近,没有明显差距。这表示程序正确性不能依赖线程的优先级高低。

注意:线程优先级不能作为程序正确性的依赖,因为操作系统可以完全不用理会Java线程对于优先级的设定。

4.1.4 线程的状态

​ 线程在运行的生命周期中可能处于表4-1所示的6种不同的状态,在给定的一个时刻,线程只能处于其中的一个状态。

​ 表4-1 Java线程的状态


































状态名称 说明
NEW 初始状态,线程被构建,还是还没调用start()方法
RUNNABLE 运行状态,Java线程将操作系统中的就绪和运行两种状态笼统地称作“运行中”
BLOCK 阻塞状态,表示线程阻塞于锁
WAITING 等待状态,表示线程进入等待状态,进入该状态表示当前线程需要等待其他线程做出一些特定动作(通知或中断)
TIME_WAITING 超时等待状态,该状态不同于WAITING,它是可以在指定的时间自行返回的
TERMINATED 终止状态,表示当前线程已经执行完毕

下面我们使用jstack工具(可以选择打开终端,键入jstack或者到JDK安装目录的bin目录下
执行命令),尝试查看示例代码运行时的线程信息,更加深入地理解线程状态,示例如代码清
单4-3所示。

  1. 代码清单4-3 ThreadState.java
  2. public class ThreadState {
  3. public static void main(String[] args) {
  4. new Thread(new TimeWaiting (), "TimeWaitingThread").start();
  5. new Thread(new Waiting(), "WaitingThread").start();
  6. // 使用两个Blocked线程,一个获取锁成功,另一个被阻塞
  7. new Thread(new Blocked(), "BlockedThread-1").start();
  8. new Thread(new Blocked(), "BlockedThread-2").start();
  9. }
  10. // 该线程不断地进行睡眠
  11. static class TimeWaiting implements Runnable {
  12. @Override
  13. public void run() {
  14. while (true) {
  15. SleepUtils.second(100);
  16. }
  17. }
  18. }
  19. // 该线程在Waiting.class实例上等待
  20. static class Waiting implements Runnable {
  21. @Override
  22. public void run() {
  23. while (true) {
  24. synchronized (Waiting.class) {
  25. try {
  26. Waiting.class.wait();
  27. } catch (InterruptedException e) {
  28. e.printStackTrace();
  29. }
  30. }
  31. }
  32. }
  33. }
  34. // 该线程在Blocked.class实例上加锁后,不会释放该锁
  35. static class Blocked implements Runnable {
  36. public void run() {
  37. synchronized (Blocked.class) {
  38. while (true) {
  39. SleepUtils.second(100);
  40. }
  41. }
  42. }
  43. }
  44. }

​ 上述示例中使用的SleepUtils如代码清单4-4所示。

  1. public class SleepUtils {
  2. public static final void second(long seconds) {
  3. try {
  4. TimeUnit.SECONDS.sleep(seconds);
  5. } catch (InterruptedException e) {
  6. }
  7. }
  8. }

​ 运行该示例,打开终端或者命令提示符,键入“jps”,输出如下。

  1. 611
  2. 935 Jps
  3. 929 ThreadState
  4. 270

​ 可以看到运行示例对应的进程ID是929,接着再键入“jstack 929”(这里的进程ID需要和读者自己键入jps得出的ID一致),部分输出如下所示。

  1. // BlockedThread-2线程阻塞在获取Blocked.class示例的锁上
  2. "BlockedThread-2" prio=5 tid=0x00007feacb05d000 nid=0x5d03 waiting for monitor
  3. entry [0x000000010fd58000]
  4. java.lang.Thread.State: BLOCKED (on object monitor)
  5. // BlockedThread-1线程获取到了Blocked.class的锁
  6. "BlockedThread-1" prio=5 tid=0x00007feacb05a000 nid=0x5b03 waiting on condition
  7. [0x000000010fc55000]
  8. java.lang.Thread.State: TIMED_WAITING (sleeping)
  9. // WaitingThread线程在Waiting实例上等待
  10. "WaitingThread" prio=5 tid=0x00007feacb059800 nid=0x5903 in Object.wait()
  11. [0x000000010fb52000]
  12. java.lang.Thread.State: WAITING (on object monitor)
  13. // TimeWaitingThread线程处于超时等待
  14. "TimeWaitingThread" prio=5 tid=0x00007feacb058800 nid=0x5703 waiting on condition
  15. [0x000000010fa4f000]
  16. java.lang.Thread.State: TIMED_WAITING (sleeping)

​ 通过示例,我们了解到Java程序运行中线程状态的具体含义。线程在自身的生命周期中,并不是固定地处于某个状态,而是随着代码的执行在不同的状态之间进行切换,Java线程状态变迁如图4-1示。

Java线程状态变迁

​ 由图4-1中可以看到,线程创建之后,调用start()方法开始运行。当线程执行wait()方法之后,线程进入等待状态。进入等待状态的线程需要依靠其他线程的通知才能够返回到运行状态,而超时等待状态相当于在等待状态的基础上增加了超时限制,也就是超时时间到达时将会返回到运行状态。当线程调用同步方法时,在没有获取到锁的情况下,线程将会进入到阻塞状态。线程在执行Runnable的run()方法之后将会进入到终止状态。

注意:Java将操作系统中的运行和就绪两个状态合并称为运行状态。阻塞状态是线程阻塞在进入synchronized关键字修饰的方法或代码块(获取锁)时的状态,但是阻塞在java.concurrent包中Lock接口的线程状态却是等待状态,因为java.concurrent包中Lock接口对于阻塞的实现均使用了LockSupport类中的相关方法。

4.1.5 Daemon 线程

​ Daemon线程是一种支持型线程,因为它主要被用作程序中后台调度以及支持性工作。这意味着,当一个Java虚拟机中不存在非Daemon线程的时候,Java虚拟机将会退出。可以通过调用Thread.setDaemon(true)将线程设置为Daemon线程。
注意:Daemon属性需要在启动线程之前设置,不能在启动线程之后设置。Daemon线程被用作完成支持性工作,但是在Java虚拟机退出时Daemon线程中的finally块并不一定会执行,示例如代码清单4-5所示。

  1. public class Daemon{
  2. public static void main(String[] args){
  3. Thread thread = new Thread(new DaemonRunner(),"DaemonRunner");
  4. thread.setDaemon(true);
  5. thread.start();
  6. }
  7. static class DaemonRunner implements Runnable{
  8. @Override
  9. public void run(){
  10. try{
  11. Thread.sleep(1000);
  12. }finally{
  13. system.out.println("DaemonThread finally run.");
  14. }
  15. }
  16. }
  17. }

​ 运行Daemon程序,可以看到在终端或者命令提示符上没有任何输出。main线程(非Daemon线程)在启动了线程DaemonRunner之后随着main方法执行完毕而终止,而此时Java虚拟机中已经没有非Daemon线程,虚拟机需要退出。Java虚拟机中的所有Daemon线程都需要立即终止,因此DaemonRunner立即终止,但是DaemonRunner中的finally块并没有执行。
注意:在构建Daemon线程时,不能依靠finally块中的内容来确保执行关闭或清理资源
的逻辑。

4.2 启动和终止线程

​ 在前面章节的示例中通过调用线程的start()方法进行启动,随着run()方法的执行完毕,线程也随之终止,大家对此一定不会陌生,下面将详细介绍线程的启动和终止。

4.2.1 构造线程

​ 在运行线程之前首先要构造一个线程对象,线程对象在构造的时候需要提供线程所需要的属性,如线程所属的线程组、线程优先级、是否是Daemon线程等信息。代码清单4-6所示的代码摘自java.lang.Thread中对线程进行初始化的部分。

  1. private void init(ThreadGroup g, Runnable target, String name,long stackSize,
  2. AccessControlContext acc) {
  3. if (name == null) {
  4. throw new NullPointerException("name cannot be null");
  5. }
  6. // 当前线程就是该线程的父线程
  7. Thread parent = currentThread();
  8. this.group = g;
  9. // 将daemon、priority属性设置为父线程的对应属性
  10. this.daemon = parent.isDaemon();
  11. this.priority = parent.getPriority();
  12. this.name = name.toCharArray();
  13. this.target = target;
  14. setPriority(priority);
  15. // 将父线程的InheritableThreadLocal复制过来
  16. if (parent.inheritableThreadLocals != null)
  17. this.inheritableThreadLocals=ThreadLocal.createInheritedMap(parent.
  18. inheritableThreadLocals);
  19. // 分配一个线程ID
  20. tid = nextThreadID();
  21. }

​ 在上述过程中,一个新构造的线程对象是由其parent线程来进行空间分配的,而child线程继承了parent是否为Daemon、优先级和加载资源的contextClassLoader以及可继承的ThreadLocal,同时还会分配一个唯一的ID来标识这个child线程。至此,一个能够运行的线程对象就初始化好了,在堆内存中等待着运行。

4.2.2 启动线程

​ 线程对象在初始化完成之后,调用start()方法就可以启动这个线程。线程start()方法的含义是:当前线程(即parent线程)同步告知Java虚拟机,只要线程规划器空闲,应立即启动调用start()方法的线程。
注意:启动一个线程前,最好为这个线程设置线程名称,因为这样在使用jstack分析程序或者进行问题排查时,就会给开发人员提供一些提示,自定义的线程最好能够起个名字。

4.2.3 理解中断

​ 中断可以理解为线程的一个标识位属性,它表示一个运行中的线程是否被其他线程进行了中断操作。中断好比其他线程对该线程打了个招呼,其他线程通过调用该线程的interrupt()方法对其进行中断操作。线程通过检查自身是否被中断来进行响应,线程通过方法isInterrupted()来进行判断是否被中断,也可以调用静态方法Thread.interrupted()对当前线程的中断标识位进行复位。如果该线程已经处于终结状态,即使该线程被中断过,在调用该线程对象的isInterrupted()时依旧会返回false。
​ 从Java的API中可以看到,许多声明抛出InterruptedException的方法(例如Thread.sleep(longmillis)方法)这些方法在抛出InterruptedException之前,Java虚拟机会先将该线程的中断标识位清除,然后抛出InterruptedException,此时调用isInterrupted()方法将会返回false。在代码清单4-7所示的例子中,首先创建了两个线程,SleepThread和BusyThread,前者不停地睡眠,后者一直运行,然后对这两个线程分别进行中断操作,观察二者的中断标识位。

  1. package com.howinfun.cloud;
  2. import java.util.concurrent.TimeUnit;
  3. public class Interrupted {
  4. public static void main(String[] args) throws Exception {
  5. // sleepThread不停的尝试睡眠
  6. Thread sleepThread = new Thread(new SleepRunner(), "SleepThread");
  7. sleepThread.setDaemon(true);
  8. // busyThread不停的运行
  9. Thread busyThread = new Thread(new BusyRunner(), "BusyThread");
  10. busyThread.setDaemon(true);
  11. sleepThread.start();
  12. busyThread.start();
  13. // 休眠5秒,让sleepThread和busyThread充分运行
  14. TimeUnit.SECONDS.sleep(5);
  15. sleepThread.interrupt();
  16. busyThread.interrupt();
  17. System.out.println("SleepThread interrupted is " + sleepThread.isInterrupted());
  18. System.out.println("BusyThread interrupted is " + busyThread.isInterrupted());
  19. // 防止sleepThread和busyThread立刻退出
  20. TimeUnit.SECONDS.sleep(2);
  21. }
  22. static class SleepRunner implements Runnable {
  23. @Override
  24. public void run() {
  25. while (true) {
  26. try {
  27. TimeUnit.SECONDS.sleep(10);
  28. } catch (InterruptedException e) {
  29. e.printStackTrace();
  30. }
  31. }
  32. }
  33. }
  34. static class BusyRunner implements Runnable {
  35. @Override
  36. public void run() {
  37. while (true) {
  38. }
  39. }
  40. }
  41. }

​ 输出如下:

  1. SleepThread interrupted is false
  2. BusyThread interrupted is true

​ 从结果可以看出,抛出InterruptedException的线程SleepThread,其中断标识位被清除了,而一直忙碌运作的线程BusyThread,中断标识位没有被清除。

4.2.4 过期的suspend()、resume()和stop()

​ 大家对于CD机肯定不会陌生,如果把它播放音乐比作一个线程的运作,那么对音乐播放做出的暂停、恢复和停止操作对应在线程Thread的API就是suspend()、resume()和stop()。
​ 在代码清单4-8所示的例子中,创建了一个线程PrintThread,它以1秒的频率进行打印,而主线程对其进行暂停、恢复和停止操作。

  1. public class Deprecated {
  2. public static void main(String[] args) throws Exception {
  3. DateFormat format = new SimpleDateFormat("HH:mm:ss");
  4. Thread printThread = new Thread(new Runner(), "PrintThread");
  5. printThread.setDaemon(true);
  6. printThread.start();
  7. TimeUnit.SECONDS.sleep(3);
  8. // 将PrintThread进行暂停,输出内容工作停止
  9. printThread.suspend();
  10. System.out.println("main suspend PrintThread at " + format.format(new Date()));
  11. TimeUnit.SECONDS.sleep(3);
  12. // 将PrintThread进行恢复,输出内容继续
  13. printThread.resume();
  14. System.out.println("main resume PrintThread at " + format.format(new Date()));
  15. TimeUnit.SECONDS.sleep(3);
  16. // 将PrintThread进行终止,输出内容停止
  17. printThread.stop();
  18. System.out.println("main stop PrintThread at " + format.format(new Date()));
  19. TimeUnit.SECONDS.sleep(3);
  20. }
  21. static class Runner implements Runnable {
  22. @Override
  23. public void run() {
  24. DateFormat format = new SimpleDateFormat("HH:mm:ss");
  25. while (true) {
  26. System.out.println(Thread.currentThread().getName() + " Run at " +
  27. format.format(new Date()));
  28. SleepUtils.second(1);
  29. }
  30. }
  31. }
  32. }

​ 输出如下:

  1. PrintThread Run at 17:34:36
  2. PrintThread Run at 17:34:37
  3. PrintThread Run at 17:34:38
  4. main suspend PrintThread at 17:34:39
  5. main resume PrintThread at 17:34:42
  6. PrintThread Run at 17:34:42
  7. PrintThread Run at 17:34:43
  8. PrintThread Run at 17:34:44
  9. main stop PrintThread at 17:34:45

​ 在执行过程中,PrintThread运行了3秒,随后被暂停,3秒后恢复,最后经过3秒被终止。通过示例的输出可以看到,suspend()、resume()和stop()方法完成了线程的暂停、恢复和终止工作,而且非常“人性化”。但是这些API是过期的,也就是不建议使用的。
​ 不建议使用的原因主要有:以suspend()方法为例,在调用后,线程不会释放已经占有的资源(比如锁),而是占有着资源进入睡眠状态,这样容易引发死锁问题。同样,stop()方法在终结一个线程时不会保证线程的资源正常释放,通常是没有给予线程完成资源释放工作的机会,因此会导致程序可能工作在不确定状态下。
注意:正因为suspend()、resume()和stop()方法带来的副作用,这些方法才被标注为不建议使用的过期方法,而暂停和恢复操作可以用后面提到的等待/通知机制来替代。

4.2.5 安全地终止线程

​ 在4.2.3节中提到的中断状态是线程的一个标识位,而中断操作是一种简便的线程间交互方式,而这种交互方式最适合用来取消或停止任务。除了中断以外,还可以利用一个boolean变量来控制是否需要停止任务并终止该线程。
​ 在代码清单4-9所示的例子中,创建了一个线程CountThread,它不断地进行变量累加,而主线程尝试对其进行中断操作和停止操作。

代码清单4-9 Shutdown.java

  1. public class Shutdown {
  2. public static void main(String[] args) throws Exception {
  3. Runner one = new Runner();
  4. Thread countThread = new Thread(one, "CountThread");
  5. countThread.start();
  6. // 睡眠1秒,main线程对CountThread进行中断,使CountThread能够感知中断而结束
  7. TimeUnit.SECONDS.sleep(1);
  8. countThread.interrupt();
  9. Runner two = new Runner();
  10. countThread = new Thread(two, "CountThread");
  11. countThread.start();
  12. // 睡眠1秒,main线程对Runner two进行取消,使CountThread能够感知on为false而结束
  13. TimeUnit.SECONDS.sleep(1);
  14. two.cancel();
  15. }
  16. private static class Runner implements Runnable {
  17. private long i;
  18. private volatile boolean on = true;
  19. @Override
  20. public void run() {
  21. while (on && !Thread.currentThread().isInterrupted()){
  22. i++;
  23. }
  24. System.out.println("Count i = " + i);
  25. }
  26. public void cancel() {
  27. on = false;
  28. }
  29. }
  30. }

​ 输出结果如下所示(输出内容可能不同)。

  1. Count i = 543487324
  2. Count i = 540898082

​ 示例在执行过程中,main线程通过中断操作和cancel()方法均可使CountThread得以终止。这种通过标识位或者中断操作的方式能够使线程在终止时有机会去清理资源,而不是武断地将线程停止,因此这种终止线程的做法显得更加安全和优雅。

4.3 线程间通信

​ 线程开始运行,拥有自己的栈空间,就如同一个脚本一样,按照既定的代码一步一步地执行,直到终止。但是,每个运行中的线程,如果仅仅是孤立地运行,那么没有一点儿价值,或者说价值很少,如果多个线程能够相互配合完成工作,这将会带来巨大的价值。

4.3.1 volatile和synchronized关键字

​ Java支持多个线程同时访问一个对象或者对象的成员变量,由于每个线程可以拥有这个变量的拷贝(虽然对象以及成员变量分配的内存是在共享内存中的,但是每个执行的线程还是可以拥有一份拷贝,这样做的目的是加速程序的执行,这是现代多核处理器的一个显著特性),所以程序在执行过程中,一个线程看到的变量并不一定是最新的。关键字volatile可以用来修饰字段(成员变量),就是告知程序任何对该变量的访问均需要从共享内存中获取,而对它的改变必须同步刷新回共享内存,它能保证所有线程对变量访问的可见性。
​ 举个例子,定义一个表示程序是否运行的成员变量boolean on=true,那么另一个线程可能对它执行关闭动作(on=false),这里涉及多个线程对变量的访问,因此需要将其定义成为volatile boolean on=true,这样其他线程对它进行改变时,可以让所有线程感知到变化,因为所有对on变量的访问和修改都需要以共享内存为准。但是,过多地使用volatile是不必要的,因为它会降低程序执行的效率。
关键字synchronized可以修饰方法或者以同步块的形式来进行使用,它主要确保多个线程在同一个时刻,只能有一个线程处于方法或者同步块中,它保证了线程对变量访问的可见性和排他性。
​ 在代码清单4-10所示的例子中,使用了同步块和同步方法,通过使用javap工具查看生成的class文件信息来分析synchronized关键字的实现细节,示例如下。

  1. public class Synchronized {
  2. public static void main(String[] args) {
  3. // 对Synchronized Class对象进行加锁
  4. synchronized (Synchronized.class) {
  5. }
  6. // 静态同步方法,对Synchronized Class对象进行加锁
  7. m();
  8. }
  9. public static synchronized void m() {
  10. }
  11. }

​ 在Synchronized.class同级目录执行javap–v Synchronized.class,部分相关输出如下所示:

  1. public static void main(java.lang.String[]);
  2. // 方法修饰符,表示:public staticflags: ACC_PUBLIC, ACC_STATIC
  3. Code:
  4. stack=2, locals=1, args_size=1
  5. 0: ldc #1  // class com/murdock/books/multithread/book/Synchronized
  6. 2: dup
  7. 3: monitorenter  // monitorenter:监视器进入,获取锁
  8. 4: monitorexit   // monitorexit:监视器退出,释放锁
  9. 5: invokestatic  #16 // Method m:()V
  10. 8: return
  11. public static synchronized void m();
  12. // 方法修饰符,表示: public static synchronized
  13. flags: ACC_PUBLIC, ACC_STATIC, ACC_SYNCHRONIZED
  14. Code:
  15. stack=0, locals=0, args_size=0
  16. 0: return

​ **从上面class信息中,对于同步块的实现使用了monitorenter和monitorexit指令,而同步方法则是依靠方法修饰符上的ACC_SYNCHRONIZED来完成的。无论采用哪种方式,其本质是对一个对象的监视器(monitor)进行获取,而这个获取过程是排他的,也就是同一时刻只能有一个线程获取到有synchronized锁保护对象的监视器。

​ 任意一个对象都拥有自己的监视器,当这个对象由同步块或者这个对象的同步方法调用时,执行方法的线程必须先获取到该对象的监视器才能进入同步块或者同步方法,而没有获取到监视器(执行该方法)的线程将会被阻塞在同步块和同步方法的入口处,进入BLOCKED状态。

​ 图4-2描述了对象、对象的监视器、同步队列和执行线程之间的关系。

对象、对象的监视器、同步队列和执行线程之间的关系

​ 从图4-2中可以看到,任意线程对Object(Object由synchronized保护)的访问,首先要获得Object的监视器。如果获取失败,线程进入同步队列,线程状态变为BLOCKED。当访问Object的前驱(获得了锁的线程)释放了锁,则该释放操作唤醒阻塞在同步队列中的线程,使其重新尝试对监视器的获取。

4.3.2 等待/通知机制

​ 一个线程修改了一个对象的值,而另一个线程感知到了变化,然后进行相应的操作,整个过程开始于一个线程,而最终执行又是另一个线程。前者是生产者,后者就是消费者,这种模式隔离了“做什么”(what)和“怎么做”(How),在功能层面上实现了解耦,体系结构上具备了良好的伸缩性,但是在Java语言中如何实现类似的功能呢?
​ 简单的办法是让消费者线程不断地循环检查变量是否符合预期,如下面代码所示,在while循环中设置不满足的条件,如果条件满足则退出while循环,从而完成消费者的工作。

  1. while (value != desire) {
  2. Thread.sleep(1000);
  3. }
  4. doSomething();

​ 上面这段伪代码在条件不满足时就睡眠一段时间,这样做的目的是防止过快的“无效”尝试,这种方式看似能够解实现所需的功能,但是却存在如下问题。
​ 1)难以确保及时性。在睡眠时,基本不消耗处理器资源,但是如果睡得过久,就不能及时发现条件已经变化,也就是及时性难以保证。
​ 2)难以降低开销。如果降低睡眠的时间,比如休眠1毫秒,这样消费者能更加迅速地发现条件变化,但是却可能消耗更多的处理器资源,造成了无端的浪费。
​ 以上两个问题,看似矛盾难以调和,但是Java通过内置的等待/通知机制能够很好地解决这个矛盾并实现所需的功能。
​ 等待/通知的相关方法是任意Java对象都具备的,因为这些方法被定义在所有对象的超类java.lang.Object上,方法和描述如表4-2所示。
​ 表4-2 等待/通知的相关方法






























方法名称 描述
notify() 通知一个在对象上等待的线程,使其从wait()返回,而返回的前提是该线程获取了对象的锁
notifyAll() 通知所有等待在该对象上的线程
wait() 调用该方法的线程进入WAITING状态,只有等待另外线程的通知或者被中断才会返回,需要注意,调用wait()方法后,会释放对象的锁
wait(long) 超时等待一段时间,这里的参数时间是毫秒,也就是等待长大n毫秒,如果没有通知就超时返回
wait(long,int) 对于超时时间更细粒度的控制,可以达到纳秒

​ 等待/通知机制,是指一个线程A调用了对象O的wait()方法进入等待状态,而另一个线程B调用了对象O的notify()或者notifyAll()方法,线程A收到通知后从对象O的wait()方法返回,进而执行后续操作。上述两个线程通过对象O来完成交互,而对象上的wait()和notify/notifyAll()的关系就如同开关信号一样,用来完成等待方和通知方之间的交互工作。
​ 在代码清单4-11所示的例子中,创建了两个线程——WaitThread和NotifyThread,前者检查flag值是否为false,如果符合要求,进行后续操作,否则在lock上等待,后者在睡眠了一段时间后对lock进行通知,示例如下所示。

  1. public class WaitNotify {
  2. static boolean flag = true;
  3. static Object lock = new Object();
  4. public static void main(String[] args) throws Exception {
  5. Thread waitThread = new Thread(new Wait(), "WaitThread");
  6. waitThread.start();
  7. TimeUnit.SECONDS.sleep(1);
  8. Thread notifyThread = new Thread(new Notify(), "NotifyThread");
  9. notifyThread.start();
  10. }
  11. static class Wait implements Runnable {
  12. public void run() {
  13. // 加锁,拥有lock的Monitor
  14. synchronized (lock) {
  15. // 当条件不满足时,继续wait,同时释放了lock的锁
  16. while (flag) {
  17. try {
  18. System.out.println(Thread.currentThread() + " flag is true. wait
  19. @ " + new SimpleDateFormat("HH:mm:ss").format(new Date()));
  20. lock.wait();
  21. } catch (InterruptedException e) {
  22. }
  23. }
  24. // 条件满足时,完成工作
  25. System.out.println(Thread.currentThread() + " flag is false. running
  26. @ " + new SimpleDateFormat("HH:mm:ss").format(new Date()));
  27. }
  28. }
  29. }
  30. static class Notify implements Runnable {
  31. public void run() {
  32. // 加锁,拥有lock的Monitor
  33. synchronized (lock) {
  34. // 获取lock的锁,然后进行通知,通知时不会释放lock的锁,
  35. // 直到当前线程释放了lock后,WaitThread才能从wait方法中返回
  36. System.out.println(Thread.currentThread() + " hold lock. notify @ " +
  37. new SimpleDateFormat("HH:mm:ss").format(new Date()));
  38. lock.notifyAll();
  39. flag = false;
  40. SleepUtils.second(5);
  41. }
  42. // 再次加锁
  43. synchronized (lock) {
  44. System.out.println(Thread.currentThread() + " hold lock again. sleep
  45. @ " + new SimpleDateFormat("HH:mm:ss").format(new Date()));
  46. SleepUtils.second(5);
  47. }
  48. }
  49. }
  50. }

​ 输出如下(输出内容可能不同,主要区别在时间上)。

  1. Thread[WaitThread,5,main] flag is true. wait @ 22:23:03
  2. Thread[NotifyThread,5,main] hold lock. notify @ 22:23:04
  3. Thread[NotifyThread,5,main] hold lock again. sleep @ 22:23:09
  4. Thread[WaitThread,5,main] flag is false. running @ 22:23:14

​ 上述第3行和第4行输出的顺序可能会互换,而上述例子主要说明了调用wait()、notify()以及notifyAll()时需要注意的细节,如下。
​ 1)使用wait()、notify()和notifyAll()时需要先对调用对象加锁。
​ 2)调用wait()方法后,线程状态由RUNNING变为WAITING,并将当前线程放置到对象的等待队列。
​ 3)notify()或notifyAll()方法调用后,等待线程依旧不会从wait()返回,需要调用notify()或notifAll()的线程释放锁之后,等待线程才有机会从wait()返回。
​ 4)notify()方法将等待队列中的一个等待线程从等待队列中移到同步队列中,而notifyAll()方法则是将等待队列中所有的线程全部移到同步队列,被移动的线程状态由WAITING变为BLOCKED。
​ 5)从wait()方法返回的前提是获得了调用对象的锁。
从上述细节中可以看到,等待/通知机制依托于同步机制,其目的就是确保等待线程从wait()方法返回时能够感知到通知线程对变量做出的修改。
​ 图4-3描述了上述示例的过程。

WaitNotify.java运行过程

​ 在图4-3中,WaitThread首先获取了对象的锁,然后调用对象的wait()方法,从而放弃了锁并进入了对象的等待队列WaitQueue中,进入等待状态。由于WaitThread释放了对象的锁,NotifyThread随后获取了对象的锁,并调用对象的notify()方法,将WaitThread从WaitQueue移到SynchronizedQueue中,此时WaitThread的状态变为阻塞状态。NotifyThread释放了锁之后,WaitThread再次获取到锁并从wait()方法返回继续执行。

4.3.3 等待/通知的经典范式

​ 从4.3.2节中的WaitNotify示例中可以提炼出等待/通知的经典范式,该范式分为两部分,分别针对等待方(消费者)和通知方(生产者)。
​ 等待方遵循如下原则。
​ 1)获取对象的锁。
​ 2)如果条件不满足,那么调用对象的wait()方法,被通知后仍要检查条件。
​ 3)条件满足则执行对应的逻辑。
​ 对应的伪代码如下。

  1. synchronized(对象) {
  2. while(条件不满足) {
  3. 对象.wait();
  4. }
  5. 对应的处理逻辑
  6. }

​ 通知方遵循如下原则。
​ 1)获得对象的锁。
​ 2)改变条件。
​ 3)通知所有等待在对象上的线程。
​ 对应的伪代码如下。

  1. synchronized(对象) {
  2. 改变条件
  3. 对象.notifyAll();
  4. }

4.3.4 管道输入/输出流

​ 管道输入/输出流和普通的文件输入/输出流或者网络输入/输出流不同之处在于,它主要用于线程之间的数据传输,而传输的媒介为内存。
​ 管道输入/输出流主要包括了如下4种具体实现:PipedOutputStream、PipedInputStream、PipedReader和PipedWriter,前两种面向字节,而后两种面向字符。

​ 对于Piped类型的流,必须先要进行绑定【out.connect(in)】,也就是调用connect()方法,如果没有将输入/输出流绑定起来,对于该流的访问将会抛出异常。

4.3.5 Thread.join()的使用

​ 如果一个线程A执行了thread.join()语句,其含义是:当前线程A等待thread线程终止之后才从thread.join()返回。线程Thread除了提供join()方法之外,还提供了join(long millis)和join(longmillis,int nanos)两个具备超时特性的方法。这两个超时方法表示,如果线程thread在给定的超时时间里没有终止,那么将会从该超时方法中返回。
​ 在代码清单4-13所示的例子中,创建了10个线程,编号0~9,每个线程调用前一个线程的join()方法,也就是线程0结束了,线程1才能从join()方法中返回,而线程0需要等待main线程结束。

  1. package com.howinfun.cloud;
  2. /** * @author Howinfun * @desc Thread -> join() * @date 2019/8/16 */
  3. public class ThreadJoin {
  4. public static void main(String[] args) {
  5. // 第一次循环,previous就是main
  6. Thread previous = Thread.currentThread();
  7. for (int i = 0; i < 10; i++) {
  8. Thread thread = new Thread(new Runner(previous,String.valueOf(i)));
  9. thread.start();
  10. // 将当前线程设置为下个线程的previous
  11. previous = thread;
  12. }
  13. System.out.println("main 线程结束");
  14. }
  15. static class Runner implements Runnable{
  16. private Thread previous;
  17. private String name;
  18. public Runner(Thread previous,String name){
  19. this.previous = previous;
  20. this.name = name;
  21. }
  22. @Override
  23. public void run() {
  24. try {
  25. // 调用previous的join方法
  26. this.previous.join();
  27. System.out.println(name+" 线程结束");
  28. } catch (InterruptedException e) {
  29. e.printStackTrace();
  30. }
  31. }
  32. }
  33. }

输出如下:

  1. main 线程结束
  2. 0 线程结束
  3. 1 线程结束
  4. 2 线程结束
  5. 3 线程结束
  6. 4 线程结束
  7. 5 线程结束
  8. 6 线程结束
  9. 7 线程结束
  10. 8 线程结束
  11. 9 线程结束
  12. Process finished with exit code 0

​ 从上述输出可以看到,每个线程终止的前提是前驱线程的终止,每个线程等待前驱线程终止后,才从join()方法回来继续往下执行,这里涉及了等待/通知机制(等待前驱线程结束,接受前驱线程结束通知)。

​ 看一下JDK中Thread.join()方法的部分源码:

  1. /** * Waits at most {@code millis} milliseconds for this thread to * die. A timeout of {@code 0} means to wait forever. * * <p> This implementation uses a loop of {@code this.wait} calls * conditioned on {@code this.isAlive}. As a thread terminates the * {@code this.notifyAll} method is invoked. It is recommended that * applications not use {@code wait}, {@code notify}, or * {@code notifyAll} on {@code Thread} instances. * * @param millis * the time to wait in milliseconds * * @throws IllegalArgumentException * if the value of {@code millis} is negative * * @throws InterruptedException * if any thread has interrupted the current thread. The * <i>interrupted status</i> of the current thread is * cleared when this exception is thrown. */
  2. public final synchronized void join(long millis)
  3. throws InterruptedException {
  4. long base = System.currentTimeMillis();
  5. long now = 0;
  6. if (millis < 0) {
  7. throw new IllegalArgumentException("timeout value is negative");
  8. }
  9. if (millis == 0) {
  10. // 如果前驱线程是alive,调用自身wait方法
  11. while (isAlive()) {
  12. wait(0);
  13. }
  14. } else {
  15. while (isAlive()) {
  16. long delay = millis - now;
  17. if (delay <= 0) {
  18. break;
  19. }
  20. wait(delay);
  21. now = System.currentTimeMillis() - base;
  22. }
  23. }
  24. }

​ 看到源码的注释,我们可以知道,当线程终止时,会调用线程自身的notifyAll()方法,会通知所有等待在该线程对象上的线程。可以看到join()方法的逻辑结构与4.3.3节中描述的等待/通知经典范式一致,即加锁、循环
和处理逻辑3个步骤。

4.3.6 ThreadLocal的使用

​ ThreadLocal,即线程变量,是一个以ThreadLocal对象为键、任意对象为值的存储结构。这个结构被附带在线程上,也就是说一个线程可以根据一个ThreadLocal对象查询到绑定在这个线程上的一个值。

​ 小例子:

  1. /** * 获取当前线程操作对象 * * @author stephen * @since 2019/3/28 **/
  2. public class SecurityUser {
  3. private static final ThreadLocal<AuthCarUser> store = new ThreadLocal<>();
  4. // 设置当前线程的用户
  5. public static void setCurrentUser(AuthCarUser authCarUser) {
  6. if (Objects.isNull(authCarUser)) {
  7. throw new RuntimeException(SecurityUser.class + "不能添加空对象");
  8. }
  9. store.set(authCarUser);
  10. }
  11. // 获取当前线程的用户
  12. public static AuthCarUser getCurrentUser() {
  13. AuthCarUser authCarUser = store.get();
  14. if (Objects.isNull(authCarUser)) {
  15. throw new RuntimeException("当前没有操作对象");
  16. }
  17. return authCarUser;
  18. }
  19. public static void clear(){
  20. store.remove();
  21. }
  22. }

​ 看一下关于ThreadLocal的源码:

  1. // ThreadLocal设置值
  2. public void set(T value) {
  3. Thread t = Thread.currentThread();
  4. ThreadLocalMap map = getMap(t);
  5. if (map != null)
  6. map.set(this, value);
  7. else
  8. createMap(t, value);
  9. }
  10. // ThreadLocal获取值
  11. public T get() {
  12. Thread t = Thread.currentThread();
  13. ThreadLocalMap map = getMap(t);
  14. if (map != null) {
  15. ThreadLocalMap.Entry e = map.getEntry(this);
  16. if (e != null) {
  17. @SuppressWarnings("unchecked")
  18. T result = (T)e.value;
  19. return result;
  20. }
  21. }
  22. return setInitialValue();
  23. }
  24. // ThreadLoaclMap是Thread的一个成员变量
  25. ThreadLocal.ThreadLocalMap threadLocals = null;

​ 所以说,其实ThreadLoca操作的就是当前线程的TreadLocalMap成员变量。

4.4 线程应用实例

4.4.1 等待超时模式

​ 开发人员经常会遇到这样的方法调用场景:调用一个方法时等待一段时间(一般来说是给一个时间段),如果该方法能够在给定的时间段之内得到结果,那么将结果立刻返回,反之,超时返回默认结果。
​ 前面的章节介绍了等待/通知的经典范式,即加锁、条件循环和处理逻辑3个步骤,而这种范式无法做到超时等待。而超时等待的加入,只需要对经典范式做出非常小的改动,改动内容如下所示。
​ 假设超时时间段是T,那么可以推断出在当前时间now+T之后就会超时。
​ 定义如下变量。
​ ·等待持续时间:REMAINING=T。
​ ·超时时间:FUTURE=now+T。
​ 这时仅需要wait(REMAINING)即可,在wait(REMAINING)返回之后会将执行:REMAINING=FUTURE–now。如果REMAINING小于等于0,表示已经超时,直接退出,否则将继续执行wait(REMAINING)。
​ 上述描述等待超时模式的伪代码如下。

  1. // 对当前对象加锁
  2. public synchronized Object get(long mills) throws InterruptedException {
  3. long future = System.currentTimeMillis() + mills;
  4. long remaining = mills;
  5. // 当超时大于0并且result返回值不满足要求
  6. while ((result == null) && remaining > 0) {
  7. wait(remaining);
  8. remaining = future - System.currentTimeMillis();
  9. }
  10. return result;
  11. }

​ 可以看出,等待超时模式就是在等待/通知范式基础上增加了超时控制,这使得该模式相比原有范式更具有灵活性,因为即使方法执行时间过长,也不会“永久”阻塞调用者,而是会按照调用者的要求“按时”返回。

4.4.2 一个简单的数据库连接池示例

​ // TODO

4.4.3 线程池技术及其示例

​ 对于服务端的程序,经常面对的是客户端传入的短小(执行时间短、工作内容较为单一)任务,需要服务端快速处理并返回结果。如果服务端每次接受到一个任务,创建一个线程,然后进行执行,这在原型阶段是个不错的选择,但是面对成千上万的任务递交进服务器时,如果还是采用一个任务一个线程的方式,那么将会创建数以万记的线程,这不是一个好的选择。因为这会使操作系统频繁的进行线程上下文切换,无故增加系统的负载,而线程的创建和消亡都是需要耗费系统资源的,也无疑浪费了系统资源。
​ 线程池技术能够很好地解决这个问题,它预先创建了若干数量的线程,并且不能由用户直接对线程的创建进行控制,在这个前提下重复使用固定或较为固定数目的线程来完成任务的执行。这样做的好处是,一方面,消除了频繁创建和消亡线程的系统资源开销,另一方面,面对过量任务的提交能够平缓的劣化。

​ 示例 //TODO

​ 线程池的本质就是使用了一个线程安全的工作队列连接工作者线程和客户端线程,客户端线程将任务放入工作队列后便返回,而工作者线程则不断地从工作队列上取出工作并执行。当工作队列为空时,所有的工作者线程均等待在工作队列上,当有客户端提交了一个任务之后会通知任意一个工作者线程,随着大量的任务被提交,更多的工作者线程会被唤醒。

4.4.4 一个基于线程池技术的简单Web服务器

​ // TODO

4.5 本章小结

​ 本章从介绍多线程技术带来的好处开始,讲述了如何启动和终止线程以及线程的状态,详细阐述了多线程之间进行通信的基本方式和等待/通知经典范式。在线程应用示例中,使用了等待超时、数据库连接池以及简单线程池3个不同的示例巩固本章前面章节所介绍的Java多线程基础知识。最后通过一个简单的Web服务器将上述知识点串联起来,加深我们对这些知识点的理解。

发表评论

表情:
评论列表 (有 0 条评论,155人围观)

还没有评论,来说两句吧...

相关阅读

    相关 Java并发编程艺术

         什么是上下文切换: 上下文切换 即使是单核处理器也支持多线程执行代码,CPU通过给每个线程分配CPU时间片来实现 这个机制。时间片是CPU分配给各个线程的时间