摘要:采用實(shí)現(xiàn)的插件前言目前社區(qū)日志采集和處理的組件不少,之前方案中的,社區(qū)中的,方案中的以及大數(shù)據(jù)用到比較多的。適合采用的方案,實(shí)現(xiàn)日志中心化收集的方案。主要負(fù)責(zé)采集,負(fù)責(zé)處理和傳送。
采用golang實(shí)現(xiàn)Fluent Bit的output插件 前言
目前社區(qū)日志采集和處理的組件不少,之前elk方案中的logstash,cncf社區(qū)中的fluentd,efk方案中的filebeat,以及大數(shù)據(jù)用到比較多的flume。而Fluent Bit是一款用c語(yǔ)言編寫(xiě)的高性能的日志收集組件,整個(gè)架構(gòu)源于fluentd。官方比較數(shù)據(jù)如下:
Fluentd | Fluent Bit | |
---|---|---|
Scope | Containers / Servers | Containers / Servers |
Language | C & Ruby | C |
Memory | ~40MB | ~450KB |
Performance | High Performance | High Performance |
Dependencies | Built as a Ruby Gem, it requires a certain number of gems. | Zero dependencies, unless some special plugin requires them. |
Plugins | More than 650 plugins available | Around 35 plugins available |
License | Apache License v2.0 | Apache License v2.0 |
通過(guò)數(shù)據(jù)可以看出,fluent bit 占用資源更少。適合采用fluent bit + fluentd 的方案,實(shí)現(xiàn)日志中心化收集的方案。fluent bit主要負(fù)責(zé)采集,fluentd負(fù)責(zé)處理和傳送。
擴(kuò)展output插件fluent bit 本身是C語(yǔ)言編寫(xiě),擴(kuò)展插件有一定的難度。可能官方考慮到這一點(diǎn),實(shí)現(xiàn)了fluent-bit-go,可以實(shí)現(xiàn)采用go語(yǔ)言來(lái)編寫(xiě)插件,目前只支持output的編寫(xiě)。
fluent-bit-go其實(shí)就是利用cgo,封裝了c接口。代碼比較簡(jiǎn)單,主要分析其中一個(gè)關(guān)鍵文件
package output /* #include#include "flb_plugin.h" #include "flb_output.h" */ import "C" import "fmt" import "unsafe" // Define constants matching Fluent Bit core const FLB_ERROR = C.FLB_ERROR const FLB_OK = C.FLB_OK const FLB_RETRY = C.FLB_RETRY const FLB_PROXY_OUTPUT_PLUGIN = C.FLB_PROXY_OUTPUT_PLUGIN const FLB_PROXY_GOLANG = C.FLB_PROXY_GOLANG // Local type to define a plugin definition type FLBPlugin C.struct_flb_plugin_proxy type FLBOutPlugin C.struct_flbgo_output_plugin // When the FLBPluginInit is triggered by Fluent Bit, a plugin context // is passed and the next step is to invoke this FLBPluginRegister() function // to fill the required information: type, proxy type, flags name and // description. func FLBPluginRegister(ctx unsafe.Pointer, name string, desc string) int { p := (*FLBPlugin) (unsafe.Pointer(ctx)) p._type = FLB_PROXY_OUTPUT_PLUGIN p.proxy = FLB_PROXY_GOLANG p.flags = 0 p.name = C.CString(name) p.description = C.CString(desc) return 0 } // Release resources allocated by the plugin initialization func FLBPluginUnregister(ctx unsafe.Pointer) { p := (*FLBPlugin) (unsafe.Pointer(ctx)) fmt.Printf("[flbgo] unregistering %v ", p) C.free(unsafe.Pointer(p.name)) C.free(unsafe.Pointer(p.description)) } func FLBPluginConfigKey(ctx unsafe.Pointer, key string) string { _key := C.CString(key) return C.GoString(C.output_get_property(_key, unsafe.Pointer(ctx))) }
主要是定義了一些編寫(xiě)插件需要用到的變量和方法,例如FLBPluginRegister注冊(cè)組件,F(xiàn)LBPluginConfigKey獲取配置文件設(shè)定參數(shù)等。
PS
實(shí)際上用golang調(diào)用fluent-bit-go,再加一些實(shí)際的業(yè)務(wù)邏輯實(shí)現(xiàn),最終編譯成一個(gè)c-share的.so動(dòng)態(tài)鏈接庫(kù)。
實(shí)際上,fluent-bit v0.13版本以后就提供了kafka output的插件,但是實(shí)際項(xiàng)目中,并不滿(mǎn)足我們的需求,必須定制化。
當(dāng)然接下來(lái)的代碼主要是作為一個(gè)demo,講清楚如何編寫(xiě)一個(gè)output插件。
先上代碼:
package main import ( "C" "fmt" "io" "log" "reflect" "strconv" "strings" "time" "unsafe" "github.com/Shopify/sarama" "github.com/fluent/fluent-bit-go/output" "github.com/ugorji/go/codec" ) var ( brokers []string producer sarama.SyncProducer timeout = 0 * time.Minute topic string module string messageKey string ) //export FLBPluginRegister func FLBPluginRegister(ctx unsafe.Pointer) int { return output.FLBPluginRegister(ctx, "out_kafka", "Kafka Output Plugin.!") } //export FLBPluginInit // ctx (context) pointer to fluentbit context (state/ c code) func FLBPluginInit(ctx unsafe.Pointer) int { if bs := output.FLBPluginConfigKey(ctx, "brokers"); bs != "" { brokers = strings.Split(bs, ",") } else { log.Printf("you must set brokers") return output.FLB_ERROR } if tp := output.FLBPluginConfigKey(ctx, "topics"); tp != "" { topic = tp } else { log.Printf("you must set topics") return output.FLB_ERROR } if mo := output.FLBPluginConfigKey(ctx, "module"); mo != "" { module = mo } else { log.Printf("you must set module") return output.FLB_ERROR } if key := output.FLBPluginConfigKey(ctx, "message_key"); key != "" { messageKey = key } else { log.Printf("you must set message_key") return output.FLB_ERROR } config := sarama.NewConfig() config.Producer.Return.Successes = true if required_acks := output.FLBPluginConfigKey(ctx, "required_acks"); required_acks != "" { if acks, err := strconv.Atoi(required_acks); err == nil { config.Producer.RequiredAcks = sarama.RequiredAcks(acks) } } if compression_codec := output.FLBPluginConfigKey(ctx, "compression_codec"); compression_codec != "" { if codec, err := strconv.Atoi(compression_codec); err == nil { config.Producer.Compression = sarama.CompressionCodec(codec) } } if max_retry := output.FLBPluginConfigKey(ctx, "max_retry"); max_retry != "" { if max_retry, err := strconv.Atoi(max_retry); err == nil { config.Producer.Retry.Max = max_retry } } if timeout == 0 { timeout = 5 * time.Minute } // If Kafka is not running on init, wait to connect deadline := time.Now().Add(timeout) for tries := 0; time.Now().Before(deadline); tries++ { var err error if producer == nil { producer, err = sarama.NewSyncProducer(brokers, config) } if err == nil { return output.FLB_OK } log.Printf("Cannot connect to Kafka: (%s) retrying...", err) time.Sleep(time.Second * 30) } log.Printf("Kafka failed to respond after %s", timeout) return output.FLB_ERROR } //export FLBPluginFlush // FLBPluginFlush is called from fluent-bit when data need to be sent. is called from fluent-bit when data need to be sent. func FLBPluginFlush(data unsafe.Pointer, length C.int, tag *C.char) int { var h codec.MsgpackHandle var b []byte var m interface{} var err error b = C.GoBytes(data, length) dec := codec.NewDecoderBytes(b, &h) // Iterate the original MessagePack array var msgs []*sarama.ProducerMessage for { // decode the msgpack data err = dec.Decode(&m) if err != nil { if err == io.EOF { break } log.Printf("Failed to decode msgpack data: %v ", err) return output.FLB_ERROR } // Get a slice and their two entries: timestamp and map slice := reflect.ValueOf(m) data := slice.Index(1) // Convert slice data to a real map and iterate mapData := data.Interface().(map[interface{}]interface{}) flattenData, err := Flatten(mapData, "", UnderscoreStyle) if err != nil { break } message := "" host := "" for k, v := range flattenData { value := "" switch t := v.(type) { case string: value = t case []byte: value = string(t) default: value = fmt.Sprintf("%v", v) } if k == "pod_name" { host = value } if k == messageKey { message = value } } if message == "" || host == "" { break } m := &sarama.ProducerMessage{ Topic: topic, Key: sarama.StringEncoder(fmt.Sprintf("host=%s|module=%s", host, module)), Value: sarama.ByteEncoder(message), } msgs = append(msgs, m) } err = producer.SendMessages(msgs) if err != nil { log.Printf("FAILED to send kafka message: %s ", err) return output.FLB_ERROR } return output.FLB_OK } //export FLBPluginExit func FLBPluginExit() int { producer.Close() return output.FLB_OK } func main() { }
FLBPluginExit 插件退出的時(shí)候需要執(zhí)行的一些方法,比如關(guān)閉連接。
FLBPluginRegister 注冊(cè)插件
FLBPluginInit 插件初始化
FLBPluginFlush flush到數(shù)據(jù)到output
FLBPluginConfigKey 獲取配置文件中參數(shù)
PS
當(dāng)然除了FLBPluginConfigKey之外,也可以通過(guò)獲取環(huán)境變量來(lái)獲得設(shè)置參數(shù)。
ctx相當(dāng)于一個(gè)上下文,負(fù)責(zé)之間的數(shù)據(jù)的傳遞。
編譯的時(shí)候
go build -buildmode=c-shared -o out_kafka.so .
生成out_kafka.so
執(zhí)行的時(shí)候
/fluent-bit/bin/fluent-bit" -c /fluent-bit/etc/fluent-bit.conf -e /fluent-bit/out_kafka.so總結(jié)
采用類(lèi)似的編寫(xiě)結(jié)構(gòu),就可以定制化自己的輸出插件了。
文章版權(quán)歸作者所有,未經(jīng)允許請(qǐng)勿轉(zhuǎn)載,若此文章存在違規(guī)行為,您可以聯(lián)系管理員刪除。
轉(zhuǎn)載請(qǐng)注明本文地址:http://systransis.cn/yun/33069.html
摘要:采用實(shí)現(xiàn)的插件前言目前社區(qū)日志采集和處理的組件不少,之前方案中的,社區(qū)中的,方案中的以及大數(shù)據(jù)用到比較多的。適合采用的方案,實(shí)現(xiàn)日志中心化收集的方案。主要負(fù)責(zé)采集,負(fù)責(zé)處理和傳送。 采用golang實(shí)現(xiàn)Fluent Bit的output插件 前言 目前社區(qū)日志采集和處理的組件不少,之前elk方案中的logstash,cncf社區(qū)中的fluentd,efk方案中的filebeat,以及大...
摘要:是一個(gè)開(kāi)源和多平臺(tái)的,它允許您從不同的來(lái)源收集數(shù)據(jù)日志,統(tǒng)一并將它們發(fā)送到多個(gè)目的地。例如日志收集日志分析主要講部署的集群。日志主要有和的日志,一般采用部署,自然而然就是要支持格式日志的采集。業(yè)務(wù)落盤(pán)的日志。部署方案采取部署。 前言 收集日志的組件多不勝數(shù),有ELK久負(fù)盛名組合中的logstash, 也有EFK組合中的filebeat,更有cncf新貴fluentd,另外還有大數(shù)據(jù)領(lǐng)域...
摘要:是一個(gè)開(kāi)源和多平臺(tái)的,它允許您從不同的來(lái)源收集數(shù)據(jù)日志,統(tǒng)一并將它們發(fā)送到多個(gè)目的地。例如日志收集日志分析主要講部署的集群。日志主要有和的日志,一般采用部署,自然而然就是要支持格式日志的采集。業(yè)務(wù)落盤(pán)的日志。部署方案采取部署。 前言 收集日志的組件多不勝數(shù),有ELK久負(fù)盛名組合中的logstash, 也有EFK組合中的filebeat,更有cncf新貴fluentd,另外還有大數(shù)據(jù)領(lǐng)域...
摘要:我推薦你使用進(jìn)行日志收集,將作為的出口。集群目前暫時(shí)沒(méi)有提供日志查看機(jī)制。以如下的形式啟動(dòng)容器,容器日志將發(fā)往配置的。 【作者barnett】本文介紹了k8s官方提供的日志收集方法,并介紹了Fluentd日志收集器并與其他產(chǎn)品做了比較。最后介紹了好雨云幫如何對(duì)k8s進(jìn)行改造并使用ZeroMQ以消息的形式將日志傳輸?shù)浇y(tǒng)一的日志處理中心。 容器日志存在形式 目前容器日志有兩種輸出形式: ...
摘要:我推薦你使用進(jìn)行日志收集,將作為的出口。集群目前暫時(shí)沒(méi)有提供日志查看機(jī)制。以如下的形式啟動(dòng)容器,容器日志將發(fā)往配置的。 【作者barnett】本文介紹了k8s官方提供的日志收集方法,并介紹了Fluentd日志收集器并與其他產(chǎn)品做了比較。最后介紹了好雨云幫如何對(duì)k8s進(jìn)行改造并使用ZeroMQ以消息的形式將日志傳輸?shù)浇y(tǒng)一的日志處理中心。 容器日志存在形式 目前容器日志有兩種輸出形式: ...
閱讀 2951·2023-04-26 02:14
閱讀 3795·2019-08-30 15:55
閱讀 1883·2019-08-29 16:42
閱讀 2790·2019-08-26 11:55
閱讀 2876·2019-08-23 13:38
閱讀 519·2019-08-23 12:10
閱讀 1339·2019-08-23 11:44
閱讀 2881·2019-08-23 11:43