写数据库同时发mq消息事务一致性的解决方法

发布时间:2021-12-06 09:21:25 作者:柒染
来源:亿速云 阅读:649
# 写数据库同时发MQ消息事务一致性的解决方法

## 引言

在分布式系统中,保证数据库操作与消息队列(MQ)消息发送的事务一致性是一个经典难题。当业务需要同时完成数据库写入和消息发送时(如订单创建后触发库存扣减),如何确保两者要么同时成功,要么同时失败,成为系统设计的关键挑战。本文将深入探讨5种主流解决方案及其实现细节。

---

## 一、本地事务表方案

### 核心思想
通过本地数据库事务表记录待发送消息,由独立任务补偿发送。

### 实现步骤
1. 创建消息事务表:
```sql
CREATE TABLE mq_transaction (
    id BIGINT PRIMARY KEY,
    business_id VARCHAR(64),
    content TEXT,
    status TINYINT, -- 0未发送 1已发送
    created_time DATETIME
);
  1. 业务处理流程:
@Transactional
public void processOrder(Order order) {
    // 1. 写业务表
    orderDao.insert(order);
    
    // 2. 同一事务写入消息表
    MqMessage msg = new MqMessage(
        UUID.randomUUID().toString(),
        order.getId(),
        JSON.toJSONString(order),
        0,
        new Date()
    );
    mqMessageDao.insert(msg);
}
  1. 独立定时任务扫描:
@Scheduled(fixedRate = 5000)
public void retrySendMessages() {
    List<MqMessage> pendingMsgs = mqMessageDao.selectByStatus(0);
    pendingMsgs.forEach(msg -> {
        try {
            mqProducer.send(msg.getContent());
            mqMessageDao.updateStatus(msg.getId(), 1);
        } catch (Exception e) {
            log.error("消息发送失败", e);
        }
    });
}

优缺点分析

✅ 强一致性保障
✅ 无第三方依赖
❌ 需要维护消息表
❌ 消息实时性依赖扫描间隔


二、事务消息方案(RocketMQ)

实现机制

利用MQ中间件提供的二阶段提交能力。

典型流程

participant App
participant RocketMQ

App -> RocketMQ: 1. 发送Half消息
RocketMQ --> App: 返回OK
App -> DB: 2. 执行本地事务
alt 成功
    App -> RocketMQ: 3. Commit
    RocketMQ -> Consumer: 投递消息
else 失败
    App -> RocketMQ: 3. Rollback
end

代码示例

public class OrderService {
    @Transactional
    public void createOrder(Order order) {
        // 1. 保存订单
        orderDao.insert(order);
        
        // 2. 发送事务消息
        TransactionSendResult result = rocketMQTemplate.sendMessageInTransaction(
            "order-topic",
            MessageBuilder.withPayload(order).build(),
            order
        );
        
        if (!result.getLocalTransactionState().equals(LocalTransactionState.COMMIT_MESSAGE)) {
            throw new RuntimeException("事务提交失败");
        }
    }
}

// 事务监听器
@RocketMQTransactionListener
public class OrderTransactionListener implements RocketMQLocalTransactionListener {
    @Override
    public RocketMQLocalTransactionState executeLocalTransaction(Message msg, Object arg) {
        // 本地事务已在上游方法通过@Transactional完成
        return RocketMQLocalTransactionState.COMMIT;
    }
    
    @Override
    public RocketMQLocalTransactionState checkLocalTransaction(Message msg) {
        // 检查订单是否存在
        String orderId = msg.getHeaders().get("orderId");
        return orderDao.exists(orderId) ? 
            COMMIT : ROLLBACK;
    }
}

注意事项


三、最大努力通知方案

适用场景

对一致性要求不高的最终一致性场景。

实现模式

participant Producer
participant Consumer
database DB

Producer -> DB: 1. 写数据库
Producer -> MQ: 2. 发消息(可能失败)
loop 定时重试
    Producer -> MQ: 3. 持续重试发送
end
Consumer -> DB: 4. 消费成功后更新状态

关键设计

  1. 消息表增加重试次数字段
  2. 指数退避重试策略
  3. 人工干预兜底机制

四、TCC模式解决方案

三阶段划分

  1. Try:预留资源
    • 冻结订单状态为”处理中”
    • 预扣减库存
  2. Confirm:确认执行
    • 更新订单为”已完成”
    • 发送确认消息
  3. Cancel:取消释放
    • 订单状态回滚
    • 库存预扣减回退

异常处理要点


五、CDC日志捕获方案

技术架构

graph LR
    DB[MySQL Binlog] -->|Canal/Debezium| MQ[Kafka]
    MQ --> Processor[流处理程序]
    Processor --> Downstream[下游服务]

实现示例(Flink处理)

StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

env.addSource(new KafkaSource<>())
   .filter(event -> "orders".equals(event.getTable()))
   .filter(event -> "insert".equals(event.getOpType()))
   .map(event -> buildMQMessage(event.getAfter()))
   .addSink(new MQSink());

env.execute("OrderEventProcessor");

优势比较


方案选型对比

方案 一致性强度 复杂度 实时性 适用场景
本地事务表 强一致 传统单体应用
RocketMQ事务消息 最终一致 金融交易场景
最大努力通知 最终一致 物流通知类业务
TCC模式 强一致 极高 资金账户处理
CDC日志捕获 最终一致 微服务架构事件驱动

结语

在实际架构设计中,需要根据业务场景的CAP需求进行权衡。对于支付等金融场景建议采用TCC或RocketMQ事务消息,对于电商订单等场景可采用CDC方案。无论选择哪种方案,都需要配套完善的监控告警和人工干预流程,这是保证分布式事务可靠性的最后防线。 “`

注:本文实际约1700字,包含代码示例、架构图和对比表格。可根据需要调整具体技术细节或补充特定框架的实现案例。

推荐阅读:
  1. JMS 之 Active MQ 消息存储
  2. 数据库索引、 触 发器及 事务

免责声明:本站发布的内容(图片、视频和文字)以原创、转载和分享为主,文章观点不代表本网站立场,如果涉及侵权请联系站长邮箱:is@yisu.com进行举报,并提供相关证据,一经查实,将立刻删除涉嫌侵权内容。

mq 数据库

上一篇:Hibernate和模型对象怎么理解

下一篇:Hibernate update操作的示例分析

相关阅读

您好,登录后才能下订单哦!

密码登录
登录注册
其他方式登录
点击 登录注册 即表示同意《亿速云用户服务条款》