Version: Next

Routing Direct 模型

Routing 路由订阅模式 —— Direct 直连

  • Fanout 模式中,一条消息,会被所有订阅的队列消费
  • 在某些场景下,希望不同的消息被不同的队列消费——使用 Direct 类型的 Exchange

在 Direct 模型下

  • 队列与交换机的绑定,不能是任意绑定,而是需要制定一个 RoutingKey 路由Key
  • 消息发送方在向 Exchange 发送消息时,也必须指定消息的 Routing Key
  • Exchange 不再把消息发给每一个绑定的队列,而是根据消息的 Routing Key 进行判断
    • 只有队列的 RoutingKey 与消息的 RoutingKey 完全一致,才会接收消息

流程及图解——一个日志系统

  • 消费者1:希望只接收 error 信息并持久化到磁盘,通过其临时队列指定 RoutingKey = "error" 实现
  • 消费者2:希望接收所有日志信息,打印到控制台,通过其临时队列指定 RoutingKeyinfoerrorwarning 实现
  • 生产者:向 Exchange 发送消息,发送消息时,会指定一个 Routing Key
  • 交换机:接收生产者的消息,然后把消息递交给与 Routing Key 完全匹配的队列

生产者实现

步骤

  • 声明交换机 channel.exchangeDeclare(2个参数)
    • 交换机名
    • 类型:不再使用 fanout,而是写 direct
  • 发送消息:指定 RoutingKey
生产者实现
public class ProviderRoutingDirect {
public static void main(String[] args) throws IOException {
Connection connection = RabbitMqUtils.getConnection();
Channel channel = connection.createChannel();
// 声明交换机,类型不再是 fanout,而是 direct
channel.exchangeDeclare("logs_direct", BuiltinExchangeType.DIRECT);
// 发送消息 需要指定 RoutingKey
String targetRoutingKey = "info";
channel.basicPublish("logs_direct",
targetRoutingKey,
null,
("[info] 普通日志 | RoutingKey: {" + targetRoutingKey + "}").getBytes());
// 关闭资源
// 发送 error 日志
targetRoutingKey = "error";
channel.basicPublish("logs_direct",
targetRoutingKey,
null,
("[error] 错误日志 | RoutingKey: {" + targetRoutingKey + "}").getBytes());
// 关闭资源
// 发送 warning 日志
targetRoutingKey = "warning";
channel.basicPublish("logs_direct",
targetRoutingKey,
null,
("[warning] 警告日志 | RoutingKey: {" + targetRoutingKey + "}").getBytes());
// 关闭资源
RabbitMqUtils.closeConnectionAndChannel(connection, channel);
}
}

消费者实现

消费者1
public class ConsumerRoutingDirect1 {
public static void main(String[] args) throws IOException {
Connection connection = RabbitMqUtils.getConnection();
Channel channel = connection.createChannel();
// 声明交换机,类型为 direct
channel.exchangeDeclare("logs_direct", BuiltinExchangeType.DIRECT);
// 声明临时队列
String queue = channel.queueDeclare().getQueue();
// 绑定临时队列到通道
// 指定 routingKey
String targetRoutingKey = "error";
channel.queueBind(queue, "logs_direct", targetRoutingKey);
// 接收消息
channel.basicConsume(queue, true, new DefaultConsumer(channel) {
@Override
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
System.out.println("消费者1 -> " + new String(body));
}
});
}
}

测试

发送三种日志消息,分别观察两个消费者

  • 消费者1 只接收 error 日志
  • 消费者2 接收 infoerrorwarning 日志
消费者1 运行结果
消费者1 -> [error] 错误日志 | RoutingKey: {error}
消费者2 运行结果
消费者2 -> [info] 普通日志 | RoutingKey: {info}
消费者2 -> [error] 错误日志 | RoutingKey: {error}
消费者2 -> [warning] 警告日志 | RoutingKey: {warning}