空间安装wordpress后很卡,深圳设计优化公司,wordpress图片主题中文版,织梦网站空间如何清理本文已收录到 AndroidFamily#xff0c;技术和职场问题#xff0c;请关注公众号 [彭旭锐] 提问。 前言
大家好#xff0c;我是小彭。
在上一篇文章里#xff0c;我们聊到了 Square 开源的 I/O 框架 Okio 的三个优势#xff1a;精简且全面的 API、基于共享的缓冲区设计以… 本文已收录到 AndroidFamily技术和职场问题请关注公众号 [彭旭锐] 提问。 前言
大家好我是小彭。
在上一篇文章里我们聊到了 Square 开源的 I/O 框架 Okio 的三个优势精简且全面的 API、基于共享的缓冲区设计以及超时机制。前两个优势已经分析过了今天我们来分析 Okio 的超时检测机制。
本文源码基于 Okio v3.2.0。 思维导图 1. 认识 Okio 的超时机制
超时机制是一项通用的系统设计能够避免系统长时间阻塞在某些任务上。例如网络请求在超时时间内没有响应客户端就会提前中断请求并提示用户某些功能不可用。
1.1 说一下 Okio 超时机制的优势
先思考一个问题相比于传统 IO 的超时有什么优势呢我认为主要体现在 2 个方面
优势 1 - Okio 弥补了部分 IO 操作不支持超时检测的缺陷
Java 原生 IO 操作是否支持超时完全取决于底层的系统调用是否支持。例如网络 Socket 支持通过 setSoTimeout API 设置单次 IO 操作的超时时间而文件 IO 操作就不支持使用原生文件 IO 就无法实现超时。
而 Okio 是统一在应用层实现超时检测不管系统调用是否支持超时都能提供统一的超时检测机制。
优势 2 - Okio 不仅支持单次 IO 操作的超时检测还支持包含多次 IO 操作的复合任务超时检测
Java 原生 IO 操作只能实现对单次 IO 操作的超时检测无法实现对包含多次 IO 操作的复合任务超时检测。例如OkHttp 支持配置单次 connect、read 或 write 操作的超时检测还支持对一次完整 Call 请求的超时检测有时候单个操作没有超时但串联起来的完整 call 却超时了。
而 Okio 超时机制和 IO 操作没有强耦合不仅支持对 IO 操作的超时检测还支持非 IO 操作的超时检测所以这种复合任务的超时检测也是可以实现的。
1.2 Timeout 类的作用
Timeout 类是 Okio 超时机制的核心类Okio 对 Source 输入流和 Sink 输出流都提供了超时机制我们在构造 InputStreamSource 和 OutputStreamSink 这些流的实现类时都需要携带 Timeout 对象
Source.kt
interface Source : Closeable {// 返回超时控制对象fun timeout(): Timeout...
}Sink.kt
actual interface Sink : Closeable, Flushable {// 返回超时控制对象actual fun timeout(): Timeout...
}Timeout 类提供了两种配置超时时间的方式如果两种方式同时存在的话Timeout 会优先采用更早的截止时间
1、timeoutNanos 任务处理时间 设置处理单次任务的超时时间
最终触发超时的截止时间是任务的 startTime timeoutNanos
2、deadlineNanoTime 截止时间 直接设置未来的某个时间点多个任务整体的超时时间点。
Timeout.kt
// hasDeadline 这个属性显得没必要
private var hasDeadline false // 是否设置了截止时间点
private var deadlineNanoTime 0L // 截止时间点单位纳秒
private var timeoutNanos 0L // 处理单次任务的超时时间单位纳秒创建 Source 和 Sink 对象时都需要携带 Timeout 对象
JvmOkio.kt
// ----------------------------------------------------------------------------
// 输入流
// ----------------------------------------------------------------------------fun InputStream.source(): Source InputStreamSource(this, Timeout() /*Timeout 对象*/)// 文件输入流
fun File.source(): Source InputStreamSource(inputStream(), Timeout.NONE)// Socket 输入流
fun Socket.source(): Source {val timeout SocketAsyncTimeout(this)val source InputStreamSource(getInputStream(), timeout /*携带 Timeout 对象*/)// 包装为异步超时return timeout.source(source)
}// ----------------------------------------------------------------------------
// 输出流
// ----------------------------------------------------------------------------fun OutputStream.sink(): Sink OutputStreamSink(this, Timeout() /*Timeout 对象*/)// 文件输出流
fun File.sink(append: Boolean false): Sink FileOutputStream(this, append).sink()// Socket 输出流
fun Socket.sink(): Sink {val timeout SocketAsyncTimeout(this)val sink OutputStreamSink(getOutputStream(), timeout /*携带 Timeout 对象*/)// 包装为异步超时return timeout.sink(sink)
}在 Timeout 类的基础上Okio 提供了 2 种超时机制
Timeout 是同步超时AsyncTimeout 是异步超时
Okio 框架 2. Timeout 同步超时
Timeout 同步超时依赖于 Timeout#throwIfReached() 方法。
同步超时在每次执行任务之前都需要先调用 Timeout#throwIfReached() 检查当前时间是否到达超时截止时间。如果超时则会直接抛出超时异常不会再执行任务。
JvmOkio.kt
private class InputStreamSource(// 输入流private val input: InputStream,// 超时控制private val timeout: Timeout
) : Source {override fun read(sink: Buffer, byteCount: Long): Long {// 1、参数校验if (byteCount 0L) return 0require(byteCount 0) { byteCount 0: $byteCount }// 2、检查超时时间timeout.throwIfReached()// 3、执行输入任务已简化val bytesRead input.read(...)return bytesRead.toLong()}...
}private class OutputStreamSink(// 输出流private val out: OutputStream,// 超时控制private val timeout: Timeout
) : Sink {override fun write(source: Buffer, byteCount: Long) {// 1、参数校验checkOffsetAndCount(source.size, 0, byteCount)// 2、检查超时时间timeout.throwIfReached()// 3、执行输入任务已简化out.write(...)...}...
}看一眼 Timeout#throwIfReached 的源码。 可以看到同步超时只考虑 “deadlineNanoTime 截止时间”如果只设置 “timeoutNanos 任务处理时间” 是无效的我觉得这个设计容易让开发者出错。
Timeout.kt
Throws(IOException::class)
open fun throwIfReached() {if (Thread.interrupted()) {// 传递中断状态Thread.currentThread().interrupt() // Retain interrupted status.throw InterruptedIOException(interrupted)}if (hasDeadline deadlineNanoTime - System.nanoTime() 0) {// 抛出超时异常throw InterruptedIOException(deadline reached)}
}有必要解释所谓 “同步” 的意思
同步超时就是指任务的 “执行” 和 “超时检查” 是同步的。当任务超时时Okio 同步超时不会直接中断任务执行而是需要检主动查超时时间Timeout#throwIfReached来判断是否发生超时再决定是否中断任务执行。
这其实与 Java 的中断机制是非常相似的
当 Java 线程的中断标记位置位时并不是真的会直接中断线程执行而是主动需要检查中断标记位Thread.interrupted来判断是否发生中断再决定是否中断线程任务。所以说 Java 的线程中断机制是一种 “同步中断”。
可以看出同步超时存在 “滞后性”
因为同步超时需要主动检查所以即使在任务执行过程中发生超时也必须等到检查时才会发现超时无法及时触发超时异常。因此就需要异步超时机制。
同步超时示意图 3. AsyncTimeout 异步超时 异步超时监控进入 异步超时在每次执行任务之前都需要先调用 AsyncTimeout#enter() 方法将 AsyncTimeout 挂载到超时队列中并根据超时截止时间的先后顺序排序队列头部的节点就是会最先超时的任务 异步超时监控退出 在每次任务执行结束之后都需要再调用 AsyncTimeout#exit() 方法将 AsyncTimeout 从超时队列中移除。
注意 enter() 方法和 eixt() 方法必须成对存在。
AsyncTimeout.kt
open class AsyncTimeout : Timeout() {// 是否在等待队列中private var inQueue false// 后续指针private var next: AsyncTimeout? null// 超时截止时间private var timeoutAt 0L// 异步超时监控进入fun enter() {check(!inQueue) { Unbalanced enter/exit }val timeoutNanos timeoutNanos()val hasDeadline hasDeadline()if (timeoutNanos 0L !hasDeadline) {return}inQueue truescheduleTimeout(this, timeoutNanos, hasDeadline)}// 异步超时监控退出// 返回值是否发生超时如果节点不存在说明被 WatchDog 线程移除即发生超时fun exit(): Boolean {if (!inQueue) return falseinQueue falsereturn cancelScheduledTimeout(this)}// 在 WatchDog 线程调用protected open fun timedOut() {}companion object {// 超时队列头节点哨兵节点private var head: AsyncTimeout? null// 分发超时监控任务private fun scheduleTimeout(node: AsyncTimeout, timeoutNanos: Long, hasDeadline: Boolean) {synchronized(AsyncTimeout::class.java) {// 首次添加监控时需要启动 Watchdog 线程if (head null) {// 哨兵节点head AsyncTimeout()Watchdog().start()}// now当前时间val now System.nanoTime()// timeoutAt 超时截止时间计算 now timeoutNanos 和 deadlineNanoTime 的较小值if (timeoutNanos ! 0L hasDeadline) {node.timeoutAt now minOf(timeoutNanos, node.deadlineNanoTime() - now)} else if (timeoutNanos ! 0L) {node.timeoutAt now timeoutNanos} else if (hasDeadline) {node.timeoutAt node.deadlineNanoTime()} else {throw AssertionError()}// remainingNanos 超时剩余时间当前时间距离超时发生的时间val remainingNanos node.remainingNanos(now)var prev head!!// 线性遍历超时队列按照超时截止时间将 node 节点插入超时队列while (true) {if (prev.next null || remainingNanos prev.next!!.remainingNanos(now)) {node.next prev.nextprev.next node// 如果插入到队列头部需要唤醒 WatchDog 线程if (prev head) {(AsyncTimeout::class.java as Object).notify()}break}prev prev.next!!}}}// 取消超时监控任务// 返回值是否超时private fun cancelScheduledTimeout(node: AsyncTimeout): Boolean {synchronized(AsyncTimeout::class.java) {// 线性遍历超时队列将 node 节点移除var prev headwhile (prev ! null) {if (prev.next node) {prev.next node.nextnode.next nullreturn false}prev prev.next}// 如果节点不存在说明被 WatchDog 线程移除即发生超时return true}}}
}同时在首次添加异步超时监控时AsyncTimeout 内部会开启一个 WatchDog 守护线程按照 “检测 - 等待” 模型观察超时队列的头节点 如果发生超时则将头节点移除并回调 AsyncTimeout#timeOut() 方法。这是一个空方法需要由子类实现来主动取消任务 如果未发生超时则 WatchDog 线程会计算距离超时发生的时间间隔调用 Object#wait(时间间隔) 进入限时等待。
需要注意的是 AsyncTimeout#timeOut() 回调中不能执行耗时操作否则会影响后续检测的及时性。
有意思的是我们会发现 Okio 的超时检测机制和 Android ANR 的超时检测机制非常类似所以我们可以说 ANR 也是一种异步超时机制。
AsyncTimeout.kt
private class Watchdog internal constructor() : Thread(Okio Watchdog) {init {// 守护线程isDaemon true}override fun run() {// 死循环while (true) {try {var timedOut: AsyncTimeout? nullsynchronized(AsyncTimeout::class.java) {// 取头节点Maybe waittimedOut awaitTimeout()// 超时队列为空退出线程if (timedOut head) {head nullreturn}}// 超时发生触发 AsyncTimeout#timedOut 回调timedOut?.timedOut()} catch (ignored: InterruptedException) {}}}
}companion object {// 超时队列为空时再等待一轮的时间private val IDLE_TIMEOUT_MILLIS TimeUnit.SECONDS.toMillis(60)private val IDLE_TIMEOUT_NANOS TimeUnit.MILLISECONDS.toNanos(IDLE_TIMEOUT_MILLIS)Throws(InterruptedException::class)internal fun awaitTimeout(): AsyncTimeout? {// Get the next eligible node.val node head!!.next// 如果超时队列为空if (node null) {// 需要再等待 60s 后再判断例如在首次添加监控时val startNanos System.nanoTime()(AsyncTimeout::class.java as Object).wait(IDLE_TIMEOUT_MILLIS)return if (head!!.next null System.nanoTime() - startNanos IDLE_TIMEOUT_NANOS) {// 退出 WatchDog 线程head} else {// WatchDog 线程重新取一次null}}// 计算当前时间距离超时发生的时间var waitNanos node.remainingNanos(System.nanoTime())// 未超时进入限时等待if (waitNanos 0) {// Waiting is made complicated by the fact that we work in nanoseconds,// but the API wants (millis, nanos) in two arguments.val waitMillis waitNanos / 1000000LwaitNanos - waitMillis * 1000000L(AsyncTimeout::class.java as Object).wait(waitMillis, waitNanos.toInt())return null}// 超时将头节点移除head!!.next node.nextnode.next nullreturn node}
}异步超时示意图 直接看代码不好理解我们来举个例子 4. 举例OkHttp Call 的异步超时监控
在 OkHttp 中支持配置一次完整的 Call 请求上的操作时间 callTimeout。一次 Call 请求包含多个 IO 操作的复合任务使用传统 IO 是不可能监控超时的所以需要使用 AsyncTimeout 异步超时。
在 OkHttp 的 RealCall 请求类中就使用了 AsyncTimeout 异步超时 1、开始任务 在 execute() 方法中调用 AsyncTimeout#enter() 进入异步超时监控再执行请求 2、结束任务 在 callDone() 方法中调用 AsyncTimeout#exit() 退出异步超时监控。分析源码发现callDone() 不仅在请求正常时会调用在取消请求时也会回调保证了 enter() 和 exit() 成对存在 3、超时回调 在 AsyncTimeout#timeOut 超时回调中调用了 Call#cancel() 提前取消请求。Call#cancel() 会调用到 Socket#close()让阻塞中的 IO 操作抛出 SocketException 异常以达到提前中断的目的最终也会走到 callDone() 执行 exit() 退出异步监控。
Call 超时监控示意图 RealCall
class RealCall(val client: OkHttpClient,/** The applications original request unadulterated by redirects or auth headers. */val originalRequest: Request,val forWebSocket: Boolean
) : Call {// 3、AsyncTimeout 超时监控private val timeout object : AsyncTimeout() {override fun timedOut() {// 取消请求cancel()}}.apply {timeout(client.callTimeoutMillis.toLong(), MILLISECONDS)}// 取消请求override fun cancel() {if (canceled) return // Already canceled.canceled trueexchange?.cancel()// 最终会调用 Socket#close()connectionToCancel?.cancel()eventListener.canceled(this)}// 1、请求开始由业务层调用override fun execute(): Response {// 1.1 异步超时监控进入timeout.enter()// 1.2 执行请求client.dispatcher.executed(this)return getResponseWithInterceptorChain()}// 2、请求结束由 OkHttp 引擎层调用包含正常和异常情况// 除了 IO 操作在抛出异常后会走到 callDone()在取消请求时也会走到 callDone()internal fun E : IOException? messageDone(exchange: Exchange,requestDone: Boolean, // 请求正常结束responseDone: Boolean, // 响应正常结束e: E): E {...if (callDone) {return callDone(e)}return e}private fun E : IOException? callDone(e: E): E {...// 检查是否超时val result timeoutExit(e)if (e ! null) {// 请求异常包含超时异常eventListener.callFailed(this, result!!)} else {// 请求正常结束eventListener.callEnd(this)}return result}private fun E : IOException? timeoutExit(cause: E): E {if (timeoutEarlyExit) return cause// 2.1 异步超时监控退出if (!timeout.exit()) return cause// 2.2 包装超时异常val e InterruptedIOException(timeout)if (cause ! null) e.initCause(cause)return e as E}
}调用 Socket#close() 会让阻塞中的 IO 操作抛出 SocketException 异常
Socket.java
// Any thread currently blocked in an I/O operation upon this socket will throw a {link SocketException}.
public synchronized void close() throws IOException {synchronized(closeLock) {if (isClosed())return;if (created)impl.close();closed true;}
}Exchange 中会捕获 Socket#close() 抛出的 SocketException 异常
Exchange.kt
private inner class RequestBodySink(delegate: Sink,/** The exact number of bytes to be written, or -1L if that is unknown. */private val contentLength: Long
) : ForwardingSink(delegate) {Throws(IOException::class)override fun write(source: Buffer, byteCount: Long) {...try {super.write(source, byteCount)this.bytesReceived byteCount} catch (e: IOException) {// Socket#close() 会抛出异常被这里拦截throw complete(e)}}private fun E : IOException? complete(e: E): E {if (completed) return ecompleted truereturn bodyComplete(bytesReceived, responseDone false, requestDone true, e e)}
}fun E : IOException? bodyComplete(bytesRead: Long,responseDone: Boolean,requestDone: Boolean,e: E
): E {...// 回调到上面的 RealCall#messageDonereturn call.messageDone(this, requestDone, responseDone, e)
}5. OkHttp 超时检测总结
先说一下 Okhttp 定义的 2 种颗粒度的超时
第 1 种是在单次 connect、read 或 write 操作上的超时第 2 种是在一次完整的 call 请求上的超时有时候单个操作没有超时但连接起来的完整 call 却超时。
其实 Socket 支持通过 setSoTimeout API 设置单次操作的超时时间但这个 API 无法满足需求比如说 Call 超时是包含多个 IO 操作的复合任务而且不管是 HTTP/1 并行请求还是 HTTP/2 多路复用都会存在一个 Socket 连接上同时承载多个请求的情况无法区分是哪个请求超时。
因此OkHttp 采用了两种超时监测
对于 connect 操作OkHttp 继续使用 Socket 级别的超时没有问题对于 call、read 和 write 的超时OkHttp 使用一个 Okio 的异步超时机制来监测超时。 参考资料
Github · OkioOkio 官网