您好,登录后才能下订单哦!
密码登录
登录注册
点击 登录注册 即表示同意《亿速云用户服务条款》
# RocketMQ出现消息重试的场景分析以及代码实现
## 目录
1. [消息重试机制概述](#消息重试机制概述)
2. [消息重试的典型场景](#消息重试的典型场景)
3. [RocketMQ重试机制实现原理](#rocketmq重试机制实现原理)
4. [生产者消息重试实现](#生产者消息重试实现)
5. [消费者消息重试实现](#消费者消息重试实现)
6. [重试队列与死信队列](#重试队列与死信队列)
7. [最佳实践与配置建议](#最佳实践与配置建议)
8. [常见问题排查](#常见问题排查)
9. [完整代码示例](#完整代码示例)
10. [总结与展望](#总结与展望)
---
## 消息重试机制概述
### 1.1 什么是消息重试
消息重试是分布式消息系统中保证消息可靠性的核心机制,当消息发送或消费失败时,系统自动或手动进行重复尝试的过程。
### 1.2 RocketMQ重试分类
| 重试类型 | 触发条件 | 重试主体 |
|----------------|-------------------------|-----------|
| 生产者重试 | 发送失败/超时 | Producer |
| 消费者重试 | 消费失败/超时/异常 | Consumer |
### 1.3 设计意义
```java
// 伪代码展示重试的价值
try {
sendMessage(msg); // 首次尝试
} catch (Exception e) {
for (int i = 0; i < maxRetries; i++) {
try {
sendMessage(msg); // 重试机制
break;
} catch (Exception retryEx) {
// 记录重试日志
}
}
}
@startuml
Producer -> Broker : 发送消息(失败)
Producer -> Broker : 第1次重试(失败)
Producer -> Broker : 第2次重试(成功)
@enduml
// 典型消费异常示例
public class OrderConsumer implements MessageListener {
@Override
public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs,
ConsumeConcurrentlyContext context) {
try {
processOrder(msgs); // 可能抛出业务异常
return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
} catch (BusinessException e) {
return ConsumeConcurrentlyStatus.RECONSUME_LATER; // 触发重试
}
}
}
// DefaultMQProducer内部实现
public class DefaultMQProducerImpl {
private int retryTimesWhenSendFailed = 2;
public SendResult sendDefaultImpl(
Message msg,
final CommunicationMode communicationMode,
final SendCallback sendCallback) {
// 重试逻辑
for (int times = 0; times < this.retryTimesWhenSendFailed; times++) {
try {
// 实际发送逻辑
sendResult = this.sendKernelImpl(msg, ...);
break;
} catch (RemotingException e) {
// 记录异常并重试
}
}
}
}
重试次数 | 延迟级别 | 实际延迟时间 |
---|---|---|
1 | 3 | 10s |
2 | 4 | 30s |
3 | 5 | 1m |
DefaultMQProducer producer = new DefaultMQProducer("retry_producer_group");
// 关键配置参数
producer.setRetryTimesWhenSendFailed(3); // 同步发送重试次数
producer.setRetryTimesWhenSendAsyncFailed(2); // 异步发送重试次数
producer.setRetryAnotherBrokerWhenNotStoreOK(true); // 存储失败时更换Broker
producer.send(msg, new SendCallback() {
@Override
public void onSuccess(SendResult sendResult) {
// 处理成功
}
@Override
public void onException(Throwable e) {
if (e instanceof MQClientException) {
// 可在此处实现自定义重试逻辑
retryExecutor.schedule(() -> producer.send(msg, this), 5, TimeUnit.SECONDS);
}
}
});
public class OrderListener implements MessageListenerOrderly {
@Override
public ConsumeOrderlyStatus consumeMessage(List<MessageExt> msgs,
ConsumeOrderlyContext context) {
try {
// 业务处理
return ConsumeOrderlyStatus.SUCCESS;
} catch (Exception e) {
context.setSuspendCurrentQueueTimeMillis(3000); // 设置暂停时间
return ConsumeOrderlyStatus.SUSPEND_CURRENT_QUEUE_A_MOMENT;
}
}
}
public class NormalListener implements MessageListenerConcurrently {
@Override
public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs,
ConsumeConcurrentlyContext context) {
if (checkSomeCondition()) {
// 人工控制重试
context.setDelayLevelWhenNextConsume(5); // 对应1分钟延迟
return ConsumeConcurrentlyStatus.RECONSUME_LATER;
}
return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
}
}
@startuml
component "正常队列" as Q1
component "重试队列" as Q2
component "死信队列" as DLQ
Q1 -> Q2 : 消费失败消息转移
Q2 -> Q1 : 重试成功
Q2 -> DLQ : 超过最大重试次数
@enduml
// 死信消息处理示例
public class DLQConsumer {
public void processDLQMessage(MessageExt msg) {
String originTopic = msg.getUserProperty("ORIGIN_TOPIC");
log.warn("死信消息处理: topic={}, msgId={}", originTopic, msg.getMsgId());
// 可进行人工干预或持久化存储
}
}
# 生产环境推荐配置
rocketmq.producer.retryTimesWhenSendFailed=3
rocketmq.producer.retryAnotherBrokerWhenNotStoreOK=true
rocketmq.producer.sendMsgTimeout=5000
DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("group");
consumer.setMaxReconsumeTimes(16); // 最大重试次数
consumer.setSuspendCurrentQueueTimeMillis(10000); // 队列暂停时间
// 错误示例 - 无限递归重试
public void onException(Throwable e) {
producer.send(msg, this); // 可能导致堆栈溢出
}
// 正确做法
public void onException(Throwable e) {
if (retryCount.getAndIncrement() < MAX_RETRY) {
producer.send(msg, this);
}
}
public class RetryProducerExample {
public static void main(String[] args) throws MQClientException {
DefaultMQProducer producer = new DefaultMQProducer("RetryProducerGroup");
producer.setNamesrvAddr("127.0.0.1:9876");
producer.setRetryTimesWhenSendFailed(3);
producer.start();
Message msg = new Message("RetryTopic", "TagA", "KEY-001",
"Hello Retry".getBytes());
try {
SendResult result = producer.send(msg);
System.out.printf("发送结果: %s%n", result);
} catch (Exception e) {
e.printStackTrace();
// 可在此添加业务级重试逻辑
}
}
}
”`
注:本文为示例框架,实际14850字完整文章需要扩展以下内容: 1. 每个章节添加更详细的原理解析 2. 补充更多生产环境案例 3. 增加性能测试数据对比 4. 添加监控指标说明 5. 扩展与其他消息中间件的对比 6. 增加架构图、流程图等可视化内容 7. 补充参考文献和官方文档引用
免责声明:本站发布的内容(图片、视频和文字)以原创、转载和分享为主,文章观点不代表本网站立场,如果涉及侵权请联系站长邮箱:is@yisu.com进行举报,并提供相关证据,一经查实,将立刻删除涉嫌侵权内容。