成人国产在线小视频_日韩寡妇人妻调教在线播放_色成人www永久在线观看_2018国产精品久久_亚洲欧美高清在线30p_亚洲少妇综合一区_黄色在线播放国产_亚洲另类技巧小说校园_国产主播xx日韩_a级毛片在线免费

資訊專欄INFORMATION COLUMN

以太坊源碼分析:fetcher模塊和區(qū)塊傳播

biaoxiaoduan / 758人閱讀

摘要:當(dāng)前代碼是以太坊,如果版本不同,代碼上可能存在差異。非產(chǎn)生區(qū)塊節(jié)點的策略圖,如圖,黃色節(jié)點將區(qū)塊傳播給青色節(jié)點至此,可以看出以太坊采用以石擊水的方式,像水紋一樣,層層擴散新產(chǎn)生的區(qū)塊。

前言

這篇文章從區(qū)塊傳播策略入手,介紹新區(qū)塊是如何傳播到遠(yuǎn)端節(jié)點,以及新區(qū)塊加入到遠(yuǎn)端節(jié)點本地鏈的過程,同時會介紹fetcher模塊,fetcher的功能是處理Peer通知的區(qū)塊信息。在介紹過程中,還會涉及到p2p,eth等模塊,不會專門介紹,而是專注區(qū)塊的傳播和加入?yún)^(qū)塊鏈的過程。

當(dāng)前代碼是以太坊Release 1.8,如果版本不同,代碼上可能存在差異。

總體過程和傳播策略

本節(jié)從宏觀角度介紹,節(jié)點產(chǎn)生區(qū)塊后,為了傳播給遠(yuǎn)端節(jié)點做了啥,遠(yuǎn)端節(jié)點收到區(qū)塊后又做了什么,每個節(jié)點都連接了很多Peer,它傳播的策略是什么樣的?

總體流程和策略可以總結(jié)為,傳播給遠(yuǎn)端Peer節(jié)點,Peer驗證區(qū)塊無誤后,加入到本地區(qū)塊鏈,繼續(xù)傳播新區(qū)塊信息。具體過程如下。

先看總體過程。產(chǎn)生區(qū)塊后,miner模塊會發(fā)布一個事件NewMinedBlockEvent,訂閱事件的協(xié)程收到事件后,就會把新區(qū)塊的消息,廣播給它所有的peer,peer收到消息后,會交給自己的fetcher模塊處理,fetcher進(jìn)行基本的驗證后,區(qū)塊沒問題,發(fā)現(xiàn)這個區(qū)塊就是本地鏈需要的下一個區(qū)塊,則交給blockChain進(jìn)一步進(jìn)行完整的驗證,這個過程會執(zhí)行區(qū)塊所有的交易,無誤后把區(qū)塊加入到本地鏈,寫入數(shù)據(jù)庫,這個過程就是下面的流程圖,圖1。

總體流程圖,能看到有個分叉,是因為節(jié)點傳播新區(qū)塊是有策略的。它的傳播策略為:

假如節(jié)點連接了N個Peer,它只向Peer列表的sqrt(N)個Peer廣播完整的區(qū)塊消息。

向所有的Peer廣播只包含區(qū)塊Hash的消息。

策略圖的效果如圖2,紅色節(jié)點將區(qū)塊傳播給黃色節(jié)點:

收到區(qū)塊Hash的節(jié)點,需要從發(fā)送給它消息的Peer那里獲取對應(yīng)的完整區(qū)塊,獲取區(qū)塊后就會按照圖1的流程,加入到fetcher隊列,最終插入本地區(qū)塊鏈后,將區(qū)塊的Hash值廣播給和它相連,但還不知道這個區(qū)塊的Peer。非產(chǎn)生區(qū)塊節(jié)點的策略圖,如圖3,黃色節(jié)點將區(qū)塊Hash傳播給青色節(jié)點:

至此,可以看出以太坊采用以石擊水的方式,像水紋一樣,層層擴散新產(chǎn)生的區(qū)塊。

Fetcher模塊是干啥的

fetcher模塊的功能,就是收集其他Peer通知它的區(qū)塊信息:1)完整的區(qū)塊2)區(qū)塊Hash消息。根據(jù)通知的消息,獲取完整的區(qū)塊,然后傳遞給eth模塊把區(qū)塊插入?yún)^(qū)塊鏈。

如果是完整區(qū)塊,就可以傳遞給eth插入?yún)^(qū)塊,如果只有區(qū)塊Hash,則需要從其他的Peer獲取此完整的區(qū)塊,然后再傳遞給eth插入?yún)^(qū)塊。

源碼解讀

本節(jié)介紹區(qū)塊傳播和處理的細(xì)節(jié)東西,方式仍然是先用圖解釋流程,再是代碼流程。

產(chǎn)塊節(jié)點的傳播新區(qū)塊

節(jié)點產(chǎn)生區(qū)塊后,廣播的流程可以表示為圖4:

發(fā)布事件

事件處理函數(shù)選擇要廣播完整的Peer,然后將區(qū)塊加入到它們的隊列

事件處理函數(shù)把區(qū)塊Hash添加到所有Peer的另外一個通知隊列

每個Peer的廣播處理函數(shù),會遍歷它的待廣播區(qū)塊隊列和通知隊列,把數(shù)據(jù)封裝成消息,調(diào)用P2P接口發(fā)送出去

再看下代碼上的細(xì)節(jié)。

worker.wait()函數(shù)發(fā)布事件NewMinedBlockEvent。

ProtocolManager.minedBroadcastLoop()是事件處理函數(shù)。它調(diào)用了2次pm.BroadcastBlock()。

// Mined broadcast loop
func (pm *ProtocolManager) minedBroadcastLoop() {
    // automatically stops if unsubscribe
    for obj := range pm.minedBlockSub.Chan() {
        switch ev := obj.Data.(type) {
        case core.NewMinedBlockEvent:
            pm.BroadcastBlock(ev.Block, true)  // First propagate block to peers
            pm.BroadcastBlock(ev.Block, false) // Only then announce to the rest
        }
    }
}

pm.BroadcastBlock()的入?yún)?b>propagate為真時,向部分Peer廣播完整的區(qū)塊,調(diào)用peer.AsyncSendNewBlock(),否則向所有Peer廣播區(qū)塊頭,調(diào)用peer.AsyncSendNewBlockHash(),這2個函數(shù)就是把數(shù)據(jù)放入隊列,此處不再放代碼。

// BroadcastBlock will either propagate a block to a subset of it"s peers, or
// will only announce it"s availability (depending what"s requested).
func (pm *ProtocolManager) BroadcastBlock(block *types.Block, propagate bool) {
    hash := block.Hash()
    peers := pm.peers.PeersWithoutBlock(hash)

    // If propagation is requested, send to a subset of the peer
    // 這種情況,要把區(qū)塊廣播給部分peer
    if propagate {
        // Calculate the TD of the block (it"s not imported yet, so block.Td is not valid)
        // 計算新的總難度
        var td *big.Int
        if parent := pm.blockchain.GetBlock(block.ParentHash(), block.NumberU64()-1); parent != nil {
            td = new(big.Int).Add(block.Difficulty(), pm.blockchain.GetTd(block.ParentHash(), block.NumberU64()-1))
        } else {
            log.Error("Propagating dangling block", "number", block.Number(), "hash", hash)
            return
        }
        // Send the block to a subset of our peers
        // 廣播區(qū)塊給部分peer
        transfer := peers[:int(math.Sqrt(float64(len(peers))))]
        for _, peer := range transfer {
            peer.AsyncSendNewBlock(block, td)
        }
        log.Trace("Propagated block", "hash", hash, "recipients", len(transfer), "duration", common.PrettyDuration(time.Since(block.ReceivedAt)))
        return
    }
    // Otherwise if the block is indeed in out own chain, announce it
    // 把區(qū)塊hash值廣播給所有peer
    if pm.blockchain.HasBlock(hash, block.NumberU64()) {
        for _, peer := range peers {
            peer.AsyncSendNewBlockHash(block)
        }
        log.Trace("Announced block", "hash", hash, "recipients", len(peers), "duration", common.PrettyDuration(time.Since(block.ReceivedAt)))
    }
}

peer.broadcase()是每個Peer連接的廣播函數(shù),它只廣播3種消息:交易、完整的區(qū)塊、區(qū)塊的Hash,這樣表明了節(jié)點只會主動廣播這3中類型的數(shù)據(jù),剩余的數(shù)據(jù)同步,都是通過請求-響應(yīng)的方式。

// broadcast is a write loop that multiplexes block propagations, announcements
// and transaction broadcasts into the remote peer. The goal is to have an async
// writer that does not lock up node internals.
func (p *peer) broadcast() {
    for {
        select {
        // 廣播交易
        case txs := <-p.queuedTxs:
            if err := p.SendTransactions(txs); err != nil {
                return
            }
            p.Log().Trace("Broadcast transactions", "count", len(txs))
        // 廣播完整的新區(qū)塊
        case prop := <-p.queuedProps:
            if err := p.SendNewBlock(prop.block, prop.td); err != nil {
                return
            }
            p.Log().Trace("Propagated block", "number", prop.block.Number(), "hash", prop.block.Hash(), "td", prop.td)

        // 廣播區(qū)塊Hash
        case block := <-p.queuedAnns:
            if err := p.SendNewBlockHashes([]common.Hash{block.Hash()}, []uint64{block.NumberU64()}); err != nil {
                return
            }
            p.Log().Trace("Announced block", "number", block.Number(), "hash", block.Hash())

        case <-p.term:
            return
        }
    }
}

Peer節(jié)點處理新區(qū)塊

本節(jié)介紹遠(yuǎn)端節(jié)點收到2種區(qū)塊同步消息的處理,其中NewBlockMsg的處理流程比較清晰,也簡潔。NewBlockHashesMsg消息的處理就繞了2繞,從總體流程圖1上能看出來,它需要先從給他發(fā)送消息Peer那里獲取到完整的區(qū)塊,剩下的流程和NewBlockMsg又一致了。

這部分涉及的模塊多,畫出來有種眼花繚亂的感覺,但只要抓住上面的主線,代碼看起來還是很清晰的。通過圖5先看下整體流程。

消息處理的起點是ProtocolManager.handleMsg,NewBlockMsg的處理流程是藍(lán)色標(biāo)記的區(qū)域,紅色區(qū)域是多帶帶的協(xié)程,是fetcher處理隊列中區(qū)塊的流程,如果從隊列中取出的區(qū)塊是當(dāng)前鏈需要的,校驗后,調(diào)用blockchian.InsertChain()把區(qū)塊插入到區(qū)塊鏈,最后寫入數(shù)據(jù)庫,這是黃色部分。最后,綠色部分是NewBlockHashesMsg的處理流程,代碼流程上是比較復(fù)雜的,為了能通過圖描述整體流程,我把它簡化掉了。

仔細(xì)看看這幅圖,掌握整體的流程后,接下來看每個步驟的細(xì)節(jié)。

NewBlockMsg的處理

本節(jié)介紹節(jié)點收到完整區(qū)塊的處理,流程如下:

首先進(jìn)行RLP編解碼,然后標(biāo)記發(fā)送消息的Peer已經(jīng)知道這個區(qū)塊,這樣本節(jié)點最后廣播這個區(qū)塊的Hash時,不會再發(fā)送給該Peer。

將區(qū)塊存入到fetcher的隊列,調(diào)用fetcher.Enqueue。

更新Peer的Head位置,然后判斷本地鏈?zhǔn)欠衤浜笥赑eer的鏈,如果是,則通過Peer更新本地鏈。

只看handle.Msg()NewBlockMsg相關(guān)的部分。

case msg.Code == NewBlockMsg:
    // Retrieve and decode the propagated block
    // 收到新區(qū)塊,解碼,賦值接收數(shù)據(jù)
    var request newBlockData
    if err := msg.Decode(&request); err != nil {
        return errResp(ErrDecode, "%v: %v", msg, err)
    }
    request.Block.ReceivedAt = msg.ReceivedAt
    request.Block.ReceivedFrom = p

    // Mark the peer as owning the block and schedule it for import
    // 標(biāo)記peer知道這個區(qū)塊
    p.MarkBlock(request.Block.Hash())
    // 為啥要如隊列?已經(jīng)得到完整的區(qū)塊了
    // 答:存入fetcher的優(yōu)先級隊列,fetcher會從隊列中選取當(dāng)前高度需要的塊
    pm.fetcher.Enqueue(p.id, request.Block)

    // Assuming the block is importable by the peer, but possibly not yet done so,
    // calculate the head hash and TD that the peer truly must have.
    // 截止到parent區(qū)塊的頭和難度
    var (
        trueHead = request.Block.ParentHash()
        trueTD   = new(big.Int).Sub(request.TD, request.Block.Difficulty())
    )
    // Update the peers total difficulty if better than the previous
    // 如果收到的塊的難度大于peer之前的,以及自己本地的,就去和這個peer同步
    // 問題:就只用了一下塊里的hash指,為啥不直接使用這個塊呢,如果這個塊不能用,干嘛不少發(fā)送些數(shù)據(jù),減少網(wǎng)絡(luò)負(fù)載呢。
    // 答案:實際上,這個塊加入到了優(yōu)先級隊列中,當(dāng)fetcher的loop檢查到當(dāng)前下一個區(qū)塊的高度,正是隊列中有的,則不再向peer請求
    // 該區(qū)塊,而是直接使用該區(qū)塊,檢查無誤后交給block chain執(zhí)行insertChain
    if _, td := p.Head(); trueTD.Cmp(td) > 0 {
        p.SetHead(trueHead, trueTD)

        // Schedule a sync if above ours. Note, this will not fire a sync for a gap of
        // a singe block (as the true TD is below the propagated block), however this
        // scenario should easily be covered by the fetcher.
        currentBlock := pm.blockchain.CurrentBlock()
        if trueTD.Cmp(pm.blockchain.GetTd(currentBlock.Hash(), currentBlock.NumberU64())) > 0 {
            go pm.synchronise(p)
        }
    }
//------------------------ 以上 handleMsg

// Enqueue tries to fill gaps the the fetcher"s future import queue.
// 發(fā)給inject通道,當(dāng)前協(xié)程在handleMsg,通過通道發(fā)送給fetcher的協(xié)程處理
func (f *Fetcher) Enqueue(peer string, block *types.Block) error {
    op := &inject{
        origin: peer,
        block:  block,
    }
    select {
    case f.inject <- op:
        return nil
    case <-f.quit:
        return errTerminated
    }
}

//------------------------ 以下 fetcher.loop處理inject部分
case op := <-f.inject:
    // A direct block insertion was requested, try and fill any pending gaps
    // 區(qū)塊加入隊列,首先也填入未決的間距
    propBroadcastInMeter.Mark(1)
    f.enqueue(op.origin, op.block)

//------------------------  如隊列函數(shù)

// enqueue schedules a new future import operation, if the block to be imported
// has not yet been seen.
// 把導(dǎo)入的新區(qū)塊放進(jìn)來
func (f *Fetcher) enqueue(peer string, block *types.Block) {
    hash := block.Hash()

    // Ensure the peer isn"t DOSing us
    // 防止peer的DOS攻擊
    count := f.queues[peer] + 1
    if count > blockLimit {
        log.Debug("Discarded propagated block, exceeded allowance", "peer", peer, "number", block.Number(), "hash", hash, "limit", blockLimit)
        propBroadcastDOSMeter.Mark(1)
        f.forgetHash(hash)
        return
    }
    // Discard any past or too distant blocks
    // 高度檢查:未來太遠(yuǎn)的塊丟棄
    if dist := int64(block.NumberU64()) - int64(f.chainHeight()); dist < -maxUncleDist || dist > maxQueueDist {
        log.Debug("Discarded propagated block, too far away", "peer", peer, "number", block.Number(), "hash", hash, "distance", dist)
        propBroadcastDropMeter.Mark(1)
        f.forgetHash(hash)
        return
    }
    // Schedule the block for future importing
    // 塊先加入優(yōu)先級隊列,加入鏈之前,還有很多要做
    if _, ok := f.queued[hash]; !ok {
        op := &inject{
            origin: peer,
            block:  block,
        }
        f.queues[peer] = count
        f.queued[hash] = op
        f.queue.Push(op, -float32(block.NumberU64()))
        if f.queueChangeHook != nil {
            f.queueChangeHook(op.block.Hash(), true)
        }
        log.Debug("Queued propagated block", "peer", peer, "number", block.Number(), "hash", hash, "queued", f.queue.Size())
    }
}
fetcher隊列處理

本節(jié)我們看看,區(qū)塊加入隊列后,fetcher如何處理區(qū)塊,為何不直接校驗區(qū)塊,插入到本地鏈?

由于以太坊又Uncle的機制,節(jié)點可能收到老一點的一些區(qū)塊。另外,節(jié)點可能由于網(wǎng)絡(luò)原因,落后了幾個區(qū)塊,所以可能收到“未來”的一些區(qū)塊,這些區(qū)塊都不能直接插入到本地鏈。

區(qū)塊入的隊列是一個優(yōu)先級隊列,高度低的區(qū)塊會被優(yōu)先取出來。fetcher.loop是多帶帶協(xié)程,不斷運轉(zhuǎn),清理fecther中的事務(wù)和事件。首先會清理正在fetching的區(qū)塊,但已經(jīng)超時。然后處理優(yōu)先級隊列中的區(qū)塊,判斷高度是否是下一個區(qū)塊,如果是則調(diào)用f.insert()函數(shù),校驗后調(diào)用BlockChain.InsertChain(),成功插入后,廣播新區(qū)塊的Hash。

// Loop is the main fetcher loop, checking and processing various notification
// events.
func (f *Fetcher) loop() {
    // Iterate the block fetching until a quit is requested
    fetchTimer := time.NewTimer(0)
    completeTimer := time.NewTimer(0)

    for {
        // Clean up any expired block fetches
        // 清理過期的區(qū)塊
        for hash, announce := range f.fetching {
            if time.Since(announce.time) > fetchTimeout {
                f.forgetHash(hash)
            }
        }
        // Import any queued blocks that could potentially fit
        // 導(dǎo)入隊列中合適的塊
        height := f.chainHeight()
        for !f.queue.Empty() {
            op := f.queue.PopItem().(*inject)
            hash := op.block.Hash()
            if f.queueChangeHook != nil {
                f.queueChangeHook(hash, false)
            }
            // If too high up the chain or phase, continue later
            // 塊不是鏈需要的下一個塊,再入優(yōu)先級隊列,停止循環(huán)
            number := op.block.NumberU64()
            if number > height+1 {
                f.queue.Push(op, -float32(number))
                if f.queueChangeHook != nil {
                    f.queueChangeHook(hash, true)
                }
                break
            }
            // Otherwise if fresh and still unknown, try and import
            // 高度正好是我們想要的,并且鏈上也沒有這個塊
            if number+maxUncleDist < height || f.getBlock(hash) != nil {
                f.forgetBlock(hash)
                continue
            }
            // 那么,塊插入鏈
            f.insert(op.origin, op.block)
        }
        
        //省略
    }
}
func (f *Fetcher) insert(peer string, block *types.Block) {
    hash := block.Hash()

    // Run the import on a new thread
    log.Debug("Importing propagated block", "peer", peer, "number", block.Number(), "hash", hash)
    go func() {
        defer func() { f.done <- hash }()

        // If the parent"s unknown, abort insertion
        parent := f.getBlock(block.ParentHash())
        if parent == nil {
            log.Debug("Unknown parent of propagated block", "peer", peer, "number", block.Number(), "hash", hash, "parent", block.ParentHash())
            return
        }
        // Quickly validate the header and propagate the block if it passes
        // 驗證區(qū)塊頭,成功后廣播區(qū)塊
        switch err := f.verifyHeader(block.Header()); err {
        case nil:
            // All ok, quickly propagate to our peers
            propBroadcastOutTimer.UpdateSince(block.ReceivedAt)
            go f.broadcastBlock(block, true)

        case consensus.ErrFutureBlock:
            // Weird future block, don"t fail, but neither propagate

        default:
            // Something went very wrong, drop the peer
            log.Debug("Propagated block verification failed", "peer", peer, "number", block.Number(), "hash", hash, "err", err)
            f.dropPeer(peer)
            return
        }
        // Run the actual import and log any issues
        // 調(diào)用回調(diào)函數(shù),實際是blockChain.insertChain
        if _, err := f.insertChain(types.Blocks{block}); err != nil {
            log.Debug("Propagated block import failed", "peer", peer, "number", block.Number(), "hash", hash, "err", err)
            return
        }
        // If import succeeded, broadcast the block
        propAnnounceOutTimer.UpdateSince(block.ReceivedAt)
        go f.broadcastBlock(block, false)

        // Invoke the testing hook if needed
        if f.importedHook != nil {
            f.importedHook(block)
        }
    }()
}
NewBlockHashesMsg的處理

本節(jié)介紹NewBlockHashesMsg的處理,其實,消息處理是簡單的,而復(fù)雜一點的是從Peer哪獲取完整的區(qū)塊,下節(jié)再看。

流程如下:

對消息進(jìn)行RLP解碼,然后標(biāo)記Peer已經(jīng)知道此區(qū)塊。

尋找出本地區(qū)塊鏈不存在的區(qū)塊Hash值,把這些未知的Hash通知給fetcher。

fetcher.Notify記錄好通知信息,塞入notify通道,以便交給fetcher的協(xié)程。

fetcher.loop()會對notify中的消息進(jìn)行處理,確認(rèn)區(qū)塊并非DOS攻擊,然后檢查區(qū)塊的高度,判斷該區(qū)塊是否已經(jīng)在fetching或者comleting(代表已經(jīng)下載區(qū)塊頭,在下載body),如果都沒有,則加入到announced中,觸發(fā)0s定時器,進(jìn)行處理。

關(guān)于announced下節(jié)再介紹。

// handleMsg()部分
case msg.Code == NewBlockHashesMsg:
    var announces newBlockHashesData
    if err := msg.Decode(&announces); err != nil {
        return errResp(ErrDecode, "%v: %v", msg, err)
    }
    // Mark the hashes as present at the remote node
    for _, block := range announces {
        p.MarkBlock(block.Hash)
    }
    // Schedule all the unknown hashes for retrieval
    // 把本地鏈沒有的塊hash找出來,交給fetcher去下載
    unknown := make(newBlockHashesData, 0, len(announces))
    for _, block := range announces {
        if !pm.blockchain.HasBlock(block.Hash, block.Number) {
            unknown = append(unknown, block)
        }
    }
    for _, block := range unknown {
        pm.fetcher.Notify(p.id, block.Hash, block.Number, time.Now(), p.RequestOneHeader, p.RequestBodies)
    }
// Notify announces the fetcher of the potential availability of a new block in
// the network.
// 通知fetcher(自己)有新塊產(chǎn)生,沒有塊實體,有hash、高度等信息
func (f *Fetcher) Notify(peer string, hash common.Hash, number uint64, time time.Time,
    headerFetcher headerRequesterFn, bodyFetcher bodyRequesterFn) error {
    block := &announce{
        hash:        hash,
        number:      number,
        time:        time,
        origin:      peer,
        fetchHeader: headerFetcher,
        fetchBodies: bodyFetcher,
    }
    select {
    case f.notify <- block:
        return nil
    case <-f.quit:
        return errTerminated
    }
}
// fetcher.loop()的notify通道消息處理
case notification := <-f.notify:
    // A block was announced, make sure the peer isn"t DOSing us
    propAnnounceInMeter.Mark(1)
    count := f.announces[notification.origin] + 1
    if count > hashLimit {
        log.Debug("Peer exceeded outstanding announces", "peer", notification.origin, "limit", hashLimit)
        propAnnounceDOSMeter.Mark(1)
        break
    }

    // If we have a valid block number, check that it"s potentially useful
    // 高度檢查
    if notification.number > 0 {
        if dist := int64(notification.number) - int64(f.chainHeight()); dist < -maxUncleDist || dist > maxQueueDist {
            log.Debug("Peer discarded announcement", "peer", notification.origin, "number", notification.number, "hash", notification.hash, "distance", dist)
            propAnnounceDropMeter.Mark(1)
            break
        }
    }

    // All is well, schedule the announce if block"s not yet downloading
    // 檢查是否已經(jīng)在下載,已下載則忽略
    if _, ok := f.fetching[notification.hash]; ok {
        break
    }
    if _, ok := f.completing[notification.hash]; ok {
        break
    }
    // 更新peer已經(jīng)通知給我們的區(qū)塊數(shù)量
    f.announces[notification.origin] = count
    // 把通知信息加入到announced,供調(diào)度
    f.announced[notification.hash] = append(f.announced[notification.hash], notification)
    if f.announceChangeHook != nil && len(f.announced[notification.hash]) == 1 {
        f.announceChangeHook(notification.hash, true)
    }
    if len(f.announced) == 1 {
        // 有通知放入到announced,則重設(shè)0s定時器,loop的另外一個分支會處理這些通知
        f.rescheduleFetch(fetchTimer)
    }
fetcher獲取完整區(qū)塊

本節(jié)介紹fetcher獲取完整區(qū)塊的過程,這也是fetcher最重要的功能,會涉及到fetcher至少80%的代碼。多帶帶拉放一大節(jié)吧。

Fetcher的大頭

Fetcher最主要的功能就是獲取完整的區(qū)塊,然后在合適的實際交給InsertChain去驗證和插入到本地區(qū)塊鏈。我們還是從宏觀入手,看Fetcher是如何工作的,一定要先掌握好宏觀,因為代碼層面上沒有這么清晰。

宏觀

首先,看兩個節(jié)點是如何交互,獲取完整區(qū)塊,使用時序圖的方式看一下,見圖6,流程很清晰不再文字介紹。

再看下獲取區(qū)塊過程中,fetcher內(nèi)部的狀態(tài)轉(zhuǎn)移,它使用狀態(tài)來記錄,要獲取的區(qū)塊在什么階段,見圖7。我稍微解釋一下:

收到NewBlockHashesMsg后,相關(guān)信息會記錄到announced,進(jìn)入announced狀態(tài),代表了本節(jié)點接收了消息。

announced由fetcher協(xié)程處理,經(jīng)過校驗后,會向給他發(fā)送消息的Peer發(fā)送請求,請求該區(qū)塊的區(qū)塊頭,然后進(jìn)入fetching狀態(tài)。

獲取區(qū)塊頭后,如果區(qū)塊頭表示沒有交易和uncle,則轉(zhuǎn)移到completing狀態(tài),并且使用區(qū)塊頭合成完整的區(qū)塊,加入到queued優(yōu)先級隊列。

獲取區(qū)塊頭后,如果區(qū)塊頭表示該區(qū)塊有交易和uncle,則轉(zhuǎn)移到fetched狀態(tài),然后發(fā)送請求,請求交易和uncle,然后轉(zhuǎn)移到completing狀態(tài)。

收到交易和uncle后,使用頭、交易、uncle這3個信息,生成完整的區(qū)塊,加入到隊列queued。

微觀

接下來就是從代碼角度看如何獲取完整區(qū)塊的流程了,有點多,看不懂的時候,再回顧下上面宏觀的介紹圖。

首先看Fetcher的定義,它存放了通信數(shù)據(jù)和狀態(tài)管理,撿加注釋的看,上文提到的狀態(tài),里面都有。

// Fetcher is responsible for accumulating block announcements from various peers
// and scheduling them for retrieval.
// 積累塊通知,然后調(diào)度獲取這些塊
type Fetcher struct {
    // Various event channels
    // 收到區(qū)塊hash值的通道
    notify chan *announce
    // 收到完整區(qū)塊的通道
    inject chan *inject

    blockFilter chan chan []*types.Block
    // 過濾header的通道的通道
    headerFilter chan chan *headerFilterTask
    // 過濾body的通道的通道
    bodyFilter chan chan *bodyFilterTask

    done chan common.Hash
    quit chan struct{}

    // Announce states
    // Peer已經(jīng)給了本節(jié)點多少區(qū)塊頭通知
    announces map[string]int // Per peer announce counts to prevent memory exhaustion
    // 已經(jīng)announced的區(qū)塊列表
    announced map[common.Hash][]*announce // Announced blocks, scheduled for fetching
    // 正在fetching區(qū)塊頭的請求
    fetching map[common.Hash]*announce // Announced blocks, currently fetching
    // 已經(jīng)fetch到區(qū)塊頭,還差body的請求,用來獲取body
    fetched map[common.Hash][]*announce // Blocks with headers fetched, scheduled for body retrieval
    // 已經(jīng)得到區(qū)塊頭的
    completing map[common.Hash]*announce // Blocks with headers, currently body-completing

    // Block cache
    // queue,優(yōu)先級隊列,高度做優(yōu)先級
    // queues,統(tǒng)計peer通告了多少塊
    // queued,代表這個塊如隊列了,
    queue  *prque.Prque            // Queue containing the import operations (block number sorted)
    queues map[string]int          // Per peer block counts to prevent memory exhaustion
    queued map[common.Hash]*inject // Set of already queued blocks (to dedupe imports)

    // Callbacks
    getBlock       blockRetrievalFn   // Retrieves a block from the local chain
    verifyHeader   headerVerifierFn   // Checks if a block"s headers have a valid proof of work,驗證區(qū)塊頭,包含了PoW驗證
    broadcastBlock blockBroadcasterFn // Broadcasts a block to connected peers,廣播給peer
    chainHeight    chainHeightFn      // Retrieves the current chain"s height
    insertChain    chainInsertFn      // Injects a batch of blocks into the chain,插入?yún)^(qū)塊到鏈的函數(shù)
    dropPeer       peerDropFn         // Drops a peer for misbehaving

    // Testing hooks
    announceChangeHook func(common.Hash, bool) // Method to call upon adding or deleting a hash from the announce list
    queueChangeHook    func(common.Hash, bool) // Method to call upon adding or deleting a block from the import queue
    fetchingHook       func([]common.Hash)     // Method to call upon starting a block (eth/61) or header (eth/62) fetch
    completingHook     func([]common.Hash)     // Method to call upon starting a block body fetch (eth/62)
    importedHook       func(*types.Block)      // Method to call upon successful block import (both eth/61 and eth/62)
}

NewBlockHashesMsg消息的處理前面的小節(jié)已經(jīng)講過了,不記得可向前翻看。這里從announced的狀態(tài)處理說起。loop()中,fetchTimer超時后,代表了收到了消息通知,需要處理,會從announced中選擇出需要處理的通知,然后創(chuàng)建請求,請求區(qū)塊頭,由于可能有很多節(jié)點都通知了它某個區(qū)塊的Hash,所以隨機的從這些發(fā)送消息的Peer中選擇一個Peer,發(fā)送請求的時候,為每個Peer都創(chuàng)建了多帶帶的協(xié)程。

case <-fetchTimer.C:
    // At least one block"s timer ran out, check for needing retrieval
    // 有區(qū)塊通知,去處理
    request := make(map[string][]common.Hash)

    for hash, announces := range f.announced {
        if time.Since(announces[0].time) > arriveTimeout-gatherSlack {
            // Pick a random peer to retrieve from, reset all others
            // 可能有很多peer都發(fā)送了這個區(qū)塊的hash值,隨機選擇一個peer
            announce := announces[rand.Intn(len(announces))]
            f.forgetHash(hash)

            // If the block still didn"t arrive, queue for fetching
            // 本地還沒有這個區(qū)塊,創(chuàng)建獲取區(qū)塊的請求
            if f.getBlock(hash) == nil {
                request[announce.origin] = append(request[announce.origin], hash)
                f.fetching[hash] = announce
            }
        }
    }
    // Send out all block header requests
    // 把所有的request發(fā)送出去
    // 為每一個peer都創(chuàng)建一個協(xié)程,然后請求所有需要從該peer獲取的請求
    for peer, hashes := range request {
        log.Trace("Fetching scheduled headers", "peer", peer, "list", hashes)

        // Create a closure of the fetch and schedule in on a new thread
        fetchHeader, hashes := f.fetching[hashes[0]].fetchHeader, hashes
        go func() {
            if f.fetchingHook != nil {
                f.fetchingHook(hashes)
            }
            for _, hash := range hashes {
                headerFetchMeter.Mark(1)
                fetchHeader(hash) // Suboptimal, but protocol doesn"t allow batch header retrievals
            }
        }()
    }
    // Schedule the next fetch if blocks are still pending
    f.rescheduleFetch(fetchTimer)

Notify的調(diào)用中,可以看出,fetcherHeader()的實際函數(shù)是RequestOneHeader(),該函數(shù)使用的消息是GetBlockHeadersMsg,可以用來請求多個區(qū)塊頭,不過fetcher只請求一個。

pm.fetcher.Notify(p.id, block.Hash, block.Number, time.Now(), p.RequestOneHeader, p.RequestBodies)

// RequestOneHeader is a wrapper around the header query functions to fetch a
// single header. It is used solely by the fetcher.
func (p *peer) RequestOneHeader(hash common.Hash) error {
    p.Log().Debug("Fetching single header", "hash", hash)
    return p2p.Send(p.rw, GetBlockHeadersMsg, &getBlockHeadersData{Origin: hashOrNumber{Hash: hash}, Amount: uint64(1), Skip: uint64(0), Reverse: false})
}

GetBlockHeadersMsg的處理如下:因為它是獲取多個區(qū)塊頭的,所以處理起來比較“麻煩”,還好,fetcher只獲取一個區(qū)塊頭,其處理在20行~33行,獲取下一個區(qū)塊頭的處理邏輯,這里就不看了,最后調(diào)用SendBlockHeaders()將區(qū)塊頭發(fā)送給請求的節(jié)點,消息是BlockHeadersMsg。

// handleMsg()
// Block header query, collect the requested headers and reply
case msg.Code == GetBlockHeadersMsg:
    // Decode the complex header query
    var query getBlockHeadersData
    if err := msg.Decode(&query); err != nil {
        return errResp(ErrDecode, "%v: %v", msg, err)
    }
    hashMode := query.Origin.Hash != (common.Hash{})

    // Gather headers until the fetch or network limits is reached
    // 收集區(qū)塊頭,直到達(dá)到限制
    var (
        bytes   common.StorageSize
        headers []*types.Header
        unknown bool
    )
    // 自己已知區(qū)塊 && 少于查詢的數(shù)量 && 大小小于2MB && 小于能下載的最大數(shù)量
    for !unknown && len(headers) < int(query.Amount) && bytes < softResponseLimit && len(headers) < downloader.MaxHeaderFetch {
        // Retrieve the next header satisfying the query
        // 獲取區(qū)塊頭
        var origin *types.Header
        if hashMode {
            // fetcher 使用的模式
            origin = pm.blockchain.GetHeaderByHash(query.Origin.Hash)
        } else {
            origin = pm.blockchain.GetHeaderByNumber(query.Origin.Number)
        }
        if origin == nil {
            break
        }
        number := origin.Number.Uint64()
        headers = append(headers, origin)
        bytes += estHeaderRlpSize

        // Advance to the next header of the query
        // 下一個區(qū)塊頭的獲取,不同策略,方式不同
        switch {
        case query.Origin.Hash != (common.Hash{}) && query.Reverse:
            // ...
        }
    }
    return p.SendBlockHeaders(headers)

BlockHeadersMsg的處理很有意思,因為GetBlockHeadersMsg并不是fetcher獨占的消息,downloader也可以調(diào)用,所以,響應(yīng)消息的處理需要分辨出是fetcher請求的,還是downloader請求的。它的處理邏輯是:fetcher先過濾收到的區(qū)塊頭,如果fetcher不要的,那就是downloader的,在調(diào)用fetcher.FilterHeaders的時候,fetcher就將自己要的區(qū)塊頭拿走了。

// handleMsg()
case msg.Code == BlockHeadersMsg:
    // A batch of headers arrived to one of our previous requests
    var headers []*types.Header
    if err := msg.Decode(&headers); err != nil {
        return errResp(ErrDecode, "msg %v: %v", msg, err)
    }
    // If no headers were received, but we"re expending a DAO fork check, maybe it"s that
    // 檢查是不是當(dāng)前DAO的硬分叉
    if len(headers) == 0 && p.forkDrop != nil {
        // Possibly an empty reply to the fork header checks, sanity check TDs
        verifyDAO := true

        // If we already have a DAO header, we can check the peer"s TD against it. If
        // the peer"s ahead of this, it too must have a reply to the DAO check
        if daoHeader := pm.blockchain.GetHeaderByNumber(pm.chainconfig.DAOForkBlock.Uint64()); daoHeader != nil {
            if _, td := p.Head(); td.Cmp(pm.blockchain.GetTd(daoHeader.Hash(), daoHeader.Number.Uint64())) >= 0 {
                verifyDAO = false
            }
        }
        // If we"re seemingly on the same chain, disable the drop timer
        if verifyDAO {
            p.Log().Debug("Seems to be on the same side of the DAO fork")
            p.forkDrop.Stop()
            p.forkDrop = nil
            return nil
        }
    }
    // Filter out any explicitly requested headers, deliver the rest to the downloader
    // 過濾是不是fetcher請求的區(qū)塊頭,去掉fetcher請求的區(qū)塊頭再交給downloader
    filter := len(headers) == 1
    if filter {
        // If it"s a potential DAO fork check, validate against the rules
        // 檢查是否硬分叉
        if p.forkDrop != nil && pm.chainconfig.DAOForkBlock.Cmp(headers[0].Number) == 0 {
            // Disable the fork drop timer
            p.forkDrop.Stop()
            p.forkDrop = nil

            // Validate the header and either drop the peer or continue
            if err := misc.VerifyDAOHeaderExtraData(pm.chainconfig, headers[0]); err != nil {
                p.Log().Debug("Verified to be on the other side of the DAO fork, dropping")
                return err
            }
            p.Log().Debug("Verified to be on the same side of the DAO fork")
            return nil
        }
        // Irrelevant of the fork checks, send the header to the fetcher just in case
        // 使用fetcher過濾區(qū)塊頭
        headers = pm.fetcher.FilterHeaders(p.id, headers, time.Now())
    }
    // 剩下的區(qū)塊頭交給downloader
    if len(headers) > 0 || !filter {
        err := pm.downloader.DeliverHeaders(p.id, headers)
        if err != nil {
            log.Debug("Failed to deliver headers", "err", err)
        }
    }

FilterHeaders()是一個很有大智慧的函數(shù),看起來耐人尋味,但實在妙。它要把所有的區(qū)塊頭,都傳遞給fetcher協(xié)程,還要獲取fetcher協(xié)程處理后的結(jié)果。fetcher.headerFilter是存放通道的通道,而filter是存放包含區(qū)塊頭過濾任務(wù)的通道。它先把filter傳遞給了headerFilter,這樣fetcher協(xié)程就在另外一段等待了,而后將headerFilterTask傳入filter,fetcher就能讀到數(shù)據(jù)了,處理后,再將數(shù)據(jù)寫回filter而剛好被FilterHeaders函數(shù)處理了,該函數(shù)實際運行在handleMsg()的協(xié)程中。

每個Peer都會分配一個ProtocolManager然后處理該Peer的消息,但fetcher只有一個事件處理協(xié)程,如果不創(chuàng)建一個filter,fetcher哪知道是誰發(fā)給它的區(qū)塊頭呢?過濾之后,該如何發(fā)回去呢?

// FilterHeaders extracts all the headers that were explicitly requested by the fetcher,
// returning those that should be handled differently.
// 尋找出fetcher請求的區(qū)塊頭
func (f *Fetcher) FilterHeaders(peer string, headers []*types.Header, time time.Time) []*types.Header {
    log.Trace("Filtering headers", "peer", peer, "headers", len(headers))

    // Send the filter channel to the fetcher
    // 任務(wù)通道
    filter := make(chan *headerFilterTask)

    select {
    // 任務(wù)通道發(fā)送到這個通道
    case f.headerFilter <- filter:
    case <-f.quit:
        return nil
    }
    // Request the filtering of the header list
    // 創(chuàng)建過濾任務(wù),發(fā)送到任務(wù)通道
    select {
    case filter <- &headerFilterTask{peer: peer, headers: headers, time: time}:
    case <-f.quit:
        return nil
    }
    // Retrieve the headers remaining after filtering
    // 從任務(wù)通道,獲取過濾的結(jié)果并返回
    select {
    case task := <-filter:
        return task.headers
    case <-f.quit:
        return nil
    }
}

接下來要看f.headerFilter的處理,這段代碼有90行,它做了一下幾件事:

f.headerFilter取出filter,然后取出過濾任務(wù)task。

它把區(qū)塊頭分成3類:unknown這不是分是要返回給調(diào)用者的,即handleMsg(), incomplete存放還需要獲取body的區(qū)塊頭,complete存放只包含區(qū)塊頭的區(qū)塊。遍歷所有的區(qū)塊頭,填到到對應(yīng)的分類中,具體的判斷可看18行的注釋,記住宏觀中將的狀態(tài)轉(zhuǎn)移圖。

unknonw中的區(qū)塊返回給handleMsg()。

incomplete的區(qū)塊頭獲取狀態(tài)移動到fetched狀態(tài),然后觸發(fā)定時器,以便去處理complete的區(qū)塊。

compelete的區(qū)塊加入到queued。

// fetcher.loop()
case filter := <-f.headerFilter:
    // Headers arrived from a remote peer. Extract those that were explicitly
    // requested by the fetcher, and return everything else so it"s delivered
    // to other parts of the system.
    // 收到從遠(yuǎn)端節(jié)點發(fā)送的區(qū)塊頭,過濾出fetcher請求的
    // 從任務(wù)通道獲取過濾任務(wù)
    var task *headerFilterTask
    select {
    case task = <-filter:
    case <-f.quit:
        return
    }
    headerFilterInMeter.Mark(int64(len(task.headers)))

    // Split the batch of headers into unknown ones (to return to the caller),
    // known incomplete ones (requiring body retrievals) and completed blocks.
    // unknown的不是fetcher請求的,complete放沒有交易和uncle的區(qū)塊,有頭就夠了,incomplete放
    // 還需要獲取uncle和交易的區(qū)塊
    unknown, incomplete, complete := []*types.Header{}, []*announce{}, []*types.Block{}
    // 遍歷所有收到的header
    for _, header := range task.headers {
        hash := header.Hash()

        // Filter fetcher-requested headers from other synchronisation algorithms
        // 是正在獲取的hash,并且對應(yīng)請求的peer,并且未fetched,未completing,未queued
        if announce := f.fetching[hash]; announce != nil && announce.origin == task.peer && f.fetched[hash] == nil && f.completing[hash] == nil && f.queued[hash] == nil {
            // If the delivered header does not match the promised number, drop the announcer
            // 高度校驗,竟然不匹配,擾亂秩序,peer肯定是壞蛋。
            if header.Number.Uint64() != announce.number {
                log.Trace("Invalid block number fetched", "peer", announce.origin, "hash", header.Hash(), "announced", announce.number, "provided", header.Number)
                f.dropPeer(announce.origin)
                f.forgetHash(hash)
                continue
            }
            // Only keep if not imported by other means
            // 本地鏈沒有當(dāng)前區(qū)塊
            if f.getBlock(hash) == nil {
                announce.header = header
                announce.time = task.time

                // If the block is empty (header only), short circuit into the final import queue
                // 如果區(qū)塊沒有交易和uncle,加入到complete
                if header.TxHash == types.DeriveSha(types.Transactions{}) && header.UncleHash == types.CalcUncleHash([]*types.Header{}) {
                    log.Trace("Block empty, skipping body retrieval", "peer", announce.origin, "number", header.Number, "hash", header.Hash())

                    block := types.NewBlockWithHeader(header)
                    block.ReceivedAt = task.time

                    complete = append(complete, block)
                    f.completing[hash] = announce
                    continue
                }
                // Otherwise add to the list of blocks needing completion
                // 否則就是不完整的區(qū)塊
                incomplete = append(incomplete, announce)
            } else {
                log.Trace("Block already imported, discarding header", "peer", announce.origin, "number", header.Number, "hash", header.Hash())
                f.forgetHash(hash)
            }
        } else {
            // Fetcher doesn"t know about it, add to the return list
            // 沒請求過的header
            unknown = append(unknown, header)
        }
    }
    // 把未知的區(qū)塊頭,再傳遞會filter
    headerFilterOutMeter.Mark(int64(len(unknown)))
    select {
    case filter <- &headerFilterTask{headers: unknown, time: task.time}:
    case <-f.quit:
        return
    }
    // Schedule the retrieved headers for body completion
    // 把未完整的區(qū)塊加入到fetched,跳過已經(jīng)在completeing中的,然后觸發(fā)completeTimer定時器
    for _, announce := range incomplete {
        hash := announce.header.Hash()
        if _, ok := f.completing[hash]; ok {
            continue
        }
        f.fetched[hash] = append(f.fetched[hash], announce)
        if len(f.fetched) == 1 {
            f.rescheduleComplete(completeTimer)
        }
    }
    // Schedule the header-only blocks for import
    // 把只有頭的區(qū)塊入隊列
    for _, block := range complete {
        if announce := f.completing[block.Hash()]; announce != nil {
            f.enqueue(announce.origin, block)
        }
    }

跟隨狀態(tài)圖的轉(zhuǎn)義,剩下的工作是fetched轉(zhuǎn)移到completing,上面的流程已經(jīng)觸發(fā)了completeTimer定時器,超時后就會處理,流程與請求Header類似,不再贅述,此時發(fā)送的請求消息是GetBlockBodiesMsg,實際調(diào)的函數(shù)是RequestBodies。

// fetcher.loop()
case <-completeTimer.C:
    // At least one header"s timer ran out, retrieve everything
    // 至少有1個header已經(jīng)獲取完了
    request := make(map[string][]common.Hash)

    // 遍歷所有待獲取body的announce
    for hash, announces := range f.fetched {
        // Pick a random peer to retrieve from, reset all others
        // 隨機選一個Peer發(fā)送請求,因為可能已經(jīng)有很多Peer通知它這個區(qū)塊了
        announce := announces[rand.Intn(len(announces))]
        f.forgetHash(hash)

        // If the block still didn"t arrive, queue for completion
        // 如果本地沒有這個區(qū)塊,則放入到completing,創(chuàng)建請求
        if f.getBlock(hash) == nil {
            request[announce.origin] = append(request[announce.origin], hash)
            f.completing[hash] = announce
        }
    }
    // Send out all block body requests
    // 發(fā)送所有的請求,獲取body,依然是每個peer一個多帶帶協(xié)程
    for peer, hashes := range request {
        log.Trace("Fetching scheduled bodies", "peer", peer, "list", hashes)

        // Create a closure of the fetch and schedule in on a new thread
        if f.completingHook != nil {
            f.completingHook(hashes)
        }
        bodyFetchMeter.Mark(int64(len(hashes)))
        go f.completing[hashes[0]].fetchBodies(hashes)
    }
    // Schedule the next fetch if blocks are still pending
    f.rescheduleComplete(completeTimer)

handleMsg()處理該消息也是干凈利落,直接獲取RLP格式的body,然后發(fā)送響應(yīng)消息。

// handleMsg()
case msg.Code == GetBlockBodiesMsg:
    // Decode the retrieval message
    msgStream := rlp.NewStream(msg.Payload, uint64(msg.Size))
    if _, err := msgStream.List(); err != nil {
        return err
    }
    // Gather blocks until the fetch or network limits is reached
    var (
        hash   common.Hash
        bytes  int
        bodies []rlp.RawValue
    )

    // 遍歷所有請求
    for bytes < softResponseLimit && len(bodies) < downloader.MaxBlockFetch {
        // Retrieve the hash of the next block
        if err := msgStream.Decode(&hash); err == rlp.EOL {
            break
        } else if err != nil {
            return errResp(ErrDecode, "msg %v: %v", msg, err)
        }
        // Retrieve the requested block body, stopping if enough was found
        // 獲取body,RLP格式
        if data := pm.blockchain.GetBodyRLP(hash); len(data) != 0 {
            bodies = append(bodies, data)
            bytes += len(data)
        }
    }
    return p.SendBlockBodiesRLP(bodies)

響應(yīng)消息BlockBodiesMsg的處理與處理獲取header的處理原理相同,先交給fetcher過濾,然后剩下的才是downloader的。需要注意一點,響應(yīng)消息里只包含交易列表和叔塊列表。

// handleMsg()
case msg.Code == BlockBodiesMsg:
    // A batch of block bodies arrived to one of our previous requests
    var request blockBodiesData
    if err := msg.Decode(&request); err != nil {
        return errResp(ErrDecode, "msg %v: %v", msg, err)
    }
    // Deliver them all to the downloader for queuing
    // 傳遞給downloader去處理
    transactions := make([][]*types.Transaction, len(request))
    uncles := make([][]*types.Header, len(request))

    for i, body := range request {
        transactions[i] = body.Transactions
        uncles[i] = body.Uncles
    }
    // Filter out any explicitly requested bodies, deliver the rest to the downloader
    // 先讓fetcher過濾去fetcher請求的body,剩下的給downloader
    filter := len(transactions) > 0 || len(uncles) > 0
    if filter {
        transactions, uncles = pm.fetcher.FilterBodies(p.id, transactions, uncles, time.Now())
    }

    // 剩下的body交給downloader
    if len(transactions) > 0 || len(uncles) > 0 || !filter {
        err := pm.downloader.DeliverBodies(p.id, transactions, uncles)
        if err != nil {
            log.Debug("Failed to deliver bodies", "err", err)
        }
    }

過濾函數(shù)的原理也與Header相同。

// FilterBodies extracts all the block bodies that were explicitly requested by
// the fetcher, returning those that should be handled differently.
// 過去出fetcher請求的body,返回它沒有處理的,過程類型header的處理
func (f *Fetcher) FilterBodies(peer string, transactions [][]*types.Transaction, uncles [][]*types.Header, time time.Time) ([][]*types.Transaction, [][]*types.Header) {
    log.Trace("Filtering bodies", "peer", peer, "txs", len(transactions), "uncles", len(uncles))

    // Send the filter channel to the fetcher
    filter := make(chan *bodyFilterTask)

    select {
    case f.bodyFilter <- filter:
    case <-f.quit:
        return nil, nil
    }
    // Request the filtering of the body list
    select {
    case filter <- &bodyFilterTask{peer: peer, transactions: transactions, uncles: uncles, time: time}:
    case <-f.quit:
        return nil, nil
    }
    // Retrieve the bodies remaining after filtering
    select {
    case task := <-filter:
        return task.transactions, task.uncles
    case <-f.quit:
        return nil, nil
    }
}

實際過濾body的處理瞧一下,這和Header的處理是不同的。直接看不點:

它要的區(qū)塊,多帶帶取出來存到blocks中,它不要的繼續(xù)留在task中。

判斷是不是fetcher請求的方法:如果交易列表和叔塊列表計算出的hash值與區(qū)塊頭中的一樣,并且消息來自請求的Peer,則就是fetcher請求的。

blocks中的區(qū)塊加入到queued,終結(jié)。

case filter := <-f.bodyFilter:
    // Block bodies arrived, extract any explicitly requested blocks, return the rest
    var task *bodyFilterTask
    select {
    case task = <-filter:
    case <-f.quit:
        return
    }
    bodyFilterInMeter.Mark(int64(len(task.transactions)))

    blocks := []*types.Block{}
    // 獲取的每個body的txs列表和uncle列表
    // 遍歷每個區(qū)塊的txs列表和uncle列表,計算hash后判斷是否是當(dāng)前fetcher請求的body
    for i := 0; i < len(task.transactions) && i < len(task.uncles); i++ {
        // Match up a body to any possible completion request
        matched := false

        // 遍歷所有保存的請求,因為tx和uncle,不知道它是屬于哪個區(qū)塊的,只能去遍歷所有的請求,通常量不大,所以遍歷沒有性能影響
        for hash, announce := range f.completing {
            if f.queued[hash] == nil {
                // 把傳入的每個塊的hash和unclehash和它請求出去的記錄進(jìn)行對比,匹配則說明是fetcher請求的區(qū)塊body
                txnHash := types.DeriveSha(types.Transactions(task.transactions[i]))
                uncleHash := types.CalcUncleHash(task.uncles[i])

                if txnHash == announce.header.TxHash && uncleHash == announce.header.UncleHash && announce.origin == task.peer {
                    // Mark the body matched, reassemble if still unknown
                    matched = true

                    // 如果當(dāng)前鏈還沒有這個區(qū)塊,則收集這個區(qū)塊,合并成新區(qū)塊
                    if f.getBlock(hash) == nil {
                        block := types.NewBlockWithHeader(announce.header).WithBody(task.transactions[i], task.uncles[i])
                        block.ReceivedAt = task.time

                        blocks = append(blocks, block)
                    } else {
                        f.forgetHash(hash)
                    }
                }
            }
        }
        // 從task中移除fetcher請求的數(shù)據(jù)
        if matched {
            task.transactions = append(task.transactions[:i], task.transactions[i+1:]...)
            task.uncles = append(task.uncles[:i], task.uncles[i+1:]...)
            i--
            continue
        }
    }

    // 將剩余的數(shù)據(jù)返回
    bodyFilterOutMeter.Mark(int64(len(task.transactions)))
    select {
    case filter <- task:
    case <-f.quit:
        return
    }
    // Schedule the retrieved blocks for ordered import
    // 把收集的區(qū)塊加入到隊列
    for _, block := range blocks {
        if announce := f.completing[block.Hash()]; announce != nil {
            f.enqueue(announce.origin, block)
        }
    }
}

至此,fetcher獲取完整區(qū)塊的流程講完了,fetcher模塊中80%的代碼也都貼出來了,還有2個值得看看的函數(shù):

forgetHash(hash common.Hash) :用于清空指定hash指的記/狀態(tài)錄信息。

forgetBlock(hash common.Hash):用于從隊列中移除一個區(qū)塊。

最后了,再回到開始看看fetcher模塊和新區(qū)塊的傳播流程,有沒有豁然開朗。

文章版權(quán)歸作者所有,未經(jīng)允許請勿轉(zhuǎn)載,若此文章存在違規(guī)行為,您可以聯(lián)系管理員刪除。

轉(zhuǎn)載請注明本文地址:http://systransis.cn/yun/24395.html

相關(guān)文章

  • 以太源碼分析:共識(2)引擎

    摘要:前言是以太坊封定義的一個接口,它的功能可以分為類驗證區(qū)塊類,主要用在將區(qū)塊加入到區(qū)塊鏈前,對區(qū)塊進(jìn)行共識驗證。輔助類生成以太坊共識相關(guān)的。被使用,是以太坊狀態(tài)管理服務(wù),當(dāng)報告數(shù)據(jù)的時候,需要獲取區(qū)塊的信息。 前言 engine是以太坊封定義的一個接口,它的功能可以分為3類: 驗證區(qū)塊類,主要用在將區(qū)塊加入到區(qū)塊鏈前,對區(qū)塊進(jìn)行共識驗證。 產(chǎn)生區(qū)塊類,主要用在挖礦時。 輔助類。 接下...

    YuboonaZhang 評論0 收藏0
  • 以太源碼分析:共識(1)礦工

    摘要:接下來我們將從以下角度介紹礦工角色。我們分別使用礦長副礦長礦工進(jìn)行類比。副礦長,負(fù)責(zé)具體挖礦工作的安排,把挖礦任務(wù)安排給。礦工的主要函數(shù)介紹和的主要函數(shù),他們是礦工的具體運作機制。負(fù)責(zé)處理外部事件。 前言 礦工在PoW中負(fù)責(zé)了產(chǎn)生區(qū)塊的工作,把一大堆交易交給它,它生成一個證明自己做了很多工作的區(qū)塊,然后將這個區(qū)塊加入到本地區(qū)塊鏈并且廣播給其他節(jié)點。 接下來我們將從以下角度介紹礦工: ...

    tylin 評論0 收藏0
  • 區(qū)塊鏈技術(shù)學(xué)習(xí)指引

    摘要:引言給迷失在如何學(xué)習(xí)區(qū)塊鏈技術(shù)的同學(xué)一個指引,區(qū)塊鏈技術(shù)是隨比特幣誕生,因此要搞明白區(qū)塊鏈技術(shù),應(yīng)該先了解下比特幣。但區(qū)塊鏈技術(shù)不單應(yīng)用于比特幣,還有非常多的現(xiàn)實應(yīng)用場景,想做區(qū)塊鏈應(yīng)用開發(fā),可進(jìn)一步閱讀以太坊系列。 本文始發(fā)于深入淺出區(qū)塊鏈社區(qū), 原文:區(qū)塊鏈技術(shù)學(xué)習(xí)指引 原文已更新,請讀者前往原文閱讀 本章的文章越來越多,本文是一個索引帖,方便找到自己感興趣的文章,你也可以使用左側(cè)...

    Cristic 評論0 收藏0
  • 以太源碼分析—挖礦與共識

    摘要:下面來看看具體是怎么實現(xiàn)接口的可以看到,啟動了多個線程調(diào)用函數(shù),當(dāng)有線程挖到時,會通過傳入的通道傳出結(jié)果。可以看到在主要循環(huán)中,不斷遞增的值,調(diào)用函數(shù)計算上面公式中的左邊,而則是公式的右邊。 前言 挖礦(mine)是指礦工節(jié)點互相競爭生成新區(qū)塊以寫入整個區(qū)塊鏈獲得獎勵的過程.共識(consensus)是指區(qū)塊鏈各個節(jié)點對下一個區(qū)塊的內(nèi)容形成一致的過程在以太坊中, miner包向外提供挖...

    walterrwu 評論0 收藏0
  • 以太源碼分析--MPT樹

    摘要:是以太坊中存儲區(qū)塊數(shù)據(jù)的核心數(shù)據(jù)結(jié)構(gòu),它和融合一個樹形結(jié)構(gòu),理解結(jié)構(gòu)對之后學(xué)習(xí)以太坊區(qū)塊以及智能合約狀態(tài)存儲結(jié)構(gòu)的模塊源碼很有幫助。 MPT(Merkle Patricia Tries)是以太坊中存儲區(qū)塊數(shù)據(jù)的核心數(shù)據(jù)結(jié)構(gòu),它Merkle Tree和Patricia Tree融合一個樹形結(jié)構(gòu),理解MPT結(jié)構(gòu)對之后學(xué)習(xí)以太坊區(qū)塊header以及智能合約狀態(tài)存儲結(jié)構(gòu)的模塊源碼很有幫助。 首...

    roadtogeek 評論0 收藏0

發(fā)表評論

0條評論

最新活動
閱讀需要支付1元查看
<