RxJS入门

一、RxJS是什么?

官方文档使用了一句话总结RxJS: Think of RxJS as Lodash for events。那么Lodash主要解决了什么问题?Lodash主要集成了一系列关于数组、对象、字符串等集合操作,极大的方便了对这些集合数据进行衍生。举个简单的例子:求数组偶数元素的平方和

const { pipe, filter, map, reduce } = require('lodash/fp') const source = [0, 1, 2, 3, 4] const result = pipe( filter(x => x % 2 === 0), map(x => x * x), reduce((acc, cur) => acc + cur, 0) )(source) console.log(result) // 20

那么如果source中的元素序列是异步产生的呢,如何处理?其中一种解决方案是:Observer Pattern(观察者模式) + Iterator Pattern(迭代器模式)

const event = new (require('events')).EventEmitter() let count = 0, sum = 0 const source = [] const itr = source[Symbol.iterator]() event.on('pushData', data => { source.push(data) const { value } = itr.next() if (value % 2 === 0) { sum += value * value } }) event.on('pushDataComplete', () => { console.log(sum) // 20 }) const timer = setInterval(() => { if (count > 4) { clearInterval(timer) event.emit('pushDataComplete') return } event.emit('pushData', count++) }, 2000)

上述代码有什么问题?——没问题,但是结构松散,不易阅读,不符合函数式编程规范。用RxJS实现则简单的多,代码如下:

const { interval } = require('rxjs') const { reduce, take, filter, map } = require('rxjs/operators') const source$ = interval(1000) const result$ = source$.pipe( take(5) filter(x => x % 2 === 0), map(x => x * x), reduce((acc, cur) => acc + cur, 0) ) result$.subscribe(x => console.log(x))

这段代码与上文中Lodash实现的代码基本一致,唯一不同的是RxJS处理的是异步数据序列,这个异步数据序列在RxJS中被称为流(stream)。RxJS提供了很多操作符,可以对单条数据流进行转化、过滤等操作,也可以对多条数据流进行合并等操作。

RxJS入门

二、RxJS数据表示方法

RxJS中表示流的方法是Observable对象,也可以这么说,RxJS就是通过Observable组合各种异步行为的库。RxJS结合了观察者和迭代器模式的思想,可以简单的表示为:

Observable = Publisher + Iterator

下面是一个简单的例子

const { Observable } = require('rxjs') const onSubscribe = observer => { observer.next(0) observer.next(1) setTimeout(() => { observer.next(3) observer.complete() observer.next(4) }, 1000) observer.next(2) } // 创建流 const source$ = new Observable(onSubscribe) // 创建观察者 const observer = { next: item => console.log(item), complete: () => console.log('complete'), error: error => console.log(error) } // 订阅流 console.log('start') source$.subscribe(observer) console.log('end') // start // 0 // 1 // 2 // end // 3 // complete

前面我们说过Observable是Publisher和Iterator的结合,也仅仅是思想上的,实际上还是有很多区别,一般发布-订阅模式会在内部维护一个listeners清单,在要发布通知时会逐一的调用订阅者。但是Observable不是这样的,在其内部并没有一份订阅者的清单。订阅Observable的行为像是执行一个回调方法(onSubscribe),并且这个回调方法是把观察者observer当做参数的,而这里的观察者observer是一个具有三个方法属性的普通对象,观察者的三个方法(method):

next:每当Observable吐出新的值,next方法就会被调用。

complete:在Observable再也没有值吐出时调用,在complete被调用之后,next方法就不会再起作用。

error:每当Observable内发生错误时,error方法就会被调用。

没有强制要求observer对象必须要具有这三种方法,但至少需要有next方法,除此之外,Observable的subscribe方法还可以直接依次传入next/error/complete方法,其内部会自动组成完整的observer对象。

从上面的例子可以看出RxJS可以同时处理同步和异步行为,Observable可以通过创建时传入的回调onSubscribe方法控制数据吐出的节奏,这种数据流的节奏可以用一个时间轴来表示,在RxJS中被称为弹珠图(Marble Diagrams),上面的例子可以使用下面的弹珠图表示,第一颗弹珠表示同步吐出的0,1,2,第二颗弹珠表示1秒后吐出的3,弹珠上的竖线表示数据流不再产生数据,也就是调用了observer的complete方法.

...×33

理解弹珠图的意义的话,可以很容易画出本文第一节中例子对应的弹珠图

const { interval } = require('rxjs') const { reduce, take, filter, map } = require('rxjs/operators') const source$ = interval(1000) const result$ = source$.pipe( take(5) filter(x => x % 2 === 0), map(x => x * x), reduce((acc, cur) => acc + cur, 0) ) result$.subscribe(x => console.log(x))

interval(1000)

0123456789

take(5)

01234

filter(x => x % 2 === 0)

024

map(x => x * x)

0416

reduce((acc, cur) => acc + cur, 0)

20

三、RxJS操作符

内容版权声明:除非注明,否则皆为本站原创文章。

转载注明出处:https://www.heiqu.com/zgzjjy.html