RabbitMQ消息丢失通常发生在生产者→RabbitMQ→消费者链路的三类场景中,需针对性解决:
生产者发送消息时,可能因网络问题、RabbitMQ未接收等原因导致消息丢失,需通过确认机制确保消息到达。
channel.confirmSelect()),每条消息分配唯一ID。RabbitMQ成功接收并持久化消息后,会回调handleAck方法(确认);若失败则回调handleNack方法(否定确认),生产者可据此重发。channel.confirmSelect();
channel.addConfirmListener(new ConfirmListener() {
@Override
public void handleAck(long deliveryTag, boolean multiple) {
System.out.println("消息确认成功:" + deliveryTag);
}
@Override
public void handleNack(long deliveryTag, boolean multiple) {
System.out.println("消息确认失败:" + deliveryTag + ",准备重发");
// 实现重发逻辑(如从内存队列重新发送)
}
});
channel.txSelect()开启事务,发送消息后调用channel.txCommit()提交(成功)或channel.txRollback()回滚(失败)。事务会阻塞生产者,降低吞吐量,仅适合对可靠性要求极高的场景。RabbitMQ默认将消息存于内存,若服务器宕机、重启或未持久化,消息会丢失,需通过持久化配置和高可用集群保障。
durable=true(元数据持久化);durable=true(元数据持久化);deliveryMode=2(内容持久化到磁盘)。// 创建持久化Exchange
channel.exchangeDeclare("exchange_name", "direct", true);
// 创建持久化Queue
channel.queueDeclare("queue_name", true, false, false, null);
// 发送持久化消息
channel.basicPublish("exchange_name", "routing_key",
MessageProperties.PERSISTENT_TEXT_PLAIN,
message.getBytes());
ha-all模式,所有节点同步):rabbitmqctl set_policy ha-all "^ha_queue$" '{"ha-mode":"all","ha-sync-mode":"automatic"}'
此模式下,消息会同步到集群的所有节点,即使某个节点宕机,其他节点仍能提供服务。消费者处理消息时,可能因业务异常、服务宕机等原因未完成处理,需通过ACK确认机制确保消息被正确消费。
autoAck=false(默认是true,自动确认)。业务处理完成后,手动调用channel.basicAck确认;若处理失败,调用channel.basicNack或channel.basicReject拒绝,消息会重新入队(或进入死信队列)。channel.basicConsume("queue_name", false, new DefaultConsumer(channel) {
@Override
public void handleDelivery(String consumerTag, Envelope envelope,
AMQP.BasicProperties properties, byte[] body) {
try {
// 业务处理逻辑
System.out.println("收到消息:" + new String(body));
// 处理成功,手动ACK
channel.basicAck(envelope.getDeliveryTag(), false);
} catch (Exception e) {
// 处理失败,拒绝消息(requeue=true表示重新入队)
channel.basicNack(envelope.getDeliveryTag(), false, true);
}
}
});
Map<String, Object> args = new HashMap<>();
args.put("x-dead-letter-exchange", "dlx_exchange"); // 死信交换机
args.put("x-dead-letter-routing-key", "dlx_routing_key"); // 死信路由键
channel.queueDeclare("queue_name", true, false, false, args);
/var/lib/rabbitmq)有足够空间,避免因磁盘满导致消息无法持久化;/var/log/rabbitmq/rabbit@hostname.log),定位消息丢失的具体原因(如连接中断、权限问题);setenforce 0临时关闭,或修改/etc/selinux/config)。通过以上步骤,可覆盖RabbitMQ消息丢失的主要场景,确保CentOS环境下消息的可靠传输与处理。