netty文件传输,netty传输二进制文件

首页 > 实用技巧 > 作者:YD1662024-02-21 13:35:28

ChannelOutboundBuffer中缓存待发送数据.png

protected abstract class AbstractUnsafe implements Unsafe { //待发送数据缓冲队列 Netty是全异步框架,所以这里需要一个缓冲队列来缓存用户需要发送的数据 private volatile ChannelOutboundBuffer outboundBuffer = new ChannelOutboundBuffer(AbstractChannel.this); @Override public final void write(Object msg, ChannelPromise promise) { assertEventLoop(); //获取当前channel对应的待发送数据缓冲队列(支持用户异步写入的核心关键) ChannelOutboundBuffer outboundBuffer = this.outboundBuffer; ..........省略.................. int size; try { //过滤message类型 这里只会接受DirectBuffer或者fileRegion类型的msg msg = filterOutboundMessage(msg); //计算当前msg的大小 size = pipeline.estimatorHandle().size(msg); if (size < 0) { size = 0; } } catch (Throwable t) { ..........省略.................. } //将msg 加入到Netty中的待写入数据缓冲队列ChannelOutboundBuffer中 outboundBuffer.addMessage(msg, size, promise); } }

众所周知 Netty 是一个异步事件驱动的网络框架,在 Netty 中所有的 IO 操作全部都是异步的,当然也包括本小节介绍的 write 操作,为了保证异步执行 write 操作,Netty 定义了一个待发送数据缓冲队列 ChannelOutboundBuffer ,Netty 将这些用户需要发送的网络数据在写入到 Socket 之前,先放在 ChannelOutboundBuffer 中缓存。

每个客户端 NioSocketChannel 对应一个 ChannelOutboundBuffer 待发送数据缓冲队列

3.2.1 filterOutboundMessage

ChannelOutboundBuffer 只会接受 ByteBuffer 类型以及 FileRegion 类型的 msg 数据。

FileRegion 是Netty定义的用来通过零拷贝的方式网络传输文件数据。本文我们主要聚焦普通网络数据 ByteBuffer 的发送。

所以在将 msg 写入到 ChannelOutboundBuffer 之前,我们需要检查待写入 msg 的类型。确保是 ChannelOutboundBuffer 可接受的类型。

@Override protected final Object filterOutboundMessage(Object msg) { if (msg instanceof ByteBuf) { ByteBuf buf = (ByteBuf) msg; if (buf.isDirect()) { return msg; } return newDirectBuffer(buf); } if (msg instanceof FileRegion) { return msg; } throw new UnsupportedOperationException( "unsupported message type: " StringUtil.simpleClassName(msg) EXPECTED_TYPES); }

在网络数据传输的过程中,Netty为了减少数据从 堆内内存 到 堆外内存 的拷贝以及缓解GC的压力,所以这里必须采用 DirectByteBuffer 使用堆外内存来存放网络发送数据。

3.2.2 estimatorHandle计算当前msg的大小

public class DefaultChannelPipeline implements ChannelPipeline { //原子更新estimatorHandle字段 private static final AtomicReferenceFieldUpdater<DefaultChannelPipeline, MessageSizeEstimator.Handle> ESTIMATOR = AtomicReferenceFieldUpdater.newUpdater( DefaultChannelPipeline.class, MessageSizeEstimator.Handle.class, "estimatorHandle"); //计算要发送msg大小的handler private volatile MessageSizeEstimator.Handle estimatorHandle; final MessageSizeEstimator.Handle estimatorHandle() { MessageSizeEstimator.Handle handle = estimatorHandle; if (handle == null) { handle = channel.config().getMessageSizeEstimator().newHandle(); if (!ESTIMATOR.compareAndSet(this, null, handle)) { handle = estimatorHandle; } } return handle; } }

在 pipeline 中会有一个 estimatorHandle 专门用来计算待发送 ByteBuffer 的大小。这个 estimatorHandle 会在 pipeline 对应的 Channel 中的配置类创建的时候被初始化。

这里 estimatorHandle 的实际类型为DefaultMessageSizeEstimator#HandleImpl。

public final class DefaultMessageSizeEstimator implements MessageSizeEstimator { private static final class HandleImpl implements Handle { private final int unknownSize; private HandleImpl(int unknownSize) { this.unknownSize = unknownSize; } @Override public int size(Object msg) { if (msg instanceof ByteBuf) { return ((ByteBuf) msg).readableBytes(); } if (msg instanceof ByteBufHolder) { return ((ByteBufHolder) msg).content().readableBytes(); } if (msg instanceof FileRegion) { return 0; } return unknownSize; } }

这里我们看到 ByteBuffer 的大小即为 Buffer 中未读取的字节数 writerIndex - readerIndex 。

当我们验证了待写入数据 msg 的类型以及计算了 msg 的大小后,我们就可以通过 ChannelOutboundBuffer#addMessage方法将 msg 写入到ChannelOutboundBuffer(待发送数据缓冲队列)中。

write 事件处理的最终逻辑就是将待发送数据写入到 ChannelOutboundBuffer 中,下面我们就来看下这个 ChannelOutboundBuffer 内部结构到底是什么样子的?

3.3 ChannelOutboundBuffer

ChannelOutboundBuffer 其实是一个单链表结构的缓冲队列,链表中的节点类型为 Entry ,由于 ChannelOutboundBuffer 在 Netty 中的作用就是缓存应用程序待发送的网络数据,所以 Entry 中封装的就是待写入 Socket 中的网络发送数据相关的信息,以及 ChannelHandlerContext#write 方法中返回给用户的 ChannelPromise 。这样可以在数据写入Socket之后异步通知应用程序。

此外 ChannelOutboundBuffer 中还封装了三个重要的指针:

这三个指针在初始化的时候均为 null 。

netty文件传输,netty传输二进制文件(13)

ChannelOutboundBuffer结构.png

3.3.1 Entry

Entry 作为 ChannelOutboundBuffer 链表结构中的节点元素类型,里边封装了待发送数据的各种信息,ChannelOutboundBuffer 其实就是对 Entry 结构的组织和操作。因此理解 Entry 结构是理解整个 ChannelOutboundBuffer 运作流程的基础。

下面我们就来看下 Entry 结构具体封装了哪些待发送数据的信息。

static final class Entry { //Entry的对象池,用来创建和回收Entry对象 private static final ObjectPool<Entry> RECYCLER = ObjectPool.newPool(new ObjectCreator<Entry>() { @Override public Entry newObject(Handle<Entry> handle) { return new Entry(handle); } }); //DefaultHandle用于回收对象 private final Handle<Entry> handle; //ChannelOutboundBuffer下一个节点 Entry next; //待发送数据 Object msg; //msg 转换为 jdk nio 中的byteBuffer ByteBuffer[] bufs; ByteBuffer buf; //异步write操作的future ChannelPromise promise; //已发送了多少 long progress; //总共需要发送多少,不包含entry对象大小。 long total; //pendingSize表示entry对象在堆中需要的内存总量 待发送数据大小 entry对象本身在堆中占用内存大小(96) int pendingSize; //msg中包含了几个jdk nio bytebuffer int count = -1; //write操作是否被取消 boolean cancelled; }

我们看到Entry结构中一共有12个字段,其中1个静态字段和11个实例字段。

下面笔者就为大家介绍下这12个字段的含义及其作用,其中有些字段会在后面的场景中使用到,这里大家可能对有些字段理解起来比较模糊,不过没关系,这里能看懂多少是多少,不理解也没关系,这里介绍只是为了让大家混个眼熟,在后面流程的讲解中,笔者还会重新提到这些字段。

关于对象池的详细内容,感兴趣的同学可以回看下笔者的这篇文章?《详解Recycler对象池的精妙设计与实现》

netty文件传输,netty传输二进制文件(14)

Entry内存占用总量.png

3.3.2 pendingSize的作用

想象一下这样的一个场景,当由于网络拥塞或者 Netty 客户端负载很高导致网络数据的接收速度以及处理速度越来越慢,TCP 的滑动窗口不断缩小以减少网络数据的发送直到为 0,而 Netty 服务端却有大量频繁的写操作,不断的写入到 ChannelOutboundBuffer 中。

这样就导致了数据发送不出去但是 Netty 服务端又在不停的写数据,慢慢的就会撑爆 ChannelOutboundBuffer 导致OOM。这里主要指的是堆外内存的 OOM,因为 ChannelOutboundBuffer 中包裹的待发送数据全部存储在堆外内存中。

所以 Netty 就必须限制 ChannelOutboundBuffer 中的待发送数据的内存占用总量,不能让它无限增长。Netty 中定义了高低水位线用来表示 ChannelOutboundBuffer 中的待发送数据的内存占用量的上限和下限。注意:这里的内存既包括 JVM 堆内存占用也包括堆外内存占用。

那么我们用什么记录ChannelOutboundBuffer中的待发送数据的内存占用总量呢

答案就是本小节要介绍的 pendingSize 字段。在谈到待发送数据的内存占用量时大部分同学普遍都会有一个误解就是只计算待发送数据的大小(msg中包含的字节数) 而忽略了 Entry 实例对象本身在内存中的占用量。

因为 Netty 会将待发送数据封装在 Entry 实例对象中,在大量频繁的写操作中会产生大量的 Entry 实例对象,所以 Entry 实例对象的内存占用是不可忽视的。

否则就会导致明明还没有到达高水位线,但是由于大量的 Entry 实例对象存在,从而发生OOM。

所以 pendingSize 的计算既要包含待发送数据的大小也要包含其 Entry 实例对象的内存占用大小,这样才能准确计算出 ChannelOutboundBuffer 中待发送数据的内存占用总量。

ChannelOutboundBuffer 中所有的 Entry 实例中的 pendingSize 之和就是待发送数据总的内存占用量。

public final class ChannelOutboundBuffer { //ChannelOutboundBuffer中的待发送数据的内存占用总量 private volatile long totalPendingSize; } 3.3.3 高低水位线

上小节提到 Netty 为了防止 ChannelOutboundBuffer 中的待发送数据内存占用无限制的增长从而导致 OOM ,所以引入了高低水位线,作为待发送数据内存占用的上限和下限。

那么高低水位线具体设置多大呢 ? 我们来看一下 DefaultChannelConfig 中的配置。

public class DefaultChannelConfig implements ChannelConfig { //ChannelOutboundBuffer中的高低水位线 private volatile WriteBufferWaterMark writeBufferWaterMark = WriteBufferWaterMark.DEFAULT; }

public final class WriteBufferWaterMark { private static final int DEFAULT_LOW_WATER_MARK = 32 * 1024; private static final int DEFAULT_HIGH_WATER_MARK = 64 * 1024; public static final WriteBufferWaterMark DEFAULT = new WriteBufferWaterMark(DEFAULT_LOW_WATER_MARK, DEFAULT_HIGH_WATER_MARK, false); WriteBufferWaterMark(int low, int high, boolean validate) { ..........省略校验逻辑......... this.low = low; this.high = high; } }

我们看到 ChannelOutboundBuffer 中的高水位线设置的大小为 64 KB,低水位线设置的是 32 KB。

这也就意味着每个 Channel 中的待发送数据如果超过 64 KB。Channel 的状态就会变为不可写状态。当内存占用量低于 32 KB时,Channel 的状态会再次变为可写状态。

3.3.4 Entry实例对象在JVM中占用内存大小

前边提到 pendingSize 的作用主要是记录当前待发送数据的内存占用总量从而可以预警 OOM 的发生。

待发送数据的内存占用分为:待发送数据 msg 的内存占用大小以及 Entry 对象本身在JVM中的内存占用。

那么 Entry 对象本身的内存占用我们该如何计算呢?

要想搞清楚这个问题,大家需要先了解一下 Java 对象内存布局的相关知识。关于这部分背景知识,笔者已经在 ?《一文聊透对象在JVM中的内存布局,以及内存对齐和压缩指针的原理及应用》这篇文章中给出了详尽的阐述,想深入了解这块的同学可以看下这篇文章。

这里笔者只从这篇文章中提炼一些关于计算 Java 对象占用内存大小相关的内容。

在关于 Java 对象内存布局这篇文章中我们提到,对于Java普通对象来说内存中的布局由:对象头 实例数据区 Padding,这三部分组成。

其中对象头由存储对象运行时信息的 MarkWord 以及指向对象类型元信息的类型指针组成。

MarkWord 用来存放:hashcode,GC 分代年龄,锁状态标志,线程持有的锁,偏向线程 Id,偏向时间戳等。在 32 位操作系统和 64 位操作系统中 MarkWord 分别占用 4B 和 8B 大小的内存。

Java 对象头中的类型指针还有实例数据区的对象引用,在64 位系统中开启压缩指针的情况下(-XX: UseCompressedOops)占用 4B 大小。在关闭压缩指针的情况下(-XX:-UseCompressedOops)占用 8B 大小。

实例数据区用于存储 Java 类中定义的实例字段,包括所有父类中的实例字段以及对象引用。

在实例数据区中对象字段之间的排列以及内存对齐需要遵循三个字段重排列规则:

还有一个重要规则就是 Java 虚拟机堆中对象的起始地址需要对齐至 8 的倍数(可由JVM参数 -XX:ObjectAlignmentInBytes 控制,默认为 8 )。

在了解上述字段排列以及对象之间的内存对齐规则后,我们分别以开启压缩指针和关闭压缩指针两种情况,来对 Entry 对象的内存布局进行分析并计算对象占用内存大小。

static final class Entry { .............省略static字段RECYCLER......... //DefaultHandle用于回收对象 private final Handle<Entry> handle; //ChannelOutboundBuffer下一个节点 Entry next; //待发送数据 Object msg; //msg 转换为 jdk nio 中的byteBuffer ByteBuffer[] bufs; ByteBuffer buf; //异步write操作的future ChannelPromise promise; //已发送了多少 long progress; //总共需要发送多少,不包含entry对象大小。 long total; //pendingSize表示entry对象在堆中需要的内存总量 待发送数据大小 entry对象本身在堆中占用内存大小(96) int pendingSize; //msg中包含了几个jdk nio bytebuffer int count = -1; //write操作是否被取消 boolean cancelled; }

我们看到 Entry 对象中一共有 11 个实例字段,其中 2 个 long 型字段,2 个 int 型字段,1 个 boolean 型字段,6 个对象引用。

默认情况下JVM参数 -XX CompactFields 是开启的。

开启指针压缩 -XX: UseCompressedOops

netty文件传输,netty传输二进制文件(15)

image.png

Entry 对象的内存布局中开头先是 8 个字节的 MarkWord,然后是 4 个字节的类型指针(开启压缩指针)。

在实例数据区中对象的排列规则需要符合规则3,也就是字段之间的排列顺序需要遵循 long > int > boolean > oop(对象引用)。

根据规则 3 Entry对象实例数据区第一个字段应该是 long progress,但根据规则1 long 型字段的 OFFSET 需要对齐至 8 的倍数,并且根据 规则2 在开启压缩指针的情况下,对象的第一个字段 OFFSET 需要对齐至 4 的倍数。所以字段long progress 的 OFFET = 16,这就必然导致了在对象头与字段 long progress 之间需要由 4 字节的字节填充(OFFET = 12处发生字节填充)。

但是 JVM 默认开启了 -XX CompactFields,根据 规则3 占用内存小于 long / double 的字段会允许被插入到对象中第一个 long / double 字段之前的间隙中,以避免不必要的内存填充。

所以位于后边的字段 int pendingSize 插入到了 OFFET = 12 位置处,避免了不必要的字节填充。

在 Entry 对象的实例数据区中紧接着基础类型字段后面跟着的就是 6 个对象引用字段(开启压缩指针占用 4 个字节)。

大家一定注意到 OFFSET = 37 处本应该存放的是字段 private final Handle<Entry> handle 但是却被填充了 3 个字节。这是为什么呢?

根据字段重排列规则1:引用字段 private final Handle<Entry> handle 占用 4 个字节(开启压缩指针的情况),所以需要对齐至4的倍数。所以需要填充3个字节,使得引用字段 private final Handle<Entry> handle 位于 OFFSET = 40 处。

根据以上这些规则最终计算出来在开启压缩指针的情况下Entry对象在堆中占用内存大小为64字节

关闭指针压缩 -XX:-UseCompressedOops

在分析完 Entry 对象在开启压缩指针情况下的内存布局情况后,我想大家现在对前边介绍的字段重排列的三个规则理解更加清晰了,那么我们基于这个基础来分析下在关闭压缩指针的情况下 Entry 对象的内存布局。

netty文件传输,netty传输二进制文件(16)

上一页12345下一页

栏目热文

文档排行

本站推荐

Copyright © 2018 - 2021 www.yd166.com., All Rights Reserved.