在CentOS上处理RabbitMQ消息延迟,推荐使用延迟插件或TTL+死信队列方案,以下是具体步骤:
rabbitmq-delayed-message-exchange
插件(如RabbitMQ 3.9.x对应插件版本3.9.0)。.ez
格式)复制到RabbitMQ安装目录的plugins
目录下。rabbitmq-plugins enable rabbitmq_delayed_message_exchange
systemctl restart rabbitmq-server
代码示例(Spring Boot):
// 声明延迟交换机(类型为x-delayed-message)
@Bean
public CustomExchange delayedExchange() {
Map<String, Object> args = new HashMap<>();
args.put("x-delayed-type", "direct"); // 底层路由模式
return new CustomExchange("delayed_exchange", "x-delayed-message", true, false, args);
}
// 声明延迟队列并绑定交换机
@Bean
public Queue delayedQueue() {
return new Queue("delayed_queue", true);
}
@Bean
public Binding binding() {
return BindingBuilder.bind(delayedQueue()).to(delayedExchange()).with("delayed_routing_key");
}
发送延迟消息:
rabbitTemplate.convertAndSend("delayed_exchange", "delayed_routing_key", "消息内容",
message -> {
message.getMessageProperties().setDelay(5000); // 设置延迟5秒(单位:毫秒)
return message;
});
消费者监听:
@RabbitListener(queues = "delayed_queue")
public void handleMessage(String message) {
System.out.println("处理延迟消息: " + message);
}
创建普通队列,设置消息TTL(存活时间)和死信交换机:
@Bean
public Queue ttlQueue() {
return QueueBuilder.durable("ttl_queue")
.withArgument("x-message-ttl", 10000) // 10秒TTL
.withArgument("x-dead-letter-exchange", "dlx_exchange") // 死信交换机
.withArgument("x-dead-letter-routing-key", "delayed_queue") // 死信路由键
.build();
}
// 声明死信队列
@Bean
public Queue delayedQueue() {
return new Queue("delayed_queue", true);
}
发送消息:直接发送到普通队列,消息到期后自动转入死信队列被消费。
x-max-length
限制队列长度。tail -f /var/log/rabbitmq/rabbit@hostname.log
rabbitmqctl list_queues
查看队列消息堆积情况,确认延迟是否符合预期。RabbitMQ Management UI
(端口15672)实时监控队列状态和消息流动。以上方案可有效解决CentOS下RabbitMQ的消息延迟问题,优先选择插件方案以获得更高的灵活性和性能。