1、首先引入依赖
<!-- Maven dependency on io.netty:netty-all, version 4.1.6.Final, used by the server and client examples below. -->
<dependency><groupId>io.netty</groupId><artifactId>netty-all</artifactId><version>4.1.6.Final</version>
</dependency>
2、创建server层代码
2.1、编写服务端代码
/**
 * Starts the demo server on port 8099 in a dedicated thread.
 *
 * <p>The thread binds the server socket and then blocks until the server channel is closed.
 * Both event loop groups are shut down when the thread leaves the try block, whether it
 * finished normally or failed.
 *
 * @param args command-line arguments; not used
 */
public static void main(String[] args) {
    new Thread(() -> {
        NioEventLoopGroup bossGroup = new NioEventLoopGroup();
        NioEventLoopGroup workGroup = new NioEventLoopGroup();
        try {
            ServerBootstrap bootstrap = new ServerBootstrap();
            bootstrap.group(bossGroup, workGroup)
                    .channel(NioServerSocketChannel.class)
                    .option(ChannelOption.SO_BACKLOG, 128)
                    .childOption(ChannelOption.SO_KEEPALIVE, true)
                    .childHandler(new ServerChannelInitializer());
            // Block until the bind has completed; a bind failure is rethrown here.
            ChannelFuture channelFuture = bootstrap.bind(8099).sync();
            // Block until the server channel is closed.
            channelFuture.channel().closeFuture().sync();
        } catch (InterruptedException e) {
            // Restore the interrupt flag so code further up the stack can observe it.
            Thread.currentThread().interrupt();
            System.err.println("server thread interrupted: " + e);
        } catch (Exception e) {
            // Previously swallowed silently, which hid failures such as "address already in use".
            System.err.println("server failed: " + e);
        } finally {
            bossGroup.shutdownGracefully();
            workGroup.shutdownGracefully();
        }
    }).start();
}
2.2、创建childHandler处理接收handler
public static class ServerChannelInitializer extends ChannelInitializer<SocketChannel> {@Overrideprotected void initChannel(SocketChannel ch) throws Exception {ChannelPipeline pipeline = ch.pipeline();pipeline.addLast(new IdleStateHandler(10,10,10, TimeUnit.SECONDS));pipeline.addLast(new ReadHandler());pipeline.addLast(new WriteHandler());pipeline.addLast(new ServerSendHandler());}
}
2.3、创建公共的读写处理器
public static class ReadHandler extends SimpleChannelInboundHandler<ByteBuf>{@Overrideprotected void channelRead0(ChannelHandlerContext ctx, ByteBuf msg) throws Exception {System.out.println("from : "+ctx.channel().remoteAddress()+" data : "+ ByteBufUtil.hexDump(msg));msg.retain();ctx.fireChannelRead(msg);}
}public static class WriteHandler extends ChannelOutboundHandlerAdapter{@Overridepublic void write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise) throws Exception {if (msg instanceof ByteBuf){ByteBuf buf = (ByteBuf)msg;System.out.println("to : "+ctx.channel().remoteAddress()+" data : "+ByteBufUtil.hexDump(buf));super.write(ctx, msg, promise);}}
}
2.4、创建服务端的简单业务处理器
public static class ServerSendHandler extends SimpleChannelInboundHandler<ByteBuf> {@Overrideprotected void channelRead0(ChannelHandlerContext ctx, ByteBuf msg) throws Exception {private static AtomicInteger atomicInteger = new AtomicInteger(1);//这里简单模拟消息发送int a = msg.readInt();long b = msg.readLong();System.out.println("server ---- a:"+a +" b:"+b);ByteBuf buffer = Unpooled.buffer();buffer.writeInt(atomicInteger.getAndIncrement());buffer.writeLong(170);ctx.writeAndFlush(buffer);}@Overridepublic void channelInactive(ChannelHandlerContext ctx) throws Exception {super.channelInactive(ctx);System.out.println("server disconnect "+ctx.channel().remoteAddress());ctx.channel().disconnect();ctx.fireChannelInactive();}@Overridepublic void userEventTriggered(ChannelHandlerContext ctx, Object evt) throws Exception {if (IdleStateEvent.class.isAssignableFrom(evt.getClass())) {IdleState state = ((IdleStateEvent) evt).state();if (state == IdleState.READER_IDLE) {// 读空闲超时,断开连接System.out.println("Channel read timeout, remote address:" + ctx.channel().remoteAddress().toString());ctx.close();} else if (state == IdleState.WRITER_IDLE) {ByteBuf buffer = Unpooled.buffer();buffer.writeInt(101);buffer.writeLong(171);ctx.writeAndFlush(buffer);System.out.println("Channel write timeout, remote address:" + ctx.channel().remoteAddress().toString());}}super.userEventTriggered(ctx, evt);}}
3、创建客户端代码连接
3.1、编写客户端代码
/**
 * Starts the demo client in a dedicated thread using a fresh event loop group.
 *
 * @param args command-line arguments; not used
 */
public static void main(String[] args) {
    new Thread(() -> client(new NioEventLoopGroup())).start();
}

/**
 * Connects to {@code 127.0.0.1:8099} using the given event loop group and blocks until the
 * connect attempt has completed.
 *
 * <p>The outcome of the attempt is reported by a {@link ClientChannelListener}. The group is
 * not shut down here.
 *
 * @param group the event loop group that will drive the new connection
 */
public static void client(NioEventLoopGroup group) {
    Bootstrap bootstrap = new Bootstrap();
    bootstrap.group(group)
            .channel(NioSocketChannel.class)
            .option(ChannelOption.SO_KEEPALIVE, true)
            .handler(new ClientChannelInitializer());
    try {
        bootstrap.connect("127.0.0.1", 8099)
                .addListener(new ClientChannelListener())
                .sync();
    } catch (InterruptedException e) {
        // Restore the interrupt flag instead of discarding it.
        Thread.currentThread().interrupt();
        System.err.println("interrupted while connecting to 127.0.0.1:8099: " + e);
    }
}
3.2、创建handler处理接收handler
public static class ClientChannelInitializer extends ChannelInitializer<SocketChannel>{@Overrideprotected void initChannel(SocketChannel ch) throws Exception {ChannelPipeline pipeline = ch.pipeline();pipeline.addLast(new IdleStateHandler(10,10,10, TimeUnit.SECONDS));pipeline.addLast(new ReadHandler());pipeline.addLast(new WriteHandler());pipeline.addLast(new ClientActHandler());}
}
3.3、创建客户端的简单业务处理器实现断线重连
public static class ClientActHandler extends SimpleChannelInboundHandler<ByteBuf>{@Overridepublic void channelActive(ChannelHandlerContext ctx) throws Exception {super.channelActive(ctx);ByteBuf buffer = Unpooled.buffer();buffer.writeInt(99);buffer.writeLong(180);ctx.writeAndFlush(buffer);}@Overrideprotected void channelRead0(ChannelHandlerContext ctx, ByteBuf msg) throws Exception {int a = msg.readInt();long b = msg.readLong();System.out.println("client : "+a+" "+b);}@Overridepublic void channelInactive(ChannelHandlerContext ctx) throws Exception {super.channelInactive(ctx);System.out.println("重新建立新的连接。。。。。");client(new NioEventLoopGroup());}
}
3.4、客户端监听器
public static class ClientChannelListener implements ChannelFutureListener{@Overridepublic void operationComplete(ChannelFuture future) throws Exception {boolean success = future.isSuccess();if (success){System.out.println("connect success ");}else {System.out.println("connect error");}}}
4、结果输出
可以看到因为读超时服务端断开连接,然后客户端又重新连接