@@ -6,22 +6,29 @@ import com.squareup.workflow1.RuntimeConfigOptions.Companion.RuntimeOptions
66import com.squareup.workflow1.RuntimeConfigOptions.Companion.RuntimeOptions.DEFAULT
77import com.squareup.workflow1.RuntimeConfigOptions.PARTIAL_TREE_RENDERING
88import com.squareup.workflow1.RuntimeConfigOptions.RENDER_ONLY_WHEN_STATE_CHANGES
9+ import com.squareup.workflow1.RuntimeConfigOptions.WORK_STEALING_DISPATCHER
910import com.squareup.workflow1.WorkflowInterceptor.RenderPassSkipped
1011import com.squareup.workflow1.WorkflowInterceptor.RenderPassesComplete
1112import com.squareup.workflow1.WorkflowInterceptor.RuntimeLoopOutcome
1213import kotlinx.coroutines.CancellationException
1314import kotlinx.coroutines.CompletableDeferred
1415import kotlinx.coroutines.CoroutineExceptionHandler
16+ import kotlinx.coroutines.Dispatchers
1517import kotlinx.coroutines.ExperimentalCoroutinesApi
18+ import kotlinx.coroutines.awaitCancellation
1619import kotlinx.coroutines.cancel
20+ import kotlinx.coroutines.cancelAndJoin
1721import kotlinx.coroutines.channels.Channel
1822import kotlinx.coroutines.flow.MutableSharedFlow
1923import kotlinx.coroutines.flow.MutableStateFlow
2024import kotlinx.coroutines.flow.StateFlow
2125import kotlinx.coroutines.flow.map
26+ import kotlinx.coroutines.flow.produceIn
2227import kotlinx.coroutines.flow.receiveAsFlow
2328import kotlinx.coroutines.isActive
29+ import kotlinx.coroutines.job
2430import kotlinx.coroutines.launch
31+ import kotlinx.coroutines.plus
2532import kotlinx.coroutines.suspendCancellableCoroutine
2633import kotlinx.coroutines.sync.Mutex
2734import kotlinx.coroutines.test.StandardTestDispatcher
@@ -45,7 +52,7 @@ import kotlin.test.assertTrue
4552@Burst
4653class RenderWorkflowInTest (
4754 useTracer : Boolean = false ,
48- useUnconfined : Boolean = true ,
55+ private val useUnconfined : Boolean = true ,
4956 private val runtime : RuntimeOptions = DEFAULT
5057) {
5158
@@ -1494,7 +1501,9 @@ class RenderWorkflowInTest(
14941501
14951502 @Test
14961503 fun for_conflate_we_do_not_conflate_stacked_actions_into_one_rendering_if_output () {
1497- if (runtimeConfig.contains(CONFLATE_STALE_RENDERINGS )) {
1504+ if (CONFLATE_STALE_RENDERINGS in runtimeConfig &&
1505+ WORK_STEALING_DISPATCHER !in runtimeConfig
1506+ ) {
14981507 runTest(dispatcherUsed) {
14991508 check(runtimeConfig.contains(CONFLATE_STALE_RENDERINGS ))
15001509
@@ -1738,6 +1747,126 @@ class RenderWorkflowInTest(
17381747 }
17391748 }
17401749
1750+ /* *
1751+ * When the [CONFLATE_STALE_RENDERINGS] flag is specified, the runtime will repeatedly run all
1752+ * enqueued WorkflowActions after a render pass, before emitting the rendering to the external
1753+ * flow. When the [WORK_STEALING_DISPATCHER] flag is specified at the same time, any coroutines
1754+ * launched (or even resumed) since the render pass will be allowed to run _before_ checking for
1755+ * actions. This means that any new side effects or workers started by the render pass will be
1756+ * allowed to run to their first suspension point before the rendering is emitted. And if they
1757+ * happen to emit more actions as part of that, then those actions will also be processed, etc.
1758+ * until no more actions are available – only then will the rendering actually be emitted.
1759+ */
1760+ @Test
1761+ fun new_effect_coroutines_dispatched_before_rendering_emitted_when_work_stealing_dispatcher () {
1762+ // This tests is specifically for standard dispatching behavior. It currently only works when
1763+ // CSR is enabled, although an additional test for DEA should be added.
1764+ if (WORK_STEALING_DISPATCHER !in runtimeConfig ||
1765+ CONFLATE_STALE_RENDERINGS !in runtimeConfig ||
1766+ useUnconfined
1767+ ) {
1768+ return
1769+ }
1770+
1771+ runTest(dispatcherUsed) {
1772+ val workflow = Workflow .stateful<Int , Nothing , Unit >(initialState = 0 ) { effectCount ->
1773+ // Because of the WSD, this effect will be allowed to run after the render pass but before
1774+ // emitting the rendering OR checking for new actions, in the CSR loop. Since it emits an
1775+ // action, that action will be processed and trigger a second render pass.
1776+ runningSideEffect(" sender" ) {
1777+ actionSink.send(
1778+ action(" 0" ) {
1779+ expect(2 )
1780+ this .state++
1781+ }
1782+ )
1783+ }
1784+
1785+ if (effectCount >= 1 ) {
1786+ // This effect will be started by the first action and cancelled only when the runtime
1787+ // is cancelled.
1788+ // It will also start in the CSR loop, and trigger a third render pass before emitting the
1789+ // rendering.
1790+ runningSideEffect(" 0" ) {
1791+ expect(3 )
1792+ actionSink.send(
1793+ action(" 1" ) {
1794+ expect(4 )
1795+ this .state++
1796+ }
1797+ )
1798+ awaitCancellation {
1799+ expect(9 )
1800+ }
1801+ }
1802+ }
1803+
1804+ if (effectCount >= 2 ) {
1805+ // This effect will be started by the second action, and cancelled by its own action in
1806+ // the same run of the CSR loop again.
1807+ runningSideEffect(" 1" ) {
1808+ expect(5 )
1809+ actionSink.send(
1810+ action(" -1" ) {
1811+ expect(6 )
1812+ this .state--
1813+ }
1814+ )
1815+ awaitCancellation {
1816+ expect(7 )
1817+ }
1818+ }
1819+ }
1820+ }
1821+
1822+ // We collect the renderings flow to a channel to drive the runtime loop by receiving from the
1823+ // channel. We can't use testScheduler.advanceUntilIdle() et al because we only want the test
1824+ // scheduler to run tasks until a rendering is available, not indefinitely.
1825+ val renderings = renderWorkflowIn(
1826+ workflow = workflow,
1827+ // Run in this scope so it is advanced by advanceUntilIdle.
1828+ scope = backgroundScope,
1829+ props = MutableStateFlow (Unit ),
1830+ runtimeConfig = runtimeConfig,
1831+ workflowTracer = testTracer,
1832+ onOutput = {}
1833+ ).produceIn(backgroundScope + Dispatchers .Unconfined )
1834+
1835+ expect(0 )
1836+ // Receiving the first rendering allows the runtime coroutine to start. The first rendering
1837+ // is returned synchronously.
1838+ renderings.receive()
1839+ expect(1 )
1840+ // Receiving the second rendering will allow the runtime to continue until the rendering is
1841+ // emitted. Since the CSR loop will start all our effects before emitting the next rendering,
1842+ // only one rendering will be emitted for all those render passes.
1843+ renderings.receive()
1844+ expect(8 )
1845+
1846+ // No more renderings should be produced.
1847+ testScheduler.advanceUntilIdle()
1848+ assertTrue(renderings.isEmpty)
1849+
1850+ // Cancel the whole workflow runtime, including all effects.
1851+ backgroundScope.coroutineContext.job.cancelAndJoin()
1852+ expect(10 )
1853+ }
1854+ }
1855+
1856+ private suspend fun awaitCancellation (onFinally : () -> Unit ) {
1857+ try {
1858+ awaitCancellation()
1859+ } finally {
1860+ onFinally()
1861+ }
1862+ }
1863+
1864+ private var expectCounter = 0
1865+ private fun expect (expected : Int ) {
1866+ assertEquals(expected, expectCounter)
1867+ expectCounter++
1868+ }
1869+
17411870 private class ExpectedException : RuntimeException ()
17421871
17431872 companion object {
0 commit comments