netty文件传输,netty传输二进制文件

首页 > 实用技巧 > 作者:YD1662024-02-21 13:35:28

image.png

首先 Entry 对象在内存布局中的开头依然是由 8 个字节的 MarkWord 还有 8 个字节的类型指针(关闭压缩指针)组成的对象头。

我们看到在 OFFSET = 41 处发生了字节填充,原因是在关闭压缩指针的情况下,对象引用占用内存大小变为 8 个字节,根据规则1: 引用字段 private final Handle<Entry> handle 的 OFFET 需要对齐至 8 的倍数,所以需要在该引用字段之前填充 7 个字节,使得引用字段 private final Handle<Entry> handle 的OFFET = 48 。

综合字段重排列的三个规则最终计算出来在关闭压缩指针的情况下Entry对象在堆中占用内存大小为96字节

3.3.5 向ChannelOutboundBuffer中缓存待发送数据

在介绍完 ChannelOutboundBuffer 的基本结构之后,下面就来到了 Netty 处理 write 事件的最后一步,我们来看下用户的待发送数据是如何被添加进 ChannelOutboundBuffer 中的。

public void addMessage(Object msg, int size, ChannelPromise promise) { Entry entry = Entry.newInstance(msg, size, total(msg), promise); if (tailEntry == null) { flushedEntry = null; } else { Entry tail = tailEntry; tail.next = entry; } tailEntry = entry; if (unflushedEntry == null) { unflushedEntry = entry; } incrementPendingOutboundBytes(entry.pendingSize, false); } 3.3.5.1 创建Entry对象来封装待发送数据信息

通过前边的介绍我们了解到当用户调用 ctx.write(msg) 之后,write 事件开始在pipeline中从当前 ChannelHandler开始一直向前进行传播,最终在 HeadContext 中将待发送数据写入到 channel 对应的写缓冲区 ChannelOutboundBuffer 中。

而 ChannelOutboundBuffer 是由 Entry 结构组成的一个单链表,Entry 结构封装了用户待发送数据的各种信息。

这里首先我们需要为待发送数据创建 Entry 对象,而在?《详解Recycler对象池的精妙设计与实现》一文中我们介绍对象池时,提到 Netty 作为一个高性能高吞吐的网络框架要面对海量的 IO 处理操作,这种场景下会频繁的创建大量的 Entry 对象,而对象的创建及其回收时需要性能开销的,尤其是在面对大量频繁的创建对象场景下,这种开销会进一步被放大,所以 Netty 引入了对象池来管理 Entry 对象实例从而避免 Entry 对象频繁创建以及 GC 带来的性能开销。

既然 Entry 对象已经被对象池接管,那么它在对象池外面是不能被直接创建的,其构造函数是私有类型,并提供一个静态方法 newInstance 供外部线程从对象池中获取 Entry 对象。这在?《详解Recycler对象池的精妙设计与实现》一文中介绍池化对象的设计时也有提到过。

static final class Entry { //静态变量引用类型地址 这个是在Klass Point(类型指针)中定义 8字节(开启指针压缩 为4字节) private static final ObjectPool<Entry> RECYCLER = ObjectPool.newPool(new ObjectCreator<Entry>() { @Override public Entry newObject(Handle<Entry> handle) { return new Entry(handle); } }); //Entry对象只能通过对象池获取,不可外部自行创建 private Entry(Handle<Entry> handle) { this.handle = handle; } //不考虑指针压缩的大小 entry对象在堆中占用的内存大小为96 //如果开启指针压缩,entry对象在堆中占用的内存大小 会是64 static final int CHANNEL_OUTBOUND_BUFFER_ENTRY_OVERHEAD = SystemPropertyUtil.getInt("io.netty.transport.outboundBufferEntrySizeOverhead", 96); static Entry newInstance(Object msg, int size, long total, ChannelPromise promise) { Entry entry = RECYCLER.get(); entry.msg = msg; //待发数据数据大小 entry对象大小 entry.pendingSize = size CHANNEL_OUTBOUND_BUFFER_ENTRY_OVERHEAD; entry.total = total; entry.promise = promise; return entry; } .......................省略................ }

  1. 通过池化对象 Entry 中持有的对象池 RECYCLER ,从对象池中获取 Entry 对象实例。
  2. 将用户待发送数据 msg(DirectByteBuffer),待发送数据大小:total ,本次发送数据的 channelFuture,以及该 Entry 对象的 pendingSize 统统封装在 Entry 对象实例的相应字段中。

这里需要特殊说明一点的是关于 pendingSize 的计算方式,之前我们提到 pendingSize 中所计算的内存占用一共包含两部分:

netty文件传输,netty传输二进制文件(17)

Entry内存占用总量.png

而在《3.3.4 Entry实例对象在JVM中占用内存大小》小节中我们介绍到,Entry 对象在内存中的占用大小在开启压缩指针的情况下(-XX: UseCompressedOops)占用 64 字节,在关闭压缩指针的情况下(-XX:-UseCompressedOops)占用 96 字节。

字段 CHANNEL_OUTBOUND_BUFFER_ENTRY_OVERHEAD 表示的就是 Entry 对象在内存中的占用大小,Netty这里默认是 96 字节,当然如果我们的应用程序开启了指针压缩,我们可以通过 JVM 启动参数 -D io.netty.transport.outboundBufferEntrySizeOverhead 指定为 64 字节。

3.3.5.2 将Entry对象添加进ChannelOutboundBuffer中

netty文件传输,netty传输二进制文件(18)

ChannelOutboundBuffer结构.png

if (tailEntry == null) { flushedEntry = null; } else { Entry tail = tailEntry; tail.next = entry; } tailEntry = entry; if (unflushedEntry == null) { unflushedEntry = entry; }

在《3.3 ChannelOutboundBuffer》小节一开始,我们介绍了 ChannelOutboundBuffer 中最重要的三个指针,这里涉及到的两个指针分别是:

通过 unflushedEntry 和 tailEntry 可以定位出待发送数据的范围。Channel 中的每一次 write 事件,最终都会将待发送数据插入到 ChannelOutboundBuffer 的尾结点处。

3.3.5.3 incrementPendingOutboundBytes

在将 Entry 对象添加进 ChannelOutboundBuffer 之后,就需要更新用于记录当前 ChannelOutboundBuffer 中关于待发送数据所占内存总量的水位线指示。

如果更新后的水位线超过了 Netty 指定的高水位线 DEFAULT_HIGH_WATER_MARK = 64 * 1024,则需要将当前 Channel 的状态设置为不可写,并在 pipeline 中传播 ChannelWritabilityChanged 事件,注意该事件是一个 inbound 事件。

netty文件传输,netty传输二进制文件(19)

响应channelWritabilityChanged事件.png

public final class ChannelOutboundBuffer { //ChannelOutboundBuffer中的待发送数据的内存占用总量 : 所有Entry对象本身所占用内存大小 所有待发送数据的大小 private volatile long totalPendingSize; //水位线指针 private static final AtomicLongFieldUpdater<ChannelOutboundBuffer> TOTAL_PENDING_SIZE_UPDATER = AtomicLongFieldUpdater.newUpdater(ChannelOutboundBuffer.class, "totalPendingSize"); private void incrementPendingOutboundBytes(long size, boolean invokeLater) { if (size == 0) { return; } //更新总共待写入数据的大小 long newWriteBufferSize = TOTAL_PENDING_SIZE_UPDATER.addAndGet(this, size); //如果待写入的数据 大于 高水位线 64 * 1024 则设置当前channel为不可写 由用户自己决定是否继续写入 if (newWriteBufferSize > channel.config().getWriteBufferHighWaterMark()) { //设置当前channel状态为不可写,并触发fireChannelWritabilityChanged事件 setUnwritable(invokeLater); } } }

volatile 关键字在 Java 内存模型中只能保证变量的可见性,以及禁止指令重排序。但无法保证多线程更新的原子性,这里我们可以通过AtomicLongFieldUpdater 来帮助 totalPendingSize 字段实现原子性的更新。

// 0表示channel可写,1表示channel不可写 private volatile int unwritable; private static final AtomicIntegerFieldUpdater<ChannelOutboundBuffer> UNWRITABLE_UPDATER = AtomicIntegerFieldUpdater.newUpdater(ChannelOutboundBuffer.class, "unwritable"); private void setUnwritable(boolean invokeLater) { for (;;) { final int oldValue = unwritable; final int newValue = oldValue | 1; if (UNWRITABLE_UPDATER.compareAndSet(this, oldValue, newValue)) { if (oldValue == 0) { //触发fireChannelWritabilityChanged事件 表示当前channel变为不可写 fireChannelWritabilityChanged(invokeLater); } break; } } }

当 ChannelOutboundBuffer 中的内存占用水位线 totalPendingSize 已经超过高水位线时,调用该方法将当前 Channel 的状态设置为不可写状态。

unwritable == 0 表示当前channel可写,unwritable == 1 表示当前channel不可写。

channel 可以通过调用 isWritable 方法来判断自身当前状态是否可写。

public boolean isWritable() { return unwritable == 0; }

当 Channel 的状态是首次从可写状态变为不可写状态时,就会在 channel 对应的 pipeline 中传播 ChannelWritabilityChanged 事件。

private void fireChannelWritabilityChanged(boolean invokeLater) { final ChannelPipeline pipeline = channel.pipeline(); if (invokeLater) { Runnable task = fireChannelWritabilityChangedTask; if (task == null) { fireChannelWritabilityChangedTask = task = new Runnable() { @Override public void run() { pipeline.fireChannelWritabilityChanged(); } }; } channel.eventLoop().execute(task); } else { pipeline.fireChannelWritabilityChanged(); } }

用户可以在自定义的 ChannelHandler 中实现 channelWritabilityChanged 事件回调方法,来针对 Channel 的可写状态变化做出不同的处理。

public class EchoServerHandler extends ChannelInboundHandlerAdapter { @Override public void channelWritabilityChanged(ChannelHandlerContext ctx) throws Exception { if (ctx.channel().isWritable()) { ...........当前channel可写......... } else { ...........当前channel不可写......... } } }

到这里 write 事件在 pipeline 中的传播,笔者就为大家介绍完了,下面我们来看下另一个重要的 flush 事件的处理过程。

4. flush

从前面 Netty 对 write 事件的处理过程中,我们可以看到当用户调用 ctx.write(msg) 方法之后,Netty 只是将用户要发送的数据临时写到 channel 对应的待发送缓冲队列 ChannelOutboundBuffer 中,然而并不会将数据写入 Socket 中。

而当一次 read 事件完成之后,我们会调用 ctx.flush() 方法将 ChannelOutboundBuffer 中的待发送数据写入 Socket 中的发送缓冲区中,从而将数据发送出去。

public class EchoServerHandler extends ChannelInboundHandlerAdapter { @Override public void channelReadComplete(ChannelHandlerContext ctx) { //本次OP_READ事件处理完毕 ctx.flush(); } } 4.1 flush事件的传播

netty文件传输,netty传输二进制文件(20)

上一页12345下一页

栏目热文

文档排行

本站推荐

Copyright © 2018 - 2021 www.yd166.com., All Rights Reserved.