文章目录
- 1、典型应用场景及实现
- 1.1、 数据发布/订阅
- 1.1.1、配置管理案列
- 1.2、负载均衡
- 1.3、命名服务
- 1.4、分布式协调/通知
- 1.4.1、一种通用的分布式系统机器间通信方式
- 1.5、集群管理
- 1.6、Master选举
- 1.7、分布式锁
- 1.7.1、排他锁
- 1.7.2、共享锁
- 1.8、分布式队列
- 2、ZooKeeper在大型分布式系统中的应用
- 2.1、Hadoop
- 2.2、 HBase
- 2.3、Kafka
- 2.3.1、术语简介
- 2.3.2、Broker注册
- 2.3.3、Topic注册
- 2.3.4、负载均衡
- 2.3.5、小结
- 3、ZooKeeper在阿里巴巴的实践与应用
- 3.1、消息中间件:Metamorphosis
- 3.2、RPC服务框架:Dubbo
- 3.3、基于MySQL Binlog的增量订阅和消费组件:Canal
- 3.4、分布式数据库同步系统:Otter
- 3.5、轻量级分布式通用搜索平台:终搜
- 3.6、实时计算引擎:JStorm
- 4、小结
ZooKeeper是一个典型的发布/订阅模式的分布式数据管理与协调框架,开发人员可以使用它来进行分布式数据的发布与订阅。另一方面,通过对ZooKeeper中丰富的数据节点类型进行交叉使用,配合Watcher事件通知机制,可以非常方便地构建一系列分布式应用中都会涉及的核心功能,如数据发布/订阅、负载均衡、命名服务、分布式协调/通知、集群管理、Master选举、分布式锁和分布式队列等。
1、典型应用场景及实现
ZooKeeper是一个高可用的分布式数据管理与协调框架。基于对ZAB算法的实现,该框架能够很好地保证分布式环境中数据的一致性。也正是基于这样的特性,使得ZooKeeper成为了解决分布式一致性问题的利器。
随着近年来互联网系统规模的不断扩大,大数据时代飞速到来,越来越多的分布式系统将ZooKeeper作为核心组件使用,如Hadoop、HBase和Kafka等,因此,正确理解ZooKeeper的应用场景,对于ZooKeeper的使用者来说,显得尤为重要。这篇帖子将重点围绕数据发布/订阅、负载均衡、命名服务、分布式协调/通知、集群管理、Master选举、分布式锁和分布式队列等方面来讲解ZooKeeper的典型应用场景及实现。
1.1、 数据发布/订阅
数据发布/订阅(Publish/Subscribe)系统,即所谓的配置中心,顾名思义就是发布者将数据发布到ZooKeeper的一个或一系列节点上,供订阅者进行数据订阅,进而达到动态获取数据的目的,实现配置信息的集中式管理和数据的动态更新。
发布/订阅系统一般有两种设计模式,分别是推(Push)模式和拉(Pull)模式。在推模式中,服务端主动将数据更新发送给所有订阅的客户端;而拉模式则是由客户端主动发起请求来获取最新数据,通常客户端都采用定时进行轮询拉取的方式。关于这两种模式更详细的讲解以及各自的优缺点,这里就不再赘述,读者可以自行到互联网上搜索相关的资料作进一步的了解。ZooKeeper采用的是推拉相结合的方式:客户端向服务端注册自己需要关注的节点,一旦该节点的数据发生变更,那么服务端就会向相应的客户端发送Watcher事件通知,客户端接收到这个消息通知之后,需要主动到服务端获取最新的数据。
如果将配置信息存放到ZooKeeper上进行集中管理,那么通常情况下,应用在启动的时候都会主动到ZooKeeper服务端上进行一次配置信息的获取,同时,在指定节点上注册一个Watcher监听,这样一来,但凡配置信息发生变更,服务端都会实时通知到所有订阅的客户端,从而达到实时获取最新配置信息的目的。下面我们通过一个“配置管理”的实际案例来展示ZooKeeper在“数据发布/订阅”场景下的使用方式。
在我们平常的应用系统开发中,经常会碰到这样的需求:系统中需要使用一些通用的配置信息,例如机器列表信息、运行时的开关配置、数据库配置信息等。这些全局配置信息通常具备以下3个特性:
- 数据量通常比较小。
- 数据内容在运行时会发生动态变化。
- 集群中各机器共享,配置一致。
对于这类配置信息,一般的做法通常可以选择将其存储在本地配置文件或是内存变量中。无论采用哪种方式,其实都可以简单地实现配置管理。如果采用本地配置文件的方式,那么通常系统可以在应用启动的时候读取到本地磁盘的一个文件来进行初始化,并且在运行过程中定时地进行文件的读取,以此来检测文件内容的变更。在系统的实际运行过程中,如果我们需要对这些配置信息进行更新,那么只要在相应的配置文件中进行修改,等到系统再次读取这些配置文件的时候,就可以读取到最新的配置信息,并更新到系统中去,这样就可以实现系统配置信息的更新。另外一种借助内存变量来实现配置管理的方式也非常简单,以Java系统为例,通常可以采用JMX方式来实现对系统运行时内存变量的更新。
从上面的介绍中,我们基本了解了如何使用本地配置文件和内存变量方式来实现配置管理。通常在集群机器规模不大、配置变更不是特别频繁的情况下,无论上面提到的哪种方式,都能够非常方便地解决配置管理的问题。但是,一旦机器规模变大,且配置信息变更越来越频繁后,我们发现依靠现有的这两种方式解决配置管理就变得越来越困难了。我们既希望能够快速地做到全局配置信息的变更,同时希望变更成本足够小,因此我们必须寻求一种更为分布式化的解决方案。
1.1.1、配置管理案列
接下去我们就以一个“数据库切换”的应用场景展开,看看如何使用ZooKeeper来实现配置管理。
1、配置存储
在进行配置管理之前,首先我们需要将初始化配置存储到ZooKeeper上去。一般情况下,我们可以在ZooKeeper上选取一个数据节点用于配置的存储,例如/app1/database_config(以下简称“配置节点”),如下图所示:
我们将需要集中管理的配置信息写入到该数据节点中去,例如:
2、配置获取
集群中每台机器在启动初始化阶段,首先会从上面提到的ZooKeeper配置节点上读取数据库信息,同时,客户端还需要在该配置节点上注册一个数据变更的Watcher监听,一旦发生节点数据变更,所有订阅的客户端都能够获取到数据变更通知。
3、配置变更
在系统运行过程中,可能会出现需要进行数据库切换的情况,这个时候就需要进行配置变更。借助ZooKeeper,我们只需要对ZooKeeper上配置节点的内容进行更新,ZooKeeper就能够帮我们将数据变更的通知发送到各个客户端,每个客户端在接收到这个变更通知后,就可以重新进行最新数据的获取。
1.2、负载均衡
据维基百科上的定义,负载均衡(Load Balance)是一种相当常见的计算机网络技术,用来对多个计算机(计算机集群)、网络连接、CPU、磁盘驱动器或其他资源进行分配负载,以达到优化资源使用、最大化吞吐率、最小化响应时间和避免过载的目的。通常负载均衡可以分为硬件和软件负载均衡两类,本节主要探讨的是ZooKeeper在“软”负载均衡中的应用场景。
在分布式系统中,负载均衡更是一种普遍的技术,基本上每一个分布式系统都需要使用负载均衡。分布式系统具有对等性,为了保证系统的高可用性,通常采用副本的方式来对数据和服务进行部署。而对于消费者而言,则需要在这些对等的服务提供方中选择一个来执行相关的业务逻辑,其中比较典型的就是DNS服务。在本节中,我们将详细介绍如何使用ZooKeeper来解决负载均衡问题。
一种动态的DNS服务
DNS是域名系统(Domain Name System)的缩写,是因特网中使用最广泛的核心技术之一。DNS系统可以看作是一个超大规模的分布式映射表,用于将域名和IP地址进行一一映射,进而方便人们通过域名来访问互联网站点。
通常情况下,我们可以向域名注册服务商申请域名注册,但是这种方式最大的缺陷在于只能注册有限的域名:日常开发过程中,经常会碰到这样的情况,在一个Company1公司内部,需要给一个App1应用的服务器集群机器配置一个域名解析。相信有过一线开发经验的读者一定知道,这个时候通常会需要有类似于app1.company1.com的一个域名,其对应的就是一个服务器地址。如果系统数量不多,那么通过这种传统的DNS配置方式还可以应付,但是,一旦公司规模变大,各类应用层出不穷,那么就很难再通过这种方式来进行统一的管理了。
因此,在实际开发中,往往使用本地HOST绑定来实现域名解析的工作。具体如何进行本地HOST绑定,因为不是本贴的重点,并且互联网上有大量的资料,因此这里不再赘述。使用本地HOST绑定的方法,可以很容易解决域名紧张的问题,基本上每一个系统都可以自行确定系统的域名与目标IP地址。同时,这种方法对于开发人员最大的好处就是可以随时修改域名与IP的映射,大大提高了开发调试效率。然而,这种看上去完美的方案,也有其致命的缺陷:当应用的机器规模在一定范围内,并且域名的变更不是特别频繁时,本地HOST绑定是非常高效且简单的方式。然而一旦机器规模变大后,就常常会碰到这样的情况:我们在应用上线的时候,需要在应用的每台机器上去绑定域名,但是在机器规模相当庞大的情况下,这种做法就相当不方便。另外,如果想要临时更新域名,还需要到每个机器上去逐个进行变更,要消耗大量时间,因此完全无法保证实时性。
现在,我们来介绍一种基于ZooKeeper实现的动态DNS方案(以下简称该方案为“DDNS”,Dynamic DNS)。
域名配置
和配置管理一样,我们首先需要在ZooKeeper上创建一个节点来进行域名配置,例如/DDNS/app1/server.app1.company1.com(以下简称“域名节点”),如下图所示:
从上图中我们看到,每个应用都可以创建一个属于自己的数据节点作为域名配置的根节点,例如/DDNS/app1,在这个节点上,每个应用都可以将自己的域名配置上去,下面的清单是一个配置示例:
域名解析
在传统的DNS解析中,我们都不需要关心域名的解析过程,所有这些工作都交给了操作系统的域名和IP地址映射机制(本地HOST绑定)或是专门的域名解析服务器(由域名注册服务商提供)。因此,在这点上,DDNS方案和传统的域名解析有很大的区别——在DDNS中,域名的解析过程都是由每一个应用自己负责的。通常应用都会首先从域名节点中获取一份IP地址和端口的配置,进行自行解析。同时,每个应用还会在域名节点上注册一个数据变更Watcher监听,以便及时收到域名变更的通知。
域名变更
在运行过程中,难免会碰上域名对应的IP地址或是端口变更,这个时候就需要进行域名变更操作。在DDNS中,我们只需要对指定的域名节点进行更新操作,ZooKeeper就会向订阅的客户端发送这个事件通知,应用在接收到这个事件通知后,就会再次进行域名配置的获取。
上面我们介绍了如何使用ZooKeeper来实现一种动态的DNS系统。通过ZooKeeper来实现动态DNS服务,一方面,可以避免域名数量无限增长带来的集中式维护的成本;另一方面,在域名变更的情况下,也能够避免因逐台机器更新本地HOST而带来的繁琐工作。
自动化的DNS服务
根据上面的讲解,相信读者基本上已经能够使用ZooKeeper来实现一个动态的DNS服务了。但是我们仔细看一下上面的实现就会发现,在域名变更环节中,当域名对应的IP地址发生变更的时候,我们还是需要人为地介入去修改域名节点上的IP地址和端口。接下来我们看看下面这种使用ZooKeeper实现的更为自动化的DNS服务。自动化的DNS服务系统主要是为了实现服务的自动化定位,整个系统架构如下图所示:
首先来介绍整个动态DNS系统的架构体系中几个比较重要的组件及其职责:
- Register集群负责域名的动态注册。
- Dispatcher集群负责域名解析。
- Scanner集群负责检测以及维护服务状态(探测服务的可用性、屏蔽异常服务节点等)。
- SDK提供各种语言的系统接入协议,提供服务注册以及查询接口。
- Monitor负责收集服务信息以及对DDNS自身状态的监控。
- Controller是一个后台管理的Console,负责授权管理、流量控制、静态配置服务和手动屏蔽服务等功能,另外,系统的运维人员也可以在上面管理Register、Dispatcher和Scanner等集群。
整个系统的核心当然是ZooKeeper集群,负责数据的存储以及一系列分布式协调。下面我们再来详细地看下整个系统是如何运行的。在这个架构模型中,我们将那些目标IP地址和端口抽象为服务的提供者,而那些需要使用域名解析的客户端则被抽象成服务的消费者。
域名注册
域名注册主要是针对服务提供者来说的。域名注册过程可以简单地概括为:每个服务提供者在启动的过程中,都会把自己的域名信息注册到Register集群中去。
- 服务提供者通过SDK提供的API接口,将域名、IP地址和端口发送给Register集群。例如,A机器用于提供serviceA.xxx.com,于是它就向Register发送一个“域名→IP:PORT”的映射:“serviceA.xxx.com→ 192.168.0.1:8080”。
- Register获取到域名、IP地址和端口配置后,根据域名将信息写入相对应的ZooKeeper域名节点中。
域名解析
域名解析是针对服务消费者来说的,正好和域名注册过程相反:服务消费者在使用域名的时候,会向Dispatcher发出域名解析请求。Dispatcher收到请求后,会从ZooKeeper上的指定域名节点读取相应的IP:PORT列表,通过一定的策略选取其中一个返回给前端应用。
域名探测
域名探测是指DDNS系统需要对域名下所有注册的IP地址和端口的可用性进行检测,俗称“健康度检测”。健康度检测一般有两种方式,第一种是服务端主动发起健康度心跳检测,这种方式一般需要在服务端和客户端之间建立起一个TCP长链接;第二种则是客户端主动向服务端发起健康度心跳检测。在DDNS架构中的域名探测,使用的是服务提供者主动向Scanner进行状态汇报(即第二种健康度检测方式)的模式,即每个服务提供者都会定时向Scanner汇报自己的状态。
Scanner会负责记录每个服务提供者最近一次的状态汇报时间,一旦超过5秒没有收到状态汇报,那么就认为该IP地址和端口已经不可用,于是开始进行域名清理过程。在域名清理过程中,Scanner会在ZooKeeper中找到该域名对应的域名节点,然后将该IP地址和端口配置从节点内容中移除。
1.3、命名服务
命名服务(Name Service)也是分布式系统中比较常见的一类场景,在《Java网络高级编程》一书中提到,命名服务是分布式系统最基本的公共服务之一。在分布式系统中,被命名的实体通常可以是集群中的机器、提供的服务地址或远程对象等——这些我们都可以统称它们为名字(Name),其中较为常见的就是一些分布式服务框架(如RPC、RMI)中的服务地址列表,通过使用命名服务,客户端应用能够根据指定名字来获取资源的实体、服务地址和提供者的信息等。
Java语言中的JNDI便是一种典型的命名服务。JNDI是Java命名与目录接口(Java Naming and Directory Interface)的缩写,是J2EE体系中重要的规范之一,标准的J2EE容器都提供了对JNDI规范的实现。因此,在实际开发中,开发人员常常使用应用服务器自带的JNDI实现来完成数据源的配置与管理——使用JNDI方式后,开发人员可以完全不需要关心与数据库相关的任何信息,包括数据库类型、JDBC驱动类型以及数据库账户等。
ZooKeeper提供的命名服务功能与JNDI技术有相似的地方,都能够帮助应用系统通过一个资源引用的方式来实现对资源的定位与使用。另外,广义上命名服务的资源定位都不是真正意义的实体资源——在分布式环境中,上层应用仅仅需要一个全局唯一的名字,类似于数据库中的唯一主键。
1.4、分布式协调/通知
分布式协调/通知服务是分布式系统中不可缺少的一个环节,是将不同的分布式组件有机结合起来的关键所在。对于一个在多台机器上部署运行的应用而言,通常需要一个协调者(Coordinator)来控制整个系统的运行流程,例如分布式事务的处理、机器间的互相协调等。同时,引入这样一个协调者,便于将分布式协调的职责从应用中分离出来,从而可以大大减少系统之间的耦合性,而且能够显著提高系统的可扩展性。
ZooKeeper中特有的Watcher注册与异步通知机制,能够很好地实现分布式环境下不同机器,甚至是不同系统之间的协调与通知,从而实现对数据变更的实时处理。基于ZooKeeper实现分布式协调与通知功能,通常的做法是不同的客户端都对ZooKeeper上同一个数据节点进行Watcher注册,监听数据节点的变化(包括数据节点本身及其子节点),如果数据节点发生变化,那么所有订阅的客户端都能够接收到相应的Watcher通知,并做出相应的处理。
1.4.1、一种通用的分布式系统机器间通信方式
在绝大部分的分布式系统中,系统机器间的通信无外乎心跳检测、工作进度汇报和系统调度这三种类型。接下来,我们将围绕这三种类型的机器通信来讲解如何基于ZooKeeper去实现一种分布式系统间的通信方式。
心跳检测
机器间的心跳检测机制是指在分布式环境中,不同机器之间需要检测到彼此是否在正常运行,例如A机器需要知道B机器是否正常运行。在传统的开发中,我们通常是通过主机之间是否可以相互PING通来判断,更复杂一点的话,则会通过在机器之间建立长连接,通过TCP连接固有的心跳检测机制来实现上层机器的心跳检测,这些确实都是一些非常常见的心跳检测方法。
下面来看看如何使用ZooKeeper来实现分布式机器间的心跳检测。基于ZooKeeper的临时节点特性,可以让不同的机器都在ZooKeeper的一个指定节点下创建临时子节点,不同的机器之间可以根据这个临时节点来判断对应的客户端机器是否存活。通过这种方式,检测系统和被检测系统之间并不需要直接相关联,而是通过ZooKeeper上的某个节点进行关联,大大减少了系统耦合。
工作进度汇报
在一个常见的任务分发系统中,通常任务被分发到不同的机器上执行后,需要实时地将自己的任务执行进度汇报给分发系统。这个时候就可以通过ZooKeeper来实现。在ZooKeeper上选择一个节点,每个任务客户端都在这个节点下面创建临时子节点,这样便可以实现两个功能:
- 通过判断临时节点是否存在来确定任务机器是否存活;
- 各个任务机器会实时地将自己的任务执行进度写到这个临时节点上去,以便中心系统能够实时地获取到任务的执行进度。
系统调度
使用ZooKeeper,能够实现另一种系统调度模式:一个分布式系统由控制台和一些客户端系统两部分组成,控制台的职责就是需要将一些指令信息发送给所有的客户端,以控制它们进行相应的业务逻辑。后台管理人员在控制台上做的一些操作,实际上就是修改了ZooKeeper上某些节点的数据,而ZooKeeper进一步把这些数据变更以事件通知的形式发送给了对应的订阅客户端。
总之,使用ZooKeeper来实现分布式系统机器间的通信,不仅能省去大量底层网络通信和协议设计上重复的工作,更为重要的一点是大大降低了系统之间的耦合,能够非常方便地实现异构系统之间的灵活通信。
1.5、集群管理
随着分布式系统规模的日益扩大,集群中的机器规模也随之变大,因此,如何更好地进行集群管理也显得越来越重要了。
所谓集群管理,包括集群监控与集群控制两大块,前者侧重对集群运行时状态的收集,后者则是对集群进行操作与控制。在日常开发和运维过程中,我们经常会有类似于如下的需求:
- 希望知道当前集群中究竟有多少机器在工作。
- 对集群中每台机器的运行时状态进行数据收集。
- 对集群中机器进行上下线操作。
在传统的基于Agent的分布式集群管理体系中,都是通过在集群中的每台机器上部署一个Agent,由这个Agent负责主动向指定的一个监控中心系统(监控中心系统负责将所有数据进行集中处理,形成一系列报表,并负责实时报警,以下简称“监控中心”)汇报自己所在机器的状态。在集群规模适中的场景下,这确实是一种在生产实践中广泛使用的解决方案,能够快速有效地实现分布式环境集群监控,但是一旦系统的业务场景增多,集群规模变大之后,该解决方案的弊端也就显现出来了。
大规模升级困难
以客户端形式存在的Agent,在大规模使用后,一旦遇上需要大规模升级的情况,就非常麻烦,在升级成本和升级进度的控制上面临巨大的挑战。
统一的Agent无法满足多样的需求
对于机器的CPU使用率、负载(Load)、内存使用率、网络吞吐以及磁盘容量等机器基本的物理状态,使用统一的Agent来进行监控或许都可以满足。但是,如果需要深入应用内部,对一些业务状态进行监控,例如,在一个分布式消息中间件中,希望监控到每个消费者对消息的消费状态;或者在一个分布式任务调度系统中,需要对每个机器上任务的执行情况进行监控。很显然,对于这些业务耦合紧密的监控需求,不适合由一个统一的Agent来提供。
编程语言多样性
随着越来越多编程语言的出现,各种异构系统层出不穷。如果使用传统的Agent方式,那么需要提供各种语言的Agent客户端。另一方面,“监控中心”在对异构系统的数据进行整合上面临巨大挑战。
ZooKeeper具有以下两大特性:
- 客户端如果对ZooKeeper的一个数据节点注册Watcher监听,那么当该数据节点的内容或是其子节点列表发生变更时,ZooKeeper服务器就会向订阅的客户端发送变更通知。
- 对在ZooKeeper上创建的临时节点,一旦客户端与服务器之间的会话失效,那么该临时节点也就被自动清除。
利用ZooKeeper的这两大特性,就可以实现另一种集群机器存活性监控的系统。例如,监控系统在/clusterServers节点上注册一个Watcher监听,那么但凡进行动态添加机器的操作,就会在/clusterServers节点下创建一个临时节点:/clusterServers/[Hostname]。这样一来,监控系统就能够实时检测到机器的变动情况,至于后续处理就是监控系统的业务了。下面我们就通过分布式日志收集系统和在线云主机管理这两个典型例子来看看如何使用ZooKeeper实现集群管理。
分布式日志收集系统
分布式日志收集系统的核心工作就是收集分布在不同机器上的系统日志,在这里我们重点来看分布式日志系统(以下简称“日志系统”)的收集器模块。
在一个典型的日志系统的架构设计中,整个日志系统会把所有需要收集的日志机器(下文我们以“日志源机器”代表此类机器)分为多个组别,每个组别对应一个收集器,这个收集器其实就是一个后台机器(下文我们以“收集器机器”代表此类机器),用于收集日志。对于大规模的分布式日志收集系统场景,通常需要解决如下两个问题:
变化的日志源机器:在生产环境中,伴随着机器的变动,每个应用的机器几乎每天都是在变化的(机器硬件问题、扩容、机房迁移或是网络问题等都会导致一个应用的机器变化),也就是说每个组别中的日志源机器通常是在不断变化的。
变化的收集器机器:日志收集系统自身也会有机器的变更或扩容,于是会出现新的收集器机器加入或是老的收集器机器退出的情况。
上面两个问题,无论是日志源机器还是收集器机器的变更,最终都归结为一点:如何快速、合理、动态地为每个收集器分配对应的日志源机器,这也成为了整个日志系统正确稳定运转的前提,也是日志收集过程中最大的技术挑战之一。在这种情况下,引入ZooKeeper是个不错的选择。
1.6、Master选举
Master选举是一个在分布式系统中非常常见的应用场景。分布式最核心的特性就是能够将具有独立计算能力的系统单元部署在不同的机器上,构成一个完整的分布式系统。而与此同时,实际场景中往往也需要在这些分布在不同机器上的独立系统单元中选出一个所谓的“老大”,在计算机科学中,我们称之为Master。
在分布式系统中,Master往往用来协调集群中其他系统单元,具有对分布式系统状态变更的决定权。例如,在一些读写分离的应用场景中,客户端的写请求往往是由Master来处理的;而在另一些场景中,Master则常常负责处理一些复杂的逻辑,并将处理结果同步给集群中其他系统单元。Master选举可以说是ZooKeeper最典型的应用场景了,在本节中,我们就结合“一种海量数据处理与共享模型”这个具体例子来看看ZooKeeper在集群Master选举中的应用场景。
在分布式环境中,经常会碰到这样的应用场景:集群中的所有系统单元需要对前端业务提供数据,比如一个商品ID,或者是一个网站轮播广告的广告ID(通常出现在一些广告投放系统中)等,而这些商品ID或是广告ID往往需要从一系列的海量数据处理中计算得到——这通常是一个非常耗费I/O和CPU资源的过程。鉴于该计算过程的复杂性,如果让集群中的所有机器都执行这个计算逻辑的话,那么将耗费非常多的资源。一种比较好的方法就是只让集群中的部分,甚至只让其中的一台机器去处理数据计算,一旦计算出数据结果,就可以共享给整个集群中的其他所有客户端机器,这样可以大大减少重复劳动,提升性能。
这里我们以一个简单的广告投放系统后台场景为例来讲解这个模型。整个系统大体上可以分成客户端集群、分布式缓存系统、海量数据处理总线和ZooKeeper四个部分,如下图所示:
首先我们来看整个系统的运行机制。上图中的Client集群每天定时会通过ZooKeeper来实现Master选举。选举产生Master客户端之后,这个Master就会负责进行一系列的海量数据处理,最终计算得到一个数据结果,并将其放置在一个内存/数据库中。同时,Master还需要通知集群中其他所有的客户端从这个内存/数据库中共享计算结果。
接下去,我们将重点来看Master选举的过程,首先来明确下Master选举的需求:在集群的所有机器中选举出一台机器作为Master。针对这个需求,通常情况下,我们可以选择常见的关系型数据库中的主键特性来实现:集群中的所有机器都向数据库中插入一条相同主键ID的记录,数据库会帮助我们自动进行主键冲突检查,也就是说,所有进行插入操作的客户端机器中,只有一台机器能够成功——那么,我们就认为向数据库中成功插入数据的客户端机器成为Master。
乍一看,这个方案确实可行,依靠关系型数据库的主键特性能够很好地保证在集群中选举出唯一的一个Master。但是我们需要考虑的另一个问题是,如果当前选举出的Master挂了,那么该如何处理?谁来告诉我Master挂了呢?显然,关系型数据库没法通知我们这个事件。那么,如果使用ZooKeeper是否可以做到这一点呢?
利用ZooKeeper的强一致性,能够很好地保证在分布式高并发情况下节点的创建一定能够保证全局唯一性,即ZooKeeper将会保证客户端无法重复创建一个已经存在的数据节点。也就是说,如果同时有多个客户端请求创建同一个节点,那么最终一定只有一个客户端请求能够创建成功。利用这个特性,就能很容易地在分布式环境中进行Master选举了。
在这个系统中,首先会在ZooKeeper上创建一个日期节点,例如“2013-09-20”,如下图所示:
客户端集群每天都会定时往ZooKeeper上创建一个临时节点,例如/master_election/2013-09-20/binding。在这个过程中,只有一个客户端能够成功创建这个节点,那么这个客户端所在的机器就成为了Master。同时,其他没有在ZooKeeper上成功创建节点的客户端,都会在节点/master_election/2013-09-20上注册一个子节点变更的Watcher,用于监控当前的Master机器是否存活,一旦发现当前的Master挂了,那么其余的客户端将会重新进行Master选举。
从上面的讲解中,我们可以看到,如果仅仅只是想实现Master选举的话,那么其实只需要有一个能够保证数据唯一性的组件即可,例如关系型数据库的主键模型就是非常不错的选择。但是,如果希望能够快速地进行集群Master动态选举,那么基于ZooKeeper来实现是一个不错的新思路。
1.7、分布式锁
分布式锁是控制分布式系统之间同步访问共享资源的一种方式。如果不同的系统或是同一个系统的不同主机之间共享了一个或一组资源,那么访问这些资源的时候,往往需要通过一些互斥手段来防止彼此之间的干扰,以保证一致性,在这种情况下,就需要使用分布式锁了。
在平时的实际项目开发中,我们往往很少会去在意分布式锁,而是依赖于关系型数据库固有的排他性来实现不同进程之间的互斥。这确实是一种非常简便且被广泛使用的分布式锁实现方式。然而有一个不争的事实是,目前绝大多数大型分布式系统的性能瓶颈都集中在数据库操作上。因此,如果上层业务再给数据库添加一些额外的锁,例如行锁、表锁甚至是繁重的事务处理,那么是不是会让数据库更加不堪重负呢?下面我们来看看使用ZooKeeper如何实现分布式锁,这里主要讲解排他锁和共享锁两类分布式锁。
1.7.1、排他锁
排他锁(Exclusive Locks,简称X锁),又称为写锁或独占锁,是一种基本的锁类型。如果事务T1对数据对象O1加上了排他锁,那么在整个加锁期间,只允许事务T1对O1进行读取和更新操作,其他任何事务都不能再对这个数据对象进行任何类型的操作——直到T1释放了排他锁。
从上面讲解的排他锁的基本概念中,我们可以看到,排他锁的核心是如何保证当前有且仅有一个事务获得锁,并且锁被释放后,所有正在等待获取锁的事务都能够被通知到。下面我们就来看看如何借助ZooKeeper实现排他锁。
定义锁
在通常的Java开发编程中,有两种常见的方式可以用来定义锁,分别是synchronized机制和JDK5提供的ReentrantLock。然而,在ZooKeeper中,没有类似于这样的API可以直接使用,而是通过ZooKeeper上的数据节点来表示一个锁,例如/exclusive_lock/lock节点就可以被定义为一个锁,如下图所示:
获取锁
在需要获取排他锁时,所有的客户端都会试图通过调用create()接口,在/exclusive_lock节点下创建临时子节点/exclusive_lock/lock。在前面几节中我们也介绍了,ZooKeeper会保证在所有的客户端中,最终只有一个客户端能够创建成功,那么就可以认为该客户端获取了锁。同时,所有没有获取到锁的客户端就需要到/exclusive_lock节点上注册一个子节点变更的Watcher监听,以便实时监听到lock节点的变更情况。
释放锁
在“定义锁”部分,我们已经提到,/exclusive_lock/lock是一个临时节点,因此在以下两种情况下,都有可能释放锁:
- 当前获取锁的客户端机器发生宕机,那么ZooKeeper上的这个临时节点就会被移除。
- 正常执行完业务逻辑后,客户端就会主动将自己创建的临时节点删除。
无论在什么情况下移除了lock节点,ZooKeeper都会通知所有在/exclusive_lock节点上注册了子节点变更Watcher监听的客户端。这些客户端在接收到通知后,再次重新发起分布式锁获取,即重复“获取锁”过程。整个排他锁的获取和释放流程,可以用下图来表示:
1.7.2、共享锁
共享锁(Shared Locks,简称S锁),又称为读锁,同样是一种基本的锁类型。如果事务T1对数据对象O1加上了共享锁,那么当前事务只能对O1进行读取操作,其他事务也只能对这个数据对象加共享锁——直到该数据对象上的所有共享锁都被释放。
共享锁和排他锁最根本的区别在于,加上排他锁后,数据对象只对一个事务可见,而加上共享锁后,数据对所有事务都可见。下面我们就来看看如何借助ZooKeeper来实现共享锁。
定义锁
和排他锁一样,同样是通过ZooKeeper上的数据节点来表示一个锁,是一个类似于“/shared_lock/[Hostname]-请求类型-序号”的临时顺序节点,例如/shared_lock/192.168.0.1-R-0000000001,那么,这个节点就代表了一个共享锁,如下图所示:
获取锁
在需要获取共享锁时,所有客户端都会到/shared_lock这个节点下面创建一个临时顺序节点,如果当前是读请求,那么就创建例如/shared_lock/192.168.0.1-R-0000000001的节点;如果是写请求,那么就创建例如/shared_lock/192.168.0.1-W-0000000001的节点。
判断读写顺序
根据共享锁的定义,不同的事务都可以同时对同一个数据对象进行读取操作,而更新操作必须在当前没有任何事务进行读写操作的情况下进行。基于这个原则,我们来看看如何通过ZooKeeper的节点来确定分布式读写顺序,大致可以分为如下4个步骤:
- 创建完节点后,获取/shared_lock节点下的所有子节点,并对该节点注册子节点变更的Watcher监听。
- 确定自己的节点序号在所有子节点中的顺序。
- 对于读请求:如果没有比自己序号小的子节点,或是所有比自己序号小的子节点都是读请求,那么表明自己已经成功获取到了共享锁,同时开始执行读取逻辑。如果比自己序号小的子节点中有写请求,那么就需要进入等待。
- 接收到Watcher通知后,重复步骤1。
释放锁
释放锁的逻辑和排他锁是一致的,这里不再赘述。整个共享锁的获取和释放流程,可以用下图来表示:
1.8、分布式队列
业界有不少分布式队列产品,不过绝大多数都是类似于ActiveMQ、Metamorphosis、Kafka和HornetQ等的消息中间件(或称为消息队列)。在本节中,我们主要介绍基于ZooKeeper实现的分布式队列。分布式队列,简单地讲分为两大类,一种是常规的先入先出队列,另一种则是要等到队列元素集聚之后才统一安排执行的Barrier模型。
FIFO:先入先出
FIFO(First Input First Output,先入先出)的算法思想,以其简单明了的特点,广泛应用于计算机科学的各个方面。而FIFO队列也是一种非常典型且应用广泛的按序执行的队列模型:先进入队列的请求操作先完成后,才会开始处理后面的请求。
使用ZooKeeper实现FIFO队列,和1.7节中提到的共享锁的实现非常类似。FIFO队列就类似于一个全写的共享锁模型,大体的设计思路其实非常简单:所有客户端都会到/queue_fifo这个节点下面创建一个临时顺序节点,例如/queue_fifo/192.168.0.1-0000000001,如下图所示:
创建完节点之后,根据如下4个步骤来确定执行顺序:
- 通过调用getChildren()接口来获取/queue_fifo节点下的所有子节点,即获取队列中所有的元素。
- 确定自己的节点序号在所有子节点中的顺序。
- 如果自己不是序号最小的子节点,那么就需要进入等待,同时向比自己序号小的最后一个节点注册Watcher监听。
- 接收到Watcher通知后,重复步骤1。
整个FIFO队列的工作流程,可以用下图来表示:
Barrier:分布式屏障
Barrier原意是指障碍物、屏障,而在分布式系统中,特指系统之间的一个协调条件,规定了一个队列的元素必须都集聚后才能统一进行安排,否则一直等待。这往往出现在那些大规模分布式并行计算的应用场景上:最终的合并计算需要基于很多并行计算的子结果来进行。这些队列其实是在FIFO队列的基础上进行了增强,大致的设计思想如下:开始时,/queue_barrier节点是一个已经存在的默认节点,并且将其节点的数据内容赋值为一个数字n来代表Barrier值,例如n=10表示只有当/queue_barrier节点下的子节点个数达到10后,才会打开Barrier。之后,所有的客户端都会到/queue_barrier节点下创建一个临时节点,例如/queue_barrier/192.168.0.1,如下图所示:
创建完节点之后,根据如下5个步骤来确定执行顺序:
- 通过调用getData()接口获取/queue_barrier节点的数据内容:10。
- 通过调用getChildren()接口获取/queue_barrier节点下的所有子节点,即获取队列中的所有元素,同时注册对子节点列表变更的Watcher监听。
- 统计子节点的个数。
- 如果子节点个数还不足10个,那么就需要进入等待。
- 接收到Watcher通知后,重复步骤2。
整个Barrier队列的工作流程,可以用下图来表示:
2、ZooKeeper在大型分布式系统中的应用
介绍ZooKeeper在其中的应用场景和具体的实现方式,帮助读者更好地理解ZooKeeper的分布式应用场景。
2.1、Hadoop
Hadoop是Apache开源的一个大型分布式计算框架,由Lucene创始人DougCutting牵头创建,其定义了一种能够开发和运行处理海量数据的软件规范,用来实现一个在大规模集群中对海量数据进行分布式计算的软件平台。Hadoop的核心是HDFS和MapReduce,分别提供了对海量数据的存储和计算能力,自0.23.0版本开始,Hadoop又引入了全新一代MapReduce框架YARN。
在海量数据存储及处理领域,Hadoop是目前业界公认的最成熟也是最卓越的开源解决方案。本贴不会去过多地介绍Hadoop技术本身,感兴趣的读者可以访问Hadoop的官方网站了解更多关于这一分布式计算框架的内容。本贴主要讨论ZooKeeper在Hadoop中的使用场景。
在Hadoop中,ZooKeeper主要用于实现HA(High Availability),这部分逻辑主要集中在Hadoop Common的HA模块中,HDFS的NameNode与YARN的ResourceManager都是基于此HA模块来实现自己的HA功能的。同时,在YARN中又特别提供了ZooKeeper来存储应用的运行状态。本书将以Cloudera的5.0发布版本为例,围绕YARN中ZooKeeper的使用场景来讲解。
YARN介绍
YARN是Hadoop为了提高计算节点Master(JT)的扩展性,同时为了支持多计算模型和提供资源的细粒度调度而引入的全新一代分布式调度框架。其上可以支持MapReduce计算引擎,也支持其他的一些计算引擎,如Tez、Spark、Storm、Imlala和Open MPI等。其架构体系如下图所示:
从上图中可以看出,YARN主要由ResourceManager(RM)、NodeManager(NM)、ApplicationMaster(AM)和Container四部分组成。其中最为核心的就是ResourceManager,它作为全局的资源管理器,负责整个系统的资源管理和分配。关于YARN的更多介绍,读者可以访问YARN的官方网站[插图]进行查阅。
ZooKeeper一开始是Hadoop的子项目,因此很多设计之初的原始需求都是为了解决Hadoop系统中碰到的一系列分布式问题。虽然Hadoop的架构几经变迁后,ZooKeeper在Hadoop的使用场景也有所变化,但其出色的分布式协调功能依然是Hadoop解决单点和状态信息存储的重要组件。
2.2、 HBase
HBase,全称Hadoop Database,是Google Bigtable的开源实现,是一个基于Hadoop文件系统设计的面向海量数据的高可靠性、高性能、面向列、可伸缩的分布式存储系统,利用HBase技术可以在廉价的PC服务器上搭建起大规模结构化的存储集群。
与大部分分布式NoSQL数据库不同的是,HBase针对数据写入具有强一致性的特性,甚至包括索引列也都实现了强一致性,因此受到了很多互联网企业的青睐。根据公开报道的数据,Facebook和阿里集团都分别拥有数千台的HBase服务器,存储和使用了数以PB计的在线数据。面对如此海量的数据以及如此大规模的服务器集群,如何更好地进行分布式状态协调成为了整个HBase系统正常运转的关键所在。
HBase在实现上严格遵守了Google BigTable论文的设计思想。BigTable使用Chubby来负责分布式状态的协调。在3.1节中我们已经讲解了Chubby,这是Google实现的一种基于Paxos算法的分布式锁服务,而HBase则采用了开源的ZooKeeper服务来完成对整个系统的分布式协调工作。下图中展示了整个HBase架构及其与ZooKeeper之间的结构关系。
从下图中可以看到,在HBase的整个架构体系中,ZooKeeper是串联起HBase集群与Client的关键所在。有趣的是,在2009年以前的HBase代码中,还看不到ZooKeeper的影子,因为当时HBase的定位是离线数据库。随着HBase逐步向在线分布式存储方向发展,出现了一系列难以解决的问题,例如开发者发现如果有RegionServer服务器挂掉时,系统无法及时得知信息,客户端也无法知晓,因此服务难以快速迁移至其他RegionServer服务器上——类似问题都是因为缺少相应的分布式协调组件,于是后来ZooKeeper被加入到HBase的技术体系中。直到今天,ZooKeeper依然是HBase的核心组件,而且ZooKeeper在HBase中的应用场景范围也己经得到了进一步的拓展。
2.3、Kafka
Kafka是知名社交网络公司LinkedIn于2010年12月份开源的分布式消息系统,主要由Scala语言开发,于2012年成为Apache的顶级项目,目前被广泛应用在包括Twitter、Netflix和Tumblr等在内的一系列大型互联网站点上。
Kafka主要用于实现低延迟的发送和收集大量的事件和日志数据——这些数据通常都是活跃的数据。所谓活跃数据,在互联网大型的Web网站应用中非常常见,通常是指网站的PV数和用户访问记录等。这些数据通常以日志的形式记录下来,然后由一个专门的系统来进行日志的收集与统计。
Kafka是一个吞吐量极高的分布式消息系统,其整体设计是典型的发布与订阅模式系统。在Kafka集群中,没有“中心主节点”的概念,集群中所有的服务器都是对等的,因此,可以在不做任何配置更改的情况下实现服务器的添加与删除,同样,消息的生产者和消费者也能够做到随意重启和机器的上下线。Kafka服务器及消息生产者和消费者之间的部署关系如下图所示:
2.3.1、术语简介
尽管Kafka是一个近似符合JMS规范的消息中间件实现,但是为了让读者能够更好地理解本节余下部分的内容,这里首先对Kafka中的一些术语进行简单的介绍。
- 消息生产者,即Producer,是消息产生的源头,负责生成消息并发送到Kafka服务器上。
- 消息消费者,即Consumer,是消息的使用方,负责消费Kafka服务器上的消息。
- 主题,即Topic,由用户定义并配置在Kafka服务端,用于建立生产者和消费者之间的订阅关系:生产者发送消息到指定Topic下,消费者从这个Topic下消费消息。
- 消息分区,即Partition,一个Topic下面会分为多个分区,例如“kafka-test”这个Topic可以分为10个分区,分别由两台服务器提供,那么通常可以配置为让每台服务器提供5个分区,假设服务器ID分别为0和1,则所有分区为0-0、0-1、0-2、0-3、0-4和1-0、1-1、1-2、1-3、1-4。消息分区机制和分区的数量与消费者的负载均衡机制有很大关系,后面将会重点展开讲解。
- Broker,即Kafka的服务器,用于存储消息,在消息中间件中通常被称为Broker。
- 消费者分组,即Group,用于归组同类消费者。在Kafka中,多个消费者可以共同消费一个Topic下的消息,每个消费者消费其中的部分消息,这些消费者就组成了一个分组,拥有同一个分组名称,通常也被称为消费者集群。
- Offset,消息存储在Kafka的Broker上,消费者拉取消息数据的过程中需要知道消息在文件中的偏移量,这个偏移量就是所谓的Offset。
2.3.2、Broker注册
Kafka是一个分布式的消息系统,这也体现在其Broker、Producer和Consumer的分布式部署上。虽然Broker是分布式部署并且相互之间是独立运行的,但还是需要有一个注册系统能够将整个集群中的Broker服务器都管理起来。在Kafka的设计中,选择了使用ZooKeeper来进行所有Broker的管理。
在ZooKeeper上会有一个专门用来进行Broker服务器列表记录的节点,下文中我们称之为“Broker节点”,其节点路径为/brokers/ids。
每个Broker服务器在启动时,都会到ZooKeeper上进行注册,即到Broker节点下创建属于自己的节点,其节点路径为/broker/ids/[0…N]。
从上面的节点路径中,我们可以看出,在Kafka中,我们使用一个全局唯一的数字来指代每一个Broker服务器,可以称其为“Broker ID”,不同的Broker必须使用不同的Broker ID进行注册,例如/broker/ids/1和/broker/ids/2分别代表了两个Broker服务器。创建完Broker节点后,每个Broker就会将自己的IP地址和端口等信息写入到该节点中去。
请注意,Broker创建的节点是一个临时节点,也就是说,一旦这个Broker服务器宕机或是下线后,那么对应的Broker节点也就被删除了。因此我们可以通过ZooKeeper上Broker节点的变化情况来动态表征Broker服务器的可用性。
2.3.3、Topic注册
在Kafka中,会将同一个Topic的消息分成多个分区并将其分布到多个Broker上,而这些分区信息以及与Broker的对应关系也都是由ZooKeeper维护的,由专门的节点来记录,其节点路径为/brokers/topics。下文中我们将这个节点称为“Topic节点”。Kafka中的每一个Topic,都会以/brokers/topics/[topic]的形式记录在这个节点下,例如/brokers/topics/login和/brokers/topics/search等。
Broker服务器在启动后,会到对应的Topic节点下注册自己的Broker ID,并写入针对该Topic的分区总数。例如,/brokers/topics/login/3➔2这个节点表明Broker ID为3的一个Broker服务器,对于“login”这个Topic的消息,提供了2个分区进行消息存储。同样,这个分区数节点也是一个临时节点。
2.3.4、负载均衡
生产者负载均衡
Kafka是分布式部署Broker服务器的,会对同一个Topic的消息进行分区并将其分布到不同的Broker服务器上。因此,生产者需要将消息合理地发送到这些分布式的Broker上——这就面临一个问题:如何进行生产者的负载均衡。对于生产者的负载均衡,Kafka支持传统的四层负载均衡,同时也支持使用ZooKeeper方式来实现负载均衡,这里我们首先来看使用四层负载均衡的方案。
四层负载均衡
四层负载均衡方案在设计上比较简单,一般就是根据生产者的IP地址和端口来为其确定一个相关联的Broker。通常一个生产者只会对应单个Broker,然后该生产者生成的所有消息都发送给这个Broker。从设计上,我们可以很容易发现这种方式的优缺点:好处是整体逻辑简单,不需要引入其他三方系统,同时每个生产者也不需要同其他系统建立额外的TCP链接,只需要和Broker维护单个TCP链接即可。
但这种方案的弊端也是显而易见的,事实上该方案无法做到真正的负载均衡。因为在系统实际运行过程中,每个生产者生成的消息量,以及每个Broker的消息存储量都是不一样的,如果有些生产者产生的消息远多于其他生产者的话,那么会导致不同的Broker接收到的消息总数非常不均匀。另一方面,生产者也无法实时感知到Broker的新增与删除,因此,这种负载均衡方式无法做到动态的负载均衡。
使用ZooKeeper进行负载均衡
在Kafka中,客户端使用了基于ZooKeeper的负载均衡策略来解决生产者的负载均衡问题。在前面内容中也已经提到,每当一个Broker启动时,会首先完成Broker注册过程,并注册一些诸如“有哪些可订阅的Topic”的元数据信息。生产者就能够通过这个节点的变化来动态地感知到Broker服务器列表的变更。在实现上,Kafka的生产者会对ZooKeeper上的“Broker的新增与减少”、“Topic的新增与减少”和“Broker与Topic关联关系的变化”等事件注册Watcher监听,这样就可以实现一种动态的负载均衡机制了。此外,在这种模式下,还能够允许开发人员控制生产者根据一定的规则(例如根据消费者的消费行为)来进行数据分区,而不仅仅是随机算法而已——Kafka将这种特定的分区策略称为“语义分区”。显然,ZooKeeper在整个生产者负载均衡的过程中扮演了非常重要的角色,通过ZooKeeper的Watcher通知能够让生产者动态地获取Broker和Topic的变化情况。
消费者负载均衡
与生产者类似,Kafka中的消费者同样需要进行负载均衡来实现多个消费者合理地从对应的Broker服务器上接收消息。Kafka有消费者分组的概念,每个消费者分组中都包含了若干个消费者,每一条消息都只会发送给分组中的一个消费者,不同的消费者分组消费自己特定Topic下面的消息,互不干扰,也不需要互相进行协调。因此消费者的负载均衡也可以看作是同一个消费者分组内部的消息消费策略。
消息分区与消费者关系
Group ID,同一个消费者分组内部的所有消费者都共享该ID。同时,Kafka也会为每个消费者分配一个Consumer ID,通常采用“Hostname:UUID”的形式来表示。在Kafka的设计中,规定了每个消息分区有且只能同时有一个消费者进行消息的消费,因此,需要在ZooKeeper上记录下消息分区与消费者之间的对应关系。每个消费者一旦确定了对一个消息分区的消费权利,那么需要将其Consumer ID写入到对应消息分区的临时节点上,例如/consumers/[group_id]/owners/[topic]/[broker_id-partition_id],其中“[broker_id-partition_id]”就是一个消息分区的标识,节点内容就是消费该分区上消息的消费者的Consumer ID。
消息消费进度Offset记录
在消费者对指定消息分区进行消息消费的过程中,需要定时地将分区消息的消费进度,即Offset记录到ZooKeeper上去,以便在该消费者进行重启或是其他消费者重新接管该消息分区的消息消费后,能够从之前的进度开始继续进行消息的消费。Offset在ZooKeeper上的记录由一个专门的节点负责,其节点路径为/consumers/[group_id]/offsets/[topic]/[broker_id-partition_id],其节点内容就是Offset值。
消费者注册
下面我们再来看看消费者服务器在初始化启动时加入消费者分组的过程:
1.注册到消费者分组。每个消费者服务器在启动的时候,都会到ZooKeeper的指定节点下创建一个属于自己的消费者节点,例如/consumers/[group_id]/ids/[consumer_id]。完成节点创建后,消费者就会将自己订阅的Topic信息写入该节点。注意,该节点也是一个临时节点,也就是说,一旦消费者服务器出现故障或是下线后,其对应的消费者节点就会被删除掉。
2.对消费者分组中消费者的变化注册监听。每个消费者都需要关注所属消费者分组中消费者服务器的变化情况,即对/consumers/[group_id]/ids节点注册子节点变化的Watcher监听。一旦发现消费者新增或减少,就会触发消费者的负载均衡。
3.对Broker服务器的变化注册监听。消费者需要对/broker/ids/[0…N]中的节点进行监听的注册,如果发现Broker服务器列表发生变化,那么就根据具体情况来决定是否需要进行消费者的负载均衡。
4.进行消费者负载均衡。所谓消费者负载均衡,是指为了能够让同一个Topic下不同分区的消息尽量均衡地被多个消费者消费而进行的一个消费者与消息分区分配的过程。通常,对于一个消费者分组,如果组内的消费者服务器发生变更或Broker服务器发生变更,会触发消费者负载均衡。
2.3.5、小结
Kafka从设计之初就是一个大规模的分布式消息中间件,其服务端存在多个Broker,同时为了达到负载均衡,将每个Topic的消息分成了多个分区,并分布在不同的Broker上,多个生产者和消费者能够同时发送和接收消息。Kafka使用ZooKeeper作为其分布式协调框架,很好地将消息生产、消息存储和消息消费的过程有机地结合起来。同时借助ZooKeeper,Kafka能够在保持包括生产者、消费者和Broker在内的所有组件无状态的情况下,建立起生产者和消费者之间的订阅关系,并实现了生产者和消费者的负载均衡。
3、ZooKeeper在阿里巴巴的实践与应用
自2011年上半年起,阿里巴巴中间件团队的几位技术专家,率先将ZooKeeper引入到了阿里巴巴集团,并先后基于其开发了一系列分布式系统,其中就包括知名的分布式消息中间件Metamorphosis和PAAS解决方案TAE系统。经过近3年的开发与运维,目前中间件团队运维的ZooKeeper集群规模,已经从最初的3台服务器,增长到了7个集群27台服务器;客户端规模也已经从最初不到100个客户端,增长到了1万多个客户端,高峰时期甚至覆盖全网1/3的机器。同时,也滋生出了众多上层业务系统,其中包括消息中间件Metamorphosis、RPC服务框架Dubbo、MySQL复制组件Canal和同步组件Otter等一大批知名的开源系统。
3.1、消息中间件:Metamorphosis
Metamorphosis是阿里巴巴中间件团队的killme2008和wq163于2012年3月开源的一个Java消息中间件,目前项目主页地址为https://github.com/killme2008/Metamorphosis,由开源爱好者及项目的创始人killme2008和wq163持续维护。关于消息中间件,相信读者应该都听说过JMS规范,以及一些典型的开源实现,如ActiveMQ和HornetQ等,Metamorphosis也是其中之一。
Metamorphosis是一个高性能、高可用、可扩展的分布式消息中间件,其思路起源于LinkedIn的Kafka,但并不是Kafka的一个简单复制。Metamorphosis具有消息存储顺序写、吞吐量大和支持本地XA事务等特性,适用于大吞吐量、顺序消息、消息广播和日志数据传输等分布式应用场景,目前在淘宝和支付宝都有着广泛的应用,其系统整体部署结构如下图所示:
3.2、RPC服务框架:Dubbo
Dubbo是阿里巴巴于2011年10月正式开源的一个由Java语言编写的分布式服务框架,致力于提供高性能和透明化的远程服务调用方案和基于服务框架展开的完整SOA服务治理方案。目前项目主页地址为https://github.com/alibaba/dubbo。
Dubbo的核心部分包含以下三块:
- 远程通信:提供对多种基于长连接的NIO框架抽象封装,包括多种线程模型、序列化,以及“请求-响应”模式的信息交换方式。
- 集群容错:提供基于接口方法的远程过程透明调用,包括对多协议的支持,以及对软负载均衡、失败容错、地址路由和动态配置等集群特性的支持。
- 自动发现:提供基于注册中心的目录服务,使服务消费方能动态地查找服务提供方,使地址透明,使服务提供方可以平滑地增加或减少机器。
此外,Dubbo框架还包括负责服务对象序列化的Serialize组件、网络传输组件Transport、协议层Protocol以及服务注册中心Registry等,其整体模块组成和协作方式如下图所示。在本节中,我们将主要关注Dubbo中基于ZooKeeper实现的服务注册中心。
3.3、基于MySQL Binlog的增量订阅和消费组件:Canal
Canal是阿里巴巴于2013年1月正式开源的一个由纯Java语言编写的基于MySQL数据库Binlog实现的增量订阅和消费组件。目前项目主页地址为https://github.Com/alibaba/canal,由项目主要负责人,同时也是资深的开源爱好者agapple持续维护。
项目名Canal取自“管道”的英文单词,寓意数据的流转,是一个定位为基于MySQL数据库的Binlog增量日志来实现数据库镜像、实时备份和增量数据消费的通用组件。
早期的数据库同步业务,大多都是使用MySQL数据库的触发器机制(即Trigger)来获取数据库的增量变更。不过从2010年开始,阿里系下属各公司开始逐步尝试基于数据库的日志解析来获取增量变更,并在此基础上实现数据的同步,由此衍生出了数据库的增量订阅和消费业务——Canal项目也由此诞生了。
Canal的工作原理相对比较简单,其核心思想就是模拟MySQL Slave的交互协议,将自己伪装成一个MySQL的Slave机器,然后不断地向Master服务器发送Dump请求。Master收到Dump请求后,就会开始推送相应的Binary Log给该Slave(也就是Canal)。Canal收到Binary Log,解析出相应的Binary Log对象后就可以进行二次消费了,其基本工作原理如下图所示:
3.4、分布式数据库同步系统:Otter
Otter是阿里巴巴于2013年8月正式开源的一个由纯Java语言编写的分布式数据库同步系统,主要用于异地双A机房的数据库数据同步,致力于解决长距离机房的数据同步及双A机房架构下的数据一致性问题。目前项目主页地址为https://github.com/alibaba/otter,由项目主要负责人,同时也是资深的开源爱好者agapple持续维护。
项目名Otter取自“水獭”的英文单词,寓意数据搬运工,是一个定位为基于数据库增量日志解析,在本机房或异地机房的MySQL/Oracle数据库之间进行准实时同步的分布式数据库同步系统。Otter的第一个版本可以追溯到2004年,初衷是为了解决阿里巴巴中美机房之间的数据同步问题,从4.0版本开始开源,并逐渐演变成一个通用的分布式数据库同步系统。其基本架构如下图所示:
从上图中,我们可以看出,在Otter中也是使用ZooKeeper来实现一些与分布式协调相关的功能,下面我们将从Otter的分布式SEDA[插图]模型调度和面向全球机房服务的ZooKeeper集群搭建两方面来讲解Otter中的ZooKeeper使用。
3.5、轻量级分布式通用搜索平台:终搜
终搜(Terminator)是阿里早期的一款产品,最早应用在淘江湖,基于Lucene、Solr、ZooKeeper和Hadoop等开源技术构建,全方位支持各种检索需求,是一款实时性高、接入成本低、支持个性化检索定制的分布式全文检索系统。历经发展,终搜目前已成为服务于阿里集团内部各大业务线的通用搜索平台,截止2014年4月,已经有200多个不同规模、不同查询特征的应用接入使用。
终搜系统主要由前端业务查询处理、后台索引构建、数据存储和后台管理四大部分组成,其整体架构如下图所示:
3.6、实时计算引擎:JStorm
随着互联网大数据技术的不断发展,人们对数据实时性的要求越来越高,传统Hadoop的Map Reduce技术已经逐渐无法满足这些需求,因此实时计算成为了眼下大数据领域最热门的研究方向之一,出现了诸如Storm和JStorm这样的实时计算引擎。Storm是Twitter开源的一个高容错的分布式实时计算系统,而JStorm是阿里巴巴集团中间件团队在Storm基础上改造和优化的一个分布式实时计算引擎,使用Java语言编写,于2013年9月正式开源。相较于Storm,JStorm在功能上更强大,在稳定性和性能上有更卓越的表现,目前广泛应用于日志分析、消息转化器和统计分析器等一系列无状态的实时计算系统上。
JStorm是一个类似于Hadoop MapReduce的分布式任务调度系统,用户按照指定的接口编写一个任务程序,然后将这个任务程序提交给JStorm系统,JStorm会负责7×24小时运行并调度该任务。在运行过程中如果某个任务执行器(Worker)发生意外情况或其他故障,调度器会立即分配一个新的Worker替换这个失效的Worker来继续执行任务。
JStorm是一个典型的分布式调度系统,其系统整体架构如下图所示:
4、小结
ZooKeeper是一个高可用的分布式数据管理与系统协调框架。基于对ZAB算法的实现,该框架很好地保证了分布式环境中数据的一致性。也正是基于这样的特性,使得ZooKeeper成为了解决分布式一致性问题的利器。随着近年来互联网系统规模的不断扩大,大数据时代飞速到来,越来越多的分布式系统将ZooKeeper作为核心组件使用,如Hadoop、Hbase和Kafka等。因此,正确地理解ZooKeeper的应用场景,对于研发人员来说,显得尤为重要。
从数据发布/订阅、负载均衡、命名服务、分布式通知/协调、集群管理、Master选举、分布式锁和分布式队列等这些分布式系统中常见的应用场景展开,从理论上向读者讲解了ZooKeeper的最佳实践,同时结合Hadoop、HBase和Kafka等这些大型分布式系统以及阿里巴巴的一系列开源系统,向读者展现了如何借助ZooKeeper解决实际生产中的分布式问题。