RabbitMQ支持多个消费者并行消费同一队列的消息,通过增加消费者实例(如启动10个消费者),可将队列处理能力提升至原来的10倍(前提是消费者处理逻辑可并行)。操作方式:
concurrentConsumers属性设置初始并发消费者数量(如@RabbitListener(concurrentConsumers = 10));消费者处理慢是堆积的根本原因之一,需通过以下方式优化:
JdbcTemplate.batchUpdate),或用异步方式处理非核心逻辑(如将短信发送任务投递到另一个消息队列,避免阻塞主流程);ThreadPoolTaskExecutor),但需注意线程安全(如避免共享资源竞争)。RabbitMQ默认会一次性给消费者发送10条消息(prefetchCount=10),若消费者处理慢,这些消息会堆积在消费者本地,导致队列显示“未确认消息”增多。通过basicQos方法限制预取数量(如channel.basicQos(1)),强制RabbitMQ将消息分发给其他空闲消费者,提高整体吞吐量。示例(Java):
channel.basicQos(1); // 消费者最多同时处理1条未确认消息
若消息量极大(如百万级)且堆积时间长,默认的“内存存储”模式会导致内存溢出(OOM)。惰性队列会将消息直接存入磁盘,消费者消费时再加载到内存,支持百万级消息存储,避免内存压力。设置方式:
rabbitmqctl set_policy Lazy "^lazy-queue$" '{"queue-mode":"lazy"}' --apply-to queues(将所有以lazy-开头的队列设为惰性);QueueBuilder.lazy()声明惰性队列。示例:@Bean
public Queue lazyQueue() {
return QueueBuilder.durable("lazy.queue")
.lazy() // 开启惰性队列
.build();
}
无法处理的消息(如解析失败的JSON、多次重试仍失败的消息)会堆积在原队列,占用空间并影响正常消息处理。通过死信队列将这些消息隔离:
nack(拒绝)且requeue=false、消息过期(x-message-ttl)、队列满(x-max-length);channel.exchangeDeclare("dlx_exchange", "direct")、channel.queueDeclare("dlq", true, false, false, null);channel.queueDeclare("main_queue", true, false, false, Map.of("x-dead-letter-exchange", "dlx_exchange", "x-dead-letter-routing-key", "dlq_key"))。若生产者发送速度远超过消费者处理能力,需从源头限制流量:
RateLimiter)控制发送速率(如每秒最多发送100条);通过监控实时跟踪队列状态,提前预警堆积风险:
messages)、生产者发送速率(message_rate_in)、消费者处理速率(message_rate_out);rabbitmqctl list_queues定期检查队列长度);messages超过1000或message_rate_out/message_rate_in < 0.5(处理速度低于发送速度的一半)时,触发邮件/短信告警,通知运维人员介入。