dubbo源码——核心概念梳理

矫情吗;* 2022-05-22 10:20 339阅读 0赞
  1. 1.proxyFactory:就是为了获取一个接口的代理类,例如获取一个远程接口的代理。
  2. 它有2个方法,代表2个作用
  3. a.getInvoker:针对server端,将服务对象,如DemoServiceImpl包装成一个Invoker对象。
  4. b.getProxy :针对client端,创建接口的代理对象,例如DemoService的接口。
  5. 2.Wrapper:它类似springBeanWrapper,它就是包装了一个接口或一个类,可以通过wrapper对实例对象进行赋值 取值以及制定方法的调用。
  6. 3.Invoker:它是一个可执行的对象,能够根据方法的名称、参数得到相应的执行结果。
  7. 它里面有一个很重要的方法 Result invoke(Invocation invocation),
  8. Invocation是包含了需要执行的方法和参数等重要信息,目前它只有2个实现类RpcInvocation MockInvocation
  9. 它有3种类型的Invoker
  10. 1.本地执行类的Invoker
  11. server端:要执行 demoService.sayHello,就通过InjvmExporter来进行反射执行demoService.sayHello就可以了。
  12. 2.远程通信类的Invoker
  13. client端:要执行 demoService.sayHello,它封装了DubboInvoker进行远程通信,发送要执行的接口给server端。
  14. server端:采用了AbstractProxyInvoker执行了DemoServiceImpl.sayHello,然后将执行结果返回发送给client.
  15. 3.多个远程通信执行类的Invoker聚合成集群版的Invoker
  16. client端:要执行 demoService.sayHello,就要通过AbstractClusterInvoker来进行负载均衡,DubboInvoker进行远程通信,发送要执行的接口给server端。
  17. server端:采用了AbstractProxyInvoker执行了DemoServiceImpl.sayHello,然后将执行结果返回发送给client.
  18. 4.Protocol
  19. 1.export:暴露远程服务(用于服务端),就是将proxyFactory.getInvoker创建的代理类 invoker对象,通过协议暴露给外部。
  20. 2.refer:引用远程服务(用于客户端), 通过proxyFactory.getProxy来创建远程的动态代理类,例如DemoService的远程动态接口。
  21. 5.exporter:维护invoder的生命周期。
  22. 6.exchanger:信息交换层,封装请求响应模式,同步转异步。
  23. 7.transporter:网络传输层,用来抽象nettymina的统一接口。
  24. 8.Directory:目录服务
  25. StaticDirectory:静态目录服务,他的Invoker是固定的。
  26. RegistryDirectory:注册目录服务,他的Invoker集合数据来源于zk注册中心的,他实现了NotifyListener接口,并且实现回调notify(List<URL> urls),
  27. 整个过程有一个重要的map变量,methodInvokerMap(它是数据的来源;同时也是notify的重要操作对象,重点是写操作。)

dubbo-spi:

  1. 为什么要设计adaptive?注解在类上和注解在方法上的区别?
  2. adaptive设计的目的是为了识别固定已知类和扩展未知类。
  3. 1.注解在类上:代表人工实现,实现一个装饰类(设计模式中的装饰模式),它主要作用于固定已知类,
  4. 目前整个系统只有2个,AdaptiveCompilerAdaptiveExtensionFactory
  5. a.为什么AdaptiveCompiler这个类是固定已知的?因为整个框架仅支持JavassistJdkCompiler
  6. b.为什么AdaptiveExtensionFactory这个类是固定已知的?因为整个框架仅支持2objFactory,一个是spi,另一个是spring
  7. 2.注解在方法上:代表自动生成和编译一个动态的Adpative类,它主要是用于SPI,因为spi的类是不固定、未知的扩展类,所以设计了动态$Adaptive类.
  8. 例如 Protocolspi类有 injvm dubbo registry filter listener等等 很多扩展未知类,
  9. 它设计了Protocol$Adaptive的类,通过ExtensionLoader.getExtensionLoader(Protocol.class).getExtension(spi类);来提取对象
  10. 为什么dubbo要自己设计一套SPI
  11. 这是原始JDK spi的代码
  12. ServiceLoader<Command> serviceLoader=ServiceLoader.load(Command.class);
  13. for(Command command:serviceLoader){
  14. command.execute();
  15. }
  16. dubbo在原来的基础上设计了以下功能
  17. 1.原始JDK spi不支持缓存;dubbo设计了缓存对象:spikeyvalue 缓存在 cachedInstances对象里面,它是一个ConcurrentMap
  18. 2.原始JDK spi不支持默认值,dubbo设计默认值:@SPI("dubbo") 代表默认的spi对象,例如Protocol@SPI("dubbo")就是 DubboProtocol
  19. 通过 ExtensionLoader.getExtensionLoader(Protocol.class).getDefaultExtension()那默认对象
  20. 3.jdk要用for循环判断对象,dubbo设计getExtension灵活方便,动态获取spi对象,
  21. 例如 ExtensionLoader.getExtensionLoader(Protocol.class).getExtension(spikey)来提取对象
  22. 4.原始JDK spi不支持 AOP功能,dubbo设计增加了AOP功能,在cachedWrapperClasses,在原始spi类,包装了XxxxFilterWrapper XxxxListenerWrapper
  23. 5.原始JDK spi不支持 IOC功能,dubbo设计增加了IOC,通过构造函数注入,代码为:wrapperClass.getConstructor(type).newInstance(instance),
  24. dubbo spi 的目的:获取一个指定实现类的对象。
  25. 途径:ExtensionLoader.getExtension(String name)
  26. 实现路径:
  27. getExtensionLoader(Class<T> type) 就是为该接口new 一个ExtensionLoader,然后缓存起来。
  28. getAdaptiveExtension() 获取一个扩展类,如果@Adaptive注解在类上就是一个装饰类;如果注解在方法上就是一个动态代理类,例如Protocol$Adaptive对象。
  29. getExtension(String name) 获取一个指定对象。
  30. -----------------------ExtensionLoader.getExtensionLoader(Class<T> type)
  31. ExtensionLoader.getExtensionLoader(Container.class)
  32. -->this.type = type;
  33. -->objectFactory = (type == ExtensionFactory.class ? null : ExtensionLoader.getExtensionLoader(ExtensionFactory.class).getAdaptiveExtension());
  34. -->ExtensionLoader.getExtensionLoader(ExtensionFactory.class).getAdaptiveExtension()
  35. -->this.type = type;
  36. -->objectFactory =null;
  37. 执行以上代码完成了2个属性的初始化
  38. 1.每个一个ExtensionLoader都包含了2个值 type objectFactory
  39. Class<?> type//构造器 初始化时要得到的接口名
  40. ExtensionFactory objectFactory//构造器 初始化时 AdaptiveExtensionFactory[SpiExtensionFactory,SpringExtensionFactory]
  41. 2.new 一个ExtensionLoader 存储在ConcurrentMap<Class<?>, ExtensionLoader<?>> EXTENSION_LOADERS
  42. 关于这个objectFactory的一些细节:
  43. 1.objectFactory就是ExtensionFactory,它也是通过ExtensionLoader.getExtensionLoader(ExtensionFactory.class)来实现的,但是它的objectFactory=null
  44. 2.objectFactory作用,它就是为dubboIOC提供所有对象。
  45. -----------------------getAdaptiveExtension()
  46. -->getAdaptiveExtension()//为cachedAdaptiveInstance赋值
  47. -->createAdaptiveExtension()
  48. -->getAdaptiveExtensionClass()
  49. -->getExtensionClasses()//为cachedClasses 赋值
  50. -->loadExtensionClasses()
  51. -->loadFile
  52. -->createAdaptiveExtensionClass()//自动生成和编译一个动态的adpative类,这个类是一个代理类
  53. -->ExtensionLoader.getExtensionLoader(com.alibaba.dubbo.common.compiler.Compiler.class).getAdaptiveExtension()
  54. -->compiler.compile(code, classLoader)
  55. -->injectExtension()//作用:进入IOC的反转控制模式,实现了动态入注
  56. 关于loadfile的一些细节
  57. 目的:通过把配置文件META-INF/dubbo/internal/com.alibaba.dubbo.rpc.Protocol的内容,存储在缓存变量里面。
  58. cachedAdaptiveClass//如果这个class含有adative注解就赋值,例如ExtensionFactory,而例如Protocol在这个环节是没有的。
  59. cachedWrapperClasses//只有当该class无adative注解,并且构造函数包含目标接口(type)类型,
  60. 例如protocol里面的spi就只有ProtocolFilterWrapperProtocolListenerWrapper能命中
  61. cachedActivates//剩下的类,包含Activate注解
  62. cachedNames//剩下的类就存储在这里。
  63. -----------------------getExtension(String name)
  64. getExtension(String name) //指定对象缓存在cachedInstances;get出来的对象wrapper对象,例如protocol就是ProtocolFilterWrapper和ProtocolListenerWrapper其中一个。
  65. -->createExtension(String name)
  66. -->getExtensionClasses()
  67. -->injectExtension(T instance)//dubbo的IOC反转控制,就是从spi和spring里面提取对象赋值。
  68. -->objectFactory.getExtension(pt, property)
  69. -->SpiExtensionFactory.getExtension(type, name)
  70. -->ExtensionLoader.getExtensionLoader(type)
  71. -->loader.getAdaptiveExtension()
  72. -->SpringExtensionFactory.getExtension(type, name)
  73. -->context.getBean(name)
  74. -->injectExtension((T) wrapperClass.getConstructor(type).newInstance(instance))//AOP的简单设计

服务发布原理:

  1. 服务发布-原理
  2. 第一个发布的动作:暴露本地服务
  3. Export dubbo service com.alibaba.dubbo.demo.DemoService to local registry, dubbo version: 2.0.0, current host: 127.0.0.1
  4. 第二个发布动作:暴露远程服务
  5. Export dubbo service com.alibaba.dubbo.demo.DemoService to url dubbo://192.168.100.38:20880/com.alibaba.dubbo.demo.DemoService?anyhost=true&application=demo-provider&dubbo=2.0.0&generic=false&interface=com.alibaba.dubbo.demo.DemoService&loadbalance=roundrobin&methods=sayHello&owner=william&pid=8484&side=provider&timestamp=1473908495465, dubbo version: 2.0.0, current host: 127.0.0.1
  6. Register dubbo service com.alibaba.dubbo.demo.DemoService url dubbo://192.168.100.38:20880/com.alibaba.dubbo.demo.DemoService?anyhost=true&application=demo-provider&dubbo=2.0.0&generic=false&interface=com.alibaba.dubbo.demo.DemoService&loadbalance=roundrobin&methods=sayHello&monitor=dubbo%3A%2F%2F192.168.48.117%3A2181%2Fcom.alibaba.dubbo.registry.RegistryService%3Fapplication%3Ddemo-provider%26backup%3D192.168.48.120%3A2181%2C192.168.48.123%3A2181%26dubbo%3D2.0.0%26owner%3Dwilliam%26pid%3D8484%26protocol%3Dregistry%26refer%3Ddubbo%253D2.0.0%2526interface%253Dcom.alibaba.dubbo.monitor.MonitorService%2526pid%253D8484%2526timestamp%253D1473908495729%26registry%3Dzookeeper%26timestamp%3D1473908495398&owner=william&pid=8484&side=provider&timestamp=1473908495465 to registry registry://192.168.48.117:2181/com.alibaba.dubbo.registry.RegistryService?application=demo-provider&backup=192.168.48.120:2181,192.168.48.123:2181&dubbo=2.0.0&owner=william&pid=8484&registry=zookeeper&timestamp=1473908495398, dubbo version: 2.0.0, current host: 127.0.0.1
  7. 第三个发布动作:启动netty
  8. Start NettyServer bind /0.0.0.0:20880, export /192.168.100.38:20880, dubbo version: 2.0.0, current host: 127.0.0.1
  9. 第四个发布动作:打开连接zk
  10. INFO zookeeper.ClientCnxn: Opening socket connection to server /192.168.48.117:2181
  11. 第五个发布动作:到zk注册
  12. Register: dubbo://192.168.100.38:20880/com.alibaba.dubbo.demo.DemoService?anyhost=true&application=demo-provider&dubbo=2.0.0&generic=false&interface=com.alibaba.dubbo.demo.DemoService&loadbalance=roundrobin&methods=sayHello&owner=william&pid=8484&side=provider&timestamp=1473908495465, dubbo version: 2.0.0, current host: 127.0.0.1
  13. 第六个发布动作;监听zk
  14. Subscribe: provider://192.168.100.38:20880/com.alibaba.dubbo.demo.DemoService?anyhost=true&application=demo-provider&category=configurators&check=false&dubbo=2.0.0&generic=false&interface=com.alibaba.dubbo.demo.DemoService&loadbalance=roundrobin&methods=sayHello&owner=william&pid=8484&side=provider&timestamp=1473908495465, dubbo version: 2.0.0, current host: 127.0.0.1
  15. Notify urls for subscribe url provider://192.168.100.38:20880/com.alibaba.dubbo.demo.DemoService?anyhost=true&application=demo-provider&category=configurators&check=false&dubbo=2.0.0&generic=false&interface=com.alibaba.dubbo.demo.DemoService&loadbalance=roundrobin&methods=sayHello&owner=william&pid=8484&side=provider&timestamp=1473908495465, urls: [empty://192.168.100.38:20880/com.alibaba.dubbo.demo.DemoService?anyhost=true&application=demo-provider&category=configurators&check=false&dubbo=2.0.0&generic=false&interface=com.alibaba.dubbo.demo.DemoService&loadbalance=roundrobin&methods=sayHello&owner=william&pid=8484&side=provider&timestamp=1473908495465], dubbo version: 2.0.0, current host: 127.0.0.1
  16. 暴露本地服务和暴露远程服务的区别是什么?
  17. 1.暴露本地服务:指暴露在用一个JVM里面,不用通过调用zk来进行远程通信。例如:在同一个服务,自己调用自己的接口,就没必要进行网络IP连接来通信。
  18. 2.暴露远程服务:指暴露给远程客户端的IP和端口号,通过网络来实现通信。
  19. zk持久化节点 和临时节点有什么区别?
  20. 持久化节点:一旦被创建,触发主动删除掉,否则就一直存储在ZK里面。
  21. 临时节点:与客户端会话绑定,一旦客户端会话失效,这个客户端端所创建的所有临时节点都会被删除。
  22. ServiceBean.onApplicationEvent
  23. -->export()
  24. -->ServiceConfig.export()
  25. -->doExport()
  26. -->doExportUrls()//里面有一个for循环,代表了一个服务可以有多个通信协议,例如 tcp协议 http协议,默认是tcp协议
  27. -->loadRegistries(true)//从dubbo.properties里面组装registry的url信息
  28. -->doExportUrlsFor1Protocol(ProtocolConfig protocolConfig, List<URL> registryURLs)
  29. //配置不是remote的情况下做本地暴露 (配置为remote,则表示只暴露远程服务)
  30. -->exportLocal(URL url)
  31. -->proxyFactory.getInvoker(ref, (Class) interfaceClass, local)
  32. -->ExtensionLoader.getExtensionLoader(com.alibaba.dubbo.rpc.ProxyFactory.class).getExtension("javassist");
  33. -->extension.getInvoker(arg0, arg1, arg2)
  34. -->StubProxyFactoryWrapper.getInvoker(T proxy, Class<T> type, URL url)
  35. -->proxyFactory.getInvoker(proxy, type, url)
  36. -->JavassistProxyFactory.getInvoker(T proxy, Class<T> type, URL url)
  37. -->Wrapper.getWrapper(com.alibaba.dubbo.demo.provider.DemoServiceImpl)
  38. -->makeWrapper(Class<?> c)
  39. -->return new AbstractProxyInvoker<T>(proxy, type, url)
  40. -->protocol.export
  41. -->Protocol$Adpative.export
  42. -->ExtensionLoader.getExtensionLoader(com.alibaba.dubbo.rpc.Protocol.class).getExtension("injvm");
  43. -->extension.export(arg0)
  44. -->ProtocolFilterWrapper.export
  45. -->buildInvokerChain //创建8个filter
  46. -->ProtocolListenerWrapper.export
  47. -->InjvmProtocol.export
  48. -->return new InjvmExporter<T>(invoker, invoker.getUrl().getServiceKey(), exporterMap)
  49. -->目的:exporterMap.put(key, this)//key=com.alibaba.dubbo.demo.DemoService, this=InjvmExporter
  50. //如果配置不是local则暴露为远程服务.(配置为local,则表示只暴露本地服务)
  51. -->proxyFactory.getInvoker//原理和本地暴露一样都是为了获取一个Invoker对象
  52. -->protocol.export(invoker)
  53. -->Protocol$Adpative.export
  54. -->ExtensionLoader.getExtensionLoader(com.alibaba.dubbo.rpc.Protocol.class).getExtension("registry");
  55. -->extension.export(arg0)
  56. -->ProtocolFilterWrapper.export
  57. -->ProtocolListenerWrapper.export
  58. -->RegistryProtocol.export
  59. -->doLocalExport(originInvoker)
  60. -->getCacheKey(originInvoker);//读取 dubbo://192.168.100.51:20880/
  61. -->rotocol.export
  62. -->Protocol$Adpative.export
  63. -->ExtensionLoader.getExtensionLoader(com.alibaba.dubbo.rpc.Protocol.class).getExtension("dubbo");
  64. -->extension.export(arg0)
  65. -->ProtocolFilterWrapper.export
  66. -->buildInvokerChain//创建8个filter
  67. -->ProtocolListenerWrapper.export
  68. ---------1.netty服务暴露的开始------- -->DubboProtocol.export
  69. -->serviceKey(url)//组装key=com.alibaba.dubbo.demo.DemoService:20880
  70. -->目的:exporterMap.put(key, this)//key=com.alibaba.dubbo.demo.DemoService:20880, this=DubboExporter
  71. -->openServer(url)
  72. -->createServer(url)
  73. --------2.信息交换层 exchanger 开始-------------->Exchangers.bind(url, requestHandler)//exchaanger是一个信息交换层
  74. -->getExchanger(url)
  75. -->getExchanger(type)
  76. -->ExtensionLoader.getExtensionLoader(Exchanger.class).getExtension("header")
  77. -->HeaderExchanger.bind
  78. -->Transporters.bind(url, new DecodeHandler(new HeaderExchangeHandler(handler)))
  79. -->new HeaderExchangeHandler(handler)//this.handler = handler
  80. -->new DecodeHandler
  81. -->new AbstractChannelHandlerDelegate//this.handler = handler;
  82. ---------3.网络传输层 transporter--------------------->Transporters.bind
  83. -->getTransporter()
  84. -->ExtensionLoader.getExtensionLoader(Transporter.class).getAdaptiveExtension()
  85. -->Transporter$Adpative.bind
  86. -->ExtensionLoader.getExtensionLoader(com.alibaba.dubbo.remoting.Transporter.class).getExtension("netty");
  87. -->extension.bind(arg0, arg1)
  88. -->NettyTransporter.bind
  89. --new NettyServer(url, listener)
  90. -->AbstractPeer //this.url = url; this.handler = handler;
  91. -->AbstractEndpoint//codec timeout=1000 connectTimeout=3000
  92. -->AbstractServer //bindAddress accepts=0 idleTimeout=600000
  93. ---------4.打开断开,暴露netty服务-------------------------------->doOpen()
  94. -->设置 NioServerSocketChannelFactory boss worker的线程池 线程个数为3
  95. -->设置编解码 hander
  96. -->bootstrap.bind(getBindAddress())
  97. -->new HeaderExchangeServer
  98. -->this.server=NettyServer
  99. -->heartbeat=60000
  100. -->heartbeatTimeout=180000
  101. -->startHeatbeatTimer()//这是一个心跳定时器,采用了线程池,如果断开就心跳重连。
  102. -->getRegistry(originInvoker)//zk 连接
  103. -->registryFactory.getRegistry(registryUrl)
  104. -->ExtensionLoader.getExtensionLoader(RegistryFactory.class).getExtension("zookeeper");
  105. -->extension.getRegistry(arg0)
  106. -->AbstractRegistryFactory.getRegistry//创建一个注册中心,存储在REGISTRIES
  107. -->createRegistry(url)
  108. -->new ZookeeperRegistry(url, zookeeperTransporter)
  109. -->AbstractRegistry
  110. -->loadProperties()//目的:把C:\Users\bobo\.dubbo\dubbo-registry-192.168.48.117.cache
  111. 文件中的内容加载为properties
  112. -->notify(url.getBackupUrls())//不做任何事
  113. -->FailbackRegistry
  114. -->retryExecutor.scheduleWithFixedDelay(new Runnable()//建立线程池,检测并连接注册中心,如果失败了就重连
  115. -->ZookeeperRegistry
  116. -->zookeeperTransporter.connect(url)
  117. -->ZookeeperTransporter$Adpative.connect(url)
  118. -->ExtensionLoader.getExtensionLoader(ZookeeperTransporter.class).getExtension("zkclient");
  119. -->extension.connect(arg0)
  120. -->ZkclientZookeeperTransporter.connect
  121. -->new ZkclientZookeeperClient(url)
  122. -->AbstractZookeeperClient
  123. -->ZkclientZookeeperClient
  124. -->new ZkClient(url.getBackupAddress());//连接ZK
  125. -->client.subscribeStateChanges(new IZkStateListener()//订阅的目标:连接断开,重连
  126. -->zkClient.addStateListener(new StateListener()
  127. -->recover //连接失败 重连
  128. -->registry.register(registedProviderUrl)//创建节点
  129. -->AbstractRegistry.register
  130. -->FailbackRegistry.register
  131. -->doRegister(url)//向zk服务器端发送注册请求
  132. -->ZookeeperRegistry.doRegister
  133. -->zkClient.create
  134. -->AbstractZookeeperClient.create//dubbo/com.alibaba.dubbo.demo.DemoService/providers/
  135. dubbo%3A%2F%2F192.168.100.52%3A20880%2Fcom.alibaba.dubbo.demo.DemoService%3Fanyhost%3Dtrue%26
  136. application%3Ddemo-provider%26dubbo%3D2.0.0%26generic%3Dfalse%26interface%3D
  137. com.alibaba.dubbo.demo.DemoService%26loadbalance%3Droundrobin%26methods%3DsayHello%26owner%3
  138. Dwilliam%26pid%3D2416%26side%3Dprovider%26timestamp%3D1474276306353
  139. -->createEphemeral(path);//临时节点 dubbo%3A%2F%2F192.168.100.52%3A20880%2F.............
  140. -->createPersistent(path);//持久化节点 dubbo/com.alibaba.dubbo.demo.DemoService/providers
  141. -->registry.subscribe//订阅ZK
  142. -->AbstractRegistry.subscribe
  143. -->FailbackRegistry.subscribe
  144. -->doSubscribe(url, listener)// 向服务器端发送订阅请求
  145. -->ZookeeperRegistry.doSubscribe
  146. -->new ChildListener()
  147. -->实现了 childChanged
  148. -->实现并执行 ZookeeperRegistry.this.notify(url, listener, toUrlsWithEmpty(url, parentPath, currentChilds));
  149. //A
  150. -->zkClient.create(path, false);//第一步:先创建持久化节点/dubbo/com.alibaba.dubbo.demo.DemoService/configurators
  151. -->zkClient.addChildListener(path, zkListener)
  152. -->AbstractZookeeperClient.addChildListener
  153. //C
  154. -->createTargetChildListener(path, listener)//第三步:收到订阅后的处理,交给FailbackRegistry.notify处理
  155. -->ZkclientZookeeperClient.createTargetChildListener
  156. -->new IZkChildListener()
  157. -->实现了 handleChildChange //收到订阅后的处理
  158. -->listener.childChanged(parentPath, currentChilds);
  159. -->实现并执行ZookeeperRegistry.this.notify(url, listener, toUrlsWithEmpty(url, parentPath, currentChilds));
  160. -->收到订阅后处理 FailbackRegistry.notify
  161. //B
  162. -->addTargetChildListener(path, targetListener)第二步
  163. -->ZkclientZookeeperClient.addTargetChildListener
  164. -->client.subscribeChildChanges(path, listener)//第二步:启动加入订阅/dubbo/com.alibaba.dubbo.demo.DemoService/configurators
  165. -->notify(url, listener, urls)
  166. -->FailbackRegistry.notify
  167. -->doNotify(url, listener, urls);
  168. -->AbstractRegistry.notify
  169. -->saveProperties(url);//把服务端的注册url信息更新到C:\Users\bobo\.dubbo\dubbo-registry-192.168.48.117.cache
  170. -->registryCacheExecutor.execute(new SaveProperties(version));//采用线程池来处理
  171. -->listener.notify(categoryList)
  172. -->RegistryProtocol.notify
  173. -->RegistryProtocol.this.getProviderUrl(originInvoker)//通过invoker的url 获取 providerUrl的地址

服务响应原理:

  1. NettyHandler.messageReceived
  2. -->AbstractPeer.received
  3. -->MultiMessageHandler.received
  4. -->HeartbeatHandler.received
  5. -->AllChannelHandler.received
  6. -->ChannelEventRunnable.run //线程池 执行线程
  7. -->DecodeHandler.received
  8. -->HeaderExchangeHandler.received
  9. -->handleRequest(exchangeChannel, request)//网络通信接收处理
  10. -->DubboProtocol.reply
  11. -->getInvoker
  12. -->exporterMap.get(serviceKey)//从服务暴露里面提取
  13. -->DubboExporter.getInvoker()//最终得到一个invoker
  14. -------------------------------------------------------------------------扩展点--------------
  15. -->ProtocolFilterWrapper.invoke
  16. -->EchoFilter.invoke
  17. -->ClassLoaderFilter.invoke
  18. -->GenericFilter.invoke
  19. -->TraceFilter.invoke
  20. -->MonitorFilter.invoke
  21. -->TimeoutFilter.invoke
  22. -->ExceptionFilter.invoke
  23. -->InvokerWrapper.invoke
  24. -------------------------------------------------------------------------扩展点--------------
  25. -->AbstractProxyInvoker.invoke
  26. -->JavassistProxyFactory.AbstractProxyInvoker.doInvoke
  27. --> 进入真正执行的实现类 DemoServiceImpl.sayHello
  28. ....................................
  29. -->channel.send(response);//把接收处理的结果,发送回去
  30. -->AbstractPeer.send
  31. -->NettyChannel.send
  32. -->ChannelFuture future = channel.write(message);//数据发回consumer

消费者引用:

  1. ReferenceBean.getObject()
  2. -->ReferenceConfig.get()
  3. -->init()
  4. -->createProxy(map)
  5. -->refprotocol.refer(interfaceClass, urls.get(0))
  6. -->ExtensionLoader.getExtensionLoader(Protocol.class).getExtension("registry");
  7. -->extension.refer(arg0, arg1);
  8. -->ProtocolFilterWrapper.refer
  9. -->RegistryProtocol.refer
  10. -->registryFactory.getRegistry(url)//建立zk的连接,和服务端发布一样(省略代码)
  11. -->doRefer(cluster, registry, type, url)
  12. -->registry.register//创建zk的节点,和服务端发布一样(省略代码)。节点名为:dubbo/com.alibaba.dubbo.demo.DemoService/consumers
  13. -->registry.subscribe//订阅zk的节点,和服务端发布一样(省略代码)。 /dubbo/com.alibaba.dubbo.demo.DemoService/providers,
  14. /dubbo/com.alibaba.dubbo.demo.DemoService/configurators,
  15. /dubbo/com.alibaba.dubbo.demo.DemoService/routers]
  16. -->notify(url, listener, urls);
  17. -->FailbackRegistry.notify
  18. -->doNotify(url, listener, urls);
  19. -->AbstractRegistry.notify
  20. -->saveProperties(url);//把服务端的注册url信息更新到C:\Users\bobo\.dubbo\dubbo-registry-192.168.48.117.cache
  21. -->registryCacheExecutor.execute(new SaveProperties(version));//采用线程池来处理
  22. -->listener.notify(categoryList)
  23. -->RegistryDirectory.notify
  24. -->refreshInvoker(invokerUrls)//刷新缓存中的invoker列表
  25. -->destroyUnusedInvokers(oldUrlInvokerMap,newUrlInvokerMap); // 关闭未使用的Invoker
  26. -->最终目的:刷新Map<String, Invoker<T>> urlInvokerMap 对象
  27. 刷新Map<String, List<Invoker<T>>> methodInvokerMap对象
  28. -->cluster.join(directory)//加入集群路由
  29. -->ExtensionLoader.getExtensionLoader(com.alibaba.dubbo.rpc.cluster.Cluster.class).getExtension("failover");
  30. -->MockClusterWrapper.join
  31. -->this.cluster.join(directory)
  32. -->FailoverCluster.join
  33. -->return new FailoverClusterInvoker<T>(directory)
  34. -->new MockClusterInvoker
  35. -->proxyFactory.getProxy(invoker)//创建服务代理
  36. -->ProxyFactory$Adpative.getProxy
  37. -->ExtensionLoader.getExtensionLoader(com.alibaba.dubbo.rpc.ProxyFactory.class).getExtension("javassist");
  38. -->StubProxyFactoryWrapper.getProxy
  39. -->proxyFactory.getProxy(invoker)
  40. -->AbstractProxyFactory.getProxy
  41. -->getProxy(invoker, interfaces)
  42. -->Proxy.getProxy(interfaces)//目前代理对象interface com.alibaba.dubbo.demo.DemoService, interface com.alibaba.dubbo.rpc.service.EchoService
  43. -->InvokerInvocationHandler// 采用jdk自带的InvocationHandler,创建InvokerInvocationHandler对象。

消费者响应:

  1. demoService.sayHello("world" + i)
  2. -->InvokerInvocationHandler.invoke
  3. -->invoker.invoke
  4. -->RpcInvocation//所有请求参数都会转换为RpcInvocation
  5. -->MockClusterInvoker.invoke //1.进入集群
  6. -->invoker.invoke(invocation)
  7. -->AbstractClusterInvoker.invoke
  8. -->list(invocation)
  9. -->directory.list//2.进入目录查找 从this.methodInvokerMap里面查找一个Invoker
  10. -->AbstractDirectory.list
  11. -->doList(invocation)
  12. -->RegistryDirectory.doList// 从this.methodInvokerMap里面查找一个Invoker
  13. -->router.route //3.进入路由
  14. -->MockInvokersSelector.route
  15. -->getNormalInvokers
  16. -->ExtensionLoader.getExtensionLoader(LoadBalance.class).getExtension("roundrobin")
  17. -->doInvoke
  18. -->FailoverClusterInvoker.doInvoke
  19. -->select//4.进入负载均衡
  20. -->AbstractClusterInvoker.select
  21. -->doselect
  22. -->loadbalance.select
  23. -->AbstractLoadBalance.select
  24. -->doSelect
  25. -->RoundRobinLoadBalance.doSelect
  26. -->invokers.get(currentSequence % length)//取模轮循
  27. -->Result result = invoker.invoke(invocation)
  28. --------------------------------------------------------------------------扩展点----------------
  29. -->InvokerWrapper.invoke
  30. -->ProtocolFilterWrapper.invoke
  31. -->ConsumerContextFilter.invoke
  32. -->ProtocolFilterWrapper.invoke
  33. -->MonitorFilter.invoke
  34. -->ProtocolFilterWrapper.invoke
  35. -->FutureFilter.invoke
  36. -->ListenerInvokerWrapper.invoke
  37. -->AbstractInvoker.invoke
  38. ---------------------------------------------------------------------------扩展点---------------
  39. -->doInvoke(invocation)
  40. -->DubboInvoker.doInvoke//为什么DubboInvoker是个protocol? 因为RegistryDirectory.refreshInvoker.toInvokers: protocol.refer
  41. -->ReferenceCountExchangeClient.request
  42. -->HeaderExchangeClient.request
  43. -->HeaderExchangeChannel.request
  44. -->NettyClient.send
  45. -->AbstractPeer.send
  46. -->NettyChannel.send
  47. -->ChannelFuture future = channel.write(message);//最终的目的:通过netty的channel发送网络数据
  48. //consumer的接收原理
  49. NettyHandler.messageReceived
  50. -->AbstractPeer.received
  51. -->MultiMessageHandler.received
  52. -->HeartbeatHandler.received
  53. -->AllChannelHandler.received
  54. -->ChannelEventRunnable.run //线程池 执行线程
  55. -->DecodeHandler.received
  56. -->HeaderExchangeHandler.received
  57. -->handleResponse(channel, (Response) message);
  58. -->HeaderExchangeHandler.handleResponse
  59. -->DefaultFuture.received
  60. -->DefaultFuture.doReceived
  61. private void doReceived(Response res) {
  62. lock.lock();
  63. try {
  64. response = res;
  65. if (done != null) {
  66. done.signal();
  67. }
  68. } finally {
  69. lock.unlock();
  70. }
  71. if (callback != null) {
  72. invokeCallback(callback);
  73. }
  74. }
  75. 灰度发布例子:
  76. provider 192.168.100.38 192.168.48.32
  77. 1.发布192.168.48.32,切断192.168.48.32访问流量,然后进行服务的发布。
  78. 2.192.168.48.32发布成功后,恢复 192.168.48.32的流量,
  79. 3.切断192.168.100.38,继续发布 192.168.100.38
  80. 2个疑问
  81. 1.启动路由规则,它触发了那些动作?
  82. a.什么时候加入ConditionRouter
  83. b.ConditionRouter是怎么过滤的?
  84. 2.路由规则有哪些实现类?
  85. ConditionRouter:条件路由,后台管理的路由配置都是条件路由。
  86. ScriptRouter:脚本路由

同步-异步:

  1. dubbo 是基于netty NIO的非阻塞 并行调用通信。 (阻塞 非阻塞 异步 同步 区别
  2. dubbo 的通信方式 3类类型:
  3. 1.异步,有返回值
  4. <dubbo:method name="sayHello" async="true"></dubbo:method>
  5. Future<String> temp= RpcContext.getContext().getFuture();
  6. hello=temp.get();
  7. 2.异步,无返回值
  8. <dubbo:method name="sayHello" return="false"></dubbo:method>
  9. 3.异步,变同步(默认的通信方式)
  10. A.当前线程怎么让它 “暂停,等结果回来后,再执行”?
  11. B.socket是一个全双工的通信方式,那么在多线程的情况下,如何知道那个返回结果对应原先那条线程的调用?
  12. 通过一个全局唯一的ID来做consumer provider 来回传输。

编解码:

  1. tcp 为什么会出现粘包 拆包的问题?
  2. 1.消息的定长,例如定1000个字节
  3. 2.就是在包尾增加回车或空格等特殊字符作为切割,典型的FTP协议
  4. 3.将消息分为消息头消息体。例如 dubbo
  5. ----------1------consumer请求编码----------------------
  6. -->NettyCodecAdapter.InternalEncoder.encode
  7. -->DubboCountCodec.encode
  8. -->ExchangeCodec.encode
  9. -->ExchangeCodec.encodeRequest
  10. -->DubboCodec.encodeRequestData
  11. dubbo的消息头是一个定长的 16个字节。
  12. 1-2个字节:是一个魔数数字:就是一个固定的数字
  13. 3个字节:是双向(有去有回) 或单向(有去无回)的标记
  14. 第四个字节:??? request 没有第四个字节)
  15. 5-12个字节:请求idlong8个字节。异步变同步的全局唯一ID,用来做consumerprovider的来回通信标记。
  16. 13-16个字节:消息体的长度,也就是消息头+请求数据的长度。
  17. ----------2------provider 请求解码----------------------
  18. --NettyCodecAdapter.InternalDecoder.messageReceived
  19. -->DubboCountCodec.decode
  20. -->ExchangeCodec.decode
  21. -->ExchangeCodec.decodeBody
  22. ----------3------provider响应结果编码----------------------
  23. -->NettyCodecAdapter.InternalEncoder.encode
  24. -->DubboCountCodec.encode
  25. -->ExchangeCodec.encode
  26. -->ExchangeCodec.encodeResponse
  27. -->DubboCodec.encodeResponseData//先写入一个字节 这个字节可能是RESPONSE_NULL_VALUE RESPONSE_VALUE RESPONSE_WITH_EXCEPTION
  28. dubbo的消息头是一个定长的 16个字节。
  29. 1-2个字节:是一个魔数数字:就是一个固定的数字
  30. 3个字节:序列号组件类型,它用于和客户端约定的序列号编码号
  31. 第四个字节:它是response的结果响应码 例如 OK=20
  32. 5-12个字节:请求idlong8个字节。异步变同步的全局唯一ID,用来做consumerprovider的来回通信标记。
  33. 13-16个字节:消息体的长度,也就是消息头+请求数据的长度。
  34. ----------4------consumer响应结果解码----------------------
  35. --NettyCodecAdapter.InternalDecoder.messageReceived
  36. -->DubboCountCodec.decode
  37. -->ExchangeCodec.decode
  38. -->DubboCodec.decodeBody
  39. -->DecodeableRpcResult.decode//根据RESPONSE_NULL_VALUE RESPONSE_VALUE RESPONSE_WITH_EXCEPTION进行响应的处理

Adaptive动态类模板:

  1. package <扩展点接口所在包>;
  2. public class <扩展点接口名>$Adpative implements <扩展点接口> {
  3. public <有@Adaptive注解的接口方法>(<方法参数>) {
  4. if(是否有URL类型方法参数?) 使用该URL参数
  5. else if(是否有方法类型上有URL属性) 使用该URL属性
  6. # <else 在加载扩展点生成自适应扩展点类时抛异常,即加载扩展点失败!>
  7. if(获取的URL == null) {
  8. throw new IllegalArgumentException("url == null");
  9. }
  10. 根据@Adaptive注解上声明的Key的顺序,从URL获致Value,作为实际扩展点名。
  11. URL没有Value,则使用缺省扩展点实现。如没有扩展点, throw new IllegalStateException("Fail to get extension");
  12. 在扩展点实现调用该方法,并返回结果。
  13. }
  14. public <有@Adaptive注解的接口方法>(<方法参数>) {
  15. throw new UnsupportedOperationException("is not adaptive method!");
  16. }
  17. }

发表评论

表情:
评论列表 (有 0 条评论,339人围观)

还没有评论,来说两句吧...

相关阅读