RxJS v6 学习指南 (13)

RxJS 中有一个 用于测试的 TestScheduler,RxJS 的测试大家可以查看程墨的《深入浅出 RxJS》或者其他资料。

import { TestScheduler } from 'rxjs/testing' RxJS 的一些实践 RxJS 与前端框架结合

Angular 自身引用了 RxJS,如 http 和 animation 都使用了 Observable,状态管理可以使用 ngrx。

Vue 官方有与 RxJS 集成的 vue-rx。

React 可以通过 Subject 建立桥梁,Redux 也有与 RxJS 结合的中间件 Redux-Observable。

轮询中的错误处理 interval(10000).pipe( switchMap(() => from(axios.get(url))), catchError(err => EMPTY) ).subscribe(data => render(data))

上面的代码,每隔 10s 去发送一个请求,当某个请求返回出错时,返回空的 Observable 而不渲染数据。这样处理貌似正确,但是实际上某个请求出错时,整个 Observable 终结了,因此轮询就结束了。为了保持轮询,我们需要进行隔离,把错误处理移到 switchMap 内部进行处理。

interval(10000).pipe( switchMap(() => from(axios.get(url)).pipe( catchError(err => EMPTY) )) ).subscribe(data => render(data)) 订阅管理

如果没有及时退订可能会引发内存泄露,我们需要通过退订去释放资源。

1)命令式管理

const subscription = source$.subscribe(observer) // later... subscription.unsubscribe()

上面的管理方式,数量很少时还好,如果数量较多,将会显得十分笨拙。

2) 声明式管理

const kill1 = fromEvent(button, 'click') const kill2 = getStreamOfRouteChanges() const kill3 = new Subject() const merged$ = mege( source1.pipe(takeUntil(kill1)), source2.pipe(takeUntil(kill2)), source3.pipe(takeUntil(kill3)) ) const sub = merged$.subscribe(observer) // later... sub.unsubscribe() // 或者发出任意结束的事件 kill3.next(true)

通过 takeUntil、map 或者其他操作符组合进行管理。这样更不容易漏掉某个退订,订阅也减少了。

3)让框架或者某些类库去处理

比如 Angular 中的 async pipe,当 unmount 时会自动退订,也不用写订阅。

不要 Rx 一切

不要过度使用 Rx,它比较适合以下场景:

组合事件时

增加延迟和控制频率

组合异步任务

需要取消时

简单的应用并不需要 RxJS。

RxJS 的业务实践

可以看看徐飞的相关思考:流动的数据——使用 RxJS 构造复杂单页应用的数据逻辑

RxJS 与 Async Iterator

Async Iterator 提案已经进入了 ES2018,可以认为是 iterator 的异步版本。在 Symbol 上部署了 asyncIterator 的接口,不过它的 next 方法返回的是 { value, done } 对象的 Promise 版本。可以使用 for-await-of 进行迭代:

for await (const line of readLines(filePath)) { console.log(line) }

使用 Async Iterator 我们可以很容易实现类似 RxJS 操作符的功能:

const map = async function*(fn) { for await(const value of this) yield fn(value) }

其他如 fromEvent 等也比较容易实现。Async Iterator 扩展库 axax 的一个例子:

import { fromEvent } from "axax/es5/fromEvent"; const clicks = fromEvent(document, 'click'); for await (const click of clicks) { console.log('a button was clicked'); }

下面是 Benjamin Gruenbaum 用 Async Iterator 实现 AutoComplete 的一个例子:

let tooSoon = false, last; for await (const {target: {value}} of fromEvent(el, "keyup")) { if(!value || tooSoon) continue; if(value === last) continue; last = value; yield await fetch("/autocomplete/" + value); // misses `last` tooSoon = true; delay(500).then(() => tooSoon = false); }

Async Iterator 相比 RxJS,没有那么多概念,上手快,也比较容易扩展实现那些操作符。

从数据消费者的角度上看,RxJS 是 push stream,由生产者把数据推送过来,Async Iterator 是 pull stream,是自己去拉取数据。

参考链接

博客:30 天精通 RxJS

书:深入浅出RxJS

视频:RxJS 5 Thinking Reactively | Ben Lesh

内容版权声明:除非注明,否则皆为本站原创文章。

转载注明出处:https://www.heiqu.com/wpdzpj.html