Skip to content

Commit c1b5d1f

Browse files
1093: Launch Side Effects atomically so they are always started
Add headlessIntegrationTest to renderWorkflowIn with a nice Turbine attached. Still TODO: Add more documentation (and enforcement) of not using Unconfined dispatchers.
1 parent b130a43 commit c1b5d1f

File tree

13 files changed

+597
-188
lines changed

13 files changed

+597
-188
lines changed

gradle.properties

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -8,7 +8,7 @@ android.useAndroidX=true
88
systemProp.org.gradle.internal.publish.checksums.insecure=true
99

1010
GROUP=com.squareup.workflow1
11-
VERSION_NAME=1.11.0-beta04-SNAPSHOT
11+
VERSION_NAME=1.11.0-beta04-atomic-w-SNAPSHOT
1212

1313
POM_DESCRIPTION=Square Workflow
1414

workflow-core/src/commonMain/kotlin/com/squareup/workflow1/Worker.kt

Lines changed: 0 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -111,11 +111,6 @@ public interface Worker<out OutputT> {
111111
* When the worker is torn down, the coroutine is cancelled.
112112
* This coroutine is launched in the same scope as the workflow runtime, with a few changes:
113113
*
114-
* - The dispatcher is always set to [Unconfined][kotlinx.coroutines.Dispatchers.Unconfined] to
115-
* minimize overhead for workers that don't care which thread they're executed on (e.g. logging
116-
* side effects, workers that wrap third-party reactive libraries, etc.). If your work cares
117-
* which thread it runs on, use [withContext][kotlinx.coroutines.withContext] or
118-
* [flowOn][kotlinx.coroutines.flow.flowOn] to specify a dispatcher.
119114
* - A [CoroutineName][kotlinx.coroutines.CoroutineName] that describes the `Worker` instance
120115
* (via `toString`) and the key specified by the workflow running the worker.
121116
*

workflow-runtime/src/commonMain/kotlin/com/squareup/workflow1/RenderWorkflow.kt

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -200,7 +200,7 @@ public fun <PropsT, OutputT, RenderingT> renderWorkflowIn(
200200
}
201201
}
202202

203-
// Pass on to the UI.
203+
// Pass the rendering on to the UI.
204204
renderingsAndSnapshots.value = nextRenderAndSnapshot
205205
// And emit the Output.
206206
sendOutput(actionResult, onOutput)

workflow-runtime/src/commonMain/kotlin/com/squareup/workflow1/internal/WorkflowNode.kt

Lines changed: 12 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -19,7 +19,7 @@ import com.squareup.workflow1.internal.RealRenderContext.SideEffectRunner
1919
import kotlinx.coroutines.CancellationException
2020
import kotlinx.coroutines.CoroutineName
2121
import kotlinx.coroutines.CoroutineScope
22-
import kotlinx.coroutines.CoroutineStart.LAZY
22+
import kotlinx.coroutines.CoroutineStart.ATOMIC
2323
import kotlinx.coroutines.DelicateCoroutinesApi
2424
import kotlinx.coroutines.ExperimentalCoroutinesApi
2525
import kotlinx.coroutines.Job
@@ -40,7 +40,10 @@ import kotlin.coroutines.CoroutineContext
4040
* worker coroutines. This context will override anything from the workflow's scope and any other
4141
* hard-coded values added to worker contexts. It must not contain a [Job] element (it would violate
4242
* structured concurrency).
43+
*
44+
* The opt-in for [ExperimentalCoroutinesApi] is for using a [ATOMIC] on side effect Jobs.
4345
*/
46+
@OptIn(ExperimentalCoroutinesApi::class)
4447
internal class WorkflowNode<PropsT, StateT, OutputT, RenderingT>(
4548
val id: WorkflowNodeId,
4649
workflow: StatefulWorkflow<PropsT, StateT, OutputT, RenderingT>,
@@ -212,9 +215,7 @@ internal class WorkflowNode<PropsT, StateT, OutputT, RenderingT>(
212215

213216
// Tear down workflows and workers that are obsolete.
214217
subtreeManager.commitRenderedChildren()
215-
// Side effect jobs are launched lazily, since they can send actions to the sink, and can only
216-
// be started after context is frozen.
217-
sideEffects.forEachStaging { it.job.start() }
218+
// Tear down side effects that are no longer declared running.
218219
sideEffects.commitStaging { it.job.cancel() }
219220

220221
return rendering
@@ -260,7 +261,13 @@ internal class WorkflowNode<PropsT, StateT, OutputT, RenderingT>(
260261
sideEffect: suspend CoroutineScope.() -> Unit
261262
): SideEffectNode {
262263
val scope = this + CoroutineName("sideEffect[$key] for $id")
263-
val job = scope.launch(start = LAZY, block = sideEffect)
264+
// Side effect jobs are automatic because even if the side effect is run, then not run
265+
// in consecutive render passes before the side effect can be dispatched, we still want it to
266+
// start. Note that this means that side effects must be co-operative or they could unecessarily
267+
// hog runtime dispatch. We could force them to be so by adding an if (!isActive) yield() at the
268+
// start of the sideEffect block, but that also might mean that expected side effects don't
269+
// occur when the sideEffect is run at least once.
270+
val job = scope.launch(start = ATOMIC, block = sideEffect)
264271
return SideEffectNode(key, job)
265272
}
266273
}

workflow-runtime/src/commonTest/kotlin/com/squareup/workflow1/RenderWorkflowInTest.kt

Lines changed: 34 additions & 29 deletions
Original file line numberDiff line numberDiff line change
@@ -9,7 +9,6 @@ import kotlinx.coroutines.CoroutineExceptionHandler
99
import kotlinx.coroutines.CoroutineScope
1010
import kotlinx.coroutines.Dispatchers.Unconfined
1111
import kotlinx.coroutines.ExperimentalCoroutinesApi
12-
import kotlinx.coroutines.FlowPreview
1312
import kotlinx.coroutines.cancel
1413
import kotlinx.coroutines.channels.Channel
1514
import kotlinx.coroutines.flow.MutableStateFlow
@@ -27,18 +26,14 @@ import kotlinx.coroutines.test.UnconfinedTestDispatcher
2726
import kotlinx.coroutines.test.advanceUntilIdle
2827
import kotlinx.coroutines.test.runCurrent
2928
import okio.ByteString
29+
import kotlin.test.Ignore
3030
import kotlin.test.Test
3131

32-
@OptIn(ExperimentalCoroutinesApi::class, FlowPreview::class, WorkflowExperimentalRuntime::class)
32+
@OptIn(ExperimentalCoroutinesApi::class, WorkflowExperimentalRuntime::class)
3333
class RenderWorkflowInTest {
3434

3535
/**
36-
* A [TestScope] that will not run until explicitly told to.
37-
*/
38-
private lateinit var pausedTestScope: TestScope
39-
40-
/**
41-
* A [TestScope] that will automatically dispatch enqueued routines.
36+
* lateinit so that we can get a fresh scope for each parameterized test run.
4237
*/
4338
private lateinit var testScope: TestScope
4439

@@ -52,8 +47,7 @@ class RenderWorkflowInTest {
5247
private val runtimeTestRunner = ParameterizedTestRunner<RuntimeConfig>()
5348

5449
private fun setup() {
55-
pausedTestScope = TestScope()
56-
testScope = TestScope(UnconfinedTestDispatcher())
50+
testScope = TestScope()
5751
}
5852

5953
@Test fun initial_rendering_is_calculated_synchronously() {
@@ -67,7 +61,7 @@ class RenderWorkflowInTest {
6761

6862
val renderings = renderWorkflowIn(
6963
workflow = workflow,
70-
scope = pausedTestScope,
64+
scope = testScope,
7165
props = props,
7266
runtimeConfig = runtimeConfig
7367
) {}
@@ -83,17 +77,18 @@ class RenderWorkflowInTest {
8377
val props = MutableStateFlow("foo")
8478
val workflow = Workflow.stateless<String, Nothing, String> { "props: $it" }
8579

86-
pausedTestScope.cancel()
80+
testScope.cancel()
8781
val renderings = renderWorkflowIn(
8882
workflow = workflow,
89-
scope = pausedTestScope,
83+
scope = testScope,
9084
props = props,
9185
runtimeConfig = runtimeConfig
9286
) {}
9387
assertEquals("props: foo", renderings.value.rendering)
9488
}
9589
}
9690

91+
@Ignore // Deciding if we want this behavior or if side effects should start.
9792
@Test
9893
fun `side_effects_from_initial_rendering_in_root_workflow_are_never_started_when_scope_cancelled_before_start`() {
9994
runtimeTestRunner.runParametrizedTest(
@@ -120,6 +115,7 @@ class RenderWorkflowInTest {
120115
}
121116
}
122117

118+
@Ignore // Deciding if we want this behavior.
123119
@Test
124120
fun `side_effects_from_initial_rendering_in_non_root_workflow_are_never_started_when_scope_cancelled_before_start`() {
125121
runtimeTestRunner.runParametrizedTest(
@@ -364,12 +360,15 @@ class RenderWorkflowInTest {
364360
) {
365361
receivedOutputs += it
366362
}
363+
testScope.advanceUntilIdle()
367364
assertTrue(receivedOutputs.isEmpty())
368365

369366
trigger.trySend("foo").isSuccess
367+
testScope.advanceUntilIdle()
370368
assertEquals(listOf("foo"), receivedOutputs)
371369

372370
trigger.trySend("bar").isSuccess
371+
testScope.advanceUntilIdle()
373372
assertEquals(listOf("foo", "bar"), receivedOutputs)
374373
}
375374
}
@@ -535,6 +534,7 @@ class RenderWorkflowInTest {
535534
runtimeConfig = runtimeConfig
536535
) {}
537536
}
537+
testScope.advanceUntilIdle()
538538
assertTrue(sideEffectWasRan)
539539
assertNotNull(cancellationException)
540540
val realCause = generateSequence(cancellationException) { it.cause }
@@ -543,6 +543,7 @@ class RenderWorkflowInTest {
543543
}
544544
}
545545

546+
@Ignore // Deciding if we want this behavior, or if all side effects should at least start.
546547
@Test
547548
fun `side_effects_from_initial_rendering_in_non_root_workflow_are_never_started_when_initial_render_of_non_root_workflow_fails`() {
548549
runtimeTestRunner.runParametrizedTest(
@@ -653,10 +654,12 @@ class RenderWorkflowInTest {
653654
props = MutableStateFlow(Unit),
654655
runtimeConfig = runtimeConfig
655656
) {}
657+
testScope.advanceUntilIdle()
656658
assertNull(cancellationException)
657659
assertTrue(testScope.isActive)
658660

659661
testScope.cancel()
662+
testScope.advanceUntilIdle()
660663
assertTrue(cancellationException is CancellationException)
661664
assertNull(cancellationException!!.cause)
662665
}
@@ -716,10 +719,12 @@ class RenderWorkflowInTest {
716719
props = MutableStateFlow(Unit),
717720
runtimeConfig = runtimeConfig
718721
) {}
722+
testScope.advanceUntilIdle()
719723
assertNull(cancellationException)
720724
assertTrue(testScope.isActive)
721725

722726
testScope.cancel(CancellationException("fail!", ExpectedException()))
727+
testScope.advanceUntilIdle()
723728
assertTrue(cancellationException is CancellationException)
724729
assertTrue(cancellationException!!.cause is ExpectedException)
725730
}
@@ -766,17 +771,17 @@ class RenderWorkflowInTest {
766771
}
767772
val renderings = renderWorkflowIn(
768773
workflow = workflow,
769-
scope = pausedTestScope,
774+
scope = testScope,
770775
props = MutableStateFlow(Unit),
771776
runtimeConfig = runtimeConfig
772777
) {}
773778

774-
pausedTestScope.launch {
779+
testScope.launch {
775780
renderings.collect { throw ExpectedException() }
776781
}
777782
assertNull(cancellationException)
778783

779-
pausedTestScope.advanceUntilIdle()
784+
testScope.advanceUntilIdle()
780785
assertTrue(cancellationException is CancellationException)
781786
assertTrue(cancellationException!!.cause is ExpectedException)
782787
}
@@ -794,20 +799,20 @@ class RenderWorkflowInTest {
794799
}
795800
renderWorkflowIn(
796801
workflow = workflow,
797-
scope = pausedTestScope,
802+
scope = testScope,
798803
props = MutableStateFlow(Unit),
799804
runtimeConfig = runtimeConfig
800805
) {
801806
throw ExpectedException()
802807
}
803-
assertTrue(pausedTestScope.isActive)
808+
assertTrue(testScope.isActive)
804809

805810
trigger.complete(Unit)
806-
assertTrue(pausedTestScope.isActive)
811+
assertTrue(testScope.isActive)
807812

808-
pausedTestScope.advanceUntilIdle()
809-
pausedTestScope.runCurrent()
810-
assertFalse(pausedTestScope.isActive)
813+
testScope.advanceUntilIdle()
814+
testScope.runCurrent()
815+
assertFalse(testScope.isActive)
811816
}
812817
}
813818

@@ -834,20 +839,20 @@ class RenderWorkflowInTest {
834839

835840
renderWorkflowIn(
836841
workflow = workflow,
837-
scope = pausedTestScope,
842+
scope = testScope,
838843
props = MutableStateFlow(Unit),
839844
runtimeConfig = runtimeConfig,
840845
onOutput = { events += "output($it)" }
841846
)
842847
.onEach { events += "rendering(${it.rendering})" }
843-
.launchIn(pausedTestScope)
844-
pausedTestScope.advanceUntilIdle()
845-
pausedTestScope.runCurrent()
848+
.launchIn(testScope)
849+
testScope.advanceUntilIdle()
850+
testScope.runCurrent()
846851
assertEquals(listOf("rendering({no output})"), events)
847852

848853
outputTrigger.complete("output")
849-
pausedTestScope.advanceUntilIdle()
850-
pausedTestScope.runCurrent()
854+
testScope.advanceUntilIdle()
855+
testScope.runCurrent()
851856
assertEquals(
852857
listOf(
853858
"rendering({no output})",
@@ -974,6 +979,7 @@ class RenderWorkflowInTest {
974979
val renderings = ras.map { it.rendering }
975980
.produceIn(testScope)
976981

982+
testScope.advanceUntilIdle()
977983
assertFailsWith<ExpectedException> {
978984
renderings.tryReceive()
979985
.getOrNull()
@@ -983,7 +989,6 @@ class RenderWorkflowInTest {
983989

984990
props.value += 1
985991
testScope.advanceUntilIdle()
986-
testScope.runCurrent()
987992

988993
assertFailsWith<ExpectedException> {
989994
renderings.tryReceive()

0 commit comments

Comments
 (0)