Version: Next
死信队列
DLX
—— Dead Letter Exchange,死信交换机
,也有人称死信邮箱
- 当消息在一个队列中编程死信后,它能够被重新发送到另一个
交换机
中 ——DLX
- 绑定
DLX
的队列称为死信队列
- 消息变成死信,可能由于以下原因:
- 消息被拒绝
- 消息过期
- 队列达到最大长度
DLX 也是一个正常的交换机,和一般的交换机没有区别,他能够在任何的队列上被指定,实际上就是设置某一队列的属性
- 当这个队列中存在死信时,RabbitMQ 就会自动的把这个消息重新发不到设置的
DLX
,进而被路由到一个队列,即死信队列
- 想要使用死信队列,只需要在定义队列的时候设置队列参数
x-dead-letter-exchange
,指定交换机即可
实现
组件一览
- 生产者:向交换机
ttl_direct_exchange
发送消息,携带routingKey
为ttl
,也即把消息发送给消费者 1- 消费者1 —— TTL 队列:一个
direct
模式的队列,设置了ttl
,
- 自身设置:
name
:ttlQueue
Exchange
:ttl_direct_exchange
routingKey
:ttl
- 死信处理设置:
x-dead-letter-exchange
:死信要发往的路由器,设置为dead_direct_exchange
x-dead-letter-routing-key
:死信发送是使用的Routing Key
设置为dead
- 消费者2 —— 死信队列:一个
direct
模式的队列,没有设置ttl
- 自身设置:
name
:dead.direct.queue
Exchange
:dead_direct_exchange
routingKey
:dead
生产者
// Dead letter Exchange Test@Testpublic void DeadLetterExchangeTest() {String exchangeName = "ttl_direct_exchange"; // 去正常 ttl 队列的交换机String routingKey = "ttl"; // 往正常 ttl 队列的 keyrabbitTemplate.convertAndSend(exchangeName, routingKey, "我是死信测试消息");}消费者
TTL 队列 & 死信队列
@Componentpublic class DlxDirectConsumer {// 正常 TTL 队列@RabbitListener(ackMode = "MANUAL", bindings = { // 手动确认消息@QueueBinding(value = @Queue(name = "ttlQueue",arguments = {@Argument(name = "x-message-ttl", value = "5000", type = "java.lang.Integer"),@Argument(name = "x-dead-letter-exchange", value = "dead_direct_exchange"), // 死信找哪个交换机@Argument(name = "x-dead-letter-routing-key", value = "dead") // 死信用哪个routingKey}),exchange = @Exchange(value = "ttl_direct_exchange", type = ExchangeTypes.DIRECT),key = "ttl")})public void receive1(@Payload String message, @Header(AmqpHeaders.DELIVERY_TAG) long deliveryTag, Channel channel) throws IOException {System.out.println("TTL Consumer => " + message);// 确认消息// channel.basicAck(deliveryTag, false);}// 死信队列@RabbitListener(bindings = {@QueueBinding(value = @Queue(name = "dead.direct.queue"),exchange = @Exchange(value = "dead_direct_exchange", type = ExchangeTypes.DIRECT), // 声明死信交换机key = "dead" // 设置死信 routingKey)})public void receiveDeadMessage(@Payload String message) {System.out.println("DLX Consumer => " + message);// 确认消息// channel.basicAck(deliveryTag, false);}}