package nodeports

import (
	"context"
	"fmt"

	v1 "k8s.io/api/core/v1"
	"k8s.io/apimachinery/pkg/runtime"
	"k8s.io/klog/v2"
	"k8s.io/kubernetes/pkg/scheduler/framework"
	"k8s.io/kubernetes/pkg/scheduler/framework/plugins/names"
	"k8s.io/kubernetes/pkg/scheduler/util"
)

// NodePorts is a plugin that checks if a node has free ports for the requested pod ports.
type NodePorts struct{}

var _ framework.PreFilterPlugin = &NodePorts{}
var _ framework.FilterPlugin = &NodePorts{}
var _ framework.EnqueueExtensions = &NodePorts{}

const (
	// Name is the name of the plugin used in the plugin registry and configurations.
	Name = names.NodePorts

	// preFilterStateKey is the key in CycleState to NodePorts pre-computed data.
	// Using the name of the plugin will likely help us avoid collisions with other plugins.
	preFilterStateKey = "PreFilter" + Name

	// ErrReason when node ports aren't available.
	ErrReason = "node(s) didn't have free ports for the requested pod ports"
)

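// preFilterState is the set of host ports requested by the pod,
// computed at PreFilter and consumed at Filter.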
type preFilterState []*v1.ContainerPort

// Clone the prefilter state.
func (s preFilterState) Clone() framework.StateData {
	// The state is not impacted by adding/removing existing pods, hence we don't need to make a deep copy.
	return s
}

// Name returns the name of the plugin. It is used in logs, etc.
func (pl *NodePorts) Name() string {
	return Name
}

// getContainerPorts returns the host ports requested by the given Pods' containers;
// ports without a HostPort set are skipped. It does not resolve port conflicts.
func getContainerPorts(pods ...*v1.Pod) []*v1.ContainerPort {
	ports := []*v1.ContainerPort{}
	for _, pod := range pods {
		for j := range pod.Spec.Containers {
			container := &pod.Spec.Containers[j]
			for k := range container.Ports {
				// Only return ports with a host port specified.
				if container.Ports[k].HostPort <= 0 {
					continue
				}
				ports = append(ports, &container.Ports[k])
			}
		}
	}
	return ports
}

// PreFilter invoked at the prefilter extension point.
func (pl *NodePorts) PreFilter(ctx context.Context, cycleState *framework.CycleState, pod *v1.Pod) (*framework.PreFilterResult, *framework.Status) {
	s := getContainerPorts(pod)
	// Skip Filter if the pod requests no host ports.
	if len(s) == 0 {
		return nil, framework.NewStatus(framework.Skip)
	}
	cycleState.Write(preFilterStateKey, preFilterState(s))
	return nil, nil
}

// PreFilterExtensions do not exist for this plugin.
func (pl *NodePorts) PreFilterExtensions() framework.PreFilterExtensions {
	return nil
}

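// getPreFilterState fetches the preFilterState stored by PreFilter from the cycle state.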
func getPreFilterState(cycleState *framework.CycleState) (preFilterState, error) {
	c, err := cycleState.Read(preFilterStateKey)
	if err != nil {
		// preFilterState doesn't exist, likely PreFilter wasn't invoked.
		return nil, fmt.Errorf("reading %q from cycleState: %w", preFilterStateKey, err)
	}

	s, ok := c.(preFilterState)
	if !ok {
		return nil, fmt.Errorf("%+v cannot be converted to nodeports.preFilterState", c)
	}
	return s, nil
}

// EventsToRegister returns the possible events that may make a Pod
// failed by this plugin schedulable.
func (pl *NodePorts) EventsToRegister() []framework.ClusterEventWithHint {
	return []framework.ClusterEventWithHint{
		// Due to immutable fields `spec.containers[*].ports`, pod update events are ignored.
		{Event: framework.ClusterEvent{Resource: framework.Pod, ActionType: framework.Delete}, QueueingHintFn: pl.isSchedulableAfterPodDeleted},
		// Ideally only Node add would be registered here, because a Node update never frees up
		// host ports for the Pod. But Node add events may be missed due to preCheck
		// (see https://github.com/kubernetes/kubernetes/issues/109437), so Node update is
		// registered as well until https://github.com/kubernetes/kubernetes/issues/110175 is solved.
		// No QueueingHintFn is needed here because scheduling of the Pod is always retried
		// with backoff when this event happens (the same as Queue).
		{Event: framework.ClusterEvent{Resource: framework.Node, ActionType: framework.Add | framework.Update}},
	}
}

// isSchedulableAfterPodDeleted is invoked whenever a pod is deleted. It checks whether
// that change made a previously unschedulable pod schedulable.
func (pl *NodePorts) isSchedulableAfterPodDeleted(logger klog.Logger, pod *v1.Pod, oldObj, newObj interface{}) (framework.QueueingHint, error) {
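	// For a Pod delete event, oldObj carries the deleted Pod and newObj is nil;
	// util.As converts the untyped objects to *v1.Pod.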
	deletedPod, _, err := util.As[*v1.Pod](oldObj, nil)
	if err != nil {
		return framework.Queue, err
	}

	// If the deleted pod is unscheduled, it doesn't make the target pod schedulable.
	if deletedPod.Spec.NodeName == "" {
		logger.V(4).Info("the deleted pod is unscheduled and it doesn't make the target pod schedulable", "pod", klog.KObj(pod), "deletedPod", klog.KObj(deletedPod))
		return framework.QueueSkip, nil
	}

	// Collect the host ports used by the deleted pod.
	usedPorts := make(framework.HostPortInfo)
	for _, container := range deletedPod.Spec.Containers {
		for _, podPort := range container.Ports {
			if podPort.HostPort > 0 {
				usedPorts.Add(podPort.HostIP, string(podPort.Protocol), podPort.HostPort)
			}
		}
	}

	// If the deleted pod used no host ports, its deletion cannot make the target pod schedulable.
	if len(usedPorts) == 0 {
		return framework.QueueSkip, nil
	}

	// Construct a fake NodeInfo that holds only the deleted pod's ports.
	// If the target pod fits this fake node, the two pods share no host port,
	// so deleting the pod cannot make the target pod schedulable.
	nodeInfo := framework.NodeInfo{UsedPorts: usedPorts}
	if Fits(pod, &nodeInfo) {
		logger.V(4).Info("the deleted pod and the target pod don't have any common port(s), returning QueueSkip as deleting this Pod won't make the Pod schedulable", "pod", klog.KObj(pod), "deletedPod", klog.KObj(deletedPod))
		return framework.QueueSkip, nil
	}

	logger.V(4).Info("the deleted pod and the target pod have common port(s), returning Queue as deleting this Pod may make the Pod schedulable", "pod", klog.KObj(pod), "deletedPod", klog.KObj(deletedPod))
	return framework.Queue, nil
}

// Filter invoked at the filter extension point.
func (pl *NodePorts) Filter(ctx context.Context, cycleState *framework.CycleState, pod *v1.Pod, nodeInfo *framework.NodeInfo) *framework.Status {
	wantPorts, err := getPreFilterState(cycleState)
	if err != nil {
		return framework.AsStatus(err)
	}

	fits := fitsPorts(wantPorts, nodeInfo)
	if !fits {
		return framework.NewStatus(framework.Unschedulable, ErrReason)
	}

	return nil
}

// Fits checks if the pod's requested host ports fit the node.
func Fits(pod *v1.Pod, nodeInfo *framework.NodeInfo) bool {
	return fitsPorts(getContainerPorts(pod), nodeInfo)
}

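// fitsPorts reports whether none of wantPorts conflicts with the host ports
// already in use on the node.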
func fitsPorts(wantPorts []*v1.ContainerPort, nodeInfo *framework.NodeInfo) bool {
	// Check whether any wanted port conflicts with the ports already in use.
	existingPorts := nodeInfo.UsedPorts
	for _, cp := range wantPorts {
		if existingPorts.CheckConflict(cp.HostIP, string(cp.Protocol), cp.HostPort) {
			return false
		}
	}
	return true
}
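
// For example, a pod requesting HostIP "" (sanitized to 0.0.0.0), protocol TCP
// and HostPort 8080 conflicts with an existing ("127.0.0.1", "TCP", 8080) entry,
// because HostPortInfo.CheckConflict treats 0.0.0.0 as overlapping every host IP.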

// New initializes a new plugin and returns it.
func New(_ context.Context, _ runtime.Object, _ framework.Handle) (framework.Plugin, error) {
	return &NodePorts{}, nil
}