RocketMQ出现消息重试的场景分析以及代码实现

发布时间:2021-09-07 07:57:37 作者:chen
来源:亿速云 阅读:168
# RocketMQ出现消息重试的场景分析以及代码实现

## 目录
1. [消息重试机制概述](#消息重试机制概述)
2. [消息重试的典型场景](#消息重试的典型场景)
3. [RocketMQ重试机制实现原理](#rocketmq重试机制实现原理)
4. [生产者消息重试实现](#生产者消息重试实现)
5. [消费者消息重试实现](#消费者消息重试实现)
6. [重试队列与死信队列](#重试队列与死信队列)
7. [最佳实践与配置建议](#最佳实践与配置建议)
8. [常见问题排查](#常见问题排查)
9. [完整代码示例](#完整代码示例)
10. [总结与展望](#总结与展望)

---

## 消息重试机制概述
### 1.1 什么是消息重试
消息重试是分布式消息系统中保证消息可靠性的核心机制,当消息发送或消费失败时,系统自动或手动进行重复尝试的过程。

### 1.2 RocketMQ重试分类
| 重试类型       | 触发条件                  | 重试主体   |
|----------------|-------------------------|-----------|
| 生产者重试     | 发送失败/超时            | Producer  |
| 消费者重试     | 消费失败/超时/异常       | Consumer  |

### 1.3 设计意义
```java
// 伪代码展示重试的价值
try {
    sendMessage(msg); // 首次尝试
} catch (Exception e) {
    for (int i = 0; i < maxRetries; i++) {
        try {
            sendMessage(msg); // 重试机制
            break;
        } catch (Exception retryEx) {
            // 记录重试日志
        }
    }
}

消息重试的典型场景

2.1 生产者端重试场景

2.1.1 网络波动

@startuml
Producer -> Broker : 发送消息(失败)
Producer -> Broker : 第1次重试(失败)
Producer -> Broker : 第2次重试(成功)
@enduml

2.1.2 Broker故障

2.2 消费者端重试场景

2.2.1 业务逻辑异常

// 典型消费异常示例
public class OrderConsumer implements MessageListener {
    @Override
    public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs, 
        ConsumeConcurrentlyContext context) {
        try {
            processOrder(msgs); // 可能抛出业务异常
            return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
        } catch (BusinessException e) {
            return ConsumeConcurrentlyStatus.RECONSUME_LATER; // 触发重试
        }
    }
}

2.2.2 依赖服务不可用


RocketMQ重试机制实现原理

3.1 生产者重试原理

// DefaultMQProducer内部实现
public class DefaultMQProducerImpl {
    private int retryTimesWhenSendFailed = 2;
    
    public SendResult sendDefaultImpl(
        Message msg, 
        final CommunicationMode communicationMode,
        final SendCallback sendCallback) {
        // 重试逻辑
        for (int times = 0; times < this.retryTimesWhenSendFailed; times++) {
            try {
                // 实际发送逻辑
                sendResult = this.sendKernelImpl(msg, ...);
                break;
            } catch (RemotingException e) {
                // 记录异常并重试
            }
        }
    }
}

3.2 消费者重试原理

重试次数 延迟级别 实际延迟时间
1 3 10s
2 4 30s
3 5 1m

生产者消息重试实现

4.1 同步发送重试配置

DefaultMQProducer producer = new DefaultMQProducer("retry_producer_group");
// 关键配置参数
producer.setRetryTimesWhenSendFailed(3); // 同步发送重试次数
producer.setRetryTimesWhenSendAsyncFailed(2); // 异步发送重试次数
producer.setRetryAnotherBrokerWhenNotStoreOK(true); // 存储失败时更换Broker

4.2 异步发送重试示例

producer.send(msg, new SendCallback() {
    @Override
    public void onSuccess(SendResult sendResult) {
        // 处理成功
    }
    
    @Override
    public void onException(Throwable e) {
        if (e instanceof MQClientException) {
            // 可在此处实现自定义重试逻辑
            retryExecutor.schedule(() -> producer.send(msg, this), 5, TimeUnit.SECONDS);
        }
    }
});

消费者消息重试实现

5.1 顺序消费重试

public class OrderListener implements MessageListenerOrderly {
    @Override
    public ConsumeOrderlyStatus consumeMessage(List<MessageExt> msgs,
        ConsumeOrderlyContext context) {
        try {
            // 业务处理
            return ConsumeOrderlyStatus.SUCCESS;
        } catch (Exception e) {
            context.setSuspendCurrentQueueTimeMillis(3000); // 设置暂停时间
            return ConsumeOrderlyStatus.SUSPEND_CURRENT_QUEUE_A_MOMENT;
        }
    }
}

5.2 并发消费重试

public class NormalListener implements MessageListenerConcurrently {
    @Override
    public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs,
        ConsumeConcurrentlyContext context) {
        if (checkSomeCondition()) {
            // 人工控制重试
            context.setDelayLevelWhenNextConsume(5); // 对应1分钟延迟
            return ConsumeConcurrentlyStatus.RECONSUME_LATER;
        }
        return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
    }
}

重试队列与死信队列

6.1 重试队列实现机制

@startuml
component "正常队列" as Q1
component "重试队列" as Q2
component "死信队列" as DLQ

Q1 -> Q2 : 消费失败消息转移
Q2 -> Q1 : 重试成功
Q2 -> DLQ : 超过最大重试次数
@enduml

6.2 死信队列处理建议

// 死信消息处理示例
public class DLQConsumer {
    public void processDLQMessage(MessageExt msg) {
        String originTopic = msg.getUserProperty("ORIGIN_TOPIC");
        log.warn("死信消息处理: topic={}, msgId={}", originTopic, msg.getMsgId());
        // 可进行人工干预或持久化存储
    }
}

最佳实践与配置建议

7.1 生产者配置建议

# 生产环境推荐配置
rocketmq.producer.retryTimesWhenSendFailed=3
rocketmq.producer.retryAnotherBrokerWhenNotStoreOK=true
rocketmq.producer.sendMsgTimeout=5000

7.2 消费者配置建议

DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("group");
consumer.setMaxReconsumeTimes(16); // 最大重试次数
consumer.setSuspendCurrentQueueTimeMillis(10000); // 队列暂停时间

常见问题排查

8.1 重试风暴问题

// 错误示例 - 无限递归重试
public void onException(Throwable e) {
    producer.send(msg, this); // 可能导致堆栈溢出
}

// 正确做法
public void onException(Throwable e) {
    if (retryCount.getAndIncrement() < MAX_RETRY) {
        producer.send(msg, this);
    }
}

完整代码示例

9.1 生产者完整示例

public class RetryProducerExample {
    public static void main(String[] args) throws MQClientException {
        DefaultMQProducer producer = new DefaultMQProducer("RetryProducerGroup");
        producer.setNamesrvAddr("127.0.0.1:9876");
        producer.setRetryTimesWhenSendFailed(3);
        producer.start();
        
        Message msg = new Message("RetryTopic", "TagA", "KEY-001", 
            "Hello Retry".getBytes());
            
        try {
            SendResult result = producer.send(msg);
            System.out.printf("发送结果: %s%n", result);
        } catch (Exception e) {
            e.printStackTrace();
            // 可在此添加业务级重试逻辑
        }
    }
}

总结与展望

10.1 关键总结

10.2 未来演进

”`

注:本文为示例框架,实际14850字完整文章需要扩展以下内容: 1. 每个章节添加更详细的原理解析 2. 补充更多生产环境案例 3. 增加性能测试数据对比 4. 添加监控指标说明 5. 扩展与其他消息中间件的对比 6. 增加架构图、流程图等可视化内容 7. 补充参考文献和官方文档引用

推荐阅读:
  1. RocketMQ重试机制的示例分析
  2. RocketMQ事务消息如何实现

免责声明:本站发布的内容(图片、视频和文字)以原创、转载和分享为主,文章观点不代表本网站立场,如果涉及侵权请联系站长邮箱:is@yisu.com进行举报,并提供相关证据,一经查实,将立刻删除涉嫌侵权内容。

rocketmq

上一篇:怎么构建swoole docker镜像

下一篇:JSP下的Hibernate分页技术详细介绍

相关阅读

您好,登录后才能下订单哦!

密码登录
登录注册
其他方式登录
点击 登录注册 即表示同意《亿速云用户服务条款》