Nacos服务注册原理分析

向右看齐 2023-09-26 17:58 122阅读 0赞

在分布式服务中,原来的单体服务会被拆分成一个个微服务,服务注册实例到注册中心,服务消费者通过注册中心获取实例列表,直接请求调用服务。

format_png

服务是如何注册到注册中心,服务如果挂了,服务是如何检测?带着这些问题,我们从源码上对服务注册进行简单的源码分析。

版本 2.1.1

  • Nacos Server:2.1.1
  • spring-cloud-starter-alibaba:2.1.1.RELEASE
  • spring-boot:2.1.1.RELEASE

方便统一版本,客户端和服务端版本号都为2.1.1

客户端

启动nacos服务注册和发现需要添加maven依赖:

  1. <dependency>
  2. <groupId>com.alibaba.cloud</groupId>
  3. <artifactId>spring-cloud-starter-alibaba-nacos-discovery</artifactId>
  4. <version>${latest.version}</version>
  5. </dependency>
  6. 复制代码

根据maven依赖找到对应的spring.factories文件:

format_png 1

spring.factories文件里找到启动配置类信息,SpringBoot服务启动时会将这些配置类信息注入到bean容器中。

  1. org.springframework.boot.autoconfigure.EnableAutoConfiguration=\
  2. com.alibaba.cloud.nacos.NacosDiscoveryAutoConfiguration,\
  3. com.alibaba.cloud.nacos.ribbon.RibbonNacosAutoConfiguration,\
  4. com.alibaba.cloud.nacos.endpoint.NacosDiscoveryEndpointAutoConfiguration,\
  5. com.alibaba.cloud.nacos.discovery.NacosDiscoveryClientAutoConfiguration,\
  6. com.alibaba.cloud.nacos.discovery.configclient.NacosConfigServerAutoConfiguration
  7. org.springframework.cloud.bootstrap.BootstrapConfiguration=\
  8. com.alibaba.cloud.nacos.discovery.configclient.NacosDiscoveryClientConfigServiceBootstrapConfiguration
  9. 复制代码

服务注册的核心配置类为:NacosDiscoveryAutoConfiguration,该类配置三个bean对象:

  • NacosServiceRegistry
  • NacosRegistration
  • NacosAutoServiceRegistration

NacosAutoServiceRegistration

NacosAutoServiceRegistration继承了抽象类AbstractAutoServiceRegistrationAbstractAutoServiceRegistration抽象类又实现了ApplicationListener接口。

实现ApplicationListener接口的方法,会在Spring容器初始化完成之后调用onApplicationEvent方法:

  1. public void onApplicationEvent(WebServerInitializedEvent event) {
  2. bind(event);
  3. }
  4. 复制代码

调用bind方法:

  1. public void bind(WebServerInitializedEvent event) {
  2. ApplicationContext context = event.getApplicationContext();
  3. if (context instanceof ConfigurableWebServerApplicationContext) {
  4. if ("management".equals(((ConfigurableWebServerApplicationContext) context)
  5. .getServerNamespace())) {
  6. return;
  7. }
  8. }
  9. this.port.compareAndSet(0, event.getWebServer().getPort());
  10. // 调用 start 方法
  11. this.start();
  12. }
  13. 复制代码

调用了start方法:

  1. public void start() {
  2. if (!isEnabled()) {
  3. if (logger.isDebugEnabled()) {
  4. logger.debug("Discovery Lifecycle disabled. Not starting");
  5. }
  6. return;
  7. }
  8. if (!this.running.get()) {
  9. this.context.publishEvent(
  10. new InstancePreRegisteredEvent(this, getRegistration()));
  11. register();
  12. if (shouldRegisterManagement()) {
  13. registerManagement();
  14. }
  15. this.context.publishEvent(
  16. new InstanceRegisteredEvent<>(this, getConfiguration()));
  17. this.running.compareAndSet(false, true);
  18. }
  19. }
  20. 复制代码

调用了register方法,最终调用的是NacosServiceRegistry类的register方法。

NacosServiceRegistry

根据上文可知,服务器启动后调用NacosServiceRegistry类的register方法,该方法实现将实例注册到服务端

  1. public void register(Registration registration) {
  2. if (StringUtils.isEmpty(registration.getServiceId())) {
  3. log.warn("No service to register for nacos client...");
  4. return;
  5. }
  6. String serviceId = registration.getServiceId();
  7. String group = nacosDiscoveryProperties.getGroup();
  8. // 创建实例
  9. Instance instance = getNacosInstanceFromRegistration(registration);
  10. try {
  11. // 注册实例
  12. namingService.registerInstance(serviceId, group, instance);
  13. log.info("nacos registry, {} {} {}:{} register finished", group, serviceId,
  14. instance.getIp(), instance.getPort());
  15. }
  16. catch (Exception e) {
  17. log.error("nacos registry, {} register failed...{},", serviceId,
  18. registration.toString(), e);
  19. }
  20. }
  21. 复制代码

创建实例,然后通过namingService.registerInstance方法注册实例,然后查看registerInstance方法:

  1. @Override
  2. public void registerInstance(String serviceName, String groupName, Instance instance) throws NacosException {
  3. if (instance.isEphemeral()) {
  4. // 封装心跳包
  5. BeatInfo beatInfo = new BeatInfo();
  6. beatInfo.setServiceName(NamingUtils.getGroupedName(serviceName, groupName));
  7. beatInfo.setIp(instance.getIp());
  8. beatInfo.setPort(instance.getPort());
  9. beatInfo.setCluster(instance.getClusterName());
  10. beatInfo.setWeight(instance.getWeight());
  11. beatInfo.setMetadata(instance.getMetadata());
  12. beatInfo.setScheduled(false);
  13. long instanceInterval = instance.getInstanceHeartBeatInterval();
  14. beatInfo.setPeriod(instanceInterval == 0 ? DEFAULT_HEART_BEAT_INTERVAL : instanceInterval);
  15. // 发送心跳包
  16. beatReactor.addBeatInfo(NamingUtils.getGroupedName(serviceName, groupName), beatInfo);
  17. }
  18. // 发送实例
  19. serverProxy.registerService(NamingUtils.getGroupedName(serviceName, groupName), groupName, instance);
  20. }
  21. 复制代码

registerInstance主要做两件事:

  • 发送心跳包

beatReactor.addBeatInfo使用定时服务,每隔5s向服务端发送一次心跳请求,通过http请求发送心跳信息,路径为/v1/ns/instance/beat

心跳请求定时任务使用线程池ScheduledThreadPoolExecutor.schedule(),而该方法只会调用一次,定时任务的实现是在每次请求任务只会再调用一次ScheduledThreadPoolExecutor.schedule(), 简单说就是nacos在发送心跳的时候,会调用schedule方法,在schedule要执行的任务中,如果正常发送完心跳,会再次调用schedule方法。

那为什么不直接调用周期执行的线程池ScheduledThreadPoolExecutor.scheduleAtFixedRate()?可能是由于发送心跳服务发生异常后,定时任务还会继续执行,但是周期执行的线程池遇到报错后也不会重复调用执行的任务。

线程任务BeatTaskrun方法,,每次执行会先判断isStopped,如果是false,说明心跳停止,就不会触发下次执行任务。如果使用定时任务scheduleAtFixedRate,即使心跳停止还会继续执行任务,造成资源不必要浪费。

  • 注册实例

registerService主要封装实例信息,比如ipportservicename,将这些信息通过http请求发送给服务端。路径为/v1/ns/instance

根据上面流程,查看以下的流程图:

format_png 2

服务端

服务端就是注册中心,服务注册到注册中心,在https://github.com/alibaba/nacos/releases/tag/2.1.1下载源码部署到本地,方便调式和查看

服务端主要接收两个信息:心跳包实例信息

心跳包

客户端向服务请求的路径为/v1/ns/instance/beat,对应的服务端为InstanceController类的beat方法:

  1. @PutMapping("/beat")
  2. @Secured(action = ActionTypes.WRITE)
  3. public ObjectNode beat(HttpServletRequest request) throws Exception {
  4. ObjectNode result = JacksonUtils.createEmptyJsonNode();
  5. result.put(SwitchEntry.CLIENT_BEAT_INTERVAL, switchDomain.getClientBeatInterval());
  6. String beat = WebUtils.optional(request, "beat", StringUtils.EMPTY);
  7. RsInfo clientBeat = null;
  8. // 判断是否有心跳,存在心跳就转成RsInfo
  9. if (StringUtils.isNotBlank(beat)) {
  10. clientBeat = JacksonUtils.toObj(beat, RsInfo.class);
  11. }
  12. String clusterName = WebUtils
  13. .optional(request, CommonParams.CLUSTER_NAME, UtilsAndCommons.DEFAULT_CLUSTER_NAME);
  14. String ip = WebUtils.optional(request, "ip", StringUtils.EMPTY);
  15. int port = Integer.parseInt(WebUtils.optional(request, "port", "0"));
  16. if (clientBeat != null) {
  17. if (StringUtils.isNotBlank(clientBeat.getCluster())) {
  18. clusterName = clientBeat.getCluster();
  19. } else {
  20. // fix #2533
  21. clientBeat.setCluster(clusterName);
  22. }
  23. ip = clientBeat.getIp();
  24. port = clientBeat.getPort();
  25. }
  26. String namespaceId = WebUtils.optional(request, CommonParams.NAMESPACE_ID, Constants.DEFAULT_NAMESPACE_ID);
  27. String serviceName = WebUtils.required(request, CommonParams.SERVICE_NAME);
  28. NamingUtils.checkServiceNameFormat(serviceName);
  29. Loggers.SRV_LOG.debug("[CLIENT-BEAT] full arguments: beat: {}, serviceName: {}, namespaceId: {}", clientBeat,
  30. serviceName, namespaceId);
  31. // 获取实例信息
  32. BeatInfoInstanceBuilder builder = BeatInfoInstanceBuilder.newBuilder();
  33. builder.setRequest(request);
  34. int resultCode = getInstanceOperator()
  35. .handleBeat(namespaceId, serviceName, ip, port, clusterName, clientBeat, builder);
  36. result.put(CommonParams.CODE, resultCode);
  37. // 下次发送心跳包间隔
  38. result.put(SwitchEntry.CLIENT_BEAT_INTERVAL,
  39. getInstanceOperator().getHeartBeatInterval(namespaceId, serviceName, ip, port, clusterName));
  40. result.put(SwitchEntry.LIGHT_BEAT_ENABLED, switchDomain.isLightBeatEnabled());
  41. return result;
  42. }
  43. 复制代码

handleBeat方法中执行线程任务ClientBeatProcessorV2run方法,延长lastHeartBeatTime时间。注册中心会定时查询实例,当前时间 - lastHeartBeatTime > 设置时间(默认15秒),就标记实例为不健康实例。如果心跳实例不健康,发送通知给订阅方,变更实例。

服务端在15秒没有收到心跳包会将实例设置为不健康,在30秒没有收到心跳包会将临时实例移除掉。

实例注册

客户端请求的地址是/nacos/v1/ns/instance, 对应的是服务端是在InstanceController类。找到类上对应的post请求方法上。

注册流程:

InstanceController#register ——>InstanceOperatorClientImpl#registerInstance ——>ClientOperationServiceProxy#registerInstance ——>EphemeralClientOperationServiceImpl#registerInstance

创建 Service

服务注册后,将服务存储在一个双层map集合中:

  1. private final Map<String, Map<String, Service>> serviceMap = new ConcurrentHashMap<>();
  2. 复制代码

通过是否存在ephemeral,true,走AP模式,否则走CP模式。

Nacos 默认就是采用的AP模式使用Distro协议实现。实现的接口是EphemeralConsistencyService对节点信息的持久化主要是调用put方法,

会先写入到DataStore中:

  1. public void onPut(String key, Record value) {
  2. if (KeyBuilder.matchEphemeralInstanceListKey(key)) {
  3. Datum<Instances> datum = new Datum<>();
  4. datum.value = (Instances) value;
  5. datum.key = key;
  6. datum.timestamp.incrementAndGet();
  7. // 数据持久化到缓存中
  8. dataStore.put(key, datum);
  9. }
  10. if (!listeners.containsKey(key)) {
  11. return;
  12. }
  13. notifier.addTask(key, DataOperation.CHANGE);
  14. }
  15. 复制代码

总结

  • 从依赖上找到需要启动的是要加载的服务注册类NacosDiscoveryAutoConfiguration,主要配置三个对象

    • NacosServiceRegistry
    • NacosRegistration
    • NacosAutoServiceRegistration
  • NacosServiceRegistry类的register方法,封装实例和心跳信息

    • 通过http请求,定时发送发送心跳包,默认时间间隔是5秒。
    • 通过http请求,发送实例信息。
  • 服务端

    • 接收到心跳请求,更新心跳包最新时间。服务端在15秒没有收到心跳包会将实例设为不健康,在30秒没有收到心跳包会将临时实例移除掉。
    • 接收到服务注册接口,通过ephemeral判断是否走AP还是走CPAP模式使用Distro协议。通过调用EphemeralConsistencyService接口实现,持久化实例信息。

发表评论

表情:
评论列表 (有 0 条评论,122人围观)

还没有评论,来说两句吧...

相关阅读

    相关 Nacos服务注册原理分析

    > 在分布式服务中,原来的单体服务会被拆分成一个个微服务,服务注册实例到注册中心,服务消费者通过注册中心获取实例列表,直接请求调用服务。 ![format_png][] 服