Spring Boot集成定时任务elastic-job
elastic-job是当当开源的一款分布式定时作业框架。在这之前,我们开发定时任务一般都是使用quartz或者spring-task(ScheduledExecutorService
),无论是使用quartz还是spring-task,我们都会至少遇到两个痛点:
- 不敢轻易跟着应用服务多节点部署,可能会重复多次执行而引发系统逻辑的错误。
- quartz的集群仅仅只是用来HA,节点数量的增加并不能给我们的每次执行效率带来提升,即不能实现水平扩展。
elastic-job在2.x之后,出了两个产品线:Elastic-Job-Lite
和Elastic-Job-Cloud
。我们一般使用Elastic-Job-Lite
就能够满足需求,本文也是以Elastic-Job-Lite
为主。1.x系列对应的就只有Elastic-Job-Lite
,并且在2.x里修改了一些核心类名,差别虽大,原理类似,建议使用2.x系列。
elastic-job-lite原理
举个典型的job场景,比如余额宝里的昨日收益,系统需要job在每天某个时间点开始,给所有余额宝用户计算收益。如果用户数量不多,我们可以轻易使用quartz来完成,我们让计息job在某个时间点开始执行,循环遍历所有用户计算利息,这没问题。可是,如果用户体量特别大,我们可能会面临着在第二天之前处理不完这么多用户。另外,我们部署job的时候也得注意,我们可能会把job直接放在我们的webapp里,webapp通常是多节点部署的,这样,我们的job也就是多节点,多个job同时执行,很容易造成重复执行,比如用户重复计息,为了避免这种情况,我们可能会对job的执行加锁,保证始终只有一个节点能执行,或者干脆让job从webapp里剥离出来,独自部署一个节点。
elastic-job就可以帮助我们解决上面的问题,elastic底层的任务调度还是使用的quartz,通过zookeeper来动态给job节点分片。
很大体量的用户需要在特定的时间段内计息完成
我们肯定是希望我们的任务可以通过集群达到水平扩展,集群里的每个节点都处理部分用户,不管用户数量有多庞大,我们只要增加机器就可以了,比如单台机器特定时间能处理n个用户,2台机器处理2n个用户,3台3n,4台4n...,再多的用户也不怕了。
使用elastic-job开发的作业都是zookeeper的客户端,比如我希望3台机器跑job,我们将任务分成3片,框架通过zk的协调,最终会让3台机器分别分配到0,1,2
的任务片,比如server0-->0
,server1-->1
,server2-->2
,当server0
执行时,可以只查询id%3==0
的用户,server1
执行时,只查询id%3==1
的用户,server2
执行时,只查询id%3==2
的用户。
任务部署多节点引发重复执行
在上面的基础上,我们再增加server3
,此时,server3
分不到任务分片,因为只有3片,已经分完了。没有分到任务分片的作业程序将不执行。
如果此时server2
挂了,那么server2
的分片项会分配给server3
,server3
有了分片,就会替代server2
执行。
如果此时server3
也挂了,只剩下server0
和server1
了,框架也会自动把server3
的分片随机分配给server0
或者server1
,可能会这样,server0-->0
,server1-->1,2
。
这种特性称之为弹性扩容,即elastic-job名称的由来。
SpringBoot集成elastic-job
Maven依赖
<!-- https://mvnrepository.com/artifact/com.dangdang/elastic-job-lite-core -->
<dependency>
<groupId>com.dangdang</groupId>
<artifactId>elastic-job-lite-core</artifactId>
<version>2.1.5</version>
</dependency>
<!-- https://mvnrepository.com/artifact/com.dangdang/elastic-job-lite-spring -->
<dependency>
<groupId>com.dangdang</groupId>
<artifactId>elastic-job-lite-spring</artifactId>
<version>2.1.5</version>
</dependency>
配置ZooKeeper注册中心
import com.dangdang.ddframe.job.reg.zookeeper.ZookeeperConfiguration;
import com.dangdang.ddframe.job.reg.zookeeper.ZookeeperRegistryCenter;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.boot.autoconfigure.condition.ConditionalOnExpression;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
/**
* 注册中心配置
* 用于注册和协调作业分布式行为的组件,目前仅支持Zookeeper
*/
@Configuration
@ConditionalOnExpression("'${regCenter.serverList}'.length() > 0")
public class RegistryCenterConfig {
@Bean(initMethod = "init")
public ZookeeperRegistryCenter regCenter(@Value("${regCenter.serverList}") final String serverList, @Value("${regCenter.namespace}") final String namespace) {
return new ZookeeperRegistryCenter(new ZookeeperConfiguration(serverList, namespace));
}
}
持久化配置
import com.dangdang.ddframe.job.event.JobEventConfiguration;
import com.dangdang.ddframe.job.event.rdb.JobEventRdbConfiguration;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import javax.annotation.Resource;
import javax.sql.DataSource;
/**
* 开启事件追踪器
* 如果想把作业运行的内容写到DB中,我们需要用到另一个构造器
* 同时定义自己的JobEventConfiguration
* 目前来说实现这个接口的只有一个类JobEventRdbConfiguration
* 通过这个可以将作业运行的痕迹进行持久化到DB的操作
*/
@Configuration
public class JobEventConfig {
@Resource
private DataSource dataSource;
@Bean
public JobEventConfiguration jobEventConfiguration() {
return new JobEventRdbConfiguration(dataSource);
}
}
配置文件
配置ZooKeeper和Elastic Job
的执行时间、分片数、和定义分片参即shardingItemParameter
,可以和分片项匹配对应关系,用于将分片项的数字转换为更加可读的业务代码
application.properties
regCenter.serverList=127.0.0.1:2181
regCenter.namespace=elastic-job
simpleJob.cron=0/5 * * * * ?
simpleJob.shardingTotalCount=3
simpleJob.shardingItemParameters=0=A,1=B,2=C
application.yml
regCenter:
serverList: 127.0.0.1:2181
namespace: elastic-job
simpleJob:
#每隔5秒执行一次
cron: 0/5 * * * * ?
shardingTotalCount: 3
shardingItemParameters: 0=A,1=B,2=C
dataflowJob:
#每天凌晨1点执行一次
cron: 0 0 1 * * ?
shardingTotalCount: 3
shardingItemParameters: 0=A,1=B,2=C
作业配置
elastic-job提供了三种类型的作业:Simple类型作业、Dataflow类型作业、Script类型作业。这里主要讲解前两者。Script类型作业意为脚本类型作业,支持shell,python,perl等所有类型脚本,使用不多,可以参见github文档。
简单Simple类型作业配置
SimpleJob需要实现SimpleJob接口,意为简单实现,未经过任何封装,与quartz原生接口相似。
(1)任务执行类
public class SpringSimpleJob implements SimpleJob {
@Override
public void execute(ShardingContext shardingContext) {
/**
* 实际开发中,有了任务总片数和当前分片项,就可以对任务进行分片执行了
* 比如 SELECT * FROM user WHERE status = 0 AND MOD(id, shardingTotalCount) = shardingItem
*/
System.out.println(String.format("Thread ID: %s, 任务总片数: %s, 当前分片项: %s, 分片参数: %s",
Thread.currentThread().getId(), shardingContext.getShardingTotalCount(), shardingContext.getShardingItem(), shardingContext.getShardingParameter()));
}
}
(2)任务配置类
import cn.appblog.springboot.model.SpringSimpleJob;
import com.dangdang.ddframe.job.api.simple.SimpleJob;
import com.dangdang.ddframe.job.config.JobCoreConfiguration;
import com.dangdang.ddframe.job.config.simple.SimpleJobConfiguration;
import com.dangdang.ddframe.job.event.JobEventConfiguration;
import com.dangdang.ddframe.job.lite.api.JobScheduler;
import com.dangdang.ddframe.job.lite.config.LiteJobConfiguration;
import com.dangdang.ddframe.job.lite.spring.api.SpringJobScheduler;
import com.dangdang.ddframe.job.reg.zookeeper.ZookeeperRegistryCenter;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import javax.annotation.Resource;
/**
* 简单任务配置
*/
@Configuration
public class SimpleJobConfig {
// 注册中心配置
@Resource
private ZookeeperRegistryCenter regCenter;
// 将作业运行的痕迹进行持久化到DB的操作配置
@Resource
private JobEventConfiguration jobEventConfiguration;
@Bean
public SimpleJob simpleJob() {
return new SpringSimpleJob();
}
@Bean(initMethod = "init")
public JobScheduler simpleJobScheduler(final SimpleJob simpleJob, @Value("${simpleJob.cron}") final String cron,
@Value("${simpleJob.shardingTotalCount}") final int shardingTotalCount,
@Value("${simpleJob.shardingItemParameters}") final String shardingItemParameters) {
return new SpringJobScheduler(simpleJob, regCenter, getLiteJobConfiguration(simpleJob.getClass(), cron, shardingTotalCount, shardingItemParameters), jobEventConfiguration);
}
/*
* 作业配置
* 作业配置分为3级,分别是JobCoreConfiguration,JobTypeConfiguration和LiteJobConfiguration
* LiteJobConfiguration使用JobTypeConfiguration,JobTypeConfiguration使用JobCoreConfiguration,层层嵌套
* JobTypeConfiguration根据不同实现类型分为SimpleJobConfiguration,DataflowJobConfiguration和ScriptJobConfiguration
*/
private LiteJobConfiguration getLiteJobConfiguration(final Class<? extends SimpleJob> jobClass, final String cron, final int shardingTotalCount, final String shardingItemParameters) {
return LiteJobConfiguration.newBuilder(new SimpleJobConfiguration(JobCoreConfiguration.newBuilder(
jobClass.getName(), cron, shardingTotalCount).shardingItemParameters(shardingItemParameters).build(), jobClass.getCanonicalName())).overwrite(true).build();
}
}
(3)任务执行结果
Thread ID: 151, 任务总片数: 3, 当前分片项: 0, 分片参数: A
Thread ID: 152, 任务总片数: 3, 当前分片项: 1, 分片参数: B
Thread ID: 153, 任务总片数: 3, 当前分片项: 2, 分片参数: C
Thread ID: 101, 任务总片数: 3, 当前分片项: 2, 分片参数: C
Thread ID: 154, 任务总片数: 3, 当前分片项: 0, 分片参数: A
Thread ID: 155, 任务总片数: 3, 当前分片项: 1, 分片参数: B
流式任务配置
Dataflow类型用于处理数据流,需实现DataflowJob接口。该接口提供2个方法可供覆盖,分别用于抓取(fetchData)和处理(processData)数据。
可通过DataflowJobConfiguration配置是否流式处理。
流式处理数据只有fetchData方法的返回值为null或集合长度为空时,作业才停止抓取,否则作业将一直运行下去; 非流式处理数据则只会在每次作业执行过程中执行一次fetchData方法和processData方法,随即完成本次作业。
(1)任务执行类
import cn.appblog.springboot.dao.entity.User;
import com.dangdang.ddframe.job.api.ShardingContext;
import com.dangdang.ddframe.job.api.dataflow.DataflowJob;
import lombok.extern.slf4j.Slf4j;
import java.util.List;
/**
* 流式任务类型:业务实现两个接口 - 抓取(fetchData)和处理(processData)数据
* a.流式处理数据只有fetchData方法的返回值为null或集合长度为空时,作业才停止抓取,否则作业将一直运行下去
* b.非流式处理数据则只会在每次作业执行过程中执行一次fetchData方法和processData方法,随即完成本次作业
*/
@Slf4j
public class SpringDataflowJob implements DataflowJob<User> {
@Override
public List<User> fetchData(ShardingContext shardingContext) {
log.info(String.format("Thread ID: %s, 任务总片数: %s, 当前分片项: %s, 分片参数: %s",
Thread.currentThread().getId(), shardingContext.getShardingTotalCount(), shardingContext.getShardingItem(), shardingContext.getShardingParameter()));
List<User> userList = null;
/**
* users = SELECT * FROM user WHERE status = 0 AND MOD(id, shardingTotalCount) = shardingItem Limit 0, 30
*/
return userList;
}
@Override
public void processData(ShardingContext shardingContext, List<User> userList) {
if (userList != null && userList.size() > 0) {
for (User user : userList) {
log.info(String.format("用户 %s 开始计息", user.getName()));
}
}
}
}
(2)任务配置类
import cn.appblog.springboot.model.SpringDataflowJob;
import com.dangdang.ddframe.job.api.dataflow.DataflowJob;
import com.dangdang.ddframe.job.config.JobCoreConfiguration;
import com.dangdang.ddframe.job.config.dataflow.DataflowJobConfiguration;
import com.dangdang.ddframe.job.event.JobEventConfiguration;
import com.dangdang.ddframe.job.lite.api.JobScheduler;
import com.dangdang.ddframe.job.lite.config.LiteJobConfiguration;
import com.dangdang.ddframe.job.lite.spring.api.SpringJobScheduler;
import com.dangdang.ddframe.job.reg.zookeeper.ZookeeperRegistryCenter;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import javax.annotation.Resource;
/**
* 流式任务配置
*/
@Configuration
public class DataflowJobConfig {
// 注册中心配置
@Resource
private ZookeeperRegistryCenter regCenter;
// 将作业运行的痕迹进行持久化到DB的操作配置
@Resource
private JobEventConfiguration jobEventConfiguration;
@Bean
public DataflowJob dataflowJob() {
return new SpringDataflowJob();
}
//@Bean(initMethod = "init")
public JobScheduler dataflowJobScheduler(final DataflowJob dataflowJob, @Value("${dataflowJob.cron}") final String cron,
@Value("${dataflowJob.shardingTotalCount}") final int shardingTotalCount,
@Value("${dataflowJob.shardingItemParameters}") final String shardingItemParameters) {
return new SpringJobScheduler(dataflowJob, regCenter, getLiteJobConfiguration(dataflowJob.getClass(), cron, shardingTotalCount, shardingItemParameters), jobEventConfiguration);
}
/*
* 作业配置
* 作业配置分为3级,分别是JobCoreConfiguration,JobTypeConfiguration和LiteJobConfiguration
* LiteJobConfiguration使用JobTypeConfiguration,JobTypeConfiguration使用JobCoreConfiguration,层层嵌套
* JobTypeConfiguration根据不同实现类型分为SimpleJobConfiguration,DataflowJobConfiguration和ScriptJobConfiguration
*/
private LiteJobConfiguration getLiteJobConfiguration(final Class<? extends DataflowJob> jobClass, final String cron, final int shardingTotalCount, final String shardingItemParameters) {
return LiteJobConfiguration.newBuilder(new DataflowJobConfiguration(JobCoreConfiguration.newBuilder(
jobClass.getName(), cron, shardingTotalCount).shardingItemParameters(shardingItemParameters).build(), jobClass.getCanonicalName(), true)).overwrite(true).build();
}
}
elastic-job的功能集还不止这些,比如像作业事件追踪、任务监听等,另外,elastic-job-lite-console作为一个独立的运维平台还提供了用来查询和操作任务的web页面。
版权声明:
作者:Joe.Ye
链接:https://www.appblog.cn/index.php/2023/04/01/spring-boot-integrate-timed-task-elastic-job/
来源:APP全栈技术分享
文章版权归作者所有,未经允许请勿转载。
共有 0 条评论