SpringCloud之Ribbon负载均衡策略
AbstractLoadBalancerRule
负载均衡策略的抽象类,在该抽象类中定义了负载均衡器ILoadBalancer对象,该对象能够再具体实现选择服务策略时,获取到一些负载均衡器中维护的信息作为分配依据,并以此设计一些算法针对特定场景的高校策略。
public abstract class AbstractLoadBalancerRule implements IRule, IClientConfigAware {
private ILoadBalancer lb;
@Override
public void setLoadBalancer(ILoadBalancer lb){
this.lb = lb;
}
@Override
public ILoadBalancer getLoadBalancer(){
return lb;
}
}
RandomRule
该策略实现了从服务实例清单中随机选择一个服务实例的功能。choose(ILoadBalancer lb, Object key)方法增加了一个负载均衡器对象的参数。通过负载均衡器获取可用实例列表upList和所有实例列表allList,并通过rand.nextInt(serverCount)获取一个随机数,并将随机数作为upList的索引值来返回具体实例。具体逻辑在一个while (server == null)循环内,正常情况下每次都会选择出一个服务实例,如果出现死循环获取不到服务实例时,则很可能存在并发的Bug。
public Server choose(ILoadBalancer lb, Object key) {
if (lb == null) {
return null;
}
Server server = null;
while (server == null) {
if (Thread.interrupted()) {
return null;
}
List<Server> upList = lb.getReachableServers();
List<Server> allList = lb.getAllServers();
int serverCount = allList.size();
if (serverCount == 0) {
/* * No servers. End regardless of pass, because subsequent passes * only get more restrictive. */
return null;
}
int index = rand.nextInt(serverCount);
server = upList.get(index);
if (server == null) {
/* * The only time this should happen is if the server list were * somehow trimmed. This is a transient condition. Retry after * yielding. */
Thread.yield();
continue;
}
if (server.isAlive()) {
return (server);
}
// Shouldn't actually happen.. but must be transient or a bug.
server = null;
Thread.yield();
}
return server;
}
RoundRobinRule
该策略实现了按照线性轮训的方式依次选择每个服务实例的功能。其详细结构和RandomRule非常类似。除了循环条件不同外,就是从可用列表中获取逻辑不通。循环条件增加了一个count计数变量,该变量会在每次循环之后累加,也就是说,如果一直选择不到server超过10次,那么就会结束尝试,并打印一个警告信息No available alive servers after 10 tries from load balancer:…。线性轮训的实现则是通过AtomicInteger nextServerCyclicCounter对象实现,每次进行实例选择时通过调用incrementAndGetModulo函数实现递增。
public Server choose(ILoadBalancer lb, Object key) {
if (lb == null) {
log.warn("no load balancer");
return null;
}
Server server = null;
int count = 0;
while (server == null && count++ < 10) {
List<Server> reachableServers = lb.getReachableServers();
List<Server> allServers = lb.getAllServers();
int upCount = reachableServers.size();
int serverCount = allServers.size();
if ((upCount == 0) || (serverCount == 0)) {
log.warn("No up servers available from load balancer: " + lb);
return null;
}
int nextServerIndex = incrementAndGetModulo(serverCount);
server = allServers.get(nextServerIndex);
if (server == null) {
/* Transient. */
Thread.yield();
continue;
}
if (server.isAlive() && (server.isReadyToServe())) {
return (server);
}
// Next.
server = null;
}
if (count >= 10) {
log.warn("No available alive servers after 10 tries from load balancer: "
+ lb);
}
return server;
}
RetryRule
该策略实现了一个具备重试机制的实例选择功能。在其方法内部定义了一个IRule对象,默认使用了RoundRobinRule实例。而在choose方法中则实现了对内部定义的策略反复尝试的策略,如期间能够选择到具体服务实例就返回,如选择不到就根据设置的尝试结束时间为阈值,当超过该阈值后就返回null。
public Server choose(ILoadBalancer lb, Object key) {
long requestTime = System.currentTimeMillis();
long deadline = requestTime + maxRetryMillis;
Server answer = null;
answer = subRule.choose(key);
if (((answer == null) || (!answer.isAlive()))
&& (System.currentTimeMillis() < deadline)) {
InterruptTask task = new InterruptTask(deadline
- System.currentTimeMillis());
while (!Thread.interrupted()) {
answer = subRule.choose(key);
if (((answer == null) || (!answer.isAlive()))
&& (System.currentTimeMillis() < deadline)) {
/* pause and retry hoping it's transient */
Thread.yield();
} else {
break;
}
}
task.cancel();
}
if ((answer == null) || (!answer.isAlive())) {
return null;
} else {
return answer;
}
}
WeightedResponseTimeRule
该策略是对RoundRobinRule的扩展,增加了根据实例的运行情况来计算权重,并根据权重来挑选实例,已达到更优的分配效果,它的实现主要有三个核心内容。
定时任务
WeightedResponseTimeRule策略在初始化的时候会通过serverWeightTimer.schedule(new DynamicServerWeightTask(), 0,serverWeightTaskTimerInterval);启动一个定时任务,用来为每个服务实例计算权重,该任务默认30秒执行一次。
class DynamicServerWeightTask extends TimerTask {
public void run() {
ServerWeight serverWeight = new ServerWeight();
try {
serverWeight.maintainWeights();
} catch (Exception e) {
logger.error("Error running DynamicServerWeightTask for {}", name, e);
}
}
}
计算权重
在源码中我们可以找到用于存储权重的对象List
public void maintainWeights() {
ILoadBalancer lb = getLoadBalancer();
if (lb == null) {
return;
}
if (!serverWeightAssignmentInProgress.compareAndSet(false, true)) {
return;
}
try {
logger.info("Weight adjusting job started");
AbstractLoadBalancer nlb = (AbstractLoadBalancer) lb;
LoadBalancerStats stats = nlb.getLoadBalancerStats();
if (stats == null) {
// no statistics, nothing to do
return;
}
double totalResponseTime = 0;
// find maximal 95% response time
for (Server server : nlb.getAllServers()) {
// this will automatically load the stats if not in cache
ServerStats ss = stats.getSingleServerStat(server);
totalResponseTime += ss.getResponseTimeAvg();
}
// weight for each server is (sum of responseTime of all servers - responseTime)
// so that the longer the response time, the less the weight and the less likely to be chosen
Double weightSoFar = 0.0;
// create new list and hot swap the reference
List<Double> finalWeights = new ArrayList<Double>();
for (Server server : nlb.getAllServers()) {
ServerStats ss = stats.getSingleServerStat(server);
double weight = totalResponseTime - ss.getResponseTimeAvg();
weightSoFar += weight;
finalWeights.add(weightSoFar);
}
setWeights(finalWeights);
} catch (Exception e) {
logger.error("Error calculating server weights", e);
} finally {
serverWeightAssignmentInProgress.set(false);
}
}
该函数的实现主要分为两个步骤:
- 根据LoadBalancerStats中记录的每个服务实例的统计信息,累加所有实例的平均响应时间,得到总的平均响应时间totalResponseTime。
- 为负载均衡器中维护的实例清单逐个计算权重,计算规则为weightSoFar+totalResponseTime - 实例的平均响应时间,其中weightSoFar初始值为0,并且计算好每一个权重需要累加到weightSoFar上供下一次计算使用。
举个例子:
现在用4个实例A、B、C、D,他们的平均响应时间为10、40、80、100,totalResponseTime=10+40+80+100=230,则实例A、B、C、D的权重分别如下:
实例A:230-10=220
实例B:220+(230-40)=410
实例C:410+(230-80)=560
实例D:560+(230-100)=690
这里的权重只是表示了各个实例权重区间的上限,并非某个实例的优先级,所以不是数值越大被选中的概率就越大。以以上例子的计算结果为例,它实际上是为这4个实例构建了4个不同的区间,每个实例的区间下限是上个实例的区间上限,而每个实例的区间上限则是我们上面计算并存储于List accumulatedWeights中的权重值,第一个实例的下限为0。所以,根据上实例的权重计算结果,可以得到每个实例的权重区间。
实例A:[0,220]
实例B:(220,410]
实例C:(410,560]
实例D:(560,690)
每个实例区间的宽度就是:总的平均响应时间-实例的平均响应时间,所以实例的响应时间越短,权重区间就越大,被选中的概率就越高。
实例选择
public Server choose(ILoadBalancer lb, Object key) {
if (lb == null) {
return null;
}
Server server = null;
while (server == null) {
// get hold of the current reference in case it is changed from the other thread
List<Double> currentWeights = accumulatedWeights;
if (Thread.interrupted()) {
return null;
}
List<Server> allList = lb.getAllServers();
int serverCount = allList.size();
if (serverCount == 0) {
return null;
}
int serverIndex = 0;
// last one in the list is the sum of all weights
double maxTotalWeight = currentWeights.size() == 0 ? 0 : currentWeights.get(currentWeights.size() - 1);
// No server has been hit yet and total weight is not initialized
// fallback to use round robin
if (maxTotalWeight < 0.001d) {
server = super.choose(getLoadBalancer(), key);
if(server == null) {
return server;
}
} else {
// generate a random weight between 0 (inclusive) to maxTotalWeight (exclusive)
double randomWeight = random.nextDouble() * maxTotalWeight;
// pick the server index based on the randomIndex
int n = 0;
for (Double d : currentWeights) {
if (d >= randomWeight) {
serverIndex = n;
break;
} else {
n++;
}
}
server = allList.get(serverIndex);
}
if (server == null) {
/* Transient. */
Thread.yield();
continue;
}
if (server.isAlive()) {
return (server);
}
// Next.
server = null;
}
return server;
}
选择实例的核心过程就两步:
- 生成一个[0,最大权重值)区间内的随机数
- 遍历权重列表,比较权重值与随机数的大小,如果权重值大于等于随机数,就拿当前权重列表的索引值去服务实例列表中获取具体的实例。
还没有评论,来说两句吧...