当前位置: 首页 > news >正文

Flow原理

fun main() {runBlocking {launch {flow4.collect{println("---collect-4")}println("---flow4")}}val flow4 = flow<Boolean>{delay(5000)emit(false)
}

我们分析下整个流程 

1.flow为什么之后在collect之后才会发送数据

2.collect的调用流程

我们先看创建flow的方法

public fun <T> flow(@BuilderInference block: suspend FlowCollector<T>.() -> Unit): Flow<T> = SafeFlow(block)

可以看见是一个方法,返回的是一个SafeFlow对象,然后把我们传入 {
      delay(5000)
       emit(false)
}   
表达式 
的传入到了这个对象中。

private class SafeFlow<T>(private val block: suspend FlowCollector<T>.() -> Unit) : AbstractFlow<T>() {override suspend fun collectSafely(collector: FlowCollector<T>) {collector.block()}
}

然后,这个对象提供一个方法collectSafely,这个时候可以看到,要发送数据,必须得调用SafeFlow的collectSafely方法才行。

接下来我们分析下collect方法。看源码发现需要传入一个FlowCollector接口实现类

public suspend fun collect(collector: FlowCollector<T>)
public fun interface FlowCollector<in T> {//注意这个emit方法public suspend fun emit(value: T)
}

因为我们是使用flow方法返回的SafeFlow对象去调用的,所以我们看下SafeFlow的collect方法。SafeFlow是继承AbstractFlow的类,所以我们看这个类就行

public abstract class AbstractFlow<T> : Flow<T>, CancellableFlow<T> {public final override suspend fun collect(collector: FlowCollector<T>) {//创建SafeCollector对象val safeCollector = SafeCollector(collector, coroutineContext)try {//调用实现类的collectSafely方法,把SafeCollector对象传递过去collectSafely(safeCollector)} finally {safeCollector.releaseIntercepted()}}public abstract suspend fun collectSafely(collector: FlowCollector<T>)
}

再接着查看collectSafely方法,发现调用到了我们传入的闭包

private class SafeFlow<T>(private val block: suspend FlowCollector<T>.() -> Unit) : AbstractFlow<T>() {override suspend fun collectSafely(collector: FlowCollector<T>) {//调用我们传入的闭包,而且他是FlowCollector的扩展函数collector.block()}
}

而我们传入的闭包是suspend FlowCollector<T>.() -> Unit扩展函数,这个对象就是我们collect传入的接口实现类,所以在

val flow4 = flow<Boolean>{
      delay(5000)
       emit(false)
}

方法中调用emit()实际就是调用collect传入接口实现类的emit方法


 

相关文章:

  • 使用TortoiseGit进行文件比较
  • JAVA-ArrayList使用方法
  • RecoNIC 入门:SmartNIC 上支持 RDMA 的计算卸载-FPGA-智能网卡-AMD-Xilinx
  • 实战指南:搭建AIRIOT全场景智慧养老管理平台系统全流程解析
  • 使用手机录制rosbag包
  • 高性价比手机如何挑选?
  • 基于 SSE 和分块传输的 Uniapp 微信小程序 实现 流式传输 对话
  • 第十二节:性能优化高频题-shallowRef/shallowReactive使用场景
  • Kotlin await等待多个异步任务都完成后才进行下一步操作
  • web技术与nginx网站环境部署
  • docker搭建swarm集群
  • node.js 实战——mongoDB
  • 【Docker】——在Docker工具上安装创建容器并完成项目部署
  • Flink HA 总结
  • 人工智能大语言模型与AI芯片新进展:技术演进与商业化路径
  • 【3D基础】深入解析OBJ与MTL文件格式:Blender导出模型示例及3D开发应用
  • 【Linux】第十二章 安装和更新软件包
  • deepseek对IBM MQ SSL 证书算法的建议与解答
  • 自动驾驶L4级技术落地:特斯拉、Waymo与华为的路线之争
  • [三分钟]web自动化测试(三):selenium自动化测试常用函数(下)
  • 北京动物园:大熊猫“萌兰”没有参加日本大阪世博会的计划
  • 老凤祥一季度净利减少两成,去年珠宝首饰营收下滑19%
  • 国家发改委答澎湃:将建立和实施育儿补贴制度,深入实施提振消费专项行动
  • 油电同智,安全超充!从上海车展看中国汽车产业先发优势
  • 楼下电瓶车起火老夫妻逃生时被烧伤,消防解析躲火避烟注意事项
  • 人民日报读者点题:规范涉企执法,怎样防止问题反弹、提振企业信心?