如何理解Java RabbitMQ的TTL和DLX

发布时间:2021-09-24 17:58:05 作者:柒染
来源:亿速云 阅读:212

这篇文章给大家介绍如何理解Java RabbitMQ的TTL和DLX,内容非常详细,感兴趣的小伙伴们可以参考借鉴,希望对大家能有所帮助。

RabbitMQ的TTL

1、TTL概述

RabbitMQ的TTL全称为Time-To-Live,表示的是消息的有效期。消息如果在队列中一直没有被消费并且存在时间超过了TTL,消息就会变成了"死信" (Dead Message),后续无法再被消费了。设置TTL有两种方式:

  1. 第一种是声明队列的时候,在队列的属性中设置,这样该队列中的消息都会有相同的有效期;

  2. 第二种是发送消息时给消息设置属性,可以为每条消息都设置不同的TTL。

如果两种方式都设置了,则以设置的较小的为准。两者的区别:如果声明队列时设置了有效期,则消息过期了就会被删掉;如果是发消息时设置的有效期,消息过期了也不会被立马删掉,因为这时消息是否过期是在要投递给消费者时判断的。至于为啥要这样处理很容易想清楚:第一种方式队列的消息有效期都一样,先入队的在队列头部,头部也是最早要过期的消息,RabbitMQ起一个定时任务从队列的头部开始扫描是否有过期消息即可;第二种方式每条消息的过期时间不同,所以只有遍历整个队列才可以筛选出来过期的消息,这样效率太低了,而且消息量大了之后根本不可行的,可以等到消息要投递给消费者时再判断删除,虽然删除的不够及时但是不影响功能,其实就是用空间换时间。

如果不设置TTL,则表示此消息永久有效(默认消息是不会失效的)。如果将TTL设为0,则表示如果消息不能被立马消费则会被立即丢掉,这个特性可以部分替代RabbitMQ3.0以前支持的immediate参数,之所以所部分代替,是应为immediate参数在投递失败会有basic.return方法将消息体返回(这个功能可以利用死信队列来实现)。

2、设置消息有效期

2.1、通过队列设置有效期

还记得我们之前声明队列的方法吗,queueDeclare(String queue, boolean durable, boolean exclusive, boolean autoDelete, Map<String, Object> arguments),该方法的最后一个参数可以设置队列的属性,属性名为x-message-ttl,单位为毫秒。如果不清楚队列属性有哪些,可以查看web控制台的添加队列的地方。

如何理解Java RabbitMQ的TTL和DLX

具体代码如下:

//设置队列上所有的消息的有效期,单位为毫秒
Map<String, Object> argss = new HashMap<String , Object>();
arguments.put("x-message-ttl " , 5000);//5秒钟
channel.queueDeclare(queueName , durable , exclusive , autoDelete , arguments) ;

查看控制台的队列列表如下:D表示持久化,TTL表示设置了消息的有效期。

如何理解Java RabbitMQ的TTL和DLX

过了几秒钟后发现消息已经不存在了。

如何理解Java RabbitMQ的TTL和DLX

也可以用RabbitMQ的命令行模式来设置:

rabbitmqctl set_policy TTL ".*" '{"message-ttl":60000}' --apply-to queues

还可以通过HTTP接口调用:

$ curl -i -u guest:guest -H "content-type:application/json"  -XPUT -d'{"auto_delete":false,"durable":true,"arguments":{"x-message-ttl": 60000}}' 
http://ip:15672/api/queues/{vhost}/{queuename}
2.2、通过发送消息时设置有效期

发送消息时basicPublish方法可以设置属性参数,里面通过expiration属性设置消息有效期,单位为毫秒,代码如下所示

Builder bd = new AMQP.BasicProperties().builder();
bd.deliveryMode(2);//持久化
bd.expiration("100000");//设置消息有效期100秒钟
BasicProperties pros = bd.build();
String message = "测试ttl消息";
channel.basicPublish(EXCHANGE_NAME, "error", true,false, pros, message.getBytes());

另外也可以通过HTTPAPI 接口设置:

$ curl -i -u guest:guest -H "content-type:application/json"  -XPOST -d
'{"properties":{"expiration":"60000"},"routing_key":"routingkey","payload":"my body","payload_encoding":"string"}'  
http://localhost:15672/api/exchanges/{vhost}/{exchangename}/publish

完整的通过队列设置消息有效期、发布消息时通过属性设置有效期的代码如下:可以运行后,观察下控制台,可以发现同时设置时,消息的有效期是以较小的为准的。项目GitHub地址 https://github.com/RookieMember/RabbitMQ-Learning.git。

package cn.wkp.rabbitmq.newest.ttl;
 
import java.util.HashMap;
import java.util.Map;
 
import com.rabbitmq.client.AMQP;
import com.rabbitmq.client.AMQP.BasicProperties;
import com.rabbitmq.client.AMQP.BasicProperties.Builder;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
 
import cn.wkp.rabbitmq.util.ConnectionUtil;
 
/**
 * 
 * @ClassName: Send
 * @Description: 消息有效期 
 * @author wkg
 * @date: 2021年9月1日 下午11:28:22
 */
public class Send {
 
	private final static String EXCHANGE_NAME = "ttl_exchange";
	private final static String QUEUE_NAME = "ttl_queue";
 
	public static void main(String[] argv) throws Exception {
		// 获取到连接以及mq通道
		Connection connection = ConnectionUtil.getConnection();
		// 从连接中创建通道
		Channel channel = connection.createChannel();
 
		// 声明交换机
		channel.exchangeDeclare(EXCHANGE_NAME, "direct",true);
		
		//*****1:通过队列设置有效期 2:通过消息属性设置有效期,如果都设置了以较小的为准*****
		//声明队列
		Map<String, Object> arguments=new HashMap<String,Object>();
		//设置队列上所有的消息的有效期,单位为毫秒
		arguments.put("x-message-ttl", 5000);//5秒钟
		channel.queueDeclare(QUEUE_NAME, true, false, false, arguments);
		//绑定
		channel.queueBind(QUEUE_NAME, EXCHANGE_NAME, "error");
		
		Builder bd = new AMQP.BasicProperties().builder();
		bd.deliveryMode(2);//持久化
		bd.expiration("100000");//设置消息有效期100秒钟
		BasicProperties pros = bd.build();
		String message = "测试ttl消息";
		channel.basicPublish(EXCHANGE_NAME, "error", true,false, pros, message.getBytes());
		System.out.println("Sent message:" + message);
//		 关闭通道和连接
		channel.close();
		connection.close();
	}
}

3、设置队列有效期(不常用,仅作了解)

如何理解Java RabbitMQ的TTL和DLX

上面在web管控台添加队列的时候,我们看到有一个x-expires参数,可以让队列在指定时间内 "未被使用" 的话会自动过期删除,未使用的意思是 queue 上没有任何 consumer,queue 没有被重新声明,并且在过期时间段内未调用过 basic.get 命令。该方式可用于,例如,RPC-style 的回复 queue, 其中许多queue 会被创建出来,但是却从未被使用。

服务器会确保在过期时间到达后 queue 被删除,但是不保证删除的动作有多么的及时。在服务器重启后,持久化的queue 的超时时间将重新计算。 x-expires 参数值以毫秒为单位,并且服从和 x-message-ttl 一样的约束条件,且不能设置为 0 。所以,如果该参数设置为 1000 ,则表示该 queue 如果在 1s之内未被使用则会被删除。

Map<String, Object> args = new HashMap<String, Object>();  
args.put("x-expires", 18000);  //队列有效期18秒
channel.queueDeclare("myqueue", false, false, false, args);

RabbitMQ的DLX

1、DLX是什么

DLX是Dead-Letter-Exchange的简写,意思是死信交换机。

它的作用其实是用来接收死信消息(dead message)的。那什么是死信消息呢?一般消息变成死信消息有如下几种情况:

当消息在一个队列中变成了死信消息后,可以被发送到另一个交换机,这个交换机就是DLX,绑定DLX的队列成为死信队列。当这个队列中存在死信时, RabbitMQ 就会立即自动地将这个消息重新发布到设置的DLX 上去,进而被路由到绑定该DLX的死信队列上。可以监听这个队列中的消息、以进行相应的处理,这个特性与将消息的TTL 设置为0 配合使用可以弥补imrnediate 参数的功能。

2、DLX有什么用

因为消息如果未被正常消费并设置了requeue为false时会进入死信队列,我们可以监控消费死信队列中消息,来观察和分析系统的问题。DLX还有一个非常重要的作用,就是结合TTL实现延迟队列(延迟队列的使用范围还是挺广的:比如下单超过多长时间自动关闭;比如我们接入过第三方支付系统的同学一定知道,我们的订单中会传一个notify_url用于接收支付结果知,如果我们给第三方支付响应的不是成功的消息,其会隔一段时间继续调用通知我们的notify_url,超过几次后不再进行通知,一般通知频率都是 0秒-5秒-30秒-5分钟-30分钟-1小时-6小时-12小时;比如我们的家用电器定时关机。。。。。。这些场景都是可以用延迟队列实现的)。

3、DLX使用方式

下面在web管控台添加队列的时候,我们看到有两个DLX相关的参数:x-dead-letter-exchange和x-dead-letter-routing-key。x-dead-letter-exchange是设置队列的DLX的;x-dead-letter-routing-key是设置死信消息进入DLX时的routing key的,这个是可以不设置的,如果不设置,则默认使用原队列的routing key。

如何理解Java RabbitMQ的TTL和DLX

客户端可以通过channel.queueDeclare方法声明队列时设置x-dead-letter-exchange参数,具体代码如下所示

channel.exchangeDeclare("dlx_exchange" , "direct"); //创建DLX: dlx_exchange
Map<String, Object> args = new HashMap<String, Object>();
args.put("x-dead-letter-exchange" , "dlx_exchange ");//设置DLX
args.put("x-dead-letter-routing-key" , "dlx-routing-key");//设置DLX的路由键(可以不设置)
//为队列myqueue 添加DLX
channel.queueDeclare("myqueue" , false , false , false , args);

上面说的可能比较抽象,下面我们通过一个具体的例子,来演示一下DLX的具体使用:

package cn.wkp.rabbitmq.newest.dlx;
 
import java.util.Date;
import java.util.HashMap;
import java.util.Map;
 
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.MessageProperties;
 
import cn.wkp.rabbitmq.util.ConnectionUtil;
 
public class SendDLX {
 
	public static void main(String[] args) throws Exception {
		Connection connection = ConnectionUtil.getConnection();
		Channel channel = connection.createChannel();
		//声明一个交换机,做死信交换机用
		channel.exchangeDeclare("dlx_exchange", "topic", true, false, null);
		//声明一个队列,做死信队列用
		channel.queueDeclare("dlx_queue", true, false, false, null);
		//队列绑定到交换机上
		channel.queueBind("dlx_queue", "dlx_exchange", "dlx.*");
		
		channel.exchangeDeclare("normal_exchange", "fanout", true, false, null);
		Map<String, Object> arguments=new HashMap<String, Object>();
		arguments.put("x-message-ttl" , 5000);//设置消息有效期1秒,过期后变成私信消息,然后进入DLX
		arguments.put("x-dead-letter-exchange" , "dlx_exchange");//设置DLX
		arguments.put("x-dead-letter-routing-key" , "dlx.test");//设置DLX的路由键(可以不设置)
		//为队列normal_queue 添加DLX
		channel.queueDeclare("normal_queue", true, false, false, arguments);
		channel.queueBind("normal_queue", "normal_exchange", "");
		
		channel.basicPublish("normal_exchange", "", MessageProperties.PERSISTENT_TEXT_PLAIN, ("测试死信消息").getBytes());
		System.out.println("发送消息时间:"+ConnectionUtil.formatDate(new Date()));
		
		channel.close();
		connection.close();
	}
}

上面是发送者的代码,运行后观察控制台可以看到如下所示:

死信队列dlx_queue的绑定如下,其已与死信交换机dlx_exchange(topic类型)进行了绑定,routing key为"dlx.*"

如何理解Java RabbitMQ的TTL和DLX

队列normal_queue的绑定如下,其已与交换机normal_exchange(fanout类型)进行了绑定

如何理解Java RabbitMQ的TTL和DLX

queues视图如下:DLX和DLK表示设置给normal_queue设置了死信交换机和死信消息的routing key,我们看到消息已经被路由到了死信队列上面。整个流程为:

如何理解Java RabbitMQ的TTL和DLX

然后我们给死信队列添加消费者如下:我们测试一下死信消息进入DLX的时间,先将之前的那个死信消息删除

package cn.wkp.rabbitmq.newest.dlx;
 
import java.io.IOException;
import java.util.Date;
 
import com.rabbitmq.client.AMQP.BasicProperties;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.Consumer;
import com.rabbitmq.client.DefaultConsumer;
import com.rabbitmq.client.Envelope;
 
import cn.wkp.rabbitmq.util.ConnectionUtil;
 
public class RecvDLX {
 
	public static void main(String[] argv) throws Exception {
		Connection connection = ConnectionUtil.getConnection();
		final Channel channel = connection.createChannel();
 
		channel.exchangeDeclare("dlx_exchange", "topic", true, false, null);
		channel.queueDeclare("dlx_queue", true, false, false, null);
		channel.queueBind("dlx_queue", "dlx_exchange", "dlx.*");
 
		// 指该消费者在接收到队列里的消息但没有返回确认结果之前,它不会将新的消息分发给它。
		channel.basicQos(1);
 
		Consumer consumer = new DefaultConsumer(channel) {
			@Override
			public void handleDelivery(String consumerTag, Envelope envelope, BasicProperties properties, byte[] body)
					throws IOException {
				System.out.println("消费者收到消息:" + new String(body)+",当前时间:"+ConnectionUtil.formatDate(new Date()));
				// 消费者手动发送ack应答
				channel.basicAck(envelope.getDeliveryTag(), false);
			}
		};
		System.out.println("消费死信队列中的消息======================");
		// 监听队列
		channel.basicConsume("dlx_queue", false, consumer);
	}
}

运行结果如下(先运行的死信队列消费者,然后运行生产者):我们看到消息过期后10毫秒就被死信队列的消费者消费到了,显然,消息成为死信后是立即被发送到了DLX中。

消费死信队列中的消息======================
消费者收到消息:测试死信消息,当前时间:2021-09-24 16:30:05:740

发送消息时间:2021-09-24 17:57:00:730

关于如何理解Java RabbitMQ的TTL和DLX就分享到这里了,希望以上内容可以对大家有一定的帮助,可以学到更多知识。如果觉得文章不错,可以把它分享出去让更多的人看到。

推荐阅读:
  1. RabbitMQ实战:理解消息通信
  2. Java6种延时队列的实现方法

免责声明:本站发布的内容(图片、视频和文字)以原创、转载和分享为主,文章观点不代表本网站立场,如果涉及侵权请联系站长邮箱:is@yisu.com进行举报,并提供相关证据,一经查实,将立刻删除涉嫌侵权内容。

java rabbitmq

上一篇:小内存大流量VPS的有效利用方法有哪些

下一篇:python中数据类型的示例分析

相关阅读

您好,登录后才能下订单哦!

密码登录
登录注册
其他方式登录
点击 登录注册 即表示同意《亿速云用户服务条款》