当前位置: 首页 > news >正文

(九)Spring Webflux

底层基于Netty实现的Web容器与请求/响应处理机制

参照:Spring WebFlux :: Spring Frameworkhttps://docs.spring.io/spring-framework/reference/6.0/web/webflux.html

一、组件对比

API功能

Servlet-阻塞式Web

WebFlux-响应式Web

前端控制器

DispatcherServlet

DispatcherHandler

处理器

Controller

WebHandler/Controller

请求、响应

ServletRequest、ServletResponse

ServerWebExchange:
ServerHttpRequest、ServerHttpResponse

过滤器

Filter(HttpFilter)

WebFilter

异常处理器

HandlerExceptionResolver

DispatchExceptionHandler

Web配置

@EnableWebMvc

@EnableWebFlux

自定义配置

WebMvcConfigurer

WebFluxConfigurer

返回结果

任意

Mono、Flux、任意

发送REST请求

RestTemplate

WebClient

Mono: 返回0|1 数据流

Flux:返回N数据流

二、依赖

        maven可以找到 Spring Webflux 的依赖,自行选择版本添加到pom.xml文件中。

<!-- https://mvnrepository.com/artifact/org.springframework.boot/spring-boot-starter-webflux -->
<dependency>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-webflux</artifactId>
    <version>3.4.4</version>
</dependency>

三、Reactor Core

1、HttpHandler、HttpServer

public class Main{

        public static void main(String[] args) throws IOException {
        //快速自己编写一个能处理请求的服务器

        //1、创建一个能处理Http请求的处理器。 参数:请求、响应; 返回值:Mono<Void>:代表处理完成的信号
        HttpHandler handler = (ServerHttpRequest request,
                                   ServerHttpResponse response)->{
            URI uri = request.getURI();
            System.out.println(Thread.currentThread()+"请求进来:"+uri);
            //编写请求处理的业务,给浏览器写一个内容 URL + "Hello~!"
//            response.getHeaders(); //获取响应头
//            response.getCookies(); //获取Cookie
//            response.getStatusCode(); //获取响应状态码;
//            response.bufferFactory(); //buffer工厂
//            response.writeWith() //把xxx写出去
//            response.setComplete(); //响应结束

            //数据的发布者:Mono<DataBuffer>、Flux<DataBuffer>

            //创建 响应数据的 DataBuffer
            DataBufferFactory factory = response.bufferFactory();

            //数据Buffer
            DataBuffer buffer = factory.wrap(new String(uri.toString() + " ==> Hello!").getBytes());


            // 需要一个 DataBuffer 的发布者
            return response.writeWith(Mono.just(buffer));
        };

        //2、启动一个服务器,监听8080端口,接受数据,拿到数据交给 HttpHandler 进行请求处理
        ReactorHttpHandlerAdapter adapter = new ReactorHttpHandlerAdapter(handler);


        //3、启动Netty服务器
        HttpServer.create()
                .host("localhost")
                .port(80)
                .handle(adapter) //用指定的处理器处理请求
                .bindNow(); //现在就绑定

        System.out.println("服务器启动完成....监听80,接受请求");
        System.in.read();
        System.out.println("服务器停止....");

    }
}

2、DispatcherHandler

SpringMVC: DispatcherServlet;

SpringWebFlux: DispatcherHandler

请求流程:

  • HandlerMapping:请求映射处理器; 保存每个请求由哪个方法进行处理
  • HandlerAdapter:处理器适配器;反射执行目标方法
  • HandlerResultHandler:处理器结果处理器

SpringMVC: DispatcherServlet 有一个 doDispatch() 方法,来处理所有请求;

WebFlux: DispatcherHandler 有一个 handle() 方法,来处理所有请求;

public Mono<Void> handle(ServerWebExchange exchange) { 
		if (this.handlerMappings == null) {
			return createNotFoundError();
		}
		if (CorsUtils.isPreFlightRequest(exchange.getRequest())) {
			return handlePreFlight(exchange);
		}
		return Flux.fromIterable(this.handlerMappings) //拿到所有的 handlerMappings
				.concatMap(mapping -> mapping.getHandler(exchange)) //找每一个mapping看谁能处理请求
				.next() //直接触发获取元素; 拿到流的第一个元素; 找到第一个能处理这个请求的handlerAdapter
				.switchIfEmpty(createNotFoundError()) //如果没拿到这个元素,则响应404错误;
				.onErrorResume(ex -> handleDispatchError(exchange, ex)) //异常处理,一旦前面发生异常,调用处理异常
				.flatMap(handler -> handleRequestWith(exchange, handler)); //调用方法处理请求,得到响应结果
	}
  • 1、请求和响应都封装在 ServerWebExchange 对象中,由handle方法进行处理
  • 2、如果没有任何的请求映射器; 直接返回一个: 创建一个未找到的错误; 404; 返回Mono.error;终结流
  • 3、跨域工具,是否跨域请求,跨域请求检查是否复杂跨域,需要预检请求;
  • 4、Flux流式操作,先找到HandlerMapping,再获取handlerAdapter,再用Adapter处理请求,期间的错误由onErrorResume触发回调进行处理;

源码中的核心两个:

  • handleRequestWith: 编写了handlerAdapter怎么处理请求
  • handleResult: String、User、ServerSendEvent、Mono、Flux ...

四、注解开发

1.方法入参

Method Arguments :: Spring Frameworkhttps://docs.spring.io/spring-framework/reference/6.0/web/webflux/controller/ann-methods/arguments.html

Controller method argument

Description

ServerWebExchange

封装了请求和响应对象的对象; 自定义获取数据、自定义响应

ServerHttpRequest, ServerHttpResponse

请求、响应

WebSession

访问Session对象

java.security.Principal

org.springframework.http.HttpMethod

请求方式

java.util.Locale

国际化

java.util.TimeZone + java.time.ZoneId

时区

@PathVariable

路径变量

@MatrixVariable

矩阵变量

@RequestParam

请求参数

@RequestHeader

请求头;

@CookieValue

获取Cookie

@RequestBody

获取请求体,Post、文件上传

HttpEntity<B>

封装后的请求对象

@RequestPart

获取文件上传的数据 multipart/form-data.

java.util.Map, org.springframework.ui.Model, and org.springframework.ui.ModelMap.

Map、Model、ModelMap

@ModelAttribute

Errors, BindingResult

数据校验,封装错误

SessionStatus + class-level @SessionAttributes

UriComponentsBuilder

For preparing a URL relative to the current request’s host, port, scheme, and context path. See URI Links.

@SessionAttribute

@RequestAttribute

转发请求的请求域数据

Any other argument

所有对象都能作为参数:
1、基本类型 ,等于标注@RequestParam
2、对象类型,等于标注 @ModelAttribute

2、方法返回值

sse和websocket区别:
● SSE:单工;请求过去以后,等待服务端源源不断的数据
● websocket:双工: 连接建立后,可以任何交互;

Controller method return value

Description

@ResponseBody

把响应数据写出去,如果是对象,可以自动转为json

HttpEntity<B>, ResponseEntity<B>

ResponseEntity:支持快捷自定义响应内容

HttpHeaders

没有响应内容,只有响应头

ErrorResponse

快速构建错误响应

ProblemDetail

SpringBoot3;

String

就是和以前的使用规则一样;
forward: 转发到一个地址
redirect: 重定向到一个地址
配合模板引擎

View

直接返回视图对象

java.util.Map, org.springframework.ui.Model

以前一样

@ModelAttribute

以前一样

Rendering

新版的页面跳转API; 不能标注 @ResponseBody 注解

void

仅代表响应完成信号

Flux<ServerSentEvent>, Observable<ServerSentEvent>, or other reactive type

使用 text/event-stream 完成SSE效果

Other return values

未在上述列表的其他返回值,都会当成给页面的数据;

五、文件上传

Multipart Content :: Spring Frameworkhttps://docs.spring.io/spring-framework/reference/6.0/web/webflux/controller/ann-methods/multipart-forms.html使用注解@RequestPart,可简便处理复杂的表单


@PostMapping("/")
public String handle(@RequestPart("host") String host, 
		@RequestPart("file") FilePart file) { 
	// ...
}

六、自定义Flux配置-WebFluxConfigurer

容器中注入这个类型的组件,重写底层逻辑

@Configuration
public class MyWebConfiguration {

    //配置底层
    @Bean
    public WebFluxConfigurer webFluxConfigurer(){

        return new WebFluxConfigurer() {
            @Override
            public void addCorsMappings(CorsRegistry registry) {
                registry.addMapping("/**")
                        .allowedHeaders("*")
                        .allowedMethods("*")
                        .allowedOrigins("localhost");
            }
        };
    }
}

七、过滤-Filter

@Component
public class MyWebFilter implements WebFilter {
    @Override
    public Mono<Void> filter(ServerWebExchange exchange, WebFilterChain chain) {
        ServerHttpRequest request = exchange.getRequest();
        ServerHttpResponse response = exchange.getResponse();

        System.out.println("请求处理放行到目标方法之前...");
        Mono<Void> filter = chain.filter(exchange); //放行


        //流一旦经过某个操作就会变成新流

        Mono<Void> voidMono = filter.doOnError(err -> {
                    System.out.println("目标方法异常以后...");
                }) // 目标方法发生异常后做事
                .doFinally(signalType -> {
                    System.out.println("目标方法执行以后...");
                });// 目标方法执行之后

        //上面执行不花时间。
        return voidMono; //看清楚返回的是谁!!!
    }
}

八、错误处理 

        这些是 Java 中常见的响应式编程错误处理方法,通过 Project Reactor 来处理流中的错误。你可以根据实际情况选择不同的错误处理策略,确保你的应用在出现问题时不会崩溃,而是能优雅地恢复或者处理错误。

1. 使用 onErrorResume 处理错误

onErrorResume 可以捕获错误并恢复一个备用值(如另一个流),避免整个流崩溃。

public class ErrorHandlingExample {
    public static void main(String[] args) {
        Mono.error(new RuntimeException("Something went wrong"))
            .onErrorResume(error -> {
                System.out.println("Caught error: " + error.getMessage());
                return Mono.just("Fallback data"); // 返回备用值
            })
            .subscribe(
                data -> System.out.println("Received: " + data),
                error -> System.out.println("This will not be executed"),
                () -> System.out.println("Completed")
            );
    }
}

//输出:
//Caught error: Something went wrong
//Received: Fallback data
//Completed

2. 使用 retry 重试逻辑

retry 操作符可以在发生错误时自动重试,通常用于临时故障的恢复。

public class RetryExample {
    public static void main(String[] args) {
        Mono<String> source = Mono.fromCallable(() -> {
            if (Math.random() > 0.5) {
                throw new RuntimeException("Temporary error");
            }
            return "Success!";
        });

        source
            .retry(3)  // 重试3次
            .doOnRetry(retrySignal -> System.out.println("Retry attempt: " + retrySignal.totalRetries()))
            .onErrorResume(error -> Mono.just("Failed after retries"))
            .subscribe(
                data -> System.out.println("Received: " + data),
                error -> System.out.println("Error: " + error.getMessage()),
                () -> System.out.println("Completed")
            );
    }
}

//输出:
//Retry attempt: 1
//Retry attempt: 2
//Received: Success!
//Completed

3. 使用 onErrorMap 转换错误

onErrorMap 允许你将捕获到的错误转换为不同类型的错误或自定义错误。

public class ErrorMappingExample {
    public static void main(String[] args) {

        Mono.error(new RuntimeException("Original error"))
            .onErrorMap(error -> new IllegalArgumentException("Mapped error: " + error.getMessage()))
            .subscribe(
                data -> System.out.println("Received: " + data),
                error -> System.out.println("Caught error: " + error.getMessage()),
                () -> System.out.println("Completed")
            );
    }
}


//输出:
//caught error: Mapped error: Original error

4. 使用 doOnError 进行额外的错误处理

doOnError 可以用来在流中出现错误时执行额外的副作用,比如日志记录。

public class DoOnErrorExample {
    public static void main(String[] args) {

        Mono.error(new RuntimeException("Something went wrong"))
            .doOnError(error -> System.out.println("Error occurred: " + error.getMessage()))
            .onErrorReturn("Fallback data") // 错误时返回备用数据
            .subscribe(
                data -> System.out.println("Received: " + data),
                error -> System.out.println("This will not be executed"),
                () -> System.out.println("Completed")
            );
    }
}


//输出:
//Error occurred: Something went wrong
//Received: Fallback data
//Completed

5. 使用 doFinally 清理资源

doFinally 会在流完成时执行,无论是正常完成还是发生错误,常用于资源清理。

public class DoFinallyExample {
    public static void main(String[] args) {

         Mono.just("Hello")
            .doFinally(signalType -> System.out.println("Cleanup after signal: " + signalType))
            .subscribe(
                data -> System.out.println("Received: " + data),
                error -> System.out.println("Error: " + error.getMessage()),
                () -> System.out.println("Completed")
            );
    }
}

//输出:
//Received: Hello
//Cleanup after signal: COMPLETED
//Completed

6. 使用 onErrorReturn 返回默认值

onErrorReturn 可以在发生错误时返回一个默认的值,避免流崩溃。

public class OnErrorReturnExample {
    public static void main(String[] args) {

        Mono.error(new RuntimeException("An error occurred"))
            .onErrorReturn("Default value")  // 错误时返回默认值
            .subscribe(
                data -> System.out.println("Received: " + data),
                error -> System.out.println("This will not be executed"),
                () -> System.out.println("Completed")
            );
    }
}


//输出:
//Received: Default value
//Completed

什么问题都可以评论区留言,看见都会回复的

如果你觉得本篇文章对你有所帮助的,把“文章有帮助的”打在评论区

多多支持吧!!!

点赞加藏评论,是对小编莫大的肯定。抱拳了!

相关文章:

  • 深度学习 Note.1
  • 海康HTTP监听报警事件数据
  • 从 MySQL 到时序数据库 TDengine:Zendure 如何实现高效储能数据管理?
  • 破局离散制造:主数据管理驱动数字化转型的实践与启示
  • Rabbitmq消息被消费时抛异常,进入Unacked 状态,进而导致消费者不断尝试消费(上)
  • BC93 公务员面试
  • 16-CSS3新增选择器
  • 从子查询到连接:提升数据库查询性能的 7 种方法
  • 生成式AI课程 比较好
  • C++:重载操作符
  • APM-基于Grafana生态以及OTLP协议的Java轻量级日志监控系统
  • Qt信号槽函数
  • springBoot中雪花算术法
  • 导出sql命令
  • 融合YOLO11与行为树的人机协作智能框架:动态工效学优化与自适应安全决策
  • LabVIEW医疗设备备用电源实时监控系统
  • Activiti工作流
  • CH32V208GBU6沁恒协议栈BUG:在主机Write的同一包notify会造成主机一直Write不成功;最终还是用心跳包来解决
  • 进程、线程和协程
  • Leecode Hot50
  • 人民日报社论:做新时代挺膺担当的奋斗者
  • 孟泽:我们简化了历史,因此也简化了人性
  • 持续更新丨伊朗内政部长:港口爆炸已致14人死亡
  • 手机号旧机主信用卡欠款、新机主被催收骚扰四年,光大银行济南分行回应
  • 王毅会见乌兹别克斯坦外长赛义多夫
  • 铜钴巨头洛阳钼业一季度净利润同比大增九成,最新宣布首度进军黄金矿产