Kafka消费异常处理

异常

org.apache.kafka.clients.consumer.CommitFailedException: Commit cannot be completed since the group has already rebalanced and assigned the partitions to another member. This means that the time between subsequent calls to poll() was longer than the configured max.poll.interval.ms, which typically implies that the poll loop is spending too much time message processing. You can address this either by increasing the session timeout or by reducing the maximum size of batches returned in poll() with max.poll.records.at org.apache.kafka.clients.consumer.internals.ConsumerCoordinator$OffsetCommitResponseHandler.handle(ConsumerCoordinator.java:775)at org.apache.kafka.clients.consumer.internals.ConsumerCoordinator$OffsetCommitResponseHandler.handle(ConsumerCoordinator.java:726)

异常的主要信息:

a) CommitFailedException

b) Commit cannot be completed since the group has already rebalanced and assigned the partitions to another member. This means that the time between subsequent calls to poll() was longer than the configured max.poll.interval.ms, which typically implies that the poll loop is spending too much time message processing. You can address this either by increasing the session timeout or by reducing the maximum size of batches returned in poll() with max.poll.records.

其实如果我们对其中的参数,或是对消费的机制比较了解,这个问题就很好解决。当我看到这个异常,我很开心,因为我知道我能通过此异常了解一下Kafka Consumer 消费消息的大致过程。心态是好的~~~

其实现在看这个异常是说:该Consumer不能提交offset了,因为它已经出局了,是因为你的处理小时时间长于你要报告给server的时间。同时还告诉我们怎么处理:要么增加超时时间,要么减少每次poll回来的消息个数。

主要问题在于,何为session timeout?maximum size of batches?poll(timeout)中timeout什么意思?

处理过程

a) 找官网doc

版本:1.1.0

有效信息:


换成通俗易懂的人话:

poll() API 主要是判断consumer是否还活着,只要我们持续调用poll(),消费者就会存活在自己所在的group中,并且持续的消费指定partition的消息。底层是这么做的:消费者向server持续发送心跳,如果一个时间段(session.timeout.ms)consumer挂掉或是不能发送心跳,这个消费者会被认为是挂掉了,这个Partition也会被重新分配给其他consumer

下边这个例子如果理解不上,请通读全文后,再回来理解一下笔者的意思

------------------------------------分割线------------------------------------

背景:你是个搬砖的,同时还是个瓦工,en….你还有个儿子
上述错误就是:工头命令每个码农(consumer)最多10分钟把一个100块转运到目的地并把搬来的砖垒房子,然后回来接着取砖、垒房子。问题在于,你搬了100块砖走了,但是10分钟过去了,你还没回来,那我怎么知道你是不是偷懒睡觉去了,工头就把这个搬砖垒房子的活分给同在一起干活的其他人了(同group不同consumer)。其实你可能没有偷懒,是因为你太追求完美了(估计是处女座,或是垒自家的房子),垒房子的时间很长(spending too much time message processing),10分钟内没能回来向工头报道,这时,你就得和工头商量,两种办法:1、能不能15分钟内回来就行,2、10分钟内回来,但每次搬80块砖来垒房子。
如果老板是个比较有控制欲的人,对于第二中办法,同样的工作量,你无非是多跑几趟。还能很好的控制你;但是对于第一种办法,老板是不愿意的,为什么,因为和你一起搬砖的还有其他人,他可以协调(rebalance)其他5分钟就回来的人来干你的活。你告诉他15分钟对于工头来说是相对不可控的。
当然你还有两位一种办法,你可以找你儿子来搬砖(另起一个线程),你来垒房子,等你垒完了100块转,你儿子去告诉工头,并搬回下一个100块转。但是要注意有一个问题,就是你儿子不能在你还没有垒完上一个100块转前就报告给工头,去获取下一批100块转。这样你就处理不过来了。

------------------------------------分割线------------------------------------


通过上边的例子,我们大致清楚了max.poll.interval.ms?maximum size of batches?

max.poll.interval.ms:消费者最大心跳时间间隔

maximum size of batches:消费者每次获取消息的个数

什么时候发送心跳呢?是poll()方法被调用发送心跳吗?那poll(timeout)中timeout是什么意思呢?

官网对poll(timeout)中timeout的解释如下:

Parameters:
timeout - The time, in milliseconds, spent waiting in poll if data is not available in the buffer. If 0, returns immediately with any records that are available currently in the buffer, else returns empty. Must not be negative.

这个我费了很大力气都没有给它翻译成人话……

怎么办?看源码?大致看了下,但是水平有限。。。真的不知道什么时候发送心跳。那就剩下最后一招了(杀手锏)---写例子验证

验证

1、producer

public class ProducerTest {@Testpublic void TestPro() {Properties props = new Properties();props.put("bootstrap.servers", "localhost:9092");props.put("acks", "all");props.put("retries", 0);props.put("batch.size", 16384);props.put("linger.ms", 1);props.put("buffer.memory", 33554432);props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");Producer<String, String> producer = new KafkaProducer<String, String>(props);for (int i = 0; i < 30; i++)producer.send(new ProducerRecord<String, String>("user_behavior", Integer.toString(i), "hello-"+i));producer.close();}
}

2、consumer

public class ConsumerTest {@Testpublic void TestCon() throws InterruptedException {Properties props = new Properties();props.put("bootstrap.servers", "localhost:9092");props.put("group.id", "test");props.put("enable.auto.commit", "false");props.put("auto.offset.reset", "earliest");props.put("max.poll.records", 5);props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");KafkaConsumer<String, String> consumer = new KafkaConsumer<String, String>(props);consumer.subscribe(Arrays.asList("user_behavior"));int i = 0;while (true) {ConsumerRecords<String, String> records = consumer.poll(3000);System.out.println("polls out: " + ++i + "time: " + KafkaHelper.timestmp2date(System.currentTimeMillis()));for (ConsumerRecord<String, String> record : records) {System.out.printf("time = %s, partition = %s, offset = %d, key = %s, value = %s%n",KafkaHelper.timestmp2date(record.timestamp()),record.partition(),record.offset(),record.key(),record.value());}consumer.commitSync();}}
}

a)测试poll中的参数作用

直接启动Consumer打印结果:

polls out: 1time: 2018-06-13 15:25:19
polls out: 2time: 2018-06-13 15:25:22
polls out: 3time: 2018-06-13 15:25:25
polls out: 4time: 2018-06-13 15:25:28

一开始我错误以为:这个timeout是Consumer每次拉去消息的时间间隔

但我启动了Producer后,打印结果:

polls out: 1time: 2018-06-13 15:27:40
polls out: 2time: 2018-06-13 15:27:43
polls out: 3time: 2018-06-13 15:27:46
polls out: 4time: 2018-06-13 15:27:49
polls out: 5time: 2018-06-13 15:27:52
time = 2018-06-13 15:27:52, partition = 0, offset = 503, key = 1, value = hello-1
time = 2018-06-13 15:27:52, partition = 0, offset = 504, key = 5, value = hello-5
polls out: 6time: 2018-06-13 15:27:52
time = 2018-06-13 15:27:52, partition = 1, offset = 157, key = 4, value = hello-4
time = 2018-06-13 15:27:52, partition = 2, offset = 129, key = 0, value = hello-0
time = 2018-06-13 15:27:52, partition = 2, offset = 130, key = 2, value = hello-2
time = 2018-06-13 15:27:52, partition = 2, offset = 131, key = 3, value = hello-3
polls out: 7time: 2018-06-13 15:27:55
polls out: 8time: 2018-06-13 15:27:58
polls out: 9time: 2018-06-13 15:28:01
polls out: 10time: 2018-06-13 15:28:04

由此可见,第5次和第6次调用poll方法的时间相同。

至此,结合官网的描述对poll(timeout) 的timeout参数认识如下

如果消息队列中没有消息,等待timeout毫秒后,调用poll()方法。如果队列中有消息,立即消费消息,每次消费的消息的多少可以通过max.poll.records配置。

b)测试max.poll.interval.ms

public class ConsumerTest {@Testpublic void TestCon() throws InterruptedException {Properties props = new Properties();props.put("bootstrap.servers", "localhost:9092");props.put("group.id", "test");props.put("enable.auto.commit", "false");props.put("auto.offset.reset", "earliest");props.put("max.poll.records", 5);
        props.put(ConsumerConfig.MAX_POLL_INTERVAL_MS_CONFIG, "1000");
props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");KafkaConsumer<String, String> consumer = new KafkaConsumer<String, String>(props);consumer.subscribe(Arrays.asList("user_behavior"));int i = 0;while (true) {ConsumerRecords<String, String> records = consumer.poll(3000);System.out.println("polls out: " + ++i + "time: " + KafkaHelper.timestmp2date(System.currentTimeMillis()));for (ConsumerRecord<String, String> record : records) {System.out.printf("time = %s, partition = %s, offset = %d, key = %s, value = %s%n",KafkaHelper.timestmp2date(record.timestamp()),record.partition(),record.offset(),record.key(),record.value());}consumer.commitSync();}}
}

启动Consumer、Producer运行正常不报错

polls out: 1time: 2018-06-13 15:53:07
polls out: 2time: 2018-06-13 15:53:07
time = 2018-06-13 15:53:07, partition = 1, offset = 158, key = 4, value = hello-4
time = 2018-06-13 15:53:07, partition = 0, offset = 505, key = 1, value = hello-1
time = 2018-06-13 15:53:07, partition = 0, offset = 506, key = 5, value = hello-5
time = 2018-06-13 15:53:07, partition = 2, offset = 132, key = 0, value = hello-0
time = 2018-06-13 15:53:07, partition = 2, offset = 133, key = 2, value = hello-2
polls out: 3time: 2018-06-13 15:53:07
time = 2018-06-13 15:53:07, partition = 2, offset = 134, key = 3, value = hello-3
polls out: 4time: 2018-06-13 15:53:10
polls out: 5time: 2018-06-13 15:53:13

想到异常里提到的处理消息时间过长(spending too much time message processing)

Consumer代码增加处理时间

public class ConsumerTest {@Testpublic void TestCon() throws InterruptedException {Properties props = new Properties();props.put("bootstrap.servers", "localhost:9092");props.put("group.id", "test");props.put("enable.auto.commit", "false");props.put("auto.offset.reset", "earliest");props.put("max.poll.records", 5);props.put(ConsumerConfig.MAX_POLL_INTERVAL_MS_CONFIG, "1000");props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");KafkaConsumer<String, String> consumer = new KafkaConsumer<String, String>(props);consumer.subscribe(Arrays.asList("user_behavior"));int i = 0;while (true) {ConsumerRecords<String, String> records = consumer.poll(3000);System.out.println("polls out: " + ++i + "time: " + KafkaHelper.timestmp2date(System.currentTimeMillis()));for (ConsumerRecord<String, String> record : records) {System.out.printf("time = %s, partition = %s, offset = %d, key = %s, value = %s%n",KafkaHelper.timestmp2date(record.timestamp()),record.partition(),record.offset(),record.key(),record.value());
                TimeUnit.SECONDS.sleep(2);
            }consumer.commitSync();}}
}
polls out: 1time: 2018-06-13 15:59:13
polls out: 2time: 2018-06-13 15:59:16
polls out: 3time: 2018-06-13 15:59:19
polls out: 4time: 2018-06-13 15:59:22
polls out: 5time: 2018-06-13 15:59:22
time = 2018-06-13 15:59:22, partition = 2, offset = 135, key = 0, value = hello-0
time = 2018-06-13 15:59:22, partition = 2, offset = 136, key = 2, value = hello-2
time = 2018-06-13 15:59:22, partition = 2, offset = 137, key = 3, value = hello-3org.apache.kafka.clients.consumer.CommitFailedException: Commit cannot be completed since the group has already rebalanced and assigned the partitions to another member. This means that the time between subsequent calls to poll() was longer than the configured max.poll.interval.ms, which typically implies that the poll loop is spending too much time message processing. You can address this either by increasing the session timeout or by reducing the maximum size of batches returned in poll() with max.poll.records.at org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.sendOffsetCommitRequest(ConsumerCoordinator.java:713)at org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.commitOffsetsSync(ConsumerCoordinator.java:596)...

久违的异常终于出现了

至此可以看出max.poll.interval.ms是获取消息后,处理这些消息所用时间不能超过该值。即:两次poll的时间间隔最大时间

那么对于何时发送心跳想必也是在调用poll(timeout)方法的时候发送的(猜测),因为超过了max.poll.interval,ms后,这个consumer就被视为挂了。

ps:sleep时间改为0.5秒也会抛异常,因为每次poll5条消息,处理时间2.5s>max.poll.interval,ms=1000ms

异常解决

a)调大max.poll.interval,ms,默认300000(300s)

b)调小max.poll.records,默认500

c)另起线程

后续:

写一个单独处理的message的线程,这样消费和处理分开就不会出现此异常。但要注意处理完一批消息后才能提交offset,然后进行下次的poll(会用到CountDownLatch)

总结:

遇到Exception要淡定,每个Exception搞清楚缘由后都是一次提高的过程。


Ps: 代码中包含<strong>标签是因为我想在更改出加粗,生成后就多了<strong>标签了。

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.rhkb.cn/news/72238.html

如若内容造成侵权/违法违规/事实不符,请联系长河编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

账户系统,余额与体现

参考连接: https://blog.pingxx.com/2018/02/27/用户账户系统该怎么用?/ 账户体系的建立实际上是将结清算分开(即实时清算/定时结算)利于更复杂的支付业务(如分账/层级分润等): 建立账户体系时要根据业务需求考虑各种账户(如余额账户/冻结资金账户/红包账户(不能提现但是能…

微信支付成功,如何刷新用户当前页面的余额

本项目中&#xff0c;使用微信支付&#xff0c;支付成功后&#xff0c;弹出提示框&#xff0c;并且目的是改变当前用户的余额。。。我们在互动直播项目中发现 &#xff0c;然而事情并没有那么简单。 代码如下&#xff1a; 我们知道&#xff0c;应该在appdelegate中调用微信支…

开源趣事~ 记给 OpenHarmony 提 PR 的那些事

大家好哇&#xff0c;许久不见&#xff0c;也感谢大家这么久一直以来的关注&#xff0c;也感谢在短视频盛行的今天&#xff0c;你们还能静下心来坚守文字的阵地。 说到这次的主题&#xff0c;参加鸿蒙项目的开源&#xff0c;也是小编第一次拥抱开源&#xff0c;就像是别人有困…

基于大规模边缘计算的千万级聊天室技术实践

在技术的迭代更新下&#xff0c;面对大型、超大型的直播场景&#xff0c;大规模边缘聊天室成为新热潮。 作者 | 张超 责编 | 王子彧 出品 | CSDN&#xff08;ID&#xff1a;CSDNnews&#xff09; 当前直播成为一种流行趋势&#xff0c;带货直播&#xff0c;网红带货&#…

JavaScript成功背后的四个关键人物!

前言&#xff1a;JavaScript能如此成功&#xff0c;至少有四位关键人物&#xff1a; 1. JavaScript作者Brendan Eich 2. JSLint&#xff0c;JSON作者Douglas Crockford 3. jQuery作者John Resig 4. Node.js作者Ryan Dahl。 Brendan Eich以及JavaScript的发明过程大家已经非常熟…

爬虫教程( 3 ) --- 手机 APP 数据抓取

1. Fiddler 设置 这是使用 fiddler 进行手机 app 的抓包&#xff0c;也可以使用 Charles&#xff0c;burpSuite 等。。。 电脑安装 Fiddler&#xff0c;手机 和 安装 fiddler 的电脑处于同一个网络里&#xff0c; 否则手机不能把 HTTP 发送到 Fiddler 的机器上来。 配置 Fiddle…

以某乎为实战案例,教你用Python爬取手机App数据

1 前言 最近爬取的数据都是网页端&#xff0c;今天来教大家如何爬取手机端app数据&#xff08;本文以ios苹果手机为例&#xff0c;其实安卓跟ios差不多&#xff09;&#xff01; 本文将以『某乎』为实战案例&#xff0c;手把手教你从配置到代码一步一步的爬取App数据&#xff0…

利用Python爬虫抓取手机APP的传输数据

大多数APP里面返回的是json格式数据&#xff0c;或者一堆加密过的数据 。这里以超级课程表APP为例&#xff0c;抓取超级课程表里用户发的话题。 1、抓取APP数据包 表单&#xff1a; 表单中包括了用户名和密码&#xff0c;当然都是加密过了的&#xff0c;还有一个设备信息&am…

22. 听说你想要用爬虫采集我的手机号?哎 ~ 我展示用的是图片

本篇博客我们实现图片渲染手机号码案例,用于防止爬虫直接采集文字信息。 爬虫训练场 本案例实现的效果如下所示 文章目录 bootstrap5 实现名片样式卡片补充数据生成逻辑生成用户 5 个汉字的昵称调用头像 API,生成图片将手机号码生成图片bootstrap5 实现名片样式卡片 在 Boo…

一种解决Qobuz客户端一直转圈加载不出来的思路

先上图&#xff0c;Qobuz在Win10上的客户端是这样滴 之前是最高音质&#xff0c;换到最差音质还是加载不出来。可能是我网络的问题&#xff0c;但是代理节点是没问题的。然后我尝试了一下Qobuz的Web Player。 就是登录之后画红圈这个 秒开好吧&#xff0c;也不卡顿&#xff…

2022年注册会计师(CPA)考试测试题及答案

1、某外国投资者协议购买境内公司股东的股权,将境内公司变更为外商投资企业,该外商投资企业的注册资本为700万美元。根据外国投资者并购境内企业的有关规定,该外商投资企业的投资总额的上限是( )万美元。 A&#xff0e;1000 B&#xff0e;1400 C&#xff0e;1750 D…

【PMP】PMP考试练习题(中英文对照)

1. A company wants to ensure that project failures are addressed in project documentation. Where should the project manager include them? A. Project management plan B. Risk management plan C. Change management plan D. Communications management plan 公司希…

PMP通过率大跌,是否与新版考试大纲有关?

通过率的增长和下降并不是只看考试内容或者说考试是否有重大改革来的&#xff0c;毕竟每年的考生水平都是不一的&#xff0c;我们也没有办法去确定一个考试的通过率高低是否准确&#xff0c;你相信那就是&#xff0c;不相信同样对于你是否能过通过考试也没有多大影响。 考试并不…

超级好用『PMP考试答题24计』一次通过考试~(1)

作为一个想一次通过PMP考试的老考试人。 刷题、报班、看视频、看教材甚至是通过人的经验贴都不会放过的我&#xff0c;只要是与通过PMP考试有关的都想去看看了解了解&#xff0c;避避坑。 但是内容有太多&#xff0c;而且考试的经验也就只能看看&#xff0c;在自己身上好像没…

PMP中文报名注意事项

随着PMP得到越来越多的关注和认可&#xff0c;报考人数也在逐年快速增长着。 而PMP的考试&#xff0c;分为英文报名和中文报名。在PMI官网通过英文报名之后才能进行英文报名。 一般报了机构的学员&#xff0c;机构都会提供英文代报名服务。 而中文报名因为涉及预约考点&…

证券从业资格考试 超全指南

一、考试科目 分为一般从业资格考试、专项业务类资格考试和管理类资格考试三种情况。 一般从业资格考试&#xff0c;即“入门资格考试”&#xff0c;主要面向即将进入证券业从业的人员&#xff0c;具体测试考生是否具备证券从业人员执业所需专业基础知识&#xff0c;是否掌握…

PMP扫盲篇2 | PMP报名、缴费、考试那些事儿~~

接上一篇&#xff1a; ​PMP是一种项目管理考试认证&#xff0c;更是一种思路。 抱着考试的思路&#xff0c;你必须至少把PMBOK完整学下来&#xff1b;抱着学习PMP知识和思路的态度&#xff0c;你要终生阅读PMI的各种guide和参加各种pmp分享会、讨论会——因为你必须不停的学…

2022年注册会计师(CPA)考试模拟题及答案

1、股份有限公司的下列股票发行方式中&#xff0c;不需要证监会核准的是&#xff08;  &#xff09;。 A.上市公司发行新股 B.非公众公司非公开发行股票&#xff0c;发行后股东人数为80人 C.非公众公司向特定对象发行股票&#xff0c;发行后股东人数为210人 D.非上市公众…

PMP澳门机考3A学员考试攻略

&#xff08;刚到澳门&#xff0c;考试前一晚&#xff0c;寻找考场&#xff09; 备考篇 如何高效学习&#xff1f; 项目整合管理大概13章节&#xff0c;每次直播上课会讲其中的几个章节。 在上课前最好可以预习下讲义&#xff0c;很多内容并不是非得在课上才能获得&#xff0c;…

1. python学习基础

这里写目录标题 python学习总结python学习参考网址环境配置⚖ 包管理说明&#xff08;涉及包的位置&#xff0c;包的查找原理&#xff0c;如何设置模块&#xff0c;__init.py__&#xff09;conda和pip换源conda创建激活和切换环境pip进行包的管理&#x1fa9d;[pip官网&#xf…