您好,登录后才能下订单哦!
本篇内容主要讲解“怎么实现Java异步延迟消息队列”,感兴趣的朋友不妨来看看。本文介绍的方法操作简单快捷,实用性强。下面就让小编来带大家学习“怎么实现Java异步延迟消息队列”吧!
系统在收到一个请求后,完整链路同步顺序调用,实现起来简单易懂,这也是所有功能在实现时最初选择的方案。这种方案实现起来简单,在并发量不高且调用链路不长的情况下,是最好的选择方案,因为简单所以不容易出错和容易维护。如图所示,一个请求按照1-2-3-4的顺序。
优点:实现简单易懂、易维护。
缺点:并发量不高。
由于系统是账户系统,每天的交易量大概在50-100w左右,且大部分交易集中在某个时间点,这就致使系统必须要支持高并发,异步方案也是该系统在最早设计时就已经选择好了。异步化后的请求必须要保证成功且需要有持久化的能力,基于此选择了消息中间件,在对比了几款中间件后,发现rabbitmq比较符合现有的业务。
如图所示,同颜色的线属于一个同步流程,不同线的表示异步,流程讲解如下:
主流程1->2->3:应用端发起交易,账户系统走完主流程后,将调用外部系统的请求放入MQ消息队列,则直接响应应用端成功,应用端不关心后续异步操作
MQ通知①:MQ服务器收到消息后,回调账户系统,通知已收到消息。
消息消费⑴->⑵->⑶:监听到队列有新消息,调用外部系统,完成请求。
优点:快速响应应用端,提高并发量。
缺点:在主流程的第2步,可能由于网络原因导致MQ没收到消息,造成主流程成功响应应用端成功,但是由于消息丢失造成后续的异步处理失败。
基于方案2的缺点,为了避免由于网络异常造成的发送消息成功但实际MQ没收到消息的情况,增加了一种发送成功后保证MQ能100%收到消息的机制,即使MQ宕机也能知道哪些消息是MQ没收到的。
如图所示,与方案2的对比,不同的是增加了缓存标志,即在发送消息到MQ前,先将该消息缓存到redis作为一个标志(如 2 保存标志),表明发送进行中,等到MQ收到消息并回调通知成功时才将该标志删除(如 ② 删除标志);
增加了定时任务:
轮询redis里的消息,若指定时间内未收到通知,则重新发送该消息。该方案可能会造成重复发消息,所以在消费端需要做幂等控制
删除已成功但未收到通知的重复标志
基于方案3,基本能保证不会受网络异常的影响导致消息丢失的情况出现,至此,发送端的保证已经完成,但是消费端还有些不理想。
正常情况下,消费者在消费完消息后,会通知MQ告知已经消费成功,MQ收到后则从队列删除消息。如果告知消费失败,则该消息会重新回到队列重新被消费者监听并且获取。对应到RabbitMQ有以下三种情况:
ACK:成功消费消息,MQ删除消息
NO_ACK:消费消息失败,消息重新队列,等待后续重新被消费
Rejected:消费消息失败,MQ删除该消息,如果队列存在死信队列的话,则将该消息移到死信队列,相当于垃圾箱,不会被重新消费。
基于以上三种情况,如果消息消费失败时,希望的是消息重回队列,隔一段时间后再被消费,也就是消息具备延迟的效果,但是找遍了官网,发现不支持延迟的机制。如果不延迟消费,那么消息一回到队列又会马上被消费,如果外部系统在一段时间内没有修复,那么在这段时间内的重复消费都属于无效重试且浪费性能。
脑壳疼ing。。。下班回家在地铁上头脑风暴时,终于灵机一动,联想到了死信队列的一个功能,那就是可以设置消息在指定时间内没被消费的话,就认定为是死亡消息,则该消息会被转到对应的死信队列。比如,正常队列A,B作为A的死信队列,设置A队列的消息的死亡时间为n秒,如果n秒内没被消费,则会自动转移到B队列。如图当时马上在备忘录记下来。
如图所示,在一个消息消费失败后的做法如下:
消息消费失败发送no_ack,让消息回到队列,并记录失败次数
重复消费失败超过三次后,发送rejected,让消息转移到死信队列B
由于死信队列B无消费者,所以消息在n秒后会转移到死信队列C(在这一步起到延迟的效果)
队列C的消费者消费死亡消息,将消息重新发送到正常队列A
基于以上的最终方案,在测试同事的压测下,大概500TPS/秒,不过没有模拟数据库方面的瓶颈(往数据库插入一定量级的数据)。
由于代码跟项目有关,所以就暂时不发源码,等我把公司业务相关的移除掉后,再基于最终方案做个demo发到git上。
后续会进行优化,比如:
顺序消费
让需要顺序消费的消息发往同个队列(取模的方式等等),每条队列只有一个消费者。单个消费者可能会有性能问题,可在消费者应用程序里弄内存队列再进行并发消费。
消费堆积
消费者宕机造成堆积:消费者宕机了,由于生产者还在生产消息,经过一段时间后就会堆积海量的消息,如果MQ磁盘有限,即使消费者恢复后如果不能快速消费的话,可能会使MQ服务器磁盘爆满。
消费者虽然恢复了,但是一时间堆积了海量的数据,消费完需要一定的时间。为了不对正常的MQ造成影响,一种解决方案是先快速消费调堆积的消息,但原来的消费者是带有逻辑的,处理完一条消息可能需要200ms左右,所以为了快速消费,就先启动一个临时消费者,只做转发逻辑大概消费一条消息只需要10ms。临时消费者将消息转发到一个临时MQ,接着再启动n个原来带逻辑的消费者去消费,这样可以达到快速消费堆积消息的效果。
生产快于消费导致:供过于求本身应该尽量优化消费端,找出原因,如果消费者本身就是慢于生产者的话,那这样只能增加消费者数量了。
持久化带来的性能影响
同步持久化:生产消息后保存到磁盘才返回给生产者成功,安全性高不会丢失消息,性能不高
异步持久化:性能高,会丢失数据
高可用
集群模式:其他节点保存某台数据节点的队列元数据,当消费者消费数据时,其他节点到实际保存消费的节点拉取数据。
优点:避免所有的生产者都往同个MQ写数据,
缺点:做不到分布式的效果,一个生产者只能往一个MQ写,其他的消费者若要消费的话还是要到原来的MQ上去拉取消息
镜像集群模式:所有节点都保存同一份数据
分布式:数据分发在各个节点
到此,相信大家对“怎么实现Java异步延迟消息队列”有了更深的了解,不妨来实际操作一番吧!这里是亿速云网站,更多相关内容可以进入相关频道进行查询,关注我们,继续学习!
免责声明:本站发布的内容(图片、视频和文字)以原创、转载和分享为主,文章观点不代表本网站立场,如果涉及侵权请联系站长邮箱:is@yisu.com进行举报,并提供相关证据,一经查实,将立刻删除涉嫌侵权内容。