0%

Rxjava2 - Observable示例和源码解析

Observable示例和源码解析

示例

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
//1、创建一个被观察着(公众号),定义要发送的事件
Observable<String> observable = Observable.create(new ObservableOnSubscribe<String>() {
@Override
public void subscribe(ObservableEmitter<String> emitter) throws Exception {
emitter.onNext("文章1");
emitter.onNext("文章2");
emitter.onNext("文章3");
emitter.onComplete();
}
});
//2、创建一个观察着(用户),接收事件并作出响应操作
Observer<String> observer = new Observer<String>() {
@Override
public void onSubscribe(Disposable d) {
Log.d(TAG, "onSubscribe");
}

@Override
public void onNext(String str) {
Log.d(TAG, "onNext : "+str);
}

@Override
public void onError(Throwable e) {
}

@Override
public void onComplete() {
Log.d(TAG, "onComplete");
}
};
//建立用户和公众号的订阅关系
observable.subscribe(observer);

其实RxJava的核心思想就是观察者模式,只要理解这个,其实RxJava也不难。说白了就是要有观察者和被观察着,然后建立观察者和被观察者之间的关系。

  • 被观察者Observable(公众号)何时创建?
  • 观察者Observer(用户)何时创建?
  • 被观察者与观察者如何subscribe(用户关注了公众号)订阅?
    1. 被观察者Observable类

被观察者Observable为抽象类 实现 ObservableSource接口

1
2
3
4
5
6
7
8
public abstract class Observable<T> implements ObservableSource<T> {
...
public static <T> Observable<T> create(ObservableOnSubscribe<T> source) {
ObjectHelper.requireNonNull(source, "source is null");
return RxJavaPlugins.onAssembly(new ObservableCreate<T>(source));
}
...
}

很简单,create方法里面就两行代码,先去判断source的是否为空,然后再去调用RxJavaPlugins.onAssembly(new ObservableCreate(source)),先 new 了 ObservableCreate 类,该类继承了 Observable,然后通过 RxJavaPlugins.onAssembly 方法返回 Observable 对象。

这样 Observable 就创建完成了,其实是创建了 Observable 的子类 ObservableCreate 对象,也就是真实的被观察着对象

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
public final class ObservableCreate<T> extends Observable<T> {
final ObservableOnSubscribe<T> source;
public ObservableCreate(ObservableOnSubscribe<T> source) {
this.source = source;
}
//这个方法很重要,Observable 的 subscribe 其实就执行的这个方法
@Override
protected void subscribeActual(Observer<? super T> observer) {
CreateEmitter<T> parent = new CreateEmitter<T>(observer);
observer.onSubscribe(parent);

try {
source.subscribe(parent);
} catch (Throwable ex) {
Exceptions.throwIfFatal(ex);
parent.onError(ex);
}
}
}
2. 观察者 Observer
1
2
3
4
5
6
7
8
9
10
public interface Observer<T> {
//订阅时回调
void onSubscribe(@NonNull Disposable d);
//成功回调
void onNext(@NonNull T t);
//错误回调
void onError(@NonNull Throwable e);
//完成时回调
void onComplete();
}

这个类十分简单,是标准的函数式接口

3. 被观察者与观察者如何subscribe(用户关注了公众号)订阅

observable.subscribe(observer);

1
2
3
4
5
6
7
8
public final void subscribe(Observer<? super T> observer) {


observer = RxJavaPlugins.onSubscribe(this, observer);

subscribeActual(observer);

}

RxJavaPlugins.onSubscribe 此时直接返回Observer观察者对象,最后执行的是 subscribeActual() 方法,我们点进去看看

protected abstract void subscribeActual(Observer<? super T> observer);

是个抽象方法,我们来看看 Observable 子类 ObservableCreate 里面是不是有 subscribeActual(Observer<? super T> observer),又回到了 ObservableCreate 类。

1
2
3
4
5
6
7
8
9
10
11
12
@Override
protected void subscribeActual(Observer<? super T> observer) {
CreateEmitter<T> parent = new CreateEmitter<T>(observer);
observer.onSubscribe(parent);

try {
source.subscribe(parent);
} catch (Throwable ex) {
Exceptions.throwIfFatal(ex);
parent.onError(ex);
}
}

该方法的参数就是我们创建的观察者 Observer,这里创建了 CreateEmitter 对象,把我们的观察者 Observer 传到 CreateEmitter 的构造方法中。
CreateEmitter 类继承了 Disposable 接口:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
static final class CreateEmitter<T>  extends AtomicReference<Disposable> implements ObservableEmitter<T>, Disposable {

final Observer<? super T> observer;

CreateEmitter(Observer<? super T> observer) {
this.observer = observer;
}
//最终的回调就是我们创建的观察者 Observer 的 onNext()
@Override
public void onNext(T t) {
if (!isDisposed()) {
observer.onNext(t);
}
}

@Override
public void onError(Throwable t) {
if (!tryOnError(t)) {
RxJavaPlugins.onError(t);
}
}
//最终的回调就是我们创建的观察者 Observer 的 onError()
@Override
public boolean tryOnError(Throwable t) {
if (!isDisposed()) {
try {
observer.onError(t);
} finally {
dispose();
}
return true;
}
return false;
}
//最终的回调就是我们创建的观察者 Observer 的 onComplete()
@Override
public void onComplete() {
if (!isDisposed()) {
try {
observer.onComplete();
} finally {
dispose();
}
}
}
......
}

是不是看到了我们在创建观察者 Observer 时的几个回调方法了,

接下来observer.onSubscribe(parent);这行代码,其实就是我们创建观察者 Observer 时的

public void onSubscribe(Disposable d) 回调

1
2
3
4
5
6
7
Observer<String> observer = new Observer<String>() {
@Override
public void onSubscribe(Disposable d) {
Log.d(TAG, "onSubscribe");
}

......

的参数 Disposable d 就是 CreateEmitter 对象

再看 source.subscribe(parent); 这行代码,这个source就是我们创建被观察者 Observable(其实是它的子类 ObservableCreate)时 new 的 ObservableOnSubscribe,它只有一个 subscribe 方法,执行完这行代码,被观察者与观察者就订阅关系。

那么当我们在执行

1
2
3
4
emitter.onNext("文章1");
emitter.onNext("文章2");
emitter.onNext("文章3");
emitter.onComplete();

这几行代码的时候,也就是被观察者 Observable 通过CreateEmitter发送事件时,观察者 Observer 就会走相应的回调方法,
当此执行完 onComplete() 观察者收到完成回调,整个订阅流程就完成了。

Observable.subscribe()除了接收Observer参数外,还可以接收Consumer参数

示例

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
Observable.create(new ObservableOnSubscribe<String>() {
@Override
public void subscribe(ObservableEmitter<String> emitter) throws Exception {
emitter.onNext("文章1");
emitter.onNext("文章2");
emitter.onNext("文章3");
emitter.onComplete();
}
}).subscribe(new Consumer<String>() {
@Override
public void accept(String s) throws Exception {
Log.d(TAG, " Consumer : onNext : " + s);
}
}, new Consumer<Throwable>() {
@Override
public void accept(Throwable throwable) throws Exception {
Log.d(TAG, " Consumer : onError");
}
}, new Action() {
@Override
public void run() throws Exception {
Log.d(TAG, " Action : onComplete");
}
}, new Consumer<Disposable>() {
@Override
public void accept(Disposable disposable) throws Exception {
Log.d(TAG, " Consumer : onSubscribe");
}
})

这里我们看到传入的是Consumer,我们来看看这个类:

1
2
3
public interface Consumer<T> {
void accept(T t) throws Exception;
}

很简单,就是个普通的接口,里面只有一个accept方法

当使用Consumer作为subscribe()的参数时,最多可以接收4个回调参数,而且执行结果和subscribe(observer)的一样。
接下来我们看下Observable的subscribe()方法

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
public final Disposable subscribe() {
return subscribe(Functions.emptyConsumer(), Functions.ON_ERROR_MISSING, Functions.EMPTY_ACTION, Functions.emptyConsumer());
}

public final Disposable subscribe(Consumer<? super T> onNext) {
return subscribe(onNext, Functions.ON_ERROR_MISSING, Functions.EMPTY_ACTION, Functions.emptyConsumer());
}

public final Disposable subscribe(Consumer<? super T> onNext, Consumer<? super Throwable> onError) {
return subscribe(onNext, onError, Functions.EMPTY_ACTION, Functions.emptyConsumer());
}

public final Disposable subscribe(Consumer<? super T> onNext, Consumer<? super Throwable> onError,
Action onComplete) {
return subscribe(onNext, onError, onComplete, Functions.emptyConsumer());
}

public final Disposable subscribe(Consumer<? super T> onNext, Consumer<? super Throwable> onError,
Action onComplete, Consumer<? super Disposable> onSubscribe) {
ObjectHelper.requireNonNull(onNext, "onNext is null");
ObjectHelper.requireNonNull(onError, "onError is null");
ObjectHelper.requireNonNull(onComplete, "onComplete is null");
ObjectHelper.requireNonNull(onSubscribe, "onSubscribe is null");

LambdaObserver<T> ls = new LambdaObserver<T>(onNext, onError, onComplete, onSubscribe);

subscribe(ls);

return ls;
}

有五个重载方法,参数个数0到4个,参数名和接口Observer的方法名一样,从而我们可以猜测处理的事情应该和Observer每个方法的一样。我们看到 当参数少于4个时,就传入空的回调(不是null),那么我们直接看4个参数的subscribe()。

先分别判断参数是否为空,然后通过这个4个参数构造一个 LambdaObserver 对象,最后调用subscribe(ls),这个subscribe(ls)又是什么呢?我们点进去看看:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
public final void subscribe(Observer<? super T> observer) {
try {
observer = RxJavaPlugins.onSubscribe(this, observer);

subscribeActual(observer);
} catch (NullPointerException e) {
throw e;
} catch (Throwable e) {
Exceptions.throwIfFatal(e);
RxJavaPlugins.onError(e);

npe.initCause(e);
throw npe;
}
}

嗯?似曾相识,对,就是我们认识的那个subscribe,参数就是上面创建的 LambdaObserver 对象,LambdaObserver继承了接口Observer,然后在传入 subscribeActual(observer) ,接下来的流程就和上面一样了。

我们在看 LambdaObserver 类

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
public final class LambdaObserver<T> extends AtomicReference<Disposable>
implements Observer<T>, Disposable, LambdaConsumerIntrospection {

private static final long serialVersionUID = -7251123623727029452L;
final Consumer<? super T> onNext;
final Consumer<? super Throwable> onError;
final Action onComplete;
final Consumer<? super Disposable> onSubscribe;

public LambdaObserver(Consumer<? super T> onNext, Consumer<? super Throwable> onError,
Action onComplete,
Consumer<? super Disposable> onSubscribe) {
super();
this.onNext = onNext;
this.onError = onError;
this.onComplete = onComplete;
this.onSubscribe = onSubscribe;
}
//调用观察者的 onSubscribe()
@Override
public void onSubscribe(Disposable d) {
if (DisposableHelper.setOnce(this, d)) {
try {
onSubscribe.accept(this);
} catch (Throwable ex) {
Exceptions.throwIfFatal(ex);
d.dispose();
onError(ex);
}
}
}
//调用观察者的 onNext()
@Override
public void onNext(T t) {
if (!isDisposed()) {
try {
onNext.accept(t);
} catch (Throwable e) {
Exceptions.throwIfFatal(e);
get().dispose();
onError(e);
}
}
}
//调用观察者的 onError()
@Override
public void onError(Throwable t) {
if (!isDisposed()) {
lazySet(DisposableHelper.DISPOSED);
try {
onError.accept(t);
} catch (Throwable e) {
Exceptions.throwIfFatal(e);
RxJavaPlugins.onError(new CompositeException(t, e));
}
} else {
RxJavaPlugins.onError(t);
}
}

//调用观察者的 onComplete()
@Override
public void onComplete() {
if (!isDisposed()) {
lazySet(DisposableHelper.DISPOSED);
try {
onComplete.run();
} catch (Throwable e) {
Exceptions.throwIfFatal(e);
RxJavaPlugins.onError(e);
}
}
}

}

构造方法里面就是我们传进去的四个参数,而且有四个方法我们似乎也很熟悉,对,他就是接口 Observer 的实现方法。这四个方法中分别执行了四个回调

onSubscribe.accept(Disposable);

onNext.accept(T);

onError.accept(Throwable);

onComplete.run();

也就是subscribe时传入的回调。