成人国产在线小视频_日韩寡妇人妻调教在线播放_色成人www永久在线观看_2018国产精品久久_亚洲欧美高清在线30p_亚洲少妇综合一区_黄色在线播放国产_亚洲另类技巧小说校园_国产主播xx日韩_a级毛片在线免费

資訊專(zhuān)欄INFORMATION COLUMN

dubbo源碼解析(九)遠(yuǎn)程通信——Transport層

Magicer / 3005人閱讀

摘要:層也就是網(wǎng)絡(luò)傳輸層,在遠(yuǎn)程通信中必然會(huì)涉及到傳輸。值為,不等待消息發(fā)出,將消息放入隊(duì)列,即刻返回。三該類(lèi)繼承了并且實(shí)現(xiàn)接口,是服務(wù)器抽象類(lèi)。八該類(lèi)是多消息處理器的抽象類(lèi)。創(chuàng)建線(xiàn)程池設(shè)置組件的獲得實(shí)例把線(xiàn)程池放到

遠(yuǎn)程通訊——Transport層
目標(biāo):介紹Transport層的相關(guān)設(shè)計(jì)和邏輯、介紹dubbo-remoting-api中的transport包內(nèi)的源碼解析。
前言

先預(yù)警一下,該文篇幅會(huì)很長(zhǎng),做好心理準(zhǔn)備。Transport層也就是網(wǎng)絡(luò)傳輸層,在遠(yuǎn)程通信中必然會(huì)涉及到傳輸。它在dubbo 的框架設(shè)計(jì)中也處于倒數(shù)第二層,當(dāng)然最底層是序列化,這個(gè)后面介紹。官方文檔對(duì)Transport層的解釋是抽象 mina 和 netty 為統(tǒng)一接口,以 Message 為中心,擴(kuò)展接口為 Channel、Transporter、Client、Server、Codec。那我們現(xiàn)在先來(lái)看這個(gè)包下面的類(lèi)圖:

可以看到有四個(gè)包繼承了AbstractChannel、AbstractServer、AbstractClient。也就是說(shuō)現(xiàn)在Transport層是抽象mina、netty以及grizzly為統(tǒng)一接口??赐觐?lèi)圖,再來(lái)看看包結(jié)構(gòu):

下面的講解大致會(huì)按照類(lèi)圖中類(lèi)的順序往下講,盡量把client、server、channel、codec、dispacher五部分涉及到的內(nèi)容一起講解。

源碼解析 (一)AbstractPeer
public abstract class AbstractPeer implements Endpoint, ChannelHandler {
    private final ChannelHandler handler;

    private volatile URL url;
    /**
     * 是否正在關(guān)閉
     */
    // closing closed means the process is being closed and close is finished
    private volatile boolean closing;
    /**
     * 是否關(guān)閉完成
     */
    private volatile boolean closed;

    public AbstractPeer(URL url, ChannelHandler handler) {
        if (url == null) {
            throw new IllegalArgumentException("url == null");
        }
        if (handler == null) {
            throw new IllegalArgumentException("handler == null");
        }
        this.url = url;
        this.handler = handler;
    }
}

該類(lèi)實(shí)現(xiàn)了Endpoint和ChannelHandler兩個(gè)接口,要關(guān)注的兩個(gè)點(diǎn):

實(shí)現(xiàn)ChannelHandler接口并且有在屬性中還有一個(gè)handler,下面很多實(shí)現(xiàn)方法也是直接調(diào)用了handler方法,這種模式叫做裝飾模式,這樣做可以對(duì)裝飾對(duì)象靈活的增強(qiáng)功能。對(duì)裝飾模式不懂的朋友可以google一下。有很多例子介紹。

在該類(lèi)中有closing和closed屬性,在Endpoint中有很多關(guān)于關(guān)閉通道的操作,會(huì)有關(guān)閉中和關(guān)閉完成的狀態(tài)區(qū)分,在該類(lèi)中就緩存了這兩個(gè)屬性來(lái)判斷關(guān)閉的狀態(tài)。

下面我就介紹該類(lèi)中的send方法,其他方法比較好理解,到時(shí)候可以直接看源碼:

@Override
public void send(Object message) throws RemotingException {
    // url中sent的配置項(xiàng)
    send(message, url.getParameter(Constants.SENT_KEY, false));
}

該配置項(xiàng)是選擇是否等待消息發(fā)出:

sent值為true,等待消息發(fā)出,消息發(fā)送失敗將拋出異常。

sent值為false,不等待消息發(fā)出,將消息放入 IO 隊(duì)列,即刻返回。

對(duì)該類(lèi)還有點(diǎn)糊涂的朋友,記住在ChannelHandler接口,該類(lèi)就做了裝飾模式中裝飾角色,在Endpoint接口,只是維護(hù)了通道的正在關(guān)閉和關(guān)閉完成兩個(gè)狀態(tài)。

(二)AbstractEndpoint
public abstract class AbstractEndpoint extends AbstractPeer implements Resetable {

    /**
     * 日志記錄
     */
    private static final Logger logger = LoggerFactory.getLogger(AbstractEndpoint.class);

    /**
     * 編解碼器
     */
    private Codec2 codec;

    /**
     * 超時(shí)時(shí)間
     */
    private int timeout;

    /**
     * 連接超時(shí)時(shí)間
     */
    private int connectTimeout;

    public AbstractEndpoint(URL url, ChannelHandler handler) {
        super(url, handler);
        this.codec = getChannelCodec(url);
        // 優(yōu)先從url配置中取,如果沒(méi)有,默認(rèn)為1s
        this.timeout = url.getPositiveParameter(Constants.TIMEOUT_KEY, Constants.DEFAULT_TIMEOUT);
        // 優(yōu)先從url配置中取,如果沒(méi)有,默認(rèn)為3s
        this.connectTimeout = url.getPositiveParameter(Constants.CONNECT_TIMEOUT_KEY, Constants.DEFAULT_CONNECT_TIMEOUT);
    }

    /**
     * 從url中獲得編解碼器的配置,并且返回該實(shí)例
     * @param url
     * @return
     */
    protected static Codec2 getChannelCodec(URL url) {
        String codecName = url.getParameter(Constants.CODEC_KEY, "telnet");
        // 優(yōu)先從Codec2的擴(kuò)展類(lèi)中找
        if (ExtensionLoader.getExtensionLoader(Codec2.class).hasExtension(codecName)) {
            return ExtensionLoader.getExtensionLoader(Codec2.class).getExtension(codecName);
        } else {
            return new CodecAdapter(ExtensionLoader.getExtensionLoader(Codec.class)
                    .getExtension(codecName));
        }
    }

}

該類(lèi)是端點(diǎn)的抽象類(lèi),其中封裝了編解碼器以及兩個(gè)超時(shí)時(shí)間?;赿ubbo 的SPI機(jī)制,獲得相應(yīng)的編解碼器實(shí)現(xiàn)對(duì)象,編解碼器優(yōu)先從Codec2的擴(kuò)展類(lèi)中尋找。

下面來(lái)看看該類(lèi)中的reset方法:

@Override
public void reset(URL url) {
    if (isClosed()) {
        throw new IllegalStateException("Failed to reset parameters "
                + url + ", cause: Channel closed. channel: " + getLocalAddress());
    }
    try {
        // 判斷重置的url中有沒(méi)有攜帶timeout,有的話(huà)重置
        if (url.hasParameter(Constants.TIMEOUT_KEY)) {
            int t = url.getParameter(Constants.TIMEOUT_KEY, 0);
            if (t > 0) {
                this.timeout = t;
            }
        }
    } catch (Throwable t) {
        logger.error(t.getMessage(), t);
    }
    try {
        // 判斷重置的url中有沒(méi)有攜帶connect.timeout,有的話(huà)重置
        if (url.hasParameter(Constants.CONNECT_TIMEOUT_KEY)) {
            int t = url.getParameter(Constants.CONNECT_TIMEOUT_KEY, 0);
            if (t > 0) {
                this.connectTimeout = t;
            }
        }
    } catch (Throwable t) {
        logger.error(t.getMessage(), t);
    }
    try {
        // 判斷重置的url中有沒(méi)有攜帶codec,有的話(huà)重置
        if (url.hasParameter(Constants.CODEC_KEY)) {
            this.codec = getChannelCodec(url);
        }
    } catch (Throwable t) {
        logger.error(t.getMessage(), t);
    }
}

@Deprecated
public void reset(com.alibaba.dubbo.common.Parameters parameters) {
    reset(getUrl().addParameters(parameters.getParameters()));
}

這個(gè)方法是Resetable接口中的方法,可以看到以前的reset實(shí)現(xiàn)方法都加上了@Deprecated注解,不推薦使用了,因?yàn)檫@種實(shí)現(xiàn)方式重置太復(fù)雜,需要把所有參數(shù)都設(shè)置一遍,比如我只想重置一個(gè)超時(shí)時(shí)間,但是其他值不變,如果用以前的reset,我需要在url中把所有值都帶上,就會(huì)很多余?,F(xiàn)在用新的reset,每次只關(guān)心我需要重置的值,只更改為需要重置的值。比如上面的代碼所示,只想修改超時(shí)時(shí)間,那我就只在url中攜帶超時(shí)時(shí)間的參數(shù)。

(三)AbstractServer

該類(lèi)繼承了AbstractEndpoint并且實(shí)現(xiàn)Server接口,是服務(wù)器抽象類(lèi)。重點(diǎn)實(shí)現(xiàn)了服務(wù)器的公共邏輯,比如發(fā)送消息,關(guān)閉通道,連接通道,斷開(kāi)連接等。并且抽象了打開(kāi)和關(guān)閉服務(wù)器兩個(gè)方法。

1.屬性
/**
 * 服務(wù)器線(xiàn)程名稱(chēng)
 */
protected static final String SERVER_THREAD_POOL_NAME = "DubboServerHandler";
private static final Logger logger = LoggerFactory.getLogger(AbstractServer.class);
/**
 * 線(xiàn)程池
 */
ExecutorService executor;
/**
 * 服務(wù)地址,也就是本地地址
 */
private InetSocketAddress localAddress;
/**
 * 綁定地址
 */
private InetSocketAddress bindAddress;
/**
 * 最大可接受的連接數(shù)
 */
private int accepts;
/**
 * 空閑超時(shí)時(shí)間,單位是s
 */
private int idleTimeout = 600; //600 seconds

該類(lèi)的屬性比較好理解,就是稍微注意一下idleTimeout的單位是s。

2.構(gòu)造函數(shù)
public AbstractServer(URL url, ChannelHandler handler) throws RemotingException {
    super(url, handler);
    // 從url中獲得本地地址
    localAddress = getUrl().toInetSocketAddress();

    // 從url配置中獲得綁定的ip
    String bindIp = getUrl().getParameter(Constants.BIND_IP_KEY, getUrl().getHost());
    // 從url配置中獲得綁定的端口號(hào)
    int bindPort = getUrl().getParameter(Constants.BIND_PORT_KEY, getUrl().getPort());
    // 判斷url中配置anyhost是否為true或者判斷host是否為不可用的本地Host
    if (url.getParameter(Constants.ANYHOST_KEY, false) || NetUtils.isInvalidLocalHost(bindIp)) {
        bindIp = NetUtils.ANYHOST;
    }
    bindAddress = new InetSocketAddress(bindIp, bindPort);
    // 從url中獲取配置,默認(rèn)值為0
    this.accepts = url.getParameter(Constants.ACCEPTS_KEY, Constants.DEFAULT_ACCEPTS);
    // 從url中獲取配置,默認(rèn)600s
    this.idleTimeout = url.getParameter(Constants.IDLE_TIMEOUT_KEY, Constants.DEFAULT_IDLE_TIMEOUT);
    try {
        // 開(kāi)啟服務(wù)器
        doOpen();
        if (logger.isInfoEnabled()) {
            logger.info("Start " + getClass().getSimpleName() + " bind " + getBindAddress() + ", export " + getLocalAddress());
        }
    } catch (Throwable t) {
        throw new RemotingException(url.toInetSocketAddress(), null, "Failed to bind " + getClass().getSimpleName()
                + " on " + getLocalAddress() + ", cause: " + t.getMessage(), t);
    }
    // 獲得線(xiàn)程池
    //fixme replace this with better method
    DataStore dataStore = ExtensionLoader.getExtensionLoader(DataStore.class).getDefaultExtension();
    executor = (ExecutorService) dataStore.get(Constants.EXECUTOR_SERVICE_COMPONENT_KEY, Integer.toString(url.getPort()));
}

構(gòu)造函數(shù)大部分邏輯就是從url中取配置,存到緩存中,并且做了開(kāi)啟服務(wù)器的操作。具體的看上面的注釋?zhuān)€是比較清晰的。

3.reset方法
@Override
public void reset(URL url) {
    if (url == null) {
        return;
    }
    try {
        // 重置accepts的值
        if (url.hasParameter(Constants.ACCEPTS_KEY)) {
            int a = url.getParameter(Constants.ACCEPTS_KEY, 0);
            if (a > 0) {
                this.accepts = a;
            }
        }
    } catch (Throwable t) {
        logger.error(t.getMessage(), t);
    }
    try {
        // 重置idle.timeout的值
        if (url.hasParameter(Constants.IDLE_TIMEOUT_KEY)) {
            int t = url.getParameter(Constants.IDLE_TIMEOUT_KEY, 0);
            if (t > 0) {
                this.idleTimeout = t;
            }
        }
    } catch (Throwable t) {
        logger.error(t.getMessage(), t);
    }
    try {
        // 重置線(xiàn)程數(shù)配置
        if (url.hasParameter(Constants.THREADS_KEY)
                && executor instanceof ThreadPoolExecutor && !executor.isShutdown()) {
            ThreadPoolExecutor threadPoolExecutor = (ThreadPoolExecutor) executor;
            // 獲得url配置中的線(xiàn)程數(shù)
            int threads = url.getParameter(Constants.THREADS_KEY, 0);
            // 獲得線(xiàn)程池允許的最大線(xiàn)程數(shù)
            int max = threadPoolExecutor.getMaximumPoolSize();
            // 返回核心線(xiàn)程數(shù)
            int core = threadPoolExecutor.getCorePoolSize();
            // 設(shè)置最大線(xiàn)程數(shù)和核心線(xiàn)程數(shù)
            if (threads > 0 && (threads != max || threads != core)) {
                if (threads < core) {
                    // 如果設(shè)置的線(xiàn)程數(shù)比核心線(xiàn)程數(shù)少,則直接設(shè)置核心線(xiàn)程數(shù)
                    threadPoolExecutor.setCorePoolSize(threads);
                    if (core == max) {
                        // 當(dāng)核心線(xiàn)程數(shù)和最大線(xiàn)程數(shù)相等的時(shí)候,把最大線(xiàn)程數(shù)也重置
                        threadPoolExecutor.setMaximumPoolSize(threads);
                    }
                } else {
                    // 當(dāng)大于核心線(xiàn)程數(shù)時(shí),直接設(shè)置最大線(xiàn)程數(shù)
                    threadPoolExecutor.setMaximumPoolSize(threads);
                    // 只有當(dāng)核心線(xiàn)程數(shù)和最大線(xiàn)程數(shù)相等的時(shí)候才設(shè)置核心線(xiàn)程數(shù)
                    if (core == max) {
                        threadPoolExecutor.setCorePoolSize(threads);
                    }
                }
            }
        }
    } catch (Throwable t) {
        logger.error(t.getMessage(), t);
    }
    // 重置url
    super.setUrl(getUrl().addParameters(url.getParameters()));
}

該類(lèi)中的reset方法做了三個(gè)值的重置,分別是最大可連接的客戶(hù)端數(shù)量、空閑超時(shí)時(shí)間以及線(xiàn)程池的兩個(gè)配置參數(shù)。其中要注意核心線(xiàn)程數(shù)和最大線(xiàn)程數(shù)的區(qū)別。舉個(gè)例子,核心線(xiàn)程數(shù)就像是工廠(chǎng)正式工,最大線(xiàn)程數(shù),就是工廠(chǎng)臨時(shí)工作量加大,請(qǐng)了一批臨時(shí)工,臨時(shí)工加正式工的和就是最大線(xiàn)程數(shù),等這批任務(wù)結(jié)束后,臨時(shí)工要辭退的,而正式工會(huì)留下。

還有send、close、connected、disconnected等方法比較簡(jiǎn)單,如果有興趣,可以到我的GitHub查看,地址文章末尾會(huì)給出。

(四)AbstractClient

該類(lèi)是客戶(hù)端的抽象類(lèi),繼承了AbstractEndpoint類(lèi),實(shí)現(xiàn)了Client接口,該類(lèi)中也是做了客戶(hù)端公用的重連邏輯,抽象了打開(kāi)客戶(hù)端、關(guān)閉客戶(hù)端、連接服務(wù)器、斷開(kāi)服務(wù)器連接以及獲得通道方法,讓子類(lèi)去重點(diǎn)關(guān)注這幾個(gè)方法。

1.屬性
/**
 * 客戶(hù)端線(xiàn)程名稱(chēng)
 */
protected static final String CLIENT_THREAD_POOL_NAME = "DubboClientHandler";
private static final Logger logger = LoggerFactory.getLogger(AbstractClient.class);
/**
 * 線(xiàn)程池id
 */
private static final AtomicInteger CLIENT_THREAD_POOL_ID = new AtomicInteger();
/**
 * 重連定時(shí)任務(wù)執(zhí)行器
 */
private static final ScheduledThreadPoolExecutor reconnectExecutorService = new ScheduledThreadPoolExecutor(2, new NamedThreadFactory("DubboClientReconnectTimer", true));
/**
 * 連接鎖
 */
private final Lock connectLock = new ReentrantLock();
/**
 * 發(fā)送消息時(shí),若斷開(kāi),是否重連
 */
private final boolean send_reconnect;
/**
 * 重連次數(shù)
 */
private final AtomicInteger reconnect_count = new AtomicInteger(0);
/**
 * 在這之前是否調(diào)用重新連接的錯(cuò)誤日志
 */
// Reconnection error log has been called before?
private final AtomicBoolean reconnect_error_log_flag = new AtomicBoolean(false);
/**
 * 重連 warning 的間隔.(waring多少次之后,warning一次),也就是錯(cuò)誤多少次后告警一次錯(cuò)誤
 */
// reconnect warning period. Reconnect warning interval (log warning after how many times) //for test
private final int reconnect_warning_period;
/**
 * 關(guān)閉超時(shí)時(shí)間
 */
private final long shutdown_timeout;
/**
 * 線(xiàn)程池
 */
protected volatile ExecutorService executor;
/**
 * 重連執(zhí)行任務(wù)
 */
private volatile ScheduledFuture reconnectExecutorFuture = null;
// the last successed connected time
/**
 * 最后成功連接的時(shí)間
 */
private long lastConnectedTime = System.currentTimeMillis();

上述屬性大部分跟重連有關(guān),該類(lèi)最重要的也是封裝了重連的邏輯。

2.構(gòu)造函數(shù)
public AbstractClient(URL url, ChannelHandler handler) throws RemotingException {
    super(url, handler);

    // 從url中獲得是否重連的配置,默認(rèn)為false
    send_reconnect = url.getParameter(Constants.SEND_RECONNECT_KEY, false);

    // 從url中獲得關(guān)閉超時(shí)時(shí)間,默認(rèn)為900s
    shutdown_timeout = url.getParameter(Constants.SHUTDOWN_TIMEOUT_KEY, Constants.DEFAULT_SHUTDOWN_TIMEOUT);

    // The default reconnection interval is 2s, 1800 means warning interval is 1 hour.
    // 重連的默認(rèn)值是2s,重連 warning 的間隔默認(rèn)是1800,當(dāng)出錯(cuò)的時(shí)候,每隔1800*2=3600s報(bào)警一次
    reconnect_warning_period = url.getParameter("reconnect.waring.period", 1800);

    try {
        // 打開(kāi)客戶(hù)端
        doOpen();
    } catch (Throwable t) {
        close();
        throw new RemotingException(url.toInetSocketAddress(), null,
                "Failed to start " + getClass().getSimpleName() + " " + NetUtils.getLocalAddress()
                        + " connect to the server " + getRemoteAddress() + ", cause: " + t.getMessage(), t);
    }
    try {
        // connect.
        // 連接服務(wù)器
        connect();
        if (logger.isInfoEnabled()) {
            logger.info("Start " + getClass().getSimpleName() + " " + NetUtils.getLocalAddress() + " connect to the server " + getRemoteAddress());
        }
    } catch (RemotingException t) {
        if (url.getParameter(Constants.CHECK_KEY, true)) {
            close();
            throw t;
        } else {
            logger.warn("Failed to start " + getClass().getSimpleName() + " " + NetUtils.getLocalAddress()
                    + " connect to the server " + getRemoteAddress() + " (check == false, ignore and retry later!), cause: " + t.getMessage(), t);
        }
    } catch (Throwable t) {
        close();
        throw new RemotingException(url.toInetSocketAddress(), null,
                "Failed to start " + getClass().getSimpleName() + " " + NetUtils.getLocalAddress()
                        + " connect to the server " + getRemoteAddress() + ", cause: " + t.getMessage(), t);
    }

    // 從緩存中獲得線(xiàn)程池
    executor = (ExecutorService) ExtensionLoader.getExtensionLoader(DataStore.class)
            .getDefaultExtension().get(Constants.CONSUMER_SIDE, Integer.toString(url.getPort()));
    // 清楚線(xiàn)程池緩存
    ExtensionLoader.getExtensionLoader(DataStore.class)
            .getDefaultExtension().remove(Constants.CONSUMER_SIDE, Integer.toString(url.getPort()));
}

該構(gòu)造函數(shù)中做了一些屬性值的設(shè)置,并且做了打開(kāi)客戶(hù)端和連接服務(wù)器的操作。

3.wrapChannelHandler
protected static ChannelHandler wrapChannelHandler(URL url, ChannelHandler handler) {
    // 加入線(xiàn)程名稱(chēng)
    url = ExecutorUtil.setThreadName(url, CLIENT_THREAD_POOL_NAME);
    // 設(shè)置使用的線(xiàn)程池類(lèi)型
    url = url.addParameterIfAbsent(Constants.THREADPOOL_KEY, Constants.DEFAULT_CLIENT_THREADPOOL);
    // 包裝
    return ChannelHandlers.wrap(handler, url);
}

該方法是包裝通道處理器,設(shè)置使用的線(xiàn)程池類(lèi)型是可緩存線(xiàn)程池。

4.initConnectStatusCheckCommand
private synchronized void initConnectStatusCheckCommand() {
    //reconnect=false to close reconnect
    int reconnect = getReconnectParam(getUrl());
    // 有連接頻率的值,并且當(dāng)前沒(méi)有連接任務(wù)
    if (reconnect > 0 && (reconnectExecutorFuture == null || reconnectExecutorFuture.isCancelled())) {
        Runnable connectStatusCheckCommand = new Runnable() {
            @Override
            public void run() {
                try {
                    if (!isConnected()) {
                        // 重連
                        connect();
                    } else {
                        // 記錄最后一次重連的時(shí)間
                        lastConnectedTime = System.currentTimeMillis();
                    }
                } catch (Throwable t) {
                    String errorMsg = "client reconnect to " + getUrl().getAddress() + " find error . url: " + getUrl();
                    // wait registry sync provider list
                    if (System.currentTimeMillis() - lastConnectedTime > shutdown_timeout) {
                        // 如果之前沒(méi)有打印過(guò)重連的誤日志
                        if (!reconnect_error_log_flag.get()) {
                            reconnect_error_log_flag.set(true);
                            // 打印日志
                            logger.error(errorMsg, t);
                            return;
                        }
                    }
                    // 如果到達(dá)一次重連日志告警周期,則打印告警日志
                    if (reconnect_count.getAndIncrement() % reconnect_warning_period == 0) {
                        logger.warn(errorMsg, t);
                    }
                }
            }
        };
        // 開(kāi)啟重連定時(shí)任務(wù)
        reconnectExecutorFuture = reconnectExecutorService.scheduleWithFixedDelay(connectStatusCheckCommand, reconnect, reconnect, TimeUnit.MILLISECONDS);
    }
}

該方法是初始化重連線(xiàn)程,其中做了重連失敗后的告警日志和錯(cuò)誤日志打印策略。

5.reconnect
@Override
public void reconnect() throws RemotingException {
    disconnect();
    connect();
}

多帶帶放該方法是因?yàn)檫@是該類(lèi)關(guān)注的重點(diǎn)。實(shí)現(xiàn)了客戶(hù)端的重連邏輯。

6.其他

connect、disconnect、close等方法都是調(diào)用了對(duì)應(yīng)的抽象方法,而具體的邏輯需要看具體的子類(lèi)如何去實(shí)現(xiàn)相關(guān)的抽象方法,這幾個(gè)方法邏輯比較簡(jiǎn)單,我不在這里貼出源碼,有興趣可以看我的GitHub,地址文章末尾會(huì)給出。

(四)AbstractChannel

該類(lèi)是通道的抽象類(lèi),該類(lèi)里面做的邏輯很簡(jiǎn)單,具體的發(fā)送消息邏輯在它 的子類(lèi)中實(shí)現(xiàn)。

@Override
public void send(Object message, boolean sent) throws RemotingException {
    // 檢測(cè)通道是否關(guān)閉
    if (isClosed()) {
        throw new RemotingException(this, "Failed to send message "
                + (message == null ? "" : message.getClass().getName()) + ":" + message
                + ", cause: Channel closed. channel: " + getLocalAddress() + " -> " + getRemoteAddress());
    }
}

可以看到send方法,其中只做了檢測(cè)通道是否關(guān)閉的狀態(tài)檢測(cè),沒(méi)有實(shí)現(xiàn)具體的發(fā)送消息的邏輯。

(五)ChannelHandlerDelegate

該類(lèi)繼承了ChannelHandler,從它的名字可以看出是ChannelHandler的代表,它就是作為裝飾模式中的Component角色,后面講到的AbstractChannelHandlerDelegate作為裝飾模式中的Decorator角色。

public interface ChannelHandlerDelegate extends ChannelHandler {
    /**
     * 獲得通道
     * @return
     */
    ChannelHandler getHandler();
}
(六)AbstractChannelHandlerDelegate

屬性:

protected ChannelHandler handler

該類(lèi)實(shí)現(xiàn)了ChannelHandlerDelegate接口,并且有一個(gè)屬性是ChannelHandler,上述已經(jīng)說(shuō)到這是裝飾模式中的裝飾角色,其中的所有實(shí)現(xiàn)方法都直接調(diào)用被裝飾的handler屬性的方法。

(七)DecodeHandler

該類(lèi)為解碼處理器,繼承了AbstractChannelHandlerDelegate,對(duì)接收到的消息進(jìn)行解碼,在父類(lèi)處理接收消息的功能上疊加了解碼功能。

我們來(lái)看看received方法:

@Override
public void received(Channel channel, Object message) throws RemotingException {
    // 如果是Decodeable類(lèi)型的消息,則對(duì)整個(gè)消息解碼
    if (message instanceof Decodeable) {
        decode(message);
    }

    // 如果是Request請(qǐng)求類(lèi)型消息,則對(duì)請(qǐng)求中對(duì)請(qǐng)求數(shù)據(jù)解碼
    if (message instanceof Request) {
        decode(((Request) message).getData());
    }

    // 如果是Response返回類(lèi)型的消息,則對(duì)返回消息中對(duì)結(jié)果進(jìn)行解碼
    if (message instanceof Response) {
        decode(((Response) message).getResult());
    }

    // 繼續(xù)將消息委托給handler,繼續(xù)處理
    handler.received(channel, message);
}

可以看到做了三次判斷,根據(jù)消息的不同會(huì)對(duì)消息的不同數(shù)據(jù)做解碼??梢钥吹?,這里用到裝飾模式后,在處理消息的前面做了解碼的處理,并且還能繼續(xù)委托給handler來(lái)處理消息,通過(guò)組合做到了功能的疊加。

private void decode(Object message) {
    // 如果消息類(lèi)型是Decodeable,進(jìn)一步調(diào)用Decodeable的decode來(lái)解碼
    if (message != null && message instanceof Decodeable) {
        try {
            ((Decodeable) message).decode();
            if (log.isDebugEnabled()) {
                log.debug("Decode decodeable message " + message.getClass().getName());
            }
        } catch (Throwable e) {
            if (log.isWarnEnabled()) {
                log.warn("Call Decodeable.decode failed: " + e.getMessage(), e);
            }
        } // ~ end of catch
    } // ~ end of if
} // ~ end of method decode

可以看到這是解析消息的邏輯,當(dāng)消息是Decodeable類(lèi)型,還會(huì)繼續(xù)調(diào)用Decodeable的decode方法來(lái)進(jìn)行解析。它的實(shí)現(xiàn)類(lèi)后續(xù)會(huì)講解到。

(八)MultiMessageHandler

該類(lèi)是多消息處理器的抽象類(lèi)。同樣繼承了AbstractChannelHandlerDelegate類(lèi),我們來(lái)看看它的received方法:

@SuppressWarnings("unchecked")
@Override
public void received(Channel channel, Object message) throws RemotingException {
    // 當(dāng)消息為多消息時(shí) 循環(huán)交給handler處理接收到當(dāng)消息
    if (message instanceof MultiMessage) {
        MultiMessage list = (MultiMessage) message;
        for (Object obj : list) {
            handler.received(channel, obj);
        }
    } else {
        // 如果是單消息,就直接交給handler處理器
        handler.received(channel, message);
    }
}

邏輯很簡(jiǎn)單,當(dāng)消息是多消息類(lèi)型時(shí),也就是一次性接收到多條消息的情況,循環(huán)去處理消息,當(dāng)消息是單消息時(shí)候,直接交給handler去處理。

(九)WrappedChannelHandler

該類(lèi)跟AbstractChannelHandlerDelegate的作用類(lèi)似,都是裝飾模式中的裝飾角色,其中的所有實(shí)現(xiàn)方法都直接調(diào)用被裝飾的handler屬性的方法,該類(lèi)是為了添加線(xiàn)程池的功能,它的子類(lèi)都是去關(guān)心哪些消息是需要分發(fā)到線(xiàn)程池的,哪些消息直接由I / O線(xiàn)程執(zhí)行,現(xiàn)在版本有四種場(chǎng)景,也就是它的四個(gè)子類(lèi),下面我一一描述。

public WrappedChannelHandler(ChannelHandler handler, URL url) {
    this.handler = handler;
    this.url = url;
    // 創(chuàng)建線(xiàn)程池
    executor = (ExecutorService) ExtensionLoader.getExtensionLoader(ThreadPool.class).getAdaptiveExtension().getExecutor(url);

    // 設(shè)置組件的key
    String componentKey = Constants.EXECUTOR_SERVICE_COMPONENT_KEY;
    if (Constants.CONSUMER_SIDE.equalsIgnoreCase(url.getParameter(Constants.SIDE_KEY))) {
        componentKey = Constants.CONSUMER_SIDE;
    }
    // 獲得dataStore實(shí)例
    DataStore dataStore = ExtensionLoader.getExtensionLoader(DataStore.class).getDefaultExtension();
    // 把線(xiàn)程池放到dataStore中緩存
    dataStore.put(componentKey, Integer.toString(url.getPort()), executor);
}

可以看到構(gòu)造方法除了屬性的填充以外,線(xiàn)程池是基于dubbo 的SPI Adaptive機(jī)制創(chuàng)建的,在dataStore中把線(xiàn)程池加進(jìn)去, 該線(xiàn)程池就是AbstractClient 或 AbstractServer 從 DataStore 獲得的線(xiàn)程池。

public ExecutorService getExecutorService() {
    // 首先返回的不是共享線(xiàn)程池,是該類(lèi)的線(xiàn)程池
    ExecutorService cexecutor = executor;
    // 如果該類(lèi)的線(xiàn)程池關(guān)閉或者為空,則返回的是共享線(xiàn)程池
    if (cexecutor == null || cexecutor.isShutdown()) {
        cexecutor = SHARED_EXECUTOR;
    }
    return cexecutor;
}

該方法是獲得線(xiàn)程池的實(shí)例,不過(guò)該類(lèi)里面有兩個(gè)線(xiàn)程池,還加入了一個(gè)共享線(xiàn)程池,共享線(xiàn)程池優(yōu)先級(jí)較低。

(十)ExecutionChannelHandler

該類(lèi)繼承了WrappedChannelHandler,也是增強(qiáng)了功能,處理的是接收請(qǐng)求消息時(shí),把請(qǐng)求消息分發(fā)到線(xiàn)程池,而除了請(qǐng)求消息以外,其他消息類(lèi)型都直接通過(guò)I / O線(xiàn)程直接執(zhí)行。

@Override
public void received(Channel channel, Object message) throws RemotingException {
    // 獲得線(xiàn)程池實(shí)例
    ExecutorService cexecutor = getExecutorService();
    // 如果消息是request類(lèi)型,才會(huì)分發(fā)到線(xiàn)程池,其他消息,如響應(yīng),連接,斷開(kāi)連接,心跳將由I / O線(xiàn)程直接執(zhí)行。
    if (message instanceof Request) {
        try {
            // 把請(qǐng)求消息分發(fā)到線(xiàn)程池
            cexecutor.execute(new ChannelEventRunnable(channel, handler, ChannelState.RECEIVED, message));
        } catch (Throwable t) {
            // FIXME: when the thread pool is full, SERVER_THREADPOOL_EXHAUSTED_ERROR cannot return properly,
            // therefore the consumer side has to wait until gets timeout. This is a temporary solution to prevent
            // this scenario from happening, but a better solution should be considered later.
            // 當(dāng)線(xiàn)程池滿(mǎn)了,SERVER_THREADPOOL_EXHAUSTED_ERROR錯(cuò)誤無(wú)法正常返回
            // 因此消費(fèi)者方必須等到超時(shí)。這是一種預(yù)防的臨時(shí)解決方案,所以這里直接返回該錯(cuò)誤
            if (t instanceof RejectedExecutionException) {
                Request request = (Request) message;
                if (request.isTwoWay()) {
                    String msg = "Server side(" + url.getIp() + "," + url.getPort()
                            + ") thread pool is exhausted, detail msg:" + t.getMessage();
                    Response response = new Response(request.getId(), request.getVersion());
                    response.setStatus(Response.SERVER_THREADPOOL_EXHAUSTED_ERROR);
                    response.setErrorMessage(msg);
                    channel.send(response);
                    return;
                }
            }
            throw new ExecutionException(message, channel, getClass() + " error when process received event.", t);
        }
    } else {
        // 如果消息不是request類(lèi)型,則直接處理
        handler.received(channel, message);
    }
}

上述就可以都看到對(duì)于請(qǐng)求消息的處理,其中有個(gè)打補(bǔ)丁的方式是當(dāng)線(xiàn)程池滿(mǎn)了的時(shí)候,消費(fèi)者只能等待請(qǐng)求超時(shí),所以這里直接返回線(xiàn)程池滿(mǎn)的錯(cuò)誤。

(十一)AllChannelHandler

該類(lèi)也繼承了WrappedChannelHandler,也是為了增強(qiáng)功能,處理的是連接、斷開(kāi)連接、捕獲異常以及接收到的所有消息都分發(fā)到線(xiàn)程池。

@Override
public void connected(Channel channel) throws RemotingException {
    ExecutorService cexecutor = getExecutorService();
    try {
        // 把連接操作分發(fā)到線(xiàn)程池處理
        cexecutor.execute(new ChannelEventRunnable(channel, handler, ChannelState.CONNECTED));
    } catch (Throwable t) {
        throw new ExecutionException("connect event", channel, getClass() + " error when process connected event .", t);
    }
}

@Override
public void disconnected(Channel channel) throws RemotingException {
    ExecutorService cexecutor = getExecutorService();
    try {
        // 把斷開(kāi)連接操作分發(fā)到線(xiàn)程池處理
        cexecutor.execute(new ChannelEventRunnable(channel, handler, ChannelState.DISCONNECTED));
    } catch (Throwable t) {
        throw new ExecutionException("disconnect event", channel, getClass() + " error when process disconnected event .", t);
    }
}

@Override
public void received(Channel channel, Object message) throws RemotingException {
    ExecutorService cexecutor = getExecutorService();
    try {
        // 把所有消息分發(fā)到線(xiàn)程池處理
        cexecutor.execute(new ChannelEventRunnable(channel, handler, ChannelState.RECEIVED, message));
    } catch (Throwable t) {
        //TODO A temporary solution to the problem that the exception information can not be sent to the opposite end after the thread pool is full. Need a refactoring
        //fix The thread pool is full, refuses to call, does not return, and causes the consumer to wait for time out
        // 這里處理線(xiàn)程池滿(mǎn)的問(wèn)題,只有在請(qǐng)求時(shí)候會(huì)出現(xiàn)。
        //復(fù)線(xiàn)程池已滿(mǎn),拒絕調(diào)用,不返回,并導(dǎo)致使用者等待超時(shí)
       if(message instanceof Request && t instanceof RejectedExecutionException){
          Request request = (Request)message;
          if(request.isTwoWay()){
             String msg = "Server side(" + url.getIp() + "," + url.getPort() + ") threadpool is exhausted ,detail msg:" + t.getMessage();
             Response response = new Response(request.getId(), request.getVersion());
             response.setStatus(Response.SERVER_THREADPOOL_EXHAUSTED_ERROR);
             response.setErrorMessage(msg);
             channel.send(response);
             return;
          }
       }
        throw new ExecutionException(message, channel, getClass() + " error when process received event .", t);
    }
}

@Override
public void caught(Channel channel, Throwable exception) throws RemotingException {
    ExecutorService cexecutor = getExecutorService();
    try {
        // 把捕獲異常作分發(fā)到線(xiàn)程池處理
        cexecutor.execute(new ChannelEventRunnable(channel, handler, ChannelState.CAUGHT, exception));
    } catch (Throwable t) {
        throw new ExecutionException("caught event", channel, getClass() + " error when process caught event .", t);
    }
}

可以看到,所有操作以及消息都分到到線(xiàn)程池中。并且注意操作不同,傳入的狀態(tài)也不同。

(十二)ConnectionOrderedChannelHandler

該類(lèi)也是繼承了WrappedChannelHandler,增強(qiáng)功能,該類(lèi)是把連接、取消連接以及接收到的消息都分發(fā)到線(xiàn)程池,但是不同的是,該類(lèi)自己創(chuàng)建了一個(gè)跟連接相關(guān)的線(xiàn)程池,把連接操作和斷開(kāi)連接操分發(fā)到該線(xiàn)程池,而接收到的消息則分發(fā)到WrappedChannelHandler的線(xiàn)程池中。來(lái)看看具體的實(shí)現(xiàn)。

/**
 * 連接線(xiàn)程池
 */
protected final ThreadPoolExecutor connectionExecutor;
/**
 * 連接隊(duì)列大小限制
 */
private final int queuewarninglimit;

public ConnectionOrderedChannelHandler(ChannelHandler handler, URL url) {
    super(handler, url);
    // 獲得線(xiàn)程名,默認(rèn)是Dubbo
    String threadName = url.getParameter(Constants.THREAD_NAME_KEY, Constants.DEFAULT_THREAD_NAME);
    // 創(chuàng)建連接線(xiàn)程池
    connectionExecutor = new ThreadPoolExecutor(1, 1,
            0L, TimeUnit.MILLISECONDS,
            new LinkedBlockingQueue(url.getPositiveParameter(Constants.CONNECT_QUEUE_CAPACITY, Integer.MAX_VALUE)),
            new NamedThreadFactory(threadName, true),
            new AbortPolicyWithReport(threadName, url)
    );  // FIXME There"s no place to release connectionExecutor!
    // 設(shè)置工作隊(duì)列限制,默認(rèn)是1000
    queuewarninglimit = url.getParameter(Constants.CONNECT_QUEUE_WARNING_SIZE, Constants.DEFAULT_CONNECT_QUEUE_WARNING_SIZE);
}

可以屬性中有一個(gè)連接線(xiàn)程池,看到在構(gòu)造函數(shù)里創(chuàng)建了該線(xiàn)程池,而queuewarninglimit是用來(lái)限制連接線(xiàn)程池的工作隊(duì)列長(zhǎng)度,比較簡(jiǎn)單。來(lái)看看連接和斷開(kāi)連接到邏輯。

@Override
public void connected(Channel channel) throws RemotingException {
    try {
        // 核對(duì)工作隊(duì)列長(zhǎng)度
        checkQueueLength();
        // 分發(fā)連接操作
        connectionExecutor.execute(new ChannelEventRunnable(channel, handler, ChannelState.CONNECTED));
    } catch (Throwable t) {
        throw new ExecutionException("connect event", channel, getClass() + " error when process connected event .", t);
    }
}

@Override
public void disconnected(Channel channel) throws RemotingException {
    try {
        // 核對(duì)工作隊(duì)列長(zhǎng)度
        checkQueueLength();
        // 分發(fā)斷開(kāi)連接操作
        connectionExecutor.execute(new ChannelEventRunnable(channel, handler, ChannelState.DISCONNECTED));
    } catch (Throwable t) {
        throw new ExecutionException("disconnected event", channel, getClass() + " error when process disconnected event .", t);
    }
}

可以看到,這兩個(gè)操作都是分發(fā)到連接線(xiàn)程池connectionExecutor中,和AllChannelHandle類(lèi)r中的分發(fā)的線(xiàn)程池不是同一個(gè)。而ConnectionOrderedChannelHandler的received方法跟AllChannelHandle一樣,我就不貼出來(lái)。

(十三)MessageOnlyChannelHandler

該類(lèi)也是繼承了WrappedChannelHandler,是WrappedChannelHandler的最后一個(gè)子類(lèi),也是增強(qiáng)功能,不過(guò)該類(lèi)只是處理了所有的消息分發(fā)到線(xiàn)程池。可以看到源碼,比較簡(jiǎn)單:

@Override
public void received(Channel channel, Object message) throws RemotingException {
    // 獲得線(xiàn)程池實(shí)例
    ExecutorService cexecutor = getExecutorService();
    try {
        // 把消息分發(fā)到線(xiàn)程池
        cexecutor.execute(new ChannelEventRunnable(channel, handler, ChannelState.RECEIVED, message));
    } catch (Throwable t) {
        throw new ExecutionException(message, channel, getClass() + " error when process received event .", t);
    }
}

下面我講講解五種線(xiàn)程池的調(diào)度策略,也就是我在《dubbo源碼解析(八)遠(yuǎn)程通信——開(kāi)篇》中提到的Dispatcher接口的五種實(shí)現(xiàn),分別是AllDispatcher、DirectDispatcher、MessageOnlyDispatcher、ExecutionDispatcher、ConnectionOrderedDispatcher。

(十四)AllDispatcher
public class AllDispatcher implements Dispatcher {

    public static final String NAME = "all";

    @Override
    public ChannelHandler dispatch(ChannelHandler handler, URL url) {
        // 線(xiàn)程池調(diào)度方法:任何消息以及操作都分發(fā)到線(xiàn)程池中
        return new AllChannelHandler(handler, url);
    }

}

對(duì)照著上述講到的AllChannelHandler,是不是很清晰這種線(xiàn)程池的調(diào)度方法。并且該調(diào)度方法是默認(rèn)的調(diào)度方法。

(十五)ConnectionOrderedDispatcher
public class ConnectionOrderedDispatcher implements Dispatcher {

    public static final String NAME = "connection";

    @Override
    public ChannelHandler dispatch(ChannelHandler handler, URL url) {
        // 線(xiàn)程池調(diào)度方法:連接、斷開(kāi)連接分發(fā)到到線(xiàn)程池和其他消息分發(fā)到線(xiàn)程池不是同一個(gè)
        return new ConnectionOrderedChannelHandler(handler, url);
    }

}

對(duì)照上述講到的ConnectionOrderedChannelHandler,也很清晰該線(xiàn)程池調(diào)度方法。

(十六)DirectDispatcher
public class DirectDispatcher implements Dispatcher {

    public static final String NAME = "direct";

    @Override
    public ChannelHandler dispatch(ChannelHandler handler, URL url) {
        // 直接處理消息,不分發(fā)到線(xiàn)程池
        return handler;
    }

}

該線(xiàn)程池調(diào)度方法是不調(diào)度線(xiàn)程池,直接執(zhí)行。

(十七)ExecutionDispatcher
public class ExecutionDispatcher implements Dispatcher {

    public static final String NAME = "execution";

    @Override
    public ChannelHandler dispatch(ChannelHandler handler, URL url) {
        // 線(xiàn)程池調(diào)度方法:只有請(qǐng)求消息分發(fā)到線(xiàn)程池,其他都直接執(zhí)行
        return new ExecutionChannelHandler(handler, url);
    }

}

對(duì)照著上述的ExecutionChannelHandler講解,也可以很清晰的看出該線(xiàn)程池調(diào)度策略。

(十八)MessageOnlyDispatcher
public class MessageOnlyDispatcher implements Dispatcher {

    public static final String NAME = "message";

    @Override
    public ChannelHandler dispatch(ChannelHandler handler, URL url) {
        // 只要是接收到的消息,都分發(fā)到線(xiàn)程池
        return new MessageOnlyChannelHandler(handler, url);
    }

}

對(duì)照著上述講到的MessageOnlyChannelHandler,可以很清晰該線(xiàn)程池調(diào)度策略。

(十九)ChannelHandlers

該類(lèi)是通道處理器工廠(chǎng),會(huì)對(duì)傳入的handler進(jìn)行一次包裝,無(wú)論是Client還是Server都會(huì)做這樣的處理,也就是做了一些功能上的增強(qiáng),就像上述我說(shuō)到的裝飾模式中的那些功能。

我們來(lái)看看源碼:

public static ChannelHandler wrap(ChannelHandler handler, URL url) {
    return ChannelHandlers.getInstance().wrapInternal(handler, url);
}

protected ChannelHandler wrapInternal(ChannelHandler handler, URL url) {
    // 調(diào)用了多消息處理器,對(duì)心跳消息進(jìn)行了功能加強(qiáng)
    return new MultiMessageHandler(new HeartbeatHandler(ExtensionLoader.getExtensionLoader(Dispatcher.class)
            .getAdaptiveExtension().dispatch(handler, url)));
}

最關(guān)鍵的是這兩個(gè)方法,看第二個(gè)方法,其實(shí)就是包裝了MultiMessageHandler功能,增加了多消息處理的功能,以及對(duì)心跳消息做了功能增強(qiáng)。

(二十)AbstractCodec

實(shí)現(xiàn) Codec2 接口,,其中實(shí)現(xiàn)了一些編解碼的公共邏輯。

1.checkPayload
protected static void checkPayload(Channel channel, long size) throws IOException {
    // 默認(rèn)長(zhǎng)度
    int payload = Constants.DEFAULT_PAYLOAD;
    if (channel != null && channel.getUrl() != null) {
        // 優(yōu)先從url中獲得消息長(zhǎng)度配置,如果沒(méi)有則用默認(rèn)長(zhǎng)度
        payload = channel.getUrl().getParameter(Constants.PAYLOAD_KEY, Constants.DEFAULT_PAYLOAD);
    }
    // 如果消息長(zhǎng)度過(guò)長(zhǎng),則報(bào)錯(cuò)
    if (payload > 0 && size > payload) {
        ExceedPayloadLimitException e = new ExceedPayloadLimitException("Data length too large: " + size + ", max payload: " + payload + ", channel: " + channel);
        logger.error(e);
        throw e;
    }
}

該方法是檢驗(yàn)消息長(zhǎng)度。

2.getSerialization
protected Serialization getSerialization(Channel channel) {
    return CodecSupport.getSerialization(channel.getUrl());
}

該方法是獲得序列化對(duì)象。

3.isClientSide
protected boolean isClientSide(Channel channel) {
    // 獲得是side對(duì)應(yīng)的value
    String side = (String) channel.getAttribute(Constants.SIDE_KEY);
    if ("client".equals(side)) {
        return true;
    } else if ("server".equals(side)) {
        return false;
    } else {
        InetSocketAddress address = channel.getRemoteAddress();
        URL url = channel.getUrl();
        // 判斷url的主機(jī)地址是否和遠(yuǎn)程地址一樣,如果是,則判斷為client,如果不是,則判斷為server
        boolean client = url.getPort() == address.getPort()
                && NetUtils.filterLocalHost(url.getIp()).equals(
                NetUtils.filterLocalHost(address.getAddress()
                        .getHostAddress()));
        // 把value設(shè)置進(jìn)去
        channel.setAttribute(Constants.SIDE_KEY, client ? "client"
                : "server");
        return client;
    }
}

該方法是判斷是否為客戶(hù)端側(cè)的通道。

4.isServerSide
protected boolean isServerSide(Channel channel) {
    return !isClientSide(channel);
}

該方法是判斷是否為服務(wù)端側(cè)的通道。

(二十一)TransportCodec

該類(lèi)是傳輸編解碼器,使用 Serialization 進(jìn)行序列化/反序列化,直接編解碼。關(guān)于序列化為會(huì)在后續(xù)文章中介紹。

@Override
public void encode(Channel channel, ChannelBuffer buffer, Object message) throws IOException {
    // 獲得序列化的 ObjectOutput 對(duì)象
    OutputStream output = new ChannelBufferOutputStream(buffer);
    ObjectOutput objectOutput = getSerialization(channel).serialize(channel.getUrl(), output);
    // 寫(xiě)入 ObjectOutput
    encodeData(channel, objectOutput, message);
    objectOutput.flushBuffer();
    // 釋放
    if (objectOutput instanceof Cleanable) {
        ((Cleanable) objectOutput).cleanup();
    }
}

@Override
public Object decode(Channel channel, ChannelBuffer buffer) throws IOException {
    // 獲得反序列化的 ObjectInput 對(duì)象
    InputStream input = new ChannelBufferInputStream(buffer);
    ObjectInput objectInput = getSerialization(channel).deserialize(channel.getUrl(), input);
    // 讀取 ObjectInput
    Object object = decodeData(channel, objectInput);
    // 釋放
    if (objectInput instanceof Cleanable) {
        ((Cleanable) objectInput).cleanup();
    }
    return object;
}

該類(lèi)關(guān)鍵方法就是編碼和解碼,比較好理解,直接進(jìn)行了序列化和反序列化。

(二十二)CodecAdapter

該類(lèi)是Codec 的適配器,用到了適配器模式,把Codec適配成Codec2。將Codec的編碼和解碼方法都適配成Codec2。比如很多時(shí)候都只能用Codec2的編解碼器,但是有的時(shí)候需要用Codec,但是不能滿(mǎn)足導(dǎo)致只能加入適配器來(lái)完成使用。

@Override
public void encode(Channel channel, ChannelBuffer buffer, Object message)
        throws IOException {
    UnsafeByteArrayOutputStream os = new UnsafeByteArrayOutputStream(1024);
    // 調(diào)用舊的編解碼器的編碼
    codec.encode(channel, os, message);
    buffer.writeBytes(os.toByteArray());
}

@Override
public Object decode(Channel channel, ChannelBuffer buffer) throws IOException {
    byte[] bytes = new byte[buffer.readableBytes()];
    int savedReaderIndex = buffer.readerIndex();
    buffer.readBytes(bytes);
    UnsafeByteArrayInputStream is = new UnsafeByteArrayInputStream(bytes);
    // 調(diào)用舊的編解碼器的解碼
    Object result = codec.decode(channel, is);
    buffer.readerIndex(savedReaderIndex + is.position());
    return result == Codec.NEED_MORE_INPUT ? DecodeResult.NEED_MORE_INPUT : result;
}

可以看到,在編碼和解碼的方法中都調(diào)用了codec的方法。

(二十三)ChannelDelegate、ServerDelegate、ClientDelegate

ChannelDelegate實(shí)現(xiàn)類(lèi)Channel,ServerDelegate實(shí)現(xiàn)了Server,ClientDelegate實(shí)現(xiàn)了Client,都用到了裝飾模式,都作為裝飾模式中的裝飾角色,所以類(lèi)中的所有實(shí)現(xiàn)方法都調(diào)用了屬性的方法。具體代碼就不貼了,朋友們可以自行查看。

(二十四)ChannelHandlerAdapter

該類(lèi)實(shí)現(xiàn)了ChannelHandler接口,是通道處理器適配類(lèi),該類(lèi)中所有實(shí)現(xiàn)方法都是空的,所有想實(shí)現(xiàn)ChannelHandler接口的類(lèi)可以直接繼承該類(lèi),選擇需要實(shí)現(xiàn)的方法進(jìn)行實(shí)現(xiàn),不需要實(shí)現(xiàn)ChannelHandler接口中所有方法。

(二十五)ChannelHandlerDispatcher

該類(lèi)是通道處理器調(diào)度器,其中緩存了所有通道處理器,有一個(gè)通道處理器集合。并且每個(gè)操作都會(huì)去遍歷該集合,執(zhí)行相應(yīng)的操作,例如:

@Override
public void connected(Channel channel) {
    // 遍歷通道處理器集合
    for (ChannelHandler listener : channelHandlers) {
        try {
            // 連接
            listener.connected(channel);
        } catch (Throwable t) {
            logger.error(t.getMessage(), t);
        }
    }
}
(二十六)CodecSupport

該類(lèi)是編解碼工具類(lèi),提供查詢(xún) Serialization 的功能。

/**
 * 序列化對(duì)象集合 key為序列化類(lèi)型編號(hào)
 */
private static Map ID_SERIALIZATION_MAP = new HashMap();
/**
 * 序列化擴(kuò)展名集合 key為序列化類(lèi)型編號(hào) value為序列化擴(kuò)展名
 */
private static Map ID_SERIALIZATIONNAME_MAP = new HashMap();

static {
    // 利用dubbo 的SPI機(jī)制獲得序列化擴(kuò)展名
    Set supportedExtensions = ExtensionLoader.getExtensionLoader(Serialization.class).getSupportedExtensions();
    for (String name : supportedExtensions) {
        // 獲得相應(yīng)擴(kuò)展名的序列化實(shí)現(xiàn)
        Serialization serialization = ExtensionLoader.getExtensionLoader(Serialization.class).getExtension(name);
        byte idByte = serialization.getContentTypeId();
        if (ID_SERIALIZATION_MAP.containsKey(idByte)) {
            logger.error("Serialization extension " + serialization.getClass().getName()
                    + " has duplicate id to Serialization extension "
                    + ID_SERIALIZATION_MAP.get(idByte).getClass().getName()
                    + ", ignore this Serialization extension");
            continue;
        }
        // 緩存序列化實(shí)現(xiàn)
        ID_SERIALIZATION_MAP.put(idByte, serialization);
        // 緩存序列化編號(hào)和擴(kuò)展名
        ID_SERIALIZATIONNAME_MAP.put(idByte, name);
    }
}

可以看到該類(lèi)中緩存了所有的序列化對(duì)象和序列化擴(kuò)展名??梢詮闹心玫絊erialization。

(二十七)ExceedPayloadLimitException

該類(lèi)是消息長(zhǎng)度限制異常。

public class ExceedPayloadLimitException extends IOException {
    private static final long serialVersionUID = -1112322085391551410L;

    public ExceedPayloadLimitException(String message) {
        super(message);
    }
}
后記
該部分相關(guān)的源碼解析地址:https://github.com/CrazyHZM/i...

該文章講解了Transport層的相關(guān)設(shè)計(jì)和邏輯、介紹dubbo-remoting-api中的transport包內(nèi)的源碼解,其中關(guān)鍵的是整個(gè)設(shè)計(jì)都在使用裝飾模式,傳輸層中關(guān)鍵的編解碼器以及客戶(hù)端、服務(wù)的、通道的抽象,還有關(guān)鍵的就是線(xiàn)程池的調(diào)度方法,熟悉那五種調(diào)度方法,對(duì)消息的處理。整個(gè)傳輸層核心的消息,很多操作圍繞著消息展開(kāi)。下一篇我會(huì)講解交換層exchange部分。如果我在哪一部分寫(xiě)的不夠到位或者寫(xiě)錯(cuò)了,歡迎給我提意見(jiàn),我的私人微信號(hào)碼:HUA799695226。

文章版權(quán)歸作者所有,未經(jīng)允許請(qǐng)勿轉(zhuǎn)載,若此文章存在違規(guī)行為,您可以聯(lián)系管理員刪除。

轉(zhuǎn)載請(qǐng)注明本文地址:http://systransis.cn/yun/72722.html

相關(guān)文章

  • dubbo源碼解析(四十七)服務(wù)端處理請(qǐng)求過(guò)程

    摘要:而存在的意義就是保證請(qǐng)求或響應(yīng)對(duì)象可在線(xiàn)程池中被解碼,解碼完成后,就會(huì)分發(fā)到的。 2.7大揭秘——服務(wù)端處理請(qǐng)求過(guò)程 目標(biāo):從源碼的角度分析服務(wù)端接收到請(qǐng)求后的一系列操作,最終把客戶(hù)端需要的值返回。 前言 上一篇講到了消費(fèi)端發(fā)送請(qǐng)求的過(guò)程,該篇就要將服務(wù)端處理請(qǐng)求的過(guò)程。也就是當(dāng)服務(wù)端收到請(qǐng)求數(shù)據(jù)包后的一系列處理以及如何返回最終結(jié)果。我們也知道消費(fèi)端在發(fā)送請(qǐng)求的時(shí)候已經(jīng)做了編碼,所以我...

    yzzz 評(píng)論0 收藏0
  • dubbo源碼解析(四十六)消費(fèi)端發(fā)送請(qǐng)求過(guò)程

    摘要:可以參考源碼解析二十四遠(yuǎn)程調(diào)用協(xié)議的八。十六的該類(lèi)也是用了適配器模式,該類(lèi)主要的作用就是增加了心跳功能,可以參考源碼解析十遠(yuǎn)程通信層的四。二十的可以參考源碼解析十七遠(yuǎn)程通信的一。 2.7大揭秘——消費(fèi)端發(fā)送請(qǐng)求過(guò)程 目標(biāo):從源碼的角度分析一個(gè)服務(wù)方法調(diào)用經(jīng)歷怎么樣的磨難以后到達(dá)服務(wù)端。 前言 前一篇文章講到的是引用服務(wù)的過(guò)程,引用服務(wù)無(wú)非就是創(chuàng)建出一個(gè)代理。供消費(fèi)者調(diào)用服務(wù)的相關(guān)方法。...

    fish 評(píng)論0 收藏0
  • dubbo源碼解析(八)遠(yuǎn)程通信——開(kāi)篇

    摘要:而編碼器是講應(yīng)用程序的數(shù)據(jù)轉(zhuǎn)化為網(wǎng)絡(luò)格式,解碼器則是講網(wǎng)絡(luò)格式轉(zhuǎn)化為應(yīng)用程序,同時(shí)具備這兩種功能的單一組件就叫編解碼器。在中是老的編解碼器接口,而是新的編解碼器接口,并且已經(jīng)用把適配成了。 遠(yuǎn)程通訊——開(kāi)篇 目標(biāo):介紹之后解讀遠(yuǎn)程通訊模塊的內(nèi)容如何編排、介紹dubbo-remoting-api中的包結(jié)構(gòu)設(shè)計(jì)以及最外層的的源碼解析。 前言 服務(wù)治理框架中可以大致分為服務(wù)通信和服務(wù)管理兩個(gè)...

    Faremax 評(píng)論0 收藏0
  • dubbo源碼解析(十)遠(yuǎn)程通信——Exchange

    摘要:和斷開(kāi),處理措施不一樣,會(huì)分別做出重連和關(guān)閉通道的操作。取消定時(shí)器取消大量已排隊(duì)任務(wù),用于回收空間該方法是停止現(xiàn)有心跳,也就是停止定時(shí)器,釋放空間。做到異步處理返回結(jié)果時(shí)能給準(zhǔn)確的返回給對(duì)應(yīng)的請(qǐng)求。 遠(yuǎn)程通訊——Exchange層 目標(biāo):介紹Exchange層的相關(guān)設(shè)計(jì)和邏輯、介紹dubbo-remoting-api中的exchange包內(nèi)的源碼解析。 前言 上一篇文章我講的是dubb...

    cppprimer 評(píng)論0 收藏0
  • dubbo源碼解析(四十四)服務(wù)暴露過(guò)程

    摘要:服務(wù)暴露過(guò)程目標(biāo)從源碼的角度分析服務(wù)暴露過(guò)程。導(dǎo)出服務(wù),包含暴露服務(wù)到本地,和暴露服務(wù)到遠(yuǎn)程兩個(gè)過(guò)程。其中服務(wù)暴露的第八步已經(jīng)沒(méi)有了。將泛化調(diào)用版本號(hào)或者等信息加入獲得服務(wù)暴露地址和端口號(hào),利用內(nèi)數(shù)據(jù)組裝成。 dubbo服務(wù)暴露過(guò)程 目標(biāo):從源碼的角度分析服務(wù)暴露過(guò)程。 前言 本來(lái)這一篇一個(gè)寫(xiě)異步化改造的內(nèi)容,但是最近我一直在想,某一部分的優(yōu)化改造該怎么去撰寫(xiě)才能更加的讓讀者理解。我覺(jué)...

    light 評(píng)論0 收藏0

發(fā)表評(píng)論

0條評(píng)論

Magicer

|高級(jí)講師

TA的文章

閱讀更多
最新活動(dòng)
閱讀需要支付1元查看
<