RocketMq事务消息发送代码的过程详解

发布时间:2020-07-17 17:08:10 作者:小猪
来源:亿速云 阅读:218

这篇文章主要讲解了RocketMq事务消息发送代码的过程详解,内容清晰明了,对此有兴趣的小伙伴可以学习一下,相信大家阅读完之后会有帮助。

一、RocketMq事务消息流程:

1、首先会向broker发送一个预请求消息,消费者不可见

2、回调执行本地事务(比如操作数据库)

3、事务执行成功后,再次发送消息给broker,告诉broker事务执行成功这个消息要提交,让消费者可见。如果本地事务执行超时,会返回一个unknow,broker会发送一个消息回查,检查消息是否执行成功。

二、RocketMq事务消息实例:

1、引入rocketMq相关的依赖:

<dependency>
  <groupId>org.apache.rocketmq</groupId>
  <artifactId>rocketmq-client</artifactId>
  <version>4.4.0</version>
</dependency>

2、创建一个TransactionProducer类:

public class TransactionProducer {

  public static void main(String[] args) throws MQClientException, RemotingException, InterruptedException, MQBrokerException, UnsupportedEncodingException {
    //创建生产者并制定组名
    TransactionMQProducer producer = new TransactionMQProducer("rocketMQ_transaction_producer_group");
    //2.指定Nameserver地址
    producer.setNamesrvAddr("192.168.***.***:9876");
    //3、指定消息监听对象用于执行本地事务和消息回查
    TransactionListener listener = new TransactionListenerImol();
    producer.setTransactionListener(listener);
    //4、线程池
    ExecutorService executorService = new ThreadPoolExecutor(2, 5, 100, TimeUnit.SECONDS, new ArrayBlockingQueue<Runnable>(2000), new ThreadFactory() {
      @Override
      public Thread newThread(Runnable r) {
        Thread thread = newThread(r);
        thread.setName("client-tanscation-msg-check-thread");
        return thread;
      }
    });
    producer.setExecutorService(executorService);
    //5、启动producer
    producer.start();

    //6.创建消息对象,指定主题Topic、Tag和消息体 String topic, String tags, String keys, byte[] body
    Message message = new Message("Topic_transaction_demo", //主题
        "Tags", //主要用于消息过滤
        "Key_1", //消息唯一值
        ("hello-transaction").getBytes(RemotingHelper.DEFAULT_CHARSET));

    //7、发送事务消息
    TransactionSendResult result = producer.sendMessageInTransaction(message, "hello-transaction");

    producer.shutdown();
  }
}

3、发送事务消息还需要一个事务监听对象,它实现TransactionListener 接口,其中有两个方法作用分别是执行本地事务和消息回查:

public class TransactionListenerImol implements TransactionListener {
  //存储事务状态信息 key:事务id value:当前事务执行的状态
  private ConcurrentHashMap<String, Integer> localTrans = new ConcurrentHashMap<>();
  //执行本地事务
  @Override
  public LocalTransactionState executeLocalTransaction(Message message, Object o) {
    //事务id
    String transactionId = message.getTransactionId();
    //0:执行中,状态未知 1:执行成功 2:执行失败
    localTrans.put(transactionId, 0);
    //业务执行,本地事务,service
    System.out.println("hello-demo-transaction");
    try {
      System.out.println("正在执行本地事务---");
      Thread.sleep(60000*2);
      System.out.println("本地事务执行成功---");
      localTrans.put(transactionId, 1);
    } catch (InterruptedException e) {
      e.printStackTrace();
      localTrans.put(transactionId, 2);
      return LocalTransactionState.ROLLBACK_MESSAGE;
    }
    return LocalTransactionState.COMMIT_MESSAGE;
  }

  //消息回查
  @Override
  public LocalTransactionState checkLocalTransaction(MessageExt messageExt) {
    //获取对应事务的状态信息
    String transactionId = messageExt.getTransactionId();
    //获取对应事务id执行状态
    Integer status = localTrans.get(transactionId);
    //消息回查
    System.out.println("消息回查---transactionId:" + transactionId + "状态:" + status);
    switch (status) {
      case 0:
        return LocalTransactionState.UNKNOW;
      case 1:
        return LocalTransactionState.COMMIT_MESSAGE;
      case 2:
        return LocalTransactionState.ROLLBACK_MESSAGE;
    }
    return LocalTransactionState.UNKNOW;
  }
}

看完上述内容,是不是对RocketMq事务消息发送代码的过程详解有进一步的了解,如果还想学习更多内容,欢迎关注亿速云行业资讯频道。

推荐阅读:
  1. RocketMQ事务消息学习及刨坑过程
  2. RocketMQ事务消息如何实现

免责声明:本站发布的内容(图片、视频和文字)以原创、转载和分享为主,文章观点不代表本网站立场,如果涉及侵权请联系站长邮箱:is@yisu.com进行举报,并提供相关证据,一经查实,将立刻删除涉嫌侵权内容。

rocket mq 过程详解

上一篇:简单的表单验证

下一篇:GLKMathRadiansToDegrees

相关阅读

您好,登录后才能下订单哦!

密码登录
登录注册
其他方式登录
点击 登录注册 即表示同意《亿速云用户服务条款》