Sprint Cloud Stream整合RocketMq和websocket实现消息发布订阅

1.引入RocketMQ依赖:首先,在pom.xml文件中添加RocketMQ的依赖:

<dependency><groupId>org.apache.rocketmq</groupId><artifactId>rocketmq-spring-boot-starter</artifactId><version>2.2.0</version> <!-- 版本号根据实际情况调整 -->
</dependency>

2.配置RocketMQ连接信息:在application.propertiesapplication.yml中配置RocketMQ的连接信息,包括Name Server地址等:

spring:application:name: ${sn.publish}cloud:stream:rocketmq:binder:name-server: ${rocket-mq.name-server}bindings:output:producer:group: testSocketsync: truebindings:output:destination: test-topiccontent-type: application/json

3.消息发布组件

@Component
public class MqSourceComponent {@ResourceSource source;public void publishNotify(SampleNotifyDTO notify) {source.output().send(MessageBuilder.withPayload(notify).build());}
}

4.消息发布控制器

@RestController
@Api(tags = "rocketmq")
public class MqController {@ResourceMqSourceComponent mq;@ApiOperation(value = "测试发布消息")@PostMapping("test-publish")public JsonVO<String> testSend(SampleNotifyDTO notify) {mq.publishNotify(notify);return JsonVO.success("消息已发送");}
}

项目结构:

接下来是websocket模块的搭建

1. 依赖添加

<dependency><groupId>org.apache.rocketmq</groupId><artifactId>rocketmq-spring-boot-starter</artifactId><version>2.2.0</version> <!-- 版本号根据实际情况调整 -->
</dependency>

2.application.yml配置文件

server:port: ${sp.ws}
spring:application:name: ${sn.ws}cloud:stream:rocketmq:binder:name-server: ${rocket-mq.name-server}bindings:input:destination: test-topiccontent-type: application/jsongroup: testSocket

3.将应用程序绑定到消息代理

@EnableBinding(Sink.class): 这是Spring Cloud Stream的注解,它用于将应用程序绑定到消息代理(如Kafka、RabbitMQ等)。Sink.class是Spring Cloud Stream提供的预定义输入通道,允许你接收消息。通过这个注解,你的应用程序可以监听消息通道,并定义消息处理逻辑。

@SpringBootApplication
@EnableDiscoveryClient
@EnableBinding(Sink.class)
public class WsApplication {public static void main(String[] args) {SpringApplication.run(WsApplication.class, args);}}

4.消息订阅组件

监听消息通道中的消息,一旦有消息到达,就会触发listenNotify方法,该方法负责处理消息并通过chat服务发送响应。

@Component
@Slf4j
public class MqListenComponent {@ResourceChatService chat;@StreamListener(Sink.INPUT)public void listenNotify(SampleNotifyDTO notify) {log.info(notify.toString());chat.sendMessage(notify.getClientId(), notify);}
}

5.消息通知服务

package com.zeroone.star.ws.service;import cn.hutool.json.JSONUtil;
import lombok.SneakyThrows;
import org.springframework.stereotype.Component;import javax.websocket.*;
import javax.websocket.server.ServerEndpoint;
import java.io.IOException;
import java.util.concurrent.ConcurrentHashMap;@Component
@ServerEndpoint("/chat")
public class ChatService {/*** 连接会话池*/private static ConcurrentHashMap<String, Session> SESSION_POOL = new ConcurrentHashMap<>();@OnOpenpublic void onOpen(Session session) throws IOException {// 判断客户端对象是否存在if (SESSION_POOL.containsKey(session.getQueryString())) {CloseReason closeReason = new CloseReason(CloseReason.CloseCodes.CANNOT_ACCEPT, "ID冲突,连接拒绝");session.getUserProperties().put("reason", closeReason);session.close();return;}// 将客户端对象存储到会话池SESSION_POOL.put(session.getQueryString(), session);System.out.println("客户端(" + session.getQueryString() + "):开启了连接");}@OnMessagepublic String onMessage(String msg, Session session) throws IOException {// 解析消息 ==> ID::消息内容String[] msgArr = msg.split("::", 2);// 处理群发消息,ID==all表示群发if ("all".equalsIgnoreCase(msgArr[0])) {for (Session one : SESSION_POOL.values()) {// 排除自己if (one == session) {continue;}// 发送消息one.getBasicRemote().sendText(msgArr[1]);}}// 指定发送else {// 获取接收方Session target = SESSION_POOL.get(msgArr[0]);if (target != null) {target.getBasicRemote().sendText(msgArr[1]);}}return session.getQueryString() + ":消息发送成功";}@OnClosepublic void onClose(Session session) {// 连接拒绝关闭会话Object reason = session.getUserProperties().get("reason");if (reason instanceof CloseReason) {CloseReason creason = (CloseReason) reason;if (creason.getCloseCode() == CloseReason.CloseCodes.CANNOT_ACCEPT) {System.out.println("拒绝客户(" + session.getQueryString() + "):关闭连接");return;}}// 从会话池中移除会话SESSION_POOL.remove(session.getQueryString());System.out.println("客户端(" + session.getQueryString() + "):关闭连接");}@OnErrorpublic void onError(Session session, Throwable throwable) {System.out.println("客户端(" + session.getQueryString() + ")错误信息:" + throwable.getMessage());}@SneakyThrowspublic void sendMessage(String id, Object message) {// 群发if ("all".equalsIgnoreCase(id)) {for (Session one : SESSION_POOL.values()) {// 发送消息one.getBasicRemote().sendText(JSONUtil.toJsonStr(message));}}// 指定发送else {// 获取接收方Session target = SESSION_POOL.get(id);if (target != null) {target.getBasicRemote().sendText(JSONUtil.toJsonStr(message));}}}
}

项目结构:

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.rhkb.cn/news/174790.html

如若内容造成侵权/违法违规/事实不符,请联系长河编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

轻量级 IDE 文本编辑器 Geany 发布 2.0

Geany 是功能强大、稳定、轻量的开发者专用文本编辑器&#xff0c;支持 Linux、Windows 和 macOS&#xff0c;内置支持 50 多种编程语言。 2005 年Geany 发布首个版本 0.1。上周四刚好是 Geany 诞生 18 周年纪念日&#xff0c;官方发布了 2.0 正式版以表庆祝。 下载地址&#…

正则表达式引擎比较(翻译自:A comparison of regex engines)

原文&#xff1a; A comparison of regex engines – Rust Leipzig 引言 正则表达式&#xff08;或简称regex&#xff09;通常用于模式搜索算法。 有许多不同的正则表达式引擎提供不同的表达式支持、性能约束和语言绑定。 基于 John Maddock 之前的工作 (regex comparison)和…

window11 更改 vscode 插件目录,释放C盘内存

由于经常使用vscode开发会安装一些代码提示插件&#xff0c;然后C盘内容会逐渐缩小&#xff0c;最终排查定位到vscode。这个吃内存不眨眼的家伙。 建议&#xff1a;不要把插件目录和vscode安装目录放在同一个位置&#xff0c;不然这样vscode更新后&#xff0c;插件也会消失。 v…

Git窗口打开vim后如何退出编辑(IDEA/Goland等编辑器)

最近在学习git高级操作过程中&#xff0c;遇到了一下问题&#xff1a; 我在学习Git合并多个commit为一个的时候&#xff0c;需要输入一个命令 git rebase -i HEAD~2 这说明已经是编辑模式了。当我写好后&#xff0c;我还按照原来在linux上的按下ESC键&#xff0c;但是只是光…

【数据结构】数组和字符串(四):特殊矩阵的压缩存储:稀疏矩阵——三元组表

文章目录 4.2.1 矩阵的数组表示4.2.2 特殊矩阵的压缩存储a. 对角矩阵的压缩存储b~c. 三角、对称矩阵的压缩存储d. 稀疏矩阵的压缩存储——三元组表结构体初始化元素设置打印矩阵主函数输出结果代码整合 4.2.1 矩阵的数组表示 【数据结构】数组和字符串&#xff08;一&#xff…

双11满减大促,直播间1折抢购!

你是知道的&#xff0c;双11本来是光棍节&#xff01; 2009年&#xff0c;阿里掀起了一场网络促销活动&#xff0c;光棍节从此成了全民的购物节&#xff01; 从2009年到2021年&#xff0c;阿里双11当天的交易额&#xff0c;从仅有的0.5亿猛增至5403亿&#xff0c;以惊人的速度…

目标检测及锚框、IoU

文章目录 1. 目标检测2. 锚框3. IoU - 交并比4. 赋予锚框标号5. 使用非极大值抑制&#xff08;NMS&#xff09;输出 1. 目标检测 物体检测&#xff08;目标检测&#xff09;是计算机视觉和数字图像处理的热门方向&#xff0c;意在判断一幅图像上是否存在感兴趣物体&#xff0c…

List的add(int index,E element)陷阱,不得不防

项目场景&#xff1a; 项目中有两个List列表&#xff0c;一个是List1用来存储一个标识&#xff0c;后续会根据这个标识去重。 一个List2是用来返回对象的&#xff0c;其中对象里也有一个属性List3。现需要将重复的标识数据追加到List3 我想到的两个方案&#xff1a; 尽量不动…

C/S架构和B/S架构

1. C/S架构和B/S架构简介 C/S 架构&#xff08;Client/Server Architecture&#xff09;和 B/S 架构&#xff08;Browser/Server Architecture&#xff09;是两种不同的软件架构模式&#xff0c;它们描述了客户端和服务器之间的关系以及数据交互的方式。 C/S 架构&#xff08…

diffusion model (八) Dalle3 技术小结

paper&#xff1a;https://cdn.openai.com/papers/dall-e-3.pdf 创建时间&#xff1a; 2023-10-25 相关阅读 diffusion model&#xff08;一&#xff09;DDPM技术小结 (denoising diffusion probabilistic)diffusion model&#xff08;二&#xff09;—— DDIM技术小结diffu…

机器人入门(四)—— 创建你的第一个虚拟小车

机器人入门&#xff08;四&#xff09;—— 创建你的第一个虚拟小车 一、小车建立过程1.1 dd_robot.urdf —— 建立身体1.2 dd_robot2.urdf —— 添加轮子1.3 dd_robot3.urdf —— 添加万向轮1.4 dd_robot4.urdf —— 添加颜色1.5 dd_robot5.urdf —— 添加碰撞检测(Collision …

最近面试遇到的高频面试题

大家好&#xff0c;我是 jonssonyan 互联网寒冬&#xff1f;金九银十真的不存在了么&#xff1f;虽说现在行情是差了一些&#xff0c;面试机会少了一些&#xff0c;但是大部分公司还是或多或少的招人&#xff0c;春招秋招都在进行。有人离职就有人入职。所以如果你还没约到面试…

【Linux】安装与配置虚拟机及虚拟机服务器坏境配置与连接

目录 操作系统介绍 什么是操作系统 常见操作系统 UNIX操作系统 linux操作系统 mac操作系统 嵌入式操作系统 个人版本和服务器版本的区别 安装VMWare虚拟机 VMWare虚拟网卡 ​编辑 配置虚拟网络编辑器 ​编辑 安装配置Windows Server 2012 R2 安装Windows Server 2…

钉钉超过90天的文件需要一分钟重新激活的实现原理是什么?

具体实现原理可能包括以下几点&#xff1a; 冷热数据分类&#xff1a;系统会根据文件的访问频率将文件分为热数据和冷数据两类。热数据是经常被访问的文件&#xff0c;这些文件会被存储在快速的存储设备上&#xff0c;以便快速访问。冷数据是很少被访问的文件&#xff0c;这些…

小红书app拉新上线了 适合网推社群和校园渠道作业

小红书app签到拉新上线了可以通过“聚量推客”进行申请&#xff0c;下面大概是要求和流程 要求网推社群渠道或者地推校园渠道&#xff0c;其它类型渠道禁止

18 行为型模式-观察者模式

行为模式共有11种&#xff1a; 观察者模式 模板方法模式 策略模式 职责链模式 状态模式 命令模式 中介者模式 迭代器模式 访问者模式 备忘录模式 解释器模式 以上 11 种行为型模式&#xff0c;除了模板方法模式和解释器模式是类行为型模式&#xff0c;其他的全部属于对象行为型…

华为eNSP配置专题-策略路由的配置

文章目录 华为eNSP配置专题-策略路由的配置0、概要介绍1、前置环境1.1、宿主机1.2、eNSP模拟器 2、基本环境搭建2.1、终端构成和连接2.2、终端的基本配置 3、配置接入交换机上的VLAN4、配置核心交换机为网关和DHCP服务器5、配置核心交换机和出口路由器互通6、配置PC和出口路由器…

redis archive github

https://github.com/redis/redis/releases/tag/7.2.2https://github.com/redis/redis/releases/tag/7.2.2

塔式服务器介绍

大家都知道服务器分为机架式服务器、刀片式服务器、塔式服务器三类&#xff0c;今天小编就分别讲一讲这三种服务器&#xff0c;第三篇先来讲一讲塔式服务器的介绍。 塔式服务器定义&#xff1a;塔式服务器的外观和普通电脑差不多&#xff0c;直立放置。机箱比较大&#xff0c;服…