您好,登录后才能下订单哦!
密码登录
登录注册
点击 登录注册 即表示同意《亿速云用户服务条款》
# RocketMQ中顺序消息、重复消息、事务消息和消息存储的示例分析
## 一、顺序消息的实现与示例
### 1.1 顺序消息的核心机制
RocketMQ通过**队列分区顺序性**保证消息顺序:
- 同一业务ID(如订单ID)的消息会被分配到同一个MessageQueue
- 消费者通过MessageListenerOrderly接口顺序消费
```java
// 生产者示例:指定业务Key保证同订单消息进入同一队列
Message msg = new Message("OrderTopic", "Order_12345", "订单创建".getBytes());
producer.send(msg, new MessageQueueSelector() {
@Override
public MessageQueue select(List<MessageQueue> mqs, Message msg, Object arg) {
// 根据订单ID选择队列
int index = Math.abs(arg.toString().hashCode()) % mqs.size();
return mqs.get(index);
}
}, "Order_12345");
// 消费者示例
consumer.registerMessageListener(new MessageListenerOrderly() {
@Override
public ConsumeOrderlyStatus consumeMessage(List<MessageExt> msgs, ConsumeOrderlyContext context) {
// 保证顺序处理
processOrderMessages(msgs);
return ConsumeOrderlyStatus.SUCCESS;
}
});
// 使用Redis实现幂等校验
public boolean checkMessageIdempotent(String msgId) {
String key = "msg:" + msgId;
// SETNX原子性操作
return redisTemplate.opsForValue().setIfAbsent(key, "1", 24, TimeUnit.HOURS);
}
// 业务层去重表设计
CREATE TABLE message_dedup (
biz_id VARCHAR(64) PRIMARY KEY,
msg_id VARCHAR(64),
status TINYINT,
create_time DATETIME
);
@startuml
participant Producer
participant Broker
participant LocalDB
Producer -> Broker: 发送半消息(HALF_MESSAGE)
Broker --> Producer: 返回写入成功
Producer -> LocalDB: 执行本地事务
alt 事务成功
Producer -> Broker: 提交事务(COMMIT_MESSAGE)
else 事务失败
Producer -> Broker: 回滚事务(ROLLBACK_MESSAGE)
end
Broker -> Consumer: 投递可消费消息
@enduml
TransactionMQProducer producer = new TransactionMQProducer("group");
producer.setTransactionListener(new TransactionListener() {
@Override
public LocalTransactionState executeLocalTransaction(Message msg, Object arg) {
try {
// 执行本地数据库事务
boolean success = orderService.createOrder((OrderDTO)arg);
return success ? LocalTransactionState.COMMIT_MESSAGE :
LocalTransactionState.ROLLBACK_MESSAGE;
} catch (Exception e) {
return LocalTransactionState.UNKNOW;
}
}
@Override
public LocalTransactionState checkLocalTransaction(MessageExt msg) {
// 补偿检查逻辑
return orderService.checkOrderStatus(msg.getKeys());
}
});
CommitLog
├── 00000000000000000000
├── 00000000000001000000
└── 00000000000002000000
ConsumeQueue
├── TopicA
│ ├── 0
│ ├── 1
│ └── 2
└── TopicB
├── 0
└── 1
// 存储配置示例(broker.conf)
flushDiskType = ASYNC_FLUSH
storePathCommitLog = /data/rocketmq/store/commitlog
mappedFileSizeCommitLog = 1073741824 # 1GB
顺序消息:
事务消息:
存储优化:
通过本文的示例分析,可以更深入地理解RocketMQ在消息可靠性、一致性方面的设计哲学,帮助开发者在实际业务中做出合理的技术选型和实现。 “`
注:全文约1150字,包含代码示例、架构图和实现原理说明,符合技术文章的专业性要求。可根据需要调整具体参数配置或补充更多异常处理场景。
免责声明:本站发布的内容(图片、视频和文字)以原创、转载和分享为主,文章观点不代表本网站立场,如果涉及侵权请联系站长邮箱:is@yisu.com进行举报,并提供相关证据,一经查实,将立刻删除涉嫌侵权内容。