System.IO.Pipelines: .NET上高性能IO
System.IO.Pipelines是一个新的库,旨在简化在.NET中执行高性能IO的过程。它是一个依赖.NET Standard的库,适用于所有.NET实现。
Pipelines诞生于.NET Core团队,为使Kestrel成为业界最快的Web服务器之一。最初从作为Kestrel内部的实现细节发展成为可重用的API,它在.Net Core 2.1中作为可用于所有.NET开发人员的最高级BCL API(System.IO.Pipelines)提供。
它解决了什么问题?为了正确解析Stream或Socket中的数据,代码有固定的样板,并且有许多极端情况,为了处理他们,不得不编写难以维护的复杂代码。
实现高性能和正确性,同时也难以处理这种复杂性。Pipelines旨在解决这种复杂性。
让我们从一个简单的问题开始吧。我们想编写一个TCP服务器,它接收来自客户端的用行分隔的消息(由\n分隔)。(译者注:即一行为一条消息)
使用NetworkStream的TCP服务器声明:与所有对性能敏感的工作一样,应在应用程序中测量每个方案的实际情况。根据您的网络应用程序需要处理的规模,可能不需要在乎的各种技术的开销。
在Pipelines之前用.NET编写的典型代码如下所示:
async Task ProcessLinesAsync(NetworkStream stream) { var buffer = new byte[1024]; await stream.ReadAsync(buffer, 0, buffer.Length); // 在buffer中处理一行消息 ProcessLine(buffer); }此代码可能在本地测试时正确工作,但它有几个潜在错误:
一次ReadAsync调用可能没有收到整个消息(行尾)。
它忽略了stream.ReadAsync()返回值中实际填充到buffer中的数据量。(译者注:即不一定将buffer填充满)
一次ReadAsync调用不能处理多条消息。
这些是读取流数据时常见的一些缺陷。为了解决这个问题,我们需要做一些改变:
我们需要缓冲传入的数据,直到找到新的行。
我们需要解析缓冲区中返回的所有行
async Task ProcessLinesAsync(NetworkStream stream) { var buffer = new byte[1024]; var bytesBuffered = 0; var bytesConsumed = 0; while (true) { var bytesRead = await stream.ReadAsync(buffer, bytesBuffered, buffer.Length - bytesBuffered); if (bytesRead == 0) { // EOF 已经到末尾 break; } // 跟踪已缓冲的字节数 bytesBuffered += bytesRead; var linePosition = -1; do { // 在缓冲数据中查找找一个行末尾 linePosition = Array.IndexOf(buffer, (byte)'\n', bytesConsumed, bytesBuffered - bytesConsumed); if (linePosition >= 0) { // 根据偏移量计算一行的长度 var lineLength = linePosition - bytesConsumed; // 处理这一行 ProcessLine(buffer, bytesConsumed, lineLength); // 移动bytesConsumed为了跳过我们已经处理掉的行 (包括\n) bytesConsumed += lineLength + 1; } } while (linePosition >= 0); } }这一次,这可能适用于本地开发,但一行可能大于1KiB(1024字节)。我们需要调整输入缓冲区的大小,直到找到新行。
因此,我们可以在堆上分配缓冲区去处理更长的一行。我们从客户端解析较长的一行时,可以通过使用ArrayPool<byte>避免重复分配缓冲区来改进这一点。
async Task ProcessLinesAsync(NetworkStream stream) { byte[] buffer = ArrayPool<byte>.Shared.Rent(1024); var bytesBuffered = 0; var bytesConsumed = 0; while (true) { // 在buffer中计算中剩余的字节数 var bytesRemaining = buffer.Length - bytesBuffered; if (bytesRemaining == 0) { // 将buffer size翻倍 并且将之前缓冲的数据复制到新的缓冲区 var newBuffer = ArrayPool<byte>.Shared.Rent(buffer.Length * 2); Buffer.BlockCopy(buffer, 0, newBuffer, 0, buffer.Length); // 将旧的buffer丢回池中 ArrayPool<byte>.Shared.Return(buffer); buffer = newBuffer; bytesRemaining = buffer.Length - bytesBuffered; } var bytesRead = await stream.ReadAsync(buffer, bytesBuffered, bytesRemaining); if (bytesRead == 0) { // EOF 末尾 break; } // 跟踪已缓冲的字节数 bytesBuffered += bytesRead; do { // 在缓冲数据中查找找一个行末尾 linePosition = Array.IndexOf(buffer, (byte)'\n', bytesConsumed, bytesBuffered - bytesConsumed); if (linePosition >= 0) { // 根据偏移量计算一行的长度 var lineLength = linePosition - bytesConsumed; // 处理这一行 ProcessLine(buffer, bytesConsumed, lineLength); // 移动bytesConsumed为了跳过我们已经处理掉的行 (包括\n) bytesConsumed += lineLength + 1; } } while (linePosition >= 0); } }这段代码有效,但现在我们正在重新调整缓冲区大小,从而产生更多缓冲区副本。它将使用更多内存,因为根据代码在处理一行行后不会缩缓冲区的大小。为避免这种情况,我们可以存储缓冲区序列,而不是每次超过1KiB大小时调整大小。
此外,我们不会增长1KiB的 缓冲区,直到它完全为空。这意味着我们最终传递给ReadAsync越来越小的缓冲区,这将导致对操作系统的更多调用。
为了缓解这种情况,我们将在现有缓冲区中剩余少于512个字节时分配一个新缓冲区: