dubbo源码之服务暴露

Love The Way You Lie 2022-12-06 12:56 264阅读 0赞

Dubbo 作为rpc 封装的典型框架,我们其实要了解一个rpc调用大致过程,dubbo可以作为一个比较好学习资料。

在这里插入图片描述

这个是dubbo 调用的流程图,这篇文件主要要介绍第一步中的服务暴露过程,这次使用的源码为2.7.1

服务暴露过程

这里先将具体暴露过程贴出来,配合源码进行分析

在这里插入图片描述

我们在看源码的时候需要一个入口,dubbo的入口可以从ServiceBean 开始

  1. public class ServiceBean<T> extends ServiceConfig<T> implements InitializingBean,
  2. DisposableBean, ApplicationContextAware, ApplicationListener<ContextRefreshedEvent>,
  3. BeanNameAware, ApplicationEventPublisherAware {

这里涉及到Spring 相关内容了,主要是实现ApplicationContextAware、InitializingBean等接口。这两个是在spring生命周期中会先后调用,主要进行是一些初始化的操作,我们可以不关心。同时它实现了 ApplicationListener,这个是关于spring的监听器内容。这会在 Spring IOC 容器刷新完成后调用 onApplicationEvent 方法,而这个方法里面做的就是服务暴露,这就是服务暴露的启动点。

ServiceBean 中onApplicationEvent方法

  1. public void onApplicationEvent(ContextRefreshedEvent event) {
  2. //是否没暴露过
  3. if (!this.isExported() && !this.isUnexported()) {
  4. if (logger.isInfoEnabled()) {
  5. logger.info("The service ready on spring started. service: " + this.getInterface());
  6. }
  7. //如果不是延迟暴露,并且没有暴露过,直接调用暴露方法
  8. this.export();
  9. }
  10. }

直接调用,父类暴露方法
在这里插入图片描述
父类的serviceConfig的export方法

  1. public synchronized void export() {
  2. //检查更新配置
  3. this.checkAndUpdateSubConfigs();
  4. //判断是否需要暴露,并且是延迟暴露,如果延迟暴露通过延迟线程池进行暴露
  5. if (this.shouldExport()) {
  6. if (this.shouldDelay()) {
  7. delayExportExecutor.schedule(this::doExport, (long)this.delay, TimeUnit.MILLISECONDS);
  8. } else {
  9. //调用方法直接暴露
  10. this.doExport();
  11. }
  12. }
  13. }

serviceConfig的doExport方法在这里插入图片描述

父类的serviceConfig的doExportUrls方法

  1. private void doExportUrls() {
  2. //1、加载注册信息,构造url
  3. List<URL> registryURLs = this.loadRegistries(true);
  4. Iterator var2 = this.protocols.iterator();
  5. while(var2.hasNext()) {
  6. ProtocolConfig protocolConfig = (ProtocolConfig)var2.next();
  7. String pathKey = URL.buildKey((String)this.getContextPath(protocolConfig).map((p) -> {
  8. return p + "/" + this.path;
  9. }).orElse(this.path), this.group, this.version);
  10. ProviderModel providerModel = new ProviderModel(pathKey, this.ref, this.interfaceClass);
  11. ApplicationModel.initProviderModel(pathKey, providerModel);
  12. //根据 URL 来进行服务暴露了
  13. this.doExportUrlsFor1Protocol(protocolConfig, registryURLs);
  14. }
  15. }
  16. private void doExportUrlsFor1Protocol(ProtocolConfig protocolConfig, List<URL> registryURLs) {
  17. String name = protocolConfig.getName();
  18. if (StringUtils.isEmpty(name)) {
  19. name = "dubbo";
  20. }
  21. //将参数塞进map,构建url
  22. Map<String, String> map = new HashMap();
  23. map.put("side", "provider");
  24. appendRuntimeParameters(map);
  25. appendParameters(map, this.application);
  26. appendParameters(map, this.module);
  27. appendParameters(map, this.provider, "default");
  28. appendParameters(map, protocolConfig);
  29. appendParameters(map, this);
  30. String scope;
  31. Iterator metadataReportService;
  32. if (CollectionUtils.isNotEmpty(this.methods)) {
  33. Iterator var5 = this.methods.iterator();
  34. label161:
  35. while(true) {
  36. MethodConfig method;
  37. List arguments;
  38. do {
  39. if (!var5.hasNext()) {
  40. break label161;
  41. }
  42. method = (MethodConfig)var5.next();
  43. appendParameters(map, method, method.getName());
  44. String retryKey = method.getName() + ".retry";
  45. if (map.containsKey(retryKey)) {
  46. scope = (String)map.remove(retryKey);
  47. if ("false".equals(scope)) {
  48. map.put(method.getName() + ".retries", "0");
  49. }
  50. }
  51. arguments = method.getArguments();
  52. } while(!CollectionUtils.isNotEmpty(arguments));
  53. metadataReportService = arguments.iterator();
  54. //... 省略部分代码(关于一些参数的判断)
  55. }
  56. }
  57. //..省略部分代码
  58. if (ExtensionLoader.getExtensionLoader(ConfiguratorFactory.class).hasExtension(url.getProtocol())) {
  59. url = ((ConfiguratorFactory)ExtensionLoader.getExtensionLoader(ConfiguratorFactory.class).getExtension(url.getProtocol())).getConfigurator(url).configure(url);
  60. }
  61. //获取scope为null,会进入判断进行本地暴露
  62. scope = url.getParameter("scope");
  63. if (!"none".equalsIgnoreCase(scope)) {
  64. if (!"remote".equalsIgnoreCase(scope)) {
  65. //进行本地暴露,默认javassist 代理对象,由于本地暴露和远程暴露流程大致一样,不再展开
  66. this.exportLocal(url);
  67. }
  68. if (!"local".equalsIgnoreCase(scope)) {
  69. if (logger.isInfoEnabled()) {
  70. logger.info("Export dubbo service " + this.interfaceClass.getName() + " to url " + url);
  71. }
  72. if (CollectionUtils.isNotEmpty(registryURLs)) {
  73. metadataReportService = registryURLs.iterator();
  74. while(metadataReportService.hasNext()) {
  75. URL registryURL = (URL)metadataReportService.next();
  76. url = url.addParameterIfAbsent("dynamic", registryURL.getParameter("dynamic"));
  77. URL monitorUrl = this.loadMonitor(registryURL);
  78. if (monitorUrl != null) {
  79. url = url.addParameterAndEncoded("monitor", monitorUrl.toFullString());
  80. }
  81. if (logger.isInfoEnabled()) {
  82. logger.info("Register dubbo service " + this.interfaceClass.getName() + " url " + url + " to registry " + registryURL);
  83. }
  84. String proxy = url.getParameter("proxy");
  85. if (StringUtils.isNotEmpty(proxy)) {
  86. registryURL = registryURL.addParameter("proxy", proxy);
  87. }
  88. //通过proxyFactory 将接口封装为invoker 的代理
  89. Invoker<?> invoker = proxyFactory.getInvoker(this.ref, this.interfaceClass, registryURL.addParameterAndEncoded("export", url.toFullString()));
  90. DelegateProviderMetaDataInvoker wrapperInvoker = new DelegateProviderMetaDataInvoker(invoker, this);
  91. //根据协议对象进行远程暴露
  92. Exporter<?> exporter = protocol.export(wrapperInvoker);
  93. this.exporters.add(exporter);
  94. }
  95. } else {
  96. Invoker<?> invoker = proxyFactory.getInvoker(this.ref, this.interfaceClass, url);
  97. DelegateProviderMetaDataInvoker wrapperInvoker = new DelegateProviderMetaDataInvoker(invoker, this);
  98. Exporter<?> exporter = protocol.export(wrapperInvoker);
  99. this.exporters.add(exporter);
  100. }
  101. metadataReportService = null;
  102. MetadataReportService metadataReportService;
  103. if ((metadataReportService = this.getMetadataReportService()) != null) {
  104. metadataReportService.publishProvider(url);
  105. }
  106. }
  107. }
  108. this.urls.add(url);
  109. }

在这里插入图片描述

远程暴露

这个是远程暴露的大致流程
在这里插入图片描述

由于目前协议是registery,调用registeryProtocol#export 进行暴露

  1. public <T> Exporter<T> export(Invoker<T> originInvoker) throws RpcException {
  2. URL registryUrl = this.getRegistryUrl(originInvoker);
  3. URL providerUrl = this.getProviderUrl(originInvoker);
  4. URL overrideSubscribeUrl = this.getSubscribedOverrideUrl(providerUrl);
  5. RegistryProtocol.OverrideListener overrideSubscribeListener = new RegistryProtocol.OverrideListener(overrideSubscribeUrl, originInvoker);
  6. this.overrideListeners.put(overrideSubscribeUrl, overrideSubscribeListener);
  7. providerUrl = this.overrideUrlWithConfig(providerUrl, overrideSubscribeListener);
  8. RegistryProtocol.ExporterChangeableWrapper<T> exporter = this.doLocalExport(originInvoker, providerUrl);
  9. //获取注册中心的URL
  10. Registry registry = this.getRegistry(originInvoker);
  11. // 获取注册服务提供者的URL
  12. URL registeredProviderUrl = this.getRegisteredProviderUrl(providerUrl, registryUrl);
  13. //将提供者的信息注册到服务提供和消费者注册表中
  14. ProviderInvokerWrapper<T> providerInvokerWrapper = ProviderConsumerRegTable.registerProvider(originInvoker, registryUrl, registeredProviderUrl);
  15. boolean register = registeredProviderUrl.getParameter("register", true);
  16. //如果需要注册
  17. if (register) {
  18. //想注册中心注册服务,这里是向zookeeper注册
  19. this.register(registryUrl, registeredProviderUrl);
  20. providerInvokerWrapper.setReg(true);
  21. }
  22. //获取订阅url
  23. registry.subscribe(overrideSubscribeUrl, overrideSubscribeListener);
  24. exporter.setRegisterUrl(registeredProviderUrl);
  25. exporter.setSubscribeUrl(overrideSubscribeUrl);
  26. return new RegistryProtocol.DestroyableExporter(exporter);
  27. }

调用registeryProtocol#export 之后会再调用DubboProtocol#export进行暴露

  1. public <T> Exporter<T> export(Invoker<T> invoker) throws RpcException {
  2. //获取url
  3. URL url = invoker.getUrl();
  4. //这里构建的时候serverMap的key
  5. String key = serviceKey(url);
  6. DubboExporter<T> exporter = new DubboExporter(invoker, key, this.exporterMap);
  7. this.exporterMap.put(key, exporter);
  8. Boolean isStubSupportEvent = url.getParameter("dubbo.stub.event", false);
  9. Boolean isCallbackservice = url.getParameter("is_callback_service", false);
  10. if (isStubSupportEvent && !isCallbackservice) {
  11. String stubServiceMethods = url.getParameter("dubbo.stub.event.methods");
  12. if (stubServiceMethods != null && stubServiceMethods.length() != 0) {
  13. this.stubServiceMethodsMap.put(url.getServiceKey(), stubServiceMethods);
  14. } else if (this.logger.isWarnEnabled()) {
  15. this.logger.warn(new IllegalStateException("consumer [" + url.getParameter("interface") + "], has set stubproxy support event ,but no stub methods founded."));
  16. }
  17. }
  18. //如果没有server创建server方法
  19. this.openServer(url);
  20. this.optimizeSerialization(url);
  21. return exporter;
  22. }
  23. private void openServer(URL url) {
  24. String key = url.getAddress();
  25. boolean isServer = url.getParameter("isserver", true);
  26. if (isServer) {
  27. ExchangeServer server = (ExchangeServer)this.serverMap.get(key);
  28. if (server == null) {
  29. synchronized(this) {
  30. server = (ExchangeServer)this.serverMap.get(key);
  31. if (server == null) {
  32. //如果构建server为空,则创建server(这个方法是创建Netty监听器,这里不再跟了)
  33. this.serverMap.put(key, this.createServer(url));
  34. }
  35. }
  36. } else {
  37. server.reset(url);
  38. }
  39. }
  40. }

可以看到将provider信息已经注册到zookeeper节点上了
在这里插入图片描述

发表评论

表情:
评论列表 (有 0 条评论,264人围观)

还没有评论,来说两句吧...

相关阅读