1. Zookeeper原生客户端库存在的缺点
复杂性高 :原生客户端库提供了底层的 API,需要开发者手动处理很多细节,如连接管理、会话管理、异常处理等。这增加了开发的复杂性,容易出错。连接管理繁琐 :使用原生客户端库时,开发者需要手动管理与 ZooKeeper 的连接。这包括连接的建立、重连、会话超时处理等。异常处理复杂 :原生客户端库的 API 抛出多种异常,如 KeeperException
、InterruptedException
等。开发者需要手动处理这些异常,增加了代码的复杂性。缺少高级功能 :原生客户端库缺少一些高级功能,如连接池管理、自动重试、负载均衡等。这些功能在实际应用中非常有用,但需要开发者自己实现或使用第三方库。缺少封装和抽象 :原生客户端库提供了底层的 API,缺少更高层次的封装和抽象。开发者需要自己编写大量的代码来实现常见的功能,如分布式锁、配置管理等。性能调优困难 :原生客户端库的性能调优需要开发者手动进行,如调整连接超时时间、会话超时时间等。这需要对 ZooKeeper 的工作原理有深入的理解。缺少社区支持 :相比于一些更高级的客户端库(如 Curator),原生客户端库的社区支持相对较少。开发者在使用过程中遇到问题时,可能难以找到解决方案。
2. Apache Curator介绍
2.1 基本概述
定义 :Apache Curator是专为Apache ZooKeeper设计的Java/JVM客户端库,通过提供高级API框架及一系列实用工具,大幅降低使用ZooKeeper的复杂度并提升应用的可靠性。开发背景 :Curator最初由Netflix公司开源,目前是Apache的顶级项目。
2.2 核心功能
高可用性连接管理 :自动处理与ZooKeeper服务器的连接断开和重新连接,确保连接的稳定性和可靠性。易于使用的API :封装复杂的ZooKeeper原语,提供更直观、简洁的使用方式,降低开发难度。模式(Recipes) :预置了一系列常见的分布式计算模式,如leader选举、分布式锁、缓存机制等,开发者可以快速实现这些分布式系统经典难题。服务发现与负载均衡 :支持动态的服务注册与发现,便于构建云原生应用,提高系统的可扩展性和灵活性。异步DSL :针对Java 8及以上版本提供了异步编程的支持,提高了响应速度和程序效率。
3. 使用指南
3.1 添加 Maven 依赖
< dependency> < groupId> org.apache.curator</ groupId> < artifactId> curator-framework</ artifactId> < version> 2.12.0</ version>
</ dependency>
< dependency> < groupId> org.apache.curator</ groupId> < artifactId> curator-recipes</ artifactId> < version> 2.12.0</ version>
</ dependency>
3.2 创建 Curator 客户端
import org. apache. curator. framework. CuratorFramework ;
import org. apache. curator. framework. CuratorFrameworkFactory ;
import org. apache. curator. retry. ExponentialBackoffRetry ;
import org. apache. zookeeper. CreateMode ; public class CuratorExample { public static void main ( String [ ] args) throws Exception { String connectString = "192.168.200.138:2181" ; String path = "/curator1" ; byte [ ] data = "myData" . getBytes ( ) ; ExponentialBackoffRetry retry = new ExponentialBackoffRetry ( 5000 , 10 ) ; CuratorFramework client = CuratorFrameworkFactory . newClient ( connectString, retry) ; client. start ( ) ; client. create ( ) . withMode ( CreateMode . PERSISTENT ) . forPath ( path, data) ; byte [ ] retrievedData = client. getData ( ) . forPath ( path) ; System . out. println ( "Retrieved data: " + new String ( retrievedData) ) ; client. close ( ) ; }
}
3.3 增删改查操作及Watcher监听
import org. apache. curator. framework. CuratorFramework ;
import org. apache. curator. framework. CuratorFrameworkFactory ;
import org. apache. curator. framework. api. CuratorEvent ;
import org. apache. curator. retry. ExponentialBackoffRetry ;
import org. apache. zookeeper. CreateMode ;
import org. apache. zookeeper. WatchedEvent ;
import org. apache. zookeeper. Watcher ; public class CuratorExample { public static void main ( String [ ] args) throws Exception { String connectString = "192.168.200.138:2181" ; String path = "/curator1" ; byte [ ] data1 = "myData1" . getBytes ( ) ; byte [ ] data2 = "myData2" . getBytes ( ) ; ExponentialBackoffRetry retry = new ExponentialBackoffRetry ( 5000 , 10 ) ; CuratorFramework client = CuratorFrameworkFactory . newClient ( connectString, retry) ; client. start ( ) ; client. getCuratorListenable ( ) . addListener ( ( CuratorFramework c, CuratorEvent event) -> { switch ( event. getType ( ) ) { case WATCHED : WatchedEvent watchedEvent = event. getWatchedEvent ( ) ; if ( watchedEvent. getType ( ) == Watcher. Event. EventType. NodeDataChanged ) { System . out. println ( "监听的数据变化为: " + new String ( c. getData ( ) . forPath ( path) ) ) ; System . out. println ( "触发事件" ) ; } } } ) ; client. create ( ) . withMode ( CreateMode . PERSISTENT ) . forPath ( path, data1) ; byte [ ] retrievedData = client. getData ( ) . watched ( ) . forPath ( path) ; System . out. println ( "原始数据: " + new String ( retrievedData) ) ; client. setData ( ) . forPath ( path, data2) ; Thread . sleep ( 2000 ) ; client. delete ( ) . forPath ( path) ; Thread . sleep ( 2000 ) ; }
}
3.4 进行永久监听
import org. apache. curator. framework. CuratorFramework ;
import org. apache. curator. framework. CuratorFrameworkFactory ;
import org. apache. curator. framework. recipes. cache. ChildData ;
import org. apache. curator. framework. recipes. cache. NodeCache ;
import org. apache. curator. retry. ExponentialBackoffRetry ;
import org. apache. zookeeper. CreateMode ; public class PermanentWatcherExample { public static void main ( String [ ] args) throws Exception { String connectString = "192.168.200.138:2181" ; String path = "/curator1" ; byte [ ] data1 = "myData1" . getBytes ( ) ; byte [ ] data2 = "myData2" . getBytes ( ) ; byte [ ] data3 = "myData3" . getBytes ( ) ; ExponentialBackoffRetry retry = new ExponentialBackoffRetry ( 5000 , 10 ) ; CuratorFramework client = CuratorFrameworkFactory . newClient ( connectString, retry) ; client. start ( ) ; client. create ( ) . withMode ( CreateMode . PERSISTENT ) . forPath ( path, data1) ; NodeCache nodeCache = new NodeCache ( client, path) ; nodeCache. start ( ) ; nodeCache. getListenable ( ) . addListener ( ( ) -> { ChildData currentData = nodeCache. getCurrentData ( ) ; if ( currentData != null ) { System . out. println ( "触发了永久监听的回调,当前值为:" + new String ( currentData. getData ( ) ) ) ; } } ) ; client. setData ( ) . forPath ( path, data1) ; Thread . sleep ( 2000 ) ; client. setData ( ) . forPath ( path, data2) ; Thread . sleep ( 2000 ) ; client. setData ( ) . forPath ( path, data3) ; Thread . sleep ( 2000 ) ; client. delete ( ) . forPath ( path) ; }
}
3.5 使用分布式锁
import org. apache. curator. framework. CuratorFramework ;
import org. apache. curator. framework. CuratorFrameworkFactory ;
import org. apache. curator. framework. recipes. locks. InterProcessMutex ;
import org. apache. curator. retry. ExponentialBackoffRetry ; public class DistributedLockExample { public static void main ( String [ ] args) throws Exception { String connectString = "192.168.200.138:2181" ; String path = "/myLock" ; ExponentialBackoffRetry retry = new ExponentialBackoffRetry ( 5000 , 10 ) ; CuratorFramework client = CuratorFrameworkFactory . newClient ( connectString, retry) ; client. start ( ) ; InterProcessMutex lock = new InterProcessMutex ( client, path) ; lock. acquire ( ) ; try { System . out. println ( "Lock acquired, executing critical section..." ) ; Thread . sleep ( 2000 ) ; } finally { lock. release ( ) ; System . out. println ( "Lock released." ) ; } client. close ( ) ; }
}