【半原创】springboot 1.X 集成Quartz.
springboot 1.X 集成Quartz.
〇, 个人理解,欢迎指正
一,本文主要解决的问题
查找了一些资料相关资料,发现要么写得太肤浅,要么没有解决一些实际问题。故在别人的基础上抄了一抄,梳理了一下有了此文。背景跳过,这里主要介绍使用sring集成Quartz 的代码,使用spring+Quartz的JDBC持久化任务特性:
主要解决的问题:
A.任务涉及到的数据库事务问题 (Quartz的Job任务执行会吞掉Exception不会触发Spring事务的回滚);
B.JobBean 中依赖的springBean的注入(即如何把Quartz及Job纳入Spring容器的管理)
C.有状态任务数据传递:(如针对订单过期的Job,需要传递订单Id等)
二,前置要求
1,在Spring工程中添加 Quartz依赖:Maven为例:
<!-- https://mvnrepository.com/artifact/org.quartz-scheduler/quartz --> <dependency>
<groupId>org.quartz-scheduler</groupId>
<artifactId>quartz</artifactId>
<version>2.3.0</version>
</dependency>
<!-- https://mvnrepository.com/artifact/org.quartz-scheduler/quartz-jobs --> <dependency>
<groupId>org.quartz-scheduler</groupId>
<artifactId>quartz-jobs</artifactId>
<version>2.3.0</version>
</dependency>
2,Quartz 的数据库脚本准备,导入数据库中。脚本可在官网下载源码包中包含。
3,Quartz 的配置文件准备:这里贴出我用的,
# Default Properties file for use by StdSchedulerFactory
# to create a Quartz Scheduler Instance, if a different
# properties file is not explicitly specified.
#
# StdSchedulerFactory使用quartz.properties 创建一个Quartz Scheduler实例
# 参数请参考:http://www.quartz-scheduler.org/documentation/quartz-2.x/configuration/
#
# Quartz提供两种基本作业存储类型
# --->第一种类型叫做RAMJobStore:
# 最佳的性能,因为内存中数据访问最快
# 不足之处是缺乏数据的持久性,当程序路途停止或系统崩溃时,所有运行的信息都会丢失
# --->第二种类型叫做JDBC作业存储:
# 通过调整其quartz.properties属性文件,持久化任务调度信息
# 使用数据库保存任务调度信息后,即使系统崩溃后重新启动,任务的调度信息将得到恢复
#
#============================================================================
# 基础配置
#============================================================================
# 设置调度器的实例名(instanceName) 和实例ID (instanceId)
org.quartz.scheduler.instanceName: DefaultQuartzScheduler
#如果使用集群,instanceId必须唯一,设置成AUTO
org.quartz.scheduler.instanceId = AUTO
org.quartz.scheduler.rmi.export: false
org.quartz.scheduler.rmi.proxy: false
org.quartz.scheduler.wrapJobExecutionInUserTransaction: false
#============================================================================
# 调度器线程池配置
#============================================================================
org.quartz.threadPool.class: org.quartz.simpl.SimpleThreadPool
# 指定多少个工作者线程被创建用来处理 Job
org.quartz.threadPool.threadCount: 10
# 设置工作者线程的优先级(最大值10,最小值1,常用值5)
org.quartz.threadPool.threadPriority: 5
# 加载任务代码的ClassLoader是否从外部继承
org.quartz.threadPool.threadsInheritContextClassLoaderOfInitializingThread: true
org.quartz.jobStore.misfireThreshold: 60000
#============================================================================
# Configure JobStore 作业存储配置
#============================================================================
# 默认配置,数据保存到内存(调度程序信息是存储在被分配给JVM的内存里面,运行速度快)
#org.quartz.jobStore.class: org.quartz.simpl.RAMJobStore
# 持久化配置(存储方式使用JobStoreTX,也就是数据库)
org.quartz.jobStore.class:org.quartz.impl.jdbcjobstore.JobStoreTX
org.quartz.jobStore.driverDelegateClass:org.quartz.impl.jdbcjobstore.StdJDBCDelegate
#使用自己的配置文件
org.quartz.jobStore.useProperties:false
#数据库中quartz表的表名前缀 这里可能需要根据需要修改
org.quartz.jobStore.tablePrefix:qrtz_
org.quartz.jobStore.dataSource:qzDS
#是否使用集群(如果项目只部署到 一台服务器,就不用了)
org.quartz.jobStore.isClustered = true
#============================================================================
# Configure Datasources 配置数据源
#============================================================================
org.quartz.dataSource.qzDS.driver:com.mysql.jdbc.Driver
org.quartz.dataSource.qzDS.URL:jdbc:mysql://localhost/test?useUnicode=true&characterEncoding=utf8&autoReconnect=true&useSSL=false
org.quartz.dataSource.qzDS.user:root
org.quartz.dataSource.qzDS.password:123456
org.quartz.dataSource.qzDS.maxConnections:10
三.核心代码(为groovy代码,java版请相应修改)
1 AutowireBeanFactory类
使用注解@Component定义为Spring的bean. 继承的AdaptableJobFactory是Spring专门为集成Quartz提供的Job工厂,所有的JobBean都是这个工厂创建的。Override的方法很简单,做了一件事:就是把才建立的JobBean使用AbstractAutowireCapableBeanFactory 的autowire方法 将JobBean中的相应依赖进行注入。(bean 的域需要注解@Autowire)
package XXXX
import org.quartz.spi.TriggerFiredBundle
import org.springframework.beans.factory.annotation.Autowired
import org.springframework.beans.factory.support.AbstractAutowireCapableBeanFactory
import org.springframework.context.annotation.Profile
import org.springframework.scheduling.quartz.AdaptableJobFactory
import org.springframework.stereotype.Component
@Component
class AutowireBeanJobFactory extends AdaptableJobFactory {
/**
* AutowireCapableBeanFactory接口是BeanFactory的子类
* 可以连接和填充那些生命周期不被Spring管理的已存在的bean实例
* 具体请参考:http://blog.csdn.net/iycynna_123/article/details/52993542
*/
@Autowired
AbstractAutowireCapableBeanFactory capableBeanFactory
/**
*
* @param bundle
* @return
* @throws Exception
*/
@Override
protected Object createJobInstance(TriggerFiredBundle bundle) throws Exception {
def obj = super.createJobInstance(bundle)
capableBeanFactory.autowireBean(obj)
return obj
}
}
2 QuartzConfig类
这是配置类, 需要在Spring工程中声明为@Bean,作为配置的入口。
做了2件事:使用类路径加载xml 配置文件储存为Properties , 定义了Quartz JDBC连接需要的连接池。
package XXX
import com.zaxxer.hikari.HikariDataSource
import org.quartz.Scheduler
import org.quartz.ee.servlet.QuartzInitializerListener
import org.quartz.spi.JobFactory
import org.springframework.beans.BeansException
import org.springframework.beans.factory.annotation.Autowired
import org.springframework.beans.factory.config.PropertiesFactoryBean
import org.springframework.beans.factory.support.BeanDefinitionRegistry
import org.springframework.beans.factory.support.DefaultListableBeanFactory
import org.springframework.context.ApplicationContext
import org.springframework.context.ApplicationContextAware
import org.springframework.context.annotation.Bean
import org.springframework.context.annotation.Configuration
import org.springframework.context.annotation.Profile
import org.springframework.core.env.Environment
import org.springframework.core.io.ClassPathResource
import org.springframework.scheduling.quartz.SchedulerFactoryBean
import javax.annotation.PostConstruct
import javax.sql.DataSource
class QuartzConfig {
String projectNameSpace
Properties quartzProperties
DataSource dataSource
QuartzConfig(String projectNameSpace, String configClasspathLocation) {
this.projectNameSpace = projectNameSpace
initProperty(configClasspathLocation)
initDataSource()
}
/**
* 初始化property
*/
void initProperty(String location){
def factory = new PropertiesFactoryBean()
factory.setLocation(new ClassPathResource(location))
factory.afterPropertiesSet()
this.quartzProperties = factory.getObject()
}
/**
* 定义dataSource
*/
void initDataSource(){
def property = this.quartzProperties
def dataSource = new HikariDataSource()
dataSource.setDriverClassName(property.getProperty("org.quartz.dataSource.qzDS.driver"))
dataSource.setJdbcUrl(property.getProperty("org.quartz.dataSource.qzDS.URL"))
dataSource.setUsername(property.getProperty("org.quartz.dataSource.qzDS.user"))
dataSource.setPassword(property.getProperty("org.quartz.dataSource.qzDS.password"))
this.dataSource = dataSource
}
}
3 JobReflection 类
从JobBean中反射得到数据域的值,得到JobDataMap。该Map的hashCode()方法用于有状态的JobBean定义时各个Job名字的区分,Job
package com.XXX.quartz
import org.quartz.JobDataMap
import org.springframework.scheduling.quartz.QuartzJobBean
import java.beans.Introspector
import java.beans.PropertyDescriptor
/**
* jobBean 的设置
*/
class JobReflection {
//这里的是在JobDetail 里面的data定义的几种数据
private final static List<Class<?>> desireClass = [Long.class, Float.class,
Double.class, String.class,
Boolean.class, Integer.class,
long.class, float.class,
double.class,int.class]
/**
* 解析出jobBean 的DataMap
* @param jobBean
* @return
*/
static JobDataMap getJobDetailData(AbstractQuartzJobBean jobBean){
def clazz = jobBean.getClass()
def result = new JobDataMap()
def propertyMap =getPropertyDescriptorMap(clazz)
propertyMap.each {key,value->
def objValue = value.getReadMethod().invoke(jobBean)
result.put(key, objValue)
}
return result
}
/**
*
* @param clazz
* @return
*/
private static Map<String, PropertyDescriptor> getPropertyDescriptorMap(Class<?> clazz){
def map = [:]
def propertyDescriptors = Introspector.getBeanInfo(clazz).getPropertyDescriptors()
propertyDescriptors.each {
def fieldClass = it.getPropertyType()
if(desireClass.contains(fieldClass)){
map.put(it.getName(), it)
}
}
return map
}
}
4 TaskRegistry类
注册动态任务的类,这里只给出了一次性任务的注册方法,其他的类似。
package com.XXX.quartz
import org.quartz.CronScheduleBuilder
import org.quartz.JobBuilder
import org.quartz.JobDataMap
import org.quartz.ObjectAlreadyExistsException
import org.quartz.Scheduler
import org.quartz.TriggerBuilder
import org.slf4j.Logger
import org.slf4j.LoggerFactory
import org.springframework.scheduling.quartz.QuartzJobBean
import java.sql.Timestamp
import java.text.SimpleDateFormat
import java.time.LocalDateTime
/**
* 自定义任务注册中心
*/
class TaskRegistry {
private Scheduler scheduler
private String projectNameSpace
private static final Logger logger = LoggerFactory.getLogger(TaskRegistry.class)
public static final String DEFAULT_GROUP = "defaultGroup"
TaskRegistry(Scheduler scheduler,String projectNameSpace) {
this.scheduler = scheduler
this.projectNameSpace = projectNameSpace
}
/**
* 注册一次性的bean
* @param jobBean
* @param jobRunTime
* @return true if success ,or false if conflict task.
*/
Boolean registerOnceJob(AbstractQuartzJobBean jobBean, LocalDateTime jobRunTime){
def jobDataMap = JobReflection.getJobDetailData(jobBean)
def taskName = projectNameSpace+"_"+jobBean.getClass().getName()+jobDataMap.hashCode()
def groupName = getGroupName(jobBean)
def jobDetail = JobBuilder.newJob(jobBean.getClass())
.withIdentity(taskName, groupName)
.usingJobData(jobDataMap)
.build()
def scheduleBuilder = CronScheduleBuilder.cronSchedule(getCronExpression(jobRunTime))
def trigger = TriggerBuilder.newTrigger()
.withIdentity(taskName, groupName)
.withSchedule(scheduleBuilder)
.build()
try{
scheduler.scheduleJob(jobDetail, trigger)
scheduler.start()
return true
}catch (ObjectAlreadyExistsException e){
logger.warn("重复任务注册:${jobBean.getClass().getName()} ${jobDataMap.toString()}".toString())
return false
}
}
/**
* 转换得到cron
* @param targetTime
* @return
*/
private static String getCronExpression(LocalDateTime targetTime){
String dateFormat="ss mm HH dd MM ? yyyy"
def targetDate = Timestamp.valueOf(targetTime)
SimpleDateFormat sdf = new SimpleDateFormat(dateFormat)
return sdf.format(targetDate)
}
private static String getGroupName(QuartzJobBean jobBean){
if(jobBean instanceof GroupNameAware){
return jobBean.getGroupName()
}else{
return DEFAULT_GROUP
}
}
}
5 QuartzBeans 类
总的配置类:
package com.XXX.quartz
import org.quartz.Scheduler
import org.quartz.ee.servlet.QuartzInitializerListener
import org.springframework.beans.factory.annotation.Autowired
import org.springframework.context.annotation.Bean
import org.springframework.context.annotation.Configuration
import org.springframework.scheduling.quartz.SchedulerFactoryBean
@Configuration
class QuartzBeans {
@Autowired
QuartzConfig quartzConfig
@Autowired
AutowireBeanJobFactory autowireBeanJobFactory
@Bean
QuartzInitializerListener quartzInitializerListener(){
return new QuartzInitializerListener()
}
@Bean
TaskRegistry taskRegistry(Scheduler scheduler){
new TaskRegistry(scheduler, quartzConfig.projectNameSpace)
}
/**
* 自定义的jobFacotry
*/
@Bean
SchedulerFactoryBean schedulerFactoryBean(){
def b = new SchedulerFactoryBean()
b.with {
setSchedulerName("scheduler_"+quartzConfig.projectNameSpace)
setOverwriteExistingJobs(true)
setStartupDelay(5)
setQuartzProperties(quartzConfig.quartzProperties)
setJobFactory(autowireBeanJobFactory)
setDataSource(quartzConfig.dataSource)
}
return b
}
}
6, AbstractQuartzJobBean类
提供手动的事务管理,这里只catch了RunningTimeException。
package com.XXX.quartz
import org.quartz.JobExecutionContext
import org.quartz.JobExecutionException
import org.slf4j.Logger
import org.slf4j.LoggerFactory
import org.springframework.beans.factory.annotation.Autowired
import org.springframework.scheduling.quartz.QuartzJobBean
import org.springframework.transaction.PlatformTransactionManager
import org.springframework.transaction.TransactionDefinition
import org.springframework.transaction.support.DefaultTransactionDefinition
import javax.transaction.TransactionManager
abstract class AbstractQuartzJobBean extends QuartzJobBean {
@Autowired
PlatformTransactionManager manager //根据自己的情况选择事务管理器。
static final Logger logger = LoggerFactory.getLogger(AbstractQuartzJobBean.class)
/**
* 提供事务管理
* @param jobExecutionContext
* @throws JobExecutionException
*/
@Override
protected void executeInternal(JobExecutionContext jobExecutionContext) throws JobExecutionException {
DefaultTransactionDefinition definition = new DefaultTransactionDefinition()
definition.setPropagationBehavior(TransactionDefinition.PROPAGATION_REQUIRED)
def transaction = manager.getTransaction(definition)
try{
doExecuteInternal(jobExecutionContext)
manager.commit(transaction)
}catch (RuntimeException e){
logger.error("定时任务[${getClass().getName()}]出错:",e)
manager.rollback(transaction)
throw new JobExecutionException(e)
}
}
/**
* 定时方法
* @param jobExecutionContext
*/
abstract void doExecuteInternal(JobExecutionContext jobExecutionContext)
}
7、GroupNameAware
定义的job类可以实现这个接口自定义自己的名字,否则用默认分组。
package com.pkbigdata.quartz
/**
* 给bean自定义groupName
*/
interface GroupNameAware {
String getGroupName()
}
四,使用方法
1定义配置类:
@Bean
QuartzConfig QuartzConfig(){
new QuartzConfig("projectName", '/config/quartz-config.properties')
}
2定义任务:
需要继承AbstractQuartzJobBean.
package com.XXX.groovy.quartzJob
import com.datacastle.groovy.Constants
import com.datacastle.groovy.service.CommonService
import com.datacastle.groovy.service.UserService
import com.pkbigdata.entity.train.DcTrainOrder
import com.pkbigdata.quartz.AbstractQuartzJobBean
import org.quartz.JobExecutionContext
import org.springframework.beans.factory.annotation.Autowired
/**
* 订单过期任务
*/
class OrderExpireJob extends AbstractQuartzJobBean {
String orderId //自定义数据域
@Autowired
UserService userService //注入需要的service
@Autowired
CommonService commonService
/**
* 定时方法
* @param jobExecutionContext
*/
@Override
void doExecuteInternal(JobExecutionContext jobExecutionContext) {
DcTrainOrder order=userService.findUniqueByProperty(DcTrainOrder.class,"orderId",orderId)
if(order){
boolean isSuccess=commonService.setOrderExpire(order)
if(isSuccess){
userService.writeBusinessSerial(order,Constants.T_BUSINESSTYPE.ORDEREXPIRED.index,Constants.T_BUSINESSTYPE.ORDEREXPIRED.info,true)
logger.info("order.orderId=${order.orderId} 到期,设置状态为EXPIRED")
}else{
logger.info("order.orderId=${order.orderId} 到期,状态为'${order.statusInfo}',跳过。")
}
}
}
}
3,注册:
在需要的地方注入TaskRegistry
然后:
OrderExpireJob job = new OrderExpireJob()
job.orderId = order.orderId
taskRegistry.registerOnceJob(job, LocalDateTime.now().plusMinutes(expireMinute))
五,运行流程分析
个人理解如下:
项目启动,以上定义的组件纳入spring容器管理 —->
当使用TaskRegistry.registerOnceJob 时,注意其代码虽然传入了jobBean的对象,其实对Quartz来说是用Class<? extends Job>注册的,使用usingJobData设置的Job属性。简单数据类型的域的值由反射工具得到,设定为jobDataMap,该任务及相关的Trigger 被Quartz序列化进数据库,等待执行时间的到来 —->
当执行时间到时,Quartz根据className 从数据库反序列化JobBean,其中简单类型的域值由jobDataMap得到,而@Autowire的域通过autowireBeanJobFactory 的方法进行依赖注入(这里我只测试了@Autowire注解,没有试@Resource注解是否有效),被赋值为当前容器对应的组件,最后执行JobBean 的任务方法. —->
在AbstractQuartzJobBean中提供事务包裹,正常执行就提交,RunningTimeException 就回滚。
六,
参考了很多网友的帖子,不一一列出了。
还没有评论,来说两句吧...