...

Source file src/k8s.io/client-go/tools/cache/shared_informer_test.go

Documentation: k8s.io/client-go/tools/cache

     1  /*
     2  Copyright 2017 The Kubernetes Authors.
     3  
     4  Licensed under the Apache License, Version 2.0 (the "License");
     5  you may not use this file except in compliance with the License.
     6  You may obtain a copy of the License at
     7  
     8      http://www.apache.org/licenses/LICENSE-2.0
     9  
    10  Unless required by applicable law or agreed to in writing, software
    11  distributed under the License is distributed on an "AS IS" BASIS,
    12  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    13  See the License for the specific language governing permissions and
    14  limitations under the License.
    15  */
    16  
    17  package cache
    18  
    19  import (
    20  	"context"
    21  	"fmt"
    22  	"math/rand"
    23  	"strconv"
    24  	"strings"
    25  	"sync"
    26  	"testing"
    27  	"time"
    28  
    29  	"github.com/google/go-cmp/cmp"
    30  	"github.com/google/go-cmp/cmp/cmpopts"
    31  	"github.com/stretchr/testify/assert"
    32  	v1 "k8s.io/api/core/v1"
    33  	"k8s.io/apimachinery/pkg/api/meta"
    34  	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
    35  	"k8s.io/apimachinery/pkg/util/sets"
    36  	"k8s.io/apimachinery/pkg/util/wait"
    37  	fcache "k8s.io/client-go/tools/cache/testing"
    38  	testingclock "k8s.io/utils/clock/testing"
    39  )
    40  
// testListener is an event handler used by the tests in this file. It records
// the name of every object it is handed through OnAdd and OnUpdate so a test
// can compare what was received against expectedItemNames.
type testListener struct {
	lock              sync.RWMutex  // held by handle while appending and by satisfiedExpectations while reading
	resyncPeriod      time.Duration // resync period the tests pass when registering this listener
	expectedItemNames sets.String   // object names the test expects this listener to receive
	receivedItemNames []string      // object names recorded by handle, in arrival order
	name              string        // label printed in log output and failure messages
}
    48  
    49  func newTestListener(name string, resyncPeriod time.Duration, expected ...string) *testListener {
    50  	l := &testListener{
    51  		resyncPeriod:      resyncPeriod,
    52  		expectedItemNames: sets.NewString(expected...),
    53  		name:              name,
    54  	}
    55  	return l
    56  }
    57  
// OnAdd records the added object by passing it to handle. The isInInitialList
// argument is not used.
func (l *testListener) OnAdd(obj interface{}, isInInitialList bool) {
	l.handle(obj)
}
    61  
// OnUpdate records the new version of the object by passing it to handle. The
// old version is not used.
func (l *testListener) OnUpdate(old, new interface{}) {
	l.handle(new)
}
    65  
// OnDelete is a no-op; deleted objects are not recorded.
func (l *testListener) OnDelete(obj interface{}) {
}
    68  
    69  func (l *testListener) handle(obj interface{}) {
    70  	key, _ := MetaNamespaceKeyFunc(obj)
    71  	fmt.Printf("%s: handle: %v\n", l.name, key)
    72  	l.lock.Lock()
    73  	defer l.lock.Unlock()
    74  	objectMeta, _ := meta.Accessor(obj)
    75  	l.receivedItemNames = append(l.receivedItemNames, objectMeta.GetName())
    76  }
    77  
    78  func (l *testListener) ok() bool {
    79  	fmt.Println("polling")
    80  	err := wait.PollImmediate(100*time.Millisecond, 2*time.Second, func() (bool, error) {
    81  		if l.satisfiedExpectations() {
    82  			return true, nil
    83  		}
    84  		return false, nil
    85  	})
    86  	if err != nil {
    87  		return false
    88  	}
    89  
    90  	// wait just a bit to allow any unexpected stragglers to come in
    91  	fmt.Println("sleeping")
    92  	time.Sleep(1 * time.Second)
    93  	fmt.Println("final check")
    94  	return l.satisfiedExpectations()
    95  }
    96  
    97  func (l *testListener) satisfiedExpectations() bool {
    98  	l.lock.RLock()
    99  	defer l.lock.RUnlock()
   100  
   101  	return sets.NewString(l.receivedItemNames...).Equal(l.expectedItemNames)
   102  }
   103  
   104  func eventHandlerCount(i SharedInformer) int {
   105  	s := i.(*sharedIndexInformer)
   106  	s.startedLock.Lock()
   107  	defer s.startedLock.Unlock()
   108  	return len(s.processor.listeners)
   109  }
   110  
   111  func isStarted(i SharedInformer) bool {
   112  	s := i.(*sharedIndexInformer)
   113  	s.startedLock.Lock()
   114  	defer s.startedLock.Unlock()
   115  	return s.started
   116  }
   117  
   118  func isRegistered(i SharedInformer, h ResourceEventHandlerRegistration) bool {
   119  	s := i.(*sharedIndexInformer)
   120  	return s.processor.getListener(h) != nil
   121  }
   122  
   123  func TestIndexer(t *testing.T) {
   124  	assert := assert.New(t)
   125  	// source simulates an apiserver object endpoint.
   126  	source := fcache.NewFakeControllerSource()
   127  	pod1 := &v1.Pod{ObjectMeta: metav1.ObjectMeta{Name: "pod1", Labels: map[string]string{"a": "a-val", "b": "b-val1"}}}
   128  	pod2 := &v1.Pod{ObjectMeta: metav1.ObjectMeta{Name: "pod2", Labels: map[string]string{"b": "b-val2"}}}
   129  	pod3 := &v1.Pod{ObjectMeta: metav1.ObjectMeta{Name: "pod3", Labels: map[string]string{"a": "a-val2"}}}
   130  	source.Add(pod1)
   131  	source.Add(pod2)
   132  
   133  	// create the shared informer and resync every 1s
   134  	informer := NewSharedInformer(source, &v1.Pod{}, 1*time.Second).(*sharedIndexInformer)
   135  	err := informer.AddIndexers(map[string]IndexFunc{
   136  		"labels": func(obj interface{}) ([]string, error) {
   137  			res := []string{}
   138  			for k := range obj.(*v1.Pod).Labels {
   139  				res = append(res, k)
   140  			}
   141  			return res, nil
   142  		},
   143  	})
   144  	if err != nil {
   145  		t.Fatal(err)
   146  	}
   147  	stop := make(chan struct{})
   148  	defer close(stop)
   149  
   150  	go informer.Run(stop)
   151  	WaitForCacheSync(stop, informer.HasSynced)
   152  
   153  	cmpOps := cmpopts.SortSlices(func(a, b any) bool {
   154  		return a.(*v1.Pod).Name < b.(*v1.Pod).Name
   155  	})
   156  
   157  	// We should be able to lookup by index
   158  	res, err := informer.GetIndexer().ByIndex("labels", "a")
   159  	assert.NoError(err)
   160  	if diff := cmp.Diff([]any{pod1}, res); diff != "" {
   161  		t.Fatal(diff)
   162  	}
   163  
   164  	// Adding an item later is fine as well
   165  	source.Add(pod3)
   166  	// Event is async, need to poll
   167  	assert.Eventually(func() bool {
   168  		res, _ := informer.GetIndexer().ByIndex("labels", "a")
   169  		return cmp.Diff([]any{pod1, pod3}, res, cmpOps) == ""
   170  	}, time.Second*3, time.Millisecond)
   171  
   172  	// Adding an index later is also fine
   173  	err = informer.AddIndexers(map[string]IndexFunc{
   174  		"labels-again": func(obj interface{}) ([]string, error) {
   175  			res := []string{}
   176  			for k := range obj.(*v1.Pod).Labels {
   177  				res = append(res, k)
   178  			}
   179  			return res, nil
   180  		},
   181  	})
   182  	assert.NoError(err)
   183  
   184  	// Should be immediately available
   185  	res, err = informer.GetIndexer().ByIndex("labels-again", "a")
   186  	assert.NoError(err)
   187  	if diff := cmp.Diff([]any{pod1, pod3}, res, cmpOps); diff != "" {
   188  		t.Fatal(diff)
   189  	}
   190  	if got := informer.GetIndexer().ListIndexFuncValues("labels"); !sets.New(got...).Equal(sets.New("a", "b")) {
   191  		t.Fatalf("got %v", got)
   192  	}
   193  	if got := informer.GetIndexer().ListIndexFuncValues("labels-again"); !sets.New(got...).Equal(sets.New("a", "b")) {
   194  		t.Fatalf("got %v", got)
   195  	}
   196  }
   197  
   198  func TestListenerResyncPeriods(t *testing.T) {
   199  	// source simulates an apiserver object endpoint.
   200  	source := fcache.NewFakeControllerSource()
   201  	source.Add(&v1.Pod{ObjectMeta: metav1.ObjectMeta{Name: "pod1"}})
   202  	source.Add(&v1.Pod{ObjectMeta: metav1.ObjectMeta{Name: "pod2"}})
   203  
   204  	// create the shared informer and resync every 1s
   205  	informer := NewSharedInformer(source, &v1.Pod{}, 1*time.Second).(*sharedIndexInformer)
   206  
   207  	clock := testingclock.NewFakeClock(time.Now())
   208  	informer.clock = clock
   209  	informer.processor.clock = clock
   210  
   211  	// listener 1, never resync
   212  	listener1 := newTestListener("listener1", 0, "pod1", "pod2")
   213  	informer.AddEventHandlerWithResyncPeriod(listener1, listener1.resyncPeriod)
   214  
   215  	// listener 2, resync every 2s
   216  	listener2 := newTestListener("listener2", 2*time.Second, "pod1", "pod2")
   217  	informer.AddEventHandlerWithResyncPeriod(listener2, listener2.resyncPeriod)
   218  
   219  	// listener 3, resync every 3s
   220  	listener3 := newTestListener("listener3", 3*time.Second, "pod1", "pod2")
   221  	informer.AddEventHandlerWithResyncPeriod(listener3, listener3.resyncPeriod)
   222  	listeners := []*testListener{listener1, listener2, listener3}
   223  
   224  	stop := make(chan struct{})
   225  	defer close(stop)
   226  
   227  	go informer.Run(stop)
   228  
   229  	// ensure all listeners got the initial List
   230  	for _, listener := range listeners {
   231  		if !listener.ok() {
   232  			t.Errorf("%s: expected %v, got %v", listener.name, listener.expectedItemNames, listener.receivedItemNames)
   233  		}
   234  	}
   235  
   236  	// reset
   237  	for _, listener := range listeners {
   238  		listener.receivedItemNames = []string{}
   239  	}
   240  
   241  	// advance so listener2 gets a resync
   242  	clock.Step(2 * time.Second)
   243  
   244  	// make sure listener2 got the resync
   245  	if !listener2.ok() {
   246  		t.Errorf("%s: expected %v, got %v", listener2.name, listener2.expectedItemNames, listener2.receivedItemNames)
   247  	}
   248  
   249  	// wait a bit to give errant items a chance to go to 1 and 3
   250  	time.Sleep(1 * time.Second)
   251  
   252  	// make sure listeners 1 and 3 got nothing
   253  	if len(listener1.receivedItemNames) != 0 {
   254  		t.Errorf("listener1: should not have resynced (got %d)", len(listener1.receivedItemNames))
   255  	}
   256  	if len(listener3.receivedItemNames) != 0 {
   257  		t.Errorf("listener3: should not have resynced (got %d)", len(listener3.receivedItemNames))
   258  	}
   259  
   260  	// reset
   261  	for _, listener := range listeners {
   262  		listener.receivedItemNames = []string{}
   263  	}
   264  
   265  	// advance so listener3 gets a resync
   266  	clock.Step(1 * time.Second)
   267  
   268  	// make sure listener3 got the resync
   269  	if !listener3.ok() {
   270  		t.Errorf("%s: expected %v, got %v", listener3.name, listener3.expectedItemNames, listener3.receivedItemNames)
   271  	}
   272  
   273  	// wait a bit to give errant items a chance to go to 1 and 2
   274  	time.Sleep(1 * time.Second)
   275  
   276  	// make sure listeners 1 and 2 got nothing
   277  	if len(listener1.receivedItemNames) != 0 {
   278  		t.Errorf("listener1: should not have resynced (got %d)", len(listener1.receivedItemNames))
   279  	}
   280  	if len(listener2.receivedItemNames) != 0 {
   281  		t.Errorf("listener2: should not have resynced (got %d)", len(listener2.receivedItemNames))
   282  	}
   283  }
   284  
   285  func TestResyncCheckPeriod(t *testing.T) {
   286  	// source simulates an apiserver object endpoint.
   287  	source := fcache.NewFakeControllerSource()
   288  
   289  	// create the shared informer and resync every 12 hours
   290  	informer := NewSharedInformer(source, &v1.Pod{}, 12*time.Hour).(*sharedIndexInformer)
   291  	gl := informer.processor.getListener
   292  
   293  	clock := testingclock.NewFakeClock(time.Now())
   294  	informer.clock = clock
   295  	informer.processor.clock = clock
   296  
   297  	// listener 1, never resync
   298  	listener1 := newTestListener("listener1", 0)
   299  	handler1, _ := informer.AddEventHandlerWithResyncPeriod(listener1, listener1.resyncPeriod)
   300  
   301  	if e, a := 12*time.Hour, informer.resyncCheckPeriod; e != a {
   302  		t.Errorf("expected %d, got %d", e, a)
   303  	}
   304  	if e, a := time.Duration(0), gl(handler1).resyncPeriod; e != a {
   305  		t.Errorf("expected %d, got %d", e, a)
   306  	}
   307  
   308  	// listener 2, resync every minute
   309  	listener2 := newTestListener("listener2", 1*time.Minute)
   310  	handler2, _ := informer.AddEventHandlerWithResyncPeriod(listener2, listener2.resyncPeriod)
   311  	if e, a := 1*time.Minute, informer.resyncCheckPeriod; e != a {
   312  		t.Errorf("expected %d, got %d", e, a)
   313  	}
   314  	if e, a := time.Duration(0), gl(handler1).resyncPeriod; e != a {
   315  		t.Errorf("expected %d, got %d", e, a)
   316  	}
   317  	if e, a := 1*time.Minute, gl(handler2).resyncPeriod; e != a {
   318  		t.Errorf("expected %d, got %d", e, a)
   319  	}
   320  
   321  	// listener 3, resync every 55 seconds
   322  	listener3 := newTestListener("listener3", 55*time.Second)
   323  	handler3, _ := informer.AddEventHandlerWithResyncPeriod(listener3, listener3.resyncPeriod)
   324  	if e, a := 55*time.Second, informer.resyncCheckPeriod; e != a {
   325  		t.Errorf("expected %d, got %d", e, a)
   326  	}
   327  	if e, a := time.Duration(0), gl(handler1).resyncPeriod; e != a {
   328  		t.Errorf("expected %d, got %d", e, a)
   329  	}
   330  	if e, a := 1*time.Minute, gl(handler2).resyncPeriod; e != a {
   331  		t.Errorf("expected %d, got %d", e, a)
   332  	}
   333  	if e, a := 55*time.Second, gl(handler3).resyncPeriod; e != a {
   334  		t.Errorf("expected %d, got %d", e, a)
   335  	}
   336  
   337  	// listener 4, resync every 5 seconds
   338  	listener4 := newTestListener("listener4", 5*time.Second)
   339  	handler4, _ := informer.AddEventHandlerWithResyncPeriod(listener4, listener4.resyncPeriod)
   340  	if e, a := 5*time.Second, informer.resyncCheckPeriod; e != a {
   341  		t.Errorf("expected %d, got %d", e, a)
   342  	}
   343  	if e, a := time.Duration(0), gl(handler1).resyncPeriod; e != a {
   344  		t.Errorf("expected %d, got %d", e, a)
   345  	}
   346  	if e, a := 1*time.Minute, gl(handler2).resyncPeriod; e != a {
   347  		t.Errorf("expected %d, got %d", e, a)
   348  	}
   349  	if e, a := 55*time.Second, gl(handler3).resyncPeriod; e != a {
   350  		t.Errorf("expected %d, got %d", e, a)
   351  	}
   352  	if e, a := 5*time.Second, gl(handler4).resyncPeriod; e != a {
   353  		t.Errorf("expected %d, got %d", e, a)
   354  	}
   355  }
   356  
   357  // verify that https://github.com/kubernetes/kubernetes/issues/59822 is fixed
   358  func TestSharedInformerInitializationRace(t *testing.T) {
   359  	source := fcache.NewFakeControllerSource()
   360  	informer := NewSharedInformer(source, &v1.Pod{}, 1*time.Second).(*sharedIndexInformer)
   361  	listener := newTestListener("raceListener", 0)
   362  
   363  	stop := make(chan struct{})
   364  	go informer.AddEventHandlerWithResyncPeriod(listener, listener.resyncPeriod)
   365  	go informer.Run(stop)
   366  	close(stop)
   367  }
   368  
   369  // TestSharedInformerWatchDisruption simulates a watch that was closed
   370  // with updates to the store during that time. We ensure that handlers with
   371  // resync and no resync see the expected state.
   372  func TestSharedInformerWatchDisruption(t *testing.T) {
   373  	// source simulates an apiserver object endpoint.
   374  	source := fcache.NewFakeControllerSource()
   375  
   376  	source.Add(&v1.Pod{ObjectMeta: metav1.ObjectMeta{Name: "pod1", UID: "pod1", ResourceVersion: "1"}})
   377  	source.Add(&v1.Pod{ObjectMeta: metav1.ObjectMeta{Name: "pod2", UID: "pod2", ResourceVersion: "2"}})
   378  
   379  	// create the shared informer and resync every 1s
   380  	informer := NewSharedInformer(source, &v1.Pod{}, 1*time.Second).(*sharedIndexInformer)
   381  
   382  	clock := testingclock.NewFakeClock(time.Now())
   383  	informer.clock = clock
   384  	informer.processor.clock = clock
   385  
   386  	// listener, never resync
   387  	listenerNoResync := newTestListener("listenerNoResync", 0, "pod1", "pod2")
   388  	informer.AddEventHandlerWithResyncPeriod(listenerNoResync, listenerNoResync.resyncPeriod)
   389  
   390  	listenerResync := newTestListener("listenerResync", 1*time.Second, "pod1", "pod2")
   391  	informer.AddEventHandlerWithResyncPeriod(listenerResync, listenerResync.resyncPeriod)
   392  	listeners := []*testListener{listenerNoResync, listenerResync}
   393  
   394  	stop := make(chan struct{})
   395  	defer close(stop)
   396  
   397  	go informer.Run(stop)
   398  
   399  	for _, listener := range listeners {
   400  		if !listener.ok() {
   401  			t.Errorf("%s: expected %v, got %v", listener.name, listener.expectedItemNames, listener.receivedItemNames)
   402  		}
   403  	}
   404  
   405  	// Add pod3, bump pod2 but don't broadcast it, so that the change will be seen only on relist
   406  	source.AddDropWatch(&v1.Pod{ObjectMeta: metav1.ObjectMeta{Name: "pod3", UID: "pod3", ResourceVersion: "3"}})
   407  	source.ModifyDropWatch(&v1.Pod{ObjectMeta: metav1.ObjectMeta{Name: "pod2", UID: "pod2", ResourceVersion: "4"}})
   408  
   409  	// Ensure that nobody saw any changes
   410  	for _, listener := range listeners {
   411  		if !listener.ok() {
   412  			t.Errorf("%s: expected %v, got %v", listener.name, listener.expectedItemNames, listener.receivedItemNames)
   413  		}
   414  	}
   415  
   416  	for _, listener := range listeners {
   417  		listener.receivedItemNames = []string{}
   418  	}
   419  
   420  	listenerNoResync.expectedItemNames = sets.NewString("pod2", "pod3")
   421  	listenerResync.expectedItemNames = sets.NewString("pod1", "pod2", "pod3")
   422  
   423  	// This calls shouldSync, which deletes noResync from the list of syncingListeners
   424  	clock.Step(1 * time.Second)
   425  
   426  	// Simulate a connection loss (or even just a too-old-watch)
   427  	source.ResetWatch()
   428  
   429  	// Wait long enough for the reflector to exit and the backoff function to start waiting
   430  	// on the fake clock, otherwise advancing the fake clock will have no effect.
   431  	// TODO: Make this deterministic by counting the number of waiters on FakeClock
   432  	time.Sleep(10 * time.Millisecond)
   433  
   434  	// Advance the clock to cause the backoff wait to expire.
   435  	clock.Step(1601 * time.Millisecond)
   436  
   437  	// Wait long enough for backoff to invoke ListWatch a second time and distribute events
   438  	// to listeners.
   439  	time.Sleep(10 * time.Millisecond)
   440  
   441  	for _, listener := range listeners {
   442  		if !listener.ok() {
   443  			t.Errorf("%s: expected %v, got %v", listener.name, listener.expectedItemNames, listener.receivedItemNames)
   444  		}
   445  	}
   446  }
   447  
   448  func TestSharedInformerErrorHandling(t *testing.T) {
   449  	source := fcache.NewFakeControllerSource()
   450  	source.Add(&v1.Pod{ObjectMeta: metav1.ObjectMeta{Name: "pod1"}})
   451  	source.ListError = fmt.Errorf("Access Denied")
   452  
   453  	informer := NewSharedInformer(source, &v1.Pod{}, 1*time.Second).(*sharedIndexInformer)
   454  
   455  	errCh := make(chan error)
   456  	_ = informer.SetWatchErrorHandler(func(_ *Reflector, err error) {
   457  		errCh <- err
   458  	})
   459  
   460  	stop := make(chan struct{})
   461  	go informer.Run(stop)
   462  
   463  	select {
   464  	case err := <-errCh:
   465  		if !strings.Contains(err.Error(), "Access Denied") {
   466  			t.Errorf("Expected 'Access Denied' error. Actual: %v", err)
   467  		}
   468  	case <-time.After(time.Second):
   469  		t.Errorf("Timeout waiting for error handler call")
   470  	}
   471  	close(stop)
   472  }
   473  
   474  // TestSharedInformerStartRace is a regression test to ensure there is no race between
   475  // Run and SetWatchErrorHandler, and Run and SetTransform.
   476  func TestSharedInformerStartRace(t *testing.T) {
   477  	source := fcache.NewFakeControllerSource()
   478  	informer := NewSharedInformer(source, &v1.Pod{}, 1*time.Second).(*sharedIndexInformer)
   479  	stop := make(chan struct{})
   480  	go func() {
   481  		for {
   482  			select {
   483  			case <-stop:
   484  				return
   485  			default:
   486  			}
   487  			// Set dummy functions, just to test for race
   488  			informer.SetTransform(func(i interface{}) (interface{}, error) {
   489  				return i, nil
   490  			})
   491  			informer.SetWatchErrorHandler(func(r *Reflector, err error) {
   492  			})
   493  		}
   494  	}()
   495  
   496  	go informer.Run(stop)
   497  
   498  	close(stop)
   499  }
   500  
   501  func TestSharedInformerTransformer(t *testing.T) {
   502  	// source simulates an apiserver object endpoint.
   503  	source := fcache.NewFakeControllerSource()
   504  
   505  	source.Add(&v1.Pod{ObjectMeta: metav1.ObjectMeta{Name: "pod1", UID: "pod1", ResourceVersion: "1"}})
   506  	source.Add(&v1.Pod{ObjectMeta: metav1.ObjectMeta{Name: "pod2", UID: "pod2", ResourceVersion: "2"}})
   507  
   508  	informer := NewSharedInformer(source, &v1.Pod{}, 1*time.Second).(*sharedIndexInformer)
   509  	informer.SetTransform(func(obj interface{}) (interface{}, error) {
   510  		if pod, ok := obj.(*v1.Pod); ok {
   511  			name := pod.GetName()
   512  
   513  			if upper := strings.ToUpper(name); upper != name {
   514  				pod.SetName(upper)
   515  				return pod, nil
   516  			}
   517  		}
   518  		return obj, nil
   519  	})
   520  
   521  	listenerTransformer := newTestListener("listenerTransformer", 0, "POD1", "POD2")
   522  	informer.AddEventHandler(listenerTransformer)
   523  
   524  	stop := make(chan struct{})
   525  	go informer.Run(stop)
   526  	defer close(stop)
   527  
   528  	if !listenerTransformer.ok() {
   529  		t.Errorf("%s: expected %v, got %v", listenerTransformer.name, listenerTransformer.expectedItemNames, listenerTransformer.receivedItemNames)
   530  	}
   531  }
   532  
   533  func TestSharedInformerRemoveHandler(t *testing.T) {
   534  	source := fcache.NewFakeControllerSource()
   535  	source.Add(&v1.Pod{ObjectMeta: metav1.ObjectMeta{Name: "pod1"}})
   536  
   537  	informer := NewSharedInformer(source, &v1.Pod{}, 1*time.Second)
   538  
   539  	handler1 := &ResourceEventHandlerFuncs{}
   540  	handle1, err := informer.AddEventHandler(handler1)
   541  	if err != nil {
   542  		t.Errorf("informer did not add handler1: %s", err)
   543  		return
   544  	}
   545  	handler2 := &ResourceEventHandlerFuncs{}
   546  	handle2, err := informer.AddEventHandler(handler2)
   547  	if err != nil {
   548  		t.Errorf("informer did not add handler2: %s", err)
   549  		return
   550  	}
   551  
   552  	if eventHandlerCount(informer) != 2 {
   553  		t.Errorf("informer has %d registered handler, instead of 2", eventHandlerCount(informer))
   554  	}
   555  
   556  	if err := informer.RemoveEventHandler(handle2); err != nil {
   557  		t.Errorf("removing of second pointer handler failed: %s", err)
   558  	}
   559  	if eventHandlerCount(informer) != 1 {
   560  		t.Errorf("after removing handler informer has %d registered handler(s), instead of 1", eventHandlerCount(informer))
   561  	}
   562  
   563  	if err := informer.RemoveEventHandler(handle1); err != nil {
   564  		t.Errorf("removing of first pointer handler failed: %s", err)
   565  	}
   566  	if eventHandlerCount(informer) != 0 {
   567  		t.Errorf("informer still has registered handlers after removing both handlers")
   568  	}
   569  }
   570  
   571  func TestSharedInformerRemoveForeignHandler(t *testing.T) {
   572  	source := fcache.NewFakeControllerSource()
   573  	source.Add(&v1.Pod{ObjectMeta: metav1.ObjectMeta{Name: "pod1"}})
   574  
   575  	informer := NewSharedInformer(source, &v1.Pod{}, 1*time.Second).(*sharedIndexInformer)
   576  
   577  	source2 := fcache.NewFakeControllerSource()
   578  	source2.Add(&v1.Pod{ObjectMeta: metav1.ObjectMeta{Name: "pod1"}})
   579  
   580  	informer2 := NewSharedInformer(source2, &v1.Pod{}, 1*time.Second).(*sharedIndexInformer)
   581  
   582  	handler1 := &ResourceEventHandlerFuncs{}
   583  	handle1, err := informer.AddEventHandler(handler1)
   584  	if err != nil {
   585  		t.Errorf("informer did not add handler1: %s", err)
   586  		return
   587  	}
   588  	handler2 := &ResourceEventHandlerFuncs{}
   589  	handle2, err := informer.AddEventHandler(handler2)
   590  	if err != nil {
   591  		t.Errorf("informer did not add handler2: %s", err)
   592  		return
   593  	}
   594  
   595  	if eventHandlerCount(informer) != 2 {
   596  		t.Errorf("informer has %d registered handler, instead of 2", eventHandlerCount(informer))
   597  	}
   598  	if eventHandlerCount(informer2) != 0 {
   599  		t.Errorf("informer2 has %d registered handler, instead of 0", eventHandlerCount(informer2))
   600  	}
   601  
   602  	// remove handle at foreign informer
   603  	if isRegistered(informer2, handle1) {
   604  		t.Errorf("handle1 registered for informer2")
   605  	}
   606  	if isRegistered(informer2, handle2) {
   607  		t.Errorf("handle2 registered for informer2")
   608  	}
   609  	if err := informer2.RemoveEventHandler(handle1); err != nil {
   610  		t.Errorf("removing of second pointer handler failed: %s", err)
   611  	}
   612  	if eventHandlerCount(informer) != 2 {
   613  		t.Errorf("informer has %d registered handler, instead of 2", eventHandlerCount(informer))
   614  	}
   615  	if eventHandlerCount(informer2) != 0 {
   616  		t.Errorf("informer2 has %d registered handler, instead of 0", eventHandlerCount(informer2))
   617  	}
   618  	if !isRegistered(informer, handle1) {
   619  		t.Errorf("handle1 not registered anymore for informer")
   620  	}
   621  	if !isRegistered(informer, handle2) {
   622  		t.Errorf("handle2 not registered anymore for informer")
   623  	}
   624  
   625  	if eventHandlerCount(informer) != 2 {
   626  		t.Errorf("informer has %d registered handler, instead of 2", eventHandlerCount(informer))
   627  	}
   628  	if eventHandlerCount(informer2) != 0 {
   629  		t.Errorf("informer2 has %d registered handler, instead of 0", eventHandlerCount(informer2))
   630  	}
   631  	if !isRegistered(informer, handle1) {
   632  		t.Errorf("handle1 not registered anymore for informer")
   633  	}
   634  	if !isRegistered(informer, handle2) {
   635  		t.Errorf("handle2 not registered anymore for informer")
   636  	}
   637  
   638  	if err := informer.RemoveEventHandler(handle2); err != nil {
   639  		t.Errorf("removing of second pointer handler failed: %s", err)
   640  	}
   641  	if eventHandlerCount(informer) != 1 {
   642  		t.Errorf("after removing handler informer has %d registered handler(s), instead of 1", eventHandlerCount(informer))
   643  	}
   644  
   645  	if err := informer.RemoveEventHandler(handle1); err != nil {
   646  		t.Errorf("removing of first pointer handler failed: %s", err)
   647  	}
   648  	if eventHandlerCount(informer) != 0 {
   649  		t.Errorf("informer still has registered handlers after removing both handlers")
   650  	}
   651  }
   652  
   653  func TestSharedInformerMultipleRegistration(t *testing.T) {
   654  	source := fcache.NewFakeControllerSource()
   655  	source.Add(&v1.Pod{ObjectMeta: metav1.ObjectMeta{Name: "pod1"}})
   656  
   657  	informer := NewSharedInformer(source, &v1.Pod{}, 1*time.Second).(*sharedIndexInformer)
   658  
   659  	handler1 := &ResourceEventHandlerFuncs{}
   660  	reg1, err := informer.AddEventHandler(handler1)
   661  	if err != nil {
   662  		t.Errorf("informer did not add handler for the first time: %s", err)
   663  		return
   664  	}
   665  
   666  	if !isRegistered(informer, reg1) {
   667  		t.Errorf("handle1 is not active after successful registration")
   668  		return
   669  	}
   670  
   671  	reg2, err := informer.AddEventHandler(handler1)
   672  	if err != nil {
   673  		t.Errorf("informer did not add handler for the second: %s", err)
   674  		return
   675  	}
   676  
   677  	if !isRegistered(informer, reg2) {
   678  		t.Errorf("handle2 is not active after successful registration")
   679  		return
   680  	}
   681  
   682  	if eventHandlerCount(informer) != 2 {
   683  		t.Errorf("informer has %d registered handler(s), instead of 2", eventHandlerCount(informer))
   684  	}
   685  
   686  	if err := informer.RemoveEventHandler(reg1); err != nil {
   687  		t.Errorf("removing of duplicate handler registration failed: %s", err)
   688  	}
   689  
   690  	if isRegistered(informer, reg1) {
   691  		t.Errorf("handle1 is still active after successful remove")
   692  		return
   693  	}
   694  	if !isRegistered(informer, reg2) {
   695  		t.Errorf("handle2 is not active after removing handle1")
   696  		return
   697  	}
   698  
   699  	if eventHandlerCount(informer) != 1 {
   700  		if eventHandlerCount(informer) == 0 {
   701  			t.Errorf("informer has no registered handler anymore after removal of duplicate registrations")
   702  		} else {
   703  			t.Errorf("informer has unexpected number (%d) of handlers after removal of duplicate handler registration", eventHandlerCount(informer))
   704  		}
   705  	}
   706  
   707  	if err := informer.RemoveEventHandler(reg2); err != nil {
   708  		t.Errorf("removing of second handler registration failed: %s", err)
   709  	}
   710  
   711  	if isRegistered(informer, reg2) {
   712  		t.Errorf("handle2 is still active after successful remove")
   713  		return
   714  	}
   715  
   716  	if eventHandlerCount(informer) != 0 {
   717  		t.Errorf("informer has unexpected number (%d) of handlers after removal of second handler registrations", eventHandlerCount(informer))
   718  	}
   719  }
   720  
   721  func TestRemovingRemovedSharedInformer(t *testing.T) {
   722  	source := fcache.NewFakeControllerSource()
   723  	source.Add(&v1.Pod{ObjectMeta: metav1.ObjectMeta{Name: "pod1"}})
   724  
   725  	informer := NewSharedInformer(source, &v1.Pod{}, 1*time.Second).(*sharedIndexInformer)
   726  	handler := &ResourceEventHandlerFuncs{}
   727  	reg, err := informer.AddEventHandler(handler)
   728  
   729  	if err != nil {
   730  		t.Errorf("informer did not add handler for the first time: %s", err)
   731  		return
   732  	}
   733  	if err := informer.RemoveEventHandler(reg); err != nil {
   734  		t.Errorf("removing of handler registration failed: %s", err)
   735  		return
   736  	}
   737  	if isRegistered(informer, reg) {
   738  		t.Errorf("handle is still active after successful remove")
   739  		return
   740  	}
   741  	if err := informer.RemoveEventHandler(reg); err != nil {
   742  		t.Errorf("removing of already removed registration yields unexpected error: %s", err)
   743  	}
   744  	if isRegistered(informer, reg) {
   745  		t.Errorf("handle is still active after second remove")
   746  		return
   747  	}
   748  }
   749  
   750  // Shows that many concurrent goroutines can be manipulating shared informer
   751  // listeners without tripping it up. There are not really many assertions in this
   752  // test. Meant to be run with -race to find race conditions
   753  func TestSharedInformerHandlerAbuse(t *testing.T) {
   754  	source := fcache.NewFakeControllerSource()
   755  	informer := NewSharedInformer(source, &v1.Pod{}, 1*time.Second).(*sharedIndexInformer)
   756  
   757  	ctx, cancel := context.WithCancel(context.Background())
   758  	informerCtx, informerCancel := context.WithCancel(context.Background())
   759  	go func() {
   760  		informer.Run(informerCtx.Done())
   761  		cancel()
   762  	}()
   763  
   764  	worker := func() {
   765  		// Keep adding and removing handler
   766  		// Make sure no duplicate events?
   767  		funcs := ResourceEventHandlerDetailedFuncs{
   768  			AddFunc:    func(obj interface{}, isInInitialList bool) {},
   769  			UpdateFunc: func(oldObj, newObj interface{}) {},
   770  			DeleteFunc: func(obj interface{}) {},
   771  		}
   772  		handles := []ResourceEventHandlerRegistration{}
   773  
   774  		for {
   775  			select {
   776  			case <-ctx.Done():
   777  				return
   778  			default:
   779  				switch rand.Intn(2) {
   780  				case 0:
   781  					// Register handler again
   782  					reg, err := informer.AddEventHandlerWithResyncPeriod(funcs, 1*time.Second)
   783  					if err != nil {
   784  						if strings.Contains(err.Error(), "stopped already") {
   785  							// test is over
   786  							return
   787  						}
   788  						t.Errorf("failed to add handler: %v", err)
   789  						return
   790  					}
   791  					handles = append(handles, reg)
   792  				case 1:
   793  					//  Remove a random handler
   794  					if len(handles) == 0 {
   795  						continue
   796  					}
   797  
   798  					idx := rand.Intn(len(handles))
   799  					err := informer.RemoveEventHandler(handles[idx])
   800  					if err != nil {
   801  						if strings.Contains(err.Error(), "stopped already") {
   802  							// test is over
   803  							return
   804  						}
   805  						t.Errorf("failed to remove handler: %v", err)
   806  						return
   807  					}
   808  					handles = append(handles[:idx], handles[idx+1:]...)
   809  				}
   810  			}
   811  		}
   812  	}
   813  
   814  	wg := sync.WaitGroup{}
   815  	for i := 0; i < 100; i++ {
   816  		wg.Add(1)
   817  		go func() {
   818  			worker()
   819  			wg.Done()
   820  		}()
   821  	}
   822  
   823  	objs := []*v1.Pod{}
   824  
   825  	// While workers run, randomly create events for the informer
   826  	for i := 0; i < 10000; i++ {
   827  		if len(objs) == 0 {
   828  			// Make sure there is always an object
   829  			obj := &v1.Pod{ObjectMeta: metav1.ObjectMeta{
   830  				Name: "pod" + strconv.Itoa(i),
   831  			}}
   832  			objs = append(objs, obj)
   833  
   834  			// deep copy before adding since the Modify function mutates the obj
   835  			source.Add(obj.DeepCopy())
   836  		}
   837  
   838  		switch rand.Intn(3) {
   839  		case 0:
   840  			// Add Object
   841  			obj := &v1.Pod{ObjectMeta: metav1.ObjectMeta{
   842  				Name: "pod" + strconv.Itoa(i),
   843  			}}
   844  			objs = append(objs, obj)
   845  			source.Add(obj.DeepCopy())
   846  		case 1:
   847  			// Update Object
   848  			idx := rand.Intn(len(objs))
   849  			source.Modify(objs[idx].DeepCopy())
   850  
   851  		case 2:
   852  			// Remove Object
   853  			idx := rand.Intn(len(objs))
   854  			source.Delete(objs[idx].DeepCopy())
   855  			objs = append(objs[:idx], objs[idx+1:]...)
   856  		}
   857  	}
   858  
   859  	// sotp informer which stops workers. stopping informer first to exercise
   860  	// contention for informer while it is closing
   861  	informerCancel()
   862  
   863  	// wait for workers to finish since they may throw errors
   864  	wg.Wait()
   865  }
   866  
   867  func TestStateSharedInformer(t *testing.T) {
   868  	source := fcache.NewFakeControllerSource()
   869  	source.Add(&v1.Pod{ObjectMeta: metav1.ObjectMeta{Name: "pod1"}})
   870  
   871  	informer := NewSharedInformer(source, &v1.Pod{}, 1*time.Second).(*sharedIndexInformer)
   872  	listener := newTestListener("listener", 0, "pod1")
   873  	informer.AddEventHandlerWithResyncPeriod(listener, listener.resyncPeriod)
   874  
   875  	if isStarted(informer) {
   876  		t.Errorf("informer already started after creation")
   877  		return
   878  	}
   879  	if informer.IsStopped() {
   880  		t.Errorf("informer already stopped after creation")
   881  		return
   882  	}
   883  	stop := make(chan struct{})
   884  	go informer.Run(stop)
   885  	if !listener.ok() {
   886  		t.Errorf("informer did not report initial objects")
   887  		close(stop)
   888  		return
   889  	}
   890  
   891  	if !isStarted(informer) {
   892  		t.Errorf("informer does not report to be started although handling events")
   893  		close(stop)
   894  		return
   895  	}
   896  	if informer.IsStopped() {
   897  		t.Errorf("informer reports to be stopped although stop channel not closed")
   898  		close(stop)
   899  		return
   900  	}
   901  
   902  	close(stop)
   903  	fmt.Println("sleeping")
   904  	time.Sleep(1 * time.Second)
   905  
   906  	if !informer.IsStopped() {
   907  		t.Errorf("informer reports not to be stopped although stop channel closed")
   908  		return
   909  	}
   910  	if !isStarted(informer) {
   911  		t.Errorf("informer reports not to be started after it has been started and stopped")
   912  		return
   913  	}
   914  }
   915  
   916  func TestAddOnStoppedSharedInformer(t *testing.T) {
   917  	source := fcache.NewFakeControllerSource()
   918  	source.Add(&v1.Pod{ObjectMeta: metav1.ObjectMeta{Name: "pod1"}})
   919  
   920  	informer := NewSharedInformer(source, &v1.Pod{}, 1*time.Second).(*sharedIndexInformer)
   921  	listener := newTestListener("listener", 0, "pod1")
   922  	stop := make(chan struct{})
   923  	go informer.Run(stop)
   924  	close(stop)
   925  
   926  	err := wait.PollImmediate(100*time.Millisecond, 2*time.Second, func() (bool, error) {
   927  		if informer.IsStopped() {
   928  			return true, nil
   929  		}
   930  		return false, nil
   931  	})
   932  
   933  	if err != nil {
   934  		t.Errorf("informer reports not to be stopped although stop channel closed")
   935  		return
   936  	}
   937  
   938  	_, err = informer.AddEventHandlerWithResyncPeriod(listener, listener.resyncPeriod)
   939  	if err == nil {
   940  		t.Errorf("stopped informer did not reject add handler")
   941  		return
   942  	}
   943  	if !strings.HasSuffix(err.Error(), "was not added to shared informer because it has stopped already") {
   944  		t.Errorf("adding handler to a stopped informer yields unexpected error: %s", err)
   945  		return
   946  	}
   947  }
   948  
   949  func TestRemoveOnStoppedSharedInformer(t *testing.T) {
   950  	source := fcache.NewFakeControllerSource()
   951  	source.Add(&v1.Pod{ObjectMeta: metav1.ObjectMeta{Name: "pod1"}})
   952  
   953  	informer := NewSharedInformer(source, &v1.Pod{}, 1*time.Second).(*sharedIndexInformer)
   954  	listener := newTestListener("listener", 0, "pod1")
   955  	handle, err := informer.AddEventHandlerWithResyncPeriod(listener, listener.resyncPeriod)
   956  	if err != nil {
   957  		t.Errorf("informer did not add handler: %s", err)
   958  		return
   959  	}
   960  	stop := make(chan struct{})
   961  	go informer.Run(stop)
   962  	close(stop)
   963  	fmt.Println("sleeping")
   964  	time.Sleep(1 * time.Second)
   965  
   966  	if !informer.IsStopped() {
   967  		t.Errorf("informer reports not to be stopped although stop channel closed")
   968  		return
   969  	}
   970  	err = informer.RemoveEventHandler(handle)
   971  	if err != nil {
   972  		t.Errorf("informer does not remove handler on stopped informer")
   973  		return
   974  	}
   975  }
   976  
   977  func TestRemoveWhileActive(t *testing.T) {
   978  	// source simulates an apiserver object endpoint.
   979  	source := fcache.NewFakeControllerSource()
   980  
   981  	// create the shared informer and resync every 12 hours
   982  	informer := NewSharedInformer(source, &v1.Pod{}, 0).(*sharedIndexInformer)
   983  
   984  	listener := newTestListener("listener", 0, "pod1")
   985  	handle, _ := informer.AddEventHandler(listener)
   986  
   987  	stop := make(chan struct{})
   988  	defer close(stop)
   989  
   990  	go informer.Run(stop)
   991  	source.Add(&v1.Pod{ObjectMeta: metav1.ObjectMeta{Name: "pod1"}})
   992  
   993  	if !listener.ok() {
   994  		t.Errorf("event did not occur")
   995  		return
   996  	}
   997  
   998  	informer.RemoveEventHandler(handle)
   999  
  1000  	if isRegistered(informer, handle) {
  1001  		t.Errorf("handle is still active after successful remove")
  1002  		return
  1003  	}
  1004  
  1005  	source.Add(&v1.Pod{ObjectMeta: metav1.ObjectMeta{Name: "pod2"}})
  1006  
  1007  	if !listener.ok() {
  1008  		t.Errorf("unexpected event occurred")
  1009  		return
  1010  	}
  1011  }
  1012  
  1013  func TestAddWhileActive(t *testing.T) {
  1014  	// source simulates an apiserver object endpoint.
  1015  	source := fcache.NewFakeControllerSource()
  1016  
  1017  	// create the shared informer and resync every 12 hours
  1018  	informer := NewSharedInformer(source, &v1.Pod{}, 0).(*sharedIndexInformer)
  1019  	listener1 := newTestListener("originalListener", 0, "pod1")
  1020  	listener2 := newTestListener("listener2", 0, "pod1", "pod2")
  1021  	handle1, _ := informer.AddEventHandler(listener1)
  1022  
  1023  	if handle1.HasSynced() {
  1024  		t.Error("Synced before Run??")
  1025  	}
  1026  
  1027  	stop := make(chan struct{})
  1028  	defer close(stop)
  1029  
  1030  	go informer.Run(stop)
  1031  	source.Add(&v1.Pod{ObjectMeta: metav1.ObjectMeta{Name: "pod1"}})
  1032  
  1033  	if !listener1.ok() {
  1034  		t.Errorf("events on listener1 did not occur")
  1035  		return
  1036  	}
  1037  
  1038  	if !handle1.HasSynced() {
  1039  		t.Error("Not synced after Run??")
  1040  	}
  1041  
  1042  	listener2.lock.Lock() // ensure we observe it before it has synced
  1043  	handle2, _ := informer.AddEventHandler(listener2)
  1044  	if handle2.HasSynced() {
  1045  		t.Error("Synced before processing anything?")
  1046  	}
  1047  	listener2.lock.Unlock() // permit it to proceed and sync
  1048  
  1049  	source.Add(&v1.Pod{ObjectMeta: metav1.ObjectMeta{Name: "pod2"}})
  1050  
  1051  	if !listener2.ok() {
  1052  		t.Errorf("event on listener2 did not occur")
  1053  		return
  1054  	}
  1055  
  1056  	if !handle2.HasSynced() {
  1057  		t.Error("Not synced even after processing?")
  1058  	}
  1059  
  1060  	if !isRegistered(informer, handle1) {
  1061  		t.Errorf("handle1 is not active")
  1062  		return
  1063  	}
  1064  	if !isRegistered(informer, handle2) {
  1065  		t.Errorf("handle2 is not active")
  1066  		return
  1067  	}
  1068  
  1069  	listener1.expectedItemNames = listener2.expectedItemNames
  1070  	if !listener1.ok() {
  1071  		t.Errorf("events on listener1 did not occur")
  1072  		return
  1073  	}
  1074  }
  1075  

View as plain text