摘要:現(xiàn)在網(wǎng)上已經(jīng)有大量的源碼分析文章,各種技術(shù)的都有。你完全可以寫(xiě)成下面的鏈?zhǔn)斤L(fēng)格方法會(huì)最先被執(zhí)行同樣,為了便于理解,我會(huì)借用流里面經(jīng)常用到的水流進(jìn)行類(lèi)比。該子類(lèi)的命名是有規(guī)律可言的。
現(xiàn)在網(wǎng)上已經(jīng)有大量的源碼分析文章,各種技術(shù)的都有。但我覺(jué)得很多文章對(duì)初學(xué)者并不友好,讓人讀起來(lái)云里霧里的,比源碼還源碼。究其原因,是根本沒(méi)有從學(xué)習(xí)者的角度去分析。在自己完成了源碼閱讀之后,卻忘記了自己是如何一步步提出問(wèn)題,進(jìn)而走到這里的。
所以,我想在本篇及以后的文章中,花更多的精力去進(jìn)行源碼的分析,爭(zhēng)取用淺顯易懂的語(yǔ)言,用適合的邏輯去組織內(nèi)容。這樣不至于陷入源碼里,導(dǎo)致文章難懂。盡量讓更多的人愿意去讀源碼。
閱讀本文,你需要對(duì) RxJava2 的一些基本使用有所了解,不過(guò)不用太深。這里推薦下Season_zlc的給初學(xué)者的RxJava2.0教程(一)
,比較淺顯易懂。
提到 RxJava,你第一個(gè)想到的詞是什么?
“異步”。
RxJava 在 GitHub 上的官網(wǎng)主頁(yè)也說(shuō)了,“RxJava is a Java VM implementation of Reactive Extensions: a library for composing asynchronous and event-based programs by using observable sequences.”(RxJava是一個(gè)使用可觀測(cè)序列來(lái)組建異步、基于事件的程序的庫(kù),它是 Reactive Extensions 在Java虛擬機(jī)上的一個(gè)實(shí)現(xiàn))。它的優(yōu)點(diǎn)嘛,用扔物線(xiàn)凱哥的話(huà)講,就是“簡(jiǎn)潔”,并且“隨著程序邏輯變得越來(lái)越復(fù)雜,它依然能夠保持簡(jiǎn)潔”。
這里要注意一點(diǎn),雖然對(duì)大多數(shù)人來(lái)講,更多的是使用 RxJava 來(lái)配合 Retrofit、OkHttp 進(jìn)行網(wǎng)絡(luò)請(qǐng)求框架的封裝及數(shù)據(jù)的異步處理,但是,RxJava和網(wǎng)絡(luò)請(qǐng)求本質(zhì)上沒(méi)有半毛錢(qián)的關(guān)系。它的本質(zhì),官網(wǎng)已經(jīng)說(shuō)的很明白了,就是“異步”。
RxJava 基于觀察者模式實(shí)現(xiàn),基于事件流進(jìn)行鏈?zhǔn)秸{(diào)用。
首先,我們需要添加必要的依賴(lài),這里以最新的2.2.8版本為例:
????implementation?"io.reactivex.rxjava2:rxjava:2.2.8"
當(dāng)然,對(duì)于 Android 項(xiàng)目來(lái)講,我們一般還需要添加一個(gè)補(bǔ)充庫(kù):
????implementation?"io.reactivex.rxjava2:rxandroid:2.1.0"
這個(gè)庫(kù)其實(shí)就是提供了 Android 相關(guān)的主線(xiàn)程的支持。
然后寫(xiě)個(gè)簡(jiǎn)單的代碼,就可以開(kāi)始我們的源碼分析啦。
????????//?上游?observable
????????Observable?observable?=?Observable.create(new?ObservableOnSubscribe()?{
????????????@Override
????????????public?void?subscribe(ObservableEmitter?emitter) ?throws?Exception?{
????????????????Log.d(TAG,?"subscribe:?");
????????????????emitter.onNext(1);
????????????????emitter.onNext(2);
????????????????emitter.onComplete();
????????????}
????????});
????????//?下游?observer
????????Observer?observer?=?new?Observer()?{
????????????@Override
????????????public?void?onSubscribe(Disposable?d)?{
????????????????//?onSubscribe?方法會(huì)最先被執(zhí)行
????????????????Log.d(TAG,?"onSubscribe:?");
????????????}
????????????@Override
????????????public?void?onNext(Integer?integer)?{
????????????????Log.d(TAG,?"onNext:?");
????????????}
????????????@Override
????????????public?void?onError(Throwable?e)?{
????????????????Log.d(TAG,?"onError:?");
????????????}
????????????@Override
????????????public?void?onComplete()?{
????????????????Log.d(TAG,?"onComplete:?");
????????????}
????????};
????????//?將上游和下游進(jìn)行關(guān)聯(lián)
????????observable.subscribe(observer);
為便于理解,我故意將可以鏈?zhǔn)秸{(diào)用的代碼,拆成了三部分。你完全可以寫(xiě)成下面的鏈?zhǔn)斤L(fēng)格:
?Observable.create(new?ObservableOnSubscribe()?{
????????????@Override
????????????public?void?subscribe(ObservableEmitter?emitter) ?throws?Exception?{
????????????????Log.d(TAG,?"subscribe:?");
????????????????emitter.onNext(1);
????????????????emitter.onNext(2);
????????????????emitter.onComplete();
????????????}
????????}).subscribe(new?Observer()?{
????????????@Override
????????????public?void?onSubscribe(Disposable?d)?{
????????????????//?onSubscribe?方法會(huì)最先被執(zhí)行
????????????????Log.d(TAG,?"onSubscribe:?");
????????????}
????????????@Override
????????????public?void?onNext(Integer?integer)?{
????????????????Log.d(TAG,?"onNext:?");
????????????}
????????????@Override
????????????public?void?onError(Throwable?e)?{
????????????????Log.d(TAG,?"onError:?");
????????????}
????????????@Override
????????????public?void?onComplete()?{
????????????????Log.d(TAG,?"onComplete:?");
????????????}
????????});
同樣,為了便于理解,我會(huì)借用i/o流里面經(jīng)常用到的
水流進(jìn)行類(lèi)比。將被觀察者
observable 稱(chēng)為
上游(upstream),將觀察者
observer 稱(chēng)為
下游(downstream)。讀源碼其實(shí)也能看出,作者本身也正是這么類(lèi)比的。
通過(guò)將整個(gè)過(guò)程拆分成三個(gè)步驟,能更清晰的理清邏輯。我們需要做的,本質(zhì)上就是創(chuàng)建一個(gè)上游和一個(gè)下游,最終通過(guò)上游對(duì)象的subscribe方法將二者關(guān)聯(lián)起來(lái):
明白了這三點(diǎn),以后我們就不會(huì)被各種實(shí)現(xiàn)類(lèi)搞的眼花繚亂。
這三個(gè)步驟,里面的核心是第三部,也就是訂閱過(guò)程,畢竟,這屬于一個(gè)動(dòng)作,而我們進(jìn)行源碼分析的時(shí)候,往往就是從動(dòng)作開(kāi)始的。這時(shí)候,我們Ctrl/Command + 鼠標(biāo)左鍵,進(jìn)入該方法看看,里面做了下什么。
????public?final?void?subscribe(Observer<");super?T>?observer)?{
????????ObjectHelper.requireNonNull(observer,?"observer?is?null");
????????try?{
????????????//?RxJavaPlugins是個(gè)鉤子函數(shù),用來(lái)在代碼的執(zhí)行前后插入進(jìn)行一些操作
????????????observer?=?RxJavaPlugins.onSubscribe(this,?observer);
????????????ObjectHelper.requireNonNull(observer,?"The?RxJavaPlugins.onSubscribe?hook?returned?a?null?Observer.?Please?change?the?handler?provided?to?RxJavaPlugins.setOnObservableSubscribe?for?invalid?null?returns.?Further?reading:?https://github.com/ReactiveX/RxJava/wiki/Plugins");
????????????//?關(guān)鍵點(diǎn)是這行代碼
????????????subscribeActual(observer);
????????}?catch?(NullPointerException?e)?{?//?NOPMD
????????????throw?e;
????????}?catch?(Throwable?e)?{
????????????Exceptions.throwIfFatal(e);
????????????//?can"t?call?onError?because?no?way?to?know?if?a?Disposable?has?been?set?or?not
????????????//?can"t?call?onSubscribe?because?the?call?might?have?set?a?Subscription?already
????????????RxJavaPlugins.onError(e);
????????????NullPointerException?npe?=?new?NullPointerException("Actually?not,?but?can"t?throw?other?exceptions?due?to?RS");
????????????npe.initCause(e);
????????????throw?npe;
????????}
????}
這里將this(上游Observable類(lèi)型)的和下游observer作為參數(shù)傳給了 RxJavaPlugins 的 onSubscribe(…)方法,并返回一個(gè)Observer,同時(shí),將原來(lái)的observer指向這個(gè)返回值,那么我們看看這個(gè)函數(shù)中到底進(jìn)行了什么操作:
????//??RxJavaPlugins.java
????public?static??Observer<");super?T>?onSubscribe(@NonNull?Observable?source,?@NonNull?Observer<");super?T>?observer)?{
????????BiFunction<");super?Observable,?");super?Observer,?");????????if?(f?!=?null)?{
????????????return?apply(f,?source,?observer);
????????}
????????return?observer;
????}
這里判斷onObservableSubscribe是否為 null,不為 null 則調(diào)用其 apply(…) 方法。若為 null ,則直接返回原來(lái)的observer。而該變量需要通過(guò)RxJavaPlugin的
setOnSingleSubscribe(...)方法來(lái)指定的,顯然,我們并沒(méi)有指定,所以忽略不管(后面遇到類(lèi)似問(wèn)題,基本也都可以忽略)。
回到之前的訂閱流程,就可以簡(jiǎn)化為下面這樣:
????public?final?void?subscribe(Observer<");super?T>?observer)?{
????????ObjectHelper.requireNonNull(observer,?"observer?is?null");
????????try?{
????????????...
????????????//?調(diào)用到具體實(shí)現(xiàn)子類(lèi)的?subscribeActual(observer)?方法
????????????subscribeActual(observer);
????????}?catch?(
????????????...
????????}
????}
從上面代碼可以看出,訂閱過(guò)程,即調(diào)用Observable的subscribe(...)的過(guò)程,其實(shí)就是直接調(diào)用了其實(shí)現(xiàn)類(lèi)的
subscribeActual(observer)方法(該方法在 Observable 中是個(gè)抽象方法)。以后我們遇到這個(gè)方法,就直接去 Observable 的實(shí)現(xiàn)類(lèi)中找即可,就不會(huì)亂了。
一些熟悉RxJava的朋友可能會(huì)說(shuō),有時(shí)候我們通過(guò)subscribe(...)訂閱的并不是Observer對(duì)象,而是consumer對(duì)象,有各種重載。如下:
當(dāng)你傳入的是Consumer的時(shí)候,不管你傳遞了幾個(gè)參數(shù),最終都會(huì)代用到以下方法,那些你沒(méi)傳遞的 onError或者 onComplete 回調(diào)等等,會(huì)自動(dòng)使用默認(rèn)創(chuàng)建的值。
????public?final?Disposable?subscribe(Consumer<");super?T>?onNext,?Consumer<");super?Throwable>?onError,
????????????Action?onComplete,?Consumer<");super?Disposable>?onSubscribe)?{
????????ObjectHelper.requireNonNull(onNext,?"onNext?is?null");
????????ObjectHelper.requireNonNull(onError,?"onError?is?null");
????????ObjectHelper.requireNonNull(onComplete,?"onComplete?is?null");
????????ObjectHelper.requireNonNull(onSubscribe,?"onSubscribe?is?null");
????????//?最終都會(huì)封裝成一個(gè)?LambdaObserver,并作為參數(shù)傳入subscribe(...)方法中
????????LambdaObserver?ls?=?new?LambdaObserver(onNext,?onError,?onComplete,?onSubscribe);
????????subscribe(ls);
????????return?ls;
????}
可以看出,這里最終還是將這些 Consumer 對(duì)象包裝在了一個(gè) LambdaObserver 類(lèi)型的變量中,然后又調(diào)用了
subscribe(...)方法,將其作為變量傳入,之后的分析,就跟上面是一樣的了。
訂閱方法講完了,我們也知道最終調(diào)用到了 Observable 的實(shí)現(xiàn)類(lèi)的subscribeActual(...)方法。那接下來(lái)肯定就是要弄懂在這個(gè)方中做了什么事。我們例子中是使用
Observable.create(...)方法創(chuàng)建的 observable:
????????//?上游?observable
????????Observable?observable?=?Observable.create(new?ObservableOnSubscribe()?{
????????????@Override
????????????public?void?subscribe(ObservableEmitter?emitter) ?throws?Exception?{
????????????????Log.d(TAG,?"subscribe:?");
????????????????emitter.onNext(1);
????????????????emitter.onNext(2);
????????????????emitter.onComplete();
????????????}
????????});
其中,Observable.create(...)方法的實(shí)現(xiàn)是這樣的:
????public?static?<T>?Observable<T>?create(ObservableOnSubscribe<T>?source)?{
????????ObjectHelper.requireNonNull(source,?"source?is?null");
????????return?RxJavaPlugins.onAssembly(new?ObservableCreate<T>(source));
????}
我們傳進(jìn)去了一個(gè)實(shí)現(xiàn)了ObservableOnSubscribe接口的匿名內(nèi)部類(lèi),該接口類(lèi)也很簡(jiǎn)單,就定義了一個(gè)
void subscribe(@NonNull ObservableEmitter
然后我們將傳進(jìn)來(lái)的source(剛剛提到的匿名內(nèi)部類(lèi)ObservableOnSubscribe)封裝進(jìn)一個(gè)ObservableCreate對(duì)象中,又傳進(jìn)了
RxJavaPlugins.onAssembly(...)中,這個(gè)RxJavaPlugins類(lèi)剛才我們說(shuō)過(guò),其實(shí)就是一個(gè)hook類(lèi),暫時(shí)直接忽略,一般就是直接把傳進(jìn)來(lái)的參數(shù)返回了(不放心的話(huà)可以自己點(diǎn)進(jìn)去,以后遇到該方法不再贅述)。
也就是說(shuō)Observable.create(...)方法最終創(chuàng)建了一個(gè)
ObservableCreate對(duì)象。注意,該對(duì)象是
Observable抽象類(lèi)的具體實(shí)現(xiàn)類(lèi)。
特別注意!
特別注意!
特別注意!
重要事情說(shuō)三遍。我們這里通過(guò)create(...)方法創(chuàng)建的
Observable的具體實(shí)現(xiàn)子類(lèi)是
ObservableCreate。該子類(lèi)的命名是有規(guī)律可言的。我在分析源碼的時(shí)候有時(shí)候就想,這么多看起來(lái)名字都一樣的類(lèi),RxJava的開(kāi)發(fā)者本人不會(huì)懵逼嗎?作為一個(gè)用戶(hù)量這么大的庫(kù),肯定各種都有講究,肯定有貴了。嗯。規(guī)律就是生成的子類(lèi)的命名方法為
“Observable+創(chuàng)建該類(lèi)的方法名”,即:在創(chuàng)建該類(lèi)的方法名稱(chēng)前面加上個(gè)Observable,以此來(lái)作為新的類(lèi)
文章版權(quán)歸作者所有,未經(jīng)允許請(qǐng)勿轉(zhuǎn)載,若此文章存在違規(guī)行為,您可以聯(lián)系管理員刪除。
轉(zhuǎn)載請(qǐng)注明本文地址:http://systransis.cn/yun/7367.html
摘要:讓你收獲滿(mǎn)滿(mǎn)碼個(gè)蛋從年月日推送第篇文章一年過(guò)去了已累積推文近篇文章,本文為年度精選,共計(jì)篇,按照類(lèi)別整理便于讀者主題閱讀。本篇文章是今年的最后一篇技術(shù)文章,為了讓大家在家也能好好學(xué)習(xí),特此花了幾個(gè)小時(shí)整理了這些文章。 showImg(https://segmentfault.com/img/remote/1460000013241596); 讓你收獲滿(mǎn)滿(mǎn)! 碼個(gè)蛋從2017年02月20...
摘要:這個(gè)上游是個(gè)相對(duì)概念,上游之上,還有上游,所以就不斷回溯,最終調(diào)用到最開(kāi)始指定的那個(gè)線(xiàn)程。雖然表面上看,確實(shí)是第一個(gè)指定的有效,但是千萬(wàn)別被欺騙了。文章較長(zhǎng),可以耐心點(diǎn),反復(fù)看看。 這個(gè)上游是個(gè)相對(duì)概念,上游之上,還有上游,所以就不斷回溯,最終調(diào)用到最開(kāi)始指定的那個(gè)線(xiàn)程。 雖然表面上看,確實(shí)是第一個(gè)指定的有效,但是千萬(wàn)別被欺騙了。 好...
摘要:這個(gè)上游是個(gè)相對(duì)概念,上游之上,還有上游,所以就不斷回溯,最終調(diào)用到最開(kāi)始指定的那個(gè)線(xiàn)程。雖然表面上看,確實(shí)是第一個(gè)指定的有效,但是千萬(wàn)別被欺騙了。文章較長(zhǎng),可以耐心點(diǎn),反復(fù)看看。 這個(gè)上游是個(gè)相對(duì)概念,上游之上,還有上游,所以就不斷回溯,最終調(diào)用到最開(kāi)始指定的那個(gè)線(xiàn)程。 雖然表面上看,確實(shí)是第一個(gè)指定的有效,但是千萬(wàn)別被欺騙了。 好...
閱讀 1565·2023-04-26 01:36
閱讀 2730·2021-10-08 10:05
閱讀 2784·2021-08-05 09:57
閱讀 1544·2019-08-30 15:52
閱讀 1200·2019-08-30 14:12
閱讀 1320·2019-08-30 11:17
閱讀 3110·2019-08-29 13:07
閱讀 2429·2019-08-29 12:35