...

Source file src/k8s.io/kubernetes/pkg/scheduler/framework/plugins/nodeports/node_ports.go

Documentation: k8s.io/kubernetes/pkg/scheduler/framework/plugins/nodeports

     1  /*
     2  Copyright 2019 The Kubernetes Authors.
     3  
     4  Licensed under the Apache License, Version 2.0 (the "License");
     5  you may not use this file except in compliance with the License.
     6  You may obtain a copy of the License at
     7  
     8      http://www.apache.org/licenses/LICENSE-2.0
     9  
    10  Unless required by applicable law or agreed to in writing, software
    11  distributed under the License is distributed on an "AS IS" BASIS,
    12  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    13  See the License for the specific language governing permissions and
    14  limitations under the License.
    15  */
    16  
    17  package nodeports
    18  
    19  import (
    20  	"context"
    21  	"fmt"
    22  
    23  	v1 "k8s.io/api/core/v1"
    24  	"k8s.io/apimachinery/pkg/runtime"
    25  	"k8s.io/klog/v2"
    26  	"k8s.io/kubernetes/pkg/scheduler/framework"
    27  	"k8s.io/kubernetes/pkg/scheduler/framework/plugins/names"
    28  	"k8s.io/kubernetes/pkg/scheduler/util"
    29  )
    30  
    31  // NodePorts is a plugin that checks if a node has free ports for the requested pod ports.
    32  type NodePorts struct{}
    33  
    34  var _ framework.PreFilterPlugin = &NodePorts{}
    35  var _ framework.FilterPlugin = &NodePorts{}
    36  var _ framework.EnqueueExtensions = &NodePorts{}
    37  
const (
	// Name is the name of the plugin used in the plugin registry and configurations.
	Name = names.NodePorts

	// preFilterStateKey is the key in CycleState to NodePorts pre-computed data.
	// Using the name of the plugin will likely help us avoid collisions with other plugins.
	preFilterStateKey = "PreFilter" + Name

	// ErrReason when node ports aren't available.
	// Surfaced to users in the Pod's scheduling status when Filter rejects a node.
	ErrReason = "node(s) didn't have free ports for the requested pod ports"
)
    49  
// preFilterState is the list of host ports requested by the pod being
// scheduled, computed once in PreFilter and consumed by Filter.
type preFilterState []*v1.ContainerPort

// Clone the prefilter state.
func (s preFilterState) Clone() framework.StateData {
	// The state is not impacted by adding/removing existing pods, hence we don't need to make a deep copy.
	return s
}
    57  
// Name returns name of the plugin. It is used in logs, etc.
func (pl *NodePorts) Name() string {
	return Name
}
    62  
    63  // getContainerPorts returns the used host ports of Pods: if 'port' was used, a 'port:true' pair
    64  // will be in the result; but it does not resolve port conflict.
    65  func getContainerPorts(pods ...*v1.Pod) []*v1.ContainerPort {
    66  	ports := []*v1.ContainerPort{}
    67  	for _, pod := range pods {
    68  		for j := range pod.Spec.Containers {
    69  			container := &pod.Spec.Containers[j]
    70  			for k := range container.Ports {
    71  				// Only return ports with a host port specified.
    72  				if container.Ports[k].HostPort <= 0 {
    73  					continue
    74  				}
    75  				ports = append(ports, &container.Ports[k])
    76  			}
    77  		}
    78  	}
    79  	return ports
    80  }
    81  
    82  // PreFilter invoked at the prefilter extension point.
    83  func (pl *NodePorts) PreFilter(ctx context.Context, cycleState *framework.CycleState, pod *v1.Pod) (*framework.PreFilterResult, *framework.Status) {
    84  	s := getContainerPorts(pod)
    85  	// Skip if a pod has no ports.
    86  	if len(s) == 0 {
    87  		return nil, framework.NewStatus(framework.Skip)
    88  	}
    89  	cycleState.Write(preFilterStateKey, preFilterState(s))
    90  	return nil, nil
    91  }
    92  
// PreFilterExtensions do not exist for this plugin.
// The precomputed state is unaffected by pods being added/removed during
// preemption, so no AddPod/RemovePod hooks are needed.
func (pl *NodePorts) PreFilterExtensions() framework.PreFilterExtensions {
	return nil
}
    97  
    98  func getPreFilterState(cycleState *framework.CycleState) (preFilterState, error) {
    99  	c, err := cycleState.Read(preFilterStateKey)
   100  	if err != nil {
   101  		// preFilterState doesn't exist, likely PreFilter wasn't invoked.
   102  		return nil, fmt.Errorf("reading %q from cycleState: %w", preFilterStateKey, err)
   103  	}
   104  
   105  	s, ok := c.(preFilterState)
   106  	if !ok {
   107  		return nil, fmt.Errorf("%+v  convert to nodeports.preFilterState error", c)
   108  	}
   109  	return s, nil
   110  }
   111  
   112  // EventsToRegister returns the possible events that may make a Pod
   113  // failed by this plugin schedulable.
   114  func (pl *NodePorts) EventsToRegister() []framework.ClusterEventWithHint {
   115  	return []framework.ClusterEventWithHint{
   116  		// Due to immutable fields `spec.containers[*].ports`, pod update events are ignored.
   117  		{Event: framework.ClusterEvent{Resource: framework.Pod, ActionType: framework.Delete}, QueueingHintFn: pl.isSchedulableAfterPodDeleted},
   118  		// TODO(#110175): Ideally, it's supposed to register only NodeCreated, because NodeUpdated event never means to have any free ports for the Pod.
   119  		// But, we may miss NodeCreated event due to preCheck.
   120  		// See: https://github.com/kubernetes/kubernetes/issues/109437
   121  		// And, we can remove NodeUpdated event once https://github.com/kubernetes/kubernetes/issues/110175 is solved.
   122  		// We don't need the QueueingHintFn here because the scheduling of Pods will be always retried with backoff when this Event happens.
   123  		// (the same as Queue)
   124  		{Event: framework.ClusterEvent{Resource: framework.Node, ActionType: framework.Add | framework.Update}},
   125  	}
   126  }
   127  
   128  // isSchedulableAfterPodDeleted is invoked whenever a pod deleted. It checks whether
   129  // that change made a previously unschedulable pod schedulable.
   130  func (pl *NodePorts) isSchedulableAfterPodDeleted(logger klog.Logger, pod *v1.Pod, oldObj, newObj interface{}) (framework.QueueingHint, error) {
   131  	deletedPod, _, err := util.As[*v1.Pod](oldObj, nil)
   132  	if err != nil {
   133  		return framework.Queue, err
   134  	}
   135  
   136  	// If the deleted pod is unscheduled, it doesn't make the target pod schedulable.
   137  	if deletedPod.Spec.NodeName == "" {
   138  		logger.V(4).Info("the deleted pod is unscheduled and it doesn't make the target pod schedulable", "pod", pod, "deletedPod", deletedPod)
   139  		return framework.QueueSkip, nil
   140  	}
   141  
   142  	// Get the used host ports of the deleted pod.
   143  	usedPorts := make(framework.HostPortInfo)
   144  	for _, container := range deletedPod.Spec.Containers {
   145  		for _, podPort := range container.Ports {
   146  			if podPort.HostPort > 0 {
   147  				usedPorts.Add(podPort.HostIP, string(podPort.Protocol), podPort.HostPort)
   148  			}
   149  		}
   150  	}
   151  
   152  	// If the deleted pod doesn't use any host ports, it doesn't make the target pod schedulable.
   153  	if len(usedPorts) == 0 {
   154  		return framework.QueueSkip, nil
   155  	}
   156  
   157  	// Construct a fake NodeInfo that only has the deleted Pod.
   158  	// If we can schedule `pod` to this fake node, it means that `pod` and the deleted pod don't have any common port(s).
   159  	// So, deleting that pod couldn't make `pod` schedulable.
   160  	nodeInfo := framework.NodeInfo{UsedPorts: usedPorts}
   161  	if Fits(pod, &nodeInfo) {
   162  		logger.V(4).Info("the deleted pod and the target pod don't have any common port(s), returning QueueSkip as deleting this Pod won't make the Pod schedulable", "pod", pod, "deletedPod", deletedPod)
   163  		return framework.QueueSkip, nil
   164  	}
   165  
   166  	logger.V(4).Info("the deleted pod and the target pod have any common port(s), returning Queue as deleting this Pod may make the Pod schedulable", "pod", klog.KObj(pod), "deletedPod", klog.KObj(deletedPod))
   167  	return framework.Queue, nil
   168  }
   169  
   170  // Filter invoked at the filter extension point.
   171  func (pl *NodePorts) Filter(ctx context.Context, cycleState *framework.CycleState, pod *v1.Pod, nodeInfo *framework.NodeInfo) *framework.Status {
   172  	wantPorts, err := getPreFilterState(cycleState)
   173  	if err != nil {
   174  		return framework.AsStatus(err)
   175  	}
   176  
   177  	fits := fitsPorts(wantPorts, nodeInfo)
   178  	if !fits {
   179  		return framework.NewStatus(framework.Unschedulable, ErrReason)
   180  	}
   181  
   182  	return nil
   183  }
   184  
   185  // Fits checks if the pod fits the node.
   186  func Fits(pod *v1.Pod, nodeInfo *framework.NodeInfo) bool {
   187  	return fitsPorts(getContainerPorts(pod), nodeInfo)
   188  }
   189  
   190  func fitsPorts(wantPorts []*v1.ContainerPort, nodeInfo *framework.NodeInfo) bool {
   191  	// try to see whether existingPorts and wantPorts will conflict or not
   192  	existingPorts := nodeInfo.UsedPorts
   193  	for _, cp := range wantPorts {
   194  		if existingPorts.CheckConflict(cp.HostIP, string(cp.Protocol), cp.HostPort) {
   195  			return false
   196  		}
   197  	}
   198  	return true
   199  }
   200  
   201  // New initializes a new plugin and returns it.
   202  func New(_ context.Context, _ runtime.Object, _ framework.Handle) (framework.Plugin, error) {
   203  	return &NodePorts{}, nil
   204  }
   205  

View as plain text