Spring Boot集成定时任务elastic-job

elastic-job是当当开源的一款分布式定时作业框架。在这之前,我们开发定时任务一般都是使用quartz或者spring-task(ScheduledExecutorService),无论是使用quartz还是spring-task,我们都会至少遇到两个痛点:

  1. 不敢轻易跟着应用服务多节点部署,可能会重复多次执行而引发系统逻辑的错误。
  2. quartz的集群仅仅只是用来HA,节点数量的增加并不能给我们的每次执行效率带来提升,即不能实现水平扩展。

elastic-job在2.x之后,出了两个产品线:Elastic-Job-LiteElastic-Job-Cloud。我们一般使用Elastic-Job-Lite就能够满足需求,本文也是以Elastic-Job-Lite为主。1.x系列对应的就只有Elastic-Job-Lite,并且在2.x里修改了一些核心类名,差别虽大,原理类似,建议使用2.x系列。

elastic-job-lite原理

举个典型的job场景,比如余额宝里的昨日收益,系统需要job在每天某个时间点开始,给所有余额宝用户计算收益。如果用户数量不多,我们可以轻易使用quartz来完成,我们让计息job在某个时间点开始执行,循环遍历所有用户计算利息,这没问题。可是,如果用户体量特别大,我们可能会面临着在第二天之前处理不完这么多用户。另外,我们部署job的时候也得注意,我们可能会把job直接放在我们的webapp里,webapp通常是多节点部署的,这样,我们的job也就是多节点,多个job同时执行,很容易造成重复执行,比如用户重复计息,为了避免这种情况,我们可能会对job的执行加锁,保证始终只有一个节点能执行,或者干脆让job从webapp里剥离出来,独自部署一个节点。

elastic-job就可以帮助我们解决上面的问题,elastic底层的任务调度还是使用的quartz,通过zookeeper来动态给job节点分片

很大体量的用户需要在特定的时间段内计息完成

我们肯定是希望我们的任务可以通过集群达到水平扩展,集群里的每个节点都处理部分用户,不管用户数量有多庞大,我们只要增加机器就可以了,比如单台机器特定时间能处理n个用户,2台机器处理2n个用户,3台3n,4台4n...,再多的用户也不怕了。

使用elastic-job开发的作业都是zookeeper的客户端,比如我希望3台机器跑job,我们将任务分成3片,框架通过zk的协调,最终会让3台机器分别分配到0,1,2的任务片,比如server0-->0server1-->1server2-->2,当server0执行时,可以只查询id%3==0的用户,server1执行时,只查询id%3==1的用户,server2执行时,只查询id%3==2的用户。

任务部署多节点引发重复执行

在上面的基础上,我们再增加server3,此时,server3分不到任务分片,因为只有3片,已经分完了。没有分到任务分片的作业程序将不执行。
如果此时server2挂了,那么server2的分片项会分配给server3server3有了分片,就会替代server2执行。
如果此时server3也挂了,只剩下server0server1了,框架也会自动把server3的分片随机分配给server0或者server1,可能会这样,server0-->0server1-->1,2

这种特性称之为弹性扩容,即elastic-job名称的由来。

SpringBoot集成elastic-job

Maven依赖

<!-- https://mvnrepository.com/artifact/com.dangdang/elastic-job-lite-core -->
<dependency>
    <groupId>com.dangdang</groupId>
    <artifactId>elastic-job-lite-core</artifactId>
    <version>2.1.5</version>
</dependency>
<!-- https://mvnrepository.com/artifact/com.dangdang/elastic-job-lite-spring -->
<dependency>
    <groupId>com.dangdang</groupId>
    <artifactId>elastic-job-lite-spring</artifactId>
    <version>2.1.5</version>
</dependency>

配置ZooKeeper注册中心

import com.dangdang.ddframe.job.reg.zookeeper.ZookeeperConfiguration;
import com.dangdang.ddframe.job.reg.zookeeper.ZookeeperRegistryCenter;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.boot.autoconfigure.condition.ConditionalOnExpression;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

/**
 * 注册中心配置
 * 用于注册和协调作业分布式行为的组件,目前仅支持Zookeeper
 */
@Configuration
@ConditionalOnExpression("'${regCenter.serverList}'.length() > 0")
public class RegistryCenterConfig {

    @Bean(initMethod = "init")
    public ZookeeperRegistryCenter regCenter(@Value("${regCenter.serverList}") final String serverList, @Value("${regCenter.namespace}") final String namespace) {
        return new ZookeeperRegistryCenter(new ZookeeperConfiguration(serverList, namespace));
    }

}

持久化配置

import com.dangdang.ddframe.job.event.JobEventConfiguration;
import com.dangdang.ddframe.job.event.rdb.JobEventRdbConfiguration;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

import javax.annotation.Resource;
import javax.sql.DataSource;

/**
 * 开启事件追踪器
 * 如果想把作业运行的内容写到DB中,我们需要用到另一个构造器
 * 同时定义自己的JobEventConfiguration
 * 目前来说实现这个接口的只有一个类JobEventRdbConfiguration
 * 通过这个可以将作业运行的痕迹进行持久化到DB的操作
 */
@Configuration
public class JobEventConfig {
    @Resource
    private DataSource dataSource;

    @Bean
    public JobEventConfiguration jobEventConfiguration() {
        return new JobEventRdbConfiguration(dataSource);
    }
}

配置文件

配置ZooKeeper和Elastic Job的执行时间、分片数、和定义分片参即shardingItemParameter,可以和分片项匹配对应关系,用于将分片项的数字转换为更加可读的业务代码

application.properties

regCenter.serverList=127.0.0.1:2181  
regCenter.namespace=elastic-job  

simpleJob.cron=0/5 * * * * ?  
simpleJob.shardingTotalCount=3  
simpleJob.shardingItemParameters=0=A,1=B,2=C

application.yml

regCenter:
  serverList: 127.0.0.1:2181  
  namespace: elastic-job

simpleJob:
  #每隔5秒执行一次
  cron: 0/5 * * * * ?
  shardingTotalCount: 3
  shardingItemParameters: 0=A,1=B,2=C

dataflowJob:
  #每天凌晨1点执行一次
  cron: 0 0 1 * * ?
  shardingTotalCount: 3
  shardingItemParameters: 0=A,1=B,2=C

作业配置

elastic-job提供了三种类型的作业:Simple类型作业、Dataflow类型作业、Script类型作业。这里主要讲解前两者。Script类型作业意为脚本类型作业,支持shell,python,perl等所有类型脚本,使用不多,可以参见github文档。

简单Simple类型作业配置

SimpleJob需要实现SimpleJob接口,意为简单实现,未经过任何封装,与quartz原生接口相似。

(1)任务执行类

public class SpringSimpleJob implements SimpleJob {
    @Override
    public void execute(ShardingContext shardingContext) {
        /**
         * 实际开发中,有了任务总片数和当前分片项,就可以对任务进行分片执行了
         * 比如 SELECT * FROM user WHERE status = 0 AND MOD(id, shardingTotalCount) = shardingItem
         */
        System.out.println(String.format("Thread ID: %s, 任务总片数: %s, 当前分片项: %s, 分片参数: %s",
                Thread.currentThread().getId(), shardingContext.getShardingTotalCount(), shardingContext.getShardingItem(), shardingContext.getShardingParameter()));
    }
}

(2)任务配置类

import cn.appblog.springboot.model.SpringSimpleJob;
import com.dangdang.ddframe.job.api.simple.SimpleJob;
import com.dangdang.ddframe.job.config.JobCoreConfiguration;
import com.dangdang.ddframe.job.config.simple.SimpleJobConfiguration;
import com.dangdang.ddframe.job.event.JobEventConfiguration;
import com.dangdang.ddframe.job.lite.api.JobScheduler;
import com.dangdang.ddframe.job.lite.config.LiteJobConfiguration;
import com.dangdang.ddframe.job.lite.spring.api.SpringJobScheduler;
import com.dangdang.ddframe.job.reg.zookeeper.ZookeeperRegistryCenter;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

import javax.annotation.Resource;

/**
 * 简单任务配置
 */
@Configuration
public class SimpleJobConfig {
    // 注册中心配置
    @Resource
    private ZookeeperRegistryCenter regCenter;

    // 将作业运行的痕迹进行持久化到DB的操作配置
    @Resource
    private JobEventConfiguration jobEventConfiguration;

    @Bean
    public SimpleJob simpleJob() {
        return new SpringSimpleJob();
    }

    @Bean(initMethod = "init")
    public JobScheduler simpleJobScheduler(final SimpleJob simpleJob, @Value("${simpleJob.cron}") final String cron,
                                           @Value("${simpleJob.shardingTotalCount}") final int shardingTotalCount,
                                           @Value("${simpleJob.shardingItemParameters}") final String shardingItemParameters) {
        return new SpringJobScheduler(simpleJob, regCenter, getLiteJobConfiguration(simpleJob.getClass(), cron, shardingTotalCount, shardingItemParameters), jobEventConfiguration);
    }

    /*
     * 作业配置
     * 作业配置分为3级,分别是JobCoreConfiguration,JobTypeConfiguration和LiteJobConfiguration
     * LiteJobConfiguration使用JobTypeConfiguration,JobTypeConfiguration使用JobCoreConfiguration,层层嵌套
     * JobTypeConfiguration根据不同实现类型分为SimpleJobConfiguration,DataflowJobConfiguration和ScriptJobConfiguration
     */
    private LiteJobConfiguration getLiteJobConfiguration(final Class<? extends SimpleJob> jobClass, final String cron, final int shardingTotalCount, final String shardingItemParameters) {
        return LiteJobConfiguration.newBuilder(new SimpleJobConfiguration(JobCoreConfiguration.newBuilder(
                jobClass.getName(), cron, shardingTotalCount).shardingItemParameters(shardingItemParameters).build(), jobClass.getCanonicalName())).overwrite(true).build();
    }
}

(3)任务执行结果

Thread ID: 151, 任务总片数: 3, 当前分片项: 0, 分片参数: A
Thread ID: 152, 任务总片数: 3, 当前分片项: 1, 分片参数: B
Thread ID: 153, 任务总片数: 3, 当前分片项: 2, 分片参数: C
Thread ID: 101, 任务总片数: 3, 当前分片项: 2, 分片参数: C
Thread ID: 154, 任务总片数: 3, 当前分片项: 0, 分片参数: A
Thread ID: 155, 任务总片数: 3, 当前分片项: 1, 分片参数: B

流式任务配置

Dataflow类型用于处理数据流,需实现DataflowJob接口。该接口提供2个方法可供覆盖,分别用于抓取(fetchData)和处理(processData)数据。
可通过DataflowJobConfiguration配置是否流式处理。

流式处理数据只有fetchData方法的返回值为null或集合长度为空时,作业才停止抓取,否则作业将一直运行下去; 非流式处理数据则只会在每次作业执行过程中执行一次fetchData方法和processData方法,随即完成本次作业。

(1)任务执行类

import cn.appblog.springboot.dao.entity.User;
import com.dangdang.ddframe.job.api.ShardingContext;
import com.dangdang.ddframe.job.api.dataflow.DataflowJob;
import lombok.extern.slf4j.Slf4j;

import java.util.List;

/**
 * 流式任务类型:业务实现两个接口 - 抓取(fetchData)和处理(processData)数据
 *  a.流式处理数据只有fetchData方法的返回值为null或集合长度为空时,作业才停止抓取,否则作业将一直运行下去
 *  b.非流式处理数据则只会在每次作业执行过程中执行一次fetchData方法和processData方法,随即完成本次作业
 */
@Slf4j
public class SpringDataflowJob implements DataflowJob<User> {
    @Override
    public List<User> fetchData(ShardingContext shardingContext) {
        log.info(String.format("Thread ID: %s, 任务总片数: %s, 当前分片项: %s, 分片参数: %s",
                Thread.currentThread().getId(), shardingContext.getShardingTotalCount(), shardingContext.getShardingItem(), shardingContext.getShardingParameter()));
        List<User> userList = null;
        /**
         * users = SELECT * FROM user WHERE status = 0 AND MOD(id, shardingTotalCount) = shardingItem Limit 0, 30
         */
        return userList;
    }

    @Override
    public void processData(ShardingContext shardingContext, List<User> userList) {
        if (userList != null && userList.size() > 0) {
            for (User user : userList) {
                log.info(String.format("用户 %s 开始计息", user.getName()));
            }
        }
    }
}

(2)任务配置类

import cn.appblog.springboot.model.SpringDataflowJob;
import com.dangdang.ddframe.job.api.dataflow.DataflowJob;
import com.dangdang.ddframe.job.config.JobCoreConfiguration;
import com.dangdang.ddframe.job.config.dataflow.DataflowJobConfiguration;
import com.dangdang.ddframe.job.event.JobEventConfiguration;
import com.dangdang.ddframe.job.lite.api.JobScheduler;
import com.dangdang.ddframe.job.lite.config.LiteJobConfiguration;
import com.dangdang.ddframe.job.lite.spring.api.SpringJobScheduler;
import com.dangdang.ddframe.job.reg.zookeeper.ZookeeperRegistryCenter;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

import javax.annotation.Resource;

/**
 * 流式任务配置
 */
@Configuration
public class DataflowJobConfig {
    // 注册中心配置
    @Resource
    private ZookeeperRegistryCenter regCenter;

    // 将作业运行的痕迹进行持久化到DB的操作配置
    @Resource
    private JobEventConfiguration jobEventConfiguration;

    @Bean
    public DataflowJob dataflowJob() {
        return new SpringDataflowJob();
    }

    //@Bean(initMethod = "init")
    public JobScheduler dataflowJobScheduler(final DataflowJob dataflowJob, @Value("${dataflowJob.cron}") final String cron,
                                             @Value("${dataflowJob.shardingTotalCount}") final int shardingTotalCount,
                                             @Value("${dataflowJob.shardingItemParameters}") final String shardingItemParameters) {
        return new SpringJobScheduler(dataflowJob, regCenter, getLiteJobConfiguration(dataflowJob.getClass(), cron, shardingTotalCount, shardingItemParameters), jobEventConfiguration);
    }

    /*
     * 作业配置
     * 作业配置分为3级,分别是JobCoreConfiguration,JobTypeConfiguration和LiteJobConfiguration
     * LiteJobConfiguration使用JobTypeConfiguration,JobTypeConfiguration使用JobCoreConfiguration,层层嵌套
     * JobTypeConfiguration根据不同实现类型分为SimpleJobConfiguration,DataflowJobConfiguration和ScriptJobConfiguration
     */
    private LiteJobConfiguration getLiteJobConfiguration(final Class<? extends DataflowJob> jobClass, final String cron, final int shardingTotalCount, final String shardingItemParameters) {
        return LiteJobConfiguration.newBuilder(new DataflowJobConfiguration(JobCoreConfiguration.newBuilder(
                jobClass.getName(), cron, shardingTotalCount).shardingItemParameters(shardingItemParameters).build(), jobClass.getCanonicalName(), true)).overwrite(true).build();
    }
}

elastic-job的功能集还不止这些,比如像作业事件追踪、任务监听等,另外,elastic-job-lite-console作为一个独立的运维平台还提供了用来查询和操作任务的web页面。

版权声明:
作者:Joe.Ye
链接:https://www.appblog.cn/index.php/2023/04/01/spring-boot-integrate-timed-task-elastic-job/
来源:APP全栈技术分享
文章版权归作者所有,未经允许请勿转载。

THE END
分享
二维码
打赏
海报
Spring Boot集成定时任务elastic-job
elastic-job是当当开源的一款分布式定时作业框架。在这之前,我们开发定时任务一般都是使用quartz或者spring-task(ScheduledExecutorService),无论是使用qu……
<<上一篇
下一篇>>
文章目录
关闭
目 录