Netty编码流程及WriteAndFlush()的实现 (2)

当我们进入查看他的实现时,idea会提示,它的子类重写了这个方法, 是谁重写的呢? 是AbstractNioByteChannel 这个类其实是属于客户端阵营的类,和服务端的AbstractNioMessageChannel相提并论

源码如下:

protected final Object filterOutboundMessage(Object msg) { if (msg instanceof ByteBuf) { ByteBuf buf = (ByteBuf) msg; if (buf.isDirect()) { return msg; } return newDirectBuffer(buf); } if (msg instanceof FileRegion) { return msg; } throw new UnsupportedOperationException( "unsupported message type: " + StringUtil.simpleClassName(msg) + EXPECTED_TYPES); }

第二件事: 将转换后的DirectBuffer插入到写队列中

什么是写队列 ? 作用是啥?

它其实就是一个netty自定义的容器,使用的单向链表的结构,为什么要有这个容器呢? 回想一下,服务端需要向客户端发送消息,消息进而被封装进ByteBuf,但是呢, 往客户端写的方法有两个

write()

writeAndFlush()

这个方法的区别是有的,前者只是进行了写,(写到了ByteBuf) 却没有将内容刷新到ByteBuffer,没有刷新到缓存中,就没办法进一步把它写入jdk原生的ByteBuffer中, 而writeAndFlush()就比较方便,先把msg写入ByteBuf,然后直接刷进socket,一套带走,打完收工

但是如果客户端偏偏就是不使用writeAndFlush(),而使用前者,那么盛放消息的ByteBuf被传递到handler的最开始的位置,怎么办? unsafe也无法把它写给客户端, 难道丢弃不成?

于是写队列就解决了这个问题,它以链表当做数据结构,新传播过来的ByteBuf就会被他封装成一个一个的节点(entry)进行维护,为了区分这个链表中,哪个节点是被使用过的,哪个节点是没有使用过的,他就用三个标记指针进行标记,如下:

flushedEntry 被刷新过的entry

tailEntry 尾节点

unflushedEntry 未被刷的entry

下面我们看一下,它如何将一个新的节点,添加到写队列

addMessage(Object msg, int size, ChannelPromise promise) 添加写队列 public void addMessage(Object msg, int size, ChannelPromise promise) { // todo 将上面的三者封装成实体 // todo 调用工厂方法, 创建 Entry , 在 当前的ChannelOutboundBuffer 中每一个单位都是一个 Entry, 用它进一步包装 msg Entry entry = Entry.newInstance(msg, size, total(msg), promise); // todo 调整三个指针, 去上面查看这三个指针的定义 if (tailEntry == null) { flushedEntry = null; tailEntry = entry; } else { Entry tail = tailEntry; tail.next = entry; tailEntry = entry; } if (unflushedEntry == null) { unflushedEntry = entry; } // increment pending bytes after adding message to the unflushed arrays. // See https://github.com/netty/netty/issues/1619 // todo 跟进这个方法 incrementPendingOutboundBytes(entry.pendingSize, false); }

看他的源码,其实就是简单的针对链表进行插入的操作,尾插入法, 一直往最后的位置插入,链表的头被标记成unflushedEntry 这两个节点之间entry,表示是可以被flush的节点

在每次添加新的 节点后都调用incrementPendingOutboundBytes(entry.pendingSize, false)方法, 这个方法的作用是设置写状态, 设置怎样的状态呢? 我们看它的源码, 可以看到,它会记录下累计的ByteBuf的容量,一旦超出了阈值,就会传播channel不可写的事件

这也是write()的第三件事

private void incrementPendingOutboundBytes(long size, boolean invokeLater) { if (size == 0) { return; } // todo TOTAL_PENDING_SIZE_UPDATER 当前缓存中 存在的代写的 字节 // todo 累加 long newWriteBufferSize = TOTAL_PENDING_SIZE_UPDATER.addAndGet(this, size); // todo 判断 新的将被写的 buffer的容量不能超过 getWriteBufferHighWaterMark() 默认是 64*1024 64字节 if (newWriteBufferSize > channel.config().getWriteBufferHighWaterMark()) { // todo 超过64 字节,进入这个方法 setUnwritable(invokeLater); } } 小结:

到目前为止,第一波write()事件已经完成了,我们可以看到了,这个事件的功能就是使用ChannelOutBoundBuf将write事件传播过去的单个ByteBuf维护起来,等待 flush事件的传播

第二波事件传递 flush()

我们重新回到,AbstractChannel中,看他的第二波flush事件的传播状态, 源码如下:它也是主要做了下面的三件事

添加刷新标志,设置写状态

遍历buffer队列,过滤可以flush的buffer

内容版权声明:除非注明,否则皆为本站原创文章。

转载注明出处:https://www.heiqu.com/zwsfwz.html