目录
一、问题引出
二、架构图
三、实现方式
一、问题引出
在IM分布式系统的构建中遇到的问题:
Netty服务器通过客户端的连接信息来生成对应的Channel(可以理解为长连接的用户信息),Netty服务器通过Channel来进行消息转发。于是,提出初始构想:通过Redis来序列化Channel,再通过Netty服务器去获取Redis上的Channel,最后转发。但这个构思是错误的,因为Channel是硬件的连接信息,并不能被序列化。
最终构思解决Channel共享的方案有两个:
(1)GateWay网关来自定义负载均衡,当接收到Websocket消息时直接根据用户id进行路由,该方式完美兼容原始功能,原始功能采用Netty来开发Websocket,实现难度低,开发成本低。
(2)采用Netty高性能框架开发Websocket,通过MQ消息队列进行广播来实现Channel的共享,实现难度不大,开发成本较高。
二、架构图
最终,我选择第二种解决方案,IM系统架构如下:
三、实现方式
首先,我先搭建一个支持简单聊天的Netty-Websocket聊天服务器,之后,我先构建一个消息聊天对象如下:
package com.dragonwu.im.domain.dto;import com.dragonwu.common.basic.constant.Constants;
import com.dragonwu.common.security.basic.domain.emums.LoginType;
import com.dragonwu.im.domain.enums.FromUserType;
import lombok.Getter;
import lombok.Setter;
import lombok.ToString;
import org.apache.commons.lang.StringUtils;
import org.springframework.data.annotation.CreatedDate;
import org.springframework.data.annotation.Id;
import org.springframework.data.mongodb.core.index.Indexed;
import org.springframework.data.mongodb.core.mapping.Document;import java.io.Serializable;
import java.time.LocalDateTime;
import java.util.List;
import java.util.Objects;/*** @author Dragon Wu* @since 2023/2/27 13:05* 消息对象*/
@Getter
@Setter
@ToString
@Document("im_message") //集合名
// {"msg":"你的消息","loginType":"你的类型","userId":"你的id","to":"接收者","group":"群接受对象","isCustomerService":"是否为客服","isVisitor":"是否为游客",
// "isConnect":"是否为连接信息"}
public class IMessage implements Serializable {@Id //存入mongo里的idprivate String id;//登录类型private String loginType;//用户id@Indexedprivate String userId;//发送时间private LocalDateTime sendTime;//发送人idprivate String to;//群发列表private List<String> group;//发送者是否为客服private Boolean isCustomerService;//发送者是否为游客private Boolean isVisitor;//发送的消息private String msg;//是否为第一次连接信号private Boolean isConnect;@CreatedDate //创建时默认创建该时间字段private LocalDateTime createTime;/* 判断消息格式是否正确 */public boolean isMsgOK() {if (Objects.isNull(isVisitor)) {isVisitor = false;}if (Objects.isNull(to)) {to = Constants.EMPTY_STR;}if (Objects.isNull(isConnect)) {isConnect = false;}try {if ((!StringUtils.isEmpty(userId)) && (!StringUtils.isEmpty(msg))) {return ((!StringUtils.isEmpty(to)) || ((group != null) && (!group.isEmpty())) || isVisitor);}} catch (NullPointerException ignored) {}return false;}//获取发送者的类型public FromUserType getFromUserType() {LoginType exists = LoginType.isExists(loginType);if (Objects.isNull(isVisitor)) {isVisitor = false;}if (Objects.isNull(isCustomerService)) {isCustomerService = false;}if (Objects.isNull(exists) && isVisitor) {return FromUserType.VISITOR_TYPE;} else if ((exists == LoginType.USER_TYPE) && isCustomerService) {return FromUserType.CUSTOMER_SERVICE;} else if (exists == LoginType.USER_TYPE) {return FromUserType.USER_TYPE;} else if (exists == LoginType.CUSTOMER_TYPE) {return FromUserType.CUSTOMER_TYPE;}return null;}public void setNowAsSendTime() {sendTime = LocalDateTime.now();}public boolean isGroupChat() {if (group != null) {return StringUtils.isEmpty(to) && (!group.isEmpty()) && (!isVisitor);}return false;}
}
有了这样的对象以后,我便可对发送过来的消息进行序列化与反序列化获取数据,通过消息对象中的数据是否正确与是否认证来决定消息的转发。
每个用户第一次发送isConnect型号时将其注册到Redis中,Key为用户名唯一,Value为ChannelId的asShort值。当用户在不同Netty服务器上时(此时发送与接收者都在线),我会先让服务器去Redis获取对应用户名的ChannelId,先在本地服务器中查找,若查询到该ChannelId的Channel则直接转发,否则为不在同一个Netty服务器上,发送Channel寻找的信号到MQ进行广播,其他服务器获取到广播后查询直接是否有该ChannelId的Channel,若有则转发;离线消息的话,直接以Zset的形式加入Redis即可,当用户上线时再拉取数据。最后,无论哪种情况发送的消息,都会被MQ进行集群负载均衡来存储到数据库中。
以上,为个人本次实践的总结,希望对遇到相似问题的开发者,有所帮助,有问题可联系共同探讨!