Davids原理探究:Dubbo服务消费原理

快来打我* 2023-02-20 07:40 133阅读 0赞

文章目录

    • Dubbo服务消费原理
      • 服务消费核心类ReferenceConfig
      • 从注册中心消费
      • 创建具体远程Invoker
      • 多注册中心消费原理
      • 直连服务消费原理

Dubbo服务消费原理

在这里插入图片描述

关注可以查看更多粉丝专享blog~

前面已经讲过Dubbo服务暴露的原理,今天分析一下Dubbo服务消费原理,相比于服务暴露原理,服务消费就有点像把暴露原理倒过来,可以看一下消费原理图。跟服务暴露的流程很相似,只是服务暴露的核心类是ServiceConfig,服务消费的核心类是ReferenceConfig,从名字上也可以看出来。

服务暴露流程:ServiceConfig --> ProxyFactory --> Invoker --> Protocol --> Exporter
服务消费流程:ReferenceConfig --> Protocol --> Invoker --> ProxyFactory --> ref

一个是把本地的推倒远端,一个是把远端的代理到本地,像本地调用一样的进行远程调用,具体是否调用本地或者远端,对于我们来说是透明的,Dubbo已经帮我们封装好了。

Dubbo服务消费机制

整体上看,Dubbo框架实现服务消费也分为两大部分:

  1. 通过持有远程对象实例生成Invoker,这个Invoker在客户端是核心的远程代理对象。
  2. 把Invoker通过动态代理转换成实现用户接口的动态代理引用。

这里Invoker承载了网络连接、服务调用和重试等功能,在客户端,它可能是一个远程的实现,也可能是一个集群的实现。
框架真正进行服务引用的入口点在ReferenceBean#getObject,不管是XML还是注解,都会转换成ReferenceBean,它继承自ReferenceConfig。

服务消费核心类ReferenceConfig

  1. 优先判断是否处于同一个JVM里面,默认场景下Dubbo会找出内存中的injvm协议的服务(服务暴露时会注册一份到injvm,将服务实例放到内存map中)直接获取实例调用。
  2. 在注册中心追加消费者元数据信息,应用启动时订阅注册中心、服务提供者参数等合并时会用到这部分信息。
  3. 处理只有一个注册中心的场景,这种场景在客户端中是最常见的,客户端启动拉取服务元数据,订阅provider、路由和配置变更。
  4. 处理多注册中心的场景。逐个获取注册中心的服务,并添加到invokers列表中,后面通过Cluster将多个Invoker转换成一个Invoker。

    // ReferenceConfig#createProxy
    private T createProxy(Map map) {

    1. // 检查是否是同一个JVM的内部引用
    2. if (shouldJvmRefer(map)) {
    3. // 构造injvm url
    4. URL url = new URL(LOCAL_PROTOCOL, LOCALHOST_VALUE, 0, interfaceClass.getName()).addParameters(map);
    5. // 直接使用injvm协议从内存中获取实例
    6. invoker = REF_PROTOCOL.refer(interfaceClass, url);
    7. if (logger.isInfoEnabled()) {
    8. logger.info("Using injvm service " + interfaceClass.getName());
    9. }
    10. } else {
    11. urls.clear();
    12. // 用户指定的URL,可以是点对点地址,或注册中心地址
    13. if (url != null && url.length() > 0) {
    14. String[] us = SEMICOLON_SPLIT_PATTERN.split(url);
    15. if (us != null && us.length > 0) {
    16. for (String u : us) {
    17. URL url = URL.valueOf(u);
    18. if (StringUtils.isEmpty(url.getPath())) {
    19. url = url.setPath(interfaceName);
    20. }
    21. if (UrlUtils.isRegistry(url)) {
    22. // 注册中心地址后面添加refer存储服务消费元数据信息
    23. urls.add(url.addParameterAndEncoded(REFER_KEY, StringUtils.toQueryString(map)));
    24. } else {
    25. // 直连某台服务提供者
    26. urls.add(ClusterUtils.mergeUrl(url, map));
    27. }
    28. }
    29. }
    30. } else {
    31. // 如果协议是不injvm,则从注册中心的配置组装URL
    32. if (!LOCAL_PROTOCOL.equalsIgnoreCase(getProtocol())) {
    33. checkRegistry();
    34. List<URL> us = ConfigValidationUtils.loadRegistries(this, false);
    35. if (CollectionUtils.isNotEmpty(us)) {
    36. for (URL u : us) {
    37. URL monitorUrl = ConfigValidationUtils.loadMonitor(this, u);
    38. if (monitorUrl != null) {
    39. map.put(MONITOR_KEY, URL.encode(monitorUrl.toFullString()));
    40. }
    41. // 注册中心地址后面添加refer存储服务消费元数据信息
    42. urls.add(u.addParameterAndEncoded(REFER_KEY, StringUtils.toQueryString(map)));
    43. }
    44. }
    45. // 如果urls为空则报错,既不是injvm,注册中也未获取到对应的服务
    46. if (urls.isEmpty()) {
    47. throw new IllegalStateException("No such any registry to reference " + interfaceName + " on the consumer " + NetUtils.getLocalHost() + " use dubbo version " + Version.getVersion() + ", please config <dubbo:registry address=\"...\" /> to your spring config.");
    48. }
    49. }
    50. }
    51. // 单注册中心消费
    52. if (urls.size() == 1) {
    53. // 使用注册中心对应协议从内存中获取实例
    54. invoker = REF_PROTOCOL.refer(interfaceClass, urls.get(0));
    55. } else {
    56. // 多注册中心消费
    57. List<Invoker<?>> invokers = new ArrayList<Invoker<?>>();
    58. URL registryURL = null;
    59. for (URL url : urls) {
    60. // 遍历注册中心列表,诸葛获取注册中心服务,并添加到invokers列表
    61. invokers.add(REF_PROTOCOL.refer(interfaceClass, url));
    62. if (UrlUtils.isRegistry(url)) {
    63. // 使用最后的注册地址
    64. registryURL = url;
    65. }
    66. }
    67. // registryURL != null,标识注册表url可用
    68. if (registryURL != null) {
    69. // 多注册中心场景, 使用'zone-aware'策略
    70. URL u = registryURL.addParameterIfAbsent(CLUSTER_KEY, ZoneAwareCluster.NAME);
    71. // 使用Cluster将多个Invoker转换成一个Invoker
    72. invoker = CLUSTER.join(new StaticDirectory(u, invokers));
    73. } else {
    74. // 非注册中心url则直接调用
    75. invoker = CLUSTER.join(new StaticDirectory(invokers));
    76. }
    77. }
    78. }
    79. // 如果开启了状态检查,则检查Invoker是否可用
    80. if (shouldCheck() && !invoker.isAvailable()) {
    81. invoker.destroy();
    82. throw new IllegalStateException("Failed to check the status of the service "
    83. + interfaceName
    84. + ". No provider available for the service "
    85. + (group == null ? "" : group + "/")
    86. + interfaceName +
    87. (version == null ? "" : ":" + version)
    88. + " from the url "
    89. + invoker.getUrl()
    90. + " to the consumer "
    91. + NetUtils.getLocalHost() + " use dubbo version " + Version.getVersion());
    92. }
    93. // 把Invoker转换成代理接口
    94. return (T) PROXY_FACTORY.getProxy(invoker, ProtocolUtils.isGeneric(generic));

    }

从注册中心消费

当经过注册中心线消费时,主要通过RegistryProtocol#refer触发数据拉取、订阅和服务Invoker转换等操作,其中最核心的数据结构是RegistryDirectory。这段逻辑主要完成了注册中心实例的创建,元数据注册到注册中心及订阅的功能。

  1. 根据用户指定的注册中心进行协议替换,具体注册中心协议会在启动时用registry储存对应值。
  2. 创建注册中心实例,这里的URL其实是注册中心地址,真实消费方的元数据信息是放在refer属性中储存的。
  3. 提取消费方refer中的元数据信息,如果包含多个分组值则会把调用结果进行合并。
  4. 触发真正的服务订阅和Invoker转换。
  5. RegistryDirectory实现了NotifyListerer接口,服务变更会触发这个类回调notify方法,用于重新引用服务。
  6. 把消费者元数据信息注册到注册中心(如:消费方应用名、IP和端口号等)。
  7. 处理provider、路由和动态配置的订阅。
  8. 通过Cluster合并invokers

    // RegistryProtocol#refer
    @Override
    @SuppressWarnings(“unchecked”)
    public Invoker refer(Class type, URL url) throws RpcException {

    1. // 设置具体的注册中心协议
    2. url = URLBuilder.from(url)
    3. .setProtocol(url.getParameter(REGISTRY_KEY, DEFAULT_REGISTRY))
    4. .removeParameter(REGISTRY_KEY)
    5. .build();
    6. // 创建具体注册中心实例
    7. Registry registry = registryFactory.getRegistry(url);
    8. // 如果引用服务的接口类是RegistryService则直接使用动态代理创建Invoker代理对象
    9. if (RegistryService.class.equals(type)) {
    10. return proxyFactory.getInvoker((T) registry, type, url);
    11. }
    12. // group="a,b" or group="*"
    13. // 根据配置处理多分组结果聚合
    14. Map<String, String> qs = StringUtils.parseQueryString(url.getParameterAndDecoded(REFER_KEY));
    15. String group = qs.get(GROUP_KEY);
    16. if (group != null && group.length() > 0) {
    17. // 当一个接口有多种实现时,使用MergeableCluster对结果集进行合并处理
    18. if ((COMMA_SPLIT_PATTERN.split(group)).length > 1 || "*".equals(group)) {
    19. return doRefer(getMergeableCluster(), registry, type, url);
    20. }
    21. }
    22. // 处理订阅数据并通过Cluster合并多个Invoker
    23. return doRefer(cluster, registry, type, url);

    }

    private Invoker doRefer(Cluster cluster, Registry registry, Class type, URL url) {

    1. // 消费核心关键,持有实际Invoker和接收订阅通知
    2. // RegistryDirectory实现了NotifyListerer接口,服务变更会触发这个类回调notify方法,用于重新引用服务
    3. RegistryDirectory<T> directory = new RegistryDirectory<T>(type, url);
    4. directory.setRegistry(registry);
    5. directory.setProtocol(protocol);
    6. // REFER_KEY的所有属性
    7. Map<String, String> parameters = new HashMap<String, String>(directory.getConsumerUrl().getParameters());
    8. URL subscribeUrl = new URL(CONSUMER_PROTOCOL, parameters.remove(REGISTER_IP_KEY), 0, type.getName(), parameters);
    9. // isShouldRegister = !ANY_VALUE.equals(url.getServiceInterface()) && url.getParameter(REGISTER_KEY, true);
    10. // 如果directory需要注册,则将消费者注册到注册中心
    11. if (directory.isShouldRegister()) {
    12. directory.setRegisteredConsumerUrl(subscribeUrl);
    13. registry.register(directory.getRegisteredConsumerUrl());
    14. }
    15. // 构造路由规则链
    16. directory.buildRouterChain(subscribeUrl);
    17. // 向注册中心发起订阅(服务提供者、路由和动态配置)
    18. directory.subscribe(toSubscribeUrl(subscribeUrl));
    19. // 通过Cluster合并Invokers
    20. Invoker<T> invoker = cluster.join(directory);
    21. // 如果没有相关订阅者则直接返回
    22. List<RegistryProtocolListener> listeners = findRegistryProtocolListeners(url);
    23. if (CollectionUtils.isEmpty(listeners)) {
    24. return invoker;
    25. }
    26. // 如果有订阅者则通知订阅者,该consumer注册事件
    27. RegistryInvokerWrapper<T> registryInvokerWrapper = new RegistryInvokerWrapper<>(directory, cluster, invoker, subscribeUrl);
    28. for (RegistryProtocolListener listener : listeners) {
    29. listener.onRefer(this, registryInvokerWrapper);
    30. }
    31. return registryInvokerWrapper;

    }

创建具体远程Invoker

当第一次发起订阅时会进行一次数据拉取操作,同时触发RegistryDirectory#notify方法,这里的通知数据是某一个类别的全量数据,比如provider和router类别数据。当通知provider数据时,在RegistryDirectory#toInvokers方法内完成Invoker转换。

  1. // RegistryDirectory#toInvokers
  2. private Map<String, Invoker<T>> toInvokers(List<URL> urls) {
  3. Map<String, Invoker<T>> newUrlInvokerMap = new HashMap<>();
  4. if (urls == null || urls.isEmpty()) {
  5. return newUrlInvokerMap;
  6. }
  7. Set<String> keys = new HashSet<>();
  8. String queryProtocols = this.queryMap.get(PROTOCOL_KEY);
  9. for (URL providerUrl : urls) {
  10. // 如果协议在reference端配置,则只选择匹配的协议
  11. if (queryProtocols != null && queryProtocols.length() > 0) {
  12. boolean accept = false;
  13. String[] acceptProtocols = queryProtocols.split(",");
  14. // 根据消费方protocol配置过滤不匹配协议
  15. for (String acceptProtocol : acceptProtocols) {
  16. if (providerUrl.getProtocol().equals(acceptProtocol)) {
  17. accept = true;
  18. break;
  19. }
  20. }
  21. if (!accept) {
  22. continue;
  23. }
  24. }
  25. // empty协议直接跳过
  26. if (EMPTY_PROTOCOL.equals(providerUrl.getProtocol())) {
  27. continue;
  28. }
  29. // 没有对应的协议扩展打印日志并跳过
  30. if (!ExtensionLoader.getExtensionLoader(Protocol.class).hasExtension(providerUrl.getProtocol())) {
  31. logger.error(new IllegalStateException("Unsupported protocol " + providerUrl.getProtocol() +
  32. " in notified url: " + providerUrl + " from registry " + getUrl().getAddress() +
  33. " to consumer " + NetUtils.getLocalHost() + ", supported protocol: " +
  34. ExtensionLoader.getExtensionLoader(Protocol.class).getSupportedExtensions()));
  35. continue;
  36. }
  37. // 合并provider端配置数据(如:服务端IP和port等)
  38. URL url = mergeUrl(providerUrl);
  39. String key = url.toFullString();
  40. // 忽略重复推送列表
  41. if (keys.contains(key)) {
  42. continue;
  43. }
  44. keys.add(key);
  45. // 缓存键是不与用户端参数合并的url,无论用户如何合并参数,如果服务器url更改,则再次引用
  46. Map<String, Invoker<T>> localUrlInvokerMap = this.urlInvokerMap;
  47. Invoker<T> invoker = localUrlInvokerMap == null ? null : localUrlInvokerMap.get(key);
  48. if (invoker == null) {
  49. try {
  50. boolean enabled = true;
  51. if (url.hasParameter(DISABLED_KEY)) {
  52. enabled = !url.getParameter(DISABLED_KEY, false);
  53. } else {
  54. enabled = url.getParameter(ENABLED_KEY, true);
  55. }
  56. if (enabled) {
  57. // 使用具体协议创建远程调用
  58. invoker = new InvokerDelegate<>(protocol.refer(serviceType, url), url, providerUrl);
  59. }
  60. } catch (Throwable t) {
  61. logger.error("Failed to refer invoker for interface:" + serviceType + ",url:(" + url + ")" + t.getMessage(), t);
  62. }
  63. if (invoker != null) {
  64. // 缓存invoker
  65. newUrlInvokerMap.put(key, invoker);
  66. }
  67. } else {
  68. // 缓存invoker
  69. newUrlInvokerMap.put(key, invoker);
  70. }
  71. }
  72. keys.clear();
  73. return newUrlInvokerMap;
  74. }

多注册中心消费原理

在实际使用中,我们更多遇到的是但注册中心场景,但是当跨机房消费时,Dubbo框架允许同时消费多个机房的服务。默认Dubbo消费机房服务的顺序按照配置注册中心的顺序决定,配置靠前优先消费。(多注册中心场景下,默认使用的集群策略是available)
处理逻辑在上方ReferenceConfig#createProxy第57至78行。

直连服务消费原理

Dubbo可以绕过注册中心直接向指定一台或多台服务(直接指定目标IP和端口)发起RPC调用,使用直连模式可以方便在某些场景下使用,比如压测指定机器等。
处理逻辑在上方ReferenceConfig#createProxy第13至31行。

相关文章:
Davids原理探究:Dubbo源码编译(2.7.8)
Davids原理探究:Dubbo SPI和Java SPI实现原理
Davids原理探究:Dubbo注册中心(ZooKeeper、Redis)实现原理
Davids原理探究:Dubbo配置解析原理
Davids原理探究:Dubbo服务暴露原理
Davids原理探究:Dubbo服务消费原理
Davids原理探究:Dubbo优雅停机原理
Davids原理探究:Dubbo调用流程图
Davids原理探究:Dubbo路由实现原理
Davids原理探究:Dubbo负载均衡实现原理
Davids原理探究:Dubbo过滤器原理

发表评论

表情:
评论列表 (有 0 条评论,133人围观)

还没有评论,来说两句吧...

相关阅读