文章目录
- 前言
- 需求分析
- 技术预研
- Web端方案
- 服务端技术
- 技术方案
- 设计思路
- 功能实现
- 添加依赖
- 自定义NettyServer
- 自定义webSocketHandler
- 使用NettyServer向在线用户发送消息
- 需要完善的地方
前言
我们的项目有个基于项目的在线文档编制模块,可以邀请多人项目组成员在线协同编制项目文档,现在的需求是要实现项目组成员在线实时协作沟通交流功能以及消息实时推送功能。
需求分析
根据需求分析,首先我们要基于项目组成员构建在线聊天群组并支持在线聊天,同时成员在线时支持实时推送消息。
技术预研
Web端方案
实现Web消息实时推送的方案比较多,包括轮询、长轮询、SSE、AJAX、WebSocket等。根据对比我们最终选择使用WebSocket来实现Web消息实时推送。
- WebSocket: WebSocket是一种在单个TCP连接上进行全双工通信的协议。WebSocket通信协议于2011年被IETF定为标准RFC 6455,并由RFC7936补充规范。WebSocket API也被W3C定为标准。
WebSocket使得客户端和服务器之间的数据交换变得更加简单,允许服务端主动向客户端推送数据。在WebSocket API中,浏览器和服务器只需要完成一次握手,两者之间就直接可以创建持久性的连接,并进行双向数据传输。
服务端技术
- spring-boot-starter-websocket
SpringBoot框架提供了WebSockets自动配置,通过spring-boot-starter-websocket模块轻松访问。
<!-- 引入 WebSocket 模块依赖 --><dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-websocket</artifactId></dependency>// 创建一个配置类来配置 WebSocket 服务器
@Configuration
@EnableWebSocket
public class WebSocketConfig implements WebSocketConfigurer {@Overridepublic void registerWebSocketHandlers(WebSocketHandlerRegistry registry) {registry.addHandler(new MyWebSocketHandler(), "/ws").setAllowedOrigins("*");}
}
// 自定义消息处理
public class MyWebSocketHandler extends TextWebSocketHandler {@Overridepublic void afterConnectionEstablished(WebSocketSession session) throws Exception {log.debug("Connection established: " + session.getId());}@Overrideprotected void handleTextMessage(WebSocketSession session, TextMessage message) throws Exception {String payload = message.getPayload();log.debug("Received message: " + payload);// 回复消息session.sendMessage(new TextMessage("Echo: " + payload));}@Overridepublic void afterConnectionClosed(WebSocketSession session, CloseStatus status) throws Exception {log.debug("Connection closed: " + session.getId());}
}
如果不考虑吞吐和并发,spring-boot-starter-websocket非常适合构建WebSocket Server端。
- Netty
Netty 是一个基于NIO的客户、服务器端的编程框架,使用Netty 可以确保你快速和简单的开发出一个网络应用,例如实现了某种协议的客户、服务端应用。Netty相当于简化和流线化了网络应用的编程开发过程,例如:基于TCP和UDP的socket服务开发。
技术方案
设计思路
- 自定义NettyServer 基于项目构建用户群组
- 用户在指定群组发送消息,NettyServer向群组所有用户推送消息
- 业务系统向指定用户发送通知消息到kafka
- 消费者消费消息通过暴露出的NettyServerHanlder向所有在线用户实时推送消息
功能实现
添加依赖
<dependency><groupId>io.netty</groupId><artifactId>netty-all</artifactId><version>4.1.112.Final</version></dependency><dependency><groupId>org.springframework.kafka</groupId><artifactId>spring-kafka</artifactId></dependency>
自定义NettyServer
因为我们需要暴露NettyServer 的webSocketHandler,所以将NettyServer实例交由Spring管理,并暴露广播消息和系统消息接口
@Slf4j
@Component
public class CusNettyServer implements InitializingBean, DisposableBean {@Value("${netty.port:9000}")Integer nettyPort;EventLoopGroup bossGroup = new NioEventLoopGroup(1); // 通常只需要一个线程即可EventLoopGroup workerGroup = new NioEventLoopGroup(); // 根据实际情况调整线程数 默认创建与 CPU 核心数相等的线程数private ChannelFuture channelFuture;private NettyWebSocketHandler webSocketHandler;@Overridepublic void destroy() throws Exception {if (channelFuture != null && channelFuture.channel().isOpen()) {channelFuture.channel().closeFuture().sync();}bossGroup.shutdownGracefully();workerGroup.shutdownGracefully();}@Overridepublic void afterPropertiesSet() throws Exception {ServerBootstrap sb = new ServerBootstrap();sb.option(ChannelOption.SO_BACKLOG, 128); // 考虑调整这个值sb.option(ChannelOption.SO_REUSEADDR, true); // 避免地址重用问题sb.childOption(ChannelOption.TCP_NODELAY, true); // 减少延迟sb.childOption(ChannelOption.SO_KEEPALIVE, true); // 保持连接webSocketHandler = new NettyWebSocketHandler();// 绑定线程池sb.group(bossGroup, workerGroup)// 指定使用的channel.channel(NioServerSocketChannel.class)// 绑定监听端口.localAddress(this.nettyPort)// 绑定客户端连接时候触发操作.childHandler(new ChannelInitializer<SocketChannel>() {@Overrideprotected void initChannel(SocketChannel ch) throws Exception {log.debug("收到新连接: {}", ch.remoteAddress());//websocket协议本身是基于http协议的,所以这边也要使用http解编码器ch.pipeline().addLast(new HttpServerCodec());//以块的方式来写的处理器ch.pipeline().addLast(new ChunkedWriteHandler());ch.pipeline().addLast(new HttpObjectAggregator(8192));ch.pipeline().addLast(webSocketHandler);//添加聊天消息处理类ch.pipeline().addLast(new WebSocketServerProtocolHandler("/ws", null, true, 65536 * 10));}});// 服务器异步创建绑定channelFuture = sb.bind().sync();log.debug("{} 启动正在监听: {}", NettyServer.class, channelFuture.channel().localAddress());}// 广播消息接口public void broadcastMessage(SocketMessage socketMessage) {webSocketHandler.broadcastMessage(socketMessage);}// 系统通知接口public void sendSystemMessage(SocketMessage socketMessage, String toUserId) {webSocketHandler.sendSystemMessage(socketMessage, toUserId);}
}
自定义webSocketHandler
前面自定义CusNettyServer 过程我们是将NettyWebSocketHandler 放在外层初始化的,为了避免一个Handler被多个channel传递抛io.netty.channel.ChannelPipelineException异常,我们需要将NettyWebSocketHandler 标记为 @ChannelHandler.Sharable
@ChannelHandler.Sharable
@Slf4j
public class NettyWebSocketHandler extends SimpleChannelInboundHandler<TextWebSocketFrame> {/*** 存储已经登录用户的channel对象*/public static ChannelGroup channelGroup = new DefaultChannelGroup(GlobalEventExecutor.INSTANCE);/*** 存储用户id和用户的channelId绑定*/public static ConcurrentHashMap<String, ChannelId> userMap = new ConcurrentHashMap<>();/*** 存储广播消息的channel对象*/private static final ConcurrentHashMap<String, Channel> broadcastClients = new ConcurrentHashMap<>();/*** 用于存储群聊房间号和群聊成员的channel信息*/public static ConcurrentHashMap<String, ChannelGroup> groupMap = new ConcurrentHashMap<>();private static final ExecutorService executor = Executors.newFixedThreadPool(10); // 创建线程池private final TokenStore tokenStore = SpringUtil.getBean(TokenStore.class);private final StringRedisTemplate redisTemplate = SpringUtil.getBean(StringRedisTemplate.class);/*** 获取用户拥有的群聊id号*/private final UserGroupRepository userGroupRepository = SpringUtil.getBean(UserGroupRepository.class);private final MessageDataAssembler messageDataAssembler = SpringUtil.getBean(MessageDataAssembler.class);private final MessageManagerService messageService = SpringUtil.getBean(MessageManagerService.class);@Overridepublic void channelActive(ChannelHandlerContext ctx) throws Exception {log.info("与客户端建立连接,通道开启!");//添加到channelGroup通道组channelGroup.add(ctx.channel());ctx.channel().id();}@Overridepublic void channelInactive(ChannelHandlerContext ctx) throws Exception {log.info("与客户端断开连接,通道关闭!");//添加到channelGroup 通道组channelGroup.remove(ctx.channel());}@Overridepublic void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {//首次连接是FullHttpRequest,把用户id和对应的channel对象存储起来if (msg instanceof FullHttpRequest) {FullHttpRequest request = (FullHttpRequest) msg;// 首次握手进行登录验证String uri = request.uri();String token = getUrlParams(uri);String userId = chkLogin(token);userMap.put(userId, ctx.channel().id());broadcastClients.put(userId, ctx.channel());log.info("登录的用户id是:{}", userId);//第1次登录,需要查询下当前用户是否加入项目组,没有拒绝连接,有将群聊管理对象放入groupMap中List<UserGroup> groups = userGroupRepository.findGroupIdByUserId(userId);if (CollUtil.isNotEmpty(groups)) {groups.stream().map(UserGroup::getProjectId).forEach(groupId -> {ChannelGroup cGroup = Optional.ofNullable(groupMap.get(groupId)).orElseGet(() -> {ChannelGroup newGroup = new DefaultChannelGroup(GlobalEventExecutor.INSTANCE);groupMap.put(groupId, newGroup);return newGroup;});//把用户放到群聊管理对象里去cGroup.add(ctx.channel());});}//如果url包含参数,需要处理if (uri.contains("?")) {String newUri = uri.substring(0, uri.indexOf("?"));request.setUri(newUri);}} else if (msg instanceof TextWebSocketFrame) {//正常的TEXT消息类型TextWebSocketFrame frame = (TextWebSocketFrame) msg;log.info("客户端收到服务器数据:{}", frame.text());SocketMessage socketMessage = JSON.parseObject(frame.text(), SocketMessage.class);socketMessage.setSendTime(new Date());socketMessage.setId(IdUtil.getSnowflakeNextIdStr());// 如果群聊不存在,则不处理消息if (!groupMap.containsKey(socketMessage.getProjectId())) {log.info("无效消息,对应群聊不存在 {}", socketMessage.getProjectId());return;}// 将消息存储到 RedisString projectId = socketMessage.getProjectId();String messageKey = String.join(":", "message", projectId, socketMessage.getId());String messageJson = JSON.toJSONString(socketMessage);redisTemplate.opsForValue().set(messageKey, messageJson, 10, TimeUnit.MINUTES);// 异步处理消息executor.submit(() -> {// 从 Redis 中获取消息String storedMessageJson = redisTemplate.opsForValue().get(messageKey);if (storedMessageJson != null) {SocketMessage storedMessage = JSON.parseObject(storedMessageJson, SocketMessage.class);// 持久化消息Message message = messageDataAssembler.toEntity(storedMessage);message.setBizType(MsgBizType.PROJECT.getCode());Message saved = messageService.saveMessage(message);storedMessage.setId(saved.getId());// 推送群聊信息// 这里假设 groupMap 已经定义并且是线程安全的ChannelGroup group = groupMap.get(projectId);if (group != null) {group.writeAndFlush(new TextWebSocketFrame(JSON.toJSONString(storedMessage)));}// 处理完成移除RedisredisTemplate.delete(messageKey);}});}super.channelRead(ctx, msg);}@Overrideprotected void channelRead0(ChannelHandlerContext channelHandlerContext, TextWebSocketFrame textWebSocketFrame) throws Exception {}public void broadcastMessage(SocketMessage socketMessage) {// 异步处理消息executor.submit(() -> {// 持久化消息Message message = messageDataAssembler.toEntity(socketMessage);message.setSendTime(new Date());message.setBizType(MsgBizType.BROADCAST.getCode());message.setMessageType(MessageType.TEXT.getCode());messageService.broadcastMessage(message);channelGroup.writeAndFlush(new TextWebSocketFrame(JSON.toJSONString(socketMessage)));});}public void sendSystemMessage(SocketMessage socketMessage, String toUserId) {// 持久化消息Message message = messageDataAssembler.toEntity(socketMessage);messageService.sendUserMessage(message, toUserId);// 如何用户在线则推送websocket消息Optional.ofNullable(userMap.get(toUserId)).map(channelId -> channelGroup.find(channelId)).ifPresent(channel -> channel.writeAndFlush(new TextWebSocketFrame(JSON.toJSONString(socketMessage))));}private static String getUrlParams(String url) {if (!url.contains("=")) {throw new BusinessException(CusBusinessExceptionEnum.BUSINESS_ERROR_NETTY_SERVER_PATH_MUST_HAS_USER_ID_ERROR);}return url.substring(url.indexOf("=") + 1);}private String chkLogin(String token) {OAuth2AccessToken accessToken = tokenStore.readAccessToken(token);if (accessToken == null) {throw new BusinessException(401, "Invalid access token:" + token);}if (accessToken.isExpired()) {throw new BusinessException(401, "Expired access token:" + token);}OAuth2Authentication oauth2Authentication = tokenStore.readAuthentication(accessToken);if (tokenStore.readAuthentication(accessToken) == null) {throw new BusinessException(401, "access token Authentication error:" + token);}LoginAppUser loginAppUser = (LoginAppUser) oauth2Authentication.getPrincipal();return loginAppUser.getUserId();}
}
这里有个问题是我们始终未解决的,那就是首次握手token传递的问题,最开始后端是FullHttpRequest request中获取token,且通过apifox也验证通过,但是前端在实现过程始终无法传递token(这块有大佬实现可以在评论下留言指点下)
这是我最开始的实现
FullHttpRequest request = (FullHttpRequest) msg;// 首次握手进行登录验证// String userId = chkLogin(request);private String chkLogin(FullHttpRequest request) {String token = Optional.ofNullable(request.headers()).map(headers -> headers.get(HttpHeaderNames.AUTHORIZATION)).map(authHeader -> authHeader.replace("Bearer ", "")).orElseThrow(() -> new BusinessException(CusBusinessExceptionEnum.BUSINESS_ERROR_NETTY_SERVER_NOT_LOGIN_ERROR));OAuth2AccessToken accessToken = tokenStore.readAccessToken(token);if (accessToken == null) {throw new BusinessException(401, "Invalid access token:" + token);}if (accessToken.isExpired()) {throw new BusinessException(401, "Expired access token:" + token);}OAuth2Authentication oauth2Authentication = tokenStore.readAuthentication(accessToken);if (tokenStore.readAuthentication(accessToken) == null) {throw new BusinessException(401, "access token Authentication error:" + token);}LoginAppUser loginAppUser = (LoginAppUser) oauth2Authentication.getPrincipal();return loginAppUser.getUserId();}
下图是我通过apifox使用header传递token验证成功
使用NettyServer向在线用户发送消息
消费kafka消息并使用NettyServer向在线用户发送消息
@Component
public class NotifyMsgConsumer {private final MessageManagerApplication messageManagerApplication;@KafkaListener(topics = "system_message_notify")public void processMessage(ConsumerRecord<Long, String> record, Acknowledgment acknowledgment) {log.info("system_message_notify 通知: {} {} ", record.key(), record.value());if (StringUtils.isEmpty(record.value())) {log.debug("system_message_notify 消息为空 {} 消息直接丢弃", record.key());acknowledgment.acknowledge();return;}NotifyMsg notifyMsg = null;try {notifyMsg = JSONObject.parseObject(record.value(), NotifyMsg.class);} catch (Exception e) {log.debug("system_message_notify 消息格式异常 {} {} 消息直接丢弃", record.key(), record.value());acknowledgment.acknowledge();return;}messageManagerApplication.sendNotify(notifyMsg);acknowledgment.acknowledge();}}@Override
public void sendNotify(NotifyMsg notifyMsg) {SocketMessage socketMessage = SocketMessage.buildNotifyMessage(messageDataAssembler, notifyMsg);cusNettyServer.sendSystemMessage(socketMessage, notifyMsg.getTo());}
下图是我们实现的一个前端效果图:
需要完善的地方
该方案目前是我们单机部署的方案,集群下还需要扩展,包括:
- 在线用户同步的问题:群组新消息处理如何实时同步到所有NettyServer节点连接下的客户端
- 通知消息处理问题:也是实时同步的问题,要考虑到所有NettyServer节点连接下的客户端