leader选举相关线程
QuorumCnxManager : 管理者,包含发送 和 接收 队列.持有 listener.
QuorumCnxManager.Listener : 监听 并 建立 server 之间的连接.
FastLeaderElection.Messenger.SendWorker
消息生产者.循环 从发送队列 queueSendMap 中取消息(出队)并发送出去.并将该消息存入 lastMessageSent 中.
FastLeaderElection.Messenger.RecvWorker
消息消费者.循环 接收线程从底层 Socket 收到报文后放到 recvQueue 队列中,等待 Messenger 调用 pollRecvQueue 方法获取消息.
leader选举流程
启动时选举
选举算法的创建之前先创建 QuorumCnxManager,它通过 TCP 协议来进行 leader 选举.
每一对server之间都会保持一个TCP链接.
zookeeper服务之间都是配置myid大的作为客户端连接,myid小的作为服务器端
.
创建server端的发送线程和接收线程.
投票默认先投自己,投票信息包括(sid,zxid,echo),将投票信息入待发送队列,等待sendWorker线程将投票发送给所有其他server.
下面一直循环操作,直到选出Leader为止.
从接收队列中取出接收到的投票,校验投票信息中的sid和所投票的leader.
将接收到的投票 和 自己的投票 pk.
比较优先级 epoch > zxid > sid
.
判断投票消息里的 epoch 周期是不是比当前的大,如果大则消息中id对应的服务器就是leader.
若epoch相等则判断zxid,如果消息里的zxid大,则消息中id对应的服务器就是leader.
若前面两个都相等那就比较服务器id,若大,则其就是leader.
更新自己的投票,再向集群中所有机器发出去.
每次投票后,当前服务器都会统计本轮接收到的所有投票(recvset中投票一致数),若有过半机器接收到了相同的投票信息,则认为选出了leader.
变更状态,following 或 leading.
leader挂了选举
其余非Observer服务器都会将自己状态变更为 LOOKING,进入leader选举流程.
QuorumPeer 线程逻辑
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42
| - QuorumPeer线程run()逻辑 用来进行选举,以及选举后进入各角色,角色被打破重新再进行选举 - 无限循环判断状态 while (running) - 若是LOOKING状态 选举模式,启动FastLeaderElection. 进行选举,并设置投票. 选举策略,默认为FastLeaderElection. setCurrentVote(makeLEStrategy().lookForLeader()); 结束选举后,确定各个server的状态,下一轮循环时,则会走下面的各个角色的启动逻辑 - FastLeaderElection.lookForLeader()中 开始新一轮leader选举。每当我们的QuorumPeer将其状态更改为LOOKING时, 都会调用此方法,并向所有其他server发送通知。 - HashMap<Long, Vote> recvset recvset用于记录当前服务器在本轮次的Leader选举中收到的所有外部投票 HashMap<Long, Vote> outofelection 更新逻辑时钟.每进行一轮选举,都需要更新逻辑时钟 logicalclock.incrementAndGet(); 更新选票(选自己,sid, zxid, epoch) updateProposal(getInitId(), getInitLastLoggedZxid(), getPeerEpoch()); 向其他服务器发送自己的选票 sendNotifications(); - loop:如果是竞选状态则一直循环(本服务器状态为LOOKING并且还未选出leader), 直到选举出结果 - 若未接收到其他服务器发送的选票 if(n == null) {} - if(manager.haveDelivered()){// manager已经发送了所有选票消息(发送队列已经为空) sendNotifications();// 向所有其他服务器发送消息 } - FastLeaderElection.sendNotifications()中 遍历所有sid(投票参与者集合),分别创建 投票信息. 将创建的所有投票信息存入投票发送队列,用于保存待发送的选票. (一个投票信息 会创建sid个消息,用于发送给所有的server) sendqueue.offer(notmsg); - else {// 还未发送完所有投票消息 manager.connectAll();// 连接其他每个服务器 } - QuorumCnxManager.connectAll()中 final ConcurrentHashMap<Long, ArrayBlockingQueue<ByteBuffer>> queueSendMap; // sid -> buffer queue,消息发送队列,用于保存那些待发送的消息,按照SID进行分组 遍历queueSendMap,分别connectOne(sid); - QuorumCnxManager.connectOne(sid) 尝试使用其electionAddr与id sid建立与服务器的连接。 本server作为客户端向sid的server发起链接请求. 若两个server间未建立连接(即ConcurrentHashMap<Long, SendWorker> senderWorkerMap查不到), 则建立socket连接(默认同步) - 校验投票信息中的sid和所投票的leader. 若投票者servers集合中 包含 接收到消息中的服务器id 和 n.leader(投票消息中的leader) else if (validVoter(n.sid) && validVoter(n.leader)) {} - 判断给当前server发投票信息的sender的状态 若是FOLLOWING状态 - 1.若选票中的推选者(sender)的选举周期 大于 当前的 逻辑时钟(说明是新一轮选举中的选票),则替换选票并发送消息 - 更新(重新设置)自身的逻辑时钟,清空投票信息. - 若能选出较优的leader [1],更新选票结果.否则使用自己的投票信息 - [1].FastLeaderElection.totalOrderPredicate()中 比较优先级 epoch > zxid > sid 判断消息里的epoch周期是不是比当前的大,如果大则消息中id对应的服务器就是leader. 若epoch相等则判断zxid,如果消息里的zxid大,则消息中id对应的服务器就是leader. 若前面两个都相等那就比较服务器id,若大,则其就是leader. - 2.如果选举周期小于自身的逻辑时钟,说明对方在一个比较早的选举周期中,不作处理 else if (n.electionEpoch < logicalclock.get()) - 3.两者时钟相等,调用totalOrderPredicate()判断是否需要更新当前服务器的选票,若更新了选票就广播给其他服务器 else if (totalOrderPredicate(n.leader, n.zxid, n.peerEpoch,proposedLeader, proposedZxid, proposedEpoch)) { updateProposal(n.leader, n.zxid, n.peerEpoch);// 更新选票 sendNotifications();// 发送消息 } - recvset用于记录 当前服务器 在 本轮次 的 Leader选举中收到的 所有 外部投票 recvset.put(n.sid, new Vote(n.leader, n.zxid, n.electionEpoch, n.peerEpoch)); - 若可以结束选举(判断投票一致数 是否足以结束选举,如大于1/2) - 验证提议的leader是否有任何变化,默认等待20毫秒.若无变化则选举结束. (看看接收队列里还有没有新的选票).要是有新的 较优 的选票,则不能结束选举. - 一旦我们没有从接收队列中读取任何新的相关消息,即可结束选举 - 设置自身状态 leader or follower or observer - 清空接收队列 recvqueue.clear(),返回投票 - 若是OBSERVING状态 - 不参与选举,do nothing - 若是FOLLOWING状态 - 和下面的LEADING状态逻辑相同 - 若是LEADING状态 - 若推选者的逻辑时钟和当前server相等 if(n.electionEpoch == logicalclock.get()) - 将该服务器和选票信息放入recvset中 - 判断是否完成了leader选举(投票数超过1/2) if(ooePredicate(recvset, outofelection, n)) {} - 设置本server的状态(Leader或follower或observer) - 创建并返回最终投票信息 - 清空recvqueue队列的选票 - 若是OBSERVING状态 启动Observer observer.observeLeader(); - 若是FOLLOWING状态 启动Follower. follower.followLeader(); - // 参见follower启动逻辑 - 若是LEADING状态 启动Leader. Leader选举完成之后,Peer确认了自己是Leader的身份,在 QuromPeer的主线程中执行Leader的逻辑 setLeader(makeLeader(logFactory)); // 创建Leader对象,并创建Server绑定在QuorumAddress上,用于和其他Follower之间相互通信 leader.lead();// Leader的真正的逻辑 - // 参见leader启动逻辑
|
QuorumCnxManager 线程逻辑
1 2 3 4 5 6 7 8
| - QuorumCnxManager - 作用 - 选举相关连接管理器,包含发送 和 接收 队列.持有 listener. 使用TCP通信,负责各台服务器之间的底层Leader选举过程中的网络通信. QuorumCnxManager可以保证每对peer之间只有一个连接, 如果有server发起新的连接,则比较sid,sid大的保留连接,小的放弃连接(只能大的连接小的). - 成员变量 - sid -> SendWorker.每个SenderWorker消息发送器,都对应一台远程Zookeeper服务器,负责消息的发送,按照SID进行分组. ConcurrentHashMap<Long, SendWorker> senderWorkerMap; - sid -> buffer queue,消息发送队列,用于保存那些待发送的消息.按照SID进行分组 ConcurrentHashMap<Long, ArrayBlockingQueue<ByteBuffer>> queueSendMap; - 最近发送过的消息,为每个SID保留最近发送过的一个消息 ConcurrentHashMap<Long, ByteBuffer> lastMessageSent; - 所有收到的消息都放到recvQueue中 public final ArrayBlockingQueue<Message> recvQueue;
|
QuorumCnxManager.Listener 线程逻辑
1 2 3 4 5 6 7
| - QuorumCnxManager.Listener 监听 并 建立server之间的连接. 该线程监听端口.绑定地址开启ServerSocket服务, 用来侦听其他server连接,进行集群间选举,投票,数据同步 - run()逻辑 - 开启选举端口.循环 等待接收socket连接请求,处理接收连接. - receiveConnection(client);// 处理接收连接 - QuorumCnxManager.receiveConnection()中 - QuorumCnxManager.handleConnection(sock, din)中 从socket中读取到sid信息, 1.若对方sid小于自己sid,立马关闭连接.自己做为client向对方请求建立连接. **注意zk服务之间都是配置myid大的作为客户端连接,myid小的作为服务器端* closeSocket(sock); connectOne(sid);// 创建新连接 - 2.若对方sid大于自己sid,则启动工作线程以接收数据, 创建server端的发送线程任务和接收线程任务, 并且启动任务准备发送和接收client端数据. SendWorker sw = new SendWorker(sock, sid);// 发送线程 RecvWorker rw = new RecvWorker(sock, din, sid, sw);// 接收线程 // 启动 发送线程 和 接收线程 sw.start(); rw.start(); // 参见sendWorker和recvWorker逻辑
|
FastLeaderElection.Messenger.SendWorker 线程逻辑
1 2 3
| - FastLeaderElection.Messenger.SendWorker 投票信息发送线程 - run()逻辑 - 循环 从发送队列queueSendMap中取消息(出队)并发送出去.并将该消息存入lastMessageSent中.
|
FastLeaderElection.Messenger.RecvWorker 线程逻辑
1 2 3
| - FastLeaderElection.Messenger.RecvWorker 投票信息接收线程 - run()逻辑 - 循环 接收线程从底层Socket收到报文后放到recvQueue队列中,等待Messenger调用pollRecvQueue方法获取消息
|