...

Source file src/k8s.io/kubernetes/pkg/controller/podgc/gc_controller.go

Documentation: k8s.io/kubernetes/pkg/controller/podgc

     1  /*
     2  Copyright 2015 The Kubernetes Authors.
     3  
     4  Licensed under the Apache License, Version 2.0 (the "License");
     5  you may not use this file except in compliance with the License.
     6  You may obtain a copy of the License at
     7  
     8      http://www.apache.org/licenses/LICENSE-2.0
     9  
    10  Unless required by applicable law or agreed to in writing, software
    11  distributed under the License is distributed on an "AS IS" BASIS,
    12  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    13  See the License for the specific language governing permissions and
    14  limitations under the License.
    15  */
    16  
    17  package podgc
    18  
    19  import (
    20  	"context"
    21  	"sort"
    22  	"sync"
    23  	"time"
    24  
    25  	v1 "k8s.io/api/core/v1"
    26  	"k8s.io/apimachinery/pkg/api/errors"
    27  	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
    28  	"k8s.io/apimachinery/pkg/labels"
    29  	utilruntime "k8s.io/apimachinery/pkg/util/runtime"
    30  	"k8s.io/apimachinery/pkg/util/sets"
    31  	"k8s.io/apimachinery/pkg/util/wait"
    32  	utilfeature "k8s.io/apiserver/pkg/util/feature"
    33  	coreinformers "k8s.io/client-go/informers/core/v1"
    34  	clientset "k8s.io/client-go/kubernetes"
    35  	corelisters "k8s.io/client-go/listers/core/v1"
    36  	"k8s.io/client-go/tools/cache"
    37  	"k8s.io/client-go/util/workqueue"
    38  	"k8s.io/klog/v2"
    39  	apipod "k8s.io/kubernetes/pkg/api/v1/pod"
    40  	"k8s.io/kubernetes/pkg/controller/podgc/metrics"
    41  	"k8s.io/kubernetes/pkg/features"
    42  	"k8s.io/kubernetes/pkg/kubelet/eviction"
    43  	nodeutil "k8s.io/kubernetes/pkg/util/node"
    44  	utilpod "k8s.io/kubernetes/pkg/util/pod"
    45  	"k8s.io/kubernetes/pkg/util/taints"
    46  )
    47  
const (
	// gcCheckPeriod defines the frequency of running the main controller loop.
	gcCheckPeriod = 20 * time.Second
	// quarantineTime defines how long Orphaned GC waits for nodes to show up
	// in an informer before issuing a GET call to check if they are truly gone.
	quarantineTime = 40 * time.Second
)
    55  
// PodGCController garbage-collects pods that are terminated, orphaned
// (bound to a no-longer-existing node), terminating on an out-of-service
// node, or unscheduled and terminating.
type PodGCController struct {
	kubeClient clientset.Interface

	// Listers and their sync gates for the pod and node informer caches.
	podLister        corelisters.PodLister
	podListerSynced  cache.InformerSynced
	nodeLister       corelisters.NodeLister
	nodeListerSynced cache.InformerSynced

	// nodeQueue quarantines names of nodes missing from the informer cache
	// before they are confirmed deleted via a direct GET.
	nodeQueue workqueue.DelayingInterface

	// terminatedPodThreshold is the number of terminated pods allowed to
	// exist before GC starts deleting them; <= 0 disables terminated-pod GC.
	terminatedPodThreshold int
	gcCheckPeriod          time.Duration
	quarantineTime         time.Duration
}
    70  
    71  func NewPodGC(ctx context.Context, kubeClient clientset.Interface, podInformer coreinformers.PodInformer,
    72  	nodeInformer coreinformers.NodeInformer, terminatedPodThreshold int) *PodGCController {
    73  	return NewPodGCInternal(ctx, kubeClient, podInformer, nodeInformer, terminatedPodThreshold, gcCheckPeriod, quarantineTime)
    74  }
    75  
    76  // This function is only intended for integration tests
    77  func NewPodGCInternal(ctx context.Context, kubeClient clientset.Interface, podInformer coreinformers.PodInformer,
    78  	nodeInformer coreinformers.NodeInformer, terminatedPodThreshold int, gcCheckPeriod, quarantineTime time.Duration) *PodGCController {
    79  	gcc := &PodGCController{
    80  		kubeClient:             kubeClient,
    81  		terminatedPodThreshold: terminatedPodThreshold,
    82  		podLister:              podInformer.Lister(),
    83  		podListerSynced:        podInformer.Informer().HasSynced,
    84  		nodeLister:             nodeInformer.Lister(),
    85  		nodeListerSynced:       nodeInformer.Informer().HasSynced,
    86  		nodeQueue:              workqueue.NewNamedDelayingQueue("orphaned_pods_nodes"),
    87  		gcCheckPeriod:          gcCheckPeriod,
    88  		quarantineTime:         quarantineTime,
    89  	}
    90  
    91  	// Register prometheus metrics
    92  	metrics.RegisterMetrics()
    93  	return gcc
    94  }
    95  
// Run starts the periodic garbage-collection loop and blocks until ctx is
// cancelled.
func (gcc *PodGCController) Run(ctx context.Context) {
	logger := klog.FromContext(ctx)

	defer utilruntime.HandleCrash()

	logger.Info("Starting GC controller")
	// Defers run LIFO: on exit the shutdown message is logged first, then the
	// node queue is shut down, then HandleCrash runs last.
	defer gcc.nodeQueue.ShutDown()
	defer logger.Info("Shutting down GC controller")

	// Do not start collecting before both informer caches are synced; a cold
	// cache could make existing nodes/pods appear missing.
	if !cache.WaitForNamedCacheSync("GC", ctx.Done(), gcc.podListerSynced, gcc.nodeListerSynced) {
		return
	}

	// Run a GC pass every gcCheckPeriod until the context is cancelled.
	go wait.UntilWithContext(ctx, gcc.gc, gcc.gcCheckPeriod)

	<-ctx.Done()
}
   113  
   114  func (gcc *PodGCController) gc(ctx context.Context) {
   115  	pods, err := gcc.podLister.List(labels.Everything())
   116  	if err != nil {
   117  		klog.FromContext(ctx).Error(err, "Error while listing all pods")
   118  		return
   119  	}
   120  	nodes, err := gcc.nodeLister.List(labels.Everything())
   121  	if err != nil {
   122  		klog.FromContext(ctx).Error(err, "Error while listing all nodes")
   123  		return
   124  	}
   125  	if gcc.terminatedPodThreshold > 0 {
   126  		gcc.gcTerminated(ctx, pods)
   127  	}
   128  	gcc.gcTerminating(ctx, pods)
   129  	gcc.gcOrphaned(ctx, pods, nodes)
   130  	gcc.gcUnscheduledTerminating(ctx, pods)
   131  }
   132  
   133  func isPodTerminated(pod *v1.Pod) bool {
   134  	if phase := pod.Status.Phase; phase != v1.PodPending && phase != v1.PodRunning && phase != v1.PodUnknown {
   135  		return true
   136  	}
   137  	return false
   138  }
   139  
   140  // isPodTerminating returns true if the pod is terminating.
   141  func isPodTerminating(pod *v1.Pod) bool {
   142  	return pod.ObjectMeta.DeletionTimestamp != nil
   143  }
   144  
   145  func (gcc *PodGCController) gcTerminating(ctx context.Context, pods []*v1.Pod) {
   146  	logger := klog.FromContext(ctx)
   147  	logger.V(4).Info("GC'ing terminating pods that are on out-of-service nodes")
   148  	terminatingPods := []*v1.Pod{}
   149  	for _, pod := range pods {
   150  		if isPodTerminating(pod) {
   151  			node, err := gcc.nodeLister.Get(pod.Spec.NodeName)
   152  			if err != nil {
   153  				logger.Error(err, "Failed to get node", "node", klog.KRef("", pod.Spec.NodeName))
   154  				continue
   155  			}
   156  			// Add this pod to terminatingPods list only if the following conditions are met:
   157  			// 1. Node is not ready.
   158  			// 2. Node has `node.kubernetes.io/out-of-service` taint.
   159  			if !nodeutil.IsNodeReady(node) && taints.TaintKeyExists(node.Spec.Taints, v1.TaintNodeOutOfService) {
   160  				logger.V(4).Info("Garbage collecting pod that is terminating", "pod", klog.KObj(pod), "phase", pod.Status.Phase)
   161  				terminatingPods = append(terminatingPods, pod)
   162  			}
   163  		}
   164  	}
   165  
   166  	deleteCount := len(terminatingPods)
   167  	if deleteCount == 0 {
   168  		return
   169  	}
   170  
   171  	logger.V(4).Info("Garbage collecting pods that are terminating on node tainted with node.kubernetes.io/out-of-service", "numPods", deleteCount)
   172  	// sort only when necessary
   173  	sort.Sort(byEvictionAndCreationTimestamp(terminatingPods))
   174  	var wait sync.WaitGroup
   175  	for i := 0; i < deleteCount; i++ {
   176  		wait.Add(1)
   177  		go func(pod *v1.Pod) {
   178  			defer wait.Done()
   179  			metrics.DeletingPodsTotal.WithLabelValues(pod.Namespace, metrics.PodGCReasonTerminatingOutOfService).Inc()
   180  			if err := gcc.markFailedAndDeletePod(ctx, pod); err != nil {
   181  				// ignore not founds
   182  				utilruntime.HandleError(err)
   183  				metrics.DeletingPodsErrorTotal.WithLabelValues(pod.Namespace, metrics.PodGCReasonTerminatingOutOfService).Inc()
   184  			}
   185  		}(terminatingPods[i])
   186  	}
   187  	wait.Wait()
   188  }
   189  
   190  func (gcc *PodGCController) gcTerminated(ctx context.Context, pods []*v1.Pod) {
   191  	terminatedPods := []*v1.Pod{}
   192  	for _, pod := range pods {
   193  		if isPodTerminated(pod) {
   194  			terminatedPods = append(terminatedPods, pod)
   195  		}
   196  	}
   197  
   198  	terminatedPodCount := len(terminatedPods)
   199  	deleteCount := terminatedPodCount - gcc.terminatedPodThreshold
   200  
   201  	if deleteCount <= 0 {
   202  		return
   203  	}
   204  
   205  	logger := klog.FromContext(ctx)
   206  	logger.Info("Garbage collecting pods", "numPods", deleteCount)
   207  	// sort only when necessary
   208  	sort.Sort(byEvictionAndCreationTimestamp(terminatedPods))
   209  	var wait sync.WaitGroup
   210  	for i := 0; i < deleteCount; i++ {
   211  		wait.Add(1)
   212  		go func(pod *v1.Pod) {
   213  			defer wait.Done()
   214  			if err := gcc.markFailedAndDeletePod(ctx, pod); err != nil {
   215  				// ignore not founds
   216  				defer utilruntime.HandleError(err)
   217  				metrics.DeletingPodsErrorTotal.WithLabelValues(pod.Namespace, metrics.PodGCReasonTerminated).Inc()
   218  			}
   219  			metrics.DeletingPodsTotal.WithLabelValues(pod.Namespace, metrics.PodGCReasonTerminated).Inc()
   220  		}(terminatedPods[i])
   221  	}
   222  	wait.Wait()
   223  }
   224  
   225  // gcOrphaned deletes pods that are bound to nodes that don't exist.
   226  func (gcc *PodGCController) gcOrphaned(ctx context.Context, pods []*v1.Pod, nodes []*v1.Node) {
   227  	logger := klog.FromContext(ctx)
   228  	logger.V(4).Info("GC'ing orphaned")
   229  	existingNodeNames := sets.NewString()
   230  	for _, node := range nodes {
   231  		existingNodeNames.Insert(node.Name)
   232  	}
   233  	// Add newly found unknown nodes to quarantine
   234  	for _, pod := range pods {
   235  		if pod.Spec.NodeName != "" && !existingNodeNames.Has(pod.Spec.NodeName) {
   236  			gcc.nodeQueue.AddAfter(pod.Spec.NodeName, gcc.quarantineTime)
   237  		}
   238  	}
   239  	// Check if nodes are still missing after quarantine period
   240  	deletedNodesNames, quit := gcc.discoverDeletedNodes(ctx, existingNodeNames)
   241  	if quit {
   242  		return
   243  	}
   244  	// Delete orphaned pods
   245  	for _, pod := range pods {
   246  		if !deletedNodesNames.Has(pod.Spec.NodeName) {
   247  			continue
   248  		}
   249  		logger.V(2).Info("Found orphaned Pod assigned to the Node, deleting", "pod", klog.KObj(pod), "node", klog.KRef("", pod.Spec.NodeName))
   250  		condition := &v1.PodCondition{
   251  			Type:    v1.DisruptionTarget,
   252  			Status:  v1.ConditionTrue,
   253  			Reason:  "DeletionByPodGC",
   254  			Message: "PodGC: node no longer exists",
   255  		}
   256  		if err := gcc.markFailedAndDeletePodWithCondition(ctx, pod, condition); err != nil {
   257  			utilruntime.HandleError(err)
   258  			metrics.DeletingPodsErrorTotal.WithLabelValues(pod.Namespace, metrics.PodGCReasonOrphaned).Inc()
   259  		} else {
   260  			logger.Info("Forced deletion of orphaned Pod succeeded", "pod", klog.KObj(pod))
   261  		}
   262  		metrics.DeletingPodsTotal.WithLabelValues(pod.Namespace, metrics.PodGCReasonOrphaned).Inc()
   263  	}
   264  }
   265  
   266  func (gcc *PodGCController) discoverDeletedNodes(ctx context.Context, existingNodeNames sets.String) (sets.String, bool) {
   267  	deletedNodesNames := sets.NewString()
   268  	for gcc.nodeQueue.Len() > 0 {
   269  		item, quit := gcc.nodeQueue.Get()
   270  		if quit {
   271  			return nil, true
   272  		}
   273  		nodeName := item.(string)
   274  		if !existingNodeNames.Has(nodeName) {
   275  			exists, err := gcc.checkIfNodeExists(ctx, nodeName)
   276  			switch {
   277  			case err != nil:
   278  				klog.FromContext(ctx).Error(err, "Error while getting node", "node", klog.KRef("", nodeName))
   279  				// Node will be added back to the queue in the subsequent loop if still needed
   280  			case !exists:
   281  				deletedNodesNames.Insert(nodeName)
   282  			}
   283  		}
   284  		gcc.nodeQueue.Done(item)
   285  	}
   286  	return deletedNodesNames, false
   287  }
   288  
   289  func (gcc *PodGCController) checkIfNodeExists(ctx context.Context, name string) (bool, error) {
   290  	_, fetchErr := gcc.kubeClient.CoreV1().Nodes().Get(ctx, name, metav1.GetOptions{})
   291  	if errors.IsNotFound(fetchErr) {
   292  		return false, nil
   293  	}
   294  	return fetchErr == nil, fetchErr
   295  }
   296  
   297  // gcUnscheduledTerminating deletes pods that are terminating and haven't been scheduled to a particular node.
   298  func (gcc *PodGCController) gcUnscheduledTerminating(ctx context.Context, pods []*v1.Pod) {
   299  	logger := klog.FromContext(ctx)
   300  	logger.V(4).Info("GC'ing unscheduled pods which are terminating")
   301  
   302  	for _, pod := range pods {
   303  		if pod.DeletionTimestamp == nil || len(pod.Spec.NodeName) > 0 {
   304  			continue
   305  		}
   306  
   307  		logger.V(2).Info("Found unscheduled terminating Pod not assigned to any Node, deleting", "pod", klog.KObj(pod))
   308  		if err := gcc.markFailedAndDeletePod(ctx, pod); err != nil {
   309  			utilruntime.HandleError(err)
   310  			metrics.DeletingPodsErrorTotal.WithLabelValues(pod.Namespace, metrics.PodGCReasonTerminatingUnscheduled).Inc()
   311  		} else {
   312  			logger.Info("Forced deletion of unscheduled terminating Pod succeeded", "pod", klog.KObj(pod))
   313  		}
   314  		metrics.DeletingPodsTotal.WithLabelValues(pod.Namespace, metrics.PodGCReasonTerminatingUnscheduled).Inc()
   315  	}
   316  }
   317  
   318  // byEvictionAndCreationTimestamp sorts a list by Evicted status and then creation timestamp,
   319  // using their names as a tie breaker.
   320  // Evicted pods will be deleted first to avoid impact on terminated pods created by controllers.
   321  type byEvictionAndCreationTimestamp []*v1.Pod
   322  
   323  func (o byEvictionAndCreationTimestamp) Len() int      { return len(o) }
   324  func (o byEvictionAndCreationTimestamp) Swap(i, j int) { o[i], o[j] = o[j], o[i] }
   325  
   326  func (o byEvictionAndCreationTimestamp) Less(i, j int) bool {
   327  	iEvicted, jEvicted := eviction.PodIsEvicted(o[i].Status), eviction.PodIsEvicted(o[j].Status)
   328  	// Evicted pod is smaller
   329  	if iEvicted != jEvicted {
   330  		return iEvicted
   331  	}
   332  	if o[i].CreationTimestamp.Equal(&o[j].CreationTimestamp) {
   333  		return o[i].Name < o[j].Name
   334  	}
   335  	return o[i].CreationTimestamp.Before(&o[j].CreationTimestamp)
   336  }
   337  
// markFailedAndDeletePod force-deletes the pod after transitioning it to the
// Failed phase (when the relevant feature gates are enabled), without
// attaching any extra pod condition.
func (gcc *PodGCController) markFailedAndDeletePod(ctx context.Context, pod *v1.Pod) error {
	return gcc.markFailedAndDeletePodWithCondition(ctx, pod, nil)
}
   341  
// markFailedAndDeletePodWithCondition transitions a non-terminal pod to the
// Failed phase (optionally attaching the given condition) and then
// force-deletes it with a zero grace period.
func (gcc *PodGCController) markFailedAndDeletePodWithCondition(ctx context.Context, pod *v1.Pod, condition *v1.PodCondition) error {
	logger := klog.FromContext(ctx)
	logger.Info("PodGC is force deleting Pod", "pod", klog.KObj(pod))
	// Patch the pod to make sure it is transitioned to the Failed phase before deletion.
	// This is needed for the JobPodReplacementPolicy feature to make sure Job replacement pods are created.
	// See https://github.com/kubernetes/enhancements/tree/master/keps/sig-apps/3939-allow-replacement-when-fully-terminated#risks-and-mitigations
	// for more details.
	if utilfeature.DefaultFeatureGate.Enabled(features.PodDisruptionConditions) || utilfeature.DefaultFeatureGate.Enabled(features.JobPodReplacementPolicy) {

		// Mark the pod as failed - this is especially important in case the pod
		// is orphaned, in which case the pod would remain in the Running phase
		// forever as there is no kubelet running to change the phase.
		if pod.Status.Phase != v1.PodSucceeded && pod.Status.Phase != v1.PodFailed {
			newStatus := pod.Status.DeepCopy()
			newStatus.Phase = v1.PodFailed
			// The supplied condition is attached only when
			// PodDisruptionConditions is enabled, even if JobPodReplacementPolicy
			// triggered the phase patch.
			if condition != nil && utilfeature.DefaultFeatureGate.Enabled(features.PodDisruptionConditions) {
				apipod.UpdatePodCondition(newStatus, condition)
			}
			if _, _, _, err := utilpod.PatchPodStatus(ctx, gcc.kubeClient, pod.Namespace, pod.Name, pod.UID, pod.Status, *newStatus); err != nil {
				return err
			}
		}
	}
	// Grace period 0 removes the pod object immediately.
	return gcc.kubeClient.CoreV1().Pods(pod.Namespace).Delete(ctx, pod.Name, *metav1.NewDeleteOptions(0))
}
   367  

View as plain text