kafka消费者多线程开发

目录

前言

kafka consumer 设计原理

多线程的方案 

参考资料


前言

目前,计算机的硬件条件已经大大改善,即使是在普通的笔记本电脑上,多核都已经是标配了,更不用说专业的服务器了。如果跑在强劲服务器机器上的应用程序依然是单线程架构,那实在是有点暴殄天物了。不过,Kafka Java Consumer 就是单线程的设计,你是不是感到很惊讶。所以,探究它的多线程消费方案,就显得非常必要了。

kafka consumer 设计原理

 从 Kafka 0.10.1.0 版本开始,KafkaConsumer 就变为了双线程的设计,即用户主线程和心跳线程。

 所谓用户主线程,就是你启动 Consumer 应用程序 main 方法的那个线程,而新引入的心跳线程(Heartbeat Thread)只负责定期给对应的 Broker 机器发送心跳请求,以标识消费者应用的存活性(liveness)。引入这个心跳线程还有一个目的,那就是期望它能将心跳频率与主线程调用 KafkaConsumer.poll 方法的频率分开,从而解耦真实的消息处理逻辑与消费者组成员存活性管理。

 单线程的设计能够简化 Consumer 端的设计。Consumer 获取到消息后,处理消息的逻辑是否采用多线程,完全由你决定。这样,你就拥有了把消息处理的多线程管理策略从 Consumer 端代码中剥离的权利。

多线程的方案 

 我们要明确的是,KafkaConsumer 类不是线程安全的 (thread-safe)。所有的网络 I/O 处理都是发生在用户主线程中,因此,你在使用过程中必须要确保线程安全。简单来说,不能在多个线程中共享同一个 KafkaConsumer 实例,否则程序会抛出 ConcurrentModificationException 异常。

由于kafka consumer不是线程安全,我么你能制定两种多线程的方案。

1.消费者程序启动多个线程,每个线程维护专属的 KafkaConsumer 实例,负责完整的消息获取、消息处理流程。如下图所示:

 2.消费者程序使用单或多线程获取消息,同时创建多个消费线程执行消息处理逻辑。获取消息的线程可以是一个,也可以是多个,每个线程维护专属的 KafkaConsumer 实例,处理消息则交由特定的线程池来做,从而实现消息获取与消息处理的真正解耦。具体架构如下图所示:

我们来打个比方。比如一个完整的消费者应用程序要做的事情是 1、2、3、4、5,那么方案 1 的思路是粗粒度化的工作划分,也就是说方案 1 会创建多个线程,每个线程完整地执行 1、2、3、4、5,以实现并行处理的目标,它不会进一步分割具体的子任务;而方案 2 则更细粒度化,它会将 1、2 分割出来,用单线程(也可以是多线程)来做,对于 3、4、5,则用另外的多个线程来做。 

这两种方案的比较如下:

实现代码示例如下:

方案一的代码:

public class KafkaConsumerRunner implements Runnable {private final AtomicBoolean closed = new AtomicBoolean(false);private final KafkaConsumer consumer;public void run() {try {consumer.subscribe(Arrays.asList("topic"));while (!closed.get()) {ConsumerRecords records = consumer.poll(Duration.ofMillis(10000));//  执行消息处理逻辑}} catch (WakeupException e) {// Ignore exception if closingif (!closed.get()) throw e;} finally {consumer.close();}}// Shutdown hook which can be called from a separate threadpublic void shutdown() {closed.set(true);consumer.wakeup();}

这段代码创建了一个 Runnable 类,表示执行消费获取和消费处理的逻辑。每个 KafkaConsumerRunner 类都会创建一个专属的 KafkaConsumer 实例。在实际应用中,你可以创建多个 KafkaConsumerRunner 实例,并依次执行启动它们,以实现方案 1 的多线程架构

方案2 的代码:

private final KafkaConsumer<String, String> consumer;
private ExecutorService executors;
...private int workerNum = ...;
executors = new ThreadPoolExecutor(workerNum, workerNum, 0L, TimeUnit.MILLISECONDS,new ArrayBlockingQueue<>(1000), new ThreadPoolExecutor.CallerRunsPolicy());...
while (true)  {ConsumerRecords<String, String> records = consumer.poll(Duration.ofSeconds(1));for (final ConsumerRecord record : records) {executors.submit(new Worker(record));}
}
..

参考资料

20 | 多线程开发消费者实例-极客时间

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.rhkb.cn/news/139697.html

如若内容造成侵权/违法违规/事实不符,请联系长河编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

pom.xml中解决“vulnerable dependency maven:org.yaml:snakeyaml:1.33“警告问题

问题 当我们引入依赖的时候&#xff0c;pom文件会有这样的提示&#xff0c;其大概的意思就是 maven:org.yaml:snakeyaml:1.30"表示通过Maven引入了一个潜在的安全漏洞依赖项"org.yaml:snakeyaml:1.30" 解决办法 其实我们就是要更改这个依赖的版本&#xff0c…

【李沐深度学习笔记】按特定轴求和

课程地址和说明 线性代数实现p4 本系列文章是我学习李沐老师深度学习系列课程的学习笔记&#xff0c;可能会对李沐老师上课没讲到的进行补充。 这节就算之前内容的复习&#xff0c;后面以截图形式呈现 这节课就简单说明以下&#xff0c;axis为0是行&#xff0c;1是列&#xf…

解决方案:TSINGSEE青犀+智能分析网关助力智慧仓储智能化监管

为全面保障物流仓储的安全性与完整性&#xff0c;解决仓库管理难题&#xff0c;优化物流仓储方式&#xff0c;提升仓储效率&#xff0c;降低人工成本&#xff0c;旭帆科技推出智慧仓储AI视频智能分析方案&#xff0c;利用物联网、大数据、云计算等技术&#xff0c;对仓储管理进…

Date类的学习笔记-超级详细

Date 的定义, 在开始研究这个之前我们首先要能够明白一点&#xff0c;这个 Date 其实本质上是一个对象&#xff0c;我们通过这个对象可以去构建变量&#xff0c;知道这个之后就可以开展后续的研究了 JDK 通用 Date 类的构造方法 测试 获取当前的时间 // 构造这个日期对象Date…

【@PostConstruct、 @Autowired与构造函数的执行顺序】

PostConstruct、 Autowired与构造函数的执行顺序 一、PostConstruct介绍二、Spring框架中在bean初始化和销毁时候执行实现方式三、项目验证1.MyServiceImpl2.测试结果3. 项目源码 最近对同事代码进行codeReview时候发现用PostConstruct注解&#xff0c;特地对此注解执行顺序进行…

IDEA2023新UI回退老UI

idea2023年发布了新UI&#xff0c;如下所示 但是用起来真心不好用&#xff0c;各种位置也是错乱&#xff0c;用下面方法可以回退老UI

【C刷题】day3

一、选择题 1、已知函数的原型是&#xff1a; int fun(char b[10], int *a); &#xff0c;设定义&#xff1a; char c[10];int d; &#xff0c;正确的调用语句是&#xff08; &#xff09; A: fun(c,&d); B: fun(c,d); C: fun(&c,&d); D: fun(&c,d); 【答案…

07_ElasticSearch:倒排序索引与分词Analysis

07_ElasticSearch&#xff1a;倒排序索引与分词Analysis 一、 倒排索引是什么&#xff1f;1.1 通过示例&#xff0c;简单理解下1.2 核心组成 二、倒排索引是怎么工作的&#xff1f;2.1 创建倒排索引2.2 倒排索引搜索 三、Analysis 进行分词3.1 Analyzer 由三部分组成3.2 Analyz…

【JS】—垃圾回收机制

一、指令材料 1.定义 JavaScript&#xff08;JS&#xff09;的垃圾回收机制是一种自动管理内存的过程&#xff0c;它有助于释放不再使用的内存&#xff0c;以避免内存泄漏和提高程序的性能。 JavaScript的垃圾回收机制是一种自动管理内存的方式&#xff0c;以确保不再被引用的…

Linux Shell 实现一键部署podman

podman 介绍 使用 Podman 管理容器、Pod 和映像。从本地环境中无缝使用容器和 Kubernetes&#xff0c;Podman 提供与 Docker 非常相似的功能&#xff0c;它不需要在你的系统上运行任何守护进程&#xff0c;并且它也可以在没有 root 权限的情况下运行。 Podman 可以管理和运行…

Hive 的函数介绍

目录 ​编辑 一、内置运算符 1.1 关系运算符 1.2算术运算符 1.3逻辑运算符 1.4复杂类型函数 1.5对复杂类型函数操作 二、内置函数 2.1数学函数 2.2收集函数 2.3类型转换函数 2.4日期函数 2.5条件函数 2.6字符函数 三、内置的聚合函数 四、内置表生成函数 五、…

Android Jetpack组件架构:Lifecycle的使用 和 原理

Android Jetpack组件架构&#xff1a;Lifecycle的使用和原理 导言 作为Jetpack中关于生命周期管理的核心组件&#xff0c;Lifecycle组件是其他比如LiveDate和ViewModel等组件的基础&#xff0c;本篇文章主要就将介绍关于Lifecycle的使用和它的运作原理。 Lifecycle的使用 我…

MyBatis 中的插件可以拦截哪些操作

MyBatis 中的插件可以拦截哪些操作 MyBatis 是一个优秀的持久化框架&#xff0c;在实际项目开发中广泛应用。MyBatis 的插件机制可以方便地对 MyBatis 的各个环节进行扩展和定制。在本文中&#xff0c;我们将详细介绍 MyBatis 中的插件机制&#xff0c;并探讨插件可以拦截哪些…

C语言大佬的必杀技---宏的高级用法

C语言大佬的必杀技—宏的高级用法 目录: 字符串化标记的拼接宏的嵌套替换多条语句防止一个文件被重复包含宏和函数的区别 可能大家在学习的时候用得比较少&#xff0c;但是在一些代码量比较大的时候&#xff0c;这样使用&#xff0c;可以大大的提高代码的可读性&#xff0c;…

Dependency ‘org.redisson:redisson:‘ not found解决方法 三种刷新Maven项目的方法

报错情况 在pom中导入redisson包 <dependency><groupId>org.redisson</groupId><artifactId>redisson</artifactId> </dependency> 爆红&#xff0c;还显示Dependency org.redisson:redisson: not found。 由于报错已经解决&#xff0c;…

002-第一代硬件系统架构确立及产品选型

第一代硬件系统架构确立及产品选型 文章目录 第一代硬件系统架构确立及产品选型项目介绍摘要硬件架构硬件结构选型及设计单片机选型上位机选型扯点别的 关键字&#xff1a; Qt、 Qml、 信号采集机、 数据处理、 上位机 项目介绍 欢迎来到我们的 QML & C 项目&#xff…

【视觉SLAM入门】8. 回环检测,词袋模型,字典,感知,召回,机器学习

"见人细过 掩匿盖覆” 1. 意义2. 做法2.1 词袋模型和字典2.1.2 感知偏差和感知变异2.1.2 词袋2.1.3 字典 2.2 匹配(相似度)计算 3. 提升 前言&#xff1a; 前端提取数据&#xff0c;后端优化数据&#xff0c;但误差会累计&#xff0c;需要回环检测构建全局一致的地图&…

【AI视野·今日Sound 声学论文速览 第十期】Fri, 22 Sep 2023

AI视野今日CS.Sound 声学论文速览 Fri, 22 Sep 2023 Totally 1 papers &#x1f449;上期速览✈更多精彩请移步主页 Daily Sound Papers Performance Conditioning for Diffusion-Based Multi-Instrument Music Synthesis Authors Ben Maman, Johannes Zeitler, Meinard M lle…

【新版】系统架构设计师 - 案例分析 - 架构设计<架构风格和质量属性>

个人总结&#xff0c;仅供参考&#xff0c;欢迎加好友一起讨论 文章目录 架构 - 案例分析 - 架构设计&#xff1c;架构风格和质量属性&#xff1e;例题1例题2例题3例题4例题5例题6 架构 - 案例分析 - 架构设计&#xff1c;架构风格和质量属性&#xff1e; 例题1 某软件公司为…

Python —— pytest框架

1、认识pytest框架 1、搭建自动化框架的思路与流程 1、搭建自动化测试框架的思路和流程&#xff0c;任意测试手段流程都是一致的&#xff1a;手工测试、自动化测试、工具测试 手工测试&#xff1a;熟悉业务 —— 写用例 —— 执行用例并记录结果 —— 生成测试报告自动化测试…