成人国产在线小视频_日韩寡妇人妻调教在线播放_色成人www永久在线观看_2018国产精品久久_亚洲欧美高清在线30p_亚洲少妇综合一区_黄色在线播放国产_亚洲另类技巧小说校园_国产主播xx日韩_a级毛片在线免费

資訊專欄INFORMATION COLUMN

RxJava中的Observable,多Subscribers

Martin91 / 1061人閱讀

摘要:在本文中,我們將介紹如何更改此行為并以適當(dāng)?shù)姆绞教幚矶鄠€訂閱者。第一個訂閱者將獲得此示例中發(fā)出的所有元素,而第二個訂閱者將只接收一些元素。我們可以取消訂閱所有真正的訂閱者,但人工訂閱者仍將處理事件。

多個訂閱者的默認(rèn)行為并不總是可取的。在本文中,我們將介紹如何更改此行為并以適當(dāng)?shù)姆绞教幚矶鄠€訂閱者。

但首先,讓我們來看看多個訂閱者的默認(rèn)行為。

默認(rèn)行為

假設(shè)我們有以下Observable:

private static Observable getObservable() {
????return Observable.create(subscriber -> {
????????subscriber.onNext(gettingValue(1));
????????subscriber.onNext(gettingValue(2));
?
????????subscriber.add(Subscriptions.create(() -> {
????????????LOGGER.info("Clear resources");
????????}));
????});
}

訂閱者訂閱后會立即發(fā)出兩個元素。

在我們的示例中,我們有兩個訂閱者:

LOGGER.info("Subscribing");
?
Subscription s1 = obs.subscribe(i -> LOGGER.info("subscriber#1 is printing " + i));
Subscription s2 = obs.subscribe(i -> LOGGER.info("subscriber#2 is printing " + i));
?
s1.unsubscribe();
s2.unsubscribe();

想象一下,獲取每個元素是一項代價高昂的操作 - 例如,它可能包括密集計算或打開URL連接。

為了簡單起見,我們只返回一個數(shù)字:

private static Integer gettingValue(int i) {
????LOGGER.info("Getting " + i);
????return i;
}

這是輸出:

Subscribing
Getting 1
subscriber#1 is printing 1
Getting 2
subscriber#1 is printing 2
Getting 1
subscriber#2 is printing 1
Getting 2
subscriber#2 is printing 2
Clear resources
Clear resources

我們可以看到,在默認(rèn)情況下,獲取每個元素和清除資源都要執(zhí)行兩次-對于每個訂閱服務(wù)器一次。這不是我們想要的。ConnectableObservable類有助于解決這個問題。

ConnectableObservable

ConnectableObservable類允許與多個訂閱者共享訂閱,而不允許多次執(zhí)行底層操作。

但首先,讓我們創(chuàng)建一個ConnectableObservable。

publish()

publish()方法是從Observable創(chuàng)建一個ConnectableObservable:

ConnectableObservable obs = Observable.create(subscriber -> {
????subscriber.onNext(gettingValue(1));
????subscriber.onNext(gettingValue(2));
????subscriber.add(Subscriptions.create(() -> {
????????LOGGER.info("Clear resources");
????}));
}).publish();

但就目前而言,它什么都不做。它的工作原理是connect()方法。

connect()

在調(diào)用ConnectableObservable的connect()方法之前,即使有一些訂閱者,也不會觸發(fā)Observable的onSubcribe()回調(diào)。

讓我們來證明一下:

LOGGER.info("Subscribing");
obs.subscribe(i -> LOGGER.info("subscriber #1 is printing " + i));
obs.subscribe(i -> LOGGER.info("subscriber #2 is printing " + i));
Thread.sleep(1000);
LOGGER.info("Connecting");
Subscription s = obs.connect();
s.unsubscribe();

我們訂閱,然后等待一秒鐘再連接輸出是:

Subscribing
Connecting
Getting 1
subscriber #1 is printing 1
subscriber #2 is printing 1
Getting 2
subscriber #1 is printing 2
subscriber #2 is printing 2
Clear resources

我們可以看到:

獲取元素只出現(xiàn)一次我們想要的

清算資源也只出現(xiàn)一次

訂閱后獲取元素開始一秒鐘

訂閱不再觸發(fā)元素的發(fā)射。只有connect()才能這樣做

這種延遲可能是有益的 - 有時我們需要為所有訂閱者提供相同的元素序列,即使其中一個訂閱者比另一個訂閱者更早。

可觀察的一致視圖 - 在subscribe()之后的connect()

這個用例無法在我們之前的Observable上進(jìn)行演示,因為它運(yùn)行很冷,而且兩個訂閱者都可以獲得整個元素序列。

相反,想象一下,元素發(fā)射不依賴于訂閱的時刻,例如,鼠標(biāo)點(diǎn)擊發(fā)出的事件?,F(xiàn)在還想象第二個訂閱者在第一個訂閱者之后訂閱第二個訂閱者。

第一個訂閱者將獲得此示例中發(fā)出的所有元素,而第二個訂閱者將只接收一些元素。

另一方面,在正確的位置使用connect()方法可以為兩個訂閱者提供Observable序列上的相同視圖。

讓我們創(chuàng)建一個Observable。它將在JFrame上點(diǎn)擊鼠標(biāo)時發(fā)出元素。

每個元素都是點(diǎn)擊的x坐標(biāo):

private static Observable getObservable() {
????return Observable.create(subscriber -> {
????????frame.addMouseListener(new MouseAdapter() {
????????????@Override
????????????public void mouseClicked(MouseEvent e) {
????????????????subscriber.onNext(e.getX());
????????????}
????????});
????????subscriber.add(Subscriptions.create(() {
????????????LOGGER.info("Clear resources");
????????????for (MouseListener listener : frame.getListeners(MouseListener.class)) {
????????????????frame.removeMouseListener(listener);
????????????}
????????}));
????});
}

現(xiàn)在,如果我們以第二個間隔一個接一個地訂閱兩個訂閱者,運(yùn)行程序并開始單擊,我們將看到第一個訂閱者將獲得更多元素:

public static void defaultBehaviour() throws InterruptedException {
????Observable obs = getObservable();
?
????LOGGER.info("subscribing #1");
????Subscription subscription1 = obs.subscribe((i) -> 
????????LOGGER.info("subscriber#1 is printing x-coordinate " + i));
????Thread.sleep(1000);
????LOGGER.info("subscribing #2");
????Subscription subscription2 = obs.subscribe((i) -> 
????????LOGGER.info("subscriber#2 is printing x-coordinate " + i));
????Thread.sleep(1000);
????LOGGER.info("unsubscribe#1");
????subscription1.unsubscribe();
????Thread.sleep(1000);
????LOGGER.info("unsubscribe#2");
????subscription2.unsubscribe();
}
subscribing #1
subscriber#1 is printing x-coordinate 280
subscriber#1 is printing x-coordinate 242
subscribing #2
subscriber#1 is printing x-coordinate 343
subscriber#2 is printing x-coordinate 343
unsubscribe#1
clearing resources
unsubscribe#2
clearing resources
connect() After subscribe()

為了使兩個訂閱者獲得相同的序列,我們將Observable轉(zhuǎn)換為ConnectableObservable并在訂閱者之后調(diào)用connect():

public static void subscribeBeforeConnect() throws InterruptedException {
?
????ConnectableObservable obs = getObservable().publish();
?
????LOGGER.info("subscribing #1");
????Subscription subscription1 = obs.subscribe(
??????i -> LOGGER.info("subscriber#1 is printing x-coordinate " + i));
????Thread.sleep(1000);
????LOGGER.info("subscribing #2");
????Subscription subscription2 = obs.subscribe(
??????i ->? LOGGER.info("subscriber#2 is printing x-coordinate " + i));
????Thread.sleep(1000);
????LOGGER.info("connecting:");
????Subscription s = obs.connect();
????Thread.sleep(1000);
????LOGGER.info("unsubscribe connected");
????s.unsubscribe();
}

現(xiàn)在他們將得到相同的序列:

subscribing #1
subscribing #2
connecting:
subscriber#1 is printing x-coordinate 317
subscriber#2 is printing x-coordinate 317
subscriber#1 is printing x-coordinate 364
subscriber#2 is printing x-coordinate 364
unsubscribe connected
clearing resources

所以重點(diǎn)是等待所有用戶準(zhǔn)備就緒然后調(diào)用connect()。

在Spring應(yīng)用程序中,我們可以在應(yīng)用程序啟動期間訂閱所有組件,例如在onApplicationEvent()中調(diào)用connect()。

讓我們回到我們的例子;注意,connect()方法之前的所有單擊操作都失敗了。如果我們不想遺漏元素,但相反,我們可以在代碼中更早地放置connect(),并強(qiáng)制可觀察到的元素在沒有任何訂閱服務(wù)器的情況下生成事件。

在沒有任何訂閱者的情況下強(qiáng)制訂閱 - connect()在subscribe()之前

為了證明這一點(diǎn),讓我們更正我們的例子:

public static void connectBeforeSubscribe() throws InterruptedException {
????ConnectableObservable obs = getObservable()
??????.doOnNext(x -> LOGGER.info("saving " + x)).publish();
????LOGGER.info("connecting:");
????Subscription s = obs.connect();
????Thread.sleep(1000);
????LOGGER.info("subscribing #1");
????obs.subscribe((i) -> LOGGER.info("subscriber#1 is printing x-coordinate " + i));
????Thread.sleep(1000);
????LOGGER.info("subscribing #2");
????obs.subscribe((i) -> LOGGER.info("subscriber#2 is printing x-coordinate " + i));
????Thread.sleep(1000);
????s.unsubscribe();
}

步驟相對簡單:

首先,我們連接

然后我們等待一秒鐘并訂閱第一個訂閱者

最后,我們等待另一秒鐘并訂閱第二個訂閱者

請注意,我們添加了doOnNext()運(yùn)算符。這里我們可以在數(shù)據(jù)庫中存儲元素,例如在我們的代碼中,我們只打印“save...”。

如果我們啟動代碼并開始點(diǎn)擊,我們將看到在connect()調(diào)用之后立即發(fā)出和處理元素:

connecting:
saving 306
saving 248
subscribing #1
saving 377
subscriber#1 is printing x-coordinate 377
saving 295
subscriber#1 is printing x-coordinate 295
saving 206
subscriber#1 is printing x-coordinate 206
subscribing #2
saving 347
subscriber#1 is printing x-coordinate 347
subscriber#2 is printing x-coordinate 347
clearing resources

如果沒有訂閱者,則仍會處理這些元素。

因此,不管是否有人訂閱,connect()方法都會開始發(fā)出和處理元素,就好像有一個使用了元素的空操作的人工訂閱器一樣。

如果有一些真正的訂閱者訂閱,這個人工中介只向他們傳播元素。

若要取消訂閱,我們會執(zhí)行以下步驟:

s.unsubscribe();

然后:

Subscription s = obs.connect();
autoConnect()

此方法意味著在訂閱之前或之后不會調(diào)用connect(),而是在第一個訂閱者訂閱時自動調(diào)用。

使用此方法,我們不能自己調(diào)用connect(),因為返回的對象是通常的Observable,它沒有此方法但使用底層的ConnectableObservable:

public static void autoConnectAndSubscribe() throws InterruptedException {
????Observable obs = getObservable()
????.doOnNext(x -> LOGGER.info("saving " + x)).publish().autoConnect();
?
????LOGGER.info("autoconnect()");
????Thread.sleep(1000);
????LOGGER.info("subscribing #1");
????Subscription s1 = obs.subscribe((i) -> 
????????LOGGER.info("subscriber#1 is printing x-coordinate " + i));
????Thread.sleep(1000);
????LOGGER.info("subscribing #2");
????Subscription s2 = obs.subscribe((i) -> 
????????LOGGER.info("subscriber#2 is printing x-coordinate " + i));
?
????Thread.sleep(1000);
????LOGGER.info("unsubscribe 1");
????s1.unsubscribe();
????Thread.sleep(1000);
????LOGGER.info("unsubscribe 2");
????s2.unsubscribe();
}

請注意,我們也不能取消訂閱人工訂閱者。我們可以取消訂閱所有真正的訂閱者,但人工訂閱者仍將處理事件。

為了理解這一點(diǎn),讓我們看一下最后一個訂閱者取消訂閱后最后發(fā)生的事情:

subscribing #1
saving 296
subscriber#1 is printing x-coordinate 296
saving 329
subscriber#1 is printing x-coordinate 329
subscribing #2
saving 226
subscriber#1 is printing x-coordinate 226
subscriber#2 is printing x-coordinate 226
unsubscribe 1
saving 268
subscriber#2 is printing x-coordinate 268
saving 234
subscriber#2 is printing x-coordinate 234
unsubscribe 2
saving 278
saving 268

正如我們所看到的,在第二次取消訂閱后,不會出現(xiàn)清除資源的情況,并繼續(xù)使用doOnNext()保存元素。這意味著人工訂閱服務(wù)器不會取消訂閱,而是繼續(xù)使用元素。

refCount()

refCount()類似于autoConnect(),因為只要第一個訂閱者訂閱,連接也會自動發(fā)生。

與autoconnect()不同,當(dāng)最后一個訂閱者取消訂閱時,也會自動斷開連接:

public static void refCountAndSubscribe() throws InterruptedException {
????Observable obs = getObservable()
??????.doOnNext(x -> LOGGER.info("saving " + x)).publish().refCount();
?
????LOGGER.info("refcount()");
????Thread.sleep(1000);
????LOGGER.info("subscribing #1");
????Subscription subscription1 = obs.subscribe(
??????i -> LOGGER.info("subscriber#1 is printing x-coordinate " + i));
????Thread.sleep(1000);
????LOGGER.info("subscribing #2");
????Subscription subscription2 = obs.subscribe(
??????i -> LOGGER.info("subscriber#2 is printing x-coordinate " + i));
?
????Thread.sleep(1000);
????LOGGER.info("unsubscribe#1");
????subscription1.unsubscribe();
????Thread.sleep(1000);
????LOGGER.info("unsubscribe#2");
????subscription2.unsubscribe();
}
refcount()
subscribing #1
saving 265
subscriber#1 is printing x-coordinate 265
saving 338
subscriber#1 is printing x-coordinate 338
subscribing #2
saving 203
subscriber#1 is printing x-coordinate 203
subscriber#2 is printing x-coordinate 203
unsubscribe#1
saving 294
subscriber#2 is printing x-coordinate 294
unsubscribe#2
clearing resources
結(jié)論

ConnectableObservable類可以輕松地處理多個訂閱者。

它的方法看起來很相似,但由于實(shí)現(xiàn)上的細(xì)微差別(甚至方法的順序也很重要),用戶的行為發(fā)生了很大的變化。

文章版權(quán)歸作者所有,未經(jīng)允許請勿轉(zhuǎn)載,若此文章存在違規(guī)行為,您可以聯(lián)系管理員刪除。

轉(zhuǎn)載請注明本文地址:http://systransis.cn/yun/72981.html

相關(guān)文章

  • RxJava系列二(基本概念及使用介紹)

    摘要:作用默認(rèn)的,直接在當(dāng)前線程運(yùn)行總是開啟一個新線程用于密集型任務(wù),如異步阻塞操作,這個調(diào)度器的線程池會根據(jù)需要增長對于普通的計算任務(wù),請使用默認(rèn)是一個,很像一個有線程緩存的新線程調(diào)度器計算所使用的。這個使用的固定的線程池,大小為核數(shù)。 轉(zhuǎn)載請注明出處:https://zhuanlan.zhihu.com/p/20687307 RxJava系列1(簡介) RxJava系列2(基本概念及使...

    Profeel 評論0 收藏0
  • 【Android】RxJava的使用(二)Action

    摘要:回顧在上一節(jié)的使用一基本用法中,介紹了的基本用法。它同樣只有一個方法,這個方法也無返回值,但有一個參數(shù)與同理,由于和也是單參數(shù)無返回值的,因此可以將和打包起來傳入以實(shí)現(xiàn)不完整定義的回調(diào)的使用定義三個對象,分別打包。 回顧 在上一節(jié)Android RxJava的使用(一)基本用法中,介紹了RxJava的基本用法。下面來回顧下實(shí)現(xiàn)一次RxJava的基本使用。例:分別打印Hello、 Wor...

    jemygraw 評論0 收藏0

發(fā)表評論

0條評論

閱讀需要支付1元查看
<