C++编程:利用环形缓冲区优化 TCP 发送流程,避免 Short Write 问题

文章目录

    • 1. 什么是 Short Write 问题?
    • 2. 如何解决 Short Write 问题?
      • 2.1 方法 1:将 Socket 设置为阻塞模式
      • 2.2 方法 2:用户态维护发送缓冲区
    • 3. 用户态维护发送缓冲区实现
      • 3.1 核心要点
      • 3.2 代码实现
      • 3.3 测试程序
    • 参考文档

1. 什么是 Short Write 问题?

在 TCP 编程中,short write 问题指的是在调用 sendwrite 系统调用时,实际发送的数据量比预期要少。这通常是因为网络协议栈发送缓冲区的空间不足,导致不能一次性发送完整的数据。遇到这种情况时,系统调用会返回实际发送的字节数,并将 errno 设置为 EAGAIN,表示缓冲区没有足够的空间来继续发送数据。

2. 如何解决 Short Write 问题?

针对 EPOLL 模型 中的 LT(Level Triggered)模式,可以采取以下几种方案来解决 short write 问题:

2.1 方法 1:将 Socket 设置为阻塞模式

将 Socket 设置为阻塞模式时,send 系统调用会一直阻塞,直到有足够的缓冲区空间发送完整的数据。这种方法能够避免 short write 问题,但会导致线程阻塞,从而影响性能。因此,通常不推荐在高并发或需要高吞吐量的场景中使用此方法。

2.2 方法 2:用户态维护发送缓冲区

更推荐的方法是用户态维护一个发送缓冲区,并结合 EPOLLONESHOTEPOLLOUT 事件来控制数据发送。这种方法不需要阻塞线程,能够有效地处理 short write 问题。

3. 用户态维护发送缓冲区实现

3.1 核心要点

使用环形缓冲区来保存待发送的数据,当系统发送缓冲区不够时,数据会被存入环形缓冲区,并在后续的 EPOLLOUT 事件触发时继续发送。

  • 环形缓冲区设计

环形缓冲区(Circular Buffer)是一个固定大小的缓存,用于暂存数据。当数据无法完全写入网络协议栈缓冲区时,可以将其暂存,并在缓冲区有足够空间时继续写入。通过注册 EPOLLOUT 事件,当缓冲区有空闲空间时,程序可以重新尝试发送数据。

  • 数据收发管理类设计

    • asyncSend:当数据网络协议栈发送缓冲区没有足够空间时,会将数据存储到环形缓冲区,并通过 EPOLLONESHOTEPOLLOUT 事件确保数据能在后续时刻继续发送。

    • doSend:此函数会被 EPOLLOUT 事件触发,它从环形缓冲区中取出数据并尝试发送。如果发送成功,则释放相应的缓冲区空间;如果发送失败,且错误码为 EAGAINEINTER,则会重试。

3.2 代码实现

#include <atomic>
#include <cstring>
#include <memory>
#include <sys/socket.h>
#include <sys/epoll.h>
#include <unistd.h>
#include <cstdio>
#include <mutex>
#include <cassert>
#include <fcntl.h>class LockFreeBytesBuffer {public:static const std::size_t kBufferSize = 10240U;  // 缓冲区大小LockFreeBytesBuffer() noexcept : reader_index_(0U), writer_index_(0U) {std::memset(buffer_, 0, kBufferSize);}bool append(const char* data, std::size_t length) noexcept;std::size_t beginRead(const char** target) noexcept;void endRead(std::size_t length) noexcept;private:char buffer_[kBufferSize];std::atomic<std::size_t> reader_index_;std::atomic<std::size_t> writer_index_;
};bool LockFreeBytesBuffer::append(const char* data, std::size_t length) noexcept {const std::size_t current_write_index = writer_index_.load(std::memory_order_relaxed);const std::size_t current_read_index = reader_index_.load(std::memory_order_acquire);const std::size_t free_space = (current_read_index + kBufferSize - current_write_index - 1U) % kBufferSize;if (length > free_space) {return false;  // 缓冲区满}const std::size_t pos = current_write_index % kBufferSize;const std::size_t first_part = std::min(length, kBufferSize - pos);std::memcpy(&buffer_[pos], data, first_part);std::memcpy(&buffer_[0], data + first_part, length - first_part);writer_index_.store(current_write_index + length, std::memory_order_release);return true;
}std::size_t LockFreeBytesBuffer::beginRead(const char** target) noexcept {const std::size_t current_read_index = reader_index_.load(std::memory_order_relaxed);const std::size_t current_write_index = writer_index_.load(std::memory_order_acquire);const std::size_t available_data = (current_write_index - current_read_index) % kBufferSize;if (available_data == 0U) {return 0U;  // 缓冲区空}const std::size_t pos = current_read_index % kBufferSize;*target = &buffer_[pos];return std::min(available_data, kBufferSize - pos);
}void LockFreeBytesBuffer::endRead(std::size_t length) noexcept {const std::size_t current_read_index = reader_index_.load(std::memory_order_relaxed);reader_index_.store(current_read_index + length, std::memory_order_release);
}class SocketContext {public:SocketContext(int epoll_fd, int sock_fd): epoll_fd_(epoll_fd), sock_fd_(sock_fd) {setNonblocking();addFd();}~SocketContext() {removeFd();close(sock_fd_);}bool asyncSend(const char* data, int size) {bool result = buffer_.append(data, static_cast<std::size_t>(size));if (result) {modifyEvent(false, true);  // 修改 EPOLLONESHOT 和 EPOLLOUT}return result;}int doRecv() {char buffer[1024] = {};int count = read(sock_fd_, buffer, sizeof(buffer));if (count <= 0) {return count;  // 读取失败或连接关闭}modifyEvent(true, false);  // 恢复 EPOLLIN 事件fprintf(stderr, "Received: %s\n", buffer);return count;}int doSend() {const char* pdata = nullptr;std::size_t data_size = buffer_.beginRead(&pdata);if (data_size == 0) {return 0;  // 没有数据可以发送}int send_size = send(sock_fd_, pdata, static_cast<int>(data_size), MSG_DONTWAIT);if (send_size > 0) {buffer_.endRead(static_cast<std::size_t>(send_size));  // 更新已发送数据} else if (send_size == -1 && errno != EAGAIN) {fprintf(stderr, "send failed, error: %s\n", strerror(errno));}return send_size;}protected:void setNonblocking() {int flags = fcntl(sock_fd_, F_GETFL, 0);if (flags == -1) {fprintf(stderr, "fcntl GETFL failed: %s\n", strerror(errno));return;}fcntl(sock_fd_, F_SETFL, flags | O_NONBLOCK);}void addFd() {struct epoll_event event;event.data.ptr = this;event.events = EPOLLIN | EPOLLONESHOT | EPOLLOUT;if (epoll_ctl(epoll_fd_, EPOLL_CTL_ADD, sock_fd_, &event) == -1) {fprintf(stderr, "epoll_ctl add failed: %s\n", strerror(errno));}}void removeFd() {epoll_ctl(epoll_fd_, EPOLL_CTL_DEL, sock_fd_, nullptr);}inline void modifyEvent(bool in_event = true, bool out_event = true) {struct epoll_event event;event.data.ptr = this;event.events = EPOLLONESHOT;if (in_event) {event.events |= EPOLLIN;}if (out_event) {event.events |= EPOLLOUT;}epoll_ctl(epoll_fd_, EPOLL_CTL_MOD, sock_fd_, &event);}private:int epoll_fd_;int sock_fd_;LockFreeBytesBuffer buffer_;
};

代码说明

  • 无锁环形缓冲区LockFreeBytesBuffer 类通过原子操作(std::atomic)来确保线程安全,避免了传统的锁机制。
    更多请见:C++生产者-消费者无锁缓冲区的简单实现
  • 事件驱动机制:通过 EPOLLINEPOLLOUT 事件来控制数据的接收和发送,避免了阻塞操作。
  • 非阻塞发送:通过 send 函数的 MSG_DONTWAIT 标志来确保发送操作不会阻塞,遇到 EAGAIN 错误时会重试。

在这里插入图片描述

3.3 测试程序

#include <iostream>
#include <memory>
#include <cstring>
#include <unistd.h>
#include <sys/socket.h>
#include <netinet/in.h>
#include <sys/epoll.h>
#include <fcntl.h>
#include <cassert>#define MAX_EVENTS 10int createServerSocket(int port) {int sockfd = socket(AF_INET, SOCK_STREAM, 0);if (sockfd == -1) {fprintf(stderr, "socket creation failed: %s\n", strerror(errno));return -1;}sockaddr_in server_addr;std::memset(&server_addr, 0, sizeof(server_addr));server_addr.sin_family = AF_INET;server_addr.sin_addr.s_addr = INADDR_ANY;server_addr.sin_port = htons(port);if (bind(sockfd, (struct sockaddr*)&server_addr, sizeof(server_addr)) == -1) {fprintf(stderr, "bind failed: %s\n", strerror(errno));close(sockfd);return -1;}if (listen(sockfd, 5) == -1) {fprintf(stderr, "listen failed: %s\n", strerror(errno));close(sockfd);return -1;}return sockfd;
}int main() {int epoll_fd = epoll_create1(0);if (epoll_fd == -1) {fprintf(stderr, "epoll_create1 failed: %s\n", strerror(errno));return -1;}int server_fd = createServerSocket(8080);if (server_fd == -1) {return -1;}// Set the server socket to non-blocking modefcntl(server_fd, F_SETFL, O_NONBLOCK);// Add server socket to epollstruct epoll_event ev;ev.events = EPOLLIN;ev.data.fd = server_fd;if (epoll_ctl(epoll_fd, EPOLL_CTL_ADD, server_fd, &ev) == -1) {fprintf(stderr, "epoll_ctl: server_fd failed: %s\n", strerror(errno));return -1;}fprintf(stderr, "Server listening on port 8080...\n");while (true) {struct epoll_event events[MAX_EVENTS];int n = epoll_wait(epoll_fd, events, MAX_EVENTS, -1);for (int i = 0; i < n; ++i) {if (events[i].data.fd == server_fd) {// Accept new client connectionint client_fd = accept(server_fd, NULL, NULL);if (client_fd == -1) {fprintf(stderr, "accept failed: %s\n", strerror(errno));continue;}fcntl(client_fd, F_SETFL, O_NONBLOCK);std::unique_ptr<SocketContext> client = std::make_unique<SocketContext>(epoll_fd, client_fd);ev.events = EPOLLIN | EPOLLOUT | EPOLLONESHOT;ev.data.ptr = client.get();if (epoll_ctl(epoll_fd, EPOLL_CTL_ADD, client_fd, &ev) == -1) {fprintf(stderr, "epoll_ctl: client_fd failed: %s\n", strerror(errno));}} else {SocketContext* client = static_cast<SocketContext*>(events[i].data.ptr);if (events[i].events & EPOLLIN) {client->doRecv();}if (events[i].events & EPOLLOUT) {client->doSend();}}}}close(server_fd);close(epoll_fd);return 0;
}

参考文档

  • tcp 解决short write问题
  • C++生产者-消费者无锁缓冲区的简单实现

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.rhkb.cn/news/470958.html

如若内容造成侵权/违法违规/事实不符,请联系长河编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

远离生成式AI大乱斗,SAS公司揭示亚太区千亿AI市场蓝图

生成式AI正在亚太区引发AI的新一轮风暴。根据市场调查公司IDC的一份最新调研&#xff0c;43%的亚太区企业将在未来12个月增加20%的AI投资&#xff0c;其中有40%的企业期待AI能够带来3倍投资回报。在亚太区&#xff0c;中国企业一马当先&#xff0c;不仅有27%的受访企业将AI用于…

Android Studio 将项目打包成apk文件

第一步&#xff1a;选择Build -> Generate Signed APK 会出现&#xff1a; 我们选择 Create new… 然后选择你要存放密钥的地方 点击ok之后&#xff0c;则选择好了文件&#xff0c;并生成了jks文件了。 点击ok之后&#xff0c; 会出现&#xff1a; 选择release&#xf…

【面试题】发起一次网络请求,当请求>=1s,立马中断

首先这是一个大厂的面试题&#xff0c;是我一个同事跟我说的&#xff0c;具体什么业务场景面试官没说&#xff0c;但我猜测可能是以下几种业务场景&#xff1a; 表单提交&#xff1a;在用户提交表单时&#xff0c;如果请求处理时间过长&#xff0c;可以中断请求并提示用户检查…

从0开始学习Linux——文件管理

往期目录&#xff1a; 从0开始学习Linux——简介&安装 从0开始学习Linux——搭建属于自己的Linux虚拟机 从0开始学习Linux——文本编辑器 从0开始学习Linux——Yum工具 从0开始学习Linux——远程连接工具 从0开始学习Linux——文件目录 从0开始学习Linux——网络配置 从0开…

MySQL系列之如何在Linux只安装客户端

导览 前言Q&#xff1a;如何安装一个Linux环境下的MySQL客户端一、准备文件1. 确认Server版本2. 选择Client安装文件 二、下载并安装1. 下载1.1 寻找文件1.2 文件说明 2. 安装2.1 上传至Linux服务器2.2 执行安装 三、连接验证1. 确认远程授权2. 建立远程连接 结语精彩回放 前言…

虚幻引擎 CEO 谈元宇宙:发展、策略与布局

在当今科技领域&#xff0c;元宇宙无疑是最热门的话题之一。Epic Games 首席执行官 Tim Sweeney 对元宇宙的未来发展充满信心&#xff0c;他认为开放元宇宙将融合娱乐、游戏和科技产业&#xff0c;带来一个光明的未来。本文将深入探讨采访中的关键内容&#xff0c;分析元宇宙的…

【R78/G15 开发板测评】串口打印 DHT11 温湿度传感器、DS18B20 温度传感器数据,LabVIEW 上位机绘制演化曲线

【R78/G15 开发板测评】串口打印 DHT11 温湿度传感器、DS18B20 温度传感器数据&#xff0c;LabVIEW 上位机绘制演化曲线 主要介绍了 R78/G15 开发板基于 Arduino IDE 环境串口打印温湿度传感器 DHT11 和温度传感器 DS18B20 传感器的数据&#xff0c;并通过LabVIEW上位机绘制演…

quartz

理论知识&#xff1a; 堆&#xff1a;堆是一颗安全二叉树&#xff0c;是一种特殊的树结构&#xff0c;它的每一个节点值都要比父节点要么大&#xff0c;要么小 小顶堆&#xff1a;最小的值放在最上面&#xff0c;每个子节点都比父节点大 大顶堆&#xff1a;最大的值放在最上…

提取神经网络数学表达式

来自《老饼讲解神经网络》 www..bbbdata.com 当我们在matlab训练好网络后&#xff0c;可以使用神经网络工具箱的sim(net,x)函数进行预测输出。但往往想提取出它的数学表达式&#xff0c;该怎么提取呢&#xff1f; 下面以《一个简单的神经网络例子》中的模型为例&#xff0c;提取…

Vue 的生命周期函数 和 Vuex

创建一个 Vue 实例 每个 Vue 应用都是通过用 Vue 函数创建一个新的 Vue 实例开始的&#xff1a; var vm new Vue({// 选项 })虽然没有完全遵循 MVVM 模型&#xff0c;但是 Vue 的设计也受到了它的启发。因此在文档中经常会使用 vm (ViewModel 的缩写) 这个变量名表示 Vue 实…

使用etl工具kettle的日常踩坑梳理之二、从Hadoop中导出数据

想操作MySQL等关系型数据库的可以访问我上一篇文章&#xff0c;本章主要介绍操作Hadoop等大数据组件。 根据2024年11月份测试了kettle工具在9.3及以上版本已经没有内置连接大数据(如Hadoop)组件了。 建议安装9.2及以下的&#xff0c;我这里送上8.3.0版本的请用百度网盘下载链…

新版 idea 编写 idea 插件时,启动出现 ClassNotFound

IntelliJ IDEA 2024.1.6 (Ultimate Edition) Build #IU-241.19072.14, built on August 8, 2024 Licensed to Sophia Tout Subscription is active until June 29, 2025. For educational use only. Runtime version: 17.0.111-b1207.30 amd64 Kotlin: 241.19072.14-IJ 新版本…

Java面向对象编程进阶之包装类

Java面向对象编程进阶之包装类 一、为什么要使用包装类二、掌握基本数据类型与包装类之间的转换1、为什么需要转换&#xff1f;2、如何转换&#xff1f; 三、String与基本数据类型、包装类之间的转换1、案例2、特别注意 一、为什么要使用包装类 为了使得基本类型的数据变量具备…

【mysql】使用宝塔面板在云服务器上安装MySQL数据库并实现远程连接

前言 使用宝塔Linux面板安装MySQL数据库并实现远程连接 使用宝塔面板安装mysql 宝塔面板&#xff0c;华为云开放3306端口 一些命令 // 命令行连接数据库 mysql -uroot -p // MySQL 5 版本 GRANT ALL ON *.* TO root% IDENTIFIED BY 替换成你的root密码 WITH GRANT OPTION; // …

性能测试|JMeter接口与性能测试项目

前言 在软件开发和运维过程中&#xff0c;接口性能测试是一项至关重要的工作。JMeter作为一款开源的Java应用&#xff0c;被广泛用于进行各种性能测试&#xff0c;包括接口性能测试。本文将详细介绍如何使用JMeter进行接口性能测试的过程和步骤。 JMeter是Apache组织开发的基…

OpenGL ES 共享上下文实现多线程渲染

OpenGL ES 共享上下文时,可以共享哪些资源? 共享上下文实现多线程渲染 EGL 概念回顾 EGL 是 OpenGL ES 和本地窗口系统(Native Window System)之间的通信接口,它的主要作用: 与设备的原生窗口系统通信; 查询绘图表面的可用类型和配置; 创建绘图表面; 在OpenGL ES 和…

应用于新能源汽车NCV4275CDT50RKG车规级LDO线性电压调节器芯片

关于车规级芯片&#xff08;Automotive Grade Chip&#xff09;&#xff0c;车规级芯片是专门用于汽车行业的芯片&#xff0c;具有高可靠性、高稳定性和低功耗等特点&#xff0c;以满足汽车电子系统的严格要求。这些芯片通常用于车载电子控制单元&#xff08;ECU&#xff09;和…

MQTT协议解析 : 物联网领域的最佳选择

1. MQTT协议概述 1.1 MQTT协议是什么 MQTT : Message Queuing Telemetry Transport 模式 : 发布 / 订阅主题优点 : 代码量小、低带宽、实时可靠应用 : 物联网、小型设备、移动应用MQTT 常用端口 : 1883 MQTT是一个网络协议&#xff0c;和HTTP类似&#xff0c;因为轻量简单&…

【OH】openHarmony开发环境搭建(基于windows子系统WSL)

前言 本文主要介绍基于windows子系统WSL搭建openHarmony开发环境。 WSL与Vmware虚拟机的区别&#xff0c;可以查看WSL与虚拟机的区别 更详细的安装配置过程可参考微软官网&#xff1a; ​安装 WSL 前提 以下基于windows 111专业版进行配置&#xff0c;windows 10应该也是可以…

豆瓣均分9:不容错过的9本大模型入门宝藏书籍,非常详细收藏我这一篇就够了

在这个大模型风起云涌的时代&#xff0c;技术的边界被不断拓宽&#xff0c;AI的力量正以前所未有的方式重塑我们的世界。如果你渴望站在技术的浪尖&#xff0c;深入了解增强现实&#xff08;AR&#xff09;、机器学习&#xff08;ML&#xff09;与强化学习&#xff08;RL&#…