Update Core, configure worker types, send plugin names to Core #1157
base: main
Conversation
| ) | ||
| .nexus_task_poller_behavior(conf.nexus_task_poller_behavior) | ||
| .plugins( | ||
| conf.plugins |
So, something that will need some discussion here. This PR reports worker plugins. We should discuss whether that is really what we intend. Plugins which exist only in the client and not the worker will be completely invisible. That could potentially be changed here at least for plugins in clients used by workers, though not generally for any client. I think that was something we didn't really think through when we decided to go with heartbeat as a carrier for this information, but maybe we conclude that is fine.
TypeScript will be a bit more complicated as well, given its additional plugin types.
Discussed offline, we will for now have both worker and client plugins and dedup names
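A minimal sketch of the dedup step agreed on above, assuming plugin names are plain strings and that client plugins should be listed before worker plugins. The function name `dedup_plugin_names` and the sample names are hypothetical and are not taken from this PR.

```python
from typing import Iterable, List


def dedup_plugin_names(
    client_plugin_names: Iterable[str], worker_plugin_names: Iterable[str]
) -> List[str]:
    """Merge client and worker plugin names, dropping repeats in first-seen order."""
    # A dict preserves insertion order, so it doubles as an ordered set.
    seen = dict.fromkeys(client_plugin_names)
    # Names already present keep their original position; new ones are appended.
    seen.update(dict.fromkeys(worker_plugin_names))
    return list(seen)


# Hypothetical names: "auth" is registered on both the client and the worker.
names = dedup_plugin_names(["otel", "auth"], ["auth", "metrics"])
```

Keeping first-seen order makes the reported list stable across heartbeats, which is easier to compare than an unordered set.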
Sushisource
left a comment
This is looking good to me, only thing is the default interval
Sushisource
left a comment
Nice!
…untime client identity requirement
c276572 to
e83a260
Compare
Bug: Runtime Comparison Needs Proper Instance Resolution
The runtime comparison uses identity (is not) but doesn't properly handle when bridge_client.config.runtime is None. When the original client uses the default runtime (via None), self._runtime stores the actual default runtime object, but if a new client also has runtime=None in its config, the comparison self._runtime is not None incorrectly raises an error even though both clients use the same default runtime. The comparison should resolve both sides to actual runtime instances before comparing.
temporalio/worker/_worker.py#L648-L652
sdk-python/temporalio/worker/_worker.py
Lines 648 to 652 in bd5a404
| bridge_client = _extract_bridge_client_for_worker(value) | |
| if self._runtime is not bridge_client.config.runtime: | |
| raise ValueError( | |
| "New client is not on the same runtime as the existing client" | |
| ) |
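One way to read the suggested fix is to resolve both sides to concrete runtime instances before the identity check. The sketch below uses a hypothetical stand-in `Runtime` class and hypothetical helpers (`resolve_runtime`, `check_same_runtime`), not the SDK's actual worker code, and assumes a config value of `None` means "use the shared default runtime".

```python
from typing import Optional


class Runtime:
    """Hypothetical stand-in for the SDK runtime; not the real temporalio class."""

    _default: Optional["Runtime"] = None

    @classmethod
    def default(cls) -> "Runtime":
        # Lazily create a single shared default instance.
        if cls._default is None:
            cls._default = cls()
        return cls._default


def resolve_runtime(configured: Optional[Runtime]) -> Runtime:
    """Map a config value of None to the shared default runtime."""
    return configured if configured is not None else Runtime.default()


def check_same_runtime(existing: Runtime, new_config_runtime: Optional[Runtime]) -> None:
    # Compare resolved instances, so None on the new client means "default".
    if existing is not resolve_runtime(new_config_runtime):
        raise ValueError(
            "New client is not on the same runtime as the existing client"
        )


# A worker on the default runtime accepts a new client configured with None.
check_same_runtime(Runtime.default(), None)
```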
| py: Python<'p>, | ||
| call: RpcCall, | ||
| ) -> PyResult<Bound<'p, PyAny>> { | ||
| use temporal_client::WorkflowService; |
This is a code generated file, you need to change the generator.
f56d819 to
5862a1e
Compare
| "temporalio_sdk", | ||
| ] | ||
| parts = [self.other_level] | ||
| parts.extend(f"{target}={self.core_level}" for target in targets) |
Bug: Outdated Telemetry Target Breaks Rust Log Filtering
The TelemetryFilter.formatted() method includes "temporalio_sdk" as a target, but this doesn't match any actual Rust crate name. The crates were renamed from temporal_sdk_core, temporal_client, etc. to temporalio_sdk_core, temporalio_client, and temporalio_common. The target "temporalio_sdk" will never match any log output from the Rust code, preventing those logs from being filtered at the configured level. This should likely be "temporalio_sdk_core" (which is already in the list) or removed entirely.
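For illustration, a sketch of the filter string with only the renamed crate targets. `TelemetryFilterSketch` is a hypothetical stand-in for `TelemetryFilter`; the target list is limited to the three crate names mentioned in this comment, and joining the directives with commas is an assumption based on the usual Rust tracing filter syntax.

```python
from dataclasses import dataclass


@dataclass(frozen=True)
class TelemetryFilterSketch:
    """Hypothetical stand-in for TelemetryFilter; only the formatting logic is shown."""

    core_level: str
    other_level: str

    def formatted(self) -> str:
        # Crate names as listed in the review comment; the real list may differ.
        targets = [
            "temporalio_sdk_core",
            "temporalio_client",
            "temporalio_common",
        ]
        # The first directive is the default level for every other target.
        parts = [self.other_level]
        parts.extend(f"{target}={self.core_level}" for target in targets)
        # Assumption: directives are comma-separated.
        return ",".join(parts)


filter_string = TelemetryFilterSketch(core_level="INFO", other_level="ERROR").formatted()
```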
e9565ab to
6ef144a
Compare
| ) | ||
| .map_err(|err| PyRuntimeError::new_err(format!("Failed initializing telemetry: {err}")))?; | ||
| let mut core = CoreRuntime::new(runtime_options, TokioRuntimeBuilder::default()) | ||
| .map_err(|err| PyRuntimeError::new_err(format!("Failed initializing telemetry: {err}")))?; |
Pedantic, but would consider changing this error message to "Failed initializing runtime"
| nonsticky_to_sticky_poll_ratio: f32, | ||
| activity_task_poller_behavior: PollerBehavior, | ||
| no_remote_activities: bool, | ||
| task_types: WorkerTaskTypes, |
Not sure it deserves a whole new single-use Python type here vs just three booleans inlined, but not a big deal
| @property | ||
| def plugins(self) -> Sequence[Plugin]: | ||
| """Plugins used by this client.""" | ||
| return self._config["plugins"] |
I don't believe this needs to be exposed as a user-facing property. It only makes sense for some configs IMO (really obvious ones or runtime mutable ones).
| heartbeat_millis = int(worker_heartbeat_interval.total_seconds() * 1000) | ||
| if heartbeat_millis == 0: | ||
| heartbeat_millis = 1 |
Can you help me understand this a bit? Is there some case of it trunc'ing to 0 ms? What is our minimum ms we accept? Can we error if lower than that?
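A sketch of the alternative raised here: reject intervals that would truncate to 0 ms instead of silently rounding up to 1. The 1 ms minimum and the names `MIN_HEARTBEAT_INTERVAL` and `heartbeat_interval_millis` are assumptions for illustration; the minimum Core actually accepts is not stated in this thread.

```python
from datetime import timedelta

# Hypothetical lower bound; the value Core actually enforces is not given here.
MIN_HEARTBEAT_INTERVAL = timedelta(milliseconds=1)


def heartbeat_interval_millis(interval: timedelta) -> int:
    """Convert a heartbeat interval to whole milliseconds, rejecting tiny values."""
    if interval < MIN_HEARTBEAT_INTERVAL:
        raise ValueError(
            f"worker_heartbeat_interval must be at least {MIN_HEARTBEAT_INTERVAL}, "
            f"got {interval}"
        )
    # Floor division of two timedeltas yields an exact int, avoiding the float
    # rounding that int(total_seconds() * 1000) can introduce.
    return interval // timedelta(milliseconds=1)


millis = heartbeat_interval_millis(timedelta(seconds=30))
```

With this shape, a sub-millisecond value such as `timedelta(microseconds=500)` raises instead of being coerced to 1 ms.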
| if heartbeat_millis == 0: | ||
| heartbeat_millis = 1 | ||
|
|
||
| self._heartbeat_millis = heartbeat_millis |
Why is this stored on the class?
| import pytest | ||
| import pytest_asyncio | ||
|
|
||
| import temporalio.worker |
Why is this necessary?
|
|
||
| # Workflow worker | ||
| async with new_worker( | ||
| workflow_client, HelloWorldAgent, no_remote_activities=True |
Mentioned before, but no_remote_activities is an important concept to retain
| ] | ||
|
|
||
|
|
||
| async def test_worker_plugin_names_forwarded_to_core( |
I don't think this test is needed, at least not in a form that monkey patches things. If Core does not have a way to tell you what has been passed to it, then it is not assertable IMO. The same goes for every other option we pass to Core; this option is not unique.
| await assert_eventually(check_metrics) | ||
|
|
||
|
|
||
| def test_runtime_options_to_bridge_config() -> None: |
This doesn't test anything about bridge config; it's testing some internal property of a class. We usually prefer behavior testing. If Core does not have a way for you to test that the heartbeat interval is applied properly, then you can't truly test that it works. How would a user test that their heartbeat setting works? Setting an internal property only so you can assert it was multiplied by 1000 is overkill that we don't usually do.
| # Terminate both | ||
| await handle1.terminate() | ||
| await handle2.terminate() | ||
| async def test_workflow_replace_worker_client(client: Client): |
Can you provide some background on why the previous test was removed and replaced with this? We want to test that replacing a client works, not that it raises when a runtime is different (if we want the latter, that's a different test).
What was changed
Send plugin names over to core for worker heartbeating.
Updated Core to latest main, 9e9a461.
Updated test to validate replacing clients with a client from a different runtime is invalid.
Why?
Worker heartbeating
Checklist
Closes [Feature Request] Enable Worker Heartbeating #1196
How was this tested:
Added tests for plugin name propagation and runtime options configuration
Note
Forward worker plugin names to Core and add runtime options (incl. worker heartbeat), migrate to new core/client crates, and sync protobuf/RPC APIs with new endpoints.
Add RuntimeOptions (incl. worker_heartbeat_interval) and change init_runtime to accept options.
Forward plugins to Core for heartbeating; add WorkerTaskTypes.
replace_client requires same runtime; replayer/worker config updated.
Migrate temporal_client/temporal-sdk-core* to temporalio_client/temporalio_* crates; bump prost/tonic and related deps.
AuditLogSinkSpec, SetServiceAccountNamespaceAccess, ValidateAccountAuditLogSink.
DescribeWorker, SetWorkerDeploymentManager; extend start/system/namespace/deployment messages.
Written by Cursor Bugbot for commit 715fe8d.