【Java并发编程】信号量Semaphore详解

一、简介

Semaphore(信号量):是用来控制同时访问特定资源的线程数量,它通过协调各个线程,以保证合理的使用公共资源。

Semaphore 一般用于流量的控制,特别是公共资源有限的应用场景。例如数据库的连接,假设数据库的连接数上线为10个,多个线程并发操作数据库可以使用Semaphore来控制并发操作数据库的线程个数最多为10个。

Semaphore 是一个有效的流量控制工具,它基于 AQS 共享锁实现。我们常常用它来控制对有限资源的访问。

  • 每次使用资源前,先申请一个信号量,如果资源数不够,就会阻塞等待;
  • 每次释放资源后,就释放一个信号量。

二、源码

2.1 类总览

通过上面的类图可以看到,SemaphoreReentrantLock 的内部类的结构相同,类内部总共存在 SyncNonfairSyncFairSync 三个类, NonfairSync 与 FairSync 类继承自 Sync 类,其只有一个 tryAcquireShared() 方法,重写了AQS的该方法。Sync 类继承自 AbstractQueuedSynchronizer 抽象类。

CountDownLatch 类似,Semaphore 主要是通过 AQS 的共享锁机制实现的,因此它的核心属性只有一个 Sync。总体源码如下:

public class Semaphore implements java.io.Serializable {//序列化版本号private static final long serialVersionUID = -3222578661600680210L;//同步队列private final Sync sync;//构造方法//指定许可数,默认为非公平策略public Semaphore(int permits) {sync = new NonfairSync(permits);}//指定许可数和是否公平策略public Semaphore(int permits, boolean fair) {sync = fair ? new FairSync(permits) : new NonfairSync(permits);}//Semaphore提供了acquire方法来获取一个许可,会阻塞线程(有重载方法,可以指定获取许可的个数)public void acquire() throws InterruptedException {sync.acquireSharedInterruptibly(1); //调用AQS的acquireSharedInterruptibly方法, 即共享式获取响应中断}//tryAcquire的意思是尝试获取许可,如果获取成功返回true,否则返回false,不会阻塞线程,而且不响应中断public boolean tryAcquire() {return sync.nonfairTryAcquireShared(1) >= 0;}//Semaphore提供release来释放许可public void release() {sync.releaseShared(1); //调用AQS的releaseShared方法,即释放共享式同步状态}abstract static class Sync extends AbstractQueuedSynchronizer {private static final long serialVersionUID = 1192457210091910933L;Sync(int permits) {setState(permits);}//获取许可数目    final int getPermits() {return getState();}//共享模式下非公平策略获取//本质就是一个自旋方法,通过自旋+CAS来保证修改许可值的线程安全性,该方法返回的情况有如下两种情况://   信号量不够,直接返回,返回值为负数,表示获取失败;//   信号量足够,且CAS操作成功,返回值为剩余许可值,获取成功。final int nonfairTryAcquireShared(int acquires) {for (;;) { //自旋int available = getState(); //获取可用许可值int remaining = available - acquires; //计算剩余的许可值//如果剩余许可值小于0,说明许可不够用了,直接返回,否则CAS更新许可值,更新成功返回,否则继续自旋if (remaining < 0 ||compareAndSetState(available, remaining))return remaining;}}//共享模式下进行释放//该方法也是一个自旋方法,通过自旋+CAS原子性地修改许可值protected final boolean tryReleaseShared(int releases) {for (;;) { //自旋int current = getState(); //获取许可值int next = current + releases; //计算释放后的许可值if (next < current) // overflowthrow new Error("Maximum permit count exceeded");if (compareAndSetState(current, next)) //CAS修改许可值,成功则返回,失败则继续自旋return true;}}//根据指定的缩减量减小可用许可的数目final void reducePermits(int reductions) {for (;;) {int current = getState();int next = current - reductions;if (next > current) // underflowthrow new Error("Permit count underflow");if (compareAndSetState(current, next))return;}}//获取并返回立即可用的所有许可数目final int drainPermits() {for (;;) {int current = getState();if (current == 0 || compareAndSetState(current, 0))return current;}}}//采用非公平策略获取资源static final class NonfairSync extends Sync {private static final long serialVersionUID = -2694183684443567898L;NonfairSync(int permits) {super(permits);}//获取许可protected int tryAcquireShared(int acquires) {return nonfairTryAcquireShared(acquires); //共享模式下非公平策略获取}}//采用公平策略获取资源static final class FairSync extends Sync {private static final long serialVersionUID = 2014338818796000944L;FairSync(int permits) {super(permits);}//获取许可protected int tryAcquireShared(int acquires) {for (;;) {//获取共享锁之前,先调用hasQueuedPredecessors方法来判断队列中是否存在其他正在排队的节点,// 如果是返回true,否则为false。因此当存在其他正在排队的节点,当前节点就无法获取许可,只能排队等待,这也是公平策略的体现。if (hasQueuedPredecessors())return -1;int available = getState();int remaining = available - acquires;if (remaining < 0 ||compareAndSetState(available, remaining))return remaining;}}}
}

2.2 核心方法

获取信号量的方法总共有四个:

释放信号量的方法有两个:

获取信号量四个方法中后面三个方法原理同 acquire() ,我们这里来分析一下 acquire() 和 release() 方法。

2.2.1 acquire() 方法

获取许可,会阻塞线程,响应中断。

// Semaphore
public void acquire() throws InterruptedException {sync.acquireSharedInterruptibly(1);
}

内部调用的是 AQSacquireSharedInterruptibly() 方法, 即共享式获取响应中断,代码如下:

// AbstractQueuedSynchronizer
public final void acquireSharedInterruptibly(int arg)throws InterruptedException {if (Thread.interrupted())throw new InterruptedException();if (tryAcquireShared(arg) < 0)doAcquireSharedInterruptibly(arg);
}

除了 tryAcquireShared() 方法由 AQS 子类实现,其他方法在 《AQS实现原理》中有讲解过,这里不再赘述。我们来分析一下子类实现的 tryAcquireShared() 方法,这里就要分公平和非公平策略两种情况了。

2.2.1.1 非公平策略下

非公平策略下的 tryAcquireShared() 方法:

// Semaphore#NonfairSync
protected int tryAcquireShared(int acquires) {return nonfairTryAcquireShared(acquires);
}

内部调用 Sync#nonfairTryAcquireShared() 方法:

// Sync
final int nonfairTryAcquireShared(int acquires) {//自旋for (;;) {//获取可用许可值int available = getState();//计算剩余的许可值int remaining = available - acquires;//如果剩余许可值小于0,说明许可不够用了,直接返回,否则CAS更新同步状态,更新成功返回,否则继续自旋if (remaining < 0 ||compareAndSetState(available, remaining))return remaining;}
}

该方法本质就是一个自旋方法,通过自旋+CAS来保证修改许可值的线程安全性。方法返回的情况有如下两种情况

  • 信号量不够,直接返回,返回值为负数,表示获取失败;
  • 信号量足够,且CAS操作成功,返回值为剩余许可值,获取成功。

2.2.1.2 公平策略下

公平策略下的 tryAcquireShared() 方法如下:

// Semaphore#FairSync
protected int tryAcquireShared(int acquires) {//自旋for (;;) {if (hasQueuedPredecessors())return -1;int available = getState();int remaining = available - acquires;if (remaining < 0 ||compareAndSetState(available, remaining))return remaining;}
}

我们看到它与非公平策略的唯一区别就是多了下面这个 if 代码:

protected int tryAcquireShared(int acquires) {for (;;) {if (hasQueuedPredecessors())return -1;......}
}// AbstractQueuedSynchronizer
public final boolean hasQueuedPredecessors() {// The correctness of this depends on head being initialized// before tail and on head.next being accurate if the current// thread is first in queue.Node t = tail; // Read fields in reverse initialization orderNode h = head;Node s;return h != t &&((s = h.next) == null || s.thread != Thread.currentThread());
}

即在获取共享锁之前,先调用 hasQueuedPredecessors() 方法来判断队列中是否存在其他正在排队的节点,如果是返回true,否则为false。因此当存在其他正在排队的节点,当前节点就无法获取许可,只能排队等待,这也是公平策略的体现。

2.2.2 release() 方法

Semaphore 提供 release() 方法来释放许可。我们继续分析 release() 方法,源码如下:

// Semaphore
public void release() {sync.releaseShared(1);
}//AbstractQueuedSynchronizer
public final boolean releaseShared(int arg) {if (tryReleaseShared(arg)) {//如果释放锁成功,唤醒正在排队的节点doReleaseShared();return true;}return false;
}//Semaphore#Sync
protected final boolean tryReleaseShared(int releases) {//自旋for (;;) {//获取许可值int current = getState();//计算释放后的许可值int next = current + releases;//如果释放后比释放前的许可值还小,直接报Errorif (next < current) // overflowthrow new Error("Maximum permit count exceeded");//CAS修改许可值,成功则返回,失败则继续自旋if (compareAndSetState(current, next))return true;}
}

tryReleaseShared() 方法是一个自旋方法,通过自旋+CAS原子性地修改同步状态,逻辑很简单。

2.2.3 其余方法

获取信号量的方法有四个:

释放信号量的方法有两个:

其余获取和释放信号量的方法原理同上问,不再赘述。接下来看看其余的工具方法。

2.2.3.1 tryAcquire() 尝试获取许可

该方法一共有四种重载形式:

  • tryAcquire() :尝试获取许可,如果获取成功返回true,否则返回false,不会阻塞线程,而且不响应中断。
  • tryAcquire(int permits) :同上的基础上,可以指定获取许可的个数。
  • tryAcquire(long timeout, TimeUnit unit) :指定超时时间,它调用AQS的tryAcquireSharedNanos() 方法,即共享式超时获取。
  • tryAcquire(int permits, long timeout, TimeUnit unit) :可以指定获取许可的个数和超时时间。
//Semaphore
public boolean tryAcquire() {return sync.nonfairTryAcquireShared(1) >= 0;
}public boolean tryAcquire(int permits) {if (permits < 0) throw new IllegalArgumentException();return sync.nonfairTryAcquireShared(permits) >= 0;
}public boolean tryAcquire(long timeout, TimeUnit unit)throws InterruptedException {return sync.tryAcquireSharedNanos(1, unit.toNanos(timeout));
}public boolean tryAcquire(int permits, long timeout, TimeUnit unit)throws InterruptedException {if (permits < 0) throw new IllegalArgumentException();return sync.tryAcquireSharedNanos(permits, unit.toNanos(timeout));
}
2.2.3.2 availablePermits() 获取可用许可数

源码如下:

//Semaphore
public int availablePermits() {//获取可用许可数return sync.getPermits();
}//Sync
//获取可用许可数
final int getPermits() {return getState();
}

2.2.3.3 drainPermits() 耗光信号量

将剩下的信号量一次性消耗光,并且返回所消耗的信号量。

//Semaphore
public int drainPermits() {return sync.drainPermits();
}//Sync
final int drainPermits() {//自旋操作for (;;) {//获取信号量值int current = getState();//如果信号量为0,直接返回//否则CAS修改为0,成功则返回,否则继续自旋if (current == 0 || compareAndSetState(current, 0))return current;}
}
2.2.3.4 reducePermits() 减少信号量

reducePermits() 和 acquire() 方法相比都是减少信号量的值,但是 reducePermits() 不会导致任何线程阻塞,即只要传递的参数 reductions(减少的信号量的数量)大于0,操作就会成功。所以调用该方法可能会导致信号量最终为负数

//Semaphore
protected void reducePermits(int reduction) {if (reduction < 0) throw new IllegalArgumentException();sync.reducePermits(reduction);
}//Sync
final void reducePermits(int reductions) {//自旋for (;;) {//获取当前信号量值int current = getState();//计算剩余许可值int next = current - reductions;if (next > current) // underflowthrow new Error("Permit count underflow");//CAS修改同步状态,成功则返回,失败则继续自旋if (compareAndSetState(current, next))return;}
}

三、使用案例

这里以经典的停车作为案例。假设停车场有3个停车位,此时有5辆汽车需要进入停车场停车。

public static void main(String[] args) {//定义semaphore实例,设置许可数为3,即停车位为3个Semaphore semaphore = new Semaphore(3);//创建五个线程,即有5辆汽车准备进入停车场停车for (int i = 1; i <= 5; i++) {new Thread(() -> {try {System.out.println(Thread.currentThread().getName() + "尝试进入停车场...");//尝试获取许可semaphore.acquire();//模拟停车long time = (long) (Math.random() * 10 + 1);System.out.println(Thread.currentThread().getName() + "进入了停车场,停车" + time +"秒...");Thread.sleep(time);} catch (InterruptedException e) {e.printStackTrace();} finally {System.out.println(Thread.currentThread().getName() + "开始驶离停车场...");//释放许可semaphore.release();System.out.println(Thread.currentThread().getName() + "离开了停车场!");}}, i + "号汽车").start();}
}
//执行结果
1号汽车尝试进入停车场...
5号汽车尝试进入停车场...
4号汽车尝试进入停车场...
3号汽车尝试进入停车场...
2号汽车尝试进入停车场...
5号汽车进入了停车场,停车5秒...
1号汽车进入了停车场,停车8秒...
4号汽车进入了停车场,停车9秒...
5号汽车开始驶离停车场...
5号汽车离开了停车场!
3号汽车进入了停车场,停车10秒...
1号汽车开始驶离停车场...
1号汽车离开了停车场!
2号汽车进入了停车场,停车2秒...
4号汽车开始驶离停车场...
4号汽车离开了停车场!
2号汽车开始驶离停车场...
2号汽车离开了停车场!
3号汽车开始驶离停车场...
3号汽车离开了停车场!

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.rhkb.cn/news/458551.html

如若内容造成侵权/违法违规/事实不符,请联系长河编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

谈谈对函数式编程的理解及rxjs的使用

背景 函数式编程可以说是非常古老的编程方式&#xff0c;但是近几年变成了一个非常热门的话题。不管是Google力推的Go、学术派的Scala与Haskell&#xff0c;还是Lisp的新语言Clojure&#xff0c;这些新的函数式编程语言越来越受到人们的关注。函数式编程思想对前端的影响很大&…

C语言基础题(大合集2)

1. 时间转换 给定秒数 --> 输出秒数 转化成 时/分/秒 //时间转换 //给定秒数 --> 转换成 小时/分/秒 int main() {//输入int seconds 0;int h 0;//小时int m 0;//分钟int s 0;//秒scanf("%d", &seconds);//计算h seconds / 60 / 60;m seconds / 60…

详解varint,zigzag编码, 以及在Go标准库中的实现

文章目录 为啥需要varint编码为啥需要zigzag编码varint编码解码 zigzag编码解码 局限性 为啥需要varint编码 当我们用定长数字类型int32来表示整数时&#xff0c;为了传输一个整数1&#xff0c;我们需要传输00000000 00000000 00000000 00000001 32 个 bits&#xff0c;而有价…

【C++】STL初识

【C】STL初识 文章目录 【C】STL初识前言一、STL基本概念二、STL六大组件简介三、STL三大组件四、初识STL总结 前言 本篇文章将讲到STL基本概念&#xff0c;STL六大组件简介&#xff0c;STL三大组件&#xff0c;初识STL。 一、STL基本概念 STL(Standard Template Library,标准…

QT建立工程时出现了:Reading Project

QT建立工程时出现了&#xff1a;Reading Project 打开建立的工程发现&#xff0c;缺少build文件 从别的工程中复制一个build&#xff0c;点击.pro就可以打开了

【CSS3】css开篇基础(4)

1.❤️❤️前言~&#x1f973;&#x1f389;&#x1f389;&#x1f389; Hello, Hello~ 亲爱的朋友们&#x1f44b;&#x1f44b;&#xff0c;这里是E绵绵呀✍️✍️。 如果你喜欢这篇文章&#xff0c;请别吝啬你的点赞❤️❤️和收藏&#x1f4d6;&#x1f4d6;。如果你对我的…

Spring Boot实现的动态化酒店住宿管理系统

1系统概述 1.1 研究背景 随着计算机技术的发展以及计算机网络的逐渐普及&#xff0c;互联网成为人们查找信息的重要场所&#xff0c;二十一世纪是信息的时代&#xff0c;所以信息的管理显得特别重要。因此&#xff0c;使用计算机来管理酒店客房管理系统的相关信息成为必然。开发…

图文详解ChatGPT-o1完成论文写作的全流程

学境思源&#xff0c;一键生成论文初稿&#xff1a; AcademicIdeas - 学境思源AI论文写作 本月中旬OpenAI发布了OpenAI o1系列新的AI模型。 据OpenAI介绍&#xff0c;这些模型旨在花更多时间思考后再做出反应&#xff0c;就像人一样。通过训练&#xff0c;它们学会改进思维过…

深度学习(六)CNN:图像处理的强大工具(6/10)

一、CNN 的概述 卷积神经网络&#xff08;Convolutional Neural Networks&#xff0c;CNN&#xff09;是深度学习的代表算法之一&#xff0c;在深度学习中占据着重要地位。 CNN 的发展历程可追溯至 20 世纪 80 至 90 年代&#xff0c;时间延迟网络和 LeNet - 5 是最早出现的卷…

conda虚拟环境中安装cuda方法、遇到的问题

conda虚拟环境中安装cuda方法、遇到的问题 文章目录 conda虚拟环境中安装cuda方法、遇到的问题conda虚拟环境中安装cudacuda.h和cuda_runtime.hpytorch运行时的CUDA版本其他问题检查包冲突nvcc -V和nvidia-smi显示的版本不一致cuda路径 conda虚拟环境中安装cuda 参考文章&…

【AIGC】从CoT到BoT:AGI推理能力提升24%的技术变革如何驱动ChatGPT未来发展

博客主页&#xff1a; [小ᶻZ࿆] 本文专栏: AIGC | ChatGPT 文章目录 &#x1f4af;前言&#x1f4af;迈向AGI的新跨越&#x1f4af;BoT与CoT的技术对比技术原理差异推理性能提升应用范围和通用性从错误中学习的能力总结 &#x1f4af;BoT的工作流程和机制初始化过程生成推…

layaair获取组件里的脚本

获取脚本用getComponents方法&#xff0c;但是这个方法里的参数不是脚本的名称。而是组件类型。如果你需要获取脚本&#xff0c;则类型为Laya.Script。挺坑的。我在官网找都没找到这个是这么用的。我猜测的。没想到试了一下成功了。 property(Laya.Node)public img1: Laya.Node…

碰一碰支付系统搭建怎么做?头部公司源码大测评!

随着碰一碰支付dai li骗局的曝光&#xff0c;越来越多的人开始选择将目光转向碰一碰支付系统搭建这一入局方式&#xff0c;连带着与之相关的多个话题&#xff0c;如碰一碰支付系统搭建怎么做等也成为了当前的一大热点。 毕竟&#xff0c;相较于dai li 模式的与第三方公司合作、…

计算机前沿技术-人工智能算法-大语言模型-最新研究进展-2024-10-26

计算机前沿技术-人工智能算法-大语言模型-最新研究进展-2024-10-26 前言 本期相关论文可以从“下载” 资源中获取&#xff0c;如果有感兴趣的问题&#xff0c;欢迎交流探讨&#xff01; 目录 文章目录 计算机前沿技术-人工智能算法-大语言模型-最新研究进展-2024-10-26前言目…

【C++进阶】C++11(上)

【C进阶】C11(上) &#x1f955;个人主页&#xff1a;开敲&#x1f349; &#x1f525;所属专栏&#xff1a;C&#x1f96d; &#x1f33c;文章目录&#x1f33c; 1. C11的发展史 2. 列表初始化 2.1 C98的传统{} 2.2 C11中的{} 2.3 C11中的std::initializer_list 3. 右值引用…

Kaggle竞赛——灾难推文分类(Disaster Tweets)

目录 1. 准备工作2. 资源导入3. 数据处理4. 绘制词云图5. 数据可视化5.1 词数和字符数可视化5.2 元特征可视化5.3 类别可视化 6. 词元分析6.1 一元语法统计6.2 多元语法统计 7. 命名实体识别8. 推文主题提取9. 构建模型9.1 数据划分与封装9.2 模型训练与验证 10. 模型评估11. 测…

jvm虚拟机介绍

Java虚拟机&#xff08;JVM&#xff09;是Java语言的运行环境&#xff0c;它基于栈式架构&#xff0c;通过加载、验证、准备、解析、初始化等类加载过程&#xff0c;将Java类文件转换成平台无关的字节码&#xff0c;并在运行时动态地将其翻译成特定平台的机器码执行。 JVM的核心…

App测试环境部署

一.JDK安装 参考以下AndroidDevTools - Android开发工具 Android SDK下载 Android Studio下载 Gradle下载 SDK Tools下载 二.SDK安装 安装地址&#xff1a;https://www.androiddevtools.cn/ 解压 环境变量配置 变量名&#xff1a;ANDROID_SDK_HOME 参考步骤&#xff1a; A…

K8s中TSL证书如何续期

TSL是什么 K8s中的作用是什么&#xff1f; 在 Kubernetes&#xff08;K8s&#xff09;中&#xff0c;TSL 指的是 Transport Layer Security&#xff0c;也就是传输层安全协议。它是用来保护在网络上传输的数据的安全性和隐私性。 TSL 在 Kubernetes 中的作用包括&#xff1a;…