06--springCloudAlibaba WebFlux集成sentinel
- 在spring-cloud-alibaba项目中创建webflux-example模块
引入sentinel开发包
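<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">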
<parent>
<artifactId>spring-cloud-alibaba</artifactId>
<groupId>spring-cloud-alibaba</groupId>
<version>1.0-SNAPSHOT</version>
</parent>
<modelVersion>4.0.0</modelVersion>
<artifactId>webflux-example</artifactId>
<dependencies>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-webflux</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.cloud</groupId>
<artifactId>spring-cloud-context</artifactId>
</dependency>
<!--sentinel-->
<dependency>
<groupId>com.alibaba.cloud</groupId>
<artifactId>spring-cloud-starter-alibaba-sentinel</artifactId>
</dependency>
<dependency>
<groupId>com.alibaba.csp</groupId>
<artifactId>sentinel-datasource-nacos</artifactId>
</dependency>
</dependencies>
<build>
<plugins>
<plugin>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-maven-plugin</artifactId>
<configuration>
<mainClass>com.cloud.alibaba.GatewayApplication</mainClass>
</configuration>
</plugin>
</plugins>
</build>
</project>
配置 bootstrap.yml
spring:
  application:
    name: webflux-example
  profiles:
    active: dev
  main:
    allow-bean-definition-overriding: true
  cloud:
    sentinel:
      # 取消控制台懒加载
      eager: true
      ## 取消默认过滤器
      filter:
        enabled: false
      # sentinel 地址
      transport:
        dashboard: 123.206.19.217:8858
        # 客户端ip及端口 sentinel通过此端口服务判断服务连接状态
        clientIp: 192.168.1.25
        port: 8719
      # 将sentinel配置信息持久化到nacos 熔断规则
      datasource:
        ds1:
          ## 从nacos中读取限流配置
          nacos:
            server-addr: 101.37.152.195:8848
            dataId: webflux-example-flow
            groupId: DEFAULT_GROUP
            data-type: json
            ## flow 流控规则 degrade 降级规则 system 系统规则 authority 授权规则 param-flow 热点规则
            rule-type: flow
server:
  port: 10003
SentinelConfig 自定义sentinel异常处理器及过滤器
import com.alibaba.csp.sentinel.adapter.spring.webflux.SentinelWebFluxFilter;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.core.Ordered;
import org.springframework.core.annotation.Order;
import org.springframework.web.server.WebExceptionHandler;
import org.springframework.web.server.WebFilter;
/** @author 刘志强 @date 2020/9/25 15:31 */
/**
 * Spring configuration that registers the customised Sentinel WebFlux components
 * (request filter and block-exception handler) as beans.
 */
@Configuration
public class SentinelConfig {

    /**
     * Registers the custom Sentinel filter.
     *
     * @return a new {@code MySentinelWebFluxFilter}, declared with order {@code -2}
     */
    @Bean
    @Order(-2)
    public WebFilter sentinelWebFluxFilter() {
        WebFilter customFilter = new MySentinelWebFluxFilter();
        return customFilter;
    }

    /**
     * Registers the custom handler for Sentinel block exceptions.
     *
     * @return a new {@code MySentinelBlockExceptionHandler}, declared with the highest precedence
     */
    @Bean
    @Order(Ordered.HIGHEST_PRECEDENCE)
    public WebExceptionHandler sentinelBlockExceptionHandler() {
        WebExceptionHandler blockHandler = new MySentinelBlockExceptionHandler();
        return blockHandler;
    }
}
自定义过滤器 设置来源为客户端ip
import com.alibaba.csp.sentinel.EntryType;
import com.alibaba.csp.sentinel.adapter.reactor.ContextConfig;
import com.alibaba.csp.sentinel.adapter.reactor.EntryConfig;
import com.alibaba.csp.sentinel.adapter.reactor.SentinelReactorTransformer;
import com.alibaba.csp.sentinel.adapter.spring.webflux.SentinelWebFluxFilter;
import com.alibaba.csp.sentinel.adapter.spring.webflux.callback.WebFluxCallbackManager;
import java.util.Optional;
import org.springframework.http.HttpHeaders;
import org.springframework.http.server.reactive.ServerHttpRequest;
import org.springframework.web.server.ServerWebExchange;
import org.springframework.web.server.WebFilterChain;
import reactor.core.publisher.Mono;
/** * 自定义过滤器 * @author admin */
/**
 * Sentinel WebFlux filter that uses the caller's IP address as the Sentinel origin.
 *
 * <p>Every request is guarded as an inbound ({@code EntryType.IN}) resource named after the
 * request path (passed through the URL cleaner registered in {@code WebFluxCallbackManager},
 * if any). The context origin is the address resolved by {@link #getIpAddress}; the request
 * origin parser registered in {@code WebFluxCallbackManager} is intentionally not consulted.
 *
 * @author admin
 */
public class MySentinelWebFluxFilter extends SentinelWebFluxFilter {

    /** Origin used when no client address can be determined at all. */
    private static final String EMPTY_ORIGIN = "";

    /** Placeholder value that is treated the same as a missing header. */
    private static final String UNKNOWN = "unknown";

    /** Headers consulted, in this order, when {@code x-forwarded-for} yields nothing usable. */
    private static final String[] FALLBACK_IP_HEADERS = {
        "Proxy-Client-IP",
        "WL-Proxy-Client-IP",
        "HTTP_CLIENT_IP",
        "HTTP_X_FORWARDED_FOR",
        "X-Real-IP"
    };

    public MySentinelWebFluxFilter() {
    }

    /**
     * Runs the rest of the chain wrapped in a Sentinel entry for this request.
     *
     * @param exchange the current exchange
     * @param chain    the remaining filter chain
     * @return the chain's result, transformed by the Sentinel reactor transformer
     */
    @Override
    public Mono<Void> filter(ServerWebExchange exchange, WebFilterChain chain) {
        return chain.filter(exchange).transform(buildSentinelTransformer(exchange));
    }

    /**
     * Builds the transformer that enters the Sentinel resource for the given exchange.
     * Resource name and context name are both the (cleaned) request path; the origin is
     * the client IP.
     */
    private SentinelReactorTransformer<Void> buildSentinelTransformer(ServerWebExchange exchange) {
        ServerHttpRequest request = exchange.getRequest();
        String path = request.getPath().value();
        String finalPath = Optional.ofNullable(WebFluxCallbackManager.getUrlCleaner())
            .map(cleaner -> cleaner.apply(exchange, path))
            .orElse(path);
        String origin = getIpAddress(request);
        return new SentinelReactorTransformer<>(
            new EntryConfig(finalPath, EntryType.IN, new ContextConfig(finalPath, origin)));
    }

    /**
     * Resolves the client IP address of a request.
     *
     * <p>Lookup order: the first entry of {@code x-forwarded-for}, then {@code Proxy-Client-IP},
     * {@code WL-Proxy-Client-IP}, {@code HTTP_CLIENT_IP}, {@code HTTP_X_FORWARDED_FOR},
     * {@code X-Real-IP}, and finally the remote address of the connection. A header value that
     * is missing, empty, or {@code "unknown"} (case-insensitive) is skipped.
     *
     * <p>NOTE(security): all of these headers are supplied by the client unless a trusted
     * reverse proxy overwrites them, so a caller can forge the value. Because the result is
     * used as the Sentinel origin, per-origin rules should only be relied upon when the
     * application is reachable exclusively through such a proxy.
     *
     * @param request the incoming request
     * @return the resolved address, or an empty string when neither a usable header nor a
     *         remote address is available
     */
    public static String getIpAddress(ServerHttpRequest request) {
        HttpHeaders headers = request.getHeaders();
        String ip = firstForwardedAddress(headers.getFirst("x-forwarded-for"));
        for (String header : FALLBACK_IP_HEADERS) {
            if (isUsable(ip)) {
                break;
            }
            ip = headers.getFirst(header);
        }
        if (isUsable(ip)) {
            return ip;
        }
        // The remote address (and its resolved InetAddress) may be absent; fall back to the
        // empty origin instead of failing the request with a NullPointerException.
        return Optional.ofNullable(request.getRemoteAddress())
            .map(socketAddress -> socketAddress.getAddress())
            .map(address -> address.getHostAddress())
            .orElse(EMPTY_ORIGIN);
    }

    /**
     * Returns the first entry of an {@code x-forwarded-for} value. After several reverse
     * proxies the header lists several addresses and the first one is the real client.
     * Uses {@code substring} rather than {@code split(",")[0]}, which throws when the value
     * consists only of commas.
     */
    private static String firstForwardedAddress(String headerValue) {
        if (!isUsable(headerValue)) {
            return headerValue;
        }
        int comma = headerValue.indexOf(',');
        String first = comma == -1 ? headerValue : headerValue.substring(0, comma);
        return first.trim();
    }

    /** Tells whether a header value carries an address: non-null, non-empty and not "unknown". */
    private static boolean isUsable(String value) {
        return value != null && !value.isEmpty() && !UNKNOWN.equalsIgnoreCase(value);
    }
}
自定义异常处理器
import com.alibaba.csp.sentinel.adapter.gateway.sc.callback.GatewayCallbackManager;
import com.alibaba.csp.sentinel.slots.block.BlockException;
import java.nio.charset.StandardCharsets;
import org.springframework.core.io.buffer.DataBuffer;
import org.springframework.http.server.reactive.ServerHttpResponse;
import org.springframework.web.reactive.function.server.ServerResponse;
import org.springframework.web.server.ServerWebExchange;
import org.springframework.web.server.WebExceptionHandler;
import reactor.core.publisher.Mono;
/**
* @author admin
*/
/**
 * {@link WebExceptionHandler} that answers requests blocked by Sentinel with a fixed JSON
 * "too many requests" body. Any other exception is passed on unchanged.
 */
public class MySentinelBlockExceptionHandler implements WebExceptionHandler {

    /**
     * Writes the fixed JSON body to the exchange's response.
     *
     * @param response the response produced by the registered block handler; only its status
     *                 code is used, so the HTTP status is no longer left at its default while
     *                 the body reports a 429
     * @param exchange the current exchange
     * @return completion signal of the write
     */
    private Mono<Void> writeResponse(ServerResponse response, ServerWebExchange exchange) {
        ServerHttpResponse serverHttpResponse = exchange.getResponse();
        serverHttpResponse.setStatusCode(response.statusCode());
        // set(), not add(): avoid emitting a second Content-Type if one is already present.
        serverHttpResponse.getHeaders().set("Content-Type", "application/json;charset=UTF-8");
        byte[] datas = "{\"status\":429,\"message\":\"请求超过最大数,请稍后再试\"}".getBytes(StandardCharsets.UTF_8);
        DataBuffer buffer = serverHttpResponse.bufferFactory().wrap(datas);
        return serverHttpResponse.writeWith(Mono.just(buffer));
    }

    /**
     * Handles Sentinel block exceptions; everything else is re-emitted as an error.
     *
     * @param exchange the current exchange
     * @param ex       the exception raised while processing the request
     * @return the write completion for blocked requests, otherwise {@code Mono.error(ex)}
     */
    @Override
    public Mono<Void> handle(ServerWebExchange exchange, Throwable ex) {
        // Nothing can be written once the response is committed.
        if (exchange.getResponse().isCommitted()) {
            return Mono.error(ex);
        }
        if (!BlockException.isBlockException(ex)) {
            return Mono.error(ex);
        }
        return handleBlockedRequest(exchange, ex).flatMap(response -> writeResponse(response, exchange));
    }

    /**
     * Delegates to the block handler registered in {@code GatewayCallbackManager}.
     *
     * <p>NOTE(review): {@code GatewayCallbackManager} belongs to the Spring Cloud Gateway
     * adapter; for a plain WebFlux application {@code WebFluxCallbackManager.getBlockHandler()}
     * is presumably the intended source — confirm and switch the import if so.
     */
    private Mono<ServerResponse> handleBlockedRequest(ServerWebExchange exchange, Throwable throwable) {
        return GatewayCallbackManager.getBlockHandler().handleRequest(exchange, throwable);
    }
}
nacos配置
Data ID: webflux-example-flow
Group: DEFAULT_GROUP
配置格式: JSON
配置内容:
[{
"resource": "/getList",
"limitApp": "192.168.1.25",
"grade": 1,
"count": 1,
"strategy": 0,
"controlBehavior": 0,
"clusterMode": false
}
]
编写一个controller
package com.cloud.alibaba.controller;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.RestController;
import reactor.core.publisher.Flux;import java.util.ArrayList;
import java.util.List;/**
- @author 刘志强
@date 2020/11/19 13:21
*/
@RestController
public class WebFluxController {@GetMapping(“/getList”)
public FluxgetList() { List<String> list = new ArrayList<>();
list.add("aaaa");
list.add("bbbb");
list.add("ccc");
return Flux.fromIterable(list).take(2);
}
}
源码地址
WebFlux和SpringMvc对比
- 传统的基于Servlet的web框架(springmvc)在本质上都是线程阻塞的,每个连接占用一个线程,直到连接结束线程才会释放。
- WebFlux 是响应式异步web框架。可以使用一个线程处理更多的请求。
还没有评论,来说两句吧...