成人国产在线小视频_日韩寡妇人妻调教在线播放_色成人www永久在线观看_2018国产精品久久_亚洲欧美高清在线30p_亚洲少妇综合一区_黄色在线播放国产_亚洲另类技巧小说校园_国产主播xx日韩_a级毛片在线免费

資訊專(zhuān)欄INFORMATION COLUMN

k8s與日志--采用golang實(shí)現(xiàn)Fluent Bit的output插件

岳光 / 3851人閱讀

摘要:采用實(shí)現(xiàn)的插件前言目前社區(qū)日志采集和處理的組件不少,之前方案中的,社區(qū)中的,方案中的以及大數(shù)據(jù)用到比較多的。適合采用的方案,實(shí)現(xiàn)日志中心化收集的方案。主要負(fù)責(zé)采集,負(fù)責(zé)處理和傳送。

采用golang實(shí)現(xiàn)Fluent Bit的output插件 前言

目前社區(qū)日志采集和處理的組件不少,之前elk方案中的logstash,cncf社區(qū)中的fluentd,efk方案中的filebeat,以及大數(shù)據(jù)用到比較多的flume。而Fluent Bit是一款用c語(yǔ)言編寫(xiě)的高性能的日志收集組件,整個(gè)架構(gòu)源于fluentd。官方比較數(shù)據(jù)如下:

Fluentd Fluent Bit
Scope Containers / Servers Containers / Servers
Language C & Ruby C
Memory ~40MB ~450KB
Performance High Performance High Performance
Dependencies Built as a Ruby Gem, it requires a certain number of gems. Zero dependencies, unless some special plugin requires them.
Plugins More than 650 plugins available Around 35 plugins available
License Apache License v2.0 Apache License v2.0

通過(guò)數(shù)據(jù)可以看出,fluent bit 占用資源更少。適合采用fluent bit + fluentd 的方案,實(shí)現(xiàn)日志中心化收集的方案。fluent bit主要負(fù)責(zé)采集,fluentd負(fù)責(zé)處理和傳送。

擴(kuò)展output插件

fluent bit 本身是C語(yǔ)言編寫(xiě),擴(kuò)展插件有一定的難度。可能官方考慮到這一點(diǎn),實(shí)現(xiàn)了fluent-bit-go,可以實(shí)現(xiàn)采用go語(yǔ)言來(lái)編寫(xiě)插件,目前只支持output的編寫(xiě)。
fluent-bit-go其實(shí)就是利用cgo,封裝了c接口。代碼比較簡(jiǎn)單,主要分析其中一個(gè)關(guān)鍵文件

package output

/*
#include 
#include "flb_plugin.h"
#include "flb_output.h"
*/
import "C"
import "fmt"
import "unsafe"

// Define constants matching Fluent Bit core
const FLB_ERROR               =  C.FLB_ERROR
const FLB_OK                  =  C.FLB_OK
const FLB_RETRY               =  C.FLB_RETRY

const FLB_PROXY_OUTPUT_PLUGIN =  C.FLB_PROXY_OUTPUT_PLUGIN
const FLB_PROXY_GOLANG        =  C.FLB_PROXY_GOLANG

// Local type to define a plugin definition
type FLBPlugin C.struct_flb_plugin_proxy
type FLBOutPlugin C.struct_flbgo_output_plugin

// When the FLBPluginInit is triggered by Fluent Bit, a plugin context
// is passed and the next step is to invoke this FLBPluginRegister() function
// to fill the required information: type, proxy type, flags name and
// description.
func FLBPluginRegister(ctx unsafe.Pointer, name string, desc string) int {
    p := (*FLBPlugin) (unsafe.Pointer(ctx))
    p._type = FLB_PROXY_OUTPUT_PLUGIN
    p.proxy = FLB_PROXY_GOLANG
    p.flags = 0
    p.name  = C.CString(name)
    p.description = C.CString(desc)
    return 0
}

// Release resources allocated by the plugin initialization
func FLBPluginUnregister(ctx unsafe.Pointer) {
    p := (*FLBPlugin) (unsafe.Pointer(ctx))
    fmt.Printf("[flbgo] unregistering %v
", p)
    C.free(unsafe.Pointer(p.name))
    C.free(unsafe.Pointer(p.description))
}

func FLBPluginConfigKey(ctx unsafe.Pointer, key string) string {
    _key := C.CString(key)
    return C.GoString(C.output_get_property(_key, unsafe.Pointer(ctx)))
}

主要是定義了一些編寫(xiě)插件需要用到的變量和方法,例如FLBPluginRegister注冊(cè)組件,F(xiàn)LBPluginConfigKey獲取配置文件設(shè)定參數(shù)等。
PS
實(shí)際上用golang調(diào)用fluent-bit-go,再加一些實(shí)際的業(yè)務(wù)邏輯實(shí)現(xiàn),最終編譯成一個(gè)c-share的.so動(dòng)態(tài)鏈接庫(kù)。

定制fluent-bit-kafka-ouput插件

實(shí)際上,fluent-bit v0.13版本以后就提供了kafka output的插件,但是實(shí)際項(xiàng)目中,并不滿(mǎn)足我們的需求,必須定制化。
當(dāng)然接下來(lái)的代碼主要是作為一個(gè)demo,講清楚如何編寫(xiě)一個(gè)output插件。

代碼編寫(xiě)和分析

先上代碼:

package main

import (
    "C"
    "fmt"
    "io"
    "log"
    "reflect"
    "strconv"
    "strings"
    "time"
    "unsafe"

    "github.com/Shopify/sarama"
    "github.com/fluent/fluent-bit-go/output"
    "github.com/ugorji/go/codec"
)

var (
    brokers    []string
    producer   sarama.SyncProducer
    timeout    = 0 * time.Minute
    topic      string
    module     string
    messageKey string
)

//export FLBPluginRegister
func FLBPluginRegister(ctx unsafe.Pointer) int {
    return output.FLBPluginRegister(ctx, "out_kafka", "Kafka Output Plugin.!")
}

//export FLBPluginInit
// ctx (context) pointer to fluentbit context (state/ c code)
func FLBPluginInit(ctx unsafe.Pointer) int {

    if bs := output.FLBPluginConfigKey(ctx, "brokers"); bs != "" {
        brokers = strings.Split(bs, ",")
    } else {
        log.Printf("you must set brokers")
        return output.FLB_ERROR
    }

    if tp := output.FLBPluginConfigKey(ctx, "topics"); tp != "" {
        topic = tp
    } else {
        log.Printf("you must set topics")
        return output.FLB_ERROR
    }

    if mo := output.FLBPluginConfigKey(ctx, "module"); mo != "" {
        module = mo
    } else {
        log.Printf("you must set module")
        return output.FLB_ERROR
    }

    if key := output.FLBPluginConfigKey(ctx, "message_key"); key != "" {
        messageKey = key
    } else {
        log.Printf("you must set message_key")
        return output.FLB_ERROR
    }

    config := sarama.NewConfig()
    config.Producer.Return.Successes = true

    if required_acks := output.FLBPluginConfigKey(ctx, "required_acks"); required_acks != "" {
        if acks, err := strconv.Atoi(required_acks); err == nil {
            config.Producer.RequiredAcks = sarama.RequiredAcks(acks)
        }
    }

    if compression_codec := output.FLBPluginConfigKey(ctx, "compression_codec"); compression_codec != "" {
        if codec, err := strconv.Atoi(compression_codec); err == nil {
            config.Producer.Compression = sarama.CompressionCodec(codec)
        }
    }

    if max_retry := output.FLBPluginConfigKey(ctx, "max_retry"); max_retry != "" {
        if max_retry, err := strconv.Atoi(max_retry); err == nil {
            config.Producer.Retry.Max = max_retry
        }
    }

    if timeout == 0 {
        timeout = 5 * time.Minute
    }
    // If Kafka is not running on init, wait to connect
    deadline := time.Now().Add(timeout)
    for tries := 0; time.Now().Before(deadline); tries++ {
        var err error
        if producer == nil {
            producer, err = sarama.NewSyncProducer(brokers, config)
        }
        if err == nil {
            return output.FLB_OK
        }
        log.Printf("Cannot connect to Kafka: (%s) retrying...", err)
        time.Sleep(time.Second * 30)
    }

    log.Printf("Kafka failed to respond after %s", timeout)
    return output.FLB_ERROR

}

//export FLBPluginFlush
// FLBPluginFlush is called from fluent-bit when data need to be sent. is called from fluent-bit when data need to be sent.
func FLBPluginFlush(data unsafe.Pointer, length C.int, tag *C.char) int {
    var h codec.MsgpackHandle

    var b []byte
    var m interface{}
    var err error

    b = C.GoBytes(data, length)
    dec := codec.NewDecoderBytes(b, &h)

    // Iterate the original MessagePack array
    var msgs []*sarama.ProducerMessage
    for {
        // decode the msgpack data
        err = dec.Decode(&m)
        if err != nil {
            if err == io.EOF {
                break
            }
            log.Printf("Failed to decode msgpack data: %v
", err)
            return output.FLB_ERROR
        }

        // Get a slice and their two entries: timestamp and map
        slice := reflect.ValueOf(m)
        data := slice.Index(1)

        // Convert slice data to a real map and iterate
        mapData := data.Interface().(map[interface{}]interface{})
        flattenData, err := Flatten(mapData, "", UnderscoreStyle)
        if err != nil {
            break
        }

        message := ""
        host := ""

        for k, v := range flattenData {
            value := ""
            switch t := v.(type) {
            case string:
                value = t
            case []byte:
                value = string(t)
            default:
                value = fmt.Sprintf("%v", v)
            }

            if k == "pod_name" {
                host = value
            }

            if k == messageKey {
                message = value
            }

        }
        if message == "" || host == "" {
            break
        }

        m := &sarama.ProducerMessage{
            Topic: topic,
            Key:   sarama.StringEncoder(fmt.Sprintf("host=%s|module=%s", host, module)),
            Value: sarama.ByteEncoder(message),
        }
        msgs = append(msgs, m)

    }

    err = producer.SendMessages(msgs)
    if err != nil {
        log.Printf("FAILED to send kafka message: %s
", err)
        return output.FLB_ERROR
    }
    return output.FLB_OK
}

//export FLBPluginExit
func FLBPluginExit() int {
    producer.Close()
    return output.FLB_OK
}

func main() {
}

FLBPluginExit 插件退出的時(shí)候需要執(zhí)行的一些方法,比如關(guān)閉連接。

FLBPluginRegister 注冊(cè)插件

FLBPluginInit 插件初始化

FLBPluginFlush flush到數(shù)據(jù)到output

FLBPluginConfigKey 獲取配置文件中參數(shù)

PS
當(dāng)然除了FLBPluginConfigKey之外,也可以通過(guò)獲取環(huán)境變量來(lái)獲得設(shè)置參數(shù)。
ctx相當(dāng)于一個(gè)上下文,負(fù)責(zé)之間的數(shù)據(jù)的傳遞。

編譯和執(zhí)行

編譯的時(shí)候

go build -buildmode=c-shared -o out_kafka.so .

生成out_kafka.so

執(zhí)行的時(shí)候

/fluent-bit/bin/fluent-bit" -c /fluent-bit/etc/fluent-bit.conf -e /fluent-bit/out_kafka.so
總結(jié)

采用類(lèi)似的編寫(xiě)結(jié)構(gòu),就可以定制化自己的輸出插件了。

文章版權(quán)歸作者所有,未經(jīng)允許請(qǐng)勿轉(zhuǎn)載,若此文章存在違規(guī)行為,您可以聯(lián)系管理員刪除。

轉(zhuǎn)載請(qǐng)注明本文地址:http://systransis.cn/yun/33069.html

相關(guān)文章

  • k8s日志--采用golang實(shí)現(xiàn)Fluent Bitoutput插件

    摘要:采用實(shí)現(xiàn)的插件前言目前社區(qū)日志采集和處理的組件不少,之前方案中的,社區(qū)中的,方案中的以及大數(shù)據(jù)用到比較多的。適合采用的方案,實(shí)現(xiàn)日志中心化收集的方案。主要負(fù)責(zé)采集,負(fù)責(zé)處理和傳送。 采用golang實(shí)現(xiàn)Fluent Bit的output插件 前言 目前社區(qū)日志采集和處理的組件不少,之前elk方案中的logstash,cncf社區(qū)中的fluentd,efk方案中的filebeat,以及大...

    binta 評(píng)論0 收藏0
  • k8slog--利用fluent bit收集k8s日志

    摘要:是一個(gè)開(kāi)源和多平臺(tái)的,它允許您從不同的來(lái)源收集數(shù)據(jù)日志,統(tǒng)一并將它們發(fā)送到多個(gè)目的地。例如日志收集日志分析主要講部署的集群。日志主要有和的日志,一般采用部署,自然而然就是要支持格式日志的采集。業(yè)務(wù)落盤(pán)的日志。部署方案采取部署。 前言 收集日志的組件多不勝數(shù),有ELK久負(fù)盛名組合中的logstash, 也有EFK組合中的filebeat,更有cncf新貴fluentd,另外還有大數(shù)據(jù)領(lǐng)域...

    betacat 評(píng)論0 收藏0
  • k8slog--利用fluent bit收集k8s日志

    摘要:是一個(gè)開(kāi)源和多平臺(tái)的,它允許您從不同的來(lái)源收集數(shù)據(jù)日志,統(tǒng)一并將它們發(fā)送到多個(gè)目的地。例如日志收集日志分析主要講部署的集群。日志主要有和的日志,一般采用部署,自然而然就是要支持格式日志的采集。業(yè)務(wù)落盤(pán)的日志。部署方案采取部署。 前言 收集日志的組件多不勝數(shù),有ELK久負(fù)盛名組合中的logstash, 也有EFK組合中的filebeat,更有cncf新貴fluentd,另外還有大數(shù)據(jù)領(lǐng)域...

    CoffeX 評(píng)論0 收藏0
  • 關(guān)于k8s集群容器日志收集總結(jié)

    摘要:我推薦你使用進(jìn)行日志收集,將作為的出口。集群目前暫時(shí)沒(méi)有提供日志查看機(jī)制。以如下的形式啟動(dòng)容器,容器日志將發(fā)往配置的。 【作者barnett】本文介紹了k8s官方提供的日志收集方法,并介紹了Fluentd日志收集器并與其他產(chǎn)品做了比較。最后介紹了好雨云幫如何對(duì)k8s進(jìn)行改造并使用ZeroMQ以消息的形式將日志傳輸?shù)浇y(tǒng)一的日志處理中心。 容器日志存在形式 目前容器日志有兩種輸出形式: ...

    jeffrey_up 評(píng)論0 收藏0
  • 關(guān)于k8s集群容器日志收集總結(jié)

    摘要:我推薦你使用進(jìn)行日志收集,將作為的出口。集群目前暫時(shí)沒(méi)有提供日志查看機(jī)制。以如下的形式啟動(dòng)容器,容器日志將發(fā)往配置的。 【作者barnett】本文介紹了k8s官方提供的日志收集方法,并介紹了Fluentd日志收集器并與其他產(chǎn)品做了比較。最后介紹了好雨云幫如何對(duì)k8s進(jìn)行改造并使用ZeroMQ以消息的形式將日志傳輸?shù)浇y(tǒng)一的日志處理中心。 容器日志存在形式 目前容器日志有兩種輸出形式: ...

    or0fun 評(píng)論0 收藏0

發(fā)表評(píng)論

0條評(píng)論

最新活動(dòng)
閱讀需要支付1元查看
<