当前位置: 首页 > news >正文

Volcano Controller控制器源码解析

Volcano Controller控制器源码解析

本文从源码的角度分析Volcano Controller相关功能的实现。

本篇Volcano版本为v1.8.0。

Volcano项目地址: https://github.com/volcano-sh/volcano

controller命令main入口: cmd/controller-manager/main.go

controller相关代码目录: pkg/controllers
更多文章访问: https://www.cyisme.top

整体实现并不复杂, 而且项目比较简洁、风格一致(与k8s controller代码风格也一致)。可以作为学习开发k8s controller的一个参考。
在这里插入图片描述

代码风格

controller需要实现framework中interface的定义。

type Controller interface {Name() string// 初始化Initialize(opt *ControllerOption) error// 运行Run(stopCh <-chan struct{})
}

Initialize方法作为根据option初始化controller的入口, 像infomer设置、queue设置、cache设置等都在这里完成。

func (jf *jobflowcontroller) Initialize(opt *framework.ControllerOption) error {// clientjf.kubeClient = opt.KubeClientjf.vcClient = opt.VolcanoClient// informerjf.jobFlowInformer = informerfactory.NewSharedInformerFactory(jf.vcClient, 0).Flow().V1alpha1().JobFlows()jf.jobFlowSynced = jf.jobFlowInformer.Informer().HasSyncedjf.jobFlowLister = jf.jobFlowInformer.Lister()jf.jobFlowInformer.Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{AddFunc:    jf.addJobFlow,UpdateFunc: jf.updateJobFlow,})// 参数jf.maxRequeueNum = opt.MaxRequeueNumif jf.maxRequeueNum < 0 {jf.maxRequeueNum = -1}// queuejf.queue = workqueue.NewRateLimitingQueue(workqueue.DefaultControllerRateLimiter())// 入队的工具函数jf.enqueueJobFlow = jf.enqueue// 处理队列中数据的处理函数jf.syncHandler = jf.handleJobFlow// ...
}

Run方法会运行多个goroutine, 执行操作

func (jf *jobflowcontroller) Run(stopCh <-chan struct{}) {defer jf.queue.ShutDown()go jf.jobFlowInformer.Informer().Run(stopCh)go jf.jobTemplateInformer.Informer().Run(stopCh)go jf.jobInformer.Informer().Run(stopCh)cache.WaitForCacheSync(stopCh, jf.jobSynced, jf.jobFlowSynced, jf.jobTemplateSynced)// 使用 k8s pkg中的util , 与k8s controller的风格一致go wait.Until(jf.worker, time.Second, stopCh)klog.Infof("JobFlowController is running ...... ")<-stopCh
}

worker会负责处理队列中的数据, 交给handler处理。 vocalno中所有的controller外层都是这执行逻辑(可能会有细微差别), 具体的handler 是差异化的。所以后面的controller介绍也不会再提这一部分, 会着重handler的实现。

func (jf *jobflowcontroller) worker() {// 代理一层for jf.processNextWorkItem() {}
}func (jf *jobflowcontroller) processNextWorkItem() bool {// 获取数据obj, shutdown := jf.queue.Get()if shutdown {// Stop workingreturn false}defer jf.queue.Done(obj)req, ok := obj.(*apis.FlowRequest)if !ok {klog.Errorf("%v is not a valid queue request struct.", obj)return true}// 具体处理handlererr := jf.syncHandler(req)jf.handleJobFlowErr(err, obj)return true
}

Queue Controller

Queue Controler主要监听三个资源对象:

  • Queue
  • PodGroup
  • Command

控制器会监听他们的状态,用以更新Queue资源的状态,从而实现依据Queue资源的调度。

func (c *queuecontroller) Initialize(opt *framework.ControllerOption) error {// 省略部分代码queueInformer.Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{AddFunc:    c.addQueue,UpdateFunc: c.updateQueue,DeleteFunc: c.deleteQueue,})pgInformer.Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{// 省略部分代码})if utilfeature.DefaultFeatureGate.Enabled(features.QueueCommandSync) {c.cmdInformer = factory.Bus().V1alpha1().Commands()c.cmdInformer.Informer().AddEventHandler(cache.FilteringResourceEventHandler{FilterFunc: func(obj interface{}) bool {switch v := obj.(type) {case *busv1alpha1.Command:return IsQueueReference(v.TargetObject)default:return false}},Handler: cache.ResourceEventHandlerFuncs{AddFunc: c.addCommand,},})c.cmdLister = c.cmdInformer.Lister()c.cmdSynced = c.cmdInformer.Informer().HasSynced}// 省略部分代码
}

监听到的消息会放到队列中, 队列是通过k8s pkg中的WorkQueue实现的。

type queuecontroller struct {// 省略部分代码// ...// queues that need to be updated.queue        workqueue.RateLimitingInterfacecommandQueue workqueue.RateLimitingInterface// queue name -> podgroup namespace/namepodGroups map[string]map[string]struct{}// 省略部分代码// ...
}

queuecontroller.queue接收apis.Request对象作为消息,queuecontroller.commandQueue接收busv1alpha1.Command对象作为消息。

在经过queuecontroller.handlerCommand方法处理后, queuecontroller.commandQueue中的busv1alpha1.Command对象转换成apis.Request事件,放到queuecontroller.queue中统一处理。

func (c *queuecontroller) handleCommand(cmd *busv1alpha1.Command) error {// 接受处理, 删除commanderr := c.vcClient.BusV1alpha1().Commands(cmd.Namespace).Delete(context.TODO(), cmd.Name, metav1.DeleteOptions{})if err != nil {// 省略部分代码}// command对象中会有ownerReference, 从中提取queue对象名称req := &apis.Request{QueueName: cmd.TargetObject.Name,// CommandIssuedEvent是内部事件类型, 用户引发命令时, 会触发该事件Event:     busv1alpha1.CommandIssuedEvent,Action:    busv1alpha1.Action(cmd.Action),}// 将command事件转换成request事件,放到queue中c.enqueueQueue(req)return nil
}

queuecontroller.handleQueuequeuecontroller.queue的事件处理函数,主要是根据request事件的类型,调用不同的处理函数更新Queue资源的状态。

func (c *queuecontroller) handleQueue(req *apis.Request) error {// 这里的queue是k8s中的Queue资源对象queue, err := c.queueLister.Get(req.QueueName)if err != nil {// 省略部分代码}// 根据queue当前的状态, 生成不同执行器queueState := queuestate.NewState(queue)// 执行操作if err := queueState.Execute(req.Action); err != nil {// 省略部分代码}return nil
}

Queue资源有4中状态(QueueState), 四种状态分别对应四种执行器:

  • Open --> openState
  • Closed --> closedState
  • Closing --> closingState
  • Unknown --> unknownState

closeState执行器为例,代码实现如下:(其他的执行器实现类似,不再举例)

type closedState struct {queue *v1beta1.Queue
}
func (cs *closedState) Execute(action v1alpha1.Action) error {switch action {// 开启动作case v1alpha1.OpenQueueAction:return OpenQueue(cs.queue, func(status *v1beta1.QueueStatus, podGroupList []string) {status.State = v1beta1.QueueStateOpen})// 关闭动作case v1alpha1.CloseQueueAction:return SyncQueue(cs.queue, func(status *v1beta1.QueueStatus, podGroupList []string) {status.State = v1beta1.QueueStateClosed})// 默认动作default:return SyncQueue(cs.queue, func(status *v1beta1.QueueStatus, podGroupList []string) {specState := cs.queue.Status.Stateif specState == v1beta1.QueueStateOpen {status.State = v1beta1.QueueStateOpenreturn}if specState == v1beta1.QueueStateClosed {status.State = v1beta1.QueueStateClosedreturn}status.State = v1beta1.QueueStateUnknown})}
}

Queue资源volcano中有4种动作(Action), 执行器中将根据动作执行不同的操作:

  • EnqueueJob (这个动作执行器中没有用到)
  • SyncQueue (这个动作执行器中执行默认操作)
  • OpenQueue
  • CloseQueue

实际上, 对应这三个动作会有三个处理函数,他们被定义为QueueActionFn类型

type QueueActionFn func(queue *v1beta1.Queue, fn UpdateQueueStatusFn) error

因为Queue资源可以重复的Close或者Open, 所以其实执行器中并没有拦截或者限制这种操作, 而是比较简单的对状态进行重置。

操作调用的函数如下:

  • closedStateclosingState状态执行器中
OpenQueueAction
OpenQueue
CloseQueueAction
SyncQueue
Other
  • openState状态执行器中
OpenQueueAction
SyncQueue
CloseQueueAction
CloseQueue
Other
  • unknownState状态执行器中
OpenQueueAction
OpenQueue
CloseQueueAction
CloseQueue
Other
SyncQueue

可以看出, 执行逻辑:

  • 如果当前状态与预期状态一致, 则调用SyncQueue同步状态
  • 如果当前状态与预期状态不一致, 则调用OpenQueue或者CloseQueue更新状态
  • 如果状态未知, 则调用SyncQueue同步状态

然后来看一下具体的函数实现

// syncQueue主要是更新queue中podgroup的状态计数
func (c *queuecontroller) syncQueue(queue *schedulingv1beta1.Queue, updateStateFn state.UpdateQueueStatusFn) error {// 获取queue中的podgrouppodGroups := c.getPodGroups(queue.Name)queueStatus := schedulingv1beta1.QueueStatus{}for _, pgKey := range podGroups {// 获取podgroup对象pg, err := c.pgLister.PodGroups(ns).Get(name)// 更新计数器switch pg.Status.Phase {case schedulingv1beta1.PodGroupPending:queueStatus.Pending++case schedulingv1beta1.PodGroupRunning:queueStatus.Running++case schedulingv1beta1.PodGroupUnknown:queueStatus.Unknown++case schedulingv1beta1.PodGroupInqueue:queueStatus.Inqueue++}}// updateStateFn是在执行器中定义的函数, 用于更新queue的状态if updateStateFn != nil {updateStateFn(&queueStatus, podGroups)} else {queueStatus.State = queue.Status.State}// 省略部分代码// ...// 调用api更新queue的状态if _, err := c.vcClient.SchedulingV1beta1().Queues().UpdateStatus(context.TODO(), newQueue, metav1.UpdateOptions{}); err != nil {}return nil
}
func (c *queuecontroller) openQueue(queue *schedulingv1beta1.Queue, updateStateFn state.UpdateQueueStatusFn) error {newQueue := queue.DeepCopy()newQueue.Status.State = schedulingv1beta1.QueueStateOpen// 这里调用Update没有看懂, copy出来的对应应该除了状态,其他的都是一样的// 而Update方法是更新对象, 而不是更新状态if queue.Status.State != newQueue.Status.State {if _, err := c.vcClient.SchedulingV1beta1().Queues().Update(context.TODO(), newQueue, metav1.UpdateOptions{}); err != nil {c.recorder.Event(newQueue, v1.EventTypeWarning, string(v1alpha1.OpenQueueAction),fmt.Sprintf("Open queue failed for %v", err))return err}c.recorder.Event(newQueue, v1.EventTypeNormal, string(v1alpha1.OpenQueueAction), "Open queue succeed")} else {return nil}// 获取queue对象q, err := c.vcClient.SchedulingV1beta1().Queues().Get(context.TODO(), newQueue.Name, metav1.GetOptions{})newQueue = q.DeepCopy()// 执行操作if updateStateFn != nil {updateStateFn(&newQueue.Status, nil)} else {return fmt.Errorf("internal error, update state function should be provided")}// 调用api更新queue的状态if queue.Status.State != newQueue.Status.State {if _, err := c.vcClient.SchedulingV1beta1().Queues().UpdateStatus(context.TODO(), newQueue, metav1.UpdateOptions{}); err != nil {}}return nil
}
// closeQueue与之类似, 不再举例

PodGroup Controller

PodGroup Controller比较简单, 它负责为未指定PodGroup的Pod分配PodGroup。

func (pg *pgcontroller) processNextReq() bool {// 省略部分代码// 获取pod对象pod, err := pg.podLister.Pods(req.podNamespace).Get(req.podName)// 根据调度器名称过滤if !commonutil.Contains(pg.schedulerNames, pod.Spec.SchedulerName) {return true}// 如果pod已经有podgroup, 则不再处理if pod.Annotations != nil && pod.Annotations[scheduling.KubeGroupNameAnnotationKey] != "" {return true}// 为pod分配podgroupif err := pg.createNormalPodPGIfNotExist(pod); err != nil {// AddRateLimited将在一段时间后重新添加req到队列中pg.queue.AddRateLimited(req)return true}// 省略部分代码
}
func (pg *pgcontroller) createNormalPodPGIfNotExist(pod *v1.Pod) error {// pgname将以”podgroup-“开头pgName := helpers.GeneratePodgroupName(pod)if _, err := pg.pgLister.PodGroups(pod.Namespace).Get(pgName); err != nil {// podgroup不存在, 则创建if !apierrors.IsNotFound(err) {return err}// 省略了一些从pod中继承赋值的代码obj := &scheduling.PodGroup{ObjectMeta: metav1.ObjectMeta{// podgroup的ownerReference是podOwnerReferences: newPGOwnerReferences(pod),},Spec: scheduling.PodGroupSpec{// 最小成员数为1MinMember:         1,},Status: scheduling.PodGroupStatus{// 状态为pendingPhase: scheduling.PodGroupPending,},}// 继承pod的owner信息,写入到annotationspg.inheritUpperAnnotations(pod, obj)// 继承pod annotationsif queueName, ok := pod.Annotations[scheduling.QueueNameAnnotationKey]; ok {obj.Spec.Queue = queueName}// 省略annotations继承的代码// ...// 创建podgroupif _, err := pg.vcClient.SchedulingV1beta1().PodGroups(pod.Namespace).Create(context.TODO(), obj, metav1.CreateOptions{}); err != nil {}}// 如果存在pg,则更新pod的annotationsreturn pg.updatePodAnnotations(pod, pgName)
}

JobFlow Controller

JobFlow是在volcano 1.8之后引入的CRD对象, 它配合JobTemplate使用,用于vcjob任务的编排。

JobFlow Controller主要监听JobFlowJob两个对象的变化, 并更新JobFlow的状态。

func (jf *jobflowcontroller) Initialize(opt *framework.ControllerOption) error {// ...jf.jobFlowInformer.Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{AddFunc:    jf.addJobFlow,UpdateFunc: jf.updateJobFlow,})jf.jobInformer.Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{UpdateFunc: jf.updateJob,})// 省略部分代码
}

Job更新时,会判断是否属于JobFlow, 如果是,则将关联的JobFlow加入到队列。

func (jf *jobflowcontroller) updateJob(oldObj, newObj interface{}) {// ...if newJob.ResourceVersion == oldJob.ResourceVersion {return}jobFlowName := getJobFlowNameByJob(newJob)if jobFlowName == "" {return}req := &apis.FlowRequest{Namespace:   newJob.Namespace,JobFlowName: jobFlowName,Action:      jobflowv1alpha1.SyncJobFlowAction,Event:       jobflowv1alpha1.OutOfSyncEvent,}jf.queue.Add(req)
}

放入队列的apis.FlowRequest对象,最终会由handleJobFlow函数处理, 然后根据当前JobFlow的状态,生成并调用不同执行器。(这里的运行逻辑和Queue的差不多)

func (jf *jobflowcontroller) handleJobFlow(req *apis.FlowRequest) error {// 省略部分代码// ...jobflow, err := jf.jobFlowLister.JobFlows(req.Namespace).Get(req.JobFlowName)// 根据jobflow的状态, 生成不同的执行器jobFlowState := jobflowstate.NewState(jobflow)if err := jobFlowState.Execute(req.Action); err != nil {}return nil
}

JobFlow有5种状态(Flow Phase), 分别对应5种执行器::

  • Succeed --> succeedState
  • Terminating --> terminatingState (这个状态的执行器并没有实际动作,因为资源即将释放)
  • Failed --> failedState (这个状态的执行器并没有实际动作,因为状态异常)
  • Running --> runningState
  • Pending --> pendingState

JobFlow目前只有1种动作SyncJobFlow(Action), 由SyncJobFlow函数执行具体操作。

func (jf *jobflowcontroller) syncJobFlow(jobFlow *v1alpha1flow.JobFlow, updateStateFn state.UpdateJobFlowStatusFn) error {// ...// 如果当前jobflow的状态为succeed, 且job的保留策略为delete, 则删除所有由jobflow创建的jobif jobFlow.Spec.JobRetainPolicy == v1alpha1flow.Delete && jobFlow.Status.State.Phase == v1alpha1flow.Succeed {if err := jf.deleteAllJobsCreatedByJobFlow(jobFlow); err != nil {}return nil}// 根据jobflow中声明的jobtemplate创建job, 声明顺序即为创建顺序if err := jf.deployJob(jobFlow); err != nil {}// 获取jobflow下所有job的状态jobFlowStatus, err := jf.getAllJobStatus(jobFlow)if err != nil {return err}// 更新jobflow的状态jobFlow.Status = *jobFlowStatusupdateStateFn(&jobFlow.Status, len(jobFlow.Spec.Flows))_, err = jf.vcClient.FlowV1alpha1().JobFlows(jobFlow.Namespace).UpdateStatus(context.Background(), jobFlow, metav1.UpdateOptions{})return nil
}
func (jf *jobflowcontroller) deployJob(jobFlow *v1alpha1flow.JobFlow) error {for _, flow := range jobFlow.Spec.Flows {jobName := getJobName(jobFlow.Name, flow.Name)if _, err := jf.jobLister.Jobs(jobFlow.Namespace).Get(jobName); err != nil {if errors.IsNotFound(err) {// 如果job没有依赖, 则直接创建if flow.DependsOn == nil || flow.DependsOn.Targets == nil {// createJob根据jobtemplat创建job// 创建已经存在的job, 不会报错if err := jf.createJob(jobFlow, flow); err != nil {return err}} else {// 有依赖则判断依赖的job是否已经完成// 任何一个依赖的job未完成都不会创建flag, err := jf.judge(jobFlow, flow)if flag {if err := jf.createJob(jobFlow, flow); err != nil {return err}}}continue}return err}}return nil
}

Job Controller

Jobvolcano中的核心资源对象, 为了避免与k8s中的Job对象混淆, 也会称之为vcjob或者vj

Job Controller监听多个资源对象的变更事件:

func (cc *jobcontroller) Initialize(opt *framework.ControllerOption) error {// ...cc.jobInformer.Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{AddFunc:    cc.addJob,UpdateFunc: cc.updateJob,DeleteFunc: cc.deleteJob,})cc.cmdInformer.Informer().AddEventHandler(cache.FilteringResourceEventHandler{FilterFunc: func(obj interface{}) bool {switch v := obj.(type) {case *busv1alpha1.Command:if v.TargetObject != nil &&v.TargetObject.APIVersion == batchv1alpha1.SchemeGroupVersion.String() &&v.TargetObject.Kind == "Job" {return true}return falsedefault:return false}},Handler: cache.ResourceEventHandlerFuncs{AddFunc: cc.addCommand,},},)cc.podInformer.Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{AddFunc:    cc.addPod,UpdateFunc: cc.updatePod,DeleteFunc: cc.deletePod,})cc.pgInformer.Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{UpdateFunc: cc.updatePodGroup,})// ...
}

vcjob的处理量会比较大, 所以Job Controller会启动多个worker来处理事件, 每个worker会有属于自己的queue

func (cc *jobcontroller) Run(stopCh <-chan struct{}) {// ...// commandQueue是用于处理busv1alpha1.Command对象的队列// 与Queue Controller中类似, 最终会转换成apis.Request对象, 放入queue中go wait.Until(cc.handleCommands, 0, stopCh)var i uint32// 启动多个workerfor i = 0; i < cc.workers; i++ {go func(num uint32) {wait.Until(func() {cc.worker(num)},time.Second,stopCh)}(i)}// cache用于缓存资源状态go cc.cache.Run(stopCh)// 处理错误taskgo wait.Until(cc.processResyncTask, 0, stopCh)// ...
}

新的事件会通过getWokerQueue函数来获取对应的queue, 然后放入队列中。

func (cc *jobcontroller) getWorkerQueue(key string) workqueue.RateLimitingInterface {// ...hashVal = fnv.New32()hashVal.Write([]byte(key))val = hashVal.Sum32()// 通过hash值取模来获取queuequeue := cc.queueList[val%cc.workers]return queue
}

command事件转换成request事件的过程与Queue Controller类似, 这里不再赘述。 queue中的数据处理是由processNextReq函数接收的。

func (cc *jobcontroller) processNextReq(count uint32) bool {// 获取queue, queue的数量与worker数量相同并一一对应queue := cc.queueList[count]req := obj.(apis.Request)key := jobcache.JobKeyByReq(&req)if !cc.belongsToThisRoutine(key, count) {// 这里做了校验, 如果key不属于当前worker, 则重新放入queue中queueLocal := cc.getWorkerQueue(key)queueLocal.Add(req)return true}jobInfo, err := cc.cache.Get(key)// state.NewState 这个名字见过很多次了, 用于生成执行器st := state.NewState(jobInfo)if st == nil {return true}// 获取当前需要执行的动作action := applyPolicies(jobInfo.Job, &req)// 非同步动作, 记录事件if action != busv1alpha1.SyncJobAction {cc.recordJobEvent(jobInfo.Job.Namespace, jobInfo.Job.Name, batchv1alpha1.ExecuteAction, fmt.Sprintf("Start to execute action %s ", action))}// 执行动作if err := st.Execute(action); err != nil {// 如果执行失败, 则根据重试次数, 决定是否重新放入queue中。// maxRequeueNum -1, 表示无限重试if cc.maxRequeueNum == -1 || queue.NumRequeues(req) < cc.maxRequeueNum {queue.AddRateLimited(req)return true}}// 如果执行成功, 则删除queue中的事件queue.Forget(req)return true
}

vcjob有10种状态(JobPhase), 对应8种执行器:

  • Pending --> pendingState
  • Aborting --> abortingState
  • Aborted --> abortedState
  • Running --> runningState
  • Restarting --> restartingState
  • Completing --> completingState
  • Terminating --> terminatingState
  • Terminated、Failed、Completed --> terminatedState

abortedState为例, 代码实现如下:

func (as *abortedState) Execute(action v1alpha1.Action) error {switch action {case v1alpha1.ResumeJobAction:return KillJob(as.job, PodRetainPhaseSoft, func(status *vcbatch.JobStatus) bool {status.State.Phase = vcbatch.Restartingstatus.RetryCount++return true})default:return KillJob(as.job, PodRetainPhaseSoft, nil)}
}

vcjob有11种动作(Action), 执行器中将根据动作执行不同的操作:

  • AbortJob 如果设置此操作,整个工作将被中止;所有作业的Pod都将被驱逐,并且不会重新创建任何Pod
  • RestartJob 如果设置了此操作,整个作业将重新启动
  • RestartTask 如果设置此操作,则仅重新启动任务;默认操作。
  • TerminateJob 如果设置了此操作,整个工作将被终止并且无法恢复;所有作业的Pod都将被驱逐,并且不会重新创建任何Pod。
  • CompleteJob 如果设置此操作,未完成的pod将被杀死,作业完成。
  • ResumeJob 恢复中止的工作。
  • SyncJob 同步Job/Pod状态的操作。(内部动作)
  • EnqueueJob 同步作业入队状态的操作。(内部动作)
  • SyncQueue 同步队列状态的操作。(内部动作)
  • OpenQueue 打开队列的操作。(内部动作)
  • CloseQueue 关闭队列的操作。(内部动作)

实际上, 对应这些动作会有不同的处理函数,他们被定义为ActionFn类型和KillActionFn类型。 这两个类型被声明为SyncJobKillJob的函数,并被执行器调用。

type ActionFn func(job *apis.JobInfo, fn UpdateStatusFn) error
type KillActionFn func(job *apis.JobInfo, podRetainPhase PhaseMap, fn UpdateStatusFn) error
var (// SyncJob将根据Job的规范创建或删除Pod。SyncJob ActionFn// KillJob 将杀死状态不在podRetainPhase中的pod.KillJob KillActionFn
)

操作调用的函数如下:(虽然不同动作调用的操作可能相同, 但是会更新不同的状态信息)

  • pendingStaterunningState状态执行器中:
RestartJobAction
KillJob
AbortJobAction
CompleteJobAction
TerminateJobAction
Other
SyncJob
  • restartingState状态执行器中, 直接调用KillJob
  • finishedState为最终状态, 所以不会执行任何动作。
  • terminatingState 直接调用KillJob
  • abortingStateabortedState状态执行器中:
ResumeJobAction
KillJob
Other
  • completingState直接调用KillJob

可以看出, 执行逻辑:

  • 如果是干预vcjob状态的动作, 则调用KillJob
  • 反之, 则调用SyncJob

然后来看一下具体实现函数。

killJob

killJob对应删除pod的操作。

遍历pod执行动作
设置job状态
执行删除插件
更新job状态
删除podgroup
func (cc *jobcontroller) killJob(jobInfo *apis.JobInfo, podRetainPhase state.PhaseMap, updateStatus state.UpdateStatusFn) error {// job已经处于删除状态, 则不再处理if job.DeletionTimestamp != nil {return nil}// 状态计数器, 用于更新job的状态var pending, running, terminating, succeeded, failed, unknown int32taskStatusCount := make(map[string]batch.TaskState)for _, pods := range jobInfo.Pods {for _, pod := range pods {total++if pod.DeletionTimestamp != nil {// pod处于删除状态, 则不再处理continue}maxRetry := job.Spec.MaxRetrylastRetry := false// 判断是否是最后一次重试if job.Status.RetryCount >= maxRetry-1 {lastRetry = true}// 如果是最后一次重试, 则保留失败和成功的podretainPhase := podRetainPhaseif lastRetry {// var PodRetainPhaseSoft = PhaseMap{//     v1.PodSucceeded: {},//     v1.PodFailed:    {},// }retainPhase = state.PodRetainPhaseSoft}_, retain := retainPhase[pod.Status.Phase]// 如果不保留pod, 则删除podif !retain {err := cc.deleteJobPod(job.Name, pod)if err == nil {terminating++continue}// 失败放入重试队列errs = append(errs, err)cc.resyncTask(pod)}// 更新状态计数器classifyAndAddUpPodBaseOnPhase(pod, &pending, &running, &succeeded, &failed, &unknown)calcPodStatus(pod, taskStatusCount)}}if len(errs) != 0 {return fmt.Errorf("failed to kill %d pods of %d", len(errs), total)}// 更新job的状态计数job = job.DeepCopy()job.Status.Version++job.Status.Pending = pendingjob.Status.Running = runningjob.Status.Succeeded = succeededjob.Status.Failed = failedjob.Status.Terminating = terminatingjob.Status.Unknown = unknownjob.Status.TaskStatusCount = taskStatusCount// 更新运行持续时间job.Status.RunningDuration = &metav1.Duration{Duration: time.Since(jobInfo.Job.CreationTimestamp.Time)}// 更新job的状态if updateStatus != nil {if updateStatus(&job.Status) {job.Status.State.LastTransitionTime = metav1.Now()jobCondition := newCondition(job.Status.State.Phase, &job.Status.State.LastTransitionTime)job.Status.Conditions = append(job.Status.Conditions, jobCondition)}}// 执行删除插件if err := cc.pluginOnJobDelete(job); err != nil {return err}// 调用api更新job的状态newJob, err := cc.vcClient.BatchV1alpha1().Jobs(job.Namespace).UpdateStatus(context.TODO(), job, metav1.UpdateOptions{})if err != nil {return err}if e := cc.cache.Update(newJob); e != nil {return e}// 删除podgrouppgName := job.Name + "-" + string(job.UID)if err := cc.vcClient.SchedulingV1beta1().PodGroups(job.Namespace).Delete(context.TODO(), pgName, metav1.DeleteOptions{}); err != nil {if !apierrors.IsNotFound(err) {return err}}return nil
}

syncJob

syncJob对应创建pod的操作。

遍历task统计add/del的pod
创建pod
删除pod
更新job状态
func (cc *jobcontroller) syncJob(jobInfo *apis.JobInfo, updateStatus state.UpdateStatusFn) error {if jobInfo.Job.DeletionTimestamp != nil {return nil}// ...// 获取job的queue信息queueInfo, err := cc.GetQueueInfo(job.Spec.Queue)if err != nil {return err}var jobForwarding bool// ExtendClusters 这个属性没有找到介绍, 好像只在这里用到了if len(queueInfo.Spec.ExtendClusters) != 0 {jobForwarding = trueif len(job.Annotations) == 0 {job.Annotations = make(map[string]string)}job.Annotations[batch.JobForwardingKey] = "true"job, err = cc.vcClient.BatchV1alpha1().Jobs(job.Namespace).Update(context.TODO(), job, metav1.UpdateOptions{})if err != nil {return err}}// 初始化jobif !isInitiated(job) {// initiateJob中会更新job状态、调用add插件、更新podgroupif job, err = cc.initiateJob(job); err != nil {return err}} else {// initOnJobUpdate会调用add插件、更新podgroupif err = cc.initOnJobUpdate(job); err != nil {return err}}// ... 省略 queueInfo.Spec.ExtendClusters 的处理var syncTask boolpgName := job.Name + "-" + string(job.UID)if pg, _ := cc.pgLister.PodGroups(job.Namespace).Get(pgName); pg != nil {if pg.Status.Phase != "" && pg.Status.Phase != scheduling.PodGroupPending {syncTask = true}// ...}var jobCondition batch.JobCondition// 如果包含刚创建的podgroup, 则更新job状态if !syncTask {if updateStatus != nil {if updateStatus(&job.Status) {job.Status.State.LastTransitionTime = metav1.Now()jobCondition = newCondition(job.Status.State.Phase, &job.Status.State.LastTransitionTime)job.Status.Conditions = append(job.Status.Conditions, jobCondition)}}newJob, err := cc.vcClient.BatchV1alpha1().Jobs(job.Namespace).UpdateStatus(context.TODO(), job, metav1.UpdateOptions{})// ...return nil}// ... 省略一些计数声明代码// ...waitCreationGroup := sync.WaitGroup{}// 遍历job中的taskfor _, ts := range job.Spec.Tasks {// ...var podToCreateEachTask []*v1.Pod// 根据副本数, 创建或删除podfor i := 0; i < int(ts.Replicas); i++ {podName := fmt.Sprintf(jobhelpers.PodNameFmt, job.Name, name, i)if pod, found := pods[podName]; !found {newPod := createJobPod(job, tc, ts.TopologyPolicy, i, jobForwarding)if err := cc.pluginOnPodCreate(job, newPod); err != nil {return err}podToCreateEachTask = append(podToCreateEachTask, newPod)waitCreationGroup.Add(1)} else {delete(pods, podName)if pod.DeletionTimestamp != nil {atomic.AddInt32(&terminating, 1)continue}// 更新状态计数器classifyAndAddUpPodBaseOnPhase(pod, &pending, &running, &succeeded, &failed, &unknown)calcPodStatus(pod, taskStatusCount)}}// 统计需要创建和删除的podpodToCreate[ts.Name] = podToCreateEachTaskfor _, pod := range pods {podToDelete = append(podToDelete, pod)}}// 创建podfor taskName, podToCreateEachTask := range podToCreate {if len(podToCreateEachTask) == 0 {continue}go func(taskName string, podToCreateEachTask []*v1.Pod) {taskIndex := jobhelpers.GetTasklndexUnderJob(taskName, job)if job.Spec.Tasks[taskIndex].DependsOn != nil {// 统一判断依赖关系是否满足需求, 不满足则不创建podif !cc.waitDependsOnTaskMeetCondition(taskName, taskIndex, podToCreateEachTask, job) {for _, pod := range podToCreateEachTask {go func(pod *v1.Pod) {defer waitCreationGroup.Done()}(pod)}return}}// 执行创建for _, pod := range podToCreateEachTask {go func(pod *v1.Pod) {defer waitCreationGroup.Done()newPod, err := cc.kubeClient.CoreV1().Pods(pod.Namespace).Create(context.TODO(), pod, metav1.CreateOptions{})if err != nil && !apierrors.IsAlreadyExists(err) {appendError(&creationErrs, fmt.Errorf("failed to create pod %s, err: %#v", pod.Name, err))} else {classifyAndAddUpPodBaseOnPhase(newPod, &pending, &running, &succeeded, &failed, &unknown)calcPodStatus(pod, taskStatusCount)}}(pod)}}(taskName, podToCreateEachTask)}// 等待创建完成waitCreationGroup.Wait()if len(creationErrs) != 0 {return fmt.Errorf("failed to create %d pods of %d", len(creationErrs), len(podToCreate))}// 删除podfor _, pod := range podToDelete {go func(pod *v1.Pod) {defer waitDeletionGroup.Done()err := cc.deleteJobPod(job.Name, pod)if err != nil {appendError(&deletionErrs, err)cc.resyncTask(pod)} else {klog.V(3).Infof("Deleted Task <%s> of Job <%s/%s>",pod.Name, job.Namespace, job.Name)atomic.AddInt32(&terminating, 1)}}(pod)}// 等待删除完成waitDeletionGroup.Wait()if len(deletionErrs) != 0 {return fmt.Errorf("failed to delete %d pods of %d", len(deletionErrs), len(podToDelete))}job.Status = batch.JobStatus{State: job.Status.State,Pending:             pending,Running:             running,Succeeded:           succeeded,Failed:              failed,Terminating:         terminating,Unknown:             unknown,Version:             job.Status.Version,MinAvailable:        job.Spec.MinAvailable,TaskStatusCount:     taskStatusCount,ControlledResources: job.Status.ControlledResources,Conditions:          job.Status.Conditions,RetryCount:          job.Status.RetryCount,}// 更新job状态if updateStatus != nil && updateStatus(&job.Status) {job.Status.State.LastTransitionTime = metav1.Now()jobCondition = newCondition(job.Status.State.Phase, &job.Status.State.LastTransitionTime)job.Status.Conditions = append(job.Status.Conditions, jobCondition)}// 调用api更新job状态newJob, err := cc.vcClient.BatchV1alpha1().Jobs(job.Namespace).UpdateStatus(context.TODO(), job, metav1.UpdateOptions{})if err != nil {klog.Errorf("Failed to update status of Job %v/%v: %v",job.Namespace, job.Name, err)return err}if e := cc.cache.Update(newJob); e != nil {return e}return nil
}

其他控制器

其他一些控制器因为逻辑比较简单,就不再从代码解析了:

  • jobTemplate controller 监听vcjobjobtemplate, 用于更新jobtemplate 状态中的JobDependsOnList, 即有哪些vcjob依赖于该jobtemplatejobTemplate被官方称之为vcjob的套壳(jobTemplate.spec = vcjob.spec), 目的是为了职责区分。
  • gc controller 监听具有.spec.ttlSecondsAfterFinished属性的vcjob, ttl过期则删除job

相关文章:

Volcano Controller控制器源码解析

Volcano Controller控制器源码解析 本文从源码的角度分析Volcano Controller相关功能的实现。 本篇Volcano版本为v1.8.0。 Volcano项目地址: https://github.com/volcano-sh/volcano controller命令main入口: cmd/controller-manager/main.go controller相关代码目录: pkg/co…...

开源协议简介和选择

软件国产化已经提到日程上了&#xff0c;先来研究一下开源协议。 引言 在追求“自由”的开源软件领域的同时不能忽视程序员的权益。为了激发程序员的创造力&#xff0c;现今世界上有超过60种的开源许可协议被开源促进组织&#xff08;Open Source Initiative&#xff09;所认可…...

大创项目推荐 深度学习卫星遥感图像检测与识别 -opencv python 目标检测

文章目录 0 前言1 课题背景2 实现效果3 Yolov5算法4 数据处理和训练5 最后 0 前言 &#x1f525; 优质竞赛项目系列&#xff0c;今天要分享的是 &#x1f6a9; **深度学习卫星遥感图像检测与识别 ** 该项目较为新颖&#xff0c;适合作为竞赛课题方向&#xff0c;学长非常推荐…...

pod的环节

pod 是k8s当中最小的资源管理组件 Pod也是最小化运行容器化的应用的资源管理对象 Pod是一个抽象化的概念&#xff0c;可以理解为一个或多个容器化的集合 在一个pod当中运行一个容器&#xff0c;是最常用的方式 在一个pod当中同时运行多个容器&#xff0c;在一个pod当中可以…...

Unity | Shader基础知识番外(向量数学知识速成)

目录 一、向量定义 二、计算向量 三、向量的加法&#xff08;连续行走&#xff09; 四、向量的长度 五、单位向量 六、向量的点积 1 计算 2 作用 七、向量的叉乘 1 承上启下 2 叉乘结论 3 叉乘的计算&#xff08;这里看不懂就百度叉乘计算&#xff09; 八、欢迎收…...

一个小白的微不足道的见解关于未来

随着科技的不断发展&#xff0c;IT行业日益壮大&#xff0c;运维工程师在其中扮演着至关重要的角色。他们负责维护和管理企业的技术基础设施&#xff0c;确保系统的正常运行。然而&#xff0c;随着技术的进步和行业的变化&#xff0c;运维工程师的未来将面临着一系列挑战和机遇…...

图的遍历(搜索)算法(深度优先算法DFS和广度优先算法BFS)

一、图的遍历的定义&#xff1a; 从图的某个顶点出发访问遍图中所有顶点&#xff0c;且每个顶点仅被访问一次。&#xff08;连通图与非连通图&#xff09; 二、深度优先遍历&#xff08;DFS&#xff09;&#xff1b; 1、访问指定的起始顶点&#xff1b; 2、若当前访问的顶点…...

抖店做不起来?新手常见起店失败问题总结,看下你中了几条?

我是王路飞。 能看到这篇文章的&#xff0c;肯定是处境符合标题内容了。 抖店的门槛很低&#xff0c;运营思路其实也不算难&#xff0c;但就是很多新手做不起来。 这中间&#xff0c;可能跟平台、项目没什么关系&#xff0c;而是跟你自己有关系&#xff0c;走错了方向&#…...

【每日面试题】精选java面试题之redis

Redis是什么&#xff1f;为什么要使用Redis&#xff1f; Redis是一个开源的高性能键值对存储数据库。它提供了多种数据结构&#xff0c;包括字符串、列表、集合、有序集合、哈希表等。Redis具有快速、可扩展、持久化、支持多种数据结构等特点&#xff0c;适用于缓存、消息队列…...

OSCP 靶场 - Vault

端口扫描 nmap nmap -O 192.168.162.172 smb枚举 smbmap(kali自带) //枚举GUEST用户可以使用的目录 smbmap -u GUEST -H 192.168.162.172 NTLMrelay—smbrelay 1.制作钓鱼文件 使用GitHub - xct/hashgrab: generate payloads that force authentication against an attacker…...

uniapp子组件向父组件传值

目录 子组件向父组件传值子组件1子组件2 父组件最后 子组件向父组件传值 子组件1 <template><view class"content"><view v-for"(item,index) in list" :key"index">{{item}}</view></view> </template>&…...

过滤特殊 微信昵称

$nickName preg_replace(/[\xf0-\xf7].{3}/, , $userData[nickName]);...

LLM、AGI、多模态AI 篇一:开源大语言模型简记

文章目录 系列开源大模型LlamaChinese-LLaMA-AlpacaLlama2-ChineseLinlyYaYiChatGLMtransformersGPT-3(未完全开源)BERTT5QwenBELLEMossBaichuan...

微信小程序中获取用户当前位置的解决方案

微信小程序中获取用户当前位置的解决方案 1 概述 微信小程序有时需要获取用户当前位置&#xff0c;以便为用户提供基于位置信息的服务&#xff08;附近美食、出行方案等&#xff09;。 获取用户当前位置的前提是用户手机需要打开 GPS 定位开关&#xff1b;其次&#xff0c;微…...

Vue3-35-路由-路由守卫的简单认识

什么是路由守卫 路由守卫&#xff0c;就是在 路由跳转 的过程中&#xff0c; 可以进行一些拦截&#xff0c;做一些逻辑判断&#xff0c; 控制该路由是否可以正常跳转的函数。常用的路由守卫有三个 &#xff1a; beforeEach() : 前置守卫&#xff0c;在路由 跳转前 就会被拦截&…...

制药企业符合CSV验证需要注意什么?

在制药行业中&#xff0c;计算机化系统验证&#xff08;CSV&#xff09;是确保生产过程的合规性和数据完整性的关键要素。通过CSV验证&#xff0c;制药企业可以保证其计算机化系统的可靠性和合规性&#xff0c;从而确保产品质量和患者安全。然而&#xff0c;符合CSV验证并不是一…...

再谈动态SQL

专栏精选 引入Mybatis Mybatis的快速入门 Mybatis的增删改查扩展功能说明 mapper映射的参数和结果 Mybatis复杂类型的结果映射 Mybatis基于注解的结果映射 Mybatis枚举类型处理和类型处理器 再谈动态SQL Mybatis配置入门 Mybatis行为配置之Ⅰ—缓存 Mybatis行为配置…...

【数据结构】树

一.二叉树的基本概念和性质&#xff1a; 1.二叉树的递归定义&#xff1a; 二叉树或为空树&#xff0c;或是由一个根结点加上两棵分别称为左子树和右子树的、互不相交的二叉树组成 2.二叉树的特点&#xff1a; &#xff08;1&#xff09;每个结点最多只有两棵子树&#xff0…...

【Midjourney】AI绘画新手教程(一)登录和创建服务器,生成第一幅画作

一、登录Discord 1、访问Discord官网 使用柯學尚网&#xff08;亲测非必须&#xff0c;可加快响应速度&#xff09;访问Discord官方网址&#xff1a;https://discord.com 选择“在您的浏览器中打开Discord” 然后&#xff0c;注册帐号、购买套餐等&#xff0c;在此不做缀述。…...

对比 PyTorch 和 TensorFlow:选择适合你的深度学习框架

目录 引言 深度学习在各行业中的应用 PyTorch 和 TensorFlow 简介 PyTorch&#xff1a;简介与设计理念 发展历史和背景 主要特点和设计理念 TensorFlow&#xff1a;简介与设计理念 发展历史和背景 主要特点和设计理念 PyTorch 和 TensorFlow 的重要性 Pytorch对比Te…...

Oracle笔记-查看表已使用空间最大空间

目前以Oracle18c为例&#xff0c;主要是查这个表USER_SEGMENTS。 在 Oracle 18c 数据库中&#xff0c;USER_SEGMENTS 是一个系统表&#xff0c;用于存储当前用户&#xff08;当前会话&#xff09;拥有的所有段的信息。段是 Oracle 中分配存储空间的逻辑单位&#xff0c;用于存…...

大数据HCIE成神之路之特征工程——特征选择

特征选择 1.1 特征选择 - Filter方法1.1.1 实验任务1.1.1.1 实验背景1.1.1.2 实验目标1.1.1.3 实验数据解析1.1.1.4 实验思路 1.1.2 实验操作步骤 1.2 特征选择 - Wrapper方法1.2.1 实验任务1.2.1.1 实验背景1.2.1.2 实验目标1.2.1.3 实验数据解析1.2.1.4 实验思路 1.2.2 实验操…...

python 正则-常见题目

1、邮箱 print(re.findall(r[\w-][\w-]\.[\w-], weidianqq.com))2、身份证号 xxxxxx yyyy MM dd 375 0 十八位 print(re.findall(r(?:18|19|(?:[23]\d))\d{2}, 2010)) # 年print(re.findall(r(?:0[1-9])|10|11|12, 11)) # 月print(re.findall(r(?:[0-2][1-9])|10|20|30|3…...

解析:Eureka的工作原理

Eureka是Netflix开源的一个基于REST的的服务发现注册框架&#xff0c;它遵循了REST协议&#xff0c;提供了一套简单的API来完成服务的注册和发现。Eureka能够帮助分布式系统中的服务提供者自动将自身注册到注册中心&#xff0c;同时也能够让服务消费者从注册中心发现服务提供者…...

RecyclerView 与 ListView 区别和使用

前置知识&#xff1a;ListView基本用法与性能提升 RecyclerView 与 ListView 区别 RecyclerView 需要设置布局&#xff08;LinearLayoutManager、GridLayoutManager、StaggeredGridLayoutManager&#xff09; recyclerView?.layoutManager LinearLayoutManager(activity) …...

力扣232. 用栈实现队列

题目 请你仅使用两个栈实现先入先出队列。队列应当支持一般队列支持的所有操作&#xff08;push、pop、peek、empty&#xff09;&#xff1a; 实现 MyQueue 类&#xff1a; void push(int x) 将元素 x 推到队列的末尾int pop() 从队列的开头移除并返回元素int peek() 返回队列开…...

这个方法可以让你把图片无损放大

随着数字技术的不断发展&#xff0c;照片无损放大已经成为了摄影领域中的一项重要技术。照片无损放大能够让摄影师在不损失细节和画质的情况下&#xff0c;将照片放大到更大的尺寸&#xff0c;从而让观众能够更加清晰地欣赏到照片中的每一个细节。 今天推荐的这款软件主要是通…...

Springboot整合Elastic-job

一 概述 Elastic-Job 最开始只有一个 elastic-job-core 的项目&#xff0c;定位轻量级、无中心化&#xff0c;最核心的服务就是支持弹性扩容和数据分片&#xff01;从 2.X 版本以后&#xff0c;主要分为 Elastic-Job-Lite 和 Elastic-Job-Cloud 两个子项目。esjbo官网地址 Ela…...

VsCode的介绍和入门

目录 ​编辑 介绍 我应该切换到 VS Code 吗&#xff1f;为什么&#xff1f; 入门 Explorer 搜索 源代码控制 调试器 扩展 终点站 命令面板 主题 定制化 不错的配置选项 最适合编码的字体 工作空间 编辑 智能感知 代码格式化 错误和警告 键盘快捷键 键位图…...

C++:自创小游戏

欢迎来玩&#xff0c;每次都有不一样的结果。 长达142行。 #include<bits/stdc.h> #include<windows.h> #define random(a,b) (rand()%(b-a1)a) using namespace std; int main(){int n;cout<<"输1~10,越小越好,不告诉你有什么用&#xff0c;当然也可…...

网站做下载功能/郑州技术支持seo

前言函数系统定义函数聚合函数数学函数字符串函数时间日期函数 自定义函数标量值函数表值函数 后语 前言 学习数据库视频的时候&#xff0c;感觉函数的使用非常重要&#xff0c;所以对数据库函数进行了总结和分类&#xff01;加深一下自己的理解&#xff0c;也能让同学和自己在…...

做网站客户要求分期/全网热搜关键词排行榜

本系列文章介绍数据中心服务器部署前后整个过程需要考虑到的十个基础问题。第一部分《服务器部署十大问题系列一&#xff1a;软件与硬件》中&#xff0c;我们介绍了新增服务器对当前设施有哪些影响&#xff0c;以及软硬件方面需要考虑的问题。 本文为系列二&#xff0c;主要介绍…...

西安网站建设技术/百度上看了不健康的内容犯法吗

重复的随机数废话不多说&#xff0c;首先我们来看使用seed的一个很神奇的现象。func main() {for i : 0; i < 5; i {rand.Seed(time.Now().Unix())fmt.Println(rand.Intn(100))} }// 结果如下 // 90 // 90 // 90 // 90 // 90可能不熟悉seed用法的看到这里会很疑惑&#xff0…...

东莞企业建站收费产品推广/泉州关键词快速排名

我肯定会为每种请求类型添加一个新类.是的,您可能需要编写大量代码,但它会更安全.关键(对我来说)是谁会写这段代码&#xff1f;让我们把这个答案读到最后(或直接跳转到最后建议的选项).在这些例子中,我将使用Dictionary< string,string>对于通用对象,但您可能/应该使用适…...

网络网站首页设计/西安网站外包

此错误表示您的代码或您在应用程序中使用的任何外部库都在使用SLF4J库(一个开放源代码日志记录库)&#xff0c;但无法找到所需的JAR文件&#xff0c;例如slf4j-api-1.7.2.jar因此它是在线程“ main” java.lang.NoClassDefFoundError&#xff1a; org/slf4j/LoggerFactory 。 如…...

借贷网站建设/互联网推广是什么意思

hdfs常用命令&#xff0c;可查看博文 hdfs常用命令 //从本地加载数据到表,linux上的文件不会丢失&#xff0c;相当于是复制 ,这是 追加的模式 load data local inpath /data/log/1.txt into table employee; //从本地加载数据到表,linux上的文件不会丢失&#xff0c;相当于是复…...