Version: Next

过期时间 TTL

过期时间 TTL 表示可以对消息设置预期时间

  • 在这个时间内都可以被消费者接受获取
  • 超时后消息被自动删除
  • RabbitMQ 可以对 消息和队列 设置 TTL,有两种方法
    1. 通过队列属性设置,队列中所有消息都有相同的过期时间
    2. 通过消息属性设置,每条消息的 TTL 可以不同
    3. 同时设置:以值较 的 TTL 为准
  • 一旦消息生存时间达到 TTL,就被称为 Dead Message 死信(死亡信息),被投递到 死信队列 ,消费者无法再接收到该消息

设置队列 TTL

以先前 Topic 代码为例,修改消费者中对 Queue 的声明

  • 使用 @Arugument(name, valie, type) 添加设置
    • namex-message-ttl 名称可以在 RabbitMQ 管理界面查到
    • value:"5000" 在这个位置上只能接收 String 类型,在 type 中再指定 value 的类型,ttl 默认以 毫秒 为单位
    • typevalue 中填的东西的类型
  • 为了观察消息过期,设置队列消息确认模式为 手动
修改后的 topic 消费者方法,设置为手动确认,5秒过期,先不确认
@RabbitListener(ackMode = "MANUAL", bindings = { // 手动确认消息
@QueueBinding(
value = @Queue(name = "ttlQueue",
arguments = @Argument(
name = "x-message-ttl", // ttl 时间
value = "5000", // 5秒
type = "java.lang.Integer" // 5000是Integer类型
)
),
exchange = @Exchange(value = "topics", type = ExchangeTypes.TOPIC),
key = {"user.*"}
)
})
public void receive1(@Payload String message, @Header(AmqpHeaders.DELIVERY_TAG) long deliveryTag, Channel channel) throws IOException {
System.out.println("Routing Topic Consumer 1 => " + message);
// 确认消息
// channel.basicAck(deliveryTag, false);
}

执行消息生产者,在管理界面中观察这个队列

  • 生产者向队列中存了一条消息,由于消费者没有手动确认消息,所以它不会被消费
  • 5 秒后,这条消息依然没有被消费,于是它成了 死信,从队列中被移除了

设置消息 TTL

在生产者端,添加 MessagePostProcessor 接口的实现类,它是一个函数式接口

//Routing Topic model
@Test
public void RoutingTopicTest() {
String routingKey = "user.save";
// 消息发布处理器
MessagePostProcessor messagePostProcessor = (message) -> {
message.getMessageProperties().setExpiration("10000"); // 消息过期时间
message.getMessageProperties().setContentEncoding("UTF-8"); // 消息编码
return message;
};
// 1.exchange 2. routingKey 3. message object 4. MessagePostProcessor
rabbitTemplate.convertAndSend("topics",
routingKey,
"Routing Topic model message | RoutingKey [" + routingKey + "]",
messagePostProcessor
);
}

消费者:设置 手动确认,但是不确认,采用 普通队列

@RabbitListener(ackMode = "MANUAL", bindings = { // 手动确认消息
@QueueBinding(
value = @Queue(name = "ttlMessage"
/* arguments = @Argument(
name = "x-message-ttl", // ttl 时间
value = "5000", // 5秒
type = "java.lang.Integer" // 5000是Integer类型
)*/
),
exchange = @Exchange(value = "topics", type = ExchangeTypes.TOPIC),
key = {"user.*"}
)
})
public void receive1(@Payload String message, @Header(AmqpHeaders.DELIVERY_TAG) long deliveryTag, Channel channel) throws IOException {
System.out.println("Routing Topic Consumer 1 => " + message);
// 确认消息
// channel.basicAck(deliveryTag, false);
}

测试

警告

测试前删除所有当前队列,否则可能出毛病

  • 队列上没有 TTL 标识
  • 可以观察到一样的效果,消息在指定时间后自动死亡

二者区别

  • 队列 TTL:整个队列里的消息都有一样的 TTL,消息死亡后,被移动到 死信队列
  • 消息 TTL:到期后消息 直接移除