成人国产在线小视频_日韩寡妇人妻调教在线播放_色成人www永久在线观看_2018国产精品久久_亚洲欧美高清在线30p_亚洲少妇综合一区_黄色在线播放国产_亚洲另类技巧小说校园_国产主播xx日韩_a级毛片在线免费

資訊專欄INFORMATION COLUMN

白潔血戰(zhàn)Node并發(fā)編程 - 預(yù)覽

Lemon_95 / 919人閱讀

摘要:在這些動作結(jié)束后,所有的隊(duì)列變化,就是整個組合任務(wù)狀態(tài)機(jī)的下一個狀態(tài)。如果組合狀態(tài)機(jī)停止了,向其中的任何一個對象執(zhí)行或者操作都可以讓整個狀態(tài)機(jī)繼續(xù)動起來。

預(yù)覽。

先給出一個基礎(chǔ)類代碼。

const EventEmitter = require("events")
const debug = require("debug")("transform")

class Transform extends EventEmitter {

  constructor (options) {
    super()
    this.concurrency = 1

    Object.assign(this, options)

    this.pending = []
    this.working = []
    this.finished = []
    this.failed = []

    this.ins = []
    this.outs = []
  }

  push (x) {
    this.pending.push(x)
    this.schedule()
  }

  pull () {
    let xs = this.finished
    this.finished = []
    this.schedule()
    return xs
  }

  isBlocked () {
    return !!this.failed.length ||              // blocked by failed
      !!this.finished.length ||                 // blocked by output buffer (lazy)
      this.outs.some(t => t.isBlocked())        // blocked by outputs transform
  }

  isStopped () {
    return !this.working.length && this.outs.every(t => t.isStopped())
  }

  root () {
    return this.ins.length === 0 ? this : this.ins[0].root()
  }

  pipe (next) {
    this.outs.push(next)
    next.ins.push(this)
    return next
  }

  print () {
    debug(this.name,
      this.pending.map(x => x.name),
      this.working.map(x => x.name),
      this.finished.map(x => x.name),
      this.failed.map(x => x.name),
      this.isStopped())
    this.outs.forEach(t => t.print())
  }

  schedule () {
    // stop working if blocked
    if (this.isBlocked()) return

    this.pending = this.ins.reduce((acc, t) => [...acc, ...t.pull()], this.pending)

    while (this.working.length < this.concurrency && this.pending.length) {
      let x = this.pending.shift()
      this.working.push(x)
      this.transform(x, (err, y) => {
        this.working.splice(this.working.indexOf(x), 1)
        if (err) {
          x.error = err
          this.failed.push(x)
        } else {
          if (this.outs.length) {
            this.outs.forEach(t => t.push(y))
          } else {
            if (this.root().listenerCount("data")) {
              this.root().emit("data", y)
            } else {
              this.finished.push(y)
            }
          }
        }

        this.schedule()
        this.root().emit("step", this.name, x.name)
      })
    }
  }

}

module.exports = Transform

這段代碼目前還是雛形。

Transform類的設(shè)計(jì)類似node里的stream.Transform,但是它的設(shè)計(jì)目的不是buffering或流性能,而是作為并發(fā)編程的基礎(chǔ)模塊。

如果你熟悉流式編程,Transform的設(shè)計(jì)就很容易理解;在內(nèi)部,Transform維護(hù)四個隊(duì)列:

pending是input buffer

working是當(dāng)前正在執(zhí)行的任務(wù)

finished是output buffer,它的目的不是為了buffer輸出,而是在沒有其他輸出辦法的時候作一下buffer。

failed是失敗的任務(wù)

Transform可以組合成DAG(Directed Acyclic Graph)使用,insouts用來存儲前置和后置Transform的引用,pipe方法負(fù)責(zé)設(shè)置這種雙向鏈接;最常見的情況是雙向鏈表,即insouts都只有一個對象。但把他們設(shè)計(jì)成數(shù)組就可以允許fan-in, fan-out的結(jié)構(gòu)。

pushpull是write和read的等價物。

schedule是核心函數(shù),它的任務(wù)是填充working隊(duì)列。在構(gòu)造函數(shù)的參數(shù)里應(yīng)該提供一個名字為transform的異步函數(shù),schedule使用這個函數(shù)運(yùn)行任務(wù),在運(yùn)行結(jié)束后,根據(jù)結(jié)果把任務(wù)推到failed隊(duì)列、推到下一個Transformer、用root節(jié)點(diǎn)的emit輸出、或者推到自己的finished隊(duì)列里。

Transform設(shè)計(jì)的核心思想,就是把并發(fā)任務(wù)的狀態(tài),不使用對象屬性來編碼,只使用隊(duì)列位置來編碼;任何一個子任務(wù),在任何時刻,僅存在于一個Transform對象的某個隊(duì)列中。換句話說,它等于把并發(fā)任務(wù)用資源來建模。如果你熟悉restful api對過程或狀態(tài)的建模方式就很容易理解這一點(diǎn)。

Transform中,任何transform異步函數(shù)的返回,都是一個step;step是用Transform實(shí)現(xiàn)并發(fā)組合的最重要概念;

每一次transform函數(shù)返回,都會發(fā)生改變自己的隊(duì)列或向后續(xù)的Transform對象push任務(wù)的動作,這個push動作會觸發(fā)后續(xù)Transformschedule方法;step結(jié)束時自己的schedule方法也會被調(diào)用,它會重新填充任務(wù)。在這些動作結(jié)束后,所有Transform的隊(duì)列變化,就是整個組合任務(wù)狀態(tài)機(jī)的下一個狀態(tài)。

這個狀態(tài)是顯式的,可以打印出來看,對debug非常有幫助;雖然異步i/o會讓這種狀態(tài)具有不確定性,但至少這里堅(jiān)持了組合狀態(tài)機(jī)模型在處理并發(fā)問題時的同步原則,每個step結(jié)束時整體做一次狀態(tài)遷移,這個狀態(tài)遷移可以良好定義和觀察,這是Event模型下并發(fā)編程和Thread模型的重要區(qū)別。后者遇到并發(fā)邏輯引起的微妙錯誤時,很難捕捉現(xiàn)場分析,因?yàn)槊恳粋€Thread是黑盒。

transform返回開始到emit(step)之間的一連串連鎖動作都是中間過程,最終實(shí)現(xiàn)一次完整的狀態(tài)遷移,這個過程必須是同步的。不應(yīng)在這里出現(xiàn)異步、setImmediate或者process.nextTick等調(diào)用,這會帶來額外的不確定因素和極難發(fā)現(xiàn)和修復(fù)的bug。

在前面很長一段時間的并發(fā)編程實(shí)踐中,我指出過Promise的race/settle和錯誤處理邏輯在一些場景下的困難。Promise的過程邏輯不完備。我也花了很多力氣試圖在Process代數(shù)層面上把error, success, finish, race, settle, abort, pause, resume, 和他們的組合邏輯定義出來,但最終發(fā)現(xiàn)這很困難,因?yàn)閷?shí)際編程中各種處理情況太多了。

所以在Transform的設(shè)計(jì)中,這些邏輯全部被拋棄了,因?yàn)槭聦?shí)上它們都不是真正的基礎(chǔ)并發(fā)邏輯。

Transform試圖實(shí)現(xiàn)組合的基礎(chǔ)并發(fā)邏輯只有一個:stopped。stopped的定義非常簡單:在一次step結(jié)束時,所有的Transformworking隊(duì)列為空,就是(整體的)stopped。這里要再次強(qiáng)調(diào)前述的step結(jié)束時同步方法的必要性,如果你在schedule里使用了異步方法調(diào)用,那么這個stopped的判斷就可能是錯的,因?yàn)?b>schedule可能會在event loop里放置了一個馬上就會產(chǎn)生新的working任務(wù)的動作,而isStopped()的判斷就錯了。

stopped時,整體組合狀態(tài)可能是success, error, paused, 等等,都不難判斷,但目前代碼尚未穩(wěn)定,我不打算加入語法糖。

在blocking i/o和同步的編程模式下,因果鏈和代碼書寫形式是一致的,但是在異步編程下,因果是異步和并發(fā)的,你只能去改變因,然后去觀察果,這是很多程序員不適應(yīng)異步編程的根本原因,因?yàn)樗淖兯季S的習(xí)慣。

使用Transform來處理并發(fā)編程,仍然是在試圖重建這個因果鏈,即使他們是并發(fā)的,但是我們要有一個辦法把他們串起來;

前面說到的isStopped()是觀察到的果,能夠影響它的因,是isBlocked()函數(shù),這個函數(shù)在schedule中被調(diào)用,如果估值為true,就會阻止schedule繼續(xù)向working隊(duì)列調(diào)度任務(wù)。

這里寫的isBlocked()的代碼實(shí)現(xiàn)只是一個例子;可以阻止schedule的原因可能有很多,比如出現(xiàn)錯誤,或者輸出buffer滿了,這些可以由實(shí)現(xiàn)者自己去定義。他們是policy,isBlocked()本身是mechanism。這個策略的粒度是每個Transform對象都可以有自己的策略。比如一個刪除臨時文件的操作,結(jié)果是無關(guān)痛癢的,那么它不該因?yàn)閑rror就block。

isBlocked()邏輯可以象示例代碼里那樣向下chain起來,即只要有后續(xù)任務(wù)block了,前置任務(wù)就該停下來;這在絕大多數(shù)情況下都是合理的邏輯。因?yàn)殡m然我們寫的是流式處理辦法,但是我們不是在處理octet-stream,追求性能的buffering和flow control都沒什么意義,如果前面任務(wù)在copy文件后面的任務(wù)要移動到目標(biāo)文件夾,如果目標(biāo)文件夾出了問題前面快速移動了大量文件最終也無法成功。

如果組合狀態(tài)機(jī)停止了,向其中的任何一個Transform對象執(zhí)行push或者pull操作都可以讓整個狀態(tài)機(jī)繼續(xù)動起來。從root節(jié)點(diǎn)push是常見情況,從leaf節(jié)點(diǎn)pull也是,向中間節(jié)點(diǎn)push也是可能的;

資源建模的一個好處是你可以把狀態(tài)呈現(xiàn)給用戶,如果一個復(fù)制文件的任務(wù)因?yàn)槲募麤_突而fail,你還可以讓用戶選擇處理策略,例如覆蓋或者重命名,在用戶選擇了操作之后,代碼會從某個Transform對象的failed隊(duì)列中取走一個對象,修改策略參數(shù)后重新push進(jìn)去,那么這個狀態(tài)機(jī)可以繼續(xù)執(zhí)行下去;這種可處理的錯誤不該成為block整個狀態(tài)機(jī)工作(復(fù)制其他文件和文件夾)的原因,除非他們積累到可觀的數(shù)量,在Transform模式下這些都非常容易實(shí)現(xiàn),開發(fā)者可以很簡單的編寫isBlocked()的策略;

和node的stream一樣,Transform是lazy的,純粹的push machine可能會在中間節(jié)點(diǎn)buffer大量的任務(wù),這對把任務(wù)作為流處理來說是不合適的;同時,Lazy對于停下來的組合狀態(tài)機(jī)能繼續(xù)run起來很重要,pull方法就是這個設(shè)計(jì)目的,它的schedule邏輯和push一樣,只是方向相反;如果設(shè)置了Leaf節(jié)點(diǎn)會因?yàn)檩敵鼍彌_而block,它就可以block整個狀態(tài)機(jī)(或者其中的一部分),這在某些情況下也是有用的功能,如果整個狀態(tài)機(jī)的輸出因?yàn)槟撤N原因暫時無法被立刻消費(fèi)掉。

abort邏輯沒有在代碼中實(shí)現(xiàn),但它很容易,可以遍歷所有的Transform,如果working隊(duì)列中的對象有abort方法,就調(diào)用它;這不是個立即的中止,該對象仍然要通過callback返回才能stop。如果要全局的block,可以把所有的Leaf Node都pipe到一個sink節(jié)點(diǎn)去,把這個sink節(jié)點(diǎn)強(qiáng)制設(shè)置成isBlocked,可以block全部。pauseresume也是非常類似的邏輯。

當(dāng)然你可能會遇到類似finally的邏輯是必須去執(zhí)行的,即使在發(fā)生錯誤的時候,它意味著這個Transform要向前傳遞isBlocked信息,但是它的Schedule方法不必停止工作。它可以一直運(yùn)行到把所有隊(duì)列任務(wù)都處理完為止。

重載schedule方法也是可能的;例如你的任務(wù)之間有前后依賴的邏輯,你就可以重載schedule方法實(shí)現(xiàn)自己的調(diào)度方式。另外這里的schedule代碼只基于transform函數(shù),很顯然如果transform本身是一個Transform對象它也應(yīng)該工作,實(shí)現(xiàn)組合過程,包括Sequencer,Parallel等等,這些都是需要實(shí)現(xiàn)的。

總而言之,isBlockedschedule是分開的邏輯,它們有各自不同的設(shè)計(jì)目的和使命,你可以重載它們獲得自己想要的結(jié)果。所以寫在這里的代碼,重要的不是他們的實(shí)現(xiàn),而是其機(jī)制設(shè)計(jì)和界面設(shè)計(jì),以及接口承諾;所有邏輯都是足夠原子化的,每個函數(shù)只做一件事,isBlocked是因,可以根據(jù)需要選擇策略,isStopped是果,通過step觀察和實(shí)現(xiàn)后續(xù)邏輯。應(yīng)該避免通過向基類添加新方法來擴(kuò)展能力,因?yàn)?b>Transform使用隊(duì)列和任務(wù)描述狀態(tài),這個描述是完備的,機(jī)制也是完善的。

就像我在另一篇介紹JavaScript語言的文章里寫的一樣,如果針對問題的模型具有完備性,即使抽象,也可以通過組合基本操作和概念獲得更多的特性,而不是在模型上增加概念,除非你認(rèn)為模型不夠完備。

軟件工程中不是什么地方都要上狀態(tài)機(jī)(automaton)這么嚴(yán)格的模型工具,項(xiàng)目軟件里寫到bug數(shù)量足夠低就可以了,但是如果你要寫系統(tǒng)軟件或者對正確性有苛刻要求的東西,如果你沒有用狀態(tài)機(jī)建模,那么實(shí)際上你沒有完備設(shè)計(jì)。

當(dāng)然有了完備設(shè)計(jì)也不意味著軟件沒bug了,但一個好的設(shè)計(jì)可以讓你對問題的理解、遇到問題時找到原因,有極大的幫助。

在復(fù)雜系統(tǒng)中,上述的同步方法狀態(tài)機(jī)組合,和Hierarchical的狀態(tài)機(jī)組合,是我們目前已知的兩種具有完備性的模型方法。但是兩者不同。雖然Transform的組合看起來是一個Hierarchy,但是它就像你在紙上畫一棵樹,它仍然是二維的,每個step的整體狀態(tài)聯(lián)動的遷移只是在populate一次狀態(tài)遷移的范圍,并不是幾何級數(shù)的增加狀態(tài)組合;所以我們?nèi)匀豢梢詷?gòu)筑一個線性的因果鏈,每個step因果因果這樣的繼續(xù)下去,和沒有并發(fā)的狀態(tài)機(jī)是一樣。

本質(zhì)上這是數(shù)學(xué)歸納法:如果我們能證明如果n正確,那么n+1是正確的,這就可以證明chain下去的狀態(tài)組合即使是無窮也是正確的。

第二段代碼是使用的一個示例,這個class沒有必要,是為了保證和老代碼接口兼容,因?yàn)橛幸恍╉?xiàng)目內(nèi)其他代碼的依賴性就不解釋了,很容易看明白大概邏輯;列在這里只是展示一下Transform使用時pipe過程的代碼樣子。

const Promise = require("bluebird")
const path = require("path")
const fs = Promise.promisifyAll(require("fs"))
const EventEmitter = require("events")
const debug = require("debug")("dircopy")
const rimraf = require("rimraf")

const Transform = require("../lib/transform")
const { forceXstat } = require("../lib/xstat")
const fileCopy = require("./filecopy")

class DirCopy extends EventEmitter {

  constructor (src, tmp, files, getDirPath) {
    super()

    let dst = getDirPath()
    let pipe = new Transform({
      name: "copy",
      concurrency: 4,
      transform: (x, callback) =>
        (x.abort = fileCopy(path.join(src, x.name), path.join(tmp, x.name),
          (err, fingerprint) => {
            delete x.abort
            if (err) {
              callback(err)
            } else {
              callback(null, (x.fingerprint = fingerprint, x))
            }
          }))
    }).pipe(new Transform({
      name: "stamp",
      transform: (x, callback) =>
        forceXstat(path.join(tmp, x.name), { hash: x.fingerprint },
          (err, xstat) => err
            ? callback(err)
            : callback(null, (x.uuid = xstat.uuid, x)))
    })).pipe(new Transform({
      name: "move",
      transform: (x, callback) =>
        fs.link(path.join(tmp, x.name), path.join(dst, x.name), err => err
          ? callback(err)
          : callback(null, x))
    })).pipe(new Transform({
      name: "remove",
      transform: (x, callback) => rimraf(path.join(tmp, x.name), () => callback(null))
    })).root()

    let count = 0

    // drain data
    pipe.on("data", data => this.emit("data", data))
    pipe.on("step", (tname, xname) => {
      debug("------------------------------------------")
      debug(`step ${count++}`, tname, xname)
      pipe.print()
      if (pipe.isStopped()) this.emit("stopped")
    })

    files.forEach(name => pipe.push({ name }))
    pipe.print()
    this.pipe = pipe
  }

}

module.exports = DirCopy

文章版權(quán)歸作者所有,未經(jīng)允許請勿轉(zhuǎn)載,若此文章存在違規(guī)行為,您可以聯(lián)系管理員刪除。

轉(zhuǎn)載請注明本文地址:http://systransis.cn/yun/87172.html

相關(guān)文章

  • 白潔血戰(zhàn)Node.js并發(fā)編程 01 狀態(tài)機(jī)

    摘要:狀態(tài)機(jī)狀態(tài)機(jī)是模型層面的概念,與編程語言無關(guān)。狀態(tài)機(jī)具有良好的可實(shí)現(xiàn)性和可測試性。在代碼里,這是一個,但是我們在狀態(tài)機(jī)模型中要把他理解為事件。 這一篇是這個系列的開篇,沒有任何高級內(nèi)容,就講講狀態(tài)機(jī)。 狀態(tài)機(jī) 狀態(tài)機(jī)是模型層面的概念,與編程語言無關(guān)。它的目的是為對象行為建模,屬于設(shè)計(jì)范疇。它的基礎(chǔ)概念是狀態(tài)(state)和事件(event)。 對象的內(nèi)部結(jié)構(gòu)描述為一組狀態(tài)S1, S2,...

    fjcgreat 評論0 收藏0
  • 2017-08-10 前端日報

    摘要:前端日報精選譯用搭建探索生命周期中的匿名遞歸瀏覽器端機(jī)器智能框架深入理解筆記和屬性中文上海線下活動前端工程化架構(gòu)實(shí)踐滬江技術(shù)沙龍掘金周二放送追加視頻知乎專欄第期聊一聊前端自動化測試上雙關(guān)語來自前端的小段子,你看得懂嗎眾成翻 2017-08-10 前端日報 精選 [譯] 用 Node.js 搭建 API Gateway探索 Service Worker 「生命周期」JavaScript ...

    wupengyu 評論0 收藏0
  • 2017-10-05 前端日報

    摘要:前端日報精選瀏覽器渲染過程與性能優(yōu)化新版采用新引擎,速度是舊版的倍,名字和也變了中文與的使用個人文章在對比中理解掘金白潔血戰(zhàn)并發(fā)編程異步英文 2017-10-05 前端日報 精選 瀏覽器渲染過程與性能優(yōu)化Firefox 新版采用新引擎,速度是舊版的 2 倍,名字和 Logo 也變了8 Key React Component DecisionsThe Intl.PluralRules A...

    tracy 評論0 收藏0
  • 和少婦白潔一起學(xué)JavaScript之Async/Await

    摘要:匿名函數(shù)是我們喜歡的一個重要原因,也是,它們分別消除了很多代碼細(xì)節(jié)上需要命名變量名或函數(shù)名的需要。這個匿名函數(shù)內(nèi),有更多的操作,根據(jù)的結(jié)果針對目錄和文件做了不同處理,而且有遞歸。 能和微博上的 @響馬 (fibjs作者)掰扯這個問題是我的榮幸。 事情緣起于知乎上的一個熱貼,諸神都發(fā)表了意見: https://www.zhihu.com/questio... 這一篇不是要說明白什么是as...

    Bryan 評論0 收藏0
  • 和少婦白潔一起學(xué)JavaScript之Async/Await II

    摘要:的科學(xué)定義是或者,它的標(biāo)志性原語是。能解決一類對語言的實(shí)現(xiàn)來說特別無力的狀態(tài)機(jī)模型流程即狀態(tài)。容易實(shí)現(xiàn)是需要和的一個重要原因。 前面寫了一篇,寫的很粗,這篇講講一些細(xì)節(jié)。實(shí)際上Fiber/Coroutine vs Async/Await之爭不是一個簡單的continuation如何實(shí)現(xiàn)的問題,而是兩個完全不同的problem和solution domain。 Event Model 我...

    番茄西紅柿 評論0 收藏0

發(fā)表評論

0條評論

最新活動
閱讀需要支付1元查看
<