dubbo2.6源码-负载均衡

秒速五厘米 2021-12-10 05:37 329阅读 0赞

只有当服务提供者以集群方式启动(同一服务存在多个提供者实例)时,才会真正执行负载均衡;如果只有一个提供者,则直接返回该提供者,跳过负载均衡。

  1. <!--
  2. use dubbo protocol to export service on port 20880
  3. Change the port to start an additional provider instance and form a cluster; load balancing is only used once a cluster is running
  4. -->
  5. <dubbo:protocol name="dubbo" port="20881"/>

LoadBalance

  1. @SPI(RandomLoadBalance.NAME)
  2. public interface LoadBalance {
  3. /**
  4. * select one invoker in list.
  5. *
  6. * @param invokers invokers.
  7. * @param url refer url
  8. * @param invocation invocation.
  9. * @return selected invoker.
  10. */
  11. @Adaptive("loadbalance")
  12. <T> Invoker<T> select(List<Invoker<T>> invokers, URL url, Invocation invocation) throws RpcException;
  13. }

AbstractLoadBalance

  1. public abstract class AbstractLoadBalance implements LoadBalance {
  2. static int calculateWarmupWeight(int uptime, int warmup, int weight) {
  3. int ww = (int) ((float) uptime / ((float) warmup / (float) weight));
  4. return ww < 1 ? 1 : (ww > weight ? weight : ww);
  5. }
  6. public <T> Invoker<T> select(List<Invoker<T>> invokers, URL url, Invocation invocation) {
  7. if (invokers == null || invokers.isEmpty())
  8. return null;
  9. if (invokers.size() == 1)
  10. return invokers.get(0);
  11. return doSelect(invokers, url, invocation);
  12. }
  13. protected abstract <T> Invoker<T> doSelect(List<Invoker<T>> invokers, URL url, Invocation invocation);
  14. protected int getWeight(Invoker<?> invoker, Invocation invocation) {
  15. int weight = invoker.getUrl().getMethodParameter(invocation.getMethodName(), Constants.WEIGHT_KEY, Constants.DEFAULT_WEIGHT);
  16. if (weight > 0) {
  17. long timestamp = invoker.getUrl().getParameter(Constants.REMOTE_TIMESTAMP_KEY, 0L);
  18. if (timestamp > 0L) {
  19. int uptime = (int) (System.currentTimeMillis() - timestamp);
  20. int warmup = invoker.getUrl().getParameter(Constants.WARMUP_KEY, Constants.DEFAULT_WARMUP);
  21. if (uptime > 0 && uptime < warmup) {
  22. weight = calculateWarmupWeight(uptime, warmup, weight);
  23. }
  24. }
  25. }
  26. return weight;
  27. }
  28. }

四种负载均衡策略

RandomLoadBalance随机加权,默认

  1. public class RandomLoadBalance extends AbstractLoadBalance {
  2. public static final String NAME = "random";
  3. private final Random random = new Random();
  4. protected <T> Invoker<T> doSelect(List<Invoker<T>> invokers, URL url, Invocation invocation) {
  5. int length = invokers.size(); // Number of invokers
  6. int totalWeight = 0; // The sum of weights
  7. boolean sameWeight = true; // Every invoker has the same weight?
  8. for (int i = 0; i < length; i++) {
  9. int weight = getWeight(invokers.get(i), invocation);
  10. totalWeight += weight; // Sum
  11. if (sameWeight && i > 0
  12. && weight != getWeight(invokers.get(i - 1), invocation)) {
  13. sameWeight = false;
  14. }
  15. }
  16. if (totalWeight > 0 && !sameWeight) {
  17. // If (not every invoker has the same weight & at least one invoker's weight>0), select randomly based on totalWeight.
  18. int offset = random.nextInt(totalWeight);
  19. // Return a invoker based on the random value.
  20. for (int i = 0; i < length; i++) {
  21. offset -= getWeight(invokers.get(i), invocation);
  22. if (offset < 0) {
  23. return invokers.get(i);
  24. }
  25. }
  26. }
  27. // If all invokers have the same weight value or totalWeight=0, return evenly.
  28. return invokers.get(random.nextInt(length));
  29. }

生产者改变权重

  1. <dubbo:provider delay="-1" timeout="1000000" retries="0" weight="80"/>

默认权重值是100。

我们现在假设集群有四个节点,对应的权重分别为{A:1,B:2,C:3,D:4},将权重代入代码中进行分析:该随机算法按总权重进行加权随机,A节点承担请求的概率为1/(1+2+3+4)=10%,依此类推,B、C、D承担请求的概率分别是20%、30%、40%。在这种方式下,用户可以根据机器的实际性能动态调整权重比例;如果发现机器D负载过大、请求堆积过多,可以调低D的权重来缓解其处理请求的压力。

轮询加权

修改负载均衡策略

  1. <dubbo:reference id="userService" check="false" interface="com.study.dubbo.userapi.service.UserService" loadbalance="roundrobin"/>

消费者添加 loadbalance="roundrobin",其取值就是对应负载均衡类中的 NAME 常量。

RoundRobinLoadBalance

  1. public class RoundRobinLoadBalance extends AbstractLoadBalance {
  2. public static final String NAME = "roundrobin";
  3. private final ConcurrentMap<String, AtomicPositiveInteger> sequences = new ConcurrentHashMap<String, AtomicPositiveInteger>();
  4. protected <T> Invoker<T> doSelect(List<Invoker<T>> invokers, URL url, Invocation invocation) {
  5. String key = invokers.get(0).getUrl().getServiceKey() + "." + invocation.getMethodName();
  6. int length = invokers.size(); // Number of invokers
  7. int maxWeight = 0; // The maximum weight
  8. int minWeight = Integer.MAX_VALUE; // The minimum weight
  9. final LinkedHashMap<Invoker<T>, IntegerWrapper> invokerToWeightMap = new LinkedHashMap<Invoker<T>, IntegerWrapper>();
  10. int weightSum = 0;
  11. for (int i = 0; i < length; i++) {
  12. int weight = getWeight(invokers.get(i), invocation);
  13. maxWeight = Math.max(maxWeight, weight); // Choose the maximum weight
  14. minWeight = Math.min(minWeight, weight); // Choose the minimum weight
  15. if (weight > 0) {
  16. invokerToWeightMap.put(invokers.get(i), new IntegerWrapper(weight));
  17. weightSum += weight;
  18. }
  19. }
  20. AtomicPositiveInteger sequence = sequences.get(key);
  21. if (sequence == null) {
  22. sequences.putIfAbsent(key, new AtomicPositiveInteger());
  23. sequence = sequences.get(key);
  24. }
  25. int currentSequence = sequence.getAndIncrement();
  26. if (maxWeight > 0 && minWeight < maxWeight) {
  27. int mod = currentSequence % weightSum;
  28. for (int i = 0; i < maxWeight; i++) {
  29. for (Map.Entry<Invoker<T>, IntegerWrapper> each : invokerToWeightMap.entrySet()) {
  30. final Invoker<T> k = each.getKey();
  31. final IntegerWrapper v = each.getValue();
  32. if (mod == 0 && v.getValue() > 0) {
  33. return k;
  34. }
  35. if (v.getValue() > 0) {
  36. v.decrement();
  37. mod--;
  38. }
  39. }
  40. }
  41. }
  42. // Round robin
  43. return invokers.get(currentSequence % length);
  44. }
  45. private static final class IntegerWrapper {
  46. private int value;
  47. public IntegerWrapper(int value) {
  48. this.value = value;
  49. }
  50. public int getValue() {
  51. return value;
  52. }
  53. public void setValue(int value) {
  54. this.value = value;
  55. }
  56. public void decrement() {
  57. this.value--;
  58. }
  59. }
  60. }

最少调用

LeastActiveLoadBalance

  1. public class LeastActiveLoadBalance extends AbstractLoadBalance {
  2. public static final String NAME = "leastactive";
  3. private final Random random = new Random();
  4. protected <T> Invoker<T> doSelect(List<Invoker<T>> invokers, URL url, Invocation invocation) {
  5. int length = invokers.size(); // Number of invokers
  6. int leastActive = -1; // The least active value of all invokers
  7. int leastCount = 0; // The number of invokers having the same least active value (leastActive)
  8. int[] leastIndexs = new int[length]; // The index of invokers having the same least active value (leastActive)
  9. int totalWeight = 0; // The sum of weights
  10. int firstWeight = 0; // Initial value, used for comparision
  11. boolean sameWeight = true; // Every invoker has the same weight value?
  12. for (int i = 0; i < length; i++) {
  13. Invoker<T> invoker = invokers.get(i);
  14. int active = RpcStatus.getStatus(invoker.getUrl(), invocation.getMethodName()).getActive(); // Active number
  15. int weight = invoker.getUrl().getMethodParameter(invocation.getMethodName(), Constants.WEIGHT_KEY, Constants.DEFAULT_WEIGHT); // Weight
  16. if (leastActive == -1 || active < leastActive) { // Restart, when find a invoker having smaller least active value.
  17. leastActive = active; // Record the current least active value
  18. leastCount = 1; // Reset leastCount, count again based on current leastCount
  19. leastIndexs[0] = i; // Reset
  20. totalWeight = weight; // Reset
  21. firstWeight = weight; // Record the weight the first invoker
  22. sameWeight = true; // Reset, every invoker has the same weight value?
  23. } else if (active == leastActive) { // If current invoker's active value equals with leaseActive, then accumulating.
  24. leastIndexs[leastCount++] = i; // Record index number of this invoker
  25. totalWeight += weight; // Add this invoker's weight to totalWeight.
  26. // If every invoker has the same weight?
  27. if (sameWeight && i > 0
  28. && weight != firstWeight) {
  29. sameWeight = false;
  30. }
  31. }
  32. }
  33. // assert(leastCount > 0)
  34. if (leastCount == 1) {
  35. // If we got exactly one invoker having the least active value, return this invoker directly.
  36. return invokers.get(leastIndexs[0]);
  37. }
  38. if (!sameWeight && totalWeight > 0) {
  39. // If (not every invoker has the same weight & at least one invoker's weight>0), select randomly based on totalWeight.
  40. int offsetWeight = random.nextInt(totalWeight);
  41. // Return a invoker based on the random value.
  42. for (int i = 0; i < leastCount; i++) {
  43. int leastIndex = leastIndexs[i];
  44. offsetWeight -= getWeight(invokers.get(leastIndex), invocation);
  45. if (offsetWeight <= 0)
  46. return invokers.get(leastIndex);
  47. }
  48. }
  49. // If all invokers have the same weight value or totalWeight=0, return evenly.
  50. return invokers.get(leastIndexs[random.nextInt(leastCount)]);
  51. }
  52. }

一致hash

使用consistenthash

  1. <dubbo:reference id="userService" check="false" interface="com.study.dubbo.userapi.service.UserService" loadbalance="consistenthash"/>
  2. public class ConsistentHashLoadBalance extends AbstractLoadBalance {
  3. private final ConcurrentMap<String, ConsistentHashSelector<?>> selectors = new ConcurrentHashMap<String, ConsistentHashSelector<?>>();
  4. @SuppressWarnings("unchecked")
  5. @Override
  6. protected <T> Invoker<T> doSelect(List<Invoker<T>> invokers, URL url, Invocation invocation) {
  7. String key = invokers.get(0).getUrl().getServiceKey() + "." + invocation.getMethodName();
  8. int identityHashCode = System.identityHashCode(invokers);
  9. ConsistentHashSelector<T> selector = (ConsistentHashSelector<T>) selectors.get(key);
  10. if (selector == null || selector.identityHashCode != identityHashCode) {
  11. selectors.put(key, new ConsistentHashSelector<T>(invokers, invocation.getMethodName(), identityHashCode));
  12. selector = (ConsistentHashSelector<T>) selectors.get(key);
  13. }
  14. return selector.select(invocation);
  15. }
  16. private static final class ConsistentHashSelector<T> {
  17. private final TreeMap<Long, Invoker<T>> virtualInvokers;
  18. private final int replicaNumber;
  19. private final int identityHashCode;
  20. private final int[] argumentIndex;
  21. ConsistentHashSelector(List<Invoker<T>> invokers, String methodName, int identityHashCode) {
  22. this.virtualInvokers = new TreeMap<Long, Invoker<T>>();
  23. this.identityHashCode = identityHashCode;
  24. URL url = invokers.get(0).getUrl();
  25. this.replicaNumber = url.getMethodParameter(methodName, "hash.nodes", 160);
  26. String[] index = Constants.COMMA_SPLIT_PATTERN.split(url.getMethodParameter(methodName, "hash.arguments", "0"));
  27. argumentIndex = new int[index.length];
  28. for (int i = 0; i < index.length; i++) {
  29. argumentIndex[i] = Integer.parseInt(index[i]);
  30. }
  31. for (Invoker<T> invoker : invokers) {
  32. String address = invoker.getUrl().getAddress();
  33. for (int i = 0; i < replicaNumber / 4; i++) {
  34. byte[] digest = md5(address + i);
  35. for (int h = 0; h < 4; h++) {
  36. long m = hash(digest, h);
  37. virtualInvokers.put(m, invoker);
  38. }
  39. }
  40. }
  41. }
  42. public Invoker<T> select(Invocation invocation) {
  43. String key = toKey(invocation.getArguments());
  44. byte[] digest = md5(key);
  45. return selectForKey(hash(digest, 0));
  46. }
  47. private String toKey(Object[] args) {
  48. StringBuilder buf = new StringBuilder();
  49. for (int i : argumentIndex) {
  50. if (i >= 0 && i < args.length) {
  51. buf.append(args[i]);
  52. }
  53. }
  54. return buf.toString();
  55. }
  56. private Invoker<T> selectForKey(long hash) {
  57. Map.Entry<Long, Invoker<T>> entry = virtualInvokers.tailMap(hash, true).firstEntry();
  58. if (entry == null) {
  59. entry = virtualInvokers.firstEntry();
  60. }
  61. return entry.getValue();
  62. }
  63. private long hash(byte[] digest, int number) {
  64. return (((long) (digest[3 + number * 4] & 0xFF) << 24)
  65. | ((long) (digest[2 + number * 4] & 0xFF) << 16)
  66. | ((long) (digest[1 + number * 4] & 0xFF) << 8)
  67. | (digest[number * 4] & 0xFF))
  68. & 0xFFFFFFFFL;
  69. }
  70. private byte[] md5(String value) {
  71. MessageDigest md5;
  72. try {
  73. md5 = MessageDigest.getInstance("MD5");
  74. } catch (NoSuchAlgorithmException e) {
  75. throw new IllegalStateException(e.getMessage(), e);
  76. }
  77. md5.reset();
  78. byte[] bytes;
  79. try {
  80. bytes = value.getBytes("UTF-8");
  81. } catch (UnsupportedEncodingException e) {
  82. throw new IllegalStateException(e.getMessage(), e);
  83. }
  84. md5.update(bytes);
  85. return md5.digest();
  86. }
  87. }
  88. }

构造函数中,每个实际的提供者均有160个(默认值,可调整)虚拟节点,每个提供者对应的虚拟节点将平均散列到哈希环上,当有请求时,先计算该请求参数对应的哈希值,然后顺时针寻找最近的虚拟节点,得到实际的提供者节点。

ConsistentHashLoadBalance: 一致性Hash,相同参数的请求总是发到同一提供者。
当某一台提供者挂时,原本发往该提供者的请求,基于虚拟节点,平摊到其它提供者,不会引起剧烈变动。

发表评论

表情:
评论列表 (有 0 条评论,329人围观)

还没有评论,来说两句吧...

相关阅读

    相关 Dubbo - 负载均衡

    前言 负载均衡,英文名称为Load Balance,其含义就是指将负载(工作任务)进行平衡、分摊到多个操作单元上进行运行。 例如:在Dubbo中,同一个服务有多个服务提

    相关 Dubbo-负载均衡

    切勿将负载均衡策略写死在代码里,将来我们可以用控制台来进行控制。 负载均衡 在集群负载均衡时,Dubbo 提供了多种均衡策略,缺省为 `random` 随机调