0%

Rxjava2 - just示例和源码解析

just示例和源码解析

just()示例

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
Observable.just("文章1", "文章2")
.subscribe(new Observer<String>() {
@Override
public void onSubscribe(Disposable d) {
Log.d(TAG, " onSubscribe : " + d.isDisposed());
}
@Override
public void onNext(String value) {
Log.d(TAG, " onNext : " + value);
}
@Override
public void onError(Throwable e) {
Log.d(TAG, " onError : " + e.getMessage());
}
@Override
public void onComplete() {
Log.d(TAG, " onComplete");
}
})

Observable 的just()有10个重载方法,参数1~10个

1
2
3
public static <T> Observable<T> just(T item)
......
public static <T> Observable<T> just(T item1, T item2, T item3, T item4, T item5, T item6, T item7, T item8, T item9, T item10)

同样的我们从以下几个方面分析源码:

  • 被观察者Observable是何时创建
  • 观察者Observer何时创建(很简单就一个接口,不多介绍)
  • 被观察者与观察者如何建立subscribe订阅关系的

一个参数的 just()

1. 被观察者Observable是何时创建
1
2
3
4
5
 public static <T> Observable<T> just(T item) {
ObjectHelper.requireNonNull(item, "The item is null");
//创建ObservableJust对象,封装成被观察者Observable
return RxJavaPlugins.onAssembly(new ObservableJust<T>(item));
}

创建了 ObservableJust 对象,调用 RxJavaPlugins.onAssembly 返回了被观察者 Observable.

1
2
3
4
5
6
7
public static <T> Observable<T> onAssembly(@NonNull Observable<T> source) {
Function<? super Observable, ? extends Observable> f = onObservableAssembly;
if (f != null) {
return apply(f, source);
}
return source;
}

我们看下 ObservableJust 类,同样的也是继承 Observable。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
public final class ObservableJust<T> extends Observable<T> implements ScalarCallable<T> {

private final T value;
public ObservableJust(final T value) {
this.value = value;
}
//这个方法很重要, 等会会说到
@Override
protected void subscribeActual(Observer<? super T> observer) {
ScalarDisposable<T> sd = new ScalarDisposable<T>(observer, value);
observer.onSubscribe(sd);
sd.run();
}

@Override
public T call() {
return value;
}
}

这时候被观察者已经创建完成了,它就是 Observable 的子类 ObservableJust,我们在用 Observable.just() 的时候其实被观察者是 ObservableJust。

2. Observable 和 Observer 如何建立 subscribe 订阅关系的
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
public final void subscribe(Observer<? super T> observer) {
try {
observer = RxJavaPlugins.onSubscribe(this, observer);

subscribeActual(observer);
} catch (NullPointerException e) { // NOPMD
throw e;
} catch (Throwable e) {
Exceptions.throwIfFatal(e);
RxJavaPlugins.onError(e);

npe.initCause(e);
throw npe;
}
}

这里的 Subscribe() 方法和 Observable.Create() 调的 Subscribe() 一样,我们知道 Subscribeactual() 这个方法是抽象方法,那它具体实现是不是和 Observable.Create() 一样也是 Observablecreate 类里呢?不是,不是,不是,重要的话说三遍哦,它的具体实现在我们看到 Observablejust 中。

那么我们回到 ObservableJust 中看下subscribeActual(Observer<? super T> observer)

1
2
3
4
5
6
7
8
protected void subscribeActual(Observer<? super T> observer) {
//创建了一个线程
ScalarDisposable<T> sd = new ScalarDisposable<T>(observer, value);
//设置observer的回调方法onSubscribe
observer.onSubscribe(sd);
//执行线程
sd.run();
}

这里面创建了一个线程(Runnable),他就是 ScalarDisposable ,ScalarDisposable实现Runnable,把我们创建的观察者Observer 和参数value (Observable.just(“文章1”) 这里的‘文章1’) 作为构造方法的参数传进去了,

同时他也是Disposable的子类,所以 observer.onSubscribe(sd); 这行就很好理解了,就是设置了观察者的onSubscribe方法的回调,所以观察者onSubscribe()是在订阅时被调用,也就是在事件执行之前调用。

1
2
3
4
5
6
7
Observer<String> observer = new Observer<String>() {
@Override
public void onSubscribe(Disposable d) {
Log.d(TAG, "onSubscribe");
}

......

接下来看下 ScalarDisposable 类

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
public static final class ScalarDisposable<T> extends AtomicInteger  implements QueueDisposable<T>, Runnable {

private static final long serialVersionUID = 3880992722410194083L;
//我们创建的观察者
final Observer<? super T> observer;
//我们在just中传递的参数(文章1)
final T value;

static final int START = 0;
static final int FUSED = 1;
static final int ON_NEXT = 2;
static final int ON_COMPLETE = 3;

public ScalarDisposable(Observer<? super T> observer, T value) {
this.observer = observer;
this.value = value;
}


@Override
public void run() {
if (get() == START && compareAndSet(START, ON_NEXT)) {
observer.onNext(value);
if (get() == ON_NEXT) {
lazySet(ON_COMPLETE);
observer.onComplete();
}
}
}
}

我们在subscribeActual()方法中看到最后执行了 sd.run(); 所以我们只需看 public void run() 这个方法,
run()方法首先执行了 observer.onNext(value), 也就是说我们在创建Observable时传的参数此时发送给observer,
然后在执行observer.onComplete()。

这样,Observable.just() 一个参数的方法就结束了

多个参数的 just()

同样的我们从以下几个方面分析源码:

1. 被观察者Observable是何时创建

我们下看源码:

1
2
3
4
5
6
public static <T> Observable<T> just(T item1, T item2) {
ObjectHelper.requireNonNull(item1, "The first item is null");
ObjectHelper.requireNonNull(item2, "The second item is null");

return fromArray(item1, item2);
}

很简单,在往下看 fromArray()

1
2
3
4
5
6
7
8
9
10
public static <T> Observable<T> fromArray(T... items) {
ObjectHelper.requireNonNull(items, "items is null");
if (items.length == 0) {
return empty();
} else
if (items.length == 1) {
return just(items[0]);
}
return RxJavaPlugins.onAssembly(new ObservableFromArray<T>(items));
}

我们看到fromArray()参数是 可变长度参数,也就是说参数可以为1个,当然了1个时直接调用Observable.just(“文章1”) 一个参数的just()。最后返回一个可变长度参数 items 构造的 ObservableFromArray 对象,他也继承了Observable,也就是说我们创建的被观察着就是 ObservableFromArray 对象。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
public final class ObservableFromArray<T> extends Observable<T> {
final T[] array;
public ObservableFromArray(T[] array) {
this.array = array;
}

@Override
public void subscribeActual(Observer<? super T> observer) {
FromArrayDisposable<T> d = new FromArrayDisposable<T>(observer, array);

observer.onSubscribe(d);

if (d.fusionMode) {
return;
}

d.run();
}
......
2. 接下来我们看下subscribe方法

同样是

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
public final void subscribe(Observer<? super T> observer) {
try {
observer = RxJavaPlugins.onSubscribe(this, observer);

subscribeActual(observer);
} catch (NullPointerException e) { // NOPMD
throw e;
} catch (Throwable e) {
Exceptions.throwIfFatal(e);
RxJavaPlugins.onError(e);

npe.initCause(e);
throw npe;
}
}

但是抽象方法subscribeActual()在ObservableFromArray中执行
在看ObservableFromArray的subscribeActual()这个方法

1
2
3
4
5
6
7
8
9
10
11
12
@Override
public void subscribeActual(Observer<? super T> observer) {
FromArrayDisposable<T> d = new FromArrayDisposable<T>(observer, array);

observer.onSubscribe(d);

if (d.fusionMode) {
return;
}

d.run();
}

此时创建了FromArrayDisposable对象,参数是我们创建的观察者observer,和传递的可变长度的参数array,然后执行Observer中的onSubscribe()方法。最后执行了FromArrayDisposable的run()方法(注意他不是线程的run()方法)。

我们在看FromArrayDisposable的run()方法

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
static final class FromArrayDisposable<T> extends BasicQueueDisposable<T> {

final Observer<? super T> downstream;

final T[] array;

FromArrayDisposable(Observer<? super T> actual, T[] array) {
this.downstream = actual;
this.array = array;
}

void run() {
T[] a = array;
int n = a.length;

for (int i = 0; i < n && !isDisposed(); i++) {
T value = a[i];
if (value == null) {
downstream.onError(new NullPointerException("The " + i + "th element is null"));
return;
}
downstream.onNext(value);
}
if (!isDisposed()) {
downstream.onComplete();
}
}
}

变量downstream 就是我们创建的观察者 Observer ,array 就是我们传递的可变长度的那一串数组

run()方法中遍历array,然后执行回调 downstream.onNext(value) ,最后在执行回调 downstream.onComplete()。

根据上面的分析,我们得出如下规则:

1、通过 just() 方式 直接触发 onNext()

2、just 传进去什么,在onNext() 接收什么,如果我们传入 List,同样的在 onNext() 接收的也是 List,而不是 List 的 Item

3、onNext() 中接收数据的顺序是根据 just 传入的顺序确认的,使用 just 不允许传递 null,否则会出现异常