Netty入门代码示例(基于TCP服务)
Server端
package com.bierce.io.netty.simple;
import io.netty.bootstrap.ServerBootstrap;
import io.netty.buffer.ByteBuf;
import io.netty.buffer.Unpooled;
import io.netty.channel.*;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioServerSocketChannel;
import io.netty.util.CharsetUtil;
public class NettyServer {public static void main(String[] args) throws InterruptedException {//创建BossGroup和WorkerGroup线程池组,均属于自旋状态EventLoopGroup bossGroup = new NioEventLoopGroup(); //负责连接请求处理EventLoopGroup workerGroup = new NioEventLoopGroup(); //进行业务处理try {//创建服务器端启动,通过链式编程配置相关参数ServerBootstrap bootstrap = new ServerBootstrap();bootstrap.group(bossGroup, workerGroup).channel(NioServerSocketChannel.class) //设置为服务端通道.option(ChannelOption.SO_BACKLOG,128) //设置线程队列等待连接的个数.childOption(ChannelOption.SO_KEEPALIVE, true) //设置保持活动连接状态.childHandler(new ChannelInitializer<SocketChannel>() { //匿名创建通道初始对象@Overrideprotected void initChannel(SocketChannel socketChannel) throws Exception {socketChannel.pipeline().addLast(new NettyServerHandler()); //为workerGroup下的NioEventLoop对应管道pipeline设置自定义处理器}});System.out.println("Server is start Successful !!!");ChannelFuture cf = bootstrap.bind(6668).sync(); //绑定指定端口并同步处理cf.channel().closeFuture().sync(); //监听关闭通道方法}finally { //关闭线程池资源bossGroup.shutdownGracefully();workerGroup.shutdownGracefully();}}
}
class NettyServerHandler extends ChannelInboundHandlerAdapter {@Overridepublic void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception { //读取客户端发送的数据//ctx:上下文对象,包含管道pipeline,通道等信息//msg:即客户端发送的数据ByteBuf buf = (ByteBuf) msg; //ByteBuf是Netty提供的缓冲区,性能更高System.out.println("客户端发送过来的msg = " + buf.toString((CharsetUtil.UTF_8)));System.out.println("客户端地址 = " + ctx.channel().remoteAddress());}@Overridepublic void channelReadComplete(ChannelHandlerContext ctx) throws Exception { //读取客户端信息完成后进行的业务处理
// super.channelReadComplete(ctx);ctx.writeAndFlush(Unpooled.copiedBuffer("Hello, Client!",CharsetUtil.UTF_8)); //将数据写到缓存并刷新}@Overridepublic void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {cause.printStackTrace();ctx.close(); //处理异常需要关闭通道}
}
Client端
package com.bierce.io.netty.simple;
import io.netty.bootstrap.Bootstrap;
import io.netty.buffer.ByteBuf;
import io.netty.buffer.Unpooled;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInboundHandlerAdapter;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioSocketChannel;
import io.netty.util.CharsetUtil;
public class NettyClient {public static void main(String[] args) throws InterruptedException {NioEventLoopGroup eventLoopGroup = new NioEventLoopGroup(); //客户端事件循环组try {Bootstrap bootstrap = new Bootstrap(); //客户端启动对象bootstrap.group(eventLoopGroup).channel(NioSocketChannel.class) //客户端通道.handler(new ChannelInitializer<SocketChannel>() {@Overrideprotected void initChannel(SocketChannel socketChannel) throws Exception {socketChannel.pipeline().addLast(new NettyClientHandler()); //添加自定义处理器}});System.out.println("Client Start Successful!!!");ChannelFuture sync = bootstrap.connect("127.0.0.1", 6668).sync();sync.channel().closeFuture().sync(); //监听关闭通道}finally {eventLoopGroup.shutdownGracefully();}}
}
class NettyClientHandler extends ChannelInboundHandlerAdapter {@Overridepublic void channelActive(ChannelHandlerContext ctx) throws Exception { //通道就绪会触发该方法ctx.writeAndFlush(Unpooled.copiedBuffer("Hello, Server!", CharsetUtil.UTF_8)); //将数据写到缓存并刷新}@Overridepublic void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception { //读取服务端返回信息ByteBuf buf = (ByteBuf) msg; //ByteBuf是Netty提供的缓冲区,性能更高System.out.println("服务端发送过来的msg = " + buf.toString((CharsetUtil.UTF_8)));System.out.println("服务端地址 = " + ctx.channel().remoteAddress());}@Overridepublic void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {cause.printStackTrace();ctx.close(); //处理异常需要关闭通道}
}
运行结果
Netty入门代码示例(基于HTTP服务)
package com.bierce.io.netty.http;
import io.netty.bootstrap.ServerBootstrap;
import io.netty.buffer.ByteBuf;
import io.netty.buffer.Unpooled;
import io.netty.channel.*;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioServerSocketChannel;
import io.netty.handler.codec.http.*;
import io.netty.util.CharsetUtil;import java.net.URI;public class TestServer {public static void main(String[] args) throws InterruptedException {NioEventLoopGroup bossGroup = new NioEventLoopGroup();NioEventLoopGroup workerGroup = new NioEventLoopGroup();try {//创建服务器端启动,通过链式编程配置相关参数ServerBootstrap bootstrap = new ServerBootstrap();bootstrap.group(bossGroup, workerGroup).channel(NioServerSocketChannel.class) //设置为服务端通道.childHandler(new TestServerInitializer()); //设置为自定义的初始化System.out.println("Server is start Successful !!!");ChannelFuture cf = bootstrap.bind(9999).sync(); //绑定指定端口并同步处理cf.channel().closeFuture().sync(); //监听关闭通道方法}finally { //关闭线程池资源bossGroup.shutdownGracefully();workerGroup.shutdownGracefully();}}
}
class TestServerInitializer extends ChannelInitializer<SocketChannel> {@Overrideprotected void initChannel(SocketChannel socketChannel) throws Exception {ChannelPipeline pipeline = socketChannel.pipeline();//HttpServerCodec是Netty提供的处理Http的编-解码器 使用io.netty:netty-all:4.1.20Final版本,其他版本不支持会报错pipeline.addLast("MyHttpServerCodec", new HttpServerCodec());//增加自定义的handlerpipeline.addLast("MyTestHttpServerHandler", new TestHttpServerHandler());}
}
class TestHttpServerHandler extends SimpleChannelInboundHandler<HttpObject> {//读取客户端数据@Overrideprotected void channelRead0(ChannelHandlerContext channelHandlerContext, HttpObject httpObject) throws Exception {if (httpObject instanceof HttpRequest){System.out.println("httpObject Type = " + httpObject.getClass());System.out.println("Client Address = " + channelHandlerContext.channel().remoteAddress());//对特定资源进行过滤HttpRequest httpRequest = (HttpRequest)httpObject;URI uri = new URI(httpRequest.getUri());if ("/favicon.ico".equals(uri.getPath())){System.out.println("favicon.ico资源不做响应");return;}//回复浏览器信息(http协议)ByteBuf content = Unpooled.copiedBuffer("Hello, I'm Server", CharsetUtil.UTF_8);FullHttpResponse response = new DefaultFullHttpResponse(HttpVersion.HTTP_1_1, HttpResponseStatus.OK, content);response.headers().set(HttpHeaderNames.CONTENT_TYPE,"text/plain");response.headers().set(HttpHeaderNames.CONTENT_LENGTH,content.readableBytes());channelHandlerContext.writeAndFlush(response);}}
}
运行结果
Netty心跳检测机制
package com.bierce.io.netty.heartbeat;import io.netty.bootstrap.ServerBootstrap;
import io.netty.channel.*;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioServerSocketChannel;
import io.netty.handler.logging.LogLevel;
import io.netty.handler.logging.LoggingHandler;
import io.netty.handler.timeout.IdleStateEvent;
import io.netty.handler.timeout.IdleStateHandler;import java.util.concurrent.TimeUnit;
public class MyServer {public static void main(String[] args) {NioEventLoopGroup bossGroup = new NioEventLoopGroup();NioEventLoopGroup workerGroup = new NioEventLoopGroup();try {ServerBootstrap bootstrap = new ServerBootstrap();bootstrap.group(bossGroup,workerGroup).channel(NioServerSocketChannel.class).handler(new LoggingHandler(LogLevel.INFO)).childHandler(new ChannelInitializer<SocketChannel>() {@Overrideprotected void initChannel(SocketChannel ch) throws Exception {ChannelPipeline pipeline = ch.pipeline();//IdleStateHandler:Netty提供的处理空闲状态的处理器//readerIdleTime:多长时间没有读操作,会发送心跳检测包检测是否连接//writerIdleTime:多长时间没有写操作,会发送心跳检测包检测是否连接//allIdleTime:多长时间没有读写操作,会发送心跳检测包检测是否连接pipeline.addLast(new IdleStateHandler(3,5,7, TimeUnit.SECONDS));//IdleStateHandler触发后,将传递给下一个Handler的userEventTriggered方法去处理//通过自定义的Handler对空闲状态进一步处理pipeline.addLast(new MyServerHandler());}});ChannelFuture sync = bootstrap.bind(9999).sync().channel().closeFuture().sync();} catch (InterruptedException e) {throw new RuntimeException(e);} finally {bossGroup.shutdownGracefully();workerGroup.shutdownGracefully();}}
}
class MyServerHandler extends ChannelInboundHandlerAdapter {@Overridepublic void userEventTriggered(ChannelHandlerContext ctx, Object evt) throws Exception {if (evt instanceof IdleStateEvent){IdleStateEvent IdleStateEvent = (IdleStateEvent) evt;String eventType = null;switch (IdleStateEvent.state()){case READER_IDLE:eventType = "读空闲";break;case WRITER_IDLE:eventType = "写空闲";break;case ALL_IDLE:eventType = "读写空闲";break;}System.out.println(ctx.channel().remoteAddress() + "-已超时,超时类型为: " + eventType );System.out.println("Server will deal with it instantly...");//发生空闲则关闭当前通道ctx.close();}}
}
class NettyClient {public static void main(String[] args) throws InterruptedException {NioEventLoopGroup eventLoopGroup = new NioEventLoopGroup(); //客户端事件循环组try {Bootstrap bootstrap = new Bootstrap(); //客户端启动对象bootstrap.group(eventLoopGroup).channel(NioSocketChannel.class) //客户端通道.handler(new ChannelInitializer<SocketChannel>() {@Overrideprotected void initChannel(SocketChannel socketChannel) throws Exception {socketChannel.pipeline().addLast(new NettyClientHandler()); //添加自定义处理器}});System.out.println("Client Start Successful!!!");ChannelFuture sync = bootstrap.connect("127.0.0.1", 9999).sync();sync.channel().closeFuture().sync(); //监听关闭通道}finally {eventLoopGroup.shutdownGracefully();}}
}
注意: 需要分别调整 readerIdleTime、writerIdleTime、allIdleTime 参数,才会显示对应类型的超时信息。按示例中的 3/5/7 秒配置,读空闲会最先触发,而处理器在首次空闲事件后就会关闭通道,因此其余两种超时不会出现。
Netty入门代码示例(基于WebSocket协议)
服务端
package com.bierce.websocket;import io.netty.bootstrap.ServerBootstrap;
import io.netty.channel.*;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioServerSocketChannel;
import io.netty.handler.codec.http.HttpObjectAggregator;
import io.netty.handler.codec.http.HttpServerCodec;
import io.netty.handler.codec.http.websocketx.TextWebSocketFrame;
import io.netty.handler.codec.http.websocketx.WebSocketServerProtocolHandler;
import io.netty.handler.logging.LogLevel;
import io.netty.handler.logging.LoggingHandler;
import io.netty.handler.stream.ChunkedWriteHandler;import java.time.LocalDateTime;public class MyWebsocketServer {public static void main(String[] args) {NioEventLoopGroup bossGroup = new NioEventLoopGroup();NioEventLoopGroup workerGroup = new NioEventLoopGroup();try {ServerBootstrap bootstrap = new ServerBootstrap();bootstrap.group(bossGroup,workerGroup).channel(NioServerSocketChannel.class).handler(new LoggingHandler(LogLevel.INFO)).childHandler(new ChannelInitializer<SocketChannel>() {@Overrideprotected void initChannel(SocketChannel ch) throws Exception {ChannelPipeline pipeline = ch.pipeline();//基于Http协议,所以需要http解码编码pipeline.addLast(new HttpServerCodec());pipeline.addLast(new ChunkedWriteHandler()); //处理块方式的写操作//http传输过程数据量非常大时会分段,而HttpObjectAggregator可以将多个分段聚合pipeline.addLast(new HttpObjectAggregator(8192));//webSocket采用帧方式传输数据//WebSocketServerProtocolHandler作用是将http协议升级为ws协议,且保持长连接pipeline.addLast(new WebSocketServerProtocolHandler("/hello"));pipeline.addLast(new MyWebsocketServerHandler());}});ChannelFuture sync = bootstrap.bind(9999).sync().channel().closeFuture().sync();} catch (InterruptedException e) {throw new RuntimeException(e);} finally {bossGroup.shutdownGracefully();workerGroup.shutdownGracefully();}}
}
class MyWebsocketServerHandler extends SimpleChannelInboundHandler<TextWebSocketFrame> {@Overrideprotected void channelRead0(ChannelHandlerContext ctx, TextWebSocketFrame msg) throws Exception {System.out.println("Server receive the Info: " + msg.text());ctx.channel().writeAndFlush(new TextWebSocketFrame("Server time " + LocalDateTime.now() + " --- " + msg.text()));}@Overridepublic void handlerAdded(ChannelHandlerContext ctx) throws Exception {System.out.println("有客户端连接成功 --" + ctx.channel().id().asLongText()); //asLongText唯一值}@Overridepublic void handlerRemoved(ChannelHandlerContext ctx) throws Exception {System.out.println("有客户端已经离开 --" + ctx.channel().id().asLongText()); //asLongText唯一值}@Overridepublic void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {System.out.println("异常Info:" + cause.getMessage());ctx.close();}
}
客户端(浏览器)
<!DOCTYPE html>
<html lang="en">
<head>
    <meta charset="UTF-8">
    <title>Websocket</title>
</head>
<body>
<script>
    // Browser-side WebSocket client for ws://localhost:9999/hello.
    var socket;

    if (window.WebSocket) {
        socket = new WebSocket("ws://localhost:9999/hello");

        // Counterpart of channelRead0 on the server: append each server message
        // to the response textarea.
        socket.onmessage = function (ev) {
            var output = document.getElementById("responseText");
            output.value += "\n" + ev.data;
        };

        // Connection opened.
        socket.onopen = function (ev) {
            var output = document.getElementById("responseText");
            output.value = "开启连接成功!";
        };

        // Connection closed.
        socket.onclose = function (ev) {
            var output = document.getElementById("responseText");
            output.value += "\n" + "连接关闭成功!";
        };
    }

    // Sends a message to the server; does nothing when no socket was created.
    function send(msg) {
        if (!window.socket) {
            return;
        }
        if (socket.readyState != WebSocket.OPEN) {
            alert("socket未连接");
            return;
        }
        socket.send(msg);
    }
</script>
<form onsubmit="return false">
    <textarea name="message" style="height:300px;width:300px"></textarea>
    <input type="button" value="Send" onclick="send(this.form.message.value)">
    <textarea id="responseText" style="height:300px;width:300px"></textarea>
    <input type="button" value="Clear" onclick="document.getElementById('responseText').value=''">
</form>
</body>
</html>
效果图