Aqs的CyclicBarrier。

今天我们来学习AQS家族的“外门弟子”:CyclicBarrier。

为什么说CyclicBarrier是AQS家族的“外门弟子”呢?那是因为CyclicBarrier自身和内部类Generation并没有继承AQS,但在源码的实现中却深度依赖AQS家族的成员ReentrantLock。就像修仙小说中,大家族会区分外门和内门,外门弟子通常会借助内门弟子的名声行事,CyclicBarrier正是这样,因此算是AQS家族的“外门弟子”。在实际的面试中,CyclicBarrier的出现的次数较少,通常会出现在与CountDownLatch比较的问题当中

今天我们就逐步拆解CyclicBarrier,来看看它与CountDownLatch之间到底有什么差别。

CyclicBarrier是什么?

先从CyclicBarrier的名字开始入手,Cyclic是形容词,译为“循环的,周期的”,Barrier是名词,译为“屏障,栅栏”,组合起来就是“循环的屏障”,那么该怎么理解“循环的屏障”呢?我们来看CyclicBarrier的注释是怎么解释的:

A synchronization aid that allows a set of threads to all wait for each other to reach a common barrier point.   CyclicBarrier是一种同步辅助工具,允许一组线程等待彼此到达共同的屏障点。

The barrier is called cyclic because it can be re-used after the waiting threads are released. 因为在等待线程释放后可以重复使用,所以屏障被称为循环屏障。

看起来与CountDownLatch有些相似,我们通过一张图来展示下CyclicBarrier是怎样工作的:

部分线程到达屏障后,会在屏障处等待,只有全部线程都到达屏障后,才会继续执行。如果以CountDownLatch中越野徒步来举例的话,把老板拿掉,选手之间的互相等待,就是CyclicBarrier了。

另外,注释中说CyclicBarrier是“re-used”,即可重复使用的。回想一下CountDownLatch的实现,并未做任何重置计数器的工作,即当CountDownLatch的计数减为0后不能恢复,也就是说CountDownLatch的功能是一次性的

Tips:实际上,可以用CountDownLatch实现类似于CyclicBarrier的功能。

CyclicBarrier怎么用?

我们用没有老板参加的越野徒步来举例,部分先到的选手要等待后到的选手一起吃午饭,用CyclicBarrier来实现的代码是这样的:

// 初始化CyclicBarrier
CyclicBarrier cyclicBarrier = new CyclicBarrier(10);for (int i = 0; i < 10; i++) {int finalI = i;new Thread(() -> {try {TimeUnit.SECONDS.sleep((finalI + 1));} catch (InterruptedException e) {throw new RuntimeException(e);}try {System.out.println("选手[" + finalI + "]到达终点,等待其他选手!!!");// 线程在屏障点处等待cyclicBarrier.await();System.out.println("选手[" + finalI + "]开始吃午饭啦!!!");} catch (InterruptedException | BrokenBarrierException e) {throw new RuntimeException(e);}}).start();
}

用法和CountDownLatch很相似,构造函数设置CyclicBarrier需要多少个线程达到屏障后统一行动,区别是CyclicBarrier在每个线程中都调用了CyclicBarrier#await,而我们在使用CountDownLatch时只在主线程中调用了一次CountDownLatch#await

那CountDownLatch可以在线程中调用CountDownLatch#await吗?答案是可以的,这样使用的效果和CyclicBarrier是一样的:

CountDownLatch countDownLatch = new CountDownLatch(10);for (int i = 0; i < 10; i++) {int finalI = i;new Thread(() -> {try {TimeUnit.SECONDS.sleep((finalI + 1));} catch (InterruptedException e) {throw new RuntimeException(e);}System.out.println("选手[" + finalI + "]到达终点!!!");countDownLatch.countDown();try {countDownLatch.await();System.out.println("选手[" + finalI + "]开始吃午饭啦!!!");} catch (InterruptedException e) {throw new RuntimeException(e);}}).start();
}

通过上面的例子,我们不难想到CyclicBarrier#await方法是同时具备了CountDownLatch#countDown方法和CountDownLatch#await方法的能力,即执行了计数减1,又执行了暂停线程

CyclicBarrier是怎么实现的?

我们先整体认识一下CyclicBarrier:

CyclicBarrier的内部结构比CountDownLatch复杂一些,除了我们前面提到的借助AQS的“内门弟子”ReentrantLock类型的lock和Condition类型的trip外,CyclicBarrier还有两个“特别”的地方:

  • 内部类Generation,直译过来是“代”,它起到什么作用?
  • Runnable类型的成员变量barrierCommand,它又做了些什么?

其余的部分,大部分可以在CountDownLatch中找到对应的方法,或者通过名称我们就很容易得知它们的作用。

CyclicBarrier的构造方法

CyclicBarrier提供了两个(实际是一个)构造方法:

// 需要到达屏障的线程数
private final int parties;// 所有线程都到达后执行的动作
private final Runnable barrierCommand;// 计数器
private int count;public CyclicBarrier(int parties) {
this(parties, null);
}public CyclicBarrier(int parties, Runnable barrierAction) {if (parties <= 0) {throw new IllegalArgumentException();}this.parties = parties;this.count = parties;this.barrierCommand = barrierAction;
}

第二个构造函数接收了两个参数:

  • parties:表示需要多少个线程到达屏障处调用CyclicBarrier#await
  • barrierAction:所有线程到达屏障后执行的动作。

构造方法的代码一如既往的简单,只有一处比较容易产生疑惑,parties和count有什么区别?

首先来看成员变量的声明,parties使用了final,表明它是不可变的对象,代表CyclicBarrier需要几个线程共同到达屏障处;而count是计数器,初始值是parties,随着到达屏障处的线程数量增多count会逐步减少至0。

CyclicBarrier的内部类Generation

private static class Generation {Generation() {}  boolean broken;
}

Generation用于标记CyclicBarrier的当前代,Doug Lea是这么解释它的作用的:

Each use of the barrier is represented as a generation instance. The generation changes whenever the barrier is tripped, or is reset.

每次使用屏障(CyclicBarrier)都需要一个Generation实例。无论是通过屏障还是重置屏障,Generation都会发生改变。

Generation中的broken用于标记当前的CyclicBarrier是否被打破,默认为false,值为true时表示当前CyclicBarrier已经被打破,此时CyclicBarrier不能正常使用,需要调用CyclicBarrier#reset方法重置CyclicBarrier的状态。

CyclicBarrier#await方法

前面我们猜测CyclicBarrier#await方法即实现了计数减1,又实现了线程等待的功能,下面我们就通过源码来验证我们的想法:

public int await() throws InterruptedException, BrokenBarrierException {try {return dowait(false, 0L);} catch (TimeoutException toe) {throw new Error(toe);}
}public int await(long timeout, TimeUnit unit) throws InterruptedException, BrokenBarrierException, TimeoutException {return dowait(true, unit.toNanos(timeout));
}

两个重载方法都指向了CyclicBarrier#dowait方法:

private int dowait(boolean timed, long nanos)  throws InterruptedException, BrokenBarrierException, TimeoutException {// 使用ReentrantLockfinal ReentrantLock lock = this.lock;lock.lock();try {// 第2部分// 获取CyclicBarrier的当前代,并检查CyclicBarrier是否被打破final Generation g = generation;if (g.broken) {throw new BrokenBarrierException();}// 线程被中断时,调用breakBarrier方法if (Thread.interrupted()) {breakBarrier();throw new InterruptedException();}// 第3部分//计数器减1int index = --count;// 计数器为0时表示所有线程都到达了,此时要做的就是唤醒等待中的线程if (index == 0) {boolean ranAction = false;try {// 执行唤醒前的操作final Runnable command = barrierCommand;if (command != null) {command.run();}ranAction = true;// CyclicBarrier进入下一代nextGeneration();return 0;} finally {if (!ranAction) {breakBarrier();}}}// 第4部分// 只有部分线程到达屏障处的情况for (;;) {try {//调用等待逻辑)if (!timed) {trip.await();} else if (nanos > 0L) {nanos = trip.awaitNanos(nanos);}} catch (InterruptedException ie) {// 线程被中断时,调用breakBarrier方法if (g == generation && ! g.broken) {breakBarrier();throw ie;} else {Thread.currentThread().interrupt();}}if (g.broken) {throw new BrokenBarrierException();}// 如果不是当前代,返回计数器的值if (g != generation) {return index;}// 如果等待超时,调用breakBarrier方法if (timed && nanos <= 0L) {breakBarrier();throw new TimeoutException();}}} finally {lock.unlock();}
}

CyclicBarrier#dowait方法看起来很长,但如果拆成3部分来看逻辑并不复杂:

  • 第1部分:CyclicBarrier与线程的状态校验;
  • 第2部分:当计数器减1后值为0时,唤醒所有等待中的线程;
  • 第3部分:当计数器减1后值不为0时,线程进入等待状态。

先来看第1部分,CyclicBarrier与线程的状态校验的部分,先是判断CyclicBarrier是否被打破,接着判断当前线程是否为中断状态,如果是则调用CyclicBarrier#breakBarrier方法:

private void breakBarrier() {generation.broken = true;count = parties;trip.signalAll();
}

CyclicBarrier#breakBarrier方法非常简单,只做了3件事:

  • 标记CyclicBarrier被打破;
  • 重置CyclicBarrier的计数器;
  • 唤醒全部等待中的线程。

也就是说,一旦有个线程标记为中断状态,都会直接打破CyclicBarrier的屏障。

我们先跳过第2部分的唤醒逻辑,直接来看第3部分线程进入等待状态的逻辑。根据timed参数选择调用Condition不同的等待方法,随后是对异常的处理和线程中断状态的处理,同样是调用CyclicBarrier#breakBarrier,标记CyclicBarrier不可用。线程进入等待状态的逻辑并不复杂,本质上是通过AQS的Condition来实现的。

最后来看第2部分唤醒所有等待中线程的操作,根据计数器是否为0判断是否需要进行唤醒。如果需要唤醒,最后一个执行CyclicBarrier#await的线程执行barrierCommand(此时尚未执行任何线程唤醒的操作),做通过屏障前的处理操作,接着调用CyclicBarrier#nextGeneration方法:

private void nextGeneration() {trip.signalAll();count = parties;generation = new Generation();
}

CyclicBarrier#nextGeneration方法也做了3件事:

  • 唤醒所有Condition上等待的线程;
  • 重置CyclicBarrier的计数器;
  • 创建新的Generation对象。

很符合进入“下一代”的名字,先唤醒“上一代”所有等待中的线程,然后重置CyclicBarrier的计数器,最后更新CyclicBarrier的Generation对象,对CyclicBarrier进行重置工作,让CyclicBarrier进入下一个纪元。

到这里我们不难发现,CyclicBarrier自身只做了维护计数器和重置计数器的工作,而保证互斥性和线程的等待与唤醒则是依赖AQS家族的成员完成的:

  • ReentrantLock保证了同一时间只有一个线程可以执行CyclicBarrier#await,即同一时间只有一个线程可以维护计数器;
  • Condition为CyclicBarrier提供了条件等待队列,完成了线程的等待与唤醒的工作。

CyclicBarrier#reset方法

最后我们来看CyclicBarrier#reset方法:

public void reset() {final ReentrantLock lock = this.lock;lock.lock();try {// 主动打破CyclicBarrierbreakBarrier();// 使CyclicBarrier进入下一代nextGeneration();} finally {lock.unlock();}
}

CyclicBarrier#reset方法都是老面孔,先是CyclicBarrier#breakBarrier打破上一代CyclicBarrier,既然要重新开始就不要再“怀念”过去了;最后调用CyclicBarrier#nextGeneration开始新的时代。需要注意的是,这里加锁的目的是为了保证执行CyclicBarrier#reset时,没有任何线程正在执行CyclicBarrier#await方法。

好了,到这里CyclicBarrier的核心内容我们就一起分析完了,剩下的方法就非常简单了,相信通过名字大家就可以了解它们的作用,并猜到它们的实现了。

TipsCyclicBarrier#getNumberWaiting中加了锁,这是为什么?

CountDownLatch和Cyclicbarrier有什么区别?

最后的部分,我们来解答下开篇时的面试题,CountDownLatch和Cyclicbarrier有什么区别?

第1点:CyclicBarrier可以重复使用,CountDownLatch不能重复使用

无论是正常使用结束,还是调用CyclicBarrier#reset方法,Cyclicbarrier都可以重置内部的计数器

第2点:Cyclicbarrier只阻塞调用CyclicBarrier#await方法的线程,而CountDownLatch可以阻塞任意一个或多个线程

CountDownLatch将计数减1与阻塞拆分成了CountDownLatch#countDownCountDownLatch#await两个方法,而Cyclicbarrier只通过CyclicBarrier#await完成两步操作。如果在同一个线程中连续CountDownLatch#countDownCountDownLatch#await则实现了与CyclicBarrier#await方法相同的功能。


如果本文对你有帮助的话,还请多多点赞支持。如果文章中出现任何错误,还请批评指正。最后欢迎大家关注分享硬核Java技术的金融摸鱼侠王有志,我们下次再见!

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.rhkb.cn/news/118985.html

如若内容造成侵权/违法违规/事实不符,请联系长河编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

C++实现蜂群涌现效果(flocking)

Flocking算法0704_元宇宙中的程序员的博客-CSDN博客 每个个体的位置&#xff0c;通过计算与周围个体的速度、角度、位置&#xff0c;去更新位置。

【Seata】00 - Seata Server 部署(Windows、Docker 基于 Jpom)

文章目录 前言参考目录版本说明Windows 部署 seata-server1&#xff1a;下载压缩包2&#xff1a;文件存储模式3&#xff1a;db 存储模式3.1&#xff1a;建表3.2&#xff1a;修改配置文件3.3&#xff1a;启动脚本4&#xff1a;源码部署 Docker 部署 seata-server &#xff08;基…

Java学习之序列化

1、引言 《手册》第 9 页 “OOP 规约” 部分有一段关于序列化的约定 1&#xff1a; 【强制】当序列化类新增属性时&#xff0c;请不要修改 serialVersionUID 字段&#xff0c;以避免反序列失败&#xff1b;如果完全不兼容升级&#xff0c;避免反序列化混乱&#xff0c;那么请…

引用(个人学习笔记黑马学习)

1、引用的基本语法 #include <iostream> using namespace std;int main() {int a 10;//创建引用int& b a;cout << "a " << a << endl;cout << "b " << b << endl;b 100;cout << "a "…

JVM 是怎么设计来保证new对象的线程安全

1、采用 CAS 分配重试的方式来保证更新操作的原子性 2、每个线程在 Java 堆中预先分配一小块内存&#xff0c;也就是本地线程分配缓冲&#xff08;Thread Local AllocationBuffer&#xff0c;TLAB&#xff09;&#xff0c;要分配内存的线程&#xff0c;先在本地缓冲区中分配&a…

LeetCode494. 目标和

494. 目标和 文章目录 [494. 目标和](https://leetcode.cn/problems/target-sum/)一、题目二、题解方法一&#xff1a;目标和路径计数算法方法二&#xff1a;01背包方法三&#xff1a;01背包一维数组 一、题目 给你一个非负整数数组 nums 和一个整数 target 。 向数组中的每个…

外部中断(EXTI) - 按键控制LED

一、外部中断/事件控制器(EXTI)结构图 1、结构图分析 外部中断主要由外部中断/事件控制器(External interrupt/event controller, EXTI)控制&#xff0c;它管理了外部中断或者事件的使能与否、触发方式等功能。 &#xff08; 外部中断/事件控制器(EXTI)结构图 &#xff09; …

【5】openGL使用宏和函数进行错误检测

当我们编写openGL程序&#xff0c;没有报编译链接错误&#xff0c;但是运行结果是黑屏&#xff0c;这不是我们想要的。 openGL提供了glGetError 来检查错误&#xff0c;我们可以通过在运行时进行打断点查看glGetError返回值&#xff0c;得到的是一个十进制数&#xff0c;将其转…

Nacos服务注册和服务配置

Nacos 是什么 Nacos (Dynamic Naming and Configuration Service)&#xff0c;其命名由三部分组成&#xff1a; Na (naming/nameServer)&#xff0c;即服务注册中心。 co (configuration)&#xff0c;即配置中心。 s (service)&#xff0c;即服务&#xff0c;表示 Nacos 实现的…

华为 连接OSPF和RIP网络---OSPF和RIP网络相互引入

路由引入简介 不同路由协议之间不能直接共享各自的路由信息&#xff0c;需要依靠配置路由的引入来实现。 获得路由信息一般有3种途径&#xff1a;直连网段、静态配置和路由协议。可以将通过这3种途径获得的路由信息引入到路由协议中&#xff0c;例如&#xff0c;把直连网段引入…

【文心一言】学习笔记

学习资料 《听说文心一言App霸榜了&#xff0c;那必须来一波全方位实测了》 情感陪伴&#xff1a;文心一言 App 可以充当用户的情感树洞&#xff0c;提供知心姐姐、【暖男】等角色扮演&#xff0c;为用户提供情绪疏导、情感分析、约会建议等服务。 1. 模型属性 【提示词工具…

无涯教程-Android - CheckBox函数

CheckBox是可以由用户切换的on/off开关。为用户提供一组互不排斥的可选选项时,应使用复选框。 CheckBox 复选框属性 以下是与CheckBox控件相关的重要属性。您可以查看Android官方文档以获取属性的完整列表以及可以在运行时更改这些属性的相关方法。 继承自 android.widget.T…

nuxt3+ts+vue3的ssr项目总结

目录 一、什么是SSR、SEO、SPA&#xff0c;它们之间的关系又是怎样的。 二、VUE做SSR的几种方法 1、插件prerender-spa-plugin 2、VUE开启SSR渲染模式 3、使用NUXT框架 三、NUXT3VUE3TS &#xff08;一&#xff09;基本配置 1、文件夹介绍 assets components pages…

Docker安装MySQL教程

虽然 docker 安装 mysql 不是一个很好的方案&#xff0c;但是为了个人使用方便&#xff0c;使用 docker 安装 mysql 还是没什么问题的。 本文为了方便&#xff0c;我们直接通过yum方式安装。所以&#xff0c;我们在安装之前需要电脑可以联网&#xff0c;不然我们这种方式是安装…

Python的由来和基础语法(一)

目录 一、Python 背景知识 1.1Python 是咋来的? 1.2Python 都能干啥? 1.3Python 的优缺点 二、基础语法 2.1常量和表达式 2.2变量和类型 变量的语法 (1) 定义变量 (2) 使用变量 变量的类型 (1) 整数 (2) 浮点数(小数) (3) 字符串 (4) 布尔 (5) 其他 动态类型…

《TCP/IP网络编程》阅读笔记--基于Windows实现Hello Word服务器端和客户端

目录 1--Hello Word服务器端 2--客户端 3--编译运行 3-1--编译服务器端 3-2--编译客户端 3-3--运行 1--Hello Word服务器端 // gcc hello_server_win.c -o hello_server_win -lwsock32 // hello_server_win 9190 #include <stdio.h> #include <stdlib.h> #i…

vue+element-ui el-table组件二次封装实现虚拟滚动,解决数据量大渲染DOM过多而卡顿问题

一、此功能已集成到TTable组件中 二、最终效果 三、需求 某些页面不做分页时&#xff0c;当数据过多&#xff0c;会导致页面卡顿&#xff0c;甚至卡死 四、虚拟滚动 一、固定一个可视区域的大小并且其大小是不变的&#xff0c;那么要做到性能最大化就需要尽量少地渲染 DOM 元素…

kotlin 转 Java

今天突然想研究下有些kotlin文件转为Java到底长什么样&#xff0c;好方便优化kotlin代码&#xff0c;搞了半天发现一个非常简单的Android Studio或者Intellij idea官方插件Kotlin&#xff0c;Kotlin是插件的名字&#xff0c;真是醉了&#xff1b; 这里以AS为例&#xff0c;使用…

OTFS-ISAC通信最新进展

测试场景 Tx DD域帧结构导频区域 Rx DD域帧导频区域 原始星座图 信道估计及数据检测 经过MP算法后的星座图 误码率曲线

ELK安装、部署、调试 (二) ES的安装部署

ElasticSearch是一个基于Lucene的搜索服务器。它提供了一个分布式多用户能力的全文搜索引擎&#xff0c;基于RESTful web接口操作ES&#xff0c;也可以利用Java API。Elasticsearch是用Java开发的&#xff0c;并作为Apache许可条款下的开放源码发布&#xff0c;是当前流行的企业…