SpringBoot中@Scheduled实现多线程并发定时任务

偏执的太偏执、 2023-10-01 11:09 77阅读 0赞

SpringBoot中@Scheduled实现多线程并发定时任务

1.背景

  • Spring Boot实现定时任务非常容易,只需要使用Spring自带的Schedule注解

    @Scheduled(cron = “0 /1 ?”)

    1. public void cancleOrderTask() {
    2. //实现业务
    3. }
  • 记得在启动类中开启定时任务

    1. @EnableScheduling //开启定时任务
  • 定时任务开启成功,但所有的任务都是在同一个线程池中的同一个线程来完成的。在实际开发过程中,我们当然不希望所有的任务都运行在一个线程中

    image-20211229143442078

2.方案解决

首选:

  1. package com.example.demo.test;
  2. import org.springframework.context.annotation.Bean;
  3. import org.springframework.context.annotation.Configuration;
  4. import org.springframework.context.annotation.Primary;
  5. import org.springframework.scheduling.annotation.EnableAsync;
  6. import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor;
  7. import java.util.concurrent.ThreadPoolExecutor;
  8. @Configuration
  9. public class ThreadPoolConfig {
  10. /** 获取当前系统的CPU 数目*/
  11. static int cpuNums = Runtime.getRuntime().availableProcessors();
  12. /** 线程池核心池的大小*/
  13. private static int corePoolSize = cpuNums*2+1;
  14. /** 线程池的最大线程数*/
  15. private static int maximumPoolSize = cpuNums * 5;
  16. /**
  17. * @Primary 优先使用该全局配置线程池
  18. * 如果不加@primary @async注解默认采用SimpleAsyncTaskExecutor
  19. * 不加@primary 可使用@async("threadPoolTaskExecutor")指定线程池
  20. */
  21. @Primary
  22. @Bean
  23. public ThreadPoolTaskExecutor threadPoolTaskExecutor() {
  24. ThreadPoolTaskExecutor threadPoolTaskExecutor = new ThreadPoolTaskExecutor();
  25. /** 核心线程数,默认为1 **/
  26. threadPoolTaskExecutor.setCorePoolSize(corePoolSize);
  27. /** 最大线程数,默认为Integer.MAX_VALUE **/
  28. threadPoolTaskExecutor.setMaxPoolSize(maximumPoolSize);
  29. /** 队列最大长度,一般需要设置值: 大于等于notifyScheduledMainExecutor.maxNum;默认为Integer.MAX_VALUE **/
  30. threadPoolTaskExecutor.setQueueCapacity(50);
  31. /** 线程池维护线程所允许的空闲时间,默认为60s **/
  32. threadPoolTaskExecutor.setKeepAliveSeconds(60);
  33. /**
  34. * 线程池对拒绝任务(无线程可用)的处理策略,目前只支持AbortPolicy、CallerRunsPolicy;默认为后者
  35. *
  36. * AbortPolicy:直接抛出java.util.concurrent.RejectedExecutionException异常
  37. * CallerRunsPolicy:主线程直接执行该任务,执行完之后尝试添加下一个任务到线程池中,可以有效降低向线程池内添加任务的速度
  38. * DiscardOldestPolicy:抛弃旧的任务、暂不支持;会导致被丢弃的任务无法再次被执行
  39. * DiscardPolicy:抛弃当前任务、暂不支持;会导致被丢弃的任务无法再次被执行
  40. */
  41. threadPoolTaskExecutor.setRejectedExecutionHandler(new ThreadPoolExecutor.CallerRunsPolicy());
  42. threadPoolTaskExecutor.setThreadNamePrefix("task--thread");
  43. //执行初始化会自动执行afterPropertiesSet()初始化
  44. threadPoolTaskExecutor.initialize();
  45. return threadPoolTaskExecutor;
  46. }
  47. }

image-20211231164217732

方案一:

1:通过ScheduleConfig配置文件实现SchedulingConfigurer接口,并重写setSchedulerfang方法

  1. package com.lds.springbootdemo.config;
  2. import org.springframework.context.annotation.Bean;
  3. import org.springframework.context.annotation.Configuration;
  4. import org.springframework.scheduling.TaskScheduler;
  5. import org.springframework.scheduling.annotation.SchedulingConfigurer;
  6. import org.springframework.scheduling.concurrent.ThreadPoolTaskScheduler;
  7. import org.springframework.scheduling.config.ScheduledTaskRegistrar;
  8. import java.util.concurrent.Executor;
  9. import java.util.concurrent.Executors;
  10. @Configuration
  11. public class ScheduledConfig implements SchedulingConfigurer {
  12. @Override
  13. public void configureTasks(ScheduledTaskRegistrar scheduledTaskRegistrar) {
  14. scheduledTaskRegistrar.setScheduler(setTaskExecutors());
  15. }
  16. @Bean(destroyMethod="shutdown")
  17. public Executor setTaskExecutors(){
  18. // 10个线程来处理。
  19. return Executors.newScheduledThreadPool(10);
  20. }
  21. }

image-20211229143639978

2:创建Bean

  1. package com.example.morningrundata.config;
  2. import org.springframework.context.annotation.Bean;
  3. import org.springframework.context.annotation.Configuration;
  4. import org.springframework.scheduling.TaskScheduler;
  5. import org.springframework.scheduling.concurrent.ThreadPoolTaskScheduler;
  6. @Configuration
  7. public class TaskSchedulerConfig {
  8. //线程池应该交给容器管理
  9. @Bean
  10. public TaskScheduler taskScheduler() {
  11. ThreadPoolTaskScheduler scheduler = new ThreadPoolTaskScheduler();
  12. scheduler.setPoolSize(10);
  13. return scheduler;
  14. }
  15. }

方案二:

1.@Async异步+线程池的两种方式

  1. 在启动类加上@EnableAsync(不一定是启动类,可以是controller、service等启动时加载)

    1. package com.example.worktest.async;
    2. @SpringBootApplication
    3. @EnableAsync
    4. public class AsyncApplication {
    5. public static void main(String[] args) {
    6. SpringApplication.run(AsyncApplication.class, args);
    7. }
    8. }
  2. @Async注解,可以在类,方法,controller,service

    1. package com.example.morningrundata.task;
    2. import lombok.extern.slf4j.Slf4j;
    3. import org.springframework.scheduling.annotation.Async;
    4. import org.springframework.scheduling.annotation.EnableScheduling;
    5. import org.springframework.scheduling.annotation.Scheduled;
    6. import org.springframework.stereotype.Component;
  1. /**
  2. * 定时查询学生晨跑记录
  3. * @author Administrator
  4. */
  5. @Component
  6. @Slf4j
  7. @EnableScheduling
  8. @Async
  9. public class TimerProcessTaskTest {
  10. @Scheduled(cron = "0/2 * * * * ?")
  11. public void doTask() throws InterruptedException {
  12. log.info(Thread.currentThread().getName()+"===task run");
  13. Thread.sleep(5);
  14. }
  15. @Scheduled(cron = "0/2 * * * * ?")
  16. public void doTask1() throws InterruptedException {
  17. log.info(Thread.currentThread().getName()+"===task end");
  18. }
  19. }
  20. ![image-20211229145908541][]
  1. 解释

    @Async异步方法默认使用Spring创建ThreadPoolTaskExecutor(参考TaskExecutionAutoConfiguration),

    其中默认核心线程数为8, 默认最大队列和默认最大线程数都是Integer.MAX_VALUE. 创建新线程的条件是队列填满时, 而

    这样的配置队列永远不会填满, 如果有@Async注解标注的方法长期占用线程(比如HTTP长连接等待获取结果),

    在核心8个线程数占用满了之后, 新的调用就会进入队列, 外部表现为没有执行.

    image-20211229150456698

    1. 解决:
    2. 手动配置相应属性即可. 比如
    3. spring.task.execution.pool.queueCapacity=4
    4. spring.task.execution.pool.coreSize=20
    5. 备注:
    6. 此处没有配置maxSize, 仍是默认的Integer.MAX_VALUE. 如果配置的话, 请考虑达到最大线程数时的处理策略(JUC包查找RejectedExecutionHandler的实现类)
    7. (默认为拒绝执行AbortPolicy, 即抛出异常)
    8. AbortPolicy: 直接抛出java.util.concurrent.RejectedExecutionException异常
    9. CallerRunsPolicy: 主线程直接执行该任务,执行完之后尝试添加下一个任务到线程池中,可以有效降低向线程池内添加任务的速度
    10. DiscardOldestPolicy: 抛弃旧的任务
    11. DiscardPolicy: 抛弃当前任务
    12. //更好的解释
    13. AbortPolicy:直接抛出 RejectedExecutionException 异常并阻止系统正常运行。
    14. CallerRunsPolicy:“调用者运行”机制,该策略既不会抛弃任务,也不会抛出异常,而是将某些任务回退到调用者,由调用者来完成任务。
    15. DiscardOldestPolicy:抛弃队列中等待最久的任务,然后把当前任务加入队列中尝试再次提交当前任务。
    16. DiscarePolicy:直接丢弃任务,不予任何处理也不抛出异常。如果允许任务丢失,这是最好的一种方案。
    17. package com.example.morningrundata.config;
    18. import org.springframework.context.annotation.Configuration;
    19. import org.springframework.scheduling.annotation.AsyncConfigurer;
    20. import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor;
    21. import java.util.concurrent.Executor;
    22. import java.util.concurrent.ThreadPoolExecutor;
    23. @Configuration
    24. public class TaskExecutorConfig implements AsyncConfigurer {
    25. /**
    26. * Set the ThreadPoolExecutor's core pool size.
    27. */
    28. private static final int CORE_POOL_SIZE = 5;
    29. /**
    30. * Set the ThreadPoolExecutor's maximum pool size.
    31. */
    32. private static final int MAX_POOL_SIZE = 5;
    33. /**
    34. * Set the capacity for the ThreadPoolExecutor's BlockingQueue.
    35. */
    36. private static final int QUEUE_CAPACITY = 1000;
    37. /**
    38. * 通过重写getAsyncExecutor方法,制定默认的任务执行由该方法产生
    39. * <p>
    40. * 配置类实现AsyncConfigurer接口并重写getAsyncExcutor方法,并返回一个ThreadPoolTaskExevutor
    41. * 这样我们就获得了一个基于线程池的TaskExecutor
    42. */
    43. @Override
    44. public Executor getAsyncExecutor() {
    45. ThreadPoolTaskExecutor taskExecutor = new ThreadPoolTaskExecutor();
    46. //cpu核数*2+1
    47. taskExecutor.setCorePoolSize(CORE_POOL_SIZE);
    48. taskExecutor.setMaxPoolSize(MAX_POOL_SIZE);
    49. taskExecutor.setQueueCapacity(QUEUE_CAPACITY);
    50. taskExecutor.setThreadNamePrefix("test-");
    51. taskExecutor.setKeepAliveSeconds(3);
    52. taskExecutor.initialize();
    53. //设置线程池拒绝策略,四种线程池拒绝策略,具体使用哪种策略,还得根据实际业务场景才能做出抉择
    54. taskExecutor.setRejectedExecutionHandler(new ThreadPoolExecutor.CallerRunsPolicy());
    55. return taskExecutor;
    56. }
    57. }

    4.彻彻底底解决Spring中@EnableAsync、@Async异步调用的使用、原理及源码分析源码解释如下:https://www.jianshu.com/p/5f3bf8a12e26

    配置文件:

    1. #核心线程数
    2. spring.task.execution.pool.core-size=200
    3. #最大线程数
    4. spring.task.execution.pool.max-size=1000
    5. #空闲线程保留时间
    6. spring.task.execution.pool.keep-alive=3s
    7. #队列容量
    8. spring.task.execution.pool.queue-capacity=1000
    9. #线程名称前缀
    10. spring.task.execution.thread-name-prefix=test-thread-
    11. spring:
    12. profiles:
    13. # active: prod
    14. active: test
    15. #自用
    16. task:
    17. execution:
    18. pool:
    19. core-size: 10 #cpu核数*2+1
    20. keep-alive: 3s
    21. max-size: 20
    22. queue-capacity: 200
    23. thread-name-prefix: thread-

    配置类是TaskExecutionProperties【org.springframework.boot.autoconfigure.task.TaskExecutionProperties】

3.springboot的线程池的创建的两种方法

  1. 使用static代码块创建

    这样的方式创建的好处是当代码用到线程池的时候才会初始化核心线程数

    1. public class HttpApiThreadPool {
    2. /** 获取当前系统的CPU 数目*/
    3. static int cpuNums = Runtime.getRuntime().availableProcessors();
    4. /** 线程池核心池的大小*/
    5. private static int corePoolSize = 10;
    6. /** 线程池的最大线程数*/
    7. private static int maximumPoolSize = cpuNums * 5;
    8. public static ExecutorService httpApiThreadPool = null;
  1. /**
  2. * 静态方法
  3. */
  4. static{
  5. System.out.println("创建线程数:"+corePoolSize+",最大线程数:"+maximumPoolSize);
  6. //建立10个核心线程,线程请求个数超过20,则进入队列等待
  7. httpApiThreadPool = new ThreadPoolExecutor(corePoolSize, maximumPoolSize, 0L,
  8. TimeUnit.MILLISECONDS, new ArrayBlockingQueue<Runnable>(100),new ThreadFactoryBuilder().setNameFormat("PROS-%d").build());
  9. }
  10. }
  11. 使用方法:
  12. public static void main(String[] args) {
  13. HttpApiThreadPool.httpApiThreadPool.execute(()->System.out.println("测试"));
  14. }
  15. **注意:**
  16. 1.不能使用**Executors**的方法创建线程池,这个是大量的生产事故得出来的结论
  17. 2.maximumPoolSize本程序使用的是cup数的5倍,你可以看你实际情况用
  18. 3.new ThreadFactoryBuilder().setNameFormat(“PROS-%d”).build() 给每个线程已名字,可以方便调试
  1. 使用static代码块创建

    1. @Configuration
    2. public class TreadPoolConfig {
    3. private Logger logger = LoggerFactory.getLogger(TreadPoolConfig.class);
    4. /** 获取当前系统的CPU 数目*/
    5. int cpuNums = Runtime.getRuntime().availableProcessors();
    6. /** 线程池核心池的大小*/
    7. private int corePoolSize = 10;
    8. /** 线程池的最大线程数*/
    9. private int maximumPoolSize = cpuNums * 5;
    10. /**
    11. * 消费队列线程
    12. * @return
    13. */
    14. @Bean(value = "httpApiThreadPool")
    15. public ExecutorService buildHttpApiThreadPool(){
    16. logger.info("TreadPoolConfig创建线程数:"+corePoolSize+",最大线程数:"+maximumPoolSize);
    17. ExecutorService pool = new ThreadPoolExecutor(corePoolSize, maximumPoolSize, 0L,
    18. TimeUnit.MILLISECONDS, new ArrayBlockingQueue<Runnable>(100),new ThreadFactoryBuilder().setNameFormat("PROS-%d").build());
    19. return pool ;
    20. }
    21. }

    使用方法:

    1. //注入
    2. @Resource
    3. private TreadPoolConfig treadPoolConfig;
    4. //调用
    5. public void test() {
    6. treadPoolConfig.buildHttpApiThreadPool().execute(()->System.out.println("tre"));
    7. }

4.其他创建线程池的方法(没有用过)

  1. 推荐方式1:
    首先引入:commons-lang3包

    1. ScheduledExecutorService executorService = new ScheduledThreadPoolExecutor(1,
    2. new BasicThreadFactory.Builder().namingPattern("example-schedule-pool-%d").daemon(true).build());
  2. 推荐方式 2:
    首先引入:com.google.guava包

    1. ThreadFactory namedThreadFactory = new ThreadFactoryBuilder()
    2. .setNameFormat("demo-pool-%d").build();
    3. //Common Thread Pool
    4. ExecutorService pool = new ThreadPoolExecutor(5, 200,
    5. 0L, TimeUnit.MILLISECONDS,
    6. new LinkedBlockingQueue<Runnable>(1024), namedThreadFactory, new ThreadPoolExecutor.AbortPolicy());
    7. pool.execute(()-> System.out.println(Thread.currentThread().getName()));
    8. pool.shutdown();//gracefully shutdown
  3. 推荐方式 3:spring配置线程池方式:自定义线程工厂bean需要实现ThreadFactory,可参考该接口的其它默认实现类,使用方式直接注入bean
    调用execute(Runnable task)方法即可

    1. <bean id="userThreadPool"
    2. class="org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor">
    3. <property name="corePoolSize" value="10" />
    4. <property name="maxPoolSize" value="100" />
    5. <property name="queueCapacity" value="2000" />
    6. <property name="threadFactory" value= threadFactory />
    7. <property name="rejectedExecutionHandler">
    8. <ref local="rejectedExecutionHandler" />
    9. </property>
    10. </bean>
    11. //in code
    12. userThreadPool.execute(thread);

发表评论

表情:
评论列表 (有 0 条评论,77人围观)

还没有评论,来说两句吧...

相关阅读