Kafka生产问题总结及性能优化实践

Kafka可视化管理工具kafka-manager

0

安装及基本使用可参考:https://www.cnblogs.com/dadonggg/p/8205302.html

线上环境规划

0

JVM参数设置

kafka是scala语言开发,运行在JVM上,需要对JVM参数合理设置,参看JVM调优专题

修改bin/kafka-start-server.sh中的jvm设置,假设机器是32G内存,可以如下设置:

export KAFKA_HEAP_OPTS="-Xmx16G -Xms16G -Xmn10G -XX:MetaspaceSize=256M -XX:+UseG1GC -XX:MaxGCPauseMillis=50 -XX:G1HeapRegionSize=16M"

这种大内存的情况一般都要用G1垃圾收集器,因为年轻代内存比较大,用G1可以设置GC最大停顿时间,不至于一次minor gc就花费太长时间,当然,因为像kafka,rocketmq,es这些中间件,写数据到磁盘会用到操作系统的page cache,所以JVM内存不宜分配过大,需要给操作系统的缓存留出几个G。

线上问题及优化

1、消息丢失情况:

消息发送端:

(1)acks=0: 表示producer不需要等待任何broker确认收到消息的回复,就可以继续发送下一条消息。性能最高,但是最容易丢消息。大数据统计报表场景,对性能要求很高,对数据丢失不敏感的情况可以用这种。

(2)acks=1: 至少要等待leader已经成功将数据写入本地log,但是不需要等待所有follower是否成功写入。就可以继续发送下一条消息。这种情况下,如果follower没有成功备份数据,而此时leader又挂掉,则消息会丢失。

(3)acks=-1或all: 这意味着leader需要等待所有备份(min.insync.replicas配置的备份个数)都成功写入日志,这种策略会保证只要有一个备份存活就不会丢失数据。这是最强的数据保证。一般除非是金融级别,或跟钱打交道的场景才会使用这种配置。当然如果min.insync.replicas配置的是1则也可能丢消息,跟acks=1情况类似。

消息消费端:

如果消费这边配置的是自动提交,万一消费到数据还没处理完,就自动提交offset了,但是此时你consumer直接宕机了,未处理完的数据丢失了,下次也消费不到了。

2、消息重复消费

消息发送端:

发送消息如果配置了重试机制,比如网络抖动时间过长导致发送端发送超时,实际broker可能已经接收到消息,但发送方会重新发送消息

消息消费端:

如果消费这边配置的是自动提交,刚拉取了一批数据处理了一部分,但还没来得及提交,服务挂了,下次重启又会拉取相同的一批数据重复处理

一般消费端都是要做消费幂等处理的。

3、消息乱序

如果发送端配置了重试机制,kafka不会等之前那条消息完全发送成功才去发送下一条消息,这样可能会出现,发送了1,2,3条消息,第一条超时了,后面两条发送成功,再重试发送第1条消息,这时消息在broker端的顺序就是2,3,1了

所以,是否一定要配置重试要根据业务情况而定。也可以用同步发送的模式去发消息,当然acks不能设置为0,这样也能保证消息发送的有序。

kafka保证全链路消息顺序消费,需要从发送端开始,将所有有序消息发送到同一个分区,然后用一个消费者去消费,但是这种性能比较低,可以在消费者端接收到消息后将需要保证顺序消费的几条消费发到内存队列(可以搞多个),一个内存队列开启一个线程顺序处理消息。

4、消息积压

1)线上有时因为发送方发送消息速度过快,或者消费方处理消息过慢,可能会导致broker积压大量未消费消息。

此种情况如果积压了上百万未消费消息需要紧急处理,可以修改消费端程序,让其将收到的消息快速转发到其他topic(可以设置很多分区),然后再启动多个消费者同时消费新主题的不同分区。

2)由于消息数据格式变动或消费者程序有bug,导致消费者一直消费不成功,也可能导致broker积压大量未消费消息。

此种情况可以将这些消费不成功的消息转发到其它队列里去(类似死信队列),后面再慢慢分析死信队列里的消息处理问题。

5、延时队列

延时队列存储的对象是延时消息。所谓的“延时消息”是指消息被发送以后,并不想让消费者立刻获取,而是等待特定的时间后,消费者才能获取这个消息进行消费,延时队列的使用场景有很多, 比如 :

1)在订单系统中, 一个用户下单之后通常有 30 分钟的时间进行支付,如果 30 分钟之内没有支付成功,那么这个订单将进行异常处理,这时就可以使用延时队列来处理这些订单了。

2)订单完成1小时后通知用户进行评价。

实现思路:发送延时消息时先把消息按照不同的延迟时间段发送到指定的队列中(topic_1s,topic_5s,topic_10s,...topic_2h,这个一般不能支持任意时间段的延时),然后通过定时器进行轮训消费这些topic,查看消息是否到期,如果到期就把这个消息发送到具体业务处理的topic中,队列中消息越靠前的到期时间越早,具体来说就是定时器在一次消费过程中,对消息的发送时间做判断,看下是否延迟到对应时间了,如果到了就转发,如果还没到这一次定时任务就可以提前结束了。

6、消息回溯

如果某段时间对已消费消息计算的结果觉得有问题,可能是由于程序bug导致的计算错误,当程序bug修复后,这时可能需要对之前已消费的消息重新消费,可以指定从多久之前的消息回溯消费,这种可以用consumer的offsetsForTimes、seek等方法指定从某个offset偏移的消息开始消费,参见上节课的内容。

7、分区数越多吞吐量越高吗

可以用kafka压测工具自己测试分区数不同,各种情况下的吞吐量

# 往test里发送一百万消息,每条设置1KB # throughput 用来进行限流控制,当设定的值小于 0 时不限流,当设定的值大于 0 时,当发送的吞吐量大于该值时就会被阻塞一段时间 bin/kafka-producer-perf-test.sh --topic test --num-records 1000000 --record-size 1024 --throughput -1 --producer-props bootstrap.servers=192.168.65.60:9092 acks=1

0

网络上很多资料都说分区数越多吞吐量越高 , 但从压测结果来看,分区数到达某个值吞吐量反而开始下降,实际上很多事情都会有一个临界值,当超过这个临界值之后,很多原本符合既定逻辑的走向又会变得不同。一般情况分区数跟集群机器数量相当就差不多了。

当然吞吐量的数值和走势还会和磁盘、文件系统、 I/O调度策略等因素相关。

注意:如果分区数设置过大,比如设置10000,可能会设置不成功,后台会报错"java.io.IOException : Too many open files"。

异常中最关键的信息是“ Too many open flies”,这是一种常见的 Linux 系统错误,通常意味着文件描述符不足,它一般发生在创建线程、创建 Socket、打开文件这些场景下 。 在 Linux系统的默认设置下,这个文件描述符的个数不是很多 ,通过 ulimit -n 命令可以查看:一般默认是1024,可以将该值增大,比如:ulimit -n 65535

8、消息传递保障

  • at most once(消费者最多收到一次消息,0--1次):acks = 0 可以实现。
  • at least once(消费者至少收到一次消息,1--多次):ack = all 可以实现。
  • exactly once(消费者刚好收到一次消息):at least once 加上消费者幂等性可以实现,还可以用kafka生产者的幂等性来实现。

kafka生产者的幂等性:因为发送端重试导致的消息重复发送问题,kafka的幂等性可以保证重复发送的消息只接收一次,只需在生产者加上参数 props.put(“enable.idempotence”, true) 即可,默认是false不开启。

具体实现原理是,kafka每次发送消息会生成PID和Sequence Number,并将这两个属性一起发送给broker,broker会将PID和Sequence Number跟消息绑定一起存起来,下次如果生产者重发相同消息,broker会检查PID和Sequence Number,如果相同不会再接收。

PID:每个新的 Producer 在初始化的时候会被分配一个唯一的 PID,这个PID 对用户完全是透明的。生产者如果重启则会生成新的PID。 Sequence Number:对于每个 PID,该 Producer 发送到每个 Partition 的数据都有对应的序列号,这些序列号是从0开始单调递增的。

9、kafka的事务

Kafka的事务不同于Rocketmq,Rocketmq是保障本地事务(比如数据库)与mq消息发送的事务一致性,Kafka的事务主要是保障一次发送多条消息的事务一致性(要么同时成功要么同时失败),一般在kafka的流式计算场景用得多一点,比如,kafka需要对一个topic里的消息做不同的流式计算处理,处理完分别发到不同的topic里,这些topic分别被不同的下游系统消费(比如hbase,redis,es等),这种我们肯定希望系统发送到多个topic的数据保持事务一致性。Kafka要实现类似Rocketmq的分布式事务需要额外开发功能。

kafka的事务处理可以参考官方文档:

Properties props = new Properties();props.put("bootstrap.servers", "localhost:9092");props.put("transactional.id", "my-transactional-id");Producer<String, String> producer = new KafkaProducer<>(props, new StringSerializer(), new StringSerializer());//初始化事务producer.initTransactions();try {//开启事务producer.beginTransaction();for (int i = 0; i < 100; i++){//发到不同的主题的不同分区producer.send(new ProducerRecord<>("hdfs-topic", Integer.toString(i), Integer.toString(i)));producer.send(new ProducerRecord<>("es-topic", Integer.toString(i), Integer.toString(i)));producer.send(new ProducerRecord<>("redis-topic", Integer.toString(i), Integer.toString(i)));}//提交事务producer.commitTransaction();} catch (ProducerFencedException | OutOfOrderSequenceException | AuthorizationException e) {// We can't recover from these exceptions, so our only option is to close the producer and exit.producer.close();} catch (KafkaException e) {// For all other exceptions, just abort the transaction and try again.//回滚事务producer.abortTransaction();}producer.close();

10、kafka高性能的原因

  • 磁盘顺序读写:kafka消息不能修改以及不会从文件中间删除保证了磁盘顺序读,kafka的消息写入文件都是追加在文件末尾,不会写入文件中的某个位置(随机写)保证了磁盘顺序写。
  • 数据传输的零拷贝
  • 读写数据的批量batch处理以及压缩传输

数据传输零拷贝原理:

0

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.rhkb.cn/news/174486.html

如若内容造成侵权/违法违规/事实不符,请联系长河编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

geoserver去除tif影像黑色的背景的方法

geoserver加载某些tif文件的时候,tif文件本身有黑色的背景,怎么去掉呢? 只要在geoserver中设置就行。 处理方法: 1.新建数据源时要选择ImageMosaic数据源 2,设置"Output Transparent Color" 设置"Output Transparent Color"为黑色(000000),在…

一文详解汽车电子CAN总线

0.什么是CAN总线 CAN总线(控制器区域网络Controller Area Network)是一个中央网络系统&#xff0c;连接不同的电子控制单元(ECU)以及车辆中的其他设备。现在的汽车可以有100个ECU&#xff0c;因此CAN总线通信变得非常重要。 1.CAN总线流行的背景 集中式:CAN总线系统允许对连接…

从 Hash索引、二叉树、B-Tree 与 B+Tree 对比看索引结构选择

从 Hash索引、二叉树、B-Tree 与 BTree 对比看索引结构选择 1、Hash 结构1.1、关于 Hash 数据结构1.2、InnoDB索引为啥不选 Hash 结构1.3、关于InnoDB 提供自适应 Hash 索引 &#xff08;Adaptive Hash Index&#xff09; 2、二叉搜索树3、平衡二叉树&#xff08;AVL树 &#x…

莫名其妙el-table不显示问题

完全复制element-ui中table代码&#xff0c;发现表格仍然不显示&#xff0c;看别人都说让降低版本&#xff0c;可我不想降低啊&#xff0c;不然其他组件有可能用不了&#xff0c;后来发现可以通过配置vite.config.js alias: {: path.resolve(__dirname, src),vue: vue/dist/vue…

<蓝桥杯软件赛>零基础备赛20周--第3周

报名明年4月蓝桥杯软件赛的同学们&#xff0c;如果你是大一零基础&#xff0c;目前懵懂中&#xff0c;不知该怎么办&#xff0c;可以看看本博客系列&#xff1a;备赛20周合集 20周的完整安排请点击&#xff1a;20周计划 每周发1个博客&#xff0c;共20周&#xff08;读者可以按…

【脚本笔记】AssetDatabase

AssetDatabase是编辑器下的处理资源操作的重要类&#xff0c;主要用于访问资源并针对资源执行操作的接口。 这里面所有的操作路径都是基于Unity项目的相对路径也就是Assets/xxx或者Assets/xxx.jpg这种。CacheServer 主要解决的是缩短大型团队导入资源的时间。当配置后&#xff…

V90伺服调试软件使用介绍(SINAMICS V-ASSISTANT Commissioning tool)

V90伺服驱动器调试软件SINAMICS V-ASSISTANT Commissioning tool下载地址如下: 西门子官网 选型|资料https://www.ad.siemens.com.cn/productportal/prods/sinamics%20v90/00_selectionziliao.htmlCSDN网址 https://download.csdn.net/download/m0_46143730/88485182

【并发编程】进程与线程

主要知识点&#xff1a; 进程和线程的概念 并行和并发的概念 线程基本应用 一、进程与线程 进程 程序由指令和数据组成&#xff0c;但这些指令要运行&#xff0c;数据要读写&#xff0c;就必须将指令加载至 CPU&#xff0c;数据加载至内存。在指令运行过程中还需要用到磁盘、…

element-plus DateTimePicker日期选择器,限制指定日期和时间不可选择

element-plus日期选择器&#xff0c;在指定日期时间前不可选择。 限制日期选择&#xff0c;使用disabled-date 限制小时选择&#xff0c;使用disabled-hours 限制分钟选择&#xff0c;使用disabled-minutes 限制毫秒选择&#xff0c;使用disabled-seconds 指定日期当天的时间有…

ECCV 22丨BUTD-DETR:图像和点云的语言标定Transformer

来源&#xff1a;投稿 作者&#xff1a;橡皮 编辑&#xff1a;学姐 论文链接&#xff1a;https://arxiv.org/abs/2112.08879[1] 主页链接&#xff1a;https://github.com/nickgkan/butd\_detr[2] 摘要&#xff1a; 在二维和三维场景中&#xff0c;大多数模型的任务都是将指涉…

GoLand GC(垃圾回收机制)简介及调优

GC(Garbage Collector)垃圾回收机制及调优 简单理解GC机制 其实gc机制特别容易理解&#xff0c;就是物理内存的自动清理工。我们可以把内存想象成一个房间&#xff0c;程序运行时会在这个房间里存放各种东西&#xff0c;但有时候我们会忘记把不再需要的东西拿出去&#xff0c…

Windows下Redis配置密码,并设置自启动

1. 解压redis修改配置文件&#xff08;设置密码&#xff09; 打开Redis的配置文件&#xff0c;通常位于Redis安装目录下的 redis.windows.conf 去掉"requirepass"前注释&#xff0c;修改"foobared"为要设置的密码 requirepass foobared2.注册服务&#x…

Mybatis-Plus通用枚举功能 [MyBatis-Plus系列] - 第493篇

历史文章&#xff08;文章累计490&#xff09; 《国内最全的Spring Boot系列之一》 《国内最全的Spring Boot系列之二》 《国内最全的Spring Boot系列之三》 《国内最全的Spring Boot系列之四》 《国内最全的Spring Boot系列之五》 《国内最全的Spring Boot系列之六》 S…

JAVA数据类型和变量

一、字面常量 常量即程序运行期间&#xff0c;固定不变,不可修改的量称为常量。 public class Demo{public static void main(String[] args){System.Out.println("hello world!");System.Out.println(100);System.Out.println(3.14);System.Out.println(A);System…

Linux--进程替换

1.什么是进程替换 在fork函数之后&#xff0c;父子进程各自执行代码的一部分&#xff0c;但是如果子进程想要执行一份全新的程序呢&#xff1f; 通过进程替换来完成&#xff0c;进程替换就是父子进程代码发生写时拷贝&#xff0c;子进程执行自己的功能。 程序替换就是通过特定的…

maven环境变量,安装源,本地仓库配置

1. maven环境变量 我这里用的是idea自带的maven 数值为&#xff1a; D:\software\computer_software\java\IDEAJ\IDEAJ2021.2.1\IntelliJ IDEA 2021.2.1\plugins\maven\lib\maven3\bin 2. 安装源更换为阿里云&#xff08;我不知道清华源是什么网址&#xff0c;网上也没查到&am…

目标检测算法发展史

前言 比起图像识别&#xff0c;现在图片生成技术要更加具有吸引力&#xff0c;但是要步入AIGC技术领域&#xff0c;首先不推荐一上来就接触那些已经成熟闭源的包装好了再提供给你的接口网站&#xff0c;会使用别人的模型生成一些图片就能叫自己会AIGC了吗&#xff1f;那样真正…

Linux学习第25天:Linux 阻塞和非阻塞 IO 实验(二): 挂起

Linux版本号4.1.15 芯片I.MX6ULL 大叔学Linux 品人间百味 思文短情长 为方便和上一节的衔接&#xff0c;在正式开始学习前&#xff0c;先把本节的思维导图引入&#xff1a; 二、阻塞IO实验 1.硬件原理图分析 2.实验程序 #define I…

【排序算法】 归并排序详解!分治思想!

&#x1f3a5; 屿小夏 &#xff1a; 个人主页 &#x1f525;个人专栏 &#xff1a; 算法—排序篇 &#x1f304; 莫道桑榆晚&#xff0c;为霞尚满天&#xff01; 文章目录 &#x1f4d1;前言&#x1f324;️归并排序的思想☁️基本思想☁️归并的思想实现☁️分治法 &#x1f3…

国产芯片vs“国际水平”,有距离也有超越!

当前&#xff0c;国产芯片正在迎来全新的发展阶段。国产终端芯片性能怎么样&#xff0c;与国际主流产品相比&#xff0c;表现如何&#xff1f;今天笔者就针对目前热度较高的四款国产CPU进行参数分析与性能跑分横向对比。 此次国产芯片评测型号分别是海光C86-3250、龙芯3A5000H…