Kafka 深入服务端 — 时间轮

Kafka中存在大量的延迟操作,比如延时生产、延时拉取和延时删除等。Kafka基于时间轮概念自定义实现了一个用于延时功能的定时器,来完成这些延迟操作。

1 时间轮

Kafka没有使用基于JDK自带的Timer或DelayQueue来实现延迟功能,因为它们的插入和删除操作的时间复杂度为logn,这不能满足Kafka高性能要求。

1.1 Timer 和 DelayQueue

它们都使用了一个优先级队列(通常基于堆实现)来管理任务。

1.1.1 Timer

用于计划在特定时间后执行的任务,这些任务可以只执行一次或定期重复执行。其有以下特点:

  1. 运行在单线程中,无法满足多个任务同时执行的需求。如果前置任务耗时长,可能会阻塞后置任务。
  2. 如果任务执行过程中抛出异常,Timer会被异常中断停止。
public class TimerTest {public static void main(String[] args) {Timer timer = new Timer();Date startTime = new Date();TimerTask task1 = new TimerTask() {@Overridepublic void run() {long dis = (new Date().getTime() - startTime.getTime()) / 1_000;System.out.println("task1执行,距离开始时间:" + dis + "s");}};TimerTask task2 = new TimerTask() {@Overridepublic void run() {long dis = (new Date().getTime() - startTime.getTime()) / 1_000;System.out.println("task2执行,距离开始时间:" + dis + "s,休眠5s。");try {Thread.sleep(5000);} catch (InterruptedException e) {throw new RuntimeException(e);}System.out.println("task2休眠结束");}};TimerTask task3 = new TimerTask() {@Overridepublic void run() {long dis = (new Date().getTime() - startTime.getTime()) / 1_000;System.out.println("task3执行,距离开始时间:" + dis + "s");}};timer.schedule(task1,1000); // 1s后执行timer.schedule(task2,2000); // 2s后执行timer.schedule(task3,3000); // 3s后执行}
//    执行结果:
//    task1执行,距离开始时间:1s
//    task2执行,距离开始时间:2s,休眠5s。
//    task2休眠结束
//    task3执行,距离开始时间:7s
}

1.1.2 DelayQueue

是一个无界阻塞队列,用于存储实现了Delayed接口的元素,这些元素只有在它们的延迟期满时才会被取出。其有以下特点:

  1. 线程安全,可以在多线程环境中使用。
  2. 无界队列,可以存储任意数量的元素,直到系统内存耗尽。
  3. 延迟精度依赖与系统时钟。
public class DelayQueueTest {private static class DelayQueueTask implements Delayed {private final String taskName;private final long delayTime;private DelayQueueTask(String taskName, long delayTime) {this.taskName = taskName;this.delayTime = delayTime + System.currentTimeMillis();}@Overridepublic long getDelay(TimeUnit unit) { // 返回剩余延迟long diff = delayTime - System.currentTimeMillis();return unit.convert(diff,TimeUnit.MILLISECONDS);}@Overridepublic int compareTo(Delayed o) {if (this.delayTime < ((DelayQueueTask) o).delayTime) {return -1;}if (this.delayTime > ((DelayQueueTask) o).delayTime) {return 1;}return 0;}@Overridepublic String toString() {return "DelayQueueTask{" +"taskName='" + taskName + '\'' +", delayTime=" + delayTime +'}';}}public static void main(String[] args) throws InterruptedException {DelayQueue<DelayQueueTask> delayQueue = new DelayQueue<>();delayQueue.offer(new DelayQueueTask("task1",5000));delayQueue.offer(new DelayQueueTask("task2",2000));delayQueue.offer(new DelayQueueTask("task3",4000));System.out.println("开始执行delayQueue任务");while (!delayQueue.isEmpty()) {DelayQueueTask task = delayQueue.take();System.out.println("任务:" + task);}System.out.println("delayQueue任务 任务执行完毕");}
}

1.2 时间轮结构

Kafka 在任务的插入与删除采用了时间轮结构,其时间复杂度为O(1),而在时间推进上,还是依赖JDK提供的DelayQueue。

图 时间轮(TimingWheel)结构

Kafka的时间轮是一个存储定时任务的环形队列,每个元素(时间格)相当于一个桶(bucket),来存储一个定时任务列表TimerTaskList。TimerTaskList是一个环形的双向链表,链表中的每一项都是定时任务项TimerTaskEntry,其中封装了真正的定时任务TimerTask。

Kafka将TimerTaskList插入到DelayQueue(队列)中,使其成为其中的一个元素。它的过期时间为TimerTaskList的TimerTaskEntry中最快过期的时间。

1.2.1 时间格与时间跨度

时间轮由多个时间格组成(上面示意图中,每一层有10个时间格),每个时间格代表时间轮的基本时间跨度(tick),时间格数(wheelSize)是固定的,那么时间轮的总体跨度interval = tick * wheelSize。

时间轮还有一个表盘指针(currentTime),用来表示时间轮当前所处的时间。currentTime是tick的整数倍,currentTime将整个时间轮划分为到期部分和未到期部分。当前指向的时间格也属于到期部分,此时需要处理此时间格所对应的TimerTaskList中的所有任务。

上面示意图中,tick = 1s,此时currentTime指向第2个时间格,需要处理这个时间格存储的所有任务。假设,此时插入了一个3s后的任务,则把该任务插入第5个时间格中的bucket。

1.2.2 时间轮层级

当currentTime 指向第2个时间格时,需要插入一个33s后的任务,此时时间超过了第一层的跨度(1s * 10 = 10s)。Kafka引入层级时间轮的概念,当到期时间超过了当前时间轮所表示的时间范围时,就会尝试添加到上层的时间轮中。

33s 的任务会被插入到第二层的第(33 / 10 = 3) 3 个时间格中。

第一层的开始时间(第0格)startMs 是当前系统时间。其余高层时间轮的起始时间都设置为创建此层时前面第一轮的currentTime。

每一层时间轮都会有指向更高一层的引用。

1.2.3 任务处理及时间轮降级

图 时钟

时间轮类似于时钟,当每一层走完一圈时,上一层就会走一格。例如当第1层的currentTime 指向第2格时,此时需要插入两个任务,分别是33s及39s后。它们都会被插入到第2层的第3格中的bucket(TimerTaskList)。

假设经过33s(第1层指向第5格,第2层指向第3格)后,此bucket还是只有这两个任务。Kafka会把它们所在的TimerTaskList从第2层的第3格中取出,将33s的任务执行并从TimerTaskList中删除。此时,39s的任务还剩6s,Kafka会把这个任务“降级”,插入到第1层第1((5+6)% 10)格中。

1.2.4 时间推进与DelayQueue

如果按照时间格一格格推进时间,这样消耗会比较大,而且可能好多时间格没有存储任务。Kafka借助DelayQueue来推进时间。

将时间格bucket的TimerTaskList封装成Delayed,其剩余时间取TimerTaskList中TimerTaskEntry最快达到的时间。然后将这些Delayed插入到DelayQueue中。DelayQueue会将这些Delayed排序,最快到达的排在队列头部。当到达时刻时,将表头的TimerTaskEntry取出,对它的TaskEntry执行任务执行或降级等操作。

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.rhkb.cn/news/7390.html

如若内容造成侵权/违法违规/事实不符,请联系长河编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

Baklib如何推动企业知识管理的创新与转型探讨

内容概要 在当今快速发展的数字化时代&#xff0c;企业需要不断适应变化&#xff0c;以保持竞争优势。Baklib作为一款企业知识管理中台&#xff0c;扮演着推动数字化转型的重要角色。它通过提供一个集成的知识管理平台&#xff0c;帮助企业高效管理和共享内部及外部的知识资源…

日志收集Day005

1.filebeat的input类型之filestream实战案例: 在7.16版本中已经弃用log类型,之后需要使用filebeat,与log不同&#xff0c;filebeat的message无需设置就是顶级字段 1.1简单使用&#xff1a; filebeat.inputs: - type: filestreamenabled: truepaths:- /tmp/myfilestream01.lo…

【Rust自学】15.3. Deref trait Pt.2:隐式解引用转化与可变性

喜欢的话别忘了点赞、收藏加关注哦&#xff08;加关注即可阅读全文&#xff09;&#xff0c;对接下来的教程有兴趣的可以关注专栏。谢谢喵&#xff01;(&#xff65;ω&#xff65;) 15.3.1. 函数和方法的隐式解引用转化(Deref Coercion) 隐式解引用转化(Deref Coercion)是为…

【技巧】优雅的使用 pnpm+Monorepo 单体仓库构建一个高效、灵活的多项目架构

单体仓库&#xff08;Monorepo&#xff09;搭建指南&#xff1a;从零开始 单体仓库&#xff08;Monorepo&#xff09;是一种将多个相关项目集中管理在一个仓库中的开发模式。它可以帮助开发者共享代码、统一配置&#xff0c;并简化依赖管理。本文将通过实际代码示例&#xff0…

【MySQL — 数据库增删改查操作】深入解析MySQL的create insert 操作

数据库CRUD操作 1 CRUD简介 CURD是对数据库中的记录进行基本的增删改查操作: 2. Create 新增 语法 INSERT [INTO] table_name[(column [&#xff0c;column] ...)] VALUES(value_list)[&#xff0c;(value_list)] ... # value 后面的列的个数和类型&#xff0c;要和表结构匹配…

VSCode下EIDE插件开发STM32

VSCode下STM32开发环境搭建 本STM32教程使用vscode的EIDE插件的开发环境&#xff0c;完全免费&#xff0c;有管理代码文件的界面&#xff0c;不需要其它IDE。 视频教程见本人的 VSCodeEIDE开发STM32 安装EIDE插件 Embedded IDE 嵌入式IDE 这个插件可以帮我们管理代码文件&am…

electron打包客户端在rk3588上支持h265硬解

目录 前言 chromium是如何支持h265硬解 electron/chromium第一次编译 electron/chromium第二次编译 前言 我们的客户端程序是用electron打包的前端程序&#xff0c;其在rk3588主机上的linux环境运行。之前使用客户端查看h264编码的视频直播是没有问题的&#xff0c;但视频源…

An OpenGL Toolbox

3.An OpenGL Toolbox 声明&#xff1a;该代码来自&#xff1a;Computer Graphics Through OpenGL From Theory to Experiments&#xff0c;仅用作学习参考 3.1 Vertex Arrays and Their Drawing Commands 顶点数组及其绘制命令&#xff1a;将几何数据存储在一个位置&#xff0c…

Three城市引擎地图插件Geo-3d

一、简介 基于Three开发&#xff0c;为Three 3D场景提供GIS能力和城市底座渲染能力。支持Web墨卡托、WGS84、GCJ02等坐标系&#xff0c;支持坐标转换&#xff0c;支持影像、地形、geojson建筑、道路&#xff0c;植被等渲染。支持自定义主题。 二、效果 三、代码 //插件初始化…

侧边导航(Semi Design)

根据前几次的导航栏设计&#xff0c;从最简单的三行导航栏到后面响应式的导航栏&#xff0c;其实可以在这个的基础上慢慢优化&#xff0c;就可以得到一个日常使用设计的导航栏。设计步骤也和之前的类似。 一、实现步骤 1、先下载安装好npm install douyinfe/semi-icons 2、引…

【中间件快速入门】什么是Redis

现在后端开发会用到各种中间件&#xff0c;一不留神项目可能在哪天就要用到一个我们之前可能听过但是从来没接触过的中间件&#xff0c;这个时候对于开发人员来说&#xff0c;如果你不知道这个中间件的设计逻辑和使用方法&#xff0c;那在后面的开发和维护工作中可能就会比较吃…

将 OneLake 数据索引到 Elasticsearch - 第二部分

作者&#xff1a;来自 Elastic Gustavo Llermaly 及 Jeffrey Rengifo 本文分为两部分&#xff0c;第二部分介绍如何使用自定义连接器将 OneLake 数据索引并搜索到 Elastic 中。 在本文中&#xff0c;我们将利用第 1 部分中学到的知识来创建 OneLake 自定义 Elasticsearch 连接器…

“AI教学实训系统:打造未来教育的超级引擎

嘿&#xff0c;各位教育界的伙伴们&#xff0c;今天我要跟你们聊聊一个绝对能让你们眼前一亮的教学神器——AI教学实训系统。作为资深产品经理&#xff0c;我可是亲眼见证了这款系统如何颠覆传统教学&#xff0c;成为未来教育的超级引擎。 一、什么是AI教学实训系统&#xff1f…

Linux下php8安装phpredis扩展的方法

Linux下php8安装phpredis扩展的方法 下载redis扩展执行安装编辑php.ini文件重启php-fpmphpinfo 查看 下载redis扩展 前提是已经安装好redis服务了 php-redis下载地址 https://github.com/phpredis/phpredis 执行命令 git clone https://github.com/phpredis/phpredis.git执行…

基于SMPL的三维人体重建-深度学习经典方法之VIBE

本文以开源项目VIBE[1-2]为例&#xff0c;介绍下采用深度学习和SMPL模板的从图片进行三维人体重建算法的整体流程。如有错误&#xff0c;欢迎评论指正。 一.算法流程 包含生成器模块和判别器模块&#xff0c;核心贡献就在于引入了GRU模块&#xff0c;使得当前帧包含了先前帧的先…

2.1.3 第一个工程,点灯!

新建工程 点击菜单栏左上角&#xff0c;新建工程或者选择“文件”-“新建工程”&#xff0c;选择工程类型“标准工程”选择设备类型和编程语言&#xff0c;并指定工程文件名及保存路径&#xff0c;如下图所示&#xff1a; 选择工程类型为“标准工程” 选择主模块机型&#x…

CVE-2025-0411 7-zip 漏洞复现

文章目录 免责申明漏洞描述影响版本漏洞poc漏洞复现修复建议 免责申明 本文章仅供学习与交流&#xff0c;请勿用于非法用途&#xff0c;均由使用者本人负责&#xff0c;文章作者不为此承担任何责任 漏洞描述 此漏洞 &#xff08;CVSS SCORE 7.0&#xff09; 允许远程攻击者绕…

mysql 学习6 DML语句,对数据库中的表进行 增 删 改 操作

添加数据 我们对 testdatabase 数据中 的 qqemp 这张表进行 增加数据&#xff0c;在这张表 下 打开 命令行 query console 在 软件中就是打开命令行的意思 可以先执行 desc qqemp; 查看一下当前表的结构。 插入一条数据 到qqemp 表&#xff0c;插入时要每个字段都有值 insert…

[特殊字符]【计算机视觉】r=2 采样滤波器全解析 ✨

Hey小伙伴们&#xff01;今天来给大家分享一个在 计算机视觉 领域中非常有趣但又超级重要的概念——r2 采样滤波器&#xff08;Sampling Filter with r2&#xff09;。通过这种滤波器&#xff0c;我们可以在图像降采样的过程中有效地减少混叠效应&#xff0c;提升图像质量。 如…

数据库SQLite和SCADA DIAView应用教程

课程简介 此系列课程大纲主要包含七个课时。主要使用到的开发工具有&#xff1a;SQLite studio 和 SCADA DIAView。详细的可成内容大概如下&#xff1a; 1、SQLite 可视化管理工具SQLite Studio &#xff1a;打开数据库和查询数据&#xff1b;查看视频 2、创建6个变量&#x…