spring响应式编程系列:总体流程
目录
示例
程序流程
just
subscribe
new LambdaMonoSubscriber
MonoJust.subscribe
new Operators.ScalarSubscription
onSubscribe
request
onNext
时序图
类图
数据发布者
MonoJust
数据订阅者
LambdaSubscriber
订阅的消息体
ScalarSubscription
想要了解响应式编程的总体流程,只要做到真正吃透一个简单的示例即可。
如下所示:
示例
首先,通过调用Mono.just创建一个单元素的数据发布者(Publisher);
然后,通过调用mono.subscribe订阅数据发布者(Publisher)发布的数据。
如下所示:
// 创建一个包含数据的 Mono |
程序流程
点击Mono.just,如下所示:
just
public static <T> Mono<T> just(T data) { return onAssembly(new MonoJust(data)); } |
在这里,直接new一个MonoJust对象并返回。
点击示例里的mono.subscribe,如下所示:
subscribe
public abstract class Mono<T> implements CorePublisher<T> { ... ... public final Disposable subscribe(@Nullable Consumer<? super T> consumer, @Nullable Consumer<? super Throwable> errorConsumer, @Nullable Runnable completeConsumer, @Nullable Context initialContext) { return (Disposable)this.subscribeWith(new LambdaMonoSubscriber(consumer, errorConsumer, completeConsumer, (Consumer)null, initialContext)); } |
在这里,将示例里subscribe的参数作为LambdaMonoSubscriber的构造参数,然后new一个LambdaMonoSubscriber对象。
LambdaMonoSubscriber对象的初始化参数,如下所示:
new LambdaMonoSubscriber
final class LambdaMonoSubscriber<T> implements InnerConsumer<T>, Disposable { final Consumer<? super T> consumer; final Consumer<? super Throwable> errorConsumer; final Runnable completeConsumer; final Consumer<? super Subscription> subscriptionConsumer; final Context initialContext; volatile Subscription subscription; ... ... LambdaMonoSubscriber(@Nullable Consumer<? super T> consumer, @Nullable Consumer<? super Throwable> errorConsumer, @Nullable Runnable completeConsumer, @Nullable Consumer<? super Subscription> subscriptionConsumer, @Nullable Context initialContext) { this.consumer = consumer; this.errorConsumer = errorConsumer; this.completeConsumer = completeConsumer; this.subscriptionConsumer = subscriptionConsumer; this.initialContext = initialContext == null ? Context.empty() : initialContext; } |
MonoJust.subscribe
final class MonoJust<T> extends Mono<T> implements ScalarCallable<T>, Fuseable, SourceProducer<T> { ... ... public void subscribe(CoreSubscriber<? super T> actual) { actual.onSubscribe(Operators.scalarSubscription(actual, this.value)); } |
在这里,来到了MonoJust对象的subscribe方法,该方法调用了LambdaMonoSubscriber对象的onSubscribe方法;
同时,new一个Operators.ScalarSubscription对象,该对象封装了LambdaMonoSubscriber对象和数据发布者MonoJust发布的数据。
如下所示:
new Operators.ScalarSubscription
public static <T> Subscription scalarSubscription(CoreSubscriber<? super T> subscriber, T value, String stepName) { return new Operators.ScalarSubscription(subscriber, value, stepName); } |
点击actual.onSubscribe,如下所示:
onSubscribe
final class LambdaMonoSubscriber<T> implements InnerConsumer<T>, Disposable { ... ... public final void onSubscribe(Subscription s) { if (Operators.validate(this.subscription, s)) { this.subscription = s; if (this.subscriptionConsumer != null) { try { this.subscriptionConsumer.accept(s); } catch (Throwable var3) { Exceptions.throwIfFatal(var3); s.cancel(); this.onError(var3); } } else { s.request(9223372036854775807L); } } } |
在这里,LambdaMonoSubscriber对象调用了Operators.ScalarSubscription对象的request方法。
如下所示:
request
static final class ScalarSubscription<T> implements SynchronousSubscription<T>, InnerProducer<T> { public void request(long n) { if (Operators.validate(n) && ONCE.compareAndSet(this, 0, 1)) { Subscriber<? super T> a = this.actual; a.onNext(this.value); if (this.once != 2) { a.onComplete(); } } } |
在这里,Operators.ScalarSubscription对象又调用了LambdaMonoSubscriber对象的onNext方法。
LambdaMonoSubscriber对象的onNext方法如下所示:
onNext
final class LambdaMonoSubscriber<T> implements InnerConsumer<T>, Disposable { public final void onNext(T x) { Subscription s = (Subscription)S.getAndSet(this, Operators.cancelledSubscription()); if (s == Operators.cancelledSubscription()) { Operators.onNextDropped(x, this.initialContext); } else { if (this.consumer != null) { try { this.consumer.accept(x); } catch (Throwable var5) { Exceptions.throwIfFatal(var5); s.cancel(); this.doError(var5); } } if (this.completeConsumer != null) { try { this.completeConsumer.run(); } catch (Throwable var4) { Operators.onErrorDropped(var4, this.initialContext); } } } } |
终于,在这里,调用了示例里subscribe()方法的回调函数了。
时序图
【说明】
- Mono和MonoJust是数据发布者,LambdaMonoSubscriber是数据消费者,ScalarSubscription是订阅的消息;
- 类的设计还是比较清晰的,就是方法的调用显示有点绕。
- 数据发布者,提供了just方法来生成数据发布者(Publisher);
- 数据订阅者,提供了onSubscribe和onNext方法来响应订阅事件和读取数据;
- 订阅的消息体,封装了数据订阅者和数据发布发布的数据,并且提供了request方法用来处理数据。
- 使用了观察者设计模式:LambdaMonoSubscriber是观察者模式中的观察者(Observer),它订阅(subscribe)一个发布者(MonoJust),MonoJust是观察者模式中的主题(Subject),它负责通知所有的 Subscriber。
类图
数据发布者
MonoJust
【说明】
- Publisher
定义了接口:void subscribe(Subscriber<? super T> var1)。
- CorePublisher
定义了接口:void subscribe(CoreSubscriber<? super T> subscriber)。
- Mono
是一个抽象类,实现了数据发布者通用的各种功能。
比如:使用了工厂方法设计模式来创建诸如MonoJust、MonoCreate、MonoDefer、MonoError等各种具体的数据发布者。
- MonoJust
是一个特定的数据发布者(Publisher),实现了接口void subscribe(CoreSubscriber<? super T> actual)。
数据订阅者
LambdaSubscriber
【说明】
- Subscriber
定义了如下接口:onSubscribe、onNext、onError、onComplete。
- CoreSubscriber
定义了如下接口:onSubscribe
- LambdaMonoSubscriber
关联了consumer、errorConsumer、completeConsumer、subscriptionConsumer这些对象,以完成订阅相关的各种操作。
订阅的消息体
ScalarSubscription
【说明】
- Subscription
提供了如下接口:void request(long var1)、void cancel();
- ScalarSubscription
封装了数据订阅者和数据发布者发布的数据。