RocketMQ
RocketMQ 是阿里巴巴开源的分布式消息中间件,支持事务消息。
消息丢失的场景
生产者发送时如何保证不丢失?
采用 RocketMQ 自带的事务消息机制:
half 消息的作用是先判断 MQ 是否正常。
MQ 写入硬盘如何保证不丢失?
将 flushDiskType 改为 SYNC_FLUSH(同步刷盘),默认 ASYNC_FLUSH(异步刷盘)。
硬盘坏了如何保证不丢失?
采用主从集群部署,Leader 中的数据在多个 Follower 中存有备份。Master 挂了之后 DLedger 接管 commitLog,选举从节点,文件复制完成后提交。
消费者如何保证不丢失?
- 网络问题导致消费失败可重试(默认每条消息重试 16 次)
- 多线程异步消费失败 → 先执行本地事务再返回成功状态
MQ 节点挂了如何保证不丢失?
消息发送失败后先存入本地缓存,另外启动线程扫描缓存重试发送。
安装
RocketMQ 部署需要启动 NameServer 和 Broker 两个核心组件。
NameServer
docker run -d -p 9876:9876 --name rmqserver apache/rocketmq ./mqnamesrvBroker
配置 broker.conf:
brokerClusterName = DefaultCluster
brokerName = broker-a
brokerId = 0
deleteWhen = 04
fileReservedTime = 48
brokerRole = ASYNC_MASTER
flushDiskType = ASYNC_FLUSH
brokerIP1 = 192.168.2.252docker run -d -p 10911:10911 -p 10909:10909 \
--name rmqbroker --link rmqserver:namesrv \
-e "NAMESRV_ADDR=namesrv:9876" \
-v /docker/rocketmq/conf/broker.conf:/etc/rocketmq/broker.conf \
apache/rocketmq ./mqbroker -c /etc/rocketmq/broker.conf使用
<dependency>
<groupId>org.apache.rocketmq</groupId>
<artifactId>rocketmq-spring-boot-starter</artifactId>
</dependency>发送消息:
@Autowired
private RocketMQTemplate rocketMQTemplate;
@GetMapping("/test01")
public Result test01() {
String str = RandomUtil.randomString(8);
rocketMQTemplate.convertAndSend("test-topic", str);
return Result.ok();
}接收消息:
@Service
@RocketMQMessageListener(topic = "test-topic", consumerGroup = "test-group")
public class RocketMQReceive implements RocketMQListener<String> {
@Override
public void onMessage(String s) {
System.out.println("接收到消息:" + s);
}
}消息类型
| 类型 | 说明 | 适用场景 |
|---|---|---|
| 同步发送 | 等待响应 | 重要通知、报名短信 |
| 异步发送 | 回调接口接收响应 | 链路耗时较长的业务 |
| 单向发送 | 不等待应答 | 日志收集 |
| 顺序消息 | 严格按顺序发布和消费 | 订单处理 |
| 事务消息 | 分布式事务最终一致 | 支付系统 |
常见问题处理
消费失败重试耗尽
- 默认每条消息重试 16 次后进入死信队列(DLQ)
- 监控死信队列,人工介入或自动补偿
- 检查消费逻辑是否有幂等处理,避免重复消费
消息重复消费
- 消费端实现幂等:基于业务 ID(如订单号)去重
- 使用数据库唯一约束或 Redis SETNX 防重
- RocketMQ 不保证 Exactly-once,需要业务层保证幂等