netty文件传输,netty传输二进制文件

首页 > 实用技巧 > 作者:YD1662024-02-21 13:35:28

客户端channel pipeline结构.png

outbound 事件在 pipeline 中的传播最终会传播到 HeadContext 中,之前的系列文章我们提到过,HeadContext 中封装了 Channel 的 Unsafe 类负责 Channel 底层的 IO 操作。而 HeadContext 指定的 executor 正是对应 channel 绑定的 reactor 线程。

netty文件传输,netty传输二进制文件(9)

image.png

所以最终在 netty 内核中执行写操作的线程一定是 reactor 线程从而保证了线程安全性。

忘记这段内容的同学可以在回顾下?《Reactor在Netty中的实现(创建篇)》,类似的套路我们在介绍 NioServerSocketChannel 进行 bind 绑定以及 register 注册的时候都介绍过,只不过这里将 executor 扩展到了自定义线程池的范围。

3.1.5 触发nextChannelHandler的write方法回调

netty文件传输,netty传输二进制文件(10)

write事件的传播1.png

//如果当前线程是指定的executor 则直接操作 if (flush) { next.invokeWriteAndFlush(m, promise); } else { next.invokeWrite(m, promise); }

由于我们在示例 ChannelHandler 中调用的是 ChannelHandlerContext#write 方法,所以这里的 flush = false 。触发调用 nextChannelHandler 的 write 方法。

void invokeWrite(Object msg, ChannelPromise promise) { if (invokeHandler()) { invokeWrite0(msg, promise); } else { // 当前channelHandler虽然添加到pipeline中,但是并没有调用handlerAdded // 所以不能调用当前channelHandler中的回调方法,只能继续向前传递write事件 write(msg, promise); } }

这里首先需要通过 invokeHandler() 方法判断这个 nextChannelHandler 中的 handlerAdded 方法是否被回调过。因为 ChannelHandler 只有被正确的添加到对应的 ChannelHandlerContext 中并且准备好处理异步事件时, ChannelHandler#handlerAdded 方法才会被回调。

这一部分内容笔者会在下一篇文章中详细为大家介绍,这里大家只需要了解调用 invokeHandler() 方法的目的就是为了确定 ChannelHandler 是否被正确的初始化。

private boolean invokeHandler() { // Store in local variable to reduce volatile reads. int handlerState = this.handlerState; return handlerState == ADD_COMPLETE || (!ordered && handlerState == ADD_PENDING); }

只有触发了 handlerAdded 回调,ChannelHandler 的状态才能变成 ADD_COMPLETE 。

如果 invokeHandler() 方法返回 false,那么我们就需要跳过这个nextChannelHandler,并调用 ChannelHandlerContext#write 方法继续向前传播 write 事件。

@Override public ChannelFuture write(final Object msg, final ChannelPromise promise) { //继续向前传播write事件,回到流程起点 write(msg, false, promise); return promise; }

如果 invokeHandler() 返回 true ,说明这个 nextChannelHandler 已经在 pipeline 中被正确的初始化了,Netty 直接调用这个 ChannelHandler 的 write 方法,这样就实现了 write 事件从当前 ChannelHandler 传播到了nextChannelHandler。

private void invokeWrite0(Object msg, ChannelPromise promise) { try { //调用当前ChannelHandler中的write方法 ((ChannelOutboundHandler) handler()).write(this, msg, promise); } catch (Throwable t) { notifyOutboundHandlerException(t, promise); } }

这里我们看到在 write 事件的传播过程中如果发生异常,那么 write 事件就会停止在 pipeline 中传播,并通知注册的 ChannelFutureListener。

netty文件传输,netty传输二进制文件(11)

客户端channel pipeline结构.png

从本文示例的 pipeline 结构中我们可以看到,当在 EchoServerHandler 调用 ChannelHandlerContext#write 方法后,write 事件会在 pipeline 中向前传播到 HeadContext 中,而在 HeadContext 中才是 Netty 真正处理 write 事件的地方。

3.2 HeadContext

final class HeadContext extends AbstractChannelHandlerContext implements ChannelOutboundHandler, ChannelInboundHandler { @Override public void write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise) { unsafe.write(msg, promise); } }

write 事件最终会在 pipeline 中传播到 HeadContext 里并回调 HeadContext 的 write 方法。并在 write 回调中调用 channel 的 unsafe 类执行底层的 write 操作。这里正是 write 事件在 pipeline 中的传播终点。

netty文件传输,netty传输二进制文件(12)

上一页12345下一页

栏目热文

文档排行

本站推荐

Copyright © 2018 - 2021 www.yd166.com., All Rights Reserved.