...

Source file src/k8s.io/kubernetes/pkg/kubelet/runonce.go

Documentation: k8s.io/kubernetes/pkg/kubelet

     1  /*
     2  Copyright 2014 The Kubernetes Authors.
     3  
     4  Licensed under the Apache License, Version 2.0 (the "License");
     5  you may not use this file except in compliance with the License.
     6  You may obtain a copy of the License at
     7  
     8      http://www.apache.org/licenses/LICENSE-2.0
     9  
    10  Unless required by applicable law or agreed to in writing, software
    11  distributed under the License is distributed on an "AS IS" BASIS,
    12  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    13  See the License for the specific language governing permissions and
    14  limitations under the License.
    15  */
    16  
    17  package kubelet
    18  
    19  import (
    20  	"context"
    21  	"fmt"
    22  	"os"
    23  	"time"
    24  
    25  	v1 "k8s.io/api/core/v1"
    26  	"k8s.io/klog/v2"
    27  	kubecontainer "k8s.io/kubernetes/pkg/kubelet/container"
    28  	kubetypes "k8s.io/kubernetes/pkg/kubelet/types"
    29  	"k8s.io/kubernetes/pkg/kubelet/util/format"
    30  )
    31  
    32  const (
    33  	runOnceManifestDelay     = 1 * time.Second // how long RunOnce waits for a pod manifest update before returning an error
    34  	runOnceMaxRetries        = 10              // retry count at which runPod gives up with a timeout error
    35  	runOnceRetryDelay        = 1 * time.Second // initial delay between sync attempts that RunOnce passes down to runOnce
    36  	runOnceRetryDelayBackoff = 2               // factor runPod multiplies the retry delay by after each attempt
    37  )
    38  
    39  // RunPodResult pairs a pod with the error, if any, produced while running it. runOnce builds it with positional literals, so the field order matters.
    40  type RunPodResult struct {
    41  	Pod *v1.Pod // the pod this result refers to
    42  	Err error   // error returned by runPod; nil for pods that started and for pods rejected at admission in runOnce
    43  }
    44  
    45  // RunOnce polls from one configuration update and run the associated pods.
    46  func (kl *Kubelet) RunOnce(updates <-chan kubetypes.PodUpdate) ([]RunPodResult, error) {
    47  	ctx := context.Background()
    48  	// Setup filesystem directories.
    49  	if err := kl.setupDataDirs(); err != nil {
    50  		return nil, err
    51  	}
    52  
    53  	// If the container logs directory does not exist, create it.
    54  	if _, err := os.Stat(ContainerLogsDir); err != nil {
    55  		if err := kl.os.MkdirAll(ContainerLogsDir, 0755); err != nil {
    56  			klog.ErrorS(err, "Failed to create directory", "path", ContainerLogsDir)
    57  		}
    58  	}
    59  
    60  	select {
    61  	case u := <-updates:
    62  		klog.InfoS("Processing manifest with pods", "numPods", len(u.Pods))
    63  		result, err := kl.runOnce(ctx, u.Pods, runOnceRetryDelay)
    64  		klog.InfoS("Finished processing pods", "numPods", len(u.Pods))
    65  		return result, err
    66  	case <-time.After(runOnceManifestDelay):
    67  		return nil, fmt.Errorf("no pod manifest update after %v", runOnceManifestDelay)
    68  	}
    69  }
    70  
    71  // runOnce runs a given set of pods and returns their status.
    72  func (kl *Kubelet) runOnce(ctx context.Context, pods []*v1.Pod, retryDelay time.Duration) (results []RunPodResult, err error) {
    73  	ch := make(chan RunPodResult)
    74  	admitted := []*v1.Pod{}
    75  	for _, pod := range pods {
    76  		// Check if we can admit the pod.
    77  		if ok, reason, message := kl.canAdmitPod(admitted, pod); !ok {
    78  			kl.rejectPod(pod, reason, message)
    79  			results = append(results, RunPodResult{pod, nil})
    80  			continue
    81  		}
    82  
    83  		admitted = append(admitted, pod)
    84  		go func(pod *v1.Pod) {
    85  			err := kl.runPod(ctx, pod, retryDelay)
    86  			ch <- RunPodResult{pod, err}
    87  		}(pod)
    88  	}
    89  
    90  	klog.InfoS("Waiting for pods", "numPods", len(admitted))
    91  	failedPods := []string{}
    92  	for i := 0; i < len(admitted); i++ {
    93  		res := <-ch
    94  		results = append(results, res)
    95  		if res.Err != nil {
    96  			failedContainerName, err := kl.getFailedContainers(ctx, res.Pod)
    97  			if err != nil {
    98  				klog.InfoS("Unable to get failed containers' names for pod", "pod", klog.KObj(res.Pod), "err", err)
    99  			} else {
   100  				klog.InfoS("Unable to start pod because container failed", "pod", klog.KObj(res.Pod), "containerName", failedContainerName)
   101  			}
   102  			failedPods = append(failedPods, format.Pod(res.Pod))
   103  		} else {
   104  			klog.InfoS("Started pod", "pod", klog.KObj(res.Pod))
   105  		}
   106  	}
   107  	if len(failedPods) > 0 {
   108  		return results, fmt.Errorf("error running pods: %v", failedPods)
   109  	}
   110  	klog.InfoS("Pods started", "numPods", len(pods))
   111  	return results, err
   112  }
   113  
   114  // runPod runs a single pod and waits until all containers are running.
   115  func (kl *Kubelet) runPod(ctx context.Context, pod *v1.Pod, retryDelay time.Duration) error {
   116  	var isTerminal bool
   117  	delay := retryDelay
   118  	retry := 0
   119  	for !isTerminal {
   120  		status, err := kl.containerRuntime.GetPodStatus(ctx, pod.UID, pod.Name, pod.Namespace)
   121  		if err != nil {
   122  			return fmt.Errorf("unable to get status for pod %q: %v", format.Pod(pod), err)
   123  		}
   124  
   125  		if kl.isPodRunning(pod, status) {
   126  			klog.InfoS("Pod's containers running", "pod", klog.KObj(pod))
   127  			return nil
   128  		}
   129  		klog.InfoS("Pod's containers not running: syncing", "pod", klog.KObj(pod))
   130  
   131  		klog.InfoS("Creating a mirror pod for static pod", "pod", klog.KObj(pod))
   132  		if err := kl.mirrorPodClient.CreateMirrorPod(pod); err != nil {
   133  			klog.ErrorS(err, "Failed creating a mirror pod", "pod", klog.KObj(pod))
   134  		}
   135  		mirrorPod, _ := kl.podManager.GetMirrorPodByPod(pod)
   136  		if isTerminal, err = kl.SyncPod(ctx, kubetypes.SyncPodUpdate, pod, mirrorPod, status); err != nil {
   137  			return fmt.Errorf("error syncing pod %q: %v", format.Pod(pod), err)
   138  		}
   139  		if retry >= runOnceMaxRetries {
   140  			return fmt.Errorf("timeout error: pod %q containers not running after %d retries", format.Pod(pod), runOnceMaxRetries)
   141  		}
   142  		// TODO(proppy): health checking would be better than waiting + checking the state at the next iteration.
   143  		klog.InfoS("Pod's containers synced, waiting", "pod", klog.KObj(pod), "duration", delay)
   144  		time.Sleep(delay)
   145  		retry++
   146  		delay *= runOnceRetryDelayBackoff
   147  	}
   148  	return nil
   149  }
   150  
   151  // isPodRunning returns true if all containers of a manifest are running.
   152  func (kl *Kubelet) isPodRunning(pod *v1.Pod, status *kubecontainer.PodStatus) bool {
   153  	for _, c := range pod.Spec.Containers {
   154  		cs := status.FindContainerStatusByName(c.Name)
   155  		if cs == nil || cs.State != kubecontainer.ContainerStateRunning {
   156  			klog.InfoS("Container not running", "pod", klog.KObj(pod), "containerName", c.Name)
   157  			return false
   158  		}
   159  	}
   160  	return true
   161  }
   162  
   163  // getFailedContainer returns failed container name for pod.
   164  func (kl *Kubelet) getFailedContainers(ctx context.Context, pod *v1.Pod) ([]string, error) {
   165  	status, err := kl.containerRuntime.GetPodStatus(ctx, pod.UID, pod.Name, pod.Namespace)
   166  	if err != nil {
   167  		return nil, fmt.Errorf("unable to get status for pod %q: %v", format.Pod(pod), err)
   168  	}
   169  	var containerNames []string
   170  	for _, cs := range status.ContainerStatuses {
   171  		if cs.State != kubecontainer.ContainerStateRunning && cs.ExitCode != 0 {
   172  			containerNames = append(containerNames, cs.Name)
   173  		}
   174  	}
   175  	return containerNames, nil
   176  }
   177  

View as plain text