分布式任务调度

今天我们讲讲分布式定时任务调度—ElasticJob。

一、概述

1、什么是分布式任务调度

我们可以思考⼀下下⾯业务场景的解决⽅案:

  • 某电商平台需要每天上午10点,下午3点,晚上8点发放⼀批优惠券

  • 某银⾏系统需要在信⽤卡到期还款⽇的前三天进⾏短信提醒

  • 某财务系统需要在每天凌晨0:10分结算前⼀天的财务数据,统计汇总

以上场景就是任务调度所需要解决的问题

任务调度是为了⾃动完成特定任务,在约定的特定时刻去执⾏任务的过程

2、为什么需要分布式调度

我们在之前使⽤过Spring中提供的定时任务注解@Scheduled,在业务类中⽅法中贴上这个注解然后在启动类上贴上 @EnableScheduling 注解

那为什么又需要分布式调度?

感觉Spring给我们提供的这个注解可以完成任务调度的功能,好像已经完美解决问题了,为什么还需要分布式呢?

主要有如下这⼏点原因:

1.单机处理极限:原本1分钟内需要处理1万个订单,但是现在需要1分钟内处理10万个订单;原来⼀个统计需要1⼩时,现在业务⽅需要10分钟就统计出来。你也许会说,你也可以多线程、单机多进程处理。的确,多线程并⾏处理可以提⾼单位时间的处理效率,但是单机能⼒毕竟有限(主要是CPU、内存和磁盘),始终会有单机处理不过来的情况。

2.⾼可⽤:单机版的定式任务调度只能在⼀台机器上运⾏,如果程序或者系统出现异常就会导致功能不可⽤。虽然可以在单机程序实现的⾜够稳定,但始终有机会遇到⾮程序引起的故障,⽽这个对于⼀个系统的核⼼功能来说是不可接受的。

3.防⽌重复执⾏: 在单机模式下,定时任务是没什么问题的。但当我们部署了多台服务,同时⼜每台服务⼜有定时任务时,可能会出现任务重复执行

这个时候就需要分布式的任务调度来实现了。

3、Elastic-Job介绍

Elastic-Job是⼀个分布式调度的解决⽅案,由当当⽹开源,它由两个相互独⽴的⼦项⽬Elastic-job-Lite和

Elastic-Job-Cloud组成,使⽤Elastic-Job可以快速实现分布式任务调度。

Elastic-Job的地址: ElasticJob - Distributed scheduled job solution

功能列表:

  • 分布式调度协调

在分布式环境中,任务能够按照指定的调度策略执⾏,并且能够避免同⼀任务多实例重复执⾏。

  • 丰富的调度策略:

基于成熟的定时任务作业框架Quartz cron表达式执⾏定时任务。

  • 弹性拓容缩容

当集群中增加⼀个实例,它应当能够被选举被执⾏任务;当集群减少⼀个实例时,他所执⾏的任务能被转移到别的示例中执⾏。

  • 失效转移

某示例在任务执⾏失败后,会被转移到其他实例执⾏。

  • 错过执⾏任务重触发

若因某种原因导致作业错过执⾏,⾃动记录错误执⾏的作业,并在下次次作业完成后⾃动触发。

  • ⽀持并⾏调度

⽀持任务分⽚,任务分⽚是指将⼀个任务分成多个⼩任务在多个实例同时执⾏。

  • 作业分⽚⼀致性

当任务被分⽚后,保证同⼀分⽚在分布式环境中仅⼀个执⾏实例。

  • ⽀持作业⽣命周期操作

可以动态对任务进⾏开启及停⽌操作。

  • 丰富的作业类型

⽀持Simple、DataFlow、Script三种作业类型

系统运行架构图

二、Zookeeper下载

建议:zookeeper3.4.6以上版本,JDK1.7以上,maven在3.0.4以上

我这里将Zookeeper安装到了Linux系统上,以我自己安装为例,可自行选择。

1.上传,将zookeeper-3.4.11.tar.gz上传到/usr/local/src/soft/zookeeper目录下

2.解压文件到指定目录

tar -zxvf /usr/local/src/soft/zookeeper-3.4.11.tar.gz -C /usr/local/src/soft/zookeeper

3.拷贝配置文件

cp /usr/local/src/soft/zookeeper/zookeeper-3.4.11/conf/zoo_sample.cfg /usr/local/src/soft/zookeeper/zookeeper-3.4.11/conf/zoo.cfg

4.启动

/usr/local/src/soft/zookeeper/apache-zookeeper-3.5.6-bin/bin/zkServer.sh start

5.检查进程是否开启

这个命令不一定有效

jps

可以试试这个命令,查看状态

/usr/local/src/soft/zookeeper/apache-zookeeper-3.5.6-bin/bin/zkServer.sh status

三、任务分片参数

1、分片的概念

作业分片是指任务的分布式执行,需要将一个任务拆分为多个独立的任务项,然后由分布式的应用实例分别执行某一个或者几个分布项。

例如:Elastic-job快速入门中文件备份的案例,现有两台服务器,每台服务器分别跑一个应用实例。为了快速执行作业,那么可以讲任务分成4片,每个应用实例都执行两片。作业遍历数据逻辑应为:实例1查找text和image类型文件执行备份,实例2査找radio和vedio类型文件执行备份。如果由于服务器拓容应用实例数量增加为4,则作业谝历数据的逻辑应为: 4个实例分别处理text,image,radio,video类型的文件。

可以看到,通过对任务的合理分片化,从而达到任务并行处理的效果.

  • 当只有一台机器时,给定时任务分片四个,在机器中启动四个线程,分别处理四个分片的内容

  • 当只有两台机器时,分片由两台机器进行分配,A负责索引为0,1的内容,B负责索引为2,3的内容

  • 三台机器,如图

  • 四台机器,平均分摊

集群之后,可以分摊CPU的处理压力,提高数据处理的速度,那到底几台机器好呢?

分片数建议是机器个数的倍数

在秒杀项目中,我们将秒杀商品的场次分成了10、12、14三个场次。在这里我们就根据场次将其分成三片

2、分片分配机制

ElasticJob的分片分配机制

  • Zookeeper 协调:ElasticJob 通过 Zookeeper 协调任务实例的分片分配。每个任务实例启动时,会向 Zookeeper 注册自己,并获取分配给自己的分片信息。

  • 动态分片分配:如果任务实例的数量发生变化(比如新增或减少实例),Zookeeper 会重新分配分片,确保每个分片都有任务实例处理。

  • 分片参数传递:分片参数通过 ShardingContext 传递给任务实例,任务实例根据分片参数执行对应的逻辑。

ElasticJob 通过 Zookeeper 协调,将分片分配给不同的任务实例。每个任务实例启动时,会从 Zookeeper 获取分配给自己的分片信息(分片编号和分片参数)。

三、项目集成

1、依赖添加

<dependency><groupId>com.dangdang</groupId><artifactId>elastic-job-lite-spring</artifactId><version>2.1.5</version>
</dependency>
<!--zookeeper客户端-->
<dependency><groupId>org.apache.curator</groupId><artifactId>curator-recipes</artifactId><version>2.10.0</version>
</dependency>

2、分布式调度配置

1)注册中心配置

获取zk的地址和任务名称,将任务注册到zk注册中心中

@Configuration
public class RegistryCenterConfig {@Bean(initMethod = "init")public CoordinatorRegistryCenter createRegistryCenter(@Value("${elasticjob.zookeeper-url}") String zookeeperUrl, @Value("${elasticjob.group-name}") String groupName) {//zk的配置ZookeeperConfiguration zookeeperConfiguration = new ZookeeperConfiguration(zookeeperUrl,groupName);//设置zk超时时间zookeeperConfiguration.setSessionTimeoutMilliseconds(100);//创建注册中心CoordinatorRegistryCenter zookeeperRegistryCenter = new ZookeeperRegistryCenter(zookeeperConfiguration);return zookeeperRegistryCenter;}
}
2)分布式调度参数配置
#分布式定时任务配置
elasticjob:zookeeper-url: 192.168.88.130:2181group-name: shop-job-groupjobCron:#3分钟执行一次seckillProduct: 0 0/3 * * * ?
3)定时任务配置

需要使用定时任务的服务可能不止一个,不同的定时任务,表达式和分片参数、个数都不一样

将不同的定时任务创建不同的LiteJobConfiguration对象,指定不同的参数类型,将其创建为一个Bean,交给spring容器管理

@Configuration
public class BusinessJobConfig {
@Bean(initMethod = "init")public SpringJobScheduler initSPJob(CoordinatorRegistryCenter registryCenter, SeckillProductJob seckillProductJob){LiteJobConfiguration jobConfiguration = ElasticJobUtil.createJobConfiguration(seckillProductJob.getClass(),seckillProductJob.getCron(),//任务类的cron表达式seckillProductJob.getShardingTotalCount(), //分片个数seckillProductJob.getShardingParameters(), //分片参数seckillProductJob.isDataflowType());//不是dataflow类型SpringJobScheduler springJobScheduler = new SpringJobScheduler(seckillProductJob, registryCenter,jobConfiguration );return springJobScheduler;}
}    
4)分布式调度工具类

主要是用于创建LiteJobConfiguration对象,为不同的定时任务定义不同的配置类型

  • 指定定时任务类

  • 任务类的cron表达式

  • 分片个数

  • 分片参数

  • 是否为dataflow类型

public class ElasticJobUtil {public static LiteJobConfiguration createJobConfiguration(final Class<? extends SimpleJob> jobClass,final String cron,final int shardingTotalCount,
final String shardingItemParameters,boolean dataflowType) {// 定义作业核心配置JobCoreConfiguration.Builder jobCoreConfigurationBuilder = JobCoreConfiguration.newBuilder(jobClass.getSimpleName(), cron, shardingTotalCount);if(!StringUtils.isEmpty(shardingItemParameters)){jobCoreConfigurationBuilder.shardingItemParameters(shardingItemParameters);}JobTypeConfiguration jobConfig = null;if(dataflowType){jobConfig = new DataflowJobConfiguration(jobCoreConfigurationBuilder.build(),jobClass.getCanonicalName(),true);}else {// 定义SIMPLE类型配置jobConfig = new SimpleJobConfiguration(jobCoreConfigurationBuilder.build(), jobClass.getCanonicalName());}// 定义Lite作业根配置LiteJobConfiguration simpleJobRootConfig = LiteJobConfiguration.newBuilder(jobConfig).overwrite(true).build();return simpleJobRootConfig;}public static LiteJobConfiguration createDefaultSimpleJobConfiguration(final Class<? extends SimpleJob> jobClass, final String cron) {// 创建默认的SIMPLE类型作业配置return createJobConfiguration(jobClass,cron,1,null,false);}public static LiteJobConfiguration createDefaultDataFlowJobConfiguration(final Class<? extends SimpleJob> jobClass, final String cron) {// 创建默认的DataFlow类型作业配置return createJobConfiguration(jobClass,cron,1,null,true);}
}
5)任务分片参数配置
  1. 删除之前的数据

  2. 查询当天的数据同步Redis,定时任务每天执行一次

  3. 给定时任务分片处理,分三片

给定时任务做分片处理:0=10,1=12,2=14(第10场秒杀任务,第12场.........)

jobSharding:seckillProduct:shardingParameters: 0=10,1=12,2=14shardingTotalCount: 3dataflowType: false

3、定时任务类

初始化秒杀商品定时任务

@Data
@RefreshScope
@Component
public class SeckillProductJob implements SimpleJob {//表达式@Value("${elasticjob.jobCron.seckillProduct}")private String cron;//分片参数@Value("${jobSharding.seckillProduct.shardingParameters}")private String shardingParameters;//分片个数@Value("${jobSharding.seckillProduct.shardingTotalCount}")private int shardingTotalCount;@Value("${jobSharding.seckillProduct.dataflowType}")private boolean dataflowType;@Resourceprivate SeckillProductFeignApi seckillProductFeignApi;@Autowiredprivate StringRedisTemplate redisTemplate;@Overridepublic void execute(ShardingContext shardingContext) {String time = shardingContext.getShardingParameter();//远程调用商品服务获取秒杀列表集合Result<List<SeckillProductVo>> result = seckillProductFeignApi.queryByTime(Integer.valueOf(time));if (result==null||result.hasError()) {//通知管理员return;}List<SeckillProductVo> seckillProductVoList = result.getData();//获取秒杀商品key-seckillProductHash:10String key = JobRedisKey.SECKILL_PRODUCT_HASH.getRealKey(time);//删除之前的缓存redisTemplate.delete(key);HashMap<String, String> seckillProductMap = new HashMap<>();//存储集合数据到Redis中for (SeckillProductVo vo : seckillProductVoList) {seckillProductMap.put(vo.getId().toString(), JSON.toJSONString(vo));}redisTemplate.opsForHash().putAll(key, seckillProductMap);System.out.println("分布式商品秒杀任务执行...............");}
}

秒杀列表缓存成功

4、任务分片处理逻辑

分片参数设置为 0=10,1=12,2=14,表示分片0处理时间参数为10的任务,分片1处理时间参数为12的任务,分片2处理时间参数为14的任务。ElasticJob 会根据分片参数将任务分片,并将每个分片分配给不同的任务实例执行。

  • 分片总数(ShardingTotalCount):表示任务的总分片数。比如你有3场秒杀活动,可以将分片总数设置为3,每个分片处理一场秒杀活动。

  • 分片参数(ShardingParameters):可以为每个分片指定参数,比如分片0处理第一场秒杀,分片1处理第二场秒杀,分片2处理第三场秒杀。

ElasticJob 通过 Zookeeper 协调,将分片分配给不同的任务实例。每个任务实例启动时,会从 Zookeeper 获取分配给自己的分片信息(分片编号和分片参数)。

今天的分享结束,感兴趣的兄弟请点赞、收藏,关注我不迷路!下期再见!

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.rhkb.cn/news/37014.html

如若内容造成侵权/违法违规/事实不符,请联系长河编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

Blender标注工具

按住键盘D键 鼠标左键绘制 / 右键擦除 也可以在上方选择删除

Second Me:在 AI 中保留自我的火种丨社区来稿

今天想和所有朋友们分享一种全新的 AI 可能性&#xff0c;Second Me&#xff01; 2025年了&#xff0c;很多人和我一样&#xff0c;都越来越确信&#xff0c;AGI 的到来只是一个时间问题。 然而我也经常想&#xff0c;当我们所有人&#xff0c;都心甘情愿地为自己“造神” –…

仿新浪微博typecho主题源码

源码介绍 仿新浪微博typecho主题源码&#xff0c;简约美观&#xff0c;适合做个人博客&#xff0c;该源码为主题模板&#xff0c;需要先搭建typecho&#xff0c;然后吧源码放到对应的模板目录下&#xff0c;后台启用即可 源码特点 支持自适应 个性化程度高 可设置背景图、顶…

Ubuntu24搭建k8s高可用集群

Ubuntu24搭建k8s高可用集群 环境信息 主机名IPk8s版本备注vm-master192.168.103.2501.28.2master1vm-master2192.168.103.2491.28.2master2vm-master3192.168.103.2541.28.2master3vm-node1192.168.103.2511.28.2node1vm-node2192.168.103.2521.28.2node2 容器进行时&#xf…

洛谷P1216 [IOI 1994] 数字三角形 Number Triangles(动态规划)

P1216 [IOI 1994] 数字三角形 Number Triangles - 洛谷 代码区&#xff1a; #include<algorithm> #include<iostream>using namespace std; const int R 1005; int dp[R][R]; int arr[R][R]; int main() {int n;cin >> n;for (int i 1; i < n; i) {for…

Spring Boot Actuator 自定义健康检查(附Demo)

目录 前言1. Demo2. 拓展 前言 &#x1f91f; 找工作&#xff0c;来万码优才&#xff1a;&#x1f449; #小程序://万码优才/r6rqmzDaXpYkJZF Spring Boot 的 actuator 提供了应用监控的功能&#xff0c;其中健康检查&#xff08;Health Check&#xff09;是一个重要的部分&…

2025年优化算法:人工旅鼠算法(Artificial lemming algorithm,ALA)

人工旅鼠算法(Artificial lemming algorithm&#xff0c;ALA)是发表在中科院二区期刊“ARTIFICIAL INTELLIGENCE REVIEW”&#xff08;IF&#xff1a;11.7&#xff09;的2025年智能优化算法 01.引言 随着信息技术与工程科学的快速发展&#xff0c;现代优化问题呈现出高维、非线…

「实战指南 」Swift 并发中的任务取消机制

网罗开发 &#xff08;小红书、快手、视频号同名&#xff09; 大家好&#xff0c;我是 展菲&#xff0c;目前在上市企业从事人工智能项目研发管理工作&#xff0c;平时热衷于分享各种编程领域的软硬技能知识以及前沿技术&#xff0c;包括iOS、前端、Harmony OS、Java、Python等…

实验12深度学习

实验12深度学习 一、实验目的 &#xff08;1&#xff09;理解并熟悉深度神经网络的工作原理&#xff1b; &#xff08;2&#xff09;熟悉常用的深度神经网络模型及其应用环境&#xff1b; &#xff08;3&#xff09;掌握Anaconda的安装和设置方法&#xff0c;进一步熟悉Jupyte…

【问题解决】Postman 测试报错 406

现象 Tomcat 日志 org.springframework.web.servlet.handler.AbstractHandlerExceptionResolver.logException Resolved org.springframework.web.HttpMediaTypeNotAcceptableException: No acceptable representation HTTP状态 406 - 不可接收 的报错&#xff0c;核心原因 客…

微信小程序:用户拒绝小程序获取当前位置后的处理办法

【1】问题描述&#xff1a; 小程序在调用 wx.getLocation() 获取用地理位置时&#xff0c;如果用户选择拒绝授权&#xff0c;代码会直接抛出错误。如果再次调用 wx.getLocation() 时&#xff0c;就不会在弹窗询问用户是否允许授权。导致用户想要重新允许获取地理位置时&#x…

【MySQL】内置函数

目录 一、日期时间函数1.1 简单使用1.2 案例实操 二、字符串函数2.1 简单使用2.2 案例实践2.2.1 获取emp表的ename列的字符集2.2.2 要求显示exam_result表中的信息&#xff0c;显示格式&#xff1a;“XXX的语文是XXX分&#xff0c;数学XXX分&#xff0c;英语XXX分”2.2.3 求exa…

模块二 单元4 安装AD+DC

模块二 单元4 安装ADDC 两个任务&#xff1a; 1.安装AD活动目录 2.升级当前服务器为DC域控制器 安装前的准备工作&#xff1a; 确定你要操作的服务器系统&#xff08;Windows server 2022&#xff09;&#xff1b; 之前的服务器系统默认是工作组的模式workgroup模式&#xff08…

卫星互联网智慧杆:开启智能城市新时代​

哇哦&#xff01;在当下这个数字化浪潮正以雷霆万钧之势席卷全球的超酷时代&#xff0c;智慧城市建设已然成为世界各国你追我赶、竞相发力的核心重点领域啦&#xff01;而咱们的卫星互联网智慧杆&#xff0c;作为一项完美融合了卫星通信与物联网顶尖技术的创新结晶&#xff0c;…

ThreadLocal 的详细使用指南

一、ThreadLocal 核心原理 ThreadLocal 是 Java 提供的线程绑定机制&#xff0c;为每个线程维护变量的独立副本。其内部通过 ThreadLocalMap 实现&#xff0c;每个线程的 Thread 对象都有一个独立的 ThreadLocalMap&#xff0c;存储以 ThreadLocal 对象为键、线程局部变量为值…

免费开源的NAS解决方案:TrueNAS

TrueNAS是业内知名的FreeNAS系统的升级版&#xff0c;是一款开源的网络存储系统&#xff0c;具有高性能、稳定性和易用性等优点。 TrueNAS目前有三个版本&#xff0c;分别是TrueNAS CORE、TrueNAS ENTERPRISE、TrueNAS SCALE。其中&#xff0c;TrueNAS CORE基于FreeBSD开发&…

Fisher 信息矩阵公式原理:使用似然估计,二阶导数等知识点

Fisher 信息矩阵公式原理:使用似然估计,二阶导数等知识点 目录 Fisher 信息矩阵公式原理:使用似然估计,二阶导数等知识点Fisher 通过似然估计求解真实数据和权重参数之间的差异**1. Fisher 信息矩阵的定义****2. 计算对数似然函数的二阶导数****3. 代入 Fisher 信息矩阵定义…

自定义myshell(精讲)

我们都知道&#xff0c;我们给Linux下发的指令都是shell帮我们处理并完成的&#xff0c;那么他是怎么完成的呢&#xff1f;不难想到他都是通过环境变量以及程序替换来完成的。我们这一篇文章就手把手来教你怎么自己实现一个简单的shell。 目标&#xff1a; 1.要能处理普通命令 …

HTML图像标签的详细介绍

1. 常用图像格式 格式特点适用场景JPEG有损压缩&#xff0c;文件小&#xff0c;不支持透明适合照片、复杂图像PNG无损压缩&#xff0c;支持透明&#xff08;Alpha通道&#xff09;适合图标、需要透明背景的图片GIF支持动画&#xff0c;最多256色简单动画、低色彩图标WebP谷歌开…

信号的捕捉(操作部分)

目录 信号集和信号屏蔽字 信号集 信号屏蔽字 信号位操作函数 sigemptyset sigaddset sigismember sigprocmask sigpending 手动操作让2号信号屏蔽打印pending 信号处理函数sigaction 我们继续来学习信号的捕捉 信号集和信号屏蔽字 信号集 信号集是存储一组信号的…