Integrating RabbitMQ with Spring Boot
- Maven dependency
- application.yaml
<dependency>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-amqp</artifactId>
    <version>2.2.2.RELEASE</version>
</dependency>
To send messages, obtain a RabbitTemplate from the Spring container (for example, by injecting it).
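Hello World
Consumer
- Let the Spring container manage the consumer class (for example, with @Component)
- The @RabbitListener annotation marks the class as a consumer; its queuesToDeclare attribute declares a queue
- @Queue sets the queue's properties; its default attribute is the queue name, and it can also set options such as durability and auto-delete
- @RabbitHandler marks the method that consumes messages; the method's parameter receives the message directly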
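import org.springframework.amqp.rabbit.annotation.Queue;
import org.springframework.amqp.rabbit.annotation.RabbitHandler;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;

@Component
@RabbitListener(queuesToDeclare = @Queue("hello")) // declare the queue
public class ConsumerHello {
    @RabbitHandler // handles each incoming message
    public void receive(String message) {
        System.out.println("message = " + message);
    }
}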
Producer
- Use RabbitTemplate to send messages
- Run it as a test with SpringBootTest
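import org.junit.Test;
import org.junit.runner.RunWith;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.test.context.SpringBootTest;
import org.springframework.test.context.junit4.SpringRunner;

@SpringBootTest(classes = MainClass.class)
@RunWith(SpringRunner.class)
public class TestRabbitMQ {
    @Autowired
    RabbitTemplate rabbitTemplate;

    // hello world model
    @Test
    public void helloWorldTest() {
        // 1. routingKey 2. message object
        rabbitTemplate.convertAndSend("hello", "hello world message");
    }
}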
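Tip
A producer written as a JUnit test does not create the queue or exchange itself; they are created only when the consumer code runs.
A producer written with a main method creates the queue and exchange directly.
Work model
@RabbitListener can be placed on a class or on a method.
Producer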
// work model
@Test
public void workTest() {
    // 1. routingKey 2. message object
    for (int i = 0; i < 10; i++) {
        rabbitTemplate.convertAndSend("work",
                "work model message | index [" + i + "]");
    }
}
Consumer
Set up two consumers that listen on the same queue.
- Within a single class, this is done by annotating two methods with @RabbitListener
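import org.springframework.amqp.rabbit.annotation.Queue;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;

@Component
public class ConsumerWork {
    // Both methods listen on the same queue, "work".
    @RabbitListener(queuesToDeclare = @Queue("work"))
    public void receive1(String message) {
        System.out.println("Work Consumer 1 => " + message);
    }

    @RabbitListener(queuesToDeclare = @Queue("work"))
    public void receive2(String message) {
        System.out.println("Work Consumer 2 => " + message);
    }
}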
Output
Work Consumer 1 => work model message | index [1]
Work Consumer 2 => work model message | index [0]
Work Consumer 1 => work model message | index [3]
Work Consumer 2 => work model message | index [2]
Work Consumer 1 => work model message | index [5]
Work Consumer 2 => work model message | index [4]
Work Consumer 1 => work model message | index [7]
Work Consumer 2 => work model message | index [6]
Work Consumer 1 => work model message | index [9]
Work Consumer 2 => work model message | index [8]
The same output, sorted by hand by message index:
Work Consumer 2 => work model message | index [0]
Work Consumer 1 => work model message | index [1]
Work Consumer 2 => work model message | index [2]
Work Consumer 1 => work model message | index [3]
Work Consumer 2 => work model message | index [4]
Work Consumer 1 => work model message | index [5]
Work Consumer 2 => work model message | index [6]
Work Consumer 1 => work model message | index [7]
Work Consumer 2 => work model message | index [8]
Work Consumer 1 => work model message | index [9]
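The alternating pattern in the sorted output is consistent with round-robin dispatch, where each message goes to the next consumer in turn. The following is a minimal, broker-free sketch of that idea in plain Java. The class name RoundRobinSketch and its dispatch method are made up for illustration; this is not how Spring AMQP or RabbitMQ implement delivery, and real delivery order can also depend on prefetch settings and consumer speed.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical, broker-free sketch: hands each message to the next consumer in turn.
class RoundRobinSketch {

    // Returns one line per message, assigning consumers in strict rotation.
    static List<String> dispatch(int messageCount, String... consumers) {
        List<String> deliveries = new ArrayList<>();
        for (int i = 0; i < messageCount; i++) {
            String consumer = consumers[i % consumers.length];
            deliveries.add(consumer + " => work model message | index [" + i + "]");
        }
        return deliveries;
    }

    public static void main(String[] args) {
        // Consumer 2 is listed first only to mirror the sorted output above.
        dispatch(10, "Work Consumer 2", "Work Consumer 1").forEach(System.out::println);
    }
}
```

Each consumer ends up with half of the ten messages, which is the point of the Work model: the queue's load is shared rather than duplicated.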
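Fanout model
Producer
Messages are sent to the logs exchange, so first delete any existing exchange named logs (presumably because an exchange left over with a different type cannot be redeclared under the same name).
- A different overload of convertAndSend is used here: it takes the exchange, the routing key, and the message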
// fanout model
@Test
public void fanoutTest() {
    // 1. exchange 2. routingKey 3. message object
    for (int i = 0; i < 10; i++) {
        rabbitTemplate.convertAndSend("logs",
                "",
                "Fanout model message | index [" + i + "]");
    }
}
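Consumer
@RabbitListener no longer uses queuesToDeclare here. It uses bindings instead, which accepts a QueueBinding[] array.
- Each element is a @QueueBinding annotation, which binds a queue to an exchange
- Its value attribute specifies the queue, declared with @Queue; here @Queue is given no name, so a temporary queue is created
- Its exchange attribute specifies the exchange, declared with @Exchange(value, type)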
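import org.springframework.amqp.core.ExchangeTypes;
import org.springframework.amqp.rabbit.annotation.Exchange;
import org.springframework.amqp.rabbit.annotation.Queue;
import org.springframework.amqp.rabbit.annotation.QueueBinding;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;

@Component
public class ConsumerFanout {
    // Consumer 1
    @RabbitListener(bindings = {
            @QueueBinding( // bind the queue to the exchange
                    value = @Queue, // create a temporary queue
                    exchange = @Exchange(value = "logs", type = ExchangeTypes.FANOUT)
            )
    })
    public void receive1(String message) {
        System.out.println("Fanout Consumer 1 => " + message);
    }

    // Consumer 2
    @RabbitListener(bindings = {
            @QueueBinding( // bind the queue to the exchange
                    value = @Queue, // create a temporary queue
                    exchange = @Exchange(value = "logs", type = ExchangeTypes.FANOUT)
            )
    })
    public void receive2(String message) {
        System.out.println("Fanout Consumer 2 => " + message);
    }
}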
Output
Fanout Consumer 2 => Fanout model message | index [0]
Fanout Consumer 1 => Fanout model message | index [0]
Fanout Consumer 1 => Fanout model message | index [1]
Fanout Consumer 1 => Fanout model message | index [2]
Fanout Consumer 1 => Fanout model message | index [3]
Fanout Consumer 1 => Fanout model message | index [4]
Fanout Consumer 1 => Fanout model message | index [5]
Fanout Consumer 1 => Fanout model message | index [6]
Fanout Consumer 1 => Fanout model message | index [7]
Fanout Consumer 1 => Fanout model message | index [8]
Fanout Consumer 1 => Fanout model message | index [9]
-------------------------------------------------------
Fanout Consumer 2 => Fanout model message | index [1]
Fanout Consumer 2 => Fanout model message | index [2]
Fanout Consumer 2 => Fanout model message | index [3]
Fanout Consumer 2 => Fanout model message | index [4]
Fanout Consumer 2 => Fanout model message | index [5]
Fanout Consumer 2 => Fanout model message | index [6]
Fanout Consumer 2 => Fanout model message | index [7]
Fanout Consumer 2 => Fanout model message | index [8]
Fanout Consumer 2 => Fanout model message | index [9]
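Both consumers receive all ten messages because a fanout exchange ignores the routing key and copies every message to each bound queue. A minimal, broker-free sketch of that behaviour in plain Java follows. The FanoutSketch class and the queue names queue-1 and queue-2 are hypothetical stand-ins for the exchange and the two temporary queues; nothing here talks to RabbitMQ.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Hypothetical, broker-free sketch of fanout semantics: the routing key is
// ignored and every bound queue receives its own copy of each message.
class FanoutSketch {
    private final Map<String, List<String>> queues = new LinkedHashMap<>();

    // Binds a named queue to this exchange.
    void bind(String queueName) {
        queues.putIfAbsent(queueName, new ArrayList<>());
    }

    // Publishes a message; routingKey is accepted but deliberately unused.
    void publish(String routingKey, String message) {
        for (List<String> queue : queues.values()) {
            queue.add(message);
        }
    }

    List<String> messagesIn(String queueName) {
        return queues.get(queueName);
    }

    public static void main(String[] args) {
        FanoutSketch logs = new FanoutSketch();
        logs.bind("queue-1");
        logs.bind("queue-2");
        for (int i = 0; i < 10; i++) {
            logs.publish("", "Fanout model message | index [" + i + "]");
        }
        System.out.println("queue-1 size = " + logs.messagesIn("queue-1").size());
        System.out.println("queue-2 size = " + logs.messagesIn("queue-2").size());
    }
}
```

This is the key difference from the Work model, where two consumers share one queue and each message is handled only once.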
Routing Direct model
This builds on Fanout by adding a RoutingKey.
- In the consumer annotation, just add the key attribute, which accepts a String[]
Producer
// Routing Direct model
@Test
public void routingDirectTest() {
    String routingKey = "info";
    // 1. exchange 2. routingKey 3. message object
    for (int i = 0; i < 5; i++) {
        rabbitTemplate.convertAndSend("directs",
                routingKey,
                "Routing Direct model message | RoutingKey [" + routingKey + "]");
    }
    // switch to a different routing key
    routingKey = "error";
    for (int i = 0; i < 5; i++) {
        rabbitTemplate.convertAndSend("directs",
                routingKey,
                "Routing Direct model message | RoutingKey [" + routingKey + "]");
    }
}
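Consumer
import org.springframework.amqp.core.ExchangeTypes;
import org.springframework.amqp.rabbit.annotation.Exchange;
import org.springframework.amqp.rabbit.annotation.Queue;
import org.springframework.amqp.rabbit.annotation.QueueBinding;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;

@Component
public class ConsumerRoutingDirect {
    // Receives all logs
    @RabbitListener(bindings = {
            @QueueBinding(
                    value = @Queue,
                    exchange = @Exchange(value = "directs", type = ExchangeTypes.DIRECT),
                    key = {"info", "error"}
            )
    })
    public void receive1(String message) {
        System.out.println("Routing Direct Consumer 1 => " + message);
    }

    // Receives only error logs
    @RabbitListener(bindings = {
            @QueueBinding(
                    value = @Queue,
                    exchange = @Exchange(value = "directs", type = ExchangeTypes.DIRECT),
                    key = {"error"}
            )
    })
    public void receive2(String message) {
        System.out.println("Routing Direct Consumer 2 => " + message);
    }
}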
Output
Routing Direct Consumer 1 => Routing Direct model message | RoutingKey [info]
Routing Direct Consumer 1 => Routing Direct model message | RoutingKey [info]
Routing Direct Consumer 1 => Routing Direct model message | RoutingKey [error]
Routing Direct Consumer 1 => Routing Direct model message | RoutingKey [error]
Routing Direct Consumer 1 => Routing Direct model message | RoutingKey [error]
Routing Direct Consumer 1 => Routing Direct model message | RoutingKey [error]
Routing Direct Consumer 1 => Routing Direct model message | RoutingKey [error]
Routing Direct Consumer 1 => Routing Direct model message | RoutingKey [info]
Routing Direct Consumer 1 => Routing Direct model message | RoutingKey [info]
Routing Direct Consumer 1 => Routing Direct model message | RoutingKey [info]
---------------------------------------------------------------------------------
Routing Direct Consumer 2 => Routing Direct model message | RoutingKey [error]
Routing Direct Consumer 2 => Routing Direct model message | RoutingKey [error]
Routing Direct Consumer 2 => Routing Direct model message | RoutingKey [error]
Routing Direct Consumer 2 => Routing Direct model message | RoutingKey [error]
Routing Direct Consumer 2 => Routing Direct model message | RoutingKey [error]
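Consumer 1 prints ten lines and Consumer 2 prints five because a direct exchange delivers a message only to queues whose binding key equals the message's routing key. The following plain-Java sketch mimics that rule without a broker. DirectSketch and the queue names consumer-1 and consumer-2 are hypothetical; the binding keys mirror the ones in the annotations above.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashSet;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.Set;

// Hypothetical, broker-free sketch of direct routing: a queue receives a
// message only if one of its binding keys equals the routing key exactly.
class DirectSketch {
    private final Map<String, Set<String>> bindingKeys = new LinkedHashMap<>();
    private final Map<String, List<String>> queues = new LinkedHashMap<>();

    // Binds a queue with one or more binding keys (like key = {...}).
    void bind(String queueName, String... keys) {
        bindingKeys.put(queueName, new HashSet<>(Arrays.asList(keys)));
        queues.put(queueName, new ArrayList<>());
    }

    // Copies the message into every queue bound with a matching key.
    void publish(String routingKey, String message) {
        for (Map.Entry<String, Set<String>> binding : bindingKeys.entrySet()) {
            if (binding.getValue().contains(routingKey)) {
                queues.get(binding.getKey()).add(message);
            }
        }
    }

    List<String> messagesIn(String queueName) {
        return queues.get(queueName);
    }

    public static void main(String[] args) {
        DirectSketch directs = new DirectSketch();
        directs.bind("consumer-1", "info", "error");
        directs.bind("consumer-2", "error");
        for (int i = 0; i < 5; i++) {
            directs.publish("info", "RoutingKey [info]");
        }
        for (int i = 0; i < 5; i++) {
            directs.publish("error", "RoutingKey [error]");
        }
        System.out.println("consumer-1 received " + directs.messagesIn("consumer-1").size());
        System.out.println("consumer-2 received " + directs.messagesIn("consumer-2").size());
    }
}
```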
Routing Topic model
Producer
// Routing Topic model
@Test
public void routingTopicTest() {
    String routingKey = "user.save";
    // 1. exchange 2. routingKey 3. message object
    rabbitTemplate.convertAndSend("topics",
            routingKey,
            "Routing Topic model message | RoutingKey [" + routingKey + "]");
    // switch to a different routing key
    routingKey = "user.save.1";
    rabbitTemplate.convertAndSend("topics",
            routingKey,
            "Routing Topic model message | RoutingKey [" + routingKey + "]");
}
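Consumer
import org.springframework.amqp.core.ExchangeTypes;
import org.springframework.amqp.rabbit.annotation.Exchange;
import org.springframework.amqp.rabbit.annotation.Queue;
import org.springframework.amqp.rabbit.annotation.QueueBinding;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;

@Component
public class ConsumerTopic {
    // "user.*" matches exactly one word after "user"
    @RabbitListener(bindings = {
            @QueueBinding(
                    value = @Queue,
                    exchange = @Exchange(value = "topics", type = ExchangeTypes.TOPIC),
                    key = {"user.*"}
            )
    })
    public void receive1(String message) {
        System.out.println("Routing Topic Consumer 1 => " + message);
    }

    // "user.#" matches zero or more words after "user"
    @RabbitListener(bindings = {
            @QueueBinding(
                    value = @Queue,
                    exchange = @Exchange(value = "topics", type = ExchangeTypes.TOPIC),
                    key = {"user.#"}
            )
    })
    public void receive2(String message) {
        System.out.println("Routing Topic Consumer 2 => " + message);
    }
}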
Output
Routing Topic Consumer 1 => Routing Topic model message | RoutingKey [user.save]
--------------------------------------------------------------------------------------
Routing Topic Consumer 2 => Routing Topic model message | RoutingKey [user.save]
Routing Topic Consumer 2 => Routing Topic model message | RoutingKey [user.save.1]
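The output follows from the topic wildcards: in a binding key, * stands for exactly one dot-separated word and # stands for zero or more words. So user.* matches user.save but not user.save.1, while user.# matches both. The sketch below is a small plain-Java matcher that illustrates these rules. TopicMatchSketch is a hypothetical helper written for this page, not part of Spring AMQP or the RabbitMQ client, and the broker's own matching is implemented differently.

```java
// Hypothetical helper illustrating topic wildcard rules:
// '*' matches exactly one word, '#' matches zero or more words.
class TopicMatchSketch {

    static boolean matches(String bindingKey, String routingKey) {
        // Limit -1 keeps trailing empty words instead of silently dropping them.
        return matches(bindingKey.split("\\.", -1), 0, routingKey.split("\\.", -1), 0);
    }

    private static boolean matches(String[] pattern, int pi, String[] words, int wi) {
        if (pi == pattern.length) {
            // Pattern exhausted: succeed only if every word was consumed too.
            return wi == words.length;
        }
        if (pattern[pi].equals("#")) {
            // '#' either absorbs nothing (move past it) or absorbs one more word.
            return matches(pattern, pi + 1, words, wi)
                    || (wi < words.length && matches(pattern, pi, words, wi + 1));
        }
        if (wi == words.length) {
            return false; // pattern still expects a word, but none are left
        }
        if (pattern[pi].equals("*") || pattern[pi].equals(words[wi])) {
            return matches(pattern, pi + 1, words, wi + 1);
        }
        return false;
    }

    public static void main(String[] args) {
        String[] bindingKeys = {"user.*", "user.#"};
        String[] routingKeys = {"user.save", "user.save.1"};
        for (String bindingKey : bindingKeys) {
            for (String routingKey : routingKeys) {
                System.out.println(bindingKey + " vs " + routingKey
                        + " -> " + matches(bindingKey, routingKey));
            }
        }
    }
}
```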