Ray-Handler消息订阅器编写方法是什么

发布时间:2021-12-24 16:06:44 作者:iii
来源:亿速云 阅读:132

本篇内容主要讲解“Ray-Handler消息订阅器编写方法是什么”,感兴趣的朋友不妨来看看。本文介绍的方法操作简单快捷,实用性强。下面就让小编来带大家学习“Ray-Handler消息订阅器编写方法是什么”吧!

消息订阅器:

Ray是基于Event Sourcing设计的ES/Actor框架,消息发布后需要订阅处理,订阅器主要有以下两类:

RabbitSub特性是RabbitMQ消息队列订阅器。

RabbitSub特性有两个构造函数,常用的是这个:

public RabbitSubAttribute(string group, string exchange, string queue, int queueCount = 1)

示例:

[RabbitSub("Core", "Account", "account")]
public sealed class AccountCoreHandler : SubHandler<string, MessageInfo>
{
    ……
}

RabbitSub可以单独使用,用于订阅消息。


CoreHandler消息订阅器

Ray中的ESActor通过RaiseEvent方法发布事件,传递消息。Ray默认使用RabbitMQ传递消息。ESActor发起事件后,CoreHandler订阅事件,以处理事件。

实现方式是:

  1. SQLToReadHandler

ESActor发起事件后,X-ToReadHandler订阅事件,以处理事件。X-ToReadHandler继承自X-SQLToReadHandler,X-SQLToReadHandler继承自ToReadHandler。

X-SQLToReadHandler需要使用者继承PartSubHandler,根据使用的关系型数据库自己实现。Ray默认提供了PostgreSQL的PSQLToReadHandler。如果使用的是MySQL、SQL Server等其他关系型数据库,请自定义实现。

X-SQLToReadHandler实现细节:
修改对应关系型数据库的Integrity Constraint Violation(违反完整性约束)的异常。
可以将实例中PSQLToReadHandler当做X-ToReadHandler模板,修改if (!(t.Exception.InnerException is Npgsql.PostgresException e && e.SqlState == "23505"))即可。


说明:

当X-ToReadHandler订阅消息,消息有重放的场景,如果该消息已经得到处理,数据库中已经存在其处理后的结果,这是可能会报Integrity Constraint Violation(违反完整性约束)异常,默认不做处理,其他异常将其抛出,这是这段代码的作用。


示例模板:

public abstract class PSQLToReadHandler<K> : PartSubHandler<K, MessageInfo>
{
    public PSQLToReadHandler(IServiceProvider svProvider) : base(svProvider)
    { }
    public override Task Notice(byte[] data)
    {
        return base.Notice(data).ContinueWith(t =>
        {
            if (t.Exception != null)
            {
                //根据使用数据库,修改这个if判断
                if (!(t.Exception.InnerException is Npgsql.PostgresException e && e.SqlState == "23505"))
                {
                    throw t.Exception;
                }
            }
        });
    }
}

  2. X-ToReadHandler
X-ToReadHandler订阅器主要用于订阅感兴趣的消息,将数据写入到数据库中。

实现方式是:

[RabbitSub("Read", "Account", "account")]
public sealed class AccountToReadHandler : PSQLToReadHandler<string>
{
    public AccountToReadHandler(IServiceProvider svProvider) : base(svProvider)
    {
        Register<AmountAddEvent>();
        Register<AmountTransferEvent>();
    }
}
X-ToReadHandler消息订阅器与CoreHandler消息订阅器差异

X-ToReadHandler消息订阅器使用时,需要在构造函数中注册关心的事件,而X-CoreHandler中不需要,原因是事件在处理中需要反序列化,X-CoreHandler会对RabbitSub参数指定订阅的所有的消息反序列化,X-ToReadHandler在此基础上做了进一步的控制,在订阅的消息中只对Register的事件处理。这样做的原因:1.反序列化会消耗一定的性能,进一步控制有助于提高性能;2.Ray提供两种实现方式,为开发者扩展自定义源码提供借鉴。

到此,相信大家对“Ray-Handler消息订阅器编写方法是什么”有了更深的了解,不妨来实际操作一番吧!这里是亿速云网站,更多相关内容可以进入相关频道进行查询,关注我们,继续学习!

推荐阅读:
  1. 如何用MQTT协议实现消息的订阅接收?
  2. python rabbitmq消息发布订阅

免责声明:本站发布的内容(图片、视频和文字)以原创、转载和分享为主,文章观点不代表本网站立场,如果涉及侵权请联系站长邮箱:is@yisu.com进行举报,并提供相关证据,一经查实,将立刻删除涉嫌侵权内容。

handler

上一篇:kubernetes中Service Account与Secret的示例分析

下一篇:linux中如何删除用户组

相关阅读

您好,登录后才能下订单哦!

密码登录
登录注册
其他方式登录
点击 登录注册 即表示同意《亿速云用户服务条款》