Kafka核心参数与使用02

一、从基础的客户端说起

Kafka 提供了非常简单的生产者(Producer)和消费者(Consumer)API。通过引入相应依赖后,可以快速上手编写生产者和消费者的示例。

1. 消息发送者主流程

一个最基础的 Producer 发送消息的步骤如下:

  1. 设置 Producer 核心属性

    • 例如:bootstrap.servers(集群地址)、key.serializervalue.serializer 等。
    • 大多数核心配置在 ProducerConfig 中都有对应的注释说明。
  2. 构建消息

    • Kafka 消息是一个 Key-Value 结构,Key 常用于分区路由,Value 则是业务真正要传递的内容。
  3. 发送消息

    • 单向发送producer.send(record); 仅发出消息,不关心服务端响应。
    • 同步发送producer.send(record).get(); 获取服务端响应前会阻塞。
    • 异步发送producer.send(record, callback); 服务端响应时会回调。

示例核心代码示意:

Properties props = new Properties();
props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, BOOTSTRAP_SERVERS);
props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());Producer<String,String> producer = new KafkaProducer<>(props);
ProducerRecord<String, String> record =new ProducerRecord<>(TOPIC, key, value);
// 单向发送
producer.send(record);
// 同步发送
producer.send(record).get();
// 异步发送
producer.send(record, new Callback() { ... });

2. 消息消费者主流程

Consumer 侧,同样有三步:

  1. 设置 Consumer 核心属性

    • 例如:bootstrap.serversgroup.idkey.deserializervalue.deserializer 等。
  2. 拉取消息

    • Kafka 采用 Pull 模式:消费者主动调用 poll() 拉取消息。
  3. 处理消息,提交位点

    • 手动提交:consumer.commitSync()consumer.commitAsync()
    • 自动提交:设置 enable.auto.commit = true 及相应提交周期参数。

示例核心代码示意:

Properties props = new Properties();
props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, BOOTSTRAP_SERVERS);
props.put(ConsumerConfig.GROUP_ID_CONFIG, "test");
props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());Consumer<String, String> consumer = new KafkaConsumer<>(props);
consumer.subscribe(Arrays.asList(TOPIC));while (true) {ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(100));for (ConsumerRecord<String, String> record : records) {// 业务处理}// 手动提交 offsetconsumer.commitSync();
}

二、从客户端属性来梳理客户端工作机制

Kafka 的核心特色在于高并发、高吞吐以及在网络不稳定、服务随时会崩溃等复杂场景下仍能保证消息安全性。以下属性与机制能帮助我们从客户端视角理解 Kafka。

1. 消费者分组消费机制

  • Group 机制
    每个 Consumer 都会指定一个 group.id。同一个 Consumer Group 内,Topic 的每个 Partition 只会被同组里的一个 Consumer 消费。不同组之间则是互不影响、各自消费。
  • offset 提交
    • offset 保存在 Broker 端,但由 Consumer “主导”提交。
    • 提交方式有:
      • 同步:commitSync(),安全但速度慢;
      • 异步:commitAsync(),快但可能丢失或重复消费。
    • auto.offset.reset
      • 当 Broker 端没有找到该 Group 相应的 offset 时,可以根据配置(earliest, latest, none)决定从何处开始消费。

提示:Offset 提交与消息处理之间并非完全同步,一旦无法保证强一致性,可能出现消息重复与消息丢失。可根据业务需求与场景选择手动提交或自动提交,也可将 offset 存入外部存储(如 Redis)自行管理。

2. 生产者拦截器机制

  • 通过配置 interceptor.classes 可以指定一个或多个实现了 ProducerInterceptor 接口的拦截器。
  • 典型功能:在发送前统一添加/修改消息内容,或者在发送后做监控/统计等操作。

3. 消息序列化机制

  • Producer 端:
    • key.serializer / value.serializer:将对象序列化为 byte[]
    • 内置如 StringSerializerIntegerSerializer 等;可自定义自定义序列化类。
  • Consumer 端:
    • key.deserializer / value.deserializer:将 byte[] 反序列化为业务对象。
  • 如果使用自定义类型(POJO)进行传输,则需要编写自定义 Serializer/Deserializer。
    • 核心思想:定长字段不定长字段的序列化与反序列化。

4. 消息分区路由机制

  • Producer 侧:
    • 通过 partitioner.class 指定自定义的分区器(Partitioner 接口)。Kafka 内置默认逻辑:
      • 若无 key,则采用黏性分区策略(Sticky Partition);
      • 若指定 key,则对 key 进行哈希得到分区;
      • 也可改为轮询策略(RoundRobinPartitioner)。
  • Consumer 侧:
    • 通过 partition.assignment.strategy 指定分区分配策略,内置 RangeAssignor, RoundRobinAssignor, StickyAssignor, CooperativeStickyAssignor 等。
      • Range:按顺序将分区切块分配。
      • RoundRobin:轮询分配。
      • Sticky:尽可能保持现有分配不变,同时保证分配均匀。

5. 生产者消息缓存机制

生产者为了提高吞吐量,会将消息先写入一个本地缓存(RecordAccumulator),然后 sender 线程批量发送到 Broker:

  • buffer.memory:缓存总大小,默认 32MB。
  • batch.size:每个分区发送批次大小,默认 16KB。
  • linger.ms:即便 batch.size 未填满,等待 linger.ms 毫秒后也会将消息批量发送。
  • max.in.flight.requests.per.connection:同一连接上未收到响应的请求数上限。

6. 发送应答机制

  • acks 用于控制生产者发送完消息后何时认为消息“成功”:
    • acks=0:不等待 Broker 确认,吞吐量高,安全性低。
    • acks=1:只等待 Leader 分区写入,常见设置。
    • acks=all-1:等待所有副本写入,安全性最高,吞吐量相对低。
  • 还需配合 Broker 端 min.insync.replicas 参数,控制副本个数不足时直接返回错误。

7. 生产者消息幂等性

  • 为保证 Exactly-once 语义,需要开启 enable.idempotence(幂等性)。
  • 幂等性主要依赖 PID + SequenceNumber 机制:
    • Producer 向同一分区发送消息时,每条消息都有一个单调递增的序列号。
    • Broker 针对 <PID, Partition> 维护序列号,只接收递增消息,防止消息重复写入。
  • 幂等性要求:
    • acks=all
    • retries>0
    • max.in.flight.requests.per.connection<=5

8. 生产者消息事务

  • 幂等性只能保证单个分区的 Exactly-once,如果涉及 多个分区/Topic 则需要“事务”来保证一批消息的一致性。
  • 主要 API:
    • initTransactions()
    • beginTransaction()
    • commitTransaction()
    • abortTransaction()
  • 事务依赖于 transaction.id 来区分不同的 Producer 实例,以便在崩溃重启后继续补偿或回滚先前未完成的事务,保证多分区的一致写入。

三、客户端流程总结

  1. Producer:

    • 属性配置(序列化、分区器、拦截器、幂等性/事务等) → 将消息提交到 RecordAccumulatorSender 线程批量发送到 Broker → 按 acks 等待 Broker 响应 → 提交或重试。
  2. Consumer:

    • 属性配置(反序列化、消费组、分区分配策略等) → poll() 拉取消息 → 业务处理 → 提交 offset(手动或自动),与 Broker 同步消费进度。
  3. 重点:

    • 消息在 Producer 端的缓存发送机制消息在 Consumer 端的主动拉取、分组消费、offset 提交 是理解 Kafka 高并发、高吞吐、高可用的关键。
    • 其他如 幂等性(保证单分区 Exactly-once)和 事务(保证多分区一致性)是针对数据安全性和业务需求的更深入扩展。

四、Spring Boot 集成 Kafka

Spring Boot 中集成 Kafka 本质也是对上述 Producer/Consumer 的封装。

  1. 引入依赖

    <dependency><groupId>org.springframework.kafka</groupId><artifactId>spring-kafka</artifactId>
    </dependency>
    
  2. application.properties 中配置 Kafka 相关参数

    • 和原生 Kafka 参数名称基本一致,如 spring.kafka.producer.*spring.kafka.consumer.* 等。
    • 典型参数:bootstrap-servers, acks, batch-size, enable-auto-commit, auto-offset-reset 等。
  3. 使用 KafkaTemplate 发送消息

    @RestController
    public class KafkaProducerController {@Autowiredprivate KafkaTemplate<String, Object> kafkaTemplate;@GetMapping("/kafka/send/{message}")public void sendMessage(@PathVariable("message") String msg) {kafkaTemplate.send("topic1", msg);}
    }
    
  4. 使用 @KafkaListener 声明消息消费者

    @Component
    public class KafkaConsumerListener {@KafkaListener(topics = {"topic1"})public void onMessage(ConsumerRecord<?, ?> record) {System.out.println("消费内容:" + record.value());}
    }
    

 


结语

  • 想要真正掌握 Kafka,重点在于建立整体的数据流转模型
    • Producer 端如何将消息分区、缓存、发送、应答、重试、保证幂等与事务;
    • Consumer 端如何分组消费、订阅分区、拉取消息、提交 offset。
  • 熟悉这些机制后,再去看各种客户端配置就会轻松许多,能够结合实际业务场景做灵活配置与调优。
  • Spring Boot 也只是对原生 Kafka 客户端的进一步封装,一旦理解 Kafka 底层机制与各项参数原理,使用 Spring Boot 时只需“对号入座”地进行配置即可。

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.rhkb.cn/news/505752.html

如若内容造成侵权/违法违规/事实不符,请联系长河编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

【Python】Python与C的区别

文章目录 语句结束符代码块表示变量声明函数定义注释格式Python的标识符数据输入input()函数数据输出print()函数 语句结束符 C 语言 C 语言中每条语句必须以分号;结束。例如&#xff0c;int a 10;、printf("Hello, World!");。分号是语句的一部分&#xff0c;用于…

了解模2除法:原理与应用

模2除法&#xff0c;也被称为二进制除法或XOR除法&#xff0c;是一种在二进制数制下进行的特殊除法运算。与常规的十进制或其他进制的除法不同&#xff0c;模2除法使用异或&#xff08;XOR&#xff09;运算代替减法&#xff0c;并且不涉及进位或借位。这种除法运算在数字通信、…

【GESP】C++二级练习 luogu-B2079, 求出 e 的值

GESP二级练习&#xff0c;循环语句嵌套&#xff0c;难度★✮☆☆☆。 题目题解详见&#xff1a;https://www.coderli.com/gesp-2-luogu-b2079/ https://www.coderli.com/gesp-2-luogu-b2079/https://www.coderli.com/gesp-2-luogu-b2079/

鼠标自动移动防止锁屏的办公神器 —— 定时执行专家

目录 ◆ 如何设置 ◇ 方法1&#xff1a;使用【执行Nircmd命令】任务 ◇ 方法2&#xff1a;使用【模拟键盘输入】任务 ◆ 定时执行专家介绍 ◆ 定时执行专家最新版下载 ◆ 如何设置 ◇ 方法1&#xff1a;使用【执行Nircmd命令】任务 1、点击工具栏第一个图标【新建任务】&…

2025新年源码免费送

2025很开门很开门的源码免费传递。不需要馒头就能获取4套大开门源码。 听泉偷宝&#xff0c;又进来偷我源码啦&#x1f44a;&#x1f44a;&#x1f44a;。欢迎偷源码 &#x1f525;&#x1f525;&#x1f525; 获取免费源码以及更多源码&#xff0c;可以私信联系我 我们常常…

微信小程序实现登录注册

文章目录 1. 官方文档教程2. 注册实现3. 登录实现4. 关于作者其它项目视频教程介绍 1. 官方文档教程 https://developers.weixin.qq.com/miniprogram/dev/framework/路由跳转的几种方式&#xff1a; https://developers.weixin.qq.com/miniprogram/dev/api/route/wx.switchTab…

1. Doris分布式环境搭建

一. 环境准备 本次测试集群采用3台机器hadoop1、hadoop2、hadoop3, Frontend和Backend部署在同一台机器上&#xff0c;Frontend部署3台组成高可用&#xff0c;Backend部署3个节点&#xff0c;组成3副本存储。 主机IP操作系统FrontendBackendhadoop1192.168.47.128Centos7Foll…

docker-compose安装canal并利用rabbitmq同步多个mysql数据

必看&#xff1a;本文默认已经安装好了docker-compose、rabbitmq、mysql并且mysql开启了binlog日志&#xff0c;不需要再安装&#xff1b; 流程图 如上图所示&#xff0c;左边是MQ模式流程图&#xff0c;右边则是TCP模式的流程图&#xff1b; 最终的目的是利用canal监听多个M…

ue5动画重定向,一键重定向。ue4小白人替换成ue5

这就是我们下载的 初学者动画内容包 点击设置选中列 绿色的是动画 黄色的关卡 蓝色是蓝图 ctrla 全选 ctrl鼠标左键 选中所有动画 重定向动画资产 不要选错&#xff0c;只要绿色 选择目标网格体 选择所有的绿色 动画 导出动画 添加前缀ycn 导出 一定要提前新建好存放的…

scrapy爬取图片

scrapy 爬取图片 环境准备 python3.10scrapy pillowpycharm 简要介绍scrapy Scrapy 是一个开源的 Python 爬虫框架&#xff0c;专为爬取网页数据和进行 Web 抓取而设计。它的主要特点包括&#xff1a; 高效的抓取性能&#xff1a;Scrapy 采用了异步机制&#xff0c;能够高效…

Hadoop3.x 万字解析,从入门到剖析源码

&#x1f496; 欢迎来到我的博客&#xff01; 非常高兴能在这里与您相遇。在这里&#xff0c;您不仅能获得有趣的技术分享&#xff0c;还能感受到轻松愉快的氛围。无论您是编程新手&#xff0c;还是资深开发者&#xff0c;都能在这里找到属于您的知识宝藏&#xff0c;学习和成长…

RabbitMQ介绍与使用

RabbitMQ官网 RabbitMQ 介绍 RabbitMQ 是一个开源的消息代理和队列服务器&#xff0c;基于 AMQP&#xff08;高级消息队列协议&#xff09;标准&#xff0c;使用 Erlang 编程语言构建。它是消息队列&#xff08;MQ&#xff09;的一种&#xff0c;广泛应用于分布式系统中&#x…

【爬虫】单个网站链接爬取文献数据:标题、摘要、作者等信息

源码链接&#xff1a; https://github.com/Niceeggplant/Single—Site-Crawler.git 一、项目概述 从指定网页中提取文章关键信息的工具。通过输入文章的 URL&#xff0c;程序将自动抓取网页内容 二、技术选型与原理 requests 库&#xff1a;这是 Python 中用于发送 HTTP 请求…

混合专家模型 (MoE)笔记摘要

ref&#xff1a; https://huggingface.co/blog/zh/moe#%E4%BB%80%E4%B9%88%E6%98%AF%E6%B7%B7%E5%90%88%E4%B8%93%E5%AE%B6%E6%A8%A1%E5%9E%8B 简短总结 混合专家模型 (MoEs): 与稠密模型相比&#xff0c; 预训练速度更快 与具有相同参数数量的模型相比&#xff0c;具有更快的…

解决idea中无法拖动tab标签页的问题

1、按 Ctrl Alt S 打开设置&#xff0c;找到路径 File | Settings | Appearance & Behavior | Appearance 2、去掉勾选 Drag-and-drop with Alt pressed only 即可

六、Angular 发送请求/ HttpClient 模块

一、应用 HttpClient 模块 angular/common/http 中的 HttpClient 类基于浏览器提供的 XMLHttpRequest 接口。要想使用 HtpClient 模块&#xff0c;就要先导入 Anqular 的 HttpClientModule。大多数 Web 应用程序都会在根模块 AppModule 中导入它。 编辑 src/app/app.module.ts…

基于单片机的无线智能窗帘控制器的设计

摘 要 : 本文以单片机为控制核心 , 基于 PT2262/ 2272 无线收发模块 , 实现了窗帘的无线远程智能控制 . 该控制器通过高频无线收发模块实现了遥控窗帘的开合控制; 根据外部光线强弱实现自动开关窗帘 ; 根据设定时间自动完成开关过程; 通过语音播报当前环境温湿度信息以…

android刷机

android ota和img包下载地址&#xff1a; https://developers.google.com/android/images?hlzh-cn android启动过程 线刷 格式&#xff1a;ota格式 模式&#xff1a;recovery 优点&#xff1a;方便、简单&#xff0c;刷机方法通用&#xff0c;不会破坏手机底层数据&#xff0…

Vivado中Tri_mode_ethernet_mac的时序约束、分析、调整——(一)时序约束的基本概念

1、基本概念 推荐阅读&#xff0c;Ally Zhou编写的《Vivado使用误区与进阶》系列文章&#xff0c;熟悉基本概念、tcl语句的使用。 《Vivado使用误区与进阶》电子书开放下载&#xff01;&#xff01; 2、Vivado中的语法例程 1&#xff09;语法例程 约束的语句可以参考vivado…

设计模式 行为型 责任链模式(Chain of Responsibility Pattern)与 常见技术框架应用 解析

责任链模式&#xff08;Chain of Responsibility Pattern&#xff09;是一种行为型设计模式&#xff0c;它允许将请求沿着处理者链进行发送。每个处理者对象都有机会处理该请求&#xff0c;直到某个处理者决定处理该请求为止。这种模式的主要目的是避免请求的发送者和接收者之间…