4 changes: 4 additions & 0 deletions CHANGELOG.md
@@ -7,6 +7,10 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0

## [Unreleased]

### Fixed

- `riverqueue.AttemptError` can now round trip to and from JSON properly, including its `at` timestamp. [PR #31](https://github.com/riverqueue/riverqueue-python/pull/31).

## [0.6.0] - 2024-07-06

### Added
129 changes: 129 additions & 0 deletions src/riverqueue/driver/riversqlalchemy/dbsqlc/river_job.py
@@ -134,6 +134,59 @@ class JobInsertFastManyParams:
tags: List[str]


JOB_INSERT_FULL = """-- name: job_insert_full \\:one
INSERT INTO river_job(
args,
attempt,
attempted_at,
created_at,
errors,
finalized_at,
kind,
max_attempts,
metadata,
priority,
queue,
scheduled_at,
state,
tags
) VALUES (
:p1\\:\\:jsonb,
coalesce(:p2\\:\\:smallint, 0),
:p3,
coalesce(:p4\\:\\:timestamptz, now()),
:p5\\:\\:jsonb[],
:p6,
:p7\\:\\:text,
:p8\\:\\:smallint,
coalesce(:p9\\:\\:jsonb, '{}'),
:p10\\:\\:smallint,
:p11\\:\\:text,
coalesce(:p12\\:\\:timestamptz, now()),
:p13\\:\\:river_job_state,
coalesce(:p14\\:\\:varchar(255)[], '{}')
) RETURNING id, args, attempt, attempted_at, attempted_by, created_at, errors, finalized_at, kind, max_attempts, metadata, priority, queue, state, scheduled_at, tags
"""


@dataclasses.dataclass()
class JobInsertFullParams:
args: Any
attempt: int
attempted_at: Optional[datetime.datetime]
created_at: Optional[datetime.datetime]
errors: List[Any]
finalized_at: Optional[datetime.datetime]
kind: str
max_attempts: int
metadata: Any
priority: int
queue: str
scheduled_at: Optional[datetime.datetime]
state: models.RiverJobState
tags: List[str]


class Querier:
def __init__(self, conn: sqlalchemy.engine.Connection):
self._conn = conn
@@ -266,6 +319,44 @@ def job_insert_fast_many(self, arg: JobInsertFastManyParams) -> int:
})
return result.rowcount

def job_insert_full(self, arg: JobInsertFullParams) -> Optional[models.RiverJob]:
row = self._conn.execute(sqlalchemy.text(JOB_INSERT_FULL), {
"p1": arg.args,
"p2": arg.attempt,
"p3": arg.attempted_at,
"p4": arg.created_at,
"p5": arg.errors,
"p6": arg.finalized_at,
"p7": arg.kind,
"p8": arg.max_attempts,
"p9": arg.metadata,
"p10": arg.priority,
"p11": arg.queue,
"p12": arg.scheduled_at,
"p13": arg.state,
"p14": arg.tags,
}).first()
if row is None:
return None
return models.RiverJob(
id=row[0],
args=row[1],
attempt=row[2],
attempted_at=row[3],
attempted_by=row[4],
created_at=row[5],
errors=row[6],
finalized_at=row[7],
kind=row[8],
max_attempts=row[9],
metadata=row[10],
priority=row[11],
queue=row[12],
state=row[13],
scheduled_at=row[14],
tags=row[15],
)


class AsyncQuerier:
def __init__(self, conn: sqlalchemy.ext.asyncio.AsyncConnection):
@@ -398,3 +489,41 @@ async def job_insert_fast_many(self, arg: JobInsertFastManyParams) -> int:
"p9": arg.tags,
})
return result.rowcount

async def job_insert_full(self, arg: JobInsertFullParams) -> Optional[models.RiverJob]:
row = (await self._conn.execute(sqlalchemy.text(JOB_INSERT_FULL), {
"p1": arg.args,
"p2": arg.attempt,
"p3": arg.attempted_at,
"p4": arg.created_at,
"p5": arg.errors,
"p6": arg.finalized_at,
"p7": arg.kind,
"p8": arg.max_attempts,
"p9": arg.metadata,
"p10": arg.priority,
"p11": arg.queue,
"p12": arg.scheduled_at,
"p13": arg.state,
"p14": arg.tags,
})).first()
if row is None:
return None
return models.RiverJob(
id=row[0],
args=row[1],
attempt=row[2],
attempted_at=row[3],
attempted_by=row[4],
created_at=row[5],
errors=row[6],
finalized_at=row[7],
kind=row[8],
max_attempts=row[9],
metadata=row[10],
priority=row[11],
queue=row[12],
state=row[13],
scheduled_at=row[14],
tags=row[15],
)
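
For orientation, here is a minimal, hypothetical sketch of driving the newly generated `job_insert_full` through the synchronous `Querier`. The connection URL, the argument values, and the `RiverJobState.AVAILABLE` member name are illustrative assumptions, not anything this PR pins down; note how passing `None` for the `coalesce`-wrapped columns lets the server-side defaults apply:

```python
import sqlalchemy

from riverqueue.driver.riversqlalchemy.dbsqlc import models, river_job

# Assumed connection URL; substitute your own database.
engine = sqlalchemy.create_engine("postgresql://localhost/river_test")

with engine.begin() as conn:
    querier = river_job.Querier(conn)
    job = querier.job_insert_full(
        river_job.JobInsertFullParams(
            args='{"customer_id": 123}',  # required: p1 has no coalesce fallback
            attempt=0,
            attempted_at=None,            # nullable column, stays NULL
            created_at=None,              # NULL falls through to coalesce(..., now())
            errors=[],
            finalized_at=None,
            kind="send_email",
            max_attempts=25,
            metadata=None,                # NULL falls through to coalesce(..., '{}')
            priority=1,
            queue="default",
            scheduled_at=None,            # NULL falls through to coalesce(..., now())
            state=models.RiverJobState.AVAILABLE,  # assumed member name
            tags=[],
        )
    )
    assert job is not None and job.queue == "default"
```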
35 changes: 34 additions & 1 deletion src/riverqueue/driver/riversqlalchemy/dbsqlc/river_job.sql
@@ -103,4 +103,37 @@ INSERT INTO river_job(

-- Had trouble getting multi-dimensional arrays to play nicely with sqlc,
-- but it might be possible. For now, join tags into a single string.
string_to_array(unnest(@tags::text[]), ',');

-- name: JobInsertFull :one
INSERT INTO river_job(
args,
attempt,
attempted_at,
created_at,
errors,
finalized_at,
kind,
max_attempts,
metadata,
priority,
queue,
scheduled_at,
state,
tags
) VALUES (
@args::jsonb,
coalesce(@attempt::smallint, 0),
@attempted_at,
coalesce(sqlc.narg('created_at')::timestamptz, now()),
@errors::jsonb[],
@finalized_at,
@kind::text,
@max_attempts::smallint,
coalesce(@metadata::jsonb, '{}'),
@priority::smallint,
@queue::text,
coalesce(sqlc.narg('scheduled_at')::timestamptz, now()),
@state::river_job_state,
coalesce(@tags::varchar(255)[], '{}')
) RETURNING *;
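
A note on the two parameter styles in this query: sqlc treats `@name` as a required, non-nullable parameter, while `sqlc.narg('name')` marks the parameter nullable. That is what allows `created_at` and `scheduled_at` to arrive as NULL from the Python side and fall through to the `coalesce(..., now())` defaults, matching the `None` bindings shown in the sketch above.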
21 changes: 6 additions & 15 deletions src/riverqueue/driver/riversqlalchemy/sql_alchemy_driver.py
@@ -8,7 +8,6 @@
from sqlalchemy.engine import Connection
from sqlalchemy.ext.asyncio import AsyncConnection, AsyncEngine
from typing import (
Any,
AsyncGenerator,
AsyncIterator,
Iterator,
@@ -36,7 +35,7 @@ async def advisory_lock(self, key: int) -> None:
await self.pg_misc_querier.pg_advisory_xact_lock(key=key)

async def job_insert(self, insert_params: JobInsertParams) -> Job:
return _job_from_row(
return job_from_row(
cast( # drop Optional[] because insert always returns a row
models.RiverJob,
await self.job_querier.job_insert_fast(
@@ -57,7 +56,7 @@ async def job_get_by_kind_and_unique_properties(
row = await self.job_querier.job_get_by_kind_and_unique_properties(
cast(river_job.JobGetByKindAndUniquePropertiesParams, get_params)
)
return _job_from_row(row) if row else None
return job_from_row(row) if row else None

@asynccontextmanager
async def transaction(self) -> AsyncGenerator:
@@ -103,7 +102,7 @@ def advisory_lock(self, key: int) -> None:
self.pg_misc_querier.pg_advisory_xact_lock(key=key)

def job_insert(self, insert_params: JobInsertParams) -> Job:
return _job_from_row(
return job_from_row(
cast( # drop Optional[] because insert always returns a row
models.RiverJob,
self.job_querier.job_insert_fast(
@@ -122,7 +121,7 @@ def job_get_by_kind_and_unique_properties(
row = self.job_querier.job_get_by_kind_and_unique_properties(
cast(river_job.JobGetByKindAndUniquePropertiesParams, get_params)
)
return _job_from_row(row) if row else None
return job_from_row(row) if row else None

@contextmanager
def transaction(self) -> Iterator[None]:
@@ -187,21 +186,13 @@ def _build_insert_many_params(
return insert_many_params


def _job_from_row(row: models.RiverJob) -> Job:
def job_from_row(row: models.RiverJob) -> Job:
"""
Converts an internal sqlc generated row to the top level type, issuing a few
minor transformations along the way. Timestamps are changed from local
timezone to UTC.
"""

def attempt_error_from(data: dict[str, Any]) -> AttemptError:
return AttemptError(
at=data["at"],
attempt=data["attempt"],
error=data["error"],
trace=data["trace"],
)

# Trivial shortcut, but avoids a bunch of ternaries getting line wrapped below.
def to_utc(t: datetime) -> datetime:
return t.astimezone(timezone.utc)
@@ -213,7 +204,7 @@ def to_utc(t: datetime) -> datetime:
attempted_at=to_utc(row.attempted_at) if row.attempted_at else None,
attempted_by=row.attempted_by,
created_at=to_utc(row.created_at),
errors=list(map(attempt_error_from, row.errors)) if row.errors else None,
errors=list(map(AttemptError.from_dict, row.errors)) if row.errors else None,
finalized_at=to_utc(row.finalized_at) if row.finalized_at else None,
kind=row.kind,
max_attempts=row.max_attempts,
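
With the leading underscore dropped, `job_from_row` becomes importable outside the driver module, and error parsing now flows through `AttemptError.from_dict`, so each error's `at` field is decoded into a real `datetime`. A hedged sketch with a fabricated row; the enum member name and field values are assumptions:

```python
import datetime

from riverqueue.driver.riversqlalchemy.dbsqlc import models
from riverqueue.driver.riversqlalchemy.sql_alchemy_driver import job_from_row

now = datetime.datetime.now(datetime.timezone.utc)

# A fabricated generated-layer row, standing in for a database result.
row = models.RiverJob(
    id=1,
    args='{"customer_id": 123}',
    attempt=1,
    attempted_at=now,
    attempted_by=["worker-1"],
    created_at=now,
    errors=[{"at": "2024-07-06T12:00:00+00:00", "attempt": 1, "error": "boom", "trace": ""}],
    finalized_at=None,
    kind="send_email",
    max_attempts=25,
    metadata="{}",
    priority=1,
    queue="default",
    state=models.RiverJobState.RETRYABLE,  # assumed member name
    scheduled_at=now,
    tags=[],
)

job = job_from_row(row)
assert job.errors is not None
assert job.errors[0].at.year == 2024  # parsed into a datetime, not left as a string
```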
34 changes: 28 additions & 6 deletions src/riverqueue/job.py
@@ -1,6 +1,7 @@
from dataclasses import dataclass
import datetime
from datetime import datetime, timezone
from enum import Enum
import json
from typing import Any, Optional


@@ -106,7 +107,7 @@ class Job:
if it's either snoozed or errors.
"""

attempted_at: Optional[datetime.datetime]
attempted_at: Optional[datetime]
"""
The time that the job was last worked. Starts out as `None` on a new insert.
"""
@@ -120,7 +121,7 @@
time when it starts up.
"""

created_at: datetime.datetime
created_at: datetime
"""
When the job record was created.
"""
@@ -131,7 +132,7 @@
Ordered from earliest error to the latest error.
"""

finalized_at: Optional[datetime.datetime]
finalized_at: Optional[datetime]
"""
The time at which the job was "finalized", meaning it was either completed
successfully or errored for the last time such that it'll no longer be
@@ -170,7 +171,7 @@ class Job:
independently and be used to isolate jobs.
"""

scheduled_at: datetime.datetime
scheduled_at: datetime
"""
When the job is scheduled to become available to be worked. Jobs default to
running immediately, but may be scheduled for the future when they're
@@ -199,7 +200,7 @@ class AttemptError:
that occurred.
"""

at: datetime.datetime
at: datetime
"""
The time at which the error occurred.
"""
@@ -221,3 +222,24 @@ class AttemptError:
Contains a stack trace from a job that panicked. The trace is produced by
invoking `debug.Trace()` in Go.
"""

@staticmethod
def from_dict(data: dict[str, Any]) -> "AttemptError":
return AttemptError(
at=datetime.fromisoformat(data["at"]),
attempt=data["attempt"],
error=data["error"],
trace=data["trace"],
)

def to_json(self) -> str:
return json.dumps(
{
"at": self.at.astimezone(timezone.utc)
.isoformat()
.replace("+00:00", "Z"),
"attempt": self.attempt,
"error": self.error,
"trace": self.trace,
}
)
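
Taken together, `from_dict` and `to_json` give `AttemptError` the JSON round trip described in the changelog. A quick sketch; note that `datetime.fromisoformat` only accepts the trailing `Z` emitted by `to_json` on Python 3.11 and later, so the initial input below uses the `+00:00` offset form:

```python
import json

from riverqueue import AttemptError

err = AttemptError.from_dict(
    {
        "at": "2024-07-06T12:00:00+00:00",
        "attempt": 1,
        "error": "job failed",
        "trace": "",
    }
)

encoded = err.to_json()
assert json.loads(encoded)["at"] == "2024-07-06T12:00:00Z"  # normalized to the UTC "Z" form

# Feeding the serialized form back through from_dict completes the round trip
# (requires Python 3.11+ because of the trailing "Z").
decoded = AttemptError.from_dict(json.loads(encoded))
assert decoded == err
```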