Java并发编程:(5)线程池解析

怼烎@ 2022-06-07 23:53 322阅读 0赞

思考这样一个问题:

如果并发线程数量多,且每个线程都是执行一个时间很短的任务就结束了,这样频繁创建线程就会大大降低系统的效率,因为频繁创建线程和销毁线程需要时间。那么有没有一种办法使得线程可以复用,就是执行完一个任务,并不被销毁,而是可以继续执行其他的任务?

本节所要说的【线程池】就可以解决这种问题。那么,什么是线程池呢?

线程池是指在初始化一个多线程应用程序过程中创建一个线程集合,然后在需要执行新的任务时重用这些线程而不是新建一个线程。线程池中线程的数量通常完全取决于可用内存数量和应用程序的需求。然而,增加可用线程数量是可能的。线程池中的每个线程都有被分配一个任务,一旦任务已经完成了,线程回到池子中并等待下一次分配任务。

使用线程池的带来的好处(原因);

  1. 线程池改进了一个应用程序的响应时间。由于线程池中的线程已经准备好且等待被分配任务,应用程序可以直接拿来使用而不用新建一个线程。
  2. 线程池节省了CLR 为每个短生存周期任务创建一个完整的线程的开销并可以在任务完成后回收资源。
  3. 线程池根据当前在系统中运行的进程来优化线程时间片
  4. 线程池允许我们开启多个任务而不用为每个线程设置属性
  5. 线程池允许我们为正在执行的任务的程序参数传递一个包含状态信息的对象引用
  6. 线程池可以用来解决处理一个特定请求最大线程数量限制问题

1 ThreadPoolExecutor类

java.uitl.concurrent.ThreadPoolExecutor类是线程池中最核心的类,下面剖心其源码(提供了四个构造器):

  1. public class ThreadPoolExecutor extends AbstractExecutorService {
  2. .....
  3. public ThreadPoolExecutor(int corePoolSize,int maximumPoolSize,long keepAliveTime,TimeUnit unit,
  4. BlockingQueue<Runnable> workQueue);
  5. public ThreadPoolExecutor(int corePoolSize,int maximumPoolSize,long keepAliveTime,TimeUnit unit,
  6. BlockingQueue<Runnable> workQueue,ThreadFactory threadFactory);
  7. public ThreadPoolExecutor(int corePoolSize,int maximumPoolSize,long keepAliveTime,TimeUnit unit,
  8. BlockingQueue<Runnable> workQueue,RejectedExecutionHandler handler);
  9. public ThreadPoolExecutor(int corePoolSize,int maximumPoolSize,long keepAliveTime,TimeUnit unit,
  10. BlockingQueue<Runnable> workQueue,ThreadFactory threadFactory,RejectedExecutionHandler handler);
  11. ...
  12. }

ThreadPoolExecutor类提供了四个构造函数,实际前三个构造函数都是调用第四个构造函数来初始化的。

参数解释:

corePoolSize:核心池的大小;在创建了线程池后,默认情况下,线程池中并没有任何线程,而是等待有任务到来才创建线程去执行任务,除非调用了prestartAllCoreThreads()或者prestartCoreThread()方法,从这2个方法的名字就可以看出,是预创建线程的意思,即在没有任务到来之前就创建corePoolSize个线程或者一个线程;当线程池中的线程数目达到corePoolSize后,就会把到达的任务放到缓存队列当中;

maximumPoolSize:线程池最大线程数;

keepAliveTime:表示线程没有任务执行时最多保持多久时间会终止。默认情况下,只有当线程池中的线程数大于corePoolSize时,keepAliveTime才会起作用,如果一个线程空闲的时间达到keepAliveTime,则会终止该线程,直到线程池中的线程数不大于corePoolSize;如果调用了allowCoreThreadTimeOut(boolean)方法,在线程池中的线程数不大于corePoolSize时,keepAliveTime参数也会起作用,直到线程池中的线程数为0;

unit:参数keepAliveTime的时间单位,在TimeUnit类中有7种静态属性取值:

  • TimeUnit.DAYS; //天
  • TimeUnit.HOURS; //小时
  • TimeUnit.MINUTES; //分钟
  • TimeUnit.SECONDS; //秒
  • TimeUnit.MILLISECONDS; //毫秒
  • TimeUnit.MICROSECONDS; //微妙
  • TimeUnit.NANOSECONDS; //纳秒

workQueue:一个阻塞队列,用来存储等待执行的任务,是一个比较重要的参数,一般有如下三种取值:

  • ArrayBlockingQueue; 有界队列,来任务则存入队列,队满后创建新线程执行,超过核心线程则拒绝执行。
  • LinkedBlockingQueue;无界队列,来任务则存入队列
  • SynchronousQueue; 来一个任务则创建一个线程执行

一般选择后两种,并且线程池的排队策略与BlockingQueue有关。

threadFactory:线程工厂,主要用来创建线程;

handler**:**表示当拒绝处理任务时的策略,有以下四种取值:

    • ThreadPoolExecutor**.AbortPolicy**:丢弃任务并抛出RejectedExecutionException异常。
    • ThreadPoolExecutor**.DiscardPolicy:**也是丢弃任务,但是不抛出异常。
    • ThreadPoolExecutor**.DiscardOldestPolicy:**丢弃队列最前面的任务,然后重新尝试执行任务(重复此过程)
    • ThreadPoolExecutor**.CallerRunsPolicy:**由调用线程处理该任务

ThreadPoolExecutor继承了AbstractExecutorService,源码为:

  1. public abstract class AbstractExecutorService implements ExecutorService {
  2. protected <T> RunnableFuture<T> newTaskFor(Runnable runnable, T value) { };
  3. protected <T> RunnableFuture<T> newTaskFor(Callable<T> callable) { };
  4. public Future<?> submit(Runnable task) {};
  5. public <T> Future<T> submit(Runnable task, T result) { };
  6. public <T> Future<T> submit(Callable<T> task) { };
  7. private <T> TdoInvokeAny(Collection<? extends Callable<T>> tasks,boolean timed, long nanos)
  8. throws InterruptedException, ExecutionException,TimeoutException {
  9. };
  10. public <T> T invokeAny(Collection<? extends Callable<T>> tasks)
  11. throws InterruptedException, ExecutionException {
  12. };
  13. public <T> T invokeAny(Collection<? extends Callable<T>> tasks,long timeout, TimeUnit unit)
  14. throws InterruptedException, ExecutionException,TimeoutException {
  15. };
  16. public <T> List<Future<T>>invokeAll(Collection<? extends Callable<T>> tasks)
  17. throws InterruptedException {
  18. };
  19. public <T> List<Future<T>>invokeAll(Collection<? extends Callable<T>> tasks,long timeout, TimeUnit unit)
  20. throws InterruptedException {
  21. };
  22. }

AbstractExecutorService是一个抽象类,它实现了ExecutorService接口,ExecutorService源码为:

  1. public interface ExecutorService extends Executor {
  2. void shutdown();
  3. boolean isShutdown();
  4. boolean isTerminated();
  5. boolean awaitTermination(long timeout, TimeUnit unit)
  6. throws InterruptedException;
  7. <T>Future<T> submit(Callable<T> task);
  8. <T>Future<T> submit(Runnable task, T result);
  9. Future<?>submit(Runnable task);
  10. <T>List<Future<T>> invokeAll(Collection<? extends Callable<T>> tasks)
  11. throws InterruptedException;
  12. <T>List<Future<T>> invokeAll(Collection<? extends Callable<T>> tasks,
  13. long timeout, TimeUnit unit)
  14. throws InterruptedException;
  15. <T>T invokeAny(Collection<? extends Callable<T>> tasks)
  16. throws InterruptedException, ExecutionException;
  17. <T>T invokeAny(Collection<? extends Callable<T>> tasks,
  18. long timeout, TimeUnit unit)
  19. throws InterruptedException, ExecutionException,TimeoutException;
  20. }

而ExecutorService又是继承了Executor接口,我们看一下Executor接口的实现:

  1. public interface Executor {
  2. void execute(Runnablecommand); //执行传进去的任务
  3. }

ThreadPoolExecutor、AbstractExecutorService、ExecutorService和Executor几个之间的关系:

Executor是一个顶层接口,在它里面只声明了一个方法execute(Runnable),返回值为void,参数为Runnable类型,从字面意思可以理解,就是用来执行传进去的任务的;

  然后ExecutorService接口继承了Executor接口,并声明了一些方法:submit、invokeAll、invokeAny以及shutDown等;

  抽象类AbstractExecutorService实现了ExecutorService接口,基本实现了ExecutorService中声明的所有方法;

然后ThreadPoolExecutor继承了类AbstractExecutorService。

  • execute()实际上是Executor中声明的方法,在ThreadPoolExecutor进行了具体的实现,是核心方法,可以向线程池提交任务,交由线程池去执行。
  • submit()是ExecutorService中声明的方法,在AbstractExecutorService有了具体的实现,但在ThreadPoolExecutor中并没有重写,和execute()方法不同,它既可以向线程池提交任务,又能够返回任务执行的结果(去看submit()方法的实现,会发现它实际上还是调用的execute()方法,只不过它利用了Future来获取任务执行结果)。
  • shutdown()和shutdownNow()是用来关闭线程池的
  • shutdownNow()

还有很多其他的方法:例如getQueue() 、getPoolSize() 、getActiveCount()、getCompletedTaskCount()等获取与线程池相关属性的方法。

2 线程池的实现原理

线程池主要包括线程池状态、任务执行、线程初始化、任务缓存队列排队策略、拒绝策略、线程池的关闭和容量动态调整都几个方面。

2.1 线程池状态

ThreadPoolExecutor中定义几个变量:

  1. volatile int runState;
  2. static final int RUNNING = 0;
  3. static final int SHUTDOWN = 1;
  4. static final int STOP = 2;
  5. static final int TERMINATED = 3;

其中static final变量表示线程池状态是 可见性volatile变量runState的可能取值。 

当创建线程池后,线程池处于RUNNING状态;

如果调用了shutdown()方法,则线程池处于SHUTDOWN状态,此时线程池不能够接受新的任务,它会等待所有任务执行完毕;

如果调用了shutdownNow()方法,则线程池处于STOP状态,此时线程池不能接受新的任务,并且会去尝试终止正在执行的任务;

当线程池处于SHUTDOWN或STOP状态,并且所有工作线程已经销毁,任务缓存队列已经清空或执行结束后,线程池被设置为TERMINATED状态。

2.2 任务执行

 先看一下ThreadPoolExecutor类比较重要成员变量:

  1. private final BlockingQueue<Runnable> workQueue;
  2. //任务缓存队列,用来存放等待执行的任务
  3. private final ReentrantLock mainLock = new ReentrantLock();
  4. //线程池状态锁,对线程池状态(比如线程池大小和线程池状态runState等)的改变都要使用这个锁
  5. private final HashSet<Worker> workers = new HashSet<Worker>(); //用来存放工作集
  6. private volatile int corePoolSize;
  7. //核心池的大小(即线程池中的线程数目大于这个参数时,提交的任务会被放进任务缓存队列)
  8. private volatile int maximumPoolSize;
  9. //线程池最大能容忍的线程数,大于线程池大小;当任务繁重的时候,可能超出corPoolSize,但是小于maximumPollSize
  10. private volatile long keepAliveTime; //线程存活时间
  11. private volatile boolean allowCoreThreadTimeOut; //是否允许为核心线程设置存活时间
  12. private volatile int poolSize; //线程池中当前的线程数
  13. private volatile RejectedExecutionHandler handler; //任务拒绝策略
  14. private volatile ThreadFactory threadFactory; //线程工厂,用来创建线程
  15. private int largestPoolSize; //用来记录线程池中曾经出现过的最大线程数
  16. private long completedTaskCount; //用来记录已经执行完毕的任务个数

任务从提交到最终执行完毕经历了哪些过程?

上面说过,不管是execute()方法还是submit方法提交任务,最后都是通过执行execute()方法实现的,其源码为:

  1. public void execute(Runnablecommand) {
  2. if (command == null)
  3. throw new NullPointerException();
  4. if (poolSize >= corePoolSize ||!addIfUnderCorePoolSize(command)) {
  5. if (runState == RUNNING &&workQueue.offer(command)){
  6. //防止在将此任务添加进任务缓存队列的同时其他线程突然调用shutdown或者shutdownNow方法关闭线程
  7. if (runState != RUNNING || poolSize == 0)
  8. ensureQueuedTaskHandled(command);
  9. // 保证 添加到任务缓存队列中的任务得到处理
  10. }
  11. else if (!addIfUnderMaximumPoolSize(command))
  12. reject(command); //is shutdown or saturated
  13. }
  14. }

代码意思:首先,判断提交的任务command是否为null,若是null,则抛出空指针异。然后,如果线程池中当前线程数不小于核心池大小,那么就会直接进入下面的if语句块了。

如果线程池中当前线程数小于核心池大小,则接着执行后半部分addIfUnderCorePoolSize(command),如果执行完addIfUnderCorePoolSize这个方法返回false,则继续执行下面的if语句块,否则整个方法就直接执行完毕了。

语句块内容:如果当前线程池处于RUNNING状态,则将任务放入任务缓存队列;如果当前线程池不处于RUNNING状态或者任务放入缓存队列失败,则执行:addIfUnderMaximumPoolSize(command),如果执行失败,则执行reject()方法进行任务拒绝处理。

  1. private boolean addIfUnderCorePoolSize(Runnable firstTask) {
  2. Thread t = null;
  3. final ReentrantLock mainLock = this.mainLock;
  4. mainLock.lock();
  5. try {
  6. if (poolSize < corePoolSize && runState ==RUNNING)
  7. t= addThread(firstTask); //创建线程去执行firstTask任务
  8. } finally {
  9. mainLock.unlock();
  10. }
  11. if (t == null)
  12. return false;
  13. t.start();
  14. return true;
  15. }

顾名思义:该方法的意图就是当线程数量低于核心池大小时执行的方法

代码大意:

首先,获取锁,通过if语句判断当前线程池中的线程数目是否小于核心池大小和线程池状态,前面已经判断过了,此处再次判断是为了防止加锁之前向线程池加入了新的线程导致超标和其他线程调用了shutdown或者shutdownNow方法关闭线程池。

其次,执行t= addThread(firstTask),穿进去执行任务的参数来创建一个线程返回来,如果t==null表示创建线程失败,否则表示创建成功则调用t.start()方法启动线程。

  1. private Thread addThread(Runnable firstTask) {
  2. Worker w = new Worker(firstTask);
  3. Thread t = threadFactory.newThread(w); //创建一个线程,执行任务
  4. if (t != null) {
  5. w.thread= t; //将创建的线程的引用赋值为w的成员变量
  6. workers.add(w);
  7. int nt = ++poolSize; //当前线程数加1
  8. if (nt > largestPoolSize)
  9. largestPoolSize= nt;
  10. }
  11. return t;
  12. }

首先,用提交的任务创建了一个Worker对象,然后调用线程工厂threadFactory创建了一个新的线程;

然后,将线程t的引用赋值给了Worker对象的成员变量thread,接着通过workers.add(w)将Worker对象添加到工作集当中。

Worker类的实现:

  1. private final class Worker implements Runnable {
  2. private final ReentrantLock runLock = new ReentrantLock();
  3. private Runnable firstTask;
  4. volatile long completedTasks;
  5. Threadthread;
  6. Worker(RunnablefirstTask) { //传进去了一个Runnable任务
  7. this.firstTask= firstTask;
  8. }
  9. boolean isActive() {
  10. return runLock.isLocked();
  11. }
  12. void interruptIfIdle() {
  13. final ReentrantLock runLock = this.runLock;
  14. if (runLock.tryLock()) {
  15. try {
  16. if (thread != Thread.currentThread())
  17. thread.interrupt();
  18. } finally {
  19. runLock.unlock();
  20. }
  21. }
  22. }
  23. void interruptNow() {
  24. thread.interrupt();
  25. }
  26. private void runTask(Runnabletask) {
  27. final ReentrantLock runLock = this.runLock;
  28. runLock.lock();
  29. try {
  30. if (runState < STOP&&Thread.interrupted()&& runState>= STOP)
  31. boolean ran = false;
  32. beforeExecute(thread,task);
  33. //beforeExecute方法是ThreadPoolExecutor类的一个方法,没有具体实现,用户可以根据
  34. //自己需要重载这个方法和后面的afterExecute方法来进行一些统计信息,比如某个任务的执行时间等
  35. try {
  36. task.run();
  37. ran= true;
  38. afterExecute(task, null);
  39. ++completedTasks;
  40. } catch (RuntimeException ex) {
  41. if (!ran)
  42. afterExecute(task,ex);
  43. throw ex;
  44. }
  45. } finally {
  46. runLock.unlock();
  47. }
  48. }
  49. public void run() { //核心方法
  50. try {
  51. Runnabletask = firstTask;
  52. firstTask= null;
  53. while (task != null || (task = getTask()) != null) {
  54. runTask(task);
  55. task= null;
  56. }
  57. } finally {
  58. workerDone(this); //当任务队列中没有任务时,进行清理工作
  59. }
  60. }
  61. }

核心方法run():首先执行通过构造器传进来的任务firstTask:调用runTask()执行,执行完任务之后,在while循环里面不断通过从缓存队列里getTask()去取新的任务来执行。

但是getTask是ThreadPoolExecutor类中的方法,并不是Worker类中的方法,下面是getTask方法的实现:

  1. Runnable getTask() {
  2. for (;;) {
  3. try {
  4. int state = runState;
  5. if (state > SHUTDOWN)
  6. return null;
  7. Runnabler;
  8. if (state == SHUTDOWN) // Help drain queue
  9. r= workQueue.poll();
  10. else if (poolSize >corePoolSize || allowCoreThreadTimeOut)
  11. //如果线程数大于核心池大小或者允许为核心池线程设置空闲时间,
  12. //则通过poll取任务,若等待一定的时间取不到任务,则返回null
  13. r= workQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS);
  14. else
  15. r= workQueue.take();
  16. if (r != null)
  17. return r;
  18. if (workerCanExit()) { //如果没取到任务,即r为null,则判断当前的worker是否可以退出
  19. if (runState >= SHUTDOWN) // Wake up others
  20. interruptIdleWorkers(); //中断处于空闲状态的worker
  21. return null;
  22. }
  23. } catch (InterruptedException ie) { }
  24. }
  25. }

先判断当前线程池状态:

如果runState大于SHUTDOWN(即为STOP或者TERMINATED),则直接返回null。

   如果runState为SHUTDOWN或者RUNNING,则从任务缓存队列取任务。

  如果当前线程池的线程数大于核心池大小corePoolSize或者允许为核心池中的线程设置空闲存活时间,则调用poll(time,timeUnit)来取任务,这个方法会等待一定的时间,如果取不到任务就返回null。

然后判断取到的任务r是否为null,为null则通过调用workerCanExit()方法来判断当前worker是否可以退出

退出判断方法:

  1. private boolean workerCanExit() {
  2. final ReentrantLock mainLock = this.mainLock;
  3. mainLock.lock();
  4. boolean canExit;
  5. //如果runState大于等于STOP,或者任务缓存队列为空了
  6. //或者 允许为核心池线程设置空闲存活时间并且线程池中的线程数目大于1
  7. try {
  8. canExit= runState >= STOP ||
  9. workQueue.isEmpty()||
  10. (allowCoreThreadTimeOut&&
  11. poolSize> Math.max(1, corePoolSize));
  12. } finally {
  13. mainLock.unlock();
  14. }
  15. return canExit;
  16. }

如果线程池处于STOP状态、或者任务队列已为空或者允许为核心池线程设置空闲存活时间并且线程数大于1时,允许worker退出,然后调用interruptIdleWorkers()中断处于空闲状态的worker,中断方法interruptIdleWorkers()的实现:

  1. void interruptIdleWorkers() {
  2. final ReentrantLock mainLock = this.mainLock;
  3. mainLock.lock();
  4. try {
  5. for (Worker w : workers) //实际上调用的是worker的interruptIfIdle()方法
  6. w.interruptIfIdle();
  7. } finally {
  8. mainLock.unlock();
  9. }
  10. }
  11. void interruptIfIdle() {
  12. final ReentrantLock runLock = this.runLock;
  13. if (runLock.tryLock()){
  14. //注意这里,是调用tryLock()来获取锁的,因为如果当前worker正在执行任务,锁已经被获取了,是无法获取到锁的
  15. 如果成功获取了锁,说明当前worker处于空闲状态
  16. try {
  17. if (thread !=Thread.currentThread())
  18. thread.interrupt();
  19. } finally { runLock.unlock(); }
  20. }
  21. }

此处直接让空闲线程去任务缓存队列去取任务执行,而非把任务分派给线程,因为这样需要额外的管理,会增加难度和复杂度。

addIfUnderMaximumPoolSize方法的实现方法类似于addIfUnderCorePoolSize方法,唯一区别在于前者是的执行条件是:

线程池中的线程数达到了核心池大小并且往任务队列中添加任务失败:

  1. private boolean addIfUnderMaximumPoolSize(Runnable firstTask){
  2. Thread t = null;
  3. final ReentrantLock mainLock = this.mainLock;
  4. mainLock.lock();
  5. try {
  6. if (poolSize < maximumPoolSize &&runState == RUNNING)
  7. t= addThread(firstTask);
  8. } finally {
  9. mainLock.unlock();
  10. }
  11. if (t == null)
  12. return false;
  13. t.start();
  14. return true;
  15. }

任务提交给线程池之后到被执行的整个过程:

  1. 如果当前线程池中的线程数目小于corePoolSize,则每来一个任务,就会创建一个线程去执行这个任务;
  2. 如果当前线程池中的线程数目>=corePoolSize,则每来一个任务,会尝试将其添加到任务缓存队列当中,若添加成功,则该任务会等待空闲线程将其取出去执行;若添加失败(一般来说是任务缓存队列已满),则会尝试创建新的线程去执行这个任务;
  3. 如果当前线程池中的线程数目达到maximumPoolSize,则会采取任务拒绝策略进行处理;
  4. 如果线程池中的线程数量大于 corePoolSize时,如果某线程空闲时间超过keepAliveTime,线程将被终止,直至线程池中的线程数目不大于corePoolSize;如果允许为核心池中的线程设置存活时间,那么核心池中的线程空闲时间超过keepAliveTime,线程也会被终止。

2.3 初始化线程池中的线程

 默认情况下,线程池创建后线程数为0,提交任务之后才会创建线程。

但是也可以提前预创建线程:两种方式

· prestartCoreThread():初始化一个核心线程;

· prestartAllCoreThreads():初始化所有核心线程

  1. public boolean prestartCoreThread() {
  2. return addIfUnderCorePoolSize(null);
  3. //传进null,会阻塞在getTask方法中的r = workQueue.take(),等待任务队列中有新任务。
  4. }
  5. public int prestartAllCoreThreads() {
  6. int n = 0;
  7. while (addIfUnderCorePoolSize(null)) //传进参数null……
  8. ++n;
  9. return n;
  10. }

2.4任务缓存队列及排队策略

  任务缓存队列—-workQueue,类型为BlockingQueue,用于存放等待的任务,通常有以下三种类型:

  1)ArrayBlockingQueue:基于数组的先进先出队列,创建时必指定大小;

  2)LinkedBlockingQueue:基于链表的先进先出队列,若创建时没有指定大小,则默认为Integer.MAX_VALUE;

  3)synchronousQueue:特殊队列,不保存任务而是新建线程来执行提交的任务。

2.5 任务拒绝策略

当线程池的任务缓存队列并且线程池中的线程数目达到maximumPoolSize时,如果仍有任务提交则采取任务拒绝策略,四种策略:

ThreadPoolExecutor.AbortPolicy:丢弃任务并抛出RejectedExecutionException异常。

ThreadPoolExecutor.DiscardPolicy:丢弃任务但不抛出异常。

ThreadPoolExecutor.DiscardOldestPolicy:丢弃队列最前面的任务,然后重新尝试执行任务(重复此过程)

ThreadPoolExecutor.CallerRunsPolicy:由调用线程处理该任务

2.6 关闭线程池

ThreadPoolExecutor提供了两个方法shutdown()和shutdownNow()用于关闭线程池:

shutdown():等任务缓存队列中所有任务都执行完后终止线程池,不再接受新任务。

shutdownNow():打断正在执行的任务立即终止线程池,并清空任务缓存队列,返回未执行的任务

2.7 动态调整线程池容量

ThreadPoolExecutor提供了动态调整线程池容量大小的方法,通过调整线程池容量,可以立即创建新的线程执行任务:

1)setCorePoolSize():设置核心池大小

2)setMaximumPoolSize():设置线程池最大能创建的线程数目大小

3 线程池的具体应用

具体应用例子代码:

  1. public class Test {
  2. public static void main(String[] args) {
  3. ThreadPoolExecutorexecutor = new ThreadPoolExecutor(5, 10, 200,TimeUnit.MILLISECONDS,
  4. new ArrayBlockingQueue<Runnable>(5));
  5. for(int i=0;i<15;i++){
  6. MyTaskmyTask = new MyTask(i);
  7. executor.execute(myTask);
  8. System.out.println("线程池中线程数目:"+executor.getPoolSize()+",队列中等待执行的任务数目:"+
  9. executor.getQueue().size()+",已执行玩别的任务数目:"+executor.getCompletedTaskCount());
  10. }
  11. executor.shutdown();
  12. }
  13. }
  14. class MyTask implements Runnable {
  15. private int taskNum;
  16. public MyTask(int num) {
  17. this.taskNum= num;
  18. }
  19. @Override
  20. public void run() {
  21. System.out.println("正在执行task "+taskNum);
  22. try {
  23. Thread.currentThread().sleep(4000);
  24. } catch (InterruptedException e) {
  25. e.printStackTrace();
  26. }
  27. System.out.println("task"+taskNum+"执行完毕");
  28. }
  29. }

执行结果:

  1. 正在执行task 0
  2. 线程池中线程数目:1,队列中等待执行的任务数目:0,已执行玩别的任务数目:0
  3. 线程池中线程数目:2,队列中等待执行的任务数目:0,已执行玩别的任务数目:0
  4. 正在执行task 1
  5. 线程池中线程数目:3,队列中等待执行的任务数目:0,已执行玩别的任务数目:0
  6. 正在执行task 2
  7. 线程池中线程数目:4,队列中等待执行的任务数目:0,已执行玩别的任务数目:0
  8. 正在执行task 3
  9. 线程池中线程数目:5,队列中等待执行的任务数目:0,已执行玩别的任务数目:0
  10. 正在执行task 4
  11. 线程池中线程数目:5,队列中等待执行的任务数目:1,已执行玩别的任务数目:0
  12. 线程池中线程数目:5,队列中等待执行的任务数目:2,已执行玩别的任务数目:0
  13. 线程池中线程数目:5,队列中等待执行的任务数目:3,已执行玩别的任务数目:0
  14. 线程池中线程数目:5,队列中等待执行的任务数目:4,已执行玩别的任务数目:0
  15. 线程池中线程数目:5,队列中等待执行的任务数目:5,已执行玩别的任务数目:0
  16. 线程池中线程数目:6,队列中等待执行的任务数目:5,已执行玩别的任务数目:0
  17. 正在执行task 10
  18. 线程池中线程数目:7,队列中等待执行的任务数目:5,已执行玩别的任务数目:0
  19. 正在执行task 11
  20. 线程池中线程数目:8,队列中等待执行的任务数目:5,已执行玩别的任务数目:0
  21. 正在执行task 12
  22. 线程池中线程数目:9,队列中等待执行的任务数目:5,已执行玩别的任务数目:0
  23. 正在执行task 13
  24. 线程池中线程数目:10,队列中等待执行的任务数目:5,已执行玩别的任务数目:0
  25. 正在执行task 14
  26. task 3执行完毕
  27. task 0执行完毕
  28. task 2执行完毕
  29. task 1执行完毕
  30. 正在执行task 8
  31. 正在执行task 7
  32. 正在执行task 6
  33. 正在执行task 5
  34. task 4执行完毕
  35. task 10执行完毕
  36. task 11执行完毕
  37. task 13执行完毕
  38. task 12执行完毕
  39. 正在执行task 9
  40. task 14执行完毕
  41. task 8执行完毕
  42. task 5执行完毕
  43. task 7执行完毕
  44. task 6执行完毕
  45. task 9执行完毕

分析其执行结果:

当线程池中线程数目大于5时,将任务放入任务缓存队列;

当任务缓存队列满了之后,便创建新的线程。

如果将for循环中执行任务数量修改为20,则会抛出任务拒绝异常。

但是,javadoc提倡使用Executors类中的几个静态方法来创建线程池:

  1. public static ExecutorService newSingleThreadExecutor() { //创建容量为1的缓冲池
  2. return new FinalizableDelegatedExecutorService
  3. (new ThreadPoolExecutor(1, 1,
  4. 0L,TimeUnit.MILLISECONDS,
  5. new LinkedBlockingQueue<Runnable>()));
  6. }
  7. public static ExecutorService newFixedThreadPool(int nThreads) { //创建固定容量大小的缓冲池
  8. return new ThreadPoolExecutor(nThreads, nThreads,
  9. 0L,TimeUnit.MILLISECONDS,
  10. new LinkedBlockingQueue<Runnable>());
  11. }
  12. public static ExecutorService newCachedThreadPool() { //创建一个缓冲池,缓冲池容量大小为Integer.MAX_VALUE
  13. return new ThreadPoolExecutor(0, Integer.MAX_VALUE,
  14. 60L,TimeUnit.SECONDS,
  15. new SynchronousQueue<Runnable>());
  16. }

上述方法实际上也是调用了ThreadPoolExecutor,只不过参数都已配置:

newCachedThreadPool:使用SynchronousQueue,来任务时即时创建线程(线程空闲超过60秒则自动销毁),corePoolSize=0;

corePoolSize和maximumPoolSize:使用LinkedBlockingQueue,corePoolSize=maximumPoolSize=1;

newFixedThreadPool:使用LinkedBlockingQueue,corePoolSize=maximumPoolSize;

4 线程池大小分配

4.1 一般参考方法

要想合理的配置线程池的大小,首先得分析任务的特性,可以从以下几个角度分析:

  1. 任务的性质:CPU密集型任务、IO密集型任务、混合型任务。
  2. 任务的优先级:高、中、低。
  3. 任务的执行时间:长、中、短。
  4. 任务的依赖性:是否依赖其他系统资源,如数据库连接等。

性质不同的任务可以交给不同规模的线程池执行。

CPU密集型任务应配置尽可能小的线程,可以设为 1+NCPU

IO密集型任务应配置尽可能多的线程,因为IO操作不占用CPU,为了提高CPU利用率,应加大线程数量,可以设置为1+2*NCPU

对于混合型任务,如果可以拆分,则拆分成IO密集型和CPU密集型分别处理,前提是两者运行的时间是差不多的,如果处理时间相差很大,则没必要拆分了。

若任务对其他系统资源有依赖,例如,需要链接数据库返回结果,等待时间越长,则CPU空闲的时间越长,那么线程数量应设置得越大,才能更好的利用CPU。

当然具体合理线程池值大小,需要结合系统实际情况,在大量的尝试下比较才能得出,以上只是人总结的规律。

4.2 线程池容量计算公式

下面是别人得到的一个具体的公式:

最佳线程数目 = ((线程等待时间+线程CPU时间)/线程CPU时间 )* CPU数目

化简得到:

最佳线程数目 = (线程等待时间与线程CPU时间之比 + 1)* CPU数目

得出结论:线程等待时间所占比例越高,需要越多线程;线程CPU时间所占比例越高,需要越少线程。

4.3 线程池容量调整经验

高并发、任务执行时间短的业务怎样使用线程池?

高并发、任务执行时间长的业务怎样使用线程池?

非高并发、任务执行时间长的业务怎样使用线程池?

(1)高并发、任务执行时间短的业务,线程池线程数可以设置为CPU核数+1,减少线程上下文的切换 。

(2)并发高、业务执行时间长,要从整体架构的设计来着手,看看这些业务里面某些数据是否能做缓存是第一步,增加服务器是第二步,至于线程池的设 置,设置参考(3)。最后,业务执行时间长的问题,也可能需要分析一下,看看能不能使用中间件对任务进行拆分和解耦。

(3)并发不高、任务执行时间长的业务要区分开看:

a)假如是业务时间长集中在IO操作上,也就是IO密集型的任务,因为IO操作并不占用CPU,所以不要让所有的CPU闲下来,可以适当加大线程池中的线程数目,让CPU处理更多的业务

b)假如是业务时间长集中在计算操作上,也就是计算密集型任务,可以线程池中的线程数设置得少一些,减少线程上下文的切换 。

发表评论

表情:
评论列表 (有 0 条评论,322人围观)

还没有评论,来说两句吧...

相关阅读

    相关 Java并发框架:线使用案例

    在Java编程中,线程池是一个非常重要的并发工具。它能够重复利用已经创建的线程,避免频繁地创建和销毁线程,从而提高了程序的运行效率。 下面我们将通过一个具体的使用案例来解析线

    相关 Java并发编程:(5线

    思考这样一个问题: 如果并发线程数量多,且每个线程都是执行一个时间很短的任务就结束了,这样频繁创建线程就会大大降低系统的效率,因为频繁创建线程和销毁线程需要时间。那么有没有一