【Linux】系统编程生产者消费者模型(C++)

目录

【1】生产消费模型

【1.1】为何要使用生产者消费者模型

【1.2】生产者消费者模型优点

【2】基于阻塞队列的生产消费者模型

【2.1】生产消费模型打印模型

【2.2】生产消费模型计算公式模型

【2.3】生产消费模型计算公式加保存任务模型

【2.3】生产消费模型多生产多消费


【1】生产消费模型

        生产消费模型的321原则(便于记忆)

【解释】

  • 3种关系:生产者和生产者(互斥)、消费者和消费者(互斥)、生产者和消费者(互斥|[保证共享资源的安全性]|同步)。

  • 2种角色:生产者线程、消费者线程。

  • 1种交易场所:一段特定结构的缓冲区。

【1.1】为何要使用生产者消费者模型

        生产者消费者模式就是通过一个容器来解决生产者和消费者的强耦合问题。生产者和消费者彼此之间不直接通讯,而通过阻塞队列来进行通讯,所以生产者生产完数据之后不用等待消费者处理,直接扔给阻塞队列,消费者不找生产者要数据,而是直接从阻塞队列里取,阻塞队列就相当于一个缓冲区,平衡了生产者和消费者的处理能力。这个阻塞队列就是用来给生产者和消费者解耦的。

【1.2】生产者消费者模型优点

  • 生产线程和消费线程进行解耦。

  • 支持并发。

  • 提高效率。

  • 支持生产和消费的一段时间的忙闲不均的问题。

【2】基于阻塞队列的生产消费者模型

在多线程编程中阻塞队列(Blocking Queue)是一种常用于实现生产者和消费者模型的数据结构。其与普通的队列区别在于,当队列为空时,从队列获取元素的操作将会被阻塞,直到队列中被放入了元素;当队列满时,往队列里存放元素的操作也会被阻塞,直到有元素被从队列中取出(以上的操作都是基于不同的线程来说的,线程在对阻塞队列进程操作时会被阻塞)

【2.1】生产消费模型打印模型

【makefile文件】

# 创建关联关系
cc=g++
standard=-std=c++11# 创建依赖关系
myBlockQueue:BlockQueue.cc $(cc) -o $@ $^ $(standard) -l pthread# 创建删除关系
.PHONY:clean
clean:rm -rf myBlockQueue

【BlockQueue.hpp文件】

#pragma once
#include <iostream>
#include <queue>
#include <ctime>
#include <cstdlib>
#include <sys/types.h>
#include <unistd.h>
#include <pthread.h>/* 阻塞队列生产者与消费者模型 */
const int g_maxCapacity = 5;
template <class T>
class BlockQueue
{
public:/* 构造函数 */BlockQueue(const int maxCapacity = g_maxCapacity): _maxCapacity(maxCapacity){// 初始化锁pthread_mutex_init(&_mutex, nullptr);// 初始化生产者信号量pthread_cond_init(&_pCond, nullptr);// 初始化消费者信号量pthread_cond_init(&_cCond, nullptr);}/* 析构函数 */~BlockQueue() {// 销毁锁pthread_mutex_destroy(&_mutex);// 销毁生产者信号量pthread_cond_destroy(&_pCond);// 销毁消费者信号量pthread_cond_destroy(&_cCond);}public:/* 新增任务 */void PushTask(const T &in){// 加锁pthread_mutex_lock(&_mutex);// 判断是否满// 细节二:充当条件判断的语法必须是while,不能是if  while(IsFull()) {// 如果容器满了,生产者就不能继续生产了!// 细节一:// pthread_cond_wait这个函数第二个参数,必须是我们正在使用的互斥锁!// pthread_cond_wait该函数调用的时候,会以原子性的方式,将锁释放,并将自己挂起!// pthread_cond_wait该函数在被唤醒返回的时候,会自动的重新获取你传入的锁!pthread_cond_wait(&_pCond, &_mutex);}// 程序走到这里,容器一定是没有满的!_q.push(in);// 细节3:pthread_cond_signal这个函数可以放在临界区内部,也可以放在外部!pthread_cond_signal(&_cCond);   // 通知消费者已经生产了!// 解锁pthread_mutex_unlock(&_mutex);}/* 执行任务 */void PopTask(T* out) {// 加锁pthread_mutex_lock(&_mutex);// 判断是否空// 细节二:与PushTask一致while(IsEmpty()) {   // 如果容器空了,消费者就不能继续消费了!// 细节一:与PushTask一致pthread_cond_wait(&_cCond, &_mutex);}// 程序走到这里,容器一定是没有空的!*out = _q.front();_q.pop();// 细节3:pthread_cond_signal这个函数可以放在临界区内部,也可以放在外部!pthread_cond_signal(&_pCond);   // 通知生产者已经消费了!// 解锁pthread_mutex_unlock(&_mutex);}private:/* 判断容器满 */bool IsFull() {return _q.size() == _maxCapacity;}/* 判断容器空 */bool IsEmpty() {return _q.empty();}private:std::queue<T> _q;       // 存储容器int _maxCapacity;       // 标识存储重启最大容量pthread_mutex_t _mutex; // 互斥锁pthread_cond_t _pCond;  // 生产者条件变量pthread_cond_t _cCond;  // 消费者条件变量
};

【BlockQueue.cc文件】

#include "BlockQueue.hpp"/* 定义生产者线程 */
void *Producer(void *args)
{BlockQueue<int>* bq = static_cast<BlockQueue<int>*>(args);// 生产者进行生产while(true) {int num = rand() % 10 + 1;bq->PushTask(num);std:: cout << "生产者在生产:" << num << std::endl;sleep(1);}
}/* 定义消费者线程 */
void *Consumer(void *args)
{BlockQueue<int>* bq = static_cast<BlockQueue<int>*>(args);// 消费者进行消费while(true) {int num = 0;bq->PopTask(&num);std:: cout << "消费者在消费:" << num << std::endl;sleep(2);}
}/* 入口函数 */
int main()
{// 随机数种子srand((unsigned int)time(nullptr) ^ getpid());// 共享资源BlockQueue<int> *bqs = new BlockQueue<int>();pthread_t tP;pthread_t tC;pthread_create(&tP, nullptr, Producer, (void *)bqs);pthread_create(&tC, nullptr, Consumer, (void *)bqs);pthread_join(tP, nullptr);pthread_join(tC, nullptr);return 0;
}

【2.2】生产消费模型计算公式模型

【makefile文件】

# 创建关联关系
cc=g++
standard=-std=c++11# 创建依赖关系
myBlockQueue:BlockQueue.cc$(cc) -o $@ $^ $(standard) -l pthread# 创建删除关系
.PHONY:clean
clean:rm -rf myBlockQueue

【BlockQueue.hpp文件】

#pragma once
#include <iostream>
#include <queue>
#include <ctime>
#include <cstdlib>
#include <sys/types.h>
#include <unistd.h>
#include <pthread.h>/* 阻塞队列生产者与消费者模型 */
const int g_maxCapacity = 5;
template <class T>
class BlockQueue
{
public:/* 构造函数 */BlockQueue(const int maxCapacity = g_maxCapacity): _maxCapacity(maxCapacity){// 初始化锁pthread_mutex_init(&_mutex, nullptr);// 初始化生产者信号量pthread_cond_init(&_pCond, nullptr);// 初始化消费者信号量pthread_cond_init(&_cCond, nullptr);}/* 析构函数 */~BlockQueue() {// 销毁锁pthread_mutex_destroy(&_mutex);// 销毁生产者信号量pthread_cond_destroy(&_pCond);// 销毁消费者信号量pthread_cond_destroy(&_cCond);}public:/* 新增任务 */void PushTask(const T &in){// 加锁pthread_mutex_lock(&_mutex);// 判断是否满// 细节二:充当条件判断的语法必须是while,不能是if  while(IsFull()) {// 如果容器满了,生产者就不能继续生产了!// 细节一:// pthread_cond_wait这个函数第二个参数,必须是我们正在使用的互斥锁!// pthread_cond_wait该函数调用的时候,会以原子性的方式,将锁释放,并将自己挂起!// pthread_cond_wait该函数在被唤醒返回的时候,会自动的重新获取你传入的锁!pthread_cond_wait(&_pCond, &_mutex);}// 程序走到这里,容器一定是没有满的!_q.push(in);// 细节3:pthread_cond_signal这个函数可以放在临界区内部,也可以放在外部!pthread_cond_signal(&_cCond);   // 通知消费者已经生产了!// 解锁pthread_mutex_unlock(&_mutex);}/* 执行任务 */void PopTask(T* out) {// 加锁pthread_mutex_lock(&_mutex);// 判断是否空// 细节二:与PushTask一致while(IsEmpty()) {   // 如果容器空了,消费者就不能继续消费了!// 细节一:与PushTask一致pthread_cond_wait(&_cCond, &_mutex);}// 程序走到这里,容器一定是没有空的!*out = _q.front();_q.pop();// 细节3:pthread_cond_signal这个函数可以放在临界区内部,也可以放在外部!pthread_cond_signal(&_pCond);   // 通知生产者已经消费了!// 解锁pthread_mutex_unlock(&_mutex);}private:/* 判断容器满 */bool IsFull() {return _q.size() == _maxCapacity;}/* 判断容器空 */bool IsEmpty() {return _q.empty();}private:std::queue<T> _q;       // 存储容器int _maxCapacity;       // 标识存储重启最大容量pthread_mutex_t _mutex; // 互斥锁pthread_cond_t _pCond;  // 生产者条件变量pthread_cond_t _cCond;  // 消费者条件变量
};

【Task.hpp文件】

#pragma once
#include <iostream>
#include <string>
#include <functional>/* 仿函数类 */
class Task
{
public:using func_t = std::function<int(int, int, char)>;public:/* 构造函数 */Task() {}/* 构造函数 */Task(int x, int y, char op, func_t func): _x(x), _y(y), _op(op), _callBalk(func){}public:/* 仿函数 */std::string operator()(){int result = _callBalk(_x, _y, _op);char buffer[64];snprintf(buffer, sizeof(buffer), "%d %c %d = %d\n", _x, _op, _y, result);return buffer;}public:/* 返回打印公式 */std::string ToTaskString(){char buffer[64];snprintf(buffer, sizeof(buffer), "%d %c %d = ?\n", _x, _op, _y);return buffer;}private:int _x;int _y;char _op;func_t _callBalk;
};/* 任务执行的种类 */
int MyCalculate(int x, int y, char op)
{int result = 0;switch (op){case '+':result = x + y;break;case '-':result = x - y;break;case '*':result = x * y;break;case '/':{if (y == 0){std::cerr << "div zero error!" << std::endl;result = -1;}else{result = x / y;}break;}case '%':{if (y == 0){std::cerr << "mod zero error!" << std::endl;result = -1;}else{result = x % y;}break;}default:break;}return result;
}

【BlockQueue.cc文件】

#include "BlockQueue.hpp"
#include "Task.hpp"
class Task;
int MyCalculate(int x, int y, char op);const std::string oper = "+-*/%";
/* 定义生产者线程 */
void *Producer(void *args)
{BlockQueue<Task>* calBq = static_cast<BlockQueue<Task>*>(args);// 生产者进行生产while(true) {int x = rand() % 10 + 1;int y = rand() % 10 + 1;int op = rand() % oper.size();Task t(x, y, oper[op], MyCalculate);calBq->PushTask(t);std:: cout << "生产任务:" << t.ToTaskString() << std::endl;sleep(2);}
}/* 定义消费者线程 */
void *Consumer(void *args)
{BlockQueue<Task>* calBq = static_cast<BlockQueue<Task>*>(args);// 消费者进行消费while(true) {Task t;calBq->PopTask(&t);std:: cout << "消费任务:" << t() << std::endl;sleep(1);}
}/* 入口函数 */
int main()
{// 随机数种子srand((unsigned int)time(nullptr) ^ getpid());// 共享资源BlockQueue<Task> *bqs = new BlockQueue<Task>();pthread_t tP;pthread_t tC;pthread_create(&tP, nullptr, Producer, (void *)bqs);pthread_create(&tC, nullptr, Consumer, (void *)bqs);pthread_join(tP, nullptr);pthread_join(tC, nullptr);return 0;
}

【2.3】生产消费模型计算公式加保存任务模型

【makefile文件】

# 创建关联关系
cc=g++
standard=-std=c++11# 创建依赖关系
myBlockQueue:BlockQueue.cc$(cc) -o $@ $^ $(standard) -l pthread# 创建删除关系
.PHONY:clean
clean:rm -rf myBlockQueue

【BlockQueue.hpp文件】

#pragma once
#include <iostream>
#include <queue>
#include <ctime>
#include <cstdlib>
#include <sys/types.h>
#include <unistd.h>
#include <pthread.h>/* 阻塞队列生产者与消费者模型 */
const int g_maxCapacity = 500;
template <class T>
class BlockQueue
{
public:/* 构造函数 */BlockQueue(const int maxCapacity = g_maxCapacity): _maxCapacity(maxCapacity){// 初始化锁pthread_mutex_init(&_mutex, nullptr);// 初始化生产者信号量pthread_cond_init(&_pCond, nullptr);// 初始化消费者信号量pthread_cond_init(&_cCond, nullptr);}/* 析构函数 */~BlockQueue() {// 销毁锁pthread_mutex_destroy(&_mutex);// 销毁生产者信号量pthread_cond_destroy(&_pCond);// 销毁消费者信号量pthread_cond_destroy(&_cCond);}public:/* 新增任务 */void PushTask(const T &in){// 加锁pthread_mutex_lock(&_mutex);// 判断是否满// 细节二:充当条件判断的语法必须是while,不能是if  while(IsFull()) {// 如果容器满了,生产者就不能继续生产了!// 细节一:// pthread_cond_wait这个函数第二个参数,必须是我们正在使用的互斥锁!// pthread_cond_wait该函数调用的时候,会以原子性的方式,将锁释放,并将自己挂起!// pthread_cond_wait该函数在被唤醒返回的时候,会自动的重新获取你传入的锁!pthread_cond_wait(&_pCond, &_mutex);}// 程序走到这里,容器一定是没有满的!_q.push(in);// 细节3:pthread_cond_signal这个函数可以放在临界区内部,也可以放在外部!pthread_cond_signal(&_cCond);   // 通知消费者已经生产了!// 解锁pthread_mutex_unlock(&_mutex);}/* 执行任务 */void PopTask(T* out) {// 加锁pthread_mutex_lock(&_mutex);// 判断是否空// 细节二:与PushTask一致while(IsEmpty()) {   // 如果容器空了,消费者就不能继续消费了!// 细节一:与PushTask一致pthread_cond_wait(&_cCond, &_mutex);}// 程序走到这里,容器一定是没有空的!*out = _q.front();_q.pop();// 细节3:pthread_cond_signal这个函数可以放在临界区内部,也可以放在外部!pthread_cond_signal(&_pCond);   // 通知生产者已经消费了!// 解锁pthread_mutex_unlock(&_mutex);}private:/* 判断容器满 */bool IsFull() {return _q.size() == _maxCapacity;}/* 判断容器空 */bool IsEmpty() {return _q.empty();}private:std::queue<T> _q;       // 存储容器int _maxCapacity;       // 标识存储重启最大容量pthread_mutex_t _mutex; // 互斥锁pthread_cond_t _pCond;  // 生产者条件变量pthread_cond_t _cCond;  // 消费者条件变量
};

【Task.hpp文件】

#pragma once
#include <iostream>
#include <string>
#include <functional>/* 计算任务 */
class CalTask
{
public:using func_t = std::function<int(int, int, char)>;public:/* 构造函数 */CalTask() {}/* 构造函数 */CalTask(int x, int y, char op, func_t func): _x(x), _y(y), _op(op), _callBalk(func){}public:/* 仿函数 */std::string operator()(){int result = _callBalk(_x, _y, _op);char buffer[64];snprintf(buffer, sizeof(buffer), "%d %c %d = %d\n", _x, _op, _y, result);return buffer;}public:/* 返回打印公式 */std::string ToTaskString(){char buffer[64];snprintf(buffer, sizeof(buffer), "%d %c %d = ?\n", _x, _op, _y);return buffer;}private:int _x;int _y;char _op;func_t _callBalk;
};/* 执行计算的方法 */
int MyCalculate(int x, int y, char op)
{int result = 0;switch (op){case '+':result = x + y;break;case '-':result = x - y;break;case '*':result = x * y;break;case '/':{if (y == 0){std::cerr << "div zero error!" << std::endl;result = -1;}else{result = x / y;}break;}case '%':{if (y == 0){std::cerr << "mod zero error!" << std::endl;result = -1;}else{result = x % y;}break;}default:break;}return result;
}/* 保存任务 */
class SaveTask
{
public:using func_t = std::function<void(const std::string &)>;public:/* 构造函数 */SaveTask() {}/* 构造函数 */SaveTask(const std::string &message, func_t func): _message(message), _callBalk(func){}public:/* 仿函数 */void operator()(){_callBalk(_message);}private:std::string _message;func_t _callBalk;
};/* 保存方法 */
void Save(const std::string& massage){std::string target = "./log.txt";FILE *fp = fopen(target.c_str(), "a+");if(fp == NULL){std::cerr << "fopen fail!" << std::endl;return;}fputs(massage.c_str(), fp);fputs("\n", fp);fclose(fp);
}

【BlockQueue.cc文件】

#include "BlockQueue.hpp"
#include "Task.hpp"/* 共享资源Queue封装 */
template <class C, class S>
class BlockQueues
{
public:BlockQueue<C> *c_bq;BlockQueue<S> *s_bq;
};const std::string oper = "+-*/%";
/* 定义生产者线程 */
void *Producer(void *args)
{BlockQueue<CalTask> *calBq = (static_cast<BlockQueues<CalTask, SaveTask> *>(args))->c_bq;// 生产者进行生产while (true){int x = rand() % 10 + 1;int y = rand() % 10 + 1;int op = rand() % oper.size();CalTask t(x, y, oper[op], MyCalculate);calBq->PushTask(t);std::cout << "Producer-生产任务:" << t.ToTaskString() << std::endl;sleep(2);}return nullptr;
}/* 定义消费者线程 */
void *Consumer(void *args)
{BlockQueue<CalTask> *calBq = (static_cast<BlockQueues<CalTask, SaveTask> *>(args))->c_bq;BlockQueue<SaveTask> *serverBq = (static_cast<BlockQueues<CalTask, SaveTask> *>(args))->s_bq;// 消费者进行消费while (true){CalTask t;calBq->PopTask(&t);std::string messgae = t();std::cout << "Consumer-消费任务:" << messgae << std::endl;SaveTask server(messgae, Save);serverBq->PushTask(server);std::cout << "Consumer-推送保存完成..." << std::endl;}   return nullptr;
}/* 定义保存线程 */
void *Saver(void *args)
{BlockQueue<SaveTask> *saveQ = (static_cast<BlockQueues<CalTask, SaveTask> *>(args))->s_bq;while(true){SaveTask t;saveQ->PopTask(&t);t();std::cout << "SAVER-保存任务完成..." << std::endl;}return nullptr;
}#define T_PRODUCER 5
#define T_CONSUMER 10
/* 入口函数 */
int main()
{// 随机数种子srand((unsigned int)time(nullptr) ^ getpid());// 共享资源BlockQueues<CalTask, SaveTask> bqs;bqs.c_bq = new BlockQueue<CalTask>();bqs.s_bq = new BlockQueue<SaveTask>();// pthread_t tP;// pthread_t tC;// pthread_t tS;// pthread_create(&tP, nullptr, Producer, (void *)&bqs);// pthread_create(&tC, nullptr, Consumer, (void *)&bqs);// pthread_create(&tS, nullptr, Saver, (void *)&bqs);// pthread_join(tP, nullptr);// pthread_join(tC, nullptr);// pthread_join(tS, nullptr);// 创建生产者线程pthread_t tP[T_PRODUCER];for (int i = 0; i < T_PRODUCER; i++){pthread_create(&tP[i], nullptr, Producer, (void *)&bqs);}// 创建消费者线程pthread_t tC[T_CONSUMER];for (int i = 0; i < T_CONSUMER; i++){pthread_create(&tC[i], nullptr, Consumer, (void *)&bqs);}// 创建保存线程pthread_t tS;pthread_create(&tS, nullptr, Saver, (void *)&bqs);// 等待生产者线程回收for(int i = 0; i < T_PRODUCER; i++){pthread_join(*(tP + 1), nullptr);}// 等待消费者线程回收for(int i = 0; i < T_CONSUMER; i++){pthread_join(*(tC + 1), nullptr);}// 等待保存线程回收pthread_join(tS, nullptr);delete bqs.c_bq;delete bqs.s_bq;return 0;
}

【2.3】生产消费模型多生产多消费

【Makefile文件】

 

# 定义变量与参数字符串进行关联
cc=g++
standard=-std=c++11# 定义编译关系
myBackQueue: ThreadBackQueue.cc $(cc) -o $@ $^ $(standard) -lpthread# 定义命令
clean:rm -rf myBackQueue# 配置指令与系统指令分离
.PHONY: clean

【ThreadBackQueue.hpp文件】

#pragma once 
#include <iostream>
#include <queue>
#include <pthread.h>
#include "ThreadMutex.hpp"const int g_capacityMax = 100;/* 生产消费者模型封装类 */
template<class T>
class ThreadBackQueue
{
public:/* - 构造函数*/ThreadBackQueue(const int& capacity = g_capacityMax): _qCapacity(capacity){// 初始化互斥锁pthread_mutex_init(&_mutex, nullptr);// 初始化生产者条件变量pthread_cond_init(&_pCond, nullptr);// 初始化消费者条件变量pthread_cond_init(&_cCond, nullptr);}/* - 析构函数*/~ThreadBackQueue() {// 释放互斥锁pthread_mutex_destroy(&_mutex);// 释放生产者条件变量pthread_cond_destroy(&_pCond);// 释放消费者条件变量pthread_cond_destroy(&_cCond);}public:/* - 增加任务*/void Push(const T& in){// 加锁pthread_mutex_lock(&_mutex);// LockGuardMutex(&_mutex);// 判断是否满while(IsFull())pthread_cond_wait(&_pCond, &_mutex); // 去等待// 一定有空位置_q.push(in);// 一定有任务pthread_cond_signal(&_cCond);pthread_mutex_unlock(&_mutex);}/* - 处理任务*/void Pop(T* out){// 加锁pthread_mutex_lock(&_mutex);// LockGuardMutex(&_mutex);// 判断是否空while(IsEmpty())pthread_cond_wait(&_cCond, &_mutex); // 去等待// 一定有任务*out = _q.front(); _q.pop();// 一定有空位置if(GetTaskSize() == 1)pthread_cond_signal(&_pCond);pthread_mutex_unlock(&_mutex);}public: /* - 判断队列满*/bool IsFull(){return _q.size() == _qCapacity;}/* - 判断队列空*/bool IsEmpty(){return _q.empty();}/* - 获取队列中任务个数*/size_t GetTaskSize(){return _q.size();}private:std::queue<T>       _q;             // 消息队列缓冲区size_t              _qCapacity;     // 消息队列容量pthread_mutex_t     _mutex;         // 互斥锁pthread_cond_t      _pCond;         // 生产者条件变量pthread_cond_t      _cCond;         // 消费者条件变量
};

【ThreadTask.hpp文件】

#pragma once
#include <cstdio>
#include <iostream>
#include <string>
#include <functional>
#include "ThreadBackQueue.hpp"
class TaskCalculate;
class TaskSave;template<class C, class S>
class ThreadBackQueues
{
public:ThreadBackQueue<C>* _cTask;ThreadBackQueue<S>* _sTask;
};class TaskCalculate
{
private:// 定义仿函数using func_t = std::function<int(const int, const int, const char)>;public:/* - 无参构造函数 */TaskCalculate(){}/* - 带参数的构造函数*/TaskCalculate(func_t func, const int x, const int y, const char op): _func(func), _x(x), _y(y), _op(op){}public:/* - ()运算符重载*/std::string operator()(){int result = _func(_x, _y, _op);char buffer[64];snprintf(buffer, sizeof(buffer), "%d %c %d = %d", _x, _op, _y, result);return buffer;}public:std::string TaskString(){char buffer[64];snprintf(buffer, sizeof(buffer), "%d %c %d = ?", _x, _op, _y);return buffer;}private:int     _x;    // 第一个计算值int     _y;    // 第二个计算值char    _op;   // 第三个计算值func_t  _func; // 仿函数类型
};int Calculate(const int x, const int y, const char op)
{int calRet = 0;switch(op){case '+':{calRet = x + y;break;}case '-':{calRet = x - y;break;}case '*':{calRet = x * y;break;}case '/':{if (y == 0){std::cerr << "div zero error!" << std::endl;calRet = -1;}else{calRet = x / y;}break;}case '%':{if (y == 0){std::cerr << "mod zero error!" << std::endl;calRet = -1;}else{calRet = x % y;}break;            }default:{break;}}return calRet;
};class TaskSave
{
private:using func_t = std::function<void(const std::string&)>;public:/* - 无参构造函数 */TaskSave() {}/* - 带参构造函数 */TaskSave(func_t func, const std::string& msg) : _func(func), _msg(msg){}public:/* - ()运算符重载*/void operator()(){_func(_msg);}private:std::string _msg;   func_t      _func;
};void FileSave(const std::string& msg)
{// 创建打开文件目录std::string target = "./Log.txt";// 打开文件FILE* fpath = fopen(target.c_str(), "a+");if(fpath == nullptr){std::cerr << "fopen fail!" << std::endl;return;}// 写入文件fputs(msg.c_str(), fpath);fputs("\n", fpath);// 关闭文件fclose(fpath);
}

【ThreadBase.hpp文件】

#pragma once  
#include <cstdio>
#include <cassert>
#include <iostream>
#include <functional>
#include <string>#include <pthread.h>
class ThreadBase;/* 线程上下文数据封装类 */
class ThreadBaseConnectText
{
public:ThreadBaseConnectText(): _textThis(nullptr), _textArgs(nullptr){}
public: ThreadBase*  _textThis;void*        _textArgs;
};/* 基于原生线程库的线程封装类 */
class ThreadBase
{
private:const int ctNum = 64;public:// 定义仿函数using func_t = std::function<void*(void*)>;public:public:/* - 构造函数* - func: 线程回调函数* - args:线程回调函数参数* - num : 编写线程名称设定的编号 */ThreadBase(func_t func, void* args = nullptr, const int& num = 1): _threadCallBack(func), _threadArgs(args){   // 自定义线程名称char nameBuffer[ctNum];snprintf(nameBuffer, sizeof(nameBuffer), "thread-%d", num);_threadName = nameBuffer;// 创建线程连接上下文 - 手动释放内存 - 【01】ThreadBaseConnectText* connectText = new ThreadBaseConnectText();connectText->_textThis = this;connectText->_textArgs = _threadArgs;int state = pthread_create(&_threadId, nullptr, StartRoutine, (void*)connectText);assert(state == 0); (void)state;}/* - 析构函数*/~ThreadBase() {}public:/* - 线程等待*/void Join(){int state = pthread_join(_threadId, nullptr);assert(state == 0); (void)state;   }public: /* - 获取线程名称*/std::string GetThreadName(){return _threadName;}/* - 获取线程Id*/std::string GetThreadId(){char buffer[ctNum];snprintf(buffer, sizeof(buffer), "0x%x", _threadId);return buffer;}public:/* - 线程函数*/static void* StartRoutine(void* args){ThreadBaseConnectText* connectText = static_cast<ThreadBaseConnectText*>(args);void* retVal = connectText->_textThis->Run(connectText->_textArgs);// 释放内存 - 【01】delete connectText;// 返回return retVal;}private:/* - StartRoutine专用函数(因为C/C++混编的原因)*/void* Run(void* args){// 调用回调线程return _threadCallBack(args);}private:std::string        _threadName;         // 线程名称pthread_t          _threadId;           // 线程Idfunc_t             _threadCallBack;     // 线程回调函数void*              _threadArgs;         // 线程回调函数参数
};

【ThreadMutex.hpp文件】

#pragma once 
#include <pthread.h>/* 原生线程锁类封装 */
class Mutex
{
public:/* - 构造函数*/Mutex(pthread_mutex_t* mutex): _pMutex(mutex){}/* - 析构函数*/~Mutex() {}public:/* - 加锁函数*/void Lock() { pthread_mutex_lock(_pMutex); }/* - 解锁函数*/void UnLock() { pthread_mutex_unlock(_pMutex); }private:pthread_mutex_t*    _pMutex;    // 内部的线程锁
};class LockGuardMutex
{
public:/* - 构造函数*/LockGuardMutex(pthread_mutex_t* mutex): _mutex(mutex){_mutex.Lock();}/* - 析构函数*/~LockGuardMutex(){_mutex.UnLock();}private:Mutex   _mutex;
};

【ThreadBackQueue.cc文件】

#include <ctime>
#include <iostream>
#include <string>
#include <memory>
#include <unistd.h>
#include "ThreadBase.hpp"
#include "ThreadBackQueue.hpp"
#include "ThreadTask.hpp"std::string g_ops = "+-*/%";/* - 生产者线程函数*/
void* ProducerThread(void* args)
{ThreadBackQueue<TaskCalculate>* tBQ = (static_cast<ThreadBackQueues<TaskCalculate, TaskSave>*>(args))->_cTask;// 生产while(true){int x = rand() % 100;int y = rand() % 100;int o = rand() % g_ops.size();TaskCalculate task(Calculate, x, y, g_ops[o]);tBQ->Push(task);std::cout << "生产者在生产任务-> " << "[" << task.TaskString() << "] - - 队列任务个数为:" << tBQ->GetTaskSize()  << std::endl;sleep(1);}return nullptr;
}/* - 消费者线程函数*/
void* ConsumeThread(void* args)
{ThreadBackQueue<TaskCalculate>* tBQ = (static_cast<ThreadBackQueues<TaskCalculate, TaskSave>*>(args))->_cTask;ThreadBackQueue<TaskSave>* sBQ = (static_cast<ThreadBackQueues<TaskCalculate, TaskSave>*>(args))->_sTask;// 消费while(true){TaskCalculate calculateTask;tBQ->Pop(&calculateTask);std::string strRet = calculateTask();std::cout << "消费者在消费任务-> " << "[" << strRet << "] - - 队列任务个数为:" << tBQ->GetTaskSize() << std::endl;TaskSave saveTask(FileSave, strRet);sBQ->Push(saveTask);std::cout << "消费者在保存任务-> " << "[" << strRet << "] - - 队列任务个数为:" << sBQ->GetTaskSize() << std::endl;sleep(3);}return nullptr;
}/* - 保存者线程函数*/
void* SaveThread(void* args)
{ThreadBackQueue<TaskSave>* sBQ = (static_cast<ThreadBackQueues<TaskCalculate, TaskSave>*>(args))->_sTask;while(true){// 消费保存任务TaskSave saveTask;sBQ->Pop(&saveTask);// 执行保存任务saveTask();// 保存完成任务std::cout << "保存任务完成"<< " - - 队列任务个数为:" << sBQ->GetTaskSize() << std::endl;}return nullptr;
}/* - 程序入口函数*/
int main()
{// 创建随机数种子srand((unsigned int)time(nullptr));// 创建共享资源ThreadBackQueues<TaskCalculate, TaskSave>* pBQ = new ThreadBackQueues<TaskCalculate, TaskSave>();pBQ->_cTask = new ThreadBackQueue<TaskCalculate>;pBQ->_sTask = new ThreadBackQueue<TaskSave>;// 创建生产者线程std::unique_ptr<ThreadBase> ptr_pt1(new ThreadBase(ProducerThread, (void*)pBQ, 1));std::unique_ptr<ThreadBase> ptr_pt2(new ThreadBase(ProducerThread, (void*)pBQ, 2));std::unique_ptr<ThreadBase> ptr_pt3(new ThreadBase(ProducerThread, (void*)pBQ, 3));std::unique_ptr<ThreadBase> ptr_pt4(new ThreadBase(ProducerThread, (void*)pBQ, 4));std::unique_ptr<ThreadBase> ptr_pt5(new ThreadBase(ProducerThread, (void*)pBQ, 5));std::cout << "创建生产者线程完成-> 线程名:" << ptr_pt1->GetThreadName() << " 线程Id:" << ptr_pt1->GetThreadId() << std::endl;std::cout << "创建生产者线程完成-> 线程名:" << ptr_pt2->GetThreadName() << " 线程Id:" << ptr_pt2->GetThreadId() << std::endl;std::cout << "创建生产者线程完成-> 线程名:" << ptr_pt3->GetThreadName() << " 线程Id:" << ptr_pt3->GetThreadId() << std::endl;std::cout << "创建生产者线程完成-> 线程名:" << ptr_pt4->GetThreadName() << " 线程Id:" << ptr_pt4->GetThreadId() << std::endl;std::cout << "创建生产者线程完成-> 线程名:" << ptr_pt5->GetThreadName() << " 线程Id:" << ptr_pt5->GetThreadId() << std::endl;sleep(10);// 创建消费者线程std::unique_ptr<ThreadBase> ptr_ct1(new ThreadBase(ConsumeThread, (void*)pBQ, 11));std::unique_ptr<ThreadBase> ptr_ct2(new ThreadBase(ConsumeThread, (void*)pBQ, 12));std::unique_ptr<ThreadBase> ptr_ct3(new ThreadBase(ConsumeThread, (void*)pBQ, 13));std::unique_ptr<ThreadBase> ptr_ct4(new ThreadBase(ConsumeThread, (void*)pBQ, 14));std::unique_ptr<ThreadBase> ptr_ct5(new ThreadBase(ConsumeThread, (void*)pBQ, 15));std::cout << "创建消费者线程完成-> 线程名:" << ptr_ct1->GetThreadName() << " 线程Id:" << ptr_ct1->GetThreadId() << std::endl;std::cout << "创建消费者线程完成-> 线程名:" << ptr_ct2->GetThreadName() << " 线程Id:" << ptr_ct2->GetThreadId() << std::endl;std::cout << "创建消费者线程完成-> 线程名:" << ptr_ct3->GetThreadName() << " 线程Id:" << ptr_ct3->GetThreadId() << std::endl;std::cout << "创建消费者线程完成-> 线程名:" << ptr_ct4->GetThreadName() << " 线程Id:" << ptr_ct4->GetThreadId() << std::endl;std::cout << "创建消费者线程完成-> 线程名:" << ptr_ct5->GetThreadName() << " 线程Id:" << ptr_ct5->GetThreadId() << std::endl;sleep(10);// 创建保存者线程std::unique_ptr<ThreadBase> ptr_st1(new ThreadBase(SaveThread, (void*)pBQ, 21));std::unique_ptr<ThreadBase> ptr_st2(new ThreadBase(SaveThread, (void*)pBQ, 22));std::unique_ptr<ThreadBase> ptr_st3(new ThreadBase(SaveThread, (void*)pBQ, 23));std::unique_ptr<ThreadBase> ptr_st4(new ThreadBase(SaveThread, (void*)pBQ, 24));std::unique_ptr<ThreadBase> ptr_st5(new ThreadBase(SaveThread, (void*)pBQ, 25));std::cout << "创建保存者者线程完成-> 线程名:" << ptr_st1->GetThreadName() << " 线程Id:" << ptr_st1->GetThreadId() << std::endl;std::cout << "创建保存者者线程完成-> 线程名:" << ptr_st2->GetThreadName() << " 线程Id:" << ptr_st2->GetThreadId() << std::endl;std::cout << "创建保存者者线程完成-> 线程名:" << ptr_st3->GetThreadName() << " 线程Id:" << ptr_st3->GetThreadId() << std::endl;std::cout << "创建保存者者线程完成-> 线程名:" << ptr_st4->GetThreadName() << " 线程Id:" << ptr_st4->GetThreadId() << std::endl;std::cout << "创建保存者者线程完成-> 线程名:" << ptr_st5->GetThreadName() << " 线程Id:" << ptr_st5->GetThreadId() << std::endl;// 等待线程结束ptr_pt1->Join();ptr_pt2->Join();ptr_pt3->Join();ptr_pt4->Join();ptr_pt5->Join();ptr_ct1->Join();ptr_ct2->Join();ptr_ct3->Join();ptr_ct4->Join();ptr_ct5->Join();ptr_st1->Join();ptr_st2->Join();ptr_st3->Join();ptr_st4->Join();ptr_st5->Join();delete pBQ->_cTask;delete pBQ->_sTask;delete pBQ;return 0;
}

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.rhkb.cn/news/138796.html

如若内容造成侵权/违法违规/事实不符,请联系长河编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

速卖通新品如何推广,速卖通的推广渠道有哪些?——站斧浏览器

速卖通的推广渠道非常多样化&#xff0c;卖家可以根据自己的需求和预算选择合适的渠道来推广产品&#xff0c;提高曝光度和销售量&#xff0c;能够有效地提高产品的知名度和信任度。 速卖通新品如何推广&#xff1f; 速卖通上有数以百万计的卖家&#xff0c;每天都有大量的新…

位图(bitmap)原理以及实现

大家好&#xff0c;我是蓝胖子&#xff0c;我一直相信编程是一门实践性的技术&#xff0c;其中算法也不例外&#xff0c;初学者可能往往对它可望而不可及&#xff0c;觉得很难&#xff0c;学了又忘&#xff0c;忘其实是由于没有真正搞懂算法的应用场景&#xff0c;所以我准备出…

tp5连接多个数据库

一、如果你的主数据库配置文件都在config.php里 直接在config.php中中定义db2&#xff1a; 控制器中打印一下&#xff1a; <?php namespace app\index\controller; use think\Controller; use think\Db; use think\Request; class Index extends Controller {public fun…

千兆以太网传输层 UDP 协议原理与 FPGA 实现

文章目录 前言心得体会一、UDP 协议介绍二、UDP 数据报格式三、UDP 数据发送测试四、Verilog实现UDP 数据发送1、IP 头部检验 IPchecksun 的计算2、以太网报文的校验字段 FCS 的计算3、以太网报文发送模块实现五、以太网数据发送测试六、仿真代码七、仿真波形展示八、上板测试九…

一百八十二、大数据离线数仓——离线数仓从Kafka采集、最终把结果数据同步到ClickHouse的完整数仓流程(待续)

一、目的 经过6个月的奋斗&#xff0c;项目的离线数仓部分终于可以上线了&#xff0c;因此整理一下离线数仓的整个流程&#xff0c;既是大家提供一个案例经验&#xff0c;也是对自己近半年的工作进行一个总结。 二、项目背景 项目行业属于交通行业&#xff0c;因此数据具有很…

yo!这里是c++中的多态

前言 在学完继承之后&#xff0c;紧接着我们来认识多态&#xff0c;建议继承不太熟的先把继承部分的知识点搞熟&#xff0c;再来学习多态&#xff0c;否则会走火入魔&#xff0c;会混乱。因为多态是建立在继承的基础之上&#xff0c;而且多态中还存在与继承类似的概念&#xff…

如何使用jenkins、ant、selenium、testng搭建自动化测试框架

如果在你的理解中自动化测试就是在eclipse里面讲webdriver的包引入&#xff0c;然后写一些测试脚本&#xff0c;这就是你所说的自动化测试&#xff0c;其实这个还不能算是真正的自动化测试&#xff0c;你见过每次需要运行的时候还需要打开eclipse然后去选择运行文件吗&#xff…

Linux Ubuntu命令行快速配置C++开发环境

本文介绍在Linux操作系统的Ubuntu版本中&#xff0c;基于命令行&#xff0c;快速配置C 编辑、编译、运行的代码开发环境的简便方法。 在之前的文章Linux操作系统Ubuntu 22.04配置Visual Studio Code与C代码开发环境的方法(https://blog.csdn.net/zhebushibiaoshifu/article/det…

C++标准模板库——vector的使用及其模拟实现

目录 一. vector的介绍 1.vector的介绍 二.vector的使用 vector中常见接口的介绍vector的构造和析构函数vector的三种遍历方式 三.vector的模拟实现 vector的增删查改vector容器的容量变化和大小增减vector迭代器失效问题vector的小框架 构造函数和析构函数迭代器和operat…

基于Java的高校竞赛管理系统设计与实现(亮点:发起比赛、报名、审核、评委打分、获奖排名,可随意更换主题如蓝桥杯、ACM、王者荣耀、吃鸡等竞赛)

高校竞赛管理系统 一、前言二、我的优势2.1 自己的网站2.2 自己的小程序&#xff08;小蔡coding&#xff09;2.3 有保障的售后2.4 福利 三、开发环境与技术3.1 MySQL数据库3.2 Vue前端技术3.3 Spring Boot框架3.4 微信小程序 四、功能设计4.1 主要功能描述4.2 系统角色 五、系统…

vue获取本地缓存并转为json格式

场景 要求获取当前登录用户id&#xff0c;传入后台去筛选属于该用户的数据&#xff1b; 当前登录用户信息一般会在本地存储中&#xff0c;有些则是在session中&#xff0c;此处只对本地存储做讨论&#xff1b; 本地缓存的用法 1 存储数据 localStorage.setltem(userId,"…

VS Code用AI写代码:Codeium插件

文章目录 Codeiumchat代码生成 Codeium Codeium是基于边缘计算的代码AI工具&#xff0c;提供超过70种编程语言的代码补全、对话、搜索等功能&#xff0c;相当霸道。 在插件栏搜索到Codeium之后&#xff0c;需要科学上网安装&#xff0c;安装完成后会提示注册。注册之后&#…

2023-9-22 滑雪

题目链接&#xff1a;滑雪 #include <cstring> #include <algorithm> #include <iostream>using namespace std;const int N 310;int n, m; int h[N][N]; int f[N][N];int dx[4] {-1, 0, 1, 0}, dy[4] {0, 1, 0, -1};int dp(int x, int y) {int &v f…

MySQL数据库简介+库表管理操作+数据库用户管理

Mysql Part 1 一、数据库的基本概念1.1 使用数据库的必要性1.2 数据库基本概念1.2.1 数据&#xff08;Data&#xff09;1.2.2 表1.2.3 数据库1.2.4 数据库管理系统&#xff08;DBMS&#xff09;1.2.5 数据库系统 1.3 数据库的分类1.3.1 关系数据库 SQL1.3.2 非关系数据库 NoSQL…

看阿里测试工程师如何玩转postman+newman+jenkins接口自动化

【软件测试面试突击班】如何逼自己一周刷完软件测试八股文教程&#xff0c;刷完面试就稳了&#xff0c;你也可以当高薪软件测试工程师&#xff08;自动化测试&#xff09; postman用来做接口测试非常方便&#xff0c;接口较多时&#xff0c;则可以实现接口自动化 一、环境准备…

【DLL修复工具下载】一键修复电脑丢失d3dcompiler_47.dll问题方法

在我们使用电脑的过程中&#xff0c;有时候会遇到一些错误提示&#xff0c;其中“缺失 d3dcompiler_47.dll”就是比较常见的一种。那么&#xff0c;d3dcompiler_47.dll 到底是什么呢&#xff1f;为什么会出现缺失的情况&#xff1f;丢失 d3dcompiler_47.dll 又会对电脑产生什么…

芋道商城,基于 Vue + Uniapp 实现,支持分销、拼团、砍价、秒杀、优惠券、积分、会员等级、小程序直播、页面 DIY 等功能

商城简介 芋道商城&#xff0c;基于 芋道开发平台 构建&#xff0c;以开发者为中心&#xff0c;打造中国第一流的 Java 开源商城系统&#xff0c;全部开源&#xff0c;个人与企业可 100% 免费使用。 有任何问题&#xff0c;或者想要的功能&#xff0c;可以在 Issues 中提给艿艿…

低代码助力企业数字化转型

在当今这个数字化快速发展的时代&#xff0c;企业面临的竞争越来越激烈&#xff0c;数字化转型已成为企业发展的必经之路。低代码平台作为一种新型的开发工具&#xff0c;正在逐渐成为企业数字化转型的重要助力。本文将从数字化转型背景、低代码平台介绍、低代码平台的应用、低…

SIEM 中的事件关联

什么是 SIEM 中的事件关联 SIEM 中的事件关联可帮助安全团队识别来自不同来源的安全事件并确定其优先级&#xff0c;从而提供更全面的整体安全环境视图。 在典型的 IT 环境中&#xff0c;会跨各种系统和应用程序生成大量事件和日志。孤立地看&#xff0c;其中许多事件可能看起…

聊一聊 TLS/SSL

哈喽大家好&#xff0c;我是咸鱼 当我们在上网冲浪的时候&#xff0c;会在浏览器界面顶部看到一个小锁标志&#xff0c;或者网址以 “https://” 开头 这意味着我们正在使用 TLS/SSL 协议进行安全通信。虽然它可能看起来只是一个小小的锁图标和一个 “https” &#xff0c;但…