Dubbo源码学习--MergeableCluster集群容错(九)
MergeableCluster聚合集群,将集群中的调用结果聚合起来返回结果。比如菜单服务,接口一样,但有多种实现,用group区分,现在消费方需从每种group中调用一次返回结果,合并结果返回,这样就可以实现聚合菜单项。比较鸡肋,小点的项目应该都用不到
配置如:(搜索所有分组)
按组合并返回结果,比如菜单服务,接口一样,但有多种实现,用group区分,现在消费方需从每种group中调用一次返回结果,合并结果返回,这样就可以实现聚合菜单项。比较鸡肋,小点的项目应该都用不到
配置如:(搜索所有分组)
<dubbo:reference interface="com.xxx.MenuService" group="*" merger="true" />
(合并指定分组)
<dubbo:reference interface="com.xxx.MenuService" group="aaa,bbb" merger="true" />
(指定方法合并结果,其它未指定的方法,将只调用一个Group)
<dubbo:reference interface="com.xxx.MenuService" group="*">
<dubbo:method name="getMenuItems" merger="true" />
</dubbo:reference>
(某个方法不合并结果,其它都合并结果)
<dubbo:reference interface="com.xxx.MenuService" group="*" merger="true">
<dubbo:method name="getMenuItems" merger="false" />
</dubbo:reference>
(指定合并策略,缺省根据返回值类型自动匹配,如果同一类型有两个合并器时,需指定合并器的名称)
<dubbo:reference interface="com.xxx.MenuService" group="*">
<dubbo:method name="getMenuItems" merger="mymerge" />
</dubbo:reference>
(指定合并方法,将调用返回结果的指定方法进行合并,合并方法的参数类型必须是返回结果类型本身)
<dubbo:reference interface="com.xxx.MenuService" group="*">
<dubbo:method name="getMenuItems" merger=".addAll" />
</dubbo:reference>
MergeableCluster的实现如下:
/**
 * {@link Cluster} implementation whose {@link #join(Directory)} hands every directory
 * to a {@link MergeableClusterInvoker}, the invoker that performs the actual
 * fan-out and result merging.
 */
public class MergeableCluster implements Cluster {

    /** Name constant of this cluster strategy. */
    public static final String NAME = "mergeable";

    /**
     * Wraps the given directory in a new {@link MergeableClusterInvoker}.
     *
     * @param directory the directory the created invoker will list its invokers from
     * @param <T>       the service interface type
     * @return a new invoker backed by {@code directory}
     * @throws RpcException declared by the {@code Cluster} contract; not thrown by this body itself
     */
    public <T> Invoker<T> join(Directory<T> directory) throws RpcException {
        final Invoker<T> mergingInvoker = new MergeableClusterInvoker<T>(directory);
        return mergingInvoker;
    }
}
结果调用及聚合操作是在MergeableClusterInvoker中完成实现的。
/**
 * Invoker that, for methods with a configured merger, calls every invoker listed by the
 * directory in parallel and merges the successful results into a single {@link Result}.
 *
 * <p>For methods without a merger it delegates to a single invoker only.
 *
 * @param <T> the service interface type
 */
@SuppressWarnings("unchecked")
public class MergeableClusterInvoker<T> implements Invoker<T> {

    private static final Logger log = LoggerFactory.getLogger(MergeableClusterInvoker.class);

    /** Source of the invokers to call and of the URL/interface this invoker exposes. */
    private final Directory<T> directory;

    /** Pool on which the per-invoker calls are submitted so they run concurrently. */
    private final ExecutorService executor = Executors.newCachedThreadPool(new NamedThreadFactory("mergeable-cluster-executor", true));

    public MergeableClusterInvoker(Directory<T> directory) {
        this.directory = directory;
    }

    /**
     * Invokes the target method and, if a merger is configured for it, merges the results
     * of all invokers returned by the directory.
     *
     * <ul>
     *   <li>No merger configured: the first available invoker is called (or the first
     *       listed one if none reports itself available) and its result is returned as is.</li>
     *   <li>Merger configured: every invoker is called concurrently. Results carrying an
     *       exception are logged and skipped. Zero remaining results yield a {@code null}
     *       value, exactly one remaining result is returned unchanged, and a {@code void}
     *       return type yields a {@code null} value.</li>
     *   <li>Merger name starting with {@code "."}: the rest of the name is a method of the
     *       return type taking one argument of that same type; it is called reflectively on
     *       the first result with each of the other results.</li>
     *   <li>Otherwise a {@code Merger} is looked up (by return type when
     *       {@code ConfigUtils.isDefault(merger)} holds, by extension name otherwise) and
     *       applied to all result values.</li>
     * </ul>
     *
     * @param invocation the invocation to dispatch
     * @return the single or merged result
     * @throws RpcException if waiting for any invoker fails or times out, if the return type
     *                      cannot be resolved when a merge is required, if the merge method or
     *                      merger cannot be found, or if the merge itself fails
     */
    @SuppressWarnings("rawtypes")
    public Result invoke(final Invocation invocation) throws RpcException {
        List<Invoker<T>> invokers = directory.list(invocation);

        String merger = getUrl().getMethodParameter(invocation.getMethodName(), Constants.MERGER_KEY);
        // No merger configured for this method: invoke only one invoker,
        // preferring the first one that reports itself available.
        if (ConfigUtils.isEmpty(merger)) {
            for (final Invoker<T> invoker : invokers) {
                if (invoker.isAvailable()) {
                    return invoker.invoke(invocation);
                }
            }
            return invokers.iterator().next().invoke(invocation);
        }

        // Resolve the declared return type; it stays null when the method cannot be
        // found on the interface, which is only an error if a merge is actually needed.
        Class<?> returnType;
        try {
            returnType = getInterface().getMethod(
                    invocation.getMethodName(), invocation.getParameterTypes()).getReturnType();
        } catch (NoSuchMethodException e) {
            returnType = null;
        }

        // A merger is configured: submit one call per invoker, keyed by its service key.
        Map<String, Future<Result>> results = new HashMap<String, Future<Result>>();
        for (final Invoker<T> invoker : invokers) {
            Future<Result> future = executor.submit(new Callable<Result>() {
                public Result call() throws Exception {
                    return invoker.invoke(new RpcInvocation(invocation, invoker));
                }
            });
            results.put(invoker.getUrl().getServiceKey(), future);
        }

        Object result = null;

        List<Result> resultList = new ArrayList<Result>(results.size());

        // Collect the results, waiting up to the method timeout for each future.
        int timeout = getUrl().getMethodParameter(invocation.getMethodName(), Constants.TIMEOUT_KEY, Constants.DEFAULT_TIMEOUT);
        for (Map.Entry<String, Future<Result>> entry : results.entrySet()) {
            Future<Result> future = entry.getValue();
            try {
                Result r = future.get(timeout, TimeUnit.MILLISECONDS);
                if (r.hasException()) {
                    // A failed invoker is logged and left out of the merge.
                    log.error(new StringBuilder(32).append("Invoke ")
                                    .append(getGroupDescFromServiceKey(entry.getKey()))
                                    .append(" failed: ")
                                    .append(r.getException().getMessage()).toString(),
                            r.getException());
                } else {
                    resultList.add(r);
                }
            } catch (Exception e) {
                if (e instanceof InterruptedException) {
                    // Restore the interrupt status so callers further up can observe it.
                    Thread.currentThread().interrupt();
                }
                throw new RpcException(new StringBuilder(32)
                        .append("Failed to invoke service ")
                        .append(entry.getKey())
                        .append(": ")
                        .append(e.getMessage()).toString(),
                        e);
            }
        }

        if (resultList.isEmpty()) {
            return new RpcResult((Object) null);
        } else if (resultList.size() == 1) {
            return resultList.iterator().next();
        }

        if (returnType == void.class) {
            return new RpcResult((Object) null);
        }

        // From here on the return type is required; fail clearly instead of with an NPE.
        if (returnType == null) {
            throw new RpcException(new StringBuilder(32)
                    .append("Can not merge result because method [ ")
                    .append(invocation.getMethodName())
                    .append(" ] was not found on interface [ ")
                    .append(getInterface().getName())
                    .append(" ]")
                    .toString());
        }

        if (merger.startsWith(".")) {
            // ".name" means: merge by calling method "name" of the return type,
            // whose single parameter is of the return type itself.
            merger = merger.substring(1);
            Method method;
            try {
                method = returnType.getMethod(merger, returnType);
            } catch (NoSuchMethodException e) {
                throw new RpcException(new StringBuilder(32)
                        .append("Can not merge result because missing method [ ")
                        .append(merger)
                        .append(" ] in class [ ")
                        .append(returnType.getName())
                        .append(" ]")
                        .toString());
            }
            if (!Modifier.isPublic(method.getModifiers())) {
                method.setAccessible(true);
            }
            // The first result is the accumulator; the rest are merged into it.
            result = resultList.remove(0).getValue();
            try {
                if (method.getReturnType() != void.class
                        && method.getReturnType().isAssignableFrom(result.getClass())) {
                    // The merge method returns a compatible value: chain on its return value.
                    for (Result r : resultList) {
                        result = method.invoke(result, r.getValue());
                    }
                } else {
                    // Otherwise the method is called only for its effect on the accumulator.
                    for (Result r : resultList) {
                        method.invoke(result, r.getValue());
                    }
                }
            } catch (Exception e) {
                throw new RpcException(
                        new StringBuilder(32)
                                .append("Can not merge result: ")
                                .append(e.getMessage()).toString(),
                        e);
            }
        } else {
            // Merge all result values through a Merger.
            Merger resultMerger;
            if (ConfigUtils.isDefault(merger)) {
                resultMerger = MergerFactory.getMerger(returnType);
            } else {
                resultMerger = ExtensionLoader.getExtensionLoader(Merger.class).getExtension(merger);
            }
            if (resultMerger != null) {
                List<Object> rets = new ArrayList<Object>(resultList.size());
                for (Result r : resultList) {
                    rets.add(r.getValue());
                }
                // The merger receives an array whose component type is the return type.
                result = resultMerger.merge(
                        rets.toArray((Object[]) Array.newInstance(returnType, 0)));
            } else {
                throw new RpcException("There is no merger to merge result.");
            }
        }
        return new RpcResult(result);
    }

    /** Returns the service interface reported by the directory. */
    public Class<T> getInterface() {
        return directory.getInterface();
    }

    /** Returns the URL reported by the directory. */
    public URL getUrl() {
        return directory.getUrl();
    }

    /** Reports the availability of the directory. */
    public boolean isAvailable() {
        return directory.isAvailable();
    }

    /** Destroys the directory. */
    public void destroy() {
        directory.destroy();
    }

    /**
     * Builds a description for log messages from a service key.
     *
     * @param key the service key
     * @return {@code "group [ <prefix> ]"} where the prefix is the text before the first
     *         {@code '/'}, or the key itself if it has no {@code '/'} after position 0
     */
    private String getGroupDescFromServiceKey(String key) {
        int index = key.indexOf("/");
        if (index > 0) {
            return new StringBuilder(32).append("group [ ")
                    .append(key.substring(0, index)).append(" ]").toString();
        }
        return key;
    }
}
还没有评论,来说两句吧...