Reactive Extensions 相见恨晚的Rx.Net

何为Reactive Extensions(Rx)

Rx是一个遵循函数式编程的类库,它引用观察者以及迭代器设计模式对可观察对象产生的数据进行异步消费。使用Rx,

开发人员将使用LINQ运算符操作异步数据流,并使用调度程序参数化异步数据流中的并发性,简单地说,Rx = Observables + LINQ + Schedulers。

使用Rx需要Nuget安装System.Reactive Nuget包

Rx的使用场景

响应式UI

UI界面上,用户对一个绑定数据集合的控件进行关键字查询。常规的流程是我们必须在等待用户键盘按下指定的完成键(如回车)或鼠标点击查询按钮后程序才开始执行相应的查询处理。但假设需求变更:“用户希望在每输入一个关键字后就能及时将关键字相应的查询结果集绑定到控件” 如果面临这个需求,那你会如何实现呢? 你会少不了定义相应的全局状态字段,少不了相应的时间间隔刷新。我相信写出来的代码也会让你很烦恼。 其实你有更好的选择,那就是我们的主角Rx。

Rx 核心

Rx有两个核心接口 IObservable<T>、IObserver<T>

IObservable<T>

先来看此接口的结构:

Reactive Extensions 相见恨晚的Rx.Net

IObservable<T>接口就提供一个Subscribe(订阅)方法,入参是一个观察者对象接口
我们可以将IObservable<T>称之为被观察者(可观察者),IObserver<T>称之为观察者

通过可接口签名可以看出被观察者需要输出T类型的对象。需要理解被观察者IObservable<T>我们需要与现有的一些常规知识点做出比较,这里我们用IEnumerable<T>比较。
我想我们都使用过Linq,操作过IEnumerable<T>集合,IEnumerable<T>集合有个明显的状态就是它所存储的元素是静态的。集合内的元素状态除非代码显示的新增或删除、修改,否则这个集合基本是静态(数据未变动)的。但是IObservable<T>则不同,它的元素是根据被观察者提供的数据而变动的(不可预测的),就好比在UI上你无法预测用户的操作行为一样。

下面这个表格可以看出两者区别

IEnumerable 可在方便时列举元素值
IObservable   可观察对象变动的值  

IObserver<T>

IObserver<T>接口可以理解为消费被贯彻着提供数据的一个接口,它的三个方法决定了本次数据流的观察行为的走向。
通俗理解就是被观察者生成数据,观察者消费数据。

来看下IObserver<T>的结构

Reactive Extensions 相见恨晚的Rx.Net

 

OnNext 表示消费新数据

OnError 表示观察数据流出现异常

OnCompleted 表示明确关闭观察数据流

代码示例

下面代码定义了一个可观察的队列,该队列会提供给观察者三个int类型的入参 1、2、3 供观察者对象的OnNext方法消费。 MyConsoleObserver(观察者)在得到数据后打印出来。

1 class Program 2 { 3 4 static void Main(string[] args) 5 { 6 Test(); 7 } 8 9 private static void Test() 10 { 11 var numbers = new MySequenceOfNumbers(); 12 var observer = new MyConsoleObserver<int>(); 13 numbers.Subscribe(observer); 14 Console.ReadLine(); 15 } 16 17 } 18 19 /// <summary> 20 /// 自定义被观察队列 21 /// </summary> 22 public class MySequenceOfNumbers : IObservable<int> 23 { 24 public IDisposable Subscribe(IObserver<int> observer) 25 { 26 observer.OnNext(1); 27 observer.OnNext(2); 28 observer.OnNext(3); 29 observer.OnCompleted(); 30 return Disposable.Empty; 31 } 32 } 33 34 /// <summary> 35 /// 自定义观察者对象 36 /// </summary> 37 /// <typeparam></typeparam> 38 public class MyConsoleObserver<T> : IObserver<T> 39 { 40 public void OnNext(T value) 41 { 42 Console.WriteLine("接收到 value {0}", value); 43 } 44 public void OnError(Exception error) 45 { 46 Console.WriteLine("出现异常! {0}", error); 47 } 48 public void OnCompleted() 49 { 50 Console.WriteLine("关闭观察行为"); 51 } 52 }

内容版权声明:除非注明,否则皆为本站原创文章。

转载注明出处:https://www.heiqu.com/wpdyfx.html