elasticjob接入方式和管理端使用 Dear 丶 2022-06-06 21:12 268阅读 0赞 ## 1.elasticjob接入方式 ## ### 1.1 加入依赖 ### <dependency> <groupId>com.dangdang</groupId> <artifactId>elastic-job-lite-core</artifactId> <version>2.1.5</version> </dependency> <dependency> <groupId>com.dangdang</groupId> <artifactId>elastic-job-lite-spring</artifactId> <version>2.1.5</version> </dependency> ### 1.2 实现任务逻辑 ### #### 1.2.1 spring的方式 #### **通过实现SimpleJob接口,最终会定时调用execute方法** package com.company.job; /** * quartz-memory-project Created by caowenyi on 2017/9/27 . */ @Slf4j public class SampleJob implements SimpleJob { @Override public void execute(ShardingContext shardingContext) { Stopwatch stopwatch = Stopwatch.createStarted(); log.info("开始Sample任务的运行"); String jobParameter = shardingContext.getJobParameter(); long callCostTime = stopwatch.elapsed(TimeUnit.SECONDS); log.info("结束Sample任务的运行 任务参数={} 花费时间={}秒", jobParameter, callCostTime); } } **配置任务的zookeeper注册中心:applicationContext-zk.xml** 参数说明: \- zk.hosts: 具体环境对应的zookeeper服务器地址。 * zk.namespace:命名空间,用于在管理端区分不同的也业务方的不同环境。建议命令方式:业务方\_具体环境。 如:crm\_bi\_task * base-sleep-time-milliseconds、max-sleep-time-milliseconds、max-retries:设置client与zookeeper连接丢失时,进行重连的策略。 ExponentialBackoffRetry(int baseSleepTimeMs, int maxRetries, int maxSleepMs) 。时间间隔 = baseSleepTimeMs \* Math.max(1, random.nextInt(1 << (retryCount + 1))) <?xml version="1.0" encoding="UTF-8"?> <beans xmlns="http://www.springframework.org/schema/beans" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xmlns:reg="http://www.xxxx.com/schema/ddframe/reg" xsi:schemaLocation="http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans.xsd http://www.xxxx.com/schema/ddframe/reg http://www.xxxx.com/schema/ddframe/reg/reg.xsd"> <reg:zookeeper id="regCenter" server-lists="${zk.hosts}" namespace="${zk.namespace}" base-sleep-time-milliseconds="1000" max-sleep-time-milliseconds="3000" max-retries="3"/> </beans> **配置任务的调度信息:applicationContext-job.xml** 参数说明: \- id:bean的名字 \- registry-center-ref:注册中心bean \- cron: 定时调度的cron表达式 \- sharding-total-count:任务切片数量。一般只需要设置为1 \- sharding-item-parameters:任务分片参数。使用0=A即可。 \- description:任务的用途描述 \- event-trace-rdb-data-source:数据源bean对象,将任务的执行记录放到数据库中。如果不需要看任务的执行历史结果可以不用设置该值。注意:线上环境需要先建表,因为job应用一般没有建表的权限,测试环境一般可以自动建表。 \- class:任务执行类 <beans xmlns="http://www.springframework.org/schema/beans" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xmlns:job="http://www.xxxx.com/schema/ddframe/job" xsi:schemaLocation="http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans.xsd http://www.xxxx.com/schema/ddframe/job http://www.xxxx.com/schema/ddframe/job/job.xsd"> <job:simple id="sampleJob" registry-center-ref="regCenter" cron="0 30 * * * ?" sharding-total-count="1" sharding-item-parameters="0=A" description="demo任务" event-trace-rdb-data-source="dataSource" class="com.company.job.SampleJob"/> </beans> 最后将两个job加入到spring的beanfactory. **spring工程加入方式** <context-param> <param-name>contextConfigLocation</param-name> <param-value> classpath*:applicationContext-job.xml classpath*:applicationContext-zk.xml </param-value> </context-param> **springboot工程加入方式** package com.company.job; import org.springframework.context.annotation.Configuration; import org.springframework.context.annotation.ImportResource; /** * Created by caowenyi on 2017/7/28. */ @Configuration @ImportResource(locations = { "classpath:applicationContext-zk.xml", "classpath:applicationContext-job.xml"}) public class JobConfig { } ps:后续开发新的job只需要实现SimpleJob接口、将job的配置信息加入xx-job.xml中 ## 2 非spring方式接入 ## package com.dangdang.job; import com.alibaba.druid.pool.DruidDataSource; import com.dangdang.ddframe.job.config.JobCoreConfiguration; import com.dangdang.ddframe.job.config.dataflow.DataflowJobConfiguration; import com.dangdang.ddframe.job.config.simple.SimpleJobConfiguration; import com.dangdang.ddframe.job.event.JobEventConfiguration; import com.dangdang.ddframe.job.event.rdb.JobEventRdbConfiguration; import com.dangdang.ddframe.job.lite.api.JobScheduler; import com.dangdang.ddframe.job.lite.api.listener.ElasticJobListener; import com.dangdang.ddframe.job.lite.config.LiteJobConfiguration; import com.dangdang.ddframe.job.reg.base.CoordinatorRegistryCenter; import com.dangdang.ddframe.job.reg.zookeeper.ZookeeperConfiguration; import com.dangdang.ddframe.job.reg.zookeeper.ZookeeperRegistryCenter; import lombok.extern.slf4j.Slf4j; /** * Created by caowenyi on 2017/7/11. */ @Slf4j public class Main { private static final String zkHosts = "192.168.5.59:2181,192.168.5.61:2181,192.168.5.66:2181"; //private static final String zkHosts = "192.168.16.146:2181,192.168.16.147:2181,192.168.16.148:2181"; private static final String zkNameSpace = "zk-elastic-job"; public static void main(String[] args) { log.info("Main starting .........................."); SimpleJobConfiguration simpleJobConfiguration = new SimpleJobConfiguration( JobCoreConfiguration.newBuilder("simpleElasticJob", "* 0/30 * * * ?", 3) .shardingItemParameters("0=A,1=B,2=C").failover(true).build(), "com.dangdang.job.MySimpleJob"); LiteJobConfiguration simpleJobRootConfiguration = LiteJobConfiguration.newBuilder(simpleJobConfiguration).build(); DataflowJobConfiguration dataflowJobConfiguration = new DataflowJobConfiguration(JobCoreConfiguration.newBuilder("dataFlowElasticJob", "0/10 * * * * ?", 3) .shardingItemParameters("0=A,1=B,2=C").build(), "com.dangdang.job.MyDataFlowJob", false); LiteJobConfiguration dataflowJobRootConfiguration = LiteJobConfiguration.newBuilder(dataflowJobConfiguration).build(); ZookeeperConfiguration zookeeperConfiguration = new ZookeeperConfiguration(zkHosts, zkNameSpace); zookeeperConfiguration.setBaseSleepTimeMilliseconds(1000); zookeeperConfiguration.setMaxSleepTimeMilliseconds(3000); zookeeperConfiguration.setMaxRetries(3); CoordinatorRegistryCenter coordinatorRegistryCenter = new ZookeeperRegistryCenter(zookeeperConfiguration); coordinatorRegistryCenter.init(); DruidDataSource dataSource = new DruidDataSource(); dataSource.setName("localdb"); dataSource.setUrl( "jdbc:mysql://localhost:3306/elastic_job?autoReconnect=true&useUnicode=true&characterEncoding=UTF-8"); dataSource.setUsername("cweeyii"); dataSource.setPassword("cweeyii"); dataSource.setDriverClassName("com.mysql.jdbc.Driver"); JobEventConfiguration jobEventConfiguration = new JobEventRdbConfiguration(dataSource); ElasticJobListener jobListener = new MyJobAllShardListener(); new JobScheduler(coordinatorRegistryCenter, simpleJobRootConfiguration, jobEventConfiguration, jobListener).init(); new JobScheduler(coordinatorRegistryCenter, dataflowJobRootConfiguration, jobEventConfiguration).init(); log.info("Main finished .........................."); } } 流程上总体分为三步: 1. 初始化配置中心(zookeeper) 2. 配置任务执行信息并注册到zookeeper 3. 初始化quartz调度主线程进行调度 ## 3.elasticjob管理端的使用 ## 管理端环境地址:[http://xxxxx:8899/][http_xxxxx_8899] 用户名:root 密码:root 只用于查看任务执行情况,没有操作权限的用户和密码:guest guest 线上环境:[http://xxxxxx:8899/][http_xxxxxx_8899] root权限用户和密码,找悦同申请 guest用户还是可用 ### 3.1 管理端的配置 ### * 首先可以在右上角点击switch language选择为中文 ![image][] * 添加namespace配置 1. 命名空间:配置文件中的zk.namespace 2. 注册中心名称:最好和配置文件中的命名空间一致 3. 注册中心地址:配置文件中的zk.hosts 4. 登录凭证:zookeeper是否设置登录凭证(一般都没有设置) ![image][image 1] * 配置完成点击提交,点击连接,变为已连接状态 * 作业操作选项下,进行作业操作 ![image][image 2] 1. 修改:支持修改调度任务的调度信息、传入参数等 ![image][image 3] 大部分情况下只需要修改:cron表达式和自定义参数 2. 服务器维度 ![image][image 4] 主要提供了:禁止某个服务器不能进行任务的调度,已经服务器状态信息等 3. 添加数据追踪源设置(用于获取数据库中记录的任务执行历史状态) 需要配置了event-trace-rdb-data-source属性 <job:simple id="sampleJob" registry-center-ref="regCenter" cron="0 30 * * * ?" sharding-total-count="1" sharding-item-parameters="0=A" description="demo任务" event-trace-rdb-data-source="dataSource" class="com.company.job.SampleJob"/> ![image][image 5] 1. 查看任务执行历史轨迹 可以查看任务的执行情况,并支持过滤搜索,支持时间排序等 ![image][image 6] 5. 查看历史执行状态(不常用) 主要是查看任务执行的历史运行状态 1. 任务下线 * 首先需要将任务从job服务器中删除(只需要删除xml中任务的配置即可) * 再从elasticjob管理端对应的namespace中删除已经下线的任务 ![image][image 7] ## 4.开发的注意事项 ## 1. 进行任务类名的重命名,需要在xml中新建新的任务id,因为类名是不可更改的 2. 进行管理端的任务调度时,需要先连接到具体namespace [http_xxxxx_8899]: http://xxxxx:8899/ [http_xxxxxx_8899]: http://xxxxxx:8899/ [image]: http://note.youdao.com/yws/api/personal/file/WEBd45444cdcc3cb546b93c253be22f48d0?method=download&shareKey=64c35dc49ecc1512ce072aedd0b5fefe [image 1]: http://note.youdao.com/yws/api/personal/file/WEBea2fc421a0cf7673bcc6fd6d3d40ccb0?method=download&shareKey=ce92bdc6fef8498ed286494e275ded2b [image 2]: http://note.youdao.com/yws/api/personal/file/WEB5dae31f8f701c8f348a156568c3168ed?method=download&shareKey=2bf49757f9454a247187735485575a18 [image 3]: http://note.youdao.com/yws/api/personal/file/WEB39667a3865abfdc83b06fb8d8e2b7e26?method=download&shareKey=1d4dbf0d0eff2be88d26bddb6d026904 [image 4]: http://note.youdao.com/yws/api/personal/file/WEB376a158aaa717af9e9a47890535a764f?method=download&shareKey=e5a40d46a7ccdc11840918b86b18712b [image 5]: http://note.youdao.com/yws/api/personal/file/WEB32a33ddb4165b87e18d7ba2273dbd593?method=download&shareKey=142ae50992b21b9a9eb930677daec28e [image 6]: http://note.youdao.com/yws/api/personal/file/WEB2ed5cfc6e9e553df1620cab469a173a7?method=download&shareKey=c8097ddde39c5f8df0fe72c930c8a9a2o [image 7]: http://note.youdao.com/yws/api/personal/file/WEBa27e8e66540056defe095b1e49d24e41?method=download&shareKey=51507d13b1cc2295df16541a2da5f85c
还没有评论,来说两句吧...