定时任务与分布式定时任务框架XXL-JOB详解
一、为什么需要定时任务?
定时任务在业务场景中非常常见,主要用于以下场景:
-
时间驱动处理:
• 整点发送优惠券
• 每天更新收益
• 每天刷新标签数据和人群数据 -
批量处理数据:
• 按月批量统计报表数据
• 批量更新短信状态
• 实时性要求不高的数据处理 -
异步执行解耦:
• 活动状态刷新
• 数据同步
• 异步执行离线查询,与内部逻辑解耦
二、定时任务的实现方式
定时任务的实现方式有多种,以下是常见的几种:
-
JDK方式:
• 死循环
• Timer定时器
• JUC定时任务 -
Spring Scheduling声明式定时任务:
•@EnableScheduling
•@Scheduled
-
经典定时任务框架:
• Quartz -
分布式定时任务:
• XXL-JOB(基于Quartz)
• Elastic-Job(基于Quartz)
• Saturn(Elastic-Job的Fork版本) -
MQ延时队列
三、各解决方案的优缺点
-
JDK方式及Spring Scheduling:
• 优点:简单易用
• 缺点:不支持高可用,不支持动态配置 -
Quartz:
• 优点:支持高可用
• 缺点:配置复杂(需要10几张表),不支持动态配置 -
XXL-JOB:
• 优点:支持高可用,动态配置,任务统一管理
• 缺点:需要额外部署调度中心
四、分布式定时任务框架XXL-JOB
XXL-JOB是一个分布式任务调度平台,核心设计目标是开发迅速、学习简单、轻量级、易扩展。目前已有多家公司接入,包括大众点评、京东、优信二手车等。
4.1 项目结构
XXL-JOB支持通过Web页面对任务进行CRUD操作,支持动态修改任务状态、暂停/恢复任务,以及终止运行中任务。
源码地址:
• GitHub:https://github.com/xuxueli/xxl-job
• 码云:http://gitee.com/xuxueli0323/xxl-job
项目结构如下:
• xxl-job-admin:调度中心
• xxl-job-core:公共依赖
• xxl-job-executor-samples:执行器示例
• xxl-job-executor-sample-springboot
:SpringBoot版本(推荐)
• xxl-job-executor-sample-spring
:Spring版本
• xxl-job-executor-sample-frameless
:无框架版本
• 其他版本:JFinal、Nutz、jboot等
4.2 设计思想
XXL-JOB将调度行为抽象为“调度中心”,调度中心负责发起调度请求,而任务逻辑由“执行器”处理。调度中心与执行器解耦,提高了系统的稳定性和扩展性。
4.3 部署调度中心
4.3.1 初始化调度数据库
执行tables_xxl_job.sql
脚本初始化数据库。调度中心支持集群部署,集群节点需连接同一个MySQL实例。
4.3.2 修改调度中心配置
配置文件路径:xxl-job/xxl-job-admin/src/main/resources/application.properties
主要配置项:
• 端口号
• JDBC数据源
• 报警邮箱
• 调度中心通讯TOKEN
4.3.3 打包运行调度中心
- 编译打包:
mvn clean package -Dmaven.skip.test=true
- 启动调度中心:
java -jar xxl-job-admin-2.2.0.jar
- 访问调度中心:
http://localhost:8080/xxl-job-admin
默认登录账号:admin/123456
4.3.4 调度中心集群(可选)
调度中心支持集群部署,提升系统的容灾和可用性。集群部署时需保证DB配置一致,机器时钟一致。
4.4 搭建执行器项目
执行器负责接收调度中心的调度并执行任务。可以将执行器集成到现有项目中,也可以直接使用示例项目。
4.4.1 添加XXL-JOB依赖
<dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-web</artifactId>
</dependency>
<dependency><groupId>com.xuxueli</groupId><artifactId>xxl-job-core</artifactId><version>2.2.0</version>
</dependency>
4.4.2 修改执行器配置
配置文件路径:xxl-job-executor-sample-springboot/src/main/resources/application.properties
主要配置项:
• 调度中心地址
• 执行器AppName
• 执行器端口号
• 日志存储路径
4.4.3 添加执行器配置类
Copy示例工程的配置文件到项目中,无需修改。
4.4.4 给执行器添加任务
@Component
public class MyJobHandler {@XxlJob("myJobHandler")public ReturnT<String> execute(String param) {XxlJobLogger.log("任务执行日志");System.out.println("任务执行:" + param);return ReturnT.SUCCESS;}
}
4.4.5 执行器管理
在调度中心“执行器管理”界面添加执行器,支持自动注册和手动录入。
4.5 任务界面详解
任务配置项包括:
• 执行器
• 任务描述
• 路由策略
• Cron表达式
• 运行模式(BEAN模式、GLUE模式)
• 阻塞处理策略
• 子任务
• 任务超时时间
• 失败重试次数
• 报警邮件
• 负责人
• 执行参数
4.6 分片广播任务
分片广播任务适用于大数据量处理场景,如分片任务、广播任务等。任务会广播到所有执行器,执行器根据分片参数处理数据。
@JobHandler(value="shardingJobHandler")
@Service
public class ShardingJobHandler extends IJobHandler {@Overridepublic ReturnT<String> execute(String param) {ShardingUtil.ShardingVO shardingVO = ShardingUtil.getShardingVo();XxlJobLogger.log("分片参数:当前分片序号 = {}, 总分片数 = {}", shardingVO.getIndex(), shardingVO.getTotal());return SUCCESS;}
}
五、使用XXL-JOB改造购物车数据同步
5.1 异步执行失败记录到Redis
改造CartAsyncExceptionHandler
,将异常用户记录到Redis。
@Slf4j
@Component
public class CartAsyncExceptionHandler implements AsyncUncaughtExceptionHandler {@Autowiredprivate StringRedisTemplate redisTemplate;private static final String KEY = "cart:async:exception";@Overridepublic void handleUncaughtException(Throwable throwable, Method method, Object... objects) {log.error("异步调用发生异常,方法:{},参数:{},异常信息:{}", method, objects, throwable.getMessage());String userId = objects[0].toString();BoundListOperations<String, String> listOps = this.redisTemplate.boundListOps(KEY);listOps.leftPush(userId);}
}
5.2 添加任务同步购物车数据
在定时任务工程中引入Redis和Mapper接口,添加CartJobHandler
任务。
@Component
public class CartJobHandler {@Autowiredprivate StringRedisTemplate redisTemplate;@Autowiredprivate CartMapper cartMapper;private static final String KEY = "cart:async:exception";private static final String KEY_PREFIX = "cart:info:";@XxlJob("cartJobHandler")public ReturnT<String> executor(String param) {BoundListOperations<String, String> listOps = this.redisTemplate.boundListOps(KEY);String userId = listOps.rightPop();while (StringUtils.isNotBlank(userId)) {this.cartMapper.delete(new UpdateWrapper<Cart>().eq("user_id", userId));BoundHashOperations<String, Object, Object> hashOps = this.redisTemplate.boundHashOps(KEY_PREFIX + userId);List<Object> cartJsons = hashOps.values();if (!CollectionUtils.isEmpty(cartJsons)) {cartJsons.forEach(cartJson -> {this.cartMapper.insert(JSON.parseObject(cartJson.toString(), Cart.class));});}userId = listOps.rightPop();}return ReturnT.SUCCESS;}
}
六、总结
XXL-JOB是一个功能强大、易于使用的分布式任务调度框架,支持高可用、动态配置和任务统一管理。通过XXL-JOB,可以轻松实现定时任务的调度和执行,适用于各种业务场景。