3.SpringCloud -- 服务调用、负载均衡 Ribbon、OpenFeign

深藏阁楼爱情的钟 2022-09-11 00:22 340阅读 0赞

3.SpringCloud — 服务调用、负载均衡 Ribbon、OpenFeign

  • 一、引入 服务调用、负载均衡
    • 1.1 问题 与 解决
  • 二、服务调用、负载均衡 — Ribbon
    • 2.1 什么是 Ribbon?
    • 2.2 Ribbon 与 Nginx 负载均衡区别
    • 2.3 更换 Ribbon 负载均衡规则(两种方式)
    • 2.4 轮询原理(RoundRobinRule)
      • 5、手写一个轮询算法
      • (1)说明
      • (2)相关代码
  • 三、补充知识
    • 3.1 CAS
      • (1)什么是 CAS?
      • (2)原子性
    • 3.2 Atomic 类底层原理
      • (1)Atomic 常用类有哪些?
      • (2)底层原理
      • (3)以 AtomicInteger 为例。
      • (4)缺点:
      • (5)ABA 再现
      • (6)ABA 解决
    • 3.3 自旋锁(SpinLock)
      • (1)什么是自旋锁?
      • (2)手写一个自旋锁
  • 四、服务调用、负载均衡 — Feign、OpenFeign
    • 4.1 什么是 Feign、 OpenFeign ?
    • 4.2 Feign 集成了 Ribbon
    • 4.3 使用 OpenFeign
      • (1)说明
      • (2)创建项目 eureka_client_consumer_9006
      • (3)编写接口,绑定服务。
      • (4)编写 controller
      • (5)启动服务,并测试。
      • (6)启动失败时的错误
      • (7)自定义负载均衡策略
    • 4.4 Feign 超时控制 以及 日志打印
      • (1)超时控制
      • (2)日志打印
  • SpringCloud (一)— 从单体架构到微服务架构、代码拆分(maven 聚合)

  • SpringCloud (二)— 服务注册中心 Eureka、Zookeeper、Consul、Nacos
  • SpringCloud (三)— 服务调用、负载均衡 Ribbon、OpenFeign
  • SpringCloud (四)— 服务降级、熔断 Hystrix、Sentinel
  • SpringCloud (五)— 配置中心 Config、消息总线 Bus、链路追踪 Sleuth、配置中心 Nacos
  • SpringCloud (六)— 注册中心与配置中心 Nacos、网关 Gateway

一、引入 服务调用、负载均衡

1.1 问题 与 解决

  1. 【问题:】
  2. 在上一篇中,介绍了 EurekaZookeeperConsul 作为注册中心,并使用 RestTemplate 进行服务调用。 详见:https://www.cnblogs.com/l-y-h/p/14193443.html
  3. 那么是如何进行负载均衡的呢?
  4. 【解决:】
  5. @Bean 声明 RestTemplate 时,添加一个 @LoadBalanced,并使用 注册中心中 的服务名 作为 RestTemplate URL 地址(ip、端口号)。
  6. 就这么简单的两步,即可实现了负载均衡。
  7. 那么这里面又涉及到什么技术知识呢?能不能更换负载均衡策略?能不能自定义负载均衡策略?
  8. 常用技术:
  9. Ribbon(维护状态,替代产品为 Loadbalancer
  10. OpenFeign(推荐使用)
  11. 【说明:】
  12. 此处以 Eureka 伪集群版创建的几个模块作为演示。代码地址:https://github.com/lyh-man/SpringCloudDemo
  13. 服务注册中心:eureka_server_7001eureka_server_7002eureka_server_7003
  14. 服务提供者:eureka_client_producer_8002eureka_client_producer_8003eureka_client_producer_8004
  15. 服务消费者:eureka_client_consumer_9002
  16. 注:
  17. 主要还是在 服务消费者 上配置负载均衡策略(可以 Debug 模式启动看看执行流程),其他模块直接启动即可。

img

二、服务调用、负载均衡 – Ribbon

2.1 什么是 Ribbon?

  1. Ribbon:】
  2. Ribbon Netflix 公司实现的一套基于 HTTPTCP 的客户端负载均衡的工具。
  3. SpringCloud 已将其集成到 spring-cloud-netflix 中,实现 SpringCloud 的服务调用、负载均衡。
  4. Ribbon 提供了多种方式进行负载均衡(默认轮询),也可以自定义负载均衡方法。
  5. 注:
  6. Ribbon 虽然已进入维护模式,但是一时半会还不容易被完全淘汰,还是可以学习一下基本使用的。
  7. Ribbon 替代产品是 Loadbalancer
  8. 【相关网址:】
  9. https://github.com/Netflix/ribbon
  10. http://jvm123.com/doc/springcloud/index.html#spring-cloud-ribbon

img

2.2 Ribbon 与 Nginx 负载均衡区别

  1. 【负载均衡(Load Balance):】
  2. 负载均衡指的是 将工作任务 按照某种规则 平均分摊到 多个操作单元上执行。
  3. 注:
  4. Web 项目的负载均衡,可以理解为:将用户请求 平均分摊到 多个服务器上处理,从而提高系统的并发度、可用性。
  5. 【负载均衡分类:】
  6. 按照软硬件划分:
  7. 硬件负载均衡: 一般造价昂贵,但数据传输更加稳定。比如: F5 负载均衡。
  8. 软件负载均衡: 一般采用某个代理组件,并使用 某种 负载均衡 算法实现(一种消息队列分发机制)。比如:NginxRibbon
  9. 按照负载均衡位置划分:
  10. 集中式负载均衡:提供一个 独立的 负载均衡系统(可以是软件,比如:Nginx,可以是硬件,比如:F5)。
  11. 通过此系统,将服务消费者的 请求 通过某种负载均衡策略 转发给 服务提供者。
  12. 客户端负载均衡(进程式负载均衡):将负载均衡逻辑整合到 服务消费者中,服务消费者 定时同步获取到 服务提供者信息,并保存在本地。
  13. 每次均从本地缓存中取得 服务提供者信息,并根据 某种负载均衡策略 将请求发给 服务提供者。
  14. 注:
  15. 使用集中式负载均衡时,服务消费者 不知道 任何一个服务提供者的信息,只知道独立负载均衡设备的信息。
  16. 使用客户端负载均衡时,服务消费者 知道 所有服务提供者的信息。
  17. Nginx 负载均衡:】
  18. Nginx 实现的是 集中式负载均衡,Nginx 接收 客户端所有请求,并将请求转发到不同的服务器进行处理。
  19. Ribbon 负载均衡:】
  20. Ribbon 实现的是 客户端负载均衡,从注册中心获得服务信息并缓存在本地,在本地进行 负载均衡。

2.3 更换 Ribbon 负载均衡规则(两种方式)

(1)引入依赖

  1. 【依赖:】
  2. <dependency>
  3. <groupId>org.springframework.cloud</groupId>
  4. <artifactId>spring-cloud-starter-netflix-ribbon</artifactId>
  5. </dependency>

一般使用 Ribbon 时需要引入上述依赖,但是对于 eureka 来说,其 eureka-client 依赖中已经集成了 ribbon 依赖,所以无需再次引入。

img

(2)Ribbon 提供的几种负载均衡算法
  Ribbon 提供了 IRule 接口,通过其可以设置并更换负载均衡规则。
  IRule 实质就是 根据某种负载均衡规则,从服务列表中选取一个需要访问的服务。
  一般默认使用 ZoneAvoidanceRule + RoundRobinRule。

  1. IRule 子类如下:】
  2. RoundRobinRule
  3. 轮询,按照服务列表顺序 循环选择服务。
  4. RandomRule
  5. 随机,随机的从服务列表中选取服务。
  6. RetryRule
  7. 重试,先按照轮询策略获取服务,若获取失败,则在指定时间进行重试,重新获取可用服务。
  8. WeightedResponseTimeRule
  9. 加权响应时间,响应时间越低(即响应时间快),权重越高,越容易被选择。刚开始启动时,使用轮询策略。
  10. BestAvailableRule
  11. 高可用,先过滤掉不可用服务(多次访问故障而处于断路器跳闸的服务),选择一个并发量最小的服务。
  12. AvailabilityFilteringRule
  13. 可用筛选,先过滤掉不可用服务 以及 并发量超过阈值的服务,对剩余服务按轮询策略访问。
  14. ZoneAvoidanceRule
  15. 区域回避,默认规则,综合判断服务所在区域的性能 以及 服务的可用性,过滤结果后采用轮询的方式选择结果。
  16. IRule:】
  17. package com.netflix.loadbalancer;
  18. public interface IRule {
  19. Server choose(Object var1);
  20. void setLoadBalancer(ILoadBalancer var1);
  21. ILoadBalancer getLoadBalancer();
  22. }

img

以 Dubug 模式 启动 eureka_client_consumer_9002,并在 IRule 接口 实现类的 choose() 方法上打上断点,发送请求时,将会进入断点,此时可以看到执行的 负载均衡规则。

img

不停的刷新页面,可以看到请求以轮询的方式被 服务提供者 处理。

img

(3)替换负载均衡规则(方式一:新建配置类)

  1. 【步骤一:】
  2. 新建一个配置类(该类不能被 @ComponentScan 扫描到,即不能与 启动类 在同一个包下),并定义规则。
  3. 比如:
  4. package com.lyh.springcloud.customize;
  5. import com.netflix.loadbalancer.IRule;
  6. import com.netflix.loadbalancer.RandomRule;
  7. import org.springframework.context.annotation.Bean;
  8. import org.springframework.context.annotation.Configuration;
  9. @Configuration
  10. public class CustomizeLoadBalanceRule {
  11. @Bean
  12. public IRule customizeRule() {
  13. return new RandomRule();
  14. }
  15. }
  16. 【步骤二:】
  17. 在启动类上添加 @RibbonClient 注解,并指定服务名 以及 规则。
  18. 比如:
  19. @RibbonClient(name = "EUREKA-CLIENT-PRODUCER", configuration = CustomizeLoadBalanceRule.class)

img

img

img

不停的刷新页面,可以看到请求以随机的方式被 服务提供者 处理,而非轮询。

img

(4)替换负载均衡规则(方式二:修改配置文件)
  在服务消费者 配置文件中 根据 服务提供者 服务名,
  通过 ribbon.NFLoadBalancerRuleClassName 指定负载均衡策略。
注:
  在后面的 OpenFeign 的使用中进行演示。

  1. 【举例:】
  2. EUREKA-CLIENT-PRODUCER: # 服务提供者的服务名
  3. ribbon:
  4. NFLoadBalancerRuleClassName: com.netflix.loadbalancer.RandomRule

2.4 轮询原理(RoundRobinRule)

(1)相关源码
  主要就是 choose()、incrementAndGetModulo() 这两个方法。
  choose() 用于选择 server 服务。
  incrementAndGetModulo() 用来决定 选择哪个服务,返回服务下标。

  1. public Server choose(ILoadBalancer lb, Object key) {
  2. if (lb == null) {
  3. log.warn("no load balancer");
  4. return null;
  5. }
  6. Server server = null;
  7. int count = 0;
  8. while (server == null && count++ < 10) {
  9. List<Server> reachableServers = lb.getReachableServers();
  10. List<Server> allServers = lb.getAllServers();
  11. int upCount = reachableServers.size();
  12. int serverCount = allServers.size();
  13. if ((upCount == 0) || (serverCount == 0)) {
  14. log.warn("No up servers available from load balancer: " + lb);
  15. return null;
  16. }
  17. int nextServerIndex = incrementAndGetModulo(serverCount);
  18. server = allServers.get(nextServerIndex);
  19. if (server == null) {
  20. /* Transient. */
  21. Thread.yield();
  22. continue;
  23. }
  24. if (server.isAlive() && (server.isReadyToServe())) {
  25. return (server);
  26. }
  27. // Next.
  28. server = null;
  29. }
  30. if (count >= 10) {
  31. log.warn("No available alive servers after 10 tries from load balancer: "
  32. + lb);
  33. }
  34. return server;
  35. }
  36. private AtomicInteger nextServerCyclicCounter;
  37. private int incrementAndGetModulo(int modulo) {
  38. for (;;) {
  39. int current = nextServerCyclicCounter.get();
  40. int next = (current + 1) % modulo;
  41. if (nextServerCyclicCounter.compareAndSet(current, next))
  42. return next;
  43. }
  44. }

(2)代码分析

  1. choose():】
  2. 初始进入 choose() 方法,server null 表示服务不存在,count 0 表示属于尝试第一次获取服务。
  3. 进入 while 循环后,退出条件为 server 不为 null(即找到服务) 或者 count 大于等于 10 (即尝试了 10 次仍未找到服务)。
  4. 获取 server 的核心在于获取 服务的下标,即 int nextServerIndex = incrementAndGetModulo(serverCount);
  5. incrementAndGetModulo()】
  6. 核心就是 自旋 CAS 并取模。
  7. modulo 表示服务器总数,current 表示当前服务下标,next 表示下一个服务下标。
  8. compareAndSet() CAS 实现,如果 内存中的值 current 相同,那么将内存中值改为 next,并返回 true,否则返回 false
  9. compareAndSet 失败后,会不停的执行循环 以获取 最新的 current
  10. 注:
  11. 自旋、CAS 后面会讲到。CAS 保证原子性。
  12. 其实就是一个公式: 第几次请求 % 服务器总数量 = 实际调用服务器下标位置

5、手写一个轮询算法

(1)说明

  1. 【说明:】
  2. 创建一个与 eureka_client_consumer_9002 模块类似的模块 eureka_client_consumer_9005
  3. 修改配置文件,并去除 @LoadBalanced 注解(避免引起误解)。
  4. 自己实现一个轮询算法(与 RoundRobinRule 类似)。
  5. 注:
  6. 此处在 controller 中定义一个接口,用于测试 轮询的功能(仅供参考,可以继承 AbstractLoadBalancerRule,自行构造一个负载均衡类)。
  7. 去除 @LoadBalanced 注解后,访问调用 RestTemplate 请求的接口时会报错(用于区分)。

(2)相关代码

模块创建此处省略(需要修改 pom.xml,配置文件)。
  详情请见上篇博客:https://www.cnblogs.com/l-y-h/p/14193443.html\#\_label2\_3

面向接口编程,此处新建一个 LoadBalacner 接口,用于定义抽象方法(返回服务信息)。
  并定义一个 LoadBalacner 接口的实现类 LoadBalancerImpl。
  在 controller 中编写接口(服务发现),测试一下。

  1. LoadBalacner
  2. package com.lyh.springcloud.eureka_client_consumer_9005.consumizeLoadBalance;
  3. import org.springframework.cloud.client.ServiceInstance;
  4. import java.util.List;
  5. public interface LoadBalacner {
  6. /** * 从服务实例列表中获取出 服务实例 */
  7. ServiceInstance getInstances(List<ServiceInstance> serviceInstances);
  8. }
  9. LoadBalancerImpl
  10. package com.lyh.springcloud.eureka_client_consumer_9005.consumizeLoadBalance.impl;
  11. import com.lyh.springcloud.eureka_client_consumer_9005.consumizeLoadBalance.LoadBalacner;
  12. import org.springframework.cloud.client.ServiceInstance;
  13. import org.springframework.stereotype.Component;
  14. import java.util.List;
  15. import java.util.concurrent.atomic.AtomicInteger;
  16. @Component
  17. public class LoadBalancerImpl implements LoadBalacner {
  18. private AtomicInteger atomicInteger = new AtomicInteger();
  19. @Override
  20. public ServiceInstance getInstances(List<ServiceInstance> serviceInstances) {
  21. if (serviceInstances == null || serviceInstances.size() == 0) {
  22. return null;
  23. }
  24. return serviceInstances.get(incrementAndGetModulo(serviceInstances.size()));
  25. }
  26. public int incrementAndGetModulo(int count) {
  27. int current = 0;
  28. int next = 0;
  29. do {
  30. current = atomicInteger.get();
  31. next = (current >= Integer.MAX_VALUE ? 0 : current + 1) % count;
  32. } while (!atomicInteger.compareAndSet(current, next));
  33. return next;
  34. }
  35. }
  36. ConsumerController
  37. package com.lyh.springcloud.eureka_client_consumer_9005.controller;
  38. import com.lyh.springcloud.common.tools.Result;
  39. import com.lyh.springcloud.eureka_client_consumer_9005.consumizeLoadBalance.LoadBalacner;
  40. import com.lyh.springcloud.eureka_client_consumer_9005.entity.User;
  41. import org.springframework.beans.factory.annotation.Autowired;
  42. import org.springframework.cloud.client.ServiceInstance;
  43. import org.springframework.cloud.client.discovery.DiscoveryClient;
  44. import org.springframework.web.bind.annotation.*;
  45. import org.springframework.web.client.RestTemplate;
  46. @RestController
  47. @RequestMapping("/consumer/user")
  48. public class ConsumerController {
  49. // 注意,此处 url 写死的,仅用于演示,实际项目中不能这么干。
  50. // public static final String PRODUCER_URL = "http://localhost:8001/producer/";
  51. // 通过服务名 找到 Eureka 注册中心真实访问的 地址
  52. public static final String PRODUCER_URL = "http://EUREKA-CLIENT-PRODUCER";
  53. @Autowired
  54. private RestTemplate restTemplate;
  55. @Autowired
  56. private DiscoveryClient discoveryClient;
  57. @Autowired
  58. private LoadBalacner loadBalancer;
  59. @GetMapping("/loadBalance")
  60. public Result testLoadBalance() {
  61. ServiceInstance serviceInstance = loadBalancer.getInstances(discoveryClient.getInstances("EUREKA-CLIENT-PRODUCER"));
  62. if (serviceInstance == null) {
  63. return Result.error().message("服务不存在");
  64. }
  65. return Result.ok().data("HostAndPort", serviceInstance.getHost() + ":" + serviceInstance.getPort());
  66. }
  67. @GetMapping("/get/{id}")
  68. public Result getUser(@PathVariable Integer id) {
  69. return restTemplate.getForObject(PRODUCER_URL + "/producer/user/get/" + id, Result.class);
  70. }
  71. @PostMapping("/create")
  72. public Result createUser(@RequestBody User user) {
  73. return restTemplate.postForObject(PRODUCER_URL + "/producer/user/create", user, Result.class);
  74. }
  75. }

img

img

img

三、补充知识

3.1 CAS

(1)什么是 CAS?

  1. CAS:】
  2. CAS Compare And Swap 的缩写,即 比较交换。
  3. 是一种无锁算法,在不加锁的情况下实现多线程之间变量同步,从而保证数据的原子性。
  4. 属于硬件层面对并发操作的支持(CPU 原语)。
  5. 注:
  6. 原子性:一个操作或多个操作要么全部执行且执行过程中不会被其他因素打断,要么全部不执行。
  7. 原语:指的是若干条指令组成的程序段,实现特定的功能,执行过程中不能被中断(也即原子性)。
  8. 【基本流程:】
  9. CAS 操作包含三个操作数 —— 内存值(V)、预期原值(A)和新值(B)。
  10. 如果内存里面的值 V A 的值是一样的,那么就将内存里面的值更新成 B
  11. V A 不一致,则不操作(某些情况下,可以通过自旋操作,不断尝试修改数据直至成功修改)。
  12. V = V == A ? B : V;
  13. 或者
  14. for(;;) {
  15. V = getV();
  16. if (V == A) {
  17. V = B;
  18. break;
  19. }
  20. }
  21. 【缺点:(详见下面的 Atomic 类底层原理)】
  22. 会出现 ABA 问题(两次读取数据时值相同,但不确定值是否被修改过)。
  23. 使用自旋(死循环)CAS 时会占用系统资源、影响执行效率。
  24. 每次只能对一个共享变量进行原子操作。

(2)原子性

  1. 【说明:】
  2. 初始 i = 0,现有 10 个线程,分别执行 i++ 10000次,若不对 i++ 做任何限制,那么最终执行结果一般都是小于 100000 的。
  3. 因为 AB 执行 i++ 操作时,彼此会相互干扰,也即不能保证原子性。
  4. 【如何保证原子性:】
  5. 可以给 i++ 操作加上 synchronized 进行同步控制,从而保证操作按照顺序执行、互不干扰。
  6. 也可以使用 Atomic 相关类进行操作(核心是 自旋 CAS 操作 volatile 变量)。
  7. 注:
  8. synchronized JDK1.6 之前,属于重量级锁,属于悲观锁的一种(在操作锁变量前就给对象加锁,而不管对象是否发生资源竞争),性能较差。
  9. JDK1.6 之后,对 synchronized 进行了优化,引入了 偏向锁、轻量级锁、采用 CAS 思想,提升了效率。
  10. CAS synchronized 比较:】
  11. CAS
  12. CAS 属于无锁算法,可以支持多个线程并发修改,并发度高。
  13. CAS 每次只支持一个共享变量进行原子操作。
  14. CAS 会出现 ABA 问题。
  15. synchronized
  16. synchronized 一次只能允许一个线程修改,并发度低。
  17. synchronized 可以对多个共享变量进行原子操作。

【举例:】

  1. package com.lyh.tree;
  2. import java.util.concurrent.atomic.AtomicInteger;
  3. public class Test {
  4. private int count = 0;
  5. private int count2 = 0;
  6. private AtomicInteger count3 = new AtomicInteger(0);
  7. /** * 普通方法 */
  8. public void increment() {
  9. count++;
  10. }
  11. /** * 使用 synchronized 修饰的方法 */
  12. public synchronized void increment2() {
  13. count2++;
  14. }
  15. /** * 使用 atomic 类的方法 */
  16. public void increment3() {
  17. count3.getAndIncrement();
  18. }
  19. public static void main(String[] args) {
  20. // 实例化一个对象
  21. Test test = new Test();
  22. // 创建 10 个线程
  23. for (int i = 0; i < 10; i++) {
  24. // 每个线程内部均 执行 10000 次 三种 i++ 操作
  25. new Thread(() -> {
  26. for (int j = 0; j < 10000; j++) {
  27. // 普通方法 i++
  28. test.increment();
  29. // 添加 synchronized 关键字后的 i++
  30. test.increment2();
  31. // 使用 atomic 类的 i++
  32. test.increment3();
  33. }
  34. }, "thread-" + "i").start();
  35. }
  36. // 等待 1 秒,确保上面线程可以执行完毕
  37. try {
  38. Thread.sleep(1000);
  39. } catch (InterruptedException e) {
  40. e.printStackTrace();
  41. }
  42. // 输出三种 i++ 的最终结果
  43. System.out.println("普通方法 i++ 操作后最终值 i = " + test.count);
  44. System.out.println("添加 synchronized 关键字后的 i++ 操作后最终值 i = " + test.count2);
  45. System.out.println("使用 atomic 类的 i++ 操作后最终值 i = " + test.count3);
  46. }
  47. }

img

3.2 Atomic 类底层原理

(1)Atomic 常用类有哪些?

  1. Atomic:】
  2. Atomic 类存放于 java.util.concurrent.atomic 包下,用于提供对变量的原子操作(保证变量操作的原子性)。
  3. 【常用类:】
  4. 操作基本类型的 Atomic 类(提供了对 booleanintlong 类型的原子操作):
  5. AtomicBoolean
  6. AtomicInteger
  7. AtomicLong
  8. 操作引用类型的 Atomic 类(提供了引用类型的原子操作):
  9. AtomicReference
  10. AtomicStampedReference
  11. AtomicMarkableReference
  12. 注:
  13. AtomicStampedReferenceAtomicMarkableReference 以版本号、标记的方式解决 ABA 问题。
  14. 操作数组的 Atomic 类(提供了数组的原子操作):
  15. AtomicIntegerArray
  16. AtomicLongArray
  17. AtomicReferenceArray

img

(2)底层原理

  1. 【底层原理:】
  2. 一句话概括:Atomic 类基于 Unsafe 以及 (自旋) CAS 操作 volatile 变量实现的。
  3. 核心点:
  4. Unsafe 提供 native 方法,用于操作内存
  5. valueOffset 变量 指的是变量在内存中的地址。
  6. volatile 变量 指的是共享变量(保证操作可见性)。
  7. CAS CPU 原语(保证操作原子性)。
  8. Unsafe:】
  9. Unsafe 存放于 JDK 源码 rt.jar sun.misc 包下。
  10. 内部提供了一系列 native 方法,用于与操作系统交互(可以操作特定内存中的数据)。
  11. 注:
  12. Unsafe 属于 CAS 核心类,Java 无法直接访问底层操作系统,需要通过 native 方法进行操作,而 Unsafe 就是为此存在的。
  13. volatile 变量:】
  14. 使用 volatile 修改变量,保证数据在多线程之间的可见性(一个线程修改数据后,其余线程均能知道修改后的数据)。
  15. valueOffset 变量:】
  16. valueOffset 变量 表示共享变量在内存中的偏移地址,Unsafe 根据此地址获取 共享变量在内存中的值。

img

(3)以 AtomicInteger 为例。

compareAndSet() 直接调用 CAS 进行比较。
  getAndIncrement() 使用 自旋 CAS 进行比较。

  1. AtomicInteger 类:】
  2. /** * 直接调用 Unsafe 的 CAS 操作。 * 根据 this 以及 valueOffset 获取到内存中的值 V,expect 为期望值 A,如果 V == A,则将 V 值改为 update,否则不操作。 * * this 表示当前对象 * valueOffset 表示共享变量在内存的偏移地址 * expect 表示期望值 * update 表示更新值 */
  3. public final boolean compareAndSet(int expect, int update) {
  4. return unsafe.compareAndSwapInt(this, valueOffset, expect, update);
  5. }
  6. /** * 以原子方式递增当前 value。 * 调用 Unsafe 的 getAndAddInt() 进行操作,即 自旋 CAS 操作。 * * this 表示当前对象 * valueOffset 表示共享变量在内存的偏移地址 * 1 表示每次增加值为 1 */
  7. public final int getAndIncrement() {
  8. return unsafe.getAndAddInt(this, valueOffset, 1);
  9. }
  10. Unsafe 类:】
  11. /** * CAS 原语操作。 * 各变量含义参考上面 AtomicInteger compareAndSet() 的注释。 */
  12. public final native boolean compareAndSwapInt(Object var1, long var2, int var4, int var5);
  13. /** * 自旋 CAS 操作。 * var1 表示当前对象。 * var2 表示共享变量在内存的偏移地址 * var4 表示共享变量每次递增的值 */
  14. public final int getAndAddInt(Object var1, long var2, int var4) {
  15. int var5;
  16. do {
  17. // 先根据偏移地址,获取到 内存中存储的值
  18. var5 = this.getIntVolatile(var1, var2);
  19. // 然后死循环(自旋)CAS 判断。
  20. // 根据偏移地址再去取一次内存中的值 与 已经取得的值进行比较,相同则加上需要增加的值。
  21. // 不同,则 CAS 失败,也即 while 条件为 true,再进行下一次比较。
  22. } while(!this.compareAndSwapInt(var1, var2, var5, var5 + var4));
  23. return var5;
  24. }

(4)缺点:

  1. CAS 缺点:】
  2. 会出现 ABA 问题(两次读取数据时值相同,但不确定值是否被修改过)。
  3. 自旋(死循环)CAS 会占用系统资源、影响执行效率。
  4. 每次只能对一个共享变量进行原子操作。
  5. 【对于自旋 CAS:】
  6. 在上面 Unsafe 类的 getAndAddInt() 方法中,可以看到一个 do-while 循环。
  7. 存在这么一种情况:while 条件中 CAS 一直失败,也即循环一直持续(死循环),
  8. 此时将会占用 CPU 资源不断执行循环,影响执行效率。
  9. 【对于 ABA 问题:】
  10. CAS 算法是从内存中取出某时刻的值并与当前期望值进行比较,从内存中取出的值就可能存在问题。
  11. 存在这么一种情况:线程 A、线程 B 同时操作内存值 V,线程 A 由于某种原因停顿了一下,而线程 B 先将值 V 改为 K,然后又将 K 改为 V
  12. 此时 A 再次获取的值仍是 V,然后 A 进行 CAS 比较成功。
  13. 虽然 A 两次获取的值是同一个值,但是这个值中间是发生过变化的,也即此时 A 执行 CAS 不应该成功,这就是 ABA 问题。
  14. 为了解决ABA问题,可以在对象中额外再增加一个标记来标识对象是否有过变更,当且仅当 标记 预期标记位 也相同时,CAS 才可以执行成功。
  15. 比如:
  16. AtomicStampedReference 或者 AtomicMarkableReference 类。

(5)ABA 再现

使用 AtomicReference 再现 ABA 问题。

  1. 【问题再现:】
  2. package com.lyh.tree;
  3. import java.util.concurrent.atomic.AtomicReference;
  4. public class Test {
  5. public static void main(String[] args) {
  6. // 使用引用类型原子类(Integer 在缓存中 -128 ~ 127 表示的是同一个值,超出此范围,则会自动创建一个新的 Integer,即地址不同)
  7. AtomicReference<Integer> atomicInteger = new AtomicReference<>(100);
  8. System.out.println("原值为: " + atomicInteger.get());
  9. // 创建线程 A,执行两次值修改操作
  10. new Thread(() -> {
  11. // 第一次从 100 改为 127
  12. atomicInteger.compareAndSet(100, 127);
  13. System.out.println("第一次修改后,值为: " + atomicInteger.get());
  14. // 第二次从 127 改为 100
  15. atomicInteger.compareAndSet(127, 100);
  16. System.out.println("第二次修改后,值为: " + atomicInteger.get());
  17. }, "thread-A").start();
  18. // 创建线程 B,等待 1 秒,使线程 A 执行完毕,然后再执行值修改操作
  19. new Thread(() -> {
  20. try {
  21. Thread.sleep(1000);
  22. } catch (InterruptedException e) {
  23. e.printStackTrace();
  24. }
  25. // 第三次从 100 改为 128
  26. atomicInteger.compareAndSet(100, 128);
  27. System.out.println("第三次修改后,值为: " + atomicInteger.get());
  28. // 第四次,由于 128 超过 Integer 缓存范围,会自动创建一个新的 Integer,此时期望值 与 内存中 值地址不同,也即 CAS 失败
  29. atomicInteger.compareAndSet(128, -128);
  30. System.out.println("最终值为: " + atomicInteger.get());
  31. }, "thread-B").start();
  32. }
  33. }
  34. 【结果:】
  35. 原值为: 100
  36. 第一次修改后,值为: 127
  37. 第二次修改后,值为: 100
  38. 第三次修改后,值为: 128
  39. 最终值为: 128

img

(6)ABA 解决

使用 AtomicStampedReference 解决 ABA 问题。
  新增了标记位,用于判断当前值是否发生过变化。

  1. package com.lyh.tree;
  2. import java.util.concurrent.atomic.AtomicStampedReference;
  3. public class Test2 {
  4. public static void main(String[] args) {
  5. AtomicStampedReference<Integer> atomicInteger = new AtomicStampedReference<>(100, 1);
  6. System.out.println("原值为: " + atomicInteger.getReference() + " , 标记为: " + atomicInteger.getStamp());
  7. int stamp = atomicInteger.getStamp();
  8. new Thread(() -> {
  9. atomicInteger.compareAndSet(100, 127, stamp, stamp + 1);
  10. System.out.println("第一次修改后,值为: " + atomicInteger.getReference() + " , 标记为: " + atomicInteger.getStamp());
  11. atomicInteger.compareAndSet(127, 100, stamp + 1, stamp + 2);
  12. System.out.println("第二次修改后,值为: " + atomicInteger.getReference() + " , 标记为: " + atomicInteger.getStamp());
  13. }, "thread-A").start();
  14. new Thread(() -> {
  15. try {
  16. Thread.sleep(1000);
  17. } catch (InterruptedException e) {
  18. e.printStackTrace();
  19. }
  20. atomicInteger.compareAndSet(100, 128, stamp, stamp + 1);
  21. System.out.println("最终值为: " + atomicInteger.getReference() + " , 标记为: " + atomicInteger.getStamp());
  22. }, "thread-B").start();
  23. }
  24. }

img

3.3 自旋锁(SpinLock)

(1)什么是自旋锁?

  1. 【自旋锁:】
  2. 指的是当一个线程尝试去获取锁时,
  3. 若此时锁已经被其他线程获取,那么当前线程将采用 循环 的方式不断的去尝试获取锁。
  4. 当循环执行的某次操作成功获取锁时,将退出循环。
  5. 【优点:】
  6. 减少了线程上下文切换带来的消耗。
  7. 【缺点:】
  8. 循环会占用 CPU 资源,若长时间获取不到锁,那么相当于一个死循环在执行。
  9. 【举例:】
  10. 前面介绍的 Unsafe 类中 getAndAddInt() 即为 自旋锁实现,自旋 CAS 进行值的递增。
  11. public final int getAndAddInt(Object var1, long var2, int var4) {
  12. int var5;
  13. do {
  14. var5 = this.getIntVolatile(var1, var2);
  15. } while(!this.compareAndSwapInt(var1, var2, var5, var5 + var4));
  16. return var5;
  17. }

(2)手写一个自旋锁

通过 CAS 操作作为 循环(自旋)条件。

  1. SpinLockDemo
  2. package com.lyh.tree;
  3. import java.util.concurrent.atomic.AtomicReference;
  4. /** * 演示自旋锁(类似于 Unsafe 类中的 getAndAddInt() 方法)。 * * 通过 CAS 操作作为自旋锁条件,A 线程先获取锁,并占用锁时间为 3 秒, * B 线程获取锁,发现锁被 A 占用,B 则执行 循环 等待。 * A 线程释放锁后,B 在某次循环中 CAS 成功,即 B 获取到锁,然后释放。 */
  5. public class SpinLockDemo {
  6. private AtomicReference<Thread> atomicReference = new AtomicReference<>();
  7. /** * 获取锁 */
  8. public void lock() {
  9. // 获取当前线程
  10. Thread thread = Thread.currentThread();
  11. System.out.println("当前执行 lock 的线程为: " + thread.getName());
  12. System.out.println("线程 " + thread.getName() + " 尝试获取锁");
  13. Long start = System.currentTimeMillis();
  14. // 自旋 CAS,当值为 null 时,CAS 成功,此时锁被获取。
  15. // 当值不为 null 时,CAS 失败,不断执行循环直至值为 null。
  16. while(!atomicReference.compareAndSet(null, thread)) {
  17. }
  18. Long end = System.currentTimeMillis();
  19. System.out.println("线程 " + thread.getName() + " 成功获取到锁, 尝试获取时间为: " + ((end - start) / 1000) + "秒");
  20. }
  21. /** * 释放锁 */
  22. public void unLock() {
  23. // 获取当前线程
  24. Thread thread = Thread.currentThread();
  25. System.out.println("当前执行 unLock 的线程为: " + thread.getName());
  26. // 释放锁
  27. atomicReference.compareAndSet(thread, null);
  28. }
  29. public static void main(String[] args) {
  30. SpinLockDemo spinLockDemo = new SpinLockDemo();
  31. // 创建线程 A
  32. new Thread(() -> {
  33. // A 获取锁
  34. spinLockDemo.lock();
  35. // A 占用锁 3 秒
  36. try {
  37. Thread.sleep(3000);
  38. } catch (InterruptedException e) {
  39. e.printStackTrace();
  40. }
  41. // A 释放锁
  42. spinLockDemo.unLock();
  43. }, "thread-A").start();
  44. // 等待 1 秒,确保 A 线程先执行
  45. try {
  46. Thread.sleep(1000);
  47. } catch (InterruptedException e) {
  48. e.printStackTrace();
  49. }
  50. // 创建线程 B
  51. new Thread(() -> {
  52. // B 获取锁
  53. spinLockDemo.lock();
  54. // B 释放锁
  55. spinLockDemo.unLock();
  56. }, "thread-B").start();
  57. }
  58. }
  59. 【输出结果:】
  60. 当前执行 lock 的线程为: thread-A
  61. 线程 thread-A 尝试获取锁
  62. 线程 thread-A 成功获取到锁, 尝试获取时间为: 0
  63. 当前执行 lock 的线程为: thread-B
  64. 线程 thread-B 尝试获取锁
  65. 当前执行 unLock 的线程为: thread-A
  66. 线程 thread-B 成功获取到锁, 尝试获取时间为: 2
  67. 当前执行 unLock 的线程为: thread-B

img

四、服务调用、负载均衡 – Feign、OpenFeign

4.1 什么是 Feign、 OpenFeign ?

  1. Feign:】
  2. Feign 是一个声明式 web service 客户端,使用 Feign 使得编写 Java HTTP 客户端更容易。
  3. SpringCloud 组件中 Feign 作为一个轻量级 Restful 风格的 HTTP 服务客户端,内置了 Ribbon,提供客户端的负载均衡。
  4. 使用 Feign 注解定义接口,调用该接口即可访问 服务。
  5. OpenFeign:】
  6. SpringCloud 组件中 Open Feign 是在 Feign 基础上支持了 SpringMVC 注解。
  7. 通过 @FeignClient 注解可以解析 SpringMVC @RequestMapping 等注解下的接口,并通过动态代理的方式产生实现类,在实现类中做负载均衡、服务调用。
  8. 【相关地址:】
  9. https://cloud.spring.io/spring-cloud-openfeign/reference/html/
  10. https://github.com/OpenFeign/feign

img

4.2 Feign 集成了 Ribbon

  1. 前面使用 Ribbon + RestTemplate 实现 负载均衡 以及 服务调用时,
  2. 流程如下:
  3. Step1:在配置类中通过 @Bean 声明一下 RestTemplate 类,再添加上 @LoadBalanced 注解。
  4. Step2:在 controller 中注入 RestTemplate
  5. Step3:使用 RestTemplate 根据服务名发送 HTTP 请求。
  6. 而不同的模块若要调用服务(一个接口可能会被多个模块调用),则每次都要进行 Step1Step2Step3
  7. 实际开发中,若不进行处理,对于编码、维护都是一件麻烦的事情。
  8. Feign 就是在 Ribbon 基础上做了进一步封装,只需要创建一个接口,并使用注解的方式进行配置,
  9. 即可完成 接口 服务提供方接口的绑定。通过自定义的接口即可完成 服务调用。
  10. 注:
  11. 类似于在 Dao 层接口上添加 @Mapper
  12. Feign 在接口上添加 @FeignClient,并配置服务名。

4.3 使用 OpenFeign

(1)说明

  1. 【说明:】
  2. 创建一个空模块 eureka_client_consumer_9006
  3. 用于测试 OpenFeign 的使用。
  4. 注:
  5. 创建流程此处省略,详细可参考上一篇博客:https://www.cnblogs.com/l-y-h/p/14193443.html#_label2_3

(2)创建项目 eureka_client_consumer_9006

创建 eureka_client_consumer_9006 模块,修改 父工程以及当前工程 pom.xml 文件。
  修改配置类。

  1. 【依赖:】
  2. <dependency>
  3. <groupId>org.springframework.cloud</groupId>
  4. <artifactId>spring-cloud-starter-openfeign</artifactId>
  5. </dependency>
  6. application.yml
  7. server:
  8. port: 9006
  9. spring:
  10. application:
  11. name: eureka-client-consumer
  12. eureka:
  13. instance:
  14. appname: eureka-client-consumer # 优先级比 spring.application.name 高
  15. instance-id: eureka-client-consumer-instance3 # 设置当前实例 ID
  16. hostname: eureka.client.consumer.9006 # 设置主机名
  17. client:
  18. register-with-eureka: true # 默认为 true,注册到 注册中心
  19. fetch-registry: true # 默认为 true,从注册中心 获取 注册信息
  20. service-url:
  21. # 指向 注册中心 地址,注册到 集群所有的 注册中心。
  22. defaultZone: http://eureka.server.7001.com:7001/eureka,http://eureka.server.7002.com:7002/eureka,http://eureka.server.7003.com:7003/eureka

img

img

(3)编写接口,绑定服务。

通过 @Component 注解,将该接口交给 Spring 管理(用于 @Autowired 注入)。
  通过 @FeignClient 注解配置 服务名,并编写方法,用于绑定需要访问的接口。

  1. ProducerFeignService
  2. package com.lyh.springcloud.eureka_client_consumer_9006.service;
  3. import com.lyh.springcloud.common.tools.Result;
  4. import org.springframework.cloud.openfeign.FeignClient;
  5. import org.springframework.stereotype.Component;
  6. import org.springframework.web.bind.annotation.GetMapping;
  7. import org.springframework.web.bind.annotation.PathVariable;
  8. @FeignClient(value = "EUREKA-CLIENT-PRODUCER")
  9. @Component
  10. public interface ProducerFeignService {
  11. @GetMapping("/producer/user/get/{id}")
  12. Result getUser(@PathVariable Integer id);
  13. }

img

(4)编写 controller

  1. ConsumerController
  2. package com.lyh.springcloud.eureka_client_consumer_9006.controller;
  3. import com.lyh.springcloud.common.tools.Result;
  4. import com.lyh.springcloud.eureka_client_consumer_9006.service.ProducerFeignService;
  5. import org.springframework.beans.factory.annotation.Autowired;
  6. import org.springframework.web.bind.annotation.GetMapping;
  7. import org.springframework.web.bind.annotation.PathVariable;
  8. import org.springframework.web.bind.annotation.RequestMapping;
  9. import org.springframework.web.bind.annotation.RestController;
  10. @RestController
  11. @RequestMapping("/consumer/user")
  12. public class ConsumerController {
  13. @Autowired
  14. private ProducerFeignService producerFeignService;
  15. @GetMapping("/get/{id}")
  16. public Result getUser(@PathVariable Integer id) {
  17. return producerFeignService.getUser(id);
  18. }
  19. }

img

(5)启动服务,并测试。

按顺序依次启动,Eureka Server 以及 Producer。
  在 eureka_client_consumer_9006 启动类上添加 @EnableFeignClients 注解,并启动。
  效果与使用 ribbon 时相同(默认 ZoneAvoidanceRule 负载均衡规则)。

img

img

(6)启动失败时的错误

  1. 【错误:】
  2. Description:
  3. Field producerFeignService in com.lyh.springcloud.eureka_client_consumer_9006.controller.ConsumerController required a bean of type 'com.lyh.springcloud.eureka_client_consumer_9006.service.ProducerFeignService' that could not be found. The injection point has the following annotations:
  4. - @org.springframework.beans.factory.annotation.Autowired(required=true)
  5. Action:
  6. Consider defining a bean of type 'com.lyh.springcloud.eureka_client_consumer_9006.service.ProducerFeignService' in your configuration.
  7. 【解决:】
  8. 在启动类上添加 @EnableFeignClients 注解。

img

(7)自定义负载均衡策略

Feign 集成了 Ribbon,所以 Ribbon 自定义负载均衡策略也适用于 Feign。
  此处使用配置文件的方式进行自定义负载均衡。
  在服务调用方的配置文件中,根据 服务提供者的 服务名,进行 ribbon 负载均衡策略更换。

  1. 【自定义负载均衡策略:】
  2. EUREKA-CLIENT-PRODUCER:
  3. ribbon:
  4. NFLoadBalancerRuleClassName: com.netflix.loadbalancer.RandomRule
  5. application.yml
  6. server:
  7. port: 9006
  8. spring:
  9. application:
  10. name: eureka-client-consumer
  11. eureka:
  12. instance:
  13. appname: eureka-client-consumer # 优先级比 spring.application.name 高
  14. instance-id: eureka-client-consumer-instance3 # 设置当前实例 ID
  15. hostname: eureka.client.consumer.9006 # 设置主机名
  16. client:
  17. register-with-eureka: true # 默认为 true,注册到 注册中心
  18. fetch-registry: true # 默认为 true,从注册中心 获取 注册信息
  19. service-url:
  20. # 指向 注册中心 地址,注册到 集群所有的 注册中心。
  21. defaultZone: http://eureka.server.7001.com:7001/eureka,http://eureka.server.7002.com:7002/eureka,http://eureka.server.7003.com:7003/eureka
  22. EUREKA-CLIENT-PRODUCER:
  23. ribbon:
  24. NFLoadBalancerRuleClassName: com.netflix.loadbalancer.RandomRule

img

img

4.4 Feign 超时控制 以及 日志打印

(1)超时控制

  1. 【说明:】
  2. OpenFeign 默认请求等待时间为 1 秒钟,即 若服务器超过 1 秒仍未返回处理结果,那么 OpenFeign 将认为调用出错。
  3. 有时服务器业务处理时间需要超过 1 秒,此时直接报错是不合适的。
  4. 为了避免这样的问题,可以根据实际情况 适当设置 Feign 客户端的超时控制。
  5. 【演示:】
  6. 服务提供者 eureka_client_producer_8004 提供一个接口,并在内部暂停 3 秒,用于模拟业务处理时间。
  7. 服务消费者 eureka_client_consumer_9006 中调用并访问接口,用于测试 是否会出错。
  8. 在配置文件中 配置超时控制后,再次查看是否出错。
  9. application.yml
  10. # 设置 OpenFeign 超时时间(OpenFeign 默认支持 Ribbon)
  11. ribbon:
  12. # 指的是建立连接所用的超时时间
  13. ConnectTimeout: 3000
  14. # 指的是建立连接后从服务器获取资源的超时时间(即请求处理的超时时间)
  15. ReadTimeout: 4000

img

img

img

img

(2)日志打印

  1. 【说明:】
  2. Feign 提供了日志打印功能,通过配置调整日志级别,可以方便了解 Feign 中请求调用的细节。
  3. 日志级别:
  4. NONE:默认,不显示任何日志。
  5. BASIC 仅记录请求方法、URL、响应状态以及执行时间。
  6. HEADERS:包含 BASIC、请求头信息、响应头信息。
  7. FULL:包含 HEADERS、请求数据、响应数据。

Step1:
  配置 日志级别(Logger.Level)。
注:
  import feign.Logger。不要导错包了。

  1. package com.lyh.springcloud.eureka_client_consumer_9006.config;
  2. import feign.Logger;
  3. import org.springframework.context.annotation.Bean;
  4. import org.springframework.context.annotation.Configuration;
  5. @Configuration
  6. public class FeignConfig {
  7. /** * 配置 Feign 日志级别 */
  8. @Bean
  9. Logger.Level feignLoggerLever() {
  10. return Logger.Level.FULL;
  11. }
  12. }

img

Step2:
  配置需要打印日志的接口。

  1. application.yml
  2. logging:
  3. level:
  4. com.lyh.springcloud.eureka_client_consumer_9006.service.ProducerFeignService: debug

img

Step3:
  启动服务,并调用服务,可以在控制台看到日志信息。

img

发表评论

表情:
评论列表 (有 0 条评论,340人围观)

还没有评论,来说两句吧...

相关阅读