成人国产在线小视频_日韩寡妇人妻调教在线播放_色成人www永久在线观看_2018国产精品久久_亚洲欧美高清在线30p_亚洲少妇综合一区_黄色在线播放国产_亚洲另类技巧小说校园_国产主播xx日韩_a级毛片在线免费

資訊專欄INFORMATION COLUMN

dubbo源碼解析——cluster

seal_de / 2175人閱讀

摘要:簡單來說就是應對出錯情況采取的策略。由于重試,重試次數(shù)過多時,帶來時延。通常用于實時性要求較高的讀操作,但需要浪費更多服務資源。通常用于通知所有提供者更新緩存或日志等本地資源信息。

我們再來回顧一下官網(wǎng)的對于集群容錯的架構(gòu)設計圖

Cluster概述

將 Directory 中的多個 Invoker 偽裝成一個 Invoker(偽裝過程用到loadBalance),對上層透明,偽裝過程包含了容錯邏輯,調(diào)用失敗后,重試另一個。簡單來說,就是應對出錯情況采取的策略??纯催@個接口:

該接口有9個實現(xiàn)類,換個角度來說,就是有9中應對策略,本文介紹幾個比較常用的策略

FailoverCluster

失敗自動切換,當調(diào)用遠程服務失敗時,自動選擇其他服務進行調(diào)用。可以通過retries設置重試次數(shù)。由于重試,重試次數(shù)過多時,帶來時延。

/**
 * Failover
 * 當invoker調(diào)用失敗,打印錯誤日志,并且重試其他invoker
 * 重試將導致時延
 */
public class FailoverClusterInvoker extends AbstractClusterInvoker {

    private static final Logger logger = LoggerFactory.getLogger(FailoverClusterInvoker.class);

    public FailoverClusterInvoker(Directory directory) {
        super(directory);
    }

    @Override
    @SuppressWarnings({"unchecked", "rawtypes"})
    public Result doInvoke(Invocation invocation, final List> invokers, LoadBalance loadbalance) throws RpcException {
        // 局部引用
        List> copyinvokers = invokers;

        // 參數(shù)校驗
        checkInvokers(copyinvokers, invocation);

        // 獲取方法名稱
        String methodName = RpcUtils.getMethodName(invocation);

        // 獲取重試次數(shù)
        int len = getUrl().getMethodParameter(methodName, Constants.RETRIES_KEY, Constants.DEFAULT_RETRIES) + 1;
        if (len <= 0) {
            // 最少要調(diào)用1次
            len = 1;
        }

        // 局部引用
        RpcException le = null;
        // 已經(jīng)調(diào)用過的invoker列表
        List> invoked = new ArrayList>(copyinvokers.size());
        // 調(diào)用失敗的invoker地址
        Set providers = new HashSet(len);

        // i < len 作為循環(huán)條件,說明len是多少就循環(huán)多少次(len等于 重試次數(shù) + 1)
        for (int i = 0; i < len; i++) {
            if (i > 0) {
                // 檢查invoker是否被銷毀
                checkWhetherDestroyed();
                // 重新選擇invoker(在重試之前,需要重新選擇,以避免候選invoker的改變)
                copyinvokers = list(invocation);
                // 參數(shù)檢查
                checkInvokers(copyinvokers, invocation);
            }

            /*
             * 這一步就是進入loadBalance負載均衡
             * 因為上述步驟可能篩選出invoker數(shù)量大于1,所以再次經(jīng)過loadBalance的篩選(同時避免獲取到已經(jīng)調(diào)用過的invoker)
             */
            Invoker invoker = select(loadbalance, invocation, copyinvokers, invoked);
            invoked.add(invoker);

            RpcContext.getContext().setInvokers((List) invoked);
            try {
                // 遠程方法調(diào)用
                Result result = invoker.invoke(invocation);
                if (le != null && logger.isWarnEnabled()) {
                    logger.warn("Although retry the method " + methodName
                            + " in the service " + getInterface().getName()
                            + " was successful by the provider " + invoker.getUrl().getAddress()
                            + ", but there have been failed providers " + providers
                            + " (" + providers.size() + "/" + copyinvokers.size()
                            + ") from the registry " + directory.getUrl().getAddress()
                            + " on the consumer " + NetUtils.getLocalHost()
                            + " using the dubbo version " + Version.getVersion() + ". Last error is: "
                            + le.getMessage(), le);
                }

                // 正常執(zhí)行,直接返回結(jié)果。否則,如果還有重試次數(shù),則繼續(xù)重試
                return result;
            } catch (RpcException e) {
                if (e.isBiz()) { // biz exception.
                    throw e;
                }
                le = e;
            } catch (Throwable e) {
                le = new RpcException(e.getMessage(), e);
            } finally {
                providers.add(invoker.getUrl().getAddress());
            }
        }

        // 能到這里,說明都失敗了,providers保存失敗的invoker地址
        throw new RpcException(le != null ? le.getCode() : 0, "Failed to invoke the method "
                + methodName + " in the service " + getInterface().getName()
                + ". Tried " + len + " times of the providers " + providers
                + " (" + providers.size() + "/" + copyinvokers.size()
                + ") from the registry " + directory.getUrl().getAddress()
                + " on the consumer " + NetUtils.getLocalHost() + " using the dubbo version "
                + Version.getVersion() + ". Last error is: "
                + (le != null ? le.getMessage() : ""), le != null && le.getCause() != null ? le.getCause() : le);
    }

}
MergeableCluster

這個主要用在分組聚合中,我們來看一下官網(wǎng)的介紹

按組合并返回結(jié)果 ,比如菜單服務,接口一樣,但有多種實現(xiàn),用group區(qū)分,現(xiàn)在消費方需從每種group中調(diào)用一次返回結(jié)果,合并結(jié)果返回,這樣就可以實現(xiàn)聚合菜單項。

下面補充一下使用方法(網(wǎng)上基本沒有使用方法的教程,樓主才疏學淺,花了幾個小時才摸索出來):
(1)consumer側(cè),提供合并merge方法
這里有幾個步驟:
a、在resources目錄下,新建META-INF及dubbo,新建文本com.alibaba.dubbo.rpc.cluster.Merger
b、映射自定義的merger名稱到相應的實現(xiàn)類,如:
myMerger=com.patty.dubbo.consumer.service.MyMerger
c、實現(xiàn)合并函數(shù),需要實現(xiàn)Merger接口,如下:

public class MyMerger implements Merger {

    @Override
    public ModelResult merge(ModelResult... items) {

        ModelResult result = new ModelResult();
        for (ModelResult item : items) {
            // 進行數(shù)據(jù)合并操作
            result.setResult((String)result.getResult() + (String) item.getResult());
        }

        return result;
    }
}

(2)將reference的cluster屬性設置為"mergeable",group設置為“*”,并且設置合并方法,如下:


        
    

(3)同一套代碼,分別利用不同的group,把服務發(fā)布到注冊中心上面。例如:/group1/com.huangyuan.demoService 及 /group2/com.huangyuan.demoService

(3)接下來就可以直接使用了,這邊測試得到結(jié)果:(這里合并只是簡單連接字符串)

接下來再看下源碼:

public Result invoke(final Invocation invocation) throws RpcException {
        // 獲取Directory中的invoker
        List> invokers = directory.list(invocation);

        // 獲取合并方法的名稱
        String merger = getUrl().getMethodParameter(invocation.getMethodName(), Constants.MERGER_KEY);
        if (ConfigUtils.isEmpty(merger)) {
            for (final Invoker invoker : invokers) {
                // 如果沒有合并方法,只調(diào)動其中一個分組
                if (invoker.isAvailable()) {
                    return invoker.invoke(invocation);
                }
            }
            return invokers.iterator().next().invoke(invocation);
        }

        // 獲取返回值類型
        Class returnType;
        try {
            returnType = getInterface().getMethod(
                    invocation.getMethodName(), invocation.getParameterTypes()).getReturnType();
        } catch (NoSuchMethodException e) {
            returnType = null;
        }

        Map> results = new HashMap>();
        for (final Invoker invoker : invokers) {
            Future future = executor.submit(new Callable() {
                @Override
                public Result call() throws Exception {
                    return invoker.invoke(new RpcInvocation(invocation, invoker));
                }
            });
            // 保留future(未真正執(zhí)行遠程調(diào)用)
            results.put(invoker.getUrl().getServiceKey(), future);
        }

        Object result = null;

        // 結(jié)果列表
        List resultList = new ArrayList(results.size());

        // 超時時間
        int timeout = getUrl().getMethodParameter(invocation.getMethodName(), Constants.TIMEOUT_KEY, Constants.DEFAULT_TIMEOUT);

        //
        for (Map.Entry> entry : results.entrySet()) {
            Future future = entry.getValue();
            try {
                // 執(zhí)行遠程調(diào)用
                Result r = future.get(timeout, TimeUnit.MILLISECONDS);
                if (r.hasException()) {
                    log.error("Invoke " + getGroupDescFromServiceKey(entry.getKey()) + 
                                    " failed: " + r.getException().getMessage(), 
                            r.getException());
                } else {
                    resultList.add(r);
                }
            } catch (Exception e) {
                throw new RpcException("Failed to invoke service " + entry.getKey() + ": " + e.getMessage(), e);
            }
        }

        if (resultList.isEmpty()) {
            return new RpcResult((Object) null);
        } else if (resultList.size() == 1) {
            // 只有一個結(jié)果,直接返回了
            return resultList.iterator().next();
        }

        if (returnType == void.class) {
            return new RpcResult((Object) null);
        }

        if (merger.startsWith(".")) {
            /*
             * 配置的方法名稱,以"."開頭
             * 這種方式,入?yún)⒐潭ㄖ挥幸粋€,沒有達到合并的效果,不建議使用
             */
            merger = merger.substring(1);
            Method method;
            try {
                method = returnType.getMethod(merger, returnType);
            } catch (NoSuchMethodException e) {
                throw new RpcException("Can not merge result because missing method [ " + merger + " ] in class [ " + 
                        returnType.getClass().getName() + " ]");
            }
            if (!Modifier.isPublic(method.getModifiers())) {
                method.setAccessible(true);
            }
            result = resultList.remove(0).getValue();
            try {
                if (method.getReturnType() != void.class
                        && method.getReturnType().isAssignableFrom(result.getClass())) {
                    for (Result r : resultList) {
                        result = method.invoke(result, r.getValue());
                    }
                } else {
                    for (Result r : resultList) {
                        method.invoke(result, r.getValue());
                    }
                }
            } catch (Exception e) {
                throw new RpcException("Can not merge result: " + e.getMessage(), e);
            }
        } else {
            /*
             * 建議使用Merger擴展的方式
             */
            Merger resultMerger;
            if (ConfigUtils.isDefault(merger)) {
                resultMerger = MergerFactory.getMerger(returnType);
            } else {
                resultMerger = ExtensionLoader.getExtensionLoader(Merger.class).getExtension(merger);
            }
            if (resultMerger != null) {
                List rets = new ArrayList(resultList.size());
                for (Result r : resultList) {
                    rets.add(r.getValue());
                }
                result = resultMerger.merge(
                        rets.toArray((Object[]) Array.newInstance(returnType, 0)));
            } else {
                throw new RpcException("There is no merger to merge result.");
            }
        }
        return new RpcResult(result);
    }

PS:其實合并方法還有另外一個使用方式,使用".方法名稱",并且合并方法只能寫在結(jié)果類中,這種方式有一個很大的弊端,就是源碼中入?yún)⒐潭ㄖ挥幸粋€,所以達不到合并效果,故不推薦使用。


        
    
AvailableCluster
public class AvailableCluster implements Cluster {

    public static final String NAME = "available";

    @Override
    public  Invoker join(Directory directory) throws RpcException {

        return new AbstractClusterInvoker(directory) {
            @Override
            public Result doInvoke(Invocation invocation, List> invokers, LoadBalance loadbalance) throws RpcException {
                for (Invoker invoker : invokers) {
                    if (invoker.isAvailable()) {
                        // 僅僅執(zhí)行可只用的invoker
                        return invoker.invoke(invocation);
                    }
                }
                throw new RpcException("No provider available in " + invokers);
            }
        };

    }

}

遍歷所有的Invokers判斷invoker.isAvalible,只要一個有為true直接調(diào)用返回,否則就拋出異常.

ForkingCluster

引用官網(wǎng)的介紹

并行調(diào)用多個服務器,只要一個成功即返回。通常用于實時性要求較高的讀操作,但需要浪費更多服務資源??赏ㄟ^ forks="2" 來設置最大并行數(shù)。

我們來看看源碼的實現(xiàn)

FailfastCluster

快速失敗

Failfast可以理解為只發(fā)起一次調(diào)用,若失敗則立即報錯

通常用于非冪等寫操作

@Override
    public Result doInvoke(Invocation invocation, List> invokers, LoadBalance loadbalance) throws RpcException {
        checkInvokers(invokers, invocation);
        Invoker invoker = select(loadbalance, invocation, invokers, null);
        try {
            // 成功直接往下執(zhí)行
            return invoker.invoke(invocation);
        } catch (Throwable e) {
            // 失敗拋出異常,不做別的處理
            if (e instanceof RpcException && ((RpcException) e).isBiz()) { // biz exception.
                throw (RpcException) e;
            }
            throw new RpcException(e instanceof RpcException ? ((RpcException) e).getCode() : 0, "Failfast invoke providers " + invoker.getUrl() + " " + loadbalance.getClass().getSimpleName() + " select from all providers " + invokers + " for service " + getInterface().getName() + " method " + invocation.getMethodName() + " on consumer " + NetUtils.getLocalHost() + " use dubbo version " + Version.getVersion() + ", but no luck to perform the invocation. Last error is: " + e.getMessage(), e.getCause() != null ? e.getCause() : e);
        }
    }
BroadcastCluster

廣播調(diào)用

廣播調(diào)用所有提供者,逐個調(diào)用,任意一臺報錯則報錯。通常用于通知所有提供者更新緩存或日志等本地資源信息。

public Result doInvoke(final Invocation invocation, List> invokers, LoadBalance loadbalance) throws RpcException {
        checkInvokers(invokers, invocation);
        RpcContext.getContext().setInvokers((List) invokers);
        RpcException exception = null;
        Result result = null;

        for (Invoker invoker : invokers) {
            try {
                // 循環(huán)調(diào)用invoker
                result = invoker.invoke(invocation);
            } catch (RpcException e) {
                exception = e;
                logger.warn(e.getMessage(), e);
            } catch (Throwable e) {
                exception = new RpcException(e.getMessage(), e);
                logger.warn(e.getMessage(), e);
            }
        }
        if (exception != null) {
            throw exception;
        }
        return result;
    }
FailbackClusterInvoker

失敗自動重試

當失敗了,記錄失敗的請求,按照一定的間隔定時重試

特別適用于通知服務

這個相對比較復雜,先了解一些基礎(chǔ)概念

Delayed

延遲接口,用于標記在給定延遲之后應該被作用的對象

ScheduledFuture

實現(xiàn)Delayed、Future接口,能夠獲取未來調(diào)度的結(jié)果

演示一些上面ScheduledFuture的用法

import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;

/**
 * @author huangy on 2018/11/12
 */
public class ScheduledFutureTest {

    // 延遲調(diào)用,獲取未來調(diào)度結(jié)果的對象
    private volatile ScheduledFuture retryFuture;

    // 指定時間間隔 重發(fā)執(zhí)行一次
    private static final long RETRY_FAILED_PERIOD = 1 * 1000;

    // ScheduledExecutorService的主要作用就是可以將定時任務與線程池功能結(jié)合使用
    private final ScheduledExecutorService scheduledExecutorService = Executors.newScheduledThreadPool(2);

    public void func() {
        retryFuture = scheduledExecutorService.scheduleWithFixedDelay(new Runnable() {

            @Override
            public void run() {
                System.out.println("retry");
            }
            // 延遲第一次執(zhí)行的時間    每次的延遲           時間單位(現(xiàn)在填的是毫秒)
        }, RETRY_FAILED_PERIOD, RETRY_FAILED_PERIOD, TimeUnit.MILLISECONDS);
    }

    public static void main(String[] args) {
        new ScheduledFutureTest().func();
    }
}

結(jié)果如下:

其實看完這個例子,再看failbackCluster就挺簡單了

public class FailbackClusterInvoker extends AbstractClusterInvoker {

    private static final Logger logger = LoggerFactory.getLogger(FailbackClusterInvoker.class);

    // 5s 重發(fā)一次
    private static final long RETRY_FAILED_PERIOD = 5 * 1000;

    /**
     * Use {@link NamedInternalThreadFactory} to produce {@link org.apache.dubbo.common.threadlocal.InternalThread}
     * which with the use of {@link org.apache.dubbo.common.threadlocal.InternalThreadLocal} in {@link RpcContext}.
     */
    private final ScheduledExecutorService scheduledExecutorService = Executors.newScheduledThreadPool(2,
            new NamedInternalThreadFactory("failback-cluster-timer", true));

    // 保存需要重新執(zhí)行的invoker
    private final ConcurrentMap> failed = new ConcurrentHashMap>();

    // 延遲調(diào)用,獲取未來調(diào)度結(jié)果的對象
    private volatile ScheduledFuture retryFuture;

    public FailbackClusterInvoker(Directory directory) {
        super(directory);
    }

    private void addFailed(Invocation invocation, AbstractClusterInvoker router) {
        if (retryFuture == null) {
            // 避免同時調(diào)度
            synchronized (this) {
                if (retryFuture == null) {
                    retryFuture = scheduledExecutorService.scheduleWithFixedDelay(new Runnable() {

                        @Override
                        public void run() {
                            // collect retry statistics
                            try {
                                // 隔一段時間重新執(zhí)行
                                retryFailed();
                            } catch (Throwable t) { // Defensive fault tolerance
                                logger.error("Unexpected error occur at collect statistic", t);
                            }
                        }
                    }, RETRY_FAILED_PERIOD, RETRY_FAILED_PERIOD, TimeUnit.MILLISECONDS);
                }
            }
        }
        failed.put(invocation, router);
    }

    void retryFailed() {
        // 沒有需要重新執(zhí)行的invoker
        if (failed.size() == 0) {
            return;
        }

        // 逐個調(diào)用之前失敗的invoker
        for (Map.Entry> entry : new HashMap>(
                failed).entrySet()) {
            Invocation invocation = entry.getKey();
            Invoker invoker = entry.getValue();
            try {
                invoker.invoke(invocation);
                failed.remove(invocation);
            } catch (Throwable e) {
                logger.error("Failed retry to invoke method " + invocation.getMethodName() + ", waiting again.", e);
            }
        }
    }

    @Override
    protected Result doInvoke(Invocation invocation, List> invokers, LoadBalance loadbalance) throws RpcException {
        try {
            checkInvokers(invokers, invocation);
            Invoker invoker = select(loadbalance, invocation, invokers, null);
            // 正常執(zhí)行,則直接返回
            return invoker.invoke(invocation);
        } catch (Throwable e) {
            logger.error("Failback to invoke method " + invocation.getMethodName() + ", wait for retry in background. Ignored exception: "
                    + e.getMessage() + ", ", e);
            // 記錄失敗的請求
            addFailed(invocation, this);
            return new RpcResult(); // ignore
        }
    }

}
FailsafeCluster

調(diào)用實例失敗后,如果有報錯,則忽略掉異常,返回一個正常的空結(jié)果

@Override
    public Result doInvoke(Invocation invocation, List> invokers, LoadBalance loadbalance) throws RpcException {
        try {
            checkInvokers(invokers, invocation);
            Invoker invoker = select(loadbalance, invocation, invokers, null);
            return invoker.invoke(invocation);
        } catch (Throwable e) {
            logger.error("Failsafe ignore exception: " + e.getMessage(), e);
            return new RpcResult(); // ignore
        }
    }

文章版權(quán)歸作者所有,未經(jīng)允許請勿轉(zhuǎn)載,若此文章存在違規(guī)行為,您可以聯(lián)系管理員刪除。

轉(zhuǎn)載請注明本文地址:http://systransis.cn/yun/72098.html

相關(guān)文章

  • dubbo源碼解析——消費過程

    摘要:上一篇源碼解析概要篇中我們了解到中的一些概念及消費端總體調(diào)用過程。由于在生成代理實例的時候,在構(gòu)造函數(shù)中賦值了,因此可以只用該進行方法的調(diào)用。 上一篇 dubbo源碼解析——概要篇中我們了解到dubbo中的一些概念及消費端總體調(diào)用過程。本文中,將進入消費端源碼解析(具體邏輯會放到代碼的注釋中)。本文先是對消費過程的總體代碼邏輯理一遍,個別需要細講的點,后面會專門的文章進行解析。...

    darkbug 評論0 收藏0
  • dubbo源碼解析(一)Hello,Dubbo

    摘要:英文全名為,也叫遠程過程調(diào)用,其實就是一個計算機通信協(xié)議,它是一種通過網(wǎng)絡從遠程計算機程序上請求服務而不需要了解底層網(wǎng)絡技術(shù)的協(xié)議。 Hello,Dubbo 你好,dubbo,初次見面,我想和你交個朋友。 Dubbo你到底是什么? 先給出一套官方的說法:Apache Dubbo是一款高性能、輕量級基于Java的RPC開源框架。 那么什么是RPC? 文檔地址:http://dubbo.a...

    evin2016 評論0 收藏0
  • dubbo源碼解析(四十五)服務引用過程

    摘要:服務引用過程目標從源碼的角度分析服務引用過程。并保留服務提供者的部分配置,比如版本,,時間戳等最后將合并后的配置設置為查詢字符串中。的可以參考源碼解析二十三遠程調(diào)用的一的源碼分析。 dubbo服務引用過程 目標:從源碼的角度分析服務引用過程。 前言 前面服務暴露過程的文章講解到,服務引用有兩種方式,一種就是直連,也就是直接指定服務的地址來進行引用,這種方式更多的時候被用來做服務測試,不...

    xiaowugui666 評論0 收藏0
  • dubbo源碼解析——概要篇

    摘要:服務提供者代碼上面這個類會被封裝成為一個實例,并新生成一個實例。這樣當網(wǎng)絡通訊層收到一個請求后,會找到對應的實例,并調(diào)用它所對應的實例,從而真正調(diào)用了服務提供者的代碼。 這次源碼解析借鑒《肥朝》前輩的dubbo源碼解析,進行源碼學習??偨Y(jié)起來就是先總體,后局部.也就是先把需要注意的概念先拋出來,把整體架構(gòu)圖先畫出來.讓讀者拿著地圖跟著我的腳步,并且每一步我都提醒,現(xiàn)在我們在哪,我們下一...

    Meathill 評論0 收藏0
  • dubbo源碼解析(三十五)集群——cluster

    摘要:失敗安全,出現(xiàn)異常時,直接忽略。失敗自動恢復,在調(diào)用失敗后,返回一個空結(jié)果給服務提供者。源碼分析一該類實現(xiàn)了接口,是集群的抽象類。 集群——cluster 目標:介紹dubbo中集群容錯的幾種模式,介紹dubbo-cluster下support包的源碼。 前言 集群容錯還是很好理解的,就是當你調(diào)用失敗的時候所作出的措施。先來看看有哪些模式: showImg(https://segmen...

    gself 評論0 收藏0

發(fā)表評論

0條評論

最新活動
閱讀需要支付1元查看
<