当前位置: 首页 > news >正文

spring响应式编程系列:总体流程

目录

示例

程序流程

just

subscribe

new LambdaMonoSubscriber

​​​​​​​MonoJust.subscribe

​​​​​​​new Operators.ScalarSubscription  

​​​​​​​onSubscribe

​​​​​​​request

​​​​​​​onNext

时序图

类图

数据发布者

MonoJust

数据订阅者

LambdaSubscriber

订阅的消息体

ScalarSubscription


       

        想要了解响应式编程的总体流程,只要做到真正吃透一个简单的示例即可。

        如下所示:

示例

        首先,通过调用Mono.just创建一个单元素的数据发布者(Publisher);

        然后,通过调用mono.subscribe订阅数据发布者(Publisher)发布的数据。

        如下所示:

// 创建一个包含数据的 Mono
Mono<String> mono = Mono.just("Hello, Reactive World!");
// 订阅并消费 Mono
mono.subscribe(System.out::println);

程序流程

        点击Mono.just,如下所示:

​​​​​​​just

public static <T> Mono<T> just(T data) {

        return onAssembly(new MonoJust(data));

    }

        在这里,直接new一个MonoJust对象并返回。

        点击示例里的mono.subscribe,如下所示:

subscribe

public abstract class Mono<T> implements CorePublisher<T> {

    ... ...

    public final Disposable subscribe(@Nullable Consumer<? super T> consumer, @Nullable Consumer<? super Throwable> errorConsumer, @Nullable Runnable completeConsumer, @Nullable Context initialContext) {

        return (Disposable)this.subscribeWith(new LambdaMonoSubscriber(consumer, errorConsumer, completeConsumer, (Consumer)null, initialContext));

    }

      在这里,将示例里subscribe的参数作为LambdaMonoSubscriber的构造参数,然后new一个LambdaMonoSubscriber对象。

        LambdaMonoSubscriber对象的初始化参数,如下所示:

​​​​​​​new LambdaMonoSubscriber

final class LambdaMonoSubscriber<T> implements InnerConsumer<T>, Disposable {

    final Consumer<? super T> consumer;

    final Consumer<? super Throwable> errorConsumer;

    final Runnable completeConsumer;

    final Consumer<? super Subscription> subscriptionConsumer;

    final Context initialContext;

    volatile Subscription subscription;

    ... ...

    LambdaMonoSubscriber(@Nullable Consumer<? super T> consumer, @Nullable Consumer<? super Throwable> errorConsumer, @Nullable Runnable completeConsumer, @Nullable Consumer<? super Subscription> subscriptionConsumer, @Nullable Context initialContext) {

        this.consumer = consumer;

        this.errorConsumer = errorConsumer;

        this.completeConsumer = completeConsumer;

        this.subscriptionConsumer = subscriptionConsumer;

        this.initialContext = initialContext == null ? Context.empty() : initialContext;

    }

​​​​​​​MonoJust.subscribe

final class MonoJust<T> extends Mono<T> implements ScalarCallable<T>, Fuseable, SourceProducer<T> {

    ... ...

public void subscribe(CoreSubscriber<? super T> actual) {

        actual.onSubscribe(Operators.scalarSubscription(actual, this.value));

    }

       在这里,来到了MonoJust对象的subscribe方法,该方法调用了LambdaMonoSubscriber对象的onSubscribe方法;

        同时,new一个Operators.ScalarSubscription对象,该对象封装了LambdaMonoSubscriber对象和数据发布者MonoJust发布的数据。

        如下所示:

​​​​​​​new Operators.ScalarSubscription  

public static <T> Subscription scalarSubscription(CoreSubscriber<? super T> subscriber, T value, String stepName) {

        return new Operators.ScalarSubscription(subscriber, value, stepName);

    }

        点击actual.onSubscribe,如下所示:

​​​​​​​onSubscribe

final class LambdaMonoSubscriber<T> implements InnerConsumer<T>, Disposable {

    ... ...

    public final void onSubscribe(Subscription s) {

        if (Operators.validate(this.subscription, s)) {

            this.subscription = s;

            if (this.subscriptionConsumer != null) {

                try {

                    this.subscriptionConsumer.accept(s);

                } catch (Throwable var3) {

                    Exceptions.throwIfFatal(var3);

                    s.cancel();

                    this.onError(var3);

                }

            } else {

                s.request(9223372036854775807L);

            }

        }

    }

      在这里,LambdaMonoSubscriber对象调用了Operators.ScalarSubscription对象的request方法。

        如下所示:

​​​​​​​request

static final class ScalarSubscription<T> implements SynchronousSubscription<T>, InnerProducer<T> {

public void request(long n) {

            if (Operators.validate(n) && ONCE.compareAndSet(this, 0, 1)) {

                Subscriber<? super T> a = this.actual;

                a.onNext(this.value);

                if (this.once != 2) {

                    a.onComplete();

                }

            }

        }

        在这里,Operators.ScalarSubscription对象又调用了LambdaMonoSubscriber对象的onNext方法。

        LambdaMonoSubscriber对象的onNext方法如下所示:

​​​​​​​onNext

final class LambdaMonoSubscriber<T> implements InnerConsumer<T>, Disposable {

public final void onNext(T x) {

        Subscription s = (Subscription)S.getAndSet(this, Operators.cancelledSubscription());

        if (s == Operators.cancelledSubscription()) {

            Operators.onNextDropped(x, this.initialContext);

        } else {

            if (this.consumer != null) {

                try {

                    this.consumer.accept(x);

                } catch (Throwable var5) {

                    Exceptions.throwIfFatal(var5);

                    s.cancel();

                    this.doError(var5);

                }

            }

            if (this.completeConsumer != null) {

                try {

                    this.completeConsumer.run();

                } catch (Throwable var4) {

                    Operators.onErrorDropped(var4, this.initialContext);

                }

            }

        }

}

        终于,在这里,调用了示例里subscribe()方法的回调函数了。

时序图

【说明】

  1. Mono和MonoJust是数据发布者,LambdaMonoSubscriber是数据消费者,ScalarSubscription是订阅的消息;
  2. 类的设计还是比较清晰的,就是方法的调用显示有点绕。
  3. 数据发布者,提供了just方法来生成数据发布者(Publisher);
  4. 数据订阅者,提供了onSubscribe和onNext方法来响应订阅事件和读取数据;
  5. 订阅的消息体,封装了数据订阅者和数据发布发布的数据,并且提供了request方法用来处理数据。
  6. 使用了观察者设计模式:LambdaMonoSubscriber是观察者模式中的观察者(Observer),它订阅(subscribe)一个发布者(MonoJust),MonoJust是观察者模式中的主题(Subject),它负责通知所有的 Subscriber。

类图

数据发布者

MonoJust

【说明】

  • Publisher

    定义了接口:void subscribe(Subscriber<? super T> var1)。

  • CorePublisher

    定义了接口:void subscribe(CoreSubscriber<? super T> subscriber)。

  • Mono

    是一个抽象类,实现了数据发布者通用的各种功能。

比如:使用了工厂方法设计模式来创建诸如MonoJust、MonoCreate、MonoDefer、MonoError等各种具体的数据发布者。

  • MonoJust

    是一个特定的数据发布者(Publisher),实现了接口void subscribe(CoreSubscriber<? super T> actual)。

数据订阅者

LambdaSubscriber

【说明】

  • Subscriber

    定义了如下接口:onSubscribe、onNext、onError、onComplete。

  • CoreSubscriber

    定义了如下接口:onSubscribe

  • LambdaMonoSubscriber

    关联了consumer、errorConsumer、completeConsumer、subscriptionConsumer这些对象,以完成订阅相关的各种操作。

订阅的消息体

ScalarSubscription

【说明】

  • Subscription

    提供了如下接口:void request(long var1)、void cancel();

  • ScalarSubscription

    封装了数据订阅者和数据发布者发布的数据。

相关文章:

  • Git-使用教程(新手向)
  • MCP Server驱动传统SaaS智能化转型:从工具堆叠到AI Agent生态重构,基于2025年技术演进与产业实践
  • 【mysql】mysql疑难问题:实际场景解释什么是排它锁 当前读 快照读
  • 【Linux】进程概念(二):PCB,ps 和 fork
  • excel解析图片pdf附件不怕
  • 一.学习python工具准备
  • spring cloud gateway前面是否必须要有个nginx
  • ARINC818协议(三)
  • CUDA Driver 安装与升级(CentOS 7)
  • 前端:uniapp框架中<scroll-view>r如何控制元素进行局部滚动
  • rancher 网红无法上传大视频,小于2m可以正常上传
  • vmware17 虚拟机 ubuntu22.04 桥接模式,虚拟机无法接收组播消息
  • 【AI插件开发】Notepad++ AI插件开发实践:支持配置界面
  • OpenBMC:BmcWeb log输出
  • 消息中间件——RocketMQ(二)
  • 笔记本电脑屏幕闪烁是怎么回事 原因及解决方法
  • shiro使用
  • 汽车行驶工况特征参数:从“速度曲线”到“驾驶DNA”的硬核解码
  • 原型模式详解及c++代码实现(以自动驾驶感知场景为例)
  • 如何使用Python进行自动化的系统管理?
  • 韩国新一届总统选举将于6月3日举行,民调显示李在明继续领跑
  • 李家超将率团访问浙江
  • 西藏艺术来到黄浦江畔,“隐秘之门”艺术展外滩三号开幕
  • “站在亚洲实现整体振兴的新起点上”——习近平主席对越南、马来西亚、柬埔寨进行国事访问纪实
  • 碎片化时代如何阅读?巴金图书馆推出世界读书日系列活动
  • 梅德韦杰夫:如果欧盟和美国 “撒手不管”,俄罗斯会更快解决俄乌冲突