您好,登录后才能下订单哦!
今天就跟大家聊聊有关如何整合RocketMQ事务消息,可能很多人都不太了解,为了让大家更加了解,小编给大家总结了以下内容,希望大家根据这篇文章可以有所收获。
ActiveMQ、RabbitMQ、ZeroMQ、Kafka、RocketMQ选型
RocketMQ提供了事务消息回查,查看官方Demo
@SpringBootApplication public class ProducerApplication implements CommandLineRunner { private static final String TX_PGROUP_NAME = "myTxProducerGroup"; @Resource private RocketMQTemplate rocketMQTemplate; @Value("${demo.rocketmq.transTopic}") private String springTransTopic; public static void main(String[] args) { SpringApplication.run(ProducerApplication.class, args); } @Override public void run(String... args) throws Exception { // Send transactional messages testTransaction(); } private void testTransaction() throws MessagingException { String[] tags = new String[]{"TagA", "TagB", "TagC", "TagD", "TagE"}; for (int i = 0; i < 10; i++) { try { Message msg = MessageBuilder .withPayload("Hello RocketMQ " + i) .setHeader(RocketMQHeaders.TRANSACTION_ID, "KEY_" + i) .build(); SendResult sendResult = rocketMQTemplate.sendMessageInTransaction(TX_PGROUP_NAME, springTransTopic + ":" + tags[i % tags.length], msg, null); System.out.printf("------ send Transactional msg body = %s , sendResult=%s %n", msg.getPayload(), sendResult.getSendStatus()); Thread.sleep(10); } catch (Exception e) { e.printStackTrace(); } } } @RocketMQTransactionListener(txProducerGroup = TX_PGROUP_NAME) class TransactionListenerImpl implements RocketMQLocalTransactionListener { private AtomicInteger transactionIndex = new AtomicInteger(0); private ConcurrentHashMap<String, Integer> localTrans = new ConcurrentHashMap<>(); @Override public RocketMQLocalTransactionState executeLocalTransaction(Message msg, Object arg) { String transId = (String)msg.getHeaders().get(RocketMQHeaders.TRANSACTION_ID); System.out.printf("#### executeLocalTransaction is executed, msgTransactionId=%s %n", transId); int value = transactionIndex.getAndIncrement(); int status = value % 3; localTrans.put(transId, status); if (status == 0) { // Return local transaction with success(commit), in this case, // this message will not be checked in checkLocalTransaction() System.out.printf(" # COMMIT # Simulating msg %s related local transaction exec succeeded! ### %n", msg.getPayload()); return RocketMQLocalTransactionState.COMMIT; } if (status == 1) { // Return local transaction with failure(rollback) , in this case, // this message will not be checked in checkLocalTransaction() System.out.printf(" # ROLLBACK # Simulating %s related local transaction exec failed! %n", msg.getPayload()); return RocketMQLocalTransactionState.ROLLBACK; } System.out.printf(" # UNKNOW # Simulating %s related local transaction exec UNKNOWN! \n"); return RocketMQLocalTransactionState.UNKNOWN; } @Override public RocketMQLocalTransactionState checkLocalTransaction(Message msg) { String transId = (String)msg.getHeaders().get(RocketMQHeaders.TRANSACTION_ID); RocketMQLocalTransactionState retState = RocketMQLocalTransactionState.COMMIT; Integer status = localTrans.get(transId); if (null != status) { switch (status) { case 0: retState = RocketMQLocalTransactionState.UNKNOWN; break; case 1: retState = RocketMQLocalTransactionState.COMMIT; break; case 2: retState = RocketMQLocalTransactionState.ROLLBACK; break; } } System.out.printf("------ !!! checkLocalTransaction is executed once," + " msgTransactionId=%s, TransactionState=%s status=%s %n", transId, retState, status); return retState; } } }
需要在testTransaction()
中发送消息,然后在TransactionListenerImpl
类中实现executeLocalTransaction()
方法才能执行整个本地事务,然后在checkLocalTransaction()
中实现事务消息回查。
查看源代码可以知道testTransaction()
方法和executeLocalTransaction()
是在同一个线程当中,只不过包装RocketMQTemplate
中。
消息发送的事务消息回调查询和本地事务没严格的先后顺序,怎么保证,回查时,事务操作肯定已经完成。
事务消息回调使用transaction_id
查询,那么transaction_id
存放在哪里,同时保证transaction_id
关联的业务操作执行成功。
怎么把事务回调查询操作隔离出业务,保证不侵入代码中。
下游消费者怎么保证接口幂等性。
下游消费者怎么提高幂等性查询性能。
怎么把幂等性操作隔离出业务,保证不侵入代码中。
因为数据库或者其他业务操作可能会存在延时,那么不能保证回查时业务操作已完成,那么可以多次回查,并设置最大回查次数,同时不能丢弃MQ消息持久化,方便手动恢复。
可以使用本地消息表落地的发送消息,同时可以采用切面、继承等等方式将落地消息隔离出业务代码之外,保证本地消息落库不侵入,注意必须要保证本地消息落库和本地业务落库在同一个事务之内!
事务消息回查可以使用第2点的本地消息表,根据transaction_id
查询,判断本地事务的执行结果,也和第2点一样,可以使用一些方式将事务消息回查代码隔离出业务代码,保证不侵入。
幂等性的方法:
数据库唯一约束
状态机CAS单向流转
消息去重表
,在执行本地业务前,先对redis判断是业务id是否存在,存在则直接返回消费成功,在执行本地业务之后,可以将消费信息异步落地到redis当中。注意:需要保证本地业务和消息幂等性操作在同一个事务当中,同时redis落地操作在事务之外。
比较好的方案应该是数据库唯一约束 + 消息去重表,在消息去重表中对业务id设置唯一约束,同时将消息落地操作隔离出本地业务之外,保证不侵入。
定时清理历史的本地消息表(消息去重表)。
看完上述内容,你们对如何整合RocketMQ事务消息有进一步的了解吗?如果还想了解更多知识或者相关内容,请关注亿速云行业资讯频道,感谢大家的支持。
免责声明:本站发布的内容(图片、视频和文字)以原创、转载和分享为主,文章观点不代表本网站立场,如果涉及侵权请联系站长邮箱:is@yisu.com进行举报,并提供相关证据,一经查实,将立刻删除涉嫌侵权内容。