Version: Next
HelloWorld 模型
Maven 依赖
RabbitMQ Java Client<dependency><groupId>com.rabbitmq</groupId><artifactId>amqp-client</artifactId><version>5.7.3</version></dependency>
创建虚拟主机 Virtual Host
使用 RabbitMQ 管理界面完成
- 登录 15672 端口,使用
guest
账户密码登录,按照以下步骤添加一个虚拟主机
添加用户
使用 RabbitMQ 管理界面完成
- 新创建的虚拟主机,只有
Guest
账户可以访问它- 应当创建一个新的
用户
,通过它来访问刚才创建的虚拟主机
用户绑定虚拟主机
- 把新创建的用户绑定到新创建的虚拟主机上
HelloWorld 代码实现
即 直连模式,没有
交换机
P
—— 生产者,要发送消息的程序C
—— 消费者,消息的接受者,会一直等待消息到来Queue
—— 消息队列,图中红色的部分。类似一个邮箱,可以缓存消息:生产者向队列中投递消息,消费者从其中取出消息
生产者实现
- 创建
RabbitMQ
连接工厂
, 与Server
建立连接,设置IP
地址与端口
- 设置要连接的
虚拟主机 Virtual Host
,设置能够访问该虚拟主机的用户账户
- 获取
连接对象
- 基于
通道
发送消息,因此要先创建通道
- 将
通道
绑定到对应的消息队列 Queue
,使用队列声明 queueDeclare(5个参数)
方法实现,仅声明
String queue
—— 队列名,不存在时会自动创建
boolean durable
—— 是否将队列磁盘持久化
true
—— 将消息队列持久化到磁盘,但其中未被消费的消息不会被持久化false
—— 在 RabbitMQ 重启时,队列消失,其中未被消费的消息也会消失boolean exclusive
—— 是否独占队列:是:当前队列只允许当前通道使用;否:队列可以被多个通道使用boolean autoDelete
—— 是否在消费完成后自动删除队列,在消费者连接关闭后才会触发自动删除Map arguments
—— 附加参数- 通过通道发布消息,
basicPulish(4个参数)
,真正发送消息到消息队列,当与声明不一致时,依然以这里发布的消息队列为准
String exchange
—— 交换机,直连模式下没有交换机,填""
String routingKey
—— 路由 key,写队列名称
BasicProperties basicProperties
—— 基础属性,填null
MessageProperties.PERSISTENT_TEXT_PLAIN
—— 设置消息持久化byte[] body
—— 消息体,字节数组- 关闭通道
- 关闭连接
生产者实现
/**
* 直连模式 生产者
*/
public class ProviderDirect {
public static void main(String[] args) throws IOException, TimeoutException {
// 创建 RabbitMq 连接工厂
ConnectionFactory connectionFactory = new ConnectionFactory();
// 设置连接 rabbitMq 主机
connectionFactory.setHost("127.0.0.1");
// 设置端口号
connectionFactory.setPort(5672);
// 连接哪个虚拟主机
connectionFactory.setVirtualHost("/flute");
// 设置通过哪个账户访问这个虚拟主机
connectionFactory.setUsername("flute");
connectionFactory.setPassword("flute");
// 获取最终的链接对象
Connection connection = connectionFactory.newConnection();
// 连接中创建通道
Channel channel = connection.createChannel();
// 声明式:将通道与消息队列绑定
// 1. 队列名 2. 持久化 3. 独占 4. 自动珊瑚 5. 额外参数
channel.queueDeclare("hello", false, false, false, null);
// 通过通道发布消息
// 1. 交换机 2. 队列 3. 基础属性 4. 消息体字节数组
channel.basicPublish("", "hello", null, "Hello World RabbitMq".getBytes());
// 关闭通道
channel.close();
// 关闭连接
connection.close();
}
}
- 运行
3
次,通过管理界面查看,可以看到自动创建了消息队列 Queue
并且队列中有3
条消息
消费者实现
Server
->Virtual Host
->Channel 通道
- 通道绑定队列
- 消费消息
basicConsume(参数)
String queue
—— 队列名boolean autoAck
—— 是否开启消息自动确认机制(详见 Work 模型)Consumer callback
—— 消费消息时的回调接口
- 使用
new DefaultConsumer(Channel channel)
创建默认Consumer
,重写handleDelivery
方法,指定如何分发/消费消息,handleDeliver
方法的body
参数即消息体
消费者实现
/**
* 直连模式 消费者
*/
public class ConsumerDirect {
public static void main(String[] args) throws IOException, TimeoutException {
ConnectionFactory connectionFactory = new ConnectionFactory();
connectionFactory.setHost("127.0.0.1");
connectionFactory.setPort(5672);
connectionFactory.setVirtualHost("/flute");
connectionFactory.setUsername("flute");
connectionFactory.setPassword("flute");
Connection connection = connectionFactory.newConnection();
// 通道
Channel channel = connection.createChannel();
// 绑定队列
channel.queueDeclare("hello", false, false, false, null);
// 消费消息
// 1. 队列 2. 开启消息自动确认机制 3. 消费消息时的回调接口
channel.basicConsume("hello", true, new DefaultConsumer(channel) {
@Override
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
super.handleDelivery(consumerTag, envelope, properties, body);
String message = new String(body);
System.out.println("message = " + message);
}
});
// 关闭 通道 先不关,持续监听
// channel.close();
// 关闭 连接
// connection.close();
}
}
测试
- 执行消费者
运行结果message = Hello World RabbitMqmessage = Hello World RabbitMqmessage = Hello World RabbitMq
- 可以看到成功消费了队列中的消息,且一次消费完了所有消息
连接工具类封装
RabbitMqConnectionUtils
连接工具类封装/*** 连接 工具类*/public class RabbitMqUtils {private static ConnectionFactory connectionFactory;// 定义提供连接对象的方法static {connectionFactory = new ConnectionFactory();connectionFactory.setHost("127.0.0.1");connectionFactory.setPort(5672);connectionFactory.setVirtualHost("/flute");connectionFactory.setUsername("flute");connectionFactory.setPassword("flute");}public static Connection getConnection() {try {return connectionFactory.newConnection();} catch (IOException | TimeoutException e) {e.printStackTrace();}return null;}public static void closeConnectionAndChannel(Connection connection, Channel channel) {try {if (channel != null) channel.close();if (connection != null) connection.close();} catch (IOException | TimeoutException e) {e.printStackTrace();}}}