Dubbo源码分析系列之-深入Dubbo扩展机制

分手后的思念是犯贱 2021-10-15 01:45 445阅读 0赞

导语:
  在之前的博客中分析过Java的SPI机制,其实Dubbo的扩展点加载机制也是从JDK表中的SPI(Service Provider Interface)机制中开发而来,只不过在原生的基础上做了发现机制的增强处理。改进了如下的三个问题

  • JDK的SPI机制会一次性的实例化所有的扩展点,也就是说数据一种饿汉式加载,在初始化的时候消耗比较大,但是有些资源被加载之后可能很少使用,所以就导致了资源的消耗问题。
  • 如果扩展点加载失败,JDK不会获取扩展点名称,导致排查问题效率低下。
  • 增加了Spring的对于扩展点的IOC以及AOP的支持操作。也就是说一个扩展点可以直接通过setter方式注入其他的扩展点。

扩展点配置

  了解完基本的问题之后就来看看关于JavaSPI机制以及Dubbo的SPI机制的一些基本约定,对于JavaSPI来说是将扩展点的配置文件放到META_INF/目录下面并且与扩展接口同名的文件中写入对应的扩展类。而在Dubbo中在META_INF/dubbo/接口全类名。文件的内容是k-v的形式,配置名=扩展类全类名,多个实现类之间使用的是换行符分隔,也就是文件中所有的配置形式都是key-value的形式。例如对于Dubbo协议的扩展。
在这里插入图片描述

  1. dubbo=org.apache.dubbo.rpc.protocol.dubbo.DubboProtocol

在配置文件中会指定对应的配置标签。

  1. <beans xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xmlns:dubbo="http://dubbo.apache.org/schema/dubbo" xmlns="http://www.springframework.org/schema/beans" xsi:schemaLocation="http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans-4.3.xsd http://dubbo.apache.org/schema/dubbo http://dubbo.apache.org/schema/dubbo/dubbo.xsd">
  2. <dubbo:application name="demo-provider"/>
  3. <dubbo:registry address="zookeeper://127.0.0.1:2181"/>
  4. <dubbo:protocol name="dubbo"/>
  5. <bean id="demoService" class="org.apache.dubbo.demo.provider.DemoServiceImpl"/>
  6. <dubbo:service interface="org.apache.dubbo.demo.DemoService" ref="demoService"/>
  7. </beans>

扩展点特性

扩展点包装

  在对于扩展点的使用的时候首先要对其进行包装,将其包装成一个比较完美的类。这就用到了装饰者模式,对于扩展点进行自动包装。Dubbo对于这些类的包装都是通过xxxWrapper中来进行包装。在ExtensionLoader在加载扩展点时,如果加载到扩展点有拷贝构造函数,则判断为扩展点的Wrapper,这个地方使用到了原型模式,从一个原型复制很多的克隆实例。这些设计模式在后期详细分析源码的时候都会提及到。这里就拿Protocol的包装类来说明。

  1. public class ProtocolFilterWrapper implements Protocol {
  2. private final Protocol protocol;
  3. public ProtocolFilterWrapper(Protocol protocol) {
  4. if (protocol == null) {
  5. throw new IllegalArgumentException("protocol == null");
  6. }
  7. this.protocol = protocol;
  8. }
  9. private static <T> Invoker<T> buildInvokerChain(final Invoker<T> invoker, String key, String group) {
  10. Invoker<T> last = invoker;
  11. List<Filter> filters = ExtensionLoader.getExtensionLoader(Filter.class).getActivateExtension(invoker.getUrl(), key, group);
  12. if (!filters.isEmpty()) {
  13. for (int i = filters.size() - 1; i >= 0; i--) {
  14. final Filter filter = filters.get(i);
  15. final Invoker<T> next = last;
  16. last = new Invoker<T>() {
  17. @Override
  18. public Class<T> getInterface() {
  19. return invoker.getInterface();
  20. }
  21. @Override
  22. public URL getUrl() {
  23. return invoker.getUrl();
  24. }
  25. @Override
  26. public boolean isAvailable() {
  27. return invoker.isAvailable();
  28. }
  29. @Override
  30. public Result invoke(Invocation invocation) throws RpcException {
  31. Result asyncResult;
  32. try {
  33. asyncResult = filter.invoke(next, invocation);
  34. } catch (Exception e) {
  35. // onError callback
  36. if (filter instanceof ListenableFilter) {
  37. Filter.Listener listener = ((ListenableFilter) filter).listener();
  38. if (listener != null) {
  39. listener.onError(e, invoker, invocation);
  40. }
  41. }
  42. throw e;
  43. }
  44. return asyncResult;
  45. }
  46. @Override
  47. public void destroy() {
  48. invoker.destroy();
  49. }
  50. @Override
  51. public String toString() {
  52. return invoker.toString();
  53. }
  54. };
  55. }
  56. }
  57. return new CallbackRegistrationInvoker<>(last, filters);
  58. }
  59. @Override
  60. public int getDefaultPort() {
  61. return protocol.getDefaultPort();
  62. }
  63. @Override
  64. public <T> Exporter<T> export(Invoker<T> invoker) throws RpcException {
  65. if (REGISTRY_PROTOCOL.equals(invoker.getUrl().getProtocol())) {
  66. return protocol.export(invoker);
  67. }
  68. return protocol.export(buildInvokerChain(invoker, SERVICE_FILTER_KEY, CommonConstants.PROVIDER));
  69. }
  70. @Override
  71. public <T> Invoker<T> refer(Class<T> type, URL url) throws RpcException {
  72. if (REGISTRY_PROTOCOL.equals(url.getProtocol())) {
  73. return protocol.refer(type, url);
  74. }
  75. return buildInvokerChain(protocol.refer(type, url), REFERENCE_FILTER_KEY, CommonConstants.CONSUMER);
  76. }
  77. @Override
  78. public void destroy() {
  79. protocol.destroy();
  80. }
  81. /** * Register callback for each filter may be better, just like {@link java.util.concurrent.CompletionStage}, each callback * registration generates a new CompletionStage whose status is determined by the original CompletionStage. * * If bridging status between filters is proved to not has significant performance drop, consider revert to the following commit: * https://github.com/apache/dubbo/pull/4127 */
  82. static class CallbackRegistrationInvoker<T> implements Invoker<T> {
  83. private final Invoker<T> filterInvoker;
  84. private final List<Filter> filters;
  85. public CallbackRegistrationInvoker(Invoker<T> filterInvoker, List<Filter> filters) {
  86. this.filterInvoker = filterInvoker;
  87. this.filters = filters;
  88. }
  89. @Override
  90. public Result invoke(Invocation invocation) throws RpcException {
  91. Result asyncResult = filterInvoker.invoke(invocation);
  92. asyncResult = asyncResult.whenCompleteWithContext((r, t) -> {
  93. for (int i = filters.size() - 1; i >= 0; i--) {
  94. Filter filter = filters.get(i);
  95. // onResponse callback
  96. if (filter instanceof ListenableFilter) {
  97. Filter.Listener listener = ((ListenableFilter) filter).listener();
  98. if (listener != null) {
  99. if (t == null) {
  100. listener.onResponse(r, filterInvoker, invocation);
  101. } else {
  102. listener.onError(t, filterInvoker, invocation);
  103. }
  104. }
  105. } else {
  106. filter.onResponse(r, filterInvoker, invocation);
  107. }
  108. }
  109. });
  110. return asyncResult;
  111. }
  112. @Override
  113. public Class<T> getInterface() {
  114. return filterInvoker.getInterface();
  115. }
  116. @Override
  117. public URL getUrl() {
  118. return filterInvoker.getUrl();
  119. }
  120. @Override
  121. public boolean isAvailable() {
  122. return filterInvoker.isAvailable();
  123. }
  124. @Override
  125. public void destroy() {
  126. filterInvoker.destroy();
  127. }
  128. }
  129. }

  通过代码我们会发现Wrapper类同样实现了扩展点接口,但是Wrapper并不是真正的实现。它的主要作用是用于从Extensionloader返回扩展点的时候,对真正的扩展点进行包装。也就是在其中buildInvokerChain()所实现的内容,也就是说从ExtensionLoader中返回的实际上是Wrapper类的实例,Wrapper持有实际的扩展点实现类。
  当然除了上面所展示的ProtocolFilterWrapper类还有其他的Wrapper,也就是说扩展点的包装类可以有多个,也可以根据需要继续增加,通过Wrapper类可以把所有扩展点公共逻辑移植到Wrapper中,新增加的Wrapper在所有的扩展点上添加了逻辑,有点像是AOP,Wrapper实际上是对扩展点的代理。

扩展点自动装配

  在前面提到了一个装饰者模式,在这里扩展点自动装配所使用的就是装配者模式。加载扩展点时会自动注入依赖的扩展点,加载扩展点的时候扩展点实现类的成员如果为其他扩展点类型ExtensionLoader会进行自动注入依赖的扩展点。ExtensionLoader通过扫描扩展点实现类的所有setter方法来判断其成员类型。也就是说ExtensionLoader会执行扩展点的拼装操作。

例如我们现在有个汽车工厂,其中有两个制造车间
制造轮子

  1. public interface MarkerWheel{
  2. Wheel makeWheel();
  3. }

制造引擎

  1. public interface MarkerEngine{
  2. Engine makerEngine();
  3. }

MarkerWheel 的实现类

  1. public class RealMarkerWheel implements MarkerWheel{
  2. MarkerWheel markerWheel;
  3. public setWheelMaker(MarkerWheel markerWheel){
  4. this.markerWheel = markerWheel;
  5. }
  6. public Car makeCar(){
  7. Wheel wheel = markerWheel.makeWheel();
  8. return new CarFactory(wheel,......)
  9. }
  10. }

  当ExtensionLoader加载的Car的扩展点实现的时候,调用setWheelMaker方法如果MarkerWheel也是扩展点则会注入MarkerWheel的实现并且实现装配。
  在这里所带来的一个问题就是ExtensionLoader要注入依赖扩展点的时候,如果决定使用哪个依赖扩展点的实现,也就是说如果有高中低三种类型的轮胎,在组装汽车的时候应该使用哪一种轮胎进行组装。

扩展点自适应

  首先ExtensionLoader注入的依赖扩展点是一个Adaptive实例,直到扩展点执行的时候才会决定调用哪个实现。Dubbo使用URL对象或者使用Key-Value 的方式进行传递参数的配置信息。扩展点方法调用都会有URL参数或者是类似的操作成员。
  这样依赖的扩展点可以从URL中获取到配置的信息,所有的扩展点配置好自己的key后,配置信息从URL上从最外层传入,URL在配置传递的过程中是一条总线的服务。

例如我们现在有个汽车工厂,其中有两个制造车间
制造轮子

  1. public interface MarkerWheel{
  2. Wheel makeWheel(URL url);
  3. }

制造引擎

  1. public interface MarkerEngine{
  2. Engine makerEngine(URL url);
  3. }

MarkerWheel 的实现类

  1. public class RealMarkerWheel implements MarkerWheel{
  2. MarkerWheel markerWheel;
  3. public setWheelMaker(MarkerWheel markerWheel){
  4. this.markerWheel = markerWheel;
  5. }
  6. public Car makeCar(URL url){
  7. Wheel wheel = markerWheel.makeWheel(url);
  8. return new CarFactory(wheel,......)
  9. }
  10. }

当执行Wheel wheel = markerWheel.makeWheel(url);方法的时候,注入的Adaptive实例可以提取约定Key来决定使用哪个MarkerWheel实现来调用对应实现真正的markerWheel.makeWheel()方法,例如上面提到的高中低三个层次的轮子。对于Adaptive的实现逻辑来说是固定的。指定提取的就是URL的key,也就是可以代理在真实的实现类上,可以动态生成。

  在Dubbo的ExtensionLoader的扩展点对应的Adaptive实现是在加载扩展点里动态生成,指定提取的URL的Key通过@Adaptive注解在方法上提供。例如在Dubbo中的Transport扩展点的代码

  1. @SPI("netty")
  2. public interface Transporter {
  3. /** * Bind a server. * * @param url server url * @param handler * @return server * @throws RemotingException * @see org.apache.dubbo.remoting.Transporters#bind(URL, ChannelHandler...) */
  4. @Adaptive({ Constants.SERVER_KEY, Constants.TRANSPORTER_KEY})
  5. Server bind(URL url, ChannelHandler handler) throws RemotingException;
  6. /** * Connect to a server. * * @param url server url * @param handler * @return client * @throws RemotingException * @see org.apache.dubbo.remoting.Transporters#connect(URL, ChannelHandler...) */
  7. @Adaptive({ Constants.CLIENT_KEY, Constants.TRANSPORTER_KEY})
  8. Client connect(URL url, ChannelHandler handler) throws RemotingException;
  9. }

  对于bind()方法,Adaptive实现先查找server Key,如果没有找到再找transport Key,来决定代理到哪个扩展点点。对于connect()方法也是类似。

扩展点自动激活

  在对扩展点的自动激活上,会看到Dubbo上有另外的注解@Activate,例如集合扩展点,Filter、InvokerListener、ExportListener、TelnetHandler、StatusChecker等等,同时可以加载多个实现类。这个时候就可以通过提供的自动激活来简化配置。例如

  1. @Activate(group = PROVIDER, value = ACCESS_LOG_KEY)
  2. public class AccessLogFilter implements Filter {
  3. private static final Logger logger = LoggerFactory.getLogger(AccessLogFilter.class);
  4. private static final String LOG_KEY = "dubbo.accesslog";

  当然在使用@Activate注解的时候也可以不指定值,也可以指定单个值。

注意

  • 1、在使用SPI配置的时候,所配置的META_INF/目录是放在开发者的jar内,而不是dubbo本身的jar包内,Dubbo会全Classpath扫描所有的Jar包内的同名这个文件并进行合并操作。
  • 2、扩展点使用的是单一实例加载(也就是在扩展的时候需要保证线程安全),缓存在ExtensionLoader中。

总结

  这篇博客结合源码以及官网的文档对于Dubbo的扩展机制进行深入的说明,提到了在Dubbo中的三个比较重要的注解,@SPI、@Adaptive、@Activate三个注解。在实际操作的时候也提到了它是对JavaSPI机制的扩展解决了JavaSPI机制存在的三个问题。基于Spring容器对于扩展点提供了IOC和AOP的功能。利用到了装配者模式,原型模式、代理模式以及装饰者模式等等。

发表评论

表情:
评论列表 (有 0 条评论,445人围观)

还没有评论,来说两句吧...

相关阅读