Skip to content

Commit d43fcd3

Browse files
committed
Update priority, add tests
1 parent eab6482 commit d43fcd3

File tree

3 files changed

+171
-4
lines changed

3 files changed

+171
-4
lines changed

django_tasks/backends/celery/backend.py

Lines changed: 39 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -1,14 +1,16 @@
11
from dataclasses import dataclass
2-
from typing import TypeVar
2+
from typing import Any, Iterable, TypeVar
33

44
from celery import shared_task
55
from celery.app import default_app
66
from celery.local import Proxy as CeleryTaskProxy
7+
from django.apps import apps
8+
from django.core.checks import ERROR, CheckMessage
79
from django.utils import timezone
810
from typing_extensions import ParamSpec
911

1012
from django_tasks.backends.base import BaseTaskBackend
11-
from django_tasks.task import ResultStatus, TaskResult
13+
from django_tasks.task import MAX_PRIORITY, MIN_PRIORITY, ResultStatus, TaskResult
1214
from django_tasks.task import Task as BaseTask
1315
from django_tasks.utils import json_normalize
1416

@@ -22,6 +24,26 @@
2224
P = ParamSpec("P")
2325

2426

27+
CELERY_MIN_PRIORITY = 0
28+
CELERY_MAX_PRIORITY = 9
29+
30+
31+
def _map_priority(value: int) -> int:
32+
# linear scale value to the range 0 to 9
33+
scaled_value = (value + abs(MIN_PRIORITY)) / (
34+
(MAX_PRIORITY - MIN_PRIORITY) / (CELERY_MAX_PRIORITY - CELERY_MIN_PRIORITY)
35+
)
36+
mapped_value = int(scaled_value)
37+
38+
# ensure the mapped value is within the range 0 to 9
39+
if mapped_value < CELERY_MIN_PRIORITY:
40+
mapped_value = CELERY_MIN_PRIORITY
41+
elif mapped_value > CELERY_MAX_PRIORITY:
42+
mapped_value = CELERY_MAX_PRIORITY
43+
44+
return mapped_value
45+
46+
2547
@dataclass
2648
class Task(BaseTask[P, T]):
2749
celery_task: CeleryTaskProxy = None
@@ -50,8 +72,10 @@ def enqueue(
5072
}
5173
if task.queue_name:
5274
apply_async_kwargs["queue"] = task.queue_name
53-
if task.priority:
54-
apply_async_kwargs["priority"] = task.priority
75+
if task.priority is not None:
76+
# map priority to the range 0 to 9
77+
priority = _map_priority(task.priority)
78+
apply_async_kwargs["priority"] = priority
5579

5680
# TODO: a Celery result backend is required to get additional information
5781
async_result = task.celery_task.apply_async(
@@ -62,9 +86,20 @@ def enqueue(
6286
id=async_result.id,
6387
status=ResultStatus.NEW,
6488
enqueued_at=timezone.now(),
89+
started_at=None,
6590
finished_at=None,
6691
args=json_normalize(args),
6792
kwargs=json_normalize(kwargs),
6893
backend=self.alias,
6994
)
7095
return task_result
96+
97+
def check(self, **kwargs: Any) -> Iterable[CheckMessage]:
98+
backend_name = self.__class__.__name__
99+
100+
if not apps.is_installed("django_tasks.backends.celery"):
101+
yield CheckMessage(
102+
ERROR,
103+
f"{backend_name} configured as django_tasks backend, but celery app not installed",
104+
"Insert 'django_tasks.backends.celery' in INSTALLED_APPS",
105+
)

tests/settings.py

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -18,6 +18,7 @@
1818
"django.contrib.sessions",
1919
"django.contrib.staticfiles",
2020
"django_tasks",
21+
"django_tasks.backends.celery",
2122
"django_tasks.backends.database",
2223
"tests",
2324
]

tests/tests/test_celery_backend.py

Lines changed: 131 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,131 @@
1+
from datetime import timedelta
2+
from unittest.mock import patch
3+
4+
from celery import Celery
5+
from celery.result import AsyncResult
6+
from django.test import TestCase, override_settings
7+
from django.utils import timezone
8+
9+
from django_tasks import ResultStatus, default_task_backend, task, tasks
10+
from django_tasks.backends.celery import CeleryBackend
11+
from django_tasks.backends.celery.backend import _map_priority
12+
from django_tasks.task import DEFAULT_PRIORITY, DEFAULT_QUEUE_NAME
13+
14+
15+
def noop_task(*args: tuple, **kwargs: dict) -> None:
16+
return None
17+
18+
19+
@override_settings(
20+
TASKS={
21+
"default": {
22+
"BACKEND": "django_tasks.backends.celery.CeleryBackend",
23+
"QUEUES": [DEFAULT_QUEUE_NAME, "queue-1"],
24+
}
25+
}
26+
)
27+
class CeleryBackendTestCase(TestCase):
28+
def setUp(self) -> None:
29+
# register task during setup so it is registered as a Celery task
30+
self.task = task()(noop_task)
31+
32+
def test_using_correct_backend(self) -> None:
33+
self.assertEqual(default_task_backend, tasks["default"])
34+
self.assertIsInstance(tasks["default"], CeleryBackend)
35+
36+
def test_check(self) -> None:
37+
errors = list(default_task_backend.check())
38+
39+
self.assertEqual(len(errors), 0)
40+
41+
@override_settings(INSTALLED_APPS=[])
42+
def test_celery_backend_app_missing(self) -> None:
43+
errors = list(default_task_backend.check())
44+
45+
self.assertEqual(len(errors), 1)
46+
self.assertIn("django_tasks.backends.celery", errors[0].hint)
47+
48+
def test_enqueue_task(self) -> None:
49+
task = self.task
50+
assert task.celery_task # type: ignore[attr-defined]
51+
52+
# import here so that it is not set as default before registering the task
53+
from django_tasks.backends.celery.app import app as celery_app
54+
55+
self.assertEqual(task.celery_task.app, celery_app) # type: ignore[attr-defined]
56+
with patch("celery.app.task.Task.apply_async") as mock_apply_async:
57+
mock_apply_async.return_value = AsyncResult(id="123")
58+
result = default_task_backend.enqueue(task, (1,), {"two": 3})
59+
60+
self.assertEqual(result.id, "123")
61+
self.assertEqual(result.status, ResultStatus.NEW)
62+
self.assertIsNone(result.started_at)
63+
self.assertIsNone(result.finished_at)
64+
with self.assertRaisesMessage(ValueError, "Task has not finished yet"):
65+
result.result # noqa:B018
66+
self.assertEqual(result.task, task)
67+
self.assertEqual(result.args, [1])
68+
self.assertEqual(result.kwargs, {"two": 3})
69+
expected_priority = _map_priority(DEFAULT_PRIORITY)
70+
mock_apply_async.assert_called_once_with(
71+
(1,),
72+
kwargs={"two": 3},
73+
eta=None,
74+
priority=expected_priority,
75+
queue=DEFAULT_QUEUE_NAME,
76+
)
77+
78+
def test_using_additional_params(self) -> None:
79+
with patch("celery.app.task.Task.apply_async") as mock_apply_async:
80+
mock_apply_async.return_value = AsyncResult(id="123")
81+
run_after = timezone.now() + timedelta(hours=10)
82+
result = self.task.using(
83+
run_after=run_after, priority=75, queue_name="queue-1"
84+
).enqueue()
85+
86+
self.assertEqual(result.id, "123")
87+
self.assertEqual(result.status, ResultStatus.NEW)
88+
mock_apply_async.assert_called_once_with(
89+
(), kwargs={}, eta=run_after, priority=7, queue="queue-1"
90+
)
91+
92+
def test_priority_mapping(self) -> None:
93+
for priority, expected in [(-100, 0), (-50, 2), (0, 4), (75, 7), (100, 9)]:
94+
with patch("celery.app.task.Task.apply_async") as mock_apply_async:
95+
mock_apply_async.return_value = AsyncResult(id="123")
96+
self.task.using(priority=priority).enqueue()
97+
98+
mock_apply_async.assert_called_with(
99+
(), kwargs={}, eta=None, priority=expected, queue=DEFAULT_QUEUE_NAME
100+
)
101+
102+
103+
@override_settings(
104+
TASKS={
105+
"default": {
106+
"BACKEND": "django_tasks.backends.celery.CeleryBackend",
107+
"QUEUES": [DEFAULT_QUEUE_NAME, "queue-1"],
108+
}
109+
}
110+
)
111+
class CeleryBackendCustomAppTestCase(TestCase):
112+
def setUp(self) -> None:
113+
self.celery_app = Celery("test_app")
114+
self.task = task()(noop_task)
115+
116+
def tearDown(self) -> None:
117+
# restore the default Celery app
118+
from django_tasks.backends.celery.app import app as celery_app
119+
120+
celery_app.set_current()
121+
return super().tearDown()
122+
123+
def test_enqueue_task(self) -> None:
124+
task = self.task
125+
assert task.celery_task # type: ignore[attr-defined]
126+
127+
from django_tasks.backends.celery.app import app as celery_app
128+
129+
self.assertNotEqual(celery_app, self.celery_app)
130+
# it should use the custom Celery app
131+
self.assertEqual(task.celery_task.app, self.celery_app) # type: ignore[attr-defined]

0 commit comments

Comments
 (0)