RabbitMQ
RabbitMQ 是实现了 AMQP 协议的消息中间件,支持多种消息模式。最大的特点是消费方不需要确保提供方存在,实现了服务之间的高度解耦。
为什么使用 RabbitMQ
| 优势 | 说明 |
|---|---|
| 异步削峰 | 具备异步、削峰、负载均衡等高级功能 |
| 持久化 | 拥有持久化机制,队列中的信息可以保存下来 |
| 解耦 | 生产者和消费者独立变化 |
| 限流 | 同步访问变为串行访问,利于数据库操作 |
MQ 的缺点
引入消息队列会提高系统复杂性:需要保证消息不重复消费、不丢失、顺序性;同时增加了系统依赖。
核心概念
| 概念 | 说明 |
|---|---|
| Server | Broker,接受客户端连接 |
| Connection | 应用程序与 Broker 的网络连接 |
| Channel | 网络信道,几乎所有的操作都在 Channel 中进行 |
| Exchange | 交换机,接受消息并路由到队列(不具备消息存储能力) |
| Queue | 消息队列,保存消息并转发给消费者 |
| Routing Key | 路由规则 |
| Binding | Exchange 和 Queue 之间的虚拟连接 |
消息路由流程
信道 Channel
TCP 连接的创建和销毁开销较大。RabbitMQ 使用信道方式传输数据:信道是建立在 TCP 连接内的虚拟连接,每条 TCP 连接上的信道数量没有限制。
交换器类型
| 类型 | 路由规则 |
|---|---|
| fanout | 广播到所有绑定的队列 |
| direct | 路由键完全匹配 |
| topic | 通配符匹配(# 匹配多个单词,* 匹配一个单词) |
| headers | 根据消息 headers 属性匹配 |
消息可靠性
消息可靠性需要从生产者、RabbitMQ 本身和消费端三方面保证。
生产者
使用 confirm 模式(异步,推荐)而非事务模式(同步,性能差):
- 开启 confirm 模式后,每条消息分配唯一 id
- RabbitMQ 接收后回调
ack,失败回调nack - 结合内存维护消息状态,超时未收到回调可重发
RabbitMQ 持久化
- 创建 queue 时设置持久化
- 发送消息时将
deliveryMode设置为 2 - 恢复后自动读取之前存储的数据
消费端
设置 noAck=false,消费者显式发回 ack 后才从队列中移去消息。
避免消息重复消费
| 去重机制 | 说明 |
|---|---|
| 消息去重 | MQ 内部针对每条消息生成 inner-msg-id |
| 业务去重 | 消息体中包含 bizId(如订单 ID),消费端去重依据 |
高可用
| 模式 | 说明 |
|---|---|
| 单机模式 | 开发测试使用 |
| 普通集群模式 | 提高吞吐量,queue 所在节点宕机数据丢失 |
| 镜像集群模式 | 每个节点都有 queue 的完整镜像,数据同步到所有实例 |
消息模式
| 模式 | 说明 |
|---|---|
| 简单模式 | 一个生产者对应一个消费者 |
| work 模式 | 一个生产者对应多个消费者,轮询或公平分发 |
| 发布/订阅 | 通过交换机广播到多个队列 |
Spring Boot 整合
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-amqp</artifactId>
</dependency>创建队列和交换机并绑定:
@Component
public class QueueConfig {
@Bean
public Queue addWatermarkQueue() {
return new Queue("AddWatermark");
}
@Bean
DirectExchange addWatermarkExchange() {
return new DirectExchange("AddWatermark");
}
@Bean
Binding bindingAddWatermarkExchange(Queue queue, DirectExchange exchange) {
return BindingBuilder.bind(queue).to(exchange).with("mq.addwatermark");
}
}生产消息:
rabbitTemplate.convertAndSend("AddWatermark", "", watermarkEntity);消费消息:
@RabbitListener(queues = "AddWatermark")
@Component
public class AddWatermarkProcessor {
@RabbitHandler
public void onMessage(WatermarkEntity watermarkEntity) {
System.out.println("消费消息 " + watermarkEntity);
}
}常见问题处理
消息丢失
- 生产者开启 confirm 模式,异步回调确认消息到达 Broker
- 队列和消息都设置持久化(
deliveryMode=2) - 消费者设置
noAck=false,显式发送 ACK
消息堆积
- 增加消费者数量,配合 prefetch 设置合理 QoS
- 检查消费者是否有阻塞操作或异常导致消费缓慢
- 临时扩容消费者处理积压,处理完后缩容