flush流程.png
@Override
protected void doWrite(ChannelOutboundBuffer in) throws Exception {
SocketChannel ch = javaChannel();
int writeSpinCount = config().getWriteSpinCount();
do {
.........将待发送数据转换到JDK NIO ByteBuffer中.........
//本次write loop中需要发送的 JDK ByteBuffer个数
int nioBufferCnt = in.nioBufferCount();
switch (nioBufferCnt) {
case 0:
//这里主要是针对 网络传输文件数据 的处理 FileRegion
writeSpinCount -= doWrite0(in);
break;
case 1: {
.........处理单个NioByteBuffer发送的情况......
break;
}
default: {
.........批量处理多个NioByteBuffers发送的情况......
break;
}
}
} while (writeSpinCount > 0);
............处理本轮write loop未写完的情况.......
}
这里大家可能对 nioBufferCnt == 0 的情况比较有疑惑,明明之前已经校验过ChannelOutboundBuffer 不为空了,为什么这里从 ChannelOutboundBuffer 中获取到的 nioBuffer 个数依然为 0 呢?
在前边我们介绍 Netty 对 write 事件的处理过程时提过, ChannelOutboundBuffer 中只支持 ByteBuf 类型和 FileRegion 类型,其中 ByteBuf 类型用于装载普通的发送数据,而 FileRegion 类型用于通过零拷贝的方式网络传输文件。
而这里 ChannelOutboundBuffer 虽然不为空,但是装载的 NioByteBuffer 个数却为 0 说明 ChannelOutboundBuffer 中装载的是 FileRegion 类型,当前正在进行网络文件的传输。
case 0 的分支主要就是用于处理网络文件传输的情况。
5.2.1 零拷贝发送网络文件 protected final int doWrite0(ChannelOutboundBuffer in) throws Exception {
Object msg = in.current();
if (msg == null) {
return 0;
}
return doWriteInternal(in, in.current());
}
这里需要特别注意的是用于文件传输的方法 doWriteInternal 中的返回值,理解这些返回值的具体情况有助于我们理解后面 write loop 的逻辑走向。
private int doWriteInternal(ChannelOutboundBuffer in, Object msg) throws Exception {
if (msg instanceof ByteBuf) {
..............忽略............
} else if (msg instanceof FileRegion) {
FileRegion region = (FileRegion) msg;
//文件已经传输完毕
if (region.transferred() >= region.count()) {
in.remove();
return 0;
}
//零拷贝的方式传输文件
long localFlushedAmount = doWriteFileRegion(region);
if (localFlushedAmount > 0) {
in.progress(localFlushedAmount);
if (region.transferred() >= region.count()) {
in.remove();
}
return 1;
}
} else {
// Should not reach here.
throw new Error();
}
//走到这里表示 此时Socket已经写不进去了 退出writeLoop,注册OP_WRITE事件
return WRITE_STATUS_SNDBUF_FULL;
}
最终会在 doWriteFileRegion 方法中通过 FileChannel#transferTo 方法底层用到的系统调用为 sendFile 实现零拷贝网络文件的传输。
public class NioSocketChannel extends AbstractNioByteChannel implements io.netty.channel.socket.SocketChannel {
@Override
protected long doWriteFileRegion(FileRegion region) throws Exception {
final long position = region.transferred();
return region.transferTo(javaChannel(), position);
}
}
关于 Netty 中涉及到的零拷贝,笔者会有一篇专门的文章为大家讲解,本文的主题我们还是先聚焦于把发送流程的主线打通。
我们继续回到发送数据流程主线上来~~
case 0:
//这里主要是针对 网络传输文件数据 的处理 FileRegion
writeSpinCount -= doWrite0(in);
break;
- region.transferred() >= region.count() :表示当前 FileRegion 中的文件数据已经传输完毕。那么在这种情况下本次 write loop 没有写入任何数据到 Socket ,所以返回 0 ,writeSpinCount - 0 意思就是本次 write loop 不算,继续循环。
- localFlushedAmount > 0 :表示本 write loop 中写入了一些数据到 Socket 中,会有返回 1,writeSpinCount - 1 减少一次 write loop 次数。
- localFlushedAmount <= 0 :表示当前 Socket 发送缓冲区已满,无法写入数据,那么就返回 WRITE_STATUS_SNDBUF_FULL = Integer.MAX_VALUE。 writeSpinCount - Integer.MAX_VALUE 必然是负数,直接退出循环,向 Reactor 注册 OP_WRITE 事件并退出 flush 流程。等 Socket 发送缓冲区可写了,Reactor 会通知 channel 继续发送文件数据。记住这里,我们后面还会提到。
剩下两个 case 1 和 default 分支主要就是处理 ByteBuffer 装载的普通数据发送逻辑。
其中 case 1 表示当前 Channel 的 ChannelOutboundBuffer 中只包含了一个 NioByteBuffer 的情况。
default 表示当前 Channel 的 ChannelOutboundBuffer 中包含了多个 NioByteBuffers 的情况。
@Override
protected void doWrite(ChannelOutboundBuffer in) throws Exception {
SocketChannel ch = javaChannel();
int writeSpinCount = config().getWriteSpinCount();
do {
.........将待发送数据转换到JDK NIO ByteBuffer中.........
//本次write loop中需要发送的 JDK ByteBuffer个数
int nioBufferCnt = in.nioBufferCount();
switch (nioBufferCnt) {
case 0:
..........处理网络文件传输.........
case 1: {
ByteBuffer buffer = nioBuffers[0];
int attemptedBytes = buffer.remaining();
final int localWrittenBytes = ch.write(buffer);
if (localWrittenBytes <= 0) {
//如果当前Socket发送缓冲区满了写不进去了,则注册OP_WRITE事件,等待Socket发送缓冲区可写时 在写
// SubReactor在处理OP_WRITE事件时,直接调用flush方法
incompleteWrite(true);
return;
}
//根据当前实际写入情况调整 maxBytesPerGatheringWrite数值
adjustMaxBytesPerGatheringWrite(attemptedBytes, localWrittenBytes, maxBytesPerGatheringWrite);
//如果ChannelOutboundBuffer中的某个Entry被全部写入 则删除该Entry
// 如果Entry被写入了一部分 还有一部分未写入 则更新Entry中的readIndex 等待下次writeLoop继续写入
in.removeBytes(localWrittenBytes);
--writeSpinCount;
break;
}
default: {
// ChannelOutboundBuffer中总共待写入数据的字节数
long attemptedBytes = in.nioBufferSize();
//批量写入
final long localWrittenBytes = ch.write(nioBuffers, 0, nioBufferCnt);
if (localWrittenBytes <= 0) {
incompleteWrite(true);
return;
}
//根据实际写入情况调整一次写入数据大小的最大值
// maxBytesPerGatheringWrite决定每次可以从channelOutboundBuffer中获取多少发送数据
adjustMaxBytesPerGatheringWrite((int) attemptedBytes, (int) localWrittenBytes,
maxBytesPerGatheringWrite);
//移除全部写完的BUffer,如果只写了部分数据则更新buffer的readerIndex,下一个writeLoop写入
in.removeBytes(localWrittenBytes);
--writeSpinCount;
break;
}
}
} while (writeSpinCount > 0);
............处理本轮write loop未写完的情况.......
}
case 1 和 default 这两个分支在处理发送数据时的逻辑是一样的,唯一的区别就是 case 1 是处理单个 NioByteBuffer 的发送,而 default 分支是批量处理多个 NioByteBuffers 的发送。
下面笔者就以经常被触发到的 default 分支为例来为大家讲述 Netty 在处理数据发送时的逻辑细节:
- 首先从当前 NioSocketChannel 中的 ChannelOutboundBuffer 中获取本次 write loop 需要发送的字节总量 attemptedBytes 。这个 nioBufferSize 是在前边介绍 ChannelOutboundBuffer#nioBuffers 方法转换 JDK NIO ByteBuffer 类型时被计算出来的。
- 调用 JDK NIO 原生 SocketChannel 批量发送 nioBuffers 中的数据。并获取到本次 write loop 一共批量发送了多少字节 localWrittenBytes 。
/**
* @throws NotYetConnectedException
* If this channel is not yet connected
*/
public abstract long write(ByteBuffer[] srcs, int offset, int length)
throws IOException;
- localWrittenBytes <= 0 表示当前 Socket 的写缓存区 SEND_BUF 已满,写不进数据了。那么就需要向当前 NioSocketChannel 对应的 Reactor 注册 OP_WRITE 事件,并停止当前 flush 流程。当 Socket 的写缓冲区有容量可写时,epoll 会通知 reactor 线程继续写入。
protected final void incompleteWrite(boolean setOpWrite) {
// Did not write completely.
if (setOpWrite) {
//这里处理还没写满16次 但是socket缓冲区已满写不进去的情况 注册write事件
//什么时候socket可写了, epoll会通知reactor线程继续写
setOpWrite();
} else {
...........目前还不需要关注这里.......
}
}
向 Reactor 注册 OP_WRITE 事件:
protected final void setOpWrite() {
final SelectionKey key = selectionKey();
if (!key.isValid()) {
return;
}
final int interestOps = key.interestOps();
if ((interestOps & SelectionKey.OP_WRITE) == 0) {
key.interestOps(interestOps | SelectionKey.OP_WRITE);
}
}
关于通过位运算来向 IO 事件集合 interestOps 添加监听 IO 事件的用法,在前边的文章中,笔者已经多次介绍过了,这里不再重复。
- 根据本次 write loop 向 Socket 写缓冲区写入数据的情况,来调整下次 write loop 最大写入字节数。maxBytesPerGatheringWrite 决定每次 write loop 可以从 channelOutboundBuffer 中最多获取多少发送数据。初始值为 SO_SNDBUF大小 * 2 = 293976 = 146988 << 1,最小值为 2048。
public static final int MAX_BYTES_PER_GATHERING_WRITE_ATTEMPTED_LOW_THRESHOLD = 4096;
private void adjustMaxBytesPerGatheringWrite(int attempted, int written, int oldMaxBytesPerGatheringWrite) {
if (attempted == written) {
if (attempted << 1 > oldMaxBytesPerGatheringWrite) {
((NioSocketChannelConfig) config).setMaxBytesPerGatheringWrite(attempted << 1);
}
} else if (attempted > MAX_BYTES_PER_GATHERING_WRITE_ATTEMPTED_LOW_THRESHOLD && written < attempted >>> 1) {
((NioSocketChannelConfig) config).setMaxBytesPerGatheringWrite(attempted >>> 1);
}
}
由于操作系统会动态调整 SO_SNDBUF 的大小,所以这里 netty 也需要根据操作系统的动态调整做出相应的调整,目的是尽量多的去写入数据。
attempted == written 表示本次 write loop 尝试写入的数据能全部写入到 Socket 的写缓冲区中,那么下次 write loop 就应该尝试去写入更多的数据。
那么这里的更多具体是多少呢?
Netty 会将本次写入的数据量 written 扩大两倍,如果扩大两倍后的写入量大于本次 write loop 的最大限制写入量 maxBytesPerGatheringWrite,说明用户的写入需求很猛烈,Netty当然要满足这样的猛烈需求,那么就将当前 NioSocketChannelConfig 中的 maxBytesPerGatheringWrite 更新为本次 write loop 两倍的写入量大小。
在下次 write loop 写入数据的时候,就会尝试从 ChannelOutboundBuffer 中加载最多 written * 2 大小的字节数。
如果扩大两倍后的写入量依然小于等于本次 write loop 的最大限制写入量 maxBytesPerGatheringWrite,说明用户的写入需求还不是很猛烈,Netty 继续维持本次 maxBytesPerGatheringWrite 数值不变。
如果本次写入的数据还不及尝试写入数据的 1 / 2 :written < attempted >>> 1。说明当前 Socket 写缓冲区的可写容量不是很多了,下一次 write loop 就不要写这么多了尝试减少下次写入的量将下次 write loop 要写入的数据减小为 attempted 的1 / 2。当然也不能无限制的减小,最小值不能低于 2048。
这里可以结合笔者前边的文章?《一文聊透ByteBuffer动态自适应扩缩容机制》中介绍到的 read loop 场景中的扩缩容一起对比着看。
read loop 中的扩缩容触发时机是在一个完整的 read loop 结束时候触发。而 write loop 中扩缩容的触发时机是在每次 write loop 发送完数据后,立即触发扩缩容判断。
- 当本次 write loop 批量发送完 ChannelOutboundBuffer 中的数据之后,最后调用in.removeBytes(localWrittenBytes) 从 ChannelOutboundBuffer 中移除全部写完的 Entry ,如果只发送了 Entry 的部分数据则更新 Entry 对象中封装的 DirectByteBuffer 的 readerIndex,等待下一次 write loop 写入。
到这里,write loop 中的发送数据的逻辑就介绍完了,接下来 Netty 会在 write loop 中循环地发送数据直到写满 16 次或者数据发送完毕。
还有一种退出 write loop 的情况就是当 Socket 中的写缓冲区满了,无法在写入时。Netty 会退出 write loop 并向 reactor 注册 OP_WRITE 事件。
但这其中还隐藏着一种情况就是如果 write loop 已经写满 16 次但还没写完数据并且此时 Socket 写缓冲区还没有满,还可以继续在写。那 Netty 会如何处理这种情况呢?
6. 处理Socket可写但已经写满16次还没写完的情况 @Override
protected void doWrite(ChannelOutboundBuffer in) throws Exception {
SocketChannel ch = javaChannel();
int writeSpinCount = config().getWriteSpinCount();
do {
.........将待发送数据转换到JDK NIO ByteBuffer中.........
int nioBufferCnt = in.nioBufferCount();
switch (nioBufferCnt) {
case 0:
//这里主要是针对 网络传输文件数据 的处理 FileRegion
writeSpinCount -= doWrite0(in);
break;
case 1: {
.....发送单个nioBuffer....
}
default: {
.....批量发送多个nioBuffers......
}
}
} while (writeSpinCount > 0);
//处理write loop结束 但数据还没写完的情况
incompleteWrite(writeSpinCount < 0);
}
当 write loop 结束后,这时 writeSpinCount 的值会有两种情况:
- writeSpinCount < 0:这种情况有点不好理解,我们在介绍 Netty 通过零拷贝的方式传输网络文件也就是这里的 case 0 分支逻辑时,详细介绍了 doWrite0 方法的几种返回值,当 Netty 在传输文件的过程中发现 Socket 缓冲区已满无法在继续写入数据时,会返回 WRITE_STATUS_SNDBUF_FULL = Integer.MAX_VALUE,这就使得 writeSpinCount的值 < 0。随后 break 掉 write loop 来到 incompleteWrite(writeSpinCount < 0) 方法中,最后会在 incompleteWrite 方法中向 reactor 注册 OP_WRITE 事件。当 Socket 缓冲区变得可写时,epoll 会通知 reactor 线程继续发送文件。
protected final void incompleteWrite(boolean setOpWrite) {
// Did not write completely.
if (setOpWrite) {
//这里处理还没写满16次 但是socket缓冲区已满写不进去的情况 注册write事件
// 什么时候socket可写了, epoll会通知reactor线程继续写
setOpWrite();
} else {
..............
}
}
- writeSpinCount == 0: 这种情况很好理解,就是已经写满了 16 次,但是还没写完,同时 Socket 的写缓冲区未满,还可以继续写入。这种情况下即使 Socket 还可以继续写入,Netty 也不会再去写了,因为执行 flush 操作的是 reactor 线程,而 reactor 线程负责执行注册在它上边的所有 channel 的 IO 操作,Netty 不会允许 reactor 线程一直在一个 channel 上执行 IO 操作,reactor 线程的执行时间需要均匀的分配到每个 channel 上。所以这里 Netty 会停下,转而去处理其他 channel 上的 IO 事件。
那么还没写完的数据,Netty会如何处理呢?
protected final void incompleteWrite(boolean setOpWrite) {
// Did not write completely.
if (setOpWrite) {
//这里处理还没写满16次 但是socket缓冲区已满写不进去的情况 注册write事件
// 什么时候socket可写了, epoll会通知reactor线程继续写
setOpWrite();
} else {
//这里处理的是socket缓冲区依然可写,但是写了16次还没写完,这时就不能在写了,reactor线程需要处理其他channel上的io事件
//因为此时socket是可写的,必须清除op_write事件,否则会一直不停地被通知
clearOpWrite();
//如果本次writeLoop还没写完,则提交flushTask到reactor
eventLoop().execute(flushTask);
}
这个方法的 if 分支逻辑,我们在介绍do {.....}while()循环体 write loop 中发送逻辑时已经提过,在 write loop 循环发送数据的过程中,如果发现 Socket 缓冲区已满,无法写入数据时( localWrittenBytes <= 0),则需要向 reactor 注册 OP_WRITE 事件,等到 Socket 缓冲区变为可写状态时,epoll 会通知 reactor 线程继续写入剩下的数据。
do {
.........将待发送数据转换到JDK NIO ByteBuffer中.........
int nioBufferCnt = in.nioBufferCount();
switch (nioBufferCnt) {
case 0:
writeSpinCount -= doWrite0(in);
break;
case 1: {
.....发送单个nioBuffer....
final int localWrittenBytes = ch.write(buffer);
if (localWrittenBytes <= 0) {
incompleteWrite(true);
return;
}
.................省略..............
break;
}
default: {
.....批量发送多个nioBuffers......
final long localWrittenBytes = ch.write(nioBuffers, 0, nioBufferCnt);
if (localWrittenBytes <= 0) {
incompleteWrite(true);
return;
}
.................省略..............
break;
}
}
} while (writeSpinCount > 0);
注意 if 分支处理的情况是还没写满 16 次,但是 Socket 缓冲区已满,无法写入的情况。
而 else 分支正是处理我们这里正在讨论的情况即 Socket 缓冲区是可写的,但是已经写满 16 次,在本轮 write loop 中不能再继续写入的情况。
这时 Netty 会将 channel 中剩下的待写数据的 flush 操作封装程 flushTask,丢进 reactor 的普通任务队列中,等待 reactor 执行完其他 channel 上的 io 操作后在回过头来执行未写完的 flush 任务。
忘记 Reactor 整体运行逻辑的同学,可以在回看下笔者的这篇文章?《一文聊透Netty核心引擎Reactor的运转架构》
private final Runnable flushTask = new Runnable() {
@Override
public void run() {
((AbstractNioUnsafe) unsafe()).flush0();
}
};
这里我们看到 flushTask 中的任务是直接再次调用 flush0 继续回到发送数据的逻辑流程中。
细心的同学可能会有疑问,为什么这里不在继续注册 OP_WRITE 事件而是通过向 reactor 提交一个 flushTask 来完成 channel 中剩下数据的写入呢?
原因是这里我们讲的 else 分支是用来处理 Socket 缓冲区未满还是可写的,但是由于用户本次要发送的数据太多,导致写了 16 次还没写完的情形。
既然当前 Socket 缓冲区是可写的,我们就不能注册 OP_WRITE 事件,否则这里一直会不停地收到 epoll 的通知。因为 JDK NIO Selector 默认的是 epoll 的水平触发。
忘记水平触发和边缘触发这两种 epoll 工作模式的同学,可以在回看下笔者的这篇文章?《聊聊Netty那些事儿之从内核角度看IO模型》
所以这里只能向 reactor 提交 flushTask 来继续完成剩下数据的写入,而不能注册 OP_WRITE 事件。
注意:只有当 Socket 缓冲区已满导致无法写入时,Netty 才会去注册 OP_WRITE 事件。这和我们之前介绍的 OP_ACCEPT 事件和 OP_READ 事件的注册时机是不同的。
这里大家可能还会有另一个疑问,就是为什么在向 reactor 提交 flushTask 之前需要清理 OP_WRITE 事件呢? 我们并没有注册 OP_WRITE 事件呀~~
protected final void incompleteWrite(boolean setOpWrite) {
if (setOpWrite) {
......省略......
} else {
clearOpWrite();
eventLoop().execute(flushTask);
}
在为大家解答这个疑问之前,笔者先为大家介绍下 Netty 是如何处理 OP_WRITE 事件的,当大家明白了 OP_WRITE 事件的处理逻辑后,这个疑问就自然解开了。
7. OP_WRITE事件的处理在?《一文聊透Netty核心引擎Reactor的运转架构》一文中,我们介绍过,当 Reactor 监听到 channel 上有 IO 事件发生后,最终会在 processSelectedKey 方法中处理 channel 上的 IO 事件,其中 OP_ACCEPT 事件和 OP_READ 事件的处理过程,笔者已经在之前的系列文章中介绍过了,这里我们聚焦于 OP_WRITE 事件的处理。
public final class NioEventLoop extends SingleThreadEventLoop {
private void processSelectedKey(SelectionKey k, AbstractNioChannel ch) {
final AbstractNioChannel.NioUnsafe unsafe = ch.unsafe();
.............省略.......
try {
int readyOps = k.readyOps();
if ((readyOps & SelectionKey.OP_CONNECT) != 0) {
......处理connect事件......
}
if ((readyOps & SelectionKey.OP_WRITE) != 0) {
ch.unsafe().forceFlush();
}
if ((readyOps & (SelectionKey.OP_READ | SelectionKey.OP_ACCEPT)) != 0 || readyOps == 0) {
........处理accept和read事件.........
}
} catch (CancelledKeyException ignored) {
unsafe.close(unsafe.voidPromise());
}
}
}
这里我们看到当 OP_WRITE 事件发生后,Netty 直接调用 channel 的 forceFlush 方法。
@Override
public final void forceFlush() {
// directly call super.flush0() to force a flush now
super.flush0();
}
其实 forceFlush 方法中并没有什么特殊的逻辑,直接调用 flush0 方法再次发起 flush 操作继续 channel 中剩下数据的写入。
@Override
protected void doWrite(ChannelOutboundBuffer in) throws Exception {
SocketChannel ch = javaChannel();
int writeSpinCount = config().getWriteSpinCount();
do {
if (in.isEmpty()) {
clearOpWrite();
return;
}
.........将待发送数据转换到JDK NIO ByteBuffer中.........
int nioBufferCnt = in.nioBufferCount();
switch (nioBufferCnt) {
case 0:
......传输网络文件........
case 1: {
.....发送单个nioBuffer....
}
default: {
.....批量发送多个nioBuffers......
}
}
} while (writeSpinCount > 0);
//处理write loop结束 但数据还没写完的情况
incompleteWrite(writeSpinCount < 0);
}
注意这里的 clearOpWrite() 方法,由于 channel 上的 OP_WRITE 事件就绪,表明此时 Socket 缓冲区变为可写状态,从而 Reactor 线程再次来到了 flush 流程中。
当 ChannelOutboundBuffer 中的数据全部写完后 in.isEmpty() ,就需要清理 OP_WRITE 事件,因为此时 Socket 缓冲区是可写的,这种情况下当数据全部写完后,就需要取消对 OP_WRITE 事件的监听,否则 epoll 会不断的通知 Reactor。
同理在 incompleteWrite 方法的 else 分支也需要执行 clearOpWrite() 方法取消对 OP_WRITE 事件的监听。
protected final void incompleteWrite(boolean setOpWrite) {
if (setOpWrite) {
// 这里处理还没写满16次 但是socket缓冲区已满写不进去的情况 注册write事件
// 什么时候socket可写了, epoll会通知reactor线程继续写
setOpWrite();
} else {
// 必须清除OP_WRITE事件,此时Socket对应的缓冲区依然是可写的,只不过当前channel写够了16次,被SubReactor限制了。
// 这样SubReactor可以腾出手来处理其他channel上的IO事件。这里如果不清除OP_WRITE事件,则会一直被通知。
clearOpWrite();
//如果本次writeLoop还没写完,则提交flushTask到SubReactor
//释放SubReactor让其可以继续处理其他Channel上的IO事件
eventLoop().execute(flushTask);
}
}
8. writeAndFlush
在我们讲完了 write 事件和 flush 事件的处理过程之后,writeAndFlush 就变得很简单了,它就是把 write 和 flush 流程结合起来,先触发 write 事件然后在触发 flush 事件。
下面我们来看下 writeAndFlush 的具体逻辑处理:
public class EchoServerHandler extends ChannelInboundHandlerAdapter {
@Override
public void channelRead(final ChannelHandlerContext ctx, final Object msg) {
//此处的msg就是Netty在read loop中从NioSocketChannel中读取到ByteBuffer
ctx.writeAndFlush(msg);
}
}
abstract class AbstractChannelHandlerContext implements ChannelHandlerContext, ResourceLeakHint {
@Override
public ChannelFuture writeAndFlush(Object msg) {
return writeAndFlush(msg, newPromise());
}
@Override
public ChannelFuture writeAndFlush(Object msg, ChannelPromise promise) {
write(msg, true, promise);
return promise;
}
}
这里可以看到 writeAndFlush 方法的处理入口和 write 事件的处理入口是一样的。唯一不同的是入口处理函数 write 方法的 boolean flush 入参不同,在 writeAndFlush 的处理中 flush = true。
private void write(Object msg, boolean flush, ChannelPromise promise) {
ObjectUtil.checkNotNull(msg, "msg");
................省略检查promise的有效性...............
//flush = true 表示channelHandler中调用的是writeAndFlush方法,这里需要找到pipeline中覆盖write或者flush方法的channelHandler
//flush = false 表示调用的是write方法,只需要找到pipeline中覆盖write方法的channelHandler
final AbstractChannelHandlerContext next = findContextOutbound(flush ?
(MASK_WRITE | MASK_FLUSH) : MASK_WRITE);
//用于检查内存泄露
final Object m = pipeline.touch(msg, next);
//获取下一个要被执行的channelHandler的executor
EventExecutor executor = next.executor();
//确保OutBound事件由ChannelHandler指定的executor执行
if (executor.inEventLoop()) {
//如果当前线程正是channelHandler指定的executor则直接执行
if (flush) {
next.invokeWriteAndFlush(m, promise);
} else {
next.invokeWrite(m, promise);
}
} else {
//如果当前线程不是ChannelHandler指定的executor,则封装成异步任务提交给指定executor执行,注意这里的executor不一定是reactor线程。
final WriteTask task = WriteTask.newInstance(next, m, promise, flush);
if (!safeExecute(executor, task, promise, m, !flush)) {
task.cancel();
}
}
}
由于在 writeAndFlush 流程的处理中,flush 标志被设置为 true,所以这里有两个地方会和 write 事件的处理有所不同。
- findContextOutbound( MASK_WRITE | MASK_FLUSH ):这里在 pipeline 中向前查找的 ChanneOutboundHandler 需要实现 write 方法或者 flush 方法。这里需要注意的是 write 方法和 flush 方法只需要实现其中一个即可满足查找条件。因为一般我们自定义 ChannelOutboundHandler 时,都会继承 ChannelOutboundHandlerAdapter 类,而在 ChannelInboundHandlerAdapter 类中对于这些 outbound 事件都会有默认的实现。
public class ChannelOutboundHandlerAdapter extends ChannelHandlerAdapter implements ChannelOutboundHandler {
@Skip
@Override
public void write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise) throws Exception {
ctx.write(msg, promise);
}
@Skip
@Override
public void flush(ChannelHandlerContext ctx) throws Exception {
ctx.flush();
}
}
这样在后面传播 write 事件或者 flush 事件的时候,我们通过上面逻辑找出的 ChannelOutboundHandler 中可能只实现了一个 flush 方法或者 write 方法。不过这样没关系,如果这里在传播 outbound 事件的过程中,发现找出的 ChannelOutboundHandler 中并没有实现对应的 outbound 事件回调函数,那么就直接调用在 ChannelOutboundHandlerAdapter 中的默认实现。
- 在向前传播 writeAndFlush 事件的时候会通过调用 ChannelHandlerContext 的 invokeWriteAndFlush 方法,先传播 write 事件 然后在传播 flush 事件。
void invokeWriteAndFlush(Object msg, ChannelPromise promise) {
if (invokeHandler()) {
//向前传递write事件
invokeWrite0(msg, promise);
//向前传递flush事件
invokeFlush0();
} else {
writeAndFlush(msg, promise);
}
}
private void invokeWrite0(Object msg, ChannelPromise promise) {
try {
//调用当前ChannelHandler中的write方法
((ChannelOutboundHandler) handler()).write(this, msg, promise);
} catch (Throwable t) {
notifyOutboundHandlerException(t, promise);
}
}
private void invokeFlush0() {
try {
((ChannelOutboundHandler) handler()).flush(this);
} catch (Throwable t) {
invokeExceptionCaught(t);
}
}
这里我们看到了 writeAndFlush 的核心处理逻辑,首先向前传播 write 事件,经过 write 事件的流程处理后,最后向前传播 flush 事件。
根据前边的介绍,这里在向前传播 write 事件的时候,可能查找出的 ChannelOutboundHandler 只是实现了 flush 方法,不过没关系,这里会直接调用 write 方法在 ChannelOutboundHandlerAdapter 父类中的默认实现。同理 flush 也是一样。
总结
到这里,Netty 处理数据发送的整个完整流程,笔者就为大家详细地介绍完了,可以看到 Netty 在处理读取数据和处理发送数据的过程中,虽然核心逻辑都差不多,但是发送数据的过程明显细节比较多,而且更加复杂一些。
这里笔者将读取数据和发送数据的不同之处总结如下几点供大家回忆对比:
- 在每次 read loop 之前,会分配一个大小固定的 diretByteBuffer 用来装载读取数据。每轮 read loop 完全结束之后,才会决定是否对下一轮的读取过程分配的 directByteBuffer 进行扩容或者缩容。
- 在每次 write loop 之前,都会获取本次 write loop 最大能够写入的字节数,根据这个最大写入字节数从 ChannelOutboundBuffer 中转换 JDK NIO ByteBuffer 。每次写入 Socket 之后都需要重新评估是否对这个最大写入字节数进行扩容或者缩容。
- read loop 和 write loop 都被默认限定最多执行 16 次。
- 在一个完整的 read loop 中,如果还读取不完数据,直接退出。等到 reactor 线程执行完其他 channel 上的 IO 事件再来读取未读完的数据。
- 而在一个完整的 write loop 中,数据发送不完,则分两种情况。 Socket 缓冲区满无法在继续写入。这时就需要向 reactor 注册 OP_WRITE 事件。等 Socket 缓冲区变的可写时,epoll 通知 reactor 线程继续发送。Socket 缓冲区可写,但是由于发送数据太多,导致虽然写满 16 次但依然没有写完。这时就直接向 reactor 丢一个 flushTask 进去,等到 reactor 线程执行完其他 channel 上的 IO 事件,在回过头来执行 flushTask。
- OP_READ 事件的注册是在 NioSocketChannel 被注册到对应的 Reactor 中时就会注册。而 OP_WRITE 事件只会在 Socket 缓冲区满的时候才会被注册。当 Socket 缓冲区再次变得可写时,要记得取消 OP_WRITE 事件的监听。否则的话就会一直被通知。
好了,本文的全部内容就到这里了,我们下篇文章见~~~~