java 实现自定义线程池

矫情吗;* 2022-05-23 08:28 297阅读 0赞

java 实现自定义线程池

定义线程池接口

  1. public interface ThreadPool<Job extends Runnable> {
  2. // 执行一个Job
  3. void execute(Job job);
  4. // 关闭线程池
  5. void shutdown();
  6. // 添加工作者线程
  7. void addWorkers(int num);
  8. // 减少工作者线程
  9. void removeWorker(int num);
  10. // 得到正在等待执行的任务数量
  11. int getJobSize();
  12. }

线程池接口的默认实现

  1. public class DefaultThreadPool<Job extends Runnable> implements ThreadPool<Job> {
  2. // 线程最大限制
  3. private static int MAX_WORKER_NUMBERS = 10;
  4. //线程默认数量
  5. private static int DEFAULT_WORKER_NUMBERS = 5;
  6. // 线程最小数量
  7. private static int MIN_WORKER_NUMBERS = 1;
  8. // 工作列表
  9. private final LinkedList<Job> jobs = new LinkedList<>();
  10. // 工作者列表
  11. private final List<Worker> workers = Collections.synchronizedList(new ArrayList<>());
  12. // 当前工作者的线程数量
  13. private int workerNum = DEFAULT_WORKER_NUMBERS;
  14. private AtomicLong threadNum = new AtomicLong();
  15. public DefaultThreadPool() {
  16. initializeWorkers(DEFAULT_WORKER_NUMBERS);
  17. }
  18. public DefaultThreadPool(int num) {
  19. workerNum = num > MAX_WORKER_NUMBERS ? MAX_WORKER_NUMBERS : num < MIN_WORKER_NUMBERS ? MIN_WORKER_NUMBERS : num;
  20. initializeWorkers(workerNum);
  21. }
  22. // 初始化线程工作者
  23. private void initializeWorkers(int num) {
  24. for (int i = 0; i < num; i++) {
  25. Worker worker = new Worker();
  26. workers.add(worker);
  27. Thread thread = new Thread(worker, "ThreadPool-Worker-" + threadNum.incrementAndGet());
  28. thread.start();
  29. }
  30. }
  31. @Override
  32. public void execute(Job job) {
  33. if (job != null) {
  34. synchronized (jobs) {
  35. //添加一个工作 然后通知
  36. jobs.addLast(job);
  37. jobs.notify();
  38. }
  39. }
  40. }
  41. @Override
  42. public void shutdown() {
  43. workers.forEach(Worker::shutdown);
  44. }
  45. @Override
  46. public void addWorkers(int num) {
  47. synchronized (jobs) {
  48. // 限制新增的worker数量不能超过最大值
  49. if (num + this.workerNum > MAX_WORKER_NUMBERS) {
  50. num = MAX_WORKER_NUMBERS - this.workerNum;
  51. }
  52. initializeWorkers(num);
  53. this.workerNum += num;
  54. }
  55. }
  56. @Override
  57. public void removeWorker(int num) {
  58. synchronized (jobs) {
  59. if (num >= this.workerNum) {
  60. throw new IllegalArgumentException("beyond workNum");
  61. }
  62. //按照给定的数量停止worker
  63. int count = 0;
  64. while (count < num) {
  65. Worker worker = workers.get(count);
  66. if (workers.remove(worker)) {
  67. worker.shutdown();
  68. count++;
  69. }
  70. }
  71. this.workerNum -= count;
  72. }
  73. }
  74. @Override
  75. public int getJobSize() {
  76. return jobs.size();
  77. }
  78. // 工作者 负责消费任务
  79. class Worker implements Runnable {
  80. // 是否工作
  81. private volatile boolean running = true;
  82. @Override
  83. public void run() {
  84. while (running) {
  85. Job job;
  86. synchronized (jobs) {
  87. while (jobs.isEmpty()) {
  88. try {
  89. jobs.wait();
  90. } catch (InterruptedException e) {
  91. // 感知到外部对WorkerThread的中斷操作,返回
  92. Thread.currentThread().interrupt();
  93. return;
  94. }
  95. }
  96. job = jobs.removeFirst();
  97. }
  98. if (job != null) {
  99. try {
  100. job.run();
  101. } catch (Exception ex) {
  102. //忽略job执行中的异常
  103. }
  104. }
  105. }
  106. }
  107. public void shutdown() {
  108. running = false;
  109. }
  110. }
  111. }

示例摘抄于《Java并发变成的艺术》4.4.3线程池技术及其示例

发表评论

表情:
评论列表 (有 0 条评论,297人围观)

还没有评论,来说两句吧...

相关阅读

    相关 定义java 线

     为什么要建立线程池? 在多线程项目中,如果建立的线程过多,反而可能导致运行速度大大减慢,这是由于线程建立所花费的时间和资源都比较多。 所以我们在多线程中必须很好地

    相关 定义线

    如果当前线程池中的线程数目小于corePoolSize,则每来一个任务,就会创建一个线程去执行这个任务; 如果当前线程池中的线程数目>=corePoolSize,则每来一个