Skip to content

Commit 522835b

Browse files
committed
Test that checks that AttemptError can round trip properly
Follows up #27 to add a more elaborate test that checks that non-default job properties can unmarshal to `Job` properties, including `errors`, which includes `AttemptError`. It turned out that of course this wasn't working properly.
1 parent 44ed566 commit 522835b

File tree

6 files changed

+246
-23
lines changed

6 files changed

+246
-23
lines changed

CHANGELOG.md

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -7,6 +7,10 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0
77

88
## [Unreleased]
99

10+
### Fixed
11+
12+
- `riverqueue.AttemptError` can now round trip to and from JSON properly, including its `at` timestamp. [PR #31](https://github.com/riverqueue/riverqueue-python/pull/31).
13+
1014
## [0.6.0] - 2024-07-06
1115

1216
### Added

src/riverqueue/driver/riversqlalchemy/dbsqlc/river_job.py

Lines changed: 129 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -134,6 +134,59 @@ class JobInsertFastManyParams:
134134
tags: List[str]
135135

136136

137+
JOB_INSERT_FULL = """-- name: job_insert_full \\:one
138+
INSERT INTO river_job(
139+
args,
140+
attempt,
141+
attempted_at,
142+
created_at,
143+
errors,
144+
finalized_at,
145+
kind,
146+
max_attempts,
147+
metadata,
148+
priority,
149+
queue,
150+
scheduled_at,
151+
state,
152+
tags
153+
) VALUES (
154+
:p1\\:\\:jsonb,
155+
coalesce(:p2\\:\\:smallint, 0),
156+
:p3,
157+
coalesce(:p4\\:\\:timestamptz, now()),
158+
:p5\\:\\:jsonb[],
159+
:p6,
160+
:p7\\:\\:text,
161+
:p8\\:\\:smallint,
162+
coalesce(:p9\\:\\:jsonb, '{}'),
163+
:p10\\:\\:smallint,
164+
:p11\\:\\:text,
165+
coalesce(:p12\\:\\:timestamptz, now()),
166+
:p13\\:\\:river_job_state,
167+
coalesce(:p14\\:\\:varchar(255)[], '{}')
168+
) RETURNING id, args, attempt, attempted_at, attempted_by, created_at, errors, finalized_at, kind, max_attempts, metadata, priority, queue, state, scheduled_at, tags
169+
"""
170+
171+
172+
@dataclasses.dataclass()
173+
class JobInsertFullParams:
174+
args: Any
175+
attempt: int
176+
attempted_at: Optional[datetime.datetime]
177+
created_at: Optional[datetime.datetime]
178+
errors: List[Any]
179+
finalized_at: Optional[datetime.datetime]
180+
kind: str
181+
max_attempts: int
182+
metadata: Any
183+
priority: int
184+
queue: str
185+
scheduled_at: Optional[datetime.datetime]
186+
state: models.RiverJobState
187+
tags: List[str]
188+
189+
137190
class Querier:
138191
def __init__(self, conn: sqlalchemy.engine.Connection):
139192
self._conn = conn
@@ -266,6 +319,44 @@ def job_insert_fast_many(self, arg: JobInsertFastManyParams) -> int:
266319
})
267320
return result.rowcount
268321

322+
def job_insert_full(self, arg: JobInsertFullParams) -> Optional[models.RiverJob]:
323+
row = self._conn.execute(sqlalchemy.text(JOB_INSERT_FULL), {
324+
"p1": arg.args,
325+
"p2": arg.attempt,
326+
"p3": arg.attempted_at,
327+
"p4": arg.created_at,
328+
"p5": arg.errors,
329+
"p6": arg.finalized_at,
330+
"p7": arg.kind,
331+
"p8": arg.max_attempts,
332+
"p9": arg.metadata,
333+
"p10": arg.priority,
334+
"p11": arg.queue,
335+
"p12": arg.scheduled_at,
336+
"p13": arg.state,
337+
"p14": arg.tags,
338+
}).first()
339+
if row is None:
340+
return None
341+
return models.RiverJob(
342+
id=row[0],
343+
args=row[1],
344+
attempt=row[2],
345+
attempted_at=row[3],
346+
attempted_by=row[4],
347+
created_at=row[5],
348+
errors=row[6],
349+
finalized_at=row[7],
350+
kind=row[8],
351+
max_attempts=row[9],
352+
metadata=row[10],
353+
priority=row[11],
354+
queue=row[12],
355+
state=row[13],
356+
scheduled_at=row[14],
357+
tags=row[15],
358+
)
359+
269360

270361
class AsyncQuerier:
271362
def __init__(self, conn: sqlalchemy.ext.asyncio.AsyncConnection):
@@ -398,3 +489,41 @@ async def job_insert_fast_many(self, arg: JobInsertFastManyParams) -> int:
398489
"p9": arg.tags,
399490
})
400491
return result.rowcount
492+
493+
async def job_insert_full(self, arg: JobInsertFullParams) -> Optional[models.RiverJob]:
494+
row = (await self._conn.execute(sqlalchemy.text(JOB_INSERT_FULL), {
495+
"p1": arg.args,
496+
"p2": arg.attempt,
497+
"p3": arg.attempted_at,
498+
"p4": arg.created_at,
499+
"p5": arg.errors,
500+
"p6": arg.finalized_at,
501+
"p7": arg.kind,
502+
"p8": arg.max_attempts,
503+
"p9": arg.metadata,
504+
"p10": arg.priority,
505+
"p11": arg.queue,
506+
"p12": arg.scheduled_at,
507+
"p13": arg.state,
508+
"p14": arg.tags,
509+
})).first()
510+
if row is None:
511+
return None
512+
return models.RiverJob(
513+
id=row[0],
514+
args=row[1],
515+
attempt=row[2],
516+
attempted_at=row[3],
517+
attempted_by=row[4],
518+
created_at=row[5],
519+
errors=row[6],
520+
finalized_at=row[7],
521+
kind=row[8],
522+
max_attempts=row[9],
523+
metadata=row[10],
524+
priority=row[11],
525+
queue=row[12],
526+
state=row[13],
527+
scheduled_at=row[14],
528+
tags=row[15],
529+
)

src/riverqueue/driver/riversqlalchemy/dbsqlc/river_job.sql

Lines changed: 34 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -103,4 +103,37 @@ INSERT INTO river_job(
103103

104104
-- Had trouble getting multi-dimensional arrays to play nicely with sqlc,
105105
-- but it might be possible. For now, join tags into a single string.
106-
string_to_array(unnest(@tags::text[]), ',');
106+
string_to_array(unnest(@tags::text[]), ',');
107+
108+
-- name: JobInsertFull :one
109+
INSERT INTO river_job(
110+
args,
111+
attempt,
112+
attempted_at,
113+
created_at,
114+
errors,
115+
finalized_at,
116+
kind,
117+
max_attempts,
118+
metadata,
119+
priority,
120+
queue,
121+
scheduled_at,
122+
state,
123+
tags
124+
) VALUES (
125+
@args::jsonb,
126+
coalesce(@attempt::smallint, 0),
127+
@attempted_at,
128+
coalesce(sqlc.narg('created_at')::timestamptz, now()),
129+
@errors::jsonb[],
130+
@finalized_at,
131+
@kind::text,
132+
@max_attempts::smallint,
133+
coalesce(@metadata::jsonb, '{}'),
134+
@priority::smallint,
135+
@queue::text,
136+
coalesce(sqlc.narg('scheduled_at')::timestamptz, now()),
137+
@state::river_job_state,
138+
coalesce(@tags::varchar(255)[], '{}')
139+
) RETURNING *;

src/riverqueue/driver/riversqlalchemy/sql_alchemy_driver.py

Lines changed: 6 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -8,7 +8,6 @@
88
from sqlalchemy.engine import Connection
99
from sqlalchemy.ext.asyncio import AsyncConnection, AsyncEngine
1010
from typing import (
11-
Any,
1211
AsyncGenerator,
1312
AsyncIterator,
1413
Iterator,
@@ -36,7 +35,7 @@ async def advisory_lock(self, key: int) -> None:
3635
await self.pg_misc_querier.pg_advisory_xact_lock(key=key)
3736

3837
async def job_insert(self, insert_params: JobInsertParams) -> Job:
39-
return _job_from_row(
38+
return job_from_row(
4039
cast( # drop Optional[] because insert always returns a row
4140
models.RiverJob,
4241
await self.job_querier.job_insert_fast(
@@ -57,7 +56,7 @@ async def job_get_by_kind_and_unique_properties(
5756
row = await self.job_querier.job_get_by_kind_and_unique_properties(
5857
cast(river_job.JobGetByKindAndUniquePropertiesParams, get_params)
5958
)
60-
return _job_from_row(row) if row else None
59+
return job_from_row(row) if row else None
6160

6261
@asynccontextmanager
6362
async def transaction(self) -> AsyncGenerator:
@@ -103,7 +102,7 @@ def advisory_lock(self, key: int) -> None:
103102
self.pg_misc_querier.pg_advisory_xact_lock(key=key)
104103

105104
def job_insert(self, insert_params: JobInsertParams) -> Job:
106-
return _job_from_row(
105+
return job_from_row(
107106
cast( # drop Optional[] because insert always returns a row
108107
models.RiverJob,
109108
self.job_querier.job_insert_fast(
@@ -122,7 +121,7 @@ def job_get_by_kind_and_unique_properties(
122121
row = self.job_querier.job_get_by_kind_and_unique_properties(
123122
cast(river_job.JobGetByKindAndUniquePropertiesParams, get_params)
124123
)
125-
return _job_from_row(row) if row else None
124+
return job_from_row(row) if row else None
126125

127126
@contextmanager
128127
def transaction(self) -> Iterator[None]:
@@ -187,21 +186,13 @@ def _build_insert_many_params(
187186
return insert_many_params
188187

189188

190-
def _job_from_row(row: models.RiverJob) -> Job:
189+
def job_from_row(row: models.RiverJob) -> Job:
191190
"""
192191
Converts an internal sqlc generated row to the top level type, issuing a few
193192
minor transformations along the way. Timestamps are changed from local
194193
timezone to UTC.
195194
"""
196195

197-
def attempt_error_from(data: dict[str, Any]) -> AttemptError:
198-
return AttemptError(
199-
at=data["at"],
200-
attempt=data["attempt"],
201-
error=data["error"],
202-
trace=data["trace"],
203-
)
204-
205196
# Trivial shortcut, but avoids a bunch of ternaries getting line wrapped below.
206197
def to_utc(t: datetime) -> datetime:
207198
return t.astimezone(timezone.utc)
@@ -213,7 +204,7 @@ def to_utc(t: datetime) -> datetime:
213204
attempted_at=to_utc(row.attempted_at) if row.attempted_at else None,
214205
attempted_by=row.attempted_by,
215206
created_at=to_utc(row.created_at),
216-
errors=list(map(attempt_error_from, row.errors)) if row.errors else None,
207+
errors=list(map(AttemptError.from_dict, row.errors)) if row.errors else None,
217208
finalized_at=to_utc(row.finalized_at) if row.finalized_at else None,
218209
kind=row.kind,
219210
max_attempts=row.max_attempts,

src/riverqueue/job.py

Lines changed: 28 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,7 @@
11
from dataclasses import dataclass
2-
import datetime
2+
from datetime import datetime, timezone
33
from enum import Enum
4+
import json
45
from typing import Any, Optional
56

67

@@ -106,7 +107,7 @@ class Job:
106107
if it's either snoozed or errors.
107108
"""
108109

109-
attempted_at: Optional[datetime.datetime]
110+
attempted_at: Optional[datetime]
110111
"""
111112
The time that the job was last worked. Starts out as `nil` on a new insert.
112113
"""
@@ -120,7 +121,7 @@ class Job:
120121
time when it starts up.
121122
"""
122123

123-
created_at: datetime.datetime
124+
created_at: datetime
124125
"""
125126
When the job record was created.
126127
"""
@@ -131,7 +132,7 @@ class Job:
131132
Ordered from earliest error to the latest error.
132133
"""
133134

134-
finalized_at: Optional[datetime.datetime]
135+
finalized_at: Optional[datetime]
135136
"""
136137
The time at which the job was "finalized", meaning it was either completed
137138
successfully or errored for the last time such that it'll no longer be
@@ -170,7 +171,7 @@ class Job:
170171
independently and be used to isolate jobs.
171172
"""
172173

173-
scheduled_at: datetime.datetime
174+
scheduled_at: datetime
174175
"""
175176
When the job is scheduled to become available to be worked. Jobs default to
176177
running immediately, but may be scheduled for the future when they're
@@ -199,7 +200,7 @@ class AttemptError:
199200
that occurred.
200201
"""
201202

202-
at: datetime.datetime
203+
at: datetime
203204
"""
204205
The time at which the error occurred.
205206
"""
@@ -221,3 +222,24 @@ class AttemptError:
221222
Contains a stack trace from a job that panicked. The trace is produced by
222223
invoking `debug.Trace()` in Go.
223224
"""
225+
226+
@staticmethod
227+
def from_dict(data: dict[str, Any]) -> "AttemptError":
228+
return AttemptError(
229+
at=datetime.fromisoformat(data["at"]),
230+
attempt=data["attempt"],
231+
error=data["error"],
232+
trace=data["trace"],
233+
)
234+
235+
def to_json(self) -> str:
236+
return json.dumps(
237+
{
238+
"at": self.at.astimezone(timezone.utc)
239+
.isoformat()
240+
.replace("+00:00", "Z"),
241+
"attempt": self.attempt,
242+
"error": self.error,
243+
"trace": self.trace,
244+
}
245+
)

0 commit comments

Comments
 (0)