...

Source file src/k8s.io/client-go/tools/cache/reflector_test.go

Documentation: k8s.io/client-go/tools/cache

     1  /*
     2  Copyright 2014 The Kubernetes Authors.
     3  
     4  Licensed under the Apache License, Version 2.0 (the "License");
     5  you may not use this file except in compliance with the License.
     6  You may obtain a copy of the License at
     7  
     8      http://www.apache.org/licenses/LICENSE-2.0
     9  
    10  Unless required by applicable law or agreed to in writing, software
    11  distributed under the License is distributed on an "AS IS" BASIS,
    12  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    13  See the License for the specific language governing permissions and
    14  limitations under the License.
    15  */
    16  
    17  package cache
    18  
    19  import (
    20  	"context"
    21  	"errors"
    22  	"fmt"
    23  	"math/rand"
    24  	"reflect"
    25  	goruntime "runtime"
    26  	"strconv"
    27  	"syscall"
    28  	"testing"
    29  	"time"
    30  
    31  	"github.com/stretchr/testify/require"
    32  
    33  	v1 "k8s.io/api/core/v1"
    34  	apierrors "k8s.io/apimachinery/pkg/api/errors"
    35  	"k8s.io/apimachinery/pkg/api/meta"
    36  	"k8s.io/apimachinery/pkg/api/resource"
    37  	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
    38  	"k8s.io/apimachinery/pkg/apis/meta/v1/unstructured"
    39  	"k8s.io/apimachinery/pkg/runtime"
    40  	"k8s.io/apimachinery/pkg/runtime/schema"
    41  	"k8s.io/apimachinery/pkg/util/sets"
    42  	"k8s.io/apimachinery/pkg/util/wait"
    43  	"k8s.io/apimachinery/pkg/watch"
    44  	"k8s.io/utils/clock"
    45  	testingclock "k8s.io/utils/clock/testing"
    46  )
    47  
// nevererrc is declared without being initialized here, so it is a nil
// channel; a receive from a nil channel blocks forever. The tests below pass
// it to watchHandler as the error-channel argument.
// NOTE(review): presumably nothing else in the package assigns it — confirm.
var nevererrc chan error
    49  
    50  type testLW struct {
    51  	ListFunc  func(options metav1.ListOptions) (runtime.Object, error)
    52  	WatchFunc func(options metav1.ListOptions) (watch.Interface, error)
    53  }
    54  
    55  func (t *testLW) List(options metav1.ListOptions) (runtime.Object, error) {
    56  	return t.ListFunc(options)
    57  }
    58  func (t *testLW) Watch(options metav1.ListOptions) (watch.Interface, error) {
    59  	return t.WatchFunc(options)
    60  }
    61  
    62  func TestCloseWatchChannelOnError(t *testing.T) {
    63  	r := NewReflector(&testLW{}, &v1.Pod{}, NewStore(MetaNamespaceKeyFunc), 0)
    64  	pod := &v1.Pod{ObjectMeta: metav1.ObjectMeta{Name: "bar"}}
    65  	fw := watch.NewFake()
    66  	r.listerWatcher = &testLW{
    67  		WatchFunc: func(options metav1.ListOptions) (watch.Interface, error) {
    68  			return fw, nil
    69  		},
    70  		ListFunc: func(options metav1.ListOptions) (runtime.Object, error) {
    71  			return &v1.PodList{ListMeta: metav1.ListMeta{ResourceVersion: "1"}}, nil
    72  		},
    73  	}
    74  	go r.ListAndWatch(wait.NeverStop)
    75  	fw.Error(pod)
    76  	select {
    77  	case _, ok := <-fw.ResultChan():
    78  		if ok {
    79  			t.Errorf("Watch channel left open after cancellation")
    80  		}
    81  	case <-time.After(wait.ForeverTestTimeout):
    82  		t.Errorf("the cancellation is at least %s late", wait.ForeverTestTimeout.String())
    83  		break
    84  	}
    85  }
    86  
    87  func TestRunUntil(t *testing.T) {
    88  	stopCh := make(chan struct{})
    89  	store := NewStore(MetaNamespaceKeyFunc)
    90  	r := NewReflector(&testLW{}, &v1.Pod{}, store, 0)
    91  	fw := watch.NewFake()
    92  	r.listerWatcher = &testLW{
    93  		WatchFunc: func(options metav1.ListOptions) (watch.Interface, error) {
    94  			return fw, nil
    95  		},
    96  		ListFunc: func(options metav1.ListOptions) (runtime.Object, error) {
    97  			return &v1.PodList{ListMeta: metav1.ListMeta{ResourceVersion: "1"}}, nil
    98  		},
    99  	}
   100  	go r.Run(stopCh)
   101  	// Synchronously add a dummy pod into the watch channel so we
   102  	// know the RunUntil go routine is in the watch handler.
   103  	fw.Add(&v1.Pod{ObjectMeta: metav1.ObjectMeta{Name: "bar"}})
   104  	close(stopCh)
   105  	select {
   106  	case _, ok := <-fw.ResultChan():
   107  		if ok {
   108  			t.Errorf("Watch channel left open after stopping the watch")
   109  		}
   110  	case <-time.After(wait.ForeverTestTimeout):
   111  		t.Errorf("the cancellation is at least %s late", wait.ForeverTestTimeout.String())
   112  		break
   113  	}
   114  }
   115  
   116  func TestReflectorResyncChan(t *testing.T) {
   117  	s := NewStore(MetaNamespaceKeyFunc)
   118  	g := NewReflector(&testLW{}, &v1.Pod{}, s, time.Millisecond)
   119  	a, _ := g.resyncChan()
   120  	b := time.After(wait.ForeverTestTimeout)
   121  	select {
   122  	case <-a:
   123  		t.Logf("got timeout as expected")
   124  	case <-b:
   125  		t.Errorf("resyncChan() is at least 99 milliseconds late??")
   126  	}
   127  }
   128  
   129  // TestEstablishedWatchStoppedAfterStopCh ensures that
   130  // an established watch will be closed right after
   131  // the StopCh was also closed.
   132  func TestEstablishedWatchStoppedAfterStopCh(t *testing.T) {
   133  	ctx, ctxCancel := context.WithCancel(context.TODO())
   134  	ctxCancel()
   135  	w := watch.NewFake()
   136  	require.False(t, w.IsStopped())
   137  
   138  	// w is stopped when the stopCh is closed
   139  	target := NewReflector(nil, &v1.Pod{}, nil, 0)
   140  	err := target.watch(w, ctx.Done(), nil)
   141  	require.NoError(t, err)
   142  	require.True(t, w.IsStopped())
   143  
   144  	// noop when the w is nil and the ctx is closed
   145  	err = target.watch(nil, ctx.Done(), nil)
   146  	require.NoError(t, err)
   147  }
   148  
   149  func BenchmarkReflectorResyncChanMany(b *testing.B) {
   150  	s := NewStore(MetaNamespaceKeyFunc)
   151  	g := NewReflector(&testLW{}, &v1.Pod{}, s, 25*time.Millisecond)
   152  	// The improvement to this (calling the timer's Stop() method) makes
   153  	// this benchmark about 40% faster.
   154  	for i := 0; i < b.N; i++ {
   155  		g.resyncPeriod = time.Duration(rand.Float64() * float64(time.Millisecond) * 25)
   156  		_, stop := g.resyncChan()
   157  		stop()
   158  	}
   159  }
   160  
   161  func TestReflectorWatchHandlerError(t *testing.T) {
   162  	s := NewStore(MetaNamespaceKeyFunc)
   163  	g := NewReflector(&testLW{}, &v1.Pod{}, s, 0)
   164  	fw := watch.NewFake()
   165  	go func() {
   166  		fw.Stop()
   167  	}()
   168  	err := watchHandler(time.Now(), fw, s, g.expectedType, g.expectedGVK, g.name, g.typeDescription, g.setLastSyncResourceVersion, nil, g.clock, nevererrc, wait.NeverStop)
   169  	if err == nil {
   170  		t.Errorf("unexpected non-error")
   171  	}
   172  }
   173  
   174  func TestReflectorWatchHandler(t *testing.T) {
   175  	s := NewStore(MetaNamespaceKeyFunc)
   176  	g := NewReflector(&testLW{}, &v1.Pod{}, s, 0)
   177  	fw := watch.NewFake()
   178  	s.Add(&v1.Pod{ObjectMeta: metav1.ObjectMeta{Name: "foo"}})
   179  	s.Add(&v1.Pod{ObjectMeta: metav1.ObjectMeta{Name: "bar"}})
   180  	go func() {
   181  		fw.Add(&v1.Service{ObjectMeta: metav1.ObjectMeta{Name: "rejected"}})
   182  		fw.Delete(&v1.Pod{ObjectMeta: metav1.ObjectMeta{Name: "foo"}})
   183  		fw.Modify(&v1.Pod{ObjectMeta: metav1.ObjectMeta{Name: "bar", ResourceVersion: "55"}})
   184  		fw.Add(&v1.Pod{ObjectMeta: metav1.ObjectMeta{Name: "baz", ResourceVersion: "32"}})
   185  		fw.Stop()
   186  	}()
   187  	err := watchHandler(time.Now(), fw, s, g.expectedType, g.expectedGVK, g.name, g.typeDescription, g.setLastSyncResourceVersion, nil, g.clock, nevererrc, wait.NeverStop)
   188  	if err != nil {
   189  		t.Errorf("unexpected error %v", err)
   190  	}
   191  
   192  	mkPod := func(id string, rv string) *v1.Pod {
   193  		return &v1.Pod{ObjectMeta: metav1.ObjectMeta{Name: id, ResourceVersion: rv}}
   194  	}
   195  
   196  	table := []struct {
   197  		Pod    *v1.Pod
   198  		exists bool
   199  	}{
   200  		{mkPod("foo", ""), false},
   201  		{mkPod("rejected", ""), false},
   202  		{mkPod("bar", "55"), true},
   203  		{mkPod("baz", "32"), true},
   204  	}
   205  	for _, item := range table {
   206  		obj, exists, _ := s.Get(item.Pod)
   207  		if e, a := item.exists, exists; e != a {
   208  			t.Errorf("%v: expected %v, got %v", item.Pod, e, a)
   209  		}
   210  		if !exists {
   211  			continue
   212  		}
   213  		if e, a := item.Pod.ResourceVersion, obj.(*v1.Pod).ResourceVersion; e != a {
   214  			t.Errorf("%v: expected %v, got %v", item.Pod, e, a)
   215  		}
   216  	}
   217  
   218  	// RV should send the last version we see.
   219  	if e, a := "32", g.LastSyncResourceVersion(); e != a {
   220  		t.Errorf("expected %v, got %v", e, a)
   221  	}
   222  
   223  	// last sync resource version should be the last version synced with store
   224  	if e, a := "32", g.LastSyncResourceVersion(); e != a {
   225  		t.Errorf("expected %v, got %v", e, a)
   226  	}
   227  }
   228  
   229  func TestReflectorStopWatch(t *testing.T) {
   230  	s := NewStore(MetaNamespaceKeyFunc)
   231  	g := NewReflector(&testLW{}, &v1.Pod{}, s, 0)
   232  	fw := watch.NewFake()
   233  	stopWatch := make(chan struct{}, 1)
   234  	stopWatch <- struct{}{}
   235  	err := watchHandler(time.Now(), fw, s, g.expectedType, g.expectedGVK, g.name, g.typeDescription, g.setLastSyncResourceVersion, nil, g.clock, nevererrc, stopWatch)
   236  	if err != errorStopRequested {
   237  		t.Errorf("expected stop error, got %q", err)
   238  	}
   239  }
   240  
   241  func TestReflectorListAndWatch(t *testing.T) {
   242  	createdFakes := make(chan *watch.FakeWatcher)
   243  
   244  	// The ListFunc says that it's at revision 1. Therefore, we expect our WatchFunc
   245  	// to get called at the beginning of the watch with 1, and again with 3 when we
   246  	// inject an error.
   247  	expectedRVs := []string{"1", "3"}
   248  	lw := &testLW{
   249  		WatchFunc: func(options metav1.ListOptions) (watch.Interface, error) {
   250  			rv := options.ResourceVersion
   251  			fw := watch.NewFake()
   252  			if e, a := expectedRVs[0], rv; e != a {
   253  				t.Errorf("Expected rv %v, but got %v", e, a)
   254  			}
   255  			expectedRVs = expectedRVs[1:]
   256  			// channel is not buffered because the for loop below needs to block. But
   257  			// we don't want to block here, so report the new fake via a go routine.
   258  			go func() { createdFakes <- fw }()
   259  			return fw, nil
   260  		},
   261  		ListFunc: func(options metav1.ListOptions) (runtime.Object, error) {
   262  			return &v1.PodList{ListMeta: metav1.ListMeta{ResourceVersion: "1"}}, nil
   263  		},
   264  	}
   265  	s := NewFIFO(MetaNamespaceKeyFunc)
   266  	r := NewReflector(lw, &v1.Pod{}, s, 0)
   267  	go r.ListAndWatch(wait.NeverStop)
   268  
   269  	ids := []string{"foo", "bar", "baz", "qux", "zoo"}
   270  	var fw *watch.FakeWatcher
   271  	for i, id := range ids {
   272  		if fw == nil {
   273  			fw = <-createdFakes
   274  		}
   275  		sendingRV := strconv.FormatUint(uint64(i+2), 10)
   276  		fw.Add(&v1.Pod{ObjectMeta: metav1.ObjectMeta{Name: id, ResourceVersion: sendingRV}})
   277  		if sendingRV == "3" {
   278  			// Inject a failure.
   279  			fw.Stop()
   280  			fw = nil
   281  		}
   282  	}
   283  
   284  	// Verify we received the right ids with the right resource versions.
   285  	for i, id := range ids {
   286  		pod := Pop(s).(*v1.Pod)
   287  		if e, a := id, pod.Name; e != a {
   288  			t.Errorf("%v: Expected %v, got %v", i, e, a)
   289  		}
   290  		if e, a := strconv.FormatUint(uint64(i+2), 10), pod.ResourceVersion; e != a {
   291  			t.Errorf("%v: Expected %v, got %v", i, e, a)
   292  		}
   293  	}
   294  
   295  	if len(expectedRVs) != 0 {
   296  		t.Error("called watchStarter an unexpected number of times")
   297  	}
   298  }
   299  
   300  func TestReflectorListAndWatchWithErrors(t *testing.T) {
   301  	mkPod := func(id string, rv string) *v1.Pod {
   302  		return &v1.Pod{ObjectMeta: metav1.ObjectMeta{Name: id, ResourceVersion: rv}}
   303  	}
   304  	mkList := func(rv string, pods ...*v1.Pod) *v1.PodList {
   305  		list := &v1.PodList{ListMeta: metav1.ListMeta{ResourceVersion: rv}}
   306  		for _, pod := range pods {
   307  			list.Items = append(list.Items, *pod)
   308  		}
   309  		return list
   310  	}
   311  	table := []struct {
   312  		list     *v1.PodList
   313  		listErr  error
   314  		events   []watch.Event
   315  		watchErr error
   316  	}{
   317  		{
   318  			list: mkList("1"),
   319  			events: []watch.Event{
   320  				{Type: watch.Added, Object: mkPod("foo", "2")},
   321  				{Type: watch.Added, Object: mkPod("bar", "3")},
   322  			},
   323  		}, {
   324  			list: mkList("3", mkPod("foo", "2"), mkPod("bar", "3")),
   325  			events: []watch.Event{
   326  				{Type: watch.Deleted, Object: mkPod("foo", "4")},
   327  				{Type: watch.Added, Object: mkPod("qux", "5")},
   328  			},
   329  		}, {
   330  			listErr: fmt.Errorf("a list error"),
   331  		}, {
   332  			list:     mkList("5", mkPod("bar", "3"), mkPod("qux", "5")),
   333  			watchErr: fmt.Errorf("a watch error"),
   334  		}, {
   335  			list: mkList("5", mkPod("bar", "3"), mkPod("qux", "5")),
   336  			events: []watch.Event{
   337  				{Type: watch.Added, Object: mkPod("baz", "6")},
   338  			},
   339  		}, {
   340  			list: mkList("6", mkPod("bar", "3"), mkPod("qux", "5"), mkPod("baz", "6")),
   341  		},
   342  	}
   343  
   344  	s := NewFIFO(MetaNamespaceKeyFunc)
   345  	for line, item := range table {
   346  		if item.list != nil {
   347  			// Test that the list is what currently exists in the store.
   348  			current := s.List()
   349  			checkMap := map[string]string{}
   350  			for _, item := range current {
   351  				pod := item.(*v1.Pod)
   352  				checkMap[pod.Name] = pod.ResourceVersion
   353  			}
   354  			for _, pod := range item.list.Items {
   355  				if e, a := pod.ResourceVersion, checkMap[pod.Name]; e != a {
   356  					t.Errorf("%v: expected %v, got %v for pod %v", line, e, a, pod.Name)
   357  				}
   358  			}
   359  			if e, a := len(item.list.Items), len(checkMap); e != a {
   360  				t.Errorf("%v: expected %v, got %v", line, e, a)
   361  			}
   362  		}
   363  		watchRet, watchErr := item.events, item.watchErr
   364  		lw := &testLW{
   365  			WatchFunc: func(options metav1.ListOptions) (watch.Interface, error) {
   366  				if watchErr != nil {
   367  					return nil, watchErr
   368  				}
   369  				watchErr = fmt.Errorf("second watch")
   370  				fw := watch.NewFake()
   371  				go func() {
   372  					for _, e := range watchRet {
   373  						fw.Action(e.Type, e.Object)
   374  					}
   375  					fw.Stop()
   376  				}()
   377  				return fw, nil
   378  			},
   379  			ListFunc: func(options metav1.ListOptions) (runtime.Object, error) {
   380  				return item.list, item.listErr
   381  			},
   382  		}
   383  		r := NewReflector(lw, &v1.Pod{}, s, 0)
   384  		r.ListAndWatch(wait.NeverStop)
   385  	}
   386  }
   387  
   388  func TestReflectorListAndWatchInitConnBackoff(t *testing.T) {
   389  	maxBackoff := 50 * time.Millisecond
   390  	table := []struct {
   391  		numConnFails  int
   392  		expLowerBound time.Duration
   393  		expUpperBound time.Duration
   394  	}{
   395  		{5, 32 * time.Millisecond, 64 * time.Millisecond}, // case where maxBackoff is not hit, time should grow exponentially
   396  		{40, 35 * 2 * maxBackoff, 40 * 2 * maxBackoff},    // case where maxBoff is hit, backoff time should flatten
   397  
   398  	}
   399  	for _, test := range table {
   400  		t.Run(fmt.Sprintf("%d connection failures takes at least %d ms", test.numConnFails, 1<<test.numConnFails),
   401  			func(t *testing.T) {
   402  				stopCh := make(chan struct{})
   403  				connFails := test.numConnFails
   404  				fakeClock := testingclock.NewFakeClock(time.Unix(0, 0))
   405  				bm := wait.NewExponentialBackoffManager(time.Millisecond, maxBackoff, 100*time.Millisecond, 2.0, 1.0, fakeClock)
   406  				done := make(chan struct{})
   407  				defer close(done)
   408  				go func() {
   409  					i := 0
   410  					for {
   411  						select {
   412  						case <-done:
   413  							return
   414  						default:
   415  						}
   416  						if fakeClock.HasWaiters() {
   417  							step := (1 << (i + 1)) * time.Millisecond
   418  							if step > maxBackoff*2 {
   419  								step = maxBackoff * 2
   420  							}
   421  							fakeClock.Step(step)
   422  							i++
   423  						}
   424  						time.Sleep(100 * time.Microsecond)
   425  					}
   426  				}()
   427  				lw := &testLW{
   428  					WatchFunc: func(options metav1.ListOptions) (watch.Interface, error) {
   429  						if connFails > 0 {
   430  							connFails--
   431  							return nil, syscall.ECONNREFUSED
   432  						}
   433  						close(stopCh)
   434  						return watch.NewFake(), nil
   435  					},
   436  					ListFunc: func(options metav1.ListOptions) (runtime.Object, error) {
   437  						return &v1.PodList{ListMeta: metav1.ListMeta{ResourceVersion: "1"}}, nil
   438  					},
   439  				}
   440  				r := &Reflector{
   441  					name:              "test-reflector",
   442  					listerWatcher:     lw,
   443  					store:             NewFIFO(MetaNamespaceKeyFunc),
   444  					backoffManager:    bm,
   445  					clock:             fakeClock,
   446  					watchErrorHandler: WatchErrorHandler(DefaultWatchErrorHandler),
   447  				}
   448  				start := fakeClock.Now()
   449  				err := r.ListAndWatch(stopCh)
   450  				elapsed := fakeClock.Since(start)
   451  				if err != nil {
   452  					t.Errorf("unexpected error %v", err)
   453  				}
   454  				if elapsed < (test.expLowerBound) {
   455  					t.Errorf("expected lower bound of ListAndWatch: %v, got %v", test.expLowerBound, elapsed)
   456  				}
   457  				if elapsed > (test.expUpperBound) {
   458  					t.Errorf("expected upper bound of ListAndWatch: %v, got %v", test.expUpperBound, elapsed)
   459  				}
   460  			})
   461  	}
   462  }
   463  
   464  type fakeBackoff struct {
   465  	clock clock.Clock
   466  	calls int
   467  }
   468  
   469  func (f *fakeBackoff) Backoff() clock.Timer {
   470  	f.calls++
   471  	return f.clock.NewTimer(time.Duration(0))
   472  }
   473  
   474  func TestBackoffOnTooManyRequests(t *testing.T) {
   475  	err := apierrors.NewTooManyRequests("too many requests", 1)
   476  	clock := &clock.RealClock{}
   477  	bm := &fakeBackoff{clock: clock}
   478  
   479  	lw := &testLW{
   480  		ListFunc: func(options metav1.ListOptions) (runtime.Object, error) {
   481  			return &v1.PodList{ListMeta: metav1.ListMeta{ResourceVersion: "1"}}, nil
   482  		},
   483  		WatchFunc: func(options metav1.ListOptions) (watch.Interface, error) {
   484  			switch bm.calls {
   485  			case 0:
   486  				return nil, err
   487  			case 1:
   488  				w := watch.NewFakeWithChanSize(1, false)
   489  				status := err.Status()
   490  				w.Error(&status)
   491  				return w, nil
   492  			default:
   493  				w := watch.NewFake()
   494  				w.Stop()
   495  				return w, nil
   496  			}
   497  		},
   498  	}
   499  
   500  	r := &Reflector{
   501  		name:              "test-reflector",
   502  		listerWatcher:     lw,
   503  		store:             NewFIFO(MetaNamespaceKeyFunc),
   504  		backoffManager:    bm,
   505  		clock:             clock,
   506  		watchErrorHandler: WatchErrorHandler(DefaultWatchErrorHandler),
   507  	}
   508  
   509  	stopCh := make(chan struct{})
   510  	r.ListAndWatch(stopCh)
   511  	close(stopCh)
   512  	if bm.calls != 2 {
   513  		t.Errorf("unexpected watch backoff calls: %d", bm.calls)
   514  	}
   515  }
   516  
   517  func TestRetryInternalError(t *testing.T) {
   518  	testCases := []struct {
   519  		name                string
   520  		maxInternalDuration time.Duration
   521  		rewindTime          int
   522  		wantRetries         int
   523  	}{
   524  		{
   525  			name:                "retries off",
   526  			maxInternalDuration: time.Duration(0),
   527  			wantRetries:         0,
   528  		},
   529  		{
   530  			name:                "retries on, all calls fail",
   531  			maxInternalDuration: time.Second * 30,
   532  			wantRetries:         31,
   533  		},
   534  		{
   535  			name:                "retries on, one call successful",
   536  			maxInternalDuration: time.Second * 30,
   537  			rewindTime:          10,
   538  			wantRetries:         40,
   539  		},
   540  	}
   541  
   542  	for _, tc := range testCases {
   543  		err := apierrors.NewInternalError(fmt.Errorf("etcdserver: no leader"))
   544  		fakeClock := testingclock.NewFakeClock(time.Now())
   545  		bm := &fakeBackoff{clock: fakeClock}
   546  
   547  		counter := 0
   548  
   549  		lw := &testLW{
   550  			ListFunc: func(options metav1.ListOptions) (runtime.Object, error) {
   551  				return &v1.PodList{ListMeta: metav1.ListMeta{ResourceVersion: "1"}}, nil
   552  			},
   553  			WatchFunc: func(options metav1.ListOptions) (watch.Interface, error) {
   554  				counter = counter + 1
   555  				t.Logf("Counter: %v", counter)
   556  				if counter == tc.rewindTime {
   557  					t.Logf("Rewinding")
   558  					fakeClock.Step(time.Minute)
   559  				}
   560  
   561  				fakeClock.Step(time.Second)
   562  				w := watch.NewFakeWithChanSize(1, false)
   563  				status := err.Status()
   564  				w.Error(&status)
   565  				return w, nil
   566  			},
   567  		}
   568  
   569  		r := &Reflector{
   570  			name:              "test-reflector",
   571  			listerWatcher:     lw,
   572  			store:             NewFIFO(MetaNamespaceKeyFunc),
   573  			backoffManager:    bm,
   574  			clock:             fakeClock,
   575  			watchErrorHandler: WatchErrorHandler(DefaultWatchErrorHandler),
   576  		}
   577  
   578  		r.MaxInternalErrorRetryDuration = tc.maxInternalDuration
   579  
   580  		stopCh := make(chan struct{})
   581  		r.ListAndWatch(stopCh)
   582  		close(stopCh)
   583  
   584  		if counter-1 != tc.wantRetries {
   585  			t.Errorf("%v unexpected number of retries: %d", tc, counter-1)
   586  		}
   587  	}
   588  }
   589  
   590  func TestReflectorResync(t *testing.T) {
   591  	iteration := 0
   592  	stopCh := make(chan struct{})
   593  	rerr := errors.New("expected resync reached")
   594  	s := &FakeCustomStore{
   595  		ResyncFunc: func() error {
   596  			iteration++
   597  			if iteration == 2 {
   598  				return rerr
   599  			}
   600  			return nil
   601  		},
   602  	}
   603  
   604  	lw := &testLW{
   605  		WatchFunc: func(options metav1.ListOptions) (watch.Interface, error) {
   606  			fw := watch.NewFake()
   607  			return fw, nil
   608  		},
   609  		ListFunc: func(options metav1.ListOptions) (runtime.Object, error) {
   610  			return &v1.PodList{ListMeta: metav1.ListMeta{ResourceVersion: "0"}}, nil
   611  		},
   612  	}
   613  	resyncPeriod := 1 * time.Millisecond
   614  	r := NewReflector(lw, &v1.Pod{}, s, resyncPeriod)
   615  	if err := r.ListAndWatch(stopCh); err != nil {
   616  		// error from Resync is not propaged up to here.
   617  		t.Errorf("expected error %v", err)
   618  	}
   619  	if iteration != 2 {
   620  		t.Errorf("exactly 2 iterations were expected, got: %v", iteration)
   621  	}
   622  }
   623  
   624  func TestReflectorWatchListPageSize(t *testing.T) {
   625  	stopCh := make(chan struct{})
   626  	s := NewStore(MetaNamespaceKeyFunc)
   627  
   628  	lw := &testLW{
   629  		WatchFunc: func(options metav1.ListOptions) (watch.Interface, error) {
   630  			// Stop once the reflector begins watching since we're only interested in the list.
   631  			close(stopCh)
   632  			fw := watch.NewFake()
   633  			return fw, nil
   634  		},
   635  		ListFunc: func(options metav1.ListOptions) (runtime.Object, error) {
   636  			if options.Limit != 4 {
   637  				t.Fatalf("Expected list Limit of 4 but got %d", options.Limit)
   638  			}
   639  			pods := make([]v1.Pod, 10)
   640  			for i := 0; i < 10; i++ {
   641  				pods[i] = v1.Pod{ObjectMeta: metav1.ObjectMeta{Name: fmt.Sprintf("pod-%d", i), ResourceVersion: fmt.Sprintf("%d", i)}}
   642  			}
   643  			switch options.Continue {
   644  			case "":
   645  				return &v1.PodList{ListMeta: metav1.ListMeta{ResourceVersion: "10", Continue: "C1"}, Items: pods[0:4]}, nil
   646  			case "C1":
   647  				return &v1.PodList{ListMeta: metav1.ListMeta{ResourceVersion: "10", Continue: "C2"}, Items: pods[4:8]}, nil
   648  			case "C2":
   649  				return &v1.PodList{ListMeta: metav1.ListMeta{ResourceVersion: "10"}, Items: pods[8:10]}, nil
   650  			default:
   651  				t.Fatalf("Unrecognized continue: %s", options.Continue)
   652  			}
   653  			return nil, nil
   654  		},
   655  	}
   656  	r := NewReflector(lw, &v1.Pod{}, s, 0)
   657  	// Set resource version to test pagination also for not consistent reads.
   658  	r.setLastSyncResourceVersion("10")
   659  	// Set the reflector to paginate the list request in 4 item chunks.
   660  	r.WatchListPageSize = 4
   661  	r.ListAndWatch(stopCh)
   662  
   663  	results := s.List()
   664  	if len(results) != 10 {
   665  		t.Errorf("Expected 10 results, got %d", len(results))
   666  	}
   667  }
   668  
   669  func TestReflectorNotPaginatingNotConsistentReads(t *testing.T) {
   670  	stopCh := make(chan struct{})
   671  	s := NewStore(MetaNamespaceKeyFunc)
   672  
   673  	lw := &testLW{
   674  		WatchFunc: func(options metav1.ListOptions) (watch.Interface, error) {
   675  			// Stop once the reflector begins watching since we're only interested in the list.
   676  			close(stopCh)
   677  			fw := watch.NewFake()
   678  			return fw, nil
   679  		},
   680  		ListFunc: func(options metav1.ListOptions) (runtime.Object, error) {
   681  			if options.ResourceVersion != "10" {
   682  				t.Fatalf("Expected ResourceVersion: \"10\", got: %s", options.ResourceVersion)
   683  			}
   684  			if options.Limit != 0 {
   685  				t.Fatalf("Expected list Limit of 0 but got %d", options.Limit)
   686  			}
   687  			pods := make([]v1.Pod, 10)
   688  			for i := 0; i < 10; i++ {
   689  				pods[i] = v1.Pod{ObjectMeta: metav1.ObjectMeta{Name: fmt.Sprintf("pod-%d", i), ResourceVersion: fmt.Sprintf("%d", i)}}
   690  			}
   691  			return &v1.PodList{ListMeta: metav1.ListMeta{ResourceVersion: "10"}, Items: pods}, nil
   692  		},
   693  	}
   694  	r := NewReflector(lw, &v1.Pod{}, s, 0)
   695  	r.setLastSyncResourceVersion("10")
   696  	r.ListAndWatch(stopCh)
   697  
   698  	results := s.List()
   699  	if len(results) != 10 {
   700  		t.Errorf("Expected 10 results, got %d", len(results))
   701  	}
   702  }
   703  
   704  func TestReflectorPaginatingNonConsistentReadsIfWatchCacheDisabled(t *testing.T) {
   705  	var stopCh chan struct{}
   706  	s := NewStore(MetaNamespaceKeyFunc)
   707  
   708  	lw := &testLW{
   709  		WatchFunc: func(options metav1.ListOptions) (watch.Interface, error) {
   710  			// Stop once the reflector begins watching since we're only interested in the list.
   711  			close(stopCh)
   712  			fw := watch.NewFake()
   713  			return fw, nil
   714  		},
   715  		ListFunc: func(options metav1.ListOptions) (runtime.Object, error) {
   716  			// Check that default pager limit is set.
   717  			if options.Limit != 500 {
   718  				t.Fatalf("Expected list Limit of 500 but got %d", options.Limit)
   719  			}
   720  			pods := make([]v1.Pod, 10)
   721  			for i := 0; i < 10; i++ {
   722  				pods[i] = v1.Pod{ObjectMeta: metav1.ObjectMeta{Name: fmt.Sprintf("pod-%d", i), ResourceVersion: fmt.Sprintf("%d", i)}}
   723  			}
   724  			switch options.Continue {
   725  			case "":
   726  				return &v1.PodList{ListMeta: metav1.ListMeta{ResourceVersion: "10", Continue: "C1"}, Items: pods[0:4]}, nil
   727  			case "C1":
   728  				return &v1.PodList{ListMeta: metav1.ListMeta{ResourceVersion: "10", Continue: "C2"}, Items: pods[4:8]}, nil
   729  			case "C2":
   730  				return &v1.PodList{ListMeta: metav1.ListMeta{ResourceVersion: "10"}, Items: pods[8:10]}, nil
   731  			default:
   732  				t.Fatalf("Unrecognized continue: %s", options.Continue)
   733  			}
   734  			return nil, nil
   735  		},
   736  	}
   737  	r := NewReflector(lw, &v1.Pod{}, s, 0)
   738  
   739  	// Initial list should initialize paginatedResult in the reflector.
   740  	stopCh = make(chan struct{})
   741  	r.ListAndWatch(stopCh)
   742  	if results := s.List(); len(results) != 10 {
   743  		t.Errorf("Expected 10 results, got %d", len(results))
   744  	}
   745  
   746  	// Since initial list for ResourceVersion="0" was paginated, the subsequent
   747  	// ones should also be paginated.
   748  	stopCh = make(chan struct{})
   749  	r.ListAndWatch(stopCh)
   750  	if results := s.List(); len(results) != 10 {
   751  		t.Errorf("Expected 10 results, got %d", len(results))
   752  	}
   753  }
   754  
   755  // TestReflectorResyncWithResourceVersion ensures that a reflector keeps track of the ResourceVersion and sends
   756  // it in relist requests to prevent the reflector from traveling back in time if the relist is to a api-server or
   757  // etcd that is partitioned and serving older data than the reflector has already processed.
   758  func TestReflectorResyncWithResourceVersion(t *testing.T) {
   759  	stopCh := make(chan struct{})
   760  	s := NewStore(MetaNamespaceKeyFunc)
   761  	listCallRVs := []string{}
   762  
   763  	lw := &testLW{
   764  		WatchFunc: func(options metav1.ListOptions) (watch.Interface, error) {
   765  			// Stop once the reflector begins watching since we're only interested in the list.
   766  			close(stopCh)
   767  			fw := watch.NewFake()
   768  			return fw, nil
   769  		},
   770  		ListFunc: func(options metav1.ListOptions) (runtime.Object, error) {
   771  			listCallRVs = append(listCallRVs, options.ResourceVersion)
   772  			pods := make([]v1.Pod, 8)
   773  			for i := 0; i < 8; i++ {
   774  				pods[i] = v1.Pod{ObjectMeta: metav1.ObjectMeta{Name: fmt.Sprintf("pod-%d", i), ResourceVersion: fmt.Sprintf("%d", i)}}
   775  			}
   776  			switch options.ResourceVersion {
   777  			case "0":
   778  				return &v1.PodList{ListMeta: metav1.ListMeta{ResourceVersion: "10"}, Items: pods[0:4]}, nil
   779  			case "10":
   780  				return &v1.PodList{ListMeta: metav1.ListMeta{ResourceVersion: "11"}, Items: pods[0:8]}, nil
   781  			default:
   782  				t.Fatalf("Unrecognized ResourceVersion: %s", options.ResourceVersion)
   783  			}
   784  			return nil, nil
   785  		},
   786  	}
   787  	r := NewReflector(lw, &v1.Pod{}, s, 0)
   788  
   789  	// Initial list should use RV=0
   790  	r.ListAndWatch(stopCh)
   791  
   792  	results := s.List()
   793  	if len(results) != 4 {
   794  		t.Errorf("Expected 4 results, got %d", len(results))
   795  	}
   796  
   797  	// relist should use lastSyncResourceVersions (RV=10)
   798  	stopCh = make(chan struct{})
   799  	r.ListAndWatch(stopCh)
   800  
   801  	results = s.List()
   802  	if len(results) != 8 {
   803  		t.Errorf("Expected 8 results, got %d", len(results))
   804  	}
   805  
   806  	expectedRVs := []string{"0", "10"}
   807  	if !reflect.DeepEqual(listCallRVs, expectedRVs) {
   808  		t.Errorf("Expected series of list calls with resource versiosn of %v but got: %v", expectedRVs, listCallRVs)
   809  	}
   810  }
   811  
   812  // TestReflectorExpiredExactResourceVersion tests that a reflector handles the behavior of kubernetes 1.16 and earlier
   813  // where if the exact ResourceVersion requested is not available for a List request for a non-zero ResourceVersion,
   814  // an "Expired" error is returned if the ResourceVersion has expired (etcd has compacted it).
   815  // (In kubernetes 1.17, or when the watch cache is enabled, the List will instead return the list that is no older than
   816  // the requested ResourceVersion).
   817  func TestReflectorExpiredExactResourceVersion(t *testing.T) {
   818  	stopCh := make(chan struct{})
   819  	s := NewStore(MetaNamespaceKeyFunc)
   820  	listCallRVs := []string{}
   821  
   822  	lw := &testLW{
   823  		WatchFunc: func(options metav1.ListOptions) (watch.Interface, error) {
   824  			// Stop once the reflector begins watching since we're only interested in the list.
   825  			close(stopCh)
   826  			fw := watch.NewFake()
   827  			return fw, nil
   828  		},
   829  		ListFunc: func(options metav1.ListOptions) (runtime.Object, error) {
   830  			listCallRVs = append(listCallRVs, options.ResourceVersion)
   831  			pods := make([]v1.Pod, 8)
   832  			for i := 0; i < 8; i++ {
   833  				pods[i] = v1.Pod{ObjectMeta: metav1.ObjectMeta{Name: fmt.Sprintf("pod-%d", i), ResourceVersion: fmt.Sprintf("%d", i)}}
   834  			}
   835  			switch options.ResourceVersion {
   836  			case "0":
   837  				return &v1.PodList{ListMeta: metav1.ListMeta{ResourceVersion: "10"}, Items: pods[0:4]}, nil
   838  			case "10":
   839  				// When watch cache is disabled, if the exact ResourceVersion requested is not available, a "Expired" error is returned.
   840  				return nil, apierrors.NewResourceExpired("The resourceVersion for the provided watch is too old.")
   841  			case "":
   842  				return &v1.PodList{ListMeta: metav1.ListMeta{ResourceVersion: "11"}, Items: pods[0:8]}, nil
   843  			default:
   844  				t.Fatalf("Unrecognized ResourceVersion: %s", options.ResourceVersion)
   845  			}
   846  			return nil, nil
   847  		},
   848  	}
   849  	r := NewReflector(lw, &v1.Pod{}, s, 0)
   850  
   851  	// Initial list should use RV=0
   852  	r.ListAndWatch(stopCh)
   853  
   854  	results := s.List()
   855  	if len(results) != 4 {
   856  		t.Errorf("Expected 4 results, got %d", len(results))
   857  	}
   858  
   859  	// relist should use lastSyncResourceVersions (RV=10) and since RV=10 is expired, it should retry with RV="".
   860  	stopCh = make(chan struct{})
   861  	r.ListAndWatch(stopCh)
   862  
   863  	results = s.List()
   864  	if len(results) != 8 {
   865  		t.Errorf("Expected 8 results, got %d", len(results))
   866  	}
   867  
   868  	expectedRVs := []string{"0", "10", ""}
   869  	if !reflect.DeepEqual(listCallRVs, expectedRVs) {
   870  		t.Errorf("Expected series of list calls with resource versiosn of %v but got: %v", expectedRVs, listCallRVs)
   871  	}
   872  }
   873  
   874  func TestReflectorFullListIfExpired(t *testing.T) {
   875  	stopCh := make(chan struct{})
   876  	s := NewStore(MetaNamespaceKeyFunc)
   877  	listCallRVs := []string{}
   878  
   879  	lw := &testLW{
   880  		WatchFunc: func(options metav1.ListOptions) (watch.Interface, error) {
   881  			// Stop once the reflector begins watching since we're only interested in the list.
   882  			close(stopCh)
   883  			fw := watch.NewFake()
   884  			return fw, nil
   885  		},
   886  		ListFunc: func(options metav1.ListOptions) (runtime.Object, error) {
   887  			listCallRVs = append(listCallRVs, options.ResourceVersion)
   888  			pods := make([]v1.Pod, 8)
   889  			for i := 0; i < 8; i++ {
   890  				pods[i] = v1.Pod{ObjectMeta: metav1.ObjectMeta{Name: fmt.Sprintf("pod-%d", i), ResourceVersion: fmt.Sprintf("%d", i)}}
   891  			}
   892  			rvContinueLimit := func(rv, c string, l int64) metav1.ListOptions {
   893  				return metav1.ListOptions{ResourceVersion: rv, Continue: c, Limit: l}
   894  			}
   895  			switch rvContinueLimit(options.ResourceVersion, options.Continue, options.Limit) {
   896  			// initial limited list
   897  			case rvContinueLimit("0", "", 4):
   898  				return &v1.PodList{ListMeta: metav1.ListMeta{ResourceVersion: "10"}, Items: pods[0:4]}, nil
   899  			// first page of the rv=10 list
   900  			case rvContinueLimit("10", "", 4):
   901  				return &v1.PodList{ListMeta: metav1.ListMeta{Continue: "C1", ResourceVersion: "11"}, Items: pods[0:4]}, nil
   902  			// second page of the above list
   903  			case rvContinueLimit("", "C1", 4):
   904  				return nil, apierrors.NewResourceExpired("The resourceVersion for the provided watch is too old.")
   905  			// rv=10 unlimited list
   906  			case rvContinueLimit("10", "", 0):
   907  				return &v1.PodList{ListMeta: metav1.ListMeta{ResourceVersion: "11"}, Items: pods[0:8]}, nil
   908  			default:
   909  				err := fmt.Errorf("unexpected list options: %#v", options)
   910  				t.Error(err)
   911  				return nil, err
   912  			}
   913  		},
   914  	}
   915  	r := NewReflector(lw, &v1.Pod{}, s, 0)
   916  	r.WatchListPageSize = 4
   917  
   918  	// Initial list should use RV=0
   919  	if err := r.ListAndWatch(stopCh); err != nil {
   920  		t.Fatal(err)
   921  	}
   922  
   923  	results := s.List()
   924  	if len(results) != 4 {
   925  		t.Errorf("Expected 4 results, got %d", len(results))
   926  	}
   927  
   928  	// relist should use lastSyncResourceVersions (RV=10) and since second page of that expired, it should full list with RV=10
   929  	stopCh = make(chan struct{})
   930  	if err := r.ListAndWatch(stopCh); err != nil {
   931  		t.Fatal(err)
   932  	}
   933  
   934  	results = s.List()
   935  	if len(results) != 8 {
   936  		t.Errorf("Expected 8 results, got %d", len(results))
   937  	}
   938  
   939  	expectedRVs := []string{"0", "10", "", "10"}
   940  	if !reflect.DeepEqual(listCallRVs, expectedRVs) {
   941  		t.Errorf("Expected series of list calls with resource versiosn of %#v but got: %#v", expectedRVs, listCallRVs)
   942  	}
   943  }
   944  
   945  func TestReflectorFullListIfTooLarge(t *testing.T) {
   946  	stopCh := make(chan struct{})
   947  	s := NewStore(MetaNamespaceKeyFunc)
   948  	listCallRVs := []string{}
   949  	version := 30
   950  
   951  	lw := &testLW{
   952  		WatchFunc: func(options metav1.ListOptions) (watch.Interface, error) {
   953  			// Stop once the reflector begins watching since we're only interested in the list.
   954  			close(stopCh)
   955  			fw := watch.NewFake()
   956  			return fw, nil
   957  		},
   958  		ListFunc: func(options metav1.ListOptions) (runtime.Object, error) {
   959  			listCallRVs = append(listCallRVs, options.ResourceVersion)
   960  			resourceVersion := strconv.Itoa(version)
   961  
   962  			switch options.ResourceVersion {
   963  			// initial list
   964  			case "0":
   965  				return &v1.PodList{ListMeta: metav1.ListMeta{ResourceVersion: "20"}}, nil
   966  			// relist after the initial list
   967  			case "20":
   968  				err := apierrors.NewTimeoutError("too large resource version", 1)
   969  				err.ErrStatus.Details.Causes = []metav1.StatusCause{{Type: metav1.CauseTypeResourceVersionTooLarge}}
   970  				return nil, err
   971  			// relist after the initial list (covers the error format used in api server 1.17.0-1.18.5)
   972  			case "30":
   973  				err := apierrors.NewTimeoutError("too large resource version", 1)
   974  				err.ErrStatus.Details.Causes = []metav1.StatusCause{{Message: "Too large resource version"}}
   975  				return nil, err
   976  			// relist after the initial list (covers the error format used in api server before 1.17.0)
   977  			case "40":
   978  				err := apierrors.NewTimeoutError("Too large resource version", 1)
   979  				return nil, err
   980  			// relist from etcd after "too large" error
   981  			case "":
   982  				version += 10
   983  				return &v1.PodList{ListMeta: metav1.ListMeta{ResourceVersion: resourceVersion}}, nil
   984  			default:
   985  				return nil, fmt.Errorf("unexpected List call: %s", options.ResourceVersion)
   986  			}
   987  		},
   988  	}
   989  	r := NewReflector(lw, &v1.Pod{}, s, 0)
   990  
   991  	// Initial list should use RV=0
   992  	if err := r.ListAndWatch(stopCh); err != nil {
   993  		t.Fatal(err)
   994  	}
   995  
   996  	// Relist from the future version.
   997  	// This may happen, as watchcache is initialized from "current global etcd resource version"
   998  	// when kube-apiserver is starting and if no objects are changing after that each kube-apiserver
   999  	// may be synced to a different version and they will never converge.
  1000  	// TODO: We should use etcd progress-notify feature to avoid this behavior but until this is
  1001  	// done we simply try to relist from now to avoid continuous errors on relists.
  1002  	for i := 1; i <= 3; i++ {
  1003  		// relist twice to cover the two variants of TooLargeResourceVersion api errors
  1004  		stopCh = make(chan struct{})
  1005  		if err := r.ListAndWatch(stopCh); err != nil {
  1006  			t.Fatal(err)
  1007  		}
  1008  	}
  1009  
  1010  	expectedRVs := []string{"0", "20", "", "30", "", "40", ""}
  1011  	if !reflect.DeepEqual(listCallRVs, expectedRVs) {
  1012  		t.Errorf("Expected series of list calls with resource version of %#v but got: %#v", expectedRVs, listCallRVs)
  1013  	}
  1014  }
  1015  
  1016  func TestGetTypeDescriptionFromObject(t *testing.T) {
  1017  	obj := &unstructured.Unstructured{}
  1018  	gvk := schema.GroupVersionKind{
  1019  		Group:   "mygroup",
  1020  		Version: "v1",
  1021  		Kind:    "MyKind",
  1022  	}
  1023  	obj.SetGroupVersionKind(gvk)
  1024  
  1025  	testCases := map[string]struct {
  1026  		inputType               interface{}
  1027  		expectedTypeDescription string
  1028  	}{
  1029  		"Nil type": {
  1030  			expectedTypeDescription: defaultExpectedTypeName,
  1031  		},
  1032  		"Normal type": {
  1033  			inputType:               &v1.Pod{},
  1034  			expectedTypeDescription: "*v1.Pod",
  1035  		},
  1036  		"Unstructured type without GVK": {
  1037  			inputType:               &unstructured.Unstructured{},
  1038  			expectedTypeDescription: "*unstructured.Unstructured",
  1039  		},
  1040  		"Unstructured type with GVK": {
  1041  			inputType:               obj,
  1042  			expectedTypeDescription: gvk.String(),
  1043  		},
  1044  	}
  1045  	for testName, tc := range testCases {
  1046  		t.Run(testName, func(t *testing.T) {
  1047  			typeDescription := getTypeDescriptionFromObject(tc.inputType)
  1048  			if tc.expectedTypeDescription != typeDescription {
  1049  				t.Fatalf("Expected typeDescription %v, got %v", tc.expectedTypeDescription, typeDescription)
  1050  			}
  1051  		})
  1052  	}
  1053  }
  1054  
  1055  func TestGetExpectedGVKFromObject(t *testing.T) {
  1056  	obj := &unstructured.Unstructured{}
  1057  	gvk := schema.GroupVersionKind{
  1058  		Group:   "mygroup",
  1059  		Version: "v1",
  1060  		Kind:    "MyKind",
  1061  	}
  1062  	obj.SetGroupVersionKind(gvk)
  1063  
  1064  	testCases := map[string]struct {
  1065  		inputType   interface{}
  1066  		expectedGVK *schema.GroupVersionKind
  1067  	}{
  1068  		"Nil type": {},
  1069  		"Some non Unstructured type": {
  1070  			inputType: &v1.Pod{},
  1071  		},
  1072  		"Unstructured type without GVK": {
  1073  			inputType: &unstructured.Unstructured{},
  1074  		},
  1075  		"Unstructured type with GVK": {
  1076  			inputType:   obj,
  1077  			expectedGVK: &gvk,
  1078  		},
  1079  	}
  1080  	for testName, tc := range testCases {
  1081  		t.Run(testName, func(t *testing.T) {
  1082  			expectedGVK := getExpectedGVKFromObject(tc.inputType)
  1083  			gvkNotEqual := (tc.expectedGVK == nil) != (expectedGVK == nil)
  1084  			if tc.expectedGVK != nil && expectedGVK != nil {
  1085  				gvkNotEqual = *tc.expectedGVK != *expectedGVK
  1086  			}
  1087  			if gvkNotEqual {
  1088  				t.Fatalf("Expected expectedGVK %v, got %v", tc.expectedGVK, expectedGVK)
  1089  			}
  1090  		})
  1091  	}
  1092  }
  1093  
  1094  type storeWithRV struct {
  1095  	Store
  1096  
  1097  	// resourceVersions tracks values passed by UpdateResourceVersion
  1098  	resourceVersions []string
  1099  }
  1100  
  1101  func (s *storeWithRV) UpdateResourceVersion(resourceVersion string) {
  1102  	s.resourceVersions = append(s.resourceVersions, resourceVersion)
  1103  }
  1104  
  1105  func newStoreWithRV() *storeWithRV {
  1106  	return &storeWithRV{
  1107  		Store: NewStore(MetaNamespaceKeyFunc),
  1108  	}
  1109  }
  1110  
  1111  func TestReflectorResourceVersionUpdate(t *testing.T) {
  1112  	s := newStoreWithRV()
  1113  
  1114  	stopCh := make(chan struct{})
  1115  	fw := watch.NewFake()
  1116  
  1117  	lw := &testLW{
  1118  		WatchFunc: func(options metav1.ListOptions) (watch.Interface, error) {
  1119  			return fw, nil
  1120  		},
  1121  		ListFunc: func(options metav1.ListOptions) (runtime.Object, error) {
  1122  			return &v1.PodList{ListMeta: metav1.ListMeta{ResourceVersion: "10"}}, nil
  1123  		},
  1124  	}
  1125  	r := NewReflector(lw, &v1.Pod{}, s, 0)
  1126  
  1127  	makePod := func(rv string) *v1.Pod {
  1128  		return &v1.Pod{ObjectMeta: metav1.ObjectMeta{ResourceVersion: rv}}
  1129  	}
  1130  
  1131  	go func() {
  1132  		fw.Action(watch.Added, makePod("10"))
  1133  		fw.Action(watch.Modified, makePod("20"))
  1134  		fw.Action(watch.Bookmark, makePod("30"))
  1135  		fw.Action(watch.Deleted, makePod("40"))
  1136  		close(stopCh)
  1137  	}()
  1138  
  1139  	// Initial list should use RV=0
  1140  	if err := r.ListAndWatch(stopCh); err != nil {
  1141  		t.Fatal(err)
  1142  	}
  1143  
  1144  	expectedRVs := []string{"10", "20", "30", "40"}
  1145  	if !reflect.DeepEqual(s.resourceVersions, expectedRVs) {
  1146  		t.Errorf("Expected series of resource version updates of %#v but got: %#v", expectedRVs, s.resourceVersions)
  1147  	}
  1148  }
  1149  
  1150  const (
  1151  	fakeItemsNum      = 100
  1152  	exemptObjectIndex = fakeItemsNum / 4
  1153  	pageNum           = 3
  1154  )
  1155  
  1156  func getPodListItems(start int, numItems int) (string, string, *v1.PodList) {
  1157  	out := &v1.PodList{
  1158  		Items: make([]v1.Pod, numItems),
  1159  	}
  1160  
  1161  	for i := 0; i < numItems; i++ {
  1162  
  1163  		out.Items[i] = v1.Pod{
  1164  			TypeMeta: metav1.TypeMeta{
  1165  				APIVersion: "v1",
  1166  				Kind:       "Pod",
  1167  			},
  1168  			ObjectMeta: metav1.ObjectMeta{
  1169  				Name:      fmt.Sprintf("pod-%d", i+start),
  1170  				Namespace: "default",
  1171  				Labels: map[string]string{
  1172  					"label-key-1": "label-value-1",
  1173  				},
  1174  				Annotations: map[string]string{
  1175  					"annotations-key-1": "annotations-value-1",
  1176  				},
  1177  			},
  1178  			Spec: v1.PodSpec{
  1179  				Overhead: v1.ResourceList{
  1180  					v1.ResourceCPU:    resource.MustParse("3"),
  1181  					v1.ResourceMemory: resource.MustParse("8"),
  1182  				},
  1183  				NodeSelector: map[string]string{
  1184  					"foo": "bar",
  1185  					"baz": "quux",
  1186  				},
  1187  				Affinity: &v1.Affinity{
  1188  					NodeAffinity: &v1.NodeAffinity{
  1189  						RequiredDuringSchedulingIgnoredDuringExecution: &v1.NodeSelector{
  1190  							NodeSelectorTerms: []v1.NodeSelectorTerm{
  1191  								{MatchExpressions: []v1.NodeSelectorRequirement{{Key: `foo`}}},
  1192  							},
  1193  						},
  1194  						PreferredDuringSchedulingIgnoredDuringExecution: []v1.PreferredSchedulingTerm{
  1195  							{Preference: v1.NodeSelectorTerm{MatchExpressions: []v1.NodeSelectorRequirement{{Key: `foo`}}}},
  1196  						},
  1197  					},
  1198  				},
  1199  				TopologySpreadConstraints: []v1.TopologySpreadConstraint{
  1200  					{TopologyKey: `foo`},
  1201  				},
  1202  				HostAliases: []v1.HostAlias{
  1203  					{IP: "1.1.1.1"},
  1204  					{IP: "2.2.2.2"},
  1205  				},
  1206  				ImagePullSecrets: []v1.LocalObjectReference{
  1207  					{Name: "secret1"},
  1208  					{Name: "secret2"},
  1209  				},
  1210  				Containers: []v1.Container{
  1211  					{
  1212  						Name:  "foobar",
  1213  						Image: "alpine",
  1214  						Resources: v1.ResourceRequirements{
  1215  							Requests: v1.ResourceList{
  1216  								v1.ResourceName(v1.ResourceCPU):    resource.MustParse("1"),
  1217  								v1.ResourceName(v1.ResourceMemory): resource.MustParse("5"),
  1218  							},
  1219  							Limits: v1.ResourceList{
  1220  								v1.ResourceName(v1.ResourceCPU):    resource.MustParse("2"),
  1221  								v1.ResourceName(v1.ResourceMemory): resource.MustParse("10"),
  1222  							},
  1223  						},
  1224  					},
  1225  					{
  1226  						Name:  "foobar2",
  1227  						Image: "alpine",
  1228  						Resources: v1.ResourceRequirements{
  1229  							Requests: v1.ResourceList{
  1230  								v1.ResourceName(v1.ResourceCPU):    resource.MustParse("4"),
  1231  								v1.ResourceName(v1.ResourceMemory): resource.MustParse("12"),
  1232  							},
  1233  							Limits: v1.ResourceList{
  1234  								v1.ResourceName(v1.ResourceCPU):    resource.MustParse("8"),
  1235  								v1.ResourceName(v1.ResourceMemory): resource.MustParse("24"),
  1236  							},
  1237  						},
  1238  					},
  1239  				},
  1240  				InitContainers: []v1.Container{
  1241  					{
  1242  						Name:  "small-init",
  1243  						Image: "alpine",
  1244  						Resources: v1.ResourceRequirements{
  1245  							Requests: v1.ResourceList{
  1246  								v1.ResourceName(v1.ResourceCPU):    resource.MustParse("1"),
  1247  								v1.ResourceName(v1.ResourceMemory): resource.MustParse("5"),
  1248  							},
  1249  							Limits: v1.ResourceList{
  1250  								v1.ResourceName(v1.ResourceCPU):    resource.MustParse("1"),
  1251  								v1.ResourceName(v1.ResourceMemory): resource.MustParse("5"),
  1252  							},
  1253  						},
  1254  					},
  1255  					{
  1256  						Name:  "big-init",
  1257  						Image: "alpine",
  1258  						Resources: v1.ResourceRequirements{
  1259  							Requests: v1.ResourceList{
  1260  								v1.ResourceName(v1.ResourceCPU):    resource.MustParse("40"),
  1261  								v1.ResourceName(v1.ResourceMemory): resource.MustParse("120"),
  1262  							},
  1263  							Limits: v1.ResourceList{
  1264  								v1.ResourceName(v1.ResourceCPU):    resource.MustParse("80"),
  1265  								v1.ResourceName(v1.ResourceMemory): resource.MustParse("240"),
  1266  							},
  1267  						},
  1268  					},
  1269  				},
  1270  				Hostname: fmt.Sprintf("node-%d", i),
  1271  			},
  1272  			Status: v1.PodStatus{
  1273  				Phase: v1.PodRunning,
  1274  				ContainerStatuses: []v1.ContainerStatus{
  1275  					{
  1276  						ContainerID: "docker://numbers",
  1277  						Image:       "alpine",
  1278  						Name:        "foobar",
  1279  						Ready:       false,
  1280  					},
  1281  					{
  1282  						ContainerID: "docker://numbers",
  1283  						Image:       "alpine",
  1284  						Name:        "foobar2",
  1285  						Ready:       false,
  1286  					},
  1287  				},
  1288  				InitContainerStatuses: []v1.ContainerStatus{
  1289  					{
  1290  						ContainerID: "docker://numbers",
  1291  						Image:       "alpine",
  1292  						Name:        "small-init",
  1293  						Ready:       false,
  1294  					},
  1295  					{
  1296  						ContainerID: "docker://numbers",
  1297  						Image:       "alpine",
  1298  						Name:        "big-init",
  1299  						Ready:       false,
  1300  					},
  1301  				},
  1302  				Conditions: []v1.PodCondition{
  1303  					{
  1304  						Type:               v1.PodScheduled,
  1305  						Status:             v1.ConditionTrue,
  1306  						Reason:             "successfully",
  1307  						Message:            "sync pod successfully",
  1308  						LastProbeTime:      metav1.Now(),
  1309  						LastTransitionTime: metav1.Now(),
  1310  					},
  1311  				},
  1312  			},
  1313  		}
  1314  	}
  1315  
  1316  	return out.Items[0].GetName(), out.Items[exemptObjectIndex].GetName(), out
  1317  }
  1318  
  1319  func getConfigmapListItems(start int, numItems int) (string, string, *v1.ConfigMapList) {
  1320  	out := &v1.ConfigMapList{
  1321  		Items: make([]v1.ConfigMap, numItems),
  1322  	}
  1323  
  1324  	for i := 0; i < numItems; i++ {
  1325  		out.Items[i] = v1.ConfigMap{
  1326  			TypeMeta: metav1.TypeMeta{
  1327  				APIVersion: "v1",
  1328  				Kind:       "ConfigMap",
  1329  			},
  1330  			ObjectMeta: metav1.ObjectMeta{
  1331  				Name:      fmt.Sprintf("cm-%d", i+start),
  1332  				Namespace: "default",
  1333  				Labels: map[string]string{
  1334  					"label-key-1": "label-value-1",
  1335  				},
  1336  				Annotations: map[string]string{
  1337  					"annotations-key-1": "annotations-value-1",
  1338  				},
  1339  			},
  1340  			Data: map[string]string{
  1341  				"data-1": "value-1",
  1342  				"data-2": "value-2",
  1343  			},
  1344  		}
  1345  	}
  1346  
  1347  	return out.Items[0].GetName(), out.Items[exemptObjectIndex].GetName(), out
  1348  }
  1349  
  1350  type TestPagingPodsLW struct {
  1351  	totalPageCount   int
  1352  	fetchedPageCount int
  1353  
  1354  	detectedObjectNameList []string
  1355  	exemptObjectNameList   []string
  1356  }
  1357  
  1358  func newPageTestLW(totalPageNum int) *TestPagingPodsLW {
  1359  	return &TestPagingPodsLW{
  1360  		totalPageCount:   totalPageNum,
  1361  		fetchedPageCount: 0,
  1362  	}
  1363  }
  1364  
  1365  func (t *TestPagingPodsLW) List(options metav1.ListOptions) (runtime.Object, error) {
  1366  	firstPodName, exemptPodName, list := getPodListItems(t.fetchedPageCount*fakeItemsNum, fakeItemsNum)
  1367  	t.detectedObjectNameList = append(t.detectedObjectNameList, firstPodName)
  1368  	t.exemptObjectNameList = append(t.exemptObjectNameList, exemptPodName)
  1369  	t.fetchedPageCount++
  1370  	if t.fetchedPageCount >= t.totalPageCount {
  1371  		return list, nil
  1372  	}
  1373  	list.SetContinue("true")
  1374  	return list, nil
  1375  }
  1376  
  1377  func (t *TestPagingPodsLW) Watch(options metav1.ListOptions) (watch.Interface, error) {
  1378  	return nil, nil
  1379  }
  1380  
  1381  func TestReflectorListExtract(t *testing.T) {
  1382  	store := NewStore(func(obj interface{}) (string, error) {
  1383  		pod, ok := obj.(*v1.Pod)
  1384  		if !ok {
  1385  			return "", fmt.Errorf("expect *v1.Pod, but got %T", obj)
  1386  		}
  1387  		return pod.GetName(), nil
  1388  	})
  1389  
  1390  	lw := newPageTestLW(5)
  1391  	reflector := NewReflector(lw, &v1.Pod{}, store, 0)
  1392  	reflector.WatchListPageSize = fakeItemsNum
  1393  
  1394  	// execute list to fill store
  1395  	stopCh := make(chan struct{})
  1396  	if err := reflector.list(stopCh); err != nil {
  1397  		t.Fatal(err)
  1398  	}
  1399  
  1400  	// We will not delete exemptPod,
  1401  	// in order to see if the existence of this Pod causes other Pods that are not used to be unable to properly clear.
  1402  	for _, podName := range lw.exemptObjectNameList {
  1403  		_, exist, err := store.GetByKey(podName)
  1404  		if err != nil || !exist {
  1405  			t.Fatalf("%s should exist in pod store", podName)
  1406  		}
  1407  	}
  1408  
  1409  	// we will pay attention to whether the memory occupied by the first Pod is released
  1410  	// Golang's can only be SetFinalizer for the first element of the array,
  1411  	// so pod-0 will be the object of our attention
  1412  	detectedPodAlreadyBeCleared := make(chan struct{}, len(lw.detectedObjectNameList))
  1413  
  1414  	for _, firstPodName := range lw.detectedObjectNameList {
  1415  		_, exist, err := store.GetByKey(firstPodName)
  1416  		if err != nil || !exist {
  1417  			t.Fatalf("%s should exist in pod store", firstPodName)
  1418  		}
  1419  		firstPod, exist, err := store.GetByKey(firstPodName)
  1420  		if err != nil || !exist {
  1421  			t.Fatalf("%s should exist in pod store", firstPodName)
  1422  		}
  1423  		goruntime.SetFinalizer(firstPod, func(obj interface{}) {
  1424  			t.Logf("%s already be gc\n", obj.(*v1.Pod).GetName())
  1425  			detectedPodAlreadyBeCleared <- struct{}{}
  1426  		})
  1427  	}
  1428  
  1429  	storedObjectKeys := store.ListKeys()
  1430  	for _, k := range storedObjectKeys {
  1431  		// delete all Pods except the exempted Pods.
  1432  		if sets.NewString(lw.exemptObjectNameList...).Has(k) {
  1433  			continue
  1434  		}
  1435  		obj, exist, err := store.GetByKey(k)
  1436  		if err != nil || !exist {
  1437  			t.Fatalf("%s should exist in pod store", k)
  1438  		}
  1439  
  1440  		if err := store.Delete(obj); err != nil {
  1441  			t.Fatalf("delete object: %v", err)
  1442  		}
  1443  		goruntime.GC()
  1444  	}
  1445  
  1446  	clearedNum := 0
  1447  	for {
  1448  		select {
  1449  		case <-detectedPodAlreadyBeCleared:
  1450  			clearedNum++
  1451  			if clearedNum == len(lw.detectedObjectNameList) {
  1452  				return
  1453  			}
  1454  		}
  1455  	}
  1456  }
  1457  
  1458  func BenchmarkExtractList(b *testing.B) {
  1459  	_, _, podList := getPodListItems(0, fakeItemsNum)
  1460  	_, _, configMapList := getConfigmapListItems(0, fakeItemsNum)
  1461  	tests := []struct {
  1462  		name string
  1463  		list runtime.Object
  1464  	}{
  1465  		{
  1466  			name: "PodList",
  1467  			list: podList,
  1468  		},
  1469  		{
  1470  			name: "ConfigMapList",
  1471  			list: configMapList,
  1472  		},
  1473  	}
  1474  
  1475  	for _, tc := range tests {
  1476  		b.Run(tc.name, func(b *testing.B) {
  1477  			b.ResetTimer()
  1478  			for i := 0; i < b.N; i++ {
  1479  				_, err := meta.ExtractList(tc.list)
  1480  				if err != nil {
  1481  					b.Errorf("extract list: %v", err)
  1482  				}
  1483  			}
  1484  			b.StopTimer()
  1485  		})
  1486  	}
  1487  }
  1488  
  1489  func BenchmarkEachListItem(b *testing.B) {
  1490  	_, _, podList := getPodListItems(0, fakeItemsNum)
  1491  	_, _, configMapList := getConfigmapListItems(0, fakeItemsNum)
  1492  	tests := []struct {
  1493  		name string
  1494  		list runtime.Object
  1495  	}{
  1496  		{
  1497  			name: "PodList",
  1498  			list: podList,
  1499  		},
  1500  		{
  1501  			name: "ConfigMapList",
  1502  			list: configMapList,
  1503  		},
  1504  	}
  1505  
  1506  	for _, tc := range tests {
  1507  		b.Run(tc.name, func(b *testing.B) {
  1508  			b.ResetTimer()
  1509  			for i := 0; i < b.N; i++ {
  1510  				err := meta.EachListItem(tc.list, func(object runtime.Object) error {
  1511  					return nil
  1512  				})
  1513  				if err != nil {
  1514  					b.Errorf("each list: %v", err)
  1515  				}
  1516  			}
  1517  			b.StopTimer()
  1518  		})
  1519  	}
  1520  }
  1521  
  1522  func BenchmarkExtractListWithAlloc(b *testing.B) {
  1523  	_, _, podList := getPodListItems(0, fakeItemsNum)
  1524  	_, _, configMapList := getConfigmapListItems(0, fakeItemsNum)
  1525  	tests := []struct {
  1526  		name string
  1527  		list runtime.Object
  1528  	}{
  1529  		{
  1530  			name: "PodList",
  1531  			list: podList,
  1532  		},
  1533  		{
  1534  			name: "ConfigMapList",
  1535  			list: configMapList,
  1536  		},
  1537  	}
  1538  
  1539  	for _, tc := range tests {
  1540  		b.Run(tc.name, func(b *testing.B) {
  1541  			b.ResetTimer()
  1542  			for i := 0; i < b.N; i++ {
  1543  				_, err := meta.ExtractListWithAlloc(tc.list)
  1544  				if err != nil {
  1545  					b.Errorf("extract list with alloc: %v", err)
  1546  				}
  1547  			}
  1548  			b.StopTimer()
  1549  		})
  1550  	}
  1551  }
  1552  
  1553  func BenchmarkEachListItemWithAlloc(b *testing.B) {
  1554  	_, _, podList := getPodListItems(0, fakeItemsNum)
  1555  	_, _, configMapList := getConfigmapListItems(0, fakeItemsNum)
  1556  	tests := []struct {
  1557  		name string
  1558  		list runtime.Object
  1559  	}{
  1560  		{
  1561  			name: "PodList",
  1562  			list: podList,
  1563  		},
  1564  		{
  1565  			name: "ConfigMapList",
  1566  			list: configMapList,
  1567  		},
  1568  	}
  1569  
  1570  	for _, tc := range tests {
  1571  		b.Run(tc.name, func(b *testing.B) {
  1572  			b.ResetTimer()
  1573  			for i := 0; i < b.N; i++ {
  1574  				err := meta.EachListItemWithAlloc(tc.list, func(object runtime.Object) error {
  1575  					return nil
  1576  				})
  1577  				if err != nil {
  1578  					b.Errorf("each list with alloc: %v", err)
  1579  				}
  1580  			}
  1581  			b.StopTimer()
  1582  		})
  1583  	}
  1584  }
  1585  
  1586  func BenchmarkReflectorList(b *testing.B) {
  1587  	ctx, cancel := context.WithTimeout(context.Background(), wait.ForeverTestTimeout)
  1588  	defer cancel()
  1589  
  1590  	store := NewStore(func(obj interface{}) (string, error) {
  1591  		o, err := meta.Accessor(obj)
  1592  		if err != nil {
  1593  			return "", err
  1594  		}
  1595  		return o.GetName(), nil
  1596  	})
  1597  
  1598  	_, _, podList := getPodListItems(0, fakeItemsNum)
  1599  	_, _, configMapList := getConfigmapListItems(0, fakeItemsNum)
  1600  	tests := []struct {
  1601  		name   string
  1602  		sample func() interface{}
  1603  		list   runtime.Object
  1604  	}{
  1605  		{
  1606  			name: "PodList",
  1607  			sample: func() interface{} {
  1608  				return v1.Pod{}
  1609  			},
  1610  			list: podList,
  1611  		},
  1612  		{
  1613  			name: "ConfigMapList",
  1614  			sample: func() interface{} {
  1615  				return v1.ConfigMap{}
  1616  			},
  1617  			list: configMapList,
  1618  		},
  1619  	}
  1620  
  1621  	for _, tc := range tests {
  1622  		b.Run(tc.name, func(b *testing.B) {
  1623  
  1624  			sample := tc.sample()
  1625  			reflector := NewReflector(newPageTestLW(pageNum), &sample, store, 0)
  1626  			reflector.WatchListPageSize = fakeItemsNum
  1627  
  1628  			b.ResetTimer()
  1629  			for i := 0; i < b.N; i++ {
  1630  				err := reflector.list(ctx.Done())
  1631  				if err != nil {
  1632  					b.Fatalf("reflect list: %v", err)
  1633  				}
  1634  			}
  1635  			b.StopTimer()
  1636  		})
  1637  	}
  1638  }
  1639  

View as plain text