Version: Next

死信队列

DLX —— Dead Letter Exchange,死信交换机,也有人称 死信邮箱

  • 当消息在一个队列中编程死信后,它能够被重新发送到另一个 交换机 中 —— DLX
  • 绑定 DLX 的队列称为 死信队列
  • 消息变成死信,可能由于以下原因:
    • 消息被拒绝
    • 消息过期
    • 队列达到最大长度

DLX 也是一个正常的交换机,和一般的交换机没有区别,他能够在任何的队列上被指定,实际上就是设置某一队列的属性

  • 当这个队列中存在死信时,RabbitMQ 就会自动的把这个消息重新发不到设置的 DLX,进而被路由到一个队列,即 死信队列
  • 想要使用死信队列,只需要在定义队列的时候设置队列参数 x-dead-letter-exchange,指定交换机即可

实现

组件一览

  • 生产者:向交换机 ttl_direct_exchange 发送消息,携带 routingKeyttl,也即把消息发送给消费者 1
  • 消费者1 —— TTL 队列:一个 direct 模式的队列,设置了 ttl
    • 自身设置:
      • namettlQueue
      • Exchangettl_direct_exchange
      • routingKeyttl
    • 死信处理设置
      • x-dead-letter-exchange:死信要发往的路由器,设置为 dead_direct_exchange
      • x-dead-letter-routing-key:死信发送是使用的 Routing Key 设置为 dead
  • 消费者2 —— 死信队列:一个 direct 模式的队列,没有设置 ttl
    • 自身设置:
      • namedead.direct.queue
      • Exchangedead_direct_exchange
      • routingKeydead

生产者

// Dead letter Exchange Test
@Test
public void DeadLetterExchangeTest() {
String exchangeName = "ttl_direct_exchange"; // 去正常 ttl 队列的交换机
String routingKey = "ttl"; // 往正常 ttl 队列的 key
rabbitTemplate.convertAndSend(exchangeName, routingKey, "我是死信测试消息");
}

消费者

TTL 队列 & 死信队列

@Component
public class DlxDirectConsumer {
// 正常 TTL 队列
@RabbitListener(ackMode = "MANUAL", bindings = { // 手动确认消息
@QueueBinding(
value = @Queue(name = "ttlQueue",
arguments = {
@Argument(name = "x-message-ttl", value = "5000", type = "java.lang.Integer"),
@Argument(name = "x-dead-letter-exchange", value = "dead_direct_exchange"), // 死信找哪个交换机
@Argument(name = "x-dead-letter-routing-key", value = "dead") // 死信用哪个routingKey
}
),
exchange = @Exchange(value = "ttl_direct_exchange", type = ExchangeTypes.DIRECT),
key = "ttl"
)
})
public void receive1(@Payload String message, @Header(AmqpHeaders.DELIVERY_TAG) long deliveryTag, Channel channel) throws IOException {
System.out.println("TTL Consumer => " + message);
// 确认消息
// channel.basicAck(deliveryTag, false);
}
// 死信队列
@RabbitListener(bindings = {
@QueueBinding(
value = @Queue(name = "dead.direct.queue"),
exchange = @Exchange(value = "dead_direct_exchange", type = ExchangeTypes.DIRECT), // 声明死信交换机
key = "dead" // 设置死信 routingKey
)
})
public void receiveDeadMessage(@Payload String message) {
System.out.println("DLX Consumer => " + message);
// 确认消息
// channel.basicAck(deliveryTag, false);
}
}