Skip to content

Commit e06b394

Browse files
Introduce WorkStealingDispatcher.
1 parent 72b0e22 commit e06b394

File tree

9 files changed

+1081
-0
lines changed

9 files changed

+1081
-0
lines changed

build.gradle.kts

Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,5 @@
11
import com.squareup.workflow1.buildsrc.shardConnectedCheckTasks
2+
import org.gradle.api.tasks.testing.logging.TestExceptionFormat.FULL
23
import org.jetbrains.dokka.gradle.AbstractDokkaLeafTask
34
import java.net.URL
45

@@ -101,6 +102,15 @@ subprojects {
101102
.configureEach { mustRunAfter(tasks.matching { it is Sign }) }
102103
}
103104

105+
// Report the full exception (message, cause chain and stack trace) for failing tests in every
// subproject.
subprojects {
  tasks.withType<Test>().configureEach {
    testLogging.exceptionFormat = FULL
  }
}
113+
104114
// This task is invoked by the documentation site generator script in the main workflow project (not
105115
// in this repo), which also expects the generated files to be in a specific location. Both the task
106116
// name and destination directory are defined in this script:

workflow-runtime/build.gradle.kts

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -17,6 +17,8 @@ kotlin {
1717
if (targets == "kmp" || targets == "js") {
1818
js(IR) { browser() }
1919
}
20+
21+
compilerOptions.freeCompilerArgs.add("-Xexpect-actual-classes")
2022
}
2123

2224
dependencies {
Lines changed: 14 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,14 @@
1+
package com.squareup.workflow1.internal
2+
3+
import platform.Foundation.NSLock
4+
5+
/**
 * Actual `Lock` for targets that expose `platform.Foundation`: aliased directly to [NSLock], so
 * no wrapper object is allocated.
 */
internal actual typealias Lock = NSLock
6+
7+
/**
 * Acquires this lock, runs [block], and returns its result. The lock is always released before
 * this function exits, including when [block] throws or performs a non-local return.
 */
internal actual inline fun <R> Lock.withLock(block: () -> R): R {
  lock()
  return try {
    block()
  } finally {
    unlock()
  }
}
Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,5 @@
1+
package com.squareup.workflow1.internal
2+
3+
/**
 * Platform-provided lock with a no-argument constructor. Each target supplies the actual type;
 * callers should only use it through [withLock].
 */
internal expect class Lock()
4+
5+
/**
 * Runs [block] while holding this lock and returns whatever [block] returns.
 *
 * NOTE(review): the NSLock-backed actual releases the lock in a `finally` block; other platform
 * actuals are presumably equivalent — confirm against their sources.
 */
internal expect inline fun <R> Lock.withLock(block: () -> R): R
Lines changed: 315 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,315 @@
1+
package com.squareup.workflow1.internal
2+
3+
import com.squareup.workflow1.internal.WorkStealingDispatcher.Companion.wrapDispatcherFrom
4+
import kotlinx.coroutines.CoroutineDispatcher
5+
import kotlinx.coroutines.Dispatchers
6+
import kotlinx.coroutines.ExperimentalCoroutinesApi
7+
import kotlinx.coroutines.Runnable
8+
import kotlin.concurrent.Volatile
9+
import kotlin.coroutines.Continuation
10+
import kotlin.coroutines.ContinuationInterceptor
11+
import kotlin.coroutines.CoroutineContext
12+
import kotlin.coroutines.resume
13+
14+
/** Growable list of pending [Runnable] tasks; the storage type used for the dispatcher's queue. */
private typealias RunQueue = ArrayList<Runnable>
15+
16+
/**
17+
* A [CoroutineDispatcher] that delegates to another dispatcher but allows stealing any work
18+
* scheduled on this dispatcher and performing it synchronously by calling [advanceUntilIdle].
19+
*
20+
* The easiest way to create one is by calling [wrapDispatcherFrom].
21+
*
22+
* E.g.
23+
* ```
24+
* val dispatcher = WorkStealingDispatcher.wrapDispatcherFrom(scope.coroutineContext)
25+
* scope.launch(dispatcher) {
26+
* while (true) {
27+
* lots()
28+
* of()
29+
* suspending()
30+
* calls()
31+
* }
32+
* }
33+
* …
34+
* dispatcher.advanceUntilIdle()
35+
* ```
36+
*
37+
* @param delegateInterceptor The [CoroutineDispatcher] or other [ContinuationInterceptor] to
38+
* delegate scheduling behavior to. This can either be a confined or unconfined dispatcher, and its
39+
* behavior will be preserved transparently.
40+
*/
41+
internal class WorkStealingDispatcher(
  private val delegateInterceptor: ContinuationInterceptor
) : CoroutineDispatcher() {

  companion object {
    /**
     * The initial storage capacity for the task queue. We use a small queue capacity since in most
     * cases the queue should be processed very soon after enqueuing.
     */
    private const val INITIAL_QUEUE_CAPACITY = 3

    /**
     * Returns a [WorkStealingDispatcher] that delegates to the [ContinuationInterceptor]
     * (typically a [CoroutineDispatcher]) found in [context]. If the context does not specify
     * one, [Dispatchers.Default] is used.
     */
    fun wrapDispatcherFrom(context: CoroutineContext): WorkStealingDispatcher {
      // If there's no dispatcher in the context then the coroutines runtime will fall back to
      // Dispatchers.Default anyway.
      val baseDispatcher = context[ContinuationInterceptor] ?: Dispatchers.Default
      return WorkStealingDispatcher(delegateInterceptor = baseDispatcher)
    }
  }

  /** Used to synchronize access to the mutable properties of this class. */
  private val lock = Lock()

  // region Access to these properties must always be synchronized with lock.
  /**
   * The queue of unconsumed work items. When there is no contention on the dispatcher, only one
   * queue will ever be allocated. Only when [dispatch] is called while the queue is being
   * processed (either by [advanceUntilIdle] or a [DispatchContinuation]) will a new queue be
   * allocated, but when the processing is done the old one will be placed back here to be re-used.
   */
  @Volatile
  private var queue: RunQueue? = null

  /**
   * True while some party has taken responsibility for draining [queue]: either a
   * [DispatchContinuation] that has been handed to the delegate, or an in-progress
   * [advanceUntilIdle] call that claimed the flag. While true, [dispatch] only enqueues.
   */
  @Volatile
  private var dispatchScheduled = false

  /**
   * Cached [DispatchContinuation] used to delegate to the [delegateInterceptor]'s dispatching
   * behavior from [dispatch]. This is initialized on the first call to [dispatch] that needs to
   * schedule a drain, and then never changed.
   */
  @Volatile
  private var dispatchContinuation: DispatchContinuation? = null
  // endregion

  /**
   * Always returns true since we always need to track what work is waiting so we can advance it.
   */
  override fun isDispatchNeeded(context: CoroutineContext): Boolean = true

  /**
   * Enqueues [block] and, if nobody is currently responsible for draining the queue, schedules a
   * drain on the delegate.
   */
  override fun dispatch(
    context: CoroutineContext,
    block: Runnable
  ) {
    var continuation: DispatchContinuation? = null
    lock.withLock {
      val queue = this.queue ?: RunQueue(INITIAL_QUEUE_CAPACITY).also { this.queue = it }
      queue += block

      // If no dispatch is currently scheduled, then flag that we're handling it, and schedule one
      // outside the critical section.
      if (!dispatchScheduled) {
        dispatchScheduled = true
        continuation = dispatchContinuation ?: DispatchContinuation()
          .also { dispatchContinuation = it }
      }
    }

    // Trampoline the dispatch outside the critical section to avoid deadlocks.
    // This will either synchronously run block or dispatch it, depending on what resuming a
    // continuation on the delegate dispatcher would do.
    continuation?.resumeOnDelegateDispatcher()
  }

  /**
   * Throws [UnsupportedOperationException]. We can't allow the default implementation to run in
   * case the delegate dispatcher would throw.
   */
  @ExperimentalCoroutinesApi
  override fun limitedParallelism(parallelism: Int): CoroutineDispatcher {
    // We could probably support this by forwarding the call to the delegate then wrapping that
    // dispatcher with a WSD that advances when this one does, but we don't need this for our use
    // cases and getting all the behavior correct might be hard, so don't bother for now.
    throw UnsupportedOperationException(
      "limitedParallelism is not supported for WorkStealingDispatcher"
    )
  }

  /**
   * "Steals" work that was scheduled on this dispatcher but hasn't had a chance to run yet and
   * runs it, until there is no work left to do. If the work schedules more work, that will also be
   * run before the method returns.
   *
   * Some care must be taken with this method: if it is called from a coroutine that is dispatched
   * by this dispatcher, then it may run inside the stack frame of another call and in that case
   * the inner call may think the dispatcher is idle when the outer call still has more tasks to
   * run.
   *
   * It is technically safe to call from multiple threads, even in parallel, although the behavior
   * is undefined. E.g. One thread might return from this method before the other has finished
   * running all tasks.
   */
  // If we need a strong guarantee for reentrant calls we could use a ThreadLocal so the inner call
  // could steal work from the outer one.
  // If we need a strong guarantee for calling from multiple threads we could just run this method
  // with a separate lock so all threads would just wait on the first one to finish running, but
  // that could deadlock if any of the dispatched coroutines call this method reentrantly.
  fun advanceUntilIdle() {
    // True only if *this call* flipped dispatchScheduled from false to true.
    var claimedDispatchFlag = false
    advanceUntilIdle(
      onStartLocked = {
        // If no dispatch was scheduled, then set the flag so that any new dispatch calls that
        // happen while we're draining the queue won't schedule one unnecessarily since we'll just
        // handle them.
        // Note that we could "cancel" the dispatch if one is already scheduled, since we're
        // stealing all its work, but we can't cancel that task so it will just have to noop.
        if (!dispatchScheduled) {
          dispatchScheduled = true
          claimedDispatchFlag = true
        }
      },
      onFinishedLocked = {
        // Only clear the flag if we were the one who set it. If a dispatch was already scheduled
        // when we started, the flag belongs to that DispatchContinuation: it may have run and
        // cleared the flag while we were draining, and writing the stale "true" back would leave
        // the flag set with nothing scheduled, so later dispatch calls would never be scheduled.
        if (claimedDispatchFlag) dispatchScheduled = false
      }
    )
  }

  /**
   * Executes queued work items until there are none left, then returns.
   *
   * If a work item throws, the exception propagates out of this method: the remaining items of
   * the batch being drained are not run and [onFinishedLocked] is not invoked.
   *
   * @param onStartLocked Called while [lock] is held exactly 1 time before any tasks are executed.
   * @param onFinishedLocked Called while [lock] is held exactly 1 time after all tasks are
   * finished executing.
   */
  private inline fun advanceUntilIdle(
    onStartLocked: () -> Unit = {},
    onFinishedLocked: () -> Unit
  ) {
    var queueToDrain: RunQueue? = null
    do {
      lock.withLock {
        // Will only be null on first loop, since if it's null after this critical section we'll
        // exit the loop.
        if (queueToDrain == null) {
          onStartLocked()
        }

        // We're about to overwrite queueToDrain, so put the old one back so future calls to
        // dispatch might not need to allocate a new queue.
        queueToDrain = consumeQueueLocked(queueToRecycle = queueToDrain).also {
          if (it == null) {
            onFinishedLocked()
          }
        }
      }

      // Drain the queue outside the critical section to ensure we don't deadlock if any of the
      // runnables try to dispatch.
      queueToDrain?.runAll()
    } while (queueToDrain != null)
  }

  /**
   * If there are work items queued up, returns the queue, otherwise returns null. MUST ONLY BE
   * CALLED while [lock] is held.
   *
   * If [queueToRecycle] is non-null then we try to place it back in the [queue] property for the
   * next call to [dispatch] (after clearing it) so it won't have to allocate a new one. After this
   * method returns [queueToRecycle] is unsafe to use for the calling code since it might be
   * modified by another thread.
   *
   * @throws IllegalArgumentException if [queueToRecycle] is the same instance as the current
   * [queue].
   */
  private fun consumeQueueLocked(queueToRecycle: RunQueue?): RunQueue? {
    require(queueToRecycle == null || queueToRecycle !== queue) {
      "Cannot recycle queue with itself"
    }

    // Note: clear() iterates through the list to null everything out, so avoid calling it unless
    // necessary.
    val queue = this.queue
    return when {
      queue == null -> {
        // The next dispatch would allocate a new queue, so recycle one if possible.
        this.queue = queueToRecycle?.apply { clear() }
        null
      }

      queue.isEmpty() -> {
        // There's nothing to process in an empty queue, so don't return it at all. Since the next
        // dispatch call already has a queue to use, just let the recycled one be GC'd and don't
        // bother clearing it.
        null
      }

      else -> {
        // There are queued tasks, so return the current queue and replace it with the recycled
        // one.
        queue.also {
          this.queue = queueToRecycle?.apply { clear() }
        }
      }
    }
  }

  /** Runs every task in this queue, in insertion order, on the calling thread. */
  private fun RunQueue.runAll() {
    forEach {
      it.run()
    }
  }

  /**
   * A reusable continuation that is used to access the coroutine runtime's resumption behavior for
   * both confined and unconfined dispatchers. See [resumeOnDelegateDispatcher] for more
   * information on how this works.
   *
   * [WorkStealingDispatcher] guarantees that only one instance of this class will be created per
   * dispatcher, and that it will never be resumed more than once concurrently, so it's safe to
   * reuse.
   */
  private inner class DispatchContinuation : Continuation<Unit> {
    override val context: CoroutineContext get() = delegateInterceptor

    /**
     * Cache for intercepted coroutine so we can release it from [resumeWith].
     * [WorkStealingDispatcher] guarantees only one resume call will happen until the continuation
     * is done, so we don't need to guard this property with a lock.
     */
    private var intercepted: Continuation<Unit>? = null

    /**
     * Resumes this continuation on [delegateInterceptor] by intercepting it and resuming the
     * intercepted continuation. When a dispatcher returns false from [isDispatchNeeded], then
     * when continuations intercepted by it are resumed, they may either be run in place or
     * scheduled to a special thread-local queue. The only way to access this queue is to have the
     * dispatcher intercept a continuation and resume the intercepted continuation.
     */
    fun resumeOnDelegateDispatcher() {
      val intercepted = delegateInterceptor.interceptContinuation(this).also {
        this.intercepted = it
      }

      // If delegate is a CoroutineDispatcher, intercepted will be a special Continuation that will
      // check the delegate's isDispatchNeeded to decide whether to call dispatch() or to enqueue
      // it to the thread-local unconfined queue.
      intercepted.resume(Unit)
    }

    /**
     * DO NOT CALL DIRECTLY! Call [resumeOnDelegateDispatcher] instead.
     *
     * Releases the intercepted continuation cached by [resumeOnDelegateDispatcher], then drains
     * the dispatcher's queue. [result] is ignored.
     */
    override fun resumeWith(result: Result<Unit>) {
      intercepted?.let {
        // An interceptor may return the original continuation; only release a real wrapper.
        if (it !== this) {
          delegateInterceptor.releaseInterceptedContinuation(it)
        }
        intercepted = null
      }

      advanceUntilIdle(onFinishedLocked = {
        // Set this in the lock when we're about to return so that any dispatch calls waiting
        // on the lock will know to schedule a fresh dispatch.
        dispatchScheduled = false
      })
    }
  }
}

0 commit comments

Comments
 (0)