export class ReplaySubject extends Subject {
constructor(bufferSize = Number.POSITIVE_INFINITY,
windowTime = Number.POSITIVE_INFINITY,
scheduler) {
super();
this.scheduler = scheduler;
this._events = []; // ReplayEvent对象列表
this._bufferSize = bufferSize < 1 ? 1 : bufferSize; // 设置缓冲区大小
this._windowTime = windowTime < 1 ? 1 : windowTime;
}
next(value) {
const now = this._getNow();
this._events.push(new ReplayEvent(now, value));
this._trimBufferThenGetEvents();
super.next(value);
}
_subscribe(subscriber) {
const _events = this._trimBufferThenGetEvents(); // 过滤ReplayEvent对象列表
let subscription;
if (this.closed) {
throw new ObjectUnsubscribedError();
}
...
else {
this.observers.push(subscriber);
subscription = new SubjectSubscription(this, subscriber);
}
...
const len = _events.length;
// 重新发送设定的最后bufferSize个值
for (let i = 0; i < len && !subscriber.closed; i++) {
subscriber.next(_events[i].value);
}
...
return subscription;
}
}
class ReplayEvent {
constructor(time, value) {
this.time = time;
this.value = value;
}
}
关于RxJS Subject的学习笔记(7)
ReplaySubject 应用
有些时候我们希望在 Subject 新增订阅者后,能向新增的订阅者重新发送最后几个值,这时我们就可以使用 ReplaySubject ,具体示例如下:
var subject = new Rx.ReplaySubject(2); // 重新发送最后2个值
var observerA = {
next: value => console.log('Observer A get value: ' + value),
error: error => console.log('Observer A error: ' + error),
complete: () => console.log('Observer A complete!')
};
var observerB = {
next: value => console.log('Observer B get value: ' + value),
error: error => console.log('Observer B error: ' + error),
complete: () => console.log('Observer B complete!')
};
subject.subscribe(observerA);
subject.next(1);
subject.next(2);
subject.next(3);
setTimeout(() => {
subject.subscribe(observerB); // 1秒后订阅
}, 1000);
以上代码运行后,控制台的输出结果:
Observer A get value: 1
Observer A get value: 2
Observer A get value: 3
Observer B get value: 2
Observer B get value: 3
可能会有人认为 ReplaySubject(1) 是不是等同于 BehaviorSubject,其实它们是不一样的。在创建BehaviorSubject 对象时,是设置初始值,它用于表示 Subject 对象当前的状态,而 ReplaySubject 只是事件的重放。
JSBin - ReplaySubject
AsyncSubject
AsyncSubject 定义
AsyncSubject 源码片段
export class AsyncSubject extends Subject {
constructor() {
super(...arguments);
this.value = null;
this.hasNext = false;
this.hasCompleted = false; // 标识是否已完成
}
_subscribe(subscriber) {
if (this.hasError) {
subscriber.error(this.thrownError);
return Subscription.EMPTY;
}
else if (this.hasCompleted && this.hasNext) { // 等到完成后,才发出最后的值
subscriber.next(this.value);
subscriber.complete();
return Subscription.EMPTY;
}
return super._subscribe(subscriber);
}
next(value) {
if (!this.hasCompleted) { // 若未完成,保存当前的值
this.value = value;
this.hasNext = true;
}
}
}
内容版权声明:除非注明,否则皆为本站原创文章。
转载注明出处:http://www.heiqu.com/82.html
