...

Source file src/k8s.io/kubernetes/pkg/scheduler/framework/types.go

Documentation: k8s.io/kubernetes/pkg/scheduler/framework

     1  /*
     2  Copyright 2015 The Kubernetes Authors.
     3  
     4  Licensed under the Apache License, Version 2.0 (the "License");
     5  you may not use this file except in compliance with the License.
     6  You may obtain a copy of the License at
     7  
     8      http://www.apache.org/licenses/LICENSE-2.0
     9  
    10  Unless required by applicable law or agreed to in writing, software
    11  distributed under the License is distributed on an "AS IS" BASIS,
    12  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    13  See the License for the specific language governing permissions and
    14  limitations under the License.
    15  */
    16  
    17  package framework
    18  
    19  import (
    20  	"errors"
    21  	"fmt"
    22  	"sort"
    23  	"strings"
    24  	"sync/atomic"
    25  	"time"
    26  
    27  	v1 "k8s.io/api/core/v1"
    28  	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
    29  	"k8s.io/apimachinery/pkg/labels"
    30  	utilerrors "k8s.io/apimachinery/pkg/util/errors"
    31  	"k8s.io/apimachinery/pkg/util/sets"
    32  	utilfeature "k8s.io/apiserver/pkg/util/feature"
    33  	"k8s.io/klog/v2"
    34  
    35  	podutil "k8s.io/kubernetes/pkg/api/v1/pod"
    36  	resourcehelper "k8s.io/kubernetes/pkg/api/v1/resource"
    37  	"k8s.io/kubernetes/pkg/features"
    38  	schedutil "k8s.io/kubernetes/pkg/scheduler/util"
    39  )
    40  
    41  var generation int64
    42  
    43  // ActionType is an integer to represent one type of resource change.
    44  // Different ActionTypes can be bit-wised to compose new semantics.
    45  type ActionType int64
    46  
    47  // Constants for ActionTypes.
    48  const (
    49  	Add    ActionType = 1 << iota // 1
    50  	Delete                        // 10
    51  	// UpdateNodeXYZ is only applicable for Node events.
    52  	UpdateNodeAllocatable // 100
    53  	UpdateNodeLabel       // 1000
    54  	UpdateNodeTaint       // 10000
    55  	UpdateNodeCondition   // 100000
    56  	UpdateNodeAnnotation  // 1000000
    57  
    58  	All ActionType = 1<<iota - 1 // 1111111
    59  
    60  	// Use the general Update type if you don't either know or care the specific sub-Update type to use.
    61  	Update = UpdateNodeAllocatable | UpdateNodeLabel | UpdateNodeTaint | UpdateNodeCondition | UpdateNodeAnnotation
    62  )
    63  
    64  // GVK is short for group/version/kind, which can uniquely represent a particular API resource.
    65  type GVK string
    66  
    67  // Constants for GVKs.
    68  const (
    69  	// There are a couple of notes about how the scheduler notifies the events of Pods:
    70  	// - Add: add events could be triggered by either a newly created Pod or an existing Pod that is scheduled to a Node.
    71  	// - Delete: delete events could be triggered by:
    72  	//           - a Pod that is deleted
    73  	//           - a Pod that was assumed, but gets un-assumed due to some errors in the binding cycle.
    74  	//           - an existing Pod that was unscheduled but gets scheduled to a Node.
    75  	Pod GVK = "Pod"
    76  	// A note about NodeAdd event and UpdateNodeTaint event:
    77  	// NodeAdd QueueingHint isn't always called because of the internal feature called preCheck.
    78  	// It's definitely not something expected for plugin developers,
    79  	// and registering UpdateNodeTaint event is the only mitigation for now.
    80  	// So, kube-scheduler registers UpdateNodeTaint event for plugins that has NodeAdded event, but don't have UpdateNodeTaint event.
    81  	// It has a bad impact for the requeuing efficiency though, a lot better than some Pods being stuck in the
    82  	// unschedulable pod pool.
    83  	// This behavior will be removed when we remove the preCheck feature.
    84  	// See: https://github.com/kubernetes/kubernetes/issues/110175
    85  	Node                    GVK = "Node"
    86  	PersistentVolume        GVK = "PersistentVolume"
    87  	PersistentVolumeClaim   GVK = "PersistentVolumeClaim"
    88  	CSINode                 GVK = "storage.k8s.io/CSINode"
    89  	CSIDriver               GVK = "storage.k8s.io/CSIDriver"
    90  	CSIStorageCapacity      GVK = "storage.k8s.io/CSIStorageCapacity"
    91  	StorageClass            GVK = "storage.k8s.io/StorageClass"
    92  	PodSchedulingContext    GVK = "PodSchedulingContext"
    93  	ResourceClaim           GVK = "ResourceClaim"
    94  	ResourceClass           GVK = "ResourceClass"
    95  	ResourceClaimParameters GVK = "ResourceClaimParameters"
    96  	ResourceClassParameters GVK = "ResourceClassParameters"
    97  
    98  	// WildCard is a special GVK to match all resources.
    99  	// e.g., If you register `{Resource: "*", ActionType: All}` in EventsToRegister,
   100  	// all coming clusterEvents will be admitted. Be careful to register it, it will
   101  	// increase the computing pressure in requeueing unless you really need it.
   102  	//
   103  	// Meanwhile, if the coming clusterEvent is a wildcard one, all pods
   104  	// will be moved from unschedulablePod pool to activeQ/backoffQ forcibly.
   105  	WildCard GVK = "*"
   106  )
   107  
   108  type ClusterEventWithHint struct {
   109  	Event ClusterEvent
   110  	// QueueingHintFn is executed for the plugin rejected by this plugin when the above Event happens,
   111  	// and filters out events to reduce useless retry of Pod's scheduling.
   112  	// It's an optional field. If not set,
   113  	// the scheduling of Pods will be always retried with backoff when this Event happens.
   114  	// (the same as Queue)
   115  	QueueingHintFn QueueingHintFn
   116  }
   117  
   118  // QueueingHintFn returns a hint that signals whether the event can make a Pod,
   119  // which was rejected by this plugin in the past scheduling cycle, schedulable or not.
   120  // It's called before a Pod gets moved from unschedulableQ to backoffQ or activeQ.
   121  // If it returns an error, we'll take the returned QueueingHint as `Queue` at the caller whatever we returned here so that
   122  // we can prevent the Pod from being stuck in the unschedulable pod pool.
   123  //
   124  // - `pod`: the Pod to be enqueued, which is rejected by this plugin in the past.
   125  // - `oldObj` `newObj`: the object involved in that event.
   126  //   - For example, the given event is "Node deleted", the `oldObj` will be that deleted Node.
   127  //   - `oldObj` is nil if the event is add event.
   128  //   - `newObj` is nil if the event is delete event.
   129  type QueueingHintFn func(logger klog.Logger, pod *v1.Pod, oldObj, newObj interface{}) (QueueingHint, error)
   130  
   131  type QueueingHint int
   132  
   133  const (
   134  	// QueueSkip implies that the cluster event has no impact on
   135  	// scheduling of the pod.
   136  	QueueSkip QueueingHint = iota
   137  
   138  	// Queue implies that the Pod may be schedulable by the event.
   139  	Queue
   140  )
   141  
   142  func (s QueueingHint) String() string {
   143  	switch s {
   144  	case QueueSkip:
   145  		return "QueueSkip"
   146  	case Queue:
   147  		return "Queue"
   148  	}
   149  	return ""
   150  }
   151  
   152  // ClusterEvent abstracts how a system resource's state gets changed.
   153  // Resource represents the standard API resources such as Pod, Node, etc.
   154  // ActionType denotes the specific change such as Add, Update or Delete.
   155  type ClusterEvent struct {
   156  	Resource   GVK
   157  	ActionType ActionType
   158  	Label      string
   159  }
   160  
   161  // IsWildCard returns true if ClusterEvent follows WildCard semantics
   162  func (ce ClusterEvent) IsWildCard() bool {
   163  	return ce.Resource == WildCard && ce.ActionType == All
   164  }
   165  
   166  // Match returns true if ClusterEvent is matched with the coming event.
   167  // If the ce.Resource is "*", there's no requirement for the coming event' Resource.
   168  // Contrarily, if the coming event's Resource is "*", the ce.Resource should only be "*".
   169  //
   170  // Note: we have a special case here when the coming event is a wildcard event,
   171  // it will force all Pods to move to activeQ/backoffQ,
   172  // but we take it as an unmatched event unless the ce is also a wildcard one.
   173  func (ce ClusterEvent) Match(event ClusterEvent) bool {
   174  	return ce.IsWildCard() || (ce.Resource == WildCard || ce.Resource == event.Resource) && ce.ActionType&event.ActionType != 0
   175  }
   176  
   177  func UnrollWildCardResource() []ClusterEventWithHint {
   178  	return []ClusterEventWithHint{
   179  		{Event: ClusterEvent{Resource: Pod, ActionType: All}},
   180  		{Event: ClusterEvent{Resource: Node, ActionType: All}},
   181  		{Event: ClusterEvent{Resource: PersistentVolume, ActionType: All}},
   182  		{Event: ClusterEvent{Resource: PersistentVolumeClaim, ActionType: All}},
   183  		{Event: ClusterEvent{Resource: CSINode, ActionType: All}},
   184  		{Event: ClusterEvent{Resource: CSIDriver, ActionType: All}},
   185  		{Event: ClusterEvent{Resource: CSIStorageCapacity, ActionType: All}},
   186  		{Event: ClusterEvent{Resource: StorageClass, ActionType: All}},
   187  		{Event: ClusterEvent{Resource: PodSchedulingContext, ActionType: All}},
   188  		{Event: ClusterEvent{Resource: ResourceClaim, ActionType: All}},
   189  		{Event: ClusterEvent{Resource: ResourceClass, ActionType: All}},
   190  		{Event: ClusterEvent{Resource: ResourceClaimParameters, ActionType: All}},
   191  		{Event: ClusterEvent{Resource: ResourceClassParameters, ActionType: All}},
   192  	}
   193  }
   194  
   195  // QueuedPodInfo is a Pod wrapper with additional information related to
   196  // the pod's status in the scheduling queue, such as the timestamp when
   197  // it's added to the queue.
   198  type QueuedPodInfo struct {
   199  	*PodInfo
   200  	// The time pod added to the scheduling queue.
   201  	Timestamp time.Time
   202  	// Number of schedule attempts before successfully scheduled.
   203  	// It's used to record the # attempts metric.
   204  	Attempts int
   205  	// The time when the pod is added to the queue for the first time. The pod may be added
   206  	// back to the queue multiple times before it's successfully scheduled.
   207  	// It shouldn't be updated once initialized. It's used to record the e2e scheduling
   208  	// latency for a pod.
   209  	InitialAttemptTimestamp *time.Time
   210  	// UnschedulablePlugins records the plugin names that the Pod failed with Unschedulable or UnschedulableAndUnresolvable status.
   211  	// It's registered only when the Pod is rejected in PreFilter, Filter, Reserve, or Permit (WaitOnPermit).
   212  	UnschedulablePlugins sets.Set[string]
   213  	// PendingPlugins records the plugin names that the Pod failed with Pending status.
   214  	PendingPlugins sets.Set[string]
   215  	// Whether the Pod is scheduling gated (by PreEnqueuePlugins) or not.
   216  	Gated bool
   217  }
   218  
   219  // DeepCopy returns a deep copy of the QueuedPodInfo object.
   220  func (pqi *QueuedPodInfo) DeepCopy() *QueuedPodInfo {
   221  	return &QueuedPodInfo{
   222  		PodInfo:                 pqi.PodInfo.DeepCopy(),
   223  		Timestamp:               pqi.Timestamp,
   224  		Attempts:                pqi.Attempts,
   225  		InitialAttemptTimestamp: pqi.InitialAttemptTimestamp,
   226  		UnschedulablePlugins:    pqi.UnschedulablePlugins.Clone(),
   227  		Gated:                   pqi.Gated,
   228  	}
   229  }
   230  
   231  // PodInfo is a wrapper to a Pod with additional pre-computed information to
   232  // accelerate processing. This information is typically immutable (e.g., pre-processed
   233  // inter-pod affinity selectors).
   234  type PodInfo struct {
   235  	Pod                        *v1.Pod
   236  	RequiredAffinityTerms      []AffinityTerm
   237  	RequiredAntiAffinityTerms  []AffinityTerm
   238  	PreferredAffinityTerms     []WeightedAffinityTerm
   239  	PreferredAntiAffinityTerms []WeightedAffinityTerm
   240  }
   241  
   242  // DeepCopy returns a deep copy of the PodInfo object.
   243  func (pi *PodInfo) DeepCopy() *PodInfo {
   244  	return &PodInfo{
   245  		Pod:                        pi.Pod.DeepCopy(),
   246  		RequiredAffinityTerms:      pi.RequiredAffinityTerms,
   247  		RequiredAntiAffinityTerms:  pi.RequiredAntiAffinityTerms,
   248  		PreferredAffinityTerms:     pi.PreferredAffinityTerms,
   249  		PreferredAntiAffinityTerms: pi.PreferredAntiAffinityTerms,
   250  	}
   251  }
   252  
   253  // Update creates a full new PodInfo by default. And only updates the pod when the PodInfo
   254  // has been instantiated and the passed pod is the exact same one as the original pod.
   255  func (pi *PodInfo) Update(pod *v1.Pod) error {
   256  	if pod != nil && pi.Pod != nil && pi.Pod.UID == pod.UID {
   257  		// PodInfo includes immutable information, and so it is safe to update the pod in place if it is
   258  		// the exact same pod
   259  		pi.Pod = pod
   260  		return nil
   261  	}
   262  	var preferredAffinityTerms []v1.WeightedPodAffinityTerm
   263  	var preferredAntiAffinityTerms []v1.WeightedPodAffinityTerm
   264  	if affinity := pod.Spec.Affinity; affinity != nil {
   265  		if a := affinity.PodAffinity; a != nil {
   266  			preferredAffinityTerms = a.PreferredDuringSchedulingIgnoredDuringExecution
   267  		}
   268  		if a := affinity.PodAntiAffinity; a != nil {
   269  			preferredAntiAffinityTerms = a.PreferredDuringSchedulingIgnoredDuringExecution
   270  		}
   271  	}
   272  
   273  	// Attempt to parse the affinity terms
   274  	var parseErrs []error
   275  	requiredAffinityTerms, err := getAffinityTerms(pod, getPodAffinityTerms(pod.Spec.Affinity))
   276  	if err != nil {
   277  		parseErrs = append(parseErrs, fmt.Errorf("requiredAffinityTerms: %w", err))
   278  	}
   279  	requiredAntiAffinityTerms, err := getAffinityTerms(pod,
   280  		getPodAntiAffinityTerms(pod.Spec.Affinity))
   281  	if err != nil {
   282  		parseErrs = append(parseErrs, fmt.Errorf("requiredAntiAffinityTerms: %w", err))
   283  	}
   284  	weightedAffinityTerms, err := getWeightedAffinityTerms(pod, preferredAffinityTerms)
   285  	if err != nil {
   286  		parseErrs = append(parseErrs, fmt.Errorf("preferredAffinityTerms: %w", err))
   287  	}
   288  	weightedAntiAffinityTerms, err := getWeightedAffinityTerms(pod, preferredAntiAffinityTerms)
   289  	if err != nil {
   290  		parseErrs = append(parseErrs, fmt.Errorf("preferredAntiAffinityTerms: %w", err))
   291  	}
   292  
   293  	pi.Pod = pod
   294  	pi.RequiredAffinityTerms = requiredAffinityTerms
   295  	pi.RequiredAntiAffinityTerms = requiredAntiAffinityTerms
   296  	pi.PreferredAffinityTerms = weightedAffinityTerms
   297  	pi.PreferredAntiAffinityTerms = weightedAntiAffinityTerms
   298  	return utilerrors.NewAggregate(parseErrs)
   299  }
   300  
   301  // AffinityTerm is a processed version of v1.PodAffinityTerm.
   302  type AffinityTerm struct {
   303  	Namespaces        sets.Set[string]
   304  	Selector          labels.Selector
   305  	TopologyKey       string
   306  	NamespaceSelector labels.Selector
   307  }
   308  
   309  // Matches returns true if the pod matches the label selector and namespaces or namespace selector.
   310  func (at *AffinityTerm) Matches(pod *v1.Pod, nsLabels labels.Set) bool {
   311  	if at.Namespaces.Has(pod.Namespace) || at.NamespaceSelector.Matches(nsLabels) {
   312  		return at.Selector.Matches(labels.Set(pod.Labels))
   313  	}
   314  	return false
   315  }
   316  
   317  // WeightedAffinityTerm is a "processed" representation of v1.WeightedAffinityTerm.
   318  type WeightedAffinityTerm struct {
   319  	AffinityTerm
   320  	Weight int32
   321  }
   322  
   323  // ExtenderName is a fake plugin name put in UnschedulablePlugins when Extender rejected some Nodes.
   324  const ExtenderName = "Extender"
   325  
   326  // Diagnosis records the details to diagnose a scheduling failure.
   327  type Diagnosis struct {
   328  	// NodeToStatusMap records the status of each node
   329  	// if they're rejected in PreFilter (via PreFilterResult) or Filter plugins.
   330  	// Nodes that pass PreFilter/Filter plugins are not included in this map.
   331  	NodeToStatusMap NodeToStatusMap
   332  	// UnschedulablePlugins are plugins that returns Unschedulable or UnschedulableAndUnresolvable.
   333  	UnschedulablePlugins sets.Set[string]
   334  	// UnschedulablePlugins are plugins that returns Pending.
   335  	PendingPlugins sets.Set[string]
   336  	// PreFilterMsg records the messages returned from PreFilter plugins.
   337  	PreFilterMsg string
   338  	// PostFilterMsg records the messages returned from PostFilter plugins.
   339  	PostFilterMsg string
   340  }
   341  
   342  // FitError describes a fit error of a pod.
   343  type FitError struct {
   344  	Pod         *v1.Pod
   345  	NumAllNodes int
   346  	Diagnosis   Diagnosis
   347  }
   348  
   349  const (
   350  	// NoNodeAvailableMsg is used to format message when no nodes available.
   351  	NoNodeAvailableMsg = "0/%v nodes are available"
   352  )
   353  
   354  func (d *Diagnosis) AddPluginStatus(sts *Status) {
   355  	if sts.Plugin() == "" {
   356  		return
   357  	}
   358  	if sts.IsRejected() {
   359  		if d.UnschedulablePlugins == nil {
   360  			d.UnschedulablePlugins = sets.New[string]()
   361  		}
   362  		d.UnschedulablePlugins.Insert(sts.Plugin())
   363  	}
   364  	if sts.Code() == Pending {
   365  		if d.PendingPlugins == nil {
   366  			d.PendingPlugins = sets.New[string]()
   367  		}
   368  		d.PendingPlugins.Insert(sts.Plugin())
   369  	}
   370  }
   371  
   372  // Error returns detailed information of why the pod failed to fit on each node.
   373  // A message format is "0/X nodes are available: <PreFilterMsg>. <FilterMsg>. <PostFilterMsg>."
   374  func (f *FitError) Error() string {
   375  	reasonMsg := fmt.Sprintf(NoNodeAvailableMsg+":", f.NumAllNodes)
   376  	preFilterMsg := f.Diagnosis.PreFilterMsg
   377  	if preFilterMsg != "" {
   378  		// PreFilter plugin returns unschedulable.
   379  		// Add the messages from PreFilter plugins to reasonMsg.
   380  		reasonMsg += fmt.Sprintf(" %v.", preFilterMsg)
   381  	}
   382  
   383  	if preFilterMsg == "" {
   384  		// the scheduling cycle went through PreFilter extension point successfully.
   385  		//
   386  		// When the prefilter plugin returns unschedulable,
   387  		// the scheduling framework inserts the same unschedulable status to all nodes in NodeToStatusMap.
   388  		// So, we shouldn't add the message from NodeToStatusMap when the PreFilter failed.
   389  		// Otherwise, we will have duplicated reasons in the error message.
   390  		reasons := make(map[string]int)
   391  		for _, status := range f.Diagnosis.NodeToStatusMap {
   392  			for _, reason := range status.Reasons() {
   393  				reasons[reason]++
   394  			}
   395  		}
   396  
   397  		sortReasonsHistogram := func() []string {
   398  			var reasonStrings []string
   399  			for k, v := range reasons {
   400  				reasonStrings = append(reasonStrings, fmt.Sprintf("%v %v", v, k))
   401  			}
   402  			sort.Strings(reasonStrings)
   403  			return reasonStrings
   404  		}
   405  		sortedFilterMsg := sortReasonsHistogram()
   406  		if len(sortedFilterMsg) != 0 {
   407  			reasonMsg += fmt.Sprintf(" %v.", strings.Join(sortedFilterMsg, ", "))
   408  		}
   409  	}
   410  
   411  	// Add the messages from PostFilter plugins to reasonMsg.
   412  	// We can add this message regardless of whether the scheduling cycle fails at PreFilter or Filter
   413  	// since we may run PostFilter (if enabled) in both cases.
   414  	postFilterMsg := f.Diagnosis.PostFilterMsg
   415  	if postFilterMsg != "" {
   416  		reasonMsg += fmt.Sprintf(" %v", postFilterMsg)
   417  	}
   418  	return reasonMsg
   419  }
   420  
   421  func newAffinityTerm(pod *v1.Pod, term *v1.PodAffinityTerm) (*AffinityTerm, error) {
   422  	selector, err := metav1.LabelSelectorAsSelector(term.LabelSelector)
   423  	if err != nil {
   424  		return nil, err
   425  	}
   426  
   427  	namespaces := getNamespacesFromPodAffinityTerm(pod, term)
   428  	nsSelector, err := metav1.LabelSelectorAsSelector(term.NamespaceSelector)
   429  	if err != nil {
   430  		return nil, err
   431  	}
   432  
   433  	return &AffinityTerm{Namespaces: namespaces, Selector: selector, TopologyKey: term.TopologyKey, NamespaceSelector: nsSelector}, nil
   434  }
   435  
   436  // getAffinityTerms receives a Pod and affinity terms and returns the namespaces and
   437  // selectors of the terms.
   438  func getAffinityTerms(pod *v1.Pod, v1Terms []v1.PodAffinityTerm) ([]AffinityTerm, error) {
   439  	if v1Terms == nil {
   440  		return nil, nil
   441  	}
   442  
   443  	var terms []AffinityTerm
   444  	for i := range v1Terms {
   445  		t, err := newAffinityTerm(pod, &v1Terms[i])
   446  		if err != nil {
   447  			// We get here if the label selector failed to process
   448  			return nil, err
   449  		}
   450  		terms = append(terms, *t)
   451  	}
   452  	return terms, nil
   453  }
   454  
   455  // getWeightedAffinityTerms returns the list of processed affinity terms.
   456  func getWeightedAffinityTerms(pod *v1.Pod, v1Terms []v1.WeightedPodAffinityTerm) ([]WeightedAffinityTerm, error) {
   457  	if v1Terms == nil {
   458  		return nil, nil
   459  	}
   460  
   461  	var terms []WeightedAffinityTerm
   462  	for i := range v1Terms {
   463  		t, err := newAffinityTerm(pod, &v1Terms[i].PodAffinityTerm)
   464  		if err != nil {
   465  			// We get here if the label selector failed to process
   466  			return nil, err
   467  		}
   468  		terms = append(terms, WeightedAffinityTerm{AffinityTerm: *t, Weight: v1Terms[i].Weight})
   469  	}
   470  	return terms, nil
   471  }
   472  
   473  // NewPodInfo returns a new PodInfo.
   474  func NewPodInfo(pod *v1.Pod) (*PodInfo, error) {
   475  	pInfo := &PodInfo{}
   476  	err := pInfo.Update(pod)
   477  	return pInfo, err
   478  }
   479  
   480  func getPodAffinityTerms(affinity *v1.Affinity) (terms []v1.PodAffinityTerm) {
   481  	if affinity != nil && affinity.PodAffinity != nil {
   482  		if len(affinity.PodAffinity.RequiredDuringSchedulingIgnoredDuringExecution) != 0 {
   483  			terms = affinity.PodAffinity.RequiredDuringSchedulingIgnoredDuringExecution
   484  		}
   485  		// TODO: Uncomment this block when implement RequiredDuringSchedulingRequiredDuringExecution.
   486  		// if len(affinity.PodAffinity.RequiredDuringSchedulingRequiredDuringExecution) != 0 {
   487  		//	terms = append(terms, affinity.PodAffinity.RequiredDuringSchedulingRequiredDuringExecution...)
   488  		// }
   489  	}
   490  	return terms
   491  }
   492  
   493  func getPodAntiAffinityTerms(affinity *v1.Affinity) (terms []v1.PodAffinityTerm) {
   494  	if affinity != nil && affinity.PodAntiAffinity != nil {
   495  		if len(affinity.PodAntiAffinity.RequiredDuringSchedulingIgnoredDuringExecution) != 0 {
   496  			terms = affinity.PodAntiAffinity.RequiredDuringSchedulingIgnoredDuringExecution
   497  		}
   498  		// TODO: Uncomment this block when implement RequiredDuringSchedulingRequiredDuringExecution.
   499  		// if len(affinity.PodAntiAffinity.RequiredDuringSchedulingRequiredDuringExecution) != 0 {
   500  		//	terms = append(terms, affinity.PodAntiAffinity.RequiredDuringSchedulingRequiredDuringExecution...)
   501  		// }
   502  	}
   503  	return terms
   504  }
   505  
   506  // returns a set of names according to the namespaces indicated in podAffinityTerm.
   507  // If namespaces is empty it considers the given pod's namespace.
   508  func getNamespacesFromPodAffinityTerm(pod *v1.Pod, podAffinityTerm *v1.PodAffinityTerm) sets.Set[string] {
   509  	names := sets.Set[string]{}
   510  	if len(podAffinityTerm.Namespaces) == 0 && podAffinityTerm.NamespaceSelector == nil {
   511  		names.Insert(pod.Namespace)
   512  	} else {
   513  		names.Insert(podAffinityTerm.Namespaces...)
   514  	}
   515  	return names
   516  }
   517  
   518  // ImageStateSummary provides summarized information about the state of an image.
   519  type ImageStateSummary struct {
   520  	// Size of the image
   521  	Size int64
   522  	// Used to track how many nodes have this image, it is computed from the Nodes field below
   523  	// during the execution of Snapshot.
   524  	NumNodes int
   525  	// A set of node names for nodes having this image present. This field is used for
   526  	// keeping track of the nodes during update/add/remove events.
   527  	Nodes sets.Set[string]
   528  }
   529  
   530  // Snapshot returns a copy without Nodes field of ImageStateSummary
   531  func (iss *ImageStateSummary) Snapshot() *ImageStateSummary {
   532  	return &ImageStateSummary{
   533  		Size:     iss.Size,
   534  		NumNodes: iss.Nodes.Len(),
   535  	}
   536  }
   537  
   538  // NodeInfo is node level aggregated information.
   539  type NodeInfo struct {
   540  	// Overall node information.
   541  	node *v1.Node
   542  
   543  	// Pods running on the node.
   544  	Pods []*PodInfo
   545  
   546  	// The subset of pods with affinity.
   547  	PodsWithAffinity []*PodInfo
   548  
   549  	// The subset of pods with required anti-affinity.
   550  	PodsWithRequiredAntiAffinity []*PodInfo
   551  
   552  	// Ports allocated on the node.
   553  	UsedPorts HostPortInfo
   554  
   555  	// Total requested resources of all pods on this node. This includes assumed
   556  	// pods, which scheduler has sent for binding, but may not be scheduled yet.
   557  	Requested *Resource
   558  	// Total requested resources of all pods on this node with a minimum value
   559  	// applied to each container's CPU and memory requests. This does not reflect
   560  	// the actual resource requests for this node, but is used to avoid scheduling
   561  	// many zero-request pods onto one node.
   562  	NonZeroRequested *Resource
   563  	// We store allocatedResources (which is Node.Status.Allocatable.*) explicitly
   564  	// as int64, to avoid conversions and accessing map.
   565  	Allocatable *Resource
   566  
   567  	// ImageStates holds the entry of an image if and only if this image is on the node. The entry can be used for
   568  	// checking an image's existence and advanced usage (e.g., image locality scheduling policy) based on the image
   569  	// state information.
   570  	ImageStates map[string]*ImageStateSummary
   571  
   572  	// PVCRefCounts contains a mapping of PVC names to the number of pods on the node using it.
   573  	// Keys are in the format "namespace/name".
   574  	PVCRefCounts map[string]int
   575  
   576  	// Whenever NodeInfo changes, generation is bumped.
   577  	// This is used to avoid cloning it if the object didn't change.
   578  	Generation int64
   579  }
   580  
   581  // nextGeneration: Let's make sure history never forgets the name...
   582  // Increments the generation number monotonically ensuring that generation numbers never collide.
   583  // Collision of the generation numbers would be particularly problematic if a node was deleted and
   584  // added back with the same name. See issue#63262.
   585  func nextGeneration() int64 {
   586  	return atomic.AddInt64(&generation, 1)
   587  }
   588  
   589  // Resource is a collection of compute resource.
   590  type Resource struct {
   591  	MilliCPU         int64
   592  	Memory           int64
   593  	EphemeralStorage int64
   594  	// We store allowedPodNumber (which is Node.Status.Allocatable.Pods().Value())
   595  	// explicitly as int, to avoid conversions and improve performance.
   596  	AllowedPodNumber int
   597  	// ScalarResources
   598  	ScalarResources map[v1.ResourceName]int64
   599  }
   600  
   601  // NewResource creates a Resource from ResourceList
   602  func NewResource(rl v1.ResourceList) *Resource {
   603  	r := &Resource{}
   604  	r.Add(rl)
   605  	return r
   606  }
   607  
   608  // Add adds ResourceList into Resource.
   609  func (r *Resource) Add(rl v1.ResourceList) {
   610  	if r == nil {
   611  		return
   612  	}
   613  
   614  	for rName, rQuant := range rl {
   615  		switch rName {
   616  		case v1.ResourceCPU:
   617  			r.MilliCPU += rQuant.MilliValue()
   618  		case v1.ResourceMemory:
   619  			r.Memory += rQuant.Value()
   620  		case v1.ResourcePods:
   621  			r.AllowedPodNumber += int(rQuant.Value())
   622  		case v1.ResourceEphemeralStorage:
   623  			r.EphemeralStorage += rQuant.Value()
   624  		default:
   625  			if schedutil.IsScalarResourceName(rName) {
   626  				r.AddScalar(rName, rQuant.Value())
   627  			}
   628  		}
   629  	}
   630  }
   631  
   632  // Clone returns a copy of this resource.
   633  func (r *Resource) Clone() *Resource {
   634  	res := &Resource{
   635  		MilliCPU:         r.MilliCPU,
   636  		Memory:           r.Memory,
   637  		AllowedPodNumber: r.AllowedPodNumber,
   638  		EphemeralStorage: r.EphemeralStorage,
   639  	}
   640  	if r.ScalarResources != nil {
   641  		res.ScalarResources = make(map[v1.ResourceName]int64, len(r.ScalarResources))
   642  		for k, v := range r.ScalarResources {
   643  			res.ScalarResources[k] = v
   644  		}
   645  	}
   646  	return res
   647  }
   648  
   649  // AddScalar adds a resource by a scalar value of this resource.
   650  func (r *Resource) AddScalar(name v1.ResourceName, quantity int64) {
   651  	r.SetScalar(name, r.ScalarResources[name]+quantity)
   652  }
   653  
   654  // SetScalar sets a resource by a scalar value of this resource.
   655  func (r *Resource) SetScalar(name v1.ResourceName, quantity int64) {
   656  	// Lazily allocate scalar resource map.
   657  	if r.ScalarResources == nil {
   658  		r.ScalarResources = map[v1.ResourceName]int64{}
   659  	}
   660  	r.ScalarResources[name] = quantity
   661  }
   662  
   663  // SetMaxResource compares with ResourceList and takes max value for each Resource.
   664  func (r *Resource) SetMaxResource(rl v1.ResourceList) {
   665  	if r == nil {
   666  		return
   667  	}
   668  
   669  	for rName, rQuantity := range rl {
   670  		switch rName {
   671  		case v1.ResourceMemory:
   672  			r.Memory = max(r.Memory, rQuantity.Value())
   673  		case v1.ResourceCPU:
   674  			r.MilliCPU = max(r.MilliCPU, rQuantity.MilliValue())
   675  		case v1.ResourceEphemeralStorage:
   676  			r.EphemeralStorage = max(r.EphemeralStorage, rQuantity.Value())
   677  		default:
   678  			if schedutil.IsScalarResourceName(rName) {
   679  				r.SetScalar(rName, max(r.ScalarResources[rName], rQuantity.Value()))
   680  			}
   681  		}
   682  	}
   683  }
   684  
   685  // NewNodeInfo returns a ready to use empty NodeInfo object.
   686  // If any pods are given in arguments, their information will be aggregated in
   687  // the returned object.
   688  func NewNodeInfo(pods ...*v1.Pod) *NodeInfo {
   689  	ni := &NodeInfo{
   690  		Requested:        &Resource{},
   691  		NonZeroRequested: &Resource{},
   692  		Allocatable:      &Resource{},
   693  		Generation:       nextGeneration(),
   694  		UsedPorts:        make(HostPortInfo),
   695  		ImageStates:      make(map[string]*ImageStateSummary),
   696  		PVCRefCounts:     make(map[string]int),
   697  	}
   698  	for _, pod := range pods {
   699  		ni.AddPod(pod)
   700  	}
   701  	return ni
   702  }
   703  
   704  // Node returns overall information about this node.
   705  func (n *NodeInfo) Node() *v1.Node {
   706  	if n == nil {
   707  		return nil
   708  	}
   709  	return n.node
   710  }
   711  
   712  // Snapshot returns a copy of this node, Except that ImageStates is copied without the Nodes field.
   713  func (n *NodeInfo) Snapshot() *NodeInfo {
   714  	clone := &NodeInfo{
   715  		node:             n.node,
   716  		Requested:        n.Requested.Clone(),
   717  		NonZeroRequested: n.NonZeroRequested.Clone(),
   718  		Allocatable:      n.Allocatable.Clone(),
   719  		UsedPorts:        make(HostPortInfo),
   720  		ImageStates:      make(map[string]*ImageStateSummary),
   721  		PVCRefCounts:     make(map[string]int),
   722  		Generation:       n.Generation,
   723  	}
   724  	if len(n.Pods) > 0 {
   725  		clone.Pods = append([]*PodInfo(nil), n.Pods...)
   726  	}
   727  	if len(n.UsedPorts) > 0 {
   728  		// HostPortInfo is a map-in-map struct
   729  		// make sure it's deep copied
   730  		for ip, portMap := range n.UsedPorts {
   731  			clone.UsedPorts[ip] = make(map[ProtocolPort]struct{})
   732  			for protocolPort, v := range portMap {
   733  				clone.UsedPorts[ip][protocolPort] = v
   734  			}
   735  		}
   736  	}
   737  	if len(n.PodsWithAffinity) > 0 {
   738  		clone.PodsWithAffinity = append([]*PodInfo(nil), n.PodsWithAffinity...)
   739  	}
   740  	if len(n.PodsWithRequiredAntiAffinity) > 0 {
   741  		clone.PodsWithRequiredAntiAffinity = append([]*PodInfo(nil), n.PodsWithRequiredAntiAffinity...)
   742  	}
   743  	if len(n.ImageStates) > 0 {
   744  		state := make(map[string]*ImageStateSummary, len(n.ImageStates))
   745  		for imageName, imageState := range n.ImageStates {
   746  			state[imageName] = imageState.Snapshot()
   747  		}
   748  		clone.ImageStates = state
   749  	}
   750  	for key, value := range n.PVCRefCounts {
   751  		clone.PVCRefCounts[key] = value
   752  	}
   753  	return clone
   754  }
   755  
   756  // String returns representation of human readable format of this NodeInfo.
   757  func (n *NodeInfo) String() string {
   758  	podKeys := make([]string, len(n.Pods))
   759  	for i, p := range n.Pods {
   760  		podKeys[i] = p.Pod.Name
   761  	}
   762  	return fmt.Sprintf("&NodeInfo{Pods:%v, RequestedResource:%#v, NonZeroRequest: %#v, UsedPort: %#v, AllocatableResource:%#v}",
   763  		podKeys, n.Requested, n.NonZeroRequested, n.UsedPorts, n.Allocatable)
   764  }
   765  
   766  // AddPodInfo adds pod information to this NodeInfo.
   767  // Consider using this instead of AddPod if a PodInfo is already computed.
   768  func (n *NodeInfo) AddPodInfo(podInfo *PodInfo) {
   769  	n.Pods = append(n.Pods, podInfo)
   770  	if podWithAffinity(podInfo.Pod) {
   771  		n.PodsWithAffinity = append(n.PodsWithAffinity, podInfo)
   772  	}
   773  	if podWithRequiredAntiAffinity(podInfo.Pod) {
   774  		n.PodsWithRequiredAntiAffinity = append(n.PodsWithRequiredAntiAffinity, podInfo)
   775  	}
   776  	n.update(podInfo.Pod, 1)
   777  }
   778  
   779  // AddPod is a wrapper around AddPodInfo.
   780  func (n *NodeInfo) AddPod(pod *v1.Pod) {
   781  	// ignore this err since apiserver doesn't properly validate affinity terms
   782  	// and we can't fix the validation for backwards compatibility.
   783  	podInfo, _ := NewPodInfo(pod)
   784  	n.AddPodInfo(podInfo)
   785  }
   786  
   787  func podWithAffinity(p *v1.Pod) bool {
   788  	affinity := p.Spec.Affinity
   789  	return affinity != nil && (affinity.PodAffinity != nil || affinity.PodAntiAffinity != nil)
   790  }
   791  
   792  func podWithRequiredAntiAffinity(p *v1.Pod) bool {
   793  	affinity := p.Spec.Affinity
   794  	return affinity != nil && affinity.PodAntiAffinity != nil &&
   795  		len(affinity.PodAntiAffinity.RequiredDuringSchedulingIgnoredDuringExecution) != 0
   796  }
   797  
   798  func removeFromSlice(logger klog.Logger, s []*PodInfo, k string) ([]*PodInfo, bool) {
   799  	var removed bool
   800  	for i := range s {
   801  		tmpKey, err := GetPodKey(s[i].Pod)
   802  		if err != nil {
   803  			logger.Error(err, "Cannot get pod key", "pod", klog.KObj(s[i].Pod))
   804  			continue
   805  		}
   806  		if k == tmpKey {
   807  			// delete the element
   808  			s[i] = s[len(s)-1]
   809  			s = s[:len(s)-1]
   810  			removed = true
   811  			break
   812  		}
   813  	}
   814  	// resets the slices to nil so that we can do DeepEqual in unit tests.
   815  	if len(s) == 0 {
   816  		return nil, removed
   817  	}
   818  	return s, removed
   819  }
   820  
   821  // RemovePod subtracts pod information from this NodeInfo.
   822  func (n *NodeInfo) RemovePod(logger klog.Logger, pod *v1.Pod) error {
   823  	k, err := GetPodKey(pod)
   824  	if err != nil {
   825  		return err
   826  	}
   827  	if podWithAffinity(pod) {
   828  		n.PodsWithAffinity, _ = removeFromSlice(logger, n.PodsWithAffinity, k)
   829  	}
   830  	if podWithRequiredAntiAffinity(pod) {
   831  		n.PodsWithRequiredAntiAffinity, _ = removeFromSlice(logger, n.PodsWithRequiredAntiAffinity, k)
   832  	}
   833  
   834  	var removed bool
   835  	if n.Pods, removed = removeFromSlice(logger, n.Pods, k); removed {
   836  		n.update(pod, -1)
   837  		return nil
   838  	}
   839  	return fmt.Errorf("no corresponding pod %s in pods of node %s", pod.Name, n.node.Name)
   840  }
   841  
   842  // update node info based on the pod and sign.
   843  // The sign will be set to `+1` when AddPod and to `-1` when RemovePod.
   844  func (n *NodeInfo) update(pod *v1.Pod, sign int64) {
   845  	res, non0CPU, non0Mem := calculateResource(pod)
   846  	n.Requested.MilliCPU += sign * res.MilliCPU
   847  	n.Requested.Memory += sign * res.Memory
   848  	n.Requested.EphemeralStorage += sign * res.EphemeralStorage
   849  	if n.Requested.ScalarResources == nil && len(res.ScalarResources) > 0 {
   850  		n.Requested.ScalarResources = map[v1.ResourceName]int64{}
   851  	}
   852  	for rName, rQuant := range res.ScalarResources {
   853  		n.Requested.ScalarResources[rName] += sign * rQuant
   854  	}
   855  	n.NonZeroRequested.MilliCPU += sign * non0CPU
   856  	n.NonZeroRequested.Memory += sign * non0Mem
   857  
   858  	// Consume ports when pod added or release ports when pod removed.
   859  	n.updateUsedPorts(pod, sign > 0)
   860  	n.updatePVCRefCounts(pod, sign > 0)
   861  
   862  	n.Generation = nextGeneration()
   863  }
   864  
   865  func calculateResource(pod *v1.Pod) (Resource, int64, int64) {
   866  	var non0InitCPU, non0InitMem int64
   867  	var non0CPU, non0Mem int64
   868  	requests := resourcehelper.PodRequests(pod, resourcehelper.PodResourcesOptions{
   869  		InPlacePodVerticalScalingEnabled: utilfeature.DefaultFeatureGate.Enabled(features.InPlacePodVerticalScaling),
   870  		ContainerFn: func(requests v1.ResourceList, containerType podutil.ContainerType) {
   871  			non0CPUReq, non0MemReq := schedutil.GetNonzeroRequests(&requests)
   872  			switch containerType {
   873  			case podutil.Containers:
   874  				non0CPU += non0CPUReq
   875  				non0Mem += non0MemReq
   876  			case podutil.InitContainers:
   877  				non0InitCPU = max(non0InitCPU, non0CPUReq)
   878  				non0InitMem = max(non0InitMem, non0MemReq)
   879  			}
   880  		},
   881  	})
   882  
   883  	non0CPU = max(non0CPU, non0InitCPU)
   884  	non0Mem = max(non0Mem, non0InitMem)
   885  
   886  	// If Overhead is being utilized, add to the non-zero cpu/memory tracking for the pod. It has already been added
   887  	// into ScalarResources since it is part of requests
   888  	if pod.Spec.Overhead != nil {
   889  		if _, found := pod.Spec.Overhead[v1.ResourceCPU]; found {
   890  			non0CPU += pod.Spec.Overhead.Cpu().MilliValue()
   891  		}
   892  
   893  		if _, found := pod.Spec.Overhead[v1.ResourceMemory]; found {
   894  			non0Mem += pod.Spec.Overhead.Memory().Value()
   895  		}
   896  	}
   897  	var res Resource
   898  	res.Add(requests)
   899  	return res, non0CPU, non0Mem
   900  }
   901  
   902  // updateUsedPorts updates the UsedPorts of NodeInfo.
   903  func (n *NodeInfo) updateUsedPorts(pod *v1.Pod, add bool) {
   904  	for _, container := range pod.Spec.Containers {
   905  		for _, podPort := range container.Ports {
   906  			if add {
   907  				n.UsedPorts.Add(podPort.HostIP, string(podPort.Protocol), podPort.HostPort)
   908  			} else {
   909  				n.UsedPorts.Remove(podPort.HostIP, string(podPort.Protocol), podPort.HostPort)
   910  			}
   911  		}
   912  	}
   913  }
   914  
   915  // updatePVCRefCounts updates the PVCRefCounts of NodeInfo.
   916  func (n *NodeInfo) updatePVCRefCounts(pod *v1.Pod, add bool) {
   917  	for _, v := range pod.Spec.Volumes {
   918  		if v.PersistentVolumeClaim == nil {
   919  			continue
   920  		}
   921  
   922  		key := GetNamespacedName(pod.Namespace, v.PersistentVolumeClaim.ClaimName)
   923  		if add {
   924  			n.PVCRefCounts[key] += 1
   925  		} else {
   926  			n.PVCRefCounts[key] -= 1
   927  			if n.PVCRefCounts[key] <= 0 {
   928  				delete(n.PVCRefCounts, key)
   929  			}
   930  		}
   931  	}
   932  }
   933  
   934  // SetNode sets the overall node information.
   935  func (n *NodeInfo) SetNode(node *v1.Node) {
   936  	n.node = node
   937  	n.Allocatable = NewResource(node.Status.Allocatable)
   938  	n.Generation = nextGeneration()
   939  }
   940  
   941  // RemoveNode removes the node object, leaving all other tracking information.
   942  func (n *NodeInfo) RemoveNode() {
   943  	n.node = nil
   944  	n.Generation = nextGeneration()
   945  }
   946  
   947  // GetPodKey returns the string key of a pod.
   948  func GetPodKey(pod *v1.Pod) (string, error) {
   949  	uid := string(pod.UID)
   950  	if len(uid) == 0 {
   951  		return "", errors.New("cannot get cache key for pod with empty UID")
   952  	}
   953  	return uid, nil
   954  }
   955  
   956  // GetNamespacedName returns the string format of a namespaced resource name.
   957  func GetNamespacedName(namespace, name string) string {
   958  	return fmt.Sprintf("%s/%s", namespace, name)
   959  }
   960  
   961  // DefaultBindAllHostIP defines the default ip address used to bind to all host.
   962  const DefaultBindAllHostIP = "0.0.0.0"
   963  
   964  // ProtocolPort represents a protocol port pair, e.g. tcp:80.
   965  type ProtocolPort struct {
   966  	Protocol string
   967  	Port     int32
   968  }
   969  
   970  // NewProtocolPort creates a ProtocolPort instance.
   971  func NewProtocolPort(protocol string, port int32) *ProtocolPort {
   972  	pp := &ProtocolPort{
   973  		Protocol: protocol,
   974  		Port:     port,
   975  	}
   976  
   977  	if len(pp.Protocol) == 0 {
   978  		pp.Protocol = string(v1.ProtocolTCP)
   979  	}
   980  
   981  	return pp
   982  }
   983  
   984  // HostPortInfo stores mapping from ip to a set of ProtocolPort
   985  type HostPortInfo map[string]map[ProtocolPort]struct{}
   986  
   987  // Add adds (ip, protocol, port) to HostPortInfo
   988  func (h HostPortInfo) Add(ip, protocol string, port int32) {
   989  	if port <= 0 {
   990  		return
   991  	}
   992  
   993  	h.sanitize(&ip, &protocol)
   994  
   995  	pp := NewProtocolPort(protocol, port)
   996  	if _, ok := h[ip]; !ok {
   997  		h[ip] = map[ProtocolPort]struct{}{
   998  			*pp: {},
   999  		}
  1000  		return
  1001  	}
  1002  
  1003  	h[ip][*pp] = struct{}{}
  1004  }
  1005  
  1006  // Remove removes (ip, protocol, port) from HostPortInfo
  1007  func (h HostPortInfo) Remove(ip, protocol string, port int32) {
  1008  	if port <= 0 {
  1009  		return
  1010  	}
  1011  
  1012  	h.sanitize(&ip, &protocol)
  1013  
  1014  	pp := NewProtocolPort(protocol, port)
  1015  	if m, ok := h[ip]; ok {
  1016  		delete(m, *pp)
  1017  		if len(h[ip]) == 0 {
  1018  			delete(h, ip)
  1019  		}
  1020  	}
  1021  }
  1022  
  1023  // Len returns the total number of (ip, protocol, port) tuple in HostPortInfo
  1024  func (h HostPortInfo) Len() int {
  1025  	length := 0
  1026  	for _, m := range h {
  1027  		length += len(m)
  1028  	}
  1029  	return length
  1030  }
  1031  
  1032  // CheckConflict checks if the input (ip, protocol, port) conflicts with the existing
  1033  // ones in HostPortInfo.
  1034  func (h HostPortInfo) CheckConflict(ip, protocol string, port int32) bool {
  1035  	if port <= 0 {
  1036  		return false
  1037  	}
  1038  
  1039  	h.sanitize(&ip, &protocol)
  1040  
  1041  	pp := NewProtocolPort(protocol, port)
  1042  
  1043  	// If ip is 0.0.0.0 check all IP's (protocol, port) pair
  1044  	if ip == DefaultBindAllHostIP {
  1045  		for _, m := range h {
  1046  			if _, ok := m[*pp]; ok {
  1047  				return true
  1048  			}
  1049  		}
  1050  		return false
  1051  	}
  1052  
  1053  	// If ip isn't 0.0.0.0, only check IP and 0.0.0.0's (protocol, port) pair
  1054  	for _, key := range []string{DefaultBindAllHostIP, ip} {
  1055  		if m, ok := h[key]; ok {
  1056  			if _, ok2 := m[*pp]; ok2 {
  1057  				return true
  1058  			}
  1059  		}
  1060  	}
  1061  
  1062  	return false
  1063  }
  1064  
  1065  // sanitize the parameters
  1066  func (h HostPortInfo) sanitize(ip, protocol *string) {
  1067  	if len(*ip) == 0 {
  1068  		*ip = DefaultBindAllHostIP
  1069  	}
  1070  	if len(*protocol) == 0 {
  1071  		*protocol = string(v1.ProtocolTCP)
  1072  	}
  1073  }
  1074  

View as plain text