netty文件传输,netty传输二进制文件

首页 > 实用技巧 > 作者:YD1662024-02-21 13:35:28

pipeline结构.png

flush 事件和 write 事件一样都是 oubound 事件,所以它们的传播方向都是从后往前在 pipeline 中传播。

触发 flush 事件传播的同样也有两个方法:

abstract class AbstractChannelHandlerContext implements ChannelHandlerContext, ResourceLeakHint { @Override public ChannelHandlerContext flush() { //向前查找覆盖flush方法的Outbound类型的ChannelHandler final AbstractChannelHandlerContext next = findContextOutbound(MASK_FLUSH); //获取执行ChannelHandler的executor,在初始化pipeline的时候设置,默认为Reactor线程 EventExecutor executor = next.executor(); if (executor.inEventLoop()) { next.invokeFlush(); } else { Tasks tasks = next.invokeTasks; if (tasks == null) { next.invokeTasks = tasks = new Tasks(next); } safeExecute(executor, tasks.invokeFlushTask, channel().voidPromise(), null, false); } return this; } }

这里的逻辑和 write 事件传播的逻辑基本一样,也是首先通过findContextOutbound(MASK_FLUSH) 方法从当前 ChannelHandler 开始从 pipeline 中向前查找出第一个 ChannelOutboundHandler 类型的并且实现 flush 事件回调方法的 ChannelHandler 。注意这里传入的执行资格掩码为 MASK_FLUSH。

执行ChannelHandler中事件回调方法的线程必须是通过pipeline#addLast(EventExecutorGroup group, ChannelHandler... handlers)为 ChannelHandler 指定的 executor。如果不指定,默认的 executor 为 channel 绑定的 reactor 线程。

如果当前线程不是 ChannelHandler 指定的 executor,则需要将 invokeFlush() 方法的调用封装成 Task 交给指定的 executor 执行。

4.1.1 触发nextChannelHandler的flush方法回调

private void invokeFlush() { if (invokeHandler()) { invokeFlush0(); } else { //如果该ChannelHandler并没有加入到pipeline中则继续向前传递flush事件 flush(); } }

这里和 write 事件的相关处理一样,首先也是需要调用 invokeHandler() 方法来判断这个 nextChannelHandler 是否在 pipeline 中被正确的初始化。

如果 nextChannelHandler 中的 handlerAdded 方法并没有被回调过,那么这里就只能跳过 nextChannelHandler,并调用 ChannelHandlerContext#flush 方法继续向前传播flush事件。

如果 nextChannelHandler 中的 handlerAdded 方法已经被回调过,说明 nextChannelHandler 在 pipeline 中已经被正确的初始化好,则直接调用nextChannelHandler 的 flush 事件回调方法。

private void invokeFlush0() { try { ((ChannelOutboundHandler) handler()).flush(this); } catch (Throwable t) { invokeExceptionCaught(t); } }

这里有一点和 write 事件处理不同的是,当调用 nextChannelHandler 的 flush 回调出现异常的时候,会触发 nextChannelHandler 的 exceptionCaught 回调。

private void invokeExceptionCaught(final Throwable cause) { if (invokeHandler()) { try { handler().exceptionCaught(this, cause); } catch (Throwable error) { if (logger.isDebugEnabled()) { logger.debug(....相关日志打印......); } else if (logger.isWarnEnabled()) { logger.warn(...相关日志打印......)); } } } else { fireExceptionCaught(cause); } }

而其他 outbound 类事件比如 write 事件在传播的过程中发生异常,只是回调通知相关的 ChannelFuture。并不会触发 exceptionCaught 事件的传播。

4.2 flush事件的处理

netty文件传输,netty传输二进制文件(21)

客户端channel pipeline结构.png

最终flush事件会在pipeline中一直向前传播至HeadContext中,并在 HeadContext 里调用 channel 的 unsafe 类完成 flush 事件的最终处理逻辑。

final class HeadContext extends AbstractChannelHandlerContext { @Override public void flush(ChannelHandlerContext ctx) { unsafe.flush(); } }

下面就真正到了 Netty 处理 flush 事件的地方。

protected abstract class AbstractUnsafe implements Unsafe { @Override public final void flush() { assertEventLoop(); ChannelOutboundBuffer outboundBuffer = this.outboundBuffer; //channel以关闭 if (outboundBuffer == null) { return; } //将flushedEntry指针指向ChannelOutboundBuffer头结点,此时变为即将要flush进Socket的数据队列 outboundBuffer.addFlush(); //将待写数据写进Socket flush0(); } } 4.2.1 ChannelOutboundBuffer#addFlush

netty文件传输,netty传输二进制文件(22)

ChannelOutboundBuffer结构.png

这里就到了真正要发送数据的时候了,在 addFlush 方法中会将 flushedEntry 指针指向 unflushedEntry 指针表示的第一个未被 flush 的 Entry 节点。并将 unflushedEntry 指针置为空,准备开始 flush 发送数据流程。

此时 ChannelOutboundBuffer 由待发送数据的缓冲队列变为了即将要 flush 进 Socket 的数据队列

这样在 flushedEntry 与 tailEntry 之间的 Entry 节点即为本次 flush 操作需要发送的数据范围。

public void addFlush() { Entry entry = unflushedEntry; if (entry != null) { if (flushedEntry == null) { flushedEntry = entry; } do { flushed ; //如果当前entry对应的write操作被用户取消,则释放msg,并降低channelOutboundBuffer水位线 if (!entry.promise.setUncancellable()) { int pending = entry.cancel(); decrementPendingOutboundBytes(pending, false, true); } entry = entry.next; } while (entry != null); // All flushed so reset unflushedEntry unflushedEntry = null; } }

在 flush 发送数据流程开始时,数据的发送流程就不能被取消了,在这之前我们都是可以通过 ChannelPromise 取消数据发送流程的。

所以这里需要对 ChannelOutboundBuffer 中所有 Entry 节点包裹的 ChannelPromise 设置为不可取消状态。

public interface Promise<V> extends Future<V> { /** * 设置当前future为不可取消状态 * * 返回true的情况: * 1:成功的将future设置为uncancellable * 2:当future已经成功完成 * * 返回false的情况: * 1:future已经被取消,则不能在设置 uncancellable 状态 * */ boolean setUncancellable(); }

如果这里的 setUncancellable() 方法返回 false 则说明在这之前用户已经将 ChannelPromise 取消掉了,接下来就需要调用 entry.cancel() 方法来释放为待发送数据 msg 分配的堆外内存。

static final class Entry { //write操作是否被取消 boolean cancelled; int cancel() { if (!cancelled) { cancelled = true; int pSize = pendingSize; // release message and replace with an empty buffer ReferenceCountUtil.safeRelease(msg); msg = Unpooled.EMPTY_BUFFER; pendingSize = 0; total = 0; progress = 0; bufs = null; buf = null; return pSize; } return 0; } }

当 Entry 对象被取消后,就需要减少 ChannelOutboundBuffer 的内存占用总量的水位线 totalPendingSize。

private static final AtomicLongFieldUpdater<ChannelOutboundBuffer> TOTAL_PENDING_SIZE_UPDATER = AtomicLongFieldUpdater.newUpdater(ChannelOutboundBuffer.class, "totalPendingSize"); //水位线指针.ChannelOutboundBuffer中的待发送数据的内存占用总量 : 所有Entry对象本身所占用内存大小 所有待发送数据的大小 private volatile long totalPendingSize; private void decrementPendingOutboundBytes(long size, boolean invokeLater, boolean notifyWritability) { if (size == 0) { return; } long newWriteBufferSize = TOTAL_PENDING_SIZE_UPDATER.addAndGet(this, -size); if (notifyWritability && newWriteBufferSize < channel.config().getWriteBufferLowWaterMark()) { setWritable(invokeLater); } }

当更新后的水位线低于低水位线 DEFAULT_LOW_WATER_MARK = 32 * 1024 时,就将当前 channel 设置为可写状态。

private void setWritable(boolean invokeLater) { for (;;) { final int oldValue = unwritable; final int newValue = oldValue & ~1; if (UNWRITABLE_UPDATER.compareAndSet(this, oldValue, newValue)) { if (oldValue != 0 && newValue == 0) { fireChannelWritabilityChanged(invokeLater); } break; } } }

当 Channel 的状态是第一次从不可写状态变为可写状态时,Netty 会在 pipeline 中再次触发 ChannelWritabilityChanged 事件的传播。

netty文件传输,netty传输二进制文件(23)

响应channelWritabilityChanged事件.png

4.2.2 发送数据前的最后检查---flush0

flush0 方法这里主要做的事情就是检查当 channel 的状态是否正常,如果 channel 状态一切正常,则调用 doWrite 方法发送数据。

protected abstract class AbstractUnsafe implements Unsafe { //是否正在进行flush操作 private boolean inFlush0; protected void flush0() { if (inFlush0) { // Avoid re-entrance return; } final ChannelOutboundBuffer outboundBuffer = this.outboundBuffer; //channel已经关闭或者outboundBuffer为空 if (outboundBuffer == null || outboundBuffer.isEmpty()) { return; } inFlush0 = true; if (!isActive()) { try { if (!outboundBuffer.isEmpty()) { if (isOpen()) { //当前channel处于disConnected状态 通知promise 写入失败 并触发channelWritabilityChanged事件 outboundBuffer.failFlushed(new NotYetConnectedException(), true); } else { //当前channel处于关闭状态 通知promise 写入失败 但不触发channelWritabilityChanged事件 outboundBuffer.failFlushed(newClosedChannelException(initialCloseCause, "flush0()"), false); } } } finally { inFlush0 = false; } return; } try { //写入Socket doWrite(outboundBuffer); } catch (Throwable t) { handleWriteError(t); } finally { inFlush0 = false; } } }

还记得我们在?《Netty如何高效接收网络连接》一文中提到过的 NioSocketChannel 的 active 状态有哪些条件吗??

@Override public boolean isActive() { SocketChannel ch = javaChannel(); return ch.isOpen() && ch.isConnected(); }

NioSocketChannel 处于 active 状态的条件必须是当前 NioSocketChannel 是 open 的同时处于 connected 状态。

当 channel 处于 disConnected 状态时,用户可以进行 write 操作但不能进行 flush 操作。

4.2.2.1 ChannelOutboundBuffer#failFlushed

public final class ChannelOutboundBuffer { private boolean inFail; void failFlushed(Throwable cause, boolean notify) { if (inFail) { return; } try { inFail = true; for (;;) { if (!remove0(cause, notify)) { break; } } } finally { inFail = false; } } }

该方法用于在 Netty 在发送数据的时候,如果发现当前 channel 处于非活跃状态,则将 ChannelOutboundBuffer 中 flushedEntry 与tailEntry 之间的 Entry 对象节点全部删除,并释放发送数据占用的内存空间,同时回收 Entry 对象实例。

4.2.2.2 ChannelOutboundBuffer#remove0

private boolean remove0(Throwable cause, boolean notifyWritability) { Entry e = flushedEntry; if (e == null) { //清空当前reactor线程缓存的所有待发送数据 clearNioBuffers(); return false; } Object msg = e.msg; ChannelPromise promise = e.promise; int size = e.pendingSize; //从channelOutboundBuffer中删除该Entry节点 removeEntry(e); if (!e.cancelled) { // only release message, fail and decrement if it was not canceled before. //释放msg所占用的内存空间 ReferenceCountUtil.safeRelease(msg); //编辑promise发送失败,并通知相应的Lisener safeFail(promise, cause); //由于msg得到释放,所以需要降低channelOutboundBuffer中的内存占用水位线,并根据notifyWritability决定是否触发ChannelWritabilityChanged事件 decrementPendingOutboundBytes(size, false, notifyWritability); } // recycle the entry //回收Entry实例对象 e.recycle(); return true; }

当一个 Entry 节点需要从 ChannelOutboundBuffer 中清除时,Netty 需要释放该 Entry 节点中包裹的发送数据 msg 所占用的内存空间。并标记对应的 promise 为失败同时通知对应的 listener ,由于 msg 得到释放,所以需要降低 channelOutboundBuffer 中的内存占用水位线,并根据 boolean notifyWritability 决定是否触发 ChannelWritabilityChanged 事件。最后需要将该 Entry 实例回收至 Recycler 对象池中。

5. 终于开始真正地发送数据了!

来到这里我们就真正进入到了 Netty 发送数据的核心处理逻辑,在?《Netty如何高效接收网络数据》一文中,笔者详细介绍了 Netty 读取数据的核心流程,Netty 会在一个 read loop 中不断循环读取 Socket 中的数据直到数据读取完毕或者读取次数已满 16 次,当循环读取了 16 次还没有读取完毕时,Netty 就不能在继续读了,因为 Netty 要保证 Reactor 线程可以均匀的处理注册在它上边的所有 Channel 中的 IO 事件。剩下未读取的数据等到下一次 read loop 在开始读取。

除此之外,在每次 read loop 开始之前,Netty 都会分配一个初始化大小为 2048 的 DirectByteBuffer 来装载从 Socket 中读取到的数据,当整个 read loop 结束时,会根据本次读取数据的总量来判断是否为该 DirectByteBuffer 进行扩容或者缩容,目的是在下一次 read loop 的时候可以为其分配一个容量大小合适的 DirectByteBuffer 。

其实 Netty 对发送数据的处理和对读取数据的处理核心逻辑都是一样的,这里大家可以将这两篇文章结合对比着看。

但发送数据的细节会多一些,也会更复杂一些,由于这块逻辑整体稍微比较复杂,所以我们接下来还是分模块进行解析:

5.1 发送数据前的准备工作

@Override protected void doWrite(ChannelOutboundBuffer in) throws Exception { //获取NioSocketChannel中封装的jdk nio底层socketChannel SocketChannel ch = javaChannel(); //最大写入次数 默认为16 目的是为了保证SubReactor可以平均的处理注册其上的所有Channel int writeSpinCount = config().getWriteSpinCount(); do { if (in.isEmpty()) { // 如果全部数据已经写完 则移除OP_WRITE事件并直接退出writeLoop clearOpWrite(); return; } // SO_SNDBUF设置的发送缓冲区大小 * 2 作为 最大写入字节数 293976 = 146988 << 1 int maxBytesPerGatheringWrite = ((NioSocketChannelConfig) config).getMaxBytesPerGatheringWrite(); // 将ChannelOutboundBuffer中缓存的DirectBuffer转换成JDK NIO 的 ByteBuffer ByteBuffer[] nioBuffers = in.nioBuffers(1024, maxBytesPerGatheringWrite); // ChannelOutboundBuffer中总共的DirectBuffer数 int nioBufferCnt = in.nioBufferCount(); switch (nioBufferCnt) { .........向底层jdk nio socketChannel发送数据......... } } while (writeSpinCount > 0); ............处理本轮write loop未写完的情况....... }

这部分内容为 Netty 开始发送数据之前的准备工作:

5.1.1 获取write loop最大发送循环次数

从当前 NioSocketChannel 的配置类 NioSocketChannelConfig 中获取 write loop 最大循环写入次数,默认为 16。但也可以通过下面的方式进行自定义设置。

ServerBootstrap b = new ServerBootstrap(); b.group(bossGroup, workerGroup) ....... .childOption(ChannelOption.WRITE_SPIN_COUNT,自定义数值) 5.1.2 处理在一轮write loop中就发送完数据的情况

进入 write loop 之后首先需要判断当前 ChannelOutboundBuffer 中的数据是否已经写完了 in.isEmpty()) ,如果全部写完就需要清除当前 Channel 在 Reactor 上注册的 OP_WRITE 事件。

这里大家可能会有疑问,目前我们还没有注册 OP_WRITE 事件到 Reactor 上,为啥要清除呢? 别着急,笔者会在后面为大家揭晓答案。

5.1.3 获取本次write loop 最大允许发送字节数

从 ChannelConfig 中获取本次 write loop 最大允许发送的字节数 maxBytesPerGatheringWrite 。初始值为 SO_SNDBUF大小 * 2 = 293976 = 146988 << 1,最小值为 2048。

private final class NioSocketChannelConfig extends DefaultSocketChannelConfig { //293976 = 146988 << 1 //SO_SNDBUF设置的发送缓冲区大小 * 2 作为 最大写入字节数 //最小值为2048 private volatile int maxBytesPerGatheringWrite = Integer.MAX_VALUE; private NioSocketChannelConfig(NioSocketChannel channel, Socket javaSocket) { super(channel, javaSocket); calculateMaxBytesPerGatheringWrite(); } private void calculateMaxBytesPerGatheringWrite() { // 293976 = 146988 << 1 // SO_SNDBUF设置的发送缓冲区大小 * 2 作为 最大写入字节数 int newSendBufferSize = getSendBufferSize() << 1; if (newSendBufferSize > 0) { setMaxBytesPerGatheringWrite(newSendBufferSize); } } }

我们可以通过如下的方式自定义配置 Socket 发送缓冲区大小。

ServerBootstrap b = new ServerBootstrap(); b.group(bossGroup, workerGroup) ....... .childOption(ChannelOption.SO_SNDBUF,自定义数值) 5.1.4 将待发送数据转换成 JDK NIO ByteBuffer

由于最终 Netty 会调用 JDK NIO 的 SocketChannel 发送数据,所以这里需要首先将当前 Channel 中的写缓冲队列 ChannelOutboundBuffer 里存储的 DirectByteBuffer( Netty 中的 ByteBuffer 实现)转换成 JDK NIO 的 ByteBuffer 类型。最终将转换后的待发送数据存储在 ByteBuffer[] nioBuffers 数组中。这里通过调用 ChannelOutboundBuffer#nioBuffers 方法完成以上 ByteBuffer 类型的转换。

通过 ChannelOutboundBuffer#nioBufferCount() 获取本次 write loop 总共需要发送的 ByteBuffer 数量 nioBufferCnt 。注意这里已经变成了 JDK NIO 实现的 ByteBuffer 了。

详细的 ByteBuffer 类型转换过程,笔者会在专门讲解 Buffer 设计的时候为大家全面细致地讲解,这里我们还是主要聚焦于发送数据流程的主线。

当做完这些发送前的准备工作之后,接下来 Netty 就开始向 JDK NIO SocketChannel 发送这些已经转换好的 JDK NIO ByteBuffer 了。

5.2 向JDK NIO SocketChannel发送数据

netty文件传输,netty传输二进制文件(24)

上一页23456下一页

栏目热文

文档排行

本站推荐

Copyright © 2018 - 2021 www.yd166.com., All Rights Reserved.