你好,我是朱涛。今天,我们来分析Kotlin协程当中的Dispatchers。
上节课里,我们分析了launch的源代码,从中我们知道,Kotlin的launch会调用startCoroutineCancellable(),接着又会调用createCoroutineUnintercepted(),最终会调用编译器帮我们生成SuspendLambda实现类当中的create()方法。这样,协程就创建出来了。不过,协程是创建出来了,可它是如何运行的呢?
另外我们也都知道,协程无法脱离线程运行,Kotlin当中所有的协程,最终都是运行在线程之上的。**那么,协程创建出来以后,它又是如何跟线程产生关联的?**这节课,我们将进一步分析launch的启动流程,去发掘上节课我们忽略掉的代码分支。
我相信,经过这节课的学习,你会对协程与线程之间的关系有一个更加透彻的认识。
在上节课里我们学习过,launch{}本质上是调用了startCoroutineCancellable()当中的createCoroutineUnintercepted()方法创建了协程。
// 代码段1public fun <T> (suspend () -> T).startCoroutineCancellable(completion: Continuation<T>): Unit = runSafely(completion) {// 注意这里// ↓createCoroutineUnintercepted(completion).intercepted().resumeCancellableWith(Result.success(Unit))}
那么下面,我们就接着上节课的流程,继续分析createCoroutineUnintercepted(completion)之后的 intercepted()方法。
不过,在正式分析intercepted()之前,我们还需要弄清楚Dispatchers、CoroutineDispatcher、ContinuationInterceptor、CoroutineContext之间的关系。
// 代码段2public actual object Dispatchers {public actual val Default: CoroutineDispatcher = DefaultSchedulerpublic actual val Main: MainCoroutineDispatcher get() = MainDispatcherLoader.dispatcherpublic actual val Unconfined: CoroutineDispatcher = kotlinx.coroutines.Unconfinedpublic val IO: CoroutineDispatcher = DefaultIoSchedulerpublic fun shutdown() { }}public abstract class CoroutineDispatcher :AbstractCoroutineContextElement(ContinuationInterceptor), ContinuationInterceptor {}public interface ContinuationInterceptor : CoroutineContext.Element {}public interface Element : CoroutineContext {}
在第17讲当中,我们曾经分析过它们之间的继承关系。Dispatchers是一个单例对象,它当中的Default、Main、Unconfined、IO,类型都是CoroutineDispatcher,而它本身就是CoroutineContext。所以,它们之间的关系就可以用下面这个图来描述。
让我们结合这张图,来看看下面这段代码:
// 代码段3fun main() {testLaunch()Thread.sleep(2000L)}private fun testLaunch() {val scope = CoroutineScope(Job())scope.launch{logX("Hello!")delay(1000L)logX("World!")}}/*** 控制台输出带协程信息的log*/fun logX(any: Any?) {println("""================================$anyThread:${Thread.currentThread().name}================================""".trimIndent())}/*输出结果================================Hello!Thread:DefaultDispatcher-worker-1 @coroutine#1================================================================World!Thread:DefaultDispatcher-worker-1 @coroutine#1================================*/
在这段代码中,我们没有为launch()传入任何CoroutineContext参数,但通过执行结果,我们发现协程代码居然执行在DefaultDispatcher,并没有运行在main线程之上。这是为什么呢?
我们可以回过头来分析下launch的源代码,去看看上节课中我们刻意忽略的地方。
// 代码段4public fun CoroutineScope.launch(context: CoroutineContext = EmptyCoroutineContext,start: CoroutineStart = CoroutineStart.DEFAULT,block: suspend CoroutineScope.() -> Unit): Job {// 1val newContext = newCoroutineContext(context)val coroutine = if (start.isLazy)LazyStandaloneCoroutine(newContext, block) elseStandaloneCoroutine(newContext, active = true)coroutine.start(start, coroutine, block)return coroutine}
首先,请留意launch的第一个参数,context,它的默认值是EmptyCoroutineContext。在第17讲里,我曾提到过,CoroutineContext就相当于Map,而EmptyCoroutineContext则相当于一个空的Map。所以,我们可以认为,这里的EmptyCoroutineContext传了也相当于没有传,它的目的只是为了让context参数不为空而已。这其实也体现出了Kotlin的空安全思维,Kotlin官方用EmptyCoroutineContext替代了null。
接着,请留意上面代码的注释1,这行代码会调用newCoroutineContext(context),将传入的context参数重新包装一下,然后返回。让我们看看它具体的逻辑:
// 代码段5public actual fun CoroutineScope.newCoroutineContext(context: CoroutineContext): CoroutineContext {// 1val combined = coroutineContext.foldCopiesForChildCoroutine() + context// 2val debug = if (DEBUG) combined + CoroutineId(COROUTINE_ID.incrementAndGet()) else combined// 3return if (combined !== Dispatchers.Default && combined[ContinuationInterceptor] == null)debug + Dispatchers.Default else debug}
这段代码一共有三个注释,我们来分析一下:
看到这里,你也许会有一个疑问,为什么协程默认的线程池是Dispatchers.Default,而不是Main呢?答案其实也很简单,因为Kotlin协程是支持多平台的,Main线程只在UI编程平台才有可用。因此,当我们的协程没有指定Dispatcher的时候,就只能使用Dispatchers.Default了。毕竟,协程是无法脱离线程执行的。
那么现在,代码段3当中的协程执行在Dispatchers.Default的原因也就找到了:由于我们定义的scope没有指定Dispatcher,同时launch的参数也没有传入Dispatcher,最终在newCoroutineContext()的时候,会被默认指定为Default线程池。
好,有了前面的基础以后,接下来,我们就可以开始intercepted()的逻辑了。
让我们回到课程开头提到过的startCoroutineCancellable()方法的源代码,其中的createCoroutineUnintercepted()方法,我们在上节课已经分析过了,它的返回值类型就是Continuation。而intercepted()方法,其实就是Continuation的扩展函数。
// 代码段6public fun <T> (suspend () -> T).startCoroutineCancellable(completion: Continuation<T>): Unit = runSafely(completion) {// 注意这里// ↓createCoroutineUnintercepted(completion).intercepted().resumeCancellableWith(Result.success(Unit))}public actual fun <T> Continuation<T>.intercepted(): Continuation<T> =(this as? ContinuationImpl)?.intercepted() ?: thisinternal abstract class ContinuationImpl(completion: Continuation<Any?>?,private val _context: CoroutineContext?) : BaseContinuationImpl(completion) {constructor(completion: Continuation<Any?>?) : this(completion, completion?.context)@Transientprivate var intercepted: Continuation<Any?>? = null// 1public fun intercepted(): Continuation<Any?> =intercepted?: (context[ContinuationInterceptor]?.interceptContinuation(this) ?: this).also { intercepted = it }}
从上面的代码中,我们可以看到,startCoroutineCancellable()当中的intercepted()最终会调用BaseContinuationImpl的intercepted()方法。
这里,请你留意代码中我标记出的注释,intercepted()方法首先会判断它的成员变量 intercepted是否为空,如果为空,就会调用context[ContinuationInterceptor],获取上下文当中的Dispatcher对象。以代码段3当中的逻辑为例,这时候的Dispatcher肯定是Default线程池。
然后,如果我们继续跟进interceptContinuation(this)方法的话,会发现程序最终会调用CoroutineDispatcher的interceptContinuation()方法。
// 代码段7public abstract class CoroutineDispatcher :AbstractCoroutineContextElement(ContinuationInterceptor), ContinuationInterceptor {// 1public final override fun <T> interceptContinuation(continuation: Continuation<T>): Continuation<T> =DispatchedContinuation(this, continuation)}
同样留意下这里的注释1,interceptContinuation()直接返回了一个DispatchedContinuation对象,并且将this、continuation作为参数传了进去。这里的this,其实就是Dispatchers.Default。
所以,如果我们把startCoroutineCancellable()改写一下,它实际上会变成下面这样:
// 代码段8public fun <T> (suspend () -> T).startCoroutineCancellable(completion: Continuation<T>): Unit = runSafely(completion) {createCoroutineUnintercepted(completion).intercepted().resumeCancellableWith(Result.success(Unit))}// 等价// ↓public fun <T> (suspend () -> T).startCoroutineCancellable(completion: Continuation<T>): Unit = runSafely(completion) {// 1val continuation = createCoroutineUnintercepted(completion)// 2val dispatchedContinuation = continuation.intercepted()// 3dispatchedContinuation.resumeCancellableWith(Result.success(Unit))}
在上面的代码中,注释1,2我们都已经分析完了,现在只剩下注释3了。这里的resumeCancellableWith(),其实就是真正将协程任务分发到线程上的逻辑。让我们继续跟进分析源代码:
// 代码段9internal class DispatchedContinuation<in T>(@JvmField val dispatcher: CoroutineDispatcher,@JvmField val continuation: Continuation<T>) : DispatchedTask<T>(MODE_UNINITIALIZED), CoroutineStackFrame, Continuation<T> by continuation {inline fun resumeCancellableWith(result: Result<T>,noinline onCancellation: ((cause: Throwable) -> Unit)?) {// 省略,留到后面分析}}
也就是,DispatchedContinuation是实现了Continuation接口,同时,它使用了“类委托”的语法,将接口的具体实现委托给了它的成员属性continuation。通过之前代码段7的分析,我们知道它的成员属性 dispatcher对应的就是Dispatcher.Default,而成员属性 continuation对应的则是launch当中传入的SuspendLambda实现类。
另外,DispatchedContinuation还继承自DispatchedTask,我们来看看DispatchedTask到底是什么。
internal abstract class DispatchedTask<in T>(@JvmField public var resumeMode: Int) : SchedulerTask() {}internal actual typealias SchedulerTask = Taskinternal abstract class Task(@JvmField var submissionTime: Long,@JvmField var taskContext: TaskContext) : Runnable {constructor() : this(0, NonBlockingContext)inline val mode: Int get() = taskContext.taskMode // TASK_XXX}
可以看到,DispatchedContinuation继承自DispatchedTask,而它则是SchedulerTask的子类,SchedulerTask是Task的类型别名,而Task实现了Runnable接口。因此,DispatchedContinuation不仅是一个Continuation,同时还是一个Runnable。
那么,既然它是Runnable,也就意味着它可以被分发到Java的线程当中去执行了。所以接下来,我们就来看看resumeCancellableWith()当中具体的逻辑:
// 代码段9internal class DispatchedContinuation<in T>(@JvmField val dispatcher: CoroutineDispatcher,@JvmField val continuation: Continuation<T>) : DispatchedTask<T>(MODE_UNINITIALIZED), CoroutineStackFrame, Continuation<T> by continuation {inline fun resumeCancellableWith(result: Result<T>,noinline onCancellation: ((cause: Throwable) -> Unit)?) {val state = result.toState(onCancellation)// 1if (dispatcher.isDispatchNeeded(context)) {_state = stateresumeMode = MODE_CANCELLABLE// 2dispatcher.dispatch(context, this)} else {// 3executeUnconfined(state, MODE_CANCELLABLE) {if (!resumeCancelled(state)) {resumeUndispatchedWith(result)}}}}}public abstract class CoroutineDispatcher :AbstractCoroutineContextElement(ContinuationInterceptor), ContinuationInterceptor {// 默认是truepublic open fun isDispatchNeeded(context: CoroutineContext): Boolean = truepublic abstract fun dispatch(context: CoroutineContext, block: Runnable)}internal object Unconfined : CoroutineDispatcher() {// 只有Unconfined会重写成falseoverride fun isDispatchNeeded(context: CoroutineContext): Boolean = false}
这段代码里也有三个注释,我们来分析一下:
接下来,让我们继续沿着注释2进行分析,这里的dispatcher.dispatch()其实就相当于调用了Dispatchers.Default.dispatch()。让我们看看它的逻辑:
public actual object Dispatchers {@JvmStaticpublic actual val Default: CoroutineDispatcher = DefaultScheduler}internal object DefaultScheduler : SchedulerCoroutineDispatcher(CORE_POOL_SIZE, MAX_POOL_SIZE,IDLE_WORKER_KEEP_ALIVE_NS, DEFAULT_SCHEDULER_NAME) {}
那么,从上面的代码中,我们可以看到,Dispatchers.Default本质上是一个单例对象DefaultScheduler,它是SchedulerCoroutineDispatcher的子类。
我们也来看看SchedulerCoroutineDispatcher的源代码:
internal open class SchedulerCoroutineDispatcher(private val corePoolSize: Int = CORE_POOL_SIZE,private val maxPoolSize: Int = MAX_POOL_SIZE,private val idleWorkerKeepAliveNs: Long = IDLE_WORKER_KEEP_ALIVE_NS,private val schedulerName: String = "CoroutineScheduler",) : ExecutorCoroutineDispatcher() {private var coroutineScheduler = createScheduler()override fun dispatch(context: CoroutineContext, block: Runnable): Unit = coroutineScheduler.dispatch(block)}
根据以上代码,我们可以看到Dispatchers.Default.dispatch()最终会调用SchedulerCoroutineDispatcher的dispatch()方法,而它实际上调用的是coroutineScheduler.dispatch()。
这里,我们同样再来看看CoroutineScheduler的源代码:
internal class CoroutineScheduler(@JvmField val corePoolSize: Int,@JvmField val maxPoolSize: Int,@JvmField val idleWorkerKeepAliveNs: Long = IDLE_WORKER_KEEP_ALIVE_NS,@JvmField val schedulerName: String = DEFAULT_SCHEDULER_NAME) : Executor, Closeable {override fun execute(command: Runnable) = dispatch(command)fun dispatch(block: Runnable, taskContext: TaskContext = NonBlockingContext, tailDispatch: Boolean = false) {trackTask()// 1val task = createTask(block, taskContext)// 2val currentWorker = currentWorker()// 3val notAdded = currentWorker.submitToLocalQueue(task, tailDispatch)if (notAdded != null) {if (!addToGlobalQueue(notAdded)) {throw RejectedExecutionException("$schedulerName was terminated")}}val skipUnpark = tailDispatch && currentWorker != nullif (task.mode == TASK_NON_BLOCKING) {if (skipUnpark) returnsignalCpuWork()} else {signalBlockingWork(skipUnpark = skipUnpark)}}private fun currentWorker(): Worker? = (Thread.currentThread() as? Worker)?.takeIf { it.scheduler == this }// 内部类 Workerinternal inner class Worker private constructor() : Thread() {}}
你发现了吗?CoroutineScheduler其实是Java并发包下的Executor的子类,它的execute()方法也被转发到了dispatch()。
上面的代码里也有三个注释,我们分别来看看:
那么接下来,我们就来分析下Worker是如何执行Task的。
internal inner class Worker private constructor() : Thread() {override fun run() = runWorker()@JvmFieldvar mayHaveLocalTasks = falseprivate fun runWorker() {var rescanned = falsewhile (!isTerminated && state != WorkerState.TERMINATED) {// 1val task = findTask(mayHaveLocalTasks)if (task != null) {rescanned = falseminDelayUntilStealableTaskNs = 0L// 2executeTask(task)continue} else {mayHaveLocalTasks = false}if (minDelayUntilStealableTaskNs != 0L) {if (!rescanned) {rescanned = true} else {rescanned = falsetryReleaseCpu(WorkerState.PARKING)interrupted()LockSupport.parkNanos(minDelayUntilStealableTaskNs)minDelayUntilStealableTaskNs = 0L}continue}tryPark()}tryReleaseCpu(WorkerState.TERMINATED)}}
实际上,Worker会重写Thread的run()方法,然后把执行流程交给runWorker(),以上代码里有两个关键的地方,我也用注释标记了。
而接下来的逻辑,就是最关键的部分了:
internal inner class Worker private constructor() : Thread() {private fun executeTask(task: Task) {val taskMode = task.modeidleReset(taskMode)beforeTask(taskMode)// 1runSafely(task)afterTask(taskMode)}}fun runSafely(task: Task) {try {// 2task.run()} catch (e: Throwable) {val thread = Thread.currentThread()thread.uncaughtExceptionHandler.uncaughtException(thread, e)} finally {unTrackTask()}}internal abstract class Task(@JvmField var submissionTime: Long,@JvmField var taskContext: TaskContext) : Runnable {constructor() : this(0, NonBlockingContext)inline val mode: Int get() = taskContext.taskMode // TASK_XXX}
在Worker的executeTask()方法当中,会调用runSafely()方法,而在这个方法当中,最终会调用task.run()。前面我们就提到过 Task本质上就是Runnable,而Runnable.run()其实就代表了我们的协程任务真正执行了!
那么,task.run()具体执行的代码是什么呢?其实它是执行的 DispatchedTask.run()。这里的DispatchedTask实际上是DispatchedContinuation的父类。
internal class DispatchedContinuation<in T>(@JvmField val dispatcher: CoroutineDispatcher,@JvmField val continuation: Continuation<T>) : DispatchedTask<T>(MODE_UNINITIALIZED), CoroutineStackFrame, Continuation<T> by continuation {public final override fun run() {val taskContext = this.taskContextvar fatalException: Throwable? = nulltry {val delegate = delegate as DispatchedContinuation<T>val continuation = delegate.continuationwithContinuationContext(continuation, delegate.countOrElement) {val context = continuation.contextval state = takeState()val exception = getExceptionalResult(state)val job = if (exception == null && resumeMode.isCancellableMode) context[Job] else nullif (job != null && !job.isActive) {// 1val cause = job.getCancellationException()cancelCompletedResult(state, cause)continuation.resumeWithStackTrace(cause)} else {if (exception != null) {// 2continuation.resumeWithException(exception)} else {// 3continuation.resume(getSuccessfulResult(state))}}}} catch (e: Throwable) {fatalException = e} finally {val result = runCatching { taskContext.afterTask() }handleFatalException(fatalException, result.exceptionOrNull())}}}
上面的代码有三个关键的注释,我们一起来分析:
最后,按照惯例,我还是制作了一个视频,来向你展示整个Dispather的代码执行流程。
这节课,我们围绕着launch,着重分析了它的Dispatchers执行流程。Dispatchers是协程框架中与线程交互的关键,这里面主要涉及以下几个步骤:
经过这节课的学习以后,请问你是否对协程的本质有了更深入的认识?请讲讲你的心得体会吧!