From e7efa62e09453c523d4eb310af1b83b09b2a5886 Mon Sep 17 00:00:00 2001 From: Adam Johnson Date: Thu, 6 Jun 2024 09:00:54 +0200 Subject: [PATCH] Tidy DEP 14 a bit co-authored-by: Peter Law --- accepted/0014-background-workers.rst | 322 +++++++++++++-------------- 1 file changed, 160 insertions(+), 162 deletions(-) diff --git a/accepted/0014-background-workers.rst b/accepted/0014-background-workers.rst index 5626beb8..82a3abf0 100644 --- a/accepted/0014-background-workers.rst +++ b/accepted/0014-background-workers.rst @@ -40,50 +40,49 @@ A backend will be a class which extends a Django-defined base class, and provide .. code:: python - from datetime import datetime - from typing import Callable, Dict, List - - from django.tasks import Task, TaskResult - from django.tasks.backends.base import BaseTaskBackend - - - class MyBackend(BaseTaskBackend): - task_class = Task - - def __init__(self, settings_dict: Dict): - """ - Any connections which need to be setup can be done here - """ - super().__init__(settings_dict) - - @classmethod - def validate_task(cls, task: Task) -> None: - """ - Determine whether the provided task is one which can be executed by the backend. - """ - ... - - def enqueue(self, task: Task, *args, **kwargs) -> TaskResult: - """ - Queue up a task to be executed - """ - ... - - def get_result(self, result_id: str) -> TaskResult: - """ - Retrieve a result by its id (if one exists). - If one doesn't, raises ResultDoesNotExist. - """ - ... - - def close(self) -> None: - """ - Close any connections opened as part of the constructor - """ - ... - - -``BaseTaskBackend`` will provide ``a``-prefixed stubs for ``enqueue`` and ``get_result`` using ``asgiref.sync_to_async``. + from datetime import datetime + from typing import Callable + + from django.tasks import Task, TaskResult + from django.tasks.backends.base import BaseTaskBackend + + + class MyBackend(BaseTaskBackend): + task_class = Task + + def __init__(self, settings_dict: dict[str, Any]) -> None: + """ + Any connections which need to be setup can be done here + """ + super().__init__(settings_dict) + + @classmethod + def validate_task(cls, task: Task) -> None: + """ + Determine whether the provided task is one which can be executed by the backend. + """ + ... + + def enqueue(self, task: Task, *args, **kwargs) -> TaskResult: + """ + Queue up a task to be executed + """ + ... + + def get_result(self, result_id: str) -> TaskResult: + """ + Retrieve a result by its id (if one exists). + If one doesn't, raises ResultDoesNotExist. + """ + ... + + def close(self) -> None: + """ + Close any connections opened as part of the constructor + """ + ... + +``BaseTaskBackend`` will provide asynchronous, ``a``-prefixed versions of ``enqueue`` and ``get_result`` using ``asgiref.sync_to_async``. ``validate_task`` determines whether the provided ``Task`` is valid for the backend. This can be used to prevent coroutines from being executed, or otherwise validate the callable. If the provided task is invalid, it will raise ``InvalidTaskError``. @@ -109,62 +108,62 @@ Backend implementors aren't required to implement their own ``Task``, but may fo .. code:: python - from datetime import datetime - from typing import Callable, Self + from datetime import datetime + from typing import Callable, Self - from django.tasks import Task, TaskResult + from django.tasks import Task, TaskResult - class MyBackendTask(Task): - priority: int | None - """The priority of the task""" + class MyBackendTask(Task): + priority: int | None + """The priority of the task""" - func: Callable - """The task function""" + func: Callable + """The task function""" - queue_name: str | None - """The name of the queue the task will run on """ + queue_name: str | None + """The name of the queue the task will run on """ - backend: str - """The name of the backend the task will run on""" + backend: str + """The name of the backend the task will run on""" - run_after: datetime | None - """The earliest this task will run""" + run_after: datetime | None + """The earliest this task will run""" - def using(self, priority: int | None = None, queue_name: str | None = None, run_after: datetime | timedelta | None = None) -> Self: - """ - Create a new task with modified defaults - """ - ... + def using(self, priority: int | None = None, queue_name: str | None = None, run_after: datetime | timedelta | None = None) -> Self: + """ + Create a new task with modified defaults + """ + ... - def enqueue(self, *args, **kwargs) -> TaskResult: - """ - Queue up the task to be executed - """ - ... + def enqueue(self, *args, **kwargs) -> TaskResult: + """ + Queue up the task to be executed + """ + ... - def get_result(self, result_id: str) -> Self: - """ - Retrieve a result for a task of this type by its id (if one exists). - If one doesn't, or is the wrong type, raises ResultDoesNotExist. - """ - ... + def get_result(self, result_id: str) -> Self: + """ + Retrieve a result for a task of this type by its id (if one exists). + If one doesn't, or is the wrong type, raises ResultDoesNotExist. + """ + ... A ``Task`` is created by decorating a function with ``@task``: .. code:: python - from django.tasks import task + from django.tasks import task - @task() - def do_a_task(*args, **kwargs): - pass + @task() + def do_a_task(*args, **kwargs): + pass -A ``Task`` can only be created for globally-importable callables. The task will be validated against the backend's ``validate_task`` during construction. +A ``Task`` can only be created for module-level callables, so that they can be re-imported in the task runner. The task will be validated against the backend's ``validate_task`` during construction. If a task doesn't define a backend, it is assumed it will only use the default backend. -``@task`` may be used on functions or coroutines. It will be up to the backend implementor to determine whether coroutines are supported. Support for coroutine tasks can be determined with the ``supports_coroutine_tasks`` method on the backend. In either case, the function must be globally importable. +``@task`` may be used on functions or coroutines. It will be up to the backend to determine whether coroutines are supported. Support for coroutine tasks can be determined with the ``supports_coroutine_tasks`` method on the backend. Task arguments must be JSON serializable, to avoid compatibility and versioning issues. Complex arguments should be converted to a format which is JSON-serializable. @@ -181,41 +180,40 @@ Backend implementors aren't required to implement their own ``TaskResult``, but .. code:: python - from datetime import datetime - from typing import Any, Callable - - from django.tasks import TaskResult, ResultStatus, Task + from datetime import datetime + from typing import Any, Callable - class MyBackendTaskResult(TaskResult): - task: Task - """The task for which this is a result""" + from django.tasks import TaskResult, ResultStatus, Task - id: str - """A unique identifier for the task result""" + class MyBackendTaskResult(TaskResult): + task: Task + """The task for which this is a result""" - status: ResultStatus - """The status of the running task""" + id: str + """A unique identifier for the task result""" - args: list - """The arguments to pass to the task function""" + status: ResultStatus + """The status of the running task""" - kwargs: dict - """The keyword arguments to pass to the task function""" + args: tuple[Any, ...] + """The arguments to pass to the task function""" - backend: str - """The name of the backend the task will run on""" + kwargs: dict[str, Any] + """The keyword arguments to pass to the task function""" - result: Any - """The return value from the task""" + backend: str + """The name of the backend the task will run on""" - def refresh(self) -> None: - """ - Reload the cached task data from the task store - """ - ... + result: Any + """The return value from the task""" + def refresh(self) -> None: + """ + Reload the cached task data from the task store + """ + ... -A ``TaskResult`` will cache its values, relying on the user calling ``refresh`` to reload the values from the task store. An ``async`` version of ``refresh`` is automatically provided by ``TaskResult`` using ``asgiref.sync_to_async``. +A ``TaskResult`` will cache its values, relying on the user calling ``refresh`` to reload the values from the task store. An asynchronous version of ``refresh`` is automatically provided by ``TaskResult`` using ``asgiref.sync_to_async``. A ``TaskResult``'s ``status`` must be one of the following values (as defined by an ``enum``): @@ -226,18 +224,18 @@ A ``TaskResult``'s ``status`` must be one of the following values (as defined by If a backend supports more than these statuses, it should compress them into one of these. -For convenience, calling a ``TaskResult`` will execute the task's function directly, which allows for graceful transitioning towards background tasks: +For convenience, calling a ``Task`` will execute the task's function directly, which allows for graceful transitioning towards background tasks: .. code:: python - from django.tasks import task + from django.tasks import task - @task() - def do_a_task(*args, **kwargs): - pass + @task() + def do_a_task(*args, **kwargs): + pass - # Calls `do_a_task` as if it weren't a task - do_a_task() + # Calls `do_a_task` as if it weren't a task + do_a_task() Queueing tasks -------------- @@ -246,45 +244,45 @@ Tasks can be queued using the ``enqueue`` method, which in turn calls ``enqueue` .. code:: python - from django.tasks import task + from django.tasks import task - @task(priority=1) - def do_a_task(*args, **kwargs): - pass + @task(priority=1) + def do_a_task(*args, **kwargs): + pass - # Submit the task function to be run - result = do_a_task.enqueue() + # Submit the task function to be run + result = do_a_task.enqueue() - # Optionally, provide arguments - result = do_a_task.enqueue(1, two="three") + # Optionally, provide arguments + result = do_a_task.enqueue(1, two="three") - # Override the priority defined by the `Task` - result = do_a_task.using(priority=10).enqueue() + # Override the priority defined by the `Task` + result = do_a_task.using(priority=10).enqueue() - # The modified task can be saved and reused - do_a_high_priority_task = do_a_task.using(priority=20) - for i in range(5): - do_a_high_priority_task.enqueue(i) + # The modified task can be saved and reused + do_a_high_priority_task = do_a_task.using(priority=20) + for i in range(5): + do_a_high_priority_task.enqueue(i) When multiple task backends are configured, each can be obtained from a global ``tasks`` connection handler. Whilst it's unlikely multiple backends will be configured for a single project, support is available. .. code:: python - from django.tasks import tasks, task + from django.tasks import tasks, task - @task() - def do_a_task(*args, **kwargs): - pass + @task() + def do_a_task(*args, **kwargs): + pass - # Submit the task function to be run - result = tasks["special"].enqueue(do_a_task) + # Submit the task function to be run + result = tasks["special"].enqueue(do_a_task) - # Optionally, provide arguments - result = tasks["special"].enqueue(do_a_task, 1, two="three") + # Optionally, provide arguments + result = tasks["special"].enqueue(do_a_task, 1, two="three") - # Alternatively - result = do_a_task.using(backend="special").enqueue(1, two="three") + # Alternatively + result = do_a_task.using(backend="special").enqueue(1, two="three") Whilst this API is available, it's best to call ``enqueue`` on the ``Task`` directly instead and configure the backend using the ``backend`` argument. @@ -297,14 +295,14 @@ Tasks may also be "deferred" to run at a specific time in the future, by passing .. code:: python - from django.utils import timezone - from datetime import timedelta + from django.utils import timezone + from datetime import timedelta - # Run the task at a specific time. - result = do_a_task.using(run_after=timezone.now() + timedelta(minutes=5)).enqueue() + # Run the task at a specific time. + result = do_a_task.using(run_after=timezone.now() + timedelta(minutes=5)).enqueue() - # Or, pass the `timedelta` directly. - result = do_a_task.using(run_after=timedelta(minutes=5)).enqueue() + # Or, pass the `timedelta` directly. + result = do_a_task.using(run_after=timedelta(minutes=5)).enqueue() ``run_after`` must be a ``timedelta`` or timezone-aware ``datetime``. @@ -317,43 +315,43 @@ One of the easiest and most common places that offloading work to the background Django will ship with an additional task-based SMTP email backend, configured identically to the existing SMTP backend. The other backends included with Django don't benefit from being moved to the background. -Async tasks ------------ +Asynchronous tasks +------------------ -Where the underlying task runner supports it, backends may also provide an ``async``-compatible interface for task queueing, using ``a``-prefixed methods: +Backends may also provide an asynchronous interface for task enqueueing, using ``a``-prefixed methods: .. code:: python - await do_a_task.aenqueue() - await do_a_task.using(priority=10).aenqueue() + await do_a_task.aenqueue() + await do_a_task.using(priority=10).aenqueue() -Similarly, a backend may support queueing an async task function: +Similarly, backends may support enqueueing coroutines: .. code:: python - from django.tasks import task + from django.tasks import task - @task() - async def do_an_async_task(): - pass + @task() + async def do_an_async_task(): + pass - await do_an_async_task.aenqueue() + await do_an_async_task.aenqueue() - # Also works - do_an_async_task.enqueue() + # Also works + do_an_async_task.enqueue() Settings --------- .. code:: python - TASKS = { - "default": { - "BACKEND": "django.tasks.backends.ImmediateBackend", - "QUEUES": [] - "OPTIONS": {} - } - } + TASKS = { + "default": { + "BACKEND": "django.tasks.backends.ImmediateBackend", + "QUEUES": [] + "OPTIONS": {} + } + } ``QUEUES`` contains a list of valid queue names for the backend. If a task is queued to a queue which doesn't exist, an exception is raised. If omitted or empty, any name is valid.