文章 16
浏览 11956
Kotlin协程

Kotlin协程

Kotlin协程

协程

协程的挂起和恢复

阻塞:点98号技师,98号正在忙,就只有等着98号

挂起:点98号技师,98号正在忙,让前台记下来,就可以看点FBI电影

常规的函数操作:invoke(call)return

协程新增了suspendresume

suspend:也称为挂起或者暂停,用于暂停执行当前协程,并保存所有局部变量

resume:用于让已暂停的协程从其暂停的地方继续执行

协程的调度器

所有的协程都必须在调度器中运行,即使他们在主线程运行也是如此

Dispatchers.MainAndroid是的主线程,用来处理UI交互和一些轻量级的任务

调用suspend函数、调用UI函数、更新LiveData

Dispatchers.IO:非主线程,专为磁盘和网络IO进行了优化

数据库、文件读写、网络处理

Dispatchers.Default:非主线程,专为CPU密集型任务进行了优化

数组排序、JSON数据解析、处理差异判断

如果是在使用retrofit的话,使用的suspend函数,会自动把函数放在IO线程中

CoroutineScope
任务泄漏

当某个协程任务丟失,无法追踪,会导致内存、CPU、磁盘等资源浪费,甚至发送一个无用的网络请求, 这种情况称为任务泄漏
为了能够避免协程泄漏,Kotlin引入了结构化并发机制

使用结构化并发可以做到:
取消任务, 当某项任务不再需要时取消它。
追踪任务, 当任务正在执行是,追踪它。
发出错误信号, 当协程失败时,发出错误信号表明有错误发生

定义协程必须指定其CoroutineScope, 它会跟踪所有协程, 同样它还可以取消
由它所启动的所有协 程

GlobalScope:生命周期是process级别的,即使ActivityFragment已经被销毁, 协程仍然在执行。
MainScope:在Activity中使用, 可以在onDestroy中取消协程(设置变量,手动取消)
viewModelScope:只能在ViewModel中使用, 绑定ViewModel的生命周期
lifecycleScope:只能在Activity、 Fragment中使用, 会绑定Activity和Fragment的生命周期。

 1class MainActivity:AppCompatActivity(),CoroutineScope by MainScope(){
 2    onCreate(){
 3        launch{
 4            
 5        }
 6    onDestroy(){
 7        cancel()
 8    }
 9    }
10}//通过委托的方式,把MainScope的上下文,给了CoroutineScope,那么就有委托对象的东西了,可以直接用
协程构建器

launchasync构建器都是用来启动新协程

launch,返回一个Job并且不附带任何结果值

async,返回一个Deferred,Deferred也是一个Job,可以使用.await()在一个延期的值上得到他的最终结果,就是能有一个返回值

等待协程作业

launch返回的Job使用.join()等待他结束

async返回的Job使用await()有同样的效果

都是挂起函数,不会堵塞主线程

组合并发

使用async包含挂起函数得到Job

多个Job在一行代码里面.await()就能做到并发

协程的启动模式

DEFAULT:协程创建之后,立即开始调度,如果在调度之前协程被取消,其将直接进入取消的响应的状态。

ATOMIC:协程创建之后,立即开始调度,协程执行到第一个挂起点之前不响应取消

LAZY:只有协程被需要时,包括主动调用协程的start、join、await等函数时才会开始调度,如果调度前就被取消,那么该协程将直接进入异常结束状态。

UNDISPATCHED: 协程创建后立即在当前函数调用栈中执行,直到遇到第一个真正的挂起的点

ATOMIC就是在执行delay()之前是不会响应取消的,就是他之前代码是肯定会完成的

LAZY就是只有在被需要的时候才会被调用,而不是自动调用

UNDISPATCHED不会等待调度,而是立即执行

协程的作用域构建器

coroutineScoperunBlocking

runBlocking是常规函数,coroutineScope是挂起函数

他们都会等待其协程体以及所有子协程结束,主要区别在于runBlocking方法会阻塞当前线程来等待,而coroutineScope只是挂起,会释放底层线程用于其他用途

coroutineScopesupervisorScope

coroutineScope一个协程失败了,所有其他兄弟协程也会被取消

supervisorScope一个协程失败了,不会影响其他兄弟协程

Job对象以及其生命周期

对于每一个创建的协程(通过launch或者async)会返回一个Job实例, 该实例是协程的唯一标示,并且负责管理协程的生命周期。

一个任务可以包含一系列状态:新创建New、 活跃Active、完成中Completing、 已完成Completed、取消中Cancelling和已取消Cancelled 虽然我们无法直接访问这些状态,但是我们可以访问Job的属性:isActiveisCancelledisCompleted

如果协程处于活跃状态,协程运行出错或者调用job.cancel()都会将当前任务置为取消中 Cancelling 状态isActive = false, isCancelled = true 当所有的子协程都完成后,协程会进入已取消Cancelled状态, 此时 isCompleted = true

协程的取消

取消作用域会取消它的子协程

被取消的子协程并不会影响其余的兄弟协程

协程通过抛出一个特殊的异常CancellationException来处理取消操作

所有kotlinx.coroutines的挂起函数(withContext``delay等)都是可以取消的

使用launch之后,可以创建返回值变量,对变量进行cancel或者join操作,cancel是取消,join是等待结束操作,可以使用cancelAndJoin()来操作。

为什么要cancelAndJoin()呢,因为取消是ing,还得等待他完成

一段协程代码必须协作才能被取消,所有的挂起函数都是可以被取消的,必须先检查协程的取消,并在取消的时候抛出CancellationException然而,如果协程正在执行计算任务,并且没有检查取消的话,那么它是不能被取消的
算作异常的,但是异常抛出会被作用域默认认为不是异常,如果使用try...catch...finally是可以抛出异常的
检查是使用isActive,是CoroutineScope中的扩展属性

1launch(Dispatchers.Default){
2while(isActive){
3xxx
4}
finally中释放资源

如果直接在finally块里面调用挂起函数的任何行为都会抛出CancellationException(因为这里持续运行的代码是可以被取消的)
使用withContext(NonCancellable)finally里面使用协程,只有finally里面使用协程才可以

超时

使用withTimeout函数来限制时间,会抛出TimeoutCancellationException,它是CancellationException的子类。CancellationException会被认为是一个结束协程的正常原因。
使用withTimeoutOrNull如果抛出异常,就返回一个null3
这一类的就是把超时的代码包装在一个try...catch里面来代替抛出一个异常

1val result = withTimeoutOrNull(1000L){
2    repeat(400){ i ->
3        println("I'm sleeping &i")
4        delay(400)
5    }
6    "OK"//null返回值
7}
 1Resource{
 2    init{
 3        a++;
 4    }
 5    fun close(){
 6        a--;
 7    }
 8}
 910launch{
11    var resource:Resource? = null
12    try{
13        withTimout(60){
14            delay(59)
15            resource = Resource()
16        }
17    }finally{
18        resource?.close()
19    }
20}

释放资源的操作,放入finally当中,否则一旦超时,释放资源的操作就不会进行

CPU密集型任务取消

isActive是一个可以被使用在CoroutineScope中的扩展属性,检查Job是否处于活跃状态。
ensureActive(),如果job处于非活跃状态, 这个方法会立即抛出异常。
yield函数会检查所在协程的状态,如果已经取消,则抛出CancellationException予以响应。此外,它还会尝试出让线程的执行权,给其他协程提供执行机会。

协程上下文

launch()每次使用这个的时候都会指定一个协程上下文,由以下组成

CoroutineScope是一组用于定义协程行为的元素

Job:控制协程的生命周期

CoroutineDispatcher:向合适的协程分发任务

CoroutineName:协程的名称,调试的时候很有用

CoroutineExceptionHandler:处理未被捕捉的异常

 1launch(Dispatchers.Default+CoroutineName("test")){
 2    println("workingthread${Yhread.currentThread().name}")
 3}
 4 5 6val scope = CoroutineScope(Job()+Dispatchers.Default+CoroutineName("test"))
 7val job =  scope.launch{
 8    println("${coroutineContext[Job]} workingthread${Yhread.currentThread().name}")
 9    val result = async{
10        println("${coroutineContext[Job]} workingthread${Yhread.currentThread().name}")
11        "OK"
12    }.await()
13    job.join()
14}

这里可以用相加是因为源码进行了运算符重载

对于新创建的协程,他的CoroutineContext会包含一个全新的Job实例,它会帮助我们控制协程的生命周期。而剩下的元素会从CoroutimeContext的父类继承,该父类可能是另一个协程或者创建该协程的CorountineScope

协程上下文的继承

协程的上下文 = 默认值 + 继承的CoroutineContext+ 参数

一些元素包含默认值:Dispatchers.Default是默认的CoroutineDispatcher,以及coroutine作为默认的CoroutineName

继承的CoroutineContextCoroutineScope或者其父协程的CoroutineContext

传入的协程构建器的参数的优先级高于继承的上下文参数,因此会覆盖对应的参数值

1val coroutineExceptionHandler = CoroutineExceptionHandler{ _,exception ->
2    println("Caught $exception")
3}
4val scope = CoroutineScope(Job() + Dispatchers.Main + coroutineExceptionHandler)

覆盖之后的协程的会是一个新的job,和之前的那个job不一样

异常的传播

协程构建器的有两种形式:自动传播异常(launchactor),向用户暴露异常(asyncproduce)当这些构建器用于创建一个根协程的时候(该协程不是另一个协程的子协程),前者这类构建器,异常会在它发生的第一时间被抛出,而后者则依赖用户来最终消费异常,例如通过awaitreceive

其他协程(非根协程)所创建的协程中,产生的异常总是会被传播(直接抛出)

当一个协程由于一个异常而运行失败时,它会传播这个异常并传递给它的父级。接下来父级会进行下面几步操作:

取消它的子级

取消它自己

将异常传播并传递给它的父级

SupervisorJob

使用SupervisorJob时,一个子协程的运行失败不会影响到其他子协程。SupervisorJob不会传播异常给它的父级,它会让子协程自己处理异常

这种需求常见于在作用域内定义作业的UI组件,如果任何一个UI的子作业执行失败了,它并不总是有必要取消整个UI组件,但是如果UI组件被销毁了,由于它的结果不再被需要了它就有必要使所有的子作业执行失败。

supervisorScope

SupervisorJob效果相同,但是如果在作用域自身执行失败的时候,所有的子协程都会取消

异常的捕获

使用CoroutineExceptionHandler对协程的异常进行捕获

以下条件满足的时候,异常就会被捕获:

时机:异常是被自动抛出异常的协程所抛出(使用launch而不是async时)

位置:在CoroutineScopeCoroutineContext中或在一个根协程(CoroutineScope或者supervisorScope的直接子协程)中.

不能在非supervisorScope的子协程哟

捕获异常防止程序崩溃

Android中全局异常处理

全局异常处理器可以获取到所有协程未处理的未捕获异,不过它并不能对异常进行捕获,虽然不能阻止程序崩溃,全局异常处理器在程序调试和异常上报等场景中仍然有非常大的用处
我们需要在classpath下面创建META-INF/services目录,并在其中创建一个名为kotlinx.coroutines.CoroutineExceptionHandler的文件,文件内容就是我们的全局异常处理器的全类名(继承CoroutineExceptionHandler)

取消与异常

取消与异常紧密相关,协程内部使用CancellationException来进行取消, 这个异常会被
忽略
当子协程被取消时,不会取消它的父协程。
如果一个协程遇到了CancellationException以外的异常,它将使用该异常取消它的父协程。当父协程的所有子协程都结束后, 异常才会被父协程处理。

异常聚合

当协程的多个子协程因为异常而失败时,一般情况下取第一个异常进行处理.在第一个异常之后发生的所有其他异常,都将被绑定到第一个异常之上

CoroutineExceptionHanlder中的exception.suppressed.contentToString()获得更多其他的异常信息

Flow

 1//一次性返回了多个值,同步
 2    fun simpleList():List<Int> = listOf(1,2,3)
 3 4    //间断返回多个值,同步
 5    fun simpleSequence():Sequence<Int> = sequence {
 6        for(i in 1..3){
 7            Thread.sleep(1000)//假装在计算
 8            //delay(1000)//不是谁都能挂起的
 9            yield(i)
10        }
11    }
12    //返回多个值,异步,但是是一次的返回多个值
13    suspend fun simpleList2():List<Int>{
14        delay(1000)
15        return listOf(1,2,3)
16    }
17    //间断返回多个值,异步
18    fun simpleFlow() = flow<Int>{
19        for (i in 1..3){
20            delay(1000)
21            emit(i)//发射,产生一个元素
22        }
23    }

image-20210723174248993

名为flowFlow类型构建器函数

flow{...}构建块中的代码可以挂起

函数simpleFlow不再标有suspend修饰符

流使用emit函数发射值

流使用collect函数收集值

image-20210723174849252

冷流

Flow是一种类似于序列的冷流,flow构建器直到被调用的时候才会被执行,如果启动两次冷流,那么他们就会执行两次

热流

不管他有没有接受,有就发射

流的连续性

流的每次单独收集都是按顺序执行的,除非使用特殊操作符

从上游到下游每个过渡操作符都会处理每个发射出的值,然后再交给末端操作符

 1@Test
 2    fun `test flow continuation`() = runBlocking {
 3        (1..5).asFlow().filter {//过滤器
 4            it % 2 == 0
 5        }.map {
 6            println("Map $it")
 7            "string $it"
 8        }.collect {
 9            println("Collect $it")
10        }
11    }
流构建器

flowOf构建器定义了一个发射固定值集的流

使用.asFlow()扩展函数,可以将各种集合与序列转换为流

流的上下文

流的收集总是在调用协程的上下文中发生,流的该属性称为上下文保存

flow{...}构建器中的代码必须遵循上下文保存属性,并且不允许从其他上下文中发射(emit)

flowOn操作符,该函数用于更改流发射的上下文(不能使用withContext来改变)

启动流

使用launchIn替代collect我们可以在单独的协程中启动流的收集

注意launchIn参数是一个作用域,用作用域包裹调度器就行

 1//事件源
 2    fun events() = (1..3)
 3        .asFlow()
 4        .onEach { delay(100) }
 5        .flowOn(Dispatchers.IO)
 6 7    @Test
 8    fun `test flow launch`() = runBlocking<Unit> {
 9        events().onEach { println(it) }
10            .launchIn(CoroutineScope(Dispatchers.Default))
11            .join()
12    }
流的取消

流采用与协程同样的协作取消。像往常一样,流的收集可以是当流在一个可取消的挂起函数(例如delay)中挂起的时候取消

流的取消检测

为方便起见,流构建器对每个发射值执行附加的ensureActive检测以进行取消,这意味着从flow{ ...}发出的繁忙循环是可以取消的
出于性能原因,大多数其他流操作不会自行执行其他取消检测,在协程处于繁忙循环的情况下,必须明确检测是否取消
通过cancellable操作符来执行此操作,检查是否有被取消

背压

buffer(),并发运行流中发射元素的代码
conflate()合并发射项,不对每个值进行处理
collectLatest()取消并重新发射最后一个值
当必须更改CoroutineDispatcher时,flowOn操作符使用了相同的缓冲机制,但
buffer()显式地请求缓冲而不改变执行上下文

 1@Test
 2    fun `test flow back`() = runBlocking<Unit> {
 3        val time = measureTimeMillis {
 4            events()
 5                .buffer(50)//可以对比一下没有这一行的区别
 6                .collect {
 7                delay(300)
 8                println("Collect $it")
 9            }
10        }
11        println(time)
12    }

增加缓冲

buffer相当于把管子加长,而且是异步的,但是线程没有切换

如果使用flowOn的话,自带一个缓冲,但是线程是切换了

提高处理效率

conflate之后,会跳过中间的值不要,满足接受的条件

collectLatest()只收集最后一个值

流的操作符
过渡操作符

可以使用操作符转换流,就像使用集合与序列一样
过渡操作符应用于上游流,并返回下游流
这些操作符也是冷操作符,就像流一样。 这类操作符本身不是挂起函数
它运行的速度很快,返回新的转换流的定义

转换操作符

.map就是一次过渡操作

.transform<传入,传出>在里面使用emit发射,从一到多

 1@Test
 2    fun `test transform flow opperator`() = runBlocking<Unit>{
 3        (1..4).asFlow()
 4            .map { it+1 }
 5            .transform<Int,String> {
 6                emit(performRequest(it))
 7                emit("夹层 $it-1")
 8            }
 9            .collect { println(it) }
10    }

限长操作符

take取多少个值

1@Test
2    fun `test transform flow opperator`() = runBlocking<Unit>{
3        (1..4).asFlow()
4            .take(3)
5            .collect { println(it) }
6    }
末端操作符

末端操作符是在流上用于启动流收集的挂起函数,collect是最基础的末端操作符,但是还有另外一些更方便使用的末端操作符:
转化为各种集合, 例如toListtoSet toCollection
获取第一个first值与确保流发射单个single值的操作符。
使用reducefold将流规约到单个值。

count计算有多少个值

toList toSet就是转换成集合

single确保只有一个值,如果多余一个值报错

reduce

1@Test
2    fun `test reduce flow`() = runBlocking<Unit>{
3        val value =  (1..4).asFlow()
4            .reduce { accumulator, value ->
5                accumulator - value
6            }
7        println(value)
8    }

accumulate是保存值,然后value是每次的值

fold对比reduce多了一个初始值

1@Test
2    fun `test reduce flow`() = runBlocking<Unit>{
3        val value =  (1..4).asFlow()
4            .fold(0){ acc, value -> 
5            acc+value    
6            }
7        println(value)
8    }
组合操作符

zip来合并两个流,两个流是异步的,时间短的等待时间长的那一个

1@Test
2    fun `test reduce flow`() = runBlocking<Unit>{
3        val flow1 =  (1..8).asFlow().filter { it % 2 == 0 }
4        val flow2 =  flowOf("One","Two","Three")
5        flow1.zip(flow2){ f1,f2 ->
6            "$f1 + $f2"
7        }.collect { println(it) }
8    }
展平操作符

流表示异步接收的值序列,所以很容易遇到这样的情况:每个值都会触发对另一个值序列的请求,然而,由于流具有异步的性质,因此需要不同的展平模式

flatMapConcat连接模式

flatMapMerge合并模式

flatMapLastest最新展平模式

flatMapConcat

 1suspend fun requestFlow(i:Int) = flow<String>{
 2        emit("$i one")
 3        delay(100)
 4        emit("$i two")
 5    }
 6    
 7    @Test
 8    fun `test flatMapConcat`() = runBlocking<Unit> {
 9        val startTime = System.currentTimeMillis()
10        (1..3).asFlow()
11//            .map { requestFlow(it) }//Flow<Flow<String>>,要把这个二维变成一维,使用展平操作符
12            .flatMapConcat { requestFlow(it) }
13            .collect { println("$it  ${System.currentTimeMillis() - startTime} ms") }
14        }

flatMapConcat结果

11 one  17 ms
21 two  123 ms
32 one  124 ms
42 two  235 ms
53 one  235 ms
63 two  346 ms

flatMapMerge结果

11 one  50 ms
22 one  50 ms
33 one  50 ms
41 two  156 ms
52 two  156 ms
63 two  156 ms

如果加一个three在two后面

11 one  45 ms
22 one  46 ms
33 one  46 ms
41 two  144 ms
51 three  144 ms
62 two  147 ms
72 three  147 ms
83 two  147 ms
93 three  147 ms

就是把每一个发射值一起代入进去,以delay()耗时操作作为界限

flatMapLatest

 1suspend fun requestFlow(i:Int) = flow<String>{
 2        emit("$i one")
 3        delay(100)
 4        emit("$i two")
 5        delay(100)
 6        emit("$i three")
 7    }
 8 9    @Test
10    fun `test flatMapLatest`() = runBlocking<Unit> {
11        val startTime = System.currentTimeMillis()
12        (1..3).asFlow()
13//            .map { requestFlow(it) }//Flow<Flow<String>>,要把这个二维变成一维,使用展平操作符
14            .flatMapLatest { requestFlow(it) }
15            .collect { println("$it  ${System.currentTimeMillis() - startTime} ms") }
16        }

只要开始有耗时操作,就只要最新的一个值,中间的值就不要了

流的异常处理

一般使用try/catch块或者catch函数

 1@Test
 2    fun `test flow exception`() = runBlocking<Unit> {
 3        try {
 4            simpleFlow().collect {
 5                println(it)
 6                check(it <= 1) {"Collected $it"}
 7            }
 8        }catch (e:Throwable){
 9            println("Caught $e")
10        }
11    }

这是在下流收集的时候抛出异常

如果是在上流计算的时候,上游的异常会影响下流的运行,如果使用try..catch,就会打破流的规范,所以使用,catch

可以在catch中补发一个,来恢复

 1@Test
 2    fun `test flow exception`() = runBlocking<Unit> {
 3            flow { 
 4                emit(1)
 5                throw ArithmeticException("Div 0")
 6        }.catch{ 
 7                e -> println("Caught $e")
 8                emit(2)
 9            }
10                .flowOn(Dispatchers.IO)
11                .collect { println(it) }
12    }
流的完成

当流收集完成时(普通情况或异常情况),它可能需要执行一个动作
命令式try...finally
onCompletion声明式处理

1@Test
2    fun `test flow finally`() = runBlocking<Unit> {
3        try {
4            (1..3).asFlow().collect { println(it) }
5        }finally {
6            println("已完成")
7        }
8    }
1@Test
2    fun `test flow onCompletion`() = runBlocking<Unit> {
3        (1..3).asFlow().onCompletion { println("已完成") }.collect { println(it) }
4    }

使用onCompletion也可以得到异常信息,也可以拿到下游的异常信息,不能捕获

 1fun simpleFlow() = flow<Int> {
 2        for (i in 1..3){
 3            println("Emit $i")
 4            emit(i)
 5            throw ArithmeticException("Div 0")
 6        }
 7    }
 8    @Test
 9    fun `test flow onCompletion`() = runBlocking<Unit> {
10        simpleFlow().onCompletion {
11            if (it != null) println("!e")
12        }
13            .collect { println(it) }
14    }

Channel通道

基本channel

Channel实际上是一个并发安全的队列,它可以用来连接协程,实现不同协程的通信

image-20210724115304924

这是一个生产者消费者模型

 1@Test
 2    fun `test channel`() = runBlocking<Unit> {
 3        val channel = Channel<Int>()
 4        //生产者
 5        val producer = GlobalScope.launch {
 6            var i = 0
 7            while (true){
 8                delay(1000)
 9                channel.send(++i)
10                println("发送 $i")
11            }
12        }
1314        //消费者
15        val consumer = GlobalScope.launch {
16            while (true){
17                //delay(2000)//当消费者比生产者慢,就让消费者等着
18                val receive = channel.receive()
19                println("接受 $receive")
20            }
21        }
22        producer.start()
23        consumer.start()
24        delay(5000)
25    }

创建一个通道channel

然后在生产者中send,在消费者中receive

Channel实际上就是一个队列,队列中一定存在缓冲区,那么一旦这个缓冲区满了,并且也一直没有人调用receive并取走函数,send就需要挂起.故意让接收端的节奏放慢,发现send总是会挂起,直到receive之后才会继续往下执行

可以指定缓冲区大小,默认为64

val channel = Channel<Int>(20)

Channel有四个默认值

UNLIMITEDINT.MAX

RENDEZVOUS没有缓冲,会等待send

CONFLATED容量为1,不暂停send,如果有新值就删旧的

BUFFERED默认挂起64,非挂起1

迭代channel

channel.iterator()

 1@Test
 2    fun `test channel`() = runBlocking<Unit> {
 3        val channel = Channel<Int>(Channel.BUFFERED)
 4        //生产者
 5        val producer = GlobalScope.launch {
 6            for(i in 1..5){
 7                channel.send(i)
 8                println("发送 $i")
 9            }
10        }
111213        //消费者
14        val consumer = GlobalScope.launch {
15            
16//           for (elment in channel){
17//               println("接受 $elment")
18//               delay(1000)
19//           }
20            
21            val iterator = channel.iterator()
22            while (iterator.hasNext()){
23                val element = iterator.next()
24                println("接受 $element")
25                delay(1000)
26            }
27        }
28        joinAll(producer,consumer)
29    }
生产者消费者的边界构造方法

我们可以通过produce方法启动一个生产者协程,并返回一个ReceiveChannel,其他协程就可以用这个Channel来接收数据了

反过来,我们可以用actor启动一个消费者协程。

 1@Test
 2    fun `test channel`() = runBlocking<Unit> {
 3       val channel = Channel<Int>(5)
 4       val receiveChannel:ReceiveChannel<Int> =  GlobalScope.produce {
 5            repeat(10){
 6                delay(800)
 7                send(it)
 8            }
 9        }
10        val  consumer = GlobalScope.launch {
11            for (i in receiveChannel){
12                println("接受 $i")
13            }
14        }
15        consumer.join()
16    }

使用作用域.produce快速构造

 1@Test
 2    fun `test channel`() = runBlocking<Unit> {
 3        val sendChannel:SendChannel<Int> = GlobalScope.actor {
 4            while (true){
 5                println("接受 ${receive()}")
 6            }
 7        }
 8        val producer = GlobalScope.launch {
 9            for (i in 0..3){
10                sendChannel.send(i)
11            }
12        }
13        producer.join()
14    }

使用作用域.actor()快速构造,然后在里面使用receive()来得到发送的值

在生产者那使用 send发送

channel的关闭

produceactor返回的Channel都会随着对应的协程执行完毕而关闭,也正是这样,Channel才被称为热数据流
对于一个Channel,如果我们调用了它的close方法,它会立即停止接收新元素,也就是说这时它的isClosedForSend会立即返回true.而由于Channel缓冲区的存在,这时候可能还有一些元素没有被处理完,因此要等所有的元素都被读取之后isClosedForReceive才会返回true
Channel的生命周期最好由主导方来维护,建议由主导的一方实现关闭,一般是生产者

BroadcastChannel

发送端和接收端在Channel中存在一对多的情形,从数据处理本身来讲,虽然有多个接收端,但是同一个元素只会被一个接收端读到。广播则不然,订阅者多个接收端不存在互斥行为

 1@Test
 2    fun `test broadCast`() = runBlocking<Unit> {
 3        val broadcastChannel = BroadcastChannel<Int>(Channel.CONFLATED)
 4        val porducer =  GlobalScope.launch {
 5            List(3){
 6                delay(100)
 7                broadcastChannel.send(it)
 8            }
 9            broadcastChannel.close()
10        }
11        List(2){ index ->
12            GlobalScope.launch {
13                val receiveChannel = broadcastChannel.openSubscription()//订阅
14                for (i in receiveChannel){
15                    println( "接受者$index + 接受 $i")
16                }
17            }
18        }.joinAll()
19        porducer.join()
20    }

broadcastChannel.openSubscription()订阅

协程中的多路复用

多路复用:就是把一根管道,分成几个区域来用,做到一个信道同时传输多路型号

协程多路复用

协程中有点不一样

image-20210724143650846

协程中,是从两个中,接受最快的一个

使用select函数 select<T>,T是返回值类型

 1data class Response<T>(val value: T, val isLocal: Boolean)
 2 3fun CoroutineScope.getUserFromLocal(name: String) = async(Dispatchers.IO) {
 4    //delay(1000) //故意的延迟
 5    File(cachePath).readText().let { gson.fromJson(it, User::class.java) }
 6}
 7 8fun CoroutineScope.getUserFromRemote(name: String) = async(Dispatchers.IO) {
 9    userServiceApi.getUser(name)
10}
 1@Test
 2    fun `test select await`() = runBlocking<Unit> {
 3        GlobalScope.launch {
 4            val localRequest = getUserFromLocal("xxx")
 5            val remoteRequest = getUserFromRemote("yyy")
 6 7            val userResponse = select<Response<User>> {
 8                localRequest.onAwait { Response(it, true) }
 9                remoteRequest.onAwait { Response(it, false) }
10            }
1112            userResponse.value?.let { println(it) }
13        }.join()
14    }
复用多个Channel

和协程的复用一样,是接受最快的一个

 1@Test
 2    fun `test select channel`() = runBlocking<Unit> {
 3        val channels = listOf(Channel<Int>(), Channel<Int>())
 4        GlobalScope.launch {
 5            delay(100)
 6            channels[0].send(200)
 7        }
 8 9        GlobalScope.launch {
10            delay(50)
11            channels[1].send(100)
12        }
1314        val result = select<Int?> {
15            channels.forEach { channel ->
16                channel.onReceive { it }
17            }
18        }
19        println(result)
20    }
SelectClause

所有能够被select的事件都是SelectClauseN类型, 包括:
SelectClauseO: 对应事件没有返回值,例如join没有返回值,那么onJoin就是SelectClauseN类型。 使用时,onJoin的参数是一个无参函数
SelectClause1: 对应事件有返回值, 前面的onAwaitonReceive都是此类情况
SelectClause2: 对应事件有返回值, 此外还需要一个额外的参数, 例如Channel.onSend有两个参数, 第一个是Channel数据类型的值,表示即将发送的值,第二个是发送成功时的回调参数
如果我们想要确认挂起函数是否支持select,只需要查看其是否存在对应的SelectClauseN类型可回调即可。

 1@Test
 2    fun `test SelectClause0`() = runBlocking<Unit> {
 3        val job1 = GlobalScope.launch {
 4            delay(100)
 5            println("job 1")
 6        }
 7 8        val job2 = GlobalScope.launch {
 9            delay(10)
10            println("job 2")
11        }
1213        select<Unit> {
14            job1.onJoin { println("job 1 onJoin") }
15            job2.onJoin { println("job 2 onJoin") }
16        }
1718        delay(1000)
19    }
 1@Test
 2    fun `test SelectClause2`() = runBlocking<Unit> {
 3        val channels = listOf(Channel<Int>(), Channel<Int>())
 4        println(channels)
 5 6        launch(Dispatchers.IO) {
 7            select<Unit?> {
 8                launch {
 9                    delay(10)
10                    channels[1].onSend(200) { sentChannel ->
11                        println("sent on $sentChannel")
12                    }
13                }
1415                launch {
16                    delay(100)
17                    channels[0].onSend(100) { sentChannel ->
18                        println("sent on $sentChannel")
19                    }
20                }
21            }
22        }
2324        GlobalScope.launch {
25            println(channels[0].receive())
26        }
2728        GlobalScope.launch {
29            println(channels[1].receive())
30        }
3132        delay(1000)
33    }
使用flow模拟多路复用
 1@Test
 2    fun `test select flow`() = runBlocking<Unit> {
 3        // 函数 -> 协程 -> Flow -> Flow合并
 4        val name = "guest"
 5        coroutineScope {
 6            listOf(::getUserFromLocal, ::getUserFromRemote)
 7                    .map { function ->
 8                        function.call(name)
 9                    }.map { deferred ->
10                        flow { emit(deferred.await()) }
11                    }.merge().collect { user -> println(user) }
1213        }
14    }

这里用了反射

这里不一定要用merge(),可以自己自定义过滤

协程并发问题
1@Test
2    fun `test not safe concurrent`() = runBlocking<Unit> {
3        var count = 0
4        List(2000) {
5            GlobalScope.launch { count++ }
6        }.joinAll()
7        println(count)
8    }

这里会出现,访问未更新元素重复的并发问题

安全的方法

1@Test
2    fun `test safe concurrent`() = runBlocking<Unit> {
3        var count = AtomicInteger(0)
4        List(1000) {
5            GlobalScope.launch { count.incrementAndGet() }//这是一个安全的++
6        }.joinAll()
7        println(count.get())
8    }

其他方法

Channel并发安全的消息通道,我们已经非常熟悉。
Mutex轻量级锁,它的lock和unlock从语义上与线程锁比较类似,之所以轻量是因为它在获取不到锁时不会阻塞线程,而是挂起等待锁的释放。
Semaphore轻量级信号量,信号量可以有多个,协程在获取到信号量后即可执行并发操作.当Semaphore的参数为1时,效果等价于Mutex

获得Mutex对象,使用mutex.withLock,和java线程的lock是一样的

 1@Test
 2    fun `test safe concurrent tools`() = runBlocking<Unit> {
 3        var count = 0
 4        val mutex = Mutex()
 5        List(1000) {
 6            GlobalScope.launch {
 7                mutex.withLock {
 8                    count++
 9                }
10            }
11        }.joinAll()
12        println(count)
13    }
 1@Test
 2    fun `test safe concurrent tools2`() = runBlocking<Unit> {
 3        var count = 0
 4        val semaphore = Semaphore(1)
 5        List(10000) {
 6            GlobalScope.launch {
 7                semaphore.withPermit {
 8                    count++
 9                }
10            }
11        }.joinAll()
12        println(count)
13    }

这种就是使用信道,当信道为1的时候效果是一样的,但是就算是2也会有风险

或者规避这种访问,直接对外部的访问(比较极端)

1@Test
2    fun `test avoid access outer variable`() = runBlocking<Unit> {
3        var count = 0
4        val result = count + List(1000){
5            GlobalScope.async { 1 }
6        }.map { it.await() }.sum()
7        println(result)
8    }

标题:Kotlin协程
作者:kuohai
地址:https://udday.cn/articles/2021/07/24/1627114593258.html

充实的一天

取消