Zookeeper入门到实战

Apache Zookeeper是Apache的一个软件项目,主要为大型分布式系统提供协调服务,例如分布式配置、同步服务和命名注册等。

Zookeeper

Zookeeper定位是一个分布式系统的管理者的角度,因为大多数大数据分布式系统的Logo都是动物的标志,Zookeeper顾名思义动物园管理者,在Zookeeper的论文中也提到Zookeeper是一个用于协调分布式系统应用的服务,它在集群中类似一个大脑的地位,其所维护的信息具有高可用性的特点。


安装

单机模式

下载安装包,进行解压

1
2
wget http://apache.forsale.plus/zookeeper/zookeeper-3.4.11/zookeeper-3.4.11.tar.gz
tar zxvf zookeeper-3.4.11.tar.gz

修改配置文档,在Zookeeper的conf目录下,这个目录下有zoo_sample.cfg,将其更名为zoo.cfg

1
mv zoo_sample.cfg zoo.cfg

配置文件信息如下:

1
2
3
tickTime=2000
dataDir=/tmp/zookeeper
clientPort=2181

  • tickTime:这个时间是作为 Zookeeper 服务器之间或客户端与服务器之间维持心跳的时间间隔,也就是每个 tickTime 时间就会发送一个心跳。
  • dataDir: 顾名思义就是 Zookeeper 保存数据的目录,默认情况下,Zookeeper 将写数据的日志文件也保存在这个目录里。
  • clientPort:这个端口就是客户端连接 Zookeeper 服务器的端口,Zookeeper 会监听这个端口,接受客户端的访问请求。

完成上述配置之后,通过bin下面的zkServer.sh脚本启动Zookeeper。

1
./zkServer.sh start


集群模式

Zookeeper不仅可以单机提供服务,同时也支持多机组成集群来提供服务。
Zookeeper集群配置非常简单,就是在其配置项中添加几个配置项:

1
2
3
4
5
initLimit=5
syncLimit=2
server.1=192.168.58.129:2888:3888
server.2=192.168.58.130:2888:3888
server.3=192.168.58.131:2888:3888

  • initLimit:这个配置项是用来配置 Zookeeper 接受客户端(这里所说的客户端不是用户连接 Zookeeper 服务器的客户端,而是 Zookeeper 服务器集群中连接到 Leader 的 Follower 服务器)初始化连接时最长能忍受多少个心跳时间间隔数。当已经超过 10 个心跳的时间(也就是 tickTime)长度后 Zookeeper 服务器还没有收到客户端的返回信息,那么表明这个客户端连接失败。总的时间长度就是 5*2000=10 秒。
  • syncLimit:这个配置项标识 Leader 与 Follower 之间发送消息,请求和应答时间长度,最长不能超过多少个 tickTime 的时间长度,总的时间长度就是 2*2000=4 秒。
  • server.A=B:C:D:其中 A 是一个数字,表示这个是第几号服务器;B 是这个服务器的 ip 地址;C 表示的是这个服务器与集群中的 Leader 服务器交换信息的端口;D 表示的是万一集群中的 Leader 服务器挂了,需要一个端口来重新进行选举,选出一个新的 Leader,而这个端口就是用来执行选举时服务器相互通信的端口。如果是伪集群的配置方式,由于 B 都是一样,所以不同的 Zookeeper 实例通信端口号不能一样,所以要给它们分配不同的端口号。

除了修改 zoo.cfg 配置文件,集群模式下还要配置一个文件 myid,这个文件在 dataDir 目录下,这个文件里面就有一个数据就是 A 的值,Zookeeper 启动时会读取这个文件,拿到里面的数据与 zoo.cfg 里面的配置信息比较从而判断到底是那个 server。


实现原理

数据模型

zookeeper-data-tree
Zookeeper的数据定义成一个树形结构,每一个节点称作为znode。

  • znode:znode包括了存储的数据和ACL(Access Control List)
  • znode有两种类型:
    • Ephemeral znodes: 当客户端与服务器断开之后自动删除。
    • Persistent znode: 只能自己创建或删除。
  • znode序号:如果创建znode时,需要排序标志的话,Zookeeper会自动进行排序。如果我们请求创建一个znode,指定命名为/a/b-,那么ZooKeeper会为我们创建一个名字为/a/b-3的znode。我们再请求创建一个名字为/a/b-的znode,ZooKeeper会为我们创建一个名字/a/b-5的znode。ZooKeeper给我们指定的序号是不断增长的。API中的create()的返回结果就是znode的实际名字。
  • 观察模式:客户端可以设置对某个节点进行观察,如果这个目录节点中的存储数据进行修改,会通知设置观察的客户端。这个通知只有一次,如果需要再次通知的话,需要接到通知之后再设置一次观察。

操作

下面的表格中列出了9种ZooKeeper的操作:

操作 说明
create Creates a znode (the parent znode must already exist)
delete Deletes a znode (the znode must not have any children)
exists Tests whether a znode exists and retrieves its metadata
getACL, setACL Gets/sets the ACL for a znode
getChildren Gets a list of the children of a znode
getData, setData Gets/sets the data associated with a znode
sync Synchronizes a client’s view of a znode with ZooKeeper

实现

ZooKeeper服务可以在两种模式下运行。在standalone模式下,我们可以运行一个单独的ZooKeeper服务器,我们可以在这种模式下进行基本功能的简单测试,但是这种模式没有办法体现ZooKeeper的高可用特性和快速恢复特性。在生产环境中,我们一般采用replicated(复制)模式安装在多台服务器上,组建一个叫做ensemble的集群。ZooKeeper在他的副本之间实现高可用性,并且只要ensemble集群中能够推举出主服务器,ZooKeeper的服务就可以一直不终断。例如,在一个5个节点的ensemble中,容忍有2个节点脱离集群,服务还是可用的。因为剩下的3个节点投票,可以产生超过集群半数的投票,来推选一台主服务器。而6个节点的ensemble中,也只能容忍2个节点的服务器死机。因为如果3个节点脱离集群,那么剩下的3个节点无论如何不能产生超过集群半数的投票来推选一个主服务器。所以,一般情况下ensemble中的服务器数量都是奇数。

从概念上来看,ZooKeeper其实是很简单的。他所做的一切就是保证每一次对znode树的修改,都能够复制到ensemble的大多数服务器上。如果非主服务器脱离集群,那么至少有一台服务器上的副本保存了最新状态。剩下的其他的服务器上的副本,会很快更新这个最新的状态。

为了实现这个简单而不平凡的设计思路,ZooKeeper使用了一个叫做Zab的协议。这个协议分为两阶段,并且不断的运行在ZooKeeper上:

  • 阶段 1:领导选举(Leader election)
    Ensemble中的成员通过一个程序来选举出一个首领成员,我们叫做leader。其他的成员就叫做follower。在大多数(quorum)follower完成与leader状态同步时,这个阶段才结束。

  • 阶段 2: 原子广播(Atomic broadcast)
    所有的写入请求都会发送给leader,leader再广播给follower。当大多数的follower同意写入请求,leader才会将更新提交,客户端就会随之得到leader更新成功的消息。协议中的设计也是具有原子性的,所以写入操作只有成功和失败两个结果。

如果leader脱离了集群,剩下的节点将选举一个新的leader。如果之前的leader回到了集群中,那么将被视作一个follower。leader的选举很快,大概200ms就能够产生结果,所以不会影响执行效率。

Ensemble中的所有节点都会在更新内存中的znode树的副本之前,先将更新数据写入到硬盘上。读操作可以请求任何一台ZooKeeper服务器,而且读取速度很快,因为读取是内存中的数据副本。


数据一致性

每一个对znode树的更新操作,都会被赋予一个全局唯一的ID,我们称之为zxid(ZooKeeper Transaction ID)。更新操作的ID按照发生的时间顺序升序排序。例如,z1大于z2,那么z1的操作就早于z2操作。
ZooKeeper在数据一致性上实现了如下几个方面:

  • 顺序一致性
    从客户端提交的更新操作是按照先后循序排序的。例如,如果一个客户端将一个znode z赋值为a,然后又将z的值改变成b,那么在这个过程中不会有客户端在z的值变为b后,取到的值是a。
  • 原子性
    更新操作的结果不是失败就是成功。即,如果更新操作失败,其他的客户端是不会知道的。
  • 系统视图唯一性
    无论客户端连接到哪个服务器,都将看见唯一的系统视图。如果客户端在同一个会话中去连接一个新的服务器,那么他所看见的视图的状态不会比之前服务器上看见的更旧。当ensemble中的一个服务器宕机,客户端去尝试连接另外一台服务器时,如果这台服务器的状态旧于之前宕机的服务器,那么服务器将不会接受客户端的连接请求,直到服务器的状态赶上之前宕机的服务器为止。
  • 持久性
    一旦更新操作成功,数据将被持久化到服务器上,并且不能撤销。所以服务器宕机重启,也不会影响数据。
  • 时效性
    系统视图的状态更新的延迟时间是有一个上限的,最多不过几十秒。如果服务器的状态落后于其他服务器太多,ZooKeeper会宁可关闭这个服务器上的服务,强制客户端去连接一个状态更新的服务器。

从执行效率上考虑,读操作的目标是内存中的缓存数据,并且读操作不会参与到写操作的全局排序中。这就会引起客户端在读取ZooKeeper的状态时产生不一致。例如,A客户端将znode z的值由a改变成a’,然后通知客户端B去读取z的值,但是B读取到的值是aa,而不是修改后的a’​​ 。为了阻止这种情况出现,B在读取z的值之前,需要调用sync方法。sync方法会强制B连接的服务器状态与leader的状态同步,这样B在读取z的值就是A重新更改过的值了。


会话

ZooKeeper的客户端中,配置了一个ensemble服务器列表。当启动时,首先去尝试连接其中一个服务器。如果尝试连接失败,那么会继续尝试连接下一个服务器,直到连接成功或者全部尝试连接失败。
一旦连接成功,服务器就会为客户端创建一个会话(session)。session的过期时间由创建会话的客户端应用来设定,如果在这个时间期间,服务器没有收到客户端的任何请求,那么session将被视为过期,并且这个session不能被重新创建,而创建的ephemeral znode将随着session过期被删除掉。在会话长期存在的情况下,session的过期事件是比较少见的,但是应用程序如何处理好这个事件是很重要的。
在长时间的空闲情况下,客户端会不断的发送ping请求来保持session。(ZooKeeper的客户端开发工具的liberay实现了自动发送ping请求,所以我们不必去考虑如何维持session)ping请求的间隔被设置成足够短,以便能够及时发现服务器失败(由读操作的超时时长来设置),并且能够及时的在session过期前连接到其他服务器上。
容错连接到其他服务器上,是由ZooKeeper客户端自动完成的。重要的是在连接到其他服务器上后,之前的session以及epemeral节点还保持可用状态。
在容错的过程中,应用将收到与服务断开连接和连接的通知。Watch模式的通知在断开链接时,是不会发送断开连接事件给客户端的,断开连接事件是在重新连接成功后发送给客户端的。如果在重新连接到其他节点时,应用尝试一个操作,这个操作是一定会失败的。对于这一点的处理,是一个ZooKeeper应用的重点。


时间

在ZooKeeper中有一些时间的参数。tick是ZooKeeper的基础时间单位,用来定义ensemble中服务器上运行的程序的时间表。其他时间相关的配置都是以tick为单位的,或者以tick的值为最大值或者最小值。例如,session的过期时间在2 ticks到20 ticks之间,那么你再设置时选择的session过期时间必须在2和20之间的一个数。
通常情况1 tick等于2秒。那么就是说session的过期时间的设置范围在4秒到40秒之间。在session过期时间的设置上有一些考虑。过期时间太短会造成加快物理失败的监测频率。在组成员关系的例子中,session的过期时间与从组中移除失败的成员花费的时间相等。如果设置过低的session过期时间,那么网络延迟就有可能造成非预期的session过期。这种情况下,就会出现在短时间内一台机器不断的离开组,然后又从新加入组中。
如果应用需要创建比较复杂的临时状态,那么就需要较长的session过期时间,因为重构花费的时间比较长。有一些情况下,需要在session的生命周期内重启,而且要保证重启完后session不过期(例如,应用维护和升级的情况)。服务器会给每一个session一个ID和密码,如果在连接创建时,ZooKeeper验证通过,那么session将被恢复使用(只要session没过期就行)。所以应用程序可以实现一个优雅的关机动作,在重启之前,将session的ID和密码存储在一个稳定的地方。重启之后,通过ID和密码恢复session。+

这仅仅是在一些特殊的情况下,我们需要使用这个特性来使用比较长的session过期时间。大多数情况下,我们还是要考虑当出现非预期的异常失败时,如何处理session过期,或者仅需要优雅的关闭应用,在session过期前不用重启应用。
通常情况也越大规模的ensemble,就需要越长的session过期时间。Connetction Timeout、Read Timeout和Ping Periods都由一个以服务器数量为参数的函数计算得到,当ensemble的规模扩大,这些值需要逐渐减小。如果为了解决经常失去连接而需要增加timeout的时长,建议你先监控一下ZooKeeper的metrics,再去调整。


状态

ZooKeeper对象在他的生命周期内会有不同的状态,我们通过getState()来获得当前的状态。

状态是一个枚举类型的数据。新构建的ZooKeeper对象在尝试连接ZooKeeper服务时的状态是CONNECTING,一旦与服务建立了连接那么状态就变成了CONNECTED。
zookeeper-state
客户端可以通过注册一个观察者对象来接收ZooKeeper对象状态的迁移。当通过CONNECTED状态后,观察者将接收到一个WatchedEvent事件,他的属性KeeperState的值是SyncConnected。

ZooKeeper实例会与服务连接断开或者重新连接,状态会在CONNECTING和CONNECTED之间转换。如果连接断开,watcher会收到一个断开连接事件。请注意,这两个状态都是ZooKeeper实例自己初始化的,并且在断开连接后会自动进行重连接。

如果调用了close()或者session过期,ZooKeeper实例会转换为第三个状态CLOSED,此时在接受事件的KeeperState属性值为Expired。一旦ZooKeeper的状态变为CLOSED,说明实例已经不可用(可以通过isAlive()来判断),并且不能再被使用。如果要重新建立连接,就需要重新构建一个ZooKeeper实例。


实战

以Zookeeper C API为例,介绍如何使用Zookeeper

先贴好代码,日后说明。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
#include <stdio.h>
#include <string.h>
#include <assert.h>
#include <pthread.h>
#include <zookeeper/zookeeper.h>
const char *hosts = "192.168.58.129:2181";
const char *root_path = "/";
const int32_t timeout = 30000;
volatile int connected = 0;
pthread_mutex_t lock = PTHREAD_MUTEX_INITIALIZER;
pthread_cond_t cond = PTHREAD_COND_INITIALIZER;
void signal_to_connect() {
pthread_mutex_lock(&lock);
connected = 1;
pthread_cond_signal(&cond);
pthread_mutex_unlock(&lock);
}
void zk_event_callback(zhandle_t *zh, int type, int state, const char *path, void *watcherCtx) {
if (type == ZOO_SESSION_EVENT && state == ZOO_CONNECTED_STATE) {
printf("connect to zookeeper successfully!\ntype: %d, state: %d\n", type, state);
}
if (type == ZOO_CREATED_EVENT && state == ZOO_CONNECTED_STATE) {
printf("create znode: %s\ntype: %d, state: %d\n", path, type, state);
}
const char* context = (const char *)watcherCtx;
printf("function: %s\n", context);
context = NULL;
signal_to_connect();
}
zhandle_t *init_zhandle() {
char *h = malloc(strlen(hosts)+strlen(root_path)+1); // 1 for '\0'
strcpy(h, hosts);
strcat(h, root_path);
const char *str = __FUNCTION__;
return zookeeper_init(h, zk_event_callback, timeout, 0, (void *)str, 0);
}
void wait_to_connect() {
pthread_mutex_lock(&lock);
while (!connected) {
pthread_cond_wait(&cond, &lock);
}
pthread_mutex_unlock(&lock);
}
void close_zhandle(zhandle_t *handle) {
int res = zookeeper_close(handle);
printf("res: %s\n", zerror(res));
handle = NULL;
}
void exists(zhandle_t *handle, const char *path) {
const char *str = __FUNCTION__;
zoo_set_context(handle, (void *)str);
int res = zoo_exists(handle, path, 1, NULL);
printf("res: %s\n", zerror(res));
}
void create(zhandle_t *handle, const char *path) {
char buffer[64] = {0};
int res = zoo_create(handle, path, NULL, -1, &ZOO_OPEN_ACL_UNSAFE, ZOO_EPHEMERAL, buffer, sizeof(buffer));
printf("res: %s\n", zerror(res));
printf("new path: %s\n", buffer);
}
int main(int argc, char *argv) {
zhandle_t *handle = init_zhandle();
assert(handle != NULL);
wait_to_connect();
exists(handle, "/member");
create(handle, "/member");
close_zhandle(handle);
return 0;
}

运行结果:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
2018-01-31 20:11:06,882:7832(0x7fdb4ec20740):ZOO_INFO@log_env@753: Client environment:zookeeper.version=zookeeper C client 3.4.11
2018-01-31 20:11:06,882:7832(0x7fdb4ec20740):ZOO_INFO@log_env@757: Client environment:host.name=server2
2018-01-31 20:11:06,882:7832(0x7fdb4ec20740):ZOO_INFO@log_env@764: Client environment:os.name=Linux
2018-01-31 20:11:06,882:7832(0x7fdb4ec20740):ZOO_INFO@log_env@765: Client environment:os.arch=3.10.0-514.el7.x86_64
2018-01-31 20:11:06,882:7832(0x7fdb4ec20740):ZOO_INFO@log_env@766: Client environment:os.version=#1 SMP Tue Nov 22 16:42:41 UTC 2016
2018-01-31 20:11:06,882:7832(0x7fdb4ec20740):ZOO_INFO@log_env@774: Client environment:user.name=root
2018-01-31 20:11:06,882:7832(0x7fdb4ec20740):ZOO_INFO@log_env@782: Client environment:user.home=/root
2018-01-31 20:11:06,882:7832(0x7fdb4ec20740):ZOO_INFO@log_env@794: Client environment:user.dir=/root/zookeeper
2018-01-31 20:11:06,882:7832(0x7fdb4ec20740):ZOO_INFO@zookeeper_init@827: Initiating client connection, host=192.168.58.129:2181/ sessionTimeout=30000 watcher=0x400c6b sessionId=0 sessionPasswd=<null> context=0x40110d flags=0
2018-01-31 20:11:06,882:7832(0x7fdb4dd00700):ZOO_INFO@check_events@1764: initiated connection to server [192.168.58.129:2181]
2018-01-31 20:11:06,897:7832(0x7fdb4dd00700):ZOO_INFO@check_events@1811: session establishment complete on server [192.168.58.129:2181], sessionId=0x10001934c15000c, negotiated timeout=30000
connect to zookeeper successfully!
type: -1, state: 3
function: init_zhandle
res: no node
res: ok
new path: /member
create znode: /member
type: 1, state: 3
function: exists
2018-01-31 20:11:06,905:7832(0x7fdb4ec20740):ZOO_INFO@zookeeper_close@2564: Closing zookeeper sessionId=0x10001934c15000c to [192.168.58.129:2181]
res: ok

参考资料:
Wikipedia-ApacheZookeeper
分布式服务框架 Zookeeper – 管理分布式环境中的数据
ZooKeeper深入浅出
zookeeper C API介绍
Zookeeper C API 指南