...

Source file src/k8s.io/client-go/testing/fixture.go

Documentation: k8s.io/client-go/testing

     1  /*
     2  Copyright 2015 The Kubernetes Authors.
     3  
     4  Licensed under the Apache License, Version 2.0 (the "License");
     5  you may not use this file except in compliance with the License.
     6  You may obtain a copy of the License at
     7  
     8      http://www.apache.org/licenses/LICENSE-2.0
     9  
    10  Unless required by applicable law or agreed to in writing, software
    11  distributed under the License is distributed on an "AS IS" BASIS,
    12  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    13  See the License for the specific language governing permissions and
    14  limitations under the License.
    15  */
    16  
    17  package testing
    18  
    19  import (
    20  	"fmt"
    21  	"reflect"
    22  	"sort"
    23  	"strings"
    24  	"sync"
    25  
    26  	jsonpatch "github.com/evanphx/json-patch"
    27  
    28  	"k8s.io/apimachinery/pkg/api/errors"
    29  	"k8s.io/apimachinery/pkg/api/meta"
    30  	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
    31  	"k8s.io/apimachinery/pkg/runtime"
    32  	"k8s.io/apimachinery/pkg/runtime/schema"
    33  	"k8s.io/apimachinery/pkg/types"
    34  	"k8s.io/apimachinery/pkg/util/json"
    35  	"k8s.io/apimachinery/pkg/util/strategicpatch"
    36  	"k8s.io/apimachinery/pkg/watch"
    37  	restclient "k8s.io/client-go/rest"
    38  )
    39  
    40  // ObjectTracker keeps track of objects. It is intended to be used to
    41  // fake calls to a server by returning objects based on their kind,
    42  // namespace and name.
    43  type ObjectTracker interface {
    44  	// Add adds an object to the tracker. If object being added
    45  	// is a list, its items are added separately.
    46  	Add(obj runtime.Object) error
    47  
    48  	// Get retrieves the object by its kind, namespace and name.
    49  	Get(gvr schema.GroupVersionResource, ns, name string) (runtime.Object, error)
    50  
    51  	// Create adds an object to the tracker in the specified namespace.
    52  	Create(gvr schema.GroupVersionResource, obj runtime.Object, ns string) error
    53  
    54  	// Update updates an existing object in the tracker in the specified namespace.
    55  	Update(gvr schema.GroupVersionResource, obj runtime.Object, ns string) error
    56  
    57  	// List retrieves all objects of a given kind in the given
    58  	// namespace. Only non-List kinds are accepted.
    59  	List(gvr schema.GroupVersionResource, gvk schema.GroupVersionKind, ns string) (runtime.Object, error)
    60  
    61  	// Delete deletes an existing object from the tracker. If object
    62  	// didn't exist in the tracker prior to deletion, Delete returns
    63  	// no error.
    64  	Delete(gvr schema.GroupVersionResource, ns, name string) error
    65  
    66  	// Watch watches objects from the tracker. Watch returns a channel
    67  	// which will push added / modified / deleted object.
    68  	Watch(gvr schema.GroupVersionResource, ns string) (watch.Interface, error)
    69  }
    70  
    71  // ObjectScheme abstracts the implementation of common operations on objects.
    72  type ObjectScheme interface {
    73  	runtime.ObjectCreater
    74  	runtime.ObjectTyper
    75  }
    76  
    77  // ObjectReaction returns a ReactionFunc that applies core.Action to
    78  // the given tracker.
    79  func ObjectReaction(tracker ObjectTracker) ReactionFunc {
    80  	return func(action Action) (bool, runtime.Object, error) {
    81  		ns := action.GetNamespace()
    82  		gvr := action.GetResource()
    83  		// Here and below we need to switch on implementation types,
    84  		// not on interfaces, as some interfaces are identical
    85  		// (e.g. UpdateAction and CreateAction), so if we use them,
    86  		// updates and creates end up matching the same case branch.
    87  		switch action := action.(type) {
    88  
    89  		case ListActionImpl:
    90  			obj, err := tracker.List(gvr, action.GetKind(), ns)
    91  			return true, obj, err
    92  
    93  		case GetActionImpl:
    94  			obj, err := tracker.Get(gvr, ns, action.GetName())
    95  			return true, obj, err
    96  
    97  		case CreateActionImpl:
    98  			objMeta, err := meta.Accessor(action.GetObject())
    99  			if err != nil {
   100  				return true, nil, err
   101  			}
   102  			if action.GetSubresource() == "" {
   103  				err = tracker.Create(gvr, action.GetObject(), ns)
   104  			} else {
   105  				oldObj, getOldObjErr := tracker.Get(gvr, ns, objMeta.GetName())
   106  				if getOldObjErr != nil {
   107  					return true, nil, getOldObjErr
   108  				}
   109  				// Check whether the existing historical object type is the same as the current operation object type that needs to be updated, and if it is the same, perform the update operation.
   110  				if reflect.TypeOf(oldObj) == reflect.TypeOf(action.GetObject()) {
   111  					// TODO: Currently we're handling subresource creation as an update
   112  					// on the enclosing resource. This works for some subresources but
   113  					// might not be generic enough.
   114  					err = tracker.Update(gvr, action.GetObject(), ns)
   115  				} else {
   116  					// If the historical object type is different from the current object type, need to make sure we return the object submitted,don't persist the submitted object in the tracker.
   117  					return true, action.GetObject(), nil
   118  				}
   119  			}
   120  			if err != nil {
   121  				return true, nil, err
   122  			}
   123  			obj, err := tracker.Get(gvr, ns, objMeta.GetName())
   124  			return true, obj, err
   125  
   126  		case UpdateActionImpl:
   127  			objMeta, err := meta.Accessor(action.GetObject())
   128  			if err != nil {
   129  				return true, nil, err
   130  			}
   131  			err = tracker.Update(gvr, action.GetObject(), ns)
   132  			if err != nil {
   133  				return true, nil, err
   134  			}
   135  			obj, err := tracker.Get(gvr, ns, objMeta.GetName())
   136  			return true, obj, err
   137  
   138  		case DeleteActionImpl:
   139  			err := tracker.Delete(gvr, ns, action.GetName())
   140  			if err != nil {
   141  				return true, nil, err
   142  			}
   143  			return true, nil, nil
   144  
   145  		case PatchActionImpl:
   146  			obj, err := tracker.Get(gvr, ns, action.GetName())
   147  			if err != nil {
   148  				return true, nil, err
   149  			}
   150  
   151  			old, err := json.Marshal(obj)
   152  			if err != nil {
   153  				return true, nil, err
   154  			}
   155  
   156  			// reset the object in preparation to unmarshal, since unmarshal does not guarantee that fields
   157  			// in obj that are removed by patch are cleared
   158  			value := reflect.ValueOf(obj)
   159  			value.Elem().Set(reflect.New(value.Type().Elem()).Elem())
   160  
   161  			switch action.GetPatchType() {
   162  			case types.JSONPatchType:
   163  				patch, err := jsonpatch.DecodePatch(action.GetPatch())
   164  				if err != nil {
   165  					return true, nil, err
   166  				}
   167  				modified, err := patch.Apply(old)
   168  				if err != nil {
   169  					return true, nil, err
   170  				}
   171  
   172  				if err = json.Unmarshal(modified, obj); err != nil {
   173  					return true, nil, err
   174  				}
   175  			case types.MergePatchType:
   176  				modified, err := jsonpatch.MergePatch(old, action.GetPatch())
   177  				if err != nil {
   178  					return true, nil, err
   179  				}
   180  
   181  				if err := json.Unmarshal(modified, obj); err != nil {
   182  					return true, nil, err
   183  				}
   184  			case types.StrategicMergePatchType, types.ApplyPatchType:
   185  				mergedByte, err := strategicpatch.StrategicMergePatch(old, action.GetPatch(), obj)
   186  				if err != nil {
   187  					return true, nil, err
   188  				}
   189  				if err = json.Unmarshal(mergedByte, obj); err != nil {
   190  					return true, nil, err
   191  				}
   192  			default:
   193  				return true, nil, fmt.Errorf("PatchType is not supported")
   194  			}
   195  
   196  			if err = tracker.Update(gvr, obj, ns); err != nil {
   197  				return true, nil, err
   198  			}
   199  
   200  			return true, obj, nil
   201  
   202  		default:
   203  			return false, nil, fmt.Errorf("no reaction implemented for %s", action)
   204  		}
   205  	}
   206  }
   207  
   208  type tracker struct {
   209  	scheme  ObjectScheme
   210  	decoder runtime.Decoder
   211  	lock    sync.RWMutex
   212  	objects map[schema.GroupVersionResource]map[types.NamespacedName]runtime.Object
   213  	// The value type of watchers is a map of which the key is either a namespace or
   214  	// all/non namespace aka "" and its value is list of fake watchers.
   215  	// Manipulations on resources will broadcast the notification events into the
   216  	// watchers' channel. Note that too many unhandled events (currently 100,
   217  	// see apimachinery/pkg/watch.DefaultChanSize) will cause a panic.
   218  	watchers map[schema.GroupVersionResource]map[string][]*watch.RaceFreeFakeWatcher
   219  }
   220  
   221  var _ ObjectTracker = &tracker{}
   222  
   223  // NewObjectTracker returns an ObjectTracker that can be used to keep track
   224  // of objects for the fake clientset. Mostly useful for unit tests.
   225  func NewObjectTracker(scheme ObjectScheme, decoder runtime.Decoder) ObjectTracker {
   226  	return &tracker{
   227  		scheme:   scheme,
   228  		decoder:  decoder,
   229  		objects:  make(map[schema.GroupVersionResource]map[types.NamespacedName]runtime.Object),
   230  		watchers: make(map[schema.GroupVersionResource]map[string][]*watch.RaceFreeFakeWatcher),
   231  	}
   232  }
   233  
   234  func (t *tracker) List(gvr schema.GroupVersionResource, gvk schema.GroupVersionKind, ns string) (runtime.Object, error) {
   235  	// Heuristic for list kind: original kind + List suffix. Might
   236  	// not always be true but this tracker has a pretty limited
   237  	// understanding of the actual API model.
   238  	listGVK := gvk
   239  	listGVK.Kind = listGVK.Kind + "List"
   240  	// GVK does have the concept of "internal version". The scheme recognizes
   241  	// the runtime.APIVersionInternal, but not the empty string.
   242  	if listGVK.Version == "" {
   243  		listGVK.Version = runtime.APIVersionInternal
   244  	}
   245  
   246  	list, err := t.scheme.New(listGVK)
   247  	if err != nil {
   248  		return nil, err
   249  	}
   250  
   251  	if !meta.IsListType(list) {
   252  		return nil, fmt.Errorf("%q is not a list type", listGVK.Kind)
   253  	}
   254  
   255  	t.lock.RLock()
   256  	defer t.lock.RUnlock()
   257  
   258  	objs, ok := t.objects[gvr]
   259  	if !ok {
   260  		return list, nil
   261  	}
   262  
   263  	matchingObjs, err := filterByNamespace(objs, ns)
   264  	if err != nil {
   265  		return nil, err
   266  	}
   267  	if err := meta.SetList(list, matchingObjs); err != nil {
   268  		return nil, err
   269  	}
   270  	return list.DeepCopyObject(), nil
   271  }
   272  
   273  func (t *tracker) Watch(gvr schema.GroupVersionResource, ns string) (watch.Interface, error) {
   274  	t.lock.Lock()
   275  	defer t.lock.Unlock()
   276  
   277  	fakewatcher := watch.NewRaceFreeFake()
   278  
   279  	if _, exists := t.watchers[gvr]; !exists {
   280  		t.watchers[gvr] = make(map[string][]*watch.RaceFreeFakeWatcher)
   281  	}
   282  	t.watchers[gvr][ns] = append(t.watchers[gvr][ns], fakewatcher)
   283  	return fakewatcher, nil
   284  }
   285  
   286  func (t *tracker) Get(gvr schema.GroupVersionResource, ns, name string) (runtime.Object, error) {
   287  	errNotFound := errors.NewNotFound(gvr.GroupResource(), name)
   288  
   289  	t.lock.RLock()
   290  	defer t.lock.RUnlock()
   291  
   292  	objs, ok := t.objects[gvr]
   293  	if !ok {
   294  		return nil, errNotFound
   295  	}
   296  
   297  	matchingObj, ok := objs[types.NamespacedName{Namespace: ns, Name: name}]
   298  	if !ok {
   299  		return nil, errNotFound
   300  	}
   301  
   302  	// Only one object should match in the tracker if it works
   303  	// correctly, as Add/Update methods enforce kind/namespace/name
   304  	// uniqueness.
   305  	obj := matchingObj.DeepCopyObject()
   306  	if status, ok := obj.(*metav1.Status); ok {
   307  		if status.Status != metav1.StatusSuccess {
   308  			return nil, &errors.StatusError{ErrStatus: *status}
   309  		}
   310  	}
   311  
   312  	return obj, nil
   313  }
   314  
   315  func (t *tracker) Add(obj runtime.Object) error {
   316  	if meta.IsListType(obj) {
   317  		return t.addList(obj, false)
   318  	}
   319  	objMeta, err := meta.Accessor(obj)
   320  	if err != nil {
   321  		return err
   322  	}
   323  	gvks, _, err := t.scheme.ObjectKinds(obj)
   324  	if err != nil {
   325  		return err
   326  	}
   327  
   328  	if partial, ok := obj.(*metav1.PartialObjectMetadata); ok && len(partial.TypeMeta.APIVersion) > 0 {
   329  		gvks = []schema.GroupVersionKind{partial.TypeMeta.GroupVersionKind()}
   330  	}
   331  
   332  	if len(gvks) == 0 {
   333  		return fmt.Errorf("no registered kinds for %v", obj)
   334  	}
   335  	for _, gvk := range gvks {
   336  		// NOTE: UnsafeGuessKindToResource is a heuristic and default match. The
   337  		// actual registration in apiserver can specify arbitrary route for a
   338  		// gvk. If a test uses such objects, it cannot preset the tracker with
   339  		// objects via Add(). Instead, it should trigger the Create() function
   340  		// of the tracker, where an arbitrary gvr can be specified.
   341  		gvr, _ := meta.UnsafeGuessKindToResource(gvk)
   342  		// Resource doesn't have the concept of "__internal" version, just set it to "".
   343  		if gvr.Version == runtime.APIVersionInternal {
   344  			gvr.Version = ""
   345  		}
   346  
   347  		err := t.add(gvr, obj, objMeta.GetNamespace(), false)
   348  		if err != nil {
   349  			return err
   350  		}
   351  	}
   352  	return nil
   353  }
   354  
   355  func (t *tracker) Create(gvr schema.GroupVersionResource, obj runtime.Object, ns string) error {
   356  	return t.add(gvr, obj, ns, false)
   357  }
   358  
   359  func (t *tracker) Update(gvr schema.GroupVersionResource, obj runtime.Object, ns string) error {
   360  	return t.add(gvr, obj, ns, true)
   361  }
   362  
   363  func (t *tracker) getWatches(gvr schema.GroupVersionResource, ns string) []*watch.RaceFreeFakeWatcher {
   364  	watches := []*watch.RaceFreeFakeWatcher{}
   365  	if t.watchers[gvr] != nil {
   366  		if w := t.watchers[gvr][ns]; w != nil {
   367  			watches = append(watches, w...)
   368  		}
   369  		if ns != metav1.NamespaceAll {
   370  			if w := t.watchers[gvr][metav1.NamespaceAll]; w != nil {
   371  				watches = append(watches, w...)
   372  			}
   373  		}
   374  	}
   375  	return watches
   376  }
   377  
   378  func (t *tracker) add(gvr schema.GroupVersionResource, obj runtime.Object, ns string, replaceExisting bool) error {
   379  	t.lock.Lock()
   380  	defer t.lock.Unlock()
   381  
   382  	gr := gvr.GroupResource()
   383  
   384  	// To avoid the object from being accidentally modified by caller
   385  	// after it's been added to the tracker, we always store the deep
   386  	// copy.
   387  	obj = obj.DeepCopyObject()
   388  
   389  	newMeta, err := meta.Accessor(obj)
   390  	if err != nil {
   391  		return err
   392  	}
   393  
   394  	// Propagate namespace to the new object if hasn't already been set.
   395  	if len(newMeta.GetNamespace()) == 0 {
   396  		newMeta.SetNamespace(ns)
   397  	}
   398  
   399  	if ns != newMeta.GetNamespace() {
   400  		msg := fmt.Sprintf("request namespace does not match object namespace, request: %q object: %q", ns, newMeta.GetNamespace())
   401  		return errors.NewBadRequest(msg)
   402  	}
   403  
   404  	_, ok := t.objects[gvr]
   405  	if !ok {
   406  		t.objects[gvr] = make(map[types.NamespacedName]runtime.Object)
   407  	}
   408  
   409  	namespacedName := types.NamespacedName{Namespace: newMeta.GetNamespace(), Name: newMeta.GetName()}
   410  	if _, ok = t.objects[gvr][namespacedName]; ok {
   411  		if replaceExisting {
   412  			for _, w := range t.getWatches(gvr, ns) {
   413  				// To avoid the object from being accidentally modified by watcher
   414  				w.Modify(obj.DeepCopyObject())
   415  			}
   416  			t.objects[gvr][namespacedName] = obj
   417  			return nil
   418  		}
   419  		return errors.NewAlreadyExists(gr, newMeta.GetName())
   420  	}
   421  
   422  	if replaceExisting {
   423  		// Tried to update but no matching object was found.
   424  		return errors.NewNotFound(gr, newMeta.GetName())
   425  	}
   426  
   427  	t.objects[gvr][namespacedName] = obj
   428  
   429  	for _, w := range t.getWatches(gvr, ns) {
   430  		// To avoid the object from being accidentally modified by watcher
   431  		w.Add(obj.DeepCopyObject())
   432  	}
   433  
   434  	return nil
   435  }
   436  
   437  func (t *tracker) addList(obj runtime.Object, replaceExisting bool) error {
   438  	list, err := meta.ExtractList(obj)
   439  	if err != nil {
   440  		return err
   441  	}
   442  	errs := runtime.DecodeList(list, t.decoder)
   443  	if len(errs) > 0 {
   444  		return errs[0]
   445  	}
   446  	for _, obj := range list {
   447  		if err := t.Add(obj); err != nil {
   448  			return err
   449  		}
   450  	}
   451  	return nil
   452  }
   453  
   454  func (t *tracker) Delete(gvr schema.GroupVersionResource, ns, name string) error {
   455  	t.lock.Lock()
   456  	defer t.lock.Unlock()
   457  
   458  	objs, ok := t.objects[gvr]
   459  	if !ok {
   460  		return errors.NewNotFound(gvr.GroupResource(), name)
   461  	}
   462  
   463  	namespacedName := types.NamespacedName{Namespace: ns, Name: name}
   464  	obj, ok := objs[namespacedName]
   465  	if !ok {
   466  		return errors.NewNotFound(gvr.GroupResource(), name)
   467  	}
   468  
   469  	delete(objs, namespacedName)
   470  	for _, w := range t.getWatches(gvr, ns) {
   471  		w.Delete(obj.DeepCopyObject())
   472  	}
   473  	return nil
   474  }
   475  
   476  // filterByNamespace returns all objects in the collection that
   477  // match provided namespace. Empty namespace matches
   478  // non-namespaced objects.
   479  func filterByNamespace(objs map[types.NamespacedName]runtime.Object, ns string) ([]runtime.Object, error) {
   480  	var res []runtime.Object
   481  
   482  	for _, obj := range objs {
   483  		acc, err := meta.Accessor(obj)
   484  		if err != nil {
   485  			return nil, err
   486  		}
   487  		if ns != "" && acc.GetNamespace() != ns {
   488  			continue
   489  		}
   490  		res = append(res, obj)
   491  	}
   492  
   493  	// Sort res to get deterministic order.
   494  	sort.Slice(res, func(i, j int) bool {
   495  		acc1, _ := meta.Accessor(res[i])
   496  		acc2, _ := meta.Accessor(res[j])
   497  		if acc1.GetNamespace() != acc2.GetNamespace() {
   498  			return acc1.GetNamespace() < acc2.GetNamespace()
   499  		}
   500  		return acc1.GetName() < acc2.GetName()
   501  	})
   502  	return res, nil
   503  }
   504  
   505  func DefaultWatchReactor(watchInterface watch.Interface, err error) WatchReactionFunc {
   506  	return func(action Action) (bool, watch.Interface, error) {
   507  		return true, watchInterface, err
   508  	}
   509  }
   510  
   511  // SimpleReactor is a Reactor.  Each reaction function is attached to a given verb,resource tuple.  "*" in either field matches everything for that value.
   512  // For instance, *,pods matches all verbs on pods.  This allows for easier composition of reaction functions
   513  type SimpleReactor struct {
   514  	Verb     string
   515  	Resource string
   516  
   517  	Reaction ReactionFunc
   518  }
   519  
   520  func (r *SimpleReactor) Handles(action Action) bool {
   521  	verbCovers := r.Verb == "*" || r.Verb == action.GetVerb()
   522  	if !verbCovers {
   523  		return false
   524  	}
   525  
   526  	return resourceCovers(r.Resource, action)
   527  }
   528  
   529  func (r *SimpleReactor) React(action Action) (bool, runtime.Object, error) {
   530  	return r.Reaction(action)
   531  }
   532  
   533  // SimpleWatchReactor is a WatchReactor.  Each reaction function is attached to a given resource.  "*" matches everything for that value.
   534  // For instance, *,pods matches all verbs on pods.  This allows for easier composition of reaction functions
   535  type SimpleWatchReactor struct {
   536  	Resource string
   537  
   538  	Reaction WatchReactionFunc
   539  }
   540  
   541  func (r *SimpleWatchReactor) Handles(action Action) bool {
   542  	return resourceCovers(r.Resource, action)
   543  }
   544  
   545  func (r *SimpleWatchReactor) React(action Action) (bool, watch.Interface, error) {
   546  	return r.Reaction(action)
   547  }
   548  
   549  // SimpleProxyReactor is a ProxyReactor.  Each reaction function is attached to a given resource.  "*" matches everything for that value.
   550  // For instance, *,pods matches all verbs on pods.  This allows for easier composition of reaction functions.
   551  type SimpleProxyReactor struct {
   552  	Resource string
   553  
   554  	Reaction ProxyReactionFunc
   555  }
   556  
   557  func (r *SimpleProxyReactor) Handles(action Action) bool {
   558  	return resourceCovers(r.Resource, action)
   559  }
   560  
   561  func (r *SimpleProxyReactor) React(action Action) (bool, restclient.ResponseWrapper, error) {
   562  	return r.Reaction(action)
   563  }
   564  
   565  func resourceCovers(resource string, action Action) bool {
   566  	if resource == "*" {
   567  		return true
   568  	}
   569  
   570  	if resource == action.GetResource().Resource {
   571  		return true
   572  	}
   573  
   574  	if index := strings.Index(resource, "/"); index != -1 &&
   575  		resource[:index] == action.GetResource().Resource &&
   576  		resource[index+1:] == action.GetSubresource() {
   577  		return true
   578  	}
   579  
   580  	return false
   581  }
   582  

View as plain text