pushConsumer拉取消息流程是怎样的

发布时间:2021-12-18 14:49:42 作者:iii
来源:亿速云 阅读:143

这篇文章主要介绍“pushConsumer拉取消息流程是怎样的”,在日常操作中,相信很多人在pushConsumer拉取消息流程是怎样的问题上存在疑惑,小编查阅了各式资料,整理出简单好用的操作方法,希望对大家解答”pushConsumer拉取消息流程是怎样的”的疑惑有所帮助!接下来,请跟着小编一起来学习吧!

这是一段RocketMq经典的consumer异步获取broker消息的代码:

    DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("consumerGroup");    consumer.setNamesrvAddr(Constants.NameServerAddr);    consumer.subscribe("topic01","*");    consumer.setMessageModel(MessageModel.BROADCASTING);//广播消息,所有相同组,定于topic的消费端都能收到消息    //consumer.setMessageModel(MessageModel.CLUSTERING);//集群消息--默认(相同组内的topic,集群消息只有一端会接收到)    consumer.registerMessageListener(new MessageListenerConcurrently(){      @Override      public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> list,          ConsumeConcurrentlyContext consumeConcurrentlyContext) {        for (MessageExt messageExt:list){          System.out.println(new java.lang.String(messageExt.getBody()));        }        return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;      }    });    consumer.start();  }

consumer start()方法跟踪

  1. this.defaultMQPushConsumerImpl.start();

  2. 刚启动serviceState状态为 CREATE_JUST,进入这个状态的switch处理逻辑

3. 先用checkCoing()检查consumer的各个配置是否配置ok

4. 然后 copySubscription()用于根据subject构建本地的rebalance的conhurrentHashMapInner

5. 接着构建MqClientFactory的一个Instance

6. 构建PullWrapper,用于去Broker注册过滤消息

7. 再根据MessageMode是广播模式还是集群模式获取offset。(广播模式是从consumer本地的store获取,集群模式则是需要去broker去请求获取)

8. 根据监听消息的类型是OrderLy还是Concurrently去构建一个consumeMessageService对象

9.启动刚才创建的consumerMessageService对象,调用其start方法

10. 使用MqClientFactory Instance实例registerConsumer进行注册

11. 把当前的serviceState状态变为Running状态

12.然后就开始从broker获取消息,请看下面的pushConsumer拉取消息流程

pushConsumer拉取消息流程介绍 

consumer  --DefaultMqPushConsumerImpl 使用pullMessage(pullRequest)拉取消息,pullAPIWrapper.pullKernelImpl(传递pullReuest,回调callback等参数)根据是否同步pullMessageSync还是异步pullMessageAsync, 拉取回来的消息PullResult经过解析处理存放到ProcessQueue 队列里的TreeMap(offset,messageExt)

到此,关于“pushConsumer拉取消息流程是怎样的”的学习就结束了,希望能够解决大家的疑惑。理论与实践的搭配能更好的帮助大家学习,快去试试吧!若想继续学习更多相关知识,请继续关注亿速云网站,小编会继续努力为大家带来更多实用的文章!

推荐阅读:
  1. Docker如何拉取镜像
  2. RPC的消息交互流程是怎样的

免责声明:本站发布的内容(图片、视频和文字)以原创、转载和分享为主,文章观点不代表本网站立场,如果涉及侵权请联系站长邮箱:is@yisu.com进行举报,并提供相关证据,一经查实,将立刻删除涉嫌侵权内容。

上一篇:综合三个Bug怎样实现Discord桌面应用RCE漏洞

下一篇:如何进行springboot配置templates直接访问的实现

相关阅读

您好,登录后才能下订单哦!

密码登录
登录注册
其他方式登录
点击 登录注册 即表示同意《亿速云用户服务条款》