成人国产在线小视频_日韩寡妇人妻调教在线播放_色成人www永久在线观看_2018国产精品久久_亚洲欧美高清在线30p_亚洲少妇综合一区_黄色在线播放国产_亚洲另类技巧小说校园_国产主播xx日韩_a级毛片在线免费

資訊專欄INFORMATION COLUMN

Gossip協(xié)議在Cassandra中的實(shí)現(xiàn)

hyuan / 927人閱讀

摘要:協(xié)議是什么簡單來說就是一種去中心化點(diǎn)對點(diǎn)的數(shù)據(jù)廣播協(xié)議你可以把它理解為病毒的傳播。傳染給,繼續(xù)傳染給如此下去。比如說服務(wù)發(fā)現(xiàn)框架就用了協(xié)議來做管理主機(jī)的關(guān)系以及集群之間的消息廣播,也用到了這個協(xié)議,用來實(shí)現(xiàn)一些節(jié)點(diǎn)發(fā)現(xiàn)健康檢查等。

Gossip協(xié)議是什么?

? 簡單來說就是一種去中心化、點(diǎn)對點(diǎn)的數(shù)據(jù)廣播協(xié)議,你可以把它理解為病毒的傳播。A傳染給B,B繼續(xù)傳染給C,如此下去。

? 協(xié)議本身只有一些簡單的限制,狀態(tài)更新的時間隨著參與主機(jī)數(shù)的增長以對數(shù)的速率增長,即使是一些節(jié)點(diǎn)掛掉或者消息丟失也沒關(guān)系。很多的分布式系統(tǒng)都用gossip 協(xié)議來解決自己遇到的一些難題。比如說服務(wù)發(fā)現(xiàn)框架consul就用了gossip協(xié)議( Serf)來做管理主機(jī)的關(guān)系以及集群之間的消息廣播,Cassandra也用到了這個協(xié)議,用來實(shí)現(xiàn)一些節(jié)點(diǎn)發(fā)現(xiàn)、健康檢查等。

通信流程 概述

首先系統(tǒng)需要配置幾個種子節(jié)點(diǎn),比如說A、B, 每個參與的節(jié)點(diǎn)都會維護(hù)所有節(jié)點(diǎn)的狀態(tài),node->(Key,Value,Version),版本號較大的說明其數(shù)據(jù)較新,節(jié)點(diǎn)P只能直接更新它自己的狀態(tài),節(jié)點(diǎn)P只能間接的通過gossip協(xié)議來更新本機(jī)維護(hù)的其他節(jié)點(diǎn)的數(shù)據(jù)。

大致的過程如下,

? ① SYN:節(jié)點(diǎn)A向隨機(jī)選擇一些節(jié)點(diǎn),這里可以只選擇發(fā)送摘要,即不發(fā)送valus,避免消息過大

? ② ACK:節(jié)點(diǎn)B接收到消息后,會將其與本地的合并,這里合并采用的是對比版本,版本較大的說明數(shù)據(jù)較新. 比如節(jié)點(diǎn)A向節(jié)點(diǎn)B發(fā)送數(shù)據(jù)C(key,value,2),而節(jié)點(diǎn)B本機(jī)存儲的是C(key,value1,3),那么因?yàn)锽的版本比較新,合并之后的數(shù)據(jù)就是B本機(jī)存儲的數(shù)據(jù),然后會發(fā)回A節(jié)點(diǎn)。

? ③ ACK2:節(jié)點(diǎn)A接收到ACK消息,將其應(yīng)用到本機(jī)的數(shù)據(jù)中

A發(fā)GossipDigestSyn  => B執(zhí)行GossipDigestSynVerbHandler 
B發(fā)GossipDigestAck  => A執(zhí)行GossipDigestAckVerbHandler 
A發(fā)GossipDigestAck2 => B執(zhí)行GossipDigestAck2VerbHandler

這三個類都實(shí)現(xiàn)了IVerbHandler接口,注冊到MessagingService的處理器中:

MessagingService.instance().registerVerbHandlers(MessagingService.Verb.GOSSIP_DIGEST_SYN, new GossipDigestSynVerbHandler());
MessagingService.instance().registerVerbHandlers(MessagingService.Verb.GOSSIP_DIGEST_ACK, new GossipDigestAckVerbHandler());
MessagingService.instance().registerVerbHandlers(MessagingService.Verb.GOSSIP_DIGEST_ACK2, new GossipDigestAck2VerbHandler());

這樣當(dāng)消息模塊接收到消息后就會調(diào)用對應(yīng)的Handler處理,如下面的代碼所示:

IVerbHandler verbHandler = MessagingService.instance().getVerbHandler(verb);
        if (verbHandler == null)
        {
              //未知的消息不處理
            logger.trace("Unknown verb {}", verb);
            return;
        }

        try
        {
            verbHandler.doVerb(message, id);
        }
        catch (IOException ioe)
        {
            handleFailure(ioe);
            throw new RuntimeException(ioe);
        }
        catch (TombstoneOverwhelmingException | IndexNotAvailableException e)
        {
            handleFailure(e);
            logger.error(e.getMessage());
        }
        catch (Throwable t)
        {
            handleFailure(t);
            throw t;
        }
源碼解析 初始化

具體的初始化都是在org.apache.cassandra.service.StorageService#public synchronized void initServer() throws ConfigurationException()去做的,里面會調(diào)用prepareToJoin() 嘗試加入gossip集群。

private void prepareToJoin() throws ConfigurationException
    {
          //volatile修飾保證可見性,已經(jīng)加入了集群就直接跳過
        if (!joined)
        {
            /*....省略...*/
            if (!MessagingService.instance().isListening())
                  //開始監(jiān)聽消息
                MessagingService.instance().listen();
            
              //給本節(jié)點(diǎn)起個名字
            UUID localHostId = SystemKeyspace.getLocalHostId();
            
              /*
              *  一次shadow round會獲取所有到與之通訊節(jié)點(diǎn)擁有的所有節(jié)點(diǎn)的信息
              */
            if (replacing)
            {
                localHostId = prepareForReplacement();
                appStates.put(ApplicationState.TOKENS, valueFactory.tokens(bootstrapTokens));

                if (!DatabaseDescriptor.isAutoBootstrap())
                {
                    // Will not do replace procedure, persist the tokens we"re taking over locally
                    // so that they don"t get clobbered with auto generated ones in joinTokenRing
                    SystemKeyspace.updateTokens(bootstrapTokens);
                }
                else if (isReplacingSameAddress())
                {
                    //only go into hibernate state if replacing the same address (CASSANDRA-8523)
                    logger.warn("Writes will not be forwarded to this node during replacement because it has the same address as " +
                                "the node to be replaced ({}). If the previous node has been down for longer than max_hint_window_in_ms, " +
                                "repair must be run after the replacement process in order to make this node consistent.",
                                DatabaseDescriptor.getReplaceAddress());
                    appStates.put(ApplicationState.STATUS, valueFactory.hibernate(true));
                }
            }
            else
            {
                checkForEndpointCollision(localHostId);
            }

            // have to start the gossip service before we can see any info on other nodes.  this is necessary
            // for bootstrap to get the load info it needs.
            // (we won"t be part of the storage ring though until we add a counterId to our state, below.)
            // Seed the host ID-to-endpoint map with our own ID.
            getTokenMetadata().updateHostId(localHostId, FBUtilities.getBroadcastAddress());
            appStates.put(ApplicationState.NET_VERSION, valueFactory.networkVersion());
            appStates.put(ApplicationState.HOST_ID, valueFactory.hostId(localHostId));
            appStates.put(ApplicationState.RPC_ADDRESS, valueFactory.rpcaddress(FBUtilities.getBroadcastRpcAddress()));
            appStates.put(ApplicationState.RELEASE_VERSION, valueFactory.releaseVersion());

            // load the persisted ring state. This used to be done earlier in the init process,
            // but now we always perform a shadow round when preparing to join and we have to
            // clear endpoint states after doing that.
            loadRingState();

            logger.info("Starting up server gossip");
              //啟動gossip,比如定時任務(wù)等
            Gossiper.instance.register(this);
            Gossiper.instance.start(SystemKeyspace.incrementAndGetGeneration(), appStates); // needed for node-ring gathering.
            gossipActive = true;
            // gossip snitch infos (local DC and rack)
            gossipSnitchInfo();
            // gossip Schema.emptyVersion forcing immediate check for schema updates (see MigrationManager#maybeScheduleSchemaPull)
            Schema.instance.updateVersionAndAnnounce(); // Ensure we know our own actual Schema UUID in preparation for updates
            LoadBroadcaster.instance.startBroadcasting();
            HintsService.instance.startDispatch();
            BatchlogManager.instance.start();
        }
    }


public synchronized Map doShadowRound()
    {
        buildSeedsList();
        // it may be that the local address is the only entry in the seed
        // list in which case, attempting a shadow round is pointless
        if (seeds.isEmpty())
            return endpointShadowStateMap;

        seedsInShadowRound.clear();
        endpointShadowStateMap.clear();
        // 構(gòu)造一個空的Syn消息,表明這是一次shadow round
        List gDigests = new ArrayList();
        GossipDigestSyn digestSynMessage = new GossipDigestSyn(DatabaseDescriptor.getClusterName(),
                DatabaseDescriptor.getPartitionerName(),
                gDigests);
        MessageOut message = new MessageOut(MessagingService.Verb.GOSSIP_DIGEST_SYN,
                digestSynMessage,
                GossipDigestSyn.serializer);

        inShadowRound = true;
        int slept = 0;
        try
        {
            while (true)
            {    
                  /*
                  *  第一次以及后面每五秒都會嘗試向所有的種子節(jié)點(diǎn)發(fā)送一次shdow round syn消息,嘗試
                  *  獲取所有的節(jié)點(diǎn)的信息。如果達(dá)到了最大的延遲(默認(rèn)為30S)或者已經(jīng)達(dá)到了目的就會退出
                  */
                if (slept % 5000 == 0)
                { 
                    logger.trace("Sending shadow round GOSSIP DIGEST SYN to seeds {}", seeds);

                    for (InetAddress seed : seeds)
                        MessagingService.instance().sendOneWay(message, seed);
                }

                Thread.sleep(1000);
                if (!inShadowRound)
                    break;

                slept += 1000;
                if (slept > StorageService.RING_DELAY)
                {
                    // if we don"t consider ourself to be a seed, fail out
                    if (!DatabaseDescriptor.getSeeds().contains(FBUtilities.getBroadcastAddress()))
                        throw new RuntimeException("Unable to gossip with any seeds");

                    logger.warn("Unable to gossip with any seeds but continuing since node is in its own seed list");
                    inShadowRound = false;
                    break;
                }
            }
        }
        catch (InterruptedException wtf)
        {
            throw new RuntimeException(wtf);
        }

        return ImmutableMap.copyOf(endpointShadowStateMap);
    }

Gossiper#start()中啟動一個定時任務(wù)GossipTask,默認(rèn)為每秒一次,發(fā)送SYN消息:

/*
* 線程池最好都指定名字,這樣方便查問題,另外最好指定好隊(duì)列大小,最好不要用Executors中
* 默認(rèn)的無界隊(duì)列,關(guān)閉的時候注意處理好中斷,很多人都是catch Exception后打個異常就算了,
* 這樣不是很好的處理方式,我個人通常是當(dāng)catch到InterruptedException后,根據(jù)業(yè)務(wù)場景決定是否*  * 需要通過interrupt方法重置中斷位,當(dāng)處理完這輪任務(wù)之后,決定是否退出
*/
private static final DebuggableScheduledThreadPoolExecutor executor = new DebuggableScheduledThreadPoolExecutor("GossipTasks");

public void start(int generationNbr, Map preloadLocalStates)
    {
        buildSeedsList();
        /* initialize the heartbeat state for this localEndpoint */
        maybeInitializeLocalState(generationNbr);
        EndpointState localState = endpointStateMap.get(FBUtilities.getBroadcastAddress());
        localState.addApplicationStates(preloadLocalStates);

        //notify snitches that Gossiper is about to start
        DatabaseDescriptor.getEndpointSnitch().gossiperStarting();
        if (logger.isTraceEnabled())
            logger.trace("gossip started with generation {}", localState.getHeartBeatState().getGeneration());

        scheduledGossipTask = executor.scheduleWithFixedDelay(new GossipTask(),
                                                              Gossiper.intervalInMillis,
                                                              Gossiper.intervalInMillis,
                                                              TimeUnit.MILLISECONDS);
    }

那么GossipTask內(nèi)部的實(shí)現(xiàn)是怎樣的呢?

  private class GossipTask implements Runnable
    {
        public void run()
        {
            try
            {
                //等待MessagingService開始監(jiān)聽
                MessagingService.instance().waitUntilListening();
                //加鎖
                taskLock.lock();
 
              //更新心跳計數(shù)器,這個是用來做失敗檢測的,這里會有個定時任務(wù)輪詢這個Map,檢測最近一次的
              //心跳時間,如果距離當(dāng)前時間差距不合理,那么我們就可以認(rèn)為這個節(jié)點(diǎn)掛掉了,可以放到另外
              //隊(duì)列中,隨后隔一段時間再去看看是否恢復(fù)。
              
                      endpointStateMap.get(FBUtilities.getBroadcastAddress()).getHeartBeatState().updateHeartBeat();
                if (logger.isTraceEnabled())
                    logger.trace("My heartbeat is now {}", endpointStateMap.get(FBUtilities.getBroadcastAddress()).getHeartBeatState().getHeartBeatVersion());
                final List gDigests = new ArrayList();
                //隨機(jī)選擇一些節(jié)點(diǎn),構(gòu)造摘要列表
                  Gossiper.instance.makeRandomGossipDigest(gDigests);

                if (gDigests.size() > 0)
                {
                      //構(gòu)造消息,可以看到這里的類型是GOSSIP_DIGEST_SYN
                    GossipDigestSyn digestSynMessage = new GossipDigestSyn(DatabaseDescriptor.getClusterName(),
                                                                           DatabaseDescriptor.getPartitionerName(),
                                                                           gDigests);
                    MessageOut message = new MessageOut(MessagingService.Verb.GOSSIP_DIGEST_SYN,
                                                                                          digestSynMessage,
                                                                                          GossipDigestSyn.serializer);
                      /*將消息發(fā)送給一個活著的節(jié)點(diǎn),隨機(jī)選擇的,代碼如下
                      *  int index = (size == 1) ? 0 : random.nextInt(size);
                    *  InetAddress to = liveEndpoints.get(index);
                    *  如果選擇到的是種子節(jié)點(diǎn),那么就會返回true.
                      */ 
                    boolean gossipedToSeed = doGossipToLiveMember(message);
                    //隨機(jī)決定是否向掛掉的節(jié)點(diǎn)發(fā)送gossip消息
                      maybeGossipToUnreachableMember(message);
                      /*
                      * 可參見這個issue:https://issues.apache.org/jira/browse/CASSANDRA-150
                      */
                    if (!gossipedToSeed || liveEndpoints.size() < seeds.size())
                        maybeGossipToSeed(message);
                         doStatusCheck();
                }
            }
            catch (Exception e)
            {
                JVMStabilityInspector.inspectThrowable(e);
                logger.error("Gossip error", e);
            }
            finally
            {
                taskLock.unlock();
            }
        }
    }
GossipDigestSynVerbHandler
public void doVerb(MessageIn message, int id)
    {
        InetAddress from = message.from;
        if (logger.isTraceEnabled())
            logger.trace("Received a GossipDigestSynMessage from {}", from);
        if (!Gossiper.instance.isEnabled() && !Gossiper.instance.isInShadowRound())
        {
            if (logger.isTraceEnabled())
                logger.trace("Ignoring GossipDigestSynMessage because gossip is disabled");
            return;
        }

        GossipDigestSyn gDigestMessage = message.payload;
        /* 不是同一個集群的就不處理 */
        if (!gDigestMessage.clusterId.equals(DatabaseDescriptor.getClusterName()))
        {
            logger.warn("ClusterName mismatch from {} {}!={}", from, gDigestMessage.clusterId, DatabaseDescriptor.getClusterName());
            return;
        }

        if (gDigestMessage.partioner != null && !gDigestMessage.partioner.equals(DatabaseDescriptor.getPartitionerName()))
        {
            logger.warn("Partitioner mismatch from {} {}!={}", from, gDigestMessage.partioner, DatabaseDescriptor.getPartitionerName());
            return;
        }

        List gDigestList = gDigestMessage.getGossipDigests();

        /*發(fā)送者和接受者都處于shadow round階段,那么就發(fā)送一個空的ack回去*/
        if (!Gossiper.instance.isEnabled() && Gossiper.instance.isInShadowRound())
        {
            // a genuine syn (as opposed to one from a node currently
            // doing a shadow round) will always contain > 0 digests
            if (gDigestList.size() > 0)
            {
                logger.debug("Ignoring non-empty GossipDigestSynMessage because currently in gossip shadow round");
                return;
            }

            logger.debug("Received a shadow round syn from {}. Gossip is disabled but " +
                         "currently also in shadow round, responding with a minimal ack", from);
            // new ArrayList<>默認(rèn)16的size,也會占用額外的內(nèi)存,
              // 可以考慮改成0或者使用Collections.EMPTY_LIST
              MessagingService.instance()
                            .sendOneWay(new MessageOut<>(MessagingService.Verb.GOSSIP_DIGEST_ACK,
                                                         new GossipDigestAck(new ArrayList<>(), new HashMap<>()),
                                                         GossipDigestAck.serializer),
                                        from);
            return;
        }

        if (logger.isTraceEnabled())
        {
            StringBuilder sb = new StringBuilder();
            for (GossipDigest gDigest : gDigestList)
            {
                sb.append(gDigest);
                sb.append(" ");
            }
            logger.trace("Gossip syn digests are : {}", sb);
        }
        
          /*
          * 下面的工作其實(shí)就類似于git中的merge,如上文所說,版本大的說明他所持有的節(jié)點(diǎn)信息較新
          * 這里就是做一個diff,如果你的version比我本地的大,那么我就發(fā)一個請求,讓你把這個節(jié)點(diǎn)的
          * 信息發(fā)給我,如果我的version比你的大,那么說明我的信息更新一點(diǎn),就會告訴你,你的該更新了
          * 然后就會發(fā)一個GossipDigestAck消息回去。
          */
        doSort(gDigestList);

        List deltaGossipDigestList = new ArrayList();
        Map deltaEpStateMap = new HashMap();
        Gossiper.instance.examineGossiper(gDigestList, deltaGossipDigestList, deltaEpStateMap);
        logger.trace("sending {} digests and {} deltas", deltaGossipDigestList.size(), deltaEpStateMap.size());
        MessageOut gDigestAckMessage = new MessageOut(MessagingService.Verb.GOSSIP_DIGEST_ACK,
                                                                                        new GossipDigestAck(deltaGossipDigestList, deltaEpStateMap),
                                                                                        GossipDigestAck.serializer);
        if (logger.isTraceEnabled())
            logger.trace("Sending a GossipDigestAckMessage to {}", from);
        MessagingService.instance().sendOneWay(gDigestAckMessage, from);
    }

核心的實(shí)現(xiàn):

void examineGossiper(List gDigestList, List deltaGossipDigestList, Map deltaEpStateMap)
    {
        if (gDigestList.size() == 0)
        {
          
           /* 
            * 如果是空的,表明這是一次shadow round,那么我們要把自己所有已知的節(jié)點(diǎn)信息發(fā)過去。
            */
            logger.debug("Shadow request received, adding all states");
            for (Map.Entry entry : endpointStateMap.entrySet())
            {
                gDigestList.add(new GossipDigest(entry.getKey(), 0, 0));
            }
        }
        for ( GossipDigest gDigest : gDigestList )
        {
            int remoteGeneration = gDigest.getGeneration();
            int maxRemoteVersion = gDigest.getMaxVersion();
            /* Get state associated with the end point in digest */
            EndpointState epStatePtr = endpointStateMap.get(gDigest.getEndpoint());
            /*
                Here we need to fire a GossipDigestAckMessage. If we have some data associated with this endpoint locally
                then we follow the "if" path of the logic. If we have absolutely nothing for this endpoint we need to
                request all the data for this endpoint.
            */
            if (epStatePtr != null)
            {
                int localGeneration = epStatePtr.getHeartBeatState().getGeneration();
                /* get the max version of all keys in the state associated with this endpoint */
                int maxLocalVersion = getMaxEndpointStateVersion(epStatePtr);
                if (remoteGeneration == localGeneration && maxRemoteVersion == maxLocalVersion)
                    continue;

                if (remoteGeneration > localGeneration)
                {
                    /* we request everything from the gossiper */
                    requestAll(gDigest, deltaGossipDigestList, remoteGeneration);
                }
                else if (remoteGeneration < localGeneration)
                {
                    /* send all data with generation = localgeneration and version > 0 */
                    sendAll(gDigest, deltaEpStateMap, 0);
                }
                else if (remoteGeneration == localGeneration)
                {
                    /*
                        If the max remote version is greater then we request the remote endpoint send us all the data
                        for this endpoint with version greater than the max version number we have locally for this
                        endpoint.
                        If the max remote version is lesser, then we send all the data we have locally for this endpoint
                        with version greater than the max remote version.
                    */
                    if (maxRemoteVersion > maxLocalVersion)
                    {
                        deltaGossipDigestList.add(new GossipDigest(gDigest.getEndpoint(), remoteGeneration, maxLocalVersion));
                    }
                    else if (maxRemoteVersion < maxLocalVersion)
                    {
                        /* send all data with generation = localgeneration and version > maxRemoteVersion */
                        sendAll(gDigest, deltaEpStateMap, maxRemoteVersion);
                    }
                }
            }
            else
            {
                /* We are here since we have no data for this endpoint locally so request everything. */
                requestAll(gDigest, deltaGossipDigestList, remoteGeneration);
            }
        }
    }
GossipDigestAckVerbHandler
public void doVerb(MessageIn message, int id)
    {
        InetAddress from = message.from;
        if (logger.isTraceEnabled())
            logger.trace("Received a GossipDigestAckMessage from {}", from);
        if (!Gossiper.instance.isEnabled() && !Gossiper.instance.isInShadowRound())
        {
            if (logger.isTraceEnabled())
                logger.trace("Ignoring GossipDigestAckMessage because gossip is disabled");
            return;
        }

        GossipDigestAck gDigestAckMessage = message.payload;
        List gDigestList = gDigestAckMessage.getGossipDigestList();
        Map epStateMap = gDigestAckMessage.getEndpointStateMap();
        logger.trace("Received ack with {} digests and {} states", gDigestList.size(), epStateMap.size());

        if (Gossiper.instance.isInShadowRound())
        {
            if (logger.isDebugEnabled())
                logger.debug("Received an ack from {}, which may trigger exit from shadow round", from);

            // 如果是空的,說明他也在shdow round中,木有事,反正還會重試的
            Gossiper.instance.maybeFinishShadowRound(from, gDigestList.isEmpty() && epStateMap.isEmpty(), epStateMap);
            return; 
        }

        if (epStateMap.size() > 0)
        {
            /*
            * 第一次發(fā)送SYN消息的時候會更新firstSynSendAt,如果ACK消息
            * 是在我們第一次SYN之前的,那么說明這個ACK已經(jīng)過期了,直接忽略。
            */
            if ((System.nanoTime() - Gossiper.instance.firstSynSendAt) < 0 || Gossiper.instance.firstSynSendAt == 0)
            {
                if (logger.isTraceEnabled())
                    logger.trace("Ignoring unrequested GossipDigestAck from {}", from);
                return;
            }

            /* 失敗檢測相關(guān)的,先不管 */
            Gossiper.instance.notifyFailureDetector(epStateMap);
              /*將遠(yuǎn)程收到的信息跟本地的merge,類似上面的操作*/
            Gossiper.instance.applyStateLocally(epStateMap);
        }

        /*
        * 構(gòu)造一個GossipDigestAck2Message消息,將對方需要的節(jié)點(diǎn)信息發(fā)給他
        */
        Map deltaEpStateMap = new HashMap();
        for (GossipDigest gDigest : gDigestList)
        {
            InetAddress addr = gDigest.getEndpoint();
            EndpointState localEpStatePtr = Gossiper.instance.getStateForVersionBiggerThan(addr, gDigest.getMaxVersion());
            if (localEpStatePtr != null)
                deltaEpStateMap.put(addr, localEpStatePtr);
        }

        MessageOut gDigestAck2Message = new MessageOut(MessagingService.Verb.GOSSIP_DIGEST_ACK2,
                                                                                           new GossipDigestAck2(deltaEpStateMap),
                                                                                           GossipDigestAck2.serializer);
        if (logger.isTraceEnabled())
            logger.trace("Sending a GossipDigestAck2Message to {}", from);
        MessagingService.instance().sendOneWay(gDigestAck2Message, from);
    }
GossipDigestAck2VerbHandler
    public void doVerb(MessageIn message, int id)
    {
        if (logger.isTraceEnabled())
        {
            InetAddress from = message.from;
            logger.trace("Received a GossipDigestAck2Message from {}", from);
        }
        if (!Gossiper.instance.isEnabled())
        {
            if (logger.isTraceEnabled())
                logger.trace("Ignoring GossipDigestAck2Message because gossip is disabled");
            return;
        }
        Map remoteEpStateMap = message.payload.getEndpointStateMap();
        Gossiper.instance.notifyFailureDetector(remoteEpStateMap);
          /*將收到的節(jié)點(diǎn)信息與本地的merge*/
        Gossiper.instance.applyStateLocally(remoteEpStateMap);
    }
總結(jié)

源碼上看結(jié)構(gòu)是非常清晰的,每一步的邏輯相對來講還是比較容易理解的,其實(shí)也就類似tcp三次握手:

①、A隨機(jī)找個人B,隨機(jī)告訴他一些我知道的信息(這里可以根據(jù)時間排序、根據(jù)版本打分等等,具體可以參照論文)

②、B收到以后,和自己本地對比下,比A新的發(fā)回給A,比A舊的讓通知A在下一步告訴我

③、A本地合并下,然后將B需要的信息告訴他

④、B本地合并下

⑤、完成了

參考資料

https://www.cs.cornell.edu/ho...

https://www.consul.io

https://www.serf.io/

https://en.wikipedia.org/wiki...

https://github.com/apache/cas...

文章版權(quán)歸作者所有,未經(jīng)允許請勿轉(zhuǎn)載,若此文章存在違規(guī)行為,您可以聯(lián)系管理員刪除。

轉(zhuǎn)載請注明本文地址:http://systransis.cn/yun/67574.html

相關(guān)文章

  • Hyperledger Fabric(八卦數(shù)據(jù)傳播協(xié)議

    八卦數(shù)據(jù)傳播協(xié)議 Hyperledger Fabric優(yōu)化了區(qū)塊鏈網(wǎng)絡(luò)性能,安全性,通過跨交易執(zhí)行(認(rèn)可和提交)對等點(diǎn)和交易排序節(jié)點(diǎn)劃分工作負(fù)載來實(shí)現(xiàn)可伸縮性。這種網(wǎng)絡(luò)操作的分離需要安全,可靠且可擴(kuò)展的數(shù)據(jù)傳播協(xié)議以確保數(shù)據(jù)的完整性和一致性。為了滿足這些條件,F(xiàn)abric實(shí)現(xiàn)了八卦數(shù)據(jù)傳播協(xié)議。 八卦協(xié)議 對等點(diǎn)利用八卦以可擴(kuò)展的方式廣播分類帳和通道數(shù)據(jù),八卦消息是連續(xù)的,并且通道上的每個對等點(diǎn)不...

    Youngs 評論0 收藏0
  • Spring Cloud Consul 之Greenwich版本全攻略

    摘要:在我們的文檔中,我們使用來表明就選舉和事務(wù)的順序達(dá)成一致。提供成員關(guān)系,故障檢測和事件廣播。這是一個允許請求的請求響應(yīng)機(jī)制。這包括服務(wù)發(fā)現(xiàn),還包括豐富的運(yùn)行狀況檢查,鎖定,鍵值,多數(shù)據(jù)中心聯(lián)合,事件系統(tǒng)和。 轉(zhuǎn)載請標(biāo)明出處: http://blog.csdn.net/forezp/a...本文出自方志朋的博客 什么是Consul Consul是HashiCorp公司推出的開源軟件,使...

    qingshanli1988 評論0 收藏0
  • 區(qū)塊鏈中的P2P

    摘要:為什么區(qū)塊鏈會選擇作為網(wǎng)絡(luò)基礎(chǔ)上面介紹的時候說過,他是無中心服務(wù)器的,中心服務(wù)器就意味著,當(dāng)受到攻擊的時候,中心服務(wù)器一旦宕機(jī),整個網(wǎng)絡(luò)和服務(wù)就會出現(xiàn)問題。區(qū)塊鏈的核心是去中心化,這和網(wǎng)絡(luò)的觀念不約而同,所以選擇的理由也就很充分。 區(qū)塊鏈中P2P介紹 p2p是什么 為什么區(qū)塊鏈需要P2P 比特幣、以太坊、超級賬本和EOS的P2P對比 P2P是什么 P2P作為區(qū)塊鏈網(wǎng)絡(luò)中去中心化...

    jkyin 評論0 收藏0

發(fā)表評論

0條評論

最新活動
閱讀需要支付1元查看
<