网站建设的主要特征,2021网络公司排名,自己设置免费网站设计平台,发卡平台网站建设为了更深入学习协程的底层实现原理#xff0c;了解协程线程切换的根本本质。也为了以后在工作中可以根据不同的需求场景#xff0c;更加随心所欲的使用不同的协程。
今天通过 launch 跟踪一下协程的执行流程。 fun getData() {Trace.beginSection(getData);Log.…为了更深入学习协程的底层实现原理了解协程线程切换的根本本质。也为了以后在工作中可以根据不同的需求场景更加随心所欲的使用不同的协程。
今天通过 launch 跟踪一下协程的执行流程。 fun getData() {Trace.beginSection(getData);Log.e(TAG, getData before Thread.currentThread().name)val demoScope: suspend CoroutineScope.() - Unit {Trace.beginSection(DispatchersIO);Log.e(TAG, getData IO 1 Thread.currentThread().name)Thread.sleep(1000)Log.e(TAG, getData IO 2 Thread.currentThread().name)Trace.endSection();}viewModelScope.launch(Dispatchers.IO, block demoScope)
} 1. 流程图 1.1 从 launch 源码开始
public fun CoroutineScope.launch(context: CoroutineContext EmptyCoroutineContext,start: CoroutineStart CoroutineStart.DEFAULT,block: suspend CoroutineScope.() - Unit
): Job {//1先通过参数Context构造一个新的CoroutineContextval newContext newCoroutineContext(context)val coroutine if (start.isLazy)LazyStandaloneCoroutine(newContext, block) elseStandaloneCoroutine(newContext, active true)coroutine.start(start, coroutine, block)return coroutine
}
launch 方法有三个参数 context常用的一般是 Dispatchers.DefaultDispatchers.MainDispatchers.UnconfinedDispatchers.IO。 start枚举类型共四种DEFAULTLAZYATOMICUNDISPATCHED block就是 launch 执行的协程体
1.2 我们来看 newCoroutineContext 方法
ExperimentalCoroutinesApi
public actual fun CoroutineScope.newCoroutineContext(context: CoroutineContext): CoroutineContext {val combined coroutineContext context//1val debug if (DEBUG) combined CoroutineId(COROUTINE_ID.incrementAndGet()) else combinedreturn if (combined ! Dispatchers.Default combined[ContinuationInterceptor] null)debug Dispatchers.Default else debug
}
刚开始看到代码 1 的号头都是蒙的这是什么鬼不是数字类型为啥能加
其实本质就是调用了 CoroutineContext 的 plus是操作符的重载
/*** Returns a context containing elements from this context and elements from other [context].* The elements from this context with the same key as in the other one are dropped.*/
public operator fun plus(context: CoroutineContext): CoroutineContext if (context EmptyCoroutineContext) this else // fast path -- avoid lambda creationcontext.fold(this) { acc, element -//operation函数体。。。。。。。}
fold 函数比较难理解我们先说结论就是把参数 this 内部与 context 的 key 一样的 CoroutineContext 移除后剩下的 CoroutineContext 与 context 组成新的 CoroutineContext 对象。下边慢慢分析
CoroutineContext 的子类重写 fold 函数的一共有三个 EmptyCoroutineContextCombinedContextElement 上述代码第 6 行已经判断过 context 是 EmptyCoroutineContext。所以当前的 context 不可能是 EmptyCoroutineContext。其 fold 方法直接返回 this。如下
public override fun R fold(initial: R, operation: (R, Element) - R): R initial 是 Element 时。acc 就是 fold 函数参数。element 就是 fold 函数调用者
public override fun R fold(initial: R, operation: (R, Element) - R): R operation(initial, this) 是 CombinedContext 比较复杂
internal class CombinedContext(private val left: CoroutineContext,private val element: Element
) : CoroutineContext, Serializable {public override fun R fold(initial: R, operation: (R, Element) - R): R operation(left.fold(initial, operation), element)
}
要递归调用 fold 函数并重复调用 operation 函数。直到最后调用 Element或者 EmptyCoroutineContext 的 fold 函数。
最终需要分析的都是 Element 的 fold 函数执行情况
context.fold(this) { acc, element -//acc就是fold函数参数。element就是fold函数调用者当前就是Dispatchers.IO//如果acc的key和element的key是相同就返回新的EmptyCoroutineContext//否则就返回accval removed acc.minusKey(element.key) if (removed EmptyCoroutineContext) element else {// make sure interceptor is always last in the context (and thus is fast to get when present)//此时removed为acc的left也就是SupervisorJob//获得removed里key为ContinuationInterceptor.key的分发器。当前为nullval interceptor removed[ContinuationInterceptor]//合并removed和element。也就是SupervisorJobDispatchers.IOif (interceptor null) CombinedContext(removed, element) else {val left removed.minusKey(ContinuationInterceptor)if (left EmptyCoroutineContext) CombinedContext(element, interceptor) elseCombinedContext(CombinedContext(left, element), interceptor)}}
}
小结下
newCoroutineContext 其实就是给自己传递的 context 添加一些附加技能。但是 key 相同的技能只包含一个
比如 ViewModel 中 viewModelScope 的 coroutineContext 的默认值 SupervisorJob Dispatchers.Main.immediate。默认主线程执行并保证如果其中的某个子协程出现异常不会影响子协程
比如切换 dispatcher当前父协程 dispatcher 为 Dispatchers.Main.immediate切换为 Dispatchers.IO
1.3 下面分析 StandaloneCoroutine 的 start 方法
public fun R start(start: CoroutineStart, receiver: R, block: suspend R.() - T) {initParentJob()start(block, receiver, this)
}
internal fun initParentJob() {//当前的parentContext[job]就是SupervisorJobinitParentJobInternal(parentContext[Job])
}
/*** Initializes parent job.* It shall be invoked at most once after construction after all other initialization.*/
internal fun initParentJobInternal(parent: Job?) {assert { parentHandle null }if (parent null) {parentHandle NonDisposableHandlereturn}//start保证parent状态为isActiveparent.start() // make sure the parent is //...
}
CoroutineStart 的 start 就是如下的 invoke 函数
public operator fun R, T invoke(block: suspend R.() - T, receiver: R, completion: ContinuationT): Unit when (this) {DEFAULT - block.startCoroutineCancellable(receiver, completion)ATOMIC - block.startCoroutine(receiver, completion)UNDISPATCHED - block.startCoroutineUndispatched(receiver, completion)LAZY - Unit // will start lazily}
通过这里可以大概猜测一下几种 start 的区别。当前我们只看 DEFAULT
internal fun R, T (suspend (R) - T).startCoroutineCancellable(receiver: R, completion: ContinuationT,onCancellation: ((cause: Throwable) - Unit)? null
) //runSafely就是添加了一个try catchrunSafely(completion) {createCoroutineUnintercepted(receiver, completion).intercepted().resumeCancellableWith(Result.success(Unit), onCancellation)}
createCoroutineUnintercepted 在文件 kotlin.coroutines.intrinsics.intrinsicsJvm.kt
public actual fun R, T (suspend R.() - T).createCoroutineUnintercepted(receiver: R,completion: ContinuationT
): ContinuationUnit {val probeCompletion probeCoroutineCreated(completion)//当前对象是BaseContinuationImpl的子类return if (this is BaseContinuationImpl)//这个方法在哪create(receiver, probeCompletion)else {createCoroutineFromSuspendFunction(probeCompletion) {(this as Function2R, ContinuationT, Any?).invoke(receiver, it)}}
}
create 方法在哪需要反编译代码才能看的到
public final class MainViewModel extends ViewModel {public static final Companion Companion new Companion(null);private static final String TAG MainViewModel;public final void getData() {Trace.beginSection(getData);StringBuilder stringBuilder new StringBuilder();stringBuilder.append(getData before );stringBuilder.append(Thread.currentThread().getName());Log.e(MainViewModel, stringBuilder.toString());MainViewModel$getData$eeeee$1 mainViewModel$getData$eeeee$1 new MainViewModel$getData$eeeee$1(null);BuildersKt.launch$default(ViewModelKt.getViewModelScope(this), (CoroutineContext)Dispatchers.getIO(), null, mainViewModel$getData$eeeee$1, 2, null);}Metadata(d1 {\000\022\n\002\030\002\n\002\020\000\n\002\b\002\n\002\020\016\n\000\b\003\030\0002\0020\001B\007\b\002¢\006\002\020\002R\016\020\003\032\0020\004XT¢\006\002\n\000¨\006\005}, d2 {Lcom/haier/uhome/coroutine/ui/main/MainViewModel$Companion;, , ()V, TAG, , coroutine_debug}, k 1, mv {1, 6, 0}, xi 48)public static final class Companion {private Companion() {}}Metadata(d1 {\000\n\n\000\n\002\020\002\n\002\030\002\020\000\032\0020\001*\0020\002H}, d2 {anonymous, , Lkotlinx/coroutines/CoroutineScope;}, k 3, mv {1, 6, 0}, xi 48)DebugMetadata(c com.haier.uhome.coroutine.ui.main.MainViewModel$getData$eeeee$1, f MainViewModel.kt, i {}, l {}, m invokeSuspend, n {}, s {})static final class MainViewModel$getData$eeeee$1 extends SuspendLambda implements Function2CoroutineScope, Continuation? super Unit, Object {int label;MainViewModel$getData$eeeee$1(Continuation? super MainViewModel$getData$eeeee$1 param1Continuation) {super(2, param1Continuation);}public final ContinuationUnit create(Object param1Object, Continuation? param1Continuation) {return (ContinuationUnit)new MainViewModel$getData$eeeee$1((Continuation)param1Continuation);}//。。。。。。。}
}
可以看到我们的协程体其实是一个基础 SuspendLambda 的 class 对象。当调用 create 时用参数 probeCompletion 又构造了一个新的协程体对象
SuspendLambda 的继承关系如下
SuspendLambda--ContinuationImpl--BaseContinuationImpl--ContinuationAny?, CoroutineStackFrame, Serializable
所以 intercepted方法就是调用 ContinuationImpl 内部实现的
public fun intercepted(): ContinuationAny? intercepted?: (context[ContinuationInterceptor]?.interceptContinuation(this) ?: this).also { intercepted it }
context[ContinuationInterceptor]此时获得的就是 Dispatchers.IO
其 interceptContinuation 方法如下
public final override fun T interceptContinuation(continuation: ContinuationT): ContinuationT DispatchedContinuation(this, continuation)
把 continuation 封装成了 DispatchedContinuation。其继承关系如下
DispatchedContinuation--DispatchedTask--SchedulerTask--Task--Runnable
需要注意的是 continuation 就是协程体。就是我们要执行的内容
1.4 继续看 resumeCancellableWith 方法
在文件 kotlinx.coroutines.internal.DispatchedContinuation.kt
Suppress(NOTHING_TO_INLINE)
inline fun resumeCancellableWith(result: ResultT,noinline onCancellation: ((cause: Throwable) - Unit)?
) {val state result.toState(onCancellation)//dispatcher就是协程代码传入的分发器//判断是否需要切换通过dispatcher执行当前dispatcher.io,isDispatchNeeded是直接返回trueif (dispatcher.isDispatchNeeded(context)) {//代码1_state stateresumeMode MODE_CANCELLABLEdispatcher.dispatch(context, this)} else {executeUnconfined(state, MODE_CANCELLABLE) {if (!resumeCancelled(state)) {resumeUndispatchedWith(result)}}}
}
dispatcher.dispatch方法就把上边生成的 runnable 放到了线程池队列中
文件 kotlinx.coroutines.scheduling.Dispatcher.kt#LimitingDispatcher
override fun dispatch(context: CoroutineContext, block: Runnable) dispatch(block, false)private fun dispatch(block: Runnable, tailDispatch: Boolean) {var taskToSchedule blockwhile (true) {// Commit in-flight tasks slotval inFlight inFlightTasks.incrementAndGet()// Fast path, if parallelism limit is not reached, dispatch task and returnif (inFlight parallelism) {dispatcher.dispatchWithContext(taskToSchedule, this, tailDispatch)return}//....}}
2. dispatche 具体是什么呢
流程图如下 2.1 其实是在 Dispatchers.IO 实例化时的参数DefaultScheduler 对象
internal object DefaultScheduler : ExperimentalCoroutineDispatcher() {val IO: CoroutineDispatcher LimitingDispatcher(//这里实例化调度器对象this,systemProp(IO_PARALLELISM_PROPERTY_NAME, 64.coerceAtLeast(AVAILABLE_PROCESSORS)),Dispatchers.IO,TASK_PROBABLY_BLOCKING)//....}
而 DefaultScheduler 内部实例化了一个线程池
2.2 在文件 kotlinx.coroutines.scheduling.Dispatcher.kt
//kotlinx.coroutines.scheduling.Dispatcher.kt#ExperimentalCoroutineDispatcher
override val executor: Executorget() coroutineScheduler
private var coroutineScheduler createScheduler()
private fun createScheduler() CoroutineScheduler(corePoolSize, maxPoolSize, idleWorkerKeepAliveNs, schedulerName)
dispatcher.dispatchWithContext就是调用线程池的 dispatch把任务放到 globalQueue 队列里我们看一下
在文件 kotlinx.coroutines.scheduling.CoroutineScheduler.kt
internal fun dispatchWithContext(block: Runnable, context: TaskContext, tailDispatch: Boolean) {try {//coroutineScheduler就是线程池coroutineScheduler.dispatch(block, context, tailDispatch)} catch (e: RejectedExecutionException) {// CoroutineScheduler only rejects execution when it is being closed and this behavior is reserved// for testing purposes, so we dont have to worry about cancelling the affected Job here.// TaskContext shouldnt be lost here to properly invoke before/after taskDefaultExecutor.enqueue(coroutineScheduler.createTask(block, context))}
}fun dispatch(block: Runnable, taskContext: TaskContext NonBlockingContext, tailDispatch: Boolean false) {trackTask() // this is needed for virtual time support//当前block就继承之Taskval task createTask(block, taskContext)// try to submit the task to the local queue and act depending on the result//当前线程池不是work所以此时currentWorker返回为nullval currentWorker currentWorker()//local放置失败val notAdded currentWorker.submitToLocalQueue(task, tailDispatch)if (notAdded ! null) {//放到global队列里if (!addToGlobalQueue(notAdded)) {// Global queue is closed in the last step of close/shutdown -- no more tasks should be acceptedthrow RejectedExecutionException($schedulerName was terminated)}}
}
3. 任务具体如何执行
时序图如下 3.1 我们来看 kotlinx.coroutines.scheduling.CoroutineScheduler 文件
private fun runWorker() {var rescanned falsewhile (!isTerminated state ! WorkerState.TERMINATED) {//通过上一步可以知道任务没有放置到local队列mayHaveLocalTasks为falseval task findTask(mayHaveLocalTasks)// Task found. Execute and repeatif (task ! null) {rescanned falseminDelayUntilStealableTaskNs 0LexecuteTask(task)continue} else {mayHaveLocalTasks false}//。。。。。。
}
private fun findAnyTask(scanLocalQueue: Boolean): Task? {/** Anti-starvation mechanism: probabilistically poll either local* or global queue to ensure progress for both external and internal tasks.*/if (scanLocalQueue) {val globalFirst nextInt(2 * corePoolSize) 0if (globalFirst) pollGlobalQueues()?.let { return it }localQueue.poll()?.let { return it }if (!globalFirst) pollGlobalQueues()?.let { return it }} else {//从glocal中取出任务pollGlobalQueues()?.let { return it }}return trySteal(blockingOnly false)
}private fun pollGlobalQueues(): Task? {if (nextInt(2) 0) {globalCpuQueue.removeFirstOrNull()?.let { return it }return globalBlockingQueue.removeFirstOrNull()} else {globalBlockingQueue.removeFirstOrNull()?.let { return it }return globalCpuQueue.removeFirstOrNull()}
}//参数task就是一个runnable
private fun executeTask(task: Task) {val taskMode task.modeidleReset(taskMode)beforeTask(taskMode)//执行task里的run方法runSafely(task)afterTask(taskMode)
}
3.2 Task 的 run 方法的实现在 kotlinx.coroutines.DispatchedTask 里
public final override fun run() {
// should have been set before dispatchingval taskContext this.taskContextvar fatalException: Throwable? nulltry {//...withCoroutineContext(context, delegate.countOrElement) {//。。。。continuation.resume(getSuccessfulResult(state))//。。。。。}} catch (e: Throwable) {// This instead of runCatching to have nicer stacktrace and debug experiencefatalException e} finally {val result runCatching { taskContext.afterTask() }handleFatalException(fatalException, result.exceptionOrNull())}
}
3.3 continuation.resume 在 kotlin.coroutines.Continuation.kt 文件 public inline fun T ContinuationT.resume(value: T): Unit resumeWith(Result.success(value))
3.4 最终执行内容在文件kotlin.coroutines.jvm.internal.ContinuationImpl 里
public final override fun resumeWith(result: ResultAny?) {// This loop unrolls recursion in current.resumeWith(param) to make saner and shorter stack traces on resumevar current thisvar param resultwhile (true) {// Invoke resume debug probe on every resumed continuation, so that a debugging library infrastructure// can precisely track what part of suspended callstack was already resumedprobeCoroutineResumed(current)with(current) {val completion completion!! // fail fast when trying to resume continuation without completionval outcome: ResultAny? try {//执行协程体内容val outcome invokeSuspend(param)if (outcome COROUTINE_SUSPENDED) returnResult.success(outcome)} catch (exception: Throwable) {Result.failure(exception)}releaseIntercepted() // this state machine instance is terminatingif (completion is BaseContinuationImpl) {// unrolling recursion via loopcurrent completionparam outcome} else {// top-level completion reached -- invoke and returncompletion.resumeWith(outcome)return}}}
}
3.5 invokeSuspend 在哪呢还是找不到同样需要反编译查看。就是
public final class MainViewModel extends ViewModel {public static final Companion Companion new Companion(null);private static final String TAG MainViewModel;public final void getData() {Trace.beginSection(getData);StringBuilder stringBuilder new StringBuilder();stringBuilder.append(getData before );stringBuilder.append(Thread.currentThread().getName());Log.e(MainViewModel, stringBuilder.toString());MainViewModel$getData$eeeee$1 mainViewModel$getData$eeeee$1 new MainViewModel$getData$eeeee$1(null);BuildersKt.launch$default(ViewModelKt.getViewModelScope(this), (CoroutineContext)Dispatchers.getIO(), null, mainViewModel$getData$eeeee$1, 2, null);}Metadata(d1 {\000\n\n\000\n\002\020\002\n\002\030\002\020\000\032\0020\001*\0020\002H}, d2 {anonymous, , Lkotlinx/coroutines/CoroutineScope;}, k 3, mv {1, 6, 0}, xi 48)DebugMetadata(c com.haier.uhome.coroutine.ui.main.MainViewModel$getData$eeeee$1, f MainViewModel.kt, i {}, l {}, m invokeSuspend, n {}, s {})static final class MainViewModel$getData$eeeee$1 extends SuspendLambda implements Function2CoroutineScope, Continuation? super Unit, Object {int label;public final Object invokeSuspend(Object param1Object) {IntrinsicsKt.getCOROUTINE_SUSPENDED();if (this.label 0) {ResultKt.throwOnFailure(param1Object);Trace.beginSection(DispatchersIO);param1Object new StringBuilder();param1Object.append(getData IO 1 );param1Object.append(Thread.currentThread().getName());Log.e(MainViewModel, param1Object.toString());Thread.sleep(1000L);param1Object new StringBuilder();param1Object.append(getData IO 2 );param1Object.append(Thread.currentThread().getName());Log.e(MainViewModel, param1Object.toString());Trace.endSection();return Unit.INSTANCE;} throw new IllegalStateException(call to resume before invoke with coroutine);}}
}
到此处协程 launch 内容就执行完了。
4. 总结
其底层使用的就是对线程池的封装把协程体封装到 runnable 里放到线程池执行。使用了的线程池线程复用不必频繁的创建销毁线程等优点。提升了性能
其他的 Dispatcher我就不一一跟踪了有兴趣的同学可以自己跟踪一下。这里简单介绍下我的理解
Dispatchers.Main其内部使用的 MainCoroutineDispatcher把任务放到主线程的 handler 顺序执行
Dispatchers.Default是一个使用 DefaultScheduler 的线程池据说比较适合做逻辑性任务这个我看不出来
Dispatchers.Unconfined跟随父协程的 context直接执行不做线程切换
launch 主要逻辑不是很复杂主要就是线程池的调度。难以跟踪的原因大概是因为源码中到处在使用函数扩展。再加上协程体的具体实现是 kotlin 编译过程中生成的。所以花的时间比较多需要有耐心
5. 团队介绍
「三翼鸟数字化技术平台-场景设计交互平台」主要负责设计工具的研发包括营销设计工具、家电VR设计和展示、水电暖通前置设计能力研发并沉淀素材库构建家居家装素材库集成户型库、全品类产品库、设计方案库、生产工艺模型打造基于户型和风格的AI设计能力快速生成算量和报价同时研发了门店设计师中心和项目中心包括设计师管理能力和项目经理管理能力。实现了场景全生命周期管理同时为水空气厨房等产业提供商机管理工具从而实现了以场景贯穿的B端C端全流程系统。