《深入浅出RxJS》 (3)

merge第一时间会subscribe上游所有的Observable,然后才去先到先得的策略,任何一个Observable只要有数据下来,就会传给下游的Observable对象

image

merge的第一个Observable如果产生的是同步数据流,那会等第一个同步数据流产生完毕之后,再回合并下一个Observable对象,因此merge的主要适用场景仍然是异步数据流。一个比较常用的场景就是用于合并DOM事件

merge还有一个可选的参数concurrent,用于指定同时合并的Observable对象的个数

const source1$ = Observable.timer(0, 1000).map(x => x+'A'); const source2$ = Observable.timer(500, 1000).map(x => x+'B'); const source3$ = Observable.timer(1000, 1000).map(x => x+'C'); const merged$ = source1$.merge(source2$, source3$, 2); merged$.subscribe(console.log, null, () => console.log('complete')); // 0A 0B 1A 1B 2A 2B...

这里就限定了优先合并2个Observable对象。而第一二个又不会完结,所以source3$没有出头之日。

zip

zip将上游的两个Obserable合并,并且将他们中的数据一一对应。

// 基本用法 const source1$ = Observable.of(1,2,3); const source2$ = Observable.of(4,5,6); const zipped$ = Observable.zip(source1$, source2$); zipped$.subscribe(console.log, null, () => console.log('completed')); // [1,4] [2,5] [3,6] completed

当使用zip的时候,它会立刻订阅上游Observable,然后开始合并数据。对于zip而言上游任何一个Observable完结,zip只要给这个完结的Observable对象吐出所有的数据找到配对的数据,那么zip就会给下游一个complete信号

const source1$ = Observable.interval(1000); const source2$ = Observable.of('a', 'b', 'c'); // [0, 'a'] [1, 'b'] [2, 'c'] complete

但是这里也会有一个问题,如果某个上游的source1$吐出的数据很快,但是source$2吐出的数据慢,那么zip就不得不先存储source1$的数据

如果使用zip组合超过两个Observable对象,游戏规则依然一样,组合而成的Observable吐出的数据依然是数组

combineLatest

合并最后一个数据,从所有输入Observable对象中那最后一个产生的数据(最新数据),然后把这些数据组合起来传给下游。

const source1$ = Observable.timer(500, 1000); const source2$ = Observable.timer(1000, 1000); const result$ = source1$.combineLatest(source2$);

image

image

我们也可以自由的定制下游数据

const source1$ = Observable.timer(500, 1000); const source2$ = Observable.timer(1000, 1000); const project = (a, b) => `${a} and ${b}`; const result$ = source1$.combineLatest(source2$, project);

多重依赖的问题:

const original$ = Observable.timer(0, 1000); const source1$ = original$.map(x => x + 'a'); const source2$ = original$.map(x => x + 'b'); const result$ = source1$.combineLatest(source2$); withLatestFrom

功能类似于combineLatest,但是给下游推送数据只能由一个

const source1$ = Observable.timer(0, 2000).map(x => 1000 * x); const source2$ = Observable.timer(500, 1000); const result$ = source1$.withLatestFrom(source2$, (a,b) => a + b); // 101 203 305 407... race

第一个吐出数据的Observable对象就是胜者,race产生的Observable就会完全采用Observable对象的数据,其余的输入Observable对象则会被退订而抛弃。

const source1$ = Observable.timer(0, 2000).map(x => x + 'a'); const source2$ = Observable.timer(500, 2000).map(y => y + 'b'); const winner$ = source1$.race(source2$); winner$.subscribe(console.log); // 1a 2a 3a... startWith

让一个Observable对象在被订阅的时候,总是先吐出指定的若干数据

const origin$ = Observable.timer(0, 1000); const result$ = origin$.startWith('start'); // start // 0 // 1

startWith的操作符就是为了满足链式调用的需求

original$.map(x => x * 2).startWith('start').map(x => x + 'ok'); forkJoin

只有当所有的Observable对象都完结,确定不会有新的数据产生的时候,forkJoin就会把所有输入的Observable对象产生的最后一个数据合并成给下游唯一的数据

const source1$ = Observable.interval(1000).map(x => x + 'a').take(1); const source2$ = Observable.interval(1000).map(x => x + 'b').take(3); const concated$ = Observable.forkJoin(source1$, source2$); concated$.subscribe(console.log); // ["0a", "2b"] 高阶Observable

所谓高阶Observable,指的就是产生数据依然是Observable的Observable

// 高阶Observable示例 const ho$ = Observable.interval(1000).take(2) .map(x => Observable.interval(1500).map(y => x + ':' + y)); concatAll

内容版权声明:除非注明,否则皆为本站原创文章。

转载注明出处:https://www.heiqu.com/zzyxwy.html