摘要:但是也存在諸多的問題,隨著新設備的出現(xiàn)以及對安全的重視,這些缺點越發(fā)顯得突出,例如日志消息內容無法驗證數(shù)據(jù)格式松散日志檢索低效有限的元數(shù)據(jù)保存無法記錄二進制數(shù)據(jù)等。該服務可以為項目增加一定數(shù)量的元數(shù)據(jù)。
前言
對于日志系統(tǒng)的重要性不言而喻,參照滬江的一篇關于日志系統(tǒng)的介紹,基本上日志數(shù)據(jù)在以下幾方面具有非常重要的作用:
數(shù)據(jù)查找:通過檢索日志信息,定位相應的 bug ,找出解決方案
服務診斷:通過對日志信息進行統(tǒng)計、分析,了解服務器的負荷和服務運行狀態(tài)
數(shù)據(jù)分析:可以做進一步的數(shù)據(jù)分析,比如根據(jù)請求中的課程 id ,找出 TOP10 用戶感興趣課程
日志+大數(shù)據(jù)+AI的確有很多想象空間。
而對于收集系統(tǒng),流行的技術stack有之前的elk,到現(xiàn)在的efk。logstash換成了filebeat。當然日志收集agent,也有flume和fluentd,尤其fluentd屬于cncf組織的產(chǎn)品,在k8s中有著廣泛的應用。但是fluentd是ruby寫的,不利于深入源碼了解。當然今天我們重點講的是另外一個agent--journalbeat。望文生義,隸屬于efk stack 中beats系列中的一員,專門用于收集journald日志。
長久以來 syslog 是每一個 Unix 系統(tǒng)中的重要部件。在漫長的歷史中在各種 Linux 發(fā)行版中都有不同的實現(xiàn)去完成類似的工作,它們采取的是邏輯相近,并使用基本相同的文件格式。但是 syslog 也存在諸多的問題,隨著新設備的出現(xiàn)以及對安全的重視,這些缺點越發(fā)顯得突出,例如日志消息內容無法驗證、數(shù)據(jù)格式松散、日志檢索低效、有限的元數(shù)據(jù)保存、無法記錄二進制數(shù)據(jù)等。
Journald是針對以上需求的解決方案。受udev事件啟發(fā),Journal 條目與環(huán)境組塊相似。一個鍵值域,按照換行符分開,使用大寫的變量名。除了支持ASCII 格式的字符串外,還能夠支持二進制數(shù)據(jù),如 ATA SMART 健康信息、SCSI 數(shù)據(jù)。應用程序和服務可以通過將項目域傳遞給systemd journald服務來生成項目。該服務可以為項目增加一定數(shù)量的元數(shù)據(jù)。這些受信任域的值由 Journal 服務來決定且無法由客戶端來偽造。在Journald中,可以把日志數(shù)據(jù)導出,在異地讀取,并不受處理器架構的影響。這對嵌入式設備是很有用的功能,方便維護人員分析設備運行狀況。
大致總結就是
journald日志是新的linux系統(tǒng)的具備的
journald區(qū)別于傳統(tǒng)的文件存儲方式,是二進制存儲。需要用journalctl查看。
docker對于journald的支持The journald logging driver sends container logs to the systemd journal. Log entries can be retrieved using the journalctl command, through use of the journal API, or using the docker logs command.
即docker除了json等日志格式,已經(jīng)增加了journald驅動。
我們的k8s集群,所有的docker輸出的日志格式都采用journald,這樣主機centos系統(tǒng)日志和docker的日志都用journalbeat來收集。
journalbeat實現(xiàn)關鍵journalbeat整個實現(xiàn)過程,基本上兩點:
與其他社區(qū)貢獻的beats系列,比如packetbeat,mysqlbeat類似,遵循了beats的框架和約定,journalbeat實現(xiàn)了run和stop等方法即可,然后作為一個客戶端,將收集到的數(shù)據(jù),publish到beats中。
讀取journald日志,采用了coreos開源的go-systemd庫中sdjournal部分。其實sdjournal是一個利用cgo 對于journald日志c接口的封裝。
源碼解讀程序入口:
package main import ( "log" "github.com/elastic/beats/libbeat/beat" "github.com/mheese/journalbeat/beater" ) func main() { err := beat.Run("journalbeat", "", beater.New) if err != nil { log.Fatal(err) } }
整個journalbeat共實現(xiàn)了3個方法即可。run,stop,和new。
run和stop顧名思義,就是beats控制journalbeat的運行和停止。
而new:
需要按照
// Creator initializes and configures a new Beater instance used to execute // the beat its run-loop. type Creator func(*Beat, *common.Config) (Beater, error)
實現(xiàn)Creator方法,返回的Beater實例,交由beats控制。
具體實現(xiàn):
// New creates beater func New(b *beat.Beat, cfg *common.Config) (beat.Beater, error) { config := config.DefaultConfig var err error if err = cfg.Unpack(&config); err != nil { return nil, fmt.Errorf("Error reading config file: %v", err) } jb := &Journalbeat{ config: config, done: make(chan struct{}), cursorChan: make(chan string), pending: make(chan *eventReference), completed: make(chan *eventReference, config.PendingQueue.CompletedQueueSize), } if err = jb.initJournal(); err != nil { logp.Err("Failed to connect to the Systemd Journal: %v", err) return nil, err } jb.client = b.Publisher.Connect() return jb, nil }
一般的beats中,都會有一些共同屬性。例如下面的done和client屬性。
// Journalbeat is the main Journalbeat struct type Journalbeat struct { done chan struct{} config config.Config client publisher.Client journal *sdjournal.Journal cursorChan chan string pending, completed chan *eventReference wg sync.WaitGroup }
done是一個控制整個beater啟停的信號量。
而client 是與beats平臺通信的client。注意在初始化的時候,
jb.client = b.Publisher.Connect()
建立鏈接。
然后在收集到數(shù)據(jù),發(fā)送的時候,也是通過該client
select { case <-jb.done: return nil default: // we need to clone to avoid races since map is a pointer... jb.client.PublishEvent(ref.body.Clone(), publisher.Signal(&eventSignal{ref, jb.completed}), publisher.Guaranteed) }
注意上邊的發(fā)送姿勢和對于剛才提到的done信號量使用。
其他方法都是業(yè)務相關不再詳細解讀了。
關于這點,個人感覺是最優(yōu)雅的部分
所有發(fā)送失敗的日志是會在程序結束之前以json格式保存到文件,完成持久化。// on exit fully consume both queues and flush to disk the pending queue defer func() { var wg sync.WaitGroup wg.Add(2) go func() { defer wg.Done() for evRef := range jb.pending { pending[evRef.cursor] = evRef.body } }() go func() { defer wg.Done() for evRef := range jb.completed { completed[evRef.cursor] = evRef.body } }() wg.Wait() logp.Info("Saving the pending queue, consists of %d messages", len(diff(pending, completed))) if err := flush(diff(pending, completed), jb.config.PendingQueue.File); err != nil { logp.Err("error writing pending queue %s: %s", jb.config.PendingQueue.File, err) } }()程序啟動以后首先會讀取之前持久化的發(fā)送失敗的日志,重新發(fā)送
// load the previously saved queue of unsent events and try to publish them if any if err := jb.publishPending(); err != nil { logp.Warn("could not read the pending queue: %s", err) }client publish收集到的日志到beats,設置了publisher.Guaranteed模式,成功和失敗都有反饋
jb.client.PublishEvent(ref.body.Clone(), publisher.Signal(&eventSignal{ref, jb.completed}), publisher.Guaranteed)
其中publisher.Signal(&eventSignal{ref, jb.completed})類似于一個回調,凡是成功的都會寫成功的ref到jb.completed中。方便客戶端控制。
維護了兩個chan,一個存放客戶端發(fā)送的日志,一個存放服務端接受成功的日志,精確對比,可獲取發(fā)送失敗的日志,進入重發(fā)動作journalbeat struct中有下面兩個屬性
pending, completed chan *eventReference
每次客戶端發(fā)送一條日志,都會寫到pending。
case publishedChan <- jb.client.PublishEvent(event, publisher.Signal(&eventSignal{ref, jb.completed}), publisher.Guaranteed): if published := <-publishedChan; published { jb.pending <- ref // save cursor if jb.config.WriteCursorState { jb.cursorChan <- rawEvent.Cursor } } }
publisher.Signal(&eventSignal{ref, jb.completed}),回調會將成功的寫到completed。
整個程序同時會啟動一個
go jb.managePendingQueueLoop()
協(xié)程,專門用來定時重發(fā)失敗日志。
// managePendingQueueLoop runs the loop which manages the set of events waiting to be acked func (jb *Journalbeat) managePendingQueueLoop() { jb.wg.Add(1) defer jb.wg.Done() pending := map[string]common.MapStr{} completed := map[string]common.MapStr{} // diff returns the difference between this map and the other. diff := func(this, other map[string]common.MapStr) map[string]common.MapStr { result := map[string]common.MapStr{} for k, v := range this { if _, ok := other[k]; !ok { result[k] = v } } return result } // flush saves the map[string]common.MapStr to the JSON file on disk flush := func(source map[string]common.MapStr, dest string) error { tempFile, err := ioutil.TempFile(filepath.Dir(dest), fmt.Sprintf(".%s", filepath.Base(dest))) if err != nil { return err } if err = json.NewEncoder(tempFile).Encode(source); err != nil { _ = tempFile.Close() return err } _ = tempFile.Close() return os.Rename(tempFile.Name(), dest) } // on exit fully consume both queues and flush to disk the pending queue defer func() { var wg sync.WaitGroup wg.Add(2) go func() { defer wg.Done() for evRef := range jb.pending { pending[evRef.cursor] = evRef.body } }() go func() { defer wg.Done() for evRef := range jb.completed { completed[evRef.cursor] = evRef.body } }() wg.Wait() logp.Info("Saving the pending queue, consists of %d messages", len(diff(pending, completed))) if err := flush(diff(pending, completed), jb.config.PendingQueue.File); err != nil { logp.Err("error writing pending queue %s: %s", jb.config.PendingQueue.File, err) } }() // flush the pending queue to disk periodically tick := time.Tick(jb.config.PendingQueue.FlushPeriod) for { select { case <-jb.done: return case p, ok := <-jb.pending: if ok { pending[p.cursor] = p.body } case c, ok := <-jb.completed: if ok { completed[c.cursor] = c.body } case <-tick: result := diff(pending, completed) if err := flush(result, jb.config.PendingQueue.File); err != nil { logp.Err("error writing %s: %s", jb.config.PendingQueue.File, err) } pending = result completed = map[string]common.MapStr{} } } }總結
當然還有一些其他的細節(jié),不再一一講述了。比如定時寫Cursor的功能和日志格式轉換等。具體的大家可以看源碼。主要是講了我認為其優(yōu)雅的部分和為beats編寫beater的要點。
文章版權歸作者所有,未經(jīng)允許請勿轉載,若此文章存在違規(guī)行為,您可以聯(lián)系管理員刪除。
轉載請注明本文地址:http://systransis.cn/yun/32627.html
摘要:但是也存在諸多的問題,隨著新設備的出現(xiàn)以及對安全的重視,這些缺點越發(fā)顯得突出,例如日志消息內容無法驗證數(shù)據(jù)格式松散日志檢索低效有限的元數(shù)據(jù)保存無法記錄二進制數(shù)據(jù)等。該服務可以為項目增加一定數(shù)量的元數(shù)據(jù)。 前言 對于日志系統(tǒng)的重要性不言而喻,參照滬江的一篇關于日志系統(tǒng)的介紹,基本上日志數(shù)據(jù)在以下幾方面具有非常重要的作用: 數(shù)據(jù)查找:通過檢索日志信息,定位相應的 bug ,找出解決方案 ...
摘要:正在走遠,新年之初,小數(shù)精選過去一年閱讀量居高的技術干貨,從容器到微服務云原生,匯集成篇精華集錦,充分反映了這一年的技術熱點走向。此文值得收藏,方便隨時搜索和查看。,小數(shù)將繼續(xù)陪伴大家,為朋友們奉獻更有逼格的技術內容。 2017正在走遠,新年之初,小數(shù)精選過去一年閱讀量居高的技術干貨,從容器、K8S 到微服務、云原生、Service Mesh,匯集成52篇精華集錦,充分反映了這一年的技...
摘要:今天主要針對版本進行源碼分析。外部接口的定義如下創(chuàng)建子網(wǎng)管理器負責子網(wǎng)的創(chuàng)建更新添加刪除監(jiān)聽等,主要和打交道定義續(xù)約。在到期之前,子網(wǎng)管理器調用該方法進行續(xù)約。 前言 之前在k8s與網(wǎng)絡--Flannel解讀一文中,我們主要講了Flannel整體的工作原理。今天主要針對Flannel v0.10.0版本進行源碼分析。首先需要理解三個比較重要的概念: 網(wǎng)絡(Network):整個集群中...
摘要:今天主要針對版本進行源碼分析。外部接口的定義如下創(chuàng)建子網(wǎng)管理器負責子網(wǎng)的創(chuàng)建更新添加刪除監(jiān)聽等,主要和打交道定義續(xù)約。在到期之前,子網(wǎng)管理器調用該方法進行續(xù)約。 前言 之前在k8s與網(wǎng)絡--Flannel解讀一文中,我們主要講了Flannel整體的工作原理。今天主要針對Flannel v0.10.0版本進行源碼分析。首先需要理解三個比較重要的概念: 網(wǎng)絡(Network):整個集群中...
摘要:今天主要針對版本進行源碼分析。外部接口的定義如下創(chuàng)建子網(wǎng)管理器負責子網(wǎng)的創(chuàng)建更新添加刪除監(jiān)聽等,主要和打交道定義續(xù)約。在到期之前,子網(wǎng)管理器調用該方法進行續(xù)約。 前言 之前在k8s與網(wǎng)絡--Flannel解讀一文中,我們主要講了Flannel整體的工作原理。今天主要針對Flannel v0.10.0版本進行源碼分析。首先需要理解三個比較重要的概念: 網(wǎng)絡(Network):整個集群中...
閱讀 1191·2023-04-26 02:38
閱讀 1483·2021-11-22 09:34
閱讀 1191·2021-09-26 10:19
閱讀 3180·2019-08-29 17:15
閱讀 3532·2019-08-29 12:27
閱讀 1722·2019-08-26 13:51
閱讀 1869·2019-08-26 13:47
閱讀 1020·2019-08-26 12:20