Skip to content

Commit bd5a404

Browse files
committed
Plumb worker task types to lang
1 parent 0c0b4b3 commit bd5a404

File tree

6 files changed

+47
-16
lines changed

6 files changed

+47
-16
lines changed

temporalio/bridge/src/worker.rs

Lines changed: 19 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -54,7 +54,7 @@ pub struct WorkerConfig {
5454
workflow_task_poller_behavior: PollerBehavior,
5555
nonsticky_to_sticky_poll_ratio: f32,
5656
activity_task_poller_behavior: PollerBehavior,
57-
no_remote_activities: bool,
57+
task_types: WorkerTaskTypes,
5858
sticky_queue_schedule_to_start_timeout_millis: u64,
5959
max_heartbeat_throttle_interval_millis: u64,
6060
default_heartbeat_throttle_interval_millis: u64,
@@ -175,6 +175,23 @@ pub struct ResourceBasedSlotSupplier {
175175
tuner_config: ResourceBasedTunerConfig,
176176
}
177177

178+
#[derive(FromPyObject)]
179+
pub struct WorkerTaskTypes {
180+
enable_workflows: bool,
181+
enable_activities: bool,
182+
enable_nexus: bool,
183+
}
184+
185+
impl From<&WorkerTaskTypes> for temporalio_common::worker::WorkerTaskTypes {
186+
fn from(t: &WorkerTaskTypes) -> Self {
187+
Self {
188+
enable_workflows: t.enable_workflows,
189+
enable_activities: t.enable_activities,
190+
enable_nexus: t.enable_nexus,
191+
}
192+
}
193+
}
194+
178195
#[pyclass]
179196
pub struct SlotReserveCtx {
180197
#[pyo3(get)]
@@ -692,7 +709,7 @@ fn convert_worker_config(
692709
.tuner(Arc::new(converted_tuner))
693710
.nonsticky_to_sticky_poll_ratio(conf.nonsticky_to_sticky_poll_ratio)
694711
.activity_task_poller_behavior(conf.activity_task_poller_behavior)
695-
.no_remote_activities(conf.no_remote_activities)
712+
.task_types(&conf.task_types)
696713
.sticky_queue_schedule_to_start_timeout(Duration::from_millis(
697714
conf.sticky_queue_schedule_to_start_timeout_millis,
698715
))

temporalio/bridge/worker.py

Lines changed: 10 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -54,7 +54,7 @@ class WorkerConfig:
5454
workflow_task_poller_behavior: PollerBehavior
5555
nonsticky_to_sticky_poll_ratio: float
5656
activity_task_poller_behavior: PollerBehavior
57-
no_remote_activities: bool
57+
task_types: WorkerTaskTypes
5858
sticky_queue_schedule_to_start_timeout_millis: int
5959
max_heartbeat_throttle_interval_millis: int
6060
default_heartbeat_throttle_interval_millis: int
@@ -170,6 +170,15 @@ class TunerHolder:
170170
nexus_slot_supplier: SlotSupplier
171171

172172

173+
@dataclass
174+
class WorkerTaskTypes:
175+
"""Python representation of the Rust struct for worker task types"""
176+
177+
enable_workflows: bool
178+
enable_activities: bool
179+
enable_nexus: bool
180+
181+
173182
class Worker:
174183
"""SDK Core worker."""
175184

temporalio/runtime.py

Lines changed: 9 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -140,7 +140,15 @@ def formatted(self) -> str:
140140
"""Return a formatted form of this filter."""
141141
# We intentionally aren't using __str__ or __format__ so they can keep
142142
# their original dataclass impls
143-
return f"{self.other_level},temporalio_sdk_core={self.core_level},temporalio_client={self.core_level},temporalio_sdk={self.core_level}"
143+
targets = [
144+
"temporal_sdk_core",
145+
"temporal_sdk_bridge",
146+
"temporal_client",
147+
"temporalio_sdk",
148+
]
149+
parts = [self.other_level]
150+
parts.extend(f"{target}={self.core_level}" for target in targets)
151+
return ",".join(parts)
144152

145153

146154
@dataclass(frozen=True)

temporalio/worker/_replayer.py

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -9,6 +9,7 @@
99
from dataclasses import dataclass
1010
from typing import AsyncIterator, Dict, Mapping, Optional, Sequence, Type
1111

12+
from temporalio.bridge.worker import WorkerTaskTypes
1213
from typing_extensions import TypedDict
1314

1415
import temporalio.api.history.v1
@@ -273,7 +274,7 @@ def on_eviction_hook(
273274
),
274275
),
275276
nonsticky_to_sticky_poll_ratio=1,
276-
no_remote_activities=True,
277+
task_types=WorkerTaskTypes(True, False, False),
277278
sticky_queue_schedule_to_start_timeout_millis=1000,
278279
max_heartbeat_throttle_interval_millis=1000,
279280
default_heartbeat_throttle_interval_millis=1000,

temporalio/worker/_worker.py

Lines changed: 6 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -22,6 +22,7 @@
2222
cast,
2323
)
2424

25+
from temporalio.bridge.worker import WorkerTaskTypes
2526
from typing_extensions import TypeAlias, TypedDict
2627

2728
import temporalio.bridge.worker
@@ -124,7 +125,6 @@ def __init__(
124125
max_concurrent_workflow_task_polls: Optional[int] = None,
125126
nonsticky_to_sticky_poll_ratio: float = 0.2,
126127
max_concurrent_activity_task_polls: Optional[int] = None,
127-
no_remote_activities: bool = False,
128128
sticky_queue_schedule_to_start_timeout: timedelta = timedelta(seconds=10),
129129
max_heartbeat_throttle_interval: timedelta = timedelta(seconds=60),
130130
default_heartbeat_throttle_interval: timedelta = timedelta(seconds=30),
@@ -251,8 +251,6 @@ def __init__(
251251
If set, will override any value passed to ``activity_task_poller_behavior``.
252252
253253
WARNING: Deprecated, use ``activity_task_poller_behavior`` instead
254-
no_remote_activities: If true, this worker will only handle workflow
255-
tasks and local activities, it will not poll for activity tasks.
256254
sticky_queue_schedule_to_start_timeout: How long a workflow task is
257255
allowed to sit on the sticky queue before it is timed out and
258256
moved to the non-sticky queue where it may be picked up by any
@@ -346,7 +344,6 @@ def __init__(
346344
max_concurrent_workflow_task_polls=max_concurrent_workflow_task_polls,
347345
nonsticky_to_sticky_poll_ratio=nonsticky_to_sticky_poll_ratio,
348346
max_concurrent_activity_task_polls=max_concurrent_activity_task_polls,
349-
no_remote_activities=no_remote_activities,
350347
sticky_queue_schedule_to_start_timeout=sticky_queue_schedule_to_start_timeout,
351348
max_heartbeat_throttle_interval=max_heartbeat_throttle_interval,
352349
default_heartbeat_throttle_interval=default_heartbeat_throttle_interval,
@@ -576,11 +573,11 @@ def check_activity(activity):
576573
max_cached_workflows=config["max_cached_workflows"],
577574
tuner=bridge_tuner,
578575
nonsticky_to_sticky_poll_ratio=config["nonsticky_to_sticky_poll_ratio"],
579-
# We have to disable remote activities if a user asks _or_ if we
580-
# are not running an activity worker at all. Otherwise shutdown
581-
# will not proceed properly.
582-
no_remote_activities=config["no_remote_activities"]
583-
or not config["activities"],
576+
task_types=WorkerTaskTypes(
577+
enable_workflows=self._workflow_worker is not None,
578+
enable_activities=self._activity_worker is not None,
579+
enable_nexus=self._nexus_worker is not None,
580+
),
584581
sticky_queue_schedule_to_start_timeout_millis=int(
585582
1000
586583
* config["sticky_queue_schedule_to_start_timeout"].total_seconds()
@@ -890,7 +887,6 @@ class WorkerConfig(TypedDict, total=False):
890887
max_concurrent_workflow_task_polls: Optional[int]
891888
nonsticky_to_sticky_poll_ratio: float
892889
max_concurrent_activity_task_polls: Optional[int]
893-
no_remote_activities: bool
894890
sticky_queue_schedule_to_start_timeout: timedelta
895891
max_heartbeat_throttle_interval: timedelta
896892
default_heartbeat_throttle_interval: timedelta

tests/contrib/openai_agents/test_openai.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -2615,7 +2615,7 @@ async def test_split_workers(client: Client):
26152615

26162616
# Workflow worker
26172617
async with new_worker(
2618-
workflow_client, HelloWorldAgent, no_remote_activities=True
2618+
workflow_client, HelloWorldAgent,
26192619
) as worker:
26202620
activity_plugin = openai_agents.OpenAIAgentsPlugin(
26212621
model_params=ModelActivityParameters(

0 commit comments

Comments
 (0)