摘要:只有數(shù)據(jù)同步完成之后集群才具備對(duì)外提供服務(wù)的能力。當(dāng)節(jié)點(diǎn)在選舉后角色確認(rèn)為后將會(huì)進(jìn)入狀態(tài),源碼如下在節(jié)點(diǎn)狀態(tài)變更為之后會(huì)創(chuàng)建實(shí)例,并觸發(fā)過程。
在上一篇對(duì) zookeeper 選舉實(shí)現(xiàn)分析之后,我們知道 zookeeper 集群在選舉結(jié)束之后,leader 節(jié)點(diǎn)將進(jìn)入 LEADING 狀態(tài),follower 節(jié)點(diǎn)將進(jìn)入 FOLLOWING 狀態(tài);此時(shí)集群中節(jié)點(diǎn)將進(jìn)行數(shù)據(jù)同步操作,以保證數(shù)據(jù)一致。 只有數(shù)據(jù)同步完成之后 zookeeper 集群才具備對(duì)外提供服務(wù)的能力。
LEADING當(dāng)節(jié)點(diǎn)在選舉后角色確認(rèn)為 leader 后將會(huì)進(jìn)入 LEADING 狀態(tài),源碼如下:
public void run() { try { /* * Main loop */ while (running) { switch (getPeerState()) { case LEADING: LOG.info("LEADING"); try { setLeader(makeLeader(logFactory)); leader.lead(); setLeader(null); } catch (Exception e) { LOG.warn("Unexpected exception",e); } finally { if (leader != null) { leader.shutdown("Forcing shutdown"); setLeader(null); } setPeerState(ServerState.LOOKING); } break; } } } finally { } }
QuorumPeer 在節(jié)點(diǎn)狀態(tài)變更為 LEADING 之后會(huì)創(chuàng)建 leader 實(shí)例,并觸發(fā) lead 過程。
void lead() throws IOException, InterruptedException { try { // 省略 /** * 開啟線程用于接收 follower 的連接請(qǐng)求 */ cnxAcceptor = new LearnerCnxAcceptor(); cnxAcceptor.start(); readyToStart = true; /** * 阻塞等待計(jì)算新的 epoch 值,并設(shè)置 zxid */ long epoch = getEpochToPropose(self.getId(), self.getAcceptedEpoch()); zk.setZxid(ZxidUtils.makeZxid(epoch, 0)); /** * 阻塞等待接收過半的 follower 節(jié)點(diǎn)發(fā)送的 ACKEPOCH 信息; 此時(shí)說明已經(jīng)確定了本輪選舉后 epoch 值 */ waitForEpochAck(self.getId(), leaderStateSummary); self.setCurrentEpoch(epoch); try { /** * 阻塞等待 超過半數(shù)的節(jié)點(diǎn) follower 發(fā)送了 NEWLEADER ACK 信息;此時(shí)說明過半的 follower 節(jié)點(diǎn)已經(jīng)完成數(shù)據(jù)同步 */ waitForNewLeaderAck(self.getId(), zk.getZxid(), LearnerType.PARTICIPANT); } catch (InterruptedException e) { // 省略 } /** * 啟動(dòng) zk server,此時(shí)集群可以對(duì)外正式提供服務(wù) */ startZkServer(); // 省略 }
從 lead 方法的實(shí)現(xiàn)可得知,leader 與 follower 在數(shù)據(jù)同步過程中會(huì)執(zhí)行如下過程:
接收 follower 連接
計(jì)算新的 epoch 值
通知統(tǒng)一 epoch 值
數(shù)據(jù)同步
啟動(dòng) zk server 對(duì)外提供服務(wù)
FOLLOWING下面在看下 follower 節(jié)點(diǎn)進(jìn)入 FOLLOWING 狀態(tài)后的操作:
public void run() { try { /* * Main loop */ while (running) { switch (getPeerState()) { case LOOKING: // 省略 case OBSERVING: // 省略 case FOLLOWING: try { LOG.info("FOLLOWING"); setFollower(makeFollower(logFactory)); follower.followLeader(); } catch (Exception e) { LOG.warn("Unexpected exception",e); } finally { follower.shutdown(); setFollower(null); setPeerState(ServerState.LOOKING); } break; } } } finally { } }
QuorumPeer 在節(jié)點(diǎn)狀態(tài)變更為 FOLLOWING 之后會(huì)創(chuàng)建 follower 實(shí)例,并觸發(fā) followLeader 過程。
void followLeader() throws InterruptedException { // 省略 try { QuorumServer leaderServer = findLeader(); try { /** * follower 與 leader 建立連接 */ connectToLeader(leaderServer.addr, leaderServer.hostname); /** * follower 向 leader 提交節(jié)點(diǎn)信息用于計(jì)算新的 epoch 值 */ long newEpochZxid = registerWithLeader(Leader.FOLLOWERINFO); /** * follower 與 leader 數(shù)據(jù)同步 */ syncWithLeader(newEpochZxid); // 省略 } catch (Exception e) { // 省略 } } finally { // 省略 } }
從 followLeader 方法的實(shí)現(xiàn)可得知,follower 與 leader 在數(shù)據(jù)同步過程中會(huì)執(zhí)行如下過程:
請(qǐng)求連接 leader
提交節(jié)點(diǎn)信息計(jì)算新的 epoch 值
數(shù)據(jù)同步
下面我們看下在各個(gè)環(huán)節(jié)的實(shí)現(xiàn)細(xì)節(jié);
Leader Follower 建立通信protected QuorumServer findLeader() { QuorumServer leaderServer = null; // Find the leader by id Vote current = self.getCurrentVote(); for (QuorumServer s : self.getView().values()) { if (s.id == current.getId()) { // Ensure we have the leader"s correct IP address before // attempting to connect. s.recreateSocketAddresses(); leaderServer = s; break; } } if (leaderServer == null) { LOG.warn("Couldn"t find the leader with id = " + current.getId()); } return leaderServer; }
protected void connectToLeader(InetSocketAddress addr, String hostname) throws IOException, ConnectException, InterruptedException { sock = new Socket(); sock.setSoTimeout(self.tickTime * self.initLimit); for (int tries = 0; tries < 5; tries++) { try { sock.connect(addr, self.tickTime * self.syncLimit); sock.setTcpNoDelay(nodelay); break; } catch (IOException e) { if (tries == 4) { LOG.error("Unexpected exception",e); throw e; } else { LOG.warn("Unexpected exception, tries="+tries+ ", connecting to " + addr,e); sock = new Socket(); sock.setSoTimeout(self.tickTime * self.initLimit); } } Thread.sleep(1000); } self.authLearner.authenticate(sock, hostname); leaderIs = BinaryInputArchive.getArchive(new BufferedInputStream( sock.getInputStream())); bufferedOutput = new BufferedOutputStream(sock.getOutputStream()); leaderOs = BinaryOutputArchive.getArchive(bufferedOutput); }
follower 會(huì)通過選舉后的投票信息確認(rèn) leader 節(jié)點(diǎn)地址,并發(fā)起連接(總共有 5 次嘗試連接的機(jī)會(huì),若連接不通則重新進(jìn)入選舉過程)
class LearnerCnxAcceptor extends ZooKeeperThread{ private volatile boolean stop = false; public LearnerCnxAcceptor() { super("LearnerCnxAcceptor-" + ss.getLocalSocketAddress()); } @Override public void run() { try { while (!stop) { try{ /** * 接收 follower 的連接,并開啟 LearnerHandler 線程用于處理二者之間的通信 */ Socket s = ss.accept(); s.setSoTimeout(self.tickTime * self.initLimit); s.setTcpNoDelay(nodelay); BufferedInputStream is = new BufferedInputStream( s.getInputStream()); LearnerHandler fh = new LearnerHandler(s, is, Leader.this); fh.start(); } catch (SocketException e) { // 省略 } catch (SaslException e){ LOG.error("Exception while connecting to quorum learner", e); } } } catch (Exception e) { LOG.warn("Exception while accepting follower", e); } } }
從 LearnerCnxAcceptor 實(shí)現(xiàn)可以看出 leader 節(jié)點(diǎn)在為每個(gè) follower 節(jié)點(diǎn)連接建立之后都會(huì)為之分配一個(gè) LearnerHandler 線程用于處理二者之間的通信。
計(jì)算新的 epoch 值follower 在與 leader 建立連接之后,會(huì)發(fā)出 FOLLOWERINFO 信息
long newEpochZxid = registerWithLeader(Leader.FOLLOWERINFO);
protected long registerWithLeader(int pktType) throws IOException{ /** * 發(fā)送 follower info 信息,包括 last zxid 和 sid */ long lastLoggedZxid = self.getLastLoggedZxid(); QuorumPacket qp = new QuorumPacket(); qp.setType(pktType); qp.setZxid(ZxidUtils.makeZxid(self.getAcceptedEpoch(), 0)); /* * Add sid to payload */ LearnerInfo li = new LearnerInfo(self.getId(), 0x10000); ByteArrayOutputStream bsid = new ByteArrayOutputStream(); BinaryOutputArchive boa = BinaryOutputArchive.getArchive(bsid); boa.writeRecord(li, "LearnerInfo"); qp.setData(bsid.toByteArray()); /** * follower 向 leader 發(fā)送 FOLLOWERINFO 信息,包括 zxid,sid,protocol version */ writePacket(qp, true); // 省略 }
接下來我們看下 leader 在接收到 FOLLOWERINFO 信息之后做什么(參考 LearnerHandler)
public void run() { try { // 省略 /** * leader 接收 follower 發(fā)送的 FOLLOWERINFO 信息,包括 follower 節(jié)點(diǎn)的 zxid,sid,protocol version * @see Learner.registerWithleader() */ QuorumPacket qp = new QuorumPacket(); ia.readRecord(qp, "packet"); byte learnerInfoData[] = qp.getData(); if (learnerInfoData != null) { if (learnerInfoData.length == 8) { // 省略 } else { /** * 高版本的 learnerInfoData 包括 long 類型的 sid, int 類型的 protocol version 占用 12 字節(jié) */ LearnerInfo li = new LearnerInfo(); ByteBufferInputStream.byteBuffer2Record(ByteBuffer.wrap(learnerInfoData), li); this.sid = li.getServerid(); this.version = li.getProtocolVersion(); } } /** * 通過 follower 發(fā)送的 zxid,解析出 foloower 節(jié)點(diǎn)的 epoch 值 */ long lastAcceptedEpoch = ZxidUtils.getEpochFromZxid(qp.getZxid()); long peerLastZxid; StateSummary ss = null; long zxid = qp.getZxid(); /** * 阻塞等待計(jì)算新的 epoch 值 */ long newEpoch = leader.getEpochToPropose(this.getSid(), lastAcceptedEpoch); // 省略 }
從上述代碼可知,leader 在接收到 follower 發(fā)送的 FOLLOWERINFO 信息之后,會(huì)解析出 follower 節(jié)點(diǎn)的 acceptedEpoch 值并參與到新的 epoch 值計(jì)算中。 (具體計(jì)算邏輯參考方法 getEpochToPropose)
public long getEpochToPropose(long sid, long lastAcceptedEpoch) throws InterruptedException, IOException { synchronized(connectingFollowers) { if (!waitingForNewEpoch) { return epoch; } // epoch 用來記錄計(jì)算后的選舉周期值 // follower 或 leader 的 acceptedEpoch 值與 epoch 比較;若前者大則將其加一 if (lastAcceptedEpoch >= epoch) { epoch = lastAcceptedEpoch+1; } // connectingFollowers 用來記錄與 leader 已連接的 follower connectingFollowers.add(sid); QuorumVerifier verifier = self.getQuorumVerifier(); // 判斷是否已計(jì)算出新的 epoch 值的條件是 leader 已經(jīng)參與了 epoch 值計(jì)算,以及超過一半的節(jié)點(diǎn)參與了計(jì)算 if (connectingFollowers.contains(self.getId()) && verifier.containsQuorum(connectingFollowers)) { // 將 waitingForNewEpoch 設(shè)置為 false 說明不需要等待計(jì)算新的 epoch 值了 waitingForNewEpoch = false; // 設(shè)置 leader 的 acceptedEpoch 值 self.setAcceptedEpoch(epoch); // 喚醒 connectingFollowers wait 的線程 connectingFollowers.notifyAll(); } else { long start = Time.currentElapsedTime(); long cur = start; long end = start + self.getInitLimit()*self.getTickTime(); while(waitingForNewEpoch && cur < end) { // 若未完成新的 epoch 值計(jì)算則阻塞等待 connectingFollowers.wait(end - cur); cur = Time.currentElapsedTime(); } if (waitingForNewEpoch) { throw new InterruptedException("Timeout while waiting for epoch from quorum"); } } return epoch; } }
從方法 getEpochToPropose 可知 leader 會(huì)收集集群中過半的 follower acceptedEpoch 信息后,選出一個(gè)最大值然后加 1 就是 newEpoch 值; 在此過程中 leader 會(huì)進(jìn)入阻塞狀態(tài)直到過半的 follower 參與到計(jì)算才會(huì)進(jìn)入下一階段。
通知新的 epoch 值leader 在計(jì)算出新的 newEpoch 值后,會(huì)進(jìn)入下一階段發(fā)送 LEADERINFO 信息 (同樣參考 LearnerHandler)
public void run() { try { // 省略 /** * 阻塞等待計(jì)算新的 epoch 值 */ long newEpoch = leader.getEpochToPropose(this.getSid(), lastAcceptedEpoch); if (this.getVersion() < 0x10000) { // we are going to have to extrapolate the epoch information long epoch = ZxidUtils.getEpochFromZxid(zxid); ss = new StateSummary(epoch, zxid); // fake the message leader.waitForEpochAck(this.getSid(), ss); } else { byte ver[] = new byte[4]; ByteBuffer.wrap(ver).putInt(0x10000); /** * 計(jì)算出新的 epoch 值后,leader 向 follower 發(fā)送 LEADERINFO 信息;包括新的 newEpoch */ QuorumPacket newEpochPacket = new QuorumPacket(Leader.LEADERINFO, ZxidUtils.makeZxid(newEpoch, 0), ver, null); oa.writeRecord(newEpochPacket, "packet"); bufferedOutput.flush(); // 省略 } } // 省略 }
protected long registerWithLeader(int pktType) throws IOException{ // 省略 /** * follower 向 leader 發(fā)送 FOLLOWERINFO 信息,包括 zxid,sid,protocol version */ writePacket(qp, true); /** * follower 接收 leader 發(fā)送的 LEADERINFO 信息 */ readPacket(qp); /** * 解析 leader 發(fā)送的 new epoch 值 */ final long newEpoch = ZxidUtils.getEpochFromZxid(qp.getZxid()); if (qp.getType() == Leader.LEADERINFO) { // we are connected to a 1.0 server so accept the new epoch and read the next packet leaderProtocolVersion = ByteBuffer.wrap(qp.getData()).getInt(); byte epochBytes[] = new byte[4]; final ByteBuffer wrappedEpochBytes = ByteBuffer.wrap(epochBytes); /** * new epoch > current accepted epoch 則更新 acceptedEpoch 值 */ if (newEpoch > self.getAcceptedEpoch()) { wrappedEpochBytes.putInt((int)self.getCurrentEpoch()); self.setAcceptedEpoch(newEpoch); } else if (newEpoch == self.getAcceptedEpoch()) { wrappedEpochBytes.putInt(-1); } else { throw new IOException("Leaders epoch, " + newEpoch + " is less than accepted epoch, " + self.getAcceptedEpoch()); } /** * follower 向 leader 發(fā)送 ACKEPOCH 信息,包括 last zxid */ QuorumPacket ackNewEpoch = new QuorumPacket(Leader.ACKEPOCH, lastLoggedZxid, epochBytes, null); writePacket(ackNewEpoch, true); return ZxidUtils.makeZxid(newEpoch, 0); } }
從上述代碼可以看出在完成 newEpoch 值計(jì)算后的 leader 與 follower 的交互過程:
leader 向 follower 發(fā)送 LEADERINFO 信息,告知 follower 新的 epoch 值
follower 接收解析 LEADERINFO 信息,若 new epoch 值大于 current accepted epoch 值則更新 acceptedEpoch
follower 向 leader 發(fā)送 ACKEPOCH 信息,反饋 leader 已收到新的 epoch 值,并附帶 follower 節(jié)點(diǎn)的 last zxid
數(shù)據(jù)同步LearnerHandler 中 leader 在收到過半的 ACKEPOCH 信息之后將進(jìn)入數(shù)據(jù)同步階段
public void run() { try { // 省略 // peerLastZxid 為 follower 的 last zxid peerLastZxid = ss.getLastZxid(); /* the default to send to the follower */ int packetToSend = Leader.SNAP; long zxidToSend = 0; long leaderLastZxid = 0; /** the packets that the follower needs to get updates from **/ long updates = peerLastZxid; ReentrantReadWriteLock lock = leader.zk.getZKDatabase().getLogLock(); ReadLock rl = lock.readLock(); try { rl.lock(); final long maxCommittedLog = leader.zk.getZKDatabase().getmaxCommittedLog(); final long minCommittedLog = leader.zk.getZKDatabase().getminCommittedLog(); LinkedListproposals = leader.zk.getZKDatabase().getCommittedLog(); if (peerLastZxid == leader.zk.getZKDatabase().getDataTreeLastProcessedZxid()) { /** * follower 與 leader 的 zxid 相同說明 二者數(shù)據(jù)一致;同步方式為差量同步 DIFF,同步的zxid 為 peerLastZxid, 也就是不需要同步 */ packetToSend = Leader.DIFF; zxidToSend = peerLastZxid; } else if (proposals.size() != 0) { // peerLastZxid 介于 minCommittedLog ,maxCommittedLog 中間 if ((maxCommittedLog >= peerLastZxid) && (minCommittedLog <= peerLastZxid)) { /** * 在遍歷 proposals 時(shí),用來記錄上一個(gè) proposal 的 zxid */ long prevProposalZxid = minCommittedLog; boolean firstPacket=true; packetToSend = Leader.DIFF; zxidToSend = maxCommittedLog; for (Proposal propose: proposals) { // 跳過 follower 已經(jīng)存在的提案 if (propose.packet.getZxid() <= peerLastZxid) { prevProposalZxid = propose.packet.getZxid(); continue; } else { if (firstPacket) { firstPacket = false; if (prevProposalZxid < peerLastZxid) { /** * 此時(shí)說明有部分 proposals 提案在 leader 節(jié)點(diǎn)上不存在,則需告訴 follower 丟棄這部分 proposals * 也就是告訴 follower 先執(zhí)行回滾 TRUNC ,需要回滾到 prevProposalZxid 處,也就是 follower 需要丟棄 prevProposalZxid ~ peerLastZxid 范圍內(nèi)的數(shù)據(jù) * 剩余的 proposals 則通過 DIFF 進(jìn)行同步 */ packetToSend = Leader.TRUNC; zxidToSend = prevProposalZxid; updates = zxidToSend; } } /** * 將剩余待 DIFF 同步的提案放入到隊(duì)列中,等待發(fā)送 */ queuePacket(propose.packet); /** * 每個(gè)提案后對(duì)應(yīng)一個(gè) COMMIT 報(bào)文 */ QuorumPacket qcommit = new QuorumPacket(Leader.COMMIT, propose.packet.getZxid(), null, null); queuePacket(qcommit); } } } else if (peerLastZxid > maxCommittedLog) { /** * follower 的 zxid 比 leader 大 ,則告訴 follower 執(zhí)行 TRUNC 回滾 */ packetToSend = Leader.TRUNC; zxidToSend = maxCommittedLog; updates = zxidToSend; } else { } } } finally { rl.unlock(); } QuorumPacket newLeaderQP = new QuorumPacket(Leader.NEWLEADER, ZxidUtils.makeZxid(newEpoch, 0), null, null); if (getVersion() < 0x10000) { oa.writeRecord(newLeaderQP, "packet"); } else { // 數(shù)據(jù)同步完成之后會(huì)發(fā)送 NEWLEADER 信息 queuedPackets.add(newLeaderQP); } bufferedOutput.flush(); //Need to set the zxidToSend to the latest zxid if (packetToSend == Leader.SNAP) { zxidToSend = leader.zk.getZKDatabase().getDataTreeLastProcessedZxid(); } /** * 發(fā)送數(shù)據(jù)同步方式信息,告訴 follower 按什么方式進(jìn)行數(shù)據(jù)同步 */ oa.writeRecord(new QuorumPacket(packetToSend, zxidToSend, null, null), "packet"); bufferedOutput.flush(); /* if we are not truncating or sending a diff just send a snapshot */ if (packetToSend == Leader.SNAP) { /** * 如果是全量同步的話,則將 leader 本地?cái)?shù)據(jù)序列化寫入 follower 的輸出流 */ leader.zk.getZKDatabase().serializeSnapshot(oa); oa.writeString("BenWasHere", "signature"); } bufferedOutput.flush(); /** * 開啟個(gè)線程執(zhí)行 packet 發(fā)送 */ sendPackets(); /** * 接收 follower ack 響應(yīng) */ qp = new QuorumPacket(); ia.readRecord(qp, "packet"); /** * 阻塞等待過半的 follower ack */ leader.waitForNewLeaderAck(getSid(), qp.getZxid(), getLearnerType()); /** * leader 向 follower 發(fā)送 UPTODATE,告知其可對(duì)外提供服務(wù) */ queuedPackets.add(new QuorumPacket(Leader.UPTODATE, -1, null, null)); // 省略 } }
從上述代碼可以看出 leader 和 follower 在進(jìn)行數(shù)據(jù)同步時(shí)會(huì)通過 peerLastZxid 與 maxCommittedLog, minCommittedLog 兩個(gè)值比較最終決定數(shù)據(jù)同步方式。
DIFF(差異化同步)follower 的 peerLastZxid 等于 leader 的 peerLastZxid
此時(shí)說明 follower 與 leader 數(shù)據(jù)一致,采用 DIFF 方式同步,也即是無需同步
follower 的 peerLastZxid 介于 maxCommittedLog, minCommittedLog 兩者之間
此時(shí)說明 follower 與 leader 數(shù)據(jù)存在差異,需對(duì)差異的部分進(jìn)行同步;首先 leader 會(huì)向 follower 發(fā)送 DIFF 報(bào)文告知其同步方式,隨后會(huì)發(fā)送差異的提案及提案提交報(bào)文
交互流程如下:
Leader Follower | DIFF | | --------------------> | | PROPOSAL | | --------------------> | | COMMIT | | --------------------> | | PROPOSAL | | --------------------> | | COMMIT | | --------------------> |
示例: 假設(shè) leader 節(jié)點(diǎn)的提案緩存隊(duì)列對(duì)應(yīng)的 zxid 依次是:
0x500000001, 0x500000002, 0x500000003, 0x500000004, 0x500000005
而 follower 節(jié)點(diǎn)的 peerLastZxid 為 0x500000003,則需要將 0x500000004, 0x500000005 兩個(gè)提案進(jìn)行同步;那么數(shù)據(jù)包發(fā)送過程如下表:
報(bào)文類型 | ZXID |
---|---|
DIFF | 0x500000005 |
PROPOSAL | 0x500000004 |
COMMIT | 0x500000004 |
PROPOSAL | 0x500000005 |
COMMIT | 0x500000005 |
在上文 DIFF 差異化同步時(shí)會(huì)存在一個(gè)特殊場(chǎng)景就是 雖然 follower 的 peerLastZxid 介于 maxCommittedLog, minCommittedLog 兩者之間,但是 follower 的 peerLastZxid 在 leader 節(jié)點(diǎn)中不存在; 此時(shí) leader 需告知 follower 先回滾到 peerLastZxid 的前一個(gè) zxid, 回滾后再進(jìn)行差異化同步。
交互流程如下:
Leader Follower | TRUNC | | --------------------> | | PROPOSAL | | --------------------> | | COMMIT | | --------------------> | | PROPOSAL | | --------------------> | | COMMIT | | --------------------> |
示例: 假設(shè)集群中三臺(tái)節(jié)點(diǎn) A, B, C 某一時(shí)刻 A 為 Leader 選舉周期為 5, zxid 包括: (0x500000004, 0x500000005, 0x500000006); 假設(shè)某一時(shí)刻 leader A 節(jié)點(diǎn)在處理完事務(wù)為 0x500000007 的請(qǐng)求進(jìn)行廣播時(shí) leader A 節(jié)點(diǎn)服務(wù)器宕機(jī)導(dǎo)致 0x500000007 該事物沒有被同步出去;在集群進(jìn)行下一輪選舉之后 B 節(jié)點(diǎn)成為新的 leader,選舉周期為 6 對(duì)外提供服務(wù)處理了新的事務(wù)請(qǐng)求包括 0x600000001, 0x600000002;
集群節(jié)點(diǎn) | ZXID 列表 |
---|---|
A | 0x500000004, 0x500000005, 0x500000006, 0x500000007 |
B | 0x500000004, 0x500000005, 0x500000006, 0x600000001, 0x600000002 |
C | 0x500000004, 0x500000005, 0x500000006, 0x600000001, 0x600000002 |
此時(shí)節(jié)點(diǎn) A 在重啟加入集群后,在與 leader B 節(jié)點(diǎn)進(jìn)行數(shù)據(jù)同步時(shí)會(huì)發(fā)現(xiàn)事務(wù) 0x500000007 在 leader 節(jié)點(diǎn)中并不存在,此時(shí) leader 告知 A 需先回滾事務(wù)到 0x500000006,在差異同步事務(wù) 0x600000001,0x600000002;那么數(shù)據(jù)包發(fā)送過程如下表:
報(bào)文類型 | ZXID |
---|---|
TRUNC | 0x500000006 |
PROPOSAL | 0x600000001 |
COMMIT | 0x600000001 |
PROPOSAL | 0x600000002 |
COMMIT | 0x600000002 |
若 follower 的 peerLastZxid 大于 leader 的 maxCommittedLog,則告知 follower 回滾至 maxCommittedLog; 該場(chǎng)景可以認(rèn)為是 TRUNC+DIFF 的簡化模式
交互流程如下:
Leader Follower | TRUNC | | --------------------> |SNAP(全量同步)
若 follower 的 peerLastZxid 小于 leader 的 minCommittedLog 或者 leader 節(jié)點(diǎn)上不存在提案緩存隊(duì)列時(shí),將采用 SNAP 全量同步方式。 該模式下 leader 首先會(huì)向 follower 發(fā)送 SNAP 報(bào)文,隨后從內(nèi)存數(shù)據(jù)庫中獲取全量數(shù)據(jù)序列化傳輸給 follower, follower 在接收全量數(shù)據(jù)后會(huì)進(jìn)行反序列化加載到內(nèi)存數(shù)據(jù)庫中。
交互流程如下:
Leader Follower | SNAP | | --------------------> | | DATA | | --------------------> |
leader 在完成數(shù)據(jù)同步之后,會(huì)向 follower 發(fā)送 NEWLEADER 報(bào)文,在收到過半的 follower 響應(yīng)的 ACK 之后此時(shí)說明過半的節(jié)點(diǎn)完成了數(shù)據(jù)同步,接下來 leader 會(huì)向 follower 發(fā)送 UPTODATE 報(bào)文告知 follower 節(jié)點(diǎn)可以對(duì)外提供服務(wù)了,此時(shí) leader 會(huì)啟動(dòng) zk server 開始對(duì)外提供服務(wù)。
FOLLOWER 數(shù)據(jù)同步下面我們?cè)诳聪聰?shù)據(jù)同步階段 FOLLOWER 是如何處理的,參考 Learner.syncWithLeader
protected void syncWithLeader(long newLeaderZxid) throws IOException, InterruptedException{ QuorumPacket ack = new QuorumPacket(Leader.ACK, 0, null, null); QuorumPacket qp = new QuorumPacket(); long newEpoch = ZxidUtils.getEpochFromZxid(newLeaderZxid); /** * 接收 leader 發(fā)送的數(shù)據(jù)同步方式報(bào)文 */ readPacket(qp); synchronized (zk) { if (qp.getType() == Leader.DIFF) { } else if (qp.getType() == Leader.SNAP) { // 執(zhí)行加載全量數(shù)據(jù) } else if (qp.getType() == Leader.TRUNC) { // 執(zhí)行回滾 } else { } outerLoop: while (self.isRunning()) { readPacket(qp); switch(qp.getType()) { case Leader.PROPOSAL: // 處理提案 break; case Leader.COMMIT: // commit proposal break; case Leader.INFORM: // 忽略 break; case Leader.UPTODATE: // 設(shè)置 zk server self.cnxnFactory.setZooKeeperServer(zk); // 退出循環(huán) break outerLoop; case Leader.NEWLEADER: // Getting NEWLEADER here instead of in discovery /** * follower 響應(yīng) NEWLEADER ACK */ writePacket(new QuorumPacket(Leader.ACK, newLeaderZxid, null, null), true); break; } } } ack.setZxid(ZxidUtils.makeZxid(newEpoch, 0)); writePacket(ack, true); // 啟動(dòng) zk server zk.startup(); }
從上述代碼中可以看出 follower 在數(shù)據(jù)同步階段的處理流程如下:
follower 接收 leader 發(fā)送的數(shù)據(jù)同步方式(DIFF/TRUNC/SANP)報(bào)文并進(jìn)行相應(yīng)處理
當(dāng) follower 收到 leader 發(fā)送的 NEWLEADER 報(bào)文后,會(huì)向 leader 響應(yīng) ACK (leader 在收到過半的 ACK 消息之后會(huì)發(fā)送 UPTODATE)
當(dāng) follower 收到 leader 發(fā)送的 UPTODATE 報(bào)文后,說明此時(shí)可以對(duì)外提供服務(wù),此時(shí)將啟動(dòng) zk server
小結(jié)最后用一張圖總結(jié)下 zk 在完成選舉后數(shù)據(jù)同步的過程如下圖所示:
文章版權(quán)歸作者所有,未經(jīng)允許請(qǐng)勿轉(zhuǎn)載,若此文章存在違規(guī)行為,您可以聯(lián)系管理員刪除。
轉(zhuǎn)載請(qǐng)注明本文地址:http://systransis.cn/yun/74515.html
摘要:表示的是兩個(gè),當(dāng)其中任意一個(gè)計(jì)算完并發(fā)編程之是線程安全并且高效的,在并發(fā)編程中經(jīng)??梢娝氖褂?,在開始分析它的高并發(fā)實(shí)現(xiàn)機(jī)制前,先講講廢話,看看它是如何被引入的。電商秒殺和搶購,是兩個(gè)比較典型的互聯(lián)網(wǎng)高并發(fā)場(chǎng)景。 干貨:深度剖析分布式搜索引擎設(shè)計(jì) 分布式,高可用,和機(jī)器學(xué)習(xí)一樣,最近幾年被提及得最多的名詞,聽名字多牛逼,來,我們一步一步來擊破前兩個(gè)名詞,今天我們首先來說說分布式。 探究...
摘要:表示的是兩個(gè),當(dāng)其中任意一個(gè)計(jì)算完并發(fā)編程之是線程安全并且高效的,在并發(fā)編程中經(jīng)??梢娝氖褂?,在開始分析它的高并發(fā)實(shí)現(xiàn)機(jī)制前,先講講廢話,看看它是如何被引入的。電商秒殺和搶購,是兩個(gè)比較典型的互聯(lián)網(wǎng)高并發(fā)場(chǎng)景。 干貨:深度剖析分布式搜索引擎設(shè)計(jì) 分布式,高可用,和機(jī)器學(xué)習(xí)一樣,最近幾年被提及得最多的名詞,聽名字多牛逼,來,我們一步一步來擊破前兩個(gè)名詞,今天我們首先來說說分布式。 探究...
摘要:表示的是兩個(gè),當(dāng)其中任意一個(gè)計(jì)算完并發(fā)編程之是線程安全并且高效的,在并發(fā)編程中經(jīng)??梢娝氖褂茫陂_始分析它的高并發(fā)實(shí)現(xiàn)機(jī)制前,先講講廢話,看看它是如何被引入的。電商秒殺和搶購,是兩個(gè)比較典型的互聯(lián)網(wǎng)高并發(fā)場(chǎng)景。 干貨:深度剖析分布式搜索引擎設(shè)計(jì) 分布式,高可用,和機(jī)器學(xué)習(xí)一樣,最近幾年被提及得最多的名詞,聽名字多牛逼,來,我們一步一步來擊破前兩個(gè)名詞,今天我們首先來說說分布式。 探究...
摘要:在中一般來說通過來創(chuàng)建所需要的線程池,如高并發(fā)原理初探后端掘金閱前熱身為了更加形象的說明同步異步阻塞非阻塞,我們以小明去買奶茶為例。 AbstractQueuedSynchronizer 超詳細(xì)原理解析 - 后端 - 掘金今天我們來研究學(xué)習(xí)一下AbstractQueuedSynchronizer類的相關(guān)原理,java.util.concurrent包中很多類都依賴于這個(gè)類所提供的隊(duì)列式...
摘要:在中一般來說通過來創(chuàng)建所需要的線程池,如高并發(fā)原理初探后端掘金閱前熱身為了更加形象的說明同步異步阻塞非阻塞,我們以小明去買奶茶為例。 AbstractQueuedSynchronizer 超詳細(xì)原理解析 - 后端 - 掘金今天我們來研究學(xué)習(xí)一下AbstractQueuedSynchronizer類的相關(guān)原理,java.util.concurrent包中很多類都依賴于這個(gè)類所提供的隊(duì)列式...
閱讀 1342·2023-04-26 00:10
閱讀 2437·2021-09-22 15:38
閱讀 3802·2021-09-22 15:13
閱讀 3518·2019-08-30 13:11
閱讀 655·2019-08-30 11:01
閱讀 3040·2019-08-29 14:20
閱讀 3220·2019-08-29 13:27
閱讀 1734·2019-08-29 11:33