...

Source file src/k8s.io/kubernetes/pkg/scheduler/extender.go

Documentation: k8s.io/kubernetes/pkg/scheduler

     1  /*
     2  Copyright 2015 The Kubernetes Authors.
     3  
     4  Licensed under the Apache License, Version 2.0 (the "License");
     5  you may not use this file except in compliance with the License.
     6  You may obtain a copy of the License at
     7  
     8      http://www.apache.org/licenses/LICENSE-2.0
     9  
    10  Unless required by applicable law or agreed to in writing, software
    11  distributed under the License is distributed on an "AS IS" BASIS,
    12  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    13  See the License for the specific language governing permissions and
    14  limitations under the License.
    15  */
    16  
    17  package scheduler
    18  
    19  import (
    20  	"bytes"
    21  	"encoding/json"
    22  	"fmt"
    23  	"net/http"
    24  	"strings"
    25  	"time"
    26  
    27  	v1 "k8s.io/api/core/v1"
    28  	utilnet "k8s.io/apimachinery/pkg/util/net"
    29  	"k8s.io/apimachinery/pkg/util/sets"
    30  	restclient "k8s.io/client-go/rest"
    31  	extenderv1 "k8s.io/kube-scheduler/extender/v1"
    32  	schedulerapi "k8s.io/kubernetes/pkg/scheduler/apis/config"
    33  	"k8s.io/kubernetes/pkg/scheduler/framework"
    34  )
    35  
    36  const (
    37  	// DefaultExtenderTimeout defines the default extender timeout in second.
    38  	DefaultExtenderTimeout = 5 * time.Second
    39  )
    40  
    41  // HTTPExtender implements the Extender interface.
    42  type HTTPExtender struct {
    43  	extenderURL      string
    44  	preemptVerb      string
    45  	filterVerb       string
    46  	prioritizeVerb   string
    47  	bindVerb         string
    48  	weight           int64
    49  	client           *http.Client
    50  	nodeCacheCapable bool
    51  	managedResources sets.Set[string]
    52  	ignorable        bool
    53  }
    54  
    55  func makeTransport(config *schedulerapi.Extender) (http.RoundTripper, error) {
    56  	var cfg restclient.Config
    57  	if config.TLSConfig != nil {
    58  		cfg.TLSClientConfig.Insecure = config.TLSConfig.Insecure
    59  		cfg.TLSClientConfig.ServerName = config.TLSConfig.ServerName
    60  		cfg.TLSClientConfig.CertFile = config.TLSConfig.CertFile
    61  		cfg.TLSClientConfig.KeyFile = config.TLSConfig.KeyFile
    62  		cfg.TLSClientConfig.CAFile = config.TLSConfig.CAFile
    63  		cfg.TLSClientConfig.CertData = config.TLSConfig.CertData
    64  		cfg.TLSClientConfig.KeyData = config.TLSConfig.KeyData
    65  		cfg.TLSClientConfig.CAData = config.TLSConfig.CAData
    66  	}
    67  	if config.EnableHTTPS {
    68  		hasCA := len(cfg.CAFile) > 0 || len(cfg.CAData) > 0
    69  		if !hasCA {
    70  			cfg.Insecure = true
    71  		}
    72  	}
    73  	tlsConfig, err := restclient.TLSConfigFor(&cfg)
    74  	if err != nil {
    75  		return nil, err
    76  	}
    77  	if tlsConfig != nil {
    78  		return utilnet.SetTransportDefaults(&http.Transport{
    79  			TLSClientConfig: tlsConfig,
    80  		}), nil
    81  	}
    82  	return utilnet.SetTransportDefaults(&http.Transport{}), nil
    83  }
    84  
    85  // NewHTTPExtender creates an HTTPExtender object.
    86  func NewHTTPExtender(config *schedulerapi.Extender) (framework.Extender, error) {
    87  	if config.HTTPTimeout.Duration.Nanoseconds() == 0 {
    88  		config.HTTPTimeout.Duration = time.Duration(DefaultExtenderTimeout)
    89  	}
    90  
    91  	transport, err := makeTransport(config)
    92  	if err != nil {
    93  		return nil, err
    94  	}
    95  	client := &http.Client{
    96  		Transport: transport,
    97  		Timeout:   config.HTTPTimeout.Duration,
    98  	}
    99  	managedResources := sets.New[string]()
   100  	for _, r := range config.ManagedResources {
   101  		managedResources.Insert(string(r.Name))
   102  	}
   103  	return &HTTPExtender{
   104  		extenderURL:      config.URLPrefix,
   105  		preemptVerb:      config.PreemptVerb,
   106  		filterVerb:       config.FilterVerb,
   107  		prioritizeVerb:   config.PrioritizeVerb,
   108  		bindVerb:         config.BindVerb,
   109  		weight:           config.Weight,
   110  		client:           client,
   111  		nodeCacheCapable: config.NodeCacheCapable,
   112  		managedResources: managedResources,
   113  		ignorable:        config.Ignorable,
   114  	}, nil
   115  }
   116  
// Name returns the extender's URL prefix (extenderURL), which is used to
// identify the extender.
func (h *HTTPExtender) Name() string {
	return h.extenderURL
}
   121  
// IsIgnorable reports whether the extender was configured as ignorable,
// meaning scheduling should not fail when this extender is unavailable.
func (h *HTTPExtender) IsIgnorable() bool {
	return h.ignorable
}
   127  
   128  // SupportsPreemption returns true if an extender supports preemption.
   129  // An extender should have preempt verb defined and enabled its own node cache.
   130  func (h *HTTPExtender) SupportsPreemption() bool {
   131  	return len(h.preemptVerb) > 0
   132  }
   133  
   134  // ProcessPreemption returns filtered candidate nodes and victims after running preemption logic in extender.
   135  func (h *HTTPExtender) ProcessPreemption(
   136  	pod *v1.Pod,
   137  	nodeNameToVictims map[string]*extenderv1.Victims,
   138  	nodeInfos framework.NodeInfoLister,
   139  ) (map[string]*extenderv1.Victims, error) {
   140  	var (
   141  		result extenderv1.ExtenderPreemptionResult
   142  		args   *extenderv1.ExtenderPreemptionArgs
   143  	)
   144  
   145  	if !h.SupportsPreemption() {
   146  		return nil, fmt.Errorf("preempt verb is not defined for extender %v but run into ProcessPreemption", h.extenderURL)
   147  	}
   148  
   149  	if h.nodeCacheCapable {
   150  		// If extender has cached node info, pass NodeNameToMetaVictims in args.
   151  		nodeNameToMetaVictims := convertToMetaVictims(nodeNameToVictims)
   152  		args = &extenderv1.ExtenderPreemptionArgs{
   153  			Pod:                   pod,
   154  			NodeNameToMetaVictims: nodeNameToMetaVictims,
   155  		}
   156  	} else {
   157  		args = &extenderv1.ExtenderPreemptionArgs{
   158  			Pod:               pod,
   159  			NodeNameToVictims: nodeNameToVictims,
   160  		}
   161  	}
   162  
   163  	if err := h.send(h.preemptVerb, args, &result); err != nil {
   164  		return nil, err
   165  	}
   166  
   167  	// Extender will always return NodeNameToMetaVictims.
   168  	// So let's convert it to NodeNameToVictims by using <nodeInfos>.
   169  	newNodeNameToVictims, err := h.convertToVictims(result.NodeNameToMetaVictims, nodeInfos)
   170  	if err != nil {
   171  		return nil, err
   172  	}
   173  	// Do not override <nodeNameToVictims>.
   174  	return newNodeNameToVictims, nil
   175  }
   176  
   177  // convertToVictims converts "nodeNameToMetaVictims" from object identifiers,
   178  // such as UIDs and names, to object pointers.
   179  func (h *HTTPExtender) convertToVictims(
   180  	nodeNameToMetaVictims map[string]*extenderv1.MetaVictims,
   181  	nodeInfos framework.NodeInfoLister,
   182  ) (map[string]*extenderv1.Victims, error) {
   183  	nodeNameToVictims := map[string]*extenderv1.Victims{}
   184  	for nodeName, metaVictims := range nodeNameToMetaVictims {
   185  		nodeInfo, err := nodeInfos.Get(nodeName)
   186  		if err != nil {
   187  			return nil, err
   188  		}
   189  		victims := &extenderv1.Victims{
   190  			Pods:             []*v1.Pod{},
   191  			NumPDBViolations: metaVictims.NumPDBViolations,
   192  		}
   193  		for _, metaPod := range metaVictims.Pods {
   194  			pod, err := h.convertPodUIDToPod(metaPod, nodeInfo)
   195  			if err != nil {
   196  				return nil, err
   197  			}
   198  			victims.Pods = append(victims.Pods, pod)
   199  		}
   200  		nodeNameToVictims[nodeName] = victims
   201  	}
   202  	return nodeNameToVictims, nil
   203  }
   204  
   205  // convertPodUIDToPod returns v1.Pod object for given MetaPod and node info.
   206  // The v1.Pod object is restored by nodeInfo.Pods().
   207  // It returns an error if there's cache inconsistency between default scheduler
   208  // and extender, i.e. when the pod is not found in nodeInfo.Pods.
   209  func (h *HTTPExtender) convertPodUIDToPod(
   210  	metaPod *extenderv1.MetaPod,
   211  	nodeInfo *framework.NodeInfo) (*v1.Pod, error) {
   212  	for _, p := range nodeInfo.Pods {
   213  		if string(p.Pod.UID) == metaPod.UID {
   214  			return p.Pod, nil
   215  		}
   216  	}
   217  	return nil, fmt.Errorf("extender: %v claims to preempt pod (UID: %v) on node: %v, but the pod is not found on that node",
   218  		h.extenderURL, metaPod, nodeInfo.Node().Name)
   219  }
   220  
   221  // convertToMetaVictims converts from struct type to meta types.
   222  func convertToMetaVictims(
   223  	nodeNameToVictims map[string]*extenderv1.Victims,
   224  ) map[string]*extenderv1.MetaVictims {
   225  	nodeNameToMetaVictims := map[string]*extenderv1.MetaVictims{}
   226  	for node, victims := range nodeNameToVictims {
   227  		metaVictims := &extenderv1.MetaVictims{
   228  			Pods:             []*extenderv1.MetaPod{},
   229  			NumPDBViolations: victims.NumPDBViolations,
   230  		}
   231  		for _, pod := range victims.Pods {
   232  			metaPod := &extenderv1.MetaPod{
   233  				UID: string(pod.UID),
   234  			}
   235  			metaVictims.Pods = append(metaVictims.Pods, metaPod)
   236  		}
   237  		nodeNameToMetaVictims[node] = metaVictims
   238  	}
   239  	return nodeNameToMetaVictims
   240  }
   241  
   242  // Filter based on extender implemented predicate functions. The filtered list is
   243  // expected to be a subset of the supplied list; otherwise the function returns an error.
   244  // The failedNodes and failedAndUnresolvableNodes optionally contains the list
   245  // of failed nodes and failure reasons, except nodes in the latter are
   246  // unresolvable.
   247  func (h *HTTPExtender) Filter(
   248  	pod *v1.Pod,
   249  	nodes []*framework.NodeInfo,
   250  ) (filteredList []*framework.NodeInfo, failedNodes, failedAndUnresolvableNodes extenderv1.FailedNodesMap, err error) {
   251  	var (
   252  		result     extenderv1.ExtenderFilterResult
   253  		nodeList   *v1.NodeList
   254  		nodeNames  *[]string
   255  		nodeResult []*framework.NodeInfo
   256  		args       *extenderv1.ExtenderArgs
   257  	)
   258  	fromNodeName := make(map[string]*framework.NodeInfo)
   259  	for _, n := range nodes {
   260  		fromNodeName[n.Node().Name] = n
   261  	}
   262  
   263  	if h.filterVerb == "" {
   264  		return nodes, extenderv1.FailedNodesMap{}, extenderv1.FailedNodesMap{}, nil
   265  	}
   266  
   267  	if h.nodeCacheCapable {
   268  		nodeNameSlice := make([]string, 0, len(nodes))
   269  		for _, node := range nodes {
   270  			nodeNameSlice = append(nodeNameSlice, node.Node().Name)
   271  		}
   272  		nodeNames = &nodeNameSlice
   273  	} else {
   274  		nodeList = &v1.NodeList{}
   275  		for _, node := range nodes {
   276  			nodeList.Items = append(nodeList.Items, *node.Node())
   277  		}
   278  	}
   279  
   280  	args = &extenderv1.ExtenderArgs{
   281  		Pod:       pod,
   282  		Nodes:     nodeList,
   283  		NodeNames: nodeNames,
   284  	}
   285  
   286  	if err := h.send(h.filterVerb, args, &result); err != nil {
   287  		return nil, nil, nil, err
   288  	}
   289  	if result.Error != "" {
   290  		return nil, nil, nil, fmt.Errorf(result.Error)
   291  	}
   292  
   293  	if h.nodeCacheCapable && result.NodeNames != nil {
   294  		nodeResult = make([]*framework.NodeInfo, len(*result.NodeNames))
   295  		for i, nodeName := range *result.NodeNames {
   296  			if n, ok := fromNodeName[nodeName]; ok {
   297  				nodeResult[i] = n
   298  			} else {
   299  				return nil, nil, nil, fmt.Errorf(
   300  					"extender %q claims a filtered node %q which is not found in the input node list",
   301  					h.extenderURL, nodeName)
   302  			}
   303  		}
   304  	} else if result.Nodes != nil {
   305  		nodeResult = make([]*framework.NodeInfo, len(result.Nodes.Items))
   306  		for i := range result.Nodes.Items {
   307  			nodeResult[i] = framework.NewNodeInfo()
   308  			nodeResult[i].SetNode(&result.Nodes.Items[i])
   309  		}
   310  	}
   311  
   312  	return nodeResult, result.FailedNodes, result.FailedAndUnresolvableNodes, nil
   313  }
   314  
   315  // Prioritize based on extender implemented priority functions. Weight*priority is added
   316  // up for each such priority function. The returned score is added to the score computed
   317  // by Kubernetes scheduler. The total score is used to do the host selection.
   318  func (h *HTTPExtender) Prioritize(pod *v1.Pod, nodes []*framework.NodeInfo) (*extenderv1.HostPriorityList, int64, error) {
   319  	var (
   320  		result    extenderv1.HostPriorityList
   321  		nodeList  *v1.NodeList
   322  		nodeNames *[]string
   323  		args      *extenderv1.ExtenderArgs
   324  	)
   325  
   326  	if h.prioritizeVerb == "" {
   327  		result := extenderv1.HostPriorityList{}
   328  		for _, node := range nodes {
   329  			result = append(result, extenderv1.HostPriority{Host: node.Node().Name, Score: 0})
   330  		}
   331  		return &result, 0, nil
   332  	}
   333  
   334  	if h.nodeCacheCapable {
   335  		nodeNameSlice := make([]string, 0, len(nodes))
   336  		for _, node := range nodes {
   337  			nodeNameSlice = append(nodeNameSlice, node.Node().Name)
   338  		}
   339  		nodeNames = &nodeNameSlice
   340  	} else {
   341  		nodeList = &v1.NodeList{}
   342  		for _, node := range nodes {
   343  			nodeList.Items = append(nodeList.Items, *node.Node())
   344  		}
   345  	}
   346  
   347  	args = &extenderv1.ExtenderArgs{
   348  		Pod:       pod,
   349  		Nodes:     nodeList,
   350  		NodeNames: nodeNames,
   351  	}
   352  
   353  	if err := h.send(h.prioritizeVerb, args, &result); err != nil {
   354  		return nil, 0, err
   355  	}
   356  	return &result, h.weight, nil
   357  }
   358  
   359  // Bind delegates the action of binding a pod to a node to the extender.
   360  func (h *HTTPExtender) Bind(binding *v1.Binding) error {
   361  	var result extenderv1.ExtenderBindingResult
   362  	if !h.IsBinder() {
   363  		// This shouldn't happen as this extender wouldn't have become a Binder.
   364  		return fmt.Errorf("unexpected empty bindVerb in extender")
   365  	}
   366  	req := &extenderv1.ExtenderBindingArgs{
   367  		PodName:      binding.Name,
   368  		PodNamespace: binding.Namespace,
   369  		PodUID:       binding.UID,
   370  		Node:         binding.Target.Name,
   371  	}
   372  	if err := h.send(h.bindVerb, req, &result); err != nil {
   373  		return err
   374  	}
   375  	if result.Error != "" {
   376  		return fmt.Errorf(result.Error)
   377  	}
   378  	return nil
   379  }
   380  
   381  // IsBinder returns whether this extender is configured for the Bind method.
   382  func (h *HTTPExtender) IsBinder() bool {
   383  	return h.bindVerb != ""
   384  }
   385  
   386  // IsPrioritizer returns whether this extender is configured for the Prioritize method.
   387  func (h *HTTPExtender) IsPrioritizer() bool {
   388  	return h.prioritizeVerb != ""
   389  }
   390  
   391  // IsFilter returns whether this extender is configured for the Filter method.
   392  func (h *HTTPExtender) IsFilter() bool {
   393  	return h.filterVerb != ""
   394  }
   395  
   396  // Helper function to send messages to the extender
   397  func (h *HTTPExtender) send(action string, args interface{}, result interface{}) error {
   398  	out, err := json.Marshal(args)
   399  	if err != nil {
   400  		return err
   401  	}
   402  
   403  	url := strings.TrimRight(h.extenderURL, "/") + "/" + action
   404  
   405  	req, err := http.NewRequest("POST", url, bytes.NewReader(out))
   406  	if err != nil {
   407  		return err
   408  	}
   409  
   410  	req.Header.Set("Content-Type", "application/json")
   411  
   412  	resp, err := h.client.Do(req)
   413  	if err != nil {
   414  		return err
   415  	}
   416  	defer resp.Body.Close()
   417  
   418  	if resp.StatusCode != http.StatusOK {
   419  		return fmt.Errorf("failed %v with extender at URL %v, code %v", action, url, resp.StatusCode)
   420  	}
   421  
   422  	return json.NewDecoder(resp.Body).Decode(result)
   423  }
   424  
   425  // IsInterested returns true if at least one extended resource requested by
   426  // this pod is managed by this extender.
   427  func (h *HTTPExtender) IsInterested(pod *v1.Pod) bool {
   428  	if h.managedResources.Len() == 0 {
   429  		return true
   430  	}
   431  	if h.hasManagedResources(pod.Spec.Containers) {
   432  		return true
   433  	}
   434  	if h.hasManagedResources(pod.Spec.InitContainers) {
   435  		return true
   436  	}
   437  	return false
   438  }
   439  
   440  func (h *HTTPExtender) hasManagedResources(containers []v1.Container) bool {
   441  	for i := range containers {
   442  		container := &containers[i]
   443  		for resourceName := range container.Resources.Requests {
   444  			if h.managedResources.Has(string(resourceName)) {
   445  				return true
   446  			}
   447  		}
   448  		for resourceName := range container.Resources.Limits {
   449  			if h.managedResources.Has(string(resourceName)) {
   450  				return true
   451  			}
   452  		}
   453  	}
   454  	return false
   455  }
   456  

View as plain text