跳到主要内容

09.waitJob

背景

将代码构建成一个镜像一般会经过如下几个步骤:拉取代码到本地,安装语言环境,使用docker构建镜像,一般会在一个job中执行这些步骤,我们将这个job称作构建镜像。那如何监听job已经开始运行,如何监听job运行结束呢?

我们来看看同事写的代码

for {
pod, err = k.getJobPod(ctx, namespace, jobName)
if err != nil {
log.Error("error", err)
return content, err
}
if pod != nil && pod.Status.Phase == "Running" {
break
}
if iCount > 10 {
return content, err
}
time.Sleep(1 * time.Second)
iCount++
}

func (k *K8S) getJobPod(ctx context.Context, namespace, jobName *string) (*coreV1.Pod, error) {
pods, err := k.client.CoreV1().Pods(*namespace).List(ctx, metav1.ListOptions{
LabelSelector: labels.Set{"job-name": *jobName}.String(),
})
if err != nil {
log.Error("error", err)
return nil, err
}
if pods.Items == nil || len(pods.Items) == 0 {
return nil, nil
}
return k.client.CoreV1().Pods(*namespace).Get(ctx, pods.Items[0].Name, metav1.GetOptions{})
}

我想大部分人第一次写的代码可能和这个差不多,那上面这段代码有什么毛病呢?首先他默认job只有一个pod,其次他没有超时设置,最关键的是他这个代码拓展性不高。那我们如何改造这段代码呢?最好的办法是去官方库找答案!

重构

在官方库搜索 WaitForJob 这个关键词,我们能搜索到下面的代码:

// WaitForJobPodsRunning wait for all pods for the Job named JobName in namespace ns to become Running.  Only use
// when pods will run for a long time, or it will be racy.
func WaitForJobPodsRunning(ctx context.Context, c clientset.Interface, ns, jobName string, expectedCount int32) error {
return waitForJobPodsInPhase(ctx, c, ns, jobName, expectedCount, v1.PodRunning)
}

// waitForJobPodsInPhase wait for all pods for the Job named JobName in namespace ns to be in a given phase.
func waitForJobPodsInPhase(ctx context.Context, c clientset.Interface, ns, jobName string, expectedCount int32, phase v1.PodPhase) error {
return wait.PollWithContext(ctx, framework.Poll, JobTimeout, func(ctx context.Context) (bool, error) {
pods, err := GetJobPods(ctx, c, ns, jobName)
if err != nil {
return false, err
}
count := int32(0)
for _, p := range pods.Items {
if p.Status.Phase == phase {
count++
}
}
return count == expectedCount, nil
})
}

哈哈哈,得来全不费工夫,我们只要在代码中调用 WaitForJobPodsRunning(ctx, k.client, namespace, jobName, 1) 就行,最后一个参数 1 代表你期望 job中 pod 处于Running状态的数量,我们可以看到,超时时间在 调用 wait.PollWithContext 函数时默认帮我们传了 15min ,同时还带有指数重试机制。

GPT 给出的解释

这两个函数属于Kubernetes的客户端代码,用于检查指定的Job是否有期望数量的Pods已经处于运行状态。下面是对这段代码的详细解释:

WaitForJobPodsRunning函数:这个函数接收一个上下文(ctx),Kubernetes的客户端接口(c),命名空间(ns),Job的名称(jobName),以及期望的Pods数量(expectedCount)。这个函数主要调用了waitForJobPodsInPhase函数,将v1.PodRunning作为期望的Pods阶段。

waitForJobPodsInPhase函数:这个函数也接收一个上下文(ctx),Kubernetes的客户端接口(c),命名空间(ns),Job的名称(jobName),期望的Pods数量(expectedCount),以及期望的Pods阶段(phase)。这个函数使用了Kubernetes的wait.PollWithContext方法,定期检查Job的Pods是否都已经处于期望的阶段。如果所有Pods都处于期望的阶段,那么函数就会返回true;否则,它会继续等待,直到超时或者满足条件。

wait.PollWithContext的第一个参数是上下文,第二个参数是轮询的间隔时间,第三个参数是超时时间,第四个参数是一个函数,这个函数在每次轮询的时候都会被调用。

在这个函数中,它首先获取Job的所有Pods,然后遍历所有Pods,检查它们的阶段是否和期望的阶段相同。如果相同,就增加计数器。最后,如果计数器的值和期望的Pods数量相同,那么函数就返回true,表示所有Pods都已经处于期望的阶段。如果计数器的值和期望的Pods数量不同,那么函数就返回false,表示还有一些Pods没有处于期望的阶段,需要继续等待。

拓展

试着去看看源码中其他 WaitForJob 函数吧,比如等待 job finish