非阻塞 IO 管道可以使用单个线程来读取来自多个流的消息。 这要求流可以切换到非阻塞模式。 在非阻塞模式下,当从中读取数据时,如果流没有要读取的数据,则返回 0 字节。 当流实际上有一些要读取的数据时,返回至少 1 个字节。
为了避免检查有 0 字节的流来读取,我们使用 Selector 注册一个或多个 SelectableChannel 实例。 当在 Selector 上调用 select() 或 selectNow() 时,它只提供实际上有数据要读取的 SelectableChannel 实例。 这个设计的示意图:
读取部分消息当我们从 SelectableChannel 读取数据块时,我们不知道该数据块是否包含了一条完整的消息,可能的情况有:比一条消息少、一条完整消息、比一条消息多,如下图:
处理上述情况有两个挑战:
检测数据块中消息完整性;
在消息的其余部分到达之前,已收到的部分消息如何处理;
检测完整消息要求消息读取器查看数据块中的数据是否包含至少一个完整消息。 如果数据块包含一个或多个完整消息,则可以沿管道发送这些消息以进行处理。 这个步骤将重复很多次,因此这个过程必须尽可能快。
每当数据块中存在部分消息时,无论是单独消息还是在一个或多个完整消息之后,都需要存储该部分消息,直到该消息的其余部分到达。
检测完整消息和存储部分消息都是 Message Reader 的职责。 为区分来自不同 Channel 的消息数据,需要为每个 Channel 使用一个 Message Reader 。 设计看起来像这样:
检索具有要从选择器读取的数据的通道实例后,与该通道关联的消息读取器读取数据并尝试将其分解为消息。如果有任何完整的消息被读取,则可以将这些消息沿读取管道传递给需要处理它们的任何组件。
一个消息阅读器当然是针对特定协议的。 消息读取器需要知道它尝试读取的消息的消息格式。 如果我们的服务器实现可以跨协议重用,则需要能够插入Message Reader 实现 ---- 可能通过以某种方式接受 Message Reader 工厂作为配置参数。
存储部分消息既然我们已经确定消息阅读器负责存储部分消息,直到收到完整的消息,我们需要弄清楚应该如何实现部分消息的存储。
应该考虑两个设计考虑因素:
尽可能少地复制消息数据。 复制越多,性能越低。
将完整的消息存储在连续的字节序列中,使解析消息更容易。
每个消息读取器的缓冲区显然,部分消息需要存储在某写缓冲区中。 简单的实现是在每个 Message Reader 中内部只有一个缓冲区。 但是,缓冲区应该有多大? 它需要足够大才能存储最大允许消息。 因此,如果允许的最大消息是 1MB ,那么每个 Message Reader 中的内部缓冲区至少需要 1MB 。
当我们达到数百万个连接时,每个连接使用 1MB 并不真正起作用。 100*10000 x 1MB 仍然是 1TB 内存! 如果最大消息大小为 16MB 怎么办? 那128MB?
可调整大小的缓冲区另一个选择是实现一个可调整大小的缓冲区, 缓冲区将从较小的大小开始,如果消息对于缓冲区而言太大了,则会扩展缓冲区。 这样,每个连接不一定需要例如 1MB 缓冲区。 每个连接只占用保存下一条消息所需的内存。
有几种方法可以实现可调整大小的缓冲区。 所有这些都有优点和缺点,稍后会讨论它们。
1.通过复制消息调整大小实现可调整大小的缓冲区的第一种方法是从一个小的缓冲区开始,例如, 4KB。 如果消息不能大于 4KB,则可以使用更大的缓冲区。 例如分配 8KB,并将来自 4KB 缓冲区的数据复制到更大的缓冲区中。
逐个复制缓冲区实现的优点是消息的所有数据都保存在一个连续的字节数组中。 这使得解析消息变得更加容易。逐个复制缓冲区实现的缺点是它会导致大量数据复制。
为了减少数据复制,可以分析流经系统的消息大小,以找到一些可以减少复制量的缓冲区大小。
例如,大多数消息是少于 4KB ,因为它们只包含非常小的请求/响应。 这意味着第一个缓冲区大小应为 4KB。然后如果消息大于 4KB,通常是因为它包含一个文件,流经系统的大多数文件都少于128KB,我们可以使第二个缓冲区大小为 128KB。最后,一旦消息高于 128KB,消息的大小就没有规律了,最终的缓冲区大小就是最大的消息大小。