...

Source file src/k8s.io/kubernetes/pkg/scheduler/testing/framework/fake_extender.go

Documentation: k8s.io/kubernetes/pkg/scheduler/testing/framework

     1  /*
     2  Copyright 2020 The Kubernetes Authors.
     3  
     4  Licensed under the Apache License, Version 2.0 (the "License");
     5  you may not use this file except in compliance with the License.
     6  You may obtain a copy of the License at
     7  
     8      http://www.apache.org/licenses/LICENSE-2.0
     9  
    10  Unless required by applicable law or agreed to in writing, software
    11  distributed under the License is distributed on an "AS IS" BASIS,
    12  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    13  See the License for the specific language governing permissions and
    14  limitations under the License.
    15  */
    16  
    17  package framework
    18  
    19  import (
    20  	"context"
    21  	"fmt"
    22  	"sort"
    23  
    24  	v1 "k8s.io/api/core/v1"
    25  	"k8s.io/apimachinery/pkg/runtime"
    26  	corev1helpers "k8s.io/component-helpers/scheduling/corev1"
    27  	"k8s.io/klog/v2"
    28  	extenderv1 "k8s.io/kube-scheduler/extender/v1"
    29  	"k8s.io/kubernetes/pkg/scheduler/framework"
    30  	frameworkruntime "k8s.io/kubernetes/pkg/scheduler/framework/runtime"
    31  	"k8s.io/kubernetes/pkg/scheduler/util"
    32  )
    33  
    34  // FitPredicate is a function type which is used in fake extender.
    35  type FitPredicate func(pod *v1.Pod, node *framework.NodeInfo) *framework.Status
    36  
    37  // PriorityFunc is a function type which is used in fake extender.
    38  type PriorityFunc func(pod *v1.Pod, nodes []*framework.NodeInfo) (*framework.NodeScoreList, error)
    39  
    40  // PriorityConfig is used in fake extender to perform Prioritize function.
    41  type PriorityConfig struct {
    42  	Function PriorityFunc
    43  	Weight   int64
    44  }
    45  
    46  // ErrorPredicateExtender implements FitPredicate function to always return error status.
    47  func ErrorPredicateExtender(pod *v1.Pod, node *framework.NodeInfo) *framework.Status {
    48  	return framework.NewStatus(framework.Error, "some error")
    49  }
    50  
    51  // FalsePredicateExtender implements FitPredicate function to always return unschedulable status.
    52  func FalsePredicateExtender(pod *v1.Pod, node *framework.NodeInfo) *framework.Status {
    53  	return framework.NewStatus(framework.Unschedulable, fmt.Sprintf("pod is unschedulable on the node %q", node.Node().Name))
    54  }
    55  
    56  // TruePredicateExtender implements FitPredicate function to always return success status.
    57  func TruePredicateExtender(pod *v1.Pod, node *framework.NodeInfo) *framework.Status {
    58  	return framework.NewStatus(framework.Success)
    59  }
    60  
    61  // Node1PredicateExtender implements FitPredicate function to return true
    62  // when the given node's name is "node1"; otherwise return false.
    63  func Node1PredicateExtender(pod *v1.Pod, node *framework.NodeInfo) *framework.Status {
    64  	if node.Node().Name == "node1" {
    65  		return framework.NewStatus(framework.Success)
    66  	}
    67  	return framework.NewStatus(framework.Unschedulable, fmt.Sprintf("node %q is not allowed", node.Node().Name))
    68  }
    69  
    70  // Node2PredicateExtender implements FitPredicate function to return true
    71  // when the given node's name is "node2"; otherwise return false.
    72  func Node2PredicateExtender(pod *v1.Pod, node *framework.NodeInfo) *framework.Status {
    73  	if node.Node().Name == "node2" {
    74  		return framework.NewStatus(framework.Success)
    75  	}
    76  	return framework.NewStatus(framework.Unschedulable, fmt.Sprintf("node %q is not allowed", node.Node().Name))
    77  }
    78  
    79  // ErrorPrioritizerExtender implements PriorityFunc function to always return error.
    80  func ErrorPrioritizerExtender(pod *v1.Pod, nodes []*framework.NodeInfo) (*framework.NodeScoreList, error) {
    81  	return &framework.NodeScoreList{}, fmt.Errorf("some error")
    82  }
    83  
    84  // Node1PrioritizerExtender implements PriorityFunc function to give score 10
    85  // if the given node's name is "node1"; otherwise score 1.
    86  func Node1PrioritizerExtender(pod *v1.Pod, nodes []*framework.NodeInfo) (*framework.NodeScoreList, error) {
    87  	result := framework.NodeScoreList{}
    88  	for _, node := range nodes {
    89  		score := 1
    90  		if node.Node().Name == "node1" {
    91  			score = 10
    92  		}
    93  		result = append(result, framework.NodeScore{Name: node.Node().Name, Score: int64(score)})
    94  	}
    95  	return &result, nil
    96  }
    97  
    98  // Node2PrioritizerExtender implements PriorityFunc function to give score 10
    99  // if the given node's name is "node2"; otherwise score 1.
   100  func Node2PrioritizerExtender(pod *v1.Pod, nodes []*framework.NodeInfo) (*framework.NodeScoreList, error) {
   101  	result := framework.NodeScoreList{}
   102  	for _, node := range nodes {
   103  		score := 1
   104  		if node.Node().Name == "node2" {
   105  			score = 10
   106  		}
   107  		result = append(result, framework.NodeScore{Name: node.Node().Name, Score: int64(score)})
   108  	}
   109  	return &result, nil
   110  }
   111  
   112  type node2PrioritizerPlugin struct{}
   113  
   114  // NewNode2PrioritizerPlugin returns a factory function to build node2PrioritizerPlugin.
   115  func NewNode2PrioritizerPlugin() frameworkruntime.PluginFactory {
   116  	return func(_ context.Context, _ runtime.Object, _ framework.Handle) (framework.Plugin, error) {
   117  		return &node2PrioritizerPlugin{}, nil
   118  	}
   119  }
   120  
   121  // Name returns name of the plugin.
   122  func (pl *node2PrioritizerPlugin) Name() string {
   123  	return "Node2Prioritizer"
   124  }
   125  
   126  // Score return score 100 if the given nodeName is "node2"; otherwise return score 10.
   127  func (pl *node2PrioritizerPlugin) Score(_ context.Context, _ *framework.CycleState, _ *v1.Pod, nodeName string) (int64, *framework.Status) {
   128  	score := 10
   129  	if nodeName == "node2" {
   130  		score = 100
   131  	}
   132  	return int64(score), nil
   133  }
   134  
   135  // ScoreExtensions returns nil.
   136  func (pl *node2PrioritizerPlugin) ScoreExtensions() framework.ScoreExtensions {
   137  	return nil
   138  }
   139  
   140  // FakeExtender is a data struct which implements the Extender interface.
   141  type FakeExtender struct {
   142  	// ExtenderName indicates this fake extender's name.
   143  	// Note that extender name should be unique.
   144  	ExtenderName     string
   145  	Predicates       []FitPredicate
   146  	Prioritizers     []PriorityConfig
   147  	Weight           int64
   148  	NodeCacheCapable bool
   149  	FilteredNodes    []*framework.NodeInfo
   150  	UnInterested     bool
   151  	Ignorable        bool
   152  	Binder           func() error
   153  
   154  	// Cached node information for fake extender
   155  	CachedNodeNameToInfo map[string]*framework.NodeInfo
   156  }
   157  
   158  const defaultFakeExtenderName = "defaultFakeExtender"
   159  
   160  // Name returns name of the extender.
   161  func (f *FakeExtender) Name() string {
   162  	if f.ExtenderName == "" {
   163  		// If ExtenderName is unset, use default name.
   164  		return defaultFakeExtenderName
   165  	}
   166  	return f.ExtenderName
   167  }
   168  
   169  // IsIgnorable returns a bool value indicating whether internal errors can be ignored.
   170  func (f *FakeExtender) IsIgnorable() bool {
   171  	return f.Ignorable
   172  }
   173  
   174  // SupportsPreemption returns true indicating the extender supports preemption.
   175  func (f *FakeExtender) SupportsPreemption() bool {
   176  	// Assume preempt verb is always defined.
   177  	return true
   178  }
   179  
   180  // ProcessPreemption implements the extender preempt function.
   181  func (f *FakeExtender) ProcessPreemption(
   182  	pod *v1.Pod,
   183  	nodeNameToVictims map[string]*extenderv1.Victims,
   184  	nodeInfos framework.NodeInfoLister,
   185  ) (map[string]*extenderv1.Victims, error) {
   186  	nodeNameToVictimsCopy := map[string]*extenderv1.Victims{}
   187  	// We don't want to change the original nodeNameToVictims
   188  	for k, v := range nodeNameToVictims {
   189  		// In real world implementation, extender's user should have their own way to get node object
   190  		// by name if needed (e.g. query kube-apiserver etc).
   191  		//
   192  		// For test purpose, we just use node from parameters directly.
   193  		nodeNameToVictimsCopy[k] = v
   194  	}
   195  
   196  	// If Extender.ProcessPreemption ever gets extended with a context parameter, then the logger should be retrieved from that.
   197  	// Now, in order not to modify the Extender interface, we get the logger from klog.TODO()
   198  	logger := klog.TODO()
   199  	for nodeName, victims := range nodeNameToVictimsCopy {
   200  		// Try to do preemption on extender side.
   201  		nodeInfo, _ := nodeInfos.Get(nodeName)
   202  		extenderVictimPods, extenderPDBViolations, fits, err := f.selectVictimsOnNodeByExtender(logger, pod, nodeInfo)
   203  		if err != nil {
   204  			return nil, err
   205  		}
   206  		// If it's unfit after extender's preemption, this node is unresolvable by preemption overall,
   207  		// let's remove it from potential preemption nodes.
   208  		if !fits {
   209  			delete(nodeNameToVictimsCopy, nodeName)
   210  		} else {
   211  			// Append new victims to original victims
   212  			nodeNameToVictimsCopy[nodeName].Pods = append(victims.Pods, extenderVictimPods...)
   213  			nodeNameToVictimsCopy[nodeName].NumPDBViolations = victims.NumPDBViolations + int64(extenderPDBViolations)
   214  		}
   215  	}
   216  	return nodeNameToVictimsCopy, nil
   217  }
   218  
   219  // selectVictimsOnNodeByExtender checks the given nodes->pods map with predicates on extender's side.
   220  // Returns:
   221  // 1. More victim pods (if any) amended by preemption phase of extender.
   222  // 2. Number of violating victim (used to calculate PDB).
   223  // 3. Fits or not after preemption phase on extender's side.
   224  func (f *FakeExtender) selectVictimsOnNodeByExtender(logger klog.Logger, pod *v1.Pod, node *framework.NodeInfo) ([]*v1.Pod, int, bool, error) {
   225  	// If a extender support preemption but have no cached node info, let's run filter to make sure
   226  	// default scheduler's decision still stand with given pod and node.
   227  	if !f.NodeCacheCapable {
   228  		err := f.runPredicate(pod, node)
   229  		if err.IsSuccess() {
   230  			return []*v1.Pod{}, 0, true, nil
   231  		} else if err.IsRejected() {
   232  			return nil, 0, false, nil
   233  		} else {
   234  			return nil, 0, false, err.AsError()
   235  		}
   236  	}
   237  
   238  	// Otherwise, as a extender support preemption and have cached node info, we will assume cachedNodeNameToInfo is available
   239  	// and get cached node info by given node name.
   240  	nodeInfoCopy := f.CachedNodeNameToInfo[node.Node().Name].Snapshot()
   241  
   242  	var potentialVictims []*v1.Pod
   243  
   244  	removePod := func(rp *v1.Pod) error {
   245  		return nodeInfoCopy.RemovePod(logger, rp)
   246  	}
   247  	addPod := func(ap *v1.Pod) {
   248  		nodeInfoCopy.AddPod(ap)
   249  	}
   250  	// As the first step, remove all the lower priority pods from the node and
   251  	// check if the given pod can be scheduled.
   252  	podPriority := corev1helpers.PodPriority(pod)
   253  	for _, p := range nodeInfoCopy.Pods {
   254  		if corev1helpers.PodPriority(p.Pod) < podPriority {
   255  			potentialVictims = append(potentialVictims, p.Pod)
   256  			if err := removePod(p.Pod); err != nil {
   257  				return nil, 0, false, err
   258  			}
   259  		}
   260  	}
   261  	sort.Slice(potentialVictims, func(i, j int) bool { return util.MoreImportantPod(potentialVictims[i], potentialVictims[j]) })
   262  
   263  	// If the new pod does not fit after removing all the lower priority pods,
   264  	// we are almost done and this node is not suitable for preemption.
   265  	status := f.runPredicate(pod, nodeInfoCopy)
   266  	if status.IsSuccess() {
   267  		// pass
   268  	} else if status.IsRejected() {
   269  		// does not fit
   270  		return nil, 0, false, nil
   271  	} else {
   272  		// internal errors
   273  		return nil, 0, false, status.AsError()
   274  	}
   275  
   276  	var victims []*v1.Pod
   277  
   278  	// TODO(harry): handle PDBs in the future.
   279  	numViolatingVictim := 0
   280  
   281  	reprievePod := func(p *v1.Pod) bool {
   282  		addPod(p)
   283  		status := f.runPredicate(pod, nodeInfoCopy)
   284  		if !status.IsSuccess() {
   285  			if err := removePod(p); err != nil {
   286  				return false
   287  			}
   288  			victims = append(victims, p)
   289  		}
   290  		return status.IsSuccess()
   291  	}
   292  
   293  	// For now, assume all potential victims to be non-violating.
   294  	// Now we try to reprieve non-violating victims.
   295  	for _, p := range potentialVictims {
   296  		reprievePod(p)
   297  	}
   298  
   299  	return victims, numViolatingVictim, true, nil
   300  }
   301  
   302  // runPredicate run predicates of extender one by one for given pod and node.
   303  // Returns: fits or not.
   304  func (f *FakeExtender) runPredicate(pod *v1.Pod, node *framework.NodeInfo) *framework.Status {
   305  	for _, predicate := range f.Predicates {
   306  		status := predicate(pod, node)
   307  		if !status.IsSuccess() {
   308  			return status
   309  		}
   310  	}
   311  	return framework.NewStatus(framework.Success)
   312  }
   313  
   314  // Filter implements the extender Filter function.
   315  func (f *FakeExtender) Filter(pod *v1.Pod, nodes []*framework.NodeInfo) ([]*framework.NodeInfo, extenderv1.FailedNodesMap, extenderv1.FailedNodesMap, error) {
   316  	var filtered []*framework.NodeInfo
   317  	failedNodesMap := extenderv1.FailedNodesMap{}
   318  	failedAndUnresolvableMap := extenderv1.FailedNodesMap{}
   319  	for _, node := range nodes {
   320  		status := f.runPredicate(pod, node)
   321  		if status.IsSuccess() {
   322  			filtered = append(filtered, node)
   323  		} else if status.Code() == framework.Unschedulable {
   324  			failedNodesMap[node.Node().Name] = fmt.Sprintf("FakeExtender: node %q failed", node.Node().Name)
   325  		} else if status.Code() == framework.UnschedulableAndUnresolvable {
   326  			failedAndUnresolvableMap[node.Node().Name] = fmt.Sprintf("FakeExtender: node %q failed and unresolvable", node.Node().Name)
   327  		} else {
   328  			return nil, nil, nil, status.AsError()
   329  		}
   330  	}
   331  
   332  	f.FilteredNodes = filtered
   333  	if f.NodeCacheCapable {
   334  		return filtered, failedNodesMap, failedAndUnresolvableMap, nil
   335  	}
   336  	return filtered, failedNodesMap, failedAndUnresolvableMap, nil
   337  }
   338  
   339  // Prioritize implements the extender Prioritize function.
   340  func (f *FakeExtender) Prioritize(pod *v1.Pod, nodes []*framework.NodeInfo) (*extenderv1.HostPriorityList, int64, error) {
   341  	result := extenderv1.HostPriorityList{}
   342  	combinedScores := map[string]int64{}
   343  	for _, prioritizer := range f.Prioritizers {
   344  		weight := prioritizer.Weight
   345  		if weight == 0 {
   346  			continue
   347  		}
   348  		priorityFunc := prioritizer.Function
   349  		prioritizedList, err := priorityFunc(pod, nodes)
   350  		if err != nil {
   351  			return &extenderv1.HostPriorityList{}, 0, err
   352  		}
   353  		for _, hostEntry := range *prioritizedList {
   354  			combinedScores[hostEntry.Name] += hostEntry.Score * weight
   355  		}
   356  	}
   357  	for host, score := range combinedScores {
   358  		result = append(result, extenderv1.HostPriority{Host: host, Score: score})
   359  	}
   360  	return &result, f.Weight, nil
   361  }
   362  
   363  // Bind implements the extender Bind function.
   364  func (f *FakeExtender) Bind(binding *v1.Binding) error {
   365  	if f.Binder != nil {
   366  		return f.Binder()
   367  	}
   368  	if len(f.FilteredNodes) != 0 {
   369  		for _, node := range f.FilteredNodes {
   370  			if node.Node().Name == binding.Target.Name {
   371  				f.FilteredNodes = nil
   372  				return nil
   373  			}
   374  		}
   375  		err := fmt.Errorf("Node %v not in filtered nodes %v", binding.Target.Name, f.FilteredNodes)
   376  		f.FilteredNodes = nil
   377  		return err
   378  	}
   379  	return nil
   380  }
   381  
   382  // IsBinder returns true indicating the extender implements the Binder function.
   383  func (f *FakeExtender) IsBinder() bool {
   384  	return true
   385  }
   386  
   387  // IsPrioritizer returns true if there are any prioritizers.
   388  func (f *FakeExtender) IsPrioritizer() bool {
   389  	return len(f.Prioritizers) > 0
   390  }
   391  
   392  // IsFilter returns true if there are any filters.
   393  func (f *FakeExtender) IsFilter() bool {
   394  	return len(f.Predicates) > 0
   395  }
   396  
   397  // IsInterested returns a bool indicating whether this extender is interested in this Pod.
   398  func (f *FakeExtender) IsInterested(pod *v1.Pod) bool {
   399  	return !f.UnInterested
   400  }
   401  
   402  var _ framework.Extender = &FakeExtender{}
   403  

View as plain text