From ba6bebeed36d072ac3ef92cfcc8ee5f5d6cf6163 Mon Sep 17 00:00:00 2001 From: Javad Ghane Date: Fri, 24 Oct 2025 18:07:59 +0330 Subject: [PATCH] Add Swagger specification for REST API --- README.md | 12 ++ docs/swagger.yaml | 464 +++++++++++++++++++++++++++++++++++++++++++ pyproject.toml | 4 + src/pgmq/__init__.py | 16 +- src/pgmq/api.py | 262 ++++++++++++++++++++++++ tests/test_api.py | 138 +++++++++++++ 6 files changed, 895 insertions(+), 1 deletion(-) create mode 100644 docs/swagger.yaml create mode 100644 src/pgmq/api.py create mode 100644 tests/test_api.py diff --git a/README.md b/README.md index 07ce789..39c811b 100644 --- a/README.md +++ b/README.md @@ -86,6 +86,18 @@ queue.create_queue("my_queue") queue.create_partitioned_queue("my_partitioned_queue", partition_interval=10000) ``` +### REST API and Swagger documentation + +An optional FastAPI-powered REST API is available for teams that want to expose +queue operations over HTTP. Create an application with +`pgmq.api.create_app()`, or mount it within an existing ASGI server, and then +visit the automatically generated Swagger UI at `/docs` or ReDoc at `/redoc`. + +The repository also includes a static OpenAPI specification that can be used +with tools such as [Swagger UI](https://swagger.io/tools/swagger-ui/) or +[Insomnia](https://insomnia.rest/). You can find it at +[`docs/swagger.yaml`](docs/swagger.yaml). + ### List all queues ```python diff --git a/docs/swagger.yaml b/docs/swagger.yaml new file mode 100644 index 0000000..651deb7 --- /dev/null +++ b/docs/swagger.yaml @@ -0,0 +1,464 @@ +openapi: 3.0.3 +info: + title: PGMQ REST API + version: 1.0.0 + description: | + This OpenAPI document describes the REST interface exposed by the optional + FastAPI integration that ships with **pgmq**. The service provides queue + management, message lifecycle helpers, and queue metrics endpoints to make + it easy to integrate applications with PostgreSQL Message Queues. + contact: + name: PGMQ Maintainers + url: https://github.com/tembo-io/pgmq-py +servers: + - url: http://localhost:8000 + description: Local development server +components: + schemas: + Message: + type: object + required: [msg_id, read_ct, enqueued_at, vt, message] + properties: + msg_id: + type: integer + format: int64 + description: Unique identifier for the message within the queue. + read_ct: + type: integer + format: int32 + description: How many times the message has been read. + enqueued_at: + type: string + format: date-time + description: Timestamp when the message was enqueued. + vt: + type: string + format: date-time + description: Timestamp when the message will become visible again. + message: + type: object + additionalProperties: true + description: Arbitrary JSON payload carried by the message. + QueueMetrics: + type: object + required: + [queue_name, queue_length, newest_msg_age_sec, oldest_msg_age_sec, total_messages, scrape_time] + properties: + queue_name: + type: string + description: Name of the queue the metrics correspond to. + queue_length: + type: integer + format: int64 + description: Number of messages currently available in the queue. + newest_msg_age_sec: + type: number + format: float + description: Age in seconds of the newest visible message. + oldest_msg_age_sec: + type: number + format: float + description: Age in seconds of the oldest visible message. + total_messages: + type: integer + format: int64 + description: Total number of messages processed by the queue. + scrape_time: + type: string + format: date-time + description: Timestamp when the metrics snapshot was taken. + SendMessagePayload: + type: object + required: [message] + properties: + message: + type: object + additionalProperties: true + description: Arbitrary JSON payload to enqueue. + delay: + type: integer + format: int32 + minimum: 0 + description: Optional delay in seconds before the message becomes visible. + tz: + type: string + format: date-time + description: Optional timestamp when the message becomes visible. + SendBatchPayload: + type: object + required: [messages] + properties: + messages: + type: array + description: List of JSON payloads to enqueue as a batch. + items: + type: object + additionalProperties: true + delay: + type: integer + format: int32 + minimum: 0 + description: Optional delay applied to the batch in seconds. + tz: + type: string + format: date-time + description: Optional timestamp when the batch becomes visible. + ReadMessagesPayload: + type: object + properties: + batch_size: + type: integer + format: int32 + minimum: 1 + maximum: 1024 + default: 1 + description: Number of messages to retrieve in a single call. + vt: + type: integer + format: int32 + minimum: 0 + description: Optional visibility timeout override in seconds. + DeleteBatchPayload: + type: object + required: [msg_ids] + properties: + msg_ids: + type: array + description: Message identifiers to delete. + items: + type: integer + format: int64 + ArchiveBatchPayload: + type: object + required: [msg_ids] + properties: + msg_ids: + type: array + description: Message identifiers to archive. + items: + type: integer + format: int64 + responses: + NotFound: + description: The requested resource could not be found. + content: + application/json: + schema: + type: object + properties: + detail: + type: string + ServerError: + description: An unexpected server error occurred. + content: + application/json: + schema: + type: object + properties: + detail: + type: string +paths: + /queues: + get: + tags: [Queues] + summary: List available queues + operationId: listQueues + responses: + '200': + description: Successful response + content: + application/json: + schema: + type: object + properties: + queues: + type: array + items: + type: string + '500': + $ref: '#/components/responses/ServerError' + /queues/{queue_name}/messages: + post: + tags: [Messages] + summary: Enqueue a message + operationId: sendMessage + parameters: + - in: path + name: queue_name + required: true + schema: + type: string + requestBody: + required: true + content: + application/json: + schema: + $ref: '#/components/schemas/SendMessagePayload' + responses: + '200': + description: Message enqueued + content: + application/json: + schema: + type: object + properties: + msg_id: + type: integer + format: int64 + '500': + $ref: '#/components/responses/ServerError' + delete: + tags: [Messages] + summary: Delete a batch of messages by id + operationId: deleteBatch + parameters: + - in: path + name: queue_name + required: true + schema: + type: string + requestBody: + required: true + content: + application/json: + schema: + $ref: '#/components/schemas/DeleteBatchPayload' + responses: + '200': + description: Delete confirmation + content: + application/json: + schema: + type: object + properties: + deleted: + type: array + items: + type: integer + format: int64 + '500': + $ref: '#/components/responses/ServerError' + /queues/{queue_name}/messages/batch: + post: + tags: [Messages] + summary: Enqueue a batch of messages + operationId: sendBatch + parameters: + - in: path + name: queue_name + required: true + schema: + type: string + requestBody: + required: true + content: + application/json: + schema: + $ref: '#/components/schemas/SendBatchPayload' + responses: + '200': + description: Messages enqueued + content: + application/json: + schema: + type: object + properties: + msg_ids: + type: array + items: + type: integer + format: int64 + '500': + $ref: '#/components/responses/ServerError' + /queues/{queue_name}/messages/read: + post: + tags: [Messages] + summary: Read (peek) messages from a queue + operationId: readMessages + parameters: + - in: path + name: queue_name + required: true + schema: + type: string + requestBody: + required: true + content: + application/json: + schema: + $ref: '#/components/schemas/ReadMessagesPayload' + responses: + '200': + description: Messages retrieved + content: + application/json: + schema: + type: object + properties: + messages: + type: array + items: + $ref: '#/components/schemas/Message' + '500': + $ref: '#/components/responses/ServerError' + /queues/{queue_name}/messages/pop: + post: + tags: [Messages] + summary: Pop a message from a queue + operationId: popMessage + parameters: + - in: path + name: queue_name + required: true + schema: + type: string + responses: + '200': + description: Popped message + content: + application/json: + schema: + type: object + properties: + message: + $ref: '#/components/schemas/Message' + '404': + $ref: '#/components/responses/NotFound' + '500': + $ref: '#/components/responses/ServerError' + /queues/{queue_name}/messages/{msg_id}: + delete: + tags: [Messages] + summary: Delete a single message by id + operationId: deleteMessage + parameters: + - in: path + name: queue_name + required: true + schema: + type: string + - in: path + name: msg_id + required: true + schema: + type: integer + format: int64 + responses: + '200': + description: Delete confirmation + content: + application/json: + schema: + type: object + properties: + deleted: + type: boolean + '404': + $ref: '#/components/responses/NotFound' + '500': + $ref: '#/components/responses/ServerError' + /queues/{queue_name}/archive/{msg_id}: + post: + tags: [Archive] + summary: Archive a single message by id + operationId: archiveMessage + parameters: + - in: path + name: queue_name + required: true + schema: + type: string + - in: path + name: msg_id + required: true + schema: + type: integer + format: int64 + responses: + '200': + description: Archive confirmation + content: + application/json: + schema: + type: object + properties: + archived: + type: boolean + '404': + $ref: '#/components/responses/NotFound' + '500': + $ref: '#/components/responses/ServerError' + /queues/{queue_name}/archive/batch: + post: + tags: [Archive] + summary: Archive a batch of messages + operationId: archiveBatch + parameters: + - in: path + name: queue_name + required: true + schema: + type: string + requestBody: + required: true + content: + application/json: + schema: + $ref: '#/components/schemas/ArchiveBatchPayload' + responses: + '200': + description: Archive confirmation + content: + application/json: + schema: + type: object + properties: + archived: + type: array + items: + type: integer + format: int64 + '500': + $ref: '#/components/responses/ServerError' + /queues/{queue_name}/metrics: + get: + tags: [Metrics] + summary: Retrieve metrics for a specific queue + operationId: queueMetrics + parameters: + - in: path + name: queue_name + required: true + schema: + type: string + responses: + '200': + description: Metrics snapshot + content: + application/json: + schema: + type: object + properties: + metrics: + $ref: '#/components/schemas/QueueMetrics' + '500': + $ref: '#/components/responses/ServerError' + /queues/metrics: + get: + tags: [Metrics] + summary: Retrieve metrics for all queues + operationId: allQueueMetrics + responses: + '200': + description: Metrics for all queues + content: + application/json: + schema: + type: object + properties: + metrics: + type: array + items: + $ref: '#/components/schemas/QueueMetrics' + '500': + $ref: '#/components/responses/ServerError' diff --git a/pyproject.toml b/pyproject.toml index 55affd4..e89d3d7 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -39,6 +39,10 @@ dependencies = [ [project.optional-dependencies] async = ["asyncpg>=0.30.0"] +api = [ + "fastapi>=0.115.0", + "uvicorn[standard]>=0.32.0", +] [dependency-groups] bench = [ diff --git a/src/pgmq/__init__.py b/src/pgmq/__init__.py index 8a431a7..16f6154 100644 --- a/src/pgmq/__init__.py +++ b/src/pgmq/__init__.py @@ -1,4 +1,18 @@ +from typing import TYPE_CHECKING, Optional + from pgmq.queue import Message, PGMQueue # type: ignore from pgmq.decorators import transaction, async_transaction -__all__ = ["Message", "PGMQueue", "transaction", "async_transaction"] +if TYPE_CHECKING: # pragma: no cover + from fastapi import FastAPI + + +def create_app(queue: Optional[PGMQueue] = None): + """Create a FastAPI application for interacting with PGMQ.""" + + from pgmq.api import create_app as _create_app + + return _create_app(queue=queue) + + +__all__ = ["Message", "PGMQueue", "transaction", "async_transaction", "create_app"] diff --git a/src/pgmq/api.py b/src/pgmq/api.py new file mode 100644 index 0000000..910483f --- /dev/null +++ b/src/pgmq/api.py @@ -0,0 +1,262 @@ +"""REST API for interacting with PGMQ queues.""" +from __future__ import annotations + +from datetime import datetime +from typing import Any, Dict, List, Optional + +from fastapi import FastAPI, HTTPException, Request +from pydantic import BaseModel, Field + +from pgmq.messages import Message, QueueMetrics +from pgmq.queue import PGMQueue + + +class SendMessagePayload(BaseModel): + """Payload for sending a single message.""" + + message: Dict[str, Any] = Field(..., description="Arbitrary JSON payload to enqueue") + delay: Optional[int] = Field( + default=None, + ge=0, + description="Optional delay (in seconds) before the message becomes visible.", + ) + tz: Optional[datetime] = Field( + default=None, + description="Optional timestamp when the message becomes visible.", + ) + + +class SendBatchPayload(BaseModel): + """Payload for sending a batch of messages.""" + + messages: List[Dict[str, Any]] = Field( + ..., description="List of JSON payloads to enqueue as a batch" + ) + delay: Optional[int] = Field( + default=None, + ge=0, + description="Optional delay (in seconds) applied to the batch before visibility.", + ) + tz: Optional[datetime] = Field( + default=None, + description="Optional timestamp when the batch becomes visible.", + ) + + +class ReadMessagesPayload(BaseModel): + """Payload for reading messages from a queue.""" + + batch_size: int = Field( + default=1, + ge=1, + le=1024, + description="Number of messages to retrieve in a single request.", + ) + vt: Optional[int] = Field( + default=None, + ge=0, + description="Optional override of the queue's default visibility timeout (seconds).", + ) + + +class DeleteBatchPayload(BaseModel): + """Payload for deleting multiple messages.""" + + msg_ids: List[int] = Field(..., description="List of message ids to delete from the queue.") + + +class ArchiveBatchPayload(BaseModel): + """Payload for archiving multiple messages.""" + + msg_ids: List[int] = Field(..., description="List of message ids to archive from the queue.") + + +def _serialize_message(message: Message) -> Dict[str, Any]: + return { + "msg_id": message.msg_id, + "read_ct": message.read_ct, + "enqueued_at": message.enqueued_at.isoformat(), + "vt": message.vt.isoformat(), + "message": message.message, + } + + +def _serialize_metrics(metrics: QueueMetrics) -> Dict[str, Any]: + return { + "queue_name": metrics.queue_name, + "queue_length": metrics.queue_length, + "newest_msg_age_sec": metrics.newest_msg_age_sec, + "oldest_msg_age_sec": metrics.oldest_msg_age_sec, + "total_messages": metrics.total_messages, + "scrape_time": metrics.scrape_time.isoformat(), + } + + +def _get_queue(request: Request) -> PGMQueue: + queue: PGMQueue = request.app.state.queue + return queue + + +def create_app(queue: Optional[PGMQueue] = None) -> FastAPI: + """Create a FastAPI application configured to interact with a PGMQ queue.""" + + app = FastAPI( + title="PGMQ API", + version="1.0.0", + description="REST API for interacting with the PGMQ extension.", + ) + + app.state.queue = queue or PGMQueue() + + @app.get("/queues", summary="List available queues") + def list_queues(request: Request) -> Dict[str, List[str]]: + try: + queues = _get_queue(request).list_queues() + except Exception as exc: # pragma: no cover - unexpected errors + raise HTTPException(status_code=500, detail=str(exc)) from exc + return {"queues": queues} + + @app.post("/queues/{queue_name}/messages", summary="Enqueue a message") + def send_message( + queue_name: str, payload: SendMessagePayload, request: Request + ) -> Dict[str, int]: + queue = _get_queue(request) + try: + msg_id = queue.send(queue_name, payload.message, delay=payload.delay or 0, tz=payload.tz) + except Exception as exc: # pragma: no cover - unexpected errors + raise HTTPException(status_code=500, detail=str(exc)) from exc + return {"msg_id": msg_id} + + @app.post("/queues/{queue_name}/messages/batch", summary="Enqueue a batch of messages") + def send_batch( + queue_name: str, payload: SendBatchPayload, request: Request + ) -> Dict[str, List[int]]: + queue = _get_queue(request) + try: + msg_ids = queue.send_batch( + queue_name, + payload.messages, + delay=payload.delay or 0, + tz=payload.tz, + ) + except Exception as exc: # pragma: no cover - unexpected errors + raise HTTPException(status_code=500, detail=str(exc)) from exc + return {"msg_ids": msg_ids} + + @app.post( + "/queues/{queue_name}/messages/read", + summary="Read (peek) messages from a queue", + ) + def read_messages( + queue_name: str, payload: ReadMessagesPayload, request: Request + ) -> Dict[str, List[Dict[str, Any]]]: + queue = _get_queue(request) + try: + messages = queue.read_batch( + queue_name, + vt=payload.vt, + batch_size=payload.batch_size, + ) + except Exception as exc: # pragma: no cover - unexpected errors + raise HTTPException(status_code=500, detail=str(exc)) from exc + + serialized = [] if not messages else [_serialize_message(message) for message in messages] + return {"messages": serialized} + + @app.post( + "/queues/{queue_name}/messages/pop", + summary="Pop a message from a queue", + ) + def pop_message(queue_name: str, request: Request) -> Dict[str, Any]: + queue = _get_queue(request) + try: + message = queue.pop(queue_name) + except IndexError: + raise HTTPException(status_code=404, detail="No messages available to pop") + except Exception as exc: # pragma: no cover - unexpected errors + raise HTTPException(status_code=500, detail=str(exc)) from exc + return {"message": _serialize_message(message)} + + @app.delete( + "/queues/{queue_name}/messages/{msg_id}", + summary="Delete a single message by id", + ) + def delete_message(queue_name: str, msg_id: int, request: Request) -> Dict[str, bool]: + queue = _get_queue(request) + try: + deleted = queue.delete(queue_name, msg_id) + except Exception as exc: # pragma: no cover - unexpected errors + raise HTTPException(status_code=500, detail=str(exc)) from exc + if not deleted: + raise HTTPException(status_code=404, detail="Message not found") + return {"deleted": True} + + @app.delete( + "/queues/{queue_name}/messages", + summary="Delete a batch of messages by id", + ) + def delete_batch( + queue_name: str, payload: DeleteBatchPayload, request: Request + ) -> Dict[str, List[int]]: + queue = _get_queue(request) + try: + deleted = queue.delete_batch(queue_name, payload.msg_ids) + except Exception as exc: # pragma: no cover - unexpected errors + raise HTTPException(status_code=500, detail=str(exc)) from exc + return {"deleted": deleted} + + @app.post( + "/queues/{queue_name}/archive/{msg_id}", + summary="Archive a single message by id", + ) + def archive_message( + queue_name: str, msg_id: int, request: Request + ) -> Dict[str, bool]: + queue = _get_queue(request) + try: + archived = queue.archive(queue_name, msg_id) + except Exception as exc: # pragma: no cover - unexpected errors + raise HTTPException(status_code=500, detail=str(exc)) from exc + if not archived: + raise HTTPException(status_code=404, detail="Message not found") + return {"archived": True} + + @app.post( + "/queues/{queue_name}/archive/batch", + summary="Archive a batch of messages", + ) + def archive_batch( + queue_name: str, payload: ArchiveBatchPayload, request: Request + ) -> Dict[str, List[int]]: + queue = _get_queue(request) + try: + archived = queue.archive_batch(queue_name, payload.msg_ids) + except Exception as exc: # pragma: no cover - unexpected errors + raise HTTPException(status_code=500, detail=str(exc)) from exc + return {"archived": archived} + + @app.get( + "/queues/{queue_name}/metrics", + summary="Retrieve metrics for a specific queue", + ) + def queue_metrics(queue_name: str, request: Request) -> Dict[str, Any]: + queue = _get_queue(request) + try: + metrics = queue.metrics(queue_name) + except Exception as exc: # pragma: no cover - unexpected errors + raise HTTPException(status_code=500, detail=str(exc)) from exc + return {"metrics": _serialize_metrics(metrics)} + + @app.get("/queues/metrics", summary="Retrieve metrics for all queues") + def all_queue_metrics(request: Request) -> Dict[str, List[Dict[str, Any]]]: + queue = _get_queue(request) + try: + metrics = queue.metrics_all() + except Exception as exc: # pragma: no cover - unexpected errors + raise HTTPException(status_code=500, detail=str(exc)) from exc + return {"metrics": [_serialize_metrics(item) for item in metrics]} + + return app + + +__all__ = ["create_app"] diff --git a/tests/test_api.py b/tests/test_api.py new file mode 100644 index 0000000..ac9e1fe --- /dev/null +++ b/tests/test_api.py @@ -0,0 +1,138 @@ +from __future__ import annotations + +from datetime import datetime, timezone +from typing import Any, Dict, List + +import pytest + +pytest.importorskip("fastapi") +pytest.importorskip("fastapi.testclient") + +from fastapi.testclient import TestClient + +from pgmq import Message, QueueMetrics, create_app + + +class FakeQueue: + def __init__(self) -> None: + self._queues: Dict[str, List[Message]] = {} + self._next_id = 1 + + def list_queues(self) -> List[str]: + return list(self._queues.keys()) + + def _ensure_queue(self, queue: str) -> None: + self._queues.setdefault(queue, []) + + def send(self, queue: str, message: Dict[str, Any], delay: int = 0, tz=None, conn=None) -> int: + self._ensure_queue(queue) + msg_id = self._next_id + self._next_id += 1 + now = datetime.now(timezone.utc) + self._queues[queue].append( + Message(msg_id=msg_id, read_ct=0, enqueued_at=now, vt=now, message=message) + ) + return msg_id + + def send_batch( + self, queue: str, messages: List[Dict[str, Any]], delay: int = 0, tz=None, conn=None + ) -> List[int]: + return [self.send(queue, message, delay=delay, tz=tz, conn=conn) for message in messages] + + def read_batch(self, queue: str, vt: int | None = None, batch_size: int = 1, conn=None): + self._ensure_queue(queue) + messages = self._queues[queue][:batch_size] + return messages + + def pop(self, queue: str, conn=None): + self._ensure_queue(queue) + if not self._queues[queue]: + raise IndexError("empty queue") + return self._queues[queue].pop(0) + + def delete(self, queue: str, msg_id: int, conn=None) -> bool: + self._ensure_queue(queue) + for idx, message in enumerate(self._queues[queue]): + if message.msg_id == msg_id: + del self._queues[queue][idx] + return True + return False + + def delete_batch(self, queue: str, msg_ids: List[int], conn=None) -> List[int]: + deleted: List[int] = [] + for msg_id in msg_ids: + if self.delete(queue, msg_id, conn=conn): + deleted.append(msg_id) + return deleted + + def archive(self, queue: str, msg_id: int, conn=None) -> bool: + return self.delete(queue, msg_id, conn=conn) + + def archive_batch(self, queue: str, msg_ids: List[int], conn=None) -> List[int]: + return self.delete_batch(queue, msg_ids, conn=conn) + + def metrics(self, queue: str, conn=None) -> QueueMetrics: + self._ensure_queue(queue) + now = datetime.now(timezone.utc) + length = len(self._queues[queue]) + return QueueMetrics( + queue_name=queue, + queue_length=length, + newest_msg_age_sec=0, + oldest_msg_age_sec=0, + total_messages=length, + scrape_time=now, + ) + + def metrics_all(self, conn=None) -> List[QueueMetrics]: + return [self.metrics(queue) for queue in self._queues] + + +def create_client() -> TestClient: + fake_queue = FakeQueue() + app = create_app(queue=fake_queue) + return TestClient(app) + + +def test_send_and_read_messages(): + client = create_client() + + send_response = client.post("/queues/test/messages", json={"message": {"foo": "bar"}}) + assert send_response.status_code == 200 + msg_id = send_response.json()["msg_id"] + + read_response = client.post("/queues/test/messages/read", json={"batch_size": 1}) + assert read_response.status_code == 200 + messages = read_response.json()["messages"] + assert len(messages) == 1 + assert messages[0]["msg_id"] == msg_id + assert messages[0]["message"] == {"foo": "bar"} + + +def test_batch_operations_and_metrics(): + client = create_client() + + batch_response = client.post( + "/queues/test/messages/batch", + json={"messages": [{"foo": 1}, {"bar": 2}]}, + ) + assert batch_response.status_code == 200 + msg_ids = batch_response.json()["msg_ids"] + assert len(msg_ids) == 2 + + delete_response = client.delete( + "/queues/test/messages", + json={"msg_ids": [msg_ids[0]]}, + ) + assert delete_response.status_code == 200 + assert delete_response.json()["deleted"] == [msg_ids[0]] + + metrics_response = client.get("/queues/test/metrics") + assert metrics_response.status_code == 200 + metrics = metrics_response.json()["metrics"] + assert metrics["queue_name"] == "test" + assert metrics["queue_length"] == 1 + + all_metrics = client.get("/queues/metrics") + assert all_metrics.status_code == 200 + assert len(all_metrics.json()["metrics"]) == 1