rabbitMQ

rabbitMQ 有4种不同的交换器类型

  • Direct : 发布与订阅,完全匹配
  • Topic : 主题,规则匹配
  • Fanout : 广播匹配
  • Headers :

交换器

Direct

根据 routing.key 进入对应的队列

1
2
3
4
5
6
7
8
9
10
11
12
13
14
*                               ___________               #####################
* -> |_log.info_|----msg--> ## info日志处理服务 ##
* | #####################
* ___|________
* (routing.key)
* /
* _______ _________ /
* |_ 服务_|-->|_交换器_|<
* \
* \___________
* (routing.key)
* \ ____________ #####################
* \_> |_log.error_|----msg--> ## erro日志处理服务 ##
* #####################

Topic

根据 路由key 进行模糊匹配,再决定将哪个消息放到哪个队列中

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
*                                                                                                              ###########
* _________________ # info 日志 #
* 7------ *.log.info ----->|_ log.info 队列_|------- msg -----> # 处理服务 #
* ___________ / ###########
* |_ 用户服务 _|----- /
* \ 匹配规则 :topic /
* \ / #############
* ____________ --> ############ __________________ # error日志 #
* |_ 商品服务 _|---------> # 交换器 #-------- *.log.error ----->|_ log.error 队列_|------- msg -----> # 处理服务 #
* --> ############ #############
* / \
* ___________ / \ #########
* |_ 订单服务 _|----^ \ _____________ # 全日志 #
* \------- *.log.* ----->|_ log- 队列_|------- msg -------------> # 处理服务 #
* ##########

Fanout

只要有订阅该交换器的队列都会收到消息

1
2
3
4
5
6
7
8
9
10
11
*                                                                                  
* ___________ #########
* 7----->|_ 队列 Q1 _|---- 短信发送 -----> # 短信服务 #
* / #########
* /
* 匹配规则 :Fanout /
* /
* ____________ ########### / ____________ ###########
* |_ 订单服务 _|---------> # 交换器 #------------>|_ 队列 Q2 _|---- app push ----> # push 服务 #
* ############ ###########
*

持久化(durable)

消息接收者如果宕机后再重启,期间会出现消息丢失,所以需要持久化

在接收者 RabbitListener 中,有不同的注解控制持久化

autoDelete

  • @Queue:autoDelete =”true”。 当所有消费客户端连接断开后,是否自动删除队列,true删除,false不删除
  • @Exchange:autoDelete =”true” 当所有绑定队列都不在使用时,是否自动删除交换器,true删除,false不删除

消息确认 ACK 机制

如果在处理消息的过程中,消费者的服务器在处理消息时出现异常,那可能这条正在处理的消息就没有完成消息消费,数据就会丢失。为了确保数据不会丢失,RabbitMQ支持消息确认-ACK

确认机制

ACK机制 是消费者从RabbitMQ收到消息并处理完成后,反馈在RabbitMQ,RabbitMQ收到反馈后才将此消息从队列中删除。

  • 如果一个消费者在处理消息出现了网络不稳定、服务器异常等现象,那么就不会有ACK反馈,RabbitMQ 会认为这个消息没有正常消费,会将消息重新放入队列中。
  • 如果在集群的情况下:RabbitMQ会立即将这个消息推送给这个在线的其他消费者。这种机制保证了在消费者服务端故障的时候,不会丢失任何消息和任务。
  • 消息永远不会从RabbitMQ中删除:只会当消费者正确发送ACK反馈,RabbitMQ确认收到后,消息才会从RabbitMQ服务器的数据中删除。
  • 消息的ACK确认机制默认是打开的。

ACK机制的开发注意事项

如果忘记了ACK,那么后果很严重。当Consumer退出时,Message会一直重新分发。然后忘记RabbitMQ会占用越来越多的内存,由于RabbitMQ会长时间运行,因此这个“内存泄漏”是致命的。

1
2
3
4
5
6
7
spring:
rabbitmq:
listener:
simple:
retry:
enabled: true //通过以上配置可以启动AmqpTemplate的重试机制,例如与RabbitMQ连接丢失的时候将会自动重试事件的发布,这个特性默认是关闭的
max-attempts: 5 //重试次数