文章目录
- 1 摘要
- 2 核心 Maven 依赖
- 3 核心代码
- 3.1 服务端事务处理器 (DemoNettyServerHandler)
- 3.2 服务端连接类(InitNettyServer)
- 3.3 客户端事务处理器(DemoNettyClientHandler)
- 3.4 客户端连接类(DemoNettyClient)
- 4 测试
- 4.1 测试流程
- 4.2 测试结果
- 4.3 测试结论
- 5 推荐参考资料
- 6 Github 源码
1 摘要
Netty 作为一款 NIO 底层通讯框架,在高并发场景有着广泛的应用,众多消息中间件内部也是基于 Netty 进行开发。本文将介绍基于 SpringBoot 2.7 集成 Netty 来模拟服务端与客户端进行通讯。
2 核心 Maven 依赖
demo-netty-server/pom.xml
<!-- Netty --><dependency><groupId>io.netty</groupId><artifactId>netty-all</artifactId><version>${netty.version}</version></dependency>
netty 版本:
<netty.version>4.1.96.Final</netty.version>
3 核心代码
3.1 服务端事务处理器 (DemoNettyServerHandler)
demo-netty-server/src/main/java/com/ljq/demo/springboot/netty/server/handler/DemoNettyServerHandler.java
package com.ljq.demo.springboot.netty.server.handler;import io.netty.buffer.ByteBuf;
import io.netty.buffer.Unpooled;
import io.netty.channel.ChannelHandler;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInboundHandlerAdapter;
import lombok.extern.slf4j.Slf4j;
import org.springframework.stereotype.Component;/*** @Description: Netty 服务处理器* @Author: junqiang.lu* @Date: 2023/8/18*/
@Slf4j
@Component
// 标记该类实例可以被多个 channel 共享
@ChannelHandler.Sharable
public class DemoNettyServerHandler extends ChannelInboundHandlerAdapter {/*** 每个传入的消息都会调用该方法** @param ctx* @param msg*/@Overridepublic void channelRead(ChannelHandlerContext ctx, Object msg) {ByteBuf byteBuf = (ByteBuf) msg;byte[] body = new byte[byteBuf.readableBytes()];byteBuf.readBytes(body);log.info("服务端接收到数据:{}", new String(body));byte[] responseBytes = "hi,客户端,我是服务端".getBytes();ctx.writeAndFlush(Unpooled.wrappedBuffer(responseBytes));}/*** 在读取期间,有异常抛出时会调用** @param ctx* @param cause*/@Overridepublic void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) {// 打印异常栈跟踪log.error("netty server error",cause);//关闭该channelctx.close();}}
3.2 服务端连接类(InitNettyServer)
demo-netty-server/src/main/java/com/ljq/demo/springboot/netty/server/init/InitNettyServer.java
package com.ljq.demo.springboot.netty.server.init;import com.ljq.demo.springboot.netty.server.handler.DemoNettyServerHandler;
import io.netty.bootstrap.ServerBootstrap;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioServerSocketChannel;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.boot.ApplicationArguments;
import org.springframework.boot.ApplicationRunner;
import org.springframework.stereotype.Component;import javax.annotation.Resource;
import java.net.InetSocketAddress;/*** @Description: 初始化 netty 服务端* @Author: junqiang.lu* @Date: 2023/8/18*/
@Slf4j
@Component
public class InitNettyServer implements ApplicationRunner {@Value("${netty.port:9120}")private Integer nettyPort;@Resourceprivate DemoNettyServerHandler nettyServerHandler;@Overridepublic void run(ApplicationArguments args) throws Exception {this.start();}/*** 启动服务** @throws InterruptedException*/public void start() throws InterruptedException {// 连接管理线程池EventLoopGroup mainGroup = new NioEventLoopGroup(1);// 工作线程池EventLoopGroup workGroup = new NioEventLoopGroup(4);ServerBootstrap bootstrap = new ServerBootstrap();try {bootstrap.group(mainGroup, workGroup)// 指定 nio 通道.channel(NioServerSocketChannel.class)// 指定 socket 地址和端口.localAddress(new InetSocketAddress(nettyPort))// 添加子通道 handler.childHandler(new ChannelInitializer<SocketChannel>() {@Overrideprotected void initChannel(SocketChannel socketChannel) throws Exception {socketChannel.pipeline().addLast(nettyServerHandler);}});// 异步绑定服务器,调用sync()方法阻塞等待直到绑定完成ChannelFuture channelFuture = bootstrap.bind().sync();log.info("---------- [init] netty server start ----------");// 获取Channel的CloseFuture,并且阻塞当前线程直到它完成channelFuture.channel().closeFuture().sync();} finally {// 关闭 EventLoopGroup,释放资源mainGroup.shutdownGracefully().sync();}}}
注意事项: 为什么要定义两个 EventLoopGroup
? 实际上这个 EventLoopGroup
相当于一个线程组,如果只定义一个,则这个线程组既负责服务端与客户端的连接管理,也负责服务端的事务处理,这就大大降低了服务端的吞吐量,定义两个 EventLoopGroup
可以极大地提高系统的并发性能。
3.3 客户端事务处理器(DemoNettyClientHandler)
demo-netty-server/src/main/java/com/ljq/demo/springboot/netty/server/handler/DemoNettyClientHandler.java
package com.ljq.demo.springboot.netty.server.handler;import io.netty.buffer.ByteBuf;
import io.netty.buffer.Unpooled;
import io.netty.channel.ChannelHandler;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInboundHandlerAdapter;
import io.netty.util.ReferenceCountUtil;
import lombok.extern.slf4j.Slf4j;
import org.springframework.stereotype.Component;/*** @Description: netty 客户端处理器* @Author: junqiang.lu* @Date: 2023/8/18*/
@Slf4j
@Component
// 标记该类实例可以被多个 channel 共享
@ChannelHandler.Sharable
public class DemoNettyClientHandler extends ChannelInboundHandlerAdapter {/*** 接收消息** @param ctx* @param msg* @throws Exception*/@Overridepublic void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {ByteBuf byteBuf= (ByteBuf)msg;byte[] body=new byte[byteBuf.readableBytes()];byteBuf.readBytes(body);log.info("接收到来自服务端的消息:{}",new String(body));// 释放消息ReferenceCountUtil.release(msg);}/*** 和服务器建立连接时触发** @param ctx* @throws Exception*/@Overridepublic void channelActive(ChannelHandlerContext ctx) throws Exception {log.info("客户端连接到服务器!!!");// 向服务端发送上线消息byte[] bytes = "hi,服务端,我是客户端!".getBytes();ByteBuf byteBuf = Unpooled.wrappedBuffer(bytes);ctx.writeAndFlush(byteBuf);}/*** 有异常时触发** @param ctx* @param cause* @throws Exception*/@Overridepublic void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {log.error("客户端异常", cause);super.exceptionCaught(ctx, cause);}
}
3.4 客户端连接类(DemoNettyClient)
demo-netty-server/src/main/java/com/ljq/demo/springboot/netty/server/client/DemoNettyClient.java
package com.ljq.demo.springboot.netty.server.client;import com.ljq.demo.springboot.netty.server.handler.DemoNettyClientHandler;
import io.netty.bootstrap.Bootstrap;
import io.netty.buffer.ByteBuf;
import io.netty.buffer.Unpooled;
import io.netty.channel.*;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioSocketChannel;
import io.netty.handler.codec.bytes.ByteArrayEncoder;
import io.netty.handler.codec.string.StringEncoder;
import lombok.extern.slf4j.Slf4j;import java.net.InetSocketAddress;
import java.nio.charset.StandardCharsets;/*** @Description: Netty 客户端* @Author: junqiang.lu* @Date: 2023/8/22*/
@Slf4j
public class DemoNettyClient {private final String host;private final int port;private final EventLoopGroup mainGroup;private final Bootstrap bootstrap;private Channel channel;public DemoNettyClient(String host, int port) {this.host = host;this.port = port;this.mainGroup = new NioEventLoopGroup();this.bootstrap = new Bootstrap();}public Channel getChannel() {return this.channel;}/*** 创建连接*/public void connect() throws InterruptedException {bootstrap.group(mainGroup).channel(NioSocketChannel.class).remoteAddress(new InetSocketAddress(host, port)).handler(new ChannelInitializer<SocketChannel>() {@Overrideprotected void initChannel(SocketChannel socketChannel) throws Exception {socketChannel.pipeline().addLast(new StringEncoder()).addLast(new ByteArrayEncoder()).addLast(new DemoNettyClientHandler());}});ChannelFuture future = bootstrap.connect().sync();this.channel = future.channel();}/*** 发送消息** @param message*/public void sendMessage(String message) {log.info("客户端待发送消息:{}", message);Channel channel = this.getChannel();byte[] bytes = message.getBytes(StandardCharsets.UTF_8);ByteBuf byteBuf = Unpooled.wrappedBuffer(bytes);channel.writeAndFlush(byteBuf);}public void close() throws InterruptedException {log.info("关闭客户端");mainGroup.shutdownGracefully().sync();}public static void main(String[] args) throws InterruptedException {String serverHost = "127.0.0.1";int serverPort = 9120;String message = "ahahaha啊哈哈哈啊哈";DemoNettyClient nettyClient = new DemoNettyClient(serverHost, serverPort);nettyClient.connect();for (int i = 0; i < 10; i++) {nettyClient.sendMessage(message + i);}log.info("--------开始休眠 5 秒------------");Thread.sleep(5000L);for (int i = 0; i < 5; i++) {nettyClient.sendMessage(i + message);Thread.sleep(100L);}nettyClient.close();}}
4 测试
4.1 测试流程
先启动 SpringBoot 应用程序,此时服务端已经启动完成
再执行客户端连接类(DemoNettyClient) 中的 main 方法
4.2 测试结果
客户端日志:
2023-08-23 11:27:11 | INFO | main | com.ljq.demo.springboot.netty.server.client.DemoNettyClient 74| 客户端待发送消息:ahahaha啊哈哈哈啊哈0
2023-08-23 11:27:11 | INFO | nioEventLoopGroup-2-1 | com.ljq.demo.springboot.netty.server.handler.DemoNettyClientHandler 48| 客户端连接到服务器!!!
2023-08-23 11:27:11 | INFO | main | com.ljq.demo.springboot.netty.server.client.DemoNettyClient 74| 客户端待发送消息:ahahaha啊哈哈哈啊哈1
2023-08-23 11:27:11 | INFO | main | com.ljq.demo.springboot.netty.server.client.DemoNettyClient 74| 客户端待发送消息:ahahaha啊哈哈哈啊哈2
2023-08-23 11:27:11 | INFO | main | com.ljq.demo.springboot.netty.server.client.DemoNettyClient 74| 客户端待发送消息:ahahaha啊哈哈哈啊哈3
2023-08-23 11:27:11 | INFO | main | com.ljq.demo.springboot.netty.server.client.DemoNettyClient 74| 客户端待发送消息:ahahaha啊哈哈哈啊哈4
2023-08-23 11:27:11 | INFO | main | com.ljq.demo.springboot.netty.server.client.DemoNettyClient 74| 客户端待发送消息:ahahaha啊哈哈哈啊哈5
2023-08-23 11:27:11 | INFO | main | com.ljq.demo.springboot.netty.server.client.DemoNettyClient 74| 客户端待发送消息:ahahaha啊哈哈哈啊哈6
2023-08-23 11:27:11 | INFO | main | com.ljq.demo.springboot.netty.server.client.DemoNettyClient 74| 客户端待发送消息:ahahaha啊哈哈哈啊哈7
2023-08-23 11:27:11 | INFO | main | com.ljq.demo.springboot.netty.server.client.DemoNettyClient 74| 客户端待发送消息:ahahaha啊哈哈哈啊哈8
2023-08-23 11:27:11 | INFO | main | com.ljq.demo.springboot.netty.server.client.DemoNettyClient 74| 客户端待发送消息:ahahaha啊哈哈哈啊哈9
2023-08-23 11:27:11 | INFO | main | com.ljq.demo.springboot.netty.server.client.DemoNettyClient 96| --------开始休眠 5 秒------------
2023-08-23 11:27:11 | INFO | nioEventLoopGroup-2-1 | com.ljq.demo.springboot.netty.server.handler.DemoNettyClientHandler 35| 接收到来自服务端的消息:hi,客户端,我是服务端
2023-08-23 11:27:16 | INFO | main | com.ljq.demo.springboot.netty.server.client.DemoNettyClient 74| 客户端待发送消息:0ahahaha啊哈哈哈啊哈
2023-08-23 11:27:16 | INFO | nioEventLoopGroup-2-1 | com.ljq.demo.springboot.netty.server.handler.DemoNettyClientHandler 35| 接收到来自服务端的消息:hi,客户端,我是服务端
2023-08-23 11:27:16 | INFO | main | com.ljq.demo.springboot.netty.server.client.DemoNettyClient 74| 客户端待发送消息:1ahahaha啊哈哈哈啊哈
2023-08-23 11:27:16 | INFO | nioEventLoopGroup-2-1 | com.ljq.demo.springboot.netty.server.handler.DemoNettyClientHandler 35| 接收到来自服务端的消息:hi,客户端,我是服务端
2023-08-23 11:27:16 | INFO | main | com.ljq.demo.springboot.netty.server.client.DemoNettyClient 74| 客户端待发送消息:2ahahaha啊哈哈哈啊哈
2023-08-23 11:27:16 | INFO | nioEventLoopGroup-2-1 | com.ljq.demo.springboot.netty.server.handler.DemoNettyClientHandler 35| 接收到来自服务端的消息:hi,客户端,我是服务端
2023-08-23 11:27:17 | INFO | main | com.ljq.demo.springboot.netty.server.client.DemoNettyClient 74| 客户端待发送消息:3ahahaha啊哈哈哈啊哈
2023-08-23 11:27:17 | INFO | nioEventLoopGroup-2-1 | com.ljq.demo.springboot.netty.server.handler.DemoNettyClientHandler 35| 接收到来自服务端的消息:hi,客户端,我是服务端
2023-08-23 11:27:17 | INFO | main | com.ljq.demo.springboot.netty.server.client.DemoNettyClient 74| 客户端待发送消息:4ahahaha啊哈哈哈啊哈
2023-08-23 11:27:17 | INFO | nioEventLoopGroup-2-1 | com.ljq.demo.springboot.netty.server.handler.DemoNettyClientHandler 35| 接收到来自服务端的消息:hi,客户端,我是服务端
2023-08-23 11:27:17 | INFO | main | com.ljq.demo.springboot.netty.server.client.DemoNettyClient 82| 关闭客户端
Disconnected from the target VM, address: '127.0.0.1:56274', transport: 'socket'
服务端日志:
2023-08-23 11:27:11 | INFO | nioEventLoopGroup-3-1 | com.ljq.demo.springboot.netty.server.handler.DemoNettyServerHandler 33| 服务端接收到数据:hi,服务端,我是客户端!ahahaha啊哈哈哈啊哈0ahahaha啊哈哈哈啊哈1ahahaha啊哈哈哈啊哈2ahahaha啊哈哈哈啊哈3ahahaha啊哈哈哈啊哈4ahahaha啊哈哈哈啊哈5ahahaha啊哈哈哈啊哈6ahahaha啊哈哈哈啊哈7ahahaha啊哈哈哈啊哈8ahahaha啊哈哈哈啊哈9
2023-08-23 11:27:16 | INFO | nioEventLoopGroup-3-1 | com.ljq.demo.springboot.netty.server.handler.DemoNettyServerHandler 33| 服务端接收到数据:0ahahaha啊哈哈哈啊哈
2023-08-23 11:27:16 | INFO | nioEventLoopGroup-3-1 | com.ljq.demo.springboot.netty.server.handler.DemoNettyServerHandler 33| 服务端接收到数据:1ahahaha啊哈哈哈啊哈
2023-08-23 11:27:16 | INFO | nioEventLoopGroup-3-1 | com.ljq.demo.springboot.netty.server.handler.DemoNettyServerHandler 33| 服务端接收到数据:2ahahaha啊哈哈哈啊哈
2023-08-23 11:27:17 | INFO | nioEventLoopGroup-3-1 | com.ljq.demo.springboot.netty.server.handler.DemoNettyServerHandler 33| 服务端接收到数据:3ahahaha啊哈哈哈啊哈
2023-08-23 11:27:17 | INFO | nioEventLoopGroup-3-1 | com.ljq.demo.springboot.netty.server.handler.DemoNettyServerHandler 33| 服务端接收到数据:4ahahaha啊哈哈哈啊哈
4.3 测试结论
- (1)从日志中可以看出客户端与服务端已经通讯成功
- (2)在第一个客户端循环发送消息中,客户端发了10次,然而服务端一次性接收到了所有消息,并不是发送一次接收一次
- (3)在第二个客户端循环发送消息过程中,客户端每间隔100毫秒发送一条消息,此时服务端是一条一条接收的
关于第(2)点,这是 Netty 通讯过程中的粘包问题,欲知如何解决,且听下回分解。
关于第(3)点,由于添加了时间间隔,Netty 会认为一条消息发送完成,因此就能发送一条接收一条。
5 推荐参考资料
[Netty入门] 最简单的Netty应用程序实例
Netty4:一个简单的消息传递的demo(分析和解析)
6 Github 源码
Gtihub 源码地址 : https://github.com/Flying9001/springBootDemo/tree/master/demo-netty-server
个人公众号:404Code,分享半个互联网人的技术与思考,感兴趣的可以关注.