【java线程池】自己动手写线程池——向JDK线程池进发 青旅半醒 2024-04-01 09:45 18阅读 0赞 在本篇文章当中我们将要实现一个和JDK内部实现的线程池非常相似的线程池。 ### JDK线程池一瞥 ### 我们首先看一个JDK给我们提供的线程池`ThreadPoolExecutor`的构造函数的参数: <table> <tbody> <tr> <td></td> <td><code>public ThreadPoolExecutor(int corePoolSize,</code></td> </tr> <tr> <td></td> <td><code>int maximumPoolSize,</code></td> </tr> <tr> <td></td> <td><code>long keepAliveTime,</code></td> </tr> <tr> <td></td> <td><code>TimeUnit unit,</code></td> </tr> <tr> <td></td> <td><code>BlockingQueue<Runnable> workQueue,</code></td> </tr> <tr> <td></td> <td><code>ThreadFactory threadFactory,</code></td> </tr> <tr> <td></td> <td><code>RejectedExecutionHandler handler) </code></td> </tr> </tbody> </table> 参数解释: * corePoolSize:这个参数你可以理解为线程池当中至少需要 corePoolSize 个线程,初始时线程池当中线程的个数为0,当线程池当中线程的个数小于 corePoolSize 每次提交一个任务都会创建一个线程,并且先执行这个提交的任务,然后再去任务队列里面去获取新的任务,然后再执行。 * maximumPoolSize:这个参数指的是线程池当中能够允许的最大的线程的数目,当任务队列满了之后如果这个时候有新的任务想要加入队列当中,当发现队列满了之后就创建新的线程去执行任务,但是需要满足最大的线程的个数不能够超过 maximumPoolSize 。 * keepAliveTime 和 unit:这个主要是用于时间的表示,当队列当中多长时间没有数据的时候线程自己退出,前面谈到了线程池当中任务过多的时候会超过 corePoolSize ,当线程池闲下来的时候这些多余的线程就可以退出了。 * workQueue:这个就是用于保存任务的阻塞队列。 * threadFactory:这个参数倒不是很重要,线程工厂。 * handler:这个表示拒绝策略,JDK给我们提供了四种策略: * AbortPolicy:抛出异常。 * DiscardPolicy:放弃这个任务。 * CallerRunPolicy:提交任务的线程执行。 * DiscardOldestPolicy:放弃等待时间最长的任务。 基于上面谈到的参数,线程池当中提交任务的流程大致如下图所示: ![b392eae98c423ffb029522550f1c1dc5.png][] ### 自己动手实现线程池 ### 根据前面的参数分析我们自己实现的线程池需要实现一下功能: * 能够提交Runnable的任务和Callable的任务。 * 线程池能够自己实现动态的扩容和所容,动态调整线程池当中线程的数目,当任务多的时候能够增加线程的数目,当任务少的时候多出来的线程能够自动退出。 * 有自己的拒绝策略,当任务队列满了,线程数也达到最大的时候,需要拒绝提交的任务。 #### 线程池参数介绍 #### <table> <tbody> <tr> <td></td> <td><code>private AtomicInteger ct = new AtomicInteger(0); // 当前在执行任务的线程个数</code></td> </tr> <tr> <td></td> <td><code>private int corePoolSize;</code></td> </tr> <tr> <td></td> <td><code>private int maximumPoolSize;</code></td> </tr> <tr> <td></td> <td><code>private long keepAliveTime;</code></td> </tr> <tr> <td></td> <td><code>private TimeUnit unit;</code></td> </tr> <tr> <td></td> <td><code>private BlockingQueue<Runnable> taskQueue;</code></td> </tr> <tr> <td></td> <td><code>private RejectPolicy policy;</code></td> </tr> <tr> <td></td> <td></td> </tr> <tr> <td></td> <td><code>private ArrayList<Worker> workers = new ArrayList<>();</code></td> </tr> <tr> <td></td> <td></td> </tr> <tr> <td></td> <td><code>private volatile boolean isStopped;</code></td> </tr> <tr> <td></td> <td><code>private boolean useTimed;</code></td> </tr> <tr> <td></td> <td></td> </tr> </tbody> </table> 参数解释如下: * ct:表示当前线程池当中线程的个数。 * corePoolSize:线程池当中核心线程的个数,意义和上面谈到的JDK的线程池意义一致。 * maximumPoolSize:线程池当中最大的线程个数,意义和上面谈到的JDK的线程池意义一致。 * keepAliveTime 和 unit:和JDK线程池的参数意义一致。 * taskQueue:任务队列,用不保存提交的任务。 * policy:拒绝策略,主要有一下四种策略: <table> <tbody> <tr> <td></td> <td><code>public enum RejectPolicy { </code></td> </tr> <tr> <td></td> <td></td> </tr> <tr> <td></td> <td><code>ABORT,</code></td> </tr> <tr> <td></td> <td><code>CALLER_RUN,</code></td> </tr> <tr> <td></td> <td><code>DISCARD_OLDEST,</code></td> </tr> <tr> <td></td> <td><code>DISCARD</code></td> </tr> <tr> <td></td> <td><code>}</code></td> </tr> </tbody> </table> * workers:用于保存工作线程。 * isStopped:线程池是否被关闭了。 * useTimed:主要是用于表示是否使用上面的 keepAliveTime 和 unit,如果使用就是在一定的时间内,如果没有从任务队列当中获取到任务,线程就从线程池退出,但是需要保证线程池当中最小的线程个数不小于 corePoolSize 。 #### 实现Runnable #### <table> <tbody> <tr> <td></td> <td><code>// 下面这个方法是向线程池提交任务</code></td> </tr> <tr> <td></td> <td><code>public void execute(Runnable runnable) throws InterruptedException { </code></td> </tr> <tr> <td></td> <td><code>checkPoolState();</code></td> </tr> <tr> <td></td> <td></td> </tr> <tr> <td></td> <td><code>if (addWorker(runnable, false) // 如果能够加入新的线程执行任务 加入成功就直接返回</code></td> </tr> <tr> <td></td> <td><code>|| !taskQueue.offer(runnable) // 如果 taskQueue.offer(runnable) 返回 false 说明提交任务失败 任务队列已经满了</code></td> </tr> <tr> <td></td> <td><code>|| addWorker(runnable, true)) // 使用能够使用的最大的线程数 (maximumPoolSize) 看是否能够产生新的线程</code></td> </tr> <tr> <td></td> <td><code>return;</code></td> </tr> <tr> <td></td> <td></td> </tr> <tr> <td></td> <td><code>// 如果任务队列满了而且不能够加入新的线程 则拒绝这个任务</code></td> </tr> <tr> <td></td> <td><code>if (!taskQueue.offer(runnable))</code></td> </tr> <tr> <td></td> <td><code>reject(runnable);</code></td> </tr> <tr> <td></td> <td><code>}</code></td> </tr> <tr> <td></td> <td></td> </tr> </tbody> </table> 在上面的代码当中: * checkPoolState函数是检查线程池的状态,当线程池被停下来之后就不能够在提交任务: <table> <tbody> <tr> <td></td> <td><code>private void checkPoolState() { </code></td> </tr> <tr> <td></td> <td><code>if (isStopped) { </code></td> </tr> <tr> <td></td> <td><code>// 如果线程池已经停下来了,就不在向任务队列当中提交任务了</code></td> </tr> <tr> <td></td> <td><code>throw new RuntimeException("thread pool has been stopped, so quit submitting task");</code></td> </tr> <tr> <td></td> <td><code>}</code></td> </tr> <tr> <td></td> <td><code>}</code></td> </tr> <tr> <td></td> <td></td> </tr> </tbody> </table> * addWorker函数是往线程池当中提交任务并且产生一个线程,并且这个线程执行的第一个任务就是传递的参数。max表示线程的最大数目,max == true 的时候表示使用 maximumPoolSize 否则使用 corePoolSize,当返回值等于 true 的时候表示执行成功,否则表示执行失败。 <table> <tbody> <tr> <td></td> <td><code>/**</code></td> </tr> <tr> <td></td> <td><code>*</code></td> </tr> <tr> <td></td> <td><code>* @param runnable 需要被执行的任务</code></td> </tr> <tr> <td></td> <td><code>* @param max 是否使用 maximumPoolSize</code></td> </tr> <tr> <td></td> <td><code>* @return boolean</code></td> </tr> <tr> <td></td> <td><code>*/</code></td> </tr> <tr> <td></td> <td><code>public synchronized boolean addWorker(Runnable runnable, boolean max) { </code></td> </tr> <tr> <td></td> <td></td> </tr> <tr> <td></td> <td><code>if (ct.get() >= corePoolSize && !max)</code></td> </tr> <tr> <td></td> <td><code>return false;</code></td> </tr> <tr> <td></td> <td><code>if (ct.get() >= maximumPoolSize && max)</code></td> </tr> <tr> <td></td> <td><code>return false;</code></td> </tr> <tr> <td></td> <td><code>Worker worker = new Worker(runnable);</code></td> </tr> <tr> <td></td> <td><code>workers.add(worker);</code></td> </tr> <tr> <td></td> <td><code>Thread thread = new Thread(worker, "ThreadPool-" + "Thread-" + ct.addAndGet(1));</code></td> </tr> <tr> <td></td> <td><code>thread.start();</code></td> </tr> <tr> <td></td> <td><code>return true;</code></td> </tr> <tr> <td></td> <td><code>}</code></td> </tr> <tr> <td></td> <td></td> </tr> </tbody> </table> #### 实现Callable #### 这个函数其实比较简单,只需要将传入的Callable对象封装成一个FutureTask对象即可,因为FutureTask实现了Callable和Runnable两个接口,然后将这个结果返回即可,得到这个对象,再调用对象的 get 方法就能够得到结果。 <table> <tbody> <tr> <td></td> <td><code>public <V> RunnableFuture<V> submit(Callable<V> task) throws InterruptedException { </code></td> </tr> <tr> <td></td> <td><code>checkPoolState();</code></td> </tr> <tr> <td></td> <td><code>FutureTask<V> futureTask = new FutureTask<>(task);</code></td> </tr> <tr> <td></td> <td><code>execute(futureTask);</code></td> </tr> <tr> <td></td> <td><code>return futureTask;</code></td> </tr> <tr> <td></td> <td><code>}</code></td> </tr> </tbody> </table> #### 拒绝策略的实现 #### 根据前面提到的各种策略的具体实现方式,具体的代码实现如下所示: <table> <tbody> <tr> <td></td> <td><code>private void reject(Runnable runnable) throws InterruptedException { </code></td> </tr> <tr> <td></td> <td><code>switch (policy) { </code></td> </tr> <tr> <td></td> <td><code>case ABORT:</code></td> </tr> <tr> <td></td> <td><code>throw new RuntimeException("task queue is full");</code></td> </tr> <tr> <td></td> <td><code>case CALLER_RUN:</code></td> </tr> <tr> <td></td> <td><code>runnable.run();</code></td> </tr> <tr> <td></td> <td><code>case DISCARD: // 直接放弃这个任务</code></td> </tr> <tr> <td></td> <td><code>return;</code></td> </tr> <tr> <td></td> <td><code>case DISCARD_OLDEST:</code></td> </tr> <tr> <td></td> <td><code>// 放弃等待时间最长的任务 也就是队列当中的第一个任务</code></td> </tr> <tr> <td></td> <td><code>taskQueue.poll();</code></td> </tr> <tr> <td></td> <td><code>execute(runnable); // 重新执行这个任务</code></td> </tr> <tr> <td></td> <td><code>}</code></td> </tr> <tr> <td></td> <td><code>}</code></td> </tr> </tbody> </table> #### 线程池关闭实现 #### 一共两种方式实现线程池关闭: * 直接关闭线程池,不管任务队列当中的任务是否被全部执行完成。 * 安全关闭线程池,先等待任务队列当中所有的任务被执行完成,再关闭线程池,但是在这个过程当中不允许继续提交任务了,这一点已经在函数 checkPoolState 当中实现了。 <table> <tbody> <tr> <td></td> <td><code>// 强制关闭线程池</code></td> </tr> <tr> <td></td> <td><code>public synchronized void stop() { </code></td> </tr> <tr> <td></td> <td><code>isStopped = true;</code></td> </tr> <tr> <td></td> <td><code>for (Worker worker : workers) { </code></td> </tr> <tr> <td></td> <td><code>worker.stopWorker();</code></td> </tr> <tr> <td></td> <td><code>}</code></td> </tr> <tr> <td></td> <td><code>}</code></td> </tr> <tr> <td></td> <td></td> </tr> <tr> <td></td> <td><code>public synchronized void shutDown() { </code></td> </tr> <tr> <td></td> <td><code>// 先表示关闭线程池 线程就不能再向线程池提交任务</code></td> </tr> <tr> <td></td> <td><code>isStopped = true;</code></td> </tr> <tr> <td></td> <td><code>// 先等待所有的任务执行完成再关闭线程池</code></td> </tr> <tr> <td></td> <td><code>waitForAllTasks();</code></td> </tr> <tr> <td></td> <td><code>stop();</code></td> </tr> <tr> <td></td> <td><code>}</code></td> </tr> <tr> <td></td> <td></td> </tr> <tr> <td></td> <td><code>private void waitForAllTasks() { </code></td> </tr> <tr> <td></td> <td><code>// 当线程池当中还有任务的时候 就不退出循环</code></td> </tr> <tr> <td></td> <td><code>while (taskQueue.size() > 0) { </code></td> </tr> <tr> <td></td> <td><code>Thread.yield();</code></td> </tr> <tr> <td></td> <td><code>try { </code></td> </tr> <tr> <td></td> <td><code>Thread.sleep(1000);</code></td> </tr> <tr> <td></td> <td><code>} catch (InterruptedException e) { </code></td> </tr> <tr> <td></td> <td><code>e.printStackTrace();</code></td> </tr> <tr> <td></td> <td><code>}</code></td> </tr> <tr> <td></td> <td><code>}</code></td> </tr> <tr> <td></td> <td><code>}</code></td> </tr> </tbody> </table> #### 工作线程的工作实现 #### <table> <tbody> <tr> <td></td> <td><code>@Override</code></td> </tr> <tr> <td></td> <td><code>public void run() { </code></td> </tr> <tr> <td></td> <td><code>// 先执行传递过来的第一个任务 这里是一个小的优化 让线程直接执行第一个任务 不需要</code></td> </tr> <tr> <td></td> <td><code>// 放入任务队列再取出来执行了</code></td> </tr> <tr> <td></td> <td><code>firstTask.run();</code></td> </tr> <tr> <td></td> <td></td> </tr> <tr> <td></td> <td><code>thisThread = Thread.currentThread();</code></td> </tr> <tr> <td></td> <td><code>while (!isStopped) { </code></td> </tr> <tr> <td></td> <td><code>try { </code></td> </tr> <tr> <td></td> <td><code>// 是否使用时间就在这里显示出来了</code></td> </tr> <tr> <td></td> <td><code>Runnable task = useTimed ? taskQueue.poll(keepAliveTime, unit) : taskQueue.take();</code></td> </tr> <tr> <td></td> <td><code>if (task == null) { </code></td> </tr> <tr> <td></td> <td><code>int i;</code></td> </tr> <tr> <td></td> <td><code>boolean exit = true;</code></td> </tr> <tr> <td></td> <td><code>// 如果当前线程数大于核心线程数 则使用 CAS 去退出 用于保证在线程安全下的退出</code></td> </tr> <tr> <td></td> <td><code>// 且保证线程的个数不小于 corePoolSize 下面这段代码需要仔细分析一下</code></td> </tr> <tr> <td></td> <td><code>if (ct.get() > corePoolSize) { </code></td> </tr> <tr> <td></td> <td><code>do{ </code></td> </tr> <tr> <td></td> <td><code>i = ct.get();</code></td> </tr> <tr> <td></td> <td><code>if (i <= corePoolSize) { </code></td> </tr> <tr> <td></td> <td><code>exit = false;</code></td> </tr> <tr> <td></td> <td><code>break;</code></td> </tr> <tr> <td></td> <td><code>}</code></td> </tr> <tr> <td></td> <td><code>}while (!ct.compareAndSet(i, i - 1));</code></td> </tr> <tr> <td></td> <td><code>if (exit) { </code></td> </tr> <tr> <td></td> <td><code>return;</code></td> </tr> <tr> <td></td> <td><code>}</code></td> </tr> <tr> <td></td> <td><code>}</code></td> </tr> <tr> <td></td> <td><code>}else { </code></td> </tr> <tr> <td></td> <td><code>task.run();</code></td> </tr> <tr> <td></td> <td><code>}</code></td> </tr> <tr> <td></td> <td><code>} catch (InterruptedException e) { </code></td> </tr> <tr> <td></td> <td><code>// do nothing</code></td> </tr> <tr> <td></td> <td><code>}</code></td> </tr> <tr> <td></td> <td><code>}</code></td> </tr> <tr> <td></td> <td><code>}</code></td> </tr> <tr> <td></td> <td></td> </tr> </tbody> </table> 我们现在来仔细分析一下,线程退出线程池的时候是如何保证线程池当中总的线程数是不小于 corePoolSize 的!首先整体的框架是使用 CAS 进行实现,具体代码为 do ... while 操作,然后在 while 操作里面使用 CAS 进行测试替换,如果没有成功再次获取 ,当线程池当中核心线程的数目小于等于 corePoolSize 的时候也需要退出循环,因为线程池当中线程的个数不能小于 corePoolSize 。因此使用 break 跳出循环的线程是不会退出线程池的。 #### 线程池实现的BUG #### 在我们自己实现的线程池当中当线程退出的时候,workers 当中还保存这指向这个线程的对象,但是当线程退出的时候我们还没有在 workers 当中删除这个对象,因此这个线程对象不会被垃圾回收器收集掉,但是我们这个只是一个线程池实现的例子而已,并不用于生产环境,只是为了帮助大家理解线程池的原理。 #### 完整代码 #### <table> <tbody> <tr> <td></td> <td><code>package cscore.concurrent.java.threadpoolv2;</code></td> </tr> <tr> <td></td> <td></td> </tr> <tr> <td></td> <td></td> </tr> <tr> <td></td> <td><code>import java.util.ArrayList;</code></td> </tr> <tr> <td></td> <td><code>import java.util.concurrent.*;</code></td> </tr> <tr> <td></td> <td><code>import java.util.concurrent.atomic.AtomicInteger;</code></td> </tr> <tr> <td></td> <td></td> </tr> <tr> <td></td> <td><code>public class ThreadPool { </code></td> </tr> <tr> <td></td> <td></td> </tr> <tr> <td></td> <td><code>private AtomicInteger ct = new AtomicInteger(0); // 当前在执行任务的线程个数</code></td> </tr> <tr> <td></td> <td><code>private int corePoolSize;</code></td> </tr> <tr> <td></td> <td><code>private int maximumPoolSize;</code></td> </tr> <tr> <td></td> <td><code>private long keepAliveTime;</code></td> </tr> <tr> <td></td> <td><code>private TimeUnit unit;</code></td> </tr> <tr> <td></td> <td><code>private BlockingQueue<Runnable> taskQueue;</code></td> </tr> <tr> <td></td> <td><code>private RejectPolicy policy;</code></td> </tr> <tr> <td></td> <td></td> </tr> <tr> <td></td> <td><code>private ArrayList<Worker> workers = new ArrayList<>();</code></td> </tr> <tr> <td></td> <td></td> </tr> <tr> <td></td> <td><code>private volatile boolean isStopped;</code></td> </tr> <tr> <td></td> <td><code>private boolean useTimed;</code></td> </tr> <tr> <td></td> <td></td> </tr> <tr> <td></td> <td><code>public int getCt() { </code></td> </tr> <tr> <td></td> <td><code>return ct.get();</code></td> </tr> <tr> <td></td> <td><code>}</code></td> </tr> <tr> <td></td> <td></td> </tr> <tr> <td></td> <td><code>public ThreadPool(int corePoolSize, int maximumPoolSize, TimeUnit unit, long keepAliveTime, RejectPolicy policy</code></td> </tr> <tr> <td></td> <td><code>, int maxTasks) { </code></td> </tr> <tr> <td></td> <td><code>// please add -ea to vm options to make assert keyword enable</code></td> </tr> <tr> <td></td> <td><code>assert corePoolSize > 0;</code></td> </tr> <tr> <td></td> <td><code>assert maximumPoolSize > 0;</code></td> </tr> <tr> <td></td> <td><code>assert keepAliveTime >= 0;</code></td> </tr> <tr> <td></td> <td><code>assert maxTasks > 0;</code></td> </tr> <tr> <td></td> <td></td> </tr> <tr> <td></td> <td><code>this.corePoolSize = corePoolSize;</code></td> </tr> <tr> <td></td> <td><code>this.maximumPoolSize = maximumPoolSize;</code></td> </tr> <tr> <td></td> <td><code>this.unit = unit;</code></td> </tr> <tr> <td></td> <td><code>this.policy = policy;</code></td> </tr> <tr> <td></td> <td><code>this.keepAliveTime = keepAliveTime;</code></td> </tr> <tr> <td></td> <td><code>taskQueue = new ArrayBlockingQueue<Runnable>(maxTasks);</code></td> </tr> <tr> <td></td> <td><code>useTimed = keepAliveTime != 0;</code></td> </tr> <tr> <td></td> <td><code>}</code></td> </tr> <tr> <td></td> <td></td> </tr> <tr> <td></td> <td><code>/**</code></td> </tr> <tr> <td></td> <td><code>*</code></td> </tr> <tr> <td></td> <td><code>* @param runnable 需要被执行的任务</code></td> </tr> <tr> <td></td> <td><code>* @param max 是否使用 maximumPoolSize</code></td> </tr> <tr> <td></td> <td><code>* @return boolean</code></td> </tr> <tr> <td></td> <td><code>*/</code></td> </tr> <tr> <td></td> <td><code>public synchronized boolean addWorker(Runnable runnable, boolean max) { </code></td> </tr> <tr> <td></td> <td></td> </tr> <tr> <td></td> <td><code>if (ct.get() >= corePoolSize && !max)</code></td> </tr> <tr> <td></td> <td><code>return false;</code></td> </tr> <tr> <td></td> <td><code>if (ct.get() >= maximumPoolSize && max)</code></td> </tr> <tr> <td></td> <td><code>return false;</code></td> </tr> <tr> <td></td> <td><code>Worker worker = new Worker(runnable);</code></td> </tr> <tr> <td></td> <td><code>workers.add(worker);</code></td> </tr> <tr> <td></td> <td><code>Thread thread = new Thread(worker, "ThreadPool-" + "Thread-" + ct.addAndGet(1));</code></td> </tr> <tr> <td></td> <td><code>thread.start();</code></td> </tr> <tr> <td></td> <td><code>return true;</code></td> </tr> <tr> <td></td> <td><code>}</code></td> </tr> <tr> <td></td> <td></td> </tr> <tr> <td></td> <td><code>// 下面这个方法是向线程池提交任务</code></td> </tr> <tr> <td></td> <td><code>public void execute(Runnable runnable) throws InterruptedException { </code></td> </tr> <tr> <td></td> <td><code>checkPoolState();</code></td> </tr> <tr> <td></td> <td></td> </tr> <tr> <td></td> <td><code>if (addWorker(runnable, false) // 如果能够加入新的线程执行任务 加入成功就直接返回</code></td> </tr> <tr> <td></td> <td><code>|| !taskQueue.offer(runnable) // 如果 taskQueue.offer(runnable) 返回 false 说明提交任务失败 任务队列已经满了</code></td> </tr> <tr> <td></td> <td><code>|| addWorker(runnable, true)) // 使用能够使用的最大的线程数 (maximumPoolSize) 看是否能够产生新的线程</code></td> </tr> <tr> <td></td> <td><code>return;</code></td> </tr> <tr> <td></td> <td></td> </tr> <tr> <td></td> <td><code>// 如果任务队列满了而且不能够加入新的线程 则拒绝这个任务</code></td> </tr> <tr> <td></td> <td><code>if (!taskQueue.offer(runnable))</code></td> </tr> <tr> <td></td> <td><code>reject(runnable);</code></td> </tr> <tr> <td></td> <td><code>}</code></td> </tr> <tr> <td></td> <td></td> </tr> <tr> <td></td> <td><code>private void reject(Runnable runnable) throws InterruptedException { </code></td> </tr> <tr> <td></td> <td><code>switch (policy) { </code></td> </tr> <tr> <td></td> <td><code>case ABORT:</code></td> </tr> <tr> <td></td> <td><code>throw new RuntimeException("task queue is full");</code></td> </tr> <tr> <td></td> <td><code>case CALLER_RUN:</code></td> </tr> <tr> <td></td> <td><code>runnable.run();</code></td> </tr> <tr> <td></td> <td><code>case DISCARD:</code></td> </tr> <tr> <td></td> <td><code>return;</code></td> </tr> <tr> <td></td> <td><code>case DISCARD_OLDEST:</code></td> </tr> <tr> <td></td> <td><code>// 放弃等待时间最长的任务</code></td> </tr> <tr> <td></td> <td><code>taskQueue.poll();</code></td> </tr> <tr> <td></td> <td><code>execute(runnable);</code></td> </tr> <tr> <td></td> <td><code>}</code></td> </tr> <tr> <td></td> <td><code>}</code></td> </tr> <tr> <td></td> <td></td> </tr> <tr> <td></td> <td><code>private void checkPoolState() { </code></td> </tr> <tr> <td></td> <td><code>if (isStopped) { </code></td> </tr> <tr> <td></td> <td><code>// 如果线程池已经停下来了,就不在向任务队列当中提交任务了</code></td> </tr> <tr> <td></td> <td><code>throw new RuntimeException("thread pool has been stopped, so quit submitting task");</code></td> </tr> <tr> <td></td> <td><code>}</code></td> </tr> <tr> <td></td> <td><code>}</code></td> </tr> <tr> <td></td> <td></td> </tr> <tr> <td></td> <td><code>public <V> RunnableFuture<V> submit(Callable<V> task) throws InterruptedException { </code></td> </tr> <tr> <td></td> <td><code>checkPoolState();</code></td> </tr> <tr> <td></td> <td><code>FutureTask<V> futureTask = new FutureTask<>(task);</code></td> </tr> <tr> <td></td> <td><code>execute(futureTask);</code></td> </tr> <tr> <td></td> <td><code>return futureTask;</code></td> </tr> <tr> <td></td> <td><code>}</code></td> </tr> <tr> <td></td> <td></td> </tr> <tr> <td></td> <td><code>// 强制关闭线程池</code></td> </tr> <tr> <td></td> <td><code>public synchronized void stop() { </code></td> </tr> <tr> <td></td> <td><code>isStopped = true;</code></td> </tr> <tr> <td></td> <td><code>for (Worker worker : workers) { </code></td> </tr> <tr> <td></td> <td><code>worker.stopWorker();</code></td> </tr> <tr> <td></td> <td><code>}</code></td> </tr> <tr> <td></td> <td><code>}</code></td> </tr> <tr> <td></td> <td></td> </tr> <tr> <td></td> <td><code>public synchronized void shutDown() { </code></td> </tr> <tr> <td></td> <td><code>// 先表示关闭线程池 线程就不能再向线程池提交任务</code></td> </tr> <tr> <td></td> <td><code>isStopped = true;</code></td> </tr> <tr> <td></td> <td><code>// 先等待所有的任务执行完成再关闭线程池</code></td> </tr> <tr> <td></td> <td><code>waitForAllTasks();</code></td> </tr> <tr> <td></td> <td><code>stop();</code></td> </tr> <tr> <td></td> <td><code>}</code></td> </tr> <tr> <td></td> <td></td> </tr> <tr> <td></td> <td><code>private void waitForAllTasks() { </code></td> </tr> <tr> <td></td> <td><code>// 当线程池当中还有任务的时候 就不退出循环</code></td> </tr> <tr> <td></td> <td><code>while (taskQueue.size() > 0) { </code></td> </tr> <tr> <td></td> <td><code>Thread.yield();</code></td> </tr> <tr> <td></td> <td><code>try { </code></td> </tr> <tr> <td></td> <td><code>Thread.sleep(1000);</code></td> </tr> <tr> <td></td> <td><code>} catch (InterruptedException e) { </code></td> </tr> <tr> <td></td> <td><code>e.printStackTrace();</code></td> </tr> <tr> <td></td> <td><code>}</code></td> </tr> <tr> <td></td> <td><code>}</code></td> </tr> <tr> <td></td> <td><code>}</code></td> </tr> <tr> <td></td> <td></td> </tr> <tr> <td></td> <td><code>class Worker implements Runnable { </code></td> </tr> <tr> <td></td> <td></td> </tr> <tr> <td></td> <td><code>private Thread thisThread;</code></td> </tr> <tr> <td></td> <td></td> </tr> <tr> <td></td> <td><code>private final Runnable firstTask;</code></td> </tr> <tr> <td></td> <td><code>private volatile boolean isStopped;</code></td> </tr> <tr> <td></td> <td></td> </tr> <tr> <td></td> <td><code>public Worker(Runnable firstTask) { </code></td> </tr> <tr> <td></td> <td><code>this.firstTask = firstTask;</code></td> </tr> <tr> <td></td> <td><code>}</code></td> </tr> <tr> <td></td> <td></td> </tr> <tr> <td></td> <td><code>@Override</code></td> </tr> <tr> <td></td> <td><code>public void run() { </code></td> </tr> <tr> <td></td> <td><code>// 先执行传递过来的第一个任务 这里是一个小的优化 让线程直接执行第一个任务 不需要</code></td> </tr> <tr> <td></td> <td><code>// 放入任务队列再取出来执行了</code></td> </tr> <tr> <td></td> <td><code>firstTask.run();</code></td> </tr> <tr> <td></td> <td></td> </tr> <tr> <td></td> <td><code>thisThread = Thread.currentThread();</code></td> </tr> <tr> <td></td> <td><code>while (!isStopped) { </code></td> </tr> <tr> <td></td> <td><code>try { </code></td> </tr> <tr> <td></td> <td><code>Runnable task = useTimed ? taskQueue.poll(keepAliveTime, unit) : taskQueue.take();</code></td> </tr> <tr> <td></td> <td><code>if (task == null) { </code></td> </tr> <tr> <td></td> <td><code>int i;</code></td> </tr> <tr> <td></td> <td><code>boolean exit = true;</code></td> </tr> <tr> <td></td> <td><code>if (ct.get() > corePoolSize) { </code></td> </tr> <tr> <td></td> <td><code>do{ </code></td> </tr> <tr> <td></td> <td><code>i = ct.get();</code></td> </tr> <tr> <td></td> <td><code>if (i <= corePoolSize) { </code></td> </tr> <tr> <td></td> <td><code>exit = false;</code></td> </tr> <tr> <td></td> <td><code>break;</code></td> </tr> <tr> <td></td> <td><code>}</code></td> </tr> <tr> <td></td> <td><code>}while (!ct.compareAndSet(i, i - 1));</code></td> </tr> <tr> <td></td> <td><code>if (exit) { </code></td> </tr> <tr> <td></td> <td><code>return;</code></td> </tr> <tr> <td></td> <td><code>}</code></td> </tr> <tr> <td></td> <td><code>}</code></td> </tr> <tr> <td></td> <td><code>}else { </code></td> </tr> <tr> <td></td> <td><code>task.run();</code></td> </tr> <tr> <td></td> <td><code>}</code></td> </tr> <tr> <td></td> <td><code>} catch (InterruptedException e) { </code></td> </tr> <tr> <td></td> <td><code>// do nothing</code></td> </tr> <tr> <td></td> <td><code>}</code></td> </tr> <tr> <td></td> <td><code>}</code></td> </tr> <tr> <td></td> <td><code>}</code></td> </tr> <tr> <td></td> <td></td> </tr> <tr> <td></td> <td><code>public synchronized void stopWorker() { </code></td> </tr> <tr> <td></td> <td><code>if (isStopped) { </code></td> </tr> <tr> <td></td> <td><code>throw new RuntimeException("thread has been interrupted");</code></td> </tr> <tr> <td></td> <td><code>}</code></td> </tr> <tr> <td></td> <td><code>isStopped = true;</code></td> </tr> <tr> <td></td> <td><code>thisThread.interrupt();</code></td> </tr> <tr> <td></td> <td><code>}</code></td> </tr> <tr> <td></td> <td></td> </tr> <tr> <td></td> <td><code>}</code></td> </tr> <tr> <td></td> <td></td> </tr> <tr> <td></td> <td><code>}</code></td> </tr> </tbody> </table> #### 线程池测试 #### <table> <tbody> <tr> <td></td> <td><code>package cscore.concurrent.java.threadpoolv2;</code></td> </tr> <tr> <td></td> <td></td> </tr> <tr> <td></td> <td><code>import java.util.concurrent.ExecutionException;</code></td> </tr> <tr> <td></td> <td><code>import java.util.concurrent.RunnableFuture;</code></td> </tr> <tr> <td></td> <td><code>import java.util.concurrent.TimeUnit;</code></td> </tr> <tr> <td></td> <td></td> </tr> <tr> <td></td> <td><code>public class Test { </code></td> </tr> <tr> <td></td> <td></td> </tr> <tr> <td></td> <td><code>public static void main(String[] args) throws InterruptedException, ExecutionException { </code></td> </tr> <tr> <td></td> <td><code>var pool = new ThreadPool(2, 5, TimeUnit.SECONDS, 10, RejectPolicy.ABORT, 100000);</code></td> </tr> <tr> <td></td> <td></td> </tr> <tr> <td></td> <td><code>for (int i = 0; i < 10; i++) { </code></td> </tr> <tr> <td></td> <td><code>RunnableFuture<Integer> submit = pool.submit(() -> { </code></td> </tr> <tr> <td></td> <td><code>System.out.println(Thread.currentThread().getName() + " output a");</code></td> </tr> <tr> <td></td> <td><code>try { </code></td> </tr> <tr> <td></td> <td><code>Thread.sleep(10);</code></td> </tr> <tr> <td></td> <td><code>} catch (InterruptedException e) { </code></td> </tr> <tr> <td></td> <td><code>e.printStackTrace();</code></td> </tr> <tr> <td></td> <td><code>}</code></td> </tr> <tr> <td></td> <td><code>return 0;</code></td> </tr> <tr> <td></td> <td><code>});</code></td> </tr> <tr> <td></td> <td><code>System.out.println(submit.get());</code></td> </tr> <tr> <td></td> <td><code>}</code></td> </tr> <tr> <td></td> <td><code>int n = 15;</code></td> </tr> <tr> <td></td> <td><code>while (n-- > 0) { </code></td> </tr> <tr> <td></td> <td><code>System.out.println("Number Threads = " + pool.getCt());</code></td> </tr> <tr> <td></td> <td><code>Thread.sleep(1000);</code></td> </tr> <tr> <td></td> <td><code>}</code></td> </tr> <tr> <td></td> <td><code>pool.shutDown();</code></td> </tr> <tr> <td></td> <td><code>}</code></td> </tr> <tr> <td></td> <td><code>}</code></td> </tr> <tr> <td></td> <td></td> </tr> </tbody> </table> 上面测试代码的输出结果如下所示: <table> <tbody> <tr> <td></td> <td><code>ThreadPool-Thread-2 output a</code></td> </tr> <tr> <td></td> <td><code>ThreadPool-Thread-1 output a</code></td> </tr> <tr> <td></td> <td><code>ThreadPool-Thread-3 output a</code></td> </tr> <tr> <td></td> <td><code>ThreadPool-Thread-4 output a</code></td> </tr> <tr> <td></td> <td><code>Number Threads = 5</code></td> </tr> <tr> <td></td> <td><code>ThreadPool-Thread-5 output a</code></td> </tr> <tr> <td></td> <td><code>ThreadPool-Thread-2 output a</code></td> </tr> <tr> <td></td> <td><code>ThreadPool-Thread-1 output a</code></td> </tr> <tr> <td></td> <td><code>ThreadPool-Thread-3 output a</code></td> </tr> <tr> <td></td> <td><code>ThreadPool-Thread-4 output a</code></td> </tr> <tr> <td></td> <td><code>ThreadPool-Thread-5 output a</code></td> </tr> <tr> <td></td> <td><code>ThreadPool-Thread-2 output a</code></td> </tr> <tr> <td></td> <td><code>ThreadPool-Thread-1 output a</code></td> </tr> <tr> <td></td> <td><code>ThreadPool-Thread-4 output a</code></td> </tr> <tr> <td></td> <td><code>ThreadPool-Thread-3 output a</code></td> </tr> <tr> <td></td> <td><code>ThreadPool-Thread-5 output a</code></td> </tr> <tr> <td></td> <td><code>ThreadPool-Thread-2 output a</code></td> </tr> <tr> <td></td> <td><code>ThreadPool-Thread-1 output a</code></td> </tr> <tr> <td></td> <td><code>ThreadPool-Thread-4 output a</code></td> </tr> <tr> <td></td> <td><code>Number Threads = 5</code></td> </tr> <tr> <td></td> <td><code>Number Threads = 5</code></td> </tr> <tr> <td></td> <td><code>Number Threads = 5</code></td> </tr> <tr> <td></td> <td><code>Number Threads = 5</code></td> </tr> <tr> <td></td> <td><code>Number Threads = 5</code></td> </tr> <tr> <td></td> <td><code>Number Threads = 5</code></td> </tr> <tr> <td></td> <td><code>Number Threads = 5</code></td> </tr> <tr> <td></td> <td><code>Number Threads = 5</code></td> </tr> <tr> <td></td> <td><code>Number Threads = 5</code></td> </tr> <tr> <td></td> <td><code>Number Threads = 3</code></td> </tr> <tr> <td></td> <td><code>Number Threads = 2</code></td> </tr> <tr> <td></td> <td><code>Number Threads = 2</code></td> </tr> <tr> <td></td> <td><code>Number Threads = 2</code></td> </tr> <tr> <td></td> <td><code>Number Threads = 2</code></td> </tr> </tbody> </table> 从上面的代码可以看出我们实现了正确的任务实现结果,同时线程池当中的核心线程数从 2 变到了 5 ,当线程池当中任务队列全部别执行完成之后,线程的数目重新降下来了,这确实是我们想要达到的结果。 ### ### [b392eae98c423ffb029522550f1c1dc5.png]: https://image.dandelioncloud.cn/pgy_files/images/2024/04/01/d3895079776d433884430155833faa69.png
还没有评论,来说两句吧...