概述
首先了解 Schedulers 和 Scheduler 的概念
Scheduler 是负责执行任务的单元, 调度器,相当于线程控制器,RxJava 通过它来指定每一段代码应该运行在什么样的线程,而 Schedulers 是创建各种 Scheduler 的工厂,Schedulers.io()等等都返回的是 Scheduler。
我们可以使用使用 subscribeOn() 和 observeOn() 两个方法来对线程进行控制。
比如常用的 Observable 生成发射数据流,以及 Operators 加工数据流都是在后台线程中进行,而 Observer 在前台线程中接受并相应数据。
subscribeOn() 和 observeOn()
subscribeOn(): 指定 ObservableOnSubscribe#subscribe() 所发生的线程,即 Observable.OnSubscribe 被激活时所处的线程。或者叫做事件产生的线程,我把他叫做上游。
当调用多次 subscribeOn() 时,事件产生会运行在最早的一次调用声明的线程中。当然也不是说多次的调用是完全没效果的,后面会提到。
observeOn(): 指定 Observer 所运行在的线程。或者叫做事件消费的线程,我把他叫做下游。每次调用 observeOn() 都会发生线程切换,此次调用直到下次切换线程中间的过程中的操作运行在此次调用指定的线程中。
RxJava 提供了以下这些调度器:
Schedulers.single();
运行在一个单独的线程,顺序执行,先进先出
Schedulers.computation();
计算所使用的 Scheduler。这个计算指的是 CPU 密集型计算,即不会被 I/O 等操作限制性能的操作,例如图形的计算;
默认线程数等于处理器的数量。 不要执行阻塞、IO操作(IO操作请使用Schedulers.io())
Schedulers.io();
I/O 操作(读写文件、读写数据库、网络信息交互等)所使用的 Scheduler。行为模式和 newThread() 差不多,区别在于 io() 的内部实现是是用一个无数量上限的线程池,可以重用空闲的线程,因此多数情况下 io() 比 newThread() 更有效率。不要把计算工作放在 io() 中,可以避免创建不必要的线程
Schedulers.trampoline();
在当前线程立即执行任务,如果当前线程有任务在执行,则会将其暂停,等插入进来的任务执行完之后,再将未完成的任务接着执行
Schedulers.newThread();
总是启用新线程,并在新线程执行操作
Schedulers.from(@NonNull Executor executor);
使用指定的 Executor 作为调度器
AndroidSchedulers.mainThread();
它指定的操作将在 Android 主线程运行
示例
以下示例部分参考:https://www.jianshu.com/p/12638513424f
自己再敲一遍,有助于更深层的理解
示例1 使用一次subscribeOn和一次observeOn:
1 2 3 4 5 6 7 8 9 10 11 12
| Observable .create((ObservableOnSubscribe<Integer>) emitter -> { for (int i =1;i<3;i++){ System.out.println("发射线程:"+Thread.currentThread().getName()+"---->"+"发射:"+i); Thread.sleep(1000); emitter.onNext(i); } emitter.onComplete(); }) .subscribeOn(Schedulers.io()) .observeOn(AndroidSchedulers.mainThread()) .subscribe(integer -> System.out.println("接收线程:"+Thread.currentThread().getName()+"---->"+"接收:"+integer)));
|
打印日志:
1 2 3 4
| 发射线程:RxCachedThreadScheduler-1---->发射:1 发射线程:RxCachedThreadScheduler-1---->发射:2 接收线程:main---->接收:1 接收线程:main---->接收:2
|
可以看出 上游数据发射在 RxCachedThreadScheduler-1 中执行,也就是IO线程中执行,下游接收是在主线程main中
示例2 使用两次 subscribeOn 和一次 observeOn:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18
| Observable .create((ObservableOnSubscribe<Integer>) emitter -> { for (int i =1;i<3;i++){ System.out.println("发射线程:"+Thread.currentThread().getName()+"---->"+"发射:"+i); Thread.sleep(1000); emitter.onNext(i); } emitter.onComplete(); }) .subscribeOn(Schedulers.io()) .map(integer -> { System.out.println("处理线程:"+Thread.currentThread().getName()+"---->"+"处理:"+integer); return integer; }) .subscribeOn(Schedulers.newThread()) .observeOn(AndroidSchedulers.mainThread()) .subscribe(integer -> System.out.println("接收线程:"+Thread.currentThread().getName()+"---->"+"接收:"+integer));
|
打印日志:
1 2 3 4 5 6
| 发射线程:RxCachedThreadScheduler-1---->发射:1 处理线程:RxCachedThreadScheduler-1---->处理:1 接收线程:main---->接收:1 发射线程:RxCachedThreadScheduler-1---->发射:2 处理线程:RxCachedThreadScheduler-1---->处理:2 接收线程:main---->接收:2
|
可以看出 数据的接收是在主线程(main)中,但是发射和处理都在 RxCachedThreadScheduler-1 中执行,也就是IO线程中执行,而我们设置的 subscribeOn(Schedulers.newThread() 没起作用
示例3使用一次 subscribeOn 和两次 observeOn:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17
| Observable .create((ObservableOnSubscribe<Integer>) emitter -> { for (int i =1;i<3;i++){ System.out.println("发射线程:"+Thread.currentThread().getName()+"---->"+"发射:"+i); Thread.sleep(1000); emitter.onNext(i); } emitter.onComplete(); }) .subscribeOn(Schedulers.io()) .observeOn(Schedulers.newThread()) .map(integer -> { System.out.println("处理线程:"+Thread.currentThread().getName()+"---->"+"处理:"+integer); return integer; }) .observeOn(AndroidSchedulers.mainThread()) .subscribe(integer -> System.out.println("接收线程:"+Thread.currentThread().getName()+"---->"+"接收:"+integer)));
|
打印日志:
1 2 3 4 5 6
| 发射线程:RxCachedThreadScheduler-1---->发射:1 发射线程:RxCachedThreadScheduler-1---->发射:2 处理线程:RxNewThreadScheduler-1---->处理:1 接收线程:main---->接收:1 处理线程:RxNewThreadScheduler-1---->处理:2 接收线程:main---->接收:2
|
可以看出发射线程在 RxCachedThreadScheduler-1 (IO线程) 中,数据的处理在 RxNewThreadScheduler-1 (也就是Schedulers.newThread())中,数据的接收在 主线程main中。说明 Observeon 每次设置都会起作用
示例4:
doOnSubscribe()/onSubscribe(),doOnNext()/onNext(),doOnComplete()/onComplete(),doOnError()/onError()
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44
| System.out.println("当前线程---->" + Thread.currentThread().getName()); mDisposables.add(Observable .create(new ObservableOnSubscribe<Integer>() { @Override public void subscribe(@NonNull ObservableEmitter<Integer> e) throws Exception { System.out.println("Observable线程---->" + Thread.currentThread().getName()); e.onNext(10); e.onComplete(); } }) .subscribeOn(Schedulers.computation()) .observeOn(Schedulers.newThread()) .doOnSubscribe(new Consumer<Disposable>() { @Override public void accept(@NonNull Disposable disposable) throws Exception { System.out.println("doOnSubscribe线程---->" + Thread.currentThread().getName()); } }) .observeOn(Schedulers.newThread()) .doOnNext(new Consumer<Integer>() { @Override public void accept(@NonNull Integer integer) throws Exception { System.out.println("doOnNext线程---->" + Thread.currentThread().getName()); } }) .observeOn(Schedulers.newThread()) .doOnComplete(new Action() { @Override public void run() throws Exception { System.out.println("doOnComplete线程---->" + Thread.currentThread().getName()); } }) .observeOn(Schedulers.newThread()) .subscribe(new Consumer<Integer>() { @Override public void accept(@NonNull Integer integer) throws Exception { System.out.println("subscribe线程---->" + Thread.currentThread().getName()); } }));
|
打印日志:
1 2 3 4 5 6
| 当前线程---->main doOnSubscribe线程---->main Observable线程---->RxComputationThreadPool-1 doOnNext线程---->RxNewThreadScheduler-2 doOnComplete线程---->RxNewThreadScheduler-3 subscribe线程---->RxNewThreadScheduler-4
|
可以看出我们预测2/3/4点都正确,doOnNext()、doOnComplete()、subscribe都在指定的newThread中,但是预测1却在主线程,说明 observeOn() 不能切换doOnSubscribe()。
我们再做个这样的测试,把整段代码放在新的 Thread 中执行
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32
| Thread thread = new Thread(){ @Override public void run() { System.out.println("当前线程---->" + Thread.currentThread().getName()); mDisposables.add(Observable .create(new ObservableOnSubscribe<Integer>() { @Override public void subscribe(@NonNull ObservableEmitter<Integer> e) throws Exception { System.out.println("Observable线程---->" + Thread.currentThread().getName()); e.onNext(10); e.onComplete(); } }) .subscribeOn(Schedulers.computation()) .doOnSubscribe(new Consumer<Disposable>() { @Override public void accept(@NonNull Disposable disposable) throws Exception { System.out.println("doOnSubscribe线程---->" + Thread.currentThread().getName()); } }) .observeOn(Schedulers.newThread()) .subscribe(new Consumer<Integer>() { @Override public void accept(@NonNull Integer integer) throws Exception { System.out.println("subscribe线程---->" + Thread.currentThread().getName()); } })); } };
thread.start();
|
输入日志:
1 2 3 4
| 当前线程---->Thread-282 doOnSubscribe线程---->Thread-282 Observable线程---->RxComputationThreadPool-1 subscribe线程---->RxNewThreadScheduler-1
|
看来doOnSubscribe比较特殊,它默认运行在执行该段代码的线程中,我们再用subscribeOn() 切换了线程试试:
1 2 3 4 5 6 7 8 9 10
|
.doOnSubscribe(new Consumer<Disposable>() { @Override public void accept(@NonNull Disposable disposable) throws Exception { System.out.println("doOnSubscribe线程---->" + Thread.currentThread().getName()); } })
.subscribeOn(Schedulers.io())
|
输入日志:
1 2 3 4 5 6
| 当前线程---->main doOnSubscribe线程---->RxCachedThreadScheduler-1 Observable线程---->RxComputationThreadPool-1 doOnNext线程---->RxNewThreadScheduler-1 doOnComplete线程---->RxNewThreadScheduler-2 subscribe线程---->RxNewThreadScheduler-3
|
我们看到用 subscribeOn(Schedulers.io()) 切换到了 IO 线程
对于 doOnNext()/onNext(),doOnComplete()/onComplete(),doOnError()/onError() 几个方法 (前者是 被观察者 调用的方法,后者是 观察者 接口里面的对应方法) ,可以通过 observeOn() 进行线程的切换。
而对于 doOnSubscribe()/onSubscribe() 方法 (前者是 被观察者 调用的方法,后者是 观察者 接口里面的对应方法) 来说,如果他后面有调用 subscribeOn() 切换线程,那么它运行在切换的线程,否则他默认运行在执行该段代码的线程中
示例5 Schedulers.trampoline():
通过上面示例我们可以发现,Observer处理数据相比于Observable发射的数据存在滞后的现象,Observable发射了两个数据,Observer才处理了一个,并不是Observable每发射一个,Observer就处理一个。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17
| Observable .create((ObservableOnSubscribe<Integer>) emitter -> { for (int i = 1; i < 3; i++) { System.out.println("发射线程:" + Thread.currentThread().getName() + "---->" + "发射:" + i); Thread.sleep(1000); emitter.onNext(i); } emitter.onComplete(); }) .subscribeOn(Schedulers.io()) .observeOn(Schedulers.trampoline()) .subscribe(integer -> { Thread.sleep(2000); System.out.println("接收线程:" + Thread.currentThread().getName() + "---->" + "接收:" + integer); })
|
打印日志:
1 2 3 4
| 发射线程:RxCachedThreadScheduler-1---->发射:1 接收线程:RxCachedThreadScheduler-1---->接收:1 发射线程:RxCachedThreadScheduler-1---->发射:2 接收线程:RxCachedThreadScheduler-1---->接收:2
|
可以看到日志是按 发射->处理->接收 的顺序打印的,说明 Observable 在 Observer 将数据处理完之后才开始发射下一条。Schedulers.trampoline() 的作用 在当前线程(当前线程是IO线程)立即执行任务,如果当前线程有任务在执行,则会将其暂停,等插入进来的任务执行完之后,再将未完成的任务接着执行。
我们这样改下:
1 2 3 4
| // .subscribeOn(Schedulers.io()) // .observeOn(Schedulers.trampoline()) .subscribeOn(Schedulers.trampoline()) .observeOn(Schedulers.trampoline())
|
打印日志:
1 2 3 4
| 发射线程:main---->发射:1 接收线程:main---->接收:1 发射线程:main---->发射:2 接收线程:main---->接收:2
|
我们都注释掉:
1 2 3 4
| // .subscribeOn(Schedulers.io()) // .observeOn(Schedulers.trampoline()) // .subscribeOn(Schedulers.trampoline()) // .observeOn(Schedulers.trampoline())
|
打印日志:
1 2 3 4
| 发射线程:main---->发射:1 接收线程:main---->接收:1 发射线程:main---->发射:2 接收线程:main---->接收:2
|
和上面一样
说明 Schedulers.trampoline() 是作用当前主线程main上,相当于不指定线程。
示例6 Schedulers.single():
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19
| Observable .create((ObservableOnSubscribe<Integer>) emitter -> { for (int i = 1; i < 3; i++) { System.out.println("发射线程:" + Thread.currentThread().getName() + "---->" + "发射:" + i); Thread.sleep(1000); emitter.onNext(i); } emitter.onComplete(); }) .subscribeOn(Schedulers.single()) .observeOn(Schedulers.single()) .map(integer -> { System.out.println("处理线程:" + Thread.currentThread().getName() + "---->" + "处理:" + integer); return integer; }) .observeOn(Schedulers.single()) .subscribe(integer -> { System.out.println("接收线程:" + Thread.currentThread().getName() + "---->" + "接收:" + integer); })
|
打印日志:
1 2 3 4 5 6
| 发射线程:RxSingleScheduler-1---->发射:1 发射线程:RxSingleScheduler-1---->发射:2 处理线程:RxSingleScheduler-1---->处理:1 处理线程:RxSingleScheduler-1---->处理:2 接收线程:RxSingleScheduler-1---->接收:1 接收线程:RxSingleScheduler-1---->接收:2
|
通过Schedulers.single()将数据的发射,处理,接收在Schedulers.single()的线程单例中排队执行,当此线程中有任务执行时,其他任务将会按照先进先出的顺序依次执行。
下面举个完整的示例,需求是这样的:
(1)在io线程请求服务器数据;
(2)请求之前需要在主线程显示 Loading dialog;
(3)数据返回后在 computation() 处理;
(4)处理完成后在 io() 存入数据库;
(5)主线程 更新UI;
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47
| Observable .create(new ObservableOnSubscribe<List<String>>() { @Override public void subscribe(ObservableEmitter<List<String>> emitter) throws Exception { System.out.println("IO线程---->发起网络请求,请求服务器数据"); List<String> list = new ArrayList<>(); for (int i=0;i<10;i++){ list.add("数据"+i); } emitter.onNext(list); emitter.onComplete(); } }) .subscribeOn(Schedulers.io()) .doOnSubscribe(new Consumer<Disposable>() { @Override public void accept(Disposable disposable) throws Exception { System.out.println("UI主线程---->显示 Loading dialog"); } }) .subscribeOn(AndroidSchedulers.mainThread()) .observeOn(Schedulers.computation()) .filter(new Predicate<List<String>>() { @Override public boolean test(List<String> list) throws Exception { System.out.println("计算线程处理---->处理数据"); return true; } }) .observeOn(Schedulers.io()) .doOnNext(new Consumer<List<String>>() { @Override public void accept(List<String> filterList) throws Exception { System.out.println("IO线程---->存入数据库"); } }) .observeOn(AndroidSchedulers.mainThread()) .subscribe(new Consumer<List<String>>() { @Override public void accept(List<String> list) throws Exception { System.out.println("UI主线程---->显示数据更新UI"); } })
|
打印日志:
1 2 3 4 5
| UI主线程 IO线程 计算线程处理 IO线程 UI主线程
|
通过以上示例,我们可以总结subscribeOn和observeOn的用法如下:
subscribeOn 用来声明上游事件发送时的所在线程,若多次设定,上游会运行在 最早 的一次调用声明的线程中。
observeOn 指定下游操作所在的线程。若多次设定,每次均起作用。如若不指定下游线程,则默认在上游所在的线程中