Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion Cargo.toml
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
[package]
version = "0.0.0"
name = "code0-flow"
edition = "2021"
edition = "2024"
description = "Crate for managing the code0-flows inside of the Flow Queue & FlowStore"
repository = "https://github.com/code0-tech/code0-flow"
homepage = "https://code0.tech"
Expand Down
32 changes: 22 additions & 10 deletions src/flow_definition/mod.rs
Original file line number Diff line number Diff line change
@@ -1,9 +1,9 @@
use tucana::{
aquila::{
DataTypeUpdateRequest, FlowTypeUpdateRequest, RuntimeFunctionDefinitionUpdateRequest,
data_type_service_client::DataTypeServiceClient,
flow_type_service_client::FlowTypeServiceClient,
runtime_function_definition_service_client::RuntimeFunctionDefinitionServiceClient,
DataTypeUpdateRequest, FlowTypeUpdateRequest, RuntimeFunctionDefinitionUpdateRequest,
},
shared::{DataType, FlowType, RuntimeFunctionDefinition},
};
Expand Down Expand Up @@ -51,14 +51,18 @@ impl FlowUpdateService {

async fn update_data_types(&self) {
if self.data_types.is_empty() {
log::info!("No data types to update");
return;
}

log::info!("Updating the current DataTypes!");
let mut client = match DataTypeServiceClient::connect(self.aquila_url.clone()).await {
Ok(client) => client,
Ok(client) => {
log::info!("Successfully connected to the DataTypeService");
client
}
Err(err) => {
log::error!("Failed to connect to DataTypeService: {}", err);
log::error!("Failed to connect to the DataTypeService: {:?}", err);
return;
}
};
Expand All @@ -75,23 +79,27 @@ impl FlowUpdateService {
);
}
Err(err) => {
log::error!("Failed to update data types: {}", err);
log::error!("Failed to update data types: {:?}", err);
}
}
}

async fn update_runtime_definitions(&self) {
if self.runtime_definitions.is_empty() {
log::info!("No runtime definitions to update");
return;
}

log::info!("Updating the current RuntimeDefinitions!");
let mut client =
match RuntimeFunctionDefinitionServiceClient::connect(self.aquila_url.clone()).await {
Ok(client) => client,
Ok(client) => {
log::info!("Connected to RuntimeFunctionDefinitionService");
client
}
Err(err) => {
log::error!(
"Failed to connect to RuntimeFunctionDefinitionService: {}",
"Failed to connect to RuntimeFunctionDefinitionService: {:?}",
err
);
return;
Expand All @@ -110,21 +118,25 @@ impl FlowUpdateService {
);
}
Err(err) => {
log::error!("Failed to update runtime function definitions: {}", err);
log::error!("Failed to update runtime function definitions: {:?}", err);
}
}
}

async fn update_flow_types(&self) {
if self.flow_types.is_empty() {
log::info!("No FlowTypes to update!");
return;
}

log::info!("Updating the current FlowTypes!");
let mut client = match FlowTypeServiceClient::connect(self.aquila_url.clone()).await {
Ok(client) => client,
Ok(client) => {
log::info!("Connected to FlowTypeService!");
client
}
Err(err) => {
log::error!("Failed to connect to FlowTypeService: {}", err);
log::error!("Failed to connect to FlowTypeService: {:?}", err);
return;
}
};
Expand All @@ -141,7 +153,7 @@ impl FlowUpdateService {
);
}
Err(err) => {
log::error!("Failed to update flow types: {}", err);
log::error!("Failed to update flow types: {:?}", err);
}
}
}
Expand Down
13 changes: 8 additions & 5 deletions src/flow_queue/connection.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,19 +3,22 @@ use lapin::Connection;
pub async fn build_connection(rabbitmq_url: &str) -> Connection {
match Connection::connect(rabbitmq_url, lapin::ConnectionProperties::default()).await {
Ok(env) => env,
Err(error) => panic!(
"Cannot connect to FlowQueue (RabbitMQ) instance! Reason: {:?}",
error
),
Err(error) => {
log::error!(
"Cannot connect to FlowQueue (RabbitMQ) instance! Reason: {:?}",
error
);
panic!("Cannot connect to FlowQueue (RabbitMQ) instance!");
}
}
}

#[cfg(test)]
mod tests {
use crate::flow_queue::connection::build_connection;
use testcontainers::GenericImage;
use testcontainers::core::{IntoContainerPort, WaitFor};
use testcontainers::runners::AsyncRunner;
use testcontainers::GenericImage;

macro_rules! rabbitmq_container_test {
($test_name:ident, $consumer:expr) => {
Expand Down
88 changes: 57 additions & 31 deletions src/flow_queue/service.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,9 +2,9 @@ use std::{sync::Arc, time::Duration};

use futures_lite::StreamExt;
use lapin::{
Channel,
options::{BasicConsumeOptions, QueueDeclareOptions},
types::FieldTable,
Channel,
};
use log::debug;
use serde::{Deserialize, Serialize};
Expand Down Expand Up @@ -44,6 +44,7 @@ pub enum RabbitMqError {
ConnectionError(String),
TimeoutError,
DeserializationError,
SerializationError,
}

impl From<lapin::Error> for RabbitMqError {
Expand All @@ -65,6 +66,7 @@ impl std::fmt::Display for RabbitMqError {
RabbitMqError::ConnectionError(msg) => write!(f, "Connection error: {}", msg),
RabbitMqError::TimeoutError => write!(f, "Operation timed out"),
RabbitMqError::DeserializationError => write!(f, "Failed to deserialize message"),
RabbitMqError::SerializationError => write!(f, "Failed to serialize message"),
}
}
}
Expand All @@ -83,8 +85,10 @@ impl RabbitmqClient {
)
.await
{
Ok(_) => (),
Err(err) => log::error!("Failed to declare send_queue: {}", err),
Ok(_) => {
log::info!("Successfully declared send_queue");
}
Err(err) => log::error!("Failed to declare send_queue: {:?}", err),
}

match channel
Expand All @@ -95,8 +99,10 @@ impl RabbitmqClient {
)
.await
{
Ok(_) => (),
Err(err) => log::error!("Failed to declare recieve_queue: {}", err),
Ok(_) => {
log::info!("Successfully declared recieve_queue");
}
Err(err) => log::error!("Failed to declare recieve_queue: {:?}", err),
}

RabbitmqClient {
Expand All @@ -109,23 +115,27 @@ impl RabbitmqClient {
&self,
message_json: String,
queue_name: &str,
) -> Result<(), lapin::Error> {
) -> Result<(), RabbitMqError> {
let channel = self.channel.lock().await;

channel
match channel
.basic_publish(
"", // exchange
queue_name, // routing key (queue name)
lapin::options::BasicPublishOptions::default(),
message_json.as_bytes(),
lapin::BasicProperties::default(),
)
.await?;

Ok(())
.await
{
Err(err) => {
log::error!("Failed to publish message: {:?}", err);
Err(RabbitMqError::LapinError(err))
}
Ok(_) => Ok(()),
}
}

// Receive messages from a queue
// Receive messages from a queue with no timeout
pub async fn await_message_no_timeout(
&self,
Expand All @@ -146,8 +156,14 @@ impl RabbitmqClient {
.await;

match consumer_res {
Ok(consumer) => consumer,
Err(err) => panic!("{}", err),
Ok(consumer) => {
log::info!("Established queue connection to {}", queue_name);
consumer
}
Err(err) => {
log::error!("Cannot create consumer for queue {}: {:?}", queue_name, err);
return Err(RabbitMqError::LapinError(err));
}
}
};

Expand All @@ -172,17 +188,19 @@ impl RabbitmqClient {
let message = match serde_json::from_str::<Message>(message_str) {
Ok(m) => m,
Err(e) => {
log::error!("Failed to parse message: {}", e);
log::error!("Failed to parse message: {:?}", e);
return Err(RabbitMqError::DeserializationError);
}
};

if message.message_id == message_id {
if ack_on_success {
delivery
if let Err(delivery_error) = delivery
.ack(lapin::options::BasicAckOptions::default())
.await
.expect("Failed to acknowledge message");
{
log::error!("Failed to acknowledge message: {:?}", delivery_error);
}
}

return Ok(message);
Expand All @@ -196,7 +214,7 @@ impl RabbitmqClient {
&self,
queue_name: &str,
handle_message: fn(Message) -> Result<Message, lapin::Error>,
) -> Result<(), lapin::Error> {
) -> Result<(), RabbitMqError> {
let mut consumer = {
let channel = self.channel.lock().await;

Expand All @@ -210,8 +228,14 @@ impl RabbitmqClient {
.await;

match consumer_res {
Ok(consumer) => consumer,
Err(err) => panic!("Cannot consume messages: {}", err),
Ok(consumer) => {
log::info!("Established queue connection to {}", queue_name);
consumer
}
Err(err) => {
log::error!("Cannot create consumer for queue {}: {:?}", queue_name, err);
return Err(RabbitMqError::LapinError(err));
}
}
};

Expand All @@ -221,8 +245,8 @@ impl RabbitmqClient {
let delivery = match delivery {
Ok(del) => del,
Err(err) => {
log::error!("Error receiving message: {}", err);
return Err(err);
log::error!("Error receiving message: {:?}", err);
return Err(RabbitMqError::LapinError(err));
}
};

Expand All @@ -233,32 +257,32 @@ impl RabbitmqClient {
str
}
Err(err) => {
log::error!("Error decoding message: {}", err);
return Ok(());
log::error!("Error decoding message: {:?}", err);
return Err(RabbitMqError::DeserializationError);
}
};
// Parse the message
let inc_message = match serde_json::from_str::<Message>(message_str) {
Ok(mess) => mess,
Err(err) => {
log::error!("Error parsing message: {}", err);
return Ok(());
log::error!("Error parsing message: {:?}", err);
return Err(RabbitMqError::DeserializationError);
}
};

let message = match handle_message(inc_message) {
Ok(mess) => mess,
Err(err) => {
log::error!("Error handling message: {}", err);
return Ok(());
log::error!("Error handling message: {:?}", err);
return Err(RabbitMqError::DeserializationError);
}
};

let message_json = match serde_json::to_string(&message) {
Ok(json) => json,
Err(err) => {
log::error!("Error serializing message: {}", err);
return Ok(());
log::error!("Error serializing message: {:?}", err);
return Err(RabbitMqError::SerializationError);
}
};

Expand All @@ -267,10 +291,12 @@ impl RabbitmqClient {
}

// Acknowledge the message
delivery
if let Err(delivery_error) = delivery
.ack(lapin::options::BasicAckOptions::default())
.await
.expect("Failed to acknowledge message");
{
log::error!("Failed to acknowledge message: {:?}", delivery_error);
}
}

Ok(())
Expand Down
Loading