watcher流程
三个过程:
client 注册 Watcher
server 处理 Watcher
client 回调 Watcher
client 注册 Watcher
zk client 可以通过 new ZooKeeper(),getData(),getChildren(), exist() 传入 watcher对象 来 注册 Watcher.
比如对于 getData()时注册watcher的操作
.实际上 client 就是把 watcher 对象 存到 DataWatchRegistration 里,再创建 Packet,存入 outgoingQueue,等待 SendThread 线程取出来发给 server.
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33
| // 1.创建ZooKeeper对象时注册Watcher - new ZooKeeper()传入Watcher,会回调Watcher.process()方法 - ZooKeeper中会创建ZKWatchManager - watchManager.defaultWatcher = watcher;初始化默认Watcher - 解析connectString创建hostProvider - 创建客户端连接管理类ClientCnxn - 启动ClientCnxn线程 - ClientCnxn构造方法中创建sendThread和eventThread EventThread负责处理Server返回的WatchedEvent,回调注册的客户端事件接口处理函数 SendThread为outgoing(传出)请求队列提供服务并生成心跳.它还会产生ReadThread
// 2.setData() - zk.setData(path, data, version); - 创建RequestHeader,调用cnxn.submitRequest(h, request, response, null); - clientCnxn.submitRequest()中:将Request等信息封装成packet,放入outgoingQueue队列中
// sendThread逻辑 - ClientCnxn.SendThread中 - run()逻辑 - 建立与server的连接 - 定时发送ping - 委托给 clientCnxnSocket.doTransport()进行底层的nio传输 - ClientCnxnSocketNIO.doIO()中 从outgoingQueue取出packet - p.createBB();// 序列化 - sock.write(p.bb);// 发送 - 从outgoingQueue队列中移除该发送的包 - pendingQueue.add(p);// 将packet加入到pendingQueue队列
// 3.getData()时注册watcher - zk.getData(zooDataPath, watcher, stat);getData()时注册watcher - wcb = new DataWatchRegistration(watcher, clientPath); // 注册 watcher到DataWatchRegistration中 - packet = new Packet(h, r, request, response, watchRegistration);通过watchRegistration创建packet,存入outgoingQueue,等待发送
|
server 处理 Watcher
比如对于 getData()时注册watcher的处理:其实 server端就是在 FinalRequestProcessor 中将 Watcher(其实是NIOServerCnxn对象) 添加到 WatchManager 对象的 watchTable 和 watch2Paths 属性中.
1 2 3 4 5 6 7 8 9 10 11 12 13 14
| - 2.Server处理Watcher - 处理客户端发来的zk.getData()注册watcher的请求 - NIOServerCnxnFactory.run()有读写事件 - NIOServerCnxn.doIO() - NIOServerCnxn.readPayload();// 读取内容 - NIOServerCnxn.readRequest() - ZooKeeperServer.processPacket() - ZooKeeperServer.submitRequest(si);// 提交请求 - touch(si.cnxn);// 判断session是否存在或者已经超时 - firstProcessor.processRequest(si);// 处理请求 - Processor链处理... - FinalRequestProcessor.processRequest()中 - case OpCode.getData:// 对于获取数据请求,若有watch会注册 - dataWatches.addWatch(path, watcher);// 若有watcher的话则注册watcher,此时watcher为NIOServerCnxn.将Watcher添加到WatchManager对象的watchTable和watch2Paths属性中
|
当server端收到了client端的setData()请求,由于引起了数据变更,会触发 getData()时注册watcher. 其实就是从上面 getData() 时存入的 watchTable 里查出来 watchers 并移除(只触发一次),调用 watcher.process()方法,也就是 NIOServerCnxn.process() 发送给 客户端 Watcher事件.
1 2 3 4 5 6 7 8 9
| - 3.Server触发Watcher - 处理client发来的zk.setData()请求 - FinalRequestProcessor.processRequest()中 - rc = zks.processTxn(hdr, txn);// 处理事务,应用到 dataTree上 - DataTree.setData()中,有watcher时会触发watcher - dataWatches.triggerWatch(path, EventType.NodeDataChanged); // setData()时会导致数据变更,若有watcher会触发watcher - watchers = watchTable.remove(path); // 获取该path对应的watchers,watchers只触发一次就移除 - w.process(e);// 进行watcher事件处理,传入WatchedEvent,此时的w是NioServerCnxn对象 - NIOServerCnxn.process()中,创建响应头,xid=-1代表watcher事件.将WatchedEvent对象转换为WatcherEvent,用于网络传输,响应客户端
|
client 回调 Watcher
client端 通过 SendThread.readResponse() 接收服务端响应.
根据 replyHdr.xid 判断是 Watcher事件.
将 来自服务端的响应,反序列成 WatcherEvent 对象,WatcherEvent 的信息里只包含 type,state,path 信息.
然后再将 WatcherEvent 对象 转成 WatchedEvent 对象.
从 ZKWatchManager.dataWatches 中 取出 path 对应的 watcher 并移除(只触发一次),连同 WatchedEvent 创建WatcherSetEventPair 对象.入 waitingEvents 队列,交给 eventThread 线程处理.
eventThread线程 从 waitingEvents 队列里取出对应的 watcher,调用 watcher.process() 处理.
1 2 3 4 5 6 7 8 9 10 11 12 13
| - 4.Client回调Watcher - SendThread.readResponse()接收服务端响应 - 根据replyHdr.getXid() == -1 判断是Watcher通知,反序列化出WatcherEvent对象 - eventThread.queueEvent(we);// 将Watcher事件 入事件队列,交给EventThread处理 - watcher.materialize()根据watcher的事件类型进行不同的处理 - 比如NodeDataChanged和NodeCreated事件 - 1.从 ZKWatchManager.dataWatches 中移除 clientPath 对应的 watchers(相当于只能触发一次),并将这些移除的 watchers 添加到 result 中返回 - 通过watcher和event创建WatcherSetEventPair - waitingEvents.add(pair);// 将WatcherSetEventPair对象入waitingEvents这个LinkedBlockingQueue中(保证watcher顺序) - EventThread.run()中 - 从waitingEvents中取出event,调processEvent(event) - processEvent(event)中 - event中包含实际的watcher对象,直接回调watcher.process()即可
|
相关类
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15
| Watcher WatchedEvent
ZooKeeper ZKWatchManager
ClientWatchManager
WatchRegistration ChildWatchRegistration DataWatchRegistration ExistsWatchRegistration
ClientCnxn Packet
|
Watcher接口
Watcher 接口中包含:
1.Event 接口,用于定义事件所代表的状态
Event 接口中包含 KeeperState(事件发生时zk的状态
) 和 EventType(事件类型)两个枚举类
2.process() 抽象方法
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49
| package org.apache.zookeeper; public interface Watcher {
@InterfaceAudience.Public public interface Event {
@InterfaceAudience.Public public enum KeeperState { @Deprecated Unknown(-1),
Disconnected(0),
@Deprecated NoSyncConnected(1),
SyncConnected(3),
AuthFailed(4),
ConnectedReadOnly(5),
SaslAuthenticated(6),
Expired(-112); }
public enum EventType { None(-1), NodeCreated(1), NodeDeleted(2), NodeDataChanged(3), NodeChildrenChanged(4); } }
abstract public void process(WatchedEvent event); }
|
回调方法 process()
zk server 向 client 发送一个 watcher 通知时,会回调对应的 process() 方法.
1
| abstract public void process(WatchedEvent event);
|
WatchedEvent
process()方法的参数 WatchedEvent,zk server会将watcher事件通过 WatchedEvent 对象 传递给 client,WatchedEvent 包含了事件的一些属性:
1 2 3 4
| WatchedEvent属性 -keeperState: KeeperState -eventType: EventType -path: String
|
WatcherEvent
WatcherEvent 用于网络传输.
zk server 在生成 WatchedEvent 事件后,通过 getwrapper() 将 WatchedEvent 包装成 可序列化的 WatcherEvent事件,用于网络传输.
zk client 接收到 server 的 WatcherEvent 对象后,会将 WatcherEvent 反序列化成 WatchedEvent 事件,传给 process() 方法.
ServerCnxn
ServerCnxn 实现了 Watcher 接口
WatchManager
WatchManager 是 ZooKeeper 服务端 Watcher 的管理者,包含 watchTable 和 watch2Paths.