【Zookeeper专题】Zookeeper经典应用场景实战(一)

目录

  • 前置知识
  • 课程内容
    • 一、Zookeeper Java客户端实战
      • 1.1 Zookeeper 原生Java客户端使用
      • 1.2 Curator开源客户端使用
        • 快速开始
        • 使用示例
    • 二、Zookeeper在分布式命名服务中的实战
      • 2.1 分布式API目录
      • 2.2 分布式节点的命名
      • 2.3 分布式的ID生成器
    • 三、zookeeper实现分布式队列
      • 3.1 设计思路
      • 3.2 使用Apache Curator实现分布式队列
  • 学习总结
  • 感谢

前置知识

在学习本节课之前,至少需要掌握Zookeeper的节点特性,以及基本操作。
《【Zookeeper专题】Zookeeper特性与节点数据类型详解》

课程内容

一、Zookeeper Java客户端实战

Zookeeper的客户端有很多,这边主要介绍的是两种:

  1. Zookeeper官方的Java客户端API
  2. 第三方的Java客户端API,Curator

ZooKeeper官方的客户端API提供了基本的操作,例如:创建会话、增删查改节点等(就是对原有命令交互式客户端的封装)。不过,Zookeeper官方客户端封装度比较低,使用起来不是很方便。这种不方便体现在:

  • ZooKeeper的Watcher监测是一次性的,每次触发之后都需要重新进行注册
  • 会话超时之后没有实现重连机制
  • 异常处理烦琐,ZooKeeper提供了很多异常,对于开发人员来说可能根本不知道应该如何处理这些抛出的异常
  • 仅提供了简单的byte[]数组类型的接口,没有提供Java POJO级别的序列化数据处理接口
  • 创建节点时如果抛出异常,需要自行检查节点是否存在
  • 无法实现级联删除

当然不便之处不止这些,不管怎样,在实际开发中,我们通常不是很建议使用官方API的。

1.1 Zookeeper 原生Java客户端使用

使用前,先引入客户端的依赖:

<!-- zookeeper client -->
<dependency><groupId>org.apache.zookeeper</groupId><artifactId>zookeeper</artifactId><version>3.8.0</version>
</dependency>

然后是代码示例:

public class ZkClientDemo {private static final  String  CONNECT_STR="localhost:2181";private final static  String CLUSTER_CONNECT_STR="192.168.65.156:2181,192.168.65.190:2181,192.168.65.200:2181";public static void main(String[] args) throws Exception {final CountDownLatch countDownLatch=new CountDownLatch(1);ZooKeeper zooKeeper = new ZooKeeper(CLUSTER_CONNECT_STR,4000, new Watcher() {@Overridepublic void process(WatchedEvent event) {if(Event.KeeperState.SyncConnected==event.getState() && event.getType()== Event.EventType.None){//如果收到了服务端的响应事件,连接成功countDownLatch.countDown();System.out.println("连接建立");}}});System.out.printf("连接中");countDownLatch.await();//CONNECTEDSystem.out.println(zooKeeper.getState());//创建持久节点zooKeeper.create("/user","fox".getBytes(),ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);}
}

客户端主要的API有:

create(path, data, acl,createMode):创建一个给定路径的 znode,并在 znode 保存 data[]的数据,createMode指定 znode 的类型。
delete(path, version):如果给定 path上的znode的版本和给定的version匹配,删除znode。
exists(path, watch):判断给定 path 上的 znode 是否存在,并在 znode 设置一个 watch。
getData(path, watch):返回给定 path 上的 znode 数据,并在 znode 设置一个 watch。
setData(path, data, version):如果给定 path 上的 znode 的版本和给定的 version 匹配,设置 znode 数据。
getChildren(path, watch):返回给定 path 上的 znode 的孩子 znode 名字,并在 znode 设置一个 watch。
sync(path):把客户端 session 连接节点和 leader 节点进行同步

以上这些API主要的特点如下:

  1. 所有获取 znode 数据的 API 都可以设置一个 watch 用来监控 znode 的变化
  2. 所有更新 znode 数据的 API 都有两个版本,即:无条件更新版本和条件更新版本。如果 version 为 -1,更新为无条件更新。否则只有给定的 version 和 znode 当前的 version 一样,才会进行更新,这样的更新是条件更新。
  3. 所有的方法都有同步和异步两个版本。同步版本的方法发送请求给 ZooKeeper 并等待服务器的响应。异步版本把请求放入客户端的请求队列,然后马上返回。异步版本通过 callback 来接受来自服务端的响应(不过ZK有一点不好的是,对于同步异步方法没有在方法名上显示注明sync/async,而是体现在请求参数callback上)

例如,这边简单演示一下同步跟异步创建节点方法。

// 同步创建,并且返回创建节点的路径信息
@Test
public void createTest() throws KeeperException, InterruptedException {String path = zooKeeper.create(ZK_NODE, "data".getBytes(), ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);log.info("created path: {}",path);
}// 异步创建
// 看最后一个lambda表达式
@Test
public void createAsycTest() throws InterruptedException {zooKeeper.create(ZK_NODE, "data".getBytes(), ZooDefs.Ids.OPEN_ACL_UNSAFE,CreateMode.PERSISTENT,(rc, path, ctx, name) -> log.info("rc  {},path {},ctx {},name {}",rc,path,ctx,name),"context");TimeUnit.SECONDS.sleep(Integer.MAX_VALUE);
}

其余API这里就不演示了,大家伙感兴趣的可以回头去试试。

1.2 Curator开源客户端使用

Curator是Netflix公司开源的一套ZooKeeper客户端框架,和ZkClient一样它解决了非常底层的细节开发工作,包括连接、重连、反复注册Watcher的问题以及NodeExistsException异常等。Curator还为ZooKeeper客户端框架提供了一些比较普遍的、开箱即用的、分布式开发用的解决方案,例如Recipe、共享锁服务、Master选举机制和分布式计算器等,帮助开发者避免了“重复造轮子”的无效开发工作。

快速开始

引入maven依赖
Curator的使用包含了几个包:

  • curator-framework是对ZooKeeper的底层API的一些封装
  • curator-client提供了一些客户端的操作,例如重试策略等
  • curator-recipes封装了一些高级特性,如:Cache事件监听、选举、分布式锁、分布式计数器、分布式Barrier等
<!-- zookeeper client -->
<dependency><groupId>org.apache.zookeeper</groupId><artifactId>zookeeper</artifactId><version>3.8.0</version>
</dependency><!--curator-->
<dependency><groupId>org.apache.curator</groupId><artifactId>curator-recipes</artifactId><version>5.1.0</version><exclusions><exclusion><groupId>org.apache.zookeeper</groupId><artifactId>zookeeper</artifactId></exclusion></exclusions>
</dependency>

创建一个客户端
在使用curator-framework包操作ZooKeeper前,首先要创建一个客户端实例。这是一个CuratorFramework类型的对象,有两种方法:

  1. 使用工厂类CuratorFrameworkFactory的静态newClient()方法
// 重试策略 
RetryPolicy retryPolicy = new ExponentialBackoffRetry(1000, 3)
//创建客户端实例
CuratorFramework client = CuratorFrameworkFactory.newClient(zookeeperConnectionString, retryPolicy);
//启动客户端
client.start();
  1. 使用工厂类CuratorFrameworkFactory的静态builder构造者方法
//随着重试次数增加重试时间间隔变大,指数倍增长baseSleepTimeMs * Math.max(1, random.nextInt(1 << (retryCount + 1)))
RetryPolicy retryPolicy = new ExponentialBackoffRetry(1000, 3);CuratorFramework client = CuratorFrameworkFactory.builder().connectString("192.168.128.129:2181").sessionTimeoutMs(5000)  // 会话超时时间.connectionTimeoutMs(5000) // 连接超时时间.retryPolicy(retryPolicy).namespace("base") // 包含隔离名称.build();
client.start();

buidler调用链函数说明:

  • connectionString:服务器地址列表,在指定服务器地址列表的时候可以是一个地址,也可以是多个地址。如果是多个地址,那么每个服务器地址列表用逗号分隔, 如 host1:port1,host2:port2,host3;port3
  • retryPolicy:重试策略,当客户端异常退出或者与服务端失去连接的时候,可以通过设置客户端重新连接 ZooKeeper 服务端。而 Curator 提供了 一次重试、多次重试等不同种类的实现方式。在 Curator内部,可以通过判断服务器返回的 keeperException 的状态代码来判断是否进行重试处理,如果返回的是 OK 表示一切操作都没有问题,而 SYSTEMERROR 表示系统或服务端错误
    在这里插入图片描述
  • 超时时间:Curator 客户端创建过程中,有两个超时时间的设置。一个是 sessionTimeoutMs 会话超时时间,用来设置该条会话在 ZooKeeper 服务端的失效时间。另一个是 connectionTimeoutMs 客户端创建会话的超时时间,用来限制客户端发起一个会话连接到接收 ZooKeeper 服务端应答的时间。sessionTimeoutMs 作用在服务端,而 connectionTimeoutMs 作用在客户端。
使用示例

创建节点
创建节点的方式如下面的代码所示,回顾我们之前课程中讲到的内容,描述一个节点要包括节点的类型,即临时节点还是持久节点、节点的数据信息、节点是否是有序节点等属性和性质。

 @Test
public void testCreate() throws Exception {String path = curatorFramework.create().forPath("/curator-node");curatorFramework.create().withMode(CreateMode.PERSISTENT).forPath("/curator-node","some-data".getBytes())log.info("curator create node :{}  successfully.",path);
}

在 Curator 中,可以使用 create 函数创建数据节点,并通过 withMode 函数指定节点类型(持久化节点,临时节点,顺序节点,临时顺序节点,持久化顺序节点等),默认是持久化节点,之后调用 forPath函数来指定节点的路径和数据信息。

一次性创建带层级结构的节点

@Test
public void testCreateWithParent() throws Exception {String pathWithParent="/node-parent/sub-node-1";String path = curatorFramework.create().creatingParentsIfNeeded().forPath(pathWithParent);log.info("curator create node :{}  successfully.",path);
}

获取数据

@Test
public void testGetData() throws Exception {byte[] bytes = curatorFramework.getData().forPath("/curator-node");log.info("get data from  node :{}  successfully.",new String(bytes));
}

更新数据
我们通过客户端实例的 setData() 方法更新 ZooKeeper 服务上的数据节点,在setData 方法的后边,通过 forPath 函数来指定更新的数据节点路径以及要更新的数据。

@Test
public void testSetData() throws Exception {curatorFramework.setData().forPath("/curator-node","changed!".getBytes());byte[] bytes = curatorFramework.setData().forPath("/curator-node");log.info("get data from  node /curator-node :{}  successfully.",new String(bytes));
}

删除节点

@Test
public void testDelete() throws Exception {String pathWithParent="/node-parent";curatorFramework.delete().guaranteed().deletingChildrenIfNeeded().forPath(pathWithParent);
}

guaranteed:该函数的功能如字面意思一样,主要起到一个保障删除成功的作用,其底层工作方式是:只要该客户端的会话有效,就会在后台持续发起删除请求,直到该数据节点在 ZooKeeper 服务端被删除。
deletingChildrenIfNeeded:指定了该函数后,系统在删除该数据节点的时候会以递归的方式直接删除其子节点,以及子节点的子节点。

异步接口
Curator 引入了BackgroundCallback 接口,用来处理服务器端返回来的信息,这个处理过程是在异步线程中调用,默认在 EventThread 中调用,也可以自定义线程池。

public interface BackgroundCallback
{/*** Called when the async background operation completes** @param client the client* @param event operation result details* @throws Exception errors*/public void processResult(CuratorFramework client, CuratorEvent event) throws Exception;
}

如上接口,主要参数为 client 客户端,和服务端事件 event。inBackground异步处理默认在EventThread中执行

@Test
public void test() throws Exception {curatorFramework.getData().inBackground((item1, item2) -> {log.info(" background: {}", item2);}).forPath(ZK_NODE);TimeUnit.SECONDS.sleep(Integer.MAX_VALUE);
}

或者使用自定义线程池:

@Test
public void test() throws Exception {ExecutorService executorService = Executors.newSingleThreadExecutor();curatorFramework.getData().inBackground((item1, item2) -> {log.info(" background: {}", item2);},executorService).forPath(ZK_NODE);TimeUnit.SECONDS.sleep(Integer.MAX_VALUE);
}

Curator 监听器
我们知道,ZK的一大特色便是他们的监听机制。Curator在监听方面,相比于原生的客户端,Curator将重复注册、事件信息等进行了高度封装,让用户做到开箱即用。并且在监听事件返回了详细的信息,包括变动的节点路径,节点值等等,这是原生API所没有的。
Curator 引入了 Cache 来实现对 Zookeeper 服务端事件监听,Cache 事件监听可以理解为一个本地缓存视图与远程 Zookeeper 视图的对比过程。

官方推荐的节点监听API有:

  • NodeCache(已过期):对某一个节点进行监听,监听事件包括指定路径的增删改等操作
@Slf4j
public class NodeCacheTest extends AbstractCuratorTest{public static final String NODE_CACHE="/node-cache";@Testpublic void testNodeCacheTest() throws Exception {createIfNeed(NODE_CACHE);NodeCache nodeCache = new NodeCache(curatorFramework, NODE_CACHE);nodeCache.getListenable().addListener(new NodeCacheListener() {@Overridepublic void nodeChanged() throws Exception {log.info("{} path nodeChanged: ",NODE_CACHE);printNodeData();}});nodeCache.start();}public void printNodeData() throws Exception {byte[] bytes = curatorFramework.getData().forPath(NODE_CACHE);log.info("data: {}",new String(bytes));}
}
  • PathChildrenCache(已过期):对指定路径节点的一级子目录监听,不对该节点的操作监听。换句话说就是对其子目录的增删改操作监听
@Slf4j
public class PathCacheTest extends AbstractCuratorTest{public static final String PATH="/path-cache";@Testpublic void testPathCache() throws Exception {createIfNeed(PATH);PathChildrenCache pathChildrenCache = new PathChildrenCache(curatorFramework, PATH, true);pathChildrenCache.getListenable().addListener(new PathChildrenCacheListener() {@Overridepublic void childEvent(CuratorFramework client, PathChildrenCacheEvent event) throws Exception {log.info("event:  {}",event);}});// 如果设置为true则在首次启动时就会缓存节点内容到Cache中pathChildrenCache.start(true);}
}
  • TreeCache(已过期):综合NodeCache和PathChildrenCahce的特性,是对整个目录进行监听,可以设置监听深度
public class TreeCacheTest extends AbstractCuratorTest{public static final String TREE_CACHE="/tree-path";@Testpublic void testTreeCache() throws Exception {createIfNeed(TREE_CACHE);TreeCache treeCache = new TreeCache(curatorFramework, TREE_CACHE);treeCache.getListenable().addListener(new TreeCacheListener() {@Overridepublic void childEvent(CuratorFramework client, TreeCacheEvent event) throws Exception {log.info(" tree cache: {}",event);}});treeCache.start();}
}
  • CuratorCache:上面的几个节点缓存API其实已经过期了,最近的版本开始使用CuratorCache单个接口来替代它们,在使用上也更为简单。我们来小小的看一下该类的创建api
    在这里插入图片描述
    如上所示,构建节点缓存的build()方法提供了一个可选的参数optionsOptions是一个内部枚举类型,如果不指定,默认是缓存【给定节点开始的整个节点树】。
    下面是一个简单的使用示例:
package org.tuling.zk.curator;import org.apache.curator.framework.CuratorFramework;
import org.apache.curator.framework.CuratorFrameworkFactory;
import org.apache.curator.framework.imps.CuratorFrameworkState;
import org.apache.curator.framework.recipes.cache.CuratorCache;
import org.apache.curator.framework.recipes.cache.CuratorCacheListener;
import org.apache.curator.retry.ExponentialBackoffRetry;
import org.apache.zookeeper.CreateMode;public class TestCuratorCache {private final static String CLUSTER_CONNECT_STR="114.132.46.145:2181";public static void main(String[] args) throws Exception {//构建客户端实例CuratorFramework curator= CuratorFrameworkFactory.builder().connectString(CLUSTER_CONNECT_STR).retryPolicy(new ExponentialBackoffRetry(1000,3)) // 设置重试策略.build();//启动客户端curator.start();assert curator.getState().equals(CuratorFrameworkState.STARTED);curator.blockUntilConnected();if(curator.checkExists().forPath("/father") != null) {curator.delete().deletingChildrenIfNeeded().forPath("/father");}// 创建CuratorCache实例,基于路径/father/son/grandson1(这里说的路径都是基于命名空间下的路径)// 缓存构建选项是SINGLE_NODE_CACHECuratorCache cache = CuratorCache.build(curator, "/father/son/grandson1",CuratorCache.Options.SINGLE_NODE_CACHE);// 创建一系列CuratorCache监听器,都是通过lambda表达式指定CuratorCacheListener listener = CuratorCacheListener.builder()// 初始化完成时调用.forInitialized(() -> System.out.println("[forInitialized] : Cache initialized"))// 添加或更改缓存中的数据时调用.forCreatesAndChanges((oldNode, node) -> System.out.printf("[forCreatesAndChanges] : Node changed: Old: [%s] New: [%s]\n",oldNode, node))// 添加缓存中的数据时调用.forCreates(childData -> System.out.printf("[forCreates] : Node created: [%s]\n", childData))// 更改缓存中的数据时调用.forChanges((oldNode, node) -> System.out.printf("[forChanges] : Node changed: Old: [%s] New: [%s]\n",oldNode, node))// 删除缓存中的数据时调用.forDeletes(childData -> System.out.printf("[forDeletes] : Node deleted: data: [%s]\n", childData))// 添加、更改或删除缓存中的数据时调用.forAll((type, oldData, data) -> System.out.printf("[forAll] : type: [%s] [%s] [%s]\n", type, oldData, data)).build();// 给CuratorCache实例添加监听器cache.listenable().addListener(listener);// 启动CuratorCachecache.start();// 创建节点/father/son/grandson1curator.create().creatingParentsIfNeeded().withMode(CreateMode.PERSISTENT).forPath("/father/son/grandson1", "data".getBytes());// 创建节点/father/son/grandson1/testcurator.create().creatingParentsIfNeeded().withMode(CreateMode.PERSISTENT).forPath("/father/son/grandson1/test", "test".getBytes());// 创建节点/father/son/grandson1/test/test2curator.create().creatingParentsIfNeeded().withMode(CreateMode.PERSISTENT).forPath("/father/son/grandson1/test/test2", "test2".getBytes());// 更改节点/father/son/grandson1的数据curator.setData().forPath("/father/son/grandson1", "new data".getBytes());// 更改节点/father/son/grandson1/test的数据curator.setData().forPath("/father/son/grandson1/test", "new test".getBytes());// 删除节点/father/son/grandson1curator.delete().deletingChildrenIfNeeded().forPath("/father/son/grandson1");Thread.sleep(10000000);}
}

二、Zookeeper在分布式命名服务中的实战

所谓命名服务,其实就是为系统中的资源提供标识能力,被命名的服务比如是:集群中的某个机器,提供服务的地址或者远程对象。ZooKeeper的命名服务主要是利用ZooKeeper节点的树形分层结构和子节点的顺序维护能力,来为分布式系统中的资源命名。典型的,用到了分布式命名服务的场景有:

  • 分布式API目录
  • 分布式节点命名
  • 分布式ID生成器

2.1 分布式API目录

分布式API目录,即:为分布式系统中各种API接口服务的名称、链接地址,提供类似JNDI(Java命名和目录接口)中的文件系统的功能。借助于ZooKeeper的树形分层结构就能提供分布式的API调用功能。
在Dubbo中,就是使用了当前方式。使用ZooKeeper维护的全局服务接口API的地址列表。大致的思路为:

  • 服务提供者(Service Provider)在启动的时候,向ZooKeeper上的指定节点/dubbo/${serviceName}/providers写入自己的API地址,这个操作就相当于服务的公开。
  • 服务消费者(Consumer)启动的时候,订阅节点/dubbo/{serviceName}/providers下的服务提供者的URL地址,获得所有服务提供者的API

大概的模型图如下:
在这里插入图片描述

2.2 分布式节点的命名

一个分布式系统通常会由很多的节点组成,节点的数量不是固定的,而是不断动态变化的。比如说,当业务不断膨胀和流量洪峰到来时,大量的节点可能会动态加入到集群中。而一旦流量洪峰过去了,就需要下线大量的节点。再比如说,由于机器或者网络的原因,一些节点会主动离开集群。
如何为大量的动态节点命名呢?一种简单的办法是可以通过配置文件,手动为每一个节点命名。但是,如果节点数据量太大,或者说变动频繁,手动命名则是不现实的,这就需要用到分布式节点的命名服务。
可用于生成集群节点的编号的方案:

  1. 使用数据库的自增ID特性,用数据表存储机器的MAC地址或者IP来维护
  2. 使用ZooKeeper持久顺序节点的顺序特性来维护节点的NodeId编号

在第2种方案中,集群节点命名服务的基本流程是:

  • 启动节点服务,连接ZooKeeper,检查命名服务根节点是否存在,如果不存在,就创建系统的根节点
  • 在根节点下创建一个临时顺序ZNode节点,取回ZNode的编号把它作为分布式系统中节点的NODEID
  • 如果临时节点太多,可以根据需要删除临时顺序ZNode节点

2.3 分布式的ID生成器

在分布式系统中,分布式ID生成器的使用场景非常之多:

  • 大量的数据记录,需要分布式ID。
  • 大量的系统消息,需要分布式ID。
  • 大量的请求日志,如restful的操作记录,需要唯一标识,以便进行后续的用户行为分析和调用链路分析。
  • 分布式节点的命名服务,往往也需要分布式ID。
  • … …

传统的数据库自增主键已经不能满足需求。在分布式系统环境中,迫切需要一种全新的唯一ID系统,这种系统需要满足以下需求:

  1. 全局唯一:不能出现重复ID
  2. 高可用:ID生成系统是基础系统,被许多关键系统调用,一旦宕机,就会造成严重影响

市面上,分布式的ID生成器方案大致如下:

  1. Java的UUID
  2. 分布式缓存Redis生成ID:利用Redis的原子操作INCR和INCRBY,生成全局唯一的ID
  3. Twitter的SnowFlake算法(雪花算法)
  4. ZooKeeper生成ID:利用ZooKeeper的顺序节点,生成全局唯一的ID
  5. MongoDb的ObjectId:MongoDB是一个分布式的非结构化NoSQL数据库,每插入一条记录会自动生成全局唯一的一个“_id”字段值,它是一个12字节的字符串,可以作为分布式系统中全局唯一的ID

我们这里介绍一下,基于Zookeeper实现分布式ID生成器

基于Zookeeper实现分布式ID生成器
在ZooKeeper节点的四种类型中,其中有以下两种类型具备自动编号的能力:

  • PERSISTENT_SEQUENTIAL持久化顺序节点
  • EPHEMERAL_SEQUENTIAL临时顺序节点

ZooKeeper的每一个节点都会为它的第一级子节点维护一份顺序编号,会记录每个子节点创建的先后顺序,这个顺序编号是分布式同步的,也是全局唯一的。
可以通过创建ZooKeeper的临时顺序节点的方法,生成全局唯一的ID:

@Slf4j
public class IDMaker extends CuratorBaseOperations {private String createSeqNode(String pathPefix) throws Exception {CuratorFramework curatorFramework = getCuratorFramework();//创建一个临时顺序节点String destPath = curatorFramework.create().creatingParentsIfNeeded().withMode(CreateMode.EPHEMERAL_SEQUENTIAL).forPath(pathPefix);return destPath;}public String  makeId(String path) throws Exception {String str = createSeqNode(path);if(null != str){//获取末尾的序号int index = str.lastIndexOf(path);if(index>=0){index+=path.length();return index<=str.length() ? str.substring(index):"";}}return str;}
}@Test
public void testMarkId() throws Exception {IDMaker idMaker = new IDMaker();idMaker.init();String pathPrefix = "/idmarker/id-";for(int i=0;i<5;i++){new Thread(()->{for (int j=0;j<10;j++){String id = null;try {id = idMaker.makeId(pathPrefix);log.info("{}线程第{}个创建的id为{}",Thread.currentThread().getName(),j,id);} catch (Exception e) {e.printStackTrace();}}},"thread"+i).start();}Thread.sleep(Integer.MAX_VALUE);
}

测试结果如下:
在这里插入图片描述

基于Zookeeper实现SnowFlakeID算法
Twitter(推特)的SnowFlake算法是一种著名的分布式服务器用户ID生成算法。SnowFlake算法所生成的ID是一个64bit的长整型数字,这个64bit被划分成四个部分,其中后面三个部分分别表示时间戳、工作机器ID、序列号。
在这里插入图片描述
SnowFlakeID的四个部分,具体介绍如下:
1)第一位:占用1 bit,其值始终是0,没有实际作用
2)时间戳:占用41 bit,精确到毫秒,总共可以容纳约69年的时间
3)工作机器id:占用10 bit,最多可以容纳1024个节点
4)序列号:占用12 bit。这个值意味着,在同一毫秒同一节点上,可以生成4096个id,这已经是相当可观了

在工作节点达到1024顶配的场景下,SnowFlake算法在同一毫秒最多可以生成的ID数量为: 1024 * 4096 =4194304,在绝大多数并发场景下都是够用的。

SnowFlake算法的优点:

  • 生成ID时不依赖于数据库,完全在内存生成,高性能和高可用性
  • 容量大,每秒可生成几百万个ID
  • ID呈趋势递增,后续插入数据库的索引树时,性能较高

SnowFlake算法的缺点:

  • 依赖于系统时钟的一致性,如果某台机器的系统时钟回拨了,有可能造成ID冲突,或者ID乱序
  • 在启动之前,如果这台机器的系统时间回拨过,那么有可能出现ID重复的危险

基于ZK实现雪花算法的代码示例如下:(体现在第三部分机器id上)

public class SnowflakeIdGenerator {/*** 单例*/public static SnowflakeIdGenerator instance =new SnowflakeIdGenerator();/*** 初始化单例** @param workerId 节点Id,最大8091* @return the 单例*/public synchronized void init(long workerId) {if (workerId > MAX_WORKER_ID) {// zk分配的workerId过大throw new IllegalArgumentException("woker Id wrong: " + workerId);}instance.workerId = workerId;}private SnowflakeIdGenerator() {}/*** 开始使用该算法的时间为: 2017-01-01 00:00:00*/private static final long START_TIME = 1483200000000L;/*** worker id 的bit数,最多支持8192个节点*/private static final int WORKER_ID_BITS = 13;/*** 序列号,支持单节点最高每毫秒的最大ID数1024*/private final static int SEQUENCE_BITS = 10;/*** 最大的 worker id ,8091* -1 的补码(二进制全1)右移13位, 然后取反*/private final static long MAX_WORKER_ID = ~(-1L << WORKER_ID_BITS);/*** 最大的序列号,1023* -1 的补码(二进制全1)右移10位, 然后取反*/private final static long MAX_SEQUENCE = ~(-1L << SEQUENCE_BITS);/*** worker 节点编号的移位*/private final static long WORKER_ID_SHIFT = SEQUENCE_BITS;/*** 时间戳的移位*/private final static long TIMESTAMP_LEFT_SHIFT = WORKER_ID_BITS + SEQUENCE_BITS;/*** 该项目的worker 节点 id*/private long workerId;/*** 上次生成ID的时间戳*/private long lastTimestamp = -1L;/*** 当前毫秒生成的序列*/private long sequence = 0L;/*** Next id long.** @return the nextId*/public Long nextId() {return generateId();}/*** 生成唯一id的具体实现*/private synchronized long generateId() {long current = System.currentTimeMillis();if (current < lastTimestamp) {// 如果当前时间小于上一次ID生成的时间戳,说明系统时钟回退过,出现问题返回-1return -1;}if (current == lastTimestamp) {// 如果当前生成id的时间还是上次的时间,那么对sequence序列号进行+1sequence = (sequence + 1) & MAX_SEQUENCE;if (sequence == MAX_SEQUENCE) {// 当前毫秒生成的序列数已经大于最大值,那么阻塞到下一个毫秒再获取新的时间戳current = this.nextMs(lastTimestamp);}} else {// 当前的时间戳已经是下一个毫秒sequence = 0L;}// 更新上次生成id的时间戳lastTimestamp = current;// 进行移位操作生成int64的唯一ID//时间戳右移动23位long time = (current - START_TIME) << TIMESTAMP_LEFT_SHIFT;//workerId 右移动10位long workerId = this.workerId << WORKER_ID_SHIFT;return time | workerId | sequence;}/*** 阻塞到下一个毫秒*/private long nextMs(long timeStamp) {long current = System.currentTimeMillis();while (current <= timeStamp) {current = System.currentTimeMillis();}return current;}
}

三、zookeeper实现分布式队列

常见的消息队列有:RabbitMQ,RocketMQ,Kafka等。Zookeeper作为一个分布式的小文件管理系统,同样能实现简单的队列功能。但是Zookeeper不适合大数据量存储,官方并不推荐作为队列使用,但由于实现简单,集群搭建较为便利,因此在一些吞吐量不高的小型系统中还是比较好用的。

3.1 设计思路

在这里插入图片描述

  1. 创建队列根节点:在Zookeeper中创建一个持久节点,用作队列的根节点。所有队列元素的节点将放在这个根节点下
  2. 实现入队操作:当需要将一个元素添加到队列时,可以在队列的根节点下创建一个临时有序节点。节点的数据可以包含队列元素的信息
  3. 实现出队操作:当需要从队列中取出一个元素时,可以执行以下操作:
    • 获取根节点下的所有子节点
    • 找到具有最小序号的子节点
    • 获取该节点的数据
    • 删除该节点
    • 返回节点的数据

代码示例如下:

/*** 入队* @param data* @throws Exception*/
public void enqueue(String data) throws Exception {// 创建临时有序子节点zk.create(QUEUE_ROOT + "/queue-", data.getBytes(StandardCharsets.UTF_8),ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL_SEQUENTIAL);
}/*** 出队* @return* @throws Exception*/
public String dequeue() throws Exception {while (true) {List<String> children = zk.getChildren(QUEUE_ROOT, false);if (children.isEmpty()) {return null;}Collections.sort(children);for (String child : children) {String childPath = QUEUE_ROOT + "/" + child;try {byte[] data = zk.getData(childPath, false, null);zk.delete(childPath, -1);return new String(data, StandardCharsets.UTF_8);} catch (KeeperException.NoNodeException e) {// 节点已被其他消费者删除,尝试下一个节点}}}
}

3.2 使用Apache Curator实现分布式队列

Apache Curator是一个ZooKeeper客户端的封装库,提供了许多高级功能,包括分布式队列。

public class CuratorDistributedQueueDemo {private static final String QUEUE_ROOT = "/curator_distributed_queue";public static void main(String[] args) throws Exception {CuratorFramework client = CuratorFrameworkFactory.newClient("localhost:2181",new ExponentialBackoffRetry(1000, 3));client.start();// 定义队列序列化和反序列化QueueSerializer<String> serializer = new QueueSerializer<String>() {@Overridepublic byte[] serialize(String item) {return item.getBytes();}@Overridepublic String deserialize(byte[] bytes) {return new String(bytes);}};// 定义队列消费者QueueConsumer<String> consumer = new QueueConsumer<String>() {@Overridepublic void consumeMessage(String message) throws Exception {System.out.println("消费消息: " + message);}@Overridepublic void stateChanged(CuratorFramework curatorFramework, ConnectionState connectionState) {}};// 创建分布式队列DistributedQueue<String> queue = QueueBuilder.builder(client, consumer, serializer, QUEUE_ROOT).buildQueue();queue.start();// 生产消息for (int i = 0; i < 5; i++) {String message = "Task-" + i;System.out.println("生产消息: " + message);queue.put(message);Thread.sleep(1000);}Thread.sleep(10000);queue.close();client.close();}
}

学习总结

  1. 学习了zookeeper客户端Curator的使用

感谢

感谢【51CTO博客】大佬【作者:ITKaven】的文章。《ZooKeeper : Curator框架之数据缓存与监听CuratorCache》

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.rhkb.cn/news/151157.html

如若内容造成侵权/违法违规/事实不符,请联系长河编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

一些常见分布-正态分布、对数正态分布、伽马分布、卡方分布、t分布、F分布等

目录 正态分布 对数正态分布 伽马分布 伽马函数 贝塔函数 伽马分布 卡方分布 F分布 t分布 附录 参考文献 本文主要介绍一些常见的分布&#xff0c;包括正态分布、对数正态分布、伽马分布、卡方分布、F分布、t分布。给出了分布的定义&#xff0c;推导了概率密度函数&…

中国34省区市三维地形图(直接保存)

吉林 ▼ 辽宁 ▼ 北京 ▼ 河北 ▼ 山东 ▼ 山西 ▼ 天津 ▼ 江苏 ▼ 福建 ▼ 上海 ▼ 台湾 ▼ 浙江 ▼ 广东 ▼ 广西 ▼ 海南 ▼ 香港和澳门 ▼ 安徽 ▼ 河南 ▼ 湖北 ▼ 湖南 ▼ 江西 ▼ 甘肃 ▼ 内蒙古 ▼ 宁夏 ▼ 青海 ▼ 陕西 ▼ 新疆 ▼ 贵州 …

力扣 -- 1745. 分割回文串 IV

解题步骤&#xff1a; 参考代码&#xff1a; class Solution { public:bool checkPartitioning(string s) {int ns.size();vector<vector<bool>> dp(n,vector<bool>(n));for(int in-1;i>0;i--){for(int ji;j<n;j){if(s[i]s[j]){dp[i][j]i1<j?dp[i…

One Thread One Loop主从Reactor模型⾼并发服务器

One Thread One Loop主从Reactor模型⾼并发服务器 文章目录 One Thread One Loop主从Reactor模型⾼并发服务器一些补充HTTP服务器Reactor 模型eventfd通用类Any 目标功能模块划分&#xff1a;SERVER模块Buffer模块&#xff1a;编写思路&#xff1a;接口设计&#xff1a;具体实现…

【云计算网络安全】DDoS 缓解解析:DDoS 攻击缓解策略、选择最佳提供商和关键考虑因素

文章目录 一、前言二、什么是 DDoS 缓解三、DDoS 缓解阶段四、如何选择 DDoS 缓解提供商4.1 网络容量4.2 处理能力4.3 可扩展性4.4 灵活性4.5 可靠性4.6 其他考虑因素4.6.1 定价4.6.2 所专注的方向 文末送书《数据要素安全流通》本书编撰背景本书亮点本书主要内容 一、前言 云…

蓝桥杯每日一题2023.9.30

蓝桥杯大赛历届真题 - C&C 大学 B 组 - 蓝桥云课 (lanqiao.cn) 题目描述 题目分析 对于此题&#xff0c;首先想到了dfs进行一一找寻&#xff0c;注意每次不要将重复的算进去&#xff0c;故我们每次循环可以记录一个开始的位置&#xff0c;下一次到这个位置时&#xff0c;…

玩转Linux—如何在Linux环境中部署MySQL、Redis和nginx

1、Linux常用命令 Linux学习之路&#xff1a; VMware虚拟机安装Linux系统(详解版) 查看当前文件目录&#xff1a;ls查看目录中文件详细信息&#xff1a;ll输出当前所处的目文件目录&#xff1a;pwdLinux查看当前IP地址&#xff1a;ifconfigWindows查看当前IP地址&#xff1…

【Docker】 docker中apt-get update过慢,这样配置瞬间提速!

docker中apt-get update过慢&#xff0c;这样配置瞬间提速&#xff01; 源官网全球镜像站 源 今天办公地点的网络出奇的差&#xff0c;看电影看小说打游戏完全没影响&#xff0c;只要更新就蜗速前进&#xff0c;只能从网上翻下&#xff0c;看看有没有网速快的下载源。 碰巧看到…

山西电力市场日前价格预测【2023-10-09】

日前价格预测 预测说明&#xff1a; 如上图所示&#xff0c;预测明日&#xff08;2023-10-09&#xff09;山西电力市场全天平均日前电价为575.84元/MWh。其中&#xff0c;最高日前电价为1500.00元/MWh&#xff0c;预计出现在17: 30-20: 00。最低日前电价为218.27元/MWh&#x…

RHCE---DNS服务器

文章目录 目录 文章目录 前言 一.DNS服务器 概述&#xff1a; 域名分类 域名服务器的类型划分 DNS域名解析过程 DNS服务器配置 总结 前言 前面几节文章我们了解到时间服务器和远程连接服务器&#xff0c;还通过访问域名的方式建立一个网站&#xff0c; 我们知道访问服务器需…

TDengine OSS 与 qStudio 实现无缝协同,革新数据分析和管理方式

在数字化转型如火如荼的当下&#xff0c;海量爆发的时序数据处理成为转型成功的关键因素之一。为了帮助社区用户更好地进行数据分析和管理&#xff0c;丰富可视化解决方案的多样性&#xff0c;我们将开源的时序数据库&#xff08;Time Series Database&#xff09; TDengine OS…

java版工程项目管理系统 Spring Cloud+Spring Boot+Mybatis+Vue+ElementUI+前后端分离 功能清单

Java版工程项目管理系统 Spring CloudSpring BootMybatisVueElementUI前后端分离 功能清单如下&#xff1a; 首页 工作台&#xff1a;待办工作、消息通知、预警信息&#xff0c;点击可进入相应的列表 项目进度图表&#xff1a;选择&#xff08;总体或单个&#xff09;项目显示…

Qt + FFmpeg 搭建 Windows 开发环境

Qt FFmpeg 搭建 Windows 开发环境 Qt FFmpeg 搭建 Windows 开发环境安装 Qt Creator下载 FFmpeg 编译包测试 Qt FFmpeg踩坑解决方法1&#xff1a;换一个 FFmpeg 库解决方法2&#xff1a;把项目改成 64 位 后记 官方博客&#xff1a;https://www.yafeilinux.com/ Qt开源社区…

javaee ssm框架项目整合thymeleaf 项目结构图

搭建ssm框架项目 参考这篇博客 引入thymeleaf 引入jar包 <?xml version"1.0" encoding"UTF-8"?><project xmlns"http://maven.apache.org/POM/4.0.0" xmlns:xsi"http://www.w3.org/2001/XMLSchema-instance"xsi:schema…

sheng的学习笔记-【中文】【吴恩达课后测验】Course 1 - 神经网络和深度学习 - 第二周测验

课程1_第2周_测验题 目录&#xff1a;目录 第一题 1.神经元计算什么&#xff1f; A. 【  】神经元计算激活函数后&#xff0c;再计算线性函数&#xff08;zWxb&#xff09; B. 【  】神经元计算一个线性函数&#xff08;zWxb&#xff09;&#xff0c;然后接一个激活函数…

罗彻斯特大学探讨ChatGPT等人工智能将如何影响高等教育

人工智能聊天机器人ChatGPT持续引起互联网用户的热议&#xff0c;它能够回答关于各个领域的问题&#xff0c;创作歌曲、食谱&#xff0c;起草电子邮件等等。罗切斯特的教职员工和管理人员就他们如何处理 ChatGPT 以及它如何影响未来的教学和学习提出了他们的想法。 “让这项技…

文件智能管理将文件统一保存在某个指定文件夹中

日常工作中经常会整理文件到指定的文件夹&#xff0c;少的时候用鼠标拖拖&#xff0c;多了就很麻烦了&#xff0c;手动操作很容易出现漏洞&#xff0c;会漏个某文件没有移动进去或出现重复移动同一个文件等&#xff0c;移动文件这种工作很枯燥可以交给文件批量改名高手软件&…

Spark基础

一、spark基础 1、为什么使用Spark Ⅰ、MapReduce编程模型的局限性 (1) 繁杂 只有Map和Reduce两个操作&#xff0c;复杂的逻辑需要大量的样板代码 (2) 处理效率低 Map中间结果写磁盘&#xff0c;Reduce写HDFS&#xff0c;多个Map通过HDFS交换数据 任务调度与启动开销大 (…

UG\NX二次开发 重命名特征对象 UF_OBJ_set_name

文章作者:里海 来源网站:《里海NX二次开发3000例专栏》 感谢粉丝订阅 感谢 林闹 订阅本专栏,非常感谢。 简介 UG\NX二次开发 重命名特征 UF_OBJ_set_name 效果 代码 #include "me.hpp" #include <vector> #include

管易云与网易互客对接集成发货单查询2.0连通编辑订单(管易包裹物流=>互客销售订单物流(修改)V1)

管易云与网易互客对接集成发货单查询2.0连通编辑订单(管易包裹物流>互客销售订单物流&#xff08;修改&#xff09;V1) 来源系统:管易云 管易云是金蝶旗下专注提供电商企业管理软件服务的子品牌&#xff0c;先后开发了C-ERP、EC-OMS、EC-WMS、E店管家、BBC、B2B、B2C商城网站…