成人国产在线小视频_日韩寡妇人妻调教在线播放_色成人www永久在线观看_2018国产精品久久_亚洲欧美高清在线30p_亚洲少妇综合一区_黄色在线播放国产_亚洲另类技巧小说校园_国产主播xx日韩_a级毛片在线免费

資訊專欄INFORMATION COLUMN

Spring Cloud Ribbon負載均衡器

y1chuan / 3623人閱讀

摘要:代碼如下定義了用來存儲負載均衡器各服務(wù)實例屬性和統(tǒng)計信息的對象。下面看一下負載均衡器增加了哪些內(nèi)容。

客戶端負載均衡Spring Cloud Ribbon

?Spring Cloud Ribbon是一個基于HTTP和TCP的客戶端負載均衡工具,基于Netflix Ribbon實現(xiàn)。

目錄

客戶端負載均衡

源碼分析

負載均衡器(本文重點)

負載均衡策略

配置詳解

自動化配置

客戶端負載均衡&源碼分析

?請在上一篇文章的基礎(chǔ)上進行下面的學習,點擊這里閱讀上一篇

負載均衡器

?下面我們看一下具體的的負載均衡器,也就是ILoadBalancer接口的實現(xiàn)類。

AbstractLoadBalancer

?該類是ILoadBalancer接口的抽象實現(xiàn)類。

?在該抽象實現(xiàn)類中含有一個關(guān)于服務(wù)實例的分組枚舉類,該枚舉類主要有以下三種類型:

ALL:所有服務(wù)實例

STATUS_UP:正常服務(wù)的實例

STATUS_NOT_UP:停止服務(wù)的實例

?該抽象類下面的的函數(shù)有以下幾個:

chooseServer():該函數(shù)通過調(diào)用接口中的chooseServer(Object key)實現(xiàn),其中參數(shù)key為null,表示在選擇具體服務(wù)實例時忽略key的條件判斷

List getServerList(ServerGroup serverGroup):定義了根據(jù)分組類型來獲取不同的服務(wù)實例的列表

LoadBalancerStats getLoadBalancerStats():定義了獲取LoadBalancerStats對象的方法,LoadBalancerStats對象被用來存儲負載均衡器中各個服務(wù)實例當前的屬性和統(tǒng)計信息。這些信息可以用來觀察負載均衡器的運行情況,同時也是用來制定負載均衡策略的重要依據(jù)。

BaseLoadBalancer

?該類是Ribbon負載均衡器的基礎(chǔ)實現(xiàn)類,在該類中定義了很多關(guān)于負載均衡相關(guān)的基礎(chǔ)內(nèi)容。

?該類中定義并維護了兩個存儲服務(wù)實例Server對象的列表。一個用于存儲所有服務(wù)實例的清單,一個用于存儲正常服務(wù)的實例清單。代碼如下:

    @Monitor(name = PREFIX + "AllServerList", type = DataSourceType.INFORMATIONAL)
    protected volatile List allServerList = Collections.synchronizedList(new ArrayList());
    @Monitor(name = PREFIX + "UpServerList", type = DataSourceType.INFORMATIONAL)
    protected volatile List upServerList = Collections.synchronizedList(new ArrayList());

?定義了用來存儲負載均衡器各服務(wù)實例屬性和統(tǒng)計信息的LoadBalancerStats對象。

?定義了檢查服務(wù)實例是否正常的IPing對象,在BaseLoadBalancer中默認為null,需要在構(gòu)造時注入它的實現(xiàn)。

?定義了檢查服務(wù)實例操作的執(zhí)行策略對象IPingStrategy,在BaseLoadBalancerz中默認使用了該類中定義的靜態(tài)內(nèi)部類SerialPingStrategy。根據(jù)源碼,可以看到該策略采用線性遍歷ping服務(wù)實例的方式實現(xiàn)檢查。但是該策略在當IPing的實現(xiàn)速度不理想或者Server列表過大時,可能會影響系統(tǒng)性能。這時就需要自己去實現(xiàn)自己的IPing策略。

?定義了負載均衡的處理規(guī)則IRule對象,從BaseLoadBalancer中chooseServer(Object key)方法源碼中也可以看出它是將服務(wù)實例選擇的任務(wù)交給了IRule中的Server choose(Object key)方法。默認的IRule實現(xiàn)是RoundRobinRule。

?啟動Ping任務(wù),在BaseLoadBalancer的默認構(gòu)造函數(shù)中,會直接啟動一個用于定時檢查Server是否健康的任務(wù)。該任務(wù)默認執(zhí)行的時間間隔為10s。

?實現(xiàn)了ILoadBalancer接口定義的負載均衡器應(yīng)該具備以下操作:

addServers(List newServers):向負載均衡器中增加新的服務(wù)實例列表。該實現(xiàn)將原本已經(jīng)維護的所有服務(wù)實例清單allServerList和新傳入的服務(wù)實例清單newServers都加入到newList中,然后再調(diào)用setServersList(List lsrv)方法對newList進行處理。在BaseLoadBalancer中的默認實現(xiàn)會用新的列表覆蓋舊的列表。后面幾個擴展實現(xiàn)類對于服務(wù)實例清單的更新的優(yōu)化都是通過對setServersList(List lsrv)重寫來實現(xiàn)的。

Server chooseServer(Object key):挑選一個具體的服務(wù)實例,上面介紹IRule的時候已經(jīng)說過,不再重說。

markServerDown(Server server):用來標記某個服務(wù)實例暫停服務(wù)

List getReachableServers():獲取可用的服務(wù)實例列表

List getAllServers():獲取所有的服務(wù)實例列表

DynamicServerListLoadBalancer

?DynamicServerListLoadBalancer該類繼承于BaseLoadBalancer類,它是對基礎(chǔ)負載均衡器的擴展。

?在該負載均衡器,實現(xiàn)了服務(wù)實例清單在運行期的動態(tài)更新能力;同時,它還具備了對服務(wù)實例清單的過濾功能,我們可以通過過濾器來選擇性的獲取一批服務(wù)實例清單。

?下面看一下負載均衡器增加了哪些內(nèi)容。

ServerList

?通過查看源碼,發(fā)現(xiàn)增加了一個關(guān)于服務(wù)列表的操作對象ServerList serverListImpl,其中T是一個Server的子類,即代表了一個具體的服務(wù)實例的擴展類。其中ServerList的定義如下:

public interface ServerList {

    public List getInitialListOfServers();
    
    public List getUpdatedListOfServers();   

}

?該抽象接口定義了兩個抽象方法,如下:

List getInitialListOfServers():用于獲取初始化的服務(wù)實例清單

List getUpdatedListOfServers():用于獲取更新的服務(wù)實例清單

?該抽象接口的實現(xiàn)類有很多,因為該負載均衡器中需要實現(xiàn)服務(wù)實例的動態(tài)更新,那么就需要Ribbon具備訪問Eureka服務(wù)注冊中心獲取服務(wù)實例的能力,在DynamicServerListLoadBalancer默認的ServerList是DomainExtractingServerList(默認的實現(xiàn)是在EurekaRibbonClientConfiguration),源碼如下:

package org.springframework.cloud.netflix.ribbon.eureka;

@Configuration
public class EurekaRibbonClientConfiguration {

    @Bean
    @ConditionalOnMissingBean
    public ServerList ribbonServerList(IClientConfig config, Provider eurekaClientProvider) {
        if (this.propertiesFactory.isSet(ServerList.class, serviceId)) {
            return this.propertiesFactory.get(ServerList.class, config, serviceId);
        }
        DiscoveryEnabledNIWSServerList discoveryServerList = new DiscoveryEnabledNIWSServerList(
                config, eurekaClientProvider);
        DomainExtractingServerList serverList = new DomainExtractingServerList(
                discoveryServerList, config, this.approximateZoneFromHostname);
        return serverList;
    }

}

?查看DomainExtractingServerList的源碼可以看出,該類中有一個ServerList list,通過查看DomainExtractingServerList的構(gòu)造函數(shù),DomainExtractingServerList中的ServerList對象就是從上面的代碼中傳過來的DiscoveryEnabledNIWSServerList,源碼如下:

package org.springframework.cloud.netflix.ribbon.eureka;

public class DomainExtractingServerList implements ServerList {

    private ServerList list;
    private final RibbonProperties ribbon;

    private boolean approximateZoneFromHostname;

    public DomainExtractingServerList(ServerList list,
            IClientConfig clientConfig, boolean approximateZoneFromHostname) {
        this.list = list;
        this.ribbon = RibbonProperties.from(clientConfig);
        this.approximateZoneFromHostname = approximateZoneFromHostname;
    }

    @Override
    public List getInitialListOfServers() {
        List servers = setZones(this.list
                .getInitialListOfServers());
        return servers;
    }

    @Override
    public List getUpdatedListOfServers() {
        List servers = setZones(this.list
                .getUpdatedListOfServers());
        return servers;
    }

}

?同時,通過上面的源碼還可以看出,getInitialListOfServers()和getUpdatedListOfServers()方法的實現(xiàn)其實交給DiscoveryEnabledNIWSServerList來實現(xiàn)的,下面看一下DiscoveryEnabledNIWSServerList中這兩個方法的實現(xiàn)

package com.netflix.niws.loadbalancer;

public class DiscoveryEnabledNIWSServerList extends AbstractServerList{

    private static final Logger logger = LoggerFactory.getLogger(DiscoveryEnabledNIWSServerList.class);

    String clientName;
    String vipAddresses;
    boolean isSecure = false;

    boolean prioritizeVipAddressBasedServers = true;

    String datacenter;
    String targetRegion;

    int overridePort = DefaultClientConfigImpl.DEFAULT_PORT;
    boolean shouldUseOverridePort = false;
    boolean shouldUseIpAddr = false;

    private final Provider eurekaClientProvider;

    @Override
    public List getInitialListOfServers(){
        return obtainServersViaDiscovery();
    }

    @Override
    public List getUpdatedListOfServers(){
        return obtainServersViaDiscovery();
    }

    private List obtainServersViaDiscovery() {
        List serverList = new ArrayList();

        if (eurekaClientProvider == null || eurekaClientProvider.get() == null) {
            logger.warn("EurekaClient has not been initialized yet, returning an empty list");
            return new ArrayList();
        }

        EurekaClient eurekaClient = eurekaClientProvider.get();
        if (vipAddresses!=null){
            for (String vipAddress : vipAddresses.split(",")) {
                // if targetRegion is null, it will be interpreted as the same region of client
                List listOfInstanceInfo = eurekaClient.getInstancesByVipAddress(vipAddress, isSecure, targetRegion);
                for (InstanceInfo ii : listOfInstanceInfo) {
                    if (ii.getStatus().equals(InstanceStatus.UP)) {

                        if(shouldUseOverridePort){
                            if(logger.isDebugEnabled()){
                                logger.debug("Overriding port on client name: " + clientName + " to " + overridePort);
                            }

                            // copy is necessary since the InstanceInfo builder just uses the original reference,
                            // and we don"t want to corrupt the global eureka copy of the object which may be
                            // used by other clients in our system
                            InstanceInfo copy = new InstanceInfo(ii);

                            if(isSecure){
                                ii = new InstanceInfo.Builder(copy).setSecurePort(overridePort).build();
                            }else{
                                ii = new InstanceInfo.Builder(copy).setPort(overridePort).build();
                            }
                        }

                        DiscoveryEnabledServer des = new DiscoveryEnabledServer(ii, isSecure, shouldUseIpAddr);
                        des.setZone(DiscoveryClient.getZone(ii));
                        serverList.add(des);
                    }
                }
                if (serverList.size()>0 && prioritizeVipAddressBasedServers){
                    break; // if the current vipAddress has servers, we dont use subsequent vipAddress based servers
                }
            }
        }
        return serverList;
    }

}

?上述代碼的主要邏輯是借助EurekaClient從服務(wù)注冊中心獲取到具體的服務(wù)實例(InstanceInfo)列表,首頁獲取到EurekaClient,然后更具邏輯服務(wù)名(vipAddress),獲取服務(wù)實例,將服務(wù)實例狀態(tài)為UP(正常服務(wù))的實例轉(zhuǎn)換為DiscoveryEnabledServer對象,最終放在一個列表里返回。

?在獲取到ServerList之后,DomainExtractingServerList會調(diào)用自身的setZones方法,源碼如下:

    private List setZones(List servers) {
        List result = new ArrayList<>();
        boolean isSecure = this.ribbon.isSecure(true);
        boolean shouldUseIpAddr = this.ribbon.isUseIPAddrForServer();
        for (DiscoveryEnabledServer server : servers) {
            result.add(new DomainExtractingServer(server, isSecure, shouldUseIpAddr,
                    this.approximateZoneFromHostname));
        }
        return result;
    }

?通過源碼可以看出,該方法的主要作用是將DiscoveryEnabledNIWSServerList返回的List列表中的元素,轉(zhuǎn)換成DiscoveryEnabledServer的子類對象DomainExtractingServer,在該類對象的構(gòu)造函數(shù)中將為服務(wù)實例對象設(shè)置一些必要的屬性,如id,zone,isAliveFlag,readToServer等。

ServerListUpdate

?在DynamicServerListLoadBalancer類中有如下一段代碼,ServerListUpdater對象的實現(xiàn)就是對ServerList的更新

    protected final ServerListUpdater.UpdateAction updateAction = new ServerListUpdater.UpdateAction() {
        @Override
        public void doUpdate() {
            updateListOfServers();
        }
    };

?下面看一下ServerListUpdater接口,該類內(nèi)部還定義了一個UpdateAction接口,下面看一下源碼:

package com.netflix.loadbalancer;

public interface ServerListUpdater {
    
    public interface UpdateAction {
        void doUpdate();
    }

    void start(UpdateAction updateAction);

    void stop();

    String getLastUpdate();

    long getDurationSinceLastUpdateMs();
    
    int getNumberMissedCycles();

    int getCoreThreads();
}

?下面是該接口方法的介紹

void doUpdate():該方法的實現(xiàn)內(nèi)容就是對ServerList的具體更新操作

void start(UpdateAction updateAction):啟動更新服務(wù)器,傳入的UpdateAction對象為更新操作的具體實現(xiàn)

void stop():停止更新服務(wù)器

String getLastUpdate():獲取最近的更新時間戳

long getDurationSinceLastUpdateMs():獲取上一次更新到現(xiàn)在的時間間隔,單位ms

int getNumberMissedCycles():獲取錯過的更新周期數(shù)

int getCoreThreads():獲取核心線程數(shù)

?下面看一下ServerListUpdater的具體實現(xiàn)類

PollingServerListUpdater:動態(tài)服務(wù)列表更新的默認策略,DynamicServerListLoadBalancer負載均衡器中的默認實現(xiàn)就是該類,它通過定時任務(wù)的方式進行服務(wù)列表的更新。

EurekaNotificationServerListUpdater:該更新器可以用于DynamicServerListLoadBalancer負載均衡器,但是它的觸發(fā)機制與PollingServerListUpdater不同,它需要利用Eureka的事件監(jiān)聽器來驅(qū)動服務(wù)列表的更新操作。

?下面看一下PollingServerListUpdater的實現(xiàn),我們從start函數(shù)看起

    public synchronized void start(final UpdateAction updateAction) {
        if (isActive.compareAndSet(false, true)) {
            final Runnable wrapperRunnable = new Runnable() {
                @Override
                public void run() {
                    if (!isActive.get()) {
                        if (scheduledFuture != null) {
                            scheduledFuture.cancel(true);
                        }
                        return;
                    }
                    try {
                        updateAction.doUpdate();
                        lastUpdated = System.currentTimeMillis();
                    } catch (Exception e) {
                        logger.warn("Failed one update cycle", e);
                    }
                }
            };

            scheduledFuture = getRefreshExecutor().scheduleWithFixedDelay(
                    wrapperRunnable,
                    initialDelayMs,
                    refreshIntervalMs,
                    TimeUnit.MILLISECONDS
            );
        } else {
            logger.info("Already active, no-op");
        }
    }

?通過上述代碼可以看出大致邏輯,創(chuàng)建了一個Runnable線程任務(wù),在線程中調(diào)用了UpdateAction的doUpdate()方法,最后再啟動定時任務(wù),initialDelayMs默認值1000ms,refreshIntervalMs默認值是30*1000ms,也就是說更新服務(wù)實例在初始化之后延遲1s后開始執(zhí)行,并以30s為周期重復(fù)執(zhí)行。

ServerListFilter

?下面我們回顧一下UpdateAction中doUpdate()方法的具體實現(xiàn),源碼如下:

    public void updateListOfServers() {
        List servers = new ArrayList();
        if (serverListImpl != null) {
            servers = serverListImpl.getUpdatedListOfServers();
            LOGGER.debug("List of Servers for {} obtained from Discovery client: {}",
                    getIdentifier(), servers);

            if (filter != null) {
                servers = filter.getFilteredListOfServers(servers);
                LOGGER.debug("Filtered List of Servers for {} obtained from Discovery client: {}",
                        getIdentifier(), servers);
            }
        }
        updateAllServerList(servers);
    }

?在上述源碼可以看出,首先是調(diào)用了ServerList的getUpdatedListOfServers方法,這是用來從Eureka Server獲取正常的服務(wù)實例列表。在獲取完服務(wù)實例列表以后,我們會調(diào)用filter.getFilteredListOfServers(servers),此處的filter就是我們所要找的ServerListFilter。

?ServerListFilter接口非常簡單,僅僅有一個List getFilteredListOfServers(List servers)方法,用于實現(xiàn)對服務(wù)列表的過濾,下面看一下它的主要實現(xiàn)類:

?在上面的圖中,ZonePreferenceServerListFilter的實現(xiàn)是Spring Cloud Ribbon中對Netflix Ribbon的擴展實現(xiàn),其他都是Netflix Ribbon中的原生實現(xiàn)類。下面我們這些類的特點。

AbstractServerListFilter
package com.netflix.loadbalancer;

public abstract class AbstractServerListFilter implements ServerListFilter {

    private volatile LoadBalancerStats stats;
    
    public void setLoadBalancerStats(LoadBalancerStats stats) {
        this.stats = stats;
    }
    
    public LoadBalancerStats getLoadBalancerStats() {
        return stats;
    }

}

?該類是一個抽象過濾器,在這里定義了過濾時需要的一個重要依據(jù)對象LoadBalancerStats,該對象存儲了關(guān)于負載均衡器的一些屬性和統(tǒng)計信息等。

ZoneAffinityServerListFilter

?該過濾器基于區(qū)域感知(Zone Affinity)的方式實現(xiàn)服務(wù)實例的過濾,它會根據(jù)提供服務(wù)的實例所處的區(qū)域(Zone)與消費者自身所處區(qū)域(Zone)進行比較,過濾掉那些不是同處一個區(qū)域的實例。

    public List getFilteredListOfServers(List servers) {
        if (zone != null && (zoneAffinity || zoneExclusive) && servers !=null && servers.size() > 0){
            List filteredServers = Lists.newArrayList(Iterables.filter(
                    servers, this.zoneAffinityPredicate.getServerOnlyPredicate()));
            if (shouldEnableZoneAffinity(filteredServers)) {
                return filteredServers;
            } else if (zoneAffinity) {
                overrideCounter.increment();
            }
        }
        return servers;
    }

?從上面的源碼可以看出,對于服務(wù)實例列表的過濾是通過Iterables.filter(servers, this.zoneAffinityPredicate.getServerOnlyPredicate())來實現(xiàn)的,其中判斷依據(jù)由ZoneAffinityPredicate實現(xiàn)服務(wù)實例與消費者的Zone比較。

?在比較過后,并不是立即返回過濾之后的ServerList。而是通過shouldEnableZoneAffinity方法來判斷是否要啟用區(qū)域感知的功能。下面看一下shouldEnableZoneAffinity的實現(xiàn):

    private boolean shouldEnableZoneAffinity(List filtered) {    
        if (!zoneAffinity && !zoneExclusive) {
            return false;
        }
        if (zoneExclusive) {
            return true;
        }
        LoadBalancerStats stats = getLoadBalancerStats();
        if (stats == null) {
            return zoneAffinity;
        } else {
            logger.debug("Determining if zone affinity should be enabled with given server list: {}", filtered);
            ZoneSnapshot snapshot = stats.getZoneSnapshot(filtered);
            double loadPerServer = snapshot.getLoadPerServer();
            int instanceCount = snapshot.getInstanceCount();            
            int circuitBreakerTrippedCount = snapshot.getCircuitTrippedCount();
            if (((double) circuitBreakerTrippedCount) / instanceCount >= blackOutServerPercentageThreshold.get() 
                    || loadPerServer >= activeReqeustsPerServerThreshold.get()
                    || (instanceCount - circuitBreakerTrippedCount) < availableServersThreshold.get()) {
                logger.debug("zoneAffinity is overriden. blackOutServerPercentage: {}, activeReqeustsPerServer: {}, availableServers: {}", 
                        new Object[] {(double) circuitBreakerTrippedCount / instanceCount,  loadPerServer, instanceCount - circuitBreakerTrippedCount});
                return false;
            } else {
                return true;
            }
            
        }
    }

?通過查看源碼可以看出,它調(diào)用了LoadBalancerStats的getZoneSnapshot方法來獲取這些過濾后的同區(qū)域?qū)嵗幕A(chǔ)指標(包含實例數(shù)量、斷路由器斷開數(shù)、活動請求數(shù)、實例平均負載等),然后根據(jù)一系列的算法求出下面的幾個評價值并與設(shè)置的閥值進行對比,如果有一個條件符合,就不啟用區(qū)域感知過濾的服務(wù)實例清單。

?上述算法實現(xiàn)為集群出現(xiàn)區(qū)域故障時,依然可以依靠其他區(qū)域的實例進行正常服務(wù)提供了完善的高可用保障。

blackOutServerPercentage:故障實例百分比(斷路由器斷開數(shù)/實例數(shù)量)>=0.8

activeReqeustsPerServer:實例平均負載>=0.6

availableServers:可用實例數(shù)量(實例數(shù)量-斷路器斷開數(shù))<2

DefaultNIWSServerListFilter

?該過濾器完全繼承自ZoneAffinityServerListFilter,是默認的NIWS(Netflix Internal Web Service)過濾器。

ServerListSubsetFilter

?該過濾器繼承自ZoneAffinityServerListFilter,適合擁有大規(guī)模服務(wù)集群(上百或更多)的系統(tǒng)。該過濾器可以產(chǎn)生一個區(qū)域感知結(jié)果的子集列表,同時還能夠通過比較服務(wù)實例的通信失敗數(shù)量和并發(fā)連接數(shù)來判定該服務(wù)是否健康來選擇性地從服務(wù)實例列表中剔除那些相對不夠健康的實例。該過濾器的實現(xiàn)主要有以下三步:

1.獲取區(qū)域感知的過濾結(jié)果,作為候選的服務(wù)實例清單。

2.從當前消費者維護的服務(wù)實例子集中剔除那些相對不夠健康的實例(同時將這些實例從候選清單中剔除,防止第三步的時候又被選入),不健康的標準如下:

?a. 服務(wù)實例的并發(fā)連接數(shù)超過客戶端配置的值,默認為0,配置參數(shù)為..ServerListSubsetFilter.eliminationConnectionThresold

?b. 服務(wù)實例的失敗數(shù)超過客戶端配置的值,默認為0,配置參數(shù)為..ServerListSubsetFilter.eliminationFailureThresold。

?c. 如果按符合上面任一規(guī)則的服務(wù)實例剔除后,剔除比例小于客戶端默認配置的百分比,默認為10%,配置參數(shù)為..ServerListSubsetFilter.forceEliminatePercent,那么就先對剩下的實例列表進行健康排序,再從最不健康的實例進行剔除,直到達到配置的剔除百分比。

3.在完成剔除后,清單已經(jīng)少了至少10%的服務(wù)實例,最后通過隨機的方式從候選清單中選出一批實例加入到清單中,以保持服務(wù)實例子集與原來的數(shù)量一致,默認的實例自己數(shù)量為20,配置參數(shù)為..ServerListSubsetFilter.size。

ZonePreferenceServerListFilter

?Spring Cloud整合時新增的過濾器。若使用Spring Cloud整合Eureka和Ribbon時會默認使用該過濾器。它實現(xiàn)了通過配置或者Eureka實例元數(shù)據(jù)的所屬區(qū)域(Zone)來過濾出同區(qū)域的服務(wù)實例。下面看一下源碼:

    @Override
    public List getFilteredListOfServers(List servers) {
        List output = super.getFilteredListOfServers(servers);
        if (this.zone != null && output.size() == servers.size()) {
            List local = new ArrayList<>();
            for (Server server : output) {
                if (this.zone.equalsIgnoreCase(server.getZone())) {
                    local.add(server);
                }
            }
            if (!local.isEmpty()) {
                return local;
            }
        }
        return output;
    }

?通過源碼分析可以得出以下幾個步驟:

首先通過父類的ZoneAffinityServerListFilter過濾器來獲得區(qū)域感知的服務(wù)實例列表

遍歷獲取的服務(wù)實例列表,取出根據(jù)消費者配置預(yù)設(shè)的區(qū)域Zone來進行過濾

過濾的結(jié)果如果是空直接返回區(qū)域感知的服務(wù)實例列表,如果不為空則返回過濾后的結(jié)果

ZoneAwareLoadBalancer

?ZoneAwareLoadBalancer負載均衡器是對DynamicServerListLoadBalancer的擴展。

?在DynamicServerListLoadBalancer中,并沒有對chooseServer函數(shù)進行重寫,因此會采用BaseLoadBalancer中chooseServer,使用RoundRobinRule規(guī)則,以線性輪詢的方式來選擇調(diào)用的服務(wù)實例,該算法實現(xiàn)簡單并沒有區(qū)域(Zone)的概念,所以會把所有實例視為一個Zone下的節(jié)點看待,這樣就會周期性的產(chǎn)生跨區(qū)域(Zone)訪問的情況,由于跨區(qū)域會產(chǎn)生更高的延遲,這些跨區(qū)域的實例主要以用來防止區(qū)域性故障實現(xiàn)高可用為目的,不能作為常規(guī)的訪問實例。

?ZoneAwareLoadBalancer可以有效的避免DynamicServerListLoadBalancer的問題。下面我們來看一下是如何避免這個問題的。

?首先,在ZoneAwareLoadBalancer中并沒有重寫setServerList,說明實現(xiàn)服務(wù)實例清單的更新主邏輯沒有修改。但是ZoneAwareLoadBalancer中重寫了setServerListForZones(Map> zoneServersMap)函數(shù)。

?下面我們先看一下DynamicServerListLoadBalancer中setServerListForZones中的實現(xiàn):

    @Override
    public void setServersList(List lsrv) {
        super.setServersList(lsrv);
        List serverList = (List) lsrv;
        Map> serversInZones = new HashMap>();
        for (Server server : serverList) {
            // make sure ServerStats is created to avoid creating them on hot
            // path
            getLoadBalancerStats().getSingleServerStat(server);
            String zone = server.getZone();
            if (zone != null) {
                zone = zone.toLowerCase();
                List servers = serversInZones.get(zone);
                if (servers == null) {
                    servers = new ArrayList();
                    serversInZones.put(zone, servers);
                }
                servers.add(server);
            }
        }
        setServerListForZones(serversInZones);
    }

    protected void setServerListForZones(
            Map> zoneServersMap) {
        LOGGER.debug("Setting server list for zones: {}", zoneServersMap);
        getLoadBalancerStats().updateZoneServerMapping(zoneServersMap);
    }

?通過分析源碼可以看出,setServerListForZones的調(diào)用位于更新服務(wù)實例清單setServersList函數(shù)的最后,在setServerListForZones的實現(xiàn)中,首先獲取了LoadBalancerStats對象,然后調(diào)用其updateZoneServerMapping方法,下面我們看一下該方法的具體實現(xiàn):

    private ZoneStats getZoneStats(String zone) {
        zone = zone.toLowerCase();
        ZoneStats zs = zoneStatsMap.get(zone);
        if (zs == null){
            zoneStatsMap.put(zone, new ZoneStats(this.getName(), zone, this));
            zs = zoneStatsMap.get(zone);
        }
        return zs;
    }

    public void updateZoneServerMapping(Map> map) {
        upServerListZoneMap = new ConcurrentHashMap>(map);
        // make sure ZoneStats object exist for available zones for monitoring purpose
        for (String zone: map.keySet()) {
            getZoneStats(zone);
        }
    }

?通過上述源碼可以看出,setServerListForZones方法的主要作用是根據(jù)按區(qū)域(Zone)分組的實例列表,為負載均衡器中的LoadBalancerStats對象創(chuàng)建ZoneStats并放入Map zoneStatsMap集合中,每一個區(qū)域?qū)?yīng)一個ZoneStats,它用于存儲每個Zone的一些狀態(tài)和統(tǒng)計信息。

?下面我們看一下ZoneAwareLoadBalancer負載均衡器中setServerListForZones方法的實現(xiàn):

    @Override
    protected void setServerListForZones(Map> zoneServersMap) {
        super.setServerListForZones(zoneServersMap);
        if (balancers == null) {
            balancers = new ConcurrentHashMap();
        }
        for (Map.Entry> entry: zoneServersMap.entrySet()) {
            String zone = entry.getKey().toLowerCase();
            getLoadBalancer(zone).setServersList(entry.getValue());
        }
        // check if there is any zone that no longer has a server
        // and set the list to empty so that the zone related metrics does not
        // contain stale data
        for (Map.Entry existingLBEntry: balancers.entrySet()) {
            if (!zoneServersMap.keySet().contains(existingLBEntry.getKey())) {
                existingLBEntry.getValue().setServersList(Collections.emptyList());
            }
        }
    }

?首先創(chuàng)建了一個ConcurrentHashMap類型的balancers對象,它將用來存儲每個Zone區(qū)域?qū)?yīng)的負載均衡器。具體的負載均衡器的創(chuàng)建則是在下面的第一個循環(huán)中調(diào)用getLoadBalancer方法來完成,在創(chuàng)建負載均衡器的時候同時會創(chuàng)建它的規(guī)則(如果當前實現(xiàn)中沒有IRule,就創(chuàng)建一個AvailabilityFilteringRule規(guī)則,如果已經(jīng)有實例,則克隆一個)。

?在創(chuàng)建完負載均衡器之后馬上調(diào)用setServersList方法為其設(shè)置對應(yīng)Zone區(qū)域的實例清單。

?第二個循環(huán)是對Zone區(qū)域中實例清單的檢查,看看是否有Zone區(qū)域下已經(jīng)沒有實例了,是的話就將balancers中對應(yīng)Zone區(qū)域的實例列表清空,該操作的作用是為了后續(xù)選擇節(jié)點時,防止過時的Zone區(qū)域統(tǒng)計信息干擾具體實例的選擇算法。

?下面我們再看一下負載均衡器是如何挑選服務(wù)實例,來實現(xiàn)對區(qū)域的識別的:

    @Override
    public Server chooseServer(Object key) {
        if (!ENABLED.get() || getLoadBalancerStats().getAvailableZones().size() <= 1) {
            logger.debug("Zone aware logic disabled or there is only one zone");
            return super.chooseServer(key);
        }
        Server server = null;
        try {
            LoadBalancerStats lbStats = getLoadBalancerStats();
            Map zoneSnapshot = ZoneAvoidanceRule.createSnapshot(lbStats);
            logger.debug("Zone snapshots: {}", zoneSnapshot);
            if (triggeringLoad == null) {
                triggeringLoad = DynamicPropertyFactory.getInstance().getDoubleProperty(
                        "ZoneAwareNIWSDiscoveryLoadBalancer." + this.getName() + ".triggeringLoadPerServerThreshold", 0.2d);
            }

            if (triggeringBlackoutPercentage == null) {
                triggeringBlackoutPercentage = DynamicPropertyFactory.getInstance().getDoubleProperty(
                        "ZoneAwareNIWSDiscoveryLoadBalancer." + this.getName() + ".avoidZoneWithBlackoutPercetage", 0.99999d);
            }
            Set availableZones = ZoneAvoidanceRule.getAvailableZones(zoneSnapshot, triggeringLoad.get(), triggeringBlackoutPercentage.get());
            logger.debug("Available zones: {}", availableZones);
            if (availableZones != null &&  availableZones.size() < zoneSnapshot.keySet().size()) {
                String zone = ZoneAvoidanceRule.randomChooseZone(zoneSnapshot, availableZones);
                logger.debug("Zone chosen: {}", zone);
                if (zone != null) {
                    BaseLoadBalancer zoneLoadBalancer = getLoadBalancer(zone);
                    server = zoneLoadBalancer.chooseServer(key);
                }
            }
        } catch (Exception e) {
            logger.error("Error choosing server using zone aware logic for load balancer={}", name, e);
        }
        if (server != null) {
            return server;
        } else {
            logger.debug("Zone avoidance logic is not invoked.");
            return super.chooseServer(key);
        }
    }

?通過源碼可以看出,只有當負載均衡器中維護的實例所屬的Zone區(qū)域的個數(shù)大于1的時候才會執(zhí)行這里的選擇策略,否則還是將使用父類的實現(xiàn)。當Zone區(qū)域的個數(shù)大于1的時候,它的實現(xiàn)步驟如下:

1.調(diào)用ZoneAvoidanceRule中的靜態(tài)方法createSnapshot(lbStats),為當前負載均衡器中所有的Zone區(qū)域分別創(chuàng)建快照,保存在在Map zoneSnapshot中,這些快照中的數(shù)據(jù)將用于后續(xù)的算法。

2.調(diào)用ZoneAvoidanceRule中的靜態(tài)方法getAvailableZones(zoneSnapshot, triggeringLoad.get(), triggeringBlackoutPercentage.get()),來獲取可用的Zone區(qū)域集合,在該函數(shù)中會通過Zone區(qū)域快照中的統(tǒng)計數(shù)據(jù)來實現(xiàn)可用區(qū)的挑選

?a.首先會剔除符合這些規(guī)則的Zone區(qū)域:所屬實例數(shù)為0的Zone區(qū)域;Zone區(qū)域內(nèi)實例的平均負載小于0,或者實例故障率(斷路由器斷開次數(shù)/實例數(shù))大于等于閥值(默認值為0.99999)

?b.然后根據(jù)Zone區(qū)域的實例平均負載計算出最差的Zone區(qū)域,這里的最差指的是實例平均負載最高的Zone區(qū)域

?c.如果在上面的過程中沒有符合剔除要求的區(qū)域,同時實例最大平均負載小于閥值(默認20%),就直接返回所有Zone區(qū)域為可用區(qū)域。否則,從最壞Zone區(qū)域集合中隨機選擇一個,將它從可用Zone區(qū)域集合中剔除。

3.當獲得的可用Zone區(qū)域集合不為空,并且個數(shù)小于Zone區(qū)域總數(shù),就隨機選擇一個Zone區(qū)域

4.在確定了某個Zone區(qū)域后,則獲取了對應(yīng)Zone區(qū)域的負載均衡器,并調(diào)用chooseServer來選擇具體的服務(wù)實例,而在chooseServer中將使用IRule接口的choose方法來選擇具體的服務(wù)實例。在這里,IRule接口的實現(xiàn)會采用ZoneAvoidanceRule來挑選具體的服務(wù)實例。

后續(xù)

?后面會介紹負載均衡策略的源碼分析,請繼續(xù)關(guān)注?。?!

文章版權(quán)歸作者所有,未經(jīng)允許請勿轉(zhuǎn)載,若此文章存在違規(guī)行為,您可以聯(lián)系管理員刪除。

轉(zhuǎn)載請注明本文地址:http://systransis.cn/yun/76735.html

相關(guān)文章

  • Spring Cloud實戰(zhàn)(三)-Spring Cloud Netflix Ribbon

    摘要:概要什么是實戰(zhàn)整合實現(xiàn)負載均衡是什么是一個客戶端負載均衡的組件什么是負載均衡負載均衡就是分發(fā)請求流量到不同的服務(wù)器目前的實現(xiàn)有軟件和硬件負載均衡分為兩種服務(wù)器端負載均衡如上圖所示服務(wù)器端負載均衡是對客戶透明的用戶請求到服務(wù)器真正的服務(wù)器是由 概要 什么是Spring Cloud Netflix Ribbon? 實戰(zhàn):整合Ribbon實現(xiàn)負載均衡 Spring Cloud Netfl...

    wangbinke 評論0 收藏0
  • Spring Cloud 參考文檔(客戶端負載衡器Ribbon

    摘要:客戶端負載均衡器是一個客戶端負載均衡器,可以讓你對和客戶端的行為進行大量控制,已經(jīng)使用了,因此,如果你使用,此部分也適用。 客戶端負載均衡器:Ribbon Ribbon是一個客戶端負載均衡器,可以讓你對HTTP和TCP客戶端的行為進行大量控制,F(xiàn)eign已經(jīng)使用了Ribbon,因此,如果你使用@FeignClient,此部分也適用。 Ribbon中的一個核心概念是命名客戶端,每個負載均...

    Songlcy 評論0 收藏0
  • SpringCloud(第 006 篇)電影微服務(wù),使用 Ribbon 在客戶端進行負載均衡

    摘要:第篇電影微服務(wù),使用在客戶端進行負載均衡一大致介紹是發(fā)布的云中間層服務(wù)開源項目,主要功能是提供客戶端負載均衡算法。而被注解后,能過用負載均衡,主要是維護了一個被注解的列表,并給列表中的添加攔截器,進而交給負載均衡器去處理。 SpringCloud(第 006 篇)電影微服務(wù),使用 Ribbon 在客戶端進行負載均衡 - 一、大致介紹 1、Ribbon 是 Netflix 發(fā)布的云中間層...

    nodejh 評論0 收藏0
  • 一起學習使用Spring Cloud Netflix之Ribbon

    摘要:本例中介紹如何使用來完成服務(wù)調(diào)用并實現(xiàn)負載均衡。即,對于注冊中心而言,生產(chǎn)者和調(diào)用者都是端。文件配置如下在文件中,我們將應(yīng)用命名為,端口為,表示注冊中心地址。 前言 Ribbon是Spring Cloud體系中完成負載均衡的重要組件。Spring Cloud體系中有兩種完成服務(wù)調(diào)用的組件,一種是Ribbon+RestTemplate,另一種Feign。Feign默認使用的也是Ribbo...

    nidaye 評論0 收藏0
  • Spring Cloud Ribbon

    摘要:客戶端負載均衡需要客戶端自己維護自己要訪問的服務(wù)實例清單,這些服務(wù)清單來源于注冊中心在使用進行服務(wù)治理時。使用從負載均衡器中挑選出的服務(wù)實例來執(zhí)行請求內(nèi)容。 客戶端負載均衡Spring Cloud Ribbon ?Spring Cloud Ribbon是一個基于HTTP和TCP的客戶端負載均衡工具,基于Netflix Ribbon實現(xiàn)。 目錄 客戶端負載均衡(本文重點) 源碼分析(本...

    fasss 評論0 收藏0
  • SpringCloud(第 008 篇)電影微服務(wù),使用配置文件配置 Ribbon 在客戶端進行負載

    摘要:第篇電影微服務(wù),使用配置文件配置在客戶端進行負載均衡調(diào)度算法一大致介紹通過配置來設(shè)置客戶端進行負載均衡的調(diào)度算法通過兩種代碼調(diào)用方式來測試客戶端負載均衡算法二實現(xiàn)步驟添加引用包模塊客戶端發(fā)現(xiàn)模塊 SpringCloud(第 008 篇)電影微服務(wù),使用 application.yml 配置文件配置 Ribbon 在客戶端進行負載均衡調(diào)度算法 - 一、大致介紹 1、通過 applicat...

    wangjuntytl 評論0 收藏0

發(fā)表評論

0條評論

最新活動
閱讀需要支付1元查看
<