Version: Next

Work 模型

Work Queues,也被称为 Task Queues任务模型

  • 当消息处理比较耗时的时候,可能生产消息的速度远远大于消费消息的速度,造成 消息在队列中堆积,无法及时处理
  • Work 模型让多个消费者绑定同一个队列,共同消费队列中的消息
  • 队列中的消息一旦消费,就会消失,因此任务不会被重复执行


基本测试环境实现

生产者实现

生产者
public class ProviderWork {
public static void main(String[] args) throws IOException {
Connection connection = RabbitMqUtils.getConnection();
Channel channel = connection.createChannel();
channel.queueDeclare("work",
true,
false,
false,
null);
// 生产消息
for (int i = 0; i < 10; i++) {
channel.basicPublish("",
"work",
null,
("{" + i + "} -> work model queue").getBytes());
}
RabbitMqUtils.closeConnectionAndChannel(connection, channel);
}
}

消费者 1 实现

消费者1
public class ConsumerWork1 {
public static void main(String[] args) throws IOException {
Connection connection = RabbitMqUtils.getConnection();
Channel channel = connection.createChannel();
channel.queueDeclare("work",
true,
false,
false,
null);
channel.basicConsume("work", true, new DefaultConsumer(channel) {
@Override
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
System.out.println("Consumer1 -> " + new String(body));
}
});
}
}

消费者 2 实现

消费者2
public class ConsumerWork2 {
public static void main(String[] args) throws IOException {
Connection connection = RabbitMqUtils.getConnection();
Channel channel = connection.createChannel();
channel.queueDeclare("work",
true,
false,
false,
null);
channel.basicConsume("work", true, new DefaultConsumer(channel) {
@Override
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
System.out.println("Consumer2 -> " + new String(body));
}
});
}
}

测试

步骤

  1. 启动 消费者1消费者2
  2. 启动 生产者 快速生产 10 条消息
  3. 观察消费情况
消费者1
Consumer1 -> {0} -> work model queue
Consumer1 -> {2} -> work model queue
Consumer1 -> {4} -> work model queue
Consumer1 -> {6} -> work model queue
Consumer1 -> {8} -> work model queue
消费者2
Consumer2 -> {1} -> work model queue
Consumer2 -> {3} -> work model queue
Consumer2 -> {5} -> work model queue
Consumer2 -> {7} -> work model queue
Consumer2 -> {9} -> work model queue
提示

默认情况下, RabbitMQ 将按顺序将每个消息发送给下一个使用者

平均而言,每个消费者会收到相同数量的消息

这种分发消息的方式为 轮询

问题

如果消费者的处理速度不一样,有的慢:

  • 我们希望处理快的消费者多处理一些消息,处理慢的消费者少处理一些消息
  • 就需要通过 Work 模型来实现

实现 “能者多劳”

RabbitMQ 消息自动确认机制

对于消费者,basicConsume 方法的 boolean autoAck 参数,默认我们使用了 true

  • 一旦从队列中获取了消息,不管业务逻辑是怎样的,直接告诉队列自身已经消费完毕
  • 队列因此将消息从队列中删除
问题

假设消费者从队列中确认了 5 条消息,队列直接将5条消息删除,但消费者在处理到第 3 条时出现了异常,自己挂了,则还没有处理的 2 条消息就丢失了,在这种情况下我们希望:

  • 还没来得及处理的 2 条消息不会丢失
  • 把这 2 条消息交给别的正常的消费者去消费
解决

在消费者接收消息时:

  • 设置 boolean autoAckfalse,即手动确认消息
    • 通过 channel.basicAck(envelope.getDeliveryTag(), boolean multiple) 实现
  • 令消费者不要一次消费所有消息,令消费者一次只能消费一条消息
    • 通过 channel.basicQos(int prefetchCount) 实现
手动确认,一次只消费一条消息的消费者代码
public class ConsumerWork1 {
public static void main(String[] args) throws IOException {
Connection connection = RabbitMqUtils.getConnection();
Channel channel = connection.createChannel();
// 设置通道一次只消费一条消息
channel.basicQos(1);
channel.queueDeclare("work",
true,
false,
false,
null);
channel.basicConsume("work", false, new DefaultConsumer(channel) {
@Override
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
System.out.println("Consumer1 -> " + new String(body));
// 手动确认
channel.basicAck(envelope.getDeliveryTag(), false);
}
});
}
}