...

Source file src/k8s.io/client-go/tools/cache/reflector_watchlist_test.go

Documentation: k8s.io/client-go/tools/cache

     1  /*
     2  Copyright 2023 The Kubernetes Authors.
     3  
     4  Licensed under the Apache License, Version 2.0 (the "License");
     5  you may not use this file except in compliance with the License.
     6  You may obtain a copy of the License at
     7  
     8      http://www.apache.org/licenses/LICENSE-2.0
     9  
    10  Unless required by applicable law or agreed to in writing, software
    11  distributed under the License is distributed on an "AS IS" BASIS,
    12  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    13  See the License for the specific language governing permissions and
    14  limitations under the License.
    15  */
    16  
    17  package cache
    18  
    19  import (
    20  	"fmt"
    21  	"sort"
    22  	"sync"
    23  	"testing"
    24  
    25  	"github.com/google/go-cmp/cmp"
    26  	"github.com/google/go-cmp/cmp/cmpopts"
    27  
    28  	v1 "k8s.io/api/core/v1"
    29  	apierrors "k8s.io/apimachinery/pkg/api/errors"
    30  	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
    31  	"k8s.io/apimachinery/pkg/runtime"
    32  	"k8s.io/apimachinery/pkg/runtime/schema"
    33  	"k8s.io/apimachinery/pkg/types"
    34  	"k8s.io/apimachinery/pkg/watch"
    35  	"k8s.io/utils/pointer"
    36  	"k8s.io/utils/ptr"
    37  )
    38  
    39  func TestWatchList(t *testing.T) {
    40  	scenarios := []struct {
    41  		name                string
    42  		disableUseWatchList bool
    43  
    44  		// closes listWatcher after sending the specified number of watch events
    45  		closeAfterWatchEvents int
    46  		// closes listWatcher after getting the specified number of watch requests
    47  		closeAfterWatchRequests int
    48  		// closes listWatcher after getting the specified number of list requests
    49  		closeAfterListRequests int
    50  
    51  		// stops Watcher after sending the specified number of watch events
    52  		stopAfterWatchEvents int
    53  
    54  		watchOptionsPredicate func(options metav1.ListOptions) error
    55  		watchEvents           []watch.Event
    56  		podList               *v1.PodList
    57  
    58  		expectedRequestOptions []metav1.ListOptions
    59  		expectedWatchRequests  int
    60  		expectedListRequests   int
    61  		expectedStoreContent   []v1.Pod
    62  		expectedError          error
    63  	}{
    64  		{
    65  			name:                  "the reflector won't be synced if the bookmark event has been received",
    66  			watchEvents:           []watch.Event{{Type: watch.Added, Object: makePod("p1", "1")}},
    67  			closeAfterWatchEvents: 1,
    68  			expectedWatchRequests: 1,
    69  			expectedRequestOptions: []metav1.ListOptions{{
    70  				SendInitialEvents:    pointer.Bool(true),
    71  				AllowWatchBookmarks:  true,
    72  				ResourceVersionMatch: metav1.ResourceVersionMatchNotOlderThan,
    73  				TimeoutSeconds:       pointer.Int64(1),
    74  			}},
    75  		},
    76  		{
    77  			name:                    "the reflector uses the old LIST/WATCH semantics if the UseWatchList is turned off",
    78  			disableUseWatchList:     true,
    79  			closeAfterWatchRequests: 1,
    80  			podList: &v1.PodList{
    81  				ListMeta: metav1.ListMeta{ResourceVersion: "1"},
    82  				Items:    []v1.Pod{*makePod("p1", "1")},
    83  			},
    84  			expectedWatchRequests: 1,
    85  			expectedListRequests:  1,
    86  			expectedRequestOptions: []metav1.ListOptions{
    87  				{
    88  					ResourceVersion: "0",
    89  					Limit:           500,
    90  				},
    91  				{
    92  					AllowWatchBookmarks: true,
    93  					ResourceVersion:     "1",
    94  					TimeoutSeconds:      pointer.Int64(1),
    95  				}},
    96  			expectedStoreContent: []v1.Pod{*makePod("p1", "1")},
    97  		},
    98  		{
    99  			name: "returning any other error than apierrors.NewInvalid forces fallback",
   100  			watchOptionsPredicate: func(options metav1.ListOptions) error {
   101  				if options.SendInitialEvents != nil && *options.SendInitialEvents {
   102  					return fmt.Errorf("dummy error")
   103  				}
   104  				return nil
   105  			},
   106  			podList: &v1.PodList{
   107  				ListMeta: metav1.ListMeta{ResourceVersion: "1"},
   108  				Items:    []v1.Pod{*makePod("p1", "1")},
   109  			},
   110  			closeAfterWatchEvents: 1,
   111  			watchEvents:           []watch.Event{{Type: watch.Added, Object: makePod("p2", "2")}},
   112  			expectedWatchRequests: 2,
   113  			expectedListRequests:  1,
   114  			expectedStoreContent:  []v1.Pod{*makePod("p1", "1"), *makePod("p2", "2")},
   115  			expectedRequestOptions: []metav1.ListOptions{
   116  				{
   117  					SendInitialEvents:    pointer.Bool(true),
   118  					AllowWatchBookmarks:  true,
   119  					ResourceVersionMatch: metav1.ResourceVersionMatchNotOlderThan,
   120  					TimeoutSeconds:       pointer.Int64(1),
   121  				},
   122  				{
   123  					ResourceVersion: "0",
   124  					Limit:           500,
   125  				},
   126  				{
   127  					AllowWatchBookmarks: true,
   128  					ResourceVersion:     "1",
   129  					TimeoutSeconds:      pointer.Int64(1),
   130  				},
   131  			},
   132  		},
   133  		{
   134  			name: "the reflector can fall back to old LIST/WATCH semantics when a server doesn't support streaming",
   135  			watchOptionsPredicate: func(options metav1.ListOptions) error {
   136  				if options.SendInitialEvents != nil && *options.SendInitialEvents {
   137  					return apierrors.NewInvalid(schema.GroupKind{}, "streaming is not allowed", nil)
   138  				}
   139  				return nil
   140  			},
   141  			podList: &v1.PodList{
   142  				ListMeta: metav1.ListMeta{ResourceVersion: "1"},
   143  				Items:    []v1.Pod{*makePod("p1", "1")},
   144  			},
   145  			closeAfterWatchEvents: 1,
   146  			watchEvents:           []watch.Event{{Type: watch.Added, Object: makePod("p2", "2")}},
   147  			expectedWatchRequests: 2,
   148  			expectedListRequests:  1,
   149  			expectedStoreContent:  []v1.Pod{*makePod("p1", "1"), *makePod("p2", "2")},
   150  			expectedRequestOptions: []metav1.ListOptions{
   151  				{
   152  					SendInitialEvents:    pointer.Bool(true),
   153  					AllowWatchBookmarks:  true,
   154  					ResourceVersionMatch: metav1.ResourceVersionMatchNotOlderThan,
   155  					TimeoutSeconds:       pointer.Int64(1),
   156  				},
   157  				{
   158  					ResourceVersion: "0",
   159  					Limit:           500,
   160  				},
   161  				{
   162  					AllowWatchBookmarks: true,
   163  					ResourceVersion:     "1",
   164  					TimeoutSeconds:      pointer.Int64(1),
   165  				},
   166  			},
   167  		},
   168  		{
   169  			name:                  "prove that the reflector is synced after receiving a bookmark event",
   170  			closeAfterWatchEvents: 3,
   171  			watchEvents: []watch.Event{
   172  				{Type: watch.Added, Object: makePod("p1", "1")},
   173  				{Type: watch.Added, Object: makePod("p2", "2")},
   174  				{Type: watch.Bookmark, Object: &v1.Pod{
   175  					ObjectMeta: metav1.ObjectMeta{
   176  						ResourceVersion: "2",
   177  						Annotations:     map[string]string{"k8s.io/initial-events-end": "true"},
   178  					},
   179  				}},
   180  			},
   181  			expectedWatchRequests: 1,
   182  			expectedRequestOptions: []metav1.ListOptions{{
   183  				SendInitialEvents:    pointer.Bool(true),
   184  				AllowWatchBookmarks:  true,
   185  				ResourceVersionMatch: metav1.ResourceVersionMatchNotOlderThan,
   186  				TimeoutSeconds:       pointer.Int64(1),
   187  			}},
   188  			expectedStoreContent: []v1.Pod{*makePod("p1", "1"), *makePod("p2", "2")},
   189  		},
   190  		{
   191  			name:                  "check if Updates and Deletes events are propagated during streaming (until the bookmark is received)",
   192  			closeAfterWatchEvents: 6,
   193  			watchEvents: []watch.Event{
   194  				{Type: watch.Added, Object: makePod("p1", "1")},
   195  				{Type: watch.Added, Object: makePod("p2", "2")},
   196  				{Type: watch.Modified, Object: func() runtime.Object {
   197  					p1 := makePod("p1", "3")
   198  					p1.Spec.ActiveDeadlineSeconds = pointer.Int64(12)
   199  					return p1
   200  				}()},
   201  				{Type: watch.Added, Object: makePod("p3", "4")},
   202  				{Type: watch.Deleted, Object: makePod("p3", "5")},
   203  				{Type: watch.Bookmark, Object: &v1.Pod{
   204  					ObjectMeta: metav1.ObjectMeta{
   205  						ResourceVersion: "5",
   206  						Annotations:     map[string]string{"k8s.io/initial-events-end": "true"},
   207  					},
   208  				}},
   209  			},
   210  			expectedWatchRequests: 1,
   211  			expectedRequestOptions: []metav1.ListOptions{{
   212  				SendInitialEvents:    pointer.Bool(true),
   213  				AllowWatchBookmarks:  true,
   214  				ResourceVersionMatch: metav1.ResourceVersionMatchNotOlderThan,
   215  				TimeoutSeconds:       pointer.Int64(1),
   216  			}},
   217  			expectedStoreContent: []v1.Pod{
   218  				*makePod("p2", "2"),
   219  				func() v1.Pod {
   220  					p1 := *makePod("p1", "3")
   221  					p1.Spec.ActiveDeadlineSeconds = pointer.Int64(12)
   222  					return p1
   223  				}(),
   224  			},
   225  		},
   226  		{
   227  			name: "checks if the reflector retries 429",
   228  			watchOptionsPredicate: func() func(options metav1.ListOptions) error {
   229  				counter := 1
   230  				return func(options metav1.ListOptions) error {
   231  					if counter < 3 {
   232  						counter++
   233  						return apierrors.NewTooManyRequests("busy, check again later", 1)
   234  					}
   235  					return nil
   236  				}
   237  			}(),
   238  			closeAfterWatchEvents: 2,
   239  			watchEvents: []watch.Event{
   240  				{Type: watch.Added, Object: makePod("p1", "1")},
   241  				{Type: watch.Bookmark, Object: &v1.Pod{
   242  					ObjectMeta: metav1.ObjectMeta{
   243  						ResourceVersion: "2",
   244  						Annotations:     map[string]string{"k8s.io/initial-events-end": "true"},
   245  					},
   246  				}},
   247  			},
   248  			expectedWatchRequests: 3,
   249  			expectedRequestOptions: []metav1.ListOptions{
   250  				{
   251  					SendInitialEvents:    pointer.Bool(true),
   252  					AllowWatchBookmarks:  true,
   253  					ResourceVersionMatch: metav1.ResourceVersionMatchNotOlderThan,
   254  					TimeoutSeconds:       pointer.Int64(1),
   255  				},
   256  				{
   257  					SendInitialEvents:    pointer.Bool(true),
   258  					AllowWatchBookmarks:  true,
   259  					ResourceVersionMatch: metav1.ResourceVersionMatchNotOlderThan,
   260  					TimeoutSeconds:       pointer.Int64(1),
   261  				},
   262  				{
   263  					SendInitialEvents:    pointer.Bool(true),
   264  					AllowWatchBookmarks:  true,
   265  					ResourceVersionMatch: metav1.ResourceVersionMatchNotOlderThan,
   266  					TimeoutSeconds:       pointer.Int64(1),
   267  				},
   268  			},
   269  			expectedStoreContent: []v1.Pod{*makePod("p1", "1")},
   270  		},
   271  		{
   272  			name:                  "check if stopping a watcher before sync results in creating a new watch-list request",
   273  			stopAfterWatchEvents:  1,
   274  			closeAfterWatchEvents: 3,
   275  			watchEvents: []watch.Event{
   276  				{Type: watch.Added, Object: makePod("p1", "1")},
   277  				// second request
   278  				{Type: watch.Added, Object: makePod("p1", "1")},
   279  				{Type: watch.Bookmark, Object: &v1.Pod{
   280  					ObjectMeta: metav1.ObjectMeta{
   281  						ResourceVersion: "1",
   282  						Annotations:     map[string]string{"k8s.io/initial-events-end": "true"},
   283  					},
   284  				}},
   285  			},
   286  			expectedWatchRequests: 2,
   287  			expectedRequestOptions: []metav1.ListOptions{
   288  				{
   289  					SendInitialEvents:    pointer.Bool(true),
   290  					AllowWatchBookmarks:  true,
   291  					ResourceVersionMatch: metav1.ResourceVersionMatchNotOlderThan,
   292  					TimeoutSeconds:       pointer.Int64(1),
   293  				},
   294  				{
   295  					SendInitialEvents:    pointer.Bool(true),
   296  					AllowWatchBookmarks:  true,
   297  					ResourceVersionMatch: metav1.ResourceVersionMatchNotOlderThan,
   298  					TimeoutSeconds:       pointer.Int64(1),
   299  				},
   300  			},
   301  			expectedStoreContent: []v1.Pod{*makePod("p1", "1")},
   302  		},
   303  		{
   304  			name:                  "stopping a watcher after synchronization results in creating a new watch request",
   305  			stopAfterWatchEvents:  4,
   306  			closeAfterWatchEvents: 5,
   307  			watchEvents: []watch.Event{
   308  				{Type: watch.Added, Object: makePod("p1", "1")},
   309  				{Type: watch.Added, Object: makePod("p2", "2")},
   310  				{Type: watch.Bookmark, Object: &v1.Pod{
   311  					ObjectMeta: metav1.ObjectMeta{
   312  						ResourceVersion: "2",
   313  						Annotations:     map[string]string{"k8s.io/initial-events-end": "true"},
   314  					},
   315  				}},
   316  				{Type: watch.Added, Object: makePod("p3", "3")},
   317  				// second request
   318  				{Type: watch.Added, Object: makePod("p4", "4")},
   319  			},
   320  			expectedWatchRequests: 2,
   321  			expectedRequestOptions: []metav1.ListOptions{
   322  				{
   323  					SendInitialEvents:    pointer.Bool(true),
   324  					AllowWatchBookmarks:  true,
   325  					ResourceVersionMatch: metav1.ResourceVersionMatchNotOlderThan,
   326  					TimeoutSeconds:       pointer.Int64(1),
   327  				},
   328  				{
   329  					AllowWatchBookmarks: true,
   330  					ResourceVersion:     "3",
   331  					TimeoutSeconds:      pointer.Int64(1),
   332  				},
   333  			},
   334  			expectedStoreContent: []v1.Pod{*makePod("p1", "1"), *makePod("p2", "2"), *makePod("p3", "3"), *makePod("p4", "4")},
   335  		},
   336  		{
   337  			name: "expiring an established watcher results in returning an error from the reflector",
   338  			watchOptionsPredicate: func() func(options metav1.ListOptions) error {
   339  				counter := 0
   340  				return func(options metav1.ListOptions) error {
   341  					counter++
   342  					if counter == 2 {
   343  						return apierrors.NewResourceExpired("rv already expired")
   344  					}
   345  					return nil
   346  				}
   347  			}(),
   348  			stopAfterWatchEvents: 3,
   349  			watchEvents: []watch.Event{
   350  				{Type: watch.Added, Object: makePod("p1", "1")},
   351  				{Type: watch.Bookmark, Object: &v1.Pod{
   352  					ObjectMeta: metav1.ObjectMeta{
   353  						ResourceVersion: "2",
   354  						Annotations:     map[string]string{"k8s.io/initial-events-end": "true"},
   355  					},
   356  				}},
   357  				{Type: watch.Added, Object: makePod("p3", "3")},
   358  			},
   359  			expectedWatchRequests: 2,
   360  			expectedRequestOptions: []metav1.ListOptions{
   361  				{
   362  					SendInitialEvents:    pointer.Bool(true),
   363  					AllowWatchBookmarks:  true,
   364  					ResourceVersionMatch: metav1.ResourceVersionMatchNotOlderThan,
   365  					TimeoutSeconds:       pointer.Int64(1),
   366  				},
   367  				{
   368  					AllowWatchBookmarks: true,
   369  					ResourceVersion:     "3",
   370  					TimeoutSeconds:      pointer.Int64(1),
   371  				},
   372  			},
   373  			expectedStoreContent: []v1.Pod{*makePod("p1", "1"), *makePod("p3", "3")},
   374  			expectedError:        apierrors.NewResourceExpired("rv already expired"),
   375  		},
   376  		{
   377  			name:                  "prove that the reflector is checking the value of the initialEventsEnd annotation",
   378  			closeAfterWatchEvents: 3,
   379  			watchEvents: []watch.Event{
   380  				{Type: watch.Added, Object: makePod("p1", "1")},
   381  				{Type: watch.Added, Object: makePod("p2", "2")},
   382  				{Type: watch.Bookmark, Object: &v1.Pod{
   383  					ObjectMeta: metav1.ObjectMeta{
   384  						ResourceVersion: "2",
   385  						Annotations:     map[string]string{"k8s.io/initial-events-end": "false"},
   386  					},
   387  				}},
   388  			},
   389  			expectedWatchRequests: 1,
   390  			expectedRequestOptions: []metav1.ListOptions{{
   391  				SendInitialEvents:    pointer.Bool(true),
   392  				AllowWatchBookmarks:  true,
   393  				ResourceVersionMatch: metav1.ResourceVersionMatchNotOlderThan,
   394  				TimeoutSeconds:       pointer.Int64(1),
   395  			}},
   396  		},
   397  	}
   398  	for _, s := range scenarios {
   399  		t.Run(s.name, func(t *testing.T) {
   400  			scenario := s // capture as local variable
   401  			listWatcher, store, reflector, stopCh := testData()
   402  			go func() {
   403  				for i, e := range scenario.watchEvents {
   404  					listWatcher.fakeWatcher.Action(e.Type, e.Object)
   405  					if i+1 == scenario.stopAfterWatchEvents {
   406  						listWatcher.StopAndRecreateWatch()
   407  						continue
   408  					}
   409  					if i+1 == scenario.closeAfterWatchEvents {
   410  						close(stopCh)
   411  					}
   412  				}
   413  			}()
   414  			listWatcher.watchOptionsPredicate = scenario.watchOptionsPredicate
   415  			listWatcher.closeAfterWatchRequests = scenario.closeAfterWatchRequests
   416  			listWatcher.customListResponse = scenario.podList
   417  			listWatcher.closeAfterListRequests = scenario.closeAfterListRequests
   418  			if scenario.disableUseWatchList {
   419  				reflector.UseWatchList = ptr.To(false)
   420  			}
   421  
   422  			err := reflector.ListAndWatch(stopCh)
   423  			if scenario.expectedError != nil && err == nil {
   424  				t.Fatalf("expected error %q, got nil", scenario.expectedError)
   425  			}
   426  			if scenario.expectedError == nil && err != nil {
   427  				t.Fatalf("unexpected error: %v", err)
   428  			}
   429  			if scenario.expectedError != nil && err.Error() != scenario.expectedError.Error() {
   430  				t.Fatalf("expected error %q, got %q", scenario.expectedError, err.Error())
   431  			}
   432  
   433  			verifyWatchCounter(t, listWatcher, scenario.expectedWatchRequests)
   434  			verifyListCounter(t, listWatcher, scenario.expectedListRequests)
   435  			verifyRequestOptions(t, listWatcher, scenario.expectedRequestOptions)
   436  			verifyStore(t, store, scenario.expectedStoreContent)
   437  		})
   438  	}
   439  }
   440  
   441  func verifyRequestOptions(t *testing.T, lw *fakeListWatcher, expectedRequestOptions []metav1.ListOptions) {
   442  	if len(lw.requestOptions) != len(expectedRequestOptions) {
   443  		t.Fatalf("expected to receive exactly %v requests, got %v", len(expectedRequestOptions), len(lw.requestOptions))
   444  	}
   445  
   446  	for index, expectedRequestOption := range expectedRequestOptions {
   447  		actualRequestOption := lw.requestOptions[index]
   448  		if actualRequestOption.TimeoutSeconds == nil && expectedRequestOption.TimeoutSeconds != nil {
   449  			t.Fatalf("expected the request to specify TimeoutSeconds option but it didn't, actual = %#v, expected = %#v", actualRequestOption, expectedRequestOption)
   450  		}
   451  		if actualRequestOption.TimeoutSeconds != nil && expectedRequestOption.TimeoutSeconds == nil {
   452  			t.Fatalf("unexpected TimeoutSeconds option specified, actual = %#v, expected = %#v", actualRequestOption, expectedRequestOption)
   453  		}
   454  		// ignore actual values
   455  		actualRequestOption.TimeoutSeconds = nil
   456  		expectedRequestOption.TimeoutSeconds = nil
   457  		if !cmp.Equal(actualRequestOption, expectedRequestOption) {
   458  			t.Fatalf("expected %#v, got %#v", expectedRequestOption, actualRequestOption)
   459  		}
   460  	}
   461  }
   462  
   463  func verifyListCounter(t *testing.T, lw *fakeListWatcher, expectedListCounter int) {
   464  	if lw.listCounter != expectedListCounter {
   465  		t.Fatalf("unexpected number of LIST requests, got: %v, expected: %v", lw.listCounter, expectedListCounter)
   466  	}
   467  }
   468  
   469  func verifyWatchCounter(t *testing.T, lw *fakeListWatcher, expectedWatchCounter int) {
   470  	if lw.watchCounter != expectedWatchCounter {
   471  		t.Fatalf("unexpected number of WATCH requests, got: %v, expected: %v", lw.watchCounter, expectedWatchCounter)
   472  	}
   473  }
   474  
   475  type byName []v1.Pod
   476  
   477  func (a byName) Len() int           { return len(a) }
   478  func (a byName) Less(i, j int) bool { return a[i].Name < a[j].Name }
   479  func (a byName) Swap(i, j int)      { a[i], a[j] = a[j], a[i] }
   480  
   481  func verifyStore(t *testing.T, s Store, expectedPods []v1.Pod) {
   482  	rawPods := s.List()
   483  	actualPods := []v1.Pod{}
   484  	for _, p := range rawPods {
   485  		actualPods = append(actualPods, *p.(*v1.Pod))
   486  	}
   487  
   488  	sort.Sort(byName(actualPods))
   489  	sort.Sort(byName(expectedPods))
   490  	if !cmp.Equal(actualPods, expectedPods, cmpopts.EquateEmpty()) {
   491  		t.Fatalf("unexpected store content, diff: %s", cmp.Diff(actualPods, expectedPods))
   492  	}
   493  }
   494  
   495  func makePod(name, rv string) *v1.Pod {
   496  	return &v1.Pod{ObjectMeta: metav1.ObjectMeta{Name: name, ResourceVersion: rv, UID: types.UID(name)}}
   497  }
   498  
   499  func testData() (*fakeListWatcher, Store, *Reflector, chan struct{}) {
   500  	s := NewStore(MetaNamespaceKeyFunc)
   501  	stopCh := make(chan struct{})
   502  	lw := &fakeListWatcher{
   503  		fakeWatcher: watch.NewFake(),
   504  		stop: func() {
   505  			close(stopCh)
   506  		},
   507  	}
   508  	r := NewReflector(lw, &v1.Pod{}, s, 0)
   509  	r.UseWatchList = ptr.To(true)
   510  
   511  	return lw, s, r, stopCh
   512  }
   513  
   514  type fakeListWatcher struct {
   515  	lock                    sync.Mutex
   516  	fakeWatcher             *watch.FakeWatcher
   517  	listCounter             int
   518  	watchCounter            int
   519  	closeAfterWatchRequests int
   520  	closeAfterListRequests  int
   521  	stop                    func()
   522  
   523  	requestOptions []metav1.ListOptions
   524  
   525  	customListResponse    *v1.PodList
   526  	watchOptionsPredicate func(options metav1.ListOptions) error
   527  }
   528  
   529  func (lw *fakeListWatcher) List(options metav1.ListOptions) (runtime.Object, error) {
   530  	lw.listCounter++
   531  	lw.requestOptions = append(lw.requestOptions, options)
   532  	if lw.listCounter == lw.closeAfterListRequests {
   533  		lw.stop()
   534  	}
   535  	if lw.customListResponse != nil {
   536  		return lw.customListResponse, nil
   537  	}
   538  	return nil, fmt.Errorf("not implemented")
   539  }
   540  
   541  func (lw *fakeListWatcher) Watch(options metav1.ListOptions) (watch.Interface, error) {
   542  	lw.watchCounter++
   543  	lw.requestOptions = append(lw.requestOptions, options)
   544  	if lw.watchCounter == lw.closeAfterWatchRequests {
   545  		lw.stop()
   546  	}
   547  	if lw.watchOptionsPredicate != nil {
   548  		if err := lw.watchOptionsPredicate(options); err != nil {
   549  			return nil, err
   550  		}
   551  	}
   552  	lw.lock.Lock()
   553  	defer lw.lock.Unlock()
   554  	return lw.fakeWatcher, nil
   555  }
   556  
   557  func (lw *fakeListWatcher) StopAndRecreateWatch() {
   558  	lw.lock.Lock()
   559  	defer lw.lock.Unlock()
   560  	lw.fakeWatcher.Stop()
   561  	lw.fakeWatcher = watch.NewFake()
   562  }
   563  

View as plain text