谈谈dubbo集群容错

港控/mmm° 2022-10-01 10:52 269阅读 0赞

从继承图可以看出有以下几种策略:

  • Failover Cluster(失败转移) 当调用提供者的服务器发生错误时,再试下一个服务器。 用于读操作。重试有延时。设置重试次数

具体代码实现

  1. public Result doInvoke(Invocation invocation, final List<Invoker<T>> invokers, LoadBalance loadbalance) {
  2. List<Invoker<T>> invoked = new ArrayList<Invoker<T>>(copyinvokers.size());
  3. Set<String> providers = new HashSet<String>(len);
  4. for (int i = 0; i < len; i++) {//5
  5. if (i > 0) {
  6. checkWhetherDestroyed();
  7. copyinvokers = list(invocation);
  8. checkInvokers(copyinvokers, invocation);
  9. }
  10. Invoker<T> invoker = select(loadbalance, invocation, copyinvokers, invoked);//1
  11. invoked.add(invoker);//2
  12. RpcContext.getContext().setInvokers((List) invoked);
  13. try {
  14. Result result = invoker.invoke(invocation);//3
  15. if (le != null && logger.isWarnEnabled()) {logger.warn
  16. return result;
  17. } catch (RpcException e) {//4
  18. if (e.isBiz()) { throw e;
  19. le = e;
  20. } catch (Throwable e) {
  21. le = new RpcException(e.getMessage(), e);
  22. } finally
  23. providers.add(invoker.getUrl().getAddress());
  24. }
  25. }//for-end
  26. throw new RpcException(le.getCode()//6
  27. }
  28. 复制代码

实现思想:利用for循环,成功直接return。这样就可以多次调用。

每一次调用先找一个服务器,加入到已调用结合。开始调用。拿到结果。如果出错,catch住,分析异常类型。当超出规定次数,直接抛异常。

  • Failfast Cluster (快速失败) 只发起一次调用,失败立即报错。一般对应于非幂等性的写操作。比如新增。

    public Result doInvoke(Invocation invocation, List> invokers, LoadBalance loadbalance) throws RpcException {

    1. checkInvokers(invokers, invocation);
    2. Invoker<T> invoker = select(loadbalance, invocation, invokers, null);
    3. try {
    4. return invoker.invoke(invocation);
    5. } catch (Throwable e) {
    6. if (e instanceof RpcException && ((RpcException) e).isBiz()) { // biz exception.
    7. throw (RpcException) e;
    8. }
    9. throw new RpcException(e instanceof RpcException ? ((RpcException) e).getCode() : 0,
    10. }
    11. }

    复制代码

实现思想:直接调用。处理异常。简单

  • Failback Cluster (失败自动恢复) 失败自动恢复,后台记录失败请求,定时重发。比如消息通知

    protected Result doInvoke(Invocation invocation, List> invokers, LoadBalance loadbalance) throws RpcException {

    1. try {
    2. checkInvokers(invokers, invocation);
    3. Invoker<T> invoker = select(loadbalance, invocation, invokers, null);
    4. return invoker.invoke(invocation);
    5. } catch (Throwable e) {
    6. logger.error(
    7. addFailed(invocation, this);
    8. return new RpcResult(); // ignore
    9. }
    10. }
    11. private void addFailed(Invocation invocation, AbstractClusterInvoker<?> router) {
    12. if (retryFuture == null) {
    13. synchronized (this) {
    14. if (retryFuture == null) {
    15. retryFuture = scheduledExecutorService.scheduleWithFixedDelay(new Runnable() {
    16. @Override
    17. public void run() {
    18. // collect retry statistics
    19. try {
    20. retryFailed();
    21. } catch (Throwable t) { // Defensive fault tolerance
    22. logger.error("Unexpected error occur at collect statistic", t);
    23. }
    24. }
    25. }, RETRY_FAILED_PERIOD, RETRY_FAILED_PERIOD, TimeUnit.MILLISECONDS);
    26. }
    27. }
    28. }
    29. failed.put(invocation, router);
    30. }

    复制代码

实现思想:先直接调用。失败后,放到map直接保存。然后用定时线程池去重试map里的信息。

  • FailSafe Cluster (失败安全) 失败了,直接忽略。比如写入日志啥的

    public Result doInvoke(Invocation invocation, List> invokers, LoadBalance loadbalance) throws RpcException {

    1. try {
    2. checkInvokers(invokers, invocation);
    3. Invoker<T> invoker = select(loadbalance, invocation, invokers, null);
    4. return invoker.invoke(invocation);
    5. } catch (Throwable e) {
    6. logger.error("Failsafe ignore exception: " + e.getMessage(), e);
    7. return new RpcResult(); // ignore
    8. }
    9. }

    复制代码

  • Forking Cluster (并行调用) 同时调用多个服务器,有一个成功,就返回。用于实时性较高的情况

    public Result doInvoke(final Invocation invocation, List> invokers, LoadBalance loadbalance) throws RpcException {

    1. try {
    2. checkInvokers(invokers, invocation);
    3. final List<Invoker<T>> selected;
    4. final int forks = getUrl().getParameter(Constants.FORKS_KEY, Constants.DEFAULT_FORKS);
    5. final int timeout = getUrl().getParameter(Constants.TIMEOUT_KEY, Constants.DEFAULT_TIMEOUT);
    6. if (forks <= 0 || forks >= invokers.size()) {
    7. selected = invokers;
    8. } else {
    9. selected = new ArrayList<>();
    10. for (int i = 0; i < forks; i++) {
    11. // TODO. Add some comment here, refer chinese version for more details.
    12. Invoker<T> invoker = select(loadbalance, invocation, invokers, selected);
    13. if (!selected.contains(invoker)) {
    14. //Avoid add the same invoker several times.
    15. selected.add(invoker);
    16. }
    17. }
    18. }
    19. RpcContext.getContext().setInvokers((List) selected);
    20. final AtomicInteger count = new AtomicInteger();
    21. final BlockingQueue<Object> ref = new LinkedBlockingQueue<>();
    22. for (final Invoker<T> invoker : selected) {
    23. executor.execute(new Runnable() {
    24. @Override
    25. public void run() {
    26. try {
    27. Result result = invoker.invoke(invocation);
    28. ref.offer(result);
    29. } catch (Throwable e) {
    30. int value = count.incrementAndGet();
    31. if (value >= selected.size()) {
    32. ref.offer(e);
    33. }
    34. }
    35. }
    36. });
    37. }
    38. try {
    39. Object ret = ref.poll(timeout, TimeUnit.MILLISECONDS);
    40. if (ret instanceof Throwable) {
    41. Throwable e = (Throwable) ret;
    42. throw new RpcException(e instanceof RpcException ? ((RpcException) e).getCode() : 0, "Failed to forking invoke provider " + selected + ", but no luck to perform the invocation. Last error is: " + e.getMessage(), e.getCause() != null ? e.getCause() : e);
    43. }
    44. return (Result) ret;
    45. } catch (InterruptedException e) {
    46. throw new RpcException("Failed to forking invoke provider " + selected + ", but no luck to perform the invocation. Last error is: " + e.getMessage(), e);
    47. }
    48. } finally {
    49. // clear attachments which is binding to current thread.
    50. RpcContext.getContext().clearAttachments();
    51. }
    52. }

    复制代码

实现思想:利用list保存多个服务器。用线程池同时提交调用。将结果放到LinkedBlockingQueue。然后从队列里取结果。

  • Broadcast Cluster (广播调用) 广播调用所有提供者,琢一调用。任意一台报错,则报错。用于通知服务提供者更新资源啥的。

    public Result doInvoke(final Invocation invocation, List> invokers, LoadBalance loadbalance) throws RpcException {

    1. checkInvokers(invokers, invocation);
    2. RpcContext.getContext().setInvokers((List) invokers);
    3. RpcException exception = null;
    4. Result result = null;
    5. for (Invoker<T> invoker : invokers) {
    6. try {
    7. result = invoker.invoke(invocation);
    8. } catch (RpcException e) {
    9. exception = e;
    10. logger.warn(e.getMessage(), e);
    11. } catch (Throwable e) {
    12. exception = new RpcException(e.getMessage(), e);
    13. logger.warn(e.getMessage(), e);
    14. }
    15. }
    16. if (exception != null) {
    17. throw exception;
    18. }
    19. return result;
    20. }

    复制代码

实现思想:for循环挨个调用。

转载于:https://juejin.im/post/5c04ea1051882508851b7c53

发表评论

表情:
评论列表 (有 0 条评论,269人围观)

还没有评论,来说两句吧...

相关阅读

    相关 dubbo容错

    为了避免单点故障,现在的应用通常至少会部署在两台服务器上。对于一些负载比较高的服务,会部署更多的服务器。这样, 在同一环境下的服务提供者数量会大于1。对于服务消费者来说,同一环

    相关 谈谈dubbo容错

    从继承图可以看出有以下几种策略: Failover Cluster(失败转移) 当调用提供者的服务器发生错误时,再试下一个服务器。 用于读操作。重试有延时。设置重试次数

    相关 Dubbo容错

    Dubbo官网:[dubbo 2.7  集群容错][dubbo 2.7_] 集群容错指的是,当消费者调用提供者集群时发生异常的处理方案 一、Dubbo内置的容错策略

    相关 Dubbo-容错

    当一个项目(应用或服务)放在多台服务器上时,怎么实现高可用,这就是集群容错。 \------------------------------------------

    相关 Dubbo——容错

    Cluster层概述 在微服务环境中,为了保证服务的高可用,很少会有单点服务出现,服务通常都是以集群的形式出现的。然而,被调用的远程服务并不是每时每刻都保持良好状况,当某