Dubbo源码解析之客户端初始化及服务调用

一时失言乱红尘 2022-04-22 11:02 285阅读 0赞

准备

dubbo 版本:2.5.4

客户端初始化过程

初始化过程

先上时序图,帮助理解客户端初始化过程。
Dubbo客户端初始化

ReferenceBean 是客户端初始化入口,它实现了 InitializingBean 接口,在 bean 初始化过程中会调用其 afterPropertiesSet 方法,进而调用 getObject() -> get() -> init(),之后再调用 ReferenceConfig.createProxy() 方法。

ReferenceConfig

  // Builds the Invoker for the referenced interface and returns the proxy created for it.
  // Three paths: in-JVM reference, user-specified url(s) (direct or registry), or URLs
  // assembled from the registry configuration. Throws IllegalStateException when no registry
  // URL is available or when the availability check fails.
  1. private T createProxy(Map<String, String> map) {
  2. URL tmpUrl = new URL("temp", "localhost", 0, map);
  3. final boolean isJvmRefer;
  4. if (isInjvm() == null) {
  5. if (url != null && url.length() > 0) { // when a URL is specified explicitly, do not use a local (in-JVM) reference
  6. isJvmRefer = false;
  7. } else if (InjvmProtocol.getInjvmProtocol().isInjvmRefer(tmpUrl)) {
  8. // by default, refer to the local service if one is exposed in this JVM
  9. isJvmRefer = true;
  10. } else {
  11. isJvmRefer = false;
  12. }
  13. } else {
  14. isJvmRefer = isInjvm().booleanValue();
  15. }
  16. if (isJvmRefer) {
  17. URL url = new URL(Constants.LOCAL_PROTOCOL, NetUtils.LOCALHOST, 0, interfaceClass.getName()).addParameters(map);
  18. invoker = refprotocol.refer(interfaceClass, url);
  19. if (logger.isInfoEnabled()) {
  20. logger.info("Using injvm service " + interfaceClass.getName());
  21. }
  22. } else {
  23. // user-specified URL: may be a point-to-point direct address or a registry URL
  24. if (url != null && url.length() > 0) {
  25. String[] us = Constants.SEMICOLON_SPLIT_PATTERN.split(url);
  26. if (us != null && us.length > 0) {
  27. for (String u : us) {
  28. URL url = URL.valueOf(u);
  29. if (url.getPath() == null || url.getPath().length() == 0) {
  30. url = url.setPath(interfaceName);
  31. }
  32. if (Constants.REGISTRY_PROTOCOL.equals(url.getProtocol())) {
  33. urls.add(url.addParameterAndEncoded(Constants.REFER_KEY, StringUtils.toQueryString(map)));
  34. } else {
  35. urls.add(ClusterUtils.mergeUrl(url, map));
  36. }
  37. }
  38. }
  39. } else {
  40. // assemble the URLs from the registry configuration
  41. List<URL> us = loadRegistries(false); // load the registry URLs
  42. if (us != null && us.size() > 0) {
  43. for (URL u : us) {
  44. URL monitorUrl = loadMonitor(u);
  45. if (monitorUrl != null) {
  46. map.put(Constants.MONITOR_KEY, URL.encode(monitorUrl.toFullString()));
  47. }
  48. urls.add(u.addParameterAndEncoded(Constants.REFER_KEY, StringUtils.toQueryString(map)));
  49. }
  50. }
  51. if (urls == null || urls.size() == 0) {
  52. throw new IllegalStateException("No such any registry to reference " + interfaceName + " on the consumer " + NetUtils.getLocalHost() + " use dubbo version " + Version.getVersion() + ", please config <dubbo:registry address=\"...\" /> to your spring config.");
  53. }
  54. }
  55. if (urls.size() == 1) {
  56. // refprotocol = ExtensionLoader.getExtensionLoader(Protocol.class).getAdaptiveExtension();
  57. // refprotocol -> Protocol$Adaptive
  58. invoker = refprotocol.refer(interfaceClass, urls.get(0));
  59. } else {
  60. List<Invoker<?>> invokers = new ArrayList<Invoker<?>>();
  61. URL registryURL = null;
  62. for (URL url : urls) {
  63. invokers.add(refprotocol.refer(interfaceClass, url));
  64. if (Constants.REGISTRY_PROTOCOL.equals(url.getProtocol())) {
  65. registryURL = url; // the last registry url wins
  66. }
  67. }
  68. // a registry URL is present
  69. if (registryURL != null) {
  70. // with registries, the Cluster is forced to AvailableCluster
  71. URL u = registryURL.addParameter(Constants.CLUSTER_KEY, AvailableCluster.NAME);
  72. // NOTE(review): the cluster parameter was just set to AvailableCluster.NAME, so this presumably yields MockClusterInvoker(AvailableClusterInvoker) rather than a failover invoker - confirm
  73. invoker = cluster.join(new StaticDirectory(u, invokers));
  74. } else {
  75. invoker = cluster.join(new StaticDirectory(invokers));
  76. }
  77. }
  78. }
  79. Boolean c = check;
  80. if (c == null && consumer != null) {
  81. c = consumer.isCheck();
  82. }
  83. if (c == null) {
  84. c = true; // default true
  85. }
  86. if (c && ! invoker.isAvailable()) {
  87. throw new IllegalStateException("Failed to check the status of the service " + interfaceName + ". No provider available for the service " + (group == null ? "" : group + "/") + interfaceName + (version == null ? "" : ":" + version) + " from the url " + invoker.getUrl() + " to the consumer " + NetUtils.getLocalHost() + " use dubbo version " + Version.getVersion());
  88. }
  89. if (logger.isInfoEnabled()) {
  90. logger.info("Refer dubbo service " + interfaceClass.getName() + " from url " + invoker.getUrl());
  91. }
  92. // create the service proxy
  93. return (T) proxyFactory.getProxy(invoker);
  94. }

Protocol$Adpative

  // Adaptive Protocol implementation as listed by the article: export/refer look up the
  // concrete Protocol extension named by the URL's protocol (falling back to "dubbo" when
  // it is null) and delegate to it; destroy/getDefaultPort are not adaptive and always throw.
  1. public class Protocol$Adpative implements com.alibaba.dubbo.rpc.Protocol {
  2. public void destroy() {
  3. throw new UnsupportedOperationException("method public abstract void com.alibaba.dubbo.rpc.Protocol.destroy() of interface com.alibaba.dubbo.rpc.Protocol is not adaptive method!");
  4. }
  5. public int getDefaultPort() {
  6. throw new UnsupportedOperationException("method public abstract int com.alibaba.dubbo.rpc.Protocol.getDefaultPort() of interface com.alibaba.dubbo.rpc.Protocol is not adaptive method!");
  7. }
  8. public com.alibaba.dubbo.rpc.Exporter export(com.alibaba.dubbo.rpc.Invoker arg0) throws com.alibaba.dubbo.rpc.RpcException {
  9. if (arg0 == null) throw new IllegalArgumentException("com.alibaba.dubbo.rpc.Invoker argument == null");
  10. if (arg0.getUrl() == null)
  11. throw new IllegalArgumentException("com.alibaba.dubbo.rpc.Invoker argument getUrl() == null");
  12. com.alibaba.dubbo.common.URL url = arg0.getUrl();
  13. String extName = (url.getProtocol() == null ? "dubbo" : url.getProtocol());
  14. if (extName == null)
  15. throw new IllegalStateException("Fail to get extension(com.alibaba.dubbo.rpc.Protocol) name from url(" + url.toString() + ") use keys([protocol])");
  16. com.alibaba.dubbo.rpc.Protocol extension = (com.alibaba.dubbo.rpc.Protocol) ExtensionLoader.getExtensionLoader(com.alibaba.dubbo.rpc.Protocol.class).getExtension(extName);
  17. return extension.export(arg0);
  18. }
  19. public com.alibaba.dubbo.rpc.Invoker refer(Class arg0, com.alibaba.dubbo.common.URL arg1) throws com.alibaba.dubbo.rpc.RpcException {
  20. if (arg1 == null) throw new IllegalArgumentException("url == null");
  21. com.alibaba.dubbo.common.URL url = arg1;
  22. String extName = (url.getProtocol() == null ? "dubbo" : url.getProtocol());
  23. if (extName == null)
  24. throw new IllegalStateException("Fail to get extension(com.alibaba.dubbo.rpc.Protocol) name from url(" + url.toString() + ") use keys([protocol])");
  25. // in the article's walkthrough (registry URL): extName -> registry
  26. // and therefore: extension -> RegistryProtocol
  27. com.alibaba.dubbo.rpc.Protocol extension = (com.alibaba.dubbo.rpc.Protocol) ExtensionLoader.getExtensionLoader(com.alibaba.dubbo.rpc.Protocol.class).getExtension(extName);
  28. return extension.refer(arg0, arg1);
  29. }
  30. }

RegistryProtocol

  // Replaces the URL's protocol with the value of its "registry" parameter (default "dubbo"),
  // drops that parameter and obtains the Registry for the resulting URL. When the requested
  // type is RegistryService itself, an invoker over the registry is returned; otherwise the
  // call goes to doRefer, using the mergeable cluster when the "group" entry of the decoded
  // "refer" parameters is "*" or contains more than one comma-separated value.
  1. public <T> Invoker<T> refer(Class<T> type, URL url) throws RpcException {
  2. url = url.setProtocol(url.getParameter("registry", "dubbo")).removeParameter("registry");
  3. Registry registry = this.registryFactory.getRegistry(url);
  4. if (RegistryService.class.equals(type)) {
  5. return this.proxyFactory.getInvoker(registry, type, url);
  6. } else {
  7. Map<String, String> qs = StringUtils.parseQueryString(url.getParameterAndDecoded("refer"));
  8. String group = (String)qs.get("group");
  9. return group == null || group.length() <= 0 || Constants.COMMA_SPLIT_PATTERN.split(group).length <= 1 && !"*".equals(group) ? this.doRefer(this.cluster, registry, type, url) : this.doRefer(this.getMergeableCluster(), registry, type, url);
  10. }
  11. }

RegistryProtocol.doRefer 方法的第一个参数是 Cluster。Cluster 是一个扩展点,其 join 方法上标注了方法级别的 @Adaptive 注解,因此会动态生成自适应适配器(即下文的 Cluster$Adpative)。RegistryProtocol 中存在 Cluster 类型的成员变量及对应的 setter 方法,说明该扩展点是通过 setter 自动注入的。

  // Cluster extension point: the @SPI value "failover" names the default extension and
  // join is marked @Adaptive at method level.
  1. @SPI("failover")
  2. public interface Cluster {
  3. @Adaptive
  4. <T> Invoker<T> join(Directory<T> var1) throws RpcException;
  5. }
  6. // RegistryProtocol: Cluster member and the setter that assigns it
  7. private Cluster cluster;
  8. public void setCluster(Cluster cluster) {
  9. this.cluster = cluster;
  10. }

Cluster$Adpative

  // Adaptive Cluster implementation as listed by the article: join reads the "cluster"
  // parameter of the directory's URL (default "failover"), loads the Cluster extension of
  // that name and delegates to it. Null directory or null URL raise IllegalArgumentException.
  1. public class Cluster$Adpative implements com.alibaba.dubbo.rpc.cluster.Cluster {
  2. public com.alibaba.dubbo.rpc.Invoker join(com.alibaba.dubbo.rpc.cluster.Directory arg0) throws com.alibaba.dubbo.rpc.RpcException {
  3. if (arg0 == null)
  4. throw new IllegalArgumentException("com.alibaba.dubbo.rpc.cluster.Directory argument == null");
  5. if (arg0.getUrl() == null)
  6. throw new IllegalArgumentException("com.alibaba.dubbo.rpc.cluster.Directory argument getUrl() == null");
  7. com.alibaba.dubbo.common.URL url = arg0.getUrl();
  8. String extName = url.getParameter("cluster", "failover");
  9. if (extName == null)
  10. throw new IllegalStateException("Fail to get extension(com.alibaba.dubbo.rpc.cluster.Cluster) name from url(" + url.toString() + ") use keys([cluster])");
  11. com.alibaba.dubbo.rpc.cluster.Cluster extension = (com.alibaba.dubbo.rpc.cluster.Cluster) ExtensionLoader.getExtensionLoader(com.alibaba.dubbo.rpc.cluster.Cluster.class).getExtension(extName);
  12. return extension.join(arg0);
  13. }
  14. }

又因为 Cluster 扩展点的实现类中存在以 Cluster 扩展点作为参数的构造方法(即 Wrapper 类),所以获取到的扩展实例会被该 Wrapper 装饰,而这个装饰器就是 MockClusterWrapper。

  // Cluster decorator: holds the wrapped Cluster passed to its constructor, and join wraps
  // the invoker produced by that Cluster in a MockClusterInvoker.
  1. public class MockClusterWrapper implements Cluster {
  2. private Cluster cluster;
  3. public MockClusterWrapper(Cluster cluster) {
  4. this.cluster = cluster;
  5. }
  6. public <T> Invoker<T> join(Directory<T> directory) throws RpcException {
  7. return new MockClusterInvoker(directory, this.cluster.join(directory));
  8. }
  9. }
  // RegistryProtocol.doRefer: creates a RegistryDirectory for the service, registers the
  // consumer URL (unless the service interface is "*" or "register" is false), subscribes
  // the directory to the registry and joins it into a cluster invoker.
  10. // RegistryProtocol
  11. private <T> Invoker<T> doRefer(Cluster cluster, Registry registry, Class<T> type, URL url) {
  12. RegistryDirectory<T> directory = new RegistryDirectory(type, url);
  13. directory.setRegistry(registry); // registry -> ZookeeperRegistry (in the article's setup)
  14. directory.setProtocol(this.protocol); // protocol -> Protocol$Adaptive
  15. URL subscribeUrl = new URL("consumer", NetUtils.getLocalHost(), 0, type.getName(), directory.getUrl().getParameters());
  16. if (!"*".equals(url.getServiceInterface()) && url.getParameter("register", true)) {
  17. // register the consumer:// URL with the registry
  18. registry.register(subscribeUrl.addParameters(new String[]{ "category", "consumers", "check", String.valueOf(false)}));
  19. }
  20. // subscribe to the providers, configurators and routers categories
  21. directory.subscribe(subscribeUrl.addParameter("category", "providers,configurators,routers"));
  22. // per the article, with the default failover cluster this returns MockClusterInvoker(FailoverClusterInvoker)
  23. return cluster.join(directory);
  24. }

MockClusterWrapper

  // Wraps the invoker produced by the decorated Cluster in a MockClusterInvoker.
  1. public <T> Invoker<T> join(Directory<T> directory) throws RpcException {
  2. return new MockClusterInvoker(directory, this.cluster.join(directory));
  3. }

FailoverCluster

  // Creates a FailoverClusterInvoker over the given directory.
  1. public <T> Invoker<T> join(Directory<T> directory) throws RpcException {
  2. return new FailoverClusterInvoker(directory);
  3. }

回到ReferenceConfig.createProxy中的proxyFactory.getProxy()

  // Abridged excerpt of createProxy ("......" stands for the omitted body): only the final
  // proxy-creation step is shown.
  1. private T createProxy(Map<String, String> map) {
  2. ......
  3. // create the service proxy
  4. // proxyFactory = ExtensionLoader.getExtensionLoader(ProxyFactory.class).getAdaptiveExtension();
  5. // proxyFactory -> ProxyFactory$Adpative
  6. return (T) proxyFactory.getProxy(invoker);
  7. }

ProxyFactory$Adpative

  // Adaptive ProxyFactory implementation as listed by the article (abridged; "......" marks
  // omitted members): getProxy reads the "proxy" parameter of the invoker's URL (default
  // "javassist"), loads the ProxyFactory extension of that name and delegates to it.
  1. public class ProxyFactory$Adpative implements com.alibaba.dubbo.rpc.ProxyFactory {
  2. public java.lang.Object getProxy(com.alibaba.dubbo.rpc.Invoker arg0) throws com.alibaba.dubbo.rpc.RpcException {
  3. if (arg0 == null) throw new IllegalArgumentException("com.alibaba.dubbo.rpc.Invoker argument == null");
  4. if (arg0.getUrl() == null)
  5. throw new IllegalArgumentException("com.alibaba.dubbo.rpc.Invoker argument getUrl() == null");
  6. com.alibaba.dubbo.common.URL url = arg0.getUrl();
  7. String extName = url.getParameter("proxy", "javassist");
  8. if (extName == null)
  9. throw new IllegalStateException("Fail to get extension(com.alibaba.dubbo.rpc.ProxyFactory) name from url(" + url.toString() + ") use keys([proxy])");
  10. // extName -> javassist (the default when no "proxy" parameter is set)
  11. // extension -> StubProxyFactoryWrapper(JavassistProxyFactory)
  12. com.alibaba.dubbo.rpc.ProxyFactory extension = (com.alibaba.dubbo.rpc.ProxyFactory) ExtensionLoader.getExtensionLoader(com.alibaba.dubbo.rpc.ProxyFactory.class).getExtension(extName);
  13. return extension.getProxy(arg0);
  14. }
  15. ......
  16. }

StubProxyFactoryWrapper

  // Creates the proxy through the wrapped proxyFactory. Unless the interface is
  // GenericService, it then reads the "stub" URL parameter (falling back to "local"); when
  // one is set, the stub class is loaded, checked against the service interface and
  // instantiated around the proxy, replacing it. With "dubbo.stub.event" enabled the stub is
  // also exported. Any failure while building the stub is logged, not rethrown, so the
  // method still returns a proxy.
  1. public <T> T getProxy(Invoker<T> invoker) throws RpcException {
  2. T proxy = this.proxyFactory.getProxy(invoker);
  3. if (GenericService.class != invoker.getInterface()) {
  4. String stub = invoker.getUrl().getParameter("stub", invoker.getUrl().getParameter("local"));
  5. if (ConfigUtils.isNotEmpty(stub)) {
  6. Class<?> serviceType = invoker.getInterface();
  7. if (ConfigUtils.isDefault(stub)) {
  // default value: derive the class name as <interface>Stub or <interface>Local
  8. if (invoker.getUrl().hasParameter("stub")) {
  9. stub = serviceType.getName() + "Stub";
  10. } else {
  11. stub = serviceType.getName() + "Local";
  12. }
  13. }
  14. try {
  15. Class<?> stubClass = ReflectUtils.forName(stub);
  16. if (!serviceType.isAssignableFrom(stubClass)) {
  17. throw new IllegalStateException("The stub implemention class " + stubClass.getName() + " not implement interface " + serviceType.getName());
  18. }
  19. try {
  20. Constructor<?> constructor = ReflectUtils.findConstructor(stubClass, serviceType);
  21. proxy = constructor.newInstance(proxy);
  22. URL url = invoker.getUrl();
  23. if (url.getParameter("dubbo.stub.event", false)) {
  24. url = url.addParameter("dubbo.stub.event.methods", StringUtils.join(Wrapper.getWrapper(proxy.getClass()).getDeclaredMethodNames(), ","));
  25. url = url.addParameter("isserver", Boolean.FALSE.toString());
  26. try {
  27. this.export(proxy, invoker.getInterface(), url);
  28. } catch (Exception var9) {
  29. LOGGER.error("export a stub service error.", var9);
  30. }
  31. }
  32. } catch (NoSuchMethodException var10) {
  33. throw new IllegalStateException("No such constructor \"public " + stubClass.getSimpleName() + "(" + serviceType.getName() + ")\" in stub implemention class " + stubClass.getName(), var10);
  34. }
  35. } catch (Throwable var11) {
  // this also catches the IllegalStateExceptions thrown above
  36. LOGGER.error("Failed to create stub implemention class " + stub + " in consumer " + NetUtils.getLocalHost() + " use dubbo version " + Version.getVersion() + ", cause: " + var11.getMessage(), var11);
  37. }
  38. }
  39. }
  40. return proxy;
  41. }

JavassistProxyFactory

  // Builds a proxy for the given interfaces whose calls are handled by an
  // InvokerInvocationHandler wrapping the invoker.
  1. public <T> T getProxy(Invoker<T> invoker, Class<?>[] interfaces) {
  2. return Proxy.getProxy(interfaces).newInstance(new InvokerInvocationHandler(invoker));
  3. }

客户端与服务端创建连接过程

客户端与服务端创建连接时序图。
Dubbo客户端与服务端创建连接过程

RegistryProtocol

  // Creates a RegistryDirectory for the service, registers the consumer URL (skipped when
  // the service interface is the ANY_VALUE wildcard or REGISTER_KEY is false), subscribes
  // the directory to the providers/configurators/routers categories and returns the cluster
  // invoker joined over that directory.
  1. private <T> Invoker<T> doRefer(Cluster cluster, Registry registry, Class<T> type, URL url) {
  2. RegistryDirectory<T> directory = new RegistryDirectory<T>(type, url);
  3. directory.setRegistry(registry);
  4. directory.setProtocol(protocol);
  5. URL subscribeUrl = new URL(Constants.CONSUMER_PROTOCOL, NetUtils.getLocalHost(), 0, type.getName(), directory.getUrl().getParameters());
  6. if (! Constants.ANY_VALUE.equals(url.getServiceInterface())
  7. && url.getParameter(Constants.REGISTER_KEY, true)) {
  8. registry.register(subscribeUrl.addParameters(Constants.CATEGORY_KEY, Constants.CONSUMERS_CATEGORY,
  9. Constants.CHECK_KEY, String.valueOf(false)));
  10. }
  11. // the article's client-to-server connection walkthrough starts from this subscribe call
  12. directory.subscribe(subscribeUrl.addParameter(Constants.CATEGORY_KEY,
  13. Constants.PROVIDERS_CATEGORY
  14. + "," + Constants.CONFIGURATORS_CATEGORY
  15. + "," + Constants.ROUTERS_CATEGORY));
  16. return cluster.join(directory);
  17. }

RegistryDirectory

  // Records the consumer URL and subscribes this directory (as the listener) to the registry.
  1. public void subscribe(URL url) {
  2. setConsumerUrl(url);
  3. registry.subscribe(url, this);
  4. }

FailbackRegistry

  // Subscribes via doSubscribe. On failure: if cached URLs exist the listener is notified
  // with them and the error is logged; otherwise an IllegalStateException is thrown when the
  // "check" parameter is true on both the registry URL and the subscribe URL, or when the
  // failure is a SkipFailbackWrapperException (its cause is reported), else the error is
  // logged. When no exception was rethrown, the subscription is queued for retry.
  1. public void subscribe(URL url, NotifyListener listener) {
  2. super.subscribe(url, listener);
  3. removeFailedSubscribed(url, listener);
  4. try {
  5. // send the subscribe request to the registry server
  6. doSubscribe(url, listener);
  7. } catch (Exception e) {
  8. Throwable t = e;
  9. List<URL> urls = getCacheUrls(url);
  10. if (urls != null && urls.size() > 0) {
  11. notify(url, listener, urls);
  12. logger.error("Failed to subscribe " + url + ", Using cached list: " + urls + " from cache file: " + getUrl().getParameter(Constants.FILE_KEY, System.getProperty("user.home") + "/dubbo-registry-" + url.getHost() + ".cache") + ", cause: " + t.getMessage(), t);
  13. } else {
  14. // if the startup check is enabled, throw immediately
  15. boolean check = getUrl().getParameter(Constants.CHECK_KEY, true)
  16. && url.getParameter(Constants.CHECK_KEY, true);
  17. boolean skipFailback = t instanceof SkipFailbackWrapperException;
  18. if (check || skipFailback) {
  19. if(skipFailback) {
  20. t = t.getCause();
  21. }
  22. throw new IllegalStateException("Failed to subscribe " + url + ", cause: " + t.getMessage(), t);
  23. } else {
  24. logger.error("Failed to subscribe " + url + ", waiting for retry, cause: " + t.getMessage(), t);
  25. }
  26. }
  27. // record the failed subscription in the failed list for periodic retry
  28. addFailedSubscribed(url, listener);
  29. }
  30. }

ZookeeperRegistry

  // Subscribes a listener through the zookeeper client. For the ANY_VALUE wildcard interface
  // it watches the root path and subscribes to every service found there (now and on later
  // child changes). Otherwise it watches each category path of the URL, collects the current
  // children as URLs and notifies the listener once with the combined list. Any Throwable is
  // rethrown as RpcException.
  1. protected void doSubscribe(final URL url, final NotifyListener listener) {
  2. try {
  3. if (Constants.ANY_VALUE.equals(url.getServiceInterface())) {
  4. String root = toRootPath();
  5. ConcurrentMap<NotifyListener, ChildListener> listeners = zkListeners.get(url);
  6. if (listeners == null) {
  7. zkListeners.putIfAbsent(url, new ConcurrentHashMap<NotifyListener, ChildListener>());
  8. listeners = zkListeners.get(url);
  9. }
  10. ChildListener zkListener = listeners.get(listener);
  11. if (zkListener == null) {
  12. listeners.putIfAbsent(listener, new ChildListener() {
  13. public void childChanged(String parentPath, List<String> currentChilds) {
  14. for (String child : currentChilds) {
  15. child = URL.decode(child);
  16. if (! anyServices.contains(child)) {
  17. anyServices.add(child);
  18. subscribe(url.setPath(child).addParameters(Constants.INTERFACE_KEY, child,
  19. Constants.CHECK_KEY, String.valueOf(false)), listener);
  20. }
  21. }
  22. }
  23. });
  24. zkListener = listeners.get(listener);
  25. }
  26. zkClient.create(root, false);
  27. List<String> services = zkClient.addChildListener(root, zkListener);
  28. if (services != null && services.size() > 0) {
  29. for (String service : services) {
  30. service = URL.decode(service);
  31. anyServices.add(service);
  32. subscribe(url.setPath(service).addParameters(Constants.INTERFACE_KEY, service,
  33. Constants.CHECK_KEY, String.valueOf(false)), listener);
  34. }
  35. }
  36. } else {
  37. List<URL> urls = new ArrayList<URL>();
  38. for (String path : toCategoriesPath(url)) {
  39. ConcurrentMap<NotifyListener, ChildListener> listeners = zkListeners.get(url);
  40. if (listeners == null) {
  41. zkListeners.putIfAbsent(url, new ConcurrentHashMap<NotifyListener, ChildListener>());
  42. listeners = zkListeners.get(url);
  43. }
  44. ChildListener zkListener = listeners.get(listener);
  45. if (zkListener == null) {
  46. listeners.putIfAbsent(listener, new ChildListener() {
  47. public void childChanged(String parentPath, List<String> currentChilds) {
  48. ZookeeperRegistry.this.notify(url, listener, toUrlsWithEmpty(url, parentPath, currentChilds));
  49. }
  50. });
  51. zkListener = listeners.get(listener);
  52. }
  53. zkClient.create(path, false);
  54. List<String> children = zkClient.addChildListener(path, zkListener);
  55. if (children != null) {
  56. urls.addAll(toUrlsWithEmpty(url, path, children));
  57. }
  58. }
  59. notify(url, listener, urls);
  60. }
  61. } catch (Throwable e) {
  62. throw new RpcException("Failed to subscribe " + url + " to zookeeper " + getUrl() + ", cause: " + e.getMessage(), e);
  63. }
  64. }

FailbackRegistry

  // Validates its arguments and delegates to doNotify; if that throws, the (listener, urls)
  // pair is stored in failedNotified under the subscribe URL and the error is logged.
  1. protected void notify(URL url, NotifyListener listener, List<URL> urls) {
  2. if (url == null) {
  3. throw new IllegalArgumentException("notify url == null");
  4. }
  5. if (listener == null) {
  6. throw new IllegalArgumentException("notify listener == null");
  7. }
  8. try {
  9. doNotify(url, listener, urls);
  10. } catch (Exception t) {
  11. // record the failed notification in the failed list for periodic retry
  12. Map<NotifyListener, List<URL>> listeners = failedNotified.get(url);
  13. if (listeners == null) {
  14. failedNotified.putIfAbsent(url, new ConcurrentHashMap<NotifyListener, List<URL>>());
  15. listeners = failedNotified.get(url);
  16. }
  17. listeners.put(listener, urls);
  18. logger.error("Failed to notify for subscribe " + url + ", waiting for retry, cause: " + t.getMessage(), t);
  19. }
  20. }
  // Performs the actual notification by calling the superclass notify.
  21. protected void doNotify(URL url, NotifyListener listener, List<URL> urls) {
  22. super.notify(url, listener, urls);
  23. }

AbstractRegistry

  // Groups the notified URLs that match the subscribe URL by their category parameter, then
  // for each category stores the list in `notified`, calls saveProperties and notifies the
  // listener. An empty URL list is ignored (with a warning) unless the subscribed service
  // interface is the ANY_VALUE wildcard; nothing happens when no URL matches.
  1. protected void notify(URL url, NotifyListener listener, List<URL> urls) {
  2. if (url == null) {
  3. throw new IllegalArgumentException("notify url == null");
  4. }
  5. if (listener == null) {
  6. throw new IllegalArgumentException("notify listener == null");
  7. }
  8. if ((urls == null || urls.size() == 0)
  9. && ! Constants.ANY_VALUE.equals(url.getServiceInterface())) {
  10. logger.warn("Ignore empty notify urls for subscribe url " + url);
  11. return;
  12. }
  13. if (logger.isInfoEnabled()) {
  14. logger.info("Notify urls for subscribe url " + url + ", urls: " + urls);
  15. }
  16. Map<String, List<URL>> result = new HashMap<String, List<URL>>();
  17. for (URL u : urls) {
  18. if (UrlUtils.isMatch(url, u)) {
  19. String category = u.getParameter(Constants.CATEGORY_KEY, Constants.DEFAULT_CATEGORY);
  20. List<URL> categoryList = result.get(category);
  21. if (categoryList == null) {
  22. categoryList = new ArrayList<URL>();
  23. result.put(category, categoryList);
  24. }
  25. categoryList.add(u);
  26. }
  27. }
  28. if (result.size() == 0) {
  29. return;
  30. }
  31. Map<String, List<URL>> categoryNotified = notified.get(url);
  32. if (categoryNotified == null) {
  33. notified.putIfAbsent(url, new ConcurrentHashMap<String, List<URL>>());
  34. categoryNotified = notified.get(url);
  35. }
  36. // update each notified category (e.g. providers, configurators, routers)
  37. for (Map.Entry<String, List<URL>> entry : result.entrySet()) {
  38. String category = entry.getKey();
  39. List<URL> categoryList = entry.getValue();
  40. categoryNotified.put(category, categoryList);
  41. saveProperties(url);
  42. listener.notify(categoryList);
  43. }
  44. }

RegistryDirectory

  // Sorts the notified URLs into router, configurator and provider lists by category /
  // protocol (anything else is logged as unsupported), applies new configurators and routers
  // when present, recomputes overrideDirectoryUrl from directoryUrl through the current
  // configurators, and finally refreshes the invokers from the provider URLs.
  1. public synchronized void notify(List<URL> urls) {
  2. List<URL> invokerUrls = new ArrayList<URL>();
  3. List<URL> routerUrls = new ArrayList<URL>();
  4. List<URL> configuratorUrls = new ArrayList<URL>();
  5. for (URL url : urls) {
  6. String protocol = url.getProtocol();
  7. String category = url.getParameter(Constants.CATEGORY_KEY, Constants.DEFAULT_CATEGORY);
  8. if (Constants.ROUTERS_CATEGORY.equals(category)
  9. || Constants.ROUTE_PROTOCOL.equals(protocol)) {
  10. routerUrls.add(url);
  11. } else if (Constants.CONFIGURATORS_CATEGORY.equals(category)
  12. || Constants.OVERRIDE_PROTOCOL.equals(protocol)) {
  13. configuratorUrls.add(url);
  14. } else if (Constants.PROVIDERS_CATEGORY.equals(category)) {
  15. invokerUrls.add(url);
  16. } else {
  17. logger.warn("Unsupported category " + category + " in notified url: " + url + " from registry " + getUrl().getAddress() + " to consumer " + NetUtils.getLocalHost());
  18. }
  19. }
  20. // configurators
  21. if (configuratorUrls != null && configuratorUrls.size() >0 ){
  22. this.configurators = toConfigurators(configuratorUrls);
  23. }
  24. // routers
  25. if (routerUrls != null && routerUrls.size() >0 ){
  26. List<Router> routers = toRouters(routerUrls);
  27. if(routers != null){ // null - do nothing
  28. setRouters(routers);
  29. }
  30. }
  31. List<Configurator> localConfigurators = this.configurators; // local reference
  32. // merge the override parameters
  33. this.overrideDirectoryUrl = directoryUrl;
  34. if (localConfigurators != null && localConfigurators.size() > 0) {
  35. for (Configurator configurator : localConfigurators) {
  36. this.overrideDirectoryUrl = configurator.configure(overrideDirectoryUrl);
  37. }
  38. }
  39. // providers
  40. // refresh the invokers
  41. refreshInvoker(invokerUrls);
  42. }
  43. /** * Converts the invokerURL list into an invoker list. Rules: * 1. If a url has already been converted to an invoker it is not referred again but taken from the cache; note that a change to any url parameter causes a new refer * 2. If the incoming invoker list is not empty, it is the latest invoker list * 3. If the incoming invokerUrl list is empty, only override or route rules were pushed; a cross-comparison is needed to decide whether to refer again. * @param invokerUrls must not be null */
  44. // RegistryDirectory
  45. private void refreshInvoker(List<URL> invokerUrls){
  46. if (invokerUrls != null && invokerUrls.size() == 1 && invokerUrls.get(0) != null
  47. && Constants.EMPTY_PROTOCOL.equals(invokerUrls.get(0).getProtocol())) {
  48. this.forbidden = true; // forbid access
  49. this.methodInvokerMap = null; // clear the method-to-invokers map
  50. destroyAllInvokers(); // destroy all invokers
  51. } else {
  52. this.forbidden = false; // allow access
  53. Map<String, Invoker<T>> oldUrlInvokerMap = this.urlInvokerMap; // local reference
  54. if (invokerUrls.size() == 0 && this.cachedInvokerUrls != null){
  55. invokerUrls.addAll(this.cachedInvokerUrls);
  56. } else {
  57. this.cachedInvokerUrls = new HashSet<URL>();
  58. this.cachedInvokerUrls.addAll(invokerUrls);// cache the invokerUrls list for later cross-comparison
  59. }
  60. if (invokerUrls.size() ==0 ){
  61. return;
  62. }
  63. Map<String, Invoker<T>> newUrlInvokerMap = toInvokers(invokerUrls) ;// convert the URL list into invokers
  64. Map<String, List<Invoker<T>>> newMethodInvokerMap = toMethodInvokers(newUrlInvokerMap); // map method names to invoker lists
  65. // state change
  66. // if the conversion produced nothing, do not apply it
  67. if (newUrlInvokerMap == null || newUrlInvokerMap.size() == 0 ){
  68. logger.error(new IllegalStateException("urls to invokers error .invokerUrls.size :"+invokerUrls.size() + ", invoker.size :0. urls :"+invokerUrls.toString()));
  69. return ;
  70. }
  71. this.methodInvokerMap = multiGroup ? toMergeMethodInvokerMap(newMethodInvokerMap) : newMethodInvokerMap;
  72. this.urlInvokerMap = newUrlInvokerMap;
  73. try{
  74. destroyUnusedInvokers(oldUrlInvokerMap,newUrlInvokerMap); // destroy the invokers that are no longer used
  75. }catch (Exception e) {
  76. logger.warn("destroyUnusedInvokers error. ", e);
  77. }
  78. }
  79. }
  // Turns provider URLs into a map from merged-URL string to Invoker. Skipped: URLs whose
  // protocol is not among the consumer's configured protocols, "empty"-protocol URLs,
  // URLs with an unsupported protocol (logged) and duplicates. Invokers already present in
  // urlInvokerMap are reused; otherwise, when the URL is enabled, protocol.refer creates one.
  80. // RegistryDirectory
  81. private Map<String, Invoker<T>> toInvokers(List<URL> urls) {
  82. Map<String, Invoker<T>> newUrlInvokerMap = new HashMap<String, Invoker<T>>();
  83. if(urls == null || urls.size() == 0){
  84. return newUrlInvokerMap;
  85. }
  86. Set<String> keys = new HashSet<String>();
  87. String queryProtocols = this.queryMap.get(Constants.PROTOCOL_KEY);
  88. for (URL providerUrl : urls) {
  89. // if the reference side configured protocol(s), only accept matching protocols
  90. if (queryProtocols != null && queryProtocols.length() >0) {
  91. boolean accept = false;
  92. String[] acceptProtocols = queryProtocols.split(",");
  93. for (String acceptProtocol : acceptProtocols) {
  94. if (providerUrl.getProtocol().equals(acceptProtocol)) {
  95. accept = true;
  96. break;
  97. }
  98. }
  99. if (!accept) {
  100. continue;
  101. }
  102. }
  103. if (Constants.EMPTY_PROTOCOL.equals(providerUrl.getProtocol())) {
  104. continue;
  105. }
  106. if (! ExtensionLoader.getExtensionLoader(Protocol.class).hasExtension(providerUrl.getProtocol())) {
  107. logger.error(new IllegalStateException("Unsupported protocol " + providerUrl.getProtocol() + " in notified url: " + providerUrl + " from registry " + getUrl().getAddress() + " to consumer " + NetUtils.getLocalHost()
  108. + ", supported protocol: "+ExtensionLoader.getExtensionLoader(Protocol.class).getSupportedExtensions()));
  109. continue;
  110. }
  111. URL url = mergeUrl(providerUrl);
  112. String key = url.toFullString(); // URL parameters are sorted
  113. if (keys.contains(key)) { // duplicate URL
  114. continue;
  115. }
  116. keys.add(key);
  117. // original note: the cache key is the URL without merged consumer-side parameters, so a provider URL change triggers a new refer. NOTE(review): the key above is built from mergeUrl(providerUrl) - confirm the note still holds
  118. Map<String, Invoker<T>> localUrlInvokerMap = this.urlInvokerMap; // local reference
  119. Invoker<T> invoker = localUrlInvokerMap == null ? null : localUrlInvokerMap.get(key);
  120. if (invoker == null) { // not in the cache, refer again
  121. try {
  122. boolean enabled = true;
  123. if (url.hasParameter(Constants.DISABLED_KEY)) {
  124. enabled = ! url.getParameter(Constants.DISABLED_KEY, false);
  125. } else {
  126. enabled = url.getParameter(Constants.ENABLED_KEY, true);
  127. }
  128. if (enabled) {
  129. /** * url -> dubbo://...... * providerUrl -> dubbo://...... * protocol -> Protocol$Adaptive * extension -> getExtension("dubbo") -> ProtocolFilterWrapper(ProtocolListenerWrapper(DubboProtocol)) * finally DubboProtocol's refer method is called */
  130. invoker = new InvokerDelegete<T>(protocol.refer(serviceType, url), url, providerUrl);
  131. }
  132. } catch (Throwable t) {
  133. logger.error("Failed to refer invoker for interface:"+serviceType+",url:("+url+")" + t.getMessage(), t);
  134. }
  135. if (invoker != null) { // put the new reference into the cache
  136. newUrlInvokerMap.put(key, invoker);
  137. }
  138. }else {
  139. newUrlInvokerMap.put(key, invoker);
  140. }
  141. }
  142. keys.clear();
  143. return newUrlInvokerMap;
  144. }

DubboProtocol

  // Creates a DubboInvoker backed by the clients obtained for the URL, records it in
  // `invokers` and returns it.
  1. public <T> Invoker<T> refer(Class<T> serviceType, URL url) throws RpcException {
  2. // create rpc invoker.
  3. DubboInvoker<T> invoker = new DubboInvoker<T>(serviceType, url, getClients(url), invokers);
  4. invokers.add(invoker);
  5. return invoker;
  6. }
  // Returns the ExchangeClients for a URL: one shared client when the connections parameter
  // is 0 (unset), otherwise `connections` newly initialized clients.
  7. // DubboProtocol
  8. private ExchangeClient[] getClients(URL url){
  9. // whether the connection is shared
  10. boolean service_share_connect = false;
  11. int connections = url.getParameter(Constants.CONNECTIONS_KEY, 0);
  12. // if connections is not configured, share the connection; otherwise use dedicated connections per service
  13. if (connections == 0){
  14. service_share_connect = true;
  15. connections = 1;
  16. }
  17. ExchangeClient[] clients = new ExchangeClient[connections];
  18. for (int i = 0; i < clients.length; i++) {
  19. if (service_share_connect){
  20. clients[i] = getSharedClient(url);
  21. } else {
  22. // initialize a new client
  23. clients[i] = initClient(url);
  24. }
  25. }
  26. return clients;
  27. }
  // Prepares the URL (codec, default heartbeat), rejects client types with no Transporter
  // extension, then creates either a LazyConnectExchangeClient (when the lazy-connect
  // parameter is true) or a connected client via Exchangers.connect. A RemotingException is
  // rethrown as RpcException.
  28. // DubboProtocol
  29. private ExchangeClient initClient(URL url) {
  30. // client type setting.
  31. String str = url.getParameter(Constants.CLIENT_KEY, url.getParameter(Constants.SERVER_KEY, Constants.DEFAULT_REMOTING_CLIENT));
  32. String version = url.getParameter(Constants.DUBBO_VERSION_KEY);
  33. boolean compatible = (version != null && version.startsWith("1.0."));
  34. url = url.addParameter(Constants.CODEC_KEY, Version.isCompatibleVersion() && compatible ? COMPATIBLE_CODEC_NAME : DubboCodec.NAME);
  35. // heartbeat is enabled by default
  36. url = url.addParameterIfAbsent(Constants.HEARTBEAT_KEY, String.valueOf(Constants.DEFAULT_HEARTBEAT));
  37. // BIO has serious performance problems and is not allowed for now
  38. if (str != null && str.length() > 0 && ! ExtensionLoader.getExtensionLoader(Transporter.class).hasExtension(str)) {
  39. throw new RpcException("Unsupported client type: " + str + "," +
  40. " supported client type is " + StringUtils.join(ExtensionLoader.getExtensionLoader(Transporter.class).getSupportedExtensions(), " "));
  41. }
  42. ExchangeClient client ;
  43. try {
  44. // the connection is configured to be lazy
  45. if (url.getParameter(Constants.LAZY_CONNECT_KEY, false)){
  46. client = new LazyConnectExchangeClient(url ,requestHandler);
  47. } else {
  48. // HeaderExchanger
  49. client = Exchangers.connect(url ,requestHandler);
  50. }
  51. } catch (RemotingException e) {
  52. throw new RpcException("Fail to create remoting client for service(" + url
  53. + "): " + e.getMessage(), e);
  54. }
  55. return client;
  56. }

HeaderExchanger

  // Connects through Transporters with the handler wrapped as
  // DecodeHandler(HeaderExchangeHandler(handler)) and wraps the resulting client in a
  // HeaderExchangeClient.
  1. public ExchangeClient connect(URL url, ExchangeHandler handler) throws RemotingException {
  2. return new HeaderExchangeClient(Transporters.connect(url, new ChannelHandler[]{ new DecodeHandler(new HeaderExchangeHandler(handler))}));
  3. }

NettyTransporter

  // Creates a NettyClient for the URL with the given channel handler.
  1. public Client connect(URL url, ChannelHandler listener) throws RemotingException {
  2. // create the netty client
  3. return new NettyClient(url, listener);
  4. }

客户端初始化过程完成。

服务调用过程

服务调用过程时序图。
Dubbo服务调用过程

客户端生成 Proxy0 代理

  • client:Proxy0
  • server:JavassistProxyFactory

Proxy0

  // Proxy method in Javassist source form: $1 is the first method argument and ($w) is the
  // Javassist cast that boxes a primitive into its wrapper type. The argument is packed into
  // an Object[] and the call is forwarded to the InvocationHandler; `handler` and `methods`
  // are members of the proxy class that this excerpt does not show.
  // NOTE(review): the single String parameter is presumed from the one-element args array
  // and the String return type - confirm against the demo service interface.
  1. public java.lang.String sayHello(java.lang.String arg0) {
  2. Object[] args = new Object[1];
  3. args[0] = ($w) $1;
  4. Object ret = handler.invoke(this, methods[0], args);
  5. return (java.lang.String) ret;
  6. }

InvokerInvocationHandler

  1. public Object invoke(Object proxy, Method method, Object[] args) throws Throwable {
  2. String methodName = method.getName();
  3. Class<?>[] parameterTypes = method.getParameterTypes();
  4. if (method.getDeclaringClass() == Object.class) {
  5. return method.invoke(this.invoker, args);
  6. } else if ("toString".equals(methodName) && parameterTypes.length == 0) {
  7. return this.invoker.toString();
  8. } else if ("hashCode".equals(methodName) && parameterTypes.length == 0) {
  9. return this.invoker.hashCode();
  10. } else {
  11. // invoker -> MockClusterInvoker
  12. return "equals".equals(methodName) && parameterTypes.length == 1 ? this.invoker.equals(args[0]) : this.invoker.invoke(new RpcInvocation(method, args)).recreate();
  13. }
  14. }

MockClusterInvoker

  1. public Result invoke(Invocation invocation) throws RpcException {
  2. Result result = null;
  3. String value = this.directory.getUrl().getMethodParameter(invocation.getMethodName(), "mock", Boolean.FALSE.toString()).trim();
  4. if (value.length() != 0 && !value.equalsIgnoreCase("false")) {
  5. if (value.startsWith("force")) {
  6. if (logger.isWarnEnabled()) {
  7. logger.info("force-mock: " + invocation.getMethodName() + " force-mock enabled , url : " + this.directory.getUrl());
  8. }
  9. // 直接调用mock
  10. result = this.doMockInvoke(invocation, (RpcException)null);
  11. } else {
  12. try {
  13. result = this.invoker.invoke(invocation);
  14. } catch (RpcException var5) {
  15. if (var5.isBiz()) {
  16. throw var5;
  17. }
  18. if (logger.isWarnEnabled()) {
  19. logger.info("fail-mock: " + invocation.getMethodName() + " fail-mock enabled , url : " + this.directory.getUrl(), var5);
  20. }
  21. // 调用异常执行mock方法
  22. result = this.doMockInvoke(invocation, var5);
  23. }
  24. }
  25. } else {
  26. // 调用执行
  27. // invoker -> FailoverClusterInvoker invoke方法在其父类AbstractClusterInvoker中
  28. result = this.invoker.invoke(invocation);
  29. }
  30. return result;
  31. }

AbstractClusterInvoker

  1. public Result invoke(Invocation invocation) throws RpcException {
  2. this.checkWhetherDestroyed();
  3. // 获取所有invoker列表>>
  4. List<Invoker<T>> invokers = this.list(invocation);
  5. LoadBalance loadbalance;
  6. if (invokers != null && invokers.size() > 0) {
  7. loadbalance = (LoadBalance)ExtensionLoader.getExtensionLoader(LoadBalance.class).getExtension(((Invoker)invokers.get(0)).getUrl().getMethodParameter(invocation.getMethodName(), "loadbalance", "random"));
  8. } else {
  9. loadbalance = (LoadBalance)ExtensionLoader.getExtensionLoader(LoadBalance.class).getExtension("random");
  10. }
  11. RpcUtils.attachInvocationIdIfAsync(this.getUrl(), invocation);
  12. // 执行调用
  13. return this.doInvoke(invocation, invokers, loadbalance);
  14. }
  15. // 获取所有invoker列表
  16. protected List<Invoker<T>> list(Invocation invocation) throws RpcException {
  17. List<Invoker<T>> invokers = this.directory.list(invocation);
  18. return invokers;
  19. }

AbstractDirectory

  1. public List<Invoker<T>> list(Invocation invocation) throws RpcException {
  2. if (this.destroyed) {
  3. throw new RpcException("Directory already destroyed .url: " + this.getUrl());
  4. } else {
  5. // 执行获取所有invoker列表
  6. // invoker -> ProtocolFilterWrapper
  7. List<Invoker<T>> invokers = this.doList(invocation);
  8. List<Router> localRouters = this.routers;
  9. if (localRouters != null && localRouters.size() > 0) {
  10. Iterator var4 = localRouters.iterator();
  11. while(var4.hasNext()) {
  12. Router router = (Router)var4.next();
  13. try {
  14. if (router.getUrl() == null || router.getUrl().getParameter("runtime", true)) {
  15. invokers = router.route(invokers, this.getConsumerUrl(), invocation);
  16. }
  17. } catch (Throwable var7) {
  18. logger.error("Failed to execute router: " + this.getUrl() + ", cause: " + var7.getMessage(), var7);
  19. }
  20. }
  21. }
  22. return invokers;
  23. }
  24. }

RegistryDirectory

  1. public List<Invoker<T>> doList(Invocation invocation) {
  2. if (this.forbidden) {
  3. throw new RpcException(4, "No provider available from registry " + this.getUrl().getAddress() + " for service " + this.getConsumerUrl().getServiceKey() + " on consumer " + NetUtils.getLocalHost() + " use dubbo version " + Version.getVersion() + ", may be providers disabled or not registered ?");
  4. } else {
  5. List<Invoker<T>> invokers = null;
  6. Map<String, List<Invoker<T>>> localMethodInvokerMap = this.methodInvokerMap;
  7. if (localMethodInvokerMap != null && localMethodInvokerMap.size() > 0) {
  8. String methodName = RpcUtils.getMethodName(invocation);
  9. Object[] args = RpcUtils.getArguments(invocation);
  10. if (args != null && args.length > 0 && args[0] != null && (args[0] instanceof String || args[0].getClass().isEnum())) {
  11. invokers = (List)localMethodInvokerMap.get(methodName + "." + args[0]);
  12. }
  13. if (invokers == null) {
  14. invokers = (List)localMethodInvokerMap.get(methodName);
  15. }
  16. if (invokers == null) {
  17. invokers = (List)localMethodInvokerMap.get("*");
  18. }
  19. if (invokers == null) {
  20. Iterator<List<Invoker<T>>> iterator = localMethodInvokerMap.values().iterator();
  21. if (iterator.hasNext()) {
  22. invokers = (List)iterator.next();
  23. }
  24. }
  25. }
  26. return (List)(invokers == null ? new ArrayList(0) : invokers);
  27. }
  28. }

回到 AbstractClusterInvoker 的 invoke 方法,最后会调用 FailoverClusterInvoker 的 doInvoke 方法。

FailoverClusterInvoker

  1. public Result doInvoke(Invocation invocation, List<Invoker<T>> invokers, LoadBalance loadbalance) throws RpcException {
  2. List<Invoker<T>> copyinvokers = invokers;
  3. this.checkInvokers(invokers, invocation);
  4. int len = this.getUrl().getMethodParameter(invocation.getMethodName(), "retries", 2) + 1;
  5. if (len <= 0) {
  6. len = 1;
  7. }
  8. RpcException le = null;
  9. List<Invoker<T>> invoked = new ArrayList(invokers.size());
  10. Set<String> providers = new HashSet(len);
  11. for(int i = 0; i < len; ++i) {
  12. if (i > 0) {
  13. this.checkWhetherDestroyed();
  14. copyinvokers = this.list(invocation);
  15. this.checkInvokers(copyinvokers, invocation);
  16. }
  17. // 选择调用invoker>>
  18. Invoker<T> invoker = this.select(loadbalance, invocation, copyinvokers, invoked);
  19. invoked.add(invoker);
  20. RpcContext.getContext().setInvokers(invoked);
  21. try {
  22. // 执行调用
  23. Result result = invoker.invoke(invocation);
  24. if (le != null && logger.isWarnEnabled()) {
  25. logger.warn("Although retry the method " + invocation.getMethodName() + " in the service " + this.getInterface().getName() + " was successful by the provider " + invoker.getUrl().getAddress() + ", but there have been failed providers " + providers + " (" + providers.size() + "/" + copyinvokers.size() + ") from the registry " + this.directory.getUrl().getAddress() + " on the consumer " + NetUtils.getLocalHost() + " using the dubbo version " + Version.getVersion() + ". Last error is: " + le.getMessage(), le);
  26. }
  27. Result var12 = result;
  28. return var12;
  29. } catch (RpcException var17) {
  30. if (var17.isBiz()) {
  31. throw var17;
  32. }
  33. le = var17;
  34. } catch (Throwable var18) {
  35. le = new RpcException(var18.getMessage(), var18);
  36. } finally {
  37. providers.add(invoker.getUrl().getAddress());
  38. }
  39. }
  40. throw new RpcException(le != null ? le.getCode() : 0, "Failed to invoke the method " + invocation.getMethodName() + " in the service " + this.getInterface().getName() + ". Tried " + len + " times of the providers " + providers + " (" + providers.size() + "/" + copyinvokers.size() + ") from the registry " + this.directory.getUrl().getAddress() + " on the consumer " + NetUtils.getLocalHost() + " using the dubbo version " + Version.getVersion() + ". Last error is: " + (le != null ? le.getMessage() : ""), (Throwable)(le != null && le.getCause() != null ? le.getCause() : le));
  41. }
  42. // AbstractClusterInvoker
  43. protected Invoker<T> select(LoadBalance loadbalance, Invocation invocation, List<Invoker<T>> invokers, List<Invoker<T>> selected) throws RpcException {
  44. if (invokers != null && invokers.size() != 0) {
  45. String methodName = invocation == null ? "" : invocation.getMethodName();
  46. boolean sticky = ((Invoker)invokers.get(0)).getUrl().getMethodParameter(methodName, "sticky", false);
  47. if (this.stickyInvoker != null && !invokers.contains(this.stickyInvoker)) {
  48. this.stickyInvoker = null;
  49. }
  50. if (sticky && this.stickyInvoker != null && (selected == null || !selected.contains(this.stickyInvoker)) && this.availablecheck && this.stickyInvoker.isAvailable()) {
  51. return this.stickyInvoker;
  52. } else {
  53. Invoker<T> invoker = this.doselect(loadbalance, invocation, invokers, selected);
  54. if (sticky) {
  55. this.stickyInvoker = invoker;
  56. }
  57. return invoker;
  58. }
  59. } else {
  60. return null;
  61. }
  62. }
  63. // AbstractClusterInvoker
  64. private Invoker<T> doselect(LoadBalance loadbalance, Invocation invocation, List<Invoker<T>> invokers, List<Invoker<T>> selected) throws RpcException {
  65. if (invokers != null && invokers.size() != 0) {
  66. if (invokers.size() == 1) {
  67. return (Invoker)invokers.get(0);
  68. } else if (invokers.size() == 2 && selected != null && selected.size() > 0) {
  69. return selected.get(0) == invokers.get(0) ? (Invoker)invokers.get(1) : (Invoker)invokers.get(0);
  70. } else {
  71. // 负载均衡策略执行选择
  72. Invoker<T> invoker = loadbalance.select(invokers, this.getUrl(), invocation);
  73. if (selected != null && selected.contains(invoker) || !invoker.isAvailable() && this.getUrl() != null && this.availablecheck) {
  74. try {
  75. Invoker<T> rinvoker = this.reselect(loadbalance, invocation, invokers, selected, this.availablecheck);
  76. if (rinvoker != null) {
  77. invoker = rinvoker;
  78. } else {
  79. int index = invokers.indexOf(invoker);
  80. try {
  81. invoker = index < invokers.size() - 1 ? (Invoker)invokers.get(index + 1) : invoker;
  82. } catch (Exception var9) {
  83. logger.warn(var9.getMessage() + " may because invokers list dynamic change, ignore.", var9);
  84. }
  85. }
  86. } catch (Throwable var10) {
  87. logger.error("clustor relselect fail reason is :" + var10.getMessage() + " if can not slove ,you can set cluster.availablecheck=false in url", var10);
  88. }
  89. }
  90. return invoker;
  91. }
  92. } else {
  93. return null;
  94. }
  95. }

AbstractLoadBalance

  1. public <T> Invoker<T> select(List<Invoker<T>> invokers, URL url, Invocation invocation) {
  2. if (invokers != null && invokers.size() != 0) {
  3. return invokers.size() == 1 ? (Invoker)invokers.get(0) : this.doSelect(invokers, url, invocation);
  4. } else {
  5. return null;
  6. }
  7. }
  8. // 模板方法
  9. protected abstract <T> Invoker<T> doSelect(List<Invoker<T>> var1, URL var2, Invocation var3);

RandomLoadBalance

  1. protected <T> Invoker<T> doSelect(List<Invoker<T>> invokers, URL url, Invocation invocation) {
  2. int length = invokers.size();
  3. int totalWeight = 0;
  4. boolean sameWeight = true;
  5. int offset;
  6. int i;
  7. for(offset = 0; offset < length; ++offset) {
  8. i = this.getWeight((Invoker)invokers.get(offset), invocation);
  9. totalWeight += i;
  10. if (sameWeight && offset > 0 && i != this.getWeight((Invoker)invokers.get(offset - 1), invocation)) {
  11. sameWeight = false;
  12. }
  13. }
  14. if (totalWeight > 0 && !sameWeight) {
  15. offset = this.random.nextInt(totalWeight);
  16. for(i = 0; i < length; ++i) {
  17. offset -= this.getWeight((Invoker)invokers.get(i), invocation);
  18. if (offset < 0) {
  19. return (Invoker)invokers.get(i);
  20. }
  21. }
  22. }
  23. return (Invoker)invokers.get(this.random.nextInt(length));
  24. }

回到 FailoverClusterInvoker 的 doInvoke 方法,会继续调用 DubboInvoker(由 DubboProtocol 创建),而 invoke 方法定义在其父类 AbstractInvoker 中。

AbstractInvoker

  1. public Result invoke(Invocation inv) throws RpcException {
  2. if (this.destroyed.get()) {
  3. throw new RpcException("Rpc invoker for service " + this + " on consumer " + NetUtils.getLocalHost() + " use dubbo version " + Version.getVersion() + " is DESTROYED, can not be invoked any more!");
  4. } else {
  5. RpcInvocation invocation = (RpcInvocation)inv;
  6. invocation.setInvoker(this);
  7. if (this.attachment != null && this.attachment.size() > 0) {
  8. invocation.addAttachmentsIfAbsent(this.attachment);
  9. }
  10. Map<String, String> context = RpcContext.getContext().getAttachments();
  11. if (context != null) {
  12. invocation.addAttachmentsIfAbsent(context);
  13. }
  14. if (this.getUrl().getMethodParameter(invocation.getMethodName(), "async", false)) {
  15. invocation.setAttachment("async", Boolean.TRUE.toString());
  16. }
  17. RpcUtils.attachInvocationIdIfAsync(this.getUrl(), invocation);
  18. try {
  19. return this.doInvoke(invocation);
  20. } catch (InvocationTargetException var6) {
  21. Throwable te = var6.getTargetException();
  22. if (te == null) {
  23. return new RpcResult(var6);
  24. } else {
  25. if (te instanceof RpcException) {
  26. ((RpcException)te).setCode(3);
  27. }
  28. return new RpcResult(te);
  29. }
  30. } catch (RpcException var7) {
  31. if (var7.isBiz()) {
  32. return new RpcResult(var7);
  33. } else {
  34. throw var7;
  35. }
  36. } catch (Throwable var8) {
  37. return new RpcResult(var8);
  38. }
  39. }
  40. }
  41. protected abstract Result doInvoke(Invocation var1) throws Throwable;

DubboInvoker

  1. protected Result doInvoke(Invocation invocation) throws Throwable {
  2. RpcInvocation inv = (RpcInvocation)invocation;
  3. String methodName = RpcUtils.getMethodName(invocation);
  4. inv.setAttachment("path", this.getUrl().getPath());
  5. inv.setAttachment("version", this.version);
  6. ExchangeClient currentClient;
  7. if (this.clients.length == 1) {
  8. currentClient = this.clients[0];
  9. } else {
  10. currentClient = this.clients[this.index.getAndIncrement() % this.clients.length];
  11. }
  12. try {
  13. boolean isAsync = RpcUtils.isAsync(this.getUrl(), invocation);
  14. boolean isOneway = RpcUtils.isOneway(this.getUrl(), invocation);
  15. int timeout = this.getUrl().getMethodParameter(methodName, "timeout", 1000);
  16. if (isOneway) {
  17. boolean isSent = this.getUrl().getMethodParameter(methodName, "sent", false);
  18. currentClient.send(inv, isSent);
  19. RpcContext.getContext().setFuture((Future)null);
  20. return new RpcResult();
  21. } else if (isAsync) {
  22. ResponseFuture future = currentClient.request(inv, timeout);
  23. RpcContext.getContext().setFuture(new FutureAdapter(future));
  24. return new RpcResult();
  25. } else {
  26. RpcContext.getContext().setFuture((Future)null);
  27. // 发送请求>>
  28. // currentClient -> 当前创建的连接
  29. return (Result)currentClient.request(inv, timeout).get();
  30. }
  31. } catch (TimeoutException var9) {
  32. throw new RpcException(2, "Invoke remote method timeout. method: " + invocation.getMethodName() + ", provider: " + this.getUrl() + ", cause: " + var9.getMessage(), var9);
  33. } catch (RemotingException var10) {
  34. throw new RpcException(1, "Failed to invoke remote method: " + invocation.getMethodName() + ", provider: " + this.getUrl() + ", cause: " + var10.getMessage(), var10);
  35. }
  36. }

HeaderExchangeClient

  1. public ResponseFuture request(Object request, int timeout) throws RemotingException {
  2. return this.channel.request(request, timeout);
  3. }

HeaderExchangeChannel

  1. public ResponseFuture request(Object request, int timeout) throws RemotingException {
  2. if (this.closed) {
  3. throw new RemotingException(this.getLocalAddress(), (InetSocketAddress)null, "Failed to send request " + request + ", cause: The channel " + this + " is closed!");
  4. } else {
  5. Request req = new Request();
  6. req.setVersion("2.0.0");
  7. req.setTwoWay(true);
  8. req.setData(request);
  9. DefaultFuture future = new DefaultFuture(this.channel, req, timeout);
  10. try {
  11. this.channel.send(req);
  12. return future;
  13. } catch (RemotingException var6) {
  14. future.cancel();
  15. throw var6;
  16. }
  17. }
  18. }
  19. public void send(Object message) throws RemotingException {
  20. this.send(message, this.getUrl().getParameter("sent", false));
  21. }
  22. public void send(Object message, boolean sent) throws RemotingException {
  23. if (this.closed) {
  24. throw new RemotingException(this.getLocalAddress(), (InetSocketAddress)null, "Failed to send message " + message + ", cause: The channel " + this + " is closed!");
  25. } else {
  26. if (!(message instanceof Request) && !(message instanceof Response) && !(message instanceof String)) {
  27. Request request = new Request();
  28. request.setVersion("2.0.0");
  29. request.setTwoWay(false);
  30. request.setData(message);
  31. this.channel.send(request, sent);
  32. } else {
  33. // channel -> NettyChannel
  34. this.channel.send(message, sent);
  35. }
  36. }
  37. }

NettyChannel

  1. public void send(Object message, boolean sent) throws RemotingException {
  2. super.send(message, sent);
  3. boolean success = true;
  4. int timeout = 0;
  5. try {
  6. ChannelFuture future = this.channel.write(message);
  7. if (sent) {
  8. timeout = this.getUrl().getPositiveParameter("timeout", 1000);
  9. success = future.await((long)timeout);
  10. }
  11. Throwable cause = future.getCause();
  12. if (cause != null) {
  13. throw cause;
  14. }
  15. } catch (Throwable var7) {
  16. throw new RemotingException(this, "Failed to send message " + message + " to " + this.getRemoteAddress() + ", cause: " + var7.getMessage(), var7);
  17. }
  18. if (!success) {
  19. throw new RemotingException(this, "Failed to send message " + message + " to " + this.getRemoteAddress() + "in timeout(" + timeout + "ms) limit");
  20. }
  21. }

服务调用完成。

发表评论

表情:
评论列表 (有 0 条评论,285人围观)

还没有评论,来说两句吧...

相关阅读