怎么用python完成一个分布式事务TCC

发布时间:2021-10-25 10:11:41 作者:iii
来源:亿速云 阅读:160
# 怎么用Python完成一个分布式事务TCC

## 1. 分布式事务概述

### 1.1 什么是分布式事务

分布式事务是指事务的参与者、支持事务的服务器、资源服务器以及事务管理器分别位于不同的分布式系统的不同节点之上。在微服务架构中,一个业务操作往往需要跨多个服务完成数据一致性保证,这就产生了分布式事务的需求。

### 1.2 分布式事务的挑战

与传统单机事务相比,分布式事务面临以下挑战:
- **网络不确定性**:网络延迟、分区、超时等问题
- **服务可用性**:部分服务可能不可用
- **数据一致性**:跨多个数据源的一致性保证
- **性能影响**:协调成本带来的性能下降

### 1.3 常见解决方案

常见的分布式事务解决方案包括:
- 2PC(两阶段提交)
- TCC(Try-Confirm-Cancel)
- 本地消息表
- Saga模式
- 最大努力通知

## 2. TCC模式详解

### 2.1 TCC基本概念

TCC(Try-Confirm-Cancel)是一种补偿型分布式事务解决方案,它将一个完整的业务逻辑拆分为三个阶段:

1. **Try阶段**:尝试执行业务,完成所有业务检查,预留必须的业务资源
2. **Confirm阶段**:确认执行业务,真正提交事务,通常不会失败
3. **Cancel阶段**:取消执行业务,释放Try阶段预留的资源

### 2.2 TCC的优势

- **最终一致性**:通过补偿机制保证最终一致性
- **高性能**:相比2PC减少了锁等待时间
- **灵活性**:业务可定制化程度高
- **适用场景广**:特别适合长事务和高并发场景

### 2.3 TCC的不足

- **开发成本高**:需要为每个操作实现三个接口
- **业务侵入性强**:需要改造现有业务逻辑
- **幂等性要求**:每个阶段操作必须支持幂等
- **空回滚问题**:需要处理Try未执行但收到Cancel的情况

## 3. Python实现TCC事务

### 3.1 系统架构设计

我们设计一个简单的电商下单场景,涉及三个服务:
- 订单服务(Order)
- 库存服务(Inventory)
- 支付服务(Payment)

架构图如下:

[Client] ↓ [API Gateway] ↓ [Order Service] → [Inventory Service] ↓ [Payment Service]


### 3.2 数据库设计

每个服务有自己的数据库,关键表设计:

**订单服务**:
```python
class Order(models.Model):
    order_id = models.UUIDField(primary_key=True)
    user_id = models.IntegerField()
    status = models.CharField(max_length=20)  # 'created', 'confirmed', 'canceled'
    amount = models.DecimalField(max_digits=10, decimal_places=2)
    created_at = models.DateTimeField(auto_now_add=True)
    
class OrderTCC(models.Model):
    tcc_id = models.UUIDField(primary_key=True)
    order_id = models.UUIDField()
    status = models.CharField(max_length=20)  # 'trying', 'confirmed', 'canceled'
    created_at = models.DateTimeField(auto_now_add=True)

库存服务

class Inventory(models.Model):
    product_id = models.IntegerField(primary_key=True)
    stock = models.IntegerField()
    
class InventoryTCC(models.Model):
    tcc_id = models.UUIDField(primary_key=True)
    product_id = models.IntegerField()
    frozen_stock = models.IntegerField()  # Try阶段冻结的库存
    status = models.CharField(max_length=20)

3.3 核心代码实现

3.3.1 Try阶段实现

# order_service/views.py
@transaction.atomic
def create_order_try(request):
    data = json.loads(request.body)
    order_id = uuid.uuid4()
    tcc_id = uuid.uuid4()
    
    # 本地事务:创建订单TCC记录
    OrderTCC.objects.create(
        tcc_id=tcc_id,
        order_id=order_id,
        status='trying'
    )
    
    # 调用库存服务Try
    inventory_response = requests.post(
        'http://inventory-service/api/inventory/try',
        json={
            'tcc_id': str(tcc_id),
            'product_id': data['product_id'],
            'quantity': data['quantity']
        }
    )
    
    if inventory_response.status_code != 200:
        raise Exception("Inventory try failed")
    
    # 调用支付服务Try
    payment_response = requests.post(
        'http://payment-service/api/payment/try',
        json={
            'tcc_id': str(tcc_id),
            'user_id': data['user_id'],
            'amount': data['amount']
        }
    )
    
    if payment_response.status_code != 200:
        # 补偿已成功的Try操作
        requests.post('http://inventory-service/api/inventory/cancel', 
                     json={'tcc_id': str(tcc_id)})
        raise Exception("Payment try failed")
    
    return JsonResponse({'tcc_id': tcc_id, 'order_id': order_id})

3.3.2 Confirm阶段实现

def confirm_order(tcc_id):
    try:
        # 获取TCC记录
        tcc = OrderTCC.objects.get(tcc_id=tcc_id, status='trying')
        
        with transaction.atomic():
            # 创建正式订单
            Order.objects.create(
                order_id=tcc.order_id,
                user_id=user_id,  # 从上下文中获取
                status='confirmed',
                amount=amount
            )
            
            # 更新TCC状态
            tcc.status = 'confirmed'
            tcc.save()
            
        # 调用库存服务Confirm
        requests.post('http://inventory-service/api/inventory/confirm',
                    json={'tcc_id': str(tcc_id)})
                    
        # 调用支付服务Confirm
        requests.post('http://payment-service/api/payment/confirm',
                    json={'tcc_id': str(tcc_id)})
                    
    except Exception as e:
        logger.error(f"Confirm failed: {str(e)}")
        raise

3.3.3 Cancel阶段实现

def cancel_order(tcc_id):
    try:
        tcc = OrderTCC.objects.filter(tcc_id=tcc_id).first()
        
        if not tcc:
            # 处理空回滚情况
            return
            
        if tcc.status == 'canceled':
            # 已处理过,幂等返回
            return
            
        with transaction.atomic():
            if tcc.status == 'trying':
                # 只有Try成功才需要创建取消记录
                Order.objects.create(
                    order_id=tcc.order_id,
                    user_id=user_id,
                    status='canceled',
                    amount=0
                )
            
            tcc.status = 'canceled'
            tcc.save()
            
        # 调用库存服务Cancel
        requests.post('http://inventory-service/api/inventory/cancel',
                    json={'tcc_id': str(tcc_id)})
                    
        # 调用支付服务Cancel
        requests.post('http://payment-service/api/payment/cancel',
                    json={'tcc_id': str(tcc_id)})
                    
    except Exception as e:
        logger.error(f"Cancel failed: {str(e)}")
        raise

3.4 服务间通信设计

3.4.1 基于HTTP的实现

# 使用requests库实现服务调用
def call_service(url, data, retry=3):
    for i in range(retry):
        try:
            response = requests.post(url, json=data, timeout=5)
            if response.status_code == 200:
                return True
        except (requests.exceptions.RequestException, 
                requests.exceptions.Timeout) as e:
            logger.warning(f"Retry {i+1} for {url}: {str(e)}")
            time.sleep(1)
    return False

3.4.2 引入消息队列

为提高可靠性,可以引入RabbitMQ实现异步通信:

# 使用pika库实现MQ通信
import pika

def setup_mq():
    connection = pika.BlockingConnection(pika.ConnectionParameters('mq_host'))
    channel = connection.channel()
    
    # 声明TCC事务交换机和队列
    channel.exchange_declare(exchange='tcc_transaction', exchange_type='topic')
    channel.queue_declare(queue='order_service')
    channel.queue_bind(queue='order_service', exchange='tcc_transaction', routing_key='order.#')
    
    return channel

def publish_tcc_event(channel, event_type, tcc_id):
    channel.basic_publish(
        exchange='tcc_transaction',
        routing_key=f'order.{event_type}',
        body=json.dumps({'tcc_id': str(tcc_id)})

4. 异常处理与可靠性保障

4.1 幂等性设计

每个TCC接口必须实现幂等:

# 库存服务Confirm接口示例
def inventory_confirm(request):
    data = json.loads(request.body)
    tcc_id = data['tcc_id']
    
    tcc = InventoryTCC.objects.filter(tcc_id=tcc_id).first()
    if not tcc:
        return JsonResponse({'status': 'not_found'}, status=404)
        
    if tcc.status == 'confirmed':
        return JsonResponse({'status': 'already_confirmed'})
        
    with transaction.atomic():
        # 扣减真实库存
        inventory = Inventory.objects.get(product_id=tcc.product_id)
        inventory.stock -= tcc.frozen_stock
        inventory.save()
        
        # 更新TCC状态
        tcc.status = 'confirmed'
        tcc.save()
        
    return JsonResponse({'status': 'success'})

4.2 空回滚处理

def inventory_cancel(request):
    data = json.loads(request.body)
    tcc_id = data['tcc_id']
    
    tcc = InventoryTCC.objects.filter(tcc_id=tcc_id).first()
    if not tcc:
        # 记录空回滚,防止Try阶段成功后再执行Cancel
        InventoryTCC.objects.create(
            tcc_id=tcc_id,
            product_id=data.get('product_id', 0),
            frozen_stock=0,
            status='canceled'
        )
        return JsonResponse({'status': 'empty_cancel'})
        
    # 正常取消逻辑...

4.3 定时任务补偿

from apscheduler.schedulers.background import BackgroundScheduler

def check_hanging_transactions():
    # 查找超过一定时间未完成的TCC记录
    timeout = datetime.now() - timedelta(minutes=30)
    hanging_tccs = OrderTCC.objects.filter(
        status='trying',
        created_at__lt=timeout
    )
    
    for tcc in hanging_tccs:
        try:
            cancel_order(tcc.tcc_id)
        except Exception as e:
            logger.error(f"Failed to cancel hanging TCC {tcc.tcc_id}: {str(e)}")

# 启动定时任务
scheduler = BackgroundScheduler()
scheduler.add_job(check_hanging_transactions, 'interval', minutes=5)
scheduler.start()

5. 性能优化与扩展

5.1 异步化改造

使用Celery实现异步任务:

from celery import Celery

app = Celery('tcc_transaction', broker='pyamqp://guest@localhost//')

@app.task(bind=True, max_retries=3)
def confirm_order_task(self, tcc_id):
    try:
        confirm_order(tcc_id)
    except Exception as exc:
        raise self.retry(exc=exc, countdown=2**self.request.retry)

5.2 批量处理

def batch_confirm(tcc_ids):
    with transaction.atomic():
        tcc_list = OrderTCC.objects.filter(
            tcc_id__in=tcc_ids,
            status='trying'
        ).select_for_update()
        
        for tcc in tcc_list:
            # 批量确认逻辑
            pass

5.3 引入分布式锁

使用Redis实现分布式锁:

import redis
from contextlib import contextmanager

redis_client = redis.StrictRedis(host='localhost', port=6379)

@contextmanager
def redis_lock(lock_key, timeout=10):
    identifier = str(uuid.uuid4())
    end = time.time() + timeout
    
    while time.time() < end:
        if redis_client.setnx(lock_key, identifier):
            redis_client.expire(lock_key, timeout)
            try:
                yield
            finally:
                if redis_client.get(lock_key) == identifier:
                    redis_client.delete(lock_key)
            return
        time.sleep(0.001)
    raise Exception("Could not acquire lock")

6. 测试策略

6.1 单元测试

import pytest
from unittest.mock import patch

@pytest.mark.django_db
def test_order_try_success():
    with patch('requests.post') as mock_post:
        mock_post.return_value.status_code = 200
        
        response = client.post('/api/order/try', data={
            'user_id': 1,
            'product_id': 101,
            'quantity': 2,
            'amount': 100.00
        })
        
        assert response.status_code == 200
        assert OrderTCC.objects.count() == 1

6.2 集成测试

@pytest.mark.integration
def test_full_tcc_flow():
    # 1. 执行Try阶段
    try_response = create_order_try(test_data)
    
    # 2. 验证Try结果
    assert try_response.status == 'success'
    
    # 3. 执行Confirm阶段
    confirm_response = confirm_order(try_response.tcc_id)
    
    # 4. 验证最终一致性
    assert order.status == 'confirmed'
    assert inventory.stock == original_stock - quantity
    assert payment.balance == original_balance - amount

6.3 混沌测试

def test_network_partition():
    with ChaosMonkey() as cm:
        cm.network_partition('order-service', 'inventory-service')
        
        response = create_order_try(test_data)
        assert response.status_code == 500
        
        cm.heal_network()
        
        # 验证补偿是否成功
        assert InventoryTCC.objects.get(tcc_id=response.tcc_id).status == 'canceled'

7. 总结与展望

本文详细介绍了如何使用Python实现TCC模式的分布式事务,包括:

  1. TCC模式的基本原理和三个阶段实现
  2. Python下的具体代码实现方案
  3. 异常处理和可靠性保障机制
  4. 性能优化和扩展方案
  5. 全面的测试策略

在实际应用中,还可以考虑以下方向进行扩展:

TCC模式虽然实现复杂度较高,但在需要强一致性的分布式系统中仍然是非常有价值的解决方案。通过合理的架构设计和代码实现,可以在Python生态中构建可靠的分布式事务处理能力。 “`

推荐阅读:
  1. 怎么用Python完成股票回测框架
  2. 分布式事务之深入理解什么是2PC、3PC及TCC协议?

免责声明:本站发布的内容(图片、视频和文字)以原创、转载和分享为主,文章观点不代表本网站立场,如果涉及侵权请联系站长邮箱:is@yisu.com进行举报,并提供相关证据,一经查实,将立刻删除涉嫌侵权内容。

python

上一篇:如何使用send-mailmessage命令发送域内邮件

下一篇:Python爬虫经常会被封的原因是什么

相关阅读

您好,登录后才能下订单哦!

密码登录
登录注册
其他方式登录
点击 登录注册 即表示同意《亿速云用户服务条款》