SpringCloud之Ribbon负载均衡策略

待我称王封你为后i 2023-02-23 15:25 94阅读 0赞

AbstractLoadBalancerRule

负载均衡策略的抽象类,在该抽象类中定义了负载均衡器ILoadBalancer对象,该对象能够再具体实现选择服务策略时,获取到一些负载均衡器中维护的信息作为分配依据,并以此设计一些算法针对特定场景的高校策略。

  1. public abstract class AbstractLoadBalancerRule implements IRule, IClientConfigAware {
  2. private ILoadBalancer lb;
  3. @Override
  4. public void setLoadBalancer(ILoadBalancer lb){
  5. this.lb = lb;
  6. }
  7. @Override
  8. public ILoadBalancer getLoadBalancer(){
  9. return lb;
  10. }
  11. }

RandomRule

该策略实现了从服务实例清单中随机选择一个服务实例的功能。choose(ILoadBalancer lb, Object key)方法增加了一个负载均衡器对象的参数。通过负载均衡器获取可用实例列表upList和所有实例列表allList,并通过rand.nextInt(serverCount)获取一个随机数,并将随机数作为upList的索引值来返回具体实例。具体逻辑在一个while (server == null)循环内,正常情况下每次都会选择出一个服务实例,如果出现死循环获取不到服务实例时,则很可能存在并发的Bug。

  1. public Server choose(ILoadBalancer lb, Object key) {
  2. if (lb == null) {
  3. return null;
  4. }
  5. Server server = null;
  6. while (server == null) {
  7. if (Thread.interrupted()) {
  8. return null;
  9. }
  10. List<Server> upList = lb.getReachableServers();
  11. List<Server> allList = lb.getAllServers();
  12. int serverCount = allList.size();
  13. if (serverCount == 0) {
  14. /* * No servers. End regardless of pass, because subsequent passes * only get more restrictive. */
  15. return null;
  16. }
  17. int index = rand.nextInt(serverCount);
  18. server = upList.get(index);
  19. if (server == null) {
  20. /* * The only time this should happen is if the server list were * somehow trimmed. This is a transient condition. Retry after * yielding. */
  21. Thread.yield();
  22. continue;
  23. }
  24. if (server.isAlive()) {
  25. return (server);
  26. }
  27. // Shouldn't actually happen.. but must be transient or a bug.
  28. server = null;
  29. Thread.yield();
  30. }
  31. return server;
  32. }

RoundRobinRule

该策略实现了按照线性轮训的方式依次选择每个服务实例的功能。其详细结构和RandomRule非常类似。除了循环条件不同外,就是从可用列表中获取逻辑不通。循环条件增加了一个count计数变量,该变量会在每次循环之后累加,也就是说,如果一直选择不到server超过10次,那么就会结束尝试,并打印一个警告信息No available alive servers after 10 tries from load balancer:…。线性轮训的实现则是通过AtomicInteger nextServerCyclicCounter对象实现,每次进行实例选择时通过调用incrementAndGetModulo函数实现递增。

  1. public Server choose(ILoadBalancer lb, Object key) {
  2. if (lb == null) {
  3. log.warn("no load balancer");
  4. return null;
  5. }
  6. Server server = null;
  7. int count = 0;
  8. while (server == null && count++ < 10) {
  9. List<Server> reachableServers = lb.getReachableServers();
  10. List<Server> allServers = lb.getAllServers();
  11. int upCount = reachableServers.size();
  12. int serverCount = allServers.size();
  13. if ((upCount == 0) || (serverCount == 0)) {
  14. log.warn("No up servers available from load balancer: " + lb);
  15. return null;
  16. }
  17. int nextServerIndex = incrementAndGetModulo(serverCount);
  18. server = allServers.get(nextServerIndex);
  19. if (server == null) {
  20. /* Transient. */
  21. Thread.yield();
  22. continue;
  23. }
  24. if (server.isAlive() && (server.isReadyToServe())) {
  25. return (server);
  26. }
  27. // Next.
  28. server = null;
  29. }
  30. if (count >= 10) {
  31. log.warn("No available alive servers after 10 tries from load balancer: "
  32. + lb);
  33. }
  34. return server;
  35. }

RetryRule

该策略实现了一个具备重试机制的实例选择功能。在其方法内部定义了一个IRule对象,默认使用了RoundRobinRule实例。而在choose方法中则实现了对内部定义的策略反复尝试的策略,如期间能够选择到具体服务实例就返回,如选择不到就根据设置的尝试结束时间为阈值,当超过该阈值后就返回null。

  1. public Server choose(ILoadBalancer lb, Object key) {
  2. long requestTime = System.currentTimeMillis();
  3. long deadline = requestTime + maxRetryMillis;
  4. Server answer = null;
  5. answer = subRule.choose(key);
  6. if (((answer == null) || (!answer.isAlive()))
  7. && (System.currentTimeMillis() < deadline)) {
  8. InterruptTask task = new InterruptTask(deadline
  9. - System.currentTimeMillis());
  10. while (!Thread.interrupted()) {
  11. answer = subRule.choose(key);
  12. if (((answer == null) || (!answer.isAlive()))
  13. && (System.currentTimeMillis() < deadline)) {
  14. /* pause and retry hoping it's transient */
  15. Thread.yield();
  16. } else {
  17. break;
  18. }
  19. }
  20. task.cancel();
  21. }
  22. if ((answer == null) || (!answer.isAlive())) {
  23. return null;
  24. } else {
  25. return answer;
  26. }
  27. }

WeightedResponseTimeRule

该策略是对RoundRobinRule的扩展,增加了根据实例的运行情况来计算权重,并根据权重来挑选实例,已达到更优的分配效果,它的实现主要有三个核心内容。

定时任务

WeightedResponseTimeRule策略在初始化的时候会通过serverWeightTimer.schedule(new DynamicServerWeightTask(), 0,serverWeightTaskTimerInterval);启动一个定时任务,用来为每个服务实例计算权重,该任务默认30秒执行一次。

  1. class DynamicServerWeightTask extends TimerTask {
  2. public void run() {
  3. ServerWeight serverWeight = new ServerWeight();
  4. try {
  5. serverWeight.maintainWeights();
  6. } catch (Exception e) {
  7. logger.error("Error running DynamicServerWeightTask for {}", name, e);
  8. }
  9. }
  10. }
计算权重

在源码中我们可以找到用于存储权重的对象List accumulatedWeights = new ArrayList(),该List中每个权重值所处的位置对应了负载均衡器维护的服务实例清单中的所有实例在清单中的位置。

  1. public void maintainWeights() {
  2. ILoadBalancer lb = getLoadBalancer();
  3. if (lb == null) {
  4. return;
  5. }
  6. if (!serverWeightAssignmentInProgress.compareAndSet(false, true)) {
  7. return;
  8. }
  9. try {
  10. logger.info("Weight adjusting job started");
  11. AbstractLoadBalancer nlb = (AbstractLoadBalancer) lb;
  12. LoadBalancerStats stats = nlb.getLoadBalancerStats();
  13. if (stats == null) {
  14. // no statistics, nothing to do
  15. return;
  16. }
  17. double totalResponseTime = 0;
  18. // find maximal 95% response time
  19. for (Server server : nlb.getAllServers()) {
  20. // this will automatically load the stats if not in cache
  21. ServerStats ss = stats.getSingleServerStat(server);
  22. totalResponseTime += ss.getResponseTimeAvg();
  23. }
  24. // weight for each server is (sum of responseTime of all servers - responseTime)
  25. // so that the longer the response time, the less the weight and the less likely to be chosen
  26. Double weightSoFar = 0.0;
  27. // create new list and hot swap the reference
  28. List<Double> finalWeights = new ArrayList<Double>();
  29. for (Server server : nlb.getAllServers()) {
  30. ServerStats ss = stats.getSingleServerStat(server);
  31. double weight = totalResponseTime - ss.getResponseTimeAvg();
  32. weightSoFar += weight;
  33. finalWeights.add(weightSoFar);
  34. }
  35. setWeights(finalWeights);
  36. } catch (Exception e) {
  37. logger.error("Error calculating server weights", e);
  38. } finally {
  39. serverWeightAssignmentInProgress.set(false);
  40. }
  41. }

该函数的实现主要分为两个步骤:

  • 根据LoadBalancerStats中记录的每个服务实例的统计信息,累加所有实例的平均响应时间,得到总的平均响应时间totalResponseTime。
  • 为负载均衡器中维护的实例清单逐个计算权重,计算规则为weightSoFar+totalResponseTime - 实例的平均响应时间,其中weightSoFar初始值为0,并且计算好每一个权重需要累加到weightSoFar上供下一次计算使用。
    举个例子:
    现在用4个实例A、B、C、D,他们的平均响应时间为10、40、80、100,totalResponseTime=10+40+80+100=230,则实例A、B、C、D的权重分别如下:
    实例A:230-10=220
    实例B:220+(230-40)=410
    实例C:410+(230-80)=560
    实例D:560+(230-100)=690
    这里的权重只是表示了各个实例权重区间的上限,并非某个实例的优先级,所以不是数值越大被选中的概率就越大。以以上例子的计算结果为例,它实际上是为这4个实例构建了4个不同的区间,每个实例的区间下限是上个实例的区间上限,而每个实例的区间上限则是我们上面计算并存储于List accumulatedWeights中的权重值,第一个实例的下限为0。所以,根据上实例的权重计算结果,可以得到每个实例的权重区间。
    实例A:[0,220]
    实例B:(220,410]
    实例C:(410,560]
    实例D:(560,690)
    每个实例区间的宽度就是:总的平均响应时间-实例的平均响应时间,所以实例的响应时间越短,权重区间就越大,被选中的概率就越高。
实例选择
  1. public Server choose(ILoadBalancer lb, Object key) {
  2. if (lb == null) {
  3. return null;
  4. }
  5. Server server = null;
  6. while (server == null) {
  7. // get hold of the current reference in case it is changed from the other thread
  8. List<Double> currentWeights = accumulatedWeights;
  9. if (Thread.interrupted()) {
  10. return null;
  11. }
  12. List<Server> allList = lb.getAllServers();
  13. int serverCount = allList.size();
  14. if (serverCount == 0) {
  15. return null;
  16. }
  17. int serverIndex = 0;
  18. // last one in the list is the sum of all weights
  19. double maxTotalWeight = currentWeights.size() == 0 ? 0 : currentWeights.get(currentWeights.size() - 1);
  20. // No server has been hit yet and total weight is not initialized
  21. // fallback to use round robin
  22. if (maxTotalWeight < 0.001d) {
  23. server = super.choose(getLoadBalancer(), key);
  24. if(server == null) {
  25. return server;
  26. }
  27. } else {
  28. // generate a random weight between 0 (inclusive) to maxTotalWeight (exclusive)
  29. double randomWeight = random.nextDouble() * maxTotalWeight;
  30. // pick the server index based on the randomIndex
  31. int n = 0;
  32. for (Double d : currentWeights) {
  33. if (d >= randomWeight) {
  34. serverIndex = n;
  35. break;
  36. } else {
  37. n++;
  38. }
  39. }
  40. server = allList.get(serverIndex);
  41. }
  42. if (server == null) {
  43. /* Transient. */
  44. Thread.yield();
  45. continue;
  46. }
  47. if (server.isAlive()) {
  48. return (server);
  49. }
  50. // Next.
  51. server = null;
  52. }
  53. return server;
  54. }

选择实例的核心过程就两步:

  • 生成一个[0,最大权重值)区间内的随机数
  • 遍历权重列表,比较权重值与随机数的大小,如果权重值大于等于随机数,就拿当前权重列表的索引值去服务实例列表中获取具体的实例。

发表评论

表情:
评论列表 (有 0 条评论,94人围观)

还没有评论,来说两句吧...

相关阅读