如何进行Apache Pulsar 延迟消息投递

发布时间:2022-01-18 14:38:08 作者:柒染
来源:亿速云 阅读:218

如何进行Apache Pulsar 延迟消息投递,相信很多没有经验的人对此束手无策,为此本文总结了问题出现的原因和解决方法,通过这篇文章希望你能解决这个问题。

导语

Apache Pulsar 是一个多租户、高性能的服务间消息传输解决方案,支持多租户、低延时、读写分离、跨地域复制、快速扩容、灵活容错等特性。腾讯数据平台部 MQ 团队对 Pulsar 做了深入调研以及大量的性能和稳定性方面优化,目前已经在腾讯云消息队列 TDMQ 落地上线。本文主要介绍Pulsar延迟消息投递的实现,希望与大家一同交流。

 

一、什么是延迟消息投递


延迟消息投递在MQ应用场景中十分普遍,它是指消息在发送到 MQ 服务端后并不会立马投递,而是根据消息中的属性延迟固定时间后才投递给消费者,一般分为定时消息和延迟消息两种:

目前在业界,腾讯云的 CMQ 和阿里云的 RocketMQ 也都支持延迟消息投递:

开源的 NSQ、RabbitMQ、ActiveMQ 和 Pulsar 也都内置了延迟消息的处理能力。虽然每个 MQ 项目的使用和实现方式不同,但核心实现思路都一样:Producer 将一个延迟消息发送到某个 Topic 中,Broker 将延迟消息放到临时存储进行暂存,延迟跟踪服务(Delayed Tracker Service)会检查消息是否到期,将到期的消息进行投递

如何进行Apache Pulsar 延迟消息投递


二、延迟消息投递的使用场景

延迟消息投递是要暂缓对当前消息的处理,在未来的某个时间点再触发投递,实际的应用场景非常多,比如异常检测重试、订单超时取消、预约提醒等。

TDMQ 最近就有个使用 Pulsar 延迟消息的 Case:业务要对两套系统的日志消息进行关联,其中一套系统由于查询 Hbase 可能会超时或失败,需要将失败的关联任务在集群空闲的时候再次调度。


三、如何使用Pulsar延迟消息投递

Pulsar 最早是在 2.4.0 引入了延迟消息投递的特性,在 Pulsar 中使用延迟消息,可以精确指定延迟投递的时间,有 deliverAfter 和 deliverAt 两种方式。其中 deliverAt 可以指定具体的时间戳;deliverAfter 可以指定在当前多长时间后执行。两种方式的本质是一样的,Client 会计算出时间戳送到 Broker。

1. deliverAfter发送


  
  
  
producer.newMessage()        .deliverAfter(long time, TimeUnit unit)        .send();
     


2. deliverAt发送

producer.newMessage()        .deliverAt(long timestamp)        .send();

在 Pulsar 中,可以支持跨度很大的延时消息,比方说一个月、半年;同时在一个 Topic 里,既支持延时消息,也支持非延时消息。下图展示了 Pulsar 中延迟消息的具体过程:

如何进行Apache Pulsar 延迟消息投递

producer 发送的 m1/m3/m4/m5 有不同的延迟时间,m2 是不需要延迟投递的正常消息,consumer 消费时会根据不同的延迟时间进行 ack。


四、Pulsar延迟消息投递实现原理

从上面的使用方式可以看出,Pulsar 支持的是秒级精度的延迟消息投递,不同于开源 RocketMQ 支持固定时间 level 的延迟。

如何进行Apache Pulsar 延迟消息投递

Pulsar 实现延迟消息投递的方式比较简单,所有延迟投递的消息会被 Delayed Message Tracker 记录对应的 index。index 是由 timestamp | LedgerID | EntryID 三部分组成,其中 LedgerID | EntryID 用于定位该消息,timestamp 除了记录需要投递的时间,还用于 delayed index 优先级队列排序。

Delayed Message Tracker 在堆外内存维护着一个 delayed index 优先级队列,根据延迟时间进行堆排序,延迟时间最短的会放在头上,时间越长越靠后。consumer 在消费时,会先去 Delayed Message Tracker 检查,是否有到期需要投递的消息,如果有到期的消息,则从 Tracker 中拿出对应的 index,找到对应的消息进行消费;如果没有到期的消息,则直接消费正常的消息。

如果集群出现 Broker 宕机或者 topic 的 ownership 转移,Pulsar 会重建 delayed index 队列,来保证延迟投递的消息能够正常工作。


五、Pulsar延迟消息投递面临的挑战

从 Pulsar 的延迟消息投递实现原理可以看出,该方法简单高效,对 Pulsar 内核侵入性较小,可以支持到任意时间的延迟消息。但同时发现,Pulsar 的实现方案无法支持大规模使用延迟消息,主要有以下两个原因:

1. delayed index队列受到内存限制

一条延迟消息的 delayed index 由三个 long 组成,对于小规模的延迟消息来说,内存开销并不大。但由于 index 队列是 subscription 级别,对于 topic 的同一个 partition 来说,有多少个 subscription 就需要维护多少个 index 队列;同时,由于延迟消息越多、延迟的时间越长,index 队列内存占用也会更多。

2. delayed index队列重建时间开销

上面有提到,如果集群出现 Broker 宕机或者 topic 的 ownership 转移,Pulsar 会重建 delayed index 队列。对于跨度时间长的大规模延迟消息,重建时间可能会到小时级别。为了减小 delayed index 队列重建时间,虽然可以给 topic 分更多的 partition 提高重建的并发度,但没有彻底解决重建时间开销问题。


六、Pulsar延迟消息投递未来工作

Pulsar 目前的延迟消息投递方案简单高效,但处理大规模延迟消息时仍然存在风险。关于延迟消息投递,社区和数据平台部 MQ 团队下一步将聚焦在支持大规模延迟消息。目前讨论的方案是在 delayed index 队列加入时间分区,Broker 只加载当前较近的时间片 delayed index 到内存,其余时间片分区持久化磁盘,示例图如下图所示:

如何进行Apache Pulsar 延迟消息投递

上图中,我们按 5 分钟的间隔对 delayed index 队列进行分区,m5 和 m1 放在了 time partition 1,由于延迟时间最近,放在了内存;m4 和 m3 在 time partition 2,延迟时间比较靠后,index 存储在了磁盘。该方案不仅可以减少 delayed index 队列重建时间开销,还可以降低对内存的依赖。

看完上述内容,你们掌握如何进行Apache Pulsar 延迟消息投递的方法了吗?如果还想学到更多技能或想了解更多相关内容,欢迎关注亿速云行业资讯频道,感谢各位的阅读!

推荐阅读:
  1. 怎么使用Apache Pulsar Functions进行简单事件处理
  2. 如何进行Apache Pulsar 与 Apache Kafka 在金融场景下的性能对比分析

免责声明:本站发布的内容(图片、视频和文字)以原创、转载和分享为主,文章观点不代表本网站立场,如果涉及侵权请联系站长邮箱:is@yisu.com进行举报,并提供相关证据,一经查实,将立刻删除涉嫌侵权内容。

pulsar

上一篇:关于sqlmap的问题怎么分析

下一篇:Python的三大开源数据分析工具是什么

相关阅读

您好,登录后才能下订单哦!

密码登录
登录注册
其他方式登录
点击 登录注册 即表示同意《亿速云用户服务条款》