Version: Next
Routing Direct 模型
Routing 路由订阅模式 —— Direct 直连
- 在
Fanout
模式中,一条消息,会被所有订阅的队列消费- 在某些场景下,希望不同的消息被不同的队列消费——使用
Direct
类型的Exchange
在 Direct 模型下:
- 队列与交换机的绑定,不能是任意绑定,而是需要制定一个
RoutingKey 路由Key
- 消息发送方在向
Exchange
发送消息时,也必须指定消息的Routing Key
Exchange
不再把消息发给每一个绑定的队列,而是根据消息的Routing Key
进行判断
- 只有队列的
RoutingKey
与消息的RoutingKey
完全一致,才会接收消息
流程及图解——一个日志系统
- 消费者1:希望只接收
error
信息并持久化到磁盘,通过其临时队列指定RoutingKey = "error"
实现- 消费者2:希望接收所有日志信息,打印到控制台,通过其临时队列指定
RoutingKey
为info
、error
、warning
实现- 生产者:向
Exchange
发送消息,发送消息时,会指定一个Routing Key
- 交换机:接收生产者的消息,然后把消息递交给与
Routing Key
完全匹配的队列
生产者实现
步骤
- 声明交换机
channel.exchangeDeclare(2个参数)
- 交换机名
- 类型:不再使用
fanout
,而是写direct
- 发送消息:指定
RoutingKey
生产者实现public class ProviderRoutingDirect {public static void main(String[] args) throws IOException {Connection connection = RabbitMqUtils.getConnection();Channel channel = connection.createChannel();// 声明交换机,类型不再是 fanout,而是 directchannel.exchangeDeclare("logs_direct", BuiltinExchangeType.DIRECT);// 发送消息 需要指定 RoutingKeyString targetRoutingKey = "info";channel.basicPublish("logs_direct",targetRoutingKey,null,("[info] 普通日志 | RoutingKey: {" + targetRoutingKey + "}").getBytes());// 关闭资源// 发送 error 日志targetRoutingKey = "error";channel.basicPublish("logs_direct",targetRoutingKey,null,("[error] 错误日志 | RoutingKey: {" + targetRoutingKey + "}").getBytes());// 关闭资源// 发送 warning 日志targetRoutingKey = "warning";channel.basicPublish("logs_direct",targetRoutingKey,null,("[warning] 警告日志 | RoutingKey: {" + targetRoutingKey + "}").getBytes());// 关闭资源RabbitMqUtils.closeConnectionAndChannel(connection, channel);}}
消费者实现
- 消费者 1
- 消费者 2
消费者1
public class ConsumerRoutingDirect1 {
public static void main(String[] args) throws IOException {
Connection connection = RabbitMqUtils.getConnection();
Channel channel = connection.createChannel();
// 声明交换机,类型为 direct
channel.exchangeDeclare("logs_direct", BuiltinExchangeType.DIRECT);
// 声明临时队列
String queue = channel.queueDeclare().getQueue();
// 绑定临时队列到通道
// 指定 routingKey
String targetRoutingKey = "error";
channel.queueBind(queue, "logs_direct", targetRoutingKey);
// 接收消息
channel.basicConsume(queue, true, new DefaultConsumer(channel) {
@Override
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
System.out.println("消费者1 -> " + new String(body));
}
});
}
}
测试
发送三种日志消息,分别观察两个消费者
- 消费者1 只接收
error
日志- 消费者2 接收
info
、error
、warning
日志
消费者1 运行结果
消费者1 -> [error] 错误日志 | RoutingKey: {error}
消费者2 运行结果
消费者2 -> [info] 普通日志 | RoutingKey: {info}
消费者2 -> [error] 错误日志 | RoutingKey: {error}
消费者2 -> [warning] 警告日志 | RoutingKey: {warning}