SpringBoot中如何使用RabbitMQ延时队列

发布时间:2021-07-08 17:12:38 作者:Leah
来源:亿速云 阅读:154

这篇文章将为大家详细讲解有关SpringBoot中如何使用RabbitMQ延时队列,文章内容质量较高,因此小编分享给大家做个参考,希望大家阅读完这篇文章后对相关知识有一定的了解。

1.什么是MQ

MQ,是一种跨进程的通信机制,用于上下游传递消息。

在互联网架构中,MQ是一种非常常见的上下游“逻辑解耦+物理解耦”的消息通信服务。

使用了MQ之后,消息发送上游只需要依赖MQ,不用依赖其他服务。

为什么会产生消息列队?

  1. 不同进程(process)之间传递消息时,两个进程之间耦合程度过高,改动一个进程,引发必须修改另一个进程,为了隔离这两个进程,在两进程间抽离出一层(一个模块),所有两进程之间传递的消息,都必须通过消息队列来传递,单独修改某一个进程,不会影响另一个;  不同进程(process)之间传递消息时,为了实现标准化,将消息的格式规范化了,并且,某一个进程接受的消息太多,一下子无法处理完,并且也有先后顺序,必须对收到的消息进行排队,因此诞生了事实上的消息队列;

延时列队的使用场景?

  1. 订单业务:在淘宝或者京东购买东西,用户下单后未付款则30分钟后取消订单。  短信通知:手机用户交完话费后,几分钟之内将会收到缴费信息

2.什么是RabbitMQ(这里就做了一下简单介绍)

RabbitMQ是一种消息队列 ,用于常见的进程通信。支持点对点,请求应答和发布订阅模式 并且提供多种语言的支持。常见的java,c#,php都支持。

常被用在异步处理,应用解耦。流量消锋等复杂的业务场景中。和java的kafka一样都属于消息中间件。

下载地址:

https://www.rabbitmq.com/download.html

进入RabbitMQ官网

1.第一步

第二步

下载好后不要着急安装RabbitMQ,我们这里还需要安装Erlang

下载地址:http://www.erlang.org/download/otp_win64_17.3.exe

安装步骤

步骤一

步骤二

步骤三

步骤四

安装完成

现在安装RabbitMQ

步骤一

步骤二

步骤三

安装完成

启动RabbitMQ管理工具

开始菜单 — 最新添加 — 展开 — 选中双击

输入命令:rabbitmq-plugins enable rabbitmq_management

效果如果图

在浏览器中输入地址查看:http://127.0.0.1:15672/

出现次页面代表成功,默认用户和密码都是guest/ guest

若不出现此页面,就是安装失败了,不要慌,多半问题在系统用户名必须是中文(放心有解决办法):

Windows下安装RabbitMQ后,按正常RabbitMQ会自动注册服务并自动启动,但是如果有的道友不注意中英文目录就会出现服务启动后几秒钟自动停止,而且反反复复。

出现这种情况一般都是由我们的用户名是中文,而导致默认的DB和log访问出现问。所以我建议以后大家在使用windows操作系统的时候尽量用英文来命名文件或目录,这样会极大的减小以后安装软件出现莫名其妙的问题的bug。

接下来我们先卸载我们的RabbitMQ,然后在我们的系统变量里设置一个RABBITMQ_BASE 的变量路径为一个不含英文的路径 比如 E:\rabbit,最后我们重新安装RabbitMQ即可,然后就会看到RabbitMQ服务自动注册了,并且不会自动停止。

SpringBoot整合RabbitMQ

1.添加依赖

pom.xml中添加 spring-boot-starter-amqp的依赖

<!-- spring-boot-starter-amqp的依赖 -->      <dependency>   <groupId>org.springframework.boot</groupId>        <artifactId>spring-boot-starter-amqp</artifactId> </dependency>

其他依赖

<dependency>        <groupId>org.springframework.boot</groupId>        <artifactId>spring-boot-starter-web</artifactId>      </dependency>        <dependency>        <groupId>org.projectlombok</groupId>        <artifactId>lombok</artifactId>        <optional>true</optional>      </dependency>        <dependency>        <groupId>org.springframework.boot</groupId>        <artifactId>spring-boot-starter-test</artifactId>        <scope>test</scope>        <exclusions>          <exclusion>            <groupId>org.junit.vintage</groupId>            <artifactId>junit-vintage-engine</artifactId>          </exclusion>        </exclusions>      </dependency>            <dependency>        <groupId>junit</groupId>        <artifactId>junit</artifactId>        <version>4.12</version>        <scope>test</scope>      </dependency>

application.yml文件中配置rabbitmq相关内容

spring:  rabbitmq:   host: localhost   port: 5672   username: guest   password: guest

这里我们环境就搭建起来了

2.具体编码实现

配置列队

package com.example.spring_boot_rabbitmq;        import lombok.extern.slf4j.Slf4j;  import org.springframework.amqp.core.*;  import org.springframework.context.annotation.Bean;  import org.springframework.context.annotation.Configuration;    import java.util.HashMap;  import java.util.Map;    /**   * @author:zq   * @date: Greated in 2019/12/19 11:46   * 配置队列   */    @Configuration  @Slf4j  public class DelayRabbitConfig {      /**     * 延迟队列 TTL 名称     */    private static final String ORDER_DELAY_QUEUE = "user.order.delay.queue";    /**     * DLX,dead letter发送到的 exchange     * 延时消息就是发送到该交换机的     */    public static final String ORDER_DELAY_EXCHANGE = "user.order.delay.exchange";    /**     * routing key 名称     * 具体消息发送在该 routingKey 的     */    public static final String ORDER_DELAY_ROUTING_KEY = "order_delay";      public static final String ORDER_QUEUE_NAME = "user.order.queue";    public static final String ORDER_EXCHANGE_NAME = "user.order.exchange";    public static final String ORDER_ROUTING_KEY = "order";      /**     * 延迟队列配置     * <p>     * 1、params.put("x-message-ttl", 5 * 1000);     * 第一种方式是直接设置 Queue 延迟时间 但如果直接给队列设置过期时间,这种做法不是很灵活,(当然二者是兼容的,默认是时间小的优先)     * 2、rabbitTemplate.convertAndSend(book, message -> {     * message.getMessageProperties().setExpiration(2 * 1000 + "");     * return message;     * });     * 第二种就是每次发送消息动态设置延迟时间,这样我们可以灵活控制     **/    @Bean    public Queue delayOrderQueue() {      Map<String, Object> params = new HashMap<>();      // x-dead-letter-exchange 声明了队列里的死信转发到的DLX名称,      params.put("x-dead-letter-exchange", ORDER_EXCHANGE_NAME);      // x-dead-letter-routing-key 声明了这些死信在转发时携带的 routing-key 名称。      params.put("x-dead-letter-routing-key", ORDER_ROUTING_KEY);      return new Queue(ORDER_DELAY_QUEUE, true, false, false, params);    }    /**     * 需要将一个队列绑定到交换机上,要求该消息与一个特定的路由键完全匹配。     * 这是一个完整的匹配。如果一个队列绑定到该交换机上要求路由键 “dog”,则只有被标记为“dog”的消息才被转发,     * 不会转发dog.puppy,也不会转发dog.guard,只会转发dog。     * @return DirectExchange     */    @Bean    public DirectExchange orderDelayExchange() {      return new DirectExchange(ORDER_DELAY_EXCHANGE);    }    @Bean    public Binding dlxBinding() {      return BindingBuilder.bind(delayOrderQueue()).to(orderDelayExchange()).with(ORDER_DELAY_ROUTING_KEY);    }      @Bean    public Queue orderQueue() {      return new Queue(ORDER_QUEUE_NAME, true);    }    /**     * 将路由键和某模式进行匹配。此时队列需要绑定要一个模式上。     * 符号“#”匹配一个或多个词,符号“*”匹配不多不少一个词。因此“audit.#”能够匹配到“audit.irs.corporate”,但是“audit.*” 只会匹配到“audit.irs”。     **/    @Bean    public TopicExchange orderTopicExchange() {      return new TopicExchange(ORDER_EXCHANGE_NAME);    }      @Bean    public Binding orderBinding() {      // TODO 如果要让延迟队列之间有关联,这里的 routingKey 和 绑定的交换机很关键      return BindingBuilder.bind(orderQueue()).to(orderTopicExchange()).with(ORDER_ROUTING_KEY);    }    }

创建一个Order实体类

package com.example.spring_boot_rabbitmq.pojo;  import lombok.Data;  import java.io.Serializable;  /**  * @author:zq  * @date: Greated in 2019/12/19 11:49  */ @Data public class Order implements Serializable {   private static final long serialVersionUID = -2221214252163879885L;    private String orderId; // 订单id    private Integer orderStatus; // 订单状态 0:未支付,1:已支付,2:订单已取消    private String orderName; // 订单名字  }

接收者

package com.example.spring_boot_rabbitmq;  import com.example.spring_boot_rabbitmq.pojo.Order; import com.rabbitmq.client.Channel; import lombok.extern.slf4j.Slf4j; import org.springframework.amqp.core.Message; import org.springframework.amqp.rabbit.annotation.RabbitListener; import org.springframework.stereotype.Component;  import java.util.Date;  /**  * @author:zq  * @date: Greated in 2019/12/19 11:53  * 接收者  */  @Component @Slf4j public class DelayReceiver {   @RabbitListener(queues = {DelayRabbitConfig.ORDER_QUEUE_NAME})   public void orderDelayQueue(Order order, Message message, Channel channel) {     log.info("###########################################");     log.info("【orderDelayQueue 监听的消息】 - 【消费时间】 - [{}]- 【订单内容】 - [{}]", new Date(), order.toString());     if(order.getOrderStatus() == 0) {       order.setOrderStatus(2);       log.info("【该订单未支付,取消订单】" + order.toString());     } else if(order.getOrderStatus() == 1) {       log.info("【该订单已完成支付】");     } else if(order.getOrderStatus() == 2) {       log.info("【该订单已取消】");     }     log.info("###########################################");   }  }

发送者

package com.example.spring_boot_rabbitmq;   import com.example.spring_boot_rabbitmq.pojo.Order; import lombok.extern.slf4j.Slf4j; import org.springframework.amqp.core.AmqpTemplate; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.stereotype.Component;  import java.util.Date;  /** * @author:zq * @date: Greated in 2019/12/19 11:55 * 发送者 */ @Component @Slf4j public class DelaySender {   @Autowired   private AmqpTemplate amqpTemplate;    public void sendDelay(Order order) {     log.info("【订单生成时间】" + new Date().toString() +"【1分钟后检查订单是否已经支付】" + order.toString() );     this.amqpTemplate.convertAndSend(DelayRabbitConfig.ORDER_DELAY_EXCHANGE, DelayRabbitConfig.ORDER_DELAY_ROUTING_KEY, order, message -> {       // 如果配置了 params.put("x-message-ttl", 5 * 1000); 那么这一句也可以省略,具体根据业务需要是声明 Queue 的时候就指定好延迟时间还是在发送自己控制时间       message.getMessageProperties().setExpiration(1 * 1000 * 60 + "");       return message;     });   }  }

测试,访问http://localhost:8080/sendDelay查看日志输出

package com.example.spring_boot_rabbitmq;import com.example.spring_boot_rabbitmq.pojo.Order;import org.springframework.web.bind.annotation.RestController;import org.springframework.beans.factory.annotation.Autowired;import org.springframework.web.bind.annotation.GetMapping;/** * @author:zq * @date: Greated in 2019/12/19 11:57 * 测试 */@RestControllerpublic class TestController {  @Autowired  private DelaySender delaySender;  @GetMapping("/sendDelay")  public Object sendDelay() {    Order order1 = new Order();    order1.setOrderStatus(0);    order1.setOrderId("123456");    order1.setOrderName("小米6");    Order order2 = new Order();    order2.setOrderStatus(1);    order2.setOrderId("456789");    order2.setOrderName("小米8");    delaySender.sendDelay(order1);    delaySender.sendDelay(order2);    return "ok";  }}

关于SpringBoot中如何使用RabbitMQ延时队列就分享到这里了,希望以上内容可以对大家有一定的帮助,可以学到更多知识。如果觉得文章不错,可以把它分享出去让更多的人看到。

推荐阅读:
  1. RabbitMQ实现延时队列(死信队列)
  2. SpringBoot:RabbitMQ 延迟队列

免责声明:本站发布的内容(图片、视频和文字)以原创、转载和分享为主,文章观点不代表本网站立场,如果涉及侵权请联系站长邮箱:is@yisu.com进行举报,并提供相关证据,一经查实,将立刻删除涉嫌侵权内容。

springboot rabbitmq

上一篇:springboot中怎么实现文件上传

下一篇:Springboot中怎么利用echarts实现可视化

相关阅读

您好,登录后才能下订单哦!

密码登录
登录注册
其他方式登录
点击 登录注册 即表示同意《亿速云用户服务条款》