Guava-EventBus 源码解析


EventBus 采用发布订阅者模式的实现方式,它实现了泛化的注册方法以及泛化的方法调用,另外还考虑到了多线程的问题,对多线程使用时做了一些优化,观察者模式都比较熟悉,这里会简单介绍一下,重点介绍的是如何泛化的进行方法的注册以及调用,还有在单个线程和多线程不同的实现方式。

#发布订阅者模式
观察者模式又名发布订阅者模式,类结构图如下:

观察者类结构图

package com.dfan.设计模式.观察者模式;
import java.util.Arrays;
import java.util.List;public class ObeserverMode {static class TestResult {private List<IObserver> iObservers;public TestResult() {}public void register(IObserver iObserver){this.iObservers.add(iObserver);}public void declaration() {System.out.println("this is result");}public void notifyRunners(List<IObserver> iObservers) {for(IObserver iObserver : iObservers) {iObserver.run(this);}}public  void notifyRunner(IObserver iObserver){iObserver.run(this);}}static class UITestResult extends TestResult{public UITestResult() {super();}public void declaration() {System.out.println("i am ui test result");}public  void notifyRunner(IObserver iObserver){iObserver.run(this);}}interface IObserver{void run(TestResult testResult) ;}static class TestObserver implements IObserver {private TestResult testResult;public TestResult createTestResult(){return new TestResult();}public void run(TestResult testResult) {this.testResult = testResult;System.out.println("this is test obeserver");testResult.declaration();}}static class TestObserver1 implements IObserver {private TestResult testResult;public TestResult createTestResult(){return new TestResult();}public void run(TestResult testResult) {testResult = createTestResult();System.out.println("this is test obeserver 1");testResult.declaration();}}public static void main(String[] args) {IObserver testRunner = new TestObserver();UITestResult uiTestResult = new UITestResult();uiTestResult.notifyRunner(testRunner);System.out.println("华丽分割线");List<IObserver> observers = Arrays.asList(new TestObserver1(), new TestObserver());uiTestResult.notifyRunners(observers);}
}

#Guava EventBus
Guva中EventBus的机制就是观察者模式,因此符合观察者模式的一般结构:

监听者:监听来自被监听者的变更事件,完成动作变更
被监听者:发送变更事件给监听者,使监听者监听到变更事件后,完成动作变更

EventBus的用法简单总结为一句话就是:

订阅者向EventBus进行事件注册(register),表示对这个事件关心;
EventBus会向所有订阅发布者事件的订阅者进行事件的发送(post)

EventBus 区分 同步模式和异步模式,下面将根据这两个点进行展开
##同步模式
###向EventBus进行注册

/*** Returns all subscribers for the given listener grouped by the type of event they subscribe to.*/private Multimap<Class<?>, Subscriber> findAllSubscribers(Object listener) {Multimap<Class<?>, Subscriber> methodsInListener = HashMultimap.create();Class<?> clazz = listener.getClass();for (Method method : getAnnotatedMethods(clazz)) {Class<?>[] parameterTypes = method.getParameterTypes();Class<?> eventType = parameterTypes[0];methodsInListener.put(eventType, Subscriber.create(bus, listener, method));}return methodsInListener;}/*** Registers all subscriber methods on the given listener object.*/void register(Object listener) {Multimap<Class<?>, Subscriber> listenerMethods = findAllSubscribers(listener);for (Map.Entry<Class<?>, Collection<Subscriber>> entry : listenerMethods.asMap().entrySet()) {Class<?> eventType = entry.getKey();Collection<Subscriber> eventMethodsInListener = entry.getValue();CopyOnWriteArraySet<Subscriber> eventSubscribers = subscribers.get(eventType);if (eventSubscribers == null) {CopyOnWriteArraySet<Subscriber> newSet = new CopyOnWriteArraySet<Subscriber>();eventSubscribers = MoreObjects.firstNonNull(subscribers.putIfAbsent(eventType, newSet), newSet);}eventSubscribers.addAll(eventMethodsInListener);}}
  • 其中 findAllSubscribers方法 目的是获取所有添加注解@Subscriber的方法,并将根据当前EventBusListener、以及加有@Subscriber注解的方法生成的Subscribe作为 Multimap<Class<?>, Subscriber>的value返回(其中key为方法[注释]的入参)

  • registerSubscriber注册到集合private final ConcurrentMap<Class<?>, CopyOnWriteArraySet<Subscriber>> subscribers = Maps.newConcurrentMap();中,其中该Map的key为EventType(即方法[注释]入参)。

EventBus发送事件给所有订阅者

/*** Gets an iterator representing an immutable snapshot of all subscribers to the given event at* the time this method is called.*/Iterator<Subscriber> getSubscribers(Object event) {ImmutableSet<Class<?>> eventTypes = flattenHierarchy(event.getClass());List<Iterator<Subscriber>> subscriberIterators =Lists.newArrayListWithCapacity(eventTypes.size());for (Class<?> eventType : eventTypes) {CopyOnWriteArraySet<Subscriber> eventSubscribers = subscribers.get(eventType);if (eventSubscribers != null) {// eager no-copy snapshotsubscriberIterators.add(eventSubscribers.iterator());}}return Iterators.concat(subscriberIterators.iterator());}/*** Posts an event to all registered subscribers.  This method will return* successfully after the event has been posted to all subscribers, and* regardless of any exceptions thrown by subscribers.** <p>If no subscribers have been subscribed for {@code event}'s class, and* {@code event} is not already a {@link DeadEvent}, it will be wrapped in a* DeadEvent and reposted.** @param event  event to post.*/public void post(Object event) {Iterator<Subscriber> eventSubscribers = subscribers.getSubscribers(event);if (eventSubscribers.hasNext()) {dispatcher.dispatch(event, eventSubscribers);} else if (!(event instanceof DeadEvent)) {// the event had no subscribers and was not itself a DeadEventpost(new DeadEvent(this, event));}}
  • getSubscribers 根据刚才提到的参数类型会查找对应的Subscribe,而且不止查指定的类型,还会对这个类型的继承体系上的其他参数类型也会查,比如对于String类型,他会找Serializable,CharSequence,Comparable,Object四种类型,
    举个例子说明下这种情况,在这个例子中,会有两个task被执行,分别是task1和task3
public class EventBusSyncEx {static class SimpleListener1 {/***订阅方式,通过@Subscribe进行事件订阅,方法名随意**/@Subscribepublic void task1(String s) {try {Thread.sleep(5000);} catch (InterruptedException e) {e.printStackTrace();}System.out.println("listener1 do task , String param:" + s);}@Subscribepublic void task3(Object s) {try {Thread.sleep(5000);} catch (InterruptedException e) {e.printStackTrace();}System.out.println("listener1 do task , Object param:" + s);}@Subscribepublic void task1(Integer s) {System.out.println("listener1 do task , int  param:" + s);}}public static class SimpleEventBusExample {public static void main(String[] args) {EventBus eventBus = new EventBus();eventBus.register(new SimpleListener1());System.out.println("Post Simple EventBus Example");eventBus.post("Simple EventBus Example");}}
}
  • subscribers 就是刚才注册subscriber的集合(Map<ParamType, Subscribers>),通过getSubscribers获取到了Subscribe之后,下面就是要根据这个Event的type来执行对应的Event了,首先这里引入一个属性dispatcher (事件分发器 : 用于分发事件给订阅对象的事件处理器,该对象在EventBus构造方法内部初始化,默认的实现是,该分发器将事件存入队列).

    PerThreadQueuedDispatcher: 默认实现,该分发器将事件存入队列,并保证在同一个线程上发送的事件能够按照他们发布的顺序被分发给所有的订阅者。

    private static final class PerThreadQueuedDispatcher extends Dispatcher 
    // This dispatcher matches the original dispatch behavior of EventBus.
    /*** Per-thread queue of events to dispatch.*/
    private final ThreadLocal<Queue<Event>> queue =new ThreadLocal<Queue<Event>>() {@Overrideprotected Queue<Event> initialValue() {return Queues.newArrayDeque();}};/*** Per-thread dispatch state, used to avoid reentrant event dispatching.*/
    private final ThreadLocal<Boolean> dispatching =new ThreadLocal<Boolean>() {@Overrideprotected Boolean initialValue() {return false;}};@Override
    void dispatch(Object event, Iterator<Subscriber> subscribers) {checkNotNull(event);checkNotNull(subscribers);Queue<Event> queueForThread = queue.get();queueForThread.offer(new Event(event, subscribers));if (!dispatching.get()) {dispatching.set(true);try {Event nextEvent;while ((nextEvent = queueForThread.poll()) != null) {while (nextEvent.subscribers.hasNext()) {nextEvent.subscribers.next().dispatchEvent(nextEvent.event);}}} finally {dispatching.remove();queue.remove();}}
    }private static final class Event {private final Object event;private final Iterator<Subscriber> subscribers;private Event(Object event, Iterator<Subscriber> subscribers) {this.event = event;this.subscribers = subscribers;}
    }
    }
    
    • 这段代码有三个关键点需要注意:
      1. PerThreadQueuedDispatcher 通过Queue来对Event进行存储
      2. Queue以及 dispatching都是ThreadLocal变量,也就意味着每个线程维护自己的一个变量,即线程安全的
      3. nextEvent.subscribers.next().dispatchEvent(nextEvent.event);调用了 Subscribe的dispatchEvent (类似于文中开篇所讲的Observer模式中的被监听者中的iObserver.run(this),只是Observer模式中,是在被监听者中执行的,而EventBus中是在dispatcher中执行的) ,如果继续跟进代码会发现,这个dispatchEvent实际工作就是直接通过反射执行了Method方法(method.invoke(target, checkNotNull(event));)

至此,EventBus的同步执行方式已经分析完成


异步模式

异步模式, 它与同步模式的EventBus的主要区别有两点:

  1. 声明EventBus时,声明为 AsyncEventBus, 而AsyncEventBus的构造函数必须要传入一个 Executor
  2. 在Dispatcher上,AsyncEventBus 采用的事件分发器为 LegacyAsyncDispatcher
/*** Dispatches {@code event} to this subscriber using the proper executor.*/final void dispatchEvent(final Object event) {executor.execute(new Runnable() {@Overridepublic void run() {try {invokeSubscriberMethod(event);} catch (InvocationTargetException e) {bus.handleSubscriberException(e.getCause(), context(event));}}});}private static final class LegacyAsyncDispatcher extends Dispatcher {/*** Global event queue.*/private final ConcurrentLinkedQueue<EventWithSubscriber> queue =Queues.newConcurrentLinkedQueue();@Overridevoid dispatch(Object event, Iterator<Subscriber> subscribers) {checkNotNull(event);while (subscribers.hasNext()) {queue.add(new EventWithSubscriber(event, subscribers.next()));}EventWithSubscriber e;while ((e = queue.poll()) != null) {e.subscriber.dispatchEvent(e.event);}}private static final class EventWithSubscriber {private final Object event;private final Subscriber subscriber;private EventWithSubscriber(Object event, Subscriber subscriber) {this.event = event;this.subscriber = subscriber;}}}

之所以异步模式传入Executor就是在通过 dispatchEvent 进行多线程的创建 new ThreadPoolExecutor().excute(new Runnable)
而之所以使用 LegacyAsyncDispatcher 目的还有一个就是这个 Dispatcher中使用的queue是ConcurrentLinkedQueue, 之所以使用这个Queue,后面会有专门的一个讲解。

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.rhkb.cn/news/350987.html

如若内容造成侵权/违法违规/事实不符,请联系长河编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

FineReport简单介绍

一、介绍 官网 &#xff1a;FineReport产品简介- FineReport帮助文档 - 全面的报表使用教程和学习资料 报表是以表格、图表的形式来动态展示数据&#xff0c;企业通过报表进行数据分析&#xff0c;进而用于辅助经营管理决策。 FineReport 是一款用于报表制作&#xff0c;分析和…

uniapp中unicloud接入支付宝订阅消息完整教程

经过无数次的尝试,终于还是让我做出来了 准备工作 设置接口加签方式 使用支付宝小程序订阅消息,首先要设置接口加签方式,需要下载支付宝开放平台密钥工具,按照步骤生成秘钥,然后按照支付宝设置密钥加签方式添加接口加签方式。 有一点需要注意的,因为要在云函数中使用,…

Mac M3 Pro安装Hadoop-3.3.6

1、下载Hadoop安装包 可以到官方网站下载&#xff0c;也可以使用网盘下载 官网下载地址&#xff1a;Hadoop官网下载地址 网盘地址&#xff1a;https://pan.baidu.com/s/1p4BXq2mvby2B76lmpiEjnA?pwdr62r提取码: r62r 2、解压并添加环境变量 # 将安装包移动到指定目录 mv …

基于flask的网站如何使用https加密通信-问题记录

文章目录 项目场景&#xff1a;问题1问题描述原因分析解决步骤解决方案 问题2问题描述原因分析解决方案 参考文章 项目场景&#xff1a; 项目场景&#xff1a;基于flask的网站使用https加密通信一文中遇到的问题记录 问题1 问题描述 使用下面的命令生成自签名的SSL/TLS证书和…

大模型基础——从零实现一个Transformer(3)

大模型基础——从零实现一个Transformer(1)-CSDN博客 大模型基础——从零实现一个Transformer(2)-CSDN博客 一、前言 之前两篇文章已经讲了Transformer的Embedding,Tokenizer,Attention,Position Encoding, 本文我们继续了解Transformer中剩下的其他组件. 二、归一化 2.1 L…

红队攻防渗透技术实战流程:中间件安全:JettyJenkinsWeblogicWPS

红队攻防渗透实战 1. 中间件安全1.1 中间件-Jetty-CVE&信息泄漏1.2 中间件-Jenkins-CVE&RCE执行1.2.1 cve_2017_1000353 JDK-1.8.0_291 其他版本失效1.2.2 CVE-2018-10008611.2.3 cve_2019_100300 需要用户帐号密码1.3 中间件-Weblogic-CVE&反序列化&RCE1.4 应…

使用python绘制三维曲线图

使用python绘制三维曲线图 三维曲线图定义特点 效果代码 三维曲线图 三维曲线图&#xff08;3D曲线图&#xff09;是一种用于可视化三维数据的图表&#xff0c;它展示了数据在三个维度&#xff08;X、Y、Z&#xff09;上的变化。 定义 三维曲线图通过在三维坐标系中绘制曲线…

数据结构之线性表(4)

前面我们了解到线性表中的顺序表、链表等结构&#xff0c;今天我们探讨新的一种线性表——栈。 那么我们开始栈的探讨之旅吧。 1.栈的基本概念 1.1栈&#xff08;Stack&#xff09;&#xff1a; 是只允许在一端进行插入或删除的线性表。首先栈是一种线性表&#xff0c;但限定…

sudo 用户切换

切换到centos 用户 sudo -i -u centos 解决centos sudo执行仍旧显示Permission denied 方法一&#xff08;建议&#xff09; 暂时切换到root用户 sudo -i然后执行命令即可 方法二 赋给当前用户权限&#xff1a; sudo chmod -R 777 目录路径 sudo chmod 777 文件路径.txt…

IDEA 设置主题、背景图片、背景颜色

一、设置主题 1、点击菜单 File -> Settings : 点击 Settings 菜单 2、点击 Editor -> Color Scheme -> Scheme, 小哈的 IDEA 版本号为 2022.2.3 , 官方默认提供了 4 种主题&#xff1a; Classic Light &#xff08;经典白&#xff09; ;Darcula &#xff08;暗黑主…

2.2 抽头

目录 为什么要抽头 什么是抽头 接入系数 怎么抽头 信号源端抽头 负载端抽头 例题分析 要点总结 为什么要抽头 阻抗转换&#xff0c;使信号源内阻Rs与负载电阻RL变得很大&#xff0c;分流小&#xff0c;再使用并联方式。 什么是抽头 接入系数 电容越大&#xff0c;分压越…

【RabbitMQ】异步消息及Rabbitmq安装

https://blog.csdn.net/weixin_73077810/article/details/133836287 https://www.bilibili.com/video/BV1mN4y1Z7t9/ 同步调用和异步调用 如果我们的业务需要实时得到服务提供方的响应&#xff0c;则应该选择同步通讯&#xff08;同步调用&#xff09;。 如果我们追求更高的效…

MySQL-连接查询

049-内连接之等值连接 案例&#xff1a;查询每个员工所在的部门名称&#xff0c;要求显示员工名、部门名。 select e.ename, d.dname from emp e inner join dept d on e.deptnod.deptno;注意&#xff1a;inner可以省略 select e.ename, d.dname from emp e join dept d on…

Vue49-props属性

一、当同一个组件标签被使用多次 因为data属性写的是函数形式&#xff01; 二、需求&#xff1a;老王也想用<Student>组件&#xff0c;但是需要动态把老王想要的值传进来。 2-1、使用props属性接收参数 使用props属性&#xff0c;接收的这三个参数&#xff0c;是被保存在…

vs+qt5.0 使用poppler 操作库

Poppler 是一个用来生成 PDF 的C类库&#xff0c;从xpdf 继承而来。vs编译库如下&#xff1a; vs中只需要添加依赖库即可 头文件&#xff1a;

部署LVS-DR群集...

目录 最后一台主机&#xff08;第四台&#xff09; 本地yum源安装httpd&#xff08;非必做&#xff09; 继续开始从最后一台主机开始&#xff08;第四台&#xff09; 转第二台主机 转第三台主机 回第二台 上传 转第三台主机 上传 回第二台 转第三台 转第一台主机…

试论地产需求政策的有效性边界

分析师通过对传统框架因子的分析和美日地产的回顾&#xff0c;指出收入政策将成为核心&#xff0c;测算认为地方收储面积约0.5-1.1亿平、收储资金0.8-1.9万亿元&#xff0c;70城二手房价降幅收窄至[-4.5%&#xff0c;-1.6%]。 事件&#xff1a;2024年5月17日&#xff0c;央行印…

异常封装类统一后端响应的数据格式

异常封装类 如何统一后端响应的数据格式 1. 背景 后端作为数据的处理和响应&#xff0c;如何才能和前端配合好&#xff0c;能够高效的完成任务&#xff0c;其中一个比较重要的点就是后端返回的数据格式。 没有统一的响应格式&#xff1a; // 第一种&#xff1a; {"dat…

数 组

概述 数组是一个引用类型&#xff0c;是一种容器。 数组存储多个相同数据类型的数据&#xff0c;允许自动类型转换。例如 int 类型的数组&#xff0c;可以存放 byte、short 和 int 类型的数据&#xff0c;double 类型的数组&#xff0c;可以存放 byte、short、int、long、floa…

【Apache Doris】周FAQ集锦:第 5 期

【Apache Doris】周FAQ集锦&#xff1a;第 5 期 SQL问题数据操作问题运维常见问题其它问题关于社区 欢迎查阅本周的 Apache Doris 社区 FAQ 栏目&#xff01; 在这个栏目中&#xff0c;每周将筛选社区反馈的热门问题和话题&#xff0c;重点回答并进行深入探讨。旨在为广大用户和…