Skip to content

Commit fd02fec

Browse files
1093: Launch Side Effects atomically so they are always started
Add headlessIntegrationTest to renderWorkflowIn with a nice Turbine attached. Check that we do not use the Unconfined dispatcher within the Workflow runtime as it does not provide ordering guarantees. Update documentation and tests to note the need to use a Dispatcher other than Dispatchers.Unconfined.
1 parent b130a43 commit fd02fec

File tree

19 files changed

+638
-286
lines changed

19 files changed

+638
-286
lines changed

benchmarks/performance-poetry/complex-poetry/src/main/java/com/squareup/benchmarks/performance/complex/poetry/PerformancePoetryActivity.kt

Lines changed: 4 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -22,7 +22,6 @@ import com.squareup.sample.container.SampleContainers
2222
import com.squareup.sample.poetry.model.Poem
2323
import com.squareup.workflow1.RuntimeConfig
2424
import com.squareup.workflow1.RuntimeConfigOptions.Companion.RENDER_PER_ACTION
25-
import com.squareup.workflow1.WorkflowExperimentalRuntime
2625
import com.squareup.workflow1.WorkflowInterceptor
2726
import com.squareup.workflow1.ui.Screen
2827
import com.squareup.workflow1.ui.ViewEnvironment.Companion.EMPTY
@@ -31,9 +30,11 @@ import com.squareup.workflow1.ui.WorkflowLayout
3130
import com.squareup.workflow1.ui.WorkflowUiExperimentalApi
3231
import com.squareup.workflow1.ui.container.withEnvironment
3332
import com.squareup.workflow1.ui.renderWorkflowIn
33+
import kotlinx.coroutines.Dispatchers
3434
import kotlinx.coroutines.flow.StateFlow
3535
import kotlinx.coroutines.flow.map
3636
import kotlinx.coroutines.flow.onEach
37+
import kotlinx.coroutines.plus
3738
import timber.log.Timber
3839

3940
@OptIn(WorkflowUiExperimentalApi::class)
@@ -53,7 +54,7 @@ class PerformancePoetryActivity : AppCompatActivity() {
5354
private var selectTimeoutCount = 0
5455
private var selectTimeoutMainThreadMessageLatch = 0
5556

56-
@OptIn(WorkflowUiExperimentalApi::class, WorkflowExperimentalRuntime::class)
57+
@OptIn(WorkflowUiExperimentalApi::class)
5758
override fun onCreate(savedInstanceState: Bundle?) {
5859
super.onCreate(savedInstanceState)
5960

@@ -273,7 +274,7 @@ class PoetryModel(
273274
val renderings: StateFlow<Screen> by lazy {
274275
renderWorkflowIn(
275276
workflow = workflow,
276-
scope = viewModelScope,
277+
scope = viewModelScope + Dispatchers.Main.immediate,
277278
savedStateHandle = savedState,
278279
interceptors = interceptor?.let { listOf(it) } ?: emptyList(),
279280
runtimeConfig = runtimeConfig

gradle.properties

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -8,7 +8,7 @@ android.useAndroidX=true
88
systemProp.org.gradle.internal.publish.checksums.insecure=true
99

1010
GROUP=com.squareup.workflow1
11-
VERSION_NAME=1.11.0-beta04-SNAPSHOT
11+
VERSION_NAME=1.11.0-beta04-atomic-w-SNAPSHOT
1212

1313
POM_DESCRIPTION=Square Workflow
1414

workflow-core/src/commonMain/kotlin/com/squareup/workflow1/BaseRenderContext.kt

Lines changed: 11 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -30,14 +30,14 @@ import kotlin.reflect.typeOf
3030
* )
3131
* ```
3232
*
33-
* To create populate such functions from your `render` method, you first need to define a
33+
* To create such functions from your `render` method, you first need to define a
3434
* [WorkflowAction] to handle the event by changing state, emitting an output, or both. Then, just
3535
* pass a lambda to your rendering that instantiates the action and passes it to
3636
* [actionSink.send][Sink.send].
3737
*
3838
* ## Performing asynchronous work
3939
*
40-
* See [runningWorker].
40+
* See [runningSideEffect] and [runningWorker].
4141
*
4242
* ## Composing children
4343
*
@@ -92,8 +92,15 @@ public interface BaseRenderContext<out PropsT, StateT, in OutputT> {
9292
* [cancelled](https://kotlinlang.org/docs/reference/coroutines/cancellation-and-timeouts.html).
9393
*
9494
* The coroutine will run with the same [CoroutineContext][kotlin.coroutines.CoroutineContext]
95-
* that the workflow runtime is running in. The side effect coroutine will not be started until
96-
* _after_ the first render call than runs it returns.
95+
* that the workflow runtime is running in.
96+
* The coroutine is launched with [CoroutineStart.ATOMIC][kotlinx.coroutines.CoroutineStart.ATOMIC]
97+
* start mode, which means that it will _start_ even if the scope is cancelled before it has a
98+
* chance to dispatch. This is to guarantee that any time a [sideEffect] is declared running
99+
* in any render pass, it will at least be started. If the backing scope is cancelled - it is no
100+
* longer declared as running in a consecutive render pass, or the rendering [Workflow] is no
101+
* longer rendered - then it will be cancelled at the first suspension point within [sideEffect].
102+
*
103+
*
97104
*
98105
* @param key The string key that is used to distinguish between side effects.
99106
* @param sideEffect The suspend function that will be launched in a coroutine to perform the

workflow-core/src/commonMain/kotlin/com/squareup/workflow1/Worker.kt

Lines changed: 0 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -111,11 +111,6 @@ public interface Worker<out OutputT> {
111111
* When the worker is torn down, the coroutine is cancelled.
112112
* This coroutine is launched in the same scope as the workflow runtime, with a few changes:
113113
*
114-
* - The dispatcher is always set to [Unconfined][kotlinx.coroutines.Dispatchers.Unconfined] to
115-
* minimize overhead for workers that don't care which thread they're executed on (e.g. logging
116-
* side effects, workers that wrap third-party reactive libraries, etc.). If your work cares
117-
* which thread it runs on, use [withContext][kotlinx.coroutines.withContext] or
118-
* [flowOn][kotlinx.coroutines.flow.flowOn] to specify a dispatcher.
119114
* - A [CoroutineName][kotlinx.coroutines.CoroutineName] that describes the `Worker` instance
120115
* (via `toString`) and the key specified by the workflow running the worker.
121116
*

workflow-runtime/src/commonMain/kotlin/com/squareup/workflow1/RenderWorkflow.kt

Lines changed: 11 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -6,12 +6,14 @@ import com.squareup.workflow1.internal.WorkflowRunner
66
import com.squareup.workflow1.internal.chained
77
import kotlinx.coroutines.CancellationException
88
import kotlinx.coroutines.CoroutineScope
9+
import kotlinx.coroutines.Dispatchers
910
import kotlinx.coroutines.Job
1011
import kotlinx.coroutines.flow.Flow
1112
import kotlinx.coroutines.flow.MutableStateFlow
1213
import kotlinx.coroutines.flow.StateFlow
1314
import kotlinx.coroutines.isActive
1415
import kotlinx.coroutines.launch
16+
import kotlin.coroutines.ContinuationInterceptor
1517

1618
/**
1719
* Launches the [workflow] in a new coroutine in [scope] and returns a [StateFlow] of its
@@ -114,6 +116,14 @@ public fun <PropsT, OutputT, RenderingT> renderWorkflowIn(
114116
): StateFlow<RenderingAndSnapshot<RenderingT>> {
115117
val chainedInterceptor = interceptors.chained()
116118

119+
check(
120+
scope.coroutineContext[ContinuationInterceptor.Key]?.equals(Dispatchers.Unconfined) == false
121+
) {
122+
"Cannot use an Unconfined CoroutineDispatcher for the Workflow Runtime. It does not guarantee" +
123+
" any dispatch ordering, so we may send actions to unfrozen sinks." +
124+
" Use another CoroutineDispatcher. Our recommendation is Main.immediate."
125+
}
126+
117127
val runner =
118128
WorkflowRunner(scope, workflow, props, initialSnapshot, chainedInterceptor, runtimeConfig)
119129

@@ -200,7 +210,7 @@ public fun <PropsT, OutputT, RenderingT> renderWorkflowIn(
200210
}
201211
}
202212

203-
// Pass on to the UI.
213+
// Pass the rendering on to the UI.
204214
renderingsAndSnapshots.value = nextRenderAndSnapshot
205215
// And emit the Output.
206216
sendOutput(actionResult, onOutput)

workflow-runtime/src/commonMain/kotlin/com/squareup/workflow1/internal/WorkflowNode.kt

Lines changed: 13 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -19,7 +19,7 @@ import com.squareup.workflow1.internal.RealRenderContext.SideEffectRunner
1919
import kotlinx.coroutines.CancellationException
2020
import kotlinx.coroutines.CoroutineName
2121
import kotlinx.coroutines.CoroutineScope
22-
import kotlinx.coroutines.CoroutineStart.LAZY
22+
import kotlinx.coroutines.CoroutineStart.ATOMIC
2323
import kotlinx.coroutines.DelicateCoroutinesApi
2424
import kotlinx.coroutines.ExperimentalCoroutinesApi
2525
import kotlinx.coroutines.Job
@@ -40,7 +40,10 @@ import kotlin.coroutines.CoroutineContext
4040
* worker coroutines. This context will override anything from the workflow's scope and any other
4141
* hard-coded values added to worker contexts. It must not contain a [Job] element (it would violate
4242
* structured concurrency).
43+
*
44+
* The opt-in for [ExperimentalCoroutinesApi] is for using a [ATOMIC] on side effect Jobs.
4345
*/
46+
@OptIn(ExperimentalCoroutinesApi::class)
4447
internal class WorkflowNode<PropsT, StateT, OutputT, RenderingT>(
4548
val id: WorkflowNodeId,
4649
workflow: StatefulWorkflow<PropsT, StateT, OutputT, RenderingT>,
@@ -212,9 +215,7 @@ internal class WorkflowNode<PropsT, StateT, OutputT, RenderingT>(
212215

213216
// Tear down workflows and workers that are obsolete.
214217
subtreeManager.commitRenderedChildren()
215-
// Side effect jobs are launched lazily, since they can send actions to the sink, and can only
216-
// be started after context is frozen.
217-
sideEffects.forEachStaging { it.job.start() }
218+
// Tear down side effects that are no longer declared running.
218219
sideEffects.commitStaging { it.job.cancel() }
219220

220221
return rendering
@@ -260,7 +261,14 @@ internal class WorkflowNode<PropsT, StateT, OutputT, RenderingT>(
260261
sideEffect: suspend CoroutineScope.() -> Unit
261262
): SideEffectNode {
262263
val scope = this + CoroutineName("sideEffect[$key] for $id")
263-
val job = scope.launch(start = LAZY, block = sideEffect)
264+
// Side effect jobs are ATOMIC because even if the side effect is run and then NOT run
265+
// in consecutive render passes before the side effect can be dispatched, we still want it to
266+
// start. Note that this means that side effects must be co-operative or they could
267+
// unnecessarily hog runtime dispatch. We could force them to be so by adding an
268+
// `if (!isActive) yield()`
269+
// at the start of the sideEffect block, but that also might mean that expected side effects
270+
// don't occur when the sideEffect is run at least once.
271+
val job = scope.launch(start = ATOMIC, block = sideEffect)
264272
return SideEffectNode(key, job)
265273
}
266274
}

0 commit comments

Comments
 (0)