Linux操作系统7- 线程同步与互斥5(POSIX条件变量生产者消费者模型的进一步使用)

上篇文章:Linux操作系统7- 线程同步与互斥4(基于POSIX条件变量的生产者消费者模型)-CSDN博客

本篇代码仓库:

支持处理简单任务的生产者消费者模型代码

生产者-消费者-保存者三线程两队列模型

多生产多消费的生产者消费者模型

        进一步使用生产者消费者模型不需要修改Queue.hpp

#pragma once
#include <iostream>
#include <queue> //使用queue作为阻塞队列#include <unistd.h>
#include <pthread.h>
const int gnum = 10; // 阻塞队列的最大容量template <class T>
class BlockQueue
{
public:BlockQueue(int maxnum = gnum) : _maxnum(maxnum){pthread_mutex_init(&_mtx, nullptr);pthread_cond_init(&_pcond, nullptr);pthread_cond_init(&_ccond, nullptr);}~BlockQueue(){pthread_mutex_destroy(&_mtx);pthread_cond_destroy(&_pcond);pthread_cond_destroy(&_ccond);}// 生产者生产数据void push(const T &in){// 加锁保护pthread_mutex_lock(&_mtx);// 判断是否满足生产while (is_full()){// 数据满了生产者等待消费者消费pthread_cond_wait(&_pcond, &_mtx);}// 生产数据_queue.push(in);// 队列不为空,通知消费者消费pthread_cond_signal(&_ccond);// 解锁pthread_mutex_unlock(&_mtx);}// 消费这消费数据,通过输入输出型参数获取数据void pop(T *out){// 加锁保护pthread_mutex_lock(&_mtx);// 判断是否可以消费数据while (is_empty()){// 等待生产者生产数据pthread_cond_wait(&_ccond, &_mtx);}// 开始消费数据*out = _queue.front();_queue.pop();// 队列不满,通知生产者生产数据pthread_cond_signal(&_pcond);// 解锁pthread_mutex_unlock(&_mtx);}private:bool is_empty(){return _queue.empty();}bool is_full(){return _queue.size() == _maxnum;}private:std::queue<T> _queue;  // 阻塞队列size_t _maxnum;        // 队列最大容量pthread_mutex_t _mtx;  // 互斥锁pthread_cond_t _pcond; // 生产者条件变量,满了需要休眠pthread_cond_t _ccond; // 消费者条件变量,无数据要休眠
};

目录

一. 生存消费模型处理简单任务

1.1 需求分析

1.2 Task.hpp

1.3 Main.cpp 

1.4 测试

二. 生产者-消费者-保存者-三线程两队列

2.1 Task.hpp 

2.2 Main.cpp

2.3 测试

三. 多生产多消费模型 

四. 多生产多消费的意义


一. 生存消费模型处理简单任务

1.1 需求分析

        上篇文章中,我们使用了生存者消费者模型实现了生产者产生随机数据放入BlockQueue,而消费者从BlockQueue中拿取数据输出。

        现在想让生产者消费者模型处理一个简单的任务。比如:生产者产生两个随机数据和运算符,而消费者拿取数据获取计算结果并输出。

        此时就需要一个Task.hpp来构造任务。

1.2 Task.hpp

        创建一个结构体,内部包含计算数据,计算操作符,以及()重载

代码框架如下:

#pragma once
#include <iostream>
#include <functional>class CalTask
{// 使用c++11 using 和 包装器using func_t = std::function<int(int, int, char)>;// 当然也可以使用函数指针// typedef int (*func_t)(int, int, char);public:
private:int _x;int _y;char _op;func_t _callback; // 通过回调函数调用计算
};

        具体实现如下:需要增加构造函数,析构函数,重载(),回调函数等。

#pragma once
#include <iostream>
#include <functional>class CalTask
{// 使用c++11 using 和 包装器using func_t = std::function<int(int, int, char)>;// 当然也可以使用函数指针// typedef int (*func_t)(int, int, char);public:CalTask() {}CalTask(int x, int y, char op, func_t func): _x(x), _y(y), _op(op), _callback(func) {}~CalTask() {}std::string operator()(){int result = _callback(_x, _y, _op);char buffer[1024];snprintf(buffer, sizeof(buffer) - 1, "%d %c %d = %d", _x, _op, _y, result);return buffer;}private:int _x;int _y;char _op;func_t _callback; // 通过回调函数调用计算
};// 计算函数
int my_math(int x, int y, char op)
{int result = 0;switch (op){case '+':result = x + y;break;case '-':result = x - y;break;case '*':result = x * y;break;case '/':{if (y == 0){std::cerr << "div zero" << std::endl;return -1;}else{result = x / y;}break;}case '%':if (y == 0){std::cerr << "moved zero" << std::endl;return -1;}else{result = x % y;}break;default:break;}return result;
}

1.3 Main.cpp 

        根据分析,生产者生产数据然后交给消费者计算并输出。

#include <iostream>
#include <string>#include <unistd.h>
#include <pthread.h>
#include "BlockQueue.hpp"
#include "Task.hpp"const std::string OP = "+-*/%";
void *producer(void *args)
{// 获取交易场所 - 阻塞队列BlockQueue<CalTask> *bq = static_cast<BlockQueue<CalTask> *>(args);while (true){int x = rand() % 100;int y = rand() % 100;char op = OP[rand() % OP.size()];// 打印日志printf("生产者生产的数据:%d %c %d 并交给消费者计算\n", x, op, y);CalTask t(x, y, op, my_math);bq->push(t);usleep(500000);}return nullptr;
}void *consumer(void *args)
{// 获取交易场所 - 阻塞队列BlockQueue<CalTask> *bq = static_cast<BlockQueue<CalTask> *>(args);while (true){CalTask t;bq->pop(&t);std::cout << "消费者获取数据并计算:" << t() << std::endl;}return nullptr;
}int main()
{srand(time(0) ^ getpid() ^ rand());// 定义生产消费线程与阻塞队列pthread_t p;pthread_t c;BlockQueue<CalTask> *bq = new BlockQueue<CalTask>();pthread_create(&p, nullptr, producer, (void *)bq);pthread_create(&c, nullptr, consumer, (void *)bq);pthread_join(p, nullptr);pthread_join(c, nullptr);delete bq;bq = nullptr;return 0;
}

1.4 测试

测试结果如下:

        可以看到,通过生产者消费者模型。消费者成功的获取了来自生产传递的数据,并且计算这个结果。

        如果有需要的话,可以将更复杂的任务交给生产者消费者模型。比如网络/本地的IO操作,时间复杂度较高的运算。 

二. 生产者-消费者-保存者-三线程两队列

        能否实现一个生产者生产数据交给消费者处理,消费者处理完成之后再交给保存者将结果保存在文件中?

        生产线程生产任务传递给任务队列

        消费线程从任务队列读取任务,消费完成任务之后将结果传递给保存队列

        保存线程从保存队列读取数据并写入文件中

        同时需要保证生产者消费者之间的同步,消费者与保存者之间的同步。

2.1 Task.hpp 

        需要给保存者一个任务用于保存,以解耦主函数和任务处理。

#pragma once
#include <iostream>
#include <functional>class CalTask
{// 使用c++11 using 和 包装器using func_t = std::function<int(int, int, char)>;// 当然也可以使用函数指针// typedef int (*func_t)(int, int, char);public:CalTask() {}CalTask(int x, int y, char op, func_t func): _x(x), _y(y), _op(op), _callback(func) {}~CalTask() {}std::string operator()(){int result = _callback(_x, _y, _op);char buffer[1024];snprintf(buffer, sizeof(buffer) - 1, "%d %c %d = %d", _x, _op, _y, result);return buffer;}private:int _x;int _y;char _op;func_t _callback; // 通过回调函数调用计算
};class SaveTask
{typedef void (*func_t)(const std::string &);public:SaveTask(std::string _result = "", func_t func = Save): _callback(func) {}void operator()(){_callback(_result);}private:std::string _result;func_t _callback;
};// 计算函数
int my_math(int x, int y, char op)
{int result = 0;switch (op){case '+':result = x + y;break;case '-':result = x - y;break;case '*':result = x * y;break;case '/':{if (y == 0){std::cerr << "div zero" << std::endl;return -1;}else{result = x / y;}break;}case '%':if (y == 0){std::cerr << "moved zero" << std::endl;return -1;}else{result = x % y;}break;default:break;}return result;
}void Save(const std::string &result)
{const std::string task_pwd = "./result.txt";FILE *fp = fopen(task_pwd.data(), "a"); // 需要追加写入if (fp == nullptr){std::cerr << "fopen error" << std::endl;}fputs(result.c_str(), fp);fclose(fp);fp = nullptr;
}

2.2 Main.cpp

        为了能够让消费者同时访问两个队列,需要一个结构体能够存储两个队列。

在BlockQueue.hpp中新增下面的代码即可

template <class C, class S>
struct BlockQueues
{BlockQueue<C> *_c_bq;BlockQueue<S> *_s_bq;
};

        主函数中需要新增一个保存者函数,用于保存者线程拿取数据并保存于文件中。同时消费者也需要新增一段将数据传递给保存者的逻辑 。

#include <iostream>
#include <memory>
#include <string>#include <unistd.h>
#include <pthread.h>
#include "BlockQueue.hpp"
#include "Task.hpp"const std::string OP = "+-*/%";
void *producer(void *args)
{// 获取交易场所 - 阻塞队列BlockQueue<CalTask> *cal_bq = static_cast<BlockQueues<CalTask, SaveTask> *>(args)->_c_bq;while (true){int x = rand() % 100;int y = rand() % 100;char op = OP[rand() % OP.size()];// 打印日志printf("生产者生产的数据:%d %c %d 并交给消费者计算\n", x, op, y);CalTask ct(x, y, op, my_math);cal_bq->push(ct);sleep(1);}return nullptr;
}void *consumer(void *args)
{// 获取交易场所 - 生产消费阻塞队列,消费保存阻塞队列BlockQueue<CalTask> *cal_bq = static_cast<BlockQueues<CalTask, SaveTask> *>(args)->_c_bq;BlockQueue<SaveTask> *save_bq = static_cast<BlockQueues<CalTask, SaveTask> *>(args)->_s_bq;while (true){// 获取任务计算CalTask ct;cal_bq->pop(&ct);std::string result = ct();std::cout << "消费者获取数据并计算:" << result << std::endl;// 将任务传递给保存者SaveTask st(result, Save);std::cout << "消费者获取传递计算结果给保存者:" << result << std::endl;save_bq->push(st);}return nullptr;
}void *saver(void *args)
{// 获取交易场所 - 消费保存阻塞队列BlockQueue<SaveTask> *save_bq = static_cast<BlockQueues<CalTask, SaveTask> *>(args)->_s_bq;while (true){SaveTask st;save_bq->pop(&st);st();std::cout << "保存者成功读取数据并保存在文件中:" << std::endl;}return nullptr;
}int main()
{srand(time(0) ^ getpid() ^ rand());// 定义消费生产保存线程,与两个阻塞队列BlockQueues<CalTask, SaveTask> *bqs = new BlockQueues<CalTask, SaveTask>();bqs->_c_bq = new BlockQueue<CalTask>;bqs->_s_bq = new BlockQueue<SaveTask>;pthread_t p;pthread_t c;pthread_t s;pthread_create(&p, nullptr, producer, (void *)bqs);pthread_create(&c, nullptr, consumer, (void *)bqs);pthread_create(&s, nullptr, saver, (void *)bqs);pthread_join(p, nullptr);pthread_join(c, nullptr);pthread_join(s, nullptr);delete bqs->_c_bq;delete bqs->_s_bq;return 0;
}

2.3 测试

三. 多生产多消费模型 

        只需要更改主函数即可实现多生产多消费模型

int main()
{srand((unsigned int)time(0) ^ getpid());// 建立任务队列和保存队列BlockQueues<CalTask, SaveTask> *bqs = new BlockQueues<CalTask, SaveTask>;bqs->_c_bq = new BlockQueue<CalTask>;bqs->_s_bq = new BlockQueue<SaveTask>;pthread_t c[3], p[2], s;pthread_create(c, nullptr, consumer, (void *)bqs);pthread_create(c + 1, nullptr, consumer, (void *)bqs);pthread_create(c + 2, nullptr, consumer, (void *)bqs);pthread_create(p, nullptr, producer, (void *)bqs);pthread_create(p + 1, nullptr, producer, (void *)bqs);pthread_create(&s, nullptr, saver, (void *)bqs);pthread_join(c[0], nullptr);pthread_join(c[1], nullptr);pthread_join(c[2], nullptr);pthread_join(p[0], nullptr);pthread_join(p[1], nullptr);pthread_join(s, nullptr);delete bqs->_c_bq;delete bqs->_s_bq;return 0;
}

运行结果如下:

 

四. 多生产多消费的意义

        即便有多个生产者,多个消费者。但是锁只有一个,所以同一时刻只能有一个线程在临界区中执行代码。那么多生产多消费有什么意义?

        1 当生产者A将数据传递给队列后,产生新的数据非常耗时间,此时其他生产者可以获取锁并投放数据,而生产者A可以同时产生数据

        2 当消费者拿到一个数据进行消费的时候,其他消费者仍可以从队列中拿取新数据进行消费。而不需要等待消费者A消费完了其他消费者才去消费

        3 即可以让生产者线程生产之前,消费者线程消费之后。让线程并发执行(而不是提高存取,拿取数据的效率)

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.rhkb.cn/news/38697.html

如若内容造成侵权/违法违规/事实不符,请联系长河编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

【嵌入式学习2】C语言 - VScode环境搭建

目录 ## 语言分类 ## c语言编译器 ## VScode相关配置 ## 语言分类 编译型语言&#xff1a;C&#xff0c;C解释型语言&#xff1a;python&#xff0c;JS ## c语言编译器 分类GCC 系列MinGWCygwinMSVC系列一套编程语言编译器将GCC编译器和GNU Binutils移植到Win32平台下的产物…

使用Doris broker load导入数据到带Kerberos的HA HDFS的命令详解

以下是关于 Doris Broker Load 结合 Kerberos 认证的 HDFS 数据导入的详细解析&#xff1a; 一、Broker Load 核心原理 Broker Load 是 Doris 中用于从 HDFS/S3 等远程存储系统异步导入大数据的工具&#xff0c;其核心流程如下&#xff1a; 任务提交&#xff1a;用户通过 SQL…

数字化转型 2.0:AI、低代码与智能分析如何重塑企业竞争力?

引言&#xff1a;数字化转型进入2.0时代 在过去的十几年里&#xff0c;企业的数字化转型&#xff08;1.0&#xff09;主要围绕信息化和自动化展开&#xff0c;例如引入ERP、CRM等系统&#xff0c;提高办公效率&#xff0c;减少人为失误。然而&#xff0c;随着市场竞争加剧&…

指针,数组 易混题解析(一)

目录 一.相关知识点 1.数组名是什么&#xff1f; 两个例外&#xff1a; 2.strlen 3.sizeof 4. * ( ) 与 [ ] 的互换 二.一维数组 三.字符数组 1. 字符 &#xff08;1&#xff09;sizeof &#xff08;2&#xff09;strlen 2.字符串 &#xff08;1&#xff09;si…

ABC392题解

A 算法标签: 模拟 #include <iostream> #include <algorithm> #include <cstring>using namespace std;int main() {ios::sync_with_stdio(false);cin.tie(0), cout.tie(0);int w[3];for (int i 0; i < 3; i) cin >> w[i];sort(w, w 3);if (w[0]…

Quartus + VScode 实现模块化流水灯

文章目录 一、通过VScode编写Verilog代码二、模块化编程三、代码示例 一、通过VScode编写Verilog代码 1、下载Vscode 2、下载相关插件 搜索Verilog就会弹出有如图所示的插件&#xff0c;下载并安装 3、创建Quartus项目 4、创建完成后点击Tools&#xff0c;选择Options 然后在…

简单讲一下控制系统所用的PID公式

2025年3月23日电子电工实验室讲课大纲 哈喽&#xff0c;小伙伴们大家好&#xff0c;今天我们来讲一下PID&#xff01;首先我们的针对的场景是什么——循迹小车&#xff01; 就是我们刚入手STM32时&#xff0c;我们可能会制作一个循迹小车。我们想让那个小车走直线&#xff0c;但…

直观理解ECC椭圆曲线加密算法

数学还是挺有逻辑的&#xff0c;给出计算的操作步骤 就能得出想要结果 背景&#xff1a; ● ECC 是一种极其巧妙的 非对称加密算法 , 其完美利用了 椭圆曲线几何累加 不可逆的性质 ● 拥有 密钥体积小&#xff0c;计算速度快的优势&#xff0c;被广泛用于各种区块链&#xff0c…

深度解析 Android Matrix 变换(二):组合变换 pre、post

前言 在上一篇文章中&#xff0c;我们讲解了 Canvas 中单个变换的原理和效果&#xff0c;即缩放、旋转和平移。但是单个旋转仅仅是基础&#xff0c;Canvas 变换最重要的是能够随意组合各种变换以实现想要的效果。在这种情况下&#xff0c;就需要了解如何组合变换&#xff0c;以…

c++之迭代器

一.迭代器的基本概念 1.什么是迭代器 迭代器是一种对象&#xff0c;它提供了一种访问容器中各个元素的方法&#xff0c;同时隐藏了容器内部的实现细节。简单来说&#xff0c;迭代器就像是一个指针&#xff0c;它可以指向容器中的某个元素&#xff0c;并且能够通过一些操作&am…

在 .NET 9.0 Web API 中实现 Scalar 接口文档及JWT集成

示例代码&#xff1a;https://download.csdn.net/download/hefeng_aspnet/90408075 介绍 随着 .NET 9 的发布&#xff0c;微软宣布他们将不再为任何 .NET API 项目提供默认的 Swagger gen UI。以前&#xff0c;当我们创建 .NET API 项目时&#xff0c;微软会自动添加 Swagger…

【操作系统笔记】操作系统的功能

上节课,我们学习了《什么是操作系统》。接下来,我们来看看操作系统有哪些功能? 这里讲的内容有两部分,一个是操作系统的目标,另外一个就是操作系统的功能。这两个细节可能会在考试的时候考到,但是最近好些年很少考到了。为了理解,我们还是一起来看一下。 操作系统的目标…

C/C++蓝桥杯算法真题打卡(Day7)

一、P8723 [蓝桥杯 2020 省 AB3] 乘法表 - 洛谷 算法代码&#xff1a; #include<bits/stdc.h> // 包含标准库中的所有头文件&#xff0c;通常用于竞赛编程中简化代码 using namespace std; // 使用标准命名空间&#xff0c;避免每次调用标准库函数时都要加std:: ty…

数据结构5(初):排序

目录 1、排序的概念以及常见的排序算法 1.1、排序的概念 1.2、常见的排序算法 2、常见排序算法的实现 2.1、插入排序 2.1.1、直接插入排序 2.1.2、希尔排序 2.2、选择排序 2.2.1、直接选择排序 2.2.2、堆排序 2.3、交换排序 2.3.1、冒泡排序 2.3.2、快速排序 2.3.…

VS2022中通过VCPKG安装的ceres之后调试ceres的例程设置

1.采用C20. vcpkg中设置: 2.增加预处理宏: GLOG_USE_GLOG_EXPORT 3.屏蔽sdl错误 在 项目-属性-C/C -命令行中添加 /sdl /w34996 #include "ceres/ceres.h" //#include <iostream> //#include<glog/logging.h>using ceres::AutoDiffCostFunction; usi…

Pydantic字段级校验:解锁@validator的12种应用

title: Pydantic字段级校验:解锁@validator的12种应用 date: 2025/3/23 updated: 2025/3/23 author: cmdragon excerpt: Pydantic校验系统支持通过pre验证器实现原始数据预处理,在类型转换前完成字符清洗等操作。格式验证涵盖正则表达式匹配与枚举值约束,确保护照编号等字…

函数递归和迭代

1.什么是递归&#xff1f; 在C语言中递归就是自己调用自己。 看一下简单函数的递归&#xff1a; 上面的代码实现演示一下函数的递归&#xff0c;最终是会陷入死循环的&#xff0c;栈溢出 。 1.1递归的思想&#xff1a; 把一个大型的问题一步一步的转换成一个个小的子问题来解…

发票查验/发票验真如何用Java实现接口调用

一、什么是发票查验&#xff1f;发票验真接口&#xff1f; 输入发票基本信息发票代码、发票号码、开票日期、校验码后6位、不含税金额、含税金额&#xff0c;核验发票真伪。 该接口也适用于机动车、二手车销售发票、航空运输电子客票、铁路电子客票等。 二、如何用Java实现接口…

AM32-MultiRotor-ESC项目固件编译和烧录方法介绍

AM32-MultiRotor-ESC项目固件编译和烧录方法介绍 &#x1f4cd;AM32-MultiRotor-ESC项目地址:https://github.com/AlkaMotors/AM32-MultiRotor-ESC-firmware&#x1f388;Updater with V8 Bootloader&#xff1a; https://github.com/AlkaMotors/F051_Bootloader_Updater&#…

HarmonyOS:@AnimatableExtend 装饰器自学指南

在最近的项目开发中&#xff0c;我遇到了需要实现复杂动画效果的需求。在探索解决方案的过程中&#xff0c;我发现了 AnimatableExtend 装饰器&#xff0c;它为实现动画效果提供了一种非常灵活且强大的方式。然而&#xff0c;在学习这个装饰器的过程中&#xff0c;我发现相关的…