您好,登录后才能下订单哦!
密码登录
            
            
            
            
        登录注册
            
            
            
        点击 登录注册 即表示同意《亿速云用户服务条款》
        # SpringMVC消费RabbitMQ队列的示例分析
## 引言
在现代分布式系统架构中,消息队列作为解耦生产者和消费者的重要组件,被广泛应用于异步处理、应用解耦、流量削峰等场景。RabbitMQ作为实现了AMQP协议的开源消息代理,以其可靠性、灵活的路由机制和跨平台特性成为企业级应用的首选。本文将深入探讨如何在SpringMVC框架中集成RabbitMQ消费者,通过完整示例演示从环境搭建到消息处理的完整流程。
## 一、环境准备与依赖配置
### 1.1 RabbitMQ环境搭建
首先需要确保RabbitMQ服务已正确安装并运行:
```bash
# Docker方式启动RabbitMQ(推荐)
docker run -d --name rabbitmq -p 5672:5672 -p 15672:15672 rabbitmq:3-management
访问http://localhost:15672可进入管理控制台(默认账号/密码:guest/guest)
在Maven项目的pom.xml中添加必要依赖:
<!-- Spring MVC基础依赖 -->
<dependency>
    <groupId>org.springframework</groupId>
    <artifactId>spring-webmvc</artifactId>
    <version>5.3.18</version>
</dependency>
<!-- RabbitMQ集成依赖 -->
<dependency>
    <groupId>org.springframework.amqp</groupId>
    <artifactId>spring-rabbit</artifactId>
    <version>2.4.4</version>
</dependency>
<!-- JSON处理 -->
<dependency>
    <groupId>com.fasterxml.jackson.core</groupId>
    <artifactId>jackson-databind</artifactId>
    <version>2.13.3</version>
</dependency>
创建RabbitMQ配置类RabbitMQConfig.java:
@Configuration
public class RabbitMQConfig {
    @Value("${rabbitmq.host}")
    private String host;
    @Value("${rabbitmq.port}")
    private int port;
    @Bean
    public ConnectionFactory connectionFactory() {
        CachingConnectionFactory factory = new CachingConnectionFactory();
        factory.setHost(host);
        factory.setPort(port);
        return factory;
    }
    @Bean
    public RabbitTemplate rabbitTemplate(ConnectionFactory connectionFactory) {
        RabbitTemplate template = new RabbitTemplate(connectionFactory);
        template.setMessageConverter(jsonMessageConverter());
        return template;
    }
    @Bean
    public MessageConverter jsonMessageConverter() {
        return new Jackson2JsonMessageConverter();
    }
}
在配置类中继续添加队列和交换机的声明:
@Bean
public DirectExchange orderExchange() {
    return new DirectExchange("order.exchange", true, false);
}
@Bean
public Queue orderQueue() {
    return new Queue("order.queue", true);
}
@Bean
public Binding orderBinding() {
    return BindingBuilder.bind(orderQueue())
            .to(orderExchange())
            .with("order.routingKey");
}
@Bean
public SimpleRabbitListenerContainerFactory rabbitListenerContainerFactory(
        ConnectionFactory connectionFactory) {
    SimpleRabbitListenerContainerFactory factory = new SimpleRabbitListenerContainerFactory();
    factory.setConnectionFactory(connectionFactory);
    factory.setConcurrentConsumers(3);
    factory.setMaxConcurrentConsumers(10);
    factory.setPrefetchCount(5);
    factory.setMessageConverter(jsonMessageConverter());
    return factory;
}
创建订单消息消费者OrderMessageConsumer.java:
@Component
public class OrderMessageConsumer {
    @RabbitListener(queues = "order.queue")
    public void processOrder(Order order) {
        System.out.println("Received order: " + order);
        // 业务处理逻辑...
    }
}
@RabbitListener(queues = "order.queue")
public void processOrderWithAck(Order order, Channel channel, 
        @Header(AmqpHeaders.DELIVERY_TAG) long tag) throws IOException {
    try {
        // 业务处理
        channel.basicAck(tag, false);
    } catch (Exception e) {
        channel.basicNack(tag, false, true); // 重新入队
    }
}
@RabbitListener(queues = "batch.order.queue", containerFactory = "batchFactory")
public void processBatchOrders(List<Order> orders) {
    orders.forEach(order -> {
        // 批量处理逻辑
    });
}
对应的批量容器工厂配置:
@Bean
public SimpleRabbitListenerContainerFactory batchFactory(
        ConnectionFactory connectionFactory) {
    SimpleRabbitListenerContainerFactory factory = new SimpleRabbitListenerContainerFactory();
    factory.setConnectionFactory(connectionFactory);
    factory.setBatchListener(true); // 启用批量模式
    factory.setBatchSize(50);
    return factory;
}
public class CustomErrorHandler implements ErrorHandler {
    @Override
    public void handleError(Throwable t) {
        // 记录错误日志
        // 发送告警通知
        // 自定义恢复逻辑
    }
}
在容器工厂中配置:
factory.setErrorHandler(new CustomErrorHandler());
@Bean
public Queue deadLetterQueue() {
    return QueueBuilder.durable("order.dead.letter.queue").build();
}
@Bean
public DirectExchange deadLetterExchange() {
    return new DirectExchange("dead.letter.exchange");
}
@Bean
public Binding deadLetterBinding() {
    return BindingBuilder.bind(deadLetterQueue())
            .to(deadLetterExchange())
            .with("dead.letter.routingKey");
}
@Bean
public Queue orderQueueWithDLQ() {
    return QueueBuilder.durable("order.queue.with.dlq")
            .withArgument("x-dead-letter-exchange", "dead.letter.exchange")
            .withArgument("x-dead-letter-routing-key", "dead.letter.routingKey")
            .build();
}
# application.properties
spring.rabbitmq.listener.simple.concurrency=5
spring.rabbitmq.listener.simple.max-concurrency=10
spring.rabbitmq.listener.simple.prefetch=20
@Bean
public RabbitListenerEndpointRegistry endpointRegistry() {
    return new RabbitListenerEndpointRegistry();
}
@Scheduled(fixedRate = 60000)
public void monitorQueueConsumers() {
    endpointRegistry().getListenerContainers().forEach(container -> {
        System.out.println("Queue: " + Arrays.toString(container.getQueueNames()));
        System.out.println("Active consumers: " + container.getActiveConsumerCount());
    });
}
@RestController
@RequestMapping("/orders")
public class OrderController {
    @Autowired
    private RabbitTemplate rabbitTemplate;
    @PostMapping
    public ResponseEntity<String> createOrder(@RequestBody Order order) {
        rabbitTemplate.convertAndSend("order.exchange", 
                "order.routingKey", order);
        return ResponseEntity.ok("Order submitted");
    }
}
@Service
public class OrderProcessingService {
    @RabbitListener(queues = "order.queue")
    public void processOrder(Order order) {
        // 1. 订单验证
        validateOrder(order);
        
        // 2. 库存扣减
        reduceInventory(order);
        
        // 3. 支付处理
        processPayment(order);
        
        // 4. 物流通知
        notifyLogistics(order);
    }
    
    // 各业务方法实现...
}
解决方案: - 实现幂等性处理 - 使用Redis记录已处理消息ID - 数据库唯一约束防止重复处理
@RabbitListener(queues = "order.queue")
public void processOrderWithIdempotent(Order order) {
    if (redisTemplate.opsForValue().setIfAbsent(
            "order:" + order.getId(), "processing", 24, TimeUnit.HOURS)) {
        // 实际业务处理
    }
}
解决方案: - 增加消费者实例 - 优化消费逻辑 - 设置合理的prefetch count - 启用惰性队列
@Bean
public Queue lazyQueue() {
    return QueueBuilder.durable("lazy.queue")
            .withArgument("x-queue-mode", "lazy")
            .build();
}
通过本文的示例,我们完整演示了SpringMVC集成RabbitMQ消费者的实现过程。关键实践要点包括:
随着业务规模扩大,可考虑引入Spring Cloud Stream等更高级的抽象层,进一步简化消息驱动的微服务开发。
# 查看队列状态
rabbitmqctl list_queues name messages_ready messages_unacknowledged
# 查看消费者
rabbitmqctl list_consumers
”`
免责声明:本站发布的内容(图片、视频和文字)以原创、转载和分享为主,文章观点不代表本网站立场,如果涉及侵权请联系站长邮箱:is@yisu.com进行举报,并提供相关证据,一经查实,将立刻删除涉嫌侵权内容。