Integrating RabbitMQ with Spring Boot

<dependency>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-amqp</artifactId>
    <version>2.2.2.RELEASE</version>
</dependency>

After adding the dependency, simply obtain a RabbitTemplate from the Spring container.


Hello World

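Consumer

  • Managed by the Spring container
  • The @RabbitListener annotation marks the class as a consumer; queuesToDeclare declares a queue, and @Queue specifies the queue's attributes. Its default attribute is the queue name; durability and auto-delete can also be set
  • @RabbitHandler marks the method that consumes messages; the message is then received directly through the method's parameter list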
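import org.springframework.amqp.rabbit.annotation.Queue;
import org.springframework.amqp.rabbit.annotation.RabbitHandler;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;

@Component
@RabbitListener(queuesToDeclare = @Queue("hello")) // declare the queue
public class ConsumerHello {

    @RabbitHandler // receive the message
    public void receive(String message) {
        System.out.println("message = " + message);
    }
}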

Producer

  • Use RabbitTemplate to send messages
  • Run it as a test in a @SpringBootTest class
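import org.junit.Test;
import org.junit.runner.RunWith;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.test.context.SpringBootTest;
import org.springframework.test.context.junit4.SpringRunner;

@SpringBootTest(classes = MainClass.class)
@RunWith(SpringRunner.class)
public class TestRabbitMQ {

    @Autowired
    RabbitTemplate rabbitTemplate;

    // hello world model
    @Test
    public void helloWorldTest() {
        // arguments: 1. routingKey 2. message object
        rabbitTemplate.convertAndSend("hello", "hello world message");
    }
}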
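Tip
  • Producer code written as a JUnit test does not create the queue or exchange itself; they are created only when the consumer code runs
  • A producer written with a main method creates the queue and exchange directly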


Work model

@RabbitListener can be placed on a class or on a method.

Producer

// work model
@Test
public void workTest() {
    // arguments: 1. routingKey 2. message object
    for (int i = 0; i < 10; i++) {
        rabbitTemplate.convertAndSend("work",
                "work model message | index [" + i + "]");
    }
}

Consumer

Set up two consumers that listen on the same queue name

  • Implemented in a single class by annotating two methods with @RabbitListener
@Component
public class ConsumerWork {

    @RabbitListener(queuesToDeclare = @Queue("work"))
    public void receive1(String message) {
        System.out.println("Work Consumer 1 => " + message);
    }

    @RabbitListener(queuesToDeclare = @Queue("work"))
    public void receive2(String message) {
        System.out.println("Work Consumer 2 => " + message);
    }
}
Output
Work Consumer 1 => work model message | index [1]
Work Consumer 2 => work model message | index [0]
Work Consumer 1 => work model message | index [3]
Work Consumer 2 => work model message | index [2]
Work Consumer 1 => work model message | index [5]
Work Consumer 2 => work model message | index [4]
Work Consumer 1 => work model message | index [7]
Work Consumer 2 => work model message | index [6]
Work Consumer 1 => work model message | index [9]
Work Consumer 2 => work model message | index [8]

The same output, sorted manually by index:
Work Consumer 2 => work model message | index [0]
Work Consumer 1 => work model message | index [1]
Work Consumer 2 => work model message | index [2]
Work Consumer 1 => work model message | index [3]
Work Consumer 2 => work model message | index [4]
Work Consumer 1 => work model message | index [5]
Work Consumer 2 => work model message | index [6]
Work Consumer 1 => work model message | index [7]
Work Consumer 2 => work model message | index [8]
Work Consumer 1 => work model message | index [9]
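
The alternating pattern in the sorted output is what round-robin dispatch between two consumers of one queue would produce. The following is a simplified in-memory sketch of that idea in plain Java, not RabbitMQ or Spring AMQP code. The class RoundRobinDispatch and its dispatch method are hypothetical names introduced for illustration, and the sketch assumes every consumer is always ready to take the next message.

```java
import java.util.ArrayList;
import java.util.List;

public class RoundRobinDispatch {

    /**
     * Hands out messages to consumerCount consumers in turn:
     * message i goes to consumer slot (i % consumerCount).
     */
    public static List<List<String>> dispatch(List<String> messages, int consumerCount) {
        List<List<String>> perConsumer = new ArrayList<>();
        for (int c = 0; c < consumerCount; c++) {
            perConsumer.add(new ArrayList<>());
        }
        for (int i = 0; i < messages.size(); i++) {
            perConsumer.get(i % consumerCount).add(messages.get(i));
        }
        return perConsumer;
    }

    public static void main(String[] args) {
        List<String> messages = new ArrayList<>();
        for (int i = 0; i < 10; i++) {
            messages.add("work model message | index [" + i + "]");
        }
        List<List<String>> result = dispatch(messages, 2);
        for (int c = 0; c < result.size(); c++) {
            // Each slot ends up with every second message
            System.out.println("consumer slot " + c + " => " + result.get(c));
        }
    }
}
```

Which real consumer takes the even or the odd indexes depends on the order in which the listeners register with the broker, so only the alternation itself should be relied on.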

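Fanout model

Producer

Messages are sent to the logs exchange, so first delete any existing exchange named logs (presumably because the broker rejects redeclaring an existing exchange with different attributes).

  • A different overload of convertAndSend is used here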
// fanout model
@Test
public void fanoutTest() {
    // arguments: 1. exchange 2. routingKey 3. message object
    for (int i = 0; i < 10; i++) {
        rabbitTemplate.convertAndSend("logs",
                "",
                "Fanout model message | index [" + i + "]");
    }
}

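Consumer

  • The @RabbitListener annotation no longer uses queuesToDeclare; it uses bindings instead, which accepts a QueueBinding[] array
    • Each element is a @QueueBinding annotation, which binds a queue to an exchange
      • The value attribute specifies the queue, declared with @Queue; here @Queue is given no name, so a temporary queue is generated
      • The exchange attribute specifies the exchange, declared with @Exchange(value, type)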
@Component
public class ConsumerFanout {

    // consumer 1
    @RabbitListener(bindings = {
            @QueueBinding( // bind the queue to the exchange
                    value = @Queue, // create a temporary queue
                    exchange = @Exchange(value = "logs", type = ExchangeTypes.FANOUT)
            )
    })
    public void receive1(String message) {
        System.out.println("Fanout Consumer 1 => " + message);
    }

    // consumer 2
    @RabbitListener(bindings = {
            @QueueBinding( // bind the queue to the exchange
                    value = @Queue, // create a temporary queue
                    exchange = @Exchange(value = "logs", type = ExchangeTypes.FANOUT)
            )
    })
    public void receive2(String message) {
        System.out.println("Fanout Consumer 2 => " + message);
    }
}
Output
Fanout Consumer 2 => Fanout model message | index [0]
Fanout Consumer 1 => Fanout model message | index [0]
Fanout Consumer 1 => Fanout model message | index [1]
Fanout Consumer 1 => Fanout model message | index [2]
Fanout Consumer 1 => Fanout model message | index [3]
Fanout Consumer 1 => Fanout model message | index [4]
Fanout Consumer 1 => Fanout model message | index [5]
Fanout Consumer 1 => Fanout model message | index [6]
Fanout Consumer 1 => Fanout model message | index [7]
Fanout Consumer 1 => Fanout model message | index [8]
Fanout Consumer 1 => Fanout model message | index [9]
-------------------------------------------------------
Fanout Consumer 2 => Fanout model message | index [1]
Fanout Consumer 2 => Fanout model message | index [2]
Fanout Consumer 2 => Fanout model message | index [3]
Fanout Consumer 2 => Fanout model message | index [4]
Fanout Consumer 2 => Fanout model message | index [5]
Fanout Consumer 2 => Fanout model message | index [6]
Fanout Consumer 2 => Fanout model message | index [7]
Fanout Consumer 2 => Fanout model message | index [8]
Fanout Consumer 2 => Fanout model message | index [9]
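
Both consumers receive all ten messages because a fanout exchange copies each message to every queue bound to it and ignores the routing key. A minimal in-memory sketch of that behaviour follows. It is plain Java rather than broker code, and FanoutSketch, bind, publish and messagesIn are hypothetical names used only for illustration.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

public class FanoutSketch {

    // queue name -> messages delivered to that queue
    private final Map<String, List<String>> queues = new LinkedHashMap<>();

    /** Binds a queue to this exchange, creating the queue if it does not exist yet. */
    public void bind(String queueName) {
        queues.putIfAbsent(queueName, new ArrayList<>());
    }

    /** Copies the message to every bound queue; the routing key is deliberately ignored. */
    public void publish(String routingKey, String message) {
        for (List<String> queue : queues.values()) {
            queue.add(message);
        }
    }

    public List<String> messagesIn(String queueName) {
        return queues.get(queueName);
    }

    public static void main(String[] args) {
        FanoutSketch logs = new FanoutSketch();
        logs.bind("temporary-queue-1");
        logs.bind("temporary-queue-2");
        for (int i = 0; i < 10; i++) {
            logs.publish("", "Fanout model message | index [" + i + "]");
        }
        // Each queue holds its own copy of all ten messages
        System.out.println(logs.messagesIn("temporary-queue-1").size());
        System.out.println(logs.messagesIn("temporary-queue-2").size());
    }
}
```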

Routing Direct model

This builds on Fanout by adding a routing key

  • In the consumer annotation, just add the key attribute, which accepts a String[]

Producer

// Routing Direct model
@Test
public void routingDirectTest() {
    String routingKey = "info";
    // arguments: 1. exchange 2. routingKey 3. message object
    for (int i = 0; i < 5; i++) {
        rabbitTemplate.convertAndSend("directs",
                routingKey,
                "Routing Direct model message | RoutingKey [" + routingKey + "]");
    }
    // switch the routing key
    routingKey = "error";
    for (int i = 0; i < 5; i++) {
        rabbitTemplate.convertAndSend("directs",
                routingKey,
                "Routing Direct model message | RoutingKey [" + routingKey + "]");
    }
}

Consumer

@Component
public class ConsumerRoutingDirect {

    // receives all logs
    @RabbitListener(bindings = {
            @QueueBinding(
                    value = @Queue,
                    exchange = @Exchange(value = "directs", type = ExchangeTypes.DIRECT),
                    key = {"info", "error"}
            )
    })
    public void receive1(String message) {
        System.out.println("Routing Direct Consumer 1 => " + message);
    }

    // receives only error logs
    @RabbitListener(bindings = {
            @QueueBinding(
                    value = @Queue,
                    exchange = @Exchange(value = "directs", type = ExchangeTypes.DIRECT),
                    key = {"error"}
            )
    })
    public void receive2(String message) {
        System.out.println("Routing Direct Consumer 2 => " + message);
    }
}
Output
Routing Direct Consumer 1 => Routing Direct model message | RoutingKey [info]
Routing Direct Consumer 1 => Routing Direct model message | RoutingKey [info]
Routing Direct Consumer 1 => Routing Direct model message | RoutingKey [error]
Routing Direct Consumer 1 => Routing Direct model message | RoutingKey [error]
Routing Direct Consumer 1 => Routing Direct model message | RoutingKey [error]
Routing Direct Consumer 1 => Routing Direct model message | RoutingKey [error]
Routing Direct Consumer 1 => Routing Direct model message | RoutingKey [error]
Routing Direct Consumer 1 => Routing Direct model message | RoutingKey [info]
Routing Direct Consumer 1 => Routing Direct model message | RoutingKey [info]
Routing Direct Consumer 1 => Routing Direct model message | RoutingKey [info]
---------------------------------------------------------------------------------
Routing Direct Consumer 2 => Routing Direct model message | RoutingKey [error]
Routing Direct Consumer 2 => Routing Direct model message | RoutingKey [error]
Routing Direct Consumer 2 => Routing Direct model message | RoutingKey [error]
Routing Direct Consumer 2 => Routing Direct model message | RoutingKey [error]
Routing Direct Consumer 2 => Routing Direct model message | RoutingKey [error]
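
Consumer 1 prints ten lines and consumer 2 prints five because a direct exchange delivers a message only to queues whose binding key equals the message's routing key. The sketch below models that rule in memory. It is an illustration in plain Java, not the broker's implementation, and DirectSketch with its bind, publish and count methods are hypothetical names.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

public class DirectSketch {

    // binding key -> names of the queues bound with that key
    private final Map<String, List<String>> bindings = new HashMap<>();
    // queue name -> messages delivered to that queue
    private final Map<String, List<String>> queues = new LinkedHashMap<>();

    /** Binds a queue to this exchange under one or more binding keys. */
    public void bind(String queueName, String... keys) {
        queues.putIfAbsent(queueName, new ArrayList<>());
        for (String key : keys) {
            bindings.computeIfAbsent(key, k -> new ArrayList<>()).add(queueName);
        }
    }

    /** Delivers the message only to queues whose binding key equals the routing key. */
    public void publish(String routingKey, String message) {
        for (String queueName : bindings.getOrDefault(routingKey, Collections.emptyList())) {
            queues.get(queueName).add(message);
        }
    }

    public int count(String queueName) {
        return queues.get(queueName).size();
    }

    public static void main(String[] args) {
        DirectSketch directs = new DirectSketch();
        directs.bind("queue1", "info", "error"); // like consumer 1
        directs.bind("queue2", "error");         // like consumer 2
        for (int i = 0; i < 5; i++) {
            directs.publish("info", "info message " + i);
        }
        for (int i = 0; i < 5; i++) {
            directs.publish("error", "error message " + i);
        }
        System.out.println("queue1: " + directs.count("queue1")); // prints "queue1: 10"
        System.out.println("queue2: " + directs.count("queue2")); // prints "queue2: 5"
    }
}
```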

Routing Topic model

Producer

// Routing Topic model
@Test
public void routingTopicTest() {
    String routingKey = "user.save";
    // arguments: 1. exchange 2. routingKey 3. message object
    rabbitTemplate.convertAndSend("topics",
            routingKey,
            "Routing Topic model message | RoutingKey [" + routingKey + "]");
    // switch the routing key
    routingKey = "user.save.1";
    rabbitTemplate.convertAndSend("topics",
            routingKey,
            "Routing Topic model message | RoutingKey [" + routingKey + "]");
}

Consumer

@Component
public class ConsumerTopic {

    @RabbitListener(bindings = {
            @QueueBinding(
                    value = @Queue,
                    exchange = @Exchange(value = "topics", type = ExchangeTypes.TOPIC),
                    key = {"user.*"}
            )
    })
    public void receive1(String message) {
        System.out.println("Routing Topic Consumer 1 => " + message);
    }

    @RabbitListener(bindings = {
            @QueueBinding(
                    value = @Queue,
                    exchange = @Exchange(value = "topics", type = ExchangeTypes.TOPIC),
                    key = {"user.#"}
            )
    })
    public void receive2(String message) {
        System.out.println("Routing Topic Consumer 2 => " + message);
    }
}
Output
Routing Topic Consumer 1 => Routing Topic model message | RoutingKey [user.save]
--------------------------------------------------------------------------------------
Routing Topic Consumer 2 => Routing Topic model message | RoutingKey [user.save]
Routing Topic Consumer 2 => Routing Topic model message | RoutingKey [user.save.1]
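
The difference between the two consumers comes from the wildcards in their binding keys: in a topic exchange, * stands for exactly one dot-separated word, while # stands for zero or more words. The following plain-Java sketch reproduces that matching rule so the output above can be checked by hand. TopicMatcher and matches are hypothetical names, and this is an illustrative reimplementation rather than the broker's own algorithm.

```java
public class TopicMatcher {

    /** Returns true if the routing key matches the topic binding pattern. */
    public static boolean matches(String pattern, String routingKey) {
        String[] p = pattern.isEmpty() ? new String[0] : pattern.split("\\.", -1);
        String[] k = routingKey.isEmpty() ? new String[0] : routingKey.split("\\.", -1);
        return matches(p, 0, k, 0);
    }

    private static boolean matches(String[] p, int pi, String[] k, int ki) {
        if (pi == p.length) {
            // Pattern exhausted: a match only if the key is exhausted too
            return ki == k.length;
        }
        if (p[pi].equals("#")) {
            // '#' either absorbs zero words, or absorbs one word and stays active
            return matches(p, pi + 1, k, ki)
                    || (ki < k.length && matches(p, pi, k, ki + 1));
        }
        if (ki == k.length) {
            // Pattern still expects a word but the key has none left
            return false;
        }
        if (p[pi].equals("*") || p[pi].equals(k[ki])) {
            // '*' or a literal word consumes exactly one word of the key
            return matches(p, pi + 1, k, ki + 1);
        }
        return false;
    }

    public static void main(String[] args) {
        System.out.println(matches("user.*", "user.save"));   // prints "true"
        System.out.println(matches("user.*", "user.save.1")); // prints "false"
        System.out.println(matches("user.#", "user.save"));   // prints "true"
        System.out.println(matches("user.#", "user.save.1")); // prints "true"
    }
}
```

This agrees with the output: the queue bound with user.* receives only user.save, while the queue bound with user.# receives both messages.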