成人国产在线小视频_日韩寡妇人妻调教在线播放_色成人www永久在线观看_2018国产精品久久_亚洲欧美高清在线30p_亚洲少妇综合一区_黄色在线播放国产_亚洲另类技巧小说校园_国产主播xx日韩_a级毛片在线免费

資訊專欄INFORMATION COLUMN

k8s與日志--journalbeat源碼解讀

Amio / 1671人閱讀

摘要:但是也存在諸多的問題,隨著新設(shè)備的出現(xiàn)以及對安全的重視,這些缺點(diǎn)越發(fā)顯得突出,例如日志消息內(nèi)容無法驗(yàn)證數(shù)據(jù)格式松散日志檢索低效有限的元數(shù)據(jù)保存無法記錄二進(jìn)制數(shù)據(jù)等。該服務(wù)可以為項(xiàng)目增加一定數(shù)量的元數(shù)據(jù)。

前言

對于日志系統(tǒng)的重要性不言而喻,參照滬江的一篇關(guān)于日志系統(tǒng)的介紹,基本上日志數(shù)據(jù)在以下幾方面具有非常重要的作用:

數(shù)據(jù)查找:通過檢索日志信息,定位相應(yīng)的 bug ,找出解決方案

服務(wù)診斷:通過對日志信息進(jìn)行統(tǒng)計(jì)、分析,了解服務(wù)器的負(fù)荷和服務(wù)運(yùn)行狀態(tài)

數(shù)據(jù)分析:可以做進(jìn)一步的數(shù)據(jù)分析,比如根據(jù)請求中的課程 id ,找出 TOP10 用戶感興趣課程

日志+大數(shù)據(jù)+AI的確有很多想象空間。
而對于收集系統(tǒng),流行的技術(shù)stack有之前的elk,到現(xiàn)在的efk。logstash換成了filebeat。當(dāng)然日志收集agent,也有flume和fluentd,尤其fluentd屬于cncf組織的產(chǎn)品,在k8s中有著廣泛的應(yīng)用。但是fluentd是ruby寫的,不利于深入源碼了解。當(dāng)然今天我們重點(diǎn)講的是另外一個(gè)agent--journalbeat。望文生義,隸屬于efk stack 中beats系列中的一員,專門用于收集journald日志。

journalbeat源碼解讀 journald日志簡介

長久以來 syslog 是每一個(gè) Unix 系統(tǒng)中的重要部件。在漫長的歷史中在各種 Linux 發(fā)行版中都有不同的實(shí)現(xiàn)去完成類似的工作,它們采取的是邏輯相近,并使用基本相同的文件格式。但是 syslog 也存在諸多的問題,隨著新設(shè)備的出現(xiàn)以及對安全的重視,這些缺點(diǎn)越發(fā)顯得突出,例如日志消息內(nèi)容無法驗(yàn)證、數(shù)據(jù)格式松散、日志檢索低效、有限的元數(shù)據(jù)保存、無法記錄二進(jìn)制數(shù)據(jù)等。
Journald是針對以上需求的解決方案。受udev事件啟發(fā),Journal 條目與環(huán)境組塊相似。一個(gè)鍵值域,按照換行符分開,使用大寫的變量名。除了支持ASCII 格式的字符串外,還能夠支持二進(jìn)制數(shù)據(jù),如 ATA SMART 健康信息、SCSI 數(shù)據(jù)。應(yīng)用程序和服務(wù)可以通過將項(xiàng)目域傳遞給systemd journald服務(wù)來生成項(xiàng)目。該服務(wù)可以為項(xiàng)目增加一定數(shù)量的元數(shù)據(jù)。這些受信任域的值由 Journal 服務(wù)來決定且無法由客戶端來偽造。在Journald中,可以把日志數(shù)據(jù)導(dǎo)出,在異地讀取,并不受處理器架構(gòu)的影響。這對嵌入式設(shè)備是很有用的功能,方便維護(hù)人員分析設(shè)備運(yùn)行狀況。
大致總結(jié)就是

journald日志是新的linux系統(tǒng)的具備的

journald區(qū)別于傳統(tǒng)的文件存儲(chǔ)方式,是二進(jìn)制存儲(chǔ)。需要用journalctl查看。

docker對于journald的支持

The journald logging driver sends container logs to the systemd journal. Log entries can be retrieved using the journalctl command, through use of the journal API, or using the docker logs command.
即docker除了json等日志格式,已經(jīng)增加了journald驅(qū)動(dòng)。

目前本司使用場景

我們的k8s集群,所有的docker輸出的日志格式都采用journald,這樣主機(jī)centos系統(tǒng)日志和docker的日志都用journalbeat來收集。

journalbeat實(shí)現(xiàn)關(guān)鍵

journalbeat整個(gè)實(shí)現(xiàn)過程,基本上兩點(diǎn):

與其他社區(qū)貢獻(xiàn)的beats系列,比如packetbeat,mysqlbeat類似,遵循了beats的框架和約定,journalbeat實(shí)現(xiàn)了run和stop等方法即可,然后作為一個(gè)客戶端,將收集到的數(shù)據(jù),publish到beats中。

讀取journald日志,采用了coreos開源的go-systemd庫中sdjournal部分。其實(shí)sdjournal是一個(gè)利用cgo 對于journald日志c接口的封裝。

源碼解讀

程序入口:

package main

import (
    "log"

    "github.com/elastic/beats/libbeat/beat"
    "github.com/mheese/journalbeat/beater"
)

func main() {
    err := beat.Run("journalbeat", "", beater.New)
    if err != nil {
        log.Fatal(err)
    }
}

整個(gè)journalbeat共實(shí)現(xiàn)了3個(gè)方法即可。run,stop,和new。
run和stop顧名思義,就是beats控制journalbeat的運(yùn)行和停止。
而new:
需要按照

// Creator initializes and configures a new Beater instance used to execute
// the beat its run-loop.
type Creator func(*Beat, *common.Config) (Beater, error)

實(shí)現(xiàn)Creator方法,返回的Beater實(shí)例,交由beats控制。
具體實(shí)現(xiàn):

// New creates beater
func New(b *beat.Beat, cfg *common.Config) (beat.Beater, error) {
    config := config.DefaultConfig
    var err error
    if err = cfg.Unpack(&config); err != nil {
        return nil, fmt.Errorf("Error reading config file: %v", err)
    }

    jb := &Journalbeat{
        config:     config,
        done:       make(chan struct{}),
        cursorChan: make(chan string),
        pending:    make(chan *eventReference),
        completed:  make(chan *eventReference, config.PendingQueue.CompletedQueueSize),
    }

    if err = jb.initJournal(); err != nil {
        logp.Err("Failed to connect to the Systemd Journal: %v", err)
        return nil, err
    }

    jb.client = b.Publisher.Connect()
    return jb, nil
}

一般的beats中,都會(huì)有一些共同屬性。例如下面的done和client屬性。

// Journalbeat is the main Journalbeat struct
type Journalbeat struct {
    done   chan struct{}
    config config.Config
    client publisher.Client

    journal *sdjournal.Journal

    cursorChan         chan string
    pending, completed chan *eventReference
    wg                 sync.WaitGroup
}

done是一個(gè)控制整個(gè)beater啟停的信號(hào)量。
而client 是與beats平臺(tái)通信的client。注意在初始化的時(shí)候,

jb.client = b.Publisher.Connect()

建立鏈接。
然后在收集到數(shù)據(jù),發(fā)送的時(shí)候,也是通過該client

select {
        case <-jb.done:
            return nil
        default:
            // we need to clone to avoid races since map is a pointer...
            jb.client.PublishEvent(ref.body.Clone(), publisher.Signal(&eventSignal{ref, jb.completed}), publisher.Guaranteed)
        }

注意上邊的發(fā)送姿勢和對于剛才提到的done信號(hào)量使用。
其他方法都是業(yè)務(wù)相關(guān)不再詳細(xì)解讀了。

journalbeat如何保證發(fā)送失敗的日志重新發(fā)送

關(guān)于這點(diǎn),個(gè)人感覺是最優(yōu)雅的部分

所有發(fā)送失敗的日志是會(huì)在程序結(jié)束之前以json格式保存到文件,完成持久化。
    // on exit fully consume both queues and flush to disk the pending queue
    defer func() {
        var wg sync.WaitGroup
        wg.Add(2)

        go func() {
            defer wg.Done()
            for evRef := range jb.pending {
                pending[evRef.cursor] = evRef.body
            }
        }()

        go func() {
            defer wg.Done()
            for evRef := range jb.completed {
                completed[evRef.cursor] = evRef.body
            }
        }()
        wg.Wait()

        logp.Info("Saving the pending queue, consists of %d messages", len(diff(pending, completed)))
        if err := flush(diff(pending, completed), jb.config.PendingQueue.File); err != nil {
            logp.Err("error writing pending queue %s: %s", jb.config.PendingQueue.File, err)
        }
    }()
程序啟動(dòng)以后首先會(huì)讀取之前持久化的發(fā)送失敗的日志,重新發(fā)送
// load the previously saved queue of unsent events and try to publish them if any
    if err := jb.publishPending(); err != nil {
        logp.Warn("could not read the pending queue: %s", err)
    }
client publish收集到的日志到beats,設(shè)置了publisher.Guaranteed模式,成功和失敗都有反饋
jb.client.PublishEvent(ref.body.Clone(), publisher.Signal(&eventSignal{ref, jb.completed}), publisher.Guaranteed)

其中publisher.Signal(&eventSignal{ref, jb.completed})類似于一個(gè)回調(diào),凡是成功的都會(huì)寫成功的ref到j(luò)b.completed中。方便客戶端控制。

維護(hù)了兩個(gè)chan,一個(gè)存放客戶端發(fā)送的日志,一個(gè)存放服務(wù)端接受成功的日志,精確對比,可獲取發(fā)送失敗的日志,進(jìn)入重發(fā)動(dòng)作

journalbeat struct中有下面兩個(gè)屬性

    pending, completed chan *eventReference

每次客戶端發(fā)送一條日志,都會(huì)寫到pending。

case publishedChan <- jb.client.PublishEvent(event, publisher.Signal(&eventSignal{ref, jb.completed}), publisher.Guaranteed):
            if published := <-publishedChan; published {
                jb.pending <- ref

                // save cursor
                if jb.config.WriteCursorState {
                    jb.cursorChan <- rawEvent.Cursor
                }
            }
        }

publisher.Signal(&eventSignal{ref, jb.completed}),回調(diào)會(huì)將成功的寫到completed。
整個(gè)程序同時(shí)會(huì)啟動(dòng)一個(gè)
go jb.managePendingQueueLoop()
協(xié)程,專門用來定時(shí)重發(fā)失敗日志。

// managePendingQueueLoop runs the loop which manages the set of events waiting to be acked
func (jb *Journalbeat) managePendingQueueLoop() {
    jb.wg.Add(1)
    defer jb.wg.Done()
    pending := map[string]common.MapStr{}
    completed := map[string]common.MapStr{}

    // diff returns the difference between this map and the other.
    diff := func(this, other map[string]common.MapStr) map[string]common.MapStr {
        result := map[string]common.MapStr{}
        for k, v := range this {
            if _, ok := other[k]; !ok {
                result[k] = v
            }
        }
        return result
    }

    // flush saves the map[string]common.MapStr to the JSON file on disk
    flush := func(source map[string]common.MapStr, dest string) error {
        tempFile, err := ioutil.TempFile(filepath.Dir(dest), fmt.Sprintf(".%s", filepath.Base(dest)))
        if err != nil {
            return err
        }

        if err = json.NewEncoder(tempFile).Encode(source); err != nil {
            _ = tempFile.Close()
            return err
        }

        _ = tempFile.Close()
        return os.Rename(tempFile.Name(), dest)
    }

    // on exit fully consume both queues and flush to disk the pending queue
    defer func() {
        var wg sync.WaitGroup
        wg.Add(2)

        go func() {
            defer wg.Done()
            for evRef := range jb.pending {
                pending[evRef.cursor] = evRef.body
            }
        }()

        go func() {
            defer wg.Done()
            for evRef := range jb.completed {
                completed[evRef.cursor] = evRef.body
            }
        }()
        wg.Wait()

        logp.Info("Saving the pending queue, consists of %d messages", len(diff(pending, completed)))
        if err := flush(diff(pending, completed), jb.config.PendingQueue.File); err != nil {
            logp.Err("error writing pending queue %s: %s", jb.config.PendingQueue.File, err)
        }
    }()

    // flush the pending queue to disk periodically
    tick := time.Tick(jb.config.PendingQueue.FlushPeriod)
    for {
        select {
        case <-jb.done:
            return
        case p, ok := <-jb.pending:
            if ok {
                pending[p.cursor] = p.body
            }
        case c, ok := <-jb.completed:
            if ok {
                completed[c.cursor] = c.body
            }
        case <-tick:
            result := diff(pending, completed)
            if err := flush(result, jb.config.PendingQueue.File); err != nil {
                logp.Err("error writing %s: %s", jb.config.PendingQueue.File, err)
            }
            pending = result
            completed = map[string]common.MapStr{}
        }
    }
}
總結(jié)

當(dāng)然還有一些其他的細(xì)節(jié),不再一一講述了。比如定時(shí)寫Cursor的功能和日志格式轉(zhuǎn)換等。具體的大家可以看源碼。主要是講了我認(rèn)為其優(yōu)雅的部分和為beats編寫beater的要點(diǎn)。

文章版權(quán)歸作者所有,未經(jīng)允許請勿轉(zhuǎn)載,若此文章存在違規(guī)行為,您可以聯(lián)系管理員刪除。

轉(zhuǎn)載請注明本文地址:http://systransis.cn/yun/33029.html

相關(guān)文章

  • k8s日志--journalbeat源碼解讀

    摘要:但是也存在諸多的問題,隨著新設(shè)備的出現(xiàn)以及對安全的重視,這些缺點(diǎn)越發(fā)顯得突出,例如日志消息內(nèi)容無法驗(yàn)證數(shù)據(jù)格式松散日志檢索低效有限的元數(shù)據(jù)保存無法記錄二進(jìn)制數(shù)據(jù)等。該服務(wù)可以為項(xiàng)目增加一定數(shù)量的元數(shù)據(jù)。 前言 對于日志系統(tǒng)的重要性不言而喻,參照滬江的一篇關(guān)于日志系統(tǒng)的介紹,基本上日志數(shù)據(jù)在以下幾方面具有非常重要的作用: 數(shù)據(jù)查找:通過檢索日志信息,定位相應(yīng)的 bug ,找出解決方案 ...

    jemygraw 評論0 收藏0
  • 快收藏!52篇25萬字,微服務(wù)、云原生、容器、K8S、Serverless精華文章集錦

    摘要:正在走遠(yuǎn),新年之初,小數(shù)精選過去一年閱讀量居高的技術(shù)干貨,從容器到微服務(wù)云原生,匯集成篇精華集錦,充分反映了這一年的技術(shù)熱點(diǎn)走向。此文值得收藏,方便隨時(shí)搜索和查看。,小數(shù)將繼續(xù)陪伴大家,為朋友們奉獻(xiàn)更有逼格的技術(shù)內(nèi)容。 2017正在走遠(yuǎn),新年之初,小數(shù)精選過去一年閱讀量居高的技術(shù)干貨,從容器、K8S 到微服務(wù)、云原生、Service Mesh,匯集成52篇精華集錦,充分反映了這一年的技...

    AaronYuan 評論0 收藏0
  • k8s網(wǎng)絡(luò)--Flannel源碼分析

    摘要:今天主要針對版本進(jìn)行源碼分析。外部接口的定義如下創(chuàng)建子網(wǎng)管理器負(fù)責(zé)子網(wǎng)的創(chuàng)建更新添加刪除監(jiān)聽等,主要和打交道定義續(xù)約。在到期之前,子網(wǎng)管理器調(diào)用該方法進(jìn)行續(xù)約。 前言 之前在k8s與網(wǎng)絡(luò)--Flannel解讀一文中,我們主要講了Flannel整體的工作原理。今天主要針對Flannel v0.10.0版本進(jìn)行源碼分析。首先需要理解三個(gè)比較重要的概念: 網(wǎng)絡(luò)(Network):整個(gè)集群中...

    wpw 評論0 收藏0
  • k8s網(wǎng)絡(luò)--Flannel源碼分析

    摘要:今天主要針對版本進(jìn)行源碼分析。外部接口的定義如下創(chuàng)建子網(wǎng)管理器負(fù)責(zé)子網(wǎng)的創(chuàng)建更新添加刪除監(jiān)聽等,主要和打交道定義續(xù)約。在到期之前,子網(wǎng)管理器調(diào)用該方法進(jìn)行續(xù)約。 前言 之前在k8s與網(wǎng)絡(luò)--Flannel解讀一文中,我們主要講了Flannel整體的工作原理。今天主要針對Flannel v0.10.0版本進(jìn)行源碼分析。首先需要理解三個(gè)比較重要的概念: 網(wǎng)絡(luò)(Network):整個(gè)集群中...

    hoohack 評論0 收藏0
  • k8s網(wǎng)絡(luò)--Flannel源碼分析

    摘要:今天主要針對版本進(jìn)行源碼分析。外部接口的定義如下創(chuàng)建子網(wǎng)管理器負(fù)責(zé)子網(wǎng)的創(chuàng)建更新添加刪除監(jiān)聽等,主要和打交道定義續(xù)約。在到期之前,子網(wǎng)管理器調(diào)用該方法進(jìn)行續(xù)約。 前言 之前在k8s與網(wǎng)絡(luò)--Flannel解讀一文中,我們主要講了Flannel整體的工作原理。今天主要針對Flannel v0.10.0版本進(jìn)行源碼分析。首先需要理解三個(gè)比較重要的概念: 網(wǎng)絡(luò)(Network):整個(gè)集群中...

    Jeffrrey 評論0 收藏0

發(fā)表評論

0條評論

最新活動(dòng)
閱讀需要支付1元查看
<