

Dubbo 源碼分析 - 服務調用過程

Travis / 2122人閱讀


注: 本系列文章已捐贈給 Dubbo 社區(qū),你也可以在 Dubbo 官方文檔中閱讀本系列文章。

1. 簡介

在前面的文章中,我們分析了 Dubbo SPI、服務導出與引入、以及集群容錯方面的代碼。經過前文的鋪墊,本篇文章我們終于可以分析服務調用過程了。Dubbo 服務調用過程比較復雜,包含眾多步驟。比如發(fā)送請求、編解碼、服務降級、過濾器鏈處理、序列化、線程派發(fā)以及響應請求等步驟。限于篇幅原因,本篇文章無法對所有的步驟一一進行分析。本篇文章將會重點分析請求的發(fā)送與接收、編解碼、線程派發(fā)以及響應的發(fā)送與接收等過程,至于服務降級、過濾器鏈和序列化大家自行進行分析,也可以將其當成一個黑盒,暫時忽略也沒關系。介紹完本篇文章要分析的內容,接下來我們進入正題吧。

2. 源碼分析

在進行源碼分析之前,我們先來通過一張圖了解 Dubbo 服務調用過程。

首先服務消費者通過代理對象 Proxy 發(fā)起遠程調用,接著通過網絡客戶端 Client 將編碼后的請求發(fā)送給服務提供方的網絡層上,也就是 Server。Server 在收到請求后,首先要做的事情是對數(shù)據(jù)包進行解碼。然后將解碼后的請求發(fā)送至分發(fā)器 Dispatcher,再由分發(fā)器將請求派發(fā)到指定的線程池上,最后由線程池調用具體的服務。這就是一個遠程調用請求的發(fā)送與接收過程。至于響應的發(fā)送與接收過程,這張圖中沒有表現(xiàn)出來。對于這兩個過程,我們也會進行詳細分析。

2.1 服務調用方式

Dubbo 支持同步和異步兩種調用方式,其中異步調用還可細分為“有返回值”的異步調用和“無返回值”的異步調用。所謂“無返回值”異步調用是指服務消費方只管調用,但不關心調用結果,此時 Dubbo 會直接返回一個空的 RpcResult。若要使用異步特性,需要服務消費方手動進行配置。默認情況下,Dubbo 使用同步調用方式。

本節(jié)以及其他章節(jié)將會使用 Dubbo 官方提供的 Demo 分析整個調用過程,下面我們從 DemoService 接口的代理類開始進行分析。Dubbo 默認使用 Javassist 框架為服務接口生成動態(tài)代理類,因此我們需要先將代理類進行反編譯才能看到源碼。這里使用阿里開源 Java 應用診斷工具 Arthas 反編譯代理類,結果如下:

 * Arthas 反編譯步驟:
 * 1. 啟動 Arthas
 *    java -jar arthas-boot.jar
 * 2. 輸入編號選擇進程
 *    Arthas 啟動后,會打印 Java 應用進程列表,比如:
 *    [1]: 11232 org.jetbrains.jps.cmdline.Launcher
 *    [2]: 22370 org.jetbrains.jps.cmdline.Launcher
 *    [3]: 22371 com.alibaba.dubbo.demo.consumer.Consumer
 *    [4]: 22362 com.alibaba.dubbo.demo.provider.Provider
 *    [5]: 2074 org.apache.zookeeper.server.quorum.QuorumPeerMain
 * 這里輸入編號 3,讓 Arthas 關聯(lián)到啟動類為 com.....Consumer 的 Java 進程上
 * 3. 由于 Demo 項目中只有一個服務接口,因此此接口的代理類類名為 proxy0,此時使用 sc 命令搜索這個類名。
 *    $ sc *.proxy0
 *    com.alibaba.dubbo.common.bytecode.proxy0
 * 4. 使用 jad 命令反編譯 com.alibaba.dubbo.common.bytecode.proxy0
 *    $ jad com.alibaba.dubbo.common.bytecode.proxy0
 * 更多使用方法請參考 Arthas 官方文檔:
 *   https://alibaba.github.io/arthas/quick-start.html
public class proxy0 implements ClassGenerator.DC, EchoService, DemoService {
    // 方法數(shù)組
    public static Method[] methods;
    private InvocationHandler handler;

    public proxy0(InvocationHandler invocationHandler) {
        this.handler = invocationHandler;

    public proxy0() {

    public String sayHello(String string) {
        // 將參數(shù)存儲到 Object 數(shù)組中
        Object[] arrobject = new Object[]{string};
        // 調用 InvocationHandler 實現(xiàn)類的 invoke 方法得到調用結果
        Object object = this.handler.invoke(this, methods[0], arrobject);
        // 返回調用結果
        return (String)object;

    /** 回聲測試方法 */
    public Object $echo(Object object) {
        Object[] arrobject = new Object[]{object};
        Object object2 = this.handler.invoke(this, methods[1], arrobject);
        return object2;

如上,代理類的邏輯比較簡單。首先將運行時參數(shù)存儲到數(shù)組中,然后調用 InvocationHandler 接口實現(xiàn)類的 invoke 方法,得到調用結果,最后將結果轉型并返回給調用方。關于代理類的邏輯就說這么多,繼續(xù)向下分析。

public class InvokerInvocationHandler implements InvocationHandler {

    private final Invoker invoker;

    public InvokerInvocationHandler(Invoker handler) {
        this.invoker = handler;

    public Object invoke(Object proxy, Method method, Object[] args) throws Throwable {
        String methodName = method.getName();
        Class[] parameterTypes = method.getParameterTypes();
        // 攔截定義在 Object 類中的方法(未被子類重寫),比如 wait/notify
        if (method.getDeclaringClass() == Object.class) {
            return method.invoke(invoker, args);
        // 如果 toString、hashCode 和 equals 等方法被子類重寫了,這里也直接調用
        if ("toString".equals(methodName) && parameterTypes.length == 0) {
            return invoker.toString();
        if ("hashCode".equals(methodName) && parameterTypes.length == 0) {
            return invoker.hashCode();
        if ("equals".equals(methodName) && parameterTypes.length == 1) {
            return invoker.equals(args[0]);
        // 將 method 和 args 封裝到 RpcInvocation 中,并執(zhí)行后續(xù)的調用
        return invoker.invoke(new RpcInvocation(method, args)).recreate();

InvokerInvocationHandler 中的 invoker 成員變量類型為 MockClusterInvoker,MockClusterInvoker 內部封裝了服務降級邏輯。下面簡單看一下:

public class MockClusterInvoker implements Invoker {
    private final Invoker invoker;
    public Result invoke(Invocation invocation) throws RpcException {
        Result result = null;

        // 獲取 mock 配置值
        String value = directory.getUrl().getMethodParameter(invocation.getMethodName(), Constants.MOCK_KEY, Boolean.FALSE.toString()).trim();
        if (value.length() == 0 || value.equalsIgnoreCase("false")) {
            // 無 mock 邏輯,直接調用其他 Invoker 對象的 invoke 方法,
            // 比如 FailoverClusterInvoker
            result = this.invoker.invoke(invocation);
        } else if (value.startsWith("force")) {
            // force:xxx 直接執(zhí)行 mock 邏輯,不發(fā)起遠程調用
            result = doMockInvoke(invocation, null);
        } else {
            // fail:xxx 表示消費方對調用服務失敗后,再執(zhí)行 mock 邏輯,不拋出異常
            try {
                // 調用其他 Invoker 對象的 invoke 方法
                result = this.invoker.invoke(invocation);
            } catch (RpcException e) {
                if (e.isBiz()) {
                    throw e;
                } else {
                    // 調用失敗,執(zhí)行 mock 邏輯
                    result = doMockInvoke(invocation, e);
        return result;
    // 省略其他方法

服務降級不是本文重點,因此這里就不分析 doMockInvoke 方法了。考慮到前文已經詳細分析過 FailoverClusterInvoker,因此本節(jié)略過 FailoverClusterInvoker,直接分析 DubboInvoker。

public abstract class AbstractInvoker implements Invoker {
    public Result invoke(Invocation inv) throws RpcException {
        if (destroyed.get()) {
            throw new RpcException("Rpc invoker for service ...");
        RpcInvocation invocation = (RpcInvocation) inv;
        // 設置 Invoker
        if (attachment != null && attachment.size() > 0) {
            // 設置 attachment
        Map contextAttachments = RpcContext.getContext().getAttachments();
        if (contextAttachments != null && contextAttachments.size() != 0) {
            // 添加 contextAttachments 到 RpcInvocation#attachment 變量中
        if (getUrl().getMethodParameter(invocation.getMethodName(), Constants.ASYNC_KEY, false)) {
            // 設置異步信息到 RpcInvocation#attachment 中
            invocation.setAttachment(Constants.ASYNC_KEY, Boolean.TRUE.toString());
        RpcUtils.attachInvocationIdIfAsync(getUrl(), invocation);

        try {
            // 抽象方法,由子類實現(xiàn)
            return doInvoke(invocation);
        } catch (InvocationTargetException e) {
            // ...
        } catch (RpcException e) {
            // ...
        } catch (Throwable e) {
            return new RpcResult(e);

    protected abstract Result doInvoke(Invocation invocation) throws Throwable;
    // 省略其他方法

上面的代碼來自 AbstractInvoker 類,其中大部分代碼用于添加信息到 RpcInvocation#attachment 變量中,添加完畢后,調用 doInvoke 執(zhí)行后續(xù)的調用。doInvoke 是一個抽象方法,需要由子類實現(xiàn),下面到 DubboInvoker 中看一下。

public class DubboInvoker extends AbstractInvoker {
    private final ExchangeClient[] clients;
    protected Result doInvoke(final Invocation invocation) throws Throwable {
        RpcInvocation inv = (RpcInvocation) invocation;
        final String methodName = RpcUtils.getMethodName(invocation);
        // 設置 path 和 version 到 attachment 中
        inv.setAttachment(Constants.PATH_KEY, getUrl().getPath());
        inv.setAttachment(Constants.VERSION_KEY, version);

        ExchangeClient currentClient;
        if (clients.length == 1) {
            // 從 clients 數(shù)組中獲取 ExchangeClient
            currentClient = clients[0];
        } else {
            currentClient = clients[index.getAndIncrement() % clients.length];
        try {
            // 獲取異步配置
            boolean isAsync = RpcUtils.isAsync(getUrl(), invocation);
            // isOneway 為 true,表示“單向”通信
            boolean isOneway = RpcUtils.isOneway(getUrl(), invocation);
            int timeout = getUrl().getMethodParameter(methodName, Constants.TIMEOUT_KEY, Constants.DEFAULT_TIMEOUT);

            // 異步無返回值
            if (isOneway) {
                boolean isSent = getUrl().getMethodParameter(methodName, Constants.SENT_KEY, false);
                // 發(fā)送請求
                currentClient.send(inv, isSent);
                // 設置上下文中的 future 為 null
                // 返回一個空的 RpcResult
                return new RpcResult();

            // 異步有返回值
            else if (isAsync) {
                // 發(fā)送請求,獲得 ResponseFuture 實例
                ResponseFuture future = currentClient.request(inv, timeout);
                // 設置 future 到上下文中
                RpcContext.getContext().setFuture(new FutureAdapter(future));
                // 暫時返回一個空結果
                return new RpcResult();

            // 同步調用
            else {
                // 發(fā)送請求,得到一個 ResponseFuture 實例,并調用該實例的 get 方法進行等待
                return (Result) currentClient.request(inv, timeout).get();
        } catch (TimeoutException e) {
            throw new RpcException(..., "Invoke remote method timeout....");
        } catch (RemotingException e) {
            throw new RpcException(..., "Failed to invoke remote method: ...");
    // 省略其他方法

上面的代碼包含了 Dubbo 對同步和異步調用的處理邏輯,搞懂了上面的代碼,會對 Dubbo 的同步和異步調用方式有更深入的了解。Dubbo 實現(xiàn)同步和異步調用比較關鍵的一點就在于由誰調用 ResponseFuture 的 get 方法。同步調用模式下,由框架自身調用 ResponseFuture 的 get 方法。異步調用模式下,則由用戶調用該方法。ResponseFuture 是一個接口,下面我們來看一下它的默認實現(xiàn)類 DefaultFuture 的源碼。

public class DefaultFuture implements ResponseFuture {
    private static final Map CHANNELS = 
        new ConcurrentHashMap();

    private static final Map FUTURES = 
        new ConcurrentHashMap();
    private final long id;
    private final Channel channel;
    private final Request request;
    private final int timeout;
    private final Lock lock = new ReentrantLock();
    private final Condition done = lock.newCondition();
    private volatile Response response;
    public DefaultFuture(Channel channel, Request request, int timeout) {
        this.channel = channel;
        this.request = request;
        // 獲取請求 id,這個 id 很重要,后面還會見到
        this.id = request.getId();
        this.timeout = timeout > 0 ? timeout : channel.getUrl().getPositiveParameter(Constants.TIMEOUT_KEY, Constants.DEFAULT_TIMEOUT);
        // 存儲  映射關系到 FUTURES 中
        FUTURES.put(id, this);
        CHANNELS.put(id, channel);
    public Object get() throws RemotingException {
        return get(timeout);

    public Object get(int timeout) throws RemotingException {
        if (timeout <= 0) {
            timeout = Constants.DEFAULT_TIMEOUT;
        if (!isDone()) {
            long start = System.currentTimeMillis();
            try {
                // 循環(huán)檢測服務提供方是否成功返回了調用結果
                while (!isDone()) {
                    // 如果調用結果尚未返回,這里等待一段時間
                    done.await(timeout, TimeUnit.MILLISECONDS);
                    // 如果調用結果成功返回,或等待超時,此時跳出 while 循環(huán),執(zhí)行后續(xù)的邏輯
                    if (isDone() || System.currentTimeMillis() - start > timeout) {
            } catch (InterruptedException e) {
                throw new RuntimeException(e);
            } finally {
            // 如果調用結果仍未返回,則拋出超時異常
            if (!isDone()) {
                throw new TimeoutException(sent > 0, channel, getTimeoutMessage(false));
        // 返回調用結果
        return returnFromResponse();
    public boolean isDone() {
        // 通過檢測 response 字段為空與否,判斷是否收到了調用結果
        return response != null;
    private Object returnFromResponse() throws RemotingException {
        Response res = response;
        if (res == null) {
            throw new IllegalStateException("response cannot be null");
        // 如果調用結果的狀態(tài)為 Response.OK,則表示調用過程正常,服務提供方成功返回了調用結果
        if (res.getStatus() == Response.OK) {
            return res.getResult();
        // 拋出異常
        if (res.getStatus() == Response.CLIENT_TIMEOUT || res.getStatus() == Response.SERVER_TIMEOUT) {
            throw new TimeoutException(res.getStatus() == Response.SERVER_TIMEOUT, channel, res.getErrorMessage());
        throw new RemotingException(channel, res.getErrorMessage());
    // 省略其他方法

如上,當服務消費者還未接收到調用結果時,用戶線程調用 get 方法會被阻塞住。同步調用模式下,框架獲得 DefaultFuture 對象后,會立即調用 get 方法進行等待。而異步模式下則是將該對象封裝到 FutureAdapter 實例中,并將 FutureAdapter 實例設置到 RpcContext 中,供用戶使用。FutureAdapter 是一個適配器,用于將 Dubbo 中的 ResponseFuture 與 JDK 中的 Future 進行適配。這樣當用戶線程調用 Future 的 get 方法時,經過 FutureAdapter 適配,最終會調用 ResponseFuture 實現(xiàn)類對象的 get 方法,也就是 DefaultFuture 的 get 方法。

到這里關于 Dubbo 幾種調用方式的代碼邏輯就分析完了,下面來分析請求數(shù)據(jù)的發(fā)送與接收,以及響應數(shù)據(jù)的發(fā)送與接收過程。

2.2 服務消費方發(fā)送請求 2.2.1 發(fā)送請求


這張圖展示了服務消費方發(fā)送請求過程的部分調用棧,略為復雜。從上圖可以看出,經過多次調用后,才將請求數(shù)據(jù)送至 Netty NioClientSocketChannel。這樣做的原因是通過 Exchange 層為框架引入 Request 和 Response 語義,這一點會在接下來的源碼分析過程中會看到。其他的就不多說了,下面開始進行分析。首先分析 ReferenceCountExchangeClient 的源碼。

final class ReferenceCountExchangeClient implements ExchangeClient {

    private final URL url;
    private final AtomicInteger referenceCount = new AtomicInteger(0);

    public ReferenceCountExchangeClient(ExchangeClient client, ConcurrentMap ghostClientMap) {
        this.client = client;
        // 引用計數(shù)自增
        this.url = client.getUrl();
        // ...

    public ResponseFuture request(Object request) throws RemotingException {
        // 直接調用被裝飾對象的同簽名方法
        return client.request(request);

    public ResponseFuture request(Object request, int timeout) throws RemotingException {
        // 直接調用被裝飾對象的同簽名方法
        return client.request(request, timeout);

    /** 引用計數(shù)自增,該方法由外部調用 */
    public void incrementAndGetCount() {
        // referenceCount 自增
    public void close(int timeout) {
        // referenceCount 自減
        if (referenceCount.decrementAndGet() <= 0) {
            if (timeout == 0) {
            } else {
            client = replaceWithLazyClient();
    // 省略部分方法

ReferenceCountExchangeClient 內部定義了一個引用計數(shù)變量 referenceCount,每當該對象被引用一次 referenceCount 都會進行自增。每當 close 方法被調用時,referenceCount 進行自減。ReferenceCountExchangeClient 內部僅實現(xiàn)了一個引用計數(shù)的功能,其他方法并無復雜邏輯,均是直接調用被裝飾對象的相關方法。所以這里就不多說了,繼續(xù)向下分析,這次是 HeaderExchangeClient。

public class HeaderExchangeClient implements ExchangeClient {

    private static final ScheduledThreadPoolExecutor scheduled = new ScheduledThreadPoolExecutor(2, new NamedThreadFactory("dubbo-remoting-client-heartbeat", true));
    private final Client client;
    private final ExchangeChannel channel;
    private ScheduledFuture heartbeatTimer;
    private int heartbeat;
    private int heartbeatTimeout;

    public HeaderExchangeClient(Client client, boolean needHeartbeat) {
        if (client == null) {
            throw new IllegalArgumentException("client == null");
        this.client = client;
        // 創(chuàng)建 HeaderExchangeChannel 對象
        this.channel = new HeaderExchangeChannel(client);
        // 以下代碼均與心跳檢測邏輯有關
        String dubbo = client.getUrl().getParameter(Constants.DUBBO_VERSION_KEY);
        this.heartbeat = client.getUrl().getParameter(Constants.HEARTBEAT_KEY, dubbo != null && dubbo.startsWith("1.0.") ? Constants.DEFAULT_HEARTBEAT : 0);
        this.heartbeatTimeout = client.getUrl().getParameter(Constants.HEARTBEAT_TIMEOUT_KEY, heartbeat * 3);
        if (heartbeatTimeout < heartbeat * 2) {
            throw new IllegalStateException("heartbeatTimeout < heartbeatInterval * 2");
        if (needHeartbeat) {
            // 開啟心跳檢測定時器

    public ResponseFuture request(Object request) throws RemotingException {
        // 直接 HeaderExchangeChannel 對象的同簽名方法
        return channel.request(request);

    public ResponseFuture request(Object request, int timeout) throws RemotingException {
        // 直接 HeaderExchangeChannel 對象的同簽名方法
        return channel.request(request, timeout);

    public void close() {
    private void doClose() {
        // 停止心跳檢測定時器

    private void startHeartbeatTimer() {
        if (heartbeat > 0) {
            heartbeatTimer = scheduled.scheduleWithFixedDelay(
                    new HeartBeatTask(new HeartBeatTask.ChannelProvider() {
                        public Collection getChannels() {
                            return Collections.singletonList(HeaderExchangeClient.this);
                    }, heartbeat, heartbeatTimeout),
                    heartbeat, heartbeat, TimeUnit.MILLISECONDS);

    private void stopHeartbeatTimer() {
        if (heartbeatTimer != null && !heartbeatTimer.isCancelled()) {
            try {
            } catch (Throwable e) {
                if (logger.isWarnEnabled()) {
                    logger.warn(e.getMessage(), e);
        heartbeatTimer = null;
    // 省略部分方法

HeaderExchangeClient 中很多方法只有一行代碼,即調用 HeaderExchangeChannel 對象的同簽名方法。那 HeaderExchangeClient 有什么用處呢?答案是封裝了一些關于心跳檢測的邏輯。心跳檢測并非本文所關注的點,因此就不多說了,繼續(xù)向下看。

final class HeaderExchangeChannel implements ExchangeChannel {
    private final Channel channel;
    HeaderExchangeChannel(Channel channel) {
        if (channel == null) {
            throw new IllegalArgumentException("channel == null");
        // 這里的 channel 指向的是 NettyClient
        this.channel = channel;
    public ResponseFuture request(Object request) throws RemotingException {
        return request(request, channel.getUrl().getPositiveParameter(Constants.TIMEOUT_KEY, Constants.DEFAULT_TIMEOUT));

    public ResponseFuture request(Object request, int timeout) throws RemotingException {
        if (closed) {
            throw new RemotingException(..., "Failed to send request ...");
        // 創(chuàng)建 Request 對象
        Request req = new Request();
        // 設置雙向通信標志為 true
        // 這里的 request 變量類型為 RpcInvocation
        // 創(chuàng)建 DefaultFuture 對象
        DefaultFuture future = new DefaultFuture(channel, req, timeout);
        try {
            // 調用 NettyClient 的 send 方法發(fā)送請求
        } catch (RemotingException e) {
            throw e;
        // 返回 DefaultFuture 對象
        return future;

到這里大家終于看到了 Request 語義了,上面的方法首先定義了一個 Request 對象,然后再將該對象傳給 NettyClient 的 send 方法,進行后續(xù)的調用。需要說明的是,NettyClient 中并未實現(xiàn) send 方法,該方法繼承自父類 AbstractPeer,下面直接分析 AbstractPeer 的代碼。

public abstract class AbstractPeer implements Endpoint, ChannelHandler {
    public void send(Object message) throws RemotingException {
        // 該方法由 AbstractClient 類實現(xiàn)
        send(message, url.getParameter(Constants.SENT_KEY, false));
    // 省略其他方法

public abstract class AbstractClient extends AbstractEndpoint implements Client {
    public void send(Object message, boolean sent) throws RemotingException {
        if (send_reconnect && !isConnected()) {
        // 獲取 Channel,getChannel 是一個抽象方法,具體由子類實現(xiàn)
        Channel channel = getChannel();
        if (channel == null || !channel.isConnected()) {
            throw new RemotingException(this, "message can not send ...");
        // 繼續(xù)向下調用
        channel.send(message, sent);
    protected abstract Channel getChannel();
    // 省略其他方法

默認情況下,Dubbo 使用 Netty 作為底層的通信框架,因此下面我們到 NettyClient 類中看一下 getChannel 方法的實現(xiàn)邏輯。

public class NettyClient extends AbstractClient {
    // 這里的 Channel 全限定名稱為 org.jboss.netty.channel.Channel
    private volatile Channel channel;

    protected com.alibaba.dubbo.remoting.Channel getChannel() {
        Channel c = channel;
        if (c == null || !c.isConnected())
            return null;
        // 獲取一個 NettyChannel 類型對象
        return NettyChannel.getOrAddChannel(c, getUrl(), this);

final class NettyChannel extends AbstractChannel {

    private static final ConcurrentMap channelMap = 
        new ConcurrentHashMap();

    private final org.jboss.netty.channel.Channel channel;
    /** 私有構造方法 */
    private NettyChannel(org.jboss.netty.channel.Channel channel, URL url, ChannelHandler handler) {
        super(url, handler);
        if (channel == null) {
            throw new IllegalArgumentException("netty channel == null;");
        this.channel = channel;

    static NettyChannel getOrAddChannel(org.jboss.netty.channel.Channel ch, URL url, ChannelHandler handler) {
        if (ch == null) {
            return null;
        // 嘗試從集合中獲取 NettyChannel 實例
        NettyChannel ret = channelMap.get(ch);
        if (ret == null) {
            // 如果 ret = null,則創(chuàng)建一個新的 NettyChannel 實例
            NettyChannel nc = new NettyChannel(ch, url, handler);
            if (ch.isConnected()) {
                // 將  鍵值對存入 channelMap 集合中
                ret = channelMap.putIfAbsent(ch, nc);
            if (ret == null) {
                ret = nc;
        return ret;

獲取到 NettyChannel 實例后,即可進行后續(xù)的調用。下面看一下 NettyChannel 的 send 方法。

public void send(Object message, boolean sent) throws RemotingException {
    super.send(message, sent);

    boolean success = true;
    int timeout = 0;
    try {
        // 發(fā)送消息(包含請求和響應消息)
        ChannelFuture future = channel.write(message);
        // sent 的值源于  中 sent 的配置值,有兩種配置值:
        //   1. true: 等待消息發(fā)出,消息發(fā)送失敗將拋出異常
        //   2. false: 不等待消息發(fā)出,將消息放入 IO 隊列,即刻返回
        // 默認情況下 sent = false;
        if (sent) {
            timeout = getUrl().getPositiveParameter(Constants.TIMEOUT_KEY, Constants.DEFAULT_TIMEOUT);
            // 等待消息發(fā)出,若在規(guī)定時間沒能發(fā)出,success 會被置為 false
            success = future.await(timeout);
        Throwable cause = future.getCause();
        if (cause != null) {
            throw cause;
    } catch (Throwable e) {
        throw new RemotingException(this, "Failed to send message ...");

    // 若 success 為 false,這里拋出異常
    if (!success) {
        throw new RemotingException(this, "Failed to send message ...");

經歷多次調用,到這里請求數(shù)據(jù)的發(fā)送過程就結束了,過程漫長。為了便于大家閱讀代碼,這里以 DemoService 為例,將 sayHello 方法的整個調用路徑貼出來。

  —> InvokerInvocationHandler#invoke(Object, Method, Object[])
    —> MockClusterInvoker#invoke(Invocation)
      —> AbstractClusterInvoker#invoke(Invocation)
        —> FailoverClusterInvoker#doInvoke(Invocation, List>, LoadBalance)
          —> Filter#invoke(Invoker, Invocation)  // 包含多個 Filter 調用
            —> ListenerInvokerWrapper#invoke(Invocation) 
              —> AbstractInvoker#invoke(Invocation) 
                —> DubboInvoker#doInvoke(Invocation)
                  —> ReferenceCountExchangeClient#request(Object, int)
                    —> HeaderExchangeClient#request(Object, int)
                      —> HeaderExchangeChannel#request(Object, int)
                        —> AbstractPeer#send(Object)
                          —> AbstractClient#send(Object, boolean)
                            —> NettyChannel#send(Object, boolean)
                              —> NioClientSocketChannel#write(Object)

在 Netty 中,出站數(shù)據(jù)在發(fā)出之前還需要進行編碼操作,接下來我們來分析一下請求數(shù)據(jù)的編碼邏輯。

2.2.2 請求編碼

在分析請求編碼邏輯之前,我們先來看一下 Dubbo 數(shù)據(jù)包結構。

Dubbo 數(shù)據(jù)包分為消息頭和消息體,消息頭用于存儲一些元信息,比如魔數(shù)(Magic),數(shù)據(jù)包類型(Request/Response),消息體長度(Data Length)等。消息體中用于存儲具體的調用消息,比如方法名稱,參數(shù)列表等。下面簡單列舉一下消息頭的內容。

偏移量(Bit) 字段 取值
0 ~ 7 魔數(shù)高位 0xda00
8 ~ 15 魔數(shù)低位 0xbb
16 數(shù)據(jù)包類型 0 - Response, 1 - Request
17 調用方式 僅在第16位被設為1的情況下有效,0 - 單向調用,1 - 雙向調用
18 事件標識 0 - 當前數(shù)據(jù)包是請求或響應包,1 - 當前數(shù)據(jù)包是心跳包
19 ~ 23 序列化器編號 2 - Hessian2Serialization
3 - JavaSerialization
4 - CompactedJavaSerialization
6 - FastJsonSerialization
7 - NativeJavaSerialization
8 - KryoSerialization
9 - FstSerialization
24 ~ 31 狀態(tài) 20 - OK
32 ~ 95 請求編號 共8字節(jié),運行時生成
96 ~ 127 消息體長度 運行時計算

了解了 Dubbo 數(shù)據(jù)包格式,接下來我們就可以探索編碼過程了。這次我們開門見山,直接分析編碼邏輯所在類。如下:

public class ExchangeCodec extends TelnetCodec {

    // 消息頭長度
    protected static final int HEADER_LENGTH = 16;
    // 魔數(shù)內容
    protected static final short MAGIC = (short) 0xdabb;
    protected static final byte MAGIC_HIGH = Bytes.short2bytes(MAGIC)[0];
    protected static final byte MAGIC_LOW = Bytes.short2bytes(MAGIC)[1];
    protected static final byte FLAG_REQUEST = (byte) 0x80;
    protected static final byte FLAG_TWOWAY = (byte) 0x40;
    protected static final byte FLAG_EVENT = (byte) 0x20;
    protected static final int SERIALIZATION_MASK = 0x1f;
    private static final Logger logger = LoggerFactory.getLogger(ExchangeCodec.class);

    public Short getMagicCode() {
        return MAGIC;

    public void encode(Channel channel, ChannelBuffer buffer, Object msg) throws IOException {
        if (msg instanceof Request) {
            // 對 Request 對象進行編碼
            encodeRequest(channel, buffer, (Request) msg);
        } else if (msg instanceof Response) {
            // 對 Response 對象進行編碼,后面分析
            encodeResponse(channel, buffer, (Response) msg);
        } else {
            super.encode(channel, buffer, msg);

    protected void encodeRequest(Channel channel, ChannelBuffer buffer, Request req) throws IOException {
        Serialization serialization = getSerialization(channel);

        // 創(chuàng)建消息頭字節(jié)數(shù)組,長度為 16
        byte[] header = new byte[HEADER_LENGTH];

        // 設置魔數(shù)
        Bytes.short2bytes(MAGIC, header);

        // 設置數(shù)據(jù)包類型(Request/Response)和序列化器編號
        header[2] = (byte) (FLAG_REQUEST | serialization.getContentTypeId());

        // 設置通信方式(單向/雙向)
        if (req.isTwoWay()) {
            header[2] |= FLAG_TWOWAY;
        // 設置事件標識
        if (req.isEvent()) {
            header[2] |= FLAG_EVENT;

        // 設置請求編號,8個字節(jié),從第4個字節(jié)開始設置
        Bytes.long2bytes(req.getId(), header, 4);

        // 獲取 buffer 當前的寫位置
        int savedWriteIndex = buffer.writerIndex();
        // 更新 writerIndex,為消息頭預留 16 個字節(jié)的空間
        buffer.writerIndex(savedWriteIndex + HEADER_LENGTH);
        ChannelBufferOutputStream bos = new ChannelBufferOutputStream(buffer);
        // 創(chuàng)建序列化器,比如 Hessian2ObjectOutput
        ObjectOutput out = serialization.serialize(channel.getUrl(), bos);
        if (req.isEvent()) {
            // 對事件數(shù)據(jù)進行序列化操作
            encodeEventData(channel, out, req.getData());
        } else {
            // 對請求數(shù)據(jù)進行序列化操作
            encodeRequestData(channel, out, req.getData(), req.getVersion());
        if (out instanceof Cleanable) {
            ((Cleanable) out).cleanup();
        // 獲取寫入的字節(jié)數(shù),也就是消息體長度
        int len = bos.writtenBytes();
        checkPayload(channel, len);

        // 將消息體長度寫入到消息頭中
        Bytes.int2bytes(len, header, 12);

        // 將 buffer 指針移動到 savedWriteIndex,為寫消息頭做準備
        // 從 savedWriteIndex 下標處寫入消息頭
        // 設置新的 writerIndex,writerIndex = 原寫下標 + 消息頭長度 + 消息體長度
        buffer.writerIndex(savedWriteIndex + HEADER_LENGTH + len);
    // 省略其他方法

以上就是請求對象的編碼過程,該過程首先會通過位運算將消息頭寫入到 header 數(shù)組中。然后對 Request 對象的 data 字段執(zhí)行序列化操作,序列化后的數(shù)據(jù)最終會存儲到 ChannelBuffer 中。序列化操作執(zhí)行完后,可得到數(shù)據(jù)序列化后的長度 len,緊接著將 len 寫入到 header 指定位置處。最后再將消息頭字節(jié)數(shù)組 header 寫入到 ChannelBuffer 中,整個編碼過程就結束了。本節(jié)的最后,我們再來看一下 Request 對象的 data 字段序列化過程,也就是 encodeRequestData 方法的邏輯,如下:

public class DubboCodec extends ExchangeCodec implements Codec2 {
    protected void encodeRequestData(Channel channel, ObjectOutput out, Object data, String version) throws IOException {
        RpcInvocation inv = (RpcInvocation) data;

        // 依次序列化 dubbo version、path、version

        // 序列化調用方法名
        // 將參數(shù)類型轉換為字符串,并進行序列化
        Object[] args = inv.getArguments();
        if (args != null)
            for (int i = 0; i < args.length; i++) {
                // 對運行時參數(shù)進行序列化
                out.writeObject(encodeInvocationArgument(channel, inv, i));
        // 序列化 attachments


2.3 服務提供方接收請求

前面說過,默認情況下 Dubbo 使用 Netty 作為底層的通信框架。Netty 檢測到有數(shù)據(jù)入站后,首先會通過解碼器對數(shù)據(jù)進行解碼,并將解碼后的數(shù)據(jù)傳遞給下一個入站處理器的指定方法。所以在進行后續(xù)的分析之前,我們先來看一下數(shù)據(jù)解碼過程。

2.3.1 請求解碼


public class ExchangeCodec extends TelnetCodec {
    public Object decode(Channel channel, ChannelBuffer buffer) throws IOException {
        int readable = buffer.readableBytes();
        // 創(chuàng)建消息頭字節(jié)數(shù)組
        byte[] header = new byte[Math.min(readable, HEADER_LENGTH)];
        // 讀取消息頭數(shù)據(jù)
        // 調用重載方法進行后續(xù)解碼工作
        return decode(channel, buffer, readable, header);

    protected Object decode(Channel channel, ChannelBuffer buffer, int readable, byte[] header) throws IOException {
        // 檢查魔數(shù)是否相等
        if (readable > 0 && header[0] != MAGIC_HIGH
                || readable > 1 && header[1] != MAGIC_LOW) {
            int length = header.length;
            if (header.length < readable) {
                header = Bytes.copyOf(header, readable);
                buffer.readBytes(header, length, readable - length);
            for (int i = 1; i < header.length - 1; i++) {
                if (header[i] == MAGIC_HIGH && header[i + 1] == MAGIC_LOW) {
                    buffer.readerIndex(buffer.readerIndex() - header.length + i);
                    header = Bytes.copyOf(header, i);
            // 通過 telnet 命令行發(fā)送的數(shù)據(jù)包不包含消息頭,所以這里
            // 調用 TelnetCodec 的 decode 方法對數(shù)據(jù)包進行解碼
            return super.decode(channel, buffer, readable, header);
        // 檢測可讀數(shù)據(jù)量是否少于消息頭長度,若小于則立即返回 DecodeResult.NEED_MORE_INPUT
        if (readable < HEADER_LENGTH) {
            return DecodeResult.NEED_MORE_INPUT;

        // 從消息頭中獲取消息體長度
        int len = Bytes.bytes2int(header, 12);
        // 檢測消息體長度是否超出限制,超出則拋出異常
        checkPayload(channel, len);

        int tt = len + HEADER_LENGTH;
        // 檢測可讀的字節(jié)數(shù)是否小于實際的字節(jié)數(shù)
        if (readable < tt) {
            return DecodeResult.NEED_MORE_INPUT;
        ChannelBufferInputStream is = new ChannelBufferInputStream(buffer, len);

        try {
            // 繼續(xù)進行解碼工作
            return decodeBody(channel, is, header);
        } finally {
            if (is.available() > 0) {
                try {
                } catch (IOException e) {
                    logger.warn(e.getMessage(), e);

上面方法通過檢測消息頭中的魔數(shù)是否與規(guī)定的魔數(shù)相等,提前攔截掉非常規(guī)數(shù)據(jù)包,比如通過 telnet 命令行發(fā)出的數(shù)據(jù)包。接著再對消息體長度,以及可讀字節(jié)數(shù)進行檢測。最后調用 decodeBody 方法進行后續(xù)的解碼工作,ExchangeCodec 中實現(xiàn)了 decodeBody 方法,但因其子類 DubboCodec 覆寫了該方法,所以在運行時 DubboCodec 中的 decodeBody 方法會被調用。下面我們來看一下該方法的代碼。

public class DubboCodec extends ExchangeCodec implements Codec2 {

    protected Object decodeBody(Channel channel, InputStream is, byte[] header) throws IOException {
        // 獲取消息頭中的第三個字節(jié),并通過邏輯與運算得到序列化器編號
        byte flag = header[2], proto = (byte) (flag & SERIALIZATION_MASK);
        Serialization s = CodecSupport.getSerialization(channel.getUrl(), proto);
        // 獲取調用編號
        long id = Bytes.bytes2long(header, 4);
        // 通過邏輯與運算得到調用類型,0 - Response,1 - Request
        if ((flag & FLAG_REQUEST) == 0) {
            // 對響應結果進行解碼,得到 Response 對象。這個非本節(jié)內容,后面再分析
            // ...
        } else {
            // 創(chuàng)建 Request 對象
            Request req = new Request(id);
            // 通過邏輯與運算得到通信方式,并設置到 Request 對象中
            req.setTwoWay((flag & FLAG_TWOWAY) != 0);
            // 通過位運算檢測數(shù)據(jù)包是否為事件類型
            if ((flag & FLAG_EVENT) != 0) {
                // 設置心跳事件到 Request 對象中
            try {
                Object data;
                if (req.isHeartbeat()) {
                    // 對心跳包進行解碼,該方法已被標注為廢棄
                    data = decodeHeartbeatData(channel, deserialize(s, channel.getUrl(), is));
                } else if (req.isEvent()) {
                    // 對事件數(shù)據(jù)進行解碼
                    data = decodeEventData(channel, deserialize(s, channel.getUrl(), is));
                } else {
                    DecodeableRpcInvocation inv;
                    // 根據(jù) url 參數(shù)判斷是否在 IO 線程上對消息體進行解碼
                    if (channel.getUrl().getParameter(
                            Constants.DEFAULT_DECODE_IN_IO_THREAD)) {
                        inv = new DecodeableRpcInvocation(channel, req, is, proto);
                        // 在當前線程,也就是 IO 線程上進行后續(xù)的解碼工作。此工作完成后,可將
                        // 調用方法名、attachment、以及調用參數(shù)解析出來
                    } else {
                        // 僅創(chuàng)建 DecodeableRpcInvocation 對象,但不在當前線程上執(zhí)行解碼邏輯
                        inv = new DecodeableRpcInvocation(channel, req,
                                new UnsafeByteArrayInputStream(readMessageData(is)), proto);
                    data = inv;
                // 設置 data 到 Request 對象中
            } catch (Throwable t) {
                // 若解碼過程中出現(xiàn)異常,則將 broken 字段設為 true,
                // 并將異常對象設置到 Reqeust 對象中
            return req;

如上,decodeBody 對部分字段進行了解碼,并將解碼得到的字段封裝到 Request 中。隨后會調用 DecodeableRpcInvocation 的 decode 方法進行后續(xù)的解碼工作。此工作完成后,可將調用方法名、attachment、以及運行時調用參數(shù)解析出來。下面我們來看一下 DecodeableRpcInvocation 的 decode 方法邏輯。

public class DecodeableRpcInvocation extends RpcInvocation implements Codec, Decodeable {
    public Object decode(Channel channel, InputStream input) throws IOException {
        ObjectInput in = CodecSupport.getSerialization(channel.getUrl(), serializationType)
                .deserialize(channel.getUrl(), input);

        // 通過反序列化得到 dubbo version,并保存到 attachments 變量中
        String dubboVersion = in.readUTF();
        setAttachment(Constants.DUBBO_VERSION_KEY, dubboVersion);

        // 通過反序列化得到 path,version,并保存到 attachments 變量中
        setAttachment(Constants.PATH_KEY, in.readUTF());
        setAttachment(Constants.VERSION_KEY, in.readUTF());

        // 通過反序列化得到調用方法名
        try {
            Object[] args;
            Class[] pts;
            // 通過反序列化得到參數(shù)類型字符串,比如 Ljava/lang/String;
            String desc = in.readUTF();
            if (desc.length() == 0) {
                pts = DubboCodec.EMPTY_CLASS_ARRAY;
                args = DubboCodec.EMPTY_OBJECT_ARRAY;
            } else {
                // 將 desc 解析為參數(shù)類型數(shù)組
                pts = ReflectUtils.desc2classArray(desc);
                args = new Object[pts.length];
                for (int i = 0; i < args.length; i++) {
                    try {
                        // 解析運行時參數(shù)
                        args[i] = in.readObject(pts[i]);
                    } catch (Exception e) {
                        if (log.isWarnEnabled()) {
                            log.warn("Decode argument failed: " + e.getMessage(), e);
            // 設置參數(shù)類型數(shù)組

            // 通過反序列化得到原 attachments 的內容
            Map map = (Map) in.readObject(Map.class);
            if (map != null && map.size() > 0) {
                Map attachment = getAttachments();
                if (attachment == null) {
                    attachment = new HashMap();
                // 將 map 與當前對象中的 attachment 集合進行融合
            // 對 callback 類型的參數(shù)進行處理
            for (int i = 0; i < args.length; i++) {
                args[i] = decodeInvocationArgument(channel, this, pts, i, args[i]);

            // 設置參數(shù)列表

        } catch (ClassNotFoundException e) {
            throw new IOException(StringUtils.toString("Read invocation data failed.", e));
        } finally {
            if (in instanceof Cleanable) {
                ((Cleanable) in).cleanup();
        return this;

上面的方法通過反序列化將諸如 path、version、調用方法名、參數(shù)列表等信息依次解析出來,并設置到相應的字段中,最終得到一個具有完整調用信息的 DecodeableRpcInvocation 對象。

到這里,請求數(shù)據(jù)解碼的過程就分析完了。此時我們得到了一個 Request 對象,這個對象會被傳送到下一個入站處理器中,我們繼續(xù)往下看。

2.3.2 調用服務

解碼器將數(shù)據(jù)包解析成 Request 對象后,NettyHandler 的 messageReceived 方法緊接著會收到這個對象,并將這個對象繼續(xù)向下傳遞。這期間該對象會被依次傳遞給 NettyServer、MultiMessageHandler、HeartbeatHandler 以及 AllChannelHandler。最后由 AllChannelHandler 將該對象封裝到 Runnable 實現(xiàn)類對象中,并將 Runnable 放入線程池中執(zhí)行后續(xù)的調用邏輯。整個調用棧如下:

NettyHandler#messageReceived(ChannelHandlerContext, MessageEvent)
  —> AbstractPeer#received(Channel, Object)
    —> MultiMessageHandler#received(Channel, Object)
      —> HeartbeatHandler#received(Channel, Object)
        —> AllChannelHandler#received(Channel, Object)
          —> ExecutorService#execute(Runnable)    // 由線程池執(zhí)行后續(xù)的調用邏輯


public class NettyHandler extends SimpleChannelHandler {
    private final Map channels = new ConcurrentHashMap();

    private final URL url;

    private final ChannelHandler handler;
    public NettyHandler(URL url, ChannelHandler handler) {
        if (url == null) {
            throw new IllegalArgumentException("url == null");
        if (handler == null) {
            throw new IllegalArgumentException("handler == null");
        this.url = url;
        // 這里的 handler 類型為 NettyServer
        this.handler = handler;
    public void messageReceived(ChannelHandlerContext ctx, MessageEvent e) throws Exception {
        // 獲取 NettyChannel
        NettyChannel channel = NettyChannel.getOrAddChannel(ctx.getChannel(), url, handler);
        try {
            // 繼續(xù)向下調用
            handler.received(channel, e.getMessage());
        } finally {

如上,NettyHandler 中的 messageReceived 邏輯比較簡單。首先根據(jù)一些信息獲取 NettyChannel 實例,然后將 NettyChannel 實例以及 Request 對象向下傳遞。下面再來看看 AllChannelHandler 的邏輯,在詳細分析代碼之前,我們先來了解一下 Dubbo 中的線程派發(fā)模型。 線程派發(fā)模型

Dubbo 將底層通信框架中接收請求的線程稱為 IO 線程。如果一些事件處理邏輯可以很快執(zhí)行完,比如只在內存打一個標記,此時直接在 IO 線程上執(zhí)行該段邏輯即可。但如果事件的處理邏輯比較耗時,比如該段邏輯會發(fā)起數(shù)據(jù)庫查詢或者 HTTP 請求。此時我們就不應該讓事件處理邏輯在 IO 線程上執(zhí)行,而是應該派發(fā)到線程池中去執(zhí)行。原因也很簡單,IO 線程主要用于接收請求,如果 IO 線程被占滿,將導致它不能接收新的請求。

以上就是線程派發(fā)的背景,下面我們再來通過 Dubbo 調用圖,看一下線程派發(fā)器所處的位置。

如上圖,紅框中的 Dispatcher 就是線程派發(fā)器。需要說明的是,Dispatcher 真實的職責創(chuàng)建具有線程派發(fā)能力的 ChannelHandler,比如 AllChannelHandler、MessageOnlyChannelHandler 和 ExecutionChannelHandler 等,其本身并不具備線程派發(fā)能力。Dubbo 支持 5 種不同的線程派發(fā)策略,下面通過一個表格列舉一下。

策略 用途
all 所有消息都派發(fā)到線程池,包括請求,響應,連接事件,斷開事件等
direct 所有消息都不派發(fā)到線程池,全部在 IO 線程上直接執(zhí)行
message 只有請求響應消息派發(fā)到線程池,其它消息均在 IO 線程上執(zhí)行
execution 只有請求消息派發(fā)到線程池,不含響應。其它消息均在 IO 線程上執(zhí)行
connection 在 IO 線程上,將連接斷開事件放入隊列,有序逐個執(zhí)行,其它消息派發(fā)到線程池

默認配置下,Dubbo 使用 all 派發(fā)策略,即將所有的消息都派發(fā)到線程池中。下面我們來分析一下 AllChannelHandler 的代碼。

public class AllChannelHandler extends WrappedChannelHandler {

    public AllChannelHandler(ChannelHandler handler, URL url) {
        super(handler, url);

    /** 處理連接事件 */
    public void connected(Channel channel) throws RemotingException {
        // 獲取線程池
        ExecutorService cexecutor = getExecutorService();
        try {
            // 將連接事件派發(fā)到線程池中處理
            cexecutor.execute(new ChannelEventRunnable(channel, handler, ChannelState.CONNECTED));
        } catch (Throwable t) {
            throw new ExecutionException(..., " error when process connected event .", t);

    /** 處理斷開事件 */
    public void disconnected(Channel channel) throws RemotingException {
        ExecutorService cexecutor = getExecutorService();
        try {
            cexecutor.execute(new ChannelEventRunnable(channel, handler, ChannelState.DISCONNECTED));
        } catch (Throwable t) {
            throw new ExecutionException(..., "error when process disconnected event .", t);

    /** 處理請求和響應消息,這里的 message 變量類型可能是 Request,也可能是 Response */
    public void received(Channel channel, Object message) throws RemotingException {
        ExecutorService cexecutor = getExecutorService();
        try {
            // 將請求和響應消息派發(fā)到線程池中處理
            cexecutor.execute(new ChannelEventRunnable(channel, handler, ChannelState.RECEIVED, message));
        } catch (Throwable t) {
            if(message instanceof Request && t instanceof RejectedExecutionException){
                Request request = (Request)message;
                // 如果通信方式為雙向通信,此時將 Server side ... threadpool is exhausted 
                // 錯誤信息封裝到 Response 中,并返回給服務消費方。
                    String msg = "Server side(" + url.getIp() + "," + url.getPort() 
                        + ") threadpool is exhausted ,detail msg:" + t.getMessage();
                    Response response = new Response(request.getId(), request.getVersion());
                    // 返回包含錯誤信息的 Response 對象
            throw new ExecutionException(..., " error when process received event .", t);

    /** 處理異常信息 */
    public void caught(Channel channel, Throwable exception) throws RemotingException {
        ExecutorService cexecutor = getExecutorService();
        try {
            cexecutor.execute(new ChannelEventRunnable(channel, handler, ChannelState.CAUGHT, exception));
        } catch (Throwable t) {
            throw new ExecutionException(..., "error when process caught event ...");

如上,請求對象會被封裝 ChannelEventRunnable 中,ChannelEventRunnable 將會是服務調用過程的新起點。所以接下來我們以 ChannelEventRunnable 為起點向下探索。 調用服務

本小節(jié),我們從 ChannelEventRunnable 開始分析,該類的主要代碼如下:

public class ChannelEventRunnable implements Runnable {
    private final ChannelHandler handler;
    private final Channel channel;
    private final ChannelState state;
    private final Throwable exception;
    private final Object message;
    public void run() {
        // 檢測通道狀態(tài),對于請求或響應消息,此時 state = RECEIVED
        if (state == ChannelState.RECEIVED) {
            try {
                // 將 channel 和 message 傳給 ChannelHandler 對象,進行后續(xù)的調用
                handler.received(channel, message);
            } catch (Exception e) {
                logger.warn("... operation error, channel is ... message is ...");
        // 其他消息類型通過 switch 進行處理
        else {
            switch (state) {
            case CONNECTED:
                try {
                } catch (Exception e) {
                    logger.warn("... operation error, channel is ...");
            case DISCONNECTED:
                // ...
            case SENT:
                // ...
            case CAUGHT:
                // ...
                logger.warn("unknown state: " + state + ", message is " + message);


如上,請求和響應消息出現(xiàn)頻率明顯比其他類型消息高,所以這里對該類型的消息進行了針對性判斷。ChannelEventRunnable 僅是一個中轉站,它的 run 方法中并不包含具體的調用邏輯,僅用于將參數(shù)傳給其他 ChannelHandler 對象進行處理,該對象類型為 DecodeHandler。

public class DecodeHandler extends AbstractChannelHandlerDelegate {

    public DecodeHandler(ChannelHandler handler) {

    public void received(Channel channel, Object message) throws RemotingException {
        if (message instanceof Decodeable) {
            // 對 Decodeable 接口實現(xiàn)類對象進行解碼

        if (message instanceof Request) {
            // 對 Request 的 data 字段進行解碼
            decode(((Request) message).getData());

        if (message instanceof Response) {
            // 對 Request 的 result 字段進行解碼
            decode(((Response) message).getResult());

        // 執(zhí)行后續(xù)邏輯
        handler.received(channel, message);

    private void decode(Object message) {
        // Decodeable 接口目前有兩個實現(xiàn)類,
        // 分別為 DecodeableRpcInvocation 和 DecodeableRpcResult
        if (message != null && message instanceof Decodeable) {
            try {
                // 執(zhí)行解碼邏輯
                ((Decodeable) message).decode();
            } catch (Throwable e) {
                if (log.isWarnEnabled()) {
                    log.warn("Call Decodeable.decode failed: " + e.getMessage(), e);

DecodeHandler 主要是包含了一些解碼邏輯。2.2.1 節(jié)分析請求解碼時說過,請求解碼可在 IO 線程上執(zhí)行,也可在線程池中執(zhí)行,這個取決于運行時配置。DecodeHandler 存在的意義就是保證請求或響應對象可在線程池中被解碼。解碼完畢后,完全解碼后的 Request 對象會繼續(xù)向后傳遞,下一站是 HeaderExchangeHandler。

public class HeaderExchangeHandler implements ChannelHandlerDelegate {

    private final ExchangeHandler handler;

    public HeaderExchangeHandler(ExchangeHandler handler) {
        if (handler == null) {
            throw new IllegalArgumentException("handler == null");
        this.handler = handler;

    public void received(Channel channel, Object message) throws RemotingException {
        channel.setAttribute(KEY_READ_TIMESTAMP, System.currentTimeMillis());
        ExchangeChannel exchangeChannel = HeaderExchangeChannel.getOrAddChannel(channel);
        try {
            // 處理請求對象
            if (message instanceof Request) {
                Request request = (Request) message;
                if (request.isEvent()) {
                    // 處理事件
                    handlerEvent(channel, request);
                // 處理普通的請求
                else {
                    // 雙向通信
                    if (request.isTwoWay()) {
                        // 向后調用服務,并得到調用結果
                        Response response = handleRequest(exchangeChannel, request);
                        // 將調用結果返回給服務消費端
                    // 如果是單向通信,僅向后調用指定服務即可,無需返回調用結果
                    else {
                        handler.received(exchangeChannel, request.getData());
            // 處理響應對象,服務消費方會執(zhí)行此處邏輯,后面分析
            else if (message instanceof Response) {
                handleResponse(channel, (Response) message);
            } else if (message instanceof String) {
                // telnet 相關,忽略
            } else {
                handler.received(exchangeChannel, message);
        } finally {

    Response handleRequest(ExchangeChannel channel, Request req) throws RemotingException {
        Response res = new Response(req.getId(), req.getVersion());
        // 檢測請求是否合法,不合法則返回狀態(tài)碼為 BAD_REQUEST 的響應
        if (req.isBroken()) {
            Object data = req.getData();

            String msg;
            if (data == null)
                msg = null;
            else if
                (data instanceof Throwable) msg = StringUtils.toString((Throwable) data);
                msg = data.toString();
            res.setErrorMessage("Fail to decode request due to: " + msg);
            // 設置 BAD_REQUEST 狀態(tài)

            return res;
        // 獲取 data 字段值,也就是 RpcInvocation 對象
        Object msg = req.getData();
        try {
            // 繼續(xù)向下調用
            Object result = handler.reply(channel, msg);
            // 設置 OK 狀態(tài)碼
            // 設置調用結果
        } catch (Throwable e) {
            // 若調用過程出現(xiàn)異常,則設置 SERVICE_ERROR,表示服務端異常




  • Dubbo 源碼分析 - 集群容錯之 Cluster

    摘要:集群用途是將多個服務提供者合并為一個,并將這個暴露給服務消費者。比如發(fā)請求,接受服務提供者返回的數(shù)據(jù)等。如果包含,表明對應的服務提供者可能因網絡原因未能成功提供服務。如果不包含,此時還需要進行可用性檢測,比如檢測服務提供者網絡連通性等。 1.簡介 為了避免單點故障,現(xiàn)在的應用至少會部署在兩臺服務器上。對于一些負載比較高的服務,會部署更多臺服務器。這樣,同一環(huán)境下的服務提供者數(shù)量會大于1...

    denson 評論0 收藏0
  • Dubbo 源碼分析 - 服務導出

    摘要:支持兩種服務導出方式,分別延遲導出和立即導出。本文打算分析服務延遲導出過程,因此不會分析方法。服務導出之前,要進行對一系列的配置進行檢查,以及生成。返回時,表示需要延遲導出。賽程預告,下一站是服務導出的前置工作。 1.服務導出過程 本篇文章,我們來研究一下 Dubbo 導出服務的過程。Dubbo 服務導出過程始于 Spring 容器發(fā)布刷新事件,Dubbo 在接收到事件后,會立即執(zhí)行服...

    劉玉平 評論0 收藏0
  • dubbo源碼解析(四十五)服務引用過程

    摘要:服務引用過程目標從源碼的角度分析服務引用過程。并保留服務提供者的部分配置,比如版本,,時間戳等最后將合并后的配置設置為查詢字符串中。的可以參考源碼解析二十三遠程調用的一的源碼分析。 dubbo服務引用過程 目標:從源碼的角度分析服務引用過程。 前言 前面服務暴露過程的文章講解到,服務引用有兩種方式,一種就是直連,也就是直接指定服務的地址來進行引用,這種方式更多的時候被用來做服務測試,不...

    xiaowugui666 評論0 收藏0
  • dubbo源碼解析——概要篇

    摘要:服務提供者代碼上面這個類會被封裝成為一個實例,并新生成一個實例。這樣當網絡通訊層收到一個請求后,會找到對應的實例,并調用它所對應的實例,從而真正調用了服務提供者的代碼。 這次源碼解析借鑒《肥朝》前輩的dubbo源碼解析,進行源碼學習??偨Y起來就是先總體,后局部.也就是先把需要注意的概念先拋出來,把整體架構圖先畫出來.讓讀者拿著地圖跟著我的腳步,并且每一步我都提醒,現(xiàn)在我們在哪,我們下一...

    Meathill 評論0 收藏0
  • dubbo源碼解析(四十四)服務暴露過程

    摘要:服務暴露過程目標從源碼的角度分析服務暴露過程。導出服務,包含暴露服務到本地,和暴露服務到遠程兩個過程。其中服務暴露的第八步已經沒有了。將泛化調用版本號或者等信息加入獲得服務暴露地址和端口號,利用內數(shù)據(jù)組裝成。 dubbo服務暴露過程 目標:從源碼的角度分析服務暴露過程。 前言 本來這一篇一個寫異步化改造的內容,但是最近我一直在想,某一部分的優(yōu)化改造該怎么去撰寫才能更加的讓讀者理解。我覺...

    light 評論0 收藏0


