Spring Cloud Stream
Spring Cloud Stream 是构建消息驱动微服务的框架,通过 Binder 抽象层(屏蔽消息中间件差异,切换 RabbitMQ/Kafka 只需改配置)屏蔽消息中间件差异,支持 RabbitMQ 和 Kafka。
适用场景:微服务架构的消息通信、需要异步处理的业务场景、需要解耦生产者和消费者的系统。
提示
Spring Cloud Stream消息驱动微服务框架,支持RabbitMQ和Kafka绑定器
核心概念
| 概念 | 说明 |
|---|---|
| Binder | 绑定器(屏蔽消息中间件差异的抽象层) |
| Channel | 消息通道(生产者发送/消费者接收的管道) |
| Source | 消息生产者(发送消息的一方) |
| Sink | 消息消费者(接收消息的一方) |
快速开始
引入依赖(以 RabbitMQ 为例):
<dependency>
<groupId>org.springframework.cloud</groupId>
<artifactId>spring-cloud-starter-stream-rabbit</artifactId>
</dependency>生产者
自定义消息通道:
public interface SendMsgOutput {
String OUTPUT = "test01output";
@Output(OUTPUT)
MessageChannel output();
}发送消息:
@EnableBinding(SendMsgOutput.class)
@RequiredArgsConstructor
public class SendMsgService {
private final MessageChannel messageChannel;
public void send(String message) {
messageChannel.send(MessageBuilder.withPayload(message).build());
}
}配置:
spring.cloud.stream.binders.test01.type=rabbit
spring.cloud.stream.binders.test01.environment.spring.rabbitmq.host=${spring.rabbitmq.host}
spring.cloud.stream.binders.test01.environment.spring.rabbitmq.port=${spring.rabbitmq.port}
spring.cloud.stream.binders.test01.environment.spring.rabbitmq.username=${spring.rabbitmq.username}
spring.cloud.stream.binders.test01.environment.spring.rabbitmq.password=${spring.rabbitmq.password}
spring.cloud.stream.bindings.test01output.destination=spring.cloud.stream.exchange
spring.cloud.stream.bindings.test01output.binder=test01消费者
自定义消息通道:
public interface ReceiveMsg {
String INPUT = "test01input";
@Input(INPUT)
SubscribableChannel input();
}接收消息:
@EnableBinding(ReceiveMsg.class)
public class MessageReceive {
@<mark>Stream</mark>Listener(ReceiveMsg.INPUT)
public void receive(String message) {
System.out.println("接收消息:" + message);
}
}分组与持久化
配置分组后,消息持久化且同一分组内只有一个消费者接收:
spring.cloud.stream.bindings.test01input.group=consumerGroup分组行为:
- 不分组时,消费者需要先启动,否则消息丢失
- 不分组时,多个消费者都能接收同一条消息
- 分组后,同一分组内只有一个消费者能接收消息,适合集群部署场景
路由键
默认情况下消息广播匹配方式是 #,所有消费者都可匹配。可通过指定 RoutingKey 实现按需匹配:
spring.cloud.stream.rabbit.bindings.myInput.consumer.bindingRoutingKey=spring.cloud.stream.#常见问题处理
消息发送失败
- 检查 RabbitMQ/Kafka 是否正常运行
- 确认消息代理连接配置是否正确
- 检查网络连通性
消息丢失
- 配置消息分组:
spring.cloud.stream.bindings.xxx.group=consumerGroup - 确认消费者是否正常启动
- 检查消息是否持久化