ChannelOutboundBuffer中缓存待发送数据.png
protected abstract class AbstractUnsafe implements Unsafe {
//待发送数据缓冲队列 Netty是全异步框架,所以这里需要一个缓冲队列来缓存用户需要发送的数据
private volatile ChannelOutboundBuffer outboundBuffer = new ChannelOutboundBuffer(AbstractChannel.this);
@Override
public final void write(Object msg, ChannelPromise promise) {
assertEventLoop();
//获取当前channel对应的待发送数据缓冲队列(支持用户异步写入的核心关键)
ChannelOutboundBuffer outboundBuffer = this.outboundBuffer;
..........省略..................
int size;
try {
//过滤message类型 这里只会接受DirectBuffer或者fileRegion类型的msg
msg = filterOutboundMessage(msg);
//计算当前msg的大小
size = pipeline.estimatorHandle().size(msg);
if (size < 0) {
size = 0;
}
} catch (Throwable t) {
..........省略..................
}
//将msg 加入到Netty中的待写入数据缓冲队列ChannelOutboundBuffer中
outboundBuffer.addMessage(msg, size, promise);
}
}
众所周知 Netty 是一个异步事件驱动的网络框架,在 Netty 中所有的 IO 操作全部都是异步的,当然也包括本小节介绍的 write 操作,为了保证异步执行 write 操作,Netty 定义了一个待发送数据缓冲队列 ChannelOutboundBuffer ,Netty 将这些用户需要发送的网络数据在写入到 Socket 之前,先放在 ChannelOutboundBuffer 中缓存。
3.2.1 filterOutboundMessage每个客户端 NioSocketChannel 对应一个 ChannelOutboundBuffer 待发送数据缓冲队列
ChannelOutboundBuffer 只会接受 ByteBuffer 类型以及 FileRegion 类型的 msg 数据。
FileRegion 是Netty定义的用来通过零拷贝的方式网络传输文件数据。本文我们主要聚焦普通网络数据 ByteBuffer 的发送。
所以在将 msg 写入到 ChannelOutboundBuffer 之前,我们需要检查待写入 msg 的类型。确保是 ChannelOutboundBuffer 可接受的类型。
@Override
protected final Object filterOutboundMessage(Object msg) {
if (msg instanceof ByteBuf) {
ByteBuf buf = (ByteBuf) msg;
if (buf.isDirect()) {
return msg;
}
return newDirectBuffer(buf);
}
if (msg instanceof FileRegion) {
return msg;
}
throw new UnsupportedOperationException(
"unsupported message type: " StringUtil.simpleClassName(msg) EXPECTED_TYPES);
}
在网络数据传输的过程中,Netty为了减少数据从 堆内内存 到 堆外内存 的拷贝以及缓解GC的压力,所以这里必须采用 DirectByteBuffer 使用堆外内存来存放网络发送数据。
3.2.2 estimatorHandle计算当前msg的大小public class DefaultChannelPipeline implements ChannelPipeline {
//原子更新estimatorHandle字段
private static final AtomicReferenceFieldUpdater<DefaultChannelPipeline, MessageSizeEstimator.Handle> ESTIMATOR =
AtomicReferenceFieldUpdater.newUpdater(
DefaultChannelPipeline.class, MessageSizeEstimator.Handle.class, "estimatorHandle");
//计算要发送msg大小的handler
private volatile MessageSizeEstimator.Handle estimatorHandle;
final MessageSizeEstimator.Handle estimatorHandle() {
MessageSizeEstimator.Handle handle = estimatorHandle;
if (handle == null) {
handle = channel.config().getMessageSizeEstimator().newHandle();
if (!ESTIMATOR.compareAndSet(this, null, handle)) {
handle = estimatorHandle;
}
}
return handle;
}
}
在 pipeline 中会有一个 estimatorHandle 专门用来计算待发送 ByteBuffer 的大小。这个 estimatorHandle 会在 pipeline 对应的 Channel 中的配置类创建的时候被初始化。
这里 estimatorHandle 的实际类型为DefaultMessageSizeEstimator#HandleImpl。
public final class DefaultMessageSizeEstimator implements MessageSizeEstimator {
private static final class HandleImpl implements Handle {
private final int unknownSize;
private HandleImpl(int unknownSize) {
this.unknownSize = unknownSize;
}
@Override
public int size(Object msg) {
if (msg instanceof ByteBuf) {
return ((ByteBuf) msg).readableBytes();
}
if (msg instanceof ByteBufHolder) {
return ((ByteBufHolder) msg).content().readableBytes();
}
if (msg instanceof FileRegion) {
return 0;
}
return unknownSize;
}
}
这里我们看到 ByteBuffer 的大小即为 Buffer 中未读取的字节数 writerIndex - readerIndex 。
当我们验证了待写入数据 msg 的类型以及计算了 msg 的大小后,我们就可以通过 ChannelOutboundBuffer#addMessage方法将 msg 写入到ChannelOutboundBuffer(待发送数据缓冲队列)中。
write 事件处理的最终逻辑就是将待发送数据写入到 ChannelOutboundBuffer 中,下面我们就来看下这个 ChannelOutboundBuffer 内部结构到底是什么样子的?
3.3 ChannelOutboundBufferChannelOutboundBuffer 其实是一个单链表结构的缓冲队列,链表中的节点类型为 Entry ,由于 ChannelOutboundBuffer 在 Netty 中的作用就是缓存应用程序待发送的网络数据,所以 Entry 中封装的就是待写入 Socket 中的网络发送数据相关的信息,以及 ChannelHandlerContext#write 方法中返回给用户的 ChannelPromise 。这样可以在数据写入Socket之后异步通知应用程序。
此外 ChannelOutboundBuffer 中还封装了三个重要的指针:
- unflushedEntry :该指针指向 ChannelOutboundBuffer 中第一个待发送数据的 Entry。
- tailEntry :该指针指向 ChannelOutboundBuffer 中最后一个待发送数据的 Entry。通过 unflushedEntry 和 tailEntry 这两个指针,我们可以很方便的定位到待发送数据的 Entry 范围。
- flushedEntry :当我们通过 flush 操作需要将 ChannelOutboundBuffer 中缓存的待发送数据发送到 Socket 中时,flushedEntry 指针会指向 unflushedEntry 的位置,这样 flushedEntry 指针和 tailEntry 指针之间的 Entry 就是我们即将发送到 Socket 中的网络数据。
这三个指针在初始化的时候均为 null 。
ChannelOutboundBuffer结构.png
3.3.1 EntryEntry 作为 ChannelOutboundBuffer 链表结构中的节点元素类型,里边封装了待发送数据的各种信息,ChannelOutboundBuffer 其实就是对 Entry 结构的组织和操作。因此理解 Entry 结构是理解整个 ChannelOutboundBuffer 运作流程的基础。
下面我们就来看下 Entry 结构具体封装了哪些待发送数据的信息。
static final class Entry {
//Entry的对象池,用来创建和回收Entry对象
private static final ObjectPool<Entry> RECYCLER = ObjectPool.newPool(new ObjectCreator<Entry>() {
@Override
public Entry newObject(Handle<Entry> handle) {
return new Entry(handle);
}
});
//DefaultHandle用于回收对象
private final Handle<Entry> handle;
//ChannelOutboundBuffer下一个节点
Entry next;
//待发送数据
Object msg;
//msg 转换为 jdk nio 中的byteBuffer
ByteBuffer[] bufs;
ByteBuffer buf;
//异步write操作的future
ChannelPromise promise;
//已发送了多少
long progress;
//总共需要发送多少,不包含entry对象大小。
long total;
//pendingSize表示entry对象在堆中需要的内存总量 待发送数据大小 entry对象本身在堆中占用内存大小(96)
int pendingSize;
//msg中包含了几个jdk nio bytebuffer
int count = -1;
//write操作是否被取消
boolean cancelled;
}
我们看到Entry结构中一共有12个字段,其中1个静态字段和11个实例字段。
下面笔者就为大家介绍下这12个字段的含义及其作用,其中有些字段会在后面的场景中使用到,这里大家可能对有些字段理解起来比较模糊,不过没关系,这里能看懂多少是多少,不理解也没关系,这里介绍只是为了让大家混个眼熟,在后面流程的讲解中,笔者还会重新提到这些字段。
- ObjectPool<Entry> RECYCLER:Entry 的对象池,负责创建管理 Entry 实例,由于 Netty 是一个网络框架,所以 IO 读写就成了它的核心操作,在一个支持高性能高吞吐的网络框架中,会有大量的 IO 读写操作,那么就会导致频繁的创建 Entry 对象。我们都知道,创建一个实例对象以及 GC 回收这些实例对象都是需要性能开销的,那么在大量频繁创建 Entry 对象的场景下,引入对象池来复用创建好的 Entry 对象实例可以抵消掉由频繁创建对象以及GC回收对象所带来的性能开销。
关于对象池的详细内容,感兴趣的同学可以回看下笔者的这篇文章?《详解Recycler对象池的精妙设计与实现》
- Handle<Entry> handle:默认实现类型为 DefaultHandle ,用于数据发送完毕后,对象池回收 Entry 对象。由对象池 RECYCLER 在创建 Entry 对象的时候传递进来。
- Entry next:ChannelOutboundBuffer 是一个单链表的结构,这里的 next 指针用于指向当前 Entry 节点的后继节点。
- Object msg:应用程序待发送的网络数据,这里 msg 的类型为 DirectByteBuffer 或者 FileRegion(用于通过零拷贝的方式网络传输文件)。
- ByteBuffer[] bufs:这里的 ByteBuffer 类型为 JDK NIO 原生的 ByteBuffer 类型,因为 Netty 最终发送数据是通过 JDK NIO 底层的 SocketChannel 进行发送,所以需要将 Netty 中实现的 ByteBuffer 类型转换为 JDK NIO ByteBuffer 类型。应用程序发送的 ByteBuffer 可能是一个也可能是多个,如果发送多个就用 ByteBuffer[] bufs 封装在 Entry 对象中,如果是一个就用 ByteBuffer buf 封装。
- int count :表示待发送数据 msg 中一共包含了多少个 ByteBuffer 需要发送。
- ChannelPromise promise:ChannelHandlerContext#write 异步写操作返回的 ChannelFuture。当 Netty 将待发送数据写入到 Socket 中时会通过这个 ChannelPromise 通知应用程序发送结果。
- long progress:表示当前的一个发送进度,已经发送了多少数据。
- long total:Entry中总共需要发送多少数据。注意:这个字段并不包含 Entry 对象的内存占用大小。只是表示待发送网络数据的大小。
- boolean cancelled:应用程序调用的 write 操作是否被取消。
- int pendingSize:表示待发送数据的内存占用总量。待发送数据在内存中的占用量分为两部分: Entry对象中所封装的待发送网络数据大小。Entry对象本身在内存中的占用量。
Entry内存占用总量.png
3.3.2 pendingSize的作用想象一下这样的一个场景,当由于网络拥塞或者 Netty 客户端负载很高导致网络数据的接收速度以及处理速度越来越慢,TCP 的滑动窗口不断缩小以减少网络数据的发送直到为 0,而 Netty 服务端却有大量频繁的写操作,不断的写入到 ChannelOutboundBuffer 中。
这样就导致了数据发送不出去但是 Netty 服务端又在不停的写数据,慢慢的就会撑爆 ChannelOutboundBuffer 导致OOM。这里主要指的是堆外内存的 OOM,因为 ChannelOutboundBuffer 中包裹的待发送数据全部存储在堆外内存中。
所以 Netty 就必须限制 ChannelOutboundBuffer 中的待发送数据的内存占用总量,不能让它无限增长。Netty 中定义了高低水位线用来表示 ChannelOutboundBuffer 中的待发送数据的内存占用量的上限和下限。注意:这里的内存既包括 JVM 堆内存占用也包括堆外内存占用。
- 当待发送数据的内存占用总量超过高水位线的时候,Netty 就会将 NioSocketChannel 的状态标记为不可写状态。否则就可能导致 OOM。
- 当待发送数据的内存占用总量低于低水位线的时候,Netty 会再次将 NioSocketChannel 的状态标记为可写状态。
那么我们用什么记录ChannelOutboundBuffer中的待发送数据的内存占用总量呢?
答案就是本小节要介绍的 pendingSize 字段。在谈到待发送数据的内存占用量时大部分同学普遍都会有一个误解就是只计算待发送数据的大小(msg中包含的字节数) 而忽略了 Entry 实例对象本身在内存中的占用量。
因为 Netty 会将待发送数据封装在 Entry 实例对象中,在大量频繁的写操作中会产生大量的 Entry 实例对象,所以 Entry 实例对象的内存占用是不可忽视的。
否则就会导致明明还没有到达高水位线,但是由于大量的 Entry 实例对象存在,从而发生OOM。
所以 pendingSize 的计算既要包含待发送数据的大小也要包含其 Entry 实例对象的内存占用大小,这样才能准确计算出 ChannelOutboundBuffer 中待发送数据的内存占用总量。
ChannelOutboundBuffer 中所有的 Entry 实例中的 pendingSize 之和就是待发送数据总的内存占用量。
public final class ChannelOutboundBuffer {
//ChannelOutboundBuffer中的待发送数据的内存占用总量
private volatile long totalPendingSize;
}
3.3.3 高低水位线
上小节提到 Netty 为了防止 ChannelOutboundBuffer 中的待发送数据内存占用无限制的增长从而导致 OOM ,所以引入了高低水位线,作为待发送数据内存占用的上限和下限。
那么高低水位线具体设置多大呢 ? 我们来看一下 DefaultChannelConfig 中的配置。
public class DefaultChannelConfig implements ChannelConfig {
//ChannelOutboundBuffer中的高低水位线
private volatile WriteBufferWaterMark writeBufferWaterMark = WriteBufferWaterMark.DEFAULT;
}
public final class WriteBufferWaterMark {
private static final int DEFAULT_LOW_WATER_MARK = 32 * 1024;
private static final int DEFAULT_HIGH_WATER_MARK = 64 * 1024;
public static final WriteBufferWaterMark DEFAULT =
new WriteBufferWaterMark(DEFAULT_LOW_WATER_MARK, DEFAULT_HIGH_WATER_MARK, false);
WriteBufferWaterMark(int low, int high, boolean validate) {
..........省略校验逻辑.........
this.low = low;
this.high = high;
}
}
我们看到 ChannelOutboundBuffer 中的高水位线设置的大小为 64 KB,低水位线设置的是 32 KB。
这也就意味着每个 Channel 中的待发送数据如果超过 64 KB。Channel 的状态就会变为不可写状态。当内存占用量低于 32 KB时,Channel 的状态会再次变为可写状态。
3.3.4 Entry实例对象在JVM中占用内存大小前边提到 pendingSize 的作用主要是记录当前待发送数据的内存占用总量从而可以预警 OOM 的发生。
待发送数据的内存占用分为:待发送数据 msg 的内存占用大小以及 Entry 对象本身在JVM中的内存占用。
那么 Entry 对象本身的内存占用我们该如何计算呢?
要想搞清楚这个问题,大家需要先了解一下 Java 对象内存布局的相关知识。关于这部分背景知识,笔者已经在 ?《一文聊透对象在JVM中的内存布局,以及内存对齐和压缩指针的原理及应用》这篇文章中给出了详尽的阐述,想深入了解这块的同学可以看下这篇文章。
这里笔者只从这篇文章中提炼一些关于计算 Java 对象占用内存大小相关的内容。
在关于 Java 对象内存布局这篇文章中我们提到,对于Java普通对象来说内存中的布局由:对象头 实例数据区 Padding,这三部分组成。
其中对象头由存储对象运行时信息的 MarkWord 以及指向对象类型元信息的类型指针组成。
MarkWord 用来存放:hashcode,GC 分代年龄,锁状态标志,线程持有的锁,偏向线程 Id,偏向时间戳等。在 32 位操作系统和 64 位操作系统中 MarkWord 分别占用 4B 和 8B 大小的内存。
Java 对象头中的类型指针还有实例数据区的对象引用,在64 位系统中开启压缩指针的情况下(-XX: UseCompressedOops)占用 4B 大小。在关闭压缩指针的情况下(-XX:-UseCompressedOops)占用 8B 大小。
实例数据区用于存储 Java 类中定义的实例字段,包括所有父类中的实例字段以及对象引用。
在实例数据区中对象字段之间的排列以及内存对齐需要遵循三个字段重排列规则:
- 规则1:如果一个字段占用X个字节,那么这个字段的偏移量OFFSET需要对齐至NX。
- 规则2:在开启了压缩指针的 64 位 JVM 中,Java 类中的第一个字段的 OFFSET 需要对齐至 4N,在关闭压缩指针的情况下类中第一个字段的OFFSET需要对齐至 8N。
- 规则3:JVM 默认分配字段的顺序为:long / double,int / float,short / char,byte / boolean,oops(Ordianry Object Point 引用类型指针),并且父类中定义的实例变量会出现在子类实例变量之前。当设置JVM参数 -XX CompactFields 时(默认),占用内存小于 long / double 的字段会允许被插入到对象中第一个 long / double 字段之前的间隙中,以避免不必要的内存填充。
还有一个重要规则就是 Java 虚拟机堆中对象的起始地址需要对齐至 8 的倍数(可由JVM参数 -XX:ObjectAlignmentInBytes 控制,默认为 8 )。
在了解上述字段排列以及对象之间的内存对齐规则后,我们分别以开启压缩指针和关闭压缩指针两种情况,来对 Entry 对象的内存布局进行分析并计算对象占用内存大小。
static final class Entry {
.............省略static字段RECYCLER.........
//DefaultHandle用于回收对象
private final Handle<Entry> handle;
//ChannelOutboundBuffer下一个节点
Entry next;
//待发送数据
Object msg;
//msg 转换为 jdk nio 中的byteBuffer
ByteBuffer[] bufs;
ByteBuffer buf;
//异步write操作的future
ChannelPromise promise;
//已发送了多少
long progress;
//总共需要发送多少,不包含entry对象大小。
long total;
//pendingSize表示entry对象在堆中需要的内存总量 待发送数据大小 entry对象本身在堆中占用内存大小(96)
int pendingSize;
//msg中包含了几个jdk nio bytebuffer
int count = -1;
//write操作是否被取消
boolean cancelled;
}
我们看到 Entry 对象中一共有 11 个实例字段,其中 2 个 long 型字段,2 个 int 型字段,1 个 boolean 型字段,6 个对象引用。
默认情况下JVM参数 -XX CompactFields 是开启的。
开启指针压缩 -XX: UseCompressedOopsimage.png
Entry 对象的内存布局中开头先是 8 个字节的 MarkWord,然后是 4 个字节的类型指针(开启压缩指针)。
在实例数据区中对象的排列规则需要符合规则3,也就是字段之间的排列顺序需要遵循 long > int > boolean > oop(对象引用)。
根据规则 3 Entry对象实例数据区第一个字段应该是 long progress,但根据规则1 long 型字段的 OFFSET 需要对齐至 8 的倍数,并且根据 规则2 在开启压缩指针的情况下,对象的第一个字段 OFFSET 需要对齐至 4 的倍数。所以字段long progress 的 OFFET = 16,这就必然导致了在对象头与字段 long progress 之间需要由 4 字节的字节填充(OFFET = 12处发生字节填充)。
但是 JVM 默认开启了 -XX CompactFields,根据 规则3 占用内存小于 long / double 的字段会允许被插入到对象中第一个 long / double 字段之前的间隙中,以避免不必要的内存填充。
所以位于后边的字段 int pendingSize 插入到了 OFFET = 12 位置处,避免了不必要的字节填充。
在 Entry 对象的实例数据区中紧接着基础类型字段后面跟着的就是 6 个对象引用字段(开启压缩指针占用 4 个字节)。
大家一定注意到 OFFSET = 37 处本应该存放的是字段 private final Handle<Entry> handle 但是却被填充了 3 个字节。这是为什么呢?
根据字段重排列规则1:引用字段 private final Handle<Entry> handle 占用 4 个字节(开启压缩指针的情况),所以需要对齐至4的倍数。所以需要填充3个字节,使得引用字段 private final Handle<Entry> handle 位于 OFFSET = 40 处。
根据以上这些规则最终计算出来在开启压缩指针的情况下Entry对象在堆中占用内存大小为64字节
关闭指针压缩 -XX:-UseCompressedOops在分析完 Entry 对象在开启压缩指针情况下的内存布局情况后,我想大家现在对前边介绍的字段重排列的三个规则理解更加清晰了,那么我们基于这个基础来分析下在关闭压缩指针的情况下 Entry 对象的内存布局。