Dubbo进阶(十一):容错机制

首先我们先了解一下Dubbo调用的流程
在这里插入图片描述

本文主要讲解Cluster

在集群调用失败时,Dubbo提供了多种容错方案,默认值为failover重试。Dubbo中现在有FailoverFailfastFailsafeFailbackForkingBroadcast等容错机制,每个容错机制的特性如下表。


































机制名 机制简介
Failover Dubbo容错机制的默认值。当出现失败的时候,会尝试其他服务。用户可以通过retries=”2”设置重试次数。这是Dubbo的默认容错机制,会对请求做负载均衡。通常使用在读操作或者幂等性写操作上,但是重试操作会导致接口的延迟增大,在下游机器负载已经到达极限的情况下,重试会增加下游机器的负载。
Failfast 快速失败,当请求失败之后快速返回异常结果,不做任何重试。改容错机制会对请求做负载均衡,通常使用在非幂等性的接口上。该机制受网络抖动的影响较大。
Failsafe 当出现异常的时候直接忽略异常,会对请求做负载均衡。通常的使用场景是不关心调用是否成功,并且不想抛出异常影响外部调用,如某些不重要的日志同步,及时出现异常也无所谓。
Failback 请求失败之后,会自动记录在失败队列中,并由一个定时线程池定时重试,适用一些异步或者最终以执行的请求。会对请求做负载均衡。
Forking 同时调用多个相同的服务,只要其中一个返回,就立即返回结果。用户可以配置forks=”2” 来设置最大并行数。通常用在对接口实时性较高的场景,但是也会浪费更多的资源。
Broadcast 广播调用所有可用服务,任意一个节点报错则报错。由于是广播,因此请求不需要做负载均衡。

通过上面的这个表格我们可以大概了解到Dubbo中每种容错机制的含义和大概含义,接下来我们来了解一下怎么在服务中使用以上的容错机制。

使用方法

通常可以通过<dubbo:service><dubbo:reference><dubbo:consumer><dubbo:provider>标签上通过cluster属性进行设置。

  • <dubbo:service cluster="Failfast" />
  • <dubbo:reference cluster="Failfast" />
  • <dubbo:consumer cluster="Failfast" />
  • <dubbo:provider cluster="Failfast" />
源码分析

通过源码可以看到,目前Dubbo对Cluster的扩展有以下几种,在/dubbo-cluster/src/main/resources/META-INF/dubbo/internal下可以看到有这么多种容错机制,那么在实际业务中使用哪种就要按具体场景了,可以参考一下上面的表格。

  1. mock=org.apache.dubbo.rpc.cluster.support.wrapper.MockClusterWrapper
  2. failover=org.apache.dubbo.rpc.cluster.support.FailoverCluster
  3. failfast=org.apache.dubbo.rpc.cluster.support.FailfastCluster
  4. failsafe=org.apache.dubbo.rpc.cluster.support.FailsafeCluster
  5. failback=org.apache.dubbo.rpc.cluster.support.FailbackCluster
  6. forking=org.apache.dubbo.rpc.cluster.support.ForkingCluster
  7. available=org.apache.dubbo.rpc.cluster.support.AvailableCluster
  8. mergeable=org.apache.dubbo.rpc.cluster.support.MergeableCluster
  9. broadcast=org.apache.dubbo.rpc.cluster.support.BroadcastCluster
  10. zone-aware=org.apache.dubbo.rpc.cluster.support.registry.ZoneAwareCluster
  • 首先我们看一下org.apache.dubbo.rpc.cluster.Cluster这个接口,可以看到Dubbo Cluster的默认值是FailoverCluster

    @SPI(FailoverCluster.NAME)
    public interface Cluster {

    1. /**
    2. * Merge the directory invokers to a virtual invoker.
    3. *
    4. * @param <T>
    5. * @param directory
    6. * @return cluster invoker
    7. * @throws RpcException
    8. */
    9. @Adaptive
    10. <T> Invoker<T> join(Directory<T> directory) throws RpcException;

    }

  • 我们以Dubbo Cluster的默认值为例,来分析一下是怎么实现容错的。FailoverClusterCluster的一种实现,FailoverCluster直接创建一个新的Invoker并返回。

    public class FailoverCluster extends AbstractCluster {

    1. // @SPI指定的名字,也就是Dubbo Cluster的默认值
    2. public final static String NAME = "failover";
    3. @Override
    4. public <T> AbstractClusterInvoker<T> doJoin(Directory<T> directory) throws RpcException {
    5. // 直接new一个FailoverClusterInvoker,是一个Invoker,具体的实现逻辑都封装在这里面
    6. return new FailoverClusterInvoker<>(directory);
    7. }

    }

  • FailoverCluster的实现逻辑

    public Result doInvoke(Invocation invocation, final List> invokers, LoadBalance loadbalance) throws RpcException {

    1. List<Invoker<T>> copyInvokers = invokers;
    2. checkInvokers(copyInvokers, invocation);
    3. String methodName = RpcUtils.getMethodName(invocation);
    4. // 获取重试次数,通过retries="N"来指定
    5. int len = getUrl().getMethodParameter(methodName, RETRIES_KEY, DEFAULT_RETRIES) + 1;
    6. // 默认为一次
    7. if (len <= 0) {
    8. len = 1;
    9. }
    10. RpcException le = null;
    11. List<Invoker<T>> invoked = new ArrayList<Invoker<T>>(copyInvokers.size());
    12. Set<String> providers = new HashSet<String>(len);
    13. // 循环调用,失败重试
    14. for (int i = 0; i < len; i++) {
    15. if (i > 0) {
    16. checkWhetherDestroyed();
    17. // 在进行重试前重新列举 Invoker,这样做的好处是,如果某个服务挂了,
    18. // 通过调用 list 可得到最新可用的 Invoker 列表
    19. copyInvokers = list(invocation);
    20. checkInvokers(copyInvokers, invocation);
    21. }
    22. // 通过负载均衡选择Invoker
    23. Invoker<T> invoker = select(loadbalance, invocation, copyInvokers, invoked);
    24. // 添加到 invoker 到 invoked 列表中
    25. invoked.add(invoker);
    26. // 设置 invoked 到 RPC 上下文中
    27. RpcContext.getContext().setInvokers((List) invoked);
    28. try {
    29. // 调用目标 Invoker 的 invoke 方法
    30. Result result = invoker.invoke(invocation);
    31. if (le != null && logger.isWarnEnabled()) {
    32. logger.warn("Although retry the method " + methodName
    33. + " in the service " + getInterface().getName()
    34. + " was successful by the provider " + invoker.getUrl().getAddress()
    35. + ", but there have been failed providers " + providers
    36. + " (" + providers.size() + "/" + copyInvokers.size()
    37. + ") from the registry " + directory.getUrl().getAddress()
    38. + " on the consumer " + NetUtils.getLocalHost()
    39. + " using the dubbo version " + Version.getVersion() + ". Last error is: "
    40. + le.getMessage(), le);
    41. }
    42. return result;
    43. } catch (RpcException e) {
    44. if (e.isBiz()) { // biz exception.
    45. throw e;
    46. }
    47. le = e;
    48. } catch (Throwable e) {
    49. le = new RpcException(e.getMessage(), e);
    50. } finally {
    51. providers.add(invoker.getUrl().getAddress());
    52. }
    53. }
    54. throw new RpcException(le.getCode(), "Failed to invoke the method "
    55. + methodName + " in the service " + getInterface().getName()
    56. + ". Tried " + len + " times of the providers " + providers
    57. + " (" + providers.size() + "/" + copyInvokers.size()
    58. + ") from the registry " + directory.getUrl().getAddress()
    59. + " on the consumer " + NetUtils.getLocalHost() + " using the dubbo version "
    60. + Version.getVersion() + ". Last error is: "
    61. + le.getMessage(), le.getCause() != null ? le.getCause() : le);

    }

如上,FailoverClusterInvoker 的 doInvoke 方法首先是获取重试次数,然后根据重试次数进行循环调用,失败后进行重试。在 for 循环内,首先是通过负载均衡组件选择一个 Invoker,然后再通过这个 Invoker 的 invoke 方法进行远程调用。如果失败了,记录下异常,并进行重试。重试时会再次调用父类的 list 方法列举 Invoker。

发表评论

表情:
评论列表 (有 0 条评论,29人围观)

还没有评论,来说两句吧...

相关阅读

    相关 Dubbo容错机制

    Dubbo相关容错机制记录: 一、容错机制: 1、Failover Cluster(默认)  失败自动切换,当出现失败,重试其它服务器。 通常用于读操作,但重试会