零基础Linux_24(多线程)线程同步+条件变量+生产者消费模型_阻塞队列版

目录

1. 线程同步和生产者消费者模型

1.1 生产者消费者模型的概念

1.2 线程同步的概念

1.3 生产者消费者模型的优点

2. 线程同步的应用

2.1 条件变量的概念

2.2 条件变量操作接口

3. 生产者消费者模型_阻塞队列

3.1 前期代码(轮廓)

3.2 中期代码(简单使用)

BlockQueue.hpp:

3.3 生产者消费者模型高效的体现

3.4 后期代码(处理任务)

Task.hpp

ProdCon.cc

3.5 最终代码(RAII风格的锁)

lockGuard.hpp

BlockQueue.hpp

本篇完。


1. 线程同步和生产者消费者模型

1.1 生产者消费者模型的概念

以生活中消费者生产者为例:

生活中,我们大部分人都扮演着消费者的角色,会经常在超市买东西,比如买方便面,而超市的方便面是由供应商生成的。此时我们就是消费者,供应商就是生产者,而超市就是一个交易场所。

  • 将读取数据的线程叫做消费者线程
  • 将产生数据的线程叫做生产者线程
  • 将共享的特定数据结构叫做缓冲区

超市的供应商肯定不止一家,即使同一种商品的供应上也不止一家,不同牌子方便面的生产者它们之间的关系是竞争关系,竞争的表现就是互斥。

站在超市的角度,假设只有一块区域是买方便面的,当生产者来供货的时候,这块区域只能让一家供应商来供货,否则就会导致不同的东西混着放,对消费者来说很不友好。

  • 生产者线程和生产者线程之间是互斥关系
  • 在同一时间只能有一个生产者线程来访问缓冲区。

假设现在超市只有一包方便面了,但是同时来了好多消费者都要买方便面,此时这些消费者之间的关系也是竞争关系,我买上你就买不上了。所以当只有一包方便面的时候,只能有一个买方便面的消费者进入超市。

  • 消费者线程和消费者线程之间是互斥关系
  • 在同一时间只能由一个消费者线程来访问缓冲区。

再假设,超市的方便面卖完了,生产者正在给超市供货,而消费者也正在买方便面,那消费者到底买没买到方便面?有可能生产者刚把方便面搬下来,还没来及摆上去,那么消费者就没有买到,也由可能生产者把方便面摆上去了,那么消费者就买到了。所以最好的办法就是生产者供货的时候,不让消费者进来。

在Linux中,缓冲区存放的都是数据,数据是可以覆盖的,比如消费者线程在读取缓冲区中的数据时,数据是"hello world",刚刚读取完"hello",生产者线程把"world"改成了"rtx",那么消费者线程读取到的就成了"hello rtx",就出错了。所以最好的办法就是当消费者线程访问缓冲区的时候,生产者线程不能访问缓冲区。

  • 消费者线程和生产者线程之间是互斥关系
  • 在同一时间内只有一个线程可以访问缓冲区。

1.2 线程同步的概念

保证数据安全的前提下,让线程能够按照某种特定的顺序访问临界资源,从而有效避免饥饿问题,叫做同步。(在计算机操作系统中,饥饿(Starvation)是指某些进程或线程由于资源分配不公平或调度策略不合理而无法获得所需资源或执行时间的现象。当某个进程或线程长时间无法满足其资源需求时,就会出现饥饿问题。)

就像上篇博客中抢票的互斥代码,在每个线程抢完票以后没有进行延时代替其他处理动作时,所有票都被一个线程抢了,其他线程没有机会抢。

由于竞争能力弱而缺乏调度的线程就处于饥饿状态。

而同步就是让所有线程按照一定顺序来抢票,做到尽量公平,避免线程饥饿问题产生。具体如何实现后面会详细讲解。

继续拿超市来说,生产者不能无休止的向超市中供货,否则消费者无法进来消费,最终方便面会放不下。同样,消费者也不能无休止来买方便面,否则生产者进不来,方便面就会卖完,而且没有人来供货。所以最好的办法就是生产者供货,当货架摆满了就不供货了,让消费者来买,当方便面卖完了再让生产者来供货,从而让消费者和生产者协同起来。

  • 消费者线程和生产者线程之间又是同步关系
  • 生产者线程和消费者线程按照一定顺序去访问缓冲区。

根据上面例子和分析,对于生产者消费模型的本质可以总结为123原则(非官方版)

  • 1个交易场所:一段特定结构的缓冲区(上面例子就是超市)
  • 2种角色:生产者和消费者(上面例子就是客人和供货商)
  • 3种关系:生产者和生产者(互斥关系),消费者和消费者(互斥关系),生产者和消费者(互斥+同步关系)。

1.3 生产者消费者模型的优点

有了超市这个交易场所,生产者只要给超市供大量的货即可,比如几万包方便面,不用关心是消费者什么时候来买,只需要专注自己的生成即可。

对于消费者而言,只需要直接去超市买方便面就行,也不用等待方便面的生产。

超市只需要做的就是方便面卖完了,告诉生产者来上货,然后告诉消费者来买。消费者和生产者完全独立,不存在任何交集。

生产者消费者模型实现了消费者线程和生产者线程之间的解耦。(低耦合,高内聚)

我们平时写的C/C++代码,如果将main函数看成是生产者,普通函数看出是消费者,那么它两就存在高度耦合。

比如main函数里调用func函数:当执行func函数的时候,main函数在等待,只有func执行完毕以后main函数才能继续执行下去。如果将这两个函数看出两个执行流,那么它们就存在高度耦合。

而生产者消费者模型就成功的让生产者执行流和消费者执行流解耦了,生产者只管向缓冲区生产数据,消费者只管从缓冲区消耗数据,不用关心对方的状态。

再比如大部分人在周一到周五上班,在周六日休息,上班时候时间比较少,去超市消费的人也比较少,由于消费者和生产者互斥,所以就可以让生产者在周一到周五的时候来上货。

当周六日消费者休息的时候,去超市消费的人就比较多,方便面也卖的比较快,但是由于生产者供货量足够,所以并不会因为买的人多了就不够了的情况。

生产者消费者模型有助于解决生产者线程和消费者线程忙闲不均的问题。因为缓冲区能够缓存一定量的数据。

再比如我们买东西肯定不会直接去找供应商,因为人家不零售,因为生产者如果零售的话,每次开机器就仅生成几包方便面,成本高,效率低。

对于消费者而言,直接去找生产者还需等待生成者完成商品生成,消耗时间成本高,效率也低。

生产者消费者模型提高了了生产者线程和消费者线程的执行效率。


2. 线程同步的应用

同步是为了让多线程按照一定顺序互斥访问临界资源,在上面的生产者消费者模型中,如何实现同步呢?就要涉及下面的条件变量。

2.1 条件变量的概念

条件变量:用来描述某种临界资源是否就绪的一种数据化描述

当一个线程互斥地访问某个临界资源时,它可能发现在其它线程改变状态之前,它什么也做不了。

例如,存在一个共享的队列,生产者线程负责生产数据到队列中,消费者线程负责从队列中读取数据,当消费者线程发现队列为空时就不要再去竞争锁去访问了,而是应该等待,直到生产者线程将数据生成到队列中。

要想让消费者线程等待,就需要使用到条件变量。

那么条件变量是什么呢?继续拿超市举例:假设现在超出的架子上一次只放一包方便面,只有这包方便面被人买走了,才会放上新的方便面。

此时来了一堆消费者消费者都要买方便面,因为只有一包,所以只能去竞争了,那些竞争能力强的才能买上方便面,甚至不停的抢不停的买,此时那些竞争能力弱的消费者就会始终都买不到方便面。

竞争能力弱的消费者就会始终抢不到锁,就会产生饥饿问题。

为了维持秩序,超市的工作人员设置了一个等待区,所有消费者都在这里排队购买,方便面被摆出来了,工作人员让一个消费者进去买,没有摆出来就等着。如果消费者想买两包甚至多包,只能重新排队。

等待区及工作人员就相当于条件变量

多线程互斥访问一个临界资源时,为了让这些线程按照一定顺序访问,将这些线程都放在条件变量的等待队列中,当另一个线程让条件变量符合条件(唤醒线程)时,队列中的第一个线程就去访问临界资源。


2.2 条件变量操作接口

条件变量同样是由POSIX线程库维护的,所以使用的是POSIX标准,和互斥锁的接口非常相似。

创建条件变量:

pthread_cond_t cond;
  • 同样要加pthread_。同样是类似int a;
  • cond是英文condition的缩写。

条件变量的初始化,释放:man pthread_cond_init:

使用类似互斥锁,只是传递的参数是创建好的条件变量。

int pthread_cond_init(pthread_cond_t *restrict cond,const pthread_condattr_t *restrict attr);

cond:要初始化的条件变量

attr:nullptr

返回值:成功返回0,失败返回错误码

man pthread_cond_wait:

int pthread_cond_wait(pthread_cond_t *restrict cond, pthread_mutex_t *restrict mutex);
  • 第一个参数cond:创建的条件变量地址。
  • 第二个参数mutex:互斥锁的地址,这个必须有,后面再讲解为什么必须传锁。
  • 返回值:放入条件变量的等待队列成功返回0,失败返回错误码。

pthread_cond_wait的作用:调用该接口的线程放入传入条件变量的等待队列中。

唤醒条件变量等待队列中的一个线程:

man pthread_cond_signal:

int pthread_cond_signal(pthread_cond_t *cond);
  • 参数cond:所在等待队列的条件变量地址
  • 返回值:唤醒成功返回0,失败返回错误码

pthread_cond_signal作用:由另一个线程(通常是主线程)唤醒指定条件变量等待队列中的一个线程。

唤醒条件变量等待队列中的所有线程:

int pthread_cond_broadcast(pthread_cond_t *cond);
  • 参数:所在等待队列的条件变量地址
  • 返回值:唤醒成功返回0,失败返回错误码

pthread_cond_broadcast作用:由另一个线程(通常是主线程)唤醒指定条件变量等待队列中的所有线程。

将条件变量用到上一篇抢票的代码中,实现多线程按照一定顺序互斥抢票:

  •  创建全局的条件变量(后面就不创建成全局的了)。
  •  每个线程一抢上锁以后就进入条件变量的等待队列。
  •  主线程每个一秒钟唤醒一个等待的线程进行抢票。
#include <iostream>
#include <cstdio>
#include <cerrno>
#include <cstring>
#include <cassert>
#include <thread>
#include <unistd.h>
#include <pthread.h>
using namespace std;#define THREAD_NUM 4
int tickets = 10000; // 在并发访问的时候,导致了我们数据不一致的问题 -> 临界资源
pthread_cond_t cond = PTHREAD_COND_INITIALIZER; // 创建条件变量class ThreadData
{
public:ThreadData(const std::string &n,pthread_mutex_t *pm):tname(n), pmtx(pm){}
public:std::string tname;pthread_mutex_t *pmtx;
};void *getTickets(void *args)
{ThreadData *td = (ThreadData*)args;while(true) // 抢票逻辑{int n = pthread_mutex_lock(td->pmtx); // 加锁assert(n == 0);pthread_cond_wait(&cond, td->pmtx); // 进入等待队列// 临界区if(tickets > 0) // 判断的本质也是计算的一种{usleep(rand()%1500);printf("%s: %d\n", td->tname.c_str(), tickets);tickets--; // 也可能出现问题n = pthread_mutex_unlock(td->pmtx); // 解锁assert(n == 0);}else{n = pthread_mutex_unlock(td->pmtx);  // break之前解锁assert(n == 0);break;}usleep(rand()%2000); // 抢完票,其实还需要后续的动作}delete td;return nullptr;
}int main()
{time_t start = time(nullptr);pthread_mutex_t mtx;pthread_mutex_init(&mtx, nullptr);pthread_t t[THREAD_NUM];for(int i = 0; i < THREAD_NUM; i++) // 多线程抢票的逻辑{std::string name = "thread ";name += std::to_string(i+1);ThreadData *td = new ThreadData(name, &mtx);pthread_create(t + i, nullptr, getTickets, (void*)td);}while(true){sleep(1);pthread_cond_signal(&cond); // 唤醒一个等待线程cout << "main thread wakeup a new thread" << endl;}for(int i = 0; i < THREAD_NUM; i++){pthread_join(t[i], nullptr);}pthread_mutex_destroy(&mtx);return 0;
}

此时就按照4 1 2 3 的顺序抢票了。

  • 每个线程都会被先挂起到等待队列中,等待主线程的唤醒。
  • 唤醒一个线程抢完票以后会继续进入等待队列,并且排在队列的后面。

如果不使用同步,就可能会只有一个线程在抢票,其他线程就会处于饥饿状态。

再放一份代码:(条件变量不再是全局的,还加了函数指针的方法让新线程执行不同的任务)

#include <iostream>
#include <string>
#include <unistd.h>
#include <pthread.h>#define TNUM 4
typedef void (*func_t)(const std::string &name, pthread_mutex_t *pmtx, pthread_cond_t *pcond);
volatile bool quit = false;// pthread_cond_t cond = PTHREAD_COND_INITIALIZER; // 可以定义成全局的,这样很多地方不用传参了,但是这里写正式点
// pthread_mutex_t mtx = PTHREAD_MUTEX_INITIALIZER;class ThreadData
{
public:ThreadData(const std::string &name, func_t func, pthread_mutex_t *pmtx, pthread_cond_t *pcond): name_(name), func_(func), pmtx_(pmtx), pcond_(pcond){}public:std::string name_;func_t func_;pthread_mutex_t *pmtx_;pthread_cond_t *pcond_;
};void func1(const std::string &name, pthread_mutex_t *pmtx, pthread_cond_t *pcond)
{while (!quit){// wait一定要在加锁和解锁之间进行waitpthread_mutex_lock(pmtx);// if(临界资源是否就绪 -> 没就绪) pthread_cond_wait 但是现在没有这样的判断if (!quit) // 这个if加不加不重要{pthread_cond_wait(pcond, pmtx); // 默认该线程在执行的时候,wait代码被执行,当前线程会被立即被阻塞}std::cout << name << " running  --  a播放" << std::endl;pthread_mutex_unlock(pmtx);}
}
void func2(const std::string &name, pthread_mutex_t *pmtx, pthread_cond_t *pcond)
{while (!quit){pthread_mutex_lock(pmtx);pthread_cond_wait(pcond, pmtx); // 默认该线程在执行的时候,wait代码被执行,当前线程会被立即被阻塞std::cout << name << " running  -- b下载" << std::endl;pthread_mutex_unlock(pmtx);}
}
void func3(const std::string &name, pthread_mutex_t *pmtx, pthread_cond_t *pcond)
{while (!quit){pthread_mutex_lock(pmtx);pthread_cond_wait(pcond, pmtx); // 默认该线程在执行的时候,wait代码被执行,当前线程会被立即被阻塞std::cout << name << " running  -- c刷新" << std::endl;pthread_mutex_unlock(pmtx);}
}
void func4(const std::string &name, pthread_mutex_t *pmtx, pthread_cond_t *pcond)
{while (!quit){pthread_mutex_lock(pmtx);pthread_cond_wait(pcond, pmtx); // 默认该线程在执行的时候,wait代码被执行,当前线程会被立即被阻塞std::cout << name << " running  -- d扫描" << std::endl;pthread_mutex_unlock(pmtx);}
}void *Entry(void *args) // 添加了一个类似软件层的东西,只是演示下
{ThreadData *td = (ThreadData *)args;         // td在每一个线程自己私有的栈空间中保存td->func_(td->name_, td->pmtx_, td->pcond_); // 它是一个函数,调用完成就要返回delete td;return nullptr;
}int main()
{pthread_mutex_t mtx;pthread_cond_t cond;pthread_mutex_init(&mtx, nullptr);pthread_cond_init(&cond, nullptr);pthread_t tids[TNUM];func_t funcs[TNUM] = {func1, func2, func3, func4};for (int i = 0; i < TNUM; i++){std::string name = "Thread ";name += std::to_string(i + 1);ThreadData *td = new ThreadData(name, funcs[i], &mtx, &cond);pthread_create(tids + i, nullptr, Entry, (void *)td);}sleep(5); // 主线程和所有新线程都休眠五秒 -> 新线程阻塞等待// ctrl new threadint cnt = 10;while (cnt){std::cout << "resume thread run code ...." << cnt-- << std::endl;pthread_cond_signal(&cond);sleep(1); // 每隔一秒唤醒一个线程,线程不用自己休眠了// pthread_cond_broadcast(&cond); // 全部唤醒 -> 设置条件变量有效}std::cout << "ctrl done" << std::endl; // 控制结束quit = true;pthread_cond_broadcast(&cond); // quit = true;后再全部唤醒一次for (int i = 0; i < TNUM; i++){pthread_join(tids[i], nullptr);std::cout << "thread: " << tids[i] << "quit" << std::endl;}pthread_mutex_destroy(&mtx);pthread_cond_destroy(&cond);return 0;
}

这个代码只是演示一下不同的使用方法和场景、


3. 生产者消费者模型_阻塞队列

上图所示就是要实现的模型,有一个生产者线程,一个消费者线程,还有一个阻塞队列。

  • 阻塞队列这里使用C++STL容器中的queue来实现。
  • 阻塞队列是公共资源,所以要保证它的安全,线程A和线程B要互斥访问,只需要一把锁就能实现生产者和消费者,生产者和生产者,消费者和消费者之间的互斥。
  • 阻塞队列中有数据消费者才能读,此时生产者不能进行生产,生产者线程要进入它的等待队列中。
  • 阻塞队列中没有数据或者不满时,生产者才能进行生产,消费者在生产的时候不能读,要进入它的等待队列。

3.1 前期代码(轮廓)

先敲个轮廓出来就是这样的:(代码具体就不讲解了,看注释就行)

Makefile:

ProdCon:ProdCon.ccg++ -o $@ $^ -std=c++11 -lpthread
.PHONY:clean
clean:rm -f ProdCon

BlockQueue.hpp:

#pragma once
#include <iostream>
#include <queue>
#include <mutex>
#include <pthread.h>const int gDefaultCap = 7;template <class T>
class BlockQueue
{
public:BlockQueue(int capacity = gDefaultCap) : _capacity(capacity){pthread_mutex_init(&_mtx, nullptr);pthread_cond_init(&_Empty, nullptr);pthread_cond_init(&_Full, nullptr);}void push(const T& in){}void pop(T* out){}~BlockQueue(){pthread_mutex_destroy(&_mtx);pthread_cond_destroy(&_Empty);pthread_cond_destroy(&_Full);}protected:std::queue<T> _bq;     // 阻塞队列int _capacity;         // 容量上限pthread_mutex_t _mtx;  // 保证队列安全pthread_cond_t _Empty; // 表示bq 是否空的条件pthread_cond_t _Full;  // 表示bq 是否满的条件
};

ProdCon.cc:(Producer consumer model)

#include "BlockQueue.hpp"
#include <pthread.h>
#include <unistd.h>void* consumer(void *args)
{BlockQueue<int> *bqueue = (BlockQueue<int> *)args;while(true) // 获取任务{sleep(1);}return nullptr;
}void* productor(void *args)
{BlockQueue<int> *bqueue = (BlockQueue<int> *)args;while(true) // 制作任务{sleep(1);}return nullptr;
}int main()
{BlockQueue<int> *bqueue = new BlockQueue<int>();pthread_t c[2],p[2];pthread_create(c, nullptr, consumer, bqueue);pthread_create(c + 1, nullptr, consumer, bqueue);pthread_create(p, nullptr, productor, bqueue);pthread_create(p + 1, nullptr, productor, bqueue);pthread_join(c[0], nullptr);pthread_join(c[1], nullptr);pthread_join(p[0], nullptr);pthread_join(p[1], nullptr);delete bqueue;return 0;
}

3.2 中期代码(简单使用)

然后现在来写一写关键的pop和push:

BlockQueue.hpp:

#pragma once
#include <iostream>
#include <queue>
#include <mutex>
#include <pthread.h>const int gDefaultCap = 7;template <class T>
class BlockQueue
{
private:bool isQueueEmpty(){return _bq.size() == 0;}bool isQueueFull(){return _bq.size() == _capacity;}public:BlockQueue(int capacity = gDefaultCap) : _capacity(capacity){pthread_mutex_init(&_mtx, nullptr);pthread_cond_init(&_Empty, nullptr);pthread_cond_init(&_Full, nullptr);}void push(const T& in) // 生产者{pthread_mutex_lock(&_mtx);// pthread_cond_wait: 只要是一个函数,就可能调用失败,可能存在 伪唤醒 的情况,所以用whilewhile(isQueueFull()) //1. 先检测当前的临界资源是否能够满足访问条件{pthread_cond_wait(&_Full, &_mtx); // 满的时候就在_Full这个条件变量下等待// 此时思考:我们是在临界区中,我是持有锁的,如果我去等待了,锁该怎么办呢?// 所以pthread_cond_wait第二个参数是一个锁,当成功调用wait之后,传入的锁,会被自动释放// 当我被唤醒时,我从哪里醒来呢?->从哪里阻塞挂起,就从哪里唤醒, 被唤醒的时候,我们还是在临界区被唤醒的// 当我们被唤醒的时候,pthread_cond_wait,会自动帮助我们线程获取锁}_bq.push(in); // 2. 队列不为空或者被唤醒 -> 访问临界资源,100%确定,资源是就绪的pthread_cond_signal(&_Empty); // 唤醒pthread_mutex_unlock(&_mtx); // 解锁}void pop(T* out){pthread_mutex_lock(&_mtx);while (isQueueEmpty()){pthread_cond_wait(&_Empty, &_mtx);}*out = _bq.front(); // 访问临界资源_bq.pop();pthread_cond_signal(&_Full); // 唤醒pthread_mutex_unlock(&_mtx); // 解锁}~BlockQueue(){pthread_mutex_destroy(&_mtx);pthread_cond_destroy(&_Empty);pthread_cond_destroy(&_Full);}protected:std::queue<T> _bq;     // 阻塞队列int _capacity;         // 容量上限pthread_mutex_t _mtx;  // 通过互斥锁保证队列安全pthread_cond_t _Empty; // 用它来表示bq 是否空的条件pthread_cond_t _Full;  //  用它来表示bq 是否满的条件
};

ProdCon.cc:

#include "BlockQueue.hpp"
#include <pthread.h>
#include <unistd.h>void* consumer(void *args)
{BlockQueue<int> *bqueue = (BlockQueue<int> *)args;while(true) // 获取任务{int a = 0;bqueue->pop(&a);std::cout << "消费一个数据" << a << std::endl;sleep(1);}return nullptr;
}void* productor(void *args)
{BlockQueue<int> *bqueue = (BlockQueue<int> *)args;int a = 1;while(true) // 制作任务{bqueue->push(a);std::cout << "生产一个数据" << a++ << std::endl;// sleep(1); // 两个换着sleep看看能不能看到约束的效果}return nullptr;
}int main()
{BlockQueue<int> *bqueue = new BlockQueue<int>();pthread_t c[2],p[2];pthread_create(c, nullptr, consumer, bqueue);pthread_create(c + 1, nullptr, consumer, bqueue);pthread_create(p, nullptr, productor, bqueue);pthread_create(p + 1, nullptr, productor, bqueue);pthread_join(c[0], nullptr);pthread_join(c[1], nullptr);pthread_join(p[0], nullptr);pthread_join(p[1], nullptr);delete bqueue;return 0;
}

编译运行试试效果:

此时就实现了生产者生产一批数据,然后两个线程一人消费一个,然后再生成,再消费。

把代码里生产者和消费者的sleep注释换一下:(此时就应该是消费者跟着生产者走了)

编译运行:

也成功达到了预想的效果。

如果不想类似生产一个消费一个的话还可以在生产者或者消费者里的唤醒线程前加if判断语句,比如:在生产者线程里:if(_bq.size() >= _capacity / 2) pthread_cond_signal(&_Empty);


3.3 生产者消费者模型高效的体现

前面在分析生产者消费者模型时,一直都在说该模型高效,那么到底体现在什么地方呢?

多个生产者线程向阻塞队列中生成数据,多个消费者线程从阻塞队列中消费数据。

该模型的三种关系决定了访问阻塞队列的线程同一时间只有一个。

尤其是上面代码现象中,消费和生产是一前一后的,对于阻塞队列的访问是串行的,凭什么说这个模型是高效的呢?

因为在生产者线程和消费者线程中,访问阻塞队列临界资源的代码都只有一条,只有临界区的代码才是串行访问的。

除了临界区的代码,其他部分代码所有线程都是并发执行的。

实际的线程中,临界区之外的代码会有很多,而且有可能会非常耗时,但是这些代码是可以多线程并发执行的,该模型的效率就会很高。

生产者消费者模型的高效体现在:非临界区的代码,多线程可以并发执行。

该模型的高效并不体现在对临界资源(阻塞队列)的访问上。


3.4 后期代码(处理任务)

生产者消费者模型实际上并不是仅仅用来生产消费整型数据的,它更多的是处理任务的。

这里创建一个计算任务的类Task(这里写在了一个头文件下,只弄了加法)。在类中的仿函数调用回调函数执行具体的计算逻辑,还有一个显示任务的接口。把传给阻塞队列的int传成Task

BlockQueue.hpp和前面一样

Task.hpp

#pragma once
#include <iostream>
#include <functional>
typedef std::function<int(int, int)> func_t;
class Task
{
public:Task() {}Task(int x, int y, func_t func): _x(x), _y(y), _func(func){}int operator()(){return _func(_x, _y);}public: // 不想写get接口就直接弄公有了int _x;int _y;// int type;func_t _func;
};

ProdCon.cc

#include "BlockQueue.hpp"
#include "Task.hpp"
#include <pthread.h>
#include <unistd.h>
#include <ctime>inline int myAdd(int x, int y)
{return x + y;
}
void* consumer(void *args)
{BlockQueue<Task> *bqueue = (BlockQueue<Task> *)args;while(true) // 获取任务{Task t;bqueue->pop(&t);std::cout << pthread_self() <<" consumer: "<< t._x << " + " << t._y << " = " << t() << std::endl;// sleep(1);}return nullptr;
}void* productor(void *args)
{BlockQueue<Task> *bqueue = (BlockQueue<Task> *)args;int a = 1;while(true) // 制作任务{int x = rand() % 10 + 1;usleep(rand() % 1000);int y = rand() % 5 + 1;Task t(x, y, myAdd);bqueue->push(t);std::cout <<pthread_self() <<" productor: "<< t._x << " + " << t._y << " = ?" << std::endl;sleep(1); // 两个换着sleep看看能不能看到约束的效果}return nullptr;
}int main()
{srand((uint64_t)time(nullptr) ^ getpid() ^ 0x777); // 只是为了更随机BlockQueue<Task> *bqueue = new BlockQueue<Task>();pthread_t c[2],p[2];pthread_create(c, nullptr, consumer, bqueue);pthread_create(c + 1, nullptr, consumer, bqueue);pthread_create(p, nullptr, productor, bqueue);pthread_create(p + 1, nullptr, productor, bqueue);pthread_join(c[0], nullptr);pthread_join(c[1], nullptr);pthread_join(p[0], nullptr);pthread_join(p[1], nullptr);delete bqueue;return 0;
}

编译运行:


3.5 最终代码(RAII风格的锁)

ProdCon.cc和Task.hpp跟前面一样,

lockGuard.hpp

#pragma once
#include <iostream>
#include <pthread.h>class Mutex
{
public:Mutex(pthread_mutex_t* mtx) :_pmtx(mtx){}void lock(){pthread_mutex_lock(_pmtx);std::cout << "进行加锁成功" << std::endl;}void unlock(){pthread_mutex_unlock(_pmtx);std::cout << "进行解锁成功" << std::endl;}~Mutex(){}
protected:pthread_mutex_t* _pmtx;
};class lockGuard // RAII风格的加锁方式
{
public:lockGuard(pthread_mutex_t* mtx) // 因为不是全局的锁,所以传进来,初始化:_mtx(mtx){_mtx.lock();}~lockGuard(){_mtx.unlock();}
protected:Mutex _mtx;
};

BlockQueue.hpp

(只是加锁方式变了,不用自己解锁了)

#pragma once
#include <iostream>
#include <queue>
#include <mutex>
#include <pthread.h>
#include "lockGuard.hpp"
const int gDefaultCap = 7;template <class T>
class BlockQueue
{
private:bool isQueueEmpty(){return _bq.size() == 0;}bool isQueueFull(){return _bq.size() == _capacity;}public:BlockQueue(int capacity = gDefaultCap) : _capacity(capacity){pthread_mutex_init(&_mtx, nullptr);pthread_cond_init(&_Empty, nullptr);pthread_cond_init(&_Full, nullptr);}void push(const T& in) // 生产者{lockGuard lockgrard(&_mtx); // 自动调用构造函数//pthread_mutex_lock(&_mtx);// pthread_cond_wait: 只要是一个函数,就可能调用失败,可能存在 伪唤醒 的情况,所以用whilewhile(isQueueFull()) //1. 先检测当前的临界资源是否能够满足访问条件{pthread_cond_wait(&_Full, &_mtx); // 满的时候就在_Full这个条件变量下等待// 此时思考:我们是在临界区中,我是持有锁的,如果我去等待了,锁该怎么办呢?// 所以pthread_cond_wait第二个参数是一个锁,当成功调用wait之后,传入的锁,会被自动释放// 当我被唤醒时,我从哪里醒来呢?->从哪里阻塞挂起,就从哪里唤醒, 被唤醒的时候,我们还是在临界区被唤醒的// 当我们被唤醒的时候,pthread_cond_wait,会自动帮助我们线程获取锁}_bq.push(in); // 2. 队列不为空或者被唤醒 -> 访问临界资源,100%确定,资源是就绪的pthread_cond_signal(&_Empty); // 唤醒// pthread_mutex_unlock(&_mtx); // 解锁} // 出了代码块自动调用析构函数void pop(T* out){lockGuard lockgrard(&_mtx); // 自动调用构造函数// pthread_mutex_lock(&_mtx);while (isQueueEmpty()){pthread_cond_wait(&_Empty, &_mtx);}*out = _bq.front(); // 访问临界资源_bq.pop();pthread_cond_signal(&_Full); // 唤醒// pthread_mutex_unlock(&_mtx); // 解锁} // 出了代码块自动调用析构函数~BlockQueue(){pthread_mutex_destroy(&_mtx);pthread_cond_destroy(&_Empty);pthread_cond_destroy(&_Full);}protected:std::queue<T> _bq;     // 阻塞队列int _capacity;         // 容量上限pthread_mutex_t _mtx;  // 通过互斥锁保证队列安全pthread_cond_t _Empty; // 用它来表示bq 是否空的条件pthread_cond_t _Full;  //  用它来表示bq 是否满的条件
};

成功运行。


本篇完。

下一篇:零基础Linux_25(多线程)信号量+自选锁+读写锁(基于环形队列的生产者消费模型)。

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.rhkb.cn/news/174511.html

如若内容造成侵权/违法违规/事实不符,请联系长河编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

phar反序列化学习

PHP反序列化常见的是使用unserilize()进行反序列化&#xff0c;除此之外还有其它的反序列化方法&#xff0c;不需要用到unserilize()。就是用到phar反序列化。 Phar phar文件 Phar是将php文件打包而成的一种压缩文档&#xff0c;类似于Java中的jar包。它有一个特性就是phar文…

Golang教程——配置环境,再探GoLand

文章目录 一、Go是什么&#xff1f;二、环境配置验证配置环境变量 三、安装开发者工具GoLand四、HelloGolang 一、Go是什么&#xff1f; Go&#xff08;也称为Golang&#xff09;是一种开源的编程语言&#xff0c;由Google开发并于2009年首次发布。Go语言旨在提供一种简单、高…

企业文件防泄密方法

企业文件防泄密方法 安企神数据防泄密系统下载使用 企业文件是企业的核心资产&#xff0c;其中可能包含大量的敏感信息&#xff0c;如客户资料、产品配方、财务数据等。一旦这些文件泄露&#xff0c;可能会给企业带来不可估量的损失。 然而&#xff0c;企业文件防泄密是确保…

Vue 3响应式对象: ref和reactive

目录 什么是响应式对象&#xff1f; Ref Reactive Ref vs Reactive 适用场景&#xff1a; 访问方式&#xff1a; 引用传递&#xff1a; 性能开销&#xff1a; 响应式对象优点 响应式对象缺点 总结 Vue 3作为一种流行的JavaScript框架&#xff0c;提供了响应式编程的…

4.编译多线程应用程序

在不同平台下使用多线程的程序编译时的编译选项设置如下图所示。 gcc下编译时我们会碰到两个选项-pthread和-lpthread&#xff0c;记住推荐使用-pthread而不是-lpthread&#xff0c;原因如下 -lpthread只是起到链接pthread库的作用&#xff1b;而-pthread选项除了会链接pthrea…

搭建微信小程序环境及项目结构介绍

一、注册 访问微信公众平台&#xff0c;将鼠标的光标置于账号分类中的小程序上&#xff0c; 点击‘查看详情’ 点击“前往注册” 下方也可以点击注册&#xff1a; 小程序注册页面&#xff1a; 步骤a:进入小程序注册页&#xff0c;根据指引填写信息和提交相应的资料&#x…

uni-app打包之如何生成自由证书

我是使用Android Studio来直接生成。超级简单 第一步 打开 Android Studio 找到下面图片 第二步 选 Android App Bund 然后Next 第三步 选择创建新的 第四步 填写对应的 信息 密码最好都是一样的 第五步 点击ok 即可创建成功。 uniapp打包时候勾选文件 &#xff08;如果公…

SpringBoot相比于Spring的优点(自动配置和依赖管理)

自动配置 例子见真章 我们先看一下我们Spring整合Druid的过程&#xff0c;以及我们使用SpringBoot整合Druid的过程我们就知道我们SpringBoot的好处了。 Spring方式 Spring方式分为两种&#xff0c;第一种就是我们使用xml进行整合&#xff0c;第二种就是使用我们注解进行简化…

国外怎么传大文件到国内,这款传输软件跨国企业必备

从国外传输文件到国内&#xff0c;这项任务常常充满了挑战。国际之间的距离、网络延迟、数据安全和文件大小限制等问题使得这个过程异常复杂。本文将深入剖析这些挑战&#xff0c;并说明一款优秀的跨国传输软件&#xff0c;如何能够成为解决这些问题的强有力工具。 国外传输文件…

Vue的安装

----------------------------------------------------前置---------------------------------------------------- 1.node.js的下载安装、缓存路径的设置 ①安装 ②设置npm prefix, cache 2.NODE_PATH、PATH ①系统变量中加 ②PATH中加 3.配置镜像源 -----------------------…

数据库数据恢复—Oracle数据库报错ORA-01110错误的数据恢复案例

Oracle数据库故障&#xff1a; 北京某公司一台运行oracle数据库的服务器&#xff0c;机房意外断电导致该服务器重启&#xff0c;重启后发现oracle数据库报错。该Oracle数据库没有备份。 Oracle数据库数据恢复过程&#xff1a; 1、北亚企安数据恢复工程师检查该oracle数据库的数…

单目标应用:进化场优化算法(Evolutionary Field Optimization,EFO)求解微电网优化MATLAB

一、微网系统运行优化模型 微电网优化模型介绍&#xff1a; 微电网多目标优化调度模型简介_IT猿手的博客-CSDN博客 二、进化场优化算法EFO 进化场优化算法&#xff08;Evolutionary Field Optimization&#xff0c;EFO&#xff09;由Baris Baykant Alagoz等人于2022年提出&…

sd模型测试之又纯又欲的Copax Anime XL动漫大模型

除了各种美女图外&#xff0c;AI绘画大模型中&#xff0c;最受欢迎的是动漫。 动漫又分好几种&#xff0c;幼儿向、热血向、成人向等。 之前我推荐了几个风格不同的动漫大模型&#xff0c;今天推荐一个成人向的动漫大模型&#xff1a;Copax Anime XL。 当然了&#xff0c;成…

iOS调试技巧——使用Python 自定义LLDB

一、类介绍 在使用Python 自定义LLDB之前&#xff0c;先了解一下LLDB的一些类型 SBTarget 正在被调试的程序SBProcess 和程序关联的具体的进程SBThread 执行的线程SBFrame 和线程关联的一个栈帧SBVariable 变量&#xff0c;寄存器或是一个表达式 一般情况下&#xff0c;我们…

Kafka生产问题总结及性能优化实践

Kafka可视化管理工具kafka-manager 安装及基本使用可参考&#xff1a;https://www.cnblogs.com/dadonggg/p/8205302.html 线上环境规划 JVM参数设置 kafka是scala语言开发&#xff0c;运行在JVM上&#xff0c;需要对JVM参数合理设置&#xff0c;参看JVM调优专题 修改bin/kaf…

geoserver去除tif影像黑色的背景的方法

geoserver加载某些tif文件的时候,tif文件本身有黑色的背景,怎么去掉呢? 只要在geoserver中设置就行。 处理方法: 1.新建数据源时要选择ImageMosaic数据源 2,设置"Output Transparent Color" 设置"Output Transparent Color"为黑色(000000),在…

一文详解汽车电子CAN总线

0.什么是CAN总线 CAN总线(控制器区域网络Controller Area Network)是一个中央网络系统&#xff0c;连接不同的电子控制单元(ECU)以及车辆中的其他设备。现在的汽车可以有100个ECU&#xff0c;因此CAN总线通信变得非常重要。 1.CAN总线流行的背景 集中式:CAN总线系统允许对连接…

从 Hash索引、二叉树、B-Tree 与 B+Tree 对比看索引结构选择

从 Hash索引、二叉树、B-Tree 与 BTree 对比看索引结构选择 1、Hash 结构1.1、关于 Hash 数据结构1.2、InnoDB索引为啥不选 Hash 结构1.3、关于InnoDB 提供自适应 Hash 索引 &#xff08;Adaptive Hash Index&#xff09; 2、二叉搜索树3、平衡二叉树&#xff08;AVL树 &#x…

莫名其妙el-table不显示问题

完全复制element-ui中table代码&#xff0c;发现表格仍然不显示&#xff0c;看别人都说让降低版本&#xff0c;可我不想降低啊&#xff0c;不然其他组件有可能用不了&#xff0c;后来发现可以通过配置vite.config.js alias: {: path.resolve(__dirname, src),vue: vue/dist/vue…

<蓝桥杯软件赛>零基础备赛20周--第3周

报名明年4月蓝桥杯软件赛的同学们&#xff0c;如果你是大一零基础&#xff0c;目前懵懂中&#xff0c;不知该怎么办&#xff0c;可以看看本博客系列&#xff1a;备赛20周合集 20周的完整安排请点击&#xff1a;20周计划 每周发1个博客&#xff0c;共20周&#xff08;读者可以按…