Curator分布式锁

系列文章目录


文章目录

  • 系列文章目录
  • 前言


前言

前些天发现了一个巨牛的人工智能学习网站,通俗易懂,风趣幽默,忍不住分享一下给大家。点击跳转到网站,这篇文章男女通用,看懂了就去分享给你的码吧。
在这里插入图片描述


分布式锁服务宕机,ZooKeeper一般是以集群部署,如果出现ZooKeeper宕机,那么只要当前正常的服务器超过集群的半数,依然可以正常提供服务持有锁资源服务器宕机,假如一台服务器获取锁之后就宕机了, 那么就会导致其他服务器无法再获取该锁. 就会造成死锁问题, 在Curator中, 锁的信息都是保存在临时节点上, 如果持有锁资源的服务器宕机, 那么ZooKeeper 就会移除它的信息, 这时其他服务器就能进行获取锁操作。
在这里插入图片描述
zookeeper安装单机模式

http://www.javacui.com/opensource/445.html

SpringBoot集成Curator实现Zookeeper基本操作

http://www.javacui.com/tool/615.html

SpringBoot集成Curator实现Watch事件监听

http://www.javacui.com/tool/616.html

Zookeeper实现分布式锁的机制

使用zk的临时节点和有序节点,每个线程获取锁就是在zk创建一个临时有序的节点,比如在/lock/目录下。

创建节点成功后,获取/lock目录下的所有临时节点,再判断当前线程创建的节点是否是所有的节点的序号最小的节点。

如果当前线程创建的节点是所有节点序号最小的节点,则认为获取锁成功。

比如当前线程获取到的节点序号为/lock/003,然后所有的节点列表为[/lock/001,/lock/002,/lock/003],则对/lock/002这个节点添加一个事件监听器。

如果锁释放了,会唤醒下一个序号的节点,然后重新执行第3步,判断是否自己的节点序号是最小。

比如/lock/001释放了,/lock/002监听到时间,此时节点集合为[/lock/002,/lock/003],则/lock/002为最小序号节点,获取到锁。

锁分类

InterProcessSemaphoreMutex:分布式不可重入排它锁

InterProcessMutex:分布式可重入排它锁

InterProcessReadWriteLock:分布式读写锁

InterProcessMultiLock:多重共享锁,将多个锁作为单个实体管理的容器

InterProcessSemaphoreV2:共享信号量

Shared Lock 分布式非可重入锁

官网地址:http://curator.apache.org/curator-recipes/shared-lock.html

InterProcessSemaphoreMutex是一种不可重入的互斥锁,也就意味着即使是同一个线程也无法在持有锁的情况下再次获得锁,所以需要注意,不可重入的锁很容易在一些情况导致死锁,比如你写了一个递归。

Shared Reentrant Lockf分布式可重入锁

官网地址:http://curator.apache.org/curator-recipes/shared-reentrant-lock.html

此锁可以重入,但是重入几次需要释放几次。

InterProcessMutex通过在zookeeper的某路径节点下创建临时序列节点来实现分布式锁,即每个线程(跨进程的线程)获取同一把锁前,都需要在同样的路径下创建一个节点,节点名字由uuid+递增序列组成。而通过对比自身的序列数是否在所有子节点的第一位,来判断是否成功获取到了锁。当获取锁失败时,它会添加watcher来监听前一个节点的变动情况,然后进行等待状态。直到watcher的事件生效将自己唤醒,或者超时时间异常返回。

Shared Reentrant Read Write Lock可重入读写锁

官网地址:http://curator.apache.org/curator-recipes/shared-reentrant-read-write-lock.html

读写锁维护一对关联的锁,一个用于只读操作,一个用于写操作。只要没有写锁,读锁可以被多个用户同时持有,而写锁是独占的。

读写锁允许从写锁降级为读锁,方法是先获取写锁,然后就可以获取读锁。但是,无法从读锁升级到写锁。

Multi Shared Lock 多共享锁

官网地址:http://curator.apache.org/curator-recipes/multi-shared-lock.html

多个锁作为一个锁,可以同时在多个资源上加锁。一个维护多个锁对象的容器。当调用acquire()时,获取容器中所有的锁对象,请求失败时,释放所有锁对象。同样调用release()也会释放所有的锁。

Shared Semaphore共享信号量

官网地址:http://curator.apache.org/curator-recipes/shared-semaphore.html

一个计数的信号量类似JDK的Semaphore,所有使用相同锁定路径的jvm中所有进程都将实现进程间有限的租约。此外,这个信号量大多是“公平的” - 每个用户将按照要求的顺序获得租约。

有两种方式决定信号号的最大租约数。一种是由用户指定的路径来决定最大租约数,一种是通过SharedCountReader来决定。

如果未使用SharedCountReader,则不会进行内部检查比如A表现为有10个租约,进程B表现为有20个。因此,请确保所有进程中的所有实例都使用相同的numberOfLeases值。

acuquire()方法返回的是Lease对象,客户端在使用完后必须要关闭该lease对象(一般在finally中进行关闭),否则该对象会丢失。如果进程session丢失(如崩溃),该客户端拥有的所有lease会被自动关闭,此时其他端能够使用这些lease。

编码测试

package com.example.springboot;import com.example.springboot.tool.ZkConfiguration;
import org.apache.curator.framework.CuratorFramework;
import org.apache.curator.framework.recipes.locks.*;
import org.junit.jupiter.api.Test;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.test.context.SpringBootTest;import java.util.Arrays;
import java.util.Collection;
import java.util.concurrent.TimeUnit;/*** @Auther: Java小强* @Date: 2022/2/4 - 19:33* @Decsription: com.example.springboot* @Version: 1.0*/
@SpringBootTest(classes = Application.class)
public class CuratorTest {@Autowiredprivate ZkConfiguration zk;// 共享信号量,多个信号量@Testpublic void testInterProcessSemaphoreV22() throws Exception {CuratorFramework client = zk.curatorFramework();// 创建一个信号量, Curator 以公平锁的方式进行实现final InterProcessSemaphoreV2 semaphore = new InterProcessSemaphoreV2(client, "/lock", 3);new Thread(new Runnable() {@Overridepublic void run() {// 获取锁对象try {String threadName = Thread.currentThread().getName();// 获取2个许可Collection<Lease> acquire = semaphore.acquire(2);System.out.println(threadName + "获取信号量>>>>>>>>>>>>>>>>>>>>>");Thread.sleep(2 * 1000);semaphore.returnAll(acquire);System.out.println(threadName + "释放信号量>>>>>>>>>>>>>>>>>>>>>");} catch (Exception e) {e.printStackTrace();}}}).start();new Thread(new Runnable() {@Overridepublic void run() {// 获取锁对象try {String threadName = Thread.currentThread().getName();// 获取一个许可Lease lease = semaphore.acquire();System.out.println(threadName + "获取信号量>>>>>>>>>>>>>>>>>>>>>");Thread.sleep(2 * 1000);semaphore.returnLease(lease);System.out.println(threadName + "释放信号量>>>>>>>>>>>>>>>>>>>>>");} catch (Exception e) {e.printStackTrace();}}}).start();while (true) {}}// 共享信号量@Testpublic void testInterProcessSemaphoreV2() throws Exception {CuratorFramework client = zk.curatorFramework();// 创建一个信号量, Curator 以公平锁的方式进行实现final InterProcessSemaphoreV2 semaphore = new InterProcessSemaphoreV2(client, "/lock", 1);new Thread(new Runnable() {@Overridepublic void run() {// 获取锁对象try {String threadName = Thread.currentThread().getName();// 获取一个许可Lease lease = semaphore.acquire();System.out.println(threadName + "获取信号量>>>>>>>>>>>>>>>>>>>>>");Thread.sleep(2 * 1000);semaphore.returnLease(lease);System.out.println(threadName + "释放信号量>>>>>>>>>>>>>>>>>>>>>");} catch (Exception e) {e.printStackTrace();}}}).start();new Thread(new Runnable() {@Overridepublic void run() {// 获取锁对象try {String threadName = Thread.currentThread().getName();// 获取一个许可Lease lease = semaphore.acquire();System.out.println(threadName + "获取信号量>>>>>>>>>>>>>>>>>>>>>");Thread.sleep(2 * 1000);semaphore.returnLease(lease);System.out.println(threadName + "释放信号量>>>>>>>>>>>>>>>>>>>>>");} catch (Exception e) {e.printStackTrace();}}}).start();while (true) {}}// 多重共享锁@Testpublic void testInterProcessMultiLock() throws Exception {CuratorFramework client = zk.curatorFramework();// 可重入锁final InterProcessLock interProcessLock1 = new InterProcessMutex(client, "/lock");// 不可重入锁final InterProcessLock interProcessLock2 = new InterProcessSemaphoreMutex(client, "/lock");// 创建多重锁对象final InterProcessLock lock = new InterProcessMultiLock(Arrays.asList(interProcessLock1, interProcessLock2));new Thread(new Runnable() {@Overridepublic void run() {try {String threadName = Thread.currentThread().getName();// 获取参数集合中的所有锁lock.acquire();// 因为存在一个不可重入锁, 所以整个 InterProcessMultiLock 不可重入System.out.println(threadName + "----->" + lock.acquire(2, TimeUnit.SECONDS));// interProcessLock1 是可重入锁, 所以可以继续获取锁System.out.println(threadName + "----->" + interProcessLock1.acquire(2, TimeUnit.SECONDS));// interProcessLock2 是不可重入锁, 所以获取锁失败System.out.println(threadName + "----->" + interProcessLock2.acquire(2, TimeUnit.SECONDS));} catch (Exception e) {e.printStackTrace();}}}).start();while (true) {}}// 分布式读写锁@Testpublic void testReadWriteLock() throws Exception {CuratorFramework client = zk.curatorFramework();// 创建共享可重入读写锁final InterProcessReadWriteLock locl1 = new InterProcessReadWriteLock(client, "/lock");final InterProcessReadWriteLock lock2 = new InterProcessReadWriteLock(client, "/lock");new Thread(new Runnable() {@Overridepublic void run() {try {String threadName = Thread.currentThread().getName();locl1.writeLock().acquire(); // 获取锁对象System.out.println(threadName + "获取写锁>>>>>>>>>>>>>>>>>>>>>");Thread.sleep(1 * 1000);locl1.readLock().acquire(); // 获取读锁,锁降级System.out.println(threadName + "获取读锁>>>>>>>>>>>>>>>>>>>>>");Thread.sleep(1 * 1000);locl1.readLock().release();System.out.println(threadName + "释放读锁<<<<<<<<<<<<<<<<<<<<<");locl1.writeLock().release();System.out.println(threadName + "释放写锁<<<<<<<<<<<<<<<<<<<<<");} catch (Exception e) {e.printStackTrace();}}}).start();new Thread(new Runnable() {@Overridepublic void run() {try {String threadName = Thread.currentThread().getName();lock2.writeLock().acquire(); // 获取锁对象System.out.println(threadName + "获取写锁>>>>>>>>>>>>>>>>>>>>>");Thread.sleep(1 * 1000);lock2.readLock().acquire(); // 获取读锁,锁降级System.out.println(threadName + "获取读锁>>>>>>>>>>>>>>>>>>>>>");Thread.sleep(1 * 1000);lock2.readLock().release();System.out.println(threadName + "释放读锁<<<<<<<<<<<<<<<<<<<<<");lock2.writeLock().release();System.out.println(threadName + "释放写锁<<<<<<<<<<<<<<<<<<<<<");} catch (Exception e) {e.printStackTrace();}}}).start();while (true) {}}// 分布式可重入排它锁@Testpublic void testInterProcessMutex() throws Exception {CuratorFramework client = zk.curatorFramework();// 分布式可重入排它锁final InterProcessLock lock = new InterProcessMutex(client, "/lock");final InterProcessLock lock2 = new InterProcessMutex(client, "/lock");new Thread(new Runnable() {@Overridepublic void run() {try {String threadName = Thread.currentThread().getName();lock.acquire(); // 获取锁对象System.out.println(threadName + "获取锁>>>>>>>>>>>>>>>>>>>>>");lock.acquire(); // 测试锁重入System.out.println(threadName + "获取锁>>>>>>>>>>>>>>>>>>>>>");Thread.sleep(1 * 1000);lock.release();System.out.println(threadName + "释放锁<<<<<<<<<<<<<<<<<<<<<");lock.release();System.out.println(threadName + "释放锁<<<<<<<<<<<<<<<<<<<<<");} catch (Exception e) {e.printStackTrace();}}}).start();new Thread(new Runnable() {@Overridepublic void run() {try {String threadName = Thread.currentThread().getName();lock.acquire(); // 获取锁对象System.out.println(threadName + "获取锁>>>>>>>>>>>>>>>>>>>>>");lock.acquire(); // 测试锁重入System.out.println(threadName + "获取锁>>>>>>>>>>>>>>>>>>>>>");Thread.sleep(1 * 1000);lock.release();System.out.println(threadName + "释放锁<<<<<<<<<<<<<<<<<<<<<");lock.release();System.out.println(threadName + "释放锁<<<<<<<<<<<<<<<<<<<<<");} catch (Exception e) {e.printStackTrace();}}}).start();while (true) {}
//        顺序不一定,但是同一个线程可以多次获取,获取几次就必须释放几次,其他线程才能获取到锁}// 分布式不可重入排它锁@Testvoid testInterProcessSemaphoreMutex() throws Exception {CuratorFramework client = zk.curatorFramework();// 分布式不可重入排它锁final InterProcessLock lock = new InterProcessSemaphoreMutex(client, "/lock");final InterProcessLock lock2 = new InterProcessSemaphoreMutex(client, "/lock");new Thread(new Runnable() {@Overridepublic void run() {try {String threadName = Thread.currentThread().getName();lock.acquire(); // 获取锁对象System.out.println(threadName + "获取锁>>>>>>>>>>>>>>>>>>>>>");// 测试锁重入Thread.sleep(2 * 1000);lock.release();System.out.println(threadName + "释放锁<<<<<<<<<<<<<<<<<<<<<");} catch (Exception e) {e.printStackTrace();}}}).start();new Thread(new Runnable() {@Overridepublic void run() {// 获取锁对象try {String threadName = Thread.currentThread().getName();lock2.acquire();System.out.println(threadName + "获取锁>>>>>>>>>>>>>>>>>>>>>");Thread.sleep(2 * 1000);lock2.release();System.out.println(threadName + "释放锁<<<<<<<<<<<<<<<<<<<<<");} catch (Exception e) {e.printStackTrace();}}}).start();while (true) {}
//        顺序不一定,但是必须是获取后再释放其他线程才能获取到锁}
}

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.rhkb.cn/news/325103.html

如若内容造成侵权/违法违规/事实不符,请联系长河编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

【GlobalMapper精品教程】079:投影坐标系转地理坐标系(UTM转WGS1984/2000)

文章目录 一、矢量UTM转WGS1984/20001. UTM转WGS19842. UTM转CGCS2000二、栅格UTM转WGS1984/2000一、矢量UTM转WGS1984/2000 加载配套实验数据(data079.rar)中的矢量数据,如下所示: 查看源坐标系:双击图层的,图层投影选项卡,为UTM投影,Zone48N。 设置系统坐标系:点击…

【JavaEE网络】HTTPS详解:从对称与非对称加密到证书认证

目录 HTTPSHTTPS 是什么“加密” 是什么HTTTPS 的工作过程引入对称加密引入非对称加密引入证书完整流程总结 HTTPS HTTPS 是什么 HTTPS 也是一个应用层协议. 是在 HTTP 协议的基础上引入了一个加密层. HTTP 协议内容都是按照文本的方式明文传输的. 这就导致在传输过程中出现…

【JVM】ASM开发

认识ASM ASM是一个Java字节码操纵框架&#xff0c;它能被用来动态生成类或者增强既有类的功能。 ASM可以直接产生二进制class文件&#xff0c;也可以在类被加载入虚拟机之前动态改变类行为&#xff0c;ASM从类文件中读入信息后能够改变类行为&#xff0c;分析类信息&#xff…

【CSS】认识CSS选择器及各选择器对应的用法

目录 一、什么是CSS&#xff1f; 二、CSS 选择器 1. 标签选择器 2. 类选择器 3. ID选择器 4. 通配符选择器 5. 复合选择器 一、什么是CSS&#xff1f; CSS(Cascading Style Sheet)&#xff0c;层叠样式表。它与 HTML&#xff08;超文本标记语言&#xff09;一起使用&am…

线下研讨会 技术沙龙|乐鑫芯片与 ESP RainMaker® 为科技初创企业赋能

众多科技初创企业在智能硬件市场迅猛发展的背景下&#xff0c;对不断变化的需求展现出了高度的敏锐性&#xff0c;期望能够快速将其转化为切实的产品方案。然而&#xff0c;面对复杂繁重的软硬件集成任务&#xff0c;这些企业往往容易陷入研发瓶颈、资金短缺以及效率低下等多重…

前端小程序调用 getLocation 实现地图位置功能,通过 纬度:latitude 经度: longitude 获取当前位置

1、首先登录一下 腾讯的位置服务 有账号就登录没账号就注册&#xff0c; 点击右上角的控制台点击左侧的应用管理 ---> 我的应用 ---->> 创建应用 1、创建应用 2、列表就会显示我们刚刚创建好的 key 3、点击添加 key 4、按照要求填写信息 我们用的是小程序 所以选择…

vue2和vue3区别: 探索关键差异

vue2和vue3区别&#xff1a; 探索关键差异 Vue.js 作为流行的前端框架&#xff0c;其版本 3 带来了许多令人兴奋的改进和新功能。虽然 Vue 3 保持了与 Vue 2 的相似性&#xff0c;但也存在一些关键差异需要开发者注意。本文将通过表格形式&#xff0c;清晰地展现 Vue 2 和 Vue …

Python-VBA函数之旅-str函数

目录 一、str函数的常见应用场景 二、str函数使用注意事项 三、如何用好str函数&#xff1f; 1、str函数&#xff1a; 1-1、Python&#xff1a; 1-2、VBA&#xff1a; 2、推荐阅读&#xff1a; 个人主页&#xff1a; https://myelsa1024.blog.csdn.net/ 一、str函数的常…

搭建Springboot的基础开发框架-01

本系列专题虽然是按教学的深度来定稿的&#xff0c;但在项目结构和代码组织方面是按公司系统的要求来书定的。在本章中主要介绍下基础开发框架的功能。后续所有章节的项目全是在本基础框架的基础上演进的。 工程结构介绍 SpringbootSeries&#xff1a;父工程&#xff0c;定义一…

Linux(Ubuntu24.04) 安装 MinIO

本文所使用的 Ubuntu 系统版本是 Ubuntu 24.04 ! # 1、下载 MinIO wget https://dl.min.io/server/minio/release/linux-amd64/minio# 2、添加可执行权限 chmod x minio# 3、导出环境变量&#xff0c;用于设置账号密码&#xff0c;我设置的账号和密码都是 minioadmin export MI…

Xilinx 千兆以太网TEMAC IP核 AXI4-Lite接口信号

在AX4总线标准中&#xff0c;AXI4-Lite主要由向她址映射型通信。TEMAC的管理法口采用AXI4-Lite标准接口&#xff0c;TEMAC核的AX14-Lite接口信号如表1所示&#xff0c;根据AX14-Lite标准&#xff0c;接口角色分为主接口(Maser Interface)和从接口(Slave Interface)。主接口为通…

Ubuntu24安装搜狗输入法,修复闪屏问题

下载deb安装包&#xff1a;搜狗输入法linux-首页 安装&#xff1a;sudo dpkg -i 1.deb 搜狗输入法linux-安装指导 重启&#xff0c;但是完成后闪烁。按以下步骤更改桌面配置。 sudo gedit /etc/gdm3/custom.conf 取消WaylandEnable的注释即可

算法详解——回溯法

一、回溯法概述——问题背景 回溯法是一种解决约束满足问题的方法&#xff0c;特别适用于解决组合问题、搜索优化问题等。它通过逐步构建候选解决方案并且在这个解决方案不再可能满足约束或条件时进行剪枝和回溯。具体来说&#xff0c;回溯法可以应用于以下类型的问题&#xff…

怎么做自己的网站

现如今&#xff0c;拥有自己的网站已经成为现代生活中的一种标志。无论是个人博客、在线商店还是企业官网&#xff0c;都可以通过拥有一个网站来展示自己的个性、产品或服务。在这篇文章中&#xff0c;我将分享如何创建和管理自己的网站。 首先&#xff0c;你需要选择一个合适的…

Ubuntu22.04下安装kafka_2.11-0.10.1.0并运行简单实例

目录 一、版本信息 二、安装Kafka 1. 将Kafka安装包移到下载目录中 2. 安装Kafka并确保hadoop用户对Kafka目录有操作权限 三、启动Kafka并测试Kafka是否正常工作 1. 启动Kafka 2. 测试Kafka是否正常工作 一、版本信息 虚拟机产品&#xff1a;VMware Workstation 17 Pro…

【AI+老照片焕新】母亲节用AI把时间的印记变成暖心礼物

想念是一张泛黄的照片&#xff0c;藏在抽屉里的笑容&#xff0c;总是那么亲切。今天是母亲节&#xff0c;是不是想给妈妈来点不一样的惊喜&#xff1f;用AI技术&#xff0c;把那些老照片瞬间焕新&#xff0c;让妈妈的青春记忆重放光华&#xff01; 想象一下&#xff0c;妈妈年…

社交媒体数据恢复:脉脉

在使用社交软件脉脉的过程中&#xff0c;可能会遇到数据丢失的情况&#xff0c;如误删了重要信息或者更换手机后数据未能同步等问题。那么如何恢复脉脉中的数据呢&#xff1f;本文将为您提供详细的步骤指导。 注意&#xff1a;以下操作需要在脉脉账户登录状态下进行。 登录脉…

具有CMOS输出,高速响应特点的新型汽车级晶振SG2520CAA

爱普生推出的汽车级晶振SG2520CAA。SG2520CAA是一款CMOS输出的&#xff0c;具有高响应速度的2520封装汽车级晶振&#xff0c;具有低电流消耗&#xff0c;1.6 V至3.63 V的宽工作电压&#xff0c;以及-40C至85C的宽工作温度范围&#xff0c;此外还可提供高达125C的工作温度。符合…

C++Linux系统编程——makefile

Makefile Makefile简介 一个工程中的源文件不计其数&#xff0c;其按类型、功能、模块分别放在若干个目录中&#xff0c;makefile定义了一系列的规则来指定&#xff0c;哪些文件需要先编译&#xff0c;哪些文件需要后编译&#xff0c;哪些文件需要重新编译&#xff0c;甚至于…

SSH隧道可以做什么?

SSH隧道是SSH协议服务端提供的一种扩展功能&#xff0c;一般仅在linux服务器的SSH服务端中提供&#xff0c;其它的如交换机、防火墙等网络设备中&#xff0c;虽然支持SSH协议&#xff0c;但多数并不提供SSH隧道功能。 所以&#xff0c;在通过SSH协议连接远程设备时&#xff0c…