另外需要注意的是,我们不能在链中多次使用subscribeOn()。从技术上讲,你可以这样做,但是它并不会产生额外的作用。在下面的代码片段中,我们将三个不同的Scheduler连接到了一起,你能猜出来,哪个Scheduler会成为源observable吗?
Observable.just(1, 2, 3, 4, 5, 6) .subscribeOn(Schedulers.io()) .subscribeOn(Schedulers.computation()) .subscribeOn(Schedulers.newThread()) .doOnNext(integer -> println("Emitting item " + integer + " on: " + currentThread().getName())) .subscribe(integer -> println("Consuming item " + integer + " on: " + currentThread().getName()));如果你的答案是Schedulers.io(),那么恭喜你答对了!
即便我们在链中放置多个subscribeOn()操作符,只有最靠近源observable的那一个会发挥作用。
背后的原理我们值得花一些时间更深入地理解一下上述的场景。为什么Schedulers.io() Scheduler会发挥作用,而不是其他的Scheduler?正常情况下,你可能会认为Schedulers.newThread()会生效,因为它是在链的最后才添加上去的。
我们必须要理解,在RxJava中,订阅(subscription)必须是基于上游observable实例的。如下的代码与我们前面看到的非常类似,只是更加繁琐一些。
Observable<Integer> o1 = Observable.just(1, 2, 3, 4, 5); Observable<Integer> o2 = o1.filter(integer -> integer % 2 == 0); Observable<Integer> o3 = o2.map(integer -> integer * 10); o3.subscribe(integer -> println("Consuming item " + integer + " on: " + currentThread().getName()));