cloud源码-Ribbon

曾经终败给现在 2022-10-01 09:59 222阅读 0赞

boot2.0.0.7release版本

什么是Ribbon
Ribbon是Netflix公司开源的一个负载均衡的项目,它属于上述的第二种,是一个客户端负载均衡器,运行在客户端上。它是一个经过了云端测试的IPC库,可以很好地控制HTTP和TCP客户端的一些行为。 Feign已经默认使用了Ribbon。

负载均衡
容错
多协议(HTTP,TCP,UDP)支持异步和反应模型
缓存和批处理

RestTemplate和Ribbon相结合

Ribbon在Netflix组件是非常重要的一个组件,在Zuul中使用Ribbon做负载均衡,以及Feign组件的结合等。在Spring Cloud 中,作为开发中,做的最多的可能是将RestTemplate和Ribbon相结合,你可能会这样写:

  1. @Configuration
  2. public class RibbonConfig {
  3. @Bean
  4. @LoadBalanced
  5. RestTemplate restTemplate() {
  6. return new RestTemplate();
  7. }
  8. }

消费另外一个的服务的接口,差不多是这样的:

  1. @Service
  2. public class RibbonService {
  3. @Autowired
  4. RestTemplate restTemplate;
  5. public String hi(String name) {
  6. return restTemplate.getForObject("http://eureka-client/hi?name="+name,String.class);
  7. }
  8. }

LoadBalancerClient

在Riibon中一个非常重要的组件为LoadBalancerClient,它作为负载均衡的一个客户端。它在spring-cloud-commons包下:
的LoadBalancerClient是一个接口,它继承ServiceInstanceChooser,它的实现类是RibbonLoadBalancerClient,这三者之间的关系如下图:
在这里插入图片描述
其中LoadBalancerClient接口,有如下三个方法,其中excute()为执行请求,reconstructURI()用来重构url:

  1. public interface LoadBalancerClient extends ServiceInstanceChooser {
  2. <T> T execute(String serviceId, LoadBalancerRequest<T> request) throws IOException;
  3. <T> T execute(String serviceId, ServiceInstance serviceInstance, LoadBalancerRequest<T> request) throws IOException;
  4. URI reconstructURI(ServiceInstance instance, URI original);
  5. }

ServiceInstanceChooser接口,主要有一个方法,用来根据serviceId来获取ServiceInstance,代码如下:

  1. public interface ServiceInstanceChooser {
  2. ServiceInstance choose(String serviceId);
  3. }

LoadBalancerClient的实现类为RibbonLoadBalancerClient,这个类是非常重要的一个类,最终的负载均衡的请求处理,由它来执行。它的部分源码如下:

  1. public class RibbonLoadBalancerClient implements LoadBalancerClient {
  2. ...//省略代码
  3. @Override
  4. public ServiceInstance choose(String serviceId) {
  5. Server server = getServer(serviceId);
  6. if (server == null) {
  7. return null;
  8. }
  9. return new RibbonServer(serviceId, server, isSecure(server, serviceId),
  10. serverIntrospector(serviceId).getMetadata(server));
  11. }
  12. protected Server getServer(String serviceId) {
  13. return getServer(getLoadBalancer(serviceId));
  14. }
  15. protected Server getServer(ILoadBalancer loadBalancer) {
  16. if (loadBalancer == null) {
  17. return null;
  18. }
  19. return loadBalancer.chooseServer("default"); // TODO: better handling of key
  20. }
  21. protected ILoadBalancer getLoadBalancer(String serviceId) {
  22. return this.clientFactory.getLoadBalancer(serviceId);
  23. }
  24. ...//省略代码

在RibbonLoadBalancerClient的源码中,其中choose()方法是选择具体服务实例的一个方法。该方法通过getServer()方法去获取实例,经过源码跟踪,最终交给了ILoadBalancer类去选择服务实例。

ILoadBalancer在ribbon-loadbalancer的jar包下,它是定义了实现软件负载均衡的一个接口,它需要一组可供选择的服务注册列表信息,以及根据特定方法去选择服务,它的源码如下 :

  1. public interface ILoadBalancer {
  2. public void addServers(List<Server> newServers);
  3. public Server chooseServer(Object key);
  4. public void markServerDown(Server server);
  5. public List<Server> getReachableServers();
  6. public List<Server> getAllServers();
  7. }

其中,addServers()方法是添加一个Server集合;chooseServer()方法是根据key去获取Server;markServerDown()方法用来标记某个服务下线;getReachableServers()获取可用的Server集合;getAllServers()获取所有的Server集合。

DynamicServerListLoadBalancer

DynamicServerListLoadBalancer实现了ILoadBalancer接口,是负载均衡的真正实现类,
它的继承类为BaseLoadBalancer,它的实现类为DynamicServerListLoadBalancer,这三者之间的关系如下:
在这里插入图片描述

构造器

  1. public DynamicServerListLoadBalancer(IClientConfig clientConfig, IRule rule, IPing ping,
  2. ServerList<T> serverList, ServerListFilter<T> filter,
  3. ServerListUpdater serverListUpdater) {
  4. super(clientConfig, rule, ping);
  5. this.serverListImpl = serverList;
  6. this.filter = filter;
  7. this.serverListUpdater = serverListUpdater;
  8. if (filter instanceof AbstractServerListFilter) {
  9. ((AbstractServerListFilter) filter).setLoadBalancerStats(getLoadBalancerStats());
  10. }
  11. restOfInit(clientConfig);
  12. }
  13. public DynamicServerListLoadBalancer() {
  14. super();
  15. }

查看上述三个类的源码,可用发现,配置以下信息,IClientConfig、IRule、IPing、ServerList、ServerListFilter和ILoadBalancer,查看BaseLoadBalancer类,它默认的情况下,实现了以下配置:

  1. public class BaseLoadBalancer extends AbstractLoadBalancer implements
  2. PrimeConnections.PrimeConnectionListener, IClientConfigAware {
  3. private static Logger logger = LoggerFactory
  4. .getLogger(BaseLoadBalancer.class);
  5. private final static IRule DEFAULT_RULE = new RoundRobinRule();
  6. private final static SerialPingStrategy DEFAULT_PING_STRATEGY = new SerialPingStrategy();
  7. private static final String DEFAULT_NAME = "default";
  8. private static final String PREFIX = "LoadBalancer_";
  9. protected IRule rule = DEFAULT_RULE;
  10. protected IPingStrategy pingStrategy = DEFAULT_PING_STRATEGY;
  11. protected IPing ping = null;
  12. @Monitor(name = PREFIX + "AllServerList", type = DataSourceType.INFORMATIONAL)
  13. protected volatile List<Server> allServerList = Collections
  14. .synchronizedList(new ArrayList<Server>());
  15. @Monitor(name = PREFIX + "UpServerList", type = DataSourceType.INFORMATIONAL)
  16. protected volatile List<Server> upServerList = Collections
  17. .synchronizedList(new ArrayList<Server>());
  18. protected ReadWriteLock allServerLock = new ReentrantReadWriteLock();
  19. protected ReadWriteLock upServerLock = new ReentrantReadWriteLock();
  20. protected String name = DEFAULT_NAME;
  21. protected Timer lbTimer = null;
  22. protected int pingIntervalSeconds = 10;
  23. protected int maxTotalPingTimeSeconds = 5;
  24. protected Comparator<Server> serverComparator = new ServerComparator();
  25. protected AtomicBoolean pingInProgress = new AtomicBoolean(false);
  26. protected LoadBalancerStats lbStats;
  27. private volatile Counter counter = Monitors.newCounter("LoadBalancer_ChooseServer");
  28. private PrimeConnections primeConnections;
  29. private volatile boolean enablePrimingConnections = false;
  30. private IClientConfig config;
  31. private List<ServerListChangeListener> changeListeners = new CopyOnWriteArrayList<ServerListChangeListener>();
  32. private List<ServerStatusChangeListener> serverStatusListeners = new CopyOnWriteArrayList<ServerStatusChangeListener>();
  33. /**
  34. * Default constructor which sets name as "default", sets null ping, and
  35. * {@link RoundRobinRule} as the rule.
  36. * <p>
  37. * This constructor is mainly used by {@link ClientFactory}. Calling this
  38. * constructor must be followed by calling {@link #init()} or
  39. * {@link #initWithNiwsConfig(IClientConfig)} to complete initialization.
  40. * This constructor is provided for reflection. When constructing
  41. * programatically, it is recommended to use other constructors.
  42. */
  43. public BaseLoadBalancer() {
  44. this.name = DEFAULT_NAME;
  45. this.ping = null;
  46. setRule(DEFAULT_RULE);
  47. setupPingTask();
  48. lbStats = new LoadBalancerStats(DEFAULT_NAME);
  49. }

IClientConfig ribbonClientConfig: DefaultClientConfigImpl配置
IRule ribbonRule: RoundRobinRule 路由策略
IPing ribbonPing: DummyPing
ServerList ribbonServerList: ConfigurationBasedServerList
ServerListFilter ribbonServerListFilter: ZonePreferenceServerListFilter
ILoadBalancer ribbonLoadBalancer: ZoneAwareLoadBalancer
IClientConfig 用于对客户端或者负载均衡的配置,它的默认实现类为DefaultClientConfigImpl。

IRule

用于复杂均衡的策略,它有三个方法,其中choose()是根据key 来获取server,setLoadBalancer()和getLoadBalancer()是用来设置和获取ILoadBalancer的,它的源码如下:

  1. public interface IRule{
  2. public Server choose(Object key);
  3. public void setLoadBalancer(ILoadBalancer lb);
  4. public ILoadBalancer getLoadBalancer();
  5. }

IRule有很多默认的实现类,这些实现类根据不同的算法和逻辑来处理负载均衡。Ribbon实现的IRule有一下。在大多数情况下,这些默认的实现类是可以满足需求的,如果有特性的需求,可以自己实现。
在这里插入图片描述
BestAvailableRule 选择最小请求数

ClientConfigEnabledRoundRobinRule 轮询

RandomRule 随机选择一个server

RoundRobinRule 轮询选择server
原子类递增计数器,对服务器个数取模,取到返回,否则最多取10次

  1. public class RoundRobinRule extends AbstractLoadBalancerRule {
  2. private AtomicInteger nextServerCyclicCounter;
  3. private static final boolean AVAILABLE_ONLY_SERVERS = true;
  4. private static final boolean ALL_SERVERS = false;
  5. private static Logger log = LoggerFactory.getLogger(RoundRobinRule.class);
  6. public RoundRobinRule() {
  7. nextServerCyclicCounter = new AtomicInteger(0);
  8. }
  9. public RoundRobinRule(ILoadBalancer lb) {
  10. this();
  11. setLoadBalancer(lb);
  12. }
  13. public Server choose(ILoadBalancer lb, Object key) {
  14. if (lb == null) {
  15. log.warn("no load balancer");
  16. return null;
  17. }
  18. Server server = null;
  19. int count = 0;
  20. while (server == null && count++ < 10) {
  21. List<Server> reachableServers = lb.getReachableServers();
  22. List<Server> allServers = lb.getAllServers();
  23. int upCount = reachableServers.size();
  24. int serverCount = allServers.size();
  25. if ((upCount == 0) || (serverCount == 0)) {
  26. log.warn("No up servers available from load balancer: " + lb);
  27. return null;
  28. }
  29. int nextServerIndex = incrementAndGetModulo(serverCount);
  30. server = allServers.get(nextServerIndex);
  31. if (server == null) {
  32. /* Transient. */
  33. Thread.yield();
  34. continue;
  35. }
  36. if (server.isAlive() && (server.isReadyToServe())) {
  37. return (server);
  38. }
  39. // Next.
  40. server = null;
  41. }
  42. if (count >= 10) {
  43. log.warn("No available alive servers after 10 tries from load balancer: "
  44. + lb);
  45. }
  46. return server;
  47. }
  48. /**
  49. * Inspired by the implementation of {@link AtomicInteger#incrementAndGet()}.
  50. *
  51. * @param modulo The modulo to bound the value of the counter.
  52. * @return The next value.
  53. */
  54. private int incrementAndGetModulo(int modulo) {
  55. for (;;) {
  56. int current = nextServerCyclicCounter.get();
  57. int next = (current + 1) % modulo;
  58. if (nextServerCyclicCounter.compareAndSet(current, next))
  59. return next;
  60. }
  61. }
  62. @Override
  63. public Server choose(Object key) {
  64. return choose(getLoadBalancer(), key);
  65. }
  66. @Override
  67. public void initWithNiwsConfig(IClientConfig clientConfig) {
  68. }
  69. }

RetryRule 根据轮询的方式重试

WeightedResponseTimeRule 根据响应时间去分配一个weight ,weight越低,被选择的可能性就越低

ZoneAvoidanceRule 根据server的zone区域和可用性来轮询选择

IPing

是用来想server发生”ping”,来判断该server是否有响应,从而判断该server是否可用。它有一个isAlive()方法,它的源码如下:

  1. public interface IPing {
  2. public boolean isAlive(Server server);
  3. }

IPing的实现类有PingUrl、PingConstant、NoOpPing、DummyPing和NIWSDiscoveryPing。它门之间的关系如下:
在这里插入图片描述

PingUrl 真实的去ping 某个url,判断其是否alive
PingConstant 固定返回某服务是否可用,默认返回true,即可用

  1. public class PingConstant implements IPing {
  2. boolean constant = true;
  3. public void setConstant(String constantStr) {
  4. constant = (constantStr != null) && (constantStr.toLowerCase().equals("true"));
  5. }
  6. public void setConstant(boolean constant) {
  7. this.constant = constant;
  8. }
  9. public boolean getConstant() {
  10. return constant;
  11. }
  12. public boolean isAlive(Server server) {
  13. return constant;
  14. }
  15. }

NoOpPing 不去ping,直接返回true,即可用。

  1. public class NoOpPing implements IPing {
  2. @Override
  3. public boolean isAlive(Server server) {
  4. return true;
  5. }
  6. }

DummyPing 直接返回true,并实现了initWithNiwsConfig方法。

  1. public class DummyPing extends AbstractLoadBalancerPing {
  2. public DummyPing() {
  3. }
  4. public boolean isAlive(Server server) {
  5. return true;
  6. }
  7. @Override
  8. public void initWithNiwsConfig(IClientConfig clientConfig) {
  9. }
  10. }

NIWSDiscoveryPing,根据DiscoveryEnabledServer的InstanceInfo的InstanceStatus去判断,如果为InstanceStatus.UP,则为可用,否则不可用。

ServerList

是定义获取所有的server的注册列表信息的接口,它的代码如下:

  1. public interface ServerList<T extends Server> {
  2. public List<T> getInitialListOfServers();
  3. public List<T> getUpdatedListOfServers();
  4. }

ServerListFilter接口,定于了可根据配置去过滤或者根据特性动态获取符合条件的server列表的方法,代码如下:

  1. public interface ServerListFilter<T extends Server> {
  2. public List<T> getFilteredListOfServers(List<T> servers);
  3. }

初始化配置

阅读DynamicServerListLoadBalancer的源码,DynamicServerListLoadBalancer的构造函数中有个initWithNiwsConfig()方法。在改方法中,经过一系列的初始化配置,最终执行了restOfInit()方法。其代码如下:

  1. public DynamicServerListLoadBalancer(IClientConfig clientConfig) {
  2. initWithNiwsConfig(clientConfig);
  3. }
  4. @Override
  5. public void initWithNiwsConfig(IClientConfig clientConfig) {
  6. try {
  7. super.initWithNiwsConfig(clientConfig);
  8. String niwsServerListClassName = clientConfig.getPropertyAsString(
  9. CommonClientConfigKey.NIWSServerListClassName,
  10. DefaultClientConfigImpl.DEFAULT_SEVER_LIST_CLASS);
  11. ServerList<T> niwsServerListImpl = (ServerList<T>) ClientFactory
  12. .instantiateInstanceWithClientConfig(niwsServerListClassName, clientConfig);
  13. this.serverListImpl = niwsServerListImpl;
  14. if (niwsServerListImpl instanceof AbstractServerList) {
  15. AbstractServerListFilter<T> niwsFilter = ((AbstractServerList) niwsServerListImpl)
  16. .getFilterImpl(clientConfig);
  17. niwsFilter.setLoadBalancerStats(getLoadBalancerStats());
  18. this.filter = niwsFilter;
  19. }
  20. String serverListUpdaterClassName = clientConfig.getPropertyAsString(
  21. CommonClientConfigKey.ServerListUpdaterClassName,
  22. DefaultClientConfigImpl.DEFAULT_SERVER_LIST_UPDATER_CLASS
  23. );
  24. this.serverListUpdater = (ServerListUpdater) ClientFactory
  25. .instantiateInstanceWithClientConfig(serverListUpdaterClassName, clientConfig);
  26. restOfInit(clientConfig);
  27. } catch (Exception e) {
  28. throw new RuntimeException(
  29. "Exception while initializing NIWSDiscoveryLoadBalancer:"
  30. + clientConfig.getClientName()
  31. + ", niwsClientConfig:" + clientConfig, e);
  32. }
  33. }

在restOfInit()方法上,有一个 updateListOfServers()的方法,该方法是用来获取所有的ServerList的。

  1. void restOfInit(IClientConfig clientConfig) {
  2. boolean primeConnection = this.isEnablePrimingConnections();
  3. // turn this off to avoid duplicated asynchronous priming done in BaseLoadBalancer.setServerList()
  4. this.setEnablePrimingConnections(false);
  5. enableAndInitLearnNewServersFeature();
  6. updateListOfServers();
  7. if (primeConnection && this.getPrimeConnections() != null) {
  8. this.getPrimeConnections()
  9. .primeConnections(getReachableServers());
  10. }
  11. this.setEnablePrimingConnections(primeConnection);
  12. LOGGER.info("DynamicServerListLoadBalancer for client {} initialized: {}", clientConfig.getClientName(), this.toString());
  13. }

进一步跟踪updateListOfServers()方法的源码,最终由serverListImpl.getUpdatedListOfServers()获取所有的服务列表的,代码如下:

  1. @VisibleForTesting
  2. public void updateListOfServers() {
  3. List<T> servers = new ArrayList<T>();
  4. if (serverListImpl != null) {
  5. servers = serverListImpl.getUpdatedListOfServers();
  6. LOGGER.debug("List of Servers for {} obtained from Discovery client: {}",
  7. getIdentifier(), servers);
  8. if (filter != null) {
  9. servers = filter.getFilteredListOfServers(servers);
  10. LOGGER.debug("Filtered List of Servers for {} obtained from Discovery client: {}",
  11. getIdentifier(), servers);
  12. }
  13. }
  14. updateAllServerList(servers);
  15. }

而serverListImpl是ServerList接口的具体实现类。跟踪代码,ServerList的实现类为DiscoveryEnabledNIWSServerList,在ribbon-eureka.jar的com.netflix.niws.loadbalancer下。其中DiscoveryEnabledNIWSServerList有 getInitialListOfServers()和getUpdatedListOfServers()方法,具体代码如下:

  1. @Override
  2. public List<DiscoveryEnabledServer> getInitialListOfServers(){
  3. return obtainServersViaDiscovery();
  4. }
  5. @Override
  6. public List<DiscoveryEnabledServer> getUpdatedListOfServers(){
  7. return obtainServersViaDiscovery();
  8. }

继续跟踪源码,obtainServersViaDiscovery(),是根据eurekaClientProvider.get()来回去EurekaClient,再根据EurekaClient来获取注册列表信息,代码如下:

  1. private List<DiscoveryEnabledServer> obtainServersViaDiscovery() {
  2. List<DiscoveryEnabledServer> serverList = new ArrayList<DiscoveryEnabledServer>();
  3. if (eurekaClientProvider == null || eurekaClientProvider.get() == null) {
  4. logger.warn("EurekaClient has not been initialized yet, returning an empty list");
  5. return new ArrayList<DiscoveryEnabledServer>();
  6. }
  7. EurekaClient eurekaClient = eurekaClientProvider.get();
  8. if (vipAddresses!=null){
  9. for (String vipAddress : vipAddresses.split(",")) {
  10. // if targetRegion is null, it will be interpreted as the same region of client
  11. List<InstanceInfo> listOfInstanceInfo = eurekaClient.getInstancesByVipAddress(vipAddress, isSecure, targetRegion);
  12. for (InstanceInfo ii : listOfInstanceInfo) {
  13. if (ii.getStatus().equals(InstanceStatus.UP)) {
  14. if(shouldUseOverridePort){
  15. if(logger.isDebugEnabled()){
  16. logger.debug("Overriding port on client name: " + clientName + " to " + overridePort);
  17. }
  18. // copy is necessary since the InstanceInfo builder just uses the original reference,
  19. // and we don't want to corrupt the global eureka copy of the object which may be
  20. // used by other clients in our system
  21. InstanceInfo copy = new InstanceInfo(ii);
  22. if(isSecure){
  23. ii = new InstanceInfo.Builder(copy).setSecurePort(overridePort).build();
  24. }else{
  25. ii = new InstanceInfo.Builder(copy).setPort(overridePort).build();
  26. }
  27. }
  28. DiscoveryEnabledServer des = new DiscoveryEnabledServer(ii, isSecure, shouldUseIpAddr);
  29. des.setZone(DiscoveryClient.getZone(ii));
  30. serverList.add(des);
  31. }
  32. }
  33. if (serverList.size()>0 && prioritizeVipAddressBasedServers){
  34. break; // if the current vipAddress has servers, we dont use subsequent vipAddress based servers
  35. }
  36. }
  37. }
  38. return serverList;
  39. }

其中eurekaClientProvider的实现类是LegacyEurekaClientProvider,它是一个获取eurekaClient类,通过静态的方法去获取eurekaClient,其代码如下:

  1. class LegacyEurekaClientProvider implements Provider<EurekaClient> {
  2. private volatile EurekaClient eurekaClient;
  3. @Override
  4. public synchronized EurekaClient get() {
  5. if (eurekaClient == null) {
  6. eurekaClient = DiscoveryManager.getInstance().getDiscoveryClient();
  7. }
  8. return eurekaClient;
  9. }
  10. }

EurekaClient的实现类为DiscoveryClient,在之前已经分析了它具有服务注册、获取服务注册列表等的全部功能。

  1. public class DiscoveryClient implements EurekaClient {

由此可见,负载均衡器是从EurekaClient获取服务信息,并根据IRule去路由,并且根据IPing去判断服务的可用性。

那么现在还有个问题,负载均衡器多久一次去获取一次从Eureka Client获取注册信息呢。

获取服务

在BaseLoadBalancer类下,BaseLoadBalancer的构造函数,该构造函数开启了一个PingTask任务,代码如下:

  1. public BaseLoadBalancer(String name, IRule rule, LoadBalancerStats stats,
  2. IPing ping, IPingStrategy pingStrategy) {
  3. ...//代码省略
  4. setupPingTask();
  5. ...//代码省略
  6. }

setupPingTask()的具体代码逻辑,它开启了ShutdownEnabledTimer执行PingTask任务,在默认情况下pingIntervalSeconds为10,即每10秒钟,想EurekaClient发送一次”ping”。

  1. void setupPingTask() {
  2. if (canSkipPing()) {
  3. return;
  4. }
  5. if (lbTimer != null) {
  6. lbTimer.cancel();
  7. }
  8. lbTimer = new ShutdownEnabledTimer("NFLoadBalancer-PingTimer-" + name,
  9. true);
  10. lbTimer.schedule(new PingTask(), 0, pingIntervalSeconds * 1000);
  11. forceQuickPing();
  12. }

PingTask源码,即new一个Pinger对象,并执行runPinger()方法。

  1. class PingTask extends TimerTask {
  2. public void run() {
  3. try {
  4. new Pinger(pingStrategy).runPinger();
  5. } catch (Exception e) {
  6. logger.error("LoadBalancer [{}]: Error pinging", name, e);
  7. }
  8. }
  9. }

查看Pinger的runPinger()方法,最终根据 pingerStrategy.pingServers(ping, allServers)来获取服务的可用性,如果该返回结果,如之前相同,则不去向EurekaClient获取注册列表,如果不同则通知ServerStatusChangeListener或者changeListeners发生了改变,进行更新或者重新拉取。

  1. public void runPinger() throws Exception {
  2. if (!pingInProgress.compareAndSet(false, true)) {
  3. return; // Ping in progress - nothing to do
  4. }
  5. // we are "in" - we get to Ping
  6. Server[] allServers = null;
  7. boolean[] results = null;
  8. Lock allLock = null;
  9. Lock upLock = null;
  10. try {
  11. /*
  12. * The readLock should be free unless an addServer operation is
  13. * going on...
  14. */
  15. allLock = allServerLock.readLock();
  16. allLock.lock();
  17. allServers = allServerList.toArray(new Server[allServerList.size()]);
  18. allLock.unlock();
  19. int numCandidates = allServers.length;
  20. results = pingerStrategy.pingServers(ping, allServers);
  21. final List<Server> newUpList = new ArrayList<Server>();
  22. final List<Server> changedServers = new ArrayList<Server>();
  23. for (int i = 0; i < numCandidates; i++) {
  24. boolean isAlive = results[i];
  25. Server svr = allServers[i];
  26. boolean oldIsAlive = svr.isAlive();
  27. svr.setAlive(isAlive);
  28. if (oldIsAlive != isAlive) {
  29. changedServers.add(svr);
  30. logger.debug("LoadBalancer [{}]: Server [{}] status changed to {}",
  31. name, svr.getId(), (isAlive ? "ALIVE" : "DEAD"));
  32. }
  33. if (isAlive) {
  34. newUpList.add(svr);
  35. }
  36. }
  37. upLock = upServerLock.writeLock();
  38. upLock.lock();
  39. upServerList = newUpList;
  40. upLock.unlock();
  41. notifyServerStatusChangeListener(changedServers);
  42. } finally {
  43. pingInProgress.set(false);
  44. }
  45. }

由此可见,LoadBalancerClient是在初始化的时候,会向Eureka回去服务注册列表,并且向通过10s一次向EurekaClient发送“ping”,来判断服务的可用性,如果服务的可用性发生了改变或者服务数量和之前的不一致,则更新或者重新拉取。LoadBalancerClient有了这些服务注册列表,就可以根据具体的IRule来进行负载均衡。

RestTemplate是如何和Ribbon结合的

最后,回答问题的本质,为什么在RestTemplate加一个@LoadBalance注解就可可以开启负载均衡呢?

  1. @LoadBalanced
  2. RestTemplate restTemplate() {
  3. return new RestTemplate();
  4. }

搜索@LoadBalanced有哪些类用到了LoadBalanced有哪些类用到了, 发现LoadBalancerAutoConfiguration类,即LoadBalancer自动配置类。

  1. @Configuration
  2. @ConditionalOnClass(RestTemplate.class)
  3. @ConditionalOnBean(LoadBalancerClient.class)
  4. @EnableConfigurationProperties(LoadBalancerRetryProperties.class)
  5. public class LoadBalancerAutoConfiguration {
  6. @LoadBalanced
  7. @Autowired(required = false)
  8. private List<RestTemplate> restTemplates = Collections.emptyList();
  9. }
  10. @Bean
  11. public SmartInitializingSingleton loadBalancedRestTemplateInitializer(
  12. final List<RestTemplateCustomizer> customizers) {
  13. return new SmartInitializingSingleton() {
  14. @Override
  15. public void afterSingletonsInstantiated() {
  16. for (RestTemplate restTemplate : LoadBalancerAutoConfiguration.this.restTemplates) {
  17. for (RestTemplateCustomizer customizer : customizers) {
  18. customizer.customize(restTemplate);
  19. }
  20. }
  21. }
  22. };
  23. }
  24. @Configuration
  25. @ConditionalOnMissingClass("org.springframework.retry.support.RetryTemplate")
  26. static class LoadBalancerInterceptorConfig {
  27. @Bean
  28. public LoadBalancerInterceptor ribbonInterceptor(
  29. LoadBalancerClient loadBalancerClient,
  30. LoadBalancerRequestFactory requestFactory) {
  31. return new LoadBalancerInterceptor(loadBalancerClient, requestFactory);
  32. }
  33. @Bean
  34. @ConditionalOnMissingBean
  35. public RestTemplateCustomizer restTemplateCustomizer(
  36. final LoadBalancerInterceptor loadBalancerInterceptor) {
  37. return new RestTemplateCustomizer() {
  38. @Override
  39. public void customize(RestTemplate restTemplate) {
  40. List<ClientHttpRequestInterceptor> list = new ArrayList<>(
  41. restTemplate.getInterceptors());
  42. list.add(loadBalancerInterceptor);
  43. restTemplate.setInterceptors(list);
  44. }
  45. };
  46. }
  47. }
  48. }

在该类中,首先维护了一个被@LoadBalanced修饰的RestTemplate对象的List,在初始化的过程中,通过调用customizer.customize(restTemplate)方法来给RestTemplate增加拦截器LoadBalancerInterceptor。

而LoadBalancerInterceptor,用于实时拦截,在LoadBalancerInterceptor这里实现来负载均衡。LoadBalancerInterceptor的拦截方法如下:

  1. @Override
  2. public ClientHttpResponse intercept(final HttpRequest request, final byte[] body,
  3. final ClientHttpRequestExecution execution) throws IOException {
  4. final URI originalUri = request.getURI();
  5. String serviceName = originalUri.getHost();
  6. Assert.state(serviceName != null, "Request URI does not contain a valid hostname: " + originalUri);
  7. return this.loadBalancer.execute(serviceName, requestFactory.createRequest(request, body, execution));
  8. }

总结

综上所述,Ribbon的负载均衡,主要通过LoadBalancerClient来实现的,而LoadBalancerClient具体交给了ILoadBalancer来处理,ILoadBalancer通过配置IRule、IPing等信息,并向EurekaClient获取注册列表的信息,并默认10秒一次向EurekaClient发送“ping”,进而检查是否更新服务列表,最后,得到注册列表后,ILoadBalancer根据IRule的策略进行负载均衡。

而RestTemplate 被@LoadBalance注解后,能过用负载均衡,主要是维护了一个被@LoadBalance注解的RestTemplate列表,并给列表中的RestTemplate添加拦截器,进而交给负载均衡器去处理。

发表评论

表情:
评论列表 (有 0 条评论,222人围观)

还没有评论,来说两句吧...

相关阅读

    相关 Ribbon分析

    Ribbon是Netflix公司开源的一个客户端负载均衡的项目,一般配合Eureka使用。不过为了降低其他干扰因素,专注于Ribbon,这一次我们脱离Eureka讲Ribbon

    相关 Ribbon解析

      SpringCloud中的Ribbon开源项目,提供了客户端的负载均衡算法。这篇文章,我们来介绍下他是如何实现的。为了方便理解,我们以客户端调用的流程来