成人国产在线小视频_日韩寡妇人妻调教在线播放_色成人www永久在线观看_2018国产精品久久_亚洲欧美高清在线30p_亚洲少妇综合一区_黄色在线播放国产_亚洲另类技巧小说校园_国产主播xx日韩_a级毛片在线免费

資訊專欄INFORMATION COLUMN

dubbo之timeout超時分析

張率功 / 3412人閱讀

摘要:講到這里,超時原理基本上其實差不多了,這個類還有個地方需要注意,在初始化對象時,會去創(chuàng)建一個超時的延遲任務,延遲時間就是值,在這個延遲任務中也會調(diào)用方法喚醒阻塞

背景

在使用dubbo時,通常會遇到timeout這個屬性,timeout屬性的作用是:給某個服務調(diào)用設置超時時間,如果服務在設置的時間內(nèi)未返回結(jié)果,則會拋出調(diào)用超時異常:TimeoutException,在使用的過程中,我們有時會對provider和consumer兩個配置都會設置timeout值,那么服務調(diào)用過程中會以哪個為準?本文主要針對這個問題進行分析和擴展

三種設置方式

以provider配置為例:

方法級別

設置方式如下所示:


   

接口級別

全局級別

優(yōu)先級選擇

在dubbo中如果provider和consumer都配置了相同的一個屬性,比如本文分析的timeout,其實是有一個優(yōu)先級的,優(yōu)先級:
consumer方法配置 > provider方法配置 > consumer接口配置 > provider接口配置 > consumer全局配置 > provider全局配置。所以對于本文開始的提出的問題就有了結(jié)果,會以消費者配置的為準,接下結(jié)合源碼來進行解析,其實源碼很簡單,在RegistryDirectory類中將服務列表轉(zhuǎn)換為DubboInvlker方法中進行了處理:

    private Map> toInvokers(List urls) {
        Map> newUrlInvokerMap = new HashMap>();
        if (urls == null || urls.isEmpty()) {
            return newUrlInvokerMap;
        }
        Set keys = new HashSet();
        String queryProtocols = this.queryMap.get(Constants.PROTOCOL_KEY);
        for (URL providerUrl : urls) {
            // If protocol is configured at the reference side, only the matching protocol is selected
            if (queryProtocols != null && queryProtocols.length() > 0) {
                boolean accept = false;
                String[] acceptProtocols = queryProtocols.split(",");
                for (String acceptProtocol : acceptProtocols) {
                    if (providerUrl.getProtocol().equals(acceptProtocol)) {
                        accept = true;
                        break;
                    }
                }
                if (!accept) {
                    continue;
                }
            }
            if (Constants.EMPTY_PROTOCOL.equals(providerUrl.getProtocol())) {
                continue;
            }
            if (!ExtensionLoader.getExtensionLoader(Protocol.class).hasExtension(providerUrl.getProtocol())) {
                logger.error(new IllegalStateException("Unsupported protocol " + providerUrl.getProtocol() +
                        " in notified url: " + providerUrl + " from registry " + getUrl().getAddress() +
                        " to consumer " + NetUtils.getLocalHost() + ", supported protocol: " +
                        ExtensionLoader.getExtensionLoader(Protocol.class).getSupportedExtensions()));
                continue;
            }
            // 重點就是下面這個方法
            URL url = mergeUrl(providerUrl);

            String key = url.toFullString(); // The parameter urls are sorted
            if (keys.contains(key)) { // Repeated url
                continue;
            }
            keys.add(key);
            // Cache key is url that does not merge with consumer side parameters, regardless of how the consumer combines parameters, if the server url changes, then refer again
            Map> localUrlInvokerMap = this.urlInvokerMap; // local reference
            Invoker invoker = localUrlInvokerMap == null ? null : localUrlInvokerMap.get(key);
            if (invoker == null) { // Not in the cache, refer again
                try {
                    boolean enabled = true;
                    if (url.hasParameter(Constants.DISABLED_KEY)) {
                        enabled = !url.getParameter(Constants.DISABLED_KEY, false);
                    } else {
                        enabled = url.getParameter(Constants.ENABLED_KEY, true);
                    }
                    if (enabled) {
                        invoker = new InvokerDelegate(protocol.refer(serviceType, url), url, providerUrl);
                    }
                } catch (Throwable t) {
                    logger.error("Failed to refer invoker for interface:" + serviceType + ",url:(" + url + ")" + t.getMessage(), t);
                }
                if (invoker != null) { // Put new invoker in cache
                    newUrlInvokerMap.put(key, invoker);
                }
            } else {
                newUrlInvokerMap.put(key, invoker);
            }
        }
        keys.clear();
        return newUrlInvokerMap;
    }

重點就是上面mergeUrl()方法,將provider和comsumer的url參數(shù)進行了整合,在
mergeUrl()方法有會調(diào)用ClusterUtils.mergeUrl方法進行整合,因為這個方法比較簡單,就是對一些參數(shù)進行了整合了,會用consumer參數(shù)進行覆蓋,咱們這里就不分析了,如果感興趣的同學可以去研究一下。

超時處理

在配置設置了超時timeout,那么代碼中是如何處理的,這里咱們在進行一下擴展,分析一下dubbo中是如何處理超時的,在調(diào)用服務方法,最后都會調(diào)用DubboInvoker.doInvoke方法,咱們就從這個方法開始分析:

    @Override
    protected Result doInvoke(final Invocation invocation) throws Throwable {
        RpcInvocation inv = (RpcInvocation) invocation;
        final String methodName = RpcUtils.getMethodName(invocation);
        inv.setAttachment(Constants.PATH_KEY, getUrl().getPath());
        inv.setAttachment(Constants.VERSION_KEY, version);

        ExchangeClient currentClient;
        if (clients.length == 1) {
            currentClient = clients[0];
        } else {
            currentClient = clients[index.getAndIncrement() % clients.length];
        }
        try {
            boolean isAsync = RpcUtils.isAsync(getUrl(), invocation);
            boolean isAsyncFuture = RpcUtils.isReturnTypeFuture(inv);
            boolean isOneway = RpcUtils.isOneway(getUrl(), invocation);
            int timeout = getUrl().getMethodParameter(methodName, Constants.TIMEOUT_KEY, Constants.DEFAULT_TIMEOUT);
            if (isOneway) {
                boolean isSent = getUrl().getMethodParameter(methodName, Constants.SENT_KEY, false);
                currentClient.send(inv, isSent);
                RpcContext.getContext().setFuture(null);
                return new RpcResult();
            } else if (isAsync) {
                ResponseFuture future = currentClient.request(inv, timeout);
                // For compatibility
                FutureAdapter futureAdapter = new FutureAdapter<>(future);
                RpcContext.getContext().setFuture(futureAdapter);

                Result result;
                // 異步處理
                if (isAsyncFuture) {
                    // register resultCallback, sometimes we need the async result being processed by the filter chain.
                    result = new AsyncRpcResult(futureAdapter, futureAdapter.getResultFuture(), false);
                } else {
                    result = new SimpleAsyncRpcResult(futureAdapter, futureAdapter.getResultFuture(), false);
                }
                return result;
            } else {
                // 同步處理
                RpcContext.getContext().setFuture(null);
                return (Result) currentClient.request(inv, timeout).get();
            }
        } catch (TimeoutException e) {
            throw new RpcException(RpcException.TIMEOUT_EXCEPTION, "Invoke remote method timeout. method: " + invocation.getMethodName() + ", provider: " + getUrl() + ", cause: " + e.getMessage(), e);
        } catch (RemotingException e) {
            throw new RpcException(RpcException.NETWORK_EXCEPTION, "Failed to invoke remote method: " + invocation.getMethodName() + ", provider: " + getUrl() + ", cause: " + e.getMessage(), e);
        }
    }

在這個方法中,咱們就以同步模式進行分析,看request方法,request()方法會返回一個DefaultFuture類,在去調(diào)用DefaultFuture.get()方法,這里其實涉及到一個在異步中實現(xiàn)同步的技巧,咱們這里不做分析,所以重點就在get()方法里:

    @Override
    public Object get() throws RemotingException {
        return get(timeout);
    }

    @Override
    public Object get(int timeout) throws RemotingException {
        if (timeout <= 0) {
            timeout = Constants.DEFAULT_TIMEOUT;
        }
        if (!isDone()) {
            long start = System.currentTimeMillis();
            lock.lock();
            try {
                while (!isDone()) {
                    done.await(timeout, TimeUnit.MILLISECONDS);
                    if (isDone() || System.currentTimeMillis() - start > timeout) {
                        break;
                    }
                }
            } catch (InterruptedException e) {
                throw new RuntimeException(e);
            } finally {
                lock.unlock();
            }
            if (!isDone()) {
                throw new TimeoutException(sent > 0, channel, getTimeoutMessage(false));
            }
        }
        return returnFromResponse();
    }

在調(diào)用get()方法時,會去調(diào)用get(timeout)這個方法,在這個方法中會傳一個timeout字段,在和timeout就是咱們配置的那個參數(shù),在這個方法中咱們要關注下面一個代碼塊:

        if (!isDone()) {
            long start = System.currentTimeMillis();
            lock.lock();
            try {
                while (!isDone()) {
                    // 線程阻塞
                    done.await(timeout, TimeUnit.MILLISECONDS);
                    if (isDone() || System.currentTimeMillis() - start > timeout) {
                        break;
                    }
                }
            } catch (InterruptedException e) {
                throw new RuntimeException(e);
            } finally {
                lock.unlock();
            }
            // 在超時時間里,還沒有結(jié)果,則拋出超時異常
            if (!isDone()) {
                throw new TimeoutException(sent > 0, channel, getTimeoutMessage(false));
            }
        }

重點看await()方法,會進行阻塞timeout時間,如果阻塞時間到了,則會喚醒往下執(zhí)行,超時跳出while循環(huán)中,判斷是否有結(jié)果返回,如果沒有(這個地方要注意:只有有結(jié)果返回,或超時才跳出循環(huán)中),則拋出超時異常。講到這里,超時原理基本上其實差不多了,DefaultFuture這個類還有個地方需要注意,在初始化DefaultFuture對象時,會去創(chuàng)建一個超時的延遲任務,延遲時間就是timeout值,在這個延遲任務中也會調(diào)用signal()方法喚醒阻塞

文章版權歸作者所有,未經(jīng)允許請勿轉(zhuǎn)載,若此文章存在違規(guī)行為,您可以聯(lián)系管理員刪除。

轉(zhuǎn)載請注明本文地址:http://systransis.cn/yun/74197.html

相關文章

  • Dubbo 源碼分析 - 集群容錯 Cluster

    摘要:集群用途是將多個服務提供者合并為一個,并將這個暴露給服務消費者。比如發(fā)請求,接受服務提供者返回的數(shù)據(jù)等。如果包含,表明對應的服務提供者可能因網(wǎng)絡原因未能成功提供服務。如果不包含,此時還需要進行可用性檢測,比如檢測服務提供者網(wǎng)絡連通性等。 1.簡介 為了避免單點故障,現(xiàn)在的應用至少會部署在兩臺服務器上。對于一些負載比較高的服務,會部署更多臺服務器。這樣,同一環(huán)境下的服務提供者數(shù)量會大于1...

    denson 評論0 收藏0
  • Dubbo 源碼分析20 Dubbo服務提供者、服務消費者并發(fā)度控制機制

    摘要:代碼根據(jù)服務提供者和服務調(diào)用方法名,獲取。代碼根據(jù)服務提供者配置的最大并發(fā)度,創(chuàng)建該服務該方法對應的信號量對象。總結(jié)是控制消費端對單個服務提供者單個服務允許調(diào)用的最大并發(fā)度。 本文將詳細分析< dubbo:service executes=/>與< dubbo:reference actives = />的實現(xiàn)機制,深入探...

    不知名網(wǎng)友 評論0 收藏0
  • dubbo源碼解析(二十六)遠程調(diào)用——http協(xié)議

    摘要:前言基于表單的遠程調(diào)用協(xié)議,采用的實現(xiàn),關于協(xié)議就不用多說了吧。后記該部分相關的源碼解析地址該文章講解了遠程調(diào)用中關于協(xié)議的部分,內(nèi)容比較簡單,可以參考著官方文檔了解一下。 遠程調(diào)用——http協(xié)議 目標:介紹遠程調(diào)用中跟http協(xié)議相關的設計和實現(xiàn),介紹dubbo-rpc-http的源碼。 前言 基于HTTP表單的遠程調(diào)用協(xié)議,采用 Spring 的HttpInvoker實現(xiàn),關于h...

    xiyang 評論0 收藏0
  • dubbo源碼解析(二十五)遠程調(diào)用——hessian協(xié)議

    摘要:客戶端對象字節(jié)輸出流請求對象響應對象增加協(xié)議頭發(fā)送請求獲得請求后的狀態(tài)碼三該類實現(xiàn)了接口,是創(chuàng)建的工廠類。該類的實現(xiàn)跟類類似,但是是標準的接口調(diào)用會采用的工廠類,而是的協(xié)議調(diào)用。 遠程調(diào)用——hessian協(xié)議 目標:介紹遠程調(diào)用中跟hessian協(xié)議相關的設計和實現(xiàn),介紹dubbo-rpc-hessian的源碼。 前言 本文講解多是dubbo集成的第二種協(xié)議,hessian協(xié)議,He...

    xzavier 評論0 收藏0
  • 數(shù)據(jù)庫相關異常分析

    摘要:起因最近一段時間,生產(chǎn)系統(tǒng)持續(xù)碰到一些數(shù)據(jù)庫異常,導致執(zhí)行失敗。綜上,若發(fā)生異常,為數(shù)據(jù)庫連接失效,但是失效的原因可能會有多種,大致都與各種參數(shù)相關。當時數(shù)據(jù)量大概多條,然后在批量插入時拋出該異常。 起因 最近一段時間,生產(chǎn)系統(tǒng)持續(xù)碰到一些數(shù)據(jù)庫異常,導致 sql 執(zhí)行失敗。 應用環(huán)境 Java 1.7 + Mysql 5.6 + spring + ibatis 問題排查 將各種失敗的...

    IamDLY 評論0 收藏0

發(fā)表評論

0條評論

最新活動
閱讀需要支付1元查看
<