RabbitMQ的消息确认机制分为生产者端确认(保障消息到达Broker)和消费者端确认(保障消息被正确处理)两部分,以下是在Ubuntu系统上的具体设置步骤:
生产者端确认需通过Confirm机制(消息到达Exchange)和Return机制(消息路由到Queue失败时退回)实现,以下是配置步骤:
Ubuntu系统需先安装Erlang(RabbitMQ依赖)和RabbitMQ Server:
# 更新软件包
sudo apt update
# 安装Erlang(版本需匹配RabbitMQ要求,如25.x)
sudo apt install erlang
# 添加RabbitMQ官方仓库并安装
curl -s https://packagecloud.io/install/repositories/rabbitmq/rabbitmq-server/script.deb.sh | sudo bash
sudo apt install rabbitmq-server
# 启动服务并设置开机自启
sudo systemctl start rabbitmq-server
sudo systemctl enable rabbitmq-server
安装完成后,通过sudo systemctl status rabbitmq-server检查服务状态(应为active (running))。
Confirm模式需通过代码配置(以Java为例),核心步骤如下:
channel.confirmSelect()将信道设置为Confirm模式,后续发送的消息会被RabbitMQ跟踪。channel.waitForConfirms()阻塞等待RabbitMQ返回确认(true为成功,false为失败),可设置超时时间。ConfirmCallback回调处理确认结果,避免阻塞主线程。// 开启Confirm模式
channel.confirmSelect();
// 异步确认配置
rabbitTemplate.setConfirmCallback((correlationData, ack, cause) -> {
if (ack) {
System.out.println("消息成功到达Exchange");
} else {
System.out.println("消息到达Exchange失败,原因:" + cause);
// 触发重试逻辑
}
});
需确保publisher-confirms参数为true(Spring Boot中可通过application.yml配置)。
若消息无法路由到Queue(如Queue不存在、Routing Key不匹配),需通过Return机制将消息退回给生产者:
publisher-returns为true。mandatory=true,否则RabbitMQ会直接丢弃无法路由的消息。rabbitTemplate.setReturnsCallback处理退回的消息。// 开启Return模式
rabbitTemplate.setReturnsCallback(returnedMessage -> {
System.out.println("消息路由失败,退回内容:" + new String(returnedMessage.getMessage().getBody()));
System.out.println("错误码:" + returnedMessage.getReplyCode());
System.out.println("错误原因:" + returnedMessage.getReplyText());
});
// 发送消息时设置Mandatory
rabbitTemplate.setMandatory(true);
rabbitTemplate.convertAndSend("exchange_name", "routing_key", "message_body");
需确保publisher-returns参数为true。
消费者端确认需通过手动ACK(避免自动ACK导致消息丢失)实现,以下是配置步骤:
消费者端默认使用自动ACK(消息一旦被接收即从队列删除),需改为手动ACK以控制消息确认时机:
SimpleMessageListenerContainer中设置acknowledge-mode=MANUAL。@Bean
public SimpleMessageListenerContainer messageContainer(ConnectionFactory connectionFactory) {
SimpleMessageListenerContainer container = new SimpleMessageListenerContainer();
container.setConnectionFactory(connectionFactory);
container.setQueueNames("your_queue_name");
container.setMessageListener(new ChannelAwareMessageListener() {
@Override
public void onMessage(Message message, Channel channel) throws Exception {
try {
// 处理消息逻辑
System.out.println("收到消息:" + new String(message.getBody()));
// 手动ACK(参数1:deliveryTag,参数2:是否批量确认)
channel.basicAck(message.getMessageProperties().getDeliveryTag(), false);
} catch (Exception e) {
// 处理失败,否定ACK(参数3:是否重新入队)
channel.basicNack(message.getMessageProperties().getDeliveryTag(), false, true);
}
}
});
container.setAcknowledgeMode(AcknowledgeMode.MANUAL); // 关键配置
return container;
}
ConnectionFactory时无需特殊设置,但在消费时需手动调用basicAck或basicNack。channel.basicAck(deliveryTag, false),告知RabbitMQ删除该消息。basicNack:处理失败时调用,可指定是否重新入队(requeue=true则重新放回队列,false则进入死信队列)。basicReject:类似basicNack,但一次只能拒绝单条消息。onMessage方法中的catch块)。durable=true),示例:@Bean
public Queue queue() {
return new Queue("your_queue_name", true); // durable=true
}
prefetch(一次拉取的消息数),避免消费者过载。通过以上配置,可实现RabbitMQ在生产者端和消费者端的消息可靠确认,覆盖“消息发送-路由-处理”的全链路可靠性保障。