深入探讨C++多线程性能优化

深入探讨C++多线程性能优化

在现代软件开发中,多线程编程已成为提升应用程序性能和响应速度的关键技术之一。尤其在C++领域,多线程编程不仅能充分利用多核处理器的优势,还能显著提高计算密集型任务的效率。然而,多线程编程也带来了诸多挑战,特别是在性能优化方面。本文将深入探讨影响C++多线程性能的一些关键因素,比较锁机制与原子操作的性能。通过这些内容,希望能为开发者提供有价值的见解和实用的优化策略,助力于更高效的多线程编程实践。

先在开头给一个例子,你认为下面这段benchmark代码结果会是怎样的。这里的逻辑很简单,将0-20000按线程切成n片,每个线程在一个Set里查找这个数字存不存在,存在则计数+1。

#include <benchmark/benchmark.h>
#include <iostream>
#include <memory>
#include <mutex>
#include <unordered_set>constexpr int kSetSize = 10000;class MyBenchmark : public benchmark::Fixture {public:void SetUp(const ::benchmark::State& state) override {std::call_once(flag, [this]() {s = std::make_shared<std::unordered_set<int>>();for (int i = 0; i < kSetSize; i++) {s->insert(i);}});}std::shared_ptr<std::unordered_set<int>> GetSet() { return s; }private:std::shared_ptr<std::unordered_set<int>> s;std::once_flag flag;
};BENCHMARK_DEFINE_F(MyBenchmark, MultiThreadedWork)(benchmark::State& state) {for (auto _ : state) {int size_sum = kSetSize * 2;int size_per_thread = (size_sum + state.threads() - 1) / state.threads();int sum = 0;int start = state.thread_index() * size_per_thread;int end = std::min((state.thread_index() + 1) * size_per_thread, size_sum);for (int i = start; i < end; i++) {auto inst = GetSet();if (inst->count(i) > 0) {sum++;}}}
}// 注册基准测试,并指定线程数
BENCHMARK_REGISTER_F(MyBenchmark, MultiThreadedWork)->Threads(1);
BENCHMARK_REGISTER_F(MyBenchmark, MultiThreadedWork)->Threads(2);
BENCHMARK_REGISTER_F(MyBenchmark, MultiThreadedWork)->Threads(4);
BENCHMARK_REGISTER_F(MyBenchmark, MultiThreadedWork)->Threads(8);BENCHMARK_MAIN();

跑出来的结果如下:
在这里插入图片描述

将任务切分成多个片段并交由多线程执行后,整体性能不仅没有提升,反而下降了,且性能与线程数成反比。那么,问题来了:导致这种结果的原因是什么?如何才能实现合理的并行执行,从而降低CPU的执行时间?在接下来的部分,笔者将为你揭示答案。

影响多线程性能的因素

笔者认为,影响多线程性能的主要因素有以下两个:

 1.Lock Contention。
 2.Cache Coherency。

Lock Contention对应使用锁来处理多线程同步问题的场景,而Cache Coherency则对应使用原子操作来处理多线程同步问题的场景。

Lock Contention

在多线程环境中,多个线程同时尝试获取同一个锁(Lock)时,会发生竞争现象,这就是所谓的锁竞争(Lock Contention)。锁竞争会导致线程或进程被阻塞,等待锁被释放,从而影响系统的性能和响应时间。大多数情况下,开发人员会选择使用锁来解决线程间的同步问题,因此锁竞争问题也变得广为人知且容易理解。由于锁的存在,位于临界区的代码在同一时刻只能由一个线程执行。因此,优化的思路就是尽量避免多个线程同时访问同一资源。常见的优化方向有两种:

 1.减少临界区大小:临界区越小,这段代码的执行时间就越短,从而在整体程序运行时间中所占的比例也越小,冲突也就越少。
 2.对共享资源进行分桶操作:每个线程只会在某个桶上访问资源,理想情况下,每个线程都会访问不同的桶,这样就不会有冲突。

减少临界区大小需要开发者对自己的代码进行仔细思考,将不必要的操作放在临界区外,例如一些初始化和内存分配操作。

对共享资源进行分桶操作在工程实践中也非常常见。例如,LevelDB的LRUCache中,每个Key只会固定在一个桶上。如果hash函数足够优秀且数据分布足够随机,这种方法可以大大提高LRUCache的性能。

Cache Coherency

缓存一致性(Cache Coherency)是指在多处理器系统中,确保各个处理器的缓存中的数据保持一致的机制。由于现代计算机系统通常包含多个处理器,每个处理器都有自己的缓存(如L1、L2、L3缓存),因此在并发访问共享内存时,可能会出现缓存数据不一致的问题。缓存一致性协议旨在解决这些问题,确保所有处理器在访问共享内存时看到的是一致的数据。

当我们对一个共享变量进行写入操作时,实际上需要通过缓存一致性协议将该变量的更新同步到其他线程的缓存中,否则可能会读到不一致的值。实际上,这个同步过程的单位是一个缓存行(Cache Line),而且同步过程相对较慢,因为涉及到跨核通信。

由此引申出两个严重影响性能的现象:

 1.Cache Ping-Pong。
 2.False Sharing。

Cache Ping-Pong

缓存乒乓效应(Cache Ping-Pong)是指在多处理器系统中,多个处理器频繁地对同一个缓存行(Cache Line)进行读写操作,导致该缓存行在不同处理器的缓存之间频繁地来回传递。这种现象会导致系统性能下降,因为缓存行的频繁传递会引起大量的缓存一致性流量和处理器间通信开销。

讲到这里,其实就可以解释为什么开头那段代码会随着线程数量的增加而性能反而下降。代码中的变量 s 是一个共享资源,但它使用了 shared_ptr。在复制 shared_ptr 时,会引起引用计数的增加(计数+1),多个线程频繁对同一个缓存行进行读写操作,从而引发缓存乒乓效应,导致性能下降。最简单的修改方式就是去掉 shared_ptr,代码如下,同时还可以得到我们预期的结果,即CPU时间随着线程数的增加而降低:

#include <benchmark/benchmark.h>
#include <iostream>
#include <memory>
#include <mutex>
#include <unordered_set>constexpr int kSetSize = 10000;class MyBenchmark : public benchmark::Fixture {public:void SetUp(const ::benchmark::State& state) override {std::call_once(flag, [this]() {for (int i = 0; i < kSetSize; i++) {s.insert(i);}});}const std::unordered_set<int>& GetSet() { return s; }private:std::unordered_set<int> s;std::once_flag flag;
};BENCHMARK_DEFINE_F(MyBenchmark, MultiThreadedWork)(benchmark::State& state) {for (auto _ : state) {int size_sum = kSetSize * 2;int size_per_thread = (size_sum + state.threads() - 1) / state.threads();int sum = 0;int start = state.thread_index() * size_per_thread;int end = std::min((state.thread_index() + 1) * size_per_thread, size_sum);for (int i = start; i < end; i++) {const auto& inst = GetSet();if (inst.count(i) > 0) {benchmark::DoNotOptimize(sum++);}}}
}// 注册基准测试,并指定线程数
BENCHMARK_REGISTER_F(MyBenchmark, MultiThreadedWork)->Threads(1);
BENCHMARK_REGISTER_F(MyBenchmark, MultiThreadedWork)->Threads(2);
BENCHMARK_REGISTER_F(MyBenchmark, MultiThreadedWork)->Threads(4);
BENCHMARK_REGISTER_F(MyBenchmark, MultiThreadedWork)->Threads(8);BENCHMARK_MAIN();

在这里插入图片描述

False Sharing

伪共享(False Sharing)实际上是一种特殊的缓存乒乓效应(Cache Ping-Pong)。它指的是在多处理器系统中,多个处理器访问不同的数据,但这些数据恰好位于同一个缓存行中,导致该缓存行在不同处理器的缓存之间频繁传递。尽管处理器访问的是不同的数据,但由于它们共享同一个缓存行,仍然会引发缓存一致性流量,导致性能下降。

为了更好地理解这一现象,我们可以对上面的代码进行一些修改。假设我们使用一个 vector<atomic> 来记录不同线程的 sum 值,这样虽然不同线程修改的是不同的sum,但是还是在一个缓存行上。使用 atomic 是为了强制触发缓存一致性协议,否则操作系统可能会进行优化,不会立即将修改反映到主存。

#include <benchmark/benchmark.h>
#include <atomic>
#include <iostream>
#include <memory>
#include <mutex>
#include <unordered_set>constexpr int kSetSize = 10000;class MyBenchmark : public benchmark::Fixture {public:void SetUp(const ::benchmark::State& state) override {std::call_once(flag, [this, &state]() {for (int i = 0; i < kSetSize; i++) {s.insert(i);}sums = std::vector<std::atomic<int>>(state.threads());});}const std::unordered_set<int>& GetSet() { return s; }std::vector<std::atomic<int>>& GetSums() { return sums; }private:std::unordered_set<int> s;std::once_flag flag;std::vector<std::atomic<int>> sums;
};BENCHMARK_DEFINE_F(MyBenchmark, MultiThreadedWork)(benchmark::State& state) {for (auto _ : state) {int size_sum = kSetSize;int size_per_thread = (size_sum + state.threads() - 1) / state.threads();int sum = 0;int start = state.thread_index() * size_per_thread;int end = std::min((state.thread_index() + 1) * size_per_thread, size_sum);for (int i = start; i < end; i++) {const auto& inst = GetSet();if (inst.count(i) > 0) {benchmark::DoNotOptimize(GetSums()[state.thread_index()]++);}}}
}// 注册基准测试,并指定线程数
BENCHMARK_REGISTER_F(MyBenchmark, MultiThreadedWork)->Threads(1);
BENCHMARK_REGISTER_F(MyBenchmark, MultiThreadedWork)->Threads(2);
BENCHMARK_REGISTER_F(MyBenchmark, MultiThreadedWork)->Threads(4);
BENCHMARK_REGISTER_F(MyBenchmark, MultiThreadedWork)->Threads(8);BENCHMARK_MAIN();

在这里插入图片描述

可以看到,尽管不同线程没有使用同一个变量,但由于 sums 里面的元素共享同一个缓存行(Cache Line),同样会导致性能急剧下降。

针对这种情况,只要我们将 sums 中的元素隔离,使它们不在同一个缓存行上,就不会引发这个问题。一般来说,缓存行的大小为64字节,我们可以使用一个类填充到64个字节来实现隔离。优化后的代码如下:

#include <benchmark/benchmark.h>
#include <atomic>
#include <iostream>
#include <memory>
#include <mutex>
#include <unordered_set>constexpr int kSetSize = 10000;struct alignas(64) PaddedCounter {std::atomic<int> value{0};char padding[64 - sizeof(std::atomic<int>)];  // 填充到缓存行大小
};class MyBenchmark : public benchmark::Fixture {public:void SetUp(const ::benchmark::State& state) override {std::call_once(flag, [this, &state]() {for (int i = 0; i < kSetSize; i++) {s.insert(i);}sums = std::vector<PaddedCounter>(state.threads());});}const std::unordered_set<int>& GetSet() { return s; }std::vector<PaddedCounter>& GetSums() { return sums; }private:std::unordered_set<int> s;std::once_flag flag;std::vector<PaddedCounter> sums;
};BENCHMARK_DEFINE_F(MyBenchmark, MultiThreadedWork)(benchmark::State& state) {for (auto _ : state) {int size_sum = kSetSize;int size_per_thread = (size_sum + state.threads() - 1) / state.threads();int sum = 0;int start = state.thread_index() * size_per_thread;int end = std::min((state.thread_index() + 1) * size_per_thread, size_sum);for (int i = start; i < end; i++) {const auto& inst = GetSet();if (inst.count(i) > 0) {benchmark::DoNotOptimize(GetSums()[state.thread_index()].value++);}}}
}// 注册基准测试,并指定线程数
BENCHMARK_REGISTER_F(MyBenchmark, MultiThreadedWork)->Threads(1);
BENCHMARK_REGISTER_F(MyBenchmark, MultiThreadedWork)->Threads(2);
BENCHMARK_REGISTER_F(MyBenchmark, MultiThreadedWork)->Threads(4);
BENCHMARK_REGISTER_F(MyBenchmark, MultiThreadedWork)->Threads(8);BENCHMARK_MAIN();

在这里插入图片描述

Lock VS Atomic

Lock Atomic Benchmark

很多人都认为锁(lock)比原子操作(atomic)要更慢,那么实际上真的是这样吗?下面我们通过两个测试来进行对比。

公平起见,我们将使用一个基于 atomic 变量实现的自旋锁(SpinLock)与 std::mutex 进行性能对比。自旋锁的实现摘自 Folly 库。其原理是使用一个 atomic 变量来标记是否被占用,并使用 acquire-release 内存序来保证临界区的正确性。在冲突过大时,自旋锁会使用 sleep 让出 CPU。代码如下:

#pragma once#include <atomic>
#include <cstdint>class Sleeper {static const uint32_t kMaxActiveSpin = 4000;uint32_t spin_count_;public:constexpr Sleeper() noexcept : spin_count_(0) {}inline __attribute__((always_inline)) static void sleep() noexcept {struct timespec ts = {0, 500000};nanosleep(&ts, nullptr);}inline __attribute__((always_inline)) void wait() noexcept {if (spin_count_ < kMaxActiveSpin) {++spin_count_;
#ifdef __x86_64__asm volatile("pause" ::: "memory");
#elif defined(__aarch64__)asm volatile("yield" ::: "memory");
#else// Fallback for other architectures
#endif} else {sleep();}}
};class SpinLock {enum { FREE = 0, LOCKED = 1 };public:constexpr SpinLock() : lock_(FREE) {}inline __attribute__((always_inline)) bool try_lock() noexcept { return cas(FREE, LOCKED); }inline __attribute__((always_inline)) void lock() noexcept {Sleeper sleeper;while (!try_lock()) {do {sleeper.wait();} while (AtomicCast(&lock_)->load(std::memory_order_relaxed) == LOCKED);}}inline __attribute__((always_inline)) void unlock() noexcept {AtomicCast(&lock_)->store(FREE, std::memory_order_release);}private:inline __attribute__((always_inline)) bool cas(uint8_t compare, uint8_t new_val) noexcept {return AtomicCast(&lock_)->compare_exchange_strong(compare, new_val, std::memory_order_acquire,std::memory_order_relaxed);}inline __attribute__((always_inline)) static std::atomic<uint8_t>* AtomicCast(uint8_t* value) {return reinterpret_cast<std::atomic<uint8_t>*>(value);}private:uint8_t lock_;
};

在第一个benchmark中,我们测试了无竞争情况下的性能。也就是说,原子变量的CAS操作只会执行一次,不会进入 sleep 状态。在这种情况下,自旋锁(SpinLock)等价于一次原子 set 操作。
代码如下:

#include <benchmark/benchmark.h>
#include <atomic>
#include <iostream>
#include <memory>
#include <mutex>
#include <unordered_set>
#include "spin_lock.h"constexpr int kSetSize = 10000;// struct alignas(64) PaddedCounter {
//   std::atomic<int> value{0};
//   char padding[64 - sizeof(std::atomic<int>)];  // 填充到缓存行大小
// };struct alignas(64) PaddedCounterLock {int value{0};char padding[64 - sizeof(std::atomic<int>)];  // 填充到缓存行大小
};class MyBenchmark : public benchmark::Fixture {public:void SetUp(const ::benchmark::State& state) override {std::call_once(flag, [this, &state]() {for (int i = 0; i < kSetSize; i++) {s.insert(i);}sums_atomic = std::vector<PaddedCounterLock>(state.threads());sum_lock = std::vector<PaddedCounterLock>(state.threads());});}const std::unordered_set<int>& GetSet() { return s; }std::vector<PaddedCounterLock>& GetSumsAtomic() { return sums_atomic; }std::vector<PaddedCounterLock>& GetSumLock() { return sum_lock; }private:std::unordered_set<int> s;std::once_flag flag;std::vector<PaddedCounterLock> sums_atomic;std::vector<PaddedCounterLock> sum_lock;
};BENCHMARK_DEFINE_F(MyBenchmark, MultiThreadedWorkAtomic)(benchmark::State& state) {SpinLock m;for (auto _ : state) {int size_sum = kSetSize;int size_per_thread = (size_sum + state.threads() - 1) / state.threads();int sum = 0;int start = state.thread_index() * size_per_thread;int end = std::min((state.thread_index() + 1) * size_per_thread, size_sum);for (int i = start; i < end; i++) {const auto& inst = GetSet();if (inst.count(i) > 0) {std::lock_guard<SpinLock> lg(m);benchmark::DoNotOptimize(GetSumsAtomic()[state.thread_index()].value++);}}}
}BENCHMARK_DEFINE_F(MyBenchmark, MultiThreadedWorkLock)(benchmark::State& state) {std::mutex m;for (auto _ : state) {int size_sum = kSetSize;int size_per_thread = (size_sum + state.threads() - 1) / state.threads();int sum = 0;int start = state.thread_index() * size_per_thread;int end = std::min((state.thread_index() + 1) * size_per_thread, size_sum);for (int i = start; i < end; i++) {const auto& inst = GetSet();if (inst.count(i) > 0) {std::lock_guard<std::mutex> lg(m);benchmark::DoNotOptimize(GetSumLock()[state.thread_index()].value++);}}}
}// 注册基准测试,并指定线程数
BENCHMARK_REGISTER_F(MyBenchmark, MultiThreadedWorkAtomic)->Threads(1);
BENCHMARK_REGISTER_F(MyBenchmark, MultiThreadedWorkAtomic)->Threads(2);
BENCHMARK_REGISTER_F(MyBenchmark, MultiThreadedWorkAtomic)->Threads(4);
BENCHMARK_REGISTER_F(MyBenchmark, MultiThreadedWorkAtomic)->Threads(8);BENCHMARK_REGISTER_F(MyBenchmark, MultiThreadedWorkLock)->Threads(1);
BENCHMARK_REGISTER_F(MyBenchmark, MultiThreadedWorkLock)->Threads(2);
BENCHMARK_REGISTER_F(MyBenchmark, MultiThreadedWorkLock)->Threads(4);
BENCHMARK_REGISTER_F(MyBenchmark, MultiThreadedWorkLock)->Threads(8);BENCHMARK_MAIN();

benchmark结果:
在这里插入图片描述

第二个benchmark是对比竞争激烈时的性能,代码如下:

#include <benchmark/benchmark.h>
#include <atomic>
#include <iostream>
#include <memory>
#include <mutex>
#include <unordered_set>
#include "spin_lock.h"constexpr int kSetSize = 10000;class MyBenchmark : public benchmark::Fixture {public:void SetUp(const ::benchmark::State& state) override {std::call_once(flag, [this, &state]() {for (int i = 0; i < kSetSize; i++) {s.insert(i);}count_ = 0;});}const std::unordered_set<int>& GetSet() { return s; }void SpinLockAndAdd() {std::lock_guard<SpinLock> lg(m1_);count_++;}void MutexLockAndAdd() {std::lock_guard<std::mutex> lg(m2_);count_++;}private:std::unordered_set<int> s;std::once_flag flag;uint32_t count_;SpinLock m1_;std::mutex m2_;
};BENCHMARK_DEFINE_F(MyBenchmark, MultiThreadedWorkAtomic)(benchmark::State& state) {for (auto _ : state) {int size_sum = kSetSize;int size_per_thread = (size_sum + state.threads() - 1) / state.threads();int sum = 0;int start = state.thread_index() * size_per_thread;int end = std::min((state.thread_index() + 1) * size_per_thread, size_sum);for (int i = start; i < end; i++) {const auto& inst = GetSet();if (inst.count(i) > 0) {SpinLockAndAdd();}}}
}BENCHMARK_DEFINE_F(MyBenchmark, MultiThreadedWorkLock)(benchmark::State& state) {for (auto _ : state) {int size_sum = kSetSize;int size_per_thread = (size_sum + state.threads() - 1) / state.threads();int sum = 0;int start = state.thread_index() * size_per_thread;int end = std::min((state.thread_index() + 1) * size_per_thread, size_sum);for (int i = start; i < end; i++) {const auto& inst = GetSet();if (inst.count(i) > 0) {MutexLockAndAdd();}}}
}// 注册基准测试,并指定线程数
BENCHMARK_REGISTER_F(MyBenchmark, MultiThreadedWorkAtomic)->Threads(1);
BENCHMARK_REGISTER_F(MyBenchmark, MultiThreadedWorkAtomic)->Threads(2);
BENCHMARK_REGISTER_F(MyBenchmark, MultiThreadedWorkAtomic)->Threads(4);
BENCHMARK_REGISTER_F(MyBenchmark, MultiThreadedWorkAtomic)->Threads(8);BENCHMARK_REGISTER_F(MyBenchmark, MultiThreadedWorkLock)->Threads(1);
BENCHMARK_REGISTER_F(MyBenchmark, MultiThreadedWorkLock)->Threads(2);
BENCHMARK_REGISTER_F(MyBenchmark, MultiThreadedWorkLock)->Threads(4);
BENCHMARK_REGISTER_F(MyBenchmark, MultiThreadedWorkLock)->Threads(8);BENCHMARK_MAIN();

benchmark结果:
在这里插入图片描述

可以看到,无论是哪一种情况,std::mutex 的性能都更优。当然,这个测试结果可能会因不同的操作系统而有所不同,但至少可以得出一个结论:这两者的性能是一个量级的,并不存在 atomic 一定比 std::mutex 更快的说法。这其实是因为现代 C++ 中的 std::mutex 实现已经高度优化,其实现与上面的自旋锁(SpinLock)非常相似,在低竞争的情况下并不会陷入内核态。

那么,按上面的说法,是不是我们根本不需要 atomic 变量呢?先来分析一下 atomic 的优点。

atomic 的优点有:

 1.可以实现内存占用极小的锁。
 2.当临界区操作可以等价于一个原子操作时,性能会更高。

对于第二个结论,我们可以做个测试。同样,拿前面的例子稍作修改。

case 1如下:

#include <benchmark/benchmark.h>
#include <atomic>
#include <iostream>
#include <memory>
#include <mutex>
#include <unordered_set>
#include "spin_lock.h"constexpr int kSetSize = 10000;class MyBenchmark : public benchmark::Fixture {public:void SetUp(const ::benchmark::State& state) override {std::call_once(flag, [this, &state]() {for (int i = 0; i < kSetSize; i++) {s.insert(i);}});}const std::unordered_set<int>& GetSet() { return s; }private:std::unordered_set<int> s;std::once_flag flag;
};BENCHMARK_DEFINE_F(MyBenchmark, MultiThreadedWorkAtomic)(benchmark::State& state) {std::atomic<uint32_t> sum = 0;for (auto _ : state) {int size_sum = kSetSize;int size_per_thread = (size_sum + state.threads() - 1) / state.threads();int sum = 0;int start = state.thread_index() * size_per_thread;int end = std::min((state.thread_index() + 1) * size_per_thread, size_sum);for (int i = start; i < end; i++) {const auto& inst = GetSet();if (inst.count(i) > 0) {benchmark::DoNotOptimize(sum++);}}}
}BENCHMARK_DEFINE_F(MyBenchmark, MultiThreadedWorkLock)(benchmark::State& state) {std::mutex m;uint32_t sum = 0;for (auto _ : state) {int size_sum = kSetSize;int size_per_thread = (size_sum + state.threads() - 1) / state.threads();int sum = 0;int start = state.thread_index() * size_per_thread;int end = std::min((state.thread_index() + 1) * size_per_thread, size_sum);for (int i = start; i < end; i++) {const auto& inst = GetSet();if (inst.count(i) > 0) {std::lock_guard<std::mutex> lg(m);sum++;}}}
}// 注册基准测试,并指定线程数
BENCHMARK_REGISTER_F(MyBenchmark, MultiThreadedWorkAtomic)->Threads(1);
BENCHMARK_REGISTER_F(MyBenchmark, MultiThreadedWorkAtomic)->Threads(2);
BENCHMARK_REGISTER_F(MyBenchmark, MultiThreadedWorkAtomic)->Threads(4);
BENCHMARK_REGISTER_F(MyBenchmark, MultiThreadedWorkAtomic)->Threads(8);BENCHMARK_REGISTER_F(MyBenchmark, MultiThreadedWorkLock)->Threads(1);
BENCHMARK_REGISTER_F(MyBenchmark, MultiThreadedWorkLock)->Threads(2);
BENCHMARK_REGISTER_F(MyBenchmark, MultiThreadedWorkLock)->Threads(4);
BENCHMARK_REGISTER_F(MyBenchmark, MultiThreadedWorkLock)->Threads(8);BENCHMARK_MAIN();

benchmark结果:
在这里插入图片描述

case 2如下:

#include <benchmark/benchmark.h>
#include <atomic>
#include <iostream>
#include <memory>
#include <mutex>
#include <unordered_set>
#include "spin_lock.h"constexpr int kSetSize = 10000;class MyBenchmark : public benchmark::Fixture {public:void SetUp(const ::benchmark::State& state) override {std::call_once(flag, [this, &state]() {for (int i = 0; i < kSetSize; i++) {s.insert(i);}count_ = 0;atomic_count_ = 0;});}const std::unordered_set<int>& GetSet() { return s; }void AtomicAdd() { atomic_count_++; }void MutexLockAndAdd() {std::lock_guard<std::mutex> lg(m);count_++;}private:std::unordered_set<int> s;std::once_flag flag;uint32_t count_;std::atomic<uint32_t> atomic_count_;std::mutex m;
};BENCHMARK_DEFINE_F(MyBenchmark, MultiThreadedWorkAtomic)(benchmark::State& state) {for (auto _ : state) {int size_sum = kSetSize;int size_per_thread = (size_sum + state.threads() - 1) / state.threads();int sum = 0;int start = state.thread_index() * size_per_thread;int end = std::min((state.thread_index() + 1) * size_per_thread, size_sum);for (int i = start; i < end; i++) {const auto& inst = GetSet();if (inst.count(i) > 0) {AtomicAdd();}}}
}BENCHMARK_DEFINE_F(MyBenchmark, MultiThreadedWorkLock)(benchmark::State& state) {for (auto _ : state) {int size_sum = kSetSize;int size_per_thread = (size_sum + state.threads() - 1) / state.threads();int sum = 0;int start = state.thread_index() * size_per_thread;int end = std::min((state.thread_index() + 1) * size_per_thread, size_sum);for (int i = start; i < end; i++) {const auto& inst = GetSet();if (inst.count(i) > 0) {MutexLockAndAdd();}}}
}// 注册基准测试,并指定线程数
BENCHMARK_REGISTER_F(MyBenchmark, MultiThreadedWorkAtomic)->Threads(1);
BENCHMARK_REGISTER_F(MyBenchmark, MultiThreadedWorkAtomic)->Threads(2);
BENCHMARK_REGISTER_F(MyBenchmark, MultiThreadedWorkAtomic)->Threads(4);
BENCHMARK_REGISTER_F(MyBenchmark, MultiThreadedWorkAtomic)->Threads(8);BENCHMARK_REGISTER_F(MyBenchmark, MultiThreadedWorkLock)->Threads(1);
BENCHMARK_REGISTER_F(MyBenchmark, MultiThreadedWorkLock)->Threads(2);
BENCHMARK_REGISTER_F(MyBenchmark, MultiThreadedWorkLock)->Threads(4);
BENCHMARK_REGISTER_F(MyBenchmark, MultiThreadedWorkLock)->Threads(8);BENCHMARK_MAIN();

benchmark结果:
在这里插入图片描述

接下来结合这两个优点来看,链式数据结构的场景非常适合使用 atomic 变量。

 1.内存占用少:即使每个节点都实现一个自旋锁(SpinLock),也不会浪费太多内存。

 2.链式数据结构的临界区通常可以优化成一个指针的 CAS 操作。

Epoch Based Reclamation

虽然如此,但要写一个高性能的并发安全的链式数据结构是非常困难的,这主要是因为写操作包含了删除操作。举个最简单的例子:

假设有一个链表 A->B->C,一个线程正在读B节点,另一个线程正在删除B节点,如何保证读线程在读B节点期间不会被另一个线程给删掉?

再举个更复杂的例子:

假设有一个链表 A->B->C。一个线程正在读取 B 节点,另一个线程正在修改 B 节点。显然,最简单的实现是锁住 B,同时只允许一个操作,但显然这样从各方面来看性能都不是最佳的,这是第一个方法。

第二个方法是类似于 Copy On Write(COW)。写操作时先重新构造一个节点 B1,再修改对应的数据,最后通过 CAS 操作修改指针连接 A->B1。

我们来分析一下为什么第二个方法远比第一个方法要好。

首先,上锁会触发原子写,意味着即便是你只是为了读数据,也会触发一次 Cache Line 一致性同步的问题。而且在找到 B 节点之前的每一个节点都要依次上锁来保证读取的正确性,这意味着极大概率会发生 Cache Ping-Pong 问题。

再来看写操作,写操作除了上锁以外还需要修改节点的数据。第二个方法需要先构造一个新的节点再修改,意味着这个节点在插入链表之前一定不在其他线程的 Cache 里(排除刚好有某个变量和这个新节点的内存在同一个 Cache Line 的情况)。而第一个方法修改的节点已经在链表里,这表示在之前一定有线程已经访问过这个节点,那么它很可能在 Cache 里面,从而触发一次 Cache Line 一致性同步的问题。

然而事情没有这么简单。试想一下,在修改完指针 A->B1 后,B 节点需要被丢弃释放,这时候其他线程有可能正在访问 B 节点而导致崩溃。

可以看出这些问题都是因为删除操作引起的,这个问题有几个著名的解决方案,比如 Epoch Based Reclamation 和 Hazard Pointer 等。这里只介绍其中的 Epoch Based Reclamation,感兴趣的话请自行搜索了解其他实现方式。

该算法的思路是删除操作会尝试触发版本 +1,但只有当所有线程都是最新版本 e 时才能成功,成功后会回收 e-1 版本的内存。因此,最多会累积 3 个版本未释放节点的内存。是个以空间换时间,轻读重写的方案。

首先,每个线程维护自己的线程变量:

 1.active:标记该线程是否正在读数据。

 2.epoch:标记该线程当前的版本。

全局维护变量:

 1.global_epoch:全局最新的版本。

 2.retire_list:等待释放的节点。

读操作:

 1.首先把线程 active 标记为 true,表示正在读数据。

 2.然后把 global_epoch 赋值给 epoch,记录当前正在读的版本。

 3.如果线程需要删除节点,则把节点放到全局的 retire_list 末尾。

 4.结束读后,将 active 标记为 false。

写操作:

 1.如果要删除节点,则把节点放到全局的 retire_list 末尾,并且尝试增加版本。

 2.增加版本时检查所有线程的状态,当所有线程满足 epoch 等于当前版本 e 或者 active 为 false 时,进行版本 e = e + 1 操作。

 3.清空 e-2 版本的 retire_list。

这里给出一个简单的实现,代码如下:

#pragma once#include <array>
#include <atomic>
#include <mutex>
#include <numeric>
#include <vector>constexpr uint8_t kEpochSize = 3;
constexpr uint8_t kCacheLineSize = 64;template <uint32_t kReadThreadNum>
class ThreadIDManager;template <uint32_t kReadThreadNum>
struct ThreadID {ThreadID() { tid = ThreadIDManager<kReadThreadNum>::GetInstance().AcquireThreadID(); }~ThreadID() { ThreadIDManager<kReadThreadNum>::GetInstance().ReleaseThreadID(tid); }uint32_t tid;
};template <uint32_t kReadThreadNum>
class ThreadIDManager {public:ThreadIDManager() : tid_list_(kReadThreadNum) { std::iota(tid_list_.begin(), tid_list_.end(), 1); }ThreadIDManager(const ThreadIDManager &) = delete;ThreadIDManager(ThreadIDManager &&) = delete;ThreadIDManager &operator=(const ThreadIDManager &) = delete;~ThreadIDManager() = default;static ThreadIDManager &GetInstance() {static ThreadIDManager inst;return inst;}uint32_t AcquireThreadID() {std::lock_guard<std::mutex> lock(tid_list_mutex_);auto tid = tid_list_.back();tid_list_.pop_back();return tid;}void ReleaseThreadID(const uint32_t tid) {std::lock_guard<std::mutex> lock(tid_list_mutex_);tid_list_.emplace_back(tid);}private:std::vector<uint32_t> tid_list_;std::mutex tid_list_mutex_;
};struct TLS {TLS() : active(false), epoch(0) {}TLS(TLS &) = delete;TLS(TLS &&) = delete;void operator=(const TLS &) = delete;~TLS() = default;std::atomic_flag active;std::atomic<uint8_t> epoch;
} __attribute__((aligned(kCacheLineSize)));template <class RCObject, class DestroyClass, uint32_t kReadThreadNum>
class EbrManager {public:EbrManager() : tls_list_(), global_epoch_(0), update_(false), write_cnt_(0) {for (int i = 0; i < kEpochSize; i++) {retire_list_[i].store(nullptr, std::memory_order_release);}}EbrManager(const EbrManager<RCObject, DestroyClass, kReadThreadNum> &) = delete;EbrManager(EbrManager<RCObject, DestroyClass, kReadThreadNum> &&) = delete;EbrManager &operator=(const EbrManager<RCObject, DestroyClass, kReadThreadNum> &) = delete;~EbrManager() { ClearAllRetireList(); }void ClearAllRetireList() {for (int i = 0; i < kEpochSize; i++) {ClearRetireList(i);}}inline void StartRead() {auto &tls = GetTLS();tls.active.test_and_set(std::memory_order_release);tls.epoch.store(global_epoch_.load(std::memory_order_acquire), std::memory_order_release);}inline void EndRead() { GetTLS().active.clear(std::memory_order_release); }inline void FreeObject(RCObject *object) {auto epoch = global_epoch_.load(std::memory_order_acquire);auto *node = new RetireNode;node->obj = object;do {node->next = retire_list_[epoch].load(std::memory_order_acquire);} while (!retire_list_[epoch].compare_exchange_weak(node->next, node, std::memory_order_acq_rel));auto write_cnt = write_cnt_.fetch_add(1, std::memory_order_relaxed);if (write_cnt > kReadThreadNum) {if (!update_.test_and_set(std::memory_order_acq_rel)) {TryGC();update_.clear(std::memory_order_release);}}}private:inline TLS &GetTLS() {thread_local ThreadID<kReadThreadNum> thread_id;return tls_list_[thread_id.tid];}inline void TryGC() {auto epoch = global_epoch_.load(std::memory_order_acquire);// TODO 优化记录上一次搜索到的位置for (int i = 0; i < tls_list_.size(); i++) {if (tls_list_[i].active.test(std::memory_order::memory_order_acquire) &&tls_list_[i].epoch.load(std::memory_order::memory_order_acquire) != epoch) {return;}}global_epoch_.store((epoch + 1) % kEpochSize, std::memory_order_release);ClearRetireList((epoch + 2) % kEpochSize);write_cnt_.store(0, std::memory_order_relaxed);}inline void ClearRetireList(int index) {auto *retire_node = retire_list_[index].load(std::memory_order_acquire);while (retire_node != nullptr) {DestroyClass destroy(retire_node->obj);auto *old_node = retire_node;retire_node = retire_node->next;delete old_node;}retire_list_[index].store(nullptr, std::memory_order_release);}struct RetireNode {RCObject *obj;RetireNode *next;};std::array<char, kCacheLineSize> start_padding_;std::array<TLS, kReadThreadNum> tls_list_;std::atomic<uint8_t> global_epoch_;std::array<char, kCacheLineSize> mid_padding_;std::atomic_flag update_;std::atomic<uint32_t> write_cnt_;std::atomic<RetireNode *> retire_list_[kEpochSize];std::array<char, kCacheLineSize> end_padding_;
};

这里再给出一个benchmark,对比一下使用 Epoch Based Reclamation(EBR)和不使用 EBR 的区别。由于笔者时间有限,只能写一个非常简单的版本,仅供参考。

#include <benchmark/benchmark.h>
#include <mutex>
#include "ebr.h"
#include "spin_lock.h"struct Node {Node() : lock(), next(nullptr) {}int key;int value;Node *next;SpinLock lock;
};class NodeFree {public:NodeFree(Node *node) { delete node; }
};/** 快速测试起见,简单写了个list版本的kv结构,里面只会有3个元素,然后只支持Get和Modify,Modify也必定会命中key。* 不是直接把key,value,next设置成atomic变量而是使用SpinLock的原因是模拟复杂情况,真实情况下会存在Add和Remove操作,实现没有如此简单。*/
class MyList {public:MyList() {Node *pre_node = nullptr;auto *&cur_node = root_;// 这里虽然插入了10个元素,但后面的实现会假设第一个key 9作为header是绝对不会被修改或者读到的。for (int i = 0; i < 10; i++) {cur_node = new Node;cur_node->key = i;cur_node->value = i;cur_node->next = pre_node;pre_node = cur_node;}}int Get(int key, int *value) {root_->lock.lock();auto *cur_node = root_->next;auto *pre_node = root_;while (cur_node != nullptr) {cur_node->lock.lock();pre_node->lock.unlock();if (key == cur_node->key) {*value = cur_node->value;cur_node->lock.unlock();return 0;}pre_node = cur_node;cur_node = cur_node->next;}pre_node->lock.unlock();return 1;}int Modify(int key, int value) {root_->lock.lock();auto *cur_node = root_->next;auto *pre_node = root_;while (cur_node != nullptr) {cur_node->lock.lock();pre_node->lock.unlock();if (key == cur_node->key) {cur_node->value = value;cur_node->lock.unlock();return 0;}pre_node = cur_node;cur_node = cur_node->next;}pre_node->lock.unlock();return 1;}int GetUseEbr(int key, int *value) {ebr_mgr_.StartRead();auto *cur_node = root_->next;while (cur_node != nullptr) {if (key == cur_node->key) {*value = cur_node->value;ebr_mgr_.EndRead();return 0;}cur_node = cur_node->next;}ebr_mgr_.EndRead();return 1;}int ModifyUseEbr(int key, int value) {root_->lock.lock();auto *cur_node = root_->next;auto *pre_node = root_;while (cur_node != nullptr) {cur_node->lock.lock();if (key == cur_node->key) {auto *new_node = new Node;new_node->key = cur_node->key;new_node->value = value;new_node->next = cur_node->next;pre_node->next = new_node;cur_node->lock.unlock();pre_node->lock.unlock();ebr_mgr_.FreeObject(cur_node);return 0;}auto *next_node = cur_node->next;pre_node->lock.unlock();pre_node = cur_node;cur_node = next_node;}pre_node->lock.unlock();return 1;}private:Node *root_;EbrManager<Node, NodeFree, 15> ebr_mgr_;
};class MyBenchmark : public benchmark::Fixture {public:void SetUp(const ::benchmark::State &state) override {}MyList &GetMyList() { return l; }private:MyList l;std::once_flag flag;
};constexpr int kKeySize = 10000;BENCHMARK_DEFINE_F(MyBenchmark, MultiThreadedWorkNoUseEbr)(benchmark::State &state) {for (auto _ : state) {auto &mylist = GetMyList();if (0 == state.thread_index()) {// modifyfor (int i = 0; i < kKeySize; i++) {mylist.Modify(i % 9, i);}} else {// getfor (int i = 0; i < kKeySize; i++) {int value;mylist.Get(i % 9, &value);}}}
}BENCHMARK_DEFINE_F(MyBenchmark, MultiThreadedWorkUseEbr)(benchmark::State &state) {for (auto _ : state) {auto &mylist = GetMyList();if (0 == state.thread_index()) {// modifyfor (int i = 0; i < kKeySize; i++) {mylist.ModifyUseEbr(i % 9, i);}} else {// getfor (int i = 0; i < kKeySize; i++) {int value;mylist.GetUseEbr(i % 9, &value);}}}
}// 注册基准测试,并指定线程数
BENCHMARK_REGISTER_F(MyBenchmark, MultiThreadedWorkNoUseEbr)->Threads(4);
BENCHMARK_REGISTER_F(MyBenchmark, MultiThreadedWorkNoUseEbr)->Threads(8);
BENCHMARK_REGISTER_F(MyBenchmark, MultiThreadedWorkNoUseEbr)->Threads(12);BENCHMARK_REGISTER_F(MyBenchmark, MultiThreadedWorkUseEbr)->Threads(4);
BENCHMARK_REGISTER_F(MyBenchmark, MultiThreadedWorkUseEbr)->Threads(8);
BENCHMARK_REGISTER_F(MyBenchmark, MultiThreadedWorkUseEbr)->Threads(12);BENCHMARK_MAIN();

benchmark结果:
在这里插入图片描述

结语

以上是笔者的一些个人见解。由于水平和时间有限,测试用例尽可能地简化,可能不够全面。如果内容中有任何错误,欢迎指出。

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.rhkb.cn/news/452581.html

如若内容造成侵权/违法违规/事实不符,请联系长河编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

62天框架安全(学习)

发现学了之后没有去复习&#xff0c;每天都要问自己学了什么&#xff0c;复习了吗&#xff0c;下次还能记住吗 一下内容来自【小迪安全2023】第62天:服务攻防-框架安全&CVE复现&Spring&Struts&Laravel&ThinkPHP_小迪安全文档2023-CSDN博客 一个网站的源码…

排序算法 —— 直接插入排序

目录 1.直接插入排序的思想 2.直接插入排序的实现 实现分析 实现代码 3.直接插入排序的分析 时间复杂度分析 空间复杂度分析 稳定性 1.直接插入排序的思想 直接插入排序的思想就是把待排序的元素按其关键码值的大小依次插入到一个已经排好序的有序序列中&#xff0c…

一种基于OCR图像识别技术的发票采集管理系统及方法

一种基于OCR图像识别技术的发票采集管理系统及方法 摘要 本发明涉及了一种基于OCR图像识别技术的发票采集管理系统及方法&#xff0c;该系统的发票信息采集单元采集发票图片信息数据&#xff0c;OCR图像识别单元基于OCR图像识别技术并结合人工智能深度学习算法对发票图片信息数…

vscode默认添加python项目的源目录路径到执行环境(解决ModuleNotFoundError: No module named问题)

0. 问题描述 vscode中编写python脚本&#xff0c;导入工程目录下的其他模块&#xff0c;出现ModuleNotFoundError: No module named 错误 在test2的ccc.py文件中执行print(sys.path) 查看路径 返回结果发现并无’/home/xxx/first_demo’的路径&#xff0c;所以test2下面的文…

Vue-router 路由守卫执行流程图

vue-router 路由守卫执行的流程图&#xff08;个人理解&#xff09; 图1 - 图2

【MR开发】在Pico设备上接入MRTK3(一)——在Unity工程中导入MRTK3依赖

写在前面的话 在Pico上接入MRTK3&#xff0c;目前已有大佬开源。 https://github.com/Phantomxm2021/PicoMRTK3 也有值得推荐的文章。 MRTK3在PICO4上的使用小结 但由于在MacOS上使用MRTK3&#xff0c;无法通过Mixed Reality Feature Tool工具管理MRTK3安装包。 故记录一下…

jmeter使用文档

文章目录 一、安装使用1、下载2、bin/jmeter.properties介绍 二、windows使用1、微调&#xff08;1&#xff09;界面样式&#xff08;2&#xff09;修改语言 2、简单使用3、各组件详解&#xff08;1&#xff09;CSV 数据文件配置&#xff08;2&#xff09;BeanShell取样器 三、…

Pair的基本概念

概述 当一个方法需返回两个值、并且两个值都有重要意义时&#xff0c;我们一般会用Map的key、value来表达。 但是如果仅返回两个值&#xff0c;就用管理一堆key/value键值对的HashMap等结构&#xff0c;有点大材小用&#xff0c;增加了数据结构的复杂度。所以便出现了pair这个…

RAG流程的实现与改进

一、 RAG流程图 数据入库&#xff1a;读取本地数据并切成小块&#xff0c;并把这些小块经过编码embedding后&#xff0c;存储在一个向量数据库中&#xff08;下图1——6步&#xff09;&#xff1b;相关性检索&#xff1a;用户提出问题&#xff0c;问题经过编码&#xff0c;再在…

探索Python中的多线程与多进程

在Python编程中&#xff0c;多线程和多进程是两个重要的概念&#xff0c;它们被用来提高程序的执行效率。本文将深入探讨这两个概念&#xff0c;并对比它们在Python中的实现方式。 一、多线程 多线程是一种并发执行的程序设计方法。在Python中&#xff0c;我们可以使用thread…

【C++_string类练习】仅仅反转字母

题目链接&#xff1a;仅仅反转字母 解题思路&#xff1a; 这种反转字符的题目我第一个想到的方法就是&#xff1a;双指针 一个指针在前start&#xff0c;一个指针在后back&#xff0c; 如果指针所指向的位置的值是字母&#xff0c;那么两个指针位置的值就进行交换&#xff0…

Leetcode 反转字符串中的单词

这个Java代码解决了“反转字符串中的单词顺序”的问题&#xff0c;具体思想如下&#xff1a; 1. 去除字符串首尾的空格 s.trim() 方法用于去除输入字符串 s 中的前导和尾随空格。这样做是为了防止在后续步骤中多余的空格对结果产生影响。 2. 按空格分割字符串 s.split(&quo…

Ingress-nginx中HTTPS的强制转发

文章目录 在使用aws 的NLB转发流量到ingress时&#xff0c;发现NLP上生成的转发配置不符合正常预期&#xff0c;如下图&#xff1a; ingress-nginx service 配置如下&#xff1a; apiVersion: v1 kind: Service metadata:annotations:service.beta.kubernetes.io/aws-load-b…

智能去毛刺:2D视觉引导机器人如何重塑制造业未来

机器人技术已经深入到各个工业领域中&#xff0c;为制造业带来了前所未有的变革。其中&#xff0c;2D视觉引导机器人技术以其精准、高效的特点&#xff0c;在去毛刺工艺中发挥着越来越重要的作用。本文将为您介绍2D视觉引导机器人技术的基本原理及其在去毛刺工艺中的应用&#…

Node.js学习笔记

回顾&#xff1a; javascript 可以在浏览器运行 &#xff08;js代码会JavaScript的解析引擎执行&#xff09;chrome 》V8 &#xff08;性能最好&#xff09;FireFox 》 奥丁猴safri 》JSCoreIE浏览器 》查克拉JavaScript可以在浏览器端操作DOM 和BOM每一个浏览器都内置了B…

php生成PDF文件(FPDF)

FPDF即“Free PDF”&#xff0c;FPDF类库提供了基本的PDF创建功能&#xff0c;其源代码和使用权是免费的。 PDF格式文档优势 通用&#xff1a;PDF文档在UNIX和Windows系统均可正常使用。 安全&#xff1a;PDF文档可设置为只读模式&#xff0c;并且可以添加密码等保护措施。 美…

JavaScript:闭包、防抖与节流

一&#xff0c;闭包 1&#xff0c;什么是闭包 闭包是指一个函数和其周围的词法环境(lexical environment)的组合。 换句话说&#xff0c;闭包允许一个函数访问并操作函数外部的变量。 闭包的核心特性: 函数内部可以访问外部函数的变量即使外部函数已经返回&#xff0c;内部…

ApacheShiro反序列化 550 721漏洞

Apache Shiro是一个强大且易用的Java安全框架,执行身份验证、授权、密码和会话管理个漏洞被称为 Shiro550 是因为在Apache Shiro的GitHub问题跟踪器中&#xff0c;该漏洞最初被标记为第550个问题,721漏洞名称也是由此而来 Shiro-550 CVE-2016-4437 Shiro反序列化Docker复现 …

Pytest参数详解 — 基于命令行模式!

1、--collect-only 查看在给定的配置下哪些测试用例会被执行 2、-k 使用表达式来指定希望运行的测试用例。如果测试名是唯一的或者多个测试名的前缀或者后缀相同&#xff0c;可以使用表达式来快速定位&#xff0c;例如&#xff1a; 命令行-k参数.png 3、-m 标记&#xff08;…