Netty学习——源码篇9 Handler其他处理与异步处理

1 ChannelHandlerContext

        每个ChannelHandler被添加到ChannelPipeline后,都会创建一个ChannelHandlerContext,并与ChannelHandler关联绑定。ChannelHandlerContext允许ChannelHandler与其他的ChannelHandler进行交互。ChannelHandlerContext不会改变添加到其中的ChannelHandler,因此它是安全的。ChannelHandlerContext、ChannelHandler、ChannelPipeline的关系如下图:

2 Channel的声明周期

        Netty有一个简单但强大的状态模型,能完美映射到ChannelInboundHandler的各个方法。如下表所示是Channel生命周期四个不同的状态。

         一个Channel正常的生命周期如下图所示。随着状态发生变化产生相应的事件。这些事件被转发到ChannelPipeline中的ChannelHandler来触发相应的操作。

3 ChannelHandler常用的API

        先看一个Netty中整个Handler体系的类关系图。

        Netty定义了良好的类型层次结构来表示不同的处理程序类型,所有类型的父类是ChannelHandler, ChannelHandler提供了在其生命周期内添加或从ChannelPipeline中删除的方法,如下表

        Netty还提供了一个实现了ChannelHandler的抽象类ChannelHandlerAdapter。 ChannelHandlerAdapter实现了父类的所有方法,主要功能就是将请求从一个ChannelHandler往下传递到下一个ChannelHandler,直到全部ChannelHandler传递完毕。也可以直接继承于ChannelHandlerAdapter,然后重写里面的方法。

4 ChannelInboundHandler

        ChannelInboundHandler还提供了一些在接收数据或Channel状态改变时被调用的方法。下面是ChannelInboundHandler的一些方法。

5 异步处理Future

        java.util.concurrent.Future是Java原生API中提供的接口,用来记录异步执行的状态,Future的get方法会判断任务是否执行完成,如果完成立即返回执行结果,否则阻塞线程,知道任务完成再返回。

        Netty扩展了Java的Future,在Future的基础上扩展了监听器(Listener)接口,通过监听器可以让异步执行更加有效率,不需要通过调用get方法来等待异步执行结束,而是通过监听器回调来精确控制异步执行结束时间。

public interface Future<V> extends java.util.concurrent.Future<V> {boolean isSuccess();boolean isCancellable();Throwable cause();Future<V> addListener(GenericFutureListener<? extends Future<? super V>> listener);Future<V> addListeners(GenericFutureListener<? extends Future<? super V>>... listeners);Future<V> removeListener(GenericFutureListener<? extends Future<? super V>> listener);Future<V> removeListeners(GenericFutureListener<? extends Future<? super V>>... listeners);Future<V> sync() throws InterruptedException;Future<V> syncUninterruptibly();Future<V> await() throws InterruptedException;Future<V> awaitUninterruptibly();boolean await(long timeout, TimeUnit unit) throws InterruptedException;boolean await(long timeoutMillis) throws InterruptedException;boolean awaitUninterruptibly(long timeout, TimeUnit unit);boolean awaitUninterruptibly(long timeoutMillis);V getNow();@Overrideboolean cancel(boolean mayInterruptIfRunning);
}

        ChannelFuture接口有扩展了Netty的Future接口,表示一种没有返回值的异步调用,同时和一个Channel进行绑定。

public interface ChannelFuture extends Future<Void> {/*** Returns a channel where the I/O operation associated with this* future takes place.*/Channel channel();@OverrideChannelFuture addListener(GenericFutureListener<? extends Future<? super Void>> listener);@OverrideChannelFuture addListeners(GenericFutureListener<? extends Future<? super Void>>... listeners);@OverrideChannelFuture removeListener(GenericFutureListener<? extends Future<? super Void>> listener);@OverrideChannelFuture removeListeners(GenericFutureListener<? extends Future<? super Void>>... listeners);@OverrideChannelFuture sync() throws InterruptedException;@OverrideChannelFuture syncUninterruptibly();@OverrideChannelFuture await() throws InterruptedException;@OverrideChannelFuture awaitUninterruptibly();boolean isVoid();
}

6 异步执行Promise

        Promise接口也是Future的扩展接口,它表示一种可写的Future,可以自定义设置异步执行的结果。


/*** Special {@link Future} which is writable.*/
public interface Promise<V> extends Future<V> {Promise<V> setSuccess(V result);boolean trySuccess(V result);/*** Marks this future as a failure and notifies all* listeners.** If it is success or failed already it will throw an {@link IllegalStateException}.*/Promise<V> setFailure(Throwable cause);/*** Marks this future as a failure and notifies all* listeners.** @return {@code true} if and only if successfully marked this future as*         a failure. Otherwise {@code false} because this future is*         already marked as either a success or a failure.*/boolean tryFailure(Throwable cause);/*** Make this future impossible to cancel.** @return {@code true} if and only if successfully marked this future as uncancellable or it is already done*         without being cancelled.  {@code false} if this future has been cancelled already.*/boolean setUncancellable();@OverridePromise<V> addListener(GenericFutureListener<? extends Future<? super V>> listener);@OverridePromise<V> addListeners(GenericFutureListener<? extends Future<? super V>>... listeners);@OverridePromise<V> removeListener(GenericFutureListener<? extends Future<? super V>> listener);@OverridePromise<V> removeListeners(GenericFutureListener<? extends Future<? super V>>... listeners);@OverridePromise<V> await() throws InterruptedException;@OverridePromise<V> awaitUninterruptibly();@OverridePromise<V> sync() throws InterruptedException;@OverridePromise<V> syncUninterruptibly();
}

        ChannelPromise接口扩展了Promise和ChannelFuture,绑定了Channel,既可以写异步执行结果,又具备了监听者的功能,是Netty实际编程中使用的表示异步执行的接口。

public interface ChannelPromise extends ChannelFuture, Promise<Void> {@OverrideChannel channel();@OverrideChannelPromise setSuccess(Void result);ChannelPromise setSuccess();boolean trySuccess();@OverrideChannelPromise setFailure(Throwable cause);@OverrideChannelPromise addListener(GenericFutureListener<? extends Future<? super Void>> listener);@OverrideChannelPromise addListeners(GenericFutureListener<? extends Future<? super Void>>... listeners);@OverrideChannelPromise removeListener(GenericFutureListener<? extends Future<? super Void>> listener);@OverrideChannelPromise removeListeners(GenericFutureListener<? extends Future<? super Void>>... listeners);@OverrideChannelPromise sync() throws InterruptedException;@OverrideChannelPromise syncUninterruptibly();@OverrideChannelPromise await() throws InterruptedException;@OverrideChannelPromise awaitUninterruptibly();/*** Returns a new {@link ChannelPromise} if {@link #isVoid()} returns {@code true} otherwise itself.*/ChannelPromise unvoid();
}

        DefaultChannelPromise是ChannelPromise的实现类,它是实际运行时的Promise实例。Netty使用addListener方法来回调异步执行的结果。DefaultPromise的addListener()方法的代码如下

    public Promise<V> addListener(GenericFutureListener<? extends Future<? super V>> listener) {checkNotNull(listener, "listener");synchronized (this) {addListener0(listener);}if (isDone()) {notifyListeners();}return this;}private void addListener0(GenericFutureListener<? extends Future<? super V>> listener) {if (listeners == null) {listeners = listener;} else if (listeners instanceof DefaultFutureListeners) {((DefaultFutureListeners) listeners).add(listener);} else {listeners = new DefaultFutureListeners((GenericFutureListener<? extends Future<V>>) listeners, listener);}}private void notifyListeners() {EventExecutor executor = executor();if (executor.inEventLoop()) {final InternalThreadLocalMap threadLocals = InternalThreadLocalMap.get();final int stackDepth = threadLocals.futureListenerStackDepth();if (stackDepth < MAX_LISTENER_STACK_DEPTH) {threadLocals.setFutureListenerStackDepth(stackDepth + 1);try {notifyListenersNow();} finally {threadLocals.setFutureListenerStackDepth(stackDepth);}return;}}safeExecute(executor, new Runnable() {@Overridepublic void run() {notifyListenersNow();}});}

        从上述代码中可以看到,DefaultChannelPromise会判断异步任务执行的状态,如果执行完毕就立即通知监听者,否则加入监听者队列。通知监听者就是找一个线程来执行调用监听者的回调函数。

        再来看监听者的接口,其实就是一个方法,即等待异步任务执行完毕后,获得Future结果,执行回调的逻辑,代码如下。

public interface GenericFutureListener<F extends Future<?>> extends EventListener {/*** Invoked when the operation associated with the {@link Future} has been completed.** @param future  the source {@link Future} which called this callback*/void operationComplete(F future) throws Exception;
}

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.rhkb.cn/news/293369.html

如若内容造成侵权/违法违规/事实不符,请联系长河编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

Redis超好用可视化工具--RedisInsight工具安装

RedisInsight 保姆级安装 RedisInsight 是Redis官方出品的可视化redis管理工具&#xff0c;具有很强大的功能。接下来&#xff0c;让我们一起去完成这款炫酷工具的安装 1. RedisInsight 下载 RedisInsight 官方下载地址&#xff0c;https://redis.io/docs/connect/insight/ …

邀请媒体采访报道对企业宣传有何意义?

传媒如春雨&#xff0c;润物细无声的&#xff0c;大家好&#xff0c;我是51媒体网胡老师。 邀请媒体采访报道对企业宣传具有多重意义&#xff1a; 提升品牌知名度和曝光度&#xff1a;媒体是信息传播的重要渠道&#xff0c;通过媒体的报道&#xff0c;企业及其活动、产品能够…

从vrrp、bfd、keepalived到openflow多控制器--理论篇

vrrp 在一个网络中&#xff0c;通常会使用vrrp技术来实现网关的高可用。 vrrp&#xff0c;即Virtual Router Redundancy Protocol&#xff0c;虚拟路由冗余协议。 应用场景 典型的如下面这个例子&#xff1a; 当Router故障后&#xff0c;将会导致HostA-C都无法连接外部的I…

C#.net8创建webapi,使用SqlSugar,仓储模式,DTO,服务层,控制层的综合应用(企业级)

本文源码地址: https://download.csdn.net/download/u012563853/89036104 源码中,也有详细的注释说明。 代码总览: 这是一个综合性比较强的文章,需要有一定的基础,没有基础的人,看了后,会全面的了解一下,有基础的人,看了后会加强认识,更加的巩固,直接在项目中去应…

自贡市第一人民医院:超融合与 SKS 承载 HIS 等核心业务应用,加速国产化与云原生转型

自贡市第一人民医院始建于 1908 年&#xff0c;现已发展成为集医疗、科研、教学、预防、公共卫生应急处置为一体的三级甲等综合公立医院。医院建有“全国综合医院中医药工作示范单位”等 8 个国家级基地&#xff0c;建成高级卒中中心、胸痛中心等 6 个国家级中心。医院日门诊量…

Docker实现原理

namespaces System V IPC概述 System V引入了三种高级进程间的通信机制&#xff1a;消息队列、共享内寸和信号量 IPC对象(消息队列、共享内存和信号量)存在于内核中而不是文件系统中&#xff0c;由用户控制释放&#xff0c;不像管道的释放由内核控制 IPC对象通过其标识符来引…

后端SpringBoot+Mybatis 查询订单数据库奇怪报错加一

排错过程&#xff1a; 看报错意思是SQL语句存在错误&#xff0c;然后使用图形化工具运行这个SQL语句 其实这里稍微细心想一下就能发现问题&#xff0c;但是当时没深入想&#xff0c;就觉得order表前加了数据库名字影响不大&#xff0c;所以感觉SQL语句是没问题的&#xff0c;然…

基于两个单片机串行通信的电子密码锁设计

1.功能 电子号码锁在实际应用中应该有两部分&#xff0c;一部分在外部&#xff0c;有键盘部分和密码显示&#xff1b;另一部分内部&#xff0c;设置密码、显示密码。使用单片机自身带有的串口可以很方便的实现单片机之间的通信&#xff0c;使输入的密码值传送到主机检验是否是…

硬件设备对接协议

硬件设备对接协议是指在硬件设备之间进行数据交换时所遵循的一系列规则和标准。这些协议定义了数据的传输方式、数据格式、通信接口等&#xff0c;以确保设备间能够有效地进行通信和数据交换。以下是一些常见的硬件设备对接协议。北京木奇移动技术有限公司&#xff0c;专业的软…

网络基础二补充——json与http协议

五、市面上常用序列化和反序列化工具 ​ 常用的有&#xff1a;json、protobuf、xml三种方案&#xff1b; 5.1json的使用 1.安装jsoncpp库&#xff0c;是一个第三方的开发库文件&#xff1b; sudo yum install -y jsoncpp-devel2.使用json ​ 经常使用的头文件是json.h&…

气膜室内足球馆:价格究竟是多少?

气膜建筑在体育场馆中的应用越来越广泛&#xff0c;而气膜室内足球馆作为其中的一种代表&#xff0c;备受关注。那么&#xff0c;建设气膜室内足球馆到底需要多少投资呢&#xff1f;轻空间就带您一起来了解一下。 气膜室内足球馆的价格因地区、规格和材料等因素而有所不同。具体…

LLM--提示词Propmt的概念、作用及如何设计提示词

文章目录 1. 什么是提示词&#xff1f;2. 提示词的作用3. 如何设计提示词&#xff1f;3.1. 提供详细的信息3.2. 指定角色3.3. 使用分隔符和特殊符号3.4. 提供示例3.5. 少量示例的思维链&#xff08;COT&#xff09;模型3.6. 思维树&#xff08;TOT&#xff09;模型3.7. 自洽性 …

C++ MFC

C是一种静态数据类型检查的、支持多重编程范式的程序设计语言&#xff0c;支持过程化程序设计、数据抽象、面向对象程序设计、制作图标等泛型程序设计的多种程序设计风格。 MFC(Microsoft Foundation Classes)&#xff0c;是一个微软公司提供的类库&#xff0c;以C类的形式封装…

unity学习(74)——服务器Dispose异常

1.返回的1 2 11是怪物初始化&#xff0c;源代码中也没有 2. 3.客户端中的网络连接初始化如下&#xff1a; 4.不是因为超时&#xff0c;设置10s为超时期限后&#xff0c;客户端和服务器有时依然会报错&#xff01; 5.我感觉就是update中发包给弄坏的&#xff01; 6.不在“帧”…

本地GPU调用失败问题解决3重新配置anaconda环境(成功)

1、右键“以管理员身份”打开anaconda prompt conda create -n python 3.9 2、使用官方下载源的配置 3、修改conda下载超时 conda config --set remote_connect_timeout_secs 60 conda config --set remote_read_timeout_secs 100 查看配置结果conda config --show 配置内…

快速排序---算法

1、算法概念 快速排序&#xff1a;通过一趟排序将待排记录分隔成独立的两部分&#xff0c;其中一部分记录的数据均比另一部分的数据小&#xff0c;则可分别对这两部分记录继续进行排序&#xff0c;以达到震哥哥序列有序。 快速排序的最坏运行情况是O()&#xff0c;比如说顺序数…

蓝桥备赛——贪心

题干 AC Code n, w = map(int, input().split()) # n种类, w核载重 a = [] # [[weight1, value1], [weight2, value2], ...] for _ in range(n):a.append(list(map(int, input().split()))) a.sort(key=lambda x: x[1] / x[0], reverse=True)maxVal = 0for i in a:if i[0…

原生js实现循环滚动效果

原生js实现如下图循环滚动效果 核心代码 <div class"scroll"><div class"blist" id"scrollContainer"><div class"bitem"></div>......<div class"bitem"></div></div> </di…

ViveNAS性能调试笔记(一)

ViveNAS是一个开源的NAS文件服务软件&#xff0c;有一套独立自创的架构&#xff0c;ViveNAS希望能做到下面的目标&#xff1a; - 能支持混合使用高性能的介质(NVMe SSD)和低性能介质&#xff08;HDD&#xff0c;甚至磁带&#xff09;。做到性能、成本动态均衡。因此ViveNAS使用…