上面的Promise无论如何都会打印出xxxx,目前未知ES6规范的Promise仍未实现cancellation,但是使用Observable可以很方便的实现
const { defer } = require('rxjs') const delay = wait => { return new Promise(resolve => { setTimeout(resolve, wait) }) } const source$ = defer(() => delay(3000)) const subscription = source$.subscribe(() => console.log('xxxx')) setTimeout(subscription.unsubscribe.bind(subscription), 2000)只需要在结果返回前取消订阅就不会打印出结果
4.Observable 是可以重试的(Retryable)
const { Observable } = require('rxjs') const { retry } = require('rxjs/operators') const source$ = new Observable(observer => { observer.next(1) throw 'Error!' setTimeout(() => observer.complete(), 4000) }) source$ .pipe(retry(3)) .subscribe(result => console.log(result), err => console.log('Error')) // 1 // 1 // 1 // 1 // Error从上面的比较可以看出,RxJS可以处理很多Promise难以处理的场景,而Promise也可以很方便的用过defer操作符转化成Observable.
const { defer } = require('rxjs') const delay = wait => { return new Promise(resolve => { setTimeout(resolve, wait) }) } defer(() => delay(3000)).subscribe(() => console.log('xxx')) 五、RxJS使用案列 1. 搜索类问题搜索类问题是我们实际开发中常遇到的一类问题,如下面的两种场景,上图是查询翻译结果,下图是获取保险报价,这两种场景实际上是一类问题——根据特定的条件查询正确的结果,这类问题在实践的时候需要注意几点:
查询请求需要防抖
检查查询条件是否合理
检查结果是否和条件对应
参照上述三个注意点,我们首先看使用普通JS实现方式.
const input = document.querySelector('#input') let lastShowedResult = 0 let timer = null input.addEventListener('change', evt => { clearTimeout(timer) timer = setTimeout( async query => { if (query && query.length > 0) { const requestTime = +Date.now() const data = await fetch(url) if (requestTime > lastShowedResult) { lastShowedResult = requestTime showResult(data) } } }, 500, evt.target.value.trim() ) })上面的功能涉及到三个非同步行为:输入框输入、防抖、网络请求,如果使用普通JS实现,可以看到这三种异步行为使用了不同的范式,另外上面的代码还有一个丑陋的地方是使用了很多外部标识,如timer、lastShowedResult。下面是RxJS实现的版本。
import { fromFetch } from "rxjs/fetch" import { debounceTime, pluck, map, filter, switchMap } from "rxjs/operators" const search$ = fromEvent(document.querySelector('#input'), 'change').pipe( debounceTime(500), pluck('target', 'value'), map(query => query.trim()), filter(query => query.length !== 0), switchMap(query => fromFetch(`${url}?keyword=${query}`)) ) search$.subscribe(data => { showResult(data) })上面使用的一些操作符在前面没有提到过,fromEvent是一个创建类操作符,它可以基于给定事件目标的特定类型事件创建一个Observable对象,debounceTime很好理解,只有在特定时间间隔过去源Observable没有吐出下个值,才从源Observable获取一个值,pluck用来获取嵌套熟悉值,fromFetch创建一个网络请求的Observable对象,switchMap是一个关键的操作符,也是比较难以理解的,实际上switchMap 等价于 pipe(map, switchAll),为了便于演示,将上述模型进行一下简化。
const { defer, interval, range } = require('rxjs') const { debounceTime, take, map, concatAll, scan, mapTo, switchMap } = require('rxjs/operators') const fetchData = query => { return new Promise((resolve, reject) => { setTimeout(resolve, (query + 1) * 100, query) }) } const source$ = range(2, 6).pipe( map(s => interval(s * 100).pipe(take(1))), concatAll(), mapTo(1), scan((acc, one) => acc + one, 0) ) const search$ = source$.pipe( debounceTime(500), map(query => defer(() => fetchData(query))), switchAll() // switchMap(query => defer(() => fetchData(query)) ) search$.subscribe(data => { console.log(data) }) // 3 // 4 // 6假设数据不是来源于用户输入而是来源于模拟的source$可观测对象,响应数据会原样返回请求的数据。使用弹珠图分析异步数据的变化。
source$
这里我们不关注构造source$的原理,需要它能够模拟用户输入,产生不同时间间隔的异步数据序列,如下弹珠图所示