[ Spring ] Spring Cloud Alibaba Message Stream Binder for RocketMQ 2025

文章目录

          • Introduce
          • Project Structure
          • Declare Plugins and Modules
          • Apply Plugins and Add Dependencies
          • Sender Properties
          • Sender Application
          • Sender Controller
          • Receiver Properties
          • Receiver Application
          • Receiver Message Handler
          • Congratulations
          • Automatically Send Message By Interval
          • Type Adapter for Payload
          • Send Message Model as JSON
          • Receive JSON as Message Model

Introduce

spring-cloud-starter-stream have a great change since version 4.x

most annotations like @EnableBinding @Input @Output @StreamListener were all removed

this blog is about stream-rocketmq, but also fit for stream-kafaka

just migrate dependency from rocketmq to kafaka

Project Structure
  • stream-binder-sender : rocket message sender
  • stream-binder-receiver : rocket message receiver
Declare Plugins and Modules
pluginManagement {repositories {gradlePluginPortal()google()mavenCentral()}
}dependencyResolutionManagement {repositoriesMode = RepositoriesMode.PREFER_SETTINGSrepositories {gradlePluginPortal()google()mavenCentral()}
}buildscript {repositories {gradlePluginPortal()google()mavenCentral()}
}plugins {id("org.jetbrains.kotlin.jvm") version "2.0.21" apply falseid("org.jetbrains.kotlin.kapt") version "2.0.21" apply falseid("org.jetbrains.kotlin.plugin.spring") version "2.0.21" apply falseid("org.springframework.boot") version "3.4.1" apply false
}include("stream-binder-sender")
include("stream-binder-receiver")
Apply Plugins and Add Dependencies
plugins {id("org.jetbrains.kotlin.jvm")id("org.jetbrains.kotlin.kapt")id("org.jetbrains.kotlin.plugin.spring")id("org.springframework.boot")
}java {toolchain {languageVersion = JavaLanguageVersion.of(17)}
}dependencies {val springBootVersion = "3.4.1"val springCloudVersion = "4.2.0"val springCloudAlibabaVersion = "2023.0.3.2"// commonsapi("io.github.hellogoogle2000:kotlin-commons:1.0.19")// kotlinapi("org.jetbrains.kotlin:kotlin-reflect:2.0.21")// springapi("org.springframework.boot:spring-boot-starter:$springBootVersion")api("org.springframework.boot:spring-boot-starter-web:$springBootVersion")api("org.springframework.cloud:spring-cloud-starter-bootstrap:$springCloudVersion")// spring cloud stream binderapi("com.alibaba.cloud:spring-cloud-starter-stream-rocketmq:$springCloudAlibabaVersion")
}
Sender Properties

configTopicSender-out is the name for customized output binding object

# service
server.port=10003
spring.application.name=stream-binder-sender
# stream binder
spring.cloud.stream.default-binder=rocketmq
spring.cloud.stream.bindings.configTopicProducer-out.binder=rocketmq
spring.cloud.stream.bindings.configTopicProducer-out.destination=topic-config
spring.cloud.stream.bindings.configTopicProducer-out.content-type=text/plain
spring.cloud.stream.rocketmq.binder.name-server=localhost:9876
Sender Application
package x.spring.helloimport org.springframework.boot.autoconfigure.SpringBootApplication
import org.springframework.boot.runApplication@SpringBootApplication
class StreamBinderSenderApplicationfun main(args: Array<String>) {runApplication<StreamBinderSenderApplication>(*args)
}
Sender Controller

the binding name for sending should be same with output name in properties

package x.spring.hello.controllerimport org.springframework.beans.factory.annotation.Autowired
import org.springframework.cloud.stream.function.StreamBridge
import org.springframework.messaging.support.MessageBuilder
import org.springframework.web.bind.annotation.GetMapping
import org.springframework.web.bind.annotation.RestController@RestController
class MessageSendController {@Autowiredprivate lateinit var bridge: StreamBridge@GetMapping("send")fun send(): String {val payload = "config"val message = MessageBuilder.withPayload(payload).build()bridge.send("configTopicProducer-out", message)return "send successfully"}
}
Receiver Properties

plainTextConsumer is the name of message handler function

remember it and you should implement it by yourself

you can define multiple message handler functions, and split with ,

plainTextConsumer-in-0 is the name of input binding object

its format is constrained to format of <definition>-in-<index>

# service
server.port=10004
spring.application.name=stream-binder-receiver
# stream binder
spring.cloud.function.definition=configTopicConsumer
spring.cloud.stream.default-binder=rocketmq
spring.cloud.stream.bindings.configTopicConsumer-in-0.binder=rocketmq
spring.cloud.stream.bindings.configTopicConsumer-in-0.destination=topic-config
spring.cloud.stream.bindings.configTopicConsumer-in-0.content-type=text/plain
spring.cloud.stream.rocketmq.binder.name-server=localhost:9876
Receiver Application
package x.spring.helloimport org.springframework.boot.autoconfigure.SpringBootApplication
import org.springframework.boot.runApplication@SpringBootApplication
class StreamBinderReceiverApplicationfun main(args: Array<String>) {runApplication<StreamBinderReceiverApplication>(*args)
}
Receiver Message Handler

function name correspond to properties specified by spring.cloud.function.definition property

package x.spring.hello.componentimport org.springframework.context.annotation.Bean
import org.springframework.messaging.Message
import org.springframework.stereotype.Component
import java.util.function.Consumer@Component
class MessageConsumerObject {@Bean("configTopicConsumer")fun configTopicConsumer(): Consumer<Message<String>> {return Consumer<Message<String>> { message ->val payload = message.payloadprintln("consumer receive config topic message: $payload")}}
}
Congratulations

now, you have get known about basic usage of message binder

do not modify demos above, it may cause a failure, and waste lots of time

try your own ways, let them run out first

let us try some advanced way, after achieve goals above

Automatically Send Message By Interval

register a supplier object to automatically generate heartbeat message

package x.spring.hello.componentimport org.springframework.context.annotation.Bean
import org.springframework.messaging.Message
import org.springframework.messaging.MessageHeaders
import org.springframework.messaging.support.MessageBuilder
import org.springframework.stereotype.Component
import org.springframework.util.MimeTypeUtils
import java.util.function.Supplier@Component
class MessageSupplierObject {@Beanfun heartPacketProducer(): Supplier<Message<String>> {return Supplier<Message<String>> {println("send heart packet message")val payload = "heart"val message = MessageBuilder.withPayload(payload).setHeader(MessageHeaders.CONTENT_TYPE, MimeTypeUtils.TEXT_PLAIN).build()return@Supplier message}}
}

update properties of sender project, add a output binding object named heartPacketProducer

# service
server.port=10003
spring.application.name=stream-binder-sender
# stream binder
spring.cloud.function.definition=heartPacketProducer
spring.cloud.stream.default-binder=rocketmq
spring.cloud.stream.bindings.configTopicProducer-out.binder=rocketmq
spring.cloud.stream.bindings.configTopicProducer-out.destination=topic-config
spring.cloud.stream.bindings.configTopicProducer-out.content-type=text/plain
spring.cloud.stream.bindings.configTopicProducer-out.consumer.concurrency=100
spring.cloud.stream.bindings.heartPacketProducer-out-0.binder=rocketmq
spring.cloud.stream.bindings.heartPacketProducer-out-0.destination=topic-heart
spring.cloud.stream.bindings.heartPacketProducer-out-0.content-type=text/plain
spring.cloud.stream.bindings.heartPacketProducer-out-0.consumer.concurrency=100
spring.cloud.stream.rocketmq.binder.name-server=localhost:9876

update properties of receiver project, add a input binding object named heartPacketConsumer

# service
server.port=10004
spring.application.name=stream-binder-receiver
# stream binder
spring.cloud.function.definition=configTopicConsumer;heartPacketConsumer
spring.cloud.stream.default-binder=rocketmq
spring.cloud.stream.bindings.configTopicConsumer-in-0.binder=rocketmq
spring.cloud.stream.bindings.configTopicConsumer-in-0.destination=topic-config
spring.cloud.stream.bindings.configTopicConsumer-in-0.content-type=text/plain
spring.cloud.stream.bindings.heartPacketConsumer-in-0.binder=rocketmq
spring.cloud.stream.bindings.heartPacketConsumer-in-0.destination=topic-heart
spring.cloud.stream.bindings.heartPacketConsumer-in-0.content-type=text/plain
spring.cloud.stream.rocketmq.binder.name-server=localhost:9876

register message handler function for receiver project

package x.spring.hello.componentimport org.springframework.context.annotation.Bean
import org.springframework.messaging.Message
import org.springframework.stereotype.Component
import java.util.function.Consumer@Component
class MessageConsumerObject {@Bean("heartPacketConsumer")fun heartPacketConsumer(): Consumer<Message<String>> {return Consumer<Message<String>> { message ->val payload = message.payloadprintln("consumer receive heart packet message: $payload")}}@Bean("configTopicConsumer")fun configTopicConsumer(): Consumer<Message<String>> {return Consumer<Message<String>> { message ->val payload = message.payloadprintln("consumer receive config topic message: $payload")}}
}
Type Adapter for Payload

this enable your auto send and receive advanced object like class/json/xml

put this adapter file into both sender project and receiver object

package x.spring.hello.componentimport org.springframework.context.annotation.Bean
import org.springframework.stereotype.Component
import x.kotlin.commons.serialize.JSON.fromJson
import x.kotlin.commons.serialize.JSON.toJson
import x.spring.hello.model.ConfigModel
import java.util.function.Function@Component
class MessageModelAdapter {@Beanfun configModelConvertor1(): Function<ConfigModel, String> {return Function { it.toJson() }}@Beanfun configModelConvertor2(): Function<String, ConfigModel> {return Function { it.fromJson(ConfigModel::class.java) }}
}
Send Message Model as JSON
@GetMapping("send2")
fun send2(): String {val payload = ConfigModel()payload.username = "admin"payload.password = "123456"val message = MessageBuilder.withPayload(payload).setHeader(MessageHeaders.CONTENT_TYPE, MimeTypeUtils.APPLICATION_JSON).build()bridge.send("configModelProducer-out", message)return "send successfully"
}
spring.cloud.stream.bindings.configModelProducer-out.binder=rocketmq
spring.cloud.stream.bindings.configModelProducer-out.destination=topic-config-model
spring.cloud.stream.bindings.configModelProducer-out.content-type=application/json
spring.cloud.stream.bindings.configModelProducer-out.consumer.concurrency=100
Receive JSON as Message Model
@Bean
fun configModelConsumer(): Consumer<Message<ConfigModel>> {return Consumer<Message<ConfigModel>> { message ->val payload = message.payload.toJson()println("consumer receive config model message: $payload")}
}
spring.cloud.function.definition=configModelConsumer
spring.cloud.stream.bindings.configModelConsumer-in-0.binder=rocketmq
spring.cloud.stream.bindings.configModelConsumer-in-0.destination=topic-config-model
spring.cloud.stream.bindings.configModelConsumer-in-0.content-type=application/json

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.rhkb.cn/news/8227.html

如若内容造成侵权/违法违规/事实不符,请联系长河编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

AI赋能医疗:智慧医疗系统源码与互联网医院APP的核心技术剖析

本篇文章&#xff0c;笔者将深入剖析智慧医疗系统的源码架构以及互联网医院APP背后的核心技术&#xff0c;探讨其在医疗行业中的应用价值。 一、智慧医疗系统的核心架构 智慧医疗系统是一个高度集成的信息化平台&#xff0c;主要涵盖数据采集、智能分析、决策支持、远程医疗等…

mongoDB常见指令

即使我们自己开发用不到mongoDB&#xff0c;但是接手别人项目的时候&#xff0c;别人如果用了&#xff0c;我们也要会简单调试一下 虽然mongoDB用的不是sql语句&#xff0c;但语句的逻辑都是相似的&#xff0c;比如查看数据库、数据表&#xff0c;增删改查这些 我们下面以doc…

K8S部署DevOps自动化运维平台

持续集成&#xff08;CI&#xff09; 持续集成强调开发人员提交了新代码之后&#xff0c;立刻自动的进行构建、&#xff08;单元&#xff09;测试。根据测试结果&#xff0c;我 们可以确定新代码和原有代码能否正确地集成在一起。持续集成过程中很重视自动化测试验证结果&#…

SpringCloud系列教程:微服务的未来(十七)监听Nacos配置变更、更新路由、实现动态路由

前言 在微服务架构中&#xff0c;API 网关是各个服务之间的入口点&#xff0c;承担着路由、负载均衡、安全认证等重要功能。为了实现动态的路由配置管理&#xff0c;通常需要通过中心化的配置管理系统来实现灵活的路由更新&#xff0c;而无需重启网关服务。Nacos 作为一个开源…

Lua 环境的安装

1.安装Lua运行环境 本人采用的是在windows系统中使用cmd指令方式进行安装&#xff0c;安装指令如下&#xff1a; winget install "lua for windows" 也曾使用可执行程序安装过&#xff0c;但由于电脑是加密电脑&#xff0c;最后都已失败告终。使用此方式安装可以安…

03-画P封装(制作2D+添加3D)

画P封装的方法2D制作3D添加 使用P封装自己画0603格式的电阻的P封装1. 看规格书,找参数2. 创建一个新的P封装3. 灯泡两侧放焊盘4.设置焊盘大小和形状5.根据坐标定义中间间隔: L/2原则6. 画最外层丝印(丝印层直接围住即可)7.在平面的P封装上,添加3D立体封装库 立创商城下载P封装向…

libOnvif通过组播不能发现相机

使用libOnvif库OnvifDiscoveryClient类&#xff0c; auto discovery new OnvifDiscoveryClient(QUrl(“soap.udp://239.255.255.250:3702”), cb.Build()); 会有错误&#xff1a; end of file or no input: message transfer interrupted or timed out(30 sec max recv delay)…

高德开放平台:红绿灯倒计时与车车协同安全预警,开启出行新时代

近期&#xff0c;有幸参加了“高德开放平台第二期开发者开放日”。这次活动不仅有机会近距离了解高德地图的前沿技术动态和最新产品&#xff0c;还看到了高德开放平台在各个行业中的广泛应用。高德展厅里&#xff0c;每一处展示都让人感到震撼&#xff0c;仿佛置身于一个充满无…

C语言------指针从入门到精通

第一部分: 前言: 本篇文章主要划分为两大部分: 第一部分适合零基础的同学,主要学习了解指针的概念&#xff0c;对指针大概有个概念。如果你已经有基础,即可跳过第一部分的内容。 第二部分主要是分解指针的实现逻辑,通过19个例子,再结合代码公式把不同类型的指针及指针的应用详细…

JavaScript赋能智能网页设计

构建AI驱动的实时风格迁移系统 案例概述 本案例将实现一个基于深度学习的实时图像风格迁移系统&#xff0c;通过浏览器端神经网络推理实现以下高级特性&#xff1a; WebAssembly加速的ONNX模型推理 WebGL Shader实现的风格混合算法 WebRTC实时视频流处理 基于Web Workers的…

‌Windows系统cmd命令行创建vue项目

Windows系统cmd命令行创建vue项目 首先确保node.js已安装(也就是JavaScript运行时环境已安装)找到我们要创建项目的文件夹 直接在路径上输入cmd 按Enter(回车键)后&#xff0c;弹出命令行窗口在命令行窗口输入npm init vuelatest,执行该命令&#xff0c;将会安装并执行Vue项目…

[C语言日寄]exit函数的使用及其拓展

【作者主页】siy2333 【专栏介绍】⌈c语言日寄⌋&#xff1a;这是一个专注于C语言刷题的专栏&#xff0c;精选题目&#xff0c;搭配详细题解、拓展算法。从基础语法到复杂算法&#xff0c;题目涉及的知识点全面覆盖&#xff0c;助力你系统提升。无论你是初学者&#xff0c;还是…

GestureDetector组件的功能与用法

文章目录 1 概念介绍2 使用方法3 示例代码 我们在上一章回中介绍了ListView响应事件的内容,本章回中将介绍GestureDetector Widget.闲话休提&#xff0c;让我们一起Talk Flutter吧。 1 概念介绍 我们在这里介绍的GestureDetector是一个事件响应Widget,它可以响应双击事件&…

Java Web-Cookie与Session

会话跟踪技术 会话跟踪技术是一种在 Web 应用程序中跟踪用户会话状态的机制&#xff0c;它允许服务器在多个请求之间识别和关联属于同一用户的请求&#xff0c;以便在整个会话过程中保持用户相关的信息。以下是几种常见的会话跟踪技术&#xff1a; Cookie 概念&#xff1a;Cook…

线性回归、协同过滤、基于内容过滤、主成分分析(PCA)

线性回归 使用item特征用户打分标签线性回归训练&#xff0c;最小化成本函数&#xff0c;得到每个用户的参数 协同过滤 协同过滤基于一个核心假设&#xff1a;相似的用户会有相似的兴趣&#xff0c;因此可以通过分析相似用户历史行为&#xff0c;来预测当前用户可能感兴趣的i…

WPS数据分析000009

一、函数与数据透视表统计数据时效率差异 函数 F4绝对引用 数据透视表 二、数据透视表基础操作 数据透视表&#xff1a;一个快速的生成报表的工具 显示详细信息 方式一; 方式二&#xff1a; 移动数据透视表 删除数据透视表 复制粘贴数据透视表 留足空间&#xff0c;否则拖动字…

idea实用设置

一.View 1.配置工具包方便按 二.File->Settings 点开设置然后进行后面的配置 1.这个看个人习惯 2.更新 3.更改菜单字体大小 4.鼠标控制字体大小 5.文件默认字体大小 6. 代码的智能提示功能 7.自动导包 8.编码 9.取消双击shift搜索

CE-PBFT:大规模联盟区块链的高可用一致性算法

摘要 区块链已广泛应用于农产品溯源、供应链管理、物流运输等各个领域。作为联盟区块链不可缺少的组成部分&#xff0c;共识算法保证了网络中每个节点的一致性和可信度。然而&#xff0c;由于通信过程的复杂性&#xff0c;现有的大规模联盟区块链场景中的共识算法存在低系统吞…

基于Springboot用axiospost请求接收字符串参数为null的解决方案

问题 ​ 今天在用前端 post 请求后端时发现&#xff0c;由于是以 Json对象的形式传输的&#xff0c;后端用两个字符串形参无法获取到对应的参数值 前端代码如下&#xff1a; axios.post(http://localhost:8083/test/postParams,{a: 1, b:2} ,{Content-Type: application/jso…

【云安全】云原生-K8S-简介

K8S简介 Kubernetes&#xff08;简称K8S&#xff09;是一种开源的容器编排平台&#xff0c;用于管理容器化应用的部署、扩展和运维。它由Google于2014年开源并交给CNCF&#xff08;Cloud Native Computing Foundation&#xff09;维护。K8S通过提供自动化、灵活的功能&#xf…