IdleStateHandler 心跳机制源码详解

优质博文:IT-BLOG-CN

一、心跳机制

Netty支持心跳机制,可以检测远程服务端是否存活或者活跃。心跳是在TCP长连接中,客户端和服务端定时向对方发送数据包通知对方自己还在线,保证连接的有效性的一种机制。在服务器和客户端之间一定时间内没有数据交互时, 即处于idle状态时, 客户端或服务器会发送一个特殊的数据包给对方, 当接收方收到这个数据报文后, 也立即发送一个特殊的数据报文, 回应发送方, 即一个PING-PONG交互。当某一端收到心跳消息后, 就知道了对方仍然在线, 这就确保TCP连接的有效性。

Netty提供了IdleStateHandler可以对三种类型心跳进行检测,是用来监测连接的空闲情况。然后我们就可以根据心跳情况,来实现具体的处理逻辑,比如说断开连接、重新连接等等。同时,还提供了ReadTimeoutHandlerWriteTimeoutHandler检测连接的有效性。

名称作用
IdleStateHandler当连接的空闲时间(读或者写)太长时,将会触发一个IdleStateEvent事件。然后,你可以通过你的ChannelInboundHandler中重写userEventTrigged方法来处理该事件。
ReadTimeoutHandler如果在指定的事件没有发生读事件,就会抛出这个异常,并自动关闭这个连接。你可以在exceptionCaught方法中处理这个异常。
WriteTimeoutHandler当一个写操作不能在一定的时间内完成时,抛出此异常,并关闭连接。你同样可以在exceptionCaught方法中处理这个异常。

二、IdleStateHandler 简介

IdleStateHandler也是一个ChannelHandler,也需要被载入到ChannelPipeline中,加入我们在服务器端的ChannelInitializer中。我们在channel链中加入了IdleSateHandler,第一个参数是5,单位是秒,那么这样做的意思就是:在服务器端会每隔5秒来检查一下channelRead方法被调用的情况,如果在5秒内该链上的channelRead方法都没有被触发,就会调用userEventTriggered方法:

//创建服务类
ServerBootstrap serverBootstrap = new ServerBootstrap();//boss和worker
NioEventLoopGroup boss = new NioEventLoopGroup();
NioEventLoopGroup worker = new NioEventLoopGroup();try {//设置线程池serverBootstrap.group(boss,worker);//设置socket工厂,Channel 是对 Java 底层 Socket 连接的抽象serverBootstrap.channel(NioServerSocketChannel.class);//设置管道工厂serverBootstrap.childHandler(new ChannelInitializer<Channel>() {@Overrideprotected void initChannel(Channel ch) throws Exception {//设置后台转换器(二进制转换字符串)ch.pipeline().addLast(new IdleStateHandler(5, 0, 0, TimeUnit.SECONDS));ch.pipeline().addLast(new StringDecoder());ch.pipeline().addLast(new StringEncoder());ch.pipeline().addLast(new ServerSocketHandler());}});

并且还是个ChannelInboundHandler,是用来处理入站事件的。看下它的构造器:

public IdleStateHandler(int readerIdleTimeSeconds, int writerIdleTimeSeconds, int allIdleTimeSeconds) {this((long)readerIdleTimeSeconds, (long)writerIdleTimeSeconds, (long)allIdleTimeSeconds, TimeUnit.SECONDS);
}
名称作用
readerIdleTimeSeconds读超时。即当在指定的时间间隔内没有从Channel读取到数据时,会触发一个READER_IDLEIdleStateEvent事件
writerIdleTimeSeconds写超时。即当在指定的时间间隔内没有数据写入到Channel时,会触发一个WRITER_IDLEIdleStateEvent事件
allIdleTimeSeconds读/写超时。即当在指定的时间间隔内没有读或写操作时,会触发一个ALL_IDLEIdleStateEvent事件

三、IdleStateHandler 源码

【1】handlerAddedhandlerRemovedIdleStateHandler是在创建IdleStateHandler实例并添加到ChannelPipeline时添加定时任务来进行定时检测的,具体在initialize(ctx)方法实现;同时在从ChannelPipeline移除或Channel关闭时,移除这个定时检测,具体在destroy()实现。IdleStateHandlerchannelActive()方法在socket通道建立时被触发。

public void handlerAdded(ChannelHandlerContext ctx) throws Exception {if (ctx.channel().isActive() && ctx.channel().isRegistered()) {this.initialize(ctx);}}public void handlerRemoved(ChannelHandlerContext ctx) throws Exception {this.destroy();
}public void channelInactive(ChannelHandlerContext ctx) throws Exception {this.destroy();super.channelInactive(ctx);
}

【2】initialize:根据配置的readerIdleTimeWriteIdleTIme等超时事件参数往任务队列taskQueue中添加定时任务task。我先先查看ReaderIdleTimeoutTask的作用。

private void initialize(ChannelHandlerContext ctx) {switch(this.state) {case 1:case 2:return;default:this.state = 1;this.initOutputChanged(ctx);this.lastReadTime = this.lastWriteTime = this.ticksInNanos();if (this.readerIdleTimeNanos > 0L) {// 这里的 schedule 方法会调用 eventLoop 的 schedule 方法,将定时任务添加进队列中this.readerIdleTimeout = this.schedule(ctx, new IdleStateHandler.ReaderIdleTimeoutTask(ctx), this.readerIdleTimeNanos, TimeUnit.NANOSECONDS);}if (this.writerIdleTimeNanos > 0L) {this.writerIdleTimeout = this.schedule(ctx, new IdleStateHandler.WriterIdleTimeoutTask(ctx), this.writerIdleTimeNanos, TimeUnit.NANOSECONDS);}if (this.allIdleTimeNanos > 0L) {this.allIdleTimeout = this.schedule(ctx, new IdleStateHandler.AllIdleTimeoutTask(ctx), this.allIdleTimeNanos, TimeUnit.NANOSECONDS);}}
}

定时任务添加到对应线程EventLoopExecutor对应的任务队列taskQueue中,在对应线程的run()方法中循环执行。只要给定的参数大于0,就创建一个定时任务,每个事件都创建。同时,将state状态设置为1,防止重复初始化。调用initOutputChanged方法,初始化监控出站数据属性

private void initOutputChanged(ChannelHandlerContext ctx) {if (this.observeOutput) {Channel channel = ctx.channel();Unsafe unsafe = channel.unsafe();ChannelOutboundBuffer buf = unsafe.outboundBuffer();// 记录了出站缓冲区相关的数据,buf 对象的 hash 码,和 buf 的剩余缓冲字节数if (buf != null) {this.lastMessageHashCode = System.identityHashCode(buf.current());this.lastPendingWriteBytes = buf.totalPendingWriteBytes();this.lastFlushProgress = buf.currentProgress();}}
}

这边会触发一个ReaderIdleTimeoutTasknextDelay的初始化值为超时秒数readerIdleTimeNanos。如果检测的时候没有正在读,且计算多久没读了,nextDelay -= 当前时间 - 上次读取时间,假如这个结果是6s,说明最后一次调用channelRead已经是6s之前的事情了,你设置的是5s,那么nextDelay则为-1,说明超时了。则创建IdleStateEvent事件,IdleState枚举值为READER_IDLE,然后调用channelIdle方法分发给下一个ChannelInboundHandler,通常由用户自定义一个ChannelInboundHandler来捕获并处理。

private final class ReaderIdleTimeoutTask extends IdleStateHandler.AbstractIdleTask {ReaderIdleTimeoutTask(ChannelHandlerContext ctx) {super(ctx);}protected void run(ChannelHandlerContext ctx) {long nextDelay = IdleStateHandler.this.readerIdleTimeNanos;if (!IdleStateHandler.this.reading) {nextDelay -= IdleStateHandler.this.ticksInNanos() - IdleStateHandler.this.lastReadTime;}if (nextDelay <= 0L) {IdleStateHandler.this.readerIdleTimeout = IdleStateHandler.this.schedule(ctx, this, IdleStateHandler.this.readerIdleTimeNanos, TimeUnit.NANOSECONDS);boolean first = IdleStateHandler.this.firstReaderIdleEvent;IdleStateHandler.this.firstReaderIdleEvent = false;try {IdleStateEvent event = IdleStateHandler.this.newIdleStateEvent(IdleState.READER_IDLE, first);IdleStateHandler.this.channelIdle(ctx, event);} catch (Throwable var6) {ctx.fireExceptionCaught(var6);}} else {IdleStateHandler.this.readerIdleTimeout = IdleStateHandler.this.schedule(ctx, this, nextDelay, TimeUnit.NANOSECONDS);}}
}

firstxxxxIdleEvent作用:假设当你的客户端应用每次接收数据是30秒,而你的写空闲时间是25秒,那么,当你数据还没有写出的时候,写空闲时间触发了。实际上是不合乎逻辑的。因为你的应用根本不空闲。

Netty的解决方案是:记录最后一次输出消息的相关信息,并使用一个值firstXXXXIdleEvent表示是否再次活动过,每次读写活动都会将对应的first值更新为true,如果是false,说明这段时间没有发生过读写事件。同时如果第一次记录出站的相关数据和第二次得到的出站相关数据不同,则说明数据在缓慢的出站,就不用触发空闲事件。

总的来说,这个字段就是用来对付 “客户端接收数据奇慢无比,慢到比空闲时间还多” 的极端情况。所以,Netty默认是关闭这个字段的。

总的来说,每次读取操作都会记录一个时间,定时任务时间到了,会计算当前时间和最后一次读的时间的间隔,如果间隔超过了设置的时间,就触发·UserEventTriggered`方法:

protected void channelIdle(ChannelHandlerContext ctx, IdleStateEvent evt) throws Exception {ctx.fireUserEventTriggered(evt);
}

【3】写任务的run方法逻辑基本和读任务的逻辑一样,唯一不同的就是有一个针对 出站较慢数据的判断。

if (hasOutputChanged(ctx, first)) {return;
}

如果这个方法返回true,就不执行触发事件操作了,即使时间到了。看看该方法实现:

private boolean hasOutputChanged(ChannelHandlerContext ctx, boolean first) {if (observeOutput) {// 如果最后一次写的时间和上一次记录的时间不一样,说明写操作进行过了,则更新此值if (lastChangeCheckTimeStamp != lastWriteTime) {lastChangeCheckTimeStamp = lastWriteTime;// 但如果,在这个方法的调用间隙修改的,就仍然不触发事件if (!first) { // #firstWriterIdleEvent or #firstAllIdleEventreturn true;}}Channel channel = ctx.channel();Unsafe unsafe = channel.unsafe();ChannelOutboundBuffer buf = unsafe.outboundBuffer();// 如果出站区有数据if (buf != null) {// 拿到出站缓冲区的 对象 hashcodeint messageHashCode = System.identityHashCode(buf.current());// 拿到这个 缓冲区的 所有字节long pendingWriteBytes = buf.totalPendingWriteBytes();// 如果和之前的不相等,或者字节数不同,说明,输出有变化,将 "最后一个缓冲区引用" 和 “剩余字节数” 刷新if (messageHashCode != lastMessageHashCode || pendingWriteBytes != lastPendingWriteBytes) {lastMessageHashCode = messageHashCode;lastPendingWriteBytes = pendingWriteBytes;// 如果写操作没有进行过,则任务写的慢,不触发空闲事件if (!first) {return true;}}}}return false;
}

如果用户没有设置需要观察出站情况。就返回false,继续执行事件。如果设置了观察出站的情况,且最后一次写的时间和上一次记录的时间不一样,说明写操作刚刚做过了,则更新此值,但仍然需要判断这个first的值,如果这个值还是false,说明在这个写事件是在两个方法调用间隙完成的,或者是第一次访问这个方法,就仍然不触发事件。如果不满足上面的条件,就取出缓冲区对象,如果缓冲区没对象了,说明没有发生写的很慢的事件,就触发空闲事件。反之,记录当前缓冲区对象的hashcode和剩余字节数,再和之前的比较,如果任意一个不相等,说明数据在变化,或者说数据在慢慢的写出去。那么就更新这两个值,留在下一次判断。继续判断first,如果是fasle,说明这是第二次调用,就不用触发空闲事件了。

【4】所有事件的run方法:这个类叫做AllIdleTimeoutTask,表示这个监控着所有的事件。当读写事件发生时,都会记录。代码逻辑和写事件的的基本一致,除了这里:

long nextDelay = allIdleTimeNanos;
if (!reading) {// 当前时间减去 最后一次写或读 的时间 ,若大于0,说明超时了nextDelay -= ticksInNanos() - Math.max(lastReadTime, lastWriteTime);
}

这里的时间计算是取读写事件中的最大值来的。然后像写事件一样,判断是否发生了写的慢的情况。最后调用ctx.fireUserEventTriggered(evt)方法。

通常这个使用的是最多的。构造方法一般是:

pipeline.addLast(new IdleStateHandler(0, 0, 30, TimeUnit.SECONDS));

读写都是0表示禁用,30表示30秒内没有任务读写事件发生,就触发事件。注意,当不是0的时候,这三个任务会重叠。

四、总结

【1】IdleStateHandler心跳检测主要是通过向线程任务队列中添加定时任务,判断channelRead()方法或write()方法是否调用空闲超时,如果超时则触发超时事件执行自定义userEventTrigger()方法;
【2】Netty通过IdleStateHandler实现最常见的心跳机制不是一种双向心跳的PING-PONG模式,而是客户端发送心跳数据包,服务端接收心跳但不回复,因为如果服务端同时有上千个连接,心跳的回复需要消耗大量网络资源;如果服务端一段时间内没有收到客户端的心跳数据包则认为客户端已经下线,将通道关闭避免资源的浪费;在这种心跳模式下服务端可以感知客户端的存活情况,无论是宕机的正常下线还是网络问题的非正常下线,服务端都能感知到,而客户端不能感知到服务端的非正常下线;
【3】要想实现客户端感知服务端的存活情况,需要进行双向的心跳;Netty中的channelInactive()方法是通过Socket连接关闭时挥手数据包触发的,因此可以通过channelInactive()方法感知正常的下线情况,但是因为网络异常等非正常下线则无法感知;

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.rhkb.cn/news/209023.html

如若内容造成侵权/违法违规/事实不符,请联系长河编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

vscode非常好用的扩展插件

1、Code Spell Checker&#xff1a; 帮助我们检查单词是否拼写错误&#xff0c;检查规则遵循驼峰拼写法。 2、Color Highlight&#xff1a;高亮显示颜色值 3、Svg Preview&#xff1a; 实时预览svg图片&#xff08;修改width、height、fill等值来实时查看效果&#xff09; 4、…

人工智能(pytorch)搭建模型21-基于pytorch搭建卷积神经网络VoVNetV2模型,并利用简单数据进行快速训练

大家好&#xff0c;我是微学AI&#xff0c;今天给大家介绍一下人工智能(pytorch)搭建模型21-基于pytorch搭建卷积神经网络VoVNetV2模型&#xff0c;并利用简单数据进行快速训练。VoVNetV2模型是计算机视觉领域的一个重要研究成果&#xff0c;它采用了Voice of Visual Residual&…

第十五届蓝桥杯模拟赛(第二期)

大家好&#xff0c;我是晴天学长&#xff0c;本次分享&#xff0c;制作不易&#xff0c;本次题解只用于学习用途&#xff0c;如果有考试需要的小伙伴请考完试再来看题解进行学习&#xff0c;需要的小伙伴可以点赞关注评论一波哦&#xff01;后续会继续更新第三期的。&#x1f4…

wvp gb28181 pro 平台国标级连功能说明

国标28181不同平台之间支持两种连接方式&#xff0c;平级和上下级&#xff0c;WVP目前支持向上级级联。 测试环境 测试平台上级&#xff1a;192.168.10.209&#xff08;Alam centos8&#xff09; 测试平台下级&#xff1a;192.168.10.206&#xff08;ky10_x86&#xff09; 下级…

VUE语法-ref和reactive响应式数据引用

1、响应式概述 在vue中定义一个参数&#xff0c;当这个参数在使用中发生了变化&#xff0c;在页面中对这个数据应用的地方都会同步的发生变化&#xff0c;这个就是数据响应式。 2、创建一个非响应式的参数 该程序中采用的是VUE3的用法&#xff1a; 1、在程序中定义了一个局…

应用于智慧金融的AI边缘计算盒子+AI算法软硬一体化方案

传统金融营业厅存在运营管理模式落后、资源投放不平衡、从业人员培训效果不达预期、客户体验割裂等普遍现象&#xff1b; 部署英码数字金融解决方案&#xff0c;将助力企业从传统金融模式快速向数字金融模式转变&#xff0c;可针对每一个客户定制个性化“一对一”服务&#xff…

【栈和队列(2)】

文章目录 前言队列队列方法队列模拟实现循环队列练习1 队列实现栈 前言 队列和栈是相反的&#xff0c;栈是先进后出&#xff0c;队列是先进先出&#xff0c;相当于排队打饭&#xff0c;排第一的是最先打到饭出去的。 队列 队列&#xff1a;只允许在一端进行插入数据操作&…

MySQL 8创建数据库、数据表、插入数据并且查询数据

我使用的数据库是MySQL 8。 创建数据库 create database Bookbought; -- 创建数据库Bookbought use Bookbought; -- 使用数据库Bookbought创建数据表 创建用户表bookuser。 create table ## 往allbook里边插入数据(id INT PRIMARY KEY AUTO_INCREMENT, -- id 为 主键userna…

Golang数据类型(字符串)

字符串重要概念 根据Go语言官方的定义&#xff1a; In Go, a string is in effect a read-only slice of bytes. 意思是Go中的字符串是一组只读的字节切片&#xff08;slice of bytes&#xff09;&#xff0c;每个字符串都使用一个或多个字节表示&#xff08;当字符为 ASCII 码…

OpenWrt作为旁路由(网关)配置

目录 背景前提条件环境操作步骤物理层连接设置与主路由同一网段禁用IPv6取消LAN接口桥接防火墙配置 背景 本文简介如何配置OpenWrt&#xff0c;使其作为旁路由&#xff08;网关&#xff09;运行。 旁路由大概有以下这几种工作方式&#xff1a; 主路由开DHCP&#xff0c;网关未…

LeetCode刷题---反转链表

个人主页&#xff1a;元清加油_【C】,【C语言】,【数据结构与算法】-CSDN博客 个人专栏&#xff1a;http://t.csdnimg.cn/ZxuNL http://t.csdnimg.cn/c9twt 前言&#xff1a;这个专栏主要讲述递归递归、搜索与回溯算法&#xff0c;所以下面题目主要也是这些算法做的 我讲述…

XSS漏洞原理

XSS漏洞介绍&#xff1a; 跨站脚本攻击XSS(Cross Site Scripting)&#xff0c;为了不和层叠样式表(Cascading Style Sheets, CSS)的缩写混淆&#xff0c;故将跨站脚本攻击缩写为XSS。恶意攻击者往Web页面里插入恶意Script代码&#xff0c;当用户浏览该页面时&#xff0c;嵌入We…

【读书笔记】微习惯

周日晚上尝试速读一本书《微习惯》&#xff0c;共七章看了下目录结构并不复杂&#xff0c;计划每章7-8分钟读完&#xff0c; 从20:15-21:00。读的时候&#xff0c;订下闹钟&#xff0c;催促着自己的进度。边读边记了一些要点和微信读书里面的划线。 第六章实践内容最为丰富&…

CnosDB有主复制演进历程

分布式存储系统的复杂性涉及数据容灾备份、一致性、高并发请求和大容量存储等问题。本文结合CnosDB在分布式环境下的演化历程&#xff0c;分享如何将分布式理论应用于实际生产&#xff0c;以及不同实现方式的优缺点和应用场景。 分布式系统架构模式 分布式存储系统下按照数据复…

计算机网络TCP篇①

目录 一、TCP 基本信息 1.1、TCP 的头格式 1.2、什么是 TCP 1.3、什么是 TCP 连接 1.4、TCP 与 UDP 的区别 1.2、TCP 连接建立 1.2.1、TCP 三次握手的过程 1.2.2、为什么是三次握手&#xff1f;不是两次&#xff1f;四次&#xff1f;&#xff08;这个问题真是典中典&am…

C++基础 -34- 输入输出运算符重载

输出运算符重载格式 ostream & operator<<(ostream &out,person a) {cout << a.a << endl;return out; }举例输出运算符重载 #include "iostream"using namespace std;class person {public:person(int a):a(a){}int a; };ostream &…

LabVIEW在调用image.cpp或drawmgr.cpp因为DAbort而崩溃

LabVIEW在调用image.cpp或drawmgr.cpp因为DAbort而崩溃 出现下列问题&#xff0c;如何解决&#xff1f; 1. LabVIEW 程序因image.cpp或drawmgr.cpp中的错误而崩溃 2. 正在通过cRIO-9034运行独立的LabVIEW应用程序&#xff0c;但它因drawmgr.cpp中的错误而崩溃 …

什么是DDoS攻击

DDoS攻击 1. 定义2. DDoS攻击类型2.1 网络层攻击2.2 传输层攻击2.3 应用层攻击 3.DDoS攻击态势特点 1. 定义 分布式拒绝服务&#xff08;DDoS&#xff09;攻击是一种常见的网络攻击形式。攻击者利用恶意程序对一个或多个目标发起攻击&#xff0c;企图通过大规模互联网流量耗尽…

Zabbix监控接收SNMPTrap消息与SNMPTT结合

一.SNMP 协议 1.协议介绍 snmp 协议是日常使用的较多的一种协议&#xff0c;绝大多数网络设备/存储等都支持 snmp 协议&#xff0c;通过此协议可以实现设备状态的监控及管理。 2.主要组成 SNMP 协议包括以下三个部分: SNMP Agent&#xff1a;负责处理 snmp 请求&#xff0c…

Android11适配已安装应用列表

Android11适配已安装应用列表 之前做过已安装应用列表的适配&#xff0c;最近国内版SDK升级到33和隐私合规遇到很多问题&#xff0c;于是把已安装应用列表记录一下&#xff1a; 1、在Android11及以上的适配&#xff1a; package com.example.requestinsttallapplistdemoimpo…