From f6c8204db0cc7c724d80f7e0112ce0edbdaf8c72 Mon Sep 17 00:00:00 2001 From: DanielePalaia Date: Mon, 24 Feb 2025 09:13:34 +0100 Subject: [PATCH 1/2] adding code documentation plus few minor improvements --- poetry.lock | 28 ++-- pyproject.toml | 2 +- rabbitmq_amqp_python_client/address_helper.py | 62 ++++++++ rabbitmq_amqp_python_client/connection.py | 65 +++++++++ rabbitmq_amqp_python_client/consumer.py | 57 ++++++++ rabbitmq_amqp_python_client/entities.py | 117 +++++++++++++++ rabbitmq_amqp_python_client/environment.py | 69 ++++++++- rabbitmq_amqp_python_client/management.py | 134 ++++++++++++++++++ rabbitmq_amqp_python_client/publisher.py | 60 +++++++- tests/test_connection.py | 6 + 10 files changed, 579 insertions(+), 21 deletions(-) diff --git a/poetry.lock b/poetry.lock index 7e24b1a..7dafe77 100644 --- a/poetry.lock +++ b/poetry.lock @@ -48,13 +48,13 @@ uvloop = ["uvloop (>=0.15.2)"] [[package]] name = "certifi" -version = "2024.12.14" +version = "2025.1.31" description = "Python package for providing Mozilla's CA Bundle." optional = false python-versions = ">=3.6" files = [ - {file = "certifi-2024.12.14-py3-none-any.whl", hash = "sha256:1275f7a45be9464efc1173084eaa30f866fe2e47d389406136d332ed4967ec56"}, - {file = "certifi-2024.12.14.tar.gz", hash = "sha256:b650d30f370c2b724812bee08008be0c4163b163ddaec3f2546c1caf65f191db"}, + {file = "certifi-2025.1.31-py3-none-any.whl", hash = "sha256:ca78db4565a652026a4db2bcdf68f2fb589ea80d0be70e03929ed730746b84fe"}, + {file = "certifi-2025.1.31.tar.gz", hash = "sha256:3d5da6925056f6f18f119200434a4780a94263f10d1c21d032a6f6b2baa20651"}, ] [[package]] @@ -278,13 +278,13 @@ test = ["pytest (>=6)"] [[package]] name = "flake8" -version = "7.1.1" +version = "7.1.2" description = "the modular source code checker: pep8 pyflakes and co" optional = false python-versions = ">=3.8.1" files = [ - {file = "flake8-7.1.1-py2.py3-none-any.whl", hash = "sha256:597477df7860daa5aa0fdd84bf5208a043ab96b8e96ab708770ae0364dd03213"}, - {file = "flake8-7.1.1.tar.gz", hash = "sha256:049d058491e228e03e67b390f311bbf88fce2dbaa8fa673e7aea87b7198b8d38"}, + {file = "flake8-7.1.2-py2.py3-none-any.whl", hash = "sha256:1cbc62e65536f65e6d754dfe6f1bada7f5cf392d6f5db3c2b85892466c3e7c1a"}, + {file = "flake8-7.1.2.tar.gz", hash = "sha256:c586ffd0b41540951ae41af572e6790dbd49fc12b3aa2541685d253d9bd504bd"}, ] [package.dependencies] @@ -481,13 +481,13 @@ files = [ [[package]] name = "pytest" -version = "7.4.4" +version = "8.3.4" description = "pytest: simple powerful testing with Python" optional = false -python-versions = ">=3.7" +python-versions = ">=3.8" files = [ - {file = "pytest-7.4.4-py3-none-any.whl", hash = "sha256:b090cdf5ed60bf4c45261be03239c2c1c22df034fbffe691abe93cd80cea01d8"}, - {file = "pytest-7.4.4.tar.gz", hash = "sha256:2cf0005922c6ace4a3e2ec8b4080eb0d9753fdc93107415332f50ce9e7994280"}, + {file = "pytest-8.3.4-py3-none-any.whl", hash = "sha256:50e16d954148559c9a74109af1eaf0c945ba2d8f30f0a3d3335edde19788b6f6"}, + {file = "pytest-8.3.4.tar.gz", hash = "sha256:965370d062bce11e73868e0335abac31b4d3de0e82f4007408d242b4f8610761"}, ] [package.dependencies] @@ -495,11 +495,11 @@ colorama = {version = "*", markers = "sys_platform == \"win32\""} exceptiongroup = {version = ">=1.0.0rc8", markers = "python_version < \"3.11\""} iniconfig = "*" packaging = "*" -pluggy = ">=0.12,<2.0" -tomli = {version = ">=1.0.0", markers = "python_version < \"3.11\""} +pluggy = ">=1.5,<2" +tomli = {version = ">=1", markers = "python_version < \"3.11\""} [package.extras] -testing = ["argcomplete", "attrs (>=19.2.0)", "hypothesis (>=3.56)", "mock", "nose", "pygments (>=2.7.2)", "requests", "setuptools", "xmlschema"] +dev = ["argcomplete", "attrs (>=19.2)", "hypothesis (>=3.56)", "mock", "pygments (>=2.7.2)", "requests", "setuptools", "xmlschema"] [[package]] name = "python-qpid-proton" @@ -623,4 +623,4 @@ zstd = ["zstandard (>=0.18.0)"] [metadata] lock-version = "2.0" python-versions = "^3.9" -content-hash = "69fd7879f7457a4f02975ea1a6c1ebdc16e2c8e68bf542e0d7ff3efecf45cd1a" +content-hash = "9436dade4f0387b0b81013c2b05a4d42241fb0fd9274213567b078447611dd37" diff --git a/pyproject.toml b/pyproject.toml index 6638a19..ea68931 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -14,7 +14,7 @@ python-qpid-proton = "^0.39.0" flake8 = "^7.1.1" isort = "^5.9.3" mypy = "^0.910" -pytest = "^7.4.0" +pytest = "^8.3.4" black = "^24.3.0" python-qpid-proton = "^0.39.0" requests = "^2.31.0" diff --git a/rabbitmq_amqp_python_client/address_helper.py b/rabbitmq_amqp_python_client/address_helper.py index 138f724..7968059 100644 --- a/rabbitmq_amqp_python_client/address_helper.py +++ b/rabbitmq_amqp_python_client/address_helper.py @@ -31,9 +31,25 @@ def encode_path_segment(input_string: Optional[str]) -> str: class AddressHelper: + """ + Helper class for constructing and managing AMQP addresses. + + This class provides static methods for creating properly formatted addresses + for various AMQP operations including exchanges, queues, and bindings. + """ @staticmethod def exchange_address(exchange_name: str, routing_key: str = "") -> str: + """ + Create an address for an exchange, optionally with a routing key. + + Args: + exchange_name: The name of the exchange + routing_key: Optional routing key + + Returns: + str: The formatted exchange address + """ if routing_key == "": path = "/exchanges/" + encode_path_segment(exchange_name) else: @@ -48,12 +64,30 @@ def exchange_address(exchange_name: str, routing_key: str = "") -> str: @staticmethod def queue_address(name: str) -> str: + """ + Create an address for a queue. + + Args: + name: The name of the queue + + Returns: + str: The formatted queue address + """ path = "/queues/" + encode_path_segment(name) return path @staticmethod def purge_queue_address(name: str) -> str: + """ + Create an address for purging a queue. + + Args: + name: The name of the queue to purge + + Returns: + str: The formatted purge queue address + """ path = "/queues/" + encode_path_segment(name) + "/messages" return path @@ -68,6 +102,15 @@ def path_address() -> str: def binding_path_with_exchange_queue( bind_specification: ExchangeToQueueBindingSpecification, ) -> str: + """ + Create a binding path for an exchange-to-queue binding. + + Args: + bind_specification: The specification for the binding + + Returns: + str: The formatted binding path + """ if bind_specification.binding_key is not None: key = ";key=" + encode_path_segment(bind_specification.binding_key) else: @@ -90,6 +133,15 @@ def binding_path_with_exchange_queue( def binding_path_with_exchange_exchange( bind_specification: ExchangeToExchangeBindingSpecification, ) -> str: + """ + Create a binding path for an exchange-to-exchange binding. + + Args: + bind_specification: The specification for the binding + + Returns: + str: The formatted binding path + """ binding_path_wth_exchange_exchange_key = ( "/bindings" + "/" @@ -106,6 +158,16 @@ def binding_path_with_exchange_exchange( @staticmethod def message_to_address_helper(message: Message, address: str) -> Message: + """ + Set the address on a message. + + Args: + message: The message to modify + address: The address to set + + Returns: + Message: The modified message with the new address + """ message.address = address return message diff --git a/rabbitmq_amqp_python_client/connection.py b/rabbitmq_amqp_python_client/connection.py index ac08c0f..e3a533f 100644 --- a/rabbitmq_amqp_python_client/connection.py +++ b/rabbitmq_amqp_python_client/connection.py @@ -19,6 +19,15 @@ class Connection: + """ + Main connection class for interacting with RabbitMQ via AMQP 1.0 protocol. + + This class manages the connection to RabbitMQ and provides factory methods for + creating publishers, consumers, and management interfaces. It supports both + single-node and multi-node configurations, as well as SSL/TLS connections. + + """ + def __init__( self, # single-node mode @@ -28,6 +37,18 @@ def __init__( ssl_context: Optional[SslConfigurationContext] = None, on_disconnection_handler: Optional[CB] = None, # type: ignore ): + """ + Initialize a new Connection instance. + + Args: + uri: Single node connection URI + uris: List of URIs for multi-node setup + ssl_context: SSL configuration for secure connections + on_disconnection_handler: Callback for handling disconnection events + + Raises: + ValueError: If neither uri nor uris is provided + """ if uri is None and uris is None: raise ValueError("You need to specify at least an addr or a list of addr") self._addr: Optional[str] = uri @@ -44,6 +65,12 @@ def _set_environment_connection_list(self, connections: []): # type: ignore self._connections = connections def dial(self) -> None: + """ + Establish a connection to the AMQP server. + + Configures SSL if specified and establishes the connection using the + provided URI(s). Also initializes the management interface. + """ logger.debug("Establishing a connection to the amqp server") if self._conf_ssl_context is not None: logger.debug("Enabling SSL") @@ -74,15 +101,38 @@ def _open(self) -> None: self._management.open() def management(self) -> Management: + """ + Get the management interface for this connection. + + Returns: + Management: The management interface for performing administrative tasks + """ return self._management # closes the connection to the AMQP 1.0 server. def close(self) -> None: + """ + Close the connection to the AMQP 1.0 server. + + Closes the underlying connection and removes it from the connection list. + """ logger.debug("Closing connection") self._conn.close() self._connections.remove(self) def publisher(self, destination: str = "") -> Publisher: + """ + Create a new publisher instance. + + Args: + destination: Optional default destination for published messages + + Returns: + Publisher: A new publisher instance + + Raises: + ArgumentOutOfRangeException: If destination address format is invalid + """ if destination != "": if validate_address(destination) is False: raise ArgumentOutOfRangeException( @@ -98,6 +148,21 @@ def consumer( stream_filter_options: Optional[StreamOptions] = None, credit: Optional[int] = None, ) -> Consumer: + """ + Create a new consumer instance. + + Args: + destination: The address to consume from + message_handler: Optional handler for processing messages + stream_filter_options: Optional configuration for stream consumption + credit: Optional credit value for flow control + + Returns: + Consumer: A new consumer instance + + Raises: + ArgumentOutOfRangeException: If destination address format is invalid + """ if validate_address(destination) is False: raise ArgumentOutOfRangeException( "destination address must start with /queues or /exchanges" diff --git a/rabbitmq_amqp_python_client/consumer.py b/rabbitmq_amqp_python_client/consumer.py index 1805d30..3da4dec 100644 --- a/rabbitmq_amqp_python_client/consumer.py +++ b/rabbitmq_amqp_python_client/consumer.py @@ -17,6 +17,22 @@ class Consumer: + """ + A consumer class for receiving messages from RabbitMQ via AMQP 1.0 protocol. + + This class handles the consumption of messages from a specified address in RabbitMQ. + It supports both standard message consumption and stream-based consumption with + optional filtering capabilities. + + Attributes: + _receiver (Optional[BlockingReceiver]): The receiver for consuming messages + _conn (BlockingConnection): The underlying connection to RabbitMQ + _addr (str): The address to consume from + _handler (Optional[MessagingHandler]): Optional message handling callback + _stream_options (Optional[StreamOptions]): Configuration for stream consumption + _credit (Optional[int]): Flow control credit value + """ + def __init__( self, conn: BlockingConnection, @@ -25,6 +41,16 @@ def __init__( stream_options: Optional[StreamOptions] = None, credit: Optional[int] = None, ): + """ + Initialize a new Consumer instance. + + Args: + conn: The blocking connection to use for consuming + addr: The address to consume from + handler: Optional message handler for processing received messages + stream_options: Optional configuration for stream-based consumption + credit: Optional credit value for flow control + """ self._receiver: Optional[BlockingReceiver] = None self._conn = conn self._addr = addr @@ -39,21 +65,52 @@ def _open(self) -> None: self._receiver = self._create_receiver(self._addr) def consume(self, timeout: Union[None, Literal[False], float] = False) -> Message: + """ + Consume a message from the queue. + + Args: + timeout: The time to wait for a message. + None: Defaults to 60s + float: Wait for specified number of seconds + + Returns: + Message: The received message + + Note: + The return type might be None if no message is available and timeout occurs, + but this is handled by the cast to Message. + """ if self._receiver is not None: message = self._receiver.receive(timeout=timeout) return cast(Message, message) def close(self) -> None: + """ + Close the consumer connection. + + Closes the receiver if it exists and cleans up resources. + """ logger.debug("Closing the receiver") if self._receiver is not None: self._receiver.close() def run(self) -> None: + """ + Run the consumer in continuous mode. + + Starts the consumer's container to process messages continuously. + """ logger.debug("Running the consumer: starting to consume") if self._receiver is not None: self._receiver.container.run() def stop(self) -> None: + """ + Stop the consumer's continuous processing. + + Stops the consumer's container, halting message processing. + This should be called to cleanly stop a consumer that was started with run(). + """ logger.debug("Stopping the consumer: starting to consume") if self._receiver is not None: self._receiver.container.stop_events() diff --git a/rabbitmq_amqp_python_client/entities.py b/rabbitmq_amqp_python_client/entities.py index e123c59..8e1d04a 100644 --- a/rabbitmq_amqp_python_client/entities.py +++ b/rabbitmq_amqp_python_client/entities.py @@ -12,6 +12,21 @@ @dataclass class ExchangeSpecification: + """ + Specification for declaring a standard exchange in RabbitMQ. + + This class defines the properties and configuration for a standard exchange, + including its type, durability, and other characteristics. + + Attributes: + name: The name of the exchange + arguments: Additional arguments for exchange configuration + exchange_type: The type of exchange (direct, fanout, topic, etc.) + is_auto_delete: Whether the exchange should be auto-deleted + is_internal: Whether the exchange is internal only + is_durable: Whether the exchange should survive broker restarts + """ + name: str arguments: dict[str, str] = field(default_factory=dict) exchange_type: ExchangeType = ExchangeType.direct @@ -22,6 +37,21 @@ class ExchangeSpecification: @dataclass class ExchangeCustomSpecification: + """ + Specification for declaring a custom exchange type in RabbitMQ. + + Similar to ExchangeSpecification but allows for custom exchange types + beyond the standard ones. + + Attributes: + name: The name of the exchange + exchange_type: The custom exchange type identifier + arguments: Additional arguments for exchange configuration + is_auto_delete: Whether the exchange should be auto-deleted + is_internal: Whether the exchange is internal only + is_durable: Whether the exchange should survive broker restarts + """ + name: str exchange_type: str arguments: dict[str, str] = field(default_factory=dict) @@ -32,6 +62,25 @@ class ExchangeCustomSpecification: @dataclass class QueueInfo: + """ + Information about a queue in RabbitMQ. + + This class represents the current state and configuration of a queue, + including its properties and statistics. + + Attributes: + name: The name of the queue + arguments: Additional arguments used in queue configuration + queue_type: The type of queue (classic, quorum, or stream) + is_exclusive: Whether the queue is exclusive to one connection + is_auto_delete: Whether the queue should be auto-deleted + is_durable: Whether the queue survives broker restarts + leader: The leader node for quorum queues + members: The member nodes for quorum queues + message_count: Current number of messages in the queue + consumer_count: Current number of consumers + """ + name: str arguments: dict[str, Any] queue_type: QueueType = QueueType.classic @@ -45,6 +94,17 @@ class QueueInfo: class OffsetSpecification(Enum): + """ + Specification for stream offset positions. + + Defines the possible starting positions for consuming from a stream. + + Attributes: + first: Start from the first message in the stream + next: Start from the next message to arrive + last: Start from the last message in the stream + """ + first = ("first",) next = ("next",) last = ("last",) @@ -52,6 +112,18 @@ class OffsetSpecification(Enum): @dataclass class ExchangeToQueueBindingSpecification: + """ + Specification for binding an exchange to a queue. + + Defines the relationship between a source exchange and a destination queue, + optionally with a routing key. + + Attributes: + source_exchange: The name of the source exchange + destination_queue: The name of the destination queue + binding_key: Optional routing key for the binding + """ + source_exchange: str destination_queue: str binding_key: Optional[str] = None @@ -59,17 +131,43 @@ class ExchangeToQueueBindingSpecification: @dataclass class ExchangeToExchangeBindingSpecification: + """ + Specification for binding an exchange to another exchange. + + Defines the relationship between two exchanges, optionally with a routing key. + + Attributes: + source_exchange: The name of the source exchange + destination_exchange: The name of the destination exchange + binding_key: Optional routing key for the binding + """ + source_exchange: str destination_exchange: str binding_key: Optional[str] = None class StreamOptions: + """ + Configuration options for stream queues. + + This class manages stream-specific options including filtering and offset specifications. + + Attributes: + _filter_set: Dictionary of stream filter specifications + """ def __init__(self): # type: ignore self._filter_set: Dict[symbol, Described] = {} def offset(self, offset_specification: Union[OffsetSpecification, int]) -> None: + """ + Set the offset specification for the stream. + + Args: + offset_specification: Either an OffsetSpecification enum value or + an integer offset + """ if isinstance(offset_specification, int): self._filter_set[symbol(STREAM_OFFSET_SPEC)] = Described( symbol(STREAM_OFFSET_SPEC), offset_specification @@ -80,14 +178,33 @@ def offset(self, offset_specification: Union[OffsetSpecification, int]) -> None: ) def filter_values(self, filters: list[str]) -> None: + """ + Set the filter values for the stream. + + Args: + filters: List of filter strings to apply to the stream + """ self._filter_set[symbol(STREAM_FILTER_SPEC)] = Described( symbol(STREAM_FILTER_SPEC), filters ) def filter_match_unfiltered(self, filter_match_unfiltered: bool) -> None: + """ + Set whether to match unfiltered messages. + + Args: + filter_match_unfiltered: Whether to match messages that don't match + any filter + """ self._filter_set[symbol(STREAM_FILTER_MATCH_UNFILTERED)] = Described( symbol(STREAM_FILTER_MATCH_UNFILTERED), filter_match_unfiltered ) def filter_set(self) -> Dict[symbol, Described]: + """ + Get the current filter set configuration. + + Returns: + Dict[symbol, Described]: The current filter set configuration + """ return self._filter_set diff --git a/rabbitmq_amqp_python_client/environment.py b/rabbitmq_amqp_python_client/environment.py index 4e1a515..337f84c 100644 --- a/rabbitmq_amqp_python_client/environment.py +++ b/rabbitmq_amqp_python_client/environment.py @@ -12,9 +12,23 @@ class Environment: + """ + Environment class for managing AMQP connections. + + This class serves as a connection pooler to maintain compatibility with other clients. + It manages a collection of connections and provides methods for creating and managing + these connections. + + Attributes: + _connections (list[Connection]): List of active connections managed by this environment + """ def __init__(self): # type: ignore + """ + Initialize a new Environment instance. + Creates an empty list to track active connections. + """ self._connections: list[Connection] = [] def connection( @@ -26,6 +40,24 @@ def connection( ssl_context: Optional[SslConfigurationContext] = None, on_disconnection_handler: Optional[CB] = None, # type: ignore ) -> Connection: + """ + Create and return a new connection. + + This method supports both single-node and multi-node configurations, with optional + SSL/TLS security and disconnection handling. + + Args: + uri: Single node connection URI + uris: List of URIs for multi-node setup + ssl_context: SSL configuration for secure connections + on_disconnection_handler: Callback for handling disconnection events + + Returns: + Connection: A new connection instance + + Raises: + ValueError: If neither uri nor uris is provided + """ connection = Connection( uri=uri, uris=uris, @@ -39,9 +71,42 @@ def connection( # closes all active connections def close(self) -> None: - logger.debug("Environment: Closing all pending connections") + """ + Close all active connections. + + Iterates through all connections managed by this environment and closes them. + This method should be called when shutting down the application to ensure + proper cleanup of resources. + """ + errors = [] for connection in self._connections: - connection.close() + try: + connection.close() + except Exception as e: + errors.append(f"Exception closing connection: {str(e)}") + logger.error(f"Exception closing connection: {e}") + + if errors: + raise RuntimeError(f"Errors closing connections: {'; '.join(errors)}") def connections(self) -> list[Connection]: + """ + Get the list of active connections. + + Returns: + list[Connection]: List of all active connections managed by this environment + """ return self._connections + + def __enter__(self) -> "Environment": + """Context manager support""" + return self + + def __exit__(self, exc_type, exc_val, exc_tb) -> None: # type: ignore[no-untyped-def] + """Close all connections when the context terminate.""" + self.close() + + @property + def active_connections(self) -> int: + """Returns the number of active connections""" + return len(self._connections) diff --git a/rabbitmq_amqp_python_client/management.py b/rabbitmq_amqp_python_client/management.py index 429f89e..0ea7a83 100644 --- a/rabbitmq_amqp_python_client/management.py +++ b/rabbitmq_amqp_python_client/management.py @@ -29,12 +29,37 @@ class Management: + """ + Handles RabbitMQ management operations via AMQP 1.0 protocol. + + This class provides methods for declaring and managing exchanges, queues, + and bindings in RabbitMQ. It uses a blocking connection to communicate + with the RabbitMQ management interface. + + Attributes: + _sender (Optional[BlockingSender]): The sender for management commands + _receiver (Optional[BlockingReceiver]): The receiver for management responses + _conn (BlockingConnection): The underlying connection to RabbitMQ + """ + def __init__(self, conn: BlockingConnection): + """ + Initialize a new Management instance. + + Args: + conn: The blocking connection to use for management operations + """ self._sender: Optional[BlockingSender] = None self._receiver: Optional[BlockingReceiver] = None self._conn = conn def open(self) -> None: + """ + Open the management connection by creating sender and receiver. + + Creates sender and receiver if they don't exist, using the management + node address defined in CommonValues. + """ if self._sender is None: logger.debug("Creating Sender") self._sender = self._create_sender( @@ -54,6 +79,11 @@ def _create_receiver(self, addr: str) -> BlockingReceiver: # closes the connection to the AMQP 1.0 server. def close(self) -> None: + """ + Close the management connection. + + Closes both sender and receiver if they exist. + """ logger.debug("Closing Sender and Receiver") if self._sender is not None: self._sender.close() @@ -67,6 +97,21 @@ def request( method: str, expected_response_codes: list[int], ) -> Message: + """ + Send a management request with a new UUID. + + Args: + body: The request body to send + path: The management API path + method: The HTTP method to use + expected_response_codes: List of acceptable response codes + + Returns: + Message: The response message from the server + + Raises: + ValidationCodeException: If response code is not in expected_response_codes + """ return self._request( str(uuid.uuid4()), body, path, method, expected_response_codes ) @@ -104,6 +149,18 @@ def declare_exchange( ExchangeSpecification, ExchangeCustomSpecification ], ) -> Union[ExchangeSpecification, ExchangeCustomSpecification]: + """ + Declare a new exchange in RabbitMQ. + + Args: + exchange_specification: The specification for the exchange to create + + Returns: + The same specification object that was passed in + + Raises: + ValidationCodeException: If exchange already exists or other validation fails + """ logger.debug("declare_exchange operation called") body: dict[str, Any] = {} body["auto_delete"] = exchange_specification.is_auto_delete @@ -138,6 +195,20 @@ def declare_queue( ) -> Union[ ClassicQueueSpecification, QuorumQueueSpecification, StreamSpecification ]: + """ + Declare a new queue in RabbitMQ. + + Supports declaration of classic queues, quorum queues, and streams. + + Args: + queue_specification: The specification for the queue to create + + Returns: + The same specification object that was passed in + + Raises: + ValidationCodeException: If queue already exists or other validation fails + """ logger.debug("declare_queue operation called") if isinstance(queue_specification, ClassicQueueSpecification) or isinstance( @@ -270,6 +341,15 @@ def _declare_stream( return body def delete_exchange(self, name: str) -> None: + """ + Delete an exchange. + + Args: + name: The name of the exchange to delete + + Raises: + ValidationCodeException: If exchange doesn't exist or deletion fails + """ logger.debug("delete_exchange operation called") path = AddressHelper.exchange_address(name) @@ -283,6 +363,15 @@ def delete_exchange(self, name: str) -> None: ) def delete_queue(self, name: str) -> None: + """ + Delete a queue. + + Args: + name: The name of the queue to delete + + Raises: + ValidationCodeException: If queue doesn't exist or deletion fails + """ logger.debug("delete_queue operation called") path = AddressHelper.queue_address(name) @@ -315,6 +404,18 @@ def bind( ExchangeToQueueBindingSpecification, ExchangeToExchangeBindingSpecification ], ) -> str: + """ + Create a binding between exchanges or between an exchange and a queue. + + Args: + bind_specification: The specification for the binding to create + + Returns: + str: The binding path created + + Raises: + ValidationCodeException: If binding creation fails + """ logger.debug("Bind Operation called") body = {} @@ -361,6 +462,15 @@ def unbind( ExchangeToExchangeBindingSpecification, ], ) -> None: + """ + Remove a binding between exchanges or between an exchange and a queue. + + Args: + bind_specification: Either a binding path string or a binding specification + + Raises: + ValidationCodeException: If unbinding fails + """ logger.debug("UnBind Operation called") binding_name = "" if isinstance(bind_specification, str): @@ -384,6 +494,18 @@ def unbind( ) def purge_queue(self, name: str) -> int: + """ + Purge all messages from a queue. + + Args: + name: The name of the queue to purge + + Returns: + int: The number of messages that were purged + + Raises: + ValidationCodeException: If queue doesn't exist or purge fails + """ logger.debug("purge_queue operation called") path = AddressHelper.purge_queue_address(name) @@ -399,6 +521,18 @@ def purge_queue(self, name: str) -> int: return int(response.body["message_count"]) def queue_info(self, name: str) -> QueueInfo: + """ + Get information about a queue. + + Args: + name: The name of the queue to get information about + + Returns: + QueueInfo: Object containing queue information + + Raises: + ValidationCodeException: If queue doesn't exist or other errors occur + """ logger.debug("queue_info operation called") path = AddressHelper.queue_address(name) diff --git a/rabbitmq_amqp_python_client/publisher.py b/rabbitmq_amqp_python_client/publisher.py index 2d6e3ee..5c057df 100644 --- a/rabbitmq_amqp_python_client/publisher.py +++ b/rabbitmq_amqp_python_client/publisher.py @@ -18,7 +18,28 @@ class Publisher: + """ + A publisher class for sending messages to RabbitMQ via AMQP 1.0 protocol. + + This class handles the publishing of messages to either a predefined address + or to addresses specified in individual messages. It manages a blocking + connection to RabbitMQ and ensures proper message delivery. + + Attributes: + _sender (Optional[BlockingSender]): The sender for publishing messages + _conn (BlockingConnection): The underlying connection to RabbitMQ + _addr (str): The default address to publish to, if specified + """ + def __init__(self, conn: BlockingConnection, addr: str = ""): + """ + Initialize a new Publisher instance. + + Args: + conn: The blocking connection to use for publishing + addr: Optional default address to publish to. If provided, all messages + will be sent to this address unless overridden. + """ self._sender: Optional[BlockingSender] = None self._conn = conn self._addr = addr @@ -30,6 +51,22 @@ def _open(self) -> None: self._sender = self._create_sender(self._addr) def publish(self, message: Message) -> Delivery: + """ + Publish a message to RabbitMQ. + + The message can be sent to either the publisher's default address or + to an address specified in the message itself, but not both. + + Args: + message: The message to publish + + Returns: + Delivery: The delivery confirmation from RabbitMQ + + Raises: + ValidationCodeException: If address is specified in both message and publisher + ArgumentOutOfRangeException: If message address format is invalid + """ if (self._addr != "") and (message.address is not None): raise ValidationCodeException( "address specified in both message and publisher" @@ -44,14 +81,29 @@ def publish(self, message: Message) -> Delivery: raise ArgumentOutOfRangeException( "destination address must start with /queues or /exchanges" ) - if self._sender is not None: - delivery = self._sender.send(message) + if self.is_open: + delivery = self._sender.send(message) # type: ignore return delivery def close(self) -> None: + """ + Close the publisher connection. + + Closes the sender if it exists and cleans up resources. + """ logger.debug("Closing Sender") - if self._sender is not None: - self._sender.close() + if self.is_open: + self._sender.close() # type: ignore def _create_sender(self, addr: str) -> BlockingSender: return self._conn.create_sender(addr, options=SenderOptionUnseattle(addr)) + + @property + def is_open(self) -> bool: + """Check if publisher is open and ready to send messages.""" + return self._sender is not None + + @property + def address(self) -> str: + """Get the current publisher address.""" + return self._addr diff --git a/tests/test_connection.py b/tests/test_connection.py index 763a01f..2d5957f 100644 --- a/tests/test_connection.py +++ b/tests/test_connection.py @@ -26,6 +26,12 @@ def test_connection() -> None: environment.close() +def test_environment_context_manager() -> None: + with Environment() as environment: + connection = environment.connection("amqp://guest:guest@localhost:5672/") + connection.dial() + + def test_connection_ssl() -> None: environment = Environment() ca_cert_file = ".ci/certs/ca_certificate.pem" From 10ba5d1596045e2176668b4c7c4825e7227570f2 Mon Sep 17 00:00:00 2001 From: DanielePalaia Date: Mon, 24 Feb 2025 10:24:05 +0100 Subject: [PATCH 2/2] add again declare custom exchange test --- tests/test_connection.py | 8 ++++---- tests/test_management.py | 21 +++++++++++++++++++++ 2 files changed, 25 insertions(+), 4 deletions(-) diff --git a/tests/test_connection.py b/tests/test_connection.py index 2d5957f..ce6bd1d 100644 --- a/tests/test_connection.py +++ b/tests/test_connection.py @@ -59,20 +59,20 @@ def test_environment_connections_management() -> None: connection3 = environment.connection("amqp://guest:guest@localhost:5672/") connection3.dial() - assert len(environment.connections()) == 3 + assert environment.active_connections == 3 # this shouldn't happen but we test it anyway connection.close() - assert len(environment.connections()) == 2 + assert environment.active_connections == 2 connection2.close() - assert len(environment.connections()) == 1 + assert environment.active_connections == 1 connection3.close() - assert len(environment.connections()) == 0 + assert environment.active_connections == 0 environment.close() diff --git a/tests/test_management.py b/tests/test_management.py index 7d3338a..1d5e4d1 100644 --- a/tests/test_management.py +++ b/tests/test_management.py @@ -2,6 +2,7 @@ from rabbitmq_amqp_python_client import ( ClassicQueueSpecification, + ExchangeCustomSpecification, ExchangeSpecification, ExchangeToExchangeBindingSpecification, ExchangeToQueueBindingSpecification, @@ -42,6 +43,26 @@ def test_declare_delete_exchange_headers(management: Management) -> None: management.delete_exchange(exchange_name) +def test_declare_delete_exchange_custom(management: Management) -> None: + + exchange_name = "test-exchange" + + exchange_arguments = {} + exchange_arguments["x-delayed-type"] = "direct" + + exchange_info = management.declare_exchange( + ExchangeCustomSpecification( + name=exchange_name, + exchange_type="x-local-random", + arguments=exchange_arguments, + ) + ) + + assert exchange_info.name == exchange_name + + management.delete_exchange(exchange_name) + + def test_declare_delete_exchange_with_args(management: Management) -> None: exchange_name = "test-exchange-with-args"