ChannelFuture
客户端调用connect后返回值为ChannelFuture对象,我们可以利用ChannelFuture中的channel()方法获取到Channel对象。
由于上述代为为客户端实现,若想启动客户端实现连接操作,必须编写服务端代码,实现如下:
package netty.simpleNetty.channel;import io.netty.bootstrap.ServerBootstrap;
import io.netty.buffer.ByteBuf;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInboundHandlerAdapter;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.nio.NioServerSocketChannel;
import io.netty.channel.socket.nio.NioSocketChannel;
import lombok.extern.slf4j.Slf4j;/*** 服务端配合ClientChannelFuture验证Channel关闭的异步以及回调函数实现*/
@Slf4j
public class ServerChannelFuture {public static void main(String[] args) throws InterruptedException {new ServerBootstrap().group(new NioEventLoopGroup(1), new NioEventLoopGroup(2)).channel(NioServerSocketChannel.class).childHandler(new ChannelInitializer<NioSocketChannel>() {@Overrideprotected void initChannel(NioSocketChannel ch) throws Exception {ch.pipeline().addLast(new ChannelInboundHandlerAdapter(){@Overridepublic void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {ByteBuf buffer = msg instanceof ByteBuf ? ((ByteBuf) msg) : null;if(buffer != null){byte[] bytes = new byte[16];ByteBuf len = buffer.readBytes(bytes, 0, buffer.readableBytes());log.info(new String(bytes));}}});}}).bind(8000).sync();}
}
随后运行上述客户端代码,观察到日志输出如下:
通过此输出日志发现借用channelFuture对象的channel方法获取的Channel对象并没有创建。
原因分析:connect方法是异步的,意味着不等待连接建立,方法执行就返回了。因此channelFuture对象中不能【立刻】获取到正确的Channel对象。
解决方法:
- 使用sync方法让异步操作同步等待连接建立。
- 使用回调方法,当connect连接建立后主动调用回调函数。
sync让异步操作同步
获取到channelFuture对象后,调用sync()方法,同步等待连接的结束。sync阻塞住当前线程,直到Nio线程连接建立完毕 。先启动服务端再启动客户端日志输出如下:
此时调用channelFuture.channle()方法获取到Channel,随后可以利用获取到的channel向服务端发送消息。
使用回调函数
调用获取的channelFuture对象的addListener()方法向其添加回调函数。
可以将上述代码用Lambda表达式简化如下:
服务端整体代码实现如下:
package netty.simpleNetty.channel;import io.netty.bootstrap.Bootstrap;
import io.netty.channel.Channel;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelFutureListener;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.nio.NioSocketChannel;
import lombok.extern.slf4j.Slf4j;import java.net.InetAddress;
import java.net.InetSocketAddress;
import java.net.UnknownHostException;/*** 客户端关闭channel异步以及回调函数实现*/
@Slf4j
public class ClientChannelFuture {public static void main(String[] args) throws UnknownHostException, InterruptedException {ChannelFuture channelFuture = new Bootstrap().group(new NioEventLoopGroup(2)).channel(NioSocketChannel.class).handler(new ChannelInitializer<NioSocketChannel>() {@Overrideprotected void initChannel(NioSocketChannel ch) throws Exception {}}).connect(new InetSocketAddress(InetAddress.getLocalHost(), 8000));log.info("Before sync : {}",channelFuture.channel());
// channelFuture.sync();
// log.info("After sync : {}", channelFuture.channel());channelFuture.addListener((ChannelFutureListener)future -> {log.info("After listen : {}", future.channel());});
// channelFuture.addListener(new ChannelFutureListener() {
// @Override
// public void operationComplete(ChannelFuture future) throws Exception {
// log.info("After listen : {}", future.channel());
// }
// });}
}
CloseFuture
上述操作探讨了调用connect方法建立连接后,根据chanelFuture获取到Channel对象的sync以及回调方法实现。在获取到Channel对象后,执行对应的writeAndFlush()方法发送消息后,需要及时关闭channel。
channel关闭操作的实现有sync同步等待以及添加回调的实现方法:代码如下:
package netty.simpleNetty.channel;import io.netty.bootstrap.Bootstrap;
import io.netty.channel.Channel;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelFutureListener;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.nio.NioSocketChannel;
import io.netty.handler.codec.string.StringEncoder;
import io.netty.handler.logging.LogLevel;
import io.netty.handler.logging.LoggingHandler;
import lombok.extern.slf4j.Slf4j;import java.net.InetAddress;
import java.net.InetSocketAddress;
import java.net.UnknownHostException;
import java.util.Scanner;/*** Test CloseFuture的异步以及回调实现*/
@Slf4j
public class ClientCloseFuture {public static void main(String[] args) throws InterruptedException, UnknownHostException {Channel channel = new Bootstrap().group(new NioEventLoopGroup(2)).channel(NioSocketChannel.class).handler(new ChannelInitializer<NioSocketChannel>() {@Overrideprotected void initChannel(NioSocketChannel ch) throws Exception {ch.pipeline().addLast(new LoggingHandler(LogLevel.INFO));ch.pipeline().addLast(new StringEncoder());}}).connect(new InetSocketAddress(InetAddress.getLocalHost(), 8000)).sync().channel();log.info("channel : {}", channel);new Thread(()->{Scanner scanner = new Scanner(System.in);while (true){String line = scanner.nextLine();if("q".equals(line)){// channel的close也是一个异步操作,本线程不会执行close操作,而是交由一个其他线程执行close操作channel.close();break;}channel.writeAndFlush(line);}}).start();ChannelFuture closeFuture = channel.closeFuture();log.info("waiting close...");closeFuture.sync();log.info("closed...");channel.closeFuture().addListener(new ChannelFutureListener() {@Overridepublic void operationComplete(ChannelFuture future) throws Exception {log.info("closed...");}});// channel.closeFuture().addListener((ChannelFutureListener) channelFuture1 ->{
// log.info("closed...");
// });}
}
异步提升
针对上述connect以及close操作,真正执行对应方法是在一个新的线程中执行的,而不是在调用connect的线程中执行连接操作。
为什么不在一个线程中去执行建立连接、去执行关闭 channel【建立连接connet,关闭channel都是在EventLoopGroup中执行的。也即nio中执行的】,那样不是也可以吗?非要用这么复杂的异步方式:比如一个线程发起建立连接,另一个线程去真正建立连接。
因为 netty 异步方式用了多线程、多线程就效率高。(理解错误的)
netty中的异步指的是建立连接connet、读取数据read、关闭channel等不再一个线程中。而是由专门的线程(handler、EventLoop)负责执行专门的操作。
思考下面的场景,4 个医生给人看病,每个病人花费 20 分钟,而且医生看病的过程中是以病人为单位的,一个病人看完了,才能看下一个病人。假设病人源源不断地来,可以计算一下 4 个医生一天工作 8 小时,处理的病人总数是:4 * 8 * 3 = 96。
经研究发现,看病可以细分为四个步骤,经拆分后每个步骤需要 5 分钟,如下
因此可以做如下优化,只有一开始,医生 2、3、4 分别要等待 5、10、15 分钟才能执行工作,但只要后续病人源源不断地来,他们就能够满负荷工作,并且处理病人的能力提高到了 4 * 8 * 12
效率几乎是原来的四倍。
单线程没法异步提高效率,必须配合多线程、多核 cpu 才能发挥异步的优势
异步并没有缩短响应时间,反而有所增加。【这里所说的响应时间的增加主要指的是不同子任务之间等待的耗时。】
合理进行任务拆分,也是利用异步的关键