上篇文章:Linux操作系统7- 线程同步与互斥4(基于POSIX条件变量的生产者消费者模型)-CSDN博客
本篇代码仓库:
支持处理简单任务的生产者消费者模型代码
生产者-消费者-保存者三线程两队列模型
多生产多消费的生产者消费者模型
进一步使用生产者消费者模型不需要修改Queue.hpp
#pragma once
#include <iostream>
#include <queue> //使用queue作为阻塞队列#include <unistd.h>
#include <pthread.h>
const int gnum = 10; // 阻塞队列的最大容量template <class T>
class BlockQueue
{
public:BlockQueue(int maxnum = gnum) : _maxnum(maxnum){pthread_mutex_init(&_mtx, nullptr);pthread_cond_init(&_pcond, nullptr);pthread_cond_init(&_ccond, nullptr);}~BlockQueue(){pthread_mutex_destroy(&_mtx);pthread_cond_destroy(&_pcond);pthread_cond_destroy(&_ccond);}// 生产者生产数据void push(const T &in){// 加锁保护pthread_mutex_lock(&_mtx);// 判断是否满足生产while (is_full()){// 数据满了生产者等待消费者消费pthread_cond_wait(&_pcond, &_mtx);}// 生产数据_queue.push(in);// 队列不为空,通知消费者消费pthread_cond_signal(&_ccond);// 解锁pthread_mutex_unlock(&_mtx);}// 消费这消费数据,通过输入输出型参数获取数据void pop(T *out){// 加锁保护pthread_mutex_lock(&_mtx);// 判断是否可以消费数据while (is_empty()){// 等待生产者生产数据pthread_cond_wait(&_ccond, &_mtx);}// 开始消费数据*out = _queue.front();_queue.pop();// 队列不满,通知生产者生产数据pthread_cond_signal(&_pcond);// 解锁pthread_mutex_unlock(&_mtx);}private:bool is_empty(){return _queue.empty();}bool is_full(){return _queue.size() == _maxnum;}private:std::queue<T> _queue; // 阻塞队列size_t _maxnum; // 队列最大容量pthread_mutex_t _mtx; // 互斥锁pthread_cond_t _pcond; // 生产者条件变量,满了需要休眠pthread_cond_t _ccond; // 消费者条件变量,无数据要休眠
};
目录
一. 生存消费模型处理简单任务
1.1 需求分析
1.2 Task.hpp
1.3 Main.cpp
1.4 测试
二. 生产者-消费者-保存者-三线程两队列
2.1 Task.hpp
2.2 Main.cpp
2.3 测试
三. 多生产多消费模型
四. 多生产多消费的意义
一. 生存消费模型处理简单任务
1.1 需求分析
上篇文章中,我们使用了生存者消费者模型实现了生产者产生随机数据放入BlockQueue,而消费者从BlockQueue中拿取数据输出。
现在想让生产者消费者模型处理一个简单的任务。比如:生产者产生两个随机数据和运算符,而消费者拿取数据获取计算结果并输出。
此时就需要一个Task.hpp来构造任务。
1.2 Task.hpp
创建一个结构体,内部包含计算数据,计算操作符,以及()重载
代码框架如下:
#pragma once
#include <iostream>
#include <functional>class CalTask
{// 使用c++11 using 和 包装器using func_t = std::function<int(int, int, char)>;// 当然也可以使用函数指针// typedef int (*func_t)(int, int, char);public:
private:int _x;int _y;char _op;func_t _callback; // 通过回调函数调用计算
};
具体实现如下:需要增加构造函数,析构函数,重载(),回调函数等。
#pragma once
#include <iostream>
#include <functional>class CalTask
{// 使用c++11 using 和 包装器using func_t = std::function<int(int, int, char)>;// 当然也可以使用函数指针// typedef int (*func_t)(int, int, char);public:CalTask() {}CalTask(int x, int y, char op, func_t func): _x(x), _y(y), _op(op), _callback(func) {}~CalTask() {}std::string operator()(){int result = _callback(_x, _y, _op);char buffer[1024];snprintf(buffer, sizeof(buffer) - 1, "%d %c %d = %d", _x, _op, _y, result);return buffer;}private:int _x;int _y;char _op;func_t _callback; // 通过回调函数调用计算
};// 计算函数
int my_math(int x, int y, char op)
{int result = 0;switch (op){case '+':result = x + y;break;case '-':result = x - y;break;case '*':result = x * y;break;case '/':{if (y == 0){std::cerr << "div zero" << std::endl;return -1;}else{result = x / y;}break;}case '%':if (y == 0){std::cerr << "moved zero" << std::endl;return -1;}else{result = x % y;}break;default:break;}return result;
}
1.3 Main.cpp
根据分析,生产者生产数据然后交给消费者计算并输出。
#include <iostream>
#include <string>#include <unistd.h>
#include <pthread.h>
#include "BlockQueue.hpp"
#include "Task.hpp"const std::string OP = "+-*/%";
void *producer(void *args)
{// 获取交易场所 - 阻塞队列BlockQueue<CalTask> *bq = static_cast<BlockQueue<CalTask> *>(args);while (true){int x = rand() % 100;int y = rand() % 100;char op = OP[rand() % OP.size()];// 打印日志printf("生产者生产的数据:%d %c %d 并交给消费者计算\n", x, op, y);CalTask t(x, y, op, my_math);bq->push(t);usleep(500000);}return nullptr;
}void *consumer(void *args)
{// 获取交易场所 - 阻塞队列BlockQueue<CalTask> *bq = static_cast<BlockQueue<CalTask> *>(args);while (true){CalTask t;bq->pop(&t);std::cout << "消费者获取数据并计算:" << t() << std::endl;}return nullptr;
}int main()
{srand(time(0) ^ getpid() ^ rand());// 定义生产消费线程与阻塞队列pthread_t p;pthread_t c;BlockQueue<CalTask> *bq = new BlockQueue<CalTask>();pthread_create(&p, nullptr, producer, (void *)bq);pthread_create(&c, nullptr, consumer, (void *)bq);pthread_join(p, nullptr);pthread_join(c, nullptr);delete bq;bq = nullptr;return 0;
}
1.4 测试
测试结果如下:
可以看到,通过生产者消费者模型。消费者成功的获取了来自生产传递的数据,并且计算这个结果。
如果有需要的话,可以将更复杂的任务交给生产者消费者模型。比如网络/本地的IO操作,时间复杂度较高的运算。
二. 生产者-消费者-保存者-三线程两队列
能否实现一个生产者生产数据交给消费者处理,消费者处理完成之后再交给保存者将结果保存在文件中?
生产线程生产任务传递给任务队列
消费线程从任务队列读取任务,消费完成任务之后将结果传递给保存队列
保存线程从保存队列读取数据并写入文件中
同时需要保证生产者消费者之间的同步,消费者与保存者之间的同步。
2.1 Task.hpp
需要给保存者一个任务用于保存,以解耦主函数和任务处理。
#pragma once
#include <iostream>
#include <functional>class CalTask
{// 使用c++11 using 和 包装器using func_t = std::function<int(int, int, char)>;// 当然也可以使用函数指针// typedef int (*func_t)(int, int, char);public:CalTask() {}CalTask(int x, int y, char op, func_t func): _x(x), _y(y), _op(op), _callback(func) {}~CalTask() {}std::string operator()(){int result = _callback(_x, _y, _op);char buffer[1024];snprintf(buffer, sizeof(buffer) - 1, "%d %c %d = %d", _x, _op, _y, result);return buffer;}private:int _x;int _y;char _op;func_t _callback; // 通过回调函数调用计算
};class SaveTask
{typedef void (*func_t)(const std::string &);public:SaveTask(std::string _result = "", func_t func = Save): _callback(func) {}void operator()(){_callback(_result);}private:std::string _result;func_t _callback;
};// 计算函数
int my_math(int x, int y, char op)
{int result = 0;switch (op){case '+':result = x + y;break;case '-':result = x - y;break;case '*':result = x * y;break;case '/':{if (y == 0){std::cerr << "div zero" << std::endl;return -1;}else{result = x / y;}break;}case '%':if (y == 0){std::cerr << "moved zero" << std::endl;return -1;}else{result = x % y;}break;default:break;}return result;
}void Save(const std::string &result)
{const std::string task_pwd = "./result.txt";FILE *fp = fopen(task_pwd.data(), "a"); // 需要追加写入if (fp == nullptr){std::cerr << "fopen error" << std::endl;}fputs(result.c_str(), fp);fclose(fp);fp = nullptr;
}
2.2 Main.cpp
为了能够让消费者同时访问两个队列,需要一个结构体能够存储两个队列。
在BlockQueue.hpp中新增下面的代码即可
template <class C, class S>
struct BlockQueues
{BlockQueue<C> *_c_bq;BlockQueue<S> *_s_bq;
};
主函数中需要新增一个保存者函数,用于保存者线程拿取数据并保存于文件中。同时消费者也需要新增一段将数据传递给保存者的逻辑 。
#include <iostream>
#include <memory>
#include <string>#include <unistd.h>
#include <pthread.h>
#include "BlockQueue.hpp"
#include "Task.hpp"const std::string OP = "+-*/%";
void *producer(void *args)
{// 获取交易场所 - 阻塞队列BlockQueue<CalTask> *cal_bq = static_cast<BlockQueues<CalTask, SaveTask> *>(args)->_c_bq;while (true){int x = rand() % 100;int y = rand() % 100;char op = OP[rand() % OP.size()];// 打印日志printf("生产者生产的数据:%d %c %d 并交给消费者计算\n", x, op, y);CalTask ct(x, y, op, my_math);cal_bq->push(ct);sleep(1);}return nullptr;
}void *consumer(void *args)
{// 获取交易场所 - 生产消费阻塞队列,消费保存阻塞队列BlockQueue<CalTask> *cal_bq = static_cast<BlockQueues<CalTask, SaveTask> *>(args)->_c_bq;BlockQueue<SaveTask> *save_bq = static_cast<BlockQueues<CalTask, SaveTask> *>(args)->_s_bq;while (true){// 获取任务计算CalTask ct;cal_bq->pop(&ct);std::string result = ct();std::cout << "消费者获取数据并计算:" << result << std::endl;// 将任务传递给保存者SaveTask st(result, Save);std::cout << "消费者获取传递计算结果给保存者:" << result << std::endl;save_bq->push(st);}return nullptr;
}void *saver(void *args)
{// 获取交易场所 - 消费保存阻塞队列BlockQueue<SaveTask> *save_bq = static_cast<BlockQueues<CalTask, SaveTask> *>(args)->_s_bq;while (true){SaveTask st;save_bq->pop(&st);st();std::cout << "保存者成功读取数据并保存在文件中:" << std::endl;}return nullptr;
}int main()
{srand(time(0) ^ getpid() ^ rand());// 定义消费生产保存线程,与两个阻塞队列BlockQueues<CalTask, SaveTask> *bqs = new BlockQueues<CalTask, SaveTask>();bqs->_c_bq = new BlockQueue<CalTask>;bqs->_s_bq = new BlockQueue<SaveTask>;pthread_t p;pthread_t c;pthread_t s;pthread_create(&p, nullptr, producer, (void *)bqs);pthread_create(&c, nullptr, consumer, (void *)bqs);pthread_create(&s, nullptr, saver, (void *)bqs);pthread_join(p, nullptr);pthread_join(c, nullptr);pthread_join(s, nullptr);delete bqs->_c_bq;delete bqs->_s_bq;return 0;
}
2.3 测试
三. 多生产多消费模型
只需要更改主函数即可实现多生产多消费模型
int main()
{srand((unsigned int)time(0) ^ getpid());// 建立任务队列和保存队列BlockQueues<CalTask, SaveTask> *bqs = new BlockQueues<CalTask, SaveTask>;bqs->_c_bq = new BlockQueue<CalTask>;bqs->_s_bq = new BlockQueue<SaveTask>;pthread_t c[3], p[2], s;pthread_create(c, nullptr, consumer, (void *)bqs);pthread_create(c + 1, nullptr, consumer, (void *)bqs);pthread_create(c + 2, nullptr, consumer, (void *)bqs);pthread_create(p, nullptr, producer, (void *)bqs);pthread_create(p + 1, nullptr, producer, (void *)bqs);pthread_create(&s, nullptr, saver, (void *)bqs);pthread_join(c[0], nullptr);pthread_join(c[1], nullptr);pthread_join(c[2], nullptr);pthread_join(p[0], nullptr);pthread_join(p[1], nullptr);pthread_join(s, nullptr);delete bqs->_c_bq;delete bqs->_s_bq;return 0;
}
运行结果如下:
四. 多生产多消费的意义
即便有多个生产者,多个消费者。但是锁只有一个,所以同一时刻只能有一个线程在临界区中执行代码。那么多生产多消费有什么意义?
1 当生产者A将数据传递给队列后,产生新的数据非常耗时间,此时其他生产者可以获取锁并投放数据,而生产者A可以同时产生数据
2 当消费者拿到一个数据进行消费的时候,其他消费者仍可以从队列中拿取新数据进行消费。而不需要等待消费者A消费完了其他消费者才去消费
3 即可以让生产者线程生产之前,消费者线程消费之后。让线程并发执行(而不是提高存取,拿取数据的效率)