40 生产者消费者模型

生产者消费者模型

概念

为何要使用生产者消费者模型,这个是用过一个容器解决生产者和消费的强耦合问题。生产者和消费者之间不需要通讯,通过阻塞队列通讯,所以生产者生产完数据之后不用等待消费者处理,直接扔给阻塞队列,消费者不找生产者要数据,而是直接从阻塞队列取,阻塞队列相当于缓冲区,平衡了双方的能力,用来解耦的
在这里插入图片描述

上面超市的例子。消费者需要泡面的话不用去找供货商要货,而是去超市取。如果找供货商,消费者只需要一包,供货商开启生产设备只生产一包,多次这样很浪费效率也不高。超市作为存储,需要一万包泡面,供货商生产1万包摆到超市里,将超市塞满,缓存起来,调整供货商和消费者的速度不一致导致的效率问题。供货商就可以休息下来。消费者需要几包去超市取,支持了一种忙闲不均的状态。供货商关注超市有多少空位置,需要多少货,消费者关注现有商品的数量。供货商在生产的时候,和消费者没关系,消费者购买的时候和供应商也没关系,双方不需要互相考虑,只完成自己的事情,就减少了依赖性,解耦。

在计算机里,生产者和消费者都是由线程承担,超市是一种特殊结构的内存空间,这个结构是一种共享资源,整个过程就是执行流在通信,如何安全高效的通信。共享资源就有并发的问题,这种并发有三种关系:

生产者和生产者:互斥关系。一个在供货的时候另一个需要等待
生产者和消费者:互斥和同步关系。如果供货商正在摆一个商品,消费者有没有得到。有一种不确定性,生产者要确定,数据安全,只有生产了和没有生产,消费者一定可以得到货物。供货商不停联系超市需不需要货,超市已经满了还在不停询问,占用了消费者询问的机会,导致消费者饥饿问题。所以要同步,保证顺序性。供货商刚供货一次再询问时,告知一段时间之内不要询问,消费者询问没有商品时,也告知没有并一段时间不要询问,安全才是本质
消费者和消费者:互斥关系

321原则
3种关系2种角色1个交易场所
3种关系,生产者和消费者之间互相搭配
2种角色:生产和消费
1个交易场所:特定结构的内存空间

优点:
1.支持忙闲不均
2.支持并发
3.生产和消费进行解耦

什么是解耦。main函数内调用一个函数,传入参数,需要等到函数返回才能继续往下执行,可以将两个分开为线程,参数用一段空间缓存,放到缓冲区里,函数调用时在空间里取,这就是解耦

基于BlockingQueue的生产者模型

BlockingQueue

在多线程编程中阻塞队列(Blocking Queue)是一种常用于实现生产者和消费者模型的数据结构。其与普通的队列区别在于,当队列为空时,从队列获取元素的操作将会被阻塞,直到队列中放入了元素;当队列满时,往队列里存放元素的操作也会被阻塞,直到有元素被从队列中取出(以上的操作都是基于不同的线程来说的,线程在对阻塞队列进程操作时会被阻塞)
类似于管道

在这里插入图片描述

类的设计
首先需要数据存储的结构,这个用队列,一个容量设置为队列允许存放的最多数量。对队列的访问同一时间只能有一方,所以需要一个锁。类提供存入数据和取出数据的功能,生产者关心的是还能放多少数据,如果大于最大容量就要停止,所以要判断队列的现有数量,这是对共享资源的访问,加锁和释放锁,判断大于容量时就去条件变量队列等待,同样,消费者取物品也需要一个条件变量,消费者判断有没有商品,没有就到消费者的条件变量等待。当生产者生产出一个商品放入后就唤醒消费者取,消费者取完唤醒生产者生产

#include <queue>
#include <pthread.h>template<class T>
class BlockQueue
{static const int defaultnum = 20;
public:BlockQueue(int cap = defaultnum){_maxcap = cap;pthread_mutex_init(&_mutex, nullptr);pthread_cond_init(&_pcond, nullptr);pthread_cond_init(&_ccond, nullptr);_lowwater = _maxcap / 3;_highwater = _maxcap / 3 * 2;}void push(const T& x){pthread_mutex_lock(&_mutex);if (_que.size() == _maxcap) //防止被伪唤醒的状态 {pthread_cond_wait(&_pcond, &_mutex); //调用,自动释放锁}_que.push(x);  //确保生产条件满足才能生产//if (_que.size() > _highwater)pthread_cond_signal(&_ccond);pthread_mutex_unlock(&_mutex);}T pop(){pthread_mutex_lock(&_mutex);if (_que.size() == 0){pthread_cond_wait(&_ccond, &_mutex);}T tmp = _que.front();_que.pop();//if (_que.size() < _lowwater)pthread_cond_signal(&_pcond);pthread_mutex_unlock(&_mutex);return tmp;}~BlockQueue(){pthread_mutex_destroy(&_mutex);pthread_cond_destroy(&_pcond);pthread_cond_destroy(&_ccond);}private:std::queue<T> _que;int _maxcap;   //最大容量pthread_mutex_t _mutex;pthread_cond_t _pcond;pthread_cond_t _ccond;int _lowwater;int _highwater; //控制水位线
};

阻塞队列设置为了模板,不只可以放入内置类型,也可以是自定义类型。弄一个任务类,有两个操作数,操作符加减乘除随机。一个变量记录结果,一个记录可靠性,如果有除0错误设置为对应值。提供返回string类型整个表达式的内容功能

#pragma once
#include <stdio.h>
#include <string>enum
{DIVZERO = 1,UNKNOW
};
std::string g_op = "+-*/";
struct task
{
public:task(int a, int b, char op):_a(a), _b(b), _op(op), _result(0), _exitcode(0){}void run(){switch(_op){case '+':_result = _a + _b;break;case '-':_result = _a - _b;break;case '*':_result = _a * _b;break;case '/':if (_b == 0){_exitcode = DIVZERO;}else{_result = _a / _b;}      break;default:_exitcode = UNKNOW;break;}//printf("%d+%d结果:%d\n", _a, _b, _a + _b);}std::string getresult(){std::string str = std::to_string(_a) + _op  + std::to_string(_b);str += "=";str += std::to_string(_result);str += " [exit:";str += std::to_string(_exitcode);str += "]";return str;}std::string gettask(){std::string str = std::to_string(_a) + _op  + std::to_string(_b);return str;}private:int _a;int _b;char _op;int _result;int _exitcode;
};

main文件生成两个线程,生产和消费,传入阻塞队列的实例,生产出一个任务,消费者完成

#include <unistd.h>
#include <cstdlib>
#include <iostream>
#include <ctime>
#include "blockqueue.hpp"
#include "task.hpp"void *produce(void *bk)
{BlockQueue<task>* block = static_cast<BlockQueue<task>*>(bk);while (true){int x1 = rand() % 10;usleep(10);int x2 = rand() % 10 + 1;char op = g_op[rand() % 4];task t(x1, x2, op);//生产printf("生产任务:%s\n", t.gettask().c_str());block->push(t);sleep(1);}
}void* consume(void* bk)
{BlockQueue<task>* block = static_cast<BlockQueue<task>*>(bk);while (true){//消费task n = block->pop();n.run();printf("完成任务:%s\n", n.getresult().c_str());sleep(1);}
}int main()
{srand(time(NULL));pthread_t ptid, ctid;BlockQueue<task>* block = new BlockQueue<task>();pthread_create(&ptid, nullptr, produce, block);pthread_create(&ctid, nullptr, consume, block);while (true){sleep(1);}delete block;return 0;
}

结果:
在这里插入图片描述

伪唤醒

当队列里只剩一个位置的时候,生产者如果不小心唤醒了多个生产者。这时它们都会去竞争锁,拿到锁的线程去生产然后放入,接着释放锁。正常情况下,应该消费线程拿到这个锁取数据,但因为刚刚唤醒了多个线程,可能会抢到锁继续放入数据,这时就会超出最大容量出现错误。所以要将if处改为循环,释放锁后判断如果满了就调条件变量里休眠

多生产多消费

将上面的单生成单消费改为多生产多消费版本

#include <unistd.h>
#include <cstdlib>
#include <iostream>
#include <ctime>
#include "blockqueue.hpp"
#include "task.hpp"void *produce(void *bk)
{BlockQueue<task>* block = static_cast<BlockQueue<task>*>(bk);while (true){int x1 = rand() % 10;usleep(10);int x2 = rand() % 10 + 1;char op = g_op[rand() % 4];task t(x1, x2, op);//生产printf("%p生产任务:%s\n", pthread_self(), t.gettask().c_str());block->push(t);sleep(1);}
}void* consume(void* bk)
{BlockQueue<task>* block = static_cast<BlockQueue<task>*>(bk);while (true){//消费task n = block->pop();n.run();printf("%p完成任务:%s\n", pthread_self(), n.getresult().c_str());//sleep(1);}
}int main()
{srand(time(NULL));BlockQueue<task>* block = new BlockQueue<task>();pthread_t ptid[3], ctid[5];for (int i = 0; i < 3; i++){pthread_create(&ptid[i], nullptr, produce, block);}for (int i = 0; i < 5; i++){pthread_create(&ctid[i], nullptr, consume, block);}for (int i = 0; i < 3; i++){pthread_join(ptid[i], nullptr);}for (int i = 0; i < 5; i++){pthread_join(ctid[i], nullptr);} delete block;return 0;
}

优势

虽然同一时间只能有一个执行流访问阻塞队列,多个生产者也只能有一个访问队列,那多个生产者和消费者的优势体现在什么地方。
生产者的数据从用户网络等地方获得,数据的获取也需要时间,当一个生产者往队列里放入数据时,其他生产者可以同时获取数据,后面只需要放入数据即可。消费者方拿到数据后要对数据加工处理,这部分也是需要花费时间,同样一个线程获取数据时,其他的可能正在处理获得的数据。所以说,这个模型提高了效率,并发程度,是高效的。

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.rhkb.cn/news/318497.html

如若内容造成侵权/违法违规/事实不符,请联系长河编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

用Stream流方式合并两个list集合(部分对象属性重合)

一、合并出共有部分 package com.xu.demo.test;import java.util.Arrays; import java.util.List; import java.util.stream.Collectors;public class ListMergeTest1 {public static void main(String[] args) {List<User> list1 Arrays.asList(new User(1, "Alic…

docker 基础命令

docker 安装 更新系统 sudo apt update sudo apt -y dist-upgrade安装docker sudo apt-get -y install ca-certificates curl gnupg lsb-release sudo mkdir -p /etc/apt/keyrings curl -fsSL https://download.docker.com/linux/ubuntu/gpg | sudo gpg --dearmor -o /etc/…

【Elasticsearch】安装配置与使用

1 前期准备 1.1 环境准备 麒麟ARM 64位操作系统 1.2 安装包准备 Elasticsearch下载地址: https://www.elastic.co/cn/downloads/elasticsearch 2 部署elasticsearch 2.1 创建es专用用户 注意&#xff1a;ES不能使用root用户来启动&#xff0c;必须使用普通用户来安装启…

【HTTP上】协议/域名/url/请求和响应/状态码/重定向

文章目录 0.应用层协议0.1HTTP协议 1.域名2.DNS3.访问浏览器4.URL搜索特殊字符如#&~ 5.万维网6.http请求和响应的格式6.1HTTP请求格式6.2HTTP响应格式6.3示例6.3模拟HTTP【框架】6.4查看请求或响应的工具FiddlerPostman 7.网页7.0对访问网页的认识7.1wget7.2新的认识7.3GET…

Java将文件目录转成树结构

在实际开发中经常会遇到返回树形结构的场景&#xff0c;特别是在处理文件系统或者是文件管理系统中。下面就介绍一下怎么将文件路径转成需要的树形结构。 在Java中&#xff0c;将List<String>转换成树状结构&#xff0c;需要定义一个树节点类&#xff08;TreeNode&#…

SQL注入漏洞--报错/union/布尔盲注/时间盲注

之前介绍了数据库的基本操作&#xff0c;今天这篇文章就来实操SQL注入。 阅读本文前可以先看一下基本操作&#xff0c;有助于更好理解本文。。。 https://blog.csdn.net/weixin_60885144/article/details/138356410?spm1001.2014.3001.5502 what SQL---结构化查询语言---S…

北京金融大数据有限公司X百望云签署战略合作协议 共同发布“金数数据要素流通云平台”

随着数据资产与数据要素相关政策密集出台&#xff0c;资本与实业企业均跃跃欲试。但因为没有龙头企业的方案引领和成熟的落地实践&#xff0c;市场呈谨慎观望态势&#xff0c;热度无处安放。 北京金融大数据有限公司&#xff08;以下简称“金融大数据公司”&#xff09;作为市…

excel怎么删除条件格式规则但保留格式?

这个问题的意思就是要将设置的条件格式&#xff0c;转换成单元格格式。除了使用VBA代码将格式转换外&#xff0c;还可以用excel自己的功能来完成这个任务。 一、将条件格式“留下来” 1.设置条件格式 选中数据&#xff0c;点击开始选项卡&#xff0c;设置条件格式&#xff0…

微服务保护和分布式事务(Sentinel、Seata)笔记

一、雪崩问题的解决的服务保护技术了解 二、Sentinel 2.1Sentinel入门 1.Sentinel的安装 &#xff08;1&#xff09;下载Sentinel的tar安装包先 &#xff08;2&#xff09;将jar包放在任意非中文、不包含特殊字符的目录下&#xff0c;重命名为 sentinel-dashboard.jar &…

STM32CubeMX+MDK通过I2S接口进行音频输入输出(全双工读写一个DMA回调)续-音质问题解决总结

一、前言 之前进行了STM32CubeMXMDK通过I2S接口进行音频输入输出&#xff08;全双工读写一个DMA回调&#xff09;的研究总结&#xff1a; https://juejin.cn/post/7339016190612881408#heading-34 后续音质问题解决了&#xff0c;目前测试下来48khz的双声道使用效果很好&…

Pytorch学习笔记——TensorBoard的初使用

1、TensorBoard介绍 TensorBoard是TensorFlow的可视化工具&#xff0c;但它也可以与PyTorch结合使用。TensorBoard提供了一个Web界面&#xff0c;可以展示你训练过程中的各种信息&#xff0c;如损失值、准确度、权重分布等&#xff0c;更好地帮助开发者理解和调试模型。 Tenso…

BJFUOJ-C++程序设计-实验3-继承和虚函数

A TableTennisPlayer 答案&#xff1a; #include<iostream> #include<cstring> using namespace std;class TableTennisPlayer{ private:string firstname;string lastname;bool hasTable;public:TableTennisPlayer(const string &, const string &, bool…

深入剖析Tomcat(五) 剖析Servlet容器并实现一个简易Context与Wrapper容器

上一章介绍了Tomcat的默认连接器&#xff0c;后续程序都会使用默认连接器。前面有讲过Catalina容器的两大块内容就是连接器与Servlet容器。不同于第二章的自定义丐版Servlet容器&#xff0c;这一章就来探讨下Catalina中的真正的Servlet容器究竟长啥样。 四种容器 在Catalina中…

实现优先队列——C++

目录 1.优先队列的类模板 2.仿函数的讲解 3.成员变量 4.构造函数 5。判空&#xff0c;返回size&#xff0c;返回队头 6.插入 7.删除 1.优先队列的类模板 我们先通过模板来进行初步了解 由上图可知&#xff0c;我们的模板里有三个参数&#xff0c;第一个参数自然就是你要存储的数…

读天才与算法:人脑与AI的数学思维笔记16_音乐图灵测试

1. 艾米 1.1. 人工智能作曲家 1.1.1. 分析机可能会生成任意复杂程度、精细程度的科学的音乐作品 1.1.1.1. 阿达洛夫莱斯 1.1.2. 巴赫的作品是大多数作曲家开始学习创作的起点&#xff0c;也是大多数计算机开始学习作曲的起点…

uniapp 短视频浏览组件(仿抖音、上滑下滑)组件 Ba-VideoSView

简介&#xff08;下载地址&#xff09; Ba-VideoSView 是一款uniapp短视频上划浏览组件&#xff0c;支持无限滑动加载&#xff0c;支持自定义界面&#xff08;功能遮罩&#xff09;,支持点播、直播。 支持无限滑动加载支持自定义界面&#xff08;遮罩&#xff09;支持监听上滑…

第 8 章 机器人平台设计(自学二刷笔记)

重要参考&#xff1a; 课程链接:https://www.bilibili.com/video/BV1Ci4y1L7ZZ 讲义链接:Introduction Autolabor-ROS机器人入门课程《ROS理论与实践》零基础教程 学习到当前阶段大家对ROS已经有一定的认知了&#xff0c;但是之前的内容更偏理论&#xff0c;尤其是介绍完第6…

KKView远程控制2.0版本发布,TeamViewer面临巨大挑战

KKView远程控制2.0版本发布&#xff0c;TeamViewer面临巨大挑战 近日&#xff0c;备受瞩目的远程控制软件KKView发布了其全新2.0版本&#xff0c;KKView以其独特的创新性和用户友好的设计&#xff0c;为远程办公、远程培训等领域提供了更加高效、便捷的解决方案。 KKView远程…

K8S 哲学 - deployment -- kubectl【create 、 rollout 、edit、scale、set】

kubectl create kubectl rollout kubectl edit kubectl set kubectl scale 1、创建与配置文件解析 2、deploy 滚动更新 &#xff1a;template 里面的内容改变触发滚动更新 编辑该 deploy 的 配置文件 &#xff0c;加入一个 label 不会触发滚动更新 改变 nginx镜…

分布式与一致性协议之Raft算法(一)

Raft算法 概述 Raft算法属于Multi-Paxos算法&#xff0c;它在兰伯特Multi-Paxos思想的基础上做了一些简化和限制&#xff0c;比如日志必须是连续的&#xff0c;只支持领导者(Leader)、跟随者(Follwer)和候选人(Candidate)3种状态。在理解和算法实现上&#xff0c;Raft算法相对…