【Linux】线程的互斥和同步

【Linux】线程的互斥和同步

线程间的互斥

  • 临界资源:多线程执行共享的资源就叫做临界资源
  • 临界区:每个线程内部,访问临界资源的代码,就叫做临界区
  • 互斥:任何时刻,互斥保证有且只有一个执行流进入临界区,访问临界资源,通常对临界资源进行保护
  • 原子性:不会被任何调度机制打断的操作,该操作只有两态,要么完成,要么未完成。

1. 互斥量mutex

  • 大部分情况下,线程使用的数据都是局部变量,变量的地址空间在线程的栈空间内,这种情况下,变量属于单个线程,且其他线程无法访问该变量
  • 但有时候,很多变量需要在线程间共享,这样的变量称为共享变量,可以通过数据的共享,完成线程间的交互
  • 多个线程并发操作共享变量,会带来一些问题

实验:操作共享变量会有问题的售票系统代码

#ifndef __THREAD_HPP__
#define __THREAD_HPP__
#include<iostream>
#include<pthread.h>
#include <string>
#include <unistd.h>
#include<functional>
namespace ThreadModule
{template<typename T>using func_t = std::function<void(T)>;template<typename T>class Thread{public:Thread(func_t<T> func,T const data,const std::string& threadname = "none_name")//--常量引用和非常量引用的概念:_func(func),_data(data),_threadname(threadname),_stop(true){}void Excute(){_func(_data);}static void* handler(void* args){Thread<T>* self = static_cast<Thread<T> *>(args);// self->Excute();self->_func(self->_data);return nullptr;}bool Start(){int ret = pthread_create(&_tid,nullptr,handler,this);if(ret == 0){_stop = false;return true;}else {return false;}}void Join(){if(!_stop){pthread_join(_tid,nullptr);}}std::string name(){return _threadname;}~Thread(){}private:std::string _threadname;pthread_t _tid;T _data;func_t<T> _func;bool _stop;};
}
#endif
using namespace ThreadModule;
int g_tickets = 10000; // 共享资源,没有保护的, 临界资源
class ThreadData
{public:ThreadData(const std::string& threadname,int& ticked):_threadname(threadname),_tickeds(ticked),_total(0){}~ThreadData(){}public:std::string _threadname;int& _tickeds;int _total;
};
void route(ThreadData* ptr)
{while(true){if(ptr->_tickeds > 0){//模拟一次抢票逻辑usleep(1000);printf("%s is runing ,get ticked: %d\n",ptr->_threadname.c_str(),ptr->_tickeds);ptr->_tickeds--;ptr->_total++;}else{break;}}return ;
}
const int num = 4;
int main()
{//创建一批线程std::vector<Thread<ThreadData*>> threads;std::vector<ThreadData*> datas;for(int i = 0;i < num;i++){std::string name = "thread-" + std::to_string(i+1);ThreadData* _ptr = new ThreadData(name,g_tickets);threads.emplace_back(route,_ptr,name);datas.emplace_back(_ptr);}//启动一批线程for(auto& thread:threads){thread.Start();}//等待一批线程for(auto& thread:threads){thread.Join();std::cout<<"wait thread name: "<<thread.name()<<std::endl;}for(auto data: datas){std::cout<<"name: "<<data->_threadname<<"total is: "<<data->_total<<std::endl; delete data; }return 0;
}

实验现象:

为什么会抢到票为-1,或-2的票呢?

  • if 语句判断条件为真以后,代码可以并发的切换到其他线程
  • usleep 这个模拟漫长业务的过程,在这个漫长的业务过程中,可能有很多个线程会进入该代码段
  • _tickeds-- 操作本身就不是一个原子操作

在这里插入图片描述

取出ticket--部分的汇编代码
objdump -d a.out > test.objdump
152 40064b: 8b 05 e3 04 20 00 mov 0x2004e3(%rip),%eax # 600b34 <_tickeds>
153 400651: 83 e8 01 sub $0x1,%eax
154 400654: 89 05 da 04 20 00 mov %eax,0x2004da(%rip) # 600b34 <_tickeds>

– 操作并不是原子操作,而是对应三条汇编指令:

  • load :将共享变量ticket从内存加载到寄存器中
  • update : 更新寄存器里面的值,执行-1操作
  • store :将新值,从寄存器写回共享变量ticket的内存地址

要解决以上问题,需要做到三点:

  • 代码必须要有互斥行为:当代码进入临界区执行时,不允许其他线程进入该临界区。
  • 如果多个线程同时要求执行临界区的代码,并且临界区没有线程在执行,那么只能允许一个线程进入该临
    界区。
  • 如果线程不在临界区中执行,那么该线程不能阻止其他线程进入临界区

要做到这三点,本质上就是需要一把锁。Linux上提供的这把锁叫互斥量。

2. 互斥量的接口

初始化互斥量
初始化互斥量有两种方法:

  • 方法1,静态分配:
pthread_mutex_t mutex = PTHREAD_MUTEX_INITIALIZER
  • 方法2,动态分配:
int pthread_mutex_init(pthread_mutex_t *restrict mutex, const pthread_mutexattr_t *restrictattr);参数:mutex:要初始化的互斥量attr:NULL

销毁互斥量
销毁互斥量需要注意:

  • 使用PTHREAD_ MUTEX_ INITIALIZER 初始化的互斥量不需要销毁
  • 不要销毁一个已经加锁的互斥量
  • 已经销毁的互斥量,要确保后面不会有线程再尝试加锁
int pthread_mutex_destroy(pthread_mutex_t *mutex);

互斥量加锁和解锁

int ptread_mutex_lock(pthread_mutex* mutex);
int ptread_mutex_unlock(pthread_mutex* mutex);
返回值:成功返回0,失败返回错误号

调用pthread_lock可能会出现以下情况:

  • 互斥量处于未锁状态,该函数会将互斥量锁定,同时返回成功
  • 发起函数时,其他线程已经锁定互斥量,或者存在其他线程同时申请互斥量,但有竞争到互斥量,那么pthread_lock调用会陷入阻塞(执行流被挂起),等待互斥量解锁

改进上面的售票系统:

int g_tickets = 10000; // 共享资源,没有保护的, 临界资源
void route(ThreadData* ptr)
{while(true){pthread_mutex_lock(&getmutex);//加锁:本质是将并行执行--》串行执行,加锁的粒度越细越好if(ptr->_tickeds > 0){usleep(1000);printf("%s is runing ,get ticked: %d\n",ptr->_threadname.c_str(),ptr->_tickeds);ptr->_tickeds--;pthread_mutex_unlock(&getmutex);//解锁ptr->_total++;}else{pthread_mutex_unlock(&getmutex);//解锁break;}}return ;
}

image-20241123142442036

更优雅的代码:

#ifndef __LOCK_GUARD_HPP__
#define __LOCK_GUARD_HPP__
#include<pthread.h>
class LockGuard
{public:LockGuard(pthread_mutex_t* mutex):_mutex(mutex){pthread_mutex_lock(_mutex);}~LockGuard(){pthread_mutex_unlock(_mutex);}private:pthread_mutex_t* _mutex;
};
#endif
#include"Lock_Guard.hpp"
#include"Thread.hpp"
#include<iostream>
#include <iostream>
#include <vector>
#include<mutex>using namespace ThreadModule;
//静态锁
//pthread_mutex_t getmutex = PTHREAD_MUTEX_INITIALIZER;//静态锁
class ThreadData
{public:ThreadData(const std::string& threadname,int& ticked,std::mutex& mutex):_threadname(threadname),_tickeds(ticked),_total(0),_mutex(mutex){}~ThreadData(){}public:std::string _threadname;int& _tickeds;int _total;// pthread_mutex_t& _mutex;std::mutex& _mutex;
};
int g_tickets = 10000; // 共享资源,没有保护的, 临界资源
void route(ThreadData* ptr)
{while(true){//加锁:本质是将并行执行--》串行执行,加锁的粒度越细越好//线程竞争锁是自由竞争的,竞争锁的能力太强,就会导致其他线程抢不到锁,---造成其他线程的饥饿问题!!!//pthread_mutex_lock(&ptr->_mutex);     动态锁//pthread_mutex_lock(&getmutex);        静态锁//LockGuard mutex(&ptr->_mutex);        自己封装的RAII锁// std::lock_guard<std::mutex> lock(ptr->_mutex);  //C++11RAII锁ptr->_mutex.lock();                     //C++11锁//模拟一次抢票逻辑              if(ptr->_tickeds > 0)         {usleep(1000);printf("%s is runing ,get ticked: %d\n",ptr->_threadname.c_str(),ptr->_tickeds);ptr->_tickeds--;//pthread_mutex_unlock(&ptr->_mutex);//解锁//pthread_mutex_unlock(&getmutex);ptr->_mutex.unlock();ptr->_total++;}else{//pthread_mutex_unlock(&ptr->_mutex);//解锁//pthread_mutex_unlock(&getmutex);ptr->_mutex.unlock();break;}}return ;
}
const int num = 4;
int main()
{//动态锁// pthread_mutex_t mutex;// pthread_mutex_init(&mutex,nullptr);//C++11锁std::mutex mutex;//创建一批线程std::vector<Thread<ThreadData*>> threads;std::vector<ThreadData*> datas;for(int i = 0;i < num;i++){std::string name = "thread-" + std::to_string(i+1);ThreadData* _ptr = new ThreadData(name,g_tickets,mutex);threads.emplace_back(route,_ptr,name);datas.emplace_back(_ptr);}//启动一批线程for(auto& thread:threads){thread.Start();}//等待一批线程for(auto& thread:threads){thread.Join();std::cout<<"wait thread name: "<<thread.name()<<std::endl;}for(auto data: datas){std::cout<<"name: "<<data->_threadname<<"total is: "<<data->_total<<std::endl; delete data; //pthread_mutex_destroy(&data->_mutex);data->_mutex.~mutex();}return 0;
}

2. 互斥的底层实现

  • 经过上面的例子,大家已经意识到单纯的i++ 或者++i 都不是原子的,有可能会有数据一致性问题
  • 为了实现互斥锁操作,大多数体系结构都提供了swap或exchange指令,该指令的作用是把寄存器内存单
    数据相交换,由于只有一条指令,保证了原子性,即使是多处理器平台,访问内存的 总线周期也有先后,一
    个处理器上的交换指令执行时另一个处理器的交换指令只能等待总线周期。 现在我们把lock和unlock的伪代码改一下

加锁and解锁:

image-20241123164826838

image-20241123170200857

线程间的同步

条件变量

  • 当一个线程互斥地访问某个变量时,它可能发现在其它线程改变状态之前,它什么也做不了。
  • 例如一个线程访问队列时,发现队列为空,它只能等待,只到其它线程将一个节点添加到队列中。这种情
    况就需要用到条件变量。

同步概念与竞态条件

  • 同步:在保证数据安全的前提下,让线程能够按照某种特定的顺序访问临界资源,从而有效避免饥饿问
    题,叫做同步
  • 竞态条件:因为时序问题,而导致程序异常,我们称之为竞态条件。在线程场景下,这种问题也不难理解

1. 条件变量函数接口

初始化

  • 静态初始化
pthread_cond_t cond = PTHREAD_COND_INITIALIZER;
  • 动态初始化
int pthread_cond_init(pthread_cond_t *restrict cond,const pthread_condattr_t *restrict
attr);
参数:cond:要初始化的条件变量attr:NULL
  • 销毁
int pthread_con_destroy(pthread_con_t *cond);
  • 等待条件满足
int pthread_cond_wait(pthread_cond_t *restrict cond,pthread_mutex_t *restrict mutex);
参数:cond:要在这个条件变量上等待mutex:互斥量,后面详细解释
  • 唤醒等待
int pthread_cond_broadcast(pthread_cond_t *cond);
int pthread_cond_signal(pthread_cond_t *cond);

实验:线程同步

#include<iostream>
#include<vector>
#include<string>
#include<unistd.h>
#include<pthread.h>
pthread_mutex_t gmutex = PTHREAD_MUTEX_INITIALIZER;//互斥量
pthread_cond_t cond = PTHREAD_COND_INITIALIZER;//条件变量
void* Mastercore(void* args)
{sleep(3);std::cout<<"mastcore 开始唤醒..."<<std::endl;std::string name = static_cast<char*>(args);//唤醒...while(true){pthread_cond_signal(&cond);//唤醒等待队列中第一个线程sleep(1);}//pthread_cond_broadcast(&cond);//唤醒等待队列中所有线程return nullptr;
}
void* Slavercore(void* args)
{std::string name = static_cast<char*>(args);while(true){pthread_mutex_lock(&gmutex);//加锁pthread_cond_wait(&cond,&gmutex);// 等待条件变量,std::cout<<name<<"被唤醒..."<<std::endl;//TODOpthread_mutex_unlock(&gmutex);//解锁}
}
void StartMaster(std::vector<pthread_t>* tidptr)
{pthread_t tid;int n = pthread_create(&tid,nullptr,Mastercore,(void*)"master thread");if(n == 0){std::cout<<"master thread create success ..."<<std::endl;}tidptr->emplace_back(tid);
}void StartSlaver(std::vector<pthread_t>* tidptr,int threadnum)
{for(int i = 0;i< threadnum;i++){pthread_t tid;char* name = new char[64];snprintf(name,64,"thread-%d",i+1);int n = pthread_create(&tid,nullptr,Slavercore,name);if(n == 0){std::cout<<name<<" create success ..."<<std::endl;}tidptr->emplace_back(tid);}
}
void WaitThread(const std::vector<pthread_t>& tids)
{for(auto tid:tids){pthread_join(tid,nullptr);}
}
int main()
{std::vector<pthread_t> tids;StartMaster(&tids);StartSlaver(&tids,5);WaitThread(tids);return 0;
}

结果:

image-20241123205256945

生产者与消费者模型

  • 解耦
  • 支持并发
  • 支持忙闲不均:是指在一个系统中,不同组件或线程之间工作负载分配不均匀的现象。

image-20241124142954354

实验:生产者与消费者模型基础版

阻塞队列:

#ifndef __BLOCK_QUEUE_HPP__
#define __BLOCK_QUEUE_HPP__
#include <iostream>
#include <pthread.h>
#include <queue>
template <typename T>
class BlockQueue
{
public:bool is_full(){return _BlockQueue.size() == _cap;}bool is_empty(){return _BlockQueue.empty();}public:BlockQueue(int cap): _cap(cap){pthread_mutex_init(&_mutex, nullptr);pthread_cond_init(&_productor_cond, nullptr);pthread_cond_init(&_consumer_cond, nullptr);}void enqueue(T &in) // 生产者使用接口{pthread_mutex_lock(&_mutex);if (is_full()){pthread_cond_wait(&_productor_cond, &_mutex); // 如果满了,生产者入等待队列,解锁--唤醒--出等待队列,锁定}_BlockQueue.push(in);// 通知消费者来买// std::cout << "通知消费者来买" << std::endl;pthread_cond_signal(&_consumer_cond);// std::cout << "通知完成" << std::endl;pthread_mutex_unlock(&_mutex);}void pop(T *out){pthread_mutex_lock(&_mutex);if (is_empty()){// std::cout << "消费者入等待队列" << std::endl;pthread_cond_wait(&_consumer_cond, &_mutex); // 如果空了,消费者入等待队列,解锁,---被唤醒--出等待队列,锁定}*out = _BlockQueue.front();_BlockQueue.pop();// 通知生产者来卖pthread_cond_signal(&_productor_cond);pthread_mutex_unlock(&_mutex);}~BlockQueue(){pthread_mutex_destroy(&_mutex);pthread_cond_destroy(&_productor_cond);pthread_cond_destroy(&_consumer_cond);}private:std::queue<T> _BlockQueue;int _cap; // 阻塞队列上限pthread_mutex_t _mutex;pthread_cond_t _productor_cond; // 生产者等待队列pthread_cond_t _consumer_cond;  // 消费者等待队列
};
#endif

主函数:

#include <iostream>
#include "BlockQueue.hpp"
#include "Thread.hpp"
using namespace ThreadModule;void producer(BlockQueue<int> &bq)
{int cnt = 3;while (true){bq.enqueue(cnt);std::cout << "producer is sell :" << cnt << std::endl;cnt++;}
}
void consumer(BlockQueue<int> &bq)
{while (true){sleep(3);int data;bq.pop(&data); // 为什么传地址,通过地址修改cntstd::cout << "consumer is buy :" << data << std::endl;}
}
void Start_Com(std::vector<Thread<BlockQueue<int>>> *threads_ptr, BlockQueue<int> &bq, int num, func_t<BlockQueue<int>> func)
{for (int i = 0; i < num; i++){std::string name = "thread-" + std::to_string(i + 1);threads_ptr->emplace_back(func, bq, name);threads_ptr->back().Start();}
}
void StartProducer(std::vector<Thread<BlockQueue<int>>> *threads_ptr, BlockQueue<int> &bq, int num)
{Start_Com(threads_ptr, bq, num, producer);
}
void StartConsumer(std::vector<Thread<BlockQueue<int>>> *threads_ptr, BlockQueue<int> &bq, int num)
{Start_Com(threads_ptr, bq, num, consumer);
}
void WaitAllThread(std::vector<Thread<BlockQueue<int>>> &threads)
{for (auto thread : threads){thread.Join();}
}
int main()
{BlockQueue<int> *ptr = new BlockQueue<int>(5);std::vector<Thread<BlockQueue<int>>> threads;StartProducer(&threads, *ptr, 1);StartConsumer(&threads, *ptr, 1);WaitAllThread(threads);return 0;
}

image-20241124143415817

升级版:传递任务1. 0

#pragma once
#include <iostream>
class Task
{
public:Task(){}Task(int a, int b): _a(a), _b(b){}void Excute(){_result = _a+_b;}std::string ResultToString(){return std::to_string(_a)+" + "+std::to_string(_b)+" = "+std::to_string(_result);}std::string DebugToString(){return std::to_string(_a)+" + "+std::to_string(_b)+" = ?";}
private: int _a;int _b;int _result;
};
#include <iostream>
#include "BlockQueue.hpp"
#include "Thread.hpp"
#include "Task.hpp"
using namespace ThreadModule;void producer(BlockQueue<Task> &bq)
{srand((unsigned)time(NULL));int cnt = 3;while (true){sleep(2);int first = rand() % 100;usleep(1234);int second = rand() % 100;Task tk(first, second);bq.enqueue(tk);std::cout << tk.DebugToString() << std::endl;}
}
void consumer(BlockQueue<Task> &bq)
{while (true){Task td;bq.pop(&td); // 为什么传地址,通过地址修改cnttd.Excute();std::cout << td.ResultToString() << std::endl;}
}
void Start_Com(std::vector<Thread<BlockQueue<Task>>> *threads_ptr, BlockQueue<Task> &bq, int num, func_t<BlockQueue<Task>> func)
{for (int i = 0; i < num; i++){std::string name = "thread-" + std::to_string(i + 1);threads_ptr->emplace_back(func, bq, name);threads_ptr->back().Start();}
}
void StartProducer(std::vector<Thread<BlockQueue<Task>>> *threads_ptr, BlockQueue<Task> &bq, int num)
{Start_Com(threads_ptr, bq, num, producer);
}
void StartConsumer(std::vector<Thread<BlockQueue<Task>>> *threads_ptr, BlockQueue<Task> &bq, int num)
{Start_Com(threads_ptr, bq, num, consumer);
}
void WaitAllThread(std::vector<Thread<BlockQueue<Task>>> &threads)
{for (auto thread : threads){thread.Join();}
}
int main()
{BlockQueue<Task> *ptr = new BlockQueue<Task>(5);std::vector<Thread<BlockQueue<Task>>> threads;StartProducer(&threads, *ptr, 1);StartConsumer(&threads, *ptr, 3);WaitAllThread(threads);return 0;
}

image-20241124154042539

传递任务2.0

#include <iostream>
#include "BlockQueue.hpp"
#include "Thread.hpp"
#include "Task.hpp"
using namespace ThreadModule;
using Task = std::function<void()>;
using bock_queue_t = BlockQueue<Task>;
void printdata()
{std::cout << "hell word" << std::endl;
}
void producer(bock_queue_t &bq)
{while (true){sleep(1);Task t = printdata;bq.enqueue(t);}
}
void consumer(bock_queue_t &bq)
{while (true){Task tk;bq.pop(&tk); // 为什么传地址,通过地址修改cnttk();}
}
void Start_Com(std::vector<Thread<BlockQueue<Task>>> *threads_ptr, bock_queue_t &bq, int num, func_t<bock_queue_t> func)
{for (int i = 0; i < num; i++){std::string name = "thread-" + std::to_string(i + 1);threads_ptr->emplace_back(func, bq, name);threads_ptr->back().Start();//创建线程}
}
void StartProducer(std::vector<Thread<bock_queue_t>> *threads_ptr, bock_queue_t &bq, int num)
{Start_Com(threads_ptr, bq, num, producer);
}
void StartConsumer(std::vector<Thread<bock_queue_t>> *threads_ptr, bock_queue_t &bq, int num)
{Start_Com(threads_ptr, bq, num, consumer);
}
void WaitAllThread(std::vector<Thread<bock_queue_t>> &threads)
{for (auto thread : threads){thread.Join();}
}
int main()
{bock_queue_t *ptr = new bock_queue_t(5);std::vector<Thread<bock_queue_t>> threads;StartProducer(&threads, *ptr, 1);StartConsumer(&threads, *ptr, 3);WaitAllThread(threads);return 0;
}

image-20241124154529305

POSIX信号量

POSIX信号量和SystemV信号量作用相同,都是用于同步操作,达到无冲突的访问共享资源目的。 但POSIX可以用于线程间同步。

  • 初始化信号量
#include<semaphore.h>
int sem_init(sem_t *sem, int pshared, unsigned int value)
参数:pshared: 0表示线程间共享,非零表示进程间共享value:表示信号量初始值
  • 销毁信号量
int sem_destroy(sem_t* sem);
  • 等待信号量
功能:等待信号量,会将信号量值减一
int sem_wait(sem_t * sem);//P()
  • 发布信号量
功能:发布信号量,表示资源使用完毕,可以归还资源了,将信号量的值叫加一
int sem_post(sem_t * sem);//V()

上一节生产者-消费者的例子是基于queue的,其空间可以动态分配,现在基于固定大小的环形队列重写这个程序
(POSIX信号量):

image-20241124200631234

基于环形队列的生产消费模型

  • 环形队列采用数组模拟,用模运算来模拟环状特性

image-20241124192711664

  • 环形结构起始状态和结束状态都是一样的,不好判断为空或者为满,所以可以通过加计数器或者标记位来
    判断满或者空。另外也可以预留一个空的位置,作为满的状态

image-20241124192731836

  • 但是我们现在有信号量这个计数器,就很简单的进行多线程间的同步过程
#ifndef __RING_QUEUE_HPP__
#define __RING_QUEUE_HPP__
#include <iostream>
#include <pthread.h>
#include <queue>
#include <semaphore.h>
#include <vector>
template <typename T>
class RingQueue
{
public:void P(sem_t &sem){sem_wait(&sem);}void V(sem_t &sem){sem_post(&sem);}void Lock(pthread_mutex_t &mutex){pthread_mutex_lock(&mutex);}void Unlock(pthread_mutex_t &mutex){pthread_mutex_unlock(&mutex);}public:RingQueue(int cap): _ring_queue(cap), _cap(cap), _productor_index(0), _consumer_index(0){sem_init(&_room_sem, 0, _cap);//信号量初始化,空间资源初始为_ring_queue的容量sem_init(&_data_sem, 0, 0);//数据资源初始为 0pthread_mutex_init(&_productor_mutex, nullptr);//互斥量初始化pthread_mutex_init(&_consumer_mutex, nullptr);}void emquue(T &in){// 生产行为//P操作用来减少信号量的值(通常是减1)。//如果_room_sem信号量的值大于0,执行P操作后,信号量的值减1,进程继续执行。//如果信号量的值为0,执行P操作后,进程会被阻塞,直到信号量的值变为大于0,这时进程才会被唤醒并继续执行。P(_room_sem);Lock(_productor_mutex);//加锁,维护生产者与生产者的竞争_ring_queue[_productor_index++] = in;//生产数据_productor_index %= _ring_queue.size();Unlock(_productor_mutex);V(_data_sem);//当_data_sem信号量的值增加后,如果有进程因为执行P(_data_sem)操作而被阻塞在该信号量上,//那么系统会选择一个或多个进程解除其阻塞状态,允许它们继续执行}void pop(T *out){// 消费行为P(_data_sem);//_dataLock(_consumer_mutex);*out = _ring_queue[_consumer_index++];_consumer_index %= _ring_queue.size();Unlock(_consumer_mutex);V(_room_sem);}~RingQueue(){sem_destroy(&_room_sem);//销毁信号量sem_destroy(&_data_sem);pthread_mutex_destroy(&_productor_mutex);//销毁互斥量pthread_mutex_destroy(&_consumer_mutex);}private:// 环形队列std::vector<T> _ring_queue;int _cap;// 生产者与消费者下标int _productor_index;int _consumer_index;// 信号量sem_t _room_sem;//空间信号量sem_t _data_sem;//数据信号量// 互斥量pthread_mutex_t _productor_mutex;//生产者互斥量pthread_mutex_t _consumer_mutex;//消费者互斥量
};
#endif
#ifndef __THREAD_HPP__
#define __THREAD_HPP__
#include <iostream>
#include <pthread.h>
#include <string>
#include <unistd.h>
#include <functional>
namespace ThreadModule
{template <typename T>using func_t = std::function<void(T &,std::string)>;template <typename T>class Thread{public:Thread(func_t<T> func, T& data, const std::string threadname = "none_name") // 为什么--常量引用和非常量引用的概念: _func(func), _data(data), _threadname(threadname), _stop(true){}void Excute(){_func(_data,_threadname);}static void *handler(void *args){Thread<T> *self = static_cast<Thread<T> *>(args);self->Excute();return nullptr;}bool Start(){int ret = pthread_create(&_tid, nullptr, handler, this);if (ret == 0){_stop = false;return true;}else{return false;}}void Join(){if (!_stop){pthread_join(_tid, nullptr);}}std::string name(){return _threadname;}~Thread() {}private:std::string _threadname;pthread_t _tid;T& _data;func_t<T> _func;bool _stop;};
}
#endif
#include <iostream>
#include "RingQueue.hpp"
#include "Thread.hpp"
using namespace ThreadModule;
using Task = std::function<void()>;
using ring_queue_t = RingQueue<Task>;
void printdata()
{std::cout << "hell word" << std::endl;
}
void producer(ring_queue_t &bq, std::string name)
{int cnt = 10;while (true){sleep(2);Task t = printdata;bq.emquue(t); // 传递任务// std::cout<< name << " in: " << cnt << std::endl;// cnt++;}
}
void consumer(ring_queue_t &bq, std::string name)
{while (true){int cnt;Task tk;bq.pop(&tk);tk(); // 执行执行任务std::cout << name << " is run : task " << std::endl;}
}
void Init_Com(std::vector<Thread<ring_queue_t>> *threads_ptr, ring_queue_t &rq, int num, func_t<ring_queue_t> func, const std::string &who)
{for (int i = 0; i < num; i++){std::string _name = "thread- " + std::to_string(i + 1) + "  " + who;threads_ptr->emplace_back(func, rq, _name);// threads_ptr->back().Start();}
}
void InitProducer(std::vector<Thread<ring_queue_t>> *threads_ptr, ring_queue_t &rq, int num)
{Init_Com(threads_ptr, rq, num, producer, "producer");
}
void InitConsumer(std::vector<Thread<ring_queue_t>> *threads_ptr, ring_queue_t &rq, int num)
{Init_Com(threads_ptr, rq, num, consumer, "consumer");
}
void StartAllThread(std::vector<Thread<ring_queue_t>> &threads)
{for (auto &thread : threads){thread.Start();}
}
void WaitAllThread(std::vector<Thread<ring_queue_t>> &threads)
{for (auto &thread : threads){thread.Join();}
}
int main()
{ring_queue_t *ptr = new ring_queue_t(10);std::vector<Thread<ring_queue_t>> threads; // 所有副线程共享ring_queueInitProducer(&threads, *ptr, 1);           // 生产者初始化InitConsumer(&threads, *ptr, 3);           // 消费者初始化StartAllThread(threads);                   // 启动所有副线程WaitAllThread(threads);                    // 等待所有副线程return 0;
}

结果

[!NOTE]

代码一定要多敲,才能明白里面的细节,加油👍👍👍

【Linux】互斥和同步—完结。下一章【Linux】线程池

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.rhkb.cn/news/479619.html

如若内容造成侵权/违法违规/事实不符,请联系长河编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

集合Queue、Deque、LinkedList、ArrayDeque、PriorityQueue详解

1、 Queue与Deque的区别 在研究java集合源码的时候&#xff0c;发现了一个很少用但是很有趣的点&#xff1a;Queue以及Deque&#xff1b; 平常在写leetcode经常用LinkedList向上转型Deque作为栈或者队列使用&#xff0c;但是一直都不知道Queue的作用&#xff0c;于是就直接官方…

亮相全国集群智能与协同控制大会,卓翼飞思无人智能科研方案成焦点

无人集群智能协同技术是人工智能发展的必然趋势&#xff0c;也是我国新一代人工智能的核心研究领域。为加强集群智能与协同控制需求牵引和对接、技术交流和互动&#xff0c;11月23-25日&#xff0c;由中国指挥与控制学会主办的第八届全国集群智能与协同控制大会在贵阳市隆重召开…

Oracle JDK(通常简称为 JDK)和 OpenJDK区别

Java 的开发和运行时环境主要由两种实现主导&#xff1a;Oracle JDK&#xff08;通常简称为 JDK&#xff09;和 OpenJDK。尽管它们都基于同一个代码库&#xff0c;但在一些关键点上有所区别。以下是详细的对比&#xff1a; 1. 基础代码 Oracle JDK&#xff1a; 基于 OpenJD…

损失函数分类

1. NLLLoss&#xff08;负对数似然损失&#xff09; 定义&#xff1a; 直接对预测的概率 p(yi) 的负对数求平均。通常配合 Softmax 使用&#xff0c;输入为对数概率。 优点&#xff1a; 对离散分类问题效果良好。更灵活&#xff0c;用户可以自行计算 Softmax。 缺点&#x…

vue3 数字滚动插件vue3-count-to

安装 npm i vue3-count-to -S 引入 import { CountTo } from vue3-count-to 使用 <countTo :startVal"0" :endVal"57.63" :decimals"0" :duration"3000"></countTo> 所有配置

CodeTON Round 9 (Div. 1 + Div. 2, Rated, Prizes!)(前五道)

A. Shohag Loves Mod 翻译&#xff1a; Shohag 有一个整数 n。请帮他找出一个递增整数序列 &#xff0c;使得 在所有 的对上都满足。 可以证明&#xff0c;在给定的约束条件下&#xff0c;这样的序列总是存在的。 思路&#xff1a; 每个数为下标i*2-1&#xff08;注意这里下…

数据结构之二:表

顺序表代码&#xff1a;SData/SqList/SeqList.h Hera_Yc/bit_C_学习 - 码云 - 开源中国 链表相关代码&#xff1a;SData/ListLink/main.c Hera_Yc/bit_C_学习 - 码云 - 开源中国 leetcode相关代码leetcode/reverse_Link/main.c Hera_Yc/bit_C_学习 - 码云 - 开源中国 本文…

Adaboost集成学习 | Python实现基于NuSVR-Adaboost多输入单输出回归预测

目录 效果一览基本介绍程序设计参考资料效果一览 基本介绍 基于NuSVR-Adaboost多输入单输出回归预测python代码 NuSVR是一种支持向量回归(SVR)算法的变体,用于解决回归问题。SVR是一种监督学习方法,它用于预测连续目标变量,而不是分类标签。NuSVR在SVR的基础上引入了一个…

Vue.js --- 生命周期

1. 前言 在 Vue.js 中&#xff0c;生命周期是指一个 Vue 实例从创建到销毁的过程。Vue 提供了一系列的生命周期钩子&#xff08;lifecycle hooks&#xff09;&#xff0c;让开发者可以在不同的阶段执行特定的代码。了解这些生命周期钩子是构建 Vue 组件的基础&#xff0c;能够…

排序算法之选择排序篇

思想&#xff1a; 每次从未排序的部分找出最小的元素&#xff0c;将其放到已排序部分的末尾 从数据结构中找到最小值&#xff0c;放到第一位&#xff0c;放到最前面&#xff0c;之后再从剩下的元素中找出第二小的值放到第二位&#xff0c;以此类推。 实现思路&#xff1a; 遍…

hive的cascade使用解释

最近看到涉及到hive表字段新增&#xff0c;项目组其他人员让我add columns后加 cascade&#xff0c;这个我以前见到过&#xff0c;但是我一般没有用&#xff0c;也没出问题&#xff0c;那就研究下。 网上大多数的说法就是分区表加字段需要级联&#xff0c;原因是&#xff0c;你…

聊聊Flink:这次把Flink的触发器(Trigger)、移除器(Evictor)讲透

一、触发器(Trigger) Trigger 决定了一个窗口&#xff08;由 window assigner 定义&#xff09;何时可以被 window function 处理。 每个 WindowAssigner 都有一个默认的 Trigger。 如果默认 trigger 无法满足你的需要&#xff0c;你可以在 trigger(…) 调用中指定自定义的 tr…

docker部署nginx,并配置SSL证书

、拉取nginx镜像 docker pull nginx:latest 在此过程中会遇到网络的问题&#xff0c;导致镜像无法下载&#xff0c;这时候需要在服务器中配置下国内的镜像地址。下面包含近期最新的国内镜像&#xff0c;截至2024年11月27日&#xff1a; "https://<你的阿里云账号ID&…

OceanBase 大数据量导入(obloader)

现需要将源数据库&#xff08;Oracle|MySQL等&#xff09;一些表的海量数据迁移到目标数据库 OceanBase 中&#xff0c;基于常规 jdbc 驱动编码的方式涉及开发工作&#xff0c;性能效率也要看编码的处理机制。 OceanBase 官方提供了的 OceanBase Migration Service (OMS) 数据…

【Spring MVC】如何获取cookie/session以及响应@RestController的理解,Header的设置

前言 &#x1f31f;&#x1f31f;本期讲解关于SpringMVC的编程之参数传递~~~ &#x1f308;感兴趣的小伙伴看一看小编主页&#xff1a;GGBondlctrl-CSDN博客 &#x1f525; 你的点赞就是小编不断更新的最大动力 &#x1f386;那么废…

【详细介绍及演示】Flink之checkpoint检查点的使用

目录 一、介绍 二、 设置checkpoint检查点演示 1、 代码演示 2、测试代码效果 3、查看快照情况 ​编辑 三、在集群上运行 1、第一次运行 2、第二次运行 四、自定义检查点savePoint 1、提交一个flink job 打成jar包 2、输入一些数据&#xff0c;观察单词对应的数字的…

JAVA篇05 —— 内部类(Local、Anonymous、Member、Static)

欢迎来到我的主页&#xff1a;【一只认真写代码的程序猿】 本篇文章收录于专栏【小小爪哇】 如果这篇文章对你有帮助&#xff0c;希望点赞收藏加关注啦~ 目录 1 内部类Inner Class 1.1 局部内部类 1.2 匿名内部类&#xff08;※※&#xff09; 1.3 匿名类最佳实践&#xf…

Spring Boot 与 Spring Cloud Alibaba 版本兼容对照

版本选择要点 Spring Boot 3.x 与 Spring Cloud Alibaba 2022.0.x Spring Boot 3.x 基于 Jakarta EE&#xff0c;javax.* 更换为 jakarta.*。 需要使用 Spring Cloud 2022.0.x 和 Spring Cloud Alibaba 2022.0.x。 Alibaba 2022.0.x 对 Spring Boot 3.x 的支持在其发行说明中…

jsp的pageContext对象

jsp的pageContext对象 是页面的上下文对象&#xff0c;表示当前页面运行环境&#xff0c;用于获取当前页面jsp页面信息&#xff0c;作用范围为当前的jsp页面 pageContext对象可以访问当前页面的所有jsp内置对象 jsp的四种内置对象 4中作用域&#xff1a;pagecontext,request…

网络安全在数字时代保护库存数据中的作用

如今&#xff0c;通过软件管理库存已成为一种标准做法。企业使用数字工具来跟踪库存水平、管理供应链和规划财务。 然而&#xff0c;技术的便利性也带来了网络威胁的风险。黑客将库存数据视为有价值的目标。保护这些数据不仅重要&#xff0c;而且必不可少。 了解网络安全及其…