diff --git a/README.md b/README.md index 85313c5..eb18249 100644 --- a/README.md +++ b/README.md @@ -2,6 +2,20 @@ This library is in early stages of development. It is meant to be used with RabbitMQ 4.0. +# Table of Contents + +- [How to Build the project and run the tests](#How-to-Build-the-project-and-run-the-tests) +- [Installation](#Installation) +- [Getting started](#Getting-Started) + * [Creating a connection](#Creating-a-connection) + * [Managing resources](#Managing-resources) + * [Publishing messages](#Publishing-messages) + * [Consuming messages](#Consuming-messages) + * [Support for streams](#support-for-streams) + * [SSL connection](#ssl-connections) + * [Managing disconnections](#Managing-disconnections) + + ## How to Build the project and run the tests - Start a RabbitMQ 4.x broker @@ -18,7 +32,7 @@ The client is distributed via [`PIP`](https://pypi.org/project/rabbitmq-amqp-pyt ## Getting Started -An example is provided in ./getting_started_main.py you can run it after starting a RabbitMQ 4.0 broker with: +An example is provided [`here`](./examples/getting_started/basic_example.py) you can run it after starting a RabbitMQ 4.0 broker with: poetry run python ./examples/getting_started/main.py @@ -109,6 +123,33 @@ Then from connection get a consumer object: The consumer will run indefinitively waiting for messages to arrive. +### Support for streams + +The client natively supports streams: https://www.rabbitmq.com/blog/2021/07/13/rabbitmq-streams-overview + +You can consume from a given offset or specify a default starting point (FIRST, NEXT, LAST). + +Streams filtering is also supported: https://www.rabbitmq.com/blog/2023/10/16/stream-filtering + +You can check the [`stream example`](./examples/getting_started/example_with_streams.py) to see how to work with RabbitMQ streams. + +### SSL connections + +The client supports TLS/SSL connections. + +You can check the [`ssl example`](./examples/getting_started/tls_example.py) to see how to establish a secured connection + + +### Managing disconnections + +At this stage the client doesn't support auto-reconnect but a callback is invoked everytime a remote disconnection is detected. +You can use this callback to implement your own logic and eventually attempt a reconnection. + +You can check the [`reconnection example`](./examples/getting_started/reconnection_example.py) to see how to manage disconnections and +eventually attempt a reconnection + + + diff --git a/examples/getting_started/basic_example.py b/examples/getting_started/basic_example.py index baa253d..038f934 100644 --- a/examples/getting_started/basic_example.py +++ b/examples/getting_started/basic_example.py @@ -6,13 +6,14 @@ AMQPMessagingHandler, BindingSpecification, Connection, + Disposition, Event, ExchangeSpecification, Message, QuorumQueueSpecification, ) -messages_to_publish = 100 +MESSAGES_TO_PUBLISH = 100 class MyMessageHandler(AMQPMessagingHandler): @@ -45,7 +46,7 @@ def on_message(self, event: Event): self._count = self._count + 1 - if self._count == messages_to_publish: + if self._count == MESSAGES_TO_PUBLISH: print("closing receiver") # if you want you can add cleanup operations here # event.receiver.close() @@ -62,17 +63,6 @@ def on_link_closed(self, event: Event) -> None: def create_connection() -> Connection: connection = Connection("amqp://guest:guest@localhost:5672/") - # in case of SSL enablement - # ca_cert_file = ".ci/certs/ca_certificate.pem" - # client_cert = ".ci/certs/client_certificate.pem" - # client_key = ".ci/certs/client_key.pem" - # connection = Connection( - # "amqps://guest:guest@localhost:5671/", - # ssl_context=SslConfigurationContext( - # ca_cert=ca_cert_file, - # client_cert=ClientCert(client_cert=client_cert, client_key=client_key), - # ), - # ) connection.dial() return connection @@ -120,13 +110,14 @@ def main() -> None: # management.close() # publish 10 messages - for i in range(messages_to_publish): + for i in range(MESSAGES_TO_PUBLISH): + print("publishing") status = publisher.publish(Message(body="test")) - if status.ACCEPTED: + if status.remote_state == Disposition.ACCEPTED: print("message accepted") - elif status.RELEASED: + elif status.remote_state == Disposition.RELEASED: print("message not routed") - elif status.REJECTED: + elif status.remote_state == Disposition.REJECTED: print("message not rejected") publisher.close() @@ -134,7 +125,7 @@ def main() -> None: print( "create a consumer and consume the test message - press control + c to terminate to consume" ) - consumer = connection.consumer(addr_queue, handler=MyMessageHandler()) + consumer = connection.consumer(addr_queue, message_handler=MyMessageHandler()) try: consumer.run() diff --git a/examples/getting_started/example_with_streams.py b/examples/getting_started/example_with_streams.py index 0ff9df0..4448ec8 100644 --- a/examples/getting_started/example_with_streams.py +++ b/examples/getting_started/example_with_streams.py @@ -11,6 +11,8 @@ StreamSpecification, ) +MESSAGES_TO_PUBLISH = 100 + class MyMessageHandler(AMQPMessagingHandler): @@ -19,6 +21,7 @@ def __init__(self): self._count = 0 def on_message(self, event: Event): + # just messages with banana filters get received print( "received message from stream: " + str(event.message.body) @@ -47,7 +50,7 @@ def on_message(self, event: Event): self._count = self._count + 1 - if self._count == 100: + if self._count == MESSAGES_TO_PUBLISH: print("closing receiver") # if you want you can add cleanup operations here # event.receiver.close() @@ -64,17 +67,6 @@ def on_link_closed(self, event: Event) -> None: def create_connection() -> Connection: connection = Connection("amqp://guest:guest@localhost:5672/") - # in case of SSL enablement - # ca_cert_file = ".ci/certs/ca_certificate.pem" - # client_cert = ".ci/certs/client_certificate.pem" - # client_key = ".ci/certs/client_key.pem" - # connection = Connection( - # "amqps://guest:guest@localhost:5671/", - # ssl_context=SslConfigurationContext( - # ca_cert=ca_cert_file, - # client_cert=ClientCert(client_cert=client_cert, client_key=client_key), - # ), - # ) connection.dial() return connection @@ -82,7 +74,6 @@ def create_connection() -> Connection: def main() -> None: queue_name = "example-queue" - messages_to_publish = 100 print("connection to amqp server") connection = create_connection() @@ -99,10 +90,11 @@ def main() -> None: # can be first, last, next or an offset long # you can also specify stream filters with methods: apply_filters and filter_match_unfiltered stream_filter_options.offset(OffsetSpecification.first) + stream_filter_options.filter_values(["banana"]) consumer = consumer_connection.consumer( addr_queue, - handler=MyMessageHandler(), + message_handler=MyMessageHandler(), stream_filter_options=stream_filter_options, ) print( @@ -112,8 +104,22 @@ def main() -> None: # print("create a publisher and publish a test message") publisher = connection.publisher(addr_queue) - for i in range(messages_to_publish): - publisher.publish(Message(body="test: " + str(i))) + # publish with a filter of apple + for i in range(MESSAGES_TO_PUBLISH): + publisher.publish( + Message( + body="apple: " + str(i), annotations={"x-stream-filter-value": "apple"} + ) + ) + + # publish with a filter of banana + for i in range(MESSAGES_TO_PUBLISH): + publisher.publish( + Message( + body="banana: " + str(i), + annotations={"x-stream-filter-value": "banana"}, + ) + ) publisher.close() diff --git a/examples/getting_started/reconnection_example.py b/examples/getting_started/reconnection_example.py index 0003eb5..9396d88 100644 --- a/examples/getting_started/reconnection_example.py +++ b/examples/getting_started/reconnection_example.py @@ -31,7 +31,7 @@ class ConnectionConfiguration: connection_configuration = ConnectionConfiguration() -messages_to_publish = 50000 +MESSAGES_TO_PUBLSH = 50000 # disconnection callback @@ -61,7 +61,7 @@ def on_disconnection(): if connection_configuration.consumer is not None: connection_configuration.consumer = ( connection_configuration.connection.consumer( - addr_queue, handler=MyMessageHandler() + addr_queue, message_handler=MyMessageHandler() ) ) @@ -95,7 +95,7 @@ def on_message(self, event: Event): self._count = self._count + 1 - if self._count == messages_to_publish: + if self._count == MESSAGES_TO_PUBLSH: print("closing receiver") # if you want you can add cleanup operations here # event.receiver.close() @@ -111,15 +111,15 @@ def on_link_closed(self, event: Event) -> None: def create_connection() -> Connection: - # for multinode specify a list of urls and fill the field urls of Connection instead of url - # urls = [ + # for multinode specify a list of urls and fill the field uris of Connection instead of url + # uris = [ # "amqp://ha_tls-rabbit_node0-1:5682/", # "amqp://ha_tls-rabbit_node1-1:5692/", # "amqp://ha_tls-rabbit_node2-1:5602/", # ] - # connection = Connection(urls=urls, on_disconnection_handler=on_disconnected) + # connection = Connection(uris=uris, on_disconnection_handler=on_disconnected) connection = Connection( - url="amqp://guest:guest@localhost:5672/", + uri="amqp://guest:guest@localhost:5672/", on_disconnection_handler=on_disconnection, ) connection.dial() @@ -181,7 +181,7 @@ def main() -> None: # publishing messages while True: - for i in range(messages_to_publish): + for i in range(MESSAGES_TO_PUBLSH): if i % 1000 == 0: print("published 1000 messages...") @@ -207,7 +207,7 @@ def main() -> None: if connection_configuration.consumer is None: connection_configuration.consumer = ( connection_configuration.connection.consumer( - addr_queue, handler=MyMessageHandler() + addr_queue, message_handler=MyMessageHandler() ) ) diff --git a/examples/getting_started/tls_example.py b/examples/getting_started/tls_example.py new file mode 100644 index 0000000..f358d64 --- /dev/null +++ b/examples/getting_started/tls_example.py @@ -0,0 +1,168 @@ +# type: ignore + + +from rabbitmq_amqp_python_client import ( # SSlConfigurationContext,; SslConfigurationContext,; ClientCert, + AddressHelper, + AMQPMessagingHandler, + BindingSpecification, + ClientCert, + Connection, + Event, + ExchangeSpecification, + Message, + QuorumQueueSpecification, + SslConfigurationContext, +) + +messages_to_publish = 100 + + +class MyMessageHandler(AMQPMessagingHandler): + + def __init__(self): + super().__init__() + self._count = 0 + + def on_message(self, event: Event): + print("received message: " + str(event.message.body)) + + # accepting + self.delivery_context.accept(event) + + # in case of rejection (+eventually deadlettering) + # self.delivery_context.discard(event) + + # in case of requeuing + # self.delivery_context.requeue(event) + + # annotations = {} + # annotations[symbol('x-opt-string')] = 'x-test1' + # in case of requeuing with annotations added + # self.delivery_context.requeue_with_annotations(event, annotations) + + # in case of rejection with annotations added + # self.delivery_context.discard_with_annotations(event) + + print("count " + str(self._count)) + + self._count = self._count + 1 + + if self._count == messages_to_publish: + print("closing receiver") + # if you want you can add cleanup operations here + # event.receiver.close() + # event.connection.close() + + def on_connection_closed(self, event: Event): + # if you want you can add cleanup operations here + print("connection closed") + + def on_link_closed(self, event: Event) -> None: + # if you want you can add cleanup operations here + print("link closed") + + +def create_connection() -> Connection: + # in case of SSL enablement + ca_cert_file = ".ci/certs/ca_certificate.pem" + client_cert = ".ci/certs/client_certificate.pem" + client_key = ".ci/certs/client_key.pem" + connection = Connection( + "amqps://guest:guest@localhost:5671/", + ssl_context=SslConfigurationContext( + ca_cert=ca_cert_file, + client_cert=ClientCert(client_cert=client_cert, client_key=client_key), + ), + ) + connection.dial() + + return connection + + +def main() -> None: + + exchange_name = "test-exchange" + queue_name = "example-queue" + routing_key = "routing-key" + + print("connection to amqp server") + connection = create_connection() + + management = connection.management() + + print("declaring exchange and queue") + management.declare_exchange(ExchangeSpecification(name=exchange_name, arguments={})) + + management.declare_queue( + QuorumQueueSpecification(name=queue_name) + # QuorumQueueSpecification(name=queue_name, dead_letter_exchange="dead-letter") + ) + + print("binding queue to exchange") + bind_name = management.bind( + BindingSpecification( + source_exchange=exchange_name, + destination_queue=queue_name, + binding_key=routing_key, + ) + ) + + addr = AddressHelper.exchange_address(exchange_name, routing_key) + + addr_queue = AddressHelper.queue_address(queue_name) + + print("create a publisher and publish a test message") + publisher = connection.publisher(addr) + + print("purging the queue") + messages_purged = management.purge_queue(queue_name) + + print("messages purged: " + str(messages_purged)) + # management.close() + + # publish 10 messages + for i in range(messages_to_publish): + status = publisher.publish(Message(body="test")) + if status.ACCEPTED: + print("message accepted") + elif status.RELEASED: + print("message not routed") + elif status.REJECTED: + print("message not rejected") + + publisher.close() + + print( + "create a consumer and consume the test message - press control + c to terminate to consume" + ) + consumer = connection.consumer(addr_queue, message_handler=MyMessageHandler()) + + try: + consumer.run() + except KeyboardInterrupt: + pass + + print("cleanup") + consumer.close() + # once we finish consuming if we close the connection we need to create a new one + # connection = create_connection() + # management = connection.management() + + print("unbind") + management.unbind(bind_name) + + print("delete queue") + management.delete_queue(queue_name) + + print("delete exchange") + management.delete_exchange(exchange_name) + + print("closing connections") + management.close() + print("after management closing") + connection.close() + print("after connection closing") + + +if __name__ == "__main__": + main() diff --git a/pyproject.toml b/pyproject.toml index 34a3e5b..4d743ef 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -1,6 +1,6 @@ [tool.poetry] name = "rabbitmq-amqp-python-client" -version = "0.1.0-alpha.1" +version = "0.1.0-alpha.2" description = "Python RabbitMQ client for AMQP 1.0 protocol" authors = ["RabbitMQ team"] license = "Apache-2.0 license" diff --git a/rabbitmq_amqp_python_client/__init__.py b/rabbitmq_amqp_python_client/__init__.py index 1a3e997..3e8f86c 100644 --- a/rabbitmq_amqp_python_client/__init__.py +++ b/rabbitmq_amqp_python_client/__init__.py @@ -15,7 +15,7 @@ from .management import Management from .publisher import Publisher from .qpid.proton._data import symbol # noqa: E402 -from .qpid.proton._delivery import Delivery +from .qpid.proton._delivery import Delivery, Disposition from .qpid.proton._events import Event from .qpid.proton._message import Message from .qpid.proton._utils import ConnectionClosed @@ -65,4 +65,5 @@ "ConnectionClosed", "StreamOptions", "OffsetSpecification", + "Disposition", ] diff --git a/rabbitmq_amqp_python_client/address_helper.py b/rabbitmq_amqp_python_client/address_helper.py index 1721f19..c2bf766 100644 --- a/rabbitmq_amqp_python_client/address_helper.py +++ b/rabbitmq_amqp_python_client/address_helper.py @@ -38,14 +38,14 @@ def exchange_address(exchange_name: str, routing_key: str = "") -> str: return path @staticmethod - def queue_address(queue_name: str) -> str: - path = "/queues/" + encode_path_segment(queue_name) + def queue_address(name: str) -> str: + path = "/queues/" + encode_path_segment(name) return path @staticmethod - def purge_queue_address(queue_name: str) -> str: - path = "/queues/" + encode_path_segment(queue_name) + "/messages" + def purge_queue_address(name: str) -> str: + path = "/queues/" + encode_path_segment(name) + "/messages" return path diff --git a/rabbitmq_amqp_python_client/connection.py b/rabbitmq_amqp_python_client/connection.py index c8480b7..c2fb7de 100644 --- a/rabbitmq_amqp_python_client/connection.py +++ b/rabbitmq_amqp_python_client/connection.py @@ -22,16 +22,16 @@ class Connection: def __init__( self, # single-node mode - url: Optional[str] = None, + uri: Optional[str] = None, # multi-node mode - urls: Optional[list[str]] = None, + uris: Optional[list[str]] = None, ssl_context: Optional[SslConfigurationContext] = None, on_disconnection_handler: Optional[CB] = None, # type: ignore ): - if url is None and urls is None: + if uri is None and uris is None: raise ValueError("You need to specify at least an addr or a list of addr") - self._addr: Optional[str] = url - self._addrs: Optional[list[str]] = urls + self._addr: Optional[str] = uri + self._addrs: Optional[list[str]] = uris self._conn: BlockingConnection self._management: Management self._on_disconnection_handler = on_disconnection_handler @@ -87,12 +87,15 @@ def publisher(self, destination: str) -> Publisher: def consumer( self, destination: str, - handler: Optional[MessagingHandler] = None, + message_handler: Optional[MessagingHandler] = None, stream_filter_options: Optional[StreamOptions] = None, + credit: Optional[int] = None, ) -> Consumer: if validate_address(destination) is False: raise ArgumentOutOfRangeException( "destination address must start with /queues or /exchanges" ) - consumer = Consumer(self._conn, destination, handler, stream_filter_options) + consumer = Consumer( + self._conn, destination, message_handler, stream_filter_options, credit + ) return consumer diff --git a/rabbitmq_amqp_python_client/consumer.py b/rabbitmq_amqp_python_client/consumer.py index 729f247..87a6996 100644 --- a/rabbitmq_amqp_python_client/consumer.py +++ b/rabbitmq_amqp_python_client/consumer.py @@ -23,12 +23,14 @@ def __init__( addr: str, handler: Optional[MessagingHandler] = None, stream_options: Optional[StreamOptions] = None, + credit: Optional[int] = None, ): self._receiver: Optional[BlockingReceiver] = None self._conn = conn self._addr = addr self._handler = handler self._stream_options = stream_options + self._credit = credit self._open() def _open(self) -> None: @@ -70,4 +72,7 @@ def _create_receiver(self, addr: str) -> BlockingReceiver: handler=self._handler, ) + if self._credit is not None: + receiver.credit = self._credit + return receiver diff --git a/rabbitmq_amqp_python_client/entities.py b/rabbitmq_amqp_python_client/entities.py index 20b74ad..da85283 100644 --- a/rabbitmq_amqp_python_client/entities.py +++ b/rabbitmq_amqp_python_client/entities.py @@ -62,7 +62,7 @@ def offset(self, offset_spefication: Union[OffsetSpecification, int]) -> None: symbol(STREAM_OFFSET_SPEC), offset_spefication.name ) - def apply_filters(self, filters: list[str]) -> None: + def filter_values(self, filters: list[str]) -> None: self._filter_set[symbol(STREAM_FILTER_SPEC)] = Described( symbol(STREAM_FILTER_SPEC), filters ) diff --git a/rabbitmq_amqp_python_client/management.py b/rabbitmq_amqp_python_client/management.py index 8259688..fce5123 100644 --- a/rabbitmq_amqp_python_client/management.py +++ b/rabbitmq_amqp_python_client/management.py @@ -247,9 +247,9 @@ def _declare_stream( return body - def delete_exchange(self, exchange_name: str) -> None: + def delete_exchange(self, name: str) -> None: logger.debug("delete_exchange operation called") - path = AddressHelper.exchange_address(exchange_name) + path = AddressHelper.exchange_address(name) self.request( None, @@ -260,9 +260,9 @@ def delete_exchange(self, exchange_name: str) -> None: ], ) - def delete_queue(self, queue_name: str) -> None: + def delete_queue(self, name: str) -> None: logger.debug("delete_queue operation called") - path = AddressHelper.queue_address(queue_name) + path = AddressHelper.queue_address(name) self.request( None, @@ -323,9 +323,9 @@ def unbind(self, binding_exchange_queue_path: str) -> None: ], ) - def purge_queue(self, queue_name: str) -> int: + def purge_queue(self, name: str) -> int: logger.debug("purge_queue operation called") - path = AddressHelper.purge_queue_address(queue_name) + path = AddressHelper.purge_queue_address(name) response = self.request( None, @@ -338,9 +338,9 @@ def purge_queue(self, queue_name: str) -> int: return int(response.body["message_count"]) - def queue_info(self, queue_name: str) -> QueueInfo: + def queue_info(self, name: str) -> QueueInfo: logger.debug("queue_info operation called") - path = AddressHelper.queue_address(queue_name) + path = AddressHelper.queue_address(name) message = self.request( None, diff --git a/tests/test_consumer.py b/tests/test_consumer.py index 683c437..05ef5f1 100644 --- a/tests/test_consumer.py +++ b/tests/test_consumer.py @@ -88,7 +88,7 @@ def test_consumer_async_queue_accept(connection: Connection) -> None: # we closed the connection so we need to open a new one connection_consumer = create_connection() consumer = connection_consumer.consumer( - addr_queue, handler=MyMessageHandlerAccept() + addr_queue, message_handler=MyMessageHandlerAccept() ) try: @@ -125,7 +125,9 @@ def test_consumer_async_queue_no_ack(connection: Connection) -> None: # we closed the connection so we need to open a new one connection_consumer = create_connection() - consumer = connection_consumer.consumer(addr_queue, handler=MyMessageHandlerNoack()) + consumer = connection_consumer.consumer( + addr_queue, message_handler=MyMessageHandlerNoack() + ) try: consumer.run() @@ -172,7 +174,7 @@ def test_consumer_async_queue_with_discard(connection: Connection) -> None: connection_consumer = create_connection() consumer = connection_consumer.consumer( - addr_queue, handler=MyMessageHandlerDiscard() + addr_queue, message_handler=MyMessageHandlerDiscard() ) try: @@ -228,7 +230,7 @@ def test_consumer_async_queue_with_discard_with_annotations( connection_consumer = create_connection() consumer = connection_consumer.consumer( - addr_queue, handler=MyMessageHandlerDiscardWithAnnotations() + addr_queue, message_handler=MyMessageHandlerDiscardWithAnnotations() ) try: @@ -278,7 +280,7 @@ def test_consumer_async_queue_with_requeue(connection: Connection) -> None: connection_consumer = create_connection() consumer = connection_consumer.consumer( - addr_queue, handler=MyMessageHandlerRequeue() + addr_queue, message_handler=MyMessageHandlerRequeue() ) try: @@ -316,7 +318,7 @@ def test_consumer_async_queue_with_requeue_with_annotations( connection_consumer = create_connection() consumer = connection_consumer.consumer( - addr_queue, handler=MyMessageHandlerRequeueWithAnnotations() + addr_queue, message_handler=MyMessageHandlerRequeueWithAnnotations() ) try: @@ -363,7 +365,7 @@ def test_consumer_async_queue_with_requeue_with_invalid_annotations( try: consumer = connection_consumer.consumer( - addr_queue, handler=MyMessageHandlerRequeueWithInvalidAnnotations() + addr_queue, message_handler=MyMessageHandlerRequeueWithInvalidAnnotations() ) consumer.run() diff --git a/tests/test_management.py b/tests/test_management.py index 6c5de51..f25b5f7 100644 --- a/tests/test_management.py +++ b/tests/test_management.py @@ -82,7 +82,7 @@ def test_queue_info_with_validations(management: Management) -> None: ) management.declare_queue(queue_specification) - queue_info = management.queue_info(queue_name=queue_name) + queue_info = management.queue_info(name=queue_name) management.delete_queue(queue_name) @@ -101,7 +101,7 @@ def test_queue_info_for_stream_with_validations(management: Management) -> None: ) management.declare_queue(queue_specification) - stream_info = management.queue_info(queue_name=stream_name) + stream_info = management.queue_info(name=stream_name) management.delete_queue(stream_name) diff --git a/tests/test_streams.py b/tests/test_streams.py index 7e7e557..af9a714 100644 --- a/tests/test_streams.py +++ b/tests/test_streams.py @@ -31,7 +31,7 @@ def test_stream_read_from_last_default(connection: Connection) -> None: try: connection_consumer = create_connection() consumer = connection_consumer.consumer( - addr_queue, handler=MyMessageHandlerAcceptStreamOffset() + addr_queue, message_handler=MyMessageHandlerAcceptStreamOffset() ) publish_messages(connection, messages_to_send, stream_name) consumer.run() @@ -66,7 +66,7 @@ def test_stream_read_from_last(connection: Connection) -> None: connection_consumer = create_connection() consumer = connection_consumer.consumer( addr_queue, - handler=MyMessageHandlerAcceptStreamOffset(), + message_handler=MyMessageHandlerAcceptStreamOffset(), stream_filter_options=stream_filter_options, ) publish_messages(connection, messages_to_send, stream_name) @@ -104,7 +104,7 @@ def test_stream_read_from_offset_zero(connection: Connection) -> None: connection_consumer = create_connection() consumer = connection_consumer.consumer( addr_queue, - handler=MyMessageHandlerAcceptStreamOffset(0), + message_handler=MyMessageHandlerAcceptStreamOffset(0), stream_filter_options=stream_filter_options, ) @@ -142,7 +142,7 @@ def test_stream_read_from_offset_first(connection: Connection) -> None: connection_consumer = create_connection() consumer = connection_consumer.consumer( addr_queue, - handler=MyMessageHandlerAcceptStreamOffset(0), + message_handler=MyMessageHandlerAcceptStreamOffset(0), stream_filter_options=stream_filter_options, ) @@ -180,7 +180,7 @@ def test_stream_read_from_offset_ten(connection: Connection) -> None: connection_consumer = create_connection() consumer = connection_consumer.consumer( addr_queue, - handler=MyMessageHandlerAcceptStreamOffset(10), + message_handler=MyMessageHandlerAcceptStreamOffset(10), stream_filter_options=stream_filter_options, ) @@ -212,11 +212,11 @@ def test_stream_filtering(connection: Connection) -> None: # consume and then publish try: stream_filter_options = StreamOptions() - stream_filter_options.apply_filters(["banana"]) + stream_filter_options.filter_values(["banana"]) connection_consumer = create_connection() consumer = connection_consumer.consumer( addr_queue, - handler=MyMessageHandlerAcceptStreamOffset(), + message_handler=MyMessageHandlerAcceptStreamOffset(), stream_filter_options=stream_filter_options, ) # send with annotations filter banana @@ -247,7 +247,7 @@ def test_stream_filtering_not_present(connection: Connection) -> None: # consume and then publish stream_filter_options = StreamOptions() - stream_filter_options.apply_filters(["apple"]) + stream_filter_options.filter_values(["apple"]) connection_consumer = create_connection() consumer = connection_consumer.consumer( addr_queue, stream_filter_options=stream_filter_options @@ -285,12 +285,12 @@ def test_stream_match_unfiltered(connection: Connection) -> None: # consume and then publish try: stream_filter_options = StreamOptions() - stream_filter_options.apply_filters(["banana"]) + stream_filter_options.filter_values(["banana"]) stream_filter_options.filter_match_unfiltered(True) connection_consumer = create_connection() consumer = connection_consumer.consumer( addr_queue, - handler=MyMessageHandlerAcceptStreamOffset(), + message_handler=MyMessageHandlerAcceptStreamOffset(), stream_filter_options=stream_filter_options, ) # send with annotations filter banana