响应式流可以认为是随着时间发出的一系列元素。响应式和观察者模式有点相似,订阅者订阅后,发布者吐出数据时,订阅者会响应式进行处理。实际上Rx 组合了观察者模式(Observer pattern )、迭代器模式(Iterator pattern)和函数式编程。
RxJS 是上面两种编程思想的结合,但是对于它是不是函数响应式编程(FRP)有比较大的争议,因为它虽然既是函数式又是响应式但是不符合早期 FRP 的定义。
RxJS 的特点数据流抽象了很多现实问题
擅长处理异步问题
把复杂问题分解为简单问题的组合
前端中的 DOM 事件、WebSocket 推送消息、AJAX 请求资源、动画都可以看作是数据流。
RxJS 对数据采用“推”的方式,当一个数据产生时,会将其推送给对应的处理函数,这个处理函数不用关心数据时同步产生还是异步产生的,因此处理异步将会变得非常简单。
RxJS 中很多操作符,每个操作符都提供了一个小功能,学习 RxJS 最重要的就是学习如何组合操作符来解决复杂问题。
RxJS 入门 RxJS 使用RxJS 仓库现在移到了 ReactiveX 组织下,最新的大版本为 6,与之前的版本相比有许多破坏性变更,请注意。
RxJS 的 import 路径有以下 5 种:
创建 Observable 的方法、types、schedulers 和一些工具方法
import { Observable, Subject, asapScheduler, pipe, of, from, interval, merge, fromEvent, SubscriptionLike, PartialObserver } from 'rxjs';
操作符 operators
import { map, filter, scan } from 'rxjs/operators';
webSocket
import { webSocket } from 'rxjs/webSocket';
ajax
import { ajax } from 'rxjs/ajax';
测试
import { TestScheduler } from 'rxjs/testing';
本文所有 demo 均在 v6.2.1 中测试过
一个简单的例子 import { fromEvent } from 'rxjs'; import { take } from 'rxjs/operators'; const eleBtn = document.querySelector('#btn') const click$ = fromEvent(eleBtn, 'click') click$.pipe(take(1)) .subscribe(e => { console.log('只可点击一次') eleBtn.setAttribute('disabled', '') })这里演示了 RxJS 的大概用法,通过 fromEvent 将点击事件转换为 RxJS 的 Observable (响应式数据流),take(1) 表示只操作一次,观察者通过订阅(subscribe)来响应变化。具体 API 的使用会在后面讲到。
演示地址
代表流的变量用 $ 符号结尾,是 RxJS 中的一种惯例。
RxJS 要点RxJS 有一个核心和三个重点,一个核心是 Observable 再加上相关的 Operators,三个重点分别是 Observer、Subject、Schedulers。
什么是 Observable个人认为在文档中说的 Observable 更确切的说法是 Observable Stream,也就是 Rx 的响应式数据流。
在 RxJS 中 Observable 是可被观察者,观察者则是 Observer,它们通过 Observable 的 subscribe 方法进行关联。
前面提到了 RxJS 结合了观察者模式和迭代器模式。
对于观察者模式,我们其实比较熟悉了,比如各种 DOM 事件的监听,也是观察者模式的一种实践。核心就是发布者发布事件,观察者选择时机去订阅(subscribe)事件。
在 ES6 中,Array、String 等可遍历的数据结构原生部署了迭代器(Iterator )接口。
const numbers = [1, 2, 3] const iterator = numbers[Symbol.iterator]() iterator.next() // {value: 1, done: false} iterator.next() // {value: 2, done: false} iterator.next() // {value: 3, done: false} iterator.next() // {value: undefined, done: true}观察者模式和迭代器模式的相同之处是两者都是渐进式使用数据的,只不过从数据使用者的角度来说,观察者模式数据是推送(push)过来的,而迭代器模式是自己去拉取(pull)的。Rx 中的数据是 Observable 推送的,观察者不需要主动去拉取。
Observable 与 Array 相当类似,都可以看作是 Collection,只不过 Observable 是 a collection of items over time,是随时间发出的一序列元素,所以下面我们会看到 Observable 的一些操作符与 Array 的方法极其相似。
创建 Observable要创建一个 Observable,只要给 new Observable 传递一个接收 observer 参数的回调函数,在这个函数中去定义如何发送数据。
import { Observable } from 'rxjs'; const source$ = new Observable(observer => { observer.next(1) observer.next(2) observer.next(3) }) const observer = { next : item => console.log(item) } console.log('start') source$.subscribe(observer) console.log('end')上面的代码通过 new Observable 创建了一个 Observable,调用它的 subscribe 方法进行订阅,执行结果为依次输出 'start',1,2,3,'end'。
下面我们再看一个异步的例子:
import { Observable } from 'rxjs'; const source$ = new Observable(observer => { let number = 1 setInterval(() => { observer.next(number++) }, 1000) }) const observer = { next : item => console.log(item) } console.log('start') source$.subscribe(observer) console.log('end')先输出 ’start' 、'end',然后每隔 1000 ms 输出一个递增的数字。
通过这两个小例子,我们知道 RxJS 既能处理同步的行为,也能处理异步的。
观察者 Observer观察者 Observer 是一个有三个方法的对象:
next: 当 Observable 发出新的值时被调用,接收这个值作为参数
complete:当 Observable 完结,没有更多数据时被调用。complete 之后,next 方法无效