使用微服务Spring Cloud集成Kafka实现异步通信(消费者)

1、本文架构

本文目标是使用微服务Spring Cloud集成Kafka实现异步通信。其中Kafka Server部署在Ubuntu虚拟机上,微服务部署在Windows 11系统上,Kafka Producer微服务和Kafka Consumer微服务分别注册到Eureka注册中心。Kafka Producer和Kafka Consumer之间通过Kafka Server实现异步通信。

出于便于测试的目的,我通过浏览器触发Kafka Producer发送消息,观察Kafka Consumer的后台是否打印出接收到的消息内容。

Ubuntu 上部署Kafka Server,详见博文:Ubuntu下Kafka安装及使用-CSDN博客

Eureka注册中心的搭建过程和完整代码,详见博文:微服务1:搭建微服务注册中心(命令行简易版,不使用IDE)-CSDN博客

Kafka Producer微服务的完整代码,详见博文:使用微服务Spring Cloud集成Kafka实现异步通信-CSDN博客

本文的重点是实现下图中的深蓝色部分:Kafka Consumer微服务。

2、创建Spring boot项目(Kafka Consumer微服务项目):

mvn archetype:generate -DgroupId=com.test -DartifactId=microservice-kafka-consumer -DarchetypeArtifactId=maven-archetype-quickstart

项目代码的完整目录如下图所示:

编辑pom.xml,添加依赖包:

    <dependency>
      <groupId>org.springframework.cloud</groupId>
      <artifactId>spring-cloud-stream-binder-kafka</artifactId>
    </dependency>

<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/maven-v4_0_0.xsd"><modelVersion>4.0.0</modelVersion><groupId>com.test</groupId><artifactId>microservice-kafka-consumer</artifactId><packaging>jar</packaging><version>1.0-SNAPSHOT</version><name>microservice-kafka-consumer</name><url>http://maven.apache.org</url><parent><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-parent</artifactId><version>2.3.0.RELEASE</version><relativePath/> </parent><dependencies><dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-web</artifactId></dependency><dependency><groupId>org.springframework.cloud</groupId><artifactId>spring-cloud-starter-netflix-eureka-client</artifactId></dependency>         <dependency><groupId>junit</groupId><artifactId>junit</artifactId><version>3.8.1</version><scope>test</scope></dependency><dependency><groupId>org.springframework.cloud</groupId><artifactId>spring-cloud-stream-binder-kafka</artifactId></dependency></dependencies><dependencyManagement><dependencies><dependency><groupId>org.springframework.cloud</groupId><artifactId>spring-cloud-dependencies</artifactId><version>Hoxton.SR4</version><type>pom</type><scope>import</scope></dependency>               </dependencies></dependencyManagement><build><plugins><plugin><groupId>org.springframework.boot</groupId><artifactId>spring-boot-maven-plugin</artifactId></plugin></plugins></build></project>

编辑application.yml,配置kafka消费者:

consumer:
      #消费的主题
      topic: test-topic
      #消费者组id
      group-id: test-group
      #是否自动提交偏移量
      enable-auto-commit: true
      #提交偏移量的间隔-毫秒
      auto-commit-ms: 1000
      #客户端消费的会话超时时间-毫秒
      session-timeout-ms: 10000
      #实现DeSerializer接口的反序列化类键
      key-deserializer: org.apache.kafka.common.serialization.StringDeserializer
      #实现DeSerializer接口的反序列化类值
      value-deserializer: org.apache.kafka.common.serialization.StringDeserializer

server:port: 8030
spring:application:name: microservice-kafka-consumerkafka:bootstrap-servers: 192.168.23.131:9092consumer:group-id: test-groupenable-auto-commit: trueauto-commit-ms: 1000session-timeout-ms: 10000key-deserializer: org.apache.kafka.common.serialization.StringDeserializervalue-deserializer: org.apache.kafka.common.serialization.StringDeserializereureka:client:serviceUrl:defaultZone: http://localhost:8080/eureka/instance:prefer-ip-address: true            

App.java的完整代码如下:

package com.test;import org.springframework.boot.ApplicationRunner;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.cloud.client.discovery.EnableDiscoveryClient;
import org.springframework.kafka.annotation.KafkaListener;@SpringBootApplication
@EnableDiscoveryClient
public class App 
{@KafkaListener(topics = "mydemo1")public void listen(String msg) throws Exception {System.out.println( "-----> Recv a msg: " + msg );}public static void main( String[] args ){System.out.println( "Hello World!" );SpringApplication.run(App.class, args);}
}

3、测试

在浏览器输入,触发Kafka Producer向Kafka Server发送消息:

http://localhost:8020/kafka/sendMsg?msg=测试消息testmsg

在Kafka Consumer的后台打印出收到的消息:

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.rhkb.cn/news/438084.html

如若内容造成侵权/违法违规/事实不符,请联系长河编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

Mybatis框架梳理

Mybatis框架梳理 前言1.ORM2.模块划分2.1 ORM的实现2.2 SQL的映射2.3 插件机制2.4 缓存机制2.5 其他 3. 愿景 前言 如果让我聊一聊mybatis&#xff0c;我该怎么说呢&#xff1f;开发中时时刻刻都在用它&#xff0c;此时此刻&#xff0c;脑海中却只浮现ORM框架这几个字&#xff…

[每周一更]-(第117期):硬盘分区表类型:MBR和GPT区别

文章目录 1. **支持的磁盘容量**2. **分区数量**3. **引导方式**4. **冗余和数据恢复**5. **兼容性**6. **安全性**7. **操作系统支持**8. 对比 国庆假期前补一篇 在一次扫描机械硬盘故障的问题&#xff0c;发现我本机SSD和机械硬盘的分类型不一样&#xff0c;分别是GPT和MBR&a…

Vue3轻松实现前端打印功能

文章目录 1.前言2.安装配置2.1 下载安装2.2 main.js 全局配置3.综合案例3.1 设置打印区域3.2 绑定打印事件3.3 完整代码4.避坑4.1 打印表格无边框4.2 单选框复选框打印不选中4.3 去除页脚页眉4.4 打印內容不自动换行1.前言 vue3 前端打印功能主要通过插件来实现。 市面上常用的…

C语言 | Leetcode C语言题解之第450题删除二叉搜索树中的节点

题目&#xff1a; 题解&#xff1a; struct TreeNode* deleteNode(struct TreeNode* root, int key){struct TreeNode *cur root, *curParent NULL;while (cur && cur->val ! key) {curParent cur;if (cur->val > key) {cur cur->left;} else {cur c…

telnet发送邮件教程:安全配置与操作指南?

telnet发送邮件的详细步骤&#xff1f;怎么用telnet命令发邮件&#xff1f; 尽管现代邮件客户端和服务器提供了丰富的功能和安全性保障&#xff0c;但在某些特定场景下&#xff0c;了解如何使用telnet发送邮件仍然是一项有价值的技能。AokSend将详细介绍如何安全配置和操作tel…

github/git密钥配置与使用

零、前言 因为要在ubuntu上做点东西&#xff0c;发现git clone 的时候必须输账户密码&#xff0c;后来发现密码是token&#xff0c;但是token一大串太烦了&#xff0c;忙了一天发现可以通过配置 公钥 来 替代 http 的 部署方式。 一、生成 ssh 密钥对 我们先测试下能不能 连接…

C语言复习概要(一)

本文 C语言入门详解&#xff1a;从基础概念到分支与循环1. C语言常见概念1.1 程序的基本结构1.2 变量作用域和存储类1.3 输入输出1.4 编译与运行 2. C语言中的数据类型和变量2.1 基本数据类型2.2 变量的声明与初始化2.3 常量与枚举 3. C语言的分支结构3.1 if语句3.2 if-else语句…

0108 Spring Boot启动过程

Spring Boot 的启动过程可以分为以下几个关键步骤&#xff1a; 1. SpringApplication 初始化 Spring Boot 应用的启动是通过调用 SpringApplication.run() 方法完成的。在这个过程中&#xff0c;Spring Boot 会通过 SpringApplication 类对应用进行初始化&#xff0c;包括设置…

国庆节快乐前端(HTML+CSS+JavaScript+BootStrap.min.css)

一、效果展示 二、制作缘由 最近&#xff0c;到了国庆节&#xff0c;自己呆在学校当守校人&#xff0c;太无聊了&#xff0c;顺便做一个小demo帮祖国目前庆生&#xff01;&#xff01;&#xff01; 三、项目目录结构 四、准备工作 (1)新建好对应的文件目录 为了方便&#xff…

Linux驱动开发(速记版)--设备树插件

第六十八章 设备树插件介绍 Linux 4.4之后引入了动态设备树&#xff0c;其中的设备树插件&#xff08;Device Tree Overlay&#xff09;是一种扩展机制&#xff0c;允许在运行时动态添加、修改或删除设备节点和属性。 设备树插件机制通过DTS&#xff08;设备树源文件&#xff0…

挖矿病毒记录 WinRing0x64.sys

之前下载过福晰pdf编辑器&#xff0c;使用正常。 某天发现机器启动后&#xff0c;过个几分钟(具体为5min)会自动运行几个 cmd 脚本(一闪而过)&#xff0c;但是打开任务管理器没有发现异常程序&#xff08;后面发现病毒程序伪装成System系统程序&#xff0c;见下图&#xff09;…

SpringCloud Config配置中心 SpringCloud Bus消息总线

一、SpringCloud Config 使用git储存配置信息 1&#xff09;什么是 SpringCloud Config项目实现的目标是将配置文件从本地项目中抽出来放到git仓库中&#xff0c;项目启动时自动从git仓库中取配置文件。 但是本地项目不直接和git仓库通信&#xff0c;而是通过配置服务器中转。…

python如何查询函数

1、通用的帮助函数help() 使用help()函数来查看函数的帮助信息。 如&#xff1a; import requests help(requests) 会有类似如下输出&#xff1a; 2、查询函数信息 ★查看模块下的所有函数&#xff1a; dir(module_name) #module_name是要查询的函数名 如&#xff1a; i…

vmvare虚拟机centos 忘记超级管理员密码怎么办?

vmvare虚拟机centos 忘记超级管理员密码怎么办?如何重置密码呢? 一、前置操作 重启vmvare虚拟机的过程中,长按住Shift键 选择第一个的时候,按下按键 e 进入编辑状态。 然后就会进入到类似这个界面中。 在下方界面 添加 init=/bin/sh,然后按下Ctrl+x进行保存退出。 init=/bi…

iPhone、iPad、iOS储存空间不足,瘦身终极方法

如果你实在是需要瘦身&#xff0c;但是确实没有什么可以删除了&#xff0c;也不想备份到其他地方&#xff0c;就这样做。 删除不需要的自带应用。 你可以删除FaceTime、股票、等app&#xff0c;但是不要删除你需要的app。 删除的界限是这样的&#xff1a;你永远都不可能使用…

OceanBase企业级分布式关系数据库

简介 OceanBase 数据库是阿里巴巴和蚂蚁集团不基于任何开源产品&#xff0c;完全自研的原生分布式关系数据库软件&#xff0c;在普通硬件上实现金融级高可用&#xff0c;首创“三地五中心”城市级故障自动无损容灾新标准&#xff0c;具备卓越的水平扩展能力&#xff0c;全球首…

Git版本控制工具--关于命令

Git版本控制工具 学习前言 在项目开发中&#xff0c;总是需要多个人同时对一个项目进行修改&#xff0c;如何高效快速地进行修改&#xff0c;且控制各自修改的版本不会和他人的进行重叠&#xff0c;这就需要用到Git分布式版本控制器了 作用 解决了一致性&#xff0c;并发性…

CSS 圆形边框与阴影

目录 1. 圆角边框 1.1 正圆 1.2 圆角矩形 1.3 任意圆角 1.4 某个圆角 2. 盒子阴影 3. 文字阴影 1. 圆角边框 1.1 正圆 1.2 圆角矩形 1.3 任意圆角 1.4 某个圆角 2. 盒子阴影 3. 文字阴影

Megabit兆比特10月比特币激增做好准备-最新加密货币新闻

Kaiko Research最近的分析表明&#xff0c;交易员正在积极为潜在的强劲表现做好准备特币(BTC)比今年十月。目前&#xff0c;BTC的交易价格为60800美元&#xff0c;在测试了60000美元的支撑位后&#xff0c;最近上涨了800美元。Megabit兆比特自成立以来,Megabit凭借用户友好的界…

【Spring】运行Spring Boot项目,请求响应流程分析以及404和500报错

1. 运行项目 import org.springframework.boot.SpringApplication; import org.springframework.boot.autoconfigure.SpringBootApplication; SpringBootApplication public class Application { public static void main(String[] args) { SpringApplication.run(Appl…