just示例和源码解析
just()示例
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 Observable . just("文章1" , "文章2" ) .subscribe(new Observer<String>() { @Override public void onSubscribe(Disposable d ) { Log . d(TAG, " onSubscribe : " + d.isDisposed() ); } @Override public void onNext(String value ) { Log . d(TAG, " onNext : " + value); } @Override public void onError(Throwable e ) { Log . d(TAG, " onError : " + e.getMessage() ); } @Override public void onComplete() { Log . d(TAG, " onComplete" ); } })
Observable 的just()有10个重载方法,参数1~10个
1 2 3 public static <T > Observable<T > just(T item) ...... public static <T > Observable<T > just(T item1, T item2, T item3, T item4, T item5, T item6, T item7, T item8, T item9, T item10)
同样的我们从以下几个方面分析源码:
被观察者Observable是何时创建
观察者Observer何时创建(很简单就一个接口,不多介绍)
被观察者与观察者如何建立subscribe订阅关系的
一个参数的 just() 1. 被观察者Observable是何时创建 1 2 3 4 5 public static <T> Observable<T> just(T item) { ObjectHelper . requireNonNull(item , "The item is null" ) ; return RxJavaPlugins . onAssembly(new ObservableJust<T>(item ) ); }
创建了 ObservableJust 对象,调用 RxJavaPlugins.onAssembly 返回了被观察者 Observable.
1 2 3 4 5 6 7 public static <T> Observable<T> onAssembly(@NonNull Observable<T> source ) { Function<? super Observable, ? extends Observable> f = onObservableAssembly; if (f != null ) { return apply(f, source ); } return source ; }
我们看下 ObservableJust 类,同样的也是继承 Observable。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 public final class ObservableJust <T > extends Observable <T > implements ScalarCallable <T > { private final T value; public ObservableJust (final T value) { this .value = value; } @Override protected void subscribeActual (Observer<? super T> observer) { ScalarDisposable<T> sd = new ScalarDisposable<T>(observer, value); observer.onSubscribe(sd); sd.run(); } @Override public T call () { return value; } }
这时候被观察者已经创建完成了,它就是 Observable 的子类 ObservableJust,我们在用 Observable.just() 的时候其实被观察者是 ObservableJust。
2. Observable 和 Observer 如何建立 subscribe 订阅关系的 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 public final void subscribe(Observer<? super T> observer) { try { observer = RxJavaPlugins . onSubscribe(this , observer ) ; subscribeActual(observer ) ; } catch (NullPointerException e) { throw e; } catch (Throwable e) { Exceptions . throwIfFatal(e ) ; RxJavaPlugins . onError(e ) ; npe.initCause(e ) ; throw npe; } }
这里的 Subscribe() 方法和 Observable.Create() 调的 Subscribe() 一样,我们知道 Subscribeactual() 这个方法是抽象方法,那它具体实现是不是和 Observable.Create() 一样也是 Observablecreate 类里呢?不是,不是,不是,重要的话说三遍哦,它的具体实现在我们看到 Observablejust 中。
那么我们回到 ObservableJust 中看下subscribeActual(Observer<? super T> observer)
1 2 3 4 5 6 7 8 protected void subscribeActual(Observer<? super T> observer ) { ScalarDisposable<T> sd = new ScalarDisposable<T>(observer, value); observer.onSubscribe(sd ) ; sd.run() ; }
这里面创建了一个线程(Runnable),他就是 ScalarDisposable ,ScalarDisposable实现Runnable,把我们创建的观察者Observer 和参数value (Observable.just(“文章1”) 这里的‘文章1’) 作为构造方法的参数传进去了,
同时他也是Disposable的子类,所以 observer.onSubscribe(sd);
这行就很好理解了,就是设置了观察者的onSubscribe方法的回调,所以观察者onSubscribe()是在订阅时被调用,也就是在事件执行之前调用。
1 2 3 4 5 6 7 Observer<String > observer = new Observer<String >() { @Override public void onSubscribe(Disposable d) { Log .d(TAG , "onSubscribe" ); } ... ...
接下来看下 ScalarDisposable 类
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 public static final class ScalarDisposable <T > extends AtomicInteger implements QueueDisposable <T >, Runnable { private static final long serialVersionUID = 3880992722410194083L ; final Observer<? super T> observer; final T value; static final int START = 0 ; static final int FUSED = 1 ; static final int ON_NEXT = 2 ; static final int ON_COMPLETE = 3 ; public ScalarDisposable (Observer<? super T> observer, T value) { this .observer = observer; this .value = value; } @Override public void run () { if (get() == START && compareAndSet(START, ON_NEXT)) { observer.onNext(value); if (get() == ON_NEXT) { lazySet(ON_COMPLETE); observer.onComplete(); } } } }
我们在subscribeActual()方法中看到最后执行了 sd.run(); 所以我们只需看 public void run() 这个方法, run()方法首先执行了 observer.onNext(value), 也就是说我们在创建Observable时传的参数此时发送给observer, 然后在执行observer.onComplete()。
这样,Observable.just() 一个参数的方法就结束了
多个参数的 just() 同样的我们从以下几个方面分析源码:
1. 被观察者Observable是何时创建 我们下看源码:
1 2 3 4 5 6 public static <T> Observable<T> just(T item1, T item2) { ObjectHelper . requireNonNull(item1 , "The first item is null" ) ; ObjectHelper . requireNonNull(item2 , "The second item is null" ) ; return fromArray(item1 , item2 ) ; }
很简单,在往下看 fromArray()
1 2 3 4 5 6 7 8 9 10 public static <T> Observable<T> fromArray(T... items ) { ObjectHelper . requireNonNull(items , "items is null" ) ; if (items.length == 0 ) { return empty() ; } else if (items.length == 1 ) { return just(items[0 ] ); } return RxJavaPlugins . onAssembly(new ObservableFromArray<T>(items ) ); }
我们看到fromArray()参数是 可变长度参数,也就是说参数可以为1个,当然了1个时直接调用Observable.just(“文章1”) 一个参数的just()。最后返回一个可变长度参数 items 构造的 ObservableFromArray 对象,他也继承了Observable,也就是说我们创建的被观察着就是 ObservableFromArray 对象。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 public final class ObservableFromArray <T > extends Observable <T > { final T[] array ; public ObservableFromArray(T[] array ) { this .array = array ; } @Override public void subscribeActual(Observer<? super T> observer) { FromArrayDisposable<T> d = new FromArrayDisposable<T>(observer, array ); observer.onSubscribe(d); if (d.fusionMode) { return ; } d.run(); } ......
2. 接下来我们看下subscribe方法 同样是
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 public final void subscribe(Observer<? super T> observer) { try { observer = RxJavaPlugins . onSubscribe(this , observer ) ; subscribeActual(observer ) ; } catch (NullPointerException e) { throw e; } catch (Throwable e) { Exceptions . throwIfFatal(e ) ; RxJavaPlugins . onError(e ) ; npe.initCause(e ) ; throw npe; } }
但是抽象方法subscribeActual()在ObservableFromArray中执行 在看ObservableFromArray的subscribeActual()这个方法
1 2 3 4 5 6 7 8 9 10 11 12 @Override public void subscribeActual (Observer<? super T> observer) { FromArrayDisposable<T> d = new FromArrayDisposable<T>(observer, array); observer.onSubscribe(d); if (d.fusionMode) { return ; } d.run(); }
此时创建了FromArrayDisposable对象,参数是我们创建的观察者observer,和传递的可变长度的参数array,然后执行Observer中的onSubscribe()方法。最后执行了FromArrayDisposable的run()方法(注意他不是线程的run()方法)。
我们在看FromArrayDisposable的run()方法
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 static final class FromArrayDisposable <T > extends BasicQueueDisposable <T > { final Observer<? super T> downstream; final T[] array ; FromArrayDisposable(Observer<? super T> actual, T[] array ) { this .downstream = actual; this .array = array ; } void run() { T[] a = array ; int n = a.length; for (int i = 0 ; i < n && !isDisposed(); i++) { T value = a[i]; if (value == null ) { downstream.onError(new NullPointerException("The " + i + "th element is null" )); return ; } downstream.onNext(value); } if (!isDisposed()) { downstream.onComplete(); } } }
变量downstream 就是我们创建的观察者 Observer ,array 就是我们传递的可变长度的那一串数组 run()方法中遍历array,然后执行回调 downstream.onNext(value) ,最后在执行回调 downstream.onComplete()。
根据上面的分析,我们得出如下规则:
1、通过 just() 方式 直接触发 onNext()
2、just 传进去什么,在onNext() 接收什么,如果我们传入 List,同样的在 onNext() 接收的也是 List,而不是 List 的 Item
3、onNext() 中接收数据的顺序是根据 just 传入的顺序确认的,使用 just 不允许传递 null,否则会出现异常