Observable示例和源码解析
示例
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 Observable<String > observable = Observable.create(new ObservableOnSubscribe<String >() { @Override public void subscribe(ObservableEmitter<String > emitter) throws Exception { emitter.onNext("文章1" ); emitter.onNext("文章2" ); emitter.onNext("文章3" ); emitter.onComplete(); } }); Observer<String > observer = new Observer<String >() { @Override public void onSubscribe(Disposable d) { Log .d(TAG , "onSubscribe" ); } @Override public void onNext(String str) { Log .d(TAG , "onNext : " +str); } @Override public void onError(Throwable e) { } @Override public void onComplete() { Log .d(TAG , "onComplete" ); } }; observable.subscribe(observer);
其实RxJava的核心思想就是观察者模式,只要理解这个,其实RxJava也不难。说白了就是要有观察者和被观察着,然后建立观察者和被观察者之间的关系。
被观察者Observable(公众号)何时创建?
观察者Observer(用户)何时创建?
被观察者与观察者如何subscribe(用户关注了公众号)订阅?1. 被观察者Observable类
被观察者Observable为抽象类 实现 ObservableSource接口
1 2 3 4 5 6 7 8 public abstract class Observable<T > implements ObservableSource<T > { ... public static <T > Observable<T > create(ObservableOnSubscribe<T > source ) { ObjectHelper.requireNonNull(source , "source is null" ); return RxJavaPlugins.onAssembly(new ObservableCreate<T >(source )); } ... }
很简单,create方法里面就两行代码,先去判断source的是否为空,然后再去调用RxJavaPlugins.onAssembly(new ObservableCreate(source)),先 new 了 ObservableCreate 类,该类继承了 Observable,然后通过 RxJavaPlugins.onAssembly 方法返回 Observable 对象。
这样 Observable 就创建完成了,其实是创建了 Observable 的子类 ObservableCreate 对象,也就是真实的被观察着对象
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 public final class ObservableCreate<T> extends Observable<T> { final ObservableOnSubscribe <T > source; public ObservableCreate (ObservableOnSubscribe <T > source) { this .source = source; } @Override protected void subscribeActual(Observer <? super T > observer) { CreateEmitter <T > parent = new CreateEmitter <T >(observer); observer.onSubscribe(parent); try { source.subscribe(parent); } catch (Throwable ex) { Exceptions .throwIfFatal(ex); parent.onError(ex); } } }
2. 观察者 Observer 1 2 3 4 5 6 7 8 9 10 public interface Observer <T > { void onSubscribe (@NonNull Disposable d) ; void onNext (@NonNull T t) ; void onError (@NonNull Throwable e) ; void onComplete () ; }
这个类十分简单,是标准的函数式接口
3. 被观察者与观察者如何subscribe(用户关注了公众号)订阅 observable.subscribe(observer);
1 2 3 4 5 6 7 8 public final void subscribe(Observer<? super T> observer) { observer = RxJavaPlugins . onSubscribe(this , observer ) ; subscribeActual(observer ) ; }
RxJavaPlugins.onSubscribe 此时直接返回Observer观察者对象,最后执行的是 subscribeActual() 方法,我们点进去看看
protected abstract void subscribeActual(Observer<? super T> observer);
是个抽象方法,我们来看看 Observable 子类 ObservableCreate 里面是不是有 subscribeActual(Observer<? super T> observer),又回到了 ObservableCreate 类。
1 2 3 4 5 6 7 8 9 10 11 12 @Override protected void subscribeActual(Observer<? super T> observer ) { CreateEmitter<T> parent = new CreateEmitter<T>(observer); observer.onSubscribe(parent ) ; try { source.subscribe(parent); } catch (Throwable ex) { Exceptions . throwIfFatal(ex ) ; parent.onError(ex ) ; } }
该方法的参数就是我们创建的观察者 Observer,这里创建了 CreateEmitter 对象,把我们的观察者 Observer 传到 CreateEmitter 的构造方法中。 CreateEmitter 类继承了 Disposable 接口:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 static final class CreateEmitter <T > extends AtomicReference <Disposable > implements ObservableEmitter <T >, Disposable { final Observer<? super T> observer; CreateEmitter(Observer<? super T> observer) { this .observer = observer; } @Override public void onNext (T t) { if (!isDisposed()) { observer.onNext(t); } } @Override public void onError (Throwable t) { if (!tryOnError(t)) { RxJavaPlugins.onError(t); } } @Override public boolean tryOnError (Throwable t) { if (!isDisposed()) { try { observer.onError(t); } finally { dispose(); } return true ; } return false ; } @Override public void onComplete () { if (!isDisposed()) { try { observer.onComplete(); } finally { dispose(); } } } ...... }
是不是看到了我们在创建观察者 Observer 时的几个回调方法了,
接下来observer.onSubscribe(parent);
这行代码,其实就是我们创建观察者 Observer 时的
public void onSubscribe(Disposable d)
回调
1 2 3 4 5 6 7 Observer<String > observer = new Observer<String >() { @Override public void onSubscribe(Disposable d) { Log .d(TAG , "onSubscribe" ); } ... ...
的参数 Disposable d 就是 CreateEmitter 对象
再看 source.subscribe(parent);
这行代码,这个source就是我们创建被观察者 Observable(其实是它的子类 ObservableCreate)时 new 的 ObservableOnSubscribe,它只有一个 subscribe 方法,执行完这行代码,被观察者与观察者就订阅关系。
那么当我们在执行
1 2 3 4 emitter.onNext("文章1" ) emitter.onNext("文章2" ) emitter.onNext("文章3" ) emitter.onComplete()
这几行代码的时候,也就是被观察者 Observable 通过CreateEmitter发送事件时,观察者 Observer 就会走相应的回调方法, 当此执行完 onComplete() 观察者收到完成回调,整个订阅流程就完成了。
Observable.subscribe()除了接收Observer参数外,还可以接收Consumer参数
示例
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 Observable.create(new ObservableOnSubscribe<String>() { @Override public void subscribe (ObservableEmitter<String> emitter) throws Exception { emitter.onNext("文章1" ); emitter.onNext("文章2" ); emitter.onNext("文章3" ); emitter.onComplete(); } }).subscribe(new Consumer<String>() { @Override public void accept (String s) throws Exception { Log.d(TAG, " Consumer : onNext : " + s); } }, new Consumer<Throwable>() { @Override public void accept (Throwable throwable) throws Exception { Log.d(TAG, " Consumer : onError" ); } }, new Action() { @Override public void run () throws Exception { Log.d(TAG, " Action : onComplete" ); } }, new Consumer<Disposable>() { @Override public void accept (Disposable disposable) throws Exception { Log.d(TAG, " Consumer : onSubscribe" ); } })
这里我们看到传入的是Consumer,我们来看看这个类:
1 2 3 public interface Consumer<T> { void accept(T t) throws Exception; }
很简单,就是个普通的接口,里面只有一个accept方法
当使用Consumer作为subscribe()的参数时,最多可以接收4个回调参数,而且执行结果和subscribe(observer)的一样。 接下来我们看下Observable的subscribe()方法
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 public final Disposable subscribe() { return subscribe(Functions.emptyConsumer(), Functions.ON_ERROR_MISSING, Functions.EMPTY_ACTION, Functions.emptyConsumer()); } public final Disposable subscribe(Consumer<? super T> on Next) { return subscribe(on Next, Functions.ON_ERROR_MISSING, Functions.EMPTY_ACTION, Functions.emptyConsumer()); } public final Disposable subscribe(Consumer<? super T> on Next, Consumer<? super Throwable> on Error) { return subscribe(on Next, on Error, Functions.EMPTY_ACTION, Functions.emptyConsumer()); } public final Disposable subscribe(Consumer<? super T> on Next, Consumer<? super Throwable> on Error, Action on Complete) { return subscribe(on Next, on Error, on Complete, Functions.emptyConsumer()); } public final Disposable subscribe(Consumer<? super T> on Next, Consumer<? super Throwable> on Error, Action on Complete, Consumer<? super Disposable> on Subscribe) { ObjectHelper.requireNon Null(on Next, "onNext is null" ); ObjectHelper.requireNon Null(on Error, "onError is null" ); ObjectHelper.requireNon Null(on Complete, "onComplete is null" ); ObjectHelper.requireNon Null(on Subscribe, "onSubscribe is null" ); LambdaObserver<T> ls = new LambdaObserver<T> (on Next, on Error, on Complete, on Subscribe); subscribe(ls); return ls; }
有五个重载方法,参数个数0到4个,参数名和接口Observer的方法名一样,从而我们可以猜测处理的事情应该和Observer每个方法的一样。我们看到 当参数少于4个时,就传入空的回调(不是null),那么我们直接看4个参数的subscribe()。
先分别判断参数是否为空,然后通过这个4个参数构造一个 LambdaObserver 对象,最后调用subscribe(ls),这个subscribe(ls)又是什么呢?我们点进去看看:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 public final void subscribe(Observer<? super T> observer) { try { observer = RxJavaPlugins . onSubscribe(this , observer ) ; subscribeActual(observer ) ; } catch (NullPointerException e) { throw e; } catch (Throwable e) { Exceptions . throwIfFatal(e ) ; RxJavaPlugins . onError(e ) ; npe.initCause(e ) ; throw npe; } }
嗯?似曾相识,对,就是我们认识的那个subscribe,参数就是上面创建的 LambdaObserver 对象,LambdaObserver继承了接口Observer,然后在传入 subscribeActual(observer) ,接下来的流程就和上面一样了。
我们在看 LambdaObserver 类
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 public final class LambdaObserver <T > extends AtomicReference <Disposable > implements Observer <T >, Disposable , LambdaConsumerIntrospection { private static final long serialVersionUID = -7251123623727029452L ; final Consumer<? super T> onNext; final Consumer<? super Throwable> onError; final Action onComplete; final Consumer<? super Disposable> onSubscribe; public LambdaObserver (Consumer<? super T> onNext, Consumer<? super Throwable> onError, Action onComplete, Consumer<? super Disposable> onSubscribe) { super (); this .onNext = onNext; this .onError = onError; this .onComplete = onComplete; this .onSubscribe = onSubscribe; } @Override public void onSubscribe (Disposable d) { if (DisposableHelper.setOnce(this , d)) { try { onSubscribe.accept(this ); } catch (Throwable ex) { Exceptions.throwIfFatal(ex); d.dispose(); onError(ex); } } } @Override public void onNext (T t) { if (!isDisposed()) { try { onNext.accept(t); } catch (Throwable e) { Exceptions.throwIfFatal(e); get().dispose(); onError(e); } } } @Override public void onError (Throwable t) { if (!isDisposed()) { lazySet(DisposableHelper.DISPOSED); try { onError.accept(t); } catch (Throwable e) { Exceptions.throwIfFatal(e); RxJavaPlugins.onError(new CompositeException(t, e)); } } else { RxJavaPlugins.onError(t); } } @Override public void onComplete () { if (!isDisposed()) { lazySet(DisposableHelper.DISPOSED); try { onComplete.run(); } catch (Throwable e) { Exceptions.throwIfFatal(e); RxJavaPlugins.onError(e); } } } }
构造方法里面就是我们传进去的四个参数,而且有四个方法我们似乎也很熟悉,对,他就是接口 Observer 的实现方法。这四个方法中分别执行了四个回调
onSubscribe.accept(Disposable);
onNext.accept(T);
onError.accept(Throwable);
onComplete.run();
也就是subscribe时传入的回调。