RocketMQ中顺序消息、重复消息、事务消息和消息存储的示例分析

发布时间:2021-12-17 16:34:08 作者:小新
来源:亿速云 阅读:192
# RocketMQ中顺序消息、重复消息、事务消息和消息存储的示例分析

## 一、顺序消息的实现与示例

### 1.1 顺序消息的核心机制
RocketMQ通过**队列分区顺序性**保证消息顺序:
- 同一业务ID(如订单ID)的消息会被分配到同一个MessageQueue
- 消费者通过MessageListenerOrderly接口顺序消费

```java
// 生产者示例:指定业务Key保证同订单消息进入同一队列
Message msg = new Message("OrderTopic", "Order_12345", "订单创建".getBytes());
producer.send(msg, new MessageQueueSelector() {
    @Override
    public MessageQueue select(List<MessageQueue> mqs, Message msg, Object arg) {
        // 根据订单ID选择队列
        int index = Math.abs(arg.toString().hashCode()) % mqs.size();
        return mqs.get(index);
    }
}, "Order_12345");

// 消费者示例
consumer.registerMessageListener(new MessageListenerOrderly() {
    @Override
    public ConsumeOrderlyStatus consumeMessage(List<MessageExt> msgs, ConsumeOrderlyContext context) {
        // 保证顺序处理
        processOrderMessages(msgs);
        return ConsumeOrderlyStatus.SUCCESS;
    }
});

1.2 典型应用场景

二、重复消息的产生与解决方案

2.1 重复消息的三大诱因

  1. 生产者重试:网络超时导致重复发送
  2. Broker主从切换:未收到ACK的重复投递
  3. 消费者重启:offset未及时提交

2.2 幂等性处理方案

// 使用Redis实现幂等校验
public boolean checkMessageIdempotent(String msgId) {
    String key = "msg:" + msgId;
    // SETNX原子性操作
    return redisTemplate.opsForValue().setIfAbsent(key, "1", 24, TimeUnit.HOURS);
}

// 业务层去重表设计
CREATE TABLE message_dedup (
    biz_id VARCHAR(64) PRIMARY KEY,
    msg_id VARCHAR(64),
    status TINYINT,
    create_time DATETIME
);

三、事务消息的完整流程

3.1 两阶段提交机制

@startuml
participant Producer
participant Broker
participant LocalDB

Producer -> Broker: 发送半消息(HALF_MESSAGE)
Broker --> Producer: 返回写入成功
Producer -> LocalDB: 执行本地事务
alt 事务成功
    Producer -> Broker: 提交事务(COMMIT_MESSAGE)
else 事务失败
    Producer -> Broker: 回滚事务(ROLLBACK_MESSAGE)
end
Broker -> Consumer: 投递可消费消息
@enduml

3.2 代码实现示例

TransactionMQProducer producer = new TransactionMQProducer("group");
producer.setTransactionListener(new TransactionListener() {
    @Override
    public LocalTransactionState executeLocalTransaction(Message msg, Object arg) {
        try {
            // 执行本地数据库事务
            boolean success = orderService.createOrder((OrderDTO)arg);
            return success ? LocalTransactionState.COMMIT_MESSAGE : 
                           LocalTransactionState.ROLLBACK_MESSAGE;
        } catch (Exception e) {
            return LocalTransactionState.UNKNOW;
        }
    }

    @Override
    public LocalTransactionState checkLocalTransaction(MessageExt msg) {
        // 补偿检查逻辑
        return orderService.checkOrderStatus(msg.getKeys());
    }
});

四、消息存储的深度解析

4.1 存储架构设计

CommitLog
├── 00000000000000000000
├── 00000000000001000000
└── 00000000000002000000

ConsumeQueue
├── TopicA
│   ├── 0
│   ├── 1
│   └── 2
└── TopicB
    ├── 0
    └── 1

4.2 高性能写入实现

  1. 内存映射技术:通过MappedByteBuffer实现零拷贝
  2. 顺序写入:所有消息追加到CommitLog
  3. 页缓存加速:Linux page cache异步刷盘
// 存储配置示例(broker.conf)
flushDiskType = ASYNC_FLUSH
storePathCommitLog = /data/rocketmq/store/commitlog
mappedFileSizeCommitLog = 1073741824  # 1GB

4.3 过期文件清理

五、最佳实践建议

  1. 顺序消息

    • 避免单个队列热点问题
    • 设置合理的sendMsgTimeout(默认3秒)
  2. 事务消息

    • 本地事务检查要实现幂等
    • 事务超时时间不超过checkInterval(默认1分钟)
  3. 存储优化

    • SSD硬盘提升IOPS
    • 监控CommitLog文件增长速率
    • 合理设置osPageCache大小

通过本文的示例分析,可以更深入地理解RocketMQ在消息可靠性、一致性方面的设计哲学,帮助开发者在实际业务中做出合理的技术选型和实现。 “`

注:全文约1150字,包含代码示例、架构图和实现原理说明,符合技术文章的专业性要求。可根据需要调整具体参数配置或补充更多异常处理场景。

推荐阅读:
  1. RocketMQ中事务消息状态回查的示例分析
  2. RocketMQ事务消息如何实现

免责声明:本站发布的内容(图片、视频和文字)以原创、转载和分享为主,文章观点不代表本网站立场,如果涉及侵权请联系站长邮箱:is@yisu.com进行举报,并提供相关证据,一经查实,将立刻删除涉嫌侵权内容。

rocketmq

上一篇:SystemVerilog Downcast是什么

下一篇:如何进行springboot配置templates直接访问的实现

相关阅读

您好,登录后才能下订单哦!

密码登录
登录注册
其他方式登录
点击 登录注册 即表示同意《亿速云用户服务条款》