Version: Next
过期时间 TTL
过期时间 TTL 表示可以对消息设置预期时间
- 在这个时间内都可以被消费者接受获取
- 超时后消息被自动删除
- RabbitMQ 可以对 消息和队列 设置 TTL,有两种方法
- 通过队列属性设置,队列中所有消息都有相同的过期时间
- 通过消息属性设置,每条消息的 TTL 可以不同
- 同时设置:以值较
小
的 TTL 为准- 一旦消息生存时间达到 TTL,就被称为
Dead Message 死信(死亡信息)
,被投递到死信队列
,消费者无法再接收到该消息
设置队列 TTL
以先前
Topic
代码为例,修改消费者中对Queue
的声明
- 使用
@Arugument(name, valie, type)
添加设置
name
:x-message-ttl
名称可以在 RabbitMQ 管理界面查到value
:"5000" 在这个位置上只能接收 String 类型,在type
中再指定 value 的类型,ttl 默认以毫秒
为单位type
:value
中填的东西的类型- 为了观察消息过期,设置队列消息确认模式为
手动
修改后的 topic 消费者方法,设置为手动确认,5秒过期,先不确认
@RabbitListener(ackMode = "MANUAL", bindings = { // 手动确认消息
@QueueBinding(
value = @Queue(name = "ttlQueue",
arguments = @Argument(
name = "x-message-ttl", // ttl 时间
value = "5000", // 5秒
type = "java.lang.Integer" // 5000是Integer类型
)
),
exchange = @Exchange(value = "topics", type = ExchangeTypes.TOPIC),
key = {"user.*"}
)
})
public void receive1(@Payload String message, @Header(AmqpHeaders.DELIVERY_TAG) long deliveryTag, Channel channel) throws IOException {
System.out.println("Routing Topic Consumer 1 => " + message);
// 确认消息
// channel.basicAck(deliveryTag, false);
}
执行消息生产者,在管理界面中观察这个队列
- 生产者向队列中存了一条消息,由于消费者没有手动确认消息,所以它不会被消费
- 在
5
秒后,这条消息依然没有被消费,于是它成了死信
,从队列中被移除了
设置消息 TTL
在生产者端,添加
MessagePostProcessor
接口的实现类,它是一个函数式接口
//Routing Topic model
@Test
public void RoutingTopicTest() {
String routingKey = "user.save";
// 消息发布处理器
MessagePostProcessor messagePostProcessor = (message) -> {
message.getMessageProperties().setExpiration("10000"); // 消息过期时间
message.getMessageProperties().setContentEncoding("UTF-8"); // 消息编码
return message;
};
// 1.exchange 2. routingKey 3. message object 4. MessagePostProcessor
rabbitTemplate.convertAndSend("topics",
routingKey,
"Routing Topic model message | RoutingKey [" + routingKey + "]",
messagePostProcessor
);
}
消费者:设置
手动确认
,但是不确认,采用普通队列
@RabbitListener(ackMode = "MANUAL", bindings = { // 手动确认消息
@QueueBinding(
value = @Queue(name = "ttlMessage"
/* arguments = @Argument(
name = "x-message-ttl", // ttl 时间
value = "5000", // 5秒
type = "java.lang.Integer" // 5000是Integer类型
)*/
),
exchange = @Exchange(value = "topics", type = ExchangeTypes.TOPIC),
key = {"user.*"}
)
})
public void receive1(@Payload String message, @Header(AmqpHeaders.DELIVERY_TAG) long deliveryTag, Channel channel) throws IOException {
System.out.println("Routing Topic Consumer 1 => " + message);
// 确认消息
// channel.basicAck(deliveryTag, false);
}
测试
警告
测试前删除所有当前队列,否则可能出毛病
- 队列上没有
TTL
标识- 可以观察到一样的效果,消息在指定时间后自动死亡
二者区别
- 队列 TTL:整个队列里的消息都有一样的 TTL,消息死亡后,被移动到
死信队列
- 消息 TTL:到期后消息
直接移除
了