怎么用C#实现SAGA分布式事务

发布时间:2022-01-24 11:09:26 作者:柒染
来源:亿速云 阅读:155

这篇文章将为大家详细讲解有关怎么用C#实现SAGA分布式事务,文章内容质量较高,因此小编分享给大家做个参考,希望大家阅读完这篇文章后对相关知识有一定的了解。

背景

银行跨行转账业务是一个典型分布式事务场景,假设 A 需要跨行转账给 B,那么就涉及两个银行的数据,无法通过一个数据库的本地事务保证转账的 ACID ,只能够通过分布式事务来解决。

市面上使用比较多的分布式事务框架,支持 SAGA 的,大部分都是 JAVA 为主的,没有提供 C# 的对接方式,或者是对接难度大,一定程度上让人望而却步。

下面就基于这个框架来实践一下银行转账的例子。

前置工作

dotnet add package Dtmcli --version 0.3.0

成功的 SAGA

先来看一下一个成功完成的 SAGA 时序图。

怎么用C#实现SAGA分布式事务

上图的微服务1,对应我们示例的 OutApi,也就是转钱出去的那个服务。

微服务2,对应我们示例的 InApi,也就是转钱进来的那个服务。

下面是两个服务的正向操作和补偿操作的处理。

OutApi

app.MapPost("/api/TransOut", (string branch_id, string gid, string op, TransRequest req) => 
{
    // 进行 数据库操作
    Console.WriteLine($"用户【{req.UserId}】转出【{req.Amount}】正向操作,gid={gid}, branch_id={branch_id}, op={op}");

    return Results.Ok(TransResponse.BuildSucceedResponse());
});

app.MapPost("/api/TransOutCompensate", (string branch_id, string gid, string op, TransRequest req) =>
{
    // 进行 数据库操作
    Console.WriteLine($"用户【{req.UserId}】转出【{req.Amount}】补偿操作,gid={gid}, branch_id={branch_id}, op={op}");

    return Results.Ok(TransResponse.BuildSucceedResponse());
});

InApi

app.MapPost("/api/TransIn", (string branch_id, string gid, string op, TransRequest req) =>
{
    Console.WriteLine($"用户【{req.UserId}】转入【{req.Amount}】正向操作,gid={gid}, branch_id={branch_id}, op={op}");

    return Results.Ok(TransResponse.BuildSucceedResponse());
});

app.MapPost("/api/TransInCompensate", (string branch_id, string gid, string op, TransRequest req) =>
{
    Console.WriteLine($"用户【{req.UserId}】转入【{req.Amount}】补偿操作,gid={gid}, branch_id={branch_id}, op={op}");

    return Results.Ok(TransResponse.BuildSucceedResponse());
});

注:示例为了简单,没有进行实际的数据库操作。

到此各个子事务的处理已经 OK 了,然后是开启 SAGA 事务,进行分支调用

var userOutReq = new TransRequest() { UserId = "1", Amount = -30 };
var userInReq = new TransRequest() { UserId = "2", Amount = 30 };

var ct = new CancellationToken();
var gid = await dtmClient.GenGid(ct);
var saga = new Saga(dtmClient, gid)
    .Add(outApi + "/TransOut", outApi + "/TransOutCompensate", userOutReq)
    .Add(inApi + "/TransIn", inApi + "/TransInCompensate", userInReq)
    ;

var flag = await saga.Submit(ct);

Console.WriteLine($"case1, {gid} saga 提交结果 = {flag}");

到这里,一个完整的 SAGA 分布式事务就编写完成了。

搭建好 dtm 的环境后,运行上面的例子,会看到下面的输出。

怎么用C#实现SAGA分布式事务

当然,上面的情况太理想了,转出转入都是一次性就成功了。

但是实际上我们会遇到许许多多的问题,最常见的应该就是网络故障了。

下面来看一个异常的 SAGA 示例

异常的 SAGA

做一个假设,用户1的转出是正常的,但是用户2在转入的时候出现了问题。

由于事务已经提交给 dtm 了,按照 SAGA 事务的协议,dtm 会重试未完成的操作。

这个时候用户2 这边会出现什么样的情况呢?

转入其实成功了,但是 dtm 收到错误 (网络故障等)转入没有成功,直接告诉 dtm 失败了 (应用异常等)

无论是那一种,dtm 都会进行重试操作。这个时候会发生什么呢?我们继续往下看。

先看一下事务失败交互的时序图

怎么用C#实现SAGA分布式事务

再通过调整上面成功的例子,来比较直观的看看出现的情况。

在 InApi 加多一个转入失败的处理接口

app.MapPost("/api/TransInError", (string branch_id, string gid, string op, TransRequest req) =>
{
    Console.WriteLine($"用户【{req.UserId}】转入【{req.Amount}】正向操作--失败,gid={gid}, branch_id={branch_id}, op={op}");

    //return Results.BadRequest();
    return Results.Ok(TransResponse.BuildFailureResponse());
});

失败的返回有两种,一种是状态码大于 400,一种是状态码是 200 并且响应体包含 FAILURE,上面的例子是第二种

调整一下调用方,把转入正向操作替换成上面这个返回错误的接口。

var saga = new Saga(dtmClient, gid)
    .Add(outApi + "/TransOut", outApi + "/TransOutCompensate", userOutReq)
    .Add(inApi + "/TransInError", inApi + "/TransInCompensate", userInReq);

运行结果如下:

怎么用C#实现SAGA分布式事务

在这个例子中,只考虑补偿/重试成功的情况下。

用户1 转出的 30 块钱最终是回到了他的帐号上,他没有出现损失。

用户2 就有点苦逼了,转入没有成功,返回了失败,还触发了转入的补偿机制,结果就是把用户2 还没进帐的 30 块钱给多扣了,这个就是上面的情况2,常见的空补偿问题。

这个时候就要在进行转入补偿的时候做一系列的判断,转入有没有成功,转出有没有失败等等,把业务变的十分复杂。

如果出现了上述的情况1,会发生什么呢?

用户2 第一次已经成功转入 30 块钱,返回的也是成功,但是网络出了点问题,导致 dtm 认为失败了,它就会进行重试,相当于用户2 还会收到第二个转入 30 块钱的请求!也就是说这次转帐,用户2 会进账 60 块钱,翻倍了,也就是说这个请求不是幂等。

同样的,要处理这个问题,在进行转入的正向操作中也要进行一系列的判断,同样会把复杂度上升一个级别。

前面有提到 dtm 提供了子事务屏障的功能,保证了幂等、空补偿等常见问题。

怎么用C#实现SAGA分布式事务

再来看看这个子事务屏障的功能有没有帮我们简化上面异常处理。

子事务屏障

子事务屏障,需要根据 trans_type,gid,branch_id 和 op 四个内容进行创建。

这4个内容 dtm 在回调时会放在 querysting 上面。

客户端里面提供了 IBranchBarrierFactory 来供我们使用。

空补偿

针对上面的异常情况(用户2 凭空消失 30 块钱),对转入的补偿进行子事务屏障的改造。

app.MapPost("/api/BarrierTransInCompensate", async (string branch_id, string gid, string op, string trans_type, TransRequest req, IBranchBarrierFactory factory) =>
{
    var barrier = factory.CreateBranchBarrier(trans_type, gid, branch_id, op);

    using var db = Db.GeConn();
    await barrier.Call(db, async (tx) =>
    {
        // 转入失败的情况下,不应该输出下面这个
        Console.WriteLine($"用户【{req.UserId}】转入【{req.Amount}】补偿操作,gid={gid}, branch_id={branch_id}, op={op}");
        // tx 参数是事务,可和本地事务一起提交回滚
        await Task.CompletedTask;
    });

    Console.WriteLine($"子事务屏障-补偿操作,gid={gid}, branch_id={branch_id}, op={op}");
    return Results.Ok(TransResponse.BuildSucceedResponse());
});

Call 方法就是关键所在了,需要传入一个 DbConnection 和真正的业务操作,这里的业务操作就是在控制台输出补偿操作的信息。

同样的,我们再调整一下调用方,把转入补偿操作替换成上面带子事务屏障的接口。

var saga = new Saga(dtmClient, gid)
    .Add(outApi + "/TransOut", outApi + "/TransOutCompensate", userOutReq)
    .Add(inApi + "/TransInError", inApi + "/BarrierTransInCompensate", userInReq)
    ;

再来运行这个例子。

怎么用C#实现SAGA分布式事务

会发现转入的补偿操作并没执行,控制台没有输出补偿信息,而是输出了

Will not exec busiCall, isNullCompensation=True, isDuplicateOrPend=False

这个就表明了,这个请求是个空补偿,是不应该执行业务方法的,既空操作。

再来看一下,转入成功的,但是 dtm 收到了失败的信号,不断重试造成重复请求的情况。

幂等

针对用户2 转入两次 30 块钱的异常情况,对转入的正向操作进行子事务屏障的改造。

app.MapPost("/api/BarrierTransIn", async (string branch_id, string gid, string op, string trans_type, TransRequest req, IBranchBarrierFactory factory) =>
{
    Console.WriteLine($"用户【{req.UserId}】转入【{req.Amount}】请求来了!!! gid={gid}, branch_id={branch_id}, op={op}");

    var barrier = factory.CreateBranchBarrier(trans_type, gid, branch_id, op);

    using var db = Db.GeConn();
    await barrier.Call(db, async (tx) =>
    {
        var c = Interlocked.Increment(ref _errCount);

        // 模拟一个超时执行
        if (c > 0 && c < 2) await Task.Delay(10000);

        Console.WriteLine($"用户【{req.UserId}】转入【{req.Amount}】正向操作,gid={gid}, branch_id={branch_id}, op={op}");
        await Task.CompletedTask;
    });

    return Results.Ok(TransResponse.BuildSucceedResponse());
});

这里通过一个超时执行来让 dtm 进行转入正向操作的重试。

同样的,我们再调整一下调用方,把转入的正向操作也替换成上面带子事务屏障的接口。

var saga = new Saga(dtmClient, gid)
    .Add(outApi + "/TransOut", outApi + "/TransOutCompensate", userOutReq)
    .Add(inApi + "/BarrierTransIn", inApi + "/BarrierTransInCompensate", userInReq)
    ;

再来运行这个例子。

怎么用C#实现SAGA分布式事务

可以看到转入的正向操作确实是触发了多次,第一次实际上是成功,只是响应比较慢,导致 dtm 认为是失败了,触发了第二次请求,但是第二次请求并没有执行业务操作,而是输出了

Will not exec busiCall, isNullCompensation=False, isDuplicateOrPend=True

这个就表明了,这个请求是个重复请求,是不应该执行业务方法的,保证了幂等。

到这里,可以看出,子事务屏障确实解决了幂等和空补偿的问题,大大降低了业务判断的复杂度和出错的可能性。

关于怎么用C#实现SAGA分布式事务就分享到这里了,希望以上内容可以对大家有一定的帮助,可以学到更多知识。如果觉得文章不错,可以把它分享出去让更多的人看到。

推荐阅读:
  1. 用C#实现递归方法
  2. redux-saga原理是什么

免责声明:本站发布的内容(图片、视频和文字)以原创、转载和分享为主,文章观点不代表本网站立场,如果涉及侵权请联系站长邮箱:is@yisu.com进行举报,并提供相关证据,一经查实,将立刻删除涉嫌侵权内容。

saga 分布式事务

上一篇:Java是怎么实现图片裁剪功能的

下一篇:Linux系统如何安装PostgreSQL

相关阅读

您好,登录后才能下订单哦!

密码登录
登录注册
其他方式登录
点击 登录注册 即表示同意《亿速云用户服务条款》