public class MergeableCluster implements Cluster {
public static final String NAME = "mergeable";
@Override
public Invoker join(Directory directory) throws RpcException {
// 創(chuàng)建MergeableClusterInvoker
return new MergeableClusterInvoker(directory);
}
}
該類實現(xiàn)了Cluster接口,是分組集合的集群實現(xiàn)。
(二)MergeableClusterInvoker
該類是分組聚合的實現(xiàn)類,其中最關機的就是invoke方法。
@Override
@SuppressWarnings("rawtypes")
public Result invoke(final Invocation invocation) throws RpcException {
// 獲得invoker集合
List> invokers = directory.list(invocation);
/**
* 獲得是否merger
*/
String merger = getUrl().getMethodParameter(invocation.getMethodName(), Constants.MERGER_KEY);
// 如果沒有設置需要聚合,則只調用一個invoker的
if (ConfigUtils.isEmpty(merger)) { // If a method doesn"t have a merger, only invoke one Group
// 只要有一個可用就返回
for (final Invoker invoker : invokers) {
if (invoker.isAvailable()) {
return invoker.invoke(invocation);
}
}
return invokers.iterator().next().invoke(invocation);
}
// 返回類型
Class> returnType;
try {
// 獲得返回類型
returnType = getInterface().getMethod(
invocation.getMethodName(), invocation.getParameterTypes()).getReturnType();
} catch (NoSuchMethodException e) {
returnType = null;
}
// 結果集合
Map> results = new HashMap>();
// 循環(huán)invokers
for (final Invoker invoker : invokers) {
// 獲得每次調用的future
Future future = executor.submit(new Callable() {
@Override
public Result call() throws Exception {
// 回調,把返回結果放入future
return invoker.invoke(new RpcInvocation(invocation, invoker));
}
});
// 加入集合
results.put(invoker.getUrl().getServiceKey(), future);
}
Object result = null;
List resultList = new ArrayList(results.size());
// 獲得超時時間
int timeout = getUrl().getMethodParameter(invocation.getMethodName(), Constants.TIMEOUT_KEY, Constants.DEFAULT_TIMEOUT);
// 遍歷每一個結果
for (Map.Entry> entry : results.entrySet()) {
Future future = entry.getValue();
try {
// 獲得調用返回的結果
Result r = future.get(timeout, TimeUnit.MILLISECONDS);
if (r.hasException()) {
log.error("Invoke " + getGroupDescFromServiceKey(entry.getKey()) +
" failed: " + r.getException().getMessage(),
r.getException());
} else {
// 加入集合
resultList.add(r);
}
} catch (Exception e) {
throw new RpcException("Failed to invoke service " + entry.getKey() + ": " + e.getMessage(), e);
}
}
// 如果為空,則返回空的結果
if (resultList.isEmpty()) {
return new RpcResult((Object) null);
} else if (resultList.size() == 1) {
// 如果只有一個結果,則返回該結果
return resultList.iterator().next();
}
// 如果返回類型是void,也就是沒有返回值,那么返回空結果
if (returnType == void.class) {
return new RpcResult((Object) null);
}
// 根據(jù)方法來合并,將調用返回結果的指定方法進行合并
if (merger.startsWith(".")) {
merger = merger.substring(1);
Method method;
try {
// 獲得方法
method = returnType.getMethod(merger, returnType);
} catch (NoSuchMethodException e) {
throw new RpcException("Can not merge result because missing method [ " + merger + " ] in class [ " +
returnType.getClass().getName() + " ]");
}
// 有 Method ,進行合并
if (!Modifier.isPublic(method.getModifiers())) {
method.setAccessible(true);
}
// 從集合中移除
result = resultList.remove(0).getValue();
try {
// 方法返回類型匹配,合并時,修改 result
if (method.getReturnType() != void.class
&& method.getReturnType().isAssignableFrom(result.getClass())) {
for (Result r : resultList) {
result = method.invoke(result, r.getValue());
}
} else {
// 方法返回類型不匹配,合并時,不修改 result
for (Result r : resultList) {
method.invoke(result, r.getValue());
}
}
} catch (Exception e) {
throw new RpcException("Can not merge result: " + e.getMessage(), e);
}
} else {
// 基于 Merger
Merger resultMerger;
// 如果是默認的方式
if (ConfigUtils.isDefault(merger)) {
// 獲得該類型的合并方式
resultMerger = MergerFactory.getMerger(returnType);
} else {
// 如果不是默認的,則配置中指定獲得Merger的實現(xiàn)類
resultMerger = ExtensionLoader.getExtensionLoader(Merger.class).getExtension(merger);
}
if (resultMerger != null) {
List