代码技巧——dubbo泛化调用

约定不等于承诺〃 2023-09-28 22:01 147阅读 0赞

现在有一种业务场景:

(1)业务方B调用任务系统A,提交一个任务,由A保证任务尽可能的被执行成功(系统A自带幂等、重试),在A完成任务的执行后将执行情况同步回调通知B应用;

(2)因此,B应用在提交任务时需要把自己的回调地址作为参数传递给A,这个参数被定义为dubbo接口的全路径+方法名+分组+版本号;

(3)系统A不可能提前知晓这个dubbo接口并注册对应的consumer,因此只能通过dubbo的「泛化调用」方式调用业务方B的dubbo接口,即Dubbo的调用方,在不引入服务接口类的情况下,远程调用其他Dubbo服务;

“泛化接口调用方式主要用于客户端没有API接口及模型类元的情况,参数及返回值中的所有POJO均用Map表示,通常用于框架集成,比如:实现一个通用的服务测试框架,可通过GenericService调用所有服务实现。”——使用泛化调用 | Apache Dubbo

下面给出代码示例;

(1)核心方法

  1. import com.alibaba.fastjson.JSON;
  2. import com.google.common.collect.Maps;
  3. import com.internet.bocfg.audit.client.model.AuditFlowCallbackRequest;
  4. import com.internet.bocfg.audit.client.model.AuditFlowCallbackResponse;
  5. import com.internet.bocfg.audit.constant.AuditConstants;
  6. import com.internet.bocfg.audit.exception.AuditBusinessException;
  7. import lombok.extern.slf4j.Slf4j;
  8. import org.apache.commons.lang3.StringUtils;
  9. import org.apache.dubbo.config.ApplicationConfig;
  10. import org.apache.dubbo.config.ReferenceConfig;
  11. import org.apache.dubbo.config.RegistryConfig;
  12. import org.apache.dubbo.config.utils.ReferenceConfigCache;
  13. import org.apache.dubbo.rpc.service.GenericService;
  14. import org.springframework.stereotype.Service;
  15. import javax.annotation.Resource;
  16. import java.util.Map;
  17. /**
  18. * @author Akira
  19. * @description dubbo泛化调用服务
  20. * @date 2022/8/28
  21. */
  22. @Slf4j
  23. @Service
  24. public class DubboGenericService {
  25. /**
  26. * 缓存已经注册过的GenericService
  27. */
  28. private static final Map<String, GenericService> stubCache = Maps.newConcurrentMap();
  29. private final ApplicationConfig applicationConfig = new ApplicationConfig("bocfg-audit");
  30. @Resource(name = "myRegistryConfig")
  31. private RegistryConfig registryConfig;
  32. /**
  33. * (泛化)远程调用
  34. *
  35. * @param serviceName 接口路径
  36. * @param methodName 方法名
  37. * @param group 分组
  38. * @param version 版本
  39. * @param request 自定义的dubbo接口参数
  40. */
  41. private void remoteInvoke(String serviceName, String methodName, String group, String version,
  42. AuditFlowCallbackRequest request) {
  43. try {
  44. // 先尝试从缓存中拿GenericService 设置缓存的原因:ReferenceConfig实例很重,封装了与注册中心的连接以及与provider的连接,需要缓存,否则重复生成ReferenceConfig可能造成性能问题并且会有内存和连接泄漏
  45. String stubKey = stubKey(serviceName, methodName, version);
  46. GenericService genericService = stubCache.get(stubKey);
  47. // 未取到则准备初始化当前GenericService
  48. if (genericService == null) {
  49. ReferenceConfig<GenericService> reference = new ReferenceConfig<>();
  50. // 连接相关的配置,包括:zk地址、应用名、协议等
  51. reference.setApplication(applicationConfig);
  52. reference.setRegistry(registryConfig);
  53. // 调用相关的配置,包括:声明为泛化接口、不检查状态、负载均衡方式、超时时间等
  54. reference.setGeneric(true);
  55. reference.setLoadbalance("roundrobin");
  56. reference.setCheck(false);
  57. reference.setTimeout(AuditConstants.AUDIT_CALLBACK_DEFALUT_TIMEOUT);
  58. // 调用目标provider相关的配置,包括:接口、group、version、
  59. reference.setInterface(serviceName);
  60. reference.setGroup(StringUtils.isBlank(group) ? AuditConstants.AUDIT_CALLBACK_DEFALUT_GROUP : group);
  61. reference.setVersion(StringUtils.isBlank(version) ? AuditConstants.AUDIT_CALLBACK_DEFALUT_VERSION : version);
  62. // 这里优先使用dubbo内置的简单缓存工具类进行缓存,若没有则放入自己定义的缓存stubCache中
  63. ReferenceConfigCache cache = ReferenceConfigCache.getCache();
  64. genericService = cache.get(reference);
  65. if (genericService != null) {
  66. stubCache.putIfAbsent(stubKey, genericService);
  67. }
  68. genericService = stubCache.get(stubKey);
  69. }
  70. // 至此,拿到了dubbo接口的genericService
  71. if (genericService == null) {
  72. throw new IllegalStateException("No provider available: " + stubKey);
  73. }
  74. // 泛化调用的参数:方法名、方法参数类型全路径、方法参数
  75. String[] parameterTypes = new String[]{AuditConstants.AUDIT_CALLBACK_REQUEST_CLASSNAME};
  76. Object[] args = new Object[]{request};
  77. Object result = genericService.$invoke(methodName, parameterTypes, args);
  78. if (result != null) {
  79. // 泛化调用返参为Object类型,这里做一个参数转换
  80. AuditFlowCallbackResponse callbackResponse = JSON.parseObject(JSON.toJSONString(result), AuditFlowCallbackResponse.class);
  81. if (!callbackResponse.isSuccess()) {
  82. log.warn("callbackResponse fail. serviceName={} methodName={} version={} request={} return={}", serviceName, methodName, version, JSON.toJSONString(request), JSON.toJSONString(callbackResponse));
  83. throw new AuditBusinessException("auditCallback fail:" + callbackResponse.getMsg());
  84. }
  85. }
  86. } catch (Throwable e) {
  87. log.error("dubbo remoteInvoke fail. serviceName={} methodName={} version={} auditFlowCallbackRequest={}", serviceName, methodName, version, JSON.toJSONString(request));
  88. throw e;
  89. }
  90. }
  91. /**
  92. * 拼接dubbo的GenericService缓存stubCache(map)中的存根key
  93. */
  94. private String stubKey(String serviceName, String method, String version) {
  95. return serviceName + "$" + method + "$" +
  96. (StringUtils.isBlank(version) ? AuditConstants.AUDIT_CALLBACK_DEFALUT_VERSION : version);
  97. }
  98. }

(2)RegistryConfig配置

  1. import com.internet.vivocfg.client.ConfigManager;
  2. import org.apache.dubbo.config.RegistryConfig;
  3. import org.springframework.context.annotation.Bean;
  4. import org.springframework.context.annotation.Configuration;
  5. @Configuration
  6. public class PropertyConfig {
  7. @Bean("myRegistryConfig")
  8. public RegistryConfig imExchangeConfig() {
  9. RegistryConfig registryConfig = null;
  10. registryConfig = new RegistryConfig(ConfigManager.get("dubbo.zk.registry.address"));
  11. registryConfig.setProtocol("zookeeper");
  12. registryConfig.setClient("curator");
  13. return registryConfig;
  14. }
  15. }

参考:

使用泛化调用 | Apache Dubbo

dubbo实战之“泛化调用”探索 - 博客园

发表评论

表情:
评论列表 (有 0 条评论,147人围观)

还没有评论,来说两句吧...

相关阅读

    相关 dubbo调用

    dubbo泛化调用 一、前言 > 泛接口调用方式主要用于客户端没有API接口及模型类元的情况,参数及返回值中的所有POJO均用Map表示,通常用于框架集成,比如:实