Kafka知识总结(事务+数据存储+请求模型+常见场景)

文章收录在网站:http://hardyfish.top/

文章收录在网站:http://hardyfish.top/

文章收录在网站:http://hardyfish.top/

文章收录在网站:http://hardyfish.top/

在这里插入图片描述

事务

事务Producer保证消息写入分区的原子性,即这批消息要么全部写入成功,要么全失败。此外,Producer重启回来后,kafka依然保证它们发送消息的精确一次处理。

开启enable.idempotence = true

设置Producer端参数transctional.id

数据的发送需要放在beginTransaction和commitTransaction之间。

Consumer端的代码也需要加上isolation.level参数,用以处理事务提交的数据。

producer.initTransactions();
try {producer.beginTransaction();producer.send(record1);producer.send(record2);producer.commitTransaction();
} catch (KafkaException e) {producer.abortTransaction();
}

事务Producer虽然在多分区的数据处理上保证了幂等,但是处理性能上相应的是会有一些下降的。

数据存储

Kafka 消息以 Partition 作为存储单元,每个 Topic 的消息被一个或者多个 Partition 进行管理。

  • Partition 是一个有序的,不变的消息队列,消息总是被追加到尾部。
  • 一个 Partition 不能被切分成多个散落在多个 Broker 上或者多个磁盘上。

Partition 又划分成多个 Segment 来组织数据。

Segment 在它的下面还有两个组成部分:

  • 索引文件:以 .index 后缀结尾,存储当前数据文件的索引。
  • 数据文件:以 .log 后缀结尾,存储当前索引文件名对应的数据文件。

在这里插入图片描述

请求模型

在这里插入图片描述

请求到Broker后,也会通过类似于请求转发的组件Acceptor转发到对应的工作线程上,Kafka中被称为网络线程池,一般默认每个Broker上为3个工作线程,可以通过参数 num.network.threads 进行配置。

并且采用轮询的策略,可以很均匀的将请求分发到不同的网络线程中进行处理。

但是实际的处理请求并不是由网络线程池进行处理的,而是会交给后续的IO线程池,当网络线程接受到请求的时候,会将请求写入到共享的请求队列中,而IO线程池会进行异步的处理,默认情况下是8个,可以通过 num.io.threads 进行配置。

常见场景

重复消费

consumer 在消费过程中,应用进程被强制kill掉或发生异常退出。

例如在一次poll500条消息后,消费到200条时,进程被强制kill消费到offset未提交,或出现异常退出导致消费到offset未提交。

下次重启时,依然会重新拉取500消息,造成之前消费到200条消息重复消费了两次。

消费者消费时间过长。

max.poll.interval.ms参数定义了两次poll的最大间隔,它的默认值是 5 分钟,表示你的 Consumer 程序如果在 5 分钟之内无法消费完 poll 方法返回的消息,那么 Consumer 会主动发起 离开组 的请求,Coordinator 也会开启新一轮 Rebalance。

因为上次消费的offset未提交,再次拉取的消息是之前消费过的消息,造成重复消费。

提高消费能力,提高单条消息的处理速度;根据实际场景max.poll.interval.ms值设置大一点,避免不必要的rebalance;

可适当减小max.poll.records的值,默认值是500,可根据实际消息速率适当调小。

消息丢失

消费者程序丢失数据

Consumer 程序从 Kafka 获取到消息后开启了多个线程异步处理消息,而 Consumer 程序自动地向前更新位移

假如某个线程运行失败了,它负责的消息没有被成功处理,但位移已经被更新了,因此这条消息对于 Consumer 而言实际上是丢失了。

最佳配置:

不要使用 producer.send(msg),而要使用 producer.send(msg, callback)

设置 acks = all:

  • 设置成 all,则表明所有副本 Broker 都要接收到消息,该消息才算是 已提交。

设置 retries 为一个较大的值。

  • 当出现网络的瞬时抖动时,消息发送可能会失败,此时配置了retries > 0 的 Producer 能够自动重试消息发送,避免消息丢失。

设置 unclean.leader.election.enable = false

设置 replication.factor >= 3

  • 防止消息丢失的主要机制就是冗余。

设置 min.insync.replicas > 1

  • 控制的是消息至少要被写入到多少个副本才算是 已提交 。
  • 设置成大于 1 可以提升消息持久性。
  • 在实际环境中千万不要使用默认值 1。

确保 replication.factor > min.insync.replicas

  • 如果两者相等,那么只要有一个副本挂机,整个分区就无法正常工作了。

确保消息消费完成再提交。

  • Consumer 端有个参数 enable.auto.commit,最好把它设置成 false,并采用手动提交位移的方式。

消息顺序

乱序场景一

因为一个topic可以有多个partition,kafka只能保证partition内部有序。

1、可以设置topic 有且只有一个partition。

2、根据业务需要,需要顺序的指定为同一个partition。

乱序场景二

对于同一业务进入了同一个消费者组之后,用了多线程来处理消息,会导致消息的乱序。

消费者内部根据线程数量创建等量的内存队列,对于需要顺序的一系列业务数据,根据key或者业务数据,放到同一个内存队列中,然后线程从对应的内存队列中取出并操作。

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.rhkb.cn/news/387697.html

如若内容造成侵权/违法违规/事实不符,请联系长河编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

从零到一:用Go语言构建你的第一个Web服务

使用Go语言从零开始搭建一个Web服务,包括环境搭建、路由处理、中间件使用、JSON和表单数据处理等关键步骤,提供丰富的代码示例。 关注TechLead,复旦博士,分享云服务领域全维度开发技术。拥有10年互联网服务架构、AI产品研发经验、…

【HadoopShuffle原理剖析】基础篇二

Shuffle原理剖析 Shuffle,是指对Map输出结果进行分区、排序、合并等处理并交给Reduce的过程。分为Map端的操作和Reduce端的操作。 Shuffle过程 Map端的Shuffle Map的输出结果首先被缓存到内存,当缓存区容量到达80%(缓冲区默认100MB&#xff…

通过进程协作显示图像-C#

前言 如果一个软件比较复杂或者某些情况下需要拆解,可以考试将软件分解成两个或多个进程,但常规的消息传递又不能完全够用,使用消息共享内存,实现图像传递,当然性能这个方面我并没有测试,仅是一种解决思路…

Tekion 选择 ClickHouse Cloud 提升应用性能和指标监控

本文字数:4187;估计阅读时间:11 分钟 作者:ClickHouse team 本文在公众号【ClickHouseInc】首发 Tekion 由前 Tesla CIO Jay Vijayan 于 2016 年创立,利用大数据、人工智能和物联网等技术,为其汽车客户解决…

如何通过 CloudCanal 实现从 Kafka 到 AutoMQ 的数据迁移

01 引言 随着大数据技术的飞速发展,Apache Kafka 作为一种高吞吐量、低延迟的分布式消息系统,已经成为企业实时数据处理的核心组件。然而,随着业务的扩展和技术的发展,企业面临着不断增加的存储成本和运维复杂性问题。为了更好地…

【数据中台】大数据管理平台建设方案(原件资料)

建设大数据管理中台,按照统一的数据规范和标准体系,构建统一数据采集﹣治理﹣共享标准、统一技术开发体系、统一接口 API ,实现数据采集、平台治理,业务应用三层解耦,并按照统一标准格式提供高效的…

electron安装及快速创建

electron安装及快速创建 electron是一个使用 JavaScript、HTML 和 CSS 构建桌面应用程序的框架。 详细内容见官网:https://www.electronjs.org/zh/docs/latest/。 今天来记录下练习中的安装过程和hello world的创建。 创建项目文件夹,并执行npm 初始化命…

ubuntu安装tar安装 nginx最新版本

一、需要先安装依赖 apt install gcc libpcre3 libpcre3-dev zlib1g zlib1g-dev openssl libssl-dev 二、上传安装包 并解压 下载地址 nginx news tar xvf nginx-1.25.2.tar.gz 进入nginx cd nginx-1.25.2 三、编译 ./configure --prefix=/usr/local/nginx --with-htt…

Dolphinscheduler 3.2.1bug记录

问题1:分页只展示首页 解决方案: [Bug][API] list paging missing totalpage by Gallardot Pull Request #15619 apache/dolphinscheduler GitHub 问题2:Hive 数据源连接失败 解决方案:修改源码:HiveDataSourceProcessor.cla…

《深度RAG系列》 LLM 为什么选择了RAG

2023年是AIGC(Artificial Intelligence Generated Content)元年,这一年见证了人工智能生成内容领域的巨大飞跃,特别是大模型的爆发,它们在自然语言处理、图像生成、音频处理等多个领域展现出了惊人的能力。 这些预训练…

数据结构和算法入门

1.了解数据结构和算法 1.1 二分查找 二分查找(Binary Search)是一种在有序数组中查找特定元素的搜索算法。它的基本思想是将数组分成两半,然后比较目标值与中间元素的大小关系,从而确定应该在左半部分还是右半部分继续查找。这个…

花8000元去培训机构学习网络安全值得吗,学成后就业前景如何?

我就是从培训机构学的网络安全,线下五六个月,当时学费不到一万,目前已成功入行。所以,只要你下决心要入这一行,过程中能好好学,那这8000就花得值~ 因为只要学得好,工作两个多月就能赚回学费&am…

浅谈取样器之OS进程取样器

浅谈取样器之OS进程取样器 JMeter 的 OS 进程取样器(OSProcess Sampler)允许用户在 JMeter 测试计划中直接执行操作系统命令或脚本。这一功能对于需要集成系统级操作到性能测试场景中尤为有用,比如运行数据库备份脚本、调用系统维护命令或执…

存储引擎MyISAM和InnoDB

存储引擎:创建、查询、更新、删除 innoDB:64T、支持事物、不支持全文索引、支持缓存、支持外键、行级锁定 MyISAM:256T、不支持事物、支持全文索引、插入和查询速度快 memory:内存、不支持事物、不支持全文索引,临时…

不得不安利的程序员开发神器,太赞了!!

作为一名程序员,你是否常常为繁琐的后端服务而感到头疼?是否希望有一种工具可以帮你简化开发流程,让你专注于创意和功能开发?今天,我要向大家隆重推荐一款绝佳的开发神器——MemFire Cloud。它专为懒人开发者准备&…

KVM高级功能部署

KVM(Kernel-based Virtual Machine)是一个在Linux内核中实现的全虚拟化解决方案。除了基本的虚拟化功能外,KVM还提供了许多高级功能,以增强其性能、安全性和灵活性。以下是一些KVM的高级功能: 硬件加速: In…

基于Deap遗传算法在全量可转债上做因子挖掘(附python代码及全量因子数据)

原创文章第604篇,专注“AI量化投资、世界运行的规律、个人成长与财富自由"。 在4.x的时候,咱们分享过deap遗传算法挖掘因子的代码和数据,今天我们来升级到5.x中。 源码发布Quantlab4.2,Deap因子挖掘|gplearn做不到的咱们也…

全新微软语音合成网页版源码,短视频影视解说配音网页版系统-仿真人语音

源码介绍 最新微软语音合成网页版源码,可以用来给影视解说和短视频配音。它是TTS文本转语言,API接口和PHP源码。 这个微软语音合成接口的源码,超级简单,就几个文件搞定。用的是官方的API,试过了,合成速度…

Datawhale AI夏令营 AI+逻辑推理 Task2总结

Datawhale AI夏令营 AI逻辑推理 Task2总结 一、大语言模型解题方案介绍 1.1 大模型推理介绍 ​ 推理是建立在训练完成的基础上,将训练好的模型应用于新的、未见过的数据,模型利用先前学到的规律进行预测、分类和生成新内容,使得AI在实际应…

力扣SQL50 换座位

Problem: 626. 换座位 👨‍🏫 参考题解 Code SELECT(CASEWHEN MOD(id, 2) ! 0 AND counts ! id THEN id 1WHEN MOD(id, 2) ! 0 AND counts id THEN idELSE id - 1END) AS id,student FROMseat,(SELECTCOUNT(*) AS countsFROMseat) AS seat_counts O…