RocketMQ顺序消息是什么意思

发布时间:2021-07-08 17:17:02 作者:chen
来源:亿速云 阅读:321

这篇文章主要介绍“RocketMQ顺序消息是什么意思”,在日常操作中,相信很多人在RocketMQ顺序消息是什么意思问题上存在疑惑,小编查阅了各式资料,整理出简单好用的操作方法,希望对大家解答”RocketMQ顺序消息是什么意思”的疑惑有所帮助!接下来,请跟着小编一起来学习吧!

我们知道消息队列的特性导致其消息不是顺序进行消费的,RocketMQ没有提供所谓的顺序消息来供我们使用,但是有时候一些场景需要需要顺序的去接收消息。今天我们重点讨论一下如何实现这种功能。虽然RocketMQ没有提供顺序消费但是我们可以变相的来实现它。我们知道消息需要放入队列中才能被消费,而队列本身的特性就是FIFO先进先出,我们可以将需要顺序的消息放入一个队列中,则就可以实现这个功能。

1、场景分析

场景:两个业务系统之间消息通过MQ传输,业务系统A数据传输至业务系统B,要求消息准确、实时。但是业务系统A的原始的数据可能会存在修改的情况,要求业务系统B需要实时的更改。保证消息的实时性、一致性、可靠性。

1.1、默认消息生产过程分析

RocketMQ顺序消息是什么意思

主题test_1发送消息到RocketMQ的双主Broker1、Broker2上,每个broker上test_1主题对应4个队列,消息id为001001的消息存在创建(create)、更新(update),MQ集群是双主的,使用默认的消息发送算法,消息将轮询的丢弃到各个队列中。

默认按照轮询算法将消息分发到各个broker的不同的队列中,保证每个队列的消息都是均匀分配,集群消费且消费者多个时,多个消费者会分散到不同的队列中消费消息,保证消息能够实时消费。

因为消息本身是放入到不同的队列中消费的就不能保证其顺序性,更新的消息可能是最先被消费掉,创建的消息消费时业务需要判断消息是否是最新的,需要进行查库验证,是则更新,不是则丢弃保存最新的消息,保证业务系统A与业务系统B,数据的一致性,增加了业务处理的难度。

1.2、顺序消息生产过程分析

RocketMQ顺序消息是什么意思

我们在生产消息的时候可以将同一个消息ID的消息放入到相同的队列中,保证同一类需要顺序消费的消息放入到同一个队列中,这样队列中的消息就是有序的。但是同时也需要保证消息的消费也是有序的才可以保证消息的顺序消费。

集群模式下同一个消费组内的消费者共同承担其订阅主题下的消息队列的消费,同一个消息消息队列在同一时刻只会被消费组内的一个消费者消费,一个消费者同一时刻可以分配多个消费队列。

集群模式下的普通消息,线程池默认创建20个(可配置)线程。多线程从队列中拉取消息,提高并发加快消息的消费。

集群模式下的顺序消息,顺序消费是单线程,一个线程只能去一个队列获取数据,当需要获取某个队列中的消息时,需要锁定该消息队列(PS:后面会根据源码详细分析其原理)。

广播模式下的顺序消息,顺序消费是单线程,直接进行消费,无需锁定消息队列,因为相互之间无竞争

2、编写Producer

2.1、自定义队列选择器

public static void main(String[] args) throws Exception {
	//Instantiate with a producer group name.
	DefaultMQProducer producer = new DefaultMQProducer("order_group_test_1");
	//Launch the instance.
	producer.setNamesrvAddr("10.10.12.203:9876;10.10.12.204:9876");
	producer.start();
	String[] tags = new String[] {"TagA", "TagB", "TagC", "TagD", "TagE"};
	for (int i = 0; i < 100; i++) {
		int orderId = i % 10;
		//Create a message instance, specifying topic, tag and message body.
		Message msg = new Message("order_test_1", tags[i % tags.length], "KEY" + i,
				("Hello RocketMQ " + i).getBytes(RemotingHelper.DEFAULT_CHARSET));
		SendResult sendResult = producer.send(msg, new MessageQueueSelector() {
			@Override
			public MessageQueue select(List<MessageQueue> mqs, Message msg, Object arg) {
				Integer id = (Integer) arg;
				int index = id % mqs.size();
				return mqs.get(index);
			}
		}, orderId);

		System.out.printf("%s%n", sendResult);
	}
	//server shutdown
	producer.shutdown();
}

我们根据100个消息的序号来放入到不同的队列中,根据序号%10取模,相同的放入到一个队列中。

2.2、提供的队列选择器

上面是我们实现自定义队列选择器的算法,RocketMQ也提供了三种队列选择算法

RocketMQ顺序消息是什么意思

从图中我们可以看到一共三种

我们分别来看一下实现

2.2.1、SelectMessageQueueByHash

public class SelectMessageQueueByHash implements MessageQueueSelector {
	public MessageQueue select(List<MessageQueue> mqs, Message msg, Object arg) {
		int value = arg.hashCode();
		if (value < 0) {
			value = Math.abs(value);
		}
		value %= mqs.size();
		return ((MessageQueue) mqs.get(value));
	}
}

我们差看源码可以发现,通过提供的参数获取其HashCode,如果为负值则取绝对值,hash值与队列的总数进行取模获取其队列。

2.2.2、SelectMessageQueueByRandom

public class SelectMessageQueueByRandom implements MessageQueueSelector {
	private Random random;

	public SelectMessageQueueByRandom() {
		this.random = new Random(System.currentTimeMillis());
	}
	
	public MessageQueue select(List<MessageQueue> mqs, Message msg, Object arg) {
		int value = this.random.nextInt(mqs.size());
		return ((MessageQueue) mqs.get(value));
	}
}

生成一个队列数以内的随机数,通过随机数获取队列。

2.2.3、SelectMessageQueueByMachineRoom(未实现)

public class SelectMessageQueueByMachineRoom implements MessageQueueSelector {
	private Set<String> consumeridcs;

	public MessageQueue select(List<MessageQueue> mqs, Message msg, Object arg) {
		return null;
	}

	public Set<String> getConsumeridcs() {
		return this.consumeridcs;
	}

	public void setConsumeridcs(Set<String> consumeridcs) {
		this.consumeridcs = consumeridcs;
	}
}

我们发现其select方法为null,其实是没有进行实现。需要我们自己实现。

虽然RocketMQ提供了三种(其实2种,SelectMessageQueueByMachineRoom未实现)队列选择算法,但是不建议使用,不同的业务规则其选择队列的算法也不尽相同,建议手动实现。

3、编写Consumer

public static void main(String[] args){
	try {
		DefaultMQPushConsumer consumer = new DefaultMQPushConsumer();
		consumer.setConsumerGroup("order_consumer_test_push");
		consumer.setNamesrvAddr("10.10.12.203:9876;10.10.12.204:9876");
		consumer.subscribe("order_test_1", "*");
		consumer.registerMessageListener(new MessageListenerOrderly(){

			@Override
			public ConsumeOrderlyStatus consumeMessage(List<MessageExt> paramList,
					ConsumeOrderlyContext paramConsumeOrderlyContext) {
				try {
					for(MessageExt msg : paramList){
						String msgbody = new String(msg.getBody(), "utf-8");
						System.out.println("  MessageBody: "+ msgbody);//输出消息内容
					}
				} catch (Exception e) {
					e.printStackTrace();
					return ConsumeOrderlyStatus.SUSPEND_CURRENT_QUEUE_A_MOMENT;
				}
				return ConsumeOrderlyStatus.SUCCESS; //消费成功
			}
		});
		consumer.start();
	} catch (Exception e) {
		// TODO Auto-generated catch block
		e.printStackTrace();
	}
}

查看结果

RocketMQ顺序消息是什么意思

我们找到两个典型的一组数字,尾号是6和9的,尾号是6的计较集中,尾号是9的比较分散,但是结果都是一样的,按照顺序消费的。

到此,关于“RocketMQ顺序消息是什么意思”的学习就结束了,希望能够解决大家的疑惑。理论与实践的搭配能更好的帮助大家学习,快去试试吧!若想继续学习更多相关知识,请继续关注亿速云网站,小编会继续努力为大家带来更多实用的文章!

推荐阅读:
  1. RocketMQ事务消息如何实现
  2. RocketMQ事务消息怎样实现

免责声明:本站发布的内容(图片、视频和文字)以原创、转载和分享为主,文章观点不代表本网站立场,如果涉及侵权请联系站长邮箱:is@yisu.com进行举报,并提供相关证据,一经查实,将立刻删除涉嫌侵权内容。

rocketmq

上一篇:Springboot中怎么使用shiro和jwt实现前后端分离

下一篇:RocketMQ的安装部署方式

相关阅读

您好,登录后才能下订单哦!

密码登录
登录注册
其他方式登录
点击 登录注册 即表示同意《亿速云用户服务条款》