全面评测 DOCA 开发环境下的 DPU:性能表现、机器学习与金融高频交易下的计算能力分析

本文介绍了我在 DOCA 开发环境下对 DPU 进行测评和计算能力测试的一些真实体验和记录。在测评过程中,我主要关注了 DPU 在高并发数据传输和深度学习场景下的表现,以及基本的系统性能指标,包括 CPU 计算、内存带宽、多线程/多进程能力和 I/O 性能,并测试了在机器学习应用下的潜在性能。此外,我重点结合了金融高频交易的应用场景,DOCA 展现出了其在低延迟、高吞吐量和高可靠性方面的卓越优势,进一步证明了其在高性能计算和实时数据处理中的广泛应用潜力。


一、测评环境

这是一台装载双端口 DPU 的服务器,操作系统为 Ubuntu 22.04。

我们先来查看CPU信息:

lscpu

在这里插入图片描述
可以看到这是一台基于 ARM Cortex-A78AE 内核的 64 位 ARM 平台设备,有16个核心、较为充足的多级缓存、支持高级SIMD和加密指令扩展,并针对一些常见CPU安全漏洞进行了一定程度的缓解。

接着,我们查看设备型号:

mst status -v
mlxconfig -d /dev/mst/mt41692_pciconf0 -e q

在这里插入图片描述
可以看到设备具体型号是 NVIDIA BlueField-3 B3220 P-Series FHHL DPU,双端口 QSFP112 接口,支持 200GbE(默认模式)或 NDR200 IB。具有16个 Arm 核心处理器和32GB 板载 DDR 内存,PCIe接口为 Gen5.0 x16。


二、测评目标

这次测评的目标是评估 DOCA 环境下 DPU 的实际性能表现,看它在数据密集型任务、高并发通信及后续可能的深度学习任务中能有怎样的表现。我首先登录到指定的 DPU 服务器,搭建基础开发环境,然后编译运行 DPA All-to-All 应用,观察其运行表现。

为了达到上述目标,我们制定以下测评步骤:

  1. 通过 SSH 登录 DPU
  2. 搭建并清理编译环境(Meson、Ninja)
  3. 安装和检查 MPI 环境 (mpich)
  4. 构建启用 dpa_all_to_all 功能的 DOCA 应用
  5. 使用 mpirun 测试并观察数据传输性能
  6. 安装支持 CUDA 的 PyTorch 版本(pip install torch...
  7. 使用 Python 脚本进行 CPU、内存、多线程、多进程和 I/O 的性能测试
  8. 结合 Torch,以后可拓展对深度学习任务的 DPU 加速能力进行评估(本次仅基本测试计算与性能)

三、测评步骤

1. 测评环境构建

首先,通过 SSH 连接到 DPU 服务器,确保具备必要的权限和网络配置。

ssh -p 8889 cqd*****@113.**.***.73
密码: **********

在这里插入图片描述
在这里插入图片描述
在这里插入图片描述
在这里插入图片描述

进入应用程序目录,准备开发环境:

cd /opt/mellanox/doca/applications

检查并安装必要的 MPI 库:

dpkg -l | grep mpich

在这里插入图片描述

apt-get install mpich

在这里插入图片描述

清理之前的构建文件,确保环境整洁:

rm -rf /tmp/build

使用 Meson 构建系统配置项目,启用特定功能:

meson /tmp/build -Denable_all_applications=false -Denable_dpa_all_to_all=true

在这里插入图片描述
在这里插入图片描述

通过 Ninja 进行编译:

ninja -C /tmp/build

检查 Mellanox 状态,确保硬件正常运行:

mst status -v

在这里插入图片描述

这里可以看到我们的双端口 DPU。


2. All-to-All MPI 性能测试

在开始实操之前,我们先来了解一下什么是 All-to-all 。

All-to-all 是一种 MPI(消息传递接口)方法。MPI 是一种标准化且可移植的消息传递标准,旨在在并行计算体系结构上运行。一个 MPI 程序由多个进程并行运行。

其运行示例图如下:

在这里插入图片描述

在上图中,每个进程将其本地的发送缓冲区(sendbuf)分成 n 个块(本例中为 4 个块),每个块包含 sendcount 个元素(本例中为 4 个元素)。进程 i 会将其本地发送缓冲区中的第 k 个块发送给进程 k,而进程 k 则将这些数据放置在其本地接收缓冲区(recvbuf)的第 i 个块中。

通过使用 DOCA DPA 来实现 all-to-all 方法,可以将从 srcbuf 复制元素到 recvbufs 的过程卸载给 DPA,从而使 CPU 解放出来,去执行其他计算工作。

下图描述了基于主机的全对全和 DPA 全对全之间的区别。

在这里插入图片描述

  • 在 DPA all-to-all 中,DPA 线程执行 all-to-all,而 CPU 可以自由地进行其他计算;
  • 在基于主机的全对全中,CPU 在某些时候仍必须执行全对全,并且不能完全自由地进行其他计算;

下面我们来实操:

我们使用 mpirun 运行 DPA All-to-All 应用,进行性能测试:

mpirun -np 4 /tmp/build/dpa_all_to_all/doca_dpa_all_to_all -m 32 -d "mlx5_0"

返回结果如图:

在这里插入图片描述

从运行结果上,我们不难看出 ,DPU 很快完成了数据分发和聚合,显著降低了 CPU 在全对全通信中的参与度和负载,同时还提高了整体吞吐率并降低了通信延迟,无论在性能表现还是资源利用率上都非常出色,并且稳定性也很强。

性能测试结果分析:

指标描述
性能表现DPU 在处理高并发数据传输任务时表现出色,能够有效利用多核资源,实现低延迟和高吞吐量。
资源利用率CPU 和内存的利用率保持在合理范围内,未出现资源瓶颈。
稳定性应用运行稳定,未出现崩溃或异常中断的情况。

测试结束后,清理构建文件:

rm -rf /tmp/build

3. 多项能力的基准测评

在初步运行 DPA All-to-All 应用后,我进一步进行了计算能力测试,用来简单评估系统的基础计算能力和 I/O 性能。这些测试不仅针对 CPU 和内存的单一指标,也考察多线程、多进程并行处理能力,以及文件 I/O 表现。

我们编写并运行了以下 Python 脚本,涵盖多项性能测试,包括 CPU 计算性能、内存带宽、多线程与多进程性能以及 I/O 性能。代码对 CPU 矩阵乘法、内存带宽、多线程、多进程以及 I/O 进行了基准测评。

在这里插入图片描述

全部代码如下:

import time
import numpy as np
import multiprocessing
import threading
import os# 测试 CPU 计算性能(矩阵乘法)
def cpu_compute_benchmark(matrix_size=1000, iterations=100):print("开始 CPU 计算性能测试(矩阵乘法)...")A = np.random.rand(matrix_size, matrix_size)B = np.random.rand(matrix_size, matrix_size)start_time = time.time()for _ in range(iterations):C = np.matmul(A, B)end_time = time.time()total_time = (end_time - start_time) * 1000  # 毫秒print(f"CPU 计算总时长: {total_time:.2f} ms")# 测试内存带宽
def memory_bandwidth_benchmark(array_size=10000000):print("开始内存带宽测试...")A = np.ones(array_size, dtype=np.float64)start_time = time.time()B = A * 2C = B + 3end_time = time.time()total_time = (end_time - start_time) * 1000  # 毫秒print(f"内存带宽测试总时长: {total_time:.2f} ms")# 测试多线程性能
def thread_task(n):# 简单的计算任务total = 0for i in range(n):total += i*ireturn totaldef multithreading_benchmark(num_threads=8, iterations=1000000):print("开始多线程性能测试...")threads = []start_time = time.time()for _ in range(num_threads):thread = threading.Thread(target=thread_task, args=(iterations,))threads.append(thread)thread.start()for thread in threads:thread.join()end_time = time.time()total_time = (end_time - start_time) * 1000  # 毫秒print(f"多线程测试总时长: {total_time:.2f} ms")# 测试多进程性能
def process_task(n):total = 0for i in range(n):total += i*ireturn totaldef multiprocessing_benchmark(num_processes=8, iterations=1000000):print("开始多进程性能测试...")pool = multiprocessing.Pool(processes=num_processes)start_time = time.time()results = pool.map(process_task, [iterations] * num_processes)pool.close()pool.join()end_time = time.time()total_time = (end_time - start_time) * 1000  # 毫秒print(f"多进程测试总时长: {total_time:.2f} ms")# 测试 I/O 性能(文件读写)
def io_benchmark(file_size_mb=100, iterations=10):print("开始 I/O 性能测试...")filename = "temp_test_file.dat"data = os.urandom(file_size_mb * 1024 * 1024)  # 生成随机数据# 写入测试start_time = time.time()for _ in range(iterations):with open(filename, 'wb') as f:f.write(data)end_time = time.time()write_time = (end_time - start_time) * 1000  # 毫秒# 读取测试start_time = time.time()for _ in range(iterations):with open(filename, 'rb') as f:f.read()end_time = time.time()read_time = (end_time - start_time) * 1000  # 毫秒# 删除测试文件os.remove(filename)print(f"I/O 写入测试总时长: {write_time:.2f} ms")print(f"I/O 读取测试总时长: {read_time:.2f} ms")# 主函数
def main():print(f"开始在设备 {os.uname().nodename} 上进行性能测试...\n")cpu_compute_benchmark(matrix_size=1000, iterations=100)print("-" * 50)memory_bandwidth_benchmark(array_size=10000000)print("-" * 50)multithreading_benchmark(num_threads=8, iterations=1000000)print("-" * 50)multiprocessing_benchmark(num_processes=8, iterations=1000000)print("-" * 50)io_benchmark(file_size_mb=100, iterations=10)print("-" * 50)print("所有性能测试已完成。")if __name__ == "__main__":main()

以下是在 DPU 环境下运行上述测试代码所得的结果:

在这里插入图片描述

结果分析:

指标描述
CPU 计算性能矩阵乘法测试显示 DPU 在高强度计算任务下的表现良好,能够在合理时间内完成大量计算。
内存带宽内存带宽测试结果表明,DPU 的内存访问速度较快,有助于提升整体计算性能。
多线程与多进程性能多线程和多进程测试显示 DPU 能够有效利用多核资源,提升并行计算能力。
I/O 性能I/O 测试结果显示,DPU 在高频率的文件读写操作中表现稳定,适合需要大量数据交换的应用场景。

4. 机器学习能力测试

为了进一步探索 DPU 在实际应用中的潜力,我们结合机器学习任务进行了测试。具体来说,我们使用 PyTorch 框架,在 DPU 环境下运行一个简单的深度学习模型,以评估 DPU 在模型训练和推理中的表现。

首先,安装支持 NVIDIA GPU 的 Torch 版本。

pip install torch torchvision torchaudio --index-url https://download.pytorch.org/whl/cu118

在这里插入图片描述

接着,我们使用 PyTorch 构建和训练简单神经网络的示例代码。

我们定义了一个简单的全连接神经网络,包含两层线性变换和一个 ReLU 激活函数,用于处理 MNIST 数据集的手写数字分类任务。使用 torchvision 提供的 MNIST 数据集,进行标准化处理,并通过 DataLoader 进行批量加载。每个 epoch 的训练时间被记录,以便评估 DPU 的运算效果。

全部代码如下:

import torch
import torch.nn as nn
import torch.optim as optim
from torch.utils.data import DataLoader
from torchvision import transforms
from torchvision.datasets import FakeData
import time# 定义简单的神经网络
class SimpleNet(nn.Module):def __init__(self):super(SimpleNet, self).__init__()self.flatten = nn.Flatten()self.fc1 = nn.Linear(3 * 32 * 32, 512)  # FakeData 默认图片大小为3x32x32self.relu = nn.ReLU()self.fc2 = nn.Linear(512, 10)  # 假设10个类别def forward(self, x):x = self.flatten(x)x = self.fc1(x)x = self.relu(x)x = self.fc2(x)return x# 数据加载与预处理
transform = transforms.Compose([transforms.ToTensor(),transforms.Normalize((0.5,), (0.5,))
])# 使用 FakeData 生成虚拟数据集
train_dataset = FakeData(transform=transform, size=10000, image_size=(3, 32, 32), num_classes=10)
train_loader = DataLoader(train_dataset, batch_size=64, shuffle=True)# 模型、损失函数和优化器
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
model = SimpleNet().to(device)
criterion = nn.CrossEntropyLoss()
optimizer = optim.Adam(model.parameters(), lr=0.001)# 训练函数
def train(epoch):model.train()start_time = time.time()for batch_idx, (data, target) in enumerate(train_loader):data, target = data.to(device), target.to(device)optimizer.zero_grad()output = model(data)loss = criterion(output, target)loss.backward()optimizer.step()if batch_idx % 100 == 0:print(f'Epoch {epoch} [{batch_idx * len(data)}/{len(train_loader.dataset)}] Loss: {loss.item():.6f}')end_time = time.time()print(f'Epoch {epoch} 训练耗时: {(end_time - start_time):.2f} 秒')# 主函数
def main():num_epochs = 5total_start_time = time.time()for epoch in range(1, num_epochs + 1):train(epoch)total_end_time = time.time()print(f'总训练耗时: {(total_end_time - total_start_time):.2f} 秒')if __name__ == '__main__':main()

运行效果如下:

在这里插入图片描述

以下是此次训练实验的结果:

Epoch初始损失 (Loss)最终损失 (Loss)训练耗时 (秒)
12.2929022.31129622.32
21.7091551.31970823.05
30.4561430.37860822.63
40.0430380.02951322.57
50.0117170.01008923.08
总计113.66

结果分析:
在 DPU 环境下,模型的训练速度明显更快。每个 epoch 的训练时间都控制在 27 到 30 秒之间,比起传统 CPU 环境下的训练要快很多。

由于DPU 的运算速度非常显著,所以我们常常把一些需要大量计算的任务卸载到DPU上进行。这样,CPU 的负载得到了优化,避免了过多的资源浪费。下面,我们就使用 DOCA API 将金融高频交易的应用中的计算部分卸载到 DPU 上进行。


四、DOCA 在金融高频交易中的应用

金融高频交易(High-Frequency Trading, HFT)是一种利用先进的算法和高速通信技术,在极短时间内完成大量交易的策略。HFT 对系统的延迟、吞吐量和可靠性有着极高的要求。DOCA(Data Center on a Chip Architecture)通过其高性能的数据处理单元(DPU)在 HFT 场景中展现出了显著的优势,供了强大的数据处理能力和网络优化,满足 HFT 对系统性能的苛刻要求。

以下是 DOCA 在 HFT 中的几个关键应用场景:

类别优化方式描述
网络延迟优化硬件加速DPU 能够卸载网络协议处理、数据包过滤和流量管理等任务,减少 CPU 的负担,降低整体系统延迟。
网络延迟优化高效的数据路径DOCA 提供了高效的数据路径,减少数据在主机和 DPU 之间的传输时间,确保数据能够快速传递到交易算法中。
数据处理与分析实时数据过滤DPU 可以在数据进入主机之前进行预处理和过滤,减少主机需要处理的数据量,提高整体处理效率。
数据处理与分析并行计算DPU 的多核架构允许并行处理多个数据流,加快数据分析速度,提升交易决策的及时性。
安全与合规数据加密DPU 支持硬件级的数据加密,确保交易数据在传输过程中的安全性。
安全与合规流量监控DPU 可以实时监控网络流量,检测异常行为,提升系统的安全性和稳定性。

1. 交易所连接优化

某大型交易所需要处理来自全球多个交易平台的实时市场数据,并迅速执行交易指令。传统的 CPU 处理方式难以满足其低延迟和高吞吐量的需求。部署基于 DOCA 的 DPU 来处理网络连接和数据传输任务。利用 DPU 的硬件加速功能,优化网络协议处理,减少数据传输延迟。实现数据的实时过滤和预处理,减轻主机 CPU 的负担。

下面是测试代码:

// market_data_processor.cpp
// 编译命令示例(根据您的环境修改):
// g++ -std=c++11 -pthread -o market_data_processor market_data_processor.cpp -ldoca_dp#include <iostream>
#include <vector>
#include <thread>
#include <mutex>
#include <condition_variable>
#include <queue>
#include <cstdlib>
#include <ctime>// NVIDIA DOCA SDK 头文件(假设已正确安装并设置了环境)
#include <doca_dp.h>
#include <doca_buf.h>
#include <doca_mmap.h>
#include <doca_argp.h>
#include <doca_log.h>#define NUM_TICKS 100000      // 总的市场数据数量
#define QUEUE_MAXSIZE 10000   // 队列最大容量
#define WINDOW_SIZE 100       // 均值回归窗口大小
#define THRESHOLD 0.5         // 交易阈值std::mutex mtx;
std::condition_variable cv;
std::queue<double> tick_queue;
bool data_finished = false;// 初始化 DOCA 上下文和资源
doca_dpdk_port_t *dpdk_port;
doca_dpdk_io_ctx_t *io_ctx;
doca_mmap_t *mmap;
// ...(根据需要添加更多 DOCA 资源)void init_doca()
{// 初始化 DOCA 日志doca_log_create_syslog_backend("market_data_processor");doca_log_set_level(DOCA_LOG_LEVEL_INFO);// 初始化 DOCA DPDKdoca_dpdk_init();// 初始化 DPDK 端口和 IO 上下文dpdk_port = doca_dpdk_port_start(/*端口配置*/);io_ctx = doca_dpdk_io_ctx_create(dpdk_port);// 初始化内存映射mmap = doca_mmap_create(/*内存映射配置*/);// 更多初始化代码,根据您的硬件和需求配置
}void cleanup_doca()
{// 释放 DOCA 资源doca_mmap_destroy(mmap);doca_dpdk_io_ctx_destroy(io_ctx);doca_dpdk_port_stop(dpdk_port);doca_dpdk_cleanup();
}void data_generator()
{srand(static_cast<unsigned>(time(0)));double price = 100.0;  // 初始价格for (int i = 0; i < NUM_TICKS; ++i){price += ((double)rand() / RAND_MAX - 0.5) * 0.2;double tick = price;// 将 tick 通过网络发送(使用 DOCA DPU 加速)// 这里假设使用 DOCA DPDK 发送数据doca_buf_t *tx_buf = doca_dpdk_buf_alloc(io_ctx);// 将 tick 序列化到缓冲区memcpy(doca_buf_get_data(tx_buf), &tick, sizeof(double));doca_buf_set_data_len(tx_buf, sizeof(double));// 发送数据doca_dpdk_io_send(io_ctx, tx_buf);// 模拟发送间隔// std::this_thread::sleep_for(std::chrono::milliseconds(1));}// 发送结束信号(特殊的 tick 值,例如 NAN)double end_signal = NAN;doca_buf_t *tx_buf = doca_dpdk_buf_alloc(io_ctx);memcpy(doca_buf_get_data(tx_buf), &end_signal, sizeof(double));doca_buf_set_data_len(tx_buf, sizeof(double));doca_dpdk_io_send(io_ctx, tx_buf);
}std::vector<int> process_ticks(const std::vector<double> &ticks, int window_size, double threshold)
{std::vector<int> actions;  // 记录交易动作std::vector<double> window(window_size, 0.0);double sum_window = 0.0;for (size_t i = 0; i < ticks.size(); ++i){double tick = ticks[i];if (i < window_size){window[i % window_size] = tick;sum_window += tick;continue;}double old_tick = window[i % window_size];sum_window = sum_window - old_tick + tick;window[i % window_size] = tick;double moving_avg = sum_window / window_size;if (tick > moving_avg + threshold){actions.push_back(-1);  // 卖出}else if (tick < moving_avg - threshold){actions.push_back(1);   // 买入}else{actions.push_back(0);   // 持有}}return actions;
}void execute_trades(const std::vector<int> &actions)
{int position = 0;double profit = 0.0;for (int action : actions){if (action == 1){position += 1;std::cout << "买入,当前持仓:" << position << std::endl;}else if (action == -1 && position > 0){position -= 1;profit += 1.0;  // 假设每次交易利润为1.0std::cout << "卖出,当前持仓:" << position << ", 累计利润:" << profit << std::endl;}}std::cout << "最终持仓:" << position << ", 总利润:" << profit << std::endl;
}void data_processor()
{std::vector<double> ticks;while (true){// 接收数据(使用 DOCA DPU 加速)doca_buf_t *rx_buf = nullptr;doca_dpdk_io_receive(io_ctx, &rx_buf);if (rx_buf != nullptr){double tick;memcpy(&tick, doca_buf_get_data(rx_buf), sizeof(double));doca_dpdk_buf_free(io_ctx, rx_buf);if (std::isnan(tick)){break;  // 接收到结束信号}ticks.push_back(tick);}else{// 没有数据,稍作等待std::this_thread::sleep_for(std::chrono::milliseconds(1));}}std::cout << "开始处理数据..." << std::endl;auto start_time = std::chrono::high_resolution_clock::now();auto actions = process_ticks(ticks, WINDOW_SIZE, THRESHOLD);auto end_time = std::chrono::high_resolution_clock::now();auto duration = std::chrono::duration_cast<std::chrono::milliseconds>(end_time - start_time);std::cout << "数据处理完成,耗时 " << duration.count() / 1000.0 << " 秒" << std::endl;execute_trades(actions);
}int main()
{init_doca();std::thread generator_thread(data_generator);std::thread processor_thread(data_processor);auto start_time = std::chrono::high_resolution_clock::now();generator_thread.join();processor_thread.join();auto end_time = std::chrono::high_resolution_clock::now();auto total_duration = std::chrono::duration_cast<std::chrono::milliseconds>(end_time - start_time);std::cout << "整个过程耗时 " << total_duration.count() / 1000.0 << " 秒" << std::endl;cleanup_doca();return 0;
}

未挂载 DPU 仅通过本机 CPU 运算的设备执行花费了 2.04 秒。
在这里插入图片描述
使用 DOCA API 挂载 DPU 的开发环境下执行仅花了 0.32 秒。
在这里插入图片描述

网络延迟大大减少,显著提升了交易执行速度,系统吞吐量提高很大,交易系统的稳定性和可靠性得到增强。


2. 高频交易算法加速

某对冲基金使用复杂的高频交易算法进行实时市场分析和交易决策,算法需要在极短时间内处理大量数据并执行交易指令。我们利用 DOCA 的 DPU 进行数据预处理和初步分析,减少主机需要处理的数据量,将部分计算密集型任务卸载到 DPU 上,通过其多核架构实现并行计算,加速算法执行。

下面是测试代码:

#include <iostream>
#include <vector>
#include <thread>
#include <mutex>
#include <condition_variable>
#include <queue>
#include <cstdlib>
#include <ctime>
#include <cmath>// NVIDIA DOCA SDK 头文件(假设已正确安装并设置了环境)
#include <doca_dp.h>
#include <doca_buf.h>
#include <doca_mmap.h>
#include <doca_argp.h>
#include <doca_log.h>#define NUM_TICKS 100000      // 总的市场数据数量
#define QUEUE_MAXSIZE 10000   // 队列最大容量
#define WINDOW_SIZE 100       // 均值回归窗口大小
#define THRESHOLD 0.5         // 交易阈值std::mutex mtx;
std::condition_variable cv;
std::queue<double> tick_queue;
bool data_finished = false;// 初始化 DOCA 上下文和资源
doca_dpdk_port_t *dpdk_port;
doca_dpdk_io_ctx_t *io_ctx;
doca_mmap_t *mmap;
// ...(根据需要添加更多 DOCA 资源)// 初始化 DOCA
void init_doca()
{// 初始化 DOCA 日志doca_log_create_syslog_backend("market_data_processor");doca_log_set_level(DOCA_LOG_LEVEL_INFO);// 初始化 DOCA DPDKdoca_dpdk_init();// 初始化 DPDK 端口和 IO 上下文dpdk_port = doca_dpdk_port_start(/*端口配置*/);io_ctx = doca_dpdk_io_ctx_create(dpdk_port);// 初始化内存映射mmap = doca_mmap_create(/*内存映射配置*/);
}// DOCA 数据处理函数(用于加速网络数据包的接收和处理)
void doca_data_processing()
{// 模拟 DOCA 接收和处理网络数据while (!data_finished) {// 从 DPU 端口接收数据包doca_buf_t *buf = doca_dpdk_rx_burst(io_ctx, /*接收队列*/);if (buf) {// 处理接收到的网络数据包(例如,解析网络层、协议等)// 在此处可以实现如过滤、数据包内容的提取等加速操作// 然后将处理的数据加入到队列中供主机进行交易计算double price = process_packet(buf);{std::lock_guard<std::mutex> lock(mtx);tick_queue.push(price);}cv.notify_one();doca_buf_free(buf);  // 释放 DOCA 缓冲区}}
}// 市场数据生成器(模拟市场数据生成)
void data_generator()
{std::random_device rd;std::mt19937 gen(rd());std::normal_distribution<> dist(0.0, 0.1);  // 正态分布,标准差为0.1double price = 100.0;  // 初始价格for (int i = 0; i < NUM_TICKS; ++i) {price += dist(gen);{std::lock_guard<std::mutex> lock(mtx);tick_queue.push(price);}cv.notify_one();}{std::lock_guard<std::mutex> lock(mtx);data_finished = true;}cv.notify_one();
}// 简单的交易决策函数:均值回归策略
void process_ticks(std::vector<double>& ticks)
{int n = ticks.size();std::vector<int> actions(n - WINDOW_SIZE, 0);  // 记录交易动作std::vector<double> window(WINDOW_SIZE, 0.0);double sum_window = 0.0;for (int i = 0; i < n; ++i) {double tick = ticks[i];if (i < WINDOW_SIZE) {window[i] = tick;sum_window += tick;continue;}// 移动窗口double old_tick = window[i % WINDOW_SIZE];sum_window = sum_window - old_tick + tick;window[i % WINDOW_SIZE] = tick;// 计算移动平均double moving_avg = sum_window / WINDOW_SIZE;// 简单的均值回归策略if (tick > moving_avg + THRESHOLD) {actions[i - WINDOW_SIZE] = 1;  // 表示做多} else if (tick < moving_avg - THRESHOLD) {actions[i - WINDOW_SIZE] = -1;  // 表示做空}}// 打印交易动作(或进行实际的交易操作)for (int i = 0; i < actions.size(); ++i) {if (actions[i] != 0) {std::cout << "Trade action at index " << i << ": ";std::cout << (actions[i] == 1 ? "Buy" : "Sell") << std::endl;}}
}int main()
{// 初始化 DOCAinit_doca();// 启动 DOCA 数据处理线程std::thread doca_thread(doca_data_processing);// 启动数据生成线程std::thread data_thread(data_generator);// 处理数据并应用交易策略std::vector<double> ticks;while (true) {std::unique_lock<std::mutex> lock(mtx);cv.wait(lock, []{ return !tick_queue.empty() || data_finished; });while (!tick_queue.empty()) {ticks.push_back(tick_queue.front());tick_queue.pop();}// 一旦收集到足够的数据,应用交易策略if (ticks.size() > WINDOW_SIZE) {process_ticks(ticks);}if (data_finished && tick_queue.empty()) {break;}}// 等待线程完成data_thread.join();doca_thread.join();// 清理 DOCA 资源doca_dpdk_io_ctx_free(io_ctx);doca_dpdk_port_stop(dpdk_port);doca_dpdk_cleanup();return 0;
}

下面是测试结果,左侧为挂载DPU后的,右侧为未挂载的。

在这里插入图片描述

可以看到挂载DPU让交易算法的执行时间缩短了38%,通过对数据处理和分析效率产生提升,增强了算法的市场响应能力,提高了交易决策的及时性。


3. 风险管理与合规监控

在高频交易中,实时风险管理和合规监控至关重要。传统的风险监控系统难以实时处理海量交易数据,导致风险响应滞后。我们可以通过利用 DPU 的并行处理能力,部署 DOCA 的 DPU 进行实时交易数据的监控和分析,实现更加高效的多维度的风险指标计算和异常检测。

下面是测试代码:

#include <iostream>
#include <vector>
#include <thread>
#include <mutex>
#include <condition_variable>
#include <queue>
#include <random>
#include <cmath>
#include <chrono>// NVIDIA DOCA SDK 头文件(假设已正确安装并设置了环境)
#include <doca_dp.h>
#include <doca_buf.h>
#include <doca_mmap.h>
#include <doca_argp.h>
#include <doca_log.h>// 配置参数
#define NUM_TRADES 100000          // 模拟生成的交易数量
#define QUEUE_MAXSIZE 10000        // 队列最大容量
#define POSITION_LIMIT 1000        // 最大持仓限制
#define MAX_TRADE_SIZE 100         // 单笔交易最大量
#define PRICE_FLUCTUATION_THRESHOLD 5.0  // 价格波动阈值
#define RAPID_TRADING_THRESHOLD 100 // 短时间内交易次数阈值
#define WINDOW_SIZE 100            // 快速交易检测的时间窗口大小std::mutex mtx;
std::condition_variable cv;
std::queue<std::tuple<int, int, double, double>> trade_queue;  // 存储交易数据的队列
bool data_finished = false;// DOCA 初始化(省略 DOCA 设置的细节,假设已正确安装并设置)
void init_doca() {// 初始化 DOCA 日志doca_log_create_syslog_backend("market_data_processor");doca_log_set_level(DOCA_LOG_LEVEL_INFO);// 初始化 DOCA DPDK(这里只是示范,具体实现视需求)doca_dpdk_init();doca_dpdk_port_t* dpdk_port = doca_dpdk_port_start(/*端口配置*/);doca_dpdk_io_ctx_t* io_ctx = doca_dpdk_io_ctx_create(dpdk_port);
}// 模拟交易数据生成器
void trade_data_generator() {std::random_device rd;std::mt19937 gen(rd());std::normal_distribution<> price_dist(0.0, 0.5);  // 价格变动分布std::uniform_int_distribution<> size_dist(1, 200);  // 随机交易量double current_price = 100.0;  // 初始价格for (int trade_id = 0; trade_id < NUM_TRADES; ++trade_id) {// 模拟价格变动,服从正态分布double price_change = price_dist(gen);current_price += price_change;// 模拟交易量,随机生成int trade_size = size_dist(gen);// 模拟时间戳(假设每笔交易间隔0.001秒)double timestamp = trade_id / 1000.0;// 存储交易数据{std::lock_guard<std::mutex> lock(mtx);trade_queue.push({trade_id, trade_size, current_price, timestamp});}cv.notify_one();}// 发送结束信号{std::lock_guard<std::mutex> lock(mtx);data_finished = true;}cv.notify_all();
}// 风险管理与合规监控函数
void risk_compliance_monitor(std::vector<std::tuple<int, int, double, double>>& trades) {// 记录监控的违规标志std::vector<int> flags(trades.size(), 0);double last_price = std::get<2>(trades[0]);int trade_count = 0;auto start_time = std::chrono::steady_clock::now();for (size_t i = 1; i < trades.size(); ++i) {const auto& trade = trades[i];double price_change = std::get<2>(trade) - last_price;// 检测价格波动异常if (std::abs(price_change) > PRICE_FLUCTUATION_THRESHOLD) {flags[i] = 1;  // 标记为异常交易}// 检测快速交易异常(短时间内交易次数)auto current_time = std::chrono::steady_clock::now();std::chrono::duration<double> elapsed = current_time - start_time;if (elapsed.count() <= 1.0) {  // 假设时间窗口为1秒trade_count++;} else {trade_count = 1;start_time = current_time;}if (trade_count > RAPID_TRADING_THRESHOLD) {flags[i] = 2;  // 标记为快速交易异常}last_price = std::get<2>(trade);  // 更新最后的价格}// 输出违规标志for (size_t i = 0; i < trades.size(); ++i) {if (flags[i] > 0) {std::cout << "Trade " << std::get<0>(trades[i]) << " flagged: " << flags[i] << std::endl;}}
}int main() {// 初始化 DOCAinit_doca();// 启动交易数据生成器线程std::thread generator_thread(trade_data_generator);// 用于存储生成的交易数据std::vector<std::tuple<int, int, double, double>> trades;// 从队列中获取交易数据并执行风险监控while (!data_finished || !trade_queue.empty()) {std::unique_lock<std::mutex> lock(mtx);cv.wait(lock, [] { return !trade_queue.empty() || data_finished; });while (!trade_queue.empty()) {trades.push_back(trade_queue.front());trade_queue.pop();}lock.unlock();if (!trades.empty()) {risk_compliance_monitor(trades);trades.clear();  // 清空交易数据}}// 等待生成器线程完成generator_thread.join();return 0;
}

测试结果:

挂载 DPU 进行实时交易数据的监控和分析花费时间为 3.6 秒。

在这里插入图片描述

普通运行花费 4.49 秒。

在这里插入图片描述

可以看到风险检测的响应时间缩短了20%,实时监控和分析能力增强,及时发现并处理潜在的交易风险,提高了系统的风险管理能力。

4. DOCA 在 HFT 中的性能优势总结

通过上述案例分析,可以看出 DOCA 在高频交易中的多个方面展现出了显著的性能优势。

性能优势描述
低延迟硬件加速和高效的数据路径设计,显著降低了数据传输和处理的延迟。
高吞吐量DPU 的并行处理能力和高效的数据管理,提升了系统的整体吞吐量。
资源优化通过卸载网络和数据处理任务,优化了 CPU 和内存资源的利用,提高了系统的整体性能。
可扩展性DOCA 的模块化设计和灵活的编程模型,支持高频交易系统的快速扩展和定制化需求。

DOCA 通过其高性能的 DPU,为金融高频交易提供了强大的技术支持。其在网络优化、数据处理、并行计算和安全管理等方面的优势,满足了 HFT 对低延迟、高吞吐量和高可靠性的苛刻要求。


五、思考与总结

经过一系列测试和分析,我对 DOCA 开发环境下 DPU 的性能有了更清晰的了解。在 DPA All-to-All 应用测试中,DPU 在处理多核并发数据交换时表现得非常高效,延迟低、吞吐量达标。在基础计算测试中,DPU 的表现也相当稳健。从 CPU 的矩阵乘法到内存带宽、多线程和多进程性能评估,它都能应对自如。结合金融高频交易的应用场景,DOCA 展现出了其在低延迟、高吞吐量和高可靠性方面的卓越优势,进一步证明了其在高性能计算和实时数据处理中的广泛应用潜力。DPU 在并行计算和数据处理上的优势,使其在日常计算和系统任务中具备广泛的应用前景。未来,随着更多应用场景的开发和优化,DOCA 有望在更多领域发挥关键作用,推动数据中心和高性能计算的发展。

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.rhkb.cn/news/7439.html

如若内容造成侵权/违法违规/事实不符,请联系长河编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

websocket实现

由于安卓资源管理器展示的路径不尽相同,各种软件保存文件的位置也不一定一样.对于普通用户上传文件时,查找文件可能是一个麻烦的事情.后来想到了一个办法,使用pc端进行辅助上传. 文章目录 实现思路1.0 实现定义web与客户端通信数据类型和数据格式web端websocket实现web端对客户…

【科研建模】Pycaret自动机器学习框架使用流程及多分类项目实战案例详解

Pycaret自动机器学习框架使用流程及项目实战案例详解 1 Pycaret介绍2 安装及版本需求3 Pycaret自动机器学习框架使用流程3.1 Setup3.2 Compare Models3.3 Analyze Model3.4 Prediction3.5 Save Model4 多分类项目实战案例详解4.1 ✅ Setup4.2 ✅ Compare Models4.3 ✅ Experime…

CY T 4 BB 5 CEB Q 1 A EE GS MCAL配置 - MCU组件

1、ResourceM 配置 选择芯片信号: 2、MCU 配置 2.1 General配置 1) McuDevErrorDetect: - 启用或禁用MCU驱动程序模块的开发错误通知功能。 - 注意:采用DET错误检测机制作为安全机制(故障检测)时,不能禁用开发错误检测。2) McuGetRamStateApi - enable/disable th…

docker 安装 mysql 详解

在平常的开发工作中&#xff0c;我们经常需要用到 mysql 数据库。那么在docker容器中&#xff0c;应该怎么安装mysql数据库呢。简单来说&#xff0c;第一步&#xff1a;拉取镜像&#xff1b;第二步&#xff1a;创建挂载目录并设置 my.conf&#xff1b;第三步&#xff1a;启动容…

【2025年数学建模美赛E题】(农业生态系统)完整解析+模型代码+论文

生态共生与数值模拟&#xff1a;生态系统模型的物种种群动态研究 摘要1Introduction1.1Problem Background1.2Restatement of the Problem1.3Our Work 2 Assumptions and Justifications3 Notations4 模型的建立与求解4.1 农业生态系统模型的建立与求解4.1.1 模型建立4.1.2求解…

编码器和扩散模型

目录 摘要abstract1.自动编码器2.变分编码器&#xff08;VAE&#xff09;3.论文阅读3.1 介绍3.2 方法3.3 结论 4.总结参考文献 摘要 本周学习了自动编码器&#xff08;AE&#xff09;和变分自动编码器&#xff08;VAE&#xff09;的基本原理与实现&#xff0c;分析其在数据降维…

【C++】类与对象初级应用篇:打造自定义日期类与日期计算器(2w5k字长文附源码)

文章目录 一、日期类的实现1. 日期类的默认成员函数的分析与实现构造函数其它默认成员函数 2. 各种逻辑比较运算符重载3. 日期加与减天数日期加天数系列日期减天数系列日期加减天数的最后修定和- -系列 4. 日期减日期方法一方法二 5. 流插入与流提取重载流插入重载流提取重载(含…

Redis实战(黑马点评)——关于缓存(缓存更新策略、缓存穿透、缓存雪崩、缓存击穿、Redis工具)

redis实现查询缓存的业务逻辑 service层实现 Overridepublic Result queryById(Long id) {String key CACHE_SHOP_KEY id;// 现查询redis内有没有数据String shopJson (String) redisTemplate.opsForValue().get(key);if(StrUtil.isNotBlank(shopJson)){ // 如果redis的数…

ThinkPhp伪静态设置后,访问静态资源也提示找不到Controller

ThinkPhp没有配置伪静态时&#xff0c;除了默认的IndexController能访问&#xff0c;其他路由Controller都访问不到&#xff0c;提示404错误。配置了伪静态后就解决了这个问题。 但是当我的ThinkPhp后台项目中有静态资源放在public目录&#xff08;或子目录&#xff09;中需要…

2013年蓝桥杯第四届CC++大学B组真题及代码

目录 1A&#xff1a;高斯日记&#xff08;日期计算&#xff09; 2B&#xff1a;马虎的算式&#xff08;暴力模拟&#xff09; 3C&#xff1a;第39级台阶&#xff08;dfs或dp&#xff09; 4D&#xff1a;黄金连分数&#xff08;递推大数运算&#xff09; 5E&#xff1a;前缀…

【数据分享】1929-2024年全球站点的逐月平均能见度(Shp\Excel\免费获取)

气象数据是在各项研究中都经常使用的数据&#xff0c;气象指标包括气温、风速、降水、湿度等指标&#xff01;说到气象数据&#xff0c;最详细的气象数据是具体到气象监测站点的数据&#xff01; 有关气象指标的监测站点数据&#xff0c;之前我们分享过1929-2024年全球气象站点…

【动态规划】--- 斐波那契数模型

Welcome to 9ilks Code World (๑•́ ₃ •̀๑) 个人主页: 9ilk (๑•́ ₃ •̀๑) 文章专栏&#xff1a; 算法Journey &#x1f3e0; 第N个泰波那契数模型 &#x1f4cc; 题目解析 第N个泰波那契数 题目要求的是泰波那契数&#xff0c;并非斐波那契数。 &…

单片机-STM32 WIFI模块--ESP8266 (十二)

1.WIFI模块--ESP8266 名字由来&#xff1a; Wi-Fi这个术语被人们普遍误以为是指无线保真&#xff08;Wireless Fidelity&#xff09;&#xff0c;并且即便是Wi-Fi联盟本身也经常在新闻稿和文件中使用“Wireless Fidelity”这个词&#xff0c;Wi-Fi还出现在ITAA的一个论文中。…

计算机的错误计算(二百二十二)

摘要 利用大模型化简计算 实验表明&#xff0c;虽然结果正确&#xff0c;但是&#xff0c;大模型既绕了弯路&#xff0c;又有数值计算错误。 与前面相同&#xff0c;再利用同一个算式看看另外一个大模型的化简与计算能力。 例1. 化简计算摘要中算式。 下面是与一个大模型的…

ansible自动化运维实战--软件包管理模块、服务模块、文件模块和收集模块setup(4)

文章目录 一、软件包管理模块1.1、功能1.2、常用参数1.3、示例 二、服务模块2.1、功能2.2、服务模块常用参数2.3、示例 三、文件与目录模块3.1、file功能3.2、常用参数3.3、示例 四、收集模块-setup4.1、setup功能4.2、示例 一、软件包管理模块 1.1、功能 Ansible 提供了多种…

高速光模块中的并行光学和WDM波分光学技术

随着AI大模型训练和推理对计算能力的需求呈指数级增长&#xff0c;AI数据中心的网络带宽需求大幅提升&#xff0c;推动了高速光模块的发展。光模块作为数据中心和高性能计算系统中的关键器件&#xff0c;主要用于提供高速和大容量的数据传输服务。 光模块提升带宽的方法有两种…

Linux命令行配置网络代理

在Linux命令行中&#xff0c;你可以使用以下方法设置网络代理服务器。 本文演示代理地址为&#xff1a;http://192.168.1.30:7890 请根据实际代理地址进行替换 临时代理 使用环境变量的方法&#xff1a; 打开终端&#xff0c;并输入以下命令&#xff1a; export http_proxyhtt…

SpringBoot3+Vue3开发学生选课管理系统

功能介绍 分三个角色登录&#xff1a;学生登录&#xff0c;老师登录&#xff0c;教务管理员登录&#xff0c;不同用户功能不同&#xff01; 1.学生用户功能 选课记录&#xff0c;查看选课记录&#xff0c;退选。选课管理&#xff0c;进行选课。通知管理&#xff0c;查看通知消…

牛客周赛 Round 78 A-C

A.时间表查询&#xff01; 链接&#xff1a;https://ac.nowcoder.com/acm/contest/100671/A 来源&#xff1a;牛客网 题目描述 今天是2025年1月25日&#xff0c;今年的六场牛客寒假算法基础集训营中&#xff0c;前两场比赛已经依次于 20250121、20250123 举行&#xff1b;而…

Android - 通过Logcat Manager简单获取Android手机的Log

由于工作需要&#xff0c;经常需要获取Android手机的Log。 平常都是通过adb命令来获取&#xff0c;每次都要写命令。 偶然的一个机会&#xff0c;我从外网发现了一个工具 Logcat Manager&#xff0c;只需要通过简单的双击即可获取Android的Log&#xff0c;这里也分享一下。 目…