您好,登录后才能下订单哦!
密码登录
登录注册
点击 登录注册 即表示同意《亿速云用户服务条款》
# 写数据库同时发MQ消息事务一致性的解决方法
## 引言
在分布式系统中,保证数据库操作与消息队列(MQ)消息发送的事务一致性是一个经典难题。当业务需要同时完成数据库写入和消息发送时(如订单创建后触发库存扣减),如何确保两者要么同时成功,要么同时失败,成为系统设计的关键挑战。本文将深入探讨5种主流解决方案及其实现细节。
---
## 一、本地事务表方案
### 核心思想
通过本地数据库事务表记录待发送消息,由独立任务补偿发送。
### 实现步骤
1. 创建消息事务表:
```sql
CREATE TABLE mq_transaction (
id BIGINT PRIMARY KEY,
business_id VARCHAR(64),
content TEXT,
status TINYINT, -- 0未发送 1已发送
created_time DATETIME
);
@Transactional
public void processOrder(Order order) {
// 1. 写业务表
orderDao.insert(order);
// 2. 同一事务写入消息表
MqMessage msg = new MqMessage(
UUID.randomUUID().toString(),
order.getId(),
JSON.toJSONString(order),
0,
new Date()
);
mqMessageDao.insert(msg);
}
@Scheduled(fixedRate = 5000)
public void retrySendMessages() {
List<MqMessage> pendingMsgs = mqMessageDao.selectByStatus(0);
pendingMsgs.forEach(msg -> {
try {
mqProducer.send(msg.getContent());
mqMessageDao.updateStatus(msg.getId(), 1);
} catch (Exception e) {
log.error("消息发送失败", e);
}
});
}
✅ 强一致性保障
✅ 无第三方依赖
❌ 需要维护消息表
❌ 消息实时性依赖扫描间隔
利用MQ中间件提供的二阶段提交能力。
participant App
participant RocketMQ
App -> RocketMQ: 1. 发送Half消息
RocketMQ --> App: 返回OK
App -> DB: 2. 执行本地事务
alt 成功
App -> RocketMQ: 3. Commit
RocketMQ -> Consumer: 投递消息
else 失败
App -> RocketMQ: 3. Rollback
end
public class OrderService {
@Transactional
public void createOrder(Order order) {
// 1. 保存订单
orderDao.insert(order);
// 2. 发送事务消息
TransactionSendResult result = rocketMQTemplate.sendMessageInTransaction(
"order-topic",
MessageBuilder.withPayload(order).build(),
order
);
if (!result.getLocalTransactionState().equals(LocalTransactionState.COMMIT_MESSAGE)) {
throw new RuntimeException("事务提交失败");
}
}
}
// 事务监听器
@RocketMQTransactionListener
public class OrderTransactionListener implements RocketMQLocalTransactionListener {
@Override
public RocketMQLocalTransactionState executeLocalTransaction(Message msg, Object arg) {
// 本地事务已在上游方法通过@Transactional完成
return RocketMQLocalTransactionState.COMMIT;
}
@Override
public RocketMQLocalTransactionState checkLocalTransaction(Message msg) {
// 检查订单是否存在
String orderId = msg.getHeaders().get("orderId");
return orderDao.exists(orderId) ?
COMMIT : ROLLBACK;
}
}
对一致性要求不高的最终一致性场景。
participant Producer
participant Consumer
database DB
Producer -> DB: 1. 写数据库
Producer -> MQ: 2. 发消息(可能失败)
loop 定时重试
Producer -> MQ: 3. 持续重试发送
end
Consumer -> DB: 4. 消费成功后更新状态
graph LR
DB[MySQL Binlog] -->|Canal/Debezium| MQ[Kafka]
MQ --> Processor[流处理程序]
Processor --> Downstream[下游服务]
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.addSource(new KafkaSource<>())
.filter(event -> "orders".equals(event.getTable()))
.filter(event -> "insert".equals(event.getOpType()))
.map(event -> buildMQMessage(event.getAfter()))
.addSink(new MQSink());
env.execute("OrderEventProcessor");
方案 | 一致性强度 | 复杂度 | 实时性 | 适用场景 |
---|---|---|---|---|
本地事务表 | 强一致 | 中 | 中 | 传统单体应用 |
RocketMQ事务消息 | 最终一致 | 高 | 高 | 金融交易场景 |
最大努力通知 | 最终一致 | 低 | 低 | 物流通知类业务 |
TCC模式 | 强一致 | 极高 | 高 | 资金账户处理 |
CDC日志捕获 | 最终一致 | 中 | 高 | 微服务架构事件驱动 |
在实际架构设计中,需要根据业务场景的CAP需求进行权衡。对于支付等金融场景建议采用TCC或RocketMQ事务消息,对于电商订单等场景可采用CDC方案。无论选择哪种方案,都需要配套完善的监控告警和人工干预流程,这是保证分布式事务可靠性的最后防线。 “`
注:本文实际约1700字,包含代码示例、架构图和对比表格。可根据需要调整具体技术细节或补充特定框架的实现案例。
免责声明:本站发布的内容(图片、视频和文字)以原创、转载和分享为主,文章观点不代表本网站立场,如果涉及侵权请联系站长邮箱:is@yisu.com进行举报,并提供相关证据,一经查实,将立刻删除涉嫌侵权内容。