Skip to content

Commit 49ed4f4

Browse files
WIP rewrite to support parallel delegates, Delay
1 parent e56709d commit 49ed4f4

File tree

2 files changed

+360
-160
lines changed

2 files changed

+360
-160
lines changed

workflow-runtime/src/commonMain/kotlin/com/squareup/workflow1/internal/WorkStealingDispatcher.kt

Lines changed: 54 additions & 154 deletions
Original file line numberDiff line numberDiff line change
@@ -2,17 +2,16 @@ package com.squareup.workflow1.internal
22

33
import com.squareup.workflow1.internal.WorkStealingDispatcher.Companion.wrapDispatcherFrom
44
import kotlinx.coroutines.CoroutineDispatcher
5+
import kotlinx.coroutines.Delay
56
import kotlinx.coroutines.Dispatchers
67
import kotlinx.coroutines.ExperimentalCoroutinesApi
8+
import kotlinx.coroutines.InternalCoroutinesApi
79
import kotlinx.coroutines.Runnable
8-
import kotlin.concurrent.Volatile
910
import kotlin.coroutines.Continuation
1011
import kotlin.coroutines.ContinuationInterceptor
1112
import kotlin.coroutines.CoroutineContext
1213
import kotlin.coroutines.resume
1314

14-
private typealias RunQueue = ArrayList<Runnable>
15-
1615
/**
1716
* A [CoroutineDispatcher] that delegates to another dispatcher but allows stealing any work
1817
* scheduled on this dispatcher and performing it synchronously by calling [advanceUntilIdle].
@@ -21,7 +20,7 @@ private typealias RunQueue = ArrayList<Runnable>
2120
*
2221
* E.g.
2322
* ```
24-
* val dispatcher = WorkStealingDispatcher.wrappingDispatcherFrom(scope.coroutineContext)
23+
* val dispatcher = WorkStealingDispatcher.wrapDispatcherFrom(scope.coroutineContext)
2524
* scope.launch(dispatcher) {
2625
* while (true) {
2726
* lots()
@@ -38,16 +37,22 @@ private typealias RunQueue = ArrayList<Runnable>
3837
* delegate scheduling behavior to. This can either be a confined or unconfined dispatcher, and its
3938
* behavior will be preserved transparently.
4039
*/
41-
internal class WorkStealingDispatcher(
40+
internal open class WorkStealingDispatcher protected constructor(
4241
private val delegateInterceptor: ContinuationInterceptor
4342
) : CoroutineDispatcher() {
4443

4544
companion object {
4645
/**
47-
* The initial storage capacity for the task queue. We use a small queue capacity since in most
48-
* cases the queue should be processed very soon after enqueuing.
46+
* Creates a [WorkStealingDispatcher] that supports [Delay] if [delegateInterceptor] does.
4947
*/
50-
private const val INITIAL_QUEUE_CAPACITY = 3
48+
@OptIn(InternalCoroutinesApi::class)
49+
operator fun invoke(delegateInterceptor: ContinuationInterceptor): WorkStealingDispatcher {
50+
return if (delegateInterceptor is Delay) {
51+
DelayableWorkStealingDispatcher(delegateInterceptor, delegateInterceptor)
52+
} else {
53+
WorkStealingDispatcher(delegateInterceptor)
54+
}
55+
}
5156

5257
/**
5358
* Returns a [WorkStealingDispatcher] that delegates to the [CoroutineDispatcher] from
@@ -57,70 +62,43 @@ internal class WorkStealingDispatcher(
5762
// If there's no dispatcher in the context then the coroutines runtime will fall back to
5863
// Dispatchers.Default anyway.
5964
val baseDispatcher = context[ContinuationInterceptor] ?: Dispatchers.Default
60-
return WorkStealingDispatcher(delegateInterceptor = baseDispatcher)
65+
return invoke(delegateInterceptor = baseDispatcher)
6166
}
6267
}
6368

6469
/** Used to synchronize access to the mutable properties of this class. */
6570
private val lock = Lock()
6671

6772
// region Access to these properties must always be synchronized with lock.
68-
/**
69-
* The queue of unconsumed work items. When there is no contention on the dispatcher, only one
70-
* queue will ever be allocated. Only when [dispatch] is called while the queue is being processed
71-
* (either by [advanceUntilIdle] or a [DispatchContinuation]) then a new queue will be allocated,
72-
* but when the processing is done the old one will be placed back here to be re-used.
73-
*/
74-
@Volatile
75-
private var queue: RunQueue? = null
76-
77-
@Volatile
78-
private var dispatchScheduled = false
79-
80-
/**
81-
* Cached [DispatchContinuation] used to delegate to the [delegateInterceptor]'s dispatching
82-
* behavior from [dispatch]. This is initialized the first call to [dispatch] that needs dispatch,
83-
* and then never changed.
84-
*/
85-
@Volatile
86-
private var dispatchContinuation: DispatchContinuation? = null
73+
private val queue = LinkedHashSet<DelegateDispatchedContinuation>()
8774
// endregion
8875

8976
/**
9077
* Always returns true since we always need to track what work is waiting so we can advance it.
9178
*/
92-
override fun isDispatchNeeded(context: CoroutineContext): Boolean = true
79+
final override fun isDispatchNeeded(context: CoroutineContext): Boolean = true
9380

94-
override fun dispatch(
81+
final override fun dispatch(
9582
context: CoroutineContext,
9683
block: Runnable
9784
) {
98-
var continuation: DispatchContinuation? = null
85+
val continuation = DelegateDispatchedContinuation(context, block)
9986
lock.withLock {
100-
val queue = this.queue ?: RunQueue(INITIAL_QUEUE_CAPACITY).also { this.queue = it }
101-
queue += block
102-
103-
// If no dispatch is currently scheduled, then flag that we're handling it, and schedule one
104-
// outside the critical section.
105-
if (!dispatchScheduled) {
106-
dispatchScheduled = true
107-
continuation = dispatchContinuation ?: DispatchContinuation()
108-
.also { dispatchContinuation = it }
109-
}
87+
queue += continuation
11088
}
11189

11290
// Trampoline the dispatch outside the critical section to avoid deadlocks.
11391
// This will either synchronously run block or dispatch it, depending on what resuming a
11492
// continuation on the delegate dispatcher would do.
115-
continuation?.resumeOnDelegateDispatcher()
93+
continuation.resumeOnDelegateDispatcher()
11694
}
11795

11896
/**
11997
* Throws [UnsupportedOperationException]. We can't allow the default implementation to run in
12098
* case the delegate dispatcher would throw.
12199
*/
122100
@ExperimentalCoroutinesApi
123-
override fun limitedParallelism(parallelism: Int): CoroutineDispatcher {
101+
final override fun limitedParallelism(parallelism: Int): CoroutineDispatcher {
124102
// We could probably support this by forwarding the call to the delegate then wrapping that
125103
// dispatcher with a WSD that advances when this one does, but we don't need this for our use
126104
// cases and getting all the behavior correct might be hard, so don't bother for now.
@@ -148,118 +126,29 @@ internal class WorkStealingDispatcher(
148126
// with a separate lock so all threads would just wait on the first one to finish running, but
149127
// that could deadlock if any of the dispatched coroutines call this method reentrantly.
150128
fun advanceUntilIdle() {
151-
var wasDispatchScheduled = false
152-
advanceUntilIdle(
153-
onStartLocked = {
154-
// If no dispatch was scheduled, then set the flag so that any new dispatch calls that
155-
// happen while we're draining the queue won't schedule one unnecessarily since we'll just
156-
// handle them.
157-
// Note that we could "cancel" the dispatch if this is true here, since we're stealing all
158-
// its work, but we can't cancel that task so it will just have to noop.
159-
wasDispatchScheduled = dispatchScheduled.also {
160-
if (!it) dispatchScheduled = true
161-
}
162-
},
163-
onFinishedLocked = {
164-
// If we set this flag above, then clear it now so future dispatch calls schedule normally.
165-
dispatchScheduled = wasDispatchScheduled
166-
}
167-
)
168-
}
169-
170-
/**
171-
* Executes queued work items until there are none left, then returns.
172-
*
173-
* @param onStartLocked Called while [lock] is held exactly 1 time before any tasks are executed.
174-
* @param onFinishedLocked Called while [lock] is held exactly 1 time after all tasks are finished
175-
* executing.
176-
*/
177-
private inline fun advanceUntilIdle(
178-
onStartLocked: () -> Unit = {},
179-
onFinishedLocked: () -> Unit
180-
) {
181-
var queueToDrain: RunQueue? = null
182129
do {
183-
lock.withLock {
184-
// Will only be null on first loop, since if it's null after this critical section we'll
185-
// exit the loop.
186-
if (queueToDrain == null) {
187-
onStartLocked()
188-
}
189-
190-
// We're about to overwrite queueToDrain, so put the old one back so future calls to
191-
// dispatch might not need to allocate a new queue.
192-
queueToDrain = consumeQueueLocked(queueToRecycle = queueToDrain).also {
193-
if (it == null) {
194-
onFinishedLocked()
195-
}
196-
}
197-
}
198-
199-
// Drain the queue outside the critical section to ensure we don't deadlock if any of the
200-
// runnables try to dispatch.
201-
queueToDrain?.runAll()
202-
} while (queueToDrain != null)
130+
val task = nextTask()
131+
task?.releaseAndRun()
132+
} while (task != null)
203133
}
204134

205-
/**
206-
* If there are work items queued up, returns the queue, otherwise returns null. MUST ONLY BE
207-
* CALLED while [lock] is held.
208-
*
209-
* If [queueToRecycle] is non-null then we try to place it back in the [queue] property for the
210-
* next call to [dispatch] (after clearing it) so it won't have to allocate a new one. After this
211-
* method returns [queueToRecycle] is unsafe to use for the calling code since it might be
212-
* modified by another thread.
213-
*/
214-
private fun consumeQueueLocked(queueToRecycle: RunQueue?): RunQueue? {
215-
if (queueToRecycle != null && queueToRecycle === queue) {
216-
throw IllegalArgumentException("Cannot recycle queue with itself")
217-
}
218-
219-
// Note: clear() iterates through the list to null everything out, so avoid calling it unless
220-
// necessary.
221-
val queue = this.queue
222-
return when {
223-
queue == null -> {
224-
// The next dispatch would allocate a new queue, so recycle one if possible.
225-
this.queue = queueToRecycle?.apply { clear() }
226-
null
227-
}
228-
229-
queue.isEmpty() -> {
230-
// There's nothing to process in an empty queue, so don't return it at all. And since the
231-
// next dispatch call already has a queue to use, so just let the recycled one be GC'd and
232-
// don't bother clearing it.
233-
null
234-
}
235-
236-
else -> {
237-
// There are queued tasks, so return the current queue and replace it with the recycled one.
238-
queue.also {
239-
this.queue = queueToRecycle?.apply { clear() }
240-
}
135+
private fun nextTask(): DelegateDispatchedContinuation? {
136+
lock.withLock {
137+
val iterator = queue.iterator()
138+
if (iterator.hasNext()) {
139+
val task = iterator.next()
140+
iterator.remove()
141+
return task
142+
} else {
143+
return null
241144
}
242145
}
243146
}
244147

245-
private fun RunQueue.runAll() {
246-
forEach {
247-
it.run()
248-
}
249-
}
250-
251-
/**
252-
* A reusable continuation that is used to access the coroutine runtime's resumption behavior for
253-
* both confined and unconfined dispatchers. See [resumeOnDelegateDispatcher] for more information
254-
* on how this works.
255-
*
256-
* [WorkStealingDispatcher] guarantees that only one instance of this class will be created per
257-
* dispatcher, and that it will never be resumed more than once concurrently, so it's safe to
258-
* reuse.
259-
*/
260-
private inner class DispatchContinuation : Continuation<Unit> {
261-
override val context: CoroutineContext get() = delegateInterceptor
262-
148+
private inner class DelegateDispatchedContinuation(
149+
override val context: CoroutineContext,
150+
private val runnable: Runnable
151+
) : Continuation<Unit> {
263152
/**
264153
* Cache for intercepted coroutine so we can release it from [resumeWith].
265154
* [WorkStealingDispatcher] guarantees only one resume call will happen until the continuation
@@ -289,18 +178,29 @@ internal class WorkStealingDispatcher(
289178
* DO NOT CALL DIRECTLY! Call [resumeOnDelegateDispatcher] instead.
290179
*/
291180
override fun resumeWith(result: Result<Unit>) {
181+
val shouldRun = lock.withLock {
182+
queue.remove(this)
183+
}
184+
185+
if (shouldRun) {
186+
releaseAndRun()
187+
}
188+
}
189+
190+
fun releaseAndRun() {
292191
intercepted?.let {
293192
if (it !== this) {
294193
delegateInterceptor.releaseInterceptedContinuation(it)
295194
}
296195
intercepted = null
297196
}
298-
299-
advanceUntilIdle(onFinishedLocked = {
300-
// Set this in the lock when we're about to return so that any dispatch calls waiting
301-
// on the lock will know to schedule a fresh dispatch.
302-
dispatchScheduled = false
303-
})
197+
runnable.run()
304198
}
305199
}
306200
}
201+
202+
@OptIn(InternalCoroutinesApi::class)
203+
private class DelayableWorkStealingDispatcher(
204+
delegate: ContinuationInterceptor,
205+
delay: Delay
206+
) : WorkStealingDispatcher(delegate), Delay by delay

0 commit comments

Comments
 (0)