Version: Next
Routing Direct 模型
Routing 路由订阅模式 —— Direct 直连
- 在
Fanout模式中,一条消息,会被所有订阅的队列消费- 在某些场景下,希望不同的消息被不同的队列消费——使用
Direct类型的Exchange在 Direct 模型下:
- 队列与交换机的绑定,不能是任意绑定,而是需要制定一个
RoutingKey 路由Key- 消息发送方在向
Exchange发送消息时,也必须指定消息的Routing KeyExchange不再把消息发给每一个绑定的队列,而是根据消息的Routing Key进行判断
- 只有队列的
RoutingKey与消息的RoutingKey完全一致,才会接收消息

流程及图解——一个日志系统
- 消费者1:希望只接收
error信息并持久化到磁盘,通过其临时队列指定RoutingKey = "error"实现- 消费者2:希望接收所有日志信息,打印到控制台,通过其临时队列指定
RoutingKey为info、error、warning实现- 生产者:向
Exchange发送消息,发送消息时,会指定一个Routing Key- 交换机:接收生产者的消息,然后把消息递交给与
Routing Key完全匹配的队列
生产者实现
步骤
- 声明交换机
channel.exchangeDeclare(2个参数)
- 交换机名
- 类型:不再使用
fanout,而是写direct- 发送消息:指定
RoutingKey生产者实现public class ProviderRoutingDirect {public static void main(String[] args) throws IOException {Connection connection = RabbitMqUtils.getConnection();Channel channel = connection.createChannel();// 声明交换机,类型不再是 fanout,而是 directchannel.exchangeDeclare("logs_direct", BuiltinExchangeType.DIRECT);// 发送消息 需要指定 RoutingKeyString targetRoutingKey = "info";channel.basicPublish("logs_direct",targetRoutingKey,null,("[info] 普通日志 | RoutingKey: {" + targetRoutingKey + "}").getBytes());// 关闭资源// 发送 error 日志targetRoutingKey = "error";channel.basicPublish("logs_direct",targetRoutingKey,null,("[error] 错误日志 | RoutingKey: {" + targetRoutingKey + "}").getBytes());// 关闭资源// 发送 warning 日志targetRoutingKey = "warning";channel.basicPublish("logs_direct",targetRoutingKey,null,("[warning] 警告日志 | RoutingKey: {" + targetRoutingKey + "}").getBytes());// 关闭资源RabbitMqUtils.closeConnectionAndChannel(connection, channel);}}
消费者实现
- 消费者 1
- 消费者 2
消费者1
public class ConsumerRoutingDirect1 {
public static void main(String[] args) throws IOException {
Connection connection = RabbitMqUtils.getConnection();
Channel channel = connection.createChannel();
// 声明交换机,类型为 direct
channel.exchangeDeclare("logs_direct", BuiltinExchangeType.DIRECT);
// 声明临时队列
String queue = channel.queueDeclare().getQueue();
// 绑定临时队列到通道
// 指定 routingKey
String targetRoutingKey = "error";
channel.queueBind(queue, "logs_direct", targetRoutingKey);
// 接收消息
channel.basicConsume(queue, true, new DefaultConsumer(channel) {
@Override
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
System.out.println("消费者1 -> " + new String(body));
}
});
}
}
测试
发送三种日志消息,分别观察两个消费者
- 消费者1 只接收
error日志- 消费者2 接收
info、error、warning日志
消费者1 运行结果
消费者1 -> [error] 错误日志 | RoutingKey: {error}
消费者2 运行结果
消费者2 -> [info] 普通日志 | RoutingKey: {info}
消费者2 -> [error] 错误日志 | RoutingKey: {error}
消费者2 -> [warning] 警告日志 | RoutingKey: {warning}