Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
43 changes: 42 additions & 1 deletion README.md
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,20 @@

This library is in early stages of development. It is meant to be used with RabbitMQ 4.0.

# Table of Contents

- [How to Build the project and run the tests](#How-to-Build-the-project-and-run-the-tests)
- [Installation](#Installation)
- [Getting started](#Getting-Started)
* [Creating a connection](#Creating-a-connection)
* [Managing resources](#Managing-resources)
* [Publishing messages](#Publishing-messages)
* [Consuming messages](#Consuming-messages)
* [Support for streams](#support-for-streams)
* [SSL connection](#ssl-connections)
* [Managing disconnections](#Managing-disconnections)


## How to Build the project and run the tests

- Start a RabbitMQ 4.x broker
Expand All @@ -18,7 +32,7 @@ The client is distributed via [`PIP`](https://pypi.org/project/rabbitmq-amqp-pyt

## Getting Started

An example is provided in ./getting_started_main.py you can run it after starting a RabbitMQ 4.0 broker with:
An example is provided [`here`](./examples/getting_started/basic_example.py) you can run it after starting a RabbitMQ 4.0 broker with:

poetry run python ./examples/getting_started/main.py

Expand Down Expand Up @@ -109,6 +123,33 @@ Then from connection get a consumer object:

The consumer will run indefinitively waiting for messages to arrive.

### Support for streams

The client natively supports streams: https://www.rabbitmq.com/blog/2021/07/13/rabbitmq-streams-overview

You can consume from a given offset or specify a default starting point (FIRST, NEXT, LAST).

Streams filtering is also supported: https://www.rabbitmq.com/blog/2023/10/16/stream-filtering

You can check the [`stream example`](./examples/getting_started/example_with_streams.py) to see how to work with RabbitMQ streams.

### SSL connections

The client supports TLS/SSL connections.

You can check the [`ssl example`](./examples/getting_started/tls_example.py) to see how to establish a secured connection


### Managing disconnections

At this stage the client doesn't support auto-reconnect but a callback is invoked everytime a remote disconnection is detected.
You can use this callback to implement your own logic and eventually attempt a reconnection.

You can check the [`reconnection example`](./examples/getting_started/reconnection_example.py) to see how to manage disconnections and
eventually attempt a reconnection






27 changes: 9 additions & 18 deletions examples/getting_started/basic_example.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,13 +6,14 @@
AMQPMessagingHandler,
BindingSpecification,
Connection,
Disposition,
Event,
ExchangeSpecification,
Message,
QuorumQueueSpecification,
)

messages_to_publish = 100
MESSAGES_TO_PUBLISH = 100


class MyMessageHandler(AMQPMessagingHandler):
Expand Down Expand Up @@ -45,7 +46,7 @@ def on_message(self, event: Event):

self._count = self._count + 1

if self._count == messages_to_publish:
if self._count == MESSAGES_TO_PUBLISH:
print("closing receiver")
# if you want you can add cleanup operations here
# event.receiver.close()
Expand All @@ -62,17 +63,6 @@ def on_link_closed(self, event: Event) -> None:

def create_connection() -> Connection:
connection = Connection("amqp://guest:guest@localhost:5672/")
# in case of SSL enablement
# ca_cert_file = ".ci/certs/ca_certificate.pem"
# client_cert = ".ci/certs/client_certificate.pem"
# client_key = ".ci/certs/client_key.pem"
# connection = Connection(
# "amqps://guest:guest@localhost:5671/",
# ssl_context=SslConfigurationContext(
# ca_cert=ca_cert_file,
# client_cert=ClientCert(client_cert=client_cert, client_key=client_key),
# ),
# )
connection.dial()

return connection
Expand Down Expand Up @@ -120,21 +110,22 @@ def main() -> None:
# management.close()

# publish 10 messages
for i in range(messages_to_publish):
for i in range(MESSAGES_TO_PUBLISH):
print("publishing")
status = publisher.publish(Message(body="test"))
if status.ACCEPTED:
if status.remote_state == Disposition.ACCEPTED:
print("message accepted")
elif status.RELEASED:
elif status.remote_state == Disposition.RELEASED:
print("message not routed")
elif status.REJECTED:
elif status.remote_state == Disposition.REJECTED:
print("message not rejected")

publisher.close()

print(
"create a consumer and consume the test message - press control + c to terminate to consume"
)
consumer = connection.consumer(addr_queue, handler=MyMessageHandler())
consumer = connection.consumer(addr_queue, message_handler=MyMessageHandler())

try:
consumer.run()
Expand Down
38 changes: 22 additions & 16 deletions examples/getting_started/example_with_streams.py
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,8 @@
StreamSpecification,
)

MESSAGES_TO_PUBLISH = 100


class MyMessageHandler(AMQPMessagingHandler):

Expand All @@ -19,6 +21,7 @@ def __init__(self):
self._count = 0

def on_message(self, event: Event):
# just messages with banana filters get received
print(
"received message from stream: "
+ str(event.message.body)
Expand Down Expand Up @@ -47,7 +50,7 @@ def on_message(self, event: Event):

self._count = self._count + 1

if self._count == 100:
if self._count == MESSAGES_TO_PUBLISH:
print("closing receiver")
# if you want you can add cleanup operations here
# event.receiver.close()
Expand All @@ -64,25 +67,13 @@ def on_link_closed(self, event: Event) -> None:

def create_connection() -> Connection:
connection = Connection("amqp://guest:guest@localhost:5672/")
# in case of SSL enablement
# ca_cert_file = ".ci/certs/ca_certificate.pem"
# client_cert = ".ci/certs/client_certificate.pem"
# client_key = ".ci/certs/client_key.pem"
# connection = Connection(
# "amqps://guest:guest@localhost:5671/",
# ssl_context=SslConfigurationContext(
# ca_cert=ca_cert_file,
# client_cert=ClientCert(client_cert=client_cert, client_key=client_key),
# ),
# )
connection.dial()

return connection


def main() -> None:
queue_name = "example-queue"
messages_to_publish = 100

print("connection to amqp server")
connection = create_connection()
Expand All @@ -99,10 +90,11 @@ def main() -> None:
# can be first, last, next or an offset long
# you can also specify stream filters with methods: apply_filters and filter_match_unfiltered
stream_filter_options.offset(OffsetSpecification.first)
stream_filter_options.filter_values(["banana"])

consumer = consumer_connection.consumer(
addr_queue,
handler=MyMessageHandler(),
message_handler=MyMessageHandler(),
stream_filter_options=stream_filter_options,
)
print(
Expand All @@ -112,8 +104,22 @@ def main() -> None:
# print("create a publisher and publish a test message")
publisher = connection.publisher(addr_queue)

for i in range(messages_to_publish):
publisher.publish(Message(body="test: " + str(i)))
# publish with a filter of apple
for i in range(MESSAGES_TO_PUBLISH):
publisher.publish(
Message(
body="apple: " + str(i), annotations={"x-stream-filter-value": "apple"}
)
)

# publish with a filter of banana
for i in range(MESSAGES_TO_PUBLISH):
publisher.publish(
Message(
body="banana: " + str(i),
annotations={"x-stream-filter-value": "banana"},
)
)

publisher.close()

Expand Down
18 changes: 9 additions & 9 deletions examples/getting_started/reconnection_example.py
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,7 @@ class ConnectionConfiguration:


connection_configuration = ConnectionConfiguration()
messages_to_publish = 50000
MESSAGES_TO_PUBLSH = 50000


# disconnection callback
Expand Down Expand Up @@ -61,7 +61,7 @@ def on_disconnection():
if connection_configuration.consumer is not None:
connection_configuration.consumer = (
connection_configuration.connection.consumer(
addr_queue, handler=MyMessageHandler()
addr_queue, message_handler=MyMessageHandler()
)
)

Expand Down Expand Up @@ -95,7 +95,7 @@ def on_message(self, event: Event):

self._count = self._count + 1

if self._count == messages_to_publish:
if self._count == MESSAGES_TO_PUBLSH:
print("closing receiver")
# if you want you can add cleanup operations here
# event.receiver.close()
Expand All @@ -111,15 +111,15 @@ def on_link_closed(self, event: Event) -> None:


def create_connection() -> Connection:
# for multinode specify a list of urls and fill the field urls of Connection instead of url
# urls = [
# for multinode specify a list of urls and fill the field uris of Connection instead of url
# uris = [
# "amqp://ha_tls-rabbit_node0-1:5682/",
# "amqp://ha_tls-rabbit_node1-1:5692/",
# "amqp://ha_tls-rabbit_node2-1:5602/",
# ]
# connection = Connection(urls=urls, on_disconnection_handler=on_disconnected)
# connection = Connection(uris=uris, on_disconnection_handler=on_disconnected)
connection = Connection(
url="amqp://guest:guest@localhost:5672/",
uri="amqp://guest:guest@localhost:5672/",
on_disconnection_handler=on_disconnection,
)
connection.dial()
Expand Down Expand Up @@ -181,7 +181,7 @@ def main() -> None:

# publishing messages
while True:
for i in range(messages_to_publish):
for i in range(MESSAGES_TO_PUBLSH):

if i % 1000 == 0:
print("published 1000 messages...")
Expand All @@ -207,7 +207,7 @@ def main() -> None:
if connection_configuration.consumer is None:
connection_configuration.consumer = (
connection_configuration.connection.consumer(
addr_queue, handler=MyMessageHandler()
addr_queue, message_handler=MyMessageHandler()
)
)

Expand Down
Loading