ThreadPoolExecutor源码解析

ゝ一纸荒年。 2023-01-07 14:28 259阅读 0赞

原创不易,转载请注明出处

文章目录

      • 前言
      • 1.线程池参数解析与运行原理
      • 2.ThreadPoolExecutor实例化
      • 3.位运算表示线程池状态与线程数
      • 4.提交任务任务
      • 5.执行任务
      • 6.keepalive清理的是core还是max
      • 7.shutdown
      • 8.shutdownNow

前言

主要是从源码的角度看看线程池是怎样添加任务,怎样运行任务的。

1.线程池参数解析与运行原理

参数 core 线程数, max线程数,keepAlive,keepAlive单位,队列,线程工厂,拒绝策略。
一开始的时候,会根据core线程数来创建线程,如果线程池里面的线程已经达到了core 线程数
这个时候有任务就会往队列里面添加,如果队列任务也满了,就会往接着创建新的线程来执行,线程池里面线程最多不能超过 max线程数线程数,如果超过,就将任务交给拒绝策略来处理,默认的拒绝策略就是抛出异常。

2.ThreadPoolExecutor实例化

在这里插入图片描述
ThreadPoolExecutor 构造参数最少也要 core,max ,keepalive,单位,队列。
关于线程工厂,拒绝策略这玩意,你不指定的话,ThreadPoolExecutor 也会有默认的能用,线程工厂默认的没啥好说的,拒绝策略的就是AbortPolicy,就是直接抛出异常。
在这里插入图片描述
这里没啥好说的,就是一堆赋值,唯一要说的是core线程数可以是0的。

3.位运算表示线程池状态与线程数

在这里插入图片描述
这一大堆就是关于线程池状态与线程数的,以及相应的获取方法,比较方法等等。
高3为表示的是线程池的状态,低29位表示的是当前线程数
关于这几个状态对应的二进制,位运算,可以看下这篇文章
参考文章:连接

这里解释下下面的几个方法:

  1. runStateOf 获取当前状态的
  2. workerCountOf 获取当前线程数
  3. ctlOf 创建一个 ctl,里面包含了 线程池状态+ 线程数
  4. runStateLessThan 状态比较,这个的大小看状态对应的数字大小就可以了。
  5. runStateAtLeast 与4同理,不过这个是大于等于
  6. isRunning 判断是不是running状态。
    其实这些东西不用太纠结。

4.提交任务任务

这里直接看下这个execute 方法代码
在这里插入图片描述
就是分为3步,从上面的注释中也能得到。
先是判断下当前线程数是否大于core线程数,如果不大于的话执行addWorker(command, true),需要注意后面参数是true。
如果是当前线程数大于了core线程数,就会往workQueue 队列中添加任务,添加完成后,再次检查下这个线程池的运行状态,如果不是runing状态了就会从队列中移除掉,然后拒绝掉,如果线程数是0的话,就会执行addWorker(null, false),这里是为啥呢?因为线程池支持core线程数是0 ,这个时候往里面添加,都添加到了队列中,然后没有对应的线程来执行任务,岂不是很尴尬,这个时候,就会调用addWorker 添加worker,先执行着任务。
如果队列也满了,没有添加进去的话,就会执行addWorker(command, false)方法来添加,如果失败的就拒绝。
接下来看看addWorker方法的执行(由于太长我们分成2部分)
在这里插入图片描述
这块核心逻辑就是判断线程数有没有超,没有超的话就是先增加线程数的计数。
compareAndIncrementWorkerCount方法:

  1. private boolean compareAndIncrementWorkerCount(int expect) {
  2. return ctl.compareAndSet(expect, expect + 1);
  3. }

如果线程数计数增加成功的话,就会执行后续创建worker的逻辑
在这里插入图片描述
这个worker 其实就是个Runnable,然后里面维护了一个thread对象,在创建worker实例的时候会将thread的实例化,接着就是将worker 添加到workers这个缓存中,workers是个HashSet,最后是如果添加成功的话,就会启动线程。

5.执行任务

在这里插入图片描述
执行原理其实很简单,就是先讲每个worker里面自己的那个firstTask执行完成,接着就不断的从我们配置的那个队列中获取任务执行。

在实例化Worker的时候,对应的创建了一个线程,然后将Worker 作为Runnable传了进去
在这里插入图片描述
这里我们只需要看下Worker 的run方法就可以了
在这里插入图片描述
在这里插入图片描述
在这个runWorker 方法中可以看到有个while循环,只要是个task不是null就会执行里面的逻辑,这个task首先是创建worker的时候被添加进去的firstTask,执行完这个firstTask就会通过getTask方法从队列中获取任务。
我们看下while里面的处理逻辑,一开始有个判断,这个判断线程池状态的,然后中断线程的,这个先不用管,重要的是在try代码块中的代码,先是执行beforeExecute 方法,这个方法在ThreadPoolExecutor 是空的,子类可以重写,还有后面的afterExecute 方法也是一样,可以理解为任务执行前执行后的钩子,最最最重要的就是task.run() ,这行就是执行用户的任务,看一下异常的处理,全部都抛出来了,执行完成后,接着就是将task重置为null,然后进行下次循环。

接着我们看下getTask方法,看看是怎样获取任务的。
在这里插入图片描述
可以看到也是有个死循环不断从队列中获取,我们重点关注下

  1. Runnable r = timed ?workQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS) :workQueue.take();

可以看到从workQueue中获取任务,这个workQueue 就是我们自己定义的那个队列。
关于timed 这块的东西我们接下来章节会有介绍,这块涉及到线程空闲清理的情况。

6.keepalive清理的是core还是max

这里我们接着看getTask方法里面的逻辑,看下 boolean timed = allowCoreThreadTimeOut || wc > corePoolSize;这行意思,allowCoreThreadTimeOut 这个默认是false,也就是说允许core线程数超时;
wc > corePoolSize, 当前的线程数大于了core线程数,也就是说只要满足(allowCoreThreadTimeOut=true或者当前线程数大于core线程数)任意一个条件timed 就是true。

  1. Runnable r = timed ?workQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS) :workQueue.take();
  2. if (r != null)
  3. return r;
  4. timedOut = true;

咱们这里假设当前线程数是大于core线程数的,这个时候timed =true,然后从获取队列中的任务就是走workQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS) ,keepAliveTime 参数就派上用场了,堵塞keepAliveTime 时间,获取不到就返回null,如果获取到了就直接return返回了,获取不到接着往下执行timedOut = true; ,这个时候这次循环执行完了,下次循环开始也是先执行 boolean timed = allowCoreThreadTimeOut || wc > corePoolSize;比如说现在线程数还是大于core核心线程数(没如果没有其他线程被剔除的情况下)接着执行

  1. if ((wc > maximumPoolSize || (timed && timedOut))
  2. && (wc > 1 || workQueue.isEmpty())) {
  3. if (compareAndDecrementWorkerCount(c))
  4. return null;
  5. continue;
  6. }

判断如果是 (现在线程数大于max线程数,这个条件是不可能存在的或者是有timeout的 ) && (线程数大于1 或者是 队列是空的)
这个时候才会尝试减少线程数,compareAndDecrementWorkerCount 这个方法只是减少计数器里面的数值,然后return null。
return null之后,在runWorker 方法中就会逃离while 循环,最后就会执行 processWorkerExit(w, completedAbruptly); 方法将这个worker从workers 中移除掉,线程执行完成,就会被GC回收掉。
可以看出这个keepalive 参数默认是给max 线程数设置的,所以清理空闲线程也是清理这个max 线程,一直清理到有任务或者是减少到core线程数的数量。

7.shutdown

  1. public void shutdown() {
  2. final ReentrantLock mainLock = this.mainLock;
  3. mainLock.lock();
  4. try {
  5. checkShutdownAccess();
  6. advanceRunState(SHUTDOWN);
  7. interruptIdleWorkers();
  8. onShutdown(); // hook for ScheduledThreadPoolExecutor
  9. } finally {
  10. mainLock.unlock();
  11. }
  12. tryTerminate();
  13. }

checkShutdownAccess();这个方法不用关心
advanceRunState(SHUTDOWN);主要就是将线程池状态变成SHUTDOWN
interruptIdleWorkers();这个是尝试中断掉空闲的worker,我们看看这个方法。
onShutdown(); 这个方法是给子类用的。
在这里插入图片描述
这里其实就是遍历所有的worker,然后尝试获取锁,如果是线程没有中断而且获取锁成功,就执行中断。
我们看下执行完shutdown方法后,线程池里面线程执行任务是怎样变化的。
在这里插入图片描述
再次将runWorker执行任务的方法拿出来分析下,先看下这个lock,如果当前线程正在执行任务的话,也就是说在while循环体内,shutdown遍历的时候就不会获取锁,也就不会中断。如果是在getTask那堵塞住了,因为core默认是没有超时的,一直等着获取队列里面的任务,这个时候会被shutdown遍历workers的时候获取到锁,从而执行中断(其实就是设置个中断标志,如果线程正在等待的话,抛出中断异常),其实这种情况可以想想,队列里面没任务了,线程自己拿到的任务也执行完事了,shutdown正好把线程中断了就可以了。

在分析一下 while循环里面那一坨if

  1. if ((runStateAtLeast(ctl.get(), STOP) ||(Thread.interrupted() &&runStateAtLeast(ctl.get(), STOP))) &&
  2. !wt.isInterrupted())
  3. wt.interrupt();

(如果线程池状态是STOP TIDYING TERMINATED 或者(是当前线程是中断并且线程池状态是STOP TIDYING TERMINATED))并且当前线程不是中断,就中断当前线程。
说白了就是线程池状态是STOP TIDYING TERMINATED 的就中断当前线程。

在看下shutdown对添加任务的影响
不是running状态就不能将任务添加到队列中,然后addWorker 这个方法中有这么一段代码

  1. if (rs >= SHUTDOWN &&
  2. ! (rs == SHUTDOWN &&
  3. firstTask == null &&
  4. ! workQueue.isEmpty()))
  5. return false;

这行SHUTDOWN 状态绝对会返回false的,也就是没法再创建worker了,相当于没法添加任务。

总而言之,SHUTDOWN状态不能继续提交添加任务,已经有的任务还是可以执行完成的。

8.shutdownNow

  1. public List<Runnable> shutdownNow() {
  2. List<Runnable> tasks;
  3. final ReentrantLock mainLock = this.mainLock;
  4. mainLock.lock();
  5. try {
  6. checkShutdownAccess();
  7. advanceRunState(STOP);
  8. interruptWorkers();
  9. tasks = drainQueue();
  10. } finally {
  11. mainLock.unlock();
  12. }
  13. tryTerminate();
  14. return tasks;
  15. }

advanceRunState(STOP);将状态改变成STOP
interruptWorkers()中断所有线程,不过已经拿到任务的线程还是可以执行完成的。

  1. private void interruptWorkers() {
  2. final ReentrantLock mainLock = this.mainLock;
  3. mainLock.lock();
  4. try {
  5. for (Worker w : workers)
  6. w.interruptIfStarted();
  7. } finally {
  8. mainLock.unlock();
  9. }
  10. }

drainQueue()清空队列。
添加任务肯定也是添加不上的,队列里面的任务也没法继续执行,而是取出来返回给用户自己执行。

发表评论

表情:
评论列表 (有 0 条评论,259人围观)

还没有评论,来说两句吧...

相关阅读

    相关 hashMap

    源码来自jdk:1.8,和其他jdk版本可能有少许差异。 一.hashMap的实现原理     hashMap底层是一个有Node组成的数组,每个Node都有一个key