from示例和源码解析
fromIterable(): 遍历 Iterable,和 just() 方式一样直接触发 onNext(),然后返回每项数据
示例 1 2 3 4 5 6 7 8 9 10 11 12 13 List<Integer > list = new ArrayList<>(); list.add (0 ); list.add (1 ); list.add (2 ); list.add (3 ); mDisposables.add (Observable .fromIterable(list) .subscribe(new Consumer<Integer >() { @Override public void accept(Integer integer ) throws Exception { System .out .println("接收----->"+integer ); } }));
输出日志:
分析 我们通过源码可以看到 fromIterable 通过 Iterable 构造了一个 ObservableFromIterable 然后返回。
1 2 3 4 5 public static <T> Observable<T> fromIterable(Iterable<? extends T> source ) { ObjectHelper . requireNonNull(source , "source is null" ) ; return RxJavaPlugins . onAssembly(new ObservableFromIterable<T>(source ) ); }
ObservableFromIterable 继承 Observable。
接下来我们回到订阅过程,其订阅过程前面的内容跟上一节分析的是一样的,就不重复了。还记得订阅过程中 Observable 类的 subscribeActual() 是个抽象方法吗?他的真正实现是在 ObservableFromIterable 中,所以我们来看下 subscribeActual 方法:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 public final class ObservableFromIterable<T> extends Observable<T> { final Iterable <? extends T > source; public ObservableFromIterable (Iterable <? extends T > source) { this .source = source; } @Override public void subscribeActual(Observer <? super T > observer) { ...... FromIterableDisposable <T > d = new FromIterableDisposable <T >(observer, it); observer.onSubscribe(d); if (!d.fusionMode) { d.run(); } } ...... }
同样也将我们自定义的 Observer 给包装成了一个新的 FromIterableDisposable 对象,然后调用 observer.onSubscribe(d) 设置了观察者的onSubscribe方法的回调。所以观察者onSubscribe()是在订阅时被调用,也就是在事件执行之前调用。最后执行 d.run()。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 static final class FromIterableDisposable<T> extends BasicQueueDisposable<T> { final Observer<? super T> downstream; final Iterator<? extends T> it; volatile boolean disposed; boolean fusionMode; boolean done ; boolean checkNext; FromIterableDisposable(Observer<? super T> actual , Iterator<? extends T> it ) { this.downstream = actual; this.it = it; } void run() { boolean hasNext; do { if (isDisposed() ) { return; } T v; try { v = ObjectHelper . requireNonNull(it .next () , "The iterator returned a null value" ); } catch (Throwable e) { Exceptions . throwIfFatal(e ) ; downstream.onError(e ) ; return; } downstream.onNext(v ) ; if (isDisposed() ) { return; } try { hasNext = it.hasNext() ; } catch (Throwable e) { Exceptions . throwIfFatal(e ) ; downstream.onError(e ) ; return; } } while (hasNext); if (!isDisposed() ) { downstream.onComplete() ; } } ...... }
fromArray(): 遍历 数组,和 just() 方式一样直接触发 onNext(),然后返回每项数据 fromArray和多参数just一样,只不过 fromArray 可以传入多于10个的变量,并可传入一个数组
示例 示例1
1 2 3 4 5 6 7 8 9 mDisposables.add (Observable //把 int 装箱成 Integer ,所以返回的每个item .fromArray(0 ,1 ,2 ,3 ) .subscribe(new Consumer<Integer >() { @Override public void accept(Integer integer ) throws Exception { System .out .println("接收----->"+integer ); } }));
示例2
1 2 3 4 5 6 7 8 9 10 Integer [] array = {1 , 2 , 3 , 4 };mDisposables.add (Observable // 可传入一个数组 .fromArray(array ) .subscribe(new Consumer<Integer >() { @Override public void accept(Integer integer ) throws Exception { System .out .println("接收----->"+integer ); } }));
示例3
1 2 3 4 5 6 7 8 9 10 11 12 int [] array = {1 , 2 , 3 , 4 };mDisposables.add(Observable .fromArray(array ) .subscribe(new Consumer<int []>() { @Override public void accept(int [] int s) throws Exception { System.out .println("接收----->" +int s.length); } }));
分析 示例3和示例1、2的输入结果不一样,这个就涉及到java的 泛型T 以及 基本数据类型和其对应的包装类 相关的知识点,可自行查阅其他资料。
我的理解是 fromArray(T… items) 泛型T 理解成Object,当传int类型数组时,会把整个array当做一个item对象,但是当传入int 类型的多个变量时会自动装箱成Integer。其他八种基本数据类型一样的。
接下来我们看下fromArray的源码:
1 2 3 4 5 6 7 8 9 10 public static <T> Observable<T> fromArray(T... items ) { ObjectHelper . requireNonNull(items , "items is null" ) ; if (items.length == 0 ) { return empty() ; } else if (items.length == 1 ) { return just(items[0 ] ); } return RxJavaPlugins . onAssembly(new ObservableFromArray<T>(items ) ); }
我们可以看到当只有一个参数时,就调用 just(T item),多个参数时调用的和多个参数的just()是一样的,具体的可以查看just示例和源码解析
fromCallable() 示例 1 2 3 4 5 6 7 8 9 10 11 12 13 mDisposables .add (Observable .fromCallable(new Callable<Integer>() { @Override public Integer call() throws Exception { return 100 ; } }) .subscribe(new Consumer<Integer>() { @Override public void accept(Integer i) throws Exception { System.out.println("接收----->" +i); } }));
分析 我们先看下Callable是什么,原来是java.util.concurrent 包下的一个接口,里面只有一个带返回值的方法。 再看 fromCallable
1 2 3 4 public static <T> Observable<T> fromCallable(Callable<? extends T> supplier ) { ObjectHelper . requireNonNull(supplier , "supplier is null" ) ; return RxJavaPlugins . onAssembly(new ObservableFromCallable<T>(supplier ) ); }
同样的装配了一个 ObservableFromCallable 返回,作为我们的被观察者。我们知道正真订阅是实现 subscribeActual 方法的 Observable 的子类里面,所以我们直接 ObservableFromCallable 类:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 public final class ObservableFromCallable<T> extends Observable<T> implements Callable<T> { final Callable<? extends T> callable; public ObservableFromCallable(Callable<? extends T> callable ) { this.callable = callable; } @Override public void subscribeActual(Observer<? super T> observer ) { DeferredScalarDisposable<T> d = new DeferredScalarDisposable<T>(observer); observer.onSubscribe(d ) ; if (d.isDisposed() ) { return; } T value; try { value = ObjectHelper . requireNonNull(callable .call () , "Callable returned null" ); } catch (Throwable e) { Exceptions . throwIfFatal(e ) ; if (!d.isDisposed() ) { observer.onError(e ) ; } else { RxJavaPlugins . onError(e ) ; } return; } d.complete(value); } @Override public T call() throws Exception { return ObjectHelper . requireNonNull(callable .call () , "The callable returned a null value" ); } }
通过上面代码,我们看到,创建了一个 DeferredScalarDisposable 对象,最后执行了d.complete(value)。 接下来我们看 DeferredScalarDisposable 类的 complete 方法:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 public final void complete(T value) { int state = get(); if ((state & (FUSED_READY | FUSED_CONSUMED | TERMINATED | DISPOSED)) != 0 ) { return; } Observer<? super T> a = downstream; if (state == FUSED_EMPTY) { this.value = value; lazySet(FUSED_READY); a.on Next(null); } else { lazySet(TERMINATED); // 执行回调 on Next(),并把new的 Callable 中 call 方法的返回值作为参数 a.on Next(value); } //假如没有执行dispose(),并执行完 on Next 方法后,接着执行on Complete if (get() != DISPOSED) { a.on Complete(); } }
根据以上的分析,根据上面的分析,我们得出如下规则:
1、fromCallable 里的返回值就是 onNext 接收的参数。
2、通过 fromCallable() 方式 直接触发 onNext(),并执行 onComplete()。
fromFuture() 示例 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 FutureTask<String> futureTask = new FutureTask<>(new Callable<String>() { @Override public String call () throws Exception { Thread.sleep(5000 ); return "返回值" ; } }); Observable .fromFuture(futureTask) .subscribe(new Observer<String>() { @Override public void onSubscribe (Disposable d) { System.out.println("onSubscribe" ); futureTask.run(); } @Override public void onNext (String s) { System.out.println("接收----->" + s); } @Override public void onError (Throwable e) { } @Override public void onComplete () { System.out.println("onComplete" ); } });
1 2 3 System .out : onSubscribeSystem .out : 接收System .out : onComplete
分析 我们先简单的看下Future 和 FutureTast 。Future类位于java.util.concurrent包下,它也是一个接口。
Future就是对于具体的 Runnable 或者 Callable 任务的执行结果进行取消、查询是否完成、获取结果。必要时可以通过get方法获取执行结果,该方法会阻塞直到任务返回结果。
FutureTask实现了RunableFuture接口,同时RunableFuture又继承Future,Runable接口,也就是说FutureTask具备Runbale的run方法执行异步任务,也可以像Future一样能够控制任务的执行。事实上,FutureTask是Future接口的一个唯一实现类。
详细用法查看Java并发编程:Callable、Future和FutureTask
接下来我们看下 fromFuture 的源码:
同样的装配了一个 ObservableFromFuture 返回,作为我们的被观察者。我们知道正真订阅是实现 subscribeActual 方法的 Observable 的子类里面,所以我们直接 ObservableFromFuture 类的:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 public void subscribeActual(Observer<? super T> observer ) { DeferredScalarDisposable<T> d = new DeferredScalarDisposable<T>(observer); observer.onSubscribe(d ) ; if (!d.isDisposed() ) { T v; try { v = ObjectHelper . requireNonNull(unit != null ? future .get (timeout , unit ) : future.get() , "Future returned null" ); } catch (Throwable ex) { Exceptions . throwIfFatal(ex ) ; if (!d.isDisposed() ) { observer.onError(ex ) ; } return; } d.complete(v); } }
这个方法也很好理解,DeferredScalarDisposable 类和我们上面说的 fromCallable 的一样的,这里注意这一行代码:v = ObjectHelper.requireNonNull(unit != null ? future.get(timeout, unit) : future.get(), "Future returned null");
假如我们不执行 futureTask.run(); 就会一直阻塞。