Skip to content

Commit f843ffa

Browse files
authored
Merge branch 'main' into plugins_to_core_debug
2 parents fafdada + 5d1630d commit f843ffa

25 files changed

+547
-203
lines changed

pyproject.toml

Lines changed: 3 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,6 @@
11
[project]
22
name = "temporalio"
3-
version = "1.18.1"
3+
version = "1.19.0"
44
description = "Temporal.io Python SDK"
55
authors = [{ name = "Temporal Technologies Inc", email = "[email protected]" }]
66
requires-python = ">=3.10"
@@ -9,7 +9,7 @@ license = "MIT"
99
license-files = ["LICENSE"]
1010
keywords = ["temporal", "workflow"]
1111
dependencies = [
12-
"nexus-rpc==1.1.0",
12+
"nexus-rpc==1.2.0",
1313
"protobuf>=3.20,<7.0.0",
1414
"python-dateutil>=2.8.2,<3 ; python_version < '3.11'",
1515
"types-protobuf>=3.20",
@@ -28,10 +28,7 @@ classifiers = [
2828
grpc = ["grpcio>=1.48.2,<2"]
2929
opentelemetry = ["opentelemetry-api>=1.11.1,<2", "opentelemetry-sdk>=1.11.1,<2"]
3030
pydantic = ["pydantic>=2.0.0,<3"]
31-
openai-agents = [
32-
"openai-agents>=0.3,<0.5",
33-
"mcp>=1.9.4, <2",
34-
]
31+
openai-agents = ["openai-agents>=0.3,<0.5", "mcp>=1.9.4, <2"]
3532

3633
[project.urls]
3734
Homepage = "https://github.com/temporalio/sdk-python"

scripts/gen_bridge_client.py

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -171,6 +171,7 @@ def generate_rust_service_call(service_descriptor: ServiceDescriptor) -> str:
171171
py: Python<'p>,
172172
call: RpcCall,
173173
) -> PyResult<Bound<'p, PyAny>> {
174+
self.runtime.assert_same_process("use client")?;
174175
use temporal_client::${descriptor_name};
175176
let mut retry_client = self.retry_client.clone();
176177
self.runtime.future_into_py(py, async move {

temporalio/bridge/src/client.rs

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -93,6 +93,7 @@ pub fn connect_client<'a>(
9393
config: ClientConfig,
9494
) -> PyResult<Bound<'a, PyAny>> {
9595
let opts: ClientOptions = config.try_into()?;
96+
runtime_ref.runtime.assert_same_process("create client")?;
9697
let runtime = runtime_ref.runtime.clone();
9798
runtime_ref.runtime.future_into_py(py, async move {
9899
Ok(ClientRef {

temporalio/bridge/src/client_rpc_generated.rs

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -15,6 +15,7 @@ impl ClientRef {
1515
py: Python<'p>,
1616
call: RpcCall,
1717
) -> PyResult<Bound<'p, PyAny>> {
18+
self.runtime.assert_same_process("use client")?;
1819
use temporalio_client::WorkflowService;
1920
let mut retry_client = self.retry_client.clone();
2021
self.runtime.future_into_py(py, async move {
@@ -566,6 +567,7 @@ impl ClientRef {
566567
py: Python<'p>,
567568
call: RpcCall,
568569
) -> PyResult<Bound<'p, PyAny>> {
570+
self.runtime.assert_same_process("use client")?;
569571
use temporalio_client::OperatorService;
570572
let mut retry_client = self.retry_client.clone();
571573
self.runtime.future_into_py(py, async move {
@@ -628,6 +630,7 @@ impl ClientRef {
628630
}
629631

630632
fn call_cloud_service<'p>(&self, py: Python<'p>, call: RpcCall) -> PyResult<Bound<'p, PyAny>> {
633+
self.runtime.assert_same_process("use client")?;
631634
use temporalio_client::CloudService;
632635
let mut retry_client = self.retry_client.clone();
633636
self.runtime.future_into_py(py, async move {
@@ -842,6 +845,7 @@ impl ClientRef {
842845
}
843846

844847
fn call_test_service<'p>(&self, py: Python<'p>, call: RpcCall) -> PyResult<Bound<'p, PyAny>> {
848+
self.runtime.assert_same_process("use client")?;
845849
use temporalio_client::TestService;
846850
let mut retry_client = self.retry_client.clone();
847851
self.runtime.future_into_py(py, async move {
@@ -881,6 +885,7 @@ impl ClientRef {
881885
}
882886

883887
fn call_health_service<'p>(&self, py: Python<'p>, call: RpcCall) -> PyResult<Bound<'p, PyAny>> {
888+
self.runtime.assert_same_process("use client")?;
884889
use temporalio_client::HealthService;
885890
let mut retry_client = self.retry_client.clone();
886891
self.runtime.future_into_py(py, async move {

temporalio/bridge/src/runtime.rs

Lines changed: 15 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
use futures::channel::mpsc::Receiver;
2-
use pyo3::exceptions::{PyRuntimeError, PyValueError};
2+
use pyo3::exceptions::{PyAssertionError, PyRuntimeError, PyValueError};
33
use pyo3::prelude::*;
44
use pythonize::pythonize;
55
use std::collections::HashMap;
@@ -33,6 +33,7 @@ pub struct RuntimeRef {
3333

3434
#[derive(Clone)]
3535
pub(crate) struct Runtime {
36+
pub(crate) pid: u32,
3637
pub(crate) core: Arc<CoreRuntime>,
3738
metrics_call_buffer: Option<Arc<MetricsCallBuffer<BufferedMetricRef>>>,
3839
log_forwarder_handle: Option<Arc<JoinHandle<()>>>,
@@ -189,6 +190,7 @@ pub fn init_runtime(options: RuntimeOptions) -> PyResult<RuntimeRef> {
189190

190191
Ok(RuntimeRef {
191192
runtime: Runtime {
193+
pid: std::process::id(),
192194
core: Arc::new(core),
193195
metrics_call_buffer,
194196
log_forwarder_handle,
@@ -213,6 +215,18 @@ impl Runtime {
213215
let _guard = self.core.tokio_handle().enter();
214216
pyo3_async_runtimes::generic::future_into_py::<TokioRuntime, _, T>(py, fut)
215217
}
218+
219+
pub(crate) fn assert_same_process(&self, action: &'static str) -> PyResult<()> {
220+
let current_pid = std::process::id();
221+
if self.pid != current_pid {
222+
Err(PyAssertionError::new_err(format!(
223+
"Cannot {} across forks (original runtime PID is {}, current is {})",
224+
action, self.pid, current_pid,
225+
)))
226+
} else {
227+
Ok(())
228+
}
229+
}
216230
}
217231

218232
impl Drop for Runtime {

temporalio/bridge/src/worker.rs

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -493,6 +493,7 @@ pub fn new_worker(
493493
config: WorkerConfig,
494494
) -> PyResult<WorkerRef> {
495495
enter_sync!(runtime_ref.runtime);
496+
runtime_ref.runtime.assert_same_process("create worker")?;
496497
let event_loop_task_locals = Arc::new(OnceLock::new());
497498
let config = convert_worker_config(config, event_loop_task_locals.clone())?;
498499
let worker = temporalio_sdk_core::init_worker(
@@ -514,6 +515,9 @@ pub fn new_replay_worker<'a>(
514515
config: WorkerConfig,
515516
) -> PyResult<Bound<'a, PyTuple>> {
516517
enter_sync!(runtime_ref.runtime);
518+
runtime_ref
519+
.runtime
520+
.assert_same_process("create replay worker")?;
517521
let event_loop_task_locals = Arc::new(OnceLock::new());
518522
let config = convert_worker_config(config, event_loop_task_locals.clone())?;
519523
let (history_pusher, stream) = HistoryPusher::new(runtime_ref.runtime.clone());
@@ -538,6 +542,7 @@ pub fn new_replay_worker<'a>(
538542
#[pymethods]
539543
impl WorkerRef {
540544
fn validate<'p>(&self, py: Python<'p>) -> PyResult<Bound<PyAny, 'p>> {
545+
self.runtime.assert_same_process("use worker")?;
541546
let worker = self.worker.as_ref().unwrap().clone();
542547
// Set custom slot supplier task locals so they can run futures.
543548
// Event loop is assumed to be running at this point.
@@ -557,6 +562,7 @@ impl WorkerRef {
557562
}
558563

559564
fn poll_workflow_activation<'p>(&self, py: Python<'p>) -> PyResult<Bound<'p, PyAny>> {
565+
self.runtime.assert_same_process("use worker")?;
560566
let worker = self.worker.as_ref().unwrap().clone();
561567
self.runtime.future_into_py(py, async move {
562568
let bytes = match worker.poll_workflow_activation().await {
@@ -569,6 +575,7 @@ impl WorkerRef {
569575
}
570576

571577
fn poll_activity_task<'p>(&self, py: Python<'p>) -> PyResult<Bound<'p, PyAny>> {
578+
self.runtime.assert_same_process("use worker")?;
572579
let worker = self.worker.as_ref().unwrap().clone();
573580
self.runtime.future_into_py(py, async move {
574581
let bytes = match worker.poll_activity_task().await {
@@ -581,6 +588,7 @@ impl WorkerRef {
581588
}
582589

583590
fn poll_nexus_task<'p>(&self, py: Python<'p>) -> PyResult<Bound<'p, PyAny>> {
591+
self.runtime.assert_same_process("use worker")?;
584592
let worker = self.worker.as_ref().unwrap().clone();
585593
self.runtime.future_into_py(py, async move {
586594
let bytes = match worker.poll_nexus_task().await {

temporalio/nexus/_decorators.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -117,7 +117,7 @@ async def _start(
117117
return WorkflowRunOperationHandler(_start)
118118

119119
method_name = get_callable_name(start)
120-
nexusrpc.set_operation_definition(
120+
nexusrpc.set_operation(
121121
operation_handler_factory,
122122
nexusrpc.Operation(
123123
name=name or method_name,

temporalio/nexus/_operation_handlers.py

Lines changed: 0 additions & 19 deletions
Original file line numberDiff line numberDiff line change
@@ -10,13 +10,10 @@
1010
HandlerError,
1111
HandlerErrorType,
1212
InputT,
13-
OperationInfo,
1413
OutputT,
1514
)
1615
from nexusrpc.handler import (
1716
CancelOperationContext,
18-
FetchOperationInfoContext,
19-
FetchOperationResultContext,
2017
OperationHandler,
2118
StartOperationContext,
2219
StartOperationResultAsync,
@@ -81,22 +78,6 @@ async def cancel(self, ctx: CancelOperationContext, token: str) -> None:
8178
"""Cancel the operation, by cancelling the workflow."""
8279
await _cancel_workflow(token)
8380

84-
async def fetch_info(
85-
self, ctx: FetchOperationInfoContext, token: str
86-
) -> OperationInfo:
87-
"""Fetch operation info (not supported for Temporal Nexus operations)."""
88-
raise NotImplementedError(
89-
"Temporal Nexus operation handlers do not support fetching operation info."
90-
)
91-
92-
async def fetch_result(
93-
self, ctx: FetchOperationResultContext, token: str
94-
) -> OutputT:
95-
"""Fetch operation result (not supported for Temporal Nexus operations)."""
96-
raise NotImplementedError(
97-
"Temporal Nexus operation handlers do not support fetching the operation result."
98-
)
99-
10081

10182
async def _cancel_workflow(
10283
token: str,

temporalio/nexus/_util.py

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -129,12 +129,12 @@ def get_operation_factory(
129129
130130
``obj`` should be a decorated operation start method.
131131
"""
132-
op_defn = nexusrpc.get_operation_definition(obj)
132+
op_defn = nexusrpc.get_operation(obj)
133133
if op_defn:
134134
factory = obj
135135
else:
136136
if factory := getattr(obj, "__nexus_operation_factory__", None):
137-
op_defn = nexusrpc.get_operation_definition(factory)
137+
op_defn = nexusrpc.get_operation(factory)
138138
if not isinstance(op_defn, nexusrpc.Operation):
139139
return None, None
140140
return factory, op_defn

temporalio/runtime.py

Lines changed: 59 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -24,33 +24,82 @@
2424
import temporalio.bridge.runtime
2525
import temporalio.common
2626

27-
_default_runtime: Optional[Runtime] = None
27+
28+
class _RuntimeRef:
29+
def __init__(
30+
self,
31+
) -> None:
32+
self._default_runtime: Runtime | None = None
33+
self._prevent_default = False
34+
35+
def default(self) -> Runtime:
36+
if not self._default_runtime:
37+
if self._prevent_default:
38+
raise RuntimeError(
39+
"Cannot create default Runtime after Runtime.prevent_default has been called"
40+
)
41+
self._default_runtime = Runtime(telemetry=TelemetryConfig())
42+
self._default_created = True
43+
return self._default_runtime
44+
45+
def prevent_default(self):
46+
if self._default_runtime:
47+
raise RuntimeError(
48+
"Runtime.prevent_default called after default runtime has been created or set"
49+
)
50+
self._prevent_default = True
51+
52+
def set_default(
53+
self, runtime: Runtime, *, error_if_already_set: bool = True
54+
) -> None:
55+
if self._default_runtime and error_if_already_set:
56+
raise RuntimeError("Runtime default already set")
57+
58+
self._default_runtime = runtime
59+
60+
61+
_runtime_ref: _RuntimeRef = _RuntimeRef()
2862

2963

3064
class Runtime:
3165
"""Runtime for Temporal Python SDK.
3266
33-
Users are encouraged to use :py:meth:`default`. It can be set with
67+
Most users are encouraged to use :py:meth:`default`. It can be set with
3468
:py:meth:`set_default`. Every time a new runtime is created, a new internal
3569
thread pool is created.
3670
37-
Runtimes do not work across forks.
71+
Runtimes do not work across forks. Advanced users should consider using
72+
:py:meth:`prevent_default` and `:py:meth`set_default` to ensure each
73+
fork creates it's own runtime.
74+
3875
"""
3976

4077
@classmethod
4178
def default(cls) -> Runtime:
42-
"""Get the default runtime, creating if not already created.
79+
"""Get the default runtime, creating if not already created. If :py:meth:`prevent_default`
80+
is called before this method it will raise a RuntimeError instead of creating a default
81+
runtime.
4382
4483
If the default runtime needs to be different, it should be done with
4584
:py:meth:`set_default` before this is called or ever used.
4685
4786
Returns:
4887
The default runtime.
4988
"""
50-
global _default_runtime
51-
if not _default_runtime:
52-
_default_runtime = cls(telemetry=TelemetryConfig())
53-
return _default_runtime
89+
global _runtime_ref
90+
return _runtime_ref.default()
91+
92+
@classmethod
93+
def prevent_default(cls):
94+
"""Prevent :py:meth:`default` from lazily creating a :py:class:`Runtime`.
95+
96+
Raises a RuntimeError if a default :py:class:`Runtime` has already been created.
97+
98+
Explicitly setting a default runtime with :py:meth:`set_default` bypasses this setting and
99+
future calls to :py:meth:`default` will return the provided runtime.
100+
"""
101+
global _runtime_ref
102+
_runtime_ref.prevent_default()
54103

55104
@staticmethod
56105
def set_default(runtime: Runtime, *, error_if_already_set: bool = True) -> None:
@@ -65,10 +114,8 @@ def set_default(runtime: Runtime, *, error_if_already_set: bool = True) -> None:
65114
error_if_already_set: If True and default is already set, this will
66115
raise a RuntimeError.
67116
"""
68-
global _default_runtime
69-
if _default_runtime and error_if_already_set:
70-
raise RuntimeError("Runtime default already set")
71-
_default_runtime = runtime
117+
global _runtime_ref
118+
_runtime_ref.set_default(runtime, error_if_already_set=error_if_already_set)
72119

73120
def __init__(
74121
self,

0 commit comments

Comments
 (0)