Skip to content

Commit 55f5bd7

Browse files
committed
refactor(rabbitmq): 移除消息处理包装器并直接使用回调
简化消息消费逻辑,直接使用传入的回调函数而非包装器 移除冗余的异常处理,由调用方自行控制消息确认行为 添加 to_tuple 参数以明确消费模式
1 parent 6ccefe1 commit 55f5bd7

File tree

1 file changed

+2
-10
lines changed

1 file changed

+2
-10
lines changed

src/use_rabbitmq/__init__.py

Lines changed: 2 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -370,28 +370,20 @@ def start_consuming(
370370
:param kwargs: 其他参数
371371
"""
372372
channel = self.get_channel(channel_id)
373-
374-
def wrapper(message: Message):
375-
try:
376-
callback(message)
377-
message.ack()
378-
except Exception as exc:
379-
logger.exception(f"Error processing message: {exc}")
380-
message.nack(requeue=True)
381373

382374
# 设置预取数量
383375
channel.basic.qos(prefetch_count=prefetch)
384376

385377
# 开始消费
386378
channel.basic.consume(
387-
callback=wrapper,
379+
callback=callback,
388380
queue=queue_name,
389381
**kwargs
390382
)
391383

392384
# 启动消费循环
393385
try:
394-
channel.start_consuming()
386+
channel.start_consuming(to_tuple=False)
395387
except KeyboardInterrupt:
396388
logger.info("Consuming interrupted by user")
397389
channel.stop_consuming()

0 commit comments

Comments
 (0)