成人国产在线小视频_日韩寡妇人妻调教在线播放_色成人www永久在线观看_2018国产精品久久_亚洲欧美高清在线30p_亚洲少妇综合一区_黄色在线播放国产_亚洲另类技巧小说校园_国产主播xx日韩_a级毛片在线免费

資訊專欄INFORMATION COLUMN

容器日志處理及實(shí)現(xiàn)

mtunique / 3587人閱讀

摘要:實(shí)現(xiàn)方式是每個(gè)掛載目錄使用的插件掃描每個(gè)容器日志文件,直接發(fā)送給。首先根據(jù)容器配置的類別調(diào)用返回一個(gè)方法類型實(shí)質(zhì)就是從工廠類注冊(cè)的插件去查找,具體源碼下文分析。在處理中心統(tǒng)一完成下一步處理。直接接收容器的日志。

容器日志 輸出形式:

目前容器日志有兩種輸出形式:

stdout,stderr 標(biāo)準(zhǔn)輸出

這種形式的日志輸出我們可以直接使用docker logs查看日志, k8s 集群中同樣集群可以使用kubectl logs類似的形式查看日志。

日志文件記錄

這種日志輸出我們無(wú)法從以上方法查看日志內(nèi)容,只能tail日志文件查看。

收集方式:

不論你的業(yè)務(wù)容器日志如何輸出,都是可以使用統(tǒng)一的日志收集器收集。常見(jiàn)的日志收集方式:

k8s 集群

集群?jiǎn)?dòng)時(shí)會(huì)在每個(gè)機(jī)器啟動(dòng)一個(gè)Fluentd agent收集日志然后發(fā)送給 Elasticsearch。實(shí)現(xiàn)方式是每個(gè)agent掛載目錄/var/lib/docker/containers使用fluentd的tail插件掃描每個(gè)容器日志文件,直接發(fā)送給Elasticsearch。

Fluentd agent起在業(yè)務(wù)同一個(gè) pod 中共享 volume 然后實(shí)現(xiàn)對(duì)日志文件的收集發(fā)送給Elasticsearch。

docker swarm 集群

swarm 目前暫時(shí)沒(méi)有提供日志查看機(jī)制。但是docker cloud提供了與kubectrl logs類似的機(jī)制查看 stdout 的日志。目前還沒(méi)有 fluentd 插件直接對(duì)服務(wù)進(jìn)行日志收集,暫時(shí)考慮直接使用使用跟容器一樣的機(jī)制收集。docker service create 支持--log-driver

docker 容器

從 docker1.8 內(nèi)置了fluentd log driver 。以如下的形式啟動(dòng)容器,容器 stdout/stderr 日志將發(fā)往配置的 fluentd 。如果配置后,docker logs將無(wú)法使用。另外默認(rèn)模式下如果你配置得地址沒(méi)有正常服務(wù),容器無(wú)法啟動(dòng)。你也可以使用fluentd-async-connect形式啟動(dòng), docker daemon 則能在后臺(tái)嘗試連接并緩存日志。

`docker run --log-driver=fluentd --log-opt fluentd-address=myhost.local:24224
`
同樣如果是日志文件,將文件暴露出來(lái)直接使用 fluentd 收集。

容器日志源碼簡(jiǎn)單分析
# /container/container.go:63
type CommonContainer struct {
    StreamConfig *stream.Config
  ...
}
# /container/stream/streams.go:26
type Config struct {
    sync.WaitGroup
    stdout    *broadcaster.Unbuffered
    stderr    *broadcaster.Unbuffered
    stdin     io.ReadCloser
    stdinPipe io.WriteCloser
}

moby源碼來(lái)看,每一個(gè)container實(shí)例都有幾個(gè)屬性stdout,stderr,stdin,以及管道stdinPipe(當(dāng)容器使用-i參數(shù)啟動(dòng)時(shí)標(biāo)準(zhǔn)輸入將被運(yùn)行,daemon將能夠使用此管道向容器內(nèi)寫(xiě)入標(biāo)準(zhǔn)輸入).


那么針對(duì)如上的實(shí)例該如何實(shí)現(xiàn)日志收集轉(zhuǎn)發(fā)?

# /container/container.go:312
func (container *Container) StartLogger(cfg containertypes.LogConfig) (logger.Logger, error) {
    c, err := logger.GetLogDriver(cfg.Type)
    if err != nil {
        return nil, fmt.Errorf("Failed to get logging factory: %v", err)
    }
    ctx := logger.Context{
        Config:              cfg.Config,
        ContainerID:         container.ID,
        ContainerName:       container.Name,
        ContainerEntrypoint: container.Path,
        ContainerArgs:       container.Args,
        ContainerImageID:    container.ImageID.String(),
        ContainerImageName:  container.Config.Image,
        ContainerCreated:    container.Created,
        ContainerEnv:        container.Config.Env,
        ContainerLabels:     container.Config.Labels,
        DaemonName:          "docker",
    }

    // Set logging file for "json-logger"
    if cfg.Type == jsonfilelog.Name {
        ctx.LogPath, err = container.GetRootResourcePath(fmt.Sprintf("%s-json.log", container.ID))
        if err != nil {
            return nil, err
        }
    }
    return c(ctx)
}
#/container/container.go:978
func (container *Container) startLogging() error {
    if container.HostConfig.LogConfig.Type == "none" {
        return nil // do not start logging routines
    }

    l, err := container.StartLogger(container.HostConfig.LogConfig)
    if err != nil {
        return fmt.Errorf("Failed to initialize logging driver: %v", err)
    }

    copier := logger.NewCopier(map[string]io.Reader{"stdout": container.StdoutPipe(), "stderr": container.StderrPipe()}, l)
    container.LogCopier = copier
    copier.Run()
    container.LogDriver = l

    // set LogPath field only for json-file logdriver
    if jl, ok := l.(*jsonfilelog.JSONFileLogger); ok {
        container.LogPath = jl.LogPath()
    }

    return nil
}

第一個(gè)方法是為container查找log-driver。首先根據(jù)容器配置的log-driver類別調(diào)用:logger.GetLogDriver(cfg.Type)返回一個(gè)方法類型:

/daemon/logger/factory.go:9
type Creator func(Context) (Logger, error)

實(shí)質(zhì)就是從工廠類注冊(cè)的logdriver插件去查找,具體源碼下文分析。獲取到c方法后構(gòu)建調(diào)用參數(shù)具體就是容器的一些信息。然后使用調(diào)用c方法返回driver。driver是個(gè)接口類型,我們看看有哪些方法:

# /daemon/logger/logger.go:61
type Logger interface {
    Log(*Message) error
    Name() string
    Close() error
}

很簡(jiǎn)單的三個(gè)方法,也很容易理解,Log()發(fā)送日志消息到driver,Close()進(jìn)行關(guān)閉操作(根據(jù)不同實(shí)現(xiàn))。
也就是說(shuō)我們自己實(shí)現(xiàn)一個(gè)logdriver,只需要實(shí)現(xiàn)如上三個(gè)方法,然后注冊(cè)到logger工廠類中即可。下面我們來(lái)看/daemon/logger/factory.go

第二個(gè)方法就是處理日志了,獲取到日志driver,在創(chuàng)建一個(gè)Copier,顧名思義就是復(fù)制日志,分別從stdout 和stderr復(fù)制到logger driver。下面看看具體關(guān)鍵實(shí)現(xiàn):

#/daemon/logger/copir.go:41
func (c *Copier) copySrc(name string, src io.Reader) {
    defer c.copyJobs.Done()
    reader := bufio.NewReader(src)

    for {
        select {
        case <-c.closed:
            return
        default:
            line, err := reader.ReadBytes("
")
            line = bytes.TrimSuffix(line, []byte{"
"})

            // ReadBytes can return full or partial output even when it failed.
            // e.g. it can return a full entry and EOF.
            if err == nil || len(line) > 0 {
                if logErr := c.dst.Log(&Message{Line: line, Source: name, Timestamp: time.Now().UTC()}); logErr != nil {
                    logrus.Errorf("Failed to log msg %q for logger %s: %s", line, c.dst.Name(), logErr)
                }
            }

            if err != nil {
                if err != io.EOF {
                    logrus.Errorf("Error scanning log stream: %s", err)
                }
                return
            }
        }
    }
}

每讀取一行數(shù)據(jù),構(gòu)建一個(gè)消息,調(diào)用logdriver的log方法發(fā)送到driver處理。

日志driver注冊(cè)器

位于/daemon/logger/factory.go的源碼實(shí)現(xiàn)即時(shí)日志driver的注冊(cè)器,其中幾個(gè)重要的方法(上文已經(jīng)提到一個(gè)):

# /daemon/logger/factory.go:21
func (lf *logdriverFactory) register(name string, c Creator) error {
    if lf.driverRegistered(name) {
        return fmt.Errorf("logger: log driver named "%s" is already registered", name)
    }

    lf.m.Lock()
    lf.registry[name] = c
    lf.m.Unlock()
    return nil
}
# /daemon/logger/factory.go:39
func (lf *logdriverFactory) registerLogOptValidator(name string, l LogOptValidator) error {
    lf.m.Lock()
    defer lf.m.Unlock()

    if _, ok := lf.optValidator[name]; ok {
        return fmt.Errorf("logger: log validator named "%s" is already registered", name)
    }
    lf.optValidator[name] = l
    return nil
}

看起來(lái)很簡(jiǎn)單,就是將一個(gè)Creator方法類型添加到一個(gè)map結(jié)構(gòu)中,將LogOptValidator添加到另一個(gè)map這里注意加鎖的操作。

#/daemon/logger/factory.go:13
type LogOptValidator func(cfg map[string]string) error

這個(gè)主要是驗(yàn)證driver的參數(shù) ,dockerd和docker啟動(dòng)參數(shù)中有:--log-opt

實(shí)例 云幫怎么實(shí)現(xiàn)的

使用自己實(shí)現(xiàn)的 zeroMQ-driver 直接將容器日志通過(guò) 0MQ 發(fā)到日志統(tǒng)一處理中心。在處理中心統(tǒng)一完成下一步處理。如果平臺(tái)用戶需要將日志向外輸出或者直接對(duì)接平臺(tái)內(nèi)日志分析應(yīng)用,我們的處理是在應(yīng)用 pod 中啟動(dòng)日志收集插件容器(封裝擴(kuò)展的 fluentd ),根據(jù)用戶的需要配置日志出口,實(shí)現(xiàn)應(yīng)用級(jí)日志收集。容器日志首先是由 docker-daemon 收集到,再根據(jù)容器 log-driver 配置進(jìn)行相應(yīng)操作,也就是說(shuō)如果你的宿主機(jī)網(wǎng)絡(luò)與容器網(wǎng)絡(luò)不通(k8s 集群),日志從宿主機(jī)到 pod 中的收集容器只有兩種方式:走外層網(wǎng)絡(luò),文件掛載。 我們采用文件掛載方式。
以zmq-driver為例講講我們?cè)趺磳?shí)現(xiàn)自己的driver。直接接收容器的日志。

//定義一個(gè)struct,這里包含一個(gè)zmq套接字
type ZmqLogger struct {
    writer      *zmq.Socket
    containerId string
    tenantId    string
    serviceId   string
    felock      sync.Mutex
}
//定義init方法調(diào)用logger注冊(cè)器的方法注冊(cè)當(dāng)前driver
//和參數(shù)驗(yàn)證方法。
func init() {
    if err := logger.RegisterLogDriver(name, New); err != nil {
        logrus.Fatal(err)
    }
    if err := logger.RegisterLogOptValidator(name, ValidateLogOpt); err != nil {
        logrus.Fatal(err)
    }
}
//實(shí)現(xiàn)一個(gè)上文提到的Creator方法注冊(cè)logdriver.
//這里新建一個(gè)zmq套接字構(gòu)建一個(gè)實(shí)例
func New(ctx logger.Context) (logger.Logger, error) {
    zmqaddress := ctx.Config[zmqAddress]

    puber, err := zmq.NewSocket(zmq.PUB)
    if err != nil {
        return nil, err
    }
    var (
        env       = make(map[string]string)
        tenantId  string
        serviceId string
    )
    for _, pair := range ctx.ContainerEnv {
        p := strings.SplitN(pair, "=", 2)
        //logrus.Errorf("ContainerEnv pair: %s", pair)
        if len(p) == 2 {
            key := p[0]
            value := p[1]
            env[key] = value
        }
    }
    tenantId = env["TENANT_ID"]
    serviceId = env["SERVICE_ID"]

    if tenantId == "" {
        tenantId = "default"
    }

    if serviceId == "" {
        serviceId = "default"
    }

    puber.Connect(zmqaddress)

    return &ZmqLogger{
        writer:      puber,
        containerId: ctx.ID(),
        tenantId:    tenantId,
        serviceId:   serviceId,
        felock:      sync.Mutex{},
    }, nil
}
//實(shí)現(xiàn)Log方法,這里使用zmq socket發(fā)送日志消息
//這里必須注意,zmq socket是線程不安全的,我們知道
//本方法可能被兩個(gè)線程(復(fù)制stdout和膚質(zhì)stderr)調(diào)用//必須使用鎖保證線程安全。否則會(huì)發(fā)生錯(cuò)誤。
func (s *ZmqLogger) Log(msg *logger.Message) error {
    s.felock.Lock()
    defer s.felock.Unlock()
    s.writer.Send(s.tenantId, zmq.SNDMORE)
    s.writer.Send(s.serviceId, zmq.SNDMORE)
    if msg.Source == "stderr" {
        s.writer.Send(s.containerId+": "+string(msg.Line), zmq.DONTWAIT)
    } else {
        s.writer.Send(s.containerId+": "+string(msg.Line), zmq.DONTWAIT)
    }
    return nil
}
//實(shí)現(xiàn)Close方法,這里用來(lái)關(guān)閉zmq socket。
//同樣注意線程安全,調(diào)用此方法的是容器關(guān)閉協(xié)程。
func (s *ZmqLogger) Close() error {
    s.felock.Lock()
    defer s.felock.Unlock()
    if s.writer != nil {
        return s.writer.Close()
    }
    return nil
}

func (s *ZmqLogger) Name() string {
    return name
}
//驗(yàn)證參數(shù)的方法,我們使用參數(shù)傳入zmq pub的地址。
func ValidateLogOpt(cfg map[string]string) error {
    for key := range cfg {
        switch key {
        case zmqAddress:
        default:
            return fmt.Errorf("unknown log opt "%s" for %s log driver", key, name)
        }
    }
    if cfg[zmqAddress] == "" {
        return fmt.Errorf("must specify a value for log opt "%s"", zmqAddress)
    }
    return nil
}

文章版權(quán)歸作者所有,未經(jīng)允許請(qǐng)勿轉(zhuǎn)載,若此文章存在違規(guī)行為,您可以聯(lián)系管理員刪除。

轉(zhuǎn)載請(qǐng)注明本文地址:http://systransis.cn/yun/26885.html

相關(guān)文章

  • Docker監(jiān)控日志采集神器

    摘要:因此,另一種解決辦法像這樣的工具,則只是將和進(jìn)行了結(jié)合,其功能尤其關(guān)注日志管理,比如格式檢查,日志語(yǔ)法分析,數(shù)據(jù)改進(jìn)地址地理位置信息,元數(shù)據(jù)標(biāo)簽等以及日志路由。 由Rancher社區(qū)維護(hù)的應(yīng)用商店最近迎來(lái)了兩個(gè)明星項(xiàng)目——SPM 和 Logsene,來(lái)自Sematext的監(jiān)控與日志工具。如果你已經(jīng)熟悉Logstash,Kibana,Prometheus,Grafana這些監(jiān)控或日志解決...

    PAMPANG 評(píng)論0 收藏0
  • k8s與caas--容器云caas平臺(tái)的落地實(shí)踐

    摘要:容器云將支持應(yīng)用的一鍵式部署交付,提供負(fù)載均衡,私有域名綁定,性能監(jiān)控等應(yīng)用生命周期管理服務(wù)。本容器云平臺(tái),對(duì)接持續(xù)集成發(fā)布系統(tǒng)。 前言 在移動(dòng)互聯(lián)網(wǎng)時(shí)代,新的技術(shù)需要新技術(shù)支持環(huán)境、新的軟件交付流程和IT架構(gòu),從而實(shí)現(xiàn)架構(gòu)平臺(tái)化,交付持續(xù)化,業(yè)務(wù)服務(wù)化。容器將成為新一代應(yīng)用的標(biāo)準(zhǔn)交付件,容器云將幫助企業(yè)用戶構(gòu)建研發(fā)流程和云平臺(tái)基礎(chǔ)設(shè)施??s短應(yīng)用向云端交付的周期,降低運(yùn)營(yíng)門(mén)檻。加速向互...

    h9911 評(píng)論0 收藏0
  • k8s與caas--容器云caas平臺(tái)的落地實(shí)踐

    摘要:容器云將支持應(yīng)用的一鍵式部署交付,提供負(fù)載均衡,私有域名綁定,性能監(jiān)控等應(yīng)用生命周期管理服務(wù)。本容器云平臺(tái),對(duì)接持續(xù)集成發(fā)布系統(tǒng)。 前言 在移動(dòng)互聯(lián)網(wǎng)時(shí)代,新的技術(shù)需要新技術(shù)支持環(huán)境、新的軟件交付流程和IT架構(gòu),從而實(shí)現(xiàn)架構(gòu)平臺(tái)化,交付持續(xù)化,業(yè)務(wù)服務(wù)化。容器將成為新一代應(yīng)用的標(biāo)準(zhǔn)交付件,容器云將幫助企業(yè)用戶構(gòu)建研發(fā)流程和云平臺(tái)基礎(chǔ)設(shè)施??s短應(yīng)用向云端交付的周期,降低運(yùn)營(yíng)門(mén)檻。加速向互...

    KaltZK 評(píng)論0 收藏0
  • Cube如何助力科盾業(yè)務(wù)容器化“一步到位”?

    前言 以Docker為代表的容器技術(shù)縮短了企業(yè)應(yīng)用從開(kāi)發(fā)、構(gòu)建到發(fā)布、運(yùn)行的整個(gè)生命周期。Gartner推測(cè)到2022年將會(huì)有75%的全球化企業(yè)將在生產(chǎn)中使用容器化的應(yīng)用(當(dāng)前約為30%)。由于Docker往往難以獨(dú)立支撐起大規(guī)模容器化部署,因此誕生了Kubernetes等容器編排工具,解決了大規(guī)模容器的組織和管理難題。 但事實(shí)上,Kubernetes的使用體系還是非常復(fù)雜的,對(duì)于企業(yè)的開(kāi)...

    happyhuangjinjin 評(píng)論0 收藏0

發(fā)表評(píng)論

0條評(píng)論

最新活動(dòng)
閱讀需要支付1元查看
<