watcher流程

三个过程:
client 注册 Watcher
server 处理 Watcher
client 回调 Watcher

client 注册 Watcher

zk client 可以通过 new ZooKeeper(),getData(),getChildren(), exist() 传入 watcher对象 来 注册 Watcher.

比如对于 getData()时注册watcher的操作.实际上 client 就是把 watcher 对象 存到 DataWatchRegistration 里,再创建 Packet,存入 outgoingQueue,等待 SendThread 线程取出来发给 server.

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
// 1.创建ZooKeeper对象时注册Watcher
- new ZooKeeper()传入Watcher,会回调Watcher.process()方法
- ZooKeeper中会创建ZKWatchManager
- watchManager.defaultWatcher = watcher;初始化默认Watcher
- 解析connectString创建hostProvider
- 创建客户端连接管理类ClientCnxn
- 启动ClientCnxn线程
- ClientCnxn构造方法中创建sendThread和eventThread
EventThread负责处理Server返回的WatchedEvent,回调注册的客户端事件接口处理函数
SendThread为outgoing(传出)请求队列提供服务并生成心跳.它还会产生ReadThread

// 2.setData()
- zk.setData(path, data, version);
- 创建RequestHeader,调用cnxn.submitRequest(h, request, response, null);
- clientCnxn.submitRequest()中:将Request等信息封装成packet,放入outgoingQueue队列中

// sendThread逻辑
- ClientCnxn.SendThread中
- run()逻辑
- 建立与server的连接
- 定时发送ping
- 委托给 clientCnxnSocket.doTransport()进行底层的nio传输
- ClientCnxnSocketNIO.doIO()中
从outgoingQueue取出packet
- p.createBB();// 序列化
- sock.write(p.bb);// 发送
- 从outgoingQueue队列中移除该发送的包
- pendingQueue.add(p);// 将packet加入到pendingQueue队列

// 3.getData()时注册watcher
- zk.getData(zooDataPath, watcher, stat);getData()时注册watcher
- wcb = new DataWatchRegistration(watcher, clientPath); // 注册 watcher到DataWatchRegistration中
- packet = new Packet(h, r, request, response, watchRegistration);通过watchRegistration创建packet,存入outgoingQueue,等待发送

server 处理 Watcher

比如对于 getData()时注册watcher的处理:其实 server端就是在 FinalRequestProcessor 中将 Watcher(其实是NIOServerCnxn对象) 添加到 WatchManager 对象的 watchTable 和 watch2Paths 属性中.

1
2
3
4
5
6
7
8
9
10
11
12
13
14
- 2.Server处理Watcher
- 处理客户端发来的zk.getData()注册watcher的请求
- NIOServerCnxnFactory.run()有读写事件
- NIOServerCnxn.doIO()
- NIOServerCnxn.readPayload();// 读取内容
- NIOServerCnxn.readRequest()
- ZooKeeperServer.processPacket()
- ZooKeeperServer.submitRequest(si);// 提交请求
- touch(si.cnxn);// 判断session是否存在或者已经超时
- firstProcessor.processRequest(si);// 处理请求
- Processor链处理...
- FinalRequestProcessor.processRequest()中
- case OpCode.getData:// 对于获取数据请求,若有watch会注册
- dataWatches.addWatch(path, watcher);// 若有watcher的话则注册watcher,此时watcher为NIOServerCnxn.将Watcher添加到WatchManager对象的watchTable和watch2Paths属性中

当server端收到了client端的setData()请求,由于引起了数据变更,会触发 getData()时注册watcher. 其实就是从上面 getData() 时存入的 watchTable 里查出来 watchers 并移除(只触发一次),调用 watcher.process()方法,也就是 NIOServerCnxn.process() 发送给 客户端 Watcher事件.

1
2
3
4
5
6
7
8
9
- 3.Server触发Watcher
- 处理client发来的zk.setData()请求
- FinalRequestProcessor.processRequest()中
- rc = zks.processTxn(hdr, txn);// 处理事务,应用到 dataTree上
- DataTree.setData()中,有watcher时会触发watcher
- dataWatches.triggerWatch(path, EventType.NodeDataChanged); // setData()时会导致数据变更,若有watcher会触发watcher
- watchers = watchTable.remove(path); // 获取该path对应的watchers,watchers只触发一次就移除
- w.process(e);// 进行watcher事件处理,传入WatchedEvent,此时的w是NioServerCnxn对象
- NIOServerCnxn.process()中,创建响应头,xid=-1代表watcher事件.将WatchedEvent对象转换为WatcherEvent,用于网络传输,响应客户端

client 回调 Watcher

client端 通过 SendThread.readResponse() 接收服务端响应.

根据 replyHdr.xid 判断是 Watcher事件.

将 来自服务端的响应,反序列成 WatcherEvent 对象,WatcherEvent 的信息里只包含 type,state,path 信息.
client端收到的server端的watcher信息

然后再将 WatcherEvent 对象 转成 WatchedEvent 对象.
根据watcher信息创建的WatchedEvent对象

从 ZKWatchManager.dataWatches 中 取出 path 对应的 watcher 并移除(只触发一次),连同 WatchedEvent 创建WatcherSetEventPair 对象.入 waitingEvents 队列,交给 eventThread 线程处理.

eventThread线程 从 waitingEvents 队列里取出对应的 watcher,调用 watcher.process() 处理.

1
2
3
4
5
6
7
8
9
10
11
12
13
- 4.Client回调Watcher
- SendThread.readResponse()接收服务端响应
- 根据replyHdr.getXid() == -1 判断是Watcher通知,反序列化出WatcherEvent对象
- eventThread.queueEvent(we);// 将Watcher事件 入事件队列,交给EventThread处理
- watcher.materialize()根据watcher的事件类型进行不同的处理
- 比如NodeDataChanged和NodeCreated事件
- 1.从 ZKWatchManager.dataWatches 中移除 clientPath 对应的 watchers(相当于只能触发一次),并将这些移除的 watchers 添加到 result 中返回
- 通过watcher和event创建WatcherSetEventPair
- waitingEvents.add(pair);// 将WatcherSetEventPair对象入waitingEvents这个LinkedBlockingQueue中(保证watcher顺序)
- EventThread.run()中
- 从waitingEvents中取出event,调processEvent(event)
- processEvent(event)中
- event中包含实际的watcher对象,直接回调watcher.process()即可

相关类

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
Watcher
WatchedEvent

ZooKeeper
ZKWatchManager

ClientWatchManager

WatchRegistration
ChildWatchRegistration
DataWatchRegistration
ExistsWatchRegistration

ClientCnxn
Packet

Watcher接口

Watcher 接口中包含:
1.Event 接口,用于定义事件所代表的状态
Event 接口中包含 KeeperState(事件发生时zk的状态) 和 EventType(事件类型)两个枚举类
2.process() 抽象方法

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
package org.apache.zookeeper;
public interface Watcher {
/**
* 表示事件
*/
@InterfaceAudience.Public
public interface Event {

/**
* 事件发生时Zookeeper的状态
*/
@InterfaceAudience.Public
public enum KeeperState {
@Deprecated
Unknown(-1),// 未知状态.不再使用,服务器不会产生此状态

Disconnected(0),// 断开

@Deprecated
NoSyncConnected(1),// 未同步连接.不再使用,服务器不会产生此状态

SyncConnected(3),// 同步连接状态

AuthFailed(4),// 认证失败状态

ConnectedReadOnly(5),// 只读连接状态

SaslAuthenticated(6),// SASL认证通过状态

Expired(-112);// 过期状态
// ...
}

/**
* 事件类型
*/
public enum EventType {
None(-1),// 无
NodeCreated(1),// 节点创建
NodeDeleted(2),// 节点删除
NodeDataChanged(3),// 节点数据变化
NodeChildrenChanged(4);// 节点的子节点变化
}
}

// process()方法
abstract public void process(WatchedEvent event);
}

回调方法 process()

zk server 向 client 发送一个 watcher 通知时,会回调对应的 process() 方法.

1
abstract public void process(WatchedEvent event);

WatchedEvent

process()方法的参数 WatchedEvent,zk server会将watcher事件通过 WatchedEvent 对象 传递给 client,WatchedEvent 包含了事件的一些属性:

1
2
3
4
WatchedEvent属性
-keeperState: KeeperState // 通知状态
-eventType: EventType // 事件类型
-path: String // 对应的path

WatcherEvent

WatcherEvent 用于网络传输.

zk server 在生成 WatchedEvent 事件后,通过 getwrapper() 将 WatchedEvent 包装成 可序列化的 WatcherEvent事件,用于网络传输.

zk client 接收到 server 的 WatcherEvent 对象后,会将 WatcherEvent 反序列化成 WatchedEvent 事件,传给 process() 方法.

ServerCnxn

ServerCnxn 实现了 Watcher 接口

WatchManager

WatchManager 是 ZooKeeper 服务端 Watcher 的管理者,包含 watchTable 和 watch2Paths.