elastic job with springboot
集成当当网Elastic Job定时任务分布式调度解决方案, 基于SpringBoot项目风格快速集成,拆箱即用,简单高效。
- 支持JavaBean加yml文件方式自动注册ElasticJob任务;
- 支持基于注解@ElasticJob方式自动注册ElasticJob任务;
- 代码结构中的Apache Curator 暂未完全集成,不影响1.2功能使用;
代码示例
/**
* <pre>
* <ElasticSimpleJob Bean方式注册Elastic Lite Job>
*
* @author: xiehongfei[[email protected]]
* @date: 2018年12月01日 17:11
* @version: V1.0
* @review: xiehongfei[[email protected]]/2018年12月01日 17:11
* {
* "jobName": "ElasticSimpleJobScheduler",
* "jobClass": "com.dusty.boring.demo.job.ElasticSimpleJobScheduler",
* "jobType": "SIMPLE",
* "cron": "0/5 * * * * ?",
* "shardingTotalCount": 1,
* "shardingItemParameters": "1=wechat",
* "jobParameter": "",
* "failover": false,
* "misfire": true,
* "description": "",
* "jobProperties": {
* "job_exception_handler": "com.dangdang.ddframe.job.executor.handler.impl.DefaultJobExceptionHandler",
* "executor_service_handler": "com.dangdang.ddframe.job.executor.handler.impl.DefaultExecutorServiceHandler"
* },
* "monitorExecution": true,
* "maxTimeDiffSeconds": -1,
* "monitorPort": -1,
* "jobShardingStrategyClass": "",
* "reconcileIntervalMinutes": 10,
* "disabled": false,
* "overwrite": false
* }
* </pre>
*/
@Slf4j
@Component
public class ElasticSimpleJob implements SimpleJob {
/**
* 执行作业.
*
* @param shardingContext 分片上下文
*/
@Override
public void execute(ShardingContext shardingContext) {
log.info(
"\n- 开始执行定时任务:{}" +
"\n- 任务详情:{}",
shardingContext.getJobName(), JsonUtils.object2String(shardingContext));
}
}
代码示例
dusty:
elasticjob:
enabled-bean-mode: true #启用Bean作业解析
zk:
servers: localhost:2181 #如多个注册中心,可localhost:2181,host1:2181,host2:2181
namespace: ${spring.application.name}
dusty-simple-jobs:
- job-class: ElasticSimpleJob
corn: 0/30 * * * * ?
shardingTotalCount: 1
shardingItemParameters: 1=wechat
#~~ Elastic Job 配置BEGIN ~~
dusty:
elasticjob:
enabled-bean-mode: false #禁用Bean作业解析
zk:
servers: localhost:2181 #如多个注册中心,可localhost:2181,host1:2181,host2:2181
namespace: ${spring.application.name}
dusty-simple-jobs:
- job-class: ElasticSimpleJob
corn: 0/30 * * * * ?
shardingTotalCount: 1
shardingItemParameters: 1=wechat
#~~ Elastic Job 配置END ~~
@Slf4j
@ElasticJob(jobName = "ElasticJobAnnotationJobDemoScheduler",
jobType = JobType.SIMPLE,
corn = "0 0/6 * * * ?", //6分钟执行一次
shardingTotalCount = 1,
shardingItemParameters = "0=all")
@Component
public class ElasticJobAnnotationJobDemo implements SimpleJob {
/**<pre>
* 执行作业.
*
* @param shardingContext 分片上下文
* </pre>
*/
@Override
public void execute(ShardingContext shardingContext) {
log.info("\n- 开始执行作业:{},\n- 作业详情信息:\n- {}",this.getClass().getCanonicalName(), JsonUtils.object2String(shardingContext));
executeQuartzInternal();
log.info("\n- 完成作业{}执行。", this.getClass().getCanonicalName());
}
/**
* <pre>
* 执行任务
* </pre>
*/
private void executeQuartzInternal() {
}
}
优点:实现简单
缺点:职责结构耦合度偏高
代码示例如下:
/**
* <pre>
*
* <直接由注解@ElasticJob注解注册DataFlowJob>
*
* DataFlow类型Job,采用流式处理机制,fetchData不返回空结果将持续执行作业。
*
* @author xiehongfei[[email protected]]
* @date: 2018年12月08日 11:35
* @version V1.0
* @review: xiehongfei[[email protected]]/2018年12月08日 11:35
* </pre>
*/
@ElasticJob(jobName = "ElasticJobInnerAnnotationDataFlowJobScheduler",
corn = "0/30 * * * * ?",
shardingItemParameters = "0=alipay",
shardingTotalCount = 1,
jobType = JobType.DATAFLOW)
@Slf4j
@Component
public class ElasticJobInnerAnnotationDataFlowJobDemo implements DataflowJob<Foo> {
private ThreadLocal<Integer> LOCAL = ThreadLocal.withInitial(()-> 0);
private ThreadLocal<String> JOB_NAME = ThreadLocal.withInitial(()-> String.format("inner-job-%s", UUID.randomUUID().toString()));
/**
* 获取待处理数据.
*
* @param shardingContext 分片上下文
*
* @return 待处理的数据集合
*/
@Override
public List<Foo> fetchData(ShardingContext shardingContext) {
if (LOCAL.get() > 3) {
log.info("\n-\t {}-任务结束。", JOB_NAME.get());
//重置LOCAL=0,为下次任务配置执行条件。
LOCAL.set(0);
return null;
}
List<Foo> foos = Lists.newArrayList();
for (int i = 0; i < 10; i ++){
foos.add(new Foo());
}
LOCAL.set(LOCAL.get() + 1);
return foos;
}
/**
* 处理数据.
*
* @param shardingContext 分片上下文
* @param data 待处理数据集合
*/
@Override
public void processData(ShardingContext shardingContext, List<Foo> data) {
log.info("\n-\t\t {}-开始处理数据:", JOB_NAME.get());
data.forEach(item-> log.info("\n\t\t {}\t{}", JOB_NAME.get(), JsonUtils.object2String(item)));
}
}
优点:结构清晰
缺点:实现复杂度稍高
代码示例
参加springboot-dusty-elastic-job-demo -> ElasticJobDataFlowJobRegister 及 ElasticJobOuterAnnotationDataFlowJobDemo
访问 elastic job官网 运维平台部署指南,可通过github.com下载elastic-lite-console本地部署。
5.2.1.配置注册中心
DataFlow Job 补充效果图