0%

Rxjava2 - defer示例和详细解析

defer示例和详细解析

defer

示例
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
/**
* 创建被观察者
* @return ObservableSource
*/
private ObservableSource<? extends Integer> getSource() {
return Observable.create(new ObservableOnSubscribe<Integer>() {
@Override
public void subscribe(ObservableEmitter<Integer> emitter) throws Exception {
System.out.println("发射----->" + Thread.currentThread().getName() + "--:" + 1);
emitter.onNext(1);
emitter.onComplete();
}
});
}

/**
* 创建观察者
* @return Observer
*/
private Observer<? super Integer> getObserver() {
return new Observer<Integer>() {
@Override
public void onSubscribe(Disposable d) {
System.out.println("onSubscribe");
}

@Override
public void onNext(Integer integer) {
System.out.println("接收----->" + integer);
}

@Override
public void onError(Throwable e) {

}

@Override
public void onComplete() {
System.out.println("onComplete");
}
};
}
public void testDefer() {
Observable<Integer> source = Observable
.defer(new Callable<ObservableSource<? extends Integer>>() {
@Override
public ObservableSource<? extends Integer> call() throws Exception {
return getSource();
}
});
source
.subscribeOn(Schedulers.io())
.observeOn(AndroidSchedulers.mainThread())
.subscribe(getObserver());

source
.subscribeOn(Schedulers.io())
.observeOn(AndroidSchedulers.mainThread())
.subscribe(getObserver());
}
1
2
3
4
5
6
7
8
System.out: onSubscribe   //先订阅
System.out: onSubscribe
System.out: 发射----->RxCachedThreadScheduler-1--:1 //再发射数据
System.out: 发射----->RxCachedThreadScheduler-2--:1
System.out: 接收----->1
System.out: onComplete
System.out: 接收----->1
System.out: onComplete
分析

根据日志可以看出,直到有观察者订阅时才创建 Observable ,并且为每个观察者创建一个新的 Observable 。

Defer操作符会一直等待直到有观察者订阅它,然后它使用Observable工厂方法生成一个Observable。它对每个观察者都这样做,因此尽管每个订阅者都以为自己订阅的是同一个Observable,事实上每个订阅者获取的是它们自己的单独的数据序列。

我们点进去看下 defer , 其实也返回了一个 Observable 子类的对象 ObservableDefer

那么接下来我们之间看 subscribeActual 方法的实现:

1
2
3
4
5
6
7
8
9
10
11
12
13
public void subscribeActual(Observer<? super T> observer) {
ObservableSource<? extends T> pub;
try {
//返回我们通过 getSourse() 创建的 被观察者,
pub = ObjectHelper.requireNonNull(supplier.call(), "null ObservableSource supplied");
} catch (Throwable t) {
Exceptions.throwIfFatal(t);
EmptyDisposable.error(t, observer);
return;
}

pub.subscribe(observer);
}

我们每次执行 source.subscribe(getObserver()) ;就会调用一次 subscribeActual方法,也就会通过 upplier.call() 返回我们新创建的被观察者,也就验证了 “直到有观察者订阅时才创建 Observable ,并且为每个观察者创建一个新的 Observable”。

我们再看下 pub.subscribe(observer); 这一行代码,pub 就是我们 getSourse() 时通过 create 创建的 Observable,这里的 pub.subscribe() 其实是执行了 ObservableCreate 的 subscribeActual 方法,接下来的流程就和 create 的订阅过程 一样了。