成人国产在线小视频_日韩寡妇人妻调教在线播放_色成人www永久在线观看_2018国产精品久久_亚洲欧美高清在线30p_亚洲少妇综合一区_黄色在线播放国产_亚洲另类技巧小说校园_国产主播xx日韩_a级毛片在线免费

資訊專欄INFORMATION COLUMN

RxJava系列二(基本概念及使用介紹)

Profeel / 1857人閱讀

摘要:作用默認(rèn)的,直接在當(dāng)前線程運(yùn)行總是開啟一個(gè)新線程用于密集型任務(wù),如異步阻塞操作,這個(gè)調(diào)度器的線程池會根據(jù)需要增長對于普通的計(jì)算任務(wù),請使用默認(rèn)是一個(gè),很像一個(gè)有線程緩存的新線程調(diào)度器計(jì)算所使用的。這個(gè)使用的固定的線程池,大小為核數(shù)。

轉(zhuǎn)載請注明出處:https://zhuanlan.zhihu.com/p/20687307

RxJava系列1(簡介)

RxJava系列2(基本概念及使用介紹)

RxJava系列3(轉(zhuǎn)換操作符)

RxJava系列4(過濾操作符)

RxJava系列5(組合操作符)

RxJava系列6(從微觀角度解讀RxJava源碼)

RxJava系列7(最佳實(shí)踐)

前言

上一篇的示例代碼中大家一定發(fā)現(xiàn)了Observable這個(gè)類。從純Java的觀點(diǎn)看,Observable類源自于經(jīng)典的觀察者模式。RxJava的異步實(shí)現(xiàn)正是基于觀察者模式來實(shí)現(xiàn)的,而且是一種擴(kuò)展的觀察者模式。

觀察者模式

觀察者模式基于Subject這個(gè)概念,Subject是一種特殊對象,又叫做主題或者被觀察者。當(dāng)它改變時(shí)那些由它保存的一系列對象將會得到通知,而這一系列對象被稱作Observer(觀察者)。它們會對外暴漏了一個(gè)通知方法(比方說update之類的),當(dāng)Subject狀態(tài)發(fā)生變化時(shí)會調(diào)用的這個(gè)方法。

觀察者模式很適合下面這些場景中的任何一個(gè):

當(dāng)你的架構(gòu)有兩個(gè)實(shí)體類,一個(gè)依賴另一個(gè),你想讓它們互不影響或者是獨(dú)立復(fù)用它們時(shí)。

當(dāng)一個(gè)變化的對象通知那些與它自身變化相關(guān)聯(lián)的未知數(shù)量的對象時(shí)。

當(dāng)一個(gè)變化的對象通知那些無需推斷具體類型的對象時(shí)。

通常一個(gè)觀察者模式的類圖是這樣的:

如果你對觀察者模式不是很了解,那么強(qiáng)烈建議你先去學(xué)習(xí)下。關(guān)于觀察者模式的詳細(xì)介紹可以參考我之前的文章:設(shè)計(jì)模式之觀察者模式

擴(kuò)展的觀察者模式

在RxJava中主要有4個(gè)角色:

Observable

Subject

Observer

Subscriber

Observable和Subject是兩個(gè)“生產(chǎn)”實(shí)體,Observer和Subscriber是兩個(gè)“消費(fèi)”實(shí)體。說直白點(diǎn)Observable對應(yīng)于觀察者模式中的被觀察者,而ObserverSubscriber對應(yīng)于觀察者模式中的觀察者。Subscriber其實(shí)是一個(gè)實(shí)現(xiàn)了Observer的抽象類,后面我們分析源碼的時(shí)候也會介紹到。Subject比較復(fù)雜,以后再分析。

上一篇文章中我們說到RxJava中有個(gè)關(guān)鍵概念:事件。觀察者Observer和被觀察者Observable通過subscribe()方法實(shí)現(xiàn)訂閱關(guān)系。從而Observable 可以在需要的時(shí)候發(fā)出事件來通知Observer。

RxJava如何使用

我自己在學(xué)習(xí)一種新技術(shù)的時(shí)候通常喜歡先去了解它是怎么用的,掌握了使用方法后再去深挖其原理。那么我們現(xiàn)在就來說說RxJava到底該怎么用。

第一步:創(chuàng)建觀察者Observer

Observer observer = new Observer() {

    @Override
    public void onCompleted() {

    }

    @Override
    public void onError(Throwable e) {

    }

    @Override
    public void onNext(Object s) {

    }
 };

這么簡單,一個(gè)觀察者Observer創(chuàng)建了!

大兄弟你等等...,你之前那篇觀察者模式中不是說觀察者只提供一個(gè)update方法的嗎?這特么怎么有三個(gè)?!!

少年勿急,且聽我慢慢道來。在普通的觀察者模式中觀察者一般只會提供一個(gè)update()方法用于被觀察者的狀態(tài)發(fā)生變化時(shí),用于提供給被觀察者調(diào)用。而在RxJava中的觀察者Observer提供了:onNext()、 onCompleted()onError()三個(gè)方法。還記得嗎?開篇我們講過RxJava是基于一種擴(kuò)展的觀察這模式實(shí)現(xiàn),這里多出的onCompleted和onError正是對觀察者模式的擴(kuò)展。ps:onNext就相當(dāng)于普通觀察者模式中的update

RxJava中添加了普通觀察者模式缺失的三個(gè)功能:

RxJava中規(guī)定當(dāng)不再有新的事件發(fā)出時(shí),可以調(diào)用onCompleted()方法作為標(biāo)示;

當(dāng)事件處理出現(xiàn)異常時(shí)框架自動觸發(fā)onError()方法;

同時(shí)Observables支持鏈?zhǔn)秸{(diào)用,從而避免了回調(diào)嵌套的問題。

第二步:創(chuàng)建被觀察者Observable

Observable.create()方法可以創(chuàng)建一個(gè)Observable,使用crate()創(chuàng)建Observable需要一個(gè)OnSubscribe對象,這個(gè)對象繼承Action1。當(dāng)觀察者訂閱我們的Observable時(shí),它作為一個(gè)參數(shù)傳入并執(zhí)行call()函數(shù)。

Observable observable = Observable.create(new 
            Observable.OnSubscribe() {
    @Override
    public void call(Subscriber subscriber) {

    }
});

除了create(),just()和from()同樣可以創(chuàng)建Observable??纯聪旅鎯蓚€(gè)例子:

just(T...)將傳入的參數(shù)依次發(fā)送

Observable observable = Observable.just("One", "Two", "Three");
//上面這行代碼會依次調(diào)用
//onNext("One");
//onNext("Two");
//onNext("Three");
//onCompleted();

from(T[])/from(Iterable)將傳入的數(shù)組或者Iterable拆分成Java對象依次發(fā)送

String[] parameters = {"One", "Two", "Three"};
Observable observable = Observable.from(parameters);
//上面這行代碼會依次調(diào)用
//onNext("One");
//onNext("Two");
//onNext("Three");
//onCompleted();

第三步:被觀察者Observable訂閱觀察者Observerps:你沒看錯(cuò),不同于普通的觀察者模式,這里是被觀察者訂閱觀察者

有了觀察者和被觀察者,我們就可以通過subscribe()來實(shí)現(xiàn)二者的訂閱關(guān)系了。

observable.subscribe(observer);

連在一起寫就是這樣:

Observable.create(new Observable.OnSubscribe() {

    @Override
    public void call(Subscriber subscriber) {
        for (int i = 0; i < 5; i++) {
            subscriber.onNext(i);
        }
        subscriber.onCompleted();
    }

}).subscribe(new Observer() {

    @Override
    public void onCompleted() {
        System.out.println("onCompleted");
    }

    @Override
    public void onError(Throwable e) {
        System.out.println("onError");
    }

    @Override
    public void onNext(Integer item) {
        System.out.println("Item is " + item);
    }
});

至此一個(gè)完整的RxJava調(diào)用就完成了。

兄臺,你叨逼叨叨逼叨的說了一大堆,可是我沒搞定你特么到底在干啥?。浚。〔患?,我現(xiàn)在就來告訴你們到底發(fā)生了什么。

首先我們使用Observable.create()創(chuàng)建了一個(gè)新的Observable,并為create()方法傳入了一個(gè)OnSubscribe,OnSubscribe中包含一個(gè)call()方法,一旦我們調(diào)用subscribe()訂閱后就會自動觸發(fā)call()方法。call()方法中的參數(shù)Subscriber其實(shí)就是subscribe()方法中的觀察者Observer。我們在call()方法中調(diào)用了5次onNext()和1次onCompleted()方法。一套流程周下來以后輸出結(jié)果就是下面這樣的:

Item is 0
Item is 1
Item is 2
Item is 3
Item is 4
onCompleted

看到這里可能你又要說了,大兄弟你別唬我啊!OnSubscribe的call()方法中的參數(shù)Subscriber怎么就變成了subscribe()方法中的觀察者Observer??。?!這倆兒貨明明看起來就是兩個(gè)不同的類啊。

我們先看看Subscriber這個(gè)類:

public abstract class Subscriber implements Observer, Subscription {
    
    ...
}

從源碼中我們可以看到,Subscriber是Observer的一個(gè)抽象實(shí)現(xiàn)類,所以我首先可以肯定的是Subscriber和Observer類型是一致的。接著往下我們看看subscribe()這個(gè)方法:

public final Subscription subscribe(final Observer observer) {

    //這里的if判斷對于我們要分享的問題沒有關(guān)聯(lián),可以先無視
    if (observer instanceof Subscriber) {
        return subscribe((Subscriber)observer);
    }
    return subscribe(new Subscriber() {

        @Override
        public void onCompleted() {
            observer.onCompleted();
        }

        @Override
        public void onError(Throwable e) {
            observer.onError(e);
        }

        @Override
        public void onNext(T t) {
            observer.onNext(t);
        }

    });
}

我們看到subscribe()方法內(nèi)部首先將傳進(jìn)來的Observer做了一層代理,將它轉(zhuǎn)換成了Subscriber。我們再看看這個(gè)方法內(nèi)部的subscribe()方法:

public final Subscription subscribe(Subscriber subscriber) {
    return Observable.subscribe(subscriber, this);
}

進(jìn)一步往下追蹤看看return后面這段代碼到底做了什么。精簡掉其他無關(guān)代碼后的subscribe(subscriber, this)方法是這樣的:

private static  Subscription subscribe(Subscriber subscriber,                     Observable observable) {

    subscriber.onStart();
    try {
        hook.onSubscribeStart(observable, observable.onSubscribe).call(subscriber);
        return hook.onSubscribeReturn(subscriber);
    } catch (Throwable e) {
        return Subscriptions.unsubscribed();
    }
}

我們重點(diǎn)看看hook.onSubscribeStart(observable, observable.onSubscribe).call(subscriber),前面這個(gè)hook.onSubscribeStart(observable, observable.onSubscribe)返回的是它自己括號內(nèi)的第二個(gè)參數(shù)observable.onSubscribe,然后調(diào)用了它的call方法。而這個(gè)observable.onSubscribe正是create()方法中的Subscriber,這樣整個(gè)流程就理順了??吹竭@里是不是對RxJava的執(zhí)行流程清晰了一點(diǎn)呢?這里也建議大家在學(xué)習(xí)新技術(shù)的時(shí)候多去翻一翻源碼,知其然還要能知其所以然不是嗎。

subscribe()的參數(shù)除了可以是Observer和Subscriber以外還可以是Action1、Action0;這是一種更簡單的回調(diào),只有一個(gè)call(T)方法;由于太簡單這里就不做詳細(xì)介紹了!

異步

上一篇文章中開篇就講到RxJava就是來處理異步任務(wù)的。但是默認(rèn)情況下我們在哪個(gè)線程調(diào)用subscribe()就在哪個(gè)線程生產(chǎn)事件,在哪個(gè)線程生產(chǎn)事件就在哪個(gè)線程消費(fèi)事件。那怎么做到異步呢?RxJava為我們提供Scheduler用來做線程調(diào)度,我們來看看RxJava提供了哪些Scheduler。

Schedulers 作用
Schedulers.immediate() 默認(rèn)的Scheduler,直接在當(dāng)前線程運(yùn)行
Schedulers.newThread() 總是開啟一個(gè)新線程
Schedulers.io() 用于IO密集型任務(wù),如異步阻塞IO操作,這個(gè)調(diào)度器的線程池會根據(jù)需要增長;對于普通的計(jì)算任務(wù),請使用Schedulers.computation();Schedulers.io()默認(rèn)是一個(gè)CachedThreadScheduler,很像一個(gè)有線程緩存的新線程調(diào)度器
Schedulers.computation() 計(jì)算所使用的 Scheduler。這個(gè)計(jì)算指的是 CPU 密集型計(jì)算,即不會被 I/O 等操作限制性能的操作,例如圖形的計(jì)算。這個(gè) Scheduler 使用的固定的線程池,大小為 CPU 核數(shù)。不要把 I/O 操作放在 computation() 中,否則 I/O 操作的等待時(shí)間會浪費(fèi) CPU
Schedulers.from(executor) 使用指定的Executor作為調(diào)度器
Schedulers.trampoline() 當(dāng)其它排隊(duì)的任務(wù)完成后,在當(dāng)前線程排隊(duì)開始執(zhí)行
AndroidSchedulers.mainThread() RxAndroid中新增的Scheduler,表示在Android的main線程中運(yùn)行

同時(shí)RxJava還為我們提供了subscribeOn()observeOn()兩個(gè)方法來指定Observable和Observer運(yùn)行的線程。

Observable.from(getCommunitiesFromServer())
            .flatMap(community -> Observable.from(community.houses))
            .filter(house -> house.price>=5000000).subscribeOn(Schedulers.io())
            .observeOn(AndroidSchedulers.mainThread())
            .subscribe(this::addHouseInformationToScreen);

上面這段代碼大家應(yīng)該有印象吧,沒錯(cuò)正是我們上一篇文章中的例子。subscribeOn(Schedulers.io())指定了獲取小區(qū)列表、處理房源信息等一系列事件都是在IO線程中運(yùn)行,observeOn(AndroidSchedulers.mainThread())指定了在屏幕上展示房源的操作在UI線程執(zhí)行。這就做到了在子線程獲取房源,主線程展示房源。

好了,RxJava系列的入門內(nèi)容我們就聊到這。下一篇我們再繼續(xù)介紹更多的API以及它們內(nèi)部的原理。

如果大家喜歡這一系列的文章,歡迎關(guān)注我的知乎專欄和GitHub。

知乎專欄:https://zhuanlan.zhihu.com/baron

GitHub:https://github.com/BaronZ88

文章版權(quán)歸作者所有,未經(jīng)允許請勿轉(zhuǎn)載,若此文章存在違規(guī)行為,您可以聯(lián)系管理員刪除。

轉(zhuǎn)載請注明本文地址:http://systransis.cn/yun/66507.html

相關(guān)文章

  • RxJava系列六(從微觀角度解讀RxJava源碼)

    摘要:而這個(gè)就是線程調(diào)度的關(guān)鍵前面的例子中我們通過指定了發(fā)射處理事件以及通知觀察者的一系列操作的執(zhí)行線程,正是通過這個(gè)創(chuàng)建了我們前面提到的。總結(jié)這一章以執(zhí)行流程操作符實(shí)現(xiàn)以及線程調(diào)度三個(gè)方面為切入點(diǎn)剖析了源碼。 轉(zhuǎn)載請注明出處:https://zhuanlan.zhihu.com/p/22338235 RxJava系列1(簡介) RxJava系列2(基本概念及使用介紹) RxJava系列3...

    zero 評論0 收藏0
  • RxJava系列三(轉(zhuǎn)換操作符)

    摘要:從這一章開始,我們開始聊聊中的操作符,后面我將用三章的篇幅來分別介紹轉(zhuǎn)換類操作符過濾類操作符組合類操作符這一章我們主要講講轉(zhuǎn)換類操作符。函數(shù)同樣也是做轉(zhuǎn)換的,但是作用卻不一樣。 轉(zhuǎn)載請注明出處:https://zhuanlan.zhihu.com/p/21926591 RxJava系列1(簡介) RxJava系列2(基本概念及使用介紹) RxJava系列3(轉(zhuǎn)換操作符) RxJava...

    xuxueli 評論0 收藏0

發(fā)表評論

0條評論

最新活動
閱讀需要支付1元查看
<