spring boot 实现直播聊天室

spring boot 实现直播聊天室

技术方案:

  • spring boot
  • websocket
  • rabbitmq

使用 rabbitmq 提高系统吞吐量

引入依赖

<dependencies><dependency><groupId>com.alibaba</groupId><artifactId>fastjson</artifactId><version>2.0.42</version></dependency><dependency><groupId>cn.hutool</groupId><artifactId>hutool-all</artifactId><version>5.8.23</version></dependency><dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-web</artifactId></dependency><dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-websocket</artifactId></dependency><dependency><groupId>org.projectlombok</groupId><artifactId>lombok</artifactId><optional>true</optional></dependency><dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-test</artifactId><scope>test</scope></dependency><dependency><groupId>org.springframework.amqp</groupId><artifactId>spring-rabbit</artifactId></dependency>
</dependencies>

websocket 实现

MHttpSessionHandshakeInterceptor

参数拦截

/*** @Date: 2023/12/8 14:52* websocket 握手拦截* 1. 参数拦截(header或者 url 参数)* 2. token 校验*/
@Slf4j
public class MHttpSessionHandshakeInterceptor extends HttpSessionHandshakeInterceptor {@Overridepublic boolean beforeHandshake(ServerHttpRequest request, ServerHttpResponse response,WebSocketHandler wsHandler, Map<String, Object> attributes) throws Exception {if (request instanceof ServletServerHttpRequest servletRequest){//ws://127.0.0.1:8080/group/2?username=xxxxHttpServletRequest httpServletRequest = servletRequest.getServletRequest();String requestURI = httpServletRequest.getRequestURI();String groupId = requestURI.substring(requestURI.lastIndexOf("/") + 1);String username = httpServletRequest.getParameter("username");log.info(">>>>>>> beforeHandshake groupId: {} - username: {}", groupId, username);attributes.put("username", username);//解析占位符attributes.put("groupId", groupId);}return super.beforeHandshake(request, response, wsHandler, attributes);}}
GroupWebSocketHandler

消息发送

@Slf4j
public class GroupWebSocketHandler implements WebSocketHandler {//Map<room,List<map<session,username>>>private ConcurrentHashMap<String, Queue<WebSocketSession>> sessionMap = new ConcurrentHashMap<>();@Autowiredprivate MessageClient messagingClient;@Overridepublic void afterConnectionEstablished(WebSocketSession session) throws Exception {String username = (String) session.getAttributes().get("username");String groupId = (String) session.getAttributes().get("groupId");log.info("{} 用户上线房间 {}", username, groupId);TomcatWsSession wsSession = new TomcatWsSession(session.getId(),groupId, username, session);SessionRegistry.getInstance().addSession(wsSession);}@Overridepublic void handleMessage(WebSocketSession session, WebSocketMessage<?> message) throws Exception {String groupId = (String) session.getAttributes().get("groupId");String username = (String) session.getAttributes().get("username");if (message instanceof PingMessage){log.info("PING");return;}else if (message instanceof TextMessage textMessage) {MessageDto messageDto = new MessageDto();messageDto.setSessionId(session.getId());messageDto.setGroup(groupId);messageDto.setFromUser(username);messageDto.setContent(new String(textMessage.getPayload()));messagingClient.sendMessage(messageDto);}}@Overridepublic void handleTransportError(WebSocketSession session, Throwable exception) throws Exception {String username = (String) session.getAttributes().get("username");String groupId = (String) session.getAttributes().get("groupId");log.info(">>> handleTransportError {} 用户上线房间 {}", username, groupId);}@Overridepublic void afterConnectionClosed(WebSocketSession session, CloseStatus closeStatus) throws Exception {String username = (String) session.getAttributes().get("username");String groupId = (String) session.getAttributes().get("groupId");log.info("{} 用户下线房间 {}", username, groupId);TomcatWsSession wsSession = new TomcatWsSession(session.getId(),groupId, username, session);SessionRegistry.getInstance().removeSession(wsSession);}@Overridepublic boolean supportsPartialMessages() {return false;}}
WebSocketConfig

websocket 配置

@Configuration
@EnableWebSocket
public class WebSocketConfig implements WebSocketConfigurer {@Overridepublic void registerWebSocketHandlers(WebSocketHandlerRegistry registry) {registry.addHandler(myHandler(), "/group/{groupId}").addInterceptors(new MHttpSessionHandshakeInterceptor()).setAllowedOrigins("*");}@Beanpublic GroupWebSocketHandler myHandler() {return new GroupWebSocketHandler();}@Beanpublic ServletServerContainerFactoryBean createWebSocketContainer() {ServletServerContainerFactoryBean container = new ServletServerContainerFactoryBean();container.setMaxTextMessageBufferSize(8192);  //文本消息最大缓存container.setMaxBinaryMessageBufferSize(8192);  //二进制消息大战缓存container.setMaxSessionIdleTimeout(3L * 60 * 1000); // 最大闲置时间,3分钟没动自动关闭连接container.setAsyncSendTimeout(10L * 1000); //异步发送超时时间return container;}}

session 管理

将 websocketSession进行抽像,websocketsession可以由不同容器实现

WsSession
public interface  WsSession {/*** session 组* @return*/String group();/*** session Id* @return*/String getId();/*** 用户名或其他唯一标识* @return*/String identity();/*** 发送文本消息* @param messageDto*/void sendTextMessage(MessageDto messageDto);
}public abstract class AbstractWsSession implements WsSession {private String id;private String group;private String identity;public AbstractWsSession(String id, String group, String identity) {this.id = id;this.group = group;this.identity = identity;}@Overridepublic String group() {return this.group;}@Overridepublic String getId() {return this.id;}@Overridepublic String identity() {return this.identity;}@Overridepublic boolean equals(Object o) {if (this == o) return true;if (o == null || getClass() != o.getClass()) return false;AbstractWsSession that = (AbstractWsSession) o;//简单比较 sessionIdreturn Objects.equals(id, that.id);}@Overridepublic int hashCode() {return Objects.hash(id, group, identity);}
}
TomcatWsSession

默认session实现

@Slf4j
public class TomcatWsSession extends AbstractWsSession {private WebSocketSession webSocketSession;public TomcatWsSession(String id, String group, String identity, WebSocketSession webSocketSession) {super(id, group, identity);this.webSocketSession = webSocketSession;}@Overridepublic void sendTextMessage(MessageDto messageDto) {String content = messageDto.getFromUser() + " say: " + messageDto.getContent();try {webSocketSession.sendMessage(new TextMessage(content));} catch (IOException e) {log.error("TomcatWsSession sendTextMessage error: identity:{}-group:{}-msg: {}",super.identity(), super.group(), JSON.toJSONString(messageDto));}}
}

SessionRegistry

websocket session管理

public class SessionRegistry {private static SessionRegistry instance;private SessionRegistry() {}public static SessionRegistry getInstance() {if (instance == null) {synchronized (SessionRegistry.class) {if (instance == null) {instance = new SessionRegistry();}}}return instance;}//Map<group,List<Session>>private ConcurrentHashMap<String, Queue<WsSession>> sessionMap = new ConcurrentHashMap<>();/*** 添加 session* @param wsSession*/public void addSession(WsSession wsSession) {sessionMap.computeIfAbsent(wsSession.group(),g -> new ConcurrentLinkedDeque<>()).add(wsSession);}/*** 移除 session* @param wsSession*/public void removeSession(WsSession wsSession) {Queue<WsSession> wsSessions = sessionMap.get(wsSession.group());if (!CollectionUtils.isEmpty(wsSessions)){//重写 WsSession equals 和 hashCode 方法,不然会移除失败wsSessions.remove(wsSession);if (CollectionUtils.isEmpty(wsSessions)){sessionMap.remove(wsSession.group());}}}/*** 发送消息* @param messageDto*/public void sendGroupTextMessage(MessageDto messageDto){Queue<WsSession> wsSessions = sessionMap.get(messageDto.getGroup());if (!CollectionUtils.isEmpty(wsSessions)){for (WsSession wsSession : wsSessions) {if (wsSession.getId().equals(messageDto.getSessionId())){continue;}wsSession.sendTextMessage(messageDto);}}}/*** session 在线统计* @param groupId* @return*/public Integer getSessionCount(String groupId) {if (StrUtil.isNotBlank(groupId)) {return sessionMap.get(groupId).size();}return sessionMap.values().stream().map(l -> l.size()).collect(Collectors.summingInt(a -> a));}
}

消息队列

这里使用 rabbitmq

MessageDto

消息体

@Data
public class MessageDto {/*** sessionId*/private String sessionId;/*** 组*/private String group;/*** 消息发送者*/private String fromUser;/*** 发送内容*/private String content;
}
MessageClient
@Component
@Slf4j
public class MessageClient {private String routeKey = "bws.key";private String exchange = "bws.exchange";@Autowiredprivate RabbitTemplate rabbitTemplate;public void sendMessage(MessageDto messageDto) {try {rabbitTemplate.convertAndSend(exchange, routeKey, JSON.toJSONString(messageDto));} catch (AmqpException e) {log.error("MessageClient.sendMessage: {}", JSON.toJSONString(messageDto), e);}}
}
MessageListener
@Slf4j
@Component
public class MessageListener {@RabbitListener(bindings = @QueueBinding(exchange = @Exchange(value = "bws.exchange", type = "topic"), value =@Queue(value = "bws.queue", durable = "true"), key = "bws.key"))public void onMessage(Message message) {String messageStr = "";try {messageStr = new String(message.getBody(), StandardCharsets.UTF_8);log.info("<<<<<<<<< MessageListener.onMessage:{}", messageStr);MessageDto messageDto = JSON.parseObject(messageStr, MessageDto.class);if (!Objects.isNull(messageDto)) {SessionRegistry.getInstance().sendGroupTextMessage(messageDto);} else {log.info("<<<<<<<<< MessageListener.onMessage is null:{}", messageStr);}} catch (Exception e) {log.error("######### MessageListener.onMessage: {}-{}", messageStr, e);}}}

application.properties配置


spring.rabbitmq.host=192.168.x.x
spring.rabbitmq.password=guest
spring.rabbitmq.port=27067
spring.rabbitmq.username=guest
spring.rabbitmq.virtual-host=my-cluster

测试

websoket链接: ws://127.0.0.1:8080/group/2?username=xxx, websocket客户端测试地址

在这里插入图片描述

good luck!

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.rhkb.cn/news/218348.html

如若内容造成侵权/违法违规/事实不符,请联系长河编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

十六、YARN和MapReduce配置

1、部署前提 &#xff08;1&#xff09;配置前提 已经配置好Hadoop集群。 配置内容&#xff1a; &#xff08;2&#xff09;部署说明 &#xff08;3&#xff09;集群规划 2、修改配置文件 MapReduce &#xff08;1&#xff09;修改mapred-env.sh配置文件 export JAVA_HOM…

从零开始:前端架构师的基础建设和架构设计之路

文章目录 一、引言二、前端架构师的职责三、基础建设四、架构设计思想五、总结《前端架构师&#xff1a;基础建设与架构设计思想》编辑推荐内容简介作者简介目录获取方式 一、引言 在现代软件开发中&#xff0c;前端开发已经成为了一个不可或缺的部分。随着互联网的普及和移动…

智能科技企业网站搭建的作用是什么

随着科学技术快速提升&#xff0c;各种智能产品随之而来&#xff0c;每个赛道里都涌入了大量企业商家&#xff0c;有些热门产品更是广受关注&#xff0c;对企业来说&#xff0c;形象、品牌、信息等方面需要完美呈现到用户眼前&#xff0c;而网站无疑是很好的工具。 企业通过【…

打开软木塞,我们来谈谈葡萄酒泡泡吧

香槟是任何庆祝场合的最佳搭配。从婚礼和生日到单身派对和典型的周五晚上&#xff0c;这款气泡饮料是生活中特别聚会的受欢迎伴侣。 来自云仓酒庄品牌雷盛红酒分享你知道吗&#xff0c;你喜欢喝的那瓶香槟酒可能根本不是香槟&#xff0c;而是汽酒&#xff1f;你不是唯一一个认…

造型精致的冰精灵充电头,充电效率高安全可靠,居家出行皆可用

随着大家对手机的依赖度越来越高&#xff0c;快速充电已经成为必不可少的需求。快充当然少不了支持快充的充电器&#xff0c;现在市面上的快充头很多&#xff0c;安全性和便携性是我们选择时的重点关注方向&#xff0c;我目前用的是战飞ZEFi冰精灵&#xff0c;这款产品有着独特…

【LeetCode刷题】-- 161.相隔为1的编辑距离

161.相隔为1的编辑距离 方法&#xff1a;一次遍历 首先&#xff0c;我们要确认字符串的长度不会相差太远。如果长度差了2个或更多字符&#xff0c;那么 s 和 t 就不可能是一次编辑之差的字符串。 接下来&#xff0c;我们假设 s 的长度总是短于或等于 t 的长度。如果不是这样&…

【C语言】结构体内存对齐

目录 引入结构体 结构的声明 创建和初始化 内部元素的使用&#xff1b; 特殊声明&#xff1a; 结构体在内存中的对齐 练习&#xff1a; 引入结构体 C语言有各种数据类型&#xff0c;我们已经对一些数据类型很熟悉&#xff1a; 整型&#xff08;int&#xff09;- 存储整…

MAC IDEA Maven Springboot

在mac中&#xff0c;使用idea进行maven项目构建 环境配置如何运行maven项目1.直接在IDEA中运行2.使用jar打包后执行 如何搭建spring boot1.添加依赖2.创建入口类3.创建控制器4. 运行5.其他 环境配置 官网安装IDEA使用IDEA的创建新项目选择创建MAEVEN项目测试IDEA的MAVEN路径是…

Mybatis-plus介绍与入门

前言 MyBatis-Plus是在MyBatis基础上的一个增强工具库&#xff0c;旨在简化开发者的工作&#xff0c;提高开发效率&#xff0c;同时保留MyBatis的灵活性。使用 MyBatis-Plus 可以减少重复性的代码&#xff0c;简化常见的数据库操作 官方学习文档&#xff1a;MyBatis-Plus (bao…

phpstudy是什么?

PHPStudy 是一个集成环境工具&#xff0c;它将 PHP 开发所需的软件&#xff0c;如 Apache&#xff08;Web服务器&#xff09;、MySQL&#xff08;数据库服务器&#xff09;、PHP&#xff08;脚本语言&#xff09;等打包在一起&#xff0c;以便用户能够轻松安装和配置这些软件&a…

fl studio20中文内测版下载2024最新完美实现汉化

fl studio20是一款众所周知的水果编曲软件&#xff0c;能够剪辑、混音、录音&#xff0c;它的矢量界面能更好用在4K、5K甚至8K显示器上&#xff0c;还可以可以编曲、剪辑、录音、混音&#xff0c;让你的计算机成为全功能录音室&#xff0c;不论是在功能上面还是用户界面上都是数…

为了吃鸡苦练狙击,避免坑队友自己造一个狙击游戏!

引言 一文教会你造一个简易的狙击游戏。 说到狙击&#xff0c;相信大家都不陌生&#xff0c;无论是影视作品还是网络游戏&#xff0c;都经常能看到狙击枪的身影&#xff0c;最深刻的是它能够从百里之外&#xff0c;一枪爆头。 本文将介绍如何在Cocos Creator中造一个简易的狙…

真正可行的vue3迁移到nuxt3方法(本人亲测,完全避坑)

终于到了总结经验的时候了&#xff0c;这绝对是全网唯一、完全真正可行的干货。 在我看来&#xff0c;知识就是要拿来分享的&#xff0c;分享给他人也是在提高自己。我绝对不会搞什么订阅或者vip专栏来搞钱坑害各位&#xff0c; 因为我在csdn写文章最主要的目的是为了记录和总…

虚幻学习笔记13—C++静态和动态加载

一、前言 我们在蓝图中可以很方便的添加各种需要的组件&#xff0c;那么在C代码中要如何实现呢。在代码中分静态和动态加载&#xff0c;而无论静态和动态&#xff0c;加载的内容有资源和资源类&#xff0c;资源类通常为带资源的蓝图类。 二、实现 在实现静态或动态加载时&…

科技云报道:从数据到生成式AI,是该重新思考风险的时候了

科技云报道原创。 OpenAI“宫斗”大戏即将尘埃落定。 自首席执行官Sam Altman突然被董事会宣布遭解雇、董事长兼总裁Greg Brockman辞职&#xff1b;紧接着OpenAI员工以辞职威胁董事会要求Altman回归&#xff1b;再到OpenAI董事会更换成员、Altman回归OpenAI。 表面上看&…

OpenHarmony关于修改系统横屏导致启动视频显示不全问题解决

前言 OpenHarmony源码版本&#xff1a;4.0release 开发板&#xff1a;DAYU / rk3568 前段时间写的设置OpenHarmony启动视频&#xff0c;在竖屏状态下是正常的&#xff0c;但是横屏状态下显示不全。 链接直达&#xff1a;OpenHarmony 设备启动Logo和启动视频替换指南-CSDN博…

Java EE 多线程之线程安全的集合类

文章目录 1. 多线程环境使用 ArrayList1. 1 Collections.synchronizedList(new ArrayList)1.2 CopyOnWriteArrayList 2. 多线程环境使用队列2.1 ArrayBlockingQueue2.2 LinkedBlockingQueue2.3 PriorityBlockingQueue2.4 TransferQueue 3. 多线程环境使用哈希表3.1 Hashtable3.…

优雅玩转实验室服务器(一)登录服务器

这篇文章更加偏向于使用python程序进行研究的朋友们 原料 Windows主机实验室Linux服务器&#xff08;可以访问互联网&#xff09;一点点耐心 step.0 windows terminal is all you need 别跟我说什么putty&#xff0c;什么winscp&#xff0c;我就是单推Win11自带的软件——win…

深度学习在人体动作识别领域的应用:开源工具、数据集资源及趋动云GPU算力不可或缺

人体动作识别检测是一种通过使用计算机视觉和深度学习技术&#xff0c;对人体姿态和动作进行实时监测和分析的技术。该技术旨在从图像或视频中提取有关人体姿态、动作和行为的信息&#xff0c;以便更深入地识别和理解人的活动。 人体动作识别检测的基本步骤包括&#xff1a; 数…

[已解决】uniapp内置插件,editor富文本报错(附quill.min.js、image-resize.min.js文件)

在使用uni-app运行内置插件editor时&#xff0c;无法输入内容&#xff0c;控制台报错 原因&#xff1a;查看官网得知&#xff0c;需动态引入quill.min.js、image-resize.min.js文件 解决方法&#xff1a; 1.下载quill.min.js、image-resize.min.js到项目static/eidtor文件中 链…