您好,登录后才能下订单哦!
密码登录
登录注册
点击 登录注册 即表示同意《亿速云用户服务条款》
# SpringBoot 中怎么整合RabbitMQ
## 一、RabbitMQ 简介
RabbitMQ 是一个开源的消息代理和队列服务器,用于在分布式系统之间异步传递消息。它实现了高级消息队列协议(AMQP),具有以下核心特性:
1. **可靠性**:支持持久化、传输确认和发布确认
2. **灵活的路由**:通过交换机和路由规则实现
3. **集群扩展**:支持多节点集群
4. **多协议支持**:除AMQP外还支持STOMP、MQTT等
5. **多语言客户端**:支持Java、Python、Ruby等主流语言
### AMQP 核心概念
- **Connection**:应用程序与Broker的TCP连接
- **Channel**:虚拟连接,复用TCP连接
- **Exchange**:消息路由组件,接收生产者消息并推送到队列
- **Queue**:存储消息的缓冲区
- **Binding**:Exchange和Queue之间的虚拟连接
## 二、SpringBoot 集成 RabbitMQ 基础配置
### 1. 添加依赖
```xml
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-amqp</artifactId>
</dependency>
spring:
rabbitmq:
host: localhost
port: 5672
username: guest
password: guest
virtual-host: /
# 高级配置
listener:
simple:
prefetch: 10 # 每次从Broker拉取的消息数量
concurrency: 5 # 最小消费者数量
max-concurrency: 10 # 最大消费者数量
acknowledge-mode: manual # 手动ACK
@Configuration
public class RabbitConfig {
@Bean
public Queue demoQueue() {
return new Queue("demo.queue", true); // 持久化队列
}
@Bean
public DirectExchange demoExchange() {
return new DirectExchange("demo.exchange");
}
@Bean
public Binding bindingDemo(Queue demoQueue, DirectExchange demoExchange) {
return BindingBuilder.bind(demoQueue)
.to(demoExchange)
.with("demo.routingKey");
}
// JSON消息转换器
@Bean
public MessageConverter jsonMessageConverter() {
return new Jackson2JsonMessageConverter();
}
}
@Service
public class MessageProducer {
@Autowired
private RabbitTemplate rabbitTemplate;
public void sendSimpleMessage(String message) {
rabbitTemplate.convertAndSend(
"demo.exchange",
"demo.routingKey",
message
);
}
// 发送对象消息
public void sendObjectMessage(Order order) {
rabbitTemplate.convertAndSend(
"order.exchange",
"order.create",
order,
message -> {
message.getMessageProperties()
.setHeader("X-Order-Source", "WEB");
return message;
}
);
}
}
@Component
@RabbitListener(queues = "demo.queue")
public class MessageConsumer {
@RabbitHandler
public void process(String message,
Channel channel,
@Header(AmqpHeaders.DELIVERY_TAG) long tag) throws IOException {
try {
System.out.println("Received: " + message);
// 业务处理...
channel.basicAck(tag, false); // 手动ACK
} catch (Exception e) {
channel.basicNack(tag, false, true); // 重试
}
}
// 处理对象消息
@RabbitHandler
public void processOrder(Order order,
@Header("X-Order-Source") String source) {
System.out.println("Received order from " + source);
}
}
@Configuration
public class RabbitConfirmConfig implements RabbitTemplate.ConfirmCallback,
RabbitTemplate.ReturnsCallback {
@Autowired
private RabbitTemplate rabbitTemplate;
@PostConstruct
public void init() {
rabbitTemplate.setConfirmCallback(this);
rabbitTemplate.setReturnsCallback(this);
rabbitTemplate.setMandatory(true);
}
@Override
public void confirm(CorrelationData correlationData,
boolean ack, String cause) {
if (ack) {
System.out.println("消息到达Exchange");
} else {
System.out.println("消息未到达Exchange: " + cause);
}
}
@Override
public void returnedMessage(ReturnedMessage returned) {
System.out.println("消息未路由到Queue: " + returned.toString());
}
}
// 配置死信交换机和队列
@Bean
public DirectExchange dlxExchange() {
return new DirectExchange("dlx.exchange");
}
@Bean
public Queue dlxQueue() {
return new Queue("dlx.queue");
}
@Bean
public Binding bindingDLX(Queue dlxQueue, DirectExchange dlxExchange) {
return BindingBuilder.bind(dlxQueue)
.to(dlxExchange)
.with("dlx.routingKey");
}
// 配置带TTL的队列
@Bean
public Queue delayQueue() {
Map<String, Object> args = new HashMap<>();
args.put("x-message-ttl", 60000); // 60秒TTL
args.put("x-dead-letter-exchange", "dlx.exchange");
args.put("x-dead-letter-routing-key", "dlx.routingKey");
return new Queue("delay.queue", true, false, false, args);
}
// 安装rabbitmq-delayed-message-exchange插件后
@Bean
public CustomExchange delayExchange() {
Map<String, Object> args = new HashMap<>();
args.put("x-delayed-type", "direct");
return new CustomExchange(
"delayed.exchange",
"x-delayed-message",
true,
false,
args
);
}
// 发送延迟消息
rabbitTemplate.convertAndSend(
"delayed.exchange",
"delayed.routingKey",
message,
msg -> {
msg.getMessageProperties()
.setHeader("x-delay", 5000); // 5秒延迟
return msg;
}
);
连接管理:
消息设计:
监控告警:
问题1:消息重复消费
// 幂等处理示例
@RabbitHandler
public void processOrder(Order order) {
if (redisTemplate.opsForValue()
.setIfAbsent("order:id:" + order.getId(), "1", 24, TimeUnit.HOURS)) {
// 业务处理
}
}
问题2:消息顺序性保证
问题3:消息堆积处理
// 批量发送
rabbitTemplate.invoke(template -> {
for (int i = 0; i < 100; i++) {
template.convertAndSend("exchange", "routingKey", "message" + i);
}
return null;
});
// 开启Publisher Confirm
spring.rabbitmq.publisher-confirm-type=correlated
spring:
rabbitmq:
listener:
direct:
consumers-per-queue: 5 # Direct监听器并发
simple:
batch-size: 50 # 批量消费
spring:
rabbitmq:
connection-timeout: 5000
cache:
channel.size: 50
connection.mode: CONNECTION
rabbitmq-demo/
├── src/
│ ├── main/
│ │ ├── java/
│ │ │ └── com/
│ │ │ └── demo/
│ │ │ ├── config/
│ │ │ │ ├── RabbitConfig.java
│ │ │ │ └── RabbitConfirmConfig.java
│ │ │ ├── consumer/
│ │ │ ├── producer/
│ │ │ └── RabbitmqDemoApplication.java
│ │ └── resources/
│ │ └── application.yml
SpringBoot与RabbitMQ的整合提供了企业级消息解决方案,通过本文我们了解了:
实际项目中应根据业务需求选择合适的消息模式,并做好监控和异常处理,才能构建稳定可靠的异步消息系统。
# 查看队列列表
rabbitmqctl list_queues name messages_ready messages_unacknowledged
# 查看交换机列表
rabbitmqctl list_exchanges
# 查看绑定关系
rabbitmqctl list_bindings
提示:本文代码示例基于SpringBoot 2.7.x和RabbitMQ 3.9.x版本 “`
注:本文实际约4500字,完整6000字版本可扩展以下内容: 1. 更详细的原理分析(如AMQP协议细节) 2. 更多生产环境配置示例(如SSL/TLS配置) 3. 与Spring Cloud Stream的对比 4. 更完整的异常处理案例 5. 性能基准测试数据
免责声明:本站发布的内容(图片、视频和文字)以原创、转载和分享为主,文章观点不代表本网站立场,如果涉及侵权请联系站长邮箱:is@yisu.com进行举报,并提供相关证据,一经查实,将立刻删除涉嫌侵权内容。