1
16
17 package testutil
18
19 import (
20 "context"
21 "encoding/json"
22 "errors"
23 "fmt"
24 "reflect"
25 "sync"
26 "testing"
27 "time"
28
29 v1 "k8s.io/api/core/v1"
30 apierrors "k8s.io/apimachinery/pkg/api/errors"
31 "k8s.io/apimachinery/pkg/api/resource"
32 metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
33 "k8s.io/apimachinery/pkg/runtime"
34 "k8s.io/apimachinery/pkg/runtime/schema"
35 "k8s.io/apimachinery/pkg/types"
36 "k8s.io/apimachinery/pkg/util/sets"
37 "k8s.io/apimachinery/pkg/util/strategicpatch"
38 "k8s.io/apimachinery/pkg/watch"
39 v1apply "k8s.io/client-go/applyconfigurations/core/v1"
40 "k8s.io/client-go/kubernetes/fake"
41 v1core "k8s.io/client-go/kubernetes/typed/core/v1"
42 "k8s.io/client-go/tools/cache"
43 ref "k8s.io/client-go/tools/reference"
44 utilnode "k8s.io/component-helpers/node/topology"
45 "k8s.io/klog/v2"
46 "k8s.io/kubernetes/pkg/api/legacyscheme"
47 api "k8s.io/kubernetes/pkg/apis/core"
48 "k8s.io/utils/clock"
49 testingclock "k8s.io/utils/clock/testing"
50
51 jsonpatch "github.com/evanphx/json-patch"
52 )
53
54 var (
55 keyFunc = cache.DeletionHandlingMetaNamespaceKeyFunc
56 )
57
58
59
60
61
62 type FakeNodeHandler struct {
63 *fake.Clientset
64
65
66 CreateHook func(*FakeNodeHandler, *v1.Node) bool
67 Existing []*v1.Node
68 AsyncCalls []func(*FakeNodeHandler)
69
70
71 CreatedNodes []*v1.Node
72 DeletedNodes []*v1.Node
73 UpdatedNodes []*v1.Node
74 UpdatedNodeStatuses []*v1.Node
75 RequestCount int
76
77
78 lock sync.Mutex
79 DeleteWaitChan chan struct{}
80 PatchWaitChan chan struct{}
81 }
82
83
84 type FakeLegacyHandler struct {
85 v1core.CoreV1Interface
86 n *FakeNodeHandler
87 }
88
89
90 func (m *FakeNodeHandler) GetUpdatedNodesCopy() []*v1.Node {
91 m.lock.Lock()
92 defer m.lock.Unlock()
93 updatedNodesCopy := make([]*v1.Node, len(m.UpdatedNodes), len(m.UpdatedNodes))
94 copy(updatedNodesCopy, m.UpdatedNodes)
95 return updatedNodesCopy
96 }
97
98
99 func (m *FakeNodeHandler) Core() v1core.CoreV1Interface {
100 return &FakeLegacyHandler{m.Clientset.CoreV1(), m}
101 }
102
103
104 func (m *FakeNodeHandler) CoreV1() v1core.CoreV1Interface {
105 return &FakeLegacyHandler{m.Clientset.CoreV1(), m}
106 }
107
108
109 func (m *FakeLegacyHandler) Nodes() v1core.NodeInterface {
110 return m.n
111 }
112
113
114 func (m *FakeNodeHandler) Create(_ context.Context, node *v1.Node, _ metav1.CreateOptions) (*v1.Node, error) {
115 m.lock.Lock()
116 defer func() {
117 m.RequestCount++
118 m.lock.Unlock()
119 }()
120 for _, n := range m.Existing {
121 if n.Name == node.Name {
122 return nil, apierrors.NewAlreadyExists(api.Resource("nodes"), node.Name)
123 }
124 }
125 if m.CreateHook == nil || m.CreateHook(m, node) {
126 nodeCopy := *node
127 m.CreatedNodes = append(m.CreatedNodes, &nodeCopy)
128 return node, nil
129 }
130 return nil, errors.New("create error")
131 }
132
133
134 func (m *FakeNodeHandler) Get(ctx context.Context, name string, opts metav1.GetOptions) (*v1.Node, error) {
135 m.lock.Lock()
136 defer func() {
137 m.RequestCount++
138 m.runAsyncCalls()
139 m.lock.Unlock()
140 }()
141 for i := range m.UpdatedNodes {
142 if m.UpdatedNodes[i].Name == name {
143 nodeCopy := *m.UpdatedNodes[i]
144 return &nodeCopy, nil
145 }
146 }
147 for i := range m.Existing {
148 if m.Existing[i].Name == name {
149 nodeCopy := *m.Existing[i]
150 return &nodeCopy, nil
151 }
152 }
153 return nil, apierrors.NewNotFound(schema.GroupResource{Resource: "nodes"}, name)
154 }
155
156 func (m *FakeNodeHandler) runAsyncCalls() {
157 for _, a := range m.AsyncCalls {
158 a(m)
159 }
160 }
161
162
163 func (m *FakeNodeHandler) List(_ context.Context, opts metav1.ListOptions) (*v1.NodeList, error) {
164 m.lock.Lock()
165 defer func() {
166 m.RequestCount++
167 m.lock.Unlock()
168 }()
169 var nodes []*v1.Node
170 for i := 0; i < len(m.UpdatedNodes); i++ {
171 if !contains(m.UpdatedNodes[i], m.DeletedNodes) {
172 nodes = append(nodes, m.UpdatedNodes[i])
173 }
174 }
175 for i := 0; i < len(m.Existing); i++ {
176 if !contains(m.Existing[i], m.DeletedNodes) && !contains(m.Existing[i], nodes) {
177 nodes = append(nodes, m.Existing[i])
178 }
179 }
180 for i := 0; i < len(m.CreatedNodes); i++ {
181 if !contains(m.CreatedNodes[i], m.DeletedNodes) && !contains(m.CreatedNodes[i], nodes) {
182 nodes = append(nodes, m.CreatedNodes[i])
183 }
184 }
185 nodeList := &v1.NodeList{}
186 for _, node := range nodes {
187 nodeList.Items = append(nodeList.Items, *node)
188 }
189 return nodeList, nil
190 }
191
192
193 func (m *FakeNodeHandler) Delete(_ context.Context, id string, opt metav1.DeleteOptions) error {
194 m.lock.Lock()
195 defer func() {
196 m.RequestCount++
197 if m.DeleteWaitChan != nil {
198 m.DeleteWaitChan <- struct{}{}
199 }
200 m.lock.Unlock()
201 }()
202 m.DeletedNodes = append(m.DeletedNodes, NewNode(id))
203 return nil
204 }
205
206
207 func (m *FakeNodeHandler) DeleteCollection(_ context.Context, opt metav1.DeleteOptions, listOpts metav1.ListOptions) error {
208 return nil
209 }
210
211
212 func (m *FakeNodeHandler) Update(_ context.Context, node *v1.Node, _ metav1.UpdateOptions) (*v1.Node, error) {
213 m.lock.Lock()
214 defer func() {
215 m.RequestCount++
216 m.lock.Unlock()
217 }()
218
219 nodeCopy := *node
220 for i, updateNode := range m.UpdatedNodes {
221 if updateNode.Name == nodeCopy.Name {
222 if updateNode.GetObjectMeta().GetResourceVersion() != nodeCopy.GetObjectMeta().GetResourceVersion() {
223 return nil, apierrors.NewConflict(schema.GroupResource{}, "fake conflict", nil)
224 }
225 m.UpdatedNodes[i] = &nodeCopy
226 return node, nil
227 }
228 }
229 m.UpdatedNodes = append(m.UpdatedNodes, &nodeCopy)
230 return node, nil
231 }
232
233
234 func (m *FakeNodeHandler) UpdateStatus(_ context.Context, node *v1.Node, _ metav1.UpdateOptions) (*v1.Node, error) {
235 m.lock.Lock()
236 defer func() {
237 m.RequestCount++
238 m.lock.Unlock()
239 }()
240
241 var origNodeCopy v1.Node
242 found := false
243 for i := range m.Existing {
244 if m.Existing[i].Name == node.Name {
245 origNodeCopy = *m.Existing[i]
246 found = true
247 break
248 }
249 }
250 updatedNodeIndex := -1
251 for i := range m.UpdatedNodes {
252 if m.UpdatedNodes[i].Name == node.Name {
253 origNodeCopy = *m.UpdatedNodes[i]
254 updatedNodeIndex = i
255 found = true
256 break
257 }
258 }
259
260 if !found {
261 return nil, fmt.Errorf("not found node %v", node)
262 }
263
264 origNodeCopy.Status = node.Status
265 if updatedNodeIndex < 0 {
266 m.UpdatedNodes = append(m.UpdatedNodes, &origNodeCopy)
267 } else {
268 m.UpdatedNodes[updatedNodeIndex] = &origNodeCopy
269 }
270
271 nodeCopy := *node
272 m.UpdatedNodeStatuses = append(m.UpdatedNodeStatuses, &nodeCopy)
273 return node, nil
274 }
275
276
277 func (m *FakeNodeHandler) PatchStatus(ctx context.Context, nodeName string, data []byte) (*v1.Node, error) {
278 m.RequestCount++
279 return m.Patch(ctx, nodeName, types.StrategicMergePatchType, data, metav1.PatchOptions{}, "status")
280 }
281
282
283 func (m *FakeNodeHandler) Watch(_ context.Context, opts metav1.ListOptions) (watch.Interface, error) {
284 return watch.NewFake(), nil
285 }
286
287
288 func (m *FakeNodeHandler) Patch(ctx context.Context, name string, pt types.PatchType, data []byte, _ metav1.PatchOptions, subresources ...string) (*v1.Node, error) {
289 m.lock.Lock()
290 defer func() {
291 m.RequestCount++
292 if m.PatchWaitChan != nil {
293 m.PatchWaitChan <- struct{}{}
294 }
295 m.lock.Unlock()
296 }()
297 var nodeCopy v1.Node
298 for i := range m.Existing {
299 if m.Existing[i].Name == name {
300 nodeCopy = *m.Existing[i]
301 }
302 }
303 updatedNodeIndex := -1
304 for i := range m.UpdatedNodes {
305 if m.UpdatedNodes[i].Name == name {
306 nodeCopy = *m.UpdatedNodes[i]
307 updatedNodeIndex = i
308 }
309 }
310
311 originalObjJS, err := json.Marshal(nodeCopy)
312 if err != nil {
313 klog.FromContext(ctx).Error(nil, "Failed to marshal", "node", klog.KObj(&nodeCopy))
314 return nil, nil
315 }
316 var originalNode v1.Node
317 if err = json.Unmarshal(originalObjJS, &originalNode); err != nil {
318 klog.FromContext(ctx).Error(err, "Failed to unmarshal original object")
319 return nil, nil
320 }
321
322 var patchedObjJS []byte
323 switch pt {
324 case types.JSONPatchType:
325 patchObj, err := jsonpatch.DecodePatch(data)
326 if err != nil {
327 klog.FromContext(ctx).Error(err, "")
328 return nil, nil
329 }
330 if patchedObjJS, err = patchObj.Apply(originalObjJS); err != nil {
331 klog.FromContext(ctx).Error(err, "")
332 return nil, nil
333 }
334 case types.MergePatchType:
335 if patchedObjJS, err = jsonpatch.MergePatch(originalObjJS, data); err != nil {
336 klog.FromContext(ctx).Error(err, "")
337 return nil, nil
338 }
339 case types.StrategicMergePatchType:
340 if patchedObjJS, err = strategicpatch.StrategicMergePatch(originalObjJS, data, originalNode); err != nil {
341 klog.FromContext(ctx).Error(err, "")
342 return nil, nil
343 }
344 default:
345 klog.FromContext(ctx).Error(nil, "Unknown Content-Type header", "patch", pt)
346 return nil, nil
347 }
348
349 var updatedNode v1.Node
350 if err = json.Unmarshal(patchedObjJS, &updatedNode); err != nil {
351 klog.FromContext(ctx).Error(err, "Failed to unmarshal patched object")
352 return nil, nil
353 }
354
355 if updatedNodeIndex < 0 {
356 m.UpdatedNodes = append(m.UpdatedNodes, &updatedNode)
357 } else {
358 if updatedNode.GetObjectMeta().GetResourceVersion() != m.UpdatedNodes[updatedNodeIndex].GetObjectMeta().GetResourceVersion() {
359 return nil, apierrors.NewConflict(schema.GroupResource{}, "fake conflict", nil)
360 }
361 m.UpdatedNodes[updatedNodeIndex] = &updatedNode
362 }
363
364 return &updatedNode, nil
365 }
366
367
368 func (m *FakeNodeHandler) Apply(ctx context.Context, node *v1apply.NodeApplyConfiguration, opts metav1.ApplyOptions) (*v1.Node, error) {
369 patchOpts := opts.ToPatchOptions()
370 data, err := json.Marshal(node)
371 if err != nil {
372 return nil, err
373 }
374 name := node.Name
375 if name == nil {
376 return nil, fmt.Errorf("deployment.Name must be provided to Apply")
377 }
378
379 return m.Patch(ctx, *name, types.ApplyPatchType, data, patchOpts)
380 }
381
382
383 func (m *FakeNodeHandler) ApplyStatus(ctx context.Context, node *v1apply.NodeApplyConfiguration, opts metav1.ApplyOptions) (*v1.Node, error) {
384 patchOpts := opts.ToPatchOptions()
385 data, err := json.Marshal(node)
386 if err != nil {
387 return nil, err
388 }
389 name := node.Name
390 if name == nil {
391 return nil, fmt.Errorf("deployment.Name must be provided to Apply")
392 }
393
394 return m.Patch(ctx, *name, types.ApplyPatchType, data, patchOpts, "status")
395 }
396
397
398 type FakeRecorder struct {
399 sync.Mutex
400 source v1.EventSource
401 Events []*v1.Event
402 clock clock.Clock
403 }
404
405
406 func (f *FakeRecorder) Event(obj runtime.Object, eventtype, reason, message string) {
407 f.generateEvent(obj, metav1.Now(), eventtype, reason, message)
408 }
409
410
411 func (f *FakeRecorder) Eventf(obj runtime.Object, eventtype, reason, messageFmt string, args ...interface{}) {
412 f.Event(obj, eventtype, reason, fmt.Sprintf(messageFmt, args...))
413 }
414
415
416 func (f *FakeRecorder) AnnotatedEventf(obj runtime.Object, annotations map[string]string, eventtype, reason, messageFmt string, args ...interface{}) {
417 f.Eventf(obj, eventtype, reason, messageFmt, args...)
418 }
419
420 func (f *FakeRecorder) generateEvent(obj runtime.Object, timestamp metav1.Time, eventtype, reason, message string) {
421 f.Lock()
422 defer f.Unlock()
423 ctx := context.TODO()
424 ref, err := ref.GetReference(legacyscheme.Scheme, obj)
425 if err != nil {
426 klog.FromContext(ctx).Error(err, "Encountered error while getting reference")
427 return
428 }
429 event := f.makeEvent(ref, eventtype, reason, message)
430 event.Source = f.source
431 if f.Events != nil {
432 f.Events = append(f.Events, event)
433 }
434 }
435
436 func (f *FakeRecorder) makeEvent(ref *v1.ObjectReference, eventtype, reason, message string) *v1.Event {
437 t := metav1.Time{Time: f.clock.Now()}
438 namespace := ref.Namespace
439 if namespace == "" {
440 namespace = metav1.NamespaceDefault
441 }
442
443 clientref := v1.ObjectReference{
444 Kind: ref.Kind,
445 Namespace: ref.Namespace,
446 Name: ref.Name,
447 UID: ref.UID,
448 APIVersion: ref.APIVersion,
449 ResourceVersion: ref.ResourceVersion,
450 FieldPath: ref.FieldPath,
451 }
452
453 return &v1.Event{
454 ObjectMeta: metav1.ObjectMeta{
455 Name: fmt.Sprintf("%v.%x", ref.Name, t.UnixNano()),
456 Namespace: namespace,
457 },
458 InvolvedObject: clientref,
459 Reason: reason,
460 Message: message,
461 FirstTimestamp: t,
462 LastTimestamp: t,
463 Count: 1,
464 Type: eventtype,
465 }
466 }
467
468
469 func NewFakeRecorder() *FakeRecorder {
470 return &FakeRecorder{
471 source: v1.EventSource{Component: "nodeControllerTest"},
472 Events: []*v1.Event{},
473 clock: testingclock.NewFakeClock(time.Now()),
474 }
475 }
476
477
478 func NewNode(name string) *v1.Node {
479 return &v1.Node{
480 ObjectMeta: metav1.ObjectMeta{Name: name},
481 Status: v1.NodeStatus{
482 Capacity: v1.ResourceList{
483 v1.ResourceName(v1.ResourceCPU): resource.MustParse("10"),
484 v1.ResourceName(v1.ResourceMemory): resource.MustParse("10G"),
485 },
486 },
487 }
488 }
489
490
491 func NewPod(name, host string) *v1.Pod {
492 pod := &v1.Pod{
493 ObjectMeta: metav1.ObjectMeta{
494 Namespace: "default",
495 Name: name,
496 },
497 Spec: v1.PodSpec{
498 NodeName: host,
499 },
500 Status: v1.PodStatus{
501 Conditions: []v1.PodCondition{
502 {
503 Type: v1.PodReady,
504 Status: v1.ConditionTrue,
505 },
506 },
507 },
508 }
509
510 return pod
511 }
512
513 func contains(node *v1.Node, nodes []*v1.Node) bool {
514 for i := 0; i < len(nodes); i++ {
515 if node.Name == nodes[i].Name {
516 return true
517 }
518 }
519 return false
520 }
521
522
523 func GetZones(nodeHandler *FakeNodeHandler) []string {
524 nodes, _ := nodeHandler.List(context.TODO(), metav1.ListOptions{})
525 zones := sets.NewString()
526 for _, node := range nodes.Items {
527 zones.Insert(utilnode.GetZoneKey(&node))
528 }
529 return zones.List()
530 }
531
532
533 func CreateZoneID(region, zone string) string {
534 return region + ":\x00:" + zone
535 }
536
537
538
539 func GetKey(obj interface{}, t *testing.T) string {
540 t.Helper()
541 tombstone, ok := obj.(cache.DeletedFinalStateUnknown)
542 if ok {
543
544 obj = tombstone.Obj
545 }
546 val := reflect.ValueOf(obj).Elem()
547 name := val.FieldByName("Name").String()
548 if len(name) == 0 {
549 t.Errorf("Unexpected object %v", obj)
550 }
551
552 key, err := keyFunc(obj)
553 if err != nil {
554 t.Errorf("Unexpected error getting key for %T %v: %v", val.Interface(), name, err)
555 return ""
556 }
557 return key
558 }
559
View as plain text