Netty学习——源码篇2 客户端Bootstrap(二)

接上篇 Bootstrap源码-客户端

1 Handler的添加过程

        Netty有一个强大和灵活之处就是基于Pipeline的自定义Handler机制。基于此,可以像添加插件一样自由组合各种各样的Handler来完成业务逻辑。例如,需要处理HTTP数据,那么就可以在Pipeline前添加一个针对HTTP编解码的Handler,然后添加自己的业务逻辑的Handler,这样网络上的数据流就像通过一个管道一样,从不同的Handler中流过并进行编解码,最终到达自定义的Handler中。

        先看一下Handler是如何及何时添加到ChannelPipeline中的。看一段用户代码片段:

bootstrap.group(group).channel(NioSocketChannel.class).option(ChannelOption.SO_KEEPALIVE,true).handler(new ChannelInitializer<SocketChannel>() {@Overrideprotected void initChannel(SocketChannel socketChannel) throws Exception {ChannelPipeline pipeline = socketChannel.pipeline();pipeline.addLast(new StringDecoder());pipeline.addLast(new StringEncoder());pipeline.addLast(new ChatClientHandler());System.out.println("初始化channel:" + socketChannel);}});

        这段代码就实现了Handler的添加功能,Bootstrap的handler方法接收一个ChannelHandler,而我们传入的参数是一个派生于抽象类ChannelInitializer的匿名类,它也实现了ChannelHandler接口,来看一下ChannelInitializer类,代码如下:

public abstract class ChannelInitializer<C extends Channel> extends ChannelInboundHandlerAdapter {private static final InternalLogger logger = InternalLoggerFactory.getInstance(ChannelInitializer.class);private final ConcurrentMap<ChannelHandlerContext, Boolean> initMap = PlatformDependent.newConcurrentHashMap();public ChannelInitializer() {}protected abstract void initChannel(C var1) throws Exception;public final void channelRegistered(ChannelHandlerContext ctx) throws Exception {if (this.initChannel(ctx)) {ctx.pipeline().fireChannelRegistered();} else {ctx.fireChannelRegistered();}}public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {logger.warn("Failed to initialize a channel. Closing: " + ctx.channel(), cause);ctx.close();}public void handlerAdded(ChannelHandlerContext ctx) throws Exception {if (ctx.channel().isRegistered()) {this.initChannel(ctx);}}private boolean initChannel(ChannelHandlerContext ctx) throws Exception {if (this.initMap.putIfAbsent(ctx, Boolean.TRUE) == null) {try {this.initChannel(ctx.channel());} catch (Throwable var6) {this.exceptionCaught(ctx, var6);} finally {this.remove(ctx);}return true;} else {return false;}}private void remove(ChannelHandlerContext ctx) {try {ChannelPipeline pipeline = ctx.pipeline();if (pipeline.context(this) != null) {pipeline.remove(this);}} finally {this.initMap.remove(ctx);}}
}

        ChannelInitializer是一个抽象类,它有一个抽象方法initChannel(),我们的匿名类正是实现了这个方法,并在这个方法中添加了自定义的Handler。这个initChannel方法是在ChannelInitializer的channelRegistered()方法中被调用的。

        接下来关注一下channelRegistered方法。从上面的代码中可以看到,在channelRegistered方法中,会调用initChannel方法,将自定义的Handler添加到ChannelPipeline中,然后调用ctx.pipeline().remove(this)方法将自己从ChannelPipeline中删除。

        一开始,ChannelPipeline中只有三个Handler,分别是Head、Tail和我们添加的ChannelInitializer,如下图所示:

        接着调用 initChannel方法,添加自定义的Handler,如下图:

        最后将 ChannelInitializer删除,如下图:

        分析到这里,我们已经简单了解了自定义的Handler是如何添加到ChannelPipeline中的,后面在进行深入探讨。

2 客户端发起连接请求 

        经过前面的分析,我们已经大致了解了Netty客户端初始化时所做的工作,接下来分析一下客户端是如何发起TCP连接的。

        客户端通过调用Bootstrap的connect方法进行连接。在connect方法中进行一些参数检查,并调用doConnect方法,代码如下:

private static void doConnect(final SocketAddress remoteAddress, final SocketAddress localAddress, final ChannelPromise connectPromise) {final Channel channel = connectPromise.channel();channel.eventLoop().execute(new Runnable() {public void run() {if (localAddress == null) {channel.connect(remoteAddress, connectPromise);} else {channel.connect(remoteAddress, localAddress, connectPromise);}connectPromise.addListener(ChannelFutureListener.CLOSE_ON_FAILURE);}});}

        在doConnect方法中,eventLoop线程会调用Channel的connect方法,而这个Channel的具体类型实际就是NioSocketChannel,前面已经分析过。继续跟踪channel.connect()方法,发现它调用的是DefaultChannelPipeline的connect方法,Pipeline的connect方法代码如下:

public final ChannelFuture connect(SocketAddress remoteAddress, ChannelPromise promise) {return this.tail.connect(remoteAddress, promise);}

        前面分析过,Tail是一个TailContext的实例,而TailContext又是AbstractChannelHandlerContext的子类,并且没有实现connect方法,因此这里调用的其实是AbstractChannelHandlerContext的connect方法,看一下这个方法的代码:

public ChannelFuture connect(final SocketAddress remoteAddress, final SocketAddress localAddress, final ChannelPromise promise) {if (remoteAddress == null) {throw new NullPointerException("remoteAddress");} else if (!this.validatePromise(promise, false)) {return promise;} else {final AbstractChannelHandlerContext next = this.findContextOutbound();EventExecutor executor = next.executor();if (executor.inEventLoop()) {next.invokeConnect(remoteAddress, localAddress, promise);} else {safeExecute(executor, new Runnable() {public void run() {next.invokeConnect(remoteAddress, localAddress, promise);}}, promise, (Object)null);}return promise;}}

        上面有一行非常关键的代码,

final AbstractChannelHandlerContext next = this.findContextOutbound();

        这里调用findContextOutbound方法,从DefaultChannelPipeline内的双向链表的Tail开始,不断向前找到第一个Outbound为true的AbstractChannelHandlerContext,然后调用它的invokeConnect方法,代码如下:

private void invokeConnect(SocketAddress remoteAddress, SocketAddress localAddress, ChannelPromise promise) {if (this.invokeHandler()) {try {((ChannelOutboundHandler)this.handler()).connect(this, remoteAddress, localAddress, promise);} catch (Throwable var5) {notifyOutboundHandlerException(var5, promise);}} else {this.connect(remoteAddress, localAddress, promise);}}

        前面提到,在DefaultChannelPipeline的构造器中,实例化两个对象:Head和Tail,并形成了双向链表的头和尾。Head是HeadContext的实例,它实现了ChannelOunboundHandler接口,并挨它的Ounbound设置为true。因此在findContextOutbound方法中,找到的AbstractChannelHandlerContext对象其实就是Head,进而在invokeConnect方法中,向上转换为ChannelOuntboundHandler就问题了。而又因为HeadContext重写了connect方法,所以实际上调用的是HeadContext的connect方法。接着跟踪HeadContext的connect方法。

public void connect(ChannelHandlerContext ctx, SocketAddress remoteAddress, SocketAddress localAddress, ChannelPromise promise) throws Exception {this.unsafe.connect(remoteAddress, localAddress, promise);}

        这个方法connect很简单,只是调用了Unsafe的connect方法。回顾一下HeadContext的构造器,就会发现这个Unsafe方法其实就是pipeline.channel().unsafe返回的Channel的Unsafe属性。到这里为止,已经知道,其实是AbstractNioByteChannel.NioByteUnsafe内部类转了一大圈。最后,找到创建Socket连接的关键代码继续跟踪,其实调用的就是AbstractNioUnsafe的connect方法。

public final void connect(final SocketAddress remoteAddress, SocketAddress localAddress, ChannelPromise promise) {if (promise.setUncancellable() && this.ensureOpen(promise)) {try {if (AbstractNioChannel.this.connectPromise != null) {throw new ConnectionPendingException();}boolean wasActive = AbstractNioChannel.this.isActive();if (AbstractNioChannel.this.doConnect(remoteAddress, localAddress)) {this.fulfillConnectPromise(promise, wasActive);} else {AbstractNioChannel.this.connectPromise = promise;AbstractNioChannel.this.requestedRemoteAddress = remoteAddress;int connectTimeoutMillis = AbstractNioChannel.this.config().getConnectTimeoutMillis();if (connectTimeoutMillis > 0) {AbstractNioChannel.this.connectTimeoutFuture = AbstractNioChannel.this.eventLoop().schedule(new Runnable() {public void run() {ChannelPromise connectPromise = AbstractNioChannel.this.connectPromise;ConnectTimeoutException cause = new ConnectTimeoutException("connection timed out: " + remoteAddress);if (connectPromise != null && connectPromise.tryFailure(cause)) {AbstractNioUnsafe.this.close(AbstractNioUnsafe.this.voidPromise());}}}, (long)connectTimeoutMillis, TimeUnit.MILLISECONDS);}promise.addListener(new ChannelFutureListener() {public void operationComplete(ChannelFuture future) throws Exception {if (future.isCancelled()) {if (AbstractNioChannel.this.connectTimeoutFuture != null) {AbstractNioChannel.this.connectTimeoutFuture.cancel(false);}AbstractNioChannel.this.connectPromise = null;AbstractNioUnsafe.this.close(AbstractNioUnsafe.this.voidPromise());}}});}} catch (Throwable var6) {promise.tryFailure(this.annotateConnectException(var6, remoteAddress));this.closeIfClosed();}}}

        在这个connect方法中,又调用了doConnect方法,注意,这个方法并不是AbstractNioUnsafe的方法,而是AbstractNioChannel的抽象方法。doConnect方法是在NioSocketChannel中实现的,因此,进入NioSocketChannel的doConnect方法,代码如下:

 protected boolean doConnect(SocketAddress remoteAddress, SocketAddress localAddress) throws Exception {if (localAddress != null) {this.doBind0(localAddress);}boolean success = false;boolean var5;try {boolean connected = this.javaChannel().connect(remoteAddress);if (!connected) {this.selectionKey().interestOps(8);}success = true;var5 = connected;} finally {if (!success) {this.doClose();}}return var5;}

        上面代码的功能是,首先获取Java NIO的SocketChannel,然后获取NioSocketChannel 的newSocket方法返回的SocketChannel对象;在调用SocketChannel的connect方法完成Java NIO底层的Socket连接。总结一下,客户端Bootstrap发起连接请求的流程可以用如下时序图直观展示:

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.rhkb.cn/news/278914.html

如若内容造成侵权/违法违规/事实不符,请联系长河编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

基于java+springboot+vue实现的电影院选票系统(文末源码+Lw+ppt)23-467

摘 要 时代在飞速进步&#xff0c;每个行业都在努力发展现在先进技术&#xff0c;通过这些先进的技术来提高自己的水平和优势&#xff0c;电影院选票系统当然不能排除在外。电影院选票系统是在实际应用和软件工程的开发原理之上&#xff0c;运用java语言&#xff0c;前台Vue框…

Hive:数据仓库利器

1. 简介 Hive是一个基于Hadoop的开源数据仓库工具&#xff0c;可以用来存储、查询和分析大规模数据。Hive使用SQL-like的HiveQL语言来查询数据&#xff0c;并将其结果存储在Hadoop的文件系统中。 2. 基本概念 介绍 Hive 的核心概念&#xff0c;例如表、分区、桶、HQL 等。 …

k8s部署hadoop

&#xff08;作者&#xff1a;陈玓玏&#xff09; 配置和模板参考helm仓库&#xff1a;https://artifacthub.io/packages/helm/apache-hadoop-helm/hadoop 先通过以下命令生成yaml文件&#xff1a; helm template hadoop pfisterer-hadoop/hadoop > hadoop.yaml用kube…

NodeJs利用腾讯云实现手机发送验证码

本文介绍如何在nodejs实现短信发送&#xff0c;以腾讯云的短信验证为例。 腾讯云中准备工作 首先需要腾讯云的个人或者企业认证的账号&#xff0c;个人会赠送一百条&#xff0c;企业赠送一千条&#xff0c;可以用于测试&#xff0c;地址&#xff1a;腾讯云短信服务。然后需要…

电机学(笔记一)

磁极对数p&#xff1a; 直流电机的磁极对数是指电机定子的磁极对数&#xff0c;也等于电机电刷的对数。它与电机的转速和扭矩有直接关系。一般来说&#xff0c;极对数越多&#xff0c;电机转速越低&#xff0c;扭矩越大&#xff0c;适用于低速、高扭矩的场合&#xff1b;相反&…

免 费 搭 建 多模式商城:b2b2c、o2o、直播带货一网打尽

鸿鹄云商 b2b2c产品概述 【b2b2c平台】&#xff0c;以传统电商行业为基石&#xff0c;鸿鹄云商支持“商家入驻平台自营”多运营模式&#xff0c;积极打造“全新市场&#xff0c;全新 模式”企业级b2b2c电商平台&#xff0c;致力干助力各行/互联网创业腾飞并获取更多的收益。从消…

IonQ最新研究突破!引入光量子纠缠以构建量子计算网络

内容来源&#xff1a;量子前哨&#xff08;ID&#xff1a;Qforepost&#xff09; 编辑丨慕一 编译/排版丨沛贤 深度好文&#xff1a;700字丨5分钟阅读 2024年2月22日&#xff0c;美国量子计算公司IonQ宣布&#xff0c;公司研究团队已实现可重复地生成与离子纠缠的光子&#…

Python之Web开发中级教程----Django站点管理

Python之Web开发中级教程----Django站点管理 网站的开发分为两部分&#xff1a;内容发布和公共访问 内容发布是由网站的管理员负责查看、添加、修改、删除数据 Django能够根据定义的模型类自动地生成管理模块 使用Django的管理模块, 需要按照如下步骤操作 : 1.管理界面本地…

图论题目集一(代码 注解)

目录 题目一&#xff1a; 题目二&#xff1a; 题目三&#xff1a; 题目四&#xff1a; 题目五&#xff1a; 题目六&#xff1a; 题目七&#xff1a; 题目一&#xff1a; #include<iostream> #include<queue> #include<cstring> using namespace st…

rviz上不显示机器人模型(模型只有白色)

文档中的是base_footprint&#xff0c;需要根据自己所设的坐标系更改&#xff0c;我的改为base_link 如何查看自己设的坐标系&#xff1a; 这些parent父坐标系就是 同时打开rviz后需要更改成base_link

Java后端八股----JVM篇

上图中线程1&#xff0c;2如果资源被抢占了&#xff0c;则程序计数器记录一下执行的行号&#xff0c;等到资源就绪后会从记录的行号继续向后执行。 Java8把静态变量以及常量放到了线程的本地内存原空间中(避免放在堆中不可控)。 &#x1f446;图中第二种情况不太容易出现…

gPTP简介

1、gPTP&#xff08;generalized precision time protocol&#xff09;广义时钟同步协议 gPTP&#xff08;generalized precision time protocol&#xff09;广义时钟同步协议&#xff0c;即IEEE 802.1AS协议。它是IEEE 1588协议的延伸&#xff0c;可以为TSN提供全局精准…

使用RabbitMQ,关键点总结

文章目录 1.MQ的基本概念2.常见的MQ产品3.MQ 的优势和劣势3.1 优势3.2 劣势 4.RabbitMQ简介4.1RabbitMQ 中的相关概念 1.MQ的基本概念 MQ全称 Message Queue&#xff08;消息队列&#xff09;&#xff0c;是在消息的传输过程中保存消息的容器。多用于分布式系统之间进行通信。…

RabbitMQ 安装保姆级教程

目录 1.MQ引言 1.1 什么是MQ 1.2 MQ有哪些 1.3 不同MQ特点 2.RabbitMQ 的引言 2.1 RabbitMQ 2.2 RabbitMQ 的安装 2.2.1 下载 2.2.2 下载的安装包 2.2.3 安装步骤 3. RabiitMQ 配置 3.1RabbitMQ 管理命令行 3.2 web管理界面介绍 3.2.1 overview概览 3.2.2 Admin用…

Linux的背景介绍

1.Linux的发展史 Linux&#xff0c;一般指GNU/Linux&#xff08;单独的Linux内核并不可直接使用&#xff0c;一般搭配GNU套件&#xff0c;故得此称呼&#xff09;&#xff0c;是一种免费使用和自由传播的类UNIX操作系统&#xff0c;其内核由林纳斯本纳第克特托瓦兹&#xff08…

C到C++的敲门砖-2

文章目录 引用内联函数auto关键字基于范围的for循环指针空值nullptr后记 引用 引用不是新定义一个变量&#xff0c;而是给已存在变量取了一个别名&#xff0c;编译器不会为引用变量开辟内存空 间&#xff0c;它和它引用的变量共用同一块内存空间。 所谓引用就是给变量起别名&am…

[C语言]指针详解一、数组指针、二维数组传参、函数指针

一、数组指针 对一个数组&#xff0c;如果我们想要让一个指针指向这个数组&#xff0c;我们应该如何定义呢?我们知道一个数组定义本来就是一个指针&#xff0c;那为何要多定义一个数组指针呢?我们来看看下面这个代码就理解了 #include <stdio.h> int main() {int arr…

数据结构——lesson10排序之插入排序

&#x1f49e;&#x1f49e; 前言 hello hello~ &#xff0c;这里是大耳朵土土垚~&#x1f496;&#x1f496; &#xff0c;欢迎大家点赞&#x1f973;&#x1f973;关注&#x1f4a5;&#x1f4a5;收藏&#x1f339;&#x1f339;&#x1f339; &#x1f4a5;个人主页&#x…

第十五届蓝桥杯模拟考试III_物联网设计与开发官方代码分析

目录 前言&#xff1a;显示界面部分&#xff1a;页面切换:数值的轮回调整&#xff1a;传递数据&#xff1a; 前言&#xff1a; 这次模拟的效果很不好。85分&#xff0c;4h的限时我花了两天完成&#xff0c;这个时间是远远超出要求的&#xff0c;而且最后还只拿到56分&#xff0…

使用 Boot Camp 助理查明您的 Mac 需不需要 Windows 安装介质

使用 Boot Camp 助理查明您的 Mac 需不需要 Windows 安装介质 当前的 Mac 机型无需介质即可安装 Windows&#xff0c;也就是说&#xff0c;您不需要用到外置驱动器。较早的 Mac 机型需要用到 USB 驱动器或光盘驱动器。使用 Boot Camp 助理可查明您需要用到什么。 Boot Camp 助…