Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
227 changes: 119 additions & 108 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -1509,32 +1509,34 @@ authentication, modifying connection parameters, or adding custom behavior durin
Here's an example of a client plugin that adds custom authentication:

```python
from temporalio.client import Plugin, ClientConfig
from temporalio.client import LowLevelPlugin, ClientConfig
import temporalio.service

class AuthenticationPlugin(Plugin):
def __init__(self, api_key: str):
self.api_key = api_key

def init_client_plugin(self, next: Plugin) -> None:
self.next_client_plugin = next

def configure_client(self, config: ClientConfig) -> ClientConfig:
# Modify client configuration
config["namespace"] = "my-secure-namespace"
return self.next_client_plugin.configure_client(config)

async def connect_service_client(
self, config: temporalio.service.ConnectConfig
) -> temporalio.service.ServiceClient:
# Add authentication to the connection
config.api_key = self.api_key
return await self.next_client_plugin.connect_service_client(config)

class AuthenticationPlugin(LowLevelPlugin):
def __init__(self, api_key: str):
self.api_key = api_key

def init_client_plugin(self, next: LowLevelPlugin) -> None:
self.next_client_plugin = next

def configure_client(self, config: ClientConfig) -> ClientConfig:
# Modify client configuration
config["namespace"] = "my-secure-namespace"
return self.next_client_plugin.configure_client(config)

async def connect_service_client(
self, config: temporalio.service.ConnectConfig
) -> temporalio.service.ServiceClient:
# Add authentication to the connection
config.api_key = self.api_key
return await self.next_client_plugin.connect_service_client(config)


# Use the plugin when connecting
client = await Client.connect(
"my-server.com:7233",
plugins=[AuthenticationPlugin("my-api-key")]
"my-server.com:7233",
plugins=[AuthenticationPlugin("my-api-key")]
)
```

Expand All @@ -1551,53 +1553,59 @@ Here's an example of a worker plugin that adds custom monitoring:
import temporalio
from contextlib import asynccontextmanager
from typing import AsyncIterator
from temporalio.worker import Plugin, WorkerConfig, ReplayerConfig, Worker, Replayer, WorkflowReplayResult
from temporalio.worker import LowLevelPlugin, WorkerConfig, ReplayerConfig, Worker, Replayer, WorkflowReplayResult
import logging

class MonitoringPlugin(Plugin):
def __init__(self):
self.logger = logging.getLogger(__name__)

def init_worker_plugin(self, next: Plugin) -> None:
self.next_worker_plugin = next

def configure_worker(self, config: WorkerConfig) -> WorkerConfig:
# Modify worker configuration
original_task_queue = config["task_queue"]
config["task_queue"] = f"monitored-{original_task_queue}"
self.logger.info(f"Worker created for task queue: {config['task_queue']}")
return self.next_worker_plugin.configure_worker(config)

async def run_worker(self, worker: Worker) -> None:
self.logger.info("Starting worker execution")
try:
await self.next_worker_plugin.run_worker(worker)
finally:
self.logger.info("Worker execution completed")

def configure_replayer(self, config: ReplayerConfig) -> ReplayerConfig:
return self.next_worker_plugin.configure_replayer(config)

@asynccontextmanager
async def run_replayer(
self,
replayer: Replayer,
histories: AsyncIterator[temporalio.client.WorkflowHistory],
) -> AsyncIterator[AsyncIterator[WorkflowReplayResult]]:
self.logger.info("Starting replay execution")
try:
async with self.next_worker_plugin.run_replayer(replayer, histories) as results:
yield results
finally:
self.logger.info("Replay execution completed")

class MonitoringPlugin(LowLevelPlugin):
def __init__(self):
self.logger = logging.getLogger(__name__)

def init_worker_plugin(self, next: LowLevelPlugin) -> None:
self.next_worker_plugin = next

def configure_worker(self, config: WorkerConfig) -> WorkerConfig:
# Modify worker configuration
original_task_queue = config["task_queue"]
config["task_queue"] = f"monitored-{original_task_queue}"
self.logger.info(f"Worker created for task queue: {config['task_queue']}")
return self.next_worker_plugin.configure_worker(config)

async def run_worker(self, worker: Worker) -> None:
self.logger.info("Starting worker execution")
try:
await self.next_worker_plugin.run_worker(worker)
finally:
self.logger.info("Worker execution completed")


def configure_replayer(self, config: ReplayerConfig) -> ReplayerConfig:
return self.next_worker_plugin.configure_replayer(config)


@asynccontextmanager


async def run_replayer(
self,
replayer: Replayer,
histories: AsyncIterator[temporalio.client.WorkflowHistory],
) -> AsyncIterator[AsyncIterator[WorkflowReplayResult]]:
self.logger.info("Starting replay execution")
try:
async with self.next_worker_plugin.run_replayer(replayer, histories) as results:
yield results
finally:
self.logger.info("Replay execution completed")


# Use the plugin when creating a worker
worker = Worker(
client,
task_queue="my-task-queue",
workflows=[MyWorkflow],
activities=[my_activity],
plugins=[MonitoringPlugin()]
client,
task_queue="my-task-queue",
workflows=[MyWorkflow],
activities=[my_activity],
plugins=[MonitoringPlugin()]
)
```

Expand All @@ -1607,60 +1615,63 @@ For plugins that need to work with both clients and workers, you can implement b
import temporalio
from contextlib import AbstractAsyncContextManager
from typing import AsyncIterator
from temporalio.client import Plugin as ClientPlugin, ClientConfig
from temporalio.worker import Plugin as WorkerPlugin, WorkerConfig, ReplayerConfig, Worker, Replayer, WorkflowReplayResult
from temporalio.client import LowLevelPlugin as ClientPlugin, ClientConfig
from temporalio.worker import LowLevelPlugin as WorkerPlugin, WorkerConfig, ReplayerConfig, Worker, Replayer,

WorkflowReplayResult


class UnifiedPlugin(ClientPlugin, WorkerPlugin):
def init_client_plugin(self, next: ClientPlugin) -> None:
self.next_client_plugin = next

def init_worker_plugin(self, next: WorkerPlugin) -> None:
self.next_worker_plugin = next

def configure_client(self, config: ClientConfig) -> ClientConfig:
# Client-side customization
config["data_converter"] = pydantic_data_converter
return self.next_client_plugin.configure_client(config)

async def connect_service_client(
self, config: temporalio.service.ConnectConfig
) -> temporalio.service.ServiceClient:
# Add authentication to the connection
config.api_key = self.api_key
return await self.next_client_plugin.connect_service_client(config)

def configure_worker(self, config: WorkerConfig) -> WorkerConfig:
# Worker-side customization
return self.next_worker_plugin.configure_worker(config)

async def run_worker(self, worker: Worker) -> None:
print("Starting unified worker")
await self.next_worker_plugin.run_worker(worker)

def configure_replayer(self, config: ReplayerConfig) -> ReplayerConfig:
config["data_converter"] = pydantic_data_converter
return config

async def run_replayer(
self,
replayer: Replayer,
histories: AsyncIterator[temporalio.client.WorkflowHistory],
) -> AbstractAsyncContextManager[AsyncIterator[WorkflowReplayResult]]:
return self.next_worker_plugin.run_replayer(replayer, histories)

def init_client_plugin(self, next: ClientPlugin) -> None:
self.next_client_plugin = next

def init_worker_plugin(self, next: WorkerPlugin) -> None:
self.next_worker_plugin = next

def configure_client(self, config: ClientConfig) -> ClientConfig:
# Client-side customization
config["data_converter"] = pydantic_data_converter
return self.next_client_plugin.configure_client(config)

async def connect_service_client(
self, config: temporalio.service.ConnectConfig
) -> temporalio.service.ServiceClient:
# Add authentication to the connection
config.api_key = self.api_key
return await self.next_client_plugin.connect_service_client(config)

def configure_worker(self, config: WorkerConfig) -> WorkerConfig:
# Worker-side customization
return self.next_worker_plugin.configure_worker(config)

async def run_worker(self, worker: Worker) -> None:
print("Starting unified worker")
await self.next_worker_plugin.run_worker(worker)

def configure_replayer(self, config: ReplayerConfig) -> ReplayerConfig:
config["data_converter"] = pydantic_data_converter
return config

async def run_replayer(
self,
replayer: Replayer,
histories: AsyncIterator[temporalio.client.WorkflowHistory],
) -> AbstractAsyncContextManager[AsyncIterator[WorkflowReplayResult]]:
return self.next_worker_plugin.run_replayer(replayer, histories)


# Create client with the unified plugin
client = await Client.connect(
"localhost:7233",
plugins=[UnifiedPlugin()]
"localhost:7233",
plugins=[UnifiedPlugin()]
)

# Worker will automatically inherit the plugin from the client
worker = Worker(
client,
task_queue="my-task-queue",
workflows=[MyWorkflow],
activities=[my_activity]
client,
task_queue="my-task-queue",
workflows=[MyWorkflow],
activities=[my_activity]
)
```

Expand Down
18 changes: 9 additions & 9 deletions temporalio/client.py
Original file line number Diff line number Diff line change
Expand Up @@ -109,7 +109,7 @@ async def connect(
namespace: str = "default",
api_key: Optional[str] = None,
data_converter: temporalio.converter.DataConverter = temporalio.converter.DataConverter.default,
plugins: Sequence[Plugin] = [],
plugins: Sequence[LowLevelPlugin] = [],
interceptors: Sequence[Interceptor] = [],
default_workflow_query_reject_condition: Optional[
temporalio.common.QueryRejectCondition
Expand Down Expand Up @@ -190,7 +190,7 @@ async def connect(
http_connect_proxy_config=http_connect_proxy_config,
)

root_plugin: Plugin = _RootPlugin()
root_plugin: LowLevelPlugin = _RootPlugin()
for plugin in reversed(plugins):
plugin.init_client_plugin(root_plugin)
root_plugin = plugin
Expand All @@ -213,7 +213,7 @@ def __init__(
*,
namespace: str = "default",
data_converter: temporalio.converter.DataConverter = temporalio.converter.DataConverter.default,
plugins: Sequence[Plugin] = [],
plugins: Sequence[LowLevelPlugin] = [],
interceptors: Sequence[Interceptor] = [],
default_workflow_query_reject_condition: Optional[
temporalio.common.QueryRejectCondition
Expand All @@ -235,7 +235,7 @@ def __init__(
plugins=plugins,
)

root_plugin: Plugin = _RootPlugin()
root_plugin: LowLevelPlugin = _RootPlugin()
for plugin in reversed(plugins):
plugin.init_client_plugin(root_plugin)
root_plugin = plugin
Expand Down Expand Up @@ -1540,7 +1540,7 @@ class ClientConfig(TypedDict, total=False):
Optional[temporalio.common.QueryRejectCondition]
]
header_codec_behavior: Required[HeaderCodecBehavior]
plugins: Required[Sequence[Plugin]]
plugins: Required[Sequence[LowLevelPlugin]]


class WorkflowHistoryEventFilterType(IntEnum):
Expand Down Expand Up @@ -7367,7 +7367,7 @@ async def _decode_user_metadata(
)


class Plugin(abc.ABC):
class LowLevelPlugin(abc.ABC):
"""Base class for client plugins that can intercept and modify client behavior.

Plugins allow customization of client creation and service connection processes
Expand All @@ -7387,7 +7387,7 @@ def name(self) -> str:
return type(self).__module__ + "." + type(self).__qualname__

@abstractmethod
def init_client_plugin(self, next: Plugin) -> None:
def init_client_plugin(self, next: LowLevelPlugin) -> None:
"""Initialize this plugin in the plugin chain.

This method sets up the chain of responsibility pattern by providing a reference
Expand Down Expand Up @@ -7433,8 +7433,8 @@ async def connect_service_client(
"""


class _RootPlugin(Plugin):
def init_client_plugin(self, next: Plugin) -> None:
class _RootPlugin(LowLevelPlugin):
def init_client_plugin(self, next: LowLevelPlugin) -> None:
raise NotImplementedError()

def configure_client(self, config: ClientConfig) -> ClientConfig:
Expand Down
Loading
Loading