processors

  • PrepRequestProcessor
    通常是请求处理链的第一个处理器.
    识别客户端请求是否是事务请求.对于事务请求,做预处理,诸如 创建请求事务头,事务体,会话检查,ACL检查 和 版本检查 等.

  • ProposalRequestProcessor
    leader 服务器的事务投票处理器,也是 leader 服务器事务处理流程的发起者.
    对于 非事务请求,它会直接将请求流转到 CommitProcessor 处理器,不再做其他处理.
    对于 事务请求,除了将请求交给CommitProcessor处理器外,还会根据请求类型创建对应的 Proposal 提议,并 发给所有 Follower 来发起一次集群内的事务投票.它还会将事务请求交给 SyncRequestProcessor 进行事务日志的记录.

  • SyncRequestProcessor
    事务日志记录处理器,将事务请求记录到事务日志文件中,同时还会触发zk进行数据快照.
    发送Sync请求的处理器.将请求记录到磁盘.它批量处理有效执行io的请求.在将日志同步到磁盘之前,请求不会传递到下一个 Processor.
    维护了一个处理请求的队列,其用于存放请求;
    维护了一个处理快照的线程,用于处理快照;
    维护了一个等待被刷新到磁盘的请求队列.
    将事务性请求刷新到磁盘,并且对请求进行快照处理.

  • AckRequestProcessor
    leader 独有的处理器,它负责在 SyncRequestProcessor 处理器完成事务日志记录后,向 Proposal 的投票收集器发送 ACK 反馈,以通知投票收集器当前服务器已经完成了对该 Proposal 的事务日志记录.
    将前一阶段的请求作为 ACK 转发给 Leader.

  • CommitProcessor
    事务提交处理器.
    对于非事务请求,该处理器直接将其交给下一个处理器进行处理.
    对于事务请求,它会等待集群内 针对 Proposal 的投票,直到该 Proposal 可被提交.

  • ToBeAppliedRequestProcessor
    维护 toBeApplied 队列,专门存储那些已经被 CommitProcessor 处理过的可被提交的 Proposal.
    它将这些请求逐个交付给 FinalRequestProcessor 处理器进行处理,等待 FinalRequestProcessor 处理器处理完后,再将其从toBeApplied 队列中移除.
    下个处理器必须是 FinalRequestProcessor 并且 FinalRequestProcessor 必须同步处理请求.

  • FinalRequestProcessor
    通常是请求处理链的最后一个处理器.
    创建客户端请求的响应;针对事务请求,它还会负责将事务应用到内存数据库中.

  • FollowerRequestProcessor
    它是 follower 的第一个请求处理器.用于识别当前请求是否是事务请求.
    若是事务请求,转发给 leader 服务器.leader 在收到这个事务请求后,就会将其提交到请求处理器链,按照正常事务请求进行处理.

  • ObserverRequestProcessor
    同 FollowerRequestProcessor 一样,将事务请求转发给 Leader.

  • SendAckRequestProcessor
    follower 独有,发送 ACK 请求的处理器.
    在 follower 完成事务日志记录后,会向 leader 服务器发送 ACK 消息表面自身完成了事务日志的记录工作.
    它和 leader 的 AckRequestProcessor 的区别:
    AckRequestProcessor 处理器和 leader 服务器在同一个服务器上,它的 ACK 反馈仅仅是一个本地操作.
    SendAckRequestProcessor 处理器在 follower 服务器上,需要通过 ACK 消息向 leader 服务器进行反馈.

processor链对事务请求的处理

PrepRequestProcessor

1
2
3
4
5
6
7
8
9
10
11
12
13
- PrepRequestProcessor 
- 作用
- 通常是请求处理链的第一个处理器。
- 识别客户端请求是否是事务请求. 对于事务请求,做预处理,诸如 创建请求事务头,事务体,会话检查,ACL检查 和 版本检查 等.
- run()逻辑
- pRequest(request);中 判断请求类型. 对于事务请求,不同类型请求对应不同操作. 比如create请求
- CreateRequest createRequest = new CreateRequest(); pRequest2Txn(request.type, zks.getNextZxid(), request, createRequest, true);
- pRequest2Txn()中 new TxnHeader() 创建请求事务头,后续的processors都是基于该请求头来识别当前请求是否是事务请求
- zks.sessionTracker.checkSession(request.sessionId, request.getOwner());// session检查
- 请求数据反序列化到CreateRequest对象中
- 添加到zks.outstandingChanges中 // TODO finalRequestProcessor中会用到 addChangeRecord(parentRecord); addChangeRecord(new ChangeRecord(request.hdr.getZxid(), path, s, 0, listACL));
- nextProcessor.processRequest(request); 交由下个processor处理
- ProposalRequestProcessor.processRequest() // 参见ProposalRequestProcessor.processRequest()逻辑

ProposalRequestProcessor

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
- ProposalRequestProcessor
- 作用
- leader服务器的`事务投票处理器`,也是leader服务器事务处理流程的发起者.
- 对于 非事务请求,它会直接将请求流转到CommitProcessor处理器,不再做其他处理.
- 对于 事务请求,除了将请求交给CommitProcessor处理器外,还会根据请求类型创建对应的Proposal提议,并发给所有Follower来发起一次集群内的事务投票.它还会将事务请求交给SyncRequestProcessor进行事务日志的记录.

- ProposalRequestProcessor.processRequest()中
- nextProcessor.processRequest(request); 1.将请求交给下个处理器进行处理,下个处理器是commitProcessor,走COMMIT流程
- commitProcessor.processRequest()中 queuedRequests.add(request); 将请求 添加到 请求队列
- leader的commitProcessor处理
- 若是事务请求
- zks.getLeader().propose(request); 2.若是事务请求,发proposal包,发起PROPOSAL流程
- leader.propose()中 outstandingProposals.put(lastProposed, p); 将事务请求放入outstandingProposals,它用于保存 zxid 和 proposal的映射
- 发送Leader.PROPOSAL包 sendPacket(pp);
- leader.sendPacket()中 遍历所有LearnerHandler,依次调用,将proposal包存入待发送队列 f.queuePacket(qp);
- learnerHandler.queuePacket()中 queuedPackets.add(p) 将包存入队列
- learnerHandler线程
- learnerHandler.run()里 512行 新启一个线程 发送queuedPackets队列中的packets
- sendPackets();// 发送queuedPackets队列中的packets if (p.getType() == Leader.PROPOSAL) { syncLimitCheck.updateProposal(p.getZxid(), System.nanoTime()); }
- oa.writeRecord(p, "packet"); 发送packet
- Follower.processPacket()接收该Proposal包
- Follower.processPacket()中
- 反序列化请求数据
- fzk.logRequest(hdr, txn);
- FollowerZookeeperServer.logRequest()中 pendingTxns.add(request);请求入pendingTxns队列 // TODO 这个入队后面干嘛了
- syncProcessor.processRequest(request);
- SyncRequestProcessor.processRequest()中 queuedRequests.add(request); 请求入队列
- follower将poposal中的请求持久化 // 后续见SyncRequestProcessor.run()逻辑

syncProcessor

1
2
3
- syncProcessor.processRequest(request); 3.若是事务请求,交给 syncProcessor处理器记录事务日志,走SYNC流程
- 这里是leader服务器进行事务持久化操作, follower服务器会通过上面2中的proposal流程最终触发follower自己的事务持久化操作 即事务持久化在所有follower和leader上都会进行
- 事务持久化完成后会发ack给leader

SyncRequestProcessor

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
- SyncRequestProcessor
- 作用
- 事务日志记录处理器, 将事务请求记录到事务日志文件中,同时还会触发zk进行数据快照.
- 发送Sync请求的处理器。将请求记录到磁盘。它批量处理有效执行io的请求。在将日志同步到磁盘之前,请求不会传递到下一个RequestProcessor.
- 成员变量
- 维护了一个处理请求的队列queuedRequests,其用于存放请求
- 维护了一个处理快照的线程 snapInProcess,用于处理快照
- 维护了一个等待被刷新到磁盘的请求队列 toFlush

- SyncRequestProcessor.processRequest()逻辑
- queuedRequests.add(request); 事务请求入队列 private final LinkedBlockingQueue<Request> queuedRequests = new LinkedBlockingQueue<Request>();// 请求队列

- SyncRequestProcessor.run()逻辑
- 初始化事务日志记录计数logCount,用于判断记录 n次事务日志后 是否需要进行 snapshot
- 初始化设置滚动日志的随机数randRoll,该随机数用于防止所有servers在同一时刻进行snapshot操作
- while循环
- si = queuedRequests.poll(); 从队列中取出请求.
- 若请求队列中的请求取完了,刷盘
- flush(toFlush);
- SyncRequestProcessor.flush()中 zks.getZKDatabase().commit();刷盘
- 刷盘结束后发ack,循环取出所有toFlush的nextProcessor. 并调用nextProcessor.processRequest()方法
- 1.leader调AckRequestProcessor,省去通信了 AckRequestProcessor.processRequest()中 leader.processAck(self.getId(), request.zxid, null);// ack给leader
- // 参见AckRequestProcessor.processRequest()逻辑
- 2.follower调SendAckRequestProcessor
- // 参见SendAckRequestProcessor逻辑
- 若取出来的请求不为空
- (zks.getZKDatabase().append(si)) 将请求添加至事务日志文件
- ZKDatabase.append()中 return this.snapLog.append(si);
- FileTxnSnapLog.append()中 return txnLog.append(si.hdr, si.txn);
- FileTxnLog.append()中 logStream.flush(); // 刷新到磁盘 返回true
- 如果事务日志记录次数达到了snapCount / 2 + randRoll次,新起线程进行snapshot操作,并重置logCount计数
- toFlush.add(si); 将请求添加至 toFlush队列
- toFlush队列大小 大于1000,直接刷新到磁盘

AckRequestProcessor

1
2
3
4
5
6
- AckRequestProcessor
- 作用
- leader独有的处理器,它负责在SyncRequestProcessor处理器完成事务日志记录后,向Proposal的投票收集器发送ACK反馈,以通知投票收集器当前服务器已经完成了对该Proposal的事务日志记录.

- AckRequestProcessor.processRequest()逻辑
- 1.leader调AckRequestProcessor,省去通信了 AckRequestProcessor.processRequest()中 leader.processAck(self.getId(), request.zxid, null);// ack给leader

SendAckRequestProcessor

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
- SendAckRequestProcessor
- 作用
- follower使用,用于follower将proposal中的request持久化后发ack给leader

- SendAckRequestProcessor.processRequest()逻辑
- SendAckRequestProcessor.processRequest()中 创建Leader.ACK包并发送 learner.writePacket(qp, false);
- LearnerHandler线程
- LearnerHandler.run()中 当Follower收到PROPOSAL并写到磁盘后会发送ACK给Leader case Leader.ACK: leader.processAck(this.sid, qp.getZxid(), sock.getLocalSocketAddress());
- leader.processAck()中 处理ack包
- 1.根据zxid取出proposal Proposal p = outstandingProposals.get(zxid);
- p.ackSet.add(sid); 2.保存follower发来的ack的sid到该proposal对象中
- 判断self.getQuorumVerifier().containsQuorum(p.ackSet) 3.判断当接收到的对于 proposal包的ack是否超过一半
- QuorumMaj.containsQuorum()中 验证所给集合大小是否超过半数 return (set.size() > half);
- 4.若此处leader接收到的对于该proposal包的ack数超过了一半
- outstandingProposals.remove(zxid); 1.移除proposal
- toBeApplied.add(p); 2.将proposal添加到toBeApplied队列
- 这个toBeApplied队列中的元素会在Leader.ToBeAppliedRequestProcessor.processRequest()中取出来 // 参见ToBeAppliedRequestProcessor.processRequest()逻辑
- commit(zxid); 3.创建Leader.COMMIT包并发给所有follower
- leader.commit()中 遍历所有LearnerHandler,将所有Leader.COMMIT包加入queuedPackets队列. 用于给followers发.
- ...
- learnerHandler线程里,取出COMMIT包发给follower learnerHandler.sendPackets()里 会取queuedPackets队列中的所有packets并发送
- ...
- follower.processPacket()中,接收到COMMIT包后并处理.COMMIT包里只包含zxid信息,因为其他信息已经在之前的proposal包里包含了. 若是COMMIT包,调fzk.commit(qp.getZxid()); case Leader.COMMIT: fzk.commit(qp.getZxid()); break;
- FollowerZookeeperServer.commit(long zxid)中 取出COMMIT包交由commitProcessor处理
- long firstElementZxid = pendingTxns.element().zxid; 1.从待处理的事务队列pendingTxns中取出来第一个元素,判断它和commit包中的zxid是否一致
- 2.从队列中移除后,交由follower的commitProcessor进行处理 Request request = pendingTxns.remove(); commitProcessor.commit(request);
- // 参见commitProcessor.commit(request);逻辑
- inform(p); 4.创建Leader.INFORM包,发送给所有observers
- 由于observer服务器未参加之前的提议投票, 因此observer服务器尚未保存任何关于该提议的消息, 所有在广播commit消息时,需要区别对待, leader会向其发送INFORM消息,该消息体重包含了当前提议的内容.
- zk.commitProcessor.commit(p.request);// commit
- 调用leader的commitProcessor.commit()方法

CommitProcessor

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
- CommitProcessor
- 作用
- 事务提交处理器.
- 对于非事务请求,该处理器直接将其交给下一个处理器进行处理.
- 对于事务请求,它会等待集群内 针对Proposal的投票 直到该Proposal可被提交.
- 利用它,每个服务器都能很好的控制对事务请求的顺序处理.
- 其实就是对于事务请求,commitProcessor会保证它这个事务请求必须经过proposal包和commit包之后,才会将它放入toProcess队列交给下个processor进行处理
- 变量
- queuedRequests
- queuedRequests 中的请求是在proposalRequestProcessor.processRequest()中加入进来的. 这些请求当时还没有进行proposal和commit包的处理
- committedRequests
- committedRequests中的请求时在执行完proposal和commit包后加入进来的
- toProcess
- queuedRequests队列中 所有非事务请求添加到 toProcess队列,用于交给下个processor处理
- nextPending
- queuedRequests队列中 下一个等待处理的事务请求

- commitProcessor.processRequest()逻辑
- 请求入queuedRequests队列. leader 和 所有 follower 在 ProposalRequestProcessor 中 都会调用该方法. queuedRequests 中的请求是在 proposalRequestProcessor.processRequest()中加入进来的.这些请求当时还没有进行 proposal 和 commit包的处理 queuedRequests.add(request);// 将请求 添加到 请求队列 notifyAll();

- commitProcessor.commit(request)逻辑
- 当leader完成了proposal过半,给followers发commit包之后,自己也会走commit逻辑. follower收到commit包也会走自己的commit逻辑.
- 完成proposal和commit包的请求入committedRequests队列 committedRequests.add(request); notifyAll();

- commitProcessor.run()逻辑
- 1.依次调用toProcess队列中每个request的nextProcessor,并清空队列
- // 参见Leader.ToBeAppliedRequestProcessor.processRequest() nextProcessor.processRequest(toProcess.get(i));
- 2.queuedRequests队列已经取完了,或nextPending已经设置值了 (其实就是在queuedRequest队列中已经找到了一个需要处理的事务请求)
- 此时若committedRequests为空(即还没收到要commit的请求), 线程wait(),跳出此次循环
- 2.2 若有commit请求
- 取出commit请求 Request r = committedRequests.remove(); 并和nextPending比较,二者应该一致. 赋值事务头,加入toProcess队列 toProcess.add(nextPending);
- 3.只要nextPending有值,就不走4的逻辑
- 4.遍历queuedRequests队列,从里面取出第一个事务请求给nextPending赋值

ToBeAppliedRequestProcessor

1
2
3
4
5
6
7
8
9
10
11
12
13
- ToBeAppliedRequestProcessor
- 作用
- 维护toBeApplied队列,专门存储那些已经被CommitProcessor处理过的可被提交的Proposal
- 它将这些请求逐个交付给FinalRequestProcessor处理器进行处理,等待FinalRequestProcessor处理器处理完后,再将其从toBeApplied队列中移除.
- 下个处理器必须是FinalRequestProcessor并且FinalRequestProcessor必须同步处理请求。
- 变量
- toBeApplied队列
- 元素放入:leader接收到的对于该proposal包的ack数超过了一半时便会放入
- 元素取出:ToBeAppliedRequestProcessor.processRequest()取出交给finalRequestProcessor进行处理

- Leader.ToBeAppliedRequestProcessor.processRequest()
- 取出请求,调next,并移除 next.processRequest(request); toBeApplied.remove();
- // 参见FinalRequestProcessor.processRequest()逻辑

FinalRequestProcessor

1
2
3
4
5
6
7
8
9
10
11
12
13
- FinalRequestProcessor
- 作用
- 通常是请求处理链的最后一个处理器。
- 创建客户端请求的响应;针对事务请求,它还会负责将事务应用到内存数据库中.

- FinalRequestProcessor.processRequest()
- 1.遍历zks.outstandingChanges(提交的事务请求),zxid比request.zxid小的请求都给干掉.
- 2.若是事务请求应用到内存数据库 if (request.hdr != null) rc = zks.processTxn(hdr, txn);
- rc = getZKDatabase().processTxn(hdr, txn);
- 事务写到内存 return dataTree.processTxn(hdr, txn);
- zks.getZKDatabase().addCommittedProposal(request); 3.事务请求保存到 committedLog
- 4.根据请求code,创建response响应客户端 rsp = new CreateResponse(rc.path); cnxn.sendResponse(hdr, rsp, "response");
- // 事务请求结束

FollowerRequestProcessor

1
2
3
- FollowerRequestProcessor
- 它是follower的第一个请求处理器.用于识别当前请求是否是事务请求.
- 若是事务请求,转发给leader服务器.leader在收到这个事务请求后,就会将其提交到请求处理器链,按照正常事务请求进行处理.