Netty编码流程及WriteAndFlush()的实现 (3)

调用jdk底层的api,进行自旋写

// todo 最终传递到 这里 @Override public final void flush() { assertEventLoop(); ChannelOutboundBuffer outboundBuffer = this.outboundBuffer; if (outboundBuffer == null) { return; } // todo 添加刷新标志, 设置写状态 outboundBuffer.addFlush(); // todo 遍历buffer队列, 过滤byteBuf flush0(); } 添加刷新标志,设置写状态

什么是添加刷新标志呢? 其实就是更改链表中的指针位置,三个指针之间的可以完美的把entry划分出曾经flush过的和未flush节点

ok,继续

下面看一下如何设置状态,addflush() 源码如下:

* todo 给 ChannelOutboundBuffer 添加缓存, 这意味着, 原来添加进 ChannelOutboundBuffer 中的所有 Entry, 全部会被标记为 flushed 过 */ public void addFlush() { // todo 默认让 entry 指向了 unflushedEntry ==> 其实链表中的最左边的 未被使用过的 entry // todo Entry entry = unflushedEntry; if (entry != null) { if (flushedEntry == null) { // there is no flushedEntry yet, so start with the entry flushedEntry = entry; } do { flushed ++; if (!entry.promise.setUncancellable()) { // Was cancelled so make sure we free up memory and notify about the freed bytes int pending = entry.cancel(); // todo 跟进这个方法 decrementPendingOutboundBytes(pending, false, true); } entry = entry.next; } while (entry != null); // All flushed so reset unflushedEntry unflushedEntry = null; } }

目标是移动指针,改变每一个节点的状态, 哪一个指针呢? 是 flushedEntry, 它指向读被flush的节点,也就是说,它左边的,都被处理过了

下面的代码,是选出一开始位置, 因为, 如果flushedEntry == null,说明没有任何一个曾经被flush过的节点,于是就将开始的位置定位到最左边开始,

if (flushedEntry == null) { // there is no flushedEntry yet, so start with the entry flushedEntry = entry; }

紧接着一个do-while循环,从最后一个被flushedEntry的地方,到尾部,挨个遍历每一个节点, 因为这些节点要被flush进缓存,我们需要把write时累加的他们的容量减掉, 源码如下

private void decrementPendingOutboundBytes(long size, boolean invokeLater, boolean notifyWritability) { if (size == 0) { return; } // todo 每次 减去 -size long newWriteBufferSize = TOTAL_PENDING_SIZE_UPDATER.addAndGet(this, -size); // todo 默认 getWriteBufferLowWaterMark() -32kb // todo newWriteBufferSize<32 就把不可写状态改为可写状态 if (notifyWritability && newWriteBufferSize < channel.config().getWriteBufferLowWaterMark()) { setWritable(invokeLater); } }

同样是使用原子类做到的这件事, 此外,经过减少的容量,如果小于了32kb就会传播 channel可写的事件

遍历buffer队列, 过滤byteBuf

这是flush的重头戏,它实现了将数据写入socket的操作

我们跟进它的源码,doWrite(ChannelOutboundBuffer in) 这是本类AbstractChannel的抽象方法, 写如的逻辑方法,被设计成抽象的,具体往那个channel写,和具体的实现有关, 当前我们想往客户端写, 它的实现是AbstractNioByteChannel,我们进入它的实现,源码如下

boolean setOpWrite = false; // todo 整体是无限循环, 过滤ByteBuf for (;;) { // todo 获取第一个 flushedEntity, 这个entity中 有我们需要的 byteBuf Object msg = in.current(); if (msg == null) { // Wrote all messages. clearOpWrite(); // Directly return here so incompleteWrite(...) is not called. return; } if (msg instanceof ByteBuf) { // todo 第三部分,jdk底层, 进行自旋的写 ByteBuf buf = (ByteBuf) msg; int readableBytes = buf.readableBytes(); if (readableBytes == 0) { // todo 当前的 ByteBuf 中,没有可写的, 直接remove掉 in.remove(); continue; } boolean done = false; long flushedAmount = 0; if (writeSpinCount == -1) { // todo 获取自旋锁, netty使用它进行 writeSpinCount = config().getWriteSpinCount(); } // todo 这个for循环是在自旋尝试往 jdk底层的 ByteBuf写入数据 for (int i = writeSpinCount - 1; i >= 0; i --) { // todo 把 对应的 buf , 写到socket中 // todo localFlushedAmount就是 本次 往jdk底层的 ByteBuffer 中写入了多少字节 int localFlushedAmount = doWriteBytes(buf); if (localFlushedAmount == 0) { setOpWrite = true; break; } // todo 累加一共写了多少字节 flushedAmount += localFlushedAmount; // todo 如果buf中的数据全部写完了, 设置完成的状态, 退出循环 if (!buf.isReadable()) { done = true; break; } } in.progress(flushedAmount); // todo 自旋结束,写完了 done = true if (done) { // todo 跟进去 in.remove(); } else { // Break the loop and so incompleteWrite(...) is called. break; } ....

这一段代码也是非常长, 它的主要逻辑如下:

内容版权声明:除非注明,否则皆为本站原创文章。

转载注明出处:https://www.heiqu.com/zwsfwz.html