Version: Next
分组消费与持久化
为演示分组,参照
8802
,新建cloud-stream-rabbitmq-consumer8803
- 依次运行
RabbitMQ
、注册中心7001
、消息生产者8801
、消息消费者8802
、8803
运行后的两个问题
- 重复消费
- 消息持久化
解决方法:依靠
分组
重复消费问题
在一些场景下,我们希望 Stream 中的消费者是竞争关系,一条消息只能被消费一次
- Stream 中处于同一个分组 Group 的消费者是竞争关系,要保证消息只被其中一个消费者消费一次
- 不同分组 Group 可以重复消费
实现
8802
、8803
分到不同的组8802
、8803
实现轮询分组
,每次只有一个消费者;8801
发送的消息每次只能被8802
、8803
其中之一接收,避免重复消费
YAML
修改
8802
、8803
的 YAML,添加分组配置
- 8802 分组配置
- 8803 分组配置
server:port: 8802spring:application:name: cloud-stream-consumercloud:stream:binders: #在此处配置要绑定的rabbitmq的服务信息defaultRabbit: #表示定义的名称,用于binding整合type: rabbit #消息组件类型environment: #设置rabbitmq的相关环境配置spring:rabbitmq:host: localhostport: 5672username: guestpassword: guestbindings: #服务的整合处理input: #这个名字是一个通道的名称destination: studyExchange #表示要使用的Exchange名称定义content-type: application/json #设置消息类型,本次为json,本文要设置为“text/plain”binder: defaultRabbit #设置要绑定的消息服务的具体设置group: bsxA #绑定分组 防止消息重复消费eureka:client:service-url:defaultZone: http://localhost:7001/eurekainstance:lease-renewal-interval-in-seconds: 2 #设置心跳的时间间隔(默认是30S)lease-expiration-duration-in-seconds: 5 #如果超过5S间隔就注销节点 默认是90sinstance-id: receive-8802.com #在信息列表时显示主机名称prefer-ip-address: true #访问的路径变为IP地址
测试(不同组重复消费)
重启消费者项目,可以在 RabbitMQ 控制面板看到设置的两个分组
- 访问
http://localhost:8801/sendMessage
8802
、8803
都可以接收到消息:重复消费- 可见结论:默认情况下就是分成不同的组,现在只是给不同的组起了个名字,还是重复消费
- 8802 后台
- 8803 后台
消费者1号, -----> 接受到的消息: 0e397a19-21d4-4ff9-a231-094d7e114e11 port: 8802
相同分组
8802
、8803
分到同一个分组8802
、8803
实现轮询分组
,每次只有一个消费者;8801
发送的消息每次只能被8802
、8803
其中之一接收,避免重复消费
提示
8802
、8803
的 YAML 中, group 属性配置成相同的即可
测试(轮询消费)
- 访问
http://localhost:8801/sendMessage
8802
受到第一条,8803
收不到;8802
收到第二条,8803
收不到- 这样同组内的两个消费者会轮询消费
持久化
设置过分组即可
- 不设置分组:消费者挂掉,生产者在发送消息,消费者重新上线,不能接收到自己挂掉期间生产者发送的消息,出现 消息丢失
- 设置分组:消费者挂掉,生产者在发送消息,消费者重新上线,消费者可以接收到自己挂掉期间生产者发送的还未被消费的消息,即实现了 消息持久化