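Dubbo Source Code: Service Export
Dubbo is a typical framework that wraps RPC. If we want to understand roughly how an RPC call works, Dubbo makes good study material.
This is the flow diagram of a Dubbo call. This article covers the service export process, which is the first step in that flow; the source code used is version 2.7.1.
The service export process
The concrete export flow is shown here first, and we then analyze it alongside the source code.
When reading source code we need an entry point. For Dubbo, we can start from ServiceBean.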
public class ServiceBean<T> extends ServiceConfig<T> implements InitializingBean,
DisposableBean, ApplicationContextAware, ApplicationListener<ContextRefreshedEvent>,
BeanNameAware, ApplicationEventPublisherAware {
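This is where Spring comes in: ServiceBean implements ApplicationContextAware, InitializingBean, and several other interfaces. Spring calls these two in turn during the bean lifecycle, mainly to perform initialization, so we can ignore them here. ServiceBean also implements ApplicationListener, which is Spring's event listener interface. Once the Spring IoC container has finished refreshing, Spring calls onApplicationEvent, and that method is what performs the service export. This is the starting point of service export.
The onApplicationEvent method in ServiceBean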
public void onApplicationEvent(ContextRefreshedEvent event) {
// Export only if the service has been neither exported nor unexported
if (!this.isExported() && !this.isUnexported()) {
if (logger.isInfoEnabled()) {
logger.info("The service ready on spring started. service: " + this.getInterface());
}
// Not exported yet, so call export(); any configured delay is handled inside export()
this.export();
}
}
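To make the guard above concrete, here is a minimal sketch in plain Java, without Spring or Dubbo on the classpath. The class `ExportOnRefreshSketch` and its methods are hypothetical stand-ins, not Dubbo API; the point is only that a repeated refresh event should not export the same service twice, and that an unexported service should not be exported again.

```java
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical stand-in for ServiceBean; none of these names are Dubbo API.
class ExportOnRefreshSketch {
    private boolean exported;    // set once export() has run
    private boolean unexported;  // set once the service has been withdrawn
    final AtomicInteger exportCalls = new AtomicInteger();

    // Same shape as the guard in onApplicationEvent: export only if neither flag is set.
    void onContextRefreshed() {
        if (!exported && !unexported) {
            export();
        }
    }

    void export() {
        exported = true;
        exportCalls.incrementAndGet();
    }

    void unexport() {
        unexported = true;
    }

    public static void main(String[] args) {
        ExportOnRefreshSketch bean = new ExportOnRefreshSketch();
        bean.onContextRefreshed();
        bean.onContextRefreshed(); // a second refresh event hits the guard
        System.out.println("export calls: " + bean.exportCalls.get());
    }
}
```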
This directly calls the export method of the parent class.
The export method of the parent class ServiceConfig
public synchronized void export() {
// Check and update the configuration
this.checkAndUpdateSubConfigs();
// If the service should be exported and a delay is configured, schedule the export on the delay executor
if (this.shouldExport()) {
if (this.shouldDelay()) {
delayExportExecutor.schedule(this::doExport, (long)this.delay, TimeUnit.MILLISECONDS);
} else {
// Otherwise export right away
this.doExport();
}
}
}
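The delay branch can be tried out in isolation. The sketch below is an assumption-laden model, not Dubbo's code: `DelayedExportSketch`, `isExported`, and `awaitExported` are hypothetical names, and a delay of zero or less is assumed to mean "no delay". It uses the same `ScheduledExecutorService.schedule` call shape as the listing above.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;

// Hypothetical stand-in for ServiceConfig; only the delay decision is modeled.
class DelayedExportSketch {
    private final ScheduledExecutorService delayExecutor =
            Executors.newSingleThreadScheduledExecutor();
    private final CountDownLatch exported = new CountDownLatch(1);
    private final long delayMillis; // assumption: <= 0 means "no delay"

    DelayedExportSketch(long delayMillis) {
        this.delayMillis = delayMillis;
    }

    synchronized void export() {
        if (delayMillis > 0) {
            // Delayed export: hand doExport to the scheduler
            delayExecutor.schedule(this::doExport, delayMillis, TimeUnit.MILLISECONDS);
        } else {
            // Immediate export on the calling thread
            doExport();
        }
    }

    private void doExport() {
        exported.countDown();
    }

    boolean isExported() {
        return exported.getCount() == 0;
    }

    // Waits up to timeoutMillis for the export to happen; false on timeout or interrupt.
    boolean awaitExported(long timeoutMillis) {
        try {
            return exported.await(timeoutMillis, TimeUnit.MILLISECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return false;
        }
    }

    void shutdown() {
        delayExecutor.shutdownNow();
    }

    public static void main(String[] args) {
        DelayedExportSketch service = new DelayedExportSketch(200);
        service.export();
        System.out.println("exported right away: " + service.isExported());
        System.out.println("exported after waiting: " + service.awaitExported(2000));
        service.shutdown();
    }
}
```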
The doExport method of ServiceConfig
The doExportUrls method of the parent class ServiceConfig
private void doExportUrls() {
// 1. Load the registry configuration and build the registry URLs
List<URL> registryURLs = this.loadRegistries(true);
Iterator var2 = this.protocols.iterator();
while(var2.hasNext()) {
ProtocolConfig protocolConfig = (ProtocolConfig)var2.next();
String pathKey = URL.buildKey((String)this.getContextPath(protocolConfig).map((p) -> {
return p + "/" + this.path;
}).orElse(this.path), this.group, this.version);
ProviderModel providerModel = new ProviderModel(pathKey, this.ref, this.interfaceClass);
ApplicationModel.initProviderModel(pathKey, providerModel);
// Export the service for this protocol using the registry URLs
this.doExportUrlsFor1Protocol(protocolConfig, registryURLs);
}
}
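The `pathKey` expression in doExportUrls is dense, so here is a small sketch of what it appears to compute. The key layout `[group/]path[:version]` is my assumption for illustration and `PathKeySketch` is a hypothetical class; check `URL.buildKey` in the Dubbo source for the authoritative format.

```java
import java.util.Optional;

// Hypothetical helper; the key layout is assumed, not copied from Dubbo.
class PathKeySketch {
    // Assumed layout: [group/]path[:version]; null or empty parts are skipped.
    static String buildKey(String path, String group, String version) {
        StringBuilder sb = new StringBuilder();
        if (group != null && !group.isEmpty()) {
            sb.append(group).append('/');
        }
        sb.append(path);
        if (version != null && !version.isEmpty()) {
            sb.append(':').append(version);
        }
        return sb.toString();
    }

    // Same shape as the listing: prefix the path with the context path when one is present.
    static String pathKey(Optional<String> contextPath, String path, String group, String version) {
        String fullPath = contextPath.map(p -> p + "/" + path).orElse(path);
        return buildKey(fullPath, group, version);
    }

    public static void main(String[] args) {
        System.out.println(pathKey(Optional.of("api"), "com.demo.HelloService", "g1", "1.0.0"));
        System.out.println(pathKey(Optional.empty(), "com.demo.HelloService", null, null));
    }
}
```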
private void doExportUrlsFor1Protocol(ProtocolConfig protocolConfig, List<URL> registryURLs) {
String name = protocolConfig.getName();
if (StringUtils.isEmpty(name)) {
name = "dubbo";
}
// Collect the parameters into a map, from which the URL is built
Map<String, String> map = new HashMap();
map.put("side", "provider");
appendRuntimeParameters(map);
appendParameters(map, this.application);
appendParameters(map, this.module);
appendParameters(map, this.provider, "default");
appendParameters(map, protocolConfig);
appendParameters(map, this);
String scope;
Iterator metadataReportService;
if (CollectionUtils.isNotEmpty(this.methods)) {
Iterator var5 = this.methods.iterator();
label161:
while(true) {
MethodConfig method;
List arguments;
do {
if (!var5.hasNext()) {
break label161;
}
method = (MethodConfig)var5.next();
appendParameters(map, method, method.getName());
String retryKey = method.getName() + ".retry";
if (map.containsKey(retryKey)) {
scope = (String)map.remove(retryKey);
if ("false".equals(scope)) {
map.put(method.getName() + ".retries", "0");
}
}
arguments = method.getArguments();
} while(!CollectionUtils.isNotEmpty(arguments));
metadataReportService = arguments.iterator();
// ... some code omitted (checks on the method arguments)
}
}
// ... some code omitted
if (ExtensionLoader.getExtensionLoader(ConfiguratorFactory.class).hasExtension(url.getProtocol())) {
url = ((ConfiguratorFactory)ExtensionLoader.getExtensionLoader(ConfiguratorFactory.class).getExtension(url.getProtocol())).getConfigurator(url).configure(url);
}
// When scope is not configured it is null, so the checks below pass and local export runs
scope = url.getParameter("scope");
if (!"none".equalsIgnoreCase(scope)) {
if (!"remote".equalsIgnoreCase(scope)) {
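// Local export; the proxy is generated with javassist by default. Local and remote export follow roughly the same flow, so local export is not expanded here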
this.exportLocal(url);
}
if (!"local".equalsIgnoreCase(scope)) {
if (logger.isInfoEnabled()) {
logger.info("Export dubbo service " + this.interfaceClass.getName() + " to url " + url);
}
if (CollectionUtils.isNotEmpty(registryURLs)) {
metadataReportService = registryURLs.iterator();
while(metadataReportService.hasNext()) {
URL registryURL = (URL)metadataReportService.next();
url = url.addParameterIfAbsent("dynamic", registryURL.getParameter("dynamic"));
URL monitorUrl = this.loadMonitor(registryURL);
if (monitorUrl != null) {
url = url.addParameterAndEncoded("monitor", monitorUrl.toFullString());
}
if (logger.isInfoEnabled()) {
logger.info("Register dubbo service " + this.interfaceClass.getName() + " url " + url + " to registry " + registryURL);
}
String proxy = url.getParameter("proxy");
if (StringUtils.isNotEmpty(proxy)) {
registryURL = registryURL.addParameter("proxy", proxy);
}
// Use proxyFactory to wrap the service implementation (ref) and its interface in an Invoker
Invoker<?> invoker = proxyFactory.getInvoker(this.ref, this.interfaceClass, registryURL.addParameterAndEncoded("export", url.toFullString()));
DelegateProviderMetaDataInvoker wrapperInvoker = new DelegateProviderMetaDataInvoker(invoker, this);
// Export remotely through the protocol object
Exporter<?> exporter = protocol.export(wrapperInvoker);
this.exporters.add(exporter);
}
} else {
Invoker<?> invoker = proxyFactory.getInvoker(this.ref, this.interfaceClass, url);
DelegateProviderMetaDataInvoker wrapperInvoker = new DelegateProviderMetaDataInvoker(invoker, this);
Exporter<?> exporter = protocol.export(wrapperInvoker);
this.exporters.add(exporter);
}
MetadataReportService reportService = this.getMetadataReportService();
if (reportService != null) {
reportService.publishProvider(url);
}
}
}
this.urls.add(url);
}
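The nested negations around `scope` are easy to misread, so the sketch below restates just that decision. `ScopeSketch.exportSteps` is a hypothetical helper, not Dubbo API; it returns which export steps the three `if` checks in the listing would let through for a given `scope` value.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical helper that mirrors only the scope checks from the listing.
class ScopeSketch {
    static List<String> exportSteps(String scope) {
        List<String> steps = new ArrayList<>();
        if (!"none".equalsIgnoreCase(scope)) {
            if (!"remote".equalsIgnoreCase(scope)) {
                steps.add("local");   // corresponds to exportLocal(url)
            }
            if (!"local".equalsIgnoreCase(scope)) {
                steps.add("remote");  // corresponds to protocol.export(...)
            }
        }
        return steps;
    }

    public static void main(String[] args) {
        System.out.println("null   -> " + exportSteps(null));
        System.out.println("local  -> " + exportSteps("local"));
        System.out.println("remote -> " + exportSteps("remote"));
        System.out.println("none   -> " + exportSteps("none"));
    }
}
```

So an unset scope results in both a local and a remote export, which matches the comment in the listing.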
Remote export
This is the rough flow of remote export.
Because the URL protocol at this point is registry, RegistryProtocol#export is called to perform the export.
public <T> Exporter<T> export(Invoker<T> originInvoker) throws RpcException {
URL registryUrl = this.getRegistryUrl(originInvoker);
URL providerUrl = this.getProviderUrl(originInvoker);
URL overrideSubscribeUrl = this.getSubscribedOverrideUrl(providerUrl);
RegistryProtocol.OverrideListener overrideSubscribeListener = new RegistryProtocol.OverrideListener(overrideSubscribeUrl, originInvoker);
this.overrideListeners.put(overrideSubscribeUrl, overrideSubscribeListener);
providerUrl = this.overrideUrlWithConfig(providerUrl, overrideSubscribeListener);
RegistryProtocol.ExporterChangeableWrapper<T> exporter = this.doLocalExport(originInvoker, providerUrl);
// Get the Registry instance for the registry URL
Registry registry = this.getRegistry(originInvoker);
// Get the provider URL that will be registered
URL registeredProviderUrl = this.getRegisteredProviderUrl(providerUrl, registryUrl);
// Record the provider in the local provider/consumer registration table
ProviderInvokerWrapper<T> providerInvokerWrapper = ProviderConsumerRegTable.registerProvider(originInvoker, registryUrl, registeredProviderUrl);
boolean register = registeredProviderUrl.getParameter("register", true);
// If registration is required
if (register) {
// Register the service with the registry; here the registry is ZooKeeper
this.register(registryUrl, registeredProviderUrl);
providerInvokerWrapper.setReg(true);
}
// Subscribe to the override URL so that configuration changes reach the listener
registry.subscribe(overrideSubscribeUrl, overrideSubscribeListener);
exporter.setRegisterUrl(registeredProviderUrl);
exporter.setSubscribeUrl(overrideSubscribeUrl);
return new RegistryProtocol.DestroyableExporter(exporter);
}
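Earlier, the provider URL was attached to the registry URL as an encoded `export` parameter, and `getProviderUrl` presumably recovers it from there. The sketch below shows that round trip with plain strings and the JDK's `URLEncoder`/`URLDecoder`. `ExportParamSketch`, `attachExport`, and `extractExport` are hypothetical names, the example addresses are made up, and real Dubbo works on its own `URL` class rather than on strings.

```java
import java.io.UnsupportedEncodingException;
import java.net.URLDecoder;
import java.net.URLEncoder;

// Hypothetical string-based model of nesting one URL inside another as a parameter.
class ExportParamSketch {
    static String attachExport(String registryUrl, String providerUrl) {
        String separator = registryUrl.contains("?") ? "&" : "?";
        return registryUrl + separator + "export=" + encode(providerUrl);
    }

    // Returns the decoded provider URL, or null when there is no export parameter.
    static String extractExport(String registryUrl) {
        int query = registryUrl.indexOf('?');
        if (query < 0) {
            return null;
        }
        for (String pair : registryUrl.substring(query + 1).split("&")) {
            if (pair.startsWith("export=")) {
                return decode(pair.substring("export=".length()));
            }
        }
        return null;
    }

    private static String encode(String value) {
        try {
            return URLEncoder.encode(value, "UTF-8");
        } catch (UnsupportedEncodingException e) {
            throw new IllegalStateException(e);
        }
    }

    private static String decode(String value) {
        try {
            return URLDecoder.decode(value, "UTF-8");
        } catch (UnsupportedEncodingException e) {
            throw new IllegalStateException(e);
        }
    }

    public static void main(String[] args) {
        String provider = "dubbo://10.0.0.1:20880/com.demo.HelloService?side=provider&version=1.0.0";
        String registry = "registry://127.0.0.1:2181/org.apache.dubbo.registry.RegistryService?registry=zookeeper";
        String combined = attachExport(registry, provider);
        System.out.println(combined);
        System.out.println(extractExport(combined));
    }
}
```

Encoding matters here because the provider URL has its own `?` and `&`, which would otherwise be confused with the registry URL's query string.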
RegistryProtocol#export then leads to DubboProtocol#export (in the code above, through doLocalExport), which performs the actual export.
public <T> Exporter<T> export(Invoker<T> invoker) throws RpcException {
// Get the URL
URL url = invoker.getUrl();
// Build the service key, which is used as the key in exporterMap
String key = serviceKey(url);
DubboExporter<T> exporter = new DubboExporter(invoker, key, this.exporterMap);
this.exporterMap.put(key, exporter);
Boolean isStubSupportEvent = url.getParameter("dubbo.stub.event", false);
Boolean isCallbackservice = url.getParameter("is_callback_service", false);
if (isStubSupportEvent && !isCallbackservice) {
String stubServiceMethods = url.getParameter("dubbo.stub.event.methods");
if (stubServiceMethods != null && stubServiceMethods.length() != 0) {
this.stubServiceMethodsMap.put(url.getServiceKey(), stubServiceMethods);
} else if (this.logger.isWarnEnabled()) {
this.logger.warn(new IllegalStateException("consumer [" + url.getParameter("interface") + "], has set stubproxy support event ,but no stub methods founded."));
}
}
// Open a server for this address if none exists yet
this.openServer(url);
this.optimizeSerialization(url);
return exporter;
}
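Why keep an `exporterMap` at all? A plausible reading is that an incoming request carries enough information to rebuild the same key, so the protocol can look up the exporter and call its invoker. The sketch below models that idea only. `ExporterMapSketch` is hypothetical, a `Function<String, String>` stands in for an Invoker, and the key layout `[group/]interface[:version]:port` is an assumption for illustration.

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.function.Function;

// Hypothetical model: a Function stands in for Dubbo's Invoker.
class ExporterMapSketch {
    private final Map<String, Function<String, String>> exporterMap = new ConcurrentHashMap<>();

    // Assumed key layout: [group/]interface[:version]:port
    static String serviceKey(int port, String serviceName, String version, String group) {
        StringBuilder sb = new StringBuilder();
        if (group != null && !group.isEmpty()) {
            sb.append(group).append('/');
        }
        sb.append(serviceName);
        if (version != null && !version.isEmpty()) {
            sb.append(':').append(version);
        }
        return sb.append(':').append(port).toString();
    }

    // Export: remember the invoker under its service key.
    void export(String key, Function<String, String> invoker) {
        exporterMap.put(key, invoker);
    }

    // Request handling: look the invoker up by key and call it.
    String handle(String key, String argument) {
        Function<String, String> invoker = exporterMap.get(key);
        if (invoker == null) {
            throw new IllegalStateException("no exporter for " + key);
        }
        return invoker.apply(argument);
    }

    public static void main(String[] args) {
        ExporterMapSketch protocol = new ExporterMapSketch();
        String key = serviceKey(20880, "com.demo.HelloService", "1.0.0", null);
        protocol.export(key, name -> "hello, " + name);
        System.out.println(key + " -> " + protocol.handle(key, "dubbo"));
    }
}
```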
private void openServer(URL url) {
String key = url.getAddress();
boolean isServer = url.getParameter("isserver", true);
if (isServer) {
ExchangeServer server = (ExchangeServer)this.serverMap.get(key);
if (server == null) {
synchronized(this) {
server = (ExchangeServer)this.serverMap.get(key);
if (server == null) {
// No server exists for this address yet, so create one (createServer sets up the Netty listener; it is not followed further here)
this.serverMap.put(key, this.createServer(url));
}
}
} else {
server.reset(url);
}
}
}
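openServer keys servers by address, so several services exported on the same host and port share one server. Here is a minimal sketch of that check-lock-check pattern, with a hypothetical `FakeServer` in place of `ExchangeServer`; nothing here opens a real socket.

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical model of "one server per address"; FakeServer replaces ExchangeServer.
class ServerRegistrySketch {
    static final class FakeServer {
        final AtomicInteger resets = new AtomicInteger();
    }

    private final Map<String, FakeServer> serverMap = new ConcurrentHashMap<>();
    final AtomicInteger created = new AtomicInteger();

    void openServer(String address) {
        FakeServer server = serverMap.get(address);
        if (server == null) {
            synchronized (this) {
                // Check again under the lock: another thread may have created it meanwhile
                server = serverMap.get(address);
                if (server == null) {
                    serverMap.put(address, createServer());
                }
            }
        } else {
            // A server already listens on this address; only refresh its settings
            server.resets.incrementAndGet();
        }
    }

    private FakeServer createServer() {
        created.incrementAndGet();
        return new FakeServer();
    }

    int resetsOf(String address) {
        return serverMap.get(address).resets.get();
    }

    public static void main(String[] args) {
        ServerRegistrySketch protocol = new ServerRegistrySketch();
        protocol.openServer("10.0.0.1:20880");
        protocol.openServer("10.0.0.1:20880"); // same address: reuse and reset
        protocol.openServer("10.0.0.1:20881"); // new address: new server
        System.out.println("servers created: " + protocol.created.get());
    }
}
```

With a `ConcurrentHashMap`, `computeIfAbsent` would also give at-most-once creation per key; the explicit lock is kept here only to stay close to the listing.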
As you can see, the provider information has now been registered on the ZooKeeper node.