一. 线程互斥
1.1 线程互斥相关概念
- 临界资源:多线程执行流共享的资源就叫做临界资源。
- 临界区:每个线程内部,访问临界资源的代码,就叫做临界区。
- 互斥:任何时刻,互斥保证有且只有一个执行流进入临界区,访问临界资源,通常对临界资源起保护作用。
- 原子性:不会被任何调度机制打断的操作,该操作只有两态,要么完成,要么未完成。
1.2 多线程的互斥问题
大部分情况,线程使用的数据都是局部变量,变量的地址空间在线程栈空间内,这种情况,变量归属单个线程,其他线程无法获得这种变量。但有时候,很多变量都需要在线程间共享,这样的变量称为共享变量,可以通过数据的共享,完成线程之间的交互。多个线程并发的操作共享变量,会带来一些问题。
#include <cstdio>
#include <cstdint>
#include <pthread.h>
#include <unistd.h>// 共享资源
int tickets = 10;// 线程函数
void* sell_ticket(void* arg) {int sold = 0;while (true) {// 模拟售票过程if (tickets > 0) {printf("Thread %ld sold ticket %d, remaining tickets: %d\n", (long)arg, sold, tickets);tickets--;sold++;usleep(1000);}}pthread_exit(NULL);
}int main() {pthread_t thread[5];// 创建售票线程for(uint64_t i = 0; i < 5; i++){pthread_create(&thread[i], NULL, sell_ticket, (void *)i);}// 等待线程结束for(uint64_t i = 0; i < 5; i++){pthread_join(thread[i], NULL);}printf("All tickets sold out.\n");return 0;
}
为什么可能无法获得正确结果?
- –ticket操作本身就不是一个原子操作
- if语句判断条件为真以后,代码可以并发的切换到其他线程
要解决该问题, 需要做到以下三点:
- 代码必须要有互斥行为:当代码进入临界区执行时,不允许其他线程进入该临界区。
- 如果多个线程同时要求执行临界区的代码,并且临界区没有线程在执行,那么只能允许一个线程进入该临界区。
- 如果线程不在临界区中执行,那么该线程不能阻止其他线程进入临界区。
本质上就是需要一把锁, 而Linux上提供的是互斥量!
1.3 互斥量
1.3.1 初始化互斥量
静态分配:
pthread_mutex_t mutex = PTHREAD_MUTEX_INITIALIZER;
动态分配:
int pthread_mutex_init(pthread_mutex_t *restrict mutex, const pthread_mutexattr_t *restrict attr);
参数说明:
-
pthread_mutex_t *restrict mutex:指向要初始化的互斥锁的指针。
-
const pthread_mutexattr_t *restrict attr:指向互斥锁属性对象的指针。这个属性对象可以用来设置互斥锁的一些特殊属性,如类型、协议和优先级继承等。如果不需要特殊的属性设置,可以传入 NULL,使用默认属性。
返回值:
- 0:表示成功。
- 非零值:表示失败。常见的错误码包括:
EINVAL:attr 指定的属性无效。
ENOMEM:内存不足,无法初始化互斥锁。
1.3.2 销毁互斥量
int pthread_mutex_destroy(pthread_mutex_t *mutex);
- 参数说明:
- pthread_mutex_t *mutex:指向要销毁的互斥锁的指针。
- 返回值:
- 0:表示成功。
- 非零值:表示失败。常见的错误码包括:
EINVAL:指定的互斥锁无效。
EBUSY:互斥锁当前被锁定,无法销毁。
1.3.3 互斥量加锁和解锁
加锁:
int pthread_mutex_lock(pthread_mutex_t *mutex);
- 参数说明:
- pthread_mutex_t *mutex:指向要锁定的互斥锁的指针。
- 返回值:
- 0:表示成功。
- 非零值:表示失败。常见的错误码包括:
EINVAL:指定的互斥锁无效。
EDEADLK:当前线程已经锁定了该互斥锁,导致死锁。
解锁:
int pthread_mutex_unlock(pthread_mutex_t *mutex);
- 参数说明:
- pthread_mutex_t *mutex:指向要解锁的互斥锁的指针。
- 返回值:
- 0:表示成功。
- 非零值:表示失败。常见的错误码包括:
EINVAL:指定的互斥锁无效。
EPERM:当前线程没有锁定该互斥锁。
1.3.4 线程安全的售票系统
#include <cstdio>
#include <cstdint>
#include <pthread.h>
#include <unistd.h>// 共享资源
int tickets = 10;
pthread_mutex_t mutex;// 线程函数
void* sell_ticket(void* arg) {int sold = 0;while (true) {pthread_mutex_lock(&mutex); // 锁定互斥锁if (tickets > 0) {tickets--;sold++;printf("Thread %ld sold ticket %d, remaining tickets: %d\n", (long)arg, sold, tickets);} else {pthread_mutex_unlock(&mutex); // 解锁互斥锁break;}pthread_mutex_unlock(&mutex); // 解锁互斥锁usleep(1000); // 模拟耗时操作}pthread_exit(NULL);
}int main() {pthread_t thread[5];// 初始化互斥锁pthread_mutex_init(&mutex, NULL);// 创建售票线程for(uint64_t i = 0; i < 5; i++) {pthread_create(&thread[i], NULL, sell_ticket, (void *)i);}// 等待线程结束for(uint64_t i = 0; i < 5; i++) {pthread_join(thread[i], NULL);}// 销毁互斥锁pthread_mutex_destroy(&mutex);printf("All tickets sold out.\n");return 0;
}
1.3.4 互斥量的实现原理
为了实现互斥锁操作,大多数体系结构都提供了swap或exchange指令, 该指令的作用是把寄存器和内存单元的数据相交换, 由于只有一条指令, 保证了原子性, 即使是多处理器平台, 访问内存的总线周期也有先后, 一个处理器上的交换指令执行时另一个处理器的交换指令只能等待总线周期。
1.4 线程安全和可重入函数
-
线程安全:多个线程并发同一段代码时,不会出现不同的结果。常见对全局变量或者静态变量进行操作,并且没有锁保护的情况下,会出现该问题。
-
重入:同一个函数被不同的执行流调用,当前一个流程还没有执行完,就有其他的执行流再次进入,我们称之为重入。一个函数在重入的情况下,运行结果不会出现任何不同或者任何问题,则该函数被称为可重入函数,否则,是不可重入函数。
1.4.1 可重入与线程安全的联系
- 函数是可重入的,那就是线程安全的。
- 函数是不可重入的,那就不能由多个线程使用,有可能引发线程安全问题。
- 如果一个函数中有全局变量,那么这个函数既不是线程安全也不是可重入的。
1.4.2 可重入与线程安全的区别
- 线程安全不一定是可重入的,而可重入函数则一定是线程安全的。
- 如果将对临界资源的访问加上锁,则这个函数是线程安全的,但如果这个重入函数若锁还未释放则会产生死锁,因此是不可重入的。
二. 线程同步
2.1 线程同步相关概念
- 同步:在保证数据安全的前提下,让线程能够按照某种特定的顺序访问临界资源,从而有效避免饥饿问题。
- 竞态条件:因为时序问题,而导致程序异常,我们称之为竞态条件。
2.2 条件变量
2.2.1 初始化条件变量
int pthread_cond_init(pthread_cond_t *restrict cond, const pthread_condattr_t *restrict attr);
- 参数说明:
- pthread_cond_t *restrict cond:指向要初始化的条件变量的指针。restrict 关键字表示该指针是唯一的,不会与其他指针别名绑定,这有助于编译器优化。
- const pthread_condattr_t *restrict attr:指向条件变量属性对象的指针。这个属性对象可以用来设置条件变量的一些特殊属性,如时钟选择等。如果不需要特殊的属性设置,可以传入 NULL,使用默认属性。
- 返回值:
- 0:表示成功。
- 非零值:表示失败。常见的错误码包括:
EINVAL:指定的条件变量或属性无效。
ENOMEM:内存不足,无法初始化条件变量。
2.2.2 销毁条件变量
int pthread_condattr_destroy(pthread_condattr_t *attr);
- 参数说明:
- pthread_condattr_t *attr:指向要销毁的条件变量属性对象的指针。
- 返回值:
- 0:表示成功。
- 非零值:表示失败。常见的错误码包括:
EINVAL:指定的属性对象无效。
2.2.3 等待条件变量
int pthread_cond_wait(pthread_cond_t *restrict cond, pthread_mutex_t *restrict mutex);
- 参数说明
- pthread_cond_t *restrict cond:指向要等待的条件变量的指针。restrict 关键字表示该指针是唯一的,不会与其他指针别名绑定,这有助于编译器优化。
- pthread_mutex_t *restrict mutex:指向与条件变量关联的互斥锁的指针。这个互斥锁必须在调用 pthread_cond_wait() 之前已经被当前线程锁定。
- 返回值
- 0:表示成功。
- 非零值:表示失败。常见的错误码包括:
EINVAL:指定的条件变量或互斥锁无效。
EPERM:当前线程没有锁定指定的互斥锁。
2.2.4 唤醒条件变量
单个唤醒:
int pthread_cond_signal(pthread_cond_t *cond);
- 参数说明:
- pthread_cond_t *cond:指向要发送信号的条件变量的指针。
- 返回值:
- 0:表示成功。
- 非零值:表示失败。常见的错误码包括:
EINVAL:指定的条件变量无效。
广播唤醒:
int pthread_cond_broadcast(pthread_cond_t *cond);
- 参数说明:
- pthread_cond_t *cond:指向要发送广播信号的条件变量的指针。
- 返回值
- 0:表示成功。
- 非零值:表示失败。常见的错误码包括:
EINVAL:指定的条件变量无效。
三. 生产者消费者模型
3.1 生产者消费者模型的概念
生产者消费者模式就是通过一个容器来解决生产者和消费者的强耦合问题。生产者和消费者彼此之间不直接通讯,而通过阻塞队列来进行通讯,所以生产者生产完数据之后不用等待消费者处理,直接扔给阻塞队列,消费者不找生产者要数据,而是直接从阻塞队列里取,阻塞队列就相当于一个缓冲区,平衡了生产者和消费者的处理能力。这个阻塞队列就是用来给生产者和消费者解耦的。
3.2 生产者消费者模型优点
- 解耦生产者和消费者
- 支持并发运行
- 支持忙闲不均
3.3 基于BlockingQueue的生产者消费者模型
- 在多线程编程中阻塞队列(Blocking Queue)是一种常用于实现生产者和消费者模型的数据结构:
- 当队列为空时,从队列获取元素的操作将会被阻塞,直到队列中被放入了元素。
- 当队列满时,往队列里存放元素的操作也会被阻塞,直到有元素被从队列中取出。
#include <iostream>
#include <pthread.h>
#include <vector>
#include <unistd.h>
#include <cstdlib>
#include <ctime>const int producer_size = 2;
const int consume_size = 2;
const int queue_size = 8;
const int task_size = 6;
const int push_rand_time = 100000;
const int push_stand_time = 100000;
const int pop_rand_time = 100000;
const int pop_stand_time = 100000;class BlockQueue {
private:std::vector<int> queue;int capacity;pthread_mutex_t mutex;pthread_cond_t not_empty;pthread_cond_t not_full;public:BlockQueue(int cap) : capacity(cap), queue(0) {pthread_mutex_init(&mutex, NULL);pthread_cond_init(¬_empty, NULL);pthread_cond_init(¬_full, NULL);}~BlockQueue() {pthread_mutex_destroy(&mutex);pthread_cond_destroy(¬_empty);pthread_cond_destroy(¬_full);}void push(int item) {pthread_mutex_lock(&mutex);while (queue.size() == capacity) {pthread_cond_wait(¬_full, &mutex);}queue.push_back(item);std::cout << "Producer " << pthread_self() % INT16_MAX << " push data: " << item << std::endl;usleep(rand() % push_rand_time + push_stand_time); // 模拟随机耗时操作if (queue.size() == capacity) {pthread_cond_broadcast(¬_empty); // 队列满时,唤醒所有消费者}else{pthread_cond_signal(¬_empty); // 只有一个元素时,唤醒一个消费者}pthread_mutex_unlock(&mutex);}int pop() {pthread_mutex_lock(&mutex);while (queue.empty()) {pthread_cond_wait(¬_empty, &mutex);}int item = queue.front();queue.erase(queue.begin());std::cout << "Consumer " << pthread_self() % INT16_MAX << " get data: " << item << std::endl;usleep(rand() % pop_rand_time + pop_stand_time); // 模拟随机耗时操作if (queue.size() == 0) {pthread_cond_broadcast(¬_full); // 队列空时,唤醒所有生产者}else {pthread_cond_signal(¬_full); // 队列接近满时,唤醒一个生产者}pthread_mutex_unlock(&mutex);return item;}
};void* producer(void* arg) {BlockQueue* queue = static_cast<BlockQueue*>(arg);for (int i = 0; i < task_size; ++i) {queue->push(i);}pthread_exit(NULL);
}void* consumer(void* arg) {BlockQueue* queue = static_cast<BlockQueue*>(arg);for (int i = 0; i < task_size; ++i) {int item = queue->pop();}pthread_exit(NULL);
}int main() {srand(time(NULL)); // 初始化随机数生成器pthread_t producer_thread[producer_size], consumer_thread[consume_size];BlockQueue queue(queue_size);// 创建生产者和消费者线程for (int i = 0; i < producer_size; i++) {pthread_create(&producer_thread[i], NULL, producer, &queue);}for (int i = 0; i < consume_size; i++) {pthread_create(&consumer_thread[i], NULL, consumer, &queue);}// 等待线程结束for (int i = 0; i < producer_size; i++) {pthread_join(producer_thread[i], NULL);}for (int i = 0; i < consume_size; i++) {pthread_join(consumer_thread[i], NULL);}std::cout << "All tasks completed." << std::endl;return 0;
}
————————————————————
感谢大家观看,不妨点赞支持一下吧喵~
如有错误,随时纠正喵~