将Abp默认事件总线改造为分布式事件总线

文章目录

  • 原理
    • 创建分布式事件总线
    • 实现自动订阅和事件转发
  • 使用
    • 启动Redis服务
    • 配置
    • 传递Abp默认事件
    • 传递自定义事件
  • 项目地址

原理

本地事件总线是通过Ioc容器来实现的。

IEventBus接口定义了事件总线的基本功能,如注册事件、取消注册事件、触发事件等。

Abp.Events.Bus.EventBus是本地事件总线的实现类,其中私有成员ConcurrentDictionary<Type, List<IEventHandlerFactory>> _handlerFactories是事件订阅表。通过维护事件订阅表来实现事件处理器的注册和取消注册。当对应类型的事件触发时,通过订阅表查找所有事件处理器,通过Ioc容器来获取处理器实例,然后通过反射来调用事件处理器的"HandleEvent"方法。

创建分布式事件总线

首先,我们需要一个分布式事件总线中间件,用来将事件从本地事件总线转发到分布式事件总线。常用的中间件有RabbitMQ、Kafka、Redis等。

开源社区已经有实现好的库,本项目参考了 wuyi6216/Abp.RemoteEventBus

这里已经定义好了一个分布式事件总线接口


public interface IDistributedEventBus : IDisposable
{void MessageHandle(string topic, string message);void Publish(IDistributedEventData eventData);void Subscribe(string topic);void Unsubscribe(string topic);void UnsubscribeAll();
}

为了兼容本地事件总线,我们需要定义一个分布式事件总线接口,继承自IEventBus接口。


public interface IMultipleEventBus : IDistributedEventBus, IEventBus
{}

实现自动订阅和事件转发

当注册本地事件时,将订阅分布式事件,事件Topic为类型的字符串表现形式

public IDisposable Register(Type eventType, IEventHandlerFactory factory)
{GetOrCreateHandlerFactories(eventType);List<IEventHandlerFactory> currentLists;if (_handlerFactories.TryGetValue(eventType, out currentLists)){lock (currentLists){if (currentLists.Count == 0){//Register to distributed eventthis.Subscribe(eventType.ToString());}currentLists.Add(factory);}}return new FactoryUnregistrar(this, eventType, factory);
}

创建TriggerRemote,此方法用于将本地事件参数打包成为分布式事件消息payload,并发布该消息

public void TriggerRemote(Type eventType, object eventSource, IEventData eventData)
{var exceptions = new List<Exception>();eventData.EventSource = eventSource;try{var payloadDictionary = new Dictionary<string, object>{{ PayloadKey, eventData }};var distributedeventData = new DistributedEventData(eventType.ToString(), payloadDictionary);Publish(distributedeventData);}catch (Exception ex){exceptions.Add(ex);}if (exceptions.Any()){if (exceptions.Count == 1){exceptions[0].ReThrow();}throw new AggregateException("More than one error has occurred while triggering the event: " + eventType, exceptions);}
}

当触发本地事件时,将消息转发至分布式事件总线。
在Trigger方法中调用TriggerRemote,事件状态回调和事件异常回调将不会被转发。

if (!(typeof(DistributedEventBusEvent) == eventType|| typeof(DistributedEventBusEvent).IsAssignableFrom(eventType)|| typeof(DistributedEventMessageHandleExceptionData) == eventType|| typeof(DistributedEventHandleExceptionData) == eventType))
{if (typeof(DistributedEventArgs) != eventType){TriggerRemote(eventType, eventSource, eventData);}
}

在消费端接收到分布式事件消息时,从Topic中解析类型,转发给本地事件。若此类型在本地事件注册过,则将消息反序列化为本地事件参数,然后触发本地事件。
本地事件处理器将触发最终的处理方法。


public virtual void MessageHandle(string topic, string message)
{Logger.Debug($"Receive message on topic {topic}");try{var eventData = _remoteEventSerializer.Deserialize<DistributedEventData>(message);var eventArgs = new DistributedEventArgs(eventData, topic, message);Trigger(this, new DistributedEventBusHandlingEvent(eventArgs));if (!string.IsNullOrEmpty(eventData.Type)){string pattern = @"(.*?)\[(.*?)\]";Match match = Regex.Match(eventData.Type, pattern);if (match.Success){var type = match.Groups[1].Value;var type2 = match.Groups[2].Value;var localTriggerType = typeFinder.Find(c => c.FullName == type).FirstOrDefault();var genericType = typeFinder.Find(c => c.FullName == type2).FirstOrDefault();if (localTriggerType != null && genericType != null){if (localTriggerType.GetTypeInfo().IsGenericType&& localTriggerType.GetGenericArguments().Length == 1&& !genericType.IsAbstract && !genericType.IsInterface){var localTriggerGenericType = localTriggerType.GetGenericTypeDefinition().MakeGenericType(genericType);if (eventData.Data.TryGetValue(PayloadKey, out var payload)){var payloadObject = (payload as JObject).ToObject(localTriggerGenericType);Trigger(localTriggerGenericType, this, (IEventData)payloadObject);}}}}else{var localTriggerType = typeFinder.Find(c => c.FullName == eventData.Type).FirstOrDefault();if (localTriggerType != null && !localTriggerType.IsAbstract && !localTriggerType.IsInterface){if (eventData.Data.TryGetValue(PayloadKey, out var payload)){var payloadObject = (payload as JObject).ToObject(localTriggerType);Trigger(localTriggerType, this, (IEventData)payloadObject);}}}Trigger(this, new DistributedEventBusHandledEvent(eventArgs));}}catch (Exception ex){Logger.Error("Consume remote message exception", ex);Trigger(this, new DistributedEventMessageHandleExceptionData(ex, topic, topic));}
}

使用

DistributedEventBus有不同的实现方式,这里以Redis为例

启动Redis服务

下载Redis并启动服务,使用默认端口6379

配置

生产者和消费者端都需要配置分布式事件总线

首先引用Abp.DistributedEventBus.Redis,并配置Abp模块依赖

[DependsOn(typeof(AbpDistributedEventBusRedisModule))]

在PreInitialize方法中配置Redis连接信息

 Configuration.Modules.DistributedEventBus().UseRedis().Configure(setting =>{setting.Server = "127.0.0.1:6379";});

用MultipleEventBus替换Abp默认事件总线

 //todo: 事件总线Configuration.ReplaceService(typeof(IEventBus),() => IocManager.IocContainer.Register(Component.For<IEventBus>().ImplementedBy<MultipleEventBus>()));

传递Abp默认事件

我们知道在使用仓储时,Abp会自动触发一些事件,如创建、更新、删除等。我们来测试这些事件是否能通过分布式事件总线来传递。

定义一个实体类,用于传递实体的增删改事件。


public class Person : FullAuditedEntity<int>
{public string Name { get; set; }public int Age { get; set; }public string PhoneNumber { get; set; }}

在消费者端,定义一个事件处理器,用于处理实体的增删改事件。


public class RemoteEntityChangedEventHandler :IEventHandler<EntityUpdatedEventData<Person>>,IEventHandler<EntityCreatedEventData<Person>>,IEventHandler<EntityDeletedEventData<Person>>,ITransientDependency
{void IEventHandler<EntityUpdatedEventData<Person>>.HandleEvent(EntityUpdatedEventData<Person> eventData){var person = eventData.Entity;Console.WriteLine($"Remote Entity Updated - Name:{person.Name}, Age:{person.Age}, PhoneNumber:{person.PhoneNumber}");}void IEventHandler<EntityCreatedEventData<Person>>.HandleEvent(EntityCreatedEventData<Person> eventData){var person = eventData.Entity;Console.WriteLine($"Remote Entity Created - Name:{person.Name}, Age:{person.Age}, PhoneNumber:{person.PhoneNumber}");}void IEventHandler<EntityDeletedEventData<Person>>.HandleEvent(EntityDeletedEventData<Person> eventData){var person = eventData.Entity;Console.WriteLine($"Remote Entity Deleted - Name:{person.Name}, Age:{person.Age}, PhoneNumber:{person.PhoneNumber}");}
}

在生产者端,用IRepository对实体进行增删改操作。


var person = new Person()
{Name = "John",Age = 36,PhoneNumber = "18588888888"};personRepository.Insert(person);var person2 = new Person()
{Name = "John2",Age = 36,PhoneNumber = "18588888889"};
personRepository.Insert(person2);var persons = personRepository.GetAllList();
foreach (var p in persons)
{p.Age += 1;personRepository.Update(p);Console.WriteLine($"Entity Updated - Name:{p.Name}, Age:{p.Age}, PhoneNumber:{p.PhoneNumber}");}
foreach (var p in persons)
{personRepository.Delete(p);Console.WriteLine($"Entity Deleted - Name:{p.Name}, Age:{p.Age}, PhoneNumber:{p.PhoneNumber}");}

运行程序(同时运行消费者端和生产者端),可以看到消费者端打印出了实体的增删改事件。

在这里插入图片描述

注意:

分布式事件总线在两个独立系统间传递事件,所以需要定义一个共同的类型对象,用于事件参数的传递。
因此消费者端需要引用生产者端的模块,以便获取共同的类型对象。

public override Assembly[] GetAdditionalAssemblies()
{var clientModuleAssembly = typeof(Person).GetAssembly();return [clientModuleAssembly];
}

传递自定义事件

定义NotificationEventData,用于传递自定义事件。


public class NotificationEventData : EventData
{public int Id { get; set; }public string Title { get; set; }public string Message { get; set; }public bool IsRead { get; set; }
}

在消费者端,定义一个事件处理器,用于处理自定义事件。

public class NotificationEventHandler :IEventHandler<NotificationEventData>,      ITransientDependency
{void IEventHandler<NotificationEventData>.HandleEvent(NotificationEventData eventData){Console.WriteLine($"Id: {eventData.Id}");Console.WriteLine($"Title: {eventData.Title}");Console.WriteLine($"Message: {eventData.Message}");Console.WriteLine($"IsRead: {eventData.IsRead}");}
}

在生产者端,触发自定义事件。

var eventBus = IocManager.Instance.Resolve<IEventBus>();eventBus.Trigger<NotificationEventData>(new NotificationEventData()
{Title = "Hi",Message = "Customized definition event test!",Id = 100,IsRead = true,
});

运行程序(同时运行消费者端和生产者端),可以看到消费者端打印出了自定义事件。

在这里插入图片描述

项目地址

Github:DistributedEventBus

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.rhkb.cn/news/221632.html

如若内容造成侵权/违法违规/事实不符,请联系长河编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

Keil编译STM32工程,提示__align(4)处语法错误

好久没有用Keil编程&#xff0c;因为别人的代码是用Keil写的&#xff0c;所以又得安装起来&#xff0c;编译时遇到__align(4)的错误提示。 这个问题主要是编译器版本的问题&#xff0c;默认使用的是v6.19版本的编译器&#xff0c;而工程原来使用的是v5版本的&#xff0c;两个编…

B039-SpringMVC基础

目录 SpringMVC简介复习servletSpringMVC入门导包配置前端控制器编写处理器实现Contoller接口普通类加注解(常用) 路径问题获取参数的方式过滤器简介自定义过滤器配置框架提供的过滤器 springMVC向页面传值的三种方式视图解析器springMVC的转发和重定向 SpringMVC简介 1.Sprin…

Windows如何安装使用TortoiseSVN客户端并实现公网访问本地SVN Server

文章目录 前言1. TortoiseSVN 客户端下载安装2. 创建检出文件夹3. 创建与提交文件4. 公网访问测试 前言 TortoiseSVN是一个开源的版本控制系统&#xff0c;它与Apache Subversion&#xff08;SVN&#xff09;集成在一起&#xff0c;提供了一个用户友好的界面&#xff0c;方便用…

接口测试和测试用例分析

只要有软件产品的公司百分之九十以上都会做接口测试&#xff0c;要做接口测试的公司那是少不了接口测试工程师的&#xff0c;接口测试工程师相对于其他的职位又比较轻松并且容易胜任。如果你想从事接口测试的工作那就少不了对接口进行分析&#xff0c;同时也会对测试用例进行研…

解决 Hive 外部表分隔符问题的实用指南

简介&#xff1a; 在使用 Hive 外部表时&#xff0c;分隔符设置不当可能导致数据导入和查询过程中的问题。本文将详细介绍如何解决在 Hive 外部表中正确设置分隔符的步骤。 问题描述&#xff1a; 在使用Hive外部表时&#xff0c;可能会遇到分隔符问题。这主要是因为Hive在读…

同一个数组中对象去重

封装方法 fn1 (tempArr) {this.echartList.map(item > {for (let i 0; i < item.data.length; i) {for (let j i 1; j < item.data.length; j) {if (item.data[i].deviceId item.data[j].deviceId && item.data[i].time item.data[j].time && it…

解决win10下强制设置web浏览器为microsoft edge的方法

目录 问题场景实现方法禁止edge默认选项设置默认浏览器 反思 问题场景 因为一些特殊的原因&#xff0c;我需要第二个浏览器&#xff0c;我的第一个浏览器是google的chrome浏览器&#xff0c;所以我选择的是windows的默认浏览器&#xff0c;就是microsoft edge浏览器&#xff0…

easyExcel生成excel并导出自定义样式------添加复杂表头

easyExcel生成excel并导出自定义样式------添加复杂表头 设置合并竖行单元格&#xff0c;表头设置 OutputStream outputStream ExcelUtils.getResponseOutputStream(response, fileName);//根据数据组装需要合并的单元格Map<String, List<String>> strategyMap …

React学习计划-React16--React基础(二)组件与组件的3大核心属性state、props、ref和事件处理

1. 组件 函数式组件&#xff08;适用于【简单组件】的定义&#xff09; 示例&#xff1a; 执行了ReactDOM.render(<MyComponent/>, ...)之后执行了什么&#xff1f; React解析组件标签&#xff0c;找到了MyComponent组件发现组件是使用函数定义的&#xff0c;随后调用该…

kafka文件存储机制

Topic分为好几个partition分区&#xff0c;每个分区对应于一个log文件&#xff0c;log文件其实是虚的&#xff0c;Kafka采取了分片和索引机制&#xff0c; 将每个partition分为多个segment&#xff08;大小为1G&#xff09;。每个segment包括&#xff1a;“.index”文件、“.lo…

java SSM健身跑步爱好者社区系统myeclipse开发mysql数据库springMVC模式java编程计算机网页设计

一、源码特点 java SSM健身跑步爱好者社区系统是一套完善的web设计系统&#xff08;系统采用SSM框架进行设计开发&#xff0c;springspringMVCmybatis&#xff09;&#xff0c;对理解JSP java编程开发语言有帮助&#xff0c;系统具有完整 的源代码和数据库&#xff0c;系统…

Linux 操作系统(Vim)

vim 编译器&#xff08;相当于windows中记事本&#xff09; 当在终端窗口直接运行vim命令&#xff0c;会出现以下截图&#xff08;类似手册对vim编译器简单的介绍&#xff09;&#xff1a; vim提供三种基本工作模式&#xff1a; 命令模式(默认模式) 插入模式 末行模式 创建文本…

Qt前端技术:2.QSS

border-style&#xff1a;后边是两个参数的话第一个参数改变上下的style 第二个参数改变左右的style 如果后边是三个参数的话第一个参数改变上边的style第二个参数改变左右的style&#xff0c;第三个参数改变的下边的style 如果后边是四个参数的话对应的顺序为上&#xff0c;右…

C# WPF上位机开发(进度条操作)

【 声明&#xff1a;版权所有&#xff0c;欢迎转载&#xff0c;请勿用于商业用途。 联系信箱&#xff1a;feixiaoxing 163.com】 软件上面如果一个操作比较缓慢&#xff0c;或者说需要很长的时间&#xff0c;那么这个时候最好添加一个进度条&#xff0c;提示一下当前任务的进展…

【数据结构】线段树算法总结(区间修改)

知识概览 线段树一般有5个操作&#xff1a; pushup&#xff1a;用子节点更新当前节点信息pushdown&#xff1a;把懒标记往下传build&#xff1a;初始化一棵树modify&#xff1a;修改一个区间query&#xff1a;查询一个区间 不带懒标记&#xff08;支持单点修改&#xff09;的线…

Matlab-修改默认启动路径

Matlab-修改默认启动路径 第一:找到MATLAB的安装路径 第二步&#xff1a;进入到…\toolbox\local下&#xff0c;找到matlabrc.m 第三部&#xff1a;编辑matlabrc.m&#xff0c;在文本最后一行加入启动文件路径

Vue 2.5 入门学习记录

Vue 2.5 入门学习记录 1. 基础知识Vue 是什么Vue引入方式Vue特点Vue实例中的数据事件方法Vue中的属性绑定和双向绑定Vue中的v-if、v-show、v-fortoDoList制作局部组件&全局组件 2. vue-cli工程3. 工程案例实践使用vue-cli实现todoList及删除某个元素全局样式与局部样式 4. …

SVN搭建指导

环境 centos 7.9 SVN安装方式一&#xff1a;yum 1.1 http服务 至今还没有搞定网页版&#xff0c;网页版需要搭建apache http服务。遇到如下问题&#xff1a; centos - svn: Could not open the requested SVN filesystem - Stack Overflow 在试了加777权限&#xff0c;加a…

泽攸科技┃扫描电镜技术原理及应用

扫描电镜&#xff08;Scanning Electron Microscope&#xff0c;简称SEM&#xff09;作为一种新型的多功能电子光学仪器&#xff0c;近几十年来在科学研究和工业应用中发挥了巨大的作用。其工作原理以及技术特点使其在生物学、医学、冶金学等多个学科领域都得到广泛应用&#x…

HP服务器idrac设置以及系统安装

HP服务器idrac设置以及系统安装 一、设置管理口的地址和密码1、HP服务器重新界面选择"F9"进入BIOS&#xff0c;设置iLo5(idrac)的IP和用户名密码。2、选择"系统配置"。3、选择"iLO 4"配置程序。4、网络选项是设置idrac管理口的地址&#xff0c;设…