成人国产在线小视频_日韩寡妇人妻调教在线播放_色成人www永久在线观看_2018国产精品久久_亚洲欧美高清在线30p_亚洲少妇综合一区_黄色在线播放国产_亚洲另类技巧小说校园_国产主播xx日韩_a级毛片在线免费

資訊專欄INFORMATION COLUMN

Elasticsearch分布式一致性原理剖析(三)-Data篇

RyanQ / 836人閱讀

摘要:前言分布式一致性原理剖析系列將會對的分布式一致性原理進行詳細的剖析,介紹其實現(xiàn)方式原理以及其存在的問題等基于版本。使用額外的一致性組件維護。管理的全局組件,其保證數(shù)據(jù)的一致性。將這個加入自己的,同時向所有發(fā)送請求,要求將這個加入。

前言
“Elasticsearch分布式一致性原理剖析”系列將會對Elasticsearch的分布式一致性原理進行詳細的剖析,介紹其實現(xiàn)方式、原理以及其存在的問題等(基于6.2版本)。前兩篇文章介紹了ES中集群如何組成,master選舉算法,master更新meta的流程等,并分析了選舉、Meta更新中的一致性問題。本文會分析ES中的數(shù)據(jù)流,包括其寫入流程、算法模型PacificA、SequenceNumber與Checkpoint等,并比較ES的實現(xiàn)與標準PacificA算法的異同。目錄如下:

問題背景

數(shù)據(jù)寫入流程

PacificA算法

SequenceNumber、Checkpoint與故障恢復

ES與PacificA的比較

小結

問題背景

用過ES的同學都知道,ES中每個Index會劃分為多個Shard,Shard分布在不同的Node上,以此來實現(xiàn)分布式的存儲和查詢,支撐大規(guī)模的數(shù)據(jù)集。對于每個Shard,又會有多個Shard的副本,其中一個為Primary,其余的一個或多個為Replica。數(shù)據(jù)在寫入時,會先寫入Primary,由Primary將數(shù)據(jù)再同步給Replica。在讀取時,為了提高讀取能力,Primary和Replica都會接受讀請求。

在這種模型下,我們能夠感受到ES具有這樣的一些特性,比如:

數(shù)據(jù)高可靠:數(shù)據(jù)具有多個副本。

服務高可用:Primary掛掉之后,可以從Replica中選出新的Primary提供服務。

讀能力擴展:Primary和Replica都可以承擔讀請求。

故障恢復能力:Primary或Replica掛掉都會導致副本數(shù)不足,此時可以由新的Primary通過復制數(shù)據(jù)產(chǎn)生新的副本。
另外,我們也可以想到一些問題,比如:

數(shù)據(jù)怎么從Primary復制到Replica?

一次寫入要求所有副本都成功嗎?
Primary掛掉會丟數(shù)據(jù)嗎?
數(shù)據(jù)從Replica讀,總是能讀到最新數(shù)據(jù)嗎?
故障恢復時,需要拷貝Shard下的全部數(shù)據(jù)嗎?
可以看到,對于ES中的數(shù)據(jù)一致性,雖然我們可以很容易的了解到其大概原理,但是對其細節(jié)我們還有很多的困惑。那么本文就從ES的寫入流程,采用的一致性算法,SequenceId和Checkpoint的設計等方面來介紹ES如何工作,進而回答上述這些問題。需要注意的是,本文基于ES6.2版本進行分析,可能很多內容并不適用于ES之前的版本,比如2.X的版本等。

數(shù)據(jù)寫入流程

首先我們來看一下數(shù)據(jù)的寫入流程,讀者也可以閱讀這篇文章來詳細了解:https://zhuanlan.zhihu.com/p/...。

Replication角度: Primary -> Replica
我們從大的角度來看,ES寫入流程為先寫入Primary,再并發(fā)寫入Replica,最后應答客戶端,流程如下:

檢查Active的Shard數(shù)。

String activeShardCountFailure = checkActiveShardCount();

寫入Primary。

String activeShardCountFailure = checkActiveShardCount();

primaryResult = primary.perform(request);

并發(fā)的向所有Replicate發(fā)起寫入請求

performOnReplicas(replicaRequest, globalCheckpoint, replicationGroup.getRoutingTable());

等所有Replicate返回或者失敗后,返回給Client。

private void decPendingAndFinishIfNeeded() {
  assert pendingActions.get() > 0 : "pending action count goes below 0 for request [" + request + "]";
  if (pendingActions.decrementAndGet() == 0) {
      finish();
  }
}

上述過程在ReplicationOperation類的execute函數(shù)中,完整代碼如下:

 public void execute() throws Exception {
        final String activeShardCountFailure = checkActiveShardCount();
        final ShardRouting primaryRouting = primary.routingEntry();
        final ShardId primaryId = primaryRouting.shardId();
        if (activeShardCountFailure != null) {
            finishAsFailed(new UnavailableShardsException(primaryId,
                "{} Timeout: [{}], request: [{}]", activeShardCountFailure, request.timeout(), request));
            return;
        }

        totalShards.incrementAndGet();
        pendingActions.incrementAndGet(); // increase by 1 until we finish all primary coordination
        primaryResult = primary.perform(request);
        primary.updateLocalCheckpointForShard(primaryRouting.allocationId().getId(), primary.localCheckpoint());
        final ReplicaRequest replicaRequest = primaryResult.replicaRequest();
        if (replicaRequest != null) {
            if (logger.isTraceEnabled()) {
                logger.trace("[{}] op [{}] completed on primary for request [{}]", primaryId, opType, request);
            }

            // we have to get the replication group after successfully indexing into the primary in order to honour recovery semantics.
            // we have to make sure that every operation indexed into the primary after recovery start will also be replicated
            // to the recovery target. If we used an old replication group, we may miss a recovery that has started since then.
            // we also have to make sure to get the global checkpoint before the replication group, to ensure that the global checkpoint
            // is valid for this replication group. If we would sample in the reverse, the global checkpoint might be based on a subset
            // of the sampled replication group, and advanced further than what the given replication group would allow it to.
            // This would entail that some shards could learn about a global checkpoint that would be higher than its local checkpoint.
            final long globalCheckpoint = primary.globalCheckpoint();
            final ReplicationGroup replicationGroup = primary.getReplicationGroup();
            markUnavailableShardsAsStale(replicaRequest, replicationGroup.getInSyncAllocationIds(), replicationGroup.getRoutingTable());
            performOnReplicas(replicaRequest, globalCheckpoint, replicationGroup.getRoutingTable());
        }

        successfulShards.incrementAndGet();  // mark primary as successful
        decPendingAndFinishIfNeeded();
    }

下面我們針對這個流程,來分析幾個問題:

1. 為什么第一步要檢查Active的Shard數(shù)?

ES中有一個參數(shù),叫做wait_for_active_shards,這個參數(shù)是Index的一個setting,也可以在請求中帶上這個參數(shù)。這個參數(shù)的含義是,在每次寫入前,該shard至少具有的active副本數(shù)。假設我們有一個Index,其每個Shard有3個Replica,加上Primary則總共有4個副本。如果配置wait_for_active_shards為3,那么允許最多有一個Replica掛掉,如果有兩個Replica掛掉,則Active的副本數(shù)不足3,此時不允許寫入。

這個參數(shù)默認是1,即只要Primary在就可以寫入,起不到什么作用。如果配置大于1,可以起到一種保護的作用,保證寫入的數(shù)據(jù)具有更高的可靠性。但是這個參數(shù)只在寫入前檢查,并不保證數(shù)據(jù)一定在至少這些個副本上寫入成功,所以并不是嚴格保證了最少寫入了多少個副本。關于這一點,可參考以下官方文檔:

https://www.elastic.co/guide/en/elasticsearch/reference/current/docs-index_.html
...It is important to note that this setting greatly reduces the chances of the write operation not writing to the requisite number of shard copies, but it does not completely eliminate the possibility, because this check occurs before the write operation commences. Once the write operation is underway, it is still possible for replication to fail on any number of shard copies but still succeed on the primary. The _shards section of the write operation’s response reveals the number of shard copies on which replication succeeded/failed.

2. 寫入Primary完成后,為何要等待所有Replica響應(或連接失敗)后返回
在更早的ES版本,Primary和Replica之間是允許異步復制的,即寫入Primary成功即可返回。但是這種模式下,如果Primary掛掉,就有丟數(shù)據(jù)的風險,而且從Replica讀數(shù)據(jù)也很難保證能讀到最新的數(shù)據(jù)。所以后來ES就取消異步模式了,改成Primary等Replica返回后再返回給客戶端。

因為Primary要等所有Replica返回才能返回給客戶端,那么延遲就會受到最慢的Replica的影響,這確實是目前ES架構的一個弊端。之前曾誤認為這里是等wait_for_active_shards個副本寫入成功即可返回,但是后來讀源碼發(fā)現(xiàn)是等所有Replica返回的。

    https://github.com/elastic/elasticsearch/blob/master/docs/reference/docs/data-replication.asciidoc
... Once all replicas have successfully performed the operation and responded to the primary, the primary acknowledges the successful completion of the request to the client.

如果Replica寫入失敗,ES會執(zhí)行一些重試邏輯等,但最終并不強求一定要在多少個節(jié)點寫入成功。在返回的結果中,會包含數(shù)據(jù)在多少個shard中寫入成功了,多少個失敗了:如果Replica寫入失敗,ES會執(zhí)行一些重試邏輯等,但最終并不強求一定要在多少個節(jié)點寫入成功。在返回的結果中,會包含數(shù)據(jù)在多少個shard中寫入成功了,多少個失敗了:
如果Replica寫入失敗,ES會執(zhí)行一些重試邏輯等,但最終并不強求一定要在多少個節(jié)點寫入成功。在返回的結果中,會包含數(shù)據(jù)在多少個shard中寫入成功了,多少個失敗了:

3. 如果某個Replica持續(xù)寫失敗,用戶是否會經(jīng)常查到舊數(shù)據(jù)?
這個問題是說,假如一個Replica持續(xù)寫入失敗,那么這個Replica上的數(shù)據(jù)可能落后Primary很多。我們知道ES中Replica也是可以承擔讀請求的,那么用戶是否會讀到這個Replica上的舊數(shù)據(jù)呢?

答案是如果一個Replica寫失敗了,Primary會將這個信息報告給Master,然后Master會在Meta中更新這個Index的InSyncAllocations配置,將這個Replica從中移除,移除后它就不再承擔讀請求。在Meta更新到各個Node之前,用戶可能還會讀到這個Replica的數(shù)據(jù),但是更新了Meta之后就不會了。所以這個方案并不是非常的嚴格,考慮到ES本身就是一個近實時系統(tǒng),數(shù)據(jù)寫入后需要refresh才可見,所以一般情況下,在短期內讀到舊數(shù)據(jù)應該也是可接受的。

ReplicationOperation.java,寫入Replica失敗的OnFailure函數(shù):

            public void onFailure(Exception replicaException) {
                logger.trace(
                    (org.apache.logging.log4j.util.Supplier) () -> new ParameterizedMessage(
                        "[{}] failure while performing [{}] on replica {}, request [{}]",
                        shard.shardId(),
                        opType,
                        shard,
                        replicaRequest),
                    replicaException);
                if (TransportActions.isShardNotAvailableException(replicaException)) {
                    decPendingAndFinishIfNeeded();
                } else {
                    RestStatus restStatus = ExceptionsHelper.status(replicaException);
                    shardReplicaFailures.add(new ReplicationResponse.ShardInfo.Failure(
                        shard.shardId(), shard.currentNodeId(), replicaException, restStatus, false));
                    String message = String.format(Locale.ROOT, "failed to perform %s on replica %s", opType, shard);
                    replicasProxy.failShardIfNeeded(shard, message,
                            replicaException, ReplicationOperation.this::decPendingAndFinishIfNeeded,
                            ReplicationOperation.this::onPrimaryDemoted, throwable -> decPendingAndFinishIfNeeded());
                }
            }

調用failShardIfNeeded:

        public void failShardIfNeeded(ShardRouting replica, String message, Exception exception,
                                      Runnable onSuccess, Consumer onPrimaryDemoted, Consumer onIgnoredFailure) {

            logger.warn((org.apache.logging.log4j.util.Supplier)
                    () -> new ParameterizedMessage("[{}] {}", replica.shardId(), message), exception);
            shardStateAction.remoteShardFailed(replica.shardId(), replica.allocationId().getId(), primaryTerm, message, exception,
                    createListener(onSuccess, onPrimaryDemoted, onIgnoredFailure));
        }

shardStateAction.remoteShardFailed向Master發(fā)送請求,執(zhí)行該Replica的ShardFailed邏輯,將Shard從InSyncAllocation中移除。

    public void shardFailed(ShardRouting failedShard, UnassignedInfo unassignedInfo) {
        if (failedShard.active() && unassignedInfo.getReason() != UnassignedInfo.Reason.NODE_LEFT) {
            removeAllocationId(failedShard);

            if (failedShard.primary()) {
                Updates updates = changes(failedShard.shardId());
                if (updates.firstFailedPrimary == null) {
                    // more than one primary can be failed (because of batching, primary can be failed, replica promoted and then failed...)
                    updates.firstFailedPrimary = failedShard;
                }
            }
        }

        if (failedShard.active() && failedShard.primary()) {
            increasePrimaryTerm(failedShard.shardId());
        }
    }

Primary自身角度
從Primary自身的角度,一次寫入請求會先寫入Lucene,然后寫入translog。具體流程可以看這篇文章:https://zhuanlan.zhihu.com/p/... 。

1. 為什么要寫translog?
translog類似于數(shù)據(jù)庫中的commitlog,或者binlog。只要translog寫入成功并flush,那么這筆數(shù)據(jù)就落盤了,數(shù)據(jù)安全性有了保證,Segment就可以晚一點落盤。因為translog是append方式寫入,寫入性能也會比隨機寫更高。

另一方面是,translog記錄了每一筆數(shù)據(jù)更改,以及數(shù)據(jù)更改的順序,所以translog也可以用于數(shù)據(jù)恢復。數(shù)據(jù)恢復包含兩方面,一方面是節(jié)點重啟后,從translog中恢復重啟前還未落盤的Segment數(shù)據(jù),另一方面是用于Primary和新的Replica之間的數(shù)據(jù)同步,即Replica逐步追上Primary數(shù)據(jù)的過程。

2. 為什么先寫Lucene,再寫translog?
寫Lucene是寫入內存,寫入后在內存中refresh即可讀到,寫translog是落盤,為了數(shù)據(jù)持久化以及恢復。正常來講,分布式系統(tǒng)中是先寫commitLog進行數(shù)據(jù)持久化,再在內存中apply這次更改,那么ES為什么要反其道而行之呢?主要原因大概是寫入Lucene時,Lucene會再對數(shù)據(jù)進行一些檢查,有可能出現(xiàn)寫入Lucene失敗的情況。如果先寫translog,那么就要處理寫入translog成功但是寫入Lucene一直失敗的問題,所以ES采用了先寫Lucene的方式。

PacificA算法

PacificA是微軟亞洲研究院提出的一種用于日志復制系統(tǒng)的分布式一致性算法,論文發(fā)表于2008年(PacificA paper)。ES官方明確提出了其Replication模型基于該算法:

https://github.com/elastic/elasticsearch/blob/master/docs/reference/docs/data-replication.asciidoc
Elasticsearch’s data replication model is based on the primary-backup model and is described very well in the PacificA paper of Microsoft Research. That model is based on having a single copy from the replication group that acts as the primary shard. The other copies are called replica shards. The primary serves as the main entry point for all indexing operations. It is in charge of validating them and making sure they are correct. Once an index operation has been accepted by the primary, the primary is also responsible for replicating the operation to the other copies.

網(wǎng)上講解這個算法的文章較少,因此本文根據(jù)PacificA的論文,簡單介紹一下這個算法。該算法具有以下幾個特點:

強一致性。
單Primary向多Secondary的數(shù)據(jù)同步模式。
使用額外的一致性組件維護Configuration。
少數(shù)派Replica可用時仍可寫入。
一些名詞
首先我們介紹一下算法中的一些名詞:

Replica Group:一個互為副本的數(shù)據(jù)集合叫做Replica Group,每個副本是一個Replica。一個Replica Group中只有一個副本是Primary,其余為Secondary。
Configuration:一個Replica Group的Configuration描述了這個Replica Group包含哪些副本,其中Primary是誰等。
Configuration Version:Configuration的版本號,每次Configuration發(fā)生變更時加1。
Configuration Manager: 管理Configuration的全局組件,其保證Configuration數(shù)據(jù)的一致性。Configuration變更會由某個Replica發(fā)起,帶著Version發(fā)送給Configuration Manager,Configuration Manager會檢查Version是否正確,如果不正確則拒絕更改。
Query & Update:對一個Replica Group的操作分為兩種,Query和Update,Query不會改變數(shù)據(jù),Update會更改數(shù)據(jù)。
Serial Number(sn):代表每個Update操作執(zhí)行的順序,每次Update操作加1,為連續(xù)的數(shù)字。
Prepared List:Update操作的準備序列。
Committed List:Update操作的提交序列,提交序列中的操作一定不會丟失(除非全部副本掛掉)。在同一個Replica上,Committed List一定是Prepared List的前綴。
Primary Invariant
在PacificA算法中,要求采用某種錯誤檢測機制來滿足以下不變式:

Primary Invariant: 任何時候,當一個Replica認為自己是Primary時,Configuration Manager中維護的Configuration也認為其是當前的Primary。任何時候,最多只有一個Replica認為自己是這個Replica Group的Primary。

Primary Invariant保證了當一個節(jié)點認為自己是Primary時,其肯定是當前的Primary。如果不能滿足Primary Invariant,那么Query請求就可能發(fā)送給Old Primary,讀到舊的數(shù)據(jù)。

怎么保證滿足Primary Invariant呢?論文給出的一種方法是通過Lease機制,這也是分布式系統(tǒng)中常用的一種方式。具體來說,Primary會定期獲取一個Lease,獲取之后認為某段時間內自己肯定是Primary,一旦超過這個時間還未獲取到新的Lease就退出Primary狀態(tài)。只要各個機器的CPU不出現(xiàn)較大的時鐘漂移,那么就能夠保證Lease機制的有效性。

論文中實現(xiàn)Lease機制的方式是,Primary定期向所有Secondary發(fā)送心跳來獲取Lease,而不是所有節(jié)點都向某個中心化組件獲取Lease。這樣的好處是分散了壓力,不會出現(xiàn)中心化組件故障而導致所有節(jié)點失去Lease的情況。

Query
Query流程比較簡單,Query只能發(fā)送給Primary,Primary根據(jù)最新commit的數(shù)據(jù),返回對應的值。由于算法要求滿足Primary Invariant,所以Query總是能讀到最新commit的數(shù)據(jù)。

Update
Update流程如下:

Primary分配一個Serial Number(簡稱sn)給一個UpdateRequest。
Primary將這個UpdateRequest加入自己的Prepared List,同時向所有Secondary發(fā)送Prepare請求,要求將這個UpdateRequest加入Prepared List。
當所有Replica都完成了Prepare,即所有Replica的Prepared List中都包含了該Update請求時,Primary開始Commit這個請求,即將這個UpdateRequest放入Committed List中,同時Apply這個Update。需要注意的是,同一個Replica上,Committed List永遠是Prepared List的前綴,所以Primary實際上是提高Committed Point,把這個Update Request包含進來。
返回客戶端,Update操作成功。
當下一次Primary向Secondary發(fā)送請求時,會帶上Primary當前的Committed Point,此時Secondary才會提高自己的Committed Point。

從Update流程我們可以得出以下不變式:

Commited Invariant
我們把某一個Secondary的Committed List記為SecondaryCommittedList,其Prepared List記為SecondaryPreparedList,把Primary的Committed List記為PrimaryCommittedList。

Commited Invariant:SecondaryCommittedList一定是PrimaryCommittedList的前綴,PrimaryCommittedList一定是SecondaryPreparedList的前綴。

Reconfiguration:Secondary故障,Primary故障,新加節(jié)點

Secondary故障

當一個Secondary故障時,Primary向Configuration Manager發(fā)起Reconfiguration,將故障節(jié)點從Replica Group中刪除。一旦移除這個Replica,它就不屬于這個Replica Group了,所有請求都不會再發(fā)給它。

假設某個Primary和Secondary發(fā)生了網(wǎng)絡分區(qū),但是都可以連接Configuration Manager。這時候Primary會檢測到Secondary沒有響應了,Secondary也會檢測到Primary沒有響應。此時兩者都會試圖發(fā)起Reconfiguration,將對方從Replica Group中移除,這里的策略是First Win的原則,誰先到Configuration Manager中更改成功,誰就留在Replica Group里,而另外一個已經(jīng)不屬于Replica Group了,也就無法再更新Configuration了。由于Primary會向Secondary請求一個Lease,在Lease有效期內Secondary不會執(zhí)行Reconfiguration,而Primary的探測間隔必然是小于Lease時間的,所以我認為這種情況下總是傾向于Primary先進行Reconfiguration,將Secondary剔除。

Primary故障

當一個Primary故障時,Secondary會收不到Primary的心跳,如果超過Lease的時間,那么Secondary就會發(fā)起Reconfiguration,將Primary剔除,這里也是First Win的原則,哪個Secondary先成功,就會變成Primary。

當一個Secondary變成Primary后,需要先經(jīng)過一個叫做Reconciliation的階段才能提供服務

由于上述的Commited Invariant,所以原先的Primary的Committed List一定是新的Primary的Prepared List的前綴,那么我們將新的Primary的Prepared List中的內容與當前Replica Group中的其他節(jié)點對齊,相當于把該節(jié)點上未Commit的記錄在所有節(jié)點上再Commit一次,那么就一定包含之前所有的Commit記錄。即以下不變式:

Reconfiguration Invariant:當一個新的Primary在T時刻完成Reconciliation時,那么T時刻之前任何節(jié)點(包括原Primary)的Commited List都是新Primary當前Commited List的前綴。

Reconfiguration Invariant表明了已經(jīng)Commit的數(shù)據(jù)在Reconfiguration過程中不會丟。

新加節(jié)點

新加的節(jié)點需要先成為Secondary Candidate,這時候Primary就開始向其發(fā)送Prepare請求,此時這個節(jié)點還會追之前未同步過來的記錄,一旦追平,就申請成為一個Secondary,然后Primary向Configuration Manager發(fā)起配置變更,將這個節(jié)點加入Replica Group。

還有一種情況時,如果一個節(jié)點曾經(jīng)在Replica Group中,由于臨時發(fā)生故障被移除,現(xiàn)在需要重新加回來。此時這個節(jié)點上的Commited List中的數(shù)據(jù)肯定是已經(jīng)被Commit的了,但是Prepared List中的數(shù)據(jù)未必被Commit,所以應該將未Commit的數(shù)據(jù)移除,從Committed Point開始向Primary請求數(shù)據(jù)。

算法總結
PacificA是一個讀寫都滿足強一致性的算法,它把數(shù)據(jù)的一致性與配置(Configuration)的一致性分開,使用額外的一致性組件(Configuration Manager)維護配置的一致性,在數(shù)據(jù)的可用副本數(shù)少于半數(shù)時,仍可以寫入新數(shù)據(jù)并保證強一致性。

ES在設計上參考了PacificA算法,其通過Master維護Index的Meta,類似于論文中的Configuration Manager維護Configuration。其IndexMeta中的InSyncAllocationIds代表了當前可用的Shard,類似于論文中維護Replica Group。下一節(jié)我們會介紹ES中的SequenceNumber和Checkpoint,這兩個類似于PacificA算法中的Serial Number和Committed Point,在這一節(jié)之后,會再有一節(jié)來比較ES的實現(xiàn)與PacificA的異同。

SequenceNumber、Checkpoint與故障恢復
上面介紹了ES的一致性算法模型PacificA,該算法很重要的一點是每個Update操作都會有一個對應的Serial Number,表示執(zhí)行的順序。在之前的ES版本中,每個寫入操作并沒有類似Serial Number的東西,所以很多事情做不了。在15年的時候,ES官方開始規(guī)劃給每個寫操作加入SequenceNumber,并設想了很多應用場景。具體信息可以參考以下兩個鏈接:

Add Sequence Numbers to write operations #10708

Sequence IDs: Coming Soon to an Elasticsearch Cluster Near You

下面我們簡單介紹一下Sequence、Checkpoint是什么,以及其應用場景。

Term和SequenceNumber
每個寫操作都會分配兩個值,Term和SequenceNumber。Term在每次Primary變更時都會加1,類似于PacificA論文中的Configuration Version。SequenceNumber在每次操作后加1,類似于PacificA論文中的Serial Number。

由于寫請求總是發(fā)給Primary,所以Term和SequenceNumber會由Primary分配,在向Replica發(fā)送同步請求時,會帶上這兩個值。

LocalCheckpoint和GlobalCheckpoint
LocalCheckpoint代表本Shard中所有小于該值的請求都已經(jīng)處理完畢。

GlobalCheckpoint代表所有小于該值的請求在所有的Replica上都處理完畢。GlobalCheckpoint會由Primary進行維護,每個Replica會向Primary匯報自己的LocalCheckpoint,Primary根據(jù)這些信息來提升GlobalCheckpoint。

GlobalCheckpoint是一個全局的安全位置,代表其前面的請求都被所有Replica正確處理了,可以應用在節(jié)點故障恢復后的數(shù)據(jù)回補。另一方面,GlobalCheckpoint也可以用于Translog的GC,因為之前的操作記錄可以不保存了。不過ES中Translog的GC策略是按照大小或者時間,好像并沒有使用GlobalCheckpoint。

快速故障恢復
當一個Replica故障時,ES會將其移除,當故障超過一定時間,ES會分配一個新的Replica到新的Node上,此時需要全量同步數(shù)據(jù)。但是如果之前故障的Replica回來了,就可以只回補故障之后的數(shù)據(jù),追平后加回來即可,實現(xiàn)快速故障恢復。實現(xiàn)快速故障恢復的條件有兩個,一個是能夠保存故障期間所有的操作以及其順序,另一個是能夠知道從哪個點開始同步數(shù)據(jù)。第一個條件可以通過保存一定時間的Translog實現(xiàn),第二個條件可以通過Checkpoint實現(xiàn),所以就能夠實現(xiàn)快速的故障恢復。這是SequenceNumber和Checkpoint的第一個重要應用場景。

ES與PacificA的比較
相同點
Meta一致性和Data一致性分開處理:PacificA中通過Configuration Manager維護Configuration的一致性,ES中通過Master維護Meta的一致性。
維護同步中的副本集合:PacificA中維護Replica Group,ES中維護InSyncAllocationIds。
SequenceNumber:在PacificA和ES中,寫操作都具有SequenceNumber,記錄操作順序。
不同點
不同點主要體現(xiàn)在ES雖然遵循PacificA,但是目前其實現(xiàn)還有很多地方不滿足算法要求,所以不能保證嚴格的強一致性。主要有以下幾點:

Meta一致性:上一篇中分析了ES中Meta一致性的問題,可以看到ES并不能完全保證Meta一致性,因此也必然無法嚴格保證Data的一致性。
Prepare階段:PacificA中有Prepare階段,保證數(shù)據(jù)在所有節(jié)點Prepare成功后才能Commit,保證Commit的數(shù)據(jù)不丟,ES中沒有這個階段,數(shù)據(jù)會直接寫入。
讀一致性:ES中所有InSync的Replica都可讀,提高了讀能力,但是可能讀到舊數(shù)據(jù)。另一方面是即使只能讀Primary,ES也需要Lease機制等避免讀到Old Primary。因為ES本身是近實時系統(tǒng),所以讀一致性要求可能并不嚴格。
小結
本文分析了ES中數(shù)據(jù)流的一致性問題,可以看到ES最近幾年在這一塊有很多進展,但也存在許多問題。本文是Elasticsearch分布式一致性原理剖析的最后一篇文章,該系列文章是對ES的一個調研分析總結,逐步分析了ES中的節(jié)點發(fā)現(xiàn)、Master選舉、Meta一致性、Data一致性等,對能夠讀完該系列文章的同學說一聲感謝,期待與大家的交流。

詳情請閱讀原文

文章版權歸作者所有,未經(jīng)允許請勿轉載,若此文章存在違規(guī)行為,您可以聯(lián)系管理員刪除。

轉載請注明本文地址:http://systransis.cn/yun/17711.html

相關文章

  • Elasticsearch分布一致原理剖析()-Data

    摘要:前言分布式一致性原理剖析系列將會對的分布式一致性原理進行詳細的剖析,介紹其實現(xiàn)方式原理以及其存在的問題等基于版本。使用額外的一致性組件維護。管理的全局組件,其保證數(shù)據(jù)的一致性。將這個加入自己的,同時向所有發(fā)送請求,要求將這個加入。 前言Elasticsearch分布式一致性原理剖析系列將會對Elasticsearch的分布式一致性原理進行詳細的剖析,介紹其實現(xiàn)方式、原理以及其存在的問題...

    cfanr 評論0 收藏0
  • Elasticsearch分布一致原理剖析(二)-Meta

    摘要:前言分布式一致性原理剖析系列將會對的分布式一致性原理進行詳細的剖析,介紹其實現(xiàn)方式原理以及其存在的問題等基于版本。中需要持久化的包括當前版本號,每次更新加。收集不到足夠的,于是本次發(fā)布失敗,同時退出狀態(tài)。 前言Elasticsearch分布式一致性原理剖析系列將會對Elasticsearch的分布式一致性原理進行詳細的剖析,介紹其實現(xiàn)方式、原理以及其存在的問題等(基于6.2版本)。前一...

    CntChen 評論0 收藏0
  • Elasticsearch分布一致原理剖析(二)-Meta

    摘要:前言分布式一致性原理剖析系列將會對的分布式一致性原理進行詳細的剖析,介紹其實現(xiàn)方式原理以及其存在的問題等基于版本。中需要持久化的包括當前版本號,每次更新加。收集不到足夠的,于是本次發(fā)布失敗,同時退出狀態(tài)。 前言Elasticsearch分布式一致性原理剖析系列將會對Elasticsearch的分布式一致性原理進行詳細的剖析,介紹其實現(xiàn)方式、原理以及其存在的問題等(基于6.2版本)。前一...

    TIGERB 評論0 收藏0

發(fā)表評論

0條評論

最新活動
閱讀需要支付1元查看
<