Netty组件
1. Bootstrap, ServerBootstrap
Netty 中 Bootstrap 类是客户端程序的启动引导类,ServerBootstrap 是服务端启动引导类。
2. NioEventLoop, NioEventLoopGroup
NioEventLoop 中维护了一个线程和任务队列,支持异步提交执行任务,线程启动时会调用NioEventLoop 的 run 方法,执行 I/O 任务和非 I/O 任务:
- I/O 任务,即 selectionKey 中 ready 的事件,如 accept、connect、read、write 等,由 processSelectedKeys 方法触发。
- 非 IO 任务,添加到 taskQueue 中的任务,如 register0、bind0 等任务,由 runAllTasks 方法触发。
NioEventLoopGroup,主要管理 EventLoop 的生命周期,可以理解为一个线程池,内部维护了一组线程,每个线程(NioEventLoop)负责处理多个 Channel 上的事件,而一个 Channel 只对应于一个线程。
3. Channel
Netty 网络通信的组件,能够用于执行网络 I/O 操作. 常用Channel类型:
- NioSocketChannel,异步的客户端 TCP Socket 连接。
- NioServerSocketChannel,异步的服务器端 TCP Socket 连接。
- NioDatagramChannel,异步的 UDP 连接。
- NioSctpChannel,异步的客户端 Sctp 连接。
- NioSctpServerChannel,异步的 Sctp 服务器端连接。
4. ChannelHandler
ChannelHandler 是一个接口,处理 I/O 事件或拦截 I/O 操作,并将其转发到其 ChannelPipeline(业务处理链)中的下一个处理程序。
可以继承它的子类:
- ChannelInboundHandler 用于处理入站 I/O 事件。
- ChannelOutboundHandler 用于处理出站 I/O 操作。
- ChannelInboundHandlerAdapter 用于处理入站 I/O 事件。
- ChannelOutboundHandlerAdapter 用于处理出站 I/O 操作。
5. ChannelPipline
保存 ChannelHandler 的 List,用于处理或拦截 Channel 的入站事件和出站操作。
ChannelPipeline 实现了一种高级形式的拦截过滤器模式,使用户可以完全控制事件的处理方式,以及 Channel 中各个的 ChannelHandler 如何相互交互。
一个 Channel 包含了一个 ChannelPipeline,而 ChannelPipeline 中又维护了一个由 ChannelHandlerContext 组成的双向链表,并且每个 ChannelHandlerContext 中又关联着一个 ChannelHandler。
客户端与服务端通信
Netty服务端
public class NettyServer {public static void main(String[] args) throws Exception {//创建两个线程组bossGroup和workerGroup, 含有的子线程NioEventLoop的个数默认为cpu核数的两倍// bossGroup只是处理连接请求 ,真正的和客户端业务处理,会交给workerGroup完成EventLoopGroup bossGroup = new NioEventLoopGroup(1);EventLoopGroup workerGroup = new NioEventLoopGroup();try {//创建服务器端的启动对象ServerBootstrap bootstrap = new ServerBootstrap();//使用链式编程来配置参数bootstrap.group(bossGroup, workerGroup) //设置两个线程组.channel(NioServerSocketChannel.class) //使用NioServerSocketChannel作为服务器的通道实现// 初始化服务器连接队列大小,服务端处理客户端连接请求是顺序处理的,所以同一时间只能处理一个客户端连接。// 多个客户端同时来的时候,服务端将不能处理的客户端连接请求放在队列中等待处理.option(ChannelOption.SO_BACKLOG, 1024).childHandler(new ChannelInitializer<SocketChannel>() {//创建通道初始化对象,设置初始化参数@Overrideprotected void initChannel(SocketChannel ch) throws Exception {//对workerGroup的SocketChannel设置处理器ch.pipeline().addLast(new NettyServerHandler());}});System.out.println("netty server start。。");//绑定一个端口并且同步, 生成了一个ChannelFuture异步对象,通过isDone()等方法可以判断异步事件的执行情况//启动服务器(并绑定端口),bind是异步操作,sync方法是等待异步操作执行完毕ChannelFuture cf = bootstrap.bind(9000).sync();//对通道关闭进行监听,closeFuture是异步操作,监听通道关闭// 通过sync方法同步等待通道关闭处理完毕,这里会阻塞等待通道关闭完成cf.channel().closeFuture().sync();} finally {bossGroup.shutdownGracefully();workerGroup.shutdownGracefully();}}
}
public class NettyServerHandler extends ChannelInboundHandlerAdapter {/*** 读取客户端发送的数据** @param ctx 上下文对象, 含有通道channel,管道pipeline* @param msg 就是客户端发送的数据* @throws Exception*/@Overridepublic void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {//将 msg 转成一个 ByteBuf,类似NIO 的 ByteBufferByteBuf buf = (ByteBuf) msg;System.out.println("客户端发送消息是:" + buf.toString(CharsetUtil.UTF_8));}/*** 数据读取完毕处理方法** @param ctx* @throws Exception*/@Overridepublic void channelReadComplete(ChannelHandlerContext ctx) throws Exception {ByteBuf buf = Unpooled.copiedBuffer("HelloClient", CharsetUtil.UTF_8);ctx.writeAndFlush(buf);}/*** 处理异常, 一般是需要关闭通道** @param ctx* @param cause* @throws Exception*/@Overridepublic void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {ctx.close();}
}
客户端代码:
public class NettyClient {public static void main(String[] args) throws Exception {//客户端需要一个事件循环组EventLoopGroup group = new NioEventLoopGroup();try {//创建客户端启动对象//注意客户端使用的不是 ServerBootstrap 而是 BootstrapBootstrap bootstrap = new Bootstrap();//设置相关参数bootstrap.group(group) //设置线程组.channel(NioSocketChannel.class) // 使用 NioSocketChannel 作为客户端的通道实现.handler(new ChannelInitializer<SocketChannel>() {@Overrideprotected void initChannel(SocketChannel channel) throws Exception {//加入处理器channel.pipeline().addLast(new NettyClientHandler());}});System.out.println("netty client start");//启动客户端去连接服务器端ChannelFuture channelFuture = bootstrap.connect("127.0.0.1", 9000).sync();//对关闭通道进行监听channelFuture.channel().closeFuture().sync();} finally {group.shutdownGracefully();}}
}
public class NettyClientHandler extends ChannelInboundHandlerAdapter {/*** 当客户端连接服务器完成就会触发该方法** @param ctx*/@Overridepublic void channelActive(ChannelHandlerContext ctx) {ByteBuf buf = Unpooled.copiedBuffer("HelloServer", CharsetUtil.UTF_8);ctx.writeAndFlush(buf);}//当通道有读取事件时会触发,即服务端发送数据给客户端@Overridepublic void channelRead(ChannelHandlerContext ctx, Object msg) {ByteBuf buf = (ByteBuf) msg;System.out.println("收到服务端的消息:" + buf.toString(CharsetUtil.UTF_8));System.out.println("服务端的地址: " + ctx.channel().remoteAddress());}@Overridepublic void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) {// 发生异常时, 关闭连接cause.printStackTrace();ctx.close();}
}
Netty优化
内置通信传输模式(Channel):
- NIO
- Epoll
- OIO
- Local
- Embedded
ChannelOption参数
共享Handler
@Sharable 注解
ByteBUF
TCP粘包/半包
触发条件
- 粘包: 密集发送小报文
- 半包: 发送大报文
解决方案
- 协商发送固定大小
- 发送的报文前拼接上报文大小
编解码
http web
序列化选择
应考虑因素:
- 是否跨语言
- 空间/时间
- 编解码速度
- 可读性
# select, poll, epoll区别
实现web客户端
1. 自动选择SSL服务端AutoSSLHttpServer
public class AutoSSLHttpServer {public static final int port = 6790; //设置服务端端口private static EventLoopGroup group = new NioEventLoopGroup(); // 通过nio方式来接收连接和处理连接private static ServerBootstrap b = new ServerBootstrap();/*** Netty创建全部都是实现自AbstractBootstrap。* 客户端的是Bootstrap,服务端的则是ServerBootstrap。**/public static void main(String[] args) throws Exception {SelfSignedCertificate ssc = new SelfSignedCertificate();final SslContext sslCtx = SslContextBuilder.forServer(ssc.certificate(),ssc.privateKey()).build();try {b.group(group);b.channel(NioServerSocketChannel.class);b.childHandler(new AutoSSLServerHandlerInit(sslCtx)); //设置过滤器// 服务器绑定端口监听ChannelFuture f = b.bind(port).sync();System.out.println("服务端启动成功,端口是:"+port);// 监听服务器关闭监听f.channel().closeFuture().sync();} finally {group.shutdownGracefully(); //关闭EventLoopGroup,释放掉所有资源包括创建的线程}}
}
2. AutoSSLServerHandlerInit
public class AutoSSLServerHandlerInit extends ChannelInitializer<SocketChannel> {private final SslContext sslCtx;public AutoSSLServerHandlerInit(SslContext sslCtx) {this.sslCtx = sslCtx;}@Overrideprotected void initChannel(SocketChannel ch) throws Exception {ChannelPipeline ph = ch.pipeline();/*根据客户端的访问来决定是否启用SSL*/ph.addLast(new OptionalSslHandler(sslCtx));/*把应答报文 编码*/ph.addLast("encoder",new HttpResponseEncoder());/*把请求报文 解码*/ph.addLast("decoder",new HttpRequestDecoder());/*聚合http为一个完整的报文*/ph.addLast("aggregator",new HttpObjectAggregator(10*1024*1024));/*把应答报文 压缩,非必要*/ph.addLast("compressor",new HttpContentCompressor());// 真正处理业务Handlerph.addLast(new BusiHandler());}
}
3. 处理业务Handler-BusiHandler
public class BusiHandler extends ChannelInboundHandlerAdapter {/*** 发送的返回值* @param ctx 返回* @param context 消息* @param status 状态*/private void send(ChannelHandlerContext ctx, String context,HttpResponseStatus status) {FullHttpResponse response = new DefaultFullHttpResponse(HttpVersion.HTTP_1_1,status,Unpooled.copiedBuffer(context,CharsetUtil.UTF_8));response.headers().set(HttpHeaderNames.CONTENT_TYPE,"text/plain;charset=UTF-8");ctx.writeAndFlush(response).addListener(ChannelFutureListener.CLOSE);}@Overridepublic void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {String result="";FullHttpRequest httpRequest = (FullHttpRequest)msg;System.out.println(httpRequest.headers());try{//获取路径String path=httpRequest.uri();//获取bodyString body = httpRequest.content().toString(CharsetUtil.UTF_8);//获取请求方法HttpMethod method=httpRequest.method();System.out.println("接收到:"+method+" 请求");//如果不是这个路径,就直接返回错误if(!"/testNt".equalsIgnoreCase(path)){result="非法请求!"+path;send(ctx,result,HttpResponseStatus.BAD_REQUEST);return;}//如果是GET请求if(HttpMethod.GET.equals(method)){//接受到的消息,做业务逻辑处理...System.out.println("body:"+body);result="GET请求,应答:\n"+RespConstant.getNews();send(ctx,result,HttpResponseStatus.OK);return;}//如果是其他类型请求,如postif(HttpMethod.POST.equals(method)){//接受到的消息,做业务逻辑处理...//....return;}}catch(Exception e){System.out.println("处理请求失败!");e.printStackTrace();}finally{//释放请求httpRequest.release();}}/** 建立连接时,返回消息*/@Overridepublic void channelActive(ChannelHandlerContext ctx) throws Exception {System.out.println("连接的客户端地址:" + ctx.channel().remoteAddress());}
}
4. 浏览器访问效果
实现一个聊天室
==