当前位置: 首页 > news >正文

Kotlin学习:5.2、异步数据流 Flow

Flow

  • 一、Flow
    • 1、Flow是什么东西?
    • 2、实现功能
    • 3、特点
    • 4、冷流和热流
    • 5、流的连续性
    • 6、流的构建器
    • 7、流的上下文
    • 8、指定流所在协程
    • 9、流的取消
      • 9.1、超时取消
      • 9.2、主动取消
      • 9.3、密集型任务的取消
    • 10、背压和优化
      • 10.1、buffer 操作符
      • 10.2、 flowOn
      • 10.3、conflate 操作符
      • 10.4、collectLatest 操作符
  • 二、操作符
    • 1、变换操作符
      • 1.1、buffer (缓存)
      • 1.2、map (变换)
        • 1.2.1、map
        • 1.2.2、mapNotNull (不空的下发)
        • 1.2.3、mapLatest
      • 1.3、transform (一转多)
      • 1.4、reduce (累*加减乘除)
      • 1.5、fold(累*加减乘除 and 拼接)
      • 1.6、flatMapConcat (有序变换)
      • 1.7、flatMapMerge (无序变换)
      • 1.8、flatMapLatest (截留)
    • 2、过滤型操作符
      • 2.1、take (截留)
        • 2.1.2、takeWhile
      • 2.2、filter(满足条件下发)
        • 2.2.2、filterNotNull (不空的下发)
        • 2.2.3、filterNot(符合条件的值将被丢弃)
        • 2.2.4、filterInstance (筛选符合类型的值)
      • 2.3、skip 和 drop(跳过)
        • 2.3.2、dropWhile
      • 2.4、distinctUntilChanged (过滤重复)
        • 2.4.2、distinctUntilChangedBy
      • 2.5、single (判断是否一个事件)
      • 2.6、first (截留第一个事件)
      • 2.7、debounce (防抖动)
      • 2.8、conflate
      • 2.9、sample (周期采样)
    • 3、组合型操作符
      • 3.1、count (计数)
      • 3.2、zip (合并元素)
      • 3.3、combine(合并元素)
      • 3.4、merge (合并成流)
      • 3.5、flattenConcat (展平流)
      • 3.6、flattenMerge(展平流)
    • 4、异常操作符
      • 4.1、catch (拦截异常)
      • 4.2、retry (重试)
        • 4.2.2、retryWhen
      • 4.3、withTimeout (超时)
    • 5、辅助操作符
      • 5.1、onXXX
      • 5.2、delay (延时)
      • 5.3、measureTimeMillis (计时)
  • 参考地址

一、Flow

1、Flow是什么东西?

Flow 是有点类似 RxJava 的 Observable

都有冷流和热流之分;
都有流式构建结构;
都包含 map、filter 等操作符。

区别于Observable,Flow可以配合挂起函数使用

2、实现功能

异步返回多个值

可以实现下载功能等,Observable 下发数组时可以实现什么功能,他就能实现什么功能
在这里插入图片描述
当文件下载时,对应的后台下载进度,就可以通过Flow里面的emit发送数据,通过collect接收对应的数据。

转:https://blog.csdn.net/qq_30382601/article/details/121825461

3、特点

  • flow{…}块中的代码可以挂起
  • 使用flow,suspend修饰符可以省略
  • 流使用emit函数发射值
  • 流使用collect的函数收集值
  • flow类似冷流,flow中代码直到流被收集(调用collect)的时候才运行,类似lazy,什么时候用,什么时候执行。
  • 流的连续性:流收集都是按顺序收集的
  • flowOn可更改流发射的上下文,即可以指定在主线程或子线程中执行
  • 与之相对的是热流,我们即将介绍的 StateFlow 和 SharedFlow 是热流,在垃圾回收之前,都是存在内存之中,并且处于活跃状态的。

转:https://blog.csdn.net/zx_android/article/details/122744370

4、冷流和热流

  • 冷流
    冷流类似冷启动,代码在被用到才会执行,如你需要使用的数据在网络,需要先请求网络才能得到数据
    Flow是一种类似于序列的冷流,flow构建器中的代码直到流被收集的时候才运行。

  • 热流
    热流类似热启动,代码在用到之前已经准备好,如你请求过网络,数据已经缓存在本地,你只需直接使用即可

5、流的连续性

流的连续性:流收集都是按顺序收集的

在这里插入图片描述

6、流的构建器

如下三种为冷流构建器

  1. flow{emit} .collect{}
  2. flowOf(***).collect{}
  3. (***).asFlow().collect{}
    @Testfun `test flow builder`() = runBlocking<Unit> {flowOf("one", "two", "three").onEach { delay(1000) }.collect { value ->println(value)}(1..3).asFlow().collect { value ->println(value)}flow<Int> {for (i in 11..13) {delay(1000) //假装在一些重要的事情emit(i) //发射,产生一个元素}}.collect { value ->println(value)}}

7、流的上下文

flowOn (多用于切线程)

流的收集总是在调用协程的上下文中发生,流的该属性称为上下文保存。

    fun simpleFlow3() = flow<Int> {println("Flow started ${Thread.currentThread().name}")for (i in 1..3) {delay(1000)emit(i)}}@Testfun `test flow context`() = runBlocking<Unit> {simpleFlow3().collect { value -> println("Collected $value ${Thread.currentThread().name}") }}

如下:流的发射和接收在一个协程内

Flow started Test worker @coroutine#1
Collected 1 Test worker @coroutine#1
Collected 2 Test worker @coroutine#1
Collected 3 Test worker @coroutine#1

flow{…}构建器中的代码必须遵循上下文保存属性,并且不允许从其他上下文中发生(emit)

如下这种写法不被允许

    fun simpleFlow4() = flow<Int> {withContext(Dispatchers.Default) {println("Flow started ${Thread.currentThread().name}")for (i in 1..3) {delay(1000)emit(i)}}}

那么如何切换协程上下文呢?
flowOn操作符,该函数用于更改流发射的上下文

    fun simpleFlow5() = flow<Int> {println("Flow started ${Thread.currentThread().name}")for (i in 1..3) {delay(1000)emit(i)}}.flowOn(Dispatchers.Default)@Testfun `test flow context`() = runBlocking<Unit> {simpleFlow5().collect { value -> println("Collected $value ${Thread.currentThread().name}") }}

如下:切换上下文成功

Flow started DefaultDispatcher-worker-2 @coroutine#2
Collected 1 Test worker @coroutine#1
Collected 2 Test worker @coroutine#1
Collected 3 Test worker @coroutine#1

8、指定流所在协程

launchIn 用于指定协程作用域通知flow执行

使用 launchIn 替换 collect 在单独的协程中启动收集流

  • 指定协程
    //事件源private fun events() = (1..3).asFlow().onEach { delay(100) }.flowOn(Dispatchers.Default)@Testfun `test flow launch`() = runBlocking<Unit> {val job = events().onEach { event -> println("Event: $event ${Thread.currentThread().name}") }
//                .collect {}.launchIn(CoroutineScope(Dispatchers.IO)) //这里使用另一个上下文传入Flow
//                .launchIn(this)//这里使用当前上下文传入Flowjob.join()}

打印:

Event: 1 DefaultDispatcher-worker-2 @coroutine#2
Event: 2 DefaultDispatcher-worker-1 @coroutine#2
Event: 3 DefaultDispatcher-worker-3 @coroutine#2
  • 也可以指定当前协程中执行
    @Testfun `test flow launch`() = runBlocking<Unit> {val job = events().onEach { event -> println("Event: $event ${Thread.currentThread().name}") }
//                .collect {}
//            .launchIn(CoroutineScope(Dispatchers.IO)) //这里使用另一个上下文传入Flow.launchIn(this)//这里使用当前上下文传入Flow//        job.join()}
Event: 1 Test worker @coroutine#2
Event: 2 Test worker @coroutine#2
Event: 3 Test worker @coroutine#2

9、流的取消

流采用和协程同样的协作取消。流可以在挂起函数的挂起的时候取消。

9.1、超时取消

withTimeoutOrNull 不能取消密集型任务

    fun simpleFlow6() = flow<Int> {for (i in 1..300) {delay(1000)emit(i)println("Emitting $i")}}@Testfun `test cancel flow`() = runBlocking<Unit> {withTimeoutOrNull(2500) {simpleFlow6().collect { value -> println(value) }}println("Done")}

9.2、主动取消

cancel

    @Testfun `test cancel flow `() = runBlocking<Unit> {simpleFlow6().collect { value ->if (value == 3) {cancel()}println(value)}println("Done")}

9.3、密集型任务的取消

密集型任务需要流的取消检测

cancel + cancellable

    @Testfun `test cancel flow check`() = runBlocking<Unit> {(1..5).asFlow().cancellable().collect { value ->println(value)if (value == 3) cancel()println("cancel check ${coroutineContext[Job]?.isActive}")}}

10、背压和优化

  • 什么是背压?

在这里插入图片描述

生产者生产的效率大于消费者消费的效率,元素积压

例,演示背压

     fun simpleFlow8() = flow<Int> {for (i in 1..10) {// emit 上面这段代码在collect之前执行delay(100)emit(i) // 调用collect// emit下面这段代码在 collect 之后执行println("Emitting $i ${Thread.currentThread().name}")}}@Testfun `test flow back pressure`() = runBlocking<Unit> {val time = measureTimeMillis {simpleFlow8().collect { value ->delay(200)   //处理这个元素消耗 200msprintln("Collected $value ${Thread.currentThread().name}")}}println("Collected in $time ms")}
Collected 1 Test worker @coroutine#1
Emitting 1 Test worker @coroutine#1
Collected 2 Test worker @coroutine#1
Emitting 2 Test worker @coroutine#1
Collected 3 Test worker @coroutine#1
Emitting 3 Test worker @coroutine#1
Collected 4 Test worker @coroutine#1
Emitting 4 Test worker @coroutine#1
Collected 5 Test worker @coroutine#1
Emitting 5 Test worker @coroutine#1
Collected 6 Test worker @coroutine#1
Emitting 6 Test worker @coroutine#1
Collected 7 Test worker @coroutine#1
Emitting 7 Test worker @coroutine#1
Collected 8 Test worker @coroutine#1
Emitting 8 Test worker @coroutine#1
Collected 9 Test worker @coroutine#1
Emitting 9 Test worker @coroutine#1
Collected 10 Test worker @coroutine#1
Emitting 10 Test worker @coroutine#1
Collected in 3169 ms
  • 如何解决背压?

通过缓存进行性能优化

10.1、buffer 操作符

并发运行流中发射元素的代码

注意:for (i in 1…10) 这里用的是 1到 10,原因是 for循环 有耗时问题,通过打印时间戳在 for (i in 1…x) 上下,发现 for (i in 1…x) 这行代码有时耗时超过200毫秒,目前不知是何问题,特此记录,为方便对比优化时长,使用1到10.

    @Testfun `test flow back pressure buffer`() = runBlocking<Unit> {val time = measureTimeMillis {simpleFlow8().buffer(10) //缓存发射事件.collect { value ->delay(200)   //处理这个元素消耗 200msprintln("Collected $value ${Thread.currentThread().name}")}}println("Collected in $time ms")}
Emitting 1 Test worker @coroutine#2
Emitting 2 Test worker @coroutine#2
Collected 1 Test worker @coroutine#1
Emitting 3 Test worker @coroutine#2
Emitting 4 Test worker @coroutine#2
Collected 2 Test worker @coroutine#1
Emitting 5 Test worker @coroutine#2
Emitting 6 Test worker @coroutine#2
Collected 3 Test worker @coroutine#1
Emitting 7 Test worker @coroutine#2
Emitting 8 Test worker @coroutine#2
Collected 4 Test worker @coroutine#1
Emitting 9 Test worker @coroutine#2
Emitting 10 Test worker @coroutine#2
Collected 5 Test worker @coroutine#1
Collected 6 Test worker @coroutine#1
Collected 7 Test worker @coroutine#1
Collected 8 Test worker @coroutine#1
Collected 9 Test worker @coroutine#1
Collected 10 Test worker @coroutine#1
Collected in 2398 ms

在这里插入图片描述

10.2、 flowOn

flowOn(),修改流上下文,达到异步处理的效果,从而优化背压

    @Testfun `test flow back pressure flowOn`() = runBlocking<Unit> {val time = measureTimeMillis {simpleFlow8().flowOn(Dispatchers.IO).collect { value ->delay(200)   //处理这个元素消耗 200msprintln("Collected $value ${Thread.currentThread().name}")}}println("Collected in $time ms")}
Emitting 1 DefaultDispatcher-worker-1 @coroutine#2
Emitting 2 DefaultDispatcher-worker-1 @coroutine#2
Collected 1 Test worker @coroutine#1
Emitting 3 DefaultDispatcher-worker-1 @coroutine#2
Emitting 4 DefaultDispatcher-worker-1 @coroutine#2
Collected 2 Test worker @coroutine#1
Emitting 5 DefaultDispatcher-worker-1 @coroutine#2
Emitting 6 DefaultDispatcher-worker-1 @coroutine#2
Collected 3 Test worker @coroutine#1
Emitting 7 DefaultDispatcher-worker-1 @coroutine#2
Emitting 8 DefaultDispatcher-worker-1 @coroutine#2
Collected 4 Test worker @coroutine#1
Emitting 9 DefaultDispatcher-worker-1 @coroutine#2
Emitting 10 DefaultDispatcher-worker-1 @coroutine#2
Collected 5 Test worker @coroutine#1
Collected 6 Test worker @coroutine#1
Collected 7 Test worker @coroutine#1
Collected 8 Test worker @coroutine#1
Collected 9 Test worker @coroutine#1
Collected 10 Test worker @coroutine#1
Collected in 2385 ms

10.3、conflate 操作符

conflate(),合并发射项,处理最新的值,不对每个值进行处理;

    @Testfun `test flow back pressure conflate`() = runBlocking<Unit> {val time = measureTimeMillis {simpleFlow8().conflate().collect { value ->delay(200)   //处理这个元素消耗 200msprintln("Collected $value ${Thread.currentThread().name}")}}println("Collected in $time ms")}
Emitting 1 Test worker @coroutine#2
Emitting 2 Test worker @coroutine#2
Collected 1 Test worker @coroutine#1
Emitting 3 Test worker @coroutine#2
Emitting 4 Test worker @coroutine#2
Collected 2 Test worker @coroutine#1
Emitting 5 Test worker @coroutine#2
Emitting 6 Test worker @coroutine#2
Collected 4 Test worker @coroutine#1
Emitting 7 Test worker @coroutine#2
Emitting 8 Test worker @coroutine#2
Collected 6 Test worker @coroutine#1
Emitting 9 Test worker @coroutine#2
Emitting 10 Test worker @coroutine#2
Collected 8 Test worker @coroutine#1
Collected 10 Test worker @coroutine#1
Collected in 1554 ms

10.4、collectLatest 操作符

collectLatest(),取消并重新发射最后一个值

    @Testfun `test flow back pressure collectLatest`() = runBlocking<Unit> {val time = measureTimeMillis {simpleFlow8().collectLatest { value ->delay(200)   //处理这个元素消耗 200msprintln("Collected $value ${Thread.currentThread().name}")}}println("Collected in $time ms")}
Emitting 1 Test worker @coroutine#2
Emitting 2 Test worker @coroutine#2
Emitting 3 Test worker @coroutine#2
Emitting 4 Test worker @coroutine#2
Emitting 5 Test worker @coroutine#2
Emitting 6 Test worker @coroutine#2
Emitting 7 Test worker @coroutine#2
Emitting 8 Test worker @coroutine#2
Emitting 9 Test worker @coroutine#2
Emitting 10 Test worker @coroutine#2
Collected 10 Test worker @coroutine#12
Collected in 1648 ms

二、操作符

1、变换操作符

1.1、buffer (缓存)

上面背压有栗子

1.2、map (变换)

1.2.1、map

map 是变换元素

data class Student(var name: String, var age: Int)
    private suspend fun performRequest(age: Int): Student {delay(500)return Student("这是name", age)}@Testfun `test map flow operator`() = runBlocking<Unit> {(1..3).asFlow().map { request -> performRequest(request) }.collect { value -> println(value) }}
Student(name=这是name, age=1)
Student(name=这是name, age=2)
Student(name=这是name, age=3)

1.2.2、mapNotNull (不空的下发)

    @Testfun `test mapNotNull flow operator`() = runBlocking<Unit> {flow {emit(1)emit(3)emit(2)}.mapNotNull { request ->if (1 == request) {null} else {Student("这是name", request)}}.collect { value -> println(value) }}
Student(name=这是name, age=3)
Student(name=这是name, age=2)

1.2.3、mapLatest

当有新值发送时,如果上个转换还没结束,会取消掉,用法同map

    @Testfun `test mapLatest flow operator`() = runBlocking<Unit> {flow {emit(1)emit(2)emit(3)}.mapLatest {if (2 == it) delay(100L)"it is $it"}.collect {println(it)}}

1.3、transform (一转多)

    @Testfun `test transform flow operator`() = runBlocking<Unit> {(1..3).asFlow().transform { request ->emit("Making request $request")emit(performRequest(request))}.collect { value -> println(value) }}
Making request 1
Student(name=这是name, age=1)
Making request 2
Student(name=这是name, age=2)
Making request 3
Student(name=这是name, age=3)

1.4、reduce (累*加减乘除)

    @Testfun `test reduce operator`() = runBlocking<Unit> {println(flow<Int> {emit(1)emit(1)emit(2)emit(3)emit(3)emit(4)}.reduce { accumulator, value -> accumulator + value })}

1.5、fold(累*加减乘除 and 拼接)

    @Testfun `test fold + operator`() = runBlocking<Unit> {println(flow<Int> {emit(1)emit(1)emit(2)emit(3)emit(3)emit(4)}.fold(3) { accumulator, value -> accumulator + value })}
17
    @Testfun `test fold - operator`() = runBlocking<Unit> {println(flow<Int> {emit(2)emit(3)}.fold(18) { accumulator, value -> accumulator - value })}
13
    @Testfun `test fold multiply by operator`() = runBlocking<Unit> {println(flow<Int> {emit(1)emit(1)emit(2)emit(3)}.fold(3) { accumulator, value -> accumulator * value })}
18
    @Testfun `test fold devide operator`() = runBlocking<Unit> {println(flow<Int> {emit(2)emit(3)}.fold(18) { accumulator, value -> accumulator / value })}
3
  • 拼接
    @Testfun `test fold joint operator`() = runBlocking<Unit> {println(flow<Int> {emit(1)emit(2)emit(3)}.fold("拼接") { accumulator, value -> return@fold "$accumulator =+= $value" })}
拼接 =+= 1 =+= 2 =+= 3

1.6、flatMapConcat (有序变换)

元素会变换完,以流的形式继续下发,并且某个元素需要耗时,它后面的元素会等待。

    @Testfun `test flatMapConcat operator`() = runBlocking<Unit> {(1..5).asFlow().onEach { delay(100) }.flatMapConcat { num ->flow {if (3==num){delay(200)}emit("num: $num")}}.collect {println("value -> $it")}}
value -> num: 1
value -> num: 2
value -> num: 3
value -> num: 4
value -> num: 5

1.7、flatMapMerge (无序变换)

元素会变换完,以流的形式继续下发,并且某个元素需要耗时,它后面的元素不会等待。

    @Testfun `test flatMapMerge operator`() = runBlocking<Unit> {(1..5).asFlow().onEach { delay(100) }.flatMapMerge() { num ->flow {if (3==num){delay(200)}emit("num: $num")}}.collect {println("value -> $it")}}
value -> num: 1
value -> num: 2
value -> num: 4
value -> num: 3
value -> num: 5

1.8、flatMapLatest (截留)

快速执行的事件都正常下发,
当有新值发送时,如果上个转换还没结束,会上取消掉上一个,直接下发新值。

    @Testfun `test flatMapLatest operator`() = runBlocking<Unit> {(1..5).asFlow().onEach { delay(100) }.flatMapLatest() { num ->flow {if (3 == num) {delay(200)}emit("num: $num")emit("num2: $num")}}.collect {println("value -> $it")}}
value -> num: 1
value -> num2: 1
value -> num: 2
value -> num2: 2
value -> num: 4
value -> num2: 4
value -> num: 5
value -> num2: 5

2、过滤型操作符

2.1、take (截留)

跟Rxjava一样

    fun numbers() = flow<Int> {try {emit(1)emit(2)println("This line will not execute")emit(3)} finally {println("Finally in numbers")}}@Testfun `test limit length operator`() = runBlocking<Unit> {//take(2),表示 当计数元素被消耗时,原始流被取消numbers().take(2).collect { value -> println(value) }}
1
2
Finally in numbers

2.1.2、takeWhile

找到第一个不满足条件的值,发送它之前的值,和dropWhile相反

    @Testfun `test takeWhile operator`() = runBlocking<Unit> {flow<Int> {emit(2)emit(1)emit(3)emit(4)emit(1)}.takeWhile { it < 2 }.collect { value -> println(value) }}

如上什么也不会输出;

    @Testfun `test takeWhile operator`() = runBlocking<Unit> {flow<Int> {emit(1)emit(2)emit(3)emit(4)emit(1)}.takeWhile { it < 2 }.collect { value -> println(value) }}

会输出 1

2.2、filter(满足条件下发)

跟Rxjava一样

    @Testfun `test filter operator`() = runBlocking<Unit> {numbers().filter {it == 2}.collect { value -> println(value) }}

2.2.2、filterNotNull (不空的下发)

    @Testfun `test filterNotNull flow operator`() = runBlocking<Unit> {flow {emit(1)emit(3)emit(null)emit(2)}.filterNotNull ().collect { value -> println(value) }}
1
3
2

2.2.3、filterNot(符合条件的值将被丢弃)

筛选不符合条件的值,相当于filter取反

    @Testfun `test filterNot operator`() = runBlocking<Unit> {flow<Int> {emit(1)emit(2)emit(3)}.filterNot {it > 2}.collect { value -> println(value) }}
1
2

2.2.4、filterInstance (筛选符合类型的值)

对标rxjava中的ofType

筛选符合类型的值(不符合类型的值将被丢弃)

    @Testfun `test filterInstance operator`() = runBlocking<Unit> {flow<Any> {emit(1)emit("2")emit(3)emit("str")}.filterIsInstance<String>().collect { value -> println(value) }}
2
str

2.3、skip 和 drop(跳过)

在这里插入图片描述

    @Testfun `test skip operator`() = runBlocking<Unit> {numbers().drop(2).collect { value -> println(value) }}

输出

3

2.3.2、dropWhile

找到第一个不满足条件的值,继续发送它和它之后的值

    @Testfun `test dropWhile operator`() = runBlocking<Unit> {numbers().dropWhile { it <= 2 }.collect { value -> println(value) }}
This line will not execute
3
Finally in numbers

2.4、distinctUntilChanged (过滤重复)

    @Testfun `test distinctUntilChanged operator`() = runBlocking<Unit> {flow<Int> {emit(1)emit(1)emit(2)emit(3)emit(3)emit(4)}.distinctUntilChanged().collect { value -> println(value) }}

2.4.2、distinctUntilChangedBy

判断两个连续值是否重复,可以设置是否丢弃重复值。
去重规则有点复杂,没完全懂

    @Testfun `test distinctUntilChangedBy operator`() = runBlocking<Unit> {flowOf(Student(name = "Jack", age = 11),Student(name = "Tom", age = 10),Student(name = "Jack", age = 12),Student(name = "Jack", age = 13),Student(name = "Tom", age = 11)).distinctUntilChangedBy { it.name == "Jack" }.collect { //第三个Stu将被丢弃println(it.toString())}}
Student(name=Jack, age=11)
Student(name=Tom, age=10)
Student(name=Jack, age=12)
Student(name=Tom, age=11)

2.5、single (判断是否一个事件)

用于确保 flow 输出值唯一。若只有一个值,则可以正常执行,若输出的值不止只有一个的时候,就会抛出异常:

    @Testfun `test single operator`() = runBlocking<Unit> {try {println(flow<Int> {emit(1)emit(1)emit(2)emit(3)emit(3)emit(4)}.single())} catch (e: Exception) {println("e =$e")}}

如果一个事件,就正常执行;否则异常。

e =java.lang.IllegalArgumentException: Flow has more than one element

2.6、first (截留第一个事件)

    @Testfun `test first operator`() = runBlocking<Unit> {println(flow<Int> {emit(1)emit(1)emit(2)emit(3)emit(3)emit(4)}.first())}
1

2.7、debounce (防抖动)

    @Testfun `test debounce operator`() = runBlocking<Unit> {flowOf(Student(name = "Jack", age = 11),Student(name = "Tom", age = 10),Student(name = "Jack", age = 12),Student(name = "Jack", age = 13),Student(name = "Tom", age = 11)).onEach {if (it.name == "Jack" && it.age == 13)delay(500)}.debounce(500).collect { //第三个Stu将被丢弃println(it.toString())}}
Student(name=Jack, age=12)
Student(name=Tom, age=11)

2.8、conflate

见 10.3、conflate

仅保留最新值, 内部就是 buffer(CONFLATED``)

2.9、sample (周期采样)

固定周期采样 ,给定一个时间周期,保留周期内最后发出的值,其他的值将被丢弃

sample操作符与debounce操作符有点像,但是却限制了一个周期性时间,sample操作符获取的是一个周期内的最新的数据,可以理解为debounce操作符增加了周期的限制。

    @Testfun `test sample operator`() = runBlocking<Unit> {flow {repeat(10) {delay(50)emit(it)}}.sample(100).collect {println(it)}}
0
2
4
6
8

3、组合型操作符

3.1、count (计数)

    @Testfun `test count operator`() = runBlocking<Unit> {println(flow<Int> {emit(1)emit(1)emit(2)emit(3)emit(3)emit(4)}.count())}

3.2、zip (合并元素)

跟Rxjava一样

    @Testfun `test zip operator`() = runBlocking<Unit> {val nameFlow = mutableListOf("小红", "小黑").asFlow()val numFlow = (1..3).asFlow()nameFlow.zip(numFlow) { string, num ->"$string$num"}.collect {println("value -> $it")}}

3.3、combine(合并元素)

    @Testfun `test combine operator`() = runBlocking<Unit> {val nameFlow = mutableListOf("小红", "小黑").asFlow()val numFlow = (1..3).asFlow()nameFlow.combine(numFlow) { string, num ->"$string$num"}.collect {println("value -> $it")}}
value -> 小红:1
value -> 小黑:2
value -> 小黑:3

3.4、merge (合并成流)

merge 是将两个flow合并起来,将每个值依次发出来

    @Testfun `test merge operator`() = runBlocking<Unit> {val flow1 = listOf(1, 2).asFlow()val flow2 = listOf("one", "two", "three").asFlow()merge(flow1, flow2).collect { value -> println(value) }}
1
2
one
two
three

3.5、flattenConcat (展平流)

展平操作符 flattenConcat 以顺序方式将给定的流展开为单个流,通俗点讲,减少层级 ,感觉和merge这么像呢,这个不太理解啥用

    @Testfun `test flattenConcat operator`() = runBlocking<Unit> {val flow1 = listOf(1, 2).asFlow()val flow2 = listOf("one", "two", "three").asFlow()val flow3 = listOf("x", "xx", "xxx").asFlow()flowOf(flow1, flow2, flow3).flattenConcat().collect { value -> println(value) }}
1
2
one
two
three
x
xx
xxx

3.6、flattenMerge(展平流)

flattenMerge 作用和 flattenConcat 一样,但是可以设置并发收集流的数量

    @Testfun `test flattenMerge operator`() = runBlocking<Unit> {val flow1 = listOf(1, 2).asFlow()val flow2 = listOf("one", "two", "three").asFlow()val flow3 = listOf("x", "xx", "xxx").asFlow()flowOf(flow1, flow2, flow3).flattenMerge(2).collect { value -> println(value) }}
1
2
one
two
three
x
xx
xxx

4、异常操作符

4.1、catch (拦截异常)

对标rxjava 中的 onErrorResumeNext

Exception、Throwable、Error 都会拦截

    @Testfun `test catch operator`() = runBlocking<Unit> {(1..5).asFlow().onEach { delay(100) }.onEach { if (2 == it) throw NullPointerException() }.catch {emit(110)println("e == $it")}.collect {println("value -> $it")}}
    @Testfun `test catch operator`() = runBlocking<Unit> {(1..5).asFlow().onEach { delay(100) }.onEach { if (2 == it)
//                throw Exception("测试 异常")
//                throw Throwable("测试 异常")throw Error("测试 错误")}.catch {emit(110)println("e == $it")}.collect {println("value -> $it")}}
value -> 1
value -> 110
e == java.lang.Error: 测试 错误

4.2、retry (重试)

所有异常错误都拦截

  • 拦截次数
    @Testfun `test retry operator`() = runBlocking<Unit> {flow<Any> {emit(1)emit(2)throw Exception("异常")emit(3)}.retry(2).catch { emit(110) }.collect { value -> println(value) }}
  • 拦截条件
    @Testfun `test retry 2 operator`() = runBlocking<Unit> {flow<Any> {emit(1)emit(2)throw Error("异常")emit(3)}.retry { it.message == "异常" }.catch { emit(110) }.collect { value -> println(value) }}

如上,满足拦截条件,所以会一直打印日志

1
2
1
2
1
2
1
2
1
... 不杀死程序一直打印

4.2.2、retryWhen

4.3、withTimeout (超时)

    @Testfun `test retry 2 operator`() = runBlocking<Unit> {withTimeout(2500) {flow<Any> {emit(1)throw Error("异常")}.retry { it.message == "异常" }.catch { emit(110) }.collect { value -> println(value) }}}

输出:

1
1
... 好多个
1
1
1Timed out waiting for 2500 ms
kotlinx.coroutines.TimeoutCancellationException: Timed out waiting for 2500 ms(Coroutine boundary)at com.yoshin.kt.kotlindemo20220713.ExampleUnitTest$test retry 2 operator$1.invokeSuspend(ExampleUnitTest.kt:928)
Caused by: kotlinx.coroutines.TimeoutCancellationException: Timed out waiting for 2500 msat app//kotlinx.coroutines.TimeoutKt.TimeoutCancellationException(Timeout.kt:184)at app//kotlinx.coroutines.TimeoutCoroutine.run(Timeout.kt:154)at app//kotlinx.coroutines.EventLoopImplBase$DelayedRunnableTask.run(EventLoop.common.kt:508)at app//kotlinx.coroutines.EventLoopImplBase.processNextEvent(EventLoop.common.kt:284)at app//kotlinx.coroutines.DefaultExecutor.run(DefaultExecutor.kt:108)at java.base@11.0.13/java.lang.Thread.run(Thread.java:834)

5、辅助操作符

5.1、onXXX

onXXX 的方法包含

onCompletion 流完成时调用
onStart 流开始时调用
onEach 元素下发时调用,每次下发都调用

对比rxjava 中:
onCompletion == doOnComplete
onStart == doOnSubscribe 或者 doOnLifecycle
onEach == doNext

    @Testfun `test do operator`() = runBlocking<Unit> {(1..5).asFlow().onCompletion { println(" onCompletion == $it ") }.onStart { println(" onStart ") }.onEach { println(" onEach == $it ") }.collect {println("value -> $it")}}

5.2、delay (延时)

延时

    private fun events() = (1..3).asFlow().onEach { delay(100) }.flowOn(Dispatchers.Default)

5.3、measureTimeMillis (计时)

测量代码用时

    @Testfun `test flow back pressure`() = runBlocking<Unit> {val time = measureTimeMillis {simpleFlow8().collect { value ->delay(200)   //处理这个元素消耗 200msprintln("Collected $value ${Thread.currentThread().name}")}}println("Collected in $time ms")}

参考地址

笔记大部分内容来自动脑学院的文章和视频

动脑学院
:https://blog.csdn.net/qq_30382601/article/details/121825461

Kotlin 之 协程(三)Flow异步流
:https://blog.csdn.net/zx_android/article/details/122744370

Android Kotlin之Flow数据流:https://blog.csdn.net/u013700502/article/details/120526170

相关文章:

Kotlin学习:5.2、异步数据流 Flow

Flow一、Flow1、Flow是什么东西&#xff1f;2、实现功能3、特点4、冷流和热流5、流的连续性6、流的构建器7、流的上下文8、指定流所在协程9、流的取消9.1、超时取消9.2、主动取消9.3、密集型任务的取消10、背压和优化10.1、buffer 操作符10.2、 flowOn10.3、conflate 操作符10.…...

EPICS synApps介绍

一、synApps是什么&#xff1f; 1&#xff09; 一个用于同步束线用户的EPICS模块集合。 2&#xff09; EPICS模块 alive, autosave, busy, calc, camac, caputRecorder, dac128V, delaygen, dxp, ip, ip330, ipUnidig, love, mca, measComp, modbus, motor, optics, quadEM,…...

Pycharm和跳板机 连接内网服务器

Pycharm和跳板机 连接内网服务器 建立配置文件 本地配置 .ssh 文件夹下配置 config 文件 Host jumpHostName xxxPort 22User xxxServerAliveInterval 30IdentityFile C:\Users\15284\.ssh\id_rsa # 通过密钥连接Host server # 同样&#xff0c;任意名字&#xff0c;随…...

mysql去重查询的三种方法

文章目录前言一、插入测试数据二、剔除重复数据方法1.方法一&#xff1a;使用distinct2.方法二&#xff1a;使用group by3.方法三&#xff1a;使用开窗函数总结前言 数据库生成环境中经常会遇到表中有重复的数据&#xff0c;或者进行关联过程中产生重复数据&#xff0c;下面介…...

PHP反序列化

文章目录简介POP链构造和Phar://题目[CISCN2019 华北赛区 Day1 Web1]Dropbox字符串逃逸简介 php序列化的过程就是把数据转化成一种可逆的数据结构&#xff0c;逆向的过程就叫做反序列化。 php将数据序列化和反序列化会用到两个函数&#xff1a; serialize 将对象格式化成有序的…...

什么蓝牙耳机打电话效果最好?通话效果好的无线蓝牙耳机

2023年了&#xff0c;TWS耳机虽说近乎人手一只了&#xff0c;但用户换新的需求和呼声依然热火朝天&#xff0c;因为我们想要听音乐、刷视频的时候都得准备&#xff0c;下面整理一些通话效果不错的耳机品牌。 第一款&#xff1a;南卡小音舱蓝牙耳机 动圈单元&#xff1a;13.3m…...

Tesseract centos环境安装,基于springboot图片提取文字

下载tesseract-orc https://github.com/tesseract-ocr/tesseract/tags下载leptonica wget http://www.leptonica.org/source/leptonica-1.78.0.tar.gz解压leptonica tar -xvf leptonica-1.78.0.tar.gz 配置编译安装leptonica 进文件夹 ./configure make make install安装aut…...

Elasticsearch7.8.0版本优化——写入速度优化

目录一、 写入速度优化的概述二、如何写入速度优化2.1、 批量数据提交2.2、 优化存储设备2.31、 合理使用合并2.4、 减少 Refresh2.5、 加大 Flush2.6、 减少副本的数量一、 写入速度优化的概述 ES 的默认配置&#xff0c;是综合了数据可靠性、写入速度、搜索实时性等因素。实使…...

【Redis】Redis主从同步中数据同步原理

【Redis】Redis主从同步中数据同步原理 文章目录【Redis】Redis主从同步中数据同步原理1. 全量同步1.1 判断是否第一次数据同步2. 增量同步3. 优化Redis主从集群4. 总结1. 全量同步 主从第一次同步是全量同步。 数据同步包括以下三个阶段&#xff1a; 在从节点执行slaveof命令…...

Python基础—while循环

(1)while循环&#xff1a; 语法格式&#xff1a; while 条件&#xff1a;   执行语句1……   执行语句2…… 适用条件&#xff1a;无限循环 死循环 while True:print(条件是真的&#xff01;)代码实例&#xff1a; i 0 # 创建一个计数的变量 while i < 5: # Truepr…...

linux基础(管道符,检索,vim和vi编辑使用)

♥️作者&#xff1a;小刘在C站 ♥️个人主页&#xff1a;小刘主页 ♥️每天分享云计算网络运维课堂笔记&#xff0c;努力不一定有收获&#xff0c;但一定会有收获加油&#xff01;一起努力&#xff0c;共赴美好人生&#xff01; ♥️夕阳下&#xff0c;是最美的绽放&#xff0…...

GAN | 代码简单实现生成对抗网络(GAN)(PyTorch)

2014年GAN发表&#xff0c;直到最近大火的AI生成全部有GAN的踪迹&#xff0c;快来简单实现它&#xff01;&#xff01;&#xff01;GAN通过计算图和博弈论的创新组合&#xff0c;他们表明&#xff0c;如果有足够的建模能力&#xff0c;相互竞争的两个模型将能够通过普通的旧反向…...

华为面试题就这?00后卷王直接拿下30k华为offer......

先说一下我的情况&#xff0c;某211本计算机&#xff0c;之前在深圳那边做了大约半年多少儿编程老师&#xff0c;之后内部平调回长沙这边&#xff0c;回来之后发现有点难&#xff0c;这边可能是业绩难做&#xff0c;虚假承诺很厉害&#xff0c;要给那些家长虚假承诺去骗人家&am…...

html的常见标签使用

目录 1.vscode基础操作 2.html基础 语法 3.HTML文件的基本结构标签 4.注释标签 5.标题标签 6.段落标签:p 7.格式化标签 8.图片标签:img 绝对路径 相对路径 网络路径 alt属性 title属性 width/height属性 9.超链接标签:a 10.表格标签 11.列表标签 有序列表 无…...

STM32——毕设智能感应窗户

智能感应窗户 一、功能设计 以STM32F103芯片最小系统作为主控&#xff0c;实现自动监测、阈值设定功能和手动控制功能。 1、自动监测模式下&#xff1a; ① 采用温湿度传感器&#xff0c;实现采集当前环境的温度、湿度数值。 ② 采用光敏传感器&#xff0c;实现判断当前的环境…...

golang archive/tar库的学习

archive/tar 是 Golang 标准库中用于读取和写入 tar 归档文件的包。tar 是一种常见的文件压缩格式&#xff0c;它可以将多个文件和目录打包成单个文件&#xff0c;可以用于文件备份、传输等场景。 以下是一些学习 archive/tar 包的建议&#xff1a; 了解 tar 文件格式。在学习…...

MongoDB 详细教程,这一篇就够啦

文章目录1. 简介2. 特点3. 应用场景4. 安装&#xff08;docker&#xff09;5. 核心概念5.1 库5.2 集合5.3 文档6. 基本操作6.1 库6.1.1 增6.1.2 删6.1.3 改6.1.4 查6.2 集合6.2.1 增6.2.2 删6.2.3 改6.2.4 查6.3. 文档6.3.1 增6.3.2 删6.3.3 改6.3.4 查1. 语法2. 对比语法3. AN…...

python为什么慢

解释性 python是动态类型解释性语言&#xff0c;不管使用哪种解释器 因为“解释性语言”这个概念更多地是指代码的执行方式&#xff0c;而不是编译方式。在解释性语言中&#xff0c;代码在执行时会一行一行地解释并执行&#xff0c;而不是预先编译为机器语言。而即使使用了PyP…...

Android kotlin 组件间通讯 - LiveEventBus 及测试(更新中)

<<返回总目录 文章目录 一、LiveEventBus是什么二、测试一、LiveEventBus是什么 LiveEventBus是Android中组件间传递消息,支持AndroidX,Event:事件,Bus:总线 范围全覆盖的消息总线解决方案 进程内消息发送App内,跨进程消息发送App之间的消息发送更多特性支持 免配…...

linux服务器时间同步

Linux服务器时间同步 需求&#xff1a;两台以上服务器之间的时间同步&#xff0c;以其中一台服务器为时间源&#xff0c;其余服务器同步这台时间源服务器的时间 其中&#xff0c;时间源服务器需要有访问外网权限&#xff0c;不然时间源服务器无法同互联网同步最新的时间&#…...

扒系统CR8记录

目录 终极改造目标 过程记录 参考 为了将一套在线安装的系统&#xff0c;在不了解其架构、各模块细节的基础上&#xff0c;进行扒弄清楚&#xff0c;作以下记录。 终极改造目标 最终的目标&#xff0c;就是只通过CreMedia8_20230207.tar.gz解压 install 就把业务包安装了&…...

面试题(基础篇)

1、你是怎样理解OOP面向对象的面向对象是利于语言对现实事物进行抽象。面向对象具有以下特征&#xff1a;&#xff08;1&#xff09;继承&#xff1a;继承是从已有类得到继承信息创建新类的过程&#xff08;2&#xff09;封装&#xff1a;通常认为封装是把数据和操作数据的方法…...

如何利用ReconPal将自然语言处理技术应用于信息安全

关于ReconPal 网络侦查一直是网络安全研究以及渗透测试活动中最重要的阶段之一&#xff0c;而这一阶段看起来很容易&#xff0c;但往往需要很大的努力和很强的技术才能做好来。首先&#xff0c;我们需要使用正确的工具、正确的查询/语法以及正确的操作&#xff0c;并将所有信息…...

攻略 | 6步帮助中小微企业开拓东盟机电产品市场

如何帮助中小微外贸企业在东盟市场拓展机电产品一般贸易&#xff1f;随着全球化的发展&#xff0c;越来越多的中小微外贸企业开始涉足国际贸易。对于机电产品行业而言&#xff0c;东盟市场是一个非常重要的出口目的地。本文将为您介绍如何帮助中小微外贸企业在东盟市场拓展机电…...

Linux服务器磁盘分区、挂载、卸载及报错处理

整体操作是&#xff1a;先对磁盘进行格式化&#xff0c;格式化后挂载到需要的挂载点&#xff0c;最后添加分区启动表&#xff0c;以便下次系统启动时自动挂载。一、linux分区1、Linux来说wulun有几个分区&#xff0c;分给哪一目录使用&#xff0c;他归根结底只有一个根目录&…...

JavaScript基础语法入门

一. JS简介 JavaScript , 简称JS, JS最初只是为了进行前端页面开发, 但随这后来JS越来越火之后, JS就被赋予了更多的功能, 可以用来开发桌面程序, 手机App, 服务器端的程序等… JS是一种动态类型, 弱类型的脚本语言, 通过解释器运行, 主要在客户端和浏览器上运行, 比如Chrome…...

Linux基础命令-ln创建链接文件

文章目录 ln 命令介绍 命令格式 基本参数 参考实例 1&#xff09; 创建文件的硬链接 2&#xff09;创建文件的软链接 3&#xff09;创建链接文件时&#xff0c;相同目标文件创建备份文件 命令总结 ln 命令介绍 先看下帮助文档中的含义 NAME ln - make links …...

Day21【元宇宙的实践构想07】—— 元宇宙与人工智能

&#x1f483;&#x1f3fc; 本人简介&#xff1a;男 &#x1f476;&#x1f3fc; 年龄&#xff1a;18 &#x1f91e; 作者&#xff1a;那就叫我亮亮叭 &#x1f4d5; 专栏&#xff1a;元宇宙 0.0 写在前面 “元宇宙”在2021年成为时髦的概念。元宇宙到底是什么&#xff1f;元宇…...

MySQL的InnoDB 三种行锁,SQL 语句加了哪些锁?

InnoDB 三种行锁&#xff1a; Record Lock&#xff08;记录锁&#xff09;&#xff1a;锁住某一行记录 Gap Lock&#xff08;间隙锁&#xff09;&#xff1a;锁住一段左开右开的区间 Next-key Lock&#xff08;临键锁&#xff09;&#xff1a;锁住一段左开右闭的区间 哪些语句…...

Java培训:深入解读函数式接口

函数式编程是一种编程规范或一种编程思想&#xff0c;简单可以理解问将运算或实现过程看做是函数的计算。 Java8为了实现函数式编程&#xff0c;提出了3个重要的概念&#xff1a;Lambda表达式、方法引用、函数式接口。现在很多公司都在使用lambda表达式进行代码编写&#xff0c…...

网站个人备案步骤/推广网站源码

实现线程安全总结来说存在四种方法&#xff1a;1. 使用ThreadLocal----主要用于数据的传递2. synchronized----JVm来实现的3. lock----cpu的硬件指令4. 使用Atomic类型----使用CPU的指令来实现5. 并发包中读写分离CopyOnWriteArrayList等...6 ......package J…...

ghost wordpress比较/优化关键词排名提升

正则表达式(regex)的定义 正则表达式是一套特殊字符和格式组成的对字符串进行运算的表达式,用表达式对目标字符串从左到右依次匹配,达成检索、过滤、摘取的目的。 正则表达式的功能 判断用户输入内容是否符合要求。将用户输入的内容与预先定义的表达式比对,若不符合要求则提…...

网站访问量怎么增加/网页制作培训教程

就想发个吐槽文章也不行吗&#xff1f; 抄抄抄&#xff0c;抄NM的头&#xff01;&#xff01;...

咸宁市做网站/百度推广开户渠道公司

java 集合类Array、List、Map区别和联系转载▼java集合类主要分为以下三类&#xff1a;第一类&#xff1a;Array、Arrays第二类&#xff1a;Collection&#xff1a;List、Set第三类&#xff1a;Map&#xff1a;HashMap、HashTable一、Array &#xff0c; ArraysJava所有“存储及…...

wordpress侧边栏tab/网络推广策划书

WinForm 之 窗口最小化到托盘及右键图标显示菜单 Form最小化是指整个Form都缩小到任务栏上&#xff0c;但是窗体以Form的标题栏形式显示在任务栏上&#xff0c; 若是想让Form以Icon的形式显示在任务栏右下角&#xff0c;则需要给Form添加一个NotifyIcon控件。 一、添加NotifyIc…...

如何做公众号小说网站赚钱/俄罗斯引擎搜索

CNC加工中心的高精高效&#xff0c;安全是前提。安全生产离不开优秀的车间管理&#xff0c;设备的精良保养以及丰富的加工经验。 1.预先开机 正式加工前可以进行开机空转&#xff0c;让CNC加工中心主轴空转几分钟&#xff0c;可以让主轴的轴承充分润滑&#xff0c;减少加工误…...