一、开启网络监听
Fantasy.Net\Runtime\Core\Network\Protocol\KCP\Server\KCPServerNetwork.cs:72
/// <summary>
/// Initializes this KCP server network: creates a non-blocking UDP socket, binds it to
/// <paramref name="address"/>, and starts the pipe-reading and socket-receiving coroutines.
/// </summary>
/// <param name="networkTarget">Network target forwarded to <c>KCPSettings.Create</c> and <c>base.Initialize</c>.</param>
/// <param name="address">Local endpoint the UDP socket is bound to.</param>
public void Initialize(NetworkTarget networkTarget, IPEndPoint address)
{
    _startTime = TimeHelper.Now;
    Settings = KCPSettings.Create(networkTarget);
    base.Initialize(NetworkType.Server, NetworkProtocolType.KCP, networkTarget);

    _socket = new Socket(address.AddressFamily, SocketType.Dgram, ProtocolType.Udp);

    try
    {
        // Non-blocking mode only needs to be set once (the original assigned it twice).
        _socket.Blocking = false;
        _socket.SetSocketOption(SocketOptionLevel.Socket, SocketOptionName.ReuseAddress, false);

        if (address.AddressFamily == AddressFamily.InterNetworkV6)
        {
            // Turn off IPv6Only so the IPv6 socket is not restricted to IPv6 traffic.
            _socket.SetSocketOption(SocketOptionLevel.IPv6, SocketOptionName.IPv6Only, false);
        }

        _socket.Bind(address);
        _socket.SetSocketBufferToOsLimit();
        _socket.SetSioUdpConnReset();
    }
    catch
    {
        // Configuration or Bind failed: release the OS handle instead of leaking it,
        // then let the caller see the original exception.
        _socket.Dispose();
        throw;
    }

    // Both loops are started without awaiting them, so Initialize returns immediately.
    ReadPipeDataAsync().Coroutine();
    ReceiveSocketAsync().Coroutine();

    Log.Info($"SceneConfigId = {Scene.SceneConfigId} networkTarget = {networkTarget.ToString()} KCPServer Listen {address}");
}
二、收到数据
Fantasy.Net\Runtime\Core\Network\Protocol\TCP\Server\TCPServerNetworkChannel.cs:127
/// <summary>
/// Read loop over <c>_pipe.Reader</c>: every complete message that <c>TryReadMessage</c> can
/// extract from the buffered data is handed to <c>ReceiveData</c>. The loop ends when the
/// cancellation token is cancelled or the pipe reports completion, after which the reader is completed.
/// </summary>
private async FTask ReadPipeDataAsync()
{
    var reader = _pipe.Reader;

    while (!_cancellationTokenSource.IsCancellationRequested)
    {
        ReadResult readResult = default;

        try
        {
            readResult = await reader.ReadAsync(_cancellationTokenSource.Token);
        }
        catch (OperationCanceledException)
        {
            // _cancellationTokenSource was cancelled; this normally happens when the channel disconnects.
            break;
        }

        var pending = readResult.Buffer;
        // Everything received so far counts as examined; only fully parsed messages count as consumed.
        var examinedUpTo = pending.End;
        var consumedUpTo = pending.Start;

        while (TryReadMessage(ref pending, out var message))
        {
            ReceiveData(ref message);
            consumedUpTo = pending.Start;
        }

        if (readResult.IsCompleted)
        {
            break;
        }

        reader.AdvanceTo(consumedUpTo, examinedUpTo);
    }

    await reader.CompleteAsync();
}
三、进入到自己封装的Session
一个Scene中的Socket监听到网络事件
Fantasy.Net\Runtime\Core\Network\Protocol\TCP\Server\TCPServerNetworkChannel.cs:185
/// <summary>
/// Unpacks every packet contained in <paramref name="buffer"/> and forwards each one to the Session.
/// Any exception while unpacking or dispatching is logged together with the remote address
/// and then the channel is disposed.
/// </summary>
/// <param name="buffer">Raw message bytes; passed by reference to the packet parser.</param>
private void ReceiveData(ref ReadOnlyMemory<byte> buffer)
{
    try
    {
        while (_packetParser.UnPack(ref buffer, out var unpacked))
        {
            // NOTE(review): on cancellation the packet that was just unpacked is dropped without
            // being disposed - confirm whether it should be returned to its pool here.
            if (_cancellationTokenSource.IsCancellationRequested)
            {
                return;
            }

            Session.Receive(unpacked);
        }
    }
    catch (Exception error)
    {
        // A ScanException is reported as a warning; every other failure is reported as an error.
        if (error is ScanException)
        {
            Log.Warning($"RemoteAddress:{RemoteEndPoint} \n{error}");
        }
        else
        {
            Log.Error($"RemoteAddress:{RemoteEndPoint} \n{error}");
        }

        Dispose();
    }
}
四、Session 收到数据包后,直接交给消息调度器(NetworkMessageScheduler)处理
Fantasy.Net\Runtime\Core\Network\Session\Session.cs:240
/// <summary>
/// Entry point for a packet arriving on this session: refreshes <c>LastReceiveTime</c>
/// and passes the packet to the network message scheduler.
/// </summary>
/// <param name="packInfo">The unpacked packet to schedule.</param>
internal void Receive(APackInfo packInfo)
{
    // A disposed session ignores incoming packets.
    if (IsDisposed)
    {
        return;
    }

    LastReceiveTime = TimeHelper.Now;

    try
    {
        NetworkMessageScheduler.Scheduler(this, packInfo);
    }
    catch (Exception exception)
    {
        // A failure here is treated as a maliciously crafted packet, so the connection
        // is force-closed to stop the peer from sending more.
        Dispose();
        Log.Error(exception);
    }
}
五、调度时不阻塞:HandlerAsync 以协程方式启动且不等待其完成,因此可以马上接收下一个消息
Fantasy.Net\Runtime\Core\Network\Message\Scheduler\OuterMessageScheduler.cs:43
/// <summary>
/// Schedules an incoming packet by starting <c>HandlerAsync</c> as a coroutine.
/// The call is not awaited, so this method returns without waiting for the handler to finish.
/// </summary>
/// <param name="session">Session the packet arrived on.</param>
/// <param name="packInfo">The packet to handle.</param>
public override void Scheduler(Session session, APackInfo packInfo)
    => HandlerAsync(session, packInfo).Coroutine();
六、按协议类型(OpCodeType)分发处理,普通消息/请求会直接进入自己的MessageHandler进行处理
Fantasy.Net\Runtime\Core\Network\Message\Scheduler\OuterMessageScheduler.cs:48
/// <summary>
/// Dispatches one packet received on an outer session according to its protocol kind
/// (<c>packInfo.OpCodeIdStruct.Protocol</c>): heartbeat, plain message/request, response,
/// addressable message/request, or custom-route message/request.
/// </summary>
/// <param name="session">Session the packet arrived on; nothing is done if it is already disposed.</param>
/// <param name="packInfo">The packet to handle.</param>
/// <exception cref="NotSupportedException">The protocol kind is not one of the handled cases.</exception>
private async FTask HandlerAsync(Session session, APackInfo packInfo)
{
    if (session.IsDisposed)
    {
        return;
    }

    switch (packInfo.OpCodeIdStruct.Protocol)
    {
        case OpCodeType.OuterPingRequest:
        {
            // Heartbeats currently exist only on outer connections; inner connections do not need them.
            session.LastReceiveTime = TimeHelper.Now;
            _pingResponse.Now = session.LastReceiveTime;

            using (packInfo)
            {
                session.Send(_pingResponse, packInfo.RpcId);
            }

            return;
        }
        case OpCodeType.OuterMessage:
        case OpCodeType.OuterRequest:
        {
            var messageType = MessageDispatcherComponent.GetOpCodeType(packInfo.ProtocolCode);

            try
            {
                if (messageType == null)
                {
                    throw new Exception($"可能遭受到恶意发包或没有协议定义ProtocolCode ProtocolCode:{packInfo.ProtocolCode}");
                }

                var message = packInfo.Deserialize(messageType);
                // Not awaited: MessageHandler starts the user handler on its own.
                MessageDispatcherComponent.MessageHandler(session, messageType, message, packInfo.RpcId, packInfo.ProtocolCode);
            }
            catch (Exception e)
            {
                // NOTE(review): the log text says "OuterResponse" although this branch handles
                // OuterMessage/OuterRequest - left unchanged in case logs are searched for it.
                Log.Error($"ANetworkMessageScheduler OuterResponse error messageProtocolCode:{packInfo.ProtocolCode} messageType:{messageType} SessionId {session.Id} IsDispose {session.IsDisposed} {e}");
            }
            finally
            {
                packInfo.Dispose();
            }

            return;
        }
        case OpCodeType.OuterResponse:
        {
            using (packInfo)
            {
                var messageType = MessageDispatcherComponent.GetOpCodeType(packInfo.ProtocolCode);

                if (messageType == null)
                {
                    throw new Exception($"可能遭受到恶意发包或没有协议定义ProtocolCode ProtocolCode:{packInfo.ProtocolCode}");
                }

                NetworkMessagingComponent.ResponseHandler(packInfo.RpcId, (IResponse)packInfo.Deserialize(messageType));
            }

            return;
        }
        case OpCodeType.OuterAddressableMessage:
        {
            var packInfoPackInfoId = packInfo.PackInfoId;

            try
            {
                var messageType = MessageDispatcherComponent.GetOpCodeType(packInfo.ProtocolCode);

                if (messageType == null)
                {
                    throw new Exception($"OuterMessageScheduler error 可能遭受到恶意发包或没有协议定义ProtocolCode ProtocolCode:{packInfo.ProtocolCode}");
                }

                var addressableRouteComponent = session.GetComponent<AddressableRouteComponent>();

                if (addressableRouteComponent == null)
                {
                    throw new Exception("OuterMessageScheduler error session does not have an AddressableRouteComponent component");
                }

                await addressableRouteComponent.Send(messageType, packInfo);
            }
            finally
            {
                // Dispose only if the id is unchanged since before the await
                // (presumably a changed id means the instance was already recycled - TODO confirm).
                if (packInfo.PackInfoId == packInfoPackInfoId)
                {
                    packInfo.Dispose();
                }
            }

            return;
        }
        case OpCodeType.OuterAddressableRequest:
        {
            var packInfoPackInfoId = packInfo.PackInfoId;

            try
            {
                var messageType = MessageDispatcherComponent.GetOpCodeType(packInfo.ProtocolCode);

                if (messageType == null)
                {
                    throw new Exception($"OuterMessageScheduler error 可能遭受到恶意发包或没有协议定义ProtocolCode ProtocolCode:{packInfo.ProtocolCode}");
                }

                var addressableRouteComponent = session.GetComponent<AddressableRouteComponent>();

                if (addressableRouteComponent == null)
                {
                    throw new Exception("OuterMessageScheduler error session does not have an AddressableRouteComponent component");
                }

                // Captured before the await because packInfo/session may change while waiting.
                var rpcId = packInfo.RpcId;
                var runtimeId = session.RunTimeId;
                var response = await addressableRouteComponent.Call(messageType, packInfo);

                // The session may have disconnected during the call, so verify it is still the same one.
                if (session.RunTimeId == runtimeId)
                {
                    session.Send(response, rpcId);
                }
            }
            finally
            {
                if (packInfo.PackInfoId == packInfoPackInfoId)
                {
                    packInfo.Dispose();
                }
            }

            return;
        }
        case OpCodeType.OuterCustomRouteMessage:
        {
            var packInfoProtocolCode = packInfo.ProtocolCode;
            var packInfoPackInfoId = packInfo.PackInfoId;

            try
            {
                if (!MessageDispatcherComponent.GetCustomRouteType(packInfoProtocolCode, out var routeType))
                {
                    throw new Exception($"OuterMessageScheduler error 可能遭受到恶意发包或没有协议定义ProtocolCode ProtocolCode:{packInfo.ProtocolCode}");
                }

                var messageType = MessageDispatcherComponent.GetOpCodeType(packInfo.ProtocolCode);

                if (messageType == null)
                {
                    throw new Exception($"OuterMessageScheduler error 可能遭受到恶意发包或没有协议定义ProtocolCode ProtocolCode:{packInfo.ProtocolCode}");
                }

                var routeComponent = session.GetComponent<RouteComponent>();

                if (routeComponent == null)
                {
                    throw new Exception("OuterMessageScheduler CustomRouteType session does not have an routeComponent component");
                }

                if (!routeComponent.TryGetRouteId(routeType, out var routeId))
                {
                    throw new Exception($"OuterMessageScheduler RouteComponent cannot find RouteId with RouteType {routeType}");
                }

                NetworkMessagingComponent.SendInnerRoute(routeId, messageType, packInfo);
            }
            finally
            {
                if (packInfo.PackInfoId == packInfoPackInfoId)
                {
                    packInfo.Dispose();
                }
            }

            return;
        }
        case OpCodeType.OuterCustomRouteRequest:
        {
            var packInfoProtocolCode = packInfo.ProtocolCode;
            var packInfoPackInfoId = packInfo.PackInfoId;

            try
            {
                if (!MessageDispatcherComponent.GetCustomRouteType(packInfoProtocolCode, out var routeType))
                {
                    throw new Exception($"OuterMessageScheduler error 可能遭受到恶意发包或没有协议定义ProtocolCode ProtocolCode:{packInfo.ProtocolCode}");
                }

                var messageType = MessageDispatcherComponent.GetOpCodeType(packInfo.ProtocolCode);

                if (messageType == null)
                {
                    throw new Exception($"OuterMessageScheduler error 可能遭受到恶意发包或没有协议定义ProtocolCode ProtocolCode:{packInfo.ProtocolCode}");
                }

                var routeComponent = session.GetComponent<RouteComponent>();

                if (routeComponent == null)
                {
                    throw new Exception("OuterMessageScheduler CustomRouteType session does not have an routeComponent component");
                }

                if (!routeComponent.TryGetRouteId(routeType, out var routeId))
                {
                    throw new Exception($"OuterMessageScheduler RouteComponent cannot find RouteId with RouteType {routeType}");
                }

                // Captured before the await because packInfo/session may change while waiting.
                var rpcId = packInfo.RpcId;
                var runtimeId = session.RunTimeId;
                var response = await NetworkMessagingComponent.CallInnerRoute(routeId, messageType, packInfo);

                // The session may have disconnected during the call, so verify it is still the same one.
                if (session.RunTimeId == runtimeId)
                {
                    session.Send(response, rpcId);
                }
            }
            finally
            {
                if (packInfo.PackInfoId == packInfoPackInfoId)
                {
                    packInfo.Dispose();
                }
            }

            return;
        }
        default:
        {
            // Read the protocol code BEFORE disposing: the original read it from the
            // already-disposed packInfo while building the exception message.
            var unsupportedProtocolCode = packInfo.ProtocolCode;
            packInfo.Dispose();
            throw new NotSupportedException($"OuterMessageScheduler Received unsupported message protocolCode:{unsupportedProtocolCode}");
        }
    }
}
// Presumably closes the enclosing class, whose header lies outside this excerpt.
}
七、进入MessageHandler进行处理
调用的地方: 可以看出这里没有 await,处理器以协程方式启动,调用方不会等待其完成
MessageDispatcherComponent.MessageHandler(session, messageType, message, packInfo.RpcId, packInfo.ProtocolCode);
Fantasy.Net\Runtime\Core\Network\Message\Dispatcher\MessageDispatcherComponent.cs:287
/// <summary>
/// Routes a deserialized message to the handler registered for <paramref name="type"/>.
/// Under <c>FANTASY_UNITY</c> a registered delegate handler takes precedence; otherwise the
/// message handler is started as a coroutine. An unhandled message type is logged as a warning.
/// </summary>
/// <param name="session">Session the message arrived on.</param>
/// <param name="type">Message type used as the handler lookup key.</param>
/// <param name="message">The deserialized message object.</param>
/// <param name="rpcId">RPC id forwarded to the handler.</param>
/// <param name="protocolCode">Protocol code forwarded to the handler.</param>
public void MessageHandler(Session session, Type type, object message, uint rpcId, uint protocolCode)
{
#if FANTASY_UNITY
    if (_messageDelegateHandlers.TryGetValue(type, out var messageDelegateHandler))
    {
        messageDelegateHandler.Handle(session, message);
        return;
    }
#endif
    if (!_messageHandlers.TryGetValue(type, out var messageHandler))
    {
        Log.Warning($"Scene:{session.Scene.Id} Found Unhandled Message: {message.GetType()}");
        return;
    }

    // Invoke the handler's Handle method and run it as a coroutine (it is not awaited here).
    messageHandler.Handle(session, rpcId, protocolCode, message).Coroutine();
}
八、处理完毕后,会自动发送响应消息(Reply 保证只发送一次)
Fantasy.Net\Runtime\Core\Network\Message\Dispatcher\Interface\IMessageHandler.cs:97
/// <summary>
/// Runs the request handler for <paramref name="message"/> and sends the response back on
/// <paramref name="session"/>. The response is sent at most once: either when the handler
/// invokes the reply callback itself, or in the <c>finally</c> block after the handler ends.
/// </summary>
/// <param name="session">Session the request arrived on; no response is sent if it is disposed.</param>
/// <param name="rpcId">RPC id used when sending the response.</param>
/// <param name="messageTypeCode">Protocol code of the message (not used by this method).</param>
/// <param name="message">The request object; must be a <typeparamref name="TRequest"/>.</param>
public async FTask Handle(Session session, uint rpcId, uint messageTypeCode, object message)
{
    if (message is not TRequest request)
    {
        // A null message also lands here, so the type name must be resolved null-safely;
        // the original called message.GetType() and threw NullReferenceException in that case.
        var actualTypeName = message?.GetType().Name ?? "null";
        Log.Error($"消息类型转换错误: {actualTypeName} to {typeof(TRequest).Name}");
        return;
    }

    var response = new TResponse();
    var isReply = false;

    // Sends the response at most once, and never on a disposed session.
    void Reply()
    {
        if (isReply)
        {
            return;
        }

        isReply = true;

        if (session.IsDisposed)
        {
            return;
        }

        session.Send(response, rpcId);
    }

    try
    {
        await Run(session, request, response, Reply);
    }
    catch (Exception e)
    {
        Log.Error(e);
        // Report the failure to the caller through the response's error code.
        response.ErrorCode = InnerErrorCode.ErrRpcFail;
    }
    finally
    {
        // Guarantees a reply even if the handler never called Reply itself.
        Reply();
    }
}