Version: Next

HelloWorld 模型

Maven 依赖

RabbitMQ Java Client
<dependency>
<groupId>com.rabbitmq</groupId>
<artifactId>amqp-client</artifactId>
<version>5.7.3</version>
</dependency>

创建虚拟主机 Virtual Host

使用 RabbitMQ 管理界面完成

  • 登录 15672 端口,使用 guest 账户密码登录,按照以下步骤添加一个虚拟主机



添加用户

使用 RabbitMQ 管理界面完成

  • 新创建的虚拟主机,只有 Guest 账户可以访问它
  • 应当创建一个新的 用户,通过它来访问刚才创建的虚拟主机



用户绑定虚拟主机

  • 把新创建的用户绑定到新创建的虚拟主机上




HelloWorld 代码实现

直连模式没有 交换机



  • P —— 生产者,要发送消息的程序
  • C —— 消费者,消息的接受者,会一直等待消息到来
  • Queue —— 消息队列,图中红色的部分。类似一个邮箱,可以缓存消息:生产者向队列中投递消息,消费者从其中取出消息

生产者实现

  1. 创建 RabbitMQ 连接 工厂, 与 Server 建立连接,设置 IP 地址与 端口
  2. 设置要连接的 虚拟主机 Virtual Host,设置能够访问该虚拟主机的 用户账户
  3. 获取 连接对象
  4. 基于 通道 发送消息,因此要先 创建通道
  5. 通道 绑定到对应的 消息队列 Queue,使用 队列声明 queueDeclare(5个参数) 方法实现,仅声明
    1. String queue —— 队列名,不存在时会 自动创建
    2. boolean durable —— 是否将队列磁盘持久化
      • true —— 将消息队列持久化到磁盘,但其中未被消费的消息不会被持久化
      • false —— 在 RabbitMQ 重启时,队列消失,其中未被消费的消息也会消失
    3. boolean exclusive —— 是否独占队列:是:当前队列只允许当前通道使用;否:队列可以被多个通道使用
    4. boolean autoDelete —— 是否在消费完成后自动删除队列,在消费者连接关闭后才会触发自动删除
    5. Map arguments —— 附加参数
  6. 通过通道发布消息,basicPulish(4个参数)真正发送消息到消息队列,当与声明不一致时,依然以这里发布的消息队列为准
    1. String exchange —— 交换机,直连模式下没有交换机,填 ""
    2. String routingKey —— 路由 key,写 队列名称
    3. BasicProperties basicProperties —— 基础属性,填 null
      • MessageProperties.PERSISTENT_TEXT_PLAIN —— 设置消息持久化
    4. byte[] body —— 消息体,字节数组
  7. 关闭通道
  8. 关闭连接
生产者实现
/**
* 直连模式 生产者
*/
public class ProviderDirect {
public static void main(String[] args) throws IOException, TimeoutException {
// 创建 RabbitMq 连接工厂
ConnectionFactory connectionFactory = new ConnectionFactory();
// 设置连接 rabbitMq 主机
connectionFactory.setHost("127.0.0.1");
// 设置端口号
connectionFactory.setPort(5672);
// 连接哪个虚拟主机
connectionFactory.setVirtualHost("/flute");
// 设置通过哪个账户访问这个虚拟主机
connectionFactory.setUsername("flute");
connectionFactory.setPassword("flute");
// 获取最终的链接对象
Connection connection = connectionFactory.newConnection();
// 连接中创建通道
Channel channel = connection.createChannel();
// 声明式:将通道与消息队列绑定
// 1. 队列名 2. 持久化 3. 独占 4. 自动珊瑚 5. 额外参数
channel.queueDeclare("hello", false, false, false, null);
// 通过通道发布消息
// 1. 交换机 2. 队列 3. 基础属性 4. 消息体字节数组
channel.basicPublish("", "hello", null, "Hello World RabbitMq".getBytes());
// 关闭通道
channel.close();
// 关闭连接
connection.close();
}
}
  • 运行 3 次,通过管理界面查看,可以看到自动创建了 消息队列 Queue 并且队列中有 3 条消息

消费者实现

  • Server -> Virtual Host -> Channel 通道
  • 通道绑定队列
  • 消费消息 basicConsume(参数)
    • String queue —— 队列名
    • boolean autoAck —— 是否开启消息自动确认机制(详见 Work 模型)
    • Consumer callback —— 消费消息时的回调接口
      • 使用 new DefaultConsumer(Channel channel) 创建默认 Consumer,重写 handleDelivery 方法,指定如何分发/消费消息,handleDeliver 方法的 body 参数即 消息体
消费者实现
/**
* 直连模式 消费者
*/
public class ConsumerDirect {
public static void main(String[] args) throws IOException, TimeoutException {
ConnectionFactory connectionFactory = new ConnectionFactory();
connectionFactory.setHost("127.0.0.1");
connectionFactory.setPort(5672);
connectionFactory.setVirtualHost("/flute");
connectionFactory.setUsername("flute");
connectionFactory.setPassword("flute");
Connection connection = connectionFactory.newConnection();
// 通道
Channel channel = connection.createChannel();
// 绑定队列
channel.queueDeclare("hello", false, false, false, null);
// 消费消息
// 1. 队列 2. 开启消息自动确认机制 3. 消费消息时的回调接口
channel.basicConsume("hello", true, new DefaultConsumer(channel) {
@Override
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
super.handleDelivery(consumerTag, envelope, properties, body);
String message = new String(body);
System.out.println("message = " + message);
}
});
// 关闭 通道 先不关,持续监听
// channel.close();
// 关闭 连接
// connection.close();
}
}

测试

  • 执行消费者
运行结果
message = Hello World RabbitMq
message = Hello World RabbitMq
message = Hello World RabbitMq
  • 可以看到成功消费了队列中的消息,且一次消费完了所有消息

连接工具类封装

RabbitMqConnectionUtils

连接工具类封装
/**
* 连接 工具类
*/
public class RabbitMqUtils {
private static ConnectionFactory connectionFactory;
// 定义提供连接对象的方法
static {
connectionFactory = new ConnectionFactory();
connectionFactory.setHost("127.0.0.1");
connectionFactory.setPort(5672);
connectionFactory.setVirtualHost("/flute");
connectionFactory.setUsername("flute");
connectionFactory.setPassword("flute");
}
public static Connection getConnection() {
try {
return connectionFactory.newConnection();
} catch (IOException | TimeoutException e) {
e.printStackTrace();
}
return null;
}
public static void closeConnectionAndChannel(Connection connection, Channel channel) {
try {
if (channel != null) channel.close();
if (connection != null) connection.close();
} catch (IOException | TimeoutException e) {
e.printStackTrace();
}
}
}