摘要:作者比原項(xiàng)目倉庫地址地址在前一篇中,我們已經(jīng)知道如何連上一個(gè)比原節(jié)點(diǎn)的端口,并與對方完成身份驗(yàn)證。代碼如下可以看到,首先是從眾多的中,找到最合適的那個(gè)。到這里,我們其實(shí)已經(jīng)知道比原是如何向其它節(jié)點(diǎn)請求區(qū)塊數(shù)據(jù),以及何時(shí)把信息發(fā)送出去。
作者:freewind
比原項(xiàng)目倉庫:
Github地址:https://github.com/Bytom/bytom
Gitee地址:https://gitee.com/BytomBlockc...
在前一篇中,我們已經(jīng)知道如何連上一個(gè)比原節(jié)點(diǎn)的p2p端口,并與對方完成身份驗(yàn)證。此時(shí),雙方結(jié)點(diǎn)已經(jīng)建立起來了信任,并且連接也不會(huì)斷開,下一步,兩者就可以繼續(xù)交換數(shù)據(jù)了。
那么,我首先想到的就是,如何才能讓對方把它已有的區(qū)塊數(shù)據(jù)全都發(fā)給我呢?
這其實(shí)可以分為三個(gè)問題:
我需要發(fā)給它什么樣的數(shù)據(jù)?
它在內(nèi)部由是如何應(yīng)答的呢?
我拿到數(shù)據(jù)之后,應(yīng)該怎么處理?
由于這一塊的邏輯還是比較復(fù)雜的,所以在本篇我們先回答第一個(gè)問題:
我們要發(fā)送什么樣的數(shù)據(jù)請求,才能讓比原節(jié)點(diǎn)把它持有的區(qū)塊數(shù)據(jù)發(fā)給我? 找到發(fā)送請求的代碼首先我們先要在代碼中定位到,比原到底是在什么時(shí)候來向?qū)Ψ焦?jié)點(diǎn)發(fā)送請求的。
在前一篇講的是如何建立連接并驗(yàn)證身份,那么發(fā)出數(shù)據(jù)請求的操作,一定在上次的代碼之后。按照這個(gè)思路,我們在SyncManager類中Switch啟動(dòng)之后,找到了一個(gè)叫BlockKeeper的類,相關(guān)的操作是在它里面完成的。
下面是老規(guī)矩,還是從啟動(dòng)開始,但是會(huì)更簡化一些:
cmd/bytomd/main.go#L54
func main() { cmd := cli.PrepareBaseCmd(commands.RootCmd, "TM", os.ExpandEnv(config.DefaultDataDir())) cmd.Execute() }
cmd/bytomd/commands/run_node.go#L41
func runNode(cmd *cobra.Command, args []string) error { n := node.NewNode(config) if _, err := n.Start(); err != nil { // ... }
node/node.go#L169
func (n *Node) OnStart() error { // ... n.syncManager.Start() // ... }
netsync/handle.go#L141
func (sm *SyncManager) Start() { go sm.netStart() // ... go sm.syncer() }
注意sm.netStart(),我們在一篇中建立連接并驗(yàn)證身份的操作,就是在它里面完成的。而這次的這個(gè)問題,是在下面的sm.syncer()中完成的。
另外注意,由于這兩個(gè)函數(shù)調(diào)用都使用了goroutine,所以它們是同時(shí)進(jìn)行的。
sm.syncer()的代碼如下:
netsync/sync.go#L46
func (sm *SyncManager) syncer() { sm.fetcher.Start() defer sm.fetcher.Stop() // ... for { select { case <-sm.newPeerCh: log.Info("New peer connected.") // Make sure we have peers to select from, then sync if sm.sw.Peers().Size() < minDesiredPeerCount { break } go sm.synchronise() // .. } }
這里混入了一個(gè)叫fetcher的奇怪的東西,名字看起來好像是專門去抓取數(shù)據(jù)的,我們要找的是它嗎?
可惜不是,fetcher的作用是從多個(gè)peer那里拿到了區(qū)塊數(shù)據(jù)之后,對數(shù)據(jù)進(jìn)行整理,把有用的放到本地鏈上。我們在以后會(huì)研究它,所以這里不展開討論。
接著是一個(gè)for循環(huán),當(dāng)發(fā)現(xiàn)通道newPeerCh有了新數(shù)據(jù)(也就是有了新的節(jié)點(diǎn)連接上了),會(huì)判斷一下當(dāng)前自己連著的節(jié)點(diǎn)是否夠多(大于等于minDesiredPeerCount,值為5),夠多的話,就會(huì)進(jìn)入sm.synchronise(),進(jìn)行數(shù)據(jù)同步。
這里為什么要多等幾個(gè)節(jié)點(diǎn),而不是一連上就馬上同步呢?我想這是希望有更多選擇的機(jī)會(huì),找到一個(gè)數(shù)據(jù)夠多的節(jié)點(diǎn)。
sm.synchronise()還是屬于SyncManager的方法。在真正調(diào)用到BlockKeeper的方法之前,它還做了一些比如清理已經(jīng)斷開的peer,找到最適合同步數(shù)據(jù)的peer等。其中“清理peer”的工作涉及到不同的對象持有的peer集合間的同步,略有些麻煩,但對當(dāng)前問題幫助不大,所以我打算把它們放在以后的某個(gè)問題中回答(比如“當(dāng)一個(gè)節(jié)點(diǎn)斷開了,比原會(huì)有什么樣的處理”),這里就先省略。
sm.synchronise()代碼如下:
netsync/sync.go#L77
func (sm *SyncManager) synchronise() { log.Info("bk peer num:", sm.blockKeeper.peers.Len(), " sw peer num:", sm.sw.Peers().Size(), " ", sm.sw.Peers().List()) // ... peer, bestHeight := sm.peers.BestPeer() // ... if bestHeight > sm.chain.BestBlockHeight() { // ... sm.blockKeeper.BlockRequestWorker(peer.Key, bestHeight) } }
可以看到,首先是從眾多的peers中,找到最合適的那個(gè)。什么叫Best呢?看一下BestPeer()的定義:
netsync/peer.go#L266
func (ps *peerSet) BestPeer() (*p2p.Peer, uint64) { // ... for _, p := range ps.peers { if bestPeer == nil || p.height > bestHeight { bestPeer, bestHeight = p.swPeer, p.height } } return bestPeer, bestHeight }
其實(shí)就是持有區(qū)塊鏈數(shù)據(jù)最長的那個(gè)。
找到了BestPeer之后,就調(diào)用sm.blockKeeper.BlockRequestWorker(peer.Key, bestHeight)方法,從這里,正式進(jìn)入BlockKeeper -- 也就是本文的主角 -- 的世界。
BlockKeeperblockKeeper.BlockRequestWorker的邏輯比較復(fù)雜,它包含了:
根據(jù)自己持有的區(qū)塊數(shù)據(jù)來計(jì)算需要同步的數(shù)據(jù)
向前面找到的最佳節(jié)點(diǎn)發(fā)送數(shù)據(jù)請求
拿到對方發(fā)過來的區(qū)塊數(shù)據(jù)
對數(shù)據(jù)進(jìn)行處理
廣播新狀態(tài)
處理各種出錯(cuò)情況,等等
由于本文中只關(guān)注“發(fā)送請求”,所以一些與之關(guān)系不大的邏輯我會(huì)忽略掉,留待以后再講。
在“發(fā)送請求”這里,實(shí)際也包含了兩種情形,一種簡單的,一種復(fù)雜的:
簡單的:假設(shè)不存在分叉,則直接檢查本地高度最高的區(qū)塊,然后請求下一個(gè)區(qū)塊
復(fù)雜的:考慮分叉的情況,則當(dāng)前本地的區(qū)塊可能就存在分叉,那么到底應(yīng)該請求哪個(gè)區(qū)塊,就需要慎重考慮
由于第2種情況對于本文來說過于復(fù)雜(因?yàn)樾枰羁汤斫獗仍溨蟹植娴奶幚磉壿嫞?,所以在本文中將把問題簡化,只考慮第1種。而分叉的處理,將放在以后講解。
下面是把blockKeeper.BlockRequestWorker中的代碼簡化成了只包含第1種情況:
netsync/block_keeper.go#L72
func (bk *blockKeeper) BlockRequestWorker(peerID string, maxPeerHeight uint64) error { num := bk.chain.BestBlockHeight() + 1 reqNum := uint64(0) reqNum = num // ... bkPeer, ok := bk.peers.Peer(peerID) swPeer := bkPeer.getPeer() // ... block, err := bk.BlockRequest(peerID, reqNum) // ... }
在這種情況下,我們可以認(rèn)為bk.chain.BestBlockHeight()中的Best,指的是本地持有的不帶分叉的區(qū)塊鏈高度最高的那個(gè)。(需要提醒的是,如果存在分叉情況,則Best不一定是高度最高的那個(gè))
那么我們就可以直接向最佳peer請求下一個(gè)高度的區(qū)塊,它是通過bk.BlockRequest(peerID, reqNum)實(shí)現(xiàn)的:
netsync/block_keeper.go#L152
func (bk *blockKeeper) BlockRequest(peerID string, height uint64) (*types.Block, error) { var block *types.Block if err := bk.blockRequest(peerID, height); err != nil { return nil, errReqBlock } // ... for { select { case pendingResponse := <-bk.pendingProcessCh: block = pendingResponse.block // ... return block, nil // ... } } }
在上面簡化后的代碼中,主要分成了兩個(gè)部分。一個(gè)是發(fā)送請求bk.blockRequest(peerID, height),這是本文的重點(diǎn);它下面的for-select部分,已經(jīng)是在等待并處理對方節(jié)點(diǎn)的返回?cái)?shù)據(jù)了,這部分我們今天先略過不講。
bk.blockRequest(peerID, height)這個(gè)方法,從邏輯上又可以分成兩部分:
構(gòu)造出請求的信息
把信息發(fā)送給對方節(jié)點(diǎn)
構(gòu)造出請求的信息bk.blockRequest(peerID, height)經(jīng)過一連串的方法調(diào)用之后,使用height構(gòu)造出了一個(gè)BlockRequestMessage對象,代碼如下:
netsync/block_keeper.go#L148
func (bk *blockKeeper) blockRequest(peerID string, height uint64) error { return bk.peers.requestBlockByHeight(peerID, height) }
netsync/peer.go#L332
func (ps *peerSet) requestBlockByHeight(peerID string, height uint64) error { peer, ok := ps.Peer(peerID) // ... return peer.requestBlockByHeight(height) }
netsync/peer.go#L73
func (p *peer) requestBlockByHeight(height uint64) error { msg := &BlockRequestMessage{Height: height} p.swPeer.TrySend(BlockchainChannel, struct{ BlockchainMessage }{msg}) return nil }
到這里,終于構(gòu)造出了所需要的BlockRequestMessage,其實(shí)主要就是把height告訴peer。
然后,通過Peer的TrySend()把該信息發(fā)出去。
發(fā)送請求在TrySend中,主要是通過github.com/tendermint/go-wire庫將其序列化,再發(fā)送給對方。看起來應(yīng)該是很簡單的操作吧,先預(yù)個(gè)警,還是挺繞的。
當(dāng)我們進(jìn)入TrySend()后:
p2p/peer.go#L242
func (p *Peer) TrySend(chID byte, msg interface{}) bool { if !p.IsRunning() { return false } return p.mconn.TrySend(chID, msg) }
發(fā)現(xiàn)它把鍋丟給了p.mconn.TrySend方法,那么mconn是什么?chID又是什么?
mconn是MConnection的實(shí)例,它是從哪兒來的?它應(yīng)該在之前的某個(gè)地方初始化了,否則我們沒法直接調(diào)用它。所以我們先來找到它初始化的地方。
經(jīng)過一番尋找,發(fā)現(xiàn)原來是在前一篇之后,即比原節(jié)點(diǎn)與另一個(gè)節(jié)點(diǎn)完成了身份驗(yàn)證之后,具體的位置在Switch類啟動(dòng)的地方。
我們這次直接從Swtich的OnStart作為起點(diǎn):
p2p/switch.go#L186
func (sw *Switch) OnStart() error { //... // Start listeners for _, listener := range sw.listeners { go sw.listenerRoutine(listener) } return nil }
p2p/switch.go#L498
func (sw *Switch) listenerRoutine(l Listener) { for { inConn, ok := <-l.Connections() // ... err := sw.addPeerWithConnectionAndConfig(inConn, sw.peerConfig) // ... } }
p2p/switch.go#L645
func (sw *Switch) addPeerWithConnectionAndConfig(conn net.Conn, config *PeerConfig) error { // ... peer, err := newInboundPeerWithConfig(conn, sw.reactorsByCh, sw.chDescs, sw.StopPeerForError, sw.nodePrivKey, config) // ... }
p2p/peer.go#L87
func newInboundPeerWithConfig(conn net.Conn, reactorsByCh map[byte]Reactor, chDescs []*ChannelDescriptor, onPeerError func(*Peer, interface{}), ourNodePrivKey crypto.PrivKeyEd25519, config *PeerConfig) (*Peer, error) { return newPeerFromConnAndConfig(conn, false, reactorsByCh, chDescs, onPeerError, ourNodePrivKey, config) }
p2p/peer.go#L91
func newPeerFromConnAndConfig(rawConn net.Conn, outbound bool, reactorsByCh map[byte]Reactor, chDescs []*ChannelDescriptor, onPeerError func(*Peer, interface{}), ourNodePrivKey crypto.PrivKeyEd25519, config *PeerConfig) (*Peer, error) { conn := rawConn // ... if config.AuthEnc { // ... conn, err = MakeSecretConnection(conn, ourNodePrivKey) // ... } // Key and NodeInfo are set after Handshake p := &Peer{ outbound: outbound, conn: conn, config: config, Data: cmn.NewCMap(), } p.mconn = createMConnection(conn, p, reactorsByCh, chDescs, onPeerError, config.MConfig) p.BaseService = *cmn.NewBaseService(nil, "Peer", p) return p, nil }
終于找到了。上面方法中的MakeSecretConnection就是與對方節(jié)點(diǎn)交換公鑰并進(jìn)行身份驗(yàn)證的地方,下面的p.mconn = createMConnection(...)就是創(chuàng)建mconn的地方。
繼續(xù)進(jìn)去:
p2p/peer.go#L292
func createMConnection(conn net.Conn, p *Peer, reactorsByCh map[byte]Reactor, chDescs []*ChannelDescriptor, onPeerError func(*Peer, interface{}), config *MConnConfig) *MConnection { onReceive := func(chID byte, msgBytes []byte) { reactor := reactorsByCh[chID] if reactor == nil { if chID == PexChannel { return } else { cmn.PanicSanity(cmn.Fmt("Unknown channel %X", chID)) } } reactor.Receive(chID, p, msgBytes) } onError := func(r interface{}) { onPeerError(p, r) } return NewMConnectionWithConfig(conn, chDescs, onReceive, onError, config) }
原來mconn是MConnection的實(shí)例,它是通過NewMConnectionWithConfig創(chuàng)建的。
看了上面的代碼,發(fā)現(xiàn)這個(gè)MConnectionWithConfig與普通的net.Conn并沒有太大的區(qū)別,只不過是當(dāng)收到了對方發(fā)來的數(shù)據(jù)后,會(huì)根據(jù)指定的chID調(diào)用相應(yīng)的Reactor的Receive方法來處理。所以它起到了將數(shù)據(jù)分發(fā)給Reactor的作用。
為什么需要這樣的分發(fā)操作呢?這是因?yàn)椋诒仍?,?jié)點(diǎn)之間交換數(shù)據(jù),有多種不同的方式:
一種是規(guī)定了詳細(xì)的數(shù)據(jù)交互協(xié)議(比如有哪些信息類型,分別代表什么意思,什么情況下發(fā)哪個(gè),如何應(yīng)答等),在ProtocolReactor中實(shí)現(xiàn),它對應(yīng)的chID是BlockchainChannel,值為byte(0x40)
另一種使用了與BitTorrent類似的文件共享協(xié)議,叫PEX,在PEXReactor中實(shí)現(xiàn),它對應(yīng)的chID是PexChannel,值為byte(0x00)
所以節(jié)點(diǎn)之間發(fā)送信息的時(shí)候,需要知道對方發(fā)過來的數(shù)據(jù)對應(yīng)的是哪一種方式,然后轉(zhuǎn)交給相應(yīng)的Reactor去處理。
在比原中,前者是主要的方式,后者起到輔助作用。我們目前的文章中涉及到的都是前者,后者將在以后專門研究。
p.mconn.TrySend當(dāng)我們知道了p.mconn.TrySend中的mconn是什么,并且在什么時(shí)候初始化以后,下面就可以進(jìn)入它的TrySend方法了。
p2p/connection.go#L243
func (c *MConnection) TrySend(chID byte, msg interface{}) bool { // ... channel, ok := c.channelsIdx[chID] // ... ok = channel.trySendBytes(wire.BinaryBytes(msg)) if ok { // Wake up sendRoutine if necessary select { case c.send <- struct{}{}: default: } } return ok }
可以看到,它找到相應(yīng)的channel后(在這里應(yīng)該是ProtocolReactor對應(yīng)的channel),調(diào)用channel的trySendBytes方法。在發(fā)送數(shù)據(jù)的時(shí)候,使用了github.com/tendermint/go-wire庫,將msg序列化為二進(jìn)制數(shù)組。
p2p/connection.go#L602
func (ch *Channel) trySendBytes(bytes []byte) bool { select { case ch.sendQueue <- bytes: atomic.AddInt32(&ch.sendQueueSize, 1) return true default: return false } }
原來它是把要發(fā)送的數(shù)據(jù),放到了該channel對應(yīng)的sendQueue中,交由別人來發(fā)送。具體是由誰來發(fā)送,我們馬上要就找到它。
細(xì)心的同學(xué)會(huì)發(fā)現(xiàn),Channel除了trySendBytes方法外,還有一個(gè)sendBytes(在本文中沒有用上):
p2p/connection.go#L589
func (ch *Channel) sendBytes(bytes []byte) bool { select { case ch.sendQueue <- bytes: atomic.AddInt32(&ch.sendQueueSize, 1) return true case <-time.After(defaultSendTimeout): return false } }
它們兩個(gè)的區(qū)別是,前者嘗試把待發(fā)送數(shù)據(jù)bytes放入ch.sendQueue時(shí),如果能放進(jìn)去,則返回true,否則馬上失敗,返回false,所以它是非阻塞的。而后者,如果放不進(jìn)去(sendQueue已滿,那邊還沒處理完),則等待defaultSendTimeout(值為10秒),然后才會(huì)失敗。另外,sendQueue的容量默認(rèn)為1。
到這里,我們其實(shí)已經(jīng)知道比原是如何向其它節(jié)點(diǎn)請求區(qū)塊數(shù)據(jù),以及何時(shí)把信息發(fā)送出去。
本想在本篇中就把真正發(fā)送數(shù)據(jù)的代碼也一起講了,但是發(fā)現(xiàn)它的邏輯也相當(dāng)復(fù)雜,所以就另開一篇講吧。
再回到本文問題,再強(qiáng)調(diào)一下,我們前面說了,對于向peer請求區(qū)塊數(shù)據(jù),有兩種情況:一種是簡單的不考慮分叉的,另一種是復(fù)雜的考慮分叉的。在本文只考慮了簡單的情況,在這種情況下,所謂的bestHeight就是指的最高的那個(gè)區(qū)塊的高度,而在復(fù)雜情況下,它就不一定了。這就留待以后我們再詳細(xì)討論,本文的問題就算是回答完畢了。
文章版權(quán)歸作者所有,未經(jīng)允許請勿轉(zhuǎn)載,若此文章存在違規(guī)行為,您可以聯(lián)系管理員刪除。
轉(zhuǎn)載請注明本文地址:http://systransis.cn/yun/24157.html
摘要:所以本文本來是想去研究一下,當(dāng)別的節(jié)點(diǎn)把區(qū)塊數(shù)據(jù)發(fā)給我們之后,我們應(yīng)該怎么處理,現(xiàn)在換成研究比原的是怎么做出來的。進(jìn)去后會(huì)看到大量的與相關(guān)的配置。它的功能主要是為了在訪問與的函數(shù)之間增加了一層轉(zhuǎn)換。 作者:freewind 比原項(xiàng)目倉庫: Github地址:https://github.com/Bytom/bytom Gitee地址:https://gitee.com/BytomBlo...
摘要:到這里,我們總算能夠完整的理解清楚,當(dāng)我們向一個(gè)比原節(jié)點(diǎn)請求區(qū)塊數(shù)據(jù),我們這邊需要怎么做,對方節(jié)點(diǎn)又需要怎么做了。 作者:freewind 比原項(xiàng)目倉庫: Github地址:https://github.com/Bytom/bytom Gitee地址:https://gitee.com/BytomBlockc... 在上一篇,我們知道了比原是如何把請求區(qū)塊數(shù)據(jù)的信息BlockReque...
摘要:作者比原項(xiàng)目倉庫地址地址在前一篇中,我們說到,當(dāng)比原向其它節(jié)點(diǎn)請求區(qū)塊數(shù)據(jù)時(shí),會(huì)發(fā)送一個(gè)把需要的區(qū)塊告訴對方,并把該信息對應(yīng)的二進(jìn)制數(shù)據(jù)放入對應(yīng)的通道中,等待發(fā)送。這個(gè)就是真正與連接對象綁定的一個(gè)緩存區(qū),寫入到它里面的數(shù)據(jù),會(huì)被發(fā)送出去。 作者:freewind 比原項(xiàng)目倉庫: Github地址:https://github.com/Bytom/bytom Gitee地址:https:...
摘要:如果傳的是,就會(huì)在內(nèi)部使用默認(rèn)的隨機(jī)數(shù)生成器生成隨機(jī)數(shù)并生成密鑰。使用的是,生成的是一個(gè)形如這樣的全球唯一的隨機(jī)數(shù)把密鑰以文件形式保存在硬盤上。 作者:freewind 比原項(xiàng)目倉庫: Github地址:https://github.com/Bytom/bytom Gitee地址:https://gitee.com/BytomBlockc... 在前一篇,我們探討了從瀏覽器的dashb...
摘要:所以在今天我打算通過源代碼分析一下比原的挖礦流程,但是考慮到它肯定會(huì)涉及到比原的核心,所以太復(fù)雜的地方我就會(huì)先跳過,那些地方時(shí)機(jī)成熟的時(shí)候會(huì)徹底研究一下。 作者:freewind 比原項(xiàng)目倉庫: Github地址:https://github.com/Bytom/bytom Gitee地址:https://gitee.com/BytomBlockc... 當(dāng)我們以bytom init ...
閱讀 1356·2021-11-24 09:38
閱讀 3291·2021-11-22 12:03
閱讀 4265·2021-11-11 10:59
閱讀 2372·2021-09-28 09:36
閱讀 1064·2021-09-09 09:32
閱讀 3488·2021-08-05 10:00
閱讀 2573·2021-07-23 15:30
閱讀 3020·2019-08-30 13:12