Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
7 changes: 7 additions & 0 deletions temporalio/activity.py
Original file line number Diff line number Diff line change
Expand Up @@ -124,6 +124,13 @@ class Info:
workflow_run_id: str
workflow_type: str
priority: temporalio.common.Priority
retry_policy: Optional[temporalio.common.RetryPolicy]
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

How about adding a docstring here explaining that None means that it has a server-defined default retry policy (does not mean it does not retry).

Although we don't yet have docstrings on activity.Info, we do on workflow.Info, so I think it makes sense to start adding them on activity.Info.

"""The retry policy of this activity.

Note that the server may have set a different policy than the one provided when scheduling the activity.
If the value is None, it means the server didn't send information about retry policy (e.g. due to old server
version), but it may still be defined server-side."""

# TODO(cretz): Consider putting identity on here for "worker_id" for logger?

def _logger_details(self) -> Mapping[str, Any]:
Expand Down
1 change: 1 addition & 0 deletions temporalio/testing/_activity.py
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,7 @@
workflow_run_id="test-run",
workflow_type="test",
priority=temporalio.common.Priority.default,
retry_policy=None,
)


Expand Down
3 changes: 3 additions & 0 deletions temporalio/worker/_activity.py
Original file line number Diff line number Diff line change
Expand Up @@ -558,6 +558,9 @@ async def _execute_activity(
workflow_run_id=start.workflow_execution.run_id,
workflow_type=start.workflow_type,
priority=temporalio.common.Priority._from_proto(start.priority),
retry_policy=temporalio.common.RetryPolicy.from_proto(start.retry_policy)
if start.HasField("retry_policy")
else None,
)

if self._encode_headers and self._data_converter.payload_codec is not None:
Expand Down
22 changes: 16 additions & 6 deletions tests/helpers/worker.py
Original file line number Diff line number Diff line change
Expand Up @@ -140,12 +140,9 @@ def signal_handler(arg: Optional[Any] = None) -> None:
opt = action.execute_activity
config = workflow.ActivityConfig(
task_queue=opt.task_queue,
retry_policy=RetryPolicy(
initial_interval=timedelta(milliseconds=1),
backoff_coefficient=1.01,
maximum_interval=timedelta(milliseconds=2),
maximum_attempts=opt.retry_max_attempts or 1,
non_retryable_error_types=opt.non_retryable_error_types or [],
retry_policy=kitchen_sink_retry_policy(
maximum_attempts=opt.retry_max_attempts,
non_retryable_error_types=opt.non_retryable_error_types,
),
)
if opt.schedule_to_close_timeout_ms:
Expand Down Expand Up @@ -207,6 +204,19 @@ async def run_activity(index: int) -> None:
return False, None


def kitchen_sink_retry_policy(
maximum_attempts: Optional[int] = None,
non_retryable_error_types: Optional[Sequence[str]] = None,
) -> RetryPolicy:
return RetryPolicy(
initial_interval=timedelta(milliseconds=1),
backoff_coefficient=1.01,
maximum_interval=timedelta(milliseconds=2),
maximum_attempts=maximum_attempts or 1,
non_retryable_error_types=non_retryable_error_types,
)


async def cancel_after(task: asyncio.Task, after: float) -> None:
await asyncio.sleep(after)
task.cancel()
Expand Down
2 changes: 2 additions & 0 deletions tests/worker/test_activity.py
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,7 @@
KSAction,
KSExecuteActivityAction,
KSWorkflowParams,
kitchen_sink_retry_policy,
)

# Passing through because Python 3.9 has an import bug at
Expand Down Expand Up @@ -186,6 +187,7 @@ async def capture_info() -> None:
assert info.workflow_namespace == client.namespace
assert info.workflow_run_id == result.handle.first_execution_run_id
assert info.workflow_type == "kitchen_sink"
assert info.retry_policy == kitchen_sink_retry_policy()


async def test_sync_activity_thread(client: Client, worker: ExternalWorker):
Expand Down