Version: Next

发布订阅模型

Fanout 也称 广播

  • 生产者发布的消息,所有消费者都可以接收到


流程:

  • 可以有多个消费者
  • 每个消费者都有自己的 队列 Queue
  • 每个队列都要绑定到 交换机 Exchange
  • 生产者发送的消息,只能发送到交换机,交换机来决定消息发给哪个队列,生产者无法决定
  • 交换机把消息发送给绑定过的所有队列
  • 队列的消费者都能拿到消息,实现一条消息被多个消费者消费

生产者实现

生产者不再与消息队列绑定,而是与交换机绑定

  • 声明交换机channel.exchangeDeclare(String 交换机名, String 类型),类型必须是 fanout
    • 如果交换机不存在,RabbitMQ 会自动创建
  • 通过通道发送消息basicPublish(String exchange, String routingKey, ..., ...)
    • 在此模式下,routingKey 没有实际含义,填 "" 即可
Fanout 生产者实现
public class ProviderFanout {
public static void main(String[] args) throws IOException {
Connection connection = RabbitMqUtils.getConnection();
Channel channel = connection.createChannel();
// 将通道声明到交换机
channel.exchangeDeclare("logs", "fanout"); // 广播
// 发送消息
channel.basicPublish("logs", "", null, "fanout type message".getBytes());
RabbitMqUtils.closeConnectionAndChannel(connection, channel);
}
}

消费者实现

步骤

  • 创建临时队列 —— channel.queueDeclare().getQueue()

  • 将临时队列绑定到交换机 —— channel.queueBind(3个参数)

    • String queue —— 临时队列名
    • String exchange —— 交换机名
    • String routingKey —— 在该模式下直接写 ""
  • 处理消息 —— channel.basicConsume()

消费者1
public class ConsumerFanout1 {
public static void main(String[] args) throws IOException {
Connection connection = RabbitMqUtils.getConnection();
Channel channel = connection.createChannel();
// 创建临时队列
String tempQueue = channel.queueDeclare().getQueue();
// 降临时队列绑定到交换机
channel.queueBind(tempQueue, "logs", "");
// 消费消息
channel.basicConsume(tempQueue, true, new DefaultConsumer(channel) {
@Override
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
System.out.println("Fanout Consumer 1 | Message from Provider -> " + new String(body));
}
});
}
}