(九)Spring Webflux
底层基于Netty实现的Web容器与请求/响应处理机制
参照:Spring WebFlux :: Spring Frameworkhttps://docs.spring.io/spring-framework/reference/6.0/web/webflux.html
一、组件对比
API功能 | Servlet-阻塞式Web | WebFlux-响应式Web |
前端控制器 | DispatcherServlet | DispatcherHandler |
处理器 | Controller | WebHandler/Controller |
请求、响应 | ServletRequest、ServletResponse | ServerWebExchange: |
过滤器 | Filter(HttpFilter) | WebFilter |
异常处理器 | HandlerExceptionResolver | DispatchExceptionHandler |
Web配置 | @EnableWebMvc | @EnableWebFlux |
自定义配置 | WebMvcConfigurer | WebFluxConfigurer |
返回结果 | 任意 | Mono、Flux、任意 |
发送REST请求 | RestTemplate | WebClient |
Mono: 返回0|1 数据流
Flux:返回N数据流
二、依赖
maven可以找到 Spring Webflux 的依赖,自行选择版本添加到pom.xml文件中。
<!-- https://mvnrepository.com/artifact/org.springframework.boot/spring-boot-starter-webflux -->
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-webflux</artifactId>
<version>3.4.4</version>
</dependency>
三、Reactor Core
1、HttpHandler、HttpServer
public class Main{
public static void main(String[] args) throws IOException {
//快速自己编写一个能处理请求的服务器
//1、创建一个能处理Http请求的处理器。 参数:请求、响应; 返回值:Mono<Void>:代表处理完成的信号
HttpHandler handler = (ServerHttpRequest request,
ServerHttpResponse response)->{
URI uri = request.getURI();
System.out.println(Thread.currentThread()+"请求进来:"+uri);
//编写请求处理的业务,给浏览器写一个内容 URL + "Hello~!"
// response.getHeaders(); //获取响应头
// response.getCookies(); //获取Cookie
// response.getStatusCode(); //获取响应状态码;
// response.bufferFactory(); //buffer工厂
// response.writeWith() //把xxx写出去
// response.setComplete(); //响应结束
//数据的发布者:Mono<DataBuffer>、Flux<DataBuffer>
//创建 响应数据的 DataBuffer
DataBufferFactory factory = response.bufferFactory();
//数据Buffer
DataBuffer buffer = factory.wrap(new String(uri.toString() + " ==> Hello!").getBytes());
// 需要一个 DataBuffer 的发布者
return response.writeWith(Mono.just(buffer));
};
//2、启动一个服务器,监听8080端口,接受数据,拿到数据交给 HttpHandler 进行请求处理
ReactorHttpHandlerAdapter adapter = new ReactorHttpHandlerAdapter(handler);
//3、启动Netty服务器
HttpServer.create()
.host("localhost")
.port(80)
.handle(adapter) //用指定的处理器处理请求
.bindNow(); //现在就绑定
System.out.println("服务器启动完成....监听80,接受请求");
System.in.read();
System.out.println("服务器停止....");
}
}
2、DispatcherHandler
SpringMVC: DispatcherServlet;
SpringWebFlux: DispatcherHandler
请求流程:
- HandlerMapping:请求映射处理器; 保存每个请求由哪个方法进行处理
- HandlerAdapter:处理器适配器;反射执行目标方法
- HandlerResultHandler:处理器结果处理器;
SpringMVC: DispatcherServlet 有一个 doDispatch() 方法,来处理所有请求;
WebFlux: DispatcherHandler 有一个 handle() 方法,来处理所有请求;
public Mono<Void> handle(ServerWebExchange exchange) {
if (this.handlerMappings == null) {
return createNotFoundError();
}
if (CorsUtils.isPreFlightRequest(exchange.getRequest())) {
return handlePreFlight(exchange);
}
return Flux.fromIterable(this.handlerMappings) //拿到所有的 handlerMappings
.concatMap(mapping -> mapping.getHandler(exchange)) //找每一个mapping看谁能处理请求
.next() //直接触发获取元素; 拿到流的第一个元素; 找到第一个能处理这个请求的handlerAdapter
.switchIfEmpty(createNotFoundError()) //如果没拿到这个元素,则响应404错误;
.onErrorResume(ex -> handleDispatchError(exchange, ex)) //异常处理,一旦前面发生异常,调用处理异常
.flatMap(handler -> handleRequestWith(exchange, handler)); //调用方法处理请求,得到响应结果
}
- 1、请求和响应都封装在 ServerWebExchange 对象中,由handle方法进行处理
- 2、如果没有任何的请求映射器; 直接返回一个: 创建一个未找到的错误; 404; 返回Mono.error;终结流
- 3、跨域工具,是否跨域请求,跨域请求检查是否复杂跨域,需要预检请求;
- 4、Flux流式操作,先找到HandlerMapping,再获取handlerAdapter,再用Adapter处理请求,期间的错误由onErrorResume触发回调进行处理;
源码中的核心两个:
- handleRequestWith: 编写了handlerAdapter怎么处理请求
- handleResult: String、User、ServerSendEvent、Mono、Flux ...
四、注解开发
1.方法入参
Method Arguments :: Spring Frameworkhttps://docs.spring.io/spring-framework/reference/6.0/web/webflux/controller/ann-methods/arguments.html
Controller method argument | Description |
ServerWebExchange | 封装了请求和响应对象的对象; 自定义获取数据、自定义响应 |
ServerHttpRequest, ServerHttpResponse | 请求、响应 |
WebSession | 访问Session对象 |
java.security.Principal | |
org.springframework.http.HttpMethod | 请求方式 |
java.util.Locale | 国际化 |
java.util.TimeZone + java.time.ZoneId | 时区 |
@PathVariable | 路径变量 |
@MatrixVariable | 矩阵变量 |
@RequestParam | 请求参数 |
@RequestHeader | 请求头; |
@CookieValue | 获取Cookie |
@RequestBody | 获取请求体,Post、文件上传 |
HttpEntity<B> | 封装后的请求对象 |
@RequestPart | 获取文件上传的数据 multipart/form-data. |
java.util.Map, org.springframework.ui.Model, and org.springframework.ui.ModelMap. | Map、Model、ModelMap |
@ModelAttribute | |
Errors, BindingResult | 数据校验,封装错误 |
SessionStatus + class-level @SessionAttributes | |
UriComponentsBuilder | For preparing a URL relative to the current request’s host, port, scheme, and context path. See URI Links. |
@SessionAttribute | |
@RequestAttribute | 转发请求的请求域数据 |
Any other argument | 所有对象都能作为参数: |
2、方法返回值
sse和websocket区别:
● SSE:单工;请求过去以后,等待服务端源源不断的数据
● websocket:双工: 连接建立后,可以任何交互;
Controller method return value | Description |
@ResponseBody | 把响应数据写出去,如果是对象,可以自动转为json |
HttpEntity<B>, ResponseEntity<B> | ResponseEntity:支持快捷自定义响应内容 |
HttpHeaders | 没有响应内容,只有响应头 |
ErrorResponse | 快速构建错误响应 |
ProblemDetail | SpringBoot3; |
String | 就是和以前的使用规则一样; |
View | 直接返回视图对象 |
java.util.Map, org.springframework.ui.Model | 以前一样 |
@ModelAttribute | 以前一样 |
Rendering | 新版的页面跳转API; 不能标注 @ResponseBody 注解 |
void | 仅代表响应完成信号 |
Flux<ServerSentEvent>, Observable<ServerSentEvent>, or other reactive type | 使用 text/event-stream 完成SSE效果 |
Other return values | 未在上述列表的其他返回值,都会当成给页面的数据; |
五、文件上传
Multipart Content :: Spring Frameworkhttps://docs.spring.io/spring-framework/reference/6.0/web/webflux/controller/ann-methods/multipart-forms.html使用注解@RequestPart,可简便处理复杂的表单
@PostMapping("/")
public String handle(@RequestPart("host") String host,
@RequestPart("file") FilePart file) {
// ...
}
六、自定义Flux配置-WebFluxConfigurer
容器中注入这个类型的组件,重写底层逻辑
@Configuration
public class MyWebConfiguration {
//配置底层
@Bean
public WebFluxConfigurer webFluxConfigurer(){
return new WebFluxConfigurer() {
@Override
public void addCorsMappings(CorsRegistry registry) {
registry.addMapping("/**")
.allowedHeaders("*")
.allowedMethods("*")
.allowedOrigins("localhost");
}
};
}
}
七、过滤-Filter
@Component
public class MyWebFilter implements WebFilter {
@Override
public Mono<Void> filter(ServerWebExchange exchange, WebFilterChain chain) {
ServerHttpRequest request = exchange.getRequest();
ServerHttpResponse response = exchange.getResponse();
System.out.println("请求处理放行到目标方法之前...");
Mono<Void> filter = chain.filter(exchange); //放行
//流一旦经过某个操作就会变成新流
Mono<Void> voidMono = filter.doOnError(err -> {
System.out.println("目标方法异常以后...");
}) // 目标方法发生异常后做事
.doFinally(signalType -> {
System.out.println("目标方法执行以后...");
});// 目标方法执行之后
//上面执行不花时间。
return voidMono; //看清楚返回的是谁!!!
}
}
八、错误处理
这些是 Java 中常见的响应式编程错误处理方法,通过 Project Reactor 来处理流中的错误。你可以根据实际情况选择不同的错误处理策略,确保你的应用在出现问题时不会崩溃,而是能优雅地恢复或者处理错误。
1. 使用 onErrorResume
处理错误
onErrorResume
可以捕获错误并恢复一个备用值(如另一个流),避免整个流崩溃。
public class ErrorHandlingExample {
public static void main(String[] args) {
Mono.error(new RuntimeException("Something went wrong"))
.onErrorResume(error -> {
System.out.println("Caught error: " + error.getMessage());
return Mono.just("Fallback data"); // 返回备用值
})
.subscribe(
data -> System.out.println("Received: " + data),
error -> System.out.println("This will not be executed"),
() -> System.out.println("Completed")
);
}
}
//输出:
//Caught error: Something went wrong
//Received: Fallback data
//Completed
2. 使用 retry
重试逻辑
retry
操作符可以在发生错误时自动重试,通常用于临时故障的恢复。
public class RetryExample {
public static void main(String[] args) {
Mono<String> source = Mono.fromCallable(() -> {
if (Math.random() > 0.5) {
throw new RuntimeException("Temporary error");
}
return "Success!";
});
source
.retry(3) // 重试3次
.doOnRetry(retrySignal -> System.out.println("Retry attempt: " + retrySignal.totalRetries()))
.onErrorResume(error -> Mono.just("Failed after retries"))
.subscribe(
data -> System.out.println("Received: " + data),
error -> System.out.println("Error: " + error.getMessage()),
() -> System.out.println("Completed")
);
}
}
//输出:
//Retry attempt: 1
//Retry attempt: 2
//Received: Success!
//Completed
3. 使用 onErrorMap
转换错误
onErrorMap
允许你将捕获到的错误转换为不同类型的错误或自定义错误。
public class ErrorMappingExample {
public static void main(String[] args) {
Mono.error(new RuntimeException("Original error"))
.onErrorMap(error -> new IllegalArgumentException("Mapped error: " + error.getMessage()))
.subscribe(
data -> System.out.println("Received: " + data),
error -> System.out.println("Caught error: " + error.getMessage()),
() -> System.out.println("Completed")
);
}
}
//输出:
//caught error: Mapped error: Original error
4. 使用 doOnError
进行额外的错误处理
doOnError
可以用来在流中出现错误时执行额外的副作用,比如日志记录。
public class DoOnErrorExample {
public static void main(String[] args) {
Mono.error(new RuntimeException("Something went wrong"))
.doOnError(error -> System.out.println("Error occurred: " + error.getMessage()))
.onErrorReturn("Fallback data") // 错误时返回备用数据
.subscribe(
data -> System.out.println("Received: " + data),
error -> System.out.println("This will not be executed"),
() -> System.out.println("Completed")
);
}
}
//输出:
//Error occurred: Something went wrong
//Received: Fallback data
//Completed
5. 使用 doFinally
清理资源
doFinally
会在流完成时执行,无论是正常完成还是发生错误,常用于资源清理。
public class DoFinallyExample {
public static void main(String[] args) {
Mono.just("Hello")
.doFinally(signalType -> System.out.println("Cleanup after signal: " + signalType))
.subscribe(
data -> System.out.println("Received: " + data),
error -> System.out.println("Error: " + error.getMessage()),
() -> System.out.println("Completed")
);
}
}
//输出:
//Received: Hello
//Cleanup after signal: COMPLETED
//Completed
6. 使用 onErrorReturn
返回默认值
onErrorReturn
可以在发生错误时返回一个默认的值,避免流崩溃。
public class OnErrorReturnExample {
public static void main(String[] args) {
Mono.error(new RuntimeException("An error occurred"))
.onErrorReturn("Default value") // 错误时返回默认值
.subscribe(
data -> System.out.println("Received: " + data),
error -> System.out.println("This will not be executed"),
() -> System.out.println("Completed")
);
}
}
//输出:
//Received: Default value
//Completed
什么问题都可以评论区留言,看见都会回复的
如果你觉得本篇文章对你有所帮助的,把“文章有帮助的”打在评论区
多多支持吧!!!
点赞加藏评论,是对小编莫大的肯定。抱拳了!