error:当 Observable 内部发生错误时被调用,之后不会调用 complete,next 方法无效
const source$ = new Observable(observer => { observer.next(1) observer.next(2) observer.complete() observer.next(3) }) const observer = { next: item => console.log(item), complete: () => console.log('complete') } source$.subscribe(observer)上面的代码会输出 1,2,'complete',而不会输出 3。
const source$ = new Observable(observer => { try { observer.next(1) observer.next(2) throw new Error('there is an exception') observer.complete() } catch (e) { observer.error(e) } }) const observer = { next: item => console.log(item), error: e => console.log(e), complete: () => console.log('complete') } source$.subscribe(observer)注意 error 之后不会再调用 complete。
Observer 还有简单形式,即不用构建一个对象,而是直接把函数作为 subscribe 方法的参数。
source$.subscribe( item => console.log(item), e => console.log(e), () => console.log('complete') )参数依次为 next 、error、complete,后面两个参数可以省略。
延迟执行(lazy evaluation)我们传给 new Observable 的回调函数如果没有订阅是不会执行的,订阅一个 Observable 就像是执行一个函数,和下面的函数类似。这和我们常见的那种内部保存有观察者列表的观察者模式是不同的,Observable 内部没有这个观察者列表。
function subscribe (observer) { let number = 1 setInterval(() => { observer.next(number++) }, 1000) } subscribe({ next: item => console.log(item), error: e => console.log(e), complete: () => console.log('complete') }) 退订(unsubscribe)观察者想退订,只要调用订阅返回的对象的 unsubscribe 方法,这样观察者就再也不会接受到 Observable 的信息了。
const source$ = new Observable(observer => { let number = 1 setInterval(() => { observer.next(number++) }, 1000) }) const observer = { next : item => console.log(item) } const subscription = source$.subscribe(observer) setTimeout(() => { subscription.unsubscribe() }, 5000) 操作符在 RxJS 中,操作符是用来处理数据流的。我们往往需要对数据流做一系列处理,才交给 Observer,这时一个操作符就像一个管道一样,数据进入管道,完成处理,流出管道。
import { interval } from 'rxjs'; import { map } from 'rxjs/operators' const source$ = interval(1000).pipe( map(x => x * x) ) source$.subscribe(x => console.log(x))interval 操作符创造了一个数据流,interval(1000) 会产生一个每隔 1000 ms 就发出一个从 0 开始递增的数据。map 操作符和数组的 map 方法类似,可以对数据流进行处理。具体见演示地址。
这个 map 和数组的 map 方法会产生新的数组类似,它会产生新的 Observable。每一个操作符都会产生一个新的 Observable,不会对上游的 Observable 做任何修改,这完全符合函数式编程“数据不可变”的要求。
上面的 pipe 方法就是数据管道,会对数据流进行处理,上面的例子只有一个 map 操作符进行处理,可以添加更多的操作符作为参数。
弹珠图弹珠图(Marble diagrams)就是用图例形象地表示 Observable 和各种操作符的一种方法。
用 - 表示一小段时间,X 代表有错误发生, | 表示结束,() 表示同步发生。
上面的例子可以如下表示:
source: -----0-----1-----2-----3--... map(x => x * x) newest: -----0-----1-----4-----9--...具体关于弹珠图的使用可以查看这个网站。
创建 Observable创建 Observable 的这些方法就是用来创建 Observable 数据流的,注意和操作符不同,它们是从 rxjs 中导入的,而不是 rxjs/operators 。
of 方法之前我们写的这种形式:
const source$ = new Observable(observer => { observer.next(1) observer.next(2) observer.next(3) observer.complete() })使用 of 方法将会非常简洁:
import {of} from 'rxjs' const source$ = of(1, 2, 3) from 方法上面的代码用 from 则是这样:
import {from} from 'rxjs' const source$ = from([1, 2, 3])from 可以将可遍历的对象(iterable)转化为一个 Observable,字符串也部署有 iterator 接口,所以也支持。