netty构建udp服务器以及发送报文到客户端客户端详细案例

目录

一、基于netty创建udp服务端以及对应通道设置关键

二、发送数据

三、netty中的ChannelOption常用参数说明

1、ChannelOption.SO_BACKLOG

2、ChannelOption.SO_REUSEADDR

3、ChannelOption.SO_KEEPALIVE

4、ChannelOption.SO_SNDBUF和ChannelOption.SO_RCVBUF

5、ChannelOption.SO_LINGER

6、ChannelOption.TCP_NODELAY


一、基于netty创建udp服务端以及对应通道设置关键

@Configuration
@RefreshScope
public class NettyUdpServer {@Value("${netty.server.udpPort}")private int port;private EventLoopGroup bossGroup;//主线程private Channel channel;//通道private ChannelFuture future; //回调@Autowiredprivate DataCollector dataCollector;;public Channel start() throws InterruptedException {//判断是否支持Epoll模式,从而创建不同的线程组bossGroup = Epoll.isAvailable()? new EpollEventLoopGroup() : new NioEventLoopGroup();try {Bootstrap b = new Bootstrap();//linux平台下增加SO_REUSEPORT特性提高性能,支持多个进程或者线程绑定到同一个端口,提高服务器程序的吞吐性能if(Epoll.isAvailable()) {//设置反应器线程组b.group(bossGroup).handler(new EpollUdpServerInitializer(dataCollector))//设置nio类型的通道.channel(EpollDatagramChannel.class).option(ChannelOption.SO_BROADCAST, true).option(ChannelOption.SO_REUSEADDR, true).option(ChannelOption.SO_RCVBUF, 1024 * 1024).option(EpollChannelOption.SO_REUSEPORT, true);}else{//设置反应器线程组b.group(bossGroup).handler(new UdpServerInitializer(dataCollector))//设置nio类型的通道.channel(NioDatagramChannel.class)//设置通道的参数.option(ChannelOption.SO_BROADCAST, true).option(ChannelOption.SO_REUSEADDR, true);}//Channel channel = server.bind(port).sync().channel();//开始绑定服务器,通过调用sync()同步方法阻塞直到绑定成功//ChannelFuture channelFuture = b.bind(port).sync();//等待通道关闭的异步任务结束//ChannelFuture closeFuture = channelFuture.channel().closeFuture();//closeFuture.sync();ChannelFuture f = b.bind(port).sync();channel = f.channel();if(f.isSuccess()){//MasterSelector registry = new MasterSelector("","netty-services",  port);System.out.println("UDP服务器启动,监听在端口:" + port);}else {channel.closeFuture().sync();}} finally {//bossGroup.shutdownGracefully().sync();}System.out.println("Udp服务器启动,监听在端口:"+port);return channel;}
}

以上代码中Epoll.isAvailable()用户判断是window还是linux环境,linux环境默认采用Epoll相关通道,所以显式设置EpollDatagramChannel通道。在处理(handler)的设置中要根据不同的通道设置初始化的通道类型:

linux环境下EpollDatagramChannel通道设置 .handler(new EpollUdpServerInitializer(dataCollector))具体代码

public class EpollUdpServerInitializer extends ChannelInitializer<EpollDatagramChannel> {private final DataCollector dataCollector;public EpollUdpServerInitializer(DataCollector dataCollector) {this.dataCollector = dataCollector;}@Overrideprotected void initChannel(EpollDatagramChannel epollDatagramChannel) throws Exception {epollDatagramChannel.pipeline()//添加netty空闲超时检查的支持.addLast(new UdpServerHandler(dataCollector));}

要使 通过服务器端通过EpollDatagramChannel通道发送数据,客户端能够正常接收到数据,下图中标红的泛型通道类要与服务器端设置的通道类一致

同意要支持Nio类型通道为NioDatagramChannel.class时,通道初始化为:

public class UdpServerInitializer extends ChannelInitializer<NioDatagramChannel> {private final DataCollector dataCollector;public UdpServerInitializer(DataCollector dataCollector) {this.dataCollector = dataCollector;}@Overrideprotected void initChannel(NioDatagramChannel nioDatagramChannel) throws Exception {nioDatagramChannel.pipeline()//添加netty空闲超时检查的支持.addLast(new UdpServerHandler(dataCollector));}
}

 要使 通过服务器端通过NioDatagramChannel通道发送数据,客户端能够正常接收到数据,下图中标红的泛型通道类要与服务器端设置的通道类一致

二、发送数据

关键代码,采用writeAndFlush发送数据,注意:要发送udp数据报,

public class UdpServerHandler extends SimpleChannelInboundHandler<DatagramPacket> {/**设置最大消息大小*/private static final int MAX_MESSAGE_SIZE = 2048;/**线程池*/private ExecutorService executorService;private final DataCollector dataCollector;public UdpServerHandler(DataCollector dataCollector) {this.dataCollector = dataCollector;//根据当前系统可用的处理器数量创建一个固定长度的线程池executorService = Executors.newFixedThreadPool(Runtime.getRuntime().availableProcessors());}@Overrideprotected void channelRead0(ChannelHandlerContext ctx, DatagramPacket datagramPacket) throws Exception {ByteBuf buffer = datagramPacket.content();//确保不会超出最大消息大小if(buffer.readableBytes() > MAX_MESSAGE_SIZE) {buffer.release();return;}UdpDatagram udpDatagram = parseUdpDatagram(buffer);UdpDatagram respUdpDatagram = dataCollector.processUdpDatagram(udpDatagram);if (null != respUdpDatagram) {handleReceivedData(ctx, respUdpDatagram, datagramPacket);}}/*** 处理接收到的数据* @param ctx* @param udpDatagram*/public void handleReceivedData(ChannelHandlerContext ctx, UdpDatagram udpDatagram, DatagramPacket datagramPacket) throws ExecutionException, InterruptedException {Channel channel = ctx.channel();if (log.isInfoEnabled()) {log.info("received udp message: sessionId: {}, opCode: {}, short messageId: {}",ctx.channel().id(), udpDatagram.getMessageTypeEnum(), udpDatagram.getShortMessageId());}byte[] payloadBytes = udpDatagram.getPayloadBytes();ByteBuf copiedBuffer = Unpooled.copiedBuffer(payloadBytes);ChannelFuture channelFuture = channel.writeAndFlush(new DatagramPacket(copiedBuffer.retain(), datagramPacket.sender()));channelFuture.addListener(new ChannelFutureListener() {@Overridepublic void operationComplete(ChannelFuture channelFuture) throws Exception {if (channelFuture.isSuccess()) {// 数据发送成功log.info("数据发送成功:sender host: {}, sender port:{}, sender address:{}",datagramPacket.sender().getHostName(),datagramPacket.sender().getPort(), datagramPacket.sender().getAddress());} else {// 数据发送失败log.error("数据发送失败: {}",channelFuture.cause().getStackTrace());channelFuture.cause().printStackTrace();}}});}@Overridepublic void handlerAdded(ChannelHandlerContext ctx) throws Exception {dataCollector.tcpConnect(ctx.channel());}@Overridepublic void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {if (log.isWarnEnabled()) {log.warn("udp session throw an exception, sessionId:{} exception message: {}",ctx.channel().id().asLongText(), cause.getMessage());}}//当客户端关闭链接时关闭通道@Overridepublic void handlerRemoved(ChannelHandlerContext ctx) throws Exception {dataCollector.tcpChannelDisconnect(ctx.channel());}
}

处理类继承SimpleChannelInboundHandler类泛型类为DatagramPacket 

 writeAndFlush方法中发送的数据类型要是DatagramPacket

三、netty中的ChannelOption常用参数说明

1、ChannelOption.SO_BACKLOG

ChannelOption.SO_BACKLOG对应的是tcp/ip协议listen函数中的backlog参数。函数listen(int socketfd, int backlog)用来初始化服务端可连接队列。

服务端处理客户端连接请求是顺序处理的,所以同一时间只能处理一个客户端连接,多个客户端来的时候,服务端将不能处理的客户端连接请求放在队列中等待处理,backlog参数指定了队列的大小。

2、ChannelOption.SO_REUSEADDR

ChanneOption.SO_REUSEADDR对应于套接字选项中的SO_REUSEADDR,这个参数表示允许重复使用本地地址和端口。

比如,某个服务器进程占用了TCP的80端口进行监听,此时再次监听该端口就会返回错误,使用该参数就可以解决问题,该参数允许共用该端口,这个在服务器程序中比较常使用。

比如某个进程非正常退出,该程序占用的端口可能要被占用一段时间才能允许其他进程使用,而且程序死掉以后,内核一需要一定的时间才能够释放此端口,不设置SO_REUSEADDR就无法正常使用该端口。

3、ChannelOption.SO_KEEPALIVE

Channeloption.SO_KEEPALIVE参数对应于套接字选项中的SO_KEEPALIVE,该参数用于设置TCP连接,当设置该选项以后,连接会测试链接的状态,这个选项用于可能长时间没有数据交流的连接。

当设置该选项以后,如果在两小时内没有数据的通信时,TCP会自动发送一个活动探测数据报文。

4、ChannelOption.SO_SNDBUF和ChannelOption.SO_RCVBUF

ChannelOption.SO_SNDBUF参数对应于套接字选项中的SO_SNDBUF,ChannelOption.SO_RCVBUF参数对应于套接字选项中的SO_RCVBUF这两个参数用于操作发送缓冲区大小和接受缓冲区大小。

接收缓冲区用于保存网络协议站内收到的数据,直到应用程序读取成功,发送缓冲区用于保存发送数据,直到发送成功。

5、ChannelOption.SO_LINGER

ChannelOption.SO_LINGER参数对应于套接字选项中的SO_LINGER,Linux内核默认的处理方式是当用户调用close()方法的时候,函数返回,在可能的情况下,尽量发送数据,不一定保证会发送剩余的数据,造成了数据的不确定性,使用SO_LINGER可以阻塞close()的调用时间,直到数据完全发送。

6、ChannelOption.TCP_NODELAY

ChannelOption.TCP_NODELAY参数对应于套接字选项中的TCP_NODELAY,该参数的使用与Nagle算法有关。

Nagle算法是将小的数据包组装为更大的帧然后进行发送,而不是输入一次发送一次,因此在数据包不足的时候会等待其他数据的到来,组装成大的数据包进行发送,虽然该算法有效提高了网络的有效负载,但是却造成了延时。

而该参数的作用就是禁止使用Nagle算法,使用于小数据即时传输。和TCP_NODELAY相对应的是TCP_CORK,该选项是需要等到发送的数据量最大的时候,一次性发送数据,适用于文件传输。

ChannelOption属性
SO_BROADCAST对应套接字层的套接字:SO_BROADCAST,将消息发送到广播地址。
如果目标中指定的接口支持广播数据包,则启用此选项可让应用程序发送广播消息。
SO_KEEPALIVE对应套接字层的套接字:SO_KEEPALIVE,保持连接。
在空闲套接字上发送探测,以验证套接字是否仍处于活动状态。
SO_SNDBUF对应套接字层的套接字:SO_SNDBUF,设置发送缓冲区的大小。
SO_RCVBUF对应套接字层的套接字:SO_RCVBUF,获取接收缓冲区的大小。
SO_REUSEADDR对应套接字层的套接字:SO_REUSEADDR,本地地址复用。
启用此选项允许绑定已使用的本地地址。
SO_LINGER对应套接字层的套接字:SO_LINGER,延迟关闭连接。
启用此选项,在调用close时如果存在未发送的数据时,在close期间将阻止调用应用程序,直到数据被传输或连接超时。
SO_BACKLOG

对应TCP/IP协议中<font color=red>backlog</font>参数,<font color=red>backlog</font>即连接队列,设置TCP中的连接队列大小。如果队列满了,会发送一个ECONNREFUSED错误信息给C端,即“ Connection refused”。

SO_TIMEOUT等待客户连接的超时时间。
IP_TOS对应套接字层的套接字:IP_TOS,在IP标头中设置服务类型(TOS)和优先级。
IP_MULTICAST_ADDR对应IP层的套接字选项:IP_MULTICAST_IF,设置应发送多播数据报的传出接口。
IP_MULTICAST_IF对应IP层的套接字选项:IP_MULTICAST_IF2,设置应发送多播数据报的IPV6传出接口。
IP_MULTICAST_TTL对应IP层的套接字选项:IP_MULTICAST_TTL,在传出的 多播数据报的IP头中设置生存时间(TTL)。
IP_MULTICAST_LOOP_DISABLED取消 指定应将 传出的多播数据报的副本 回传到发送主机,只要它是多播组的成员即可。
TCP_NODELAY

对应TCP层的套接字选项:TCP_NODELAY,指定TCP是否遵循<font color=#35b998>Nagle算法</font> 决定何时发送数据。Nagle算法代表通过减少必须发送包的个数来增加网络软件系统的效率。即尽可能发送大块数据避免网络中充斥着大量的小数据块。如果要追求高实时性,需要设置关闭Nagle算法;如果需要追求减少网络交互次数,则设置开启Nagle算法。

  

 ChannelOption通用配置

参数说明
ALLOCATORByteBuf的分配器,默认值为ByteBufAllocator.DEFAULT。
RCVBUF_ALLOCATOR

用于Channel分配接受Buffer的分配器,默认值为AdaptiveRecvByteBufAllocator.DEFAULT,是一个自适应的接受缓冲区分配器,能根据接受到的数据自动调节大小。可选值为FixedRecvByteBufAllocator,固定大小的接受缓冲区分配器。

MESSAGE_SIZE_ESTIMATOR

消息大小估算器,默认为DefaultMessageSizeEstimator.DEFAULT。估算ByteBuf、ByteBufHolder和FileRegion的大小,其中ByteBuf和ByteBufHolder为实际大小,FileRegion估算值为0。该值估算的字节数在计算水位时使用,FileRegion为0可知FileRegion不影响高低水位。

CONNECT_TIMEOUT_MILLIS连接超时毫秒数,默认值30000毫秒即30秒。
WRITE_SPIN_COUNT一个Loop写操作执行的最大次数,默认值为16。也就是说,对于大数据量的写操作至多进行16次,如果16次仍没有全部写完数据,此时会提交一个新的写任务给EventLoop,任务将在下次调度继续执行。这样,其他的写请求才能被响应不会因为单个大数据量写请求而耽误。
WRITE_BUFFER_WATER_MARK
ALLOW_HALF_CLOSURE一个连接的远端关闭时本地端是否关闭,默认值为False。值为False时,连接自动关闭;为True时,触发ChannelInboundHandler的userEventTriggered()方法,事件为ChannelInputShutdownEvent。
AUTO_READ自动读取,默认值为True。Netty只在必要的时候才设置关心相应的I/O事件。对于读操作,需要调用channel.read()设置关心的I/O事件为OP_READ,这样若有数据到达才能读取以供用户处理。该值为True时,每次读操作完毕后会自动调用channel.read(),从而有数据到达便能读取;否则,需要用户手动调用channel.read()。需要注意的是:当调用config.setAutoRead(boolean)方法时,如果状态由false变为true,将会调用channel.read()方法读取数据;由true变为false,将调用config.autoReadCleared()方法终止数据读取。

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.rhkb.cn/news/288446.html

如若内容造成侵权/违法违规/事实不符,请联系长河编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

vite+vue3动态模块化导入并使用pinia

一、安装引入pinia 1.安装 pnpm install pinia # 或者使用 yarn yarn add pinia # 或者使用 npm npm install pinia 2.在main.js里引入 import { createApp } from vue import App from ./App.vue import { createPinia } from pinia createApp(App).use(createPinia()).mo…

java特殊文件——properties属性文件概述

前言&#xff1a; 整理下学习笔记&#xff0c;打好基础&#xff0c;daydayup!! properties properties是一个Map集合&#xff08;键值对合集&#xff09;&#xff0c;但是一般不当作合集。而是用来代表属性文件&#xff0c;通过Properties读写属性文件里的内容 Properties调用方…

数据库学习(四)mybatis

Mybatis Mybatis是一个基于数据持久层&#xff08;DAO层&#xff09;的一款框架&#xff0c;他能极大的简化Java中连接数据库&#xff0c;操作数据库也就是jdbc的操作。 在定义mybatis相关接口时&#xff0c;不需要定义实现类&#xff0c;因为在程序启动时&#xff0c;mybati…

程序员如何兼职赚小钱?

程序员由于有技术和手艺其实兼职赚钱的路子还是挺多的&#xff0c;只要你有足够的时间。 1. 做外包 这是比较传统的方式&#xff0c;甲方在一些众包平台上发布开发任务&#xff0c;你可以抢这个任务&#xff0c;但是价格都比较便宜。 任务比较多的平台: 猪八戒、一品威客、开…

聚合支付备案新增机构名单公布,14家机构成功备案

孟凡富 3月27日&#xff0c;中国支付清算协会公布了最新一批收单外包服务机构备案机构结果&#xff0c;总备案机构为27000家&#xff0c;新增备案机构为648家&#xff0c;其中&#xff0c;新增聚合支付技术服务备案机构包括北京鑫杰华誉、深圳中峻、多点(深圳)数字科技、扬州泽…

Amazon SageMaker + Stable Diffusion 搭建文本生成图像模型

如果我们的计算机视觉系统要真正理解视觉世界&#xff0c;它们不仅必须能够识别图像&#xff0c;而且必须能够生成图像。文本到图像的 AI 模型仅根据简单的文字输入就可以生成图像。 近两年&#xff0c;以ChatGPT为代表的AIGC技术崭露头角&#xff0c;逐渐从学术研究的象牙塔迈…

静态、动态代理模式(Spring学习笔记八)

代理模式是SpringAOC的底层 代理模式分为&#xff1a;静态代理模式 动态代理模式 1、静态代理 代码步骤 接口&#xff1a; package com.li.dedmo01;public interface Rent {public void rent(); }真实角色&#xff1a; package com.li.dedmo01;public class Host imple…

没有与参数列表匹配的构造函数“cv::VideoWriter::VideoWriter”实例

今天在使用Visual Studio开发与OpenCV相关的程序时&#xff0c;遇到了这样的情况: 第一个参数的下方被打上了红波浪线&#xff0c;我本能的觉得是第一个参数出的问题&#xff0c;于是改成了这样: 红线依然存在&#xff0c;没有消失&#xff0c;把鼠标放在红线下方&#xff0c…

AI Agent(LLM Agent)入门解读

1. 什么是AI Agent&#xff1f; AI Agent可以理解为一个智能体&#xff0c;包括感知模块、规划决策模块和行动模块&#xff0c;类似于人类的五官、大脑和肢体。它能帮助人类处理复杂的任务&#xff0c;并能根据环境反馈进行学习和调整。 五官可以理解为感知模块&#xff0c;大…

Linux相关命令(1)

1、找出文件夹下包含 “aaa” 同时不包含 “bbb”的文件&#xff0c;然后把他们重新生成一下。要求只能用一行命令。 find ./ -type f -name "*aaa*" ! -name "*bbb*" -exec touch {} \;文件系统操作命令 df&#xff1a;列出文件系统的整体磁盘使用情况 …

已注册的商标别忘了续展,新注可能难下证!

近期普推知产老杨遇到好几个网友和看过多个案例&#xff0c;以前商标名称可以申请注册下来&#xff0c;但是换字体注册不下来了&#xff0c;有的是不想续展想直接换字体申请注册&#xff0c;但是也没有下来。 这些商标名称主要是存在禁止注册或缺显&#xff0c;比如“柳林”以前…

LeetCode讲解算法2-数据结构[栈和队列](Python版)

文章目录 一、栈1.1 栈的定义1.2 栈的实现分析步骤1.3 栈的应用匹配圆括号匹配符号模2除法&#xff08;十进制转二进制&#xff09;进制转换 二、队列2.1 单向队列2.2 双端队列2.3 队列的应用验证回文串滑动窗口最大值 一、栈 1.1 栈的定义 栈是一种线性数据结构&#xff0c;栈…

【MySQL】简述SQLの通用语法及4种基本语句介绍(DDL/DML/DQL/DCL)

前言 大家好吖&#xff0c;欢迎来到 YY 滴MySQL系列 &#xff0c;热烈欢迎&#xff01; 本章主要内容面向接触过C Linux的老铁 主要内容含&#xff1a; 欢迎订阅 YY滴C专栏&#xff01;更多干货持续更新&#xff01;以下是传送门&#xff01; YY的《C》专栏YY的《C11》专栏YY的…

【那些年错过的好书】——Web前端开发实战:HTML5+CSS3+JavaScript+Vue+Bootstrap(微视频版)

喜欢前端的同学&#xff0c;可以私信我加入学习群。 点击链接&#xff0c;获取资源&#xff1a; https://lizetoolbox.top:8080/qrCode_contact 或者 http://lizetoolbox.top/qrCode_contact 正文开始 前言推荐理由书籍介绍章节介绍实书示例写在最后 前言 陌生的朋友&…

kubectl 启用shell自动补全功能

官网手册参考&#xff1a;https://kubernetes.io/zh-cn/docs/tasks/tools/install-kubectl-linux/ 系统&#xff1a;centos7 补全脚本依赖于工具 bash-completion&#xff0c; 所以要先安装它&#xff08;可以用命令 type _init_completion 检查 bash-completion 是否已安装&a…

Python爬虫之爬取网页图片

当我们想要下载网页的图片时&#xff0c;发现网页的图片太多了&#xff0c;无从下手&#xff0c;那我们写一个脚本来爬取呗。 这次的脚本是专门针对某个外国网站使用的&#xff0c;因此仅供参考思路。 在测试的过程中&#xff0c;我发现网站使用了发爬虫机制&#xff0c;具体就…

阿里云服务器优惠价格61元一年,多配置报价,来看看

2024年阿里云服务器优惠价格表&#xff0c;一张表整理阿里云服务器最新报价&#xff0c;阿里云服务器网aliyunfuwuqi.com整理云服务器ECS和轻量应用服务器详细CPU内存、公网带宽和系统盘详细配置报价单&#xff0c;大家也可以直接移步到阿里云CLUB中心查看 aliyun.club 当前最新…

AI+软件工程:10倍提效!用ChatGPT编写系统功能文档

系统功能文档是一种描述软件系统功能和操作方式的文档。它让开发团队、测试人员、项目管理者、客户和最终用户对系统行为有清晰、全面的了解。 通过ChatGPT&#xff0c;我们能让编写系统功能文档的效率提升10倍以上。 用ChatGPT生成系统功能文档 我们以线上商城系统为例&#…

jetcache 2级缓存模式实现批量清除

需求 希望能够实现清理指定对象缓存的方法&#xff0c;例如缓存了User表&#xff0c;当User表巨大时&#xff0c;通过id全量去清理不现实&#xff0c;耗费资源也巨大。因此需要能够支持清理指定本地和远程缓存的批量方法。 分析 查看jetcache生成的cache接口&#xff0c;并没…

Java设计模式 | 抽象工厂模式

抽象工厂模式 工厂方法模式中考虑的是一类产品的生产&#xff0c;如幼儿园只培养小朋友&#xff0c;鞋厂只生产鞋子。这些工厂只生产同种类产品&#xff0c;同种类产品称为同等级产品&#xff0c;即工厂方法模式只考虑生产同等级的产品&#xff0c;但是在现实生活中许多工厂都…