spring cloud eureka server 集群同步之peernodes管理

ゝ一纸荒年。 2022-04-22 00:14 183阅读 0赞

在PeerAwareInstanceRegistryImpl这个注册实现类中会看到replicateToPeers这个方法:

  1. @Override
  2. public boolean cancel(final String appName, final String id,
  3. final boolean isReplication) {
  4. if (super.cancel(appName, id, isReplication)) {
  5. replicateToPeers(Action.Cancel, appName, id, null, null, isReplication);
  6. ...
  7. return true;
  8. }
  9. return false;
  10. }
  11. @Override
  12. public void register(final InstanceInfo info, final boolean isReplication) {
  13. int leaseDuration = Lease.DEFAULT_DURATION_IN_SECS;
  14. if (info.getLeaseInfo() != null && info.getLeaseInfo().getDurationInSecs() > 0) {
  15. leaseDuration = info.getLeaseInfo().getDurationInSecs();
  16. }
  17. super.register(info, leaseDuration, isReplication);
  18. replicateToPeers(Action.Register, info.getAppName(), info.getId(), info, null, isReplication);
  19. }

了解eureka的朋友可能都知道replicateToPeers这个方法是用于eureka server之间同步eureka client信息的方法。其中的代码是这样的:

  1. private void replicateToPeers(Action action, String appName, String id,
  2. InstanceInfo info /* optional */,
  3. InstanceStatus newStatus /* optional */, boolean isReplication) {
  4. Stopwatch tracer = action.getTimer().start();
  5. try {
  6. if (isReplication) {
  7. numberOfReplicationsLastMin.increment();
  8. }
  9. // If it is a replication already, do not replicate again as this will create a poison replication
  10. if (peerEurekaNodes == Collections.EMPTY_LIST || isReplication) {
  11. return;
  12. }
  13. for (final PeerEurekaNode node : peerEurekaNodes.getPeerEurekaNodes()) {
  14. // If the url represents this host, do not replicate to yourself.
  15. if (peerEurekaNodes.isThisMyUrl(node.getServiceUrl())) {
  16. continue;
  17. }
  18. replicateInstanceActionsToPeers(action, appName, id, info, newStatus, node);
  19. }
  20. } finally {
  21. tracer.stop();
  22. }
  23. }

其中有两个关键代码:peerEurekaNodes.getPeerEurekaNodes() 和 peerEurekaNodes.isThisMyUrl(node.getServiceUrl()))

peerEurekaNodes.isThisMyUrl() 这个方法是判断node是否自己(当前节点),如果是自己,就跳过,否则发送复制信息给这个eureka server node。

peerEurekaNodes.getPeerEurekaNodes() 是eureka server node的列表,eureka server会维护集群中跟自己有直接关系的eureka server信息,其实就是将eureka.client.serviceUrl.{zone}这个url列表中生成,默认是eureka.client.serviceUrl.defaultZone,接下来看下维护这个peerEurekaNodes的逻辑。

peerEurekaNodes管理
其实逻辑很简单,代码的执行序列是这样的:

springDefaultEurekaPeerEurekaNodesScheduledExeinitialize在EurekaServerAu toConfiguration 中发布了DefaultEure kaServerContext 和PeerEurekaNode sstartDefaultEurekaServerContext .initialize()方法标注了@PostCon struct,Spring会在加载完该类后执行这个方 法。scheduleWithFixedDelayPeerEurekaNo des创建一个定时任务, 定时执行updateP eerEurekaNod es()方法,更新pee rEurekaNodesupdatePeerEurekaNodesspringDefaultEurekaPeerEurekaNodesScheduledExe

这里将PeerEurekaNodes的部分代码贴出来,我认为设计得挺巧妙的

  1. public void start() {
  2. ...
  3. try {
  4. updatePeerEurekaNodes(resolvePeerUrls());
  5. Runnable peersUpdateTask = new Runnable() {
  6. @Override
  7. public void run() {
  8. try {
  9. updatePeerEurekaNodes(resolvePeerUrls());
  10. } catch (Throwable e) {
  11. logger.error("Cannot update the replica Nodes", e);
  12. }
  13. }
  14. };
  15. taskExecutor.scheduleWithFixedDelay(
  16. peersUpdateTask,
  17. serverConfig.getPeerEurekaNodesUpdateIntervalMs(),
  18. serverConfig.getPeerEurekaNodesUpdateIntervalMs(),
  19. TimeUnit.MILLISECONDS
  20. );
  21. } catch (Exception e) {
  22. throw new IllegalStateException(e);
  23. }
  24. ...
  25. }
  26. }
  27. protected List<String> resolvePeerUrls() {
  28. InstanceInfo myInfo = applicationInfoManager.getInfo();
  29. String zone = InstanceInfo.getZone(clientConfig.getAvailabilityZones(clientConfig.getRegion()), myInfo);
  30. List<String> replicaUrls = EndpointUtils
  31. .getDiscoveryServiceUrls(clientConfig, zone, new EndpointUtils.InstanceInfoBasedUrlRandomizer(myInfo));
  32. int idx = 0;
  33. while (idx < replicaUrls.size()) {
  34. if (isThisMyUrl(replicaUrls.get(idx))) {
  35. replicaUrls.remove(idx);
  36. } else {
  37. idx++;
  38. }
  39. }
  40. return replicaUrls;
  41. }
  42. protected void updatePeerEurekaNodes(List<String> newPeerUrls) {
  43. if (newPeerUrls.isEmpty()) {
  44. logger.warn("The replica size seems to be empty. Check the route 53 DNS Registry");
  45. return;
  46. }
  47. Set<String> toShutdown = new HashSet<>(peerEurekaNodeUrls);
  48. toShutdown.removeAll(newPeerUrls);
  49. Set<String> toAdd = new HashSet<>(newPeerUrls);
  50. toAdd.removeAll(peerEurekaNodeUrls);
  51. if (toShutdown.isEmpty() && toAdd.isEmpty()) { // No change
  52. return;
  53. }
  54. // Remove peers no long available
  55. List<PeerEurekaNode> newNodeList = new ArrayList<>(peerEurekaNodes);
  56. if (!toShutdown.isEmpty()) {
  57. logger.info("Removing no longer available peer nodes {}", toShutdown);
  58. int i = 0;
  59. while (i < newNodeList.size()) {
  60. PeerEurekaNode eurekaNode = newNodeList.get(i);
  61. if (toShutdown.contains(eurekaNode.getServiceUrl())) {
  62. newNodeList.remove(i);
  63. eurekaNode.shutDown();
  64. } else {
  65. i++;
  66. }
  67. }
  68. }
  69. // Add new peers
  70. if (!toAdd.isEmpty()) {
  71. logger.info("Adding new peer nodes {}", toAdd);
  72. for (String peerUrl : toAdd) {
  73. newNodeList.add(createPeerEurekaNode(peerUrl));
  74. }
  75. }
  76. this.peerEurekaNodes = newNodeList;
  77. this.peerEurekaNodeUrls = new HashSet<>(newPeerUrls);
  78. }

1、在创建执行updatePeerEurekaNodes的定时任务之前,先执行一遍updatePeerEurekaNodes()方法,更新peer eureka nodes列表,之前要先执行这个方法,是因为定时任务执行的周期默认是10分钟,如果等定时调度来执行的话,太慢了。

2、resolvePeerUrls()获取peer node url时,调用了一个内部判断方法isThisMyUrl(),判断peer url是否是当前节点的url。如果是,就剔除掉。

3、updatePeerEurekaNodes()方法中,通过以前的peerurls减去当前的peerurls判断是否要更新peerEurekaNodes列表,以减少PeerEurekaNode的创建工作。

发表评论

表情:
评论列表 (有 0 条评论,183人围观)

还没有评论,来说两句吧...

相关阅读