kafka如何保证消息不丢?

概述

我们知道Kafka架构如下,主要由 Producer、Broker、Consumer 三部分组成。一条消息从生产到消费完成这个过程,可以划分三个阶段,生产阶段、存储阶段、消费阶段。

图片

  • 产阶段: 在这个阶段,从消息在 Producer 创建出来,经过网络传输发送到 Broker 端。

  • 存储阶段: 在这个阶段,消息在 Broker 端存储,如果是集群,消息会在这个阶段被复制到其他的副本上。

  • 消费阶段: 在这个阶段,Consumer 从 Broker 上拉取消息,经过网络传输发送到Consumer上。

那么如何保证消息不丢我们可以从这三部分来分析。

消息传递语义

在深度剖析消息丢失场景之前,我们先来聊聊「消息传递语义」到底是个什么玩意?

所谓的消息传递语义是 Kafka 提供的 Producer 和 Consumer 之间的消息传递过程中消息传递的保证性。主要分为三种, 如下图所示:

  1. 1. 首先当 Producer 向 Broker 发送数据后,会进行 commit,如果 commit 成功,由于 Replica 副本机制的存在,则意味着消息不会丢失,但是 Producer 发送数据给 Broker 后,遇到网络问题而造成通信中断,那么 Producer 就无法准确判断该消息是否已经被提交(commit),这就可能造成 at least once 语义。

  2. 2. 在 Kafka 0.11.0.0 之前, 如果 Producer 没有收到消息 commit 的响应结果,它只能重新发送消息,确保消息已经被正确的传输到 Broker,重新发送的时候会将消息再次写入日志中;而在 0.11.0.0 版本之后, Producer 支持幂等传递选项,保证重新发送不会导致消息在日志出现重复。为了实现这个, Broker 为 Producer 分配了一个ID,并通过每条消息的序列号进行去重。也支持了类似事务语义来保证将消息发送到多个 Topic 分区中,保证所有消息要么都写入成功,要么都失败,这个主要用在 Topic 之间的 exactly once 语义。 其中启用幂等传递的方法配置enable.idempotence = true。 启用事务支持的方法配置:设置属性 transcational.id = "指定值"

  3. 3. 从 Consumer 角度来剖析, 我们知道 Offset 是由 Consumer 自己来维护的, 如果 Consumer 收到消息后更新 Offset, 这时 Consumer 异常 crash 掉, 那么新的 Consumer 接管后再次重启消费,就会造成 at most once 语义(消息会丢,但不重复)。

  4. 4. 如果 Consumer 消费消息完成后, 再更新 Offset,如果这时 Consumer crash 掉,那么新的 Consumer 接管后重新用这个 Offset 拉取消息, 这时就会造成 at least once 语义(消息不丢,但被多次重复处理)。

总结: 默认 Kafka 提供「at least once」语义的消息传递,允许用户通过在处理消息之前保存 Offset的方式提供 「at mostonce」 语义。如果我们可以自己实现消费幂等,理想情况下这个系统的消息传递就是严格的「exactly once」, 也就是保证不丢失、且只会被精确的处理一次,但是这样是很难做到的。

接下来我们从生产阶段、存储阶段、消费阶段这几方面看下kafka如何保证消息不丢失。

生产阶段

通过深入解析Kafka消息发送过程我们知道Kafka生产者异步发送消息并返回一个Future,代表发送结果。首先需要我们获取返回结果判断是否发送成功。

// 异步发送消息,并设置回调函数 
producer.send(record, new Callback() { @Override public void onCompletion(RecordMetadata metadata, Exception exception) {if (exception != null) { System.err.println("消息发送失败: " + exception.getMessage()); } else { System.out.println("消息发送成功到主题: " + metadata.topic() + ", 分区: " + metadata.partition() + ", 偏移量: " + metadata.offset()); } } 
});

消息队列通过最常用的请求确认机制,来保证消息的可靠传递:当你的代码调用发消息方法时,消息队列的客户端会把消息发送到 Broker,Broker 收到消息后,会给客户端返回一个确认响应,表明消息已经收到了。客户端收到响应后,完成了一次正常消息的发送。

Producer(生产者)保证消息不丢失的方法:

  1. 1. 发送确认机制:Producer可以使用Kafka的acks参数来配置发送确认机制。通过设置合适的acks参数值,Producer可以在消息发送后等待Broker的确认。确认机制提供了不同级别的可靠性保证,包括:

    • • acks=0:Producer在发送消息后不会等待Broker的确认,这可能导致消息丢失风险。

    • • acks=1:Producer在发送消息后等待Broker的确认,确保至少将消息写入到Leader副本中。

    • • acks=all或acks=-1:Producer在发送消息后等待Broker的确认,确保将消息写入到所有ISR(In-Sync Replicas)副本中。这提供了最高的可靠性保证。

  2. 2. 消息重试机制:Producer可以实现消息的重试机制来应对发送失败或异常情况。如果发送失败,Producer可以重新发送消息,直到成功或达到最大重试次数。重试机制可以保证消息不会因为临时的网络问题或Broker故障而丢失。

Broker存储阶段

正常情况下,只要 Broker 在正常运行,就不会出现丢失消息的问题,但是如果 Broker 出现了故障,比如进程死掉了或者服务器宕机了,还是可能会丢失消息的。

在kafka高性能设计原理中我们了解到kafka为了提高性能用到了 Page Cache 技术.在读写磁盘日志文件时,其实操作的都是内存,然后由操作系统决定什么时候将 Page Cache 里的数据真正刷入磁盘。如果内存中数据还未刷入磁盘服务宕机了,这个时候还是会丢消息的。

为了最大程度地降低数据丢失的可能性,我们可以考虑以下方法:

  1.  持久化配置优化:可以通过调整 Kafka 的持久化配置参数来控制数据刷盘的频率,从而减少数据丢失的可能性。例如,可以降低 flush.messages 和 flush.ms 参数的值,以更频繁地刷写数据到磁盘。

  2.  副本因子增加:在 Kafka 中,可以为每个分区设置多个副本,以提高数据的可靠性。当某个 broker 发生故障时,其他副本仍然可用,可以避免数据丢失。

  3. 使用acks=all:在生产者配置中,设置 acks=all 可以确保消息在所有ISR(In-Sync Replicas)中都得到确认后才被视为发送成功。这样可以确保消息被复制到多个副本中,降低数据丢失的风险。

  4. 备份数据:定期备份 Kafka 的数据,以便在发生灾难性故障时可以进行数据恢复。

消费阶段

消费阶段采用和生产阶段类似的确认机制来保证消息的可靠传递,客户端从 Broker 拉取消息后,执行用户的消费业务逻辑,成功后,才会给 Broker 发送消费确认响应。如果 Broker 没有收到消费确认响应,下次拉消息的时候还会返回同一条消息,确保消息不会在网络传输过程中丢失,也不会因为客户端在执行消费逻辑中出错导致丢失。

  1. 自动提交位移:Consumer可以选择启用自动提交位移的功能。当Consumer成功处理一批消息后,它会自动提交当前位移,标记为已消费。这样即使Consumer发生故障,它可以使用已提交的位移来恢复并继续消费之前未处理的消息。

  2. 手动提交位移:Consumer还可以选择手动提交位移的方式。在消费一批消息后,Consumer可以显式地提交位移,以确保处理的消息被正确记录。这样可以避免重复消费和位移丢失的问题。

下面是手动提交位移的例子:

// 创建消费者实例
KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);// 订阅主题
consumer.subscribe(Collections.singletonList(topic));try {while (true) {// 消费消息ConsumerRecords<String, String> records = consumer.poll(100);for (ConsumerRecord<String, String> record : records) {// 处理消息逻辑System.out.println("消费消息:Topic = " + record.topic() +", Partition = " + record.partition() +", Offset = " + record.offset() +", Key = " + record.key() +", Value = " + record.value());// 手动提交位移TopicPartition topicPartition = new TopicPartition(record.topic(), record.partition());OffsetAndMetadata offsetMetadata = new OffsetAndMetadata(record.offset() + 1);consumer.commitSync(Collections.singletonMap(topicPartition, offsetMetadata));}}
} catch (Exception e) {e.printStackTrace();
} finally {consumer.close();
}

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.rhkb.cn/news/257362.html

如若内容造成侵权/违法违规/事实不符,请联系长河编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

mac IDEA基础配置和激活+maven配置+scala插件导入+scala文件打包

文章目录 下载IDEA通过插件激活下载Maven在IDEA上配置Maven在IDEA上加载Scala插件在IDEA中创建Maven项目在IDEA上通过Maven打包scala文件 下载IDEA通过插件激活 IDEA从这里下载&#xff0c;下载首次登陆需要创建一个IntelliJ账号&#xff0c;登陆后点击start trail开启一个月的…

Linux:搭建docker私有仓库(registry)

当我们内部需要存储镜像时候&#xff0c;官方提供了registry搭建好直接用&#xff0c;废话少说直接操作 1.下载安装docker 在 Linux 上安装 Docker Desktop |Docker 文档https://docs.docker.com/desktop/install/linux-install/安装 Docker 引擎 |Docker 文档https://docs.do…

【教程】C++语言基础学习笔记(九)——指针

写在前面&#xff1a; 如果文章对你有帮助&#xff0c;记得点赞关注加收藏一波&#xff0c;利于以后需要的时候复习&#xff0c;多谢支持&#xff01; 【C语言基础学习】系列文章 第一章 《项目与程序结构》 第二章 《数据类型》 第三章 《运算符》 第四章 《流程控制》 第五章…

【学网攻】 第(27)节 -- HSRP(热备份路由器协议)

系列文章目录 目录 系列文章目录 文章目录 前言 一、HSRP(热备份路由器协议)是什么&#xff1f; 二、实验 1.引入 实验目标 实验背景 技术原理 实验步骤 实验设备 实验拓扑图 实验配置 实验验证 文章目录 【学网攻】 第(1)节 -- 认识网络【学网攻】 第(2)节 -- 交…

FT2232调试记录(3)

FT2232调试记录&#xff08;1&#xff09;: FT2232调试记录&#xff08;2&#xff09;: FT2232调试记录&#xff08;3&#xff09;: FT2232 SPI读写函数: 参照SPI提供的文档&#xff1a; 工程&#xff1a; SPI 写函数&#xff1a; FT_STATUS write_byte(FT_HANDLE handle…

tkinter-TinUI-xml实战(10)展示画廊

tkinter-TinUI-xml实战&#xff08;10&#xff09;展示画廊 引言声明文件结构核心代码主界面统一展示控件控件展示界面单一展示已有展示多类展示 最终效果在这里插入图片描述 ![](https://img-blog.csdnimg.cn/direct/286fcaa2fa5648a992a0ac79b4efad82.png) ………… 结语 引言…

问题:3【单选题】实现职业理想的一般步骤是()。 #媒体#媒体

问题&#xff1a;3【单选题】实现职业理想的一般步骤是()。 A、创业-立业-择业 B、择业-创业-立业 C、择业-立业-创业 D、立业-择业-创业 参考答案如图所示

Zabbix图形中文乱码问题(显示口口)解决办法

一 切换到zabbix安装目录assets/fonts下&#xff0c;下载字体 这里使用是nginxphp作为zabbix-web展示&#xff0c;使用find 命令查找 进入目录下&#xff0c;将原有字体备份&#xff0c;下载msyh字体 wget https://www.xxshell.com/download/sh/zabbix/ttf/msyh.ttf 二 修改配…

华为OD机试 - 分配土地( Python C C++ JavaGo JS PHP)

题目描述 从前有个村庄&#xff0c;村民们在各种田地上插上小旗子&#xff0c;每个旗子上都标识了一个数字。现在&#xff0c;村民们想要找出一个包含相同数字的最小矩形区域&#xff0c;并将这块土地分配给对村庄做出巨大贡献的村民。我们需要找出这个矩形区域的最大面积。 …

修改npm 的运行命令详解

在Node.js和npm中&#xff0c;你可以通过修改package.json文件中的scripts部分来定义和运行自定义的npm脚本。这些脚本可以是任何你希望在项目中运行的命令&#xff0c;包括启动服务器、运行测试、构建项目等。下面是一些修改npm运行命令的详解和代码示例。 修改npm运行命令的…

python 基础知识点(蓝桥杯python科目个人复习计划41)

今日复习内容&#xff1a;动态规划&#xff08;基础&#xff09; 动态规划是一种解决多阶段决策过程中最优化问题的数学方法和算法思想。它通常用于解决具有重叠子问题和最优子结构性质的问题&#xff0c;通常将问题划分为相互重叠的子问题&#xff0c;利用子问题的解来求解原…

Git分支和迭代流程

Git分支 feature分支&#xff1a;功能分支 dev分支&#xff1a;开发分支 test分支&#xff1a;测试分支 master分支&#xff1a;生产环境分支 hotfix分支&#xff1a;bug修复分支。从master拉取&#xff0c;修复并测试完成merge回master和dev。 某些团队可能还会有 reale…

移动机器人激光SLAM导航(五):Cartographer SLAM 篇

参考 Cartographer 官方文档Cartographer 从入门到精通 1. Cartographer 安装 1.1 前置条件 推荐在刚装好的 Ubuntu 16.04 或 Ubuntu 18.04 上进行编译ROS 安装&#xff1a;ROS学习1&#xff1a;ROS概述与环境搭建 1.2 依赖库安装 资源下载完解压并执行以下指令 https://pa…

Vue项目创建和nodejs使用

Vue项目创建和nodejs使用 一、环境准备1.1.安装 node.js【下载历史版本node-v14.21.3-x64】1.2.安装1.3.检查是否安装成功&#xff1a;1.4.在Node下新建两个文件夹 node_global和node_cache并设置权限1.5.配置npm在安装全局模块时的路径和缓存cache的路径1.6.配置系统变量&…

Linux多线程[二]

引入知识 进程在线程内部执行是OS的系统调度单位。 内核中针对地址空间&#xff0c;有一种特殊的结构&#xff0c;VM_area_struct。这个用来控制虚拟内存中每个malloc等申请的空间&#xff0c;来区别每个malloc的是对应的堆区哪一段。OS可以做到资源的精细度划分。 对于磁盘…

C#调用WechatOCR.exe实现本地OCR文字识别

最近遇到一个需求&#xff1a;有大量的扫描件需要还原为可编辑的文本&#xff0c;很显然需要用到图片OCR识别为文字技术。本来以为这个技术很普遍的&#xff0c;结果用了几个开源库&#xff0c;效果不理想。后来&#xff0c;用了取巧的方法&#xff0c;直接使用了WX的OCR识别模…

算法与数据结构

算法与数据结构 前言 什么是算法和数据结构&#xff1f; 你可能会在一些教材上看到这句话&#xff1a; 程序 算法 数据结构 算法&#xff08;Algorithm&#xff09;&#xff1a;是指解题方案的准确而完整的描述&#xff0c;是一系列解决问题的清晰指令&#xff0c;算法代…

【Tomcat】:One or more listeners failed to start.报错解决方案

报错信息:One or more listeners failed to start. Full details will be found in the appropriate container log file. 具体就是web.xml此配置报错: 服务器启动错误Tomcat:One or more listeners failed to start.报错解决方案 IDEA:在使用IDEA运行SSM项目的时候 , Tomcat运…

ChatGpt报错:We ran into an issue while authenticating you解决办法

在登录ChatGpt时报错&#xff1a;Oops&#xff01;,We ran into an issue while authenticating you.(我们在验证您时遇到问题)&#xff0c;记录一下解决过程。 完整报错&#xff1a; We ran into an issue while authenticating you. If this issue persists, please contact…

怎么使用ChatGPT提高工作效率?

怎么使用ChatGPT提高工作效率&#xff0c;这是一个有趣的话题。 相信不同的人有不同的观点&#xff0c;大家的知识背景和从事的工作都不完全相同&#xff0c;所以最终ChatGPT能起到的作用也不一样。 在编程过程中&#xff0c;如果我们要找一个库&#xff0c;我们最先做的肯定…