linux

RabbitMQ消息确认机制Linux如何操作

小樊
36
2025-12-04 20:13:53
栏目: 智能运维

在 Linux 上配置与验证 RabbitMQ 消息确认机制

一 核心概念与适用场景

二 在 Linux 上的操作步骤

三 生产者侧确认与持久化示例 Python pika

import pika, time, uuid

conn = pika.BlockingConnection(pika.ConnectionParameters('localhost'))
ch = conn.channel()

# 1) 开启发布确认
ch.confirm_delivery()

# 2) 队列与消息持久化
ch.queue_declare(queue='task_queue', durable=True)

# 3) 发送并关联唯一ID(CorrelationId)
corr_id = str(uuid.uuid4())
ch.basic_publish(
    exchange='',
    routing_key='task_queue',
    body=f'Hello-{corr_id}'.encode(),
    properties=pika.BasicProperties(
        delivery_mode=2,         # 持久化
        correlation_id=corr_id  # 便于定位 ack/nack
    )
)
print(f'Sent: {corr_id}')

# 4) 同步等待确认(演示用;生产可用异步回调)
try:
    ch.wait_for_confirms(timeout=5)
    print('Broker confirmed delivery')
except pika.exceptions.TimeoutError:
    print('Confirm timeout')

conn.close()

四 消费者侧手动确认与重试处理示例 Python pika

import pika, time

def on_message(ch, method, properties, body):
    try:
        # TODO: 业务处理
        print(f'Processing: {body}')
        # 模拟处理耗时
        time.sleep(1)
        # 5) 显式确认(处理成功再 ack)
        ch.basic_ack(delivery_tag=method.delivery_tag)
    except Exception as e:
        print(f'Error: {e}, requeue...')
        # 6) 处理失败:可选择 nack 并重新入队(或转入死信)
        ch.basic_nack(delivery_tag=method.delivery_tag, requeue=True)

conn = pika.BlockingConnection(pika.ConnectionParameters('localhost'))
ch = conn.channel()

# 3) 关闭自动确认,开启手动 ack
ch.queue_declare(queue='task_queue', durable=True)
ch.basic_consume(queue='task_queue', on_message_callback=on_message, auto_ack=False)

print('Waiting for messages. To exit press CTRL+C')
try:
    ch.start_consuming()
except KeyboardInterrupt:
    ch.stop_consuming()
conn.close()

五 常见问题与排查要点

0
看了该问题的人还看了