目录
一:Atomic:
二:Thread
1. 创建线程
2. 小心移动(std::move)线程
3. 如何创建带参数的线程
4. 线程参数是引用类型时,要小心谨慎。
5. 获取线程ID
6. jthread
7. 如何在线程中使用中断 stop_token
三:如何解决数据竞争
1.有问题的代码
2.使用互斥
3.预防死锁
4. 自动释放锁
5. 延迟锁
6. 共享锁
7. 线程安全的初始化
四:线程局部存储
五:线程通信
1.条件变量
2. 防止虚假唤醒
3. 防止唤醒丢失
4.信号量
5. std::latch
6. std::barrier
六:任务
1. std::promise, std::future
2. 用std::promise, std::future进行线程同步
3. std::async
4. std::packaged_task
一:Atomic:
#include <atomic>
#include <thread>
#include <iostream>

// Flags published by one thread and observed by the other.
std::atomic_int x, y;
// Values seen by each thread; main reads them only after both threads are joined.
int r1, r2;

// Sets x, then records the value of y visible at that moment.
void writeX() {
    x.store(1);
    r1 = y.load();
}

// Sets y, then records the value of x visible at that moment.
void writeY() {
    y.store(1);
    r2 = x.load();
}

// Runs the two writers concurrently 100 times and prints what each one observed.
int main() {
    for (int round = 0; round != 100; ++round) {
        x = 0;
        y = 0;

        std::thread a(writeX);
        std::thread b(writeY);
        a.join();
        b.join();

        std::cout << r1 << r2 << std::endl;
    }
    return 0;
}
//可能的输出有三种情况:01, 10, 11
//01:先执行线程a, 再执行线程b
//10:先执行线程b,再执行线程a
//11:执行线程a一半后调度到线程b,然后再回来
二:Thread
1. 创建线程
#include <atomic>
#include <thread>
#include <iostream>

// Free function used as a thread entry point.
void helloFunction() {
    std::cout << "function" << std::endl;
}

// Function object used as a thread entry point.
class HelloFunctionObject {
public:
    void operator()() const {
        std::cout << "function object" << std::endl;
    }
};

// Starts one thread per kind of callable and waits for all of them.
int main()
{
    std::thread t1(helloFunction);        // prints "function"

    HelloFunctionObject helloFunctionObject;
    std::thread t2(helloFunctionObject);  // prints "function object"

    std::thread t3([] { std::cout << "lambda function" << std::endl; });  // prints "lambda function"

    // join() is required: without it main could finish while a thread is
    // still running, and destroying a joinable std::thread aborts the program.
    t1.join();
    t2.join();
    t3.join();
    return 0;
}
2. 小心移动(std::move)线程
#include <atomic>
#include <thread>
#include <iostream>
#include <utility>

using namespace std;

// Shows the pitfall of move-assigning to a std::thread that still owns a
// running thread, and the safe way to reuse the thread object.
int main()
{
    std::thread t([] { cout << "lambda function"; });
    std::thread t2;
    t2 = std::move(t);  // fine: t2 was default-constructed, so it is not joinable

    std::thread t3([] { cout << "lambda function"; });

    // PITFALL: writing `t2 = std::move(t3);` right here calls std::terminate,
    // because t2 is still joinable (it owns the thread taken over from t).
    // Join (or detach) t2 first so that it no longer owns a thread.
    t2.join();
    t2 = std::move(t3);  // safe now: t2 is no longer joinable

    t2.join();  // t2 owns the second thread; join it before main returns
}
3. 如何创建带参数的线程
#include <atomic>
#include <functional>
#include <iostream>
#include <string>
#include <thread>

using namespace std;

// How to pass arguments to a thread.

// Receives its argument by value.
void printStringCopy(string s) { cout << s; }
// Receives its argument by const reference.
void printStringRef(const string& s) { cout << s; }

int main()
{
    string s{ "C++" };

    // Pass by copy: the lambda captures a copy, and std::thread copies `s`
    // into the new thread's storage.
    thread tPerCopy([=] { cout << s; });    // C++
    thread tPerCopy2(printStringCopy, s);   // C++
    tPerCopy.join();
    tPerCopy2.join();

    // Pass by reference: the lambda captures `s` by reference.
    thread tPerReference([&] { cout << s; });  // C++
    // std::thread always copies (decays) its arguments, so a plain `s` would
    // bind the const& parameter to an internal copy. std::cref is needed to
    // really hand the thread a reference to `s`.
    thread tPerReference2(printStringRef, std::cref(s));  // C++
    tPerReference.join();
    tPerReference2.join();
}
4. 线程参数是引用类型时,要小心谨慎。
#include <chrono>
#include <iostream>
#include <thread>

using namespace std;
using std::this_thread::sleep_for;
using std::this_thread::get_id;

// Functor that keeps a reference to a caller-owned int and adds `k` to it six
// times, sleeping 100 ms before each addition.
struct Sleeper {
    Sleeper(int& i_) : i{ i_ } {}

    void operator()(int k) {
        for (unsigned int j = 0; j <= 5; ++j) {
            sleep_for(std::chrono::milliseconds(100));
            i += k;
        }
        // Only safe while main is still alive; if main had already returned,
        // touching std::cout here would be undefined behaviour.
        std::cout << get_id();
    }

private:
    int& i;  // refers to a variable owned by the creating thread
};

int main()
{
    int valSleeper = 1000;

    // valSleeper is handed to the thread by reference (through Sleeper).
    // PITFALL: with t.detach() main could return first, leaving the thread
    // with a dangling reference (undefined behaviour), and reading valSleeper
    // while the thread still writes to it would be a data race.
    std::thread t(Sleeper(valSleeper), 5);

    // Joining keeps valSleeper alive for the whole lifetime of the thread and
    // orders all of its writes before the read below.
    t.join();

    std::cout << valSleeper;  // 1030
}
5. 获取线程ID
#include <iostream>
#include <thread>

using namespace std;
using std::this_thread::get_id;

// Prints the hardware concurrency hint and shows how thread IDs follow the
// underlying thread when two std::thread objects are swapped.
// The numbers in the comments are sample values; real IDs differ per run.
int main()
{
    std::cout << std::thread::hardware_concurrency() << std::endl;  // e.g. 4

    std::thread t1([] { std::cout << get_id() << std::endl; });  // e.g. 139783038650112
    std::thread t2([] { std::cout << get_id() << std::endl; });  // e.g. 139783030257408

    std::cout << t1.get_id() << std::endl;  // e.g. 139783038650112
    std::cout << t2.get_id() << std::endl;  // e.g. 139783030257408

    t1.swap(t2);  // the two objects exchange the threads they own

    std::cout << t1.get_id() << std::endl;  // e.g. 139783030257408
    std::cout << t2.get_id() << std::endl;  // e.g. 139783038650112

    std::cout << get_id() << std::endl;     // ID of the main thread, e.g. 140159896602432

    t1.join();
    t2.join();
}
6. jthread
#include <atomic>
#include <thread>
#include <iostream>using namespace std;
using std::this_thread::get_id;//jthread 自动join()的线程
int main()
{std::jthread thr{ [] { std::cout << "std::jthread" << "\n"; } }; // std::jthreadstd::cout << "thr.joinable(): " << thr.joinable() << "\n"; // thr.joinable(): true
}
7. 如何在线程中使用中断 stop_token
#include <atomic>
#include <thread>
#include <iostream>using namespace std;
using std::this_thread::get_id;
using namespace::std::literals;//字面量,比如0.2s, C++20能识别这种写法std::jthread nonInterruptable([] { // (1) 创建非中断线程int counter{ 0 };
while (counter < 10) {std::this_thread::sleep_for(0.2s);std::cerr << "nonInterruptable: " << counter << std::endl;++counter;
}});
std::jthread interruptable([](std::stop_token stoken) { // (2) 创建可中断线程int counter{ 0 };
while (counter < 10) {std::this_thread::sleep_for(0.2s);if (stoken.stop_requested()) return; // (3) 检查线程是否被中断std::cerr << "interruptable: " << counter << std::endl;++counter;
}});int main()
{std::this_thread::sleep_for(1s);std::cerr << "Main thread interrupts both jthreads" << std::endl;nonInterruptable.request_stop(); // (4)//请求中断,非中断线程不理会interruptable.request_stop();//请求中断,中断线程会响应
}
三:如何解决数据竞争
1.有问题的代码
#include <atomic>
#include <thread>
#include <iostream>

// Functor that reports three units of work under the given name.
// Intentionally unsynchronised: this is the "problem" version.
struct Worker {
    Worker(std::string n) : name(n) {}

    void operator()() {
        int step = 1;
        while (step <= 3) {
            std::this_thread::sleep_for(std::chrono::milliseconds(200));
            // Each single << on std::cout is safe, but std::cout is shared by
            // all workers and nothing keeps a whole line together, so the
            // pieces written by different threads can interleave.
            std::cout << name << ": " << "Work " << step << std::endl;
            ++step;
        }
    }

private:
    std::string name;
};

int main()
{
    std::thread herb{ Worker("Herb") };
    std::thread andrei{ Worker(" Andrei") };
    std::thread scott{ Worker(" Scott") };
    std::thread bjarne{ Worker(" Bjarne") };

    herb.join();
    andrei.join();
    scott.join();
    bjarne.join();
}
2.使用互斥
#include <atomic>
#include <thread>
#include <iostream>
#include <mutex>

// Serialises access to std::cout across all workers.
std::mutex mutexCout;

// Functor that reports three units of work; each line is printed while
// holding mutexCout so that lines from different workers do not mix.
struct Worker {
    Worker(std::string n) : name(n) {}

    void operator()() {
        for (int step = 1; step < 4; ++step) {
            std::this_thread::sleep_for(std::chrono::milliseconds(200));

            mutexCout.lock();
            std::cout << name << ": " << "Work " << step << std::endl;
            mutexCout.unlock();
        }
    }

private:
    std::string name;
};

int main()
{
    std::thread herb{ Worker("Herb") };
    std::thread andrei{ Worker("Andrei") };
    std::thread scott{ Worker("Scott") };
    std::thread bjarne{ Worker("Bjarne") };

    herb.join();
    andrei.join();
    scott.join();
    bjarne.join();
}
3.预防死锁
m.lock();
sharedVar = getVar();  // if this throws, m.unlock() below is never reached: the mutex stays locked, other threads can never acquire it, and a deadlock may follow
m.unlock();
#include <chrono>
#include <iostream>
#include <mutex>
#include <thread>

using namespace std;

// Shared resource protected by its own mutex.
struct CriticalData {
    std::mutex mut;
};

// Locks a.mut, pauses briefly, then locks b.mut.
// Intentionally deadlock-prone: when two threads call it with the arguments
// swapped, each one holds its first mutex and waits for the other's.
void deadLock(CriticalData& a, CriticalData& b) {
    a.mut.lock();
    std::cout << "get the first mutex\n";
    // The pause gives the other thread time to take its own first mutex.
    std::this_thread::sleep_for(std::chrono::milliseconds(1));
    b.mut.lock();
    std::cout << "get the second mutex\n";
    a.mut.unlock();
    b.mut.unlock();
}

int main()
{
    CriticalData c1;
    CriticalData c2;

    // After taking their first lock, t1 and t2 each wait for the other one to
    // release its lock, so this program is expected to hang.
    std::thread t1([&] { deadLock(c1, c2); });
    std::thread t2([&] { deadLock(c2, c1); });

    t1.join();
    t2.join();
}
4. 自动释放锁
#include <atomic>
#include <thread>
#include <iostream>
#include <mutex>

// Serialises access to std::cout across all workers.
std::mutex mutexCout;

// Functor that reports three units of work; the output mutex is held through
// a lock_guard, which releases it automatically at the end of each iteration.
struct Worker {
    Worker(std::string n) : name(n) {}

    void operator()() {
        int step = 0;
        while (++step <= 3) {
            std::this_thread::sleep_for(std::chrono::milliseconds(200));

            std::lock_guard<std::mutex> outputLock(mutexCout);  // unlocks in its destructor
            std::cout << name << ": " << "Work " << step << std::endl;
        }
    }

private:
    std::string name;
};

int main()
{
    std::thread herb{ Worker("Herb") };
    std::thread andrei{ Worker("Andrei") };
    std::thread scott{ Worker("Scott") };
    std::thread bjarne{ Worker("Bjarne") };

    herb.join();
    andrei.join();
    scott.join();
    bjarne.join();
}
5. 延迟锁
#include <atomic>
#include <thread>
#include <iostream>
#include <mutex>

// Shared resource protected by its own mutex.
struct CriticalData {
    std::mutex mut;
};

// Prepares a deferred unique_lock for each mutex and then acquires both in a
// single std::lock call, so two threads that pass the arguments in opposite
// order cannot deadlock each other.
void deadLockResolved(CriticalData& a, CriticalData& b) {
    // defer_lock: the guard is associated with the mutex but does not lock it yet.
    std::unique_lock<std::mutex> guard1(a.mut, std::defer_lock);
    std::cout << std::this_thread::get_id() << ": get the first lock" << std::endl;

    std::this_thread::sleep_for(std::chrono::milliseconds(1));

    std::unique_lock<std::mutex> guard2(b.mut, std::defer_lock);
    std::cout << std::this_thread::get_id() << ": get the second lock" << std::endl;

    std::cout << std::this_thread::get_id() << ": atomic locking" << std::endl;
    std::lock(guard1, guard2);  // both mutexes are actually locked here
}   // guard2 and guard1 release their mutexes on scope exit

int main()
{
    CriticalData c1;
    CriticalData c2;

    std::thread t1([&] { deadLockResolved(c1, c2); });
    std::thread t2([&] { deadLockResolved(c2, c1); });

    t1.join();
    t2.join();
}
6. 共享锁
#include <mutex>         // std::unique_lock
#include <shared_mutex>  // std::shared_timed_mutex, std::shared_lock
...
std::shared_timed_mutex sharedMutex;
// Writer: exclusive ownership, no reader or other writer may hold the mutex.
std::unique_lock<std::shared_timed_mutex> writerLock(sharedMutex);
// Readers: shared ownership, several readers may hold the mutex at once.
// (Fragment only: a thread must not take these while it already holds the
// mutex, so writer and readers belong in different threads or scopes.)
std::shared_lock<std::shared_timed_mutex> readerLock(sharedMutex);
std::shared_lock<std::shared_timed_mutex> readerLock2(sharedMutex);
7. 线程安全的初始化
//常量表达式是线程安全的
// Literal type wrapping a double; usable in constant expressions, so objects
// of it can be constant-initialized.
struct MyDouble {
    // Stores `v`.
    constexpr MyDouble(double v) : val(v) {}
    // Returns the stored value. Must be const: a constexpr object is const,
    // and constexpr member functions are not implicitly const since C++14.
    constexpr double getValue() const { return val; }
private:
    double val;
};
// Declares a constexpr MyDouble initialized with 10.5 and streams the result
// of getValue() to std::cout.
constexpr MyDouble myDouble(10.5);
std::cout << myDouble.getValue(); // 10.5
//块内静态变量
// Declares a function-local static int; it is initialized once, the first
// time control passes through the declaration.
void blockScope()
{
    static int MySharedDataInt{ 2011 };
}
//once_flag, call_once
#include <mutex>
...
using namespace std;

// Records whether the one-time action below has already run.
once_flag onceFlag;

// Prints "Only once." the first time it is called through onceFlag; later
// calls find the flag already set and print nothing.
void do_once()
{
    auto announce = [] { cout << "Only once." << endl; };
    call_once(onceFlag, announce);
}
// Two threads are started with do_once as their entry point.
thread t1(do_once);
thread t2(do_once);
四:线程局部存储
// Serialises the output of addThreadLocal across threads.
std::mutex coutMutex;
// thread_local: every thread gets its own copy, initialized to "hello from ".
thread_local std::string s("hello from ");

// Appends s2 to the calling thread's copy of `s`, then prints the resulting
// string and the address of that copy while holding coutMutex.
void addThreadLocal(std::string const& s2)
{
    s.append(s2);

    const std::lock_guard<std::mutex> guard(coutMutex);
    std::cout << s << std::endl
              << "&s: " << &s << std::endl
              << std::endl;
}
// Four threads call addThreadLocal, each with its own suffix.
std::thread t1(addThreadLocal, "t1");
std::thread t2(addThreadLocal, "t2");
std::thread t3(addThreadLocal, "t3");
std::thread t4(addThreadLocal, "t4");
五:线程通信
1.条件变量
#include <atomic>
#include <thread>
#include <iostream>
#include <mutex>
#include <condition_variable>

// Protects dataReady and is the mutex the condition variable waits on.
std::mutex mutex_;
std::condition_variable condVar;
// The condition itself: set by the sender, checked by the receiver.
bool dataReady = false;

void doTheWork() {
    std::cout << "Processing shared data." << std::endl;
}

// Receiver: blocks until dataReady is true, then does the work.
void waitingForWork() {
    std::cout << "Worker: Waiting for work." << std::endl;

    std::unique_lock<std::mutex> lck(mutex_);
    // Re-check the flag after every wake-up so that a spurious wake-up, or a
    // notification sent before we started waiting, is handled correctly.
    while (!dataReady) {
        condVar.wait(lck);
    }

    doTheWork();
    std::cout << "Work done." << std::endl;
}

// Sender: sets the flag under the mutex and wakes one waiting thread.
void setDataReady() {
    std::lock_guard<std::mutex> lck(mutex_);
    dataReady = true;
    std::cout << "Sender: Data is ready." << std::endl;
    condVar.notify_one();
}

int main()
{
    std::thread t1(waitingForWork);
    std::thread t2(setDataReady);

    t1.join();
    t2.join();
}
2. 防止虚假唤醒
//为了防止虚假唤醒,在唤醒前应进行条件检查,且发送方应将条件置为true。
//dataReady = true; //发送方设置条件满足
//[] { return dataReady; } //接收方进行条件检查
3. 防止唤醒丢失
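//如果发送方在接收方进入等待之前就发出了通知,这次通知不会被保留,接收方之后可能一直阻塞(唤醒丢失),因此要做两件事:
//1: 线程调度顺序无法保证"先等待,后发送唤醒",所以发送方要在持有同一个互斥量的情况下先把条件置为true(dataReady = true),再发送唤醒
//2: 在接收方的等待函数中要检查是否满足条件 [] { return dataReady; };带谓词的wait在阻塞之前会先检查条件,条件已满足时直接返回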
4.信号量
#include <atomic>
#include <thread>
#include <iostream>
#include <mutex>
#include <condition_variable>
#include <semaphore>
#include <vector>using namespace std;std::vector<int> myVec;std::counting_semaphore<1> prepareSignal(0); // (1)
void prepareWork() {myVec.insert(myVec.end(), { 0, 1, 0, 3 });std::cout << "Sender: Data prepared." << '\n';prepareSignal.release(); // (2)
}void completeWork() {std::cout << "Waiter: Waiting for data." << '\n';prepareSignal.acquire(); // (3)myVec[2] = 2;std::cout << "Waiter: Complete the work." << '\n';for (auto i : myVec) std::cout << i << " ";std::cout << '\n';
}int main()
{std::thread t1(prepareWork);std::thread t2(completeWork);t1.join();t2.join();
}
5. std::latch
#include <atomic>
#include <thread>
#include <iostream>
#include <mutex>
#include <condition_variable>
#include <semaphore>
#include <vector>
#include <latch>using namespace std;std::mutex coutMutex;std::latch workDone(2);
std::latch goHome(1); // (5)
void synchronizedOut(const std::string s) {std::lock_guard<std::mutex> lo(coutMutex);std::cout << s;
}class Worker {
public:Worker(std::string n) : name(n) { };void operator() () {// notify the boss when work is donesynchronizedOut(name + ": " + "Work done!\n");workDone.count_down(); // (3) 完成工作// waiting before going homegoHome.wait();//等待老板发命令让他们回家synchronizedOut(name + ": " + "Good bye!\n");}
private:std::string name;
};int main()
{std::cout << "BOSS: START WORKING! " << '\n';Worker herb(" Herb"); // (1) 工人1std::thread herbWork(herb); //工人1必须完成自己的工作Worker scott(" Scott"); // (2) 工人2std::thread scottWork(scott);//工人2必须完成自己的工作workDone.wait(); // (4) 完成工作后等待std::cout << '\n';goHome.count_down();//老板发命令回家std::cout << "BOSS: GO HOME!" << '\n';herbWork.join();scottWork.join();
}
6. std::barrier
#include <barrier>
#include <iostream>
#include <string>
#include <syncstream>
#include <thread>
#include <vector>int main()
{const auto workers = { "Anil", "Busara", "Carl" };auto on_completion = []() noexcept{// locking not needed herestatic auto phase ="... done\n""Cleaning up...\n";std::cout << phase;phase = "... done\n";};std::barrier sync_point(std::ssize(workers), on_completion);auto work = [&](std::string name){std::string product = " " + name + " worked\n";std::osyncstream(std::cout) << product; // ok, op<< call is atomicsync_point.arrive_and_wait();product = " " + name + " cleaned\n";std::osyncstream(std::cout) << product;sync_point.arrive_and_wait();};std::cout << "Starting...\n";std::vector<std::jthread> threads;threads.reserve(std::size(workers));for (auto const& worker : workers)threads.emplace_back(work, worker);
}
六:任务
1. std::promise, std::future
#include <future>
#include <iostream>
#include <thread>   // std::jthread, used by main below
#include <utility>  // std::move

// Computes a * b and delivers the result through the promise, which makes
// the future obtained from that promise ready.
void product(std::promise<int>&& intPromise, int a, int b) {
    intPromise.set_value(a * b);
}
// Runs product() on a second thread and receives its result through a
// promise/future pair.
int main()
{
    int a{ 20 };
    int b{ 10 };

    std::promise<int> prodPromise;
    std::future<int> prodResult{ prodPromise.get_future() };

    // The promise is moved into the thread; the future stays with main.
    std::jthread prodThread(product, std::move(prodPromise), a, b);

    const int result = prodResult.get();  // blocks until product() has set the value
    std::cout << "20*10= " << result;     // 20*10= 200
}
2. 用std::promise, std::future进行线程同步
#include <future>
#include <iostream>void doTheWork() {std::cout << "Processing shared data." << std::endl;
}
void waitingForWork(std::future<void>&& fut) {std::cout << "Worker: Waiting for work." <<std::endl;fut.wait();doTheWork();std::cout << "Work done." << std::endl;
}
void setDataReady(std::promise<void>&& prom) {std::cout << "Sender: Data is ready." <<std::endl;prom.set_value();
}int main()
{std::promise<void> sendReady;auto fut = sendReady.get_future();std::jthread t1(waitingForWork, std::move(fut));std::jthread t2(setDataReady, std::move(sendReady));}
3. std::async
#include <chrono>   // system_clock, duration, seconds
#include <future>
#include <iostream>
#include <thread>   // std::this_thread::sleep_for

using std::chrono::duration;
using std::chrono::system_clock;
using std::launch;

// Compares the two launch policies of std::async by recording when each
// task actually ran, relative to program start.
int main()
{
    auto begin = system_clock::now();

    // deferred: the task runs lazily, in the thread that calls get().
    auto asyncLazy = std::async(launch::deferred, [] { return system_clock::now(); });
    // async: the task starts right away on another thread.
    auto asyncEager = std::async(launch::async, [] { return system_clock::now(); });

    std::this_thread::sleep_for(std::chrono::seconds(1));

    auto lazyStart = asyncLazy.get() - begin;    // the deferred task runs here, after the sleep
    auto eagerStart = asyncEager.get() - begin;

    auto lazyDuration = duration<double>(lazyStart).count();
    auto eagerDuration = duration<double>(eagerStart).count();

    std::cout << lazyDuration << " sec";   // e.g. 1.00018 sec
    std::cout << eagerDuration << " sec";  // e.g. 0.00015489 sec
}
#include <future>
#include <iostream>
#include <thread>

// Computes the same value twice: once with a thread that writes into a
// shared variable, once with std::async, which returns it through a future.
int main()
{
    int res{ 0 };
    std::thread t([&] { res = 2000 + 11; });
    t.join();  // the join makes the thread's write to res visible here
    std::cout << res << std::endl;  // 2011

    auto fut = std::async([] { return 2000 + 11; });  // asynchronous call
    std::cout << fut.get() << std::endl;  // 2011
}
4. std::packaged_task
#include <deque>
#include <future>
#include <iostream>
#include <queue>
#include <thread>
#include <utility>
#include <vector>

// Callable that adds every integer in [beg, end) to its running sum and
// returns the sum.
struct SumUp {
    int operator()(int beg, int end) {
        for (int i = beg; i < end; ++i) sum += i;
        return sum;
    }

private:
    int sum{ 0 };
};

// Splits the summation of 1..10000 into two packaged_tasks, runs each on its
// own thread, and adds up the results obtained through the futures.
int main()
{
    SumUp sumUp1, sumUp2;

    std::packaged_task<int(int, int)> sumTask1(sumUp1);  // task 1
    std::packaged_task<int(int, int)> sumTask2(sumUp2);  // task 2

    std::future<int> sum1 = sumTask1.get_future();  // result of task 1
    std::future<int> sum2 = sumTask2.get_future();  // result of task 2

    // Queue holding all pending tasks (packaged_task is move-only).
    std::deque<std::packaged_task<int(int, int)>> allTasks;
    allTasks.push_back(std::move(sumTask1));
    allTasks.push_back(std::move(sumTask2));

    int begin{ 1 };
    int increment{ 5000 };
    int end = begin + increment;

    // The threads are kept and joined instead of detached, so none of them
    // can still be running when main returns.
    std::vector<std::thread> workers;
    workers.reserve(allTasks.size());

    while (!allTasks.empty()) {
        std::packaged_task<int(int, int)> myTask = std::move(allTasks.front());  // take one task
        allTasks.pop_front();
        workers.emplace_back(std::move(myTask), begin, end);  // run it on [begin, end)
        begin = end;
        end += increment;
    }

    auto sum = sum1.get() + sum2.get();  // collect the results of both tasks

    for (auto& worker : workers) {
        worker.join();
    }

    std::cout << sum;  // 50005000
}