1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40
| - QuorumPeerMain.main() - 1.创建QuorumPeerMain对象 - 2.初始化 运行 main.initializeAndRun(args) - 0.创建QuorumPeerConfig对象 - 1.通过QuorumPeerConfig.parse(),解析配置文件 - 读取zoo.cfg配置到Properties中 - parseProperties()解析Properties配置到QuorumPeerConfig对象中 - 2.启动定时清理任务,DatadirCleanupManager.start()清除过期的txnLog和snapshot文件 - DatadirCleanupManager.start() - 创建PurgeTask定时清理任务. // 参见PurgeTask线程逻辑 - 3.选择以集群或单机模式 来启动zk. runFromConfig(config) - 1.创建server连接工厂 ServerCnxnFactory.createFactory() - 2.根据QuorumPeerConfig配置,构建QuorumPeer对象,用于管理选举 - 3.启动quorumPeer线程. quorumPeer.start() // 参见quorumPeer线程逻辑 - quorumPeer.start(): - 1.启动时先从磁盘->内存数据库中恢复数据 loadDataBase() - quorumPeer.loadDataBase(): - 启动时先从磁盘->内存数据库中恢复数据 zkDb.loadDataBase(); - ZKDatabase.loadDataBase() - snapLog.restore()恢复快照 - FileTxnSnapLog.restore(): 从最后一个有效快照 反序列化 数据树,并返回反序列化的最后一个zxid snapLog.deserialize(dt, sessions) - FileSnap.deserialize()中 查找100个合法的snapshot文件.遍历读取指定的snapshot文件,反序列化 快照文件. 从文件名中解析出zxid - 读取事务日志 fastForwardFromEdits() - 遍历事务日志进行处理 processTransaction(); - 2.启动cnxnFactory(具体实现类如NIOServerCnxnFactory或NettyServerCnxnFactory等), 启动一个server,用来接收来自client的请求,绑定在配置文件中的clientPort端口 cnxnFactory.start() - 3.启动leader选举过程.startLeaderElection() - server刚启动的时候,server的状态初始化为LOOKING状态. - 1.创建Vote对象(myid, zxid, epoch) - 2.创建选举算法对象 createElectionAlgorithm() - createElectionAlgorithm() - 1.初始化QuorumCnxManager.管理选举中和其他server的交互,选举时监听在专门的electionAddr(端口)上createCnxnManager() - 2.启动Listener线程.绑定地址开启ServerSocket服务,用来侦听其他server连接.进行集群间选举,投票,数据同步.listener.start()// 参见QuorumCnxManager.Listener线程逻辑 - 3.创建快速选举算法new FastLeaderElection() - new FastLeaderElection(this, qcm)构造方法 1.starter(self, manager) - 1.创建选票发送队列sendqueue - 2.创建选票接收队列recvqueue - 3.new Messenger(manager) - 创建消息处理对象Messenger.持有 WorkerReceiver 和 WorkerSender 两个线程子类型对象组成, 用于操作上一步创建的sendqueue和recvqueue中的数据. 1.this.ws = new WorkerSender(manager)该线程 将要发送的消息出列 并且 入manager的队列 2.this.wr = new WorkerReceiver(manager) - 该类继承自ZooKeeperThread. 启动QuorumPeer线程 super.start(); // 参见QuorumPeer线程逻辑 - 4.quorumPeer.join() 当前main线程 等待上一行的 quorumPeer线程逻辑执行完毕
|