Spring WebFlux
WebFlux 是 Spring 5 引入的响应式 Web 框架,基于 Reactor 实现非阻塞式编程,少量线程即可处理高并发。
WebFlux vs MVC
| 特性 | WebFlux | Spring MVC |
|---|---|---|
| 编程模型 | 函数式/注解 | 注解驱动 |
| 线程模型 | 少量线程处理高并发 | 同步阻塞 |
| 适用场景 | IO 密集型、高并发 | CPU 密集型 |
| 响应式数据 | Flux、Mono | 同步返回 |
| 依赖 | Reactor、Netty | Servlet |
提示
选择建议 IO 密集型、高并发场景选 WebFlux;需要复杂业务逻辑或阻塞操作选 Spring MVC。
核心类型
| 类型 | 说明 |
|---|---|
| Flux | 发出 0 到 N 个元素的响应式序列 |
| Mono | 发出 0 或 1 个元素的响应式类型 |
| Router Function | 函数式路由定义 |
| Handler Function | 请求处理函数 |
Maven 依赖
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-webflux</artifactId>
</dependency>注解式开发
@RestController
@RequestMapping("/users")
public class UserController {
@Autowired
private UserRepository userRepository;
@GetMapping("/{id}")
public Mono<User> getUser(@PathVariable Long id) {
return userRepository.findById(id);
}
@GetMapping
public Flux<User> getAllUsers() {
return userRepository.findAll();
}
@PostMapping
public Mono<User> createUser(@RequestBody User user) {
return userRepository.save(user);
}
}响应式 Repository
public interface UserRepository extends ReactiveCrudRepository<User, Long> {
Flux<User> findByUsername(String username);
Mono<User> findById(Long id);
}函数式编程
Router Function
@Configuration
public class UserRouter {
@Bean
public RouterFunction<ServerResponse> userRoutes(UserHandler handler) {
return route(GET("/api/users/{id}"), handler::getUser)
.andRoute(GET("/api/users"), handler::getAllUsers)
.andRoute(POST("/api/users"), handler::createUser);
}
}Handler Function
@Component
public class UserHandler {
private final UserRepository userRepository;
public UserHandler(UserRepository userRepository) {
this.userRepository = userRepository;
}
public Mono<ServerResponse> getUser(ServerRequest request) {
Long id = Long.parseLong(request.pathVariable("id"));
return userRepository.findById(id)
.flatMap(user -> ServerResponse.ok()
.contentType(MediaType.APPLICATION_JSON)
.bodyValue(user))
.switchIfEmpty(ServerResponse.notFound().build());
}
}响应式流操作
// 映射、过滤、收集
userRepository.findAll().map(User::getUsername);
userRepository.findAll().filter(user -> user.getAge() > 18);
userRepository.findAll().collectList();
// 扁平化
userRepository.findAll().flatMap(user -> callExternalService(user));
// 错误处理
userRepository.findById(id).onErrorReturn(defaultUser);
userRepository.findAll().onErrorResume(e -> Flux.empty());
// 组合
Flux.concat(flux1, flux2);
Flux.merge(flux1, flux2);
Mono.zip(mono1, mono2, mono3);
// 分页
userRepository.findAll().skip(10).take(10);配置
spring:
data:
mongodb:
uri: mongodb://localhost:27017/test
r2dbc:
url: r2dbc:mysql://localhost:3306/test
username: root
password: passwordCORS 配置
@Bean
public CorsWebFilter corsWebFilter() {
CorsConfiguration config = new CorsConfiguration();
config.setAllowedOrigins(Arrays.asList("*"));
config.setAllowedMethods(Arrays.asList("GET", "POST", "PUT", "DELETE"));
UrlBasedCorsConfigurationSource source = new UrlBasedCorsConfigurationSource();
source.registerCorsMapping("/**", config);
return new CorsWebFilter(source);
}错误处理
@RestControllerAdvice
public class GlobalExceptionHandler {
@ExceptionHandler(UserNotFoundException.class)
public Mono<ServerResponse> handleNotFound(UserNotFoundException ex) {
return ServerResponse.status(HttpStatus.NOT_FOUND)
.bodyValue(Map.of("error", "Not Found", "message", ex.getMessage()));
}
@ExceptionHandler(Exception.class)
public Mono<ServerResponse> handleGeneral(Exception ex) {
return ServerResponse.status(HttpStatus.INTERNAL_SERVER_ERROR)
.bodyValue(Map.of("error", "Internal Server Error", "message", ex.getMessage()));
}
}测试
@SpringBootTest
@AutoConfigureWebTestClient
public class UserControllerTest {
@Autowired
private WebTestClient webTestClient;
@Test
public void testGetUser() {
webTestClient.get().uri("/users/1")
.exchange()
.expectStatus().isOk()
.expectBody(User.class);
}
}StepVerifier
@Test
public void testUserRepository() {
StepVerifier.create(userRepository.findByUsername("test"))
.expectNextMatches(user -> user.getUsername().equals("test"))
.verifyComplete();
}常见问题
| 问题 | 原因 | 解决方案 |
|---|---|---|
| 阻塞操作导致性能问题 | 在响应式链中使用了阻塞 I/O | 用 R2DBC 替代 JDBC,用 WebClient 替代 RestTemplate |
| 背压处理不当 | 消费者跟不上生产者速度 | 使用 onBackpressureBuffer()/onBackpressureDrop() 策略 |
| 内存泄漏 | Flux/Mono 未被订阅 | 确保响应式链被正确订阅,使用 Disposable 管理生命周期 |