线程池ThreadPoolExecutor源码剖析

刺骨的言语ヽ痛彻心扉 2023-10-08 18:01 130阅读 0赞

一、Java构建线程的方式

  • 继承Thread (也实现了Runnable)
    请添加图片描述
  • 实现Runnable
  • 实现Callable (与Runnable区别…)
  • 线程池方式 (Java提供了构建线程池的方式)[可以实现Runnable 和 Callable 功能]

    • Java提供了Executors可以去创建(规范中不允许使用这种方式创建线程池,这种方式对线程的控制粒度比较低)
    • 推荐手动创建线程池 ThreadPoolExecutor

二、线程池的7个参数

  1. public ThreadPoolExecutor(int corePoolSize, // 核心线程数
  2. int maximumPoolSize, // 最大线程数
  3. long keepAliveTime, // 最大空闲时间
  4. TimeUnit unit, // 时间单位
  5. BlockingQueue<Runnable> workQueue, // 阻塞队列
  6. ThreadFactory threadFactory, // 线程工厂
  7. RejectedExecutionHandler handler) {
  8. // 拒绝策略
  9. }

三、线程池的执行流程















线程池执行流程
[外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传(img-ygpetg5E-1676901244832)(C:\Users\b\AppData\Roaming\Typora\typora-user-images\image-20230220132010848.png)]

为什么要先进阻塞再去尝试创建非核心线程:

饭店(线程池)— 厨子(线程)— 人多先排队(阻塞队列)— 招厨子I(创建最大线程数) – 今日客满(拒绝)

四、线程池属性标识

4.1 核心属性

ThreadPoolExecutor 中的属性(核心成员变量)标识

学习课程 : https://www.bilibili.com/video/BV1244y1n7bz/?p=4&spm_id_from=pageDriver&vd_source=c81fe4418bb0d2f341abd89cbfa157aahttps://www.bilibili.com/video/BV1244y1n7bz/?p=4&spm_id_from=pageDriver&vd_source=c81fe4418bb0d2f341abd89cbfa157aa

  1. // 是一个int类型的数值,表达了两个意思,1:声明当前线程池的状态,2:声明线程池中的线程数
  2. // 高3位是:线程池状态 低29位是:线程池中的线程个数
  3. private final AtomicInteger ctl = new AtomicInteger(ctlOf(RUNNING, 0));
  4. private static final int COUNT_BITS = Integer.SIZE - 3; // 29,方便后面做位运算
  5. private static final int CAPACITY = (1 << COUNT_BITS) - 1; // 通过为运 算得出最大容量
  6. // 线程池状态
  7. // runState is stored in the high-order bits
  8. private static final int RUNNING = -1 << COUNT_BITS; // 111 代表线程池为RUNNING, 代表正常接收任务
  9. private static final int SHUTDOWN = 0 << COUNT_BITS; // 000 代表线程池为SHUTDOWN状态,不接收新任务,但是内部还会处理阻塞队列中的任务,正在进行的任务也正常处理
  10. private static final int STOP = 1 << COUNT_BITS; // O01 代表线程池为STOP状态,不接收新任务,也不去处理阻塞队列中的任务,同时会中断正在执行的任务
  11. private static final int TIDYING = 2 << COUNT_BITS; // 010 代表线程池为TIDYING状态,过渡的状态,代表当前线程池即将Game Over
  12. private static final int TERMINATED = 3 << COUNT_BITS; // O11 代表线程池为TERMINATED, 要执行terminated(), 真的凉凉了
  13. // Packing and unpacking ctl
  14. private static int runStateOf(int c) {
  15. return c & ~CAPACITY; } // 得到线程池的状态
  16. private static int workerCountOf(int c) {
  17. return c & CAPACITY; } // 得到当前线程池的线程数量
  18. private static int ctlOf(int rs, int wc) {
  19. return rs | wc; }
  20. // 想对位移更掌握,看雪花算法,达到手写的能力,位移方向和各种二进制运算就没问题了。
4.2 线程池状态变化














线程池状态变化
[外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传(img-i70mMtI0-1676901244832)(C:\Users\b\AppData\Roaming\Typora\typora-user-images\image-20230220145338221.png)]

五、线程池的execute方法执行

5.1 从execute方法开始
  1. public void execute(Runnable command) {
  2. // 健壮性判断
  3. if (command == null)
  4. throw new NullPointerException();
  5. /*
  6. * Proceed in 3 steps:
  7. *
  8. * 1. If fewer than corePoolSize threads are running, try to
  9. * start a new thread with the given command as its first
  10. * task. The call to addWorker atomically checks runState and
  11. * workerCount, and so prevents false alarms that would add
  12. * threads when it shouldn't, by returning false.
  13. *
  14. * 2. If a task can be successfully queued, then we still need
  15. * to double-check whether we should have added a thread
  16. * (because existing ones died since last checking) or that
  17. * the pool shut down since entry into this method. So we
  18. * recheck state and if necessary roll back the enqueuing if
  19. * stopped, or start a new thread if there are none.
  20. *
  21. * 3. If we cannot queue task, then we try to add a new
  22. * thread. If it fails, we know we are shut down or saturated
  23. * and so reject the task.
  24. */
  25. // 拿到32位的int
  26. int c = ctl.get();
  27. // 获取 工作线程数 < 核心线程数
  28. if (workerCountOf(c) < corePoolSize) {
  29. // 进到if,代表可以创建 核心 线程数
  30. if (addWorker(command, true))
  31. // 到这结束
  32. return;
  33. // 如果if没进去,代表创建核心线程数失败,重新获取ct1
  34. c = ctl.get();
  35. }
  36. // 判断线程池是不是RUNNING,将任务添加到阻塞队列中
  37. if (isRunning(c) && workQueue.offer(command)) {
  38. // 再次获取ct1
  39. int recheck = ctl.get();
  40. // 再次判断是否是RUNNUING, 如果不是RUNNING,移除任务
  41. if (! isRunning(recheck) && remove(command))
  42. reject(command); // 拒绝策略
  43. else if (workerCountOf(recheck) == 0) // 如果线程池处在RUNNING状态,BUT工作线程为0
  44. addWorker(null, false); // 阻塞队列有任务,但是没有工作线程,添加一个任务为空的工作线程处理阻塞队列中的任务
  45. }
  46. // 创建非核心线程,处理任务
  47. else if (!addWorker(command, false))
  48. reject(command); // 拒绝策略
  49. }
5.2 通过上述源码,掌握了线程池的执行流程,再次查看addWorker方法内部做了什么处理
  1. private boolean addWorker(Runnable firstTask, boolean core) {
  2. retry:
  3. // 经过大量的判断,给工作线程数标识+1,
  4. for (;;) {
  5. // 获取ct1,(32位)
  6. int c = ctl.get();
  7. // 获取线程池状态
  8. int rs = runStateOf(c);
  9. // 除了RUNNING都有可能
  10. // Check if queue empty only if necessary.
  11. if (rs >= SHUTDOWN &&
  12. ! (rs == SHUTDOWN && firstTask == null && ! workQueue.isEmpty())
  13. // rs == SHUTDOWN,如果不是SHUTDOWN,就代表是STOP或者跟高的状态,这时,不需要添加线程处理任务
  14. // 任务为空,如果任务为null, 并且线程池状态不是RUNNING,不需要处理
  15. // 阻塞队列不为nu11,如果阻塞队列为空,返回false,.外侧的!再次取反,获取true,不需要处理
  16. )
  17. // 构建工作线程失败!
  18. return false;
  19. for (;;) {
  20. // 获取工作线程个数
  21. int wc = workerCountOf(c);
  22. // 如果当前线程已经大于线程池最大容量,不去创建了
  23. // 判断wC是否超过核心线程或者最大线程
  24. if (wc >= CAPACITY ||
  25. wc >= (core ? corePoolSize : maximumPoolSize))
  26. // 构建工作线程失败!
  27. return false;
  28. //将工作线程数+1,采用CAS的方式
  29. if (compareAndIncrementWorkerCount(c))
  30. // 成功就退出外侧for循环
  31. break retry;
  32. // 重新获取ct1
  33. c = ctl.get(); // Re-read ctl
  34. // 重新判断线程池状态,如果有变化 如果状态没变化,重新执行内部循环即可
  35. if (runStateOf(c) != rs)
  36. // 结束这次外侧循环,开始下次外侧循环
  37. continue retry;
  38. // else CAS failed due to workerCount change; retry inner loop
  39. }
  40. }
  41. // worker开始 = false
  42. boolean workerStarted = false;
  43. // worker添加 = false
  44. boolean workerAdded = false;
  45. // Worker就是工作线程
  46. Worker w = null;
  47. try {
  48. // 创建Worker,传入任务
  49. w = new Worker(firstTask);
  50. // 从Worker中获取线程t
  51. final Thread t = w.thread;
  52. // 如果线程t不为nul1
  53. if (t != null) {
  54. // 获取线程池的全局锁,避免我添加任务时,其他线程干掉了线程池,干掉线程池需要先获取这个锁
  55. final ReentrantLock mainLock = this.mainLock;
  56. mainLock.lock(); // 加锁
  57. try {
  58. // Recheck while holding lock.
  59. // Back out on ThreadFactory failure or if
  60. // shut down before lock acquired.
  61. // 获取线程池状态
  62. int rs = runStateOf(ctl.get());
  63. // 是RUNNING状态
  64. if (rs < SHUTDOWN ||
  65. (rs == SHUTDOWN && firstTask == null)) {
  66. // 是SHUTDOV状态,创建空任务工作线程,处理阻塞队列中的任务
  67. // 线程是否是运行状态
  68. if (t.isAlive()) // precheck that t is startable
  69. throw new IllegalThreadStateException();
  70. // 将工作线程添加到集合中
  71. workers.add(w);
  72. // 获取工作线程个数
  73. int s = workers.size();
  74. // 如果线程工作线程数,大于之前记录的最大工作线程数,就替换一下
  75. if (s > largestPoolSize)
  76. largestPoolSize = s;
  77. // workerAdded为true,添加工作线程成功
  78. workerAdded = true;
  79. }
  80. } finally {
  81. // 释放锁
  82. mainLock.unlock();
  83. }
  84. if (workerAdded) {
  85. // 启动工作线程
  86. t.start();
  87. // 启动工作线程成功
  88. workerStarted = true;
  89. }
  90. }
  91. } finally {
  92. // 如果启动工作线程失败,调用下面方法
  93. if (! workerStarted)
  94. addWorkerFailed(w);
  95. }
  96. return workerStarted; // 返回工作是否启动
  97. }

六、Worker的封装

Worker的封装

  1. private final class Worker
  2. extends AbstractQueuedSynchronizer
  3. implements Runnable
  4. {
  5. /**
  6. * This class will never be serialized, but we provide a
  7. * serialVersionUID to suppress a javac warning.
  8. */
  9. private static final long serialVersionUID = 6138294804551838833L;
  10. /** Thread this worker is running in. Null if factory fails. */
  11. final Thread thread;
  12. /** Initial task to run. Possibly null. */
  13. Runnable firstTask;
  14. /** Per-thread task counter */
  15. volatile long completedTasks;
  16. /**
  17. * Creates with given first task and thread from ThreadFactory.
  18. * @param firstTask the first task (null if none)
  19. */
  20. Worker(Runnable firstTask) {
  21. setState(-1); // inhibit interrupts until runWorker
  22. this.firstTask = firstTask;
  23. this.thread = getThreadFactory().newThread(this);
  24. }
  25. /** Delegates main run loop to outer runWorker */
  26. public void run() {
  27. runWorker(this);
  28. }
  29. }

看runWorker方法

  1. final void runWorker(Worker w) {
  2. // 获取当前线程
  3. Thread wt = Thread.currentThread();
  4. // 拿到任务
  5. Runnable task = w.firstTask;
  6. // 先不关注
  7. w.firstTask = null;
  8. w.unlock(); // allow interrupts
  9. // 标识为true
  10. boolean completedAbruptly = true;
  11. try {
  12. // 任务不空,执行任务。 如果任务为空,通过getTask从阻塞队列中获取任务!
  13. while (task != null || (task = getTask()) != null) {
  14. w.lock(); // 加锁,避免你shutdown我任务也不会中断
  15. // If pool is stopping, ensure thread is interrupted;
  16. // if not, ensure thread is not interrupted. This
  17. // requires a recheck in second case to deal with
  18. // shutdownNow race while clearing interrupt
  19. // 获取当前状态,是否大于等于STOP,悲剧!
  20. if ((runStateAtLeast(ctl.get(), STOP) ||
  21. (Thread.interrupted() &&
  22. runStateAtLeast(ctl.get(), STOP))) &&
  23. !wt.isInterrupted())
  24. wt.interrupt();
  25. try {
  26. // 执行任务前的操作
  27. beforeExecute(wt, task);
  28. Throwable thrown = null;
  29. try {
  30. // 开始执行任务
  31. task.run();
  32. } catch (RuntimeException x) {
  33. thrown = x; throw x;
  34. } catch (Error x) {
  35. thrown = x; throw x;
  36. } catch (Throwable x) {
  37. thrown = x; throw new Error(x);
  38. } finally {
  39. // 执行任务后的操作
  40. afterExecute(task, thrown);
  41. }
  42. } finally {
  43. task = null;
  44. w.completedTasks++;
  45. w.unlock();
  46. }
  47. }
  48. completedAbruptly = false;
  49. } finally {
  50. processWorkerExit(w, completedAbruptly);
  51. }
  52. }

有时间再去查看getTask方法。processWorkerExit线程执行完毕的后续处理。

发表评论

表情:
评论列表 (有 0 条评论,130人围观)

还没有评论,来说两句吧...

相关阅读