上次讲解了:Linux:多线程(二.理解pthread_t、线程互斥与同步、基于阻塞队列的生产消费模型)
文章目录
- 1.POSIX信号量
- 1.1引入
- 1.2回顾加深理解信号量
- 1.3信号量的操作接口
- 2.基于循环队列的生产消费模型
- 2.1循环队列
- 2.2整个项目
- 3.线程池
- 可变参数的处理
- 项目内容
- 特点:
- 使用方式:
- 单例模式(线程安全的懒汉线程池)
- 4.STL、智能指针是否线程安全
- 5.其他常见的各种锁
- 自旋锁
- 读者写者问题
- 逻辑过程
- 接口介绍
1.POSIX信号量
1.1引入
上次我们使用了阻塞队列的生产消费模型,在先前的生产者-消费者模型代码中,当一个线程想要操作临界资源时,必须确保临界资源处于满足条件的状态才能进行修改;否则无法修改。例如,在
Enqueue
接口中,当队列已满时,临界资源处于条件不可用的状态,无法继续进行push
操作。此时,线程应该进入条件变量队列cond
中等待。如果队列未满,即临界资源条件已准备好,那么可以继续push
,调用队列_q
的push
接口。观察代码可以看到,在判断临界资源是否就绪之前,必须先获取锁,因为判断临界资源实质上就是对临界资源的访问,而访问临界资源自然需要加锁以保护。因此,代码通常会先获取锁,然后手动检查临界资源的就绪状态,根据状态判断是等待还是直接操作临界资源。
但是如果事先知道临界资源的状态是否就绪,则无需一上来就加锁。一旦提前知道临界资源的就绪状态,便不再需要手动检查资源状态。在这种情况下,若有一个计数器来表示临界资源中小块资源的数量(如队列中每个空间),线程在访问临界资源前会先请求该计数器。若计数器大于0,则表明队列中有空余位置,可以直接向队列
push
数据;若计数器等于0,则说明队列已满,不能继续push
数据,应该阻塞等待,直至计数器再次大于0,方可继续向队列push
数据。
void Enqueue(T &in) // 生产者用的接口{pthread_mutex_lock(&_mutex);while (IsFull()){// 生产线程去等待,是在临界区中休眠的!你现在还持有锁呢!!!// 1. pthread_cond_wait调用是: a. 让调用进程等待 b. 自动释放曾经持有的_mutex锁_product_wait_num++;pthread_cond_wait(&_product_cond, &_mutex);_product_wait_num--;}// 进行生产_block_queue.push(in);// 通知消费者来消费if (_consum_wait_num > 0){pthread_cond_signal(&_consum_cond);}pthread_mutex_unlock(&_mutex); // 其实解锁和唤醒条件顺序无所谓,先唤醒后那边等着,解锁后直接竞争// 如果先解锁,后唤醒:先解锁没任何效果,因为都在wait那里等,一唤醒就直接得到锁}
1.2回顾加深理解信号量
信号量是一种用于进程间通信和同步的机制。它本质上是一个计数器,用于衡量系统中的资源可用数量。通过信号量,可以实现对临界资源的访问控制,确保多个进程或线程能够安全地共享资源而不发生冲突。
在访问临界资源之前,程序可以通过申请信号量来获取对资源的访问权限。如果信号量的值大于0,表示资源可用,程序可以继续访问资源;如果信号量的值等于0,表示资源已被占用,程序需要等待,直到资源可用为止。
信号量并不仅仅是简单的计数器,它是通过原子操作实现的,确保信号量的操作是线程安全的。常用的信号量操作包括P操作(等待操作)和V操作(释放操作),也称为PV操作。P操作会将信号量的值减1,用于占用资源;V操作会将信号量的值加1,用于释放资源。
通过合理地使用信号量和PV操作,可以实现多线程或多进程之间的同步和互斥,避免资源竞争和死锁等并发问题。信号量是操作系统中重要的同步工具,广泛应用于进程间通信、资源管理、线程同步等场景。
system信号量和POSIX信号量都是用于进程间通信和同步的机制,但它们之间存在一些区别。
- 系统信号量:
- 系统信号量是Linux中的一种系统调用,用于进程间通信和同步。
- 系统信号量是以系统级资源的形式存在,可以跨越进程边界,不仅可以用于线程之间的同步,也可以用于进程之间的同步。
- 系统信号量是一个全局的计数器,可以通过系统调用函数来创建、初始化、P操作(等待操作)和V操作(释放操作)等。
- 系统信号量的操作是通过系统调用函数来实现的,如semget、semop等。
- POSIX信号量:
- POSIX信号量是基于POSIX标准的一种同步机制
- POSIX信号量与系统信号量类似,但是在接口和使用上有些许差异。
- POSIX信号量允许用于进程间通信和线程间同步。
- POSIX信号量通过调用相关的POSIX函数来创建、初始化、等待和释放,如sem_open、sem_wait、sem_post等。
系统信号量是Linux系统提供的一种进程间通信和同步机制,而POSIX信号量是基于POSIX标准的一种同步机制,二者都可以实现进程或线程间的同步和互斥操作
1.3信号量的操作接口
初始化信号量:
使用sem_init
函数可以初始化信号量,给定的value
值会成为信号量的初始值。如果信号量是线程间共享的,可以被多个线程同时使用;如果是进程间共享的,可以被多个进程使用
#include <semaphore.h>//下面的函数都这此头文件int sem_init(sem_t *sem, int pshared, unsigned int value);
sem
: 指向要初始化的信号量的指针(我们使用sem_t 类型直接定义)pshared
: 0 表示该信号量为线程间共享;非零值表示信号量为进程间共享value
: 信号量的初始值
- 若成功,返回值为0,表示初始化信号量成功。
- 若出现错误,返回值为-1,表示初始化失败,并设置errno来指示具体错误。(下面都是一样的)
销毁信号量:
使用sem_destroy
函数可以销毁之前初始化的信号量。在销毁信号量之前,要确保所有线程或进程都已经停止使用该信号量。
int sem_trywait(sem_t *sem);
sem
: 要销毁的信号量的指针
等待信号量:(P操作- -)
使用sem_wait
函数可以等待信号量,即执行P操作。如果信号量的值大于0,则将其减1并立即返回,否则线程(或进程)会阻塞等待信号量变为大于0。
int sem_wait(sem_t *sem);
sem
: 要等待的信号量的指针
发布信号量:(V操作++)
使用sem_post
函数可以发布(释放)信号量,即执行V操作。对信号量执行V操作会将其值加1,并唤醒可能正在等待该信号量的线程(或进程)。
int sem_post(sem_t *sem);
sem
: 要发布的信号量的指针
2.基于循环队列的生产消费模型
2.1循环队列
之前在阻塞队列里,我们不能实现出队列与入队列的同时进行。现在因为是循环队列我们使用了两个索引,而两个索引不同时可以同时进行出和入
当为空时或者满时,二者只能有一个开始执行。然后就不再相等了,也是能分开进行了
#pragma once#include <iostream>
#include <string>
#include <vector>
#include <semaphore.h>
#include <pthread.h>template <typename T>
class RingQueue
{
public:RingQueue(int cap) : _ringqueue(cap - 1), _cap(cap), _productor_index(0), _consumer_index(0) // vector初始化大小为cap个0{sem_init(&_room_sem, 0, _cap); // 这个是生产者的(能用的空间),一开始大小是整个空间的sem_init(&_data_sem, 0, 0); // 这个是消费者的(能用的数据),一开始是0pthread_mutex_init(&_productor_mutex, nullptr);pthread_mutex_init(&_consumer_mutex, nullptr); // 锁的初始化}// P+V保证了消费与生产的互斥与同步// 加锁和解锁保证了之间的互斥// 我们采取先预定资源,再竞争锁void Enqueue(const T &in) // 入队列{P(_room_sem); // p操作--Lock(_productor_mutex);// 到这里就说明一定有空间_ringqueue[_productor_index++] = in;_productor_index %= _cap; // 保证循环Unlock(_productor_mutex);V(_data_sem); // data++}void Pop(T *out) // 出队列 输出型参数{// 消费行为P(_data_sem);Lock(_consumer_mutex);*out = _ringqueue[_consumer_index++];_consumer_index %= _cap;Unlock(_consumer_mutex);V(_room_sem);}~RingQueue(){sem_destroy(&_room_sem);sem_destroy(&_data_sem); // 处理信号量pthread_mutex_destroy(&_productor_mutex);pthread_mutex_destroy(&_consumer_mutex);}private:void P(sem_t &sem) // 预定空间{sem_wait(&sem);}void V(sem_t &sem) // 还东西{sem_post(&sem);}void Lock(pthread_mutex_t &mutex){pthread_mutex_lock(&mutex);}void Unlock(pthread_mutex_t &mutex){pthread_mutex_unlock(&mutex);}private:std::vector<T> _ringqueue; // 底层是一个数组int _cap; // 容量上限int _productor_index;int _consumer_index; // 生产和消费的下标sem_t _room_sem; // 生产者关心sem_t _data_sem; // 消费者关心// 定义锁,维护多生产多消费之间的互斥关系pthread_mutex_t _productor_mutex;pthread_mutex_t _consumer_mutex;
};
2.2整个项目
- RingQueue.hpp:封装的循环队列
- Main.cc:程序的主体
- Thread.hpp:自己封装的Thread
- Task.hpp:任务类(这里只是一个function包装器)
Tash.hpp
#pragma once
#include <functional>
#include <iostream>using Task = std::function<void()>;void Test()
{std::cout << "This is the Test Funtion" << std::endl;
}
Thread.hpp
#ifndef __THREAD_HPP__
#define __THREAD_HPP__#include <iostream>
#include <string>
#include <unistd.h>
#include <functional>
#include <pthread.h>namespace ThreadModule
{template <typename T>using func_t = std::function<void(T &, std::string name)>;// typedef std::function<void(const T&)> func_t;template <typename T>class Thread{public:void Excute(){_func(_data, _threadname);}public:Thread(func_t<T> func, T &data, const std::string &name = "none-name"): _func(func), _data(data), _threadname(name), _stop(true){}static void *threadroutine(void *args) // 类成员函数,形参是有this指针的!!{Thread<T> *self = static_cast<Thread<T> *>(args);self->Excute();return nullptr;}bool Start(){int n = pthread_create(&_tid, nullptr, threadroutine, this);if (!n){_stop = false;return true;}else{return false;}}void Detach(){if (!_stop){pthread_detach(_tid);}}void Join(){if (!_stop){pthread_join(_tid, nullptr);}}std::string name(){return _threadname;}void Stop(){_stop = true;}~Thread() {}private:pthread_t _tid;std::string _threadname;T &_data; // 为了让所有的线程访问同一个全局变量func_t<T> _func;bool _stop;};
} // namespace ThreadModule#endif
Main.cc
#include "RingQueue.hpp"
#include "Thread.hpp"
#include "Task.hpp"
#include <string>
#include <vector>
#include <unistd.h>using namespace ThreadModule;
int a = 10;using ringqueue_t = RingQueue<Task>;void Consumer(ringqueue_t &rq, std::string name)
{while (true){Task t;rq.Pop(&t);std::cout << "Consumer :" << " NAME" << name << std::endl;t();sleep(2);}
}void Productor(ringqueue_t &rq, std::string name)
{int cnt = 1;srand(time(nullptr));while (true){rq.Enqueue(Test);std::cout << "Productor is : " << cnt << " NAME" << name << std::endl;// sleep(2);cnt++;}
}void InitComm(std::vector<Thread<ringqueue_t>> *threads, int num, ringqueue_t &rq, func_t<ringqueue_t> func, std::string prename)
{for (int i = 0; i < num; i++){std::string name = prename + "thread-00" + std::to_string(i + 1);threads->emplace_back(func, rq, name);}
}void InitConsumer(std::vector<Thread<ringqueue_t>> *threads, int num, ringqueue_t &rq)
{InitComm(threads, num, rq, Consumer, "Cons ");
}void InitProductor(std::vector<Thread<ringqueue_t>> *threads, int num, ringqueue_t &rq)
{InitComm(threads, num, rq, Productor, "Prod ");
}void StartAll(std::vector<Thread<ringqueue_t>> &threads)
{for (auto &thread : threads){std::cout << "start: " << thread.name() << std::endl;thread.Start();}
}void WaitAllThread(std::vector<Thread<ringqueue_t>> &threads)
{for (auto &thread : threads){thread.Join();}
}int main()
{ringqueue_t *rq = new ringqueue_t(5);std::vector<Thread<ringqueue_t>> threads;InitProductor(&threads, 1, *rq);InitConsumer(&threads, 1, *rq);StartAll(threads);WaitAllThread(threads);return 0;
}
3.线程池
可变参数的处理
<stdarg.h>
头文件中定义了一些宏,用于处理 C 语言中的可变参数函数
#define va_start(ap, param) ap = (va_list)¶m#define va_arg(ap, type) (*(type*)(ap++))#define va_end(ap) ap = NULL
va_list
:va_list
是一个类型,它用来声明一个变量,这个变量将被用来依次访问可变参数列表中的参数。va_start
:va_start
宏用于初始化va_list
变量。它接受两个参数:第一个参数是一个va_list
类型的变量,用来指向参数列表;第二个参数是最后一个确定的参数的后一个参数,即可变参数列表中已知参数的后一个参数。这样就能让va_list
从可变参数列表的第一个参数开始遍历。va_arg
:va_arg
宏用于先返回参数的值,再访问va_list
中的下一个参数。它接受两个参数:第一个参数是va_list
类型的变量;第二个参数是要获取的参数的类型。va_arg
的作用是逐个遍历可变参数列表,返回对应类型的参数值,并将va_list
向后移动到下一个参数。va_end
:va_end
宏用于清理va_list
变量,释放资源。一般来说,va_end
应该与对应的va_start
成对出现,用来正确终止可变参数的处理。va_copy
:va_copy
宏用于将一个va_list
类型的变量的值复制给另一个va_list
类型的变量,以便在后续代码中再次访问相同的可变参数列表。va_copy
函数的原型类似于va_copy(va_list dest, va_list src)
,通过将源va_list
复制给目标va_list
,使得目标va_list
在后续代码中可以重新访问相同的可变参数列表。
void Test(int num, ...)
{va_list arg;va_start(arg, num);while (num){int data = va_arg(arg, int);std::cout << "data: " << data << std::endl;num--;}va_end(arg); // arg = NULL
}int main()
{Test(3, 11, 22, 33);return 0;
}
__VA_ARGS__
是 C/C++ 中的预定义宏,用于表示宏定义中的可变参数部分。在宏定义中,如果我们希望定义一个参数个数不确定的宏,就可以使用__VA_ARGS__
来代表可变参数的部分。使用方法
在宏定义中,
__VA_ARGS__
常用于定义具有可变参数的宏#define LOG(format, ...) printf(format, __VA_ARGS__)
在上面的示例中,
LOG
宏定义了一个可变参数的输出日志功能。format
是格式化字符串,__VA_ARGS__
表示可变参数部分,当宏被调用时,实际参数会替换__VA_ARGS__
部分。工作原理
- 当宏被调用时,
__VA_ARGS__
会被替换为实际参数列表。- 编译器会将实际参数列表直接展开到宏定义中,作为可变参数的位置。
- 这样,就可以实现宏的可变参数功能。
使用
##
连接format
和__VA_ARGS__
,以确保在__VA_ARGS__
为空时,不会产生额外的逗号(一般都会加上)()
项目内容
- Log.hpp:
- 定义了日志输出的相关功能,包括日志级别的枚举
Level
、输出日志到文件的函数、获取时间字符串、打印日志消息等。- 定义了宏
LOG
,用于方便打印日志信息。
- Main.cc:
- 主程序文件,包含了
main
函数,创建了一个线程池ThreadPool
实例,并向线程池添加任务。- 在添加任务的过程中会记录日志信息。
- ThreadPool.hpp:
- 实现了线程池的功能,包括任务队列管理、线程的启动和停止、任务处理等。
- 包括了线程池的初始化、启动、等待、添加任务、停止等操作。
Task.hpp:定义了任务类
Task
,包含了任务的执行、结果转换为字符串等功能。Thread.hpp:定义了线程类
Thread
,包含了线程的执行函数、启动、分离、等待、停止等功能。
整体流程:在主程序中创建线程池并添加任务,线程池中的线程会从任务队列中获取任务并执行,执行过程中会记录日志信息。日志功能会将信息输出到屏幕或者保存到文件中,日志级别由枚举 Level
定义。
Log.hpp
#pragma once#include <string>
#include <cstdio>
#include <time.h> //time函数和localtime函数
#include <iostream>
#include <sys/types.h>
#include <unistd.h> //getpid
#include <pthread.h>
#include <stdarg.h>
#include <fstream>enum Level
{DEBUG = 0,INFO,WARNING,ERROR,FATAL // 从上到下,程度依次增大
};bool isSave = false; // 用来判断日志信息是否需要保存到文件中
std::string file_name = "log.txt";std::string LevelToString(int level)
{switch (level){case DEBUG:return "Debug";case INFO:return "Info";case WARNING:return "Warning";case ERROR:return "Error";case FATAL:return "Fatal";default:return "Unknown";}
}std::string GetTimeString()
{time_t curr_time = time(nullptr);struct tm *format_time = localtime(&curr_time); // format:格式if (format_time == nullptr)return "None";char time_buffer[1024];snprintf(time_buffer, sizeof(time_buffer), "%d-%d-%d %d:%d:%d", // snprintf 会确保在目标字符数组的末尾添加 null 结尾符 '\0',// 以确保生成的字符串是以 null 结尾的,保证了能当成char*format_time->tm_year + 1900,format_time->tm_mon + 1,format_time->tm_mday,format_time->tm_hour,format_time->tm_min,format_time->tm_sec); // 从上到下,年月日、时分秒return time_buffer; // 由于 std::string 类的构造函数支持接受以null结尾的C字符串指针作为参数(一般是直接char* 的不是char arr[])// 因此在返回时会隐式地将 time_buffer 转换为 std::string 对象
}pthread_mutex_t mutex = PTHREAD_MUTEX_INITIALIZER;void SaveToFile(std::string &filename, std::string &message)
{std::ofstream out(filename, std::ios_base::app); // 以追加方式打开if (!out.is_open()){return;}out << message;out.close();
}void LogMessage(std::string filename, int line, bool isSave, int level, const char *format, ...)
{std::string levelstr = LevelToString(level);std::string timestr = GetTimeString();pid_t log_id = getpid();va_list arg;va_start(arg, format);char buffer[1024];vsnprintf(buffer, sizeof(buffer), format, arg);va_end(arg); // 处理可变参数列表std::string message = "[" + timestr + "]" + "[" + levelstr + "]" +"[" + std::to_string(log_id) + "]" +"[" + filename + "]" + "[" + std::to_string(line) + "] " + buffer + "\n";pthread_mutex_lock(&mutex);if (!isSave) // 是false就向显示器上打印{std::cout << message << std::endl;}else // 向文件里保存{SaveToFile(file_name, message);}pthread_mutex_unlock(&mutex);
}#define LOG(level, format, ...) \do \{ \LogMessage(__FILE__, __LINE__, isSave, level, format, ##__VA_ARGS__); \} while (0)// 在调用LogMessage时,参数一定是具体的,就使用__VA_ARGS__
// \: 反斜杠符号用于将宏定义延续到下一行,使得宏定义可以跨多行书写#define EnableFile() \do \{ \gIsSave = true; \} while (0)
// 向文件输入
#define EnableScreen() \do \{ \gIsSave = false; \} while (0)// 向屏幕输入
time()
函数:time_t time(time_t *timer)
函数用于获取当前的系统时间(从1970年1月1日0时0分0秒至今的秒数),返回一个
time_t
类型的值。
- 如果参数
timer
不为NULL
,则函数也会将时间戳写入到timer
指向的变量中。- 可以使用
time(NULL)
来获取当前的时间戳。
localtime()
函数:struct tm *localtime(const time_t *timer)
函数将时间戳转换为本地时间,返回一个指向
struct tm
结构体的指针。
struct tm
结构体包含了年、月、日、时、分、秒等本地时间信息。localtime()
返回的是一个指向静态分配的结构体的指针tm
,因此在多线程环境下要小心使用。
vsnprintf
是 C 语言标准库函数中的一个函数,用于将格式化的字符串输出到缓冲区中,且不超过特定字符数。它与sprintf
函数相似,但vsnprintf
可以处理可变参数列表,因此适用于不确定参数个数的情况。函数原型
int vsnprintf(char *str, size_t size, const char *format, va_list ap);
str
:指向要输出的字符缓冲区的指针。size
:要输出的字符数的最大限制(包括终止的 null 字符\0
)。format
:格式化字符串,包含占位符%
和格式规格。ap
:va_list
类型的参数列表,用于提供格式化字符串中的替换值。返回值
- 如果成功:返回写入缓冲区的字符数(不包括终止的 null 字符
\0
)。- 如果缓冲区空间不足:返回应该写入的字符数(不包括终止的 null 字符
\0
),但不会写入缓冲区。注意事项
- 类似于
sprintf
,但能够处理可变参数列表。- 可以指定输出字符数的最大限制,避免缓冲区溢出。
- 输出的字符串会被自动截断,确保不会超出指定的大小。
- 返回值可以帮助检查输出是否成功。
Main.cc
#include "ThreadPool.hpp"
#include "Log.hpp"
#include <iostream>
#include "Task.hpp"
#include <memory> //智能指针的int main()
{EnableFile(); // 向文件里输入std::unique_ptr<ThreadPool<Task>> tp = std::make_unique<ThreadPool<Task>>(5); // C++14新特性tp->Init();tp->Start();int tasknum = 10;while (tasknum){int a = rand() % 10 + 1;usleep(1234);int b = rand() % 5 + 1;Task t(a, b);LOG(INFO, "main thread push task: %s", t.DebugToString().c_str());tp->Enqueue(t);sleep(1);tasknum--;}tp->Stop();tp->Wait();return 0;
}
C++14 标准中引入了
std::make_unique
函数,用于动态分配一个类型的对象,并返回一个std::unique_ptr
智能指针来管理这个对象的生命周期。以下是对std::make_unique
的详细讲解:特点:
std::make_unique
通过返回一个std::unique_ptr
来管理动态分配的对象,保证对象的所有权独立且唯一。std::make_unique
会在动态分配内存成功后,立即初始化对象并返回对其的std::unique_ptr
,确保异常安全性。std::make_unique
创建的对象绑定到智能指针中,避免出现内存泄漏或忘记释放内存等问题。使用方式:
#include <memory> std::unique_ptr<Type> ptr = std::make_unique<Type>(constructor parameters);
std::unique_ptr<Type>
:std::unique_ptr
是 C++ 中智能指针的一种,用于管理动态分配的对象。<Type>
表示该std::unique_ptr
指向的对象类型是Type
。这个智能指针将独占地拥有所指向的对象,保证资源在适当时候被释放。std::make_unique<Type>(constructor parameters)
:
std::make_unique
是一个 C++14 新引入的函数模板,用于动态分配内存并初始化对象。<Type>
表示需要创建的对象类型是Type
。- 在括号中的
constructor parameters
是传递给Type
类型对象构造函数的参数。std::make_unique
会在内存分配成功后立即初始化对象,并返回一个指向该对象的std::unique_ptr
,确保异常安全性和避免内存泄漏。
ThreadPool.hpp
// 我们这个线程库是一开始就有固定数量的线程,当来任务时就交给线程来执行
#pragma once
#include <iostream>
#include <vector>
#include <queue>
#include <pthread.h>
#include "Thread.hpp"
#include "Log.hpp"
#include "Task.hpp"using namespace ThreadModule;int defaultthreadnum = 5;template <typename T>
class ThreadPool
{
private:void LockQueue(){pthread_mutex_lock(&_mutex);}void UnlockQueue(){pthread_mutex_unlock(&_mutex);}void ThreadSleep(){pthread_cond_wait(&_cond, &_mutex);}void ThreadWakeup(){pthread_cond_signal(&_cond);}void ThreadWakeupAll(){pthread_cond_broadcast(&_cond);}public:void HandlerTask(std::string name) // 类的成员方法设计为另一个类的回调方法,这里处理任务{LOG(INFO, "%s is running...", name.c_str());while (true){LockQueue();while (_task_queue.empty() && _isrunning){_waitnum++; // 每次进来就说明要有线程等了ThreadSleep();_waitnum--;}// 到这里就说明有任务了// 如果线程池已经退出了 && 任务队列是空的if (_task_queue.empty() && !_isrunning){UnlockQueue();break;}// 如果线程池不退出 && 任务队列不是空的// 如果线程池已经退出 && 任务队列不是空的 --- 处理完所有的任务,然后在退出T t = _task_queue.front();_task_queue.pop(); // 已经吧任务拿出来了,在线程里LOG(DEBUG, "%s get a task", name.c_str());UnlockQueue();t(); // 进行处理任务,在锁外就行。我们在Task类里,已经重载了()了LOG(DEBUG, "%s handler a task, result is: %s", name.c_str(), t.ResultToString().c_str());}}ThreadPool(int threadnum = defaultthreadnum) : _threadnum(threadnum), _waitnum(0), _isrunning(false){pthread_mutex_init(&_mutex, nullptr);pthread_cond_init(&_cond, nullptr);LOG(INFO, "ThreadPool Construct()"); // 可变参数列表为空}void Init(){for (int i = 0; i < _threadnum; i++){std::string name = "thread-00" + std::to_string(i + 1);//_threads.emplace_back(test, name); // 问题,参数多个this指针,与fun_t 不符合,可以加static_threads.emplace_back(std::bind(&ThreadPool::HandlerTask, this, std::placeholders::_1), name); // 使用这样解决,还能穿一个对象过去LOG(INFO, "ThreadPool Init %s ", name.c_str());}_isrunning = true;}void Start(){for (auto &e : _threads){e.Start();} // 不能在start这后面进行isruning的更改,因为,在未更改前,新线程可能已经运行完Task函数,直接退出了// 在让线程跑之前,初始化就要做好}void Wait(){for (auto &e : _threads){e.Join();LOG(INFO, "%s is quit...", e.name().c_str());}}bool Enqueue(const T &t){bool ret = false;LockQueue();if (_isrunning){_task_queue.push(t);if (_waitnum > 0){ThreadWakeup();}LOG(DEBUG, "enqueue task success");ret = true;}UnlockQueue();return ret;}void Stop(){LockQueue();_isrunning = false;ThreadWakeupAll();UnlockQueue();}~ThreadPool(){pthread_mutex_destroy(&_mutex);pthread_cond_destroy(&_cond);}private:int _threadnum;std::vector<Thread> _threads; // 一个存的都是线程的vectorstd::queue<T> _task_queue; // 任务队列pthread_mutex_t _mutex; // 锁pthread_cond_t _cond; // 条件变量int _waitnum; // 等待的线程数量bool _isrunning; // 该
};
- 成员函数和私有函数:
LockQueue()
和UnlockQueue()
:用于对任务队列进行加锁和解锁操作。ThreadSleep()
、ThreadWakeup()
和ThreadWakeupAll()
:用于线程等待和唤醒的操作。HandlerTask(std::string name)
:任务处理函数,线程从任务队列中取出任务并执行。ThreadPool(int threadnum = defaultthreadnum)
:构造函数,初始化线程池。Init()
:初始化线程池,创建固定数量的线程并设置为可运行状态。Start()
:启动线程池中的所有线程。Wait()
:等待所有线程执行完毕。Enqueue(const T &t)
:向任务队列中添加任务。Stop()
:停止线程池中的所有线程。~ThreadPool()
:析构函数,销毁线程池对象,释放资源。
- 使用方式:
- 创建
ThreadPool
对象后,通过Init()
初始化线程池,然后调用Start()
启动线程池中的线程。- 使用
Enqueue()
往线程池中添加任务,任务将会被线程取出执行。- 调用
Stop()
停止线程池中的所有线程,最后在析构函数中释放资源。
_threads.emplace_back(std::bind(&ThreadPool::HandlerTask, this, std::placeholders::_1), name);
对于Thread的构造函数要一个fun_t的函数包装器,一个string,如果直接传HandlerTask,有一个this指针,我们使用
bind
把一个this对象绑定为第一个参数就行了(这是让另一个类运行本类成员函数的一个方法)
Thread.hpp
#ifndef __THREAD_HPP__
#define __THREAD_HPP__#include <iostream>
#include <string>
#include <unistd.h>
#include <functional>
#include <pthread.h>namespace ThreadModule
{using func_t = std::function<void(std::string)>;class Thread{public:void Excute(){_func(_threadname);}public:Thread(func_t func, const std::string &name = "none-name"): _func(func), _threadname(name), _stop(true){}static void *threadroutine(void *args) // 类成员函数,形参是有this指针的!!{Thread *self = static_cast<Thread *>(args);self->Excute();return nullptr;}bool Start(){int n = pthread_create(&_tid, nullptr, threadroutine, this);if (!n){_stop = false;return true;}else{return false;}}void Detach(){if (!_stop){pthread_detach(_tid);}}void Join(){if (!_stop){pthread_join(_tid, nullptr);}}std::string name(){return _threadname;}void Stop(){_stop = true;}~Thread() {}private:pthread_t _tid;std::string _threadname;func_t _func;bool _stop;};
} // namespace ThreadModule#endif
Task.hpp
#pragma once#include <iostream>
#include <string>
#include <functional>class Task
{
public:Task() {}Task(int a, int b) : _a(a), _b(b), _result(0){}void Excute(){_result = _a + _b;}std::string ResultToString(){return std::to_string(_a) + "+" + std::to_string(_b) + "=" + std::to_string(_result);}std::string DebugToString(){return std::to_string(_a) + "+" + std::to_string(_b) + "=?";}void operator()(){Excute();}private:int _a;int _b;int _result;
};
单例模式(线程安全的懒汉线程池)
// 我们这个线程库是一开始就有固定数量的线程,当来任务时就交给线程来执行
#pragma once
#include <iostream>
#include <vector>
#include <queue>
#include <pthread.h>
#include "Thread.hpp"
#include "Log.hpp"
#include "Task.hpp"using namespace ThreadModule;int defaultthreadnum = 5;template <typename T>
class ThreadPool
{
private:void LockQueue(){pthread_mutex_lock(&_mutex);}void UnlockQueue(){pthread_mutex_unlock(&_mutex);}void ThreadSleep(){pthread_cond_wait(&_cond, &_mutex);}void ThreadWakeup(){pthread_cond_signal(&_cond);}void ThreadWakeupAll(){pthread_cond_broadcast(&_cond);}void HandlerTask(std::string name) // 类的成员方法设计为另一个类的回调方法,这里处理任务{LOG(INFO, "%s is running...", name.c_str());while (true){LockQueue();while (_task_queue.empty() && _isrunning){_waitnum++; // 每次进来就说明要有线程等了ThreadSleep();_waitnum--;}// 到这里就说明有任务了// 如果线程池已经退出了 && 任务队列是空的if (_task_queue.empty() && !_isrunning){UnlockQueue();break;}// 如果线程池不退出 && 任务队列不是空的// 如果线程池已经退出 && 任务队列不是空的 --- 处理完所有的任务,然后在退出T t = _task_queue.front();_task_queue.pop(); // 已经吧任务拿出来了,在线程里LOG(DEBUG, "%s get a task", name.c_str());UnlockQueue();t(); // 进行处理任务,在锁外就行。我们在Task类里,已经重载了()了LOG(DEBUG, "%s handler a task, result is: %s", name.c_str(), t.ResultToString().c_str());}}// 构造函数是要私有的,让唯一的那个static成员变量来用ThreadPool(int threadnum = defaultthreadnum) : _threadnum(threadnum), _waitnum(0), _isrunning(false){pthread_mutex_init(&_mutex, nullptr);pthread_cond_init(&_cond, nullptr);LOG(INFO, "ThreadPool Construct()"); // 可变参数列表为空}void Init(){for (int i = 0; i < _threadnum; i++){std::string name = "thread-00" + std::to_string(i + 1);//_threads.emplace_back(test, name); // 问题,参数多个this指针,与fun_t 不符合,可以加static_threads.emplace_back(std::bind(&ThreadPool::HandlerTask, this, std::placeholders::_1), name); // 使用这样解决,还能穿一个对象过去LOG(INFO, "ThreadPool Init %s ", name.c_str());}_isrunning = true;}void Start(){for (auto &e : _threads){e.Start();} // 不能在start这后面进行isruning的更改,因为,在未更改前,新线程可能已经运行完Task函数,直接退出了// 在让线程跑之前,初始化就要做好}// 把赋值重载与拷贝构造删掉ThreadPool(const ThreadPool<T> &) = delete;ThreadPool<T> &operator=(const ThreadPool<T> &) = delete;public:static ThreadPool<T> *getInstance(){if (_instance == nullptr) // 在最外面判断:多个线程都能进去,一旦有一个竞争到了锁,不为空了。后面的线程之间不用进去了{pthread_mutex_lock(&_lock);if (_instance == nullptr){_instance = new ThreadPool<T>();_instance->Init();_instance->Start(); // 线程池的初始化与启动LOG(DEBUG, "创建线程池单例");pthread_mutex_unlock(&_lock);return _instance;}}else{LOG(DEBUG, "获取线程池单例");return _instance;}}void Wait(){for (auto &e : _threads){e.Join();LOG(INFO, "%s is quit...", e.name().c_str());}}bool Enqueue(const T &t){bool ret = false;LockQueue();if (_isrunning){_task_queue.push(t);if (_waitnum > 0){ThreadWakeup();}LOG(DEBUG, "enqueue task success");ret = true;}UnlockQueue();return ret;}void Stop(){LockQueue();_isrunning = false;ThreadWakeupAll();UnlockQueue();}~ThreadPool(){pthread_mutex_destroy(&_mutex);pthread_cond_destroy(&_cond);}private:int _threadnum;std::vector<Thread> _threads; // 一个存的都是线程的vectorstd::queue<T> _task_queue; // 任务队列pthread_mutex_t _mutex; // 锁pthread_cond_t _cond; // 条件变量int _waitnum; // 等待的线程数量bool _isrunning; // 该static ThreadPool<T> *_instance;static pthread_mutex_t _lock;
};
template <typename T>
ThreadPool<T> *ThreadPool<T>::_instance = nullptr;template <typename T>
pthread_mutex_t ThreadPool<T>::_lock = PTHREAD_MUTEX_INITIALIZER; // 类内定义,类外初始化
我们首先把一些类内使用的接口进行私有化
构造函数私有化,删除拷贝构造和赋值重载
在C++中,声明操作符重载函数时,可以省略参数的名字,只提供参数的类型。这是因为在操作符重载函数的声明中,参数的名字并不重要,重要的是参数的类型和数量以及函数的返回类型。为了简洁起见,有时候会省略参数的名字
定义出一个静态的该类的成员变量,和静态锁
要有一个初始化成员变量的函数,在这里是
getInstan()
在getInstan()函数里面我们使用了:双重检查锁定(Double-Checked Locking)机制
目的是在多线程环境下提高性能和减少竞争。这种机制的目标是尽量减少加锁的开销,只有在必要的时候才对共享资源加锁。
首先,通过
if (_instance == nullptr)
的外层判断,可以减少不必要的锁竞争。如果_instance
不为空,那么直接返回已经存在的实例,避免了不必要的加锁和解锁操作。在外层判断通过后,通过加锁的方式进入临界区,再次检查
_instance == nullptr
,是为了防止多个线程同时通过外层判断后,在竞争锁的过程中,其中一个线程创建了实例,后面的线程则不应该再创建实例。因此,内层的检查是为了保证并发情况下只有一个线程能创建实例。
4.STL、智能指针是否线程安全
STL(标准模板库)中的容器本身并不是线程安全的。STL的设计初衷是为了提供高性能和通用性,因此并没有在设计中添加线程安全的机制。在多线程环境下,如果多个线程并发地访问和修改同一个STL容器,可能会引发数据竞争和未定义的行为。
由于加锁机制会对性能造成影响,STL的设计者选择了不提供内置的线程安全机制。因此,如果需要在多线程环境下使用STL容器,调用者需要自行确保线程安全。这通常需要使用互斥锁或其他同步机制来保护对容器的访问,以避免竞态条件和数据竞争。
不同容器的线程安全性实现方式可能有所不同,例如哈希表可能采用锁分区(锁表)和锁桶(锁链)等方式来实现线程安全。因此,调用者在使用STL容器时需要注意不同容器的线程安全性差异,以及需要使用何种同步机制来确保线程安全。
智能指针在多线程环境下的线程安全性取决于具体类型。
-
unique_ptr:由于unique_ptr的特性是独占所有权,只能在一个地方拥有指针,因此在其生命周期内不会涉及线程安全问题。在单线程或者多线程环境下,unique_ptr都不需要额外的同步机制来确保线程安全。
-
shared_ptr:shared_ptr允许多个智能指针共享同一个对象,并使用引用计数技术来管理对象的生命周期。在多线程环境下,如果多个线程同时对shared_ptr进行拷贝或销毁操作,就会涉及到引用计数的增加和减少,从而可能导致线程安全问题。然而,标准库的实现通常会使用原子操作(比如CAS)来保证对引用计数的操作是线程安全的,确保shared_ptr在多线程环境下能够正常高效地工作。
原子操作之所以能够保证线程安全,主要是因为它们提供了操作的原子性和可靠性:
- 原子性:原子操作是不可分割的操作,要么完全执行,要么完全不执行,不会被中断。这样可以确保多线程环境下对共享变量的操作是原子的,避免了数据不一致的问题。
- 可靠性:原子操作的执行结果对其他线程是立即可见的,其他线程能够立即看到更新后的值,避免了缓存不一致导致的问题。
5.其他常见的各种锁
悲观锁和乐观锁是两种并发控制的策略,而自旋锁、公平锁和非公平锁则属于具体实现并发控制的方式。
-
悲观锁(Pessimistic Locking):
- 在每次对共享资源进行操作时都持有锁,认为其他线程会修改数据,因此在操作之前先加锁。
- 主要用于保证并发环境下数据的一致性和可靠性。
- 常见的悲观锁实现包括:互斥锁、读写锁等。
-
乐观锁(Optimistic Locking):
-
在操作共享资源时假设并发冲突的概率不高,因此不立即加锁,而是在更新时检查是否有其他线程修改过数据。
-
乐观锁通常会使用版本号机制或CAS操作(Compare and Swap)来确保数据的一致性。\
- CAS是一种乐观锁的实现方式,在更新数据时,会比较当前内存值和之前读取的值是否相等,如果相等说明数据未被修改,就可以进行更新操作,否则会失败。
- CAS是一种原子操作,通常是一个自旋过程,即不断重试直到CAS成功或者达到重试次数。
-
乐观锁避免了频繁加锁解锁的开销,适合读多写少的场景。
-
-
自旋锁(Spin Lock):
- 自旋锁是一种基于忙等待的锁,当线程尝试获取锁时如果锁已经被其他线程占用了,该线程会处于忙等待状态,直到锁被释放。
- 自旋锁适用于短暂持有锁的情况,长时间持有锁会造成CPU资源的浪费。
-
公平锁与非公平锁:
- 公平锁指的是对锁的获取按照请求的顺序进行,保证每个线程都有机会获取锁,即先到先得。
- 非公平锁则允许锁的获取不按照请求顺序,有可能后到的线程会在先前请求而未获得锁的线程之前获取锁。
- 非公平锁可以提高整体吞吐量,但可能导致优先级反转等问题。
自旋锁
自旋锁是一种基于忙等待的锁,当一个线程尝试获取自旋锁时,如果锁已经被其他线程占用,该线程会进行自旋操作,即不断检查锁的状态是否被释放,而不是立即被挂起等待。这种方式可以减少线程上下文切换的性能开销,适用于临界区内操作时间短暂的情况。
如何衡量临界区内操作时间:
- 统计分析:通过在临界区内添加时间戳或者计时器,可以统计每个线程在临界区内的实际操作时间。这样可以得出平均操作时间、最大操作时间等数据。
- 经验估计:根据对应用程序的了解和经验,估计临界区内操作的典型执行时间。这种方法可能不够精确,但可以作为初步评估。
- 实际观察:观察程序的实际运行情况,包括临界区内操作的执行时间和频率。根据观察结果来评估操作的时间。
还是看我们的经验来选择合适,恰当的锁
- 初始化自旋锁:
void spin_lock_init(spinlock_t *lock)
这个函数用于初始化一个自旋锁,通常在使用自旋锁之前调用。lock
为指向自旋锁变量的指针。
- 获得自旋锁:
void spin_lock(spinlock_t *lock);
当一个线程想要进入临界区时,它会调用这个函数来获取自旋锁。如果自旋锁已经被其他线程占用,当前线程会尝试不断地自旋等待,直到获取到锁。lock
为指向自旋锁变量的指针。
- 释放自旋锁:
void spin_unlock(spinlock_t *lock);
当线程执行完临界区内的操作后,需要调用这个函数来释放自旋锁,使得其他线程可以获取到锁。lock
为指向自旋锁变量的指针。
- 销毁自旋锁:
void spin_lock_destroy(spinlock_t *lock);
当自旋锁不再需要时,可以调用这个函数来销毁自旋锁以释放相关资源。lock
为指向自旋锁变量的指针。
读者写者问题
在多线程编程中,有时候会遇到一种常见的情况,即某些共享数据的修改操作相对较少,而读取操作却非常频繁,且读取操作中可能会伴随着耗时较长的查找操作。在这种情况下,如果对整个数据结构进行加锁,那么即使是读取操作也需要等待锁的释放,这会导致程序效率降低。
为了解决这种情况,可以使用读写锁。读写锁允许多个线程同时获取读锁,只有在获取写锁时才会阻塞其他线程。这样一来,在多读少写的情况下,多个线程可以同时获得读锁,从而提高了程序的并发性能,避免了不必要的阻塞。
总结一下,读写锁适用于多读少写的场景,可以通过允许多个线程同时获取读锁来提高程序的并发性能,避免不必要的阻塞,从而提高了程序的效率。
读者写者模型是用于描述多线程对共享数据进行读写操作时的一种经典并发模型。在读者写者模型中,有两类线程:读者和写者。读者线程只对共享数据进行读操作,而写者线程则对共享数据进行写操作。读者在读操作时不会互斥,多个读者可以同时访问共享数据(不会对数据进行修改),但写者在写操作时需要互斥,同时只允许一个写者访问共享数据且不允许其他任何读者或写者访问。
读者写者模型的目标是实现对共享数据的高效访问,保证数据的一致性和并发性。为了实现这一目标,通常会使用锁和条件变量等同步机制来控制读者和写者线程的访问。
1个交易场所
2个角色:读者与写者
3种关系:写者之间的互斥、读者之间没有关系、读者与写者之间的互斥与同步
读者和写者之间保持互斥与同步意味着在读者写者模型中,确保读者和写者之间的操作互斥(不能同时访问共享数据)并且同步(按照一定规则进行访问)。具体来说:
互斥(Mutual Exclusion):读者写者模型要求在写者对共享数据进行操作时,必须排他性地拥有对该数据的访问权,即其他任何读者或写者都不可以同时访问共享数据。这样做是为了避免数据一致性问题和争用条件(Race Condition)的发生,确保在写操作时数据不会同时被其他线程读或写。
同步(Synchronization):读者写者模型还要求在读者和写者之间进行协调,保证数据的访问顺序和一致性。通常情况下,写者优先的规则要求在写者请求访问共享数据时,必须等待所有正在读取数据的读者完成操作后才能进行写入;而在有写者等待访问共享数据时,所有新的读者请求必须等待,直到写者完成操作。这种同步行为保证了数据的一致性和安全性。
逻辑过程
int reader_count = 0;
pthread_mutex_t wlock;
pthread_mutex_t rlock;// 读者线程
void reader() {lock(&rlock); // 获取读者锁if (reader_count == 0) {lock(&wlock); // 如果当前没有读者,则获取写者锁}++reader_count; // 增加读者计数unlock(&rlock); // 释放读者锁// 这里进行读取操作lock(&rlock); // 重新获取读者锁--reader_count; // 减少读者计数if (reader_count == 0) {unlock(&wlock); // 如果已经没有读者,释放写者锁}unlock(&rlock); // 释放读者锁
}// 写者线程
void writer() {lock(&wlock); // 获取写者锁// 这里进行写入操作unlock(&wlock); // 释放写者锁
}
在上述伪代码中,我们模拟了读者写者模型的加锁逻辑,主要包括了对读者和写者线程进行互斥和同步控制。下面我们简要解释一下这段伪代码的逻辑:
reader_count
表示当前正在读取数据的读者数量。pthread_mutex_t wlock
和pthread_mutex_t rlock
分别表示写者锁和读者锁,用于读者写者线程的互斥操作。
对于读者线程:
-
首先获取读者锁
rlock
,确保读者线程之间的互斥。 -
如果当前没有其他读者在读取数据,则获取写者锁
wlock
,确保写者无法进入。- 申请成功:就接着行下进行
- 申请失败:说明写者正在写,那就阻塞等着
-
增加
reader_count
计数器,表明有一个读者正在读取数据。 -
释放读者锁,允许其他读者进入读取数据。
-
进行读取操作。
当没有读者在读时,我们就会释放写者锁
对于写者线程:
- 获取写者锁
wlock
,确保写者线程独占对共享数据的访问。 - 进行写操作。
- 释放写者锁,允许其他写者或读者访问数据。
接口介绍
-
pthread_rwlock_init
:初始化读写锁。函数原型为
int pthread_rwlock_init(pthread_rwlock_t *rwlock, const pthread_rwlockattr_t *attr)
,该函数用于初始化一个读写锁对象rwlock
,可以指定属性attr
,一般情况下可以传入NULL
使用默认属性。- 参数:
rwlock
:指向读写锁对象的指针。attr
:读写锁的属性对象指针,可以为NULL
,表示使用默认属性。
- 返回值:如果函数调用成功,返回值为 0;否则返回一个非零的错误码。
- 说明:该函数用于初始化一个读写锁对象,可以指定一些属性,如锁的类型、优先级规则等。
- 参数:
-
pthread_rwlock_destroy
:销毁读写锁。函数原型为
int pthread_rwlock_destroy(pthread_rwlock_t *rwlock)
,用于销毁已经初始化的读写锁对象rwlock
。销毁读写锁后,该读写锁对象不可再使用,需要重新进行初始化。- 参数:
rwlock
:指向读写锁对象的指针。
- 返回值:如果函数调用成功,返回值为 0;否则返回一个非零的错误码。
- 说明:该函数用于销毁已经初始化的读写锁对象,释放相关资源。
- 参数:
-
pthread_rwlock_rdlock
:获取读锁。函数原型为
int pthread_rwlock_rdlock(pthread_rwlock_t *rwlock)
,该函数用于获取读锁,即允许多个线程同时获取读取权限,但在写锁被获取时将会阻塞。当读线程数较多时,考虑性能可以使用读锁。- 参数:
rwlock
:指向读写锁对象的指针。
- 返回值:如果函数调用成功,返回值为 0;否则返回一个非零的错误码。
- 说明:该函数用于获取读锁,允许多个线程同时获取读取权限,但在写锁被获取时将会阻塞。
- 参数:
-
pthread_rwlock_wrlock
:获取写锁。函数原型为
int pthread_rwlock_wrlock(pthread_rwlock_t *rwlock)
,该函数用于获取写锁,即独占地写入数据。一旦有线程获取了写锁,其他线程无法获取读锁或写锁,只能等待写锁的释放。- 参数:
rwlock
:指向读写锁对象的指针。
- 返回值:如果函数调用成功,返回值为 0;否则返回一个非零的错误码。
- 说明:该函数用于获取写锁,独占地写入数据。一旦有线程获取了写锁,其他线程无法获取读锁或写锁,只能等待写锁的释放。
- 参数:
-
pthread_rwlock_unlock
:释放锁。函数原型为
int pthread_rwlock_unlock(pthread_rwlock_t *rwlock)
,用于释放读锁或写锁,让其他线程可以获取读写锁。- 参数:
rwlock
:指向读写锁对象的指针。
- 返回值:如果函数调用成功,返回值为 0;否则返回一个非零的错误码。
- 说明:该函数用于释放读锁或写锁,让其他线程可以获取读写锁,从而读取或写入共享数据。
- 参数:
我们对于读者里面的加锁就直接使用pthread_rwlock_rdlock,相当于上面的全部过程了
同理:对于写者里面的加锁就直接使用pthread_rwlock_wrlock