Sprint Boot教程之五十八:动态启动/停止 Kafka 监听器

Spring Boot – 动态启动/停止 Kafka 监听器

当 Spring Boot 应用程序启动时,Kafka Listener 的默认行为是开始监听某个主题。但是,有些情况下我们不想在应用程序启动后立即启动它。

动态启动或停止 Kafka Listener,我们需要三种主要方法,即在需要处理 Kafka 消息时启动/停止、使用@KafkaListener注释、使用 kafkaListenerEndpointRegistry

在本文中,我们将介绍如何动态启动或停止 Kafka 监听器。

启动/停止 Kafka 监听器的不同方法

方法一

  • 当需要处理 Kafka 消息时,启动一个应用程序。
  • 处理成功后停止应用程序。

方法 2:在注册 Kafka Listener 时,我们可以设置以下 id 属性。

@KafkaListener(id = "id-1", groupId = "group-1", topics = "Message-topic", containerFactory = "messageListenerFactory", autoStartup = "false")public void consumeMessage(Message message)

方法 3:自动连接KafkaListenerEndpointRegistry bean 来控制 Kafka Listener 的启动或停止。

@AutowiredKafkaListenerEndpointRegistry kafkaListenerEndpointRegistry;

开始:

 public boolean startListener(String listenerId) {MessageListenerContainer listenerContainer = kafkaListenerEndpointRegistry.getListenerContainer(listenerId);assert listenerContainer != null : false;listenerContainer.start();

停止:

public boolean stopListener(String listenerId) {MessageListenerContainer listenerContainer = kafkaListenerEndpointRegistry.getListenerContainer(listenerId);assert listenerContainer != null : false;listenerContainer.stop();logger.info("{} Kafka Listener Stopped.", listenerId);

下面我们将以上述句法方法为例进行实现。

启动或停止特定 Kafka Listener的实现

创建一个类,其对象将被 Kafka 侦听器使用。

文件:Message.java

import lombok.AllArgsConstructor;
import lombok.Data;
import lombok.NoArgsConstructor;@Data
@AllArgsConstructor
@NoArgsConstructor
public class Message {private String message;
}

配置 Kafka Listener 将使用的消费者。

文件:KakfaConsumerConfig.java

import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.kafka.config.ConcurrentKafkaListenerContainerFactory;
import org.springframework.kafka.core.ConsumerFactory;
import org.springframework.kafka.core.DefaultKafkaConsumerFactory;
import org.springframework.kafka.support.serializer.JsonDeserializer;import java.util.HashMap;
import java.util.Map;@Configuration
public class KafkaConsumerConfig {private String kafkaUrl = "localhost:9092";@Beanpublic ConsumerFactory<String, Message> messageConsumerFactory() {JsonDeserializer<Message> deserializer = new JsonDeserializer<>(Message.class, false);deserializer.setRemoveTypeHeaders(false);deserializer.addTrustedPackages("*");deserializer.setUseTypeMapperForKey(true);Map<String, Object> config = new HashMap<>();config.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, kafkaUrl);config.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);config.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, JsonDeserializer.class);config.put(ConsumerConfig.GROUP_ID_CONFIG, "group-1");return new DefaultKafkaConsumerFactory<>(config, new StringDeserializer(), deserializer);}@Beanpublic ConcurrentKafkaListenerContainerFactory<String, Message> messageListenerFactory() {ConcurrentKafkaListenerContainerFactory<String, Message> containerFactory = new ConcurrentKafkaListenerContainerFactory();containerFactory.setConsumerFactory(messageConsumerFactory());return containerFactory;}
}

创建一个具有必要参数的 Kafka 监听器。

  • id:此侦听器的容器唯一标识符。如果未指定,则使用自动生成的 ID。
  • groupId:仅为该监听器使用该值覆盖消费者工厂的 group.id 属性。
  • 主题:此侦听器的主题。条目可以是“主题名称”、“属性占位符键”或“表达式”。主题名称必须从表达式解析。这使用组管理,Kafka 将为组成员分配分区。
  • containerFactory:KafkaListenerContainerFactory的 bean 名称,将用于创建为该端点提供服务的消息侦听器容器。
  • autoStartup:设置为 true 或 false 以覆盖容器工厂的默认设置。默认情况下,该值设置为 true,因此,它将在我们的应用程序启动时立即开始使用消息。

文件:KafkaMessageListener.java

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.context.annotation.Configuration;
import org.springframework.kafka.annotation.KafkaListener;@Configuration
public class KafkaMessageListener {Logger logger = LoggerFactory.getLogger(KafkaMessageListener.class);@KafkaListener(id = "id-1", groupId = "group-1", topics = "Message-topic", containerFactory = "messageListenerFactory", autoStartup = "false")public void consumeMessage(Message message) {logger.info("Message received : -> {}", message);}
}

KafkaListenerEndpointRegistry 类可用于通过 listenerId 获取 Kafka 侦听器容器。这里我们使用了@KafkaListener注释来将 bean 方法声明为 Kafka 侦听器容器的侦听器。现在可以使用此容器启动或停止 Kafka 侦听器。

文件:KafkaListenerAutomation.java

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.kafka.config.KafkaListenerEndpointRegistry;
import org.springframework.kafka.listener.MessageListenerContainer;
import org.springframework.stereotype.Component;@Component
public class KafkaListenerAutomation {private final Logger logger = LoggerFactory.getLogger(KafkaListenerAutomation.class);@AutowiredKafkaListenerEndpointRegistry kafkaListenerEndpointRegistry;public boolean startListener(String listenerId) {MessageListenerContainer listenerContainer = kafkaListenerEndpointRegistry.getListenerContainer(listenerId);assert listenerContainer != null : false;listenerContainer.start();logger.info("{} Kafka Listener Started", listenerId);return true;}public boolean stopListener(String listenerId) {MessageListenerContainer listenerContainer = kafkaListenerEndpointRegistry.getListenerContainer(listenerId);assert listenerContainer != null : false;listenerContainer.stop();logger.info("{} Kafka Listener Stopped.", listenerId);return true;}
}

使用 API 端点,我们可以通过提供 listenerID 来启动或停止特定的 Kafka 监听器。

文件:StartOrStopListenerController.java

import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.RequestParam;
import org.springframework.web.bind.annotation.RestController;@RestController
public class StartOrStopListenerController {@AutowiredKafkaListenerAutomation kafkaListenerAutomation;@GetMapping("/start")public void start(@RequestParam("id") String listenerId) {kafkaListenerAutomation.startListener(listenerId);}@GetMapping("/stop")public void stop(@RequestParam("id") String listenerId) {kafkaListenerAutomation.stopListener(listenerId);}
}

输出:

1.Kafka Listener启动:

2.Kafka Listener 收到消息:

3. Kafka Listener 停止:

最后

理想情况下,应用程序应在需要处理 Kafka 消息时启动,并在该过程完成后立即停止。限制 Kafka 侦听器以有效利用它是一种很好的做法。

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.rhkb.cn/news/2353.html

如若内容造成侵权/违法违规/事实不符,请联系长河编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

编译pytorch——cuda-toolkit-nvcc

链接 https://blog.csdn.net/wjinjie/article/details/108997692https://docs.nvidia.com/cuda/cuda-installation-guide-linux/#switching-between-driver-module-flavorshttps://forums.developer.nvidia.com/t/can-not-load-nvidia-drivers-on-ubuntu-22-10/239750https://…

如何发布自己的第一个Chrome扩展程序

如何发布自己的Chrome扩展程序 只需要六步即可完成Chrome扩展程序的发布 &#xff08;1&#xff09;首先打开google chrome 应用商城注册开发者账号的页面 &#xff08;2&#xff09;现在进行一个绑卡支付5美元的一次性注册费用即可。【不知道如何绑卡的支付的&#xff0c;文…

SpringBoot入门实现简单增删改查

本例子的依赖 要实现的内容 通过get、post、put和delete接口,对数据库中的trade.categories表进行增删改查操作。 目录结构 com.test/ │ ├── controller/ │ ├── CateController.java │ ├── pojo/ │ ├── dto/ │ │ └── CategoryDto.java │ ├─…

electron 如何申请 Mac 系统权限

对于一些使用 Electron开发的app, 需要获取一些系统权限,比如录屏权限, 获取摄像头权限,麦克风等等,类似于以下界面: 那么Electron App 应该如何申请呢? 首先我们明确一下macOS中基础权限的分类,可以分为以下几种: 隐私权限(Private Permissions) : <!-- entitlements.ma…

浅谈云计算02 | 云计算模式的演进

云计算计算模式的演进 一、云计算计算模式的起源追溯1.2 个人计算机与桌面计算 二、云计算计算模式的发展阶段2.1 效用计算的出现2.2 客户机/服务器模式2.3 集群计算2.4 服务计算2.5 分布式计算2.6 网格计算 三、云计算计算模式的成熟与多元化3.1 主流云计算服务模式的确立3.1.…

An FPGA-based SoC System——RISC-V On PYNQ项目复现

本文参考&#xff1a; &#x1f449; 1️⃣ 原始工程 &#x1f449; 2️⃣ 原始工程复现教程 &#x1f449; 3️⃣ RISCV工具链安装教程 1.准备工作 &#x1f447;下面以LOCATION代表本地源存储库的安装目录&#xff0c;以home/xilinx代表在PYNQ-Z2开发板上的目录 ❗ 下载Vivad…

AI智能体实战|使用扣子Coze搭建AI智能体,看这一篇就够了(新手必读)

有朋友看到我使用Coze搭建的AI智能体蛮实用的&#xff0c;也想自己尝试一下。那今天我就分享一下如何使用Coze&#xff08;扣子&#xff09;搭建AI智能体&#xff0c;手把手教学&#xff0c;流程超级详细&#xff0c;学会了的话&#xff0c;欢迎分享转发&#xff01; 一、搭建A…

.NET8.0多线程编码结合异步编码示例

1、创建一个.NET8.0控制台项目来演示多线程的应用 2、快速创建一个线程 3、多次运行程序&#xff0c;可以得到输出结果 这就是多线程的特点 - 当多个线程并行执行时&#xff0c;它们的具体执行顺序是不确定的&#xff0c;除非我们使用同步机制&#xff08;如 lock、信号量等&am…

nginx 实现 正向代理、反向代理 、SSL(证书配置)、负载均衡 、虚拟域名 ,使用其他中间件监控

我们可以详细地配置 Nginx 来实现正向代理、反向代理、SSL、负载均衡和虚拟域名。同时&#xff0c;我会介绍如何使用一些中间件来监控 Nginx 的状态和性能。 1. 安装 Nginx 如果你还没有安装 Nginx&#xff0c;可以通过以下命令进行安装&#xff08;以 Ubuntu 为例&#xff0…

《数据思维》之数据可视化_读书笔记

文章目录 系列文章目录前言一、pandas是什么&#xff1f;二、使用步骤 1.引入库2.读入数据总结 前言 数据之道&#xff0c;路漫漫其修远兮&#xff0c;吾将上下而求索。 一、数据可视化 最基础的数据可视化方法就是统计图。一个好的统计图应该满足四个标准&#xff1a;准确、有…

Linux之进程

Linux之进程 一.进程进程之形ps命令进程状态特殊进程孤儿进程守护进程 进程创建之创建子进程进程特性优先级进程切换&#xff08;分时操作系统&#xff09; 二.环境变量三.进程地址空间四.进程终止&进程等待五.进程替换六.自定义shell 本篇博客希望简略的介绍进程&#xff…

漫话架构师|什么是系统架构设计师(开篇)

~犬&#x1f4f0;余~ “我欲贱而贵&#xff0c;愚而智&#xff0c;贫而富&#xff0c;可乎&#xff1f; 曰&#xff1a;其唯学乎” 关注犬余&#xff0c;共同进步 技术从此不孤单

在AI智能中有几种重要的神经网络类型?6种重要的神经网络类型分享!

神经网络今天已经变得非常流行&#xff0c;但仍然缺乏对它们的了解。一方面&#xff0c;我们已经看到很多人无法识别各种类型的神经网络及其解决的问题&#xff0c;更不用说区分它们中的每一个了。其次&#xff0c;在某种程度上更糟糕的是&#xff0c;当人们在谈论任何神经网络…

业务幂等性技术架构体系之消息幂等深入剖析

在系统中当使用消息队列时&#xff0c;无论做哪种技术选型&#xff0c;有很多问题是无论如何也不能忽视的&#xff0c;如&#xff1a;消息必达、消息幂等等。本文以典型的RabbitMQ为例&#xff0c;讲解如何保证消息幂等的可实施解决方案&#xff0c;其他MQ选型均可参考。 一、…

【C语言】线程----同步、互斥、条件变量

目录 3. 同步 3.1 概念 3.2 同步机制 3.3 函数接口 1. 同步 1.1 概念 同步(synchronization)指的是多个任务(线程)按照约定的顺序相互配合完成一件事情 1.2 同步机制 通过信号量实现线程间的同步 信号量&#xff1a;通过信号量实现同步操作&#xff1b;由信号量来决定…

Linux内核的启动

一、需求 Linux系统中内核处于硬件和应用层之间。整个系统启动和初始化的过程&#xff0c;Linux内核是在主处理器启动之后才会执行。不同的处理器启动流程并不相同&#xff0c;这就要求内核能支持各种处理器的初始化操作。Liux内核各个模块&#xff0c;大部分设计时做到了体系…

[手机Linux] ubuntu 错误解决

Ubuntu: 1,ttyname failed: Inappropriate ioctl for device 将 /root/.profile 文件中的 mesg n || true 改为如下内容。 vim /root/.profile tty -s && mesg n || true 2,Errors were encountered while processing: XXX XXXX sudo apt-get --purge remove xxx…

Docker的入门

一、安装Docker 本教程参考官网文档&#xff0c;链接如下: CentOS | Docker Docs 这个教程是基于你的虚拟机已经弄好了&#xff08;虚拟机用的CentOS&#xff09;&#xff0c;并且有SecureCRT或者MobaXterm等等任意一个工具 1.1 卸载旧版 如果系统中存在旧版本的Docker&a…

Onedrive精神分裂怎么办(有变更却不同步)

Onedrive有时候会分裂&#xff0c;你在本地删除文件&#xff0c;并没有同步到云端&#xff0c;但是本地却显示同步成功。 比如删掉了一个目录&#xff0c;在本地看已经删掉&#xff0c;onedrive显示已同步&#xff0c;但是别的电脑并不会同步到这个删除操作&#xff0c;在网页版…

机器学习06-正则化

机器学习06-正则化 文章目录 机器学习06-正则化0-核心逻辑脉络1-参考网址3-大模型训练中的正则化1.正则化的定义与作用2.常见的正则化方法及其应用场景2.1 L1正则化&#xff08;Lasso&#xff09;2.2 L2正则化&#xff08;Ridge&#xff09;2.3 弹性网络正则化&#xff08;Elas…