Version: Next
Work 模型
Work Queues
,也被称为Task Queues
,任务模型
- 当消息处理比较耗时的时候,可能生产消息的速度远远大于消费消息的速度,造成 消息在队列中堆积,无法及时处理
Work 模型
:让多个消费者绑定同一个队列,共同消费队列中的消息- 队列中的消息一旦消费,就会消失,因此任务不会被重复执行
基本测试环境实现
生产者实现
生产者public class ProviderWork {public static void main(String[] args) throws IOException {Connection connection = RabbitMqUtils.getConnection();Channel channel = connection.createChannel();channel.queueDeclare("work",true,false,false,null);// 生产消息for (int i = 0; i < 10; i++) {channel.basicPublish("","work",null,("{" + i + "} -> work model queue").getBytes());}RabbitMqUtils.closeConnectionAndChannel(connection, channel);}}消费者 1 实现
消费者1public class ConsumerWork1 {public static void main(String[] args) throws IOException {Connection connection = RabbitMqUtils.getConnection();Channel channel = connection.createChannel();channel.queueDeclare("work",true,false,false,null);channel.basicConsume("work", true, new DefaultConsumer(channel) {@Overridepublic void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {System.out.println("Consumer1 -> " + new String(body));}});}}消费者 2 实现
消费者2public class ConsumerWork2 {public static void main(String[] args) throws IOException {Connection connection = RabbitMqUtils.getConnection();Channel channel = connection.createChannel();channel.queueDeclare("work",true,false,false,null);channel.basicConsume("work", true, new DefaultConsumer(channel) {@Overridepublic void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {System.out.println("Consumer2 -> " + new String(body));}});}}
测试
步骤
- 启动
消费者1
、消费者2
- 启动
生产者
快速生产10
条消息- 观察消费情况
消费者1Consumer1 -> {0} -> work model queueConsumer1 -> {2} -> work model queueConsumer1 -> {4} -> work model queueConsumer1 -> {6} -> work model queueConsumer1 -> {8} -> work model queue消费者2Consumer2 -> {1} -> work model queueConsumer2 -> {3} -> work model queueConsumer2 -> {5} -> work model queueConsumer2 -> {7} -> work model queueConsumer2 -> {9} -> work model queue
提示
默认情况下, RabbitMQ 将按顺序将每个消息发送给下一个使用者
平均而言,每个消费者会收到相同数量的消息
这种分发消息的方式为 轮询
问题
如果消费者的处理速度不一样,有的慢:
- 我们希望处理快的消费者多处理一些消息,处理慢的消费者少处理一些消息
- 就需要通过 Work 模型来实现
实现 “能者多劳”
RabbitMQ 消息自动确认机制
对于消费者,basicConsume
方法的 boolean autoAck
参数,默认我们使用了 true
- 一旦从队列中获取了消息,不管业务逻辑是怎样的,直接告诉队列自身已经消费完毕
- 队列因此将消息从队列中删除
问题
假设消费者从队列中确认了 5 条消息,队列直接将5条消息删除,但消费者在处理到第 3 条时出现了异常,自己挂了,则还没有处理的 2 条消息就丢失了,在这种情况下我们希望:
- 还没来得及处理的 2 条消息不会丢失
- 把这 2 条消息交给别的正常的消费者去消费
解决
在消费者接收消息时:
- 设置
boolean autoAck
为false
,即手动确认消息- 通过
channel.basicAck(envelope.getDeliveryTag(), boolean multiple)
实现
- 通过
- 令消费者不要一次消费所有消息,令消费者一次只能消费一条消息
- 通过
channel.basicQos(int prefetchCount)
实现
- 通过
手动确认,一次只消费一条消息的消费者代码
public class ConsumerWork1 {
public static void main(String[] args) throws IOException {
Connection connection = RabbitMqUtils.getConnection();
Channel channel = connection.createChannel();
// 设置通道一次只消费一条消息
channel.basicQos(1);
channel.queueDeclare("work",
true,
false,
false,
null);
channel.basicConsume("work", false, new DefaultConsumer(channel) {
@Override
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
System.out.println("Consumer1 -> " + new String(body));
// 手动确认
channel.basicAck(envelope.getDeliveryTag(), false);
}
});
}
}