成人国产在线小视频_日韩寡妇人妻调教在线播放_色成人www永久在线观看_2018国产精品久久_亚洲欧美高清在线30p_亚洲少妇综合一区_黄色在线播放国产_亚洲另类技巧小说校园_国产主播xx日韩_a级毛片在线免费

資訊專欄INFORMATION COLUMN

深入K8S Job(三):cronJob controller源碼分析

Enlightenment / 2188人閱讀

摘要:如果沒有指定,則沒有期限。取消當前正在運行的,然后新建來替換。和這兩個字段也是可選的。設(shè)置限制值為,相關(guān)類型的完成后將不會被保留。列出所有的列出所有的遍歷所有的根據(jù)字段確定該是否由所創(chuàng)建。

k8s version: v1.11.0

author: [email protected]

源碼流程圖

概述

cronJob controller 的實現(xiàn)比較簡單,使用 Cron - Wikipedia 的方法,確定調(diào)度規(guī)則,底層的調(diào)度對象就是依賴了 job,它不會去檢查任何 Pod。

該 controller 也沒有依賴各種 informer,就簡單創(chuàng)建了一個循環(huán)運行的協(xié)程,每次遍歷現(xiàn)有的 jobs & cronJobs,整理它們的關(guān)系并進行管理。

注意:kubernetes version >= 1.4 (ScheduledJob),>= 1.5(CronJob),需要給 apiserver 傳遞 --runtime-config=batch/v2alpha1=true 開啟 batch/v2alpha1 API 才可用。
spec 關(guān)鍵字段

.spec.schedule 是 cronJob 的必填字段,該值是 Cron - Wikipedia 格式的字符串,例如:0 * * * *,或者 @hourly,來確定調(diào)度策略。

.spec.startingDeadlineSeconds 是可選字段,表示啟動 Job 的期限(秒級別),如果因為任何原因而錯過了被調(diào)度的時間,那么錯誤執(zhí)行時間的 Job 被認為是失敗的。如果沒有指定,則沒有期限。

.spec.concurrencyPolicy 也是可選字段,指定了 cronJob 創(chuàng)建 Job 的并發(fā)執(zhí)行策略:

Allow(默認):允許并發(fā)運行 Job。

Forbid:禁止并發(fā)運行,如果前一個還沒有完成,則直接跳過。

Replace:取消當前正在運行的 Jobs,然后新建 Job 來替換。

.spec.suspend 也是可選字段,如果設(shè)置為 true,則后續(xù)所有的執(zhí)行都會被過濾掉,但是對當前已經(jīng)在運行的 Job 不影響。默認為false。

.spec.successfulJobsHistoryLimit.spec.failedJobsHistoryLimit 這兩個字段也是可選的。它們指定了可以保留完成和失敗 Job 數(shù)量的限制。
默認沒有限制,所有成功和失敗的 Job 都會被保留。然而,當運行一個 Cron Job 時,很快就會堆積很多 Job,推薦設(shè)置這兩個字段的值。設(shè)置限制值為 0,相關(guān)類型的 Job 完成后將不會被保留。

CronJobController 結(jié)構(gòu)

路徑:pkg/controller/cronjob/cronjob_controller.go

type CronJobController struct {
    // 訪問 kube-apiserver 的 client.
    kubeClient clientset.Interface
    // job 控制器,用于創(chuàng)建和刪除 job.
    jobControl jobControlInterface
    // cronJob 控制器,用于更新狀態(tài).
    sjControl  sjControlInterface
    // pod 控制器,用于list & delete pods
    // 在刪除 job 時,同時也清理 job 創(chuàng)建的 pods.
    podControl podControlInterface
    // cronJob 相關(guān)的events, 通過該 recorder 進行廣播
    recorder   record.EventRecorder
}
注意:代碼中有很多sj,因為以前不叫 cronJob,叫 scheduled jobs。
startCronJobController()

路徑:cmd/kube-controller-manager/app/batch.go

startCronJobController() 是啟動 cronJob controller 的入口函數(shù)。它會初始化 CronJobController 對象,并Run().

func startCronJobController(ctx ControllerContext) (bool, error) {
    // 在啟動 cronJob controller 之前,判斷下 cronJob 是否有配置生效
    // 用戶可以在創(chuàng)建k8s clusters時,通過修改kube-apiserver --runtime-config配置想要生效的 resource
    if !ctx.AvailableResources[schema.GroupVersionResource{Group: "batch", Version: "v1beta1", Resource: "cronjobs"}] {
        return false, nil
    }
    // 初始化 CronJobController 對象
    cjc, err := cronjob.NewCronJobController(
        ctx.ClientBuilder.ClientOrDie("cronjob-controller"),
    )
    if err != nil {
        return true, fmt.Errorf("error creating CronJob controller: %v", err)
    }
    // Run
    go cjc.Run(ctx.Stop)
    return true, nil
}
syncAll()

CronJobController Run() 方法比較簡單,就是每10s 循環(huán)調(diào)用 syncAll() 函數(shù)。
syncAll() 邏輯也比較清楚,根據(jù)初始化的 kubeClient, 獲取所有的 jobs 和 cronJobs,并遍歷所有 Jobs, 根據(jù)ObjectMeta.OwnerReferences 字段匹配是否由 cronJob controller 所創(chuàng)建。最后基于 cronJob 的UUID 進行整理。
最后處理所有的 cronJobs,確認需要調(diào)度的時間并根據(jù)并行策略創(chuàng)建 jobs,同步完后再清理所有已經(jīng) finished jobs。

func (jm *CronJobController) syncAll() {
    // 列出所有的 jobs
    jl, err := jm.kubeClient.BatchV1().Jobs(metav1.NamespaceAll).List(metav1.ListOptions{})
    if err != nil {
        utilruntime.HandleError(fmt.Errorf("can"t list Jobs: %v", err))
        return
    }
    js := jl.Items
    glog.V(4).Infof("Found %d jobs", len(js))

    // 列出所有的 cronJobs
    sjl, err := jm.kubeClient.BatchV1beta1().CronJobs(metav1.NamespaceAll).List(metav1.ListOptions{})
    if err != nil {
        utilruntime.HandleError(fmt.Errorf("can"t list CronJobs: %v", err))
        return
    }
    sjs := sjl.Items
    glog.V(4).Infof("Found %d cronjobs", len(sjs))

    // 遍歷所有的 jobs, 根據(jù) ObjectMeta.OwnerReferences 字段確定該 job 是否由 cronJob 所創(chuàng)建。
    // 然后根據(jù) cronJob uuid 進行排列
    jobsBySj := groupJobsByParent(js)
    glog.V(4).Infof("Found %d groups", len(jobsBySj))

    // 遍歷所有的 cronJobs
    for _, sj := range sjs {
        // 進行同步
        // 確定需要調(diào)度的時間,并根據(jù) Spec.ConcurrencyPolicy 策略,確認如何來創(chuàng)建 jobs
        // 并更新 cronJob.Status
        syncOne(&sj, jobsBySj[sj.UID], time.Now(), jm.jobControl, jm.sjControl, jm.podControl, jm.recorder)
        // 清理所有已經(jīng)完成的 jobs
        cleanupFinishedJobs(&sj, jobsBySj[sj.UID], jm.jobControl, jm.sjControl, jm.podControl, jm.recorder)
    }
}
syncOne()

該接口就是 cronJob controller 中實現(xiàn)同步的關(guān)鍵部分。

func syncOne(sj *batchv1beta1.CronJob, js []batchv1.Job, now time.Time, jc jobControlInterface, sjc sjControlInterface, pc podControlInterface, recorder record.EventRecorder) {
    nameForLog := fmt.Sprintf("%s/%s", sj.Namespace, sj.Name)

    // 遍歷所有獲取到的 jobs
    // 1.記錄到 childrenJobs 中,表示當前屬于該 cronJob 的所有 Jobs,便于后面清理 cronJob 中記錄的 active Jobs
    // 2.查看該 job 是否在 cronJob.Status.Active 的列表中
    // - 如果在的話,且該 Job 已經(jīng) finished,則將該 job 從 active list 中刪除
    // - 如果不在,且該 Job 還沒有 finished,則發(fā)送異常事件 
    childrenJobs := make(map[types.UID]bool)
    for _, j := range js {
        childrenJobs[j.ObjectMeta.UID] = true
        found := inActiveList(*sj, j.ObjectMeta.UID)
        if !found && !IsJobFinished(&j) {
            recorder.Eventf(sj, v1.EventTypeWarning, "UnexpectedJob", "Saw a job that the controller did not create or forgot: %v", j.Name)
        } else if found && IsJobFinished(&j) {
            deleteFromActiveList(sj, j.ObjectMeta.UID)
            // TODO: event to call out failure vs success.
            recorder.Eventf(sj, v1.EventTypeNormal, "SawCompletedJob", "Saw completed job: %v", j.Name)
        }
    }

    // 遍歷 cronJob 所有的 active jobs, 根據(jù)前面的 childrenJobs 來判斷該繼續(xù)的 active job 是否還存在,如果不存在的話,也從 active list 中刪除。
    for _, j := range sj.Status.Active {
        if found := childrenJobs[j.UID]; !found {
            recorder.Eventf(sj, v1.EventTypeNormal, "MissingJob", "Active job went missing: %v", j.Name)
            deleteFromActiveList(sj, j.UID)
        }
    }

    // 上面更新了 cronJob.Status.Active 字段,所以需要更新一把 cronJob
    updatedSJ, err := sjc.UpdateStatus(sj)
    if err != nil {
        glog.Errorf("Unable to update status for %s (rv = %s): %v", nameForLog, sj.ResourceVersion, err)
        return
    }
    *sj = *updatedSJ

    // 如果 cronJob 已經(jīng)被用戶刪除,則直接 return
    if sj.DeletionTimestamp != nil {
        return
    }

    // 如果 cronJob 已經(jīng)被 suspend,也直接 return
    if sj.Spec.Suspend != nil && *sj.Spec.Suspend {
        glog.V(4).Infof("Not starting job for %s because it is suspended", nameForLog)
        return
    }

    // 根據(jù) cronJob 的創(chuàng)建時間或最近一次的調(diào)度時間,和 cronJob.Spec.Schedule 配置,計算出到現(xiàn)在為止所有應該調(diào)度的時間點。
    times, err := getRecentUnmetScheduleTimes(*sj, now)
    if err != nil {
        recorder.Eventf(sj, v1.EventTypeWarning, "FailedNeedsStart", "Cannot determine if job needs to be started: %v", err)
        glog.Errorf("Cannot determine if %s needs to be started: %v", nameForLog, err)
        return
    }
    // 如果返回的時間點列表為空,則表示該 cronJob 暫時還不需要調(diào)度,直接 return
    if len(times) == 0 {
        glog.V(4).Infof("No unmet start times for %s", nameForLog)
        return
    }
    // 有多次未滿足的調(diào)度時間
    if len(times) > 1 {
        glog.V(4).Infof("Multiple unmet start times for %s so only starting last one", nameForLog)
    }

    // scheduledTime 取列表中的最后一次時間
    scheduledTime := times[len(times)-1]
    tooLate := false
    // 如果用戶配置了 Spec.StartingDeadlineSeconds,則需要判斷 scheduledTime 是否滿足條件
    // 如果 now - scheduledTime > Spec.StartingDeadlineSeconds,則直接 return
    if sj.Spec.StartingDeadlineSeconds != nil {
        tooLate = scheduledTime.Add(time.Second * time.Duration(*sj.Spec.StartingDeadlineSeconds)).Before(now)
    }
    if tooLate {
        glog.V(4).Infof("Missed starting window for %s", nameForLog)
        return
    }
    // scheduledTime 滿足各種條件的情況下,就需要查看 cronJob 配置的并發(fā)策略
    // 如果 ForbidConcurrent,且 active jobs > 0, 則直接 return;
    // 否則繼續(xù)往下創(chuàng)建;
    if sj.Spec.ConcurrencyPolicy == batchv1beta1.ForbidConcurrent && len(sj.Status.Active) > 0 {
        glog.V(4).Infof("Not starting job for %s because of prior execution still running and concurrency policy is Forbid", nameForLog)
        return
    }
    // 如果 ReplaceConcurrent,則刪除所有的 active jobs, 等后面重新創(chuàng)建
    if sj.Spec.ConcurrencyPolicy == batchv1beta1.ReplaceConcurrent {
        for _, j := range sj.Status.Active {
            glog.V(4).Infof("Deleting job %s of %s that was still running at next scheduled start time", j.Name, nameForLog)

            job, err := jc.GetJob(j.Namespace, j.Name)
            if err != nil {
                recorder.Eventf(sj, v1.EventTypeWarning, "FailedGet", "Get job: %v", err)
                return
            }
            if !deleteJob(sj, job, jc, pc, recorder, "") {
                return
            }
        }
    }

    // 根據(jù) cronJob.spec.JobTemplate,填充 job 的完整結(jié)構(gòu)
    // 比如 name, labels, OwnerReferences 等等。
    jobReq, err := getJobFromTemplate(sj, scheduledTime)
    if err != nil {
        glog.Errorf("Unable to make Job from template in %s: %v", nameForLog, err)
        return
    }
    // 創(chuàng)建 job
    jobResp, err := jc.CreateJob(sj.Namespace, jobReq)
    if err != nil {
        recorder.Eventf(sj, v1.EventTypeWarning, "FailedCreate", "Error creating job: %v", err)
        return
    }
    glog.V(4).Infof("Created Job %s for %s", jobResp.Name, nameForLog)
    recorder.Eventf(sj, v1.EventTypeNormal, "SuccessfulCreate", "Created job %v", jobResp.Name)

    // 根據(jù)創(chuàng)建 job 返回的 response,獲取 ObjectReference 結(jié)構(gòu)
    // 用于記錄到 cronJob.Status.Active 中
    ref, err := getRef(jobResp)
    if err != nil {
        glog.V(2).Infof("Unable to make object reference for job for %s", nameForLog)
    } else {
        sj.Status.Active = append(sj.Status.Active, *ref)
    }
    // 設(shè)置最近一次的調(diào)度時間
    sj.Status.LastScheduleTime = &metav1.Time{Time: scheduledTime}
    // 更新 cronJob
    if _, err := sjc.UpdateStatus(sj); err != nil {
        glog.Infof("Unable to update status for %s (rv = %s): %v", nameForLog, sj.ResourceVersion, err)
    }

    return
}
參考資料

Running automated tasks with cron jobs - Kubernetes

文章版權(quán)歸作者所有,未經(jīng)允許請勿轉(zhuǎn)載,若此文章存在違規(guī)行為,您可以聯(lián)系管理員刪除。

轉(zhuǎn)載請注明本文地址:http://systransis.cn/yun/32721.html

相關(guān)文章

  • 深入K8S Job(一):介紹

    摘要:用于批量處理短暫的一次性任務,并保證指定數(shù)量的成功結(jié)束。一旦有一個成功結(jié)束,其他都會準備退出。默認值指定可運行的時間期限,超過時間還未結(jié)束,系統(tǒng)將會嘗試進行終止。已知問題設(shè)置為時,會與沖突,可以暫時將設(shè)置為進行規(guī)避。 介紹 Kubernetes有兩個概念跟job有關(guān): Job: 負責批量處理短暫的一次性任務,僅執(zhí)行一次,并保證處理的一個或者多個Pod成功結(jié)束。 CronJob: ...

    ysl_unh 評論0 收藏0
  • 深入K8S Job(二):job controller源碼分析

    摘要:用于獲取元數(shù)據(jù)及根據(jù)的來匹配該會使用到的接口如下用于根據(jù)反推根據(jù)獲取元數(shù)據(jù)提供了接口用于獲取指定下管理的所有通過的數(shù)據(jù)變更,比如,來操作該。 k8s version: v1.11.0author: [email protected] 源碼流程圖 showImg(https://segmentfault.com/img/remote/1460000016496285?w...

    EddieChan 評論0 收藏0
  • Kubernetes概念與術(shù)語

    摘要:標識是與操作對象間的紐帶。集群為每個對象維護三類信息對象元數(shù)據(jù)期望狀態(tài)與實際狀態(tài)元數(shù)據(jù)指對象的基本信息,比如命名標簽注釋等等,用于識別對象期望狀態(tài)一般由用戶配置來描述的實際狀態(tài)是由集群各個組件上報的集群實際的運行情況。 綜述 學習Kubernetes時,發(fā)現(xiàn)它的概念和術(shù)語還是比較多的,光靠啃官方文檔比較晦澀。所以邊學習邊整理,對主要的概念和術(shù)語做一下分類及簡要說明。感覺把重要概念都理解...

    _Suqin 評論0 收藏0
  • 容器監(jiān)控實踐—kube-state-metrics

    摘要:功能提供的指標,按照階段分為三種類別實驗性質(zhì)的中階段的或者的字段。穩(wěn)定版本的中不向后兼容的主要版本的更新被廢棄的已經(jīng)不在維護的。通過比較來保證的順序并不保證包含所有資源本文為容器監(jiān)控實踐系列文章,完整內(nèi)容見 概述 已經(jīng)有了cadvisor、heapster、metric-server,幾乎容器運行的所有指標都能拿到,但是下面這種情況卻無能為力: 我調(diào)度了多少個replicas?現(xiàn)在可...

    kevin 評論0 收藏0
  • 容器監(jiān)控實踐—kube-state-metrics

    摘要:功能提供的指標,按照階段分為三種類別實驗性質(zhì)的中階段的或者的字段。穩(wěn)定版本的中不向后兼容的主要版本的更新被廢棄的已經(jīng)不在維護的。通過比較來保證的順序并不保證包含所有資源本文為容器監(jiān)控實踐系列文章,完整內(nèi)容見 概述 已經(jīng)有了cadvisor、heapster、metric-server,幾乎容器運行的所有指標都能拿到,但是下面這種情況卻無能為力: 我調(diào)度了多少個replicas?現(xiàn)在可...

    cikenerd 評論0 收藏0

發(fā)表評論

0條評論

最新活動
閱讀需要支付1元查看
<