Netty 介绍、使用场景及案例

Netty 介绍、使用场景及案例

在这里插入图片描述

1、Netty 介绍

https://github.com/netty/netty

Netty是一个高性能、异步事件驱动的网络应用程序框架,用于快速开发可扩展的网络服务器和客户端。它是一个开源项目,最初由JBoss公司开发,现在由社区维护。Netty的设计和实现以处理高并发、低延迟、可靠性和灵活性为目标,因此非常适合构建各种网络应用,包括网络服务器、代理、聊天应用、在线游戏、实时通信和分布式系统等。

以下是一些Netty的主要特点和优势:

  1. 异步和事件驱动:Netty采用异步非阻塞的IO模型,允许处理大量并发连接而不会阻塞应用程序线程。它使用事件驱动的方式来处理网络事件,这使得编写高效的网络应用程序变得更容易。

  2. 高性能:Netty在性能方面表现出色,其底层的NIO实现充分利用了现代操作系统的异步IO特性,能够处理大量并发连接和数据传输,同时保持低延迟。

  3. 可扩展性:Netty提供了灵活的扩展机制,可以轻松地定制和扩展功能,以满足不同应用程序的需求。它支持各种协议和编解码器,如HTTP、WebSocket、TLS/SSL等。

  4. 安全性:Netty内置了对TLS/SSL的支持,可以加密网络连接以确保数据的安全传输。

  5. 多协议支持:Netty支持多种网络协议,包括TCP、UDP、HTTP、WebSocket等,使其适用于各种应用场景。

  6. 大型社区和活跃开发:Netty有一个庞大的开发社区,不断更新和改进框架,以适应新的技术和需求。

  7. 文档丰富:Netty提供了详细的文档和示例代码,使开发者可以快速上手并学习如何使用框架。

总之,Netty是一个强大的网络应用程序框架,适用于构建高性能、可扩展和可靠的网络应用。它在许多大型互联网公司和开源项目中被广泛使用,并且在处理网络通信方面具有广泛的应用。如果您需要开发网络应用程序或服务器,特别是需要处理大量并发连接和低延迟的场景,Netty是一个值得考虑的选择。

Netty 概述:
Netty是一个基于Java的高性能网络应用框架,它提供了简单而强大的网络编程接口,使得开发者可以轻松地构建各种类型的网络应用程序,包括服务器和客户端。Netty是一个开源项目,广泛用于构建可伸缩性、高性能、可维护性好的网络服务器和客户端应用。它提供了一组易于使用的API,用于处理底层的网络通信,包括TCP、UDP、HTTP等协议,以及各种编解码、数据传输和其他网络相关的功能。Netty的设计理念是简单而灵活,同时具备高性能和可扩展性。

2、原生 NIO 存在的问题

原生Java NIO(New I/O)提供了一种非阻塞I/O的编程方式,相对于传统的阻塞I/O(BIO)来说,它在某些情况下可以提供更好的性能,但也存在一些问题和挑战:

  1. 复杂性:NIO编程模型相对复杂,需要程序员处理底层的缓冲区、通道、选择器等概念,编写代码较为繁琐。

  2. 可读性:NIO代码通常相对难以理解和维护,因为需要处理很多底层细节,使得代码可读性较差。

  3. 错误处理:NIO中的错误处理相对复杂,需要处理各种异常和错误状态,容易引入bug。

  4. 编程难度:NIO编程难度较大,需要处理事件驱动的异步编程模型,容易出现并发问题。

  5. 性能限制:虽然NIO可以提供非阻塞I/O,但在高并发和高负载情况下,仍然存在性能瓶颈,需要合理的线程管理和资源调度。

Netty作为一个网络编程框架,通过对原生NIO的封装和优化,解决了上述问题,提供了更加简洁、高效、可维护的网络编程接口,使得开发者能够更容易地构建高性能的网络应用。它的异步、事件驱动、高性能和可扩展性等特点使得它在网络编程领域得到广泛应用。

3、Netty 线程模型

线程模型基本介绍:

  1. 传统阻塞I/O服务模型: 传统的阻塞I/O服务模型是最简单的,每个连接都需要一个独立的线程来处理,这导致了线程数量的快速增长,对系统资源的浪费。

  2. Reactor模式: Reactor模式是一种基于事件驱动的模型,它通过一个事件循环来处理所有的I/O操作。它通常包括一个主线程(Reactor)和多个工作线程,主线程负责接收连接,工作线程负责处理I/O操作。

不同的线程模式对程序性能的影响:

不同的线程模式对程序性能有显著的影响。传统阻塞I/O服务模型通常会导致资源浪费和性能下降,因为每个连接都需要一个线程,线程的创建和销毁开销很大。

Reactor模式通过事件驱动的方式可以显著提高性能,特别是在高并发情况下。但是,Reactor模式的性能仍然受限于单个主线程的处理能力。

Netty线程模型:

Netty主要基于主从Reactor多线程模型做了一定的改进,其中主从Reactor多线程模型包括多个Reactor。以下是Netty线程模型的一些特点:

  1. 单Reactor单线程: Netty可以采用单Reactor单线程模型,这是一种简单的模型,适用于处理低并发的情况。主Reactor负责接收连接,子Reactor负责处理I/O操作。

  2. 单Reactor多线程: 这种模型使用多个工作线程来处理I/O操作,可以提高并发性能。主Reactor接收连接并将其分发给工作线程处理。

  3. 主从Reactor多线程: 这是Netty中最常用的模型,包括一个主Reactor和多个从Reactor,以及每个从Reactor对应的工作线程池。主Reactor负责接收连接,从Reactor负责处理I/O操作。这种模型可以在高并发情况下充分利用多核处理器,提高性能。

Netty线程模型的优越性:

Netty的线程模型的优越性在于其高度可扩展性和性能。通过采用主从Reactor多线程模型,Netty可以轻松地适应高并发的情况,同时充分利用多核处理器,提供出色的性能表现。此外,Netty还提供了异步事件处理和内存管理等高级功能,使得开发网络应用变得更加方便和高效。

4、案例:Netty TCP服务

它抽象出了两组线程池,即BossGroup和WorkerGroup,来处理不同的网络任务。

  1. BossGroup和WorkerGroup都是NioEventLoopGroup类型,代表了事件循环组。每个NioEventLoopGroup包含多个NioEventLoop,每个NioEventLoop都是一个不断循环执行处理任务的线程。

  2. BossGroup负责接收客户端的连接请求,它的主要工作是轮询监听accept事件,当有新的连接请求时,会处理该事件,并生成NioSocketChannel,然后将它注册到某个WorkerGroup的NioEventLoop上的Selector中。这个过程确保了每个连接的读写操作都会由WorkerGroup来处理。

  3. WorkerGroup负责处理网络的读写操作,它的主要工作是轮询监听read和write事件,当有数据需要读写时,会处理对应的事件,即在NioSocketChannel上执行读写操作。此外,WorkerGroup也负责处理任务队列中的任务,这些任务通常是业务逻辑相关的任务。

  4. 每个BossGroup下的NioEventLoop循环执行的步骤包括轮询accept事件、处理accept事件建立连接、继续处理任务队列中的任务。

  5. 每个WorkerGroup下的NioEventLoop循环执行的步骤包括轮询read和write事件、处理I/O事件、处理任务队列中的任务。

  6. 在处理业务逻辑时,每个WorkerGroup的NioEventLoop会使用管道(pipeline)来管理不同的处理器(Handler)。管道中包含了通道(channel),通过管道可以获取到对应的通道,从而处理具体的业务逻辑。

通过BossGroup和WorkerGroup的组合,实现了高效的网络通信处理。BossGroup专门处理连接请求,WorkerGroup专门处理读写操作和业务逻辑,通过事件循环的方式,实现了高性能的网络编程。管道和处理器的使用也使得开发者能够方便地定制和扩展网络应用程序。

NettyServer

import io.netty.bootstrap.ServerBootstrap;
import io.netty.channel.*;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioServerSocketChannel;public class NettyServer {public static void main(String[] args) throws Exception {// 创建BossGroup和WorkerGroup// 说明// 1. 创建两个线程组,bossGroup和workerGroup// 2. bossGroup只是处理连接请求,真正的和客户端业务处理,会交给workerGroup完成// 3. 两个都是无限循环// 4. bossGroup和workerGroup含有的子线程(NioEventLoop)的个数//    默认实际CPU核数 * 2EventLoopGroup bossGroup = new NioEventLoopGroup(1);EventLoopGroup workerGroup = new NioEventLoopGroup(); // 默认 CPU核数 * 2try {// 创建服务器端的启动对象,配置参数ServerBootstrap bootstrap = new ServerBootstrap();// 使用链式编程来进行设置bootstrap.group(bossGroup, workerGroup) // 设置两个线程组.channel(NioServerSocketChannel.class) // 使用NioSocketChannel作为服务器的通道实现.option(ChannelOption.SO_BACKLOG, 128) // 设置线程队列等待连接个数.childOption(ChannelOption.SO_KEEPALIVE, true) // 设置保持活动连接状态.childHandler(new ChannelInitializer<SocketChannel>() {// 创建一个通道初始化对象(匿名对象)// 给pipeline设置处理器@Overrideprotected void initChannel(SocketChannel ch) throws Exception {System.out.println("客户SocketChannel hashCode=" + ch.hashCode()); // 可以使用一个集合管理SocketChannel,再推送消息时,可以将业务加入到各个channel对应的NIOEventLoop的taskQueue或scheduleTaskQueuech.pipeline().addLast(new NettyServerHandler());}}); // 给我们的workerGroup的EventLoop对应的管道设置处理器System.out.println("...服务器 is ready...");// 绑定一个端口并同步生成了一个ChannelFuture对象(也就是立马返回这样一个对象)// 启动服务器(并绑定端口)ChannelFuture cf = bootstrap.bind(6668).sync();// 给cf注册监听器,监控我们关心的事件cf.addListener(new ChannelFutureListener() {@Overridepublic void operationComplete(ChannelFuture future) throws Exception {if (cf.isSuccess()) {System.out.println("监听端口 6668 成功");} else {System.out.println("监听端口 6668 失败");}}});// 对关闭通道事件进行监听cf.channel().closeFuture().sync();} finally {bossGroup.shutdownGracefully();workerGroup.shutdownGracefully();}}
}

NettyServerHandler

import io.netty.buffer.ByteBuf;
import io.netty.buffer.Unpooled;
import io.netty.channel.Channel;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInboundHandlerAdapter;
import io.netty.channel.ChannelPipeline;
import io.netty.util.CharsetUtil;/*** 说明:* 1. 我们自定义一个Handler需要继承Netty规定好的某个HandlerAdapter(规范)* 2. 这时我们自定义一个Handler,才能称为一个handler*/
public class NettyServerHandler extends ChannelInboundHandlerAdapter {// 读取数据事件(这里我们可以读取客户端发送的消息)/*1. ChannelHandlerContext ctx:上下文对象,含有管道pipeline,通道channel,地址2. Object msg:就是客户端发送的数据,默认Object*/@Overridepublic void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {System.out.println("服务器读取线程 " + Thread.currentThread().getName() + " channle =" + ctx.channel());System.out.println("server ctx =" + ctx);System.out.println("看看channel和pipeline的关系");Channel channel = ctx.channel();ChannelPipeline pipeline = ctx.pipeline(); // 本质是一个双向链表// 将msg转成一个ByteBuf// ByteBuf是Netty提供的,不是NIO的ByteBuffer.ByteBuf buf = (ByteBuf) msg;System.out.println("客户端发送消息是:" + buf.toString(CharsetUtil.UTF_8));System.out.println("客户端地址:" + channel.remoteAddress());}// 数据读取完毕@Overridepublic void channelReadComplete(ChannelHandlerContext ctx) throws Exception {// writeAndFlush是write + flush// 将数据写入到缓存,并刷新// 一般讲,我们对这个发送的数据进行编码ctx.writeAndFlush(Unpooled.copiedBuffer("hello, 客户端~(>^ω^<)喵1", CharsetUtil.UTF_8));}// 发生异常后,一般是需要关闭通道@Overridepublic void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {ctx.close();}
}

NettyClient

import io.netty.bootstrap.Bootstrap;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioSocketChannel;public class NettyClient {public static void main(String[] args) throws Exception {//客户端需要一个事件循环组EventLoopGroup group = new NioEventLoopGroup();try {//创建客户端启动对象//注意客户端使用的不是 ServerBootstrap 而是 BootstrapBootstrap bootstrap = new Bootstrap();//设置相关参数bootstrap.group(group) //设置线程组.channel(NioSocketChannel.class) // 设置客户端通道的实现类(反射).handler(new ChannelInitializer<SocketChannel>() {@Overrideprotected void initChannel(SocketChannel ch) throws Exception {ch.pipeline().addLast(new NettyClientHandler()); //加入自己的处理器}});System.out.println("客户端 ok..");//启动客户端去连接服务器端//关于 ChannelFuture 要分析,涉及到netty的异步模型ChannelFuture channelFuture = bootstrap.connect("127.0.0.1", 6668).sync();//对关闭通道事件  进行监听channelFuture.channel().closeFuture().sync();}finally {group.shutdownGracefully();}}
}

NettyClientHandler

import io.netty.buffer.ByteBuf;
import io.netty.buffer.Unpooled;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInboundHandlerAdapter;
import io.netty.util.CharsetUtil;/*** 说明:* 1. 当通道就绪时会触发channelActive方法,用于向服务器发送初始消息。* 2. 当通道有读取事件时会触发channelRead方法,用于处理从服务器接收到的消息。* 3. 如果发生异常,会触发exceptionCaught方法,通常会在发生异常时关闭连接。*/
public class NettyClientHandler extends ChannelInboundHandlerAdapter {// 当通道就绪时会触发该方法@Overridepublic void channelActive(ChannelHandlerContext ctx) throws Exception {System.out.println("Client " + ctx);ctx.writeAndFlush(Unpooled.copiedBuffer("Hello, server: (>^ω^<)喵", CharsetUtil.UTF_8));}// 当通道有读取事件时会触发@Overridepublic void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {ByteBuf buf = (ByteBuf) msg;System.out.println("Server's reply: " + buf.toString(CharsetUtil.UTF_8));System.out.println("Server's address: " + ctx.channel().remoteAddress());}@Overridepublic void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {cause.printStackTrace();ctx.close();}
}

5、Task使用场景

任务队列中的Task具有三种典型的使用场景,这些场景展示了Netty异步模型的优势和灵活性:

  1. 用户程序自定义的普通任务:在这种情况下,用户可以将自定义的任务提交到任务队列中,这些任务可以是耗时的操作,但由于Netty的异步模型,不会阻塞主线程。举例来说,可以在一个连接处理器中,将一些需要耗时处理的任务交给任务队列处理,以确保不会阻塞其他连接的处理。

  2. 用户自定义定时任务:Netty允许用户定义定时任务,这些任务会在一定的延迟后执行。这对于执行定期操作非常有用,例如定时向客户端发送心跳消息或执行其他周期性任务。

以下是前两种场景的示例代码:

// 解决方案1: 用户程序自定义的普通任务
ctx.channel().eventLoop().execute(new Runnable() {@Overridepublic void run() {// 执行耗时操作}
});// 解决方案2: 用户自定义定时任务
ctx.channel().eventLoop().schedule(new Runnable() {@Overridepublic void run() {// 延迟一定时间后执行任务}
}, 5, TimeUnit.SECONDS);
  1. 非当前Reactor线程调用Channel的各种方法:有时候,需要在业务线程中处理某个特定连接的操作,例如向特定用户推送消息。这会导致非当前Reactor线程调用Channel的方法,这种情况下,Netty会将这些操作提交到任务队列中,以确保线程安全和异步执行。(外部线程调用:有时,其他部分的代码可能会在非当前Reactor线程上调用Netty的Channel方法,例如在业务线程中找到特定用户的连接并向其发送消息。这也属于非当前Reactor线程的情况。)

无论是定时任务、自定义任务还是外部线程调用,Netty都提供了机制来确保线程安全和异步执行,以避免对Reactor线程的阻塞和提高性能。任务队列和异步执行是Netty的核心特性,使其成为高性能和可扩展的网络编程框架。

6、案例:Netty HTTP服务

TestServer

import io.netty.bootstrap.ServerBootstrap;
import io.netty.channel.ChannelFuture;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.nio.NioServerSocketChannel;public class TestServer {public static void main(String[] args) throws Exception {EventLoopGroup bossGroup = new NioEventLoopGroup(1);EventLoopGroup workerGroup = new NioEventLoopGroup();try {ServerBootstrap serverBootstrap = new ServerBootstrap();serverBootstrap.group(bossGroup, workerGroup).channel(NioServerSocketChannel.class).childHandler(new TestServerInitializer());ChannelFuture channelFuture = serverBootstrap.bind(16668).sync();channelFuture.channel().closeFuture().sync();} finally {bossGroup.shutdownGracefully();workerGroup.shutdownGracefully();}}
}

TestServerInitializer

import io.netty.channel.ChannelInitializer;
import io.netty.channel.ChannelPipeline;
import io.netty.channel.socket.SocketChannel;
import io.netty.handler.codec.http.HttpServerCodec;public class TestServerInitializer extends ChannelInitializer<SocketChannel> {@Overrideprotected void initChannel(SocketChannel ch) throws Exception {//向管道加入处理器//得到管道ChannelPipeline pipeline = ch.pipeline();//加入一个netty 提供的httpServerCodec codec =>[coder - decoder]//HttpServerCodec 说明//1. HttpServerCodec 是netty 提供的处理http的 编-解码器pipeline.addLast("MyHttpServerCodec", new HttpServerCodec());//2. 增加一个自定义的handlerpipeline.addLast("MyTestHttpServerHandler", new TestHttpServerHandler());System.out.println("ok~~~~");}
}

TestHttpServerHandler

import io.netty.buffer.ByteBuf;
import io.netty.buffer.Unpooled;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.SimpleChannelInboundHandler;
import io.netty.handler.codec.http.*;
import io.netty.util.CharsetUtil;import java.net.URI;/*
说明
1. SimpleChannelInboundHandler 是 ChannelInboundHandlerAdapter
2. HttpObject 客户端和服务器端相互通讯的数据被封装成 HttpObject*/
public class TestHttpServerHandler extends SimpleChannelInboundHandler<HttpObject> {//channelRead0 读取客户端数据@Overrideprotected void channelRead0(ChannelHandlerContext ctx, HttpObject msg) throws Exception {System.out.println("对应的channel=" + ctx.channel() + " pipeline=" + ctx.pipeline() + " 通过pipeline获取channel" + ctx.pipeline().channel());System.out.println("当前ctx的handler=" + ctx.handler());//判断 msg 是不是 httprequest请求if (msg instanceof HttpRequest) {System.out.println("ctx 类型=" + ctx.getClass());System.out.println("pipeline hashcode" + ctx.pipeline().hashCode() + " TestHttpServerHandler hash=" + this.hashCode());System.out.println("msg 类型=" + msg.getClass());System.out.println("客户端地址" + ctx.channel().remoteAddress());//获取到HttpRequest httpRequest = (HttpRequest) msg;//获取uri, 过滤指定的资源URI uri = new URI(httpRequest.uri());if ("/favicon.ico".equals(uri.getPath())) {System.out.println("请求了 favicon.ico, 不做响应");return;}//回复信息给浏览器 [http协议]ByteBuf content = Unpooled.copiedBuffer("hello, 我是服务器", CharsetUtil.UTF_8);//构造一个http的相应,即 httpresponseFullHttpResponse response = new DefaultFullHttpResponse(HttpVersion.HTTP_1_1, HttpResponseStatus.OK, content);response.headers().set(HttpHeaderNames.CONTENT_TYPE, "text/plain");response.headers().set(HttpHeaderNames.CONTENT_LENGTH, content.readableBytes());//将构建好 response返回ctx.writeAndFlush(response);}}
}

7、Netty的核心模块和组件

  1. Bootstrap和ServerBootstrap:Bootstrap用于客户端的启动引导,而ServerBootstrap用于服务端的启动引导。它们用于配置和启动整个Netty应用程序,包括串联各个组件。

  2. Channel和ChannelFuture:Channel表示一个网络连接的通道,它可以用于执行网络I/O操作。ChannelFuture用于处理异步操作,可以注册监听器来处理操作成功、失败或取消时的事件。

  3. ChannelHandler:ChannelHandler是一个接口,用于处理I/O事件或拦截I/O操作,并将它们转发到ChannelPipeline中的下一个处理程序。通常需要自定义ChannelHandler来实现业务逻辑。

  4. ChannelPipeline:ChannelPipeline是一组ChannelHandler的集合,它负责处理和拦截入站和出站的事件和操作。它的作用是串联和管理ChannelHandler,允许用户完全控制事件的处理方式。

  5. Selector:Selector是Netty基于的多路复用机制,用于实现非阻塞I/O。它可以同时监听多个通道上的事件,以便高效地管理多个连接。

  6. EventLoopGroup:EventLoopGroup是一组EventLoop的抽象,用于管理多个EventLoop线程。在Netty中,通常会有两个EventLoopGroup,一个用于Boss线程,负责接受客户端连接,另一个用于Worker线程,负责处理I/O操作。

  7. ChannelOption:ChannelOption用于设置Channel的配置参数,例如接收缓冲区大小等。

  8. ByteBuf:ByteBuf是Netty用于操作缓冲区的工具类,它是数据容器,用于存储和处理数据。

8、案例:Netty 群聊系统

服务器端:可以监测用户上线,离线,并实现消息转发功能
客户端:通过 channel 可以非阻塞发送消息给其它所有用户,同时可以接受其它用户发送的消息

GroupChatServer

import io.netty.bootstrap.ServerBootstrap;
import io.netty.channel.*;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioServerSocketChannel;
import io.netty.handler.codec.string.StringDecoder;
import io.netty.handler.codec.string.StringEncoder;public class GroupChatServer {private int port; //监听端口public GroupChatServer(int port) {this.port = port;}//编写run方法,处理客户端的请求public void run() throws Exception {//创建两个线程组EventLoopGroup bossGroup = new NioEventLoopGroup(1);EventLoopGroup workerGroup = new NioEventLoopGroup(); //8个NioEventLooptry {ServerBootstrap b = new ServerBootstrap();b.group(bossGroup, workerGroup).channel(NioServerSocketChannel.class).option(ChannelOption.SO_BACKLOG, 128).childOption(ChannelOption.SO_KEEPALIVE, true).childHandler(new ChannelInitializer<SocketChannel>() {@Overrideprotected void initChannel(SocketChannel ch) throws Exception {//获取到pipelineChannelPipeline pipeline = ch.pipeline();//向pipeline加入解码器pipeline.addLast("decoder", new StringDecoder());//向pipeline加入编码器pipeline.addLast("encoder", new StringEncoder());//加入自己的业务处理handlerpipeline.addLast(new GroupChatServerHandler());}});System.out.println("netty 服务器启动");ChannelFuture channelFuture = b.bind(port).sync();//监听关闭channelFuture.channel().closeFuture().sync();} finally {bossGroup.shutdownGracefully();workerGroup.shutdownGracefully();}}public static void main(String[] args) throws Exception {new GroupChatServer(7000).run();}	
}

GroupChatServerHandler

import io.netty.channel.Channel;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.SimpleChannelInboundHandler;
import io.netty.channel.group.ChannelGroup;
import io.netty.channel.group.DefaultChannelGroup;
import io.netty.util.concurrent.GlobalEventExecutor;import java.text.SimpleDateFormat;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;public class GroupChatServerHandler extends SimpleChannelInboundHandler<String> {//这样写还要自己遍历Channel//public static List<Channel> channels = new ArrayList<Channel>();//使用一个hashmap 管理私聊(私聊本案例并未实现,只是提供个思路)//public static Map<String, Channel> channels = new HashMap<String,Channel>();//定义一个channle 组,管理所有的channel//GlobalEventExecutor.INSTANCE) 是全局的事件执行器,是一个单例private static ChannelGroup channelGroup = new DefaultChannelGroup(GlobalEventExecutor.INSTANCE);SimpleDateFormat sdf = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");//handlerAdded 表示连接建立,一旦连接,第一个被执行//将当前channel 加入到  channelGroup@Overridepublic void handlerAdded(ChannelHandlerContext ctx) throws Exception {Channel channel = ctx.channel();//将该客户加入聊天的信息推送给其它在线的客户端//该方法会将 channelGroup 中所有的channel 遍历,并发送消息,我们不需要自己遍历channelGroup.add(channel);channelGroup.writeAndFlush("[客户端]" + channel.remoteAddress() + " 加入聊天" + sdf.format(new java.util.Date()) + " \n");//私聊如何实现
//         channels.put("userid100",channel);}//断开连接, 将xx客户离开信息推送给当前在线的客户@Overridepublic void handlerRemoved(ChannelHandlerContext ctx) throws Exception {Channel channel = ctx.channel();channelGroup.writeAndFlush("[客户端]" + channel.remoteAddress() + " 离开了\n");channelGroup.remove(channel);System.out.println("channelGroup size" + channelGroup.size());}//表示channel 处于活动状态, 提示 xx上线@Overridepublic void channelActive(ChannelHandlerContext ctx) throws Exception {//这个是给服务端看的,客户端上面已经提示xxx加入群聊了System.out.println(ctx.channel().remoteAddress() + " 上线了~");}//表示channel 处于不活动状态, 提示 xx离线了@Overridepublic void channelInactive(ChannelHandlerContext ctx) throws Exception {System.out.println(ctx.channel().remoteAddress() + " 离线了~");}//读取数据,转发给在线的每一个客户端@Overrideprotected void channelRead0(ChannelHandlerContext ctx, String msg) throws Exception {//获取到当前channelChannel channel = ctx.channel();//这时我们遍历channelGroup, 根据不同的情况,回送不同的消息channelGroup.forEach(ch -> {if (channel != ch) { //不是当前的channel,转发消息ch.writeAndFlush("[客户]" + channel.remoteAddress() + " 发送了消息" + msg + "\n");} else {//回显自己发送的消息给自己ch.writeAndFlush("[自己]发送了消息" + msg + "\n");}});}@Overridepublic void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {//关闭通道ctx.close();}
}

GroupChatClient

import io.netty.bootstrap.Bootstrap;
import io.netty.channel.*;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioSocketChannel;
import io.netty.handler.codec.string.StringDecoder;
import io.netty.handler.codec.string.StringEncoder;import java.util.Scanner;public class GroupChatClient {//属性private final String host;private final int port;public GroupChatClient(String host, int port) {this.host = host;this.port = port;}public void run() throws Exception {EventLoopGroup group = new NioEventLoopGroup();try {Bootstrap bootstrap = new Bootstrap().group(group).channel(NioSocketChannel.class).handler(new ChannelInitializer<SocketChannel>() {@Overrideprotected void initChannel(SocketChannel ch) throws Exception {//得到pipelineChannelPipeline pipeline = ch.pipeline();//加入相关handlerpipeline.addLast("decoder", new StringDecoder());pipeline.addLast("encoder", new StringEncoder());//加入自定义的handlerpipeline.addLast(new GroupChatClientHandler());}});ChannelFuture channelFuture = bootstrap.connect(host, port).sync();//得到channelChannel channel = channelFuture.channel();System.out.println("-------" + channel.localAddress() + "--------");//客户端需要输入信息,创建一个扫描器Scanner scanner = new Scanner(System.in);while (scanner.hasNextLine()) {String msg = scanner.nextLine();//通过channel 发送到服务器端channel.writeAndFlush(msg + "\r\n");}} finally {group.shutdownGracefully();}}public static void main(String[] args) throws Exception {new GroupChatClient("127.0.0.1", 7000).run();}
}

GroupChatClientHandler

import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.SimpleChannelInboundHandler;public class GroupChatClientHandler extends SimpleChannelInboundHandler<String> {//从服务器拿到的数据@Overrideprotected void channelRead0(ChannelHandlerContext ctx, String msg) throws Exception {System.out.println(msg.trim());}
}

9、案例:Netty 心跳检测

MyServer

import io.netty.bootstrap.ServerBootstrap;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.ChannelPipeline;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioServerSocketChannel;
import io.netty.handler.logging.LogLevel;
import io.netty.handler.logging.LoggingHandler;
import io.netty.handler.timeout.IdleStateHandler;import java.util.concurrent.TimeUnit;public class MyServer {public static void main(String[] args) throws Exception {//创建两个线程组EventLoopGroup bossGroup = new NioEventLoopGroup(1);// 并没有传入参数,因此默认会创建多个NioEventLoop(通常是CPU核心数的两倍)EventLoopGroup workerGroup = new NioEventLoopGroup();try {ServerBootstrap serverBootstrap = new ServerBootstrap();serverBootstrap.group(bossGroup, workerGroup);serverBootstrap.channel(NioServerSocketChannel.class);//在bossGroup增加一个日志处理器serverBootstrap.handler(new LoggingHandler(LogLevel.INFO));serverBootstrap.childHandler(new ChannelInitializer<SocketChannel>() {@Overrideprotected void initChannel(SocketChannel ch) throws Exception {ChannelPipeline pipeline = ch.pipeline();//加入一个netty 提供 IdleStateHandler/*说明1. IdleStateHandler 是netty 提供的处理空闲状态的处理器2. long readerIdleTime : 表示多长时间没有读, 就会发送一个心跳检测包检测是否连接3. long writerIdleTime : 表示多长时间没有写, 就会发送一个心跳检测包检测是否连接4. long allIdleTime : 表示多长时间没有读写, 就会发送一个心跳检测包检测是否连接5. 文档说明triggers an {@link IdleStateEvent} when a {@link Channel} has not performedread, write, or both operation for a while.6. 当 IdleStateEvent 触发后 , 就会传递给管道 的下一个handler去处理,通过调用(触发)下一个handler 的 userEventTiggered , 在该方法中去处理 IdleStateEvent(读空闲,写空闲,读写空闲)7.handlerRemoved有时候是无法感知连接断掉,所以还是需要心跳包的检测来判断连接是否还有效*/pipeline.addLast(new IdleStateHandler(3, 5, 7, TimeUnit.SECONDS));//加入一个对空闲检测进一步处理的handler(自定义)pipeline.addLast(new MyServerHandler());}});//启动服务器ChannelFuture channelFuture = serverBootstrap.bind(7000).sync();channelFuture.channel().closeFuture().sync();} finally {bossGroup.shutdownGracefully();workerGroup.shutdownGracefully();}}
}

MyServerHandler

import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInboundHandlerAdapter;
import io.netty.handler.timeout.IdleStateEvent;public class MyServerHandler extends ChannelInboundHandlerAdapter {/*** @param ctx 上下文* @param evt 事件* @throws Exception*/@Overridepublic void userEventTriggered(ChannelHandlerContext ctx, Object evt) throws Exception {if (evt instanceof IdleStateEvent) {//将  evt 向下转型 IdleStateEventIdleStateEvent event = (IdleStateEvent) evt;String eventType = null;switch (event.state()) {case READER_IDLE:eventType = "读空闲";break;case WRITER_IDLE:eventType = "写空闲";break;case ALL_IDLE:eventType = "读写空闲";break;}System.out.println(ctx.channel().remoteAddress() + "--超时时间--" + eventType);System.out.println("服务器做相应处理..");//如果发生空闲,我们关闭通道// ctx.channel().close();}}
}

10、案例:WebSocket 服务器和客户端长连接

MyServer

import io.netty.bootstrap.ServerBootstrap;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.ChannelPipeline;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioServerSocketChannel;
import io.netty.handler.codec.http.HttpObjectAggregator;
import io.netty.handler.codec.http.HttpServerCodec;
import io.netty.handler.codec.http.websocketx.WebSocketServerProtocolHandler;
import io.netty.handler.logging.LogLevel;
import io.netty.handler.logging.LoggingHandler;
import io.netty.handler.stream.ChunkedWriteHandler;public class MyServer {public static void main(String[] args) throws Exception {//创建两个线程组EventLoopGroup bossGroup = new NioEventLoopGroup(1);EventLoopGroup workerGroup = new NioEventLoopGroup(); //8个NioEventLooptry {ServerBootstrap serverBootstrap = new ServerBootstrap();serverBootstrap.group(bossGroup, workerGroup);serverBootstrap.channel(NioServerSocketChannel.class);serverBootstrap.handler(new LoggingHandler(LogLevel.INFO));serverBootstrap.childHandler(new ChannelInitializer<SocketChannel>() {@Overrideprotected void initChannel(SocketChannel ch) throws Exception {ChannelPipeline pipeline = ch.pipeline();//因为基于http协议,使用http的编码和解码器pipeline.addLast(new HttpServerCodec());//http是以块方式写,添加ChunkedWriteHandler处理器pipeline.addLast(new ChunkedWriteHandler());/*说明1. http数据在传输过程中是分段, HttpObjectAggregator ,就是可以将多个段聚合2. 这就就是为什么,当浏览器发送大量数据时,就会发出多次http请求*/pipeline.addLast(new HttpObjectAggregator(8192));/*说明1. 对应websocket ,它的数据是以 帧(frame) 形式传递2. 可以看到WebSocketFrame 下面有六个子类3. 浏览器请求时 ws://localhost:7000/hello 表示请求的uri4. WebSocketServerProtocolHandler 核心功能是将 http协议升级为 ws协议 , 保持长连接5. 是通过一个 状态码 101*/pipeline.addLast(new WebSocketServerProtocolHandler("/hello"));//自定义的handler ,处理业务逻辑pipeline.addLast(new MyTextWebSocketFrameHandler());}});//启动服务器ChannelFuture channelFuture = serverBootstrap.bind(7888).sync();channelFuture.channel().closeFuture().sync();} finally {bossGroup.shutdownGracefully();workerGroup.shutdownGracefully();}}
}

MyTextWebSocketFrameHandler

import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.SimpleChannelInboundHandler;
import io.netty.handler.codec.http.websocketx.TextWebSocketFrame;import java.time.LocalDateTime;//这里 TextWebSocketFrame 类型,表示一个文本帧(frame)
public class MyTextWebSocketFrameHandler extends SimpleChannelInboundHandler<TextWebSocketFrame> {@Overrideprotected void channelRead0(ChannelHandlerContext ctx, TextWebSocketFrame msg) throws Exception {System.out.println("服务器收到消息 " + msg.text());//回复消息ctx.channel().writeAndFlush(new TextWebSocketFrame("服务器时间" + LocalDateTime.now() + " " + msg.text()));}//当web客户端连接后, 触发方法@Overridepublic void handlerAdded(ChannelHandlerContext ctx) throws Exception {//id 表示唯一的值,LongText 是唯一的 ShortText 不是唯一System.out.println("handlerAdded 被调用" + ctx.channel().id().asLongText());System.out.println("handlerAdded 被调用" + ctx.channel().id().asShortText());}@Overridepublic void handlerRemoved(ChannelHandlerContext ctx) throws Exception {System.out.println("handlerRemoved 被调用" + ctx.channel().id().asLongText());}@Overridepublic void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {System.out.println("异常发生 " + cause.getMessage());ctx.close(); //关闭连接}
}

页面

<!DOCTYPE html>
<html lang="en">
<head><meta charset="UTF-8"><title>Title</title>
</head>
<body>
<script>var socket;//判断当前浏览器是否支持websocketif (window.WebSocket) {//go onsocket = new WebSocket("ws://localhost:7888/hello");//相当于channelReado, ev 收到服务器端回送的消息socket.onmessage = function (ev) {var rt = document.getElementById("responseText");rt.value = rt.value + "\n" + ev.data;}//相当于连接开启(感知到连接开启)socket.onopen = function (ev) {var rt = document.getElementById("responseText");rt.value = "连接开启了.."}//相当于连接关闭(感知到连接关闭)socket.onclose = function (ev) {var rt = document.getElementById("responseText");rt.value = rt.value + "\n" + "连接关闭了.."}} else {alert("当前浏览器不支持websocket")}//发送消息到服务器function send(message) {if (!window.socket) { //先判断socket是否创建好return;}if (socket.readyState == WebSocket.OPEN) {//通过socket 发送消息socket.send(message)} else {alert("连接没有开启");}}
</script>
<form onsubmit="return false"><textarea name="message" style="height: 300px; width: 300px"></textarea><input type="button" value="发送消息" onclick="send(this.form.message.value)"><textarea id="responseText" style="height: 300px; width: 300px"></textarea><input type="button" value="清空内容" onclick="document.getElementById('responseText').value=''">
</form>
</body>
</html>

在这里插入图片描述
在这里插入图片描述

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.rhkb.cn/news/236055.html

如若内容造成侵权/违法违规/事实不符,请联系长河编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

k8s--集群调度(kube-scheduler)

了解kube-scheduler 由之前博客可知kube-scheduler是k8s中master的核心组件之一 scheduler&#xff1a;负责调度资源。把pod调度到node节点。他有两种策略&#xff1a; 预算策略&#xff1a;人为部署&#xff0c;指定node节点去部署新建的pod 优先策略&#xff1a;通过算法选…

怎么做微信秒活动_掀起购物狂潮,引爆品牌影响力

微信秒杀活动&#xff1a;掀起购物狂潮&#xff0c;引爆品牌影响力 在数字化时代&#xff0c;微信已经成为人们日常生活中不可或缺的一部分。作为中国最大的社交媒体平台&#xff0c;微信不仅为人们提供了便捷的通讯方式&#xff0c;还为商家提供了一个广阔的营销舞台。其中&a…

vue3的福音框架arco.design

前言&#xff1a; 在vue2于2023年底正式宣布不在维护&#xff0c;vue3使用越来越频繁的时刻&#xff0c;我们实现项目的辅助框架也越来越多。element, iview, antd 等经典框架继续风靡一时&#xff0c;不过也有很多好的框架&#xff0c;功能也强大&#xff0c;比如我们今天说的…

2.【CPP】入门(宏||内联函数||拷贝构造||析构函数||构造函数)

0x01.引言 1.实现一个宏函数ADD #define ADD(x,y) ((x)(y))//宏是预编译阶段完成替换&#xff0c;注意括号2.宏的优缺点 优点&#xff1a; 1.增强代码的复用性 2.宏函数不用建立栈帧&#xff0c;提高性能 缺点&#xff1a; 1.不方便调试 2.没有安全检查 0x02.内联函数 1.以空…

什么是冒泡排序?如何实现?

一、是什么 冒泡排序&#xff08;Bubble Sort&#xff09;&#xff0c;是一种计算机科学领域的较简单的排序算法 冒泡排序的思想就是在每次遍历一遍未排序的数列之后&#xff0c;将一个数据元素浮上去&#xff08;也就是排好了一个数据&#xff09; 如同碳酸饮料中二氧化碳的…

摄像头视频录制程序使用教程(Win10)

摄像头视频录制程序-Win10 &#x1f957;介绍&#x1f35b;使用说明&#x1f6a9;config.json 说明&#x1f6a9;启动&#x1f6a9;关闭&#x1f6a9;什么时候开始录制&#xff1f;&#x1f6a9;什么时候触发录制&#xff1f;&#x1f6a9;调参 &#x1f957;介绍 检测画面变化…

“数据要素×”行动计划发布,粮食安全监管如何应变?

近日&#xff0c;国家数据局发布“数据要素”三年行动计划&#xff08;2024-2026年&#xff09;&#xff0c;在“数据要素现代农业“部分提到&#xff1a;提升农业综合生产能力&#xff0c;支持农业生产经营主体和相关服务企业融合利用气象、土壤、农事作业、病虫害、市场等数据…

FineBI实战项目一(17):热门商品Top10分析开发

点击新建组件&#xff0c;创建热门商品Top10组件。 选择柱状图&#xff0c;拖拽cnt&#xff08;总数&#xff09;到横轴&#xff0c;拖拽goodName到纵轴。 选择排序规则。 修改横轴和纵轴的标签名称 切换到仪表板&#xff0c;拖拽组件到仪表板 效果如下&#xff1a;

别再纠结,这8款设计工具助你轻松绘制原型图!

原型图设计工具有很多优点。除了帮助设计师快速与客户达成协议&#xff0c;避免项目前景的冲突外&#xff0c;原型图设计工具还可以让客户看到正在创建的内容。如果需要更改&#xff0c;原型图设计工具也可以轻松完成。本文快速总结了8种原型图设计工具。无论你是专业设计师还是…

使用AUTOSAR来开发汽车基础软件的优点

1、高质量。以前我们采用手写代码的方式&#xff0c;是几个工程师在战斗。现在我们采用平台&#xff0c;BSW代码都是供应商提供的&#xff0c;我们相当于后面还有一个团队陪着我们在战斗。 2、低成本。大家都说采用AUTOSAR平台好贵&#xff0c;但是从长远来看是值得的&#xff…

Windows安全基础:认证基础知识

目录 Windows凭据 Windows访问控制模型 访问令牌&#xff1a; 安全标识符&#xff08;SID&#xff09;&#xff1a; 安全描述符&#xff1a; 令牌安全防御 1、禁止域管理员异机登录 2、开启“审核进程创建”策略 Windows凭据 SSPI&#xff08;Security Support Provide…

华为ipv4+ipv6双栈加isis多拓扑配置案例

实现效果&#xff1a;sw1中的ipv4和ipv6地址能ping通sw2中的ipv4和ipv6地址 R2-R4为存IPV4连接&#xff0c;其它为ipv6和ipv4双连接 sw1 ipv6 interface Vlanif1 ipv6 enable ip address 10.0.11.1 255.255.255.0 ipv6 address 2001:DB8:11::1/64 interface MEth0/0/1 inter…

登录模块的实现

一.前期的准备工作 1.页面的布局 (1)表单的校验: 利用element-ui提供的文档绑定rules规则后实现校验 (2)跨域的配置 &#xff1a; 利用proxy代理来解决跨域的问题 (3)axios拦截器的配置 两个点:1. 在请求拦截的成功回调中,如果token,因为调用其它的接口需要token才能调取。 在请…

二刷Laravel 教程(用户注册)总结Ⅳ

一、显示用户信息 1&#xff09;resource Route::resource(users, UsersController); 相当于下面这7个路由 我们先用 Artisan 命令查看目前应用的路由&#xff1a; php artisan route:list 2&#xff09; compact 方法 //我们将用户对象 $user 通过 compact 方法转化为一个关联…

数据聚合、自动补全、数据同步、es集群

目录 数据聚合 聚合的分类 DSL实现bucket聚合 DSL实现Metrics聚合 RestAPI实现聚合 多条件聚合 带过滤条件的聚合 自动补全 安装拼音分词器 自定义分词器 completion suggester查询 修改索引库数据结构 RestAPI实现自动补全查询 实现搜索框自动补全 数据同步 数…

华为HarmonyOS 创建第一个鸿蒙应用 运行Hello World

使用DevEco Studio创建第一个项目 Hello World 1.创建项目2.预览 Hello World3.安装模拟器4.运行 Hello World 1.创建项目 创建第一个项目&#xff0c;命名为HelloWorld&#xff0c;点击Finish 选择Empty Ability模板&#xff0c;点击Next Hello World 项目已经成功创建…

【VRTK】【VR开发】【Unity】19-VRTK实现旋转运动

课程配套学习项目源码资源下载 https://download.csdn.net/download/weixin_41697242/88485426?spm=1001.2014.3001.5503 【背景】 在实际开发中,旋转运动也是时常需要模拟的重要运动类型。常见的场景有开关门,方向盘轮胎以及拉动拉杆等等。 旋转运动的实现可以基于物理系…

使用 matlab 求解最小二乘问题

有约束线性最小二乘 其标准形式为&#xff1a; min ⁡ x 1 2 ∥ C x − d ∥ 2 2 \mathop {\min }\limits_x \quad \frac{1}{2}\left\| Cx-d \right\|_2^2 xmin​21​∥Cx−d∥22​ 约束条件为&#xff1a; A ⋅ x ≤ b A e q ⋅ x b e q l b ≤ x ≤ u b \begin{aligned} …

基于多反应堆的高并发服务器【C/C++/Reactor】(中)HttpResponse的定义和初始化 以及组织 HttpResponse 响应消息

一、HttpResponse的定义 1.定义状态码枚举 // 定义状态码枚举 enum HttpStatusCode {Unknown 0,OK 200,MovedPermanently 301,MovedTemporarily 302,BadRequest 400,NotFound 404 }; 2.HTTP 响应报文格式 这个数据块主要是分为四部分 第一部分是状态行第二部分是响应…

游戏、设计选什么内存条?光威龙武系列DDR5量大管饱

如果你是一位PC玩家或者创作者&#xff0c;日常工作娱乐中&#xff0c;确实少不了大容量高频内存的支持&#xff0c;这样可以获得更高的工作效率&#xff0c;光威龙武系列DDR5内存条无疑是理想之选。它可以为计算机提供强劲的性能表现和稳定的运行体验&#xff0c;让我们畅玩游…