Netty-ChannelPipeline

EventLoop可以说是 Netty 的调度中心,负责监听多种事件类型:I/O 事件、信号事件、定时事件等,然而实际的业务处理逻辑则是由 ChannelPipeline 中所定义的 ChannelHandler 完成的,ChannelPipeline 和 ChannelHandler应用开发的过程中打交道最多的组件,为用户提供了 I/O 事件的全部控制权。

文章目录

  • 一、ChannelPipeline 是什么?🤔️
  • 二、ChannelPipeline 的内部结构🔍
    • 1、HeadContext
    • 2、TailContext
    • 3、addLiast() 方法🔍
  • 三、ChannelPipeline 事件传播机制
  • 四、ChannelPipeline 异常传播机制
  • 五、统一的异常处理器

一、ChannelPipeline 是什么?🤔️

pipeline 有管道,流水线的意思,最早使用在 Unix 操作系统中,可以让不同功能的程序相互通讯,使软件更加”高内聚,低耦合”,它以一种”链式模型”来串起不同的程序或组件,使它们组成一条直线的工作流。

ChannelPipeline 也是 Netty 中的一个比较重要的组件,从上面的 Channel 实例化过程可以看出,每一个 Channel 实例中都会包含一个对应的 ChannelPipeline 属性。ChannelPipeline维护着处理或拦截channel的进站事件和出站事件的双向链表,事件在ChannelPipeline中流动和传递,可以增加或删除ChannelHandler来实现对不同业务逻辑的处理。通俗的说,ChannelPipeline是工厂里的流水线,ChannelHandler是流水线上的工人。

二、ChannelPipeline 的内部结构🔍

final AbstractChannelHandlerContext head;
final AbstractChannelHandlerContext tail;private final Channel channel;
private final ChannelFuture succeededFuture;
private final VoidChannelPromise voidPromise;protected DefaultChannelPipeline(Channel channel) {this.channel = ObjectUtil.checkNotNull(channel, "channel");succeededFuture = new SucceededChannelFuture(channel, null);voidPromise =  new VoidChannelPromise(channel, true);tail = new TailContext(this);head = new HeadContext(this);head.next = tail;tail.prev = head;
}

从 ChannelPipeline 的构造函数可以看出,ChannelPipeline 维护了一组 ChannelHandlerContext 实例组成双向链表。默认会包含 head 和 tail 头尾节点,用来进行一些默认的逻辑处理。我们自定义的ChannelHandler会插入到 head 和 tail 之间,这两个节点在 Netty 中已经默认实现了,它们在ChannelPipeline 中起到了至关重要的作用。

那么你可能会有疑问,为什么这里会多一层 ChannelHandlerContext 的封装呢?

其实这是一种比较常用的编程思想。ChannelHandlerContext用于保存ChannelHandler。ChannelHandlerContext包含了ChannelHandler生命周期的所有事件,如 connect、bind、read、 flush、write、close 等。

可以试想一下,如果没有ChannelHandlerContext 的这层封装,那么我们在做 ChannelHandler 之间传递的时候。前置后置的通用逻辑就要在每个 ChannelHandler 里都实现一份。

首先我们看下 HeadContext 和 TailContext 的继承关系
在这里插入图片描述

1、HeadContext

通过集成关系我们发现 HeadContext 分别实现了ChannelInboundHandler 和 ChannelOutboundHandler,即 HeadContext 既是 入站处理器,也是出站处理器。

HeadContext是入站第一站出站最后一站。对于1个请求先由HeadContext处理入栈,经过一系列的入栈处理器然后传递到TailContext,由TailContext往下传递经过一系列的出栈处理器,最后再经过HeadContext返回。

2、TailContext

TailContext 只实现了 ChannelInboundHandler 接口。它会在 ChannelInboundHandler 调用链路的最后一步执行,主要用于终止 入站事件传播,例如释放 Message 数据资源等。

TailContext是入站最后一站出站第一站。TailContext节点作为出站事件传播的第一站,仅仅是将出站事件传递给下一个节点。

从整个 ChannelPipeline 调用链路来看,如果由 Channel 直接触发事件传播,那么调用链路将贯穿整个 ChannelPipeline。然而也可以在其中某一个 ChannelHandlerContext 触发同样的方法,这样只会从当前的 ChannelHandler 开始执行事件传播,该过程不会从头贯穿到尾,在一定场景下,可以提高程序性能。

3、addLiast() 方法🔍

addLast() 方法是向 ChannelPipeline 中添加 ChannelHandler 用来进行业务处理,关于ChannelHandler将会在下文中详细讲解!
在这里插入图片描述

三、ChannelPipeline 事件传播机制

入站事件是由I/O线程被动触发,由入站处理器按自下而上的方向处理,在中途可以被拦截丢弃,出站事件由用户handler主动触发,由出站处理器按自上而下的方向处理。
在这里插入图片描述
接下来用一个示例来讲解~

服务端代码,

public class PipelineServer {public static void main(String[] args) throws InterruptedException {NioEventLoopGroup boss = new NioEventLoopGroup(1);NioEventLoopGroup worker = new NioEventLoopGroup(2);new ServerBootstrap().group(boss, worker).channel(NioServerSocketChannel.class).childHandler(new ChannelInitializer<SocketChannel>() {@Overrideprotected void initChannel(SocketChannel ch) throws Exception {ch.pipeline().addLast(new ChannelInboundHandlerAdapter() {@Overridepublic void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {System.out.println(1);super.channelRead(ctx, msg);}});ch.pipeline().addLast(new ChannelInboundHandlerAdapter() {@Overridepublic void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {System.out.println(2);super.channelRead(ctx, msg);}});ch.pipeline().addLast(new ChannelInboundHandlerAdapter() {@Overridepublic void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {System.out.println(3);super.channelRead(ctx, msg);}});ch.pipeline().addLast(new ChannelOutboundHandlerAdapter() {@Overridepublic void write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise) throws Exception {System.out.println(4);super.write(ctx, msg, promise);}});ch.pipeline().addLast(new ChannelOutboundHandlerAdapter() {@Overridepublic void write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise) throws Exception {System.out.println(5);super.write(ctx, msg, promise);}});ch.pipeline().addLast(new ChannelOutboundHandlerAdapter() {@Overridepublic void write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise) throws Exception {System.out.println(6);super.write(ctx, msg, promise);}});}}).bind(8080);}
}

客户端代码,

public class PipelineClient {public static void main(String[] args) throws InterruptedException {new Bootstrap().group( new NioEventLoopGroup()).channel(NioSocketChannel.class).handler(new ChannelInitializer<Channel>() {@Overrideprotected void initChannel(Channel ch) throws Exception {ch.pipeline().addLast(new StringDecoder());ch.pipeline().addLast(new ChannelInboundHandlerAdapter() {@Overridepublic void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {System.out.println(msg);super.channelRead(ctx, msg);}});ch.pipeline().addLast(new StringEncoder());}}).connect("127.0.0.1", 8080).sync().channel().writeAndFlush("Hello,server!");}
}

依次启动服务端和客户端,服务端打印如下:

1
2
3

以上我们通过 Pipeline 的 addLast 方法分别添加了三个 ChannelInboundHandlerAdapter 和 ChannelOutboundHandlerAdapter,添加顺序分别是 1 -> 2 -> 3,4 -> 5 -> 6。

此时为什么没有打印 4、5、6呢,即没有触发出站的操作❓

出站处理器只有向channel中写入数据才会触发,我们在第三个 ChannelInboundHandlerAdapter 实现类中加入以下代码!

在这里插入图片描述
通过依次点入,我们发现最终是调用了 tail节点 的writeAndFlush 方法,即TailContext节点作为出站事件传播的第一站!
在这里插入图片描述

最终服务端打印如下:

1
2
3
6
5
4

可以看到,ChannelInboundHandlerAdapter 是按照 addLast 的顺序执行的,而 ChannelOutboundHandlerAdapter 是按照 addLast 的逆序执行的。ChannelPipeline 的实现是一个 ChannelHandlerContext(包装了 ChannelHandler) 组成的双向链表

在这里插入图片描述

  • 入站处理器中,ctx.fireChannelRead(msg) 是 调用下一个入站处理器
    • 如果注释掉 1 处代码,则仅会打印 1
    • 如果注释掉 2 处代码,则仅会打印 1 2
  • 3 处的 ctx.channel().write(msg) 会 从尾部开始触发 后续出站处理器的执行
    • 如果注释掉 3 处代码,则仅会打印 1 2 3
  • 类似的,出站处理器中,ctx.write(msg, promise) 的调用也会 触发上一个出站处理器
    • 如果注释掉 6 处代码,则仅会打印 1 2 3 6
  • ctx.channel().write(msg) vs ctx.write(msg)
    • 都是触发出站处理器的执行
    • ctx.channel().write(msg) 从尾部开始查找出站处理器
    • ctx.write(msg) 是从当前节点找上一个出站处理器
    • 3 处的 ctx.channel().write(msg) 如果改为 ctx.write(msg) 仅会打印 1 2 3,因为节点3 之前没有其它出站处理器了
    • 6 处的 ctx.write(msg, promise) 如果改为 ctx.channel().write(msg) 会打印 1 2 3 6 6 6… 因为 ctx.channel().write() 是从尾部开始查找,结果又是节点6 自己

如图,服务端 pipeline 触发的原始流程,图中数字代表了处理步骤的先后次序

在这里插入图片描述

四、ChannelPipeline 异常传播机制

ChannelPipeline 事件传播的实现采用了经典的责任链模式,调用链路环环相扣。那么如果有一个节点处理逻辑异常会出现什么现象呢?我们通过修改 第二个 ChannelInboundHandlerAdapter 实现类 的实现来模拟业务逻辑异常:
在这里插入图片描述
由输出结果可以看出 ctx.fireExceptionCaugh 会将异常按顺序从 Head 节点传播到 Tail 节点
在这里插入图片描述
如果用户没有对异常进行拦截处理,最后将由 Tail 节点统一处理,在 TailContext 源码中可以找到具体实现:
在这里插入图片描述

五、统一的异常处理器

在 Netty 应用开发的过程中,良好的异常处理机制会让开发在排查问题的时候事半功倍。虽然 Netty 中 TailContext 提供了兜底的异常处理逻辑,但是在很多场景下,并不能满足我们的需求。假如你需要拦截指定的异常类型,并做出相应的异常处理,应该如何实现呢?

小编个人推荐用户对异常进行统一拦截,然后根据实际业务场景实现更加完善的异常处理机制。

通过异常传播机制的学习,我们应该可以想到最好的方法是在 ChannelPipeline 自定义处理器的末端添加统一的异常处理器!

/*** 自定义异常处理器*/
public static class ExceptionHandler extends ChannelDuplexHandler {@Overridepublic void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {if (cause instanceof RuntimeException) {System.out.println();log.error("业务异常处理,异常信息:{}", cause.getMessage());}}
}

外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.rhkb.cn/news/117050.html

如若内容造成侵权/违法违规/事实不符,请联系长河编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

专门针对开发人员,攻击者利用Rust获取操作系统信息

近日&#xff0c;研究人员在 Rust 编程语言的 crate 注册表中发现了一些恶意软件包&#xff0c;专门针对开发人员。 Phylum 在上周发布的一份报告中称&#xff0c;这些库是由一个名为 "amaperf "的用户在 2023 年 8 月 14 日至 16 日之间上传的。现已删除的软件包名…

【kubernetes系列】Calico原理及配置

概述 Calico是针对容器&#xff0c;虚拟机和基于主机的本机工作负载的开源网络和网络安全解决方案。 Calico支持广泛的平台&#xff0c;包括Kubernetes&#xff0c;OpenShift&#xff0c;Docker EE&#xff0c;OpenStack和裸机服务。 Calico在每个计算节点都利用Linux Kernel实…

改革企业治理结构,建立国有企业全面预算管理制度

随着我国市场经济的推广&#xff0c;国有企业进入到改革发展的必经之路上&#xff0c;企业应当结合自身实际情况加强成本管控&#xff0c;提高管理效率&#xff0c;为企业的发展提供有力保障。近年来&#xff0c;全面预算管理的理念在国有企业实施范围内不断扩大&#xff0c;加…

企业知识管理的解决方案

人们发现&#xff0c;挖掘知识、创造知识、生产知识&#xff0c;用知识为自己的产品赋予高附加值&#xff0c;才是企业和社会可持续发展的动力之源。所以知识管理越来越受到重视。 知识管理作为一个新兴的管理概念&#xff0c;已经被学术界所接受&#xff0c;但尚未形成一个…

python3+requests:接口自动化测试(二)

前言&#xff1a;上篇文章python3requestsunittest&#xff1a;接口自动化测试&#xff08;一&#xff09;&#xff1a;已经介绍了基于unittest框架的实现接口自动化&#xff0c;但是也存在一些问题&#xff0c;比如最明显的测试数据和业务没有区分开&#xff0c;接口用例不便于…

Android OTA 相关工具(八) 使用 lpadd 添加镜像到 super.img

文章目录 1. lpadd 的编译2. lpadd 的帮助信息3. lpadd 的用法3.1 准备工作empty 的 super 设备镜像raw 格式的 super 设备镜像sparse 格式的 super 设备镜像 3.1 lpadd 分区操作示例 4. 其它 我一直以为没有人会使用 lpadd 工具&#xff0c;就像我以为没有人会去使用 lpmake 手…

Netty源码NioEventLoop解析

带着问题源码 Netty 的 NioEventLoop 是如何实现的&#xff1f;它为什么能够保证 Channel 的操作是线程安全的&#xff1f;Netty 如何解决 JDK epoll 空轮询 Bug&#xff1f;NioEventLoop 是如何实现无锁化的&#xff1f; 一、作用与设计原理 Netty的NioEventLoop并不是一个存…

无涯教程-Android - 应用组件

应用程序组件是Android应用程序的基本组成部分&#xff0c;这些组件需要在应用程序清单文件 AndroidManifest.xml 注册&#xff0c;该文件描述了应用程序的每个组件以及它们如何交互。 Android应用程序可以使用以下四个主要组件- Sr.NoComponents & 描述1 Activities 它们…

2023-9-2 Kruskal算法求最小生成树

题目链接&#xff1a;Kruskal算法求最小生成树 #include <iostream> #include <algorithm>using namespace std;const int N 200010;// 与并查集中的p含义相同 int p[N];struct Edge {int a, b, w;bool operator< (const Edge & W)const{return w < W.w…

浅析Linux SCSI子系统:错误恢复

文章目录 概述SCSI错误恢复处理添加错误恢复命令错误恢复线程scsi_eh_ready_devs IO超时处理相关参考 概述 IO路径是一个漫长的过程&#xff0c;从SCSI命令请求下发到请求完成返回&#xff0c;中间的任何一个环节出现问题都会导致IO请求的失败。从SCSI子系统到低层驱动&#x…

【快手小玩法-弹幕游戏】开发者功能测试报告提交模板

背景 快手有明确的要求&#xff0c;准入和准出更加严格&#xff0c;要求有明确的测试报告。格式如下&#xff1a; *本文参考字节wiki&#xff1a;CP侧测试报告模板(复制填写轻雀文档) 其他文章推荐&#xff1a;【抖音小玩法-弹幕游戏】开发者功能测试报告提交模板 一、前言…

Java后端开发面试题——集合篇

ArrayList底层的实现原理是什么 底层数据结构 ArrayList底层是用动态的数组实现的 初始容量 ArrayList初始容量为0&#xff0c;当第一次添加数据的时候才会初始化容量为10 扩容逻辑 ArrayList在进行扩容的时候是原来容量的1.5倍&#xff0c;每次扩容都需要拷贝数组 添加逻…

MMSegmentation训练自己的语义分割数据集

全流程&#xff0c;训练语义分割数据集 数据标注json转mask 运行源码MMSegmentation模型选择运行部分 数据标注 # 安装 pip install labelme # 启动labelme labelme然后 ctrl N 开启多边形标注即可&#xff0c;命名类为person 之后会保存到同目录下json文件&#xff1a; js…

WordPress(6)网站侧边栏倒计时进度小工具

提示:文章写完后,目录可以自动生成,如何生成可参考右边的帮助文档 文章目录 效果图在这里插入图片描述一、添加位置二、主题style.css文件中添加美化1.引入库2.添加自定义的HTML模块效果图 提示:以下是本篇文章正文内容,下面案例可供参考 一、添加位置 在主题中 child.js…

【1++的数据结构】之AVL树

&#x1f44d;作者主页&#xff1a;进击的1 &#x1f929; 专栏链接&#xff1a;【1的数据结构】 文章目录 一&#xff0c;什么是AVL树二&#xff0c;AVL树的插入三&#xff0c;AVL树的旋转3.1 向左旋转3.2 向右旋转3.3 左右双旋3.4 右左双旋 四&#xff0c;验证AVL树是否平衡 …

Data truncation: Out of range value for column ‘id‘ at row 1

错误信息&#xff1a;Data truncation: Out of range value for column id at row 1 数据截断&#xff1a;第1行“id”列的值超出范围 很多人会回复&#xff1a;数据库 类型由int改为 bigInt 我看了表结构 可以放的下的。 是 bigint(20) 没有问题啊。 默认的 bigint 类型…

C语言面试题值反转字符串

知识捡漏本 1.C语言优先级 &#xff1a;左高于高于 右 2.定义宏函数product&#xff0c;调用product后&#xff0c;里面的i和i都是加两次1&#xff0c;i就是两个加2后的i相乘&#xff0c;i是开始的i和1后的i相乘。 3.用i (j4,k 8,m 16);这种定义方法&#xff0c;最终i和最后一…

dji uav建图导航系列()ROS中创建dji_sdk节点包(一)项目结构

文章目录 1、整体项目结构1.1、 目录launch1.2、文件CMakeLists.txt1.3、文件package.xml1.4、目录include1.4、目录srv在ROS框架下创建一个无人机的节点dji_sdk,实现必需的订阅(控制指令)、发布(无人机里程计)、服务(无人机起飞降落、控制权得很)功能,就能实现一个类似…

全球免费编程教育网站:Code.org

全球免费编程教育网站&#xff1a;Code.org 官网地址注册使用 你还在为小朋友的编程教育而发愁吗&#xff1f; 你还在为小朋友放假无聊而头疼吗&#xff1f; 他来了他来了&#xff0c;全球免费编程教育网站来了。 2013年成立的Code.org是一个非营利组织。 它致力于为年轻女子、…

【RISC-V】RISC-V寄存器

一、通用寄存器 32位RISC-V体系结构提供32个32位的整型通用寄存器寄存器别名全称说明X0zero零寄存器可做源寄存器(rs)或目标寄存器(rd)X1ra链接寄存器保存函数返回地址X2sp栈指针寄存器指向栈的地址X3gp全局寄存器用于链接器松弛优化X4tp线程寄存器常用于在OS中保存指向进程控…