dubbo源码之服务调用过程

阳光穿透心脏的1/2处 2022-12-10 14:43 281阅读 0赞

前言

前面有介绍服务暴露和服务引入两个流程,而这两个流程就是为了服务的调用。由前面两篇可以知道我们具体调用信息已经被封装到invoker 里面。今天主要是介绍dubbo在调用服务的时候,如何获取到封装好invoker,对服务进行调用。

这篇文章和前面三篇一样,基于dubbo 2.7.1、 zookeeper为注册中心、采用dubbo 协议。

调用流程-消费端源码分析

由前面服务引入可以知道调用接口,是dubbo controller 进行属性赋值的时候注入代理对象。
在这里插入图片描述

可以看到注入代理是来自InvokerInvocationHandler 的代理对象,InvokerInvocationHandler 里面封装了MockClusterInvoker 对象。

在这里插入图片描述

可以看到前面主要是获取一些参数,就由MockClusterInvoker 对象进行具体调用了

  1. public Result invoke(Invocation invocation) throws RpcException {
  2. Result result = null;
  3. String value = this.directory.getUrl().getMethodParameter(invocation.getMethodName(), "mock", Boolean.FALSE.toString()).trim();
  4. //查看是否配置mock参数
  5. if (value.length() != 0 && !value.equalsIgnoreCase("false")) {
  6. //如果配置强制mock调用
  7. if (value.startsWith("force")) {
  8. if (logger.isWarnEnabled()) {
  9. logger.warn("force-mock: " + invocation.getMethodName() + " force-mock enabled , url : " + this.directory.getUrl());
  10. }
  11. result = this.doMockInvoke(invocation, (RpcException)null);
  12. } else {
  13. try {
  14. result = this.invoker.invoke(invocation);
  15. } catch (RpcException var5) {
  16. if (var5.isBiz()) {
  17. throw var5;
  18. }
  19. if (logger.isWarnEnabled()) {
  20. logger.warn("fail-mock: " + invocation.getMethodName() + " fail-mock enabled , url : " + this.directory.getUrl(), var5);
  21. }
  22. result = this.doMockInvoke(invocation, var5);
  23. }
  24. }
  25. } else {
  26. //如果没有配置,正常调用
  27. result = this.invoker.invoke(invocation);
  28. }
  29. return result;
  30. }

mock 的话就不展开分析了,我们来看看 this.invoker.invoke 的实现,实际上会调用 AbstractClusterInvoker#invoker

  1. public Result invoke(Invocation invocation) throws RpcException {
  2. //检查是否被销毁
  3. this.checkWhetherDestroyed();
  4. //查看是否有附件,如果有绑定到invocation 上面
  5. Map<String, String> contextAttachments = RpcContext.getContext().getAttachments();
  6. if (contextAttachments != null && contextAttachments.size() != 0) {
  7. ((RpcInvocation)invocation).addAttachments(contextAttachments);
  8. }
  9. //里面调用的是this.directory.list(invocation); 里面做路由过滤
  10. List<Invoker<T>> invokers = this.list(invocation);
  11. //过滤完初始化负载均衡器
  12. LoadBalance loadbalance = this.initLoadBalance(invokers, invocation);
  13. RpcUtils.attachInvocationIdIfAsync(this.getUrl(), invocation);
  14. //默认调用子类FailoverClusterInvoker #doInvoker
  15. return this.doInvoke(invocation, invokers, loadbalance);
  16. }
  17. //初始化负载均衡器
  18. protected LoadBalance initLoadBalance(List<Invoker<T>> invokers, Invocation invocation) {
  19. //可以看到getMethodParameter(RpcUtils.getMethodName(invocation), "loadbalance", "random")
  20. //如果获取loadbalance为空则设置默认,随机均衡器
  21. return CollectionUtils.isNotEmpty(invokers) ? (LoadBalance)ExtensionLoader.getExtensionLoader(LoadBalance.class).getExtension(((Invoker)invokers.get(0)).getUrl().getMethodParameter(RpcUtils.getMethodName(invocation), "loadbalance", "random")) : (LoadBalance)ExtensionLoader.getExtensionLoader(LoadBalance.class).getExtension("random");
  22. }

FailoverClusterInvoker #doInvoker 的调用方法,在这个方法中进行重试容错处理

  1. public Result doInvoke(Invocation invocation, List<Invoker<T>> invokers, LoadBalance loadbalance) throws RpcException {
  2. List<Invoker<T>> copyInvokers = invokers;
  3. this.checkInvokers(invokers, invocation);
  4. String methodName = RpcUtils.getMethodName(invocation);
  5. //可以看到如果调用失败默认重试2+1次
  6. int len = this.getUrl().getMethodParameter(methodName, "retries", 2) + 1;
  7. if (len <= 0) {
  8. len = 1;
  9. }
  10. RpcException le = null;
  11. List<Invoker<T>> invoked = new ArrayList(invokers.size());
  12. Set<String> providers = new HashSet(len);
  13. for(int i = 0; i < len; ++i) {
  14. //重试时候调用
  15. if (i > 0) {
  16. this.checkWhetherDestroyed();
  17. copyInvokers = this.list(invocation);
  18. this.checkInvokers(copyInvokers, invocation);
  19. }
  20. //根据上一个类AbstractClusterInvoker 中获取到负载均衡方式,获取到一个合适invoker
  21. Invoker<T> invoker = this.select(loadbalance, invocation, copyInvokers, invoked);
  22. invoked.add(invoker);
  23. //保存上下文调用过信息,下次重试过滤用
  24. RpcContext.getContext().setInvokers(invoked);
  25. try {
  26. //发起调用
  27. Result result = invoker.invoke(invocation);
  28. if (le != null && logger.isWarnEnabled()) {
  29. logger.warn("Although retry the method " + methodName + " in the service " + this.getInterface().getName() + " was successful by the provider " + invoker.getUrl().getAddress() + ", but there have been failed providers " + providers + " (" + providers.size() + "/" + copyInvokers.size() + ") from the registry " + this.directory.getUrl().getAddress() + " on the consumer " + NetUtils.getLocalHost() + " using the dubbo version " + Version.getVersion() + ". Last error is: " + le.getMessage(), le);
  30. }
  31. Result var13 = result;
  32. return var13;
  33. } catch (RpcException var18) {
  34. if (var18.isBiz()) {
  35. throw var18;
  36. }
  37. le = var18;
  38. } catch (Throwable var19) {
  39. le = new RpcException(var19.getMessage(), var19);
  40. } finally {
  41. providers.add(invoker.getUrl().getAddress());
  42. }
  43. }
  44. throw new RpcException(le.getCode(), "Failed to invoke the method " + methodName + " in the service " + this.getInterface().getName() + ". Tried " + len + " times of the providers " + providers + " (" + providers.size() + "/" + copyInvokers.size() + ") from the registry " + this.directory.getUrl().getAddress() + " on the consumer " + NetUtils.getLocalHost() + " using the dubbo version " + Version.getVersion() + ". Last error is: " + le.getMessage(), (Throwable)(le.getCause() != null ? le.getCause() : le));
  45. }
  46. }

invoker.invoke(invocation);过程中会经过挺多过滤器、适配器,我们直接跳过看AbstractInvoker的 invoke方法。这个是跳过过滤器类。
在这里插入图片描述

AbstractInvoker调用this.doInvoke(invocation);的子类DubboInvoker#
doInvoke方法

  1. protected Result doInvoke(Invocation invocation) throws Throwable {
  2. RpcInvocation inv = (RpcInvocation)invocation;
  3. String methodName = RpcUtils.getMethodName(invocation);
  4. //设置路径到附件中
  5. inv.setAttachment("path", this.getUrl().getPath());
  6. //设置版本到附件中
  7. inv.setAttachment("version", this.version);
  8. ExchangeClient currentClient;
  9. //选择需要通信的netty 客户端
  10. if (this.clients.length == 1) {
  11. currentClient = this.clients[0];
  12. } else {
  13. currentClient = this.clients[this.index.getAndIncrement() % this.clients.length];
  14. }
  15. try {
  16. //是否是异步调用
  17. boolean isAsync = RpcUtils.isAsync(this.getUrl(), invocation);
  18. //是否是异步调用需要获取返回值
  19. boolean isAsyncFuture = RpcUtils.isReturnTypeFuture(inv);
  20. //是否是oneway 方式发送
  21. boolean isOneway = RpcUtils.isOneway(this.getUrl(), invocation);
  22. //获取配置中的超时减价
  23. int timeout = this.getUrl().getMethodParameter(methodName, "timeout", 1000);
  24. //oneway 不需要获取返回值
  25. if (isOneway) {
  26. boolean isSent = this.getUrl().getMethodParameter(methodName, "sent", false);
  27. currentClient.send(inv, isSent);
  28. RpcContext.getContext().setFuture((Future)null);
  29. return new RpcResult();
  30. //异步
  31. } else if (isAsync) {
  32. ResponseFuture future = currentClient.request(inv, timeout);
  33. FutureAdapter<Object> futureAdapter = new FutureAdapter(future);
  34. RpcContext.getContext().setFuture(futureAdapter);
  35. Object result;
  36. if (isAsyncFuture) {
  37. result = new AsyncRpcResult(futureAdapter, futureAdapter.getResultFuture(), false);
  38. } else {
  39. result = new SimpleAsyncRpcResult(futureAdapter, futureAdapter.getResultFuture(), false);
  40. }
  41. return (Result)result;
  42. } else {
  43. RpcContext.getContext().setFuture((Future)null);
  44. return (Result)currentClient.request(inv, timeout).get();
  45. }
  46. } catch (TimeoutException var12) {
  47. throw new RpcException(2, "Invoke remote method timeout. method: " + invocation.getMethodName() + ", provider: " + this.getUrl() + ", cause: " + var12.getMessage(), var12);
  48. } catch (RemotingException var13) {
  49. throw new RpcException(1, "Failed to invoke remote method: " + invocation.getMethodName() + ", provider: " + this.getUrl() + ", cause: " + var13.getMessage(), var13);
  50. }
  51. }

oneway,就是当你不关心你的请求是否发送成功的情况下,就用 oneway 的方式发送,这种方式消耗最小,啥都不用记,啥都不用管。

异步调用,其实 Dubbo 天然就是异步的,可以看到 client 发送请求之后会得到一个 ResponseFuture,然后把 future 包装一下塞到上下文中,这样用户就可以从上下文中拿到这个 future,然后用户可以做了一波操作之后再调用 future.get 等待结果。

同步调用,这是我们最常用的,也就是 Dubbo 框架帮助我们异步转同步了,从代码可以看到在 Dubbo 源码中就调用了 future.get,所以给用户的感觉就是我调用了这个接口的方法之后就阻塞住了,必须要等待结果到了之后才能返回,所以就是同步的。

到这里就将dubbo 消息发送给服务提供者了

具体流程

首先客户端调用接口的某个方法,实际调用的是InvokerInvocationHandler 代理类,代理类会通过 cluster 从 directory 中获取 invokers列表,然后进行 router 的过滤,然后再通过 SPI 得到 loadBalance 进行一波负载均衡。
默认的 cluster 是 FailoverCluster ,根据获取到负载均衡器选择invoker 进行调用,如果调用失败进行容错重试处理,
在调用过程中,根据设置具体的协议构造请求头,然后将参数根据具体的序列化协议序列化之后构造塞入请求体中,再通过 NettyClient 发起远程调用。

发表评论

表情:
评论列表 (有 0 条评论,281人围观)

还没有评论,来说两句吧...

相关阅读