Netty的简介与实战

Netty简介

一、背景与来源

  • Netty最初是由JBOSS提供的一个Java开源框架,现在已成为Github上的独立项目。
  • 它基于Java的NIO(New Input/Output)模型,提供了简单而强大的抽象,使得网络编程变得更加容易和高效。

二、特点与优势

  1. 高性能:Netty使用高效的Reactor模式,采用非阻塞I/O操作,以及优化的内存管理和缓冲区池,确保了高性能的数据处理和传输。与其他业界主流的NIO框架相比,Netty在吞吐量、延迟、资源消耗等方面都表现出色。
  2. 异步事件驱动:Netty基于事件驱动模型,能够轻松处理并发连接和高吞吐量的数据传输。它处理连接、读写、异常等事件,并通过事件处理器将这些事件传递给应用程序,使开发者能够集中处理业务逻辑。
  3. 支持多种协议:Netty支持多种传输协议,包括TCP、UDP、HTTP、HTTPS、WebSocket、Google Protocol Buffers等,还可以通过扩展支持其他自定义协议。这使得开发者能够更轻松地处理不同协议的网络通信。
  4. 可扩展性:Netty提供了灵活的Pipeline机制,允许开发者通过ChannelHandler链来处理网络事件,实现自定义的编解码器、拦截器等组件。通过这种模块化的设计,可以轻松扩展和定制Netty的功能。
  5. 易用性:Netty提供了简洁的API和详细的文档,使得开发者能够快速上手和实现复杂的网络功能。同时,Netty的社区非常活跃,有大量的资源和经验可供参考。
  6. 跨平台:Netty可运行在多种操作系统和Java版本上,保证了良好的跨平台兼容性。

三、核心组件与功能

Netty框架主要由以下几个核心组件构成,这些组件共同构建了Netty的整体架构,分别负责处理不同的功能和逻辑:

  1. Channel:Channel是Netty中的基本抽象,代表一个连接或通信的载体,可以是TCP连接、UDP套接字等。Channel负责I/O操作的执行,并维护连接的状态。
  2. EventLoop:EventLoop是Netty中的事件循环,负责处理I/O操作和任务调度。每个EventLoop都与一个线程关联,并分配给一个或多个Channel。EventLoop负责将事件分发给对应的ChannelHandler进行处理。
  3. ChannelHandler:ChannelHandler是Netty中的处理器接口,负责处理网络事件,如连接建立、数据读写、异常处理等。开发者可以实现自定义的ChannelHandler以处理特定的业务逻辑。
  4. ChannelPipeline:ChannelPipeline是一个ChannelHandler的链表,负责管理和调度ChannelHandler。当一个网络事件发生时,ChannelPipeline会按照链表顺序将事件传递给各个ChannelHandler,直到其中一个处理器处理了事件或者到达链表尾部。
  5. ChannelHandlerContext:ChannelHandlerContext是ChannelHandler与ChannelPipeline之间的桥梁,允许ChannelHandler与Pipeline以及其他Handler进行交互。通过ChannelHandlerContext,Handler可以访问Channel、Pipeline,以及发送事件给其他Handler。
  6. ByteBuf:ByteBuf是Netty中的字节缓冲区,用于存储和处理字节数据。相较于Java的ByteBuffer,ByteBuf提供了更高效的内存管理和更简洁的API,支持自动扩容、复合缓冲区等特性。
  7. Bootstrap:Bootstrap是Netty中的启动类,用于配置和启动客户端或服务器。通过Bootstrap,开发者可以设置Channel的初始化参数、事件处理器等,以及绑定端口和启动监听。

四、应用场景与实例

Netty被广泛应用于分布式系统、实时通信、游戏开发等场景。例如,RocketMQ、Elasticsearch、Dubbo等知名的开源项目和大型企业都使用了Netty作为底层网络通信框架。这些应用通过Netty的高性能和灵活的设计,实现了高效、可靠的网络通信。

实战

netty服务器

@Componentpublic class NettyServer {static final Logger log = LoggerFactory.getLogger(NettyServer.class);/*** 端口号*/@Value("${webSocket.netty.port:8888}")int port;EventLoopGroup bossGroup;EventLoopGroup workGroup;@AutowiredProjectInitializer nettyInitializer;@PostConstructpublic void start() throws InterruptedException {new Thread(() -> {bossGroup = new NioEventLoopGroup();workGroup = new NioEventLoopGroup();ServerBootstrap bootstrap = new ServerBootstrap();// bossGroup辅助客户端的tcp连接请求, workGroup负责与客户端之前的读写操作bootstrap.group(bossGroup, workGroup);// 设置NIO类型的channelbootstrap.channel(NioServerSocketChannel.class);// 设置监听端口bootstrap.localAddress(new InetSocketAddress(port));// 设置管道bootstrap.childHandler(nettyInitializer);// 配置完成,开始绑定server,通过调用sync同步方法阻塞直到绑定成功ChannelFuture channelFuture = null;try {channelFuture = bootstrap.bind().sync();log.info("Server started and listen on:{}", channelFuture.channel().localAddress());// 对关闭通道进行监听channelFuture.channel().closeFuture().sync();} catch (InterruptedException e) {e.printStackTrace();}}).start();}/*** 释放资源*/@PreDestroypublic void destroy() throws InterruptedException {if (bossGroup != null) {bossGroup.shutdownGracefully().sync();}if (workGroup != null) {workGroup.shutdownGracefully().sync();}}
}

Netty配置

管理全局Channel以及用户对应的channel(推送消息)

  public class NettyConfig {/*** 定义全局单利channel组 管理所有channel*/private static volatile ChannelGroup channelGroup = null;/*** 存放请求ID与channel的对应关系*/private static volatile ConcurrentHashMap<String, Channel> channelMap = null;/*** 定义两把锁*/private static final Object lock1 = new Object();private static final Object lock2 = new Object();public static ChannelGroup getChannelGroup() {if (null == channelGroup) {synchronized (lock1) {if (null == channelGroup) {channelGroup = new DefaultChannelGroup(GlobalEventExecutor.INSTANCE);}}}return channelGroup;}public static ConcurrentHashMap<String, Channel> getChannelMap() {if (null == channelMap) {synchronized (lock2) {if (null == channelMap) {channelMap = new ConcurrentHashMap<>();}}}return channelMap;}public static Channel getChannel(String userId) {if (null == channelMap) {return getChannelMap().get(userId);}return channelMap.get(userId);}}

管道配置

@Component
public class ProjectInitializer extends ChannelInitializer<SocketChannel> {/*** webSocket协议名*/static final String WEBSOCKET_PROTOCOL = "WebSocket";/*** webSocket路径*/@Value("${webSocket.netty.path:/webSocket}")String webSocketPath;@AutowiredWebSocketHandler webSocketHandler;@Overrideprotected void initChannel(SocketChannel socketChannel) throws Exception {// 设置管道ChannelPipeline pipeline = socketChannel.pipeline();// 流水线管理通道中的处理程序(Handler),用来处理业务// webSocket协议本身是基于http协议的,所以这边也要使用http编解码器pipeline.addLast(new HttpServerCodec());pipeline.addLast(new ObjectEncoder());// 以块的方式来写的处理器pipeline.addLast(new ChunkedWriteHandler());pipeline.addLast(new HttpObjectAggregator(8192));pipeline.addLast(new WebSocketServerProtocolHandler(webSocketPath, WEBSOCKET_PROTOCOL, true, 65536 * 10));// 自定义的handler,处理业务逻辑pipeline.addLast(webSocketHandler);}
}

自定义handler

 @Component@ChannelHandler.Sharablepublic class WebSocketHandler extends SimpleChannelInboundHandler<TextWebSocketFrame> {private static final Logger log = LoggerFactory.getLogger(NettyServer.class);/*** 一旦连接,第一个被执行*/@Overridepublic void handlerAdded(ChannelHandlerContext ctx) throws Exception {log.info("有新的客户端链接:[{}]", ctx.channel().id().asLongText());// 添加到channelGroup 通道组NettyConfig.getChannelGroup().add(ctx.channel());}/*** 读取数据*/@Overrideprotected void channelRead0(ChannelHandlerContext ctx, TextWebSocketFrame msg) throws Exception {log.info("服务器收到消息:{}", msg.text());// 获取用户ID,关联channelJSONObject jsonObject = JSONUtil.parseObj(msg.text());String uid = jsonObject.getStr("uid");NettyConfig.getChannelMap().put(uid, ctx.channel());// 将用户ID作为自定义属性加入到channel中,方便随时channel中获取用户IDAttributeKey<String> key = AttributeKey.valueOf("userId");ctx.channel().attr(key).setIfAbsent(uid);// 回复消息ctx.channel().writeAndFlush(new TextWebSocketFrame("服务器收到消息啦"));}@Overridepublic void handlerRemoved(ChannelHandlerContext ctx) throws Exception {log.info("用户下线了:{}", ctx.channel().id().asLongText());// 删除通道NettyConfig.getChannelGroup().remove(ctx.channel());removeUserId(ctx);}@Overridepublic void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {log.info("异常:{}", cause.getMessage());// 删除通道NettyConfig.getChannelGroup().remove(ctx.channel());removeUserId(ctx);ctx.close();}/*** 删除用户与channel的对应关系*/private void removeUserId(ChannelHandlerContext ctx) {AttributeKey<String> key = AttributeKey.valueOf("userId");String userId = ctx.channel().attr(key).get();NettyConfig.getChannelMap().remove(userId);}}推送消息接口及实现类public interface PushMsgService {/*** 推送给指定用户*/void pushMsgToOne(String userId, String msg);/*** 推送给所有用户*/void pushMsgToAll(String msg);}@Servicepublic class PushMsgServiceImpl implements PushMsgService {@Overridepublic void pushMsgToOne(String userId, String msg) {Channel channel = NettyConfig.getChannel(userId);if (Objects.isNull(channel)) {throw new RuntimeException("未连接socket服务器");}channel.writeAndFlush(new TextWebSocketFrame(msg));}@Overridepublic void pushMsgToAll(String msg) {NettyConfig.getChannelGroup().writeAndFlush(new TextWebSocketFrame(msg));}}

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.rhkb.cn/news/458650.html

如若内容造成侵权/违法违规/事实不符,请联系长河编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

【算法刷题指南】双指针

&#x1f308;个人主页&#xff1a; 南桥几晴秋 &#x1f308;C专栏&#xff1a; 南桥谈C &#x1f308;C语言专栏&#xff1a; C语言学习系列 &#x1f308;Linux学习专栏&#xff1a; 南桥谈Linux &#x1f308;数据结构学习专栏&#xff1a; 数据结构杂谈 &#x1f308;数据…

前端零基础入门到上班:【Day1】什么是前端?

本来打算开付费专栏 但是想起那句话 赠人玫瑰手留余香 引言1. 什么是前端&#xff1f;1.1 前端的定义1.2 前端的三大核心技术1.3 前端框架和工具 2. 什么是后端&#xff1f;2.1 后端的定义2.2 后端的组成要素2.3 后端框架和工具 3. 前后端的区别4. 什么是前后端分离&#xff1f…

院士领衔,瑞德磁电誓将中国红染遍磁电产业

【哔哥哔特导读】今天我们从广州来到淮北&#xff0c;参观一家由院士领衔创立的金属磁粉芯企业&#xff0c;看他们如何将中国红染遍磁电产业&#xff0c;一步步实现金属磁粉芯的国产替代。 想要成为一个领域的头部企业&#xff0c;技术实力与产能规模缺一不可&#xff0c;而瑞…

[翱捷]让SDK跑起来了

一&#xff0c;环境安排及验证 参照文档 <<ASR编译环境及编译步骤--3601.docx>> <<Windows环境搭建.docx>> <<ChildWatchSWUG_1221.doc>> 主要工具包括 ARM DS-5 V5.26.2 (64-bit)ActivePerl 5.28.1 Build 2801 (64-bit)msys2-x86_6…

摊牌了,创业失败了

“以为这个网红不会塌房&#xff0c;结果一觉醒来&#xff0c;天塌了……” ——某电商供应商 “这不是禁不住网上的各种诱惑吗&#xff0c;9月30日纵身入局&#xff0c;节假日几天不能买入&#xff0c;8号上班第一天我还看着钱数开心呢。结果今天……” ——一位投资失利&…

Python日志系统详解:Logging模块最佳实践

Python日志系统详解&#xff1a;Logging模块最佳实践 在开发Python应用程序时&#xff0c;日志记录是排查问题、监控系统状态、优化性能的重要手段。Python标准库中提供了强大的logging模块&#xff0c;使开发者可以轻松实现灵活的日志系统。本文将详细介绍Python的logging模块…

Java实现邮箱发送邮件添加定时任务(二)

上篇文章我们谈到邮件的发送&#xff0c;但是可以发现使用非常局限&#xff0c;这里我做了一个简单的修改&#xff0c;添加了定时发送功能&#xff0c;可以帮助我们处理很多繁琐的事 这里我写了一个简单的案例 1. 先在pom文件里面添加依赖 2.配置yml文件 3.写一个定时任务类…

python项目实战——多协程下载美女图片

协程 文章目录 协程协程的优劣势什么是IO密集型任务特点示例与 CPU 密集型任务的对比处理 I/O 密集型任务的方式总结 创建并使用协程asyncio模块 创建协程函数运行协程函数asyncio.run(main())aiohttp模块调用aiohttp模块步骤 aiofiles————协程异步函数遇到的问题一 await …

AI最新动态概览-2024年10月28日

1. 字节跳动加速欧洲布局&#xff0c;拟建AI研发中心 近日&#xff0c;有消息称字节跳动正积极筹备在欧洲设立AI研发中心&#xff0c;此举标志着该公司在全球技术版图上的又一重要扩张。随着人工智能技术的飞速发展&#xff0c;字节跳动正通过招兵买马&#xff0c;进一步巩固其…

Linux 进程优先级 进程切换

目录 优先级 概念 为什么优先级要限制在一定范围内 进程切换 方式 EIP寄存器(程序计数器) 进程在运行时会使用寄存器来保存临时数据 进程的上下文是什么&#xff1f; 进程的上下文保存到哪&#xff1f; 内核栈或专门的上下文结构也在内核空间&#xff1f;那为什么不直…

java 提示 避免用Apache Beanutils进行属性的copy。

避免用Apache Beanutils进行属性的copy。 Inspection info: 避免用Apache Beanutils进行属性的copy。 说明&#xff1a;Apache BeanUtils性能较差&#xff0c;可以使用其他方案比如Spring BeanUtils, Cglib BeanCopier。 TestObject a new TestObject(); TestObject b new Te…

2024 最新 frida技术栈 第一部分

目录 1.下载 2. 安装 2.1. 命令 3.基本使用 3.1 列出运行的APP 3.2 列出所有APP 3.3 杀死进程 4. frida hook 方法 4.1 frida客户端命令行的参数 4.2. Frida两种操作模式 4.3. Frida操作APP的两种方式 4.3.1. attach模式 4.3.2. spawn模式 4.3.3 转发端口启…

RabbitMQ的Overview Totals是空

一、问题描述 RabbitMQ 版本&#xff1a;4.0.2&#xff0c;Erlang 版本&#xff1a;26.2.5.4。 RabbitMQ 页面管理(rabbitmq_management)的 Overview > Totals 是空&#xff1a; 二、原因分析 RabbitMQ 的配置&#xff1a; management_agent.disable_metrics_collector…

Hive的数据存储格式

目录 一、前言 二、存储格式 2.1、文本格式&#xff08;TextFile&#xff09; 2.1.1、定义与特点 2.1.2、存储与压缩 2. 1.3、使用场景 2.2、行列式文件&#xff08;ORCFile&#xff09; 2.2.1、ORC的结构 2.2.2、ORC的数据类型 2.2.3、ORC的压缩格式 2.2.3、ORC存储…

LVGL移植教程(超详细)——基于GD32F303X系列MCU

版本&#xff1a;LVGL Kernel V8.3.0&#xff0c;运行压力测试Demo Stress首先放一张最终Stress Demo 运行图&#xff1a; 一、准备 1. GD32 Keil工程 准备任意一个屏幕可以正常显示的GD32工程&#xff1a; 2. LVGL源码 最新版现在已经是V9.2了&#xff0c;这里我选择了…

XQT_UI 组件|03 |加载组件 XQtLoading

XQtLoading 使用文档 简介 XQtLoading 是一个自定义的加载动画组件&#xff0c;旨在为用户提供可配置的旋转花瓣动画效果。它可以在应用程序中用于指示加载状态&#xff0c;提升用户体验。 特征 可配置性&#xff1a;用户可以根据需求调整旋转周期、缩放周期、最大/最小缩放…

Bi-LSTM-CRF实现中文命名实体识别工具(TensorFlow)

项目源码获取方式见文章末尾&#xff01; 回复暗号&#xff1a;13&#xff0c;免费获取600多个深度学习项目资料&#xff0c;快来加入社群一起学习吧。 **《------往期经典推荐------》**项目名称 1.【MobileNetV2实现实时口罩检测tensorflow】 2.【卫星图像道路检测DeepLabV3P…

关于嵌入式学习的一些短浅经验

一、写在前面 感谢在 10.23&#xff0c;各位大佬对我进行的模拟面试&#xff0c;我也发现了我对知识的不熟练的部分&#xff0c;比如 IPC 方法和线程同步方法的知识。模拟面试第四期-已经拿到大厂 OFFER 的研究生大佬-LINUX 卷到飞起_哔哩哔哩_bilibili 然后&#xff0c;沈阳…

uniapp+uniCloud前端独立开发全栈项目Vue3版本学习路线,轻松开发H5、微信小程序、APP

概述 嗨&#xff0c;大家好&#xff0c;我是爱搞知识的咸虾米&#xff0c;这个学习路线是uniappuniCloud生态开发微信小程序、H5、APP等实战项目&#xff0c;从零基础开始到各种类型的项目案例&#xff0c;使用比较新的vue3语法糖版本&#xff0c;通过前端的技术可以轻松开发上…

微信小程序——消息订阅

首先用到的就是wx.requestSubscribeMessage接口。 注意&#xff1a;用户发生点击行为或者发起支付回调后&#xff0c;才可以调起订阅消息界面 requestSubscribeMessage() {uni.requestSubscribeMessage({tmplIds: [],//需要订阅的消息模板的id的集合&#xff0c;一次调用最多可…