您好,登录后才能下订单哦!
密码登录
登录注册
点击 登录注册 即表示同意《亿速云用户服务条款》
# SpringMVC消费RabbitMQ队列的示例分析
## 引言
在现代分布式系统架构中,消息队列作为解耦生产者和消费者的重要组件,被广泛应用于异步处理、应用解耦、流量削峰等场景。RabbitMQ作为实现了AMQP协议的开源消息代理,以其可靠性、灵活的路由机制和跨平台特性成为企业级应用的首选。本文将深入探讨如何在SpringMVC框架中集成RabbitMQ消费者,通过完整示例演示从环境搭建到消息处理的完整流程。
## 一、环境准备与依赖配置
### 1.1 RabbitMQ环境搭建
首先需要确保RabbitMQ服务已正确安装并运行:
```bash
# Docker方式启动RabbitMQ(推荐)
docker run -d --name rabbitmq -p 5672:5672 -p 15672:15672 rabbitmq:3-management
访问http://localhost:15672
可进入管理控制台(默认账号/密码:guest/guest)
在Maven项目的pom.xml中添加必要依赖:
<!-- Spring MVC基础依赖 -->
<dependency>
<groupId>org.springframework</groupId>
<artifactId>spring-webmvc</artifactId>
<version>5.3.18</version>
</dependency>
<!-- RabbitMQ集成依赖 -->
<dependency>
<groupId>org.springframework.amqp</groupId>
<artifactId>spring-rabbit</artifactId>
<version>2.4.4</version>
</dependency>
<!-- JSON处理 -->
<dependency>
<groupId>com.fasterxml.jackson.core</groupId>
<artifactId>jackson-databind</artifactId>
<version>2.13.3</version>
</dependency>
创建RabbitMQ配置类RabbitMQConfig.java
:
@Configuration
public class RabbitMQConfig {
@Value("${rabbitmq.host}")
private String host;
@Value("${rabbitmq.port}")
private int port;
@Bean
public ConnectionFactory connectionFactory() {
CachingConnectionFactory factory = new CachingConnectionFactory();
factory.setHost(host);
factory.setPort(port);
return factory;
}
@Bean
public RabbitTemplate rabbitTemplate(ConnectionFactory connectionFactory) {
RabbitTemplate template = new RabbitTemplate(connectionFactory);
template.setMessageConverter(jsonMessageConverter());
return template;
}
@Bean
public MessageConverter jsonMessageConverter() {
return new Jackson2JsonMessageConverter();
}
}
在配置类中继续添加队列和交换机的声明:
@Bean
public DirectExchange orderExchange() {
return new DirectExchange("order.exchange", true, false);
}
@Bean
public Queue orderQueue() {
return new Queue("order.queue", true);
}
@Bean
public Binding orderBinding() {
return BindingBuilder.bind(orderQueue())
.to(orderExchange())
.with("order.routingKey");
}
@Bean
public SimpleRabbitListenerContainerFactory rabbitListenerContainerFactory(
ConnectionFactory connectionFactory) {
SimpleRabbitListenerContainerFactory factory = new SimpleRabbitListenerContainerFactory();
factory.setConnectionFactory(connectionFactory);
factory.setConcurrentConsumers(3);
factory.setMaxConcurrentConsumers(10);
factory.setPrefetchCount(5);
factory.setMessageConverter(jsonMessageConverter());
return factory;
}
创建订单消息消费者OrderMessageConsumer.java
:
@Component
public class OrderMessageConsumer {
@RabbitListener(queues = "order.queue")
public void processOrder(Order order) {
System.out.println("Received order: " + order);
// 业务处理逻辑...
}
}
@RabbitListener(queues = "order.queue")
public void processOrderWithAck(Order order, Channel channel,
@Header(AmqpHeaders.DELIVERY_TAG) long tag) throws IOException {
try {
// 业务处理
channel.basicAck(tag, false);
} catch (Exception e) {
channel.basicNack(tag, false, true); // 重新入队
}
}
@RabbitListener(queues = "batch.order.queue", containerFactory = "batchFactory")
public void processBatchOrders(List<Order> orders) {
orders.forEach(order -> {
// 批量处理逻辑
});
}
对应的批量容器工厂配置:
@Bean
public SimpleRabbitListenerContainerFactory batchFactory(
ConnectionFactory connectionFactory) {
SimpleRabbitListenerContainerFactory factory = new SimpleRabbitListenerContainerFactory();
factory.setConnectionFactory(connectionFactory);
factory.setBatchListener(true); // 启用批量模式
factory.setBatchSize(50);
return factory;
}
public class CustomErrorHandler implements ErrorHandler {
@Override
public void handleError(Throwable t) {
// 记录错误日志
// 发送告警通知
// 自定义恢复逻辑
}
}
在容器工厂中配置:
factory.setErrorHandler(new CustomErrorHandler());
@Bean
public Queue deadLetterQueue() {
return QueueBuilder.durable("order.dead.letter.queue").build();
}
@Bean
public DirectExchange deadLetterExchange() {
return new DirectExchange("dead.letter.exchange");
}
@Bean
public Binding deadLetterBinding() {
return BindingBuilder.bind(deadLetterQueue())
.to(deadLetterExchange())
.with("dead.letter.routingKey");
}
@Bean
public Queue orderQueueWithDLQ() {
return QueueBuilder.durable("order.queue.with.dlq")
.withArgument("x-dead-letter-exchange", "dead.letter.exchange")
.withArgument("x-dead-letter-routing-key", "dead.letter.routingKey")
.build();
}
# application.properties
spring.rabbitmq.listener.simple.concurrency=5
spring.rabbitmq.listener.simple.max-concurrency=10
spring.rabbitmq.listener.simple.prefetch=20
@Bean
public RabbitListenerEndpointRegistry endpointRegistry() {
return new RabbitListenerEndpointRegistry();
}
@Scheduled(fixedRate = 60000)
public void monitorQueueConsumers() {
endpointRegistry().getListenerContainers().forEach(container -> {
System.out.println("Queue: " + Arrays.toString(container.getQueueNames()));
System.out.println("Active consumers: " + container.getActiveConsumerCount());
});
}
@RestController
@RequestMapping("/orders")
public class OrderController {
@Autowired
private RabbitTemplate rabbitTemplate;
@PostMapping
public ResponseEntity<String> createOrder(@RequestBody Order order) {
rabbitTemplate.convertAndSend("order.exchange",
"order.routingKey", order);
return ResponseEntity.ok("Order submitted");
}
}
@Service
public class OrderProcessingService {
@RabbitListener(queues = "order.queue")
public void processOrder(Order order) {
// 1. 订单验证
validateOrder(order);
// 2. 库存扣减
reduceInventory(order);
// 3. 支付处理
processPayment(order);
// 4. 物流通知
notifyLogistics(order);
}
// 各业务方法实现...
}
解决方案: - 实现幂等性处理 - 使用Redis记录已处理消息ID - 数据库唯一约束防止重复处理
@RabbitListener(queues = "order.queue")
public void processOrderWithIdempotent(Order order) {
if (redisTemplate.opsForValue().setIfAbsent(
"order:" + order.getId(), "processing", 24, TimeUnit.HOURS)) {
// 实际业务处理
}
}
解决方案: - 增加消费者实例 - 优化消费逻辑 - 设置合理的prefetch count - 启用惰性队列
@Bean
public Queue lazyQueue() {
return QueueBuilder.durable("lazy.queue")
.withArgument("x-queue-mode", "lazy")
.build();
}
通过本文的示例,我们完整演示了SpringMVC集成RabbitMQ消费者的实现过程。关键实践要点包括:
随着业务规模扩大,可考虑引入Spring Cloud Stream等更高级的抽象层,进一步简化消息驱动的微服务开发。
# 查看队列状态
rabbitmqctl list_queues name messages_ready messages_unacknowledged
# 查看消费者
rabbitmqctl list_consumers
”`
免责声明:本站发布的内容(图片、视频和文字)以原创、转载和分享为主,文章观点不代表本网站立场,如果涉及侵权请联系站长邮箱:is@yisu.com进行举报,并提供相关证据,一经查实,将立刻删除涉嫌侵权内容。