Skip to content

Commit 46495b3

Browse files
committed
feat: added consume message without id check
1 parent 1b0da8b commit 46495b3

File tree

1 file changed

+62
-0
lines changed

1 file changed

+62
-0
lines changed

src/flow_queue/service.rs

Lines changed: 62 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -187,6 +187,68 @@ impl RabbitmqClient {
187187
Err(RabbitMqError::DeserializationError)
188188
}
189189

190+
// Function intended to get used by the runtime
191+
pub async fn consume_message(
192+
&self,
193+
queue_name: &str,
194+
ack_on_success: bool,
195+
) -> Result<Message, RabbitMqError> {
196+
let mut consumer = {
197+
let channel = self.channel.lock().await;
198+
199+
let consumer_res = channel
200+
.basic_consume(
201+
queue_name,
202+
"consumer",
203+
lapin::options::BasicConsumeOptions::default(),
204+
FieldTable::default(),
205+
)
206+
.await;
207+
208+
match consumer_res {
209+
Ok(consumer) => consumer,
210+
Err(err) => panic!("{}", err),
211+
}
212+
};
213+
214+
debug!("Starting to consume from {}", queue_name);
215+
216+
while let Some(delivery_result) = consumer.next().await {
217+
let delivery = match delivery_result {
218+
Ok(del) => del,
219+
Err(_) => return Err(RabbitMqError::DeserializationError),
220+
};
221+
let data = &delivery.data;
222+
let message_str = match std::str::from_utf8(&data) {
223+
Ok(str) => str,
224+
Err(_) => {
225+
return Err(RabbitMqError::DeserializationError);
226+
}
227+
};
228+
229+
debug!("Received message: {}", message_str);
230+
231+
// Parse the message
232+
let message = match serde_json::from_str::<Message>(message_str) {
233+
Ok(m) => m,
234+
Err(e) => {
235+
log::error!("Failed to parse message: {}", e);
236+
return Err(RabbitMqError::DeserializationError);
237+
}
238+
};
239+
240+
if ack_on_success {
241+
delivery
242+
.ack(lapin::options::BasicAckOptions::default())
243+
.await
244+
.expect("Failed to acknowledge message");
245+
}
246+
247+
return Ok(message);
248+
}
249+
Err(RabbitMqError::DeserializationError)
250+
}
251+
190252
// Receive messages from a queue with timeout
191253
pub async fn await_message(
192254
&self,

0 commit comments

Comments
 (0)