Abstract: A walkthrough of the Kubernetes Job controller (v1.11.0): the JobController structure (jobLister for fetching Job metadata and matching Jobs by Pod labels, podStore for listing the Pods a Job manages, and the work queue driven by Job/Pod add, update and delete events), followed by startJobController(), NewJobController(), Run(), syncJob() and manageJob().
k8s version: v1.11.0
author: [email protected]

Source code flow chart: (figure omitted)

JobController structure
Path: pkg/controller/job/job_controller.go
```go
type JobController struct {
	// Client for talking to kube-apiserver; used to query job, pod, etc. metadata.
	kubeClient clientset.Interface

	// Pod controller, used to create and delete pods.
	podControl controller.PodControlInterface

	// Used to update job status.
	updateHandler func(job *batch.Job) error

	// Core interface of the job controller, used to sync jobs.
	syncHandler func(jobKey string) (bool, error)

	// The job controller first waits for the job & pod caches to sync at startup.
	// podStoreSynced reports whether the pod cache has synced at least once.
	podStoreSynced cache.InformerSynced
	// jobStoreSynced reports whether the job cache has synced at least once.
	jobStoreSynced cache.InformerSynced

	// Expectations cache: records the pending pod adds & dels for each job
	// and provides interfaces to adjust them until the expected values are reached.
	expectations controller.ControllerExpectationsInterface

	// jobLister fetches job metadata and matches jobs by pod labels.
	// The controller uses the following interfaces:
	// 1. GetPodJobs(): find the jobs a pod belongs to
	// 2. Get(): fetch job metadata by namespace & name
	jobLister batchv1listers.JobLister

	// podStore provides the interface for listing all pods managed by a given job.
	podStore corelisters.PodLister

	// Jobs queue.
	// The job controller watches jobs & pods through kubeClient and manipulates
	// this queue on data changes (add, delete, update); it then starts workers
	// that call syncJob to process the jobs in the queue.
	queue workqueue.RateLimitingInterface

	// Job-related events are broadcast through this recorder.
	recorder record.EventRecorder
}
```
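Before moving on, the expectations field deserves a closer look, since syncJob() and manageJob() both revolve around it. Below is a minimal, dependency-free sketch of the idea only; the real controller.ControllerExpectations is thread-safe and also records timestamps so that entries expire after five minutes, as noted later in syncJob().

```go
// Simplified illustration of the expectations idea: before creating or deleting
// pods for a job, the controller records how many creations/deletions it expects
// to observe; informer events then decrement the counters. A job is only synced
// again once nothing is outstanding (or the entry has expired).
package main

import "fmt"

type expectations struct {
	adds, dels int
}

type expectationsCache map[string]*expectations // keyed by "namespace/name"

func (c expectationsCache) ExpectCreations(key string, n int) { c[key] = &expectations{adds: n} }
func (c expectationsCache) ExpectDeletions(key string, n int) { c[key] = &expectations{dels: n} }

func (c expectationsCache) CreationObserved(key string) {
	if e, ok := c[key]; ok {
		e.adds--
	}
}

func (c expectationsCache) DeletionObserved(key string) {
	if e, ok := c[key]; ok {
		e.dels--
	}
}

// SatisfiedExpectations returns true when nothing is outstanding,
// i.e. the job is safe to sync again.
func (c expectationsCache) SatisfiedExpectations(key string) bool {
	e, ok := c[key]
	return !ok || (e.adds <= 0 && e.dels <= 0)
}

func main() {
	cache := expectationsCache{}
	key := "default/pi"

	cache.ExpectCreations(key, 2)                 // manageJob() is about to create 2 pods
	fmt.Println(cache.SatisfiedExpectations(key)) // false: wait for the informer events

	cache.CreationObserved(key) // addPod() saw the first pod
	cache.CreationObserved(key) // addPod() saw the second pod
	fmt.Println(cache.SatisfiedExpectations(key)) // true: the next syncJob() may call manageJob() again
}
```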
startJobController()

Path: cmd/kube-controller-manager/app/batch.go
startJobController() is the entry function for starting the job controller; it is registered into the kube-controller-manager component's NewControllerInitializers().
The startup flow of kube-controller-manager itself is left for the reader to explore in the related code; here we only look at the job controller.
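For orientation, here is a standalone sketch of that registration pattern. The types below are simplified stand-ins for illustration, not the real ControllerContext/InitFunc definitions in cmd/kube-controller-manager/app.

```go
// Simplified stand-in for the kube-controller-manager registration pattern:
// every controller exposes a start function, and NewControllerInitializers()
// collects them in a name -> InitFunc map that the manager iterates at startup.
package main

import "fmt"

type ControllerContext struct{ /* clients, informers, component config, stop channel, ... */ }

// InitFunc starts a controller; it reports whether the controller was enabled and any error.
type InitFunc func(ctx ControllerContext) (bool, error)

func startJobController(ctx ControllerContext) (bool, error) {
	// real implementation: check AvailableResources, then go NewJobController(...).Run(...)
	return true, nil
}

func NewControllerInitializers() map[string]InitFunc {
	controllers := map[string]InitFunc{}
	controllers["job"] = startJobController
	// controllers["cronjob"] = ..., controllers["deployment"] = ..., etc.
	return controllers
}

func main() {
	ctx := ControllerContext{}
	for name, initFn := range NewControllerInitializers() {
		started, err := initFn(ctx)
		fmt.Printf("controller %q started=%v err=%v\n", name, started, err)
	}
}
```

With the registration in mind, the real startJobController() is short: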
```go
func startJobController(ctx ControllerContext) (bool, error) {
	// Before starting the job controller, check whether the jobs resource is enabled.
	// When creating a cluster, users can enable the resources they want via the
	// kube-apiserver --runtime-config flag.
	if !ctx.AvailableResources[schema.GroupVersionResource{Group: "batch", Version: "v1", Resource: "jobs"}] {
		return false, nil
	}
	// Build the JobController and Run it.
	// Run() is given the number of goroutines to start; each goroutine is one worker.
	go job.NewJobController(
		ctx.InformerFactory.Core().V1().Pods(),
		ctx.InformerFactory.Batch().V1().Jobs(),
		ctx.ClientBuilder.ClientOrDie("job-controller"),
	).Run(int(ctx.ComponentConfig.JobController.ConcurrentJobSyncs), ctx.Stop)
	return true, nil
}
```
NewJobController()

Path: pkg/controller/job/job_controller.go
```go
func NewJobController(podInformer coreinformers.PodInformer, jobInformer batchinformers.JobInformer, kubeClient clientset.Interface) *JobController {
	// Initialize the event broadcaster used by this controller to send job-related events.
	eventBroadcaster := record.NewBroadcaster()
	// Register a function that logs events.
	// eventBroadcaster.StartEventWatcher() starts a goroutine that watches events and feeds
	// each one to the registered handler; here the handler simply logs via glog.Infof.
	eventBroadcaster.StartLogging(glog.Infof)
	// EventSinkImpl wraps an EventInterface, which implements Create/Update/Delete/Get/Watch/Patch and so on.
	// Like the call above, this registers a handler through eventBroadcaster.StartEventWatcher();
	// it receives events from the broadcaster and sends them to the given sink.
	// The k8s event machinery is worth a separate source-code read of its own.
	eventBroadcaster.StartRecordingToSink(&v1core.EventSinkImpl{Interface: kubeClient.CoreV1().Events("")})

	// Kubernetes' internal client-side rate limiting.
	// From the apiserver's point of view every controller and the scheduler are clients,
	// so rate limiting on this side matters as well.
	if kubeClient != nil && kubeClient.CoreV1().RESTClient().GetRateLimiter() != nil {
		metrics.RegisterMetricAndTrackRateLimiterUsage("job_controller", kubeClient.CoreV1().RESTClient().GetRateLimiter())
	}

	// Initialize the JobController.
	jm := &JobController{
		// Client that talks to kube-apiserver.
		kubeClient: kubeClient,
		// podControl, used by manageJob() to create and delete pods.
		podControl: controller.RealPodControl{
			KubeClient: kubeClient,
			Recorder:   eventBroadcaster.NewRecorder(scheme.Scheme, v1.EventSource{Component: "job-controller"}),
		},
		// Cache of the expected pod state, with interfaces to correct it.
		// For example it stores the pending adds & dels of a job's pods and
		// exposes interfaces to modify those two values.
		expectations: controller.NewControllerExpectations(),
		// Jobs queue; a matching number of workers will later process the jobs in it.
		queue: workqueue.NewNamedRateLimitingQueue(workqueue.NewItemExponentialFailureRateLimiter(DefaultJobBackOff, MaxJobBackOff), "job"),
		// Event recorder used to send job-related events.
		recorder: eventBroadcaster.NewRecorder(scheme.Scheme, v1.EventSource{Component: "job-controller"}),
	}

	// Register the jobInformer's Add, Update and Delete handlers.
	// When the controller receives a job Add/Update/Delete event it calls the matching function.
	// At their core these functions manipulate the queue above so that syncJob processes the jobs in it.
	jobInformer.Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{
		AddFunc: func(obj interface{}) {
			jm.enqueueController(obj, true)
		},
		UpdateFunc: jm.updateJob,
		DeleteFunc: func(obj interface{}) {
			jm.enqueueController(obj, true)
		},
	})
	// Already described in the struct above.
	jm.jobLister = jobInformer.Lister()
	jm.jobStoreSynced = jobInformer.Informer().HasSynced

	// Register the podInformer's Add, Update and Delete handlers.
	// Jobs ultimately run on pods, so the related pod events matter too.
	// The podInformer watches all pod changes, so each handler checks whether the pod's
	// controllerRef points to a job; if so it updates the expectations & queue
	// and triggers syncJob to handle it.
	podInformer.Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{
		AddFunc:    jm.addPod,
		UpdateFunc: jm.updatePod,
		DeleteFunc: jm.deletePod,
	})
	// Already described in the struct above.
	jm.podStore = podInformer.Lister()
	jm.podStoreSynced = podInformer.Informer().HasSynced

	// Register the function that updates job status.
	jm.updateHandler = jm.updateJobStatus
	// Register the sync job handler: the core of the controller.
	jm.syncHandler = jm.syncJob

	return jm
}
```
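One detail worth calling out: the handlers above enqueue keys of the form namespace/name (enqueueController() essentially uses the standard client-go key function). Below is a small sketch of how such a key is built and split back apart, assuming the client-go and Kubernetes API packages are available in the module.

```go
// Build and split the "namespace/name" work-queue key used by the job controller.
package main

import (
	"fmt"

	batchv1 "k8s.io/api/batch/v1"
	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
	"k8s.io/client-go/tools/cache"
)

func main() {
	job := &batchv1.Job{
		ObjectMeta: metav1.ObjectMeta{Namespace: "default", Name: "pi"},
	}

	// MetaNamespaceKeyFunc is the standard key function: "<namespace>/<name>".
	key, err := cache.MetaNamespaceKeyFunc(job)
	if err != nil {
		panic(err)
	}
	fmt.Println(key) // default/pi

	// syncJob() later splits the key back into its parts.
	ns, name, err := cache.SplitMetaNamespaceKey(key)
	if err != nil {
		panic(err)
	}
	fmt.Println(ns, name) // default pi
}
```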
Run()

Path: pkg/controller/job/job_controller.go
```go
// Run the main goroutine responsible for watching and syncing jobs.
func (jm *JobController) Run(workers int, stopCh <-chan struct{}) {
	defer utilruntime.HandleCrash()
	defer jm.queue.ShutDown()

	glog.Infof("Starting job controller")
	defer glog.Infof("Shutting down job controller")

	// On every start, first wait for the job & pod caches to sync, i.e. for the informer
	// stores feeding the queue to be populated. Every worker pulls its work from the queue,
	// so the workers should only be created once that data is in place.
	if !controller.WaitForCacheSync("job", stopCh, jm.podStoreSynced, jm.jobStoreSynced) {
		return
	}

	// Start the requested number of goroutines.
	// Each goroutine runs worker, sleeps 1s after it returns, and then loops again.
	for i := 0; i < workers; i++ {
		go wait.Until(jm.worker, time.Second, stopCh)
	}

	<-stopCh
}
```
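For readers less familiar with this pattern, here is a dependency-free sketch of the worker model Run() sets up with wait.Until: a fixed number of goroutines, each looping over the work source until the stop channel closes. A plain channel stands in for the rate-limited workqueue here.

```go
// Minimal illustration of Run()'s worker model: start a fixed number of
// goroutines, each repeatedly pulling work until the stop channel is closed.
package main

import (
	"fmt"
	"sync"
	"time"
)

func runWorkers(workers int, queue <-chan string, stopCh <-chan struct{}) {
	var wg sync.WaitGroup
	for i := 0; i < workers; i++ {
		wg.Add(1)
		go func(id int) {
			defer wg.Done()
			for {
				select {
				case <-stopCh:
					return
				case key := <-queue:
					fmt.Printf("worker %d syncing %s\n", id, key)
				}
			}
		}(i)
	}
	wg.Wait()
}

func main() {
	queue := make(chan string, 3)
	stopCh := make(chan struct{})

	for _, key := range []string{"default/pi-1", "default/pi-2", "default/pi-3"} {
		queue <- key
	}
	go func() {
		time.Sleep(100 * time.Millisecond) // let the workers drain the queue
		close(stopCh)                      // the equivalent of ctx.Stop in Run()
	}()

	runWorkers(2, queue, stopCh)
}
```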
Let's look at the concrete worker implementation:
```go
// worker runs a worker thread that just dequeues items, processes them, and marks them done.
// It enforces that the syncHandler is never invoked concurrently with the same key.
func (jm *JobController) worker() {
	for jm.processNextWorkItem() {
	}
}

func (jm *JobController) processNextWorkItem() bool {
	// Pull a job key from the queue.
	// The key has the form: namespace + "/" + name.
	key, quit := jm.queue.Get()
	if quit {
		return false
	}
	defer jm.queue.Done(key)

	// Call the syncJob() registered during initialization.
	// If it succeeds and forget = true, drop the key from the queue's rate limiter.
	forget, err := jm.syncHandler(key.(string))
	if err == nil {
		if forget {
			jm.queue.Forget(key)
		}
		return true
	}

	// If syncJob() failed, log the error.
	// utilruntime.HandleError() remembers when the last error was logged and rate-limits
	// itself to avoid flooding the log.
	utilruntime.HandleError(fmt.Errorf("Error syncing job: %v", err))
	// On failure, put the job key back into the queue to wait for the next sync.
	jm.queue.AddRateLimited(key)

	return true
}
```
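The Forget()/AddRateLimited() pair is what gives a job its retry backoff. Here is a small usage sketch of the rate-limited workqueue on its own, assuming k8s.io/client-go is available; note it uses the generic DefaultControllerRateLimiter rather than the job controller's DefaultJobBackOff/MaxJobBackOff limiter.

```go
// Demonstrates the queue semantics processNextWorkItem() relies on:
// Get/Done for ownership, AddRateLimited to retry with growing backoff,
// Forget to reset the per-key failure counter once a sync succeeds.
package main

import (
	"fmt"

	"k8s.io/client-go/util/workqueue"
)

func main() {
	queue := workqueue.NewNamedRateLimitingQueue(workqueue.DefaultControllerRateLimiter(), "job-demo")
	defer queue.ShutDown()

	queue.Add("default/pi")

	for i := 0; i < 3; i++ {
		key, quit := queue.Get()
		if quit {
			return
		}

		failed := i < 2 // pretend the first two syncs fail
		if failed {
			fmt.Printf("sync %v failed, requeue #%d\n", key, queue.NumRequeues(key)+1)
			queue.AddRateLimited(key) // re-added after an exponentially growing delay
		} else {
			fmt.Printf("sync %v succeeded, forgetting it\n", key)
			queue.Forget(key) // reset NumRequeues so future failures start from a small delay again
		}
		queue.Done(key)
	}
}
```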
syncJob()

The key step in the worker is the call to syncJob(); let's look at what this function actually does:
```go
func (jm *JobController) syncJob(key string) (bool, error) {
	// The usual trick: measure how long each sync takes.
	startTime := time.Now()
	defer func() {
		glog.V(4).Infof("Finished syncing job %q (%v)", key, time.Since(startTime))
	}()

	// Split the key into the job's namespace & name.
	ns, name, err := cache.SplitMetaNamespaceKey(key)
	if err != nil {
		return false, err
	}
	if len(ns) == 0 || len(name) == 0 {
		return false, fmt.Errorf("invalid job key %q: either namespace or name is missing", key)
	}

	// Fetch the job.
	// If it is not found it has been deleted, so drop the key from ControllerExpectations.
	sharedJob, err := jm.jobLister.Jobs(ns).Get(name)
	if err != nil {
		if errors.IsNotFound(err) {
			glog.V(4).Infof("Job has been deleted: %v", key)
			jm.expectations.DeleteExpectations(key)
			return true, nil
		}
		return false, err
	}
	job := *sharedJob

	// Decide whether the job is already finished by checking whether job.Status.Conditions
	// contains "JobComplete" or "JobFailed". If it is finished, return immediately.
	if IsJobFinished(&job) {
		return true, nil
	}

	// Derive how many times this job has already been retried from the number of times its key failed.
	// A job gets 6 retries by default.
	previousRetry := jm.queue.NumRequeues(key)

	// Decide whether this key needs a manageJob() sync. Any of the following qualifies:
	// 1. the key's adds and dels in ControllerExpectations are both <= 0
	// 2. the key's entry in ControllerExpectations has not been updated for more than 5 minutes
	// 3. the key is not present in ControllerExpectations
	// 4. the GetExpectations() call itself failed
	jobNeedsSync := jm.expectations.SatisfiedExpectations(key)

	// Get all pods managed by this job.
	pods, err := jm.getPodsForJob(&job)
	if err != nil {
		return false, err
	}

	// Filter out the active pods.
	activePods := controller.FilterActivePods(pods)
	// Count the active, succeeded and failed pods.
	active := int32(len(activePods))
	succeeded, failed := getStatus(pods)
	conditions := len(job.Status.Conditions)

	// If this is the first time the job runs, set StartTime;
	// and if job.Spec.ActiveDeadlineSeconds is set, re-enqueue the key after that many seconds.
	if job.Status.StartTime == nil {
		now := metav1.Now()
		job.Status.StartTime = &now
		// enqueue a sync to check if job past ActiveDeadlineSeconds
		if job.Spec.ActiveDeadlineSeconds != nil {
			glog.V(4).Infof("Job %s have ActiveDeadlineSeconds will sync after %d seconds",
				key, *job.Spec.ActiveDeadlineSeconds)
			jm.queue.AddAfter(key, time.Duration(*job.Spec.ActiveDeadlineSeconds)*time.Second)
		}
	}

	var manageJobErr error
	jobFailed := false
	var failureReason string
	var failureMessage string

	// Check whether the job has new failed pods.
	jobHaveNewFailure := failed > job.Status.Failed
	// Check whether the retry count has exceeded the expected limit.
	exceedsBackoffLimit := jobHaveNewFailure && (active != *job.Spec.Parallelism) &&
		(int32(previousRetry)+1 > *job.Spec.BackoffLimit)

	// If the job has been retried more than job.Spec.BackoffLimit times (6 by default),
	// mark it failed and record the reason.
	// The retry count also depends on the pod template's restart policy: with
	// "RestartPolicyOnFailure", the retry count is the sum of the RestartCounts of all
	// pods' InitContainerStatuses and ContainerStatuses, and that sum is also checked
	// against BackoffLimit.
	if exceedsBackoffLimit || pastBackoffLimitOnFailure(&job, pods) {
		jobFailed = true
		failureReason = "BackoffLimitExceeded"
		failureMessage = "Job has reached the specified backoff limit"
		// If the job has been running longer than ActiveDeadlineSeconds, mark it failed and record the reason.
	} else if pastActiveDeadline(&job) {
		jobFailed = true
		failureReason = "DeadlineExceeded"
		failureMessage = "Job was active longer than specified deadline"
	}

	// If the job failed, delete all active pods concurrently and wait for them to finish;
	// then update job.Status.Conditions and emit an event with the recorded failure info.
	if jobFailed {
		errCh := make(chan error, active)
		jm.deleteJobPods(&job, activePods, errCh)
		select {
		case manageJobErr = <-errCh:
			if manageJobErr != nil {
				break
			}
		default:
		}

		failed += active
		active = 0
		job.Status.Conditions = append(job.Status.Conditions, newCondition(batch.JobFailed, failureReason, failureMessage))
		jm.recorder.Event(&job, v1.EventTypeWarning, failureReason, failureMessage)
		// The job has not been marked failed.
	} else {
		// If the earlier check decided the job needs a sync and the job has not been deleted,
		// call manageJob(). manageJob() is analyzed separately below.
		if jobNeedsSync && job.DeletionTimestamp == nil {
			active, manageJobErr = jm.manageJob(activePods, succeeded, &job)
		}
		completions := succeeded
		complete := false
		// job.Spec.Completions is the number of pods that must succeed for the job to be complete.
		// If it is unset, a single successful pod is enough; note, however, that if other pods are
		// still running, the job is only marked complete once they have all exited.
		if job.Spec.Completions == nil {
			if succeeded > 0 && active == 0 {
				complete = true
			}
			// If Completions is set, the job is complete once the number of successfully finished
			// pods reaches Completions. Some warning events may also be emitted: for example,
			// pods still active after the required completion count has been reached, or more
			// successes than requested; both are unexpected situations.
		} else {
			if completions >= *job.Spec.Completions {
				complete = true
				if active > 0 {
					jm.recorder.Event(&job, v1.EventTypeWarning, "TooManyActivePods", "Too many active pods running after completion count reached")
				}
				if completions > *job.Spec.Completions {
					jm.recorder.Event(&job, v1.EventTypeWarning, "TooManySucceededPods", "Too many succeeded pods running after completion count reached")
				}
			}
		}
		// If the job completed successfully, update job.Status.Conditions && job.Status.CompletionTime.
		if complete {
			job.Status.Conditions = append(job.Status.Conditions, newCondition(batch.JobComplete, "", ""))
			now := metav1.Now()
			job.Status.CompletionTime = &now
		}
	}

	forget := false
	// If a pod succeeded during this sync, forget this job key.
	if job.Status.Succeeded < succeeded {
		forget = true
	}

	// Update job.Status.
	if job.Status.Active != active || job.Status.Succeeded != succeeded || job.Status.Failed != failed || len(job.Status.Conditions) != conditions {
		job.Status.Active = active
		job.Status.Succeeded = succeeded
		job.Status.Failed = failed

		// If updating the job fails, the job key goes back into the queue.
		if err := jm.updateHandler(&job); err != nil {
			return forget, err
		}

		// If this sync saw new failed pods and the job is not finished yet, re-enqueue the job key.
		if jobHaveNewFailure && !IsJobFinished(&job) {
			// returning an error will re-enqueue Job after the backoff period
			return forget, fmt.Errorf("failed pod(s) detected for job key %q", key)
		}

		// Otherwise forget the job.
		forget = true
	}

	return forget, manageJobErr
}
```
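As a reference for the fields syncJob() keeps consulting, here is a hedged example of constructing a Job object in Go with those knobs set. The values are illustrative only, and the code assumes the k8s.io/api and k8s.io/apimachinery packages are available.

```go
// The Job spec knobs that syncJob() reads: Parallelism (desired active pods),
// Completions (how many successes finish the job; nil means one success is enough),
// BackoffLimit (retry budget, 6 by default) and ActiveDeadlineSeconds (wall-clock limit).
package main

import (
	"fmt"

	batchv1 "k8s.io/api/batch/v1"
	corev1 "k8s.io/api/core/v1"
	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
)

func int32Ptr(i int32) *int32 { return &i }
func int64Ptr(i int64) *int64 { return &i }

func main() {
	job := batchv1.Job{
		ObjectMeta: metav1.ObjectMeta{Namespace: "default", Name: "pi"},
		Spec: batchv1.JobSpec{
			Parallelism:           int32Ptr(2),   // manageJob() keeps at most 2 pods active
			Completions:           int32Ptr(5),   // syncJob() marks the job complete after 5 successes...
			BackoffLimit:          int32Ptr(4),   // ...or failed once retries exceed this limit...
			ActiveDeadlineSeconds: int64Ptr(600), // ...or failed once the job has run longer than this
			Template: corev1.PodTemplateSpec{
				Spec: corev1.PodSpec{
					RestartPolicy: corev1.RestartPolicyNever,
					Containers: []corev1.Container{
						{Name: "pi", Image: "perl", Command: []string{"perl", "-Mbignum=bpi", "-wle", "print bpi(100)"}},
					},
				},
			},
		},
	}
	fmt.Printf("%+v\n", job.Spec)
}
```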
manageJob()

syncJob() relies on a key function, manageJob(), whose main task is to check whether the number of currently active pods matches the Job's configured parallelism and, if not, to adjust it.
The concrete implementation is as follows:
```go
func (jm *JobController) manageJob(activePods []*v1.Pod, succeeded int32, job *batch.Job) (int32, error) {
	var activeLock sync.Mutex
	active := int32(len(activePods))
	parallelism := *job.Spec.Parallelism
	// Build the job key as namespace + "/" + name.
	jobKey, err := controller.KeyFunc(job)
	if err != nil {
		utilruntime.HandleError(fmt.Errorf("Couldn't get key for job %#v: %v", job, err))
		return 0, nil
	}

	var errCh chan error
	// If there are more active pods than the job's configured parallelism, delete the excess
	// active pods concurrently.
	// Note that the pods to delete are chosen by priority:
	// not-ready < ready; unscheduled < scheduled; pending < running.
	// activePods is sorted by that priority first, then deletion starts from the front.
	// If deleting a pod fails, the previously adjusted ControllerExpectations and active values are rolled back.
	if active > parallelism {
		diff := active - parallelism
		errCh = make(chan error, diff)
		jm.expectations.ExpectDeletions(jobKey, int(diff))
		glog.V(4).Infof("Too many pods running job %q, need %d, deleting %d", jobKey, parallelism, diff)

		sort.Sort(controller.ActivePods(activePods))

		active -= diff
		wait := sync.WaitGroup{}
		wait.Add(int(diff))
		for i := int32(0); i < diff; i++ {
			go func(ix int32) {
				defer wait.Done()
				if err := jm.podControl.DeletePod(job.Namespace, activePods[ix].Name, job); err != nil {
					defer utilruntime.HandleError(err)
					glog.V(2).Infof("Failed to delete %v, decrementing expectations for job %q/%q", activePods[ix].Name, job.Namespace, job.Name)
					jm.expectations.DeletionObserved(jobKey)
					activeLock.Lock()
					active++
					activeLock.Unlock()
					errCh <- err
				}
			}(i)
		}
		wait.Wait()

		// If there are fewer active pods than the configured parallelism, first compute the diff,
		// which depends on the Completions and Parallelism settings:
		// 1. job.Spec.Completions == nil && succeeded pods > 0: diff = 0;
		// 2. job.Spec.Completions == nil && succeeded pods == 0: diff = Parallelism;
		// 3. job.Spec.Completions != nil: diff = min(job.Spec.Completions - succeeded - active, parallelism), never negative.
		// The diff is the number of pods still to create. Since it can be very large, pods are
		// created in batches: the first batch creates 1, the second 2, doubling each time,
		// but never more than the remaining diff (diff is reduced by each batch size).
		// If pod creation times out, return immediately;
		// if pod creation fails, roll back the ControllerExpectations adds and active values and
		// skip the remaining, not yet attempted pods.
	} else if active < parallelism {
		wantActive := int32(0)
		if job.Spec.Completions == nil {
			if succeeded > 0 {
				wantActive = active
			} else {
				wantActive = parallelism
			}
		} else {
			wantActive = *job.Spec.Completions - succeeded
			if wantActive > parallelism {
				wantActive = parallelism
			}
		}
		diff := wantActive - active
		if diff < 0 {
			utilruntime.HandleError(fmt.Errorf("More active than wanted: job %q, want %d, have %d", jobKey, wantActive, active))
			diff = 0
		}
		jm.expectations.ExpectCreations(jobKey, int(diff))
		errCh = make(chan error, diff)
		glog.V(4).Infof("Too few pods running job %q, need %d, creating %d", jobKey, wantActive, diff)

		active += diff
		wait := sync.WaitGroup{}

		// Create diff pods in batches.
		for batchSize := int32(integer.IntMin(int(diff), controller.SlowStartInitialBatchSize)); diff > 0; batchSize = integer.Int32Min(2*batchSize, diff) {
			errorCount := len(errCh)
			wait.Add(int(batchSize))
			for i := int32(0); i < batchSize; i++ {
				go func() {
					defer wait.Done()
					err := jm.podControl.CreatePodsWithControllerRef(job.Namespace, &job.Spec.Template, job, metav1.NewControllerRef(job, controllerKind))
					if err != nil && errors.IsTimeout(err) {
						return
					}
					if err != nil {
						defer utilruntime.HandleError(err)
						glog.V(2).Infof("Failed creation, decrementing expectations for job %q/%q", job.Namespace, job.Name)
						jm.expectations.CreationObserved(jobKey)
						activeLock.Lock()
						active--
						activeLock.Unlock()
						errCh <- err
					}
				}()
			}
			wait.Wait()

			// If any pod in this batch failed to be created, skip the remaining, not yet attempted pods.
			// Compute how many pods are skipped and update the ControllerExpectations adds and active values accordingly.
			skippedPods := diff - batchSize
			if errorCount < len(errCh) && skippedPods > 0 {
				glog.V(2).Infof("Slow-start failure. Skipping creation of %d pods, decrementing expectations for job %q/%q", skippedPods, job.Namespace, job.Name)
				active -= skippedPods
				for i := int32(0); i < skippedPods; i++ {
					jm.expectations.CreationObserved(jobKey)
				}
				break
			}
			diff -= batchSize
		}
	}

	select {
	case err := <-errCh:
		// If any error occurred above, return it; the job will be re-enqueued and wait for the next sync.
		if err != nil {
			return active, err
		}
	default:
	}

	return active, nil
}
```
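To make the slow-start creation concrete, here is a standalone sketch that reproduces just the batch-size sequence (SlowStartInitialBatchSize is 1 in the controller): a diff of 13 pods, for example, is created in batches of 1, 2, 4 and 6.

```go
// Reproduces the batch sizes of manageJob()'s slow-start pod creation loop:
// start with 1, double each round, never exceed the remaining diff.
package main

import "fmt"

const slowStartInitialBatchSize = 1

func min32(a, b int32) int32 {
	if a < b {
		return a
	}
	return b
}

func slowStartBatches(diff int32) []int32 {
	var batches []int32
	for batchSize := min32(diff, slowStartInitialBatchSize); diff > 0; batchSize = min32(2*batchSize, diff) {
		batches = append(batches, batchSize)
		diff -= batchSize
	}
	return batches
}

func main() {
	fmt.Println(slowStartBatches(13)) // [1 2 4 6]
	fmt.Println(slowStartBatches(3))  // [1 2]
	fmt.Println(slowStartBatches(50)) // [1 2 4 8 16 19]
}
```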
That concludes the walkthrough of the job controller; a follow-up post will analyze the cronjob controller source code.