Version: Next
死信队列
DLX—— Dead Letter Exchange,死信交换机,也有人称死信邮箱
- 当消息在一个队列中编程死信后,它能够被重新发送到另一个
交换机中 ——DLX- 绑定
DLX的队列称为死信队列- 消息变成死信,可能由于以下原因:
- 消息被拒绝
- 消息过期
- 队列达到最大长度
DLX 也是一个正常的交换机,和一般的交换机没有区别,他能够在任何的队列上被指定,实际上就是设置某一队列的属性
- 当这个队列中存在死信时,RabbitMQ 就会自动的把这个消息重新发不到设置的
DLX,进而被路由到一个队列,即死信队列- 想要使用死信队列,只需要在定义队列的时候设置队列参数
x-dead-letter-exchange,指定交换机即可
实现
组件一览
- 生产者:向交换机
ttl_direct_exchange发送消息,携带routingKey为ttl,也即把消息发送给消费者 1- 消费者1 —— TTL 队列:一个
direct模式的队列,设置了ttl,
- 自身设置:
name:ttlQueueExchange:ttl_direct_exchangeroutingKey:ttl- 死信处理设置:
x-dead-letter-exchange:死信要发往的路由器,设置为dead_direct_exchangex-dead-letter-routing-key:死信发送是使用的Routing Key设置为dead- 消费者2 —— 死信队列:一个
direct模式的队列,没有设置ttl
- 自身设置:
name:dead.direct.queueExchange:dead_direct_exchangeroutingKey:dead生产者
// Dead letter Exchange Test@Testpublic void DeadLetterExchangeTest() {String exchangeName = "ttl_direct_exchange"; // 去正常 ttl 队列的交换机String routingKey = "ttl"; // 往正常 ttl 队列的 keyrabbitTemplate.convertAndSend(exchangeName, routingKey, "我是死信测试消息");}消费者
TTL 队列 & 死信队列
@Componentpublic class DlxDirectConsumer {// 正常 TTL 队列@RabbitListener(ackMode = "MANUAL", bindings = { // 手动确认消息@QueueBinding(value = @Queue(name = "ttlQueue",arguments = {@Argument(name = "x-message-ttl", value = "5000", type = "java.lang.Integer"),@Argument(name = "x-dead-letter-exchange", value = "dead_direct_exchange"), // 死信找哪个交换机@Argument(name = "x-dead-letter-routing-key", value = "dead") // 死信用哪个routingKey}),exchange = @Exchange(value = "ttl_direct_exchange", type = ExchangeTypes.DIRECT),key = "ttl")})public void receive1(@Payload String message, @Header(AmqpHeaders.DELIVERY_TAG) long deliveryTag, Channel channel) throws IOException {System.out.println("TTL Consumer => " + message);// 确认消息// channel.basicAck(deliveryTag, false);}// 死信队列@RabbitListener(bindings = {@QueueBinding(value = @Queue(name = "dead.direct.queue"),exchange = @Exchange(value = "dead_direct_exchange", type = ExchangeTypes.DIRECT), // 声明死信交换机key = "dead" // 设置死信 routingKey)})public void receiveDeadMessage(@Payload String message) {System.out.println("DLX Consumer => " + message);// 确认消息// channel.basicAck(deliveryTag, false);}}
