目录
- 1.连接的状态
- 2.reactor netty中的连接状态
- 3. webflux中的io处理
- 4.总结
为什么webflux在io密集型的场景能有效的提升系统吞吐量呢? 是因为它使用的是响应式编程,使用的是NIO,但这里的响应式、nio到底是怎么样的呢?响应式编程上一篇已经做了说明,本文会重点梳理结合了NIO的实现。
netty是java里最流行的nio的通信框架,webflux也是用的它,但却不是直接用的,为了适配响应式编程,对于netty进行了一层封装,使用的是 Reactor Netty
,其从connection上获取请求,并将请求发送至 DispatcherHandler
中。
1.连接的状态
在编程和网络配置的上下文中,一个连接(connection)的生命周期会经历多个状态,每个状态都具有特定的含义。这些状态的出现顺序和可用性可能会因为不同的网络库和框架(如 Netty、Reactor Netty、Java NIO)而有所不同,但大体上会涉及以下几个常见的状态:
-
CONFIGURED:连接已被配置。这个状态意味着连接的各种参数(如超时时间、缓冲区大小、协议版本等)已经被设置完毕,准备开始建立连接。
-
CONNECTING:正在尝试连接。在这个状态下,系统正在尝试与目标服务器建立连接,但连接尚未完成。
-
CONNECTED:连接成功建立。当连接成功建立后,将进入这个状态,此时可以开始进行数据的发送和接收。
-
READY:连接准备就绪。某些框架或系统可能会进一步区分“已连接”和“准备就绪”两个状态。"准备就绪"状态意味着连接不仅已建立,且所有初始化步骤(如握手、认证等)都已成功完成,连接可以用于通信。
-
DISCONNECTING:正在尝试断开连接。在这个状态下,连接正在进行关闭的准备工作,如发送剩余的数据或进行必要的清理操作。
-
DISCONNECTED:连接已经完全断开。此状态表示连接已经被关闭,所有的资源都已释放,且无法再进行数据传输。
-
ERROR:连接出错。这个状态表示在连接的建立、使用或关闭过程中遇到了错误。具体的错误信息通常需要通过日志或异常机制进一步检查。
-
CLOSED:连接被关闭。这个状态往往与 DISCONNECTED 类似,但强调的是连接被有意地关闭,而不是因为错误。
值得注意的是,不同的框架和库可能会有细微的状态差异,一些状态可能会被合并或细分。
2.reactor netty中的连接状态
连接上的状态定义
reactor.netty.ConnectionObserver.State
注意此处是一个interface,在别处还有扩展
connection上有不同的事件,相应的有不同的处理类进行处理,这些处理类都是 ConnectionObserver
的实现类,具体的处理方法为 ConnectionObserver#onStateChange
例如其中的 HttpServerHandle#onStateChange
, 进入执行逻辑前,会判断是否为目标事件
不同的事件类型对应的处理类关系是:
- State.CONNECTED —> TcpServerDoOn
- State.CONFIGURED —> TcpServerHandle
- State.CONFIGURED —> TcpClientDoOn
- State.DISCONNECTING —> PooledConnection
- State.CONFIGURED —> TcpServerDoOnConnection
- State.DISCONNECTING —> TcpServerBind.ChildObserver
- ConnectionObserver.State.CONFIGURED —> HttpClientConnect
- HttpServerState.REQUEST_RECEIVED —> HttpServerHandle
- State.CONFIGURED —> HttpClientDoOn
- HttpClientState.RESPONSE_RECEIVED、State.CONFIGURED —> HttpObserver
- State.CONFIGURED、State.DISCONNECTING —> UdpServerDoOn
- State.CONFIGURED、State.DISCONNECTING、State.RELEASED —> UdpClientDoOn
- State.CONFIGURED、State.DISCONNECTING —> NewConnectionProvider.NewConnectionObserver
3. webflux中的io处理
io线程相关的逻辑:
reactor.netty.http.server.HttpTrafficHandler#channelRead
HttpTrafficHandler 同时是netty的ChannelInboundHandler、ChannelInboundHandler的实现类
reactor.netty.channel.ChannelOperationsHandler#channelRead
ChannelOperationsHandler 是netty的 ChannelInboundHandler 的实现类
通信核心
ChannelOperationsHandler
ChannelOperationsHandler 直接继承io.netty.channel.ChannelInboundHandlerAdapter
, 接入netty后,将netty不同的事件转换成本地的事件
reactor.netty.http.server.HttpServerOperations#onInboundNext
监听connection中的 REQUEST_RECEIVED 事件
outbound\ inbound的注册
上述涉及的有ChannelOperationsHandler
、HttpTrafficHandler
、ChannelOperationsHandler
不同的处理器有对应的类型:
ChannelOperationsHandler
的类型为NettyPipeline.ReactiveBridge
HttpTrafficHandler
的类型为NettyPipeline.HttpTrafficHandler
HttpTrafficHandler
的注册在HttpServerBind
中完成reactor.netty.http.server.HttpServerBind.Http1Initializer#accept
HttpServerOperations
继承ChannelOperations
,其中完成了ChannelOperationsHandler
的注册reactor.netty.channel.ChannelOperations#addReactiveBridge
reactor.netty.tcp.TcpServerBind.ChildObserver#onStateChange
当连接的状态发生变化时调用,例如 http的request或者response
reactor.netty.http.server.HttpServerHandle#onStateChange
HttpServerOperations
和HttpServerHandle
都是reactor.netty ConnectionObserver
的实现类
在该方法中调用了 subscribe 方法
上述方法中调用的 handler.apply()
方法 即为ReactorHttpHandlerAdapter#apply
org.springframework.http.server.reactive.ReactorHttpHandlerAdapter#apply
方法中的 handler.hanlder()
方法调用路径为:
org.springframework.boot.web.reactive.context.ReactiveWebServerApplicationContext.ServerManager#handle
org.springframework.web.server.adapter.HttpWebHandlerAdapter#handle
org.springframework.web.server.handler.FilteringWebHandler#handle
org.springframework.web.server.handler.DefaultWebFilterChain#filter
相关的filter是 org.springframework.web.server.WebFilter
的实现类
此处 DefaultWebFilterChain
中handler即为 DispatcherHandler
:
org.springframework.web.reactive.DispatcherHandler#handle
至此, DispatcherHandler
就到了webflux处理请求的主流程,可参考 webflux源码解析(1)-主流程
4.总结
在Recator Netty
的部分,完成了从connection中接收请求动作,之后请求会传递到了DispatcherHandler
,进入webflux处理请求的主流程,可参考 webflux源码解析(1)-主流程。
这个链路上的 reactor.netty.http.server.HttpServerHandle#onStateChange
方法调用了subscribe 方法,触发了响应式调用链的执行(可参考webflux源码解析(2)-reactor)