找回密码
 立即注册
查看: 201|回复: 0

ZooKeeper基础原理

[复制链接]
发表于 2022-5-30 11:16 | 显示全部楼层 |阅读模式
1 系统模型

1.1 数据模型


image.png

1.2 事务ID


ZK中事务是指能够改变ZK服务器状态的操作,也称之为:事务操作或更新操作,包括节点创建、删除、内容更新、会话创建/失效等。对于每个事务请求,ZK都会为其分配一个全局唯一事务ID。
1.3 节点特性


持久节点、临时节点、顺序节点:

    持久节点:该数据节点被创建后,就会一直存在于ZK服务器中,知道有删除操作来主动清除。

    临时节点:生命周期和客户端的会话绑定在一起
2 Watcher通知

总体可以概括为如下3个步骤:

    客户端注册Watcher

    服务端处理Watcher

    客户端回调Watcher

ZK的Watcher机制主要包括3部分:客户端线程、客户端WatchManager、ZK服务器。

image.png

整体逻辑如下:客户端在向ZK服务器注册Watcher的同时,会将Watcher对象存储在客户端的WatcherManager中。当ZK服务器端触发Watcher事件后,会向客户端发送通知,客户端线程从WatchManager中取出对应的Watcher对象来执行回调逻辑。
public interface Watcher { void process(WatchedEvent event);}public class WatchedEvent { final private KeeperState keeperState; final private EventType eventType; private String path;}
关于WatchedEvent和WatcherEvent二者表示同一个事物,都是对服务端事件的封装,区别如下:

    WatchedEvent:一个逻辑事件,用于服务端和客户端程序执行过程中所需的逻辑对象

    WatcherEvent:实现了序列化接口,用于网络传输
2.1 客户端注册Watcher

2.1.1 类图结构


image.png

1、WatchRegistration:用于暂存,数据节点的路径和Watcher的对应关系
abstract class WatchRegistration { private Watcher watcher; private String clientPath;}class DataWatchRegistration extends WatchRegistration { @Override protected Map<String, Set<Watcher>> getWatches(int rc) { // 参考ZKWatchManager#dataWatches return watchManager.dataWatches; }}
2、ZKWatchManager:管理客户端的Watcher信息
private static class ZKWatchManager implements ClientWatchManager { private final Map<String, Set<Watcher>> dataWatches = new HashMap<>(); private final Map<String, Set<Watcher>> existWatches = new HashMap<>(); private final Map<String, Set<Watcher>> childWatches = new HashMap<>(); private volatile Watcher defaultWatcher;}2.1.2 注册流程


在创建一个Zookeeper客户端对象时,可以向构造器中传入一个默认的Watcher,该Watcher将作为整个ZK会话期间的默认Watcher,会一直被保存在客户端的ZKWatchManager#defaultWatcher中,构造器如下:
public ZooKeeper(String connectString, int sessionTimeout, Watcher watcher);// 也可以通过getData, getChildren, exist等3个接口向ZK中注册Watcher,3者工作原理一样,以getData为例如下public byte[] getData(final String path, Watcher watcher, Stat stat);public byte[] getData(String path, boolean watch, Stat stat);  // 通过boolean来标识是否使用构造器中提到的默认Watcher来进行注册
客户端Watcher注册流程图如下(以getData接口为例分析):

image-20200428223827327.png

1、客户端首先对当前请求request进行标记,将其设置为:使用Watcher监听,同时封装一个Watcher的注册信息WatchRegistration对象,该对象用于暂存数据节点的路径和Watcher的对应关系
public byte[] getData(final String path, Watcher watcher, Stat stat) {WatchRegistration wcb = null;if (watcher != null) {wcb = new DataWatchRegistration(watcher, path);}// 请求头RequestHeader header = new RequestHeader();header.setType(ZooDefs.OpCode.getData);// 请求体GetDataRequest request = new GetDataRequest();request.setPath(serverPath);  // 此处为服务端的路径,代码省略了:final String serverPath = prependChroot(path);request.setWatch(watcher != null);// 响应GetDataResponse response = new GetDataResponse();ReplyHeader r = cnxn.submitRequest(header, request, response, wcb);return response.getData();}
2、在ZK中,Packet是最小的通信协议单元,用于进行客户端和服务端的网络传输,任何需要传输的对象都会被封装成一个Packet对象。因此在ClientCnxn中会将对象WatchRegistration封装到Packet中,然后放入到发送队列中等待
public ReplyHeader submitRequest(RequestHeader h, Record request, Record response, WatchRegistration watchRegistration)  { ReplyHeader r = new ReplyHeader(); Packet packet = queuePacket(h, r, request, response, null, null, null, null, watchRegistration); synchronized (packet) { while (!packet.finished) { packet.wait(); } } return r;}Packet queuePacket(RequestHeader h, ReplyHeader r, Record request, Record response, AsyncCallback cb, String clientPath, String serverPath, Object ctx, WatchRegistration watchRegistration) { Packet packet = null; synchronized (outgoingQueue) { packet = new Packet(h, r, request, response, watchRegistration); packet.cb = cb; packet.ctx = ctx; packet.clientPath = clientPath; packet.serverPath = serverPath; // 省略 outgoingQueue.add(packet); } sendThread.getClientCnxnSocket().wakeupCnxn(); return packet;}
3、随后ZK客户端会向服务端发送该请求,同时等待请求的返回。请求发送完毕后,由ClientCnxn.SendThread线程的readResponse()方法负责接收来自服务端的响应,最终finishPacket()方法会从Packet中取出对应的Watcher并注册到ZKWatchManager中
private void finishPacket(Packet p) { if (p.watchRegistration != null) { p.watchRegistration.register(p.replyHeader.getErr()); }}
4、由于之前已经将Watcher暂存在WatchRegistration中,现在只需要从该对象中提取出Watcher,然后将Watcher对象转交给ZKWatchManager中,并最终保存到dataWatches中。

注意:如果客户端注册的所有Watcher都被传递到服务端,那么服务端肯定会出现内存紧张或其他性能问题,因此,在将WatchRegistration封装到Packet对象的底层网络传输的序列化过程中,并没有将WatchRegistration对象完全地序列化到底层字节数组中。ZK只是将requestHeader和request两个属性进行序列化。如下:
public void createBB() { try { ByteArrayOutputStream baos = new ByteArrayOutputStream(); BinaryOutputArchive boa = BinaryOutputArchive.getArchive(baos); boa.writeInt(-1, "len"); // We'll fill this in later if (requestHeader != null) { requestHeader.serialize(boa, "header"); } if (request instanceof ConnectRequest) { request.serialize(boa, "connect"); boa.writeBool(readOnly, "readOnly"); } else if (request != null) { request.serialize(boa, "request"); } baos.close(); this.bb = ByteBuffer.wrap(baos.toByteArray()); this.bb.putInt(this.bb.capacity() - 4); this.bb.rewind(); } catch (IOException e) { LOG.warn("Ignoring unexpected exception", e); }}2.2 服务端处理Watcher

2.2.1 类图结构


ServerCnxn表示一个客户端和服务端的连接信息,默认使用NIOServerCnxn,同时在3.4.0版本之后,引入了基于Netty的实现版本:NettyServerCnxn。

[图片上传失败...(image-2e174e-1629099527238)]

同时ServerCnxn也实现了Watcher接口,因此可以将ServerCnxn看做是一个Watcher对象,在服务端,数据节点的路径和ServerCnxn最终都会存储在WatchManager中的watchTable和watch2Paths中:

    watchTable:从数据节点路径的粒度来托管Watcher

    watch2Paths:从Watcher的粒度控制时间出发需要出发的数据节点路径

注意WatchManager还负责Watcher事件的触发,并移除那些已经被触发的Watcher。

注意WatchManager只是个统称,在服务端,DataTree中会托管2个WatchManager:dataWatches和childWatches

[图片上传失败...(image-4baa9e-1629099527238)]
2.2.2 ServerCnxn存储


image.png

总结:对于标记了Watcher注册的请求,ZK会将其对应的ServerCnxn存储到WatchManager中。即如下代码
// FinalRequestProcessorcase OpCode.getData: {    lastOp = "GETD";    //....    // getDataRequest.getWatch()=true,即认为当前客户端请求需要进行Watcher注册,就会将ServerCnxn对象和数据节点路径传入到getData()方法中    byte b[] = zks.getZKDatabase().getData(getDataRequest.getPath(), stat, getDataRequest.getWatch() ? cnxn : null);    rsp = new GetDataResponse(b, stat);    break;}// ZKDatabasepublic byte[] getData(String path, Stat stat, Watcher watcher) {    return dataTree.getData(path, stat, watcher);}// DataTreepublic byte[] getData(String path, Stat stat, Watcher watcher) {    DataNode n = nodes.get(path);    synchronized (n) {        if (watcher != null) {            dataWatches.addWatch(path, watcher);        }        return n.data;    }}2.2.2 Watcher触发


例如事件:NodeDataChanged事件的触发条件就是,Watcher监听的对应数据节点的数据内容发生变更。具体实现如下:

[图片上传失败...(image-97703-1629099527238)]

在对指定节点进行数据更新后,通过调用WatchManager的triggerWatch()方法来触发相关的事件:
public Set<Watcher> triggerWatch(String path, EventType type) {    return triggerWatch(path, type, null);}/* 1\. 封装WatchEvent   2\. 查询Watcher   3\. 调用process()方法来触发Watcher。注意,这里的Watcher其实就是ServerCnxn对象。*/public Set<Watcher> triggerWatch(String path, EventType type, Set<Watcher> supress) {    WatchedEvent e = new WatchedEvent(type, KeeperState.SyncConnected, path);    HashSet<Watcher> watchers;    synchronized (this) {        watchers = watchTable.remove(path);        for (Watcher w : watchers) {            HashSet<String> paths = watch2Paths.get(w);            if (paths != null) {                paths.remove(path);            }        }    }    for (Watcher w : watchers) {        if (supress != null && supress.contains(w)) {            continue;        }        w.process(e);       // ServerCnxn#process    }    return watchers;}
注意,这里的Watcher其实就是ServerCnxn对象,因此这里调用的ServerCnxn对象的process()方法。
// ServerCnxnsynchronized public void process(WatchedEvent event) {    // 在请求头中标记"-1",表明当前是一个通知    ReplyHeader h = new ReplyHeader(-1, -1L, 0);    // 将WatchedEvent包装成WatcherEvent,以便进行网络传输    WatcherEvent e = event.getWrapper();    sendResponse(h, e, "notification");}2.3 客户端回调Watcher


当服务端使用ServerCnxn对应的TCP连接向客户端发送一个WatcherEvent事件,客户端接收程序:ClientCnxn.SendThread,示例如下:
void readResponse(ByteBuffer incomingBuffer) throws IOException {    //如果响应头replyHdr中标识了XID=-1,表明这是一个通知类型的响应    /* 1\. 发序列化       2\. 处理chrootPath       3\. 还原WatchedEvent       4\. 回调Watcher,将WatchedEvent对象交给EventThread线程,在下一个轮询周期中进行Watcher回调  */    if (replyHdr.getXid() == -1) {        WatcherEvent event = new WatcherEvent();        event.deserialize(bbia, "response");        if (chrootPath != null) {            String serverPath = event.getPath();            if(serverPath.compareTo(chrootPath)==0)                event.setPath("/");            else if (serverPath.length() > chrootPath.length())                event.setPath(serverPath.substring(chrootPath.length()));        }        WatchedEvent we = new WatchedEvent(event);        eventThread.queueEvent( we );        return;    }}
Watcher的回调处理是通过ClientCnxn.EventThread线程来完成,该线程是ZK客户端专门用来处理服务端通知事件的线程。
// EventThreadpublic void queueEvent(WatchedEvent event) {    if (event.getType() == EventType.None && sessionState == event.getState()) {        return;    }    sessionState = event.getState();    WatcherSetEventPair pair = new WatcherSetEventPair(            watcher.materialize(event.getState(), event.getType(),                    event.getPath()),                    event);    // waitingEvents队列是一个待处理Watcher的队列,EventThread.run()方法会不断对该队列进行处理    waitingEvents.add(pair);}// ZKWatchManager// 从ZKWatchManager中取出所有相关的Watcherpublic Set<Watcher> materialize(Watcher.Event.KeeperState state, Watcher.Event.EventType type, String clientPath) {    Set<Watcher> result = new HashSet<Watcher>();    switch (type) {        //...        case NodeDataChanged:        case NodeCreated:            synchronized (dataWatches) {                addTo(dataWatches.remove(clientPath), result);            }            synchronized (existWatches) {                addTo(existWatches.remove(clientPath), result);            }            break;        //...        return result;    }}final private void addTo(Set<Watcher> from, Set<Watcher> to) {    if (from != null) {        to.addAll(from);    }}
客户端在识别出事件类型EventType后,会从相应的Watcher存储(dataWatches、existWatches或childWatches中的一个或多个)中取出对应的Watcher。在获取到所有相关的Watcher之后,会将其放入到waitingEvents队列中,waitingEvents是一个待处理Watcher的队列,EventThread.run()方法会不断对该队列进行处理
public void run() {    isRunning = true;    while (true) {        Object event = waitingEvents.take();        if (event == eventOfDeath) {            wasKilled = true;        } else {            processEvent(event);        }    }}private void processEvent(Object event) {    if (event instanceof WatcherSetEventPair) {        WatcherSetEventPair pair = (WatcherSetEventPair) event;        for (Watcher watcher : pair.watchers) {            // 此处有try...catch...            watcher.process(pair.event);        }    }}3 序列化和协议

3.1 Jute序列化


ZK从第一个正式版本开始,就一直使用Jute作为序列化组件来进行网络数据传输和本地磁盘数据存储的序列化和反序列化工作。从2008年开始,ZK官方就提出要使用Apache Avro、Thirft、protobuf这样的组件来替换Jute,但由于实施复杂,一直未能实现(Jute的序列化能力目前不是ZK的性能瓶颈)
3.1.1 Record接口


Jute定义了自己独特的序列化格式Record,所有序列化对象均需要实现该接口,接口如下:
public interface Record {    // archive是底层真正的序列化器和反序列化器,因为一个archive中可以包含对多个对象的序列化和反序列化操作,因此需要一个tag进行标识    void serialize(OutputArchive archive, String tag) throws IOException;    void deserialize(InputArchive archive, String tag) throws IOException;}
序列化过程就是将当前对象的各个成员变量以一定的标记(tag)写入到序列化器重。
3.1.2 Archive


InputArchive和OutputArchive分别是Jute底层的序列化器和反序列化器接口定义。

image.png

3.2 Jute示例


示例代码:
public class MockReqHeader implements Record {    private long sessionId;    private String type;    // 构造器:无参构造器、全参构造器    // setter/getter    // 序列化过程就是将当前对象的各个成员变量以一定的标记(tag)写入到序列化器重。    public void serialize(OutputArchive a_, String tag) throws java.io.IOException {        a_.startRecord(this, tag);        a_.writeLong(sessionId, "sessionId");        a_.writeString(type, "type");        a_.endRecord(this, tag);    }    public void deserialize(InputArchive a_, String tag) throws java.io.IOException {        a_.startRecord(tag);        sessionId = a_.readLong("sessionId");        type = a_.readString("type");        a_.endRecord(tag);    }}
Jute的序列化和反序列示例:
// 序列化代码ByteArrayOutputStream baos = new ByteArrayOutputStream();BinaryOutputArchive boa = BinaryOutputArchive.getArchive(baos);new MockReqHeader(1000000001L, "ping").serialize(boa, "header");// 网络传输ByteBuffer bb = ByteBuffer.wrap(baos.toByteArray());baos.close();// 反序列化ByteArrayInputStream bais = new ByteArrayInputStream();BinaryIutputArchive bia = BinaryIutputArchive..getArchive(bais);MockReqHeader header = new MockReqHeader();header.deserialize(bia, "header");bais.close();4 通信协议

基于TCP/IP协议,ZK实现了自己的通信协议来完成客户端和服务端、服务端和服务端之间的网络通信。ZK通信协议整体上的设计非常简单:
    对于请求协议,主要包含:请求体和请求头

image.png

    对于响应协议,主要包含:响应体和响应头

image.png

4.1 请求协议


ZK的请求头包含了请求最基本的信息,包括xid和type。

    xid:记录客户端请求发起的先后序号,用来确保单个客户端请求的响应顺序

    type:请求的操作类型,常见如:创建节点、删除节点、获取节点等
public class RequestHeader implements Record {  private int xid;  private int type;}
根据协议规定,除非是“会话创建”请求,其他所有的客户端请求中都会带上请求头。

最主要的请求体就包含:会话创建ConnectRequest、获取节点数据GetDataRequest、更新节点数据SetDataRequest
4.2 响应协议


响应头中包含了每一个响应最基本的信息,包括:xid、zxid、err。

    xid:请求中的xid原路返回

    zxid:代表ZK服务器端的事务ID

    err:错误码,用以标识请求处理异常
public class ReplyHeader implements Record {  private int xid;  private long zxid;  private int err;}
最主要的响应体包括:会话创建ConnectResponse、获取节点数据GetDataResponse、更新节点数据SetDataResponse
附录1 源码导入

ANT


我以为zookeeper就是用Maven来构建的,没想到是Ant。虽然笔者对它不熟,但还是下载并安装了它的环境,并按照大部分博客中说的那样来试图导入zookeeper。1、在gitHub上下载zookeeper源码

我以为zookeeper就是用Maven来构建的,没想到是Ant。虽然笔者对它不熟,但还是下载并安装了它的环境,并按照大部分博客中说的那样来试图导入zookeeper。1、在gitHub上下载zookeeper源码

2、下载并安装ant环境

3、定位到zookeeper源码根目录,并执行ant eclipse

4、导入并运行

博客中这样写的人都自己试过么?导入之后的各种报错,找不到包,Build都过不去,你们是怎么运行起来的。。

后来,又看到说是因为IDEA不认识ant,需要手动添加Sources目录。搞了半天,还是跑不起,搞得我火大。。

点根烟,站起来走两步,也许事情还有转机。
JAR包


既然Ant搞不成,那就抛弃它,转变下思路,不能被它牵着鼻子走。

1、下载zookeeper带有源码的Jar包,并解压。比如zookeeper-3.4.13-sources.jar,下载地址在:https://repo1.maven.org/maven2/org/apache/zookeeper/zookeeper/。

2、在IDEA中新建Maven项目。

3、复制gitHub上zookeeper项目里pom文件的依赖内容,放到新建项目中。

这样就完成了zookeeper源码的导入,比Ant编译手动添加Sources之类的,方便很多

[图片上传失败...(image-14085f-1629099527233)]

不过,有两点可能需要注意。

1、在pom文件中,修改netty的版本为3.x。
<dependency>    <groupId>io.netty</groupId>    <artifactId>netty</artifactId>    <version>3.10.5.Final</version></dependency></pre>
2、如果Completer类报错,就重新导下包路径。
启动服务


找到ZooKeeperServerMain类,运行Main方法即可启动服务器。

不过,在此之前,我们先要将zoo.cfg文件拷贝下来。内容如下:
tickTime=2000initLimit=10syncLimit=5dataDir=E:\\zookeeper-dataclientPort=2181
并在main方法中,指定配置文件的路径。
args = new String[1];args[0] = "E:\\zookeeper-data\\conf\\zoo.cfg";</pre>测试


启动之后,会有日志显示绑定端口。然后我们有两种方式来测试。

1、zkCli

用zkCli工具来连接。

执行:./zkCli.sh -server 192.168.100.139:2181

然后服务器输出日志如下:
NIO服务器启动,绑定端口:0.0.0.0/0.0.0.0:2181
接收来自客户端的连接:/192.168.100.139:62241
Client attempting to establish new session at /192.168.100.139:62241
Creating new log file: log.3
Established session 0x100151428150000 with negotiated timeout 30000
for client /192.168.100.139:62241</pre>
执行创建节点、查询、删除等操作,结果无误。

2、zookeeper

如果没有zkCli工具,我们也可以写测试程序来搞。

比如:
CountDownLatch countDownLatch = new CountDownLatch(1);ZooKeeper zooKeeper = new ZooKeeper("127.0.0.1:2181", 1000, new Watcher() {    public void process(WatchedEvent event) {        countDownLatch.countDown();    }});countDownLatch.await();zooKeeper.create("/test","".getBytes(),                 ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL);

本帖子中包含更多资源

您需要 登录 才可以下载或查看,没有账号?立即注册

×
懒得打字嘛,点击右侧快捷回复 【右侧内容,后台自定义】
您需要登录后才可以回帖 登录 | 立即注册

本版积分规则

小黑屋|手机版|Unity开发者联盟 ( 粤ICP备20003399号 )

GMT+8, 2024-5-10 16:15 , Processed in 0.093878 second(s), 26 queries .

Powered by Discuz! X3.5 Licensed

© 2001-2024 Discuz! Team.

快速回复 返回顶部 返回列表