文章目录
- 4.1 等待事件或条件
- 4.1.1 使用条件变量等待
- 4.1.2 构建线程安全队列
- 条件变量 介绍
- 1. `std::condition_variable`
- 2. `std::condition_variable_any`
- 3. 成员函数
- a. `wait()`
- b. `wait_for()`
- c. `wait_until()`
- d. `notify_one()`
- e. `notify_all()`
- 总结
- 4.2 使用Future
- 4.2.1 后台任务的返回值
- 1. `std::async`的基本概念
- 2. 参数传递机制
- 3. 代码分析
- 示例1:成员函数调用
- 示例2:函数对象调用
- 示例3:普通函数调用
- 示例4:仅支持移动的对象
- 总结
- 4.2.2 Future与任务关联
- 示例代码
- 使用特化的 `packaged_task` 类
- 关键点解释
- 1. `std::packaged_task` 是个可调用对象
- 2. 可以封装在 `std::function` 对象中,从而作为线程函数传递到 `std::thread` 对象中,或作为可调用对象传递到另一个函数中或直接调用
- 3. 当 `std::packaged_task` 作为函数调用时,实参将由函数调用操作符传递至底层函数
- 4. 返回值作为异步结果存储在 `std::future` 中,并且可通过 `get_future()` 获取
- 5. 因此可以用 `std::packaged_task` 对任务进行打包,并适时的取回 `future`
- 6. 当异步任务需要返回值时,可以等待 `future` 状态变为“就绪”
- 示例代码
- 关键点总结
- 线程间传递任务
- 4.2.3 使用std::promise
- 1. `std::promise`
- 2. `std::packaged_task`
- 3. `std::async`
- 区别总结
- 异常处理与Future
- 多线程等待与`std::shared_future`
- 4.3 限时等待
- 4.3.1 时钟
- 4.3.2 时间段
- 4.3.3 时间点
- 4.3.4 使用超时
4.1 等待事件或条件
想象一下你在夜晚乘坐火车旅行。为了在正确的车站下车,你可以选择两种方法:
- 整晚都不睡觉,时刻关注着到站信息。虽然这样你不会错过站点,但你会非常累。
- 查看列车时刻表,估计自己什么时候应该醒来,然后设定闹钟。这种方法让你可以在一定程度上休息,但如果火车晚点了,你可能会被过早叫醒;而且如果闹钟没电了,你甚至可能睡过头。
理想的情况是,无论火车是提前还是迟到,总有人会在到达时叫醒你。这就像编程中的线程等待某个事件发生一样。
4.1.1 使用条件变量等待
当一个线程需要等待另一个线程完成某项任务时,它可以持续检查共享数据的状态(比如一个标志位),直到另一线程完成并修改这个标志位。但这会消耗大量的计算资源,并且可能导致其他线程无法及时获取所需的资源。
更好的方法是在等待期间让线程休眠一段时间,而不是一直占用CPU资源。例如:
bool flag;
std::mutex m;void wait_for_flag()
{std::unique_lock<std::mutex> lk(m);while(!flag){lk.unlock(); // 解锁互斥量std::this_thread::sleep_for(std::chrono::milliseconds(100)); // 休眠100毫秒lk.lock(); // 再次锁定互斥量}
}
在这个例子中,线程在每次检查后都会释放互斥量,然后休眠一小段时间再重新锁定互斥量进行下一次检查。这样做既节省了资源,又避免了长时间占用资源的问题。
但是,确定最佳的休眠时间并不容易。如果休眠时间太短,效率提升有限;如果休眠时间太长,则会导致响应延迟。
最理想的解决方案是使用C++标准库提供的工具——条件变量。条件变量允许线程在特定条件下自动进入休眠状态,并在条件满足时自动唤醒。例如:
代码4.1 使用std::condition_variable
处理数据等待
#include <queue>
#include <mutex>
#include <condition_variable>std::mutex mut;
std::queue<data_chunk> data_queue; // 数据队列
std::condition_variable data_cond;// 准备数据的线程
void data_preparation_thread()
{while(more_data_to_prepare()){data_chunk const data=prepare_data();std::lock_guard<std::mutex> lk(mut);data_queue.push(data); // 将数据放入队列data_cond.notify_one(); // 唤醒一个等待的线程}
}// 处理数据的线程
void data_processing_thread()
{while(true){std::unique_lock<std::mutex> lk(mut); // 锁定互斥量data_cond.wait(lk,[]{return !data_queue.empty();}); // 等待队列不为空data_chunk data=data_queue.front();data_queue.pop();lk.unlock(); // 解锁互斥量process(data);if(is_last_chunk(data))break;}
}
在这个例子中,准备数据的线程将数据放入队列后,通过notify_one()
通知正在等待的线程;而处理数据的线程则在队列为空时进入休眠状态,一旦有新数据到来就被唤醒继续处理。
4.1.2 构建线程安全队列
我们可以通过上述思路构建一个线程安全的队列,使得多个线程可以安全地向队列中添加和取出数据。以下是完整的实现:
代码4.5 使用条件变量的线程安全队列
#include <queue>
#include <memory>
#include <mutex>
#include <condition_variable>template<typename T>
class threadsafe_queue
{
private:mutable std::mutex mut; // 互斥量必须是可变的 std::queue<T> data_queue;std::condition_variable data_cond;
public:threadsafe_queue() {}void push(T new_value){std::lock_guard<std::mutex> lk(mut);data_queue.push(new_value);data_cond.notify_one();}void wait_and_pop(T& value){std::unique_lock<std::mutex> lk(mut);data_cond.wait(lk,[this]{return !data_queue.empty();});value=data_queue.front();data_queue.pop();}std::shared_ptr<T> wait_and_pop(){std::unique_lock<std::mutex> lk(mut);data_cond.wait(lk,[this]{return !data_queue.empty();});std::shared_ptr<T> res(std::make_shared<T>(data_queue.front()));data_queue.pop();return res;}bool try_pop(T& value){std::lock_guard<std::mutex> lk(mut);if(data_queue.empty())return false;value=data_queue.front();data_queue.pop();return true;}std::shared_ptr<T> try_pop(){std::lock_guard<std::mutex> lk(mut);if(data_queue.empty())return std::shared_ptr<T>();std::shared_ptr<T> res(std::make_shared<T>(data_queue.front()));data_queue.pop();return res;}bool empty() const{std::lock_guard<std::mutex> lk(mut);return data_queue.empty();}
};
在这个实现中,push()
方法用于向队列中添加数据,并通过notify_one()
唤醒一个等待的线程;wait_and_pop()
方法用于从队列中取出数据,如果没有数据则进入休眠状态,直到有新的数据到来;try_pop()
方法尝试从队列中取出数据,如果没有数据则直接返回失败。
通过这种方式,我们可以确保多个线程能够安全地访问和操作同一个队列,同时避免了不必要的资源浪费。
条件变量 介绍
当然可以!条件变量(std::condition_variable
)是C++标准库中用于线程间同步的重要工具。它允许一个或多个线程等待某个条件变为真,然后被其他线程通知继续执行。下面是与条件变量相关的几个关键函数及其用途:
1. std::condition_variable
这是最基本的条件变量类。它只能与std::mutex
一起使用。
2. std::condition_variable_any
这是一个更通用的条件变量类。它可以与任何满足Lockable要求的互斥量(如std::mutex
、std::recursive_mutex
等)一起使用。
3. 成员函数
a. wait()
- 作用: 让当前线程进入阻塞状态,直到被通知并且指定的条件为真。
- 参数:
std::unique_lock<std::mutex>& lock
: 必须是一个已经被锁定的互斥锁。Predicate pred
: 可选的谓词函数,只有当这个函数返回true
时,线程才会真正解除阻塞并继续执行。
- 示例:
#include <iostream>
#include <thread>
#include <mutex>
#include <condition_variable>std::mutex mtx;
std::condition_variable cv;
bool ready = false;void worker_thread()
{std::unique_lock<std::mutex> lck(mtx);cv.wait(lck, []{ return ready; }); // 等待ready变为truestd::cout << "Worker thread is processing data.\n";
}int main()
{std::thread worker(worker_thread);{std::lock_guard<std::mutex> lck(mtx);ready = true;}cv.notify_one(); // 通知worker线程worker.join();
}
b. wait_for()
- 作用: 类似于
wait()
,但会在指定时间内等待条件变为真,如果超时仍未满足条件,则线程会自动恢复运行。 - 参数:
std::unique_lock<std::mutex>& lock
: 同wait()
。chrono::duration timeout_duration
: 超时时间。Predicate pred
: 可选的谓词函数。
- 返回值: 如果是因为条件满足而退出返回
true
,如果是超时退出返回false
。 - 示例:
#include <iostream>
#include <thread>
#include <mutex>
#include <condition_variable>
#include <chrono>std::mutex mtx;
std::condition_variable cv;
bool ready = false;void worker_thread()
{std::unique_lock<std::mutex> lck(mtx);if (cv.wait_for(lck, std::chrono::seconds(5), []{ return ready; })) {std::cout << "Condition met! Worker thread is processing data.\n";} else {std::cout << "Timeout occurred. No work to do.\n";}
}int main()
{std::thread worker(worker_thread);// Simulate some delay before setting the conditionstd::this_thread::sleep_for(std::chrono::seconds(3));{std::lock_guard<std::mutex> lck(mtx);ready = true;}cv.notify_one();worker.join();
}
c. wait_until()
- 作用: 类似于
wait_for()
,但它接受一个绝对时间点而不是相对时间间隔。 - 参数:
std::unique_lock<std::mutex>& lock
: 同wait()
。chrono::time_point timeout_time
: 绝对时间点。Predicate pred
: 可选的谓词函数。
- 返回值: 如果是因为条件满足而退出返回
true
,如果是超时退出返回false
。 - 示例:
#include <iostream>
#include <thread>
#include <mutex>
#include <condition_variable>
#include <chrono>std::mutex mtx;
std::condition_variable cv;
bool ready = false;void worker_thread()
{std::unique_lock<std::mutex> lck(mtx);auto now = std::chrono::system_clock::now();if (cv.wait_until(lck, now + std::chrono::seconds(5), []{ return ready; })) {std::cout << "Condition met! Worker thread is processing data.\n";} else {std::cout << "Timeout occurred. No work to do.\n";}
}int main()
{std::thread worker(worker_thread);// Simulate some delay before setting the conditionstd::this_thread::sleep_for(std::chrono::seconds(3));{std::lock_guard<std::mutex> lck(mtx);ready = true;}cv.notify_one();worker.join();
}
d. notify_one()
- 作用: 唤醒一个正在等待的线程。如果有多个线程在等待,只会随机选择一个进行唤醒。
- 参数: 无。
- 示例:
#include <iostream>
#include <thread>
#include <mutex>
#include <condition_variable>std::mutex mtx;
std::condition_variable cv;
bool ready = false;void worker_thread(int id)
{std::unique_lock<std::mutex> lck(mtx);cv.wait(lck, []{ return ready; });std::cout << "Thread " << id << " processed the data.\n";
}int main()
{std::thread workers[5];for (int i = 0; i < 5; ++i) {workers[i] = std::thread(worker_thread, i);}{std::lock_guard<std::mutex> lck(mtx);ready = true;}cv.notify_all(); // 或者使用 notify_one()for (auto& t : workers) {t.join();}
}
e. notify_all()
- 作用: 唤醒所有正在等待的线程。
- 参数: 无。
- 示例:
#include <iostream>
#include <thread>
#include <mutex>
#include <condition_variable>std::mutex mtx;
std::condition_variable cv;
bool ready = false;void worker_thread(int id)
{std::unique_lock<std::mutex> lck(mtx);cv.wait(lck, []{ return ready; });std::cout << "Thread " << id << " processed the data.\n";
}int main()
{std::thread workers[5];for (int i = 0; i < 5; ++i) {workers[i] = std::thread(worker_thread, i);}{std::lock_guard<std::mutex> lck(mtx);ready = true;}cv.notify_all(); // 唤醒所有等待的线程for (auto& t : workers) {t.join();}
}
总结
wait()
: 阻塞线程,直到条件为真。wait_for()
: 在一定时间内阻塞线程,直到条件为真或超时。wait_until()
: 在特定时间点前阻塞线程,直到条件为真或超时。notify_one()
: 唤醒一个等待的线程。notify_all()
: 唤醒所有等待的线程。
这些函数共同构成了条件变量的核心功能,使得多线程编程中的同步变得更加高效和可靠。希望这些解释能帮助你更好地理解和使用条件变量!
当然可以,以下是排版后的总结:
4.2 使用Future
假设你要乘飞机去国外度假,在到达机场办理完各种登机手续后,你需要等待机场广播通知登机。在这段时间内,你可能会在候机室里找一些事情来打发时间,比如读书、上网或者喝杯咖啡。不过,你实际上就是在等待一件事情:机场广播通知登机。
C++标准库将这种事件称为future
。当线程需要等待特定事件时,某种程度上来说就需要知道期望的结果。之后,线程会周期性(较短的周期)地等待或检查事件是否触发(检查信息板),并在检查期间执行其他任务(品尝昂贵的咖啡)。另外,等待任务期间也可以先执行其他任务,直到对应的任务触发,然后等待future
的状态会变为就绪状态。future
可能是与数据相关(比如,登机口编号),也可能不是。当事件发生时(状态为就绪),这个future
就不能重置了。
C++标准库中有两种future
,声明在<future>
头文件中:std::future
和std::shared_future
,分别类似于std::unique_ptr
和std::shared_ptr
。std::future
只能与指定事件相关联,而std::shared_future
可以关联多个事件。后者的所有实例会在同时变为就绪状态,并且可以访问与事件相关的数据。这种关联与模板有关,例如std::unique_ptr
和std::shared_ptr
的模板参数就是相关的数据类型。对于不涉及数据的情况,可以使用std::future<void>
和std::shared_future<void>
的特化模板。虽然我倾向于线程通讯,但future
对象本身并不提供同步访问。当多个线程需要访问一个独立的future
对象时,必须使用互斥量或其他同步机制进行保护。然而,当多个线程对一个std::shared_future<>
副本进行访问时,即使同一个异步结果,也不需要同步future
。
并行技术规范在std::experimental
命名空间中扩展了这两个模板类:std::experimental::future<>
和std::experimental::shared_future<>
。这个命名空间是为了将其与std
命名空间中的模板类进行区分,实验命名空间中为这两个模板类添加了更多的功能。尤其是std::experimental
中的内容与代码质量无关(我希望这里也会有较高的实现质量),需要注意的是这个命名空间提供的都不是标准类和函数,语法和语义可能与纳入C++标准(即std
命名空间)后有所不同。如果想要使用这两个试验性的模板类,需要包含<experimental/future>
头文件。
4.2.1 后台任务的返回值
假设有一个需要长时间计算的操作,需要计算出一个有效值,但并不迫切需要这个值。你可以启动新线程来执行这个计算,而std::thread
并不提供直接接收返回值的机制。这时就可以使用std::async
函数模板(也在<future>
头文件中)。
当不着急让任务结果时,可以使用std::async
启动一个异步任务。与std::thread
不同,std::async
会返回一个std::future
对象,这个对象持有最终计算出来的结果。当需要这个值时,只需要调用这个对象的get()
成员函数,就会阻塞线程直到future
为就绪为止,并返回计算结果。
代码4.6 std::future从异步任务中获取返回值
#include <future>
#include <iostream>int find_the_answer_to_ltuae();
void do_other_stuff();int main()
{std::future<int> the_answer = std::async(find_the_answer_to_ltuae);do_other_stuff();std::cout << "The answer is " << the_answer.get() << std::endl;
}
与std::thread
方式一样,std::async
允许通过添加额外的调用参数,向函数传递额外的参数。第一个参数是指向成员函数的指针,第二个参数提供这个函数成员类的具体对象(可以通过指针或包装在std::ref
中),剩余的参数作为函数的参数传入。否则,第二个及随后的参数将作为函数的参数,或作为指定可调用对象的第一个参数。与std::thread
一样,当参数为右值时,拷贝操作将使用移动的方式转移原始数据,因此可以使用“只移动”类型作为函数对象和参数。
代码4.7 使用std::async向函数传递参数
#include <string>
#include <future>struct X
{void foo(int, std::string const&);std::string bar(std::string const&);
};X x;auto f1 = std::async(&X::foo, &x, 42, "hello"); // 调用 p->foo(42, "hello"), p 是指向 x 的指针
auto f2 = std::async(&X::bar, x, "goodbye"); // 调用 tmpx.bar("goodbye"), tmpx 是 x 的拷贝副本struct Y
{double operator()(double);
};Y y;auto f3 = std::async(Y(), 3.141); // 调用 tmpy(3.141), tmpy 通过 Y 的移动构造函数得到
auto f4 = std::async(std::ref(y), 2.718); // 调用 y(2.718)X baz(X&);auto f5 = std::async(baz, std::ref(x)); // 调用 baz(x)class move_only
{
public:move_only();move_only(move_only&&);move_only(const&) = delete;move_only& operator=(move_only&&);move_only& operator=(const&) = delete;void operator()();
};auto f6 = std::async(move_only()); // 调用 tmp(), tmp 是通过 std::move(move_only()) 构造得到
理解std::async
的参数传递机制需要了解C++中的引用、值传递以及移动语义。下面我将详细解释你提供的代码片段,并说明每个异步调用是如何处理参数传递的。
1. std::async
的基本概念
std::async
是C++11引入的一个函数模板,用于启动一个异步任务并返回一个std::future
对象,该对象可以在将来获取任务的结果。其基本形式如下:
template< class Function, class... Args >
std::future<std::result_of_t<Function(Args...)>> async( Function&& f, Args&&... args );
Function&& f
:要执行的可调用对象(如函数、lambda表达式、函数对象等)。Args&&... args
:传递给可调用对象的参数列表。
2. 参数传递机制
在std::async
中,参数通过值传递或引用传递的方式传递给异步任务。默认情况下,参数是通过值传递的,但如果使用std::ref
或std::cref
,则可以通过引用传递。
3. 代码分析
示例1:成员函数调用
struct X
{void foo(int, std::string const&);std::string bar(std::string const&);
};X x;auto f1 = std::async(&X::foo, &x, 42, "hello"); // 调用 p->foo(42, "hello"), p 是指向 x 的指针
auto f2 = std::async(&X::bar, x, "goodbye"); // 调用 tmpx.bar("goodbye"), tmpx 是 x 的拷贝副本
-
f1:
&X::foo
:指向成员函数foo
的指针。&x
:指向x
的指针,作为第一个隐式参数(this
指针)传递给成员函数。42
和"hello"
:作为第二个和第三个参数传递给foo
。
这实际上会调用
x.foo(42, "hello")
。 -
f2:
&X::bar
:指向成员函数bar
的指针。x
:x
的拷贝副本(因为没有使用std::ref
),作为第一个隐式参数(this
指针)传递给成员函数。"goodbye"
:作为第二个参数传递给bar
。
这实际上会调用
tmpx.bar("goodbye")
,其中tmpx
是x
的一个拷贝。
示例2:函数对象调用
struct Y
{double operator()(double);
};Y y;auto f3 = std::async(Y(), 3.141); // 调用 tmpy(3.141), tmpy 通过 Y 的移动构造函数得到
auto f4 = std::async(std::ref(y), 2.718); // 调用 y(2.718)
-
f3:
Y()
:临时创建一个Y
对象,通过移动构造函数生成一个新的临时对象tmpy
。3.141
:作为参数传递给operator()
。
这实际上会调用
tmpy(3.141)
。 -
f4:
std::ref(y)
:通过引用传递y
,而不是拷贝它。2.718
:作为参数传递给operator()
。
这实际上会调用
y(2.718)
。
示例3:普通函数调用
X baz(X&);auto f5 = std::async(baz, std::ref(x)); // 调用 baz(x)
-
f5:
baz
:普通函数baz
。std::ref(x)
:通过引用传递x
,而不是拷贝它。
这实际上会调用
baz(x)
。
示例4:仅支持移动的对象
class move_only
{
public:move_only();move_only(move_only&&);move_only(const&) = delete;move_only& operator=(move_only&&);move_only& operator=(const&) = delete;void operator()();
};auto f6 = std::async(move_only()); // 调用 tmp(), tmp 是通过 std::move(move_only()) 构造得到
-
f6:
move_only()
:临时创建一个move_only
对象,由于该类只支持移动语义,因此通过std::move
生成一个新的临时对象tmp
。- 没有其他参数传递给
operator()
。
这实际上会调用
tmp()
,其中tmp
是通过std::move(move_only())
构造得到的。
总结
- 值传递:默认情况下,参数通过值传递给异步任务,这意味着它们会被拷贝。
- 引用传递:使用
std::ref
或std::cref
可以将参数通过引用传递给异步任务,避免拷贝开销。 - 移动语义:对于仅支持移动的对象,必须通过
std::move
来传递,确保对象能够被正确地移动到异步任务中。
理解这些机制有助于正确设计和使用std::async
,以确保参数传递方式符合预期,避免不必要的拷贝和潜在的性能问题。
future
的等待取决于std::async
是否启动了一个线程,或是否有任务在进行同步。大多数情况下,也可以在函数调用之前向std::async
传递一个额外参数,这个参数的类型是std::launch
,可以是以下几种之一:
std::launch::deferred
: 表明函数调用延迟到wait()
或get()
函数调用时才执行。std::launch::async
: 表明函数必须在其所在的独立线程上执行。std::launch::deferred | std::launch::async
: 表明实现可以选择这两种方式的一种。
最后一个选项是默认的。当函数调用延迟,就可能不会再运行了。
示例:
auto f6 = std::async(std::launch::async, Y(), 1.2); // 在新线程上执行
auto f7 = std::async(std::launch::deferred, baz, std::ref(x)); // 在 wait() 或 get() 调用时执行
auto f8 = std::async(std::launch::deferred | std::launch::async, baz, std::ref(x)); // 实现选择执行方式
auto f9 = std::async(baz, std::ref(x));
f7.wait(); // 调用延迟函数
本章的后续小节和第8章中,会再次看到这段程序,使用std::async
会将算法分割到各个任务中,这样程序就能并发执行。不过,这不是让std::future
与任务实例相关联的唯一方式,也可以将任务包装入std::packaged_task<>
中,或通过编写代码的方式,使用std::promise<>
模板显式设置值。与std::promise<>
相比,std::packaged_task<>
具有更高的抽象,所以我们从“高抽象”模板说起。
4.2.2 Future与任务关联
std::packaged_task<>
会将future
与函数或可调用对象进行绑定。当调用std::packaged_task<>
对象时,就会调用相关函数或可调用对象,当future
状态为就绪时,会存储返回值。这可以用在构建线程池(可见第9章)或其他任务的管理中,例如:在任务所在线程上运行其他任务,或将它们串行运行在一个特殊的后台线程上。当粒度较大的操作被分解为独立的子任务时,每个子任务都可以包含在std::packaged_task<>
实例中,之后将实例传递到任务调度器或线程池中。对任务细节进行抽象,调度器仅处理std::packaged_task<>
实例,而非处理单独的函数。
std::packaged_task<>
的模板参数是一个函数签名,例如void()
就是一个没有参数也没有返回值的函数,或int(std::string&, double*)
就是一个接受非const引用的std::string
参数和一个指向double
类型的指针参数,并且返回类型是int
。构造std::packaged_task<>
实例时,就必须传入函数或可调用对象。这个函数或可调用的对象,需要能接收指定的参数和返回(可转换为指定返回类型的)值。类型可以不完全匹配,因为这里类型可以隐式转换,可以用int
类型参数和返回float
类型的函数,来构建std::packaged_task<double(double)>
实例。
函数签名的返回类型可以用来标识从get_future()
返回的std::future<>
的类型,而函数签名的参数列表,可用来指定packaged_task
的函数调用操作符。例如,模板偏特化std::packaged_task<std::string(std::vector<char>*, int)>
会在下面的代码中使用到。
代码4.8 std::packaged_task<>的偏特化
template<>
class packaged_task<std::string(std::vector<char>*, int)>
{
public:template<typename Callable>explicit packaged_task(Callable&& f);std::future<std::string> get_future();void operator()(std::vector<char>*, int);
};
std::packaged_task
是个可调用对象,可以封装在std::function
对象中,从而作为线程函数传递到std::thread
对象中,或作为可调用对象传递到另一个函数中或直接调用。当std::packaged_task
作为函数调用时,实参将由函数调用操作符传递至底层函数,并且返回值作为异步结果存储在std::future
中,并且可通过get_future()
获取。因此可以用std::packaged_task
对任务进行打包,并适时取回future
。当异步任务需要返回值时,可以等待future
状态变为“就绪”。
您是对的,之前的示例没有直接使用特化的 packaged_task
类。为了更清楚地展示如何使用这个特化的 packaged_task
类,我们将重新编写一个具体的示例,并确保它使用了您提供的特化模板类定义。
示例代码
假设我们有以下特化的 packaged_task
类:
#include <iostream>
#include <future>
#include <vector>
#include <thread>// 特化的packaged_task类
template<>
class packaged_task<std::string(std::vector<char>*, int)>
{
public:// 构造函数,接受任何符合签名的可调用对象template<typename Callable>explicit packaged_task(Callable&& f) : func_(std::forward<Callable>(f)) {}// 获取与任务关联的futurestd::future<std::string> get_future() {return promise_.get_future();}// 调用操作符,执行任务void operator()(std::vector<char>* data, int multiplier) {promise_.set_value(func_(data, multiplier));}private:std::promise<std::string> promise_; // 用于存储结果std::function<std::string(std::vector<char>*, int)> func_; // 存储的可调用对象
};
使用特化的 packaged_task
类
接下来,我们编写一个完整的示例,展示如何使用这个特化的 packaged_task
类来异步处理数据并获取结果。
#include <iostream>
#include <future>
#include <vector>
#include <thread>// 特化的packaged_task类(如上所示)// 示例函数:处理数据
std::string process_data(std::vector<char>* data, int multiplier) {std::string result;for (char c : *data) {result += c * multiplier;}return result;
}int main() {// 创建一个包含一些数据的vectorstd::vector<char> data = {'a', 'b', 'c'};// 创建一个packaged_task,包装process_data函数packaged_task<std::string(std::vector<char>*, int)> task(process_data);// 获取futurestd::future<std::string> result_future = task.get_future();// 在新线程中运行taskstd::thread t(std::move(task), &data, 2);// 等待任务完成并获取结果std::cout << "进行其他工作..." << std::endl;std::string result = result_future.get(); // 如果任务未完成,这里会阻塞直到任务完成std::cout << "处理后的数据: " << result << std::endl;// 不要忘记join线程t.join();return 0;
}
关键点解释
-
特化的
packaged_task
类:- 这个类模板特化是为了处理特定签名的函数:
std::string(std::vector<char>*, int)
。 - 它包含一个
std::promise<std::string>
成员变量,用于存储异步任务的结果。 - 它还包含一个
std::function<std::string(std::vector<char>*, int)>
成员变量,用于存储传入的可调用对象。
- 这个类模板特化是为了处理特定签名的函数:
-
构造函数:
- 接受任何符合签名的可调用对象,并通过完美转发将其存储在
func_
成员变量中。
- 接受任何符合签名的可调用对象,并通过完美转发将其存储在
-
get_future()
方法:- 返回与
promise_
关联的std::future<std::string>
对象,这样可以在异步操作完成后获取结果。
- 返回与
-
operator()
方法:- 当调用
packaged_task
对象时,实际执行的是内部存储的可调用对象,并将结果通过promise_
设置给future
。
- 当调用
-
示例函数
process_data
:- 这是一个简单的函数,接受一个
std::vector<char>*
和一个int
参数,返回一个std::string
结果。
- 这是一个简单的函数,接受一个
-
主函数中的使用:
- 创建了一个
packaged_task
对象,包装了process_data
函数。 - 获取了与任务关联的
future
,并在新线程中执行任务。 - 使用
result_future.get()
等待任务完成并获取其结果。
- 创建了一个
通过这种方式,您可以利用特化的 packaged_task
来封装和管理具有特定签名的异步任务及其结果。这种方法提供了对异步编程的强大控制,同时保持了代码的简洁性和灵活性。
这句话详细描述了 std::packaged_task
在 C++ 中如何用于封装异步任务,并通过 std::future
获取其结果。以下是对这段话的逐句解释:
1. std::packaged_task
是个可调用对象
- 解释:
std::packaged_task
是一个可以像普通函数一样被调用的对象。它包装了一个可调用实体(如函数、lambda表达式或函数对象),并允许你以统一的方式处理这些可调用实体。
2. 可以封装在 std::function
对象中,从而作为线程函数传递到 std::thread
对象中,或作为可调用对象传递到另一个函数中或直接调用
- 解释:
std::packaged_task
可以被转换为std::function
类型,这意味着它可以像其他可调用对象一样被传递和使用。- 这使得你可以将
std::packaged_task
传递给std::thread
来在新线程中执行任务。 - 它也可以传递给其他需要可调用对象的函数,或者直接调用。
3. 当 std::packaged_task
作为函数调用时,实参将由函数调用操作符传递至底层函数
- 解释:
- 当你调用
std::packaged_task
对象时(例如使用operator()
),传递给它的参数会被转发给内部封装的可调用对象(如函数、lambda等)。 - 这意味着你可以像调用普通函数一样调用
std::packaged_task
对象,并且传递的参数会正确地传递给实际的任务函数。
- 当你调用
4. 返回值作为异步结果存储在 std::future
中,并且可通过 get_future()
获取
- 解释:
std::packaged_task
在执行时会将返回值存储在一个关联的std::future
对象中。- 你可以通过调用
get_future()
方法来获取这个std::future
对象。 - 之后,可以通过
std::future
的get()
方法获取任务的返回值。如果任务尚未完成,get()
会阻塞当前线程直到任务完成。
5. 因此可以用 std::packaged_task
对任务进行打包,并适时的取回 future
- 解释:
- 使用
std::packaged_task
,你可以将任意可调用对象(如函数、lambda等)打包成一个任务。 - 这个任务可以在不同的线程中异步执行,而不需要立即等待其结果。
- 通过
get_future()
获取的std::future
对象可以用来在稍后的时间点获取任务的结果。
- 使用
6. 当异步任务需要返回值时,可以等待 future
状态变为“就绪”
- 解释:
- 如果异步任务有返回值,你可以通过
std::future
对象等待任务完成。 - 调用
future.get()
会阻塞当前线程,直到任务完成并将结果存储在future
中。 - 一旦任务完成,
future
的状态变为“就绪”,此时调用get()
会立即返回任务的结果。
- 如果异步任务有返回值,你可以通过
示例代码
为了更好地理解上述概念,这里有一个完整的示例代码,演示如何使用 std::packaged_task
和 std::future
:
#include <iostream>
#include <future>
#include <vector>
#include <thread>// 示例函数:处理数据
std::string process_data(std::vector<char>* data, int multiplier) {std::string result;for (char c : *data) {result += c * multiplier;}return result;
}int main() {// 创建一个包含一些数据的vectorstd::vector<char> data = {'a', 'b', 'c'};// 创建一个packaged_task,包装process_data函数std::packaged_task<std::string(std::vector<char>*, int)> task(process_data);// 获取futurestd::future<std::string> result_future = task.get_future();// 将packaged_task移动到新线程中执行std::thread t(std::move(task), &data, 2);// 在主线程中进行其他工作...std::cout << "进行其他工作..." << std::endl;// 等待任务完成并获取结果std::string result = result_future.get(); // 如果任务未完成,这里会阻塞直到任务完成std::cout << "处理后的数据: " << result << std::endl;// 不要忘记join线程t.join();return 0;
}
关键点总结
std::packaged_task
: 包装了一个可调用对象,并提供了一种方式来异步执行该对象并获取其结果。std::future
: 通过get_future()
方法获取,用于在稍后的时间点获取异步任务的结果。- 异步执行: 使用
std::thread
或其他并发机制异步执行任务,而不阻塞主线程。 - 等待结果: 通过
future.get()
等待任务完成并获取其结果。
通过这种方式,您可以有效地管理异步任务及其结果,使程序更加灵活和高效。
线程间传递任务
很多图形架构需要特定的线程去更新界面,所以当线程对界面更新时,需要发出一条信息给正确的线程,让相应的线程来做界面更新。std::packaged_task
提供了这种功能,且不需要发送一条自定义信息给图形界面线程。
代码4.9 使用std::packaged_task执行一个图形界面线程
#include <deque>
#include <mutex>
#include <future>
#include <thread>
#include <utility>std::mutex m;
std::deque<std::packaged_task<void()> > tasks;bool gui_shutdown_message_received();
void get_and_process_gui_message();void gui_thread() // 1
{while(!gui_shutdown_message_received()) // 2{get_and_process_gui_message(); // 3std::packaged_task<void()> task;{std::lock_guard<std::mutex> lk(m);if(tasks.empty()) // 4continue;task = std::move(tasks.front()); // 5tasks.pop_front();}task(); // 6}
}std::thread gui_bg_thread(gui_thread);template<typename Func>
std::future<void> post_task_for_gui_thread(Func f)
{std::packaged_task<void()> task(f); // 7std::future<void> res = task.get_future(); // 8std::lock_guard<std::mutex> lk(m);tasks.push_back(std::move(task)); // 9return res; // 10
}
代码十分简单:
- 图形界面线程①循环直到收到一条关闭图形界面的信息后关闭界面②。
- 关闭界面前,进行轮询界面消息处理③,例如用户点击和执行在队列中的任务。
- 当队列中没有任务④时,循环将继续。
- 除非能在队列中提取出一个任务⑤,释放队列上的锁,并且执行任务⑥。
- 这里
future
与任务相关,当任务执行完时,其状态会置为“就绪”。
将任务传入队列:
- 提供的函数⑦可以提供一个打包好的任务。
- 通过这个任务⑧调用
get_future()
成员函数获取future
对象。 - 并且在任务推入列表⑨之前,
future
将返回调用函数⑩。
例子中使用std::packaged_task<void()>
创建任务,其中包含了一个无参数无返回值的函数或可调用对象(如果当这个调用有返回值时,返回值会被丢弃)。这可能是最简单的任务,std::packaged_task
也可以用于复杂的情况——通过指定不同的函数签名作为模板参数,不仅可以改变其返回类型(因此该类型的数据会存在期望相关的状态中),也可以改变函数操作符的参数类型。这个例子可以简单的扩展成允许任务运行在图形界面线程上,并且接受传参,还可以通过std::future
获取返回值。
这些任务能作为简单的函数调用来表达吗?还有,任务的结果能从很多地方得到吗?这些问题可以使用第三种方法创建future
来解决:使用std::promise
对值进行显示设置。
4.2.3 使用std::promise
当需要处理很多网络连接时,会使用不同线程尝试连接每个接口,能使网络尽早联通。不幸的是,随着连接数量的增长,这种方式变得越来越不合适。因为大量的线程会消耗大量的系统资源,还有可能造成线程上下文频繁切换(当线程数量超出硬件可接受的并发数时),这都会对性能产生影响。最极端的例子:线程会将系统资源消耗殆尽,系统连接网络的能力会变得极差。因此通过少数线程处理网络连接,每个线程同时处理多个连接,对需要处理大量网络连接的应用而言,这是一种比较普遍的做法。
当线程处理多个连接事件,来自不同的端口连接的数据包基本上以乱序方式进行处理。同样的,数据包也将以乱序的方式进入队列。很多情况下,一些应用不是等待数据成功的发送,就是等待(新的)指定网络接口数据的接收成功。
std::promise<T>
提供设定值的方式(类型为T
),这个类型会和后面看到的std::future<T>
对象相关联。std::promise/std::future
对提供一种机制:future
可以阻塞等待线程,提供数据的线程可以使用promise
对相关值进行设置,并将future
的状态置为“就绪”。
可以通过给定的std::promise
的get_future()
成员函数来获取与之相关的std::future
对象,与std::packaged_task
的用法类似。当promise
设置完毕(使用set_value()
成员函数)时,对应的future
状态就变为“就绪”,并且可用于检索已存储的值。当设置值之前销毁std::promise
,将会存储一个异常。在4.2.4节中,会详细描述异常是如何传送到线程的。
代码4.10 使用promise解决单线程多连接问题
#include <future>void process_connections(connection_set& connections)
{while(!done(connections)) // 1{for(connection_iterator // 2connection=connections.begin(), end=connections.end();connection!=end;++connection){if(connection->has_incoming_data()) // 3{data_packet data=connection->incoming();std::promise<payload_type>& p=connection->get_promise(data.id); // 4p.set_value(data.payload);}if(connection->has_outgoing_data()) // 5{outgoing_packet data=connection->top_of_outgoing_queue();connection->send(data.payload);data.promise.set_value(true); // 6}}}
}
process_connections()
中(直到done()
返回true
①为止),每一次循环都会依次检查每个连接②,检索是否有数据③或正在发送已入队的传出数据⑤。假设输入数据包是具有ID和有效负载的(有实际的数据在其中),一个ID映射到一个std::promise
(可能是在相关容器中进行的依次查找)④,并且值是在包的有效负载中。传出包是在传出队列中检索,从接口直接发送出去。当发送完成,传出数据相关的promise
将置为true
,来表明传输成功⑥。是否能映射到实际网络协议上,取决于所用协议。
上面的代码不理会异常,一切工作都会很好地执行,但有悖常理。有时候磁盘满载,有时候会找不到东西,有时候网络会断,还有时候数据库会崩溃。当需要某个操作的结果时,就需要在对应的线程上执行这个操作,因为代码可以通过异常来报告错误。不过,这会对使用std::packaged_task
或std::promise
带来一些不必要的限制。因此,C++标准库提供了一种在以上情况下清理异常的方法,并且允许将异常存储为相关结果的一部分。
希望这些解释能帮助你更好地理解和使用条件变量和future
!
std::promise
、std::packaged_task
和 std::async
都是 C++ 标准库提供的用于处理异步操作和线程间通信的工具,但它们的设计目标和使用场景有所不同。以下是它们的主要区别:
1. std::promise
用途:
std::promise
主要用于在线程之间传递结果或异常。- 它允许一个线程设置某个值或异常,并让另一个线程通过关联的
std::future
对象获取这个值或异常。
特点:
- 手动控制:你需要显式地调用
set_value()
或set_exception()
来设置结果或异常。 - 灵活性:可以在任何地方设置结果,不局限于函数或可调用对象。
- 复杂性:相比其他方法,可能需要更多的代码来管理同步逻辑。
示例:
#include <iostream>
#include <thread>
#include <future>void modifyValue(std::promise<int>&& prom, int value) {if (value >= 0) {prom.set_value(value * 2); // 设置成功的结果} else {prom.set_exception(std::make_exception_ptr(std::invalid_argument("Negative value"))); // 设置异常}
}int main() {std::promise<int> prom;std::future<int> fut = prom.get_future();std::thread t(modifyValue, std::move(prom), -5);try {int result = fut.get(); // 获取结果或异常std::cout << "Result: " << result << std::endl;} catch (const std::exception& e) {std::cerr << "Exception caught: " << e.what() << std::endl;}t.join();return 0;
}
2. std::packaged_task
用途:
std::packaged_task
是一个包装可调用对象(如函数、lambda表达式等)的类模板,它将任务与std::future
关联起来。- 它允许你在一个线程中执行任务,并在另一个线程中获取任务的结果。
特点:
- 封装函数:主要用于封装函数或可调用对象,并将其作为任务执行。
- 自动管理:任务完成后,结果会自动存储在关联的
std::future
中。 - 移动语义:
std::packaged_task
只支持移动语义,不支持拷贝。
示例:
#include <iostream>
#include <future>
#include <thread>int exampleFunction(int a, int b) {return a + b;
}int main() {std::packaged_task<int(int, int)> task(exampleFunction);std::future<int> result = task.get_future();std::thread t(std::move(task), 5, 3); // 使用std::moveint returnValue = result.get(); // 获取结果std::cout << "Result from packaged_task: " << returnValue << std::endl;t.join();return 0;
}
3. std::async
用途:
std::async
是一种更高层次的抽象,它可以自动启动一个新的线程(或在池中重用现有线程),并返回一个std::future
对象,通过该对象可以获取任务的结果。- 它非常适合于快速启动异步任务而不必关心底层线程管理。
特点:
- 简单易用:只需一行代码即可启动异步任务。
- 自动管理:自动管理线程的创建和销毁,简化了代码。
- 两种模式:可以选择是否立即启动新线程(
std::launch::async
或std::launch::deferred
)。
示例:
#include <iostream>
#include <future>int exampleFunction(int a, int b) {return a + b;
}int main() {std::future<int> result = std::async(std::launch::async, exampleFunction, 5, 3);int returnValue = result.get(); // 获取结果std::cout << "Result from async: " << returnValue << std::endl;return 0;
}
区别总结
特性/工具 | std::promise | std::packaged_task | std::async |
---|---|---|---|
主要用途 | 在线程间传递结果或异常 | 封装函数或可调用对象并异步执行 | 快速启动异步任务 |
手动控制 | 需要显式调用 set_value() 或 set_exception() | 自动管理结果存储 | 自动管理任务启动和结果存储 |
封装能力 | 不限于函数或可调用对象 | 专门用于封装函数或可调用对象 | 直接封装函数或可调用对象 |
线程管理 | 需要手动管理线程 | 需要手动管理线程 | 自动管理线程 |
使用复杂度 | 较高(需要更多代码管理同步逻辑) | 中等(需要管理线程) | 较低(简洁且易于使用) |
选择哪种工具取决于你的具体需求:
- 如果你需要灵活地在线程间传递结果或异常,并且希望对同步逻辑有更多的控制,可以选择
std::promise
。 - 如果你有一个具体的函数或可调用对象需要异步执行,并希望自动管理结果存储,可以选择
std::packaged_task
。 - 如果你希望快速启动一个异步任务而不必关心底层线程管理细节,可以选择
std::async
。
异常处理与Future
在C++中,当你希望将异常存储在std::future
对象中以便后续处理时,可以通过多种方式实现。例如,当你调用一个可能会抛出异常的异步函数如square_root(-1)
,你可以使用std::async
来启动这个函数的异步执行,并通过std::future
获取结果或异常。
std::future<double> f = std::async(square_root, -1);
double y = f.get(); // 如果square_root抛出异常,这里会重新抛出
当square_root
抛出异常时,该异常会被存储在future
中,直到你调用get()
方法时才被重新抛出。类似地,使用std::promise
和std::packaged_task
也能达到相同效果。对于std::promise
,你可以通过捕获异常并使用set_exception()
将其存储:
extern std::promise<double> some_promise;
try {some_promise.set_value(calculate_value());
} catch (...) {some_promise.set_exception(std::current_exception());
}
多线程等待与std::shared_future
尽管std::future
提供了基本的异步操作支持,但它不支持多个线程同时等待同一个事件的结果。为了解决这个问题,C++引入了std::shared_future
。它允许多个线程安全地共享同一个期望值(future),从而避免数据竞争问题。
从std::future
到std::shared_future
的转换可以通过移动语义完成:
std::promise<int> p;
std::future<int> fut = p.get_future();
std::shared_future<int> shf(std::move(fut));
或者直接从std::promise
构造std::shared_future
:
std::promise<std::string> p;
std::shared_future<std::string> sf(p.get_future()); // 隐式转移所有权
使用std::shared_future
的一个典型场景是在并行计算中,例如电子表格中的单元格依赖关系计算。每个单元格可能依赖于其他单元格的计算结果,而这些计算可以并行进行。std::shared_future
使得这种依赖管理更加高效且易于实现。
此外,为了限制等待时间,C++提供了带有超时机制的等待函数,这在需要确保特定代码段在限定时间内完成执行时非常有用。
通过上述机制,C++有效地支持了复杂的并发编程需求,使得开发者能够更轻松地编写高效、可靠的多线程应用程序。
4.3 限时等待
在多线程编程中,阻塞调用通常用于等待某个事件的发生。然而,在某些情况下,需要限制线程的等待时间,例如发送“我还存活”的信号给用户或其他进程,或允许用户取消等待操作。C++提供了两种指定超时的方式:基于时间段(duration)和基于时间点(time point)。多数等待函数支持这两种方式,并分别以_for
和_until
作为后缀。
4.3.1 时钟
时钟是C++标准库中的时间信息源,提供以下四种信息:
- 当前时间:通过静态成员函数
now()
获取。 - 时间类型:特定的时间点可以通过
time_point
来表示。 - 时钟节拍:定义了时钟每秒的节拍数。
- 稳定时钟:当时钟节拍均匀分布且不可修改时称为稳定时钟。
C++标准库提供的时钟包括:
std::chrono::system_clock
:系统时间,“实际时间”。std::chrono::steady_clock
:稳定时钟,适合计算超时。std::chrono::high_resolution_clock
:具有最高精度的时钟。
4.3.2 时间段
时间段可以通过std::chrono::duration<>
模板处理。它接受两个模板参数:表示单位时间的类型(如int
、double
),以及每个单元所用秒数(如std::ratio<60, 1>
表示分钟)。标准库预定义了一些时间段类型,如nanoseconds
、microseconds
、milliseconds
等。
示例代码:
using namespace std::chrono_literals;
auto one_day = 24h;
auto half_an_hour = 30min;
auto max_time_between_messages = 30ms;
4.3.3 时间点
时间点由std::chrono::time_point<>
表示,第一个参数指定使用的时钟,第二个参数表示时间单位。时间点可以进行加减运算,从而获得新的时间点。对于计时非常有用,例如:
auto start = std::chrono::high_resolution_clock::now();
do_something();
auto stop = std::chrono::high_resolution_clock::now();
std::cout << "do_something() took "<< std::chrono::duration<double, std::chrono::seconds>(stop - start).count()<< " seconds" << std::endl;
4.3.4 使用超时
超时机制可以应用于多种场景,包括线程休眠、条件变量等待、互斥锁获取和future等待等。以下是几种常见用法:
-
线程休眠:使用
std::this_thread::sleep_for()
和std::this_thread::sleep_until()
。示例代码:
std::this_thread::sleep_for(std::chrono::milliseconds(500)); std::this_thread::sleep_until(std::chrono::system_clock::now() + std::chrono::hours(1));
-
条件变量等待:使用
wait_for()
和wait_until()
。示例代码:
auto const timeout = std::chrono::steady_clock::now() + std::chrono::milliseconds(500); std::unique_lock<std::mutex> lk(m); if (cv.wait_until(lk, timeout) == std::cv_status::timeout) {// 处理超时逻辑 }
-
互斥锁获取:使用
try_lock_for()
和try_lock_until()
。示例代码:
std::timed_mutex mtx; if (mtx.try_lock_for(std::chrono::milliseconds(500))) {// 成功获取锁mtx.unlock(); }
-
Future等待:使用
wait_for()
和wait_until()
。示例代码:
std::future<int> f = std::async(some_task); if (f.wait_for(std::chrono::milliseconds(35)) == std::future_status::ready) {do_something_with(f.get()); }
通过上述机制,C++标准库提供了丰富的工具来实现限时等待,使得开发者能够更灵活地控制并发程序的行为。这些功能不仅提高了程序的响应性和用户体验,还增强了系统的可靠性和效率。