RxJS入门 (2)

如同lodash,RxJS完成复杂异步操作的关键是其实现了大量的操作符,RxJS实现了多达100+的操作符,包括创建类、转换类、过滤类、联合类、工具类等,如上面的例子中,interval属于创建类操作符,它创建了一个Observable对象,作为数据的源头,take和filter属于过滤类操作符,map属于转换类, reduce属于聚合类。实际应用中,我们会花很多时间在操作符的选择上,想熟悉掌握这些操作符不是短期内能完成的,但至少初学者要了解大部分操作符能完成什么样的操作,由于篇幅限制,本文不打算一一介绍所有的操作符,这些操作符可以具体可参考官方文档,后续例子中如果应用到的操作符会着重介绍一下,下面还是借着前面的例子说一下操作符的实现原理,RxJS中大多数操作符都是Pipeable Operators,例子中除了interval以外都是Pipeable Operators,Pipeable Operators本质上是一个纯函数,它将一个Observable作为输入,生成另一个Observable作为输出。订阅输出Observable也将订阅输入Observable。在RxJS中自定义一个操作符非常简单,只需要符合上述指导原则。下面的代码自行实现了例子中所有操作符,看起来一目了然。

const { Observable } = require('rxjs') const interval = duration => new Observable(observer => { let count = 0 setInterval(() => { observer.next(count++) }, duration) }) const take = num => observable => new Observable(observer => { let count = 0 const subscription = observable.subscribe({ next(value) { if (count <= num) { observer.next(value) ++count if (count === num) { observer.complete() subscription.unsubscribe() } } }, error(err) { observer.error(err) }, complete() { observer.complete() } }) return () => { subscription.unsubscribe() } }) const filter = handler => observable => new Observable(observer => { const subscription = observable.subscribe({ next(value) { if (handler(value)) { observer.next(value) } }, error(err) { observer.error(err) }, complete() { observer.complete() } }) return () => { subscription.unsubscribe() } }) const map = handler => observable => new Observable(observer => { const subscription = observable.subscribe({ next(value) { observer.next(handler(value)) }, error(err) { observer.error(err) }, complete() { observer.complete() } }) return () => { subscription.unsubscribe() } }) const reduce = (handler, seed) => observable => new Observable(observer => { const arr = [] const subscription = observable.subscribe({ next(value) { arr.push(value) }, error(err) { observer.error(err) }, complete() { seed = arr.reduce(handler, seed) observer.next(seed) observer.complete() } }) return () => { subscription.unsubscribe() } }) const source$ = interval(1000).pipe( take(5), filter(x => x % 2 === 0), map(x => x * x), reduce((acc, seed) => acc + seed, 0) ) source$.subscribe(item => console.log(item), null, () => console.log('complete') ) 四、RxJS与Promise

目前主流的异步解决方案是Promise,Await本质也是Promise,那么RxJS解决方案相比Promise有什么优势呢?
1.Observable可以处理异步事件流,但是Promise只能处理单次事件

const { Observable } = require('rxjs') const source$ = new Observable(observer => { setTimeout(() => observer.next(1), 1000) setTimeout(() => observer.next(2), 2000) setTimeout(() => observer.next(3), 3000) setTimeout(() => observer.complete(), 4000) }) source$.subscribe(result => console.log(result))

2019-09-29_16-38-40 -4-

2.Observable是懒执行的(Lazyable),而new Promise(executor)的executor会立即执行

const { Observable } = require('rxjs') const source$ = new Observable(observer => { setTimeout(() => observer.next(1), 1000) setTimeout(() => observer.next(2), 2000) setTimeout(() => observer.complete(), 3000) }) setTimeout(() => { console.log(3) source$.subscribe(result => console.log(result)) }, 3000) // 3 // 1 // 2

3.Observable 数据是可丢弃的(Cancellable/Abortable)
如前面例子中的take操作符,实际上只取了前5个数据,而丢弃了后面所有的数据,RxJS中还有很多操作符具有类似的性质,如takeUntil( observable ), takeWhile( predicate ), take( n ), first(), first( predicate )从它们的名称和参数就大概能猜到它们的作用。
再比如实际应用中可以会遇到需要丢弃网络请求的结果,如果单纯使用Promise是无法实现的,

const delay = wait => { return new Promise(resolve => { setTimeout(resolve, wait) }) } delay(3000).then(() => console.log('xxxx'))

内容版权声明:除非注明,否则皆为本站原创文章。

转载注明出处:https://www.heiqu.com/zgzjjy.html