【实战】Spring Cloud Stream 3.1+整合Kafka

文章目录

    • 前言
    • 新版版本优势
    • 实战演示
      • 增加maven依赖
      • 增加applicaiton.yaml配置
      • 新增Kafka通道消费者
      • 新增发送消息的接口
    • 实战测试
      • postman发送一个正常的消息
      • postman发送异常消息

前言

之前我们已经整合过Spring Cloud Stream 3.0版本与Kafka、RabbitMQ中间件,简直不要太好,直接让我们不用再关心底层MQ如何集与消息收发。但是从Spring Cloud 2020版本开始,Spring Cloud Stream的版本升级至3.1.0以上版本,自此版本开始@StreamListener上面就增加@Deprecated注解,不赞成使用,有可能接下来的版本会删除掉。传说是有利于使用Project Reactor提供的事件流抽象(如Flux和Mono),命令函数在每个单独的事件上触发,而reactive函数只触发一次。故今天我们分享一期Spring Cloud Stream 3.1+整合Kafka,各位看官敬请鉴赏。

在这里插入图片描述

新版版本优势

新版提倡用函数式进行发送和消费信息

定义返回类型为Supplier, Function or Consumer的bean提供消息发送和消费的bean 看看绑定名称命名规则
input - + -in- +
output - + -out- +

在配置文件中指定spring.cloud.function.definition/spring.cloud.stream.function.definition的名称后会把这个bean绑定到对应的消费者和提供者上。

比如 inputChannel bean绑定了inputChannel-in-0通道,outputChannel bean绑定了outputChannel-out-0通道:

spring:kafka:bootstrap-servers: 192.168.112.10:9092,192.168.112.130:9092,192.168.112.129:9092cloud:stream:kafka:binder:brokers: ${spring.kafka.bootstrap-servers}binders:kafkahub:type: kafkaenvironment:spring:cloud:stream:kafka: ${spring.cloud.stream.kafka.binder}default-binder: kafkahub function:definition: inputChannel,outputChannelbindings:inputChannel-in-0:binder: kafkahubdestination: test-kafka-topicgroup: test-kafka-groupcontent-type: text/plainoutputChannel-out-0:binder: kafkahubdestination: test-kafka-topiccontent-type: text/plainproducer:partition-count: 3 #分区数目

此时消息生产者为:

@Resource
private StreamBridge streamBridge;@GetMapping("/send")
public Boolean sendMessageToKafka(String msg){boolean send = streamBridge.send("outputChannel-out-0", MessageBuilder.withPayload("kafka测试:"+msg).build());return send;
}

此时消息消费者为:

@Configuration
public class KafkaChannel {@Resourceprivate StreamBridge streamBridge;/*** inputChannel 消费者* @author senfel* @date 2024/6/18 15:26* @return java.util.function.Consumer<java.lang.String>*/@Beanpublic Consumer<Message<String>> inputChannel(){return message -> {System.out.println("接收到消息Payloa:" + message.getPayload());System.out.println("接收到消息Header:" + message.getHeaders());};}

}

实战演示

我们简单进行一下演示即可,kafka环境可以看我之前的博文搭建。
主要演示功能:

正常情况下生产者发送消息到kafka,消费者监听消息并消费成功
异常情况下消费者消费失败,立即将异常消息投递到另一个topic上,兜底topic消费者消费

本次全部采用自动ack模式,如果需要手动ack参照之前的博文配置即可,注意在消费者端加上手动ack逻辑。

增加maven依赖

 <parent><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-parent</artifactId><version>2.3.12.RELEASE</version><relativePath/> <!-- lookup parent from repository -->
</parent>
<groupId>com.example</groupId>
<artifactId>cce-demo</artifactId>
<version>0.0.1-SNAPSHOT</version>
<name>seata-demo-order</name>
<description>Demo project for Spring Boot</description>
<properties><java.version>8</java.version><spring-cloud.version>Hoxton.SR12</spring-cloud.version>
</properties>
<dependencies><dependency><groupId>org.springframework.cloud</groupId><artifactId>spring-cloud-starter-stream-kafka</artifactId><version>3.2.4</version></dependency>
</dependencies>
<dependencyManagement><dependencies><dependency><groupId>org.springframework.cloud</groupId><artifactId>spring-cloud-dependencies</artifactId><version>${spring-cloud.version}</version><type>pom</type><scope>import</scope></dependency></dependencies>
</dependencyManagement>

增加applicaiton.yaml配置

spring:#kafkakafka:bootstrap-servers: 192.168.112.10:9092,192.168.112.130:9092,192.168.112.129:9092cloud:stream:kafka:  # kafka配置binder:brokers: ${spring.kafka.bootstrap-servers}auto-add-partitions: true #自动分区auto-create-topics: true #自动创建主题replication-factor: 3 #副本min-partition-count: 3 #最小分区bindings:outputChannel-out-0:producer:# 无限制重发不产生消息丢失retries: Integer.MAX_VALUE#acks =0:producer不等待broker的ack,broker一接收到还没有写入磁盘就已经返回,可靠性最低#acks =1:producer等待broker的ack,partition的leader刷盘成功后返回ack,如果在follower同步成功之前leader故障,那么将会丢失数据,可靠性中#acks = all 、 -1:producer等待broker的ack,partition的leader和follower全部落盘成功后才返回ack,可靠性高,但延迟时间长#可以设置的值为:all, -1, 0, 1acks: allmin:insync:replicas: 3 #感知副本数inputChannel-in-0:consumer:concurrency: 1 #消费者数量max-concurrency: 5 #最大消费者数量recovery-interval: 3000  #3s 重连auto-rebalance-enabled: true  #主题分区消费者组成员自动平衡auto-commit-offset: false   #手动提交偏移量enable-dlq: true  # 开启 dlq队列dlq-name: test-kafka-topic.dlqdeserializationExceptionHandler: sendToDlq #异常加入死信binders: # 与外部mq组件绑定kafkahub:type: kafkaenvironment:spring:cloud:stream:kafka: ${spring.cloud.stream.kafka.binder}default-binder: kafkahub #默认绑定function: # 定义channel名字,每个channel又可以作为生产者(in)与消费者(out)definition: inputChannel;outputChannel;dlqChannelbindings: # 通道绑定inputChannel-in-0:binder: kafkahubdestination: test-kafka-topicgroup: test-kafka-groupcontent-type: text/plainconsumer:maxAttempts: 1 # 尝试消费该消息的最大次数(消息消费失败后,发布者会重新投递)。默认3backOffInitialInterval: 1000 # 重试消费消息的初始化间隔时间。默认1s,即第一次重试消费会在1s后进行backOffMultiplier: 2 # 相邻两次重试之间的间隔时间的倍数。默认2backOffMaxInterval: 10000 # 下一次尝试重试的最大时间间隔,默认为10000ms,即10soutputChannel-out-0:binder: kafkahubdestination: test-kafka-topiccontent-type: text/plainproducer:partition-count: 3 #分区数目dlqChannel-in-0:binder: kafkahubdestination: test-kafka-topic.dlqgroup: test-kafka-groupcontent-type: text/plainconsumer:maxAttempts: 1 # 尝试消费该消息的最大次数(消息消费失败后,发布者会重新投递)。默认3backOffInitialInterval: 1000 # 重试消费消息的初始化间隔时间。默认1s,即第一次重试消费会在1s后进行backOffMultiplier: 2 # 相邻两次重试之间的间隔时间的倍数。默认2backOffMaxInterval: 10000 # 下一次尝试重试的最大时间间隔,默认为10000ms,即10sdlqChannel-out-0:binder: kafkahubdestination: test-kafka-topic.dlqcontent-type: text/plainproducer:partition-count: 3 #分区数目

新增Kafka通道消费者

import org.springframework.cloud.stream.function.StreamBridge;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.messaging.Message;
import org.springframework.messaging.support.MessageBuilder;
import javax.annotation.Resource;
import java.util.function.Consumer;/*** KafkaCustomer* @author senfel* @version 1.0* @date 2024/6/18 15:22*/
@Configuration
public class KafkaChannel {@Resourceprivate StreamBridge streamBridge;/*** inputChannel 消费者* @author senfel* @date 2024/6/18 15:26* @return java.util.function.Consumer<java.lang.String>*/@Beanpublic Consumer<Message<String>> inputChannel(){return message -> {System.out.println("接收到消息:" + message.getPayload());System.out.println("接收到消息:" + message.getHeaders());if(message.getPayload().contains("9")){boolean send = streamBridge.send("dlqChannel-out-0", MessageBuilder.withPayload("kafka异常消息发送到dlq测试:"+message).build());System.err.println("向dlqChannel发送消息:"+send);}};}/*** dlqChannel 死信消费者* @author senfel* @date 2024/6/18 15:26* @return java.util.function.Consumer<java.lang.String>*/@Beanpublic Consumer<Message<String>> dlqChannel(){return message -> {System.out.println("死信dlqChannel接收到消息:" + message.getPayload());System.out.println("死信dlqChannel接收到消息:" + message.getHeaders());};}
}

新增发送消息的接口

@Resource
private StreamBridge streamBridge;@GetMapping("/send")
public Boolean sendMessageToKafka(String msg){boolean send = streamBridge.send("outputChannel-out-0", MessageBuilder.withPayload("kafka测试:"+msg).build());return send;
}

实战测试

postman发送一个正常的消息

在这里插入图片描述

postman发送异常消息

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.rhkb.cn/news/356681.html

如若内容造成侵权/违法违规/事实不符,请联系长河编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

推荐4款电脑软件,简单已入手,关键是免费,建议收藏

拖把更名器 拖把更名器是一款功能强大的文件名称修改工具&#xff0c;主要用于批量进行文件更名和整理音乐文件标签。该软件结合资源管理器右键菜单&#xff0c;支持多种文件格式和操作方式&#xff0c;包括MP3、RM、RMVB、WMA等标签更名&#xff0c;文件名、标签内码转换、繁简…

【SpringSecurity】认证与鉴权框架SpringSecurity——认证

目录 SpringSecurity介绍特性CSRF攻击攻击模式攻击原理预防手段 XSS攻击攻击模式危害预防手段 SpringSecurity预防CSRF攻击SpringSecurity预防XSS攻击SpringSecurity与OAuth2的关系SpringSecurity的核心功能 代码实战依赖定义一个接口Redis工具类响应类直接运行工具类认证业务密…

VERYCLOUD睿鸿股份亮相亚马逊云科技中国峰会2024

5月30日&#xff0c;为期两天的亚马逊云科技中国峰会在上海世博中心圆满落幕。 多位大咖现场分享&#xff0c;生成式AI时代的数据战略&#xff0c;企业级AI应用&#xff0c;最新技术、产品重磅发布&#xff0c;创新行业解决方案 …… 作为亚马逊云科技的生态合作伙伴&#x…

Android面试题:App性能优化之Java和Kotlin常见的数据结构

本文首发于公众号“AntDream”&#xff0c;欢迎微信搜索“AntDream”或扫描文章底部二维码关注&#xff0c;和我一起每天进步一点点 Java常见数据结构特点 ArrayList ArrayList底层是基于数组实现add、删除元素需要进行元素位移耗性能&#xff0c;但查找和修改块适合不需要频…

GPT-4o的视觉识别能力,将绕过所有登陆的图形验证码

知识星球&#x1f517;除了包含技术干货&#xff1a;《Java代码审计》《Web安全》《应急响应》《护网资料库》《网安面试指南》还包含了安全中常见的售前护网案例、售前方案、ppt等&#xff0c;同时也有面向学生的网络安全面试、护网面试等。 我们来看一下市面上常见的图形验证…

状态压缩DP——AcWing 291. 蒙德里安的梦想

状态压缩DP 定义 状态压缩DP是一种利用二进制数来表示状态的动态规划算法。它通过将状态压缩成一个整数&#xff0c;从而减少状态数量&#xff0c;提高算法效率。 运用情况 状态压缩DP通常用于解决具有状态转移和最优解性质的问题&#xff0c;例如组合优化、图论、游戏等问…

Vue82-组件内路由守卫

一、组件内路由守卫的定义 在一个组件里面去写路由守卫&#xff0c;而不是在路由配置文件index.js中去写。 此时&#xff0c;该路由守卫是改组件所独有的&#xff01; 只有通过路由规则进入的方式&#xff0c;才会调这两个函数&#xff0c;否则&#xff0c;若是只是用<Ab…

LogicFlow 学习笔记——9. LogicFlow 进阶 节点

LogicFlow 进阶 节点&#xff08;Node&#xff09; 连线规则 在某些时候&#xff0c;我们可能需要控制边的连接方式&#xff0c;比如开始节点不能被其他节点连接、结束节点不能连接其他节点、用户节点后面必须是判断节点等&#xff0c;想要达到这种效果&#xff0c;我们需要为…

【经验分享】RT600 serial boot mode测试

【经验分享】RT600 serial boot mode测试 一&#xff0c; 文档描述二&#xff0c; Serial boot mode测试2.1 evkmimxrt685_gpio_led_output 工程测试2.2 evkmimxrt685_dsp_hello_world_usart_cm33工程测试 一&#xff0c; 文档描述 RT600的启动模式共支持4种&#xff1a; 1&am…

C++设计模式——Composite组合模式

一&#xff0c;组合模式简介 真实世界中&#xff0c;像企业组织、文档、图形软件界面等案例&#xff0c;它们在结构上都是分层次的。将系统分层次的方式使得统一管理和添加不同子模块变得容易&#xff0c;在软件开发中&#xff0c;组合模式的设计思想和它们类似。 组合模式是…

数据库设计概述-数据库设计内容、数据库设计方法(基于E-R模型的规范设计方法)

一、引言 如何利用关系数据库理论设计一个满足应用系统需求的数据库 二、数据库设计内容 1、数据库设计是基于应用系统需求分析中对数据的需求&#xff0c;解决数据的抽象、数据的表达和数据的存储结构等问题 2、其目标是设计出一个满足应用要求、简洁、高效、规范合理的数…

Redis 集群 - 数据分片算法

前言 广义的集群&#xff1a;只要是多个机器构成了一个分布式系统&#xff0c;都可以被称为集群。 狭义的集群&#xff1a;redis 的集群模式&#xff0c;这个集群模式下&#xff0c;主要是解决存储空间不足的问题。 Redis 集群 redis 采用主从结构&#xff0c;可以提高系统的可…

「动态规划」如何求最长湍流子数组的长度?

78. 最长湍流子数组https://leetcode.cn/problems/longest-turbulent-subarray/description/ 给定一个整数数组arr&#xff0c;返回arr的最长湍流子数组的长度。如果比较符号在子数组中的每个相邻元素对之间翻转&#xff0c;则该子数组是湍流子数组。更正式地来说&#xff0c;…

从开源EPR产品Odoo学习

前言 一个先进、敏捷、经济高效、可快速扩展的Odoo免费开源企业信息化解决方案&#xff0c;让企业获得适应未来发展的长期创新和增长能力。 Odoo 的免费开源模式 让我们可利用无数开发人员和业务专家&#xff0c;在短短数年内&#xff0c;打造数百款应用。凭借强大的技术基础&…

苹果智能和人工智能最大化

苹果智能和人工智能最大化 除了苹果公司&#xff0c;还没有人真正使用过苹果的智能功能。它要到秋天才会分阶段发布&#xff0c;即使到那时&#xff0c;它也无法在80%或90%的iPhone安装基础上运行&#xff0c;因为它需要只有iPhone 15 Pro才能使用的设备上处理功能。没有什么能…

现在这个行情,又又又要开始准备面试了~~

亲爱的程序员朋友们: 这些资料曾经帮助过许多有志之士顺利拿下抖音、快手、阿里等大厂的Offer&#xff0c;现在也希望它们能为你的面试旅程助力&#xff01; 关注【程序员世杰】回复【1024】惊喜等你来拿&#xff01; 截图 关注【程序员世杰】回复【1024】惊喜等你来拿&#xf…

车辆轨迹预测系列 (三):nuScenes数据集详细介绍-1

车辆轨迹预测系列 (三)&#xff1a;nuScenes数据集详细介绍-1 文章目录 车辆轨迹预测系列 (三)&#xff1a;nuScenes数据集详细介绍-1一、数据集准备1、解压2、安装nuscenes-devkit3、介绍 二、架构内容解释1、category 类别2、attribute 属性3、visibility 可见性4、instance …

包含网关的概念及案例演示

包容网关 知识点讲解 包容网关可以看作排他网关和并行网关的结合体。与排他网一样&#xff0c;可以在外出顺序流上定义条件&#xff0c;但与排他网关不同的是&#xff0c; 进行决策判读时&#xff0c;包容网关所有条件为true的后继分支都会被依次执行。如果所有分支条件都为fa…

IMU用于飞行坐姿校正

为了提升长途飞行的舒适度并预防乘客因不良坐姿导致的身体不适&#xff0c;来自荷兰上海两所大学的研究团队携手开发出一种创新的“舒适穿戴”设备&#xff0c;专为识别飞行中的坐姿设计。 研究团队制作了两种原型设备&#xff1a;一种追求极致舒适&#xff0c;另一种为紧身设…

干货!!SSAS模型刷新步骤

白茶在上一篇文章PowerBI迁移到SSAS向小伙伴们介绍了如何将已经开发好的PowerBI模型迁移到SSAS整个操作过程&#xff0c;与此同时也带来了新的问题&#xff0c;那就是SSAS的模型该如何刷新呢&#xff1f; 配套工具 SSMS Visual Studio SSIS SSIS[1]的全称是SQL Server Inte…