SpringBoot+Netty+Websocket实现消息推送

这样一个需求:把设备异常的状态每10秒推送到页面并且以弹窗弹出来,这个时候用Websocket最为合适,今天主要是后端代码展示。

添加依赖

<dependency><groupId>io.netty</groupId><artifactId>netty-all</artifactId><version>4.1.36.Final</version>
</dependency>
<dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-websocket</artifactId>
</dependency>

定义netty端口号

websocket:netty:port: 8888path: /websocket

netty服务器

@Slf4j
@Component
public class NettyServer {/*** netty服务端口号*/@Value("${websocket.netty.port}")private int port;/*** netty事件辅助组*/private EventLoopGroup bossGroup;/*** netty事件工作组*/private EventLoopGroup workGroup;/*** 管道配置*/private final CustomChannelInitializer channelInitializer;public NettyServer(CustomChannelInitializer channelInitializer) {this.channelInitializer = channelInitializer;}/*** netty服务初始化*/@PostConstructpublic void start() {new Thread(() -> {bossGroup = new NioEventLoopGroup();workGroup = new NioEventLoopGroup();ServerBootstrap bootstrap = new ServerBootstrap();//bossGroup辅助客户端的tcp连接请求, workGroup负责与客户端之前的读写操作bootstrap.group(bossGroup, workGroup);//设置NIO类型的channelbootstrap.channel(NioServerSocketChannel.class);//设置监听端口bootstrap.localAddress(new InetSocketAddress(port));//设置管道bootstrap.childHandler(channelInitializer);try {ChannelFuture channelFuture = bootstrap.bind().sync();log.info("Netty服务启动成功,开启监听:{}", channelFuture.channel().localAddress());//对关闭通道进行监听channelFuture.channel().closeFuture().sync();} catch (InterruptedException e) {log.error("Netty服务启动失败!", e);throw new RuntimeException(e);}}).start();}}

Netty配置

管理全局Channel以及用户对应的channel(推送消息)

import io.netty.channel.Channel;
import io.netty.channel.group.ChannelGroup;
import io.netty.channel.group.DefaultChannelGroup;
import io.netty.util.concurrent.GlobalEventExecutor;import java.util.concurrent.ConcurrentHashMap;/*** @version 1.0.0* @description 业务类*/
public class NettyConfig {/*** 定义全局单利channel组 管理所有channel*/private static volatile ChannelGroup channelGroup = null;/*** 存放请求ID与channel的对应关系*/private static volatile ConcurrentHashMap<String, Channel> channelMap = null;/*** 定义两把锁*/private static final Object lock1 = new Object();private static final Object lock2 = new Object();public static ChannelGroup getChannelGroup() {if (null == channelGroup) {synchronized (lock1) {if (null == channelGroup) {channelGroup = new DefaultChannelGroup(GlobalEventExecutor.INSTANCE);}}}return channelGroup;}public static ConcurrentHashMap<String, Channel> getChannelMap() {if (null == channelMap) {synchronized (lock2) {if (null == channelMap) {channelMap = new ConcurrentHashMap<>();}}}return channelMap;}public static Channel getChannel(String userId) {if (null == channelMap) {return getChannelMap().get(userId);}return channelMap.get(userId);}
}

管道配置

import io.netty.channel.ChannelInitializer;
import io.netty.channel.ChannelPipeline;
import io.netty.channel.socket.SocketChannel;
import io.netty.handler.codec.http.HttpObjectAggregator;
import io.netty.handler.codec.http.HttpServerCodec;
import io.netty.handler.codec.http.websocketx.WebSocketServerProtocolHandler;
import io.netty.handler.codec.serialization.ObjectEncoder;
import io.netty.handler.stream.ChunkedWriteHandler;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.stereotype.Component;/*** @version 1.0.0* @description Netty管道配置类*/
@Component
public class CustomChannelInitializer extends ChannelInitializer<SocketChannel> {/*** webSocket协议名*/private static final String WEBSOCKET_PROTOCOL = "WebSocket";/*** websocket服务地址*/@Value("${websocket.path:/websocket}")private String websocketPath;private final CustomChannelHandler channelHandler;public CustomChannelInitializer(CustomChannelHandler channelHandler) {this.channelHandler = channelHandler;}@Overrideprotected void initChannel(SocketChannel socketChannel) throws Exception {// 设置管道ChannelPipeline pipeline = socketChannel.pipeline();// 流水线管理通道中的处理程序(Handler),用来处理业务// webSocket协议本身是基于http协议的,所以这边也要使用http编解码器pipeline.addLast(new HttpServerCodec());pipeline.addLast(new ObjectEncoder());// 以块的方式来写的处理器pipeline.addLast(new ChunkedWriteHandler());pipeline.addLast(new HttpObjectAggregator(8192));pipeline.addLast(new WebSocketServerProtocolHandler(websocketPath, WEBSOCKET_PROTOCOL, true, 65536 * 10));// 自定义的handler,处理业务逻辑pipeline.addLast(channelHandler);}
}

自定义CustomChannelHandler

import cn.hutool.json.JSONObject;
import cn.hutool.json.JSONUtil;
import com.ruoyi.common.utils.StringUtils;
import io.netty.channel.ChannelHandler;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.SimpleChannelInboundHandler;
import io.netty.handler.codec.http.websocketx.TextWebSocketFrame;
import io.netty.util.AttributeKey;
import lombok.extern.slf4j.Slf4j;
import org.springframework.stereotype.Component;/*** @version 1.0.0* @description Netty管道handler类*/
@Slf4j
@Component
@ChannelHandler.Sharable
public class CustomChannelHandler extends SimpleChannelInboundHandler<TextWebSocketFrame> {/*** 一旦连接,第一个被执行*/@Overridepublic void handlerAdded(ChannelHandlerContext ctx) throws Exception {log.info("有新的客户端链接:[{}]", ctx.channel().id().asLongText());// 添加到channelGroup 通道组NettyConfig.getChannelGroup().add(ctx.channel());}/*** 读取数据*/@Overrideprotected void channelRead0(ChannelHandlerContext ctx, TextWebSocketFrame msg) throws Exception {log.info("服务器收到消息:{}", msg.text());// 获取用户ID,关联channelJSONObject jsonObject = JSONUtil.parseObj(msg.text());String uid = jsonObject.getStr("uid");if(StringUtils.isNotEmpty(uid)){NettyConfig.getChannelMap().put(uid, ctx.channel());// 将用户ID作为自定义属性加入到channel中,方便随时channel中获取用户IDAttributeKey<String> key = AttributeKey.valueOf("userId");ctx.channel().attr(key).setIfAbsent(uid);// 回复消息ctx.channel().writeAndFlush(new TextWebSocketFrame("服务器收到消息啦"));}}@Overridepublic void handlerRemoved(ChannelHandlerContext ctx) throws Exception {log.info("用户下线了:{}", ctx.channel().id().asLongText());// 删除通道NettyConfig.getChannelGroup().remove(ctx.channel());removeUserId(ctx);}@Overridepublic void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {log.info("异常:{}", cause.getMessage());super.exceptionCaught(ctx,cause);// 删除通道NettyConfig.getChannelGroup().remove(ctx.channel());removeUserId(ctx);ctx.close();}/*** 删除用户与channel的对应关系*/private void removeUserId(ChannelHandlerContext ctx) {AttributeKey<String> key = AttributeKey.valueOf("userId");String userId = ctx.channel().attr(key).get();if(StringUtils.isNotEmpty(userId)){NettyConfig.getChannelMap().remove(userId);}}
}

推送消息接口及实现类

public interface PushMsgService {/*** 推送给指定用户*/void pushMsgToOne(String group, String msg);/*** 推送给所有用户*/void pushMsgToAll(String msg);
}

实现接口

@Service
public class PushMsgServiceImpl implements PushMsgService {@Overridepublic void pushMsgToOne(String group, String msg) {Channel channel = NettyConfig.getChannel(group);if (Objects.isNull(channel)) {throw new RuntimeException("未连接socket服务器");}channel.writeAndFlush(new TextWebSocketFrame(msg));}@Overridepublic void pushMsgToAll(String msg) {NettyConfig.getChannelGroup().writeAndFlush(new TextWebSocketFrame(msg));}
}

具体的controller层接口

   /*** 获取弹框网关状态*/@GetMapping("/upKnxNetworkLink/{uid}")public void upKnxNetworkLink(@PathVariable String uid){KnxNetworkLinkInfo knxNetworkLinkInfo =new KnxNetworkLinkInfo();knxNetworkLinkInfo.setStatus("0");List<KnxNetworkLinkInfo>knxNetworkLinkInfoList=knxNetworkLinkInfoService.queryList(knxNetworkLinkInfo);JSONArray array= JSONArray.parseArray(JSON.toJSONString(knxNetworkLinkInfoList));pushMsgService.pushMsgToOne(uid,array.toJSONString());}

使用postman测试Websocket推送
在这里插入图片描述
连接Websocket
在这里插入图片描述
在开一个窗口测试发送消息的接口
在这里插入图片描述
发送过后在回到连接Websocket窗口
在这里插入图片描述
前端需要做一个定时访问发送消息的接口,每发一次就会往前端推送一次数据。
参考:Springboot + netty +websocket 实现推送消息

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.rhkb.cn/news/217733.html

如若内容造成侵权/违法违规/事实不符,请联系长河编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

SpringBoot Maven 项目打包的艺术--主清单属性缺失与NoClassDefFoundError的优雅解决方案

Maven项目的Jar包打包问题-没有主清单属性&&ClassNotFoundException 与 NoClassDefFoundError 文章目录 Maven项目的Jar包打包问题-没有主清单属性&&ClassNotFoundException 与 NoClassDefFoundError1、问题出现1.1、Jar包运行&#xff1a;没有主清单属性解决方…

快递批量查询高手:自动查询,精准掌控状态实时更新信息

随着电子商务的繁荣和智能化生活的普及&#xff0c;快递服务已经成为了我们生活中不可或缺的一部分。然而&#xff0c;在大量的快递信息中&#xff0c;如何快速、准确地查询和掌握快递的实时状态成为了许多人的痛点。今天&#xff0c;我们将介绍一款名为“快递批量查询高手”的…

智能优化算法应用:基于混合蛙跳算法3D无线传感器网络(WSN)覆盖优化 - 附代码

智能优化算法应用&#xff1a;基于混合蛙跳算法3D无线传感器网络(WSN)覆盖优化 - 附代码 文章目录 智能优化算法应用&#xff1a;基于混合蛙跳算法3D无线传感器网络(WSN)覆盖优化 - 附代码1.无线传感网络节点模型2.覆盖数学模型及分析3.混合蛙跳算法4.实验参数设定5.算法结果6.…

Mac 文件高速下载工具 JDownloader2 下载安装详细教程

朋友们大家好&#xff0c;今天推荐一款多线程下载神器 JDownloader&#xff0c;JDownloader 支持多线程下载、断点续传、网盘资源下载、压缩包文件自解压、OCR识别等众多实用功能&#xff0c;最重要的就是可以提升我们的下载速度5-20倍&#xff0c;非常的硬核 安装也是比较简单…

SpringBoot使用自带的日志框架(开箱即用,同时输出到文件与控制台)

在SpringBoot内部中&#xff0c;默认就集成了LogBack的日志依赖&#xff0c;所以我们其实在实际开发中不需要直接添加该依赖。 你会发现spring-boot-starter其中包含了 spring-boot-starter-logging&#xff0c;Spring Boot为我们提供了很多默认的日志配置&#xff0c;所以&…

常见的Linux基本指令

目录 什么是Linux&#xff1f; Xshell如何远程控制云服务器 Xshell远程连接云服务器 Linux基本指令 用户管理指令 pwd指令 touch指令 mkdir指令 ls指令 cd指令 rm指令 man命令 cp指令 mv指令 cat指令 head指令 ​编辑 tail指令 ​编辑echo指令 find命令 gr…

【算法题】N进制减法(js)

返回结果-1 const str "2 11 1"; const str1 "8 07 1"; const str2 "16 af ff"; function solution(str) {const [n, minuend, subtrahend] str.split(" ");if (n < 2 || n > 35) return -1;else if (isValid(minuend) &am…

人工智能中的文本分类:技术突破与实战指导

在本文中&#xff0c;我们全面探讨了文本分类技术的发展历程、基本原理、关键技术、深度学习的应用&#xff0c;以及从RNN到Transformer的技术演进。文章详细介绍了各种模型的原理和实战应用&#xff0c;旨在提供对文本分类技术深入理解的全面视角。 关注TechLead&#xff0c;分…

探索SSL证书的应用场景,远不止网站,还有小程序、App Store等

说到SSL证书&#xff0c;我们都知道其是用于实现HTTPS加密保障数据安全的重要工具&#xff0c;在建设网站的时候经常会部署SSL证书。但实际上&#xff0c;SSL证书的应用场景远不止网站&#xff0c;它还被广泛地应用到小程序、App Store、抖音广告、邮件服务器以及各种物联网设备…

web网络安全

web安全 一&#xff0c;xss 跨站脚本攻击(全称Cross Site Scripting,为和CSS&#xff08;层叠样式表&#xff09;区分&#xff0c;简称为XSS)是指恶意攻击者在Web页面中插入恶意javascript代码&#xff08;也可能包含html代码&#xff09;&#xff0c;当用户浏览网页之时&…

html中一个div中平均一行分配四个盒子,可展开与收起所有的盒子

html中一个div中平均一行分配四个盒子&#xff0c;可展开与收起所有的盒子 1.截图显示部分 2.代码展示部分 <!DOCTYPE html> <html lang"en"><head><meta charset"UTF-8"><meta name"viewport" content"wid…

测长机:精度与用途解析

测长机是一种用于测量物体长度或距离的专业测量仪器&#xff0c;而且测量结果能够稳定且可靠。其精度是衡量其优劣的重要标准之一。 在制造业中&#xff0c;长度尺寸是所有几何量尺寸测量的基准。通过测量产品的长度&#xff0c;可以及时发现并纠正尺寸偏差&#xff0c;保证产…

JVM的内存分区以及垃圾收集

1.JVM的内存分区 1.1方法区 方法区(永久代&#xff09;主要用来存储已在虚拟机加载的类的信息、常量、静态变量以及即时编译器编译后的代码信息。该区域是被线程共享的。 1.2虚拟机栈 虚拟机栈也就是我们平时说的栈内存&#xff0c;它是为java方法服务的。每个方法在执行的…

Vue--第八天

Vue3 1.优点&#xff1a; 2.创建&#xff1a; 3.文件&#xff1a; 换运行插件&#xff1a; 4.运行&#xff1a; setup函数&#xff1a; setup函数中获取不到this&#xff08;this 在定义的时候是Undefined) reactive()和ref(): 代码&#xff1a; <script setup> // …

解决el-table组件中,分页后数据的勾选、回显问题?

问题描述&#xff1a; 1、记录一个弹窗点击确定按钮后&#xff0c;table列表所有勾选的数据信息2、再次打开弹窗&#xff0c;回显勾选所有保存的数据信息3、遇到的bug&#xff1a;切换分页&#xff0c;其他页面勾选的数据丢失&#xff1b;点击确认只保存当前页的数据&#xff1…

学习MS Dynamics AX 2012编程开发 1. 了解Dynamics AX 2012

在本章中&#xff0c;您将了解开发环境的结构以及Microsoft Dynamics AX中的开发人员可以访问哪些工具。在本书的第一步演练之后&#xff0c;您将很容易理解著名的Hello World代码&#xff0c;您将知道应用程序对象树中的不同节点代表什么。 以下是您将在本章中学习的一些主题…

圆形多线图

const gaugeData [{value: 80,name: Perfect,title: {offsetCenter: [-100%, -100%]},detail: {valueAnimation: true,offsetCenter: [-70%, -100%]},itemStyle: {borderColor: #fff,borderWidth: 6,borderType: solid // 可选&#xff0c;指定边框类型}},{value: 40,name: Go…

MQ-Det: Multi-modal Queried Object Detection in the Wild

首个支持视觉和文本查询的开放集目标检测方法 NeurIPS2023 文章&#xff1a;https://arxiv.org/abs/2305.18980 代码&#xff1a;https://github.com/YifanXu74/MQ-Det 主框图 摘要 这篇文章提出了MQ-Det&#xff0c;一种高效的架构和预训练策略&#xff0c;它利用文本描述的…

JS的箭头函数this:

箭头函数不会创建自己的this&#xff0c;它只会从自己的作用域链的上一层沿用this。 具体看实例&#xff1a; //以前&#xff1a;谁调用的这个函数 this就指向谁// console.log(this);//window// function fn(){// console.log(this);//window 因为这个函数也是window调用…

【Vue+Python】—— 基于Vue与Python的图书管理系统

文章目录 &#x1f356; 前言&#x1f3b6;一、项目描述✨二、项目展示&#x1f3c6;三、撒花 &#x1f356; 前言 【VuePython】—— 基于Vue与Python的图书管理系统 &#x1f3b6;一、项目描述 描述&#xff1a; 本项目为《基于Vue与Python的图书管理系统》&#xff0c;项目…