成人国产在线小视频_日韩寡妇人妻调教在线播放_色成人www永久在线观看_2018国产精品久久_亚洲欧美高清在线30p_亚洲少妇综合一区_黄色在线播放国产_亚洲另类技巧小说校园_国产主播xx日韩_a级毛片在线免费

資訊專(zhuān)欄INFORMATION COLUMN

Rx.js使用之結(jié)合node的讀寫(xiě)流進(jìn)行數(shù)據(jù)處理

Tecode / 2962人閱讀

摘要:的讀寫(xiě)流模塊提供來(lái)創(chuàng)建行讀取流,即讀取文件的每一行作為持續(xù)的輸入數(shù)據(jù)模塊提供來(lái)創(chuàng)建寫(xiě)入流,其返回的有和兩個(gè)方法,來(lái)完成流式的寫(xiě)入與結(jié)束寫(xiě)入。

前幾天接到任務(wù)要使用第三方API處理幾千張圖片,得到結(jié)果集。我的做法就是使用Rx.js結(jié)合node的讀寫(xiě)流來(lái)完成數(shù)據(jù)讀入、接口請(qǐng)求、數(shù)據(jù)處理、數(shù)據(jù)寫(xiě)入這些操作。本篇就來(lái)分享這個(gè)代碼和其邏輯。

Rx.js是什么

Rx.js是一個(gè)響應(yīng)式編程庫(kù),能簡(jiǎn)化事件/異步處理邏輯代碼。其視所有的事件/數(shù)據(jù)為_(kāi)_流__,提供各種流處理的operators,將輸入與輸出平滑的鏈接起來(lái)??梢灶?lèi)比為linux上的pipe操作符: ls | grep a*b | less。

Node的讀寫(xiě)流

readline模塊提供readline.createInterface來(lái)創(chuàng)建行讀取流,即讀取文件的每一行作為持續(xù)的輸入數(shù)據(jù)

fs模塊提供fs.createWriteStream來(lái)創(chuàng)建寫(xiě)入流, 其返回的writerwriteend兩個(gè)方法,來(lái)完成流式的寫(xiě)入與結(jié)束寫(xiě)入。

第三方接口的使用情況

并發(fā)數(shù)有限制,3個(gè)是出現(xiàn)其出現(xiàn)并發(fā)錯(cuò)誤概率最低的最大并發(fā)數(shù)

接口請(qǐng)求過(guò)于頻繁,會(huì)較大概率出現(xiàn)連續(xù)的并發(fā)錯(cuò)誤, 大概延遲400秒效果尚可

提供給第三方的圖片是鏈接,其需要服務(wù)器自己下載,會(huì)出現(xiàn)操作超時(shí)或者長(zhǎng)時(shí)間不返回的情況。

任務(wù)列表

從文件讀取圖片文件名

拼接url

發(fā)送3個(gè)并發(fā)請(qǐng)求

請(qǐng)求出現(xiàn)超時(shí)問(wèn)題重試3次,最后如果失敗則放棄

出現(xiàn)非超時(shí)錯(cuò)誤(如并發(fā)錯(cuò)誤等)則一直重試,直到成功

請(qǐng)求成功后延遲400秒繼續(xù)發(fā)起下一個(gè)請(qǐng)求

處理返回的數(shù)據(jù)

寫(xiě)入文件

代碼分析 引入依賴(lài),創(chuàng)建讀取與寫(xiě)入流
const https = require("https");
const querystring = require("querystring");
const Rx = require("rxjs");
const readline = require("readline");
const fs = require("fs");

const imgStream = readline.createInterface({  // 創(chuàng)建行讀取流
    input: fs.createReadStream("filelist.txt")
});

const writeStream = fs.createWriteStream("output.txt");  // 創(chuàng)建寫(xiě)入流
使用Rx處理讀取并反饋結(jié)果給寫(xiě)入
Rx.Observable.fromEvent(imgStream, "line")  // 將行讀取流轉(zhuǎn)化為Rx的事件流
.takeUntil(Rx.Observable.fromEvent(imgStream, "close"))  // 讀取流截止時(shí)終止Rx流
.map(img => generateData(img))  // 將文件名處理成post的數(shù)據(jù)
 // 發(fā)起請(qǐng)求,并發(fā)3個(gè),請(qǐng)求返回后延遲400ms后再進(jìn)行下一步處理并發(fā)起下一個(gè)請(qǐng)求
.mergeMap(data => requestAPI(data).delay(400), (o, i) => i, 3) 
.subscribe(data => {
    // 處理數(shù)據(jù)并寫(xiě)入文件
    let str = data.url;
    if (data.status === 200 && data.data.xxx.length) {
        zzz = data.data.xxx.map(x => x.zzz);
        str += `    ${JSON.stringify(zzz)}`;
    }
    writeStream.write(`${str}
`);
}, (err) => {
    console.log(err);
    console.log("!!!!!!!!!!!ERROR!!!!!!!!!");
}, () => {
    console.log("=====complete======");
    writeStream.end();
});

其中的需要關(guān)注的點(diǎn)在.mergeMap(data => requestAPI(data).delay(400), (o, i) => i, 3) ,這里內(nèi)部requestAPI返回一個(gè)封裝了http異步請(qǐng)求并延遲400ms的Rx流,當(dāng)請(qǐng)求完成并延遲完成后將數(shù)據(jù)返回上一層繼續(xù)進(jìn)行處理(可以類(lèi)比為Promisethen)

使用Rx的自定義流封裝一個(gè)帶錯(cuò)誤重試機(jī)制的http請(qǐng)求
const requestFacepp = dataStr => {
    const options = {
        hostname: "api.xxx.com",
        port: 443,
        path: "/xxx",
        method: "POST",
        headers: {
            "Content-Type": "application/x-www-form-urlencoded",
            "Content-Length": Buffer.byteLength(dataStr)
        }
    };
    const reqData = querystring.parse(dataStr);
    const retry$ = new Rx.Subject();  // 觸發(fā)重試的流,當(dāng)其發(fā)出數(shù)據(jù)時(shí)會(huì)使`retryWhen`觸發(fā)重試錯(cuò)誤流
    let retryTimes = 3;  // 設(shè)置非正常失敗(超時(shí))重試的上限

    // 使用Rx的自定義流封裝一個(gè)帶錯(cuò)誤重試機(jī)制的http請(qǐng)求,可以類(lèi)比為new Promise
    // 但要注意的是Rx是流,即數(shù)據(jù)是可以持續(xù)的,而Promise則只有一個(gè)結(jié)果和狀態(tài)
    return Rx.Observable.create(observer => {
        const req = https.request(options, res => {
            let data = "";
            res.setEncoding("utf8");
            res.on("data", chunk => {
                data += chunk;
            });
            res.on("end", () => {
                if (res.statusCode === 200) {
                    // 請(qǐng)求正常返回,向流內(nèi)推送結(jié)果并結(jié)束
                    observer.next({
                        status: res.statusCode,
                        url: reqData.image_url,
                        data: JSON.parse(data)
                    });
                    observer.complete();
                } else {
                    // 請(qǐng)求正常返回,但不是正常結(jié)果,拋出錯(cuò)誤并重試
                    console.log(`retring: ${reqData.image_url}`);
                    observer.error({
                        status: res.statusCode,
                        url: reqData.image_url
                    });
                    retry$.next(true);
                }
            });
        });

        req.setTimeout(4000, () => {
            // 設(shè)置請(qǐng)求4s超時(shí),超時(shí)后終止,引發(fā)請(qǐng)求拋錯(cuò)
            req.abort();
        });

        req.on("error", err => {
            console.log(`retring(${retryTimes}): ${reqData.image_url}`);
            // 請(qǐng)求拋錯(cuò)時(shí)重試,超出次數(shù)則終止本次請(qǐng)求
            observer.error(`error: ${err.message}`);
            if (retryTimes > 0) {
                retryTimes--;
                retry$.next(true);
            } else {
                retry$.complete();
            }
        });

        req.write(dataStr);

        req.end();
        return () => { req.abort() };  // 返回終止流的處理回調(diào)
    })
    .retryWhen(errs => errs.switchMap(err => {
        // 未超過(guò)次數(shù)返回重試流,超出則返回錯(cuò)誤數(shù)據(jù)并終止本次Rx流
        return retryTimes > 0 ? retry$ : Rx.Observable.of({
            status: 500,
            url: reqData.image_url
        });
    }));
};
收尾

到此就搬磚完畢,開(kāi)個(gè)車(chē)讓他慢慢跑就可以了。
本篇展示了Rx在流數(shù)據(jù)處理與異步處理上的方式,邏輯與代碼都挺清晰、扁平。在處理交雜的邏輯時(shí)也不錯(cuò)(重試部分)。如果喜歡或者有幫助的話(huà)可以后面在發(fā)一篇Rx在復(fù)雜DOM事件處理上的應(yīng)用。;-)

本文始發(fā)于本人的公眾號(hào):楓之葉。公眾號(hào)二維碼

文章版權(quán)歸作者所有,未經(jīng)允許請(qǐng)勿轉(zhuǎn)載,若此文章存在違規(guī)行為,您可以聯(lián)系管理員刪除。

轉(zhuǎn)載請(qǐng)注明本文地址:http://systransis.cn/yun/83331.html

相關(guān)文章

  • 認(rèn)識(shí)node核心模塊--從Buffer、Stream到fs

    摘要:端輸入數(shù)據(jù)到端,對(duì)就是輸入流,得到的對(duì)象就是可讀流對(duì)就是輸出端得到的對(duì)象是可寫(xiě)流。在中,這四種流都是的實(shí)例,它們都有事件,可讀流具有監(jiān)聽(tīng)數(shù)據(jù)到來(lái)的事件等,可寫(xiě)流則具有監(jiān)聽(tīng)數(shù)據(jù)已傳給低層系統(tǒng)的事件等,和都同時(shí)實(shí)現(xiàn)了和的事件和接口。 原文地址在我的博客 node中的Buffer和Stream會(huì)給剛接觸Node的前端工程師們帶來(lái)困惑,原因是前端并沒(méi)有類(lèi)似概念(or 有我們也沒(méi)意識(shí)到)。然而,...

    TANKING 評(píng)論0 收藏0
  • Node.js學(xué)習(xí)路08——fs文件系統(tǒng)stream基本介紹

    摘要:中各種用于讀取數(shù)據(jù)的對(duì)象對(duì)象描述用于讀取文件代表客戶(hù)端請(qǐng)求或服務(wù)器端響應(yīng)代表一個(gè)端口對(duì)象用于創(chuàng)建子進(jìn)程的標(biāo)準(zhǔn)輸出流。如果子進(jìn)程和父進(jìn)程共享輸入輸出流,則子進(jìn)程的標(biāo)準(zhǔn)輸出流被廢棄用于創(chuàng)建子進(jìn)程的標(biāo)準(zhǔn)錯(cuò)誤輸出流。 9. stream流 fs模塊中集中文件讀寫(xiě)方法的區(qū)別 用途 使用異步方式 使用同步方式 將文件完整讀入緩存區(qū) readFile readFileSync 將文件部...

    BoYang 評(píng)論0 收藏0
  • 前端閱讀筆記 2016-11-25

    摘要:為了防止某些文檔或腳本加載別的域下的未知內(nèi)容,防止造成泄露隱私,破壞系統(tǒng)等行為發(fā)生。模式構(gòu)建函數(shù)響應(yīng)式前端架構(gòu)過(guò)程中學(xué)到的經(jīng)驗(yàn)?zāi)J降牟煌幵谟?,它主要?zhuān)注于恰當(dāng)?shù)貙?shí)現(xiàn)應(yīng)用程序狀態(tài)突變。嚴(yán)重情況下,會(huì)造成惡意的流量劫持等問(wèn)題。 今天是編輯周刊的日子。所以文章很多和周刊一樣。微信不能發(fā)鏈接,點(diǎn)了也木有用,所以請(qǐng)記得閱讀原文~ 發(fā)個(gè)動(dòng)圖娛樂(lè)下: 使用 SVG 動(dòng)畫(huà)制作游戲 使用 GASP ...

    KoreyLee 評(píng)論0 收藏0
  • 初識(shí) Node Stream

    摘要:是在完成處理數(shù)據(jù)塊后需要調(diào)用的函數(shù)。這是寫(xiě)數(shù)據(jù)成功與否的標(biāo)志。若要發(fā)出故障信號(hào),請(qǐng)用錯(cuò)誤對(duì)象調(diào)用回調(diào)函數(shù)。雙工流的可讀性和可寫(xiě)性操作完全獨(dú)立于彼此。這僅僅是將兩個(gè)特性組合成一個(gè)對(duì)象。 showImg(https://segmentfault.com/img/remote/1460000013228112?w=533&h=300); Streams 是一個(gè)數(shù)據(jù)集——和數(shù)組、字符串一樣。不...

    fobnn 評(píng)論0 收藏0
  • 深入nodeTransform

    摘要:內(nèi)部架構(gòu)上圖表示一個(gè)實(shí)例的組成部分部分緩沖數(shù)組內(nèi)部函數(shù)部分緩沖鏈表內(nèi)部函數(shù)實(shí)例必須實(shí)現(xiàn)的內(nèi)部函數(shù)以及系統(tǒng)提供的回調(diào)函數(shù)。有三個(gè)參數(shù),第一個(gè)為待處理的數(shù)據(jù),第二個(gè)為編碼,第三個(gè)為回調(diào)函數(shù)。 Transform流特性 在開(kāi)發(fā)中直接接觸Transform流的情況不是很多,往往是使用相對(duì)成熟的模塊或者封裝的API來(lái)完成流的處理,最為特殊的莫過(guò)于through2模塊和gulp流操作。那么,Tra...

    williamwen1986 評(píng)論0 收藏0

發(fā)表評(píng)論

0條評(píng)論

最新活動(dòng)
閱讀需要支付1元查看
<