from 还可以根据 promise 创建一个 Observable。我们用 fetch 或者 axios 等类库发送的请求都是一个 promise 对象,我们可以使用 from 将其处理为一个 Observable 对象。
fromEvent 方法用 DOM 事件创建 Observable,第一个参数为 DOM 对象,第二个参数为事件名称。具体示例见前面 RxJS 入门章节的一个简单例子。
fromEventPattern 方法将添加事件处理器、删除事件处理器的 API 转化为 Observable。
function addClickHandler (handler) { document.addEventListener('click', handler) } function removeClickHandler (handler) { document.removeEventListener('click', handler) } fromEventPattern( addClickHandler, removeClickHandler ).subscribe(x => console.log(x))也可以是我们自己实现的和事件类似,拥有注册监听和移除监听的 API。
import { fromEventPattern } from 'rxjs' class EventEmitter { constructor () { this.handlers = {} } on (eventName, handler) { if (!this.handlers[eventName]) { this.handlers[eventName] = [] } if(typeof handler === 'function') { this.handlers[eventName].push(handler) } else { throw new Error('handler 不是函数!!!') } } off (eventName, handler) { this.handlers[eventName].splice(this.handlers[eventName].indexOf(handler), 1) } emit (eventName, ...args) { this.handlers[eventName].forEach(handler => { handler(...args) }) } } const event = new EventEmitter() const subscription = fromEventPattern( event.on.bind(event, 'say'), event.off.bind(event, 'say') ).subscribe(x => console.log(x)) let timer = (() => { let number = 1 return setInterval(() => { if (number === 5) { clearInterval(timer) timer = null } event.emit('say', number++) }, 1000) })() setTimeout(() => { subscription.unsubscribe() }, 3000)演示地址
interval、timerinterval 和 JS 中的 setInterval 类似,参数为间隔时间,下面的代码每隔 1000 ms 会发出一个递增的整数。
interval(1000).subscribe(console.log) // 0 // 1 // 2 // ...timer 则可以接收两个参数,第一个参数为发出第一个值需要等待的时间,第二个参数为之后的间隔时间。第一个参数可以是数字,也可以是一个 Date 对象,第二个参数可省。
range操作符 of 产生较少的数据时可以直接写如 of(1, 2, 3),但是如果是 100 个呢?这时我们可以使用 range 操作符。
range(1, 100) // 产生 1 到 100 的正整数 empty、throwError、neverempty 是创建一个立即完结的 Observable,throwError 是创建一个抛出错误的 Observable,never 则是创建一个什么也不做的 Observable(不完结、不吐出数据、不抛出错误)。这三个操作符单独用时没有什么意义,主要用来与其他操作符进行组合。目前官方不推荐使用 empty 和 never 方法,而是推荐使用常量 EMPTY 和 NEVER(注意不是方法,已经是一个 Observable 对象了)。
deferdefer 创建的 Observable 只有在订阅时才会去创建我们真正想要操作的 Observable。defer 延迟了创建 Observable,而又有一个 Observable 方便我们去订阅,这样也就推迟了占用资源。
defer(() => ajax(ajaxUrl))只有订阅了才会去发送 ajax 请求。
操作符操作符其实看作是处理数据流的管道,每个操作符实现了针对某个小的具体应用问题的功能,RxJS 编程最大的难点其实就是如何去组合这些操作符从而解决我们的问题。
在 RxJS 中,有各种各样的操作符,有转化类、过滤类、合并类、多播类、错误处理类、辅助工具类等等。一般不需要自己去实现操作符,但是我们需要知道操作符是一个函数,实现的时候必须考虑以下功能:
返回一个全新的 Observable 对象
对上游和下游的订阅和退订处理
处理异常情况
及时释放资源
pipeable 操作符之前版本的 RxJS 各种操作符都挂载到了全局 Observable 对象上,可以这样链式调用:
source$.filter(x => x % 2 === 0).map(x => x * 2)现在需要这样使用:
import {filter, map} from 'rxjs/operators' source$.pipe( filter(x => x % 2 === 0), map(x => x * 2) )其实也很好理解,pipe 就是管道的意思,数据流通过操作符处理,流出然后交给下一个操作符。
几个类似数组方法的基础操作符map、filter 和数组的 map、filter 方法类似,scan 则是和 reduce 方法类似,mapTo 是将所有发出的数据映射到一个给定的值。
import {mapTo} from 'rxjs/operators' fromEvent(document, 'click').pipe( mapTo('Hi') ).subscribe(x => console.log(x))每次点击页面时都会输出 Hi。
一些过滤的操作符take 是从数据流中选取最先发出的若干数据
takeLast 是从数据流中选取最后发出的若干数据
takeUntil 是从数据流中选取直到发生某种情况前发出的若干数据
first 是获得满足判断条件的第一个数据
last 是获得满足判断条件的最后一个数据
skip 是从数据流中忽略最先发出的若干数据