客户端channel pipeline结构.png
而 pipeline 这样一个双向链表数据结构中的类型正是 ChannelHandlerContext ,由 ChannelHandlerContext 包裹我们自定义的 IO 处理逻辑 ChannelHandler。
ChannelHandler 并不需要感知到它所处的 pipeline 中的上下文信息,只需要专心处理好 IO 逻辑即可,关于 pipeline 的上下文信息全部封装在 ChannelHandlerContext中。
ChannelHandler 在 Netty 中的作用只是负责处理 IO 逻辑,比如编码,解码。它并不会感知到它在 pipeline 中的位置,更不会感知和它相邻的两个 ChannelHandler。事实上 ChannelHandler也并不需要去关心这些,它唯一需要关注的就是处理所关心的异步事件
而 ChannelHandlerContext 中维护了 pipeline 这个双向链表中的 pre 以及 next 指针,这样可以方便的找到与其相邻的 ChannelHandler ,并可以过滤出一些符合执行条件的 ChannelHandler。正如它的命名一样, ChannelHandlerContext 正是起到了维护 ChannelHandler 上下文的一个作用。而 Netty 中的异步事件在 pipeline 中的传播靠的就是这个 ChannelHandlerContext 。
2. write事件的传播这样设计就使得 ChannelHandlerContext 和 ChannelHandler 的职责单一,各司其职,具有高度的可扩展性。
我们无论是在业务线程或者是在 SubReactor 线程中完成业务处理后,都需要通过 channelHandlerContext 的引用将 write事件在 pipeline 中进行传播。然后在 pipeline 中相应的 ChannelHandler 中监听 write 事件从而可以对 write事件进行自定义编排处理(比如我们常用的编码器),最终传播到 HeadContext 中执行发送数据的逻辑操作。
前边也提到 Netty 中有两个触发 write 事件传播的方法,它们的传播处理逻辑都是一样的,只不过它们在 pipeline 中的传播起点是不同的。
- channelHandlerContext.write() 方法会从当前 ChannelHandler 开始在 pipeline 中向前传播 write 事件直到 HeadContext。
- channelHandlerContext.channel().write() 方法则会从 pipeline 的尾结点 TailContext 开始在 pipeline 中向前传播 write 事件直到 HeadContext 。
客户端channel pipeline结构.png
在我们清楚了 write 事件的总体传播流程后,接下来就来看看在 write 事件传播的过程中Netty为我们作了些什么?这里我们以 channelHandlerContext.write() 方法为例说明。
3. write方法发送数据write事件传播流程.png
abstract class AbstractChannelHandlerContext implements ChannelHandlerContext, ResourceLeakHint {
@Override
public ChannelFuture write(Object msg) {
return write(msg, newPromise());
}
@Override
public ChannelFuture write(final Object msg, final ChannelPromise promise) {
write(msg, false, promise);
return promise;
}
}
这里我们看到 Netty 的写操作是一个异步操作,当我们在业务线程中调用 channelHandlerContext.write() 后,Netty 会给我们返回一个 ChannelFuture,我们可以在这个 ChannelFutrue 中添加 ChannelFutureListener ,这样当 Netty 将我们要发送的数据发送到底层 Socket 中时,Netty 会通过 ChannelFutureListener 通知我们写入结果。
@Override
public void channelRead(final ChannelHandlerContext ctx, final Object msg) {
//此处的msg就是Netty在read loop中从NioSocketChannel中读取到的ByteBuffer
ChannelFuture future = ctx.write(msg);
future.addListener(new ChannelFutureListener() {
@Override
public void operationComplete(ChannelFuture future) throws Exception {
Throwable cause = future.cause();
if (cause != null) {
处理异常情况
} else {
写入Socket成功后,Netty会通知到这里
}
}
});
}
当异步事件在 pipeline 传播的过程中发生异常时,异步事件就会停止在 pipeline 中传播。所以我们在日常开发中,需要对写操作异常情况进行处理。
- 其中 inbound 类异步事件发生异常时,会触发exceptionCaught事件传播。 exceptionCaught 事件本身也是一种 inbound 事件,传播方向会从当前发生异常的 ChannelHandler 开始一直向后传播直到 TailContext。
- 而 outbound 类异步事件发生异常时,则不会触发exceptionCaught事件传播。一般只是通知相关 ChannelFuture。但如果是 flush 事件在传播过程中发生异常,则会触发当前发生异常的 ChannelHandler 中 exceptionCaught 事件回调。
我们继续回归到写操作的主线上来~~~
private void write(Object msg, boolean flush, ChannelPromise promise) {
ObjectUtil.checkNotNull(msg, "msg");
................省略检查promise的有效性...............
//flush = true 表示channelHandler中调用的是writeAndFlush方法,这里需要找到pipeline中覆盖write或者flush方法的channelHandler
//flush = false 表示调用的是write方法,只需要找到pipeline中覆盖write方法的channelHandler
final AbstractChannelHandlerContext next = findContextOutbound(flush ?
(MASK_WRITE | MASK_FLUSH) : MASK_WRITE);
//用于检查内存泄露
final Object m = pipeline.touch(msg, next);
//获取pipeline中下一个要被执行的channelHandler的executor
EventExecutor executor = next.executor();
//确保OutBound事件由ChannelHandler指定的executor执行
if (executor.inEventLoop()) {
//如果当前线程正是channelHandler指定的executor则直接执行
if (flush) {
next.invokeWriteAndFlush(m, promise);
} else {
next.invokeWrite(m, promise);
}
} else {
//如果当前线程不是ChannelHandler指定的executor,则封装成异步任务提交给指定executor执行,注意这里的executor不一定是reactor线程。
final WriteTask task = WriteTask.newInstance(next, m, promise, flush);
if (!safeExecute(executor, task, promise, m, !flush)) {
task.cancel();
}
}
}
write 事件要向前在 pipeline 中传播,就需要在 pipeline 上找到下一个具有执行资格的 ChannelHandler,因为位于当前 ChannelHandler 前边的可能是 ChannelInboundHandler 类型的也可能是 ChannelOutboundHandler 类型的 ChannelHandler ,或者有可能压根就不关心 write 事件的 ChannelHandler(没有实现write回调方法)。
write事件的传播.png
这里我们就需要通过 findContextOutbound 方法在当前 ChannelHandler 的前边找到 ChannelOutboundHandler 类型并且覆盖实现 write 回调方法的 ChannelHandler 作为下一个要执行的对象。
3.1 findContextOutbound private AbstractChannelHandlerContext findContextOutbound(int mask) {
AbstractChannelHandlerContext ctx = this;
//获取当前ChannelHandler的executor
EventExecutor currentExecutor = executor();
do {
//获取前一个ChannelHandler
ctx = ctx.prev;
} while (skipContext(ctx, currentExecutor, mask, MASK_ONLY_OUTBOUND));
return ctx;
}
//判断前一个ChannelHandler是否具有响应Write事件的资格
private static boolean skipContext(
AbstractChannelHandlerContext ctx, EventExecutor currentExecutor, int mask, int onlyMask) {
return (ctx.executionMask & (onlyMask | mask)) == 0 ||
(ctx.executor() == currentExecutor && (ctx.executionMask & mask) == 0);
}
findContextOutbound 方法接收的参数是一个掩码,这个掩码表示要向前查找具有什么样执行资格的 ChannelHandler。因为我们这里调用的是 ChannelHandlerContext 的 write 方法所以 flush = false,传递进来的掩码为 MASK_WRITE,表示我们要向前查找覆盖实现了 write 回调方法的 ChannelOutboundHandler。
3.1.1 掩码的巧妙应用Netty 中将 ChannelHandler 覆盖实现的一些异步事件回调方法用 int 型的掩码来表示,这样我们就可以通过这个掩码来判断当前 ChannelHandler 具有什么样的执行资格。
final class ChannelHandlerMask {
....................省略......................
static final int MASK_CHANNEL_ACTIVE = 1 << 3;
static final int MASK_CHANNEL_READ = 1 << 5;
static final int MASK_CHANNEL_READ_COMPLETE = 1 << 6;
static final int MASK_WRITE = 1 << 15;
static final int MASK_FLUSH = 1 << 16;
//outbound事件掩码集合
static final int MASK_ONLY_OUTBOUND = MASK_BIND | MASK_CONNECT | MASK_DISCONNECT |
MASK_CLOSE | MASK_DEREGISTER | MASK_READ | MASK_WRITE | MASK_FLUSH;
....................省略......................
}
在 ChannelHandler 被添加进 pipeline 的时候,Netty 会根据当前 ChannelHandler 的类型以及其覆盖实现的异步事件回调方法,通过 | 运算 向 ChannelHandlerContext#executionMask 字段添加该 ChannelHandler 的执行资格。
abstract class AbstractChannelHandlerContext implements ChannelHandlerContext, ResourceLeakHint {
//ChannelHandler执行资格掩码
private final int executionMask;
....................省略......................
}
类似的掩码用法其实我们在前边的文章?《一文聊透Netty核心引擎Reactor的运转架构》中也提到过,在 Channel 向对应的 Reactor 注册自己感兴趣的 IO 事件时,也是用到了一个 int 型的掩码 interestOps 来表示 Channel 感兴趣的 IO 事件集合。
@Override
protected void doBeginRead() throws Exception {
final SelectionKey selectionKey = this.selectionKey;
if (!selectionKey.isValid()) {
return;
}
readPending = true;
final int interestOps = selectionKey.interestOps();
/**
* 1:ServerSocketChannel 初始化时 readInterestOp设置的是OP_ACCEPT事件
* 2:SocketChannel 初始化时 readInterestOp设置的是OP_READ事件
* */
if ((interestOps & readInterestOp) == 0) {
//注册监听OP_ACCEPT或者OP_READ事件
selectionKey.interestOps(interestOps | readInterestOp);
}
}
- 用 & 操作判断,某个事件是否在事件集合中:(readyOps & SelectionKey.OP_CONNECT) != 0
- 用 | 操作向事件集合中添加事件:interestOps | readInterestOp
- 从事件集合中删除某个事件,是通过先将要删除事件取反 ~ ,然后在和事件集合做 & 操作:ops &= ~SelectionKey.OP_CONNECT
3.1.2 向前查找具有执行资格的ChannelOutboundHandler这部分内容笔者会在下篇文章全面介绍 pipeline 的时候详细讲解,大家这里只需要知道这里的掩码就是表示一个执行资格的集合。当前 ChannelHandler 的执行资格存放在它的 ChannelHandlerContext 中的 executionMask 字段中。
private AbstractChannelHandlerContext findContextOutbound(int mask) {
//当前ChannelHandler
AbstractChannelHandlerContext ctx = this;
//获取当前ChannelHandler的executor
EventExecutor currentExecutor = executor();
do {
//获取前一个ChannelHandler
ctx = ctx.prev;
} while (skipContext(ctx, currentExecutor, mask, MASK_ONLY_OUTBOUND));
return ctx;
}
//判断前一个ChannelHandler是否具有响应Write事件的资格
private static boolean skipContext(
AbstractChannelHandlerContext ctx, EventExecutor currentExecutor, int mask, int onlyMask) {
return (ctx.executionMask & (onlyMask | mask)) == 0 ||
(ctx.executor() == currentExecutor && (ctx.executionMask & mask) == 0);
}
前边我们提到 ChannelHandlerContext 不仅封装了 ChannelHandler 的执行资格掩码还可以感知到当前 ChannelHandler 在 pipeline 中的位置,因为 ChannelHandlerContext 中维护了前驱指针 prev 以及后驱指针 next。
这里我们需要在 pipeline 中传播 write 事件,它是一种 outbound 事件,所以需要向前传播,这里通过 ChannelHandlerContext 的前驱指针 prev 拿到当前 ChannelHandler 在 pipeline 中的前一个节点。
ctx = ctx.prev;
通过 skipContext 方法判断前驱节点是否具有执行的资格。如果没有执行资格则跳过继续向前查找。如果具有执行资格则返回并响应 write 事件。
在 write 事件传播场景中,执行资格指的是前驱 ChannelHandler 是否是ChannelOutboundHandler 类型的,并且它是否覆盖实现了 write 事件回调方法。
public class EchoChannelHandler extends ChannelOutboundHandlerAdapter {
@Override
public void write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise) throws Exception {
super.write(ctx, msg, promise);
}
}
3.1.3 skipContext
该方法主要用来判断当前 ChannelHandler 的前驱节点是否具有 mask 掩码中包含的事件响应资格。
方法参数中有两个比较重要的掩码:
- int onlyMask:用来指定当前 ChannelHandler 需要符合的类型。其中MASK_ONLY_OUTBOUND 为 ChannelOutboundHandler 类型的掩码, MASK_ONLY_INBOUND 为 ChannelInboundHandler 类型的掩码。
final class ChannelHandlerMask {
//outbound事件的掩码集合
static final int MASK_ONLY_OUTBOUND = MASK_BIND | MASK_CONNECT | MASK_DISCONNECT |
MASK_CLOSE | MASK_DEREGISTER | MASK_READ | MASK_WRITE | MASK_FLUSH;
//inbound事件的掩码集合
static final int MASK_ONLY_INBOUND = MASK_CHANNEL_REGISTERED |
MASK_CHANNEL_UNREGISTERED | MASK_CHANNEL_ACTIVE | MASK_CHANNEL_INACTIVE | MASK_CHANNEL_READ |
MASK_CHANNEL_READ_COMPLETE | MASK_USER_EVENT_TRIGGERED | MASK_CHANNEL_WRITABILITY_CHANGED;
}
比如本小节中我们是在介绍 write 事件的传播,那么就需要在当前ChannelHandler 前边首先是找到一个 ChannelOutboundHandler 类型的ChannelHandler。
ctx.executionMask & (onlyMask | mask)) == 0 用于判断前一个 ChannelHandler 是否为我们指定的 ChannelHandler 类型,在本小节中我们指定的是 onluMask = MASK_ONLY_OUTBOUND 即 ChannelOutboundHandler 类型。如果不是,这里就会直接跳过,继续在 pipeline 中向前查找。
- int mask:用于指定前一个 ChannelHandler 需要实现的相关异步事件处理回调。在本小节中这里指定的是 MASK_WRITE ,即需要实现 write 回调方法。通过 (ctx.executionMask & mask) == 0 条件来判断前一个ChannelHandler 是否实现了 write 回调,如果没有实现这里就跳过,继续在 pipeline 中向前查找。
3.1.4 向前传播write事件关于 skipContext 方法的详细介绍,笔者还会在下篇文章全面介绍 pipeline的时候再次进行介绍,这里大家只需要明白该方法的核心逻辑即可。
通过 findContextOutbound 方法我们在 pipeline 中找到了下一个具有执行资格的 ChannelHandler,这里指的是下一个 ChannelOutboundHandler 类型并且覆盖实现了 write 方法的 ChannelHandler。
Netty 紧接着会调用这个 nextChannelHandler 的 write 方法实现 write 事件在 pipeline 中的传播。
//获取下一个要被执行的channelHandler指定的executor
EventExecutor executor = next.executor();
//确保outbound事件的执行 是由 channelHandler指定的executor执行的
if (executor.inEventLoop()) {
//如果当前线程是指定的executor 则直接操作
if (flush) {
next.invokeWriteAndFlush(m, promise);
} else {
next.invokeWrite(m, promise);
}
} else {
//如果当前线程不是channelHandler指定的executor,则封装程异步任务 提交给指定的executor执行
final WriteTask task = WriteTask.newInstance(next, m, promise, flush);
if (!safeExecute(executor, task, promise, m, !flush)) {
task.cancel();
}
}
在我们向 pipeline 添加 ChannelHandler 的时候可以通过ChannelPipeline#addLast(EventExecutorGroup,ChannelHandler......) 方法指定执行该 ChannelHandler 的executor。如果不特殊指定,那么执行该 ChannelHandler 的executor默认为该 Channel 绑定的 Reactor 线程。
执行 ChannelHandler 中异步事件回调方法的线程必须是 ChannelHandler 指定的executor。
所以这里首先我们需要获取在 findContextOutbound 方法查找出来的下一个符合执行条件的 ChannelHandler 指定的executor。
EventExecutor executor = next.executor()
并通过 executor.inEventLoop() 方法判断当前线程是否是该 ChannelHandler 指定的 executor。
如果是,那么我们直接在当前线程中执行 ChannelHandler 中的 write 方法。
如果不是,我们就需要将 ChannelHandler 对 write 事件的回调操作封装成异步任务 WriteTask 并提交给 ChannelHandler 指定的 executor 中,由 executor 负责执行。
这里需要注意的是这个 executor 并不一定是 channel 绑定的 reactor 线程。它可以是我们自定义的线程池,不过需要我们通过 ChannelPipeline#addLast 方法进行指定,如果我们不指定,默认情况下执行 ChannelHandler 的 executor 才是 channel 绑定的 reactor 线程。
这里Netty需要确保 outbound 事件是由 channelHandler 指定的 executor 执行的。
这里有些同学可能会有疑问,如果我们向pipieline添加ChannelHandler的时候,为每个ChannelHandler指定不同的executor时,Netty如果确保线程安全呢??
大家还记得pipeline中的结构吗?