基于 BlockQueue(阻塞队列) 的 生产者消费者模型

文章目录

  • 阻塞队列(BlockQueue)介绍
  • 生产者消费者模型 介绍
  • 代码实现
    • lockGuard.hpp()
    • Task.hpp(任务类)
    • BlockQueue.hpp(阻塞队列)
    • conProd.cc(生产者消费者模型 主进程)
  • 执行结果


阻塞队列(BlockQueue)介绍

阻塞队列(Blocking Queue)是一种特殊类型的队列,它具有阻塞操作的特性。在并发编程中,阻塞队列可以用于实现线程间的安全通信和数据共享。

阻塞队列的 主要特点 是:

  • 队列为空时,消费者线程尝试从队列中获取(出队)元素时会被阻塞,直到有新的元素被添加到队列中为止。
  • 队列已满时,生产者线程尝试向队列中添加(入队)元素时也会被阻塞,直到有空闲容量可用。

阻塞队列通常提供入队操作、出队操作以及获取队列大小等基本方法。

在这里插入图片描述

阻塞队列的实现在下文


生产者消费者模型 介绍

生产者消费者模型 是一种常用的 并发编程模型 ,用于解决多线程或多进程环境下的协作问题。该模型包含两类角色:生产者和消费者

生产者负责生成数据,并将数据存放到共享的缓冲区中。消费者则从缓冲区中获取数据并进行处理。生产者和消费者之间通过共享的缓冲区进行数据交互。

为了确保线程安全,生产者和消费者需要遵循一些规则

  1. 如果缓冲区已满,则生产者需要等待直到有空间可用。
  2. 如果缓冲区为空,则消费者需要等待直到有数据可用。
  3. 生产者和消费者都不能访问缓冲区的内部结构,只能通过特定的接口进行操作。

在这里插入图片描述


代码实现

在代码实现上,生产者消费者模型通常涉及以下几个 角色和操作

  1. 生产者(Producer):负责生成数据并将其放入共享的缓冲区。
  2. 消费者(Consumer):从共享的缓冲区中获取数据并进行处理。
  3. 缓冲区(Buffer):用于暂存生产者生成的数据,供消费者使用。
  4. 同步机制:用于确保生产者和消费者之间的协调和同步,以避免竞态条件和数据不一致性等问题。

我们将要实现的代码中:

阻塞队列 作为缓冲区Task任务类 由生产者生产传入阻塞队列,以便消费者拿去任务消费lockGuard 与条件变量 保证 生产者消费者之间的协调,同步。


lockGuard.hpp()

在 lockGuard.hpp中我们 实现了一个 需封装了互斥锁的Mutex类和一个 实现自动加解锁的lockGuard类

Mutex类封装了pthread_mutex_t类型的互斥锁, lockGuard类是一个RAII风格的加锁方式。

通过这种方式,lockGuard对象的生命周期和锁的生命周期绑定在一起,可以确保在任何情况下都能保证锁的正确释放,避免死锁等问题

完整代码:

#pragma once                                                                           #include <iostream>
#include <pthread.h>using std::cout; using std::endl;// Mutex类封装 pthread_mutex_t 互斥锁
class Mutex
{
public:// 构造 Mutex(pthread_mutex_t* mtx):_pmtx(mtx){}// 调用lock 进行加锁void lock(){cout << "进行加锁" << endl;pthread_mutex_lock(_pmtx);}// 调用unlock 进行解锁void unlock(){cout << "进行解锁" << endl;pthread_mutex_unlock(_pmtx);}~Mutex(){}private:pthread_mutex_t* _pmtx; 
};// RAII 风格的加锁方式
// 以实现自动加解锁
class lockGuard
{
public:// 构造lockGuard(pthread_mutex_t* mtx):_mtx(mtx){_mtx.lock();}// 析构~lockGuard(){_mtx.unlock();}private:Mutex _mtx;                                                                        
};

Task.hpp(任务类)

下面的代码 是一个简化的 任务封装类用于生产者产生任务并将其放入阻塞队列,供消费者取出并执行。通过将函数与参数打包成任务,实现了任务的传递和执行。

#pragma once#include <iostream>
#include <functional>                                                                  // 表示一个函数类型。    
// func_t是一个接受两个整数参数并返回整数的函数类型    
typedef std::function<int(int, int)> func_t;    // 任务类型: 用于生产者产生任务
class Task    
{    
public:    Task(){};    // 传入三个参数x,y,以及一个函数,task则执行func(x, y)    Task(int x, int y, func_t func):_x(x),_y(y),_func(func)    {}    // 用于执行任务。在函数体内部,会调用存储在 _func 中的函数对象,// 并将 _x 和 _y 作为参数传递给这个函数对象。// 最后 返回执行结果。int operator()()    {    return _func(_x,_y);    } public:// 用作函数参数int _x;int _y;func_t _func;
};    

BlockQueue.hpp(阻塞队列)

对 阻塞队列 进行类的实现:

BlockQueue包含以下成员变量

std::queue<T> _bq;   // 阻塞队列
int _capacity; // 容量上限
pthread_mutex_t _mtx;   // 互斥锁: 保证队列安全
pthread_cond_t _empty; // 表示bq是否为空
pthread_cond_t _full; // 表示bq是否为满 

以及除构造函数/析构函数外的以下 BlockQueue包含以下成员函数

bool isQueueEmpty() // 判断队列是否为空
bool isQueueFull() // 判断队列是否为满
void push(const T &in) // 生产者用于制造任务
void pop(const T &in) // 消费者用于消耗任务

完整代码:

#pragma once#include <iostream>
#include <queue>
#include <mutex>
#include <pthread.h>#include "lockGuard.hpp"const int gDefaultCap = 5; // 作为阻塞队列的默认容量// 阻塞队列
template <class T>
class BlockQueue
{
private:// 判断队列是否为空bool isQueueEmpty(){return _bq.size() == 0;}// 判满bool isQueueFull(){return _bq.size() == _capacity; // 当size == _capacity 证明队列已满}public:// 构造BlockQueue(int capacity = gDefaultCap) : _capacity(capacity){// 初始化互斥锁 && 条件变量pthread_mutex_init(&_mtx, nullptr);pthread_cond_init(&_empty, nullptr);pthread_cond_init(&_full, nullptr);}// 析构~BlockQueue(){// 销毁 互斥锁 && 条件变量pthread_mutex_destroy(&_mtx);pthread_cond_destroy(&_full);pthread_cond_destroy(&_empty);}// 生产者进程void push(const T &in){// 创建一个lockGuard 变量lockGuard lockguard(&_mtx);while(isQueueFull()){   // 如果此时阻塞队列为满,进程等待,直到有空位时改变_fullpthread_cond_wait(&_full, &_mtx);}// 此时阻塞队列有空位,正常插入元素,并_bq.push(in);pthread_cond_signal(&_empty); // 发送信号,表示队列不再为空pthread_mutex_unlock(&_mtx);}// 消费者进程void pop(T *out){lockGuard lockguard(&_mtx);// pthread_mutex_lock(&mtx_);while (isQueueEmpty())  // 如果队列为空,等待生产者制造任务pthread_cond_wait(&_empty, &_mtx);// 此时队列内有任务,*out = _bq.front(); // 拿_bq的头部元素,并执行pop(拿任务+销毁)_bq.pop();pthread_cond_signal(&_full);pthread_mutex_unlock(&_mtx);}private:std::queue<T> _bq;   // 阻塞队列int _capacity; // 容量上限pthread_mutex_t _mtx;   // 互斥锁: 保证队列安全pthread_cond_t _empty; // 表示bq是否为空pthread_cond_t _full; // 表示bq是否为满 
};

conProd.cc(生产者消费者模型 主进程)

该文件中包含以下函数:

  • myAdd 函数:一个简单的加法函数,即实际执行任务所执行的函数

  • consumer 函数消费者线程的执行函数。该函数从阻塞队列中获取任务,并执行任务的函数。

  • productor 函数生产者线程的执行函数。该函数随机生成两个整数参数,创建一个任务对象,并将任务对象插入到阻塞队列中。

  • main 函数主函数,用于创建并启动多个消费者线程和生产者线程。通过调用 pthread_create 创建线程,并通过 pthread_join 等待线程结束。

完整代码:

#include "blockQueue.hpp"                                                           
#include "Task.hpp"    
#include <pthread.h>
#include <unistd.h> // 加法函数,用于生产者进程产生任务
int myAdd(int x, int y)    
{    return x + y;    
}    // 消费者进程
void *consumer(void* args)    
{  // 将获得的agrs 参数 强制转化为BlockQueue<Task>* 类型 并赋值给变量bqueueBlockQueue<Task>* bqueue = (BlockQueue<Task>*)args;while(true){// 获取任务Task t;bqueue->pop(&t); // 执行任务 + 销毁// 打印任务信息,因为我们使用的仅仅是一个加法函数,所以直接打印"+"cout << pthread_self() << " consumer: " << t._x << " + " << t._y << " = " << t() << endl;}return nullptr;
}// 生产者进程
void* productor(void* args)
{BlockQueue<Task>* bqueue = (BlockQueue<Task>*)args;while(true){// 制造任务// 生产者将任务传到缓冲区,消费者再将其消耗// 任务不一定有生产者制造,也可能通过外部获得// 随机产生x, y两个参数,执行Taskint x = rand() % 10 + 1;usleep(rand() % 1000);  int y = rand() % 5 + 1;Task t(x, y, myAdd);// 发送任务bqueue->push(t);// 输出消息cout << pthread_self() << " productor: " << t._x << " + " << t._y << " = ?" << endl;sleep(1);}return nullptr;
}int main()
{// getpid():获取当前进程的进程ID(PID),用于区分不同的进程。// 0x11451 用于增加种子的随机性srand((uint64_t)time(nullptr) ^ getpid() ^ 0x11451);BlockQueue<Task>* bqueue = new BlockQueue<Task>();pthread_t con[2], pro[2]; // 声明两个消费者 / 生产者,增加并行性// 可以将 &con[1] 换为 con+1pthread_create(&con[0], nullptr, consumer, bqueue);pthread_create(&con[1], nullptr, consumer, bqueue);pthread_create(&pro[0], nullptr, productor, bqueue);pthread_create(&pro[1], nullptr, productor, bqueue);// 执行完毕,等待进程销毁pthread_join(con[0], nullptr);pthread_join(con[1], nullptr);pthread_join(pro[0], nullptr);pthread_join(pro[1], nullptr);delete bqueue; // 销毁队列return 0;
}

执行结果

在这里插入图片描述

根据上面的执行结果,可以看出,程序先连续生产(即加锁信息的打印),阻塞队列满了后开始消费,后面重复 生产消费(即加锁解锁)的步骤。

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.rhkb.cn/news/100763.html

如若内容造成侵权/违法违规/事实不符,请联系长河编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

pytest自动化框架运行全局配置文件pytest.ini

还记得在之前的篇章中有讲到Pytest是目前主要流行的自动化框架之一&#xff0c;他有基础的脚本编码规则以及两种运行方式。 pytest的基础编码规则是可以进行修改&#xff0c;这就是今日文章重点。 看到这大家心中是否提出了两个问题&#xff1a;pytest的基础编码规则在哪可以…

探索高效的HTTP异步接口测试方法:从轮询等待到自动化方案

本文将深入探讨HTTP异步接口测试的多个方面&#xff0c;包括轮询等待、性能测试以及自动化方案。通过详细的解释和实际案例&#xff0c;帮助您了解如何有效地测试异步接口&#xff0c;确保系统的稳定性和性能。 在现代软件开发中&#xff0c;HTTP异步接口扮演着至关重要的角色&…

QCustomPlot绘制多条曲线在不同的位置

ui->setupUi(this);QCPLayoutGrid* layout ui->customPlot->plotLayout();//把之前的布局清除layout->clear();//设置行间距layout->setRowSpacing(0);layout->setColumnSpacing(0);// 2. 准备数据QVector<double> x(101), y(101);for (int i 0; i &…

设计模式之代理模式(Proxy)的C++实现

1、代理模式的提出 在组件的开发过程中&#xff0c;有些对象由于某种原因&#xff08;比如对象创建的开销很大&#xff0c;或者对象的一些操作需要做安全控制&#xff0c;或者需要进程外的访问等&#xff09;&#xff0c;会使Client使用者在操作这类对象时可能会存在问题&…

Bigemap在地质工程勘察行业中的应用

Bigemap在地质工程勘察行业中的应用 选择Bigemap的原因&#xff1a; 师兄在测绘局工作&#xff0c;买过全能版&#xff0c;帮我下载过高程数据&#xff0c;我觉得效果可以&#xff0c;于是联系到软件公司进行试用、咨询 使用场景&#xff1a; 影像、等高线、地形等资料下载&…

七夕表白前端代码

七夕表白前端代码&#xff0c;话不多说直接上代码&#xff1a; <!DOCTYPE html> <html> <head><title>七夕表白</title><style>body {text-align: center;font-family: Arial, sans-serif;}h1 {color: #e74c3c;}p {font-size: 18px;line-…

热烈祝贺甘肃峻茂成功入选航天系统采购供应商库

经过航天系统采购平台的严审&#xff0c;甘肃峻茂新材料科技有限公司成功入选中国航天系统采购供应商库。航天系统采购平台是航天系统内企业采购专用平台&#xff0c;服务航天全球范围千亿采购需求&#xff0c;目前&#xff0c;已有华为、三一重工、格力电器、科大讯飞等企业、…

数据分析15——office中的Excel基础技术汇总

0、前言&#xff1a; 这部分总结就是总结每个基础技术的定义&#xff0c;在了解基础技术名称和定义后&#xff0c;方便对相关技术进行检索学习。笔记不会详细到所有操作都说明&#xff0c;但会把基础操作的名称及作用说明&#xff0c;可自行检索。本文对于大部分读者有以下作用…

SpringBoot08——前端数据模拟MockJS+vue-element-admin后台集成

感觉用到再说吧 2. vue-element-admin后台集成 3.JWT跨域认证 看自己的demo2源码吧

NodeJs导出PDF

&#xff08;优于别人&#xff0c;并不高贵&#xff0c;真正的高贵应该是优于过去的自己。——海明威&#xff09; 场景 根据订单参数生成账单PDF 结果 示例代码 /* eslint-disable no-unused-vars */ /* eslint-disable no-undef */ /* eslint-disable complexity */ const…

LVS负载均衡集群-NAT模式部署

集群 集群&#xff1a;将多台主机作为一个整体&#xff0c;然后对外提供相同的服务 集群使用场景&#xff1a;高并发的场景 集群的分类 1.负载均衡器集群 减少响应延迟&#xff0c;提高并发处理的能力 2&#xff0c;高可用集群 增强系统的稳定性可靠性&…

【Python机器学习】实验15 将Lenet5应用于Cifar10数据集(PyTorch实现)

文章目录 CIFAR10数据集介绍1. 数据的下载2.修改模型与前面的参数设置保持一致3. 新建模型4. 从数据集中分批量读取数据5. 定义损失函数6. 定义优化器7. 开始训练8.测试模型 9. 手写体图片的可视化10. 多幅图片的可视化 思考题11. 读取测试集的图片预测值&#xff08;神经网络的…

BDA初级分析——SQL清洗和整理数据

一、数据处理 数据处理之类型转换 字符格式与数值格式存储的数据&#xff0c;同样是进行大小排序&#xff0c; 会有什么区别&#xff1f; 以rev为例&#xff0c;看看字符格式与数值格式存储时&#xff0c;排序会有什么区别&#xff1f; 用cast as转换为字符后进行排序 SEL…

陕西科技大学改考408!附考情分析

改考信息 8月14日&#xff0c;陕西科技大学公布了2024年硕士研究生招生目录&#xff08;初稿&#xff09;&#xff0c;其中不难发现083500软件工程初试专业课由819数据结构改为408计算机学科专业基础 图片&#xff1a;陕西科技大学24专业目录-软件工程学硕 https://yjszs.sus…

Docker容器:Docker-Compose

Docker容器&#xff1a;Docker-Compose 一.Docker-Compose概念 1.Docker-Compose使用场景 一个Dockerfile模板文件可以定义一个单独的应用容器&#xff0c;如果需要定义多个容器就需要服务编排。服务编排有很多种技术方案&#xff0c;今天是介绍 Docker 官方产品 Docker Com…

Redis 5环境搭建

一、环境搭建 如果是Centos8&#xff0c;yum 仓库中默认的 Redis版本就是5&#xff0c;直接yum install即可。如果是Centos7&#xff0c;yum 仓库中默认的 Redis版本是3系列&#xff0c;比较老~ 为了我们能在 Centos7中下载到 Redis5 首先要安装额外的软件源 sudo yum insta…

Python爬取斗罗大陆全集

打开网址http://www.luoxu.cc/dmplay/C888H-1-265.html F12打开Fetch/XHR&#xff0c;看到m3u8&#xff0c;ts&#xff0c;一眼顶真&#xff0c;打开index.m3u8 由第一个包含第二个index.m3u8的地址&#xff0c;ctrlf在源代码中一查index&#xff0c;果然有&#xff0c;不过/…

用加持了大模型的 Byzer-Notebook 做数据分析是什么体验

Byzer-Notebook 是专门为 SQL 而研发的一款 Web Notebook。他的第一公民是 SQL&#xff0c;而 Jupyter 则是是以 Python 为第一公民的。 随着 Byzer 引擎对大模型能力的支持日渐完善&#xff0c; Byzer-Notebook 也在不自觉中变得更加强大。我和小伙伴在聊天的过程中才发现他已…

负载均衡下的webshell

文章目录 1.场景描述2.在蚁剑里添加 Shell3.因为负载均衡而出现的问题4.问题解决方案4.1 方案14.2 方案24.3 方案3 1.场景描述 当前手里有一个以docker部署的Tomcat负载均衡环境。主机对外ip和端口为192.168.100.130:18080 我们假设其为一个真实的业务系统&#xff0c;存在一…

Ubuntu20.04安装SNMP服务

在线安装snmp 1.安装snmp服务 sudo apt-get install updatesudo apt-get install snmp snmpd snmp-mibs-downloader2.重启SNMP服务 sudo /etc/init.d/snmpd restart3.查看snmp配置 sudo grep -Ev ^$|^# /etc/snmp/snmpd.conf 离线安装SNMP &#xff08;重要&#xff09; 我…