diff --git a/examples/reconnection/reconnection_example.py b/examples/reconnection/reconnection_example.py index 751fd82..34bfb40 100644 --- a/examples/reconnection/reconnection_example.py +++ b/examples/reconnection/reconnection_example.py @@ -123,7 +123,7 @@ def create_connection() -> Connection: # connection = Connection(uris=uris, on_disconnection_handler=on_disconnected) connection = environment.connection( - url="amqp://guest:guest@localhost:5672/", + uri="amqp://guest:guest@localhost:5672/", on_disconnection_handler=on_disconnection, ) connection.dial() diff --git a/rabbitmq_amqp_python_client/__init__.py b/rabbitmq_amqp_python_client/__init__.py index a7e2df1..6bf5811 100644 --- a/rabbitmq_amqp_python_client/__init__.py +++ b/rabbitmq_amqp_python_client/__init__.py @@ -12,7 +12,10 @@ StreamOptions, ) from .environment import Environment -from .exceptions import ArgumentOutOfRangeException +from .exceptions import ( + ArgumentOutOfRangeException, + ValidationCodeException, +) from .management import Management from .publisher import Publisher from .qpid.proton._data import symbol # noqa: E402 @@ -62,6 +65,7 @@ "AddressHelper", "AMQPMessagingHandler", "ArgumentOutOfRangeException", + "ValidationCodeException", "SslConfigurationContext", "ClientCert", "ConnectionClosed", diff --git a/rabbitmq_amqp_python_client/address_helper.py b/rabbitmq_amqp_python_client/address_helper.py index c2bf766..4e05470 100644 --- a/rabbitmq_amqp_python_client/address_helper.py +++ b/rabbitmq_amqp_python_client/address_helper.py @@ -1,4 +1,5 @@ from .entities import BindingSpecification +from .qpid.proton._message import Message def _is_unreserved(char: str) -> bool: @@ -73,6 +74,11 @@ def binding_path_with_exchange_queue( ) return binding_path_wth_exchange_queue_key + @staticmethod + def message_to_address_helper(message: Message, address: str) -> Message: + message.address = address + return message + def validate_address(address: str) -> bool: if address.startswith("/queues") or address.startswith("/exchanges"): diff --git a/rabbitmq_amqp_python_client/connection.py b/rabbitmq_amqp_python_client/connection.py index 468d79a..ac08c0f 100644 --- a/rabbitmq_amqp_python_client/connection.py +++ b/rabbitmq_amqp_python_client/connection.py @@ -82,11 +82,12 @@ def close(self) -> None: self._conn.close() self._connections.remove(self) - def publisher(self, destination: str) -> Publisher: - if validate_address(destination) is False: - raise ArgumentOutOfRangeException( - "destination address must start with /queues or /exchanges" - ) + def publisher(self, destination: str = "") -> Publisher: + if destination != "": + if validate_address(destination) is False: + raise ArgumentOutOfRangeException( + "destination address must start with /queues or /exchanges" + ) publisher = Publisher(self._conn, destination) return publisher diff --git a/rabbitmq_amqp_python_client/consumer.py b/rabbitmq_amqp_python_client/consumer.py index 87a6996..1805d30 100644 --- a/rabbitmq_amqp_python_client/consumer.py +++ b/rabbitmq_amqp_python_client/consumer.py @@ -1,5 +1,5 @@ import logging -from typing import Literal, Optional, Union +from typing import Literal, Optional, Union, cast from .entities import StreamOptions from .options import ( @@ -40,7 +40,8 @@ def _open(self) -> None: def consume(self, timeout: Union[None, Literal[False], float] = False) -> Message: if self._receiver is not None: - return self._receiver.receive(timeout=timeout) + message = self._receiver.receive(timeout=timeout) + return cast(Message, message) def close(self) -> None: logger.debug("Closing the receiver") diff --git a/rabbitmq_amqp_python_client/publisher.py b/rabbitmq_amqp_python_client/publisher.py index 81e3cad..2d6e3ee 100644 --- a/rabbitmq_amqp_python_client/publisher.py +++ b/rabbitmq_amqp_python_client/publisher.py @@ -1,6 +1,11 @@ import logging from typing import Optional +from .address_helper import validate_address +from .exceptions import ( + ArgumentOutOfRangeException, + ValidationCodeException, +) from .options import SenderOptionUnseattle from .qpid.proton._delivery import Delivery from .qpid.proton._message import Message @@ -13,7 +18,7 @@ class Publisher: - def __init__(self, conn: BlockingConnection, addr: str): + def __init__(self, conn: BlockingConnection, addr: str = ""): self._sender: Optional[BlockingSender] = None self._conn = conn self._addr = addr @@ -25,8 +30,23 @@ def _open(self) -> None: self._sender = self._create_sender(self._addr) def publish(self, message: Message) -> Delivery: - if self._sender is not None: - return self._sender.send(message) + if (self._addr != "") and (message.address is not None): + raise ValidationCodeException( + "address specified in both message and publisher" + ) + + if self._addr != "": + if self._sender is not None: + return self._sender.send(message) + else: + if message.address != "": + if validate_address(message.address) is False: + raise ArgumentOutOfRangeException( + "destination address must start with /queues or /exchanges" + ) + if self._sender is not None: + delivery = self._sender.send(message) + return delivery def close(self) -> None: logger.debug("Closing Sender") diff --git a/rabbitmq_amqp_python_client/qpid/proton/_message.py b/rabbitmq_amqp_python_client/qpid/proton/_message.py index 156687e..9c0b655 100644 --- a/rabbitmq_amqp_python_client/qpid/proton/_message.py +++ b/rabbitmq_amqp_python_client/qpid/proton/_message.py @@ -140,7 +140,6 @@ def _check(self, err: int) -> int: def _check_property_keys(self) -> None: """ AMQP allows only string keys for properties. This function checks that this requirement is met - and raises a MessageException if not. However, in certain cases, conversions to string are automatically performed: 1. When a key is a user-defined (non-AMQP) subclass of str. diff --git a/tests/test_publisher.py b/tests/test_publisher.py index 6b8e512..83e364c 100644 --- a/tests/test_publisher.py +++ b/tests/test_publisher.py @@ -3,17 +3,18 @@ from rabbitmq_amqp_python_client import ( AddressHelper, ArgumentOutOfRangeException, - BindingSpecification, Connection, ConnectionClosed, Environment, - ExchangeSpecification, Message, + OutcomeState, QuorumQueueSpecification, StreamSpecification, + ValidationCodeException, ) from .http_requests import delete_all_connections +from .utils import create_binding, publish_per_message def test_publish_queue(connection: Connection) -> None: @@ -29,20 +30,68 @@ def test_publish_queue(connection: Connection) -> None: accepted = False try: - publisher = connection.publisher("/queues/" + queue_name) + publisher = connection.publisher( + destination=AddressHelper.queue_address(queue_name) + ) status = publisher.publish(Message(body="test")) - if status.ACCEPTED: + if status.remote_state == OutcomeState.ACCEPTED: + accepted = True + except Exception: + raised = True + + if publisher is not None: + publisher.close() + + management.delete_queue(queue_name) + management.close() + + assert accepted is True + assert raised is False + + +def test_publish_per_message(connection: Connection) -> None: + + queue_name = "test-queue-1" + queue_name_2 = "test-queue-2" + management = connection.management() + + management.declare_queue(QuorumQueueSpecification(name=queue_name)) + management.declare_queue(QuorumQueueSpecification(name=queue_name_2)) + + raised = False + + publisher = None + accepted = False + accepted_2 = True + + try: + publisher = connection.publisher() + status = publish_per_message( + publisher, addr=AddressHelper.queue_address(queue_name) + ) + if status.remote_state == OutcomeState.ACCEPTED: accepted = True + status = publish_per_message( + publisher, addr=AddressHelper.queue_address(queue_name_2) + ) + if status.remote_state == OutcomeState.ACCEPTED: + accepted_2 = True except Exception: raised = True if publisher is not None: publisher.close() + purged_messages_queue_1 = management.purge_queue(queue_name) + purged_messages_queue_2 = management.purge_queue(queue_name_2) management.delete_queue(queue_name) + management.delete_queue(queue_name_2) management.close() assert accepted is True + assert accepted_2 is True + assert purged_messages_queue_1 == 1 + assert purged_messages_queue_2 == 1 assert raised is False @@ -56,7 +105,9 @@ def test_publish_ssl(connection_ssl: Connection) -> None: raised = False try: - publisher = connection_ssl.publisher("/queues/" + queue_name) + publisher = connection_ssl.publisher( + destination=AddressHelper.queue_address(queue_name) + ) publisher.publish(Message(body="test")) except Exception: raised = True @@ -90,6 +141,60 @@ def test_publish_to_invalid_destination(connection: Connection) -> None: assert raised is True +def test_publish_per_message_to_invalid_destination(connection: Connection) -> None: + + queue_name = "test-queue-1" + raised = False + + message = Message(body="test") + message = AddressHelper.message_to_address_helper( + message, "/invalid_destination/" + queue_name + ) + publisher = connection.publisher() + + try: + publisher.publish(message) + except ArgumentOutOfRangeException: + raised = True + except Exception: + raised = False + + if publisher is not None: + publisher.close() + + assert raised is True + + +def test_publish_per_message_both_address(connection: Connection) -> None: + + queue_name = "test-queue-1" + raised = False + + management = connection.management() + management.declare_queue(QuorumQueueSpecification(name=queue_name)) + + publisher = connection.publisher( + destination=AddressHelper.queue_address(queue_name) + ) + + try: + message = Message(body="test") + message = AddressHelper.message_to_address_helper( + message, AddressHelper.queue_address(queue_name) + ) + publisher.publish(message) + except ValidationCodeException: + raised = True + + if publisher is not None: + publisher.close() + + management.delete_queue(queue_name) + management.close() + + assert raised is True + + def test_publish_exchange(connection: Connection) -> None: exchange_name = "test-exchange" @@ -97,17 +202,7 @@ def test_publish_exchange(connection: Connection) -> None: management = connection.management() routing_key = "routing-key" - management.declare_exchange(ExchangeSpecification(name=exchange_name)) - - management.declare_queue(QuorumQueueSpecification(name=queue_name)) - - management.bind( - BindingSpecification( - source_exchange=exchange_name, - destination_queue=queue_name, - binding_key=routing_key, - ) - ) + bind_name = create_binding(management, exchange_name, queue_name, routing_key) addr = AddressHelper.exchange_address(exchange_name, routing_key) @@ -124,6 +219,7 @@ def test_publish_exchange(connection: Connection) -> None: publisher.close() + management.unbind(bind_name) management.delete_exchange(exchange_name) management.delete_queue(queue_name) management.close() @@ -143,7 +239,9 @@ def test_publish_purge(connection: Connection) -> None: raised = False try: - publisher = connection.publisher("/queues/" + queue_name) + publisher = connection.publisher( + destination=AddressHelper.queue_address(queue_name) + ) for i in range(messages_to_publish): publisher.publish(Message(body="test")) except Exception: @@ -183,7 +281,9 @@ def on_disconnected(): connection_test.dial() if publisher is not None: - publisher = connection_test.publisher("/queues/" + queue_name) + publisher = connection_test.publisher( + destination=AddressHelper.queue_address(queue_name) + ) nonlocal reconnected reconnected = True @@ -202,7 +302,9 @@ def on_disconnected(): management.close() - publisher = connection_test.publisher("/queues/" + queue_name) + publisher = connection_test.publisher( + destination=AddressHelper.queue_address(queue_name) + ) while True: for i in range(messages_to_publish): @@ -258,10 +360,58 @@ def test_queue_info_for_stream_with_validations(connection: Connection) -> None: print("before creating publisher") - publisher = connection.publisher("/queues/" + stream_name) + publisher = connection.publisher( + destination=AddressHelper.queue_address(stream_name) + ) print("after creating publisher") for i in range(messages_to_send): publisher.publish(Message(body="test")) + + +def test_publish_per_message_exchange(connection: Connection) -> None: + + exchange_name = "test-exchange-per-message" + queue_name = "test-queue-per-message" + management = connection.management() + routing_key = "routing-key-per-message" + + bind_name = create_binding(management, exchange_name, queue_name, routing_key) + + raised = False + + publisher = None + accepted = False + accepted_2 = False + + try: + publisher = connection.publisher() + status = publish_per_message( + publisher, addr=AddressHelper.exchange_address(exchange_name, routing_key) + ) + if status.remote_state == OutcomeState.ACCEPTED: + accepted = True + status = publish_per_message( + publisher, addr=AddressHelper.queue_address(queue_name) + ) + if status.remote_state == OutcomeState.ACCEPTED: + accepted_2 = True + except Exception: + raised = True + + # if publisher is not None: + publisher.close() + + purged_messages_queue = management.purge_queue(queue_name) + management.unbind(bind_name) + management.delete_exchange(exchange_name) + management.delete_queue(queue_name) + + management.close() + + assert accepted is True + assert accepted_2 is True + assert purged_messages_queue == 2 + assert raised is False diff --git a/tests/utils.py b/tests/utils.py index fc6db91..a70ece0 100644 --- a/tests/utils.py +++ b/tests/utils.py @@ -1,12 +1,15 @@ from typing import Optional from rabbitmq_amqp_python_client import ( + AddressHelper, BindingSpecification, Connection, + Delivery, ExchangeSpecification, ExchangeType, Management, Message, + Publisher, QuorumQueueSpecification, ) @@ -29,6 +32,13 @@ def publish_messages( publisher.close() +def publish_per_message(publisher: Publisher, addr: str) -> Delivery: + message = Message(body="test") + message = AddressHelper.message_to_address_helper(message, addr) + status = publisher.publish(message) + return status + + def setup_dead_lettering(management: Management) -> str: exchange_dead_lettering = "exchange-dead-letter" @@ -55,6 +65,25 @@ def setup_dead_lettering(management: Management) -> str: return bind_path +def create_binding( + management: Management, exchange_name: str, queue_name: str, routing_key: str +) -> str: + + management.declare_exchange(ExchangeSpecification(name=exchange_name)) + + management.declare_queue(QuorumQueueSpecification(name=queue_name)) + + bind_name = management.bind( + BindingSpecification( + source_exchange=exchange_name, + destination_queue=queue_name, + binding_key=routing_key, + ) + ) + + return bind_name + + def cleanup_dead_lettering(management: Management, bind_path: str) -> None: exchange_dead_lettering = "exchange-dead-letter"