...

Source file src/k8s.io/kubernetes/pkg/kubelet/lifecycle/predicate.go

Documentation: k8s.io/kubernetes/pkg/kubelet/lifecycle

     1  /*
     2  Copyright 2016 The Kubernetes Authors.
     3  
     4  Licensed under the Apache License, Version 2.0 (the "License");
     5  you may not use this file except in compliance with the License.
     6  You may obtain a copy of the License at
     7  
     8      http://www.apache.org/licenses/LICENSE-2.0
     9  
    10  Unless required by applicable law or agreed to in writing, software
    11  distributed under the License is distributed on an "AS IS" BASIS,
    12  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    13  See the License for the specific language governing permissions and
    14  limitations under the License.
    15  */
    16  
    17  package lifecycle
    18  
    19  import (
    20  	"fmt"
    21  	"runtime"
    22  
    23  	v1 "k8s.io/api/core/v1"
    24  	utilfeature "k8s.io/apiserver/pkg/util/feature"
    25  	"k8s.io/component-helpers/scheduling/corev1"
    26  	"k8s.io/klog/v2"
    27  	v1helper "k8s.io/kubernetes/pkg/apis/core/v1/helper"
    28  	"k8s.io/kubernetes/pkg/features"
    29  	"k8s.io/kubernetes/pkg/kubelet/types"
    30  	"k8s.io/kubernetes/pkg/scheduler"
    31  	schedulerframework "k8s.io/kubernetes/pkg/scheduler/framework"
    32  	"k8s.io/kubernetes/pkg/scheduler/framework/plugins/tainttoleration"
    33  )
    34  
// getNodeAnyWayFuncType is the signature of the callback the predicate admit
// handler uses to obtain the Node object for this kubelet. A non-nil error
// makes Admit reject the pod with reason "InvalidNodeInfo".
type getNodeAnyWayFuncType func() (*v1.Node, error)
    36  
// pluginResourceUpdateFuncType is the signature of the callback Admit invokes
// with the node info and the admission attributes before running the general
// predicates. A non-nil error makes Admit reject the pod with reason
// "UnexpectedAdmissionError".
type pluginResourceUpdateFuncType func(*schedulerframework.NodeInfo, *PodAdmitAttributes) error
    38  
// AdmissionFailureHandler is an interface which defines how to deal with a failure to admit a pod.
// This allows for the graceful handling of pod admission failure.
type AdmissionFailureHandler interface {
	// HandleAdmissionFailure is given the pod being admitted and the reasons
	// the predicates rejected it, and returns the reasons that still apply.
	// The predicate admit handler in this file admits the pod only when the
	// returned slice is empty and the returned error is nil.
	HandleAdmissionFailure(admitPod *v1.Pod, failureReasons []PredicateFailureReason) ([]PredicateFailureReason, error)
}
    44  
// predicateAdmitHandler is the PodAdmitHandler implementation that admits or
// rejects pods based on predicate checks against the local node.
type predicateAdmitHandler struct {
	// getNodeAnyWayFunc supplies the Node object the predicates are evaluated against.
	getNodeAnyWayFunc getNodeAnyWayFuncType
	// pluginResourceUpdateFunc is called with the node info before the general
	// predicates run; an error from it fails admission.
	pluginResourceUpdateFunc pluginResourceUpdateFuncType
	// admissionFailureHandler gets a chance to resolve predicate failures
	// before the pod is rejected.
	admissionFailureHandler AdmissionFailureHandler
}

// Compile-time check that *predicateAdmitHandler implements PodAdmitHandler.
var _ PodAdmitHandler = &predicateAdmitHandler{}
    52  
    53  // NewPredicateAdmitHandler returns a PodAdmitHandler which is used to evaluates
    54  // if a pod can be admitted from the perspective of predicates.
    55  func NewPredicateAdmitHandler(getNodeAnyWayFunc getNodeAnyWayFuncType, admissionFailureHandler AdmissionFailureHandler, pluginResourceUpdateFunc pluginResourceUpdateFuncType) PodAdmitHandler {
    56  	return &predicateAdmitHandler{
    57  		getNodeAnyWayFunc,
    58  		pluginResourceUpdateFunc,
    59  		admissionFailureHandler,
    60  	}
    61  }
    62  
    63  func (w *predicateAdmitHandler) Admit(attrs *PodAdmitAttributes) PodAdmitResult {
    64  	node, err := w.getNodeAnyWayFunc()
    65  	if err != nil {
    66  		klog.ErrorS(err, "Cannot get Node info")
    67  		return PodAdmitResult{
    68  			Admit:   false,
    69  			Reason:  "InvalidNodeInfo",
    70  			Message: "Kubelet cannot get node info.",
    71  		}
    72  	}
    73  	admitPod := attrs.Pod
    74  
    75  	// perform the checks that preemption will not help first to avoid meaningless pod eviction
    76  	if rejectPodAdmissionBasedOnOSSelector(admitPod, node) {
    77  		return PodAdmitResult{
    78  			Admit:   false,
    79  			Reason:  "PodOSSelectorNodeLabelDoesNotMatch",
    80  			Message: "Failed to admit pod as the `kubernetes.io/os` label doesn't match node label",
    81  		}
    82  	}
    83  	if rejectPodAdmissionBasedOnOSField(admitPod) {
    84  		return PodAdmitResult{
    85  			Admit:   false,
    86  			Reason:  "PodOSNotSupported",
    87  			Message: "Failed to admit pod as the OS field doesn't match node OS",
    88  		}
    89  	}
    90  
    91  	pods := attrs.OtherPods
    92  	nodeInfo := schedulerframework.NewNodeInfo(pods...)
    93  	nodeInfo.SetNode(node)
    94  
    95  	// TODO: Remove this after the SidecarContainers feature gate graduates to GA.
    96  	if !utilfeature.DefaultFeatureGate.Enabled(features.SidecarContainers) {
    97  		for _, c := range admitPod.Spec.InitContainers {
    98  			if types.IsRestartableInitContainer(&c) {
    99  				message := fmt.Sprintf("Init container %q may not have a non-default restartPolicy", c.Name)
   100  				klog.InfoS("Failed to admit pod", "pod", klog.KObj(admitPod), "message", message)
   101  				return PodAdmitResult{
   102  					Admit:   false,
   103  					Reason:  "InitContainerRestartPolicyForbidden",
   104  					Message: message,
   105  				}
   106  			}
   107  		}
   108  	}
   109  
   110  	// ensure the node has enough plugin resources for that required in pods
   111  	if err = w.pluginResourceUpdateFunc(nodeInfo, attrs); err != nil {
   112  		message := fmt.Sprintf("Update plugin resources failed due to %v, which is unexpected.", err)
   113  		klog.InfoS("Failed to admit pod", "pod", klog.KObj(admitPod), "message", message)
   114  		return PodAdmitResult{
   115  			Admit:   false,
   116  			Reason:  "UnexpectedAdmissionError",
   117  			Message: message,
   118  		}
   119  	}
   120  
   121  	// Remove the requests of the extended resources that are missing in the
   122  	// node info. This is required to support cluster-level resources, which
   123  	// are extended resources unknown to nodes.
   124  	//
   125  	// Caveat: If a pod was manually bound to a node (e.g., static pod) where a
   126  	// node-level extended resource it requires is not found, then kubelet will
   127  	// not fail admission while it should. This issue will be addressed with
   128  	// the Resource Class API in the future.
   129  	podWithoutMissingExtendedResources := removeMissingExtendedResources(admitPod, nodeInfo)
   130  
   131  	reasons := generalFilter(podWithoutMissingExtendedResources, nodeInfo)
   132  	fit := len(reasons) == 0
   133  	if !fit {
   134  		reasons, err = w.admissionFailureHandler.HandleAdmissionFailure(admitPod, reasons)
   135  		fit = len(reasons) == 0 && err == nil
   136  		if err != nil {
   137  			message := fmt.Sprintf("Unexpected error while attempting to recover from admission failure: %v", err)
   138  			klog.InfoS("Failed to admit pod, unexpected error while attempting to recover from admission failure", "pod", klog.KObj(admitPod), "err", err)
   139  			return PodAdmitResult{
   140  				Admit:   fit,
   141  				Reason:  "UnexpectedAdmissionError",
   142  				Message: message,
   143  			}
   144  		}
   145  	}
   146  	if !fit {
   147  		var reason string
   148  		var message string
   149  		if len(reasons) == 0 {
   150  			message = fmt.Sprint("GeneralPredicates failed due to unknown reason, which is unexpected.")
   151  			klog.InfoS("Failed to admit pod: GeneralPredicates failed due to unknown reason, which is unexpected", "pod", klog.KObj(admitPod))
   152  			return PodAdmitResult{
   153  				Admit:   fit,
   154  				Reason:  "UnknownReason",
   155  				Message: message,
   156  			}
   157  		}
   158  		// If there are failed predicates, we only return the first one as a reason.
   159  		r := reasons[0]
   160  		switch re := r.(type) {
   161  		case *PredicateFailureError:
   162  			reason = re.PredicateName
   163  			message = re.Error()
   164  			klog.V(2).InfoS("Predicate failed on Pod", "pod", klog.KObj(admitPod), "err", message)
   165  		case *InsufficientResourceError:
   166  			reason = fmt.Sprintf("OutOf%s", re.ResourceName)
   167  			message = re.Error()
   168  			klog.V(2).InfoS("Predicate failed on Pod", "pod", klog.KObj(admitPod), "err", message)
   169  		default:
   170  			reason = "UnexpectedPredicateFailureType"
   171  			message = fmt.Sprintf("GeneralPredicates failed due to %v, which is unexpected.", r)
   172  			klog.InfoS("Failed to admit pod", "pod", klog.KObj(admitPod), "err", message)
   173  		}
   174  		return PodAdmitResult{
   175  			Admit:   fit,
   176  			Reason:  reason,
   177  			Message: message,
   178  		}
   179  	}
   180  	return PodAdmitResult{
   181  		Admit: true,
   182  	}
   183  }
   184  
   185  // rejectPodAdmissionBasedOnOSSelector rejects pod if it's nodeSelector doesn't match
   186  // We expect the kubelet status reconcile which happens every 10sec to update the node labels if there is a mismatch.
   187  func rejectPodAdmissionBasedOnOSSelector(pod *v1.Pod, node *v1.Node) bool {
   188  	labels := node.Labels
   189  	osName, osLabelExists := labels[v1.LabelOSStable]
   190  	if !osLabelExists || osName != runtime.GOOS {
   191  		if len(labels) == 0 {
   192  			labels = make(map[string]string)
   193  		}
   194  		labels[v1.LabelOSStable] = runtime.GOOS
   195  	}
   196  	podLabelSelector, podOSLabelExists := pod.Labels[v1.LabelOSStable]
   197  	if !podOSLabelExists {
   198  		// If the labelselector didn't exist, let's keep the current behavior as is
   199  		return false
   200  	} else if podOSLabelExists && podLabelSelector != labels[v1.LabelOSStable] {
   201  		return true
   202  	}
   203  	return false
   204  }
   205  
   206  // rejectPodAdmissionBasedOnOSField rejects pods if their OS field doesn't match runtime.GOOS.
   207  // TODO: Relax this restriction when we start supporting LCOW in kubernetes where podOS may not match
   208  // node's OS.
   209  func rejectPodAdmissionBasedOnOSField(pod *v1.Pod) bool {
   210  	if pod.Spec.OS == nil {
   211  		return false
   212  	}
   213  	// If the pod OS doesn't match runtime.GOOS return false
   214  	return string(pod.Spec.OS.Name) != runtime.GOOS
   215  }
   216  
   217  func removeMissingExtendedResources(pod *v1.Pod, nodeInfo *schedulerframework.NodeInfo) *v1.Pod {
   218  	podCopy := pod.DeepCopy()
   219  	for i, c := range pod.Spec.Containers {
   220  		// We only handle requests in Requests but not Limits because the
   221  		// PodFitsResources predicate, to which the result pod will be passed,
   222  		// does not use Limits.
   223  		podCopy.Spec.Containers[i].Resources.Requests = make(v1.ResourceList)
   224  		for rName, rQuant := range c.Resources.Requests {
   225  			if v1helper.IsExtendedResourceName(rName) {
   226  				if _, found := nodeInfo.Allocatable.ScalarResources[rName]; !found {
   227  					continue
   228  				}
   229  			}
   230  			podCopy.Spec.Containers[i].Resources.Requests[rName] = rQuant
   231  		}
   232  	}
   233  	return podCopy
   234  }
   235  
// InsufficientResourceError is an error type that indicates what kind of resource limit is
// hit and caused the unfitting failure.
type InsufficientResourceError struct {
	// ResourceName is the resource the node ran short of.
	ResourceName v1.ResourceName
	// Requested is the amount of the resource that was asked for.
	Requested int64
	// Used is the amount of the resource already in use.
	Used int64
	// Capacity is the amount of the resource the node offers.
	Capacity int64
}
   244  
   245  func (e *InsufficientResourceError) Error() string {
   246  	return fmt.Sprintf("Node didn't have enough resource: %s, requested: %d, used: %d, capacity: %d",
   247  		e.ResourceName, e.Requested, e.Used, e.Capacity)
   248  }
   249  
// PredicateFailureReason interface represents the failure reason of a predicate.
type PredicateFailureReason interface {
	// GetReason returns a textual description of why the predicate failed.
	GetReason() string
}
   254  
   255  // GetReason returns the reason of the InsufficientResourceError.
   256  func (e *InsufficientResourceError) GetReason() string {
   257  	return fmt.Sprintf("Insufficient %v", e.ResourceName)
   258  }
   259  
   260  // GetInsufficientAmount returns the amount of the insufficient resource of the error.
   261  func (e *InsufficientResourceError) GetInsufficientAmount() int64 {
   262  	return e.Requested - (e.Capacity - e.Used)
   263  }
   264  
   265  // PredicateFailureError describes a failure error of predicate.
   266  type PredicateFailureError struct {
   267  	PredicateName string
   268  	PredicateDesc string
   269  }
   270  
   271  func (e *PredicateFailureError) Error() string {
   272  	return fmt.Sprintf("Predicate %s failed", e.PredicateName)
   273  }
   274  
   275  // GetReason returns the reason of the PredicateFailureError.
   276  func (e *PredicateFailureError) GetReason() string {
   277  	return e.PredicateDesc
   278  }
   279  
   280  // generalFilter checks a group of filterings that the kubelet cares about.
   281  func generalFilter(pod *v1.Pod, nodeInfo *schedulerframework.NodeInfo) []PredicateFailureReason {
   282  	admissionResults := scheduler.AdmissionCheck(pod, nodeInfo, true)
   283  	var reasons []PredicateFailureReason
   284  	for _, r := range admissionResults {
   285  		if r.InsufficientResource != nil {
   286  			reasons = append(reasons, &InsufficientResourceError{
   287  				ResourceName: r.InsufficientResource.ResourceName,
   288  				Requested:    r.InsufficientResource.Requested,
   289  				Used:         r.InsufficientResource.Used,
   290  				Capacity:     r.InsufficientResource.Capacity,
   291  			})
   292  		} else {
   293  			reasons = append(reasons, &PredicateFailureError{r.Name, r.Reason})
   294  		}
   295  	}
   296  
   297  	// Check taint/toleration except for static pods
   298  	if !types.IsStaticPod(pod) {
   299  		_, isUntolerated := corev1.FindMatchingUntoleratedTaint(nodeInfo.Node().Spec.Taints, pod.Spec.Tolerations, func(t *v1.Taint) bool {
   300  			// Kubelet is only interested in the NoExecute taint.
   301  			return t.Effect == v1.TaintEffectNoExecute
   302  		})
   303  		if isUntolerated {
   304  			reasons = append(reasons, &PredicateFailureError{tainttoleration.Name, tainttoleration.ErrReasonNotMatch})
   305  		}
   306  	}
   307  
   308  	return reasons
   309  }
   310  

View as plain text