开发进阶:Dotnet Core多路径异步终止 (2)

这里,DuplexImpl方法允许枚举终止,但又与外部终止标记保持分离。这样,在编译器层面不会被合并。在里面,CreateLinkedTokenSource反倒像编译器的处理。

现在,我们有一个CancellationTokenSource,需要时,我们可能通过它来终止循环的运行。

using var allDone = CancellationTokenSource.CreateLinkedTokenSource(externalToken, enumeratorToken); try { // ... todo } finally { allDone.Cancel(); }

通过这种方式,我们可以处理这样的场景:消费者没有获取所有数据,而我们想要触发allDone,但是我们退出了DuplexImpl。这时候,迭代器的作用就很大了,它让程序变得更简单,因为用了using,最终里面的任何内容都会定位到Dispose/DisposeAsync。

下一个是生产者,也就是SendAsync。它也是双工的,对传入的消息没有影响,所以可以用Task.Run作为一个独立的代码路径开始运行,而如果生产者出现错误,则终止发送。上边的todo部分,可以加入:

var send = Task.Run(async () => { try { await foreach (var message in request.WithCancellation(allDone.Token)) { await transport.SendAsync(message, allDone.Token); } } catch { allDone.Cancel(); throw; } }, allDone.Token); // ... todo: receive await send;

这里启动了一个生产者的并行操作SendAsync。注意,这里我们用标记allDone.Token把组合的终止标记传递给生产者。延迟await是为了允许ProducerAsync方法里可以使用终止令牌,以满足复合双工操作的生命周期要求。

这样,接收代码就变成了:

while (true) { var (success, message) = await transport.TryReceiveAsync(allDone.Token); if (!success) break; yield return message; } allDone.Cancel();

最后,把这部分代码合在一起看看:

private async static IAsyncEnumerable<TResponse> DuplexImpl(ITransport<TRequest, TResponse> transport, IAsyncEnumerable<TRequest> request, CancellationToken externalToken, [EnumeratorCancellation] CancellationToken enumeratorToken = default) { using var allDone = CancellationTokenSource.CreateLinkedTokenSource(externalToken, enumeratorToken); try { var send = Task.Run(async () => { try { await foreach (var message in request.WithCancellation(allDone.Token)) { await transport.SendAsync(message, allDone.Token); } } catch { allDone.Cancel(); throw; } }, allDone.Token); while (true) { var (success, message) = await transport.TryReceiveAsync(allDone.Token); if (!success) break; yield return message; } allDone.Cancel(); await send; } finally { allDone.Cancel(); } }

三、总结

相关的处理就这么多。这里实现的关键点是:

外部令牌和枚举器令牌都对allDone有贡献

传输中发送和接收代码使用allDone.Token

生产者枚举使用allDone.Token

任何情况下退出枚举器,allDone都会被终止

如果传输接收错误,则allDone被终止

如果消费者提前终止,则allDone被终止

当我们收到来自服务器的最后一条消息后,allDone被终止

如果生产者或传输发送错误,allDone被终止

最后多说一点,关于ConfigureAwait(false):

默认情况下,await包含一个对SynchronizationContext.Current的检查。除了表示额外的上下文切换之外,在UI应用程序的情况下,它也意味着在UI线程上运行不需要在UI线程上运行的代码。库代码通常不需要这样做。因此,在库代码中,通常应该在所有用到await的地方使用. configureawait (false)来绕过这个检查。而在一般应用程序的代码中,应该默认只使用await而不使用ConfigureAwait,除非你知道你在做什么。

开发进阶:Dotnet Core多路径异步终止

 

微信公众号:老王Plus

内容版权声明:除非注明,否则皆为本站原创文章。

转载注明出处:https://www.heiqu.com/wpjdzz.html