springboot使用netty做TCP客户端

 1、服务端文档说明


## 1. 概述本文档描述了Socket模拟器的通信协议实现细节,包括数据包格式、字节序、编码方式等信息。## 2. 通信基础### 2.1 连接方式
- 协议类型:TCP
- 网络层:IPv4 (AddressFamily.InterNetwork)
- 传输方式:流式 (SocketType.Stream)
- 协议:TCP (ProtocolType.Tcp)### 2.2 字节序
- 使用小端字节序(Little-Endian)
- 使用 `BitConverter` 进行字节序列的转换
- 长度字段采用4字节整数表示## 3. 数据包格式### 3.1 基本结构
```
+----------------+------------------+
|  Length (4B)   |   Payload       |
+----------------+------------------+
```- Length: 4字节整数,表示Payload的长度
- Payload: UTF-8编码的消息内容### 3.2 字段说明
1. Length字段- 大小:4字节- 类型:Int32- 字节序:小端序- 说明:表示后续Payload的字节长度2. Payload字段- 编码:UTF-8- 长度:可变,由Length字段指定- 内容:实际传输的消息数据## 4. 消息处理流程### 4.1 发送流程
1. 将消息字符串转换为UTF-8字节数组
2. 计算消息字节数组长度
3. 将长度转换为4字节数组(小端序)
4. 组合长度字段和消息内容
5. 发送完整数据包示例代码:
```csharp
byte[] bytes = Encoding.UTF8.GetBytes(sendMsg);   
byte[] xLenAry = BitConverter.GetBytes(bytes.Length);
byte[] sendData = new byte[bytes.Length + 4];
xLenAry.CopyTo(sendData, 0);
bytes.CopyTo(sendData, 4);
socket.Send(sendData);
```### 4.2 接收流程
1. 接收数据到缓冲区(缓冲区大小1MB)
2. 读取前4字节获取消息长度
3. 根据长度读取后续消息内容
4. 将字节数组转换为UTF-8字符串示例代码:
```csharp
byte[] arrServerRecMsg = new byte[1024 * 1024];
int length = socket.Receive(arrServerRecMsg);
byte[] lenstr = arrServerRecMsg.Skip(0).Take(4).ToArray();
int len = BitConverter.ToInt32(lenstr, 0);
string strSRecMsg = Encoding.UTF8.GetString(arrServerRecMsg, 4, len);
```## 5. 错误处理### 5.1 连接断开检测
- 通过 `Socket.Connected` 属性检查连接状态
- 捕获异常处理连接断开情况
- 在连接断开时清理资源并通知UI### 5.2 异常处理
- 捕获Socket异常并进行相应处理
- 在连接断开时关闭Socket
- 从连接池中移除断开的连接
- 更新UI显示连接状态## 6. 缓冲区管理### 6.1 接收缓冲区
- 大小:1MB (1024 * 1024 字节)
- 类型:字节数组
- 用途:临时存储接收到的数据### 6.2 发送缓冲区
- 动态分配,根据消息长度创建
- 包含4字节长度头部和消息内容
- 一次性发送完整数据包## 7. 注意事项1. 字符编码统一使用UTF-8,支持中文等多语言字符
2. 发送消息时需要先发送长度信息
3. 接收消息时需要先解析长度字段
4. 所有网络操作都需要进行异常处理
5. 在连接断开时要及时清理资源## 8. 性能考虑1. 使用后台线程处理接收消息
2. 设置适当的缓冲区大小
3. 及时关闭不使用的连接
4. 避免频繁的字符串转换操作 

2、pom文件

    <parent><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-parent</artifactId><version>2.3.5.RELEASE</version><relativePath/> <!-- lookup parent from repository --></parent>
        <dependency><groupId>io.netty</groupId><artifactId>netty-all</artifactId><version>4.1.86.Final</version></dependency>

3、配置文件

tcp:client:host: 127.0.0.1port: 15000timeout: 5000pool:maxTotal: 10maxIdle: 5minIdle: 2

4、 客户端示例 

     4.1、配置类

package com.netty.client.config;import lombok.Data;
import org.springframework.boot.context.properties.ConfigurationProperties;
import org.springframework.context.annotation.Configuration;
@Data
@Configuration
@ConfigurationProperties(prefix = "tcp.client")
public class TcpClientConfig {private String host;private int port;private Pool pool;private int timeout;// Getters and Setters@Datapublic static class Pool {private int maxTotal;private int maxIdle;private int minIdle;// Getters and Setters}
}

4.2、ClientHandler

package com.netty.client.hander;import com.alibaba.fastjson.JSON;
import com.alibaba.fastjson.JSONObject;
import io.netty.channel.ChannelHandler;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.SimpleChannelInboundHandler;
import lombok.extern.slf4j.Slf4j;
import org.springframework.stereotype.Component;import java.util.Arrays;
import java.util.concurrent.*;
@Slf4j
@Component
@ChannelHandler.Sharable
public class ClientHandler extends SimpleChannelInboundHandler<String> {private final ConcurrentMap<String, CompletableFuture<String>> pendingRequests = new ConcurrentHashMap<>();private static final ScheduledExecutorService TIMEOUT_EXECUTOR = Executors.newScheduledThreadPool(1);@Overrideprotected void channelRead0(ChannelHandlerContext ctx, String msg) {log.info("收到消息 ===> {}", msg);try{JSONObject jsonObject = JSON.parseObject(msg);JSONObject header = jsonObject.getJSONObject("Header");String correlationId = header.getString("TransactionID");CompletableFuture<String> future = pendingRequests.remove(correlationId);if (future != null) {future.complete(msg);}}catch (Exception e){log.info("channelRead0 has error ==>{}", Arrays.toString(e.getStackTrace()));log.info("channelRead0 message ==>{}",e.getMessage());}}public CompletableFuture<String> prepareResponse(String correlationId) {CompletableFuture<String> future = new CompletableFuture<>();ScheduledFuture<?> timeout = TIMEOUT_EXECUTOR.schedule(() -> {if (future.completeExceptionally(new TimeoutException())) {pendingRequests.remove(correlationId);}}, 5, TimeUnit.SECONDS);future.whenComplete((r, t) -> {timeout.cancel(true);pendingRequests.remove(correlationId);});pendingRequests.put(correlationId, future);return future;}@Overridepublic void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) {ctx.close();}}

4.3、client

package com.netty.client;import com.alibaba.fastjson.JSON;
import com.alibaba.fastjson.JSONObject;
import com.netty.client.config.TcpClientConfig;
import com.netty.client.hander.ClientHandler;
import io.netty.bootstrap.Bootstrap;
import io.netty.buffer.ByteBuf;
import io.netty.channel.*;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioSocketChannel;import io.netty.handler.codec.ByteToMessageDecoder;
import io.netty.handler.codec.MessageToByteEncoder;
import lombok.extern.slf4j.Slf4j;
import org.apache.commons.pool2.BasePooledObjectFactory;
import org.apache.commons.pool2.PooledObject;
import org.apache.commons.pool2.impl.DefaultPooledObject;
import org.apache.commons.pool2.impl.GenericObjectPool;
import org.apache.commons.pool2.impl.GenericObjectPoolConfig;
import org.springframework.stereotype.Component;import javax.annotation.PreDestroy;
import java.nio.charset.StandardCharsets;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.TimeUnit;
@Slf4j
@Component
public class NettyTcpClient {private final GenericObjectPool<Channel> connectionPool;private final ClientHandler clientHandler;private final TcpClientConfig config;public NettyTcpClient(TcpClientConfig config) {this.config = config;this.clientHandler = new ClientHandler();this.connectionPool = new GenericObjectPool<>(new ChannelFactory(config, clientHandler),buildPoolConfig(config.getPool()));}private GenericObjectPoolConfig<Channel> buildPoolConfig(TcpClientConfig.Pool poolConfig) {GenericObjectPoolConfig<Channel> config = new GenericObjectPoolConfig<>();config.setMaxTotal(poolConfig.getMaxTotal());config.setMaxIdle(poolConfig.getMaxIdle());config.setMinIdle(poolConfig.getMinIdle());return config;}public String sendSync(String message) throws Exception {Channel channel = null;try {channel = connectionPool.borrowObject();JSONObject jsonObject = JSON.parseObject(message);JSONObject header = jsonObject.getJSONObject("header");String correlationId = header.getString("transactionID");// String correlationId = UUID.randomUUID().toString();CompletableFuture<String> future = clientHandler.prepareResponse(correlationId);channel.writeAndFlush(message).sync();return future.get(config.getTimeout(), TimeUnit.MILLISECONDS);}finally {if (channel != null) {connectionPool.returnObject(channel);}}}@PreDestroypublic void shutdown() {connectionPool.close();}private static class ChannelFactory extends BasePooledObjectFactory<Channel> {private final Bootstrap bootstrap;private final ClientHandler handler;public ChannelFactory(TcpClientConfig config, ClientHandler handler) {this.handler = handler;this.bootstrap = new Bootstrap().group(new NioEventLoopGroup()).channel(NioSocketChannel.class).handler(new ChannelInitializer<SocketChannel>() {@Overrideprotected void initChannel(SocketChannel ch) {ChannelPipeline pipeline = ch.pipeline();// 解码器pipeline.addLast(new LengthFieldDecoder());// 编码器pipeline.addLast(new LengthFieldEncoder());// 业务处理器pipeline.addLast(handler);}}).option(ChannelOption.CONNECT_TIMEOUT_MILLIS, 5000).remoteAddress(config.getHost(), config.getPort());}@Overridepublic Channel create() throws Exception {return bootstrap.connect().sync().channel();}@Overridepublic PooledObject<Channel> wrap(Channel channel) {return new DefaultPooledObject<>(channel);}@Overridepublic boolean validateObject(PooledObject<Channel> p) {return p.getObject().isActive();}@Overridepublic void destroyObject(PooledObject<Channel> p) {p.getObject().close();}}static class LengthFieldDecoder extends ByteToMessageDecoder {@Overrideprotected void decode(ChannelHandlerContext ctx, ByteBuf in, List<Object> out) {if (in.readableBytes() < 4) return;in.markReaderIndex();int length = in.readIntLE();  // 小端序读取长度字段if (in.readableBytes() < length) {in.resetReaderIndex();return;}ByteBuf payload = in.readBytes(length);out.add(payload.toString(StandardCharsets.UTF_8));}}static class LengthFieldEncoder extends MessageToByteEncoder<String> {@Overrideprotected void encode(ChannelHandlerContext ctx, String msg, ByteBuf out) {byte[] bytes = msg.getBytes(StandardCharsets.UTF_8);out.writeIntLE(bytes.length);  // 小端序写入长度字段out.writeBytes(bytes);}}
}

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.rhkb.cn/news/41405.html

如若内容造成侵权/违法违规/事实不符,请联系长河编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

matplotlib——南丁格尔玫瑰

南丁格尔玫瑰图&#xff08;Nightingale Rose Chart&#xff09;&#xff0c;是一种特殊形式的柱状图&#xff0c;它以南丁格尔&#xff08;Florence Nightingale&#xff09;命名&#xff0c;她在1858年首次使用这种图表来展示战争期间士兵死亡原因的数据。 它将数据绘制在极坐…

【大模型基础_毛玉仁】4.4 低秩适配方法

目录 4.4 低秩适配方法4.4.1 LoRA1&#xff09;方法实现2&#xff09;参数效率 4.4.2 LoRA 变体1&#xff09;打破低秩瓶颈&#xff08;例ReLoRA&#xff09;2&#xff09;动态秩分配&#xff08;例AdaLoRA&#xff09;3&#xff09;训练过程优化&#xff08;例DoRA&#xff09…

融合YOLO11与行为树的人机协作智能框架:动态工效学优化与自适应安全决策

人工智能技术要真正发挥其价值&#xff0c;必须与生产生活深度融合&#xff0c;为产业发展和人类生活带来实际效益。近年来&#xff0c;基于深度学习的机器视觉技术在工业自动化领域取得了显著进展&#xff0c;其中YOLO&#xff08;You Only Look Once&#xff09;算法作为一种…

Java为什么要使用线程池?

前言1.对线程的管理更加的规范化2.降低创建线程和销毁线程的开销 前言 之前对于Java线程池的理解&#xff0c;一直停留在&#xff1a;对于Java中的多线程机制来说&#xff0c;如果不使用线程池的话&#xff0c;线程的使用就会变得杂乱无章。这一步。一直没有深入去理解为什么其…

告别分库分表,时序数据库 TDengine 解锁燃气监控新可能

达成效果&#xff1a; 从 MySQL 迁移至 TDengine 后&#xff0c;设备数据自动分片&#xff0c;运维更简单。 列式存储可减少 50% 的存储占用&#xff0c;单服务器即可支撑全量业务。 毫秒级漏气报警响应时间控制在 500ms 以内&#xff0c;提升应急管理效率。 新架构支持未来…

TDengine 3.3.2.0 集群报错 Post “http://buildkitsandbox:6041/rest/sql“

原因&#xff1a; 初始化时处于内网环境下&#xff0c;Post “http://buildkitsandbox:6041/rest/sql“ 无法访问 修复&#xff1a; vi /etc/hosts将buildkitsandbox映射为本机节点 外网环境下初始化时没有该问题

【Linux】POSIX信号量与基于环形队列的生产消费者模型

目录 一、POSIX信号量&#xff1a; 接口&#xff1a; 二、基于环形队列的生产消费者模型 环形队列&#xff1a; 单生产单消费实现代码&#xff1a; RingQueue.hpp&#xff1a; main.cc&#xff1a; 多生产多消费实现代码&#xff1a; RingQueue.hpp&#xff1a; main.…

【13】Ajax爬取案例实战

目录 一、准备工作 二、爬取目标 三、初步探索&#xff1a;如何判断网页是经js渲染过的&#xff1f; 四、爬取列表页 4.1 分析Ajax接口逻辑 4.2 观察响应的数据 4.3 代码实现 &#xff08;1&#xff09;导入库 &#xff08;2&#xff09;定义一个通用的爬取方法…

嵌入式八股RTOS与Linux---网络系统篇

前言 关于计网的什么TCP三次握手 几层模型啊TCP报文啥的不在这里讲,会单独分成一个计算机网络模块   这里主要介绍介绍lwip和socket FreeRTOS下的网络接口–移植LWIP 实际上FreeRTOS并不自带网络接口,我们一般会通过移植lwip协议栈让FreeRTOS可以通过网络接口收发数据,具体可…

全分辨率免ROOT懒人精灵-自动化编程思维-设计思路-实战训练

全分辨率免ROOT懒人精灵-自动化编程思维-设计思路-实战训练 1.2025新版懒人精灵-实战红果搜索关键词刷视频&#xff1a;https://www.bilibili.com/video/BV1eK9kY7EWV 2.懒人精灵-全分辨率节点识别&#xff08;红果看广告领金币小实战&#xff09;&#xff1a;https://www.bili…

【更新中】【React】基础版React + Redux实现教程(Vite + React + Redux + TypeScript)

本项目是一个在react中&#xff0c;使用 redux 管理状态的基础版实现教程&#xff0c;用简单的案例练习redux的使用&#xff0c;旨在帮助学习 redux 的状态管理机制&#xff0c;包括 store、action、reducer、dispatch 等核心概念。 项目地址&#xff1a;https://github.com/Yv…

【MySQL】从零开始:掌握MySQL数据库的核心概念(四)

人们之所以不愿改变&#xff0c;是因为害怕未知。但历史唯一不变的事实&#xff0c;就是一切都会改变。 前言 这是我自己学习mysql数据库的第四篇博客总结。后期我会继续把mysql数据库学习笔记开源至博客上。 上一期笔记是关于mysql数据库的表格约束&#xff0c;没看的同学可以…

AP 场景架构设计(一) :OceanBase 读写分离策略解析

说明&#xff1a;本文内容对应的是 OceanBase 社区版&#xff0c;架构部分不涉及企业版的仲裁副本功能。OceanBase社区版和企业版的能力区别详见&#xff1a; 官网链接。 概述​ 当两种类型的业务共同运行在同一个数据库集群上时&#xff0c;这对数据库的配置等条件提出了较高…

CPU架构和微架构

CPU架构&#xff08;CPU Architecture&#xff09; CPU架构是指处理器的整体设计框架&#xff0c;定义了处理器的指令集、寄存器、内存管理方式等。它是处理器设计的顶层规范&#xff0c;决定了软件如何与硬件交互。 主要特点&#xff1a; 指令集架构&#xff08;ISA, Instr…

6.4 模拟专题:LeetCode1419.数青蛙

1.题目链接&#xff1a;数青蛙 - LeetCode 2.题目描述 给定一个字符串 croakOfFrogs&#xff0c;表示青蛙的鸣叫声序列。每个青蛙必须按顺序发出完整的 “croak” 字符&#xff0c;且多只青蛙可以同时鸣叫。要求计算最少需要多少只青蛙才能完成该字符串&#xff0c;若无法完成…

Linux 搭建dns主域解析,和反向解析

#!/bin/bash # DNS主域名服务 # user li 20250325# 检查当前用户是否为root用户 # 因为配置DNS服务通常需要较高的权限&#xff0c;只有root用户才能进行一些关键操作 if [ "$USER" ! "root" ]; then# 如果不是root用户&#xff0c;输出错误信息echo "…

Leetcode 二进制求和

java solution class Solution {public String addBinary(String a, String b) {StringBuilder result new StringBuilder();//首先设置2个指针, 从右往左处理int i a.length() - 1;int j b.length() - 1;int carry 0; //设置进位标志位//从2个字符串的末尾向前遍历while(…

【NLP 49、提示工程 prompt engineering】

目录 一、基本介绍 语言模型生成文本的基本特点 提示工程 prompt engineering 提示工程的优势 使用注意事项 ① 安全问题 ② 可信度问题 ③ 时效性与专业性 二、应用场景 能 ≠ 适合 应用场景 —— 百科知识 应用场景 —— 写文案 应用场景 —— 解释 / 编写…

【NLP 43、文本生成任务】

目录 一、生成式任务 二、seq2seq任务 1.模型结构 2.工作原理 3.局限性 三、自回归语言模型训练 Decoder only 四、自回归模型结构&#xff1a;文本生成任务 —— Embedding LSTM 代码示例 &#x1f680; 数据文件 代码流程 Ⅰ、模型初始化 Ⅱ、前向计算 代码运行流程 Ⅲ、加载…

vscode 通过Remote-ssh远程连接服务器报错 could not establish connection to ubuntu

vscode 通过Remote-ssh插件远程连接服务器报错 could not establish connection to ubuntu&#xff0c;并且出现下面的错误打印&#xff1a; [21:00:57.307] Log Level: 2 [21:00:57.350] SSH Resolver called for "ssh-remoteubuntu", attempt 1 [21:00:57.359] r…