一、关于ZooKeeper
ZooKeeper 是一个分布式协调服务的开源框架,本质上是一个分布式的小文件存储系统。提供基于类似于文件系统的目录树方式的数据存储,并且可以对节点进行有效管理。主要用来解决分布式集群中应用系统的一致性的问题,帮助开发人员专注于核心应用程序逻辑,而不必担心应用程序的分布式特性。
二、数据模型
2.1 Znode
ZooKeeper有一个类似分布式文件系统的命名体系。区别在于Zookeeper每个一个节点或子节点都可以拥有数据。节点路径是一个由斜线分开的绝对路径,注意没有相对路径。其中每个节点都被称作为Znode。
Znode兼具文件和目录两种特点。既像文件一样维护着数据、元信息、ACL(Access Control List,访问控制列表)、时间戳等数据结构,又像目录一样可以作为路径标识的一部分。
每个Znode由3部分组成:
- stat:此为状态信息, 描述该Znode的版本, 权限等信息
- data:与该Znode关联的数据
- children:该Znode下的子节点
其中Stat结构由以下字段组成:
- czxid:创建znode的zxid。
- mzxid:上次修改znode的zxid。
- pzxid:最后修改znode子节点的zxid。
- ctime:创建znode的时间,以毫秒为单位。
- mtime:znode最后一次修改的时间(以毫秒为单位)。
- version:该znode数据的修改次数。
- cversion:该znode的子节点的变化次数。
- aversion:该znode的ACL变化次数。
- ephemeralOwner:如果该znode是一个ephemeralnode,则该znode所有者的会话id。如果不是临时节点,则为零。
- dataLength:该znode的数据字段长度。
- numChildren:该znode的子节点数。
2.2 Znode特性
znode是程序员使用的主要对象,它有以下几个关键特点:
Watches
客户端可以在znode上设置watches。对该znode的更改会触发该watch,同时这个watche会被清除(若想再次获取需要重新设置)。当watch触发时,ZooKeeper会向客户端发送通知。
数据访问
存储在znode上的数据是读写都是原子性的。读获取znode的所有数据,写则替换所有数据。每个节点都有一个访问控制列表(Access Control List, ACL)来限制权限。
ZooKeeper支持以下权限:
- CREATE:创建子节点
- READ:你可以从一个节点获取数据并列出它的子节点。
- WRITE:设置节点数据
- DELETE:删除子节点
- ADMIN:可设置权限
ZooKeeper不是作为一个通用的数据库或大型对象存储来设计的。znode每个节点默认的数据量上限是1M(这个值可以通过配置修改),这样可以保证不需要花费额外的时间来进行网络以及IO操作。
Ephemeral Nodes 临时节点
ZooKeeper有临时节点的概念。只要创建znode的会话处于活动状态,这些znode就会存在。当会话结束时就会被删除。由于这种行为,临时的znode不允许有子节点。会话的临时列表可以使用getEphemerals()
api检索。
Sequence Nodes 有序节点
当创建znode时,你也可以要求ZooKeeper在路径的末尾添加一个单调递增的计数器。这个计数器对父znode来说是唯一的。计数器的格式为%010d——即10位填充为0的数字。
三、会话
要创建一个客户端会话,应用程序代码必须提供一个连接字符串,其中包含一个用逗号分隔的host:port对列表,每个端口对对应一个ZooKeeper服务器(例如:“127.0.0.1:4545”或“127.0.0.1:3000、127.0.0.1:3001 127.0.0.1:3002”)。ZooKeeper客户端会创建一个服务的句柄来与ZooKeeper服务建立会话,这时句柄处于CONNECTING状态,之后它会选择一个任意的服务器并尝试连接到它。如果这个连接失败,或者由于任何原因客户端与服务器断开连接,客户端将自动尝试列表中的下一个服务器,直到连接建立,句柄进入CONNECTED状态。如果发生不可恢复的错误,例如会话过期或身份验证失败,或者应用程序显式关闭句柄,则句柄将移动到CLOSED状态。
四、Zookeeper leader选举
4.1 选举
leader选举主要分为以下两种情况:
4.1.1 启动时期的Leader选举
每个节点启动的时候都 LOOKING 观望状态,接下来就开始进行选举主流程。这里选取三台机器组成的集群为例。第一台服务器 server1启动时,无法进行 leader 选举,当第二台服务器 server2 启动时,两台机器可以相互通信,进入 leader 选举过程。
(1)每台 server 发出一个投票,由于是初始情况,server1 和 server2 都将自己作为 leader 服务器进行投票,每次投票包含所推举的服务器myid、zxid、epoch,使用(myid,zxid)表示,此时 server1 投票为(1,0),server2 投票为(2,0),然后将各自投票发送给集群中其他机器。
(2)接收来自各个服务器的投票。集群中的每个服务器收到投票后,首先判断该投票的有效性,如检查是否是本轮投票(epoch)、是否来自 LOOKING 状态的服务器。
(3)分别处理投票。针对每一次投票,服务器都需要将其他服务器的投票和自己的投票进行对比,对比规则如下:
a. 优先比较 epoch
b. 检查 zxid,zxid 比较大的服务器优先作为 leader
c. 如果 zxid 相同,那么就比较 myid,myid 较大的服务器作为 leader 服务器
(4)统计投票。每次投票后,服务器统计投票信息,判断是都有过半机器接收到相同的投票信息。server1、server2 都统计出集群中有两台机器接受了(2,0)的投票信息,此时已经选出了 server2 为 leader 节点。
(5)改变服务器状态。一旦确定了 leader,每个服务器响应更新自己的状态,如果是 follower,那么就变更为 FOLLOWING,如果是 Leader,变更为 LEADING。此时 server3继续启动,直接加入变更自己为 FOLLOWING。
4.1.2 运行过程中的 leader 选举
当集群中 leader 服务器出现宕机或者不可用情况时,整个集群无法对外提供服务,进入新一轮的 leader 选举。
(1)变更状态。leader 挂后,其他非 Oberver服务器将自身服务器状态变更为 LOOKING。
(2)每个 server 发出一个投票。在运行期间,每个服务器上 zxid 可能不同。
(3)处理投票。规则同启动过程。
(4)统计投票。与启动过程相同。
(5)改变服务器状态。与启动过程相同。
4.2 选举状态:
- LOOKING: 竞选状态
- FOLLOWING: 随从状态,同步 leader 状态,参与投票
- OBSERVING: 观察状态,同步 leader 状态,不参与投票
- LEADING: 领导者状态
4.3 选举时的几个参数:
- 服务器 ID(myid):编号越大在选举算法中权重越大
- 事务 ID(zxid):值越大说明数据越新,权重越大
- 逻辑时钟(epoch-logicalclock):同一轮投票过程中的逻辑时钟值是相同的,每投完一次值会增加
4.4 为什么Zookeeper集群节点是奇数个
在Zookeeper中Leader选举算法采用了Quorom算法。该算法的核心思想是当多数Server写成功,则任务数据写成功。假设有3个Server,则最多允许一个Server挂掉;如果有4个Server,则同样最多允许一个Server挂掉。既然3个或者4个Server,同样最多允许1个Server挂掉,那么它们的可靠性是一样的,所以选择奇数个ZooKeeperServer即可。
五、数据同步
在 Zookeeper 中,主要依赖 ZAB 协议来实现分布式数据一致性。ZAB 协议分为两部分:
5.1 消息广播
Zookeeper 使用单一的主进程 Leader 来接收和处理客户端所有事务请求,并采用 ZAB 协议的原子广播协议,将事务请求以 Proposal 提议广播到所有 Follower 节点,当集群中有过半的Follower 服务器进行正确的 ACK 反馈,那么Leader就会再次向所有的 Follower 服务器发送commit 消息,将此次提案进行提交。这个过程可以简称为 2pc 事务提交。
5.2 崩溃恢复
在正常情况消息广播情况下能运行良好,但是一旦 Leader 服务器出现崩溃,或者由于网络原理导致 Leader 服务器失去了与过半 Follower 的通信,那么就会进入崩溃恢复模式,需要选举出一个新的 Leader 服务器。在这个过程中可能会出现两种数据不一致性的隐患,需要 ZAB 协议的特性进行避免。
(1)Leader 服务器将消息 commit 发出后,立即崩溃
(2)Leader 服务器刚提出 proposal 后,立即崩溃
ZAB 协议的恢复模式使用了以下策略:
(1)选举 zxid 最大的节点作为新的 leader
(2)新 leader 将事务日志中尚未提交的消息进行处理
六、安装
6.1 安装JDK
6.2 下载程序
地址:https://zookeeper.apache.org/releases.html
6.3 配置
复制conf目录的zoo_sample.cfg文件,命名为zoo.cfg,可以配置以下参数:
# 心跳时间,最小会话超时将是TickTime的两
tickTime=2000
# 数据目录
dataDir=/var/lib/zookeeper
# 端口
clientPort=2181
6.4 启动
linux执行以下命令:
# 启动
bin/zkServer.sh start
# 关闭
bin/zkServer.sh stop
windows直接执行zkServer.cmd
即可
6.5 使用zkCli
bin/zkCli.sh -server 127.0.0.1:2181
以下是一些常用的命令
# 获取帮助
help
# 查看
ls /
# 创建一个节点
create /my_node my_data
# 创建一个临时节点
create -e /my_node_e my_data
# 创建一个有序节点
create -s /my_node_s my_data
# 查看节点数据
get /my_node
# 查看节点stat以及data
get -s /my_node
# 修改节点数据
set /my_node my_data_2
# 删除节点
delete /my_node
# 递归删除节点
deleteall /my_node/child_node
# 查看节点stat
stat /my_node
# 监听子节点变化(监听是一次性的,若需要继续监听需要再次执行监听命令)
ls -w /rabb
# 监听节点数据变化
get -w /rabb
6.6 搭建集群
以三节点集群为例,步骤如下:
6.6.1 配置
分别在3台机器安装zookeeper,然后修改其配置文件zoo.cfg,如下:
dataDir=/usr/local/zookeeper/data
server.1=192.168.10.31:2888:3888
server.2=192.168.10.32:2888:3888
server.3=192.168.10.33:2888:3888
之后在三台机器的创建对应data目录,并创建myid文件,设置其内容为对应的id值
mkdir -p /usr/local/zookeeper/data
echo "1"> /usr/local/zookeeper/data/myid
6.6.2 启动
依次启动三台的zookeeper
6.6.3 验证集群
通过以下命令,查看zookeeper节点的集群状态以及其对应leader或follower角色
zkServer.sh status
七、Java API
7.1 引入依赖
...
<dependency>
<groupId>org.apache.zookeeper</groupId>
<artifactId>zookeeper</artifactId>
<version>3.6.3</version>
</dependency>
...
7.2 基础方法
public static void main(String[] args) throws IOException, KeeperException, InterruptedException {
ZooKeeper zooKeeper = new ZooKeeper("127.0.0.1:2181", 2000, new Watcher() {
@Override
public void process(WatchedEvent watchedEvent) {
}
});
String nodePath = "/rabb_java_api";
// 创建方法
String create = zooKeeper.create(nodePath, "test_data".getBytes(), ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
System.out.println(create);
// 获取子节点方法
List<String> children = zooKeeper.getChildren("/", true);
System.out.println(children);
// 判断节点是否存在方法
Stat exists = zooKeeper.exists(nodePath, false);
System.out.println(exists);
// 获取节点数据方法
byte[] data = zooKeeper.getData(nodePath, new Watcher() {
@Override
public void process(WatchedEvent watchedEvent) {
}
}, null);
System.out.println(new String(data));
// 修改节点数据
zooKeeper.setData(nodePath, "test_data1".getBytes(), zooKeeper.exists(nodePath, false).getVersion());
// 删除节点
zooKeeper.delete(nodePath, zooKeeper.exists(nodePath, false).getVersion());
}
7.3 监听使用
public static void main(String[] args) throws IOException, KeeperException, InterruptedException {
String nodePath = "/rabb_java_api";
Watcher defaultWatcher = watchedEvent -> {
System.out.println("defaultWatcher start");
System.out.println("state:" + watchedEvent.getState().name());
System.out.println("type:" + watchedEvent.getType().name());
System.out.println("path:" + watchedEvent.getPath());
System.out.println("defaultWatcher end");
};
ZooKeeper zooKeeper = new ZooKeeper("127.0.0.1:2181", 2000, defaultWatcher);
Watcher childrenWatcher = watchedEvent -> {
System.out.println("childrenWatcher start");
System.out.println("state:" + watchedEvent.getState().name());
System.out.println("type:" + watchedEvent.getType().name());
System.out.println("path:" + watchedEvent.getPath());
System.out.println("childrenWatcher end");
};
Watcher dataWatcher = watchedEvent -> {
System.out.println("dataWatcher start");
System.out.println("state:" + watchedEvent.getState().name());
System.out.println("type:" + watchedEvent.getType().name());
System.out.println("path:" + watchedEvent.getPath());
if (watchedEvent.getType() == Watcher.Event.EventType.NodeDataChanged) {
try {
byte[] data = zooKeeper.getData(nodePath, false, null);
System.out.println("data change to " + Strings.fromByteArray(data));
} catch (KeeperException e) {
e.printStackTrace();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
System.out.println("dataWatcher end");
};
zooKeeper.getChildren(nodePath, childrenWatcher);
zooKeeper.getData(nodePath, dataWatcher, null);
TimeUnit.SECONDS.sleep(Integer.MAX_VALUE);
}
参考
官方文档:https://zookeeper.apache.org/doc/r3.7.0/zookeeperProgrammers.html