1
16
17 package scheduler
18
19 import (
20 "bytes"
21 "encoding/json"
22 "fmt"
23 "net/http"
24 "strings"
25 "time"
26
27 v1 "k8s.io/api/core/v1"
28 utilnet "k8s.io/apimachinery/pkg/util/net"
29 "k8s.io/apimachinery/pkg/util/sets"
30 restclient "k8s.io/client-go/rest"
31 extenderv1 "k8s.io/kube-scheduler/extender/v1"
32 schedulerapi "k8s.io/kubernetes/pkg/scheduler/apis/config"
33 "k8s.io/kubernetes/pkg/scheduler/framework"
34 )
35
// DefaultExtenderTimeout is the HTTP client timeout that NewHTTPExtender
// applies when the extender configuration leaves HTTPTimeout at zero.
const DefaultExtenderTimeout = 5 * time.Second
40
41
42 type HTTPExtender struct {
43 extenderURL string
44 preemptVerb string
45 filterVerb string
46 prioritizeVerb string
47 bindVerb string
48 weight int64
49 client *http.Client
50 nodeCacheCapable bool
51 managedResources sets.Set[string]
52 ignorable bool
53 }
54
55 func makeTransport(config *schedulerapi.Extender) (http.RoundTripper, error) {
56 var cfg restclient.Config
57 if config.TLSConfig != nil {
58 cfg.TLSClientConfig.Insecure = config.TLSConfig.Insecure
59 cfg.TLSClientConfig.ServerName = config.TLSConfig.ServerName
60 cfg.TLSClientConfig.CertFile = config.TLSConfig.CertFile
61 cfg.TLSClientConfig.KeyFile = config.TLSConfig.KeyFile
62 cfg.TLSClientConfig.CAFile = config.TLSConfig.CAFile
63 cfg.TLSClientConfig.CertData = config.TLSConfig.CertData
64 cfg.TLSClientConfig.KeyData = config.TLSConfig.KeyData
65 cfg.TLSClientConfig.CAData = config.TLSConfig.CAData
66 }
67 if config.EnableHTTPS {
68 hasCA := len(cfg.CAFile) > 0 || len(cfg.CAData) > 0
69 if !hasCA {
70 cfg.Insecure = true
71 }
72 }
73 tlsConfig, err := restclient.TLSConfigFor(&cfg)
74 if err != nil {
75 return nil, err
76 }
77 if tlsConfig != nil {
78 return utilnet.SetTransportDefaults(&http.Transport{
79 TLSClientConfig: tlsConfig,
80 }), nil
81 }
82 return utilnet.SetTransportDefaults(&http.Transport{}), nil
83 }
84
85
86 func NewHTTPExtender(config *schedulerapi.Extender) (framework.Extender, error) {
87 if config.HTTPTimeout.Duration.Nanoseconds() == 0 {
88 config.HTTPTimeout.Duration = time.Duration(DefaultExtenderTimeout)
89 }
90
91 transport, err := makeTransport(config)
92 if err != nil {
93 return nil, err
94 }
95 client := &http.Client{
96 Transport: transport,
97 Timeout: config.HTTPTimeout.Duration,
98 }
99 managedResources := sets.New[string]()
100 for _, r := range config.ManagedResources {
101 managedResources.Insert(string(r.Name))
102 }
103 return &HTTPExtender{
104 extenderURL: config.URLPrefix,
105 preemptVerb: config.PreemptVerb,
106 filterVerb: config.FilterVerb,
107 prioritizeVerb: config.PrioritizeVerb,
108 bindVerb: config.BindVerb,
109 weight: config.Weight,
110 client: client,
111 nodeCacheCapable: config.NodeCacheCapable,
112 managedResources: managedResources,
113 ignorable: config.Ignorable,
114 }, nil
115 }
116
117
118 func (h *HTTPExtender) Name() string {
119 return h.extenderURL
120 }
121
122
123
124 func (h *HTTPExtender) IsIgnorable() bool {
125 return h.ignorable
126 }
127
128
129
130 func (h *HTTPExtender) SupportsPreemption() bool {
131 return len(h.preemptVerb) > 0
132 }
133
134
135 func (h *HTTPExtender) ProcessPreemption(
136 pod *v1.Pod,
137 nodeNameToVictims map[string]*extenderv1.Victims,
138 nodeInfos framework.NodeInfoLister,
139 ) (map[string]*extenderv1.Victims, error) {
140 var (
141 result extenderv1.ExtenderPreemptionResult
142 args *extenderv1.ExtenderPreemptionArgs
143 )
144
145 if !h.SupportsPreemption() {
146 return nil, fmt.Errorf("preempt verb is not defined for extender %v but run into ProcessPreemption", h.extenderURL)
147 }
148
149 if h.nodeCacheCapable {
150
151 nodeNameToMetaVictims := convertToMetaVictims(nodeNameToVictims)
152 args = &extenderv1.ExtenderPreemptionArgs{
153 Pod: pod,
154 NodeNameToMetaVictims: nodeNameToMetaVictims,
155 }
156 } else {
157 args = &extenderv1.ExtenderPreemptionArgs{
158 Pod: pod,
159 NodeNameToVictims: nodeNameToVictims,
160 }
161 }
162
163 if err := h.send(h.preemptVerb, args, &result); err != nil {
164 return nil, err
165 }
166
167
168
169 newNodeNameToVictims, err := h.convertToVictims(result.NodeNameToMetaVictims, nodeInfos)
170 if err != nil {
171 return nil, err
172 }
173
174 return newNodeNameToVictims, nil
175 }
176
177
178
179 func (h *HTTPExtender) convertToVictims(
180 nodeNameToMetaVictims map[string]*extenderv1.MetaVictims,
181 nodeInfos framework.NodeInfoLister,
182 ) (map[string]*extenderv1.Victims, error) {
183 nodeNameToVictims := map[string]*extenderv1.Victims{}
184 for nodeName, metaVictims := range nodeNameToMetaVictims {
185 nodeInfo, err := nodeInfos.Get(nodeName)
186 if err != nil {
187 return nil, err
188 }
189 victims := &extenderv1.Victims{
190 Pods: []*v1.Pod{},
191 NumPDBViolations: metaVictims.NumPDBViolations,
192 }
193 for _, metaPod := range metaVictims.Pods {
194 pod, err := h.convertPodUIDToPod(metaPod, nodeInfo)
195 if err != nil {
196 return nil, err
197 }
198 victims.Pods = append(victims.Pods, pod)
199 }
200 nodeNameToVictims[nodeName] = victims
201 }
202 return nodeNameToVictims, nil
203 }
204
205
206
207
208
209 func (h *HTTPExtender) convertPodUIDToPod(
210 metaPod *extenderv1.MetaPod,
211 nodeInfo *framework.NodeInfo) (*v1.Pod, error) {
212 for _, p := range nodeInfo.Pods {
213 if string(p.Pod.UID) == metaPod.UID {
214 return p.Pod, nil
215 }
216 }
217 return nil, fmt.Errorf("extender: %v claims to preempt pod (UID: %v) on node: %v, but the pod is not found on that node",
218 h.extenderURL, metaPod, nodeInfo.Node().Name)
219 }
220
221
222 func convertToMetaVictims(
223 nodeNameToVictims map[string]*extenderv1.Victims,
224 ) map[string]*extenderv1.MetaVictims {
225 nodeNameToMetaVictims := map[string]*extenderv1.MetaVictims{}
226 for node, victims := range nodeNameToVictims {
227 metaVictims := &extenderv1.MetaVictims{
228 Pods: []*extenderv1.MetaPod{},
229 NumPDBViolations: victims.NumPDBViolations,
230 }
231 for _, pod := range victims.Pods {
232 metaPod := &extenderv1.MetaPod{
233 UID: string(pod.UID),
234 }
235 metaVictims.Pods = append(metaVictims.Pods, metaPod)
236 }
237 nodeNameToMetaVictims[node] = metaVictims
238 }
239 return nodeNameToMetaVictims
240 }
241
242
243
244
245
246
247 func (h *HTTPExtender) Filter(
248 pod *v1.Pod,
249 nodes []*framework.NodeInfo,
250 ) (filteredList []*framework.NodeInfo, failedNodes, failedAndUnresolvableNodes extenderv1.FailedNodesMap, err error) {
251 var (
252 result extenderv1.ExtenderFilterResult
253 nodeList *v1.NodeList
254 nodeNames *[]string
255 nodeResult []*framework.NodeInfo
256 args *extenderv1.ExtenderArgs
257 )
258 fromNodeName := make(map[string]*framework.NodeInfo)
259 for _, n := range nodes {
260 fromNodeName[n.Node().Name] = n
261 }
262
263 if h.filterVerb == "" {
264 return nodes, extenderv1.FailedNodesMap{}, extenderv1.FailedNodesMap{}, nil
265 }
266
267 if h.nodeCacheCapable {
268 nodeNameSlice := make([]string, 0, len(nodes))
269 for _, node := range nodes {
270 nodeNameSlice = append(nodeNameSlice, node.Node().Name)
271 }
272 nodeNames = &nodeNameSlice
273 } else {
274 nodeList = &v1.NodeList{}
275 for _, node := range nodes {
276 nodeList.Items = append(nodeList.Items, *node.Node())
277 }
278 }
279
280 args = &extenderv1.ExtenderArgs{
281 Pod: pod,
282 Nodes: nodeList,
283 NodeNames: nodeNames,
284 }
285
286 if err := h.send(h.filterVerb, args, &result); err != nil {
287 return nil, nil, nil, err
288 }
289 if result.Error != "" {
290 return nil, nil, nil, fmt.Errorf(result.Error)
291 }
292
293 if h.nodeCacheCapable && result.NodeNames != nil {
294 nodeResult = make([]*framework.NodeInfo, len(*result.NodeNames))
295 for i, nodeName := range *result.NodeNames {
296 if n, ok := fromNodeName[nodeName]; ok {
297 nodeResult[i] = n
298 } else {
299 return nil, nil, nil, fmt.Errorf(
300 "extender %q claims a filtered node %q which is not found in the input node list",
301 h.extenderURL, nodeName)
302 }
303 }
304 } else if result.Nodes != nil {
305 nodeResult = make([]*framework.NodeInfo, len(result.Nodes.Items))
306 for i := range result.Nodes.Items {
307 nodeResult[i] = framework.NewNodeInfo()
308 nodeResult[i].SetNode(&result.Nodes.Items[i])
309 }
310 }
311
312 return nodeResult, result.FailedNodes, result.FailedAndUnresolvableNodes, nil
313 }
314
315
316
317
318 func (h *HTTPExtender) Prioritize(pod *v1.Pod, nodes []*framework.NodeInfo) (*extenderv1.HostPriorityList, int64, error) {
319 var (
320 result extenderv1.HostPriorityList
321 nodeList *v1.NodeList
322 nodeNames *[]string
323 args *extenderv1.ExtenderArgs
324 )
325
326 if h.prioritizeVerb == "" {
327 result := extenderv1.HostPriorityList{}
328 for _, node := range nodes {
329 result = append(result, extenderv1.HostPriority{Host: node.Node().Name, Score: 0})
330 }
331 return &result, 0, nil
332 }
333
334 if h.nodeCacheCapable {
335 nodeNameSlice := make([]string, 0, len(nodes))
336 for _, node := range nodes {
337 nodeNameSlice = append(nodeNameSlice, node.Node().Name)
338 }
339 nodeNames = &nodeNameSlice
340 } else {
341 nodeList = &v1.NodeList{}
342 for _, node := range nodes {
343 nodeList.Items = append(nodeList.Items, *node.Node())
344 }
345 }
346
347 args = &extenderv1.ExtenderArgs{
348 Pod: pod,
349 Nodes: nodeList,
350 NodeNames: nodeNames,
351 }
352
353 if err := h.send(h.prioritizeVerb, args, &result); err != nil {
354 return nil, 0, err
355 }
356 return &result, h.weight, nil
357 }
358
359
360 func (h *HTTPExtender) Bind(binding *v1.Binding) error {
361 var result extenderv1.ExtenderBindingResult
362 if !h.IsBinder() {
363
364 return fmt.Errorf("unexpected empty bindVerb in extender")
365 }
366 req := &extenderv1.ExtenderBindingArgs{
367 PodName: binding.Name,
368 PodNamespace: binding.Namespace,
369 PodUID: binding.UID,
370 Node: binding.Target.Name,
371 }
372 if err := h.send(h.bindVerb, req, &result); err != nil {
373 return err
374 }
375 if result.Error != "" {
376 return fmt.Errorf(result.Error)
377 }
378 return nil
379 }
380
381
382 func (h *HTTPExtender) IsBinder() bool {
383 return h.bindVerb != ""
384 }
385
386
387 func (h *HTTPExtender) IsPrioritizer() bool {
388 return h.prioritizeVerb != ""
389 }
390
391
392 func (h *HTTPExtender) IsFilter() bool {
393 return h.filterVerb != ""
394 }
395
396
397 func (h *HTTPExtender) send(action string, args interface{}, result interface{}) error {
398 out, err := json.Marshal(args)
399 if err != nil {
400 return err
401 }
402
403 url := strings.TrimRight(h.extenderURL, "/") + "/" + action
404
405 req, err := http.NewRequest("POST", url, bytes.NewReader(out))
406 if err != nil {
407 return err
408 }
409
410 req.Header.Set("Content-Type", "application/json")
411
412 resp, err := h.client.Do(req)
413 if err != nil {
414 return err
415 }
416 defer resp.Body.Close()
417
418 if resp.StatusCode != http.StatusOK {
419 return fmt.Errorf("failed %v with extender at URL %v, code %v", action, url, resp.StatusCode)
420 }
421
422 return json.NewDecoder(resp.Body).Decode(result)
423 }
424
425
426
427 func (h *HTTPExtender) IsInterested(pod *v1.Pod) bool {
428 if h.managedResources.Len() == 0 {
429 return true
430 }
431 if h.hasManagedResources(pod.Spec.Containers) {
432 return true
433 }
434 if h.hasManagedResources(pod.Spec.InitContainers) {
435 return true
436 }
437 return false
438 }
439
440 func (h *HTTPExtender) hasManagedResources(containers []v1.Container) bool {
441 for i := range containers {
442 container := &containers[i]
443 for resourceName := range container.Resources.Requests {
444 if h.managedResources.Has(string(resourceName)) {
445 return true
446 }
447 }
448 for resourceName := range container.Resources.Limits {
449 if h.managedResources.Has(string(resourceName)) {
450 return true
451 }
452 }
453 }
454 return false
455 }
456
View as plain text