NacosWatch不支持监听订阅服务节点变化发布事件
总结,基于NacosWatch不支持监听订阅服务节点变化发布事件,可以把0.9.0之前的版本的侦听逻辑用于开发的项止平台中,来判断是否有新的服务。
1.5.0之后的版本是基于每30秒侦听一次的原理。
=============
问题
spring-cloud-starter-alibaba-nacos-discovery从1.5.0.RELEASE开始对NacosWatch的改动,NacosWatch不再支持自动去监听订阅服务节点变化发布事件,重点是不再自动去处理。
引入包的对比
0.9.0之前的版本
<dependency>
<groupId>org.springframework.cloud</groupId>
<artifactId>spring-cloud-starter-alibaba-nacos-discovery</artifactId>
<version>0.9.0.RELEASE</version>
</dependency>
1.5.0之后的版本
<dependency>
<groupId>com.alibaba.cloud</groupId>
<artifactId>spring-cloud-starter-alibaba-nacos-discovery</artifactId>
<version>1.5.0.RELEASE</version>
</dependency>
注意:groupId变成了com.alibaba.cloud,也就是的类的包变成了com.alibaba.cloud,不再是org.springframework.cloud了
NacosWatch监听方法的变化
这里我们重点只关注nacosServicesWatch()方法,这个方法是由start()方法以定时任务的方式定时调用。
0.9.0之前的版本源码
@Override
public void start() {
if (this.running.compareAndSet(false, true)) {
// 定时任务:默认30秒调用一次nacosServicesWatch()方法
this.watchFuture = this.taskScheduler.scheduleWithFixedDelay(this::nacosServicesWatch, this.properties.getWatchDelay());
}
}
public void nacosServicesWatch() {
try {
boolean changed = false;
NamingService namingService = properties.namingServiceInstance();
// 根据默认分组DEFAULT_GROUP,获取注册中心所有的服务信息
ListView<String> listView = properties.namingServiceInstance().getServicesOfServer(1, Integer.MAX_VALUE);
// 所有服务名称
List<String> serviceList = listView.getData();
// if there are new services found, publish event
Set<String> currentServices = new HashSet<>(serviceList);
// 移除缓存的服务名
currentServices.removeAll(cacheServices);
if (currentServices.size() > 0) {
changed = true;
}
// if some services disappear, publish event
if (cacheServices.removeAll(new HashSet<>(serviceList)) && cacheServices.size() > 0) {
changed = true;
for (String serviceName : cacheServices) {
// 移除服务监听
namingService.unsubscribe(serviceName, subscribeListeners.get(serviceName));
subscribeListeners.remove(serviceName);
}
}
// 更新缓存服务名
cacheServices = new HashSet<>(serviceList);
// subscribe services's node change, publish event if nodes changed
for (String serviceName : cacheServices) {
if (!subscribeListeners.containsKey(serviceName)) {
// 服务不在监听列表,发布心跳事件
EventListener eventListener = event -> NacosWatch.this.publisher.publishEvent(new HeartbeatEvent(NacosWatch.this, nacosWatchIndex.getAndIncrement()));
// 服务监听放入列表
subscribeListeners.put(serviceName, eventListener);
// 订阅监听实例,最后由EventDispatcher添加监听
namingService.subscribe(serviceName, eventListener);
}
}
if (changed) {
// 有改变,发布心跳事件
this.publisher.publishEvent(new HeartbeatEvent(this, nacosWatchIndex.getAndIncrement()));
}
}
catch (Exception e) {
log.error("Error watching Nacos Service change", e);
}
}
1.5.0之后的版本源码
@Override
public void start() {
if (this.running.compareAndSet(false, true)) {
EventListener eventListener = listenerMap.computeIfAbsent(buildKey(),
event -> new EventListener() {
@Override
public void onEvent(Event event) {
if (event instanceof NamingEvent) {
List<Instance> instances = ((NamingEvent) event).getInstances();
Optional<Instance> instanceOptional = selectCurrentInstance(instances);
instanceOptional.ifPresent(currentInstance -> {resetIfNeeded(currentInstance);});
}
}
});
NamingService namingService = nacosServiceManager.getNamingService(properties.getNacosProperties());
try {
namingService.subscribe(properties.getService(), properties.getGroup(), Arrays.asList(properties.getClusterName()), eventListener);
}
catch (Exception e) {
log.error("namingService subscribe failed, properties:{}", properties, e);
}
// 定时任务:默认30秒调用一次nacosServicesWatch()方法
this.watchFuture = this.taskScheduler.scheduleWithFixedDelay(this::nacosServicesWatch, this.properties.getWatchDelay());
}
}
public void nacosServicesWatch() {
// nacos doesn't support watch now , publish an event every 30 seconds.
this.publisher.publishEvent(new HeartbeatEvent(this, nacosWatchIndex.getAndIncrement()));
}
对比发现,新版本中的nacosServicesWatch()方法只做了一件事,就是发布心跳事件,去掉了旧版本中大堆服务获取、改变、发布监听事件等等。改变原因不清楚,有兴趣可看看源码和github https://github.com/alibaba/spring-cloud-alibaba/tree/master/spring-cloud-alibaba-starters
根据心跳来监听服务节点变化
新版本中不再自动监听服务节点变化,我们只能自己来处理了,这里我通过监听心跳事件来定时自动监听服务变化。
@EventListener(classes = HeartbeatEvent.class)
public void listenNacosEvent(ApplicationEvent heartbeatEvent) {
try {
// 根据默认分组查询注册的所有服务名,nacosServiceDiscovery自动注入对象
List<String> serviceNameList = nacosServiceDiscovery.getServices();
if (CollectionUtils.isEmpty(serviceNameList)) {
return;
}
/**
* 获取到服务名,就可以根据自己需求来实现自己的功能
* 如,可以缓存一份服务列表,根据需要时再去使用
*/
// 循环所有服务获取服务信息
serviceNameList.stream().forEach(serviceName -> {
try {
// 根据服务名获取服务实例
List<ServiceInstance> serviceInstanceList = nacosServiceDiscovery.getInstances(serviceName);
// TODO
} catch (Exception e) {
logger.error("获取服务实例异常!serviceName:{}, {}:{}", serviceName, e.getClass().getName(), e.getMessage());
}
});
} catch (NacosException e) {
logger.error("查询注册nacos服务列表异常!{}:{}", e.getClass().getName(), e.getMessage(), e);
}
}
总结
初次接触nacos,感觉功能很强大 ,换了新版本包发现有功能不生效了,就调试源码,对比发现代码变化不小,查看源码,提供了获取服务名、服务实例等方法,记录一下学习过程和解决问题办法,如有不正确的地方,欢迎指正。
还没有评论,来说两句吧...