Netty 架构深度解析
JDK NIO 很好,但直接使用它有几个明显的问题:
ByteBuffer 使用繁琐,需要手动管理 flip、clear
- 半包、粘包问题需要自己处理
- 异常场景多,代码容易出 bug
- Selector 的空轮询 bug 在某些 JDK 版本上存在
Netty 在 NIO 基础上做了大量封装,提供了更易用的 API、更可靠的行为、更高的性能。它是高性能网络编程的事实标准。
Netty 的核心优势
Netty 能成为行业标准,靠的不只是"封装 NIO"这么简单:
零拷贝。支持 CompositeByteBuf、DirectBuffer、FileRegion,最大化利用操作系统的零拷贝能力。
内存池。PooledByteBuf 基于jemalloc 实现的高效内存池,减少 GC 压力。
统一的异步 API。所有 I/O 操作都是异步的,即使底层使用同步 I/O。
丰富的编解码器。内置 HTTP/Redis/MySQL/Protobuf 等协议编解码器,开箱即用。
事件驱动。基于 ChannelPipeline 和 ChannelHandler 的事件处理模型,职责清晰,易于扩展。
Netty 架构总览
flowchart TB
subgraph Client
C1["Bootstrap"]
end
subgraph Server
S1["ServerBootstrap"]
end
subgraph Thread Groups
B["Boss Group\nNioEventLoopGroup"]
W["Worker Group\nNioEventLoopGroup"]
end
subgraph Channels
SC["NioServerSocketChannel"]
CC["NioSocketChannel"]
end
subgraph Pipeline
P1["ChannelPipeline\nChannelHandler1 -> ChannelHandler2 -> ..."]
end
S1 --> B
S1 --> W
B --> SC
W --> CC
SC --> P1
CC --> P1
Boss Group vs Worker Group
- Boss Group:负责处理连接建立(OP_ACCEPT)。通常只需要 1 个线程。
- Worker Group:负责处理读写事件(OP_READ/OP_WRITE)。线程数通常等于 CPU 核心数的 2 倍。
服务端启动
EventLoopGroup bossGroup = new NioEventLoopGroup(1); // 1 个 Boss 线程
EventLoopGroup workerGroup = new NioEventLoopGroup(); // 默认 CPU×2
ServerBootstrap bootstrap = new ServerBootstrap();
bootstrap.group(bossGroup, workerGroup) // 设置线程组
.channel(NioServerSocketChannel.class) // 使用 NIO
.childHandler(new ChannelInitializer<SocketChannel>() {
@Override
protected void initChannel(SocketChannel ch) {
ch.pipeline().addLast(
new HttpServerCodec(),
new HttpObjectAggregator(65536),
new BusinessHandler()
);
}
});
ChannelFuture f = bootstrap.bind(8080).sync();
System.out.println("服务器启动,监听端口 8080");
ChannelPipeline:责任链模式
ChannelPipeline 是 Netty 处理消息的核心组件,采用责任链模式:
flowchart LR
subgraph Pipeline
I["Inbound 1"] --> I2["Inbound 2"]
I2 --> I3["Inbound 3"]
I3 --> B["业务 Handler"]
B --> O["Outbound 1"]
O --> O2["Outbound 2"]
O2 --> O3["Outbound 3"]
end
style B fill:#ff6b6b
- Inbound Handler:处理入站事件(数据接收、连接建立等),从头部向尾部传播
- Outbound Handler:处理出站事件(数据发送、连接关闭等),从尾部向头部传播
自定义
public class MyServerHandler extends ChannelInboundHandlerAdapter {
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) {
ByteBuf in = (ByteBuf) msg;
System.out.println("收到: " + in.toString(StandardCharsets.UTF_8));
// 响应客户端
ByteBuf out = ctx.alloc().buffer();
out.writeBytes("Hello Client".getBytes());
ctx.writeAndFlush(out);
}
@Override
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) {
cause.printStackTrace();
ctx.close();
}
}
ByteBuf:超越 ByteBuffer
Netty 实现了自己的 ByteBuf,解决了 JDK ByteBuffer 的三个问题:
问题一:无需手动 flip
JDK
ByteBuffer buffer = ByteBuffer.allocate(1024);
buffer.put(data); // 写入
buffer.flip(); // 必须 flip 才能读
buffer.get(); // 读取
Netty
ByteBuf buffer = Unpooled.buffer(1024);
buffer.writeBytes(data); // 写入
buffer.readableBytes(); // 有多少数据可读
buffer.readByte(); // 读取
问题二:支持动态扩容
ByteBuf buffer = Unpooled.buffer(16);
buffer.writeBytes(largeData); // 自动扩容
问题三:引用计数与池化
ByteBuf buffer = ctx.alloc().buffer();
try {
// 使用 buffer
} finally {
buffer.release(); // 释放回池
}
Netty 的 ByteBuf 支持引用计数:
retain():增加引用计数
release():减少引用计数
- 计数为 0 时归还内存池
Pooled vs Unpooled
// 池化(推荐用于高频场景)
ByteBuf pooled = ctx.alloc().buffer(1024); // 从池中获取
// 非池化(每次创建新对象)
ByteBuf unpooled = Unpooled.buffer(1024); // 每次 new
ChannelHandler:编解码与业务处理
Netty 提供了丰富的内置 Handler:
编解码器
HTTP
ch.pipeline().addLast(
new HttpServerCodec(), // HTTP 编解码
new HttpObjectAggregator(65536), // 聚合 HTTP 消息
new HttpServerHandler() // 业务处理
);
协议编解码
Protobuf
ch.pipeline().addLast(
new ProtobufEncoder(), // Protobuf 编码
new ProtobufDecoder(...), // Protobuf 解码
new BusinessHandler() // 业务处理
);
心跳检测
心跳
ch.pipeline().addLast("heartbeat", new IdleStateHandler(
60, // 读空闲 60 秒
45, // 写空闲 45 秒
30 // 读写空闲 30 秒
));
ch.pipeline().addLast("heartbeatHandler", new HeartbeatHandler());
Netty 的线程模型
Netty 基于 Reactor 模式实现:
flowchart TB
subgraph Boss
B1["Boss NioEventLoop"]
end
subgraph Worker
W1["Worker NioEventLoop"]
W2["Worker NioEventLoop"]
W3["Worker NioEventLoop"]
W4["Worker NioEventLoop"]
end
subgraph Channel
C1["NioSocketChannel"]
end
B1 -->|"处理 OP_ACCEPT"| W1
B1 -->|"处理 OP_ACCEPT"| W2
W1 --> C1
W2 --> C1
style B1 fill:#ff6b6b
style W1 fill:#48dbfb
style W2 fill:#48dbfb
Boss NioEventLoop:
- 监听端口,接收新连接
- 把新连接注册到 Worker
Worker NioEventLoop:
- 处理连接的读写事件
- 执行 ChannelPipeline 中的 Handler
NioEventLoop 的职责
class NioEventLoop implements Runnable {
Selector selector;
Queue<Runnable> taskQueue;
public void run() {
while (!terminated) {
// 1. 监听 I/O 事件
int ready = selector.select();
// 2. 处理 I/O 事件
processSelectedKeys();
// 3. 处理异步任务
runAllTasks();
}
}
}
NioEventLoop 同时负责:
- I/O 事件处理
- 异步任务执行(定时任务、用户提交的任务)
Netty 启动流程
完整示例
public class NettyServer {
public static void main(String[] args) throws Exception {
// 1. 创建线程组
EventLoopGroup bossGroup = new NioEventLoopGroup(1);
EventLoopGroup workerGroup = new NioEventLoopGroup();
try {
// 2. 创建启动器
ServerBootstrap bootstrap = new ServerBootstrap();
bootstrap.group(bossGroup, workerGroup)
.channel(NioServerSocketChannel.class)
.option(ChannelOption.SO_BACKLOG, 1024) // 连接队列大小
.childOption(ChannelOption.SO_KEEPALIVE, true) // TCP 保活
.childOption(ChannelOption.TCP_NODELAY, true) // 禁用 Nagle
.childHandler(new ChannelInitializer<SocketChannel>() {
@Override
protected void initChannel(SocketChannel ch) {
ch.pipeline().addLast(
new LineBasedFrameDecoder(1024), // 按行分割
new StringDecoder(StandardCharsets.UTF_8),
new BusinessHandler()
);
}
});
// 3. 绑定端口
ChannelFuture f = bootstrap.bind(8080).sync();
System.out.println("服务器启动,监听端口 8080");
// 4. 等待服务器关闭
f.channel().closeFuture().sync();
} finally {
// 5. 优雅关闭
bossGroup.shutdownGracefully();
workerGroup.shutdownGracefully();
}
}
}
本章小结
Netty 是高性能网络编程的首选框架:
- ByteBuf:池化、动态扩容、引用计数的缓冲区
- ChannelPipeline:责任链模式的事件处理
- Reactor 线程模型:Boss Group + Worker Group
- 丰富内置组件:编解码器、心跳、协议支持
下一章我们将深入学习 Netty 的线程模型,理解 EventLoop 的工作原理。
延伸思考
为什么 Netty 的 ByteBuf 要设计引用计数?
根本原因是为了精确管理内存释放。在高性能场景下,GC 是最大的敌人之一。通过引用计数,可以:
- 精确控制内存释放时机
- 避免过早释放导致的 use-after-free
- 配合内存池实现高效复用
这与 C/C++ 的智能指针(如 shared_ptr)是同样的思想。