NacosWatch不支持监听订阅服务节点变化发布事件

小鱼儿 2022-11-22 05:14 224阅读 0赞

总结,基于NacosWatch不支持监听订阅服务节点变化发布事件,可以把0.9.0之前的版本的侦听逻辑用于开发的项止平台中,来判断是否有新的服务。

1.5.0之后的版本是基于每30秒侦听一次的原理。

=============

问题

spring-cloud-starter-alibaba-nacos-discovery从1.5.0.RELEASE开始对NacosWatch的改动,NacosWatch不再支持自动去监听订阅服务节点变化发布事件,重点是不再自动去处理。

引入包的对比

0.9.0之前的版本

  1. <dependency>
  2. <groupId>org.springframework.cloud</groupId>
  3. <artifactId>spring-cloud-starter-alibaba-nacos-discovery</artifactId>
  4. <version>0.9.0.RELEASE</version>
  5. </dependency>

1.5.0之后的版本

  1. <dependency>
  2. <groupId>com.alibaba.cloud</groupId>
  3. <artifactId>spring-cloud-starter-alibaba-nacos-discovery</artifactId>
  4. <version>1.5.0.RELEASE</version>
  5. </dependency>

注意:groupId变成了com.alibaba.cloud,也就是的类的包变成了com.alibaba.cloud,不再是org.springframework.cloud了

NacosWatch监听方法的变化

这里我们重点只关注nacosServicesWatch()方法,这个方法是由start()方法以定时任务的方式定时调用。

0.9.0之前的版本源码

  1. @Override
  2. public void start() {
  3. if (this.running.compareAndSet(false, true)) {
  4. // 定时任务:默认30秒调用一次nacosServicesWatch()方法
  5. this.watchFuture = this.taskScheduler.scheduleWithFixedDelay(this::nacosServicesWatch, this.properties.getWatchDelay());
  6. }
  7. }
  8. public void nacosServicesWatch() {
  9. try {
  10. boolean changed = false;
  11. NamingService namingService = properties.namingServiceInstance();
  12. // 根据默认分组DEFAULT_GROUP,获取注册中心所有的服务信息
  13. ListView<String> listView = properties.namingServiceInstance().getServicesOfServer(1, Integer.MAX_VALUE);
  14. // 所有服务名称
  15. List<String> serviceList = listView.getData();
  16. // if there are new services found, publish event
  17. Set<String> currentServices = new HashSet<>(serviceList);
  18. // 移除缓存的服务名
  19. currentServices.removeAll(cacheServices);
  20. if (currentServices.size() > 0) {
  21. changed = true;
  22. }
  23. // if some services disappear, publish event
  24. if (cacheServices.removeAll(new HashSet<>(serviceList)) && cacheServices.size() > 0) {
  25. changed = true;
  26. for (String serviceName : cacheServices) {
  27. // 移除服务监听
  28. namingService.unsubscribe(serviceName, subscribeListeners.get(serviceName));
  29. subscribeListeners.remove(serviceName);
  30. }
  31. }
  32. // 更新缓存服务名
  33. cacheServices = new HashSet<>(serviceList);
  34. // subscribe services's node change, publish event if nodes changed
  35. for (String serviceName : cacheServices) {
  36. if (!subscribeListeners.containsKey(serviceName)) {
  37. // 服务不在监听列表,发布心跳事件
  38. EventListener eventListener = event -> NacosWatch.this.publisher.publishEvent(new HeartbeatEvent(NacosWatch.this, nacosWatchIndex.getAndIncrement()));
  39. // 服务监听放入列表
  40. subscribeListeners.put(serviceName, eventListener);
  41. // 订阅监听实例,最后由EventDispatcher添加监听
  42. namingService.subscribe(serviceName, eventListener);
  43. }
  44. }
  45. if (changed) {
  46. // 有改变,发布心跳事件
  47. this.publisher.publishEvent(new HeartbeatEvent(this, nacosWatchIndex.getAndIncrement()));
  48. }
  49. }
  50. catch (Exception e) {
  51. log.error("Error watching Nacos Service change", e);
  52. }
  53. }

1.5.0之后的版本源码

  1. @Override
  2. public void start() {
  3. if (this.running.compareAndSet(false, true)) {
  4. EventListener eventListener = listenerMap.computeIfAbsent(buildKey(),
  5. event -> new EventListener() {
  6. @Override
  7. public void onEvent(Event event) {
  8. if (event instanceof NamingEvent) {
  9. List<Instance> instances = ((NamingEvent) event).getInstances();
  10. Optional<Instance> instanceOptional = selectCurrentInstance(instances);
  11. instanceOptional.ifPresent(currentInstance -> {resetIfNeeded(currentInstance);});
  12. }
  13. }
  14. });
  15. NamingService namingService = nacosServiceManager.getNamingService(properties.getNacosProperties());
  16. try {
  17. namingService.subscribe(properties.getService(), properties.getGroup(), Arrays.asList(properties.getClusterName()), eventListener);
  18. }
  19. catch (Exception e) {
  20. log.error("namingService subscribe failed, properties:{}", properties, e);
  21. }
  22. // 定时任务:默认30秒调用一次nacosServicesWatch()方法
  23. this.watchFuture = this.taskScheduler.scheduleWithFixedDelay(this::nacosServicesWatch, this.properties.getWatchDelay());
  24. }
  25. }
  26. public void nacosServicesWatch() {
  27. // nacos doesn't support watch now , publish an event every 30 seconds.
  28. this.publisher.publishEvent(new HeartbeatEvent(this, nacosWatchIndex.getAndIncrement()));
  29. }

对比发现,新版本中的nacosServicesWatch()方法只做了一件事,就是发布心跳事件,去掉了旧版本中大堆服务获取、改变、发布监听事件等等。改变原因不清楚,有兴趣可看看源码和github https://github.com/alibaba/spring-cloud-alibaba/tree/master/spring-cloud-alibaba-starters

根据心跳来监听服务节点变化

新版本中不再自动监听服务节点变化,我们只能自己来处理了,这里我通过监听心跳事件来定时自动监听服务变化。

  1. @EventListener(classes = HeartbeatEvent.class)
  2. public void listenNacosEvent(ApplicationEvent heartbeatEvent) {
  3. try {
  4. // 根据默认分组查询注册的所有服务名,nacosServiceDiscovery自动注入对象
  5. List<String> serviceNameList = nacosServiceDiscovery.getServices();
  6. if (CollectionUtils.isEmpty(serviceNameList)) {
  7. return;
  8. }
  9. /**
  10. * 获取到服务名,就可以根据自己需求来实现自己的功能
  11. * 如,可以缓存一份服务列表,根据需要时再去使用
  12. */
  13. // 循环所有服务获取服务信息
  14. serviceNameList.stream().forEach(serviceName -> {
  15. try {
  16. // 根据服务名获取服务实例
  17. List<ServiceInstance> serviceInstanceList = nacosServiceDiscovery.getInstances(serviceName);
  18. // TODO
  19. } catch (Exception e) {
  20. logger.error("获取服务实例异常!serviceName:{}, {}:{}", serviceName, e.getClass().getName(), e.getMessage());
  21. }
  22. });
  23. } catch (NacosException e) {
  24. logger.error("查询注册nacos服务列表异常!{}:{}", e.getClass().getName(), e.getMessage(), e);
  25. }
  26. }

总结

初次接触nacos,感觉功能很强大 ,换了新版本包发现有功能不生效了,就调试源码,对比发现代码变化不小,查看源码,提供了获取服务名、服务实例等方法,记录一下学习过程和解决问题办法,如有不正确的地方,欢迎指正。

发表评论

表情:
评论列表 (有 0 条评论,224人围观)

还没有评论,来说两句吧...

相关阅读

    相关 Spring事件发布监听

    > 最近算是把spring整体的又过了一遍,发现很多东西虽然用的多,但是有些思想理解的不够透彻,在此记录下, > > 顺便感叹下,spring源码看了大部分,这才没过多久又忘