Kotlin
协程
协程
协程的挂起和恢复
阻塞:点98号技师,98号正在忙,就只有等着98号
挂起:点98号技师,98号正在忙,让前台记下来,就可以看点FBI电影
常规的函数操作:invoke(call)
和return
协程新增了suspend
和resume
suspend
:也称为挂起或者暂停,用于暂停执行当前协程,并保存所有局部变量
resume
:用于让已暂停的协程从其暂停的地方继续执行
协程的调度器
所有的协程都必须在调度器中运行,即使他们在主线程运行也是如此
Dispatchers.Main
Android是的主线程,用来处理UI交互和一些轻量级的任务
调用suspend函数、调用UI函数、更新LiveData
Dispatchers.IO
:非主线程,专为磁盘和网络IO进行了优化
数据库、文件读写、网络处理
Dispatchers.Default
:非主线程,专为CPU密集型任务进行了优化
数组排序、JSON数据解析、处理差异判断
如果是在使用retrofit
的话,使用的suspend
函数,会自动把函数放在IO
线程中
CoroutineScope
任务泄漏
当某个协程任务丟失,无法追踪,会导致内存、CPU、磁盘等资源浪费,甚至发送一个无用的网络请求, 这种情况称为任务泄漏
为了能够避免协程泄漏,Kotlin
引入了结构化并发机制
使用结构化并发可以做到:
取消任务, 当某项任务不再需要时取消它。
追踪任务, 当任务正在执行是,追踪它。
发出错误信号, 当协程失败时,发出错误信号表明有错误发生
定义协程必须指定其CoroutineScope
, 它会跟踪所有协程, 同样它还可以取消
由它所启动的所有协 程
GlobalScope
:生命周期是process
级别的,即使Activity
或Fragment
已经被销毁, 协程仍然在执行。
MainScope
:在Activity中使用, 可以在onDestroy
中取消协程(设置变量,手动取消)
viewModelScope
:只能在ViewModel
中使用, 绑定ViewModel
的生命周期
lifecycleScope
:只能在Activity、 Fragment
中使用, 会绑定Activity和Fragment
的生命周期。
1class MainActivity:AppCompatActivity(),CoroutineScope by MainScope(){
2 onCreate(){
3 launch{
4
5 }
6 onDestroy(){
7 cancel()
8 }
9 }
10}//通过委托的方式,把MainScope的上下文,给了CoroutineScope,那么就有委托对象的东西了,可以直接用
协程构建器
launch
和async
构建器都是用来启动新协程
launch
,返回一个Job
并且不附带任何结果值
async
,返回一个Deferred
,Deferred
也是一个Job
,可以使用.await()
在一个延期的值上得到他的最终结果,就是能有一个返回值
等待协程作业
launch
返回的Job
使用.join()
等待他结束
async
返回的Job
使用await()
有同样的效果
都是挂起函数,不会堵塞主线程
组合并发
使用async
包含挂起函数得到Job
多个Job
在一行代码里面.await()
就能做到并发
协程的启动模式
DEFAULT
:协程创建之后,立即开始调度,如果在调度之前协程被取消,其将直接进入取消的响应的状态。
ATOMIC
:协程创建之后,立即开始调度,协程执行到第一个挂起点之前不响应取消
LAZY
:只有协程被需要时,包括主动调用协程的start、join、await等函数时才会开始调度,如果调度前就被取消,那么该协程将直接进入异常结束状态。
UNDISPATCHED
: 协程创建后立即在当前函数调用栈中执行,直到遇到第一个真正的挂起的点
ATOMIC
就是在执行delay()
之前是不会响应取消的,就是他之前代码是肯定会完成的
LAZY
就是只有在被需要的时候才会被调用,而不是自动调用
UNDISPATCHED
不会等待调度,而是立即执行
协程的作用域构建器
coroutineScope
和runBlocking
runBlocking
是常规函数,coroutineScope
是挂起函数
他们都会等待其协程体以及所有子协程结束,主要区别在于runBlocking
方法会阻塞当前线程来等待,而coroutineScope
只是挂起,会释放底层线程用于其他用途
coroutineScope
和supervisorScope
coroutineScope
一个协程失败了,所有其他兄弟协程也会被取消
supervisorScope
一个协程失败了,不会影响其他兄弟协程
Job对象以及其生命周期
对于每一个创建的协程(通过launch
或者async
)会返回一个Job
实例, 该实例是协程的唯一标示,并且负责管理协程的生命周期。
一个任务可以包含一系列状态:新创建New
、 活跃Active
、完成中Completing
、 已完成Completed
、取消中Cancelling
和已取消Cancelled
虽然我们无法直接访问这些状态,但是我们可以访问Job
的属性:isActive
、 isCancelled
和isCompleted
如果协程处于活跃状态,协程运行出错或者调用job.cancel()
都会将当前任务置为取消中 Cancelling
状态isActive = false
, isCancelled = true
当所有的子协程都完成后,协程会进入已取消Cancelled
状态, 此时 isCompleted = true
协程的取消
取消作用域会取消它的子协程
被取消的子协程并不会影响其余的兄弟协程
协程通过抛出一个特殊的异常CancellationException
来处理取消操作
所有kotlinx.coroutines
的挂起函数(withContext``delay
等)都是可以取消的
使用launch之后,可以创建返回值变量,对变量进行cancel
或者join
操作,cancel
是取消,join
是等待结束操作,可以使用cancelAndJoin()
来操作。
为什么要cancelAndJoin()
呢,因为取消是ing
,还得等待他完成
一段协程代码必须协作才能被取消,所有的挂起函数都是可以被取消的,必须先检查协程的取消,并在取消的时候抛出CancellationException
然而,如果协程正在执行计算任务,并且没有检查取消的话,那么它是不能被取消的
算作异常的,但是异常抛出会被作用域默认认为不是异常,如果使用try...catch...finally
是可以抛出异常的
检查是使用isActive
,是CoroutineScope
中的扩展属性
1launch(Dispatchers.Default){
2while(isActive){
3xxx
4}
在finally
中释放资源
如果直接在finally
块里面调用挂起函数的任何行为都会抛出CancellationException
(因为这里持续运行的代码是可以被取消的)
使用withContext(NonCancellable)
在finally
里面使用协程,只有finally
里面使用协程才可以
超时
使用withTimeout
函数来限制时间,会抛出TimeoutCancellationException
,它是CancellationException
的子类。CancellationException
会被认为是一个结束协程的正常原因。
使用withTimeoutOrNull
如果抛出异常,就返回一个null
3
这一类的就是把超时的代码包装在一个try...catch
里面来代替抛出一个异常
1val result = withTimeoutOrNull(1000L){
2 repeat(400){ i ->
3 println("I'm sleeping &i")
4 delay(400)
5 }
6 "OK"//null返回值
7}
1Resource{
2 init{
3 a++;
4 }
5 fun close(){
6 a--;
7 }
8}
9
10launch{
11 var resource:Resource? = null
12 try{
13 withTimout(60){
14 delay(59)
15 resource = Resource()
16 }
17 }finally{
18 resource?.close()
19 }
20}
释放资源的操作,放入finally当中,否则一旦超时,释放资源的操作就不会进行
CPU密集型任务取消
isActive
是一个可以被使用在CoroutineScope
中的扩展属性,检查Job
是否处于活跃状态。
ensureActive()
,如果job
处于非活跃状态, 这个方法会立即抛出异常。
yield
函数会检查所在协程的状态,如果已经取消,则抛出CancellationException
予以响应。此外,它还会尝试出让线程的执行权,给其他协程提供执行机会。
协程上下文
launch()
每次使用这个的时候都会指定一个协程上下文,由以下组成
CoroutineScope
是一组用于定义协程行为的元素
Job
:控制协程的生命周期
CoroutineDispatcher
:向合适的协程分发任务
CoroutineName
:协程的名称,调试的时候很有用
CoroutineExceptionHandler
:处理未被捕捉的异常
1launch(Dispatchers.Default+CoroutineName("test")){
2 println("workingthread${Yhread.currentThread().name}")
3}
4
5
6val scope = CoroutineScope(Job()+Dispatchers.Default+CoroutineName("test"))
7val job = scope.launch{
8 println("${coroutineContext[Job]} workingthread${Yhread.currentThread().name}")
9 val result = async{
10 println("${coroutineContext[Job]} workingthread${Yhread.currentThread().name}")
11 "OK"
12 }.await()
13 job.join()
14}
这里可以用相加是因为源码进行了运算符重载
对于新创建的协程,他的CoroutineContext
会包含一个全新的Job
实例,它会帮助我们控制协程的生命周期。而剩下的元素会从CoroutimeContext
的父类继承,该父类可能是另一个协程或者创建该协程的CorountineScope
协程上下文的继承
协程的上下文 = 默认值 + 继承的CoroutineContext
+ 参数
一些元素包含默认值:Dispatchers.Default
是默认的CoroutineDispatcher
,以及coroutine
作为默认的CoroutineName
继承的CoroutineContext
是CoroutineScope
或者其父协程的CoroutineContext
传入的协程构建器的参数的优先级高于继承的上下文参数,因此会覆盖对应的参数值
1val coroutineExceptionHandler = CoroutineExceptionHandler{ _,exception ->
2 println("Caught $exception")
3}
4val scope = CoroutineScope(Job() + Dispatchers.Main + coroutineExceptionHandler)
覆盖之后的协程的会是一个新的job,和之前的那个job不一样
异常的传播
协程构建器的有两种形式:自动传播异常(launch
和actor
),向用户暴露异常(async
和produce
)当这些构建器用于创建一个根协程的时候(该协程不是另一个协程的子协程),前者这类构建器,异常会在它发生的第一时间被抛出,而后者则依赖用户来最终消费异常,例如通过await
和receive
其他协程(非根协程)所创建的协程中,产生的异常总是会被传播(直接抛出)
当一个协程由于一个异常而运行失败时,它会传播这个异常并传递给它的父级。接下来父级会进行下面几步操作:
取消它的子级
取消它自己
将异常传播并传递给它的父级
SupervisorJob
使用SupervisorJob
时,一个子协程的运行失败不会影响到其他子协程。SupervisorJob
不会传播异常给它的父级,它会让子协程自己处理异常
这种需求常见于在作用域内定义作业的UI组件,如果任何一个UI的子作业执行失败了,它并不总是有必要取消整个UI组件,但是如果UI组件被销毁了,由于它的结果不再被需要了它就有必要使所有的子作业执行失败。
supervisorScope
和SupervisorJob
效果相同,但是如果在作用域自身执行失败的时候,所有的子协程都会取消
异常的捕获
使用CoroutineExceptionHandler
对协程的异常进行捕获
以下条件满足的时候,异常就会被捕获:
时机:异常是被自动抛出异常的协程所抛出(使用launch
而不是async
时)
位置:在CoroutineScope
的CoroutineContext
中或在一个根协程(CoroutineScope
或者supervisorScope
的直接子协程)中.
不能在非supervisorScope
的子协程哟
捕获异常防止程序崩溃
Android中全局异常处理
全局异常处理器可以获取到所有协程未处理的未捕获异,不过它并不能对异常进行捕获,虽然不能阻止程序崩溃,全局异常处理器在程序调试和异常上报等场景中仍然有非常大的用处
我们需要在classpath
下面创建META-INF/services目录,并在其中创建一个名为kotlinx.coroutines.CoroutineExceptionHandler
的文件,文件内容就是我们的全局异常处理器的全类名(继承CoroutineExceptionHandler
)
取消与异常
取消与异常紧密相关,协程内部使用CancellationException
来进行取消, 这个异常会被
忽略
当子协程被取消时,不会取消它的父协程。
如果一个协程遇到了CancellationException
以外的异常,它将使用该异常取消它的父协程。当父协程的所有子协程都结束后, 异常才会被父协程处理。
异常聚合
当协程的多个子协程因为异常而失败时,一般情况下取第一个异常进行处理.在第一个异常之后发生的所有其他异常,都将被绑定到第一个异常之上
在CoroutineExceptionHanlder
中的exception.suppressed.contentToString()
获得更多其他的异常信息
Flow
1//一次性返回了多个值,同步
2 fun simpleList():List<Int> = listOf(1,2,3)
3
4 //间断返回多个值,同步
5 fun simpleSequence():Sequence<Int> = sequence {
6 for(i in 1..3){
7 Thread.sleep(1000)//假装在计算
8 //delay(1000)//不是谁都能挂起的
9 yield(i)
10 }
11 }
12 //返回多个值,异步,但是是一次的返回多个值
13 suspend fun simpleList2():List<Int>{
14 delay(1000)
15 return listOf(1,2,3)
16 }
17 //间断返回多个值,异步
18 fun simpleFlow() = flow<Int>{
19 for (i in 1..3){
20 delay(1000)
21 emit(i)//发射,产生一个元素
22 }
23 }
名为flow
的Flow
类型构建器函数
flow{...}构建块中的代码可以挂起
函数simpleFlow不再标有suspend修饰符
流使用emit函数发射值
流使用collect函数收集值
冷流
Flow
是一种类似于序列的冷流,flow构建器
直到被调用的时候才会被执行,如果启动两次冷流,那么他们就会执行两次
热流
不管他有没有接受,有就发射
流的连续性
流的每次单独收集都是按顺序执行的,除非使用特殊操作符
从上游到下游每个过渡操作符都会处理每个发射出的值,然后再交给末端操作符
1@Test
2 fun `test flow continuation`() = runBlocking {
3 (1..5).asFlow().filter {//过滤器
4 it % 2 == 0
5 }.map {
6 println("Map $it")
7 "string $it"
8 }.collect {
9 println("Collect $it")
10 }
11 }
流构建器
flowOf
构建器定义了一个发射固定值集的流
使用.asFlow()
扩展函数,可以将各种集合与序列转换为流
流的上下文
流的收集总是在调用协程的上下文中发生,流的该属性称为上下文保存
flow{...}
构建器中的代码必须遵循上下文保存属性,并且不允许从其他上下文中发射(emit)
flowOn
操作符,该函数用于更改流发射的上下文(不能使用withContext
来改变)
启动流
使用launchIn
替代collect
我们可以在单独的协程中启动流的收集
注意launchIn
参数是一个作用域,用作用域包裹调度器就行
1//事件源
2 fun events() = (1..3)
3 .asFlow()
4 .onEach { delay(100) }
5 .flowOn(Dispatchers.IO)
6
7 @Test
8 fun `test flow launch`() = runBlocking<Unit> {
9 events().onEach { println(it) }
10 .launchIn(CoroutineScope(Dispatchers.Default))
11 .join()
12 }
流的取消
流采用与协程同样的协作取消。像往常一样,流的收集可以是当流在一个可取消的挂起函数(例如delay)中挂起的时候取消
流的取消检测
为方便起见,流构建器对每个发射值执行附加的ensureActive
检测以进行取消,这意味着从flow{ ...}
发出的繁忙循环是可以取消的
出于性能原因,大多数其他流操作不会自行执行其他取消检测,在协程处于繁忙循环的情况下,必须明确检测是否取消
通过cancellable
操作符来执行此操作,检查是否有被取消
背压
buffer()
,并发运行流中发射元素的代码
conflate()
合并发射项,不对每个值进行处理
collectLatest()
取消并重新发射最后一个值
当必须更改CoroutineDispatcher
时,flowOn
操作符使用了相同的缓冲机制,但
是buffer()
显式地请求缓冲而不改变执行上下文
1@Test
2 fun `test flow back`() = runBlocking<Unit> {
3 val time = measureTimeMillis {
4 events()
5 .buffer(50)//可以对比一下没有这一行的区别
6 .collect {
7 delay(300)
8 println("Collect $it")
9 }
10 }
11 println(time)
12 }
增加缓冲
buffer
相当于把管子加长,而且是异步的,但是线程没有切换
如果使用flowOn
的话,自带一个缓冲,但是线程是切换了
提高处理效率
conflate
之后,会跳过中间的值不要,满足接受的条件
collectLatest()
只收集最后一个值
流的操作符
过渡操作符
可以使用操作符转换流,就像使用集合与序列一样
过渡操作符应用于上游流,并返回下游流
这些操作符也是冷操作符,就像流一样。 这类操作符本身不是挂起函数
它运行的速度很快,返回新的转换流的定义
转换操作符
.map
就是一次过渡操作
.transform
<传入,传出>在里面使用emit
发射,从一到多
1@Test
2 fun `test transform flow opperator`() = runBlocking<Unit>{
3 (1..4).asFlow()
4 .map { it+1 }
5 .transform<Int,String> {
6 emit(performRequest(it))
7 emit("夹层 $it-1")
8 }
9 .collect { println(it) }
10 }
限长操作符
take
取多少个值
1@Test
2 fun `test transform flow opperator`() = runBlocking<Unit>{
3 (1..4).asFlow()
4 .take(3)
5 .collect { println(it) }
6 }
末端操作符
末端操作符是在流上用于启动流收集的挂起函数,collect是最基础的末端操作符,但是还有另外一些更方便使用的末端操作符:
转化为各种集合, 例如toList
与toSet
toCollection
。
获取第一个first
值与确保流发射单个single
值的操作符。
使用reduce
与fold
将流规约到单个值。
count
计算有多少个值
toList
toSet
就是转换成集合
single
确保只有一个值,如果多余一个值报错
reduce
1@Test
2 fun `test reduce flow`() = runBlocking<Unit>{
3 val value = (1..4).asFlow()
4 .reduce { accumulator, value ->
5 accumulator - value
6 }
7 println(value)
8 }
accumulate
是保存值,然后value
是每次的值
fold
对比reduce
多了一个初始值
1@Test
2 fun `test reduce flow`() = runBlocking<Unit>{
3 val value = (1..4).asFlow()
4 .fold(0){ acc, value ->
5 acc+value
6 }
7 println(value)
8 }
组合操作符
zip
来合并两个流,两个流是异步的,时间短的等待时间长的那一个
1@Test
2 fun `test reduce flow`() = runBlocking<Unit>{
3 val flow1 = (1..8).asFlow().filter { it % 2 == 0 }
4 val flow2 = flowOf("One","Two","Three")
5 flow1.zip(flow2){ f1,f2 ->
6 "$f1 + $f2"
7 }.collect { println(it) }
8 }
展平操作符
流表示异步接收的值序列,所以很容易遇到这样的情况:每个值都会触发对另一个值序列的请求,然而,由于流具有异步的性质,因此需要不同的展平模式
flatMapConcat
连接模式
flatMapMerge
合并模式
flatMapLastest
最新展平模式
flatMapConcat
1suspend fun requestFlow(i:Int) = flow<String>{
2 emit("$i one")
3 delay(100)
4 emit("$i two")
5 }
6
7 @Test
8 fun `test flatMapConcat`() = runBlocking<Unit> {
9 val startTime = System.currentTimeMillis()
10 (1..3).asFlow()
11// .map { requestFlow(it) }//Flow<Flow<String>>,要把这个二维变成一维,使用展平操作符
12 .flatMapConcat { requestFlow(it) }
13 .collect { println("$it ${System.currentTimeMillis() - startTime} ms") }
14 }
flatMapConcat
结果
11 one 17 ms
21 two 123 ms
32 one 124 ms
42 two 235 ms
53 one 235 ms
63 two 346 ms
flatMapMerge
结果
11 one 50 ms
22 one 50 ms
33 one 50 ms
41 two 156 ms
52 two 156 ms
63 two 156 ms
如果加一个three在two后面
11 one 45 ms
22 one 46 ms
33 one 46 ms
41 two 144 ms
51 three 144 ms
62 two 147 ms
72 three 147 ms
83 two 147 ms
93 three 147 ms
就是把每一个发射值一起代入进去,以delay()耗时操作作为界限
flatMapLatest
1suspend fun requestFlow(i:Int) = flow<String>{
2 emit("$i one")
3 delay(100)
4 emit("$i two")
5 delay(100)
6 emit("$i three")
7 }
8
9 @Test
10 fun `test flatMapLatest`() = runBlocking<Unit> {
11 val startTime = System.currentTimeMillis()
12 (1..3).asFlow()
13// .map { requestFlow(it) }//Flow<Flow<String>>,要把这个二维变成一维,使用展平操作符
14 .flatMapLatest { requestFlow(it) }
15 .collect { println("$it ${System.currentTimeMillis() - startTime} ms") }
16 }
只要开始有耗时操作,就只要最新的一个值,中间的值就不要了
流的异常处理
一般使用try/catch块或者catch函数
1@Test
2 fun `test flow exception`() = runBlocking<Unit> {
3 try {
4 simpleFlow().collect {
5 println(it)
6 check(it <= 1) {"Collected $it"}
7 }
8 }catch (e:Throwable){
9 println("Caught $e")
10 }
11 }
这是在下流收集的时候抛出异常
如果是在上流计算的时候,上游的异常会影响下流的运行,如果使用try..catch,就会打破流的规范,所以使用,catch
可以在catch中补发一个,来恢复
1@Test
2 fun `test flow exception`() = runBlocking<Unit> {
3 flow {
4 emit(1)
5 throw ArithmeticException("Div 0")
6 }.catch{
7 e -> println("Caught $e")
8 emit(2)
9 }
10 .flowOn(Dispatchers.IO)
11 .collect { println(it) }
12 }
流的完成
当流收集完成时(普通情况或异常情况),它可能需要执行一个动作
命令式try...finally
块
onCompletion
声明式处理
1@Test
2 fun `test flow finally`() = runBlocking<Unit> {
3 try {
4 (1..3).asFlow().collect { println(it) }
5 }finally {
6 println("已完成")
7 }
8 }
1@Test
2 fun `test flow onCompletion`() = runBlocking<Unit> {
3 (1..3).asFlow().onCompletion { println("已完成") }.collect { println(it) }
4 }
使用onCompletion
也可以得到异常信息,也可以拿到下游的异常信息,不能捕获
1fun simpleFlow() = flow<Int> {
2 for (i in 1..3){
3 println("Emit $i")
4 emit(i)
5 throw ArithmeticException("Div 0")
6 }
7 }
8 @Test
9 fun `test flow onCompletion`() = runBlocking<Unit> {
10 simpleFlow().onCompletion {
11 if (it != null) println("!e")
12 }
13 .collect { println(it) }
14 }
Channel通道
基本channel
Channel实际上是一个并发安全的队列,它可以用来连接协程,实现不同协程的通信
这是一个生产者消费者模型
1@Test
2 fun `test channel`() = runBlocking<Unit> {
3 val channel = Channel<Int>()
4 //生产者
5 val producer = GlobalScope.launch {
6 var i = 0
7 while (true){
8 delay(1000)
9 channel.send(++i)
10 println("发送 $i")
11 }
12 }
13
14 //消费者
15 val consumer = GlobalScope.launch {
16 while (true){
17 //delay(2000)//当消费者比生产者慢,就让消费者等着
18 val receive = channel.receive()
19 println("接受 $receive")
20 }
21 }
22 producer.start()
23 consumer.start()
24 delay(5000)
25 }
创建一个通道channel
然后在生产者中send
,在消费者中receive
Channel
实际上就是一个队列,队列中一定存在缓冲区,那么一旦这个缓冲区满了,并且也一直没有人调用receive
并取走函数,send
就需要挂起.故意让接收端的节奏放慢,发现send
总是会挂起,直到receive
之后才会继续往下执行
可以指定缓冲区大小,默认为64
val channel = Channel<Int>(20)
Channel
有四个默认值
UNLIMITED
INT.MAX
RENDEZVOUS
没有缓冲,会等待send
CONFLATED
容量为1,不暂停send,如果有新值就删旧的
BUFFERED
默认挂起64,非挂起1
迭代channel
channel.iterator()
1@Test
2 fun `test channel`() = runBlocking<Unit> {
3 val channel = Channel<Int>(Channel.BUFFERED)
4 //生产者
5 val producer = GlobalScope.launch {
6 for(i in 1..5){
7 channel.send(i)
8 println("发送 $i")
9 }
10 }
11
12
13 //消费者
14 val consumer = GlobalScope.launch {
15
16// for (elment in channel){
17// println("接受 $elment")
18// delay(1000)
19// }
20
21 val iterator = channel.iterator()
22 while (iterator.hasNext()){
23 val element = iterator.next()
24 println("接受 $element")
25 delay(1000)
26 }
27 }
28 joinAll(producer,consumer)
29 }
生产者消费者的边界构造方法
我们可以通过produce方法启动一个生产者协程,并返回一个ReceiveChannel,其他协程就可以用这个Channel来接收数据了
反过来,我们可以用actor启动一个消费者协程。
1@Test
2 fun `test channel`() = runBlocking<Unit> {
3 val channel = Channel<Int>(5)
4 val receiveChannel:ReceiveChannel<Int> = GlobalScope.produce {
5 repeat(10){
6 delay(800)
7 send(it)
8 }
9 }
10 val consumer = GlobalScope.launch {
11 for (i in receiveChannel){
12 println("接受 $i")
13 }
14 }
15 consumer.join()
16 }
使用作用域.produce
快速构造
1@Test
2 fun `test channel`() = runBlocking<Unit> {
3 val sendChannel:SendChannel<Int> = GlobalScope.actor {
4 while (true){
5 println("接受 ${receive()}")
6 }
7 }
8 val producer = GlobalScope.launch {
9 for (i in 0..3){
10 sendChannel.send(i)
11 }
12 }
13 producer.join()
14 }
使用作用域.actor()
快速构造,然后在里面使用receive()
来得到发送的值
在生产者那使用 send
发送
channel的关闭
produce
和actor
返回的Channel
都会随着对应的协程执行完毕而关闭,也正是这样,Channel
才被称为热数据流
对于一个Channel
,如果我们调用了它的close
方法,它会立即停止接收新元素,也就是说这时它的isClosedForSend
会立即返回true
.而由于Channel
缓冲区的存在,这时候可能还有一些元素没有被处理完,因此要等所有的元素都被读取之后isClosedForReceive
才会返回true
Channel的生命周期最好由主导方来维护,建议由主导的一方实现关闭,一般是生产者
BroadcastChannel
发送端和接收端在Channel中存在一对多的情形,从数据处理本身来讲,虽然有多个接收端,但是同一个元素只会被一个接收端读到。广播则不然,订阅者多个接收端不存在互斥行为
1@Test
2 fun `test broadCast`() = runBlocking<Unit> {
3 val broadcastChannel = BroadcastChannel<Int>(Channel.CONFLATED)
4 val porducer = GlobalScope.launch {
5 List(3){
6 delay(100)
7 broadcastChannel.send(it)
8 }
9 broadcastChannel.close()
10 }
11 List(2){ index ->
12 GlobalScope.launch {
13 val receiveChannel = broadcastChannel.openSubscription()//订阅
14 for (i in receiveChannel){
15 println( "接受者$index + 接受 $i")
16 }
17 }
18 }.joinAll()
19 porducer.join()
20 }
broadcastChannel.openSubscription()
订阅
协程中的多路复用
多路复用:就是把一根管道,分成几个区域来用,做到一个信道同时传输多路型号
协程多路复用
协程中有点不一样
协程中,是从两个中,接受最快的一个
使用select
函数 select<T>
,T是返回值类型
1data class Response<T>(val value: T, val isLocal: Boolean)
2
3fun CoroutineScope.getUserFromLocal(name: String) = async(Dispatchers.IO) {
4 //delay(1000) //故意的延迟
5 File(cachePath).readText().let { gson.fromJson(it, User::class.java) }
6}
7
8fun CoroutineScope.getUserFromRemote(name: String) = async(Dispatchers.IO) {
9 userServiceApi.getUser(name)
10}
1@Test
2 fun `test select await`() = runBlocking<Unit> {
3 GlobalScope.launch {
4 val localRequest = getUserFromLocal("xxx")
5 val remoteRequest = getUserFromRemote("yyy")
6
7 val userResponse = select<Response<User>> {
8 localRequest.onAwait { Response(it, true) }
9 remoteRequest.onAwait { Response(it, false) }
10 }
11
12 userResponse.value?.let { println(it) }
13 }.join()
14 }
复用多个Channel
和协程的复用一样,是接受最快的一个
1@Test
2 fun `test select channel`() = runBlocking<Unit> {
3 val channels = listOf(Channel<Int>(), Channel<Int>())
4 GlobalScope.launch {
5 delay(100)
6 channels[0].send(200)
7 }
8
9 GlobalScope.launch {
10 delay(50)
11 channels[1].send(100)
12 }
13
14 val result = select<Int?> {
15 channels.forEach { channel ->
16 channel.onReceive { it }
17 }
18 }
19 println(result)
20 }
SelectClause
所有能够被select
的事件都是SelectClauseN
类型, 包括:
SelectClauseO
: 对应事件没有返回值,例如join
没有返回值,那么onJoin
就是SelectClauseN
类型。 使用时,onJoin
的参数是一个无参函数
SelectClause1
: 对应事件有返回值, 前面的onAwait
和onReceive
都是此类情况
SelectClause2
: 对应事件有返回值, 此外还需要一个额外的参数, 例如Channel.onSend
有两个参数, 第一个是Channel
数据类型的值,表示即将发送的值,第二个是发送成功时的回调参数
如果我们想要确认挂起函数是否支持select,只需要查看其是否存在对应的SelectClauseN
类型可回调即可。
1@Test
2 fun `test SelectClause0`() = runBlocking<Unit> {
3 val job1 = GlobalScope.launch {
4 delay(100)
5 println("job 1")
6 }
7
8 val job2 = GlobalScope.launch {
9 delay(10)
10 println("job 2")
11 }
12
13 select<Unit> {
14 job1.onJoin { println("job 1 onJoin") }
15 job2.onJoin { println("job 2 onJoin") }
16 }
17
18 delay(1000)
19 }
1@Test
2 fun `test SelectClause2`() = runBlocking<Unit> {
3 val channels = listOf(Channel<Int>(), Channel<Int>())
4 println(channels)
5
6 launch(Dispatchers.IO) {
7 select<Unit?> {
8 launch {
9 delay(10)
10 channels[1].onSend(200) { sentChannel ->
11 println("sent on $sentChannel")
12 }
13 }
14
15 launch {
16 delay(100)
17 channels[0].onSend(100) { sentChannel ->
18 println("sent on $sentChannel")
19 }
20 }
21 }
22 }
23
24 GlobalScope.launch {
25 println(channels[0].receive())
26 }
27
28 GlobalScope.launch {
29 println(channels[1].receive())
30 }
31
32 delay(1000)
33 }
使用flow模拟多路复用
1@Test
2 fun `test select flow`() = runBlocking<Unit> {
3 // 函数 -> 协程 -> Flow -> Flow合并
4 val name = "guest"
5 coroutineScope {
6 listOf(::getUserFromLocal, ::getUserFromRemote)
7 .map { function ->
8 function.call(name)
9 }.map { deferred ->
10 flow { emit(deferred.await()) }
11 }.merge().collect { user -> println(user) }
12
13 }
14 }
这里用了反射
这里不一定要用merge(),可以自己自定义过滤
协程并发问题
1@Test
2 fun `test not safe concurrent`() = runBlocking<Unit> {
3 var count = 0
4 List(2000) {
5 GlobalScope.launch { count++ }
6 }.joinAll()
7 println(count)
8 }
这里会出现,访问未更新元素重复的并发问题
安全的方法
1@Test
2 fun `test safe concurrent`() = runBlocking<Unit> {
3 var count = AtomicInteger(0)
4 List(1000) {
5 GlobalScope.launch { count.incrementAndGet() }//这是一个安全的++
6 }.joinAll()
7 println(count.get())
8 }
其他方法
Channel
并发安全的消息通道,我们已经非常熟悉。
Mutex
轻量级锁,它的lock和unlock从语义上与线程锁比较类似,之所以轻量是因为它在获取不到锁时不会阻塞线程,而是挂起等待锁的释放。
Semaphore
轻量级信号量,信号量可以有多个,协程在获取到信号量后即可执行并发操作.当Semaphore的参数为1时,效果等价于Mutex
获得Mutex
对象,使用mutex.withLock
,和java线程的lock
是一样的
1@Test
2 fun `test safe concurrent tools`() = runBlocking<Unit> {
3 var count = 0
4 val mutex = Mutex()
5 List(1000) {
6 GlobalScope.launch {
7 mutex.withLock {
8 count++
9 }
10 }
11 }.joinAll()
12 println(count)
13 }
1@Test
2 fun `test safe concurrent tools2`() = runBlocking<Unit> {
3 var count = 0
4 val semaphore = Semaphore(1)
5 List(10000) {
6 GlobalScope.launch {
7 semaphore.withPermit {
8 count++
9 }
10 }
11 }.joinAll()
12 println(count)
13 }
这种就是使用信道,当信道为1的时候效果是一样的,但是就算是2也会有风险
或者规避这种访问,直接对外部的访问(比较极端)
1@Test
2 fun `test avoid access outer variable`() = runBlocking<Unit> {
3 var count = 0
4 val result = count + List(1000){
5 GlobalScope.async { 1 }
6 }.map { it.await() }.sum()
7 println(result)
8 }