1
16
17 package testing
18
19 import (
20 "fmt"
21 "reflect"
22 "sort"
23 "strings"
24 "sync"
25
26 jsonpatch "github.com/evanphx/json-patch"
27
28 "k8s.io/apimachinery/pkg/api/errors"
29 "k8s.io/apimachinery/pkg/api/meta"
30 metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
31 "k8s.io/apimachinery/pkg/runtime"
32 "k8s.io/apimachinery/pkg/runtime/schema"
33 "k8s.io/apimachinery/pkg/types"
34 "k8s.io/apimachinery/pkg/util/json"
35 "k8s.io/apimachinery/pkg/util/strategicpatch"
36 "k8s.io/apimachinery/pkg/watch"
37 restclient "k8s.io/client-go/rest"
38 )
39
40
41
42
43 type ObjectTracker interface {
44
45
46 Add(obj runtime.Object) error
47
48
49 Get(gvr schema.GroupVersionResource, ns, name string) (runtime.Object, error)
50
51
52 Create(gvr schema.GroupVersionResource, obj runtime.Object, ns string) error
53
54
55 Update(gvr schema.GroupVersionResource, obj runtime.Object, ns string) error
56
57
58
59 List(gvr schema.GroupVersionResource, gvk schema.GroupVersionKind, ns string) (runtime.Object, error)
60
61
62
63
64 Delete(gvr schema.GroupVersionResource, ns, name string) error
65
66
67
68 Watch(gvr schema.GroupVersionResource, ns string) (watch.Interface, error)
69 }
70
71
72 type ObjectScheme interface {
73 runtime.ObjectCreater
74 runtime.ObjectTyper
75 }
76
77
78
79 func ObjectReaction(tracker ObjectTracker) ReactionFunc {
80 return func(action Action) (bool, runtime.Object, error) {
81 ns := action.GetNamespace()
82 gvr := action.GetResource()
83
84
85
86
87 switch action := action.(type) {
88
89 case ListActionImpl:
90 obj, err := tracker.List(gvr, action.GetKind(), ns)
91 return true, obj, err
92
93 case GetActionImpl:
94 obj, err := tracker.Get(gvr, ns, action.GetName())
95 return true, obj, err
96
97 case CreateActionImpl:
98 objMeta, err := meta.Accessor(action.GetObject())
99 if err != nil {
100 return true, nil, err
101 }
102 if action.GetSubresource() == "" {
103 err = tracker.Create(gvr, action.GetObject(), ns)
104 } else {
105 oldObj, getOldObjErr := tracker.Get(gvr, ns, objMeta.GetName())
106 if getOldObjErr != nil {
107 return true, nil, getOldObjErr
108 }
109
110 if reflect.TypeOf(oldObj) == reflect.TypeOf(action.GetObject()) {
111
112
113
114 err = tracker.Update(gvr, action.GetObject(), ns)
115 } else {
116
117 return true, action.GetObject(), nil
118 }
119 }
120 if err != nil {
121 return true, nil, err
122 }
123 obj, err := tracker.Get(gvr, ns, objMeta.GetName())
124 return true, obj, err
125
126 case UpdateActionImpl:
127 objMeta, err := meta.Accessor(action.GetObject())
128 if err != nil {
129 return true, nil, err
130 }
131 err = tracker.Update(gvr, action.GetObject(), ns)
132 if err != nil {
133 return true, nil, err
134 }
135 obj, err := tracker.Get(gvr, ns, objMeta.GetName())
136 return true, obj, err
137
138 case DeleteActionImpl:
139 err := tracker.Delete(gvr, ns, action.GetName())
140 if err != nil {
141 return true, nil, err
142 }
143 return true, nil, nil
144
145 case PatchActionImpl:
146 obj, err := tracker.Get(gvr, ns, action.GetName())
147 if err != nil {
148 return true, nil, err
149 }
150
151 old, err := json.Marshal(obj)
152 if err != nil {
153 return true, nil, err
154 }
155
156
157
158 value := reflect.ValueOf(obj)
159 value.Elem().Set(reflect.New(value.Type().Elem()).Elem())
160
161 switch action.GetPatchType() {
162 case types.JSONPatchType:
163 patch, err := jsonpatch.DecodePatch(action.GetPatch())
164 if err != nil {
165 return true, nil, err
166 }
167 modified, err := patch.Apply(old)
168 if err != nil {
169 return true, nil, err
170 }
171
172 if err = json.Unmarshal(modified, obj); err != nil {
173 return true, nil, err
174 }
175 case types.MergePatchType:
176 modified, err := jsonpatch.MergePatch(old, action.GetPatch())
177 if err != nil {
178 return true, nil, err
179 }
180
181 if err := json.Unmarshal(modified, obj); err != nil {
182 return true, nil, err
183 }
184 case types.StrategicMergePatchType, types.ApplyPatchType:
185 mergedByte, err := strategicpatch.StrategicMergePatch(old, action.GetPatch(), obj)
186 if err != nil {
187 return true, nil, err
188 }
189 if err = json.Unmarshal(mergedByte, obj); err != nil {
190 return true, nil, err
191 }
192 default:
193 return true, nil, fmt.Errorf("PatchType is not supported")
194 }
195
196 if err = tracker.Update(gvr, obj, ns); err != nil {
197 return true, nil, err
198 }
199
200 return true, obj, nil
201
202 default:
203 return false, nil, fmt.Errorf("no reaction implemented for %s", action)
204 }
205 }
206 }
207
208 type tracker struct {
209 scheme ObjectScheme
210 decoder runtime.Decoder
211 lock sync.RWMutex
212 objects map[schema.GroupVersionResource]map[types.NamespacedName]runtime.Object
213
214
215
216
217
218 watchers map[schema.GroupVersionResource]map[string][]*watch.RaceFreeFakeWatcher
219 }
220
221 var _ ObjectTracker = &tracker{}
222
223
224
225 func NewObjectTracker(scheme ObjectScheme, decoder runtime.Decoder) ObjectTracker {
226 return &tracker{
227 scheme: scheme,
228 decoder: decoder,
229 objects: make(map[schema.GroupVersionResource]map[types.NamespacedName]runtime.Object),
230 watchers: make(map[schema.GroupVersionResource]map[string][]*watch.RaceFreeFakeWatcher),
231 }
232 }
233
234 func (t *tracker) List(gvr schema.GroupVersionResource, gvk schema.GroupVersionKind, ns string) (runtime.Object, error) {
235
236
237
238 listGVK := gvk
239 listGVK.Kind = listGVK.Kind + "List"
240
241
242 if listGVK.Version == "" {
243 listGVK.Version = runtime.APIVersionInternal
244 }
245
246 list, err := t.scheme.New(listGVK)
247 if err != nil {
248 return nil, err
249 }
250
251 if !meta.IsListType(list) {
252 return nil, fmt.Errorf("%q is not a list type", listGVK.Kind)
253 }
254
255 t.lock.RLock()
256 defer t.lock.RUnlock()
257
258 objs, ok := t.objects[gvr]
259 if !ok {
260 return list, nil
261 }
262
263 matchingObjs, err := filterByNamespace(objs, ns)
264 if err != nil {
265 return nil, err
266 }
267 if err := meta.SetList(list, matchingObjs); err != nil {
268 return nil, err
269 }
270 return list.DeepCopyObject(), nil
271 }
272
273 func (t *tracker) Watch(gvr schema.GroupVersionResource, ns string) (watch.Interface, error) {
274 t.lock.Lock()
275 defer t.lock.Unlock()
276
277 fakewatcher := watch.NewRaceFreeFake()
278
279 if _, exists := t.watchers[gvr]; !exists {
280 t.watchers[gvr] = make(map[string][]*watch.RaceFreeFakeWatcher)
281 }
282 t.watchers[gvr][ns] = append(t.watchers[gvr][ns], fakewatcher)
283 return fakewatcher, nil
284 }
285
286 func (t *tracker) Get(gvr schema.GroupVersionResource, ns, name string) (runtime.Object, error) {
287 errNotFound := errors.NewNotFound(gvr.GroupResource(), name)
288
289 t.lock.RLock()
290 defer t.lock.RUnlock()
291
292 objs, ok := t.objects[gvr]
293 if !ok {
294 return nil, errNotFound
295 }
296
297 matchingObj, ok := objs[types.NamespacedName{Namespace: ns, Name: name}]
298 if !ok {
299 return nil, errNotFound
300 }
301
302
303
304
305 obj := matchingObj.DeepCopyObject()
306 if status, ok := obj.(*metav1.Status); ok {
307 if status.Status != metav1.StatusSuccess {
308 return nil, &errors.StatusError{ErrStatus: *status}
309 }
310 }
311
312 return obj, nil
313 }
314
315 func (t *tracker) Add(obj runtime.Object) error {
316 if meta.IsListType(obj) {
317 return t.addList(obj, false)
318 }
319 objMeta, err := meta.Accessor(obj)
320 if err != nil {
321 return err
322 }
323 gvks, _, err := t.scheme.ObjectKinds(obj)
324 if err != nil {
325 return err
326 }
327
328 if partial, ok := obj.(*metav1.PartialObjectMetadata); ok && len(partial.TypeMeta.APIVersion) > 0 {
329 gvks = []schema.GroupVersionKind{partial.TypeMeta.GroupVersionKind()}
330 }
331
332 if len(gvks) == 0 {
333 return fmt.Errorf("no registered kinds for %v", obj)
334 }
335 for _, gvk := range gvks {
336
337
338
339
340
341 gvr, _ := meta.UnsafeGuessKindToResource(gvk)
342
343 if gvr.Version == runtime.APIVersionInternal {
344 gvr.Version = ""
345 }
346
347 err := t.add(gvr, obj, objMeta.GetNamespace(), false)
348 if err != nil {
349 return err
350 }
351 }
352 return nil
353 }
354
355 func (t *tracker) Create(gvr schema.GroupVersionResource, obj runtime.Object, ns string) error {
356 return t.add(gvr, obj, ns, false)
357 }
358
359 func (t *tracker) Update(gvr schema.GroupVersionResource, obj runtime.Object, ns string) error {
360 return t.add(gvr, obj, ns, true)
361 }
362
363 func (t *tracker) getWatches(gvr schema.GroupVersionResource, ns string) []*watch.RaceFreeFakeWatcher {
364 watches := []*watch.RaceFreeFakeWatcher{}
365 if t.watchers[gvr] != nil {
366 if w := t.watchers[gvr][ns]; w != nil {
367 watches = append(watches, w...)
368 }
369 if ns != metav1.NamespaceAll {
370 if w := t.watchers[gvr][metav1.NamespaceAll]; w != nil {
371 watches = append(watches, w...)
372 }
373 }
374 }
375 return watches
376 }
377
378 func (t *tracker) add(gvr schema.GroupVersionResource, obj runtime.Object, ns string, replaceExisting bool) error {
379 t.lock.Lock()
380 defer t.lock.Unlock()
381
382 gr := gvr.GroupResource()
383
384
385
386
387 obj = obj.DeepCopyObject()
388
389 newMeta, err := meta.Accessor(obj)
390 if err != nil {
391 return err
392 }
393
394
395 if len(newMeta.GetNamespace()) == 0 {
396 newMeta.SetNamespace(ns)
397 }
398
399 if ns != newMeta.GetNamespace() {
400 msg := fmt.Sprintf("request namespace does not match object namespace, request: %q object: %q", ns, newMeta.GetNamespace())
401 return errors.NewBadRequest(msg)
402 }
403
404 _, ok := t.objects[gvr]
405 if !ok {
406 t.objects[gvr] = make(map[types.NamespacedName]runtime.Object)
407 }
408
409 namespacedName := types.NamespacedName{Namespace: newMeta.GetNamespace(), Name: newMeta.GetName()}
410 if _, ok = t.objects[gvr][namespacedName]; ok {
411 if replaceExisting {
412 for _, w := range t.getWatches(gvr, ns) {
413
414 w.Modify(obj.DeepCopyObject())
415 }
416 t.objects[gvr][namespacedName] = obj
417 return nil
418 }
419 return errors.NewAlreadyExists(gr, newMeta.GetName())
420 }
421
422 if replaceExisting {
423
424 return errors.NewNotFound(gr, newMeta.GetName())
425 }
426
427 t.objects[gvr][namespacedName] = obj
428
429 for _, w := range t.getWatches(gvr, ns) {
430
431 w.Add(obj.DeepCopyObject())
432 }
433
434 return nil
435 }
436
437 func (t *tracker) addList(obj runtime.Object, replaceExisting bool) error {
438 list, err := meta.ExtractList(obj)
439 if err != nil {
440 return err
441 }
442 errs := runtime.DecodeList(list, t.decoder)
443 if len(errs) > 0 {
444 return errs[0]
445 }
446 for _, obj := range list {
447 if err := t.Add(obj); err != nil {
448 return err
449 }
450 }
451 return nil
452 }
453
454 func (t *tracker) Delete(gvr schema.GroupVersionResource, ns, name string) error {
455 t.lock.Lock()
456 defer t.lock.Unlock()
457
458 objs, ok := t.objects[gvr]
459 if !ok {
460 return errors.NewNotFound(gvr.GroupResource(), name)
461 }
462
463 namespacedName := types.NamespacedName{Namespace: ns, Name: name}
464 obj, ok := objs[namespacedName]
465 if !ok {
466 return errors.NewNotFound(gvr.GroupResource(), name)
467 }
468
469 delete(objs, namespacedName)
470 for _, w := range t.getWatches(gvr, ns) {
471 w.Delete(obj.DeepCopyObject())
472 }
473 return nil
474 }
475
476
477
478
479 func filterByNamespace(objs map[types.NamespacedName]runtime.Object, ns string) ([]runtime.Object, error) {
480 var res []runtime.Object
481
482 for _, obj := range objs {
483 acc, err := meta.Accessor(obj)
484 if err != nil {
485 return nil, err
486 }
487 if ns != "" && acc.GetNamespace() != ns {
488 continue
489 }
490 res = append(res, obj)
491 }
492
493
494 sort.Slice(res, func(i, j int) bool {
495 acc1, _ := meta.Accessor(res[i])
496 acc2, _ := meta.Accessor(res[j])
497 if acc1.GetNamespace() != acc2.GetNamespace() {
498 return acc1.GetNamespace() < acc2.GetNamespace()
499 }
500 return acc1.GetName() < acc2.GetName()
501 })
502 return res, nil
503 }
504
505 func DefaultWatchReactor(watchInterface watch.Interface, err error) WatchReactionFunc {
506 return func(action Action) (bool, watch.Interface, error) {
507 return true, watchInterface, err
508 }
509 }
510
511
512
513 type SimpleReactor struct {
514 Verb string
515 Resource string
516
517 Reaction ReactionFunc
518 }
519
520 func (r *SimpleReactor) Handles(action Action) bool {
521 verbCovers := r.Verb == "*" || r.Verb == action.GetVerb()
522 if !verbCovers {
523 return false
524 }
525
526 return resourceCovers(r.Resource, action)
527 }
528
529 func (r *SimpleReactor) React(action Action) (bool, runtime.Object, error) {
530 return r.Reaction(action)
531 }
532
533
534
535 type SimpleWatchReactor struct {
536 Resource string
537
538 Reaction WatchReactionFunc
539 }
540
541 func (r *SimpleWatchReactor) Handles(action Action) bool {
542 return resourceCovers(r.Resource, action)
543 }
544
545 func (r *SimpleWatchReactor) React(action Action) (bool, watch.Interface, error) {
546 return r.Reaction(action)
547 }
548
549
550
551 type SimpleProxyReactor struct {
552 Resource string
553
554 Reaction ProxyReactionFunc
555 }
556
557 func (r *SimpleProxyReactor) Handles(action Action) bool {
558 return resourceCovers(r.Resource, action)
559 }
560
561 func (r *SimpleProxyReactor) React(action Action) (bool, restclient.ResponseWrapper, error) {
562 return r.Reaction(action)
563 }
564
565 func resourceCovers(resource string, action Action) bool {
566 if resource == "*" {
567 return true
568 }
569
570 if resource == action.GetResource().Resource {
571 return true
572 }
573
574 if index := strings.Index(resource, "/"); index != -1 &&
575 resource[:index] == action.GetResource().Resource &&
576 resource[index+1:] == action.GetSubresource() {
577 return true
578 }
579
580 return false
581 }
582
View as plain text