C# 5引入了Async/Await,用以提高用户界面响应能力和对Web资源的访问能力。换句话说,异步方法用于执行不阻塞线程并返回一个标量结果的异步操作。
微软多次尝试简化异步操作,因为Async/Await模式易于理解,所以在开发人员当中获得了良好的认可。
现有异步方法的一个重要不足是它必须提供一个标量返回结果(一个值)。比如这个方法async Task<int> DoAnythingAsync(),DoAnythingAsync的结果是一个整数(一个值)。
由于存在这个限制,你不能将这个功能与yield关键字一起使用,并且也不能将其与async IEnumerable<int>(返回异步枚举)一起使用。
如果可以将Async/Await特性与yield操作符一起使用,我们就可以使用非常强大的编程模型(如异步数据拉取或基于拉取的枚举,在F#中被称为异步序列)。
C# 8中新提出的Async Streams去掉了标量结果的限制,并允许异步方法返回多个结果。
这个变更将使异步模式变得更加灵活,这样就可以按照延迟异步序列的方式从数据库中获取数据,或者按照异步序列的方式下载数据(这些数据在可用时以块的形式返回)。
例如:
foreach await (var streamChunck in asyncStreams) { Console.WriteLine($“Received data count = {streamChunck.Count}”); }Reactive Extensions(Rx)是解决异步编程问题的另一种方法。Rx越来越受到开发人员的欢迎。很多其他编程语言(如Java和JavaScript)已经实现了这种技术(RxJava、RxJS)。Rx基于推送式编程模型(Push Programming Model),也称为反应式编程。反应式编程是事件驱动编程的一种类型,它处理的是数据而不是通知。
通常,在推送式编程模型中,你不需要控制Publisher。数据被异步推送到队列中,消费者在数据到达时消费数据。与Rx不同,Async Streams可以按需被调用,并生成多个值,直到达到枚举的末尾。
在本文中,我将对拉取模型和推送模型进行比较,并演示每一种技术各自的适用场景。我将使用很多代码示例向你展示整个概念和它们的优点,最后,我将讨论Async Streams功能,并向你展示示例代码。
拉取式编程模型与推送式编程模型图-1-拉取式编程模型与推送式编程模型
我使用的例子是著名的生产者和消费者问题,但在我们的场景中,生产者不是生成食物,而是生成数据,消费者消费的是生成的数据,如图-1所示。拉取模型很容易理解。消费者询问并拉取生产者的数据。另一种方法是使用推送模型。生产者将数据发布到队列中,消费者通过订阅队列来接收所需的数据。
拉取模型更合适“快生产者和慢消费者”的场景,因为消费者可以从生产者那里拉取其所需的数据,避免消费者出现溢出。推送模型更适合“慢生产者和快消费者”的场景,因为生产者可以将数据推送给消费者,避免消费者不必要的等待时间。
Rx和Akka Streams(流式编程模型)使用了回压技术(一种流量控制机制)。它使用拉取模型或推送模型来解决上面提到的生产者和消费者问题。
在下面的示例中,我使用了一个慢消费者从快生产者那里异步拉取数据序列。消费者在处理完一个元素后,会向生产者请求下一个元素,依此类推,直到到达序列的末尾。
动机和背景要了解我们为什么需要Async Streams,让我们来看下面的代码。
// 对参数(count)进行循环相加操作 static int SumFromOneToCount(int count) { ConsoleExt.WriteLine("SumFromOneToCount called!"); var sum = 0; for (var i = 0; i <= count; i++) { sum = sum + i; } return sum; }方法调用:
const int count = 5; ConsoleExt.WriteLine($"Starting the application with count: {count}!"); ConsoleExt.WriteLine("Classic sum starting."); ConsoleExt.WriteLine($"Classic sum result: {SumFromOneToCount(count)}"); ConsoleExt.WriteLine("Classic sum completed."); ConsoleExt.WriteLine("################################################"); ConsoleExt.WriteLine(Environment.NewLine);输出: