kafka广播消费组停机后未删除优化

背景

kafka广播消息的时候为了保证groupId不重复,再创建的时间采用前缀+时间戳的形式,这样可以保证每次启动的时候是创建的新的,但是

会出现一个问题:就是每次停机或者重启都会新建一个应用实例,关闭应用后并不会删除kafka下面的消费组,导致消费组越来越多,目前

我们有promethes监控kafka消息偏移,一直没有消费的消费组就会进行报警;

解决思路

既然是没有删除消费组就通过优雅停机,应用关闭前采用java的api操作kafka消费组,进行删除

代码实现

1)编写类实现DisposableBean接口,实现destroy方法,注意每个项目定义的id会不一样,此例子中 id = “cfgs-broadcast”

package com.simo.vsim.cfgs.init;import com.alibaba.nacos.api.config.annotation.NacosValue;
import lombok.Data;
import lombok.extern.slf4j.Slf4j;
import org.apache.kafka.clients.admin.AdminClient;
import org.apache.kafka.clients.admin.AdminClientConfig;
import org.apache.kafka.clients.admin.DeleteConsumerGroupsResult;
import org.apache.kafka.common.KafkaFuture;
import org.springframework.beans.factory.DisposableBean;
import org.springframework.beans.factory.InitializingBean;
import org.springframework.kafka.config.KafkaListenerEndpointRegistry;
import org.springframework.kafka.listener.MessageListenerContainer;
import org.springframework.stereotype.Component;import javax.annotation.Resource;
import java.util.Arrays;
import java.util.HashMap;
import java.util.Map;@Data
@Component
@Slf4j
public class ApplicationListen implements InitializingBean, DisposableBean {@Resourceprivate KafkaListenerEndpointRegistry registry;@NacosValue(value = "${spring.kafka.bootstrap-servers}", autoRefreshed = true)private String servers;@Overridepublic void destroy()  {MessageListenerContainer listenerContainer = registry.getListenerContainer("cfgs-broadcast");String groupId = listenerContainer.getGroupId();Map<String, Object> props = new HashMap<>(1);props.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG,servers);AdminClient adminClient = AdminClient.create(props);DeleteConsumerGroupsResult deleteConsumerGroupsResult = adminClient.deleteConsumerGroups(Arrays.asList(groupId));KafkaFuture resultFuture = deleteConsumerGroupsResult.all();try {resultFuture.get();log.info("kafka关闭消费组="+groupId);} catch (InterruptedException e) {e.printStackTrace();} catch (Exception e) {e.printStackTrace();}adminClient.close();}@Overridepublic void afterPropertiesSet() {}
}

2)接收kafka广播消息的时候指定容器id,用于第一步通过id进行删除,id = “cfgs-broadcast”

/*** groupId不一样代表广播模式,earliest 可能重复消费,latest可能漏消费* @param message* @param ack*/
@KafkaListener(containerFactory = "manualImmediateListenerContainerFactory" , topics = {"${kafka.topic.cfgs-broadcast}"},properties = {"auto.offset.reset=latest"},groupId = "cfgs-broadcast-" + "#{T(java.lang.System).currentTimeMillis()}",idIsGroup = false,id = "cfgs-broadcast")
public void onMessageManualBroadcast(List<Object> message, Acknowledgment ack){message.forEach(item -> handleMsg(2,item));//直接提交offsetack.acknowledge();
}

效果

1)正常启动有这个消费组:cfgs-broadcast-1696754926097

2)重新启动,通过日志显示已经删除(k8s默认是优雅停机)
在这里插入图片描述
如果是iead直接关闭下,不要一下子点击两下停止,点击一次是优雅停机,连续点击2次就是kill -9的效果,就无法看到效果
![在这里插入图片描述](https://img-blog.csdnimg.cn/d36947cdd8f048acaa886eadafeaa34b.png

3)查看kafka消费组,确实已经删除

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.rhkb.cn/news/158764.html

如若内容造成侵权/违法违规/事实不符,请联系长河编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

VUE3页面截取部署后的二级目录地址

用vue3开发了一个项目&#xff0c;只能部署在根目录&#xff0c;不能加二级目录&#xff0c;后来网上找了解决方案&#xff0c;在vite.config.ts中增加base: ./,配置解决问题&#xff0c;参考下图&#xff1a; 但部署后要获取部署的二级目录地址切遇到问题&#xff0c;后来想了…

发布npm包质量分测试

查询质量分接口 https://registry.npmjs.org/-/v1/search?textcanvas-plus v0.0.1 quality 0.2987 新建文件夹 canvas-plus 执行命令 npm init 生成package.json {"name": "3r/canvas-plus","version": "0.0.1","descript…

Profinet主站转EtherNET/IP从站连接profinet从站设备方法

YC-PNM-EIP网关产品可以通过各种数据接口与工业领域的仪表、PLC、计量设备等产品连接&#xff0c;实时采集这些设备中的运行数据、状态数据等信息&#xff0c;并将采集的数据进行整合、运算等操作后传输到其他设备或云平台。网关可采集Profinet EtherCAT设备的实时数据采集&…

大数据Doris(十):添加BE步骤

文章目录 添加BE步骤 一、使用mysql连接 二、​​​​​​​添加be

微信小程序支持h5实现webrtc h264 h265低延迟传输渲染

微信小程序自成体系&#xff0c;自身也带了很强的rtc音视频能力&#xff0c;但是他捆绑了他自己的服务&#xff0c;开发也相对受限于他的api。基于以前的了解可以采webview的方式内嵌h5网址来实现自定义的webrtc.但实践起来并不轻松&#xff0c;主要是小程序的严格限制&#xf…

Spring MVC 十一:@EnableWebMvc

我们从两个角度研究EnableWebMvc&#xff1a; EnableWebMvc的使用EnableWebMvc的底层原理 EnableWebMvc的使用 EnableWebMvc需要和java配置类结合起来才能生效&#xff0c;其实Spring有好多Enablexxxx的注解&#xff0c;其生效方式都一样&#xff0c;通过和Configuration结合…

搭建Umijs环境并创建一个项目 介绍基本操作

上文 Umijs介绍中 我们简单了解了一下这个东西 这里 我们还是不要关说不练了 直接终端执行 npm install -g umi可能会比较久 因为这个东西还挺大的 我们执行 umi -v这里就会输出版本 然后 我们创建一个文件夹目录 作为项目目录 然后 我们可以 通过 终端输入 mkdir 项目名称…

07 | @Entity 之间的关联关系注解如何正确使用?

实体与实体之间的关联关系一共分为四种&#xff0c;分别为 OneToOne、OneToMany、ManyToOne 和 ManyToMany&#xff1b;而实体之间的关联关系又分为双向的和单向的。实体之间的关联关系是在 JPA 使用中最容易发生问题的地方&#xff0c;接下来我将一一揭晓并解释。我们先看一下…

用 Three.js 创建一个酷炫且真实的地球

接下来我会分步骤讲解&#xff0c;在线示例在数字孪生平台。 先添加一个球体 我们用threejs中的SphereGeometry来创建球体&#xff0c;给他贴一张地球纹理。 let earthGeo new THREE.SphereGeometry(10, 64, 64) let earthMat new THREE.MeshStandardMaterial({map: albed…

【汇编语言学习笔记】一、基础知识

引言 汇编语言是直接在硬件之上工作的编程语言&#xff0c;首先要了解硬件系统的结构&#xff0c;才能有效的应用汇编语言对其编程。 1.1机器语言 机器语言是机器指令的集合。 机器指令展开来讲就是一台机器可以正确执行的命令。 1.2汇编语言 汇编语言的主体是汇编指令。 …

【云计算】相关解决方案介绍

文章目录 1.1 云服务环境 Eucalyptus1.1.1 介绍1.1.2 开源协议及语言1.1.3 官方网站 1.2 开源云计算平台 abiCloud1.2.1 开源协议及语言1.2.2 官方网站 1.3 分布式文件系统 Hadoop1.3.1 开源协议及语言1.3.2 官方网站 1.4 JBoss云计算项目集 StormGrind1.4.1 开源协议及语言1.4…

uniapp打包配置

安卓&#xff1a; 首先不管是什么打包都需要证书&#xff0c;安卓的证书一般都是公司提供或者自己去申请。然后把包名等下图框住的信息填上&#xff0c;点击打包即可。 ios&#xff1a;ios需要使用mac到苹果开发者平台去申请证书&#xff0c;流程可以参考下边的链接 参考链接…

解决git在window11操作很慢,占用很大cpu的问题

【git在window11操作很慢&#xff0c;占用很大cpu&#xff0c;最后也执行失败】 在谷歌输入&#xff1a;git very slow in window 11。通过下面链接终于找到了解决方案&#xff1a; https://www.reddit.com/r/vscode/comments/sulebx/slow_git_in_wsl_after_updating_to_window…

【Java基础知识】

文章目录 前言1. 添加环境变量的方法2.JDK JRE JVM3.main方法4.注释5.标识符6.数据类型与变量6.1字节&#xff1a;6.2变量6.3找最大值和最小值范围整型变量浮点型变量字符型变量布尔型变量 前言 最近焦虑迷茫&#xff0c;学习过程中遇到了困难&#xff0c;所以想重新复习一遍S…

【C++】适配器模式 - - stack/queue/deque

目录 一、适配器模式 1.1迭代器模式 1.2适配器模式 二、stack 2.1stack 的介绍和使用 2.2stack的模拟实现 三、queue 3.1queue的介绍和使用 3.2queue的模拟实现 四、deque&#xff08;不满足先进先出&#xff0c;和队列无关&#xff09; 4.1deque的原理介绍 4.2dequ…

在线兴趣教学类线上学习APP应用开发部署程序组建研发团队需要准备什么?

哈哈哈&#xff0c;同学们&#xff0c;我又来了&#xff0c;这个问题最近问的人有点多&#xff0c;但是说实话我也不知道&#xff0c;但是我还是总结了一下&#xff0c;毕竟我懂点代码的皮毛&#xff0c;同时我检索内容的时候&#xff0c;都是一些没有很新鲜的文案&#xff0c;…

大数据笔记-大数据处理流程

大家对大数据处理流程大体上认识差不多&#xff0c;具体做起来可能细节各不相同&#xff0c;一幅简单的大数据处理流程图如下&#xff1a; 1&#xff09;数据采集&#xff1a;数据采集是大数据处理的第一步。 数据采集面对的数据来源是多种多样的&#xff0c;包括各种传感器、社…

极简c++(4)类的静态成员

静态数据成员 ::是作用域操作符&#xff01; #include<iostream> using namespace std;class Point{private:int x,y;public:point(int x 0,int y 0):x(x),y(y){}~point();int getX(){return x;}int getY(){return x;} }假设需要统计点的个数&#xff0c;考虑添加一个…

计算机网络 | 网络层

计算机网络 | 网络层 计算机网络 | 网络层功能概述SDN&#xff08;Software-Defined Networking&#xff09;路由算法与路由协议IPv4IPv4 分组IPv4 分组的格式IPv4 数据报分片 参考视频&#xff1a;王道计算机考研 计算机网络 参考书&#xff1a;《2022年计算机网络考研复习指…

【VSCode】Windows环境下,VSCode 搭建 cmake 编译环境(VSCode 插件配置)

目录 一、下载编译器 1、下载 Windows GCC 2、选择编译器路径 二、下载插件 三、配置 cmake generator 四、编译工程 一、下载编译器 1、下载 Windows GCC 这里是在Windows环境下&#xff0c;所以下载的是 Windows 环境使用的 gcc 编译器。 下载地址: MinGW-w64 - for…