  • Failover Cluster(失败转移) 当调用提供者的服务器发生错误时,再试下一个服务器。 用于读操作。重试有延时。设置重试次数


  1. public Result doInvoke(Invocation invocation, final List<Invoker<T>> invokers, LoadBalance loadbalance) {
  2. List<Invoker<T>> invoked = new ArrayList<Invoker<T>>(copyinvokers.size());
  3. Set<String> providers = new HashSet<String>(len);
  4. for (int i = 0; i < len; i++) {//5
  5. if (i > 0) {
  6. checkWhetherDestroyed();
  7. copyinvokers = list(invocation);
  8. checkInvokers(copyinvokers, invocation);
  9. }
  10. Invoker<T> invoker = select(loadbalance, invocation, copyinvokers, invoked);//1
  11. invoked.add(invoker);//2
  12. RpcContext.getContext().setInvokers((List) invoked);
  13. try {
  14. Result result = invoker.invoke(invocation);//3
  15. if (le != null && logger.isWarnEnabled()) {logger.warn
  16. return result;
  17. } catch (RpcException e) {//4
  18. if (e.isBiz()) { throw e;
  19. le = e;
  20. } catch (Throwable e) {
  21. le = new RpcException(e.getMessage(), e);
  22. } finally
  23. providers.add(invoker.getUrl().getAddress());
  24. }
  25. }//for-end
  26. throw new RpcException(le.getCode()//6
  27. }
  • Failfast Cluster (快速失败) 只发起一次调用,失败立即报错。一般对应于非幂等性的写操作。比如新增。

    public Result doInvoke(Invocation invocation, List> invokers, LoadBalance loadbalance) throws RpcException {

    1. checkInvokers(invokers, invocation);
    2. Invoker<T> invoker = select(loadbalance, invocation, invokers, null);
    3. try {
    4. return invoker.invoke(invocation);
    5. } catch (Throwable e) {
    6. if (e instanceof RpcException && ((RpcException) e).isBiz()) { // biz exception.
    7. throw (RpcException) e;
    8. }
    9. throw new RpcException(e instanceof RpcException ? ((RpcException) e).getCode() : 0,
    10. }
    11. }



  • Failback Cluster (失败自动恢复) 失败自动恢复,后台记录失败请求,定时重发。比如消息通知

    protected Result doInvoke(Invocation invocation, List> invokers, LoadBalance loadbalance) throws RpcException {

    1. try {
    2. checkInvokers(invokers, invocation);
    3. Invoker<T> invoker = select(loadbalance, invocation, invokers, null);
    4. return invoker.invoke(invocation);
    5. } catch (Throwable e) {
    6. logger.error(
    7. addFailed(invocation, this);
    8. return new RpcResult(); // ignore
    9. }
    10. }
    11. private void addFailed(Invocation invocation, AbstractClusterInvoker<?> router) {
    12. if (retryFuture == null) {
    13. synchronized (this) {
    14. if (retryFuture == null) {
    15. retryFuture = scheduledExecutorService.scheduleWithFixedDelay(new Runnable() {
    16. @Override
    17. public void run() {
    18. // collect retry statistics
    19. try {
    20. retryFailed();
    21. } catch (Throwable t) { // Defensive fault tolerance
    22. logger.error("Unexpected error occur at collect statistic", t);
    23. }
    24. }
    26. }
    27. }
    28. }
    29. failed.put(invocation, router);
    30. }



  • FailSafe Cluster (失败安全) 失败了,直接忽略。比如写入日志啥的

    public Result doInvoke(Invocation invocation, List> invokers, LoadBalance loadbalance) throws RpcException {

    1. try {
    2. checkInvokers(invokers, invocation);
    3. Invoker<T> invoker = select(loadbalance, invocation, invokers, null);
    4. return invoker.invoke(invocation);
    5. } catch (Throwable e) {
    6. logger.error("Failsafe ignore exception: " + e.getMessage(), e);
    7. return new RpcResult(); // ignore
    8. }
    9. }


  • Forking Cluster (并行调用) 同时调用多个服务器,有一个成功,就返回。用于实时性较高的情况

    public Result doInvoke(final Invocation invocation, List> invokers, LoadBalance loadbalance) throws RpcException {

    1. try {
    2. checkInvokers(invokers, invocation);
    3. final List<Invoker<T>> selected;
    4. final int forks = getUrl().getParameter(Constants.FORKS_KEY, Constants.DEFAULT_FORKS);
    5. final int timeout = getUrl().getParameter(Constants.TIMEOUT_KEY, Constants.DEFAULT_TIMEOUT);
    6. if (forks <= 0 || forks >= invokers.size()) {
    7. selected = invokers;
    8. } else {
    9. selected = new ArrayList<>();
    10. for (int i = 0; i < forks; i++) {
    11. // TODO. Add some comment here, refer chinese version for more details.
    12. Invoker<T> invoker = select(loadbalance, invocation, invokers, selected);
    13. if (!selected.contains(invoker)) {
    14. //Avoid add the same invoker several times.
    15. selected.add(invoker);
    16. }
    17. }
    18. }
    19. RpcContext.getContext().setInvokers((List) selected);
    20. final AtomicInteger count = new AtomicInteger();
    21. final BlockingQueue<Object> ref = new LinkedBlockingQueue<>();
    22. for (final Invoker<T> invoker : selected) {
    23. executor.execute(new Runnable() {
    24. @Override
    25. public void run() {
    26. try {
    27. Result result = invoker.invoke(invocation);
    28. ref.offer(result);
    29. } catch (Throwable e) {
    30. int value = count.incrementAndGet();
    31. if (value >= selected.size()) {
    32. ref.offer(e);
    33. }
    34. }
    35. }
    36. });
    37. }
    38. try {
    39. Object ret = ref.poll(timeout, TimeUnit.MILLISECONDS);
    40. if (ret instanceof Throwable) {
    41. Throwable e = (Throwable) ret;
    42. throw new RpcException(e instanceof RpcException ? ((RpcException) e).getCode() : 0, "Failed to forking invoke provider " + selected + ", but no luck to perform the invocation. Last error is: " + e.getMessage(), e.getCause() != null ? e.getCause() : e);
    43. }
    44. return (Result) ret;
    45. } catch (InterruptedException e) {
    46. throw new RpcException("Failed to forking invoke provider " + selected + ", but no luck to perform the invocation. Last error is: " + e.getMessage(), e);
    47. }
    48. } finally {
    49. // clear attachments which is binding to current thread.
    50. RpcContext.getContext().clearAttachments();
    51. }
    52. }



  • Broadcast Cluster (广播调用) 广播调用所有提供者,琢一调用。任意一台报错,则报错。用于通知服务提供者更新资源啥的。

    public Result doInvoke(final Invocation invocation, List> invokers, LoadBalance loadbalance) throws RpcException {

    1. checkInvokers(invokers, invocation);
    2. RpcContext.getContext().setInvokers((List) invokers);
    3. RpcException exception = null;
    4. Result result = null;
    5. for (Invoker<T> invoker : invokers) {
    6. try {
    7. result = invoker.invoke(invocation);
    8. } catch (RpcException e) {
    9. exception = e;
    10. logger.warn(e.getMessage(), e);
    11. } catch (Throwable e) {
    12. exception = new RpcException(e.getMessage(), e);
    13. logger.warn(e.getMessage(), e);
    14. }
    15. }
    16. if (exception != null) {
    17. throw exception;
    18. }
    19. return result;
    20. }





