ThreadPoolExecutor源码解析
原创不易,转载请注明出处
文章目录
- 前言
- 1.线程池参数解析与运行原理
- 2.ThreadPoolExecutor实例化
- 3.位运算表示线程池状态与线程数
- 4.提交任务任务
- 5.执行任务
- 6.keepalive清理的是core还是max
- 7.shutdown
- 8.shutdownNow
前言
主要是从源码的角度看看线程池是怎样添加任务,怎样运行任务的。
1.线程池参数解析与运行原理
参数 core 线程数, max线程数,keepAlive,keepAlive单位,队列,线程工厂,拒绝策略。
一开始的时候,会根据core线程数来创建线程,如果线程池里面的线程已经达到了core 线程数
这个时候有任务就会往队列里面添加,如果队列任务也满了,就会往接着创建新的线程来执行,线程池里面线程最多不能超过 max线程数线程数,如果超过,就将任务交给拒绝策略来处理,默认的拒绝策略就是抛出异常。
2.ThreadPoolExecutor实例化
ThreadPoolExecutor 构造参数最少也要 core,max ,keepalive,单位,队列。
关于线程工厂,拒绝策略这玩意,你不指定的话,ThreadPoolExecutor 也会有默认的能用,线程工厂默认的没啥好说的,拒绝策略的就是AbortPolicy,就是直接抛出异常。
这里没啥好说的,就是一堆赋值,唯一要说的是core线程数可以是0的。
3.位运算表示线程池状态与线程数
这一大堆就是关于线程池状态与线程数的,以及相应的获取方法,比较方法等等。
高3为表示的是线程池的状态,低29位表示的是当前线程数
关于这几个状态对应的二进制,位运算,可以看下这篇文章
参考文章:连接
这里解释下下面的几个方法:
- runStateOf 获取当前状态的
- workerCountOf 获取当前线程数
- ctlOf 创建一个 ctl,里面包含了 线程池状态+ 线程数
- runStateLessThan 状态比较,这个的大小看状态对应的数字大小就可以了。
- runStateAtLeast 与4同理,不过这个是大于等于
- isRunning 判断是不是running状态。
其实这些东西不用太纠结。
4.提交任务任务
这里直接看下这个execute 方法代码
就是分为3步,从上面的注释中也能得到。
先是判断下当前线程数是否大于core线程数,如果不大于的话执行addWorker(command, true)
,需要注意后面参数是true。
如果是当前线程数大于了core线程数,就会往workQueue 队列中添加任务,添加完成后,再次检查下这个线程池的运行状态,如果不是runing状态了就会从队列中移除掉,然后拒绝掉,如果线程数是0的话,就会执行addWorker(null, false)
,这里是为啥呢?因为线程池支持core线程数是0 ,这个时候往里面添加,都添加到了队列中,然后没有对应的线程来执行任务,岂不是很尴尬,这个时候,就会调用addWorker 添加worker,先执行着任务。
如果队列也满了,没有添加进去的话,就会执行addWorker(command, false)
方法来添加,如果失败的就拒绝。
接下来看看addWorker方法的执行(由于太长我们分成2部分)
这块核心逻辑就是判断线程数有没有超,没有超的话就是先增加线程数的计数。
compareAndIncrementWorkerCount方法:
private boolean compareAndIncrementWorkerCount(int expect) {
return ctl.compareAndSet(expect, expect + 1);
}
如果线程数计数增加成功的话,就会执行后续创建worker的逻辑
这个worker 其实就是个Runnable,然后里面维护了一个thread对象,在创建worker实例的时候会将thread的实例化,接着就是将worker 添加到workers这个缓存中,workers是个HashSet,最后是如果添加成功的话,就会启动线程。
5.执行任务
执行原理其实很简单,就是先讲每个worker里面自己的那个firstTask执行完成,接着就不断的从我们配置的那个队列中获取任务执行。
在实例化Worker的时候,对应的创建了一个线程,然后将Worker 作为Runnable传了进去
这里我们只需要看下Worker 的run方法就可以了
在这个runWorker 方法中可以看到有个while循环,只要是个task不是null就会执行里面的逻辑,这个task首先是创建worker的时候被添加进去的firstTask,执行完这个firstTask就会通过getTask方法从队列中获取任务。
我们看下while里面的处理逻辑,一开始有个判断,这个判断线程池状态的,然后中断线程的,这个先不用管,重要的是在try代码块中的代码,先是执行beforeExecute 方法,这个方法在ThreadPoolExecutor 是空的,子类可以重写,还有后面的afterExecute 方法也是一样,可以理解为任务执行前执行后的钩子,最最最重要的就是task.run() ,这行就是执行用户的任务,看一下异常的处理,全部都抛出来了,执行完成后,接着就是将task重置为null,然后进行下次循环。
接着我们看下getTask方法,看看是怎样获取任务的。
可以看到也是有个死循环不断从队列中获取,我们重点关注下
Runnable r = timed ?workQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS) :workQueue.take();
可以看到从workQueue中获取任务,这个workQueue 就是我们自己定义的那个队列。
关于timed 这块的东西我们接下来章节会有介绍,这块涉及到线程空闲清理的情况。
6.keepalive清理的是core还是max
这里我们接着看getTask方法里面的逻辑,看下 boolean timed = allowCoreThreadTimeOut || wc > corePoolSize;
这行意思,allowCoreThreadTimeOut 这个默认是false,也就是说允许core线程数超时;
wc > corePoolSize, 当前的线程数大于了core线程数,也就是说只要满足(allowCoreThreadTimeOut=true或者当前线程数大于core线程数)任意一个条件timed 就是true。
Runnable r = timed ?workQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS) :workQueue.take();
if (r != null)
return r;
timedOut = true;
咱们这里假设当前线程数是大于core线程数的,这个时候timed =true,然后从获取队列中的任务就是走workQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS)
,keepAliveTime 参数就派上用场了,堵塞keepAliveTime 时间,获取不到就返回null,如果获取到了就直接return返回了,获取不到接着往下执行timedOut = true;
,这个时候这次循环执行完了,下次循环开始也是先执行 boolean timed = allowCoreThreadTimeOut || wc > corePoolSize;
比如说现在线程数还是大于core核心线程数(没如果没有其他线程被剔除的情况下)接着执行
if ((wc > maximumPoolSize || (timed && timedOut))
&& (wc > 1 || workQueue.isEmpty())) {
if (compareAndDecrementWorkerCount(c))
return null;
continue;
}
判断如果是 (现在线程数大于max线程数,这个条件是不可能存在的或者是有timeout的 ) && (线程数大于1 或者是 队列是空的)
这个时候才会尝试减少线程数,compareAndDecrementWorkerCount 这个方法只是减少计数器里面的数值,然后return null。
return null之后,在runWorker 方法中就会逃离while 循环,最后就会执行 processWorkerExit(w, completedAbruptly); 方法将这个worker从workers 中移除掉,线程执行完成,就会被GC回收掉。
可以看出这个keepalive 参数默认是给max 线程数设置的,所以清理空闲线程也是清理这个max 线程,一直清理到有任务或者是减少到core线程数的数量。
7.shutdown
public void shutdown() {
final ReentrantLock mainLock = this.mainLock;
mainLock.lock();
try {
checkShutdownAccess();
advanceRunState(SHUTDOWN);
interruptIdleWorkers();
onShutdown(); // hook for ScheduledThreadPoolExecutor
} finally {
mainLock.unlock();
}
tryTerminate();
}
checkShutdownAccess();这个方法不用关心
advanceRunState(SHUTDOWN);主要就是将线程池状态变成SHUTDOWN
interruptIdleWorkers();这个是尝试中断掉空闲的worker,我们看看这个方法。
onShutdown(); 这个方法是给子类用的。
这里其实就是遍历所有的worker,然后尝试获取锁,如果是线程没有中断而且获取锁成功,就执行中断。
我们看下执行完shutdown方法后,线程池里面线程执行任务是怎样变化的。
再次将runWorker执行任务的方法拿出来分析下,先看下这个lock,如果当前线程正在执行任务的话,也就是说在while循环体内,shutdown遍历的时候就不会获取锁,也就不会中断。如果是在getTask那堵塞住了,因为core默认是没有超时的,一直等着获取队列里面的任务,这个时候会被shutdown遍历workers的时候获取到锁,从而执行中断(其实就是设置个中断标志,如果线程正在等待的话,抛出中断异常),其实这种情况可以想想,队列里面没任务了,线程自己拿到的任务也执行完事了,shutdown正好把线程中断了就可以了。
在分析一下 while循环里面那一坨if
if ((runStateAtLeast(ctl.get(), STOP) ||(Thread.interrupted() &&runStateAtLeast(ctl.get(), STOP))) &&
!wt.isInterrupted())
wt.interrupt();
(如果线程池状态是STOP TIDYING TERMINATED 或者(是当前线程是中断并且线程池状态是STOP TIDYING TERMINATED))并且当前线程不是中断,就中断当前线程。
说白了就是线程池状态是STOP TIDYING TERMINATED 的就中断当前线程。
在看下shutdown对添加任务的影响
不是running状态就不能将任务添加到队列中,然后addWorker 这个方法中有这么一段代码
if (rs >= SHUTDOWN &&
! (rs == SHUTDOWN &&
firstTask == null &&
! workQueue.isEmpty()))
return false;
这行SHUTDOWN 状态绝对会返回false的,也就是没法再创建worker了,相当于没法添加任务。
总而言之,SHUTDOWN状态不能继续提交添加任务,已经有的任务还是可以执行完成的。
8.shutdownNow
public List<Runnable> shutdownNow() {
List<Runnable> tasks;
final ReentrantLock mainLock = this.mainLock;
mainLock.lock();
try {
checkShutdownAccess();
advanceRunState(STOP);
interruptWorkers();
tasks = drainQueue();
} finally {
mainLock.unlock();
}
tryTerminate();
return tasks;
}
advanceRunState(STOP);将状态改变成STOP
interruptWorkers()中断所有线程,不过已经拿到任务的线程还是可以执行完成的。
private void interruptWorkers() {
final ReentrantLock mainLock = this.mainLock;
mainLock.lock();
try {
for (Worker w : workers)
w.interruptIfStarted();
} finally {
mainLock.unlock();
}
}
drainQueue()清空队列。
添加任务肯定也是添加不上的,队列里面的任务也没法继续执行,而是取出来返回给用户自己执行。
还没有评论,来说两句吧...