Kotlin学习:5.2、异步数据流 Flow
Flow
- 一、Flow
- 1、Flow是什么东西?
- 2、实现功能
- 3、特点
- 4、冷流和热流
- 5、流的连续性
- 6、流的构建器
- 7、流的上下文
- 8、指定流所在协程
- 9、流的取消
- 9.1、超时取消
- 9.2、主动取消
- 9.3、密集型任务的取消
- 10、背压和优化
- 10.1、buffer 操作符
- 10.2、 flowOn
- 10.3、conflate 操作符
- 10.4、collectLatest 操作符
- 二、操作符
- 1、变换操作符
- 1.1、buffer (缓存)
- 1.2、map (变换)
- 1.2.1、map
- 1.2.2、mapNotNull (不空的下发)
- 1.2.3、mapLatest
- 1.3、transform (一转多)
- 1.4、reduce (累*加减乘除)
- 1.5、fold(累*加减乘除 and 拼接)
- 1.6、flatMapConcat (有序变换)
- 1.7、flatMapMerge (无序变换)
- 1.8、flatMapLatest (截留)
- 2、过滤型操作符
- 2.1、take (截留)
- 2.1.2、takeWhile
- 2.2、filter(满足条件下发)
- 2.2.2、filterNotNull (不空的下发)
- 2.2.3、filterNot(符合条件的值将被丢弃)
- 2.2.4、filterInstance (筛选符合类型的值)
- 2.3、skip 和 drop(跳过)
- 2.3.2、dropWhile
- 2.4、distinctUntilChanged (过滤重复)
- 2.4.2、distinctUntilChangedBy
- 2.5、single (判断是否一个事件)
- 2.6、first (截留第一个事件)
- 2.7、debounce (防抖动)
- 2.8、conflate
- 2.9、sample (周期采样)
- 3、组合型操作符
- 3.1、count (计数)
- 3.2、zip (合并元素)
- 3.3、combine(合并元素)
- 3.4、merge (合并成流)
- 3.5、flattenConcat (展平流)
- 3.6、flattenMerge(展平流)
- 4、异常操作符
- 4.1、catch (拦截异常)
- 4.2、retry (重试)
- 4.2.2、retryWhen
- 4.3、withTimeout (超时)
- 5、辅助操作符
- 5.1、onXXX
- 5.2、delay (延时)
- 5.3、measureTimeMillis (计时)
- 参考地址
一、Flow
1、Flow是什么东西?
Flow 是有点类似 RxJava 的 Observable
都有冷流和热流之分;
都有流式构建结构;
都包含 map、filter 等操作符。
区别于Observable,Flow可以配合挂起函数使用
2、实现功能
异步返回多个值
可以实现下载功能等,Observable 下发数组时可以实现什么功能,他就能实现什么功能

当文件下载时,对应的后台下载进度,就可以通过Flow里面的emit发送数据,通过collect接收对应的数据。
转:https://blog.csdn.net/qq_30382601/article/details/121825461
3、特点
- flow{…}块中的代码可以挂起
- 使用flow,suspend修饰符可以省略
- 流使用emit函数发射值
- 流使用collect的函数收集值
- flow类似冷流,flow中代码直到流被收集(调用collect)的时候才运行,类似lazy,什么时候用,什么时候执行。
- 流的连续性:流收集都是按顺序收集的
- flowOn可更改流发射的上下文,即可以指定在主线程或子线程中执行
- 与之相对的是热流,我们即将介绍的 StateFlow 和 SharedFlow 是热流,在垃圾回收之前,都是存在内存之中,并且处于活跃状态的。
转:https://blog.csdn.net/zx_android/article/details/122744370
4、冷流和热流
-
冷流
冷流类似冷启动,代码在被用到才会执行,如你需要使用的数据在网络,需要先请求网络才能得到数据
Flow是一种类似于序列的冷流,flow构建器中的代码直到流被收集的时候才运行。 -
热流
热流类似热启动,代码在用到之前已经准备好,如你请求过网络,数据已经缓存在本地,你只需直接使用即可
5、流的连续性
流的连续性:流收集都是按顺序收集的

6、流的构建器
如下三种为冷流构建器
- flow{emit} .collect{}
- flowOf(***).collect{}
- (***).asFlow().collect{}
@Testfun `test flow builder`() = runBlocking<Unit> {flowOf("one", "two", "three").onEach { delay(1000) }.collect { value ->println(value)}(1..3).asFlow().collect { value ->println(value)}flow<Int> {for (i in 11..13) {delay(1000) //假装在一些重要的事情emit(i) //发射,产生一个元素}}.collect { value ->println(value)}}
7、流的上下文
flowOn (多用于切线程)
流的收集总是在调用协程的上下文中发生,流的该属性称为上下文保存。
fun simpleFlow3() = flow<Int> {println("Flow started ${Thread.currentThread().name}")for (i in 1..3) {delay(1000)emit(i)}}@Testfun `test flow context`() = runBlocking<Unit> {simpleFlow3().collect { value -> println("Collected $value ${Thread.currentThread().name}") }}
如下:流的发射和接收在一个协程内
Flow started Test worker @coroutine#1
Collected 1 Test worker @coroutine#1
Collected 2 Test worker @coroutine#1
Collected 3 Test worker @coroutine#1
flow{…}构建器中的代码必须遵循上下文保存属性,并且不允许从其他上下文中发生(emit)
如下这种写法不被允许
fun simpleFlow4() = flow<Int> {withContext(Dispatchers.Default) {println("Flow started ${Thread.currentThread().name}")for (i in 1..3) {delay(1000)emit(i)}}}
那么如何切换协程上下文呢?
flowOn操作符,该函数用于更改流发射的上下文
fun simpleFlow5() = flow<Int> {println("Flow started ${Thread.currentThread().name}")for (i in 1..3) {delay(1000)emit(i)}}.flowOn(Dispatchers.Default)@Testfun `test flow context`() = runBlocking<Unit> {simpleFlow5().collect { value -> println("Collected $value ${Thread.currentThread().name}") }}
如下:切换上下文成功
Flow started DefaultDispatcher-worker-2 @coroutine#2
Collected 1 Test worker @coroutine#1
Collected 2 Test worker @coroutine#1
Collected 3 Test worker @coroutine#1
8、指定流所在协程
launchIn 用于指定协程作用域通知flow执行
使用 launchIn 替换 collect 在单独的协程中启动收集流
- 指定协程
//事件源private fun events() = (1..3).asFlow().onEach { delay(100) }.flowOn(Dispatchers.Default)@Testfun `test flow launch`() = runBlocking<Unit> {val job = events().onEach { event -> println("Event: $event ${Thread.currentThread().name}") }
// .collect {}.launchIn(CoroutineScope(Dispatchers.IO)) //这里使用另一个上下文传入Flow
// .launchIn(this)//这里使用当前上下文传入Flowjob.join()}
打印:
Event: 1 DefaultDispatcher-worker-2 @coroutine#2
Event: 2 DefaultDispatcher-worker-1 @coroutine#2
Event: 3 DefaultDispatcher-worker-3 @coroutine#2
- 也可以指定当前协程中执行
@Testfun `test flow launch`() = runBlocking<Unit> {val job = events().onEach { event -> println("Event: $event ${Thread.currentThread().name}") }
// .collect {}
// .launchIn(CoroutineScope(Dispatchers.IO)) //这里使用另一个上下文传入Flow.launchIn(this)//这里使用当前上下文传入Flow// job.join()}
Event: 1 Test worker @coroutine#2
Event: 2 Test worker @coroutine#2
Event: 3 Test worker @coroutine#2
9、流的取消
流采用和协程同样的协作取消。流可以在挂起函数的挂起的时候取消。
9.1、超时取消
withTimeoutOrNull 不能取消密集型任务
fun simpleFlow6() = flow<Int> {for (i in 1..300) {delay(1000)emit(i)println("Emitting $i")}}@Testfun `test cancel flow`() = runBlocking<Unit> {withTimeoutOrNull(2500) {simpleFlow6().collect { value -> println(value) }}println("Done")}
9.2、主动取消
cancel
@Testfun `test cancel flow `() = runBlocking<Unit> {simpleFlow6().collect { value ->if (value == 3) {cancel()}println(value)}println("Done")}
9.3、密集型任务的取消
密集型任务需要流的取消检测
cancel + cancellable
@Testfun `test cancel flow check`() = runBlocking<Unit> {(1..5).asFlow().cancellable().collect { value ->println(value)if (value == 3) cancel()println("cancel check ${coroutineContext[Job]?.isActive}")}}
10、背压和优化
- 什么是背压?

生产者生产的效率大于消费者消费的效率,元素积压
例,演示背压
fun simpleFlow8() = flow<Int> {for (i in 1..10) {// emit 上面这段代码在collect之前执行delay(100)emit(i) // 调用collect// emit下面这段代码在 collect 之后执行println("Emitting $i ${Thread.currentThread().name}")}}@Testfun `test flow back pressure`() = runBlocking<Unit> {val time = measureTimeMillis {simpleFlow8().collect { value ->delay(200) //处理这个元素消耗 200msprintln("Collected $value ${Thread.currentThread().name}")}}println("Collected in $time ms")}
Collected 1 Test worker @coroutine#1
Emitting 1 Test worker @coroutine#1
Collected 2 Test worker @coroutine#1
Emitting 2 Test worker @coroutine#1
Collected 3 Test worker @coroutine#1
Emitting 3 Test worker @coroutine#1
Collected 4 Test worker @coroutine#1
Emitting 4 Test worker @coroutine#1
Collected 5 Test worker @coroutine#1
Emitting 5 Test worker @coroutine#1
Collected 6 Test worker @coroutine#1
Emitting 6 Test worker @coroutine#1
Collected 7 Test worker @coroutine#1
Emitting 7 Test worker @coroutine#1
Collected 8 Test worker @coroutine#1
Emitting 8 Test worker @coroutine#1
Collected 9 Test worker @coroutine#1
Emitting 9 Test worker @coroutine#1
Collected 10 Test worker @coroutine#1
Emitting 10 Test worker @coroutine#1
Collected in 3169 ms
- 如何解决背压?
通过缓存进行性能优化
10.1、buffer 操作符
并发运行流中发射元素的代码
注意:for (i in 1…10) 这里用的是 1到 10,原因是 for循环 有耗时问题,通过打印时间戳在 for (i in 1…x) 上下,发现 for (i in 1…x) 这行代码有时耗时超过200毫秒,目前不知是何问题,特此记录,为方便对比优化时长,使用1到10.
@Testfun `test flow back pressure buffer`() = runBlocking<Unit> {val time = measureTimeMillis {simpleFlow8().buffer(10) //缓存发射事件.collect { value ->delay(200) //处理这个元素消耗 200msprintln("Collected $value ${Thread.currentThread().name}")}}println("Collected in $time ms")}
Emitting 1 Test worker @coroutine#2
Emitting 2 Test worker @coroutine#2
Collected 1 Test worker @coroutine#1
Emitting 3 Test worker @coroutine#2
Emitting 4 Test worker @coroutine#2
Collected 2 Test worker @coroutine#1
Emitting 5 Test worker @coroutine#2
Emitting 6 Test worker @coroutine#2
Collected 3 Test worker @coroutine#1
Emitting 7 Test worker @coroutine#2
Emitting 8 Test worker @coroutine#2
Collected 4 Test worker @coroutine#1
Emitting 9 Test worker @coroutine#2
Emitting 10 Test worker @coroutine#2
Collected 5 Test worker @coroutine#1
Collected 6 Test worker @coroutine#1
Collected 7 Test worker @coroutine#1
Collected 8 Test worker @coroutine#1
Collected 9 Test worker @coroutine#1
Collected 10 Test worker @coroutine#1
Collected in 2398 ms

10.2、 flowOn
flowOn(),修改流上下文,达到异步处理的效果,从而优化背压
@Testfun `test flow back pressure flowOn`() = runBlocking<Unit> {val time = measureTimeMillis {simpleFlow8().flowOn(Dispatchers.IO).collect { value ->delay(200) //处理这个元素消耗 200msprintln("Collected $value ${Thread.currentThread().name}")}}println("Collected in $time ms")}
Emitting 1 DefaultDispatcher-worker-1 @coroutine#2
Emitting 2 DefaultDispatcher-worker-1 @coroutine#2
Collected 1 Test worker @coroutine#1
Emitting 3 DefaultDispatcher-worker-1 @coroutine#2
Emitting 4 DefaultDispatcher-worker-1 @coroutine#2
Collected 2 Test worker @coroutine#1
Emitting 5 DefaultDispatcher-worker-1 @coroutine#2
Emitting 6 DefaultDispatcher-worker-1 @coroutine#2
Collected 3 Test worker @coroutine#1
Emitting 7 DefaultDispatcher-worker-1 @coroutine#2
Emitting 8 DefaultDispatcher-worker-1 @coroutine#2
Collected 4 Test worker @coroutine#1
Emitting 9 DefaultDispatcher-worker-1 @coroutine#2
Emitting 10 DefaultDispatcher-worker-1 @coroutine#2
Collected 5 Test worker @coroutine#1
Collected 6 Test worker @coroutine#1
Collected 7 Test worker @coroutine#1
Collected 8 Test worker @coroutine#1
Collected 9 Test worker @coroutine#1
Collected 10 Test worker @coroutine#1
Collected in 2385 ms
10.3、conflate 操作符
conflate(),合并发射项,处理最新的值,不对每个值进行处理;
@Testfun `test flow back pressure conflate`() = runBlocking<Unit> {val time = measureTimeMillis {simpleFlow8().conflate().collect { value ->delay(200) //处理这个元素消耗 200msprintln("Collected $value ${Thread.currentThread().name}")}}println("Collected in $time ms")}
Emitting 1 Test worker @coroutine#2
Emitting 2 Test worker @coroutine#2
Collected 1 Test worker @coroutine#1
Emitting 3 Test worker @coroutine#2
Emitting 4 Test worker @coroutine#2
Collected 2 Test worker @coroutine#1
Emitting 5 Test worker @coroutine#2
Emitting 6 Test worker @coroutine#2
Collected 4 Test worker @coroutine#1
Emitting 7 Test worker @coroutine#2
Emitting 8 Test worker @coroutine#2
Collected 6 Test worker @coroutine#1
Emitting 9 Test worker @coroutine#2
Emitting 10 Test worker @coroutine#2
Collected 8 Test worker @coroutine#1
Collected 10 Test worker @coroutine#1
Collected in 1554 ms
10.4、collectLatest 操作符
collectLatest(),取消并重新发射最后一个值
@Testfun `test flow back pressure collectLatest`() = runBlocking<Unit> {val time = measureTimeMillis {simpleFlow8().collectLatest { value ->delay(200) //处理这个元素消耗 200msprintln("Collected $value ${Thread.currentThread().name}")}}println("Collected in $time ms")}
Emitting 1 Test worker @coroutine#2
Emitting 2 Test worker @coroutine#2
Emitting 3 Test worker @coroutine#2
Emitting 4 Test worker @coroutine#2
Emitting 5 Test worker @coroutine#2
Emitting 6 Test worker @coroutine#2
Emitting 7 Test worker @coroutine#2
Emitting 8 Test worker @coroutine#2
Emitting 9 Test worker @coroutine#2
Emitting 10 Test worker @coroutine#2
Collected 10 Test worker @coroutine#12
Collected in 1648 ms
二、操作符
1、变换操作符
1.1、buffer (缓存)
上面背压有栗子
1.2、map (变换)
1.2.1、map
map 是变换元素
data class Student(var name: String, var age: Int)
private suspend fun performRequest(age: Int): Student {delay(500)return Student("这是name", age)}@Testfun `test map flow operator`() = runBlocking<Unit> {(1..3).asFlow().map { request -> performRequest(request) }.collect { value -> println(value) }}
Student(name=这是name, age=1)
Student(name=这是name, age=2)
Student(name=这是name, age=3)
1.2.2、mapNotNull (不空的下发)
@Testfun `test mapNotNull flow operator`() = runBlocking<Unit> {flow {emit(1)emit(3)emit(2)}.mapNotNull { request ->if (1 == request) {null} else {Student("这是name", request)}}.collect { value -> println(value) }}
Student(name=这是name, age=3)
Student(name=这是name, age=2)
1.2.3、mapLatest
当有新值发送时,如果上个转换还没结束,会取消掉,用法同map
@Testfun `test mapLatest flow operator`() = runBlocking<Unit> {flow {emit(1)emit(2)emit(3)}.mapLatest {if (2 == it) delay(100L)"it is $it"}.collect {println(it)}}
1.3、transform (一转多)
@Testfun `test transform flow operator`() = runBlocking<Unit> {(1..3).asFlow().transform { request ->emit("Making request $request")emit(performRequest(request))}.collect { value -> println(value) }}
Making request 1
Student(name=这是name, age=1)
Making request 2
Student(name=这是name, age=2)
Making request 3
Student(name=这是name, age=3)
1.4、reduce (累*加减乘除)
@Testfun `test reduce operator`() = runBlocking<Unit> {println(flow<Int> {emit(1)emit(1)emit(2)emit(3)emit(3)emit(4)}.reduce { accumulator, value -> accumulator + value })}
1.5、fold(累*加减乘除 and 拼接)
- 加
@Testfun `test fold + operator`() = runBlocking<Unit> {println(flow<Int> {emit(1)emit(1)emit(2)emit(3)emit(3)emit(4)}.fold(3) { accumulator, value -> accumulator + value })}
17
- 减
@Testfun `test fold - operator`() = runBlocking<Unit> {println(flow<Int> {emit(2)emit(3)}.fold(18) { accumulator, value -> accumulator - value })}
13
- 乘
@Testfun `test fold multiply by operator`() = runBlocking<Unit> {println(flow<Int> {emit(1)emit(1)emit(2)emit(3)}.fold(3) { accumulator, value -> accumulator * value })}
18
- 除
@Testfun `test fold devide operator`() = runBlocking<Unit> {println(flow<Int> {emit(2)emit(3)}.fold(18) { accumulator, value -> accumulator / value })}
3
- 拼接
@Testfun `test fold joint operator`() = runBlocking<Unit> {println(flow<Int> {emit(1)emit(2)emit(3)}.fold("拼接") { accumulator, value -> return@fold "$accumulator =+= $value" })}
拼接 =+= 1 =+= 2 =+= 3
1.6、flatMapConcat (有序变换)
元素会变换完,以流的形式继续下发,并且某个元素需要耗时,它后面的元素会等待。
@Testfun `test flatMapConcat operator`() = runBlocking<Unit> {(1..5).asFlow().onEach { delay(100) }.flatMapConcat { num ->flow {if (3==num){delay(200)}emit("num: $num")}}.collect {println("value -> $it")}}
value -> num: 1
value -> num: 2
value -> num: 3
value -> num: 4
value -> num: 5
1.7、flatMapMerge (无序变换)
元素会变换完,以流的形式继续下发,并且某个元素需要耗时,它后面的元素不会等待。
@Testfun `test flatMapMerge operator`() = runBlocking<Unit> {(1..5).asFlow().onEach { delay(100) }.flatMapMerge() { num ->flow {if (3==num){delay(200)}emit("num: $num")}}.collect {println("value -> $it")}}
value -> num: 1
value -> num: 2
value -> num: 4
value -> num: 3
value -> num: 5
1.8、flatMapLatest (截留)
快速执行的事件都正常下发,
当有新值发送时,如果上个转换还没结束,会上取消掉上一个,直接下发新值。
@Testfun `test flatMapLatest operator`() = runBlocking<Unit> {(1..5).asFlow().onEach { delay(100) }.flatMapLatest() { num ->flow {if (3 == num) {delay(200)}emit("num: $num")emit("num2: $num")}}.collect {println("value -> $it")}}
value -> num: 1
value -> num2: 1
value -> num: 2
value -> num2: 2
value -> num: 4
value -> num2: 4
value -> num: 5
value -> num2: 5
2、过滤型操作符
2.1、take (截留)
跟Rxjava一样
fun numbers() = flow<Int> {try {emit(1)emit(2)println("This line will not execute")emit(3)} finally {println("Finally in numbers")}}@Testfun `test limit length operator`() = runBlocking<Unit> {//take(2),表示 当计数元素被消耗时,原始流被取消numbers().take(2).collect { value -> println(value) }}
1
2
Finally in numbers
2.1.2、takeWhile
找到第一个不满足条件的值,发送它之前的值,和dropWhile相反
@Testfun `test takeWhile operator`() = runBlocking<Unit> {flow<Int> {emit(2)emit(1)emit(3)emit(4)emit(1)}.takeWhile { it < 2 }.collect { value -> println(value) }}
如上什么也不会输出;
@Testfun `test takeWhile operator`() = runBlocking<Unit> {flow<Int> {emit(1)emit(2)emit(3)emit(4)emit(1)}.takeWhile { it < 2 }.collect { value -> println(value) }}
会输出 1
2.2、filter(满足条件下发)
跟Rxjava一样
@Testfun `test filter operator`() = runBlocking<Unit> {numbers().filter {it == 2}.collect { value -> println(value) }}
2.2.2、filterNotNull (不空的下发)
@Testfun `test filterNotNull flow operator`() = runBlocking<Unit> {flow {emit(1)emit(3)emit(null)emit(2)}.filterNotNull ().collect { value -> println(value) }}
1
3
2
2.2.3、filterNot(符合条件的值将被丢弃)
筛选不符合条件的值,相当于filter取反
@Testfun `test filterNot operator`() = runBlocking<Unit> {flow<Int> {emit(1)emit(2)emit(3)}.filterNot {it > 2}.collect { value -> println(value) }}
1
2
2.2.4、filterInstance (筛选符合类型的值)
对标rxjava中的ofType
筛选符合类型的值(不符合类型的值将被丢弃)
@Testfun `test filterInstance operator`() = runBlocking<Unit> {flow<Any> {emit(1)emit("2")emit(3)emit("str")}.filterIsInstance<String>().collect { value -> println(value) }}
2
str
2.3、skip 和 drop(跳过)

@Testfun `test skip operator`() = runBlocking<Unit> {numbers().drop(2).collect { value -> println(value) }}
输出
3
2.3.2、dropWhile
找到第一个不满足条件的值,继续发送它和它之后的值
@Testfun `test dropWhile operator`() = runBlocking<Unit> {numbers().dropWhile { it <= 2 }.collect { value -> println(value) }}
This line will not execute
3
Finally in numbers
2.4、distinctUntilChanged (过滤重复)
@Testfun `test distinctUntilChanged operator`() = runBlocking<Unit> {flow<Int> {emit(1)emit(1)emit(2)emit(3)emit(3)emit(4)}.distinctUntilChanged().collect { value -> println(value) }}
2.4.2、distinctUntilChangedBy
判断两个连续值是否重复,可以设置是否丢弃重复值。
去重规则有点复杂,没完全懂
@Testfun `test distinctUntilChangedBy operator`() = runBlocking<Unit> {flowOf(Student(name = "Jack", age = 11),Student(name = "Tom", age = 10),Student(name = "Jack", age = 12),Student(name = "Jack", age = 13),Student(name = "Tom", age = 11)).distinctUntilChangedBy { it.name == "Jack" }.collect { //第三个Stu将被丢弃println(it.toString())}}
Student(name=Jack, age=11)
Student(name=Tom, age=10)
Student(name=Jack, age=12)
Student(name=Tom, age=11)
2.5、single (判断是否一个事件)
用于确保 flow 输出值唯一。若只有一个值,则可以正常执行,若输出的值不止只有一个的时候,就会抛出异常:
@Testfun `test single operator`() = runBlocking<Unit> {try {println(flow<Int> {emit(1)emit(1)emit(2)emit(3)emit(3)emit(4)}.single())} catch (e: Exception) {println("e =$e")}}
如果一个事件,就正常执行;否则异常。
e =java.lang.IllegalArgumentException: Flow has more than one element
2.6、first (截留第一个事件)
@Testfun `test first operator`() = runBlocking<Unit> {println(flow<Int> {emit(1)emit(1)emit(2)emit(3)emit(3)emit(4)}.first())}
1
2.7、debounce (防抖动)
@Testfun `test debounce operator`() = runBlocking<Unit> {flowOf(Student(name = "Jack", age = 11),Student(name = "Tom", age = 10),Student(name = "Jack", age = 12),Student(name = "Jack", age = 13),Student(name = "Tom", age = 11)).onEach {if (it.name == "Jack" && it.age == 13)delay(500)}.debounce(500).collect { //第三个Stu将被丢弃println(it.toString())}}
Student(name=Jack, age=12)
Student(name=Tom, age=11)
2.8、conflate
见 10.3、conflate
仅保留最新值, 内部就是 buffer(CONFLATED``)
2.9、sample (周期采样)
固定周期采样 ,给定一个时间周期,保留周期内最后发出的值,其他的值将被丢弃
sample操作符与debounce操作符有点像,但是却限制了一个周期性时间,sample操作符获取的是一个周期内的最新的数据,可以理解为debounce操作符增加了周期的限制。
@Testfun `test sample operator`() = runBlocking<Unit> {flow {repeat(10) {delay(50)emit(it)}}.sample(100).collect {println(it)}}
0
2
4
6
8
3、组合型操作符
3.1、count (计数)
@Testfun `test count operator`() = runBlocking<Unit> {println(flow<Int> {emit(1)emit(1)emit(2)emit(3)emit(3)emit(4)}.count())}
3.2、zip (合并元素)
跟Rxjava一样
@Testfun `test zip operator`() = runBlocking<Unit> {val nameFlow = mutableListOf("小红", "小黑").asFlow()val numFlow = (1..3).asFlow()nameFlow.zip(numFlow) { string, num ->"$string:$num"}.collect {println("value -> $it")}}
3.3、combine(合并元素)
@Testfun `test combine operator`() = runBlocking<Unit> {val nameFlow = mutableListOf("小红", "小黑").asFlow()val numFlow = (1..3).asFlow()nameFlow.combine(numFlow) { string, num ->"$string:$num"}.collect {println("value -> $it")}}
value -> 小红:1
value -> 小黑:2
value -> 小黑:3
3.4、merge (合并成流)
merge 是将两个flow合并起来,将每个值依次发出来
@Testfun `test merge operator`() = runBlocking<Unit> {val flow1 = listOf(1, 2).asFlow()val flow2 = listOf("one", "two", "three").asFlow()merge(flow1, flow2).collect { value -> println(value) }}
1
2
one
two
three
3.5、flattenConcat (展平流)
展平操作符 flattenConcat 以顺序方式将给定的流展开为单个流,通俗点讲,减少层级 ,感觉和merge这么像呢,这个不太理解啥用
@Testfun `test flattenConcat operator`() = runBlocking<Unit> {val flow1 = listOf(1, 2).asFlow()val flow2 = listOf("one", "two", "three").asFlow()val flow3 = listOf("x", "xx", "xxx").asFlow()flowOf(flow1, flow2, flow3).flattenConcat().collect { value -> println(value) }}
1
2
one
two
three
x
xx
xxx
3.6、flattenMerge(展平流)
flattenMerge 作用和 flattenConcat 一样,但是可以设置并发收集流的数量
@Testfun `test flattenMerge operator`() = runBlocking<Unit> {val flow1 = listOf(1, 2).asFlow()val flow2 = listOf("one", "two", "three").asFlow()val flow3 = listOf("x", "xx", "xxx").asFlow()flowOf(flow1, flow2, flow3).flattenMerge(2).collect { value -> println(value) }}
1
2
one
two
three
x
xx
xxx
4、异常操作符
4.1、catch (拦截异常)
对标rxjava 中的 onErrorResumeNext
Exception、Throwable、Error 都会拦截
@Testfun `test catch operator`() = runBlocking<Unit> {(1..5).asFlow().onEach { delay(100) }.onEach { if (2 == it) throw NullPointerException() }.catch {emit(110)println("e == $it")}.collect {println("value -> $it")}}
@Testfun `test catch operator`() = runBlocking<Unit> {(1..5).asFlow().onEach { delay(100) }.onEach { if (2 == it)
// throw Exception("测试 异常")
// throw Throwable("测试 异常")throw Error("测试 错误")}.catch {emit(110)println("e == $it")}.collect {println("value -> $it")}}
value -> 1
value -> 110
e == java.lang.Error: 测试 错误
4.2、retry (重试)
所有异常错误都拦截
- 拦截次数
@Testfun `test retry operator`() = runBlocking<Unit> {flow<Any> {emit(1)emit(2)throw Exception("异常")emit(3)}.retry(2).catch { emit(110) }.collect { value -> println(value) }}
- 拦截条件
@Testfun `test retry 2 operator`() = runBlocking<Unit> {flow<Any> {emit(1)emit(2)throw Error("异常")emit(3)}.retry { it.message == "异常" }.catch { emit(110) }.collect { value -> println(value) }}
如上,满足拦截条件,所以会一直打印日志
1
2
1
2
1
2
1
2
1
... 不杀死程序一直打印
4.2.2、retryWhen
4.3、withTimeout (超时)
@Testfun `test retry 2 operator`() = runBlocking<Unit> {withTimeout(2500) {flow<Any> {emit(1)throw Error("异常")}.retry { it.message == "异常" }.catch { emit(110) }.collect { value -> println(value) }}}
输出:
1
1
... 好多个
1
1
1Timed out waiting for 2500 ms
kotlinx.coroutines.TimeoutCancellationException: Timed out waiting for 2500 ms(Coroutine boundary)at com.yoshin.kt.kotlindemo20220713.ExampleUnitTest$test retry 2 operator$1.invokeSuspend(ExampleUnitTest.kt:928)
Caused by: kotlinx.coroutines.TimeoutCancellationException: Timed out waiting for 2500 msat app//kotlinx.coroutines.TimeoutKt.TimeoutCancellationException(Timeout.kt:184)at app//kotlinx.coroutines.TimeoutCoroutine.run(Timeout.kt:154)at app//kotlinx.coroutines.EventLoopImplBase$DelayedRunnableTask.run(EventLoop.common.kt:508)at app//kotlinx.coroutines.EventLoopImplBase.processNextEvent(EventLoop.common.kt:284)at app//kotlinx.coroutines.DefaultExecutor.run(DefaultExecutor.kt:108)at java.base@11.0.13/java.lang.Thread.run(Thread.java:834)
5、辅助操作符
5.1、onXXX
onXXX 的方法包含
onCompletion 流完成时调用
onStart 流开始时调用
onEach 元素下发时调用,每次下发都调用
对比rxjava 中:
onCompletion == doOnComplete
onStart == doOnSubscribe 或者 doOnLifecycle
onEach == doNext
@Testfun `test do operator`() = runBlocking<Unit> {(1..5).asFlow().onCompletion { println(" onCompletion == $it ") }.onStart { println(" onStart ") }.onEach { println(" onEach == $it ") }.collect {println("value -> $it")}}
5.2、delay (延时)
延时
private fun events() = (1..3).asFlow().onEach { delay(100) }.flowOn(Dispatchers.Default)
5.3、measureTimeMillis (计时)
测量代码用时
@Testfun `test flow back pressure`() = runBlocking<Unit> {val time = measureTimeMillis {simpleFlow8().collect { value ->delay(200) //处理这个元素消耗 200msprintln("Collected $value ${Thread.currentThread().name}")}}println("Collected in $time ms")}
参考地址
笔记大部分内容来自动脑学院的文章和视频
动脑学院
:https://blog.csdn.net/qq_30382601/article/details/121825461
Kotlin 之 协程(三)Flow异步流
:https://blog.csdn.net/zx_android/article/details/122744370
Android Kotlin之Flow数据流:https://blog.csdn.net/u013700502/article/details/120526170
相关文章:
Kotlin学习:5.2、异步数据流 Flow
Flow一、Flow1、Flow是什么东西?2、实现功能3、特点4、冷流和热流5、流的连续性6、流的构建器7、流的上下文8、指定流所在协程9、流的取消9.1、超时取消9.2、主动取消9.3、密集型任务的取消10、背压和优化10.1、buffer 操作符10.2、 flowOn10.3、conflate 操作符10.…...
EPICS synApps介绍
一、synApps是什么? 1) 一个用于同步束线用户的EPICS模块集合。 2) EPICS模块 alive, autosave, busy, calc, camac, caputRecorder, dac128V, delaygen, dxp, ip, ip330, ipUnidig, love, mca, measComp, modbus, motor, optics, quadEM,…...
Pycharm和跳板机 连接内网服务器
Pycharm和跳板机 连接内网服务器 建立配置文件 本地配置 .ssh 文件夹下配置 config 文件 Host jumpHostName xxxPort 22User xxxServerAliveInterval 30IdentityFile C:\Users\15284\.ssh\id_rsa # 通过密钥连接Host server # 同样,任意名字,随…...
mysql去重查询的三种方法
文章目录前言一、插入测试数据二、剔除重复数据方法1.方法一:使用distinct2.方法二:使用group by3.方法三:使用开窗函数总结前言 数据库生成环境中经常会遇到表中有重复的数据,或者进行关联过程中产生重复数据,下面介…...
PHP反序列化
文章目录简介POP链构造和Phar://题目[CISCN2019 华北赛区 Day1 Web1]Dropbox字符串逃逸简介 php序列化的过程就是把数据转化成一种可逆的数据结构,逆向的过程就叫做反序列化。 php将数据序列化和反序列化会用到两个函数: serialize 将对象格式化成有序的…...
什么蓝牙耳机打电话效果最好?通话效果好的无线蓝牙耳机
2023年了,TWS耳机虽说近乎人手一只了,但用户换新的需求和呼声依然热火朝天,因为我们想要听音乐、刷视频的时候都得准备,下面整理一些通话效果不错的耳机品牌。 第一款:南卡小音舱蓝牙耳机 动圈单元:13.3m…...
Tesseract centos环境安装,基于springboot图片提取文字
下载tesseract-orc https://github.com/tesseract-ocr/tesseract/tags下载leptonica wget http://www.leptonica.org/source/leptonica-1.78.0.tar.gz解压leptonica tar -xvf leptonica-1.78.0.tar.gz 配置编译安装leptonica 进文件夹 ./configure make make install安装aut…...
Elasticsearch7.8.0版本优化——写入速度优化
目录一、 写入速度优化的概述二、如何写入速度优化2.1、 批量数据提交2.2、 优化存储设备2.31、 合理使用合并2.4、 减少 Refresh2.5、 加大 Flush2.6、 减少副本的数量一、 写入速度优化的概述 ES 的默认配置,是综合了数据可靠性、写入速度、搜索实时性等因素。实使…...
【Redis】Redis主从同步中数据同步原理
【Redis】Redis主从同步中数据同步原理 文章目录【Redis】Redis主从同步中数据同步原理1. 全量同步1.1 判断是否第一次数据同步2. 增量同步3. 优化Redis主从集群4. 总结1. 全量同步 主从第一次同步是全量同步。 数据同步包括以下三个阶段: 在从节点执行slaveof命令…...
Python基础—while循环
(1)while循环: 语法格式: while 条件: 执行语句1…… 执行语句2…… 适用条件:无限循环 死循环 while True:print(条件是真的!)代码实例: i 0 # 创建一个计数的变量 while i < 5: # Truepr…...
linux基础(管道符,检索,vim和vi编辑使用)
♥️作者:小刘在C站 ♥️个人主页:小刘主页 ♥️每天分享云计算网络运维课堂笔记,努力不一定有收获,但一定会有收获加油!一起努力,共赴美好人生! ♥️夕阳下,是最美的绽放࿰…...
GAN | 代码简单实现生成对抗网络(GAN)(PyTorch)
2014年GAN发表,直到最近大火的AI生成全部有GAN的踪迹,快来简单实现它!!!GAN通过计算图和博弈论的创新组合,他们表明,如果有足够的建模能力,相互竞争的两个模型将能够通过普通的旧反向…...
华为面试题就这?00后卷王直接拿下30k华为offer......
先说一下我的情况,某211本计算机,之前在深圳那边做了大约半年多少儿编程老师,之后内部平调回长沙这边,回来之后发现有点难,这边可能是业绩难做,虚假承诺很厉害,要给那些家长虚假承诺去骗人家&am…...
html的常见标签使用
目录 1.vscode基础操作 2.html基础 语法 3.HTML文件的基本结构标签 4.注释标签 5.标题标签 6.段落标签:p 7.格式化标签 8.图片标签:img 绝对路径 相对路径 网络路径 alt属性 title属性 width/height属性 9.超链接标签:a 10.表格标签 11.列表标签 有序列表 无…...
STM32——毕设智能感应窗户
智能感应窗户 一、功能设计 以STM32F103芯片最小系统作为主控,实现自动监测、阈值设定功能和手动控制功能。 1、自动监测模式下: ① 采用温湿度传感器,实现采集当前环境的温度、湿度数值。 ② 采用光敏传感器,实现判断当前的环境…...
golang archive/tar库的学习
archive/tar 是 Golang 标准库中用于读取和写入 tar 归档文件的包。tar 是一种常见的文件压缩格式,它可以将多个文件和目录打包成单个文件,可以用于文件备份、传输等场景。 以下是一些学习 archive/tar 包的建议: 了解 tar 文件格式。在学习…...
MongoDB 详细教程,这一篇就够啦
文章目录1. 简介2. 特点3. 应用场景4. 安装(docker)5. 核心概念5.1 库5.2 集合5.3 文档6. 基本操作6.1 库6.1.1 增6.1.2 删6.1.3 改6.1.4 查6.2 集合6.2.1 增6.2.2 删6.2.3 改6.2.4 查6.3. 文档6.3.1 增6.3.2 删6.3.3 改6.3.4 查1. 语法2. 对比语法3. AN…...
python为什么慢
解释性 python是动态类型解释性语言,不管使用哪种解释器 因为“解释性语言”这个概念更多地是指代码的执行方式,而不是编译方式。在解释性语言中,代码在执行时会一行一行地解释并执行,而不是预先编译为机器语言。而即使使用了PyP…...
Android kotlin 组件间通讯 - LiveEventBus 及测试(更新中)
<<返回总目录 文章目录 一、LiveEventBus是什么二、测试一、LiveEventBus是什么 LiveEventBus是Android中组件间传递消息,支持AndroidX,Event:事件,Bus:总线 范围全覆盖的消息总线解决方案 进程内消息发送App内,跨进程消息发送App之间的消息发送更多特性支持 免配…...
linux服务器时间同步
Linux服务器时间同步 需求:两台以上服务器之间的时间同步,以其中一台服务器为时间源,其余服务器同步这台时间源服务器的时间 其中,时间源服务器需要有访问外网权限,不然时间源服务器无法同互联网同步最新的时间&#…...
【解密LSTM、GRU如何解决传统RNN梯度消失问题】
解密LSTM与GRU:如何让RNN变得更聪明? 在深度学习的世界里,循环神经网络(RNN)以其卓越的序列数据处理能力广泛应用于自然语言处理、时间序列预测等领域。然而,传统RNN存在的一个严重问题——梯度消失&#…...
Android Bitmap治理全解析:从加载优化到泄漏防控的全生命周期管理
引言 Bitmap(位图)是Android应用内存占用的“头号杀手”。一张1080P(1920x1080)的图片以ARGB_8888格式加载时,内存占用高达8MB(192010804字节)。据统计,超过60%的应用OOM崩溃与Bitm…...
论文笔记——相干体技术在裂缝预测中的应用研究
目录 相关地震知识补充地震数据的认识地震几何属性 相干体算法定义基本原理第一代相干体技术:基于互相关的相干体技术(Correlation)第二代相干体技术:基于相似的相干体技术(Semblance)基于多道相似的相干体…...
LangChain知识库管理后端接口:数据库操作详解—— 构建本地知识库系统的基础《二》
这段 Python 代码是一个完整的 知识库数据库操作模块,用于对本地知识库系统中的知识库进行增删改查(CRUD)操作。它基于 SQLAlchemy ORM 框架 和一个自定义的装饰器 with_session 实现数据库会话管理。 📘 一、整体功能概述 该模块…...
R 语言科研绘图第 55 期 --- 网络图-聚类
在发表科研论文的过程中,科研绘图是必不可少的,一张好看的图形会是文章很大的加分项。 为了便于使用,本系列文章介绍的所有绘图都已收录到了 sciRplot 项目中,获取方式: R 语言科研绘图模板 --- sciRplothttps://mp.…...
抽象类和接口(全)
一、抽象类 1.概念:如果⼀个类中没有包含⾜够的信息来描绘⼀个具体的对象,这样的类就是抽象类。 像是没有实际⼯作的⽅法,我们可以把它设计成⼀个抽象⽅法,包含抽象⽅法的类我们称为抽象类。 2.语法 在Java中,⼀个类如果被 abs…...
基于鸿蒙(HarmonyOS5)的打车小程序
1. 开发环境准备 安装DevEco Studio (鸿蒙官方IDE)配置HarmonyOS SDK申请开发者账号和必要的API密钥 2. 项目结构设计 ├── entry │ ├── src │ │ ├── main │ │ │ ├── ets │ │ │ │ ├── pages │ │ │ │ │ ├── H…...
__VUE_PROD_HYDRATION_MISMATCH_DETAILS__ is not explicitly defined.
这个警告表明您在使用Vue的esm-bundler构建版本时,未明确定义编译时特性标志。以下是详细解释和解决方案: 问题原因: 该标志是Vue 3.4引入的编译时特性标志,用于控制生产环境下SSR水合不匹配错误的详细报告1使用esm-bundler…...
中科院1区顶刊|IF14+:多组学MR联合单细胞时空分析,锁定心血管代谢疾病的免疫治疗新靶点
中科院1区顶刊|IF14:多组学MR联合单细胞时空分析,锁定心血管代谢疾病的免疫治疗新靶点 当下,免疫与代谢性疾病的关联研究已成为生命科学领域的前沿热点。随着研究的深入,我们愈发清晰地认识到免疫系统与代谢系统之间存在着极为复…...
SFTrack:面向警务无人机的自适应多目标跟踪算法——突破小尺度高速运动目标的追踪瓶颈
【导读】 本文针对无人机(UAV)视频中目标尺寸小、运动快导致的多目标跟踪难题,提出一种更简单高效的方法。核心创新在于从低置信度检测启动跟踪(贴合无人机场景特性),并改进传统外观匹配算法以关联此类检测…...
