C++笔记之不同buffer数量下的生产者-消费者机制
文章目录
- C++笔记之不同buffer数量下的生产者-消费者机制
- 0.在不同的缓冲区数量下,生产者-消费者机制的实现方式和行为的区别
- 1.最简单的生产者-消费者实现:抄自 https://mp.weixin.qq.com/s/G1lHNcbYU1lUlfugXnNhZg
- 2.1个生产者,1个消费者,操作1个buffer,使用环形队列
- 3.3个生产者,3个消费者,操作1个buffer
- 4.3个生产者,3个消费者,操作2个buffer,双buffer中存在生产者和消费者交替操作同一个buffer逻辑
- 5.1个生产者,1个消费者,操作3个buffer
- 6.n个生产者,n个消费者,操作n个buffer
- 7.在生产者-消费者中使用信号量
0.在不同的缓冲区数量下,生产者-消费者机制的实现方式和行为的区别
-
单个缓冲区:
- 在单个缓冲区的情况下,生产者和消费者共享一个有限大小的缓冲区,生产者将数据放入缓冲区,而消费者从中取出数据。
- 如果缓冲区已满,生产者必须等待,直到有空间可用。如果缓冲区为空,消费者必须等待,直到有数据可用。
- 这种情况下,通常需要使用互斥锁来保护共享缓冲区,以确保多个线程或进程不会同时访问缓冲区,从而避免竞态条件。
-
多个缓冲区:
- 在多个缓冲区的情况下,通常有多个生产者和多个消费者,每个生产者-消费者对共享一个缓冲区。这些缓冲区可以是独立的,也可以按照某种策略连接在一起。
- 这种情况下,不同的生产者可以同时往不同的缓冲区放置数据,不同的消费者可以同时从不同的缓冲区取出数据,从而提高了并发性能。
- 但是,管理多个缓冲区可能需要更复杂的同步和管理机制,以确保生产者和消费者之间的正确协作。
总的来说,单个缓冲区和多个缓冲区的主要区别在于资源的共享方式和并发性能。单个缓冲区通常更简单,但并发性能可能较低,因为所有的生产者和消费者都要竞争同一个缓冲区。多个缓冲区可以提高并发性能,但需要更复杂的管理和同步机制来维护多个缓冲区的状态。选择哪种方式取决于具体的应用需求和性能要求。
下面将比较这三种情况:一个缓冲区、两个缓冲区和三个缓冲区下的生产者-消费者机制的实现。
-
一个缓冲区:
- 在这种情况下,你只需要一个共享的缓冲区,可以使用一个队列(如std::queue)来表示。
- 生产者将数据放入队列的尾部,消费者从队列的头部取出数据。
- 你需要使用互斥锁来保护队列,以确保生产者和消费者不会同时访问它。
- 生产者在队列满时会阻塞,消费者在队列为空时会阻塞,可以使用条件变量来实现这种等待。
-
两个缓冲区:
- 在这种情况下,你可以使用两个独立的缓冲区,每个缓冲区都有自己的队列。
- 生产者可以交替将数据放入不同的队列,消费者也可以交替从不同的队列取出数据。
- 这种方式可以提高并发性能,因为生产者和消费者可以并行操作不同的缓冲区。
- 你需要使用互斥锁和条件变量来保护每个队列。
-
三个缓冲区:
- 在这种情况下,你可以类似地使用三个独立的缓冲区,每个缓冲区有自己的队列。
- 生产者和消费者之间的协作方式和两个缓冲区的情况类似,只是有更多的缓冲区可供使用。
- 这可以进一步提高并发性能,但需要更复杂的管理和同步机制。
1.最简单的生产者-消费者实现:抄自 https://mp.weixin.qq.com/s/G1lHNcbYU1lUlfugXnNhZg
在 C++ 中可以使用 std::condition_variable 来实现生产者和消费者模式:生产者在缓冲区未满时不断添加数据,并唤醒消费者进行数据读取;消费者在缓存区为空时阻塞等待生产者的唤醒,并在读取数据后唤醒等待的生产者可以继续添加数据。
运行
代码
#include "condition_variable"
#include "iostream"
#include "queue"
#include "thread"
#include <mutex>using namespace std;class ProducerAndConsumerDemo {public:void producerNumber(); // 生产数据void consumerNumber(); // 消费数据private:const int dataSize = 1000; // 总数据量queue<int> buffer; // 数据缓存区const int bufferSize = 10; // 缓存区大小condition_variable bufferNotEmpty; // 信号量--缓存区有数据了condition_variable bufferNotFull; // 信号量--缓存区不满了mutex m_mutex; // 互斥量
};void ProducerAndConsumerDemo::producerNumber() {for (int i = 0; i < dataSize; ++i) {{unique_lock<mutex> locker(m_mutex);bufferNotFull.wait(locker, [&]() { return buffer.size() < bufferSize; }); // 缓存区满了则阻塞buffer.push(i);cout << "生产者---生产了数字:" << i << ",当前 bufferSize:" << buffer.size() << endl;} // 解锁互斥量bufferNotEmpty.notify_one();this_thread::sleep_for(chrono::milliseconds(1000)); // 模拟生产耗时}
}void ProducerAndConsumerDemo::consumerNumber() {while (true) {{unique_lock<mutex> locker(m_mutex);bufferNotEmpty.wait(locker, [&]() { return buffer.size() > 0; }); // 缓冲区为空则阻塞int i = buffer.front();buffer.pop();cout << "消费者---消费了数字:" << i << ",当前 bufferSize:" << buffer.size() << endl;} // 解锁互斥量bufferNotFull.notify_one();this_thread::sleep_for(chrono::milliseconds(2000)); // 模拟消费耗时}
}int main() {ProducerAndConsumerDemo pcDemo;thread consumerThread(&ProducerAndConsumerDemo::producerNumber, &pcDemo);thread producerThread(&ProducerAndConsumerDemo::consumerNumber, &pcDemo);producerThread.join();consumerThread.join();system("pause");return 0;
}
2.1个生产者,1个消费者,操作1个buffer,使用环形队列
运行
代码
#include <condition_variable>
#include <iostream>
#include <mutex>
#include <thread>
#include <vector>const int bufferSize = 5; // 缓冲区大小class CircularQueue {public:CircularQueue() : buffer(bufferSize), front(0), rear(0), count(0) {}// 生产者线程使用的enqueue函数,将数据添加到队列void enqueue(int item) {std::unique_lock<std::mutex> lock(mutex);// 检查队列是否已满if (count < bufferSize) {buffer[rear] = item;rear = (rear + 1) % bufferSize;count++;std::cout << "Produced: " << item << std::endl;// 通知等待中的消费者线程有新数据可用cv.notify_all();}}// 消费者线程使用的dequeue函数,从队列中取出数据int dequeue() {std::unique_lock<std::mutex> lock(mutex);// 如果队列为空,等待生产者生产数据while (count == 0) {cv.wait(lock);}// 从队列中取出数据int item = buffer[front];front = (front + 1) % bufferSize;count--;std::cout << "Consumed: " << item << std::endl;return item;}private:std::vector<int> buffer; // 缓冲区,用于存储数据int front; // 队列前部索引int rear; // 队列后部索引int count; // 当前队列中的元素数量std::mutex mutex; // 用于线程同步的互斥锁std::condition_variable cv; // 条件变量,用于线程等待和通知
};// 生产者线程函数,负责向队列中添加数据
void producer(CircularQueue &queue) {for (int i = 1; i <= 10; ++i) {queue.enqueue(i);// 模拟生产耗时std::this_thread::sleep_for(std::chrono::milliseconds(200));}
}// 消费者线程函数,负责从队列中取出数据
void consumer(CircularQueue &queue) {for (int i = 0; i < 10; ++i) {int item = queue.dequeue();// 模拟消费耗时std::this_thread::sleep_for(std::chrono::milliseconds(300));}
}int main() {CircularQueue queue;// 创建生产者线程和消费者线程std::thread producerThread(producer, std::ref(queue));std::thread consumerThread(consumer, std::ref(queue));// 等待线程结束producerThread.join();consumerThread.join();return 0;
}
3.3个生产者,3个消费者,操作1个buffer
这个示例中,有3个生产者线程和3个消费者线程,它们并行地生产和消费元素,使用互斥锁保护共享的队列,并使用条件变量来通知生产者和消费者缓冲区的状态。生产者线程生成随机整数并将其放入缓冲区,而消费者线程从缓冲区中取出元素并将其打印出来。每个生产者和消费者线程各执行5次操作。
运行
代码
#include <iostream>
#include <thread>
#include <vector>
#include <queue>
#include <mutex>
#include <condition_variable>const int buffer_size = 3; // 缓冲区大小std::queue<int> buffer; // 环形队列
std::mutex mtx; // 互斥锁,用于保护缓冲区的访问
std::condition_variable not_full; // 条件变量,表示缓冲区不满
std::condition_variable not_empty; // 条件变量,表示缓冲区不空void producer(int id) {for (int i = 0; i < 5; ++i) {std::unique_lock<std::mutex> lock(mtx);while (buffer.size() >= buffer_size) {not_full.wait(lock); // 如果缓冲区已满,等待直到不满}int item = rand() % 100;buffer.push(item);std::cout << "Producer " << id << " produced: " << item << std::endl;not_empty.notify_all(); // 通知消费者缓冲区不空lock.unlock();std::this_thread::sleep_for(std::chrono::milliseconds(100)); // 模拟生产过程}
}void consumer(int id) {for (int i = 0; i < 5; ++i) {std::unique_lock<std::mutex> lock(mtx);while (buffer.empty()) {not_empty.wait(lock); // 如果缓冲区为空,等待直到不空}int item = buffer.front();buffer.pop();std::cout << "Consumer " << id << " consumed: " << item << std::endl;not_full.notify_all(); // 通知生产者缓冲区不满lock.unlock();std::this_thread::sleep_for(std::chrono::milliseconds(200)); // 模拟消费过程}
}int main() {std::vector<std::thread> producers;std::vector<std::thread> consumers;for (int i = 0; i < 3; ++i) {producers.emplace_back(producer, i);consumers.emplace_back(consumer, i);}for (auto& producer_thread : producers) {producer_thread.join();}for (auto& consumer_thread : consumers) {consumer_thread.join();}return 0;
}
4.3个生产者,3个消费者,操作2个buffer,双buffer中存在生产者和消费者交替操作同一个buffer逻辑
运行
代码
#include <condition_variable>
#include <iostream>
#include <mutex>
#include <thread>
#include <vector>const int BUFFER_SIZE = 5; // 缓冲区大小std::mutex mtx; // 互斥锁,用于保护共享资源
std::condition_variable producer_cv, consumer_cv; // 条件变量,用于线程同步
std::vector<int> buffer1, buffer2; // 两个缓冲区,用于生产者和消费者之间的数据交换
int current_buffer = 1; // 标志,表示当前使用哪个缓冲区// 生产者函数
void producer() {for (int i = 0; i < 10; ++i) {std::this_thread::sleep_for(std::chrono::milliseconds(100)); // 模拟生产时间std::unique_lock<std::mutex> lock(mtx); // 获取互斥锁,保护共享资源if ((current_buffer == 1 && buffer1.size() >= BUFFER_SIZE) ||(current_buffer == 2 && buffer2.size() >= BUFFER_SIZE)) {// 缓冲区已满,生产者等待std::cout << "Buffer " << current_buffer << " 已满,生产者等待...\n";producer_cv.wait(lock, [] { return (current_buffer == 1 && buffer1.size() < BUFFER_SIZE) ||(current_buffer == 2 && buffer2.size() < BUFFER_SIZE); });}int item = i;if (current_buffer == 1) {buffer1.push_back(item);std::cout << "生产到缓冲区 1: " << item << std::endl;} else {buffer2.push_back(item);std::cout << "生产到缓冲区 2: " << item << std::endl;}lock.unlock(); // 释放互斥锁consumer_cv.notify_one(); // 通知一个等待的消费者线程// 切换使用的缓冲区current_buffer = (current_buffer == 1) ? 2 : 1;}
}// 消费者函数
void consumer() {for (int i = 0; i < 10; ++i) {std::this_thread::sleep_for(std::chrono::milliseconds(200)); // 模拟消费时间std::unique_lock<std::mutex> lock(mtx); // 获取互斥锁,保护共享资源if ((current_buffer == 1 && buffer1.empty()) ||(current_buffer == 2 && buffer2.empty())) {// 缓冲区为空,消费者等待std::cout << "消费者在用Buffer " << current_buffer << ",Buffer " << current_buffer << " 为空,消费者等待...\n";consumer_cv.wait(lock, [] { return (current_buffer == 1 && !buffer1.empty()) ||(current_buffer == 2 && !buffer2.empty()); });}int item;if (current_buffer == 1) {item = buffer1.front();buffer1.erase(buffer1.begin());std::cout << "从缓冲区 1 消费: " << item << std::endl;} else {item = buffer2.front();buffer2.erase(buffer2.begin());std::cout << "从缓冲区 2 消费: " << item << std::endl;}lock.unlock(); // 释放互斥锁producer_cv.notify_one(); // 通知一个等待的生产者线程// 切换使用的缓冲区current_buffer = (current_buffer == 1) ? 2 : 1;}
}int main() {std::thread producer_thread(producer); // 创建生产者线程std::thread consumer_thread(consumer); // 创建消费者线程producer_thread.join(); // 等待生产者线程结束consumer_thread.join(); // 等待消费者线程结束return 0;
}
5.1个生产者,1个消费者,操作3个buffer
运行
代码
#include <condition_variable>
#include <iostream>
#include <mutex>
#include <queue>
#include <thread>
#include <vector>const int BUFFER_COUNT = 3;std::mutex mtx;
std::condition_variable cv_producer, cv_consumer;std::vector<std::queue<int>> buffers(BUFFER_COUNT);int current_buffer = 0;void producer() {for (int i = 1; i <= 10; ++i) {std::unique_lock<std::mutex> lock(mtx);// 检查当前缓冲区是否已满while (!buffers[current_buffer].empty()) {cv_producer.wait(lock);}buffers[current_buffer].push(i);std::cout << "生产者生产了: " << i << " 到缓冲区: " << current_buffer << std::endl;current_buffer = (current_buffer + 1) % BUFFER_COUNT;// 唤醒消费者cv_consumer.notify_all();}
}void consumer() {for (int i = 1; i <= 10; ++i) {std::unique_lock<std::mutex> lock(mtx);// 检查当前缓冲区是否为空while (buffers[current_buffer].empty()) {cv_consumer.wait(lock);}int item = buffers[current_buffer].front();buffers[current_buffer].pop();std::cout << "消费者消费了: " << item << " 从缓冲区: " << current_buffer << std::endl;current_buffer = (current_buffer + 1) % BUFFER_COUNT;// 唤醒生产者cv_producer.notify_all();}
}int main() {std::thread producer_thread(producer);std::thread consumer_thread(consumer);producer_thread.join();consumer_thread.join();return 0;
}
6.n个生产者,n个消费者,操作n个buffer
运行:偶尔会core dump
代码
#include <condition_variable>
#include <iostream>
#include <mutex>
#include <thread>
#include <vector>const int num_buffers = 3; // 缓冲区的数量
const int buffer_size = 5; // 每个缓冲区的大小
const int producers_and_consumers_num = 3; // 生产者和消费者的数量std::vector<int> buffers[num_buffers]; // 多个缓冲区,每个缓冲区是一个整数向量
std::mutex buffer_mutexes[num_buffers]; // 与每个缓冲区相关联的互斥锁
std::condition_variable buffer_cv[num_buffers]; // 与每个缓冲区相关联的条件变量// 生产者线程函数
void producer(int id) {for (int i = 0; i < 10; ++i) {// 查找具有最多可用空间的缓冲区int max_space = -1;int selected_buffer = -1;for (int j = 0; j < num_buffers; ++j) {std::unique_lock<std::mutex> lock(buffer_mutexes[j]);int space = buffer_size - buffers[j].size();if (space > max_space) {max_space = space;selected_buffer = j;}}// 等待直到有足够的空间可以生产while (buffers[selected_buffer].size() >= buffer_size) {std::unique_lock<std::mutex> lock(buffer_mutexes[selected_buffer]);buffer_cv[selected_buffer].wait(lock);}// 生产数据并将其放入缓冲区buffers[selected_buffer].push_back(id * 100 + i);std::cout << "生产者 " << id << " 生产了 " << id * 100 + i << " 放入缓冲区 " << selected_buffer << std::endl;// 通知等待的消费者线程buffer_cv[selected_buffer].notify_one();}
}// 消费者线程函数
void consumer(int id) {for (int i = 0; i < 10; ++i) {// 查找具有最少可用空间的缓冲区,这表示有数据等待消费int min_space = buffer_size + 1;int selected_buffer = -1;for (int j = 0; j < num_buffers; ++j) {std::unique_lock<std::mutex> lock(buffer_mutexes[j]);int space = buffer_size - buffers[j].size();if (space < min_space) {min_space = space;selected_buffer = j;}}// 等待直到有数据可以消费while (buffers[selected_buffer].empty()) {std::unique_lock<std::mutex> lock(buffer_mutexes[selected_buffer]);buffer_cv[selected_buffer].wait(lock);}// 检查缓冲区是否为空,然后再消费数据if (!buffers[selected_buffer].empty()) {int data = buffers[selected_buffer].back();buffers[selected_buffer].pop_back();std::cout << "消费者 " << id << " 消费了 " << data << " 从缓冲区 " << selected_buffer << std::endl;}// 通知等待的生产者线程buffer_cv[selected_buffer].notify_one();}
}int main() {std::thread producers[producers_and_consumers_num];std::thread consumers[producers_and_consumers_num];// 创建生产者和消费者线程for (int i = 0; i < producers_and_consumers_num; ++i) {producers[i] = std::thread(producer, i);consumers[i] = std::thread(consumer, i);}// 等待所有线程完成for (int i = 0; i < producers_and_consumers_num; ++i) {producers[i].join();consumers[i].join();}return 0;
}
7.在生产者-消费者中使用信号量
这个示例模拟了一个生产者-消费者问题,其中多个生产者线程和消费者线程共享一个有界缓冲区,信号量用于控制对缓冲区的并发访问。
在此示例中,有三个生产者线程和三个消费者线程,它们共享一个有界缓冲区。Semaphore类用于控制缓冲区的空闲和满状态。生产者线程生成随机项目并将它们放入缓冲区,然后通知消费者线程。消费者线程从缓冲区中取出项目并通知生产者线程。信号量确保缓冲区在多线程环境中得到正确的访问和同步。
这个示例有助于理解信号量在多线程环境中的应用,尤其是在生产者-消费者问题中的作用。通过信号量,可以控制多个线程之间的并发访问,以避免数据竞态和确保正确的协调。
运行
代码
#include <condition_variable>
#include <iostream>
#include <mutex>
#include <queue>
#include <thread>
#include <vector>const int BUFFER_SIZE = 5;class Semaphore {public:Semaphore(int count = 0) : count_(count) {}void notify() {std::unique_lock<std::mutex> lock(mutex_);count_++;cv_.notify_one();}void wait() {std::unique_lock<std::mutex> lock(mutex_);while (count_ == 0) {cv_.wait(lock);}count_--;}private:std::mutex mutex_;std::condition_variable cv_;int count_;
};Semaphore empty(BUFFER_SIZE); // 空缓冲区的信号量
Semaphore full(0); // 满缓冲区的信号量
std::mutex bufferMutex; // 缓冲区互斥量
std::queue<int> buffer; // 共享缓冲区void producer(int id) {for (int i = 0; i < 10; ++i) {int item = rand() % 100; // 随机生成一个项目empty.wait(); // 等待空缓冲区bufferMutex.lock(); // 锁定缓冲区buffer.push(item); // 将项目放入缓冲区std::cout << "Producer " << id << " produced: " << item << std::endl;bufferMutex.unlock(); // 解锁缓冲区full.notify(); // 通知缓冲区已满std::this_thread::sleep_for(std::chrono::milliseconds(100));}
}void consumer(int id) {for (int i = 0; i < 10; ++i) {full.wait(); // 等待满缓冲区bufferMutex.lock(); // 锁定缓冲区int item = buffer.front();buffer.pop();std::cout << "Consumer " << id << " consumed: " << item << std::endl;bufferMutex.unlock(); // 解锁缓冲区empty.notify(); // 通知缓冲区已空std::this_thread::sleep_for(std::chrono::milliseconds(250));}
}int main() {std::vector<std::thread> producers;std::vector<std::thread> consumers;for (int i = 0; i < 3; ++i) {producers.emplace_back(producer, i);consumers.emplace_back(consumer, i);}for (auto &producerThread : producers) {producerThread.join();}for (auto &consumerThread : consumers) {consumerThread.join();}return 0;
}