RxJS v6 学习指南 (5)

skipLast 是从数据流中忽略最后发出的若干数据

import { interval } from 'rxjs'; import { take } from 'rxjs/operators'; interval(1000).pipe( take(3) ).subscribe( x => console.log(x), null, () => console.log('complete') ) // 0 // 1 // 2 // 'complete'

使用了 take(3),表示只取 3 个数据,Observable 就进入完结状态。

import { interval, fromEvent } from 'rxjs' import { takeUntil } from 'rxjs/operators' interval(1000).pipe( takeUntil(fromEvent(document.querySelector('#btn'), 'click')) ).subscribe( x => { document.querySelector('#time').textContent = x + 1 }, null, () => console.log('complete') )

这里有一个 interval 创建的数据流一直在发出数据,直到当用户点击按钮时停止计时,见演示。

合并类操作符

合并类操作符用来将多个数据流合并。

1)concat、merge

concat、merge 都是用来把多个 Observable 合并成一个,但是 concat 要等上一个 Observable 对象 complete 之后才会去订阅第二个 Observable 对象获取数据并把数据传给下游,而 merge 时同时处理多个 Observable。使用方式如下:

import { interval } from 'rxjs' import { merge, take } from 'rxjs/operators' interval(500).pipe( take(3), merge(interval(300).pipe(take(6))) ).subscribe(x => console.log(x))

可以点此去比对效果,concat 的结果应该比较好理解,merge 借助弹珠图也比较好理解,它是在时间上对数据进行了合并。

source : ----0----1----2| source2: --0--1--2--3--4--5| merge() example: --0-01--21-3--(24)--5|

merge 的逻辑类似 OR,经常用来多个按钮有部分相同行为时的处理。

注意最新的官方文档和RxJS v5.x 到 6 的更新指南中指出不推荐使用 merge、concat、combineLatest、race、zip 这些操作符方法,而是推荐使用对应的静态方法。

将上面的 merge 改成从 rxjs 中导入,使用方式变成了合并多个 Observable,而不是一个 Observable 与其他 Observable 合并。

import { interval,merge } from 'rxjs' import { take } from 'rxjs/operators' merge( interval(500).pipe(take(3)), interval(300).pipe(take(6)) ).subscribe(x => console.log(x))

2)concatAll、mergeAll、switchAll

用来将高阶的 Observable 对象压平成一阶的 Observable,和 loadash 中压平数组的 flatten 方法类似。concatAll 会对内部的 Observable 对象做 concat 操作,和 concat 操作符类似,如果前一个内部 Observable 没有完结,那么 concatAll 不会订阅下一个内部 Observable,mergeAll 则是同时处理。switchAll 比较特殊一些,它总是切换到最新的内部 Observable 对象获取数据。上游高阶 Observable 产生一个新的内部 Observable 时,switchAll 就会立即订阅最新的内部 Observable,退订之前的,这也就是 ‘switch’ 的含义。

import { interval } from 'rxjs'; import { map, switchAll, take } from 'rxjs/operators'; interval(1500).pipe( take(2), map(x => interval(1000).pipe( map(y => x + ':' + y), take(2)) ), switchAll() ).subscribe(console.log) // 0:0 // 1:0 // 1:1

内部第一个 Observable 对象的第二个数据还没来得及发出,第二个 Observable 对象就产生了。

3)concatMap、mergeMap、switchMap

从上面的例子我们也可以看到高阶 Observable 常常是由 map 操作符将每个数据映射为 Observable 产生的,而我们订阅的时候需要将其压平为一阶 Observable,而就是要先使用 map 操作符再使用 concatAll 或 mergeAll 或 switchAll 这些操作符中的一个。RxJS 中提供了对应的更简洁的 API。使用的效果可以用下面的公式表示:

concatMap = map + concatAll mergeMap = map + mergeAll switchMap = map + switchAll

4)zip、combineLatest、withLatestFrom

zip 有拉链的意思,这个操作符和拉链的相似之处在于数据一定是一一对应的。

import { interval } from 'rxjs'; import { zip, take } from 'rxjs/operators'; const source$ = interval(500).pipe(take(3)) const newest$ = interval(300).pipe(take(6)) source$.pipe( zip(newest$, (x, y) => x + y) ).subscribe(x => console.log(x)) // 0 // 2 // 4

zip 是内部的 Observable 都发出相同顺序的数据后才交给下游处理,最后一个参数是可选的 resultSelector 参数,这个函数用来处理操作符的结果。上面的示例运行过程如下:

newest 发出第一个值 0,但这时 source 还没有发出第一个值,所以不执行 resultSelector 函数也不会像下游发出数据

source 发出第一个值 0,此时 newest 之前已发出了第一个值 0,执行 resultSelector 函数得到结果 0,发出这个结果

newest 发出第二个值 1,但这时 source 还没有发出第二个值,所以不执行 resultSelector 函数也不会像下游发出数据

newest 发出第三个值 2,但这时 source 还没有发出第三个值,所以不执行 resultSelector 函数也不会像下游发出数据

source 发出第二个值 1,此时 newest 之前已发出了第一个值 1,执行 resultSelector 函数得到结果 2,发出这个结果

newest 发出第四个值 3,但这时 source 还没有发出第四个值,所以不执行 resultSelector 函数也不会像下游发出数据

source 发出第三个值 2,此时 newest 之前已发出了第一个值 2,执行 resultSelector 函数得到结果 4,发出这个结果

source 完结,不可能再有对应的数据了,整个 Observable 完结

内容版权声明:除非注明,否则皆为本站原创文章。

转载注明出处:https://www.heiqu.com/wpdzpj.html