Linux 生产者消费者模型

1. 背景概念

假设现在有多个线程,一部分线程负责生产任务,称为生产者productor,另一部线程负责执行任务,称为消费者consumer,他们之间是一对一一对一一对一的关系。

现在生产者productor-3有任务要派发,但是consumer-3正在执行上一个任务,于是productor-3就只能等待。但是此时明明有两个线程consumer-1consumer-2是空闲的,它们却不能执行这个任务,这就出现了很明显的资源分配不合理问题。

于是提出了生产者消费者模型,即通过一个容器来解决生产者和消费者的强耦合问题:

我们可以把这个任务队列看成一个缓冲区,生产者负责生成数据或任务,并将其放入该共享的缓冲区中;消费者则从共享缓冲区中取出数据或任务进行处理。

这个模型的目的是解决生产者和消费者之间的协调问题,尤其是在多线程或多进程环境中,当两者的工作速率不一致时,如何有效地管理资源和避免数据竞争或死锁。

这个任务队列的实现上我们有两种形式,分别是阻塞队列BlockQueue以及环形队列。

2. 阻塞队列BlockQueue

阻塞队列是实现生产者消费者模型的一种常用数据结构。在阻塞队列中,当队列为空时,消费者尝试获取元素的操作会被阻塞,直到生产者向队列中添加了新的元素。同样,当队列已满时,生产者尝试添加新元素的操作会被阻塞,直到消费者从队列中取出了元素。这种机制自然地实现了生产者和消费者之间的同步。 

阻塞队列BlockQueue的成员变量

    queue<T> _q;//队列int _capacity;//阻塞队列的容量pthread_mutex_t _mutex;pthread_con_t _consume_cond;pthread_con_t _produce_cond;

由于这个阻塞队列为临界资源,并且生产者productor线程和消费者consumer线程都要访问阻塞队列,所以我们需要一把锁mutex来保护阻塞队列,保证线程互斥。即生产者和生产者,消费者和消费者,生产者和消费者之间均为互斥关系。

同时,当队列为空时,消费者需要到等待队列排队等待直到生产者向阻塞队列添加元素,同理,当队列为满时,生产者需要到等待队列排队等待直到消费者从阻塞队列获取元素,因为生产者和消费者排队等待的并不是同一个队列,所以我们需要两个队列,即两个条件变量produce_cond和consume_cond。

BlockQueue.hpp完整代码

#include<iostream>
#include<queue>
#include<pthread.h>
using namespace std;
template<class T>
class blockqueue
{public:blockqueue(int capacity):_capacity(capacity),_q(capacity){pthread_mutex_init(&mutex,nullptr);pthread_cond_init(&consume_cond,nullptr);pthread_cond_init(&produce_cond,nullptr);}~blockqueue(){pthread_mutex_destroy(&mutex);pthread_cond_destroy(&consume_cond);pthread_cond_destroy(&produce_cond);}void push(T val){pthread_mutex_lock(&mutex);//加锁while(_q.size()==_capacity)//如果队列满了,进入生产者等待队列阻塞{pthread_cond_wait(&produce_cond,&mutex);}_q.push(val);pthread_cond_signal(&consume_cond);//唤醒消费者pthread_mutex_unlock(&mutex);//解锁}T pop(){pthread_mutex_lock(&mutex);//加锁while(_q.empty())//如果队列为空,进入消费者等待队列阻塞{pthread_cond_wait(&consume_cond,&mutex);}T val=_q.front();_q.pop();pthread_cond_signal(&produce_cond);//唤醒生产者pthread_mutex_unlock(&mutex);//解锁return val;}private:queue<T> _q;//队列int _capacity;//阻塞队列的容量pthread_mutex_t _mutex;pthread_con_t _consume_cond;pthread_con_t _produce_cond;
};

我们写一个main.cpp来测试基于阻塞队列BlockQueue实现的生产者消费者模型,main.cpp如下

#include "BlockQueue.hpp"
#include <string>
#include <ctime>
#include <unistd.h>
struct ThreadData
{blockqueue<int> *bq;pthread_t tid;string name;
};
void *ProductorRoutine(void *arg)
{ThreadData *td = static_cast<ThreadData *>(arg);while (true){sleep(1);int num = rand() % 10 + 1;td->bq->push(num);cout << "[" << td->name << "] push: " << num << endl;}
}
void *ConsumerRoutine(void *arg)
{ThreadData *td = static_cast<ThreadData *>(arg);while (true){int num = td->bq->pop();cout << "[" << td->name << "] pop: " << num << endl;}
}
int main()
{srand(time(nullptr));blockqueue<int> bq;ThreadData productors[3], consumers[3];for (int i = 0; i < 3; i++){sleep(1);productors[i].bq = &bq;productors[i].name = "Productor-" + to_string(i + 1);pthread_create(&productors[i].tid, nullptr, ProductorRoutine, (void *)&productors[i]);}for (int i = 0; i < 3; i++){consumers[i].bq = &bq;consumers[i].name = "Consumer-" + to_string(i + 1);pthread_create(&consumers[i].tid, nullptr, ConsumerRoutine, (void *)&consumers[i]);}for (int i = 0; i < 3; i++){pthread_join(productors[i].tid, nullptr);pthread_join(consumers[i].tid, nullptr);}return 0;
}

makefile如下

main.exe:main.cpp BlockQueue.hppg++ -o $@ $^ -l pthread
.PHONEY:clean
clean:rm -f main.exe

编译运行后结果如下

生产者往阻塞队列里投放数据,消费者从阻塞队列里获取数据,并且保持了生产者和生产者,消费者和消费者,生产者和消费者之间的互斥关系以及生产者和生产者,消费者和消费者之间的同步关系。

3. 环形队列RingQueue

环形队列是另一种用于生产者消费者模型的数据结构,它通过循环利用固定大小的数组来模拟无限大的队列。在环形队列中,我们需要使用POSIX信号量,用于控制生产者和消费者对缓冲区的访问,确保在任何时刻只有一个生产者或消费者能够修改缓冲区中的数据。与阻塞队列不同的是,环形队列通过使用POSIX信号量允许生产者和消费者同时进行生产和消费操作,从而提高资源利用率。

POSIX信号量

POSIX信号量是一种同步机制,可以用来实现生产者消费者模型中的同步。信号量本质上是一个计数器,表示某种资源的可用数量,用于控制对共享资源的访问。生产者和消费者可以通过增加或减少信号量的值来协调对数据缓冲区的访问。 

例如,当一个线程想要访问资源时,它会检查信号量的计数器。如果计数器大于 0,则表示有可用资源,线程可以访问资源并使计数器减 1。如果计数器为 0,则表示没有可用资源,线程会进入等待状态,直到有其他线程释放资源;当一个线程完成对资源的访问后,它会将计数器加 1,并唤醒一个等待的线程。

POSIX信号量的使用

POSIX信号量的类型为 sem_t ,在使用POSIX信号量时,通常需要包含头文件 <semaphore.h>,并且在编译时链接pthread库,以支持多线程操作。

sem_init
int sem_init(sem_t *sem, int pshared, unsigned int value);

sem_init 函数用于初始化一个POSIX信号量。

参数:

  • sem:指向要初始化的信号量的指针。
  • pshared:一个标志位,用于指定信号量的类型。如果pshared为0,信号量是进程本地的,只能在单个进程内部的线程之间共享;如果pshared非0,信号量是进程共享的,可以在不同进程之间共享。
  • value:信号量的初始值。

函数调用成功时返回0,失败时返回-1,并设置errno以指示错误。

sem_destroy
int sem_destroy(sem_t *sem);

sem_destroy函数用于销毁一个POSIX信号量。

  • sem:指向要销毁的信号量的指针。

函数返回0表示成功,非0值表示失败,并设置errno以指示错误。

sem_wait & sem_trywait
int sem_wait(sem_t *sem);
int sem_trywait(sem_t *sem);

sem_wait函数用于请求信号量,如果信号量的值大于0,则函数将信号量的值减1并立即返回。如果信号量的值为0,则当前线程将被阻塞,直到信号量的值变为正数. 

sem_trywait函数是sem_wait的非阻塞版本,它尝试减少信号量的值。如果信号量的当前值大于0,则函数成功并将信号量的值减1。如果信号量的当前值为0,则sem_trywait不会阻塞调用线程,而是立即返回一个错误码EAGAIN,表示操作不能立即执行.

sem_post
int sem_post(sem_t *sem);

sem_post函数用于释放信号量并增加信号量的值,并唤醒一个等待该信号量的线程。当信号量的值增加后,如果有线程因为 sem_wait 或 sem_trywait 而被阻塞,这些线程中的至少一个将被唤醒并重新竞争信号量。

sem_getvalue
int sem_getvalue(sem_t *sem, int *sval);

sem_getvalue函数用于获取信号量的当前值,并将该值存储在提供的整数指针指向的变量中。

  • sem:指向要查询的信号量的指针。
  • sval:指向整数的指针,用于接收信号量的当前值。

函数返回0表示成功,返回-1表示出现错误,并设置errno以指示错误类型.

环形队列的实现

上图是一个环形队列,生产者与消费者都绕着环顺时针走,黑色代表这个位置有数据,白色代表这个位置没有数据。生产者在前面生产,消费者跟在后面消费,这就是基于环形队列的生产消费模型。

RingQueue的成员变量

        vector<T> _q;//数组模拟环形队列int _capacity;//队列容量int _productor_index;//生产者当前位置int _consumer_index;//消费者当前位置pthread_mutex_t _productor_mutex;//生产者锁pthread_mutex_t _consumer_mutex;//消费者锁sem_t _space_sem;sem_t _data_sem;

由于生产者只需要关注环形队列剩余空间数量,消费者只需要关注剩余数据数量,所以在这个模型中,通常会有两个信号量:一个用于控制生产者对环形队列剩余空间的访问 space_sem(通常初始化为环形队列大小),另一个用于控制消费者对环形队列剩余数据的访问 data_sem(初始化为0)。

就上图来说: space_sem = 4,data_sem = 4

  • 只有 space_sem 的值大于0,表示还有空间可以放数据,此时生产者才可以生产
  • 只有 data_sem 的值大于0,表示当前有数据可以读取,此时消费者才可以消费

所以环形队列的判空和判满问题也能很好解决

而且在该种情况下,我们就可以直接保证了生产者和消费者之间的互斥关系,因为只有当环形队列为空或者为满的时候生产者和消费者的位置才重合在一起,而为空时,data_sem == 0,消费者不能再获取数据,为满时,space_sem == 0,生产者不能在添加数据,这样就天然维护了生产者和消费者之间的互斥关系。

而我们还要维护生产者和生产者,消费者和消费者之间互斥关系,就需要两把锁 productor_mutex 和 consumer_mutex,这样生产者和消费者就可以同时访问同一个队列的不同部分,这就可以让生产和消费同时进行提高效率。

 RingQueue.hpp完整代码

#include <iostream>
#include <vector>
#include <pthread.h>
#include <semaphore.h>
using namespace std;
template <class T>
class ringqueue
{
public:ringqueue(int capacity = 5): _capacity(capacity), _productor_index(0), _consumer_index(0),{_q.resize(capacity);pthread_mutex_init(&_productor_mutex, nullptr);pthread_mutex_init(&_consumer_mutex, nullptr);sem_init(&_space_sem, 0, capacity);sem_init(&_data_sem, 0, 0);}~ringqueue(){pthread_mutex_destroy(&_productor_mutex);pthread_mutex_destroy(&_consumer_mutex);sem_destroy(&_space_sem);sem_destroy(&_data_sem);}void push(T data){// 因为信号量本身是互斥的,所以可以把申请信号量放在加锁之前sem_wait(&_space_sem);                   // 空间不足时等待pthread_mutex_lock(&_productor_mutex);   // 申请锁_q[_productor_index++] = data;           // 添加数据_productor_index %= _capacity;           // 更新位置pthread_mutex_unlock(&_productor_mutex); // 释放锁sem_post(&_data_sem);                    // 数据添加完成,通知消费者}T pop(){sem_wait(&_data_sem);                   // 数据不足时等待pthread_mutex_lock(&_consumer_mutex);   // 申请锁T data = _q[_consumer_index++];         // 取出数据_consumer_index %= _capacity;           // 更新位置pthread_mutex_unlock(&_consumer_mutex); // 释放锁sem_post(&_space_sem);                  // 空间释放完成,通知生产者return data;}private:vector<T> _q;                     // 数组模拟环形队列int _capacity;                    // 队列容量int _productor_index;             // 生产者当前位置int _consumer_index;              // 消费者当前位置pthread_mutex_t _productor_mutex; // 生产者锁pthread_mutex_t _consumer_mutex;  // 消费者锁sem_t _space_sem;sem_t _data_sem;
};

同理,我们把main.cpp的代码改用RingQueue.hpp来测试一下

main.cpp如下

#include "BlockQueue.hpp"
#include "RingQueue.hpp"
#include <string>
#include <ctime>
#include <unistd.h>
struct ThreadData
{ringqueue<int> *rq;pthread_t tid;string name;
};
void *ProductorRoutine(void *arg)
{ThreadData *td = static_cast<ThreadData *>(arg);while (true){sleep(1);int num = rand() % 10 + 1;td->rq->push(num);cout << "[" << td->name << "] push: " << num << endl;}
}
void *ConsumerRoutine(void *arg)
{ThreadData *td = static_cast<ThreadData *>(arg);while (true){int num = td->rq->pop();cout << "[" << td->name << "] pop: " << num << endl;}
}
int main()
{srand(time(nullptr));ringqueue<int> rq;ThreadData productors[3], consumers[3];for (int i = 0; i < 3; i++){sleep(1);productors[i].rq = &rq;productors[i].name = "Productor-" + to_string(i + 1);pthread_create(&productors[i].tid, nullptr, ProductorRoutine, (void *)&productors[i]);}for (int i = 0; i < 3; i++){consumers[i].rq = &rq;consumers[i].name = "Consumer-" + to_string(i + 1);pthread_create(&consumers[i].tid, nullptr, ConsumerRoutine, (void *)&consumers[i]);}for (int i = 0; i < 3; i++){pthread_join(productors[i].tid, nullptr);pthread_join(consumers[i].tid, nullptr);}return 0;
}

测试结果如下

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.rhkb.cn/news/459005.html

如若内容造成侵权/违法违规/事实不符,请联系长河编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

PlantUML在IDEA中使用

1.打开settings,搜索PlantUML Integration并下载 2.安装并重启IDEA 3.学习相关的语法即可进行使用

Java之多线程的实现(创建)(3种实现方式)(面试高频)

目录 一、多线程的3种实现方式 &#xff08;1&#xff09;继承Thread类。 &#xff08;2&#xff09;实现Runnable接口。&#xff08;void run()&#xff1a;该方法无返回值、无法抛出异常&#xff09; &#xff08;3&#xff09;实现Callable接口。&#xff08;V call() throw…

企业如何吸引稀缺的高技能员工

高技能员工的稀缺性和招聘难度日益凸显&#xff0c;其原因主要在于技术发展迅速、人才供需失衡、企业竞争加剧。其中&#xff0c;技术发展迅速导致人才培养跟不上市场需求&#xff0c;使得高技能人才更加稀缺。以人工智能领域为例&#xff0c;新技术层出不穷&#xff0c;相关人…

【MySQL】MySQL数据库中密码加密和查询的解决方案

本篇博客是为了记录自己在遇到password函数无法生效时的解决方案。通过使用AES_ENCRYPT(str,key)和AES_DECRYPT(str,key)进行加密和解密。 一、问题 自己想创建一个user表&#xff0c;user表中有一个password属性列&#xff0c;自己想对密码进行加密后再存入数据库&#xff0c…

java质数的判断 C语言指针变量的使用

1. public static void main(String[] args) {Scanner scnew Scanner(System.in);System.out.println("请输入一个值");int num sc.nextInt();boolean flagtrue;for (int i2;i<num;i){if (num%i0){flagfalse;break;}}if (flag){System.out.println(num"是一…

Midjourney 3D:探索未来沉浸式体验的无限可能

一、Midjourney 3D:开启沉浸式新时代 最近,Midjourney宣布即将推出一款全新的3D产品,这不仅仅是一次简单的3D生成技术的升级,而是一场革命。这款新产品将基于先进的光场技术,而非传统的3D网格模型,为用户提供前所未有的沉浸式体验。用户不仅可以“跳入”生成的场景中自由…

CasPL: Cascade Prompt Learning for Vision-Language Model Adaptation

文章汇总 当前的问题 目前可学习的提示符号主要用于适应任务的单一阶段(即适应提示)&#xff0c;容易导致过度拟合风险。 动机 提示符将分两个阶段逐步优化。在初始阶段&#xff0c;学习增强提示&#xff0c;**通过使用大量未标记的领域图像数据对齐其预测逻辑&#xff0c;从…

【文献及模型、制图分享】基于投入品减量增效视角的长江经济带农业生产绿色化演进研究

文献介绍 绿色化转型是农业可持续发展研究的重要议题。以农业生产绿色化转型过程的理论分析为基础&#xff0c;运用文献调查、访谈与问卷调查、脱钩分析相结合的方法&#xff0c;研究了长江经济带农业生产绿色化转型过程和投入品减量增效的趋势。 结果表明&#xff1a; 2015…

记录一个容器间访问不通问题

docker-compose装了zookeeper和一个服务。 zk服务如下&#xff1a; szxc-zk:image: "image.sd001.cn:30003/base/zookeeper:3.8"privileged: trueenvironment:- "TZAsia/Shanghai"#- "ALLOW_ANONYMOUS_LOGINyes"- "ZOO_MY_ID1"- &qu…

redis详细教程(3.ZSet,Bitmap,HyperLogLog)

ZSet Redis 的 ZSet&#xff08;有序集合&#xff09;是一种特殊的数据类型&#xff0c;它允许存储一系列不重复的字符串元素&#xff0c;并为每个元素关联一个分数&#xff08;score&#xff09;。这个分数用于对集合中的元素进行排序。ZSet 的特点是&#xff1a; 唯一性&am…

【Windows】电脑端口明明没有进程占用但显示端口被占用(动态端口)

TOC 一、问题 重启电脑后&#xff0c;启用某个服务显示1089端口被占用。 查看是哪个进程占用了&#xff1a; netstat -aon | findstr "1089"没有输出&#xff0c;但是换其他端口&#xff0c;是可以看到相关进程的&#xff1a; 现在最简单的方式是给我的服务指定另…

RHCE的学习(8)

动态网站 lnmp&#xff08;LAMP&#xff09; 解析index.php界面 &#xff08;1&#xff09;预配&#xff0c;确保服务能够被访问 systemctl stop firewalld setenforce 0 &#xff08;2&#xff09;安装nginx服务 mount /dev/sr0 /mnt cat /etc/yum.repos.d/base.repo dnf …

【待学习 】 DHTMLX Gantt

DHTMLX Gantt是一个开源 JavaScript 甘特图库&#xff0c;可以帮助您以美观的图表形式说明和管理项目计划。 它可以将任务之间的依赖关系显示为线条&#xff0c;并允许您设置任务之间的不同关系&#xff08;完成-开始、开始-开始、完成-完成、开始-完成&#xff09;。标准版还…

一二三应用开发平台自定义查询设计与实现系列2——查询方案功能实现

查询方案功能实现 上面实现了自定义查询功能框架&#xff0c;从用户角度出发&#xff0c;有些条件组合可以形成特定的查询方案&#xff0c;对应着业务查询场景。诸多查询条件的组合&#xff0c;不能每次都让用户来设置&#xff0c;而是应该保存下来&#xff0c;下次可以直接使…

一文解决单调栈的应用

单调栈的定义&#xff1a; 单调栈是栈的一中特殊形式&#xff0c;在栈中的元素必须满足单调性&#xff08;一定是单调上升或单调下降等等的规律&#xff09;。 单调栈的性质&#xff1a; 单调栈解决的问题 单调栈解决的常见问题&#xff1a;给定一个序列&#xff0c;求每个位置…

css绘制s型(grid)

在之前有通过flex布局实现了s型布局&#xff0c;是通过截取数组形式循环加载数据 这次使用grid直接加载数据通过css实现 <div id"app"><template v-for"(item,inx) in items"><div class"row"><template v-for"(ite…

SpringBoot 集成RabbitMQ 实现钉钉日报定时发送功能

文章目录 一、RabbitMq 下载安装二、开发步骤&#xff1a;1.MAVEN 配置2. RabbitMqConfig 配置3. RabbitMqUtil 工具类4. DailyDelaySendConsumer 消费者监听5. 测试延迟发送 一、RabbitMq 下载安装 官网&#xff1a;https://www.rabbitmq.com/docs 二、开发步骤&#xff1a;…

微信小程序美团点餐

引言&#xff1a;外卖已经成为了都市人的必备&#xff0c;在无数个来不及&#xff08;懒得&#xff09;做饭的时刻拯救孤单寂寞的胃。美团外卖无疑是外卖届的领头羊&#xff0c;它的很多功能与设计都值得我们学习。本文将从五个方面&#xff0c;对美团外卖展开产品分析&#xf…

vue封装信号强度

图标下载链接: https://pan.baidu.com/s/1828AidkCKU1KTkw1SvBwQg?pwd4k7n 共五格信号 信号5为绿色&#xff0c;信号4为绿色&#xff0c;信号3为黄色&#xff0c;信号2为黄色&#xff0c;信号1为红色&#xff0c;信号0为灰色。 子组件 /components/SignalStrength/index.vu…

【Python爬虫实战】深入解析 Selenium:从元素定位到节点交互的完整自动化指南

#1024程序员节&#xff5c;征文# &#x1f308;个人主页&#xff1a;易辰君-CSDN博客 &#x1f525; 系列专栏&#xff1a;https://blog.csdn.net/2401_86688088/category_12797772.html ​ 前言 Selenium 是进行网页自动化操作的强大工具&#xff0c;在测试、数据抓取、用户行…