Effectively serves as a way for two message brokers to "share" messages across HTTPS. These applications connect to each other through server-sent events, allowing for messages to be pushed without waiting for a response.
Specific INTERSECT messaging logic has its own module (see shared-deps/src/intersect_messaging.rs), so these applications could easily be forked to support another ecosystem with its own messaging protocol.
This repository consists of two applications:
proxy-http-server- subscribe to message brokers and emit the messages on a SSE endpoint, or publish messages on a POST endpointproxy-http-client- subscribe to the aforementioned server SSE endpoint and publish them to a broker, or subscribe to message brokers and publish their messages on the aforementioned POST endpoint.
The server can function as a standalone application, but the client needs to talk to a server application. The general architecture looks something like:
------------
| BROKER 1 |
------------
^
| AMQP
v
---------------------
| proxy-http-server |
---------------------
^
| HTTP
v
---------------------- SYSTEM DIVIDING LINE
^
| HTTP
v
---------------------
| proxy-http-client |
---------------------
^
| AMQP
v
------------
| BROKER 2 |
------------
All applications connected to Broker 1 will share the same system, and all applications connected to Broker 2 will share the same system. Data may flow in either direction.
To prevent infinite loops and to ensure security, subscribers will not broadcast any messages which are determined to not originate from their system.
Caveats:
- you should only deploy one
proxy-http-clientand oneproxy-http-serverper system at most - any other proxies you talk to should have different systems.
- while you can have a
clientand aserverapplication for both systems, you should only have one of the clients talk to the other's server. Don't connect both clients to both servers.
Currently only supports AMQP 0-9-1 and MQTT 3-1-1 as the broker protocols, but can potentially support others in the future
- great at handling tons of concurrent requests, necessary for something like this
- strong static analysis
- Axum has great community support and examples, and is async (this is the webserver maintained by the tokio runtime team, the most supported async runtime for Rust).
The best way to install Rust is through Rustup.
You can format with cargo fmt and lint with cargo clippy . There's also a pre-commit hook you can use (installation instructions)
You will need two message brokers and two instances of this application running for it to have any purpose.
- Spin up backing services:
docker compose up -d(note that this spins up two brokers, add 1 to all normal port numbers for second broker) - In terminal 1, run proxy-http-server:
APP_CONFIG_FILE=proxy-http-server/conf.yaml cargo run --bin proxy-http-server - In terminal 2, run proxy-http-client (will not work until proxy-http-server is initialized):
APP_CONFIG_FILE=proxy-http-client/conf.yaml cargo run --bin proxy-http-client
Common configuration structures can be found in shared-deps/src/configuration.rs . The get_configuration() function is what will be called to initialize the configuration logic.
Specific configuration structs are in proxy-http-server/src/configuration.rs and proxy-http-client/src/configuration.rs .
These instructions assume you are using the docker compose configuration and the default conf.yaml configurations for each.
-
Make sure that you run
docker compose up -dfrom the root directory, to start both brokers and their associated management UIs. -
Make sure that you have both applications started (do NOT start more than 1 of each). Each application should be connected to a separate broker.
-
To login to the broker that the server instance uses, go to localhost:15672, username
intersect_username, passwordintersect_password -
To login to the broker that the client instance uses, go to localhost:15673, username
intersect_username, passwordintersect_password -
- IF AMQP: On each application, click on the
Exchangestab, and click on theintersect-messagesexchange. - IF MQTT: On each application, click on the
Queues and Streamstab, then click on the queue (it should look likemqtt-subscription-proxy-http-clientqos1ormqtt-subscription-proxy-http-serverqos1).
- IF AMQP: On each application, click on the
-
Make sure that the
Publish messagedropdown is expanded, select the large text area which is labeled withPayload:
For the application on localhost:15672, set the payload to below (no newlines):
{"messageId":"39d9c119-3b0a-474e-ae3d-f3eb5f8d3a86","operationId":"say_hello_to_name","contentType":"application/json","payload":"\"hello_client\"","headers":{"destination":"tmp-4b19600a-527d-4a0b-9bf7-f500d9656350.tmp-.tmp-.-.tmp-","source":"organization.facility.system.subsystem.service","sdk_version":"0.6.2","created_at":"2024-06-28T15:14:39.117515Z","data_handler":0,"has_error":false}}
For the application on localhost:15673, set the payload to below (no newlines):
{"messageId":"39d9c119-3b0a-474e-ae3d-f3eb5f8d3a86","operationId":"say_hello_to_name","contentType":"application/json","payload":"\"hello_client\"","headers":{"destination":"tmp-4b19600a-527d-4a0b-9bf7-f500d9656350.tmp-.tmp-.-.tmp-","source":"organization.facility.system2.subsystem.service","sdk_version":"0.6.2","created_at":"2024-06-28T15:14:39.117515Z","data_handler":0,"has_error":false}}
- Click "publish_message". At this point the message should show up in the logs for both
proxy-http-clientandproxy-http-server. - Check the logs of the application NOT connected to the broker you just published to. (With the default configuration: check
proxy-http-clientif you published to localhost:15672, checkproxy-http-serverif you published to localhost:15673.) You should see logs which look like this:
2025-03-18T00:09:19.973776Z DEBUG intersect_ingress_proxy_common::protocols::amqp::subscribe: got raw message data from broker: {"messageId":"39d9c119-3b0a-474e-ae3d-f3eb5f8d3a86","operationId":"say_hello_to_name","contentType":"application/json","payload":"\"hello_client\"","headers":{"destination":"tmp-4b19600a-527d-4a0b-9bf7-f500d9656350.tmp-.tmp-.-.tmp-","source":"organization.facility.system.subsystem.service","sdk_version":"0.6.2","created_at":"2024-06-28T15:14:39.117515Z","data_handler":0,"has_error":false}}
at shared-deps/src/protocols/amqp/subscribe.rs:141
2025-03-18T00:09:19.973811Z DEBUG intersect_ingress_proxy_common::protocols::amqp::subscribe: message source is not from this system, will not broadcast it
at shared-deps/src/protocols/amqp/subscribe.rs:147
To explain this log, the basic workflow is this:
- Application 1 receives a message from Broker 1
- The message source is from the same system, so it will be published to an HTTP endpoint
- The message is sent over HTTP: if the http-server gets a message, it broadcasts the message out to all connected clients on the SSE endpoint; if the http-client gets a message, it POSTS the message to the HTTP server. Assuming the send is successful, the message is acked and permanently removed from Broker 1
- Application 2 gets the message over HTTP and publishes it to Broker 2
- Application 2 receives this message from Broker 2.
- The message source is NOT from the same system, so it is not published through HTTP but will be acked to remove it from the broker.
Congratulations, you have successfully simulated a publisher and a subscriber being able to talk to each other across 2 separate message brokers.
Now it's advisable to run some INTERSECT-SDK examples. One client and one service should do. You should make sure that you're using AMQP configuration settings instead of MQTT configuration settings. Both of the counter examples should work - you should make sure that client and server are talking to DIFFERENT brokers, or the exercise is pointless.
- support publishing to and subscribing from multiple brokers at once
- support protocols other than AMQP
- support listening to multiple URLs at once (fairly easy feature to add)
- find way to persist messages which get pulled off of a broker but are not sent over HTTP
- one exchange for all messages for each application (see
shared-deps/src/protocols/amqp/mod.rsto get name) - routing keys will match SDK naming schematics (SOS hierarchy, "." as separator, end with ".{userspace|lifecycle|events}"). The routing key will roughly correspond to the
destinationfield in an INTERSECT message, but thedestinationfield only exists on userspace messages (event/lifecycle messages do not have a specific destination in mind). - The queue name is hardcoded to match the name of the application.
- follows similar rationale to AMQP, except does not utilize exchanges.