服务端界面使用相同的clusterid和serviceid,相同ip地址,不同网关端口号和服务端口号,启动两个silo服务,并使用MySql数据库做Silo间信息同步,实现集群。
silo服务启动代码如下(从nuget下载Microsoft.Orleans.Clustering.AdoNet库):
var clusterID = ClusterID;var serviceID = ServiceID;var siloPort = SiloPort;var gatewayPort = GatewayPort;//本地集群方式部署var primarySiloEndPoint = new IPEndPoint(IPAddress.Parse(PrimarySiloIPAddr), siloPort);var silo = new HostBuilder().UseOrleans(builder =>{builder.UseAdoNetClustering(options =>{options.Invariant = GlobalValueDefinition.MySqlInvariant;options.ConnectionString = GlobalValueDefinition.MySqlConnection;}).Configure<ClusterOptions>(options =>{options.ClusterId = clusterID;options.ServiceId = serviceID;}).ConfigureEndpoints(siloPort: siloPort, gatewayPort: gatewayPort).ConfigureLogging(logging => logging.AddConsole()).UseDashboard(options =>{options.Username = "henreash";options.Password = "123456";options.Host = "*";options.Port = 8081;})//.AddMemoryStreams(GlobalValueDefinition.StreamProviderName) //unget引入 Microsoft.Orleans.Streaming.AddMemoryGrainStorage(GlobalValueDefinition.GrainStorageName).AddPlacementDirector<MyPlacementStrategy>(sp=>new MyPlacementDirector())#region 拦截器.AddIncomingGrainCallFilter(async context =>{if (context.InterfaceMethod.Name == nameof(IHello.HelloCallFilter)){RequestContext.Set("CallFilterValue", "this value was added by the filter");}await context.Invoke();if (context.InterfaceMethod.Name == nameof(IHello.HelloCallFilter)){context.Result = $"{context.Result},added by the filter {clusterID} - {serviceID} - {gatewayPort}";}});#endregion#region 使用NewtonJson做序列化引擎//需引用Microsoft.Orleans.Serialization.NewtonsoftJson包builder.Services.AddSerializer(serializerBuilder =>{serializerBuilder.AddNewtonsoftJsonSerializer(isSupported: type => true/*type.Namespace.StartsWith("Example.Namespace")*/);});#endregion}).Build();await silo.RunAsync();
MySql数据库常量定义:
public const string MySqlInvariant = "MySql.Data.MySqlClient";public const string MySqlConnection = "server=localhost;user id=root;database=orleans_test;port=3306;password=xxxxxx";
客户端启动代码(从nuget下载Microsoft.Orleans.Clustering.AdoNet库):
var clusterID = ClusterID;var serviceID = ServiceID;var serviceID02 = ServiceID02;var PRIMARY_SILO_IP_ADDRESS = IPAddress.Parse(textBox_IPAddr.Text);#region ADO服务器群集模式连接host01 = Host.CreateDefaultBuilder().UseOrleansClient(clientBuilder =>clientBuilder.UseAdoNetClustering(options => {options.Invariant = GlobalValueDefinition.MySqlInvariant;options.ConnectionString = GlobalValueDefinition.MySqlConnection;}).Configure<ClusterOptions>(options =>{options.ClusterId = clusterID;options.ServiceId = serviceID;})#region Stream测试.AddMemoryStreams(GlobalValueDefinition.StreamProviderName)#endregion).Build();await host01.StartAsync();client01 = host01.Services.GetRequiredService<IClusterClient>();#endregion#region 订阅friend1 = client01.GetGrain<IHello>("user1");friend2 = client01.GetGrain<IHello>("user2");Chat c = new Chat();(c as IChatNotify).ReceiveAct += (text, data) => ShowLog($"user1 callback {text} dateLen={data.Length}");chat = client01.CreateObjectReference<IChat>(c);await friend1.Subscribe(chat);Chat c02 = new Chat();(c02 as IChatNotify).ReceiveAct += (text, data) => ShowLog($"user2 callback {text} dateLen={data.Length}");chat02 = client01.CreateObjectReference<IChat>(c02);await friend2.Subscribe(chat02);#endregion
客户端只需指定clusterid、serviceid、mysql数据库连接字符串即可。测试过程中发现这种模式有几秒延时,服务端启动后立即启动客户端并进行连接,可能出现某个silo服务无法连接的情况。
Placement:
比如特定业务场景,服务器上部署了硬件外设,希望每个silo服务器都启动一个grain实例,可使用Placement机制进行制约;
首先定义Placement规则:
public class MyPlacementDirector : IPlacementDirector{public Task<SiloAddress> OnAddActivation(PlacementStrategy strategy, PlacementTarget target, IPlacementContext context){var userID = target.GrainIdentity.Key.ToString();var silos = context.GetCompatibleSilos(target).ToArray();SiloAddress siloAddress = silos.First();if (userID == "user1")siloAddress = silos?.FirstOrDefault(x=>x.Endpoint.Port == 11111) ?? siloAddress;else if(userID == "user2")siloAddress = silos?.FirstOrDefault(x => x.Endpoint.Port == 11112) ?? siloAddress;return Task.FromResult(siloAddress);}}[Serializable]public sealed class MyPlacementStrategy: PlacementStrategy{}[AttributeUsage(AttributeTargets.Class, AllowMultiple = false)]public sealed class MyPlacementStrategyAttribute : PlacementAttribute{public MyPlacementStrategyAttribute() : base(new MyPlacementStrategy()) { }}
上面代码做了简单约定,grain user1在端口11111的silo上创建,user2在端口11112的silo上创建。
Grain定义类上增加PlacementAttribute注解,服务端启动silo服务时注册自定义Placement规则:
另外Stream会根据StreamID创建额外的Grain,实际应用需注意。
双向通信过程中,grain向客户端发消息,数据量几十K效率还可以,过大(几兆)导致卡顿。