Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
20 changes: 0 additions & 20 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -66,9 +66,6 @@ The task decorator accepts a few arguments to customize the task:
- `priority`: The priority of the task (between -100 and 100. Larger numbers are higher priority. 0 by default)
- `queue_name`: Whether to run the task on a specific queue
- `backend`: Name of the backend for this task to use (as defined in `TASKS`)
- `enqueue_on_commit`: Whether the task is enqueued when the current transaction commits successfully, or enqueued immediately. By default, this is handled by the backend (see below). `enqueue_on_commit` may not be modified with `.using`.

These attributes (besides `enqueue_on_commit`) can also be modified at run-time with `.using`:

```python
modified_task = calculate_meaning_of_life.using(priority=10)
Expand Down Expand Up @@ -108,23 +105,6 @@ The returned `TaskResult` can be interrogated to query the current state of the

If the task takes arguments, these can be passed as-is to `enqueue`.

#### Transactions

By default, tasks are enqueued after the current transaction (if there is one) commits successfully (using Django's `transaction.on_commit` method), rather than enqueueing immediately.

This can be configured using the `ENQUEUE_ON_COMMIT` setting. `True` and `False` force the behaviour.

```python
TASKS = {
"default": {
"BACKEND": "django_tasks.backends.immediate.ImmediateBackend",
"ENQUEUE_ON_COMMIT": False
}
}
```

This can also be configured per-task by passing `enqueue_on_commit` to the `task` decorator.

### Queue names

By default, tasks are enqueued onto the "default" queue. When using multiple queues, it can be useful to constrain the allowed names, so tasks aren't missed.
Expand Down
32 changes: 1 addition & 31 deletions django_tasks/backends/base.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,6 @@
from asgiref.sync import sync_to_async
from django.conf import settings
from django.core import checks
from django.db import connections
from django.utils import timezone
from django.utils.inspect import get_func_args
from typing_extensions import ParamSpec
Expand All @@ -27,7 +26,6 @@

class BaseTaskBackend(metaclass=ABCMeta):
alias: str
enqueue_on_commit: bool

task_class = Task

Expand All @@ -48,21 +46,8 @@ def __init__(self, alias: str, params: dict) -> None:

self.alias = alias
self.queues = set(params.get("QUEUES", [DEFAULT_TASK_QUEUE_NAME]))
self.enqueue_on_commit = bool(params.get("ENQUEUE_ON_COMMIT", True))
self.options = params.get("OPTIONS", {})

def _get_enqueue_on_commit_for_task(self, task: Task) -> bool:
"""
Determine the correct `enqueue_on_commit` setting to use for a given task.
"""

# If the task defines it, use that, otherwise, fall back to the backend.
return (
task.enqueue_on_commit
if task.enqueue_on_commit is not None
else self.enqueue_on_commit
)

def validate_task(self, task: Task) -> None:
"""
Determine whether the provided Task can be executed by the backend.
Expand Down Expand Up @@ -151,19 +136,4 @@ async def aget_result(self, result_id: str) -> TaskResult:
)

def check(self, **kwargs: Any) -> Iterable[checks.CheckMessage]:
if self.enqueue_on_commit and not connections._settings: # type: ignore[attr-defined]
yield checks.Error(
"`ENQUEUE_ON_COMMIT` cannot be used when no databases are configured",
hint="Set ENQUEUE_ON_COMMIT to False",
id="tasks.E001",
)

if (
self.enqueue_on_commit
and not connections["default"].features.supports_transactions
):
yield checks.Error(
"ENQUEUE_ON_COMMIT cannot be used on a database which doesn't support transactions",
hint="Set ENQUEUE_ON_COMMIT to False",
id="tasks.E002",
)
return []
11 changes: 2 additions & 9 deletions django_tasks/backends/database/backend.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,6 @@
from django.apps import apps
from django.core import checks
from django.core.exceptions import ValidationError
from django.db import transaction
from django.utils.version import PY311
from typing_extensions import ParamSpec

Expand Down Expand Up @@ -61,14 +60,8 @@ def enqueue(

db_result = self._task_to_db_task(task, args, kwargs)

def save_result() -> None:
db_result.save()
task_enqueued.send(type(self), task_result=db_result.task_result)

if self._get_enqueue_on_commit_for_task(task):
transaction.on_commit(save_result)
else:
save_result()
db_result.save()
task_enqueued.send(type(self), task_result=db_result.task_result)

return db_result.task_result

Expand Down
16 changes: 4 additions & 12 deletions django_tasks/backends/dummy.py
Original file line number Diff line number Diff line change
@@ -1,8 +1,6 @@
from copy import deepcopy
from functools import partial
from typing import TypeVar

from django.db import transaction
from django.utils import timezone
from typing_extensions import ParamSpec

Expand All @@ -28,11 +26,6 @@ def __init__(self, alias: str, params: dict) -> None:

self.results = []

def _store_result(self, result: TaskResult) -> None:
object.__setattr__(result, "enqueued_at", timezone.now())
self.results.append(result)
task_enqueued.send(type(self), task_result=result)

def enqueue(
self,
task: Task[P, T],
Expand All @@ -45,7 +38,7 @@ def enqueue(
task=task,
id=get_random_id(),
status=TaskResultStatus.READY,
enqueued_at=None,
enqueued_at=timezone.now(),
started_at=None,
last_attempted_at=None,
finished_at=None,
Expand All @@ -56,10 +49,9 @@ def enqueue(
worker_ids=[],
)

if self._get_enqueue_on_commit_for_task(task) is not False:
transaction.on_commit(partial(self._store_result, result))
else:
self._store_result(result)
self.results.append(result)

task_enqueued.send(type(self), task_result=result)

# Copy the task to prevent mutation issues
return deepcopy(result)
Expand Down
7 changes: 1 addition & 6 deletions django_tasks/backends/immediate.py
Original file line number Diff line number Diff line change
@@ -1,8 +1,6 @@
import logging
from functools import partial
from typing import TypeVar

from django.db import transaction
from django.utils import timezone
from typing_extensions import ParamSpec

Expand Down Expand Up @@ -110,9 +108,6 @@ def enqueue(
worker_ids=[],
)

if self._get_enqueue_on_commit_for_task(task) is not False:
transaction.on_commit(partial(self._execute_task, task_result))
else:
self._execute_task(task_result)
self._execute_task(task_result)

return task_result
22 changes: 6 additions & 16 deletions django_tasks/backends/rq.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,6 @@
from django.apps import apps
from django.core.checks import messages
from django.core.exceptions import SuspiciousOperation
from django.db import transaction
from django.utils.functional import cached_property
from redis.client import Redis
from rq.defaults import UNSERIALIZABLE_RETURN_VALUE_PAYLOAD
Expand Down Expand Up @@ -237,22 +236,13 @@ def enqueue(
meta={"backend_name": self.alias},
)

def save_result() -> None:
nonlocal job
if task.run_after is None:
job = queue.enqueue_job(
job, at_front=task.priority == TASK_MAX_PRIORITY
)
else:
job = queue.schedule_job(job, task.run_after)

object.__setattr__(task_result, "enqueued_at", job.enqueued_at)
task_enqueued.send(type(self), task_result=task_result)

if self._get_enqueue_on_commit_for_task(task):
transaction.on_commit(save_result)
if task.run_after is None:
job = queue.enqueue_job(job, at_front=task.priority == TASK_MAX_PRIORITY)
else:
save_result()
job = queue.schedule_job(job, task.run_after)

object.__setattr__(task_result, "enqueued_at", job.enqueued_at)
task_enqueued.send(type(self), task_result=task_result)

return task_result

Expand Down
10 changes: 0 additions & 10 deletions django_tasks/base.py
Original file line number Diff line number Diff line change
Expand Up @@ -82,12 +82,6 @@ class Task(Generic[P, T]):
run_after: datetime | None
"""The earliest this Task will run"""

enqueue_on_commit: bool | None
"""
Whether the Task will be enqueued when the current transaction commits,
immediately, or whatever the backend decides.
"""

takes_context: bool
"""
Whether the Task receives the Task context when executed.
Expand Down Expand Up @@ -199,7 +193,6 @@ def task(
priority: int = TASK_DEFAULT_PRIORITY,
queue_name: str = DEFAULT_TASK_QUEUE_NAME,
backend: str = DEFAULT_TASK_BACKEND_ALIAS,
enqueue_on_commit: bool | None = None,
takes_context: Literal[False] = False,
) -> Callable[[Callable[P, T]], Task[P, T]]: ...

Expand All @@ -212,7 +205,6 @@ def task(
priority: int = TASK_DEFAULT_PRIORITY,
queue_name: str = DEFAULT_TASK_QUEUE_NAME,
backend: str = DEFAULT_TASK_BACKEND_ALIAS,
enqueue_on_commit: bool | None = None,
takes_context: Literal[True],
) -> Callable[[Callable[Concatenate["TaskContext", P], T]], Task[P, T]]: ...

Expand All @@ -224,7 +216,6 @@ def task( # type: ignore[misc]
priority: int = TASK_DEFAULT_PRIORITY,
queue_name: str = DEFAULT_TASK_QUEUE_NAME,
backend: str = DEFAULT_TASK_BACKEND_ALIAS,
enqueue_on_commit: bool | None = None,
takes_context: bool = False,
) -> (
Task[P, T]
Expand All @@ -242,7 +233,6 @@ def wrapper(f: Callable[P, T]) -> Task[P, T]:
func=f,
queue_name=queue_name,
backend=backend,
enqueue_on_commit=enqueue_on_commit,
takes_context=takes_context,
run_after=None,
)
Expand Down
10 changes: 0 additions & 10 deletions tests/tasks.py
Original file line number Diff line number Diff line change
Expand Up @@ -55,16 +55,6 @@ def exit_task() -> None:
exit(1)


@task(enqueue_on_commit=True)
def enqueue_on_commit_task() -> None:
pass


@task(enqueue_on_commit=False)
def never_enqueue_on_commit_task() -> None:
pass


@task()
def hang() -> None:
"""
Expand Down
1 change: 0 additions & 1 deletion tests/tests/test_custom_backend.py
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,6 @@ class CustomBackendNoEnqueue(BaseTaskBackend):
TASKS={
"default": {
"BACKEND": get_module_path(CustomBackend),
"ENQUEUE_ON_COMMIT": False,
"OPTIONS": {"prefix": "PREFIX: "},
},
"no_enqueue": {
Expand Down
67 changes: 0 additions & 67 deletions tests/tests/test_database_backend.py
Original file line number Diff line number Diff line change
Expand Up @@ -322,45 +322,10 @@ def test_priority_range_check(self) -> None:
TASKS={
"default": {
"BACKEND": "django_tasks.backends.database.DatabaseBackend",
"ENQUEUE_ON_COMMIT": True,
}
}
)
def test_wait_until_transaction_commit(self) -> None:
self.assertTrue(default_task_backend.enqueue_on_commit)
self.assertTrue(
default_task_backend._get_enqueue_on_commit_for_task(test_tasks.noop_task)
)

with transaction.atomic():
result = test_tasks.noop_task.enqueue()

self.assertIsNone(result.enqueued_at)

self.assertEqual(DBTaskResult.objects.count(), 0)
# SQLite locks the table during this transaction
if connection.vendor != "sqlite":
self.assertEqual(self.get_task_count_in_new_connection(), 0)

if connection.vendor != "sqlite":
self.assertEqual(self.get_task_count_in_new_connection(), 1)
result.refresh()
self.assertIsNotNone(result.enqueued_at)

@override_settings(
TASKS={
"default": {
"BACKEND": "django_tasks.backends.database.DatabaseBackend",
"ENQUEUE_ON_COMMIT": False,
}
}
)
def test_doesnt_wait_until_transaction_commit(self) -> None:
self.assertFalse(default_task_backend.enqueue_on_commit)
self.assertFalse(
default_task_backend._get_enqueue_on_commit_for_task(test_tasks.noop_task)
)

with transaction.atomic():
result = test_tasks.noop_task.enqueue()

Expand All @@ -375,36 +340,6 @@ def test_doesnt_wait_until_transaction_commit(self) -> None:
if connection.vendor != "sqlite":
self.assertEqual(self.get_task_count_in_new_connection(), 1)

@override_settings(
TASKS={
"default": {
"BACKEND": "django_tasks.backends.database.DatabaseBackend",
}
}
)
def test_wait_until_transaction_by_default(self) -> None:
self.assertTrue(default_task_backend.enqueue_on_commit)
self.assertTrue(
default_task_backend._get_enqueue_on_commit_for_task(test_tasks.noop_task)
)

@override_settings(
TASKS={
"default": {
"BACKEND": "django_tasks.backends.database.DatabaseBackend",
"ENQUEUE_ON_COMMIT": False,
}
}
)
def test_task_specific_enqueue_on_commit(self) -> None:
self.assertFalse(default_task_backend.enqueue_on_commit)
self.assertTrue(test_tasks.enqueue_on_commit_task.enqueue_on_commit)
self.assertTrue(
default_task_backend._get_enqueue_on_commit_for_task(
test_tasks.enqueue_on_commit_task
)
)

def test_enqueue_logs(self) -> None:
with self.assertLogs("django_tasks", level="DEBUG") as captured_logs:
result = test_tasks.noop_task.enqueue()
Expand Down Expand Up @@ -484,7 +419,6 @@ def test_validate_on_enqueue(self) -> None:
"default": {
"BACKEND": "django_tasks.backends.rq.RQBackend",
"QUEUES": ["unknown_queue"],
"ENQUEUE_ON_COMMIT": False,
}
}
):
Expand All @@ -503,7 +437,6 @@ async def test_validate_on_aenqueue(self) -> None:
"default": {
"BACKEND": "django_tasks.backends.rq.RQBackend",
"QUEUES": ["unknown_queue"],
"ENQUEUE_ON_COMMIT": False,
}
}
):
Expand Down
Loading