distinct 操作符可以用来去重,将上游重复的数据过滤掉。
of(1, 1, 2, 2, 2, 1, 2, 3, 4, 3, 2, 1).pipe( zip(interval(1000)), map(arr => arr[0]), distinct() ).subscribe(x => console.log(x))上面的代码只会输出 1, 2, 3, 4
distinct 操作符还可以接收一个 keySelector 的函数作为参数,这是官网的一个 typescript 的例子:
interface Person { age: number, name: string } of<Person>( { age: 4, name: 'Foo' }, { age: 7, name: 'Bar' }, { age: 5, name: 'Foo' }, ).pipe( distinct((p: Person) => p.name), ).subscribe(x => console.log(x)) // { age: 4, name: 'Foo' } // { age: 7, name: 'Bar' }distinctUntilChanged 也是过滤重复数据,但是只会与上一次发出的元素比较。这个操作符比 distinct 更常用。distinct 要与之前发出的不重复的值进行比较,因此要在内部存储这些值,要小心内存泄漏,而 distinctUntilChanged 只用保存上一个的值。
dalay、delayWhen用来延迟上游 Observable 数据的发出。
delay 可以接受一个数字(单位默认为 ms)或者 date 对象作为延迟控制。
const clicks = fromEvent(document, 'click') const delayedClicks = clicks.pipe(delay(1000)) // 所有点击事件延迟 1 秒 delayedClicks.subscribe(x => console.log(x))我们前面介绍过 bufferWhen,dalayWhen 也带有 when,在 RxJS 中,这种操作符它接收的参数都是 Observable Factory,即一个返回 Observable 对象的回调函数,用这个 Observable 来进行控制。
每个 click 都延迟 0 至 5 秒之间的任意一个时间:
const clicks = fromEvent(document, 'click') const delayedClicks = clicks.pipe( delayWhen(event => interval(Math.random() * 5000)), ) delayedClicks.subscribe(x => console.log(x)) 异常错误处理异常处理的难点:
try/catch 只支持同步
回调函数容易形成回调地狱,而且每个回调函数的最开始都要判断是否存在错误
Promise 不能重试,而且不强制异常被捕获
对错误处理的处理可以分为两类,即恢复(recover)和重试(retry)。
恢复是虽然发生了错误但是让程序继续运行下去。重试,是认为这个错误是临时的,重试尝试发生错误的操作。实际中往往配合使用,因为一般重试是由次数限制的,当尝试超过这个限制时,我们应该使用恢复的方法让程序继续下去。
1)catchError
catchError 用来在管道中捕获上游传递过来的错误。
interval(1000).pipe( take(6), map(x => { if (x === 4) { throw new Error('unlucky number 4') } else { return x } }), catchError(err => of(8)) ).subscribe(x => console.log(x)) // 0 // 1 // 2 // 3 // 8catchError 中的回调函数返回了一个 Observable,当捕获到上游的错误时,调用这个函数,返回的 Observable 中发出的数据会传递给下游。因此上面当 x 为4 时发生了错误,会用 8 来替换。
catchError 中的回调函数除了接收错误对象为参数外,还有第二个参数 caught$ 表示上游的 Observable 对象。如果回调函数返回这个 Observable 对象,就会进行重试。
interval(1000).pipe( take(6), map(x => { if (x === 4) { throw new Error('unlucky number 4') } else { return x } }), catchError((err, caught$) => caught$), take(20) ).subscribe(x => console.log(x))这个代码会依次输出 5 次 0, 1, 2, 3。
2)retry
retry 可以接收一个整数作为参数,表示重试次数,如果是负数或者没有传参,会无限次重试。重试实际上就是退订再重新订阅。
interval(1000).pipe( take(6), map(x => { if (x === 4) { throw new Error('unlucky number 4') } else { return x } }), retry(5) // 重试 5 次 ).subscribe(x => console.log(x))在实际开发中,如果是代码原因造成的错误,重试没有意义,如果是因为外部资源导致的异常错误适合重试,如用户网络或者服务器偶尔不稳定的时候。
3)retryWhen
和前面带 when 的操作符一样,retryWhen 操作符接收一个返回 Observable 的回调函数,用这个 Observable 来控制重试的节奏。当这个 Observable 发出一个数据时就会进行一次重试,它完结时 retryWhen 返回的 Observable 也立即完结。
interval(1000).pipe( take(6), map(x => { if (x === 4) { throw new Error('unlucky number 4') } else { return x } }), retryWhen(err$ => err$.pipe( delay(1000), take(5)) ) // 延迟 1 秒后重试,重试 5 次 ).subscribe(x => console.log(x))retryWhen 的可定制性非常高,不仅可以实现延迟定制,还可以实现 retry 的控制重试次数。在实践中,这种重试频率固定的方法还不够好,如果之前的重试失败,之后重试成功的几率也不高。Angular 官网介绍了一个 的方法。将每次重试的延迟时间控制为指数级增长。
import { pipe, range, timer, zip } from 'rxjs'; import { ajax } from 'rxjs/ajax'; import { retryWhen, map, mergeMap } from 'rxjs/operators'; function backoff(maxTries, ms) { return pipe( retryWhen(attempts => range(1, maxTries) .pipe( zip(attempts, (i) => i), map(i => i * i), mergeMap(i => timer(i * ms)) ) ) ); } ajax('/api/endpoint') .pipe(backoff(3, 250)) .subscribe(data => handleData(data)); function handleData(data) { // ... }4)finalize