Spring Cloud源码分析之Eureka篇:服务注册

柔光的暖阳◎ 2022-02-21 02:49 280阅读 0赞

本章学习的是服务注册逻辑的相关代码,对应用如何将自身信息注册到Eureka进行深入了解,原文地址:https://blog.csdn.net/boling_cavalry/article/details/82861618

关于源码版本
本次分析的Spring Cloud版本为Edgware.RELEASE,对应的eureka-client版本为1.7.0;

源码分析
首先回顾com.netflix.discovery.DiscoveryClient类的initScheduledTasks方法,Eureka client在启动的时侯都会执行此方法,如下方所示,已经略去了周期性更新服务列表相关的代码:

  1. //来自EurekaClientConfigBean,默认为true
  2. if (clientConfig.shouldRegisterWithEureka()) {
  3. //续租间隔
  4. int renewalIntervalInSecs = instanceInfo.getLeaseInfo().getRenewalIntervalInSecs();
  5. //周期性任务处理超时后,下一次执行时将超时事件翻倍,但是不可超过expBackOffBound的设定范围
  6. int expBackOffBound = clientConfig.getHeartbeatExecutorExponentialBackOffBound();
  7. logger.info("Starting heartbeat executor: " + "renew interval is: " + renewalIntervalInSecs);
  8. //指定时间后启动周期性续租的任务
  9. scheduler.schedule(
  10. new TimedSupervisorTask(
  11. "heartbeat",
  12. scheduler,
  13. heartbeatExecutor,
  14. renewalIntervalInSecs,
  15. TimeUnit.SECONDS,
  16. expBackOffBound,
  17. new HeartbeatThread()
  18. ),
  19. renewalIntervalInSecs, TimeUnit.SECONDS);
  20. //上报自身信息到Eureka server的操作委托给InstanceInfoReplicator实例发起,
  21. //如果有多个场景需要上报,都由InstanceInfoReplicator进行调度和安排,
  22. //并且还有限流逻辑,避免频繁先服务端请求
  23. instanceInfoReplicator = new InstanceInfoReplicator(
  24. this,
  25. instanceInfo,
  26. clientConfig.getInstanceInfoReplicationIntervalSeconds(),
  27. 2); // burstSize
  28. //监听和响应应用状态变化,包括从停止状态恢复或者进入停止状态,
  29. statusChangeListener = new ApplicationInfoManager.StatusChangeListener() {
  30. @Override
  31. public String getId() {
  32. return "statusChangeListener";
  33. }
  34. @Override
  35. public void notify(StatusChangeEvent statusChangeEvent) {
  36. if (InstanceStatus.DOWN == statusChangeEvent.getStatus() ||
  37. InstanceStatus.DOWN == statusChangeEvent.getPreviousStatus()) {
  38. // log at warn level if DOWN was involved
  39. logger.warn("Saw local status change event {}", statusChangeEvent);
  40. } else {
  41. logger.info("Saw local status change event {}", statusChangeEvent);
  42. }
  43. //将自身状态上报都Eureka server(有限流逻辑避免频繁上报)
  44. instanceInfoReplicator.onDemandUpdate();
  45. }
  46. };
  47. if (clientConfig.shouldOnDemandUpdateStatusChange()) {
  48. //注册状态变化的监听
  49. applicationInfoManager.registerStatusChangeListener(statusChangeListener);
  50. }
  51. //更新信息并注册到Eureka server
  52. instanceInfoReplicator.start(clientConfig.getInitialInstanceInfoReplicationIntervalSeconds());
  53. } else {
  54. logger.info("Not registering with Eureka server per configuration");
  55. }

从上述代码可以看出,主动更新和状态变化触发的更新,都委托给成员变量instanceInfoReplicator执行,InstanceInfoReplicator是个辅助类,在服务注册过程中主要负责并发控制、周期性执行等工作,有关此类的详细介绍请参考文章《Eureka的InstanceInfoReplicator类(服务注册辅助工具)》;
本文聚焦服务注册,因此InstanceInfoReplicator类本身的细节就不在此展开,这里主要关注的是InstanceInfoReplicator的run方法中注册到Eureka server的代码,如下图红框,discoveryClient.register()实现了注册的功能:

注意:由上图绿框中代码可见,注册完成后又会提交一个一次性的延时任务,这就相当于周期性的执行run方法了,这么一来岂不是会周期性注册?其实并不会,红框上面是有个判断条件的:if (dirtyTimestamp != null),只要成员变量instanceInfo的isDirtyWithTime方法返回为空,就不会执行注册;

先看代码discoveryClient.refreshInstanceInfo(),弄清楚即将上报到Eureka server的信息是如何更新的,如下代码所示,信息更新的操作是委托给ApplicationInfoManager实例来完成的:

  1. void refreshInstanceInfo() {
  2. //更新数据
  3. applicationInfoManager.refreshDataCenterInfoIfRequired();
  4. //如果续租时间有变化就要及时更新
  5. applicationInfoManager.refreshLeaseInfoIfRequired();
  6. InstanceStatus status;
  7. try {
  8. status = getHealthCheckHandler().getStatus(instanceInfo.getStatus());
  9. } catch (Exception e) {
  10. logger.warn("Exception from healthcheckHandler.getStatus, setting status to DOWN", e);
  11. //如果获取状态异常,就设置当前状态为DOWN
  12. status = InstanceStatus.DOWN;
  13. }
  14. if (null != status) {
  15. applicationInfoManager.setInstanceStatus(status);
  16. }
  17. }

接下来看看服务注册相关的代码,也就是DiscoveryClient类的register方法,如下所示,源码注释中说到是注册请求类型是Restful的,Eureka server的返回码如果是204表示注册成功,然而在前面的discoveryClient.register()方法内,其实并不关注这个返回值:

  1. /**
  2. * Register with the eureka service by making the appropriate REST call.
  3. */
  4. boolean register() throws Throwable {
  5. logger.info(PREFIX + appPathIdentifier + ": registering service...");
  6. EurekaHttpResponse<Void> httpResponse;
  7. try {
  8. //注册操作
  9. httpResponse = eurekaTransport.registrationClient.register(instanceInfo);
  10. } catch (Exception e) {
  11. logger.warn("{} - registration failed {}", PREFIX + appPathIdentifier, e.getMessage(), e);
  12. throw e;
  13. }
  14. if (logger.isInfoEnabled()) {
  15. logger.info("{} - registration status: {}", PREFIX + appPathIdentifier, httpResponse.getStatusCode());
  16. }
  17. return httpResponse.getStatusCode() == 204;
  18. }

继续展开注册操作的源码eurekaTransport.registrationClient.register(instanceInfo),多层调用一路展开,最终由JerseyApplicationClient类来完成注册操作,对应源码在父类AbstractJerseyEurekaHttpClient中,如下所示,主要工作是利用jersey库的Restful Api将自身的信息POST到Eureka server:

  1. @Override
  2. public EurekaHttpResponse<Void> register(InstanceInfo info) {
  3. String urlPath = "apps/" + info.getAppName();
  4. ClientResponse response = null;
  5. try {
  6. Builder resourceBuilder = jerseyClient.resource(serviceUrl).path(urlPath).getRequestBuilder();
  7. addExtraHeaders(resourceBuilder);
  8. response = resourceBuilder
  9. .header("Accept-Encoding", "gzip")
  10. .type(MediaType.APPLICATION_JSON_TYPE)
  11. .accept(MediaType.APPLICATION_JSON)
  12. .post(ClientResponse.class, info);
  13. return anEurekaHttpResponse(response.getStatus()).headers(headersOf(response)).build();
  14. } finally {
  15. if (logger.isDebugEnabled()) {
  16. logger.debug("Jersey HTTP POST {}/{} with instance {}; statusCode={}", serviceUrl, urlPath, info.getId(),
  17. response == null ? "N/A" : response.getStatus());
  18. }
  19. if (response != null) {
  20. response.close();
  21. }
  22. }
  23. }

至此,Eureka client向服务注册的源码就分析完毕了,过程相对简单,DiscoveryClient、InstanceInfoReplicator、ApplicationInfoManager、JerseyApplicationClient等实例各司其职将应用自身信息上报到Eureka server,由Eureka server保存,再被其他实例下载;

发表评论

表情:
评论列表 (有 0 条评论,280人围观)

还没有评论,来说两句吧...

相关阅读