Skip to content

Commit dc8a3d1

Browse files
committed
Added retry policy to activity info
1 parent 7f228d8 commit dc8a3d1

File tree

5 files changed

+23
-6
lines changed

5 files changed

+23
-6
lines changed

temporalio/activity.py

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -124,6 +124,7 @@ class Info:
124124
workflow_run_id: str
125125
workflow_type: str
126126
priority: temporalio.common.Priority
127+
retry_policy: Optional[temporalio.common.RetryPolicy]
127128
# TODO(cretz): Consider putting identity on here for "worker_id" for logger?
128129

129130
def _logger_details(self) -> Mapping[str, Any]:

temporalio/testing/_activity.py

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -41,6 +41,7 @@
4141
workflow_run_id="test-run",
4242
workflow_type="test",
4343
priority=temporalio.common.Priority.default,
44+
retry_policy=None,
4445
)
4546

4647

temporalio/worker/_activity.py

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -540,6 +540,9 @@ async def _execute_activity(
540540
workflow_run_id=start.workflow_execution.run_id,
541541
workflow_type=start.workflow_type,
542542
priority=temporalio.common.Priority._from_proto(start.priority),
543+
retry_policy=temporalio.common.RetryPolicy.from_proto(start.retry_policy)
544+
if start.HasField("retry_policy")
545+
else None,
543546
)
544547

545548
if self._encode_headers and self._data_converter.payload_codec is not None:

tests/helpers/worker.py

Lines changed: 16 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -140,12 +140,9 @@ def signal_handler(arg: Optional[Any] = None) -> None:
140140
opt = action.execute_activity
141141
config = workflow.ActivityConfig(
142142
task_queue=opt.task_queue,
143-
retry_policy=RetryPolicy(
144-
initial_interval=timedelta(milliseconds=1),
145-
backoff_coefficient=1.01,
146-
maximum_interval=timedelta(milliseconds=2),
147-
maximum_attempts=opt.retry_max_attempts or 1,
148-
non_retryable_error_types=opt.non_retryable_error_types or [],
143+
retry_policy=kitchen_sink_retry_policy(
144+
maximum_attempts=opt.retry_max_attempts,
145+
non_retryable_error_types=opt.non_retryable_error_types,
149146
),
150147
)
151148
if opt.schedule_to_close_timeout_ms:
@@ -207,6 +204,19 @@ async def run_activity(index: int) -> None:
207204
return False, None
208205

209206

207+
def kitchen_sink_retry_policy(
208+
maximum_attempts: Optional[int] = None,
209+
non_retryable_error_types: Optional[Sequence[str]] = None,
210+
) -> RetryPolicy:
211+
return RetryPolicy(
212+
initial_interval=timedelta(milliseconds=1),
213+
backoff_coefficient=1.01,
214+
maximum_interval=timedelta(milliseconds=2),
215+
maximum_attempts=maximum_attempts or 1,
216+
non_retryable_error_types=non_retryable_error_types,
217+
)
218+
219+
210220
async def cancel_after(task: asyncio.Task, after: float) -> None:
211221
await asyncio.sleep(after)
212222
task.cancel()

tests/worker/test_activity.py

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -44,6 +44,7 @@
4444
KSAction,
4545
KSExecuteActivityAction,
4646
KSWorkflowParams,
47+
kitchen_sink_retry_policy,
4748
)
4849

4950
# Passing through because Python 3.9 has an import bug at
@@ -183,6 +184,7 @@ async def capture_info() -> None:
183184
assert info.workflow_namespace == client.namespace
184185
assert info.workflow_run_id == result.handle.first_execution_run_id
185186
assert info.workflow_type == "kitchen_sink"
187+
assert info.retry_policy == kitchen_sink_retry_policy()
186188

187189

188190
async def test_sync_activity_thread(client: Client, worker: ExternalWorker):

0 commit comments

Comments
 (0)