Eureka Client Source Code Analysis

约定不等于承诺〃 2023-07-12 08:58

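What the Eureka client can do

  1. It can fetch information about other clients from the registry and use it to send requests to them.
  2. It can register itself with the registry, so that other clients can discover this client and call it directly.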

Dependencies

The application must first be a Spring Boot application.

    <dependency>
        <groupId>org.springframework.cloud</groupId>
        <artifactId>spring-cloud-starter-eureka</artifactId>
    </dependency>

The version used here is Brixton.SR5; the implementation logic may differ in other versions.

How it works

The startup class needs the @EnableDiscoveryClient annotation. Let's open the annotation and look at its code.

    /**
     * Annotation to enable a DiscoveryClient implementation.
     * @author Spencer Gibb
     */
    @Target(ElementType.TYPE)
    @Retention(RetentionPolicy.RUNTIME)
    @Documented
    @Inherited
    @Import(EnableDiscoveryClientImportSelector.class)
    public @interface EnableDiscoveryClient {
    }

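As the Javadoc says, this annotation enables an implementation of DiscoveryClient. How is it enabled? Roughly speaking, it relies on Spring Boot's starter (auto-configuration) mechanism: the configuration class EurekaClientAutoConfiguration is picked up, and that class creates the EurekaClient bean. Note that the class behind this bean extends Netflix's com.netflix.discovery.DiscoveryClient, which is a different type from the Spring Cloud DiscoveryClient interface named in the annotation. Here is a short excerpt.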
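To make the "annotation imports a selector, the selector picks configuration classes" idea concrete, here is a simplified model in plain Java. It is only a sketch: ImportSketch, SelectorSketch, EnableDiscoverySketch and the factories map are hypothetical stand-ins invented for illustration, not Spring's API, and the real lookup in Spring Cloud may differ in its details.

```java
import java.lang.annotation.Annotation;
import java.lang.annotation.ElementType;
import java.lang.annotation.Retention;
import java.lang.annotation.RetentionPolicy;
import java.lang.annotation.Target;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical stand-in for an "import" meta-annotation: points at a selector class.
@Retention(RetentionPolicy.RUNTIME)
@Target(ElementType.ANNOTATION_TYPE)
@interface ImportSketch {
    Class<? extends SelectorSketch> value();
}

// Hypothetical stand-in for an import selector: decides which configurations to load.
interface SelectorSketch {
    List<String> select(Map<String, List<String>> factories);
}

// Returns the configurations registered under the enabling annotation's name.
class DiscoverySelectorSketch implements SelectorSketch {
    @Override
    public List<String> select(Map<String, List<String>> factories) {
        List<String> found = factories.get("EnableDiscoverySketch");
        return found == null ? Collections.<String>emptyList() : found;
    }
}

// Hypothetical stand-in for an enabling annotation such as @EnableDiscoveryClient.
@Retention(RetentionPolicy.RUNTIME)
@Target(ElementType.TYPE)
@ImportSketch(DiscoverySelectorSketch.class)
@interface EnableDiscoverySketch {
}

@EnableDiscoverySketch
class DemoApplicationSketch {
}

class ImportMechanismSketch {
    // Walks the annotations on appClass, runs every selector found via @ImportSketch,
    // and collects the configuration names the selectors return.
    static List<String> resolveConfigurations(Class<?> appClass, Map<String, List<String>> factories) {
        List<String> result = new ArrayList<>();
        for (Annotation annotation : appClass.getAnnotations()) {
            ImportSketch imported = annotation.annotationType().getAnnotation(ImportSketch.class);
            if (imported == null) {
                continue;
            }
            try {
                SelectorSketch selector = imported.value().getDeclaredConstructor().newInstance();
                result.addAll(selector.select(factories));
            } catch (ReflectiveOperationException e) {
                throw new IllegalStateException("Cannot instantiate selector", e);
            }
        }
        return result;
    }

    public static void main(String[] args) {
        Map<String, List<String>> factories = new HashMap<>();
        factories.put("EnableDiscoverySketch", Collections.singletonList("EurekaConfigSketch"));
        System.out.println(resolveConfigurations(DemoApplicationSketch.class, factories));
    }
}
```

The point of the indirection is that the application class only names the annotation; which configuration classes get loaded is decided by whatever is registered for that annotation on the classpath.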

    @Bean(destroyMethod = "shutdown")
    @ConditionalOnMissingBean(value = EurekaClient.class, search = SearchStrategy.CURRENT)
    public EurekaClient eurekaClient(ApplicationInfoManager manager,
            EurekaClientConfig config) {
        return new CloudEurekaClient(manager, config, this.optionalArgs,
                this.context);
    }

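CloudEurekaClient itself contains very little code; almost everything is implemented by its parent class DiscoveryClient, so we go straight to the parent.
Construction ends up in the constructor below. The full code is pasted here for reference; there is no need to study it line by line yet.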

    @Inject
    DiscoveryClient(ApplicationInfoManager applicationInfoManager, EurekaClientConfig config, DiscoveryClientOptionalArgs args,
                    Provider<BackupRegistry> backupRegistryProvider) {
        // Health-check related; skip
        if (args != null) {
            this.healthCheckHandlerProvider = args.healthCheckHandlerProvider;
            this.healthCheckCallbackProvider = args.healthCheckCallbackProvider;
            this.eventListeners.addAll(args.getEventListeners());
        } else {
            this.healthCheckCallbackProvider = null;
            this.healthCheckHandlerProvider = null;
        }
        // Mainly obtains this application's own instance information
        this.applicationInfoManager = applicationInfoManager;
        InstanceInfo myInfo = applicationInfoManager.getInfo();
        clientConfig = config;
        staticClientConfig = clientConfig;
        transportConfig = config.getTransportConfig();
        instanceInfo = myInfo;
        if (myInfo != null) {
            appPathIdentifier = instanceInfo.getAppName() + "/" + instanceInfo.getId();
        } else {
            logger.warn("Setting instanceInfo to a passed in null value");
        }
        this.backupRegistryProvider = backupRegistryProvider;
        this.urlRandomizer = new EndpointUtils.InstanceInfoBasedUrlRandomizer(instanceInfo);
        localRegionApps.set(new Applications());
        fetchRegistryGeneration = new AtomicLong(0);
        remoteRegionsToFetch = new AtomicReference<String>(clientConfig.fetchRegistryForRemoteRegions());
        remoteRegionsRef = new AtomicReference<>(remoteRegionsToFetch.get() == null ? null : remoteRegionsToFetch.get().split(","));
        // Some monitors
        if (config.shouldFetchRegistry()) {
            this.registryStalenessMonitor = new ThresholdLevelsMetric(this, METRIC_REGISTRY_PREFIX + "lastUpdateSec_", new long[]{15L, 30L, 60L, 120L, 240L, 480L});
        } else {
            this.registryStalenessMonitor = ThresholdLevelsMetric.NO_OP_METRIC;
        }
        if (config.shouldRegisterWithEureka()) {
            this.heartbeatStalenessMonitor = new ThresholdLevelsMetric(this, METRIC_REGISTRATION_PREFIX + "lastHeartbeatSec_", new long[]{15L, 30L, 60L, 120L, 240L, 480L});
        } else {
            this.heartbeatStalenessMonitor = ThresholdLevelsMetric.NO_OP_METRIC;
        }
        // If the client neither registers with Eureka nor fetches the registry, return immediately.
        if (!config.shouldRegisterWithEureka() && !config.shouldFetchRegistry()) {
            logger.info("Client configured to neither register nor query for data.");
            scheduler = null;
            heartbeatExecutor = null;
            cacheRefreshExecutor = null;
            eurekaTransport = null;
            instanceRegionChecker = new InstanceRegionChecker(new PropertyBasedAzToRegionMapper(config), clientConfig.getRegion());
            // This is a bit of hack to allow for existing code using DiscoveryManager.getInstance()
            // to work with DI'd DiscoveryClient
            DiscoveryManager.getInstance().setDiscoveryClient(this);
            DiscoveryManager.getInstance().setEurekaClientConfig(config);
            initTimestampMs = System.currentTimeMillis();
            logger.info("Discovery Client initialized at timestamp {} with initial instances count: {}",
                    initTimestampMs, this.getApplications().size());
            return; // no need to setup up an network tasks and we are done
        }
        // Initialize the thread pools used by the tasks below:
        // heartbeatExecutor sends heartbeats, cacheRefreshExecutor re-fetches the registry
        try {
            scheduler = Executors.newScheduledThreadPool(3,
                    new ThreadFactoryBuilder()
                            .setNameFormat("DiscoveryClient-%d")
                            .setDaemon(true)
                            .build());
            heartbeatExecutor = new ThreadPoolExecutor(
                    1, clientConfig.getHeartbeatExecutorThreadPoolSize(), 0, TimeUnit.SECONDS,
                    new SynchronousQueue<Runnable>(),
                    new ThreadFactoryBuilder()
                            .setNameFormat("DiscoveryClient-HeartbeatExecutor-%d")
                            .setDaemon(true)
                            .build()
            ); // use direct handoff
            cacheRefreshExecutor = new ThreadPoolExecutor(
                    1, clientConfig.getCacheRefreshExecutorThreadPoolSize(), 0, TimeUnit.SECONDS,
                    new SynchronousQueue<Runnable>(),
                    new ThreadFactoryBuilder()
                            .setNameFormat("DiscoveryClient-CacheRefreshExecutor-%d")
                            .setDaemon(true)
                            .build()
            ); // use direct handoff
            eurekaTransport = new EurekaTransport();
            scheduleServerEndpointTask(eurekaTransport, args);
            AzToRegionMapper azToRegionMapper;
            if (clientConfig.shouldUseDnsForFetchingServiceUrls()) {
                azToRegionMapper = new DNSBasedAzToRegionMapper(clientConfig);
            } else {
                azToRegionMapper = new PropertyBasedAzToRegionMapper(clientConfig);
            }
            if (null != remoteRegionsToFetch.get()) {
                azToRegionMapper.setRegionsToFetch(remoteRegionsToFetch.get().split(","));
            }
            instanceRegionChecker = new InstanceRegionChecker(azToRegionMapper, clientConfig.getRegion());
        } catch (Throwable e) {
            throw new RuntimeException("Failed to initialize DiscoveryClient!", e);
        }
        if (clientConfig.shouldFetchRegistry() && !fetchRegistry(false)) {
            fetchRegistryFromBackup();
        }
        // Key point: cache refresh and heartbeats are set up in this method
        initScheduledTasks();
        try {
            Monitors.registerObject(this);
        } catch (Throwable e) {
            logger.warn("Cannot register timers", e);
        }
        // This is a bit of hack to allow for existing code using DiscoveryManager.getInstance()
        // to work with DI'd DiscoveryClient
        DiscoveryManager.getInstance().setDiscoveryClient(this);
        DiscoveryManager.getInstance().setEurekaClientConfig(config);
        initTimestampMs = System.currentTimeMillis();
        logger.info("Discovery Client initialized at timestamp {} with initial instances count: {}",
                initTimestampMs, this.getApplications().size());
    }
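The "use direct handoff" comments in the constructor refer to pairing a ThreadPoolExecutor with a SynchronousQueue: a submitted task is never queued, it is either handed to a thread immediately or rejected. The following sketch uses only the JDK to show that behavior. The class name, the thread name format and the pool size of 1 are assumptions chosen for the demonstration; the real sizes come from the client configuration.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.SynchronousQueue;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

class DirectHandoffSketch {

    // Builds a pool shaped like the executors in the constructor:
    // one core thread, a small maximum, daemon threads, and no queue capacity.
    static ThreadPoolExecutor newDirectHandoffPool(final String nameFormat, int maxThreads) {
        final AtomicInteger counter = new AtomicInteger();
        ThreadFactory factory = new ThreadFactory() {
            @Override
            public Thread newThread(Runnable r) {
                Thread t = new Thread(r, String.format(nameFormat, counter.getAndIncrement()));
                t.setDaemon(true);
                return t;
            }
        };
        return new ThreadPoolExecutor(1, maxThreads, 0, TimeUnit.SECONDS,
                new SynchronousQueue<Runnable>(), factory);
    }

    // Occupies the only thread with a blocked task, then submits a second task.
    // With direct handoff there is no queue to park it in, so it is rejected.
    static boolean secondTaskRejected() {
        ThreadPoolExecutor pool = newDirectHandoffPool("sketch-%d", 1);
        final CountDownLatch release = new CountDownLatch(1);
        try {
            pool.execute(new Runnable() {
                @Override
                public void run() {
                    try {
                        release.await();
                    } catch (InterruptedException e) {
                        Thread.currentThread().interrupt();
                    }
                }
            });
            try {
                pool.execute(new Runnable() {
                    @Override
                    public void run() {
                    }
                });
                return false;
            } catch (RejectedExecutionException expected) {
                return true;
            }
        } finally {
            release.countDown();
            pool.shutdown();
        }
    }

    public static void main(String[] args) {
        System.out.println("second task rejected: " + secondTaskRejected());
    }
}
```

One consequence of this design is that a slow heartbeat or registry fetch cannot build up a backlog of queued work: extra submissions fail fast once all threads are busy.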

The main logic is in initScheduledTasks(), so let's look at its implementation.

    /**
     * Initializes all scheduled tasks.
     */
    private void initScheduledTasks() {
        // If the registry should be fetched, schedule the refresh task.
        if (clientConfig.shouldFetchRegistry()) {
            // registry cache refresh timer
            int registryFetchIntervalSeconds = clientConfig.getRegistryFetchIntervalSeconds();
            int expBackOffBound = clientConfig.getCacheRefreshExecutorExponentialBackOffBound();
            // Periodically fetch the registry; the actual work is done by CacheRefreshThread
            scheduler.schedule(
                    new TimedSupervisorTask(
                            "cacheRefresh",
                            scheduler,
                            cacheRefreshExecutor,
                            registryFetchIntervalSeconds,
                            TimeUnit.SECONDS,
                            expBackOffBound,
                            new CacheRefreshThread()
                    ),
                    registryFetchIntervalSeconds, TimeUnit.SECONDS);
        }
        // If this instance should register itself with the registry, run the logic below
        if (clientConfig.shouldRegisterWithEureka()) {
            int renewalIntervalInSecs = instanceInfo.getLeaseInfo().getRenewalIntervalInSecs();
            int expBackOffBound = clientConfig.getHeartbeatExecutorExponentialBackOffBound();
            logger.info("Starting heartbeat executor: " + "renew interval is: " + renewalIntervalInSecs);
            // Periodically send a heartbeat to the registry; the actual work is done by HeartbeatThread.
            // Heartbeat timer
            scheduler.schedule(
                    new TimedSupervisorTask(
                            "heartbeat",
                            scheduler,
                            heartbeatExecutor,
                            renewalIntervalInSecs,
                            TimeUnit.SECONDS,
                            expBackOffBound,
                            new HeartbeatThread()
                    ),
                    renewalIntervalInSecs, TimeUnit.SECONDS);
            // InstanceInfo replicator
            instanceInfoReplicator = new InstanceInfoReplicator(
                    this,
                    instanceInfo,
                    clientConfig.getInstanceInfoReplicationIntervalSeconds(),
                    2); // burstSize
            statusChangeListener = new ApplicationInfoManager.StatusChangeListener() {
                @Override
                public String getId() {
                    return "statusChangeListener";
                }
                @Override
                public void notify(StatusChangeEvent statusChangeEvent) {
                    if (InstanceStatus.DOWN == statusChangeEvent.getStatus() ||
                            InstanceStatus.DOWN == statusChangeEvent.getPreviousStatus()) {
                        // log at warn level if DOWN was involved
                        logger.warn("Saw local status change event {}", statusChangeEvent);
                    } else {
                        logger.info("Saw local status change event {}", statusChangeEvent);
                    }
                    instanceInfoReplicator.onDemandUpdate();
                }
            };
            if (clientConfig.shouldOnDemandUpdateStatusChange()) {
                applicationInfoManager.registerStatusChangeListener(statusChangeListener);
            }
            // Task that updates the local InstanceInfo and replicates it to the remote server
            instanceInfoReplicator.start(clientConfig.getInitialInstanceInfoReplicationIntervalSeconds());
        } else {
            logger.info("Not registering with Eureka server per configuration");
        }
    }

There are three pieces of logic here:

  1. Fetching the registry
  2. Heartbeat
  3. Updating local information and syncing it to the remote server
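Both timers are passed an interval and an expBackOffBound, and both are scheduled once with scheduler.schedule rather than at a fixed rate, which suggests the supervising task reschedules itself with a delay that can grow. The sketch below models only that delay calculation. It assumes the delay doubles after a timed-out run, is capped at interval * expBackOffBound, and resets after a successful run; the class and method names are hypothetical and the real TimedSupervisorTask may differ in its details.

```java
class BackoffDelaySketch {
    private final long baseDelayMs;
    private final long maxDelayMs;
    private long currentDelayMs;

    // baseDelayMs plays the role of the configured interval,
    // expBackOffBound the role of the multiplier that caps the delay.
    BackoffDelaySketch(long baseDelayMs, int expBackOffBound) {
        this.baseDelayMs = baseDelayMs;
        this.maxDelayMs = baseDelayMs * expBackOffBound;
        this.currentDelayMs = baseDelayMs;
    }

    // A run finished in time: the next run uses the normal interval again.
    long onSuccess() {
        currentDelayMs = baseDelayMs;
        return currentDelayMs;
    }

    // A run timed out: double the delay before the next run, up to the cap.
    long onTimeout() {
        currentDelayMs = Math.min(maxDelayMs, currentDelayMs * 2);
        return currentDelayMs;
    }

    public static void main(String[] args) {
        BackoffDelaySketch delay = new BackoffDelaySketch(30_000L, 10);
        for (int i = 0; i < 5; i++) {
            System.out.println("after timeout " + (i + 1) + ": " + delay.onTimeout() + " ms");
        }
        System.out.println("after success: " + delay.onSuccess() + " ms");
    }
}
```

Under these assumptions a registry that stops answering is polled less and less often, but never less often than the cap, and one good response restores the normal interval.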

Updating local information and syncing it to the remote server

Let's look at the run method of InstanceInfoReplicator.

    public void run() {
        try {
            // Refresh the local instance information
            discoveryClient.refreshInstanceInfo();
            Long dirtyTimestamp = instanceInfo.isDirtyWithTime();
            // dirtyTimestamp is non-null only if the local info has changed and has not yet been synced to the remote server
            if (dirtyTimestamp != null) {
                // Register with the remote server
                discoveryClient.register();
                // Already synced; clear the dirty flag so the next run does not register again
                instanceInfo.unsetIsDirty(dirtyTimestamp);
            }
        } catch (Throwable t) {
            logger.warn("There was a problem with the instance info replicator", t);
        } finally {
            // Keep polling: when the local info changes again, it will be synced to the remote server
            Future next = scheduler.schedule(this, replicationIntervalSeconds, TimeUnit.SECONDS);
            scheduledPeriodicRef.set(next);
        }
    }
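The run method reads a dirty timestamp before registering and passes the same timestamp back when clearing the flag. A plausible reason is to avoid losing a change that arrives while the registration request is in flight. The sketch below illustrates that idea with a hypothetical DirtyFlagSketch class; it assumes the flag is cleared only if no newer change was recorded, which is an assumption about InstanceInfo rather than something shown in the excerpt above.

```java
class DirtyFlagSketch {
    private boolean dirty;
    private long lastDirtyTimestamp;

    // Records a local change at the given time (passed in to keep the demo deterministic).
    synchronized void markDirty(long now) {
        dirty = true;
        lastDirtyTimestamp = now;
    }

    // Returns the time of the latest unsynced change, or null if everything is synced.
    synchronized Long dirtyTimestampOrNull() {
        return dirty ? Long.valueOf(lastDirtyTimestamp) : null;
    }

    // Clears the flag only if no change newer than syncedTimestamp has been recorded.
    synchronized void clearIfNotNewer(long syncedTimestamp) {
        if (lastDirtyTimestamp <= syncedTimestamp) {
            dirty = false;
        }
    }

    public static void main(String[] args) {
        DirtyFlagSketch info = new DirtyFlagSketch();
        info.markDirty(100L);
        Long seen = info.dirtyTimestampOrNull(); // replicator reads the timestamp
        info.markDirty(200L);                    // another change arrives during "register"
        info.clearIfNotNewer(seen);              // stale timestamp: flag must stay set
        System.out.println("still dirty: " + (info.dirtyTimestampOrNull() != null));
    }
}
```

With a plain boolean flag the second change in the example would be silently dropped; comparing timestamps keeps it pending for the next replication run.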

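Now the register method. It is simple: it issues an HTTP request that sends this instance's information to the remote server, and it returns true only when the server answers with status 204.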

    boolean register() throws Throwable {
        logger.info(PREFIX + appPathIdentifier + ": registering service...");
        EurekaHttpResponse<Void> httpResponse;
        try {
            httpResponse = eurekaTransport.registrationClient.register(instanceInfo);
        } catch (Exception e) {
            logger.warn("{} - registration failed {}", PREFIX + appPathIdentifier, e.getMessage(), e);
            throw e;
        }
        if (logger.isInfoEnabled()) {
            logger.info("{} - registration status: {}", PREFIX + appPathIdentifier, httpResponse.getStatusCode());
        }
        return httpResponse.getStatusCode() == 204;
    }

Fetching the registry

The entry point is the run method of CacheRefreshThread, which ends up calling fetchRegistry.

    private boolean fetchRegistry(boolean forceFullRegistryFetch) {
        Stopwatch tracer = FETCH_REGISTRY_TIMER.start();
        try {
            // The registry data fetched previously
            Applications applications = getApplications();
            // Full-fetch case (for example, nothing has been fetched yet)
            if (clientConfig.shouldDisableDelta()
                    || (!Strings.isNullOrEmpty(clientConfig.getRegistryRefreshSingleVipAddress()))
                    || forceFullRegistryFetch
                    || (applications == null)
                    || (applications.getRegisteredApplications().size() == 0)
                    || (applications.getVersion() == -1)) //Client application does not have latest library supporting delta
            {
                logger.info("Disable delta property : {}", clientConfig.shouldDisableDelta());
                logger.info("Single vip registry refresh property : {}", clientConfig.getRegistryRefreshSingleVipAddress());
                logger.info("Force full registry fetch : {}", forceFullRegistryFetch);
                logger.info("Application is null : {}", (applications == null));
                logger.info("Registered Applications size is zero : {}",
                        (applications.getRegisteredApplications().size() == 0));
                logger.info("Application version is -1: {}", (applications.getVersion() == -1));
                getAndStoreFullRegistry();
            } else {
                // Update (delta) case
                getAndUpdateDelta(applications);
            }
            applications.setAppsHashCode(applications.getReconcileHashCode());
            logTotalInstances();
        } catch (Throwable e) {
            logger.error(PREFIX + appPathIdentifier + " - was unable to refresh its cache! status = " + e.getMessage(), e);
            return false;
        } finally {
            if (tracer != null) {
                tracer.stop();
            }
        }
        // Notify about cache refresh before updating the instance remote status
        onCacheRefreshed();
        // Update remote status based on refreshed data held in the cache
        updateInstanceRemoteStatus();
        // registry was fetched successfully, so return true
        return true;
    }

Let's look at the update case, getAndUpdateDelta.

    private void getAndUpdateDelta(Applications applications) throws Throwable {
        long currentUpdateGeneration = fetchRegistryGeneration.get();
        Applications delta = null;
        // Call the remote server to fetch the delta
        EurekaHttpResponse<Applications> httpResponse = eurekaTransport.queryClient.getDelta(remoteRegionsRef.get());
        if (httpResponse.getStatusCode() == Status.OK.getStatusCode()) {
            delta = httpResponse.getEntity();
        }
        if (delta == null) {
            logger.warn("The server does not allow the delta revision to be applied because it is not safe. "
                    + "Hence got the full registry.");
            getAndStoreFullRegistry();
        } else if (fetchRegistryGeneration.compareAndSet(currentUpdateGeneration, currentUpdateGeneration + 1)) {
            logger.debug("Got delta update with apps hashcode {}", delta.getAppsHashCode());
            String reconcileHashCode = "";
            if (fetchRegistryUpdateLock.tryLock()) {
                try {
                    // This call applies the delta to the local registry.
                    updateDelta(delta);
                    reconcileHashCode = getReconcileHashCode(applications);
                } finally {
                    fetchRegistryUpdateLock.unlock();
                }
            } else {
                logger.warn("Cannot acquire update lock, aborting getAndUpdateDelta");
            }
            // There is a diff in number of instances for some reason
            if (!reconcileHashCode.equals(delta.getAppsHashCode()) || clientConfig.shouldLogDeltaDiff()) {
                reconcileAndLogDifference(delta, reconcileHashCode); // this makes a remoteCall
            }
        } else {
            logger.warn("Not updating application delta as another thread is updating it already");
            logger.debug("Ignoring delta update with apps hashcode {}, as another thread is updating it already", delta.getAppsHashCode());
        }
    }
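After applying a delta, the client compares a locally computed reconcile hash code with the one the server sent, and reconciles with a remote call if they differ. The sketch below shows one way such a hash can be computed. It assumes the hash is a string that concatenates each instance status and its instance count, separated by the "_" delimiter and ordered by status name; the class name and the use of plain status strings are simplifications for illustration.

```java
import java.util.Arrays;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

class ReconcileHashSketch {

    // Counts instances per status and renders the counts in sorted key order,
    // for example "DOWN_1_UP_2_" for one DOWN and two UP instances.
    static String reconcileHash(List<String> instanceStatuses) {
        Map<String, Integer> countsByStatus = new TreeMap<>();
        for (String status : instanceStatuses) {
            Integer previous = countsByStatus.get(status);
            countsByStatus.put(status, previous == null ? 1 : previous + 1);
        }
        StringBuilder hash = new StringBuilder();
        for (Map.Entry<String, Integer> entry : countsByStatus.entrySet()) {
            hash.append(entry.getKey()).append('_').append(entry.getValue()).append('_');
        }
        return hash.toString();
    }

    public static void main(String[] args) {
        String local = reconcileHash(Arrays.asList("UP", "DOWN", "UP"));
        String fromServer = "DOWN_1_UP_3_"; // pretend the server knows one more UP instance
        System.out.println("local=" + local + ", needs reconcile=" + !local.equals(fromServer));
    }
}
```

A summary like this is cheap to compare, but it only detects differences in per-status instance counts, which is why a mismatch is followed by a reconciling remote call rather than a targeted fix.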

Heartbeat

Now the heartbeat. The entry point is the run method of HeartbeatThread, which ends up calling renew.

    boolean renew() {
        EurekaHttpResponse<InstanceInfo> httpResponse;
        try {
            // Send the HTTP request; instanceInfo holds this client's information.
            httpResponse = eurekaTransport.registrationClient.sendHeartBeat(instanceInfo.getAppName(), instanceInfo.getId(), instanceInfo, null);
            logger.debug("{} - Heartbeat status: {}", PREFIX + appPathIdentifier, httpResponse.getStatusCode());
            if (httpResponse.getStatusCode() == 404) {
                REREGISTER_COUNTER.increment();
                logger.info("{} - Re-registering apps/{}", PREFIX + appPathIdentifier, instanceInfo.getAppName());
                return register();
            }
            return httpResponse.getStatusCode() == 200;
        } catch (Throwable e) {
            logger.error("{} - was unable to send heartbeat!", PREFIX + appPathIdentifier, e);
            return false;
        }
    }
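The interesting branch in renew is the 404 case: if the server no longer knows the instance, the client registers again instead of giving up. The sketch below replays that flow against an in-memory stand-in for the server. RegistryStub and InMemoryRegistry are hypothetical types written for this example; only the status codes 200, 204 and 404 are taken from the code above.

```java
import java.util.HashSet;
import java.util.Set;

class HeartbeatSketch {

    // Hypothetical stand-in for the HTTP client; each call returns an HTTP status code.
    interface RegistryStub {
        int sendHeartbeat(String instanceId);
        int register(String instanceId);
    }

    // Same decision logic as renew(): 404 triggers re-registration, 200 means renewed.
    static boolean renew(RegistryStub registry, String instanceId) {
        try {
            int status = registry.sendHeartbeat(instanceId);
            if (status == 404) {
                return registry.register(instanceId) == 204;
            }
            return status == 200;
        } catch (RuntimeException e) {
            return false;
        }
    }

    // Minimal fake server: knows a set of leases and can evict them.
    static class InMemoryRegistry implements RegistryStub {
        private final Set<String> leases = new HashSet<>();

        @Override
        public int sendHeartbeat(String instanceId) {
            return leases.contains(instanceId) ? 200 : 404;
        }

        @Override
        public int register(String instanceId) {
            leases.add(instanceId);
            return 204;
        }

        void evict(String instanceId) {
            leases.remove(instanceId);
        }
    }

    public static void main(String[] args) {
        InMemoryRegistry registry = new InMemoryRegistry();
        System.out.println("first heartbeat (unknown instance): " + renew(registry, "app-1"));
        registry.evict("app-1");
        System.out.println("heartbeat after eviction: " + renew(registry, "app-1"));
        System.out.println("lease restored: " + (registry.sendHeartbeat("app-1") == 200));
    }
}
```

This is what lets a client recover on its own after the server has dropped its lease, for example following a server restart.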

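Registry information

Here is the code of the Applications class, which holds the information returned by the registry. To inspect the actual contents, set a breakpoint and look at an instance in the debugger.
It keeps the same data in several data structures, mainly so that later lookups are more convenient.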

    /**
     * The class that wraps all the registry information returned by eureka server.
     *
     */
    @Serializer("com.netflix.discovery.converters.EntityBodyConverter")
    @XStreamAlias("applications")
    @JsonRootName("applications")
    public class Applications {
        private static final String APP_INSTANCEID_DELIMITER = "$$";
        private static final Logger logger = LoggerFactory.getLogger(Applications.class);
        private static final String STATUS_DELIMITER = "_";
        private Long versionDelta = Long.valueOf(-1);
        @XStreamImplicit
        private AbstractQueue<Application> applications;
        private Map<String, Application> appNameApplicationMap = new ConcurrentHashMap<String, Application>();
        private Map<String, AbstractQueue<InstanceInfo>> virtualHostNameAppMap = new ConcurrentHashMap<String, AbstractQueue<InstanceInfo>>();
        private Map<String, AbstractQueue<InstanceInfo>> secureVirtualHostNameAppMap = new ConcurrentHashMap<String, AbstractQueue<InstanceInfo>>();
        private Map<String, AtomicLong> virtualHostNameIndexMap = new ConcurrentHashMap<String, AtomicLong>();
        private Map<String, AtomicLong> secureVirtualHostNameIndexMap = new ConcurrentHashMap<String, AtomicLong>();
        private Map<String, AtomicReference<List<InstanceInfo>>> shuffleVirtualHostNameMap = new ConcurrentHashMap<String, AtomicReference<List<InstanceInfo>>>();
        private Map<String, AtomicReference<List<InstanceInfo>>> shuffledSecureVirtualHostNameMap = new ConcurrentHashMap<String, AtomicReference<List<InstanceInfo>>>();
        // Remaining code omitted.
    }
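The pairing of a per-virtual-host instance list with a per-virtual-host AtomicLong index suggests round-robin selection: each lookup advances the counter and takes the instance at counter modulo list size. The sketch below shows that pattern with plain strings in place of InstanceInfo. It is an illustration of the data-structure idea only; the class and method names are hypothetical and the way Eureka actually uses these maps is not shown in the excerpt above.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicLong;

class RoundRobinSketch {
    private final Map<String, List<String>> instancesByVip = new ConcurrentHashMap<>();
    private final Map<String, AtomicLong> indexByVip = new ConcurrentHashMap<>();

    // Stores the instances for a virtual host name and makes sure a counter exists for it.
    void put(String vip, List<String> instances) {
        instancesByVip.put(vip, new ArrayList<>(instances));
        indexByVip.putIfAbsent(vip, new AtomicLong(0));
    }

    // Returns the next instance for the virtual host name, cycling through the list.
    String next(String vip) {
        List<String> instances = instancesByVip.get(vip);
        if (instances == null || instances.isEmpty()) {
            throw new IllegalArgumentException("No instances for " + vip);
        }
        long ticket = indexByVip.get(vip).getAndIncrement();
        // floorMod keeps the index non-negative even if the counter ever overflows.
        int index = (int) Math.floorMod(ticket, (long) instances.size());
        return instances.get(index);
    }

    public static void main(String[] args) {
        RoundRobinSketch registry = new RoundRobinSketch();
        registry.put("orders", Arrays.asList("host-a:8080", "host-b:8080", "host-c:8080"));
        for (int i = 0; i < 4; i++) {
            System.out.println(registry.next("orders"));
        }
    }
}
```

Keeping the counter in an AtomicLong means concurrent callers each get a distinct ticket without taking a lock.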
