Dubbo集群容错

偏执的太偏执、 2023-06-22 05:24 67阅读 0赞

Dubbo 定义了集群接口 Cluster 以及 Cluster Invoker 来处理集群容错问题。

Cluster

Cluster接口的实现类有FailoverCluster、FailfastCluster、FailsafeCluster等,其作用都是通过join方法生成对应的invoker。

  1. @SPI(FailoverCluster.NAME)
  2. public interface Cluster {
  3. /**
  4. * Merge the directory invokers to a virtual invoker.
  5. *
  6. * @param <T>
  7. * @param directory
  8. * @return cluster invoker
  9. * @throws RpcException
  10. */
  11. @Adaptive
  12. <T> Invoker<T> join(Directory<T> directory) throws RpcException;
  13. }

AbstractClusterInvoker

AbstractClusterInvoker实现了Invoker接口,同时也是FairoverClusterInvoker、FairfastClusterInvoker、FairsafeClusterInvoker等Cluster Invoker的父类。在服务消费者进行远程调用时,AbstractClusterInvoker 的 invoke 方法会被调用。列举 Invoker、负载均衡、集群容错等操作均会在此阶段被执行。

  1. public abstract class AbstractClusterInvoker<T> implements Invoker<T> {
  2. ......
  3. @Override
  4. public Result invoke(final Invocation invocation) throws RpcException {
  5. checkWhetherDestroyed();
  6. // binding attachments into invocation.
  7. Map<String, String> contextAttachments = RpcContext.getContext().getAttachments();
  8. if (contextAttachments != null && contextAttachments.size() != 0) {
  9. ((RpcInvocation) invocation).addAttachments(contextAttachments);
  10. }
  11. // 列举 Invoker
  12. List<Invoker<T>> invokers = list(invocation);
  13. // 加载 LoadBalance
  14. LoadBalance loadbalance = initLoadBalance(invokers, invocation);
  15. RpcUtils.attachInvocationIdIfAsync(getUrl(), invocation);
  16. // 调用 doInvoke 进行后续集群容错等操作(抽象方法,由子类实现)
  17. return doInvoke(invocation, invokers, loadbalance);
  18. }
  19. protected abstract Result doInvoke(Invocation invocation, List<Invoker<T>> invokers,
  20. LoadBalance loadbalance) throws RpcException;
  21. ......
  22. }

集群容错方式




































名称 简介
Failover

失败自动切换:

当出现请求失败时,会重试其它服务器。可以通过retries设置重试次数,默认为2次。该方式是dubbo默认的容错机制,适用于读操作或幂等的写操作。

Failfast

快速失败:

当请求失败后,快速返回异常结果,不做任何重试。适用于非幂等接口。

Failsafe

失败安全:

当请求出现异常时,直接忽略异常。适用于佛系调用场景,即不关心调用是否成功,也不想影响外层的调用,例如不重要的日志同步等。

Failback

失败自动恢复:

请求失败后,会自动记录在失败队列中,并由一个定时线程池定时重试。适用于一些异步请求或最终一致性的请求。

Forking

并行调用:

同时调用多个相同的服务,只要有一个返回,则立即返回结果,可通过forks设置并行数。适用于某些对实时性要求极高的调用上,但也会浪费更多的资源。

Broadcast

广播调用:

广播调用所有可用的服务,任意一个节点报错则报错。适用于服务测试。

Available

最简单的调用:

请求不会做负载均衡,遍历所有服务列表,找到第一个可用的节点,直接请求并返回,如果没有可用节点则抛出异常。

FailoverClusterInvoker.doInvoker

FailoverClusterInvoker是默认的Cluster Invoker,在doInvoker方法中实现集群容错。

首先是获取重试次数,然后根据重试次数进行循环调用,失败后进行重试。在 for 循环内,首先是通过负载均衡组件选择一个 Invoker,然后再通过这个 Invoker 的 invoke 方法进行远程调用。如果失败了,记录下异常,并进行重试。重试时会再次调用父类的 list 方法列举 Invoker。

  1. @Override
  2. @SuppressWarnings({"unchecked", "rawtypes"})
  3. public Result doInvoke(Invocation invocation, final List<Invoker<T>> invokers, LoadBalance loadbalance) throws RpcException {
  4. List<Invoker<T>> copyInvokers = invokers;
  5. checkInvokers(copyInvokers, invocation);
  6. String methodName = RpcUtils.getMethodName(invocation);
  7. // 获取重试次数 DEFAULT_RETRIES 默认 2
  8. int len = getUrl().getMethodParameter(methodName, RETRIES_KEY, DEFAULT_RETRIES) + 1;
  9. if (len <= 0) {
  10. len = 1;
  11. }
  12. // retry loop.
  13. RpcException le = null; // last exception.
  14. List<Invoker<T>> invoked = new ArrayList<Invoker<T>>(copyInvokers.size()); // invoked invokers.
  15. Set<String> providers = new HashSet<String>(len);
  16. // 循环调用,失败重试
  17. for (int i = 0; i < len; i++) {
  18. //Reselect before retry to avoid a change of candidate `invokers`.
  19. //NOTE: if `invokers` changed, then `invoked` also lose accuracy.
  20. if (i > 0) {
  21. checkWhetherDestroyed();
  22. // 在进行重试前重新列举 Invoker,这样做的好处是,如果某个服务挂了,
  23. // 通过调用 list 可得到最新可用的 Invoker 列表
  24. copyInvokers = list(invocation);
  25. // check again
  26. checkInvokers(copyInvokers, invocation);
  27. }
  28. // 通过负载均衡选择 Invoker
  29. Invoker<T> invoker = select(loadbalance, invocation, copyInvokers, invoked);
  30. // 添加到 invoker 到 invoked 列表中
  31. invoked.add(invoker);
  32. // 设置 invoked 到 RPC 上下文中
  33. RpcContext.getContext().setInvokers((List) invoked);
  34. try {
  35. // 调用目标 Invoker 的 invoke 方法
  36. Result result = invoker.invoke(invocation);
  37. if (le != null && logger.isWarnEnabled()) {
  38. logger.warn("Although retry the method " + methodName
  39. + " in the service " + getInterface().getName()
  40. + " was successful by the provider " + invoker.getUrl().getAddress()
  41. + ", but there have been failed providers " + providers
  42. + " (" + providers.size() + "/" + copyInvokers.size()
  43. + ") from the registry " + directory.getUrl().getAddress()
  44. + " on the consumer " + NetUtils.getLocalHost()
  45. + " using the dubbo version " + Version.getVersion() + ". Last error is: "
  46. + le.getMessage(), le);
  47. }
  48. return result;
  49. } catch (RpcException e) {
  50. if (e.isBiz()) { // biz exception.
  51. throw e;
  52. }
  53. le = e;
  54. } catch (Throwable e) {
  55. le = new RpcException(e.getMessage(), e);
  56. } finally {
  57. providers.add(invoker.getUrl().getAddress());
  58. }
  59. }
  60. // 若重试失败,则抛出异常
  61. throw new RpcException(le.getCode(), "Failed to invoke the method "
  62. + methodName + " in the service " + getInterface().getName()
  63. + ". Tried " + len + " times of the providers " + providers
  64. + " (" + providers.size() + "/" + copyInvokers.size()
  65. + ") from the registry " + directory.getUrl().getAddress()
  66. + " on the consumer " + NetUtils.getLocalHost() + " using the dubbo version "
  67. + Version.getVersion() + ". Last error is: "
  68. + le.getMessage(), le.getCause() != null ? le.getCause() : le);
  69. }

发表评论

表情:
评论列表 (有 0 条评论,67人围观)

还没有评论,来说两句吧...

相关阅读

    相关 dubbo容错

    为了避免单点故障,现在的应用通常至少会部署在两台服务器上。对于一些负载比较高的服务,会部署更多的服务器。这样, 在同一环境下的服务提供者数量会大于1。对于服务消费者来说,同一环

    相关 谈谈dubbo容错

    从继承图可以看出有以下几种策略: Failover Cluster(失败转移) 当调用提供者的服务器发生错误时,再试下一个服务器。 用于读操作。重试有延时。设置重试次数

    相关 Dubbo容错

    Dubbo官网:[dubbo 2.7  集群容错][dubbo 2.7_] 集群容错指的是,当消费者调用提供者集群时发生异常的处理方案 一、Dubbo内置的容错策略

    相关 Dubbo-容错

    当一个项目(应用或服务)放在多台服务器上时,怎么实现高可用,这就是集群容错。 \------------------------------------------

    相关 Dubbo——容错

    Cluster层概述 在微服务环境中,为了保证服务的高可用,很少会有单点服务出现,服务通常都是以集群的形式出现的。然而,被调用的远程服务并不是每时每刻都保持良好状况,当某