...

Source file src/k8s.io/client-go/tools/cache/controller_test.go

Documentation: k8s.io/client-go/tools/cache

     1  /*
     2  Copyright 2015 The Kubernetes Authors.
     3  
     4  Licensed under the Apache License, Version 2.0 (the "License");
     5  you may not use this file except in compliance with the License.
     6  You may obtain a copy of the License at
     7  
     8      http://www.apache.org/licenses/LICENSE-2.0
     9  
    10  Unless required by applicable law or agreed to in writing, software
    11  distributed under the License is distributed on an "AS IS" BASIS,
    12  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    13  See the License for the specific language governing permissions and
    14  limitations under the License.
    15  */
    16  
    17  package cache
    18  
    19  import (
    20  	"fmt"
    21  	"math/rand"
    22  	"sync"
    23  	"testing"
    24  	"time"
    25  
    26  	v1 "k8s.io/api/core/v1"
    27  	apiequality "k8s.io/apimachinery/pkg/api/equality"
    28  	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
    29  	"k8s.io/apimachinery/pkg/runtime"
    30  	"k8s.io/apimachinery/pkg/util/sets"
    31  	"k8s.io/apimachinery/pkg/util/wait"
    32  	"k8s.io/apimachinery/pkg/watch"
    33  	fcache "k8s.io/client-go/tools/cache/testing"
    34  
    35  	fuzz "github.com/google/gofuzz"
    36  )
    37  
    38  func Example() {
    39  	// source simulates an apiserver object endpoint.
    40  	source := fcache.NewFakeControllerSource()
    41  
    42  	// This will hold the downstream state, as we know it.
    43  	downstream := NewStore(DeletionHandlingMetaNamespaceKeyFunc)
    44  
    45  	// This will hold incoming changes. Note how we pass downstream in as a
    46  	// KeyLister, that way resync operations will result in the correct set
    47  	// of update/delete deltas.
    48  	fifo := NewDeltaFIFOWithOptions(DeltaFIFOOptions{
    49  		KeyFunction:  MetaNamespaceKeyFunc,
    50  		KnownObjects: downstream,
    51  	})
    52  
    53  	// Let's do threadsafe output to get predictable test results.
    54  	deletionCounter := make(chan string, 1000)
    55  
    56  	cfg := &Config{
    57  		Queue:            fifo,
    58  		ListerWatcher:    source,
    59  		ObjectType:       &v1.Pod{},
    60  		FullResyncPeriod: time.Millisecond * 100,
    61  		RetryOnError:     false,
    62  
    63  		// Let's implement a simple controller that just deletes
    64  		// everything that comes in.
    65  		Process: func(obj interface{}, isInInitialList bool) error {
    66  			// Obj is from the Pop method of the Queue we make above.
    67  			newest := obj.(Deltas).Newest()
    68  
    69  			if newest.Type != Deleted {
    70  				// Update our downstream store.
    71  				err := downstream.Add(newest.Object)
    72  				if err != nil {
    73  					return err
    74  				}
    75  
    76  				// Delete this object.
    77  				source.Delete(newest.Object.(runtime.Object))
    78  			} else {
    79  				// Update our downstream store.
    80  				err := downstream.Delete(newest.Object)
    81  				if err != nil {
    82  					return err
    83  				}
    84  
    85  				// fifo's KeyOf is easiest, because it handles
    86  				// DeletedFinalStateUnknown markers.
    87  				key, err := fifo.KeyOf(newest.Object)
    88  				if err != nil {
    89  					return err
    90  				}
    91  
    92  				// Report this deletion.
    93  				deletionCounter <- key
    94  			}
    95  			return nil
    96  		},
    97  	}
    98  
    99  	// Create the controller and run it until we close stop.
   100  	stop := make(chan struct{})
   101  	defer close(stop)
   102  	go New(cfg).Run(stop)
   103  
   104  	// Let's add a few objects to the source.
   105  	testIDs := []string{"a-hello", "b-controller", "c-framework"}
   106  	for _, name := range testIDs {
   107  		// Note that these pods are not valid-- the fake source doesn't
   108  		// call validation or anything.
   109  		source.Add(&v1.Pod{ObjectMeta: metav1.ObjectMeta{Name: name}})
   110  	}
   111  
   112  	// Let's wait for the controller to process the things we just added.
   113  	outputSet := sets.String{}
   114  	for i := 0; i < len(testIDs); i++ {
   115  		outputSet.Insert(<-deletionCounter)
   116  	}
   117  
   118  	for _, key := range outputSet.List() {
   119  		fmt.Println(key)
   120  	}
   121  	// Output:
   122  	// a-hello
   123  	// b-controller
   124  	// c-framework
   125  }
   126  
   127  func ExampleNewInformer() {
   128  	// source simulates an apiserver object endpoint.
   129  	source := fcache.NewFakeControllerSource()
   130  
   131  	// Let's do threadsafe output to get predictable test results.
   132  	deletionCounter := make(chan string, 1000)
   133  
   134  	// Make a controller that immediately deletes anything added to it, and
   135  	// logs anything deleted.
   136  	_, controller := NewInformer(
   137  		source,
   138  		&v1.Pod{},
   139  		time.Millisecond*100,
   140  		ResourceEventHandlerDetailedFuncs{
   141  			AddFunc: func(obj interface{}, isInInitialList bool) {
   142  				source.Delete(obj.(runtime.Object))
   143  			},
   144  			DeleteFunc: func(obj interface{}) {
   145  				key, err := DeletionHandlingMetaNamespaceKeyFunc(obj)
   146  				if err != nil {
   147  					key = "oops something went wrong with the key"
   148  				}
   149  
   150  				// Report this deletion.
   151  				deletionCounter <- key
   152  			},
   153  		},
   154  	)
   155  
   156  	// Run the controller and run it until we close stop.
   157  	stop := make(chan struct{})
   158  	defer close(stop)
   159  	go controller.Run(stop)
   160  
   161  	// Let's add a few objects to the source.
   162  	testIDs := []string{"a-hello", "b-controller", "c-framework"}
   163  	for _, name := range testIDs {
   164  		// Note that these pods are not valid-- the fake source doesn't
   165  		// call validation or anything.
   166  		source.Add(&v1.Pod{ObjectMeta: metav1.ObjectMeta{Name: name}})
   167  	}
   168  
   169  	// Let's wait for the controller to process the things we just added.
   170  	outputSet := sets.String{}
   171  	for i := 0; i < len(testIDs); i++ {
   172  		outputSet.Insert(<-deletionCounter)
   173  	}
   174  
   175  	for _, key := range outputSet.List() {
   176  		fmt.Println(key)
   177  	}
   178  	// Output:
   179  	// a-hello
   180  	// b-controller
   181  	// c-framework
   182  }
   183  
   184  func TestHammerController(t *testing.T) {
   185  	// This test executes a bunch of requests through the fake source and
   186  	// controller framework to make sure there's no locking/threading
   187  	// errors. If an error happens, it should hang forever or trigger the
   188  	// race detector.
   189  
   190  	// source simulates an apiserver object endpoint.
   191  	source := fcache.NewFakeControllerSource()
   192  
   193  	// Let's do threadsafe output to get predictable test results.
   194  	outputSetLock := sync.Mutex{}
   195  	// map of key to operations done on the key
   196  	outputSet := map[string][]string{}
   197  
   198  	recordFunc := func(eventType string, obj interface{}) {
   199  		key, err := DeletionHandlingMetaNamespaceKeyFunc(obj)
   200  		if err != nil {
   201  			t.Errorf("something wrong with key: %v", err)
   202  			key = "oops something went wrong with the key"
   203  		}
   204  
   205  		// Record some output when items are deleted.
   206  		outputSetLock.Lock()
   207  		defer outputSetLock.Unlock()
   208  		outputSet[key] = append(outputSet[key], eventType)
   209  	}
   210  
   211  	// Make a controller which just logs all the changes it gets.
   212  	_, controller := NewInformer(
   213  		source,
   214  		&v1.Pod{},
   215  		time.Millisecond*100,
   216  		ResourceEventHandlerDetailedFuncs{
   217  			AddFunc:    func(obj interface{}, isInInitialList bool) { recordFunc("add", obj) },
   218  			UpdateFunc: func(oldObj, newObj interface{}) { recordFunc("update", newObj) },
   219  			DeleteFunc: func(obj interface{}) { recordFunc("delete", obj) },
   220  		},
   221  	)
   222  
   223  	if controller.HasSynced() {
   224  		t.Errorf("Expected HasSynced() to return false before we started the controller")
   225  	}
   226  
   227  	// Run the controller and run it until we close stop.
   228  	stop := make(chan struct{})
   229  	go controller.Run(stop)
   230  
   231  	// Let's wait for the controller to do its initial sync
   232  	wait.Poll(100*time.Millisecond, wait.ForeverTestTimeout, func() (bool, error) {
   233  		return controller.HasSynced(), nil
   234  	})
   235  	if !controller.HasSynced() {
   236  		t.Errorf("Expected HasSynced() to return true after the initial sync")
   237  	}
   238  
   239  	wg := sync.WaitGroup{}
   240  	const threads = 3
   241  	wg.Add(threads)
   242  	for i := 0; i < threads; i++ {
   243  		go func() {
   244  			defer wg.Done()
   245  			// Let's add a few objects to the source.
   246  			currentNames := sets.String{}
   247  			rs := rand.NewSource(rand.Int63())
   248  			f := fuzz.New().NilChance(.5).NumElements(0, 2).RandSource(rs)
   249  			for i := 0; i < 100; i++ {
   250  				var name string
   251  				var isNew bool
   252  				if currentNames.Len() == 0 || rand.Intn(3) == 1 {
   253  					f.Fuzz(&name)
   254  					isNew = true
   255  				} else {
   256  					l := currentNames.List()
   257  					name = l[rand.Intn(len(l))]
   258  				}
   259  
   260  				pod := &v1.Pod{}
   261  				f.Fuzz(pod)
   262  				pod.ObjectMeta.Name = name
   263  				pod.ObjectMeta.Namespace = "default"
   264  				// Add, update, or delete randomly.
   265  				// Note that these pods are not valid-- the fake source doesn't
   266  				// call validation or perform any other checking.
   267  				if isNew {
   268  					currentNames.Insert(name)
   269  					source.Add(pod)
   270  					continue
   271  				}
   272  				switch rand.Intn(2) {
   273  				case 0:
   274  					currentNames.Insert(name)
   275  					source.Modify(pod)
   276  				case 1:
   277  					currentNames.Delete(name)
   278  					source.Delete(pod)
   279  				}
   280  			}
   281  		}()
   282  	}
   283  	wg.Wait()
   284  
   285  	// Let's wait for the controller to finish processing the things we just added.
   286  	// TODO: look in the queue to see how many items need to be processed.
   287  	time.Sleep(100 * time.Millisecond)
   288  	close(stop)
   289  
   290  	// TODO: Verify that no goroutines were leaked here and that everything shut
   291  	// down cleanly.
   292  
   293  	outputSetLock.Lock()
   294  	t.Logf("got: %#v", outputSet)
   295  }
   296  
   297  func TestUpdate(t *testing.T) {
   298  	// This test is going to exercise the various paths that result in a
   299  	// call to update.
   300  
   301  	// source simulates an apiserver object endpoint.
   302  	source := fcache.NewFakeControllerSource()
   303  
   304  	const (
   305  		FROM = "from"
   306  		TO   = "to"
   307  	)
   308  
   309  	// These are the transitions we expect to see; because this is
   310  	// asynchronous, there are a lot of valid possibilities.
   311  	type pair struct{ from, to string }
   312  	allowedTransitions := map[pair]bool{
   313  		{FROM, TO}: true,
   314  
   315  		// Because a resync can happen when we've already observed one
   316  		// of the above but before the item is deleted.
   317  		{TO, TO}: true,
   318  		// Because a resync could happen before we observe an update.
   319  		{FROM, FROM}: true,
   320  	}
   321  
   322  	pod := func(name, check string, final bool) *v1.Pod {
   323  		p := &v1.Pod{
   324  			ObjectMeta: metav1.ObjectMeta{
   325  				Name:   name,
   326  				Labels: map[string]string{"check": check},
   327  			},
   328  		}
   329  		if final {
   330  			p.Labels["final"] = "true"
   331  		}
   332  		return p
   333  	}
   334  	deletePod := func(p *v1.Pod) bool {
   335  		return p.Labels["final"] == "true"
   336  	}
   337  
   338  	tests := []func(string){
   339  		func(name string) {
   340  			name = "a-" + name
   341  			source.Add(pod(name, FROM, false))
   342  			source.Modify(pod(name, TO, true))
   343  		},
   344  	}
   345  
   346  	const threads = 3
   347  
   348  	var testDoneWG sync.WaitGroup
   349  	testDoneWG.Add(threads * len(tests))
   350  
   351  	// Make a controller that deletes things once it observes an update.
   352  	// It calls Done() on the wait group on deletions so we can tell when
   353  	// everything we've added has been deleted.
   354  	watchCh := make(chan struct{})
   355  	_, controller := NewInformer(
   356  		&testLW{
   357  			WatchFunc: func(options metav1.ListOptions) (watch.Interface, error) {
   358  				watch, err := source.Watch(options)
   359  				close(watchCh)
   360  				return watch, err
   361  			},
   362  			ListFunc: func(options metav1.ListOptions) (runtime.Object, error) {
   363  				return source.List(options)
   364  			},
   365  		},
   366  		&v1.Pod{},
   367  		0,
   368  		ResourceEventHandlerFuncs{
   369  			UpdateFunc: func(oldObj, newObj interface{}) {
   370  				o, n := oldObj.(*v1.Pod), newObj.(*v1.Pod)
   371  				from, to := o.Labels["check"], n.Labels["check"]
   372  				if !allowedTransitions[pair{from, to}] {
   373  					t.Errorf("observed transition %q -> %q for %v", from, to, n.Name)
   374  				}
   375  				if deletePod(n) {
   376  					source.Delete(n)
   377  				}
   378  			},
   379  			DeleteFunc: func(obj interface{}) {
   380  				testDoneWG.Done()
   381  			},
   382  		},
   383  	)
   384  
   385  	// Run the controller and run it until we close stop.
   386  	// Once Run() is called, calls to testDoneWG.Done() might start, so
   387  	// all testDoneWG.Add() calls must happen before this point
   388  	stop := make(chan struct{})
   389  	go controller.Run(stop)
   390  	<-watchCh
   391  
   392  	// run every test a few times, in parallel
   393  	var wg sync.WaitGroup
   394  	wg.Add(threads * len(tests))
   395  	for i := 0; i < threads; i++ {
   396  		for j, f := range tests {
   397  			go func(name string, f func(string)) {
   398  				defer wg.Done()
   399  				f(name)
   400  			}(fmt.Sprintf("%v-%v", i, j), f)
   401  		}
   402  	}
   403  	wg.Wait()
   404  
   405  	// Let's wait for the controller to process the things we just added.
   406  	testDoneWG.Wait()
   407  	close(stop)
   408  }
   409  
   410  func TestPanicPropagated(t *testing.T) {
   411  	// source simulates an apiserver object endpoint.
   412  	source := fcache.NewFakeControllerSource()
   413  
   414  	// Make a controller that just panic if the AddFunc is called.
   415  	_, controller := NewInformer(
   416  		source,
   417  		&v1.Pod{},
   418  		time.Millisecond*100,
   419  		ResourceEventHandlerDetailedFuncs{
   420  			AddFunc: func(obj interface{}, isInInitialList bool) {
   421  				// Create a panic.
   422  				panic("Just panic.")
   423  			},
   424  		},
   425  	)
   426  
   427  	// Run the controller and run it until we close stop.
   428  	stop := make(chan struct{})
   429  	defer close(stop)
   430  
   431  	propagated := make(chan interface{})
   432  	go func() {
   433  		defer func() {
   434  			if r := recover(); r != nil {
   435  				propagated <- r
   436  			}
   437  		}()
   438  		controller.Run(stop)
   439  	}()
   440  	// Let's add a object to the source. It will trigger a panic.
   441  	source.Add(&v1.Pod{ObjectMeta: metav1.ObjectMeta{Name: "test"}})
   442  
   443  	// Check if the panic propagated up.
   444  	select {
   445  	case p := <-propagated:
   446  		if p == "Just panic." {
   447  			t.Logf("Test Passed")
   448  		} else {
   449  			t.Errorf("unrecognized panic in controller run: %v", p)
   450  		}
   451  	case <-time.After(wait.ForeverTestTimeout):
   452  		t.Errorf("timeout: the panic failed to propagate from the controller run method!")
   453  	}
   454  }
   455  
   456  func TestTransformingInformer(t *testing.T) {
   457  	// source simulates an apiserver object endpoint.
   458  	source := fcache.NewFakeControllerSource()
   459  
   460  	makePod := func(name, generation string) *v1.Pod {
   461  		return &v1.Pod{
   462  			ObjectMeta: metav1.ObjectMeta{
   463  				Name:      name,
   464  				Namespace: "namespace",
   465  				Labels:    map[string]string{"generation": generation},
   466  			},
   467  			Spec: v1.PodSpec{
   468  				Hostname:  "hostname",
   469  				Subdomain: "subdomain",
   470  			},
   471  		}
   472  	}
   473  	expectedPod := func(name, generation string) *v1.Pod {
   474  		pod := makePod(name, generation)
   475  		pod.Spec.Hostname = "new-hostname"
   476  		pod.Spec.Subdomain = ""
   477  		pod.Spec.NodeName = "nodename"
   478  		return pod
   479  	}
   480  
   481  	source.Add(makePod("pod1", "1"))
   482  	source.Modify(makePod("pod1", "2"))
   483  
   484  	type event struct {
   485  		eventType watch.EventType
   486  		previous  interface{}
   487  		current   interface{}
   488  	}
   489  	events := make(chan event, 10)
   490  	recordEvent := func(eventType watch.EventType, previous, current interface{}) {
   491  		events <- event{eventType: eventType, previous: previous, current: current}
   492  	}
   493  	verifyEvent := func(eventType watch.EventType, previous, current interface{}) {
   494  		select {
   495  		case event := <-events:
   496  			if event.eventType != eventType {
   497  				t.Errorf("expected type %v, got %v", eventType, event.eventType)
   498  			}
   499  			if !apiequality.Semantic.DeepEqual(event.previous, previous) {
   500  				t.Errorf("expected previous object %#v, got %#v", previous, event.previous)
   501  			}
   502  			if !apiequality.Semantic.DeepEqual(event.current, current) {
   503  				t.Errorf("expected object %#v, got %#v", current, event.current)
   504  			}
   505  		case <-time.After(wait.ForeverTestTimeout):
   506  			t.Errorf("failed to get event")
   507  		}
   508  	}
   509  
   510  	podTransformer := func(obj interface{}) (interface{}, error) {
   511  		pod, ok := obj.(*v1.Pod)
   512  		if !ok {
   513  			return nil, fmt.Errorf("unexpected object type: %T", obj)
   514  		}
   515  		pod.Spec.Hostname = "new-hostname"
   516  		pod.Spec.Subdomain = ""
   517  		pod.Spec.NodeName = "nodename"
   518  
   519  		// Clear out ResourceVersion to simplify comparisons.
   520  		pod.ResourceVersion = ""
   521  
   522  		return pod, nil
   523  	}
   524  
   525  	store, controller := NewTransformingInformer(
   526  		source,
   527  		&v1.Pod{},
   528  		0,
   529  		ResourceEventHandlerDetailedFuncs{
   530  			AddFunc:    func(obj interface{}, isInInitialList bool) { recordEvent(watch.Added, nil, obj) },
   531  			UpdateFunc: func(oldObj, newObj interface{}) { recordEvent(watch.Modified, oldObj, newObj) },
   532  			DeleteFunc: func(obj interface{}) { recordEvent(watch.Deleted, obj, nil) },
   533  		},
   534  		podTransformer,
   535  	)
   536  
   537  	verifyStore := func(expectedItems []interface{}) {
   538  		items := store.List()
   539  		if len(items) != len(expectedItems) {
   540  			t.Errorf("unexpected items %v, expected %v", items, expectedItems)
   541  		}
   542  		for _, expectedItem := range expectedItems {
   543  			found := false
   544  			for _, item := range items {
   545  				if apiequality.Semantic.DeepEqual(item, expectedItem) {
   546  					found = true
   547  				}
   548  			}
   549  			if !found {
   550  				t.Errorf("expected item %v not found in %v", expectedItem, items)
   551  			}
   552  		}
   553  	}
   554  
   555  	stopCh := make(chan struct{})
   556  	go controller.Run(stopCh)
   557  
   558  	verifyEvent(watch.Added, nil, expectedPod("pod1", "2"))
   559  	verifyStore([]interface{}{expectedPod("pod1", "2")})
   560  
   561  	source.Add(makePod("pod2", "1"))
   562  	verifyEvent(watch.Added, nil, expectedPod("pod2", "1"))
   563  	verifyStore([]interface{}{expectedPod("pod1", "2"), expectedPod("pod2", "1")})
   564  
   565  	source.Add(makePod("pod3", "1"))
   566  	verifyEvent(watch.Added, nil, expectedPod("pod3", "1"))
   567  
   568  	source.Modify(makePod("pod2", "2"))
   569  	verifyEvent(watch.Modified, expectedPod("pod2", "1"), expectedPod("pod2", "2"))
   570  
   571  	source.Delete(makePod("pod1", "2"))
   572  	verifyEvent(watch.Deleted, expectedPod("pod1", "2"), nil)
   573  	verifyStore([]interface{}{expectedPod("pod2", "2"), expectedPod("pod3", "1")})
   574  
   575  	close(stopCh)
   576  }
   577  
   578  func TestDeletionHandlingObjectToName(t *testing.T) {
   579  	cm := &v1.ConfigMap{
   580  		ObjectMeta: metav1.ObjectMeta{
   581  			Name:      "testname",
   582  			Namespace: "testnamespace",
   583  		},
   584  	}
   585  	stringKey, err := MetaNamespaceKeyFunc(cm)
   586  	if err != nil {
   587  		t.Error(err)
   588  	}
   589  	deleted := DeletedFinalStateUnknown{
   590  		Key: stringKey,
   591  		Obj: cm,
   592  	}
   593  	expected, err := ObjectToName(cm)
   594  	if err != nil {
   595  		t.Error(err)
   596  	}
   597  	actual, err := DeletionHandlingObjectToName(deleted)
   598  	if err != nil {
   599  		t.Error(err)
   600  	}
   601  	if expected != actual {
   602  		t.Errorf("Expected %#v, got %#v", expected, actual)
   603  	}
   604  }
   605  

View as plain text