使用springboot-3.4.1搭建一个netty服务并且WebSocket消息通知(适用于设备直连操作,以及回复操作)

引入最新版本

<!--websocket-->
<dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-websocket</artifactId>
</dependency>

启动类加入

//netty 协议服务端口启动
NettyTcpHandler.start();
package com.cqcloud.platform.handler;import com.cqcloud.platform.service.IotMqttService;
import com.cqcloud.platform.service.impl.IotMqttServiceImpl;
import org.springframework.stereotype.Component;import io.netty.bootstrap.ServerBootstrap;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.ChannelPipeline;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioServerSocketChannel;/*** @author weimeilayer@gmail.com ✨* @date 💓💕 2022年9月23日 🐬🐇 💓💕*/
@Component
public class NettyTcpHandler {/*** IoT设备协议端口*/private static int PORT = 1883;/*** 使用方法在启动类* 加上 NettyTcpHandler.start();* @throws Exception*/public static void start() throws Exception {final NioEventLoopGroup bossGroup = new NioEventLoopGroup();final NioEventLoopGroup workerGroup = new NioEventLoopGroup();try {ServerBootstrap bootstrap = new ServerBootstrap();// 创建 IotPushService 实例IotMqttService iotPushService = new IotMqttServiceImpl();bootstrap.group(bossGroup, workerGroup).channel(NioServerSocketChannel.class).childHandler(new ChannelInitializer<SocketChannel>() {@Overrideprotected void initChannel(SocketChannel ch) {ChannelPipeline pipeline = ch.pipeline();// 添加处理器,处理所有连接的业务逻辑pipeline.addLast(new TcpMqttServerHandler(iotPushService));}});// 绑定端口并启动ChannelFuture future = bootstrap.bind(PORT).sync();// 等待服务器关闭future.channel().closeFuture().sync();} finally {// 优雅地关闭线程池workerGroup.shutdownGracefully();bossGroup.shutdownGracefully();}}
}
package com.cqcloud.platform.handler;import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.function.BiConsumer;import cn.hutool.json.JSONObject;
import org.springframework.stereotype.Component;
import lombok.extern.slf4j.Slf4j;/*** @author weimeilayer@gmail.com ✨* @date 💓💕 2022年10月06日 🐬🐇 💓💕*/
@Slf4j
@Component
public class TcpEventHandler {// 使用 BiConsumer 来处理两个参数private final static Map<String, BiConsumer<String, String>> eventActions = new HashMap<>();public void registerEventAction(String eventCode, BiConsumer<String, String> action) {// 动态注册事件处理逻辑eventActions.put(eventCode, action);}public static void handleEvent(String evt, String imei, String reportContent) {// 根据事件类型找到对应的处理逻辑,并执行eventActions.getOrDefault(evt, TcpEventHandler::handleUnknownEvent).accept(imei, reportContent);}private static void handleUnknownEvent(String imei, String reportContent) {// 处理未知事件的逻辑System.out.println("imei: " + imei + ", 报告内容: " + reportContent);}public static void handleAlarm(String imei, String reportContent) {log.info("内容: {}", reportContent);// 获取目标用户列表(包括固定的用户 ID)List<String> targetUsers = buildTargetUsersList();// 构建消息并发送sendAlarmMessage(targetUsers, imei, reportContent);}private static List<String> buildTargetUsersList() {List<String> targetUsers = new ArrayList<>();targetUsers.add("1");targetUsers.add("2");//实际根据业务查询数据targetUsers.add("26967563820859392");return targetUsers;}private static void sendAlarmMessage(List<String> targetUsers, String imei, String reportContent) {// 将目标用户列表转换为逗号分隔的字符串String users = String.join(",", targetUsers);// 构建 JSON 消息JSONObject obj = new JSONObject();obj.set("imei", imei);obj.set("message", reportContent);obj.set("userId", users);// 发送消息WebSocketHandler.sendMessageToUser(users, obj.toString());}
}
package com.cqcloud.platform.handler;import java.util.ArrayList;
import java.util.List;
import java.util.Optional;import com.cqcloud.platform.service.IotMqttService;import io.netty.buffer.ByteBuf;
import io.netty.buffer.Unpooled;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.SimpleChannelInboundHandler;/*** 物联网云平台设备协议* @author weimeilayer@gmail.com ✨* @date 💓💕 2022年9月23日 🐬🐇 💓💕*/
public class TcpMqttServerHandler extends SimpleChannelInboundHandler<ByteBuf>  {// 接口注入private final IotMqttService iotPushService;public TcpMqttServerHandler(IotMqttService iotPushService) {this.iotPushService = iotPushService;}@Overrideprotected void channelRead0(ChannelHandlerContext ctx, ByteBuf in) throws Exception {byte[] byteArray;if (in.readableBytes() <= 0) {in.release();return;}byteArray = new byte[in.readableBytes()];in.readBytes(byteArray);if (byteArray.length <= 0) {in.release();return;}// 将消息传递给 iotPushServiceiotPushService.pushMessageArrived(byteArray);// 下发指令,假设返回的是多个指令List<String> externalValues = extractExternalValue("deviceId");// 转换为十六进制字符串String hexString = bytesToHex(byteArray);System.out.println("来自于物联网云平台设备协议的数据: " + hexString);// 使用 Optional 判断外部值// 使用 Optional 判断外部值,如果不为空则逐一处理每条指令Optional.ofNullable(externalValues).ifPresent(values -> {values.forEach(value -> {// 提取外部数据值并发送System.out.println("来自于物联网云平台设备协议的1885端口的提取外部数据值: " + value);sendResponse(ctx, value);});});}// 辅助方法:将字节数组转换为十六进制字符串private static String bytesToHex(byte[] bytes) {StringBuilder hexString = new StringBuilder();for (byte b : bytes) {String hex = Integer.toHexString(0xFF & b);if (hex.length() == 1) {hexString.append('0'); // 确保每个字节都为两位}hexString.append(hex);}return hexString.toString().toUpperCase(); // 返回大写格式}// 发送响应的统一辅助方法private void sendResponse(ChannelHandlerContext ctx, String hexResponse) {byte[] responseBytes = hexStringToByteArray(hexResponse);ByteBuf responseBuffer = Unpooled.copiedBuffer(responseBytes);ctx.writeAndFlush(responseBuffer);}// 将响应消息转换为字节数组public static byte[] hexStringToByteArray(String s) {int len = s.length();byte[] data = new byte[len / 2];for (int i = 0; i < len; i += 2) {data[i / 2] = (byte) ((Character.digit(s.charAt(i), 16) << 4) + Character.digit(s.charAt(i + 1), 16));}return data;}// 查询数据库当前设备号下需要下发的命令private List<String> extractExternalValue(String deviceId) {//这里自行查询数据库数据库,这里只模拟一个list集合ArrayList<Object> list = new ArrayList<>();// 如果记录不为空,获取最新记录的 externalValuelist.stream().findFirst() // 获取最新的一条记录.map(latestRecord -> {// 处理最新记录的逻辑return ""; // 需要返回的值是 externalValue});// 获取最新的一条记录return null; // 如果没有找到,返回 null}@Overridepublic void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) {// 打印异常堆栈跟踪,便于调试和错误排查cause.printStackTrace();// 关闭当前的通道,释放相关资源ctx.close();}
}
package com.cqcloud.platform.handler;import java.io.IOException;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.CopyOnWriteArraySet;import org.springframework.stereotype.Component;import cn.hutool.json.JSONUtil;
import io.micrometer.common.util.StringUtils;
import jakarta.websocket.OnClose;
import jakarta.websocket.OnError;
import jakarta.websocket.OnMessage;
import jakarta.websocket.OnOpen;
import jakarta.websocket.Session;
import jakarta.websocket.server.PathParam;
import jakarta.websocket.server.ServerEndpoint;/*** @author weimeilayer@gmail.com ✨* @date 💓💕 2022年4月12日 🐬🐇 💓💕*/
@Component
@ServerEndpoint("/websocket/{username}")
public class WebSocketHandler {public static synchronized int getOnlineCount() {return onlineCount;}public static synchronized void addOnlineCount() {WebSocketHandler.onlineCount++;}public static synchronized void subOnlineCount() {WebSocketHandler.onlineCount--;}// 静态变量,用来记录当前在线连接数。应该把它设计成线程安全的。private static int onlineCount = 0;// 根据名字存储websocket对象CopyOnWriteArraySet线程安全set,ConcurrentHashMap线程安全mappublic static Map<String, CopyOnWriteArraySet<WebSocketHandler>> webSocketMap = new ConcurrentHashMap<>();// 与某个客户端的连接会话,需要通过它来给客户端发送数据public Session session;// 心跳时间,长时间没心跳踢掉连接public long heartBeatTime;// 初次连接时间,用于控制连接时间过长,踢掉连接public long beginTime;/*** 用户名称*/public String username;/*** 发送消息* @param username* @param message*/public static void sendMessageToUser(String username, String message) {// 检查用户名是否在 map 中存在if (webSocketMap.containsKey(username)) {// 获取该用户的 WebSocketHandler 集合CopyOnWriteArraySet<WebSocketHandler> userHandlers = webSocketMap.get(username);// 遍历该用户的所有连接(每个用户可能有多个 WebSocket 连接)for (WebSocketHandler handler : userHandlers) {// 通过 WebSocketHandler 实例发送消息handler.sendMessageOne(message, username);}} else {System.out.println("并无在线用户: " + username);}}/*** 连接建立成功调用的方法*/@OnOpenpublic void onOpen(@PathParam("username") String username, Session session) {this.username = username;this.session = session;this.heartBeatTime = System.currentTimeMillis();this.beginTime = System.currentTimeMillis();// 登陆用户必须按照用户id 格式登陆if (!"server".equals(username) && username.split(",").length < 3) {return;}// 将用户添加到websocket,支持单用户多出链接if (webSocketMap.containsKey(username)) {webSocketMap.get(username).add(this);} else {CopyOnWriteArraySet websocketSet = new CopyOnWriteArraySet();websocketSet.add(this);webSocketMap.put(username, websocketSet);addOnlineCount(); // 在线数加1}//注释掉 会退出Map<String, Object> messageMap = new ConcurrentHashMap<>();messageMap.put("type", "0");messageMap.put("message", username + "加入8000端口的的当前在线人数为" + getOnlineCount());messageMap.put("to", "all");messageMap.put("users", webSocketMap.keySet());messageMap.put("username", "server");sendMessageAll(JSONUtil.toJsonStr(messageMap));}/*** 发送消息给所有用户** @param message* @throws IOException*/public void sendMessageAll(String message) {for (String key : webSocketMap.keySet()) {for (WebSocketHandler websocket : webSocketMap.get(key)) {websocket.session.getAsyncRemote().sendText(message);}}}/*** 连接关闭调用的方法*/@OnClosepublic void onClose() {if (StringUtils.isNotEmpty(this.username)) {try {if (this.session.isOpen()) {this.session.close();// 强制关闭}webSocketMap.get(username).remove(this);// 删除链接if (webSocketMap.get(username).isEmpty()) {webSocketMap.remove(username);subOnlineCount(); // 在线数减1// 刷新用户列表Map<String, Object> messageMap = new ConcurrentHashMap<>();messageMap.put("type", 0);messageMap.put("message", username + "退出!当前在线人数为" + getOnlineCount());messageMap.put("users", webSocketMap.keySet());sendMessageAll(JSONUtil.toJsonStr(messageMap));}} catch (Exception e) {System.err.println("关闭连接出错 : " + e.getLocalizedMessage());}}}/*** 收到客户端消息后调用的方法** @param message 客户端发送过来的消息*/@OnMessagepublic void onMessage(String message) {// 刷新心跳时间this.heartBeatTime = System.currentTimeMillis();// 群发消息cn.hutool.json.JSONObject messageJson = JSONUtil.parseObj(message);Object type = messageJson.get("type");// 消息类型Object toUser = messageJson.get("to");// 接收对象// 心跳检测if ("999".equals(type)) {Map<String, Object> messageMap = new ConcurrentHashMap<>();messageMap.put("type", "1");messageMap.put("message", "pong");messageMap.put("username", "服务器");messageMap.put("to", this.username);sendMessageOne(JSONUtil.toJsonStr(messageMap), this.username);return;}// 发送消息if ("All".equalsIgnoreCase(type + "")) {sendMessageAll(message);}else {sendMessageOne(message, toUser + "");}}/*** 发生错误时调用*/@OnErrorpublic void onError(Throwable error) {error.printStackTrace();}/*** 发送消息** @param message* @throws IOException*/public void sendMessage(String message){//this.session.getBasicRemote().sendText(message);//同步this.session.getAsyncRemote().sendText(message);// 异步}/*** 发送消息给指定用户** @param message* @param toUserName*/public void sendMessageOne(String message, String toUserName) {webSocketMap.keySet().forEach(e -> {if (e.equals(toUserName)) {webSocketMap.get(e).forEach(f -> {try {f.session.getAsyncRemote().sendText(message);} catch (Exception e2) {f.session.getAsyncRemote().sendText(message);}});}});}
}
package com.cqcloud.platform.service;/*** @author weimeilayer@gmail.com* @date 💓💕2022年9月8日🐬🐇💓💕*/
public interface IotMqttService {/*** 扩展传输原文* @param message*/void pushMessageArrived(byte[] message);
}
package com.cqcloud.platform.service.impl;import org.springframework.stereotype.Service;import com.cqcloud.platform.service.IotMqttService;import lombok.AllArgsConstructor;/*** @author weimeilayer@gmail.com* @date 💓💕2022年9月8日🐬🐇💓💕*/
@Service
@AllArgsConstructor
public class IotMqttServiceImpl implements IotMqttService {/*** 获取拓展接口原文值* @param message*/@Overridepublic void pushMessageArrived(byte[] message) {// 拓展方法TcpEventHandler.handleAlarm("设备号","告警信息");}
}
package com.cqcloud.platform.config;import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.web.socket.server.standard.ServerEndpointExporter;/*** @author weimeilayer@gmail.com ✨* @date 💓💕 2022年4月12日 🐬🐇 💓💕*/
@Configuration
public class WebSocketConfig {/*** 注入ServerEndpointExporter, 这个bean会自动注册使用了@ServerEndpoint注解声明的Websocket endpoint* @return*/@Beanpublic ServerEndpointExporter serverEndpointExporter() {return new ServerEndpointExporter();}
}

在这里插入图片描述
在这里插入图片描述
在这里插入图片描述
在这里插入图片描述
进行批量用户的id进行下发webscoket信息

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.rhkb.cn/news/484931.html

如若内容造成侵权/违法违规/事实不符,请联系长河编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

从仪表盘探索 MongoDB 关键指标

这是 MongoDB 监控系列文章的第七篇&#xff0c;前面几篇文章的链接如下&#xff1a; MongoDB 监控&#xff08;一&#xff09;MongoDB 监控&#xff08;二&#xff09;MongoDB 监控&#xff08;三&#xff09;MongoDB 监控&#xff08;四&#xff09;MongoDB 监控&#xff08…

手机LCD分区刷新技术介绍

分区刷新也称为分区变频&#xff0c;LCD分区刷新功能的目的是将屏幕分为上下半区&#xff0c;分区显示不同帧率&#xff0c;上方区块High Frame Rate&#xff0c;下方区块Low Frame Rate。使用者可以动态自定义上方高刷显示区的结尾位置。 当前的智能手机屏幕上&#xff0c;显示…

php基础:文件处理

​​​​​​1.PHP 操作文件 读取文件并写到输出流的 PHP 代码如下&#xff08;如读取成功则 readfile() 函数返回字节数&#xff09;&#xff1a; <?php echo readfile("webdictionary.txt"); ?> 2.PHP 文件打开/读取/关闭 打开使用fopen&#xff08;&…

Redis高阶集群搭建+集群读写

问题 容量不够&#xff0c;redis 如何进行扩容&#xff1f;并发写操作&#xff0c; redis 如何分摊&#xff1f;另外&#xff0c;主从模式&#xff0c;薪火相传模式&#xff0c;主机宕机&#xff0c;导致 ip 地址发生变化&#xff0c;应用程序中配置需要修改对应的主机地址、端…

【H2O2|全栈】MySQL的基本操作(三)

目录 前言 开篇语 准备工作 案例准备 多表查询 笛卡尔积 等值连接 外连接 内连接 自连接 子查询 存在和所有 含于 分页查询 建表语句 结束语 前言 开篇语 本篇继续讲解MySQL的一些基础的操作——数据字段的查询中的多表查询和分页查询&#xff0c;与单表查询…

springboot vue 会员收银系统 (12)购物车关联服务人员 订单计算提成 开源

前言 完整版演示 http://120.26.95.195/ 开发版演示 http://120.26.95.195:8889/ 在之前的开发进程中&#xff0c;我们完成订单的挂单和取单功能&#xff0c;今天我们完成购物车关联服务人员&#xff0c;用户计算门店服务人员的提成。 1.商品关联服务人员 服务人员可以选择 一…

leetcode 1853 转换日期格式(postgresql)

需求 表: Days ----------------- | Column Name | Type | ----------------- | day | date | ----------------- day 是这个表的主键。 给定一个Days表&#xff0c;请你编写SQL查询语句&#xff0c;将Days表中的每一个日期转化为"day_name, month_name day, year"…

java操作doc(二)——java利用Aspose.Words动态创建自定义doc文档

有关java动态操作word文档&#xff0c;上一篇写了如何使用模板动态设置对于内容以及相关单元格的动态合并问题&#xff0c;详细请参看如下文档&#xff1a; java利用Aspose.Words操作Word动态模板文档并动态设置单元格合并 这篇文档说说&#xff0c;如何利用Aspose.Words动态…

仿蝠鲼软体机器人实现高速多模态游动

近期&#xff0c;华南理工大学周奕彤老师研究团队最新成果"Manta Ray-Inspired Soft Robotic Swimmer for High-speed and Multi-modal Swimming"被机器人领域会议 IEEE/RSJ International Conference on Intelligent Robots and Systems&#xff08;IROS 2024&#…

【网络原理】网络地址转换----NAT技术详解

&#x1f490;个人主页&#xff1a;初晴~ &#x1f4da;相关专栏&#xff1a;计算机网络那些事 我们在 IP协议 一文中介绍过&#xff0c;由于IPv4协议中 IP地址只有32位&#xff0c;导致最多只能表示 42亿9千万个IP地址。但我们需要通过IP地址来标识网络上的每一个设备&#x…

D86【python 接口自动化学习】- pytest基础用法

day86 pytest配置testpaths 学习日期&#xff1a;20241202 学习目标&#xff1a;pytest基础用法 -- pytest配置testpaths 学习笔记&#xff1a; pytest配置项 主目录创建pytest.ini文件 [pytest] testpaths./testRule 然后Terminal里直接命令&#xff1a;pytest&#xff…

电机瞬态分析基础(15):电机的电磁转矩(三相同步电机和三相感应电机)

1. 三相同步电机电磁转矩 1.1 隐极同步电机 图1. 三相隐极同步电机基本结构 三相隐极同步电机的基本结构可用图1来简单表示&#xff0c;图中&#xff0c;定子分布绕组可等效为三相对称绕组A-X、B-Y和C-Z&#xff1b;转子分布绕组为励磁绕组。若在定子三相对称绕组中通入三相…

人工智能与机器学习在智能扭矩系统中的应用

【大家好&#xff0c;我是唐Sun&#xff0c;唐Sun的唐&#xff0c;唐Sun的Sun。】 在当今科技飞速发展的时代&#xff0c;智能扭矩系统正经历着一场深刻的变革&#xff0c;而人工智能&#xff08;AI&#xff09;和机器学习算法的应用成为了推动这一变革的关键力量。 传统的扭矩…

Android hid 数据传输(device 端 )

最近一直在处理hid 数据需求&#xff0c;简而言之就是两台设备直接可以通过usb 线互相传递数据。 项目架构 为什么Device 端要采用HID&#xff08;人机接口设备&#xff09;的方式发送和接收数据呢&#xff1f; 主要是速度快&#xff0c;举个例子&#xff0c;就是鼠标移动&am…

K8S离线部署Nacos集群【Oracle作外部数据源】

一、前言 由于公司的要求下要使Nacos集群以Oracle作为外部数据源&#xff0c;前期咱们已经阐述了如何在本地搭建&#xff08;Nacos集群搭建【Oracle作外部数据源】&#xff09;&#xff0c;本次将带领大家在k8s上部署Nacos集群并以Oracle作为外部数据源。 二、软件包 nacos-f…

【Java开发】Springboot集成mybatis-plus

1、引入 mybatis-plus 依赖 <dependency> <groupId>com.baomidou</groupId> <artifactId>mybatis-plus-boot-starter</artifactId> <version>3.5.0</version> </dependency> <!--mysql依赖--> <dependen…

技术成长战略是什么?

文章目录 技术成长战略是什么&#xff1f;1. 前言2. 跟技术大牛学成长战略2.1 系统性能专家案例2.2 从开源到企业案例2.3 技术媒体大V案例2.4 案例小结 3. 学习金字塔和刻意训练4. 战略思维的诞生5. 建议 技术成长战略是什么&#xff1f; 1. 前言 在波波的微信技术交流群里头…

【开源免费】基于Vue和SpringBoot的课程答疑系统(附论文)

博主说明&#xff1a;本文项目编号 T 070 &#xff0c;文末自助获取源码 \color{red}{T070&#xff0c;文末自助获取源码} T070&#xff0c;文末自助获取源码 目录 一、系统介绍二、演示录屏三、启动教程四、功能截图五、文案资料5.1 选题背景5.2 国内外研究现状5.3 可行性分析…

【二分查找】力扣 275. H 指数 II

一、题目 二、思路 h 指数是高引用引用次数&#xff0c;而 citations 数组中存储的就是不同论文被引用的次数&#xff0c;并且是按照升序排列的。也就是说 h 指数将整个 citations 数组分成了两部分&#xff0c;左半部分是不够引用 h 次 的论文&#xff0c;右半部分论文的引用…

【LeetCode】169.多数元素

题目连接&#xff1a; https://leetcode.cn/problems/majority-element/solutions/2362000/169-duo-shu-yuan-su-mo-er-tou-piao-qing-ledrh/?envTypestudy-plan-v2&envIdtop-interview-150 题目描述&#xff1a; 思路一&#xff1a; 使用哈希表unordered_map记录每个元…