Message Queue Selector如何实现顺序消费

发布时间:2021-10-19 20:31:31 作者:柒染
来源:亿速云 阅读:248

Message Queue Selector如何实现顺序消费,相信很多没有经验的人对此束手无策,为此本文总结了问题出现的原因和解决方法,通过这篇文章希望你能解决这个问题。

顺序消息的定义:

顺序消息是指消息的消费顺序和生产顺序相同,在某些场景下,必须保证顺序消息。比如订单的生成、付款、发货.顺序消息又分为全局顺序消息和部分顺序消息,全局顺序消息指某一个topic下的所有消息都要保证顺序;部分顺序消息只要保证某一组消息被顺序消费。对于订单消息来说,只要保证同一个订单ID的生成、付款、发货消息按照顺序消费即可。

部分顺序消费实现原理:

1. 发送端:保证相同订单ID的各种消息发往同一个MessageQueue(同一个Topic下的某一个queue)

2.消费端:保证同一个MessageQueue里面的消息不被并发处理 (同一个Topic的不同MessageQueue是可以同时消费的)

        DefaultMQProducer producer = new DefaultMQProducer("local-test-producer");
		producer.setNamesrvAddr("10.76.0.38:9876");
		producer.start();
		for (int i = 0; i < 1000; i++) {
			Order order  = new Order();
			order.orderId = i;
			order.status = "生成";

			Message msg1 = new Message("local-test-producer",
					"TagA",
					JsonUtils.toJson(order).getBytes()
			);
			SendResult sendResult1 = producer.send(msg1, new MessageQueueSelector() {
				@Override
				public MessageQueue select(List<MessageQueue> mqs, Message msg, Object arg) {
					return null;
				}
			}, order.orderId);
			log.info("sendResult1={}",sendResult1);
			Uninterruptibles.sleepUninterruptibly(3, TimeUnit.SECONDS);



			order.status="付款";

			Message msg2 = new Message("local-test-producer",
					"TagA",
					JsonUtils.toJson(order).getBytes()
			);
			SendResult sendResult2 = producer.send(msg2, new MessageQueueSelector() {
				@Override
				public MessageQueue select(List<MessageQueue> mqs, Message msg, Object arg) {
					return null;
				}
			}, order.orderId);
			log.info("sendResult2={}",sendResult2);
			Uninterruptibles.sleepUninterruptibly(3, TimeUnit.SECONDS);



			order.status="发货";
			Message msg3 = new Message("local-test-producer",
					"TagA",
					JsonUtils.toJson(order).getBytes()
			);
			producer.send(msg2, new MessageQueueSelector() {
				@Override
				public MessageQueue select(List<MessageQueue> mqs, Message msg, Object arg) {
					return null;
				}
			}, order.orderId);
			Uninterruptibles.sleepUninterruptibly(3, TimeUnit.SECONDS);


			SendResult sendResult3 = producer.send(msg3, new MessageQueueSelector() {
				@Override
				public MessageQueue select(List<MessageQueue> mqs, Message msg, Object arg) {
					Integer id = (Integer) arg;
					int index = id % mqs.size();
					return mqs.get(index);
				}
				//MessageQueueSelector保证同一个orderId的消息都存储在同一个MessageQueue。
			}, order.orderId);
			log.info("sendResult3={}",sendResult1);
		}

消费端主要逻辑如下,主要MessageListenerOrderly回调实现同一个MessageQueue里面的消息不会被并发消费:

       //同一个MessageQueue里面的消息要顺序消费,不能并发消费。
		//但是同一个Topic的不同MessageQueue是可以同时消费的
		DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("local-test-consumer2");
		consumer.setNamesrvAddr("10.76.0.38:9876");
		consumer.subscribe("test", "");
		consumer.setPullBatchSize(1);
		consumer.setConsumeThreadMin(1);
		consumer.setConsumeThreadMax(1);
	//	consumer.registerMessageListener(new MessageListenerConcurrently() {
		consumer.registerMessageListener(new MessageListenerOrderly() {
			@Override
			public ConsumeOrderlyStatus consumeMessage(List<MessageExt> msgs, ConsumeOrderlyContext context) {
				List<String> messages = new ArrayList<>();

				for (MessageExt msg : msgs) {
					messages.add(new String(msg.getBody()) +"\tbroker:"+msg.getStoreHost());
				}
				System.out.printf("%s Receive New Messages: %s %n", Thread.currentThread().getName(), messages);
				return ConsumeOrderlyStatus.SUCCESS;
			}
		});
		consumer.start();
		Thread.currentThread().join();

源码分析:

我们知道在RocketMQ中是可以给一个消费者实例设置多个线程并发消费的. consumer.setConsumeThreadMin 和 setConsumeThreadMax,

那MessageListenerOrderly是如何保证某一个时刻,只有一个消费者的某一个线程在消费某一个MessageQueue的呢?

就在Client模块的 ConsumeMessageOrderlyService里面,消费者端并不是简单的禁止并发处理,而是给每一个Consumer Queue加锁,

private final MessageQueueLock messageQueueLock = new MessageQueueLock();

在消费每个消息之前,需要先获取这个消息对应的Consumer Queue所对应的锁,保证同一个Consumer Queue的消息不会被并发消费,但是不同的Consumer Queue的消息是可以并发处理的。

看完上述内容,你们掌握Message Queue Selector如何实现顺序消费的方法了吗?如果还想学到更多技能或想了解更多相关内容,欢迎关注亿速云行业资讯频道,感谢各位的阅读!

推荐阅读:
  1. Swfupload Error Code: -200 Message: 500
  2. 了解Handler,Looper, MessageQueue,Message的工作流程

免责声明:本站发布的内容(图片、视频和文字)以原创、转载和分享为主,文章观点不代表本网站立场,如果涉及侵权请联系站长邮箱:is@yisu.com进行举报,并提供相关证据,一经查实,将立刻删除涉嫌侵权内容。

message queue selector

上一篇:微信公众号开发中如何进行静默授权登录

下一篇:Apache Shiro怎么入门

相关阅读

您好,登录后才能下订单哦!

密码登录
登录注册
其他方式登录
点击 登录注册 即表示同意《亿速云用户服务条款》