Netty实现自动回复步骤
主要分成五步
1、创建EventLoopGroup实现循环组 管理EventLoop线程
2、创建Bootstrap ,Bootstrap对于服务端而言,先后设置其中的线程组group、通道channel、处理器handler、客户端通道对应的处理器childHandler
3、自定义服务器接收和响应客户端处理器和初始化器
4、返回Future对象,通过Future来获取操作的结果,比如:将关闭的通道也设置成异步的
5、优雅的关闭事件循环组
这里我说明一下我觉得很重要 ChannelHandlerContext通道处理器上下文 、ChannelPipeline 管道、Hander处理器的关系,下面的一行代码可以说明
pipeline.addLast(new XXXHandler());其实pipeline添加的是ChannelHandlerContext 包裹的Hander 就像这样 new ChannelHandlerContext (new XXXHandler()) ,这个对象添加到了pipeline
1、服务端代码
创建MyChatServer 类作为服务端
public class MyChatServer {public static void main(String[] args) {// 创建Reactor// 用来管理channel 监听事件 ,是无限循环的事件组(线程池) 可以设置线程数,cup核 * numEventLoopGroup bossLoopGroup = new NioEventLoopGroup(1);EventLoopGroup workerLoopGroup = new NioEventLoopGroup();// 服务端的启动对象ServerBootstrap serverBootstrap = new ServerBootstrap();// 设置相关参数 这是一个链式编程serverBootstrap.group(bossLoopGroup,workerLoopGroup)// 声明通道类型.channel(NioServerSocketChannel.class)// 设置处理器 我这里设置了netty提供的Handler 处理器.handler(new LoggingHandler(LogLevel.INFO))// 当连接被阻塞 ,SO_BACKLOG 阻塞队列的长度.option(ChannelOption.SO_BACKLOG,128)// 设置连接保持活跃的状态.childOption(ChannelOption.SO_KEEPALIVE,true).childHandler(new MyChatServerInitializer());System.out.println("服务端初始化完成");// 启动需要设置端口 还需要设置是异步启动try {// 设置异步的futureChannelFuture future = serverBootstrap.bind(8899).sync();// 将关闭的通道也设置成异步的// 阻塞finally 中的代码future.channel().closeFuture().sync();} catch (InterruptedException e) {e.printStackTrace();}finally {// 优雅关闭bossLoopGroup.shutdownGracefully();workerLoopGroup.shutdownGracefully();}}
}
1.1 自定义初始化器
下面是自定义初始化器的代码,自定义handler的设置,需要先有通道初始化器ChannelInitializer,实现其中的通道初始化方法,具体逻辑为 获取通道中的管道,然后加入handler
public class MyChatServerInitializer extends ChannelInitializer<SocketChannel> {@Overrideprotected void initChannel(SocketChannel socketChannel) throws Exception {// 创建管道ChannelPipeline pipeline = socketChannel.pipeline();// 先设置解码器 后设置编码器// 基于lineDelimiter 分隔符的一种解码器pipeline.addLast(new DelimiterBasedFrameDecoder(4096, Delimiters.lineDelimiter()));pipeline.addLast(new StringDecoder(CharsetUtil.UTF_8));pipeline.addLast(new StringEncoder(CharsetUtil.UTF_8));// 设置处理器 pipeline 管理的是由handlerContext包裹的handle add[handlerContext(hander)]// pipeline 是双向链表 headContext头节点 tailContext尾节点pipeline.addLast(new MyChatServerHandler());}
}
1.2 自定义handler处理器
handler的逻辑如下
a) 继承SimpleChannelInboundHandler,还可以设置泛型对应需要处理的msg的类型是除继承适配器之外自定义处理器的另一种方法。
b) 重写其中的方法,channelActive 、channelRead、channelReadComplete,分别对应于通道创建、读事件发生、读事件完成三个时间点。
c) 方法的参数有一个 ChannelHandlerContext ,是处理器的上下文,除了获取通道和管道外,可以调用writeAndFlush() 直接写入数据。
/*** 自定义处理器的另一种方法* SimpleChannelInboundHandler 是 ChannelInboundHandlerAdapter的子类*/
public class MyChatServerHandler extends SimpleChannelInboundHandler {// 当多个通道传入handler , 使用通道组的管理方法// GlobalEventExecutor 全局事件执行器//INSTANCE 代表的是单例private static ChannelGroup channelGroup = new DefaultChannelGroup(GlobalEventExecutor.INSTANCE);/*** channelRead0 处理读取数据逻辑的方法** @param ctx 通道处理器上下文 它是连接pipeline 和 handler 中间角色* @param msg* @throws Exception** 读取数据, 并广播给其它客户端* 步骤:* 接收自身的Channel* 这个方法可以帮我们自动释放ByteBuf的内存空间*/protected void channelRead0(ChannelHandlerContext ctx, Object msg) throws Exception {System.out.println("[客户端]"+msg);Channel selfChannel = ctx.channel();//提前处理一些消息
// ByteBuf buf = (ByteBuf)msg;// 迭代channel通道组Iterator<Channel> it = channelGroup.iterator();while (it.hasNext()){Channel ch = it.next();if(selfChannel != ch){ // 如果这个通道不是自身通道 则ch.writeAndFlush("[服务器] - "+selfChannel.remoteAddress()+"发送的消息:"+msg);continue;}// 如果是自身的通道 【客户端发给服务端,服务端回应客户端的消息】String answer ;// 如果是发送消息的通道if (((String)msg).length() == 0 ){answer = "期待你说点什么\r\n";}else {answer = "你说的是这句话嘛:" + msg + "?\r\n";}ch.writeAndFlush(answer);}}/*** 连接成功, 此时通道是活跃的时候触发* @param ctx* @throws Exception*/@Overridepublic void channelActive(ChannelHandlerContext ctx) throws Exception {LocalDate today = LocalDate.now();String dateStr = today.toString(); // 默认格式为 "yyyy-MM-dd"ctx.writeAndFlush("Welcome to server-- now :"+dateStr+"\r\n");}/*** 通道不活跃 ,用于处理用户下线的逻辑* @param ctx* @throws Exception*/@Overridepublic void channelInactive(ChannelHandlerContext ctx) throws Exception {System.out.println(ctx.channel().remoteAddress()+"下线了\r\n");}/**** @param ctx 通道处理器上下文* @throws Exception* 连接刚刚建立时 ,第一个被执行的方法,*/@Overridepublic void handlerAdded(ChannelHandlerContext ctx) throws Exception {System.out.println("[服务端地址]:"+ctx.channel().remoteAddress()+"连接成功\r\n");// 添加到通道组中管理channelGroup.add(ctx.channel());}/**** @param ctx 通道处理器上下文* @throws Exception* 当连接断开 最后执行的方法* 连接断开时 , channel 会自动从 通道组中移除*/@Overridepublic void handlerRemoved(ChannelHandlerContext ctx) throws Exception {System.out.println("[服务端地址]:"+ctx.channel().remoteAddress()+"断开连接\r\n");}/*** 通用异常处理类* @param ctx 通道处理器上下文* @param cause* @throws Exception*/@Overridepublic void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {// 关闭ctx.close();}
}
2、客户端代码
代码和服务端代码变化不大。直接上代码看看
不同的是,客户端事件循环组需要一个,同样也需要自定义初始化器和自定义处理器
public class MyChatClient {public static void main(String[] args) {// 客户端事件循环组 只需要一个EventLoopGroup group = new NioEventLoopGroup();// 客户端启动器Bootstrap bootstrap = new Bootstrap();bootstrap.group(group).channel(NioSocketChannel.class)// 自定义通道初始化器.handler(new MyChatClientInitializer());System.out.println("客户端初始化完成了");try {// 设置异步的future 需要连接的服务端参数ChannelFuture future = bootstrap.connect("127.0.0.1", 8899).sync();// 将关闭的通道也设置成异步的// 阻塞finally 中的代码
// future.channel().closeFuture().sync();// 因为 本次业务是可以多次发送消息的业务, 所以上面的阻塞关闭 需要注释掉// 通过键盘输入发送的数据 BufferedReader 来实现BufferedReader br = new BufferedReader(new InputStreamReader(System.in));for (;;) {String msg = br.readLine();// 发送到通道之中future.channel().writeAndFlush(msg+"\r\n");}} catch (Exception e) {e.printStackTrace();}finally {group.shutdownGracefully();}}
}
2.1 自定义初始化器
这个部分引入了Netty 编解码器
DelimiterBasedFrameDecoder 分隔符的一种解码器
StringEncoder / StringDecoder 对字符串处理
ObjectEncoder / ObjectDecoder 对java对象处理
最后还需要一个自定义的处理器MyChatClientHandler 后面有代码
public class MyChatClientInitializer extends ChannelInitializer<SocketChannel> {@Overrideprotected void initChannel(SocketChannel socketChannel) throws Exception {ChannelPipeline pipeline = socketChannel.pipeline();// 先设置解码器 后设置编码器// 基于lineDelimiter 分隔符的一种解码器pipeline.addLast(new DelimiterBasedFrameDecoder(4096, Delimiters.lineDelimiter()));pipeline.addLast(new StringDecoder(CharsetUtil.UTF_8));pipeline.addLast(new StringEncoder(CharsetUtil.UTF_8));// 添加处理器pipeline.addLast(new MyChatClientHandler());}
}
2.2 自定义处理器
/*** SimpleChannelInboundHandler* 客户端需要添加 msg 泛型*/
public class MyChatClientHandler extends SimpleChannelInboundHandler<String> {/**** @param ctx* @param msg* @throws Exception*/@Overrideprotected void channelRead0(ChannelHandlerContext ctx, String msg) throws Exception {// 接收服务的消息System.out.println("[服务端]:"+msg);}
}
最后展示
业务:自动回应客户端是否说的是msg ,如果直接回车回应“期待你说点什么”
结语:希望可以给@你带来一点点帮助,资料来源于网上学习视频总结。