摘要:改造的實(shí)現(xiàn)前言最近在使用的場(chǎng)景中,要求數(shù)據(jù)在程序意外終止的時(shí)候不丟失。按照最初的原始實(shí)現(xiàn)在內(nèi)部維護(hù)了兩個(gè),分別是和?;趯?shí)現(xiàn)的持久化在持久化機(jī)制的選型中,優(yōu)先實(shí)現(xiàn)。結(jié)語(yǔ)改造以后,可以根據(jù)自己的需求通過(guò)配置文件來(lái)決定使用或是來(lái)實(shí)現(xiàn)。
改造telegraf的buffer實(shí)現(xiàn) 前言
最近在使用telegraf的場(chǎng)景中,要求數(shù)據(jù)在程序意外終止的時(shí)候不丟失。按照telegraf最初的原始實(shí)現(xiàn),在running_output內(nèi)部維護(hù)了兩個(gè)buffer,分別是metrics和failMetrics。這兩個(gè)buffer是基于go中channel實(shí)現(xiàn)的。由于沒(méi)有持久化機(jī)制,在意外退出的時(shí)候,存在丟失數(shù)據(jù)的風(fēng)險(xiǎn)。所以這篇文章主要講述之前telegraf保證數(shù)據(jù)安全的一些措施和我們對(duì)代碼的一些優(yōu)化。
telegraf關(guān)于數(shù)據(jù)安全的處理辦法關(guān)于兩個(gè)buffer,定義在running_output.go的struct中。
// RunningOutput contains the output configuration type RunningOutput struct { Name string Output telegraf.Output Config *OutputConfig MetricBufferLimit int MetricBatchSize int MetricsFiltered selfstat.Stat MetricsWritten selfstat.Stat BufferSize selfstat.Stat BufferLimit selfstat.Stat WriteTime selfstat.Stat metrics *buffer.Buffer failMetrics *buffer.Buffer // Guards against concurrent calls to the Output as described in #3009 sync.Mutex }
這個(gè)兩個(gè)buffer的大小提供了配置參數(shù)可以設(shè)置。
metrics: buffer.NewBuffer(batchSize), failMetrics: buffer.NewBuffer(bufferLimit),
顧名思義。metrics存放要發(fā)送到指定output的metric,而failMetrics存放發(fā)送失敗的metric。當(dāng)然失敗的metrics會(huì)在telegraf重發(fā)機(jī)制下再次發(fā)送。
if ro.metrics.Len() == ro.MetricBatchSize { batch := ro.metrics.Batch(ro.MetricBatchSize) err := ro.write(batch) if err != nil { ro.failMetrics.Add(batch...) } }
在向metrics增加metrics的時(shí)候,做是否達(dá)到批量發(fā)送的數(shù)量,如果達(dá)到就調(diào)用發(fā)送方法。當(dāng)然還有定時(shí)的解決方案,如果一直沒(méi)有達(dá)到MetricBatchSize,也會(huì)在一定時(shí)間后發(fā)送數(shù)據(jù)。具體實(shí)現(xiàn)代碼在agent.go中
ticker := time.NewTicker(a.Config.Agent.FlushInterval.Duration) semaphore := make(chan struct{}, 1) for { select { case <-shutdown: log.Println("I! Hang on, flushing any cached metrics before shutdown") // wait for outMetricC to get flushed before flushing outputs wg.Wait() a.flush() return nil case <-ticker.C: go func() { select { case semaphore <- struct{}{}: internal.RandomSleep(a.Config.Agent.FlushJitter.Duration, shutdown) a.flush() <-semaphore default: // skipping this flush because one is already happening log.Println("W! Skipping a scheduled flush because there is" + " already a flush ongoing.") } }()
在程序接受到停止信號(hào)后,程序會(huì)首先f(wàn)lush剩下的數(shù)據(jù)到output中,然后退出進(jìn)程。這樣可以保證一定的數(shù)據(jù)安全。
基于redis實(shí)現(xiàn)buffer的持久化在持久化機(jī)制的選型中,優(yōu)先實(shí)現(xiàn)redis。本身redis性能高,而且具備完善的持久化。
具體的實(shí)現(xiàn)架構(gòu)如下:
將原buffer中功能抽象出buffer.go接口。
具體代碼:
package buffer import ( "github.com/influxdata/telegraf" "github.com/influxdata/telegraf/internal/buffer/memory" "github.com/influxdata/telegraf/internal/buffer/redis" ) const ( BufferTypeForMemory = "memory" BufferTypeForRedis = "redis" ) type Buffer interface { IsEmpty() bool Len() int Add(metrics ...telegraf.Metric) Batch(batchSize int) []telegraf.Metric } func NewBuffer(mod string, size int, key, addr string) Buffer { switch mod { case BufferTypeForRedis: return redis.NewBuffer(size, key, addr) default: return memory.NewBuffer(size) } }
然后分別內(nèi)存和redis實(shí)現(xiàn)了Buffer接口。
其中NewBuffer相當(dāng)于一個(gè)工廠方法。
當(dāng)然在后期可以實(shí)現(xiàn)基于file和db等buffer實(shí)現(xiàn),來(lái)滿足不同的場(chǎng)景和要求。
由于要滿足先進(jìn)先出的要求,選擇了redis的list數(shù)據(jù)結(jié)構(gòu)。redis中的list是一個(gè)字符串list,所以telegraf中metric數(shù)據(jù)接口要符合序列化的要求。比如屬性需要可導(dǎo)出,即public。所以這點(diǎn)需要改動(dòng)telegraf對(duì)于metric struct的定義。另外可以選擇json或是msgpack等序列化方式。我們這邊是采用的json序列化的方式。
結(jié)語(yǔ)改造以后,可以根據(jù)自己的需求通過(guò)配置文件來(lái)決定使用channel或是redis來(lái)實(shí)現(xiàn)buffer。各有優(yōu)劣,內(nèi)存實(shí)現(xiàn)的話,性能高,受到的依賴少。而redis這種分布式存儲(chǔ),決定了數(shù)據(jù)安全,但是性能會(huì)有一定的損耗,畢竟有大量的序列化和反序列化以及網(wǎng)絡(luò)傳輸,當(dāng)然依賴也增加了,取決于redis的可靠性,建議redis集群部署。
文章版權(quán)歸作者所有,未經(jīng)允許請(qǐng)勿轉(zhuǎn)載,若此文章存在違規(guī)行為,您可以聯(lián)系管理員刪除。
轉(zhuǎn)載請(qǐng)注明本文地址:http://systransis.cn/yun/32658.html
摘要:在我的前文容器可視化監(jiān)控中心搭建之中我們就實(shí)踐過(guò)容器的可視化監(jiān)控,在那篇文章中我們是使用了技術(shù)棧來(lái)完成的。 showImg(https://segmentfault.com/img/remote/1460000015484084); 概述 性能監(jiān)控是容器服務(wù)必不可少的基礎(chǔ)設(shè)施,容器化應(yīng)用運(yùn)行于宿主機(jī)上,我們需要知道該容器的運(yùn)行情況,包括 CPU使用率、內(nèi)存占用、網(wǎng)絡(luò)狀況以及磁盤(pán)空間等...
摘要:的展示非常炫酷,絕對(duì)是運(yùn)維提升逼格的一大利器。另外的可視化功能比強(qiáng)得多,而且以上版本將集成報(bào)警功能。它由寫(xiě)成,著力于高性能地查詢與存儲(chǔ)時(shí)序型數(shù)據(jù)。被廣泛應(yīng)用于存儲(chǔ)系統(tǒng)的監(jiān)控?cái)?shù)據(jù),行業(yè)的實(shí)時(shí)數(shù)據(jù)等場(chǎng)景。 原有監(jiān)控系統(tǒng) showImg(https://segmentfault.com/img/remote/1460000011082384); 整個(gè)系統(tǒng)以 Graphite (carbon ...
摘要:宋體本文從拉勾網(wǎng)的業(yè)務(wù)架構(gòu)日志采集監(jiān)控服務(wù)暴露調(diào)用等方面介紹了其基于的容器化改造實(shí)踐。宋體此外,拉勾網(wǎng)還有一套自研的環(huán)境的業(yè)務(wù)發(fā)布系統(tǒng),不過(guò)這套發(fā)布系統(tǒng)未適配容器環(huán)境。寫(xiě)在前面 拉勾網(wǎng)于 2019 年 3 月份開(kāi)始嘗試將生產(chǎn)環(huán)境的業(yè)務(wù)從 UHost 遷移到 UK8S,截至 2019 年 9 月份,QA 環(huán)境的大部分業(yè)務(wù)模塊已經(jīng)完成容器化改造,生產(chǎn)環(huán)境中,后臺(tái)管理服務(wù)已全部遷移到 UK8...
摘要:典型實(shí)現(xiàn)不同的監(jiān)控模塊,側(cè)重于不同領(lǐng)域,有著不同的職責(zé)。指標(biāo)收集方面,支持多樣化的組件將被優(yōu)先下使用。以上談了這么多,僅僅是聊了一下收集方面而已。 更多文章,請(qǐng)移步微信公眾號(hào)《小姐姐味道》 mp原文 https://mp.weixin.qq.com/s?__...監(jiān)控是分布式系統(tǒng)的必備組件,能夠起到提前預(yù)警、問(wèn)題排查、評(píng)估決策等功效,乃行走江湖、居家必備之良品。 監(jiān)控系統(tǒng)概要 功能劃分...
摘要:典型實(shí)現(xiàn)不同的監(jiān)控模塊,側(cè)重于不同領(lǐng)域,有著不同的職責(zé)。指標(biāo)收集方面,支持多樣化的組件將被優(yōu)先下使用。以上談了這么多,僅僅是聊了一下收集方面而已。 更多文章,請(qǐng)移步微信公眾號(hào)《小姐姐味道》 mp原文 https://mp.weixin.qq.com/s?__...監(jiān)控是分布式系統(tǒng)的必備組件,能夠起到提前預(yù)警、問(wèn)題排查、評(píng)估決策等功效,乃行走江湖、居家必備之良品。 監(jiān)控系統(tǒng)概要 功能劃分...
閱讀 1602·2021-09-22 15:52
閱讀 3523·2021-09-22 14:59
閱讀 2956·2021-09-02 15:12
閱讀 1033·2021-08-20 09:35
閱讀 1617·2019-08-30 14:09
閱讀 2749·2019-08-30 13:56
閱讀 1713·2019-08-26 18:27
閱讀 3410·2019-08-26 13:37