【半原创】springboot 1.X 集成Quartz.

冷不防 2022-05-27 10:14 222阅读 0赞

springboot 1.X 集成Quartz.

〇, 个人理解,欢迎指正

一,本文主要解决的问题

查找了一些资料相关资料,发现要么写得太肤浅,要么没有解决一些实际问题。故在别人的基础上抄了一抄,梳理了一下有了此文。背景跳过,这里主要介绍使用sring集成Quartz 的代码,使用spring+Quartz的JDBC持久化任务特性:

主要解决的问题:

A.任务涉及到的数据库事务问题 (Quartz的Job任务执行会吞掉Exception不会触发Spring事务的回滚);

B.JobBean 中依赖的springBean的注入(即如何把Quartz及Job纳入Spring容器的管理)

C.有状态任务数据传递:(如针对订单过期的Job,需要传递订单Id等)

二,前置要求

1,在Spring工程中添加 Quartz依赖:Maven为例:

  1. <!-- https://mvnrepository.com/artifact/org.quartz-scheduler/quartz --> <dependency>
  2. <groupId>org.quartz-scheduler</groupId>
  3. <artifactId>quartz</artifactId>
  4. <version>2.3.0</version>
  5. </dependency>
  6. <!-- https://mvnrepository.com/artifact/org.quartz-scheduler/quartz-jobs --> <dependency>
  7. <groupId>org.quartz-scheduler</groupId>
  8. <artifactId>quartz-jobs</artifactId>
  9. <version>2.3.0</version>
  10. </dependency>

2,Quartz 的数据库脚本准备,导入数据库中。脚本可在官网下载源码包中包含。

3,Quartz 的配置文件准备:这里贴出我用的,

  1. # Default Properties file for use by StdSchedulerFactory
  2. # to create a Quartz Scheduler Instance, if a different
  3. # properties file is not explicitly specified.
  4. #
  5. # StdSchedulerFactory使用quartz.properties 创建一个Quartz Scheduler实例
  6. # 参数请参考:http://www.quartz-scheduler.org/documentation/quartz-2.x/configuration/
  7. #
  8. # Quartz提供两种基本作业存储类型
  9. # --->第一种类型叫做RAMJobStore:
  10. # 最佳的性能,因为内存中数据访问最快
  11. # 不足之处是缺乏数据的持久性,当程序路途停止或系统崩溃时,所有运行的信息都会丢失
  12. # --->第二种类型叫做JDBC作业存储:
  13. # 通过调整其quartz.properties属性文件,持久化任务调度信息
  14. # 使用数据库保存任务调度信息后,即使系统崩溃后重新启动,任务的调度信息将得到恢复
  15. #
  16. #============================================================================
  17. # 基础配置
  18. #============================================================================
  19. # 设置调度器的实例名(instanceName) 和实例ID (instanceId)
  20. org.quartz.scheduler.instanceName: DefaultQuartzScheduler
  21. #如果使用集群,instanceId必须唯一,设置成AUTO
  22. org.quartz.scheduler.instanceId = AUTO
  23. org.quartz.scheduler.rmi.export: false
  24. org.quartz.scheduler.rmi.proxy: false
  25. org.quartz.scheduler.wrapJobExecutionInUserTransaction: false
  26. #============================================================================
  27. # 调度器线程池配置
  28. #============================================================================
  29. org.quartz.threadPool.class: org.quartz.simpl.SimpleThreadPool
  30. # 指定多少个工作者线程被创建用来处理 Job
  31. org.quartz.threadPool.threadCount: 10
  32. # 设置工作者线程的优先级(最大值10,最小值1,常用值5)
  33. org.quartz.threadPool.threadPriority: 5
  34. # 加载任务代码的ClassLoader是否从外部继承
  35. org.quartz.threadPool.threadsInheritContextClassLoaderOfInitializingThread: true
  36. org.quartz.jobStore.misfireThreshold: 60000
  37. #============================================================================
  38. # Configure JobStore 作业存储配置
  39. #============================================================================
  40. # 默认配置,数据保存到内存(调度程序信息是存储在被分配给JVM的内存里面,运行速度快)
  41. #org.quartz.jobStore.class: org.quartz.simpl.RAMJobStore
  42. # 持久化配置(存储方式使用JobStoreTX,也就是数据库)
  43. org.quartz.jobStore.class:org.quartz.impl.jdbcjobstore.JobStoreTX
  44. org.quartz.jobStore.driverDelegateClass:org.quartz.impl.jdbcjobstore.StdJDBCDelegate
  45. #使用自己的配置文件
  46. org.quartz.jobStore.useProperties:false
  47. #数据库中quartz表的表名前缀 这里可能需要根据需要修改
  48. org.quartz.jobStore.tablePrefix:qrtz_
  49. org.quartz.jobStore.dataSource:qzDS
  50. #是否使用集群(如果项目只部署到 一台服务器,就不用了)
  51. org.quartz.jobStore.isClustered = true
  52. #============================================================================
  53. # Configure Datasources 配置数据源
  54. #============================================================================
  55. org.quartz.dataSource.qzDS.driver:com.mysql.jdbc.Driver
  56. org.quartz.dataSource.qzDS.URL:jdbc:mysql://localhost/test?useUnicode=true&characterEncoding=utf8&autoReconnect=true&useSSL=false
  57. org.quartz.dataSource.qzDS.user:root
  58. org.quartz.dataSource.qzDS.password:123456
  59. org.quartz.dataSource.qzDS.maxConnections:10

三.核心代码(为groovy代码,java版请相应修改)

1 AutowireBeanFactory类

使用注解@Component定义为Spring的bean. 继承的AdaptableJobFactory是Spring专门为集成Quartz提供的Job工厂,所有的JobBean都是这个工厂创建的。Override的方法很简单,做了一件事:就是把才建立的JobBean使用AbstractAutowireCapableBeanFactory 的autowire方法 将JobBean中的相应依赖进行注入。(bean 的域需要注解@Autowire)

  1. package XXXX
  2. import org.quartz.spi.TriggerFiredBundle
  3. import org.springframework.beans.factory.annotation.Autowired
  4. import org.springframework.beans.factory.support.AbstractAutowireCapableBeanFactory
  5. import org.springframework.context.annotation.Profile
  6. import org.springframework.scheduling.quartz.AdaptableJobFactory
  7. import org.springframework.stereotype.Component
  8. @Component
  9. class AutowireBeanJobFactory extends AdaptableJobFactory {
  10. /**
  11. * AutowireCapableBeanFactory接口是BeanFactory的子类
  12. * 可以连接和填充那些生命周期不被Spring管理的已存在的bean实例
  13. * 具体请参考:http://blog.csdn.net/iycynna_123/article/details/52993542
  14. */
  15. @Autowired
  16. AbstractAutowireCapableBeanFactory capableBeanFactory
  17. /**
  18. *
  19. * @param bundle
  20. * @return
  21. * @throws Exception
  22. */
  23. @Override
  24. protected Object createJobInstance(TriggerFiredBundle bundle) throws Exception {
  25. def obj = super.createJobInstance(bundle)
  26. capableBeanFactory.autowireBean(obj)
  27. return obj
  28. }
  29. }

2 QuartzConfig类

这是配置类, 需要在Spring工程中声明为@Bean,作为配置的入口。

做了2件事:使用类路径加载xml 配置文件储存为Properties , 定义了Quartz JDBC连接需要的连接池。

  1. package XXX
  2. import com.zaxxer.hikari.HikariDataSource
  3. import org.quartz.Scheduler
  4. import org.quartz.ee.servlet.QuartzInitializerListener
  5. import org.quartz.spi.JobFactory
  6. import org.springframework.beans.BeansException
  7. import org.springframework.beans.factory.annotation.Autowired
  8. import org.springframework.beans.factory.config.PropertiesFactoryBean
  9. import org.springframework.beans.factory.support.BeanDefinitionRegistry
  10. import org.springframework.beans.factory.support.DefaultListableBeanFactory
  11. import org.springframework.context.ApplicationContext
  12. import org.springframework.context.ApplicationContextAware
  13. import org.springframework.context.annotation.Bean
  14. import org.springframework.context.annotation.Configuration
  15. import org.springframework.context.annotation.Profile
  16. import org.springframework.core.env.Environment
  17. import org.springframework.core.io.ClassPathResource
  18. import org.springframework.scheduling.quartz.SchedulerFactoryBean
  19. import javax.annotation.PostConstruct
  20. import javax.sql.DataSource
  21. class QuartzConfig {
  22. String projectNameSpace
  23. Properties quartzProperties
  24. DataSource dataSource
  25. QuartzConfig(String projectNameSpace, String configClasspathLocation) {
  26. this.projectNameSpace = projectNameSpace
  27. initProperty(configClasspathLocation)
  28. initDataSource()
  29. }
  30. /**
  31. * 初始化property
  32. */
  33. void initProperty(String location){
  34. def factory = new PropertiesFactoryBean()
  35. factory.setLocation(new ClassPathResource(location))
  36. factory.afterPropertiesSet()
  37. this.quartzProperties = factory.getObject()
  38. }
  39. /**
  40. * 定义dataSource
  41. */
  42. void initDataSource(){
  43. def property = this.quartzProperties
  44. def dataSource = new HikariDataSource()
  45. dataSource.setDriverClassName(property.getProperty("org.quartz.dataSource.qzDS.driver"))
  46. dataSource.setJdbcUrl(property.getProperty("org.quartz.dataSource.qzDS.URL"))
  47. dataSource.setUsername(property.getProperty("org.quartz.dataSource.qzDS.user"))
  48. dataSource.setPassword(property.getProperty("org.quartz.dataSource.qzDS.password"))
  49. this.dataSource = dataSource
  50. }
  51. }

3 JobReflection 类

从JobBean中反射得到数据域的值,得到JobDataMap。该Map的hashCode()方法用于有状态的JobBean定义时各个Job名字的区分,Job

  1. package com.XXX.quartz
  2. import org.quartz.JobDataMap
  3. import org.springframework.scheduling.quartz.QuartzJobBean
  4. import java.beans.Introspector
  5. import java.beans.PropertyDescriptor
  6. /**
  7. * jobBean 的设置
  8. */
  9. class JobReflection {
  10. //这里的是在JobDetail 里面的data定义的几种数据
  11. private final static List<Class<?>> desireClass = [Long.class, Float.class,
  12. Double.class, String.class,
  13. Boolean.class, Integer.class,
  14. long.class, float.class,
  15. double.class,int.class]
  16. /**
  17. * 解析出jobBean 的DataMap
  18. * @param jobBean
  19. * @return
  20. */
  21. static JobDataMap getJobDetailData(AbstractQuartzJobBean jobBean){
  22. def clazz = jobBean.getClass()
  23. def result = new JobDataMap()
  24. def propertyMap =getPropertyDescriptorMap(clazz)
  25. propertyMap.each {key,value->
  26. def objValue = value.getReadMethod().invoke(jobBean)
  27. result.put(key, objValue)
  28. }
  29. return result
  30. }
  31. /**
  32. *
  33. * @param clazz
  34. * @return
  35. */
  36. private static Map<String, PropertyDescriptor> getPropertyDescriptorMap(Class<?> clazz){
  37. def map = [:]
  38. def propertyDescriptors = Introspector.getBeanInfo(clazz).getPropertyDescriptors()
  39. propertyDescriptors.each {
  40. def fieldClass = it.getPropertyType()
  41. if(desireClass.contains(fieldClass)){
  42. map.put(it.getName(), it)
  43. }
  44. }
  45. return map
  46. }
  47. }

4 TaskRegistry类

注册动态任务的类,这里只给出了一次性任务的注册方法,其他的类似。

  1. package com.XXX.quartz
  2. import org.quartz.CronScheduleBuilder
  3. import org.quartz.JobBuilder
  4. import org.quartz.JobDataMap
  5. import org.quartz.ObjectAlreadyExistsException
  6. import org.quartz.Scheduler
  7. import org.quartz.TriggerBuilder
  8. import org.slf4j.Logger
  9. import org.slf4j.LoggerFactory
  10. import org.springframework.scheduling.quartz.QuartzJobBean
  11. import java.sql.Timestamp
  12. import java.text.SimpleDateFormat
  13. import java.time.LocalDateTime
  14. /**
  15. * 自定义任务注册中心
  16. */
  17. class TaskRegistry {
  18. private Scheduler scheduler
  19. private String projectNameSpace
  20. private static final Logger logger = LoggerFactory.getLogger(TaskRegistry.class)
  21. public static final String DEFAULT_GROUP = "defaultGroup"
  22. TaskRegistry(Scheduler scheduler,String projectNameSpace) {
  23. this.scheduler = scheduler
  24. this.projectNameSpace = projectNameSpace
  25. }
  26. /**
  27. * 注册一次性的bean
  28. * @param jobBean
  29. * @param jobRunTime
  30. * @return true if success ,or false if conflict task.
  31. */
  32. Boolean registerOnceJob(AbstractQuartzJobBean jobBean, LocalDateTime jobRunTime){
  33. def jobDataMap = JobReflection.getJobDetailData(jobBean)
  34. def taskName = projectNameSpace+"_"+jobBean.getClass().getName()+jobDataMap.hashCode()
  35. def groupName = getGroupName(jobBean)
  36. def jobDetail = JobBuilder.newJob(jobBean.getClass())
  37. .withIdentity(taskName, groupName)
  38. .usingJobData(jobDataMap)
  39. .build()
  40. def scheduleBuilder = CronScheduleBuilder.cronSchedule(getCronExpression(jobRunTime))
  41. def trigger = TriggerBuilder.newTrigger()
  42. .withIdentity(taskName, groupName)
  43. .withSchedule(scheduleBuilder)
  44. .build()
  45. try{
  46. scheduler.scheduleJob(jobDetail, trigger)
  47. scheduler.start()
  48. return true
  49. }catch (ObjectAlreadyExistsException e){
  50. logger.warn("重复任务注册:${jobBean.getClass().getName()} ${jobDataMap.toString()}".toString())
  51. return false
  52. }
  53. }
  54. /**
  55. * 转换得到cron
  56. * @param targetTime
  57. * @return
  58. */
  59. private static String getCronExpression(LocalDateTime targetTime){
  60. String dateFormat="ss mm HH dd MM ? yyyy"
  61. def targetDate = Timestamp.valueOf(targetTime)
  62. SimpleDateFormat sdf = new SimpleDateFormat(dateFormat)
  63. return sdf.format(targetDate)
  64. }
  65. private static String getGroupName(QuartzJobBean jobBean){
  66. if(jobBean instanceof GroupNameAware){
  67. return jobBean.getGroupName()
  68. }else{
  69. return DEFAULT_GROUP
  70. }
  71. }
  72. }

5 QuartzBeans 类

总的配置类:

  1. package com.XXX.quartz
  2. import org.quartz.Scheduler
  3. import org.quartz.ee.servlet.QuartzInitializerListener
  4. import org.springframework.beans.factory.annotation.Autowired
  5. import org.springframework.context.annotation.Bean
  6. import org.springframework.context.annotation.Configuration
  7. import org.springframework.scheduling.quartz.SchedulerFactoryBean
  8. @Configuration
  9. class QuartzBeans {
  10. @Autowired
  11. QuartzConfig quartzConfig
  12. @Autowired
  13. AutowireBeanJobFactory autowireBeanJobFactory
  14. @Bean
  15. QuartzInitializerListener quartzInitializerListener(){
  16. return new QuartzInitializerListener()
  17. }
  18. @Bean
  19. TaskRegistry taskRegistry(Scheduler scheduler){
  20. new TaskRegistry(scheduler, quartzConfig.projectNameSpace)
  21. }
  22. /**
  23. * 自定义的jobFacotry
  24. */
  25. @Bean
  26. SchedulerFactoryBean schedulerFactoryBean(){
  27. def b = new SchedulerFactoryBean()
  28. b.with {
  29. setSchedulerName("scheduler_"+quartzConfig.projectNameSpace)
  30. setOverwriteExistingJobs(true)
  31. setStartupDelay(5)
  32. setQuartzProperties(quartzConfig.quartzProperties)
  33. setJobFactory(autowireBeanJobFactory)
  34. setDataSource(quartzConfig.dataSource)
  35. }
  36. return b
  37. }
  38. }

6, AbstractQuartzJobBean类

提供手动的事务管理,这里只catch了RunningTimeException。

  1. package com.XXX.quartz
  2. import org.quartz.JobExecutionContext
  3. import org.quartz.JobExecutionException
  4. import org.slf4j.Logger
  5. import org.slf4j.LoggerFactory
  6. import org.springframework.beans.factory.annotation.Autowired
  7. import org.springframework.scheduling.quartz.QuartzJobBean
  8. import org.springframework.transaction.PlatformTransactionManager
  9. import org.springframework.transaction.TransactionDefinition
  10. import org.springframework.transaction.support.DefaultTransactionDefinition
  11. import javax.transaction.TransactionManager
  12. abstract class AbstractQuartzJobBean extends QuartzJobBean {
  13. @Autowired
  14. PlatformTransactionManager manager //根据自己的情况选择事务管理器。
  15. static final Logger logger = LoggerFactory.getLogger(AbstractQuartzJobBean.class)
  16. /**
  17. * 提供事务管理
  18. * @param jobExecutionContext
  19. * @throws JobExecutionException
  20. */
  21. @Override
  22. protected void executeInternal(JobExecutionContext jobExecutionContext) throws JobExecutionException {
  23. DefaultTransactionDefinition definition = new DefaultTransactionDefinition()
  24. definition.setPropagationBehavior(TransactionDefinition.PROPAGATION_REQUIRED)
  25. def transaction = manager.getTransaction(definition)
  26. try{
  27. doExecuteInternal(jobExecutionContext)
  28. manager.commit(transaction)
  29. }catch (RuntimeException e){
  30. logger.error("定时任务[${getClass().getName()}]出错:",e)
  31. manager.rollback(transaction)
  32. throw new JobExecutionException(e)
  33. }
  34. }
  35. /**
  36. * 定时方法
  37. * @param jobExecutionContext
  38. */
  39. abstract void doExecuteInternal(JobExecutionContext jobExecutionContext)
  40. }

7、GroupNameAware

定义的job类可以实现这个接口自定义自己的名字,否则用默认分组。

  1. package com.pkbigdata.quartz
  2. /**
  3. * 给bean自定义groupName
  4. */
  5. interface GroupNameAware {
  6. String getGroupName()
  7. }

四,使用方法

1定义配置类:

  1. @Bean
  2. QuartzConfig QuartzConfig(){
  3. new QuartzConfig("projectName", '/config/quartz-config.properties')
  4. }

2定义任务:

需要继承AbstractQuartzJobBean.

  1. package com.XXX.groovy.quartzJob
  2. import com.datacastle.groovy.Constants
  3. import com.datacastle.groovy.service.CommonService
  4. import com.datacastle.groovy.service.UserService
  5. import com.pkbigdata.entity.train.DcTrainOrder
  6. import com.pkbigdata.quartz.AbstractQuartzJobBean
  7. import org.quartz.JobExecutionContext
  8. import org.springframework.beans.factory.annotation.Autowired
  9. /**
  10. * 订单过期任务
  11. */
  12. class OrderExpireJob extends AbstractQuartzJobBean {
  13. String orderId //自定义数据域
  14. @Autowired
  15. UserService userService //注入需要的service
  16. @Autowired
  17. CommonService commonService
  18. /**
  19. * 定时方法
  20. * @param jobExecutionContext
  21. */
  22. @Override
  23. void doExecuteInternal(JobExecutionContext jobExecutionContext) {
  24. DcTrainOrder order=userService.findUniqueByProperty(DcTrainOrder.class,"orderId",orderId)
  25. if(order){
  26. boolean isSuccess=commonService.setOrderExpire(order)
  27. if(isSuccess){
  28. userService.writeBusinessSerial(order,Constants.T_BUSINESSTYPE.ORDEREXPIRED.index,Constants.T_BUSINESSTYPE.ORDEREXPIRED.info,true)
  29. logger.info("order.orderId=${order.orderId} 到期,设置状态为EXPIRED")
  30. }else{
  31. logger.info("order.orderId=${order.orderId} 到期,状态为'${order.statusInfo}',跳过。")
  32. }
  33. }
  34. }
  35. }

3,注册:

在需要的地方注入TaskRegistry

然后:

  1. OrderExpireJob job = new OrderExpireJob()
  2. job.orderId = order.orderId
  3. taskRegistry.registerOnceJob(job, LocalDateTime.now().plusMinutes(expireMinute))

五,运行流程分析

个人理解如下:

项目启动,以上定义的组件纳入spring容器管理 —->

当使用TaskRegistry.registerOnceJob 时,注意其代码虽然传入了jobBean的对象,其实对Quartz来说是用Class<? extends Job>注册的,使用usingJobData设置的Job属性。简单数据类型的域的值由反射工具得到,设定为jobDataMap,该任务及相关的Trigger 被Quartz序列化进数据库,等待执行时间的到来 —->

当执行时间到时,Quartz根据className 从数据库反序列化JobBean,其中简单类型的域值由jobDataMap得到,而@Autowire的域通过autowireBeanJobFactory 的方法进行依赖注入(这里我只测试了@Autowire注解,没有试@Resource注解是否有效),被赋值为当前容器对应的组件,最后执行JobBean 的任务方法. —->

在AbstractQuartzJobBean中提供事务包裹,正常执行就提交,RunningTimeException 就回滚。

六,

参考了很多网友的帖子,不一一列出了。

发表评论

表情:
评论列表 (有 0 条评论,222人围观)

还没有评论,来说两句吧...

相关阅读