面试官:聊聊kafka线上使用会有哪些问题?

哪些环节会造成消息丢失?

首先说说哪些环节会丢消息

  • 消息生产者:

(1)acks=0: 表示producer不需要等待任何broker确认收到消息的回复,就可以继续发送下一条消息。性能最高,但是最容易丢消 息。大数据统计报表场景,对性能要求很高,对数据丢失不敏感的情况可以用这种。

(2)acks=1: 至少要等待leader已经成功将数据写入本地log,但是不需要等待所有follower是否成功写入。就可以继续发送下一条消 息。这种情况下,如果follower没有成功备份数据,而此时leader又挂掉,则消息会丢失。

(3)acks=-1或all: 这意味着leader需要等待所有备份(min.insync.replicas配置的备份个数)都成功写入日志,这种策略会保证只要有一 个备份存活就不会丢失数据。这是最强的数据保证。一般除非是金融级别,或跟钱打交道的场景才会使用这种配置。当然如果 min.insync.replicas配置的是1则也可能丢消息,跟acks=1情况类似。

  • 消息消费端

如果消费这边配置的是自动提交,万一消费到数据还没处理完,就自动提交offset了,但是此时你consumer直接宕机了,未处理完的数据 丢失了,下次也消费不到了。

怎么保证消息不丢失?

生产端:消息发送+回调

伪代码

消费端:业务处理完后手动提交

消息重复消费

消息发送端

发送消息如果配置了重试机制,比如网络抖动时间过长导致发送端发送超时,实际broker可能已经接收到消息,但发送方会重新发送消息。因为发送端重试导致的消息重复发送问题,kafka的幂等性可以保证重复发送的消息只接收一次,只需在生产者加 上参数 props.put(“enable.idempotence”, true) 即可,默认是false不开启。

消息消费端

如果消费这边配置的是自动提交,刚拉取了一批数据处理了一部分,但还没来得及提交,服务挂了,下次重启又会拉取相同的一批数据重 复处理

一般消费端都是要做消费幂等处理的。 比如分布式锁、全局唯一id

at most once(消费者最多收到一次消息,0-1次):acks = 0 可以实现。

at least once(消费者至少收到一次消息,1-多次):ack = all 可以实现。

exactly once(消费者刚好收到一次消息):at least once 加上消费者幂等性可以实现,还可以用kafka生产者的幂等性来实

现。

消息顺序

如果发送端配置了重试机制,kafka不会等之前那条消息完全发送成功才去发送下一条消息,这样可能会出现,发送了1,2,3条消息,第 一条超时了,后面两条发送成功,再重试发送第1条消息,这时消息在broker端的顺序就是2,3,1了 所以,是否一定要配置重试要根据业务情况而定。也可以用同步发送的模式去发消息,当然acks不能设置为0,这样也能保证消息从发送 端到消费端全链路有序。

kafka保证全链路消息顺序消费,需要从发送端开始,将所有有序消息发送到同一个分区,然后用一个消费者去消费,但是这种性能比较 低,可以在消费者端接收到消息后将需要保证顺序消费的几条消费发到内存队列(可以搞多个),一个内存队列开启一个线程顺序处理消 息。

消息积压

1)线上有时因为发送方发送消息速度过快,或者消费方处理消息过慢,可能会导致broker积压大量未消费消息。 此种情况如果积压了上百万未消费消息需要紧急处理,可以修改消费端程序,让其将收到的消息快速转发到其他topic(可以设置很多分 区),然后再启动多个消费者同时消费新主题的不同分区。

2)由于消息数据格式变动或消费者程序有bug,导致消费者一直消费不成功,也可能导致broker积压大量未消费消息。 此种情况可以将这些消费不成功的消息转发到其它队列里去(类似死信队列),后面再慢慢分析死信队列里的消息处理问题。

kafka高性能原因

  • 磁盘顺序读写:kafka消息不能修改以及不会从文件中间删除保证了磁盘顺序读,kafka的消息写入文件都是追加在文件末尾, 不会写入文件中的某个位置(随机写)保证了磁盘顺序写。

  • 数据传输的零拷贝

  • 读写数据的批量batch处理以及压缩传输

传统文件复制方式: 需要对文件在内存中进行四次拷贝。

零拷贝: 有两种方式, mmap和transfile

Java当中对零拷贝进行了封装, Mmap方式通过MappedByteBuffer对象进行操作,而transfile通过FileChannel来进行操作。

Mmap 适合比较小的文件,通常文件大小不要超过1.5G ~2G 之间。

Transfile没有文件大小限制

在kafka当中,他的index日志文件也是通过mmap的方式来读写的。在其他日志文件当中,并没有使用零拷贝的方式。

kafka使用transfile方式将硬盘数据加载到网卡。

延时队列

延时队列存储的对象是延时消息。所谓的“延时消息”是指消息被发送以后,并不想让消费者立刻获取,而是等待特定的时间后,消费者

才能获取这个消息进行消费,延时队列的使用场景有很多, 比如 :

1)在订单系统中, 一个用户下单之后通常有 30 分钟的时间进行支付,如果 30 分钟之内没有支付成功,那么这个订单将进行异常处理,

这时就可以使用延时队列来处理这些订单了。

2)订单完成1小时后通知用户进行评价。

发送延时消息时先把消息按照不同的延迟时间段发送到指定的队列中(topic_1s,topic_5s,topic_10s,…,这个一般不能支持任意时间段的延时),然后通过定时器进行轮训消费这些topic,查看消息是否到期,如果到期就把这个消息发送到具体业务处理的topic中,队列中消息越靠前的到期时间越早,具体来说就是定时器在一次消费过程中,对消息的发送时间做判断,看下是否延迟到对 应时间了,如果到了就转发,如果还没到这一次定时任务就可以提前结束了。

生产者参数:

    Properties props = new Properties();props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "192.168.65.60:9092,192.168.65.60:9093,192.168.65.60:9094");/*发出消息持久化机制参数(1)acks=0: 表示producer不需要等待任何broker确认收到消息的回复,就可以继续发送下一条消息。性能最高,但是最容易丢消息。(2)acks=1: 至少要等待leader已经成功将数据写入本地log,但是不需要等待所有follower是否成功写入。就可以继续发送下一条消息。这种情况下,如果follower没有成功备份数据,而此时leader又挂掉,则消息会丢失。(3)acks=-1或all: 需要等待 min.insync.replicas(默认为1,推荐配置大于等于2) 这个参数配置的副本个数都成功写入日志,这种策略会保证只要有一个备份存活就不会丢失数据。这是最强的数据保证。一般除非是金融级别,或跟钱打交道的场景才会使用这种配置。*/props.put(ProducerConfig.ACKS_CONFIG, "1");/*发送失败会重试,默认重试间隔100ms,重试能保证消息发送的可靠性,但是也可能造成消息重复发送,比如网络抖动,所以需要在接收者那边做好消息接收的幂等性处理*/props.put(ProducerConfig.RETRIES_CONFIG, 3);//重试间隔设置props.put(ProducerConfig.RETRY_BACKOFF_MS_CONFIG, 300);//设置发送消息的本地缓冲区,如果设置了该缓冲区,消息会先发送到本地缓冲区,可以提高消息发送性能,默认值是33554432,即32MBprops.put(ProducerConfig.BUFFER_MEMORY_CONFIG, 33554432);/*kafka本地线程会从缓冲区取数据,批量发送到broker,设置批量发送消息的大小,默认值是16384,即16kb,就是说一个batch满了16kb就发送出去*/props.put(ProducerConfig.BATCH_SIZE_CONFIG, 16384);/*默认值是0,意思就是消息必须立即被发送,但这样会影响性能一般设置10毫秒左右,就是说这个消息发送完后会进入本地的一个batch,如果10毫秒内,这个batch满了16kb就会随batch一起被发送出去如果10毫秒内,batch没满,那么也必须把消息发送出去,不能让消息的发送延迟时间太长*/props.put(ProducerConfig.LINGER_MS_CONFIG, 10);//把发送的key从字符串序列化为字节数组props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());//把发送消息value从字符串序列化为字节数组props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());Producer<String, String> producer = new KafkaProducer<String, String>(props);int msgNum = 5;final CountDownLatch countDownLatch = new CountDownLatch(msgNum);for (int i = 1; i <= msgNum; i++) {Order order = new Order(i, 100 + i, 1, 1000.00);//指定发送分区/*ProducerRecord<String, String> producerRecord = new ProducerRecord<String, String>(TOPIC_NAME, 0, order.getOrderId().toString(), JSON.toJSONString(order));*///未指定发送分区,具体发送的分区计算公式:hash(key)%partitionNumProducerRecord<String, String> producerRecord = new ProducerRecord<String, String>(TOPIC_NAME, order.getOrderId().toString(), JSON.toJSONString(order));//等待消息发送成功的同步阻塞方法RecordMetadata metadata = producer.send(producerRecord).get();System.out.println("同步方式发送消息结果:" + "topic-" + metadata.topic() + "|partition-"+ metadata.partition() + "|offset-" + metadata.offset());//异步回调方式发送消息/*producer.send(producerRecord, new Callback() {public void onCompletion(RecordMetadata metadata, Exception exception) {if (exception != null) {System.err.println("发送消息失败:" + exception.getStackTrace());}if (metadata != null) {System.out.println("异步方式发送消息结果:" + "topic-" + metadata.topic() + "|partition-"+ metadata.partition() + "|offset-" + metadata.offset());}countDownLatch.countDown();}});*///送积分 TODO}

消费者参数:

props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, “192.168.65.60:9092,192.168.65.60:9093,192.168.65.60:9094”);
// 消费分组名
props.put(ConsumerConfig.GROUP_ID_CONFIG, CONSUMER_GROUP_NAME);
// 是否自动提交offset,默认就是true
/props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, “true”);
// 自动提交offset的间隔时间
props.put(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG, “1000”);
/
props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, “false”);
/*
当消费主题的是一个新的消费组,或者指定offset的消费方式,offset不存在,那么应该如何消费
latest(默认) :只消费自己启动之后发送到主题的消息
earliest:第一次从头开始消费,以后按照消费offset记录继续消费,这个需要区别于consumer.seekToBeginning(每次都从头开始消费)
/
//props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, “earliest”);
/

consumer给broker发送心跳的间隔时间,broker接收到心跳如果此时有rebalance发生会通过心跳响应将
rebalance方案下发给consumer,这个时间可以稍微短一点
/
props.put(ConsumerConfig.HEARTBEAT_INTERVAL_MS_CONFIG, 1000);
/

服务端broker多久感知不到一个consumer心跳就认为他故障了,会将其踢出消费组,
对应的Partition也会被重新分配给其他consumer,默认是10秒
*/
props.put(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, 10 * 1000);

    //一次poll最大拉取消息的条数,如果消费者处理速度很快,可以设置大点,如果处理速度一般,可以设置小点props.put(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, 50);/*如果两次poll操作间隔超过了这个时间,broker就会认为这个consumer处理能力太弱,会将其踢出消费组,将分区分配给别的consumer消费*/props.put(ConsumerConfig.MAX_POLL_INTERVAL_MS_CONFIG, 30 * 1000);props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());KafkaConsumer<String, String> consumer = new KafkaConsumer<String, String>(props);consumer.subscribe(Arrays.asList(TOPIC_NAME));// 消费指定分区//consumer.assign(Arrays.asList(new TopicPartition(TOPIC_NAME, 0)));//消息回溯消费/*consumer.assign(Arrays.asList(new TopicPartition(TOPIC_NAME, 0)));consumer.seekToBeginning(Arrays.asList(new TopicPartition(TOPIC_NAME, 0)));*///指定offset消费/*consumer.assign(Arrays.asList(new TopicPartition(TOPIC_NAME, 0)));consumer.seek(new TopicPartition(TOPIC_NAME, 0), 10);*///从指定时间点开始消费/*List<PartitionInfo> topicPartitions = consumer.partitionsFor(TOPIC_NAME);//从1小时前开始消费long fetchDataTime = new Date().getTime() - 1000 * 60 * 60;Map<TopicPartition, Long> map = new HashMap<>();for (PartitionInfo par : topicPartitions) {map.put(new TopicPartition(TOPIC_NAME, par.partition()), fetchDataTime);}Map<TopicPartition, OffsetAndTimestamp> parMap = consumer.offsetsForTimes(map);for (Map.Entry<TopicPartition, OffsetAndTimestamp> entry : parMap.entrySet()) {TopicPartition key = entry.getKey();OffsetAndTimestamp value = entry.getValue();if (key == null || value == null) continue;Long offset = value.offset();System.out.println("partition-" + key.partition() + "|offset-" + offset);System.out.println();//根据消费里的timestamp确定offsetif (value != null) {consumer.assign(Arrays.asList(key));consumer.seek(key, offset);}}*/while (true) {/** poll() API 是拉取消息的长轮询*/ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(1000));for (ConsumerRecord<String, String> record : records) {System.out.printf("收到消息:partition = %d,offset = %d, key = %s, value = %s%n", record.partition(),record.offset(), record.key(), record.value());}if (records.count() > 0) {// 手动同步提交offset,当前线程会阻塞直到offset提交成功// 一般使用同步提交,因为提交之后一般也没有什么逻辑代码了//consumer.commitSync();// 手动异步提交offset,当前线程提交offset不会阻塞,可以继续处理后面的程序逻辑/*consumer.commitAsync(new OffsetCommitCallback() {@Overridepublic void onComplete(Map<TopicPartition, OffsetAndMetadata> offsets, Exception exception) {if (exception != null) {System.err.println("Commit failed for " + offsets);System.err.println("Commit failed exception: " + exception.getStackTrace());}}});*/}

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.rhkb.cn/news/176243.html

如若内容造成侵权/违法违规/事实不符,请联系长河编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

关键点检测、姿态识别、目标检测、车牌识别等项目部署代码+数据集汇总

一、AI健身计数 1、图片视频检测 &#xff08;cpu运行&#xff09;&#xff1a; 注&#xff1a;左上角为fps&#xff0c;左下角为次数统计。 1.哑铃弯举&#xff1a;12&#xff0c;14&#xff0c;16 详细环境安装教程&#xff1a;pyqt5AI健身CPU实时检测mediapipe 可视化界面…

VS LiveShare使用操作介绍

VS LiveShare的使用教程 文章简介下载过程 文章简介 本篇文章主要介绍了如何安装和使用LiveShare的过程。 下载过程 1.在扩展->管理扩展&#xff0c;搜索Live Share后&#xff0c;下载对应的安装包&#xff0c;安装后对VS进行重启 2.安装后界面右上角会出现Live Share标…

docker环境安装+maven依赖继承问题

1&#xff0c;docker环境安装 我们使用yum指令进行安装&#xff0c;分别cmd运行&#xff1a; yum install -y yum-utils device-mapper-persistent-data lvm2 yum-contig-manager --add-repo https://download.docker.com/linux/centos/docker-ce.rep具体解释如下&#xff1a;…

目标检测中常见指标 - mAP

文章目录 1. 评价指标2. 计算示例3. COCO评价指标 1. 评价指标 在目标检测领域&#xff0c;比较常用的两个公开数据集&#xff1a;pascal voc和coco。 目标检测与图像分类明显差距是很大的&#xff0c;在图像分类中&#xff0c;我们通常是统计在验证集当中&#xff0c;分类正…

【Azure】存储服务:Azure 的存储账户

文章目录 一、前提知识&#xff08;建议了解&#xff09;二、介绍 Azure 存储帐户三、使用 Microsoft Azure 门户创建存储帐户 一、前提知识&#xff08;建议了解&#xff09; 在每一个云厂商中&#xff0c;都有自身的云存储&#xff0c;也有根据不同功能进行区分的不同类型的…

SPSS两独立样本t检验

前言&#xff1a; 本专栏参考教材为《SPSS22.0从入门到精通》&#xff0c;由于软件版本原因&#xff0c;部分内容有所改变&#xff0c;为适应软件版本的变化&#xff0c;特此创作此专栏便于大家学习。本专栏使用软件为&#xff1a;SPSS25.0 本专栏所有的数据文件请点击此链接下…

循环神经网络(RNN)与长短期记忆网络(LSTM)

前言&#xff1a; 通过前面的学习&#xff0c;我们以BP神经网络为基础&#xff0c;认识到了损失函数&#xff0c;激活函数&#xff0c;以及梯度下降法的原理&#xff1b;而后学习了卷积神经网络&#xff0c;知道图像识别是如何实现的。今天这篇文章&#xff0c;讲述的就是计算机…

【Linux】第七站:vim的使用以及配置

文章目录 一、vim1.vim的介绍2.vim基本使用3.vim的命令模式常用命令4.底行模式 二、vim的配置 一、vim 1.vim的介绍 vim编辑器&#xff0c;用来文本编写&#xff0c;可以写代码 它是一个多模式的编辑器 它有很多的模&#xff0c;不过我们暂时先只考虑这三种模式 命令模式插入模…

v-bind动态改变样式

通过v-bind切换样式&#xff0c;:class"{ active:true}"为true展示样式&#xff0c;false不展示。也可以由:style"{ width:percent %}"动态控制宽度。 注意后面是JS对象&#xff0c;所以后面的值不可以包含-&#xff0c;比如background-color会解析出错&a…

Windows下多Chrome谷歌浏览器版本共存

场景 某些年代久远的 WEB 应用&#xff0c;必须在指定的浏览器或版本才能正常运行&#x1f602;&#xff0c;此时就需要多个版本 chrome 浏览器共存。 解决方案 下载指定版本 可以从 https://www.chromedownloads.net/ 下载需要的版本&#xff0c;此处下载的是87.0.4280.14…

C++之string

C之string #include <iostream>using namespace std;/*string();//创建一个空的字符串string(const char* s);//使用字符串s初始化string(const string& str);//使用一个string对象初始化另外一个string对象string(int n,char c);//使用n个字符c初始化*/void test1()…

带你从0开始学习自动化框架Airtest

现在市面上做UI自动化的框架很多&#xff0c;包括我们常用的Web自动化框架Selenium&#xff0c;移动端自动化框架Appium。 虽然Selenium和Appium分属同源&#xff0c;而且API都有很多相同的地方&#xff0c;可以无损耗切换&#xff0c;但是还是需要引入不同的库&#xff0c;而…

在NISQ小型计算机上执行大型并行量子计算的可能性

简介 Steve White提出了密度矩阵重整化群&#xff08;DMRG&#xff09;的基本思想&#xff0c;即纠缠是一种有价值的资源&#xff0c;可以用来精确或近似地描述大量子系统。后来&#xff0c;这一思想被理解为优化矩阵积状态&#xff08;MPS&#xff09;的算法&#xff0c;支持…

喜报!CACTER邮件安全网关荣获2023鲲鹏应用创新大赛广东赛区三等奖

近期&#xff0c;2023鲲鹏应用创新大赛广东赛区暨广东省信息技术应用创新产业联盟创新大赛圆满落幕&#xff0c;Coremail凭借“基于鲲鹏CPU的邮件网关一体机解决方案”&#xff0c;荣获“金融行业方向”三等奖。 ​ 鲲鹏凌粤 展翅湾区 本届大赛广东区域赛以“鲲鹏凌粤 展翅湾…

OpenCV—自动驾驶实时道路车道检测(完整代码)

自动驾驶汽车是人工智能领域最具颠覆性的创新之一。在深度学习算法的推动下,它们不断推动我们的社会向前发展,并在移动领域创造新的机遇。自动驾驶汽车可以去传统汽车可以去的任何地方,并且可以完成经验丰富的人类驾驶员所做的一切。但正确地训练它是非常重要的。自动驾驶汽…

Git工作原理和常见问题处理方案

博客定位Git工作区域工作区域划分暂存区设计目的 Git基本操作核心操作初始化和配置指令 HEAD指针Git版本回滚指令介绍reset模式reset hard使用场景reset soft使用场景reset mixed使用场景reset使用注意事项checkout使用场景 Git分支管理什么是分支分支应用场景分支相关指令被合…

10 MIT线性代数-四个基本子空间 four fundamental subspaces

1. 四个子空间 Four subspaces (mxn) 列空间 Column space C(A) in 零空间Nullspace N(A) in 行空间Row space all combs of rows all combs of columns of AT C(AT) in 左零空间Left nullspace Nullspace of AT N(AT) left nullspace of A in 2. 基和维数 Basis&…

nodejs+vue黄花岗社区核酸检测站-计算机毕业设计python-django-php

对黄花岗社区核酸检测站系统进行大力的研究&#xff0c;主要是因为黄花岗社区核酸检测站系统对于社区的推进有着十分重要的作用&#xff0c; 对于社区的管理来说&#xff0c;黄花岗社区核酸检测站系统是十分有效的一个途径&#xff0c;也正是因为这样的特殊性使得在对社区进行管…

[SpringCloud] Feign 与 Gateway 简介

目录 一、Feign 简介 1、RestTemplate 远程调用中存在的问题 2、定义和使用 Feign 客户端 3、Feign 自定义配置 4、Feign 性能优化 5、Feign 最佳实践 6、Feign 使用问题汇总 二、Gateway 网关简介 1、搭建网关服务 2、路由断言工厂 3、路由的过滤器配置 4、全局过…

钢琴培训答题服务预约小程序的效果怎样

很多家长都会从小培养孩子的兴趣&#xff0c;钢琴便是其中热度较高的一种&#xff0c;而各城市也不乏线下教育培训机构&#xff0c;除了青少年也有成年人参加培训&#xff0c;市场教育高需求下&#xff0c;需要商家不断拓展客户和转化。 那么通过【雨科】平台制作钢琴培训服务…