基于准静态自适应环型缓存器(QSARC)的taskBus万兆吞吐实现

文章目录

    • 概要
    • 整体架构流程
    • 技术名词解释
    • 技术细节
      • 1. 数据结构
      • 2. 自适应计算队列大小
      • 3. 生产者拼接缓存
      • 4. 高效地通知消费者
    • 小结
      • 1. 性能表现情况
      • 2. 主要改进和局限
      • 3. 源码和发行版

概要

准静态自适应环形缓存器(Quasi-Static Adaptive Ring Cache)是taskBus用于数据吞吐的软件结构。

  • 准静态:缓存器的大小并不是静态分配,而是随着吞吐需求的提高,缓慢增长,并最终适应最高峰时的内存消耗。当缓存器已经达峰后,不再有堆内存分配。
  • 自适应:根据包大小的不同,缓存器在恒定的最大峰值内存容积下,根据统计获得包的大小数据,决定环状缓存队列元素的个数、每个元素的大小。
  • 环形: 队列收尾相接形成环状,生产者、消费者使用两个时钟前后相随,时钟本身采用atomic保护,无需额外的锁。

使用该缓存器,基于增强管道数据流转技术(EPDR)的业余软线无线电平台taskBus可在Linux 系统 i7 6700K 主频 4GHz下达到3GBps(24Gbps)的总交换能力。该交换能力被各个通道均分,共同支撑taskBus平台按照工程的连接关系,把各个生产者产出的数据包及时、完整、有序地输送给消费者。尽管采用了本技术,但与采用内核层面的zero-copy函数进行管道直接交换的性能极限还差了20倍。

整体架构流程

taskBus的整体架构是一种多进程的合作机制,平台管理N个子进程,各个进程可以充当消费者和生产者的角色。平台收集各个子进程的stdout输出,并按照用户给定的消费关系,送入消费者的stdout.在这样的整体架构下,数据流转流程如下:

stdout
stdin
stdin
stdin
stdout
生产者1
平台 QSARC
消费者1
消费者2
消费者2
生产者2

在202409版本之前,taskBus使用的是消费者队列。由1个生产者产出的数据,会复制N份进入各个消费者的队列。这种情况下,平台既要为生产者部位保留一个用于包拼接的缓存,又要做N次memcpy。在消费者显著多于生产者时,吞吐能力下降的很厉害。

202409版本之后,设计成每份包只有唯一的1个副本,存储在生产者队列。消费者根据索引去拉取:

  • 平台为每个生产者维护1个环状缓存队列。
  • 平台按照消费关系,把每个包的队列位置索引播发给各个消费者。
  • 播发时,会维护一个待消费计数器,将值加1
  • 消费者消费包后,会将计数器的值减1
  • 如果生产者队列绕了一圈,发现下一个位置的计数器不为0,说明消费的速度赶不上生产速度,此时平台不再接受生产者的数据,等待。

整个流程如下图所示(动图):

Queue
注意的是,上图中队列的大小是4个包,即使包消费完毕,可用的容积也不会释放,这样,随着程序的运行,渐渐地动态内存分配会越来越少,速度会越来越快。当然,如果包长是固定的,则不存在此问题,可以提前全部分配。

值得注意的是,在时钟11时,因为下一个位置依旧有消费者在驻留,因此生产被暂停等待。这种架构的缺点是整体的吞吐能力存在“木桶效应”,即由消费最慢的消费者决定1秒内能够流转多少包。

技术名词解释

  • 包:用于一次功能操作的数据单位,可以理解为1段连续内存的数据。包由包头、数据段组成。包头含有长度指示,正常的数据包之间是紧密衔接的。
  • 时钟:用于控制生产、消费的整形变量,从0开始无限增长。每处理1个包,时钟会加1。
  • 当前位置:生产者、消费者操作队列的当前位置,数值= 时钟 % 队列大小。

技术细节

1. 数据结构

生产者为taskNode类型,拥有如下成员变量维护自己的生产队列:

QVector<int> m_status_stdin;
QVector<QByteArray> m_array_stdin;
QVector<qsizetype> m_size_stdin;
QVector<QAtomicInteger<qsizetype> > m_cnt_stdin;

变量1 控制生产的状态。管道到来的数据,是一个无限有序无损流,类似TCP。但是,每次流可能会被切断,导致包头、数据块可能被切割到多次调用里。这个变量用于在各次调用中记忆上次的生产状态。

变量2 就是缓存的内存本身。这个列表的QByteArray元素个数是缓存的包个数,每个QByteArray会保持在历史最大包的高位。这样,只有到来的包大于当前体积时,才会重新分配内存。

变量3 存储缓存内当前的写入位置。如果包已经写完了,则等于包长。如果是空闲,则为0。

变量4 就是待消费计数器,每次生产完毕1个包,会根据消费者等待队列广播包的索引(时钟),并把计数设置为消费者个数。消费者消费完,会减一。

2. 自适应计算队列大小

这种队列存在1个重大的问题,就是很难控制内存的用量。由于各个QByteArray都只增加不释放,因此期望的用量是:

C = M ∗ N C = M * N C=MN

M是最大包大小,N是队列包个数。

由于taskBus设计时,建议包的大小小于64KB,在假设包的种类小于K时,可以预先统计前K个包的最大长度M,从而确定按照最大内存门限,如C=128MB,要设置的N。

N = C / M N = C/M N=C/M

当然,这种算法是极为简陋的。这是建立在taskBus的具体应用场景上的。对包变化幅度很大、无法简单统计的情况,要考虑一定的shink策略,如在每次消费指针归0时,裁剪队列的大小,并释放尾部的内存。

//Adjust buf sizeif (m_pos_stdin==8){qsizetype sza = 0;for (int ia = 0;ia<8;++ia)if (sza < m_size_stdin[ia])sza = m_size_stdin[ia];if (sza < 32)sza = 32;m_bufsize_adjust = taskCell::default_ringcache * 1024 * 1024 / sza;if (m_bufsize_adjust > m_bufsize_stdin)m_bufsize_adjust = m_bufsize_stdin;if (m_bufsize_adjust < 8)m_bufsize_adjust = 8;emit_message(QString("Adjusted buffer ring size : %1 MB / %2 Bytes = %3 frames.").arg(taskCell::default_ringcache).arg(sza).arg(m_bufsize_adjust).toUtf8());}

3. 生产者拼接缓存

在当前生产位置上,平台为生产者拼接一个完整的包。这里用到了状态机。状态机拥有如下状态:

状态名取值意义
头部捕获中0正在捕获头部
数据缓存中1正在根据头部指示的状态,缓存数据
数据缓存完毕2包接收完整,触发消费。
数据缓存完毕3触发消费完毕,待回收。

在每个状态上,都会进行进程管道读操作,不断的从stdout获取数据。直到状态2,会触发消费,并在全部消费通知播发后,进入状态3。当下一次生产者访问状态3的队列成员时,如果没有消费者还在消费这个包,则会把包位置归零,状态归零。否则,会阻塞生产者,直到消费者消费完毕为止。

void taskNode::slot_readyReadStandardOutput()
{LOG_PROFILE("IO","Start Recieving packs.");qsizetype total_sz = m_process->size();int badHeader = 0;while (total_sz){const qsizetype pos = m_pos_stdin % m_bufsize_adjust;QByteArray & curr_array = m_array_stdin[pos];qsizetype & readBufMax = m_size_stdin[pos];QAtomicInteger<qsizetype> & cnt = m_cnt_stdin[pos];int & stat = m_status_stdin[pos];//生产者被阻塞了,因为下一个缓存位置依旧有消费者在消费数据。if (cnt>0)break;//Old dataif (stat==3){readBufMax = 0;stat = 0;}auto * header =	reinterpret_cast<const TASKBUS::subject_package_header *>  (curr_array.data());//Headerif (stat==0){if (total_sz<sizeof(TASKBUS::subject_package_header))break;auto needRead = sizeof(TASKBUS::subject_package_header) - readBufMax ;auto red = m_process->read(curr_array.data()+readBufMax,needRead);readBufMax += red;if (readBufMax == sizeof(TASKBUS::subject_package_header)){if (header->prefix[0]==0x3C && header->prefix[1]==0x5A &&	header->prefix[2]==0x7E && header->prefix[3]==0x69){stat = 1;}else{++badHeader;readBufMax = 0;}}Q_ASSERT(readBufMax <= sizeof(TASKBUS::subject_package_header));}//dataif (stat==1){const qsizetype packAllSize = sizeof(TASKBUS::subject_package_header)+header->data_length;if (curr_array.size()<packAllSize){curr_array.resize(packAllSize);header = reinterpret_cast<const TASKBUS::subject_package_header *>  (curr_array.data());}auto needRead = packAllSize - readBufMax ;auto red = m_process->read(curr_array.data()+readBufMax,needRead);readBufMax += red;if (readBufMax==packAllSize){stat = 2;}Q_ASSERT(readBufMax <= packAllSize);}//Sendif (stat==2){const qsizetype pack_size = sizeof(TASKBUS::subject_package_header)+header->data_length;extern QAtomicInteger<quint64>  g_totalrev;g_totalrev += readBufMax;++m_spackage_sent;m_sbytes_sent += sizeof(TASKBUS::subject_package_header)+header->data_length;if (header->subject_id == TB_SUBJECT_CMD){//Command must endwith \0const char * pCmd = (const char *)header+sizeof(TASKBUS::subject_package_header);QString cmd = QString::fromUtf8(pCmd,header->data_length);QMap<QString, QVariant> map_z = taskCell::string_to_map(cmd);//remember uuidif (map_z.contains("source")){if(m_uuid.size()==0 )m_uuid = map_z["source"].toString();if (map_z.contains("destin"))emit sig_new_command(map_z);}}else if (m_currPrj)m_currPrj->routing_new_package(this,pos);if (m_bDebug)log_package(true,(char *)header,pack_size);stat = 3;++m_pos_stdin;//Adjust buf sizeif (m_pos_stdin==8){qsizetype sza = 0;for (int ia = 0;ia<8;++ia)if (sza < m_size_stdin[ia])sza = m_size_stdin[ia];if (sza < 32)sza = 32;m_bufsize_adjust = taskCell::default_ringcache * 1024 * 1024 / sza;if (m_bufsize_adjust > m_bufsize_stdin)m_bufsize_adjust = m_bufsize_stdin;if (m_bufsize_adjust < 8)m_bufsize_adjust = 8;emit_message(QString("Adjusted buffer ring size : %1 MB / %2 Bytes = %3 frames.").arg(taskCell::default_ringcache).arg(sza).arg(m_bufsize_adjust).toUtf8());}}total_sz = m_process->size();}if (badHeader)emit_message(QByteArray("Error header recieved. ""Header must be 0x3C, 0x5A, 0x7E,"" 0x69. Aborting."));LOG_PROFILE("IO","End Recieving packs.");
}

4. 高效地通知消费者

如果每个包很小,则QEvent触发的密度会很大,开销很大。我们设置一个消费者的索引队列,存储待消费的生产者队列、索引:

	QMutex m_mtx_queue;QList<taskNode *> m_write_queue;QList<qsizetype> m_write_pos;

而后,只在队列为0时,触发Event。

bool taskNode::enqueue_write(taskNode * node, qsizetype pos)
{m_mtx_queue.lock();int z = m_write_queue.size() + m_write_cmd.size();m_write_queue.push_back( node);m_write_pos.push_back(pos);m_mtx_queue.unlock();if (!z){QCoreApplication::postEvent(this,new QEvent(m_nPackEvent));}return  true;
}

同时,在消费时,一次性获取队列,并清空。这样减少锁的碰撞。

void taskNode::flush_write()
{m_mtx_queue.lock();QList<taskNode *> write_queue = m_write_queue;QList<qsizetype> write_pos = m_write_pos;QByteArrayList write_cmd = m_write_cmd;m_write_queue.clear();m_write_pos.clear();m_write_cmd.clear();//qDebug()<<write_queue.size();m_mtx_queue.unlock();while (write_queue.size()){taskNode * node = write_queue.first();qsizetype pos = write_pos.first();write_queue.pop_front();write_pos.pop_front();QByteArray & arr = node->get_stdin_array(pos);m_process->write(arr.constData(),sz);--cnt;}

小结

尽管采用了环形队列,由于在QProcess上还是存在mem-alloc,这使得峰值状态下CPU占用还是很大的。整体速率距离PCI总线和DDR4的能力还相去甚远,即使和bash直接管道连接相比,也有不小的差距。不过,为了构造灵活的管道吞吐能力,允许数据被多对多流转和反馈回环,损失一些性能也差强人意。

1. 性能表现情况

通过上述操作,taskBus的吞吐能力得到了保证,在只使用用户态的内存操作情况下,缓存64MB时,可以获得10Gbps以上的性能。在Linux下,可达 24Gbps,即3GBps1

平台系统峰值吞吐单路流量平均来回延迟
i7-10700U1Linux x643354MBps1340MBps1ms
i7-6700KLinux x642844MBps1050MBps2.2ms
i7-10700U2win10 home x641561MBps400MBps22ms
i7-6700Kwin10 home x641345MBps340MBps40ms
RaspberryPi 4(8GB)Rasbain 64263MBps102MBps6ms

上表是运行双进程点对点双向PING的状态下达到的。多进程下,总速率会稍高。可以发现,同样的硬件配置下:

  • windows下taskBus的吞吐能力要比Linux少1倍
  • windows下taskBus的带宽利用率要低于Linux,总速率/单路速率Linux更优。
  • windows下的延迟更大。

这是非常奇怪的现象,讲起来windows应该更快才对。对于里面的细节原因,只有后面慢慢研究了。

2. 主要改进和局限

与2024年8月版本相比,少了一层生产者–>消费者的memcpy,转而只是传递int类型的索引,在生产者:消费者个数=1:N的情况下,吞吐能力会得到较大提高。这种memcpy次数的降低,对于老旧CPU影响更大,即使在2进程互PING(1:1)的测试中,也能达到 20-30%的提速。

同时要注意到,3GBps已经很接近用户态内存的吞吐极限。若要追求极为苛刻的传输,需要按照文章开始的链接里的vmsplice zero-copy来定制,取得额外10-20倍的性能提升。

3. 源码和发行版

源码和发行版参考

GitCode.net

或者

GitCode.com


  1. i7-10700U是一个笔记本上的低功耗cpu,在最大睿频4GHz下的Manjaro Linux系统上,3GBps持续了5秒。由于温度上升,温度墙导致频率下降到1.8GHz,速度降低1倍。 ↩︎ ↩︎

  2. i7-10700U是一个笔记本上的低功耗cpu,通过在windows-10下去除温度墙,可以在97摄氏度的高温状态下,保持在4GHz,维持1.5GBps的流量。 ↩︎

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.rhkb.cn/news/421733.html

如若内容造成侵权/违法违规/事实不符,请联系长河编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

【R语言】删除数据框中所有行中没有大于200的数值的行

在Perl中还需要循环按行读入文件&#xff0c;而在R中&#xff0c;一行代码解决问题&#xff1a; df <- df[apply(df, 1, function(x) any(x > 200)), ]这是一个使用apply函数对数据框df进行操作的表达式。apply函数用于对数据框、矩阵或数组进行元素级别的操作。 df&am…

LINQ 和 LINQ扩展方法 (1)

LINQ函数概念&#xff1a; LINQ&#xff08;Language Integrated Query&#xff09;是一种C#语言中的查询技术&#xff0c;它允许我们在代码中使用类似SQL的查询语句来操作各种数据源。这些数据源可以是集合、数组、数据库、XML文档等等。LINQ提供了一种统一的编程模型&#x…

C++ Qt开发:运用QJSON模块解析数据

转载C Qt开发&#xff1a;运用QJSON模块解析数据-阿里云开发者社区 Qt 是一个跨平台C图形界面开发库&#xff0c;利用Qt可以快速开发跨平台窗体应用程序&#xff0c;在Qt中我们可以通过拖拽的方式将不同组件放到指定的位置&#xff0c;实现图形化开发极大的方便了开发效率&…

OpenHarmony鸿蒙( Beta5.0)智能门铃开发实践

鸿蒙开发往期必看&#xff1a; 一分钟了解”纯血版&#xff01;鸿蒙HarmonyOS Next应用开发&#xff01; “非常详细的” 鸿蒙HarmonyOS Next应用开发学习路线&#xff01;&#xff08;从零基础入门到精通&#xff09; “一杯冰美式的时间” 了解鸿蒙HarmonyOS Next应用开发路…

[Web安全 网络安全]-文件包含漏洞

文章目录&#xff1a; 一&#xff1a;前言 1.什么是文件包含漏洞 2.文件包含漏洞的成因 3.文件包含漏洞的分类 4.文件包含漏洞的防御策略 5.文件包含函数&#xff08;触发点Sink&#xff09; 6.环境 6.1 靶场 6.2 其他工具 二&#xff1a;文件包含LFI labs靶场实验…

GD32E230 RTC报警中断功能使用

GD32E230 RTC报警中断使用 GD32E230 RTC时钟源有3个&#xff0c;一个是内部RC振动器产生的40KHz作为时钟源&#xff0c;或者是有外部32768Hz晶振.,或者外部高速时钟晶振分频作为时钟源。 &#x1f516;个人认为最难理解难点的就是有关RTC时钟异步预分频和同步预分频的计算。在对…

图神经网络介绍3

1. 图同构网络&#xff1a;Weisfeiler-Lehman 测试与图神经网络的表达力 本节介绍一个关于图神经网络表达力的经典工作&#xff0c;以及随之产生的另一个重要的模型——图同构网络。图同构问题指的是验证两个图在拓扑结构上是否相同。Weisfeiler-Lehman 测试是一种有效的检验两…

python-数字反转

题目描述 给定一个整数 N&#xff0c;请将该数各个位上数字反转得到一个新数。新数也应满足整数的常见形式&#xff0c;即除非给定的原数为零&#xff0c;否则反转后得到的新数的最高位数字不应为零&#xff08;参见样例 2&#xff09;。 输入格式 一个整数 N。 输出格式 …

FAT32文件系统详细分析 (格式化SD nandSD卡)

FAT32 文件系统详细分析 (格式化 SD nand/SD 卡) 目录 FAT32 文件系统详细分析 (格式化 SD nand/SD 卡)1. 前言2.格式化 SD nand/SD 卡3.FAT32 文件系统分析3.1 保留区分析3.1.1 BPB(BIOS Parameter Block) 及 BS 区分析3.1.2 FSInfo 结构扇区分析3.1.3 引导扇区剩余扇区3.1.4 …

【Go】Go语言中的基本数据类型与类型转换

✨✨ 欢迎大家来到景天科技苑✨✨ &#x1f388;&#x1f388; 养成好习惯&#xff0c;先赞后看哦~&#x1f388;&#x1f388; &#x1f3c6; 作者简介&#xff1a;景天科技苑 &#x1f3c6;《头衔》&#xff1a;大厂架构师&#xff0c;华为云开发者社区专家博主&#xff0c;…

IDEA创建MAVEN项目

这里介绍如何使用IDEA创建MEAN工程&#xff0c;这里以创建模块举例&#xff0c;创建项目步骤相当&#xff1b; 创建项目 File-项目-new-module 这里选择普通java项目&#xff0c;Archetype选quickstart 项目介绍 create后可以看到创建的demo1 及其目录结构 里面默认的App里…

C++设计模式——Mediator中介者模式

一&#xff0c;中介者模式的定义 中介者模式是一种行为型设计模式。它通过一个中介者对象将多个对象之间的交互关系进行封装&#xff0c;使得对象之间的交互需要通过中介者对象来完成。该设计模式的结构很容易理解&#xff0c;以中介者为中心。 中介者模式的设计思想侧重于在…

【爬虫软件】小红书按关键词批量采集笔记,含笔记正文、转评赞藏等!

一、背景介绍 1.1 爬取目标 熟悉我的小伙伴都了解&#xff0c;我之前开发过2款软件&#xff1a; 【GUI软件】小红书搜索结果批量采集&#xff0c;支持多个关键词同时抓取&#xff01; 【GUI软件】小红书详情数据批量采集&#xff0c;含笔记内容、转评赞藏等&#xff01; 现在…

【Linux】进程调度与切换

【Linux】进程调度与切换 1. 基本概念2. 进程切换3. 进程调度3.1运行队列实现优先级设计3.2 处理效率问题3.3 活动队列与过期队列3.4 如何解决饥饿问题3.5 active指针和expired指针 1. 基本概念 竞争性: 系统进程数目众多&#xff0c;而CPU资源只有少量&#xff0c;甚至1个&am…

Linux——高流量 高并发(访问场景) 高可用(架构要求)

高并发通用设计逻辑&#xff1a; 定位单点&#xff0c;拆分问题 架构调整的顺序&#xff1a; 动静分离 // 没有实现动静分离 // 静态请求 交给 nginx或者 httpd 这种对于静态资源处理效率更高的服务&#xff0c;动态请求 交给php-fpm 服务来处理 使用云服务提供商 &#xff…

数据库(DB、DBMS、SQL)

今天我来讲解一下数据库和可视化数据库管理系统的使用 数据库概述 数据库 存储数据的仓库&#xff0c;数据是有组织的存储 DataBase (DB) 数据库管理系统 操纵和管理数据库的大型软件 DataBaseMangement System (DBMS) SQL 操作关系型数据库的编程语言&#xff0c;定义…

探索最佳 Shell 工具:全面测评 Bash、Zsh、Fish、Tcsh 和 Ksh

感谢浪浪云支持发布 浪浪云活动链接 &#xff1a;https://langlangy.cn/?i8afa52 文章目录 1. 简介2. 测评工具3. 测评标准4. Bash 测评4.1 易用性4.2 功能特性4.3 性能4.4 可定制性4.5 社区和支持 5. Zsh 测评5.1 易用性5.2 功能特性5.3 性能5.4 可定制性5.5 社区和支持 6. F…

Java、python、php三个版本 抗震救灾物资管理系统 抗洪救灾物资分配系统 救援物资申请平台(源码、调试、LW、开题、PPT)

&#x1f495;&#x1f495;作者&#xff1a;计算机源码社 &#x1f495;&#x1f495;个人简介&#xff1a;本人 八年开发经验&#xff0c;擅长Java、Python、PHP、.NET、Node.js、Android、微信小程序、爬虫、大数据、机器学习等&#xff0c;大家有这一块的问题可以一起交流&…

解锁SAP数据的潜力:SNP Glue与SAP Datasphere的协同作用

在各种文章中&#xff0c;我们研究了客户如何利用SNP Glue与基于云的数据仓库和数据湖相结合&#xff0c;以充分利用其SAP数据。SNP Glue 通过高性能集成解决方案帮助客户解锁 SAP 数据孤岛。例如&#xff0c;可以使用SNP Glue先进的增量捕获&#xff08;CDC&#xff09;近乎实…

DIC技术助力新能源汽车主机厂力学测试研发与整车性能提升

在新能源汽车研发过程中&#xff0c;非接触式全视场应变DIC测量方案&#xff0c;越来越受到汽车主机厂的信赖与认可。传统接触式传感器&#xff0c;在精度、灵活性和数据处理能力上存在局限。DIC技术可提供精确、高效、全视场、便捷的非接触式测量解决方案。 在汽车研发阶段&a…