手写一个简易版断路器(hystrix)

以你之姓@ 2022-10-10 11:09 192阅读 0赞

理论基础

根据熔断器模式(Circuit Breaker Pattern)

在这里插入图片描述
共有上面几种状态

  • Closed:默认状态,Circuit Breaker 内部维护着最近的失败次数(failure count)。每操作失败一次,失败次数就会加一。当失败次数达到设定的阈值,就会进入Open状态
  • Open: 操作不会执行,会立即失败。Circuit Breaker 内部维护一个计时器。当时间达到设定的阈值,就会从Open状态进入到Half-Open状态
  • Half-Open: 只要操作失败一次,立即进入Open状态。否则增加操作成功的次数,当成功次数达到设定阈值,进入Closed状态

简化为自己的理解模型如下:
在这里插入图片描述

基于上面的理论,我们来实现我们的一个断路器

实现

定义状态

  • State.java
    首先我们定义一个枚举,定义上面的三种状态

    public enum State {

    1. CLOSED,
    2. HALF_OPEN,
    3. OPEN

    }

定义阈值配置

然后我们要有一个配置类,定义失败多少次进入Open状态,Half-Open状态的超时时间等。具体看代码

  • Config.java

    @Data
    public class Config {

    1. /** * Closed 状态进入 Open 状态的错误个数阀值 */
    2. private int failureCount = 5;
    3. /** * failureCount 统计时间窗口(超过该时间重置失败次数) */
    4. private long failureTimeInterval = 2 * 1000;
    5. /** * Open 状态进入 Half-Open 状态的超时时间 */
    6. private int halfOpenTimeout = 5 * 1000;
    7. /** * Half-Open 状态进入 Open 状态的成功个数阀值 */
    8. private int halfOpenSuccessCount = 2;
  1. }

计数器

配置类我们基本都解决了,我们还需要有一个计数器,来记录上次操作调用失败的时间戳,失败次数等。用于和配置类比对来作状态转换。

  • Counter.java

    public class Counter {

    1. // Closed 状态进入 Open 状态的错误个数阀值
    2. private final int failureCount;
    3. // failureCount 统计时间窗口
    4. private final long failureTimeInterval;
    5. // 当前错误次数
    6. private final AtomicInteger currentCount;
    7. // 上一次调用失败的时间戳
    8. private long lastTime;
    9. // Half-Open 状态下成功次数
    10. private final AtomicInteger halfOpenSuccessCount;
    11. public Counter(int failureCount, long failureTimeInterval) {
    12. this.failureCount = failureCount;
    13. this.failureTimeInterval = failureTimeInterval;
    14. this.currentCount = new AtomicInteger(0);
    15. this.halfOpenSuccessCount = new AtomicInteger(0);
    16. this.lastTime = System.currentTimeMillis();
    17. }
    18. /** * 失败次数 + 1 * synchronized 保证线程安全 * @return */
    19. public synchronized int incrFailureCount() {
    20. long current = System.currentTimeMillis();
    21. // 超过时间窗口,当前失败次数重置为 0
    22. if (current - lastTime > failureTimeInterval) {
    23. lastTime = current;
    24. currentCount.set(0);
    25. }
    26. return currentCount.getAndIncrement();
    27. }
    28. /** * half Open 状态下 成功数 + 1 * @return */
    29. public int incrSuccessHalfOpenCount() {
    30. return this.halfOpenSuccessCount.incrementAndGet();
    31. }
    32. /** * 失败总次数是否超过阈值 * @return */
    33. public boolean failureThresholdReached() {
    34. return getCurCount() >= failureCount;
    35. }
    36. public int getCurCount() {
    37. return currentCount.get();
    38. }
    39. /** * 重置 */
    40. public synchronized void reset() {
    41. halfOpenSuccessCount.set(0);
    42. currentCount.set(0);
    43. }

    }

核心实现类

上面的一切都准备好了,我们就来编写我们的核心实现类(CircuitBreaker)。CircuitBreaker主要是负责状态的状态转换,并提供方法给外部调用达到限流降级作用。

  • CircuitBreaker.java

    public class CircuitBreaker {

    1. private State state;
    2. private final Config config;
    3. private final Counter counter;
    4. private long lastOpenedTime;
    5. public CircuitBreaker(Config config) {
    6. this.counter = new Counter(config.getFailureCount(), config.getFailureTimeInterval());
    7. this.state = CLOSED;
    8. this.config = config;
    9. }
  1. /** * 供外部调用达到限流降级作用 * @param toRun * @param fallback 异常回滚方法 * @param <T> * @return */
  2. public <T> T run(Supplier<T> toRun, Function<Throwable, T> fallback) {
  3. try {
  4. if (state == OPEN) {
  5. // 判断 是否超时可以进入 half-open状态
  6. if (halfOpenTimeout()) {
  7. return halfOpenHandle(toRun, fallback);
  8. }
  9. // 直接执行失败回调方法
  10. return fallback.apply(new DegradeException("degrade by circuit breaker"));
  11. // 如果降级关闭则正常执行
  12. } else if (state == CLOSED) {
  13. T result = toRun.get();
  14. // 注意重置 错误数
  15. closed();
  16. return result;
  17. } else {
  18. // if (state == half-Open)
  19. return halfOpenHandle(toRun, fallback);
  20. }
  21. } catch (Exception e) {
  22. // 执行失败 错误次数+ 1
  23. counter.incrFailureCount();
  24. // 错误次数达到阀值,进入 open 状态
  25. if (counter.failureThresholdReached()) {
  26. open();
  27. }
  28. return fallback.apply(e);
  29. }
  30. }
  31. /** * 转换为 half-Open状态 * @param toRun * @param fallback * @param <T> * @return */
  32. private <T> T halfOpenHandle(Supplier<T> toRun, Function<Throwable, T> fallback) {
  33. try {
  34. // closed 状态超时进入 half-open 状态
  35. halfOpen();
  36. T result = toRun.get();
  37. int halfOpenSuccCount = counter.incrSuccessHalfOpenCount();
  38. // half-open 状态成功次数到达阀值,进入 closed 状态
  39. if (halfOpenSuccCount >= this.config.getHalfOpenSuccessCount()) {
  40. closed();
  41. }
  42. return result;
  43. } catch (Exception e) {
  44. // half-open 状态发生一次错误进入 open 状态
  45. open();
  46. return fallback.apply(new DegradeException("degrade by circuit breaker"));
  47. }
  48. }
  49. /** * 判断是否 Open是否超时, 如果是则进入 halfOpen状态 * @return */
  50. private boolean halfOpenTimeout() {
  51. return System.currentTimeMillis() - lastOpenedTime > config.getHalfOpenTimeout();
  52. }
  53. private void closed() {
  54. counter.reset();
  55. state = CLOSED;
  56. }
  57. private void open() {
  58. state = OPEN;
  59. lastOpenedTime = System.currentTimeMillis();
  60. }
  61. private void halfOpen() {
  62. state = HALF_OPEN;
  63. }
  64. }

测试

首先我们测试Open状态

  1. @Test
  2. public void testScene1() {
  3. CircuitBreaker cb = new CircuitBreaker(new Config());
  4. IntStream.range(0, 10).forEach(s -> {
  5. String result = cb.run(() -> {
  6. // 假装执行失败
  7. System.out.println("执行方法次数:" + s);
  8. int i = 1 / 0;
  9. return "Success";
  10. }, t -> {
  11. System.err.println(t);
  12. return "执行自定义降级方法";
  13. });
  14. });
  15. }

执行结果:
在这里插入图片描述

可以看到当达到5次后就触发降级操作,不再处理,直接降级

如果我们在每个方法执行后休眠2s,会看到不会触发降级操作

  1. @Test
  2. public void testScene1() {
  3. CircuitBreaker cb = new CircuitBreaker(new Config());
  4. IntStream.range(0, 10).forEach(s -> {
  5. String result = cb.run(() -> {
  6. // 假装执行失败
  7. System.out.println("执行方法次数:" + s);
  8. try {
  9. TimeUnit.SECONDS.sleep(2);
  10. } catch (InterruptedException e) {
  11. e.printStackTrace();
  12. }
  13. int i = 1 / 0;
  14. return "Success";
  15. }, t -> {
  16. System.err.println(t);
  17. return "执行自定义降级方法";
  18. });
  19. });
  20. }

执行结果:
在这里插入图片描述

测试 转换为half_open 然后转close

  1. @Test
  2. public void testScene1() {
  3. CircuitBreaker cb = new CircuitBreaker(new Config());
  4. IntStream.range(0, 10).forEach(s -> {
  5. if (s == 5) {
  6. try {
  7. TimeUnit.SECONDS.sleep(5);
  8. } catch (InterruptedException e) {
  9. e.printStackTrace();
  10. }
  11. }
  12. String result = cb.run(() -> {
  13. // 假装执行失败
  14. System.out.println("执行方法次数:" + s);
  15. if (s < 5) {
  16. int i = 1 / 0;
  17. }
  18. System.out.println("方法执行成功");
  19. return "Success";
  20. }, t -> {
  21. System.err.println(t);
  22. return "执行自定义降级方法";
  23. });
  24. });
  25. }

为了让测试查看方便,我在上面的状态转换里面加了一些输出,然后执行结果如下
在这里插入图片描述

可以看到最终都达到了我们想要的效果

发表评论

表情:
评论列表 (有 0 条评论,192人围观)

还没有评论,来说两句吧...

相关阅读

    相关 SpringCloud Hystrix 断路

    雪崩效应 在微服务架构中通常会有多个服务层调用,大量的微服务通过网络进行通信,从而支撑起整个系统。各个微服务之间也难免存在大量的依赖关系。然而任何服务都不是100%可用的,网

    相关 Hystrix断路

    分布式系统面临的问题: 1.扇出:     多个微服务互相调用的时候,如果A调用B、C,而B、C又继续调用其他微服务,这就是扇出(像一把扇子一样慢慢打开)。 2.服务雪崩