Java 并发编程 | 线程池详解 小鱼儿 2021-12-24 02:29 336阅读 0赞 原文: [https://chenmingyu.top/concurrent-threadpool/][https_chenmingyu.top_concurrent-threadpool] ### 线程池 ### 线程池用来处理异步任务或者并发执行的任务 优点: 1. 重复利用已创建的线程,减少创建和销毁线程造成的资源消耗 2. 直接使用线程池中的线程,提高响应速度 3. 提高线程的可管理性,由线程池同一管理 #### ThreadPoolExecutor #### `java`中线程池使用`ThreadPoolExecutor`实现 ##### 构造函数 ##### `ThreadPoolExecutor`提供了四个构造函数,其他三个构造函数最终调用的都是下面这个构造函数 public ThreadPoolExecutor(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit, BlockingQueue<Runnable> workQueue, ThreadFactory threadFactory, RejectedExecutionHandler handler) { if (corePoolSize < 0 || maximumPoolSize <= 0 || maximumPoolSize < corePoolSize || keepAliveTime < 0) throw new IllegalArgumentException(); if (workQueue == null || threadFactory == null || handler == null) throw new NullPointerException(); this.acc = System.getSecurityManager() == null ? null : AccessController.getContext(); this.corePoolSize = corePoolSize; this.maximumPoolSize = maximumPoolSize; this.workQueue = workQueue; this.keepAliveTime = unit.toNanos(keepAliveTime); this.threadFactory = threadFactory; this.handler = handler; } **入参:** 1. `corePoolSize`:线程池的核心线程数量 线程池维护的核心线程数量,当线程池初始化后,核心线程数量为零,当有任务来到的时候才会创建线程去执行任务,当线程池中的工作线程数量等于核心线程数量时,新到的任务就会放到缓存队列中 2. `maximumPoolSize`:线程池允许创建的最大线程数量 当阻塞队列满了的时候,并且线程池中创建的线程数量小于`maximumPoolSize`,此时会创建新的线程执行任务 3. `keepAliveTime`:线程活动保持时间 只有当线程池数量大于核心线程数量时,`keepAliveTime`才会有效,如果当前线程数量大于核心线程数量时,并且线程的空闲时间达到`keepAliveTime`,当前线程终止,直到线程池数量等于核心线程数 4. `unit`:线程活动保持时间的单位 `keepAliveTime`的单位,包括:`TimeUnit.DAYS`天,`TimeUnit.HOURS`小时,`TimeUnit.MINUTES`分钟,`TimeUnit.SECONDS`秒,`TimeUnit.MILLISECONDS`毫秒,`TimeUnit.MICROSECONDS`微秒,`TimeUnit.NANOSECONDS`纳秒 5. `workQueue`:任务队列,用来保存等待执行任务的阻塞队列 `ArrayBlockingQueue`:是一个基于数组结构的有界队列 `LinkedBlockingQueue`:是一个基于链表结构的阻塞队列 `SynchronousQueue`:不存储元素的阻塞队列,每一个插入操作必须等到下一个线程调用移除操作,否则插入操作一直阻塞 `PriorityBlockingQueue`:一个具有优先级的无线阻塞队列 6. `threadFactory`:用来创建线程的工厂 7. `handler`:饱和策略,当线程池和队列都满了的时候,必须要采取一种策略处理新的任务,默认策略是`AbortPolicy`,根据自己需求选择合适的饱和策略 `AbortPolicy`:直接抛出异常 `CallerRunsPolicy`:用调用者所在的线程来运行当前任务 `DiscardOldestPolicy`:丢弃队列里面最近的一个任务,并执行当前任务 `DiscardPolicy`:不处理,丢弃掉 当然我们也可以通过实现`RejectedExecutionHandler`去自定义实现处理策略 入参不同,线程池的运行机制也不同,了解每个入参的含义由于我们更透传的理解线程池的实现原理 ##### 提交任务 ##### 线程池处理提交任务流程如下 ![16a9bdf44aca1dc6?w=700&h=410&f=png&s=112313][16a9bdf44aca1dc6_w_700_h_410_f_png_s_112313] **处理流程**: 1. 如果核心线程数量未满,创建线程执行任务,否则添加到阻塞队列中 2. 如果阻塞队列中未满,将任务存到队列里 3. 如果阻塞队列满了,看线程池数量是否达到了线程池最大数量,如果没达到,创建线程执行任务 4. 如果已经达到线程池最大数量,根据饱和策略进行处理 `ThreadPoolExecutor`使用`execute(Runnable command)`和`submit(Runnable task)`向线程池中提交任务,在`submit(Runnable task)`方法中调用了`execute(Runnable command)`,所以我们只要了解`execute(Runnable command)` public void execute(Runnable command) { if (command == null) throw new NullPointerException(); // 获取线程池状态,并且可以通过ctl获取到当前线程池数量及线程池状态 int c = ctl.get(); // 如果工作线程数小于核心线程数量,则创建一个新线程执行任务 if (workerCountOf(c) < corePoolSize) { if (addWorker(command, true)) return; c = ctl.get(); } // 如果不符合上面条件,当前线程处于运行状态并且写入阻塞队列成功 if (isRunning(c) && workQueue.offer(command)) { int recheck = ctl.get(); // 双重检查,再次获取线程状态,如果当前线程状态变为非运行状态,则从队列中移除任务,执行拒绝策略 if (! isRunning(recheck) && remove(command)) reject(command); // 检查工作线程数量是否为0 else if (workerCountOf(recheck) == 0) addWorker(null, false); } //创建线程执行任务,如果添加失败则执行拒绝策略 else if (!addWorker(command, false)) reject(command); } `execute(Runnable command)`方法中我们比较关心的就是如何创建新的线程执行任务,就`addWorker(command, true)`方法 `workQueue.offer(command)`方法是用来向阻塞队列中添加任务的 `reject(command)`方法会根据创建线程池时传入的饱和策略对任务进行处理,例如默认的`AbortPolicy`,查看源码后知道就是直接抛了个`RejectedExecutionException`异常,其他的饱和策略的源码也是特别简单 关于线程池状态与工作线程的数量是如何表示的 在`ThreadPoolExecutor`中使用一个`AtomicInteger`类型变量表示 /** * ctl表示两个信息,一个是线程池的状态(高3位表示),一个是当前线程池的数量(低29位表示),这个跟我们前面 * 说过的读写锁的state变量是一样的,以一个变量记录两个信息,都是以利用int的32个字节,高十六位表述读,低十 * 六位表示写锁 */ private final AtomicInteger ctl = new AtomicInteger(ctlOf(RUNNING, 0)); //低29位保存线程池数量 private static final int COUNT_BITS = Integer.SIZE - 3; //线程池最大容量 private static final int CAPACITY = (1 << COUNT_BITS) - 1; // 运行状态存储在高3位 // 运行状态 private static final int RUNNING = -1 << COUNT_BITS; private static final int SHUTDOWN = 0 << COUNT_BITS; private static final int STOP = 1 << COUNT_BITS; private static final int TIDYING = 2 << COUNT_BITS; private static final int TERMINATED = 3 << COUNT_BITS; `addWorker(command, boolean)`创建工作线程,执行任务 private boolean addWorker(Runnable firstTask, boolean core) { retry: for (;;) { int c = ctl.get(); // 线程池状态 int rs = runStateOf(c); // 判断线程池状态,以及阻塞队列是否为空 if (rs >= SHUTDOWN && ! (rs == SHUTDOWN && firstTask == null && ! workQueue.isEmpty())) return false; for (;;) { // 获取线程工作线程数量 int wc = workerCountOf(c); // 判断是否大于最大容量,以及根据传入的core判断是否大于核心线程数量还是最大线程数量 if (wc >= CAPACITY || wc >= (core ? corePoolSize : maximumPoolSize)) return false; // 增加工作线程数量 if (compareAndIncrementWorkerCount(c)) break retry; c = ctl.get(); // Re-read ctl //如果线程池状态改变,则重试 if (runStateOf(c) != rs) continue retry; // else CAS failed due to workerCount change; retry inner loop } } boolean workerStarted = false; boolean workerAdded = false; Worker w = null; try { // 创建Worker,内部创建了一个新的线程 w = new Worker(firstTask); final Thread t = w.thread; if (t != null) { final ReentrantLock mainLock = this.mainLock; mainLock.lock(); try { // Recheck while holding lock. // Back out on ThreadFactory failure or if // shut down before lock acquired. int rs = runStateOf(ctl.get()); // 线程池状态判断 if (rs < SHUTDOWN || (rs == SHUTDOWN && firstTask == null)) { if (t.isAlive()) // precheck that t is startable throw new IllegalThreadStateException(); // 将创建的线程添加到线程池 workers.add(w); int s = workers.size(); if (s > largestPoolSize) largestPoolSize = s; workerAdded = true; } } finally { mainLock.unlock(); } if (workerAdded) { //执行任务,首先会执行Worker对象的firstTask t.start(); workerStarted = true; } } } finally { //如果任务执行失败 if (! workerStarted) //移除worker addWorkerFailed(w); } return workerStarted; } ##### 关闭线程池 ##### `ThreadPoolExecutor`中关闭线程池使用`shutdown()`和`shutdownNow()`方法,原理都是通过遍历线程池中的线程,对线程进行中断 for (Worker w : workers) { Thread t = w.thread; if (!t.isInterrupted() && w.tryLock()) { try { t.interrupt(); } catch (SecurityException ignore) { } finally { w.unlock(); } } if (onlyOne) break; } ##### Executor框架 ##### `Executor`框架将任务的提交与任务的执行进行分离 `Executors`提供了一系列工厂方法用于创先线程池,返回的线程池都实现了 `ExecutorService` 接口 工厂方法: 1. `newFixedThreadPool`:用于创建固定数目线程的线程池 2. `newCachedThreadPool`:用于创建一个可缓存的线程池,调用execute将重用以前构造的线程,如果现有线程没有可用的,则创建一个新线 程并添加到池中。终止并从缓存中移除那些已有 60 秒钟未被使用的线程 3. `newSingleThreadExecutor`:用于创建只有一个线程的线程池 4. `newScheduledThreadPool`:用于创建一个支持定时及周期性的任务执行的线程池 在阿里巴巴手册中强制要求禁止使用`Executors`提供的工厂方法创建线程池 ![16a9bdf88dd8c735?w=603&h=176&f=png&s=63774][16a9bdf88dd8c735_w_603_h_176_f_png_s_63774] 这个确实是一个很严重的问题,我们部门曾经就出现过使用`FixedThreadPool`线程池,导致OOM,这是因为线程执行任务的时候被阻塞或耗时很长时间,导致阻塞队列一直在添加任务,直到内存被打满,报OOM 所以我们在使用线程池的时候要使用`ThreadPoolExecutor`的构造函数去创建线程池,根据自己的任务类型来确定核心线程数和最大线程数,选择适合阻塞队列和阻塞队列的长度 ##### 合理配置线程池 ##### 合理的配置线程池需要分析一下任务的性质(使用`ThreadPoolExecutor`创建线程池): 1. CPU密集型任务应配置竟可能小的线程,比如 cpu数量+1 2. IO密集型任务并不是一直在执行任务,应该配置尽可能多的线程,比如 cpu数量x2 可通过`Runtime.getRuntime().availableProcessors()`获取cpu数量 3. 执行的任务有调用外部接口比较费时的时候,这时cup空闲的时间就越长,可以将线程池数量设置大一些,这样cup空闲的时间就可以去执行别的任务 4. 建议使用有界队列,可根据需要将长度设置大一些,防止OOM **参考:java并发编程的艺术** **推荐阅读**: [java并发编程 | 线程详解][java_ _] [java并发编程 | 锁详解:AQS,Lock,ReentrantLock,ReentrantReadWriteLock][java_ _ _AQS_Lock_ReentrantLock_ReentrantReadWriteLock] 转载于:https://www.cnblogs.com/cmyxn/p/10872084.html [https_chenmingyu.top_concurrent-threadpool]: https://chenmingyu.top/concurrent-threadpool/ [16a9bdf44aca1dc6_w_700_h_410_f_png_s_112313]: https://user-gold-cdn.xitu.io/2019/5/9/16a9bdf44aca1dc6?w=700&h=410&f=png&s=112313 [16a9bdf88dd8c735_w_603_h_176_f_png_s_63774]: https://user-gold-cdn.xitu.io/2019/5/9/16a9bdf88dd8c735?w=603&h=176&f=png&s=63774 [java_ _]: https://chenmingyu.top/concurrent-thread/ [java_ _ _AQS_Lock_ReentrantLock_ReentrantReadWriteLock]: https://chenmingyu.top/concurrent-lock/
还没有评论,来说两句吧...