摘要:建立一個(gè)實(shí)時(shí)地震我們將為地震儀表板應(yīng)用程序構(gòu)建服務(wù)器和客戶端部件,實(shí)時(shí)記錄地震的位置并可視化顯示。添加地震列表新儀表板的第一個(gè)功能是顯示地震的實(shí)時(shí)列表,包括有關(guān)其位置,大小和日期的信息。
Rxjs 響應(yīng)式編程-第一章:響應(yīng)式
Rxjs 響應(yīng)式編程-第二章:序列的深入研究
Rxjs 響應(yīng)式編程-第三章: 構(gòu)建并發(fā)程序
Rxjs 響應(yīng)式編程-第四章 構(gòu)建完整的Web應(yīng)用程序
Rxjs 響應(yīng)式編程-第五章 使用Schedulers管理時(shí)間
Rxjs 響應(yīng)式編程-第六章 使用Cycle.js的響應(yīng)式Web應(yīng)用程序
在本章中,我們將構(gòu)建一個(gè)典型的Web應(yīng)用程序,在前端和后端使用RxJS。我們將轉(zhuǎn)換文檔對象模型(DOM)并使用Node.js服務(wù)器中的WebSockets進(jìn)行客戶端 - 服務(wù)器通信。
對于用戶界面位,我們將使用RxJS-DOM庫,這是由RxJS制作的同一團(tuán)隊(duì)的庫,它提供了方便的Operator來處理DOM和瀏覽器相關(guān)的東西,這將使我們的編程更簡潔。對于服務(wù)器部分,我們將使用兩個(gè)完善的節(jié)點(diǎn)庫,并將一些API與Observables包裝在一起,以便在我們的應(yīng)用程序中使用它們。
在本章之后,您將能夠使用RxJS以聲明方式構(gòu)建用戶界面,使用我們目前為止看到的技術(shù)并將它們應(yīng)用于DOM。 您還可以在任何Node.js項(xiàng)目中使用RxJS,并且能夠在任何項(xiàng)目中使用反應(yīng)式編程和RxJS。
建立一個(gè)實(shí)時(shí)地震Dashboard我們將為地震儀表板應(yīng)用程序構(gòu)建服務(wù)器和客戶端部件,實(shí)時(shí)記錄地震的位置并可視化顯示。我們將在Node.js中構(gòu)建服務(wù)器,并且改進(jìn)我們的應(yīng)用程序,使其更具互動性和更充足的信息量。
一開始的代碼如下:
examples_earthquake/code1_3.js
var quakes = Rx.Observable .interval(5000) .flatMap(function() { return Rx.DOM.jsonpRequest({ url: QUAKE_URL, jsonpCallback: "eqfeed_callback" }).retry(3); }) .flatMap(function(result) { return Rx.Observable.from(result.response.features); }) .distinct(function(quake) { return quake.properties.code; }); quakes.subscribe(function(quake) { var coords = quake.geometry.coordinates; var size = quake.properties.mag * 10000; L.circle([coords[1], coords[0]], size).addTo(map); });
這段代碼已經(jīng)有一個(gè)潛在的錯(cuò)誤:它可以在DOM準(zhǔn)備好之前執(zhí)行,每當(dāng)我們嘗試在代碼中使用DOM元素時(shí)就會拋出錯(cuò)誤。我們想要的是在觸發(fā)DOMContentLoaded事件之后加載我們的代碼,這表示瀏覽器已經(jīng)準(zhǔn)備好dom了。
RxJS-DOM提供Rx.DOM.readyObservable,當(dāng)觸發(fā)DOMContentLoaded時(shí),它會發(fā)出一次。 因此,讓我們將代碼包裝在initialize函數(shù)中,并在訂閱Rx.DOM.ready時(shí)執(zhí)行它:
examples_earthquake_ui/code1.js
function initialize() { var quakes = Rx.Observable .interval(5000) .flatMap(function() { return Rx.DOM.jsonpRequest({ url: QUAKE_URL, jsonpCallback: "eqfeed_callback" }); }) .flatMap(function(result) { return Rx.Observable.from(result.response.features); }) .distinct(function(quake) { return quake.properties.code; }); quakes.subscribe(function(quake) { var coords = quake.geometry.coordinates; var size = quake.properties.mag * 10000; L.circle([coords[1], coords[0]], size).addTo(map); }); } Rx.DOM.ready().subscribe(initialize);
接下來,我們將在HTML中添加一個(gè)空表,我們將在下一部分填充地震數(shù)據(jù):
Location | Magnitude | Time |
---|
有了這個(gè),我們準(zhǔn)備開始為我們的儀表板編寫新代碼。
添加地震列表新儀表板的第一個(gè)功能是顯示地震的實(shí)時(shí)列表,包括有關(guān)其位置,大小和日期的信息。此列表的數(shù)據(jù)與來自USGS網(wǎng)站的地圖相同。我們首先創(chuàng)建一個(gè)函數(shù),在給定props對象參數(shù)的情況下返回一個(gè)row元素:
examples_earthquake_ui/code2.js
function makeRow(props) { var row = document.createElement("tr"); row.id = props.net + props.code; var date = new Date(props.time); var time = date.toString(); [props.place, props.mag, time].forEach(function(text) { var cell = document.createElement("td"); cell.textContent = text; row.appendChild(cell); }); return row; }
props參數(shù)與我們從USGS站點(diǎn)檢索的JSON中的properties屬性相同。
為了生成行,我們將再次訂閱地震Observable。此訂閱會在表格中為每次收到的新地震創(chuàng)建一行。 我們在initialize函數(shù)的末尾添加代碼:
examples_earthquake_ui/code2.js
var table = document.getElementById("quakes_info"); quakes .pluck("properties") .map(makeRow) .subscribe(function(row) { table.appendChild(row); });
pluck運(yùn)算符從每個(gè)地震對象中提取屬性值,因?yàn)樗琺akeRow所需的所有信息。 然后我們將每個(gè)地震對象映射到makeRow,將其轉(zhuǎn)換為填充的HTML tr元素。 最后,在訂閱中,我們將每個(gè)發(fā)出的行追加到我們的table中。
每當(dāng)我們收到地震數(shù)據(jù)時(shí),這應(yīng)該得到一個(gè)數(shù)據(jù)稠密的表格。
看起來不錯(cuò),而且很容易!不過,我們可以做一些改進(jìn)。首先,我們需要探索RxJS中的一個(gè)重要概念:冷熱Observable。
冷熱Observable無論Observers是否訂閱它們,“熱”O(jiān)bservable都會發(fā)出值。另一方面,“冷”O(jiān)bservables從Observer開始訂閱就發(fā)出整個(gè)值序列。
熱Observable訂閱熱Observable的Observer將接收從訂閱它的確切時(shí)刻發(fā)出的值。在那一刻訂閱的每個(gè)其他Observer將收到完全相同的值。 這類似于JavaScript事件的工作方式。
鼠標(biāo)事件和股票交易代碼是熱的Observables的例子。在這兩種情況下,Observable都會發(fā)出值,無論它是否有訂閱者,并且在任何訂閱者收聽之前可能已經(jīng)生成了值。這是一個(gè)例子:
hot_cold.js
var onMove = Rx.Observable.fromEvent(document, "mousemove"); var subscriber1 = onMove.subscribe(function(e) { console.log("Subscriber1:", e.clientX, e.clientY); }); var subscriber2 = onMove.subscribe(function(e) { console.log("Subscriber2:", e.clientX, e.clientY); });
// Result: // Subscriber1: 23 24 // Subscriber2: 23 24 // Subscriber1: 34 37 // Subscriber2: 34 37 // Subscriber1: 46 49 // Subscriber2: 46 49 // ...
在該示例中,兩個(gè)訂閱者在發(fā)出Observable時(shí)都會收到相同的值。 對于JavaScript程序員來說,這種行為感覺很自然,因?yàn)樗愃朴贘avaScript事件的工作方式。
現(xiàn)在讓我們看看冷Observables是如何工作的。
冷Observable只有當(dāng)Observers訂閱它時(shí),冷Observable才會發(fā)出值。
例如,Rx.Observable.range返回一個(gè)冷Observable。訂閱它的每個(gè)新觀察者都將收到整個(gè)范圍:
hot_cold.js
function printValue(value) { console.log(value); } var rangeToFive = Rx.Observable.range(1, 5); var obs1 = rangeToFive.subscribe(printValue); // 1, 2, 3, 4, 5 var obs2 = Rx.Observable .delay(2000) .flatMap(function() { return rangeToFive.subscribe(printValue); // 1, 2, 3, 4, 5 });
了解我們何時(shí)處理熱或冷的Observable對于避免細(xì)微和隱藏的錯(cuò)誤至關(guān)重要。例如,Rx.Observable.interval返回一個(gè)Observable,它以固定的時(shí)間間隔生成一個(gè)遞增的整數(shù)值。 想象一下,我們想用它來將相同的值推送給幾個(gè)觀察者。 我們可以像這樣實(shí)現(xiàn)它:
hot_cold.js
var source = Rx.Observable.interval(2000); var observer1 = source.subscribe(function (x) { console.log("Observer 1, next value: " + x); }); var observer2 = source.subscribe(function (x) { console.log("Observer 2: next value: " + x); });
輸出
Observer 1, next value: 0 Observer 2: next value: 0 Observer 1, next value: 1 Observer 2: next value: 1 ...
這似乎沒什么問題。 但現(xiàn)在想象我們需要第二個(gè)用戶在第一個(gè)用戶加入后三秒鐘加入:
hot_cold.js
var source = Rx.Observable.interval(1000); var observer1 = source.subscribe(function (x) { console.log("Observer 1: " + x); }); setTimeout(function() { var observer2 = source.subscribe(function (x) { console.log("Observer 2: " + x); }); }, 3000);
輸出
Observer 1: 0 Observer 1: 1 Observer 1: 2 Observer 1: 3 Observer 2: 0 Observer 1: 4 Observer 2: 1 ...
現(xiàn)在我們看到有些東西真的沒了。三秒后訂閱時(shí),observer2接收源已經(jīng)推送過的所有值,而不是從當(dāng)前值開始并從那里繼續(xù),因?yàn)镽x.Observable.interval是一個(gè)冷Observable。 如果熱和冷Observables之間的的區(qū)別不是很清楚的話,那么這樣的場景可能會令人驚訝。
如果我們有幾個(gè)Observers訂閱冷的Observable,他們將收到相同序列值的副本。嚴(yán)格來說,盡管觀察者共享相同的Observable,但它們并沒有共享相同的值序列。如果我們希望Observers共享相同的序列,我們需要一個(gè)熱的Observable。
從冷到熱使用publish我們可以使用publish將冷的Observable變成熱的。調(diào)用publish會創(chuàng)建一個(gè)新的Observable,它充當(dāng)原始Observable的代理。它通過訂閱原始版本并將其收到的值推送給訂閱者來實(shí)現(xiàn)。
已發(fā)布的Observable實(shí)際上是一個(gè)ConnectableObservable,它有一個(gè)名為connect的額外方法,我們調(diào)用它來開始接收值。 這允許我們在開始運(yùn)行之前訂閱它:
hot_cold.js
// Create an Observable that yields a value every second var source = Rx.Observable.interval(1000); var publisher = source.publish(); // Even if we are subscribing, no values are pushed yet. var observer1 = publisher.subscribe(function (x) { console.log("Observer 1: " + x); }); // publisher connects and starts publishing values publisher.connect(); setTimeout(function() { // 5 seconds later, observer2 subscribes to it and starts receiving // current values, not the whole sequence. var observer2 = publisher.subscribe(function (x) { console.log("Observer 2: " + x); }); }, 5000);共享冷Observable
讓我們回到我們的地震示例。到目前為止,我們的代碼看起來很合理;我們有一個(gè)帶有兩個(gè)訂閱的Observable地震:一個(gè)在地圖上繪制地震,另一個(gè)在表格中列出地震。
但我們可以使代碼更有效率。 通過讓兩個(gè)地震用戶,我們實(shí)際上要求兩次數(shù)據(jù)。 您可以通過在quakes的flatMap操作符中放入一個(gè)console.log來檢查。
發(fā)生這種情況是因?yàn)閝uakes是一個(gè)冷Observable,并且它會將所有值重新發(fā)送給每個(gè)新訂閱者,因此新訂閱意味著新的JSONP請求。這會通過網(wǎng)絡(luò)請求兩次相同的資源來影響我們的應(yīng)用程序性能。
對于下一個(gè)示例,我們將使用`share·運(yùn)算符,當(dāng)Observers的數(shù)量從0變?yōu)?時(shí),它自動創(chuàng)建對Observable的預(yù)訂。 這使我們免于重新連接:
examples_earthquake_ui/code2.js
var quakes = Rx.Observable .interval(5000) .flatMap(function() { return Rx.DOM.jsonpRequest({ url: QUAKE_URL, jsonpCallback: "eqfeed_callback" }); }) .flatMap(function(result) { return Rx.Observable.from(result.response.features); }) .distinct(function(quake) { return quake.properties.code; }) .share()
現(xiàn)在地震的行為就像一個(gè)熱的Observable,我們不必?fù)?dān)心我們連接多少觀察者,因?yàn)樗麄兌紩盏酵耆嗤臄?shù)據(jù)。
緩沖值我們之前的代碼運(yùn)行良好,但請注意,每次我們收到有關(guān)地震的信息時(shí)都會插入一個(gè)tr節(jié)點(diǎn)。 這是低效的,因?yàn)槊看尾迦胛覀兌紩薷腄OM并導(dǎo)致重新繪制頁面,使瀏覽器不必要地計(jì)算新布局。 這可能會導(dǎo)致性能下降。
理想情況下,我們會批處理幾個(gè)傳入的地震對象,并每隔幾秒插入一批地震對象。手動實(shí)現(xiàn)會很棘手,因?yàn)槲覀儽仨毐A粲?jì)數(shù)器和元素緩沖區(qū),我們必須記住每次批量重置它們。 但是使用RxJS,我們可以使用一個(gè)基于緩沖區(qū)的RxJS運(yùn)算符,比如bufferWithTime。
使用bufferWithTime,我們可以緩沖傳入的值,并在每x個(gè)時(shí)間段將它們作為數(shù)組釋放:
examples_earthquake_ui/code3.bufferWithTime.js
var table = document.getElementById("quakes_info"); quakes .pluck("properties") .map(makeRow) ? .bufferWithTime(500) ? .filter(function(rows) { return rows.length > 0; } .map(function(rows) { var fragment = document.createDocumentFragment(); rows.forEach(function(row) { ? fragment.appendChild(row); }); return fragment; }) .subscribe(function(fragment) { ? table.appendChild(fragment); });
這是新代碼中正在發(fā)生的事情:
B緩存每個(gè)傳入值并每500毫秒釋放一批值。
無論如何,bufferWithTime每500ms執(zhí)行一次,如果沒有傳入值,它將產(chǎn)生一個(gè)空數(shù)組。 我們會過濾掉這些空數(shù)組。
我們將每一行插入一個(gè)文檔片段,這是一個(gè)沒有父文檔的文檔。這意味著它不在DOM中,并且修改其內(nèi)容非??焖俸陀行?。
最后,我們將片段附加到DOM。附加片段的一個(gè)優(yōu)點(diǎn)是它被視為單個(gè)操作,只會導(dǎo)致一次重繪。 它還將片段的子元素附加到我們附加片段本身的同一元素。
使用緩沖區(qū)和片段,我們設(shè)法保持行插入性能,同時(shí)保持應(yīng)用程序的實(shí)時(shí)性(最大延遲為半秒)。 現(xiàn)在我們已準(zhǔn)備好為我們的儀表板添加下一個(gè)功能:交互性!
添加交互我們現(xiàn)在在地圖上和列表中發(fā)生地震,但兩個(gè)表示之間沒有相互作用。例如,每當(dāng)我們點(diǎn)擊列表上的地圖時(shí),就可以在地圖上居中地震,并在我們將鼠標(biāo)移動到其行上時(shí)突出顯示地圖上帶圓圈的地震。 我們開始吧。
在Leaflet中,您可以在地圖上繪制并將繪圖放在各自的圖層中,以便您可以多帶帶操作它們。 讓我們創(chuàng)建一組名為quakeLayer的圖層,我們將存儲所有地震圈。每個(gè)圓圈都是該組內(nèi)的一個(gè)圖層。 我們還將創(chuàng)建一個(gè)對象codeLayers,我們將存儲地震代碼和內(nèi)部圖層ID之間的相關(guān)性,以便我們可以通過地震ID來查找圓圈:
examples_earthquake_ui/code3.js
var codeLayers = {}; var quakeLayer = L.layerGroup([]).addTo(map);
現(xiàn)在,在初始化內(nèi)部的地震Observable訂閱中,我們將每個(gè)圓圈添加到圖層組并將其ID存儲在codeLayers中。 如果這看起來有點(diǎn)錯(cuò)綜復(fù)雜,那是因?yàn)檫@是Leaflet允許我們在地圖中引用圖層的唯一方式。
examples_earthquake_ui/code3.js
quakes.subscribe(function(quake) { var coords = quake.geometry.coordinates; var size = quake.properties.mag * 10000; var circle = L.circle([coords[1], coords[0]], size).addTo(map); quakeLayer.addLayer(circle); codeLayers[quake.id] = quakeLayer.getLayerId(circle); });
我們現(xiàn)在創(chuàng)建懸停效果。我們將編寫一個(gè)新函數(shù)isHovering,它返回一個(gè)Observable,它發(fā)出一個(gè)布爾值,表示在任何給定時(shí)刻鼠標(biāo)是否在特定地震圈上:
examples_earthquake_ui/code3.js
? var identity = Rx.helpers.identity; function isHovering(element) { ? var over = Rx.DOM.mouseover(element).map(identity(true)); ? var out = Rx.DOM.mouseout(element).map(identity(false)); ? return over.merge(out); }
Rx.helpers.identity是定義函數(shù)。 給定參數(shù)x,它返回x。 這樣我們就不必編寫返回它們收到的值的函數(shù)。
over是一個(gè)Observable,當(dāng)用戶將鼠標(biāo)懸停在元素上時(shí)會發(fā)出true。
out是一個(gè)Observable,當(dāng)用戶將鼠標(biāo)移動到元素之外時(shí),它會發(fā)出false。
isHovering將over和out合并,返回一個(gè)Observable,當(dāng)鼠標(biāo)懸停在元素上時(shí)發(fā)出true,當(dāng)它離開時(shí)返回false。
使用isHovering,我們可以修改創(chuàng)建rows的訂閱,這樣我們就可以在創(chuàng)建時(shí)訂閱每行中的事件:
examples_earthquake_ui/code3.js
var table = document.getElementById("quakes_info"); quakes .pluck("properties") .map(makeRow) .bufferWithTime(500) .filter(function(rows) { return rows.length > 0; }) .map(function(rows) { var fragment = document.createDocumentFragment(); rows.forEach(function(row) { fragment.appendChild(row); }); return fragment; }) .subscribe(function(fragment) { var row = fragment.firstChild; // Get row from inside the fragment ? var circle = quakeLayer.getLayer(codeLayers[row.id]); ? isHovering(row).subscribe(function(hovering) { circle.setStyle({ color: hovering ? "#ff0000" : "#0000ff" }); }); ? Rx.DOM.click(row).subscribe(function() { map.panTo(circle.getLatLng()); }); table.appendChild(fragment); })
我們使用從行元素獲得的ID在地圖上獲取地震的圓元素。 有了它,codeLayers為我們提供了相應(yīng)的內(nèi)部ID,它使用quakeLayer.getLayer獲取了circle元素。
我們用當(dāng)前行調(diào)用isHovering,然后我們訂閱生成的Observable。 如果懸停參數(shù)為真,我們會將圓圈畫成紅色; 不然,它會是藍(lán)色的。
我們訂閱了從當(dāng)前行中的click事件創(chuàng)建的Observable。 單擊列表中的行時(shí),地圖將以地圖中相應(yīng)圓圈為中心。
使其更高效經(jīng)驗(yàn)豐富的前端開發(fā)人員知道在頁面上創(chuàng)建許多事件是導(dǎo)致性能不佳的一個(gè)因素。 在前面的示例中,我們?yōu)槊恳恍袆?chuàng)建了三個(gè)事件。 如果我們在列表中獲得100次地震,我們將在頁面周圍浮動300個(gè)事件,只是為了做一些亮點(diǎn)突出工作! 這對于表現(xiàn)來說太糟糕了,我們可以做得更好。
因?yàn)镈OM中的事件總是冒泡(從子元素到父元素),前端開發(fā)人員中一個(gè)眾所周知的技術(shù)是避免將鼠標(biāo)事件多帶帶附加到多個(gè)元素,而是將它們附加到父元素。 一旦在父項(xiàng)上觸發(fā)了事件,我們就可以使用事件的target屬性來查找作為事件目標(biāo)的子元素。
因?yàn)槲覀冃枰獮槭录lick和mouseover提供類似的功能,所以我們將創(chuàng)建一個(gè)函數(shù)getRowFromEvent:
examples_earthquake_ui/code3.pairwise.js
function getRowFromEvent(event) { return Rx.Observable .fromEvent(table, event) ? .filter(function(event) { var el = event.target; return el.tagName === "TD" && el.parentNode.id.length; }) ? .pluck("target", "parentNode") ? .distinctUntilChanged(); }
getRowFromEvent為我們提供了事件發(fā)生的表行。 以下是詳細(xì)信息:
我們確保在表格單元格中發(fā)生事件,并檢查該單元格的父級是否是具有ID屬性的行。 這些行是我們用地震ID標(biāo)記的行。
pluck運(yùn)算符在element的target屬性中提取嵌套屬性parentNode。
這可以防止多次獲得相同的元素。 例如,使用mouseover事件會發(fā)生很多事情。
examples_earthquake_ui/code3.pairwise.js
在上一節(jié)中,我們在每行上附加事件mouseover和mouseout,以便在每次鼠標(biāo)輸入或退出行時(shí)更改地震圈顏色。 現(xiàn)在,我們將僅使用桌面上的mouseover事件,并結(jié)合方便的pairwise運(yùn)算符:
examples_earthquake_ui/code3.pairwise.js
getRowFromEvent("mouseover") .pairwise() .subscribe(function(rows) { var prevCircle = quakeLayer.getLayer(codeLayers[rows[0].id]); var currCircle = quakeLayer.getLayer(codeLayers[rows[1].id]); prevCircle.setStyle({ color: "#0000ff" }); currCircle.setStyle({ color: "#ff0000" }); });
pairwise將每個(gè)發(fā)射值與先前在陣列中發(fā)射的值進(jìn)行分組。 因?yàn)槲覀兛偸谦@得不同的行,所以成對將始終產(chǎn)生鼠標(biāo)剛剛離開的行和鼠標(biāo)現(xiàn)在懸停的行。 有了這些信息,就可以相應(yīng)地為每個(gè)地震圈著色。
處理click事件更簡單:
examples_earthquake_ui/code3.pairwise.js
getRowFromEvent("click") .subscribe(function(row) { var circle = quakeLayer.getLayer(codeLayers[row.id]); map.panTo(circle.getLatLng()); });
我們可以回到訂閱quakes來生成行:
examples_earthquake_ui/code3.pairwise.js
quakes .pluck("properties") .map(makeRow) .subscribe(function(row) { table.appendChild(row); });
我們的代碼現(xiàn)在更加干凈,并且它不依賴于別處的row。 如果沒有row,getRowFromEvent將不會嘗試產(chǎn)生任何item。
更重要的是,我們的代碼現(xiàn)在非常高效。 無論我們檢索的地震信息量如何,我們總是只有一個(gè)鼠標(biāo)懸停事件和單擊事件,而不是數(shù)百個(gè)事件。
從Twitter獲取實(shí)時(shí)更新我們?yōu)榈卣鹬谱鲗?shí)時(shí)儀表板的計(jì)劃的第二部分是從Twitter添加與地球上發(fā)生的不同地震有關(guān)的報(bào)告和信息。 為此,我們將創(chuàng)建一個(gè)小型Node.js程序,該程序?qū)@取與地震相關(guān)的文章流。
設(shè)置我們的Node.js環(huán)境讓我們開始配置我們的Node.js應(yīng)用程序吧。除了RxJS,我們將使用兩個(gè)第三方模塊:ws和twit。這種類似的模塊都是讓我們保持最少的代碼。
首先,讓我們?yōu)槲覀兊膽?yīng)用程序創(chuàng)建一個(gè)文件夾,并安裝我們將使用的模塊。 (請注意,npm命令的輸出可能會因軟件包的當(dāng)前版本而異。)
客戶端 - 服務(wù)器通信現(xiàn)在我們準(zhǔn)備開始構(gòu)建我們的應(yīng)用程序了。讓我們在tweet_stream文件夾中創(chuàng)建一個(gè)名為index.js的新文件來加載我們將使用的模塊:
examples_earthquake_ui/tweet_stream/index.js
var WebSocketServer = require("ws").Server; var Twit = require("twit"); var Rx = require("rx");
要使用Twitter API,您需要在Twitter網(wǎng)站中請求使用者密鑰和訪問令牌。 完成后,使用配置對象創(chuàng)建一個(gè)新的Twit對象,如下所示:
examples_earthquake_ui/tweet_stream/index.js
var T = new Twit({ consumer_key: "rFhfB5hFlth0BHC7iqQkEtTyw", consumer_secret: "zcrXEM1jiOdKyiFFlGYFAOo43Hsz383i0cdHYYWqBXTBoVAr1x", access_token: "14343133-nlxZbtLuTEwgAlaLsmfrr3D4QAoiV2fa6xXUVEwW9", access_token_secret: "57Dr99wECljyyQ9tViJWz0H3obNG3V4cr5Lix9sQBXju1" });
現(xiàn)在我們可以創(chuàng)建一個(gè)函數(shù)onConnect,它將完成搜索推文和將來與客戶端通信的所有工作,并且我們可以啟動一個(gè)WebSocket服務(wù)器,一旦WebSocket連接并準(zhǔn)備好就會調(diào)用onConnect:
examples_earthquake_ui/tweet_stream/index.js
function onConnect(ws) { console.log("Client connected on localhost:8080"); } var Server = new WebSocketServer({ port: 8080 }); Rx.Observable.fromEvent(Server, "connection").subscribe(onConnect);
我們現(xiàn)在可以啟動我們的應(yīng)用程序,它應(yīng)該在端口8080上啟動WebSocket連接:
~/tweet_stream$ node index.js
由于我們尚未將任何瀏覽器連接到此服務(wù)器,因此尚未打印有關(guān)客戶端連接的消息。現(xiàn)在讓我們切換到dashboard的代碼并執(zhí)行此操作。我們將在RxJS-DOM中使用fromWebSocket運(yùn)算符:
examples_earthquake_ui/code4.js
function initialize() { var socket = Rx.DOM.fromWebSocket("ws://127.0.0.1:8080"); ...
在前面的代碼中,fromWebSocket創(chuàng)建一個(gè)Subject,作為WebSocket服務(wù)器的消息的發(fā)送者和接收者。 通過調(diào)用socket.onNext,我們將能夠向服務(wù)器發(fā)送消息,通過訂閱套接字,我們將收到服務(wù)器發(fā)送給我們的任何消息。
我們現(xiàn)在可以發(fā)送包含我們收到的地震數(shù)據(jù)的服務(wù)器消息:
examples_earthquake_ui/code4.js
quakes.bufferWithCount(100) .subscribe(function(quakes) { console.log(quakes); var quakesData = quakes.map(function(quake) { return { id: quake.properties.net + quake.properties.code, lat: quake.geometry.coordinates[1], lng: quake.geometry.coordinates[0], mag: quake.properties.mag }; }); socket.onNext(JSON.stringify({quakes: quakesData })); });
我們可以為來自服務(wù)器的消息設(shè)置訂閱者:
examples_earthquake_ui/code4.js
socket.subscribe(function(message) { console.log(JSON.parse(message.data)); });
現(xiàn)在,當(dāng)我們重新加載瀏覽器時(shí),客戶端消息應(yīng)出現(xiàn)在服務(wù)器終端中:
~/tweet_stream$ node index.js Client connected on localhost:8080
太棒了! 一旦開始從遠(yuǎn)程JSONP資源接收地震,瀏覽器就應(yīng)該向服務(wù)器發(fā)送命令。 但是現(xiàn)在,服務(wù)器完全忽略了這些消息。 是時(shí)候回到我們的推文流代碼并用它們做點(diǎn)什么了。
首先,我們將連接到從瀏覽器客戶端到達(dá)服務(wù)器的消息事件。 每當(dāng)客戶端發(fā)送消息時(shí),WebSocket服務(wù)器都會發(fā)出包含消息內(nèi)容的消息事件。 在我們的例子中,內(nèi)容是一個(gè)JSON字符串。
我們可以在onConnect函數(shù)中編寫以下代碼:
examples_earthquake_ui/tweet_stream/index.js
var onMessage = Rx.Observable.fromEvent(ws, "message") .subscribe(function(quake) { quake = JSON.parse(quake); console.log(quake); });
如果我們重新啟動服務(wù)器(終端中的Ctrl-C)并重新加載瀏覽器,我們應(yīng)該會看到終端上的地震細(xì)節(jié)打印出來。這是完美的。 現(xiàn)在我們已經(jīng)準(zhǔn)備好開始尋找與我們的地震有關(guān)的推文了。
檢索和發(fā)送推文我們正在使用Node.js twit的流式Twitter客戶端連接到Twitter和搜索推文。 從現(xiàn)在開始,服務(wù)器中的所有代碼都將在onConnect函數(shù)內(nèi)部發(fā)生,因?yàn)樗俣ㄒ呀?jīng)建立了與WebSocket的連接。 讓我們初始化推文流:
examples_earthquake_ui/tweet_stream/index.js
var stream = T.stream("statuses/filter", { track: "earthquake", locations: [] });
這告訴我們的Twit實(shí)例T開始流式傳輸Twitter狀態(tài),按關(guān)鍵字地震過濾。 當(dāng)然,這是非常通用的,而不是與現(xiàn)在發(fā)生的地震直接相關(guān)。 但請注意空位置數(shù)組。 這是一個(gè)緯度和經(jīng)度邊界的數(shù)組,我們可以用它們按地理位置過濾推文,以及地震一詞。 那更加具體! 好的,讓我們訂閱這個(gè)流并開始向?yàn)g覽器發(fā)送推文:
examples_earthquake_ui/tweet_stream/index.js
Rx.Observable.fromEvent(stream, "tweet").subscribe(function(tweetObject) { ws.send(JSON.stringify(tweetObject), function(err) { if (err) { console.log("There was an error sending the message"); } }); });
如果我們重新啟動服務(wù)器并重新加載瀏覽器,我們應(yīng)該在瀏覽器中收到推文,開發(fā)面板中的控制臺應(yīng)該打印推文。
這些推文尚未按地震位置進(jìn)行過濾。 為此,我們需要對收到的每一條地震信息做以下事情:
取每個(gè)地震的經(jīng)度和緯度對的震中坐標(biāo),創(chuàng)建一個(gè)邊界框,界定我們認(rèn)為與地震相關(guān)的推文的地理區(qū)域。
累積所有邊界坐標(biāo),以便發(fā)送給客戶端的推文與地圖上的地震保持相關(guān)。
每次收到新地震的消息時(shí),都會使用新坐標(biāo)更新twit流。
這是一種方法:
examples_earthquake_ui/tweet_stream/index.js
Rx.Observable .fromEvent(ws, "message") .flatMap(function(quakesObj){ quakesObj = JSON.parse(quakesObj); return Rx.Observable.from(quakesObj.quakes); }) ? .scan([], function(boundsArray, quake) { ? var bounds = [ quake.lng - 0.3, quake.lat - 0.15, quake.lng + 0.3, quake.lat + 0.15 ].map(function(coordinate) { coordinate = coordinate.toString(); return coordinate.match(/-?d+(.-?d{2})?/)[0]; }); boundsArray.concat(bounds); ? return boundsArray.slice(Math.max(boundsArray.length - 50, 0)); }) ? .subscribe(function(boundsArray) { stream.stop(); stream.params.locations = boundsArray.toString(); stream.start(); });
以下是前面代碼中發(fā)生的事情的一步一步:
我們再次見到我們的老朋友scan。 任何時(shí)候我們需要累積結(jié)果并產(chǎn)生每個(gè)中間結(jié)果,scan是我們的朋友。 在這種情況下,我們將繼續(xù)在boundsArray數(shù)組中累積地震坐標(biāo)。
從地震震中的單緯度/經(jīng)度坐標(biāo)對,我們創(chuàng)建一個(gè)陣列,其中包含由西北坐標(biāo)和東南坐標(biāo)確定的區(qū)域。 用于近似邊界的數(shù)字創(chuàng)建了一個(gè)大城市大小的矩形。之后,我們使用正則表達(dá)式將每個(gè)坐標(biāo)的小數(shù)精度限制為兩位小數(shù),以符合Twitter API要求。
我們將生成的邊界連接到boundsArray,它包含以前每個(gè)地震的邊界。 然后我們采用最后25對邊界(數(shù)組中的50個(gè)項(xiàng)目),因?yàn)檫@是Twitter API的限制。
最后,我們訂閱了Observable,在onNext函數(shù)中,我們重新啟動當(dāng)前的twit流來重新加載更新的位置,以便通過我們新的累積位置數(shù)組進(jìn)行過濾,轉(zhuǎn)換為字符串。
重新啟動服務(wù)器并重新加載瀏覽器后,我們應(yīng)該在瀏覽器應(yīng)用程序中收到相關(guān)的推文。 但是現(xiàn)在,我們只能看到開發(fā)人員控制臺中顯示的原始對象。 在下一節(jié)中,我們將生成HTML以在儀表板中顯示推文。
在Dashboard上顯示推文既然我們正在接收來自服務(wù)器的推文,那么剩下要做的就是在屏幕上很好地展示它們。 為此,我們將創(chuàng)建一個(gè)新的HTML元素,我們附加傳入的推文:
examples_earthquake_ui/index_final.html
我們還將更新socket Observable訂閱以處理傳入的tweet對象并將它們附加到我們剛剛創(chuàng)建的tweet_container元素:
examples_earthquake_ui/code5.js
socket .map(function(message) { return JSON.parse(message.data); }) .subscribe(function(data) { var container = document.getElementById("tweet_container"); container.insertBefore(makeTweetElement(data), container.firstChild); });
任何新的推文都會出現(xiàn)在列表的頂部,它們將由makeTweetElement創(chuàng)建,這是一個(gè)創(chuàng)建推文元素的簡單函數(shù),并使用我們作為參數(shù)傳遞的數(shù)據(jù)填充它:
examples_earthquake_ui/code5.js
function makeTweetElement(tweetObj) { var tweetEl = document.createElement("div"); tweetEl.className = "tweet"; var content = "" + "$text" + "$time"; var time = new Date(tweetObj.created_at); var timeText = time.toLocaleDateString() + " " + time.toLocaleTimeString(); content = content.replace("$tweetImg", tweetObj.user.profile_image_url); content = content.replace("$text", tweetObj.text); content = content.replace("$time", timeText); tweetEl.innerHTML = content; return tweetEl; }
有了這個(gè),我們終于有了一個(gè)帶有相關(guān)的地理定位推文的側(cè)邊欄,可以讓我們更深入地了解受地震影響的區(qū)域。
改進(jìn)的想法此儀表板已經(jīng)正常運(yùn)行,但可以進(jìn)行許多改進(jìn)。 一些想法,使它更好:
添加更多地震數(shù)據(jù)庫。 USGS是一個(gè)很棒的資源,但它主要提供在美國發(fā)生的地震。 合并來自世界各地的地震報(bào)告,而不僅僅是美國,并在地圖中將它們?nèi)空故驹谝黄饘苡腥ぁ?為此,您可以使用merge和mergeAll的幫助,并使用distinct與選擇器函數(shù)來避免重復(fù)。
每當(dāng)用戶點(diǎn)擊推文時(shí),將地圖置于相關(guān)地震中心。 這將涉及通過地震在服務(wù)器上對推文進(jìn)行分組,并且您可能希望使用groupBy運(yùn)算符將推文分組到特定地理區(qū)域。
總結(jié)在本章中,我們使用RxJS創(chuàng)建了一個(gè)響應(yīng)式用戶界面,使我們能夠?qū)崟r(shí)查看地球上發(fā)生的地震的各種數(shù)據(jù)。我們在瀏覽器客戶端和Node.js服務(wù)器中都使用了RxJS,顯示了使用Observable管理應(yīng)用程序的不同區(qū)域是多么容易。
更重要的是,我們已經(jīng)看到我們可以在客戶端和服務(wù)器上以相同的方式使用RxJS,在我們的應(yīng)用程序中隨處可見Observable序列抽象。 不僅如此。我們實(shí)際上可以在其他編程語言中使用RxJS概念和運(yùn)算符,因?yàn)樵S多編程語言都支持RxJS。
接下來我們將介紹Scheduler,它是RxJS中更高級的對象類型,它允許我們更精確地控制時(shí)間和并發(fā)性,并為測試代碼提供了很大的幫助。
關(guān)注我的微信公眾號,更多優(yōu)質(zhì)文章定時(shí)推送
文章版權(quán)歸作者所有,未經(jīng)允許請勿轉(zhuǎn)載,若此文章存在違規(guī)行為,您可以聯(lián)系管理員刪除。
轉(zhuǎn)載請注明本文地址:http://systransis.cn/yun/96881.html
摘要:由于技術(shù)棧的學(xué)習(xí),筆者需要在原來函數(shù)式編程知識的基礎(chǔ)上,學(xué)習(xí)的使用。筆者在社區(qū)發(fā)現(xiàn)了一個(gè)非常高質(zhì)量的響應(yīng)式編程系列教程共篇,從基礎(chǔ)概念到實(shí)際應(yīng)用講解的非常詳細(xì),有大量直觀的大理石圖來輔助理解流的處理,對培養(yǎng)響應(yīng)式編程的思維方式有很大幫助。 showImg(https://segmentfault.com/img/bVus8n); [TOC] 一. 響應(yīng)式編程 響應(yīng)式編程,也稱為流式編程...
摘要:響應(yīng)式編程第一章響應(yīng)式響應(yīng)式編程第二章序列的深入研究響應(yīng)式編程第三章構(gòu)建并發(fā)程序響應(yīng)式編程第四章構(gòu)建完整的應(yīng)用程序響應(yīng)式編程第五章使用管理時(shí)間響應(yīng)式編程第六章使用的響應(yīng)式應(yīng)用程序使用管理時(shí)間自從接觸,就開始在我的項(xiàng)目中使用它。 Rxjs 響應(yīng)式編程-第一章:響應(yīng)式Rxjs 響應(yīng)式編程-第二章:序列的深入研究Rxjs 響應(yīng)式編程-第三章: 構(gòu)建并發(fā)程序Rxjs 響應(yīng)式編程-第四章 構(gòu)建完...
摘要:本文是響應(yīng)式編程第四章構(gòu)建完整的應(yīng)用程序這篇文章的學(xué)習(xí)筆記。涉及的運(yùn)算符每隔指定時(shí)間將流中的數(shù)據(jù)以數(shù)組形式推送出去。中提供了一種叫做異步管道的模板語法,可以直接在的微語法中使用可觀測對象示例五一點(diǎn)建議一定要好好讀官方文檔。 本文是【Rxjs 響應(yīng)式編程-第四章 構(gòu)建完整的Web應(yīng)用程序】這篇文章的學(xué)習(xí)筆記。示例代碼托管在:http://www.github.com/dashnoword...
摘要:我們將使用,這是一個(gè)現(xiàn)代,簡單,漂亮的框架,在內(nèi)部使用并將響應(yīng)式編程概念應(yīng)用于前端編程。驅(qū)動程序采用從我們的應(yīng)用程序發(fā)出數(shù)據(jù)的,它們返回另一個(gè)導(dǎo)致副作用的。我們將使用來呈現(xiàn)我們的應(yīng)用程序。僅采用長度超過兩個(gè)字符的文本。 Rxjs 響應(yīng)式編程-第一章:響應(yīng)式Rxjs 響應(yīng)式編程-第二章:序列的深入研究Rxjs 響應(yīng)式編程-第三章: 構(gòu)建并發(fā)程序Rxjs 響應(yīng)式編程-第四章 構(gòu)建完整的We...
摘要:響應(yīng)式編程具有很強(qiáng)的表現(xiàn)力,舉個(gè)例子來說,限制鼠標(biāo)重復(fù)點(diǎn)擊的例子。在響應(yīng)式編程中,我把鼠標(biāo)點(diǎn)擊事件作為一個(gè)我們可以查詢和操作的持續(xù)的流事件。這在響應(yīng)式編程中尤其重要,因?yàn)槲覀冸S著時(shí)間變換會產(chǎn)生很多狀態(tài)片段。迭代器模式的另一主要部分來自模式。 Rxjs 響應(yīng)式編程-第一章:響應(yīng)式Rxjs 響應(yīng)式編程-第二章:序列的深入研究Rxjs 響應(yīng)式編程-第三章: 構(gòu)建并發(fā)程序Rxjs 響應(yīng)式編程-...
閱讀 1164·2021-11-24 09:39
閱讀 3632·2021-09-02 15:21
閱讀 2173·2021-08-24 10:01
閱讀 734·2021-08-19 10:55
閱讀 2458·2019-08-30 15:55
閱讀 1218·2019-08-30 14:16
閱讀 3001·2019-08-29 15:17
閱讀 3242·2019-08-29 13:53