Netty
Netty 是一款用于快速开发高性能网络应用程序的 Java 框架。它封装了网络编程的复杂性,使开发人员能够更方便地使用网络编程和 Web 技术的最新进展。Dubbo、Elasticsearch 都采用了 Netty。
为什么选 Netty
NIO 的缺点
| 问题 | 说明 |
|---|---|
| API 繁杂 | 类库复杂,学习成本高,需熟练掌握 Selector、ServerSocketChannel、SocketChannel、ByteBuffer |
| 多线程复杂 | NIO 涉及 Reactor 模式,必须熟悉多线程和网络编程 |
| epoll bug | Selector 空轮询,导致 CPU 100%,JDK 1.7 仍未根本解决 |
Netty 的优点
| 优点 | 说明 |
|---|---|
| API 简单 | 学习成本低 |
| 功能强大 | 内置多种编解码器,支持多种协议 |
| 性能高 | 相比其他 NIO 框架性能最优 |
| 社区活跃 | BUG 及时修复,迭代版本周期短 |
| 质量验证 | Dubbo、Elasticsearch 等知名项目采用 |
IO模型 详细介绍了 BIO/NIO/AIO 的原理和区别,Netty 正是基于 NIO 的封装和优化。
核心组件
架构分层
Netty 分三层:Core(核心层,零拷贝、API 库、事件模型)-> Protocol Support(HTTP/WebSocket、SSL、Protobuf)-> Transport Services(Socket、Datagram、HTTP Tunnel)。
Channel
Channel 是 Java NIO 的基本构造,代表到实体的开放连接。可以打开或关闭,连接或断开连接。
Future 与回调
Netty 的异步编程模型建立在 Future 和回调之上。JDK 的 Future 需要手动检查操作是否完成,而 Netty 的 ChannelFuture 允许注册监听器,在操作完成时自动回调通知。
ChannelFuture future = channel.connect(address);
future.addListener(new ChannelFutureListener() {
@Override
public void operationComplete(ChannelFuture future) {
if (future.isSuccess()) {
// 连接成功
} else {
future.cause().printStackTrace();
}
}
});EventLoop 与线程模型
Netty 通过 EventLoop 处理事件。每个 Channel 都会被分配一个 EventLoop,用以处理所有事件。EventLoop 由单一线程驱动,在其整个生命周期内不会改变,消除了 ChannelHandler 中的同步需求。
EventLoop 分配策略:
- 服务器端:每个客户端连接对应一个 Channel,由 EventLoopGroup 分配 EventLoop 处理
- 客户端:连接操作由 EventLoopGroup 中的一个 EventLoop 处理
ChannelHandler
ChannelHandler 是 Netty 的核心组件,负责处理入站和出站事件:
| 事件类型 | 说明 |
|---|---|
| 入站事件 | 连接激活、数据读取、异常发生 |
| 出站事件 | 连接打开关闭、数据写入刷新 |
public class EchoServerHandler extends ChannelInboundHandlerAdapter {
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) {
ByteBuf in = (ByteBuf) msg;
ctx.write(in);
}
@Override
public void channelReadComplete(ChannelHandlerContext ctx) {
ctx.writeAndFlush(Unpooled.EMPTY_BUFFER)
.addListener(ChannelFutureListener.CLOSE);
}
@Override
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) {
cause.printStackTrace();
ctx.close();
}
}ChannelPipeline
ChannelPipeline 持有一条 ChannelHandler 实例链。入站事件从头部流向尾部,出站事件从尾流向头。这种设计使得业务逻辑与网络处理代码分离。
ByteBuf
ByteBuf 是 Netty 的数据容器,相比 JDK 的 ByteBuffer 有显著优势:
| 特性 | ByteBuf | JDK ByteBuffer |
|---|---|---|
| 读写索引 | 分离 | 同一索引 |
| 容量扩展 | 支持 | 不支持 |
| 内存管理 | 池化/引用计数 | 无 |
| 组合缓冲区 | CompositeByteBuf | 无 |
使用模式:
- 堆缓冲区:数据存储在 JVM 堆内存,分配释放快
- 直接缓冲区:使用系统内存,减少 JVM 堆与系统内存间的复制
- 复合缓冲区:组合多个 ByteBuf,如 HTTP 消息头+体
传输类型
| 传输 | 说明 | 适用场景 |
|---|---|---|
| NIO | 非阻塞 I/O,使用 Selector | 高并发服务器 |
| Epoll | Linux 原生 EPOLL | Linux 生产环境 |
| OIO | 阻塞 I/O | 兼容旧代码 |
| Local | JVM 内部通信 | 同一 JVM 进程 |
| Embedded | 用于测试 | 单元测试 |
服务器引导
EventLoopGroup bossGroup = new NioEventLoopGroup();
EventLoopGroup workerGroup = new NioEventLoopGroup();
ServerBootstrap b = new ServerBootstrap();
b.group(bossGroup, workerGroup)
.channel(NioServerSocketChannel.class)
.localAddress(port)
.childHandler(new ChannelInitializer<SocketChannel>() {
@Override
public void initChannel(SocketChannel ch) {
ch.pipeline().addLast(new EchoServerHandler());
}
});
ChannelFuture f = b.bind().sync();
f.channel().closeFuture().sync();
bossGroup.shutdownGracefully().sync();
workerGroup.shutdownGracefully().sync();Bootstrap 配置参数
| 参数 | 说明 |
|---|---|
SO_BACKLOG | 服务端队列长度(默认 128) |
SO_KEEPALIVE | 连接保活探测(默认 false) |
TCP_NODELAY | 禁用 Nagle 算法(默认 true) |
SO_RCVBUF | TCP 接收缓冲区大小 |
group() 线程组
| 线程组 | 作用 | 适用 |
|---|---|---|
| bossGroup | 监听客户端连接,注册到 workerGroup | ServerBootstrap |
| workerGroup | 处理连接读写事件 | ServerBootstrap/Bootstrap |
默认线程数 = CPU 核数 x 2。
编解码器
| 类型 | 类 | 说明 |
|---|---|---|
| 解码器 | ByteToMessageDecoder | 字节转为消息对象 |
| 解码器 | ReplayingDecoder | 简化解码过程 |
| 编码器 | MessageToByteEncoder | 消息转为字节 |
| 预置 | HttpServerCodec | HTTP 编解码 |
| 预置 | WebSocketServerProtocolHandler | WebSocket 支持 |
| 预置 | ProtobufEncoder/Decoder | Protocol Buffers |
TaskQueue 任务队列
Handler 中可使用 TaskQueue 处理耗时任务,避免阻塞:
public class MyServerHandler extends ChannelInboundHandlerAdapter {
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) {
ctx.channel().eventLoop().execute(() -> {
// 耗时任务
});
}
}延时任务:
ctx.channel().eventLoop().schedule(() -> {
System.out.println("延时任务");
}, 5, TimeUnit.SECONDS);应用场景
| 公司 | 应用 |
|---|---|
| Twitter Finagle | 构建高性能 RPC 框架 |
| Facebook Nifty | 基于 Netty 实现 Thrift 服务 |
| Firebase | HTTP 长连接实现实时数据同步 |
| Urban Airship | 处理大量并发推送通知 |
常见问题处理
连接超时
- 检查
SO_TIMEOUT和connectTimeoutMillis配置 - 确认服务端
SO_BACKLOG队列长度是否足够(默认 128) - 使用
SO_KEEPALIVE开启 TCP 保活探测
内存泄漏
- 使用
ResourceLeakDetector.setLevel(Level.ADVANCED)开启泄漏检测 - 确保 ByteBuf 在 finally 块中
release(),或使用SimpleChannelInboundHandler自动释放 - 检查 ChannelPipeline 中是否有 Handler 未正确释放消息