摘要:的讀寫(xiě)流模塊提供來(lái)創(chuàng)建行讀取流,即讀取文件的每一行作為持續(xù)的輸入數(shù)據(jù)模塊提供來(lái)創(chuàng)建寫(xiě)入流,其返回的有和兩個(gè)方法,來(lái)完成流式的寫(xiě)入與結(jié)束寫(xiě)入。
前幾天接到任務(wù)要使用第三方API處理幾千張圖片,得到結(jié)果集。我的做法就是使用Rx.js結(jié)合node的讀寫(xiě)流來(lái)完成數(shù)據(jù)讀入、接口請(qǐng)求、數(shù)據(jù)處理、數(shù)據(jù)寫(xiě)入這些操作。本篇就來(lái)分享這個(gè)代碼和其邏輯。
Rx.js是什么Rx.js是一個(gè)響應(yīng)式編程庫(kù),能簡(jiǎn)化事件/異步處理邏輯代碼。其視所有的事件/數(shù)據(jù)為_(kāi)_流__,提供各種流處理的operators,將輸入與輸出平滑的鏈接起來(lái)??梢灶?lèi)比為linux上的pipe操作符: ls | grep a*b | less。
Node的讀寫(xiě)流readline模塊提供readline.createInterface來(lái)創(chuàng)建行讀取流,即讀取文件的每一行作為持續(xù)的輸入數(shù)據(jù)
fs模塊提供fs.createWriteStream來(lái)創(chuàng)建寫(xiě)入流, 其返回的writer有write和end兩個(gè)方法,來(lái)完成流式的寫(xiě)入與結(jié)束寫(xiě)入。
第三方接口的使用情況并發(fā)數(shù)有限制,3個(gè)是出現(xiàn)其出現(xiàn)并發(fā)錯(cuò)誤概率最低的最大并發(fā)數(shù)
接口請(qǐng)求過(guò)于頻繁,會(huì)較大概率出現(xiàn)連續(xù)的并發(fā)錯(cuò)誤, 大概延遲400秒效果尚可
提供給第三方的圖片是鏈接,其需要服務(wù)器自己下載,會(huì)出現(xiàn)操作超時(shí)或者長(zhǎng)時(shí)間不返回的情況。
任務(wù)列表從文件讀取圖片文件名
拼接url
發(fā)送3個(gè)并發(fā)請(qǐng)求
請(qǐng)求出現(xiàn)超時(shí)問(wèn)題重試3次,最后如果失敗則放棄
出現(xiàn)非超時(shí)錯(cuò)誤(如并發(fā)錯(cuò)誤等)則一直重試,直到成功
請(qǐng)求成功后延遲400秒繼續(xù)發(fā)起下一個(gè)請(qǐng)求
處理返回的數(shù)據(jù)
寫(xiě)入文件
代碼分析 引入依賴(lài),創(chuàng)建讀取與寫(xiě)入流const https = require("https"); const querystring = require("querystring"); const Rx = require("rxjs"); const readline = require("readline"); const fs = require("fs"); const imgStream = readline.createInterface({ // 創(chuàng)建行讀取流 input: fs.createReadStream("filelist.txt") }); const writeStream = fs.createWriteStream("output.txt"); // 創(chuàng)建寫(xiě)入流使用Rx處理讀取并反饋結(jié)果給寫(xiě)入
Rx.Observable.fromEvent(imgStream, "line") // 將行讀取流轉(zhuǎn)化為Rx的事件流 .takeUntil(Rx.Observable.fromEvent(imgStream, "close")) // 讀取流截止時(shí)終止Rx流 .map(img => generateData(img)) // 將文件名處理成post的數(shù)據(jù) // 發(fā)起請(qǐng)求,并發(fā)3個(gè),請(qǐng)求返回后延遲400ms后再進(jìn)行下一步處理并發(fā)起下一個(gè)請(qǐng)求 .mergeMap(data => requestAPI(data).delay(400), (o, i) => i, 3) .subscribe(data => { // 處理數(shù)據(jù)并寫(xiě)入文件 let str = data.url; if (data.status === 200 && data.data.xxx.length) { zzz = data.data.xxx.map(x => x.zzz); str += ` ${JSON.stringify(zzz)}`; } writeStream.write(`${str} `); }, (err) => { console.log(err); console.log("!!!!!!!!!!!ERROR!!!!!!!!!"); }, () => { console.log("=====complete======"); writeStream.end(); });
其中的需要關(guān)注的點(diǎn)在.mergeMap(data => requestAPI(data).delay(400), (o, i) => i, 3) ,這里內(nèi)部requestAPI返回一個(gè)封裝了http異步請(qǐng)求并延遲400ms的Rx流,當(dāng)請(qǐng)求完成并延遲完成后將數(shù)據(jù)返回上一層繼續(xù)進(jìn)行處理(可以類(lèi)比為Promise的then)
使用Rx的自定義流封裝一個(gè)帶錯(cuò)誤重試機(jī)制的http請(qǐng)求const requestFacepp = dataStr => { const options = { hostname: "api.xxx.com", port: 443, path: "/xxx", method: "POST", headers: { "Content-Type": "application/x-www-form-urlencoded", "Content-Length": Buffer.byteLength(dataStr) } }; const reqData = querystring.parse(dataStr); const retry$ = new Rx.Subject(); // 觸發(fā)重試的流,當(dāng)其發(fā)出數(shù)據(jù)時(shí)會(huì)使`retryWhen`觸發(fā)重試錯(cuò)誤流 let retryTimes = 3; // 設(shè)置非正常失敗(超時(shí))重試的上限 // 使用Rx的自定義流封裝一個(gè)帶錯(cuò)誤重試機(jī)制的http請(qǐng)求,可以類(lèi)比為new Promise // 但要注意的是Rx是流,即數(shù)據(jù)是可以持續(xù)的,而Promise則只有一個(gè)結(jié)果和狀態(tài) return Rx.Observable.create(observer => { const req = https.request(options, res => { let data = ""; res.setEncoding("utf8"); res.on("data", chunk => { data += chunk; }); res.on("end", () => { if (res.statusCode === 200) { // 請(qǐng)求正常返回,向流內(nèi)推送結(jié)果并結(jié)束 observer.next({ status: res.statusCode, url: reqData.image_url, data: JSON.parse(data) }); observer.complete(); } else { // 請(qǐng)求正常返回,但不是正常結(jié)果,拋出錯(cuò)誤并重試 console.log(`retring: ${reqData.image_url}`); observer.error({ status: res.statusCode, url: reqData.image_url }); retry$.next(true); } }); }); req.setTimeout(4000, () => { // 設(shè)置請(qǐng)求4s超時(shí),超時(shí)后終止,引發(fā)請(qǐng)求拋錯(cuò) req.abort(); }); req.on("error", err => { console.log(`retring(${retryTimes}): ${reqData.image_url}`); // 請(qǐng)求拋錯(cuò)時(shí)重試,超出次數(shù)則終止本次請(qǐng)求 observer.error(`error: ${err.message}`); if (retryTimes > 0) { retryTimes--; retry$.next(true); } else { retry$.complete(); } }); req.write(dataStr); req.end(); return () => { req.abort() }; // 返回終止流的處理回調(diào) }) .retryWhen(errs => errs.switchMap(err => { // 未超過(guò)次數(shù)返回重試流,超出則返回錯(cuò)誤數(shù)據(jù)并終止本次Rx流 return retryTimes > 0 ? retry$ : Rx.Observable.of({ status: 500, url: reqData.image_url }); })); };收尾
到此就搬磚完畢,開(kāi)個(gè)車(chē)讓他慢慢跑就可以了。
本篇展示了Rx在流數(shù)據(jù)處理與異步處理上的方式,邏輯與代碼都挺清晰、扁平。在處理交雜的邏輯時(shí)也不錯(cuò)(重試部分)。如果喜歡或者有幫助的話(huà)可以后面在發(fā)一篇Rx在復(fù)雜DOM事件處理上的應(yīng)用。;-)
本文始發(fā)于本人的公眾號(hào):楓之葉。公眾號(hào)二維碼
文章版權(quán)歸作者所有,未經(jīng)允許請(qǐng)勿轉(zhuǎn)載,若此文章存在違規(guī)行為,您可以聯(lián)系管理員刪除。
轉(zhuǎn)載請(qǐng)注明本文地址:http://systransis.cn/yun/83331.html
摘要:端輸入數(shù)據(jù)到端,對(duì)就是輸入流,得到的對(duì)象就是可讀流對(duì)就是輸出端得到的對(duì)象是可寫(xiě)流。在中,這四種流都是的實(shí)例,它們都有事件,可讀流具有監(jiān)聽(tīng)數(shù)據(jù)到來(lái)的事件等,可寫(xiě)流則具有監(jiān)聽(tīng)數(shù)據(jù)已傳給低層系統(tǒng)的事件等,和都同時(shí)實(shí)現(xiàn)了和的事件和接口。 原文地址在我的博客 node中的Buffer和Stream會(huì)給剛接觸Node的前端工程師們帶來(lái)困惑,原因是前端并沒(méi)有類(lèi)似概念(or 有我們也沒(méi)意識(shí)到)。然而,...
摘要:中各種用于讀取數(shù)據(jù)的對(duì)象對(duì)象描述用于讀取文件代表客戶(hù)端請(qǐng)求或服務(wù)器端響應(yīng)代表一個(gè)端口對(duì)象用于創(chuàng)建子進(jìn)程的標(biāo)準(zhǔn)輸出流。如果子進(jìn)程和父進(jìn)程共享輸入輸出流,則子進(jìn)程的標(biāo)準(zhǔn)輸出流被廢棄用于創(chuàng)建子進(jìn)程的標(biāo)準(zhǔn)錯(cuò)誤輸出流。 9. stream流 fs模塊中集中文件讀寫(xiě)方法的區(qū)別 用途 使用異步方式 使用同步方式 將文件完整讀入緩存區(qū) readFile readFileSync 將文件部...
摘要:為了防止某些文檔或腳本加載別的域下的未知內(nèi)容,防止造成泄露隱私,破壞系統(tǒng)等行為發(fā)生。模式構(gòu)建函數(shù)響應(yīng)式前端架構(gòu)過(guò)程中學(xué)到的經(jīng)驗(yàn)?zāi)J降牟煌幵谟?,它主要?zhuān)注于恰當(dāng)?shù)貙?shí)現(xiàn)應(yīng)用程序狀態(tài)突變。嚴(yán)重情況下,會(huì)造成惡意的流量劫持等問(wèn)題。 今天是編輯周刊的日子。所以文章很多和周刊一樣。微信不能發(fā)鏈接,點(diǎn)了也木有用,所以請(qǐng)記得閱讀原文~ 發(fā)個(gè)動(dòng)圖娛樂(lè)下: 使用 SVG 動(dòng)畫(huà)制作游戲 使用 GASP ...
摘要:是在完成處理數(shù)據(jù)塊后需要調(diào)用的函數(shù)。這是寫(xiě)數(shù)據(jù)成功與否的標(biāo)志。若要發(fā)出故障信號(hào),請(qǐng)用錯(cuò)誤對(duì)象調(diào)用回調(diào)函數(shù)。雙工流的可讀性和可寫(xiě)性操作完全獨(dú)立于彼此。這僅僅是將兩個(gè)特性組合成一個(gè)對(duì)象。 showImg(https://segmentfault.com/img/remote/1460000013228112?w=533&h=300); Streams 是一個(gè)數(shù)據(jù)集——和數(shù)組、字符串一樣。不...
摘要:內(nèi)部架構(gòu)上圖表示一個(gè)實(shí)例的組成部分部分緩沖數(shù)組內(nèi)部函數(shù)部分緩沖鏈表內(nèi)部函數(shù)實(shí)例必須實(shí)現(xiàn)的內(nèi)部函數(shù)以及系統(tǒng)提供的回調(diào)函數(shù)。有三個(gè)參數(shù),第一個(gè)為待處理的數(shù)據(jù),第二個(gè)為編碼,第三個(gè)為回調(diào)函數(shù)。 Transform流特性 在開(kāi)發(fā)中直接接觸Transform流的情況不是很多,往往是使用相對(duì)成熟的模塊或者封裝的API來(lái)完成流的處理,最為特殊的莫過(guò)于through2模塊和gulp流操作。那么,Tra...
閱讀 3546·2023-04-25 20:09
閱讀 3745·2022-06-28 19:00
閱讀 3066·2022-06-28 19:00
閱讀 3092·2022-06-28 19:00
閱讀 3185·2022-06-28 19:00
閱讀 2886·2022-06-28 19:00
閱讀 3057·2022-06-28 19:00
閱讀 2644·2022-06-28 19:00