Unity Protobuf+RPC+UniTask

远程过程调用(RPC)协议详解

  • 什么是RPC协议
    • RPC的基本原理
    • RPC的关键组件
    • RPC的优缺点
    • Protobuf
    • 函数绑定
    • Call
    • Encode
    • Recv
    • Decode
    • Socket.Send和Recv
    • 项目地址

什么是RPC协议

远程过程调用(Remote Procedure Call,简称RPC)是一种网络通信协议,允许程序在不同的地址空间(通常在不同的物理计算机上)中调用彼此的方法,好像它们是在本地执行的一样。RPC隐藏了底层的网络通信细节,使开发人员能够像调用本地函数一样简单地调用远程服务。

RPC的基本原理

RPC的工作原理基于客户端-服务器模型,主要包括以下步骤:

1.客户端调用:客户端程序发起对某个远程过程的调用请求。
2.请求打包:调用参数被打包成消息,发送到服务器。
3.服务器解包和执行:服务器接收到消息,解包获取调用参数,执行相应的远程过程。
4.结果打包和返回:执行结果被打包成消息,发送回客户端。
5.客户端接收结果:客户端解包消息,获取调用结果

RPC的关键组件

1.客户端代理:负责将本地调用请求转换为远程调用请求,打包参数,并通过网络发送给服务器。
2.服务器代理:负责接收客户端的请求,解包参数,调用相应的服务方法,并将结果打包返回给客户端。
3.通信协议:定义客户端和服务器之间如何通信,常见的协议有HTTP、TCP等。
4.编解码器:负责参数和结果的序列化和反序列化,常见的格式有JSON、XML、Protobuf等。

RPC的优缺点

优点
1.简化远程调用:使得远程调用像本地调用一样简单,开发人员无需关心底层的网络通信细节。
2.语言无关:大多数RPC框架支持多种编程语言,方便不同语言的系统互操作。

缺点
1.调试困难:由于涉及网络通信,调试远程调用的问题比本地调用更加复杂。
2.可靠性要求高:需要处理网络延迟、丢包、超时等问题,增加了系统的复杂性。
3.耦合性:客户端和服务器需要共同遵循同一套接口定义,一旦接口发生变化,可能需要同时更新多个系统。

Protobuf

protoc.exe 生成C#文件
在这里插入图片描述

Gen.bat

@echo offrem 设置路径变量
set PROTOC_PATH="protoc.exe"
set PROTO_DIR="Protos"
set OUTPUT_DIR="ProtocolCodes"rem 创建日志头
echo .......................proto2C#.......................
echo.rem 检查目录是否存在
if not exist %PROTO_DIR% (echo Error: Protocols directory does not exist.echo Please create the Protocols directory and place your .proto files in it.echo.pauseexit /b
)rem 创建输出目录
if not exist %OUTPUT_DIR% mkdir %OUTPUT_DIR%rem 批量处理 .proto 文件
for %%f in (%PROTO_DIR%\*.proto) do (echo %%f complete%PROTOC_PATH% --proto_path=%PROTO_DIR% --csharp_out=%OUTPUT_DIR% %%f
)echo code generation complete. Press any key to close.
pause > nul

函数绑定

1.使用反射自动获取所有RPC函数, 对其进行Hash绑定

函数的定义 RPCMsgHandles.cs

public sealed class RPCMsgHandles
{private static void ReqMove(int unitId, Move move){}private static void RecvAttack(int skillid, Attack attack, ItemList itemList){LogHelper.Log($"Recv: skillid = {attack.Id}, targetId = {attack.TargetId}, itemList.Count = {itemList.Items.Count}");}private static void RecvDelete(int msg){LogHelper.Log($"Recv: state = {msg}");}private static void RecvReflectMove(Move move){LogHelper.Log($"move reflect sync: x:{move.X}, y:{move.Y}, speed:{move.Speed}, dir:{move.Dir}");}
}

使用反射进行函数绑定 RPCMoudle.cs

public sealed class RPCMoudle
{private static Dictionary<int, IRPC> _msg = new Dictionary<int, IRPC>();public static void Init(){System.Type type = typeof(RPCMsgHandles);MethodInfo[] methods = type.GetMethods(BindingFlags.Static | BindingFlags.NonPublic);foreach (MethodInfo methodInfo in methods){RPC method = new RPC(methodInfo);int index = 0;ParameterInfo[] infos = methodInfo.GetParameters();foreach (var info in infos){if (typeof(IMessage).IsAssignableFrom(info.ParameterType)){IMessage message = Activator.CreateInstance(info.ParameterType) as IMessage;method.AddParamType(DateType.Message);method.AddParam(index, message);}else{DateType dateType = GetDateType(info.ParameterType);method.AddParamType(dateType);}index++;}int hash = Globals.Hash(methodInfo.Name);if (_msg.ContainsKey(hash))throw new Exception("AddParamType rpc _method hash conflict: " + methodInfo.Name);_msg.Add(hash, method);}}
}

2.使用泛型手动进行RPC函数绑定

泛型类进行函数绑定 RPCMoudle.cs

public static void Register<T>(string methodName, Action<T> action) where T : class, IMessage, new()
{int id = Globals.Hash(methodName);RPCStatic<T> method = new RPCStatic<T>();method.Register(action, new T());if (_msg.ContainsKey(id)){LogHelper.LogError($"repeat id, id = {id}");}_msg[id] = method;
}public static void Unregister(string methodName)
{int id = Globals.Hash(methodName);if (_msg.ContainsKey(id)){_msg.Remove(id);}else{LogHelper.LogError($"no find method, id = {id}");}
}

Call

Call的实现, encode数据到byte[],第一个参数必须为远程函数名字, 用于将函数名字的hashid写入数据头中, 这样远程服务器在解析数据的时候会先解析4字节的数据头表示函数的hashid

Call中的Send函数 是Socket发送协议, Send函数中会在数据头中写入数据的长度, 在接收方根据数据的长度接收完整数据 防止粘包

Call函数有多个方法重载, 根据业务需求使用
1.public static void Call(string methodName, IMessage message) 类型安全, 类型固定
2.public static void Call(string id, params object[] args) 类型不安全, 可以传入任何参数, 使用更加方便快捷

具体调用例子
Move move = new Move();
move.X = 10;
move.Y = 20;
move.Speed = 100;
move.Dir = 20;这里使用的是object[] args 类型不安全, 也会有装箱拆箱的开销, 使用这用方式需要前后端统一类型
使用起来简单方便, 业务逻辑开发上使用较为方便
比如请求领取奖励 RPCMoudle.Call("ReqAward", 传入表奖励id);
比如请求保存勾选 RPCMoudle.Call("Save", true);
RPCMoudle.Call("ReqMove", 10016, move);这样是类型安全的, 也不会存在装箱拆箱的开销
更加高效, 战斗场景较为适合
RPCMoudle.Call("ReqMove", move);

Call的实现, 将数据进行Encode转换成二进制

public static void Call(string methodName, IMessage message)
{if (message == null) return;try{int id = Globals.Hash(methodName);int offset = 0;BuffMessage msg = GameFrame.message.GetBuffMessage();BitConverter.TryWriteBytes(msg.bytes.AsSpan(offset), id);offset += sizeof(int);BitConverterHelper.WriteString(msg.bytes, ref offset, GameFrame.myRole.Id);BitConverterHelper.WriteMessage(msg.bytes, ref offset, message);msg.length = offset;Main.Instance.Send(msg);}catch(Exception ex){LogHelper.LogError(ex.ToString());}
}public static void Call(string id, params object[] args)
{try{Profiler.BeginSample("rpc call");int hash = Globals.Hash(id);BuffMessage msg = Encode(hash, args);Main.Instance.Send(msg);Profiler.EndSample();}catch(Exception ex){LogHelper.LogError(ex.ToString());}
}

Encode

Encode函数的实现

private static BuffMessage Encode(int id, params object[] args)
{int offset = 0;BuffMessage msg = GameFrame.message.GetBuffMessage();BitConverter.TryWriteBytes(msg.bytes.AsSpan(offset), id);offset += sizeof(int);BitConverterHelper.WriteString(msg.bytes, ref offset, GameFrame.myRole.Id);foreach (object arg in args){try{System.Type type = arg.GetType();switch (arg){case IMessage:BitConverterHelper.WriteMessage(msg.bytes, ref offset, (IMessage)arg);break;case Int16:BitConverterHelper.WriteInt16(msg.bytes, ref offset, (Int16)arg);break;case Int32:BitConverterHelper.WriteInt32(msg.bytes, ref offset, (Int32)arg);break;case Int64:BitConverterHelper.WriteInt64(msg.bytes, ref offset, (Int64)arg);break;case UInt16:BitConverterHelper.WriteUInt16(msg.bytes, ref offset, (UInt16)arg);break;case UInt32:BitConverterHelper.WriteUInt32(msg.bytes, ref offset, (UInt32)arg);break;case UInt64:BitConverterHelper.WriteUInt64(msg.bytes, ref offset, (UInt64)arg);break;case bool:BitConverterHelper.WriteBool(msg.bytes, ref offset, (bool)arg);break;case Byte:BitConverterHelper.WriteByte(msg.bytes, ref offset, (byte)arg);break;case SByte:BitConverterHelper.WriteByte(msg.bytes, ref offset, (byte)arg);break;case Char:BitConverterHelper.WriteChar(msg.bytes, ref offset, (Char)arg);break;case Single:BitConverterHelper.WriteSingle(msg.bytes, ref offset, (Single)arg);break;case Double:BitConverterHelper.WriteDouble(msg.bytes, ref offset, (Double)arg);break;case string:BitConverterHelper.WriteString(msg.bytes, ref offset, (string)arg);break;}}catch(Exception ex){LogHelper.LogError($"id: {id}, " + ex.ToString());msg.Dispose();return msg;}}msg.length = offset;return msg;
}

0GC的TryWriteBytes方案

namespace Game
{public static class BitConverterHelper{private static readonly int BUFFER_SIZE = 1024 * 1024;private static readonly byte[] buffer = new byte[BUFFER_SIZE];private static CodedOutputStream _stream;private static Stopwatch _watch;public static void Init(){CreateStream();_watch = new Stopwatch();_watch.Start();}private static void CreateStream(){if (_stream != null)_stream.Dispose();if (_watch != null){_watch.Stop();LogHelper.LogWarning($"create stream interval time: {_watch.ElapsedMilliseconds / 1000.0f} s");_watch.Restart();}_stream = new CodedOutputStream(buffer);}private static Span<byte> ToByteArray(IMessage message){if (message == null)return new byte[0];int length = message.CalculateSize();if (length == 0)return new byte[0];if (length >= BUFFER_SIZE){throw new Exception($"overflow: message length >= {BUFFER_SIZE}");}if (_stream.Position + length >= BUFFER_SIZE)CreateStream();int position = (int)_stream.Position;message.WriteTo(_stream);return buffer.AsSpan(position, length);}public static void WriteInt16(byte[] buffer, ref int offset, Int16 arg){Check(buffer, offset + 1);buffer[offset++] = (byte)DateType.Int16;Check(buffer, offset + sizeof(Int16));BitConverter.TryWriteBytes(buffer.AsSpan(offset), arg);offset += sizeof(Int16);}public static void WriteInt32(byte[] buffer, ref int offset, Int32 arg){Check(buffer, offset + 1);buffer[offset++] = (byte)DateType.Int32;Check(buffer, offset + sizeof(Int32));BitConverter.TryWriteBytes(buffer.AsSpan(offset), arg);offset += sizeof(Int32);}public static void WriteInt64(byte[] buffer, ref int offset, Int64 arg){Check(buffer, offset + 1);buffer[offset++] = (byte)DateType.Int64;Check(buffer, offset + sizeof(Int64));BitConverter.TryWriteBytes(buffer.AsSpan(offset), arg);offset += sizeof(Int64);}public static void WriteUInt16(byte[] buffer, ref int offset, UInt16 arg){Check(buffer, offset + 1);buffer[offset++] = (byte)DateType.UInt16;Check(buffer, offset + sizeof(UInt16));BitConverter.TryWriteBytes(buffer.AsSpan(offset), arg);offset += sizeof(UInt16);}public static void WriteUInt32(byte[] buffer, ref int offset, UInt32 arg){Check(buffer, offset + 1);buffer[offset++] = (byte)DateType.UInt32;Check(buffer, offset + sizeof(UInt32));BitConverter.TryWriteBytes(buffer.AsSpan(offset), arg);offset += sizeof(UInt32);}public static void WriteUInt64(byte[] buffer, ref int offset, UInt64 arg){Check(buffer, offset + 1);buffer[offset++] = (byte)DateType.UInt64;Check(buffer, offset + sizeof(UInt64));BitConverter.TryWriteBytes(buffer.AsSpan(offset), arg);offset += sizeof(UInt64);}public static void WriteBool(byte[] buffer, ref int offset, bool arg){Check(buffer, offset + 1);buffer[offset++] = (byte)DateType.Boolean;Check(buffer, offset + sizeof(bool));BitConverter.TryWriteBytes(buffer.AsSpan(offset), arg);offset += sizeof(bool);}public static void WriteByte(byte[] buffer, ref int offset, byte arg){Check(buffer, offset + 1);buffer[offset++] = (byte)DateType.Byte;Check(buffer, offset + 1);buffer[offset++] = arg;}public static void WriteChar(byte[] buffer, ref int offset, Char arg){Check(buffer, offset + 1);buffer[offset++] = (byte)DateType.Char;Check(buffer, offset + sizeof(Char));BitConverter.TryWriteBytes(buffer.AsSpan(offset), arg);offset += sizeof(Char);}public static void WriteSingle(byte[] buffer, ref int offset, Single arg){Check(buffer, offset + 1);buffer[offset++] = (byte)DateType.Single;Check(buffer, offset + sizeof(Single));BitConverter.TryWriteBytes(buffer.AsSpan(offset), arg);offset += sizeof(Single);}public static void WriteDouble(byte[] buffer, ref int offset, Double arg){Check(buffer, offset + 1);buffer[offset++] = (byte)DateType.Double;Check(buffer, offset + sizeof(Double));BitConverter.TryWriteBytes(buffer.AsSpan(offset), arg);offset += sizeof(Double);}public static void WriteString(byte[] buffer, ref int offset, string arg){Check(buffer, offset + 1);buffer[offset++] = (byte)DateType.String;byte[] bytes = Encoding.UTF8.GetBytes(arg);Check(buffer, offset + bytes.Length);BitConverter.TryWriteBytes(buffer.AsSpan(offset), bytes.Length);offset += sizeof(int);Span<byte> target = new Span<byte>(buffer, offset, buffer.Length - offset);bytes.CopyTo(target);offset += bytes.Length;}public static void WriteMessage(byte[] buffer, ref int offset, IMessage arg){IMessage message = arg;Span<byte> bytes = ToByteArray(message);Check(buffer, offset + 1);buffer[offset++] = (byte)DateType.Message;Check(buffer, offset + bytes.Length);BitConverter.TryWriteBytes(buffer.AsSpan(offset), bytes.Length);offset += sizeof(int);Span<byte> target = new Span<byte>(buffer, offset, bytes.Length);bytes.CopyTo(target);offset += bytes.Length;}private static void Check(byte[] buffer, int offset){if (offset >= buffer.Length)throw new Exception($"date length: {offset} > {Globals.DATA_SZIE}, Invalid data!!");}public static void Dispose(){_stream?.Dispose();_stream = null;}}
}

Recv

Decode 数据解析,调用本地方法


public static void OnRPC(BuffMessage msg)
{if(msg == null){LogHelper.LogError("socket recv error, msg == null");return;}Decode(msg.bytes);
}

Decode

0GC的Decode方案

private static void Decode(byte[] buffer)
{if (buffer == null || buffer.Length < sizeof(int)){LogHelper.LogError("Invalid buffer received");return;}int protoId = BitConverter.ToInt32(buffer, 0);if (!_msg.TryGetValue(protoId, out IRPC method)){LogHelper.LogError($"Method not found for protoId: {protoId}");return;}BuffMessage buffMessage = GameFrame.message.GetBuffMessage();try{Array.Copy(buffer, sizeof(int), buffMessage.bytes, 0, buffer.Length - sizeof(int));method.Decode(buffMessage.bytes);}catch (Exception ex){LogHelper.LogError($"Error invoking method for protoId {protoId}: {ex.Message}");}finally{GameFrame.message.PutBuffMessage(buffMessage);}
}
namespace Game
{public interface IRPC : IDisposable{public void Decode(byte[] buffer);}public abstract class RPCBase : IRPC{protected byte[] buffer;public abstract void Decode(byte[] buffer);protected ReadOnlySpan<byte> ReadData(DateType type, ref int offset){ReadOnlySpan<byte> data = null;int length = GetLength(type);if (length > 0){data = new ReadOnlySpan<byte>(buffer, offset, length);offset += length;}return data;}protected bool ToBoolean(ref int offset){ReadOnlySpan<byte> data = ReadData(DateType.Boolean, ref offset);return BitConverter.ToBoolean(data);}protected Byte ToByte(ref int offset){ReadOnlySpan<byte> data = ReadData(DateType.Char, ref offset);return data[0];}protected char ToChar(ref int offset){ReadOnlySpan<byte> data = ReadData(DateType.Char, ref offset);return BitConverter.ToChar(data);}protected Int16 ToInt16(ref int offset){ReadOnlySpan<byte> data = ReadData(DateType.Int16, ref offset);return BitConverter.ToInt16(data);}protected UInt16 ToUInt16(ref int offset){ReadOnlySpan<byte> data = ReadData(DateType.UInt16, ref offset);return BitConverter.ToUInt16(data);}protected Int32 ToInt32(ref int offset){ReadOnlySpan<byte> data = ReadData(DateType.Int32, ref offset);return BitConverter.ToInt32(data);}protected UInt32 ToUInt32(ref int offset){ReadOnlySpan<byte> data = ReadData(DateType.UInt32, ref offset);return BitConverter.ToUInt32(data);}protected Int64 ToInt64(ref int offset){ReadOnlySpan<byte> data = ReadData(DateType.Int64, ref offset);return BitConverter.ToInt64(data);}protected UInt64 ToUInt64(ref int offset){ReadOnlySpan<byte> data = ReadData(DateType.UInt64, ref offset);return BitConverter.ToUInt64(data);}protected Single ToSingle(ref int offset){ReadOnlySpan<byte> data = ReadData(DateType.Single, ref offset);return BitConverter.ToSingle(data);}protected Double ToDouble(ref int offset){ReadOnlySpan<byte> data = ReadData(DateType.Double, ref offset);return BitConverter.ToDouble(data);}protected string ToString(ref int offset){ReadOnlySpan<byte> data = ReadData(DateType.String, ref offset);return DecodeString(ref offset);}protected IMessage ToMessage(ref int offset, IMessage message){ReadOnlySpan<byte> data = ReadData(DateType.Message, ref offset);return DecodeMessage(ref offset, message);}private IMessage DecodeMessage(ref int offset, IMessage message){int length = BitConverter.ToInt32(buffer, offset);offset += sizeof(int);ReadOnlySpan<byte> messageData = new ReadOnlySpan<byte>(buffer, offset, length);offset += length;return message.Descriptor.Parser.ParseFrom(messageData)!;}private string DecodeString(ref int offset){int length = BitConverter.ToInt32(buffer, offset);offset += sizeof(int);ReadOnlySpan<byte> messageData = new ReadOnlySpan<byte>(buffer, offset, length);offset += length;return Encoding.UTF8.GetString(messageData);}private static int GetLength(DateType type){switch (type){case DateType.Boolean:return sizeof(bool);case DateType.Char:return sizeof(char);case DateType.SByte:case DateType.Byte:return sizeof(byte);case DateType.Int16:return sizeof(Int16);case DateType.UInt16:return sizeof(UInt16);case DateType.Int32:return sizeof(Int32);case DateType.UInt32:return sizeof(UInt32);case DateType.Int64:return sizeof(Int64);case DateType.UInt64:return sizeof(UInt64);case DateType.Single:return sizeof(Single);case DateType.Double:return sizeof(double);}return -1;}public virtual void Dispose(){buffer = null;}}
}

Decode数据到对象列表, 然后Invoke

namespace Game
{public class RPC : RPCBase{private MethodInfo _method;private List<DateType> _types;private List<object> _params;private Dictionary<int, IMessage> _param;private int _paramIndex;public RPC(MethodInfo method){this._method = method;_types = new List<DateType>();_params = new List<object>();_param = new Dictionary<int, IMessage>();}public void AddParamType(DateType type){_types?.Add(type!);}public void AddParam(int index, IMessage message){_param[index] = message;}public override void Decode(byte[] buffer){base.buffer = buffer;_paramIndex = 0;int offset = 0;_params.Clear();foreach (DateType type in _types){DateType dateType = (DateType)buffer[offset++];if (dateType != type){LogHelper.LogError($"dateType bo equals, recv: {Enum.GetName(typeof(DateType), type)} != local: {Enum.GetName(typeof(DateType), dateType)}");}object obj = ToObject(dateType, ref offset);_params.Add(obj!);_paramIndex++;}_method?.Invoke(null, _params.ToArray());}private object ToObject(DateType type, ref int offset){switch (type){case DateType.Message:IMessage message = null;if (!_param!.TryGetValue(_paramIndex, out message)){LogHelper.LogError("no find message");return null;}return ToMessage(ref offset, message);case DateType.Boolean:return ToBoolean(ref offset);case DateType.Char:return ToChar(ref offset);case DateType.SByte:case DateType.Byte:return ToByte(ref offset);case DateType.Int16:return ToInt16(ref offset);case DateType.UInt16:return ToUInt16(ref offset);case DateType.Int32:return ToInt32(ref offset);case DateType.UInt32:return ToUInt32(ref offset);case DateType.Int64:return ToInt64(ref offset);case DateType.UInt64:return ToUInt64(ref offset);case DateType.Single:return ToSingle(ref offset);case DateType.Double:return ToDouble(ref offset);case DateType.String:return ToString(ref offset);default:LogHelper.LogError("no find dateType: " + type);break;}return null;}public override void Dispose(){base.Dispose();_method = null;_types = null;}}
}

泛型Decode,然后Invokde

namespace Game
{public class RPCStatic<T> : RPCBase{private Action<T> _action;private IMessage _message;public RPCStatic(){}public virtual void Register(Action<T> action, IMessage message){this._message = message;this._action = action;}public override void Decode(byte[] buffer){base.buffer = buffer;int offset = 0;DateType dateType = (DateType)buffer[offset++];try{if (dateType == DateType.Message){IMessage arg = ToMessage(ref offset, _message);_action?.Invoke((T)arg);}else{LogHelper.LogError($"invoke error, type != DateType.Message, type = {dateType}");}}catch (Exception ex){LogHelper.LogError(ex.ToString());}}public override void Dispose(){}}
}

Socket.Send和Recv

使用UniTask实现的多线程异步收发消息,处理了超时重发和异常处理,接收消息时的粘包处理

namespace Game
{public enum SocketState{None = 0,Connected = 1,Disconnected = 2,Connecting = 3,ConnectFailed = 4,Close = 5,Dispose = 6,}public class Tcp{private ConcurrentQueue<BuffMessage> _sendMsgs;private ConcurrentQueue<BuffMessage> _receiveMsgs;private TcpClient _tcpClient;private SocketState _socketState;private byte[] _recvBuff;private int _recvOffset;private int _delay = 10;private CancellationTokenSource _recvCancelToken;private CancellationTokenSource _sendCancelToken;public SocketState State { get { return _socketState; } }public string IP { get; set; }public int Port { get; set; }public NetworkStream Stream{get { return _tcpClient.GetStream(); }}public Tcp(){_sendMsgs = new ConcurrentQueue<BuffMessage>();_receiveMsgs = new ConcurrentQueue<BuffMessage>();_recvBuff = new byte[Globals.BUFFER_SIZE];}private void InitTcpClient(){_tcpClient = new TcpClient();_recvCancelToken = new CancellationTokenSource();_sendCancelToken = new CancellationTokenSource();}public void Update(){Profiler.BeginSample("on tcp rpc");if (_receiveMsgs.TryDequeue(out BuffMessage msg)){RPCMoudle.OnRPC(msg);GameFrame.message.PutBuffMessage(msg);}Profiler.EndSample();}public void Connect(string ip, int port){IP = ip;Port = port;Connect();}public async void Connect(){try{Close();InitTcpClient();SetSocketState(SocketState.Connecting);await _tcpClient.ConnectAsync(IP, Port);OnConnect();}catch (Exception ex){LogHelper.LogError(ex.ToString());}}private void OnConnect(){try{if (_tcpClient.Connected){LogHelper.Log("connected...");SetSocketState(SocketState.Connected);StartAsyncTasks();}else{SetSocketState(SocketState.ConnectFailed);}}catch (Exception ex){LogHelper.LogError("连接或通信发生错误:{0}" + ex.Message);SetSocketState(SocketState.ConnectFailed);}}private void StartAsyncTasks(){UniTask send = UniTask.Create(SendThread);UniTask recv = UniTask.Create(RecvThread);}private async UniTask SendThread(){await UniTask.SwitchToThreadPool();while (_socketState == SocketState.Connected){while (true){if (!_sendMsgs.TryDequeue(out BuffMessage msg))break;var timeoutToken = new CancellationTokenSource();timeoutToken.CancelAfterSlim(TimeSpan.FromMilliseconds(msg.TimeoutMillisecond));var linkedCts = CancellationTokenSource.CreateLinkedTokenSource(_sendCancelToken.Token, timeoutToken.Token);try{if(_sendCancelToken.IsCancellationRequested)break;await Stream.WriteAsync(msg.bytes, 0, msg.length, linkedCts.Token);LogHelper.Log($"发送完成: {msg.length} byte");GameFrame.message.PutBuffMessage(msg);}catch (OperationCanceledException ex){if (timeoutToken.IsCancellationRequested){_sendMsgs.Enqueue(msg);LogHelper.LogWarning("消息发送超时, 添加到队列末尾, 等待发送...");await UniTask.Delay(10);continue;}LogHelper.LogWarning("发送操作被终止..." + ex.Message);break;}catch (IOException ex) when (ex.InnerException is SocketException socketEx && socketEx.SocketErrorCode == SocketError.ConnectionAborted){LogHelper.Log("发送操作被终止...");break;}catch (Exception ex){LogHelper.LogError("发送错误: " + ex.Message);break;}}await UniTask.Delay(_delay);}}private async UniTask RecvThread(){await UniTask.SwitchToThreadPool();while (_socketState == SocketState.Connected){try{if (_recvCancelToken.IsCancellationRequested) break;int length = await Stream.ReadAsync(_recvBuff, _recvOffset, _recvBuff.Length - _recvOffset, _recvCancelToken.Token);if (length == 0){LogHelper.Log("connect failed...");break;}_recvOffset += length;int offset = 0;while (true){if (_recvOffset - offset < sizeof(int))// 没有足够的数据读取下一个消息的长度break;int dataLength = BitConverter.ToInt32(_recvBuff, offset);if (_recvOffset - offset < dataLength + sizeof(int))// 没有足够的数据读取完整的消息break;// 读取完整消息BuffMessage msg = GameFrame.message.GetBuffMessage();Buffer.BlockCopy(_recvBuff, offset + sizeof(int), msg.bytes, 0, dataLength);_receiveMsgs.Enqueue(msg);// 移动偏移量到下一个消息offset += sizeof(int) + dataLength;}// 将未处理的数据移到缓冲区开头if (_recvOffset - offset > 0)Buffer.BlockCopy(_recvBuff, offset, _recvBuff, 0, _recvOffset - offset);_recvOffset -= offset;}catch(OperationCanceledException ex){LogHelper.Log("读取操作被终止: " + ex.Message);break;}catch (IOException ex) when (ex.InnerException is SocketException socketEx && socketEx.SocketErrorCode == SocketError.OperationAborted){LogHelper.Log("读取操作被终止...");break;}catch (Exception ex){LogHelper.LogError("读取错误: " + ex.ToString());break;}await UniTask.Delay(_delay);}}private void SetSocketState(SocketState state){_socketState = state;}public void Send(BuffMessage message){if (message.length > 0){int headLength = sizeof(int);Buffer.BlockCopy(message.bytes, 0, message.bytes, headLength, message.length);BitConverter.TryWriteBytes(message.bytes.AsSpan(0), message.length);message.length += headLength;_sendMsgs.Enqueue(message);}else{GameFrame.message.PutBuffMessage(message);}}public void Close(){if (_tcpClient == null)return;try{if (_tcpClient.Connected){SetSocketState(SocketState.Close);_recvCancelToken.Dispose();_sendCancelToken.Dispose();_tcpClient.Close();}}catch (Exception ex){LogHelper.LogError(ex.ToString());}}public void Dispose(){Close();if (_tcpClient != null){_tcpClient.Dispose();_tcpClient = null;}if (_sendMsgs != null){_sendMsgs.Clear();_sendMsgs = null;}if (_receiveMsgs != null){_receiveMsgs.Clear();_receiveMsgs = null;}SetSocketState(SocketState.Dispose);}}
}

项目地址

SimpleRPC

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.rhkb.cn/news/349337.html

如若内容造成侵权/违法违规/事实不符,请联系长河编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

分布式高性能计算 (HPC)的工作负载管理平台和作业调度程序—— IBM Spectrum® LSF® Suites

IBM Spectrum LSF Suites 是面向分布式高性能计算 (HPC) 的工作负载管理平台和作业调度程序。基于 Terraform 的自动化现已可用&#xff0c;该功能可在 IBM Cloud 上为基于 IBM Spectrum LSF 的集群供应和配置资源。 借助我们针对任务关键型 HPC 环境的集成解决方案&#xff0…

CentOS7 配置Nginx域名HTTPS

Configuring Nginx with HTTPS on CentOS 7 involves similar steps to the ones for Ubuntu, but with some variations in package management and service control. Here’s a step-by-step guide for CentOS 7: Prerequisites Domain Name: “www.xxx.com”Nginx Install…

农业领域科技查新点提炼方法附案例!

农业学科是人类通过改造和利用生物有机体(植物、动物、微生物等)及各种自然资源(光、热、水、土壤等)生产出人类需求的农产品的过程&#xff0c;人类在这一过程中所积累的科学原理、技术、工艺和技能&#xff0c;统称为农业科学技术&#xff0c;该领域具有研究范围广、综合性强…

沉睡而且“狡猾”的特工:大模型也可以是!

大模型技术论文不断&#xff0c;每个月总会新增上千篇。本专栏精选论文重点解读&#xff0c;主题还是围绕着行业实践和工程量产。若在某个环节出现卡点&#xff0c;可以回到大模型必备腔调或者LLM背后的基础模型新阅读。而最新科技&#xff08;Mamba,xLSTM,KAN&#xff09;则提…

VMware ESXi 8.0U2c macOS Unlocker OEM BIOS ConnectX-3 网卡定制版 (集成驱动版)

VMware ESXi 8.0U2c macOS Unlocker & OEM BIOS ConnectX-3 网卡定制版 (集成驱动版) 发布 ESXi 8.0U2 集成驱动版&#xff0c;在个人电脑上运行企业级工作负载 请访问原文链接&#xff1a;https://sysin.org/blog/vmware-esxi-8-u2-sysin/&#xff0c;查看最新版。原创作…

【Da-SimaRPN】《Distractor-aware Siamese Networks for Visual Object Tracking》

ECCV-2018 中科大 文章目录 1 Background and Motivation2 Related Work3 Advantages / Contributions4 Method4.1 Features and Drawbacks in Traditional Siamese Networks4.2 Distractor-aware Training4.3 Distractor-aware Incremental Learning4.4 DaSiamRPN for Long-t…

GPT办公与科研应用、论文撰写、数据分析、机器学习、深度学习及AI绘图高级应用

原文链接&#xff1a;GPT办公与科研应用、论文撰写、数据分析、机器学习、深度学习及AI绘图高级应用https://mp.weixin.qq.com/s?__bizMzUzNTczMDMxMg&mid2247606667&idx3&sn2c5be84dfcd62d748f77b10a731d809d&chksmfa82606ccdf5e97ad1a2a86662c75794033d8e2e…

基于Pytorch实现AI写藏头诗

网上你找了一圈发现开源的代码不是付费订阅就是代码有问题,基于Pytorch实现AI写藏头诗看我这篇就够了。 用到的工具:华为云ModelArts平台的notebook/Pycharm/Vscode都行。 镜像:pytorch1.8-cuda10.2-cudnn7-ubuntu18.04,有GPU优先使用GPU资源。 实验背景 在短时测试使用场…

字符串及其应用

内容 编写程序实现字符串的基本运算&#xff1a; (1) 求串的长度、两串连接、串比较、子串匹配&#xff1b; (2) 用库函数直接实现上一步的字符申操作 完整代码 #include <iostream> #include <stdio.h> #include<string.h> using namespace std; #define M…

ASUS华硕ROG幻14Air笔记本GA403UI(UI UV UU UJ)工厂模式原厂Windows11系统安装包,带MyASUS in WinRE重置还原

适用型号&#xff1a;GA403UI、GA403UV、GA403UU、GA403UJ 链接&#xff1a;https://pan.baidu.com/s/1tz8PZbYKakfvUoXafQPLIg?pwd1mtc 提取码&#xff1a;1mtc 华硕原装WIN11系统工厂包带有ASUS RECOVERY恢复功能、自带面部识别,声卡,显卡,网卡,蓝牙等所有驱动、出厂主题…

pdf格式转成jpg图片,pdf格式如何转jpg

pdf转图片的方法&#xff0c;对于许多人来说可能是一个稍显陌生的操作。然而&#xff0c;在日常生活和工作中&#xff0c;我们有时确实需要将pdf文件转换为图片格式&#xff0c;以便于在特定的场合或平台上进行分享、展示或编辑。以下&#xff0c;我们将详细介绍一个pdf转成图片…

博客摘录「 AXI三种接口及DMA DDR XDMA介绍(应用于vivado中的ip调用)」2024年6月10日

关键要点&#xff1a; 1.AXI Stream经过协议转换可使用AXI_FULL&#xff08;PS与PL间的接口&#xff0c;如GP、HP和ACP&#xff09;。 2.传输数据类里就涉及一个握手协议&#xff0c;即在主从双方数据通信前&#xff0c;有一个握手的过程。基本内容&#xff1a;数据的传输源会…

探索Jetpack Compose中的高效导航库:Voyager项目

探索Jetpack Compose中的高效导航库&#xff1a;Voyager项目 在Jetpack Compose中实现高效、可扩展的导航是每个开发者的追求。Voyager作为一个多平台导航库&#xff0c;不仅与Jetpack Compose无缝集成&#xff0c;还提供了一套务实的API&#xff0c;帮助开发者创建单活动应用…

数据分析常用6种分析思路(下)

作为一名数据分析师&#xff0c;你又没有发现&#xff0c;自己经常碰到一些棘手的问题就没有思路&#xff0c;甚至怀疑自己究竟有没有好好学过分析&#xff1f; 在上篇文章里&#xff0c;我们讲到了数据分析中的流程、分类、对比三大块&#xff0c;今天&#xff0c;我们继续讲…

哈喽GPT-4o——对GPT-4o Prompt的思考与看法

目录 一、提示词二、提示词的优势1、提升理解能力2、增强专注力3、提高效率 三、什么样的算无效提示词&#xff1f;1、过于宽泛2、含糊不清3、太过复杂4、没有具体上下文5、缺乏明确目标6、过于开放7、使用专业术语但未定义8、缺乏相关性&#xff1a; 四、提示词正确的编写步骤…

Linux2-系统自有服务防火墙与计划任务

一、什么是防火墙 防火墙主要用于防范网络攻击&#xff0c;防火墙一般分为软件防火墙、硬件防火墙 1、Windows中的防护墙设置 2、防火墙的作用 3、Linux中的防火墙分类 Centos6、Centos6>防火墙>iptables防火墙 防火墙系统管理工具 Centos7>防火墙>firewalld防火…

【踩坑】修复Ubuntu远程桌面忽然无法Ctrl C/V复制粘贴及黑屏

转载请注明出处&#xff1a;小锋学长生活大爆炸[xfxuezhagn.cn] 如果本文帮助到了你&#xff0c;欢迎[点赞、收藏、关注]哦~ 说在前面&#xff1a; 需要注意的是&#xff0c;我发现他应该是新开了一个窗口给我。我之前打开的东西&#xff0c;在这个新窗口里都没有了&#xff0c…

长亭培训加复习安全产品类别

下面这个很重要参加hw时要问你用的安全产品就有这个 检测类型产品 偏审计 安全防御类型 EDR类似于杀毒软件 安全评估 任何东西都要经过这个机械勘察才能上线 安全管理平台 比较杂 比较集成 审计 漏扫 评估 合在这一个平台 也有可能只是管理 主机理解为一个电脑 安了终端插件…

五、特征缩放和多项式回归

目录 一、为什么要使用特征缩放(Feature Scaling) 1.首先来看预测房价的例子 2.特征缩放前后效果对比 二、特征缩放方法 1.统一除以范围最大值 2.均值归一化(Mean Normalization) 3.Z-score标准化(Z-score Normalization) 4.一些可以接受/不接受的缩放范围 三、如何识别…

C# WPF入门学习主线篇(三十四)—— 图形和动画

C# WPF入门学习主线篇&#xff08;三十四&#xff09;—— 图形和动画 图形和动画是WPF的重要组成部分&#xff0c;能够大幅提升应用程序的用户体验。本篇博客将详细介绍WPF中图形和动画的使用方法&#xff0c;涵盖基本图形绘制、动画创建及多媒体的应用。通过本文&#xff0c;…