Dubbo集群容错
Dubbo 定义了集群接口 Cluster 以及 Cluster Invoker 来处理集群容错问题。
Cluster
Cluster接口的实现类有FailoverCluster、FailfastCluster、FailsafeCluster等,其作用都是通过join方法生成对应的invoker。
@SPI(FailoverCluster.NAME)
public interface Cluster {
/**
* Merge the directory invokers to a virtual invoker.
*
* @param <T>
* @param directory
* @return cluster invoker
* @throws RpcException
*/
@Adaptive
<T> Invoker<T> join(Directory<T> directory) throws RpcException;
}
AbstractClusterInvoker
AbstractClusterInvoker实现了Invoker接口,同时也是FairoverClusterInvoker、FairfastClusterInvoker、FairsafeClusterInvoker等Cluster Invoker的父类。在服务消费者进行远程调用时,AbstractClusterInvoker 的 invoke 方法会被调用。列举 Invoker、负载均衡、集群容错等操作均会在此阶段被执行。
public abstract class AbstractClusterInvoker<T> implements Invoker<T> {
......
@Override
public Result invoke(final Invocation invocation) throws RpcException {
checkWhetherDestroyed();
// binding attachments into invocation.
Map<String, String> contextAttachments = RpcContext.getContext().getAttachments();
if (contextAttachments != null && contextAttachments.size() != 0) {
((RpcInvocation) invocation).addAttachments(contextAttachments);
}
// 列举 Invoker
List<Invoker<T>> invokers = list(invocation);
// 加载 LoadBalance
LoadBalance loadbalance = initLoadBalance(invokers, invocation);
RpcUtils.attachInvocationIdIfAsync(getUrl(), invocation);
// 调用 doInvoke 进行后续集群容错等操作(抽象方法,由子类实现)
return doInvoke(invocation, invokers, loadbalance);
}
protected abstract Result doInvoke(Invocation invocation, List<Invoker<T>> invokers,
LoadBalance loadbalance) throws RpcException;
......
}
集群容错方式
名称 | 简介 |
Failover | 失败自动切换: 当出现请求失败时,会重试其它服务器。可以通过retries设置重试次数,默认为2次。该方式是dubbo默认的容错机制,适用于读操作或幂等的写操作。 |
Failfast | 快速失败: 当请求失败后,快速返回异常结果,不做任何重试。适用于非幂等接口。 |
Failsafe | 失败安全: 当请求出现异常时,直接忽略异常。适用于佛系调用场景,即不关心调用是否成功,也不想影响外层的调用,例如不重要的日志同步等。 |
Failback | 失败自动恢复: 请求失败后,会自动记录在失败队列中,并由一个定时线程池定时重试。适用于一些异步请求或最终一致性的请求。 |
Forking | 并行调用: 同时调用多个相同的服务,只要有一个返回,则立即返回结果,可通过forks设置并行数。适用于某些对实时性要求极高的调用上,但也会浪费更多的资源。 |
Broadcast | 广播调用: 广播调用所有可用的服务,任意一个节点报错则报错。适用于服务测试。 |
Available | 最简单的调用: 请求不会做负载均衡,遍历所有服务列表,找到第一个可用的节点,直接请求并返回,如果没有可用节点则抛出异常。 |
FailoverClusterInvoker.doInvoker
FailoverClusterInvoker是默认的Cluster Invoker,在doInvoker方法中实现集群容错。
首先是获取重试次数,然后根据重试次数进行循环调用,失败后进行重试。在 for 循环内,首先是通过负载均衡组件选择一个 Invoker,然后再通过这个 Invoker 的 invoke 方法进行远程调用。如果失败了,记录下异常,并进行重试。重试时会再次调用父类的 list 方法列举 Invoker。
@Override
@SuppressWarnings({"unchecked", "rawtypes"})
public Result doInvoke(Invocation invocation, final List<Invoker<T>> invokers, LoadBalance loadbalance) throws RpcException {
List<Invoker<T>> copyInvokers = invokers;
checkInvokers(copyInvokers, invocation);
String methodName = RpcUtils.getMethodName(invocation);
// 获取重试次数 DEFAULT_RETRIES 默认 2
int len = getUrl().getMethodParameter(methodName, RETRIES_KEY, DEFAULT_RETRIES) + 1;
if (len <= 0) {
len = 1;
}
// retry loop.
RpcException le = null; // last exception.
List<Invoker<T>> invoked = new ArrayList<Invoker<T>>(copyInvokers.size()); // invoked invokers.
Set<String> providers = new HashSet<String>(len);
// 循环调用,失败重试
for (int i = 0; i < len; i++) {
//Reselect before retry to avoid a change of candidate `invokers`.
//NOTE: if `invokers` changed, then `invoked` also lose accuracy.
if (i > 0) {
checkWhetherDestroyed();
// 在进行重试前重新列举 Invoker,这样做的好处是,如果某个服务挂了,
// 通过调用 list 可得到最新可用的 Invoker 列表
copyInvokers = list(invocation);
// check again
checkInvokers(copyInvokers, invocation);
}
// 通过负载均衡选择 Invoker
Invoker<T> invoker = select(loadbalance, invocation, copyInvokers, invoked);
// 添加到 invoker 到 invoked 列表中
invoked.add(invoker);
// 设置 invoked 到 RPC 上下文中
RpcContext.getContext().setInvokers((List) invoked);
try {
// 调用目标 Invoker 的 invoke 方法
Result result = invoker.invoke(invocation);
if (le != null && logger.isWarnEnabled()) {
logger.warn("Although retry the method " + methodName
+ " in the service " + getInterface().getName()
+ " was successful by the provider " + invoker.getUrl().getAddress()
+ ", but there have been failed providers " + providers
+ " (" + providers.size() + "/" + copyInvokers.size()
+ ") from the registry " + directory.getUrl().getAddress()
+ " on the consumer " + NetUtils.getLocalHost()
+ " using the dubbo version " + Version.getVersion() + ". Last error is: "
+ le.getMessage(), le);
}
return result;
} catch (RpcException e) {
if (e.isBiz()) { // biz exception.
throw e;
}
le = e;
} catch (Throwable e) {
le = new RpcException(e.getMessage(), e);
} finally {
providers.add(invoker.getUrl().getAddress());
}
}
// 若重试失败,则抛出异常
throw new RpcException(le.getCode(), "Failed to invoke the method "
+ methodName + " in the service " + getInterface().getName()
+ ". Tried " + len + " times of the providers " + providers
+ " (" + providers.size() + "/" + copyInvokers.size()
+ ") from the registry " + directory.getUrl().getAddress()
+ " on the consumer " + NetUtils.getLocalHost() + " using the dubbo version "
+ Version.getVersion() + ". Last error is: "
+ le.getMessage(), le.getCause() != null ? le.getCause() : le);
}
还没有评论,来说两句吧...