上面如果没有传递最后一个参数 resultSelector 函数,将会依次输出数组 [0, 0]、[1, 1]、[2, 2]。在更新指南中,官方指出不推荐使用 resultSelector 参数,将会在 v7 中移除。加上之前提到的推荐使用静态方法,这个示例应该改成这样:
import { interval, zip } from 'rxjs'; import { take, map } from 'rxjs/operators'; const source$ = interval(500).pipe(take(3)) const newest$ = interval(300).pipe(take(6)) const add = (x, y) => x + y zip(source$, newest$).pipe( map(x => add(...x)) ).subscribe(x => console.log(x))使用 zip 当有数据流吐出数据很快,而有数据流发出值很慢时,要小心数据积压的问题。这时快的数据流已经发出了很多数据,由于对应的数据还没发出,RxJS 只能保存数据,快的数据流不断地发出数据,积压的数据越来越多,消耗的内存也会越来越大。
combineLatest 与 zip 不同,只要其他的 Observable 已经发出过值就行,顾名思义,就是与其他 Observable 最近发出的值结合。
import { interval, combineLatest } from 'rxjs'; import { take } from 'rxjs/operators'; const source$ = interval(500).pipe(take(3)) const newest$ = interval(300).pipe(take(6)) combineLatest(source$, newest$).subscribe(x => console.log(x)) // [0, 0] // [0, 1] // [0, 2] // [1, 2] // [1, 3] // [2, 3] // [2, 4] // [2, 5]withLatestFrom 没有静态方法,只有操作符方法,前面的方法所有 Observable 地位是平等的,而这个方法是使用这个操作符的 Observable 起到了主导作用,即只有它发出值才会进行合并产生数据发出给下游。
import { interval } from 'rxjs'; import { take, withLatestFrom } from 'rxjs/operators'; const source$ = interval(500).pipe(take(3)) const newest$ = interval(300).pipe(take(6)) source$.pipe( withLatestFrom(newest$) ).subscribe(x => console.log(x)) // [0, 0] // [1, 2] // [2, 4]source 发出 0 时,newest 最新发出的值为 0,结合为 [0, 0] 发出
source 发出 1,此时 newest 最新发出的值为 2,结合为 [1, 2] 发出
source 发出 2,此时 newest 最新发出的值为 4,结合为 [2, 4] 发出
source 完结,整个 Observable 完结
5)startWith、forkJoin、race
startWith 是在 Observable 的一开始加入初始数据,同步立即发送,常用来提供初始状态。
import { fromEvent, from } from 'rxjs'; import { startWith, switchMap } from 'rxjs/operators'; const source$ = fromEvent(document.querySelector('#btn'), 'click') let number = 0 const fakeRequest = x => { return new Promise((resolve, reject) => { setTimeout(() => { resolve(number++) }, 1000) }) } source$.pipe( startWith('initData'), switchMap(x => from(fakeRequest(x))) ).subscribe(x => document.querySelector('#number').textContent = x)这里通过 startWith 操作符获取了页面的初始数据,之后通过点击按钮获取更新数据。
forkJoin 只有静态方法形式,类似 Promise.all ,它会等内部所有 Observable 都完结之后,将所有 Observable 对象最后发出来的最后一个数据合并成 Observable。
race 操作符产生的 Observable 会完全镜像最先吐出数据的 Observable。
const obs1 = interval(1000).pipe(mapTo('fast one')); const obs2 = interval(3000).pipe(mapTo('medium one')); const obs3 = interval(5000).pipe(mapTo('slow one')); race(obs3, obs1, obs2) .subscribe( winner => console.log(winner) ); // result: // a series of 'fast one' 一个小的练习本文中的例子基本来自30 天精通 RxJS,使用 RxJS v6 版本进行重写。
页面上有一个 p 标签存放一个状态,初始为 0,有两个按钮,一个按钮点击后这个状态增加 1,另一个按钮点击后这个状态减少 1。
<button>Add</button> <button>Minus</button> <p></p>这两个按钮的点击事件我们都可以建立响应式数据流,可以使用 mapTo(1) 和 mapTo(-1) 分别表示点击后增加 1 和减少 1。我们可以使用 EMPTY 创建一个空的数据流来表示这个状态,用 startWith 设定初始值。然后 merge 这两个点击的数据流,但是这还有一个问题,点击事件的数据流需要与表示状态的数据流进行逻辑计算,发出最终的状态,我们才能去订阅这个最终的数据流来更改页面的显示。而这种累计计算的方法,可以用 scan 操作符来实现。最终实现如下:
import { fromEvent, EMPTY, merge } from 'rxjs' import { mapTo, startWith, scan } from 'rxjs/operators' const addButton = document.getElementById('addButton') const minusButton = document.getElementById('minusButton') const state = document.getElementById('state') const addClick$ = fromEvent(addButton, 'click').pipe(mapTo(1)) const minusClick$ = fromEvent(minusButton, 'click').pipe(mapTo(-1)) merge( EMPTY.pipe(startWith(0)), addClick$, minusClick$) .pipe( scan((origin, next) => origin + next) ).subscribe(item => { state.textContent = item })查看演示
简单拖拽页面上有一个 id 为 drag 的 div:
<div></div>