摘要:舉例來說,每年都有生日是一道數(shù)據(jù)流,但是一個(gè)人的年齡卻是一個(gè)流。運(yùn)行結(jié)果顯示,第二個(gè)在訂閱之后,獲得了數(shù)據(jù)流中最后毫秒事件內(nèi)產(chǎn)生的和三個(gè)值。
原文:http://reactivex.io/rxjs/manu...
Subject是什么? RxJS的Subject是Observable的一個(gè)特殊類型,他可以將流中的值廣播給眾多觀察者(Observer)。
一般的Observalbe流是單一廣播制(每一個(gè)訂閱流的Observer擁有一個(gè)獨(dú)立的執(zhí)行過程)。
一個(gè)Subject類似一道Observable數(shù)據(jù)流,但是可以對多個(gè)Observer進(jìn)行多點(diǎn)廣播。這就像事件觸發(fā)器(EventEmitter):維護(hù)了一個(gè)偵聽器的列表。
每一個(gè)Subject就是一個(gè)Observable流。 對于給定的Subject,你可以訂閱它(subscribe),提供一個(gè)Observer,之后將會正常的接收傳遞來的數(shù)據(jù)。從Observer的角度來說,它是無法分辨一個(gè)流中的值是來源于單一廣播機(jī)制的Observable流還是一個(gè)Subject流。
在Subject內(nèi)部,訂閱(subscribe)不會引起一個(gè)新的接收數(shù)據(jù)的過程。類似于其他庫或語言中的注冊事件偵聽器(addListener),它會直接把給定的Observer放入到一個(gè)注冊列表中。
每一個(gè)Subject也是一個(gè)觀察者(Observer)。 擁有next(v)、error(e)和complete()方法。往Subject中填充數(shù)據(jù),只需要調(diào)用next(theValue)即可,它將會把數(shù)據(jù)廣播給所有已注冊的Observer。
以下的例子中,我們設(shè)定了2個(gè)訂閱Subject流的Observer,然后我們填充一些數(shù)據(jù)到Subject:
var subject = new Rx.Subject(); subject.subscribe({ next: (v) => console.log("observerA: " + v) }); subject.subscribe({ next: (v) => console.log("observerB: " + v) }); subject.next(1); subject.next(2);
得到了如下輸出:
observerA: 1 observerB: 1 observerA: 2 observerB: 2
因?yàn)镾ubject是一個(gè)Observer,因此你也可以將它作為任何Observable的subscribe()的參數(shù),訂閱這個(gè)Observable流,就像下面這樣:
var subject = new Rx.Subject(); subject.subscribe({ next: (v) => console.log("observerA: " + v) }); subject.subscribe({ next: (v) => console.log("observerB: " + v) }); var observable = Rx.Observable.from([1, 2, 3]); observable.subscribe(subject); // You can subscribe providing a Subject
運(yùn)行的結(jié)果:
observerA: 1 observerB: 1 observerA: 2 observerB: 2 observerA: 3 observerB: 3
在上面的方法中,我們使用Subject將一個(gè)單點(diǎn)廣播的Observable流轉(zhuǎn)換為多點(diǎn)廣播。這也佐證了,Subject是可以將任何Observable流共享給多個(gè)Observer的唯一途徑。
除了Subject,還有一些衍生出的專門的Subject:BehaviorSubject,ReplaySubject和AsyncSubject。
多路傳播的Observable流 Multicasted Observables相比于只能推送消息給單個(gè)的Observer的“單路Observable流”,利用具有多個(gè)訂閱者的Subject,“多路傳播的Observable流”可以有多個(gè)通知通道。
多路傳播的Observable在后臺通過使用Subject讓多個(gè)Observers能夠從同一個(gè)Observable流中獲取數(shù)據(jù)。
在后臺,multicast操作符是這樣工作的:Obersver訂閱潛在的Subject,而Subject又訂閱了源Observable流。下面的例子和之前使用observable.subscribe(subject)的情況類似:
var source = Rx.Observable.from([1, 2, 3]); var subject = new Rx.Subject(); var multicasted = source.multicast(subject); // These are, under the hood, `subject.subscribe({...})`: multicasted.subscribe({ next: (v) => console.log("observerA: " + v) }); multicasted.subscribe({ next: (v) => console.log("observerB: " + v) }); // This is, under the hood, `source.subscribe(subject)`: multicasted.connect();
multicast流返回了一個(gè)看似普通的Observable流,但是當(dāng)訂閱的時(shí)候他表現(xiàn)的與Subject類似。這個(gè)流被稱作ConnectableObservable流,本質(zhì)是一個(gè)Observable流,但擁有connect()方法。
connect()在內(nèi)部執(zhí)行了source.subscribe(subject),并且返回了一個(gè)你可以取消Observable流執(zhí)行的Subscription。因此,當(dāng)可被共享的Observable流開始時(shí),connect()方法對于精確的判定執(zhí)行過程很重要。
引用計(jì)數(shù) Reference counting手動的調(diào)用connect()和執(zhí)行Subscription往往是很累人的。我們當(dāng)然希望可以在第一個(gè)Observer訂閱的時(shí)候就自動的執(zhí)行connect(),并且最好在最后一個(gè)Observer取消訂閱(unsubscribe)的時(shí)候能自動取消流的執(zhí)行。
考慮一下,處于下列操作順序時(shí)的表現(xiàn)情況:
第一個(gè)Observer訂閱了多路傳播的Observable流
多路傳播的Observable流呈被連接狀態(tài)
調(diào)用next()傳0給第一個(gè)Observer
第二個(gè)Observer訂閱多路傳播Observable流
調(diào)用next()傳1給第一個(gè)Observer
調(diào)用next()傳1給第二個(gè)Observer
第一個(gè)Observer取消訂閱
調(diào)用next()傳2給第二個(gè)Observer
第二個(gè)Observer取消訂閱
多路傳播Observable流的連接情況是未被訂閱狀態(tài)
為了顯式的調(diào)用connect()實(shí)現(xiàn)這個(gè)過程,我們編寫如下代碼:
var source = Rx.Observable.interval(500); var subject = new Rx.Subject(); var multicasted = source.multicast(subject); var subscription1, subscription2, subscriptionConnect; subscription1 = multicasted.subscribe({ next: (v) => console.log("observerA: " + v) }); // We should call `connect()` here, because the first // subscriber to `multicasted` is interested in consuming values subscriptionConnect = multicasted.connect(); setTimeout(() => { subscription2 = multicasted.subscribe({ next: (v) => console.log("observerB: " + v) }); }, 600); setTimeout(() => { subscription1.unsubscribe(); }, 1200); // We should unsubscribe the shared Observable execution here, // because `multicasted` would have no more subscribers after this setTimeout(() => { subscription2.unsubscribe(); subscriptionConnect.unsubscribe(); // for the shared Observable execution }, 2000);
如果我們想避免顯式的調(diào)用connect(),我們可以使用ConnectableObservable的refCount()方法(引用計(jì)數(shù)),他返回了一個(gè)存有眾多訂閱者的Observable流。當(dāng)訂閱者的數(shù)量從0增加到1時(shí),將會自動調(diào)用connect(),開始共享流。
當(dāng)訂閱者的數(shù)量從1變?yōu)?,即將處于未訂閱狀態(tài)時(shí),將會自動停止下一步的執(zhí)行。
refCount使多路傳播Observable流在第一個(gè)訂閱者出現(xiàn)時(shí)自動啟動,在最后一個(gè)訂閱者離開時(shí)自動停止。
請看下面的例子:
var source = Rx.Observable.interval(500); var subject = new Rx.Subject(); var refCounted = source.multicast(subject).refCount(); var subscription1, subscription2, subscriptionConnect; // This calls `connect()`, because // it is the first subscriber to `refCounted` console.log("observerA subscribed"); subscription1 = refCounted.subscribe({ next: (v) => console.log("observerA: " + v) }); setTimeout(() => { console.log("observerB subscribed"); subscription2 = refCounted.subscribe({ next: (v) => console.log("observerB: " + v) }); }, 600); setTimeout(() => { console.log("observerA unsubscribed"); subscription1.unsubscribe(); }, 1200); // This is when the shared Observable execution will stop, because // `refCounted` would have no more subscribers after this setTimeout(() => { console.log("observerB unsubscribed"); subscription2.unsubscribe(); }, 2000);
執(zhí)行過后的輸出是:
observerA subscribed observerA: 0 observerB subscribed observerA: 1 observerB: 1 observerA unsubscribed observerB: 2 observerB unsubscribed
refCount()方法只存在于ConnectableObservable中,他返回一個(gè)Observable流,而不是另一個(gè)ConnectableObservable流。
BehaviorSubjectBehaviorSubject是一類特異的Subject。具有返回“當(dāng)前值”的特性。它存儲了流中最新的值并把它推送給自己的用戶,不論它的新舊與否,都能夠立即收到推送的這個(gè)“當(dāng)前值”。
BehaviorSubject 非常有利于表示“變化中的值”。舉例來說,每年都有生日是一道Subject數(shù)據(jù)流,但是一個(gè)人的年齡卻是一個(gè)BehaviorSubject流。
來看下面的例子,BehaviorSubject以0為值進(jìn)行初始化,第一個(gè)訂閱的Observer將會直接收到這個(gè)值。當(dāng)2被填充入流之后,第二個(gè)Observer訂閱流時(shí),盡管時(shí)間較晚,也會收到最新值2。
var subject = new Rx.BehaviorSubject(0); // 0 is the initial value subject.subscribe({ next: (v) => console.log("observerA: " + v) }); subject.next(1); subject.next(2); subject.subscribe({ next: (v) => console.log("observerB: " + v) }); subject.next(3);
輸出如下:
observerA: 0 observerA: 1 observerA: 2 observerB: 2 observerA: 3 observerB: 3ReplaySubject
ReplaySubject 很像BehaviorSubject,他會把時(shí)間線中較老的值推送給新的訂閱者們,而且他還可以記錄Observable流中一段時(shí)間的值。
ReplaySubject能夠記錄Observable流中的多個(gè)值,并將它們推送給新的訂閱者。
創(chuàng)建ReplaySubject時(shí),你可以指定需要回放多少個(gè)值,像這樣:
var subject = new Rx.ReplaySubject(3); // buffer 3 values for new subscribers subject.subscribe({ next: (v) => console.log("observerA: " + v) }); subject.next(1); subject.next(2); subject.next(3); subject.next(4); subject.subscribe({ next: (v) => console.log("observerB: " + v) }); subject.next(5);
輸出如下:
observerA: 1 observerA: 2 observerA: 3 observerA: 4 observerB: 2 observerB: 3 observerB: 4 observerA: 5 observerB: 5
在設(shè)定數(shù)據(jù)量大小之外,你還可以指定一個(gè)以毫秒為單位的窗口時(shí)間,用來確定記錄的數(shù)據(jù)所在的時(shí)間區(qū)間(數(shù)據(jù)有多老)。
在下面的例子中,我們使用了一個(gè)較大的數(shù)據(jù)量設(shè)定,同時(shí)還設(shè)定了500毫秒的窗口時(shí)間。
var subject = new Rx.ReplaySubject(100, 500 /* windowTime */); subject.subscribe({ next: (v) => console.log("observerA: " + v) }); var i = 1; setInterval(() => subject.next(i++), 200); setTimeout(() => { subject.subscribe({ next: (v) => console.log("observerB: " + v) }); }, 1000);
運(yùn)行結(jié)果顯示,第二個(gè)Observer在訂閱之后,獲得了數(shù)據(jù)流中最后500毫秒事件內(nèi)產(chǎn)生的3,4和5三個(gè)值。
observerA: 1 observerA: 2 observerA: 3 observerA: 4 observerA: 5 /************/ observerB: 3 observerB: 4 observerB: 5 /************/ observerA: 6 observerB: 6 ...AsyncSubject
AsyncSubject是Subject的另一個(gè)變化,他會在流發(fā)出complete通知時(shí),將數(shù)據(jù)流中的最后一個(gè)值推送給所有訂閱流的Observer。
var subject = new Rx.AsyncSubject(); subject.subscribe({ next: (v) => console.log("observerA: " + v) }); subject.next(1); subject.next(2); subject.next(3); subject.next(4); subject.subscribe({ next: (v) => console.log("observerB: " + v) }); subject.next(5); subject.complete();
輸出為:
With output:
observerA: 5 observerB: 5
AsyncSubject非常類似last()操作符,它會等待complete通知,并在那時(shí)推送流中的數(shù)據(jù)值。
文章版權(quán)歸作者所有,未經(jīng)允許請勿轉(zhuǎn)載,若此文章存在違規(guī)行為,您可以聯(lián)系管理員刪除。
轉(zhuǎn)載請注明本文地址:http://systransis.cn/yun/83625.html
摘要:原文是一個(gè)使用可觀察量隊(duì)列解決異步編程和基于事件編程的庫。提供了幾個(gè)管理異步事件的核心概念可觀察量,代表了一個(gè)由未來獲取到的值或事件組成的集合。相當(dāng)于事件觸發(fā)器,是向多個(gè)廣播事件或推送值的唯一方法。 原文:http://reactivex.io/rxjs/manu... RxJS 是一個(gè)使用可觀察量(observable)隊(duì)列解決異步編程和基于事件編程的js庫。他提供了一個(gè)核心的類型O...
摘要:原文可觀察量是一種能惰性推送的集合,他可以包含多個(gè)值。是一種惰性計(jì)算方式,會在迭代中同步的返回到無限個(gè)可能的話返回值。使用一種處理方法,最終可能會或可能不會返回一個(gè)值。無論是同步方式還是異步方式,都可以擇其一來傳遞返回值。 原文:http://reactivex.io/rxjs/manu... Observable 可觀察量是一種能惰性推送的集合,他可以包含多個(gè)值。下面的表格對比了推送...
摘要:實(shí)例化一個(gè)對象向接受者發(fā)送一個(gè)消息流接受者訂閱消息,獲取消息流中的數(shù)據(jù)接受者訂閱消息,獲取消息流中的數(shù)據(jù)這樣兩路接受者都能拿到發(fā)送的數(shù)據(jù)流是的一個(gè)衍生類,它將數(shù)據(jù)流中的最新值推送給接受者。 Rxjs_Subject 及其衍生類 在 RxJS 中,Observable 有一些特殊的類,在消息通信中使用比較頻繁,下面主要介紹較常用的幾個(gè)類: 1/ Subject Subject 可以實(shí)現(xiàn)...
摘要:技術(shù)積累經(jīng)過社區(qū)的努力學(xué)習(xí)資料還是很多的,官方中文文檔就已經(jīng)很不錯,不過我們先從天精通初步感受一下然后配合一些中文文檔來補(bǔ)充知識點(diǎn),最后再根據(jù)官方文檔來校驗(yàn)整個(gè)知識體系。資料學(xué)習(xí)操作符的時(shí)候可以對照彈珠圖的交互彈珠圖的中文版中文文檔 前言 最近準(zhǔn)備畢設(shè),技術(shù)選型的時(shí)候因?yàn)楣δ艿囊恍┬枨鬁?zhǔn)備將RxJs融入到項(xiàng)目中,考慮RxJs的時(shí)候因?yàn)橹暗募夹g(shù)棧還猶豫了一下,查了一些資料以及粗略瀏覽了...
閱讀 2036·2021-10-09 09:41
閱讀 1609·2021-09-28 09:36
閱讀 1115·2021-09-26 09:55
閱讀 1303·2021-09-10 11:17
閱讀 1159·2021-09-02 09:56
閱讀 2772·2019-08-30 12:58
閱讀 2940·2019-08-29 13:03
閱讀 1867·2019-08-26 13:40