Version: Next
发布订阅模型
Fanout
也称广播
- 生产者发布的消息,所有消费者都可以接收到
流程:
- 可以有多个消费者
- 每个消费者都有自己的
队列 Queue
- 每个队列都要绑定到
交换机 Exchange
- 生产者发送的消息,只能发送到交换机,交换机来决定消息发给哪个队列,生产者无法决定
- 交换机把消息发送给绑定过的所有队列
- 队列的消费者都能拿到消息,实现一条消息被多个消费者消费
生产者实现
生产者不再与消息队列绑定,而是与交换机绑定
- 声明交换机:
channel.exchangeDeclare(String 交换机名, String 类型)
,类型必须是fanout
- 如果交换机不存在,RabbitMQ 会自动创建
- 通过通道发送消息:
basicPublish(String exchange, String routingKey, ..., ...)
- 在此模式下,
routingKey
没有实际含义,填""
即可Fanout 生产者实现public class ProviderFanout {public static void main(String[] args) throws IOException {Connection connection = RabbitMqUtils.getConnection();Channel channel = connection.createChannel();// 将通道声明到交换机channel.exchangeDeclare("logs", "fanout"); // 广播// 发送消息channel.basicPublish("logs", "", null, "fanout type message".getBytes());RabbitMqUtils.closeConnectionAndChannel(connection, channel);}}
消费者实现
步骤
创建临时队列 ——
channel.queueDeclare().getQueue()
将临时队列绑定到交换机 ——
channel.queueBind(3个参数)
String queue
—— 临时队列名String exchange
—— 交换机名String routingKey
—— 在该模式下直接写""
处理消息 ——
channel.basicConsume()
- 消费者 1
- 消费者 2
- 消费者 3
消费者1
public class ConsumerFanout1 {
public static void main(String[] args) throws IOException {
Connection connection = RabbitMqUtils.getConnection();
Channel channel = connection.createChannel();
// 创建临时队列
String tempQueue = channel.queueDeclare().getQueue();
// 降临时队列绑定到交换机
channel.queueBind(tempQueue, "logs", "");
// 消费消息
channel.basicConsume(tempQueue, true, new DefaultConsumer(channel) {
@Override
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
System.out.println("Fanout Consumer 1 | Message from Provider -> " + new String(body));
}
});
}
}