Ribbon(六) Ribbon源码分析 - IRULE负载均衡的实现

向右看齐 2022-12-17 01:23 235阅读 0赞

具体负载均衡服务还是由IRULE实现,在ribbon的使用案例中我们可以注意到切换具体的路由实现非常简单:

  1. @RibbonClient(name = "MICROSERVICE-PROVIDER-PRODUCT", configuration = RibbonConfig.class)

通过 @RibbonClient指定负载均衡的实现类 RibbonConfig后,在这个类中创建具体实现算法即可.

  1. public class RibbonConfig {
  2. @Bean
  3. public IRule ribbonRule() { // 其中IRule就是所有规则的标准
  4. return new com.netflix.loadbalancer.RandomRule(); // 随机的访问策略
  5. }
  6. }

那么如果不是自已像上面一样配置 RibbonConfig作为负载均衡的实现类的话,ribbon会自动提供一个默认实现。

让我们追踪一下 RibbonClientConfiguration 这个类,它位于包 org.springframework.cloud.netflix.ribbon 下, 它是Ribbon客户端的默认配置类。它里面加载了一系列的Ribbon所需的对象.其中就包括 IRule.

  1. @Bean
  2. @ConditionalOnMissingBean
  3. public IRule ribbonRule(IClientConfig config) {
  4. if (this.propertiesFactory.isSet(IRule.class, name)) {
  5. return this.propertiesFactory.get(IRule.class, config, name);
  6. }
  7. ZoneAvoidanceRule rule = new ZoneAvoidanceRule();
  8. rule.initWithNiwsConfig(config);
  9. return rule;
  10. }

从以上代码可以看出,加载的默认IRule实现是 ZoneAvoidanceRule, 让我们先来看一下IRule有哪些常用实现.

watermark_type_ZmFuZ3poZW5naGVpdGk_shadow_10_text_aHR0cHM6Ly9ibG9nLmNzZG4ubmV0L3poYW5neWluZ2NoZW5ncWk_size_16_color_FFFFFF_t_70

BestAvailableRule :会先过滤掉由于多次访问故障而处于断路器跳闸状态的服务,然后选择一个并发量最小的服务RoundRobinRule:默认算法,轮询
ClientConfigEnabledRoundRobinRule: 使用RoundRobinRule选择服务器
RetryRule: 先按照RoundRobinRule的策略获取服务,如果获取服务失败则在指定时间内会进行重试,获取可用的服务WeightedResponseTimeRule: 根据平均响应时间计算所有服务的权重,响应时间越快服务权重越大被选中的概率越高。刚启动时如果统计信息不足,则使用RoundRobinRule策略,等统计信息足够,会切换到WeightedResponseTimeRule
ZoneAvoidanceRule: 默认规则,复合判断server所在区域的性能和server的可用性选择服务器

  1. ZoneAvoidanceRule 的具体实现

ZoneAvoidanceRule是默认的IRule实例,他使用PredicateBasedRule来根据服务区的运行状况和服务器的可用性来选择服务器
它的父类是com.netflix.loadbalancer.PredicateBasedRule
它的choose()方法具体依次做了以下工作:

  1. 先使用ILoadBalancer 获取服务器列表
  2. 使用AbstractServerPredicate进行服务器过滤
  3. 最后轮询从剩余的服务器列表中选择最终的服务器

    public abstract class PredicateBasedRule extends ClientConfigEnabledRoundRobinRule {

    1. public abstract AbstractServerPredicate getPredicate();
    2. @Override
    3. public Server choose(Object key) {
    4. ILoadBalancer lb = getLoadBalancer();
    5. //轮询
    6. Optional<Server> server = getPredicate().chooseRoundRobinAfterFiltering(lb.getAllServers(), key);
    7. if (server.isPresent()) {
    8. return server.get();
    9. } else {
    10. return null;
    11. }
    12. }

    }

追踪 chooseRoundRobinAfterFiltering 方法,

  1. public Optional<Server> chooseRoundRobinAfterFiltering(List<Server> servers, Object loadBalancerKey) {
  2. //根据loadBalancerKey获取可用服务器列表
  3. List<Server> eligible = getEligibleServers(servers, loadBalancerKey);
  4. if (eligible.size() == 0) {
  5. return Optional.absent();
  6. }
  7. // 这里面的 incrementAndGetModulo 是轮询的具体实现
  8. return Optional.of(eligible.get(incrementAndGetModulo(eligible.size())));
  9. }

incrementAndGetModulo方法的源码,它是轮询的具体实现: 至此,可以得出ZoneAvoidanceRule也是使用的轮询算法的结论.

  1. // modulo 可用服务器列表数
  2. private int incrementAndGetModulo(int modulo) {
  3. for (;;) {
  4. int current = nextIndex.get();
  5. int next = (current + 1) % modulo;
  6. if (nextIndex.compareAndSet(current, next) && current < modulo)
  7. return current;
  8. }
  9. }

再回到上面的代码中 getPredicate()是一个抽象方法,具体在 ZoneAvoidanceRule中实现,请看下面的实现代码:

  1. @Override
  2. public AbstractServerPredicate getPredicate() {
  3. return compositePredicate; //请注意这里的 compositePredicate 是在下面的构造方法中初始化的.
  4. }
  5. public ZoneAvoidanceRule() {
  6. super();
  7. //判断一个服务器的运行状况是否可用,去除不可用服务器的所有服务器
  8. ZoneAvoidancePredicate zonePredicate = new ZoneAvoidancePredicate(this);
  9. //用于过滤连接数过多的服务器
  10. AvailabilityPredicate availabilityPredicate = new AvailabilityPredicate(this);
  11. compositePredicate = createCompositePredicate(zonePredicate, availabilityPredicate);
  12. }
  13. //将两个Predicate组合成一个CompositePredicate
  14. private CompositePredicate createCompositePredicate(ZoneAvoidancePredicate p1, AvailabilityPredicate p2) {
  15. return CompositePredicate.withPredicates(p1, p2)
  16. .addFallbackPredicate(p2)
  17. .addFallbackPredicate(AbstractServerPredicate.alwaysTrue())
  18. .build();
  19. }

接着再看另一个方法 chooseRoundRobinAfterFiltering, 它是过滤的方法,然后AvailabilityPredicate 里面并没有这方法,他直接继承了他的父类com.netflix.loadbalancer.AbstractServerPredicate#chooseRoundRobinAfterFiltering(java.util.List)

  1. public Optional<Server> chooseRoundRobinAfterFiltering(List<Server> servers, Object loadBalancerKey) {
  2. //过滤服务器列表
  3. List<Server> eligible = getEligibleServers(servers, loadBalancerKey);
  4. if (eligible.size() == 0) {
  5. return Optional.absent();
  6. }
  7. //(i+1)%n 轮询选择一个服务实例
  8. return Optional.of(eligible.get(incrementAndGetModulo(eligible.size())));
  9. }

查看一下 getEligibleServers()

  1. public List<Server> getEligibleServers(List<Server> servers, Object loadBalancerKey) {
  2. //如果前面loadBalancerKey直接传入的null, 方法getEligibleServers会使用serverOnlyPredicate来依次过滤
  3. if (loadBalancerKey == null) {
  4. return ImmutableList.copyOf(Iterables.filter(servers, this.getServerOnlyPredicate()));
  5. } else {
  6. List<Server> results = Lists.newArrayList();
  7. for (Server server: servers) {
  8. if (this.apply(new PredicateKey(loadBalancerKey, server))) {
  9. results.add(server);
  10. }
  11. }
  12. return results;
  13. }
  14. }

追踪 getServerOnlyPredicate() , 会发现它返回的 serverOnlyPredicate 的创建代码如下:

  1. //serverOnlyPredicate 则会调用apply方法,并将Server 对象分装PredicateKey当作参数传入
  2. private final Predicate<Server> serverOnlyPredicate = new Predicate<Server>() {
  3. @Override
  4. public boolean apply(@Nullable Server input) {
  5. return AbstractServerPredicate.this.apply(new PredicateKey(input));
  6. }
  7. };

AbstractServerPredicate并没有实现apply方法,具体的实现又回到了子类CompositePredicate的apply方法,它会依次调用ZoneAvoidancePredicate(它以查看服务区是否可用为主)与AvailabilityPredicate(它看的是联接数是否达到可用)的apply方法. 两个类的算法实现不同.

那么首先看 ZoneAvoidancePredicate 的apply实现如下:

  1. @Override
  2. public boolean apply(@Nullable PredicateKey input) {
  3. if (!ENABLED.get()) {
  4. return true;
  5. }
  6. String serverZone = input.getServer().getZone();
  7. if (serverZone == null) {
  8. //如果服务器没有zone的相关信息,直接返回
  9. return true;
  10. }
  11. LoadBalancerStats lbStats = getLBStats();
  12. //LoadBalancerStats 存储每个服务器节点的执行特征和运行记录,这些信息可供动态负载均衡使用
  13. if (lbStats == null) {
  14. //如果没有服务器的运行状态的记录,直接返回
  15. return true;
  16. }
  17. if (lbStats.getAvailableZones().size() <= 1) {
  18. //如果只有就一个服务器,直接返回
  19. return true;
  20. }
  21. //PredicateKey 封装了Server的信息,判断下服务器区的记录是否用当前区的信息
  22. Map<String, ZoneSnapshot> zoneSnapshot = ZoneAvoidanceRule.createSnapshot(lbStats);
  23. //如果没有直接返回
  24. if (!zoneSnapshot.keySet().contains(serverZone)) {
  25. // The server zone is unknown to the load balancer, do not filter it out
  26. return true;
  27. }
  28. logger.debug("Zone snapshots: {}", zoneSnapshot);
  29. // 最重要的方法: 前面都是一些基本信息的判断 获取可用的服务器列表
  30. Set<String> availableZones = ZoneAvoidanceRule.getAvailableZones(zoneSnapshot, triggeringLoad.get(), triggeringBlackoutPercentage.get());
  31. logger.debug("Available zones: {}", availableZones);
  32. //判断当前服务器是否在可用的服务器列表中
  33. if (availableZones != null) {
  34. return availableZones.contains(input.getServer().getZone());
  35. } else {
  36. return false;
  37. }
  38. }

在上面代码中大量出现的ZoneSnapshot代码如下: 它用于封装zone快照信息

  1. public class ZoneSnapshot {
  2. //实例数
  3. final int instanceCount;
  4. //平均负载
  5. final double loadPerServer;
  6. //断路器端口数量
  7. final int circuitTrippedCount;
  8. //活动请求数量
  9. final int activeRequestsCount;
  10. }

在这个代码: Set availableZones = ZoneAvoidanceRule.getAvailableZones(zoneSnapshot, triggeringLoad.get(), triggeringBlackoutPercentage.get());中的getAvailableZones是用来筛选服务区列表的.

  1. public static Set<String> getAvailableZones(
  2. Map<String, ZoneSnapshot> snapshot, double triggeringLoad,
  3. double triggeringBlackoutPercentage) {
  4. if (snapshot.isEmpty()) { //服务区列表是空的,则返回空
  5. return null;
  6. }
  7. //只有一个区,则直接返回此 availableZones
  8. Set<String> availableZones = new HashSet<String>(snapshot.keySet());
  9. if (availableZones.size() == 1) {
  10. return availableZones;
  11. }
  12. Set<String> worstZones = new HashSet<String>();
  13. double maxLoadPerServer = 0;
  14. boolean limitedZoneAvailability = false;
  15. //遍历所有的服务区
  16. for (Map.Entry<String, ZoneSnapshot> zoneEntry : snapshot.entrySet()) {
  17. String zone = zoneEntry.getKey();
  18. ZoneSnapshot zoneSnapshot = zoneEntry.getValue();
  19. //获取服务区中的服务实例数
  20. int instanceCount = zoneSnapshot.getInstanceCount();
  21. if (instanceCount == 0) {
  22. //如果服务区中没有服务实例,那么移除该服务区
  23. availableZones.remove(zone);
  24. limitedZoneAvailability = true;
  25. } else {
  26. //获取此服务区中的平均负载
  27. double loadPerServer = zoneSnapshot.getLoadPerServer();
  28. //服务区的实例平均负载小于0,或者实例故障率(断路器端口次数/实例数)大于等于阈值(默认0.99999),则去掉该服务区
  29. if (((double) zoneSnapshot.getCircuitTrippedCount())
  30. / instanceCount >= triggeringBlackoutPercentage
  31. || loadPerServer < 0) {
  32. availableZones.remove(zone);
  33. limitedZoneAvailability = true;
  34. } else {
  35. //如果该服务区的平均负载和最大负载的差小于一定的数量,则将该服务区加入到最坏服务区集合
  36. if (Math.abs(loadPerServer - maxLoadPerServer) < 0.000001d) {
  37. worstZones.add(zone);
  38. } else if (loadPerServer > maxLoadPerServer) {
  39. //如果该zone的平均负载还大于最大负载,也将此服务区加入到最坏服务区集合中
  40. maxLoadPerServer = loadPerServer;
  41. worstZones.clear();
  42. worstZones.add(zone);
  43. }
  44. }
  45. }
  46. }
  47. //如果最大的平均负载小于设定的阈值 , 则此服务区可以用,直接返回
  48. if (maxLoadPerServer < triggeringLoad && !limitedZoneAvailability) {
  49. return availableZones;
  50. }
  51. //否则,从最坏的服务区集合服务器集合里面随机挑选一个(没办法啊 )
  52. String zoneToAvoid = randomChooseZone(snapshot, worstZones);
  53. if (zoneToAvoid != null) {
  54. availableZones.remove(zoneToAvoid);
  55. }
  56. return availableZones;
  57. }

下面再谈谈AvailabilityPredicate(它看的是联接数是否达到可用) 的apply方法:

  1. @Override
  2. public boolean apply(@Nullable PredicateKey input) {
  3. LoadBalancerStats stats = getLBStats();
  4. if (stats == null) {
  5. return true;
  6. }
  7. //获得关于该服务器的记录
  8. return !shouldSkipServer(stats.getSingleServerStat(input.getServer()));
  9. }
  10. private boolean shouldSkipServer(ServerStats stats) {
  11. //如果该服务器的断路器已经打开,或者他的连接数大于设定的阈值,那么就需要将服务区过滤掉
  12. if ((CIRCUIT_BREAKER_FILTERING.get() && stats.isCircuitBreakerTripped())
  13. || stats.getActiveRequestsCount() >= activeConnectionsLimit.get()) {
  14. return true;
  15. }
  16. return false;
  17. }

至此一个 IRule的实现类 ZoneAvoidanceRule的分析到此结束. 其它的实现方案可以自己去查看源码了.

小结: ZoneAvoidanceRule类从名字上看与RoundRobin没有什么关系,但仔细查看源码,还是可以看出它使用的就是轮询方案.

发表评论

表情:
评论列表 (有 0 条评论,235人围观)

还没有评论,来说两句吧...

相关阅读

    相关 负载均衡---ribbon

    Ribbon:提供云端负载均衡,有多种负载均衡策略可供选择,可配合服务发现和断路器使用。 上一篇简单讲解了eureka的使用,这一篇文章基于上一篇的基础上,讲一下spring

    相关 Ribbon负载均衡

    1. 集中式负载均衡 > 在客户端和服务端之间使用独立的负载均衡设施(可以是硬件,如F5, 也可以是软件,如nginx、LVS等), 由该设施负责把访问请求通过某种策略转发