Spring Cloud Consul 服务注册和发现实现

阳光穿透心脏的1/2处 2022-11-17 14:37 431阅读 0赞

Spring Cloud Consul 服务注册和发现实现

Spring Cloud Kubernetes 使用,可以通过引入 org.springframework.cloud:spring-cloud-starter-consul-discovery,这个 starter 依赖于 org.springframework.cloud:spring-cloud-consul-coreorg.springframework.cloud:spring-cloud-consul-discovery

Consul 的核心概念

  • server
    集群的核心节点,用于和 agent 通讯,保存服务的信息
  • agent
    集群节点的守护进程,用于服务注册等行为,但不保存数据
  • catalog
    集群服务通信的接口

初始化 Kubernetes Client

初始化 Consul 依赖

相关 Consul 核心依赖的初始化是通过 org.springframework.cloud.consul.ConsulAutoConfiguration实现的

  • 初始化 ConsulClient

    @Bean

    1. @ConditionalOnMissingBean
    2. public ConsulClient consulClient(ConsulProperties consulProperties) {
    3. final int agentPort = consulProperties.getPort();
    4. final String agentHost = !StringUtils.isEmpty(consulProperties.getScheme())
    5. ? consulProperties.getScheme() + "://" + consulProperties.getHost()
    6. : consulProperties.getHost();
    7. if (consulProperties.getTls() != null) {
    8. ConsulProperties.TLSConfig tls = consulProperties.getTls();
    9. TLSConfig tlsConfig = new TLSConfig(tls.getKeyStoreInstanceType(),
    10. tls.getCertificatePath(), tls.getCertificatePassword(),
    11. tls.getKeyStorePath(), tls.getKeyStorePassword());
    12. return new ConsulClient(agentHost, agentPort, tlsConfig);
    13. }
    14. return new ConsulClient(agentHost, agentPort);
    15. }

服务注册

初始化 Bean

相关 Bean 的初始化是在 org.springframework.cloud.consul.serviceregistry.ConsulAutoServiceRegistrationAutoConfiguration 中完成的

  1. // 自动注册
  2. @Bean
  3. @ConditionalOnMissingBean
  4. public ConsulAutoServiceRegistration consulAutoServiceRegistration(
  5. ConsulServiceRegistry registry,
  6. AutoServiceRegistrationProperties autoServiceRegistrationProperties,
  7. ConsulDiscoveryProperties properties,
  8. ConsulAutoRegistration consulRegistration) {
  9. return new ConsulAutoServiceRegistration(registry, autoServiceRegistrationProperties, properties, consulRegistration);
  10. }
  11. // 启动事件监听器
  12. @Bean
  13. public ConsulAutoServiceRegistrationListener consulAutoServiceRegistrationListener(
  14. ConsulAutoServiceRegistration registration) {
  15. return new ConsulAutoServiceRegistrationListener(registration);
  16. }
  17. @Bean
  18. @ConditionalOnMissingBean
  19. public ConsulAutoRegistration consulRegistration(
  20. AutoServiceRegistrationProperties autoServiceRegistrationProperties,
  21. ConsulDiscoveryProperties properties, ApplicationContext applicationContext,
  22. ObjectProvider<List<ConsulRegistrationCustomizer>> registrationCustomizers,
  23. ObjectProvider<List<ConsulManagementRegistrationCustomizer>> managementRegistrationCustomizers,
  24. HeartbeatProperties heartbeatProperties) {
  25. return ConsulAutoRegistration.registration(autoServiceRegistrationProperties,
  26. properties, applicationContext, registrationCustomizers.getIfAvailable(),
  27. managementRegistrationCustomizers.getIfAvailable(), heartbeatProperties);
  28. }

注册流程

  • 当监听到 WebServerInitializedEvent 事件时触发注册

ConsulAutoServiceRegistrationListener 类实现了 SmartApplicationListener接口

  1. @Override
  2. public void onApplicationEvent(ApplicationEvent applicationEvent) {
  3. // 判断是否是 web server 初始化事件
  4. if (applicationEvent instanceof WebServerInitializedEvent) {
  5. WebServerInitializedEvent event = (WebServerInitializedEvent) applicationEvent;
  6. ApplicationContext context = event.getApplicationContext();
  7. if (context instanceof ConfigurableWebServerApplicationContext) {
  8. if ("management".equals(((ConfigurableWebServerApplicationContext) context).getServerNamespace())) {
  9. return;
  10. }
  11. }
  12. this.autoServiceRegistration.setPortIfNeeded(event.getWebServer().getPort());
  13. // 真正触发服务注册
  14. this.autoServiceRegistration.start();
  15. }
  16. }

调用 org.springframework.cloud.consul.serviceregistry.ConsulAutoServiceRegistration#register 注册

  1. @Override
  2. protected void register() {
  3. if (!this.properties.isRegister()) {
  4. log.debug("Registration disabled.");
  5. return;
  6. }
  7. super.register();
  8. }

然后调用 org.springframework.cloud.client.serviceregistry.AbstractAutoServiceRegistration#start

  1. public void start() {
  2. if (!this.isEnabled()) {
  3. if (logger.isDebugEnabled()) {
  4. logger.debug("Discovery Lifecycle disabled. Not starting");
  5. }
  6. } else {
  7. if (!this.running.get()) {
  8. this.context.publishEvent(new InstancePreRegisteredEvent(this, this.getRegistration()));
  9. this.register();
  10. if (this.shouldRegisterManagement()) {
  11. this.registerManagement();
  12. }
  13. this.context.publishEvent(new InstanceRegisteredEvent(this, this.getConfiguration()));
  14. this.running.compareAndSet(false, true);
  15. }
  16. }
  17. }
  18. protected void register() {
  19. this.serviceRegistry.register(this.getRegistration());
  20. }
  • 最终在 ConsulServiceRegistry 实现注册逻辑

    @Override

    1. public void register(ConsulRegistration reg) {
    2. log.info("Registering service with consul: " + reg.getService());
    3. try {
    4. // 将服务注册到 Consul
    5. this.client.agentServiceRegister(reg.getService(), this.properties.getAclToken());
    6. NewService service = reg.getService();
    7. // 添加到心跳检查中
    8. if (this.heartbeatProperties.isEnabled() &&
    9. this.ttlScheduler != null &&
    10. service.getCheck() != null &&
    11. service.getCheck().getTtl() != null) {
    12. this.ttlScheduler.add(reg.getInstanceId());
    13. }
    14. } catch (ConsulException e) {
    15. if (this.properties.isFailFast()) {
    16. log.error("Error registering service with consul: " + reg.getService(), e);
    17. ReflectionUtils.rethrowRuntimeException(e);
    18. }
    19. log.warn("Failfast is false. Error registering service with consul: " + reg.getService(), e);
    20. }
    21. }

最后发出服务注册事件

取消注册流程

  • 在 Bean org.springframework.cloud.consul.serviceregistry.ConsulAutoServiceRegistration 销毁的时候调用 stop 方法,执行关闭逻辑;在 stop 方法中调用 deregister 方法,取消注册

    public void stop() {

    1. if (this.getRunning().compareAndSet(true, false) && this.isEnabled()) {
    2. this.deregister();
    3. if (this.shouldRegisterManagement()) {
    4. this.deregisterManagement();
    5. }
    6. this.serviceRegistry.close();
    7. }
    8. }
  • ConsulServiceRegistry 实现取消注册逻辑

    @Override

    1. public void deregister(ConsulRegistration reg) {
    2. if (this.ttlScheduler != null) {
    3. this.ttlScheduler.remove(reg.getInstanceId());
    4. }
    5. if (log.isInfoEnabled()) {
    6. log.info("Deregistering service with consul: " + reg.getInstanceId());
    7. }
    8. // 将实例从 Consul 中移除
    9. this.client.agentServiceDeregister(reg.getInstanceId(), this.properties.getAclToken());
    10. }

服务发现

初始化 Bean

相关 Bean 的初始化在 org.springframework.cloud.consul.discovery.ConsulDiscoveryClientConfiguration 中完成

  1. @Bean
  2. @ConditionalOnMissingBean
  3. public ConsulDiscoveryClient consulDiscoveryClient(ConsulClient consulClient,
  4. ConsulDiscoveryProperties discoveryProperties) {
  5. return new ConsulDiscoveryClient(consulClient, discoveryProperties);
  6. }

获取服务

  • getService

调用 org.springframework.cloud.consul.discovery.ConsulDiscoveryClient#getServices 方法获取指定条件下的服务名称

  1. @Override
  2. public List<String> getServices() {
  3. String aclToken = this.properties.getAclToken();
  4. CatalogServicesRequest request = CatalogServicesRequest.newBuilder()
  5. .setQueryParams(QueryParams.DEFAULT)
  6. .setToken(this.properties.getAclToken()).build();
  7. return new ArrayList<>(this.client.getCatalogServices(request).getValue().keySet());
  8. }

最终是调用了 Consul 的 /v1/catalog/services接口

获取实例

  • getInstance

调用 org.springframework.cloud.consul.discovery.ConsulDiscoveryClient#getInstances(java.lang.String, com.ecwid.consul.v1.QueryParams),根据服务的名称获取相应的实例列表

  1. public List<ServiceInstance> getInstances(final String serviceId,
  2. final QueryParams queryParams) {
  3. List<ServiceInstance> instances = new ArrayList<>();
  4. addInstancesToList(instances, serviceId, queryParams);
  5. return instances;
  6. }
  7. private void addInstancesToList(List<ServiceInstance> instances, String serviceId,
  8. QueryParams queryParams) {
  9. // 请求参数
  10. HealthServicesRequest request = HealthServicesRequest.newBuilder()
  11. .setTag(this.properties.getDefaultQueryTag())
  12. .setPassing(this.properties.isQueryPassing())
  13. .setQueryParams(queryParams)
  14. .setToken(this.properties.getAclToken()).build();
  15. Response<List<HealthService>> services = this.client.getHealthServices(serviceId,
  16. request);
  17. for (HealthService service : services.getValue()) {
  18. String host = findHost(service);
  19. Map<String, String> metadata = getMetadata(service, this.properties.isTagsAsMetadata());
  20. boolean secure = false;
  21. if (metadata.containsKey("secure")) {
  22. secure = Boolean.parseBoolean(metadata.get("secure"));
  23. }
  24. instances.add(
  25. new DefaultServiceInstance(
  26. service.getService().getId(),
  27. serviceId,
  28. host,
  29. service.getService().getPort(),
  30. secure,
  31. metadata)
  32. );
  33. }
  34. }

服务列表更新

Consul 的实例监听是通过定时任务,默认每秒都会拉取服务列表,如果发现返回的 Index 发生变化,则说明服务发生变化,发出 HeartbeatEvent 事件

实例初始化

  1. @Bean
  2. @ConditionalOnMissingBean
  3. public ConsulCatalogWatch consulCatalogWatch(
  4. ConsulDiscoveryProperties discoveryProperties,
  5. ConsulClient consulClient,
  6. @Qualifier(CATALOG_WATCH_TASK_SCHEDULER_NAME) TaskScheduler taskScheduler) {
  7. return new ConsulCatalogWatch(discoveryProperties, consulClient, taskScheduler);
  8. }

监听实现

是在 org.springframework.cloud.consul.discovery.ConsulCatalogWatch#catalogServicesWatch

  1. @Timed("consul.watch-catalog-services")
  2. public void catalogServicesWatch() {
  3. try {
  4. long index = -1;
  5. if (this.catalogServicesIndex.get() != null) {
  6. index = this.catalogServicesIndex.get().longValue();
  7. }
  8. // 获取服务信息
  9. CatalogServicesRequest request = CatalogServicesRequest.newBuilder()
  10. .setQueryParams(new QueryParams(this.properties.getCatalogServicesWatchTimeout(), index))
  11. .setToken(this.properties.getAclToken()).build();
  12. Response<Map<String, List<String>>> response = this.consul.getCatalogServices(request);
  13. // 获取位点并发送事件
  14. Long consulIndex = response.getConsulIndex();
  15. if (consulIndex != null) {
  16. this.catalogServicesIndex.set(BigInteger.valueOf(consulIndex));
  17. }
  18. if (log.isTraceEnabled()) {
  19. log.trace("Received services update from consul: " + response.getValue() + ", index: " + consulIndex);
  20. }
  21. this.publisher.publishEvent(new HeartbeatEvent(this, consulIndex));
  22. } catch (Exception e) {
  23. log.error("Error watching Consul CatalogServices", e);
  24. }
  25. }

发表评论

表情:
评论列表 (有 0 条评论,431人围观)

还没有评论,来说两句吧...

相关阅读