成人国产在线小视频_日韩寡妇人妻调教在线播放_色成人www永久在线观看_2018国产精品久久_亚洲欧美高清在线30p_亚洲少妇综合一区_黄色在线播放国产_亚洲另类技巧小说校园_国产主播xx日韩_a级毛片在线免费

資訊專欄INFORMATION COLUMN

dubbo源碼解析(二十)遠(yuǎn)程調(diào)用——Filter

cheukyin / 1150人閱讀

摘要:源碼分析一該過濾器是對(duì)記錄日志的過濾器,它所做的工作就是把引用服務(wù)或者暴露服務(wù)的調(diào)用鏈信息寫入到文件中。然后返回,再清空,這樣是因?yàn)楹竺娴恼{(diào)用鏈中的附加值對(duì)前面的調(diào)用鏈?zhǔn)遣豢梢姷摹?/p>

遠(yuǎn)程調(diào)用——Filter
目標(biāo):介紹dubbo-rpc-api中的各種filter過濾器的實(shí)現(xiàn)邏輯。
前言

本文會(huì)介紹在dubbo中的過濾器,先來看看下面的圖:

可以看到紅色圈圈不服,在服務(wù)發(fā)現(xiàn)和服務(wù)引用中都會(huì)進(jìn)行一些過濾器過濾。具體有哪些過濾器,就看下面的介紹。

源碼分析 (一)AccessLogFilter

該過濾器是對(duì)記錄日志的過濾器,它所做的工作就是把引用服務(wù)或者暴露服務(wù)的調(diào)用鏈信息寫入到文件中。日志消息先被放入日志集合,然后加入到日志隊(duì)列,然后被放入到寫入文件到任務(wù)中,最后進(jìn)入文件。

1.屬性
private static final Logger logger = LoggerFactory.getLogger(AccessLogFilter.class);

/**
 * 日志訪問名稱,默認(rèn)的日志訪問名稱
 */
private static final String ACCESS_LOG_KEY = "dubbo.accesslog";

/**
 * 日期格式
 */
private static final String FILE_DATE_FORMAT = "yyyyMMdd";

private static final String MESSAGE_DATE_FORMAT = "yyyy-MM-dd HH:mm:ss";

/**
 * 日志隊(duì)列大小
 */
private static final int LOG_MAX_BUFFER = 5000;

/**
 * 日志輸出的頻率
 */
private static final long LOG_OUTPUT_INTERVAL = 5000;

/**
 * 日志隊(duì)列 key為訪問日志的名稱,value為該日志名稱對(duì)應(yīng)的日志集合
 */
private final ConcurrentMap> logQueue = new ConcurrentHashMap>();

/**
 * 日志線程池
 */
private final ScheduledExecutorService logScheduled = Executors.newScheduledThreadPool(2, new NamedThreadFactory("Dubbo-Access-Log", true));

/**
 * 日志記錄任務(wù)
 */
private volatile ScheduledFuture logFuture = null;

按照我上面講到日志流向,日志先進(jìn)入到是日志隊(duì)列中的日志集合,再進(jìn)入logQueue,在進(jìn)入logFuture,最后落地到文件。

2.init
private void init() {
    // synchronized是一個(gè)重操作消耗性能,所有加上判空
    if (logFuture == null) {
        synchronized (logScheduled) {
            // 為了不重復(fù)初始化
            if (logFuture == null) {
                // 創(chuàng)建日志記錄任務(wù)
                logFuture = logScheduled.scheduleWithFixedDelay(new LogTask(), LOG_OUTPUT_INTERVAL, LOG_OUTPUT_INTERVAL, TimeUnit.MILLISECONDS);
            }
        }
    }
}

該方法是初始化方法,就創(chuàng)建了日志記錄任務(wù)。

3.log
private void log(String accesslog, String logmessage) {
    init();
    Set logSet = logQueue.get(accesslog);
    if (logSet == null) {
        logQueue.putIfAbsent(accesslog, new ConcurrentHashSet());
        logSet = logQueue.get(accesslog);
    }
    if (logSet.size() < LOG_MAX_BUFFER) {
        logSet.add(logmessage);
    }
}

該方法是增加日志信息到日志集合中。

4.invoke
@Override
public Result invoke(Invoker invoker, Invocation inv) throws RpcException {
    try {
        // 獲得日志名稱
        String accesslog = invoker.getUrl().getParameter(Constants.ACCESS_LOG_KEY);
        if (ConfigUtils.isNotEmpty(accesslog)) {
            // 獲得rpc上下文
            RpcContext context = RpcContext.getContext();
            // 獲得調(diào)用的接口名稱
            String serviceName = invoker.getInterface().getName();
            // 獲得版本號(hào)
            String version = invoker.getUrl().getParameter(Constants.VERSION_KEY);
            // 獲得組,是消費(fèi)者側(cè)還是生產(chǎn)者側(cè)
            String group = invoker.getUrl().getParameter(Constants.GROUP_KEY);
            StringBuilder sn = new StringBuilder();
            sn.append("[").append(new SimpleDateFormat(MESSAGE_DATE_FORMAT).format(new Date())).append("] ").append(context.getRemoteHost()).append(":").append(context.getRemotePort())
                    .append(" -> ").append(context.getLocalHost()).append(":").append(context.getLocalPort())
                    .append(" - ");
            // 拼接組
            if (null != group && group.length() > 0) {
                sn.append(group).append("/");
            }
            // 拼接服務(wù)名稱
            sn.append(serviceName);
            // 拼接版本號(hào)
            if (null != version && version.length() > 0) {
                sn.append(":").append(version);
            }
            sn.append(" ");
            // 拼接方法名
            sn.append(inv.getMethodName());
            sn.append("(");
            // 拼接參數(shù)類型
            Class[] types = inv.getParameterTypes();
            // 拼接參數(shù)類型
            if (types != null && types.length > 0) {
                boolean first = true;
                for (Class type : types) {
                    if (first) {
                        first = false;
                    } else {
                        sn.append(",");
                    }
                    sn.append(type.getName());
                }
            }
            sn.append(") ");
            // 拼接參數(shù)
            Object[] args = inv.getArguments();
            if (args != null && args.length > 0) {
                sn.append(JSON.toJSONString(args));
            }
            String msg = sn.toString();
            // 如果用默認(rèn)的日志訪問名稱
            if (ConfigUtils.isDefault(accesslog)) {
                LoggerFactory.getLogger(ACCESS_LOG_KEY + "." + invoker.getInterface().getName()).info(msg);
            } else {
                // 把日志加入集合
                log(accesslog, msg);
            }
        }
    } catch (Throwable t) {
        logger.warn("Exception in AcessLogFilter of service(" + invoker + " -> " + inv + ")", t);
    }
    // 調(diào)用下一個(gè)調(diào)用鏈
    return invoker.invoke(inv);
}

該方法是最重要的方法,從拼接了日志信息,把日志加入到集合,并且調(diào)用下一個(gè)調(diào)用鏈。

4.LogTask
private class LogTask implements Runnable {
    @Override
    public void run() {
        try {
            if (logQueue != null && logQueue.size() > 0) {
                // 遍歷日志隊(duì)列
                for (Map.Entry> entry : logQueue.entrySet()) {
                    try {
                        // 獲得日志名稱
                        String accesslog = entry.getKey();
                        // 獲得日志集合
                        Set logSet = entry.getValue();
                        // 如果文件不存在則創(chuàng)建文件
                        File file = new File(accesslog);
                        File dir = file.getParentFile();
                        if (null != dir && !dir.exists()) {
                            dir.mkdirs();
                        }
                        if (logger.isDebugEnabled()) {
                            logger.debug("Append log to " + accesslog);
                        }
                        if (file.exists()) {
                            // 獲得現(xiàn)在的時(shí)間
                            String now = new SimpleDateFormat(FILE_DATE_FORMAT).format(new Date());
                            // 獲得文件最后一次修改的時(shí)間
                            String last = new SimpleDateFormat(FILE_DATE_FORMAT).format(new Date(file.lastModified()));
                            // 如果文件最后一次修改的時(shí)間不等于現(xiàn)在的時(shí)間
                            if (!now.equals(last)) {
                                // 獲得重新生成文件名稱
                                File archive = new File(file.getAbsolutePath() + "." + last);
                                // 因?yàn)槎际莊ile的絕對(duì)路徑,所以沒有進(jìn)行移動(dòng)文件,而是修改文件名
                                file.renameTo(archive);
                            }
                        }
                        // 把日志集合中的日志寫入到文件
                        FileWriter writer = new FileWriter(file, true);
                        try {
                            for (Iterator iterator = logSet.iterator();
                                 iterator.hasNext();
                                 iterator.remove()) {
                                writer.write(iterator.next());
                                writer.write("
");
                            }
                            writer.flush();
                        } finally {
                            writer.close();
                        }
                    } catch (Exception e) {
                        logger.error(e.getMessage(), e);
                    }
                }
            }
        } catch (Exception e) {
            logger.error(e.getMessage(), e);
        }
    }
}

該內(nèi)部類實(shí)現(xiàn)了Runnable,是把日志消息落地到文件到線程。

(二)ActiveLimitFilter

該類時(shí)對(duì)于每個(gè)服務(wù)的每個(gè)方法的最大可并行調(diào)用數(shù)量限制的過濾器,它是在服務(wù)消費(fèi)者側(cè)的過濾。

@Override
public Result invoke(Invoker invoker, Invocation invocation) throws RpcException {
    // 獲得url對(duì)象
    URL url = invoker.getUrl();
    // 獲得方法名稱
    String methodName = invocation.getMethodName();
    // 獲得并發(fā)調(diào)用數(shù)(單個(gè)服務(wù)的單個(gè)方法),默認(rèn)為0
    int max = invoker.getUrl().getMethodParameter(methodName, Constants.ACTIVES_KEY, 0);
    // 通過方法名來獲得對(duì)應(yīng)的狀態(tài)
    RpcStatus count = RpcStatus.getStatus(invoker.getUrl(), invocation.getMethodName());
    if (max > 0) {
        // 獲得該方法調(diào)用的超時(shí)次數(shù)
        long timeout = invoker.getUrl().getMethodParameter(invocation.getMethodName(), Constants.TIMEOUT_KEY, 0);
        // 獲得系統(tǒng)時(shí)間
        long start = System.currentTimeMillis();
        long remain = timeout;
        // 獲得該方法的調(diào)用數(shù)量
        int active = count.getActive();
        // 如果活躍數(shù)量大于等于最大的并發(fā)調(diào)用數(shù)量
        if (active >= max) {
            synchronized (count) {
                // 當(dāng)活躍數(shù)量大于等于最大的并發(fā)調(diào)用數(shù)量時(shí)一直循環(huán)
                while ((active = count.getActive()) >= max) {
                    try {
                        // 等待超時(shí)時(shí)間
                        count.wait(remain);
                    } catch (InterruptedException e) {
                    }
                    // 獲得累計(jì)時(shí)間
                    long elapsed = System.currentTimeMillis() - start;
                    remain = timeout - elapsed;
                    // 如果累計(jì)時(shí)間大于超時(shí)時(shí)間,則拋出異常
                    if (remain <= 0) {
                        throw new RpcException("Waiting concurrent invoke timeout in client-side for service:  "
                                + invoker.getInterface().getName() + ", method: "
                                + invocation.getMethodName() + ", elapsed: " + elapsed
                                + ", timeout: " + timeout + ". concurrent invokes: " + active
                                + ". max concurrent invoke limit: " + max);
                    }
                }
            }
        }
    }
    try {
        // 獲得系統(tǒng)時(shí)間作為開始時(shí)間
        long begin = System.currentTimeMillis();
        // 開始計(jì)數(shù)
        RpcStatus.beginCount(url, methodName);
        try {
            // 調(diào)用后面的調(diào)用鏈,如果沒有拋出異常,則算成功
            Result result = invoker.invoke(invocation);
            // 結(jié)束計(jì)數(shù),記錄時(shí)間
            RpcStatus.endCount(url, methodName, System.currentTimeMillis() - begin, true);
            return result;
        } catch (RuntimeException t) {
            RpcStatus.endCount(url, methodName, System.currentTimeMillis() - begin, false);
            throw t;
        }
    } finally {
        if (max > 0) {
            synchronized (count) {
                // 喚醒count
                count.notify();
            }
        }
    }
}

該類只有這一個(gè)方法。該過濾器是用來限制調(diào)用數(shù)量,先進(jìn)行調(diào)用數(shù)量的檢測,如果沒有到達(dá)最大的調(diào)用數(shù)量,則先調(diào)用后面的調(diào)用鏈,如果在后面的調(diào)用鏈?zhǔn)?,則記錄相關(guān)時(shí)間,如果成功也記錄相關(guān)時(shí)間和調(diào)用次數(shù)。

(三)ClassLoaderFilter

該過濾器是做類加載器切換的。

@Override
public Result invoke(Invoker invoker, Invocation invocation) throws RpcException {
    // 獲得當(dāng)前的類加載器
    ClassLoader ocl = Thread.currentThread().getContextClassLoader();
    // 設(shè)置invoker攜帶的服務(wù)的類加載器
    Thread.currentThread().setContextClassLoader(invoker.getInterface().getClassLoader());
    try {
        // 調(diào)用下面的調(diào)用鏈
        return invoker.invoke(invocation);
    } finally {
        // 最后切換回原來的類加載器
        Thread.currentThread().setContextClassLoader(ocl);
    }
}

可以看到先切換成當(dāng)前的線程鎖攜帶的類加載器,然后調(diào)用結(jié)束后,再切換回原先的類加載器。

(四)CompatibleFilter

該過濾器是做兼容性的過濾器。

@Override
public Result invoke(Invoker invoker, Invocation invocation) throws RpcException {
    // 調(diào)用下一個(gè)調(diào)用鏈
    Result result = invoker.invoke(invocation);
    // 如果方法前面沒有$或者結(jié)果沒有異常
    if (!invocation.getMethodName().startsWith("$") && !result.hasException()) {
        Object value = result.getValue();
        if (value != null) {
            try {
                // 獲得方法
                Method method = invoker.getInterface().getMethod(invocation.getMethodName(), invocation.getParameterTypes());
                // 獲得返回的數(shù)據(jù)類型
                Class type = method.getReturnType();
                Object newValue;
                // 序列化方法
                String serialization = invoker.getUrl().getParameter(Constants.SERIALIZATION_KEY);
                // 如果是json或者fastjson形式
                if ("json".equals(serialization)
                        || "fastjson".equals(serialization)) {
                    // 獲得方法的泛型返回值類型
                    Type gtype = method.getGenericReturnType();
                    // 把數(shù)據(jù)結(jié)果進(jìn)行類型轉(zhuǎn)化
                    newValue = PojoUtils.realize(value, type, gtype);
                    // 如果value不是type類型
                } else if (!type.isInstance(value)) {
                    // 如果是pojo,則,轉(zhuǎn)化為type類型,如果不是,則進(jìn)行兼容類型轉(zhuǎn)化。
                    newValue = PojoUtils.isPojo(type)
                            ? PojoUtils.realize(value, type)
                            : CompatibleTypeUtils.compatibleTypeConvert(value, type);

                } else {
                    newValue = value;
                }
                // 重新設(shè)置RpcResult的result
                if (newValue != value) {
                    result = new RpcResult(newValue);
                }
            } catch (Throwable t) {
                logger.warn(t.getMessage(), t);
            }
        }
    }
    return result;
}

可以看到對(duì)于調(diào)用鏈的返回結(jié)果,如果返回值類型和返回值不一樣的時(shí)候,就需要做兼容類型的轉(zhuǎn)化。重新把結(jié)果放入RpcResult,返回。

(五)ConsumerContextFilter

該過濾器做的是在當(dāng)前的RpcContext中記錄本地調(diào)用的一次狀態(tài)信息。

@Override
public Result invoke(Invoker invoker, Invocation invocation) throws RpcException {
    // 設(shè)置rpc上下文
    RpcContext.getContext()
            .setInvoker(invoker)
            .setInvocation(invocation)
            .setLocalAddress(NetUtils.getLocalHost(), 0)
            .setRemoteAddress(invoker.getUrl().getHost(),
                    invoker.getUrl().getPort());
    // 如果該會(huì)話域是rpc會(huì)話域
    if (invocation instanceof RpcInvocation) {
        // 設(shè)置實(shí)體域
        ((RpcInvocation) invocation).setInvoker(invoker);
    }
    try {
        // 調(diào)用下個(gè)調(diào)用鏈
        RpcResult result = (RpcResult) invoker.invoke(invocation);
        // 設(shè)置附加值
        RpcContext.getServerContext().setAttachments(result.getAttachments());
        return result;
    } finally {
        // 情況附加值
        RpcContext.getContext().clearAttachments();
    }
}

可以看到RpcContext記錄了一次調(diào)用狀態(tài)信息,然后先調(diào)用后面的調(diào)用鏈,再回來把附加值設(shè)置到RpcContext中。然后返回RpcContext,再清空,這樣是因?yàn)楹竺娴恼{(diào)用鏈中的附加值對(duì)前面的調(diào)用鏈?zhǔn)遣豢梢姷摹?/p> (六)ContextFilter

該過濾器做的是初始化rpc上下文。

    @Override
    public Result invoke(Invoker invoker, Invocation invocation) throws RpcException {
        // 獲得會(huì)話域的附加值
        Map attachments = invocation.getAttachments();
        // 刪除異步屬性以避免傳遞給以下調(diào)用鏈
        if (attachments != null) {
            attachments = new HashMap(attachments);
            attachments.remove(Constants.PATH_KEY);
            attachments.remove(Constants.GROUP_KEY);
            attachments.remove(Constants.VERSION_KEY);
            attachments.remove(Constants.DUBBO_VERSION_KEY);
            attachments.remove(Constants.TOKEN_KEY);
            attachments.remove(Constants.TIMEOUT_KEY);
            attachments.remove(Constants.ASYNC_KEY);// Remove async property to avoid being passed to the following invoke chain.
        }
        // 在rpc上下文添加上一個(gè)調(diào)用鏈的信息
        RpcContext.getContext()
                .setInvoker(invoker)
                .setInvocation(invocation)
//                .setAttachments(attachments)  // merged from dubbox
                .setLocalAddress(invoker.getUrl().getHost(),
                        invoker.getUrl().getPort());

        // mreged from dubbox
        // we may already added some attachments into RpcContext before this filter (e.g. in rest protocol)
        if (attachments != null) {
            // 把會(huì)話域中的附加值全部加入RpcContext中
            if (RpcContext.getContext().getAttachments() != null) {
                RpcContext.getContext().getAttachments().putAll(attachments);
            } else {
                RpcContext.getContext().setAttachments(attachments);
            }
        }

        // 如果會(huì)話域?qū)儆趓pc的會(huì)話域,則設(shè)置實(shí)體域
        if (invocation instanceof RpcInvocation) {
            ((RpcInvocation) invocation).setInvoker(invoker);
        }
        try {
            // 調(diào)用下一個(gè)調(diào)用鏈
            RpcResult result = (RpcResult) invoker.invoke(invocation);
            // pass attachments to result 把附加值加入到RpcResult
            result.addAttachments(RpcContext.getServerContext().getAttachments());
            return result;
        } finally {
            // 移除本地的上下文
            RpcContext.removeContext();
            // 清空附加值
            RpcContext.getServerContext().clearAttachments();
        }
    }

在《 dubbo源碼解析(十九)遠(yuǎn)程調(diào)用——開篇》中我已經(jīng)介紹了RpcContext的作用,角色。該過濾器就是做了初始化RpcContext的作用。

(七)DeprecatedFilter

該過濾器的作用是調(diào)用了廢棄的方法時(shí)打印錯(cuò)誤日志。

private static final Logger LOGGER = LoggerFactory.getLogger(DeprecatedFilter.class);

/**
 * 日志集合
 */
private static final Set logged = new ConcurrentHashSet();

@Override
public Result invoke(Invoker invoker, Invocation invocation) throws RpcException {
    // 獲得key 服務(wù)+方法
    String key = invoker.getInterface().getName() + "." + invocation.getMethodName();
    // 如果集合中沒有該key
    if (!logged.contains(key)) {
        // 則加入集合
        logged.add(key);
        // 如果該服務(wù)方法是廢棄的,則打印錯(cuò)誤日志
        if (invoker.getUrl().getMethodParameter(invocation.getMethodName(), Constants.DEPRECATED_KEY, false)) {
            LOGGER.error("The service method " + invoker.getInterface().getName() + "." + getMethodSignature(invocation) + " is DEPRECATED! Declare from " + invoker.getUrl());
        }
    }
    // 調(diào)用下一個(gè)調(diào)用鏈
    return invoker.invoke(invocation);
}

/**
 * 獲得方法定義
 * @param invocation
 * @return
 */
private String getMethodSignature(Invocation invocation) {
    // 方法名
    StringBuilder buf = new StringBuilder(invocation.getMethodName());
    buf.append("(");
    // 參數(shù)類型
    Class[] types = invocation.getParameterTypes();
    // 拼接參數(shù)
    if (types != null && types.length > 0) {
        boolean first = true;
        for (Class type : types) {
            if (first) {
                first = false;
            } else {
                buf.append(", ");
            }
            buf.append(type.getSimpleName());
        }
    }
    buf.append(")");
    return buf.toString();
}

該過濾器比較簡單。

(八)EchoFilter

該過濾器是處理回聲測試的方法。

@Override
public Result invoke(Invoker invoker, Invocation inv) throws RpcException {
    // 如果調(diào)用的方法是回聲測試的方法 則直接返回結(jié)果,否則 調(diào)用下一個(gè)調(diào)用鏈
    if (inv.getMethodName().equals(Constants.$ECHO) && inv.getArguments() != null && inv.getArguments().length == 1)
        return new RpcResult(inv.getArguments()[0]);
    return invoker.invoke(inv);
}

如果調(diào)用的方法是回聲測試的方法 則直接返回結(jié)果,否則 調(diào)用下一個(gè)調(diào)用鏈。

(九)ExceptionFilter

該過濾器是作用是對(duì)異常的處理。

private final Logger logger;

public ExceptionFilter() {
    this(LoggerFactory.getLogger(ExceptionFilter.class));
}

public ExceptionFilter(Logger logger) {
    this.logger = logger;
}

@Override
public Result invoke(Invoker invoker, Invocation invocation) throws RpcException {
    try {
        // 調(diào)用下一個(gè)調(diào)用鏈,返回結(jié)果
        Result result = invoker.invoke(invocation);
        // 如果結(jié)果有異常,并且該服務(wù)不是一個(gè)泛化調(diào)用
        if (result.hasException() && GenericService.class != invoker.getInterface()) {
            try {
                // 獲得異常
                Throwable exception = result.getException();

                // directly throw if it"s checked exception
                // 如果這是一個(gè)checked的異常,則直接返回異常,也就是接口上聲明的Unchecked的異常
                if (!(exception instanceof RuntimeException) && (exception instanceof Exception)) {
                    return result;
                }
                // directly throw if the exception appears in the signature
                // 如果已經(jīng)在接口方法上聲明了該異常,則直接返回
                try {
                    // 獲得方法
                    Method method = invoker.getInterface().getMethod(invocation.getMethodName(), invocation.getParameterTypes());
                    // 獲得異常類型
                    Class[] exceptionClassses = method.getExceptionTypes();
                    for (Class exceptionClass : exceptionClassses) {
                        if (exception.getClass().equals(exceptionClass)) {
                            return result;
                        }
                    }
                } catch (NoSuchMethodException e) {
                    return result;
                }

                // for the exception not found in method"s signature, print ERROR message in server"s log.
                // 打印錯(cuò)誤 該異常沒有在方法上申明
                logger.error("Got unchecked and undeclared exception which called by " + RpcContext.getContext().getRemoteHost()
                        + ". service: " + invoker.getInterface().getName() + ", method: " + invocation.getMethodName()
                        + ", exception: " + exception.getClass().getName() + ": " + exception.getMessage(), exception);

                // directly throw if exception class and interface class are in the same jar file.
                // 如果異常類和接口類在同一個(gè)jar包里面,則拋出異常
                String serviceFile = ReflectUtils.getCodeBase(invoker.getInterface());
                String exceptionFile = ReflectUtils.getCodeBase(exception.getClass());
                if (serviceFile == null || exceptionFile == null || serviceFile.equals(exceptionFile)) {
                    return result;
                }
                // directly throw if it"s JDK exception
                // 如果是jdk中定義的異常,則直接拋出
                String className = exception.getClass().getName();
                if (className.startsWith("java.") || className.startsWith("javax.")) {
                    return result;
                }
                // directly throw if it"s dubbo exception
                // 如果 是dubbo的異常,則直接拋出
                if (exception instanceof RpcException) {
                    return result;
                }

                // otherwise, wrap with RuntimeException and throw back to the client
                // 如果不是以上的異常,則包裝成為RuntimeException并且拋出
                return new RpcResult(new RuntimeException(StringUtils.toString(exception)));
            } catch (Throwable e) {
                logger.warn("Fail to ExceptionFilter when called by " + RpcContext.getContext().getRemoteHost()
                        + ". service: " + invoker.getInterface().getName() + ", method: " + invocation.getMethodName()
                        + ", exception: " + e.getClass().getName() + ": " + e.getMessage(), e);
                return result;
            }
        }
        return result;
    } catch (RuntimeException e) {
        logger.error("Got unchecked and undeclared exception which called by " + RpcContext.getContext().getRemoteHost()
                + ". service: " + invoker.getInterface().getName() + ", method: " + invocation.getMethodName()
                + ", exception: " + e.getClass().getName() + ": " + e.getMessage(), e);
        throw e;
    }
}

可以看到除了接口上聲明的Unchecked的異常和有定義的異常外,都會(huì)包裝成RuntimeException來返回,為了防止客戶端反序列化失敗。

(十)ExecuteLimitFilter

該過濾器是限制最大可并行執(zhí)行請(qǐng)求數(shù),該過濾器是服務(wù)提供者側(cè),而上述講到的ActiveLimitFilter是在消費(fèi)者側(cè)的限制。

    @Override
    public Result invoke(Invoker invoker, Invocation invocation) throws RpcException {
        // 獲得url對(duì)象
        URL url = invoker.getUrl();
        // 方法名稱
        String methodName = invocation.getMethodName();
        Semaphore executesLimit = null;
        boolean acquireResult = false;
        int max = url.getMethodParameter(methodName, Constants.EXECUTES_KEY, 0);
        // 如果該方法設(shè)置了executes并且值大于0
        if (max > 0) {
            // 獲得該方法對(duì)應(yīng)的RpcStatus
            RpcStatus count = RpcStatus.getStatus(url, invocation.getMethodName());
//            if (count.getActive() >= max) {
            /**
             * http://manzhizhen.iteye.com/blog/2386408
             * use semaphore for concurrency control (to limit thread number)
             */
            // 獲得信號(hào)量
            executesLimit = count.getSemaphore(max);
            // 如果不能獲得許可,則拋出異常
            if(executesLimit != null && !(acquireResult = executesLimit.tryAcquire())) {
                throw new RpcException("Failed to invoke method " + invocation.getMethodName() + " in provider " + url + ", cause: The service using threads greater than  limited.");
            }
        }
        long begin = System.currentTimeMillis();
        boolean isSuccess = true;
        // 計(jì)數(shù)加1
        RpcStatus.beginCount(url, methodName);
        try {
            // 調(diào)用下一個(gè)調(diào)用鏈
            Result result = invoker.invoke(invocation);
            return result;
        } catch (Throwable t) {
            isSuccess = false;
            if (t instanceof RuntimeException) {
                throw (RuntimeException) t;
            } else {
                throw new RpcException("unexpected exception when ExecuteLimitFilter", t);
            }
        } finally {
            // 計(jì)數(shù)減1
            RpcStatus.endCount(url, methodName, System.currentTimeMillis() - begin, isSuccess);
            if(acquireResult) {
                executesLimit.release();
            }
        }
    }

為什么這里需要用到信號(hào)量來控制,可以看一下以下鏈接的介紹:http://manzhizhen.iteye.com/b...

(十一)GenericFilter

該過濾器就是對(duì)于泛化調(diào)用的請(qǐng)求和結(jié)果進(jìn)行反序列化和序列化的操作,它是服務(wù)提供者側(cè)的。

@Override
public Result invoke(Invoker invoker, Invocation inv) throws RpcException {
    // 如果是泛化調(diào)用
    if (inv.getMethodName().equals(Constants.$INVOKE)
            && inv.getArguments() != null
            && inv.getArguments().length == 3
            && !invoker.getInterface().equals(GenericService.class)) {
        // 獲得請(qǐng)求名字
        String name = ((String) inv.getArguments()[0]).trim();
        // 獲得請(qǐng)求參數(shù)類型
        String[] types = (String[]) inv.getArguments()[1];
        // 獲得請(qǐng)求參數(shù)
        Object[] args = (Object[]) inv.getArguments()[2];
        try {
            // 獲得方法
            Method method = ReflectUtils.findMethodByMethodSignature(invoker.getInterface(), name, types);
            // 獲得該方法的參數(shù)類型
            Class[] params = method.getParameterTypes();
            if (args == null) {
                args = new Object[params.length];
            }
            // 獲得附加值
            String generic = inv.getAttachment(Constants.GENERIC_KEY);

            // 如果附加值為空,在用上下文攜帶的附加值
            if (StringUtils.isBlank(generic)) {
                generic = RpcContext.getContext().getAttachment(Constants.GENERIC_KEY);
            }

            // 如果附加值還是為空或者是默認(rèn)的泛化序列化類型
            if (StringUtils.isEmpty(generic)
                    || ProtocolUtils.isDefaultGenericSerialization(generic)) {
                // 直接進(jìn)行類型轉(zhuǎn)化
                args = PojoUtils.realize(args, params, method.getGenericParameterTypes());
            } else if (ProtocolUtils.isJavaGenericSerialization(generic)) {
                for (int i = 0; i < args.length; i++) {
                    if (byte[].class == args[i].getClass()) {
                        try {
                            UnsafeByteArrayInputStream is = new UnsafeByteArrayInputStream((byte[]) args[i]);
                            // 使用nativejava方式反序列化
                            args[i] = ExtensionLoader.getExtensionLoader(Serialization.class)
                                    .getExtension(Constants.GENERIC_SERIALIZATION_NATIVE_JAVA)
                                    .deserialize(null, is).readObject();
                        } catch (Exception e) {
                            throw new RpcException("Deserialize argument [" + (i + 1) + "] failed.", e);
                        }
                    } else {
                        throw new RpcException(
                                "Generic serialization [" +
                                        Constants.GENERIC_SERIALIZATION_NATIVE_JAVA +
                                        "] only support message type " +
                                        byte[].class +
                                        " and your message type is " +
                                        args[i].getClass());
                    }
                }
            } else if (ProtocolUtils.isBeanGenericSerialization(generic)) {
                for (int i = 0; i < args.length; i++) {
                    if (args[i] instanceof JavaBeanDescriptor) {
                        // 用JavaBean方式反序列化
                        args[i] = JavaBeanSerializeUtil.deserialize((JavaBeanDescriptor) args[i]);
                    } else {
                        throw new RpcException(
                                "Generic serialization [" +
                                        Constants.GENERIC_SERIALIZATION_BEAN +
                                        "] only support message type " +
                                        JavaBeanDescriptor.class.getName() +
                                        " and your message type is " +
                                        args[i].getClass().getName());
                    }
                }
            }
            // 調(diào)用下一個(gè)調(diào)用鏈
            Result result = invoker.invoke(new RpcInvocation(method, args, inv.getAttachments()));
            if (result.hasException()
                    && !(result.getException() instanceof GenericException)) {
                return new RpcResult(new GenericException(result.getException()));
            }
            if (ProtocolUtils.isJavaGenericSerialization(generic)) {
                try {
                    UnsafeByteArrayOutputStream os = new UnsafeByteArrayOutputStream(512);
                    // 用nativejava方式序列化
                    ExtensionLoader.getExtensionLoader(Serialization.class)
                            .getExtension(Constants.GENERIC_SERIALIZATION_NATIVE_JAVA)
                            .serialize(null, os).writeObject(result.getValue());
                    return new RpcResult(os.toByteArray());
                } catch (IOException e) {
                    throw new RpcException("Serialize result failed.", e);
                }
            } else if (ProtocolUtils.isBeanGenericSerialization(generic)) {
                // 使用JavaBean方式序列化返回結(jié)果
                return new RpcResult(JavaBeanSerializeUtil.serialize(result.getValue(), JavaBeanAccessor.METHOD));
            } else {
                // 直接轉(zhuǎn)化為pojo類型然后返回
                return new RpcResult(PojoUtils.generalize(result.getValue()));
            }
        } catch (NoSuchMethodException e) {
            throw new RpcException(e.getMessage(), e);
        } catch (ClassNotFoundException e) {
            throw new RpcException(e.getMessage(), e);
        }
    }
    // 調(diào)用下一個(gè)調(diào)用鏈
    return invoker.invoke(inv);
}
(十二)GenericImplFilter

該過濾器也是對(duì)于泛化調(diào)用的序列化檢查和處理,它是消費(fèi)者側(cè)的過濾器。

private static final Logger logger = LoggerFactory.getLogger(GenericImplFilter.class);

/**
 * 參數(shù)集合
 */
private static final Class[] GENERIC_PARAMETER_TYPES = new Class[]{String.class, String[].class, Object[].class};

@Override
public Result invoke(Invoker invoker, Invocation invocation) throws RpcException {
    // 獲得泛化的值
    String generic = invoker.getUrl().getParameter(Constants.GENERIC_KEY);
    // 如果該值是nativejava或者bean或者true,并且不是一個(gè)返回調(diào)用
    if (ProtocolUtils.isGeneric(generic)
            && !Constants.$INVOKE.equals(invocation.getMethodName())
            && invocation instanceof RpcInvocation) {
        RpcInvocation invocation2 = (RpcInvocation) invocation;
        // 獲得方法名稱
        String methodName = invocation2.getMethodName();
        // 獲得參數(shù)類型集合
        Class[] parameterTypes = invocation2.getParameterTypes();
        // 獲得參數(shù)集合
        Object[] arguments = invocation2.getArguments();

        // 把參數(shù)類型的名稱放入集合
        String[] types = new String[parameterTypes.length];
        for (int i = 0; i < parameterTypes.length; i++) {
            types[i] = ReflectUtils.getName(parameterTypes[i]);
        }

        Object[] args;
        // 對(duì)參數(shù)集合進(jìn)行序列化
        if (ProtocolUtils.isBeanGenericSerialization(generic)) {
            args = new Object[arguments.length];
            for (int i = 0; i < arguments.length; i++) {
                args[i] = JavaBeanSerializeUtil.serialize(arguments[i], JavaBeanAccessor.METHOD);
            }
        } else {
            args = PojoUtils.generalize(arguments);
        }

        // 重新把序列化的參數(shù)放入
        invocation2.setMethodName(Constants.$INVOKE);
        invocation2.setParameterTypes(GENERIC_PARAMETER_TYPES);
        invocation2.setArguments(new Object[]{methodName, types, args});
        // 調(diào)用下一個(gè)調(diào)用鏈
        Result result = invoker.invoke(invocation2);

        if (!result.hasException()) {
            Object value = result.getValue();
            try {
                Method method = invoker.getInterface().getMethod(methodName, parameterTypes);
                if (ProtocolUtils.isBeanGenericSerialization(generic)) {
                    if (value == null) {
                        return new RpcResult(value);
                    } else if (value instanceof JavaBeanDescriptor) {
                        // 用javabean方式反序列化
                        return new RpcResult(JavaBeanSerializeUtil.deserialize((JavaBeanDescriptor) value));
                    } else {
                        throw new RpcException(
                                "The type of result value is " +
                                        value.getClass().getName() +
                                        " other than " +
                                        JavaBeanDescriptor.class.getName() +
                                        ", and the result is " +
                                        value);
                    }
                } else {
                    // 直接轉(zhuǎn)化為pojo類型
                    return new RpcResult(PojoUtils.realize(value, method.getReturnType(), method.getGenericReturnType()));
                }
            } catch (NoSuchMethodException e) {
                throw new RpcException(e.getMessage(), e);
            }
            // 如果調(diào)用鏈中有異常拋出,并且是GenericException類型的異常
        } else if (result.getException() instanceof GenericException) {
            GenericException exception = (GenericException) result.getException();
            try {
                // 獲得異常類名
                String className = exception.getExceptionClass();
                Class clazz = ReflectUtils.forName(className);
                Throwable targetException = null;
                Throwable lastException = null;
                try {
                    targetException = (Throwable) clazz.newInstance();
                } catch (Throwable e) {
                    lastException = e;
                    for (Constructor constructor : clazz.getConstructors()) {
                        try {
                            targetException = (Throwable) constructor.newInstance(new Object[constructor.getParameterTypes().length]);
                            break;
                        } catch (Throwable e1) {
                            lastException = e1;
                        }
                    }
                }
                if (targetException != null) {
                    try {
                        Field field = Throwable.class.getDeclaredField("detailMessage");
                        if (!field.isAccessible()) {
                            field.setAccessible(true);
                        }
                        field.set(targetException, exception.getExceptionMessage());
                    } catch (Throwable e) {
                        logger.warn(e.getMessage(), e);
                    }
                    result = new RpcResult(targetException);
                } else if (lastException != null) {
                    throw lastException;
                }
            } catch (Throwable e) {
                throw new RpcException("Can not deserialize exception " + exception.getExceptionClass() + ", message: " + exception.getExceptionMessage(), e);
            }
        }
        return result;
    }

    // 如果是泛化調(diào)用
    if (invocation.getMethodName().equals(Constants.$INVOKE)
            && invocation.getArguments() != null
            && invocation.getArguments().length == 3
            && ProtocolUtils.isGeneric(generic)) {

        Object[] args = (Object[]) invocation.getArguments()[2];
        if (ProtocolUtils.isJavaGenericSerialization(generic)) {

            for (Object arg : args) {
                // 如果調(diào)用消息不是字節(jié)數(shù)組類型,則拋出異常
                if (!(byte[].class == arg.getClass())) {
                    error(generic, byte[].class.getName(), arg.getClass().getName());
                }
            }
        } else if (ProtocolUtils.isBeanGenericSerialization(generic)) {
            for (Object arg : args) {
                if (!(arg instanceof JavaBeanDescriptor)) {
                    error(generic, JavaBeanDescriptor.class.getName(), arg.getClass().getName());
                }
            }
        }

        // 設(shè)置附加值
        ((RpcInvocation) invocation).setAttachment(
                Constants.GENERIC_KEY, invoker.getUrl().getParameter(Constants.GENERIC_KEY));
    }
    return invoker.invoke(invocation);
}

/**
 * 拋出錯(cuò)誤異常
 * @param generic
 * @param expected
 * @param actual
 * @throws RpcException
 */
private void error(String generic, String expected, String actual) throws RpcException {
    throw new RpcException(
            "Generic serialization [" +
                    generic +
                    "] only support message type " +
                    expected +
                    " and your message type is " +
                    actual);
}
(十三)TimeoutFilter

該過濾器是當(dāng)服務(wù)調(diào)用超時(shí)的時(shí)候,記錄告警日志。

@Override
public Result invoke(Invoker invoker, Invocation invocation) throws RpcException {
    // 獲得開始時(shí)間
    long start = System.currentTimeMillis();
    // 調(diào)用下一個(gè)調(diào)用鏈
    Result result = invoker.invoke(invocation);
    // 獲得調(diào)用使用的時(shí)間
    long elapsed = System.currentTimeMillis() - start;
    // 如果服務(wù)調(diào)用超時(shí),則打印告警日志
    if (invoker.getUrl() != null
            && elapsed > invoker.getUrl().getMethodParameter(invocation.getMethodName(),
            "timeout", Integer.MAX_VALUE)) {
        if (logger.isWarnEnabled()) {
            logger.warn("invoke time out. method: " + invocation.getMethodName()
                    + " arguments: " + Arrays.toString(invocation.getArguments()) + " , url is "
                    + invoker.getUrl() + ", invoke elapsed " + elapsed + " ms.");
        }
    }
    return result;
}
(十四)TokenFilter

該過濾器提供了token的驗(yàn)證功能,關(guān)于token的介紹可以查看官方文檔。

@Override
public Result invoke(Invoker invoker, Invocation inv)
        throws RpcException {
    // 獲得token值
    String token = invoker.getUrl().getParameter(Constants.TOKEN_KEY);
    if (ConfigUtils.isNotEmpty(token)) {
        // 獲得服務(wù)類型
        Class serviceType = invoker.getInterface();
        // 獲得附加值
        Map attachments = inv.getAttachments();
        String remoteToken = attachments == null ? null : attachments.get(Constants.TOKEN_KEY);
        // 如果令牌不一樣,則拋出異常
        if (!token.equals(remoteToken)) {
            throw new RpcException("Invalid token! Forbid invoke remote service " + serviceType + " method " + inv.getMethodName() + "() from consumer " + RpcContext.getContext().getRemoteHost() + " to provider " + RpcContext.getContext().getLocalHost());
        }
    }
    // 調(diào)用下一個(gè)調(diào)用鏈
    return invoker.invoke(inv);
}
(十五)TpsLimitFilter

該過濾器的作用是對(duì)TPS限流。

/**
 * TPS 限制器對(duì)象
 */
private final TPSLimiter tpsLimiter = new DefaultTPSLimiter();

@Override
public Result invoke(Invoker invoker, Invocation invocation) throws RpcException {

    // 如果限流器不允許,則拋出異常
    if (!tpsLimiter.isAllowable(invoker.getUrl(), invocation)) {
        throw new RpcException(
                "Failed to invoke service " +
                        invoker.getInterface().getName() +
                        "." +
                        invocation.getMethodName() +
                        " because exceed max service tps.");
    }

    // 調(diào)用下一個(gè)調(diào)用鏈
    return invoker.invoke(invocation);
}

其中關(guān)鍵是TPS 限制器對(duì)象,請(qǐng)看下面的分析。

(十六)TPSLimiter
public interface TPSLimiter {

    /**
     * judge if the current invocation is allowed by TPS rule
     * 是否允許通過
     * @param url        url
     * @param invocation invocation
     * @return true allow the current invocation, otherwise, return false
     */
    boolean isAllowable(URL url, Invocation invocation);

}

該接口是tps限流器的接口,只定義了一個(gè)是否允許通過的方法。

(十七)StatItem

該類是統(tǒng)計(jì)的數(shù)據(jù)結(jié)構(gòu)。

class StatItem {

    /**
     * 服務(wù)名
     */
    private String name;

    /**
     * 最后一次重置的時(shí)間
     */
    private long lastResetTime;

    /**
     * 周期
     */
    private long interval;

    /**
     * 剩余多少流量
     */
    private AtomicInteger token;

    /**
     * 限制大小
     */
    private int rate;

    StatItem(String name, int rate, long interval) {
        this.name = name;
        this.rate = rate;
        this.interval = interval;
        this.lastResetTime = System.currentTimeMillis();
        this.token = new AtomicInteger(rate);
    }

    public boolean isAllowable() {
        long now = System.currentTimeMillis();
        // 如果限制的時(shí)間大于最后一次時(shí)間加上周期,則重置
        if (now > lastResetTime + interval) {
            token.set(rate);
            lastResetTime = now;
        }

        int value = token.get();
        boolean flag = false;
        // 直到有流量
        while (value > 0 && !flag) {
            flag = token.compareAndSet(value, value - 1);
            value = token.get();
        }

        // 返回flag
        return flag;
    }

    long getLastResetTime() {
        return lastResetTime;
    }

    int getToken() {
        return token.get();
    }

    @Override
    public String toString() {
        return new StringBuilder(32).append("StatItem ")
                .append("[name=").append(name).append(", ")
                .append("rate = ").append(rate).append(", ")
                .append("interval = ").append(interval).append("]")
                .toString();
    }

}

可以看到該類中記錄了一些訪問的流量,并且設(shè)置了周期重置機(jī)制。

(十八)DefaultTPSLimiter

該類實(shí)現(xiàn)了TPSLimiter,是默認(rèn)的tps限流器實(shí)現(xiàn)。

public class DefaultTPSLimiter implements TPSLimiter {

    /**
     * 統(tǒng)計(jì)項(xiàng)集合
     */
    private final ConcurrentMap stats
            = new ConcurrentHashMap();

    @Override
    public boolean isAllowable(URL url, Invocation invocation) {
        // 獲得tps限制大小,默認(rèn)-1,不限制
        int rate = url.getParameter(Constants.TPS_LIMIT_RATE_KEY, -1);
        // 獲得限流周期
        long interval = url.getParameter(Constants.TPS_LIMIT_INTERVAL_KEY,
                Constants.DEFAULT_TPS_LIMIT_INTERVAL);
        String serviceKey = url.getServiceKey();
        // 如果限制
        if (rate > 0) {
            // 從集合中獲得統(tǒng)計(jì)項(xiàng)
            StatItem statItem = stats.get(serviceKey);
            // 如果為空,則新建
            if (statItem == null) {
                stats.putIfAbsent(serviceKey,
                        new StatItem(serviceKey, rate, interval));
                statItem = stats.get(serviceKey);
            }
            // 返回是否允許
            return statItem.isAllowable();
        } else {
            StatItem statItem = stats.get(serviceKey);
            if (statItem != null) {
                // 移除該服務(wù)的統(tǒng)計(jì)項(xiàng)
                stats.remove(serviceKey);
            }
        }

        return true;
    }

}

是否允許的邏輯還是調(diào)用了統(tǒng)計(jì)項(xiàng)中的isAllowable方法。

本文介紹了很多的過濾器,哪些過濾器是在服務(wù)引用的,哪些服務(wù)器是服務(wù)暴露的,可以查看相應(yīng)源碼過濾器的實(shí)現(xiàn)上的注解,

例如ActiveLimitFilter上:

@Activate(group = Constants.CONSUMER, value = Constants.ACTIVES_KEY)

可以看到group為consumer組的,也就是服務(wù)消費(fèi)者側(cè)的,則是服務(wù)引用過程中的的過濾器。

例如ExecuteLimitFilter上:

@Activate(group = Constants.PROVIDER, value = Constants.EXECUTES_KEY)

可以看到group為provider組的,也就是服務(wù)消費(fèi)者側(cè)的,則是服務(wù)暴露過程中的的過濾器。

后記
該部分相關(guān)的源碼解析地址:https://github.com/CrazyHZM/i...

該文章講解了在服務(wù)引用和服務(wù)暴露中的各種filter過濾器。接下來我將開始對(duì)rpc模塊的監(jiān)聽器進(jìn)行講解。

文章版權(quán)歸作者所有,未經(jīng)允許請(qǐng)勿轉(zhuǎn)載,若此文章存在違規(guī)行為,您可以聯(lián)系管理員刪除。

轉(zhuǎn)載請(qǐng)注明本文地址:http://systransis.cn/yun/72871.html

相關(guān)文章

  • dubbo源碼解析(四十八)異步化改造

    摘要:大揭秘異步化改造目標(biāo)從源碼的角度分析的新特性中對(duì)于異步化的改造原理??丛创a解析四十六消費(fèi)端發(fā)送請(qǐng)求過程講到的十四的,在以前的邏輯會(huì)直接在方法中根據(jù)配置區(qū)分同步異步單向調(diào)用。改為關(guān)于可以參考源碼解析十遠(yuǎn)程通信層的六。 2.7大揭秘——異步化改造 目標(biāo):從源碼的角度分析2.7的新特性中對(duì)于異步化的改造原理。 前言 dubbo中提供了很多類型的協(xié)議,關(guān)于協(xié)議的系列可以查看下面的文章: du...

    lijinke666 評(píng)論0 收藏0
  • dubbo源碼解析(四十六)消費(fèi)端發(fā)送請(qǐng)求過程

    摘要:可以參考源碼解析二十四遠(yuǎn)程調(diào)用協(xié)議的八。十六的該類也是用了適配器模式,該類主要的作用就是增加了心跳功能,可以參考源碼解析十遠(yuǎn)程通信層的四。二十的可以參考源碼解析十七遠(yuǎn)程通信的一。 2.7大揭秘——消費(fèi)端發(fā)送請(qǐng)求過程 目標(biāo):從源碼的角度分析一個(gè)服務(wù)方法調(diào)用經(jīng)歷怎么樣的磨難以后到達(dá)服務(wù)端。 前言 前一篇文章講到的是引用服務(wù)的過程,引用服務(wù)無非就是創(chuàng)建出一個(gè)代理。供消費(fèi)者調(diào)用服務(wù)的相關(guān)方法。...

    fish 評(píng)論0 收藏0
  • dubbo源碼解析(四十七)服務(wù)端處理請(qǐng)求過程

    摘要:而存在的意義就是保證請(qǐng)求或響應(yīng)對(duì)象可在線程池中被解碼,解碼完成后,就會(huì)分發(fā)到的。 2.7大揭秘——服務(wù)端處理請(qǐng)求過程 目標(biāo):從源碼的角度分析服務(wù)端接收到請(qǐng)求后的一系列操作,最終把客戶端需要的值返回。 前言 上一篇講到了消費(fèi)端發(fā)送請(qǐng)求的過程,該篇就要將服務(wù)端處理請(qǐng)求的過程。也就是當(dāng)服務(wù)端收到請(qǐng)求數(shù)據(jù)包后的一系列處理以及如何返回最終結(jié)果。我們也知道消費(fèi)端在發(fā)送請(qǐng)求的時(shí)候已經(jīng)做了編碼,所以我...

    yzzz 評(píng)論0 收藏0
  • dubbo源碼解析二十四)遠(yuǎn)程調(diào)用——dubbo協(xié)議

    摘要:遠(yuǎn)程調(diào)用協(xié)議目標(biāo)介紹遠(yuǎn)程調(diào)用中跟協(xié)議相關(guān)的設(shè)計(jì)和實(shí)現(xiàn),介紹的源碼。二該類繼承了,是協(xié)議中獨(dú)有的服務(wù)暴露者。八該類也是對(duì)的裝飾,其中增強(qiáng)了調(diào)用次數(shù)多功能。 遠(yuǎn)程調(diào)用——dubbo協(xié)議 目標(biāo):介紹遠(yuǎn)程調(diào)用中跟dubbo協(xié)議相關(guān)的設(shè)計(jì)和實(shí)現(xiàn),介紹dubbo-rpc-dubbo的源碼。 前言 Dubbo 缺省協(xié)議采用單一長連接和 NIO 異步通訊,適合于小數(shù)據(jù)量大并發(fā)的服務(wù)調(diào)用,以及服務(wù)消費(fèi)者...

    rickchen 評(píng)論0 收藏0
  • dubbo源碼解析二十二)遠(yuǎn)程調(diào)用——Protocol

    摘要:七該類也實(shí)現(xiàn)了,也是裝飾了接口,但是它是在服務(wù)引用和暴露過程中加上了監(jiān)聽器的功能。如果是注冊中心,則暴露該創(chuàng)建一個(gè)暴露者監(jiān)聽器包裝類對(duì)象該方法是在服務(wù)暴露上做了監(jiān)聽器功能的增強(qiáng),也就是加上了監(jiān)聽器。 遠(yuǎn)程調(diào)用——Protocol 目標(biāo):介紹遠(yuǎn)程調(diào)用中協(xié)議的設(shè)計(jì)和實(shí)現(xiàn),介紹dubbo-rpc-api中的各種protocol包的源碼,是重點(diǎn)內(nèi)容。 前言 在遠(yuǎn)程調(diào)用中協(xié)議是非常重要的一層,看...

    孫淑建 評(píng)論0 收藏0
  • dubbo源碼解析二十七)遠(yuǎn)程調(diào)用——injvm本地調(diào)用

    摘要:遠(yuǎn)程調(diào)用本地調(diào)用目標(biāo)介紹本地調(diào)用的設(shè)計(jì)和實(shí)現(xiàn),介紹的源碼。前言是一個(gè)遠(yuǎn)程調(diào)用的框架,但是它沒有理由不支持本地調(diào)用,本文就要講解關(guān)于本地調(diào)用的實(shí)現(xiàn)。服務(wù)暴露者集合取消暴露調(diào)用父類的取消暴露方法從集合中移除二該類繼承了類,是本地調(diào)用的實(shí)現(xiàn)。 遠(yuǎn)程調(diào)用——injvm本地調(diào)用 目標(biāo):介紹injvm本地調(diào)用的設(shè)計(jì)和實(shí)現(xiàn),介紹dubbo-rpc-injvm的源碼。 前言 dubbo是一個(gè)遠(yuǎn)程調(diào)用的...

    sean 評(píng)論0 收藏0

發(fā)表評(píng)論

0條評(píng)論

最新活動(dòng)
閱讀需要支付1元查看
<