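# How to Implement TCC Distributed Transactions in Python
## 1. Overview of Distributed Transactions
### 1.1 What Is a Distributed Transaction
A distributed transaction is one whose participants, transaction-supporting servers, resource servers, and transaction manager sit on different nodes of a distributed system. In a microservice architecture, a single business operation often spans several services whose data must stay consistent, and that is where the need for distributed transactions comes from.
### 1.2 Challenges of Distributed Transactions
Compared with a traditional single-node transaction, a distributed transaction faces the following challenges:
- **Network uncertainty**: latency, partitions, and timeouts
- **Service availability**: some services may be unavailable
- **Data consistency**: consistency must hold across multiple data sources
- **Performance impact**: coordination adds overhead
### 1.3 Common Solutions
Common approaches to distributed transactions include:
- 2PC (two-phase commit)
- TCC (Try-Confirm-Cancel)
- Local message table
- Saga pattern
- Best-effort notification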
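## 2. The TCC Pattern in Detail
### 2.1 Basic Concepts
TCC (Try-Confirm-Cancel) is a compensation-based approach to distributed transactions. It splits one complete piece of business logic into three phases:
1. **Try**: attempt the operation, run all business checks, and reserve the resources the operation needs
2. **Confirm**: carry out the operation and actually commit. It only uses resources reserved by Try, so it is expected to succeed and is retried if it fails
3. **Cancel**: abandon the operation and release the resources reserved by Try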
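
The three phases can be sketched without any framework. The following is a minimal in-memory illustration, not the implementation used later in this article: `Participant`, `TryFailed`, and `run_tcc` are hypothetical names, and here Try moves units out of `available` into a per-transaction reservation.

```python
from dataclasses import dataclass, field


class TryFailed(Exception):
    """Raised by a participant that cannot reserve its resource."""


@dataclass
class Participant:
    """Hypothetical in-memory participant that reserves units of a resource."""
    name: str
    available: int
    reserved: dict = field(default_factory=dict)  # tx_id -> reserved amount

    def try_(self, tx_id, amount):
        # Try: check the business rule and reserve the resource.
        if amount > self.available:
            raise TryFailed(f"{self.name}: insufficient resource")
        self.available -= amount
        self.reserved[tx_id] = amount

    def confirm(self, tx_id):
        # Confirm: consume the reservation. A repeated call is a no-op.
        self.reserved.pop(tx_id, None)

    def cancel(self, tx_id):
        # Cancel: return the reservation. A missing reservation adds 0.
        self.available += self.reserved.pop(tx_id, 0)


def run_tcc(tx_id, steps):
    """Run Try on every (participant, amount) pair, then Confirm or Cancel."""
    tried = []
    try:
        for participant, amount in steps:
            participant.try_(tx_id, amount)
            tried.append(participant)
    except TryFailed:
        # Compensate only the participants whose Try succeeded.
        for participant in reversed(tried):
            participant.cancel(tx_id)
        return False
    for participant, _ in steps:
        participant.confirm(tx_id)
    return True


inventory = Participant("inventory", available=5)
payment = Participant("payment", available=100)
print(run_tcc("tx-1", [(inventory, 2), (payment, 60)]))  # both Try calls succeed
print(run_tcc("tx-2", [(inventory, 2), (payment, 60)]))  # payment Try fails, inventory is released
```

A real coordinator also has to persist the transaction state and retry Confirm and Cancel over the network, which this sketch leaves out.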
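### 2.2 Advantages of TCC
- **Eventual consistency**: the compensation mechanism guarantees that the system converges to a consistent state
- **Good performance**: lock wait time is shorter than with 2PC
- **Flexibility**: the business logic of each phase can be customized freely
- **Broad applicability**: it suits long-running transactions and high-concurrency workloads especially well
### 2.3 Drawbacks of TCC
- **High development cost**: every operation needs three interfaces
- **Intrusive to business code**: existing business logic has to be reworked
- **Idempotency requirement**: the operation in each phase must be idempotent
- **Empty rollback**: a participant must handle a Cancel that arrives when its Try never ran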
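
The idempotency and empty-rollback requirements come down to checking a per-transaction status before acting. The sketch below is an assumption-laden illustration: `ReservationLog` and its return strings are hypothetical, and the status dictionary stands in for a database table. It also refuses a Try that arrives after a Cancel, which is the usual companion to empty-rollback handling.

```python
class ReservationLog:
    """Hypothetical participant-side log: tx_id -> 'trying' | 'confirmed' | 'canceled'."""

    def __init__(self):
        self.status = {}

    def on_try(self, tx_id):
        state = self.status.get(tx_id)
        if state == 'canceled':
            # Cancel arrived first (empty rollback was recorded): refuse the late Try.
            return 'rejected'
        if state is not None:
            # Retried Try: the reservation already exists.
            return 'duplicate'
        self.status[tx_id] = 'trying'
        return 'reserved'

    def on_confirm(self, tx_id):
        state = self.status.get(tx_id)
        if state == 'confirmed':
            return 'duplicate'  # idempotent repeat
        if state != 'trying':
            return 'rejected'   # nothing reserved, or already canceled
        self.status[tx_id] = 'confirmed'
        return 'confirmed'

    def on_cancel(self, tx_id):
        state = self.status.get(tx_id)
        if state is None:
            # Empty rollback: remember the Cancel so a late Try is refused.
            self.status[tx_id] = 'canceled'
            return 'empty_cancel'
        if state == 'canceled':
            return 'duplicate'  # idempotent repeat
        if state == 'confirmed':
            return 'rejected'   # a confirmed transaction cannot be canceled
        self.status[tx_id] = 'canceled'
        return 'canceled'


log = ReservationLog()
print(log.on_cancel("tx-9"))  # Cancel with no prior Try
print(log.on_try("tx-9"))     # the late Try is refused
```

In a real service the status check and the status update would need to happen inside one database transaction, otherwise two concurrent requests could both pass the check.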
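## 3. Implementing TCC in Python
### 3.1 System Architecture
We use a simple e-commerce checkout scenario involving three services:
- Order service (Order)
- Inventory service (Inventory)
- Payment service (Payment)
The architecture is:
[Client] ↓ [API Gateway] ↓ [Order Service] → [Inventory Service] ↓ [Payment Service]
### 3.2 Database Design
Each service has its own database. The key tables are:
**Order service**: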
```python
from django.db import models


class Order(models.Model):
    order_id = models.UUIDField(primary_key=True)
    user_id = models.IntegerField()
    status = models.CharField(max_length=20)  # 'created', 'confirmed', 'canceled'
    amount = models.DecimalField(max_digits=10, decimal_places=2)
    created_at = models.DateTimeField(auto_now_add=True)


class OrderTCC(models.Model):
    tcc_id = models.UUIDField(primary_key=True)
    order_id = models.UUIDField()
    # Stored at Try time so Confirm and Cancel do not depend on request context
    user_id = models.IntegerField()
    amount = models.DecimalField(max_digits=10, decimal_places=2)
    status = models.CharField(max_length=20)  # 'trying', 'confirmed', 'canceled'
    created_at = models.DateTimeField(auto_now_add=True)
```
**Inventory service**:
```python
from django.db import models


class Inventory(models.Model):
    product_id = models.IntegerField(primary_key=True)
    stock = models.IntegerField()


class InventoryTCC(models.Model):
    tcc_id = models.UUIDField(primary_key=True)
    product_id = models.IntegerField()
    frozen_stock = models.IntegerField()  # stock frozen during the Try phase
    status = models.CharField(max_length=20)
```
The Try phase in the order service:
```python
# order_service/views.py
import json
import logging
import uuid
from decimal import Decimal

import requests
from django.db import transaction
from django.http import JsonResponse

from .models import Order, OrderTCC

logger = logging.getLogger(__name__)

INVENTORY_URL = 'http://inventory-service/api/inventory'
PAYMENT_URL = 'http://payment-service/api/payment'


def create_order_try(request):
    data = json.loads(request.body)
    order_id = uuid.uuid4()
    tcc_id = uuid.uuid4()

    # Local transaction: persist the TCC record before any remote call, so it
    # survives a failure below and the recovery job can still find it
    OrderTCC.objects.create(
        tcc_id=tcc_id,
        order_id=order_id,
        user_id=data['user_id'],
        amount=Decimal(str(data['amount'])),
        status='trying',
    )

    try:
        # Call Try on the inventory service
        inventory_response = requests.post(
            f'{INVENTORY_URL}/try',
            json={
                'tcc_id': str(tcc_id),
                'product_id': data['product_id'],
                'quantity': data['quantity'],
            },
            timeout=5,
        )
        if inventory_response.status_code != 200:
            raise Exception("Inventory try failed")

        # Call Try on the payment service
        payment_response = requests.post(
            f'{PAYMENT_URL}/try',
            json={
                'tcc_id': str(tcc_id),
                'user_id': data['user_id'],
                'amount': data['amount'],
            },
            timeout=5,
        )
        if payment_response.status_code != 200:
            raise Exception("Payment try failed")
    except Exception:
        # Compensate every Try that may have succeeded; participants are
        # expected to tolerate a Cancel without a prior Try (empty rollback)
        cancel_order(tcc_id)
        raise

    return JsonResponse({'tcc_id': str(tcc_id), 'order_id': str(order_id)})
```
The Confirm phase:
```python
def confirm_order(tcc_id):
    try:
        # Load the TCC record
        tcc = OrderTCC.objects.get(tcc_id=tcc_id)
        if tcc.status == 'confirmed':
            # Already handled; return for idempotency
            return
        if tcc.status != 'trying':
            raise ValueError(f"Cannot confirm TCC in status {tcc.status}")

        # Confirm the participants first. Their Confirm is idempotent, so a
        # retry after a partial failure can safely call them again.
        for base_url in (INVENTORY_URL, PAYMENT_URL):
            requests.post(f'{base_url}/confirm',
                          json={'tcc_id': str(tcc_id)},
                          timeout=5).raise_for_status()

        with transaction.atomic():
            # Create the real order
            Order.objects.create(
                order_id=tcc.order_id,
                user_id=tcc.user_id,
                status='confirmed',
                amount=tcc.amount,
            )
            # Update the TCC status
            tcc.status = 'confirmed'
            tcc.save()
    except Exception as e:
        logger.error(f"Confirm failed: {str(e)}")
        raise
```
The Cancel phase:
```python
def cancel_order(tcc_id):
    try:
        tcc = OrderTCC.objects.filter(tcc_id=tcc_id).first()
        if not tcc:
            # Empty rollback: no Try was recorded, nothing to undo
            return
        if tcc.status == 'canceled':
            # Already handled; return for idempotency
            return
        if tcc.status == 'confirmed':
            raise ValueError("Cannot cancel a confirmed transaction")

        # Cancel the participants first, so the record stays 'trying' and the
        # recovery job retries if one of these calls fails
        for base_url in (INVENTORY_URL, PAYMENT_URL):
            requests.post(f'{base_url}/cancel',
                          json={'tcc_id': str(tcc_id)},
                          timeout=5).raise_for_status()

        with transaction.atomic():
            # Only a recorded Try needs a canceled-order record
            Order.objects.create(
                order_id=tcc.order_id,
                user_id=tcc.user_id,
                status='canceled',
                amount=0,
            )
            tcc.status = 'canceled'
            tcc.save()
    except Exception as e:
        logger.error(f"Cancel failed: {str(e)}")
        raise
```
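Service calls between participants can go through a small retry helper:
```python
# Service calls implemented with the requests library
import logging
import time

import requests

logger = logging.getLogger(__name__)


def call_service(url, data, retry=3):
    for i in range(retry):
        try:
            response = requests.post(url, json=data, timeout=5)
            if response.status_code == 200:
                return True
            logger.warning(f"Retry {i+1} for {url}: HTTP {response.status_code}")
        except requests.exceptions.RequestException as e:
            # Timeout is a subclass of RequestException, so one clause covers both
            logger.warning(f"Retry {i+1} for {url}: {str(e)}")
        if i < retry - 1:
            time.sleep(1)  # wait before the next attempt, not after the last one
    return False
```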
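
The helper above waits a fixed second between attempts. A common variation is exponential backoff with random jitter, so that many callers retrying against the same recovering service do not all hit it at the same moment. The sketch below is one possible shape, not part of the original design: `retry_with_backoff` and its parameters are hypothetical, and `sleep` is injectable only to make the function easy to test.

```python
import random
import time


def retry_with_backoff(operation, retries=3, base_delay=0.5, max_delay=8.0,
                       sleep=time.sleep):
    """Call operation() until it returns a truthy value or retries run out."""
    for attempt in range(retries):
        try:
            if operation():
                return True
        except Exception:
            # A raised exception counts as a failed attempt in this sketch.
            pass
        if attempt < retries - 1:
            # The delay cap doubles each attempt: base, 2*base, 4*base, ... up to max_delay.
            cap = min(max_delay, base_delay * 2 ** attempt)
            # Full jitter: sleep a random time between 0 and the cap.
            sleep(random.uniform(0, cap))
    return False


# Usage with a fake operation that succeeds on the third call.
calls = []
delays = []

def flaky():
    calls.append(1)
    return len(calls) >= 3

print(retry_with_backoff(flaky, retries=5, sleep=delays.append))
print(len(calls), len(delays))
```

Retrying is only safe because every TCC endpoint is required to be idempotent; without that property a retried Confirm could deduct stock twice.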
To improve reliability, RabbitMQ can be introduced for asynchronous communication:
```python
# MQ communication implemented with the pika library
import json

import pika


def setup_mq():
    connection = pika.BlockingConnection(pika.ConnectionParameters('mq_host'))
    channel = connection.channel()
    # Declare the exchange and queue used for TCC transactions
    channel.exchange_declare(exchange='tcc_transaction', exchange_type='topic')
    channel.queue_declare(queue='order_service')
    channel.queue_bind(queue='order_service', exchange='tcc_transaction',
                       routing_key='order.#')
    return channel


def publish_tcc_event(channel, event_type, tcc_id):
    channel.basic_publish(
        exchange='tcc_transaction',
        routing_key=f'order.{event_type}',
        body=json.dumps({'tcc_id': str(tcc_id)}),
    )
```
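Every TCC endpoint must be idempotent:
```python
# Example: the Confirm endpoint of the inventory service
import json

from django.db import transaction
from django.db.models import F
from django.http import JsonResponse

from .models import Inventory, InventoryTCC


def inventory_confirm(request):
    data = json.loads(request.body)
    tcc_id = data['tcc_id']

    with transaction.atomic():
        # Lock the TCC row so concurrent Confirm retries are serialized
        tcc = (InventoryTCC.objects.select_for_update()
               .filter(tcc_id=tcc_id).first())
        if not tcc:
            return JsonResponse({'status': 'not_found'}, status=404)
        if tcc.status == 'confirmed':
            return JsonResponse({'status': 'already_confirmed'})
        if tcc.status == 'canceled':
            return JsonResponse({'status': 'already_canceled'}, status=409)

        # Deduct the real stock in a single UPDATE
        Inventory.objects.filter(product_id=tcc.product_id).update(
            stock=F('stock') - tcc.frozen_stock)
        # Update the TCC status
        tcc.status = 'confirmed'
        tcc.save()
    return JsonResponse({'status': 'success'})
```
Cancel has to handle an empty rollback:
```python
def inventory_cancel(request):
    data = json.loads(request.body)
    tcc_id = data['tcc_id']
    tcc = InventoryTCC.objects.filter(tcc_id=tcc_id).first()
    if not tcc:
        # Record the empty rollback so that a Try arriving after this Cancel
        # finds the record and does not freeze stock
        InventoryTCC.objects.create(
            tcc_id=tcc_id,
            product_id=data.get('product_id', 0),
            frozen_stock=0,
            status='canceled',
        )
        return JsonResponse({'status': 'empty_cancel'})
    # Normal cancel logic...
```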
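
The inventory Try endpoint is not shown in this article. Since the Confirm code deducts `stock` by `frozen_stock`, real stock stays unchanged during Try, so any availability check at Try time has to subtract what other transactions have already frozen. The in-memory sketch below illustrates that arithmetic only. `InventoryLedger` is a hypothetical stand-in for the `Inventory` and `InventoryTCC` tables, and it ignores empty rollback and concurrency.

```python
class InventoryLedger:
    """Hypothetical in-memory stand-in for the Inventory and InventoryTCC tables."""

    def __init__(self, stock):
        self.stock = dict(stock)  # product_id -> real stock
        self.records = {}         # tcc_id -> [product_id, frozen quantity, status]

    def available(self, product_id):
        # Real stock minus everything still frozen by unfinished transactions.
        frozen = sum(qty for pid, qty, status in self.records.values()
                     if pid == product_id and status == 'trying')
        return self.stock[product_id] - frozen

    def try_freeze(self, tcc_id, product_id, quantity):
        if tcc_id in self.records:
            # Retried Try: report whether the earlier freeze is still active.
            return self.records[tcc_id][2] == 'trying'
        if quantity > self.available(product_id):
            return False
        self.records[tcc_id] = [product_id, quantity, 'trying']
        return True

    def confirm(self, tcc_id):
        record = self.records[tcc_id]
        if record[2] == 'trying':
            # Only now is the real stock deducted.
            self.stock[record[0]] -= record[1]
            record[2] = 'confirmed'

    def cancel(self, tcc_id):
        record = self.records.get(tcc_id)
        if record and record[2] == 'trying':
            # Releasing the freeze is just a status change; stock was never touched.
            record[2] = 'canceled'


ledger = InventoryLedger({101: 5})
print(ledger.try_freeze("t1", 101, 3), ledger.available(101))
print(ledger.try_freeze("t2", 101, 3))  # only 2 units are still available
ledger.confirm("t1")
print(ledger.stock[101], ledger.available(101))
```

An alternative design moves the quantity out of `stock` at Try time and returns it on Cancel; either works as long as Try, Confirm, and Cancel agree on which one is used.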
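A scheduled job can cancel transactions that stay in the Try phase for too long:
```python
import logging
from datetime import timedelta

from apscheduler.schedulers.background import BackgroundScheduler
from django.utils import timezone

# OrderTCC and cancel_order come from the order service code shown earlier
logger = logging.getLogger(__name__)


def check_hanging_transactions():
    # Find TCC records that have stayed unfinished past the timeout
    timeout = timezone.now() - timedelta(minutes=30)
    hanging_tccs = OrderTCC.objects.filter(
        status='trying',
        created_at__lt=timeout,
    )
    for tcc in hanging_tccs:
        try:
            cancel_order(tcc.tcc_id)
        except Exception as e:
            logger.error(f"Failed to cancel hanging TCC {tcc.tcc_id}: {str(e)}")


# Start the scheduled job
scheduler = BackgroundScheduler()
scheduler.add_job(check_hanging_transactions, 'interval', minutes=5)
scheduler.start()
```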
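Asynchronous tasks can be implemented with Celery:
```python
from celery import Celery

# confirm_order is the function from the order service code shown earlier
app = Celery('tcc_transaction', broker='pyamqp://guest@localhost//')


@app.task(bind=True, max_retries=3)
def confirm_order_task(self, tcc_id):
    try:
        confirm_order(tcc_id)
    except Exception as exc:
        # Exponential backoff: 1s, 2s, 4s (the attribute is `retries`)
        raise self.retry(exc=exc, countdown=2 ** self.request.retries)
```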
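Batch processing locks the selected records inside one transaction:
```python
from django.db import transaction


def batch_confirm(tcc_ids):
    with transaction.atomic():
        # select_for_update must run inside a transaction
        tcc_list = OrderTCC.objects.filter(
            tcc_id__in=tcc_ids,
            status='trying',
        ).select_for_update()
        for tcc in tcc_list:
            # Batch confirm logic
            pass
```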
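A distributed lock can be implemented with Redis:
```python
import time
import uuid
from contextlib import contextmanager

import redis

redis_client = redis.StrictRedis(host='localhost', port=6379)

# Delete the key only if it still holds our identifier (atomic check-and-delete).
# A plain get() followed by delete() can remove a lock that has since expired and
# been taken by someone else, and get() returns bytes, not str.
_release = redis_client.register_script("""
if redis.call('get', KEYS[1]) == ARGV[1] then
    return redis.call('del', KEYS[1])
end
return 0
""")


@contextmanager
def redis_lock(lock_key, timeout=10):
    identifier = str(uuid.uuid4())
    end = time.time() + timeout
    while time.time() < end:
        # SET with NX and EX acquires the lock and sets its expiry atomically
        if redis_client.set(lock_key, identifier, nx=True, ex=timeout):
            try:
                yield
            finally:
                _release(keys=[lock_key], args=[identifier])
            return
        time.sleep(0.001)
    raise Exception("Could not acquire lock")
```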
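A unit test for the Try endpoint:
```python
import json
from unittest.mock import patch

import pytest

from order_service.models import OrderTCC


@pytest.mark.django_db
def test_order_try_success(client):  # `client` is the pytest-django test client fixture
    with patch('requests.post') as mock_post:
        mock_post.return_value.status_code = 200
        # The view reads request.body as JSON, so send a JSON body
        response = client.post(
            '/api/order/try',
            data=json.dumps({
                'user_id': 1,
                'product_id': 101,
                'quantity': 2,
                'amount': 100.00,
            }),
            content_type='application/json',
        )
    assert response.status_code == 200
    assert OrderTCC.objects.count() == 1
```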
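An outline of an integration test for the full flow:
```python
@pytest.mark.integration
def test_full_tcc_flow():
    # Outline only: test_data, order, inventory, payment and the original_*
    # values are placeholders for fixtures from the integration environment
    # 1. Run the Try phase
    try_response = create_order_try(test_data)
    # 2. Check the Try result
    assert try_response.status == 'success'
    # 3. Run the Confirm phase
    confirm_response = confirm_order(try_response.tcc_id)
    # 4. Check eventual consistency
    assert order.status == 'confirmed'
    assert inventory.stock == original_stock - quantity
    assert payment.balance == original_balance - amount
```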
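An outline of a fault-injection test:
```python
def test_network_partition():
    # Outline only: ChaosMonkey stands for whatever fault-injection helper
    # the test environment provides
    with ChaosMonkey() as cm:
        cm.network_partition('order-service', 'inventory-service')
        response = create_order_try(test_data)
        assert response.status_code == 500
        cm.heal_network()
        # Check that compensation succeeded
        assert InventoryTCC.objects.get(tcc_id=response.tcc_id).status == 'canceled'
```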
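This article walked through implementing TCC distributed transactions in Python: the pattern itself, the Try, Confirm, and Cancel phases of an order service, communication between services, idempotency and empty-rollback handling, recovery of hanging transactions, performance techniques, and testing.
In real deployments the design can be extended further.
Although TCC is relatively complex to implement, it remains a valuable solution for distributed systems with strict consistency requirements. With sound architecture and careful code, reliable distributed transaction handling can be built in the Python ecosystem.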