dubbo源码之服务调用过程
前言
前面有介绍服务暴露和服务引入两个流程,而这两个流程就是为了服务的调用。由前面两篇可以知道我们具体调用信息已经被封装到invoker 里面。今天主要是介绍dubbo在调用服务的时候,如何获取到封装好invoker,对服务进行调用。
这篇文章和前面三篇一样,基于dubbo 2.7.1、 zookeeper为注册中心、采用dubbo 协议。
调用流程-消费端源码分析
由前面服务引入可以知道调用接口,是dubbo controller 进行属性赋值的时候注入代理对象。
可以看到注入代理是来自InvokerInvocationHandler 的代理对象,InvokerInvocationHandler 里面封装了MockClusterInvoker 对象。
可以看到前面主要是获取一些参数,就由MockClusterInvoker 对象进行具体调用了
public Result invoke(Invocation invocation) throws RpcException {
Result result = null;
String value = this.directory.getUrl().getMethodParameter(invocation.getMethodName(), "mock", Boolean.FALSE.toString()).trim();
//查看是否配置mock参数
if (value.length() != 0 && !value.equalsIgnoreCase("false")) {
//如果配置强制mock调用
if (value.startsWith("force")) {
if (logger.isWarnEnabled()) {
logger.warn("force-mock: " + invocation.getMethodName() + " force-mock enabled , url : " + this.directory.getUrl());
}
result = this.doMockInvoke(invocation, (RpcException)null);
} else {
try {
result = this.invoker.invoke(invocation);
} catch (RpcException var5) {
if (var5.isBiz()) {
throw var5;
}
if (logger.isWarnEnabled()) {
logger.warn("fail-mock: " + invocation.getMethodName() + " fail-mock enabled , url : " + this.directory.getUrl(), var5);
}
result = this.doMockInvoke(invocation, var5);
}
}
} else {
//如果没有配置,正常调用
result = this.invoker.invoke(invocation);
}
return result;
}
mock 的话就不展开分析了,我们来看看 this.invoker.invoke 的实现,实际上会调用 AbstractClusterInvoker#invoker
public Result invoke(Invocation invocation) throws RpcException {
//检查是否被销毁
this.checkWhetherDestroyed();
//查看是否有附件,如果有绑定到invocation 上面
Map<String, String> contextAttachments = RpcContext.getContext().getAttachments();
if (contextAttachments != null && contextAttachments.size() != 0) {
((RpcInvocation)invocation).addAttachments(contextAttachments);
}
//里面调用的是this.directory.list(invocation); 里面做路由过滤
List<Invoker<T>> invokers = this.list(invocation);
//过滤完初始化负载均衡器
LoadBalance loadbalance = this.initLoadBalance(invokers, invocation);
RpcUtils.attachInvocationIdIfAsync(this.getUrl(), invocation);
//默认调用子类FailoverClusterInvoker #doInvoker
return this.doInvoke(invocation, invokers, loadbalance);
}
//初始化负载均衡器
protected LoadBalance initLoadBalance(List<Invoker<T>> invokers, Invocation invocation) {
//可以看到getMethodParameter(RpcUtils.getMethodName(invocation), "loadbalance", "random")
//如果获取loadbalance为空则设置默认,随机均衡器
return CollectionUtils.isNotEmpty(invokers) ? (LoadBalance)ExtensionLoader.getExtensionLoader(LoadBalance.class).getExtension(((Invoker)invokers.get(0)).getUrl().getMethodParameter(RpcUtils.getMethodName(invocation), "loadbalance", "random")) : (LoadBalance)ExtensionLoader.getExtensionLoader(LoadBalance.class).getExtension("random");
}
FailoverClusterInvoker #doInvoker 的调用方法,在这个方法中进行重试容错处理
public Result doInvoke(Invocation invocation, List<Invoker<T>> invokers, LoadBalance loadbalance) throws RpcException {
List<Invoker<T>> copyInvokers = invokers;
this.checkInvokers(invokers, invocation);
String methodName = RpcUtils.getMethodName(invocation);
//可以看到如果调用失败默认重试2+1次
int len = this.getUrl().getMethodParameter(methodName, "retries", 2) + 1;
if (len <= 0) {
len = 1;
}
RpcException le = null;
List<Invoker<T>> invoked = new ArrayList(invokers.size());
Set<String> providers = new HashSet(len);
for(int i = 0; i < len; ++i) {
//重试时候调用
if (i > 0) {
this.checkWhetherDestroyed();
copyInvokers = this.list(invocation);
this.checkInvokers(copyInvokers, invocation);
}
//根据上一个类AbstractClusterInvoker 中获取到负载均衡方式,获取到一个合适invoker
Invoker<T> invoker = this.select(loadbalance, invocation, copyInvokers, invoked);
invoked.add(invoker);
//保存上下文调用过信息,下次重试过滤用
RpcContext.getContext().setInvokers(invoked);
try {
//发起调用
Result result = invoker.invoke(invocation);
if (le != null && logger.isWarnEnabled()) {
logger.warn("Although retry the method " + methodName + " in the service " + this.getInterface().getName() + " was successful by the provider " + invoker.getUrl().getAddress() + ", but there have been failed providers " + providers + " (" + providers.size() + "/" + copyInvokers.size() + ") from the registry " + this.directory.getUrl().getAddress() + " on the consumer " + NetUtils.getLocalHost() + " using the dubbo version " + Version.getVersion() + ". Last error is: " + le.getMessage(), le);
}
Result var13 = result;
return var13;
} catch (RpcException var18) {
if (var18.isBiz()) {
throw var18;
}
le = var18;
} catch (Throwable var19) {
le = new RpcException(var19.getMessage(), var19);
} finally {
providers.add(invoker.getUrl().getAddress());
}
}
throw new RpcException(le.getCode(), "Failed to invoke the method " + methodName + " in the service " + this.getInterface().getName() + ". Tried " + len + " times of the providers " + providers + " (" + providers.size() + "/" + copyInvokers.size() + ") from the registry " + this.directory.getUrl().getAddress() + " on the consumer " + NetUtils.getLocalHost() + " using the dubbo version " + Version.getVersion() + ". Last error is: " + le.getMessage(), (Throwable)(le.getCause() != null ? le.getCause() : le));
}
}
invoker.invoke(invocation);过程中会经过挺多过滤器、适配器,我们直接跳过看AbstractInvoker的 invoke方法。这个是跳过过滤器类。
AbstractInvoker调用this.doInvoke(invocation);的子类DubboInvoker#
doInvoke方法
protected Result doInvoke(Invocation invocation) throws Throwable {
RpcInvocation inv = (RpcInvocation)invocation;
String methodName = RpcUtils.getMethodName(invocation);
//设置路径到附件中
inv.setAttachment("path", this.getUrl().getPath());
//设置版本到附件中
inv.setAttachment("version", this.version);
ExchangeClient currentClient;
//选择需要通信的netty 客户端
if (this.clients.length == 1) {
currentClient = this.clients[0];
} else {
currentClient = this.clients[this.index.getAndIncrement() % this.clients.length];
}
try {
//是否是异步调用
boolean isAsync = RpcUtils.isAsync(this.getUrl(), invocation);
//是否是异步调用需要获取返回值
boolean isAsyncFuture = RpcUtils.isReturnTypeFuture(inv);
//是否是oneway 方式发送
boolean isOneway = RpcUtils.isOneway(this.getUrl(), invocation);
//获取配置中的超时减价
int timeout = this.getUrl().getMethodParameter(methodName, "timeout", 1000);
//oneway 不需要获取返回值
if (isOneway) {
boolean isSent = this.getUrl().getMethodParameter(methodName, "sent", false);
currentClient.send(inv, isSent);
RpcContext.getContext().setFuture((Future)null);
return new RpcResult();
//异步
} else if (isAsync) {
ResponseFuture future = currentClient.request(inv, timeout);
FutureAdapter<Object> futureAdapter = new FutureAdapter(future);
RpcContext.getContext().setFuture(futureAdapter);
Object result;
if (isAsyncFuture) {
result = new AsyncRpcResult(futureAdapter, futureAdapter.getResultFuture(), false);
} else {
result = new SimpleAsyncRpcResult(futureAdapter, futureAdapter.getResultFuture(), false);
}
return (Result)result;
} else {
RpcContext.getContext().setFuture((Future)null);
return (Result)currentClient.request(inv, timeout).get();
}
} catch (TimeoutException var12) {
throw new RpcException(2, "Invoke remote method timeout. method: " + invocation.getMethodName() + ", provider: " + this.getUrl() + ", cause: " + var12.getMessage(), var12);
} catch (RemotingException var13) {
throw new RpcException(1, "Failed to invoke remote method: " + invocation.getMethodName() + ", provider: " + this.getUrl() + ", cause: " + var13.getMessage(), var13);
}
}
oneway,就是当你不关心你的请求是否发送成功的情况下,就用 oneway 的方式发送,这种方式消耗最小,啥都不用记,啥都不用管。
异步调用,其实 Dubbo 天然就是异步的,可以看到 client 发送请求之后会得到一个 ResponseFuture,然后把 future 包装一下塞到上下文中,这样用户就可以从上下文中拿到这个 future,然后用户可以做了一波操作之后再调用 future.get 等待结果。
同步调用,这是我们最常用的,也就是 Dubbo 框架帮助我们异步转同步了,从代码可以看到在 Dubbo 源码中就调用了 future.get,所以给用户的感觉就是我调用了这个接口的方法之后就阻塞住了,必须要等待结果到了之后才能返回,所以就是同步的。
到这里就将dubbo 消息发送给服务提供者了
具体流程
首先客户端调用接口的某个方法,实际调用的是InvokerInvocationHandler 代理类,代理类会通过 cluster 从 directory 中获取 invokers列表,然后进行 router 的过滤,然后再通过 SPI 得到 loadBalance 进行一波负载均衡。
默认的 cluster 是 FailoverCluster ,根据获取到负载均衡器选择invoker 进行调用,如果调用失败进行容错重试处理,
在调用过程中,根据设置具体的协议构造请求头,然后将参数根据具体的序列化协议序列化之后构造塞入请求体中,再通过 NettyClient 发起远程调用。
还没有评论,来说两句吧...