您好,登录后才能下订单哦!
今天小编给大家分享一下RocketMQ事务消息怎么保证消息的可靠性和一致性的相关知识点,内容详细,逻辑清晰,相信大部分人都还太了解这方面的知识,所以分享这篇文章给大家参考一下,希望大家阅读完这篇文章后有所收获,下面我们一起来了解一下吧。
在发送事务消息的时候,会加一个标识,表示这个消息是事务消息。broker接收到消息后,在我们之前看的代码里org.apache.rocketmq.broker.processor.SendMessageProcessor#sendMessage会判断是否是事务消息。
if (sendTransactionPrepareMessage) { asyncPutMessageFuture = this.brokerController.getTransactionalMessageService().asyncPrepareMessage(msgInner); } else { asyncPutMessageFuture = this.brokerController.getMessageStore().asyncPutMessage(msgInner); }
sendTransactionPrepareMessage=true表示是事务消息,所以走了一个单独的逻辑。
public CompletableFuture<PutMessageResult> asyncPutHalfMessage(MessageExtBrokerInner messageInner) { return store.asyncPutMessage(parseHalfMessageInner(messageInner)); }
这里parseHalfMessageInner这个方法里面开始了偷梁换柱,把topic和queueId都改了,把原本的信息先存在变量里面。所以实际上这个消息发到了半消息专有的topic里面,topic名字叫做RMQ_SYS_TRANS_HALF_TOPIC
private MessageExtBrokerInner parseHalfMessageInner(MessageExtBrokerInner msgInner) { MessageAccessor.putProperty(msgInner, MessageConst.PROPERTY_REAL_TOPIC, msgInner.getTopic()); MessageAccessor.putProperty(msgInner, MessageConst.PROPERTY_REAL_QUEUE_ID, String.valueOf(msgInner.getQueueId())); msgInner.setSysFlag( MessageSysFlag.resetTransactionValue(msgInner.getSysFlag(), MessageSysFlag.TRANSACTION_NOT_TYPE)); msgInner.setTopic(TransactionalMessageUtil.buildHalfTopic()); msgInner.setQueueId(0); msgInner.setPropertiesString(MessageDecoder.messageProperties2String(msgInner.getProperties())); return msgInner; }
然后其他代码还是和普通的消息一样,就是把事务消息做了转发,存在了RMQ_SYS_TRANS_HALF_TOPIC里面。
到这里发送半消息就成功了,然后最后客户端发送了半消息之后,会查一下本地事务的情况是否完成。这里有3种情况:commit、rollback、未知。完成和回滚都是确认的状态,这个比较好处理,比较难的是未知。我们先看能得到确认结果的情况。
如果完成和回滚,会给客户端发送结束事务的消息,这个消息叫END_TRANSACTION,包括消息里面包括了之前发送的半消息的id和offset。
broker处理的代码在org.apache.rocketmq.broker.processor.EndTransactionProcessor#processRequest中。就是根据offset拿到半消息,然后如果是commit,就是把原本的topic和queueId还原,发到原本的队列里面,这样就可以正常消费了。然后把这个半消息“删除”。如果是rollBack,也是拿到这个半消息,然后直接“删除”就可以了。接下来看一下怎么“删除”。
为什么我删除会打引号呢?因为半消息其实就是跟正常的消息一样,存在commitLog文件里面,mq的设计,就没有删除这个功能。所以所谓的删除其实就是把这个消息消费掉,不做任何处理,就是删除了。
想象一下,这个半消息有commit/rollBack/未知,3种状态,未知的肯定不能删除,那他怎么知道哪些消息是可以删除的呢?总不能所有的都再去客户端查一下事务的结果吧?mq怎么做的呢?前面提到的删除其实就是把这些commit和rollBack处理过后的半消息,再保存起来,后面消费半消息的数据的时候,只要从里面查一下是否需要删除就可以了。
这里又有一个问题,怎么把需要删除的半消息存起来呢?mq存储数据就是commitLog,所以其实这些需要删除的数据,就是又发到了一个特定的topic里面。这个topic名字是RMQ_SYS_TRANS_OP_HALF_TOPIC。主意区分,原本半消息的topic名字是half_topic,这个topic名字是op_half_topic,存储的是处理过后,可以删除的半消息。
所以说前面提到的带引号的“删除”,就是把消息发到op_half_topic就表示是删除了,这个op_half_topic消息的内容就是half_topic的offset。那么现在需要有个地方,来消费half_topic,然后判断是否存在于op_half_topic,如果是表示可以删除了,如果不是,就接着保存起来。
处理逻辑就在TransactionalMessageCheckService这个定时任务中。具体是在TransactionalMessageServiceImpl#check方法里面
@Override public void check(long transactionTimeout, int transactionCheckMax, AbstractTransactionalMessageCheckListener listener) { try { String topic = TopicValidator.RMQ_SYS_TRANS_HALF_TOPIC; // 先拿到半消息 Set<MessageQueue> msgQueues = transactionalMessageBridge.fetchMessageQueues(topic); if (msgQueues == null || msgQueues.size() == 0) { log.warn("The queue of topic is empty :" + topic); return; } log.debug("Check topic={}, queues={}", topic, msgQueues); for (MessageQueue messageQueue : msgQueues) { long startTime = System.currentTimeMillis(); MessageQueue opQueue = getOpQueue(messageQueue); // 拿到半消息的最小偏移量 long halfOffset = transactionalMessageBridge.fetchConsumeOffset(messageQueue); // 拿到op_half的最小偏移量 long opOffset = transactionalMessageBridge.fetchConsumeOffset(opQueue); log.info("Before check, the queue={} msgOffset={} opOffset={}", messageQueue, halfOffset, opOffset); if (halfOffset < 0 || opOffset < 0) { log.error("MessageQueue: {} illegal offset read: {}, op offset: {},skip this queue", messageQueue, halfOffset, opOffset); continue; } List<Long> doneOpOffset = new ArrayList<>(); HashMap<Long, Long> removeMap = new HashMap<>(); // 拉取op的消息(32条),op消息内容是half的offset,跟half_topic的最小offset比较,如果op的小于最小的,就说明已经处理过了,放在doneOpOffset,反之,则说明还没处理过,就先放在removeMap里面 PullResult pullResult = fillOpRemoveMap(removeMap, opQueue, opOffset, halfOffset, doneOpOffset); if (null == pullResult) { log.error("The queue={} check msgOffset={} with opOffset={} failed, pullResult is null", messageQueue, halfOffset, opOffset); continue; } // single thread int getMessageNullCount = 1; long newOffset = halfOffset; long i = halfOffset; // 然后对half_topic进行处理 while (true) { if (System.currentTimeMillis() - startTime > MAX_PROCESS_TIME_LIMIT) { log.info("Queue={} process time reach max={}", messageQueue, MAX_PROCESS_TIME_LIMIT); break; } // 如果这个offset已经处理过了,就接着处理下一个 if (removeMap.containsKey(i)) { log.debug("Half offset {} has been committed/rolled back", i); Long removedOpOffset = removeMap.remove(i); doneOpOffset.add(removedOpOffset); } else { // 如果没有处理过,就要把数据捞出来重新投递 GetResult getResult = getHalfMsg(messageQueue, i); MessageExt msgExt = getResult.getMsg(); if (msgExt == null) { if (getMessageNullCount++ > MAX_RETRY_COUNT_WHEN_HALF_NULL) { break; } if (getResult.getPullResult().getPullStatus() == PullStatus.NO_NEW_MSG) { log.debug("No new msg, the miss offset={} in={}, continue check={}, pull result={}", i, messageQueue, getMessageNullCount, getResult.getPullResult()); break; } else { log.info("Illegal offset, the miss offset={} in={}, continue check={}, pull result={}", i, messageQueue, getMessageNullCount, getResult.getPullResult()); i = getResult.getPullResult().getNextBeginOffset(); newOffset = i; continue; } } if (needDiscard(msgExt, transactionCheckMax) || needSkip(msgExt)) { listener.resolveDiscardMsg(msgExt); newOffset = i + 1; i++; continue; } if (msgExt.getStoreTimestamp() >= startTime) { log.debug("Fresh stored. the miss offset={}, check it later, store={}", i, new Date(msgExt.getStoreTimestamp())); break; } long valueOfCurrentMinusBorn = System.currentTimeMillis() - msgExt.getBornTimestamp(); long checkImmunityTime = transactionTimeout; String checkImmunityTimeStr = msgExt.getUserProperty(MessageConst.PROPERTY_CHECK_IMMUNITY_TIME_IN_SECONDS); if (null != checkImmunityTimeStr) { checkImmunityTime = getImmunityTime(checkImmunityTimeStr, transactionTimeout); if (valueOfCurrentMinusBorn < checkImmunityTime) { if (checkPrepareQueueOffset(removeMap, doneOpOffset, msgExt)) { newOffset = i + 1; i++; continue; } } } else { if (0 <= valueOfCurrentMinusBorn && valueOfCurrentMinusBorn < checkImmunityTime) { log.debug("New arrived, the miss offset={}, check it later checkImmunity={}, born={}", i, checkImmunityTime, new Date(msgExt.getBornTimestamp())); break; } } List<MessageExt> opMsg = pullResult.getMsgFoundList(); boolean isNeedCheck = opMsg == null && valueOfCurrentMinusBorn > checkImmunityTime || opMsg != null && opMsg.get(opMsg.size() - 1).getBornTimestamp() - startTime > transactionTimeout || valueOfCurrentMinusBorn <= -1; if (isNeedCheck) { // 重新投递 if (!putBackHalfMsgQueue(msgExt, i)) { continue; } // 再重新确认事务 listener.resolveHalfMsg(msgExt); } else { pullResult = fillOpRemoveMap(removeMap, opQueue, pullResult.getNextBeginOffset(), halfOffset, doneOpOffset); log.debug("The miss offset:{} in messageQueue:{} need to get more opMsg, result is:{}", i, messageQueue, pullResult); continue; } } newOffset = i + 1; i++; } // 更新offset if (newOffset != halfOffset) { transactionalMessageBridge.updateConsumeOffset(messageQueue, newOffset); } long newOpOffset = calculateOpOffset(doneOpOffset, opOffset); if (newOpOffset != opOffset) { transactionalMessageBridge.updateConsumeOffset(opQueue, newOpOffset); } } } catch (Throwable e) { log.error("Check error", e); } }
我讲解一下这个代码做了啥。我们先明确这个代码是要实现什么功能。就是消费half_topic,然后去根据op_half_topic的数据来判断half_topc的消息是否被处理过,处理过了就直接忽略、丢弃,如果没有处理过,就“保留”这个消息,等待后面事务确认了再处理。
这里“保留”我也是加了引号,因为mq消费是一条一条按顺序消费,如果中间有一个数据卡住了,后面数据就没法消费了。所以这里“保留”,其实也是消费了,只是他消费到了不确定结果的消息,他是重新投递到了half_topic,来实现“保留”的目的。
好了,明确了这个代码实现的功能,我们来一步步看一下细节。
首先是拿到half_topic和op_half_topic的offset,知道现在是消费到了哪里。然后去拉取op_half_topic,每次32条,op_half消息内容存的是half_topic的offset,只要判断这条op_half里面的offset小于half_topic的offset,就表示已经消费过了,放在doneOpOffset的list里面,如果op_half保存的offset大于half_topic的offset,就表示还没消费,放入removeMap,就表示这个半消息可以放心删除了。
这一步,通过消费op_half,跟half_topic的minOffset做比较,构建了doneOpOffset,和removeMap。
然后就是消费half_topic的消息,只要判断每条消息的offset是否在removeMap中,就表示可以删除,放入doneOpOffset中,直接消费下一条数据,所以这里其实也不用真的拉取half_topic的消息,只要用offset来判断就行,消费过了,offset+1,就可以去判断下一条消息。
如果half_topic的offset没有在removeMap中,就表示暂时还不知道结果,这时候就重新发送到half_topic,重新投递之后,然后给客户端发送一个检查事务的请求,客户端检测过后,还是用之前的END_TRANSACTION命令,再发给broker,broker就会放到op_half里面,等于就是重新发了一个半消息的流程,实现了闭环。
最后就是更新两个topic的offset了。之前的doneOpOffset保存下来,就是为了更新op_half的offset,只有都处理过了,才会更新,如果中间有一个没有处理,就会阻塞在那条消息。
以上就是“RocketMQ事务消息怎么保证消息的可靠性和一致性”这篇文章的所有内容,感谢各位的阅读!相信大家阅读完这篇文章都有很大的收获,小编每天都会为大家更新不同的知识,如果还想学习更多的知识,请关注亿速云行业资讯频道。
免责声明:本站发布的内容(图片、视频和文字)以原创、转载和分享为主,文章观点不代表本网站立场,如果涉及侵权请联系站长邮箱:is@yisu.com进行举报,并提供相关证据,一经查实,将立刻删除涉嫌侵权内容。