dubbo(10) 简单理解一个dubbo服务的完整调用过程

喜欢ヅ旅行 2023-06-20 06:13 213阅读 0赞

参考文献dubbo官网-dubbo服务的完整调用过程

感受学习业内优秀开源分布式框架的底层rpc实现。

调用过程大致可以分为六个阶段,这里只贴出服务调用各个阶段的调用栈进行备忘,详细源码分析请点击原文链接进行阅读

1 服务消费方(dubbo-consumer)发布请求
调用栈

  1. proxy0#sayHello(String)
  2. —> InvokerInvocationHandler#invoke(Object, Method, Object[])
  3. —> MockClusterInvoker#invoke(Invocation)
  4. —> AbstractClusterInvoker#invoke(Invocation)
  5. —> FailoverClusterInvoker#doInvoke(Invocation, List<Invoker<T>>, LoadBalance)
  6. —> Filter#invoke(Invoker, Invocation) // 包含多个 Filter 调用
  7. —> ListenerInvokerWrapper#invoke(Invocation)
  8. —> AbstractInvoker#invoke(Invocation)
  9. —> DubboInvoker#doInvoke(Invocation)
  10. —> ReferenceCountExchangeClient#request(Object, int)
  11. —> HeaderExchangeClient#request(Object, int)
  12. —> HeaderExchangeChannel#request(Object, int)
  13. —> AbstractPeer#send(Object)
  14. —> AbstractClient#send(Object, boolean)
  15. —> NettyChannel#send(Object, boolean)
  16. —> NioClientSocketChannel#write(Object)

2 请求(request)编码
dubbo数据包
在这里插入图片描述
请求头
























































偏移量(Bit) 字段 取值
0 ~ 7 魔数高位 0xda00
8 ~ 15 魔数低位 0xbb
16 数据包类型 0 - Response, 1 - Request
17 调用方式 仅在第16位被设为1的情况下有效,0 - 单向调用,1 - 双向调用
18 事件标识 0 - 当前数据包是请求或响应包,1 - 当前数据包是心跳包
19 ~ 23 序列化器编号 2 - Hessian2Serialization 3 - JavaSerialization 4 - CompactedJavaSerialization 6 - FastJsonSerialization 7 - NativeJavaSerialization 8 - KryoSerialization 9 - FstSerialization
24 ~ 31 状态 20 - OK 30 - CLIENT_TIMEOUT 31 - SERVER_TIMEOUT 40 - BAD_REQUEST 50 - BAD_RESPONSE …
32 ~ 95 请求编号 共8字节,运行时生成
96 ~ 127 消息体长度 运行时计算

调用栈

  1. ExchangeCodec#encode(Channel channel, ChannelBuffer buffer, Object msg)
  2. ->ExchangeCodec#encodeRequest(Channel channel, ChannelBuffer buffer, Request req)
  3. ->DubboCodec#encodeRequestData(Channel channel, ObjectOutput out, Object data, String version)

3 服务提供方(dubbo-provider)解码请求
调用栈

  1. ExchangeCodec#decode(Channel channel, ChannelBuffer buffer)
  2. ->ExchangeCodec#decode(Channel channel, ChannelBuffer buffer, int readable, byte[] header)
  3. ->DubboCodec#decodeBody(Channel channel, InputStream is, byte[] header)
  4. ->DecodeableRpcInvocation#decode

4 服务提供方调用服务
提供方处理请求的线程模型
解码器将数据包解析成 Request 对象后,NettyHandler 的 messageReceived 方法紧接着会收到这个对象,并将这个对象继续向下传递。这期间该对象会被依次传递给 NettyServer、MultiMessageHandler、HeartbeatHandler 以及 AllChannelHandler。最后由 AllChannelHandler 将该对象封装到 Runnable 实现类对象中,并将 Runnable 放入线程池中执行后续的调用逻辑。

调用栈

  1. NettyHandler#messageReceived(ChannelHandlerContext, MessageEvent)
  2. —> AbstractPeer#received(Channel, Object)
  3. —> MultiMessageHandler#received(Channel, Object)
  4. —> HeartbeatHandler#received(Channel, Object)
  5. —> AllChannelHandler#received(Channel, Object)
  6. —> ExecutorService#execute(Runnable) // 由线程池执行后续的调用逻辑

在不同的子线程里进行实际的服务调用,整个调用栈如下

  1. ChannelEventRunnable#run()
  2. —> DecodeHandler#received(Channel, Object)
  3. —> HeaderExchangeHandler#received(Channel, Object)
  4. —> HeaderExchangeHandler#handleRequest(ExchangeChannel, Request)
  5. —> DubboProtocol.requestHandler#reply(ExchangeChannel, Object)
  6. —> Filter#invoke(Invoker, Invocation)
  7. —> AbstractProxyInvoker#invoke(Invocation)
  8. —> Wrapper0#invokeMethod(Object, String, Class[], Object[])
  9. —> DemoServiceImpl#sayHello(String)

5 服务提供方返回调用结果(response)
服务提供方调用指定服务后,会将调用结果封装到 Response 对象中,并将该对象返回给服务消费方。服务提供方也是通过 NettyChannel 的 send 方法将 Response 对象返回。

调用栈

  1. ExchangeCodec#(Channel channel, ChannelBuffer buffer, Object msg)
  2. ->ExchangeCodec#encodeResponse(Channel channel, ChannelBuffer buffer, Response res)
  3. ->DubboCodec#encodeResponseData(Channel channel, ObjectOutput out, Object data, String version)

6 服务消费方接收调用结果
服务消费方在收到响应数据后,首先要做的事情是对响应数据进行解码,得到 Response 对象。然后再将该对象传递给下一个入站处理器,这个入站处理器就是 NettyHandler。接下来 NettyHandler 会将这个对象继续向下传递,最后 AllChannelHandler 的 received 方法会收到这个对象,并将这个对象派发到线程池中。

响应数据解码-调用栈

  1. DubboCodec#decodeBody(Channel channel, InputStream is, byte[] header)
  2. ->DecodeableRpcResult#DecodeableRpcResult(Channel channel, Response response, InputStream is, Invocation invocation, byte id)
  3. ->DecodeableRpcResult#decode()
  4. ->DecodeableRpcResult#decode(Channel channel, InputStream input)

解码接收到的数据的线程与rpc请求最初发起的线程必定不是同一个线程,所以最后要解决的问题就是如何将调用结果传递给用户线程。dubbo设计了一个类似于 java.util.concurrent.Future的ResponseFuture(具体实现类为DefaultFuture)。使用ReentrantLock进行线程通讯。在用户线程发起请求时会调用condition.awit()对用户线程进行阻塞,在接收到响应结果后反序列化并塞入DefaultFuture的response字段,此时调用condition.signal()唤醒用户线程,用户线程便可拿到序列化后的结果。更具体的实现推荐直接阅读源码,这里只给出调用栈。

序列化结果传递-调用栈

  1. HeaderExchangeHandler#received(Channel channel, Object message)
  2. ->HeaderExchangeHandler#handleResponse(Channel channel, Response response)
  3. ->DefaultFuture#received(Channel channel, Response response)
  4. ->DefaultFuture#doReceived(Response res)

一般情况下,服务消费方会并发调用多个服务,每个用户线程发送请求后,会调用不同 DefaultFuture 对象的 get 方法进行等待。 一段时间后,服务消费方的线程池会收到多个响应对象。这个时候要考虑一个问题,如何将每个响应对象传递给相应的 DefaultFuture 对象,且不出错。答案是通过调用编号。DefaultFuture 被创建时,会要求传入一个 Request 对象。此时 DefaultFuture 可从 Request 对象中获取调用编号,并将 <调用编号, DefaultFuture 对象> 映射关系存入到静态 Map 中,即 FUTURES。线程池中的线程在收到 Response 对象后,会根据 Response 对象中的调用编号到 FUTURES 集合中取出相应的 DefaultFuture 对象,然后再将 Response 对象设置到 DefaultFuture 对象中。最后再唤醒用户线程,这样用户线程即可从 DefaultFuture 对象中获取调用结果了。整个过程大致如下图:
在这里插入图片描述

发表评论

表情:
评论列表 (有 0 条评论,213人围观)

还没有评论,来说两句吧...

相关阅读

    相关 dubbo框架调用关系简单理解

    前言:因为项目有用的dubbo框架,所以去官方文档了解了一下,对于整体架构有了点浅显的理解,记录一下 个人理解(看图理解): 把服务提供者当作一个马戏团,把注册中心当作