[DUBBO Series] Source Code Analysis of the Service Timeout Mechanism

By 灰太狼, 2021-09-23 05:02

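Follow the WeChat official account 「JAVA前线」 for more articles on source code analysis, practical applications, architectural thinking, career experience, and product thinking. You are also welcome to add me on WeChat (「java_front」) to exchange ideas.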

1 Overview

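DUBBO lets you configure the timeout in many places: on the consumer or on the provider, and at the method level, the interface level, or the global level. The official DUBBO documentation gives the following priority rules for these settings:

  1. First priority: method level > interface level > global level
  2. Second priority (applies when the levels are the same): consumer > provider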
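The two rules can be read as a single lookup order: compare levels first, and only break ties by side. The following is a simplified model of the documented precedence, not Dubbo's implementation (Dubbo achieves the same effect through URL parameter merging and key lookup, analyzed below). The `"<level>/<side>"` keys and the `TimeoutPriority` class are hypothetical; the 1000 ms fallback corresponds to Dubbo's default timeout.

```java
import java.util.List;
import java.util.Map;

// Simplified model of the documented precedence rules (hypothetical, for illustration only).
public class TimeoutPriority {
    // Hypothetical keys "<level>/<side>": level is compared first
    // (method > interface > global), then side (consumer > provider).
    static final List<String> LOOKUP_ORDER = List.of(
            "method/consumer", "method/provider",
            "interface/consumer", "interface/provider",
            "global/consumer", "global/provider");

    // Returns the first configured timeout in lookup order, or the fallback if none is set.
    static int resolve(Map<String, Integer> configured, int fallbackMillis) {
        for (String key : LOOKUP_ORDER) {
            Integer value = configured.get(key);
            if (value != null) {
                return value;
            }
        }
        return fallbackMillis;
    }

    public static void main(String[] args) {
        // Scenario of section 2.1: provider interface 888 ms vs consumer interface 999 ms
        System.out.println(resolve(Map.of("interface/provider", 888, "interface/consumer", 999), 1000)); // 999
        // Scenario of section 2.2: provider method 1111 ms vs consumer interface 999 ms
        System.out.println(resolve(Map.of("method/provider", 1111, "interface/consumer", 999), 1000)); // 1111
    }
}
```

Note that the second scenario shows the first rule overriding the second: a provider-side method-level value beats a consumer-side interface-level value.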

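This article analyzes the timeout mechanism at the source code level: we first look at how these priorities take effect, and then at how the timeout mechanism is implemented on the consumer side and on the provider side.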

2 Configuration Priority

2.1 Consumer > Provider

Configure an interface-level timeout of 888 ms on the provider:

```xml
<beans>
    <dubbo:application name="xpz-provider" />
    <dubbo:registry address="zookeeper://127.0.0.1:2181" />
    <dubbo:protocol name="dubbo" port="20880" />
    <dubbo:service timeout="888" interface="com.itxpz.dubbo.demo.provider.HelloService" ref="helloService" />
</beans>
```

Configure an interface-level timeout of 999 ms on the consumer:

```xml
<beans>
    <dubbo:application name="xpz-consumer" />
    <dubbo:registry address="zookeeper://127.0.0.1:2181" />
    <dubbo:reference timeout="999" id="helloService" interface="com.itxpz.dubbo.demo.provider.HelloService" />
</beans>
```

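The provider first registers its service information with the registry, and the consumer subscribes to that information. Once the consumer has the provider's information, it merges the provider's configuration with its own. The core of this happens in the code that, after subscription, converts the service information into Invokers: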

```java
public class RegistryDirectory<T> extends AbstractDirectory<T> implements NotifyListener {
    private Map<String, Invoker<T>> toInvokers(List<URL> urls) {
        Map<String, Invoker<T>> newUrlInvokerMap = new HashMap<String, Invoker<T>>();
        if (urls == null || urls.isEmpty()) {
            return newUrlInvokerMap;
        }
        for (URL providerUrl : urls) {
            // providerUrl is the provider configuration subscribed from the registry
            // providerUrl=dubbo://x.x.x.x:20880/com.itxpz.dubbo.demo.provider.HelloService?anyhost=true&application=xpz-provider&dubbo=2.0.2&generic=false&interface=com.itxpz.dubbo.demo.provider.HelloService&methods=sayHello&pid=16736&release=2.7.0&side=provider&timeout=888
            // mergeUrl merges parameters along several dimensions;
            // this article only analyzes the consumer/provider merge
            URL url = mergeUrl(providerUrl);
            // ... (creating the Invoker and putting it into newUrlInvokerMap omitted)
        }
        return newUrlInvokerMap;
    }
}
```

Now look at the code that merges the consumer and provider parameters:

```java
public class ClusterUtils {
    public static URL mergeUrl(URL remoteUrl, Map<String, String> localMap) {
        // Consumer parameters localMap = {side=consumer, register.ip=x.x.x.x, methods=sayHello, release=2.7.0, qos.port=55555, dubbo=2.0.2, pid=16904, interface=com.itxpz.dubbo.demo.provider.HelloService, qos.enable=true, timeout=999, application=xpz-consumer, qos.accept.foreign.ip=false, timestamp=123}
        // Provider parameters remoteMap = {side=provider, methods=sayHello, release=2.7.0, dubbo=2.0.2, pid=16736, interface=com.itxpz.dubbo.demo.provider.HelloService, generic=false, timeout=888, application=xpz-provider, anyhost=true, timestamp=123}
        Map<String, String> remoteMap = remoteUrl.getParameters();
        Map<String, String> map = new HashMap<String, String>();
        // Start from the provider parameters (removal of some provider-only keys omitted)
        if (remoteMap != null && remoteMap.size() > 0) {
            map.putAll(remoteMap);
            // code omitted
        }
        // If the consumer configuration is not empty, copy all of it into the result;
        // consumer values overwrite provider values with the same key (consumer > provider)
        if (localMap != null && localMap.size() > 0) {
            String remoteGroup = map.get(Constants.GROUP_KEY);
            map.putAll(localMap);
            // keep the provider's group value
            map.put(Constants.GROUP_KEY, remoteGroup);
        }
        // If the provider configuration is not empty, set some additional information
        if (remoteMap != null && remoteMap.size() > 0) {
            // code omitted
        }
        // The timeout has changed from 888 ms to 999 ms
        // dubbo://x.x.x.x:20880/com.itxpz.dubbo.demo.provider.HelloService?anyhost=true&application=xpz-consumer&dubbo=2.0.2&generic=false&group=&interface=com.itxpz.dubbo.demo.provider.HelloService&methods=sayHello&pid=16284&qos.accept.foreign.ip=false&qos.enable=true&qos.port=55555&release=2.7.0&remote.application=xpz-provider&side=consumer&timeout=999
        URL result = remoteUrl.clearParameters().addParameters(map);
        return result;
    }
}
```
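The essence of "consumer > provider" is simply the order of two map writes: provider parameters go in first, consumer parameters are written over them. The sketch below isolates that idea with plain maps. It is a simplified illustration, not `ClusterUtils`: the class and the naive `parseQuery` helper are hypothetical, and group, release, filter/listener and provider-only key handling are all left out.

```java
import java.util.LinkedHashMap;
import java.util.Map;

// Hypothetical, simplified sketch of the consumer/provider parameter merge.
public class MergeSketch {
    // Naively parses "k1=v1&k2=v2" into an ordered map (no URL decoding; illustration only).
    static Map<String, String> parseQuery(String query) {
        Map<String, String> params = new LinkedHashMap<>();
        for (String pair : query.split("&")) {
            int eq = pair.indexOf('=');
            if (eq > 0) {
                params.put(pair.substring(0, eq), pair.substring(eq + 1));
            }
        }
        return params;
    }

    // Provider values are copied first; consumer values then overwrite equal keys.
    static Map<String, String> merge(Map<String, String> remote, Map<String, String> local) {
        Map<String, String> merged = new LinkedHashMap<>(remote);
        merged.putAll(local);
        return merged;
    }

    public static void main(String[] args) {
        Map<String, String> provider = parseQuery("side=provider&timeout=888&sayHello.timeout=1111");
        Map<String, String> consumer = parseQuery("side=consumer&timeout=999");
        Map<String, String> merged = merge(provider, consumer);
        System.out.println(merged.get("timeout"));          // 999
        System.out.println(merged.get("sayHello.timeout")); // 1111
    }
}
```

Because the consumer has no `sayHello.timeout` key, the provider's method-level value survives the merge untouched; this is exactly the situation analyzed in section 2.2.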

2.2 Method Level > Interface Level

Configure an interface-level timeout of 999 ms on the consumer:

```xml
<beans>
    <dubbo:application name="xpz-consumer" />
    <dubbo:registry address="zookeeper://127.0.0.1:2181" />
    <dubbo:reference timeout="999" id="helloService" interface="com.itxpz.dubbo.demo.provider.HelloService" />
</beans>
```

Configure a method-level timeout of 1111 ms on the provider:

```xml
<beans>
    <dubbo:application name="xpz-provider" />
    <dubbo:registry address="zookeeper://127.0.0.1:2181" />
    <dubbo:protocol name="dubbo" port="20880" />
    <dubbo:service interface="com.itxpz.dubbo.demo.provider.HelloService" ref="helloService">
        <dubbo:method name="sayHello" timeout="1111" />
    </dubbo:service>
</beans>
```

First, look at the URL after the parameter merge:

```java
public class ClusterUtils {
    public static URL mergeUrl(URL remoteUrl, Map<String, String> localMap) {
        // Consumer parameters localMap = {side=consumer, register.ip=x.x.x.x, methods=sayHello, release=2.7.0, qos.port=55555, dubbo=2.0.2, pid=15436, interface=com.itxpz.dubbo.demo.provider.HelloService, qos.enable=true, timeout=999, application=xpz-consumer, qos.accept.foreign.ip=false, timestamp=123}
        // Provider parameters remoteMap = {side=provider, methods=sayHello, release=2.7.0, dubbo=2.0.2, pid=16260, interface=com.itxpz.dubbo.demo.provider.HelloService, sayHello.timeout=1111, generic=false, application=xpz-provider, anyhost=true, timestamp=123}
        Map<String, String> remoteMap = remoteUrl.getParameters();
        Map<String, String> map = new HashMap<String, String>();
        // Start from the provider parameters (removal of some provider-only keys omitted)
        if (remoteMap != null && remoteMap.size() > 0) {
            map.putAll(remoteMap);
            // code omitted
        }
        // If the consumer configuration is not empty, copy all of it into the result;
        // consumer values overwrite provider values with the same key
        if (localMap != null && localMap.size() > 0) {
            String remoteGroup = map.get(Constants.GROUP_KEY);
            map.putAll(localMap);
            // keep the provider's group value
            map.put(Constants.GROUP_KEY, remoteGroup);
        }
        // If the provider configuration is not empty, set some additional information
        if (remoteMap != null && remoteMap.size() > 0) {
            // code omitted
        }
        // Both settings are present: sayHello.timeout=1111 and timeout=999
        // dubbo://x.x.x.x:20880/com.itxpz.dubbo.demo.provider.HelloService?anyhost=true&application=xpz-consumer&dubbo=2.0.2&generic=false&group=&interface=com.itxpz.dubbo.demo.provider.HelloService&methods=sayHello&pid=5456&qos.accept.foreign.ip=false&qos.enable=true&qos.port=55555&release=2.7.0&remote.application=xpz-provider&sayHello.timeout=1111&side=consumer&timeout=999
        URL result = remoteUrl.clearParameters().addParameters(map);
        return result;
    }
}
```

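The merged URL now carries two timeout settings, sayHello.timeout=1111 and timeout=999. Which one takes priority is decided when the consumer makes the remote call: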

```java
public class DubboInvoker<T> extends AbstractInvoker<T> {
    @Override
    protected Result doInvoke(final Invocation invocation) throws Throwable {
        RpcInvocation inv = (RpcInvocation) invocation;
        final String methodName = RpcUtils.getMethodName(invocation);
        inv.setAttachment(Constants.PATH_KEY, getUrl().getPath());
        inv.setAttachment(Constants.VERSION_KEY, version);
        ExchangeClient currentClient;
        if (clients.length == 1) {
            currentClient = clients[0];
        } else {
            currentClient = clients[index.getAndIncrement() % clients.length];
        }
        try {
            boolean isAsync = RpcUtils.isAsync(getUrl(), invocation);
            boolean isAsyncFuture = RpcUtils.isReturnTypeFuture(inv);
            boolean isOneway = RpcUtils.isOneway(getUrl(), invocation);
            // The priority takes effect in the way the timeout is read here
            // getUrl() = dubbo://x.x.x.x:20880/com.itxpz.dubbo.demo.provider.HelloService?anyhost=true&application=xpz-consumer&dubbo=2.0.2&generic=false&group=&interface=com.itxpz.dubbo.demo.provider.HelloService&methods=sayHello&pid=5456&qos.accept.foreign.ip=false&qos.enable=true&qos.port=55555&release=2.7.0&remote.application=xpz-provider&sayHello.timeout=1111&side=consumer&timeout=999
            int timeout = getUrl().getMethodParameter(methodName, Constants.TIMEOUT_KEY, Constants.DEFAULT_TIMEOUT);
            if (isOneway) {
                boolean isSent = getUrl().getMethodParameter(methodName, Constants.SENT_KEY, false);
                currentClient.send(inv, isSent);
                RpcContext.getContext().setFuture(null);
                return new RpcResult();
            } else if (isAsync) {
                ResponseFuture future = currentClient.request(inv, timeout);
                FutureAdapter<Object> futureAdapter = new FutureAdapter<>(future);
                RpcContext.getContext().setFuture(futureAdapter);
                Result result;
                if (isAsyncFuture) {
                    result = new AsyncRpcResult(futureAdapter, futureAdapter.getResultFuture(), false);
                } else {
                    result = new SimpleAsyncRpcResult(futureAdapter, futureAdapter.getResultFuture(), false);
                }
                return result;
            } else {
                RpcContext.getContext().setFuture(null);
                // currentClient.request sends the remote request
                // get() performs the timeout check
                return (Result) currentClient.request(inv, timeout).get();
            }
        } catch (TimeoutException e) {
            throw new RpcException(RpcException.TIMEOUT_EXCEPTION, "Invoke remote method timeout. method: " + invocation.getMethodName() + ", provider: " + getUrl() + ", cause: " + e.getMessage(), e);
        } catch (RemotingException e) {
            throw new RpcException(RpcException.NETWORK_EXCEPTION, "Failed to invoke remote method: " + invocation.getMethodName() + ", provider: " + getUrl() + ", cause: " + e.getMessage(), e);
        }
    }
}

public class URL implements Serializable {
    public String getMethodParameter(String method, String key) {
        // Look up the method-level key (e.g. sayHello.timeout) first,
        // then fall back to the interface-level key (e.g. timeout)
        String value = parameters.get(method + "." + key);
        if (StringUtils.isEmpty(value)) {
            return getParameter(key);
        }
        return value;
    }

    public int getMethodParameter(String method, String key, int defaultValue) {
        // getNumbers() caches already-parsed values under the method-level key
        // (e.g. sayHello.timeout), so repeated calls skip parsing
        String methodKey = method + "." + key;
        Number n = getNumbers().get(methodKey);
        if (n != null) {
            return n.intValue();
        }
        // Method-level value if present (highest priority), otherwise the
        // interface-level value; if both are empty, return the default
        String value = getMethodParameter(method, key);
        if (StringUtils.isEmpty(value)) {
            return defaultValue;
        }
        int i = Integer.parseInt(value);
        getNumbers().put(methodKey, i);
        return i;
    }
}
```
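The method-over-interface rule therefore comes down to the order of two key lookups. Here is a stand-alone sketch of that lookup over a plain parameter map, assuming the parameters are already parsed. `MethodParameterLookup` is a hypothetical class, and the `parsedCache` field only plays the role that `getNumbers()` plays in the code above.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical sketch of the "method key, then interface key, then default" lookup.
public class MethodParameterLookup {
    private final Map<String, String> parameters;
    // Caches parsed values under the method-level key, similar in role to getNumbers()
    private final Map<String, Integer> parsedCache = new HashMap<>();

    MethodParameterLookup(Map<String, String> parameters) {
        this.parameters = parameters;
    }

    int getMethodParameter(String method, String key, int defaultValue) {
        String methodKey = method + "." + key;
        Integer cached = parsedCache.get(methodKey);
        if (cached != null) {
            return cached;
        }
        String value = parameters.get(methodKey); // method level, e.g. "sayHello.timeout"
        if (value == null || value.isEmpty()) {
            value = parameters.get(key);          // interface level, e.g. "timeout"
        }
        if (value == null || value.isEmpty()) {
            return defaultValue;
        }
        int parsed = Integer.parseInt(value);
        parsedCache.put(methodKey, parsed);
        return parsed;
    }

    public static void main(String[] args) {
        MethodParameterLookup url = new MethodParameterLookup(
                Map.of("sayHello.timeout", "1111", "timeout", "999"));
        System.out.println(url.getMethodParameter("sayHello", "timeout", 1000)); // 1111
        // "otherMethod" is a hypothetical method with no method-level setting
        System.out.println(url.getMethodParameter("otherMethod", "timeout", 1000)); // 999
    }
}
```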

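3 Consumer-Side Timeout Mechanism

On the synchronous call path, the consumer sends the request and then blocks in get() on the returned future until the response arrives or the timeout expires: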

```java
public class DubboInvoker<T> extends AbstractInvoker<T> {
    @Override
    protected Result doInvoke(final Invocation invocation) throws Throwable {
        // ... (preparation of inv, currentClient and timeout omitted; see section 2.2)
        try {
            // currentClient.request sends the remote request
            // get() performs the timeout check
            return (Result) currentClient.request(inv, timeout).get();
        } catch (TimeoutException e) {
            throw new RpcException(RpcException.TIMEOUT_EXCEPTION, "Invoke remote method timeout. method: " + invocation.getMethodName() + ", provider: " + getUrl() + ", cause: " + e.getMessage(), e);
        } catch (RemotingException e) {
            throw new RpcException(RpcException.NETWORK_EXCEPTION, "Failed to invoke remote method: " + invocation.getMethodName() + ", provider: " + getUrl() + ", cause: " + e.getMessage(), e);
        }
    }
}
```

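DefaultFuture waits for the response. If the response is still empty after blocking for the full timeout, it throws a TimeoutException, which DubboInvoker then wraps in an RpcException: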

```java
public class DefaultFuture implements ResponseFuture {
    @Override
    public Object get() throws RemotingException {
        // The no-arg get() used by DubboInvoker waits with the timeout passed to request(inv, timeout)
        return get(timeout);
    }

    @Override
    public Object get(int timeout) throws RemotingException {
        if (timeout <= 0) {
            timeout = Constants.DEFAULT_TIMEOUT;
        }
        // No response has been received yet
        if (!isDone()) {
            long start = System.currentTimeMillis();
            lock.lock();
            try {
                while (!isDone()) {
                    // Release the lock and wait until signalled, interrupted, or the timeout elapses
                    done.await(timeout, TimeUnit.MILLISECONDS);
                    if (isDone()) {
                        break;
                    }
                    if (System.currentTimeMillis() - start > timeout) {
                        break;
                    }
                }
            } catch (InterruptedException e) {
                throw new RuntimeException(e);
            } finally {
                lock.unlock();
            }
            // Still no response: throw a timeout exception
            if (!isDone()) {
                throw new TimeoutException(sent > 0, channel, getTimeoutMessage(false));
            }
        }
        return returnFromResponse();
    }

    @Override
    public boolean isDone() {
        return response != null;
    }

    private void doReceived(Response res) {
        lock.lock();
        try {
            // Store the response received from the server
            response = res;
            if (done != null) {
                // Wake up the thread waiting in get()
                done.signal();
            }
        } finally {
            lock.unlock();
        }
        if (callback != null) {
            invokeCallback(callback);
        }
    }
}
```
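The same lock-and-condition pattern can be reproduced with only the JDK. The sketch below is a hypothetical `SimpleResponseFuture`, not Dubbo's class: it throws `java.util.concurrent.TimeoutException` and propagates `InterruptedException` instead of using Dubbo's exception types. One deliberate difference is that it waits with `awaitNanos` and tracks the remaining time, so each wake-up waits only for what is left of the budget rather than a full `timeout` again.

```java
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;

// Hypothetical minimal future using the DefaultFuture wait/signal pattern.
public class SimpleResponseFuture<T> {
    private final Lock lock = new ReentrantLock();
    private final Condition done = lock.newCondition();
    private T response; // null means "no response yet", so complete() must receive a non-null value

    public T get(long timeoutMillis) throws InterruptedException, TimeoutException {
        lock.lock();
        try {
            long remainingNanos = TimeUnit.MILLISECONDS.toNanos(timeoutMillis);
            while (response == null) { // loop also guards against spurious wake-ups
                if (remainingNanos <= 0) {
                    throw new TimeoutException("no response within " + timeoutMillis + " ms");
                }
                remainingNanos = done.awaitNanos(remainingNanos); // releases the lock while waiting
            }
            return response;
        } finally {
            lock.unlock();
        }
    }

    // Called by the I/O thread when the response arrives; wakes up waiting callers.
    public void complete(T value) {
        lock.lock();
        try {
            response = value;
            done.signalAll();
        } finally {
            lock.unlock();
        }
    }

    public static void main(String[] args) throws Exception {
        SimpleResponseFuture<String> fast = new SimpleResponseFuture<>();
        new Thread(() -> fast.complete("pong")).start();
        System.out.println(fast.get(1000)); // pong

        SimpleResponseFuture<String> slow = new SimpleResponseFuture<>(); // never completed
        try {
            slow.get(50);
        } catch (TimeoutException e) {
            System.out.println("timeout: " + e.getMessage()); // timeout: no response within 50 ms
        }
    }
}
```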

4 Provider-Side Timeout Mechanism

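On the provider side, the timeout mechanism lives in the TimeoutFilter. Note that a provider-side timeout only produces a warning log: the invocation runs to completion and its result is still returned, with no exception thrown and no interruption.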

```java
@Activate(group = Constants.PROVIDER)
public class TimeoutFilter implements Filter {
    private static final Logger logger = LoggerFactory.getLogger(TimeoutFilter.class);

    @Override
    public Result invoke(Invoker<?> invoker, Invocation invocation) throws RpcException {
        long start = System.currentTimeMillis();
        Result result = invoker.invoke(invocation);
        long elapsed = System.currentTimeMillis() - start;
        // Only the provider-side configuration is read here
        int timeout = invoker.getUrl().getMethodParameter(invocation.getMethodName(), "timeout", Integer.MAX_VALUE);
        // On timeout, only a warning is logged and the flow continues
        if (invoker.getUrl() != null && elapsed > timeout) {
            if (logger.isWarnEnabled()) {
                logger.warn("invoke time out method: " + invocation.getMethodName() + " arguments: " + Arrays.toString(invocation.getArguments()) + " , url is " + invoker.getUrl() + ", invoke elapsed " + elapsed + " ms.");
            }
        }
        return result;
    }
}
```
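Stripped of Dubbo types, the filter's behavior is "measure, warn, but still return". The sketch below reproduces that with `java.util.function.Supplier` and `java.util.logging`; `ElapsedWarning`, `invokeAndWarn` and the `warnings` counter are hypothetical names, and the counter exists only to make the behavior observable.

```java
import java.util.function.Supplier;
import java.util.logging.Logger;

// Hypothetical sketch of the TimeoutFilter idea: time the call, log if slow, never interrupt.
public class ElapsedWarning {
    private static final Logger LOG = Logger.getLogger(ElapsedWarning.class.getName());
    static int warnings = 0; // number of calls that exceeded their timeout (illustration only)

    static <T> T invokeAndWarn(String method, long timeoutMillis, Supplier<T> call) {
        long start = System.currentTimeMillis();
        T result = call.get(); // the call always runs to completion
        long elapsed = System.currentTimeMillis() - start;
        if (elapsed > timeoutMillis) {
            warnings++;
            LOG.warning("invoke time out method: " + method + ", invoke elapsed " + elapsed + " ms.");
        }
        return result; // the result is returned even when the call was slow
    }

    public static void main(String[] args) {
        String reply = invokeAndWarn("sayHello", 10, () -> {
            try {
                Thread.sleep(50); // simulate a slow provider method
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
            return "hello";
        });
        System.out.println(reply + ", warnings=" + warnings); // hello, warnings=1
    }
}
```

The consequence is worth keeping in mind: by the time the provider logs this warning, the consumer may already have given up and thrown its own timeout exception, while the provider still spent the full execution time on the request.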

5 Setting Timeouts Sensibly

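Consider this scenario: a business system calls the order center service to query order information. Because the business system has not set a reasonable timeout, users wait a long time without a response and repeatedly retry the query. The resulting traffic surge can bring down both the upstream and the downstream systems; this is a system avalanche (cascading failure).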

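The consumer needs to know the response time within which the provider handles the vast majority of requests, and set its own timeout slightly longer than that. If a synchronous response is not required, it can use the Failback cluster fault-tolerance strategy or an asynchronous call. Both consumers and providers should also apply rate limiting, degradation, and circuit breaking to protect the system and prevent serious problems such as an avalanche.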
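As a rough illustration of "slightly longer than the response time of most requests", one could take a high percentile of observed provider latencies and add a safety margin. Everything here is an assumption for illustration: the `TimeoutSuggestion` helper, the nearest-rank percentile method, the 90th percentile, the 20% margin, and the sample latencies are hypothetical values to tune, not a Dubbo feature.

```java
import java.util.Arrays;

// Hypothetical helper: derive a consumer timeout from observed provider latencies.
public class TimeoutSuggestion {
    // Nearest-rank percentile (percent in 0..100) using integer arithmetic.
    static long percentile(long[] samplesMillis, int percent) {
        long[] sorted = samplesMillis.clone();
        Arrays.sort(sorted);
        int rank = (percent * sorted.length + 99) / 100; // ceil(percent * n / 100), 1-based
        return sorted[Math.max(rank, 1) - 1];
    }

    // Timeout = percentile latency plus a margin, rounded up to whole milliseconds.
    static long suggestTimeout(long[] samplesMillis, int percent, int marginPercent) {
        long base = percentile(samplesMillis, percent);
        long margin = (base * marginPercent + 99) / 100;
        return base + margin;
    }

    public static void main(String[] args) {
        long[] latencies = {120, 80, 95, 110, 300, 105, 90, 100, 115, 130}; // sample data
        System.out.println(percentile(latencies, 90));         // 130
        System.out.println(suggestTimeout(latencies, 90, 20)); // 156
    }
}
```

The single 300 ms outlier does not drag the suggestion upward; that is the point of using a percentile instead of the maximum. The resulting value would then go into the consumer's `timeout` setting.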
