在CentOS上应用RabbitMQ消息确认机制前,需完成基础环境配置:
sudo yum install rabbitmq-server),启动服务(sudo systemctl start rabbitmq-server)并设置开机自启(sudo systemctl enable rabbitmq-server)。sudo rabbitmqctl add_user admin 123456、sudo rabbitmqctl set_permissions -p / admin ".*" ".*" ".*")。生产者确认机制用于确保消息成功到达RabbitMQ Broker并正确路由到目标队列,分为同步确认、批量确认、异步确认三种方式。
每发送一条消息后,调用waitForConfirms()等待Broker返回确认结果(成功返回true,失败返回false)。
示例代码(Java):
Channel channel = connection.createChannel();
channel.queueDeclare("confirm_queue", true, false, false, null); // 队列持久化
channel.confirmSelect(); // 开启Confirm模式
for (int i = 0; i < 10; i++) {
String message = "Message-" + i;
channel.basicPublish("", "confirm_queue", MessageProperties.PERSISTENT_TEXT_PLAIN, message.getBytes());
if (channel.waitForConfirms()) {
System.out.println("消息[" + message + "]发送成功");
} else {
System.out.println("消息[" + message + "]发送失败,需重试");
}
}
适用场景:对消息可靠性要求极高的场景(如金融交易),但性能较差。
每发送N条消息后,调用一次waitForConfirms()批量确认。
示例代码(Java):
Channel channel = connection.createChannel();
channel.queueDeclare("confirm_queue", true, false, false, null);
channel.confirmSelect();
int batchSize = 100;
for (int i = 0; i < 1000; i++) {
String message = "Message-" + i;
channel.basicPublish("", "confirm_queue", MessageProperties.PERSISTENT_TEXT_PLAIN, message.getBytes());
if (i % batchSize == 0) {
channel.waitForConfirms(); // 批量确认
System.out.println("批量确认完成:" + i + "-" + (i + batchSize - 1));
}
}
适用场景:兼顾可靠性与性能的场景(如日志收集),减少网络往返次数。
通过addConfirmListener注册回调函数,处理成功(handleAck)与失败(handleNack)的情况。需维护未确认消息缓存(如ConcurrentSkipListMap),记录消息ID与内容,便于失败后重发。
示例代码(Java):
Channel channel = connection.createChannel();
channel.queueDeclare("confirm_queue", true, false, false, null);
channel.confirmSelect();
// 存储未确认消息(key: deliveryTag, value: message)
ConcurrentSkipListMap<Long, String> outstandingConfirms = new ConcurrentSkipListMap<>();
// 成功回调
ConfirmCallback ackCallback = (deliveryTag, multiple) -> {
if (multiple) {
// 批量删除已确认的消息
ConcurrentNavigableMap<Long, String> confirmed = outstandingConfirms.headMap(deliveryTag, true);
confirmed.clear();
} else {
outstandingConfirms.remove(deliveryTag);
}
System.out.println("消息确认成功,deliveryTag:" + deliveryTag);
};
// 失败回调
ConfirmCallback nackCallback = (deliveryTag, multiple) -> {
String message = outstandingConfirms.get(deliveryTag);
System.out.println("消息确认失败,需重发:" + message + ", deliveryTag:" + deliveryTag);
// 重发逻辑(如重新调用basicPublish)
};
channel.addConfirmListener(ackCallback, nackCallback);
for (int i = 0; i < 1000; i++) {
String message = "Message-" + i;
channel.basicPublish("", "confirm_queue", MessageProperties.PERSISTENT_TEXT_PLAIN, message.getBytes());
outstandingConfirms.put(channel.getNextPublishSeqNo(), message); // 记录未确认消息
}
适用场景:高并发生产环境(如电商订单系统),性能最优。
消费者确认机制用于确保消息被正确处理后再从队列中移除,避免消息丢失。需将autoAck设置为false(关闭自动确认),手动调用basicAck(成功)、basicNack(失败重试)、basicReject(失败丢弃)。
示例代码(Python):
import pika
def callback(ch, method, properties, body):
try:
print(f"收到消息:{body.decode()}")
# 模拟业务处理(如数据库操作)
# 如果处理成功,发送ACK
ch.basic_ack(delivery_tag=method.delivery_tag)
print("消息确认成功")
except Exception as e:
print(f"消息处理失败:{e}")
# 失败后重新入队(可设置requeue=True)
ch.basic_nack(delivery_tag=method.delivery_tag, requeue=True)
# 连接RabbitMQ
connection = pika.BlockingConnection(pika.ConnectionParameters('192.168.230.131'))
channel = connection.channel()
# 声明队列(确保队列存在)
channel.queue_declare(queue='confirm_queue', durable=True)
# 关闭自动确认,设置prefetch_count(避免消费者过载)
channel.basic_qos(prefetch_count=1)
channel.basic_consume(queue='confirm_queue', on_message_callback=callback)
print("等待消息...")
channel.start_consuming()
关键参数说明:
autoAck=False:关闭自动确认,需手动发送ACK。basic_ack:确认消息处理成功,RabbitMQ将消息从队列中移除。basicNack:requeue=True表示重新入队(由其他消费者处理),requeue=False表示丢弃或进入死信队列。basicReject:与basicNack类似,但不支持批量拒绝。autoAck=True(默认值),消费者收到消息后立即发送ACK,RabbitMQ立即移除消息。若消费者处理过程中发生异常,消息会丢失。
适用场景:测试环境或对可靠性要求极低的场景(如实时通知)。
为确保RabbitMQ宕机后消息不丢失,需配合队列持久化与消息持久化:
durable=True(如channel.queueDeclare("confirm_queue", true, false, false, null))。deliveryMode=2(如MessageProperties.PERSISTENT_TEXT_PLAIN)。channel.confirmSelect()),确认回调中是否有ack=false的情况(需重发或记录日志)。autoAck是否为false,确认业务处理逻辑是否有异常(如数据库连接失败)。通过以上步骤,可在CentOS上实现RabbitMQ消息确认机制,确保消息从生产者到消费者的全链路可靠性。