leader选举相关线程

QuorumCnxManager : 管理者,包含发送 和 接收 队列.持有 listener.

QuorumCnxManager.Listener : 监听 并 建立 server 之间的连接.

FastLeaderElection.Messenger.SendWorker
消息生产者.循环 从发送队列 queueSendMap 中取消息(出队)并发送出去.并将该消息存入 lastMessageSent 中.

FastLeaderElection.Messenger.RecvWorker
消息消费者.循环 接收线程从底层 Socket 收到报文后放到 recvQueue 队列中,等待 Messenger 调用 pollRecvQueue 方法获取消息.

leader选举流程

启动时选举
选举算法的创建之前先创建 QuorumCnxManager,它通过 TCP 协议来进行 leader 选举.
每一对server之间都会保持一个TCP链接.
zookeeper服务之间都是配置myid大的作为客户端连接,myid小的作为服务器端.
创建server端的发送线程和接收线程.

投票默认先投自己,投票信息包括(sid,zxid,echo),将投票信息入待发送队列,等待sendWorker线程将投票发送给所有其他server.

下面一直循环操作,直到选出Leader为止.
从接收队列中取出接收到的投票,校验投票信息中的sid和所投票的leader.

将接收到的投票 和 自己的投票 pk.
比较优先级 epoch > zxid > sid.
判断投票消息里的 epoch 周期是不是比当前的大,如果大则消息中id对应的服务器就是leader.
若epoch相等则判断zxid,如果消息里的zxid大,则消息中id对应的服务器就是leader.
若前面两个都相等那就比较服务器id,若大,则其就是leader.

更新自己的投票,再向集群中所有机器发出去.

每次投票后,当前服务器都会统计本轮接收到的所有投票(recvset中投票一致数),若有过半机器接收到了相同的投票信息,则认为选出了leader.

变更状态,following 或 leading.

leader挂了选举
其余非Observer服务器都会将自己状态变更为 LOOKING,进入leader选举流程.

QuorumPeer 线程逻辑

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
- QuorumPeer线程run()逻辑 用来进行选举,以及选举后进入各角色,角色被打破重新再进行选举
- 无限循环判断状态 while (running)
- 若是LOOKING状态 选举模式,启动FastLeaderElection. 进行选举,并设置投票. 选举策略,默认为FastLeaderElection. setCurrentVote(makeLEStrategy().lookForLeader()); 结束选举后,确定各个server的状态,下一轮循环时,则会走下面的各个角色的启动逻辑
- FastLeaderElection.lookForLeader()中 开始新一轮leader选举。每当我们的QuorumPeer将其状态更改为LOOKING时, 都会调用此方法,并向所有其他server发送通知。
- HashMap<Long, Vote> recvset recvset用于记录当前服务器在本轮次的Leader选举中收到的所有外部投票 HashMap<Long, Vote> outofelection 更新逻辑时钟.每进行一轮选举,都需要更新逻辑时钟 logicalclock.incrementAndGet(); 更新选票(选自己,sid, zxid, epoch) updateProposal(getInitId(), getInitLastLoggedZxid(), getPeerEpoch()); 向其他服务器发送自己的选票 sendNotifications();
- loop:如果是竞选状态则一直循环(本服务器状态为LOOKING并且还未选出leader), 直到选举出结果
- 若未接收到其他服务器发送的选票 if(n == null) {}
- if(manager.haveDelivered()){// manager已经发送了所有选票消息(发送队列已经为空) sendNotifications();// 向所有其他服务器发送消息 }
- FastLeaderElection.sendNotifications()中 遍历所有sid(投票参与者集合),分别创建 投票信息. 将创建的所有投票信息存入投票发送队列,用于保存待发送的选票. (一个投票信息 会创建sid个消息,用于发送给所有的server) sendqueue.offer(notmsg);
- else {// 还未发送完所有投票消息 manager.connectAll();// 连接其他每个服务器 }
- QuorumCnxManager.connectAll()中 final ConcurrentHashMap<Long, ArrayBlockingQueue<ByteBuffer>> queueSendMap; // sid -> buffer queue,消息发送队列,用于保存那些待发送的消息,按照SID进行分组 遍历queueSendMap,分别connectOne(sid);
- QuorumCnxManager.connectOne(sid) 尝试使用其electionAddr与id sid建立与服务器的连接。 本server作为客户端向sid的server发起链接请求. 若两个server间未建立连接(即ConcurrentHashMap<Long, SendWorker> senderWorkerMap查不到), 则建立socket连接(默认同步)
- 校验投票信息中的sid和所投票的leader. 若投票者servers集合中 包含 接收到消息中的服务器id 和 n.leader(投票消息中的leader) else if (validVoter(n.sid) && validVoter(n.leader)) {}
- 判断给当前server发投票信息的sender的状态 若是FOLLOWING状态
- 1.若选票中的推选者(sender)的选举周期 大于 当前的 逻辑时钟(说明是新一轮选举中的选票),则替换选票并发送消息
- 更新(重新设置)自身的逻辑时钟,清空投票信息.
- 若能选出较优的leader [1],更新选票结果.否则使用自己的投票信息
- [1].FastLeaderElection.totalOrderPredicate()中 比较优先级 epoch > zxid > sid 判断消息里的epoch周期是不是比当前的大,如果大则消息中id对应的服务器就是leader. 若epoch相等则判断zxid,如果消息里的zxid大,则消息中id对应的服务器就是leader. 若前面两个都相等那就比较服务器id,若大,则其就是leader.
- 2.如果选举周期小于自身的逻辑时钟,说明对方在一个比较早的选举周期中,不作处理 else if (n.electionEpoch < logicalclock.get())
- 3.两者时钟相等,调用totalOrderPredicate()判断是否需要更新当前服务器的选票,若更新了选票就广播给其他服务器 else if (totalOrderPredicate(n.leader, n.zxid, n.peerEpoch,proposedLeader, proposedZxid, proposedEpoch)) { updateProposal(n.leader, n.zxid, n.peerEpoch);// 更新选票 sendNotifications();// 发送消息 }
- recvset用于记录 当前服务器 在 本轮次 的 Leader选举中收到的 所有 外部投票 recvset.put(n.sid, new Vote(n.leader, n.zxid, n.electionEpoch, n.peerEpoch));
- 若可以结束选举(判断投票一致数 是否足以结束选举,如大于1/2)
- 验证提议的leader是否有任何变化,默认等待20毫秒.若无变化则选举结束. (看看接收队列里还有没有新的选票).要是有新的 较优 的选票,则不能结束选举.
- 一旦我们没有从接收队列中读取任何新的相关消息,即可结束选举
- 设置自身状态 leader or follower or observer
- 清空接收队列 recvqueue.clear(),返回投票
- 若是OBSERVING状态
- 不参与选举,do nothing
- 若是FOLLOWING状态
- 和下面的LEADING状态逻辑相同
- 若是LEADING状态
- 若推选者的逻辑时钟和当前server相等 if(n.electionEpoch == logicalclock.get())
- 将该服务器和选票信息放入recvset中
- 判断是否完成了leader选举(投票数超过1/2) if(ooePredicate(recvset, outofelection, n)) {}
- 设置本server的状态(Leader或follower或observer)
- 创建并返回最终投票信息
- 清空recvqueue队列的选票
- 若是OBSERVING状态 启动Observer observer.observeLeader();
- 若是FOLLOWING状态 启动Follower. follower.followLeader();
- // 参见follower启动逻辑
- 若是LEADING状态 启动Leader. Leader选举完成之后,Peer确认了自己是Leader的身份,在 QuromPeer的主线程中执行Leader的逻辑 setLeader(makeLeader(logFactory)); // 创建Leader对象,并创建Server绑定在QuorumAddress上,用于和其他Follower之间相互通信 leader.lead();// Leader的真正的逻辑
- // 参见leader启动逻辑

QuorumCnxManager 线程逻辑

1
2
3
4
5
6
7
8
- QuorumCnxManager
- 作用
- 选举相关连接管理器,包含发送 和 接收 队列.持有 listener. 使用TCP通信,负责各台服务器之间的底层Leader选举过程中的网络通信. QuorumCnxManager可以保证每对peer之间只有一个连接, 如果有server发起新的连接,则比较sid,sid大的保留连接,小的放弃连接(只能大的连接小的).
- 成员变量
- sid -> SendWorker.每个SenderWorker消息发送器,都对应一台远程Zookeeper服务器,负责消息的发送,按照SID进行分组. ConcurrentHashMap<Long, SendWorker> senderWorkerMap;
- sid -> buffer queue,消息发送队列,用于保存那些待发送的消息.按照SID进行分组 ConcurrentHashMap<Long, ArrayBlockingQueue<ByteBuffer>> queueSendMap;
- 最近发送过的消息,为每个SID保留最近发送过的一个消息 ConcurrentHashMap<Long, ByteBuffer> lastMessageSent;
- 所有收到的消息都放到recvQueue中 public final ArrayBlockingQueue<Message> recvQueue;

QuorumCnxManager.Listener 线程逻辑

1
2
3
4
5
6
7
- QuorumCnxManager.Listener 监听 并 建立server之间的连接. 该线程监听端口.绑定地址开启ServerSocket服务, 用来侦听其他server连接,进行集群间选举,投票,数据同步
- run()逻辑
- 开启选举端口.循环 等待接收socket连接请求,处理接收连接.
- receiveConnection(client);// 处理接收连接
- QuorumCnxManager.receiveConnection()中
- QuorumCnxManager.handleConnection(sock, din)中 从socket中读取到sid信息, 1.若对方sid小于自己sid,立马关闭连接.自己做为client向对方请求建立连接. **注意zk服务之间都是配置myid大的作为客户端连接,myid小的作为服务器端* closeSocket(sock); connectOne(sid);// 创建新连接
- 2.若对方sid大于自己sid,则启动工作线程以接收数据, 创建server端的发送线程任务和接收线程任务, 并且启动任务准备发送和接收client端数据. SendWorker sw = new SendWorker(sock, sid);// 发送线程 RecvWorker rw = new RecvWorker(sock, din, sid, sw);// 接收线程 // 启动 发送线程 和 接收线程 sw.start(); rw.start(); // 参见sendWorker和recvWorker逻辑

FastLeaderElection.Messenger.SendWorker 线程逻辑

1
2
3
- FastLeaderElection.Messenger.SendWorker 投票信息发送线程
- run()逻辑
- 循环 从发送队列queueSendMap中取消息(出队)并发送出去.并将该消息存入lastMessageSent中.

FastLeaderElection.Messenger.RecvWorker 线程逻辑

1
2
3
- FastLeaderElection.Messenger.RecvWorker 投票信息接收线程
- run()逻辑
- 循环 接收线程从底层Socket收到报文后放到recvQueue队列中,等待Messenger调用pollRecvQueue方法获取消息