1
16
17 package extender
18
19
20
21 import (
22 "context"
23 "encoding/json"
24 "fmt"
25 "net/http"
26 "net/http/httptest"
27 "strings"
28 "testing"
29 "time"
30
31 v1 "k8s.io/api/core/v1"
32 "k8s.io/apimachinery/pkg/api/resource"
33 metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
34 "k8s.io/apimachinery/pkg/util/wait"
35 clientset "k8s.io/client-go/kubernetes"
36 extenderv1 "k8s.io/kube-scheduler/extender/v1"
37 "k8s.io/kubernetes/pkg/scheduler"
38 schedulerapi "k8s.io/kubernetes/pkg/scheduler/apis/config"
39 testutils "k8s.io/kubernetes/test/integration/util"
40 imageutils "k8s.io/kubernetes/test/utils/image"
41 )
42
43
44 var (
45 createNode = testutils.CreateNode
46 )
47
// Verb names used both in the extender configuration handed to the scheduler
// and when matching the request path inside serveHTTP.
const (
	filter     = "filter"
	prioritize = "prioritize"
	bind       = "bind"
)

// extendedResourceName is the resource requested by the test pod and listed
// as a managed resource of the extender that also handles binding.
const extendedResourceName = "foo.com/bar"
54
55 type fitPredicate func(pod *v1.Pod, node *v1.Node) (bool, error)
56 type priorityFunc func(pod *v1.Pod, nodes *v1.NodeList) (*extenderv1.HostPriorityList, error)
57
// priorityConfig pairs a scoring function with the weight applied to its
// results. Field order matters: the tests build values with positional
// literals.
type priorityConfig struct {
	// function computes the per-host scores.
	function priorityFunc
	// weight multiplies every score returned by function; Prioritize skips
	// entries whose weight is zero.
	weight int64
}
62
// Extender is an in-process scheduler extender used by the tests in this file.
// It is exposed over HTTP through serveHTTP.
type Extender struct {
	// name is included in the failure message recorded for rejected nodes.
	name string
	// predicates must all pass for a node to survive Filter.
	predicates []fitPredicate
	// prioritizers are evaluated by Prioritize and combined by weight.
	prioritizers []priorityConfig
	// nodeCacheCapable selects args.NodeNames instead of args.Nodes as the
	// node source in Filter and Prioritize.
	nodeCacheCapable bool
	// Client is used by Bind to create the pod binding.
	Client clientset.Interface
}
70
71 func (e *Extender) serveHTTP(t *testing.T, w http.ResponseWriter, req *http.Request) {
72 decoder := json.NewDecoder(req.Body)
73 defer req.Body.Close()
74
75 encoder := json.NewEncoder(w)
76
77 if strings.Contains(req.URL.Path, filter) || strings.Contains(req.URL.Path, prioritize) {
78 var args extenderv1.ExtenderArgs
79
80 if err := decoder.Decode(&args); err != nil {
81 http.Error(w, "Decode error", http.StatusBadRequest)
82 return
83 }
84
85 if strings.Contains(req.URL.Path, filter) {
86 resp, err := e.Filter(&args)
87 if err != nil {
88 resp.Error = err.Error()
89 }
90
91 if err := encoder.Encode(resp); err != nil {
92 t.Fatalf("Failed to encode %v", resp)
93 }
94 } else if strings.Contains(req.URL.Path, prioritize) {
95
96
97 priorities, _ := e.Prioritize(&args)
98
99 if err := encoder.Encode(priorities); err != nil {
100 t.Fatalf("Failed to encode %+v", priorities)
101 }
102 }
103 } else if strings.Contains(req.URL.Path, bind) {
104 var args extenderv1.ExtenderBindingArgs
105
106 if err := decoder.Decode(&args); err != nil {
107 http.Error(w, "Decode error", http.StatusBadRequest)
108 return
109 }
110
111 resp := &extenderv1.ExtenderBindingResult{}
112
113 if err := e.Bind(&args); err != nil {
114 resp.Error = err.Error()
115 }
116
117 if err := encoder.Encode(resp); err != nil {
118 t.Fatalf("Failed to encode %+v", resp)
119 }
120 } else {
121 http.Error(w, "Unknown method", http.StatusNotFound)
122 }
123 }
124
125 func (e *Extender) filterUsingNodeCache(args *extenderv1.ExtenderArgs) (*extenderv1.ExtenderFilterResult, error) {
126 nodeSlice := make([]string, 0)
127 failedNodesMap := extenderv1.FailedNodesMap{}
128 for _, nodeName := range *args.NodeNames {
129 fits := true
130 for _, predicate := range e.predicates {
131 fit, err := predicate(args.Pod,
132 &v1.Node{ObjectMeta: metav1.ObjectMeta{Name: nodeName}})
133 if err != nil {
134 return &extenderv1.ExtenderFilterResult{
135 Nodes: nil,
136 NodeNames: nil,
137 FailedNodes: extenderv1.FailedNodesMap{},
138 Error: err.Error(),
139 }, err
140 }
141 if !fit {
142 fits = false
143 break
144 }
145 }
146 if fits {
147 nodeSlice = append(nodeSlice, nodeName)
148 } else {
149 failedNodesMap[nodeName] = fmt.Sprintf("extender failed: %s", e.name)
150 }
151 }
152
153 return &extenderv1.ExtenderFilterResult{
154 Nodes: nil,
155 NodeNames: &nodeSlice,
156 FailedNodes: failedNodesMap,
157 }, nil
158 }
159
160 func (e *Extender) Filter(args *extenderv1.ExtenderArgs) (*extenderv1.ExtenderFilterResult, error) {
161 filtered := []v1.Node{}
162 failedNodesMap := extenderv1.FailedNodesMap{}
163
164 if e.nodeCacheCapable {
165 return e.filterUsingNodeCache(args)
166 }
167
168 for _, node := range args.Nodes.Items {
169 fits := true
170 for _, predicate := range e.predicates {
171 fit, err := predicate(args.Pod, &node)
172 if err != nil {
173 return &extenderv1.ExtenderFilterResult{
174 Nodes: &v1.NodeList{},
175 NodeNames: nil,
176 FailedNodes: extenderv1.FailedNodesMap{},
177 Error: err.Error(),
178 }, err
179 }
180 if !fit {
181 fits = false
182 break
183 }
184 }
185 if fits {
186 filtered = append(filtered, node)
187 } else {
188 failedNodesMap[node.Name] = fmt.Sprintf("extender failed: %s", e.name)
189 }
190 }
191
192 return &extenderv1.ExtenderFilterResult{
193 Nodes: &v1.NodeList{Items: filtered},
194 NodeNames: nil,
195 FailedNodes: failedNodesMap,
196 }, nil
197 }
198
199 func (e *Extender) Prioritize(args *extenderv1.ExtenderArgs) (*extenderv1.HostPriorityList, error) {
200 result := extenderv1.HostPriorityList{}
201 combinedScores := map[string]int64{}
202 var nodes = &v1.NodeList{Items: []v1.Node{}}
203
204 if e.nodeCacheCapable {
205 for _, nodeName := range *args.NodeNames {
206 nodes.Items = append(nodes.Items, v1.Node{ObjectMeta: metav1.ObjectMeta{Name: nodeName}})
207 }
208 } else {
209 nodes = args.Nodes
210 }
211
212 for _, prioritizer := range e.prioritizers {
213 weight := prioritizer.weight
214 if weight == 0 {
215 continue
216 }
217 priorityFunc := prioritizer.function
218 prioritizedList, err := priorityFunc(args.Pod, nodes)
219 if err != nil {
220 return &extenderv1.HostPriorityList{}, err
221 }
222 for _, hostEntry := range *prioritizedList {
223 combinedScores[hostEntry.Host] += hostEntry.Score * weight
224 }
225 }
226 for host, score := range combinedScores {
227 result = append(result, extenderv1.HostPriority{Host: host, Score: score})
228 }
229 return &result, nil
230 }
231
232 func (e *Extender) Bind(binding *extenderv1.ExtenderBindingArgs) error {
233 b := &v1.Binding{
234 ObjectMeta: metav1.ObjectMeta{Namespace: binding.PodNamespace, Name: binding.PodName, UID: binding.PodUID},
235 Target: v1.ObjectReference{
236 Kind: "Node",
237 Name: binding.Node,
238 },
239 }
240
241 return e.Client.CoreV1().Pods(b.Namespace).Bind(context.TODO(), b, metav1.CreateOptions{})
242 }
243
244 func machine1_2_3Predicate(pod *v1.Pod, node *v1.Node) (bool, error) {
245 if node.Name == "machine1" || node.Name == "machine2" || node.Name == "machine3" {
246 return true, nil
247 }
248 return false, nil
249 }
250
251 func machine2_3_5Predicate(pod *v1.Pod, node *v1.Node) (bool, error) {
252 if node.Name == "machine2" || node.Name == "machine3" || node.Name == "machine5" {
253 return true, nil
254 }
255 return false, nil
256 }
257
258 func machine2Prioritizer(pod *v1.Pod, nodes *v1.NodeList) (*extenderv1.HostPriorityList, error) {
259 result := extenderv1.HostPriorityList{}
260 for _, node := range nodes.Items {
261 score := 1
262 if node.Name == "machine2" {
263 score = 10
264 }
265 result = append(result, extenderv1.HostPriority{
266 Host: node.Name,
267 Score: int64(score),
268 })
269 }
270 return &result, nil
271 }
272
273 func machine3Prioritizer(pod *v1.Pod, nodes *v1.NodeList) (*extenderv1.HostPriorityList, error) {
274 result := extenderv1.HostPriorityList{}
275 for _, node := range nodes.Items {
276 score := 1
277 if node.Name == "machine3" {
278 score = 10
279 }
280 result = append(result, extenderv1.HostPriority{
281 Host: node.Name,
282 Score: int64(score),
283 })
284 }
285 return &result, nil
286 }
287
288 func TestSchedulerExtender(t *testing.T) {
289 testCtx := testutils.InitTestAPIServer(t, "scheduler-extender", nil)
290 clientSet := testCtx.ClientSet
291
292 extender1 := &Extender{
293 name: "extender1",
294 predicates: []fitPredicate{machine1_2_3Predicate},
295 prioritizers: []priorityConfig{{machine2Prioritizer, 1}},
296 }
297 es1 := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, req *http.Request) {
298 extender1.serveHTTP(t, w, req)
299 }))
300 defer es1.Close()
301
302 extender2 := &Extender{
303 name: "extender2",
304 predicates: []fitPredicate{machine2_3_5Predicate},
305 prioritizers: []priorityConfig{{machine3Prioritizer, 1}},
306 Client: clientSet,
307 }
308 es2 := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, req *http.Request) {
309 extender2.serveHTTP(t, w, req)
310 }))
311 defer es2.Close()
312
313 extender3 := &Extender{
314 name: "extender3",
315 predicates: []fitPredicate{machine1_2_3Predicate},
316 prioritizers: []priorityConfig{{machine2Prioritizer, 5}},
317 nodeCacheCapable: true,
318 }
319 es3 := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, req *http.Request) {
320 extender3.serveHTTP(t, w, req)
321 }))
322 defer es3.Close()
323
324 extenders := []schedulerapi.Extender{
325 {
326 URLPrefix: es1.URL,
327 FilterVerb: filter,
328 PrioritizeVerb: prioritize,
329 Weight: 3,
330 EnableHTTPS: false,
331 },
332 {
333 URLPrefix: es2.URL,
334 FilterVerb: filter,
335 PrioritizeVerb: prioritize,
336 BindVerb: bind,
337 Weight: 4,
338 EnableHTTPS: false,
339 ManagedResources: []schedulerapi.ExtenderManagedResource{
340 {
341 Name: extendedResourceName,
342 IgnoredByScheduler: true,
343 },
344 },
345 },
346 {
347 URLPrefix: es3.URL,
348 FilterVerb: filter,
349 PrioritizeVerb: prioritize,
350 Weight: 10,
351 EnableHTTPS: false,
352 NodeCacheCapable: true,
353 },
354 }
355
356 testCtx = testutils.InitTestSchedulerWithOptions(t, testCtx, 0, scheduler.WithExtenders(extenders...))
357 testutils.SyncSchedulerInformerFactory(testCtx)
358 go testCtx.Scheduler.Run(testCtx.Ctx)
359
360 DoTestPodScheduling(testCtx.NS, t, clientSet)
361 }
362
363 func DoTestPodScheduling(ns *v1.Namespace, t *testing.T, cs clientset.Interface) {
364
365
366 defer cs.CoreV1().Nodes().DeleteCollection(context.TODO(), metav1.DeleteOptions{}, metav1.ListOptions{})
367
368 goodCondition := v1.NodeCondition{
369 Type: v1.NodeReady,
370 Status: v1.ConditionTrue,
371 Reason: fmt.Sprintf("schedulable condition"),
372 LastHeartbeatTime: metav1.Time{Time: time.Now()},
373 }
374 node := &v1.Node{
375 Spec: v1.NodeSpec{Unschedulable: false},
376 Status: v1.NodeStatus{
377 Capacity: v1.ResourceList{
378 v1.ResourcePods: *resource.NewQuantity(32, resource.DecimalSI),
379 },
380 Conditions: []v1.NodeCondition{goodCondition},
381 },
382 }
383
384 for ii := 0; ii < 5; ii++ {
385 node.Name = fmt.Sprintf("machine%d", ii+1)
386 if _, err := createNode(cs, node); err != nil {
387 t.Fatalf("Failed to create nodes: %v", err)
388 }
389 }
390
391 pod := &v1.Pod{
392 ObjectMeta: metav1.ObjectMeta{Name: "extender-test-pod"},
393 Spec: v1.PodSpec{
394 Containers: []v1.Container{
395 {
396 Name: "container",
397 Image: imageutils.GetPauseImageName(),
398 Resources: v1.ResourceRequirements{
399 Limits: v1.ResourceList{
400 extendedResourceName: *resource.NewQuantity(1, resource.DecimalSI),
401 },
402 },
403 },
404 },
405 },
406 }
407
408 myPod, err := cs.CoreV1().Pods(ns.Name).Create(context.TODO(), pod, metav1.CreateOptions{})
409 if err != nil {
410 t.Fatalf("Failed to create pod: %v", err)
411 }
412
413 err = wait.PollUntilContextTimeout(context.TODO(), time.Second, wait.ForeverTestTimeout, false,
414 testutils.PodScheduled(cs, myPod.Namespace, myPod.Name))
415 if err != nil {
416 t.Fatalf("Failed to schedule pod: %v", err)
417 }
418
419 myPod, err = cs.CoreV1().Pods(ns.Name).Get(context.TODO(), myPod.Name, metav1.GetOptions{})
420 if err != nil {
421 t.Fatalf("Failed to get pod: %v", err)
422 } else if myPod.Spec.NodeName != "machine2" {
423 t.Fatalf("Failed to schedule using extender, expected machine2, got %v", myPod.Spec.NodeName)
424 }
425 var gracePeriod int64
426 if err := cs.CoreV1().Pods(ns.Name).Delete(context.TODO(), myPod.Name, metav1.DeleteOptions{GracePeriodSeconds: &gracePeriod}); err != nil {
427 t.Fatalf("Failed to delete pod: %v", err)
428 }
429 _, err = cs.CoreV1().Pods(ns.Name).Get(context.TODO(), myPod.Name, metav1.GetOptions{})
430 if err == nil {
431 t.Fatalf("Failed to delete pod: %v", err)
432 }
433 t.Logf("Scheduled pod using extenders")
434 }
435
View as plain text