Version: Next

分组消费与持久化

为演示分组,参照 8802,新建 cloud-stream-rabbitmq-consumer8803

  • 依次运行 RabbitMQ、注册中心 7001、消息生产者 8801、消息消费者 88028803

运行后的两个问题

  • 重复消费
  • 消息持久化

解决方法:依靠 分组


重复消费问题

在一些场景下,我们希望 Stream 中的消费者是竞争关系,一条消息只能被消费一次

  • Stream 中处于同一个分组 Group 的消费者是竞争关系,要保证消息只被其中一个消费者消费一次
  • 不同分组 Group 可以重复消费

实现

  • 88028803 分到不同的组
  • 88028803 实现 轮询分组,每次只有一个消费者;8801 发送的消息每次只能被 88028803 其中之一接收,避免重复消费

YAML

修改 88028803 的 YAML,添加分组配置

server:
port: 8802
spring:
application:
name: cloud-stream-consumer
cloud:
stream:
binders: #在此处配置要绑定的rabbitmq的服务信息
defaultRabbit: #表示定义的名称,用于binding整合
type: rabbit #消息组件类型
environment: #设置rabbitmq的相关环境配置
spring:
rabbitmq:
host: localhost
port: 5672
username: guest
password: guest
bindings: #服务的整合处理
input: #这个名字是一个通道的名称
destination: studyExchange #表示要使用的Exchange名称定义
content-type: application/json #设置消息类型,本次为json,本文要设置为“text/plain”
binder: defaultRabbit #设置要绑定的消息服务的具体设置
group: bsxA #绑定分组 防止消息重复消费
eureka:
client:
service-url:
defaultZone: http://localhost:7001/eureka
instance:
lease-renewal-interval-in-seconds: 2 #设置心跳的时间间隔(默认是30S)
lease-expiration-duration-in-seconds: 5 #如果超过5S间隔就注销节点 默认是90s
instance-id: receive-8802.com #在信息列表时显示主机名称
prefer-ip-address: true #访问的路径变为IP地址

测试(不同组重复消费)

重启消费者项目,可以在 RabbitMQ 控制面板看到设置的两个分组


  • 访问 http://localhost:8801/sendMessage
  • 88028803 都可以接收到消息:重复消费
  • 可见结论:默认情况下就是分成不同的组,现在只是给不同的组起了个名字,还是重复消费
消费者1号, -----> 接受到的消息: 0e397a19-21d4-4ff9-a231-094d7e114e11 port: 8802

相同分组

  • 88028803 分到同一个分组
  • 88028803 实现 轮询分组,每次只有一个消费者;8801 发送的消息每次只能被 88028803 其中之一接收,避免重复消费
提示

88028803 的 YAML 中, group 属性配置成相同的即可

测试(轮询消费)

  • 访问 http://localhost:8801/sendMessage
  • 8802 受到第一条,8803 收不到;8802 收到第二条,8803 收不到
  • 这样同组内的两个消费者会轮询消费

持久化

设置过分组即可

  • 不设置分组:消费者挂掉,生产者在发送消息,消费者重新上线,不能接收到自己挂掉期间生产者发送的消息,出现 消息丢失
  • 设置分组:消费者挂掉,生产者在发送消息,消费者重新上线,消费者可以接收到自己挂掉期间生产者发送的还未被消费的消息,即实现了 消息持久化