Dubbo Source Code Study -- The Service Publishing Process

偏执的太偏执, 2022-05-26 22:22
This post takes a brief look at what Dubbo does when it publishes a service. The previous post, [Dubbo Source Code Study -- Dubbo and Spring Integration][Dubbo_--Dubbo_Spring], explained how Dubbo integrates with Spring: while the XML is parsed and the beans are initialized, the service is published to the registry and starts serving requests. Below we analyze the publishing process for the following configuration.

    <!-- provider's application name, used for tracing dependency relationship -->
    <dubbo:application name="demo-provider"/>
    <!-- use a ZooKeeper registry center to export service -->
    <dubbo:registry address="zookeeper://127.0.0.1:2181"/>
    <!-- use dubbo protocol to export service on port 20880 -->
    <dubbo:protocol name="dubbo" port="20880"/>
    <!-- service implementation, as same as regular local bean -->
    <bean id="demoService" class="com.alibaba.dubbo.demo.provider.DemoServiceImpl"/>
    <!-- declare the service interface to be exported -->
    <dubbo:service interface="com.alibaba.dubbo.demo.DemoService" ref="demoService"/>

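When the Spring container starts, it parses the configuration above. The dubbo:service element is parsed into a ServiceBean, and ServiceBean is where the publishing process is carried out.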


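Besides inheriting Dubbo's own abstract configuration classes, ServiceBean implements a series of Spring interfaces so that it can take part in container startup and bean creation. Spring instantiates ServiceBean as a singleton. During refresh(), the startup routine of the ApplicationContext, the second-to-last step pre-instantiates the singleton beans; while each bean is initialized, Spring sets its beanName, injects the applicationContext, and calls back afterPropertiesSet of InitializingBean. The last step, finishRefresh, publishes a ContextRefreshedEvent. ServiceBean implements ApplicationListener and listens for this event, and the ServiceBean instantiated in the previous step has already been registered as a listener, so its onApplicationEvent(ApplicationEvent event) method is invoked and calls export to expose the service. Note that afterPropertiesSet itself also calls export() at its end when isDelay() returns false.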
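
The callback order described above can be mimicked without Spring. What follows is a minimal sketch, not Dubbo or Spring code: ToyServiceBean, ToyContext and the delayUntilRefresh flag are hypothetical stand-ins. They only model the order of the two callbacks (afterPropertiesSet first, the refresh event last) and the idea that export runs in exactly one of them.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical stand-in for a bean that exports a service; not the real ServiceBean.
class ToyServiceBean {
    final List<String> log = new ArrayList<>();
    private final boolean delayUntilRefresh; // assumed flag, loosely playing the role of isDelay()
    private boolean exported;

    ToyServiceBean(boolean delayUntilRefresh) {
        this.delayUntilRefresh = delayUntilRefresh;
    }

    // Models InitializingBean.afterPropertiesSet: called while the bean is initialized.
    void afterPropertiesSet() {
        log.add("afterPropertiesSet");
        if (!delayUntilRefresh) {
            export();
        }
    }

    // Models ApplicationListener.onApplicationEvent for the refresh event.
    void onContextRefreshed() {
        log.add("contextRefreshed");
        if (delayUntilRefresh && !exported) {
            export();
        }
    }

    private void export() {
        exported = true;
        log.add("export");
    }
}

// Hypothetical stand-in for the container: initialize all singletons, then fire the refresh event.
class ToyContext {
    static void refresh(List<ToyServiceBean> beans) {
        for (ToyServiceBean bean : beans) {
            bean.afterPropertiesSet();
        }
        for (ToyServiceBean bean : beans) {
            bean.onContextRefreshed();
        }
    }
}
```

With one bean created as new ToyServiceBean(false) and another as new ToyServiceBean(true), ToyContext.refresh exports the first inside afterPropertiesSet and the second only once the refresh event arrives.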

    @SuppressWarnings({"unchecked", "deprecation"})
    public void afterPropertiesSet() throws Exception {
        // <dubbo:provider/>: provider-side defaults, used when an attribute is not set on ProtocolConfig or ServiceConfig. Optional.
        if (getProvider() == null) {
            Map<String, ProviderConfig> providerConfigMap = applicationContext == null ? null : BeanFactoryUtils.beansOfTypeIncludingAncestors(applicationContext, ProviderConfig.class, false, false);
            if (providerConfigMap != null && providerConfigMap.size() > 0) {
                Map<String, ProtocolConfig> protocolConfigMap = applicationContext == null ? null : BeanFactoryUtils.beansOfTypeIncludingAncestors(applicationContext, ProtocolConfig.class, false, false);
                if ((protocolConfigMap == null || protocolConfigMap.size() == 0)
                        && providerConfigMap.size() > 1) { // backward compatibility
                    List<ProviderConfig> providerConfigs = new ArrayList<ProviderConfig>();
                    for (ProviderConfig config : providerConfigMap.values()) {
                        if (config.isDefault() != null && config.isDefault().booleanValue()) {
                            providerConfigs.add(config);
                        }
                    }
                    if (!providerConfigs.isEmpty()) {
                        setProviders(providerConfigs);
                    }
                } else {
                    ProviderConfig providerConfig = null;
                    for (ProviderConfig config : providerConfigMap.values()) {
                        if (config.isDefault() == null || config.isDefault().booleanValue()) {
                            if (providerConfig != null) {
                                throw new IllegalStateException("Duplicate provider configs: " + providerConfig + " and " + config);
                            }
                            providerConfig = config;
                        }
                    }
                    if (providerConfig != null) {
                        setProvider(providerConfig);
                    }
                }
            }
        }
        // <dubbo:application name="demo-provider"/>: look up the ApplicationConfig created from the application tag
        if (getApplication() == null
                && (getProvider() == null || getProvider().getApplication() == null)) {
            Map<String, ApplicationConfig> applicationConfigMap = applicationContext == null ? null : BeanFactoryUtils.beansOfTypeIncludingAncestors(applicationContext, ApplicationConfig.class, false, false);
            if (applicationConfigMap != null && applicationConfigMap.size() > 0) {
                ApplicationConfig applicationConfig = null;
                for (ApplicationConfig config : applicationConfigMap.values()) {
                    if (config.isDefault() == null || config.isDefault().booleanValue()) {
                        if (applicationConfig != null) {
                            throw new IllegalStateException("Duplicate application configs: " + applicationConfig + " and " + config);
                        }
                        applicationConfig = config;
                    }
                }
                if (applicationConfig != null) {
                    setApplication(applicationConfig);
                }
            }
        }
        // <dubbo:module/>: module configuration describing the current module. Optional.
        if (getModule() == null
                && (getProvider() == null || getProvider().getModule() == null)) {
            Map<String, ModuleConfig> moduleConfigMap = applicationContext == null ? null : BeanFactoryUtils.beansOfTypeIncludingAncestors(applicationContext, ModuleConfig.class, false, false);
            if (moduleConfigMap != null && moduleConfigMap.size() > 0) {
                ModuleConfig moduleConfig = null;
                for (ModuleConfig config : moduleConfigMap.values()) {
                    if (config.isDefault() == null || config.isDefault().booleanValue()) {
                        if (moduleConfig != null) {
                            throw new IllegalStateException("Duplicate module configs: " + moduleConfig + " and " + config);
                        }
                        moduleConfig = config;
                    }
                }
                if (moduleConfig != null) {
                    setModule(moduleConfig);
                }
            }
        }
        // <dubbo:registry address="zookeeper://127.0.0.1:2181"/>: registry configuration, i.e. how to connect to the registry
        if ((getRegistries() == null || getRegistries().isEmpty())
                && (getProvider() == null || getProvider().getRegistries() == null || getProvider().getRegistries().isEmpty())
                && (getApplication() == null || getApplication().getRegistries() == null || getApplication().getRegistries().isEmpty())) {
            Map<String, RegistryConfig> registryConfigMap = applicationContext == null ? null : BeanFactoryUtils.beansOfTypeIncludingAncestors(applicationContext, RegistryConfig.class, false, false);
            if (registryConfigMap != null && registryConfigMap.size() > 0) {
                List<RegistryConfig> registryConfigs = new ArrayList<RegistryConfig>();
                for (RegistryConfig config : registryConfigMap.values()) {
                    if (config.isDefault() == null || config.isDefault().booleanValue()) {
                        registryConfigs.add(config);
                    }
                }
                if (registryConfigs != null && !registryConfigs.isEmpty()) {
                    super.setRegistries(registryConfigs);
                }
            }
        }
        // monitor configuration: look up the MonitorConfig bean if none is set yet
        if (getMonitor() == null
                && (getProvider() == null || getProvider().getMonitor() == null)
                && (getApplication() == null || getApplication().getMonitor() == null)) {
            Map<String, MonitorConfig> monitorConfigMap = applicationContext == null ? null : BeanFactoryUtils.beansOfTypeIncludingAncestors(applicationContext, MonitorConfig.class, false, false);
            if (monitorConfigMap != null && monitorConfigMap.size() > 0) {
                MonitorConfig monitorConfig = null;
                for (MonitorConfig config : monitorConfigMap.values()) {
                    if (config.isDefault() == null || config.isDefault().booleanValue()) {
                        if (monitorConfig != null) {
                            throw new IllegalStateException("Duplicate monitor configs: " + monitorConfig + " and " + config);
                        }
                        monitorConfig = config;
                    }
                }
                if (monitorConfig != null) {
                    setMonitor(monitorConfig);
                }
            }
        }
        // <dubbo:protocol name="dubbo" port="20880"/>: protocol configuration of the service provider
        if ((getProtocols() == null || getProtocols().isEmpty())
                && (getProvider() == null || getProvider().getProtocols() == null || getProvider().getProtocols().isEmpty())) {
            Map<String, ProtocolConfig> protocolConfigMap = applicationContext == null ? null : BeanFactoryUtils.beansOfTypeIncludingAncestors(applicationContext, ProtocolConfig.class, false, false);
            if (protocolConfigMap != null && protocolConfigMap.size() > 0) {
                List<ProtocolConfig> protocolConfigs = new ArrayList<ProtocolConfig>();
                for (ProtocolConfig config : protocolConfigMap.values()) {
                    if (config.isDefault() == null || config.isDefault().booleanValue()) {
                        protocolConfigs.add(config);
                    }
                }
                if (protocolConfigs != null && !protocolConfigs.isEmpty()) {
                    super.setProtocols(protocolConfigs);
                }
            }
        }
        // if no path is set, use the bean name as the path when it starts with the interface name
        if (getPath() == null || getPath().length() == 0) {
            if (beanName != null && beanName.length() > 0
                    && getInterface() != null && getInterface().length() > 0
                    && beanName.startsWith(getInterface())) {
                setPath(beanName);
            }
        }
        if (!isDelay()) {
            export(); // export the service
        }
    }
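
Most of afterPropertiesSet repeats one pattern: among all beans of a config type, pick the one whose default flag is either unset or true, and fail if more than one qualifies. The sketch below isolates that pattern under simplifying assumptions. ToyConfig and DefaultConfigPicker are hypothetical names, not Dubbo classes, and the real method inlines this logic separately for each config type.

```java
import java.util.Map;

// Hypothetical minimal config type; only the "default" flag matters here.
class ToyConfig {
    final String name;
    final Boolean isDefault; // null means "not specified" and is treated like true

    ToyConfig(String name, Boolean isDefault) {
        this.name = name;
        this.isDefault = isDefault;
    }
}

class DefaultConfigPicker {
    // Returns the single config whose default flag is null or true, or null if there is none.
    // Throws if more than one candidate qualifies, mirroring the "Duplicate ... configs" checks.
    static ToyConfig pickDefault(Map<String, ToyConfig> configs) {
        ToyConfig chosen = null;
        for (ToyConfig config : configs.values()) {
            if (config.isDefault == null || config.isDefault) {
                if (chosen != null) {
                    throw new IllegalStateException(
                            "Duplicate configs: " + chosen.name + " and " + config.name);
                }
                chosen = config;
            }
        }
        return chosen;
    }
}
```

Registries and protocols are the exception in the original code: there every qualifying bean is collected into a list instead of demanding a single one.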

export() is implemented in the parent class ServiceConfig and supports delaying the export of the service.

    public synchronized void export() {
        if (provider != null) {
            if (export == null) {
                export = provider.getExport();
            }
            if (delay == null) {
                delay = provider.getDelay();
            }
        }
        if (export != null && !export) {
            return;
        }
        // an export delay (in milliseconds) can be configured
        if (delay != null && delay > 0) {
            delayExportExecutor.schedule(new Runnable() {
                public void run() {
                    doExport();
                }
            }, delay, TimeUnit.MILLISECONDS);
        } else {
            doExport();
        }
    }
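
The "export now or after a delay" decision can be tried out in isolation. This is a minimal sketch using only java.util.concurrent; DelayedExporter, its latch and the thread name are hypothetical and stand in for ServiceConfig and its delayExportExecutor.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;

// Hypothetical sketch of "export now or after a delay"; not the real ServiceConfig.
class DelayedExporter {
    private final ScheduledExecutorService scheduler =
            Executors.newSingleThreadScheduledExecutor(r -> {
                Thread t = new Thread(r, "toy-delay-export");
                t.setDaemon(true); // do not keep the JVM alive just for the timer
                return t;
            });

    // Counts down to zero once the (simulated) export has happened.
    final CountDownLatch exported = new CountDownLatch(1);

    // A null or non-positive delay exports on the calling thread; otherwise the export is scheduled.
    void export(Integer delayMillis) {
        if (delayMillis != null && delayMillis > 0) {
            scheduler.schedule(this::doExport, delayMillis, TimeUnit.MILLISECONDS);
        } else {
            doExport();
        }
    }

    private void doExport() {
        exported.countDown();
    }
}
```

A caller that needs to know when a delayed export has finished can wait on the latch, for example with exported.await(5, TimeUnit.SECONDS).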

doExport resolves the provider's protocol, registry, port and related settings, and then doExportUrls() assembles the URLs that describe the service and exports them.

    // assemble the information about the exported interface and export the service
    protected synchronized void doExport() {
        if (unexported) {
            throw new IllegalStateException("Already unexported!");
        }
        if (exported) {
            return;
        }
        exported = true;
        if (interfaceName == null || interfaceName.length() == 0) {
            throw new IllegalStateException("<dubbo:service interface=\"\" /> interface not allow null!");
        }
        checkDefault();
        if (provider != null) {
            if (application == null) {
                application = provider.getApplication();
            }
            if (module == null) {
                module = provider.getModule();
            }
            if (registries == null) {
                registries = provider.getRegistries();
            }
            if (monitor == null) {
                monitor = provider.getMonitor();
            }
            if (protocols == null) {
                protocols = provider.getProtocols();
            }
        }
        if (module != null) {
            if (registries == null) {
                registries = module.getRegistries();
            }
            if (monitor == null) {
                monitor = module.getMonitor();
            }
        }
        if (application != null) {
            if (registries == null) {
                registries = application.getRegistries();
            }
            if (monitor == null) {
                monitor = application.getMonitor();
            }
        }
        if (ref instanceof GenericService) {
            interfaceClass = GenericService.class;
            if (StringUtils.isEmpty(generic)) {
                generic = Boolean.TRUE.toString();
            }
        } else {
            try {
                interfaceClass = Class.forName(interfaceName, true, Thread.currentThread()
                        .getContextClassLoader());
            } catch (ClassNotFoundException e) {
                throw new IllegalStateException(e.getMessage(), e);
            }
            checkInterfaceAndMethods(interfaceClass, methods);
            checkRef();
            generic = Boolean.FALSE.toString();
        }
        if (local != null) {
            if ("true".equals(local)) {
                local = interfaceName + "Local";
            }
            Class<?> localClass;
            try {
                localClass = ClassHelper.forNameWithThreadContextClassLoader(local);
            } catch (ClassNotFoundException e) {
                throw new IllegalStateException(e.getMessage(), e);
            }
            if (!interfaceClass.isAssignableFrom(localClass)) {
                throw new IllegalStateException("The local implementation class " + localClass.getName() + " not implement interface " + interfaceName);
            }
        }
        if (stub != null) {
            if ("true".equals(stub)) {
                stub = interfaceName + "Stub";
            }
            Class<?> stubClass;
            try {
                stubClass = ClassHelper.forNameWithThreadContextClassLoader(stub);
            } catch (ClassNotFoundException e) {
                throw new IllegalStateException(e.getMessage(), e);
            }
            if (!interfaceClass.isAssignableFrom(stubClass)) {
                throw new IllegalStateException("The stub implementation class " + stubClass.getName() + " not implement interface " + interfaceName);
            }
        }
        checkApplication();
        checkRegistry();
        checkProtocol();
        appendProperties(this);
        checkStubAndMock(interfaceClass);
        if (path == null || path.length() == 0) {
            path = interfaceName;
        }
        doExportUrls(); // export the service URLs
        ProviderModel providerModel = new ProviderModel(getUniqueServiceName(), this, ref);
        ApplicationModel.initProviderModel(getUniqueServiceName(), providerModel);
    }
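
The first half of doExport is a fallback chain: a value set on the service wins, otherwise the provider's value is used, then the module's, then the application's. For registries the chain can be sketched as below. ConfigFallback and its methods are hypothetical helpers, and registries are reduced to plain strings for illustration.

```java
import java.util.List;

class ConfigFallback {
    // Returns the first non-null candidate, or null when every candidate is null.
    @SafeVarargs
    static <T> T firstNonNull(T... candidates) {
        for (T candidate : candidates) {
            if (candidate != null) {
                return candidate;
            }
        }
        return null;
    }

    // Resolution order seen for registries in doExport: service, provider, module, application.
    static List<String> resolveRegistries(List<String> service, List<String> provider,
                                          List<String> module, List<String> application) {
        return firstNonNull(service, provider, module, application);
    }
}
```

The chains in doExport are not all the same length: monitor follows the same order as registries, while protocols fall back to the provider only.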

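If the provider exposes the service over several protocols, doExportUrls() exports it once per protocol and registers each export with the registry.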

    private void doExportUrls() {
        List<URL> registryURLs = loadRegistries(true);
        for (ProtocolConfig protocolConfig : protocols) {
            doExportUrlsFor1Protocol(protocolConfig, registryURLs);
        }
    }
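
Each pass through this loop ends with one provider URL built from the protocol name, host, port, path and a parameter map. The following is a rough sketch of that assembly, not Dubbo's URL class: ProviderUrlSketch is a hypothetical helper, it does no URL encoding, and sorting the parameters by key is a choice made here only to keep the output stable.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

class ProviderUrlSketch {
    // Builds protocol://host:port/path?k=v&... with parameters sorted by key.
    static String buildUrl(String protocol, String host, int port, String path,
                           Map<String, String> params) {
        StringBuilder sb = new StringBuilder();
        sb.append(protocol).append("://").append(host).append(':').append(port)
          .append('/').append(path);
        char separator = '?';
        for (Map.Entry<String, String> e : new TreeMap<>(params).entrySet()) {
            sb.append(separator).append(e.getKey()).append('=').append(e.getValue());
            separator = '&';
        }
        return sb.toString();
    }

    // One URL per configured protocol (protocol name -> port), like the loop over protocols.
    static List<String> buildUrls(Map<String, Integer> protocolPorts, String host, String path,
                                  Map<String, String> params) {
        List<String> urls = new ArrayList<>();
        for (Map.Entry<String, Integer> p : protocolPorts.entrySet()) {
            urls.add(buildUrl(p.getKey(), host, p.getValue(), path, params));
        }
        return urls;
    }
}
```

For the demo configuration, calling buildUrl with "dubbo", a host, 20880, "com.alibaba.dubbo.demo.DemoService" and the parameters side=provider and application=demo-provider yields a URL of the form dubbo://host:20880/com.alibaba.dubbo.demo.DemoService?application=demo-provider&side=provider.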

doExportUrlsFor1Protocol assembles the interface, method and argument information, exposes the service through protocol.export, and registers it with the registry.

    // assemble the export information (interface, methods, arguments), then export through protocol.export and register with the registry
    private void doExportUrlsFor1Protocol(ProtocolConfig protocolConfig, List<URL> registryURLs) {
        String name = protocolConfig.getName();
        if (name == null || name.length() == 0) {
            name = "dubbo";
        }
        Map<String, String> map = new HashMap<String, String>();
        map.put(Constants.SIDE_KEY, Constants.PROVIDER_SIDE);
        map.put(Constants.DUBBO_VERSION_KEY, Version.getVersion());
        map.put(Constants.TIMESTAMP_KEY, String.valueOf(System.currentTimeMillis()));
        if (ConfigUtils.getPid() > 0) {
            map.put(Constants.PID_KEY, String.valueOf(ConfigUtils.getPid()));
        }
        appendParameters(map, application);
        appendParameters(map, module);
        appendParameters(map, provider, Constants.DEFAULT_KEY);
        appendParameters(map, protocolConfig);
        appendParameters(map, this);
        if (methods != null && !methods.isEmpty()) {
            for (MethodConfig method : methods) {
                appendParameters(map, method, method.getName());
                String retryKey = method.getName() + ".retry";
                if (map.containsKey(retryKey)) {
                    String retryValue = map.remove(retryKey);
                    if ("false".equals(retryValue)) {
                        map.put(method.getName() + ".retries", "0");
                    }
                }
                List<ArgumentConfig> arguments = method.getArguments();
                if (arguments != null && !arguments.isEmpty()) {
                    for (ArgumentConfig argument : arguments) {
                        // convert argument type
                        if (argument.getType() != null && argument.getType().length() > 0) {
                            Method[] methods = interfaceClass.getMethods();
                            // visit all methods
                            if (methods != null && methods.length > 0) {
                                for (int i = 0; i < methods.length; i++) {
                                    String methodName = methods[i].getName();
                                    // target the method, and get its signature
                                    if (methodName.equals(method.getName())) {
                                        Class<?>[] argtypes = methods[i].getParameterTypes();
                                        // one callback in the method
                                        if (argument.getIndex() != -1) {
                                            if (argtypes[argument.getIndex()].getName().equals(argument.getType())) {
                                                appendParameters(map, argument, method.getName() + "." + argument.getIndex());
                                            } else {
                                                throw new IllegalArgumentException("argument config error : the index attribute and type attribute not match :index :" + argument.getIndex() + ", type:" + argument.getType());
                                            }
                                        } else {
                                            // multiple callbacks in the method
                                            for (int j = 0; j < argtypes.length; j++) {
                                                Class<?> argclazz = argtypes[j];
                                                if (argclazz.getName().equals(argument.getType())) {
                                                    appendParameters(map, argument, method.getName() + "." + j);
                                                    if (argument.getIndex() != -1 && argument.getIndex() != j) {
                                                        throw new IllegalArgumentException("argument config error : the index attribute and type attribute not match :index :" + argument.getIndex() + ", type:" + argument.getType());
                                                    }
                                                }
                                            }
                                        }
                                    }
                                }
                            }
                        } else if (argument.getIndex() != -1) {
                            appendParameters(map, argument, method.getName() + "." + argument.getIndex());
                        } else {
                            throw new IllegalArgumentException("argument config must set index or type attribute.eg: <dubbo:argument index='0' .../> or <dubbo:argument type=xxx .../>");
                        }
                    }
                }
            } // end of methods for
        }
        if (ProtocolUtils.isGeneric(generic)) {
            map.put("generic", generic);
            map.put("methods", Constants.ANY_VALUE);
        } else {
            String revision = Version.getVersion(interfaceClass, version);
            if (revision != null && revision.length() > 0) {
                map.put("revision", revision);
            }
            String[] methods = Wrapper.getWrapper(interfaceClass).getMethodNames();
            if (methods.length == 0) {
                logger.warn("NO method found in service interface " + interfaceClass.getName());
                map.put("methods", Constants.ANY_VALUE);
            } else {
                map.put("methods", StringUtils.join(new HashSet<String>(Arrays.asList(methods)), ","));
            }
        }
        if (!ConfigUtils.isEmpty(token)) {
            if (ConfigUtils.isDefault(token)) {
                map.put("token", UUID.randomUUID().toString());
            } else {
                map.put("token", token);
            }
        }
        if ("injvm".equals(protocolConfig.getName())) {
            protocolConfig.setRegister(false);
            map.put("notify", "false");
        }
        // export service
        String contextPath = protocolConfig.getContextpath();
        if ((contextPath == null || contextPath.length() == 0) && provider != null) {
            contextPath = provider.getContextpath();
        }
        String host = this.findConfigedHosts(protocolConfig, registryURLs, map);
        Integer port = this.findConfigedPorts(protocolConfig, name, map);
        URL url = new URL(name, host, port, (contextPath == null || contextPath.length() == 0 ? "" : contextPath + "/") + path, map);
        if (ExtensionLoader.getExtensionLoader(ConfiguratorFactory.class)
                .hasExtension(url.getProtocol())) {
            url = ExtensionLoader.getExtensionLoader(ConfiguratorFactory.class)
                    .getExtension(url.getProtocol()).getConfigurator(url).configure(url);
        }
        String scope = url.getParameter(Constants.SCOPE_KEY);
        // don't export when none is configured
        if (!Constants.SCOPE_NONE.toString().equalsIgnoreCase(scope)) {
            // export to local if the config is not remote (export to remote only when config is remote)
            if (!Constants.SCOPE_REMOTE.toString().equalsIgnoreCase(scope)) {
                exportLocal(url);
            }
            // export to remote if the config is not local (export to local only when config is local)
            if (!Constants.SCOPE_LOCAL.toString().equalsIgnoreCase(scope)) {
                if (logger.isInfoEnabled()) {
                    logger.info("Export dubbo service " + interfaceClass.getName() + " to url " + url);
                }
                if (registryURLs != null && !registryURLs.isEmpty()) {
                    for (URL registryURL : registryURLs) {
                        url = url.addParameterIfAbsent("dynamic", registryURL.getParameter("dynamic"));
                        URL monitorUrl = loadMonitor(registryURL);
                        if (monitorUrl != null) {
                            url = url.addParameterAndEncoded(Constants.MONITOR_KEY, monitorUrl.toFullString());
                        }
                        if (logger.isInfoEnabled()) {
                            logger.info("Register dubbo service " + interfaceClass.getName() + " url " + url + " to registry " + registryURL);
                        }
                        Invoker<?> invoker = proxyFactory.getInvoker(ref, (Class) interfaceClass, registryURL.addParameterAndEncoded(Constants.EXPORT_KEY, url.toFullString()));
                        DelegateProviderMetaDataInvoker wrapperInvoker = new DelegateProviderMetaDataInvoker(invoker, this);
                        Exporter<?> exporter = protocol.export(wrapperInvoker);
                        exporters.add(exporter);
                    }
                } else {
                    Invoker<?> invoker = proxyFactory.getInvoker(ref, (Class) interfaceClass, url);
                    DelegateProviderMetaDataInvoker wrapperInvoker = new DelegateProviderMetaDataInvoker(invoker, this);
                    Exporter<?> exporter = protocol.export(wrapperInvoker);
                    exporters.add(exporter);
                }
            }
        }
        this.urls.add(url);
    }
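
The scope handling near the end of the method is easy to misread because it is written as two negations. The sketch below restates it as a function from the scope value to the set of export targets. ExportScope is a hypothetical helper, and the literal values "none", "remote" and "local" are assumed to match Constants.SCOPE_NONE, SCOPE_REMOTE and SCOPE_LOCAL.

```java
import java.util.EnumSet;
import java.util.Set;

class ExportScope {
    enum Target { LOCAL, REMOTE }

    // Maps the "scope" URL parameter to the places the service is exported to.
    // An unset (null) or unrecognized scope exports both locally and remotely.
    static Set<Target> targets(String scope) {
        EnumSet<Target> result = EnumSet.noneOf(Target.class);
        if ("none".equalsIgnoreCase(scope)) {
            return result; // export nowhere
        }
        if (!"remote".equalsIgnoreCase(scope)) {
            result.add(Target.LOCAL);
        }
        if (!"local".equalsIgnoreCase(scope)) {
            result.add(Target.REMOTE);
        }
        return result;
    }
}
```

So a service with no scope configured is exported both in-JVM and remotely, which is why exportLocal(url) runs for the demo configuration even though only a ZooKeeper registry is declared.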

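How RegistryProtocol exports a service

The Invoker passed in here was obtained from ProxyFactory using the registry URL.

  1. Get the providerUrl from the Invoker, derive the cacheKey from it, and use the cacheKey to look up the locally cached ExporterChangeableWrapper (an exporter proxy that maps the exporter returned to the caller to the exporter produced by the underlying protocol's export). If one exists, return it.

  2. If none exists, get the providerUrl from the Invoker and build an InvokerDelegete(originInvoker, providerUrl).

  3. Protocol.export(invokerDelegete): based on the protocol in providerUrl (usually dubbo), export the service through the Protocol adaptive class and obtain an exporter.

  4. Wrap the exporter obtained for providerUrl together with the invoker in an ExporterChangeableWrapper and cache it locally.

  5. Get the registryUrl from the Invoker. Then obtain the Registry from RegistryFactory according to the registry protocol in the registryUrl. Taking ZooKeeper as an example, Dubbo's extension mechanism yields ZookeeperRegistryFactory, which produces a ZookeeperRegistry.

  6. Get the providerUrl from the Invoker and strip the fields that should not be visible in the registry, which gives registryProviderUrl.

  7. Register registryProviderUrl with the registry (ZookeeperRegistry) by calling Registry.register(registryProviderUrl).

  8. Derive overrideSubscribeUrl from registryProviderUrl, then build the OverrideListener.

  9. registry.subscribe(overrideSubscribeUrl, overrideSubscribeListener): the registry subscribes to this URL so that the service can be re-exported when the data changes. Taking ZooKeeper as an example, exporting a service creates a node in ZooKeeper; when that node changes, the notify method of overrideSubscribeListener is triggered and the service is re-exported.

  10. Build and return a new exporter instance.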
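
The steps above can be condensed into a toy model. This is a heavily simplified sketch and none of its types are Dubbo classes: URLs and exporters are plain strings, the registry is a list, the cache key is simply the provider URL, and dropping the "monitor" parameter is just an example of a field hidden from the registry.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

// Hypothetical model of the export flow; URLs and exporters are plain strings.
class ToyRegistryProtocol {
    final Map<String, String> exporterCache = new ConcurrentHashMap<>(); // cacheKey -> exporter
    final List<String> registered = new ArrayList<>(); // URLs written to the "registry"
    final List<String> subscribed = new ArrayList<>(); // URLs watched for override changes
    int localExports = 0;

    String export(String providerUrl) {
        // Steps 1-4: export through the underlying protocol only once per provider URL.
        String exporter = exporterCache.computeIfAbsent(providerUrl, url -> {
            localExports++;
            return "exporter(" + url + ")";
        });
        // Step 6: hide parameters that should not appear in the registry.
        String registryProviderUrl = stripParameter(providerUrl, "monitor");
        // Step 7: register the cleaned URL.
        registered.add(registryProviderUrl);
        // Steps 8-9: subscribe so that later changes can trigger a re-export.
        subscribed.add(registryProviderUrl);
        // Step 10: hand an exporter back to the caller.
        return exporter;
    }

    // Removes one query parameter from a URL string, keeping the others in order.
    static String stripParameter(String url, String key) {
        int q = url.indexOf('?');
        if (q < 0) {
            return url;
        }
        StringBuilder kept = new StringBuilder();
        for (String pair : url.substring(q + 1).split("&")) {
            if (pair.isEmpty() || pair.equals(key) || pair.startsWith(key + "=")) {
                continue;
            }
            kept.append(kept.length() == 0 ? '?' : '&').append(pair);
        }
        return url.substring(0, q) + kept;
    }
}
```

Calling export twice with the same provider URL returns the cached exporter the second time, which is the purpose of the cache in steps 1 and 4.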

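How DubboProtocol exports a service

  1. Get the url, Dubbo's unified data model, from the invoker.

  2. Build the serviceKey from the url (generally composed of port, interface name, version and group). For example, com.alibaba.dubbo.demo.DemoService:20880 consists of the interface name and the port.

  3. Build a DubboExporter and put it into a local map as a cache.

  4. Call openServer with the url: look up the local cache using url.getAddress as the key and create an ExchangeServer if there is none, setting the heartbeat interval and the codec. The server is bound using the url and the ExchangeHandler and then returned (how binding works is covered in a separate post).

  5. Return the DubboExporter object.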
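
Two pieces of this flow lend themselves to a small sketch: building the serviceKey and sharing one server per address. ToyDubboProtocol is hypothetical and uses strings in place of exporters and servers. Only the interface:port form of the key is confirmed by the example above; where exactly group and version go in the key is an assumption of this sketch.

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

// Hypothetical model; exporters and servers are represented by strings.
class ToyDubboProtocol {
    final Map<String, String> exporterMap = new ConcurrentHashMap<>(); // serviceKey -> exporter
    final Map<String, String> serverMap = new ConcurrentHashMap<>();   // host:port -> server
    int serversCreated = 0;

    // Assumed layout: group/interface:version:port, omitting group and version when absent.
    static String serviceKey(int port, String interfaceName, String version, String group) {
        StringBuilder sb = new StringBuilder();
        if (group != null && !group.isEmpty()) {
            sb.append(group).append('/');
        }
        sb.append(interfaceName);
        if (version != null && !version.isEmpty()) {
            sb.append(':').append(version);
        }
        sb.append(':').append(port);
        return sb.toString();
    }

    String export(String host, int port, String interfaceName, String version, String group) {
        // Steps 2-3: build the key and cache the exporter under it.
        String key = serviceKey(port, interfaceName, version, group);
        String exporter = "exporter(" + key + ")";
        exporterMap.put(key, exporter);
        // Step 4: one server per address, shared by every service exported on it.
        serverMap.computeIfAbsent(host + ":" + port, address -> {
            serversCreated++;
            return "server(" + address + ")";
        });
        // Step 5: return the exporter.
        return exporter;
    }
}
```

Exporting two different interfaces on the same host and port adds two entries to exporterMap but creates only one server.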

[Figure: service publishing sequence diagram from the official documentation]

[Figure: service publishing activity diagram]

Reference: https://blog.csdn.net/quhongwei\_zhanqiu/article/details/41651205
