Springboot整合Netty简单实现1对1聊天(vx小程序服务端)

本文功能实现较为简陋,demo内容仅供参考,有不足之处还请指正。

背景

一个小项目,用于微信小程序的服务端,需要实现小程序端可以和他人1对1聊天

实现功能

Websocket、心跳检测、消息持久化、离线消息存储

Netty配置类

/*** @author Aseubel*/
@Component
@Slf4j
@EnableConfigurationProperties(NettyServerConfigProperties.class)
public class NettyServerConfig {private ChannelFuture serverChannelFuture;// 心跳间隔(秒)private static final int HEARTBEAT_INTERVAL = 15;// 读超时时间private static final int READ_TIMEOUT = HEARTBEAT_INTERVAL * 2;// 使用线程池管理private final EventLoopGroup bossGroup = new NioEventLoopGroup(1);private final EventLoopGroup workerGroup = new NioEventLoopGroup();private final NettyServerConfigProperties properties;// 由于在后面的handler中有依赖注入类,所以要通过springboot的ApplicationContext来获取Bean实例@Autowiredprivate ApplicationContext applicationContext;public NettyServerConfig(NettyServerConfigProperties properties) {this.properties = properties;}@PostConstructpublic void startNettyServer() {// 使用独立线程启动Netty服务new Thread(() -> {try {ServerBootstrap bootstrap = new ServerBootstrap();bootstrap.group(bossGroup, workerGroup).channel(NioServerSocketChannel.class).childHandler(new ChannelInitializer<Channel>() {@Overrideprotected void initChannel(Channel ch) throws Exception {ChannelPipeline pipeline = ch.pipeline();SSLContext sslContext = SslUtil.createSSLContext("PKCS12",properties.getSslPath(), properties.getSslPassword());// SSLEngine 此类允许使用ssl安全套接层协议进行安全通信SSLEngine engine = sslContext.createSSLEngine();engine.setUseClientMode(false);pipeline.addLast(new SslHandler(engine)); // 设置SSLpipeline.addLast(new HttpServerCodec());pipeline.addLast(new HttpObjectAggregator(10 * 1024 * 1024));// 最大10MBpipeline.addLast(new ChunkedWriteHandler());pipeline.addLast(new HttpHandler());// 只有text和binarytext的帧能经过WebSocketServerProtocolHandler,所以心跳检测这两个都得放前面pipeline.addLast(new IdleStateHandler(READ_TIMEOUT, 0, 0, TimeUnit.SECONDS));pipeline.addLast(new HeartbeatHandler());pipeline.addLast(new WebSocketServerProtocolHandler("/ws", null, true, 10 * 1024 * 1024));pipeline.addLast(applicationContext.getBean(MessageHandler.class));pipeline.addLast(new ChannelInboundHandlerAdapter() {@Overridepublic void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) {// 统一处理所有未被前面handler捕获的异常log.error("全局异常捕获: {}", cause.getMessage());ctx.channel().close();}});}});serverChannelFuture = bootstrap.bind(properties.getPort()).sync();// 保持通道开放serverChannelFuture.channel().closeFuture().sync();} catch (InterruptedException e) {Thread.currentThread().interrupt();}}).start();}@PreDestroypublic void stopNettyServer() {// 优雅关闭if (serverChannelFuture != null) {serverChannelFuture.channel().close();}bossGroup.shutdownGracefully();workerGroup.shutdownGracefully();}}

Handler

心跳检测

/*** @author Aseubel*/
public class HeartbeatHandler extends ChannelInboundHandlerAdapter {private static final int HEARTBEAT_INTERVAL = 15; // 心跳间隔(秒)private static final int MAX_MISSED_HEARTBEATS = 2; // 允许丢失的心跳次数// 记录每个连接的丢失心跳次数private final Map<ChannelId, Integer> missedHeartbeats = new ConcurrentHashMap<>();@Overridepublic void channelActive(ChannelHandlerContext ctx) {// 添加 IdleStateHandler 触发读空闲事件ctx.pipeline().addLast(new IdleStateHandler(HEARTBEAT_INTERVAL * MAX_MISSED_HEARTBEATS, 0, 0));scheduleHeartbeat(ctx);}private void scheduleHeartbeat(ChannelHandlerContext ctx) {ctx.executor().scheduleAtFixedRate(() -> {if (ctx.channel().isActive()) {ctx.writeAndFlush(new PingWebSocketFrame(Unpooled.copiedBuffer("HEARTBEAT", CharsetUtil.UTF_8)));// 记录丢失的心跳次数missedHeartbeats.compute(ctx.channel().id(), (k, v) -> v == null ? 1 : v + 1);}}, HEARTBEAT_INTERVAL, HEARTBEAT_INTERVAL, TimeUnit.SECONDS);}@Overridepublic void channelRead(ChannelHandlerContext ctx, Object msg) {if (msg instanceof PongWebSocketFrame) {// 收到 Pong 后重置丢失计数missedHeartbeats.remove(ctx.channel().id());ctx.fireChannelRead(msg); // 传递消息给后续处理器} else {ctx.fireChannelRead(msg);}}@Overridepublic void userEventTriggered(ChannelHandlerContext ctx, Object evt) {if (evt instanceof IdleStateEvent) {int missed = missedHeartbeats.getOrDefault(ctx.channel().id(), 0);if (missed >= MAX_MISSED_HEARTBEATS) {// 超过最大丢失次数,关闭连接System.out.println("连接超时,关闭连接" + ctx.channel().id().asLongText());ctx.close();cleanOfflineResources(ctx.channel());}}}private void cleanOfflineResources(Channel channel) {MessageHandler.removeUserChannel(channel);missedHeartbeats.remove(channel.id());}
}

处理http请求,建立连接

/*** @author Aseubel* @description 处理websocket连接请求,将code参数存入channel的attribute中* @date 2025-02-21 15:34*/
public class HttpHandler extends ChannelInboundHandlerAdapter {public static final AttributeKey<String> WS_TOKEN_KEY = AttributeKey.valueOf("code");@Overridepublic void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {// 判断是否是连接请求if (msg instanceof FullHttpRequest) {FullHttpRequest request = (FullHttpRequest) msg;try {QueryStringDecoder decoder = new QueryStringDecoder(request.uri());ctx.channel().attr(WS_TOKEN_KEY).set(decoder.parameters().get("code").get(0));} catch (Exception e) {throw new AppException("非法的websocket连接请求");}// 将 FullHttpRequest 转发到 MessageHandlerctx.fireChannelRead(request);// 重新设置 uri,将请求转发到 websocket handler,否则无法成功建立连接request.setUri("/ws");}// 消息直接交给下一个 handlersuper.channelRead(ctx, msg);}}

 消息处理

/*** @author Aseubel* @description 处理 WebSocket 消息* @date 2025-02-21 15:33*/
@Component
@Slf4j
@Sharable
public class MessageHandler extends SimpleChannelInboundHandler<WebSocketFrame> {public static final AttributeKey<String> WS_TOKEN_KEY = AttributeKey.valueOf("code");public static final AttributeKey<String> WS_USER_ID_KEY = AttributeKey.valueOf("userId");private static final Map<String, Queue<WebSocketFrame>> OFFLINE_MSGS = new ConcurrentHashMap<>();private static final Map<String, Channel> userChannels = new ConcurrentHashMap<>();@Autowiredprivate ThreadPoolTaskExecutor threadPoolExecutor;@Resourceprivate IMessageRepository messageRepository;// 提供受控的访问方法public static void removeUserChannel(Channel channel) {userChannels.values().remove(channel);}public static boolean containsUser(String userId) {return userChannels.containsKey(userId);}@Overridepublic void channelActive(ChannelHandlerContext ctx) throws Exception {super.channelActive(ctx);}@Overridepublic void channelRead(ChannelHandlerContext ctx, Object req) throws Exception {if (req instanceof FullHttpRequest) {String code = getCodeFromRequest(ctx); // 从请求中提取 codeString userId = getOpenid(APPID, SECRET, code);    // 验证 code 获取 openiduserChannels.put(userId, ctx.channel());ctx.channel().attr(WS_USER_ID_KEY).set(userId);System.out.println("客户端连接成功,用户id:" + userId);// 由于这里还在处理握手请求也就是建立连接,所以需要延迟发送离线消息new Thread(() -> {try {Thread.sleep(50);OFFLINE_MSGS.getOrDefault(userId, new LinkedList<>()).forEach(ctx::writeAndFlush);OFFLINE_MSGS.remove(userId);} catch (InterruptedException e) {Thread.currentThread().interrupt();}}).start();} else if (req instanceof TextWebSocketFrame ) {this.channelRead0(ctx, (TextWebSocketFrame) req);} else {ctx.fireChannelRead(req);}}@Overrideprotected void channelRead0(ChannelHandlerContext ctx, WebSocketFrame frame) throws Exception {if (frame instanceof TextWebSocketFrame) {MessageEntity message = validateMessage(ctx.channel().attr(WS_USER_ID_KEY).get(), (TextWebSocketFrame) frame);saveMessage(message);sendOrStoreMessage(message.getToUserId(), frame);} else {ctx.close();}}// 处理连接断开@Overridepublic void channelInactive(ChannelHandlerContext ctx) {System.out.println("客户端断开连接,用户id:" + ctx.channel().attr(WS_USER_ID_KEY).get());Channel channel = ctx.channel();for (Map.Entry<String, Channel> entry : userChannels.entrySet()) {if (entry.getValue() == channel) {userChannels.remove(entry.getKey());break;}}}private MessageEntity validateMessage(String userId, TextWebSocketFrame textFrame) {String message = textFrame.text();try {JsonObject json = JsonParser.parseString(message).getAsJsonObject();String toUserId = json.get("toUserId").getAsString();String content = json.get("content").getAsString();String type = json.get("type").getAsString();if (type.equals("text") || type.equals("image")) {return new MessageEntity(userId, toUserId, content, type);} else {throw new AppException("非法的消息类型!");}} catch (Exception e) {throw new AppException("非法的消息格式!");}}private void sendOrStoreMessage(String toUserId, WebSocketFrame message) {if (isUserOnline(toUserId)) {Channel targetChannel = userChannels.get(toUserId);if (targetChannel != null && targetChannel.isActive()) {targetChannel.writeAndFlush(message.retain());}} else {// 存储原始WebSocketFrame(需保留引用)OFFLINE_MSGS.computeIfAbsent(toUserId, k -> new LinkedList<>()).add(message.retain());}}private void saveMessage(MessageEntity message) {threadPoolExecutor.execute(() -> {messageRepository.saveMessage(message);});}private boolean isUserOnline(String userId) {return userChannels.containsKey(userId);}private String getCodeFromRequest(ChannelHandlerContext ctx) {String code = ctx.channel().attr(WS_TOKEN_KEY).get();// 检查 code 参数是否存在且非空if (code == null || code.isEmpty()) {throw new IllegalArgumentException("WebSocket token  is missing or empty");}return code;}private String getOpenid(String appid, String secret, String code) {Map<String, String> paramMap = new HashMap<>();paramMap.put("appid", appid);paramMap.put("secret", secret);paramMap.put("js_code", code);paramMap.put("grant_type", "authorization_code");String result = HttpClientUtil.doGet(WX_LOGIN, paramMap);//获取请求结果JSONObject jsonObject = JSON.parseObject(result);String openid = jsonObject.getString("openid");//判断openid是否存在if (StringUtils.isEmpty(openid)) {throw new WxException(jsonObject.getString("errcode"), jsonObject.getString("errmsg"));}return openid;}@Overridepublic void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) {if (cause instanceof AppException appCause) {log.error("AppException caught: {}", appCause.getInfo());} else if (cause instanceof WxException wxCause) {log.error("WxException caught: {}", wxCause.getMessage());} else {log.error("Exception caught: {}", cause.getMessage(), cause);}ctx.close(); // 建议关闭发生异常的连接}}

连接及消息格式:

wss://127.0.0.1:21611/ws?code=xxxxxx{"toUserId": "1001","type": "text","content": "Hello World!"
}

规定了type只有text和image两种,text为文本content,image则为Base64编码格式

本文功能实现较为简陋,demo内容仅供参考,可能有注释错误或设计不合理的地方

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.rhkb.cn/news/37528.html

如若内容造成侵权/违法违规/事实不符,请联系长河编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

【架构】单体架构 vs 微服务架构:如何选择最适合你的技术方案?

文章目录 ⭐前言⭐一、架构设计的本质差异&#x1f31f;1、代码与数据结构的对比&#x1f31f;2、技术栈的灵活性 ⭐二、开发与维护的成本博弈&#x1f31f;1、开发效率的阶段性差异&#x1f31f;2、维护成本的隐形陷阱 ⭐三、部署与扩展的实战策略&#x1f31f;1、部署模式的本…

Java-SpringBootWeb入门、Spring官方脚手架连接不上解决方法

一. Spring 官网&#xff1a;Spring | Home Spring发展到今天已经形成了一种开发生态圈&#xff0c;Spring提供了若干个子项目&#xff0c;每个项目用于完成特定的功能(Spring全家桶) Spring Boot可以帮助我们非常快速的构建应用程序、简化开发、提高效率 。 二. Spring Boot入…

springboot整合redis

创建springboot整合redis工程&#xff1a; 一、springboot整合redis步骤 首先我们要知道什么是redis&#xff1a; 三步骤完成springboot对redis数据库的整合&#xff1a; 1、导入springboot整合redis坐标&#xff08;上面勾选的那个就是&#xff09; 2、在yml配置文件中配置re…

图解AUTOSAR_CP_EEPROM_Abstraction

AUTOSAR EEPROM抽象模块详细说明 基于AUTOSAR标准的EEPROM抽象层技术解析 目录 1. 概述 1.1 核心功能1.2 模块地位2. 架构概览 2.1 架构层次2.2 模块交互3. 配置结构 3.1 主要配置容器3.2 关键配置参数4. 状态管理 4.1 基本状态4.2 状态转换5. 接口设计 5.1 主要接口分类5.2 接…

【AI Infra】【RLHF框架】二、VeRL中colocate实现解析

​ colocate的作用是使多个Worker共享相同的资源池。当然&#xff0c;目前verl中所有模型的Worker都共享相同的资源池:global_pool。这篇博客主要通过例子和源代码理解verl中colocate的实现&#xff0c;需要一些前置知识。建议先阅读 【AI Infra】【RLHF框架】一、VeRL中基于R…

PostgreSQL_数据表结构设计并创建

目录 前置&#xff1a; 1 数据表设计思路 2 数据表格SQL 3 创建 3.1 创建数据库 db_stock 3.2 在 pgAdmin4 中创建表 前置&#xff1a; 本博文是一个系列。在本人“数据库专栏”-》“PostgreSQL_”开头的博文 1 数据表设计思路 1 日数据来自优矿&#xff0c;优矿的数据…

植物来源药用天然产物的合成生物学研究进展-文献精读121

植物来源药用天然产物的合成生物学研究进展 摘要 大多数药用天然产物在植物中含量低微&#xff0c;提取分离困难&#xff1b;而且这些化合物一般结构复杂&#xff0c;化学合成难度大&#xff0c;还容易造成环境污染。基于合成生物学技术获得药用天然产物具有绿色环保和可持续发…

Nordic nRF 蓝牙的 Direct Test Mode (DTM) 测试介绍

目录 概述 1. 核心物理层参数 1.1 射频频率 (RF Channel Frequency) 1.2 发射功率 (TX Power) 1.3 调制方式 (Modulation) 1.4 数据包类型 (Packet Type) 1.5 测试模式 (Test Mode) 2. 参数配置方法 2.1 通过 HCI 命令配置 2.2 示例&#xff08;nRF52 系列&#xff0…

区间震荡指标

区间震荡指标的逻辑如下&#xff1a; 一、函数注解 1. Summation函数 功能&#xff1a; 计算给定价格序列Price的前Length个数据点的和&#xff0c;或在数据点数量超过Length时&#xff0c;计算滚动窗口内的价格和。 参数&#xff1a; Price(1)&#xff1a;价格序列&#…

文章防洗稿隐蔽混淆软件

如果你的文章经常被人洗稿搬运&#xff0c;那么这个小工具或许可以帮到你 基本原理: 在文章的每个字后面&#xff0c;加上一些随机的隐藏字符 人眼看不到&#xff0c;但是机器会读取到&#xff0c;如果别人是用AI工具来对你的文章进行洗稿&#xff0c;就会发现这是一堆乱码 你…

车载软件架构 --- AUTOSAR AP/CP中诊断的区别

我是穿拖鞋的汉子&#xff0c;魔都中坚持长期主义的汽车电子工程师。 老规矩&#xff0c;分享一段喜欢的文字&#xff0c;避免自己成为高知识低文化的工程师&#xff1a; 周末洗了一个澡&#xff0c;换了一身衣服&#xff0c;出了门却不知道去哪儿&#xff0c;不知道去找谁&am…

百度OCR调用记录

根据说明&#xff0c;调用测试 设置注册的API Key和Secret Key 调用类&#xff08;官方文档中有&#xff09; 这里改传入路径&#xff1b; 测试问题 1.{"error_code":110,"error_msg":"Access token invalid or no longer valid"} 查到说是 …

19.哈希表的实现

1.哈希的概念 哈希(hash)⼜称散列&#xff0c;是⼀种组织数据的⽅式。从译名来看&#xff0c;有散乱排列的意思。本质就是通过哈希函数把关键字Key跟存储位置建⽴⼀个映射关系&#xff0c;查找时通过这个哈希函数计算出Key存储的位置&#xff0c;进⾏快速查找。 1.2.直接定址法…

网络编程之解除udp判断客户端是否断开

思路&#xff1a;每几秒发送一条不显示的信息&#xff0c;客户端断开则不再发送信息&#xff0c;超时则表示客户端断开连接。&#xff08;心跳包&#xff09; 服务器 #include <head.h>#define MAX_CLIENTS 100 // 最大支持100个客户端 #define TIMEOUT 5 // 5秒…

Java 大视界 -- Java 大数据在智能医疗远程会诊与专家协作中的技术支持(146)

&#x1f496;亲爱的朋友们&#xff0c;热烈欢迎来到 青云交的博客&#xff01;能与诸位在此相逢&#xff0c;我倍感荣幸。在这飞速更迭的时代&#xff0c;我们都渴望一方心灵净土&#xff0c;而 我的博客 正是这样温暖的所在。这里为你呈上趣味与实用兼具的知识&#xff0c;也…

调用feapder作为子程序时setting.py文件不起作用

feaper 官方文档地址&#xff1a; 简介及安装 - feapder官方文档|feapder-document 问题&#xff1a; 在最近的开发中需要调用feapder作为主程序调用的子程序时发现自动入库时无法入库&#xff0c;通过查看日志信息发现连接数据库时被拒绝连接了&#xff0c;但是我的setting.p…

【STM32】SPI通信协议W25Q64Flash存储器芯片(学习笔记)

通信接口部分有介绍SPI&#xff1a;【STM32】USART串口协议&串口外设-学习笔记-CSDN博客 SPI通信协议 SPI通信 SPI&#xff08;Serial Peripheral Interface&#xff09;是由Motorola公司开发的一种通用数据总线四根通信线&#xff1a;SCK&#xff08;Serial Clock&…

刘强东突然发声:不该用算法压榨最底层兄弟!东哥,真正的人民企业家

今天忙了一天&#xff0c;很累&#xff0c;准备睡觉的时候&#xff0c;看到网上盛传的刘强东的朋友圈&#xff0c;东哥又在朋友圈发文了。 说实话&#xff0c;看完之后&#xff0c;感动&#xff0c;真的感动。 尤其是当我看到这两句话的时候。 1、我们所学的知识、商业模式、技…

Maven安装与环境配置

首先我们先介绍一些关于Maven的知识&#xff0c;如果着急直接看下面的安装教程。 目录 Maven介绍 Maven模型 Maven仓库 Maven安装 下载 安装步骤 Maven介绍 Apache Maven是一个项目管理和构建工具&#xff0c;它基于项目对象模型(Project Object Model , 简称: POM)的概念…

C++ 语法之数组指针

一维数组&#xff1a; 如果我们定义了一个一维数组&#xff0c;那么这个数组名&#xff0c;就是指向第一个数组元素的地址&#xff0c;也即&#xff0c;是整个数组分配的内存空间的首地址。 比如 int a[3]; 定义了一个包含三个元素的数组。因为一个int占4个字节&#xff0c;那…