Dubbo中Netty使用形式源码解读

秒速五厘米 2022-05-06 10:44 266阅读 0赞

先从Provider的角度讲,后从Consumer的角度讲,最后合起来讲共同和不同点。

每一个Dubbo接口作为Provider时,都会对外暴露Invoker,执行Export方法:

  1. public class DubboProtocol extends AbstractProtocol {
  2. public <T> Exporter<T> export(Invoker<T> invoker) throws RpcException {
  3. URL url = invoker.getUrl();
  4. // 创建 DubboExporter 对象,并添加到 `exporterMap` 。
  5. // export service.
  6. String key = serviceKey(url);
  7. DubboExporter<T> exporter = new DubboExporter<T>(invoker, key, exporterMap);
  8. exporterMap.put(key, exporter);
  9. ......
  10. // 启动服务器
  11. openServer(url);
  12. // 初始化序列化优化器
  13. optimizeSerialization(url);
  14. return exporter;
  15. }
  16. /** * 启动通信服务器 * * @param url URL */
  17. private void openServer(URL url) {
  18. // find server, ip:port
  19. String key = url.getAddress();
  20. //client 也可以暴露一个只有server可以调用的服务。
  21. //client can export a service which's only for server to invoke
  22. boolean isServer = url.getParameter(Constants.IS_SERVER_KEY, true); // isserver
  23. if (isServer) {
  24. ExchangeServer server = serverMap.get(key);
  25. if (server == null) {
  26. serverMap.put(key, createServer(url));
  27. } else {
  28. // server supports reset, use together with override
  29. server.reset(url);
  30. }
  31. }
  32. }
  33. }

在此期间需要实例化Server,多个不同的Provider,如果端口相同,那么会共用一个Server(NettyServer),也就是只实例化一次。

  1. public class NettyServer extends AbstractServer implements Server {
  2. @Override
  3. protected void doOpen() {
  4. // 设置日志工厂
  5. NettyHelper.setNettyLoggerFactory();
  6. // 实例化 ServerBootstrap
  7. bootstrap = new ServerBootstrap();
  8. // 创建线程组
  9. bossGroup = new NioEventLoopGroup(1, new DefaultThreadFactory("NettyServerBoss", true));
  10. workerGroup = new NioEventLoopGroup(getUrl().getPositiveParameter(Constants.IO_THREADS_KEY, Constants.DEFAULT_IO_THREADS),
  11. new DefaultThreadFactory("NettyServerWorker", true));
  12. // 创建 NettyServerHandler 对象
  13. final NettyServerHandler nettyServerHandler = new NettyServerHandler(getUrl(), this);
  14. // 设置 `channels` 属性
  15. channels = nettyServerHandler.getChannels();
  16. bootstrap
  17. // 设置它的线程组
  18. .group(bossGroup, workerGroup)
  19. // 设置 Channel类型
  20. .channel(NioServerSocketChannel.class) // Server
  21. // 设置可选项
  22. .childOption(ChannelOption.TCP_NODELAY, Boolean.TRUE)
  23. .childOption(ChannelOption.SO_REUSEADDR, Boolean.TRUE)
  24. .childOption(ChannelOption.ALLOCATOR, PooledByteBufAllocator.DEFAULT)
  25. // 设置Handler链路
  26. .childHandler(new ChannelInitializer<NioSocketChannel>() {
  27. @Override
  28. protected void initChannel(NioSocketChannel ch) {
  29. // 创建 NettyCodecAdapter 对象
  30. NettyCodecAdapter adapter = new NettyCodecAdapter(getCodec(), getUrl(), NettyServer.this);
  31. ch.pipeline()//.addLast("logging",new LoggingHandler(LogLevel.INFO))//for debug
  32. .addLast("decoder", adapter.getDecoder()) // 解码
  33. .addLast("encoder", adapter.getEncoder()) // 编码
  34. .addLast("handler", nettyServerHandler); // 处理器
  35. }
  36. });
  37. // 服务器绑定端口监听
  38. // bind
  39. ChannelFuture channelFuture = bootstrap.bind(getBindAddress());
  40. channelFuture.syncUninterruptibly();
  41. channel = channelFuture.channel();
  42. }
  43. }

Netty的Reactor模型和ChannelPipeline模型网上很多,这里就不列举了。

上述代码实例化Netty的Reactor模型相关的对象,默认使用NioSocketChannel(Selector)作为数据传输通道,按顺序依次添加日志,解码,编码,业务处理的Handler。

Provider的Server在接收到Consumer(Client)的请求数据后,先通过解码Handler进行以下处理:

  1. 首先通过Dubbo协议的数据格式来对TCP/IP数据进行拆包
  2. 将拆包得到的数据进行反序列化,比如Hession,Json,Thrift,ProtoBuf等,得到Java DTO
  3. 将DTO以及Invoker的相关信息封装为一个Task,投递到执行线程池中,最终调用Dubbo接口
  4. 将业务结果通过序列化工具序列化为可传输的字节数组,然后根据传输协议(Dubbo)组装成相应的数据格式,写入Channel

1.数据拆包及获取传输的Body:

Dubbo协议数据格式

  1. public class ExchangeCodec extends TelnetCodec {
  2. // header length.
  3. protected static final int HEADER_LENGTH = 16;
  4. // magic header.
  5. protected static final short MAGIC = (short) 0xdabb;
  6. protected static final byte MAGIC_HIGH = Bytes.short2bytes(MAGIC)[0];
  7. protected static final byte MAGIC_LOW = Bytes.short2bytes(MAGIC)[1];
  8. public Object decode(Channel channel, ChannelBuffer buffer) throws IOException {
  9. // 读取 Header 数组,Header长度为16字节
  10. int readable = buffer.readableBytes();
  11. byte[] header = new byte[Math.min(readable, HEADER_LENGTH)];
  12. buffer.readBytes(header);
  13. // 解码
  14. return decode(channel, buffer, readable, header);
  15. }
  16. /** * @param channel * @param buffer * @param readable 可读取长度 * @param header 数据头部 * @return * @throws IOException */
  17. @Override
  18. protected Object decode(Channel channel, ChannelBuffer buffer, int readable, byte[] header) throws IOException {
  19. // 非 Dubbo 协议,目前是 Telnet 命令。
  20. // 通过MAGIC判断请求数据是否为Dubbo协议
  21. if (readable > 0 && header[0] != MAGIC_HIGH || readable > 1 && header[1] != MAGIC_LOW) {
  22. // 将 buffer 完全复制到 `header` 数组中。因为,上面的 `#decode(channel, buffer)` 方法,可能未读全
  23. int length = header.length;
  24. if (header.length < readable) {
  25. header = Bytes.copyOf(header, readable);
  26. buffer.readBytes(header, length, readable - length);
  27. }
  28. // 【TODO 8026 】header[i] == MAGIC_HIGH && header[i + 1] == MAGIC_LOW ?
  29. for (int i = 1; i < header.length - 1; i++) {
  30. if (header[i] == MAGIC_HIGH && header[i + 1] == MAGIC_LOW) {
  31. buffer.readerIndex(buffer.readerIndex() - header.length + i);
  32. header = Bytes.copyOf(header, i);
  33. break;
  34. }
  35. }
  36. // 提交给父类( Telnet ) 处理,目前是 Telnet 命令。
  37. return super.decode(channel, buffer, readable, header);
  38. }
  39. // Header 长度不够,返回需要更多的输入
  40. // check length.
  41. if (readable < HEADER_LENGTH) {
  42. return DecodeResult.NEED_MORE_INPUT;
  43. }
  44. // `[96 - 127]`:Body 的**长度**。通过该长度,读取 Body 。
  45. // get data length.
  46. int len = Bytes.bytes2int(header, 12);
  47. checkPayload(channel, len);
  48. // 总长度不够,返回需要更多的输入
  49. int tt = len + HEADER_LENGTH;
  50. if (readable < tt) {
  51. return DecodeResult.NEED_MORE_INPUT;
  52. }
  53. // 解析 Header + Body
  54. // limit input stream.
  55. ChannelBufferInputStream is = new ChannelBufferInputStream(buffer, len);
  56. try {
  57. return decodeBody(channel, is, header);
  58. } finally {
  59. // skip 未读完的流,并打印错误日志
  60. if (is.available() > 0) {
  61. try {
  62. if (logger.isWarnEnabled()) {
  63. logger.warn("Skip input stream " + is.available());
  64. }
  65. StreamUtils.skipUnusedStream(is);
  66. } catch (IOException e) {
  67. logger.warn(e.getMessage(), e);
  68. }
  69. }
  70. }
  71. }
  72. }

每次请求的数据都严格按照指定协议组装,Header+Body。Header16个字节长度,通过Header的数据结构来判断Channel里读取的数据是不是一次请求的完整数据。如果不够,那么返回,等待此Channel的下次数据读取;如果够了,那么将ByteBufer里指定Body长度的数据提交给序列化器反序列化为Java DTO。

2.将Body反序列化为Java DTO:

  1. public class ExchangeCodec extends TelnetCodec {
  2. /** * 解析,返回 Request 或 Response * * @param channel 通道 * @param is 输出 * @param header Header * @return 结果 * @throws IOException 当发生 IO 异常时 */
  3. protected Object decodeBody(Channel channel, InputStream is, byte[] header) throws IOException {
  4. byte flag = header[2], proto = (byte) (flag & SERIALIZATION_MASK);
  5. Serialization s = CodecSupport.getSerialization(channel.getUrl(), proto);
  6. ObjectInput in = s.deserialize(channel.getUrl(), is);
  7. // Response
  8. // get request id.
  9. long id = Bytes.bytes2long(header, 4);
  10. if ((flag & FLAG_REQUEST) == 0) { // Response
  11. // decode response.
  12. Response res = new Response(id);
  13. if ((flag & FLAG_EVENT) != 0) {
  14. res.setEvent(Response.HEARTBEAT_EVENT);
  15. }
  16. // get status.
  17. byte status = header[3];
  18. res.setStatus(status);
  19. if (status == Response.OK) {
  20. try {
  21. Object data;
  22. if (res.isHeartbeat()) {
  23. data = decodeHeartbeatData(channel, in);
  24. } else if (res.isEvent()) {
  25. data = decodeEventData(channel, in);
  26. } else {
  27. data = decodeResponseData(channel, in, getRequestData(id)); // `#getRequestData(id)` 的调用,是多余的
  28. }
  29. res.setResult(data);
  30. } catch (Throwable t) {
  31. res.setStatus(Response.CLIENT_ERROR);
  32. res.setErrorMessage(StringUtils.toString(t));
  33. }
  34. } else {
  35. res.setErrorMessage(in.readUTF());
  36. }
  37. return res;
  38. // Request
  39. } else { // Request
  40. // decode request.
  41. Request req = new Request(id);
  42. req.setVersion("2.0.0");
  43. req.setTwoWay((flag & FLAG_TWOWAY) != 0);
  44. if ((flag & FLAG_EVENT) != 0) { // 心跳事件
  45. req.setEvent(Request.HEARTBEAT_EVENT);
  46. }
  47. try {
  48. Object data;
  49. if (req.isHeartbeat()) {
  50. data = decodeHeartbeatData(channel, in);
  51. } else if (req.isEvent()) {
  52. data = decodeEventData(channel, in);
  53. } else {
  54. data = decodeRequestData(channel, in);
  55. }
  56. req.setData(data);
  57. } catch (Throwable t) {
  58. // bad request
  59. req.setBroken(true);
  60. req.setData(t);
  61. }
  62. return req;
  63. }
  64. }
  65. }

根据Header里的第三个字节来判断当前数据是请求还是响应(因为Server和Client共用一套数据处理流程),实例化响应的Request,Response对象,将Header的第四个字节Status存入返回对象中,将第4-12个字节组成的RequestId存入返回对象,将Body数据通过反序列化得到的DTO存入返回对象。RequestId是一次请求的唯一标识,Client通过此标识来将Server返回的数据映射到之前发送的请求上。

3.对于Server,异步形式处理请求数据

  1. public class AllChannelHandler extends WrappedChannelHandler {
  2. public void received(Channel channel, Object message) throws RemotingException {
  3. ExecutorService cexecutor = getExecutorService();
  4. try {
  5. cexecutor.execute(new ChannelEventRunnable(channel, handler, ChannelState.RECEIVED, message));
  6. } catch (Throwable t) {
  7. ......
  8. }
  9. }
  10. }

4.调用Dubbo接口,将请求结果反序列化,然后按照协议格式组装数据写入Channel:

调用Dubbo接口的代码在 DubboProtocol#requestHandler里,代码很容易看明白,这里就不列举了
编码的Handler执行顺序在业务Handler写入数据之后,其会请求结果序列化,然后按照协议格式数据

  1. public class ExchangeCodec extends TelnetCodec {
  2. public void encode(Channel channel, ChannelBuffer buffer, Object msg) throws IOException {
  3. if (msg instanceof Request) { // 请求
  4. encodeRequest(channel, buffer, (Request) msg);
  5. } else if (msg instanceof Response) { // 响应
  6. encodeResponse(channel, buffer, (Response) msg);
  7. } else { // 提交给父类( Telnet ) 处理,目前是 Telnet 命令的结果。
  8. super.encode(channel, buffer, msg);
  9. }
  10. }
  11. /** * 编码响应 * * @param channel 通道 * @param buffer Buffer * @param res 响应 * @throws IOException 当发生 IO 异常时 */
  12. protected void encodeResponse(Channel channel, ChannelBuffer buffer, Response res) throws IOException {
  13. int savedWriteIndex = buffer.writerIndex();
  14. try {
  15. Serialization serialization = getSerialization(channel);
  16. // `[0, 15]`:Magic Number
  17. // 构建Header
  18. byte[] header = new byte[HEADER_LENGTH];
  19. // set magic number.
  20. Bytes.short2bytes(MAGIC, header);
  21. // `[16, 20]`:Serialization 编号 && `[23]`:响应。
  22. // set request and serialization flag.
  23. header[2] = serialization.getContentTypeId();
  24. // `[21]`:`event` 是否为事件。
  25. if (res.isHeartbeat()) header[2] |= FLAG_EVENT;
  26. // `[24 - 31]`:`status` 状态。
  27. // set response status.
  28. byte status = res.getStatus();
  29. header[3] = status;
  30. // `[32 - 95]`:`id` 编号,Long 型。
  31. // set request id.
  32. Bytes.long2bytes(res.getId(), header, 4);
  33. // 编码 `Request.data` 到 Body ,并写入到 Buffer
  34. buffer.writerIndex(savedWriteIndex + HEADER_LENGTH);
  35. ChannelBufferOutputStream bos = new ChannelBufferOutputStream(buffer); // 序列化 Output
  36. ObjectOutput out = serialization.serialize(channel.getUrl(), bos);
  37. // encode response data or error message.
  38. if (status == Response.OK) {
  39. if (res.isHeartbeat()) {
  40. encodeHeartbeatData(channel, out, res.getResult());
  41. } else {
  42. encodeResponseData(channel, out, res.getResult());
  43. }
  44. } else {
  45. out.writeUTF(res.getErrorMessage());
  46. }
  47. // 释放资源
  48. out.flushBuffer();
  49. if (out instanceof Cleanable) {
  50. ((Cleanable) out).cleanup();
  51. }
  52. bos.flush();
  53. bos.close();
  54. // 检查 Body 长度,是否超过消息上限。
  55. int len = bos.writtenBytes();
  56. checkPayload(channel, len);
  57. // `[96 - 127]`:Body 的**长度**。
  58. Bytes.int2bytes(len, header, 12);
  59. // 写入 Header 到 Buffer
  60. // write
  61. buffer.writerIndex(savedWriteIndex);
  62. buffer.writeBytes(header); // write header.
  63. buffer.writerIndex(savedWriteIndex + HEADER_LENGTH + len);
  64. } catch (Throwable t) {
  65. ......
  66. }
  67. }
  68. }

在Provider端的Netty的作用流程基本就如上述所示,下面将Consumer端相对于Provider端不同的作用流程。

每个接口的Consumer在初始化时都会实例化一个NettyClient,也就是实例化一个Netty的Bootstrap。如果按照传统思路,每个Bootstrap关联一个EventLoopGroup,内部持有一个Selector,那么每个接口Consumer实例都会有一个Selector,这会造成资源的很大浪费,所以Dubbp定义了一个EventLoopGroup类型的常量,让所有Consumer在实例化时都用同一个EventLoopGroup,也就是一个Selector处理所有的Consumer的Channel:

  1. public class NettyClient extends AbstractClient {
  2. private static final NioEventLoopGroup nioEventLoopGroup = new NioEventLoopGroup(Constants.DEFAULT_IO_THREADS, new DefaultThreadFactory("NettyClientWorker", true));
  3. protected void doOpen() {
  4. // 设置日志工厂
  5. NettyHelper.setNettyLoggerFactory();
  6. // 创建 NettyClientHandler 对象
  7. final NettyClientHandler nettyClientHandler = new NettyClientHandler(getUrl(), this);
  8. // 实例化 ServerBootstrap
  9. bootstrap = new Bootstrap();
  10. bootstrap
  11. // 设置它的线程组
  12. .group(nioEventLoopGroup)
  13. // 设置可选项
  14. .option(ChannelOption.SO_KEEPALIVE, true)
  15. .option(ChannelOption.TCP_NODELAY, true)
  16. .option(ChannelOption.ALLOCATOR, PooledByteBufAllocator.DEFAULT)
  17. //.option(ChannelOption.CONNECT_TIMEOUT_MILLIS, getTimeout())
  18. // 设置 Channel类型
  19. .channel(NioSocketChannel.class);
  20. // 设置连接超时时间
  21. if (getTimeout() < 3000) {
  22. bootstrap.option(ChannelOption.CONNECT_TIMEOUT_MILLIS, 3000);
  23. } else {
  24. bootstrap.option(ChannelOption.CONNECT_TIMEOUT_MILLIS, getTimeout());
  25. }
  26. // 设置责Handler链路
  27. bootstrap.handler(new ChannelInitializer() {
  28. @Override
  29. protected void initChannel(Channel ch) {
  30. // 创建 NettyCodecAdapter 对象
  31. NettyCodecAdapter adapter = new NettyCodecAdapter(getCodec(), getUrl(), NettyClient.this);
  32. ch.pipeline()//.addLast("logging",new LoggingHandler(LogLevel.INFO))//for debug
  33. .addLast("decoder", adapter.getDecoder()) // 解码
  34. .addLast("encoder", adapter.getEncoder()) // 编码
  35. .addLast("handler", nettyClientHandler); // 处理器
  36. }
  37. });
  38. }
  39. }

Consumer相比于Provider还有一点不同:
Provider会使用线程池异步的处理Consumer发送的请求,处理完成后将相应数据写入Channel;
Consumer在大部分情况下都是发送同步请求,也就是会阻塞当前线程,直到Provider返回数据,或者阻塞超时。

  1. public class DubboInvoker<T> extends AbstractInvoker<T> {
  2. protected Result doInvoke(final Invocation invocation) {
  3. RpcInvocation inv = (RpcInvocation) invocation;
  4. // 获得方法名
  5. final String methodName = RpcUtils.getMethodName(invocation);
  6. // 获得 `path`( 服务名 ),`version`
  7. inv.setAttachment(Constants.PATH_KEY, getUrl().getPath());
  8. inv.setAttachment(Constants.VERSION_KEY, version);
  9. // 获得 ExchangeClient 对象
  10. ExchangeClient currentClient;
  11. if (clients.length == 1) {
  12. currentClient = clients[0];
  13. } else {
  14. currentClient = clients[index.getAndIncrement() % clients.length];
  15. }
  16. // 远程调用
  17. try {
  18. // 获得是否异步调用
  19. boolean isAsync = RpcUtils.isAsync(getUrl(), invocation);
  20. // 获得是否单向调用
  21. boolean isOneway = RpcUtils.isOneway(getUrl(), invocation);
  22. // 获得超时时间
  23. int timeout = getUrl().getMethodParameter(methodName, Constants.TIMEOUT_KEY, Constants.DEFAULT_TIMEOUT);
  24. // 单向调用
  25. if (isOneway) {
  26. boolean isSent = getUrl().getMethodParameter(methodName, Constants.SENT_KEY, false);
  27. currentClient.send(inv, isSent);
  28. RpcContext.getContext().setFuture(null);
  29. return new RpcResult();
  30. // 异步调用
  31. } else if (isAsync) {
  32. ResponseFuture future = currentClient.request(inv, timeout);
  33. RpcContext.getContext().setFuture(new FutureAdapter<Object>(future));
  34. return new RpcResult();
  35. // 同步调用
  36. } else {
  37. RpcContext.getContext().setFuture(null);
  38. return (Result) currentClient.request(inv, timeout).get();
  39. }
  40. } catch (TimeoutException e) {
  41. throw new RpcException(RpcException.TIMEOUT_EXCEPTION, "Invoke remote method timeout. method: " + invocation.getMethodName() + ", provider: " + getUrl() + ", cause: " + e.getMessage(), e);
  42. } catch (RemotingException e) {
  43. throw new RpcException(RpcException.NETWORK_EXCEPTION, "Failed to invoke remote method: " + invocation.getMethodName() + ", provider: " + getUrl() + ", cause: " + e.getMessage(), e);
  44. }
  45. }
  46. }

如果是同步调用的情况下,会将调用相关信息存入一个DefaultFuture中,然后阻塞,等待Provider返回信息,依据requestId来唤醒当前线程:

  1. public class DefaultFuture implements ResponseFuture {
  2. /** * 通道集合 * * key:请求编号 */
  3. private static final Map<Long, Channel> CHANNELS = new ConcurrentHashMap<Long, Channel>();
  4. /** * Future 集合 * * key:请求编号 */
  5. private static final Map<Long, DefaultFuture> FUTURES = new ConcurrentHashMap<Long, DefaultFuture>();
  6. public DefaultFuture(Channel channel, Request request, int timeout) {
  7. this.channel = channel;
  8. this.request = request;
  9. this.id = request.getId();
  10. this.timeout = timeout > 0 ? timeout : channel.getUrl().getPositiveParameter(Constants.TIMEOUT_KEY, Constants.DEFAULT_TIMEOUT);
  11. // put into waiting map.
  12. FUTURES.put(id, this);
  13. CHANNELS.put(id, channel);
  14. }
  15. /** * 阻塞式的等待结果 * * @param timeout * @return * @throws RemotingException */
  16. public Object get(int timeout) throws RemotingException {
  17. if (timeout <= 0) {
  18. timeout = Constants.DEFAULT_TIMEOUT;
  19. }
  20. // 若未完成,等待
  21. if (!isDone()) {
  22. long start = System.currentTimeMillis();
  23. lock.lock();
  24. try {
  25. // 等待完成或超时
  26. while (!isDone()) {
  27. done.await(timeout, TimeUnit.MILLISECONDS);
  28. if (isDone() || System.currentTimeMillis() - start > timeout) {
  29. break;
  30. }
  31. }
  32. } catch (InterruptedException e) {
  33. throw new RuntimeException(e);
  34. } finally {
  35. lock.unlock();
  36. }
  37. // 未完成,抛出超时异常 TimeoutException
  38. if (!isDone()) {
  39. throw new TimeoutException(sent > 0, channel, getTimeoutMessage(false));
  40. }
  41. }
  42. // 返回响应
  43. return returnFromResponse();
  44. }
  45. }

Consumer解析处Provider传递回来的数据,每解析成功一个返回结果,如果此次请求时同步请求,就会通过Condition唤醒相应requestId的请求线程,此时同步请求才会有结果返回。

  1. /** * 完成 Condition */
  2. private final Condition done = lock.newCondition();
  3. /** * 接收响应( Response ) * * @param channel 通道 * @param response 响应 */
  4. public static void received(Channel channel, Response response) {
  5. try {
  6. // 移除 FUTURES
  7. DefaultFuture future = FUTURES.remove(response.getId());
  8. // 接收结果
  9. if (future != null) {
  10. future.doReceived(response);
  11. } else {
  12. logger.warn("The timeout response finally returned at "
  13. + (new SimpleDateFormat("yyyy-MM-dd HH:mm:ss.SSS").format(new Date()))
  14. + ", response " + response
  15. + (channel == null ? "" : ", channel: " + channel.getLocalAddress()
  16. + " -> " + channel.getRemoteAddress()));
  17. }
  18. // 移除 CHANNELS
  19. } finally {
  20. CHANNELS.remove(response.getId());
  21. }
  22. }
  23. private void doReceived(Response res) {
  24. // 锁定
  25. lock.lock();
  26. try {
  27. // 设置结果
  28. response = res;
  29. // 通知,唤醒等待
  30. if (done != null) {
  31. done.signal();
  32. }
  33. } finally {
  34. // 释放锁定
  35. lock.unlock();
  36. }
  37. // 调用回调
  38. if (callback != null) {
  39. invokeCallback(callback);
  40. }
  41. }

Consumer的拆包,序列化,协议格式数据组装都和Provider都是相同的,每次请求都会有一个唯一标识requestId来匹配请求和响应。请求在Provider端使用线程池异步执行,在Consumer端可同步,异步,Oneway形式,如果是同步则阻塞线程,等待指定requestId的响应返回来唤醒线程。

发表评论

表情:
评论列表 (有 0 条评论,266人围观)

还没有评论,来说两句吧...

相关阅读