关于ASP.NET Core WebSocket实现集群的思考

前言

    提到WebSocket相信大家都听说过,它的初衷是为了解决客户端浏览器与服务端进行双向通信,是在单个TCP连接上进行全双工通讯的协议。在没有WebSocket之前只能通过浏览器到服务端的请求应答模式比如轮询,来实现服务端的变更响应到客户端,现在服务端也可以主动发送数据到客户端浏览器。WebSocket协议和Http协议平行,都属于TCP/IP四层模型中的第四层应用层。由于WebSocket握手阶段采用HTTP协议,所以也需要进行跨域处理。它的协议标识是wswss对应了常规标识和安全通信协议标识。本文重点并不是介绍WebSocket协议相关,而是提供一种基于ASP.NET Core原生WebSocket的方式实现集群的实现思路。关于这套思路其实很早之前我就构思过了,只是之前一直没有系统的整理出来,本篇文章就来和大家分享一下,由于主要是提供一种思路,所以涉及到具体细节或者业务相关的可能没有体现出来,还望大家理解。

实现

咱们的重点关键字就是两个WebSocket集群,实现的框架便是基于ASP.NET Core,我也基于golang实现了一套,本文涉及到的相关源码和golang版本的实现都已上传至我的github,具体仓库地址可以转到文末自行跳转到#示例源码中查看。既然涉及到集群,这里咱们就用nginx作为反向代理,来搭建一个集群实例。大致的示例结构如下图所示`

redis在这里扮演的角色呢,是用来处理Server端的消息相互传递用的,主要是使用的redis的pub/sub`功能来实现的,这里便涉及到几个核心问题

  • 首先,集群状态每个用户被分发到具体的哪台服务器上是不得而知的
  • 其次,处在不同Server端的不同用户间的相互通信是需要一个传递媒介
  • 最后,针对不同的场景比如单发消息、分组消息、全部通知等要有不同的处理策略

这里需要考虑的是,如果需要搭建实时通信服务器的话,需要注意集群的隔离性,主要是和核心业务进行隔离,毕竟WebSocket需要保持长链接、且消息的大小需要评估。

上面提到了redis的主要功能就是用来传递消息用的,毕竟每个server服务器是无状态的。这当然不是必须的,任何可以进行消息分发的中间件都可以,比如消息队列rabbitmq、kafka、rocketmq、mqtt等,甚至只要能把要处理的消息存储起来都可以比如缓存甚至是关系型数据库等等。这压力使用redis主要是因为操作起来简单、轻量级、灵活,让大家关注点在思路上,而不是使用中案件的代码上。

nginx配置

通过上面的图我们可以看到,我们这里构建集群示例使用的nginx,如果让nginx支持WebSocket的话,需要额外的配置,这个在网上有很多相关的文章介绍,这里就来列一下咱们示例的nginx配置,在配置文件nginx.conf

//上游服务器地址也就是websocket服务的真实地址
upstream wsbackend {server 127.0.0.1:5001;server 127.0.0.1:5678;
}server {listen       5000;server_name  localhost;location ~/chat/{//upstream地址proxy_pass http://wsbackend;proxy_connect_timeout 60s; proxy_read_timeout 3600s;proxy_send_timeout 3600s;//记得转发避免踩坑proxy_set_header Host $host;proxy_http_version 1.1; //http升级成websocket协议的头标识proxy_set_header Upgrade $http_upgrade;proxy_set_header Connection "Upgrade";}
}

这套配置呢,在搜索引擎上能收到很多,不过不妨碍我把使用的粘贴出来。这一套亲测有效,也是我使用的配置,请放心使用。个人认为如果是线上环境采用的负载均衡策略可以选择ip_hash的方式,保证同一个ip的客户端用户可以分发到一台WebSocket实例中去,这样的话能尽量避免使用redis的用户频道做消息传递。好了,接下来准备开始展示具体实现的代码了。

一对一发送

首先介绍的就是一对一发送的情况,也就是我把消息发给你,聊天的时候私聊的情况。这里呢涉及到两种情况

  • 如果你需要通信的客户端和你连接在一个Server端里,这样的话可以直接在链接里找到这个端的通信实例直接发送。
  • 如果你需要通信的客户端和你不在一个Server端里,这个时候咱们就需要借助redis的pub/sub的功能,把消息传递给另一个Server端。

咱们通过一张图大致的展示一下它的工作方式

解释一下,每个客户端注册到WebSocket服务里的时候会在redis里订阅一个user:用户唯一标识的频道,这个频道用于接收和当前WebSocket连接不在一个服务端的其他WebSocket发送过来的消息。每次发送消息的时候你会知道你要发送给谁,不在当前服务器的话则发送到redis的user:用户唯一标识频道,这样的话目标WebSocket就能收到消息了。首先是注入相关的依赖项,这里我使用的redis客户端是freeredis,主要是因为操作起来简单,具体实现代码如下

var builder = WebApplication.CreateBuilder(args);
//注册freeredis
builder.Services.AddSingleton(provider => {var logger = provider.GetService<ILogger<WebSocketChannelHandler>>();RedisClient cli = new RedisClient("127.0.0.1:6379");cli.Notice += (s, e) => logger?.LogInformation(e.Log);return cli;
});
//注册WebSocket具体操作的类
builder.Services.AddSingleton<WebSocketHandler>();
builder.Services.AddControllers();var app = builder.Build();var webSocketOptions = new WebSocketOptions
{KeepAliveInterval = TimeSpan.FromMinutes(2)
};
//注册WebSocket中间件
app.UseWebSockets(webSocketOptions);app.MapGet("/", () => "Hello World!");
app.MapControllers();app.Run();

接下来我们定义一个Controller用来处理WebSocket请求

public class WebSocketController : ControllerBase
{private readonly ILogger<WebSocketController> _logger;private readonly WebSocketHandler _socketHandler;public WebSocketController(ILogger<WebSocketController> logger, WebSocketHandler socketHandler, WebSocketChannelHandler webSocketChannelHandler){_logger = logger;_socketHandler = socketHandler;}//这里的id代表当前连接的客户端唯一标识比如用户唯一标识[HttpGet("/chat/user/{id}")]public async Task ChatUser(string id){//判断是否是WebSocket请求if (HttpContext.WebSockets.IsWebSocketRequest){_logger.LogInformation($"user:{id}-{Request.HttpContext.Connection.RemoteIpAddress}:{Request.HttpContext.Connection.RemotePort} join");var webSocket = await HttpContext.WebSockets.AcceptWebSocketAsync();//处理请求相关await _socketHandler.Handle(id, webSocket);}else{HttpContext.Response.StatusCode = StatusCodes.Status400BadRequest;}}
}

这里的WebSocketHandler是用来处理具体逻辑用的,咱们看一下相关代码

public class WebSocketHandler:IDisposable
{//存储当前服务用户的集合private readonly UserConnection UserConnection = new();//redis频道前缀private readonly string userPrefix = "user:";//用户对应的redis频道private readonly ConcurrentDictionary<string, IDisposable> _disposables = new();private readonly ILogger<WebSocketHandler> _logger;//redis客户端private readonly RedisClient _redisClient;public WebSocketHandler(ILogger<WebSocketHandler> logger, RedisClient redisClient){_logger = logger;_redisClient = redisClient;}public async Task Handle(string id, WebSocket webSocket){//把当前用户连接存储起来_ = UserConnection.GetOrAdd(id, webSocket);//订阅一个当前用户的频道await SubMsg($"{userPrefix}{id}");var buffer = new byte[1024 * 4];//接收发送过来的消息,这个方法是阻塞的,如果没收到消息则一直阻塞var receiveResult = await webSocket.ReceiveAsync(new ArraySegment<byte>(buffer), CancellationToken.None);//循环接收消息while (webSocket.State == WebSocketState.Open){try{//因为缓冲区长度是固定的所以要获取实际长度string msg = Encoding.UTF8.GetString(buffer[..receiveResult.Count]).TrimEnd('\0');//接收的到消息转换成实体MsgBody msgBody = JsonConvert.DeserializeObject<MsgBody>(msg);//发送到其他客户端的数据byte[] sendByte = Encoding.UTF8.GetBytes($"user {id} send:{msgBody.Msg}");_logger.LogInformation($"user {id} send:{msgBody.Msg}");//判断目标客户端是否在当前当前服务,如果在当前服务直接扎到目标连接直接发送if (UserConnection.TryGetValue(msgBody.Id, out var targetSocket)){if (targetSocket.State == WebSocketState.Open){await targetSocket.SendAsync(new ArraySegment<byte>(sendByte, 0, sendByte.Length), receiveResult.MessageType, true, CancellationToken.None);}}else{//如果要发送的目标端不在当前服务,则发送给目标redis端的频道ChannelMsgBody channelMsgBody = new ChannelMsgBody { FromId = id, ToId = msgBody.Id, Msg = msgBody.Msg };//目标的redis频道_redisClient.Publish($"{userPrefix}{msgBody.Id}", JsonConvert.SerializeObject(channelMsgBody));}//继续阻塞循环接收消息receiveResult = await webSocket.ReceiveAsync(new ArraySegment<byte>(buffer), CancellationToken.None);}catch (Exception ex){_logger.LogError(ex, ex.Message);break;}}//循环结束意味着当前端已经退出//从当前用户的集合移除当前用户_ = UserConnection.TryRemove(id, out _);//关闭当前WebSocket连接await webSocket.CloseAsync(receiveResult.CloseStatus.Value, receiveResult.CloseStatusDescription, CancellationToken.None);//在当前订阅集合移除当前用户_disposables.TryRemove($"{userPrefix}{id}", out var disposable);//关闭当前用户的通道disposable.Dispose();}private async Task SubMsg(string channel){//订阅当前用户频道var sub = _redisClient.Subscribe(channel,  async (channel, data) => {//接收过来当前频道数据,说明发送端不在当前服务ChannelMsgBody msgBody = JsonConvert.DeserializeObject<ChannelMsgBody>(data.ToString());byte[] sendByte = Encoding.UTF8.GetBytes($"user {msgBody.FromId} send:{msgBody.Msg}");//在当前服务找到目标的WebSocket连接并发送消息if (UserConnection.TryGetValue(msgBody.ToId, out var targetSocket)){if (targetSocket.State == WebSocketState.Open){await targetSocket.SendAsync(new ArraySegment<byte>(sendByte, 0, sendByte.Length), WebSocketMessageType.Text, true, CancellationToken.None);}}});//把redis订阅频道添加到集合中_disposables.TryAdd(channel, sub);}//程序退出的时候取消当前服务订阅的redis频道public void Dispose(){foreach (var disposable in _disposables){disposable.Value.Dispose();}_disposables.Clear();}
}

这里涉及到几个辅助相关的类,其中UserConnection类是存储注册到当前服务的连接,MsgBody类用来接受客户端发送过来的消息,ChannelMsgBody是用来发送redis频道的相关消息,因为要把相关消息通过redis发布出去,咱们列一下这几个类的相关代码

//注册到当前服务的连接
public class UserConnection : IEnumerable<KeyValuePair<string, WebSocket>>
{//存储用户唯一标识和WebSocket的对应关系private ConcurrentDictionary<string, WebSocket> _users = new ConcurrentDictionary<string, WebSocket>();//当前服务的用户数量public int Count => _users.Count;public WebSocket GetOrAdd(string userId, WebSocket webSocket){return _users.GetOrAdd(userId, webSocket);}public bool TryGetValue(string userId, out WebSocket webSocket){return _users.TryGetValue(userId, out webSocket);}public bool TryRemove(string userId, out WebSocket webSocket){return _users.TryRemove(userId, out webSocket);}public void Clear(){_users.Clear();}public IEnumerator<KeyValuePair<string, WebSocket>> GetEnumerator(){return _users.GetEnumerator();}IEnumerator IEnumerable.GetEnumerator(){return this.GetEnumerator();}
}//客户端消息
public class MsgBody
{//目标用户标识public string Id { get; set; }//要发送的消息public string Msg { get; set; }
}//频道订阅消息
public class ChannelMsgBody
{//用户标识public string FromId { get; set; }//目标用户标识,也就是要发送给谁public string ToId { get; set; }//要发送的消息public string Msg { get; set; }
}

这样的话关于一对一发送消息的相关逻辑就实现完成了,启动两个Server端,由于nginx默认的负载均衡策略是轮询,所以注册两个用户的话会被分发到不同的服务里去

Postman连接三个连接唯一标识分别是1、2、3,模拟一下消息发送,效果如下,发送效果

接收效果

群组发送

上面我们展示了一对一发送的情况,接下来我们来看一下,群组发送的情况。群组发送的话就是只要大家都加入一个群组,只要客户端在群组里发送一条消息,则注册到当前群组内的所有客户端都可以收到消息。相对于一对一的情况就是如果当前WebSocket服务端如果存在用户加入某个群组,则当前当前WebSocket服务端则可以订阅一个group:群组唯一标识的redis频道,集群中的其他WebSocket服务器通过这个redis频道接收群组消息,通过一张图描述一下
群组的实现方式相对于一对一要简单一点

  • 发送端可以不用考虑当前服务中的客户端连接,一股脑的交给redis把消息发布出去
  • 如果有WebSocket服务中的用户订阅了当前分组则可以接受消息,获取组内的用户循环发送消息

展示一下代码实现的方式,首先是定义一个action用于表示群组的相关场景

//包含两个标识一个是组别标识一个是注册到组别的用户
[HttpGet("/chat/group/{groupId}/{userId}")]
public async Task ChatGroup(string groupId, string userId)
{if (HttpContext.WebSockets.IsWebSocketRequest){_logger.LogInformation($"group:{groupId} user:{userId}-{Request.HttpContext.Connection.RemoteIpAddress}:{Request.HttpContext.Connection.RemotePort} join");var webSocket = await HttpContext.WebSockets.AcceptWebSocketAsync();//调用HandleGroup处理群组相关的消息await _socketHandler.HandleGroup(groupId, userId, webSocket);}else{HttpContext.Response.StatusCode = StatusCodes.Status400BadRequest;}
}

接下来看一下HandleGroup的相关逻辑,还是在WebSocketHandler类中,看一下代码实现

public class WebSocketHandler:IDisposable
{private readonly UserConnection UserConnection = new();private readonly GroupUser GroupUser = new();private readonly SemaphoreSlim _lock = new(1, 1);private readonly ConcurrentDictionary<string, IDisposable> _disposables = new();private readonly string groupPrefix = "group:";private readonly ILogger<WebSocketHandler> _logger;private readonly RedisClient _redisClient;public WebSocketHandler(ILogger<WebSocketHandler> logger, RedisClient redisClient){_logger = logger;_redisClient = redisClient;}public async Task HandleGroup(string groupId, string userId, WebSocket webSocket){//因为群组的集合可能会存在很多用户一起访问所以限制访问数量await _lock.WaitAsync();//初始化群组容器 群唯一标识为key 群员容器为valuevar currentGroup = GroupUser.Groups.GetOrAdd(groupId, new UserConnection { });//当前用户加入当前群组_ = currentGroup.GetOrAdd(userId, webSocket);//只有有当前WebSocket服务的第一个加入当前组的时候才去订阅群组频道//如果不限制的话则会出现如果当前WebSocket服务有多个用户在一个组内则会重复收到redis消息if (currentGroup.Count == 1){//订阅redis频道await SubGroupMsg($"{groupPrefix}{groupId}");}_lock.Release();var buffer = new byte[1024 * 4];//阻塞接收WebSocket消息var receiveResult = await webSocket.ReceiveAsync(new ArraySegment<byte>(buffer), CancellationToken.None);//服务不退出的话则一直等待接收while (webSocket.State == WebSocketState.Open){try{string msg = Encoding.UTF8.GetString(buffer[..receiveResult.Count]).TrimEnd('\0');_logger.LogInformation($"group 【{groupId}】 user 【{userId}】 send:{msg}");//组装redis频道发布的消息,目标为群组标识ChannelMsgBody channelMsgBody = new ChannelMsgBody { FromId = userId, ToId = groupId, Msg = msg };//通过redis发布消息_redisClient.Publish($"{groupPrefix}{groupId}", JsonConvert.SerializeObject(channelMsgBody));receiveResult = await webSocket.ReceiveAsync(new ArraySegment<byte>(buffer), CancellationToken.None);}catch (Exception ex){_logger.LogError(ex, ex.Message);break;}}//如果客户端退出则在当前群组集合删除当前用户_ = currentGroup.TryRemove(userId, out _);await webSocket.CloseAsync(receiveResult.CloseStatus.Value, receiveResult.CloseStatusDescription, CancellationToken.None);}private async Task SubGroupMsg(string channel){var sub = _redisClient.Subscribe(channel, async (channel, data) => {ChannelMsgBody msgBody = JsonConvert.DeserializeObject<ChannelMsgBody>(data.ToString());byte[] sendByte = Encoding.UTF8.GetBytes($"group 【{msgBody.ToId}】 user 【{msgBody.FromId}】 send:{msgBody.Msg}");//在当前WebSocket服务器找到当前群组里的用户GroupUser.Groups.TryGetValue(msgBody.ToId, out var currentGroup);//循环当前WebSocket服务器里的用户发送消息foreach (var user in currentGroup){//不用给自己发送了if (user.Key == msgBody.FromId){continue;}if (user.Value.State == WebSocketState.Open){await user.Value.SendAsync(new ArraySegment<byte>(sendByte, 0, sendByte.Length), WebSocketMessageType.Text, true, CancellationToken.None);}}});//把当前频道加入订阅集合_disposables.TryAdd(channel, sub);}
}

这里涉及到了GroupUser类,是来存储群组和群组用户的对应关系的,定义如下

public class GroupUser
{//key为群组的唯一标识public ConcurrentDictionary<string, UserConnection> Groups = new ConcurrentDictionary<string, UserConnection>();
}

演示一下把两个用户添加到一个群组内,然后发送接收消息的场景,用户u1发送

用户u2接收

发送所有人

发送给所有用户的逻辑比较简单,不用考虑到用户限制,只要用户连接到了WebSocket集群则都可以接收到这个消息,大致工作方式如下图所示

这个比较简单,咱们直接看实现代码,首先是定义一个地址,用于发布消息

//把用户注册进去
[HttpGet("/chat/all/{id}")]
public async Task ChatAll(string id)
{if (HttpContext.WebSockets.IsWebSocketRequest){_logger.LogInformation($"all user:{id}-{Request.HttpContext.Connection.RemoteIpAddress}:{Request.HttpContext.Connection.RemotePort} join");var webSocket = await HttpContext.WebSockets.AcceptWebSocketAsync();await _socketHandler.HandleAll(id, webSocket);}else{HttpContext.Response.StatusCode = StatusCodes.Status400BadRequest;}
}

具体的实现逻辑还是在HandleGroup类里,是HandleAll方法,看一下具体实现

public class WebSocketHandler:IDisposable
{private readonly UserConnection AllConnection = new();private readonly ConcurrentDictionary<string, IDisposable> _disposables = new();private readonly string all = "all";private readonly ILogger<WebSocketHandler> _logger;private readonly RedisClient _redisClient;public WebSocketHandler(ILogger<WebSocketHandler> logger, RedisClient redisClient){_logger = logger;_redisClient = redisClient;}public async Task HandleAll(string id, WebSocket webSocket){await _lock.WaitAsync();//把用户加入用户集合_ = AllConnection.GetOrAdd(id, webSocket);//WebSocket集群中的每个服务只定义一次if (AllConnection.Count == 1){await SubAllMsg(all);}_lock.Release();var buffer = new byte[1024 * 4];//阻塞接收信息var receiveResult = await webSocket.ReceiveAsync(new ArraySegment<byte>(buffer), CancellationToken.None);while (webSocket.State == WebSocketState.Open){try{string msg = Encoding.UTF8.GetString(buffer[..receiveResult.Count]).TrimEnd('\0');_logger.LogInformation($"user {id} send:{msg}");//获取接收信息ChannelMsgBody channelMsgBody = new ChannelMsgBody { FromId = id, Msg = msg };//把消息通过redis发布到集群中的其他服务_redisClient.Publish(all, JsonConvert.SerializeObject(channelMsgBody));receiveResult = await webSocket.ReceiveAsync(new ArraySegment<byte>(buffer), CancellationToken.None);}catch (Exception ex){_logger.LogError(ex, ex.Message);break;}}//用户退出则删除集合中的当前用户信息_ = AllConnection.TryRemove(id, out _);await webSocket.CloseAsync(receiveResult.CloseStatus.Value, receiveResult.CloseStatusDescription, CancellationToken.None);}private async Task SubAllMsg(string channel){var sub = _redisClient.Subscribe(channel, async (channel, data) => {ChannelMsgBody msgBody = JsonConvert.DeserializeObject<ChannelMsgBody>(data.ToString());byte[] sendByte = Encoding.UTF8.GetBytes($"user 【{msgBody.FromId}】 send all:{msgBody.Msg}");//接收到消息后遍历用户集合把消息发送给所有用户foreach (var user in AllConnection){   //如果包含当前用户跳过if (user.Key == msgBody.FromId){continue;}if (user.Value.State == WebSocketState.Open){await user.Value.SendAsync(new ArraySegment<byte>(sendByte, 0, sendByte.Length), WebSocketMessageType.Text, true, CancellationToken.None);}}});_disposables.TryAdd(channel, sub);}
}

效果在这里就不展示了,和群组的效果是类似的,只是一个是部分用户,一个是全部的用户。

整合到一起

上面我们分别展示了一对一、群组、所有人的场景,但是实际使用的时候,每个用户只需要注册到WebSocket集群一次也就是保持一个连接即可,而不是一对一一个连接、注册群组一个连接、所有消息的时候一个连接。所以我们需要把上面的演示整合一下,一个用户只需要连接到WebSocket集群一次即可,至于发送给谁,加入什么群组,接收全部消息等都是连接后通过一些标识区分的,而不必每个类型的操作都注册一次,就和微信和QQ一样我只要登录了即可,至于其他操作都是靠数据标识区分的。接下来咱们就整合一下代码达到这个效果,大致的思路是

  • 用户连接到WebSocket集群,把用户和连接保存到当前WebSocket服务器的用户集合中去。
  • 一对一发送的时候,只需要在具体的服务器中找到具体的客户端发送消息
  • 群组的时候,先把当前用户标识加入群组集合即可,接收消息的时候根据群组集合里的用户标识去用户集合里去拿具体的WebSocket连接发送消息
  • 全员消息的时候,直接遍历集群中的每个WebSocket服务里的用户集合里的WebSocket连接训话发送消息

这样的话就保证了每个客户端用户在集群中只会绑定一个连接,首先还是单独定义一个action,用于让客户端用户连接上来,具体实现代码如下所示

public class WebSocketChannelController : ControllerBase
{private readonly ILogger<WebSocketController> _logger;private readonly WebSocketChannelHandler _webSocketChannelHandler;public WebSocketChannelController(ILogger<WebSocketController> logger, WebSocketChannelHandler webSocketChannelHandler){_logger = logger;_webSocketChannelHandler = webSocketChannelHandler;}//只需要把当前用户连接到服务即可[HttpGet("/chat/channel/{id}")]public async Task Channel(string id){if (HttpContext.WebSockets.IsWebSocketRequest){_logger.LogInformation($"user:{id}-{Request.HttpContext.Connection.RemoteIpAddress}:{Request.HttpContext.Connection.RemotePort} join");var webSocket = await HttpContext.WebSockets.AcceptWebSocketAsync();await _webSocketChannelHandler.HandleChannel(id, webSocket);}else{HttpContext.Response.StatusCode = StatusCodes.Status400BadRequest;}}
}

接下来看一下WebSocketChannelHandler类的HandleChannel方法实现,用于处理不同的消息,比如一对一、群组、全员消息等不同类型的消息

public class WebSocketChannelHandler : IDisposable
{//用于存储当前WebSocket服务器链接上来的所有用户对应关系private readonly UserConnection UserConnection = new();//用于存储群组和用户关系,用户集合采用HashSet保证每个用户只加入一个群组一次private readonly ConcurrentDictionary<string, HashSet<string>> GroupUser = new ConcurrentDictionary<string, HashSet<string>>();private readonly SemaphoreSlim _lock = new(1, 1);//存放redis订阅实例private readonly ConcurrentDictionary<string, IDisposable> _disposables = new();//一对一redis频道前缀private readonly string userPrefix = "user:";//群组redis频道前缀private readonly string groupPrefix = "group:";//全员redis频道private readonly string all = "all";private readonly ILogger<WebSocketHandler> _logger;private readonly RedisClient _redisClient;public WebSocketChannelHandler(ILogger<WebSocketHandler> logger, RedisClient redisClient){_logger = logger;_redisClient = redisClient;}public async Task HandleChannel(string id, WebSocket webSocket){await _lock.WaitAsync();//每次连接进来就添加到用户集合_ = UserConnection.GetOrAdd(id, webSocket);//每个WebSocket服务实例只需要订阅一次全员消息频道await SubMsg($"{userPrefix}{id}");if (UserConnection.Count == 1){await SubAllMsg(all);}_lock.Release();var buffer = new byte[1024 * 4];//接收客户端消息var receiveResult = await webSocket.ReceiveAsync(new ArraySegment<byte>(buffer), CancellationToken.None);while (webSocket.State == WebSocketState.Open){try{string msg = Encoding.UTF8.GetString(buffer[..receiveResult.Count]).TrimEnd('\0');//读取客户端消息ChannelData channelData = JsonConvert.DeserializeObject<ChannelData>(msg);//判断消息类型switch (channelData.Method){//一对一case "One":await HandleOne(id, channelData.MsgBody, receiveResult);break;//把用户加入群组case "UserGroup":await AddUserGroup(id, channelData.Group, webSocket);break;//处理群组消息case "Group":await HandleGroup(channelData.Group, id, webSocket, channelData.MsgBody);break;//处理全员消息default:await HandleAll(id, channelData.MsgBody);break;}receiveResult = await webSocket.ReceiveAsync(new ArraySegment<byte>(buffer), CancellationToken.None);}catch (Exception ex){_logger.LogError(ex, ex.Message);break;}}await webSocket.CloseAsync(receiveResult.CloseStatus.Value, receiveResult.CloseStatusDescription, CancellationToken.None);//在群组中移除当前用户foreach (var users in GroupUser.Values){lock (users){users.Remove(id);}}//当前客户端用户退出则移除连接_ = UserConnection.TryRemove(id, out _);//取消用户频道订阅_disposables.Remove($"{userPrefix}{id}", out var sub);sub?.Dispose();}public void Dispose(){foreach (var disposable in _disposables){disposable.Value.Dispose();}_disposables.Clear();}
}

这里涉及到了ChannelData类是用于接收客户端消息的类模板,具体定义如下

public class ChannelData
{//消息类型 比如一对一 群组 全员public string Method { get; set; }//群组标识public string Group { get; set; }//消息体public object MsgBody { get; set; }
}

类中并不会包含当前用户信息,因为连接到当前服务的时候已经提供了客户端唯一标识。结合上面的处理代码我们可以看出,客户端用户连接到WebSocket实例之后,先注册当前用户的redis订阅频道并且当前实例仅注册一次全员消息的redis频道,用于处理非当前实例注册客户端的一对一消息处理和全员消息处理,然后等待接收客户端消息,根据客户端消息的消息类型来判断是进行一对一、群组、或者全员的消息类型处理,它的工作流程入下图所示
由代码和上面的流程图可知,它根据不同的标识去处理不同类型的消息,接下来我们可以看下每种消息类型的处理方式。

一对一处理

首先是一对一的消息处理情况,看一下具体的处理逻辑,首先是一对一发布消息

 private async Task HandleOne(string id, object msg, WebSocketReceiveResult receiveResult){MsgBody msgBody = JsonConvert.DeserializeObject<MsgBody>(JsonConvert.SerializeObject(msg));byte[] sendByte = Encoding.UTF8.GetBytes($"user {id} send:{msgBody.Msg}");_logger.LogInformation($"user {id} send:{msgBody.Msg}");//判断目标用户是否在当前WebSocket服务器if (UserConnection.TryGetValue(msgBody.Id, out var targetSocket)){if (targetSocket.State == WebSocketState.Open){await targetSocket.SendAsync(new ArraySegment<byte>(sendByte, 0, sendByte.Length), receiveResult.MessageType, true, CancellationToken.None);}}else{//如果不在当前服务器,则直接把消息发布到具体的用户频道去,由具体用户去订阅ChannelMsgBody channelMsgBody = new ChannelMsgBody { FromId = id, ToId = msgBody.Id, Msg = msgBody.Msg };_redisClient.Publish($"{userPrefix}{msgBody.Id}", JsonConvert.SerializeObject(channelMsgBody));}
}

接下来是用于处理订阅其他用户发送过来消息的逻辑,这个和整合之前的逻辑是一致的,在当前服务器中找到用户对应的连接,发送消息

private async Task SubMsg(string channel)
{var sub = _redisClient.Subscribe(channel, async (channel, data) =>{ChannelMsgBody msgBody = JsonConvert.DeserializeObject<ChannelMsgBody>(data.ToString());byte[] sendByte = Encoding.UTF8.GetBytes($"user {msgBody.FromId} send:{msgBody.Msg}");if (UserConnection.TryGetValue(msgBody.ToId, out var targetSocket)){if (targetSocket.State == WebSocketState.Open){await targetSocket.SendAsync(new ArraySegment<byte>(sendByte, 0, sendByte.Length), WebSocketMessageType.Text, true, CancellationToken.None);}else{_ = UserConnection.TryRemove(msgBody.FromId, out _);}}});//把订阅实例加入集合_disposables.TryAdd(channel, sub);
}

如果给某个用户发送消息则可以使用如下的消息格式

{"Method":"One", "MsgBody":{"Id":"2","Msg":"Hello"}}

Method为One代表着是私聊一对一的情况,消息体内Id为要发送给的具体用户标识和消息体。

群组处理

接下来看群组处理方式,这个和之前的逻辑是有出入的,首先是用户要先加入到某个群组然后才能接收群组消息或者在群组中发送消息,之前是一个用户对应多个连接,整合了之后集群中每个用户只关联唯一的一个WebSocket连接,首先看用户加入群组的逻辑

private async Task AddUserGroup(string user, string group, WebSocket webSocket)
{//获取群组信息var currentGroup = GroupUser.GetOrAdd(group, new HashSet<string>());lock (currentGroup){//把用户标识加入当前组_ = currentGroup.Add(user);}//每个组的redis频道,在每台WebSocket服务器实例只注册一次订阅if (currentGroup.Count == 1){//订阅当前组消息await SubGroupMsg($"{groupPrefix}{group}");}string addMsg = $"user 【{user}】 add  to group 【{group}】";byte[] sendByte = Encoding.UTF8.GetBytes(addMsg);await webSocket.SendAsync(new ArraySegment<byte>(sendByte, 0, sendByte.Length), WebSocketMessageType.Text, true, CancellationToken.None);//如果有用户加入群组,则通知其他群成员ChannelMsgBody channelMsgBody = new ChannelMsgBody { FromId = user, ToId = group, Msg = addMsg };_redisClient.Publish($"{groupPrefix}{group}", JsonConvert.SerializeObject(channelMsgBody));
}

用户想要在群组内发消息,则必须先加入到一个具体的群组内,具体的加入群组的格式如下

{"Method":"UserGroup", "Group":"g1"}

Method为UserGroup代表着用户加入群组的业务类型,Group代表着你要加入的群组唯一标识。接下来就看下,用户发送群组消息的逻辑了

private async Task HandleGroup(string groupId, string userId, WebSocket webSocket, object msgBody)
{//判断群组是否存在var hasValue = GroupUser.TryGetValue(groupId, out var users);if (!hasValue){byte[] sendByte = Encoding.UTF8.GetBytes($"group【{groupId}】 not exists");await webSocket.SendAsync(new ArraySegment<byte>(sendByte, 0, sendByte.Length), WebSocketMessageType.Text, true, CancellationToken.None);return;}//只有加入到当前群组,才能在群组内发送消息if (!users.Contains(userId)){byte[] sendByte = Encoding.UTF8.GetBytes($"user 【{userId}】 not in 【{groupId}】");await webSocket.SendAsync(new ArraySegment<byte>(sendByte, 0, sendByte.Length), WebSocketMessageType.Text, true, CancellationToken.None);return;}_logger.LogInformation($"group 【{groupId}】 user 【{userId}】 send:{msgBody}");//发送群组消息ChannelMsgBody channelMsgBody = new ChannelMsgBody { FromId = userId, ToId = groupId, Msg = msgBody.ToString() };_redisClient.Publish($"{groupPrefix}{groupId}", JsonConvert.SerializeObject(channelMsgBody));
}

加入群组之后则可以发送和接收群组内的消息了,给群组发送消息的格式如下

{"Method":"Group", "Group":"g1", "MsgBody":"Hi All"}

Method为Group代表着用户加入群组的业务类型,Group则代表你要发送到具体的群组的唯一标识,MsgBody则是发送到群组内的消息。最后再来看下订阅群组内消息的情况,也就是处理群组消息的逻辑

private async Task SubGroupMsg(string channel)
{var sub = _redisClient.Subscribe(channel, async (channel, data) =>{//接收群组订阅消息ChannelMsgBody msgBody = JsonConvert.DeserializeObject<ChannelMsgBody>(data.ToString());byte[] sendByte = Encoding.UTF8.GetBytes($"group 【{msgBody.ToId}】 user 【{msgBody.FromId}】 send:{msgBody.Msg}");//获取当前服务器实例中当前群组的所有用户连接GroupUser.TryGetValue(msgBody.ToId, out var currentGroup);foreach (var user in currentGroup){if (user == msgBody.FromId){continue;}//通过群组内的用户标识去用户集合获取用户集合里的用户唯一连接发送消息if (UserConnection.TryGetValue(user, out var targetSocket) && targetSocket.State == WebSocketState.Open){await targetSocket.SendAsync(new ArraySegment<byte>(sendByte, 0, sendByte.Length), WebSocketMessageType.Text, true, CancellationToken.None);}else{currentGroup.Remove(user);}}});_disposables.TryAdd(channel, sub);
}
全员消息处理

全员消息处理相对来说思路比较简单,因为当服务启动的时候就会监听redis的全员消息频道,这样的话具体的实现也就只包含发送和接收全员消息了,首先看一下全员消息发送的逻辑

private async Task HandleAll(string id, object msgBody)
{_logger.LogInformation($"user {id} send:{msgBody}");//直接给redis的全员频道发送消息ChannelMsgBody channelMsgBody = new ChannelMsgBody { FromId = id, Msg = msgBody.ToString() };_redisClient.Publish(all, JsonConvert.SerializeObject(channelMsgBody));
}

全员消息的发送数据格式如下所示

{"Method":"All", "MsgBody":"Hello All"}

Method为All代表着全员消息类型,MsgBody则代表着具体消息。接收消息出里同样很简单,订阅redis全员消息频道,然后遍历当前WebSocket服务器实例内的所有用户获取连接发送消息,具体逻辑如下

private async Task SubAllMsg(string channel)
{var sub = _redisClient.Subscribe(channel, async (channel, data) =>{ChannelMsgBody msgBody = JsonConvert.DeserializeObject<ChannelMsgBody>(data.ToString());byte[] sendByte = Encoding.UTF8.GetBytes($"user 【{msgBody.FromId}】 send all:{msgBody.Msg}");//获取当前服务器实例内所有用户的连接foreach (var user in UserConnection){//不给自己发送消息,因为发送的时候可以通过具体的业务代码处理if (user.Key == msgBody.FromId){continue;}//给每个用户发送消息if (user.Value.State == WebSocketState.Open){await user.Value.SendAsync(new ArraySegment<byte>(sendByte, 0, sendByte.Length), WebSocketMessageType.Text, true, CancellationToken.None);}else{_ = UserConnection.TryRemove(user.Key, out _);}}});_disposables.TryAdd(channel, sub);
}

示例源码

由于篇幅有限,没办法设计到全部的相关源码,因此在这里贴出来github相关的地址,方便大家查看和运行源码。相关的源码我这里实现了两个版本,一个是基于asp.net core的版本,一个是基于golang的版本。两份源码的实现思路是一致的,所以这两份代码可以运行在一套集群示例里,配置在一套nginx里,并且连接到同一个redis实例里即可

  • asp.net core源码示例 https://github.com/softlgl/WebsocketCluster
  • golang源码示例 https://github.com/softlgl/websocket-cluster

仓库里还涉及到本人闲暇之余开源的其他仓库,由于本人能力有限难登大雅之堂,就不做广告了,有兴趣的同学可以自行浏览一下。

总结

    本文基于ASP.NET Core框架提供了一个基于WebSocket做集群的示例,由于思想是通用的,所以基于这个思路楼主也实现了golang版本。其实在之前就想自己动手搞一搞关于WebSocket集群方面的设计,本篇文章算是对之前想法的一个落地操作。其核心思路文章已经做了相关介绍,由于这些只是博主关于构思的实现,可能有很多细节尚未体现到,还希望大家多多理解。其核心思路总结一下

  • 首先是,利用可以构建WebSocket服务的框架,在当前服务实例中保存当前客户端用户和WebSocket的连接关系
  • 如果消息的目标客户端不在当前服务器,可以利用redis频道、消息队列相关、甚至是数据库类的共享回话发送的消息,由目标服务器获取目标是否属于自己的ws会话
  • 本文设计的思路使用的是无状态的方式,即WebSocket服务实例之间不存在直接的消息通信和相互的服务地址存储,当然也可以利用redis等存储在线用户信息等,这个可以参考具体业务自行设计

读万卷书,行万里路。在这个时刻都在变化点的环境里,唯有不断的进化自己,多接触多尝试不用的事物,多扩展自己的认知思维,方能构建自己的底层逻辑。毕竟越底层越抽象,越通用越抽象。面对未知的挑战,自身作为自己坚强的后盾,可能才会让自己更踏实。

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.rhkb.cn/news/304320.html

如若内容造成侵权/违法违规/事实不符,请联系长河编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

潍微科技-水务信息管理平台 ChangePwd SQL注入漏洞复现

0x01 产品简介 水务信息管理平台主要帮助水务企业实现水质状态监测、管网运行监控、水厂安全保障、用水实时监控以及排放有效监管,确保居民安全稳定用水、环境有效保护,全面提升水务管理效率。由山东潍微科技股份有限公司研发,近年来,公司全力拓展提升水务、水利信息化业务…

机器学习实训 Day1

线性回归练习 Day1 手搓线性回归 随机初始数据 import numpy as np x np.array([56, 72, 69, 88, 102, 86, 76, 79, 94, 74]) y np.array([92, 102, 86, 110, 130, 99, 96, 102, 105, 92])from matplotlib import pyplot as plt # 内嵌显示 %matplotlib inlineplt.scatter…

专为苹果系统设计的精美可视化图表 | 开源日报 No.219

danielgindi/Charts Stars: 27.3k License: Apache-2.0 Charts 是为 iOS/tvOS/OSX 提供美观图表的开源项目&#xff0c;是跨平台 MPAndroidChart 在苹果设备上的实现。该项目提供了以下主要功能和优势&#xff1a; 支持 iOS、tvOS 和 macOS 平台使用 Swift 编写&#xff0c;可…

Harmony鸿蒙南向驱动开发-PWM

PWM&#xff08;Pulse Width Modulation&#xff09;即脉冲宽度调制&#xff0c;是一种对模拟信号电平进行数字编码并将其转换为脉冲的技术&#xff0c;广泛应用在从测量、通信到功率控制与变换的许多领域中。通常情况下&#xff0c;在使用马达控制、背光亮度调节时会用到PWM模…

数据降维方法-主成分分析(PCA)

目录 一、前言 二、向量的表示及基变换 三、基变换 四、协方差矩阵 五、协方差 六、优化目标 一、前言 主成分分析(Principal Component Analysis) 用途&#xff1a;降维中的常用手段 目标&#xff1a;提取最有价值的信息&#xff08;基于方差&#xff09; 问题&#x…

Acwing.4009 收集卡牌(期望dp)

题目 小林在玩一个抽卡游戏&#xff0c;其中有 n种不同的卡牌&#xff0c;编号为 1到 n。 每一次抽卡&#xff0c;她获得第 i种卡牌的概率为 pi。 如果这张卡牌之前已经获得过了&#xff0c;就会转化为一枚硬币。 可以用 k枚硬币交换一张没有获得过的卡。 小林会一直抽卡&…

最祥解决python 将Dataframe格式数据上传数据库所碰到的问题

碰到的问题 上传Datafrane格式的数据到数据库 会碰见很多错误 举几个很普遍遇到的问题(主要以SqlServer举例) 这里解释下 将截断字符串或二进制数据 这个是字符长度超过数据库设置的长度 然后还有字符转int失败 或者字符串转换日期/或时间失败 这个是碰到的需要解决的最多的问…

网站HTTP升级成为HTTPS的方法

将网站从HTTP免费升级为HTTPS&#xff0c;您可以按照以下步骤操作&#xff1a; 1. 选择证书颁发机构&#xff08;CA&#xff09;&#xff1a; - 为了免费升级&#xff0c;您可以选择使用JoySSL这样的公益项目。JoySSL提供免费、自动化的SSL/TLS证书颁发服务&#xff0c;适用于各…

三、Mat、Bitmap和Image数据类型之间的转换(OpenCvSharp)

在OpenCV中可以通过ImRead方法读取照片&#xff0c;通过ImShow方法显示照片&#xff1b;但是无法在PictureBox控件中显示 PictureBox控件只能展示Bitmap和Image数据类型图片 为此查阅了网上很多篇博文&#xff0c;将三种数据类型之间的转换进行了归纳整理&#xff0c;感谢网上…

【Qt】:对话框(一)

对话框 一.基本的对话框二.自定义对话框三.通过图形化界面自定义对话框四.关于对话框mode 对话框是GUI程序中不可或缺的组成部分。一些不适合在主窗口实现的功能组件可以设置在对话框中。对话框通常是一个顶层窗口&#xff0c;出现在程序最上层&#xff0c;用于实现短期任务或者…

【mT5多语言翻译】之一——实战项目总览

[1] 总览 【mT5多语言翻译】系列共六篇文章&#xff1a; 【mT5多语言翻译】之一——实战项目总览   【mT5多语言翻译】之二——模型&#xff1a;T5模型与mT5模型与前置知识   【mT5多语言翻译】之三——数据集&#xff1a;多语言翻译数据集与预处理   【mT5多语言翻译】之…

cesium 添加动态波纹效果 圆形扩散效果 波纹材质

一、扩展材质 /*** 水波纹扩散材质* param {*} options* param {String} options.color 颜色* param {Number} options.duration 持续时间 毫秒* param {Number} options.count 波浪数量* param {Number} options.gradient 渐变曲率*/function CircleWaveMaterialProperty(opt…

代码随想录--数组--移除元素

题目 给你一个数组 nums 和一个值 val&#xff0c;你需要 原地 移除所有数值等于 val 的元素&#xff0c;并返回移除后数组的新长度。 不要使用额外的数组空间&#xff0c;你必须仅使用 O(1) 额外空间并原地修改输入数组。 元素的顺序可以改变。你不需要考虑数组中超出新长度…

Win11 使用 WSL2 安装 linux 子系统 ubuntu

Win11 使用 WSL2 安装 linux 子系统 ubuntu 段子手168 1、用 部署映像服务和管理工具 dism.exe 命令&#xff0c;开启 WSL2 按【WIN R】&#xff0c;打开【运行】&#xff0c;输入&#xff1a;【cmd】&#xff0c;管理员打开【命令行提示符】。 启用适用于 Linux 的 Windo…

PHP自助建站系统,小白也能自己搭建网站

无需懂代码&#xff0c;用 自助建站 做企业官网就像做PPT一样简单&#xff0c;您可以亲自操刀做想要的效果&#xff01; 自助建站是一款简单、快捷、高效的工具&#xff0c;可以帮助您制作响应式网站。我们的自助建站系统&#xff0c;将传统的编码工作转化为直观的拖拽操作和文…

python 有哪些函数

Python内置的函数及其用法。为了方便记忆&#xff0c;已经有很多开发者将这些内置函数进行了如下分类&#xff1a; 数学运算(7个) 类型转换(24个) 序列操作(8个) 对象操作(7个) 反射操作(8个) 变量操作(2个) 交互操作(2个) 文件操作(1个) 编译执行(4个) 装饰器(3个) …

STM32H7通用定时器计数功能的使用

目录 概述 1 STM32定时器介绍 1.1 认识通用定时器 1.2 通用定时器的特征 1.3 递增计数模式 1.4 时钟选择 2 STM32Cube配置定时器时钟 2.1 配置定时器参数 2.2 配置定时器时钟 3 STM32H7定时器使用 3.1 认识定时器的数据结构 3.2 计数功能实现 4 测试案例 4.1 代码…

3D Matching:实现halcon中的find_surface_model

halcon中的三维匹配大致分为两类&#xff0c;一类是基于形状的(Shape-Based)&#xff0c;一类是基于表面的(Surface-Based)。基于形状的匹配可用于单个2D图像中定位复杂的3D物体&#xff0c;3D物体模型必须是CAD模型&#xff0c;且几何边缘清晰可见&#xff0c;使用的相机也要预…

废品回收 小程序+APP

用户实名认证、回收员实名认证、后台审核、会员管理、回收员管理、订单管理、提现管理、地图、档案管理。 支持&#xff0c;安卓APP、苹果APP、小程序 流程&#xff1a; 一、用户端下单&#xff0c;地图选择上门位置、填写具体位置、废品名称、预估重量、选择是企业废旧、家…

嵌入式ARM版本银河麒麟操作系统V10SP1安装OPenGauss数据库

前言&#xff1a; 官网提供了非常完整的openGauss安装步骤。 https://opengauss.org/zh/download/archive/列举一下个人的使用环境&#xff1a; 麒麟V10 rk3588工控板&#xff08;ARM&#xff09; openGauss-3.0.5&#xff08;极简版&#xff09;浏览一下官网&#xff0c;可以…