基于Netty构建WebSocket服务并实现项目群组聊天和实时消息通知推送

文章目录

  • 前言
    • 需求分析
    • 技术预研
      • Web端方案
      • 服务端技术
  • 技术方案
    • 设计思路
    • 功能实现
      • 添加依赖
      • 自定义NettyServer
      • 自定义webSocketHandler
      • 使用NettyServer向在线用户发送消息
  • 需要完善的地方

前言

我们的项目有个基于项目的在线文档编制模块,可以邀请多人项目组成员在线协同编制项目文档,现在的需求是要实现项目组成员在线实时协作沟通交流功能以及消息实时推送功能。

需求分析

根据需求分析,首先我们要基于项目组成员构建在线聊天群组并支持在线聊天,同时成员在线时支持实时推送消息。
在这里插入图片描述

技术预研

Web端方案

实现Web消息实时推送的方案比较多,包括轮询、长轮询、SSE、AJAX、WebSocket等。根据对比我们最终选择使用WebSocket来实现Web消息实时推送。

  • WebSocket: WebSocket是一种在单个TCP连接上进行全双工通信的协议。WebSocket通信协议于2011年被IETF定为标准RFC 6455,并由RFC7936补充规范。WebSocket API也被W3C定为标准。
    WebSocket使得客户端和服务器之间的数据交换变得更加简单,允许服务端主动向客户端推送数据。在WebSocket API中,浏览器和服务器只需要完成一次握手,两者之间就直接可以创建持久性的连接,并进行双向数据传输。

服务端技术

  • spring-boot-starter-websocket
    SpringBoot框架提供了WebSockets自动配置,通过spring-boot-starter-websocket模块轻松访问。
 <!-- 引入 WebSocket 模块依赖 --><dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-websocket</artifactId></dependency>// 创建一个配置类来配置 WebSocket 服务器
@Configuration
@EnableWebSocket
public class WebSocketConfig implements WebSocketConfigurer {@Overridepublic void registerWebSocketHandlers(WebSocketHandlerRegistry registry) {registry.addHandler(new MyWebSocketHandler(), "/ws").setAllowedOrigins("*");}
}
// 自定义消息处理
public class MyWebSocketHandler extends TextWebSocketHandler {@Overridepublic void afterConnectionEstablished(WebSocketSession session) throws Exception {log.debug("Connection established: " + session.getId());}@Overrideprotected void handleTextMessage(WebSocketSession session, TextMessage message) throws Exception {String payload = message.getPayload();log.debug("Received message: " + payload);// 回复消息session.sendMessage(new TextMessage("Echo: " + payload));}@Overridepublic void afterConnectionClosed(WebSocketSession session, CloseStatus status) throws Exception {log.debug("Connection closed: " + session.getId());}
}

如果不考虑吞吐和并发,spring-boot-starter-websocket非常适合构建WebSocket Server端。

  • Netty
    Netty 是一个基于NIO的客户、服务器端的编程框架,使用Netty 可以确保你快速和简单的开发出一个网络应用,例如实现了某种协议的客户、服务端应用。Netty相当于简化和流线化了网络应用的编程开发过程,例如:基于TCP和UDP的socket服务开发。

技术方案

设计思路

在这里插入图片描述

  1. 自定义NettyServer 基于项目构建用户群组
  2. 用户在指定群组发送消息,NettyServer向群组所有用户推送消息
  3. 业务系统向指定用户发送通知消息到kafka
  4. 消费者消费消息通过暴露出的NettyServerHanlder向所有在线用户实时推送消息

功能实现

添加依赖

<dependency><groupId>io.netty</groupId><artifactId>netty-all</artifactId><version>4.1.112.Final</version></dependency><dependency><groupId>org.springframework.kafka</groupId><artifactId>spring-kafka</artifactId></dependency>

自定义NettyServer

因为我们需要暴露NettyServer 的webSocketHandler,所以将NettyServer实例交由Spring管理,并暴露广播消息和系统消息接口

@Slf4j
@Component
public class CusNettyServer implements InitializingBean, DisposableBean {@Value("${netty.port:9000}")Integer nettyPort;EventLoopGroup bossGroup = new NioEventLoopGroup(1); // 通常只需要一个线程即可EventLoopGroup workerGroup = new NioEventLoopGroup(); // 根据实际情况调整线程数 默认创建与 CPU 核心数相等的线程数private ChannelFuture channelFuture;private NettyWebSocketHandler webSocketHandler;@Overridepublic void destroy() throws Exception {if (channelFuture != null && channelFuture.channel().isOpen()) {channelFuture.channel().closeFuture().sync();}bossGroup.shutdownGracefully();workerGroup.shutdownGracefully();}@Overridepublic void afterPropertiesSet() throws Exception {ServerBootstrap sb = new ServerBootstrap();sb.option(ChannelOption.SO_BACKLOG, 128); // 考虑调整这个值sb.option(ChannelOption.SO_REUSEADDR, true); // 避免地址重用问题sb.childOption(ChannelOption.TCP_NODELAY, true); // 减少延迟sb.childOption(ChannelOption.SO_KEEPALIVE, true); // 保持连接webSocketHandler = new NettyWebSocketHandler();// 绑定线程池sb.group(bossGroup, workerGroup)// 指定使用的channel.channel(NioServerSocketChannel.class)// 绑定监听端口.localAddress(this.nettyPort)// 绑定客户端连接时候触发操作.childHandler(new ChannelInitializer<SocketChannel>() {@Overrideprotected void initChannel(SocketChannel ch) throws Exception {log.debug("收到新连接: {}", ch.remoteAddress());//websocket协议本身是基于http协议的,所以这边也要使用http解编码器ch.pipeline().addLast(new HttpServerCodec());//以块的方式来写的处理器ch.pipeline().addLast(new ChunkedWriteHandler());ch.pipeline().addLast(new HttpObjectAggregator(8192));ch.pipeline().addLast(webSocketHandler);//添加聊天消息处理类ch.pipeline().addLast(new WebSocketServerProtocolHandler("/ws", null, true, 65536 * 10));}});// 服务器异步创建绑定channelFuture = sb.bind().sync();log.debug("{} 启动正在监听: {}", NettyServer.class, channelFuture.channel().localAddress());}// 广播消息接口public void broadcastMessage(SocketMessage socketMessage) {webSocketHandler.broadcastMessage(socketMessage);}// 系统通知接口public void sendSystemMessage(SocketMessage socketMessage, String toUserId) {webSocketHandler.sendSystemMessage(socketMessage, toUserId);}
}

自定义webSocketHandler

前面自定义CusNettyServer 过程我们是将NettyWebSocketHandler 放在外层初始化的,为了避免一个Handler被多个channel传递抛io.netty.channel.ChannelPipelineException异常,我们需要将NettyWebSocketHandler 标记为 @ChannelHandler.Sharable

@ChannelHandler.Sharable
@Slf4j
public class NettyWebSocketHandler extends SimpleChannelInboundHandler<TextWebSocketFrame> {/*** 存储已经登录用户的channel对象*/public static ChannelGroup channelGroup = new DefaultChannelGroup(GlobalEventExecutor.INSTANCE);/*** 存储用户id和用户的channelId绑定*/public static ConcurrentHashMap<String, ChannelId> userMap = new ConcurrentHashMap<>();/*** 存储广播消息的channel对象*/private static final ConcurrentHashMap<String, Channel> broadcastClients = new ConcurrentHashMap<>();/*** 用于存储群聊房间号和群聊成员的channel信息*/public static ConcurrentHashMap<String, ChannelGroup> groupMap = new ConcurrentHashMap<>();private static final ExecutorService executor = Executors.newFixedThreadPool(10); // 创建线程池private final TokenStore tokenStore = SpringUtil.getBean(TokenStore.class);private final StringRedisTemplate redisTemplate = SpringUtil.getBean(StringRedisTemplate.class);/*** 获取用户拥有的群聊id号*/private final UserGroupRepository userGroupRepository = SpringUtil.getBean(UserGroupRepository.class);private final MessageDataAssembler messageDataAssembler = SpringUtil.getBean(MessageDataAssembler.class);private final MessageManagerService messageService = SpringUtil.getBean(MessageManagerService.class);@Overridepublic void channelActive(ChannelHandlerContext ctx) throws Exception {log.info("与客户端建立连接,通道开启!");//添加到channelGroup通道组channelGroup.add(ctx.channel());ctx.channel().id();}@Overridepublic void channelInactive(ChannelHandlerContext ctx) throws Exception {log.info("与客户端断开连接,通道关闭!");//添加到channelGroup 通道组channelGroup.remove(ctx.channel());}@Overridepublic void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {//首次连接是FullHttpRequest,把用户id和对应的channel对象存储起来if (msg instanceof FullHttpRequest) {FullHttpRequest request = (FullHttpRequest) msg;// 首次握手进行登录验证String uri = request.uri();String token = getUrlParams(uri);String userId = chkLogin(token);userMap.put(userId, ctx.channel().id());broadcastClients.put(userId, ctx.channel());log.info("登录的用户id是:{}", userId);//第1次登录,需要查询下当前用户是否加入项目组,没有拒绝连接,有将群聊管理对象放入groupMap中List<UserGroup> groups = userGroupRepository.findGroupIdByUserId(userId);if (CollUtil.isNotEmpty(groups)) {groups.stream().map(UserGroup::getProjectId).forEach(groupId -> {ChannelGroup cGroup = Optional.ofNullable(groupMap.get(groupId)).orElseGet(() -> {ChannelGroup newGroup = new DefaultChannelGroup(GlobalEventExecutor.INSTANCE);groupMap.put(groupId, newGroup);return newGroup;});//把用户放到群聊管理对象里去cGroup.add(ctx.channel());});}//如果url包含参数,需要处理if (uri.contains("?")) {String newUri = uri.substring(0, uri.indexOf("?"));request.setUri(newUri);}} else if (msg instanceof TextWebSocketFrame) {//正常的TEXT消息类型TextWebSocketFrame frame = (TextWebSocketFrame) msg;log.info("客户端收到服务器数据:{}", frame.text());SocketMessage socketMessage = JSON.parseObject(frame.text(), SocketMessage.class);socketMessage.setSendTime(new Date());socketMessage.setId(IdUtil.getSnowflakeNextIdStr());// 如果群聊不存在,则不处理消息if (!groupMap.containsKey(socketMessage.getProjectId())) {log.info("无效消息,对应群聊不存在 {}", socketMessage.getProjectId());return;}// 将消息存储到 RedisString projectId = socketMessage.getProjectId();String messageKey = String.join(":", "message", projectId, socketMessage.getId());String messageJson = JSON.toJSONString(socketMessage);redisTemplate.opsForValue().set(messageKey, messageJson, 10, TimeUnit.MINUTES);// 异步处理消息executor.submit(() -> {// 从 Redis 中获取消息String storedMessageJson = redisTemplate.opsForValue().get(messageKey);if (storedMessageJson != null) {SocketMessage storedMessage = JSON.parseObject(storedMessageJson, SocketMessage.class);// 持久化消息Message message = messageDataAssembler.toEntity(storedMessage);message.setBizType(MsgBizType.PROJECT.getCode());Message saved = messageService.saveMessage(message);storedMessage.setId(saved.getId());// 推送群聊信息// 这里假设 groupMap 已经定义并且是线程安全的ChannelGroup group = groupMap.get(projectId);if (group != null) {group.writeAndFlush(new TextWebSocketFrame(JSON.toJSONString(storedMessage)));}// 处理完成移除RedisredisTemplate.delete(messageKey);}});}super.channelRead(ctx, msg);}@Overrideprotected void channelRead0(ChannelHandlerContext channelHandlerContext, TextWebSocketFrame textWebSocketFrame) throws Exception {}public void broadcastMessage(SocketMessage socketMessage) {// 异步处理消息executor.submit(() -> {// 持久化消息Message message = messageDataAssembler.toEntity(socketMessage);message.setSendTime(new Date());message.setBizType(MsgBizType.BROADCAST.getCode());message.setMessageType(MessageType.TEXT.getCode());messageService.broadcastMessage(message);channelGroup.writeAndFlush(new TextWebSocketFrame(JSON.toJSONString(socketMessage)));});}public void sendSystemMessage(SocketMessage socketMessage, String toUserId) {// 持久化消息Message message = messageDataAssembler.toEntity(socketMessage);messageService.sendUserMessage(message, toUserId);// 如何用户在线则推送websocket消息Optional.ofNullable(userMap.get(toUserId)).map(channelId -> channelGroup.find(channelId)).ifPresent(channel -> channel.writeAndFlush(new TextWebSocketFrame(JSON.toJSONString(socketMessage))));}private static String getUrlParams(String url) {if (!url.contains("=")) {throw new BusinessException(CusBusinessExceptionEnum.BUSINESS_ERROR_NETTY_SERVER_PATH_MUST_HAS_USER_ID_ERROR);}return url.substring(url.indexOf("=") + 1);}private String chkLogin(String token) {OAuth2AccessToken accessToken =  tokenStore.readAccessToken(token);if (accessToken == null) {throw new BusinessException(401, "Invalid access token:" + token);}if (accessToken.isExpired()) {throw new BusinessException(401, "Expired access token:" + token);}OAuth2Authentication oauth2Authentication = tokenStore.readAuthentication(accessToken);if (tokenStore.readAuthentication(accessToken) == null) {throw new BusinessException(401, "access token Authentication error:" + token);}LoginAppUser loginAppUser = (LoginAppUser) oauth2Authentication.getPrincipal();return loginAppUser.getUserId();}
}

这里有个问题是我们始终未解决的,那就是首次握手token传递的问题,最开始后端是FullHttpRequest request中获取token,且通过apifox也验证通过,但是前端在实现过程始终无法传递token(这块有大佬实现可以在评论下留言指点下)
这是我最开始的实现

 FullHttpRequest request = (FullHttpRequest) msg;// 首次握手进行登录验证// String userId = chkLogin(request);private String chkLogin(FullHttpRequest request) {String token = Optional.ofNullable(request.headers()).map(headers -> headers.get(HttpHeaderNames.AUTHORIZATION)).map(authHeader -> authHeader.replace("Bearer ", "")).orElseThrow(() -> new BusinessException(CusBusinessExceptionEnum.BUSINESS_ERROR_NETTY_SERVER_NOT_LOGIN_ERROR));OAuth2AccessToken accessToken =  tokenStore.readAccessToken(token);if (accessToken == null) {throw new BusinessException(401, "Invalid access token:" + token);}if (accessToken.isExpired()) {throw new BusinessException(401, "Expired access token:" + token);}OAuth2Authentication oauth2Authentication = tokenStore.readAuthentication(accessToken);if (tokenStore.readAuthentication(accessToken) == null) {throw new BusinessException(401, "access token Authentication error:" + token);}LoginAppUser loginAppUser = (LoginAppUser) oauth2Authentication.getPrincipal();return loginAppUser.getUserId();}

下图是我通过apifox使用header传递token验证成功
在这里插入图片描述

使用NettyServer向在线用户发送消息

消费kafka消息并使用NettyServer向在线用户发送消息

@Component
public class NotifyMsgConsumer {private final MessageManagerApplication messageManagerApplication;@KafkaListener(topics = "system_message_notify")public void processMessage(ConsumerRecord<Long, String> record, Acknowledgment acknowledgment) {log.info("system_message_notify 通知: {} {} ", record.key(), record.value());if (StringUtils.isEmpty(record.value())) {log.debug("system_message_notify 消息为空 {} 消息直接丢弃", record.key());acknowledgment.acknowledge();return;}NotifyMsg notifyMsg = null;try {notifyMsg = JSONObject.parseObject(record.value(), NotifyMsg.class);} catch (Exception e) {log.debug("system_message_notify 消息格式异常 {} {} 消息直接丢弃", record.key(), record.value());acknowledgment.acknowledge();return;}messageManagerApplication.sendNotify(notifyMsg);acknowledgment.acknowledge();}}@Override
public void sendNotify(NotifyMsg notifyMsg) {SocketMessage socketMessage = SocketMessage.buildNotifyMessage(messageDataAssembler, notifyMsg);cusNettyServer.sendSystemMessage(socketMessage, notifyMsg.getTo());}

下图是我们实现的一个前端效果图:
在这里插入图片描述
在这里插入图片描述

在这里插入图片描述

需要完善的地方

该方案目前是我们单机部署的方案,集群下还需要扩展,包括:

  1. 在线用户同步的问题:群组新消息处理如何实时同步到所有NettyServer节点连接下的客户端
  2. 通知消息处理问题:也是实时同步的问题,要考虑到所有NettyServer节点连接下的客户端

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.rhkb.cn/news/456645.html

如若内容造成侵权/违法违规/事实不符,请联系长河编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

日常记录,使用springboot,vue2,easyexcel使实现字段的匹配导入

目前的需求是数据库字段固定&#xff0c;而excel的字段不固定&#xff0c;需要实现excel导入到一个数据库内。 首先是前端的字段匹配&#xff0c;显示数据库字段和表头字段 读取表头字段&#xff1a; 我这里实现的是监听器导入&#xff0c;需要新建一个listen类。 读Excel …

BiGRU实现中文关系抽取算法

获取更多完整项目代码数据集&#xff0c;点此加入免费社区群 &#xff1a; 首页-置顶必看 1. 项目简介 本项目旨在实现并训练一个深度学习模型&#xff0c;应用于时间序列数据处理或自然语言处理任务中。项目采用了门控循环单元&#xff08;GRU&#xff0c;Gated Recurrent U…

Python爬虫进阶(实战篇一)

接&#xff0c;基础篇&#xff0c;链接&#xff1a;python爬虫入门&#xff08;所有演示代码&#xff0c;均有逐行分析&#xff01;&#xff09;-CSDN博客 目录 1.爬取博客网站全部文章列表 ps:补充&#xff08;正则表达式&#xff09; 爬虫实现 爬虫代码&#xff1a; 2.爬…

uniapp uview 上传图片,数据以formData + File 形式传输

期望 后端期望前端给的传参为 formData 形式, 同时文件的数据类型为File 形式. 解决过程 将文件处理为 File 格式 uview 中的 upload 组件点击上传之后不是标准的 File 形式,点击上传单个文件之后的控制台信息如下: [{"url": "blob:http://localhost:8081/…

《Sui区块链:重塑去中心化应用的新星与未来潜力》

目录 引言 一、Sui 1、 技术架构 2、 编程语言 3、Move起源 4、Move的几个关键点&#xff1a; 5、Move 智能合约编程语言 6、智能合约编程语言可以做什么 7、和其他编程语言有什么不同 8、 安全性 9、开发者体验 10、生态系统 11、 未来发展 总结 引言 在区块链技…

鸿蒙到底是不是纯血?到底能不能走向世界?

关注卢松松&#xff0c;会经常给你分享一些我的经验和观点。 2016年5月鸿蒙系统开始立项。 2018年美国开始经济战争&#xff0c;其中一项就是制裁华为&#xff0c;不让华为用安卓。 2019年8月9日华为正式发布鸿蒙系统。问题就出在这里&#xff0c;大家可以仔细看。 安卓一…

kafka 的高可用机制是什么?

大家好&#xff0c;我是锋哥。今天分享关于【kafka 的高可用机制是什么&#xff1f;】面试题&#xff1f;希望对大家有帮助&#xff1b; kafka 的高可用机制是什么&#xff1f; 1000道 互联网大厂Java工程师 精选面试题-Java资源分享网 Apache Kafka 是一个分布式消息系统&am…

【AI学习】Mamba学习(十二):深入理解S4模型

#1024程序员节&#xff5c;征文# HiPPO的学习暂告一段落&#xff0c;按照“HiPPO->S4->Mamba 演化历程”&#xff0c;接着学习S4。 S4对应的论文&#xff1a;《Efficiently Modeling Long Sequences with Structured State Spaces》 文章链接&#xff1a;https://ar5iv…

【移动应用开发】界面设计(二)实现水果列表页面

续上一篇博客 【移动应用开发】界面设计&#xff08;一&#xff09;实现登录页面-CSDN博客 目录 一、采用ViewBinding实现一个RecyclerView 1.1 在app/build.gradle中添加recyclerview依赖&#xff0c;并打开viewBinding &#xff08;1&#xff09;在app/build.gradle中添加…

CORS预检请求配置流程图 srpingboot和uniapp

首先要会判断预检请求 还是简单请求 简单请求 预检请求 #mermaid-svg-1R9nYRa7P9Pll4AK {font-family:"trebuchet ms",verdana,arial,sans-serif;font-size:16px;fill:#333;}#mermaid-svg-1R9nYRa7P9Pll4AK .error-icon{fill:#552222;}#mermaid-svg-1R9nYRa7P9Pll4…

智能园艺:Spring Boot植物健康系统

1系统概述 1.1 研究背景 随着计算机技术的发展以及计算机网络的逐渐普及&#xff0c;互联网成为人们查找信息的重要场所&#xff0c;二十一世纪是信息的时代&#xff0c;所以信息的管理显得特别重要。因此&#xff0c;使用计算机来管理植物健康系统的相关信息成为必然。开发合适…

51单片机——OLED显示图片

取模软件&#xff1a;链接:https://pan.baidu.com/s/1UcrbS7nU4bsawNxsaaULfQ 提取码:gclc 1、如果图片大小和格式不合适&#xff0c;可以先用Img2Lcd软件进行调整图片大小&#xff0c;一般取模软件使用的是.bmp图片&#xff0c;可以进行输出.bmp格式。软件界面如下&#xff1…

开源vGPU方案 HAMi实现细粒度GPU切分——筑梦之路

前言 为什么需要 GPU 共享、切分等方案&#xff1f; 在使用GPU的过程中我们会发现&#xff0c;直接在裸机环境使用&#xff0c;都可以多个进程共享 GPU&#xff0c;怎么到 k8s 环境就不行了&#xff1f; 1. 资源感知 在 k8s 中资源是和节点绑定的&#xff0c;对于 GPU 资源…

打包方式-jar和war的区别

1、jar包 JAR包是类的归档文件&#xff0c;与平台无关的文件格式&#xff0c;其实jar包就是java的类进行编译生成的class文件进行打包的压缩包。 JAR以ZIP文件格式为基础&#xff0c;与ZIP不同的是&#xff0c;JAR不仅用于压缩和发布&#xff0c;还用于部署和封装库、组件和插…

R实验——logistic回归、LDA、QDAKNN

数据集介绍&#xff1a; mpg&#xff0c;miles per gallon即油耗&#xff0c;这个数据集来自卡内基梅隆大学维护的StatLib库。1983年美国统计协会博览会使用了该数据集。这个数据集是对StatLib库中提供的数据集稍加修改的版本。根据Ross Quinlan(1993)在预测属性“mpg”中的使…

利用飞腾派进行OpenCV开发

实验目标&#xff1a; 完成飞腾平台OpenCV开发。 实验大纲&#xff1a; Mat数据结构加载、显示、保存图像读写像素RGB图像分离彩色图转灰度图 Mat数据结构 Mat是一个类&#xff0c;由两个数据部分组成&#xff1a;矩阵头(大小,通道,数据类型等)和数据块(像素 值)。创建示例…

vue3 选中对话框时,对话框右侧出一个箭头

先看下做出的效果&#xff1a; html代码&#xff0c;其中listPlan.records是后台拿到的数据进行遍历 <template><ul class"list"><li style"height: 180px;width: 95%":key"index"v-for"(item, index) in listPlan.record…

Android 判断手机放置的方向

#1024程序员节&#xff5c;征文# 文章目录 前言一、pandas是什么&#xff1f;二、使用步骤 1.引入库2.读入数据总结 需求 老板&#xff1a;我有个手持终端&#xff0c;不能让他倒了&#xff0c;当他倒或者倾斜的时候要发出报警&#xff1b; 程序猿&#xff1a;我这..... 老板…

Servlet(三)-------Cookie和session

一.Cookie和Session Cookie和Session都是用于在Web应用中跟踪用户状态的技术。Cookie是存储在用户浏览器中的小文本文件&#xff0c;由服务器发送给浏览器。当用户再次访问同一网站时&#xff0c;浏览器会把Cookie信息发送回服务器。例如&#xff0c;网站可以利用Cookie记住用…

Python与MySQL

一、Python 操作 MySQL 数据库软件 我们在上一篇文章《SQL入门》中使用了图形化工具DBeaver操作MySQL数据库软件&#xff0c;除了使用图形化工具以外&#xff0c;我们也可以使用编程语言来执行 SQL 从而操作数据库&#xff0c;可以在 Python 中&#xff0c;使用第三方库 pymys…