Skip to content

Commit a4fd8f5

Browse files
committed
Add coverage for dynamic handlers
1 parent d769b75 commit a4fd8f5

File tree

1 file changed

+177
-30
lines changed

1 file changed

+177
-30
lines changed

tests/worker/test_workflow.py

Lines changed: 177 additions & 30 deletions
Original file line numberDiff line numberDiff line change
@@ -5417,9 +5417,45 @@ async def run(
54175417
"continue-as-new",
54185418
"fail-post-continue-as-new-run",
54195419
],
5420-
wait_all_handlers_finished: bool,
5420+
handler_registration: Literal["late_registered", "not_late_registered"],
5421+
handler_dynamism: Literal["dynamic", "not_dynamic"],
5422+
handler_waiting: Literal[
5423+
"wait_all_handlers_finish", "no_wait_all_handlers_finish"
5424+
],
54215425
) -> NoReturn:
5422-
if wait_all_handlers_finished:
5426+
if handler_registration == "late_registered":
5427+
if handler_dynamism == "dynamic":
5428+
5429+
async def my_late_registered_dynamic_update(
5430+
self, name: str, args: Sequence[RawValue]
5431+
) -> str:
5432+
await workflow.wait_condition(lambda: self.handlers_may_finish)
5433+
return "my-late-registered-dynamic-update-result"
5434+
5435+
async def my_late_registered_dynamic_signal(
5436+
self, name: str, args: Sequence[RawValue]
5437+
) -> None:
5438+
await workflow.wait_condition(lambda: self.handlers_may_finish)
5439+
5440+
workflow.set_dynamic_update_handler(my_late_registered_dynamic_update)
5441+
workflow.set_dynamic_signal_handler(my_late_registered_dynamic_signal)
5442+
else:
5443+
5444+
async def my_late_registered_update(self) -> str:
5445+
await workflow.wait_condition(lambda: self.handlers_may_finish)
5446+
return "my-late-registered-update-result"
5447+
5448+
async def my_late_registered_signal(self) -> None:
5449+
await workflow.wait_condition(lambda: self.handlers_may_finish)
5450+
5451+
workflow.set_update_handler(
5452+
"my_late_registered_update", my_late_registered_update
5453+
)
5454+
workflow.set_signal_handler(
5455+
"my_late_registered_signal", my_late_registered_signal
5456+
)
5457+
5458+
if handler_waiting == "wait_all_handlers_finish":
54235459
self.handlers_may_finish = True
54245460
await workflow.wait_condition(workflow.all_handlers_finished)
54255461
if workflow_termination_type == "failure":
@@ -5431,24 +5467,50 @@ async def run(
54315467
elif workflow_termination_type == "continue-as-new":
54325468
# Fail next run so that test terminates
54335469
workflow.continue_as_new(
5434-
args=["fail-post-continue-as-new-run", wait_all_handlers_finished]
5470+
args=[
5471+
"fail-post-continue-as-new-run",
5472+
handler_registration,
5473+
handler_dynamism,
5474+
handler_waiting,
5475+
]
54355476
)
54365477
else:
54375478
await workflow.wait_condition(lambda: False)
54385479
raise AssertionError("unreachable")
54395480

54405481
@workflow.update
5441-
async def my_update(self) -> NoReturn:
5482+
async def my_update(self) -> str:
54425483
await workflow.wait_condition(lambda: self.handlers_may_finish)
5484+
return "update-result"
54435485

54445486
@workflow.signal
5445-
async def my_signal(self) -> NoReturn:
5487+
async def my_signal(self) -> None:
54465488
await workflow.wait_condition(lambda: self.handlers_may_finish)
54475489

5490+
@workflow.update(dynamic=True)
5491+
async def my_dynamic_update(self, name: str, args: Sequence[RawValue]) -> str:
5492+
await workflow.wait_condition(lambda: self.handlers_may_finish)
5493+
return "my-dynamic-update-result"
54485494

5449-
@pytest.mark.parametrize("wait_all_handlers_finished", [True, False])
5495+
@workflow.signal(dynamic=True)
5496+
async def my_dynamic_signal(self, name: str, args: Sequence[RawValue]) -> None:
5497+
await workflow.wait_condition(lambda: self.handlers_may_finish)
5498+
5499+
5500+
@pytest.mark.parametrize(
5501+
"handler_registration", ["late_registered", "not_late_registered"]
5502+
)
5503+
@pytest.mark.parametrize("handler_dynamism", ["dynamic", "not_dynamic"])
5504+
@pytest.mark.parametrize(
5505+
"handler_waiting",
5506+
["wait_all_handlers_finish", "no_wait_all_handlers_finish"],
5507+
)
54505508
async def test_unfinished_update_handler_with_workflow_cancellation(
5451-
client: Client, env: WorkflowEnvironment, wait_all_handlers_finished: bool
5509+
client: Client,
5510+
env: WorkflowEnvironment,
5511+
handler_registration: Literal["late_registered", "not_late_registered"],
5512+
handler_dynamism: Literal["dynamic", "not_dynamic"],
5513+
handler_waiting: Literal["wait_all_handlers_finish", "no_wait_all_handlers_finish"],
54525514
):
54535515
if env.supports_time_skipping:
54545516
pytest.skip(
@@ -5458,25 +5520,50 @@ async def test_unfinished_update_handler_with_workflow_cancellation(
54585520
client,
54595521
"update",
54605522
"cancellation",
5461-
wait_all_handlers_finished,
5523+
handler_registration,
5524+
handler_dynamism,
5525+
handler_waiting,
54625526
).test_warning_is_issued_on_exit_with_unfinished_handler()
54635527

54645528

5465-
@pytest.mark.parametrize("wait_all_handlers_finished", [True, False])
5529+
@pytest.mark.parametrize(
5530+
"handler_registration", ["late_registered", "not_late_registered"]
5531+
)
5532+
@pytest.mark.parametrize("handler_dynamism", ["dynamic", "not_dynamic"])
5533+
@pytest.mark.parametrize(
5534+
"handler_waiting",
5535+
["wait_all_handlers_finish", "no_wait_all_handlers_finish"],
5536+
)
54665537
async def test_unfinished_signal_handler_with_workflow_cancellation(
5467-
client: Client, wait_all_handlers_finished: bool
5538+
client: Client,
5539+
handler_registration: Literal["late_registered", "not_late_registered"],
5540+
handler_dynamism: Literal["dynamic", "not_dynamic"],
5541+
handler_waiting: Literal["wait_all_handlers_finish", "no_wait_all_handlers_finish"],
54685542
):
54695543
await _UnfinishedHandlersOnWorkflowTerminationTest(
54705544
client,
54715545
"signal",
54725546
"cancellation",
5473-
wait_all_handlers_finished,
5547+
handler_registration,
5548+
handler_dynamism,
5549+
handler_waiting,
54745550
).test_warning_is_issued_on_exit_with_unfinished_handler()
54755551

54765552

5477-
@pytest.mark.parametrize("wait_all_handlers_finished", [True, False])
5553+
@pytest.mark.parametrize(
5554+
"handler_registration", ["late_registered", "not_late_registered"]
5555+
)
5556+
@pytest.mark.parametrize("handler_dynamism", ["dynamic", "not_dynamic"])
5557+
@pytest.mark.parametrize(
5558+
"handler_waiting",
5559+
["wait_all_handlers_finish", "no_wait_all_handlers_finish"],
5560+
)
54785561
async def test_unfinished_update_handler_with_workflow_failure(
5479-
client: Client, env: WorkflowEnvironment, wait_all_handlers_finished: bool
5562+
client: Client,
5563+
env: WorkflowEnvironment,
5564+
handler_registration: Literal["late_registered", "not_late_registered"],
5565+
handler_dynamism: Literal["dynamic", "not_dynamic"],
5566+
handler_waiting: Literal["wait_all_handlers_finish", "no_wait_all_handlers_finish"],
54805567
):
54815568
if env.supports_time_skipping:
54825569
pytest.skip(
@@ -5486,13 +5573,26 @@ async def test_unfinished_update_handler_with_workflow_failure(
54865573
client,
54875574
"update",
54885575
"failure",
5489-
wait_all_handlers_finished,
5576+
handler_registration,
5577+
handler_dynamism,
5578+
handler_waiting,
54905579
).test_warning_is_issued_on_exit_with_unfinished_handler()
54915580

54925581

5493-
@pytest.mark.parametrize("wait_all_handlers_finished", [True, False])
5582+
@pytest.mark.parametrize(
5583+
"handler_registration", ["late_registered", "not_late_registered"]
5584+
)
5585+
@pytest.mark.parametrize("handler_dynamism", ["dynamic", "not_dynamic"])
5586+
@pytest.mark.parametrize(
5587+
"handler_waiting",
5588+
["wait_all_handlers_finish", "no_wait_all_handlers_finish"],
5589+
)
54945590
async def test_unfinished_signal_handler_with_workflow_failure(
5495-
client: Client, env: WorkflowEnvironment, wait_all_handlers_finished: bool
5591+
client: Client,
5592+
env: WorkflowEnvironment,
5593+
handler_registration: Literal["late_registered", "not_late_registered"],
5594+
handler_dynamism: Literal["dynamic", "not_dynamic"],
5595+
handler_waiting: Literal["wait_all_handlers_finish", "no_wait_all_handlers_finish"],
54965596
):
54975597
if env.supports_time_skipping:
54985598
pytest.skip(
@@ -5502,13 +5602,26 @@ async def test_unfinished_signal_handler_with_workflow_failure(
55025602
client,
55035603
"signal",
55045604
"failure",
5505-
wait_all_handlers_finished,
5605+
handler_registration,
5606+
handler_dynamism,
5607+
handler_waiting,
55065608
).test_warning_is_issued_on_exit_with_unfinished_handler()
55075609

55085610

5509-
@pytest.mark.parametrize("wait_all_handlers_finished", [True, False])
5611+
@pytest.mark.parametrize(
5612+
"handler_registration", ["late_registered", "not_late_registered"]
5613+
)
5614+
@pytest.mark.parametrize("handler_dynamism", ["dynamic", "not_dynamic"])
5615+
@pytest.mark.parametrize(
5616+
"handler_waiting",
5617+
["wait_all_handlers_finish", "no_wait_all_handlers_finish"],
5618+
)
55105619
async def test_unfinished_update_handler_with_continue_as_new(
5511-
client: Client, env: WorkflowEnvironment, wait_all_handlers_finished: bool
5620+
client: Client,
5621+
env: WorkflowEnvironment,
5622+
handler_registration: Literal["late_registered", "not_late_registered"],
5623+
handler_dynamism: Literal["dynamic", "not_dynamic"],
5624+
handler_waiting: Literal["wait_all_handlers_finish", "no_wait_all_handlers_finish"],
55125625
):
55135626
if env.supports_time_skipping:
55145627
pytest.skip(
@@ -5518,13 +5631,26 @@ async def test_unfinished_update_handler_with_continue_as_new(
55185631
client,
55195632
"update",
55205633
"continue-as-new",
5521-
wait_all_handlers_finished,
5634+
handler_registration,
5635+
handler_dynamism,
5636+
handler_waiting,
55225637
).test_warning_is_issued_on_exit_with_unfinished_handler()
55235638

55245639

5525-
@pytest.mark.parametrize("wait_all_handlers_finished", [True, False])
5640+
@pytest.mark.parametrize(
5641+
"handler_registration", ["late_registered", "not_late_registered"]
5642+
)
5643+
@pytest.mark.parametrize("handler_dynamism", ["dynamic", "not_dynamic"])
5644+
@pytest.mark.parametrize(
5645+
"handler_waiting",
5646+
["wait_all_handlers_finish", "no_wait_all_handlers_finish"],
5647+
)
55265648
async def test_unfinished_signal_handler_with_continue_as_new(
5527-
client: Client, env: WorkflowEnvironment, wait_all_handlers_finished: bool
5649+
client: Client,
5650+
env: WorkflowEnvironment,
5651+
handler_registration: Literal["late_registered", "not_late_registered"],
5652+
handler_dynamism: Literal["dynamic", "not_dynamic"],
5653+
handler_waiting: Literal["wait_all_handlers_finish", "no_wait_all_handlers_finish"],
55285654
):
55295655
if env.supports_time_skipping:
55305656
pytest.skip(
@@ -5534,7 +5660,9 @@ async def test_unfinished_signal_handler_with_continue_as_new(
55345660
client,
55355661
"signal",
55365662
"continue-as-new",
5537-
wait_all_handlers_finished,
5663+
handler_registration,
5664+
handler_dynamism,
5665+
handler_waiting,
55385666
).test_warning_is_issued_on_exit_with_unfinished_handler()
55395667

55405668

@@ -5543,13 +5671,15 @@ class _UnfinishedHandlersOnWorkflowTerminationTest:
55435671
client: Client
55445672
handler_type: Literal["update", "signal"]
55455673
workflow_termination_type: Literal["cancellation", "failure", "continue-as-new"]
5546-
wait_all_handlers_finished: bool
5674+
handler_registration: Literal["late_registered", "not_late_registered"]
5675+
handler_dynamism: Literal["dynamic", "not_dynamic"]
5676+
handler_waiting: Literal["wait_all_handlers_finish", "no_wait_all_handlers_finish"]
55475677

55485678
async def test_warning_is_issued_on_exit_with_unfinished_handler(
55495679
self,
55505680
):
55515681
assert await self._run_workflow_and_get_warning() == (
5552-
not self.wait_all_handlers_finished
5682+
self.handler_waiting == "no_wait_all_handlers_finish"
55535683
)
55545684

55555685
async def _run_workflow_and_get_warning(self) -> bool:
@@ -5562,17 +5692,29 @@ async def _run_workflow_and_get_warning(self) -> bool:
55625692
# server.
55635693
handle = await self.client.start_workflow(
55645694
UnfinishedHandlersOnWorkflowTerminationWorkflow.run,
5565-
args=[self.workflow_termination_type, self.wait_all_handlers_finished],
5695+
args=[
5696+
self.workflow_termination_type,
5697+
self.handler_registration,
5698+
self.handler_dynamism,
5699+
self.handler_waiting,
5700+
],
55665701
id=workflow_id,
55675702
task_queue=task_queue,
55685703
)
55695704
if self.workflow_termination_type == "cancellation":
55705705
await handle.cancel()
55715706

55725707
if self.handler_type == "update":
5708+
update_method = (
5709+
"__does_not_exist__"
5710+
if self.handler_dynamism == "dynamic"
5711+
else "my_late_registered_update"
5712+
if self.handler_registration == "late_registered"
5713+
else UnfinishedHandlersOnWorkflowTerminationWorkflow.my_update
5714+
)
55735715
update_task = asyncio.create_task(
55745716
handle.execute_update(
5575-
UnfinishedHandlersOnWorkflowTerminationWorkflow.my_update,
5717+
update_method,
55765718
id=update_id,
55775719
)
55785720
)
@@ -5581,9 +5723,14 @@ async def _run_workflow_and_get_warning(self) -> bool:
55815723
lambda: workflow_update_exists(self.client, workflow_id, update_id),
55825724
)
55835725
else:
5584-
await handle.signal(
5585-
UnfinishedHandlersOnWorkflowTerminationWorkflow.my_signal
5726+
signal_method = (
5727+
"__does_not_exist__"
5728+
if self.handler_dynamism == "dynamic"
5729+
else "my_late_registered_signal"
5730+
if self.handler_registration == "late_registered"
5731+
else UnfinishedHandlersOnWorkflowTerminationWorkflow.my_signal
55865732
)
5733+
await handle.signal(signal_method)
55875734

55885735
async with new_worker(
55895736
self.client,
@@ -5593,7 +5740,7 @@ async def _run_workflow_and_get_warning(self) -> bool:
55935740
with pytest.WarningsRecorder() as warnings:
55945741
if self.handler_type == "update":
55955742
assert update_task
5596-
if self.wait_all_handlers_finished:
5743+
if self.handler_waiting == "wait_all_handlers_finish":
55975744
await update_task
55985745
else:
55995746
with pytest.raises(RPCError) as update_err:

0 commit comments

Comments
 (0)