@@ -2,9 +2,9 @@ use std::{sync::Arc, time::Duration};
2
2
3
3
use futures_lite::StreamExt;
4
4
use lapin::{
5
+ Channel,
5
6
options::{BasicConsumeOptions, QueueDeclareOptions},
6
7
types::FieldTable,
7
- Channel,
8
8
};
9
9
use log::debug;
10
10
use serde::{Deserialize, Serialize};
@@ -44,6 +44,7 @@ pub enum RabbitMqError {
44
44
ConnectionError(String),
45
45
TimeoutError,
46
46
DeserializationError,
47
+ SerializationError,
47
48
}
48
49
49
50
impl From<lapin::Error> for RabbitMqError {
@@ -65,6 +66,7 @@ impl std::fmt::Display for RabbitMqError {
65
66
RabbitMqError::ConnectionError(msg) => write!(f, "Connection error: {}", msg),
66
67
RabbitMqError::TimeoutError => write!(f, "Operation timed out"),
67
68
RabbitMqError::DeserializationError => write!(f, "Failed to deserialize message"),
69
+ RabbitMqError::SerializationError => write!(f, "Failed to serialize message"),
68
70
}
69
71
}
70
72
}
@@ -83,8 +85,10 @@ impl RabbitmqClient {
83
85
)
84
86
.await
85
87
{
86
- Ok(_) => (),
87
- Err(err) => log::error!("Failed to declare send_queue: {}", err),
88
+ Ok(_) => {
89
+ log::info!("Successfully declared send_queue");
90
+ }
91
+ Err(err) => log::error!("Failed to declare send_queue: {:?}", err),
88
92
}
89
93
90
94
match channel
@@ -95,8 +99,10 @@ impl RabbitmqClient {
95
99
)
96
100
.await
97
101
{
98
- Ok(_) => (),
99
- Err(err) => log::error!("Failed to declare recieve_queue: {}", err),
102
+ Ok(_) => {
103
+ log::info!("Successfully declared recieve_queue");
104
+ }
105
+ Err(err) => log::error!("Failed to declare recieve_queue: {:?}", err),
100
106
}
101
107
102
108
RabbitmqClient {
@@ -109,23 +115,27 @@ impl RabbitmqClient {
109
115
&self,
110
116
message_json: String,
111
117
queue_name: &str,
112
- ) -> Result<(), lapin::Error > {
118
+ ) -> Result<(), RabbitMqError > {
113
119
let channel = self.channel.lock().await;
114
120
115
- channel
121
+ match channel
116
122
.basic_publish(
117
123
"", // exchange
118
124
queue_name, // routing key (queue name)
119
125
lapin::options::BasicPublishOptions::default(),
120
126
message_json.as_bytes(),
121
127
lapin::BasicProperties::default(),
122
128
)
123
- .await?;
124
-
125
- Ok(())
129
+ .await
130
+ {
131
+ Err(err) => {
132
+ log::error!("Failed to publish message: {:?}", err);
133
+ Err(RabbitMqError::LapinError(err))
134
+ }
135
+ Ok(_) => Ok(()),
136
+ }
126
137
}
127
138
128
- // Receive messages from a queue
129
139
// Receive messages from a queue with no timeout
130
140
pub async fn await_message_no_timeout(
131
141
&self,
@@ -146,8 +156,14 @@ impl RabbitmqClient {
146
156
.await;
147
157
148
158
match consumer_res {
149
- Ok(consumer) => consumer,
150
- Err(err) => panic!("{}", err),
159
+ Ok(consumer) => {
160
+ log::info!("Established queue connection to {}", queue_name);
161
+ consumer
162
+ }
163
+ Err(err) => {
164
+ log::error!("Cannot create consumer for queue {}: {:?}", queue_name, err);
165
+ return Err(RabbitMqError::LapinError(err));
166
+ }
151
167
}
152
168
};
153
169
@@ -172,17 +188,19 @@ impl RabbitmqClient {
172
188
let message = match serde_json::from_str::<Message>(message_str) {
173
189
Ok(m) => m,
174
190
Err(e) => {
175
- log::error!("Failed to parse message: {}", e);
191
+ log::error!("Failed to parse message: {:? }", e);
176
192
return Err(RabbitMqError::DeserializationError);
177
193
}
178
194
};
179
195
180
196
if message.message_id == message_id {
181
197
if ack_on_success {
182
- delivery
198
+ if let Err(delivery_error) = delivery
183
199
.ack(lapin::options::BasicAckOptions::default())
184
200
.await
185
- .expect("Failed to acknowledge message");
201
+ {
202
+ log::error!("Failed to acknowledge message: {:?}", delivery_error);
203
+ }
186
204
}
187
205
188
206
return Ok(message);
@@ -196,7 +214,7 @@ impl RabbitmqClient {
196
214
&self,
197
215
queue_name: &str,
198
216
handle_message: fn(Message) -> Result<Message, lapin::Error>,
199
- ) -> Result<(), lapin::Error > {
217
+ ) -> Result<(), RabbitMqError > {
200
218
let mut consumer = {
201
219
let channel = self.channel.lock().await;
202
220
@@ -210,8 +228,14 @@ impl RabbitmqClient {
210
228
.await;
211
229
212
230
match consumer_res {
213
- Ok(consumer) => consumer,
214
- Err(err) => panic!("Cannot consume messages: {}", err),
231
+ Ok(consumer) => {
232
+ log::info!("Established queue connection to {}", queue_name);
233
+ consumer
234
+ }
235
+ Err(err) => {
236
+ log::error!("Cannot create consumer for queue {}: {:?}", queue_name, err);
237
+ return Err(RabbitMqError::LapinError(err));
238
+ }
215
239
}
216
240
};
217
241
@@ -221,8 +245,8 @@ impl RabbitmqClient {
221
245
let delivery = match delivery {
222
246
Ok(del) => del,
223
247
Err(err) => {
224
- log::error!("Error receiving message: {}", err);
225
- return Err(err);
248
+ log::error!("Error receiving message: {:? }", err);
249
+ return Err(RabbitMqError::LapinError( err) );
226
250
}
227
251
};
228
252
@@ -233,32 +257,32 @@ impl RabbitmqClient {
233
257
str
234
258
}
235
259
Err(err) => {
236
- log::error!("Error decoding message: {}", err);
237
- return Ok(() );
260
+ log::error!("Error decoding message: {:? }", err);
261
+ return Err(RabbitMqError::DeserializationError );
238
262
}
239
263
};
240
264
// Parse the message
241
265
let inc_message = match serde_json::from_str::<Message>(message_str) {
242
266
Ok(mess) => mess,
243
267
Err(err) => {
244
- log::error!("Error parsing message: {}", err);
245
- return Ok(() );
268
+ log::error!("Error parsing message: {:? }", err);
269
+ return Err(RabbitMqError::DeserializationError );
246
270
}
247
271
};
248
272
249
273
let message = match handle_message(inc_message) {
250
274
Ok(mess) => mess,
251
275
Err(err) => {
252
- log::error!("Error handling message: {}", err);
253
- return Ok(() );
276
+ log::error!("Error handling message: {:? }", err);
277
+ return Err(RabbitMqError::DeserializationError );
254
278
}
255
279
};
256
280
257
281
let message_json = match serde_json::to_string(&message) {
258
282
Ok(json) => json,
259
283
Err(err) => {
260
- log::error!("Error serializing message: {}", err);
261
- return Ok(() );
284
+ log::error!("Error serializing message: {:? }", err);
285
+ return Err(RabbitMqError::SerializationError );
262
286
}
263
287
};
264
288
@@ -267,10 +291,12 @@ impl RabbitmqClient {
267
291
}
268
292
269
293
// Acknowledge the message
270
- delivery
294
+ if let Err(delivery_error) = delivery
271
295
.ack(lapin::options::BasicAckOptions::default())
272
296
.await
273
- .expect("Failed to acknowledge message");
297
+ {
298
+ log::error!("Failed to acknowledge message: {:?}", delivery_error);
299
+ }
274
300
}
275
301
276
302
Ok(())
0 commit comments