成人国产在线小视频_日韩寡妇人妻调教在线播放_色成人www永久在线观看_2018国产精品久久_亚洲欧美高清在线30p_亚洲少妇综合一区_黄色在线播放国产_亚洲另类技巧小说校园_国产主播xx日韩_a级毛片在线免费

資訊專欄INFORMATION COLUMN

以太坊事件框架

騫諱護 / 3234人閱讀

摘要:和各有優(yōu)劣,最優(yōu)秀的共同特點是,他們只依賴于原始的包,完全與以太坊的其他模塊隔離開來,也就是說,你完全可以把這兩個事件框架用在自己的項目中。

過去在學Actor模型的時候,就認為異步消息是相當?shù)闹匾谌A為的時候,也深扒了一下當時產(chǎn)品用的消息模型,簡單實用,支撐起了很多模塊和業(yè)務,但也有一個缺點是和其他的框架有耦合,最近看到以太坊的事件框架,同樣簡單簡潔,理念很適合初步接觸事件框架的同學,寫文介紹一下。

以太坊的事件框架是一個多帶帶的基礎模塊,存在于目錄go-ethereum/event中,它有2中獨立的事件框架實現(xiàn),老點的叫TypeMux,已經(jīng)基本棄用,新的叫Feed,當前正在廣泛使用。

TypeMuxFeed還只是簡單的事件框架,與Kafka、RocketMQ等消息系統(tǒng)相比,是非常的傳統(tǒng)和簡單,但是TypeMuxFeed的簡單簡潔,已經(jīng)很好的支撐以太坊的上層模塊,這是當下最好的選擇。

TypeMuxFeed各有優(yōu)劣,最優(yōu)秀的共同特點是,他們只依賴于Golang原始的包,完全與以太坊的其他模塊隔離開來,也就是說,你完全可以把這兩個事件框架用在自己的項目中。

TypeMux的特點是,你把所有的訂閱塞給它就好,事件來了它自會通知你,但有可能會阻塞,通知你不是那么及時,甚至過了一段挺長的時間。

Feed的特點是,它通常不存在阻塞的情況,會及時的把事件通知給你,但需要你為每類事件都建立一個Feed,然后不同的事件去不同的Feed上訂閱和發(fā)送,這其實挺煩人的,如果你用錯了Feed,會導致panic。

接下來,介紹下這種簡單事件框架的抽象模型,然后再回歸到以太坊,介紹下TypeMuxFeed。

!<--more-->

事件框架的抽象結(jié)構(gòu)

如上圖,輕量級的事件框架會把所有的被訂閱的事件收集起來,然后把每個訂閱者組合成一個列表,當事件框架收到某個事件的時候,就把訂閱該事件的所有訂閱者找出來,然后把這個事件發(fā)給他們。

它需要具有2個功能:

讓訂閱者訂閱、取消訂閱某類事件。

讓發(fā)布者能夠發(fā)布某個事件,并且把事件送到每個訂閱者。

如果做成完善的消息系統(tǒng),就還得考慮這些特性:可用性、吞吐量、傳輸延遲、有序消息、消息存儲、過濾、重發(fā),這和事件框架相比就復雜上去了,我們專注的介紹下以太坊的事件模型怎么完成上述3個功能的。

以太坊的事件模型

TypeMux是一個以太坊不太滿意的事件框架,所以以太坊就搞了Feed出來,它解決了TypeMux效率低下,延遲交付的問題。接下來就先看下這個TypeMux。

TypeMux:同步事件框架

TypeMux是一個同步事件框架。它的實現(xiàn)和上面講的事件框架的抽象結(jié)構(gòu)是完全一樣的,它維護了一個訂閱表,表里維護了每個事件的訂閱者列表。它的特點:

采用多對多結(jié)構(gòu):多個事件對多個訂閱者。

采用推模式,把事件/消息推送給訂閱者,就像信件一樣,會被送到你的信箱,你在信箱里取信就行了。

是一個同步事件框架。這也是它的缺點所在,舉個例子就是:郵遞員要給小紅、小明送信,只有信箱里的信被小紅取走后,郵遞員才去給小明送信,如果小紅旅游去了無法取信,郵遞員就一直等在小紅家,而小明一直收不到信,小明很無辜無辜??!

看下它2個功能的實現(xiàn):

訂閱和取消訂閱。訂閱通過函數(shù)TypeMux.Subscribe(),入?yún)橐嗛喌氖录愋停瑫祷?b>TypeMuxSubscription給訂閱者,訂閱者可通過此控制訂閱,通過TypeMuxSubscription.Unsubscribe() 可以取消訂閱。

發(fā)布事件和傳遞事件。TypeMux.Post(),入?yún)槭录愋?,根?jù)訂閱表找出該事件的訂閱者列表,遍歷列表,依次向每個訂閱者傳遞事件,如果前一個沒有傳遞完成進入阻塞,會導致后邊的訂閱者不能及時收到事件。

TypeMux源碼速遞

TypeMux的精簡組成:

// A TypeMux dispatches events to registered receivers. Receivers can be
// registered to handle events of certain type. Any operation
// called after mux is stopped will return ErrMuxClosed.
//
// The zero value is ready to use.
//
// Deprecated: use Feed
// 本質(zhì):哈希列表,每個事件的訂閱者都存到對于的列表里
type TypeMux struct {
    mutex   sync.RWMutex // 鎖
    subm    map[reflect.Type][]*TypeMuxSubscription // 訂閱表:所有事件類型的所有訂閱者
    stopped bool
}

訂閱:

// Subscribe creates a subscription for events of the given types. The
// subscription"s channel is closed when it is unsubscribed
// or the mux is closed.
// 訂閱者只傳入訂閱的事件類型,然后TypeMux會返回給它一個訂閱對象
func (mux *TypeMux) Subscribe(types ...interface{}) *TypeMuxSubscription {
    sub := newsub(mux)
    mux.mutex.Lock()
    defer mux.mutex.Unlock()
    if mux.stopped {
        // set the status to closed so that calling Unsubscribe after this
        // call will short circuit.
        sub.closed = true
        close(sub.postC)
    } else {
        if mux.subm == nil {
            mux.subm = make(map[reflect.Type][]*TypeMuxSubscription)
        }
        for _, t := range types {
            rtyp := reflect.TypeOf(t)
            // 在同一次訂閱中,不要重復訂閱同一個類型的事件
            oldsubs := mux.subm[rtyp]
            if find(oldsubs, sub) != -1 {
                panic(fmt.Sprintf("event: duplicate type %s in Subscribe", rtyp))
            }
            subs := make([]*TypeMuxSubscription, len(oldsubs)+1)
            copy(subs, oldsubs)
            subs[len(oldsubs)] = sub
            mux.subm[rtyp] = subs
        }
    }
    return sub
}

取消訂閱:

func (s *TypeMuxSubscription) Unsubscribe() {
    s.mux.del(s)
    s.closewait()
}

發(fā)布事件和傳遞事件:

// Post sends an event to all receivers registered for the given type.
// It returns ErrMuxClosed if the mux has been stopped.
// 遍歷map,找到所有訂閱的人,向它們傳遞event,同一個event對象,非拷貝,運行在調(diào)用者goroutine
func (mux *TypeMux) Post(ev interface{}) error {
    event := &TypeMuxEvent{
        Time: time.Now(),
        Data: ev,
    }
    rtyp := reflect.TypeOf(ev)
    mux.mutex.RLock()
    if mux.stopped {
        mux.mutex.RUnlock()
        return ErrMuxClosed
    }
    subs := mux.subm[rtyp]
    mux.mutex.RUnlock()
    for _, sub := range subs {
        sub.deliver(event)
    }
    return nil
}

func (s *TypeMuxSubscription) deliver(event *TypeMuxEvent) {
    // Short circuit delivery if stale event
    // 不發(fā)送過早(老)的消息
    if s.created.After(event.Time) {
        return
    }
    // Otherwise deliver the event
    s.postMu.RLock()
    defer s.postMu.RUnlock()

    select {
    case s.postC <- event:
    case <-s.closing:
    }
}

我上面指出了發(fā)送事件可能阻塞,阻塞在哪?關(guān)鍵就在下面這里:創(chuàng)建TypeMuxSubscription時,通道使用的是無緩存通道,讀寫是同步的,這里注定了TypeMux是一個同步事件框架,這是以太坊改用Feed的最大原因。

func newsub(mux *TypeMux) *TypeMuxSubscription {
    c := make(chan *TypeMuxEvent) // 無緩沖通道,同步讀寫
    return &TypeMuxSubscription{
        mux:     mux,
        created: time.Now(),
        readC:   c,
        postC:   c,
        closing: make(chan struct{}),
    }
}
Feed:流式框架

Feed是一個流式事件框架。上文強調(diào)了TypeMux是一個同步框架,也正是因為此以太坊丟棄了它,難道Feed就是一個異步框架?不一定是的,這取決于訂閱者是否采用有緩存的通道,采用有緩存的通道,則Feed就是異步的,采用無緩存的通道,F(xiàn)eed就是同步的,把同步還是異步的選擇交給使用者。

本節(jié)強調(diào)Feed的流式特點。事件本質(zhì)是一個數(shù)據(jù),連續(xù)不斷的事件就組成了一個數(shù)據(jù)流,這些數(shù)據(jù)流不停的流向它的訂閱者那里,并且不會阻塞在任何一個訂閱者那里。

舉幾個不是十分恰當?shù)睦印?/p>

公司要放中秋節(jié),HR給所有同事都發(fā)了一封郵件,有些同事讀了,有些同事沒讀,要到國慶節(jié)了HR又給所有同事發(fā)了一封郵件,這些郵件又進入到每個人的郵箱,不會因為任何一個人沒有讀郵件,導致剩下的同事收不到郵件。

你在朋友圈給朋友旅行的照片點了個贊,每當你們共同朋友點贊或者評論的時候,你都會收到提醒,無論你看沒看這些提醒,這些提醒都會不斷的發(fā)過來。

你微博關(guān)注了謝娜,謝娜發(fā)了個搞笑的視頻,你刷微博的時候就收到了,但也有很多人根本沒刷微博,你不會因為別人沒有刷,你就收不到謝娜的動態(tài)。

Feed和TypeMux相同的是,它們都是推模式,不同的是Feed是異步的,如果有些訂閱者阻塞了,沒關(guān)系,它會繼續(xù)向后面的訂閱者發(fā)送事件/消息。

Feed是一個一對多的事件流框架。每個類型的事件都需要一個與之對應的Feed,訂閱者通過這個Feed進行訂閱事件,發(fā)布者通過這個Feed發(fā)布事件。

看下Feed是如何實現(xiàn)2個功能的:

訂閱和取消訂閱:Feed.Subscribe(),入?yún)⑹且粋€通道,通常是有緩沖的,就算是無緩存也不會造成Feed阻塞,F(xiàn)eed會校驗這個通道的類型和本Feed管理的事件類型是否一致,然后把通道保存下來,返回給訂閱者一個Subscription,可以通過它取消訂閱和讀取通道錯誤。

發(fā)布事件和傳遞事件。Feed.Send()入?yún)⑹且粋€事件,加鎖確保本類型事件只有一個發(fā)送協(xié)程正在進行,然后校驗事件類型是否匹配,F(xiàn)eed會嘗試給每個訂閱者發(fā)送事件,如果訂閱者阻塞,F(xiàn)eed就繼續(xù)嘗試給下一個訂閱者發(fā)送,直到給每個訂閱者發(fā)送事件,返回發(fā)送該事件的數(shù)量。

Feed源碼速遞

Feed定義:

// Feed implements one-to-many subscriptions where the carrier of events is a channel.
// Values sent to a Feed are delivered to all subscribed channels simultaneously.
//
// Feeds can only be used with a single type. The type is determined by the first Send or
// Subscribe operation. Subsequent calls to these methods panic if the type does not
// match.
//
// The zero value is ready to use.
// 一對多的事件訂閱管理:每個feed對象,當別人調(diào)用send的時候,會發(fā)送給所有訂閱者
// 每種事件類型都有一個自己的feed,一個feed內(nèi)訂閱的是同一種類型的事件,得用某個事件的feed才能訂閱該事件
type Feed struct {
    once      sync.Once        // ensures that init only runs once
    sendLock  chan struct{}    // sendLock has a one-element buffer and is empty when held.It protects sendCases. 這個鎖確保了只有一個協(xié)程在使用go routine
    removeSub chan interface{} // interrupts Send
    sendCases caseList         // the active set of select cases used by Send,訂閱的channel列表,這些channel是活躍的

    // The inbox holds newly subscribed channels until they are added to sendCases.
    mu     sync.Mutex
    inbox  caseList // 不活躍的在這里
    etype  reflect.Type
    closed bool
}

訂閱事件:

// Subscribe adds a channel to the feed. Future sends will be delivered on the channel
// until the subscription is canceled. All channels added must have the same element type.
//
// The channel should have ample buffer space to avoid blocking other subscribers.
// Slow subscribers are not dropped.
// 訂閱者傳入接收事件的通道,feed將通道保存為case,然后返回給訂閱者訂閱對象
func (f *Feed) Subscribe(channel interface{}) Subscription {
    f.once.Do(f.init)

    // 通道和通道類型檢查
    chanval := reflect.ValueOf(channel)
    chantyp := chanval.Type()
    if chantyp.Kind() != reflect.Chan || chantyp.ChanDir()&reflect.SendDir == 0 {
        panic(errBadChannel)
    }
    sub := &feedSub{feed: f, channel: chanval, err: make(chan error, 1)}

    f.mu.Lock()
    defer f.mu.Unlock()
    if !f.typecheck(chantyp.Elem()) {
        panic(feedTypeError{op: "Subscribe", got: chantyp, want: reflect.ChanOf(reflect.SendDir, f.etype)})
    }
    
    // 把通道保存到case
    // Add the select case to the inbox.
    // The next Send will add it to f.sendCases.
    cas := reflect.SelectCase{Dir: reflect.SelectSend, Chan: chanval}
    f.inbox = append(f.inbox, cas)
    return sub
}

發(fā)送和傳遞事件:這個發(fā)送是比較繞一點的,要想真正掌握其中的運行,最好寫個小程序練習下。

// Send delivers to all subscribed channels simultaneously.
// It returns the number of subscribers that the value was sent to.
// 同時向所有的訂閱者發(fā)送事件,返回訂閱者的數(shù)量
func (f *Feed) Send(value interface{}) (nsent int) {
    rvalue := reflect.ValueOf(value)

    f.once.Do(f.init)
    <-f.sendLock // 獲取發(fā)送鎖

    // Add new cases from the inbox after taking the send lock.
    // 從inbox加入到sendCases,不能訂閱的時候直接加入到sendCases,因為可能其他協(xié)程在調(diào)用發(fā)送
    f.mu.Lock()
    f.sendCases = append(f.sendCases, f.inbox...)
    f.inbox = nil

    // 類型檢查:如果該feed不是要發(fā)送的值的類型,釋放鎖,并且執(zhí)行panic
    if !f.typecheck(rvalue.Type()) {
        f.sendLock <- struct{}{}
        panic(feedTypeError{op: "Send", got: rvalue.Type(), want: f.etype})
    }
    f.mu.Unlock()

    // Set the sent value on all channels.
    // 把發(fā)送的值關(guān)聯(lián)到每個case/channel,每一個事件都有一個feed,所以這里全是同一個事件的
    for i := firstSubSendCase; i < len(f.sendCases); i++ {
        f.sendCases[i].Send = rvalue
    }

    // Send until all channels except removeSub have been chosen. "cases" tracks a prefix
    // of sendCases. When a send succeeds, the corresponding case moves to the end of
    // "cases" and it shrinks by one element.
    // 所有case仍然保留在sendCases,只是用過的會移動到最后面
    cases := f.sendCases
    for {
        // Fast path: try sending without blocking before adding to the select set.
        // This should usually succeed if subscribers are fast enough and have free
        // buffer space.
        // 使用非阻塞式發(fā)送,如果不能發(fā)送就及時返回
        for i := firstSubSendCase; i < len(cases); i++ {
            // 如果發(fā)送成功,把這個case移動到末尾,所以i這個位置就是沒處理過的,然后大小減1
            if cases[i].Chan.TrySend(rvalue) {
                nsent++
                cases = cases.deactivate(i)
                i--
            }
        }

        // 如果這個地方成立,代表所有訂閱者都不阻塞,都發(fā)送完了
        if len(cases) == firstSubSendCase {
            break
        }

        // Select on all the receivers, waiting for them to unblock.
        // 返回一個可用的,直到不阻塞。
        chosen, recv, _ := reflect.Select(cases)
        if chosen == 0 /* <-f.removeSub */ {
            // 這個接收方要刪除了,刪除并縮小sendCases
            index := f.sendCases.find(recv.Interface())
            f.sendCases = f.sendCases.delete(index)
            if index >= 0 && index < len(cases) {
                // Shrink "cases" too because the removed case was still active.
                cases = f.sendCases[:len(cases)-1]
            }
        } else {
            // reflect已經(jīng)確保數(shù)據(jù)已經(jīng)發(fā)送,無需再嘗試發(fā)送
            cases = cases.deactivate(chosen)
            nsent++
        }
    }

    // 把sendCases中的send都標記為空
    // Forget about the sent value and hand off the send lock.
    for i := firstSubSendCase; i < len(f.sendCases); i++ {
        f.sendCases[i].Send = reflect.Value{}
    }
    f.sendLock <- struct{}{}
    return nsent
}

文章版權(quán)歸作者所有,未經(jīng)允許請勿轉(zhuǎn)載,若此文章存在違規(guī)行為,您可以聯(lián)系管理員刪除。

轉(zhuǎn)載請注明本文地址:http://systransis.cn/yun/24394.html

相關(guān)文章

  • 2018以太智能合約編程語言solidity的最佳IDEs

    摘要:使用基于以太坊的智能合約的集成開發(fā)環(huán)境。以太坊教程,主要介紹智能合約與應用開發(fā),適合入門。以太坊,主要是介紹使用進行智能合約開發(fā)交互,進行賬號創(chuàng)建交易轉(zhuǎn)賬代幣開發(fā)以及過濾器和事件等內(nèi)容。 Solidity是一種以智能合約為導向的編程語言。這是一種只有四年的年輕語言,旨在幫助開發(fā)基于以太坊數(shù)字貨幣的智能合約。 理解它官方文檔應該是學習Solidity的最佳來源:solidity.read...

    darkerXi 評論0 收藏0
  • 以太智能合約開發(fā)第二篇:理解以太相關(guān)概念

    摘要:原文發(fā)表于以太坊智能合約開發(fā)第二篇理解以太坊相關(guān)概念很多人都說比特幣是區(qū)塊鏈,以太坊是區(qū)塊鏈。它是以太坊智能合約的運行環(huán)境。是由以太坊節(jié)點提供。以太坊社區(qū)把基于智能合約的應用稱為去中心化的應用。 原文發(fā)表于:以太坊智能合約開發(fā)第二篇:理解以太坊相關(guān)概念 很多人都說比特幣是區(qū)塊鏈1.0,以太坊是區(qū)塊鏈2.0。在以太坊平臺上,可以開發(fā)各種各樣的去中心化應用,這些應用構(gòu)成了以太坊的整個生態(tài)...

    yibinnn 評論0 收藏0
  • 以太連載(五):以太社區(qū)、基金會、貢獻者介紹

    摘要:以太坊論壇大名鼎鼎的以太坊論壇將不再維護,可能很快就會停用。以太坊基金會以太坊基金會是在瑞士注冊的非營利性機構(gòu),旨在管理以太幣銷售中籌措的基金,以更好地為以太坊和去中心化技術(shù)生態(tài)系統(tǒng)服務。 社區(qū)發(fā)起討論和問問題,請明智選擇論壇,并協(xié)助我們維護論壇環(huán)境整潔。 Reddit以太坊reddit分論壇是最全面的以太坊論壇,這里是大部分社區(qū)討論發(fā)生的地方和核心開發(fā)者最活躍的地方。如果你想對新聞、...

    KoreyLee 評論0 收藏0
  • 智能合約實施指南

    摘要:在協(xié)議結(jié)束時,智能合約被視為已履行并仍存儲在區(qū)塊鏈網(wǎng)絡中。這組條件和事件代表了最基本的一次性智能合約。智能合約用例智能合約越來越受歡迎,并已在各種區(qū)塊鏈項目中實施。 與區(qū)塊鏈技術(shù)一樣,智能合約在商業(yè)領域也非常有價值。 為了讓我們的讀者徹底了解智能合約是什么以及它們?nèi)绾斡绊懍F(xiàn)代商業(yè)的交易方式,我們準備了本指南。 集中商業(yè)模式正在給去中心化的模式讓路 傳統(tǒng)的商業(yè)關(guān)系模型都是集中式的,始終存...

    meteor199 評論0 收藏0
  • 智能合約實施指南

    摘要:在協(xié)議結(jié)束時,智能合約被視為已履行并仍存儲在區(qū)塊鏈網(wǎng)絡中。這組條件和事件代表了最基本的一次性智能合約。智能合約用例智能合約越來越受歡迎,并已在各種區(qū)塊鏈項目中實施。 與區(qū)塊鏈技術(shù)一樣,智能合約在商業(yè)領域也非常有價值。 為了讓我們的讀者徹底了解智能合約是什么以及它們?nèi)绾斡绊懍F(xiàn)代商業(yè)的交易方式,我們準備了本指南。 集中商業(yè)模式正在給去中心化的模式讓路 傳統(tǒng)的商業(yè)關(guān)系模型都是集中式的,始終存...

    PumpkinDylan 評論0 收藏0

發(fā)表評論

0條評論

最新活動
閱讀需要支付1元查看
<