一、Zookeeper概述
1.1 Zookeeper的定义
Zookeeper是一个开源的分布式协调服务,主要用于分布式应用程序中的协调管理。它由Apache软件基金会维护,是Hadoop生态系统中的重要成员。Zookeeper提供了一个高效且可靠的分布式锁服务,以及群集管理功能,在分布式系统中起到了“守护神”的作用。
1.2 Zookeeper的核心理念
Zookeeper基于以下关键概念构建:
-
数据模型:Zookeeper的数据模型是一个层次结构,这个层次类似于一个文件系统,与liunx的文件系统类似,整体可以看作为一棵树。它由节点组成,节点也成为ZNode,每个节点可以有子节点。节点可以存储数据,但数据尺寸有限默认存储为1MB的数据。
-
节点(ZNode):Zookeeper中的每个数据单元称为ZNode。ZNode有两种类型:持久(Persistent)和临时(Ephemeral)。持久节点在客户端断开连接后仍存在,而临时节点在客户端断开连接后会被自动删除。
-
观察者(Watcher):客户端可以在ZNode上设置观察者,当ZNode的数据或子节点发生变化时,Watcher会通知对应的客户端。
-
有序性(Orderliness):Zookeeper通过全局顺序来确保所有操作的顺序一致。
-
数据一致性 :每个server保存一份相同的数据拷贝,客户端无论请求到被集群中哪个server处理,得到的数据都是一致的。
-
集群服务:在Zookeeper集群服务由一个领导者(leader),多个跟随者(follower)组成的集群。领导者(leader)负责进行投票的发起和决议,更新集群服务状态。跟随者用于接收客户请求并向客户端返回结果,在选举Leader过程中参与投票。集群中只要有半数以上节点存活,Zookeeper集群就能正常服务。
二、Zookeeper的应用场景
2.1 分布式锁服务
Zookeeper能够非常有效地实现分布式锁。这在需要同步或并发控制的分布式系统中尤为重要。通过利用Zookeeper的临时ZNode特性,可以实现锁的自动释放,防止死锁。
2.2 统一配置管理
在分布式系统中,应用程序的配置管理成为一个复杂的问题。Zookeeper提供了一种集中式管理配置的方式,所有的配置文件可以存储在Zookeeper中,并且可以动态更新。当配置变化时,Zookeeper可以通知到所有客户端,从而使应用程序能够立即响应变化。
2.3 命名服务
Zookeeper可以作为分布式系统的命名服务,通过维护名称和元数据的映射关系,提供高效的名称解析能力。
2.4 集群管理
Zookeeper能够监控集群中各个节点的状态,决定节点是否健康,而节点的加入和离开能够动态调整。
三、Zookeeper的安全管理操作方法
3.1 基本安全措施
-
验证(Authentication):Zookeeper支持基于客户端和服务器之间的认证机制。通过设置用户和密码,可以限制对ZNode的访问。
3.1.1认证方式
- world:默认方式,开放的权限,意解为全世界都能随意访问。
- auth:已经授权且认证通过的用户才可以访问。
- digest:用户名:密码方式认证,实际业务开发中最常用的方式。
- IP白名单:授权指定的Ip地址,和指定的权限点,控制访问。
-
ACL(Access Control Lists):通过设定ACL,能够控制ZNode的读写权限。ACL规则可以根据不同的需求设定,如只读、完全控制等。
3.1.2 ACL授权流程
- 添加认证用户
addauth digest 用户名:密码
- 设置权限
setAcl /path auth:用户名:密码:权限
- 查看Acl设置
getAcl /path
完整的操作如下代码
-- 添加授权用户
[zk: localhost:2181] addauth digest user1:123456
-- 创建节点
[zk: localhost:2181] create /testNode testNode
-- 节点授权
[zk: localhost:2181] setAcl /testNode auth:user1:123456:cdrwa
-- 查看授权
[zk: localhost:2181] getAcl /testNode
3.2 数据加密
在Zookeeper的配置文件中,可以启用数据传输加密(例如SSL/TLS)来保证数据在网络传输中的安全性。
3.3 安全配置示例
# zookeeper configuration
authProvider.1=org.apache.zookeeper.server.auth.SASLAuthenticationProvider
requireClientAuthScheme=sasl
digest.authenticationHandler.sasl.clientAllowedProtocols=GSSAPI:CRAM-MD5
四、Zookeeper与Spring Boot 2的整合
4.1 引入依赖
在Spring Boot 2项目中,首先需要引入Curator依赖,这是用于简化Zookeeper操作的一个高层次API。Curator框架在Zookeeper原生API接口上进行二次包装。提供ZooKeeper各种应用场景:比如:分布式锁服务、集群领导选举、共享计数器、缓存机制、分布式队列等API封装。
<dependency><groupId>org.apache.curator</groupId><artifactId>curator-framework</artifactId><version>2.12.0</version>
</dependency>
<dependency><groupId>org.apache.curator</groupId><artifactId>curator-recipes</artifactId><version>2.12.0</version>
</dependency>
<dependency><groupId>org.apache.curator</groupId><artifactId>curator-client</artifactId><version>2.12.0</version>
</dependency>
4.2 Springboot项目yml配置
在application.properties
中,配置Zookeeper的连接信息:
zoo:keeper:#开启标志enabled: true#服务器地址server: 127.0.0.1:2181#命名空间,被称为ZNodenamespace: testNode#权限控制,加密digest: user1:123456#会话超时时间sessionTimeoutMs: 3000#连接超时时间connectionTimeoutMs: 60000#最大重试次数maxRetries: 2#初始休眠时间baseSleepTimeMs: 1000
4.3 编写配置类
编写Zookeeper配置类,用于初始化Zookeeper客户端:
@Configuration
public class ZookeeperConfig {private static final Logger LOGGER = LoggerFactory.getLogger(ZookeeperConfig.class) ;//注入Zookeeper配置文件类,用于获取yml的配置项值@Autowiredprivate ZookeeperParam zookeeperParam ;private static CuratorFramework client = null ;/*** 初始化*/@PostConstructpublic void init (){//重试策略,初试时间1秒,重试10次RetryPolicy policy = new ExponentialBackoffRetry(zookeeperParam.getBaseSleepTimeMs(),zookeeperParam.getMaxRetries());//通过工厂创建Curatorclient = CuratorFrameworkFactory.builder().connectString(zookeeperParam.getServer()) //链接的服务的地址.authorization("digest",zookeeperParam.getDigest().getBytes()) //认证方式.connectionTimeoutMs(zookeeperParam.getConnectionTimeoutMs()).sessionTimeoutMs(zookeeperParam.getSessionTimeoutMs()).retryPolicy(policy).build();//开启连接client.start();LOGGER.info("zookeeper 初始化完成...");}public static CuratorFramework getClient (){return client ;}public static void closeClient (){if (client != null){client.close();}}
}
4.4 示例代码
示例代码展示如何在Spring Boot 2项目中使用Zookeeper:
Zookeeper接口类
public interface ZookeeperService {/*** 判断节点是否存在*/boolean isExistNode (final String path) ;/*** 创建节点*/void createNode (CreateMode mode,String path ) ;/*** 设置节点数据*/void setNodeData (String path, String nodeData) ;/*** 创建节点*/void createNodeAndData (CreateMode mode, String path , String nodeData) ;/*** 获取节点数据*/String getNodeData (String path) ;/*** 获取节点下数据*/List<String> getNodeChild (String path) ;/*** 是否递归删除节点*/void deleteNode (String path,Boolean recursive) ;/*** 获取读写锁*/InterProcessReadWriteLock getReadWriteLock (String path) ;
}
Zookeeper接口实现类IMPL
@Service
public class ZookeeperServiceImpl implements ZookeeperService {private static final Logger LOGGER = LoggerFactory.getLogger(ZookeeperServiceImpl.class);@Overridepublic boolean isExistNode(String path) {CuratorFramework client = ZookeeperConfig.getClient();client.sync() ;try {Stat stat = client.checkExists().forPath(path);return client.checkExists().forPath(path) != null;} catch (Exception e) {LOGGER.error("isExistNode error...", e);e.printStackTrace();}return false;}@Overridepublic void createNode(CreateMode mode, String path) {CuratorFramework client = ZookeeperConfig.getClient() ;try {// 递归创建所需父节点client.create().creatingParentsIfNeeded().withMode(mode).forPath(path);} catch (Exception e) {LOGGER.error("createNode error...", e);e.printStackTrace();}}@Overridepublic void setNodeData(String path, String nodeData) {CuratorFramework client = ZookeeperConfig.getClient() ;try {// 设置节点数据client.setData().forPath(path, nodeData.getBytes("UTF-8"));} catch (Exception e) {LOGGER.error("setNodeData error...", e);e.printStackTrace();}}@Overridepublic void createNodeAndData(CreateMode mode, String path, String nodeData) {CuratorFramework client = ZookeeperConfig.getClient() ;try {// 创建节点,关联数据client.create().creatingParentsIfNeeded().withMode(mode).forPath(path,nodeData.getBytes("UTF-8"));} catch (Exception e) {LOGGER.error("createNode error...", e);e.printStackTrace();}}@Overridepublic String getNodeData(String path) {CuratorFramework client = ZookeeperConfig.getClient() ;try {// 数据读取和转换byte[] dataByte = client.getData().forPath(path) ;String data = new String(dataByte,"UTF-8") ;if (StringUtils.isNotEmpty(data)){return data ;}}catch (Exception e) {LOGGER.error("getNodeData error...", e);e.printStackTrace();}return null;}@Overridepublic List<String> getNodeChild(String path) {CuratorFramework client = ZookeeperConfig.getClient() ;List<String> nodeChildDataList = new ArrayList<>();try {// 节点下数据集nodeChildDataList = client.getChildren().forPath(path);} catch (Exception e) {LOGGER.error("getNodeChild error...", e);e.printStackTrace();}return nodeChildDataList;}@Overridepublic void deleteNode(String path, Boolean recursive) {CuratorFramework client = ZookeeperConfig.getClient() ;try {if(recursive) {// 递归删除节点client.delete().guaranteed().deletingChildrenIfNeeded().forPath(path);} else {// 删除单个节点client.delete().guaranteed().forPath(path);}} catch (Exception e) {LOGGER.error("deleteNode error...", e);e.printStackTrace();}}@Overridepublic InterProcessReadWriteLock getReadWriteLock(String path) {CuratorFramework client = ZookeeperConfig.getClient() ;// 写锁互斥、读写互斥InterProcessReadWriteLock readWriteLock = new InterProcessReadWriteLock(client, path);return readWriteLock ;}
}
Zookeeper业务API场景使用
@Api("Zookeeper接口使用实例")
@RestController
public class ZookeeperController {@Autowiredprivate ZookeeperService zookeeperService ;@ApiOperation(value="查询节点数据")@GetMapping("/getNodeData")public HttpResult getNodeData (String path) {return HttpResult.create(HttpStatus.SUCCESS,zookeeperService.getNodeData(path));}@ApiOperation(value="判断节点是否存在")@GetMapping("/isExistNode")public HttpResult isExistNode (final String path){return HttpResult.create(HttpStatus.SUCCESS,zookeeperService.isExistNode(path));}@ApiOperation(value="创建节点")@GetMapping("/createNode")public HttpResult createNode (CreateMode mode, String path ){zookeeperService.createNode(mode,path) ;return HttpResult.create(HttpStatus.SUCCESS);}@ApiOperation(value="设置节点数据")@GetMapping("/setNodeData")public HttpResult setNodeData (String path, String nodeData) {zookeeperService.setNodeData(path,nodeData) ;return HttpResult.create(HttpStatus.SUCCESS);}@ApiOperation(value="创建并设置节点数据")@GetMapping("/createNodeAndData")public HttpResult createNodeAndData (CreateMode mode, String path , String nodeData){zookeeperService.createNodeAndData(mode,path,nodeData) ;return HttpResult.create(HttpStatus.SUCCESS);}@ApiOperation(value="递归获取节点数据")@GetMapping("/getNodeChild")public HttpResult getNodeChild (String path) {return HttpResult.create(HttpStatus.SUCCESS,zookeeperService.getNodeChild(path));}@ApiOperation(value="是否递归删除节点")@GetMapping("/deleteNode")public HttpResult deleteNode (String path,Boolean recursive) {zookeeperService.deleteNode(path,recursive) ;return HttpResult.create(HttpStatus.SUCCESS);}
}
接口返回HttpResult统一类
import com.fasterxml.jackson.annotation.JsonInclude;import io.swagger.annotations.ApiModel;
import io.swagger.annotations.ApiModelProperty;
import lombok.Data;/*** @author Lqzhang* @date 2020/5/19*/
@Data
@ApiModel(value = "HttpResult", description = "统一返回数据结构")
public class HttpResult<T> {@ApiModelProperty(value = "返回状态码")private Integer code;@ApiModelProperty(value = "返回信息")private String msg;@ApiModelProperty(value = "返回数据")@JsonInclude(value = JsonInclude.Include.NON_NULL)private T data;public static <E> HttpResult<E> create(HttpStatus httpStatus) {HttpResult<E> httpResult = new HttpResult<>();httpResult.setCode(httpStatus.getCode());httpResult.setMsg(httpStatus.getMessage());return httpResult;}public static <E> HttpResult<E> create(HttpStatus httpStatus, String msg) {HttpResult<E> httpResult = new HttpResult<>();httpResult.setCode(httpStatus.getCode());httpResult.setMsg(msg);return httpResult;}public static <E> HttpResult<E> create(HttpStatus httpStatus, E data) {HttpResult<E> httpResult = new HttpResult<>();httpResult.setCode(httpStatus.getCode());httpResult.setMsg(httpStatus.getMessage());httpResult.setData(data);return httpResult;}public static <E> HttpResult<E> create(HttpStatus httpStatus, String msg, E data) {HttpResult<E> httpResult = new HttpResult<>();httpResult.setCode(httpStatus.getCode());httpResult.setMsg(msg);httpResult.setData(data);return httpResult;}public static <E> HttpResult<E> create(Integer code, String msg, E data) {HttpResult<E> httpResult = new HttpResult<>();httpResult.setCode(code);httpResult.setMsg(msg);httpResult.setData(data);return httpResult;}public static <E> HttpResult<E> success() {return success(null);}public static <E> HttpResult<E> success(E data) {HttpResult<E> httpResult = new HttpResult<>();httpResult.setCode(200);httpResult.setMsg("操作成功");httpResult.setData(data);return httpResult;}public static <E> HttpResult<E> fail() {return fail(HttpStatus.FAIL.getMessage());}public static <E> HttpResult<E> fail(String message) {HttpResult<E> httpResult = new HttpResult<>();httpResult.setCode(HttpStatus.FAIL.getCode());httpResult.setMsg(message);return httpResult;}
}
/*** 请求结果状态枚举常量类** @author Lqzhang*/
public enum HttpStatus {SUCCESS(200, "请求成功"),NO_DATA(201, "没有查询到对应的数据"),FAIL(203, "请求异常"),PARAM_ERROR(204, "参数名错误或参数为空,请检查"),NO_LOGIN(205, "没有授权"),SAVE_ERROR(206, "操作失败"),NO_DATA_IN_AUTH(207, "权限范围内没有查询到数据"),ARREARS(208, "账户可用余额已不足,请充值"),UNBOUND_PHONE(210, "用户账户未绑定微信手机"),USER_NOT_EXITS(211, "用户不存在"),PASSWORD_ERROR(212, "密码错误"),TOKEN_EXPIRED(403, "当前登录凭证已失效,请重新登录"),SERVICE_NOT_OPENED(215, "该功能未开通,联系管理员开通使用"),;/*** 状态码*/private int code;/*** 状态信息*/private String message;HttpStatus(int code, String message) {this.code = code;this.message = message;}public int getCode() {return code;}public void setCode(int code) {this.code = code;}public String getMessage() {return message;}public void setMessage(String message) {this.message = message;}public int getStatusTypeCode(HttpStatus httpStatus) {return httpStatus.getCode();}public String getStatusTypeMessage(HttpStatus httpStatus) {return httpStatus.getMessage();}public static HttpStatus getStatusTypeByCode(int code) {HttpStatus httpStatus = null;for (HttpStatus status : values()) {if (status.getCode() == code) {httpStatus = status;break;}}return httpStatus;}public static HttpStatus getStatusTypeByMessage(String message) {HttpStatus httpStatus = null;for (HttpStatus status : values()) {if (status.getMessage().equals(message)) {httpStatus = status;break;}}return httpStatus;}}
结论
Zookeeper作为分布式系统中的重要组件,提供了多种功能和强大的协调能力。在实际应用中,可以利用Zookeeper实现分布式锁、统一配置管理、命名服务及集群管理等功能。通过与Spring Boot 2的整合,能更好地在应用中利用Zookeeper这些功能,以提升系统的可用性和可靠性。希望通过本文的介绍,您对Zookeeper有更加深入的了解,并能够在实际项目中加以应用。