Dubbo源码学习--MergeableCluster集群容错(九)

太过爱你忘了你带给我的痛 2022-05-28 01:26 231阅读 0赞

MergeableCluster聚合集群,将集群中的调用结果聚合起来返回结果。比如菜单服务,接口一样,但有多种实现,用group区分,现在消费方需从每种group中调用一次返回结果,合并结果返回,这样就可以实现聚合菜单项。比较鸡肋,小点的项目应该都用不到

配置如:(搜索所有分组)

  1. 按组合并返回结果,比如菜单服务,接口一样,但有多种实现,用group区分,现在消费方需从每种group中调用一次返回结果,合并结果返回,这样就可以实现聚合菜单项。比较鸡肋,小点的项目应该都用不到
  2. 配置如:(搜索所有分组)
  3. <dubbo:reference interface="com.xxx.MenuService" group="*" merger="true" />
  4. (合并指定分组)
  5. <dubbo:reference interface="com.xxx.MenuService" group="aaa,bbb" merger="true" />
  6. (指定方法合并结果,其它未指定的方法,将只调用一个Group)
  7. <dubbo:reference interface="com.xxx.MenuService" group="*">
  8. <dubbo:method name="getMenuItems" merger="true" />
  9. </dubbo:service>
  10. (某个方法不合并结果,其它都合并结果)
  11. <dubbo:reference interface="com.xxx.MenuService" group="*" merger="true">
  12. <dubbo:method name="getMenuItems" merger="false" />
  13. </dubbo:service>
  14. (指定合并策略,缺省根据返回值类型自动匹配,如果同一类型有两个合并器时,需指定合并器的名称)
  15. <dubbo:reference interface="com.xxx.MenuService" group="*">
  16. <dubbo:method name="getMenuItems" merger="mymerge" />
  17. </dubbo:service>
  18. (指定合并方法,将调用返回结果的指定方法进行合并,合并方法的参数类型必须是返回结果类型本身)
  19. <dubbo:reference interface="com.xxx.MenuService" group="*">
  20. <dubbo:method name="getMenuItems" merger=".addAll" />
  21. </dubbo:service>

MergeableCluster的实现如下:

  1. public class MergeableCluster implements Cluster {
  2. public static final String NAME = "mergeable";
  3. public <T> Invoker<T> join(Directory<T> directory) throws RpcException {
  4. return new MergeableClusterInvoker<T>(directory);
  5. }
  6. }

结果调用及聚合操作是在MergeableClusterInvoker中完成实现的。

  1. @SuppressWarnings("unchecked")
  2. public class MergeableClusterInvoker<T> implements Invoker<T> {
  3. private static final Logger log = LoggerFactory.getLogger(MergeableClusterInvoker.class);
  4. private final Directory<T> directory;
  5. private ExecutorService executor = Executors.newCachedThreadPool(new NamedThreadFactory("mergeable-cluster-executor", true));
  6. public MergeableClusterInvoker(Directory<T> directory) {
  7. this.directory = directory;
  8. }
  9. @SuppressWarnings("rawtypes")
  10. public Result invoke(final Invocation invocation) throws RpcException {
  11. List<Invoker<T>> invokers = directory.list(invocation);
  12. String merger = getUrl().getMethodParameter(invocation.getMethodName(), Constants.MERGER_KEY);
  13. //如果不存在分组,则只调用一个即可
  14. if (ConfigUtils.isEmpty(merger)) { // If a method doesn't have a merger, only invoke one Group
  15. for (final Invoker<T> invoker : invokers) {
  16. if (invoker.isAvailable()) {
  17. return invoker.invoke(invocation);
  18. }
  19. }
  20. return invokers.iterator().next().invoke(invocation);
  21. }
  22. Class<?> returnType;
  23. try {
  24. returnType = getInterface().getMethod(
  25. invocation.getMethodName(), invocation.getParameterTypes()).getReturnType();
  26. } catch (NoSuchMethodException e) {
  27. returnType = null;
  28. }
  29. //如果存在分组则调用所有的分组,将结果添加到results中
  30. Map<String, Future<Result>> results = new HashMap<String, Future<Result>>();
  31. for (final Invoker<T> invoker : invokers) {
  32. Future<Result> future = executor.submit(new Callable<Result>() {
  33. public Result call() throws Exception {
  34. return invoker.invoke(new RpcInvocation(invocation, invoker));
  35. }
  36. });
  37. results.put(invoker.getUrl().getServiceKey(), future);
  38. }
  39. Object result = null;
  40. List<Result> resultList = new ArrayList<Result>(results.size());
  41. //合并结果值
  42. int timeout = getUrl().getMethodParameter(invocation.getMethodName(), Constants.TIMEOUT_KEY, Constants.DEFAULT_TIMEOUT);
  43. for (Map.Entry<String, Future<Result>> entry : results.entrySet()) {
  44. Future<Result> future = entry.getValue();
  45. try {
  46. Result r = future.get(timeout, TimeUnit.MILLISECONDS);
  47. if (r.hasException()) {
  48. log.error(new StringBuilder(32).append("Invoke ")
  49. .append(getGroupDescFromServiceKey(entry.getKey()))
  50. .append(" failed: ")
  51. .append(r.getException().getMessage()).toString(),
  52. r.getException());
  53. } else {
  54. resultList.add(r);
  55. }
  56. } catch (Exception e) {
  57. throw new RpcException(new StringBuilder(32)
  58. .append("Failed to invoke service ")
  59. .append(entry.getKey())
  60. .append(": ")
  61. .append(e.getMessage()).toString(),
  62. e);
  63. }
  64. }
  65. if (resultList.isEmpty()) {
  66. return new RpcResult((Object) null);
  67. } else if (resultList.size() == 1) {
  68. return resultList.iterator().next();
  69. }
  70. if (returnType == void.class) {
  71. return new RpcResult((Object) null);
  72. }
  73. //分组以.开头则只返回一个结果值
  74. if (merger.startsWith(".")) {
  75. merger = merger.substring(1);
  76. Method method;
  77. try {
  78. method = returnType.getMethod(merger, returnType);
  79. } catch (NoSuchMethodException e) {
  80. throw new RpcException(new StringBuilder(32)
  81. .append("Can not merge result because missing method [ ")
  82. .append(merger)
  83. .append(" ] in class [ ")
  84. .append(returnType.getClass().getName())
  85. .append(" ]")
  86. .toString());
  87. }
  88. if (!Modifier.isPublic(method.getModifiers())) {
  89. method.setAccessible(true);
  90. }
  91. result = resultList.remove(0).getValue();
  92. try {
  93. if (method.getReturnType() != void.class
  94. && method.getReturnType().isAssignableFrom(result.getClass())) {
  95. for (Result r : resultList) {
  96. result = method.invoke(result, r.getValue());
  97. }
  98. } else {
  99. for (Result r : resultList) {
  100. method.invoke(result, r.getValue());
  101. }
  102. }
  103. } catch (Exception e) {
  104. throw new RpcException(
  105. new StringBuilder(32)
  106. .append("Can not merge result: ")
  107. .append(e.getMessage()).toString(),
  108. e);
  109. }
  110. } else {
  111. //返回所有的结果值
  112. Merger resultMerger;
  113. if (ConfigUtils.isDefault(merger)) {
  114. resultMerger = MergerFactory.getMerger(returnType);
  115. } else {
  116. resultMerger = ExtensionLoader.getExtensionLoader(Merger.class).getExtension(merger);
  117. }
  118. if (resultMerger != null) {
  119. List<Object> rets = new ArrayList<Object>(resultList.size());
  120. for (Result r : resultList) {
  121. rets.add(r.getValue());
  122. }
  123. result = resultMerger.merge(
  124. rets.toArray((Object[]) Array.newInstance(returnType, 0)));
  125. } else {
  126. throw new RpcException("There is no merger to merge result.");
  127. }
  128. }
  129. return new RpcResult(result);
  130. }
  131. public Class<T> getInterface() {
  132. return directory.getInterface();
  133. }
  134. public URL getUrl() {
  135. return directory.getUrl();
  136. }
  137. public boolean isAvailable() {
  138. return directory.isAvailable();
  139. }
  140. public void destroy() {
  141. directory.destroy();
  142. }
  143. private String getGroupDescFromServiceKey(String key) {
  144. int index = key.indexOf("/");
  145. if (index > 0) {
  146. return new StringBuilder(32).append("group [ ")
  147. .append(key.substring(0, index)).append(" ]").toString();
  148. }
  149. return key;
  150. }
  151. }

发表评论

表情:
评论列表 (有 0 条评论,231人围观)

还没有评论,来说两句吧...

相关阅读