0%

Rxjava2 - from示例和源码解析

from示例和源码解析

fromIterable():

遍历 Iterable,和 just() 方式一样直接触发 onNext(),然后返回每项数据

示例
1
2
3
4
5
6
7
8
9
10
11
12
13
List<Integer> list = new ArrayList<>();
list.add(0);
list.add(1);
list.add(2);
list.add(3);
mDisposables.add(Observable
.fromIterable(list)
.subscribe(new Consumer<Integer>() {
@Override
public void accept(Integer integer) throws Exception {
System.out.println("接收----->"+integer);
}
}));

输出日志:

1
2
3
4
接收----->0
接收----->1
接收----->2
接收----->3
分析

我们通过源码可以看到 fromIterable 通过 Iterable 构造了一个 ObservableFromIterable 然后返回。

1
2
3
4
5
public static <T> Observable<T> fromIterable(Iterable<? extends T> source) {
ObjectHelper.requireNonNull(source, "source is null");
// 通过 Iterable 构造了一个 ObservableFromIterable 返回
return RxJavaPlugins.onAssembly(new ObservableFromIterable<T>(source));
}

ObservableFromIterable 继承 Observable。

接下来我们回到订阅过程,其订阅过程前面的内容跟上一节分析的是一样的,就不重复了。还记得订阅过程中 Observable 类的 subscribeActual() 是个抽象方法吗?他的真正实现是在 ObservableFromIterable 中,所以我们来看下 subscribeActual 方法:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
public final class ObservableFromIterable<T> extends Observable<T> {
final Iterable<? extends T> source;
public ObservableFromIterable(Iterable<? extends T> source) {
this.source = source;
}

@Override
public void subscribeActual(Observer<? super T> observer) {

......
// 把我们创建的 Observer 和 传的 Iterable 包装成 FromIterableDisposable
FromIterableDisposable<T> d = new FromIterableDisposable<T>(observer, it);
observer.onSubscribe(d);

if (!d.fusionMode) {
d.run();
}
}
......
//内部静态类 FromIterableDisposable
}

同样也将我们自定义的 Observer 给包装成了一个新的 FromIterableDisposable 对象,然后调用 observer.onSubscribe(d) 设置了观察者的onSubscribe方法的回调。所以观察者onSubscribe()是在订阅时被调用,也就是在事件执行之前调用。最后执行 d.run()。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
static final class FromIterableDisposable<T> extends BasicQueueDisposable<T> {
//我们创建的 观察者 Observer
final Observer<? super T> downstream;
//我们传的参数 Iterator
final Iterator<? extends T> it;

volatile boolean disposed;

boolean fusionMode;

boolean done;

boolean checkNext;

FromIterableDisposable(Observer<? super T> actual, Iterator<? extends T> it) {
this.downstream = actual;
this.it = it;
}

void run() {
boolean hasNext;
//循环 Iterator
do {
// 消息断开后直接返回
if (isDisposed()) {
return;
}
T v;

try {
v = ObjectHelper.requireNonNull(it.next(), "The iterator returned a null value");
} catch (Throwable e) {
Exceptions.throwIfFatal(e);
downstream.onError(e);
return;
}
// 执行 观察者 Observer 的回调 onNext()
downstream.onNext(v);
// 消息断开后直接返回
if (isDisposed()) {
return;
}
try {
hasNext = it.hasNext();
} catch (Throwable e) {
Exceptions.throwIfFatal(e);
downstream.onError(e);
return;
}
} while (hasNext);
// 循环结束执行onComplete() (循环结束isDisposed()都是false,否则在循环中就已经返回了)
if (!isDisposed()) {
downstream.onComplete();
}
}

......
}

fromArray():

遍历 数组,和 just() 方式一样直接触发 onNext(),然后返回每项数据

fromArray和多参数just一样,只不过 fromArray 可以传入多于10个的变量,并可传入一个数组

示例

示例1

1
2
3
4
5
6
7
8
9
mDisposables.add(Observable
//把 int 装箱成 Integer,所以返回的每个item
.fromArray(0,1,2,3)
.subscribe(new Consumer<Integer>() {
@Override
public void accept(Integer integer) throws Exception {
System.out.println("接收----->"+integer);
}
}));
1
2
3
4
接收----->0
接收----->1
接收----->2
接收----->3

示例2

1
2
3
4
5
6
7
8
9
10
Integer[] array = {1, 2, 3, 4};
mDisposables.add(Observable
// 可传入一个数组
.fromArray(array)
.subscribe(new Consumer<Integer>() {
@Override
public void accept(Integer integer) throws Exception {
System.out.println("接收----->"+integer);
}
}));
1
2
3
4
接收----->0
接收----->1
接收----->2
接收----->3

示例3

1
2
3
4
5
6
7
8
9
10
11
12
//参数 int 类型 把整个array作为可变数据组的一个item,所以返回的是数组类型
int[] array = {1, 2, 3, 4};
mDisposables.add(Observable
// 可传入一个数组
.fromArray(array)
.subscribe(new Consumer<int[]>() {
@Override
public void accept(int[] ints) throws Exception {
//接收到的是数组的地址
System.out.println("接收----->"+ints.length);
}
}));
1
接收----->[I@6d6f6e28
分析

示例3和示例1、2的输入结果不一样,这个就涉及到java的 泛型T 以及 基本数据类型和其对应的包装类 相关的知识点,可自行查阅其他资料。

我的理解是 fromArray(T… items) 泛型T 理解成Object,当传int类型数组时,会把整个array当做一个item对象,但是当传入int 类型的多个变量时会自动装箱成Integer。其他八种基本数据类型一样的。

接下来我们看下fromArray的源码:

1
2
3
4
5
6
7
8
9
10
public static <T> Observable<T> fromArray(T... items) {
ObjectHelper.requireNonNull(items, "items is null");
if (items.length == 0) {
return empty();
} else
if (items.length == 1) {
return just(items[0]);
}
return RxJavaPlugins.onAssembly(new ObservableFromArray<T>(items));
}

我们可以看到当只有一个参数时,就调用 just(T item),多个参数时调用的和多个参数的just()是一样的,具体的可以查看just示例和源码解析

fromCallable()

示例
1
2
3
4
5
6
7
8
9
10
11
12
13
mDisposables.add(Observable
.fromCallable(new Callable<Integer>() {
@Override
public Integer call() throws Exception {
return 100;
}
})
.subscribe(new Consumer<Integer>() {
@Override
public void accept(Integer i) throws Exception {
System.out.println("接收----->"+i);
}
}));
1
接收----->100
分析

我们先看下Callable是什么,原来是java.util.concurrent 包下的一个接口,里面只有一个带返回值的方法。
再看 fromCallable

1
2
3
4
public static <T> Observable<T> fromCallable(Callable<? extends T> supplier) {
ObjectHelper.requireNonNull(supplier, "supplier is null");
return RxJavaPlugins.onAssembly(new ObservableFromCallable<T>(supplier));
}

同样的装配了一个 ObservableFromCallable 返回,作为我们的被观察者。我们知道正真订阅是实现 subscribeActual 方法的 Observable 的子类里面,所以我们直接 ObservableFromCallable 类:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
public final class ObservableFromCallable<T> extends Observable<T> implements Callable<T> {
//我们传进去的 Callable
final Callable<? extends T> callable;
public ObservableFromCallable(Callable<? extends T> callable) {
this.callable = callable;
}

@Override
public void subscribeActual(Observer<? super T> observer) {
//通过我们创建的 observer 创建了一个DeferredScalarDisposable对象
DeferredScalarDisposable<T> d = new DeferredScalarDisposable<T>(observer);
//执行订阅回调
observer.onSubscribe(d);
//中断后直接返回
if (d.isDisposed()) {
return;
}
T value;
try {
//callable.call()就是我们new的 Callable 中 call 方法的返回值
//这里做了非空判断,若如为空,直接抛出异常
value = ObjectHelper.requireNonNull(callable.call(), "Callable returned null");
} catch (Throwable e) {
Exceptions.throwIfFatal(e);
if (!d.isDisposed()) {
observer.onError(e);
} else {
RxJavaPlugins.onError(e);
}
return;
}
d.complete(value);
}

@Override
public T call() throws Exception {
return ObjectHelper.requireNonNull(callable.call(), "The callable returned a null value");
}
}

通过上面代码,我们看到,创建了一个 DeferredScalarDisposable 对象,最后执行了d.complete(value)。
接下来我们看 DeferredScalarDisposable 类的 complete 方法:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
public final void complete(T value) {
int state = get();
if ((state & (FUSED_READY | FUSED_CONSUMED | TERMINATED | DISPOSED)) != 0) {
return;
}
Observer<? super T> a = downstream;
if (state == FUSED_EMPTY) {
this.value = value;
lazySet(FUSED_READY);
a.onNext(null);
} else {
lazySet(TERMINATED);
// 执行回调 onNext(),并把new的 Callable 中 call 方法的返回值作为参数
a.onNext(value);
}
//假如没有执行dispose(),并执行完 onNext 方法后,接着执行onComplete
if (get() != DISPOSED) {
a.onComplete();
}
}

根据以上的分析,根据上面的分析,我们得出如下规则:

1、fromCallable 里的返回值就是 onNext 接收的参数。

2、通过 fromCallable() 方式 直接触发 onNext(),并执行 onComplete()。

fromFuture()

示例
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
FutureTask<String> futureTask = new FutureTask<>(new Callable<String>() {
@Override
public String call() throws Exception {
Thread.sleep(5000);
return "返回值";
}
});

Observable
.fromFuture(futureTask)
.subscribe(new Observer<String>() {
@Override
public void onSubscribe(Disposable d) {
System.out.println("onSubscribe");
futureTask.run();
}

@Override
public void onNext(String s) {
System.out.println("接收----->" + s);
}

@Override
public void onError(Throwable e) {

}

@Override
public void onComplete() {
System.out.println("onComplete");
}
});
1
2
3
System.out: onSubscribe
System.out: 接收----->返回值
System.out: onComplete
分析

我们先简单的看下Future 和 FutureTast 。Future类位于java.util.concurrent包下,它也是一个接口。

Future就是对于具体的 Runnable 或者 Callable 任务的执行结果进行取消、查询是否完成、获取结果。必要时可以通过get方法获取执行结果,该方法会阻塞直到任务返回结果。

FutureTask实现了RunableFuture接口,同时RunableFuture又继承Future,Runable接口,也就是说FutureTask具备Runbale的run方法执行异步任务,也可以像Future一样能够控制任务的执行。事实上,FutureTask是Future接口的一个唯一实现类。

详细用法查看Java并发编程:Callable、Future和FutureTask

接下来我们看下 fromFuture 的源码:

同样的装配了一个 ObservableFromFuture 返回,作为我们的被观察者。我们知道正真订阅是实现 subscribeActual 方法的 Observable 的子类里面,所以我们直接 ObservableFromFuture 类的:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
public void subscribeActual(Observer<? super T> observer) {
DeferredScalarDisposable<T> d = new DeferredScalarDisposable<T>(observer);
observer.onSubscribe(d);
if (!d.isDisposed()) {
T v;
try {
v = ObjectHelper.requireNonNull(unit != null ? future.get(timeout, unit) : future.get(), "Future returned null");
} catch (Throwable ex) {
Exceptions.throwIfFatal(ex);
if (!d.isDisposed()) {
observer.onError(ex);
}
return;
}
d.complete(v);
}
}

这个方法也很好理解,DeferredScalarDisposable 类和我们上面说的 fromCallable 的一样的,这里注意这一行代码:
v = ObjectHelper.requireNonNull(unit != null ? future.get(timeout, unit) : future.get(), "Future returned null");

假如我们不执行 futureTask.run(); 就会一直阻塞。