diff --git a/.ci/ubuntu/gha-setup.sh b/.ci/ubuntu/gha-setup.sh index 6eb91c8..5e5944d 100755 --- a/.ci/ubuntu/gha-setup.sh +++ b/.ci/ubuntu/gha-setup.sh @@ -7,7 +7,7 @@ set -o xtrace script_dir="$( cd "$( dirname "${BASH_SOURCE[0]}" )" && pwd )" readonly script_dir echo "[INFO] script_dir: '$script_dir'" -readonly rabbitmq_image=rabbitmq:4.1.0-beta.4-management-alpine +readonly rabbitmq_image=rabbitmq:4.1.0-management readonly docker_name_prefix='rabbitmq-amqp-python-client' diff --git a/.gitignore b/.gitignore index bbc20c7..d884fbe 100644 --- a/.gitignore +++ b/.gitignore @@ -15,3 +15,4 @@ __pycache__/ local* .githooks/ .venv/ +.ci/ubuntu/log/* diff --git a/Makefile b/Makefile index 616547b..05988c1 100644 --- a/Makefile +++ b/Makefile @@ -7,5 +7,13 @@ rabbitmq-server: rabbitmq-server-stop: ./.ci/ubuntu/gha-setup.sh stop +format: + poetry run isort --skip rabbitmq_amqp_python_client/qpid . + poetry run black rabbitmq_amqp_python_client/ + poetry run black tests/ + poetry run flake8 --exclude=venv,local_tests,docs/examples,rabbitmq_amqp_python_client/qpid --max-line-length=120 --ignore=E203,W503 + +test: format + poetry run pytest . help: cat Makefile diff --git a/README.md b/README.md index ca395d6..a41bac0 100644 --- a/README.md +++ b/README.md @@ -1,3 +1,5 @@ +from rabbitmq_amqp_python_client import Converter + # RabbitMQ AMQP 1.0 Python Client This library is in early stages of development. It is meant to be used with RabbitMQ 4.0. @@ -83,7 +85,7 @@ For example: # publish messages for i in range(messages_to_publish): - publisher.publish(Message(body="test")) + publisher.publish(Message(body=Converter.string_to_bytes("test"))) publisher.close() ``` @@ -149,7 +151,7 @@ You can check the [`ssl example`](./examples/tls/tls_example.py) to see how to e The client supports oauth2 authentication. -You can check the [`oauth2 example`](./examples/oauth/oaut.py) to see how to establish and refresh a connection using an oauth2 token +You can check the [`oauth2 example`](examples/oauth/oAuth2.py) to see how to establish and refresh a connection using an oauth2 token ### Managing disconnections diff --git a/examples/README.md b/examples/README.md index 123539c..c9442f2 100644 --- a/examples/README.md +++ b/examples/README.md @@ -4,4 +4,4 @@ Client examples - [Reconnection](./reconnection/reconnection_example.py) - Producer and Consumer example with reconnection - [TLS](./tls/tls_example.py) - Producer and Consumer using a TLS connection - [Streams](./streams/example_with_streams.py) - Example supporting stream capabilities - - [Oauth](./oauth/oauth.py) - Connection through Oauth token \ No newline at end of file + - [Oauth](./oauth/oAuth2.py) - Connection through Oauth token \ No newline at end of file diff --git a/examples/getting_started/getting_started.py b/examples/getting_started/getting_started.py index bbd6f60..6e5e018 100644 --- a/examples/getting_started/getting_started.py +++ b/examples/getting_started/getting_started.py @@ -5,6 +5,7 @@ AddressHelper, AMQPMessagingHandler, Connection, + Converter, Environment, Event, ExchangeSpecification, @@ -24,7 +25,11 @@ def __init__(self): self._count = 0 def on_amqp_message(self, event: Event): - print("received message: " + str(event.message.body)) + print( + "received message: {} ".format( + Converter.bytes_to_string(event.message.body) + ) + ) # accepting self.delivery_context.accept(event) @@ -43,13 +48,11 @@ def on_amqp_message(self, event: Event): # in case of rejection with annotations added # self.delivery_context.discard_with_annotations(event) - print("count " + str(self._count)) - self._count = self._count + 1 + print("count " + str(self._count)) if self._count == MESSAGES_TO_PUBLISH: - print("closing receiver") - # if you want you can add cleanup operations here + print("received all messages") def on_connection_closed(self, event: Event): # if you want you can add cleanup operations here @@ -79,7 +82,6 @@ def create_connection(environment: Environment) -> Connection: def main() -> None: - exchange_name = "test-exchange" queue_name = "example-queue" routing_key = "routing-key" @@ -122,8 +124,9 @@ def main() -> None: # publish 10 messages for i in range(MESSAGES_TO_PUBLISH): - print("publishing") - status = publisher.publish(Message(body="test")) + status = publisher.publish( + Message(body=Converter.string_to_bytes("test message {} ".format(i))) + ) if status.remote_state == OutcomeState.ACCEPTED: print("message accepted") elif status.remote_state == OutcomeState.RELEASED: diff --git a/examples/oauth/oaut.py b/examples/oauth/oAuth2.py similarity index 92% rename from examples/oauth/oaut.py rename to examples/oauth/oAuth2.py index 56f16e8..43d62b1 100644 --- a/examples/oauth/oaut.py +++ b/examples/oauth/oAuth2.py @@ -10,6 +10,7 @@ AddressHelper, AMQPMessagingHandler, Connection, + Converter, Environment, Event, ExchangeSpecification, @@ -30,7 +31,7 @@ def __init__(self): self._count = 0 def on_amqp_message(self, event: Event): - print("received message: " + str(event.message.body)) + print("received message: " + Converter.bytes_to_string(event.message.body)) # accepting self.delivery_context.accept(event) @@ -85,10 +86,9 @@ def create_connection(environment: Environment) -> Connection: def main() -> None: - - exchange_name = "test-exchange" - queue_name = "example-queue" - routing_key = "routing-key" + exchange_name = "oAuth2-test-exchange" + queue_name = "oAuth2-example-queue" + routing_key = "oAuth2-routing-key" print("connection to amqp server") oaut_token = token( @@ -144,14 +144,15 @@ def main() -> None: # publish 10 messages for i in range(MESSAGES_TO_PUBLISH): - print("publishing") - status = publisher.publish(Message(body="test")) + status = publisher.publish( + Message(body=Converter.string_to_bytes("test_{}".format(i))) + ) if status.remote_state == OutcomeState.ACCEPTED: - print("message accepted") + print("message: test_{} accepted".format(i)) elif status.remote_state == OutcomeState.RELEASED: - print("message not routed") + print("message: test_{} not routed".format(i)) elif status.remote_state == OutcomeState.REJECTED: - print("message not rejected") + print("message: test_{} rejected".format(i)) publisher.close() diff --git a/examples/reconnection/reconnection_example.py b/examples/reconnection/reconnection_example.py index 161b8d4..bbd5f33 100644 --- a/examples/reconnection/reconnection_example.py +++ b/examples/reconnection/reconnection_example.py @@ -4,6 +4,7 @@ AMQPMessagingHandler, Connection, ConnectionClosed, + Converter, Environment, Event, ExchangeSpecification, @@ -15,7 +16,6 @@ # here we keep track of the objects we need to reconnect MESSAGES_TO_PUBLISH = 50000 - environment = Environment( uri="amqp://guest:guest@localhost:5672/", ) @@ -29,7 +29,9 @@ def __init__(self): def on_message(self, event: Event): if self._count % 1000 == 0: - print("received 100 message: " + str(event.message.body)) + print( + "received 100 message: " + Converter.bytes_to_string(event.message.body) + ) # accepting self.delivery_context.accept(event) @@ -79,7 +81,6 @@ def create_connection() -> Connection: def main() -> None: - exchange_name = "test-exchange" queue_name = "example-queue" routing_key = "routing-key" @@ -128,7 +129,7 @@ def main() -> None: print("published 1000 messages...") try: if publisher is not None: - publisher.publish(Message(body="test")) + publisher.publish(Message(body=Converter.string_to_bytes("test"))) except ConnectionClosed: print("publisher closing exception, resubmitting") # publisher = connection.publisher(addr) diff --git a/examples/streams/example_with_streams.py b/examples/streams/example_with_streams.py index cb53e73..dc031ca 100644 --- a/examples/streams/example_with_streams.py +++ b/examples/streams/example_with_streams.py @@ -5,6 +5,7 @@ AMQPMessagingHandler, Connection, ConnectionClosed, + Converter, Environment, Event, Message, @@ -26,7 +27,7 @@ def on_amqp_message(self, event: Event): # just messages with banana filters get received print( "received message from stream: " - + str(event.message.body) + + Converter.bytes_to_string(event.message.body) + " with offset: " + str(event.message.annotations["x-stream-offset"]) ) @@ -84,7 +85,7 @@ def create_connection(environment: Environment) -> Connection: def main() -> None: - queue_name = "example-queue" + queue_name = "stream-example-queue" print("connection to amqp server") environment = Environment("amqp://guest:guest@localhost:5672/") @@ -118,7 +119,8 @@ def main() -> None: for i in range(MESSAGES_TO_PUBLISH): publisher.publish( Message( - body="apple: " + str(i), annotations={"x-stream-filter-value": "apple"} + Converter.string_to_bytes(body="apple: " + str(i)), + annotations={"x-stream-filter-value": "apple"}, ) ) @@ -126,7 +128,7 @@ def main() -> None: for i in range(MESSAGES_TO_PUBLISH): publisher.publish( Message( - body="banana: " + str(i), + body=Converter.string_to_bytes("banana: " + str(i)), annotations={"x-stream-filter-value": "banana"}, ) ) diff --git a/examples/tls/tls_example.py b/examples/tls/tls_example.py index c6eb0af..bd953fc 100644 --- a/examples/tls/tls_example.py +++ b/examples/tls/tls_example.py @@ -1,4 +1,5 @@ # type: ignore +import os import sys from traceback import print_exception @@ -6,6 +7,7 @@ AddressHelper, AMQPMessagingHandler, Connection, + Converter, CurrentUserStore, Environment, Event, @@ -34,7 +36,7 @@ def __init__(self): self._count = 0 def on_message(self, event: Event): - print("received message: " + str(event.message.body)) + print("received message: " + Converter.bytes_to_string(event.message.body)) # accepting self.delivery_context.accept(event) @@ -79,15 +81,14 @@ def create_connection(environment: Environment) -> Connection: def main() -> None: - - exchange_name = "test-exchange" - queue_name = "example-queue" - routing_key = "routing-key" + exchange_name = "tls-test-exchange" + queue_name = "tls-example-queue" + routing_key = "tls-routing-key" ca_p12_store = ".ci/certs/ca.p12" ca_cert_file = ".ci/certs/ca_certificate.pem" - client_cert = ".ci/certs/client_certificate.pem" - client_key = ".ci/certs/client_key.pem" - client_p12_store = ".ci/certs/client.p12" + client_cert = ".ci/certs/client_localhost_certificate.pem" + client_key = ".ci/certs/client_localhost_key.pem" + client_p12_store = ".ci/certs/client_localhost.p12" uri = "amqps://guest:guest@localhost:5671/" if sys.platform == "win32": @@ -138,6 +139,9 @@ def main() -> None: "connection failed. working directory should be project root" ) else: + print(" ca_cert_file exists: {}".format(os.path.isfile(ca_cert_file))) + print(" client_cert exists: {}".format(os.path.isfile(client_cert))) + print(" client_key exists: {}".format(os.path.isfile(client_key))) environment = Environment( uri, ssl_context=PosixSslConfigurationContext( @@ -187,7 +191,7 @@ def main() -> None: # publish 10 messages for i in range(messages_to_publish): - status = publisher.publish(Message(body="test")) + status = publisher.publish(Message(body=Converter.string_to_bytes("test"))) if status.ACCEPTED: print("message accepted") elif status.RELEASED: diff --git a/poetry.lock b/poetry.lock index e9c8b6d..2705966 100644 --- a/poetry.lock +++ b/poetry.lock @@ -1,4 +1,4 @@ -# This file is automatically @generated by Poetry 1.8.3 and should not be changed by hand. +# This file is automatically @generated by Poetry 2.1.2 and should not be changed by hand. [[package]] name = "black" @@ -6,6 +6,7 @@ version = "24.10.0" description = "The uncompromising code formatter." optional = false python-versions = ">=3.9" +groups = ["dev"] files = [ {file = "black-24.10.0-cp310-cp310-macosx_10_9_x86_64.whl", hash = "sha256:e6668650ea4b685440857138e5fe40cde4d652633b1bdffc62933d0db4ed9812"}, {file = "black-24.10.0-cp310-cp310-macosx_11_0_arm64.whl", hash = "sha256:1c536fcf674217e87b8cc3657b81809d3c085d7bf3ef262ead700da345bfa6ea"}, @@ -52,6 +53,7 @@ version = "2025.1.31" description = "Python package for providing Mozilla's CA Bundle." optional = false python-versions = ">=3.6" +groups = ["dev"] files = [ {file = "certifi-2025.1.31-py3-none-any.whl", hash = "sha256:ca78db4565a652026a4db2bcdf68f2fb589ea80d0be70e03929ed730746b84fe"}, {file = "certifi-2025.1.31.tar.gz", hash = "sha256:3d5da6925056f6f18f119200434a4780a94263f10d1c21d032a6f6b2baa20651"}, @@ -63,6 +65,7 @@ version = "1.17.1" description = "Foreign Function Interface for Python calling C code." optional = false python-versions = ">=3.8" +groups = ["main", "dev"] files = [ {file = "cffi-1.17.1-cp310-cp310-macosx_10_9_x86_64.whl", hash = "sha256:df8b1c11f177bc2313ec4b2d46baec87a5f3e71fc8b45dab2ee7cae86d9aba14"}, {file = "cffi-1.17.1-cp310-cp310-macosx_11_0_arm64.whl", hash = "sha256:8f2cdc858323644ab277e9bb925ad72ae0e67f69e804f4898c070998d50b1a67"}, @@ -142,6 +145,7 @@ version = "3.4.1" description = "The Real First Universal Charset Detector. Open, modern and actively maintained alternative to Chardet." optional = false python-versions = ">=3.7" +groups = ["dev"] files = [ {file = "charset_normalizer-3.4.1-cp310-cp310-macosx_10_9_universal2.whl", hash = "sha256:91b36a978b5ae0ee86c394f5a54d6ef44db1de0815eb43de826d41d21e4af3de"}, {file = "charset_normalizer-3.4.1-cp310-cp310-manylinux_2_17_aarch64.manylinux2014_aarch64.whl", hash = "sha256:7461baadb4dc00fd9e0acbe254e3d7d2112e7f92ced2adc96e54ef6501c5f176"}, @@ -243,6 +247,7 @@ version = "8.1.8" description = "Composable command line interface toolkit" optional = false python-versions = ">=3.7" +groups = ["dev"] files = [ {file = "click-8.1.8-py3-none-any.whl", hash = "sha256:63c132bbbed01578a06712a2d1f497bb62d9c1c0d329b7903a866228027263b2"}, {file = "click-8.1.8.tar.gz", hash = "sha256:ed53c9d8990d83c2a27deae68e4ee337473f6330c040a31d4225c9574d16096a"}, @@ -257,6 +262,8 @@ version = "0.4.6" description = "Cross-platform colored terminal text." optional = false python-versions = "!=3.0.*,!=3.1.*,!=3.2.*,!=3.3.*,!=3.4.*,!=3.5.*,!=3.6.*,>=2.7" +groups = ["dev"] +markers = "sys_platform == \"win32\" or platform_system == \"Windows\"" files = [ {file = "colorama-0.4.6-py2.py3-none-any.whl", hash = "sha256:4f1d9991f5acc0ca119f9d443620b77f9d6b33703e51011c16baf57afb285fc6"}, {file = "colorama-0.4.6.tar.gz", hash = "sha256:08695f5cb7ed6e0531a20572697297273c47b8cae5a63ffc6d6ed5c201be6e44"}, @@ -268,6 +275,8 @@ version = "1.2.2" description = "Backport of PEP 654 (exception groups)" optional = false python-versions = ">=3.7" +groups = ["dev"] +markers = "python_version < \"3.11\"" files = [ {file = "exceptiongroup-1.2.2-py3-none-any.whl", hash = "sha256:3111b9d131c238bec2f8f516e123e14ba243563fb135d3fe885990585aa7795b"}, {file = "exceptiongroup-1.2.2.tar.gz", hash = "sha256:47c2edf7c6738fafb49fd34290706d1a1a2f4d1c6df275526b62cbb4aa5393cc"}, @@ -282,6 +291,7 @@ version = "7.1.2" description = "the modular source code checker: pep8 pyflakes and co" optional = false python-versions = ">=3.8.1" +groups = ["dev"] files = [ {file = "flake8-7.1.2-py2.py3-none-any.whl", hash = "sha256:1cbc62e65536f65e6d754dfe6f1bada7f5cf392d6f5db3c2b85892466c3e7c1a"}, {file = "flake8-7.1.2.tar.gz", hash = "sha256:c586ffd0b41540951ae41af572e6790dbd49fc12b3aa2541685d253d9bd504bd"}, @@ -298,6 +308,7 @@ version = "3.10" description = "Internationalized Domain Names in Applications (IDNA)" optional = false python-versions = ">=3.6" +groups = ["dev"] files = [ {file = "idna-3.10-py3-none-any.whl", hash = "sha256:946d195a0d259cbba61165e88e65941f16e9b36ea6ddb97f00452bae8b1287d3"}, {file = "idna-3.10.tar.gz", hash = "sha256:12f65c9b470abda6dc35cf8e63cc574b1c52b11df2c86030af0ac09b01b13ea9"}, @@ -312,6 +323,7 @@ version = "2.0.0" description = "brain-dead simple config-ini parsing" optional = false python-versions = ">=3.7" +groups = ["dev"] files = [ {file = "iniconfig-2.0.0-py3-none-any.whl", hash = "sha256:b6a85871a79d2e3b22d2d1b94ac2824226a63c6b741c88f7ae975f18b6778374"}, {file = "iniconfig-2.0.0.tar.gz", hash = "sha256:2d91e135bf72d31a410b17c16da610a82cb55f6b0477d1a902134b24a455b8b3"}, @@ -323,6 +335,7 @@ version = "5.13.2" description = "A Python utility / library to sort Python imports." optional = false python-versions = ">=3.8.0" +groups = ["dev"] files = [ {file = "isort-5.13.2-py3-none-any.whl", hash = "sha256:8ca5e72a8d85860d5a3fa69b8745237f2939afe12dbf656afbcb47fe72d947a6"}, {file = "isort-5.13.2.tar.gz", hash = "sha256:48fdfcb9face5d58a4f6dde2e72a1fb8dcaf8ab26f95ab49fab84c2ddefb0109"}, @@ -337,6 +350,7 @@ version = "0.7.0" description = "McCabe checker, plugin for flake8" optional = false python-versions = ">=3.6" +groups = ["dev"] files = [ {file = "mccabe-0.7.0-py2.py3-none-any.whl", hash = "sha256:6c2d30ab6be0e4a46919781807b4f0d834ebdd6c6e3dca0bda5a15f863427b6e"}, {file = "mccabe-0.7.0.tar.gz", hash = "sha256:348e0240c33b60bbdf4e523192ef919f28cb2c3d7d5c7794f74009290f236325"}, @@ -348,6 +362,7 @@ version = "0.910" description = "Optional static typing for Python" optional = false python-versions = ">=3.5" +groups = ["dev"] files = [ {file = "mypy-0.910-cp35-cp35m-macosx_10_9_x86_64.whl", hash = "sha256:a155d80ea6cee511a3694b108c4494a39f42de11ee4e61e72bc424c490e46457"}, {file = "mypy-0.910-cp35-cp35m-manylinux1_x86_64.whl", hash = "sha256:b94e4b785e304a04ea0828759172a15add27088520dc7e49ceade7834275bedb"}, @@ -389,6 +404,7 @@ version = "0.4.4" description = "Experimental type system extensions for programs checked with the mypy typechecker." optional = false python-versions = ">=2.7" +groups = ["dev"] files = [ {file = "mypy_extensions-0.4.4.tar.gz", hash = "sha256:c8b707883a96efe9b4bb3aaf0dcc07e7e217d7d8368eec4db4049ee9e142f4fd"}, ] @@ -399,6 +415,7 @@ version = "24.2" description = "Core utilities for Python packages" optional = false python-versions = ">=3.8" +groups = ["dev"] files = [ {file = "packaging-24.2-py3-none-any.whl", hash = "sha256:09abb1bccd265c01f4a3aa3f7a7db064b36514d2cba19a2f694fe6150451a759"}, {file = "packaging-24.2.tar.gz", hash = "sha256:c228a6dc5e932d346bc5739379109d49e8853dd8223571c7c5b55260edc0b97f"}, @@ -410,17 +427,36 @@ version = "0.12.1" description = "Utility library for gitignore style pattern matching of file paths." optional = false python-versions = ">=3.8" +groups = ["dev"] files = [ {file = "pathspec-0.12.1-py3-none-any.whl", hash = "sha256:a0d503e138a4c123b27490a4f7beda6a01c6f288df0e4a8b79c7eb0dc7b4cc08"}, {file = "pathspec-0.12.1.tar.gz", hash = "sha256:a482d51503a1ab33b1c67a6c3813a26953dbdc71c31dacaef9a838c4e29f5712"}, ] +[[package]] +name = "pika" +version = "1.3.2" +description = "Pika Python AMQP Client Library" +optional = false +python-versions = ">=3.7" +groups = ["dev"] +files = [ + {file = "pika-1.3.2-py3-none-any.whl", hash = "sha256:0779a7c1fafd805672796085560d290213a465e4f6f76a6fb19e378d8041a14f"}, + {file = "pika-1.3.2.tar.gz", hash = "sha256:b2a327ddddf8570b4965b3576ac77091b850262d34ce8c1d8cb4e4146aa4145f"}, +] + +[package.extras] +gevent = ["gevent"] +tornado = ["tornado"] +twisted = ["twisted"] + [[package]] name = "platformdirs" version = "4.3.6" description = "A small Python package for determining appropriate platform-specific dirs, e.g. a `user data dir`." optional = false python-versions = ">=3.8" +groups = ["dev"] files = [ {file = "platformdirs-4.3.6-py3-none-any.whl", hash = "sha256:73e575e1408ab8103900836b97580d5307456908a03e92031bab39e4554cc3fb"}, {file = "platformdirs-4.3.6.tar.gz", hash = "sha256:357fb2acbc885b0419afd3ce3ed34564c13c9b95c89360cd9563f73aa5e2b907"}, @@ -437,6 +473,7 @@ version = "1.5.0" description = "plugin and hook calling mechanisms for python" optional = false python-versions = ">=3.8" +groups = ["dev"] files = [ {file = "pluggy-1.5.0-py3-none-any.whl", hash = "sha256:44e1ad92c8ca002de6377e165f3e0f1be63266ab4d554740532335b9d75ea669"}, {file = "pluggy-1.5.0.tar.gz", hash = "sha256:2cffa88e94fdc978c4c574f15f9e59b7f4201d439195c3715ca9e2486f1d0cf1"}, @@ -452,6 +489,7 @@ version = "2.12.1" description = "Python style guide checker" optional = false python-versions = ">=3.8" +groups = ["dev"] files = [ {file = "pycodestyle-2.12.1-py2.py3-none-any.whl", hash = "sha256:46f0fb92069a7c28ab7bb558f05bfc0110dac69a0cd23c61ea0040283a9d78b3"}, {file = "pycodestyle-2.12.1.tar.gz", hash = "sha256:6838eae08bbce4f6accd5d5572075c63626a15ee3e6f842df996bf62f6d73521"}, @@ -463,6 +501,7 @@ version = "2.22" description = "C parser in Python" optional = false python-versions = ">=3.8" +groups = ["main", "dev"] files = [ {file = "pycparser-2.22-py3-none-any.whl", hash = "sha256:c3702b6d3dd8c7abc1afa565d7e63d53a1d0bd86cdc24edd75470f4de499cfcc"}, {file = "pycparser-2.22.tar.gz", hash = "sha256:491c8be9c040f5390f5bf44a5b07752bd07f56edf992381b05c701439eec10f6"}, @@ -474,6 +513,7 @@ version = "3.2.0" description = "passive checker of Python programs" optional = false python-versions = ">=3.8" +groups = ["dev"] files = [ {file = "pyflakes-3.2.0-py2.py3-none-any.whl", hash = "sha256:84b5be138a2dfbb40689ca07e2152deb896a65c3a3e24c251c5c62489568074a"}, {file = "pyflakes-3.2.0.tar.gz", hash = "sha256:1c61603ff154621fb2a9172037d84dca3500def8c8b630657d1701f026f8af3f"}, @@ -485,6 +525,7 @@ version = "2.10.1" description = "JSON Web Token implementation in Python" optional = false python-versions = ">=3.9" +groups = ["dev"] files = [ {file = "PyJWT-2.10.1-py3-none-any.whl", hash = "sha256:dcdd193e30abefd5debf142f9adfcdd2b58004e644f25406ffaebd50bd98dacb"}, {file = "pyjwt-2.10.1.tar.gz", hash = "sha256:3cc5772eb20009233caf06e9d8a0577824723b44e6648ee0a2aedb6cf9381953"}, @@ -502,6 +543,7 @@ version = "8.3.5" description = "pytest: simple powerful testing with Python" optional = false python-versions = ">=3.8" +groups = ["dev"] files = [ {file = "pytest-8.3.5-py3-none-any.whl", hash = "sha256:c69214aa47deac29fad6c2a4f590b9c4a9fdb16a403176fe154b79c0b4d4d820"}, {file = "pytest-8.3.5.tar.gz", hash = "sha256:f4efe70cc14e511565ac476b57c279e12a855b11f48f212af1080ef2263d3845"}, @@ -524,6 +566,7 @@ version = "0.39.0" description = "An AMQP based messaging library." optional = false python-versions = "*" +groups = ["main", "dev"] files = [ {file = "python-qpid-proton-0.39.0.tar.gz", hash = "sha256:362055ae6ab4c7f1437247c602757f30328d55c0a6986d5b68ca9798de9fce02"}, {file = "python_qpid_proton-0.39.0-cp38-abi3-macosx_11_0_x86_64.whl", hash = "sha256:f69da296ffa9e3b22f88a53fe9e27c4f4844e088a9f041061bd4f75f74f2a0af"}, @@ -542,6 +585,7 @@ version = "2.32.3" description = "Python HTTP for Humans." optional = false python-versions = ">=3.8" +groups = ["dev"] files = [ {file = "requests-2.32.3-py3-none-any.whl", hash = "sha256:70761cfe03c773ceb22aa2f671b4757976145175cdfca038c02654d061d6dcc6"}, {file = "requests-2.32.3.tar.gz", hash = "sha256:55365417734eb18255590a9ff9eb97e9e1da868d4ccd6402399eaf68af20a760"}, @@ -563,6 +607,7 @@ version = "0.10.2" description = "Python Library for Tom's Obvious, Minimal Language" optional = false python-versions = ">=2.6, !=3.0.*, !=3.1.*, !=3.2.*" +groups = ["dev"] files = [ {file = "toml-0.10.2-py2.py3-none-any.whl", hash = "sha256:806143ae5bfb6a3c6e736a764057db0e6a0e05e338b5630894a5f779cabb4f9b"}, {file = "toml-0.10.2.tar.gz", hash = "sha256:b3bda1d108d5dd99f4a20d24d9c348e91c4db7ab1b749200bded2f839ccbe68f"}, @@ -574,6 +619,8 @@ version = "2.2.1" description = "A lil' TOML parser" optional = false python-versions = ">=3.8" +groups = ["dev"] +markers = "python_version < \"3.11\"" files = [ {file = "tomli-2.2.1-cp311-cp311-macosx_10_9_x86_64.whl", hash = "sha256:678e4fa69e4575eb77d103de3df8a895e1591b48e740211bd1067378c69e8249"}, {file = "tomli-2.2.1-cp311-cp311-macosx_11_0_arm64.whl", hash = "sha256:023aa114dd824ade0100497eb2318602af309e5a55595f76b626d6d9f3b7b0a6"}, @@ -615,6 +662,7 @@ version = "4.13.0" description = "Backported and Experimental Type Hints for Python 3.8+" optional = false python-versions = ">=3.8" +groups = ["main", "dev"] files = [ {file = "typing_extensions-4.13.0-py3-none-any.whl", hash = "sha256:c8dd92cc0d6425a97c18fbb9d1954e5ff92c1ca881a309c45f06ebc0b79058e5"}, {file = "typing_extensions-4.13.0.tar.gz", hash = "sha256:0a4ac55a5820789d87e297727d229866c9650f6521b64206413c4fbada24d95b"}, @@ -626,18 +674,19 @@ version = "2.3.0" description = "HTTP library with thread-safe connection pooling, file post, and more." optional = false python-versions = ">=3.9" +groups = ["dev"] files = [ {file = "urllib3-2.3.0-py3-none-any.whl", hash = "sha256:1cee9ad369867bfdbbb48b7dd50374c0967a0bb7710050facf0dd6911440e3df"}, {file = "urllib3-2.3.0.tar.gz", hash = "sha256:f8c5449b3cf0861679ce7e0503c7b44b5ec981bec0d1d3795a07f1ba96f0204d"}, ] [package.extras] -brotli = ["brotli (>=1.0.9)", "brotlicffi (>=0.8.0)"] +brotli = ["brotli (>=1.0.9) ; platform_python_implementation == \"CPython\"", "brotlicffi (>=0.8.0) ; platform_python_implementation != \"CPython\""] h2 = ["h2 (>=4,<5)"] socks = ["pysocks (>=1.5.6,!=1.5.7,<2.0)"] zstd = ["zstandard (>=0.18.0)"] [metadata] -lock-version = "2.0" +lock-version = "2.1" python-versions = "^3.9" -content-hash = "ed04935820eda364360c0d157a4bd3d271f8a6f087145950202c5a57a1b113e0" +content-hash = "70ddd7eaf9b665c8bd6255196cb8f0d738ad8830a11d418490f76d99f627f34a" diff --git a/pyproject.toml b/pyproject.toml index 59b9df0..9461e1e 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -9,10 +9,12 @@ readme = "README.md" [tool.poetry.dependencies] python = "^3.9" python-qpid-proton = "^0.39.0" -pyjwt = "^2.10.1" typing-extensions = "^4.13.0" + [tool.poetry.group.dev.dependencies] +pyjwt = "^2.10.1" +pika = "^1.3.2" flake8 = "^7.1.1" isort = "^5.9.3" mypy = "^0.910" diff --git a/rabbitmq_amqp_python_client/__init__.py b/rabbitmq_amqp_python_client/__init__.py index cdb623a..487e878 100644 --- a/rabbitmq_amqp_python_client/__init__.py +++ b/rabbitmq_amqp_python_client/__init__.py @@ -42,6 +42,7 @@ WinClientCert, WinSslConfigurationContext, ) +from .utils import Converter try: __version__ = metadata.version(__package__) @@ -91,4 +92,5 @@ "ExchangeCustomSpecification", "RecoveryConfiguration", "OAuth2Options", + "Converter", ] diff --git a/rabbitmq_amqp_python_client/management.py b/rabbitmq_amqp_python_client/management.py index 9ba75ac..6ff7487 100644 --- a/rabbitmq_amqp_python_client/management.py +++ b/rabbitmq_amqp_python_client/management.py @@ -134,6 +134,7 @@ def _request( amq_message = Message( id=id, body=body, + inferred=False, reply_to="$me", address=path, subject=method, diff --git a/rabbitmq_amqp_python_client/publisher.py b/rabbitmq_amqp_python_client/publisher.py index dc63d5d..824170c 100644 --- a/rabbitmq_amqp_python_client/publisher.py +++ b/rabbitmq_amqp_python_client/publisher.py @@ -80,6 +80,14 @@ def publish(self, message: Message) -> Delivery: "address specified in both message and publisher" ) + if not isinstance(message.body, (bytes, type(None))): + raise ArgumentOutOfRangeException( + "Message body must be of type bytes or None" + ) + + if not message.inferred: + raise ArgumentOutOfRangeException("Message inferred must be True") + if self._addr != "": if self._sender is not None: return self._sender.send(message) diff --git a/rabbitmq_amqp_python_client/qpid/proton/_exceptions.py b/rabbitmq_amqp_python_client/qpid/proton/_exceptions.py index 806961a..473faa2 100644 --- a/rabbitmq_amqp_python_client/qpid/proton/_exceptions.py +++ b/rabbitmq_amqp_python_client/qpid/proton/_exceptions.py @@ -59,6 +59,14 @@ class MessageException(ProtonException): pass +class ArgumentOutOfRangeException(MessageException): + """ + An exception class raised when an argument is out of range. + """ + + pass + + class DataException(ProtonException): """ The DataException class is the root of the Data exception hierarchy. diff --git a/rabbitmq_amqp_python_client/qpid/proton/_message.py b/rabbitmq_amqp_python_client/qpid/proton/_message.py index 9c0b655..ad11cd4 100644 --- a/rabbitmq_amqp_python_client/qpid/proton/_message.py +++ b/rabbitmq_amqp_python_client/qpid/proton/_message.py @@ -84,10 +84,13 @@ from ._common import millis2secs, secs2millis from ._data import AnnotationDict, Data, char, symbol, ulong from ._endpoints import Link -from ._exceptions import EXCEPTIONS, MessageException +from ._exceptions import ( + EXCEPTIONS, + MessageException, +) if TYPE_CHECKING: - from proton._data import Described, PythonAMQPData + from proton._data import PythonAMQPData from proton._delivery import Delivery from proton._endpoints import Receiver, Sender @@ -110,17 +113,17 @@ class Message(object): """ Default AMQP message priority""" def __init__( - self, - body: Union[ - bytes, str, dict, list, int, float, "UUID", "Described", None - ] = None, - **kwargs + self, body: Union[bytes, None] = None, inferred=True, **kwargs ) -> None: + # validate the types + self._msg = pn_message() self.instructions = None self.annotations = None self.properties = None self.body = body + self.inferred = inferred + for k, v in kwargs.items(): getattr(self, k) # Raise exception if it's not a valid attribute. setattr(self, k, v) @@ -236,7 +239,8 @@ def inferred(self) -> bool: :raise: :exc:`MessageException` if there is any Proton error when using the setter. """ - return pn_message_is_inferred(self._msg) + x = pn_message_is_inferred(self._msg) + return x @inferred.setter def inferred(self, value: bool) -> None: diff --git a/rabbitmq_amqp_python_client/utils.py b/rabbitmq_amqp_python_client/utils.py index cbc1a35..4c8b75d 100644 --- a/rabbitmq_amqp_python_client/utils.py +++ b/rabbitmq_amqp_python_client/utils.py @@ -7,3 +7,32 @@ def validate_annotations(annotations: []) -> bool: # type: ignore validated = False return validated return validated + + +class Converter: + + @staticmethod + def bytes_to_string(body: bytes) -> str: + """ + Convert the body of a message to a string. + + Args: + body: The body of the message + + Returns: + str: The string representation of the body + """ + return "".join(map(chr, body)) + + @staticmethod + def string_to_bytes(body: str) -> bytes: + """ + Convert a string to the body of a message. + + Args: + body: The string to convert + + Returns: + bytes: The byte representation of the string + """ + return str.encode(body) diff --git a/tests/conftest.py b/tests/conftest.py index 2552463..1ed6742 100644 --- a/tests/conftest.py +++ b/tests/conftest.py @@ -263,8 +263,7 @@ def __init__(self): self._received = 0 def on_message(self, event: Event): - annotations = {} - annotations[symbol("x-opt-string")] = "x-test1" + annotations = {symbol("x-opt-string"): "x-test1"} self.delivery_context.requeue_with_annotations(event, annotations) self._received = self._received + 1 if self._received == 1000: diff --git a/tests/test_amqp_091.py b/tests/test_amqp_091.py new file mode 100644 index 0000000..e0c494c --- /dev/null +++ b/tests/test_amqp_091.py @@ -0,0 +1,68 @@ +import functools + +import pika + +from rabbitmq_amqp_python_client import ( + AddressHelper, + Connection, + Converter, + OutcomeState, + QuorumQueueSpecification, +) +from rabbitmq_amqp_python_client.qpid.proton import Message + + +def test_publish_queue(connection: Connection) -> None: + queue_name = "amqp091-queue" + management = connection.management() + + management.declare_queue(QuorumQueueSpecification(name=queue_name)) + + raised = False + + publisher = None + accepted = False + + try: + publisher = connection.publisher( + destination=AddressHelper.queue_address(queue_name) + ) + status = publisher.publish( + Message(body=Converter.string_to_bytes("my_test_string_for_amqp")) + ) + if status.remote_state == OutcomeState.ACCEPTED: + accepted = True + except Exception: + raised = True + + if publisher is not None: + publisher.close() + + assert accepted is True + assert raised is False + + credentials = pika.PlainCredentials("guest", "guest") + parameters = pika.ConnectionParameters("localhost", credentials=credentials) + connection = pika.BlockingConnection(parameters) + channel = connection.channel() + + def on_message(chan, method_frame, header_frame, body, userdata=None): + """Called when a message is received. Log message and ack it.""" + chan.basic_ack(delivery_tag=method_frame.delivery_tag) + assert body is not None + body_text = Converter.bytes_to_string(body) + assert body_text is not None + assert body_text == "my_test_string_for_amqp" + channel.stop_consuming() + + on_message_callback = functools.partial(on_message, userdata="on_message_userdata") + channel.basic_qos( + prefetch_count=1, + ) + channel.basic_consume(queue_name, on_message_callback) + + channel.start_consuming() + connection.close() + + management.delete_queue(queue_name) + management.close() diff --git a/tests/test_connection.py b/tests/test_connection.py index 6190761..5636020 100644 --- a/tests/test_connection.py +++ b/tests/test_connection.py @@ -1,5 +1,6 @@ import time from datetime import datetime, timedelta +from pathlib import Path from rabbitmq_amqp_python_client import ( ConnectionClosed, @@ -38,6 +39,15 @@ def test_connection_ssl(ssl_context) -> None: "amqps://guest:guest@localhost:5671/", ssl_context=ssl_context, ) + path = Path(ssl_context.ca_cert) + assert path.is_file() is True + assert path.exists() is True + + path = Path(ssl_context.client_cert.client_cert) + assert path.is_file() is True + + path = Path(ssl_context.client_cert.client_key) + assert path.is_file() is True connection = environment.connection() connection.dial() diff --git a/tests/test_consumer.py b/tests/test_consumer.py index db9bc6a..87a6ef8 100644 --- a/tests/test_consumer.py +++ b/tests/test_consumer.py @@ -5,6 +5,7 @@ Environment, QuorumQueueSpecification, ) +from rabbitmq_amqp_python_client.utils import Converter from .conftest import ( ConsumerTestException, @@ -42,7 +43,7 @@ def test_consumer_sync_queue_accept(connection: Connection) -> None: # consumer synchronously without handler for i in range(messages_to_send): message = consumer.consume() - if message.body == "test" + str(i): + if Converter.bytes_to_string(message.body) == "test{}".format(i): consumed = consumed + 1 consumer.close() diff --git a/tests/test_publisher.py b/tests/test_publisher.py index 4d21c7c..0847717 100644 --- a/tests/test_publisher.py +++ b/tests/test_publisher.py @@ -13,11 +13,37 @@ StreamSpecification, ValidationCodeException, ) +from rabbitmq_amqp_python_client.utils import Converter from .http_requests import delete_all_connections from .utils import create_binding, publish_per_message +def test_validate_message_for_publishing(connection: Connection) -> None: + queue_name = "validate-publishing" + management = connection.management() + management.declare_queue(QuorumQueueSpecification(name=queue_name)) + publisher = connection.publisher( + destination=AddressHelper.queue_address(queue_name) + ) + try: + publisher.publish( + Message(body=Converter.string_to_bytes("test"), inferred=False) + ) + except ArgumentOutOfRangeException as e: + assert e.msg == "Message inferred must be True" + + try: + publisher.publish(Message(body="test")) + except ArgumentOutOfRangeException as e: + assert e.msg == "Message body must be of type bytes or None" + + try: + publisher.publish(Message(body={"key": "value"})) + except ArgumentOutOfRangeException as e: + assert e.msg == "Message body must be of type bytes or None" + + def test_publish_queue(connection: Connection) -> None: queue_name = "test-queue" @@ -34,7 +60,7 @@ def test_publish_queue(connection: Connection) -> None: publisher = connection.publisher( destination=AddressHelper.queue_address(queue_name) ) - status = publisher.publish(Message(body="test")) + status = publisher.publish(Message(body=Converter.string_to_bytes("test"))) if status.remote_state == OutcomeState.ACCEPTED: accepted = True except Exception: @@ -109,7 +135,7 @@ def test_publish_ssl(connection_ssl: Connection) -> None: publisher = connection_ssl.publisher( destination=AddressHelper.queue_address(queue_name) ) - publisher.publish(Message(body="test")) + publisher.publish(Message(body=Converter.string_to_bytes("test"))) except Exception: raised = True @@ -130,7 +156,7 @@ def test_publish_to_invalid_destination(connection: Connection) -> None: publisher = None try: publisher = connection.publisher("/invalid-destination/" + queue_name) - publisher.publish(Message(body="test")) + publisher.publish(Message(body=Converter.string_to_bytes("test"))) except ArgumentOutOfRangeException: raised = True except Exception: @@ -147,7 +173,7 @@ def test_publish_per_message_to_invalid_destination(connection: Connection) -> N queue_name = "test-queue-1" raised = False - message = Message(body="test") + message = Message(body=Converter.string_to_bytes("test")) message = AddressHelper.message_to_address_helper( message, "/invalid_destination/" + queue_name ) @@ -179,7 +205,7 @@ def test_publish_per_message_both_address(connection: Connection) -> None: ) try: - message = Message(body="test") + message = Message(body=Converter.string_to_bytes("test")) message = AddressHelper.message_to_address_helper( message, AddressHelper.queue_address(queue_name) ) @@ -212,7 +238,7 @@ def test_publish_exchange(connection: Connection) -> None: try: publisher = connection.publisher(addr) - status = publisher.publish(Message(body="test")) + status = publisher.publish(Message(body=Converter.string_to_bytes("test"))) if status.ACCEPTED: accepted = True except Exception: @@ -244,7 +270,7 @@ def test_publish_purge(connection: Connection) -> None: destination=AddressHelper.queue_address(queue_name) ) for i in range(messages_to_publish): - publisher.publish(Message(body="test")) + publisher.publish(Message(body=Converter.string_to_bytes("test"))) except Exception: raised = True @@ -289,7 +315,7 @@ def test_disconnection_reconnection() -> None: # simulate a disconnection delete_all_connections() try: - publisher.publish(Message(body="test")) + publisher.publish(Message(body=Converter.string_to_bytes("test"))) except ConnectionClosed: disconnected = True @@ -331,8 +357,7 @@ def test_queue_info_for_stream_with_validations(connection: Connection) -> None: ) for i in range(messages_to_send): - - publisher.publish(Message(body="test")) + publisher.publish(Message(body=Converter.string_to_bytes("test"))) def test_publish_per_message_exchange(connection: Connection) -> None: diff --git a/tests/utils.py b/tests/utils.py index 2b0a7f0..1dd5337 100644 --- a/tests/utils.py +++ b/tests/utils.py @@ -16,6 +16,7 @@ Publisher, QuorumQueueSpecification, ) +from rabbitmq_amqp_python_client.utils import Converter def publish_messages( @@ -26,25 +27,29 @@ def publish_messages( ) -> None: annotations = {} if filters is not None: - for filter in filters: - annotations = {"x-stream-filter-value": filter} + for filterItem in filters: + annotations = {"x-stream-filter-value": filterItem} publisher = connection.publisher("/queues/" + queue_name) # publish messages_to_send messages for i in range(messages_to_send): - publisher.publish(Message(body="test" + str(i), annotations=annotations)) + publisher.publish( + Message( + body=Converter.string_to_bytes("test{}".format(i)), + annotations=annotations, + ) + ) publisher.close() def publish_per_message(publisher: Publisher, addr: str) -> Delivery: - message = Message(body="test") + message = Message(body=Converter.string_to_bytes("test")) message = AddressHelper.message_to_address_helper(message, addr) status = publisher.publish(message) return status def setup_dead_lettering(management: Management) -> str: - exchange_dead_lettering = "exchange-dead-letter" queue_dead_lettering = "queue-dead-letter" binding_key = "key_dead_letter" @@ -72,7 +77,6 @@ def setup_dead_lettering(management: Management) -> str: def create_binding( management: Management, exchange_name: str, queue_name: str, routing_key: str ) -> str: - management.declare_exchange(ExchangeSpecification(name=exchange_name)) management.declare_queue(QuorumQueueSpecification(name=queue_name)) @@ -89,7 +93,6 @@ def create_binding( def cleanup_dead_lettering(management: Management, bind_path: str) -> None: - exchange_dead_lettering = "exchange-dead-letter" queue_dead_lettering = "queue-dead-letter"