Spring Cloud Stream 3.x+kafka 3.8整合

Spring Cloud Stream 3.x+kafka 3.8整合,文末有完整项目链接

  • 前言
  • 一、如何看官方文档(有深入了解需求的人)
  • 二、kafka的安装
    • tar包安装
    • docker安装
  • 三、代码中集成
    • 创建一个测试topic:test
    • producer代码
    • producer配置(配置的格式,上篇文章我有详细解释,大家可以回看)
    • Consumer代码
    • Consumer 配置
    • Consumer 2的代码和配置
  • 四、测试
  • 五、结语

前言

上一篇文章,我们用Spring Cloud Stream整合了RocketMQ:SpringCloud Alibaba五大组件之——RocketMQ,趁着此机会,继续学习了解一下Spring Cloud Stream,本文就以kafka为例。本文项目用到的所有Maven依赖和版本,都是和前面几篇文章一样。

由于整合kafka 不需要用到Cloud Alibaba一系列的技术,所以下载到源码运行不起来的,请删除mysql,nacos,dubbo,redis等一系列相关的依赖和代码。本文写下的时候,kafka最新版本为3.8版本,所以就以3.8版本举例说明。

官方中文文档:https://kafka1x.apachecn.org/documentation.html
官网文档:https://kafka.apache.org/documentation/
中文文档的版本比较老,建议大家对照着英文文档3.8版本的,相互结合起来看。

一、如何看官方文档(有深入了解需求的人)

1.基础操作:建议大家看operation一栏,后面我会简单贴出基本安装使用流程
在这里插入图片描述
2.配置建议看中文版本
在这里插入图片描述

二、kafka的安装

tar包安装

  1. 下载链接:kafka_2.13-3.8.0.tgz

  2. 选择一个合适的位置解压

    tar -zxvf kafka_2.12-3.8.0.tgz
    
  3. 启动自带的zookeeper(后台启动)

    nohup bin/zookeeper-server-start.sh config/zookeeper.properties &
    
  4. 修改kafka server的配置文件,便于外网能够访问
    找到bin\config目录下的server.properties文件
    修改以下两行listeners照着我这样写,advertised.listeners修改为你服务器的ip,端口默认9092

    listeners=PLAINTEXT://0.0.0.0:9092
    advertised.listeners=PLAINTEXT://172.16.72.133:9092
    
  5. 启动kafka server(后台启动)

    nohup bin/kafka-server-start.sh config/server.properties &
    
  6. 稍微扩展一下,集群的搭建,比如我们要扩展为三个集群代理:
    首先,为每个代理创建一个配置文件 (在Windows上使用copy 命令来代替):

    cp config/server.properties config/server-1.properties
    cp config/server.properties config/server-2.properties
    

    编辑这些新文件并设置如下属性:

    config/server-1.properties:broker.id=1listeners=PLAINTEXT://:9093log.dir=/tmp/kafka-logs-1config/server-2.properties:broker.id=2listeners=PLAINTEXT://:9094log.dir=/tmp/kafka-logs-2
    

    broker.id属性是集群中每个节点的名称,这一名称是唯一且永久的,必须重写端口和日志目录
    然后启动就好了:低一个启动的为leader,如果杀死leader,会重新推荐一个leader出来

    bin/kafka-server-start.sh config/server-1.properties &
    bin/kafka-server-start.sh config/server-2.properties &
    

    但是这样扩展的唯一不好的一点就是,会没有以前的数据,新的topic不影响,具体操作大家可以看文档。

docker安装

  1. 拉取镜像

    docker pull apache/kafka:3.8.0
    
  2. 启动

    docker run -p -d 9092:9092 apache/kafka:3.8.0
    

三、代码中集成

创建一个测试topic:test

bin/kafka-topics.sh --create --topic test --bootstrap-server localhost:9092

所有的topic的操作,都可以用kafka-topics.sh来操作,具体的可以看文档。老版本的启动是加–zookeeper的,会报错not found,新版本要用–bootstrap-server。

producer代码

@RestController
@RequestMapping("/mqtest")
public class KafkaTestController {private static final Logger logger = LoggerFactory.getLogger(KafkaTestController.class);@Autowiredprivate StreamBridge streamBridge;@RequestMapping("/test1")public void testOne() {Message<SimpleMsg> msg = new GenericMessage<>(new SimpleMsg("我是 broadcastMessage", new Date().toString()));streamBridge.send("broadcastMessage-out-0", msg);}
}

自定义消息体SimpleMsg,此类不需要序列化

@AllArgsConstructor
@Data
@NoArgsConstructor
public class SimpleMsg{private String msg;private String time;
}

producer配置(配置的格式,上篇文章我有详细解释,大家可以回看)

key:serializer 重中之重,发送对象消息的时候,解决转换错误,SpringCloudStream默认的是ByteArraySerializer,但是kafkamore默认的是String

spring:cloud:stream:kafka:binder:##kafka的server地址brokers: 172.16.72.133:9092##如果topic不存在则创建auto-create-topics: trueauto-add-partitions: true #自动分区min-partition-count: 1 #最小分区##这个序列化很关键,如果不加这个配置,则发送对象消息时候,会报转换错误configuration:key:serializer: org.apache.kafka.common.serialization.ByteArraySerializerbindings:broadcastMessage-out-0:destination: testcontent-type: application/json

Consumer代码

 @Beanpublic Consumer<Message<SimpleMsg>> broadcastMessage() {return msg -> {log.info(Thread.currentThread().getName() + " Consumer1 Receive New Messages: " + msg.getPayload().getMsg() + msg.getPayload().getTime());};}

Consumer 配置

项目中有更详细的配置,这里为了测试用的简化版

spring:cloud:stream:function:definition: broadcastMessagekafka:binder:brokers: 172.16.72.133:9092auto-create-topics: trueconfiguration:key:serializer: org.apache.kafka.common.serialization.ByteArraySerializerbindings:broadcastMessage-in-0:destination: testgroup: test-topic-accountcontent-type: application/json

Consumer 2的代码和配置

项目中还有一个friend模块,当做第二个消费者,代码和配置和Consumer 1完全一样,唯一不同的就是可以设置group不同,这里就不贴代码了。

四、测试

生产者发送两个消息
在这里插入图片描述
两个消费者实例,分组一样,则轮询消费,分组不同,则单独消费
account模块消费者:
在这里插入图片描述
friend模块消费者:
在这里插入图片描述

五、结语

到这篇文章,这一个系列基本就算结束了,后面可能会补充一下内容,或者去写点其他的东西。或者说,去研究下springboot的集成而不用Spring Cloud Stream,后面再说吧。

本文完整项目代码GitHub地址,请切换到kafka分支
https:https://github.com/wangqing-github/DubboAndNacos.git
ssh:git@github.com:wangqing-github/DubboAndNacos.git

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.rhkb.cn/news/445603.html

如若内容造成侵权/违法违规/事实不符,请联系长河编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

基于SpringBoot+Vue的疫苗预约接种管理系统

作者&#xff1a;计算机学姐 开发技术&#xff1a;SpringBoot、SSM、Vue、MySQL、JSP、ElementUI、Python、小程序等&#xff0c;“文末源码”。 专栏推荐&#xff1a;前后端分离项目源码、SpringBoot项目源码、Vue项目源码、SSM项目源码、微信小程序源码 精品专栏&#xff1a;…

DELL R720服务器阵列数据恢复,磁盘状态为Foreign

服务器无法正常进入系统&#xff0c;物理磁盘状态变成了Foreign 虚拟磁盘状态变成了Failed 阵列已经丢失了&#xff0c;需要手工强制导入外部配置 单击 Main Menu 屏幕上的 Configuration Management。单击 Manage Foreign Configuration 单击 Preview Foreign Configurati…

60. 排列序列【回溯】

文章目录 60. 排列序列解题思路Go代码 60. 排列序列 60. 排列序列 给出集合 [1,2,3,...,n]&#xff0c;其所有元素共有 n! 种排列。 按大小顺序列出所有排列情况&#xff0c;并一一标记&#xff0c;当 n 3 时, 所有排列如下&#xff1a; “123”“132”“213”“231”“31…

VMDK 0X80BB0005 VirtualBOX虚拟机错误处理-数据恢复——未来之窗数据恢复

打开虚拟盘文件in7.vmdk 失败. Could not get the storage format of the medium 7\win7.vmdk (VERR_NOT_SUPPORTED). 返回 代码:VBOX_E_IPRT_ERROR (0X80BB0005) 组件:MediumWrap 界面:IMedium {a a3f2dfb1} 被召者:IVirtualBox {768 cd607} 被召者 RC:VBOX_E_OBJECT_NOT_F…

Qt基础对话框QDialog

模态显示对话框 调用exec方法可以使得对话框模态显示&#xff0c;但是一个阻塞函数 [virtual slot] int QDialog::exec() 对话框的三个槽函数 accept [virtual slot] void QDialog::accept(); reject [virtual slot] void QDialog::reject() done [virtual slot] void QDia…

Nginx从入门到实战(八):版本平滑无感知,不停机升级

一、查看旧版本信息 可以通过nginx -V命令&#xff0c;来查看当前nginx的版本信息&#xff0c;和配置参数。 [rootnb001 sbin]# nginx -V -bash: nginx: command not found [rootnb001 sbin]# ./nginx -V nginx version: nginx/1.20.1 built by gcc 4.8.5 20150623 (Red Hat …

Spring Boot读取resources目录下文件(打成jar可用),并放入Guava缓存

1、文件所在位置&#xff1a; 2、需要Guava依赖&#xff1a; <dependency><groupId>com.google.guava</groupId><artifactId>guava</artifactId><version>23.0</version></dependency>3、启动时就读取放入缓存的代码&#xf…

gaussdb hccdp理论考试总结

判断题1分&#xff0c;单选题2分&#xff0c;多选题3分 共50道题&#xff0c;满分100分&#xff0c;60分通过。 理论考试知识点占比&#xff1a; 理论考试参考策略&#xff1a; 1.7张PPT看一遍 2.思考题做一遍 3.模拟题做一遍 4.7张PPT再看一遍 5.考题知识点过一遍 6.考试前一…

ZYNQ使用XGPIO驱动外设模块(前半部分)

目录 目录 一、新建BD文档&#xff0c;添加ZYNQ处理器 1.BD文档: 2.在Vivado中&#xff0c;BD文件的生成过程通常包括以下步骤&#xff1a; 1)什么是Tcl Console: 3.PL部分是FPGA可编程逻辑部分&#xff0c;它提供了丰富的IO资源&#xff0c;可以用于实现各种硬件接口和功…

QT 连接SQL SEVER 之后无法读取浮点和整型

1、ODBC Driver 的版本要对应上。 if (!strDbDirPath.isEmpty())m_strDbDirPath strDbDirPath;m_strDatabaseName strDatabaseName;if (m_database.isOpen() || m_bConnected){qDebug() << QString("QODBC:已经连接成功&#xff01;") << "\n&quo…

Power Pivot, PowerView和PowerBI在产品宣传,功能,及本质上有什么不同?

微软的Power Pivot、Power View和Power BI是三个不同的数据分析和商业智能工具&#xff0c;它们在产品宣传、功能和本质上有所区别&#xff0c;并且各自适用于不同的场景。 1. Power Pivot Power Pivot是一种数据建模技术&#xff0c;用于在Excel中创建数据模型&#xff0c;建…

Halcon 3D应用 - 胶路提取

1. 需求 本文基于某手环&#xff08;拆机打磨处理&#xff09;做的验证性工作&#xff0c;为了项目保密性&#xff0c;只截取部分数据进行测试。 这里使用的是海康3D线激光轮廓相机直线电机的方式进行的高度数据采集&#xff0c;我们拿到的是高度图亮度图数据。 提取手环上的胶…

Java面向对象编程--高级

目录 一、static关键字 1.1 静态变量 1.2 静态内存解析 1.3 static的应用与练习 二、单例设计模式 2.1 单例模式 2.2 如何实现单例模式 三、代码块 3.1 详解 3.2 练习&#xff0c;测试 四、final关键字 五、抽象类与抽象方法 5.1 abstract 5.2 练习 六、接口 6.…

d3dcompiler_47.dll缺失怎么修复,马上教你六种靠谱的方法

在使用计算机的过程中&#xff0c;我们可能会遇到各种问题&#xff0c;其中一个就是某些dll文件缺失。比如d3dcompiler_47.dll&#xff0c;这个文件是DirectX的一部分&#xff0c;主要用于编译DirectX的着色器代码。当这个文件缺失时&#xff0c;一些程序就无法正常运行了&…

typescript使用webpack打包编译问题

解决方案&#xff1a;在webpack.config.js中的mdule.exports中设置mode。 再次运行npm run start即可。

pytest的基础入门

pytest判断用例的成功或者失败 pytest识别用例失败时会报AssertionError或者xxxError错误&#xff0c;当捕获异常时pytest无法识别到失败的用例 pytest的fixture夹具 pytest的参数化 #coding:utf-8 import pytestfrom PythonProject.pytest_test.funcs.guess_point import ge…

GAN(Generative Adversarial Nets)

GAN(Generative Adversarial Nets) 引言 GAN由Ian J. Goodfellow等人提出&#xff0c;是Ian J. Goodfellow的代表作之一&#xff0c;他还出版了大家耳熟能详的花书&#xff08;Deep Learning深度学习&#xff09;&#xff0c;GAN主要的思想是同时训练两个模型&#xff0c;生成…

【重磅升级】基于大数据的股票量化分析与预测系统

温馨提示&#xff1a;文末有 CSDN 平台官方提供的学长 QQ 名片 :) 1. 项目简介 伴随全球经济一体化和我国经济的快速发展&#xff0c;中国股票市场对世界经济的影响力不断攀升&#xff0c;中国股市已成为全球第二大股票交易市场。在当今的金融市场中&#xff0c;股票价格的波动…

只需5步,就可以使用大语言模型(LLM)打造高效的应用

01 概述 随着人工智能技术的飞速发展&#xff0c;大型语言模型&#xff08;LLM&#xff09;正逐渐成为各个领域的得力助手。从最初的文本理解、生成到翻译&#xff0c;这些模型在自然语言处理&#xff08;NLP&#xff09;中的出色表现&#xff0c;让它们在聊天机器人、虚拟助…

微调大语言模型——超详细步骤

微调一个语言模型&#xff0c;其实就是在一个已经训练过的模型上&#xff0c;继续用新数据进行训练&#xff0c;帮助模型更好地理解和处理这个新的任务。可以把这个过程想象成教一个已经懂很多道理的人去解决新的问题。 这个过程可以分为五个简单的步骤&#xff1a; 加载预训练…