@@ -7,22 +7,29 @@ import com.squareup.workflow1.RuntimeConfigOptions.Companion.RuntimeOptions.DEFA
77import com.squareup.workflow1.RuntimeConfigOptions.DRAIN_EXCLUSIVE_ACTIONS
88import com.squareup.workflow1.RuntimeConfigOptions.PARTIAL_TREE_RENDERING
99import com.squareup.workflow1.RuntimeConfigOptions.RENDER_ONLY_WHEN_STATE_CHANGES
10+ import com.squareup.workflow1.RuntimeConfigOptions.WORK_STEALING_DISPATCHER
1011import com.squareup.workflow1.WorkflowInterceptor.RenderPassSkipped
1112import com.squareup.workflow1.WorkflowInterceptor.RenderingProduced
1213import com.squareup.workflow1.WorkflowInterceptor.RuntimeUpdate
1314import kotlinx.coroutines.CancellationException
1415import kotlinx.coroutines.CompletableDeferred
1516import kotlinx.coroutines.CoroutineExceptionHandler
17+ import kotlinx.coroutines.Dispatchers
1618import kotlinx.coroutines.ExperimentalCoroutinesApi
19+ import kotlinx.coroutines.awaitCancellation
1720import kotlinx.coroutines.cancel
21+ import kotlinx.coroutines.cancelAndJoin
1822import kotlinx.coroutines.channels.Channel
1923import kotlinx.coroutines.flow.MutableSharedFlow
2024import kotlinx.coroutines.flow.MutableStateFlow
2125import kotlinx.coroutines.flow.StateFlow
2226import kotlinx.coroutines.flow.map
27+ import kotlinx.coroutines.flow.produceIn
2328import kotlinx.coroutines.flow.receiveAsFlow
2429import kotlinx.coroutines.isActive
30+ import kotlinx.coroutines.job
2531import kotlinx.coroutines.launch
32+ import kotlinx.coroutines.plus
2633import kotlinx.coroutines.suspendCancellableCoroutine
2734import kotlinx.coroutines.sync.Mutex
2835import kotlinx.coroutines.test.StandardTestDispatcher
@@ -46,7 +53,7 @@ import kotlin.test.assertTrue
4653@Burst
4754class RenderWorkflowInTest (
4855 useTracer : Boolean = false ,
49- useUnconfined : Boolean = true ,
56+ private val useUnconfined : Boolean = true ,
5057 private val runtime : RuntimeOptions = DEFAULT
5158) {
5259
@@ -1502,7 +1509,9 @@ class RenderWorkflowInTest(
15021509
15031510 @Test
15041511 fun for_conflate_we_do_not_conflate_stacked_actions_into_one_rendering_if_output () {
1505- if (runtimeConfig.contains(CONFLATE_STALE_RENDERINGS )) {
1512+ if (CONFLATE_STALE_RENDERINGS in runtimeConfig &&
1513+ WORK_STEALING_DISPATCHER !in runtimeConfig
1514+ ) {
15061515 runTest(dispatcherUsed) {
15071516 check(runtimeConfig.contains(CONFLATE_STALE_RENDERINGS ))
15081517
@@ -1746,6 +1755,126 @@ class RenderWorkflowInTest(
17461755 }
17471756 }
17481757
1758+ /* *
1759+ * When the [CONFLATE_STALE_RENDERINGS] flag is specified, the runtime will repeatedly run all
1760+ * enqueued WorkflowActions after a render pass, before emitting the rendering to the external
1761+ * flow. When the [WORK_STEALING_DISPATCHER] flag is specified at the same time, any coroutines
1762+ * launched (or even resumed) since the render pass will be allowed to run _before_ checking for
1763+ * actions. This means that any new side effects or workers started by the render pass will be
1764+ * allowed to run to their first suspension point before the rendering is emitted. And if they
1765+ * happen to emit more actions as part of that, then those actions will also be processed, etc.
1766+ * until no more actions are available – only then will the rendering actually be emitted.
1767+ */
1768+ @Test
1769+ fun new_effect_coroutines_dispatched_before_rendering_emitted_when_work_stealing_dispatcher () {
1770+ // This tests is specifically for standard dispatching behavior. It currently only works when
1771+ // CSR is enabled, although an additional test for DEA should be added.
1772+ if (WORK_STEALING_DISPATCHER !in runtimeConfig ||
1773+ CONFLATE_STALE_RENDERINGS !in runtimeConfig ||
1774+ useUnconfined
1775+ ) {
1776+ return
1777+ }
1778+
1779+ runTest(dispatcherUsed) {
1780+ val workflow = Workflow .stateful<Int , Nothing , Unit >(initialState = 0 ) { effectCount ->
1781+ // Because of the WSD, this effect will be allowed to run after the render pass but before
1782+ // emitting the rendering OR checking for new actions, in the CSR loop. Since it emits an
1783+ // action, that action will be processed and trigger a second render pass.
1784+ runningSideEffect(" sender" ) {
1785+ actionSink.send(
1786+ action(" 0" ) {
1787+ expect(2 )
1788+ this .state++
1789+ }
1790+ )
1791+ }
1792+
1793+ if (effectCount >= 1 ) {
1794+ // This effect will be started by the first action and cancelled only when the runtime
1795+ // is cancelled.
1796+ // It will also start in the CSR loop, and trigger a third render pass before emitting the
1797+ // rendering.
1798+ runningSideEffect(" 0" ) {
1799+ expect(3 )
1800+ actionSink.send(
1801+ action(" 1" ) {
1802+ expect(4 )
1803+ this .state++
1804+ }
1805+ )
1806+ awaitCancellation {
1807+ expect(9 )
1808+ }
1809+ }
1810+ }
1811+
1812+ if (effectCount >= 2 ) {
1813+ // This effect will be started by the second action, and cancelled by its own action in
1814+ // the same run of the CSR loop again.
1815+ runningSideEffect(" 1" ) {
1816+ expect(5 )
1817+ actionSink.send(
1818+ action(" -1" ) {
1819+ expect(6 )
1820+ this .state--
1821+ }
1822+ )
1823+ awaitCancellation {
1824+ expect(7 )
1825+ }
1826+ }
1827+ }
1828+ }
1829+
1830+ // We collect the renderings flow to a channel to drive the runtime loop by receiving from the
1831+ // channel. We can't use testScheduler.advanceUntilIdle() et al because we only want the test
1832+ // scheduler to run tasks until a rendering is available, not indefinitely.
1833+ val renderings = renderWorkflowIn(
1834+ workflow = workflow,
1835+ // Run in this scope so it is advanced by advanceUntilIdle.
1836+ scope = backgroundScope,
1837+ props = MutableStateFlow (Unit ),
1838+ runtimeConfig = runtimeConfig,
1839+ workflowTracer = testTracer,
1840+ onOutput = {}
1841+ ).produceIn(backgroundScope + Dispatchers .Unconfined )
1842+
1843+ expect(0 )
1844+ // Receiving the first rendering allows the runtime coroutine to start. The first rendering
1845+ // is returned synchronously.
1846+ renderings.receive()
1847+ expect(1 )
1848+ // Receiving the second rendering will allow the runtime to continue until the rendering is
1849+ // emitted. Since the CSR loop will start all our effects before emitting the next rendering,
1850+ // only one rendering will be emitted for all those render passes.
1851+ renderings.receive()
1852+ expect(8 )
1853+
1854+ // No more renderings should be produced.
1855+ testScheduler.advanceUntilIdle()
1856+ assertTrue(renderings.isEmpty)
1857+
1858+ // Cancel the whole workflow runtime, including all effects.
1859+ backgroundScope.coroutineContext.job.cancelAndJoin()
1860+ expect(10 )
1861+ }
1862+ }
1863+
1864+ private suspend fun awaitCancellation (onFinally : () -> Unit ) {
1865+ try {
1866+ awaitCancellation()
1867+ } finally {
1868+ onFinally()
1869+ }
1870+ }
1871+
1872+ private var expectCounter = 0
1873+ private fun expect (expected : Int ) {
1874+ assertEquals(expected, expectCounter)
1875+ expectCounter++
1876+ }
1877+
17491878 @Test
17501879 fun for_drain_exclusive_we_handle_multiple_actions_in_one_render_or_not () = runTest(
17511880 dispatcherUsed
0 commit comments