06--springCloudAlibab WebFlux集成sentinel

àì夳堔傛蜴生んèń 2022-12-22 09:39 217阅读 0赞
  1. 在spring-cloud-alibaba项目中像创建webflux-example模块
  2. 引入sentinel开发包

    <?xml version=”1.0” encoding=”UTF-8”?>

    1. <parent>
    2. <artifactId>spring-cloud-alibaba</artifactId>
    3. <groupId>spring-cloud-alibaba</groupId>
    4. <version>1.0-SNAPSHOT</version>
    5. </parent>
    6. <modelVersion>4.0.0</modelVersion>
    7. <artifactId>webflux-example</artifactId>
  1. <dependencies>
  2. <dependency>
  3. <groupId>org.springframework.boot</groupId>
  4. <artifactId>spring-boot-starter-webflux</artifactId>
  5. </dependency>
  6. <dependency>
  7. <groupId>org.springframework.cloud</groupId>
  8. <artifactId>spring-cloud-context</artifactId>
  9. </dependency>
  10. <!--sentinel-->
  11. <dependency>
  12. <groupId>com.alibaba.cloud</groupId>
  13. <artifactId>spring-cloud-starter-alibaba-sentinel</artifactId>
  14. </dependency>
  15. <dependency>
  16. <groupId>com.alibaba.csp</groupId>
  17. <artifactId>sentinel-datasource-nacos</artifactId>
  18. </dependency>
  19. </dependencies>
  20. <build>
  21. <plugins>
  22. <plugin>
  23. <groupId>org.springframework.boot</groupId>
  24. <artifactId>spring-boot-maven-plugin</artifactId>
  25. <configuration>
  26. <mainClass>com.cloud.alibaba.GatewayApplication</mainClass>
  27. </configuration>
  28. </plugin>
  29. </plugins>
  30. </build>
  31. </project>
  1. 配置 bootstrap.yml

    spring:
    application:

    1. name: webflux-example

    profiles:

    1. active: dev

    main:

    1. allow-bean-definition-overriding: true

    cloud:

    1. sentinel:
    2. # 取消控制台懒加载
    3. eager: true
    4. ## 取消默认过滤器
    5. filter:
    6. enabled: false
    7. # sentinel 地址
    8. transport:
    9. dashboard: 123.206.19.217:8858
    10. # 客户端ip及端口 sentinel通过此端口服务判断服务连接状态
    11. clientIp: 192.168.1.25
    12. port: 8719
    13. #将sentinel配置信息持久化到nacos 熔断规则
    14. datasource:
    15. ds1:
    16. ## 从nacos中读取限流配置
    17. nacos:
    18. server-addr: 101.37.152.195:8848
    19. dataId: webflux-example-flow
    20. groupId: DEFAULT_GROUP
    21. data-type: json
    22. ## flow 流控规则 degrade 降级规则 system 系统规则 authority 授权规则 param-flow 热点规则
    23. rule-type: flow

    server:
    port: 10003

  2. SentinelConfig 自定义sentinel异常处理器及过滤器

    import com.alibaba.csp.sentinel.adapter.spring.webflux.SentinelWebFluxFilter;
    import org.springframework.context.annotation.Bean;
    import org.springframework.context.annotation.Configuration;
    import org.springframework.core.annotation.Order;

    /* @author 刘志强 @date 2020/9/25 15:31 /
    @Configuration
    public class SentinelConfig {

    1. /** * 自定义Sentinel过滤器 * @return */
    2. @Bean
    3. @Order(-2)
    4. public WebFilter sentinelWebFluxFilter()
    5. {
    6. return new MySentinelWebFluxFilter();
    7. }
    8. /** * 自定义异常处理器 * @return */
    9. @Bean
    10. @Order(Ordered.HIGHEST_PRECEDENCE)
    11. public WebExceptionHandler sentinelBlockExceptionHandler() {
    12. return new MySentinelBlockExceptionHandler();
    13. }

    }

自定义过滤器 设置来源为客户端ip

  1. import com.alibaba.csp.sentinel.EntryType;
  2. import com.alibaba.csp.sentinel.adapter.reactor.ContextConfig;
  3. import com.alibaba.csp.sentinel.adapter.reactor.EntryConfig;
  4. import com.alibaba.csp.sentinel.adapter.reactor.SentinelReactorTransformer;
  5. import com.alibaba.csp.sentinel.adapter.spring.webflux.SentinelWebFluxFilter;
  6. import com.alibaba.csp.sentinel.adapter.spring.webflux.callback.WebFluxCallbackManager;
  7. import java.util.Optional;
  8. import org.springframework.http.HttpHeaders;
  9. import org.springframework.http.server.reactive.ServerHttpRequest;
  10. import org.springframework.web.server.ServerWebExchange;
  11. import org.springframework.web.server.WebFilterChain;
  12. import reactor.core.publisher.Mono;
  13. /** * 自定义过滤器 * @author admin */
  14. public class MySentinelWebFluxFilter extends SentinelWebFluxFilter {
  15. private static final String EMPTY_ORIGIN = "";
  16. public MySentinelWebFluxFilter() {
  17. }
  18. @Override
  19. public Mono<Void> filter(ServerWebExchange exchange, WebFilterChain chain) {
  20. return chain.filter(exchange).transform(this.buildSentinelTransformer(exchange));
  21. }
  22. private SentinelReactorTransformer<Void> buildSentinelTransformer(ServerWebExchange exchange) {
  23. ServerHttpRequest serverHttpRequest = exchange.getRequest();
  24. String path = exchange.getRequest().getPath().value();
  25. String finalPath = (String)Optional.ofNullable(WebFluxCallbackManager.getUrlCleaner()).map((f) -> {
  26. return (String)f.apply(exchange, path);
  27. }).orElse(path);
  28. // String origin = (String)Optional.ofNullable(WebFluxCallbackManager.getRequestOriginParser()).map((f) -> {
  29. // return (String)f.apply(exchange);
  30. // }).orElse("");
  31. String origin = getIpAddress(serverHttpRequest);
  32. return new SentinelReactorTransformer(new EntryConfig(finalPath, EntryType.IN, new ContextConfig(finalPath, origin)));
  33. }
  34. public static String getIpAddress(ServerHttpRequest request) {
  35. HttpHeaders headers = request.getHeaders();
  36. String ip = headers.getFirst("x-forwarded-for");
  37. if (ip != null && ip.length() != 0 && !"unknown".equalsIgnoreCase(ip)) {
  38. // 多次反向代理后会有多个ip值,第一个ip才是真实ip
  39. if (ip.indexOf(",") != -1) {
  40. ip = ip.split(",")[0];
  41. }
  42. }
  43. if (ip == null || ip.length() == 0 || "unknown".equalsIgnoreCase(ip)) {
  44. ip = headers.getFirst("Proxy-Client-IP");
  45. }
  46. if (ip == null || ip.length() == 0 || "unknown".equalsIgnoreCase(ip)) {
  47. ip = headers.getFirst("WL-Proxy-Client-IP");
  48. }
  49. if (ip == null || ip.length() == 0 || "unknown".equalsIgnoreCase(ip)) {
  50. ip = headers.getFirst("HTTP_CLIENT_IP");
  51. }
  52. if (ip == null || ip.length() == 0 || "unknown".equalsIgnoreCase(ip)) {
  53. ip = headers.getFirst("HTTP_X_FORWARDED_FOR");
  54. }
  55. if (ip == null || ip.length() == 0 || "unknown".equalsIgnoreCase(ip)) {
  56. ip = headers.getFirst("X-Real-IP");
  57. }
  58. if (ip == null || ip.length() == 0 || "unknown".equalsIgnoreCase(ip)) {
  59. ip = request.getRemoteAddress().getAddress().getHostAddress();
  60. }
  61. return ip;
  62. }
  63. }

自定义异常处理器

  1. import com.alibaba.csp.sentinel.adapter.gateway.sc.callback.GatewayCallbackManager;
  2. import com.alibaba.csp.sentinel.slots.block.BlockException;
  3. import java.nio.charset.StandardCharsets;
  4. import org.springframework.core.io.buffer.DataBuffer;
  5. import org.springframework.http.server.reactive.ServerHttpResponse;
  6. import org.springframework.web.reactive.function.server.ServerResponse;
  7. import org.springframework.web.server.ServerWebExchange;
  8. import org.springframework.web.server.WebExceptionHandler;
  9. import reactor.core.publisher.Mono;
  10. /**
  11. * @author admin
  12. */
  13. public class MySentinelBlockExceptionHandler implements WebExceptionHandler {
  14. private Mono<Void> writeResponse(ServerResponse response, ServerWebExchange exchange) {
  15. ServerHttpResponse serverHttpResponse = exchange.getResponse();
  16. serverHttpResponse.getHeaders().add("Content-Type", "application/json;charset=UTF-8");
  17. byte[] datas = "{\"status\":429,\"message\":\"请求超过最大数,请稍后再试\"}".getBytes(StandardCharsets.UTF_8);
  18. DataBuffer buffer = serverHttpResponse.bufferFactory().wrap(datas);
  19. return serverHttpResponse.writeWith(Mono.just(buffer));
  20. }
  21. @Override
  22. public Mono<Void> handle(ServerWebExchange exchange, Throwable ex) {
  23. if (exchange.getResponse().isCommitted()) {
  24. return Mono.error(ex);
  25. }
  26. if (!BlockException.isBlockException(ex)) {
  27. return Mono.error(ex);
  28. }
  29. return handleBlockedRequest(exchange, ex).flatMap(response -> writeResponse(response, exchange));
  30. }
  31. private Mono<ServerResponse> handleBlockedRequest(ServerWebExchange exchange, Throwable throwable) {
  32. return GatewayCallbackManager.getBlockHandler().handleRequest(exchange, throwable);
  33. }
  34. }
  1. nacos配置

    Data ID: webflux-example-flow
    Group: DEFAULT_GROUP
    配置格式: JSON
    配置内容:
    [

    1. {
    2. "resource": "/getList",
    3. "limitApp": "192.168.1.25",
    4. "grade": 1,
    5. "count": 1,
    6. "strategy": 0,
    7. "controlBehavior": 0,
    8. "clusterMode": false
    9. }

    ]

  2. 编写一个contller

    package com.cloud.alibaba.controller;

    import org.springframework.web.bind.annotation.GetMapping;
    import org.springframework.web.bind.annotation.RestController;
    import reactor.core.publisher.Flux;

    import java.util.ArrayList;
    import java.util.List;

    /**

    • @author 刘志强
    • @date 2020/11/19 13:21
      */
      @RestController
      public class WebFluxController {

      @GetMapping(“/getList”)
      public Flux getList() {

      1. List<String> list = new ArrayList<>();
      2. list.add("aaaa");
      3. list.add("bbbb");
      4. list.add("ccc");
      5. return Flux.fromIterable(list).take(2);

      }
      }

源码地址

  1. WebFlux和SpringMvc对比

    1. 传统的基于Servlet的web框架(springmvc)。在本质上都是线程堵塞的,每个链接占用一个线程。知道连接结束线程才会释放。
    2. WebFlux 是响应式异步web框架。可以使用一个线程处理更多的请求。

发表评论

表情:
评论列表 (有 0 条评论,217人围观)

还没有评论,来说两句吧...

相关阅读