当前位置: 首页 > news >正文

spring响应式编程系列:异步生产数据

目录

示例

大致流程

create

new MonoCreate

subscribe

new LambdaMonoSubscriber

monoCreate.subscribe

accept

success

onNext

时序图

类图

数据发布者

MonoCreate

数据订阅者

LambdaMonoSubscriber

订阅的消息体

DefaultMonoSink

        本篇文章我们来研究如何将现有异步 API(如回调式接口)适配到 Reactor 的响应式流中。

        默认情况下,Mono.create的代码块执行在订阅时的线程上,但如果在该代码块中启动其他线程或使用异步API,那么数据生产就会变成异步的。示例如下所示:

示例

Mono<String> mono = Mono.create(sink -> {
    // 模拟一个异步API操作
    new Thread(() -> {
        try {
            Thread.sleep(1000); // 模拟耗时操作
            log.info("success");
            sink.success("Hello, World!"); // 成功时发射数据
        } catch (InterruptedException e) {
            sink.error(e); // 发生错误时发射错误信号
        }
    }).start();
});
log.info("main start");
mono.subscribe(x -> log.info("main finish"));
Thread.sleep(5000);

        在这里,通过Mono.create模拟一个异步API操作,API操作成功后,调用sink.success("Hello, World!")进行数据发布者发送数据,从而触发数据的订阅。

        接下来,让我们一起看看程序的流程是怎么处理的。

        点击create()方法,如下所示:

大致流程

create

public static <T> Mono<T> create(Consumer<MonoSink<T>> callback) {
    return onAssembly(new MonoCreate<>(callback));
}

        在这里,new一个MonoCreate对象并返回。

        点击MonoCreate,如下所示:

new MonoCreate

final class MonoCreate<T> extends Mono<T> implements SourceProducer<T> {
   static final Disposable TERMINATED = OperatorDisposables.DISPOSED;
   static final Disposable CANCELLED = Disposables.disposed();
   final Consumer<MonoSink<T>> callback;
   MonoCreate(Consumer<MonoSink<T>> callback) {
      this.callback = callback;
   }

        在这里,将create()方法的回调接口参数赋值给callback属性。因此,Mono.create的参数就作为数据发布者的一个属性信息了。

        点击示例里的mono.subscribe(),如下所示:

subscribe

public final Disposable subscribe(
      @Nullable Consumer<? super T> consumer,
      @Nullable Consumer<? super Throwable> errorConsumer,
      @Nullable Runnable completeConsumer,
      @Nullable Context initialContext) {
   return subscribeWith(new LambdaMonoSubscriber<>(consumer, errorConsumer,
         completeConsumer, null, initialContext));
}

        在这里,new一个LambdaMonoSubscriber对象,如下所示:

new LambdaMonoSubscriber

LambdaMonoSubscriber(@Nullable Consumer<? super T> consumer,
      @Nullable Consumer<? super Throwable> errorConsumer,
      @Nullable Runnable completeConsumer,
      @Nullable Consumer<? super Subscription> subscriptionConsumer,
      @Nullable Context initialContext) {
   this.consumer = consumer;
   this.errorConsumer = errorConsumer;
   this.completeConsumer = completeConsumer;
   this.subscriptionConsumer = subscriptionConsumer;
   this.initialContext = initialContext == null ? Context.empty() : initialContext;
}

        在这里,将subscribe的回调接口参数赋值给consumer 属性,因此,mono.subscribe的参数就作为数据消费者的属性了。

        点击上一步的subscribeWith()方法,如下所示:

monoCreate.subscribe

@Override
public void subscribe(CoreSubscriber<? super T> actual) {
   DefaultMonoSink<T> emitter = new DefaultMonoSink<>(actual);
   actual.onSubscribe(emitter);
   try {
      callback.accept(emitter);
   }
   catch (Throwable ex) {
      emitter.error(Operators.onOperatorError(ex, actual.currentContext()));
   }
}

        在这里,首先调用了数据消费者的onSubscribe()方法,这个与《spring响应式编程系列:总体流程》一样。

        另外,调用了callback.accept()方法,也就是Mono.create()的回调接口参数。

accept

Mono<String> mono = Mono.create(sink -> {
    // 模拟一个异步操作
    new Thread(() -> {
        try {
            Thread.sleep(1000); // 模拟耗时操作
            log.info("success");
            sink.success("Hello, World!"); // 成功时发射数据
        } catch (InterruptedException e) {
            sink.error(e); // 发生错误时发射错误信号
        }
    }).start();
});

        在这里,模拟了耗时操作,然后调用sink.success()方法。

      通常,可以将sink对象保存在线程共享环境里,等其它的业务操作执行完成后,再调用sink.success()方法,即可发射数据发布者数据,从而触发消费者订阅。

        点击sink.success(),如下所示:

​​​​​​​success

public void success(@Nullable T value) {

... ...
     for (; ; ) {
      int s = state;
      if (s == HAS_REQUEST_HAS_VALUE || s == NO_REQUEST_HAS_VALUE) {
         Operators.onNextDropped(value, actual.currentContext());
         return;
      }
      if (s == HAS_REQUEST_NO_VALUE) {
         if (STATE.compareAndSet(this, s, HAS_REQUEST_HAS_VALUE)) {
            try {
               actual.onNext(value);
               actual.onComplete();
            }
            catch (Throwable t) {
               actual.onError(t);
            }
            finally {
               disposeResource(false);
            }
         } else {
            Operators.onNextDropped(value, actual.currentContext());
         }
         return;
      }
      ... ...
   }
}

        在这里,调用了数据订阅者的onNext()方法,如下所示:

​​​​​​​onNext

public final void onNext(T x) {
   Subscription s = S.getAndSet(this, Operators.cancelledSubscription());
   if (s == Operators.cancelledSubscription()) {
      Operators.onNextDropped(x, this.initialContext);
      return;
   }
   if (consumer != null) {
      try {
         consumer.accept(x);
      }
      catch (Throwable t) {
         Exceptions.throwIfFatal(t);
         s.cancel();
         doError(t);
      }
   }
   if (completeConsumer != null) {
      try {
         completeConsumer.run();
      }
      catch (Throwable t) {
         Operators.onErrorDropped(t, this.initialContext);
      }
   }
}

时序图

  1. 类关系的设计,与《spring响应式编程系列:总体流程》类似,主要包括数据发布者对象、数据订阅者对象及订阅的消息体对象;
  2. Mono和MonoCreate是数据发布者,LambdaMonoSubscriber是数据订阅者,DefaultMonoSink是订阅的消息体;
  3. 不同点在于,DefaultMonoSink可以通过示例里的Mono.create暴露给业务侧,业务侧的相关业务执行完成之后,可以通过调用该对象success方法,来触发订阅者的回调函数。

​​​​​​​类图

数据发布者

MonoCreate

        MonoCreate与《spring响应式编程系列:总体流程》介绍的类似,都是继承于Mono类,并且实现了CorePublisher和Publisher接口。

        不同点在于,该数据发布者多了一个属性,如下所示:

        final Consumer<MonoSink<T>> callback;

        该属性是一个可以接收所订阅消息体(类型为MonoSink<T>)参数的回调函数,在这里可以将该消息体与对应的业务建立绑定关系,为后续业务执行结束后的回调做准备。

数据订阅者

LambdaMonoSubscriber

        LambdaMonoSubscriber与《spring响应式编程系列:总体流程》介绍的一样。

订阅的消息体

DefaultMonoSink

        DefaultMonoSink与《spring响应式编程系列:总体流程​​​​​​​》介绍的类似,都实现了Subscription接口。

        不同点在于,DefaultMonoSink实现了MonoSink接口,该接口提供了供业务侧调用 的接口方法,如下所示:

void success(@Nullable T value);

        业务侧的相关业务执行完成之后,可以通过调用该接口方法,来触发订阅者的回调函数。

相关文章:

  • 计算机网络的五层结构(物理层、数据链路层、网络层、传输层、应用层)到底是什么?
  • 如何保证线程安全(含典型手段与应用场景)
  • 什么是智能导诊知识库?
  • 平面连杆机构(上)
  • H.264/AVC标准主流开源编解码器编译说明
  • 在分类任务中,显著性分析
  • 【课题推荐】基于场景的改进IMM算法
  • 在线录屏工具(压箱底)-免费高清
  • 为什么vllm能够加快大模型推理速度?
  • SM30 权限检查
  • 实验四 进程调度实验
  • 英语中的介词(preposition)
  • OSPF中DR/BDR的选举
  • 黑马Java基础笔记-4
  • Linux渗透测试
  • 7.Geometric Intersection: Interval
  • 产销协同是什么?产销协同流程有哪些?
  • 一台服务器已经有个python3.11版本了,如何手动安装 Python 3.10,两个版本共存
  • Neo4j 常用查询语句
  • 数据库系统概论(四)关系操作,关系完整性与关系代数
  • 大学2025丨专访南开人工智能学院院长赵新:人工智能未来会变成通识类课程
  • 图像编辑新增一款开源模型,阶跃星辰发布Step1X-Edit
  • 商务部:将积极会同相关部门加快推进离境退税政策的落实落地
  • 年客流超2500万,九岁的上海国际旅游度假区有哪些文旅商体实践?
  • 去年立案侦办侵权假冒案件3.7万起,公安部公布13起案例
  • 朝中社发表评论文章,谴责美军部署B1-B轰炸机至日本