成人国产在线小视频_日韩寡妇人妻调教在线播放_色成人www永久在线观看_2018国产精品久久_亚洲欧美高清在线30p_亚洲少妇综合一区_黄色在线播放国产_亚洲另类技巧小说校园_国产主播xx日韩_a级毛片在线免费

資訊專欄INFORMATION COLUMN

dubbo源碼解析(四十七)服務端處理請求過程

yzzz / 3429人閱讀

摘要:而存在的意義就是保證請求或響應對象可在線程池中被解碼,解碼完成后,就會分發(fā)到的。

2.7大揭秘——服務端處理請求過程
目標:從源碼的角度分析服務端接收到請求后的一系列操作,最終把客戶端需要的值返回。
前言

上一篇講到了消費端發(fā)送請求的過程,該篇就要將服務端處理請求的過程。也就是當服務端收到請求數(shù)據(jù)包后的一系列處理以及如何返回最終結果。我們也知道消費端在發(fā)送請求的時候已經(jīng)做了編碼,所以我們也需要在服務端接收到數(shù)據(jù)包后,對協(xié)議頭和協(xié)議體進行解碼。不過本篇不講解如何解碼。有興趣的可以翻翻我以前的文章,有講到關于解碼的邏輯。接下來就開始講解服務端收到請求后的邏輯。

處理過程

假設遠程通信的實現(xiàn)還是用netty4,解碼器將數(shù)據(jù)包解析成 Request 對象后,NettyHandler 的 messageReceived 方法緊接著會收到這個對象,所以第一步就是NettyServerHandler的channelRead。

(一)NettyServerHandler的channelRead

可以參考《dubbo源碼解析(十七)遠程通信——Netty4》的(三)NettyServerHandler

public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
    // 看是否在緩存中命中,如果沒有命中,則創(chuàng)建NettyChannel并且緩存。
    NettyChannel channel = NettyChannel.getOrAddChannel(ctx.channel(), url, handler);
    try {
        // 接受消息
        handler.received(channel, msg);
    } finally {
        // 如果通道不活躍或者斷掉,則從緩存中清除
        NettyChannel.removeChannelIfDisconnected(ctx.channel());
    }
}

NettyServerHandler是基于netty4實現(xiàn)的服務端通道處理實現(xiàn)類,而該方法就是用來接收請求,接下來就是執(zhí)行AbstractPeer的received。

(二)AbstractPeer的received

可以參考《dubbo源碼解析(九)遠程通信——Transport層》的(一)AbstractPeer

public void received(Channel ch, Object msg) throws RemotingException {
    // 如果通道已經(jīng)關閉,則直接返回
    if (closed) {
        return;
    }
    handler.received(ch, msg);
}

該方法比較簡單,之前也講過AbstractPeer類就做了裝飾模式中裝飾角色,只是維護了通道的正在關閉和關閉完成兩個狀態(tài)。然后到了MultiMessageHandler的received

(三)MultiMessageHandler的received

可以參考《dubbo源碼解析(九)遠程通信——Transport層》的(八)MultiMessageHandler

public void received(Channel channel, Object message) throws RemotingException {
    // 如果消息是MultiMessage類型的,也就是多消息類型
    if (message instanceof MultiMessage) {
        // 強制轉化為MultiMessage
        MultiMessage list = (MultiMessage) message;
        // 把各個消息進行發(fā)送
        for (Object obj : list) {
            handler.received(channel, obj);
        }
    } else {
        // 直接發(fā)送
        handler.received(channel, message);
    }
}

該方法也比較簡單,就是對于多消息的處理。

(四)HeartbeatHandler的received

可以參考《dubbo源碼解析(十)遠程通信——Exchange層》的(二十)HeartbeatHandler。其中就是對心跳事件做了處理。如果不是心跳請求,那么接下去走到AllChannelHandler的received。

(五)AllChannelHandler的received

可以參考《dubbo源碼解析(九)遠程通信——Transport層》的(十一)AllChannelHandler。該類處理的是連接、斷開連接、捕獲異常以及接收到的所有消息都分發(fā)到線程池。所以這里的received方法就是把請求分發(fā)到線程池,讓線程池去執(zhí)行該請求。

還記得我在之前文章里面講到到Dispatcher接口嗎,它是一個線程派發(fā)器。分別有五個實現(xiàn):

Dispatcher實現(xiàn)類 對應的handler 用途
AllDispatcher AllChannelHandler 所有消息都派發(fā)到線程池,包括請求,響應,連接事件,斷開事件等
ConnectionOrderedDispatcher ConnectionOrderedChannelHandler 在 IO 線程上,將連接和斷開事件放入隊列,有序逐個執(zhí)行,其它消息派發(fā)到線程池
DirectDispatcher 所有消息都不派發(fā)到線程池,全部在 IO 線程上直接執(zhí)行
ExecutionDispatcher ExecutionChannelHandler 只有請求消息派發(fā)到線程池,不含響應。其它消息均在 IO 線程上執(zhí)行
MessageOnlyDispatcher MessageOnlyChannelHandler 只有請求和響應消息派發(fā)到線程池,其它消息均在 IO 線程上執(zhí)行

這些Dispatcher的實現(xiàn)類以及對應的Handler都可以在《dubbo源碼解析(九)遠程通信——Transport層》中查看相關實現(xiàn)。dubbo默認all為派發(fā)策略。所以我在這里講了AllChannelHandler的received。把消息送到線程池后,可以看到首先會創(chuàng)建一個ChannelEventRunnable實體。那么接下來就是線程接收并且執(zhí)行任務了。

(六)ChannelEventRunnable的run

ChannelEventRunnable實現(xiàn)了Runnable接口,主要是用來接收消息事件,并且根據(jù)事件的種類來分別執(zhí)行不同的操作。來看看它的run方法:

public void run() {
    // 如果是接收的消息
    if (state == ChannelState.RECEIVED) {
        try {
            // 直接調(diào)用下一個received
            handler.received(channel, message);
        } catch (Exception e) {
            logger.warn("ChannelEventRunnable handle " + state + " operation error, channel is " + channel
                    + ", message is " + message, e);
        }
    } else {
        switch (state) {
            //如果是連接事件請求
        case CONNECTED:
            try {
                // 執(zhí)行連接
                handler.connected(channel);
            } catch (Exception e) {
                logger.warn("ChannelEventRunnable handle " + state + " operation error, channel is " + channel, e);
            }
            break;
            // 如果是斷開連接事件請求
        case DISCONNECTED:
            try {
                // 執(zhí)行斷開連接
                handler.disconnected(channel);
            } catch (Exception e) {
                logger.warn("ChannelEventRunnable handle " + state + " operation error, channel is " + channel, e);
            }
            break;
            // 如果是發(fā)送消息
        case SENT:
            try {
                // 執(zhí)行發(fā)送消息
                handler.sent(channel, message);
            } catch (Exception e) {
                logger.warn("ChannelEventRunnable handle " + state + " operation error, channel is " + channel
                        + ", message is " + message, e);
            }
            break;
            // 如果是異常
        case CAUGHT:
            try {
                // 執(zhí)行異常捕獲
                handler.caught(channel, exception);
            } catch (Exception e) {
                logger.warn("ChannelEventRunnable handle " + state + " operation error, channel is " + channel
                        + ", message is: " + message + ", exception is " + exception, e);
            }
            break;
        default:
            logger.warn("unknown state: " + state + ", message is " + message);
        }
    }

}

可以看到把消息分為了幾種類別,因為請求和響應消息出現(xiàn)頻率明顯比其他類型消息高,也就是RECEIVED,所以多帶帶先做處理,根據(jù)不同的類型的消息,會被執(zhí)行不同的邏輯,我們這里主要看state為RECEIVED的,那么如果是RECEIVED,則會執(zhí)行下一個received方法。

(七)DecodeHandler的received

可以參考《dubbo源碼解析(九)遠程通信——Transport層》的(七)DecodeHandler。可以看到received方法中根據(jù)消息的類型進行不同的解碼。而DecodeHandler 存在的意義就是保證請求或響應對象可在線程池中被解碼,解碼完成后,就會分發(fā)到HeaderExchangeHandler的received。

(八)HeaderExchangeHandler的received
public void received(Channel channel, Object message) throws RemotingException {
    // 設置接收到消息的時間戳
    channel.setAttribute(KEY_READ_TIMESTAMP, System.currentTimeMillis());
    // 獲得通道
    final ExchangeChannel exchangeChannel = HeaderExchangeChannel.getOrAddChannel(channel);
    try {
        // 如果消息是Request類型
        if (message instanceof Request) {
            // handle request.
            // 強制轉化為Request
            Request request = (Request) message;
            // 如果該請求是事件心跳事件或者只讀事件
            if (request.isEvent()) {
                // 執(zhí)行事件
                handlerEvent(channel, request);
            } else {
                // 如果是正常的調(diào)用請求,且需要響應
                if (request.isTwoWay()) {
                    // 處理請求
                    handleRequest(exchangeChannel, request);
                } else {
                    // 如果不需要響應,則繼續(xù)下一步
                    handler.received(exchangeChannel, request.getData());
                }
            }
        } else if (message instanceof Response) {
            // 處理響應
            handleResponse(channel, (Response) message);
        } else if (message instanceof String) {
            // 如果是telnet相關的請求
            if (isClientSide(channel)) {
                // 如果是客戶端側,則直接拋出異常,因為客戶端側不支持telnet
                Exception e = new Exception("Dubbo client can not supported string message: " + message + " in channel: " + channel + ", url: " + channel.getUrl());
                logger.error(e.getMessage(), e);
            } else {
                // 如果是服務端側,則執(zhí)行telnet命令
                String echo = handler.telnet(channel, (String) message);
                if (echo != null && echo.length() > 0) {
                    channel.send(echo);
                }
            }
        } else {
            // 如果都不是,則繼續(xù)下一步
            handler.received(exchangeChannel, message);
        }
    } finally {
        // 移除關閉或者不活躍的通道
        HeaderExchangeChannel.removeChannelIfDisconnected(channel);
    }
}

該方法中就對消息進行了細分,事件請求、正常的調(diào)用請求、響應、telnet命令請求等,并且針對不同的消息類型做了不同的邏輯調(diào)用。我們這里主要看正常的調(diào)用請求。見下一步。

(九)HeaderExchangeHandler的handleRequest
void handleRequest(final ExchangeChannel channel, Request req) throws RemotingException {
    // 創(chuàng)建一個Response實例
    Response res = new Response(req.getId(), req.getVersion());
    // 如果請求被破壞了
    if (req.isBroken()) {
        // 獲得請求的數(shù)據(jù)包
        Object data = req.getData();

        String msg;
        // 如果數(shù)據(jù)為空
        if (data == null) {
            //消息設置為空
            msg = null;
            // 如果在這之前已經(jīng)出現(xiàn)異常,也就是數(shù)據(jù)為Throwable類型
        } else if (data instanceof Throwable) {
            // 響應消息把異常信息返回
            msg = StringUtils.toString((Throwable) data);
        } else {
            // 返回請求數(shù)據(jù)
            msg = data.toString();
        }
        res.setErrorMessage("Fail to decode request due to: " + msg);
        // 設置錯誤請求的狀態(tài)碼
        res.setStatus(Response.BAD_REQUEST);

        // 發(fā)送該消息
        channel.send(res);
        return;
    }
    // find handler by message class.
    // 獲得請求數(shù)據(jù) 也就是 RpcInvocation 對象
    Object msg = req.getData();
    try {
        // 繼續(xù)向下調(diào)用 返回一個future
        CompletionStage future = handler.reply(channel, msg);
        future.whenComplete((appResult, t) -> {
            try {
                if (t == null) {
                    //設置調(diào)用結果狀態(tài)為成功
                    res.setStatus(Response.OK);
                    // 把結果放入響應
                    res.setResult(appResult);
                } else {
                    // 如果服務調(diào)用有異常,則設置結果狀態(tài)碼為服務錯誤
                    res.setStatus(Response.SERVICE_ERROR);
                    // 把報錯信息放到響應中
                    res.setErrorMessage(StringUtils.toString(t));
                }
                // 發(fā)送該響應
                channel.send(res);
            } catch (RemotingException e) {
                logger.warn("Send result to consumer failed, channel is " + channel + ", msg is " + e);
            } finally {
                // HeaderExchangeChannel.removeChannelIfDisconnected(channel);
            }
        });
    } catch (Throwable e) {
        // 如果在執(zhí)行中拋出異常,則也算服務異常
        res.setStatus(Response.SERVICE_ERROR);
        res.setErrorMessage(StringUtils.toString(e));
        channel.send(res);
    }
}

該方法是處理正常的調(diào)用請求,主要做了正常、異常調(diào)用的情況處理,并且加入了狀態(tài)碼,然后發(fā)送執(zhí)行后的結果給客戶端。接下來看下一步。

(十)DubboProtocol的requestHandler實例的reply

這里我默認是使用dubbo協(xié)議,所以執(zhí)行的是DubboProtocol的requestHandler的reply方法??梢詤⒖肌禿ubbo源碼解析(二十四)遠程調(diào)用——dubbo協(xié)議》的(三)DubboProtocol

public CompletableFuture reply(ExchangeChannel channel, Object message) throws RemotingException {

    // 如果請求消息不屬于會話域,則拋出異常
    if (!(message instanceof Invocation)) {
        throw new RemotingException(channel, "Unsupported request: "
                + (message == null ? null : (message.getClass().getName() + ": " + message))
                + ", channel: consumer: " + channel.getRemoteAddress() + " --> provider: " + channel.getLocalAddress());
    }

    //強制類型轉化
    Invocation inv = (Invocation) message;
    // 獲得暴露的服務invoker
    Invoker invoker = getInvoker(channel, inv);
    // need to consider backward-compatibility if it"s a callback
    // 如果是回調(diào)服務
    if (Boolean.TRUE.toString().equals(inv.getAttachments().get(IS_CALLBACK_SERVICE_INVOKE))) {
        // 獲得 方法定義
        String methodsStr = invoker.getUrl().getParameters().get("methods");
        boolean hasMethod = false;
        // 如果只有一個方法定義
        if (methodsStr == null || !methodsStr.contains(",")) {
            // 設置會話域中是否有一致的方法定義標志
            hasMethod = inv.getMethodName().equals(methodsStr);
        } else {
            // 分割不同的方法
            String[] methods = methodsStr.split(",");
            // 如果方法不止一個,則分割后遍歷查詢,找到了則設置為true
            for (String method : methods) {
                if (inv.getMethodName().equals(method)) {
                    hasMethod = true;
                    break;
                }
            }
        }
        // 如果沒有該方法,則打印告警日志
        if (!hasMethod) {
            logger.warn(new IllegalStateException("The methodName " + inv.getMethodName()
                    + " not found in callback service interface ,invoke will be ignored."
                    + " please update the api interface. url is:"
                    + invoker.getUrl()) + " ,invocation is :" + inv);
            return null;
        }
    }
    // 設置遠程地址
    RpcContext.getContext().setRemoteAddress(channel.getRemoteAddress());
    // 調(diào)用下一個調(diào)用鏈
    Result result = invoker.invoke(inv);
    //  返回CompletableFuture
    return result.completionFuture().thenApply(Function.identity());
}

上述代碼有些變化,是最新的代碼,加入了CompletableFuture,這個我會在后續(xù)的異步化改造中講到。這里主要關注的是又開始跟客戶端發(fā)請求一樣執(zhí)行invoke調(diào)用鏈了。

(十一)ProtocolFilterWrapper的CallbackRegistrationInvoker的invoke

可以直接參考《dubbo源碼解析(四十六)消費端發(fā)送請求過程》的(六)ProtocolFilterWrapper的內(nèi)部類CallbackRegistrationInvoker的invoke。

(十二)ProtocolFilterWrapper的buildInvokerChain方法中的invoker實例的invoke方法。

可以直接參考《dubbo源碼解析(四十六)消費端發(fā)送請求過程》的(七)ProtocolFilterWrapper的buildInvokerChain方法中的invoker實例的invoke方法。

(十三)EchoFilter的invoke

可以參考《dubbo源碼解析(二十)遠程調(diào)用——Filter》的(八)EchoFilter

public Result invoke(Invoker invoker, Invocation inv) throws RpcException {
    // 如果調(diào)用的方法是回聲測試的方法 則直接返回結果,否則 調(diào)用下一個調(diào)用鏈
    if (inv.getMethodName().equals($ECHO) && inv.getArguments() != null && inv.getArguments().length == 1) {
        // 創(chuàng)建一個默認的AsyncRpcResult返回
        return AsyncRpcResult.newDefaultAsyncResult(inv.getArguments()[0], inv);
    }
    return invoker.invoke(inv);
}

該過濾器就是對回聲測試的調(diào)用進行攔截,上述源碼跟連接中源碼唯一區(qū)別就是改為了AsyncRpcResult。

(十四)ClassLoaderFilter的invoke

可以參考《dubbo源碼解析(二十)遠程調(diào)用——Filter》的(三)ClassLoaderFilter,用來做類加載器的切換。

(十五)GenericFilter的invoke

可以參考《dubbo源碼解析(二十)遠程調(diào)用——Filter》的(十一)GenericFilter。

public Result invoke(Invoker invoker, Invocation inv) throws RpcException {
    // 如果是泛化調(diào)用
    if ((inv.getMethodName().equals($INVOKE) || inv.getMethodName().equals($INVOKE_ASYNC))
            && inv.getArguments() != null
            && inv.getArguments().length == 3
            && !GenericService.class.isAssignableFrom(invoker.getInterface())) {
        // 獲得請求名字
        String name = ((String) inv.getArguments()[0]).trim();
        // 獲得請求參數(shù)類型
        String[] types = (String[]) inv.getArguments()[1];
        // 獲得請求參數(shù)
        Object[] args = (Object[]) inv.getArguments()[2];
        try {
            // 獲得方法
            Method method = ReflectUtils.findMethodByMethodSignature(invoker.getInterface(), name, types);
            // 獲得該方法的參數(shù)類型
            Class[] params = method.getParameterTypes();
            if (args == null) {
                args = new Object[params.length];
            }
            // 獲得附加值
            String generic = inv.getAttachment(GENERIC_KEY);

            if (StringUtils.isBlank(generic)) {
                generic = RpcContext.getContext().getAttachment(GENERIC_KEY);
            }

            // 如果附加值為空,在用上下文攜帶的附加值
            if (StringUtils.isEmpty(generic)
                    || ProtocolUtils.isDefaultGenericSerialization(generic)) {
                // 直接進行類型轉化
                args = PojoUtils.realize(args, params, method.getGenericParameterTypes());
            } else if (ProtocolUtils.isJavaGenericSerialization(generic)) {
                for (int i = 0; i < args.length; i++) {
                    if (byte[].class == args[i].getClass()) {
                        try (UnsafeByteArrayInputStream is = new UnsafeByteArrayInputStream((byte[]) args[i])) {
                            // 使用nativejava方式反序列化
                            args[i] = ExtensionLoader.getExtensionLoader(Serialization.class)
                                    .getExtension(GENERIC_SERIALIZATION_NATIVE_JAVA)
                                    .deserialize(null, is).readObject();
                        } catch (Exception e) {
                            throw new RpcException("Deserialize argument [" + (i + 1) + "] failed.", e);
                        }
                    } else {
                        throw new RpcException(
                                "Generic serialization [" +
                                        GENERIC_SERIALIZATION_NATIVE_JAVA +
                                        "] only support message type " +
                                        byte[].class +
                                        " and your message type is " +
                                        args[i].getClass());
                    }
                }
            } else if (ProtocolUtils.isBeanGenericSerialization(generic)) {
                for (int i = 0; i < args.length; i++) {
                    if (args[i] instanceof JavaBeanDescriptor) {
                        // 用JavaBean方式反序列化
                        args[i] = JavaBeanSerializeUtil.deserialize((JavaBeanDescriptor) args[i]);
                    } else {
                        throw new RpcException(
                                "Generic serialization [" +
                                        GENERIC_SERIALIZATION_BEAN +
                                        "] only support message type " +
                                        JavaBeanDescriptor.class.getName() +
                                        " and your message type is " +
                                        args[i].getClass().getName());
                    }
                }
            } else if (ProtocolUtils.isProtobufGenericSerialization(generic)) {
                // as proto3 only accept one protobuf parameter
                if (args.length == 1 && args[0] instanceof String) {
                    try (UnsafeByteArrayInputStream is =
                                 new UnsafeByteArrayInputStream(((String) args[0]).getBytes())) {
                        // 用protobuf-json進行反序列化
                        args[0] = ExtensionLoader.getExtensionLoader(Serialization.class)
                                .getExtension("" + GENERIC_SERIALIZATION_PROTOBUF)
                                .deserialize(null, is).readObject(method.getParameterTypes()[0]);
                    } catch (Exception e) {
                        throw new RpcException("Deserialize argument failed.", e);
                    }
                } else {
                    throw new RpcException(
                            "Generic serialization [" +
                                    GENERIC_SERIALIZATION_PROTOBUF +
                                    "] only support one" + String.class.getName() +
                                    " argument and your message size is " +
                                    args.length + " and type is" +
                                    args[0].getClass().getName());
                }
            }
            return invoker.invoke(new RpcInvocation(method, args, inv.getAttachments()));
        } catch (NoSuchMethodException e) {
            throw new RpcException(e.getMessage(), e);
        } catch (ClassNotFoundException e) {
            throw new RpcException(e.getMessage(), e);
        }
    }
    return invoker.invoke(inv);
}

static class GenericListener implements Listener {

    @Override
    public void onResponse(Result appResponse, Invoker invoker, Invocation inv) {
        // 如果是泛化調(diào)用
        if ((inv.getMethodName().equals($INVOKE) || inv.getMethodName().equals($INVOKE_ASYNC))
                && inv.getArguments() != null
                && inv.getArguments().length == 3
                && !GenericService.class.isAssignableFrom(invoker.getInterface())) {

            // 獲得序列化方式
            String generic = inv.getAttachment(GENERIC_KEY);
            // 如果為空,默認獲取會話域中的配置
            if (StringUtils.isBlank(generic)) {
                generic = RpcContext.getContext().getAttachment(GENERIC_KEY);
            }

            // 如果回調(diào)有異常,直接設置異常
            if (appResponse.hasException() && !(appResponse.getException() instanceof GenericException)) {
                appResponse.setException(new GenericException(appResponse.getException()));
            }
            // 如果是native java形式序列化
            if (ProtocolUtils.isJavaGenericSerialization(generic)) {
                try {
                    UnsafeByteArrayOutputStream os = new UnsafeByteArrayOutputStream(512);
                    // 使用native java形式序列化
                    ExtensionLoader.getExtensionLoader(Serialization.class).getExtension(GENERIC_SERIALIZATION_NATIVE_JAVA).serialize(null, os).writeObject(appResponse.getValue());
                    // 加入結果
                    appResponse.setValue(os.toByteArray());
                } catch (IOException e) {
                    throw new RpcException(
                            "Generic serialization [" +
                                    GENERIC_SERIALIZATION_NATIVE_JAVA +
                                    "] serialize result failed.", e);
                }
            } else if (ProtocolUtils.isBeanGenericSerialization(generic)) {
                // 用JavaBean方式序列化
                appResponse.setValue(JavaBeanSerializeUtil.serialize(appResponse.getValue(), JavaBeanAccessor.METHOD));
            } else if (ProtocolUtils.isProtobufGenericSerialization(generic)) {
                try {
                    UnsafeByteArrayOutputStream os = new UnsafeByteArrayOutputStream(512);
                    // 用protobuf-json進行序列化
                    ExtensionLoader.getExtensionLoader(Serialization.class)
                            .getExtension(GENERIC_SERIALIZATION_PROTOBUF)
                            .serialize(null, os).writeObject(appResponse.getValue());
                    appResponse.setValue(os.toString());
                } catch (IOException e) {
                    throw new RpcException("Generic serialization [" +
                            GENERIC_SERIALIZATION_PROTOBUF +
                            "] serialize result failed.", e);
                }
            } else {
                // 直接進行類型轉化并且設置值
                appResponse.setValue(PojoUtils.generalize(appResponse.getValue()));
            }
        }
    }

    @Override
    public void onError(Throwable t, Invoker invoker, Invocation invocation) {

    }
}

跟連接內(nèi)的代碼對比,第一個就是對過濾器設計發(fā)生了變化,這個我在異步化改造里面會講到,第二個是新增了protobuf-json的泛化序列化方式。

(十六)ContextFilter的invoke

可以參考《dubbo源碼解析(二十)遠程調(diào)用——Filter》的(六)ContextFilter,最新代碼幾乎差不多,除了因為對Filter的設計做了修改以外,還有新增了tag路由的相關邏輯,tag相關部分我會在后續(xù)文章中講解,該類主要是做了初始化rpc上下文。

(十七)TraceFilter的invoke

可以參考《dubbo源碼解析(二十四)遠程調(diào)用——dubbo協(xié)議》的(十三)TraceFilter,該過濾器是增強的功能是通道的跟蹤,會在通道內(nèi)把最大的調(diào)用次數(shù)和現(xiàn)在的調(diào)用數(shù)量放進去。方便使用telnet來跟蹤服務的調(diào)用次數(shù)等。

(十八)TimeoutFilter的invoke

可以參考《dubbo源碼解析(二十)遠程調(diào)用——Filter》的(十三)TimeoutFilter,該過濾器是當服務調(diào)用超時的時候,記錄告警日志。

(十九)MonitorFilter的invoke
public Result invoke(Invoker invoker, Invocation invocation) throws RpcException {
    // 如果開啟監(jiān)控
    if (invoker.getUrl().hasParameter(MONITOR_KEY)) {
        // 設置開始監(jiān)控時間
        invocation.setAttachment(MONITOR_FILTER_START_TIME, String.valueOf(System.currentTimeMillis()));
        // 對同時在線數(shù)量加1
        getConcurrent(invoker, invocation).incrementAndGet(); // count up
    }
    return invoker.invoke(invocation); // proceed invocation chain
}
class MonitorListener implements Listener {

    @Override
    public void onResponse(Result result, Invoker invoker, Invocation invocation) {
        // 如果開啟監(jiān)控
        if (invoker.getUrl().hasParameter(MONITOR_KEY)) {
            // 收集監(jiān)控對數(shù)據(jù),并且更新監(jiān)控數(shù)據(jù)
            collect(invoker, invocation, result, RpcContext.getContext().getRemoteHost(), Long.valueOf(invocation.getAttachment(MONITOR_FILTER_START_TIME)), false);
            // 同時在線監(jiān)控數(shù)減1
            getConcurrent(invoker, invocation).decrementAndGet(); // count down
        }
    }

    @Override
    public void onError(Throwable t, Invoker invoker, Invocation invocation) {
        if (invoker.getUrl().hasParameter(MONITOR_KEY)) {
            // 收集監(jiān)控對數(shù)據(jù),并且更新監(jiān)控數(shù)據(jù)
            collect(invoker, invocation, null, RpcContext.getContext().getRemoteHost(), Long.valueOf(invocation.getAttachment(MONITOR_FILTER_START_TIME)), true);
            // 同時在線監(jiān)控數(shù)減1
            getConcurrent(invoker, invocation).decrementAndGet(); // count down
        }
    }

    private void collect(Invoker invoker, Invocation invocation, Result result, String remoteHost, long start, boolean error) {
        try {
            // 獲得監(jiān)控的url
            URL monitorUrl = invoker.getUrl().getUrlParameter(MONITOR_KEY);
            // 通過該url獲得Monitor實例
            Monitor monitor = monitorFactory.getMonitor(monitorUrl);
            if (monitor == null) {
                return;
            }
            // 創(chuàng)建一個統(tǒng)計的url
            URL statisticsURL = createStatisticsUrl(invoker, invocation, result, remoteHost, start, error);
            // 把收集的信息更新并且發(fā)送信息
            monitor.collect(statisticsURL);
        } catch (Throwable t) {
            logger.warn("Failed to monitor count service " + invoker.getUrl() + ", cause: " + t.getMessage(), t);
        }
    }

    private URL createStatisticsUrl(Invoker invoker, Invocation invocation, Result result, String remoteHost, long start, boolean error) {
        // ---- service statistics ----
        // 調(diào)用服務消耗的時間
        long elapsed = System.currentTimeMillis() - start; // invocation cost
        // 獲得同時監(jiān)控的數(shù)量
        int concurrent = getConcurrent(invoker, invocation).get(); // current concurrent count
        String application = invoker.getUrl().getParameter(APPLICATION_KEY);
        // 獲得服務名
        String service = invoker.getInterface().getName(); // service name
        // 獲得調(diào)用的方法名
        String method = RpcUtils.getMethodName(invocation); // method name
        // 獲得組
        String group = invoker.getUrl().getParameter(GROUP_KEY);
        // 獲得版本號
        String version = invoker.getUrl().getParameter(VERSION_KEY);

        int localPort;
        String remoteKey, remoteValue;
        // 如果是消費者端的監(jiān)控
        if (CONSUMER_SIDE.equals(invoker.getUrl().getParameter(SIDE_KEY))) {
            // ---- for service consumer ----
            // 本地端口為0
            localPort = 0;
            // key為provider
            remoteKey = MonitorService.PROVIDER;
            // value為服務ip
            remoteValue = invoker.getUrl().getAddress();
        } else {
            // ---- for service provider ----
            // 端口為服務端口
            localPort = invoker.getUrl().getPort();
            // key為consumer
            remoteKey = MonitorService.CONSUMER;
            // value為遠程地址
            remoteValue = remoteHost;
        }
        String input = "", output = "";
        if (invocation.getAttachment(INPUT_KEY) != null) {
            input = invocation.getAttachment(INPUT_KEY);
        }
        if (result != null && result.getAttachment(OUTPUT_KEY) != null) {
            output = result.getAttachment(OUTPUT_KEY);
        }

        // 返回一個url
        return new URL(COUNT_PROTOCOL, NetUtils.getLocalHost(), localPort, service + PATH_SEPARATOR + method, MonitorService.APPLICATION, application, MonitorService.INTERFACE, service, MonitorService.METHOD, method, remoteKey, remoteValue, error ? MonitorService.FAILURE : MonitorService.SUCCESS, "1", MonitorService.ELAPSED, String.valueOf(elapsed), MonitorService.CONCURRENT, String.valueOf(concurrent), INPUT_KEY, input, OUTPUT_KEY, output, GROUP_KEY, group, VERSION_KEY, version);
    }

}

在invoke里面只是做了記錄開始監(jiān)控的時間以及對同時監(jiān)控的數(shù)量加1操作,當結果回調(diào)時,會對結果數(shù)據(jù)做搜集計算,最后通過監(jiān)控服務記錄和發(fā)送最新信息。

(二十)ExceptionFilter的invoke

可以參考《dubbo源碼解析(二十)遠程調(diào)用——Filter》的(九)ExceptionFilter,該過濾器主要是對異常的處理。

(二十一)InvokerWrapper的invoke

可以參考《dubbo源碼解析(二十二)遠程調(diào)用——Protocol》的(五)InvokerWrapper。該類用了裝飾模式,不過并沒有實現(xiàn)實際的功能增強。

(二十二)DelegateProviderMetaDataInvoker的invoke
public Result invoke(Invocation invocation) throws RpcException {
    return invoker.invoke(invocation);
}

該類也是用了裝飾模式,不過該類是invoker和配置中心的適配類,其中也沒有進行實際的功能增強。

(二十三)AbstractProxyInvoker的invoke

可以參考《dubbo源碼解析(二十三)遠程調(diào)用——Proxy》的(二)AbstractProxyInvoker。不過代碼已經(jīng)有更新了,下面貼出最新的代碼。

public Result invoke(Invocation invocation) throws RpcException {
    try {
        // 執(zhí)行下一步
        Object value = doInvoke(proxy, invocation.getMethodName(), invocation.getParameterTypes(), invocation.getArguments());
        // 把返回結果用CompletableFuture包裹
        CompletableFuture future = wrapWithFuture(value, invocation);
        // 創(chuàng)建AsyncRpcResult實例
        AsyncRpcResult asyncRpcResult = new AsyncRpcResult(invocation);
        future.whenComplete((obj, t) -> {
            AppResponse result = new AppResponse();
            // 如果拋出異常
            if (t != null) {
                // 屬于CompletionException異常
                if (t instanceof CompletionException) {
                    // 設置異常信息
                    result.setException(t.getCause());
                } else {
                    // 直接設置異常
                    result.setException(t);
                }
            } else {
                // 如果沒有異常,則把結果放入異步結果內(nèi)
                result.setValue(obj);
            }
            // 完成
            asyncRpcResult.complete(result);
        });
        return asyncRpcResult;
    } catch (InvocationTargetException e) {
        if (RpcContext.getContext().isAsyncStarted() && !RpcContext.getContext().stopAsync()) {
            logger.error("Provider async started, but got an exception from the original method, cannot write the exception back to consumer because an async result may have returned the new thread.", e);
        }
        return AsyncRpcResult.newDefaultAsyncResult(null, e.getTargetException(), invocation);
    } catch (Throwable e) {
        throw new RpcException("Failed to invoke remote proxy method " + invocation.getMethodName() + " to " + getUrl() + ", cause: " + e.getMessage(), e);
    }
}

這里主要是因為異步化改造而出現(xiàn)的代碼變化,我會在異步化改造中講到這部分?,F(xiàn)在主要來看下一步。

(二十四)JavassistProxyFactory的getInvoker方法中匿名類的doInvoke

這里默認代理實現(xiàn)方式是Javassist??梢詤⒖肌禿ubbo源碼解析(二十三)遠程調(diào)用——Proxy》的(六)JavassistProxyFactory。其中Wrapper 是一個抽象類,其中 invokeMethod 是一個抽象方法。dubbo 會在運行時通過 Javassist 框架為 Wrapper 生成實現(xiàn)類,并實現(xiàn) invokeMethod 方法,該方法最終會根據(jù)調(diào)用信息調(diào)用具體的服務。以 DemoServiceImpl 為例,Javassist 為其生成的代理類如下。

/** Wrapper0 是在運行時生成的,大家可使用 Arthas 進行反編譯 */
public class Wrapper0 extends Wrapper implements ClassGenerator.DC {
    public static String[] pns;
    public static Map pts;
    public static String[] mns;
    public static String[] dmns;
    public static Class[] mts0;

    // 省略其他方法

    public Object invokeMethod(Object object, String string, Class[] arrclass, Object[] arrobject) throws InvocationTargetException {
        DemoService demoService;
        try {
            // 類型轉換
            demoService = (DemoService)object;
        }
        catch (Throwable throwable) {
            throw new IllegalArgumentException(throwable);
        }
        try {
            // 根據(jù)方法名調(diào)用指定的方法
            if ("sayHello".equals(string) && arrclass.length == 1) {
                return demoService.sayHello((String)arrobject[0]);
            }
        }
        catch (Throwable throwable) {
            throw new InvocationTargetException(throwable);
        }
        throw new NoSuchMethodException(new StringBuffer().append("Not found method "").append(string).append("" in class com.alibaba.dubbo.demo.DemoService.").toString());
    }
}

然后就是直接調(diào)用的是對應的方法了。到此,方法執(zhí)行完成了。

結果返回

可以看到我上述講到的(八)HeaderExchangeHandler的received和(九)HeaderExchangeHandler的handleRequest有好幾處channel.send方法的調(diào)用,也就是當結果返回的返回的時候,會主動發(fā)送執(zhí)行結果給客戶端。當然發(fā)送的時候還是會對結果Response 對象進行編碼,編碼邏輯我就先不在這里闡述。

當客戶端接收到這個返回的消息時候,進行解碼后,識別為Response 對象,將該對象派發(fā)到線程池中,該過程跟服務端接收到調(diào)用請求到邏輯是一樣的,可以參考上述的解析,區(qū)別在于到(八)HeaderExchangeHandler的received方法的時候,執(zhí)行的是handleResponse方法。

(九)HeaderExchangeHandler的handleResponse
static void handleResponse(Channel channel, Response response) throws RemotingException {
    // 如果響應不為空,并且不是心跳事件的響應,則調(diào)用received
    if (response != null && !response.isHeartbeat()) {
        DefaultFuture.received(channel, response);
    }
}
(十)DefaultFuture的received

可以參考《dubbo源碼解析(十)遠程通信——Exchange層》的(七)DefaultFuture,不過該類的繼承的是CompletableFuture,因為對異步化的改造,該類已經(jīng)做了一些變化。

public static void received(Channel channel, Response response) {
    received(channel, response, false);
}

public static void received(Channel channel, Response response, boolean timeout) {
    try {
        // future集合中移除該請求的future,(響應id和請求id一一對應的)
        DefaultFuture future = FUTURES.remove(response.getId());
        if (future != null) {
            //獲得超時
            Timeout t = future.timeoutCheckTask;
            // 如果沒有超時,則取消timeoutCheckTask
            if (!timeout) {
                // decrease Time
                t.cancel();
            }
            // 接收響應結果
            future.doReceived(response);
        } else {
            logger.warn("The timeout response finally returned at "
                    + (new SimpleDateFormat("yyyy-MM-dd HH:mm:ss.SSS").format(new Date()))
                    + ", response " + response
                    + (channel == null ? "" : ", channel: " + channel.getLocalAddress()
                    + " -> " + channel.getRemoteAddress()));
        }
    } finally {
        // 通道集合移除該請求對應的通道,代表著這一次請求結束
        CHANNELS.remove(response.getId());
    }
}

該方法中主要是對超時的處理,還有對應的請求和響應的匹配,也就是返回相應id的future。

(十一)DefaultFuture的doReceived

可以參考《dubbo源碼解析(十)遠程通信——Exchange層》的(七)DefaultFuture,不過因為運用了CompletableFuture,所以該方法完全重寫了。這部分的改造我也會在異步化改造中講述到。

private void doReceived(Response res) {
    // 如果結果為空,則拋出異常
    if (res == null) {
        throw new IllegalStateException("response cannot be null");
    }
    // 如果結果的狀態(tài)碼為ok
    if (res.getStatus() == Response.OK) {
        // 則future調(diào)用完成
        this.complete(res.getResult());
    } else if (res.getStatus() == Response.CLIENT_TIMEOUT || res.getStatus() == Response.SERVER_TIMEOUT) {
        // 如果超時,則返回一個超時異常
        this.completeExceptionally(new TimeoutException(res.getStatus() == Response.SERVER_TIMEOUT, channel, res.getErrorMessage()));
    } else {
        // 否則返回一個RemotingException
        this.completeExceptionally(new RemotingException(channel, res.getErrorMessage()));
    }
}

隨后用戶線程即可從 DefaultFuture 實例中獲取到相應結果。

后記

該文章講解了dubbo調(diào)服務端接收到請求后一系列處理的所有步驟以及服務端怎么把結果發(fā)送給客戶端,是目前最新代碼的解析。因為里面涉及到很多流程,所以可以自己debug一步一步看一看整個過程。可以看到本文涉及到異步化改造已經(jīng)很多了,這也是我在講解異步化改造前想先講解這些過程的原因。只有弄清楚這些過程,才能更加理解對于異步化改造的意義。下一篇文將講解異步化改造。

文章版權歸作者所有,未經(jīng)允許請勿轉載,若此文章存在違規(guī)行為,您可以聯(lián)系管理員刪除。

轉載請注明本文地址:http://systransis.cn/yun/74869.html

相關文章

  • dubbo源碼解析四十八)異步化改造

    摘要:大揭秘異步化改造目標從源碼的角度分析的新特性中對于異步化的改造原理。看源碼解析四十六消費端發(fā)送請求過程講到的十四的,在以前的邏輯會直接在方法中根據(jù)配置區(qū)分同步異步單向調(diào)用。改為關于可以參考源碼解析十遠程通信層的六。 2.7大揭秘——異步化改造 目標:從源碼的角度分析2.7的新特性中對于異步化的改造原理。 前言 dubbo中提供了很多類型的協(xié)議,關于協(xié)議的系列可以查看下面的文章: du...

    lijinke666 評論0 收藏0
  • dubbo源碼解析四十六)消費發(fā)送請求過程

    摘要:可以參考源碼解析二十四遠程調(diào)用協(xié)議的八。十六的該類也是用了適配器模式,該類主要的作用就是增加了心跳功能,可以參考源碼解析十遠程通信層的四。二十的可以參考源碼解析十七遠程通信的一。 2.7大揭秘——消費端發(fā)送請求過程 目標:從源碼的角度分析一個服務方法調(diào)用經(jīng)歷怎么樣的磨難以后到達服務端。 前言 前一篇文章講到的是引用服務的過程,引用服務無非就是創(chuàng)建出一個代理。供消費者調(diào)用服務的相關方法。...

    fish 評論0 收藏0
  • dubbo源碼解析四十四)服務暴露過程

    摘要:服務暴露過程目標從源碼的角度分析服務暴露過程。導出服務,包含暴露服務到本地,和暴露服務到遠程兩個過程。其中服務暴露的第八步已經(jīng)沒有了。將泛化調(diào)用版本號或者等信息加入獲得服務暴露地址和端口號,利用內(nèi)數(shù)據(jù)組裝成。 dubbo服務暴露過程 目標:從源碼的角度分析服務暴露過程。 前言 本來這一篇一個寫異步化改造的內(nèi)容,但是最近我一直在想,某一部分的優(yōu)化改造該怎么去撰寫才能更加的讓讀者理解。我覺...

    light 評論0 收藏0
  • dubbo源碼解析四十三)2.7新特性

    摘要:大揭秘目標了解的新特性,以及版本升級的引導。四元數(shù)據(jù)改造我們知道以前的版本只有注冊中心,注冊中心的有數(shù)十個的鍵值對,包含了一個服務所有的元數(shù)據(jù)。 DUBBO——2.7大揭秘 目標:了解2.7的新特性,以及版本升級的引導。 前言 我們知道Dubbo在2011年開源,停止更新了一段時間。在2017 年 9 月 7 日,Dubbo 悄悄的在 GitHub 發(fā)布了 2.5.4 版本。隨后,版本...

    qqlcbb 評論0 收藏0
  • dubbo源碼解析四十五)服務引用過程

    摘要:服務引用過程目標從源碼的角度分析服務引用過程。并保留服務提供者的部分配置,比如版本,,時間戳等最后將合并后的配置設置為查詢字符串中。的可以參考源碼解析二十三遠程調(diào)用的一的源碼分析。 dubbo服務引用過程 目標:從源碼的角度分析服務引用過程。 前言 前面服務暴露過程的文章講解到,服務引用有兩種方式,一種就是直連,也就是直接指定服務的地址來進行引用,這種方式更多的時候被用來做服務測試,不...

    xiaowugui666 評論0 收藏0

發(fā)表評論

0條評論

yzzz

|高級講師

TA的文章

閱讀更多
最新活動
閱讀需要支付1元查看
<