这个Scheduler 每次都会创建一个全新的线程来完成一组工作。它不会从任何线程池中受益,线程的创建和销毁都是很昂贵的,所以你需要非常小心,不要衍生出太多的线程,导致服务器系统变慢或出现内存溢出的错误。
理想情况下,你应该很少使用这个Scheduler,它大多用于在一个完全分离的线程中开始一项长时间运行、隔离的一组任务。
Schedulers.single()这个Scheduler是RxJava 2新引入的,它的背后只有一个线程作为支撑,只能按照有序的方式执行任务。如果你有一组后台任务要在App的不同地方执行,但是同时只能承受一个任务执行的话,那么这个Scheduler就可以派上用场了。
Schedulers.from(Executor executor)我们可以使用它创建自定义的Scheduler,它是由我们自己的Executor作为支撑的。在有些场景下,我们希望创建自定义的Scheduler为App执行特定的任务,这些任务可能需要自定义的线程逻辑。
假设,我们想要限制App中并行网络请求的数量,那么我们就可以创建一个自定义的Scheduler,使其具有一个固定线程池大小的Executor:Scheduler.from(Executors.newFixedThreadPool(n)),然后将其应用到代码中所有网络相关的Observable上。
AndroidSchedulers.mainThread()这是一个特殊的Scheduler,它无法在核心RxJava库中使用,要使用它,必须要借助RxAndroid扩展库。这个Scheduler对Android App特别有用,它能够在应用的主线程中执行基于UI的任务。
默认情况下,它会在应用主线程关联的looper中进行任务排队,但是它有一个其他的变种,允许我们以API的形式使用任意的Looper:AndroidSchedulers.from(Looper looper)。
注意:在使用无边界线程池支撑的Scheduler时,比如Schedulers.io(),我们要特别小心,因为它有可能会导致线程池无限增长,使系统中出现大量的线程。
理解subscribeOn()和observeOn()现在,我们对不同类型的Scheduler已经有了一个清晰的理解,并且掌握了何时该使用哪种Scheduler,我们继续前进,接下来会详细介绍subscribeOn()和observeOn()操作符。
要理解RxJava真正的多线程功能,我们需要深入理解这两个操作符分别如何运行以及何时要将它们组合起来。
subscribeOn()简而言之,这个操作符指定源observable要在哪个线程中发出各个条目。读者需要理解这里“源(source)” observable的含义,如果你有一个observable链的话,源observable始终位于根部或者说是链的顶部,也就是产出物(emission)生成的地方。
读者也看到了,如果我们不使用subscribeOn()的话,所有的产出都是直接在代码执行的线程中生成的(在我们的场景中,也就是main线程)。
接下来,我们将所有的产出物在计算线程中生成,这需要使用subscribeOn()和 Schedulers.computation() Scheduler。如果你运行下面的代码片段,所有产出物是在线程池中某个可用的计算线程中生成的,RxComputationThreadPool-1。
简洁起见,我们没有使用完整的DisposableSubscriber,因为在这种简单的场景中,我们不需要每次都处理onError()和onComplete(),我们只需要处理onNext(),它只是一个简单的消费者。
Observable.just(1, 2, 3, 4, 5, 6) .subscribeOn(Schedulers.computation()) .doOnNext(integer -> println("Emitting item " + integer + " on: " + currentThread().getName())) .subscribe(integer -> println("Consuming item " + integer + " on: " + currentThread().getName()));在链中,将subscribeOn()放到什么位置其实无关紧要。它只会影响源observable并控制在哪个线程中生成条目。
在上面的样例中,读者可能会注意到map()和filter()操作符也会生成其他的observable,subscribeOn()放到了链的底部。但是,如果你运行下面的代码片段的话,就会发现它只会影响源observable。如果将observeOn()同时加入到链中,会更加清晰。即便我们将subscribeOn()放到observeOn()下面,它也只会影响源observable。
Observable.just(1, 2, 3, 4, 5, 6) .doOnNext(integer -> println("Emitting item " + integer + " on: " + currentThread().getName())) .map(integer -> integer * 3) .filter(integer -> integer % 2 == 0) .subscribeOn(Schedulers.computation()) .subscribe(integer -> println("Consuming item " + integer + " on: " + currentThread().getName()));