网站设计与制作包括,网站建设推广ppt模板,湘潭今天最新通知,小蓝本企业查询官网1、Channel 与 Flow 简介与对比
所有知识都可总结为一个字 —— 流。包括数据流、事件流、状态流。
开发中最常用的 StateFlow 提供状态订阅。可以将一些信息包进 StateFlow 中进行保存。比如界面上显示的字符串#xff0c;或者系统级别的信息#xff0c;如用户状态。装进 …1、Channel 与 Flow 简介与对比
所有知识都可总结为一个字 —— 流。包括数据流、事件流、状态流。
开发中最常用的 StateFlow 提供状态订阅。可以将一些信息包进 StateFlow 中进行保存。比如界面上显示的字符串或者系统级别的信息如用户状态。装进 StateFlow 中的状态就成为可订阅的状态当状态值发生改变时通知所有订阅的位置这样就能实现界面自动更新之类的自动化操作。
StateFlow 内部使用 SharedFlow 实现。SharedFlow 提供的是事件订阅而不是状态订阅。事件订阅和状态订阅的区别不多但是比较关键。比如在一个事件触发之后再进行事件订阅这个事件原则上无需推送到订阅者一端原则上无需推送但你也可以配置成依然推送。但状态订阅就不同在状态更新之后发生的状态订阅状态仍需要推送给订阅者。
StateFlow 内部使用 Flow 实现。Flow 不是订阅工具而是数据流工具。SharedFlow 底层是事件流模型而 Flow 准确地说并不是事件流而是数据流。数据流与事件流并没有天差地别的不同甚至可以从某个角度看成是一类东西。区分它们是为了应对不同的应用场景。
Channel 与 Flow 并不完全是一个体系的但在实现上它是 Flow 下层的一个关键支撑。Flow 的核心是数据流而 Channel 是协程间协作的工具提供在协程间传递数据的功能。它与 async 提供的功能有点像只不过它可以多次发送数据让其他协程使用而 async 是一次发送数据。如果数据流不需要跨协程那么就应该使用 Flow 而不是 Channel否则会遇到一些性能问题。并且Channel 的 API 在数据流的角度也没有 Flow 的灵活和强大。
2、用 produce() 来提供跨协程的事件流
日常开发中几乎用不到 Channel但是如果在基础架构团队负责给公司造轮子或者担任架构师或技术研究员的位置Channel 的知识是必要的。
Channel 是协程间通信的关键技术点相对比较底层但又没底层到对开发者完全透明的程度。想把 Flow 弄清楚Channel 是绕不过去的东西。
Channel 相当于多条数据版的 async()那我们就从 async() 说起。假如我想在一个协程里不断的发送网络请求的结果给另一个协程使用按照当前我们所掌握的知识可能会写出这种代码
fun main() runBlockingUnit {val scope CoroutineScope(EmptyCoroutineContext)val deferred scope.async {while (isActive) {gitHub.contributors(square, retrofit)}}launch {delay(1000)println(Contributors: ${deferred.await()})}delay(3000)
}但这个代码是不可用的因为 async() 只能一次性的发送一个数据不能多次发送。 一个比较常见且易于理解的实际场景是股票软件需要实时显示股票的价格因此需要不停的去查询交易所的数据。这个不停查询的动作需要在一个协程中进行然后查询的结果需要在另一个协程中显示在 UI 的文字和股价图表之中。 为了解决这个问题可以对上述代码进行改造使用 produce() 替代 async()
OptIn(ExperimentalCoroutinesApi::class)
fun main() runBlockingUnit {val scope CoroutineScope(EmptyCoroutineContext)// 1.使用 produce 启动一个返回 ReceiveChannel 的协程val receiveChannel scope.produce { // this:ProducerScopewhile (isActive) {val data gitHub.contributors(square, retrofit)// 2.使用 send 将数据发送给调用 receive 的协程send(data)}}launch {delay(1000)// 3.接收数据需调用 receivewhile (isActive) {// 持续接收数据println(Contributors: ${receiveChannel.receive()})}}delay(3000)
}这样就可以实现在一个协程中不断获取数据并发送给另一个协程的功能了。
需要注意的一些问题 produce 与 launch 和 async 一样都是协程启动器只不过 produce 启动的协程会生产一个数据流给 Channel再由 Channel 在需要使用数据流的协程中接收数据 produce 后有一个泛型该泛型类型可以通过 send() 发送的数据推导出具体类型 produce 的 block 参数的接收者是 ProducerScope是 CoroutineScope 的子类 ExperimentalCoroutinesApi
public fun E CoroutineScope.produce(context: CoroutineContext EmptyCoroutineContext,capacity: Int 0,// launch 和 async 的 block 参数是 CoroutineScopeBuilderInference block: suspend ProducerScopeE.() - Unit
): ReceiveChannelE produce(context, capacity, BufferOverflow.SUSPEND, CoroutineStart.DEFAULT, onCompletion null, block block)ProducerScope 是 CoroutineScope 子接口它还继承了 SendChannel 接口 public interface ProducerScopein E : CoroutineScope, SendChannelE {public val channel: SendChannelE
}3、Channel 的工作模式详解
Flow 的很多逻辑和 API 与 Channel 是相通的所以对 Channel 的了解会对学习 Flow 有直接帮助。
前面讲 launch 时我们通过打 log 的方式证明了
public fun CoroutineScope.launch(context: CoroutineContext EmptyCoroutineContext,start: CoroutineStart CoroutineStart.DEFAULT,block: suspend CoroutineScope.() - Unit
): Job {val newContext newCoroutineContext(context)val coroutine if (start.isLazy)LazyStandaloneCoroutine(newContext, block) elseStandaloneCoroutine(newContext, active true)coroutine.start(start, coroutine, block)return coroutine
}launch 返回的 Job 与 block 参数的 CoroutineScope 是同一个对象。对于 produce 也是类似的
ExperimentalCoroutinesApi
public fun E CoroutineScope.produce(context: CoroutineContext EmptyCoroutineContext,capacity: Int 0,BuilderInference block: suspend ProducerScopeE.() - Unit
): ReceiveChannelE produce(context, capacity, BufferOverflow.SUSPEND, CoroutineStart.DEFAULT, onCompletion null, block block)produce 返回的 ReceiveChannel 与 block 参数的 ProducerScope 是同一个对象。
再进一步看 produce() 的内容
internal fun E CoroutineScope.produce(context: CoroutineContext EmptyCoroutineContext,capacity: Int 0,onBufferOverflow: BufferOverflow BufferOverflow.SUSPEND,start: CoroutineStart CoroutineStart.DEFAULT,onCompletion: CompletionHandler? null,BuilderInference block: suspend ProducerScopeE.() - Unit
): ReceiveChannelE {// 创建 Channelval channel ChannelE(capacity, onBufferOverflow)val newContext newCoroutineContext(context)// 将 Channel 传入 ProducerCoroutine 协程中val coroutine ProducerCoroutine(newContext, channel)if (onCompletion ! null) coroutine.invokeOnCompletion(handler onCompletion)coroutine.start(start, coroutine, block)return coroutine
}返回值 coroutine 的类型是 ProducerCoroutine它继承了 ChannelCoroutine
private class ProducerCoroutineE(parentContext: CoroutineContext, channel: ChannelE
) : ChannelCoroutineE(parentContext, channel, true, active true), ProducerScopeE {...
}ChannelCoroutine 通过参数传入的 _channel 以及类型声明上的 by _channel 形成接口委托将 Channel 包在协程的内部
internal open class ChannelCoroutineE(parentContext: CoroutineContext,protected val _channel: ChannelE,initParentJob: Boolean,active: Boolean
) : AbstractCoroutineUnit(parentContext, initParentJob, active), ChannelE by _channel {val channel: ChannelE get() this...
}Channel 是 SendChannel 和 ReceiveChannel 的子接口
public interface ChannelE : SendChannelE, ReceiveChannelE实际上这意味着produce() 封装了 Channel由其提供底层支持。我们可以直接使用 Channel 的工厂函数创建 Channel 对象实现同样的操作
fun main() runBlockingUnit {val scope CoroutineScope(EmptyCoroutineContext)// 通过 Channel 工厂函数创建 Channel 对象泛型是传输的数据类型val channel ChannelListContributor()// 在一个协程发送数据scope.launch {channel.send(gitHub.contributors(square, retrofit))}// 另一个协程接收数据scope.launch {println(Received data:${channel.receive()}.trimIndent())}delay(3000)
}Channel 的数据结构是一个挂起式的队列。它的功能定位类似于 BlockingQueue只不过 BlockingQueue 在条件不满足时会阻塞线程而 Channel 则是挂起协程。当 Channel 元素满了之后再向 Channel 插入元素会挂起协程来等待空闲位置取数据同理。
正是因为上述的本质Channel 不适合做可订阅的事件流。示例代码
fun main() runBlockingUnit {val scope CoroutineScope(EmptyCoroutineContext)val channel ChannelListContributor()// 一个发送者两个订阅者发送者的数据只能被一个 send() 接收意味着// 没有一个协程能完整的接收到所有数据scope.launch {channel.send(gitHub.contributors(square, retrofit))}scope.launch {channel.receive()}scope.launch {channel.receive()}delay(3000)
}Channel 是一个队列其他协程来队列取一次数据就把该数据取走了其他协程再来取数据拿到的就是队列中的下一条数据。因此会出现如下情况第一次 send 的数据可能会被第一个 receive 接收到第二次 send 的数据就会被另一个协程的 receive 接收就发一个数据不可能被所有协程接收到。因此当订阅者多于一个的时候所有协程都接收不到完整数据。
其实在 Flow 的 API 诞生之后Channel 已经慢慢退居二线了。现在它最主要的用处是作为 Flow API 的下层支持。比如 Flow 的 Buffer 功能就是用 Channel 实现的用 Channel 把数据缓冲到另一个协程。
4、Channel API 详解
Channel 的 API 很多与 Flow 都是相通的我们先学习 Channel 的后续对 Flow 的 API 理解会有很大帮助。
4.1 Channel 的遍历
之前我们举得例子是通过 while(isActive) 从 Channel 中不断获取数据
private fun getDataByWhile() runBlockingUnit {val scope CoroutineScope(EmptyCoroutineContext)val channel ChannelListContributor()scope.launch {channel.send(gitHub.contributors(square, retrofit))}launch {while (isActive) {val contributors channel.receive()println(Contributors: $contributors)}}
}实际上可以通过 for 循环来实现从 Channel 中取数据。for 循环的遍历都是通过对迭代器的实现而实现的ReceiveChannel 在接口中定义了 iterator() 做运算符重载
public interface ReceiveChannelout E {/*** 使用 for 循环返回一个新的迭代器以从此通道接收元素。* 当通道[对于 receive 操作关闭][isClosedForReceive]且没有原因时迭代会正常完成* 如果通道处于失败状态则会抛出原始的 [close][SendChannel.close] 原因异常。*/public operator fun iterator(): ChannelIteratorE
}示例代码
private fun getDataByFor() runBlockingUnit {val scope CoroutineScope(EmptyCoroutineContext)val channel ChannelListContributor()scope.launch {channel.send(gitHub.contributors(square, retrofit))}launch {// 挂起式遍历for (contributors in channel) {println(Contributors: $contributors)}}
}Channel 的遍历相比于一般的容器比较特殊它的遍历是挂起式的。它在没有元素时会把协程挂起直到下一个元素出现。当 Channel 关闭后循环遍历也就结束了。
类似地当 Channel 队列满了之后如果再调用 send 这个挂起函数试图向 Channel 中增加元素也会导致协程被挂起直到 Channel 中有空闲位置之后。但是需要注意Channel 队列默认长度为 0也就是说第一次调用 send 会导致协程挂起除非在此之前已经有其他协程调用了 receive。但是先调用 receive 的协程会因为 Channel 中没有元素被挂起。
4.2 Channel 工厂函数
可以通过 Channel 的工厂函数参数指定 Channel 的容量
/**
* 使用指定的缓冲区容量或默认情况下不使用缓冲区创建一个通道。详细信息请参阅 [Channel] 接口文档。
*
* param capacity 正数通道容量或在 [Channel.Factory] 中定义的常量之一。
* param onBufferOverflow 配置缓冲区溢出时的操作可选默认为尝试通过 [send][Channel.send]
* 发送值的 [挂起][BufferOverflow.SUSPEND]仅在 capacity 0 或 capacity Channel.BUFFERED
* 时支持隐式地创建至少一个缓冲元素的通道。
* param onUndeliveredElement 可选函数当元素已发送但未传递给消费者时调用。
* throws IllegalArgumentException 当 [capacity] -2 时抛出。
*/
public fun E Channel(capacity: Int RENDEZVOUS,onBufferOverflow: BufferOverflow BufferOverflow.SUSPEND,onUndeliveredElement: ((E) - Unit)? null
): ChannelE when (capacity) {RENDEZVOUS - {if (onBufferOverflow BufferOverflow.SUSPEND)BufferedChannel(RENDEZVOUS, onUndeliveredElement) // an efficient implementation of rendezvous channelelseConflatedBufferedChannel(1, onBufferOverflow, onUndeliveredElement) // support buffer overflow with buffered channel}CONFLATED - {require(onBufferOverflow BufferOverflow.SUSPEND) {CONFLATED capacity cannot be used with non-default onBufferOverflow}ConflatedBufferedChannel(1, BufferOverflow.DROP_OLDEST, onUndeliveredElement)}UNLIMITED - BufferedChannel(UNLIMITED, onUndeliveredElement) // ignores onBufferOverflow: it has buffer, but it never overflowsBUFFERED - { // uses default capacity with SUSPENDif (onBufferOverflow BufferOverflow.SUSPEND) BufferedChannel(CHANNEL_DEFAULT_CAPACITY, onUndeliveredElement)else ConflatedBufferedChannel(1, onBufferOverflow, onUndeliveredElement)}else - {if (onBufferOverflow BufferOverflow.SUSPEND) BufferedChannel(capacity, onUndeliveredElement)else ConflatedBufferedChannel(capacity, onBufferOverflow, onUndeliveredElement)}}capacity 默认值是 RENDEZVOUS 0此时数据的发送与接收必须通过“见面”的方式完成。 “Rendezvous” 是法语单词意为“约会”或“会面”。在计算机科学领域特别是在并发编程中“rendezvous” 通常用于描述两个或多个进程或线程在某个特定点上同时等待彼此的情况。 在并发编程中“rendezvous” 可以指以下情况之一 进程或线程之间的同步当两个或多个进程或线程需要在某个特定点上同时到达以继续执行这种同步等待的过程可以称为rendezvous。这有助于确保进程或线程在合适的时机相遇并且在适当的时候进行交互或数据传递。消息传递在消息传递模型中“rendezvous” 可以表示发送者和接收者之间的一种同步机制。发送者在发送消息时等待接收者准备好接收消息而接收者在接收消息时等待发送者准备好发送消息以确保消息能够正确传递。 从上述源码不难看出Channel 提供了几种预置的数值供 capacity 选择不同的类型创建不同的 Channel。这几个预置值如下
public interface ChannelE : SendChannelE, ReceiveChannelE {public companion object Factory {// Channel 的 buffer 没有容量限制public const val UNLIMITED: Int Int.MAX_VALUE// Channel 没有 bufferpublic const val RENDEZVOUS: Int 0// 用于创建合并通道相当于使用 [onBufferOverflow DROP_OLDEST][BufferOverflow.DROP_OLDEST]public const val CONFLATED: Int -1// 用于请求具有默认缓冲区容量的缓冲通道。// 对于在溢出时[挂起][BufferOverflow.SUSPEND]的通道默认容量为 64并可通过在 JVM 上设置// [DEFAULT_BUFFER_PROPERTY_NAME] 进行覆盖。// 对于非挂起通道使用容量为 1 的缓冲区。public const val BUFFERED: Int -2}
}观察 BUFFERED 参数的注释它会在不同的溢出处理策略设置不同的容量。这个策略就是 Channel 工厂函数的第二个参数 onBufferOverflow默认值是 BufferOverflow.SUSPEND共有三个值可选
public enum class BufferOverflow {/*** 在缓冲区溢出时挂起*/SUSPEND,/*** 在溢出时删除缓冲区中最旧的值将新值添加到缓冲区不会挂起。*/DROP_OLDEST,/*** 在缓冲区溢出时删除当前正在添加到缓冲区中的最新值以使缓冲区内容保持不变不会挂起。*/DROP_LATEST
}比较常用的是 SUSPEND 与 DROP_OLDEST丢弃队首的旧元素比如我们前面在说 Channel 的第一个参数 capacity 时介绍过有一个可选值 CONFLATED它的作用就相当于创建了容量为 1溢出策略为 BufferOverflow.DROP_OLDEST 的 Channel
fun main() runBlockingUnit {// 二者等价但是如果填了 CONFLATED那么就要求溢出策略必须是默认的 SUSPENDval channel1 ChannelListContributor(1, BufferOverflow.DROP_OLDEST)val channel2 ChannelListContributor(CONFLATED/*, BufferOverflow.SUSPEND*/)
}丢弃数据也是有用处的比如对于不断提供界面数据的 Channel界面会用你提供的最新的数据来更新界面。如果发送数据的频率高于界面处理数据比如经过某种耗时计算之后才能显示到界面的速度可能会出现下游没有把上一条数据处理完上游就发送来多条新的数据堆积在队列中。为了显示最新的数据可以丢弃掉队列中堆积的旧数据直接取最新的一条。 4.3 Channel 的关闭
Channel 的关闭有两种方式close() 与 cancel()。
了解 API 之前先了解什么是关闭为什么要关闭。
Channel 可以是一个事件流也可以是数据流具体是哪种流看你怎么用它。流在某一个时间后可能不再发送或接收数据了那就需要关闭它。比如一个传递网络数据给界面的 Channel在网络数据的持续获取结束之后或者在界面组件从界面里移除之后就可以将 Channel 关闭。
Channel 的两个关闭函数属于两个不同接口Channel 实现了 SendChannel 和 ReceiveChannelclose() 属于 SendChannel
public interface SendChannelin E {DelicateCoroutinesApipublic val isClosedForSend: Booleanpublic suspend fun send(element: E)public fun close(cause: Throwable? null): Boolean
}isClosedForSend 表示是否已经关闭发送功能调用了 close() 之后isClosedForSend 被修改为 true这之后就不允许再调用 send() 了否则就会抛出 ClosedSendChannelException
public class ClosedSendChannelException(message: String?) : IllegalStateException(message)当然由于已经 send() 出去的数据不可能立即、马上就被接收端收到因此缓冲区中的数据可能会存留一段时间。在这段时间内还可以调用 receive() 来接收数据。但是当所有数据都被接收完毕后Channel 会将 ReceiveChannel 这个接口的 isClosedForReceive 修改为 true意思是这之后不能再调用 receive()否则会抛 ClosedReceiveChannelException
public class ClosedReceiveChannelException(message: String?) : NoSuchElementException(message)以上是 Channel 在发送端的关闭过程。下面再看 Channel 在接收端关闭。
假如接收端不再需要接收数据了比如更新界面的协程它需要更新的界面组件被移除了那就不再需要新元素了。这种情况下可以直接调用 ReceiveChannel 的 cancel()它会把 SendChannel 的 isClosedForSend 和 ReceiveChannel 的 isClosedForReceive 都修改为 true禁止调用 send() 和 receive()。如果在 cancel() 之后还调用 send() 或 receive()会抛出 CancellationException用以区分是发送端还是接收端关闭导致的异常。以便针对不同的异常去写业务代码。
close() 也可以传入自定义的异常在触发异常后就会抛这个指定的异常。比如
channel.close(IllegalStateException(Data error!))Channel 在调用 cancel() 之后那些已经发送但是还没被接收的数据就没用了。但是这些数据如果直接丢弃可能会造成某种资源泄漏。比如发送的是文件流
val fileChannel ChannelFileWriter()
fileChannel.send(FileWriter(test.txt))本来应该是接收到 FileWriter 使用完毕后将其关闭的。但是在 Channel 调用 cancel() 之后尚未被接收的 FileWriter 就被丢到外太空去了进而造成资源泄漏。此时需要使用 Channel 的第三个参数 —— onUndeliveredElement() 来处理未交接的元素。
比如对于 FileWriter 而言可以这样处理
// onUndeliveredElement: ((E) - Unit)? null
val fileChannel ChannelFileWriter { it.close() }这样在 FileWriter 被丢弃之前会先执行它的 close() 避免资源泄漏。
4.4 trySend() 与 tryReceive()
这一对函数是 send() 与 receive() 的兄弟函数它们不是挂起函数会瞬时返回。如果因为缓冲满了无法发送数据或者因为缓冲中没有数据而无法接收数据它们都不会等待而是直接返回只不过返回的是失败的结果。
二者的返回值类型是 ChannelResult
JvmInline
public value class ChannelResultout T
PublishedApi internal constructor(PublishedApi internal val holder: Any?) {public val isSuccess: Boolean get() holder !is Failedpublic val isFailure: Boolean get() holder is Failedpublic val isClosed: Boolean get() holder is Closed...
}如果成功发送或接收数据那么 isSuccess 就为 trueisFailure 为 false如果失败的话就反过来。isClosed 是特别的失败类型如果是因为 Channel 关闭而失败则 isClosed 为 true如果是因为缓冲满了暂时没法写或者是缓冲空了暂时没法读这种失败 isClosed 为 false。
4.5 其他函数 // 如果此实例表示成功则返回封装的值如果表示失败则返回 nullSuppress(UNCHECKED_CAST)public fun getOrNull(): T? if (holder !is Failed) holder as T else null// 如果此实例表示成功则返回封装的值如果已关闭或失败则抛出异常public fun getOrThrow(): T {Suppress(UNCHECKED_CAST)if (holder !is Failed) return holder as Tif (holder is Closed holder.cause ! null) throw holder.causeerror(Trying to call getOrThrow on a failed channel result: $holder)}// 如果此实例表示失败则返回封装的异常如果表示成功或对已关闭通道的操作不成功则返回 nullpublic fun exceptionOrNull(): Throwable? (holder as? Closed)?.cause// 发生异常时不抛异常而是将异常封进 ChannelResult 中public suspend fun receiveCatching(): ChannelResultE5、actor()把 SendChannel 暴露出来
前面讲过可以将 Channel 的创建、发送与接收工作分开写
fun main() runBlockingUnit {val scope CoroutineScope(EmptyCoroutineContext)val channel ChannelInt()scope.launch {for (num in 1..100) {channel.send(num)delay(100)}}scope.launch {for (num in channel) {println(Number: $num)}}delay(10000)
}也可以通过 produce() 将创建和发送操作合并以简化代码
fun main() runBlockingUnit {val scope CoroutineScope(EmptyCoroutineContext)val channel scope.produce {for (num in 1..100) {send(num)delay(100)}}scope.launch {for (num in channel) {println(Number: $num)}}delay(10000)
}与 produce() 相反的还有一个启动器 actor()
produce() 是启动一个协程将 Channel 创建与发送数据的操作合并在一起返回 ReceiveChannelactor() 也是启动一个协程将 Channel 创建与接收数据的操作合并在一起返回 SendChannel
使用 actor() 改造上述代码
OptIn(ObsoleteCoroutinesApi::class)
fun main() runBlockingUnit {val scope CoroutineScope(EmptyCoroutineContext)val sendChannel scope.actorInt {for (num in this) {println(Number: $num)}}scope.launch {for (num in 1..100) {sendChannel.send(num)delay(100)}}delay(10000)
}需要注意 actor() 是被打了 ObsoleteCoroutinesApi 注解的
/**
* 在协程 API 中标记已过时的声明意味着相应声明的设计存在严重已知缺陷将来会重新设计。
* 大致来说这些声明将来会被弃用但目前还没有替代方案因此不能立即弃用它们。
*/
MustBeDocumented
Retention(value AnnotationRetention.BINARY)
RequiresOptIn(level RequiresOptIn.Level.WARNING)
public annotation class ObsoleteCoroutinesApi也就是说ObsoleteCoroutinesApi 要比 Deprecated 的严重性要弱一点。这意味着 actor() 可以用但是未来可能随着协程 API 的重新设计需要跟随它的变化对 actor() 的用法做出相应的修改。但是现在要使用的话必须带上 OptIn(ObsoleteCoroutinesApi::class)。