...

Source file src/k8s.io/client-go/tools/watch/retrywatcher_test.go

Documentation: k8s.io/client-go/tools/watch

     1  /*
     2  Copyright 2017 The Kubernetes Authors.
     3  
     4  Licensed under the Apache License, Version 2.0 (the "License");
     5  you may not use this file except in compliance with the License.
     6  You may obtain a copy of the License at
     7  
     8      http://www.apache.org/licenses/LICENSE-2.0
     9  
    10  Unless required by applicable law or agreed to in writing, software
    11  distributed under the License is distributed on an "AS IS" BASIS,
    12  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    13  See the License for the specific language governing permissions and
    14  limitations under the License.
    15  */
    16  
    17  package watch
    18  
    19  import (
    20  	"errors"
    21  	"flag"
    22  	"fmt"
    23  	"reflect"
    24  	"strconv"
    25  	"sync/atomic"
    26  	"testing"
    27  	"time"
    28  
    29  	"github.com/google/go-cmp/cmp"
    30  
    31  	apierrors "k8s.io/apimachinery/pkg/api/errors"
    32  	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
    33  	"k8s.io/apimachinery/pkg/runtime"
    34  	"k8s.io/apimachinery/pkg/runtime/schema"
    35  	"k8s.io/apimachinery/pkg/util/dump"
    36  	"k8s.io/apimachinery/pkg/util/wait"
    37  	"k8s.io/apimachinery/pkg/watch"
    38  	"k8s.io/client-go/tools/cache"
    39  	"k8s.io/klog/v2"
    40  )
    41  
    42  func init() {
    43  	// Enable klog which is used in dependencies
    44  	klog.InitFlags(nil)
    45  	flag.Set("logtostderr", "true")
    46  	flag.Set("v", "9")
    47  }
    48  
    49  type testObject struct {
    50  	resourceVersion string
    51  }
    52  
    53  func (o testObject) GetObjectKind() schema.ObjectKind { return schema.EmptyObjectKind }
    54  func (o testObject) DeepCopyObject() runtime.Object   { return o }
    55  func (o testObject) GetResourceVersion() string       { return o.resourceVersion }
    56  
    57  func withCounter(w cache.Watcher) (*uint32, cache.Watcher) {
    58  	var counter uint32
    59  	return &counter, &cache.ListWatch{
    60  		WatchFunc: func(options metav1.ListOptions) (watch.Interface, error) {
    61  			atomic.AddUint32(&counter, 1)
    62  			return w.Watch(options)
    63  		},
    64  	}
    65  }
    66  
    67  func makeTestEvent(rv int) watch.Event {
    68  	return watch.Event{
    69  		Type: watch.Added,
    70  		Object: testObject{
    71  			resourceVersion: fmt.Sprintf("%d", rv),
    72  		},
    73  	}
    74  }
    75  
    76  func arrayToChannel(array []watch.Event) chan watch.Event {
    77  	ch := make(chan watch.Event, len(array))
    78  
    79  	for _, event := range array {
    80  		ch <- event
    81  	}
    82  
    83  	return ch
    84  }
    85  
    86  // parseResourceVersionOrDie is test-only that code simulating the server and thus can interpret resourceVersion
    87  func parseResourceVersionOrDie(resourceVersion string) uint64 {
    88  	// We can't use etcdstorage.Versioner.ParseResourceVersion() because of imports restrictions
    89  
    90  	if resourceVersion == "" {
    91  		return 0
    92  	}
    93  	version, err := strconv.ParseUint(resourceVersion, 10, 64)
    94  	if err != nil {
    95  		panic(fmt.Errorf("failed to parse resourceVersion %q", resourceVersion))
    96  	}
    97  	return version
    98  }
    99  
   100  func fromRV(resourceVersion string, array []watch.Event) []watch.Event {
   101  	var result []watch.Event
   102  	rv := parseResourceVersionOrDie(resourceVersion)
   103  	for _, event := range array {
   104  		if event.Type == watch.Error {
   105  			if len(result) == 0 {
   106  				// Skip error events until we find an object matching RV requirement
   107  				continue
   108  			}
   109  		} else {
   110  			rvGetter, ok := event.Object.(resourceVersionGetter)
   111  			if ok {
   112  				if parseResourceVersionOrDie(rvGetter.GetResourceVersion()) <= rv {
   113  					continue
   114  				}
   115  			}
   116  		}
   117  
   118  		result = append(result, event)
   119  	}
   120  
   121  	return result
   122  }
   123  
   124  func closeAfterN(n int, source chan watch.Event) chan watch.Event {
   125  	result := make(chan watch.Event, 0)
   126  	go func() {
   127  		defer close(result)
   128  		defer close(source)
   129  		for i := 0; i < n; i++ {
   130  			result <- <-source
   131  		}
   132  	}()
   133  	return result
   134  }
   135  
   136  type unexpectedError struct {
   137  	// Inheriting any struct fulfilling runtime.Object interface would do.
   138  	metav1.Status
   139  }
   140  
   141  var _ runtime.Object = &unexpectedError{}
   142  
   143  func TestNewRetryWatcher(t *testing.T) {
   144  	tt := []struct {
   145  		name      string
   146  		initialRV string
   147  		err       error
   148  	}{
   149  		{
   150  			name:      "empty RV should fail",
   151  			initialRV: "",
   152  			err:       errors.New("initial RV \"\" is not supported due to issues with underlying WATCH"),
   153  		},
   154  		{
   155  			name:      "RV \"0\" should fail",
   156  			initialRV: "0",
   157  			err:       errors.New("initial RV \"0\" is not supported due to issues with underlying WATCH"),
   158  		},
   159  	}
   160  	for _, tc := range tt {
   161  		t.Run(tc.name, func(t *testing.T) {
   162  			_, err := NewRetryWatcher(tc.initialRV, nil)
   163  			if !reflect.DeepEqual(err, tc.err) {
   164  				t.Errorf("Expected error: %v, got: %v", tc.err, err)
   165  			}
   166  		})
   167  	}
   168  }
   169  
   170  func TestRetryWatcher(t *testing.T) {
   171  	tt := []struct {
   172  		name        string
   173  		initialRV   string
   174  		watchClient cache.Watcher
   175  		watchCount  uint32
   176  		expected    []watch.Event
   177  	}{
   178  		{
   179  			name:      "recovers if watchClient returns error",
   180  			initialRV: "1",
   181  			watchClient: &cache.ListWatch{
   182  				WatchFunc: func() func(options metav1.ListOptions) (watch.Interface, error) {
   183  					firstRun := true
   184  					return func(options metav1.ListOptions) (watch.Interface, error) {
   185  						if firstRun {
   186  							firstRun = false
   187  							return nil, fmt.Errorf("test error")
   188  						}
   189  
   190  						return watch.NewProxyWatcher(arrayToChannel(fromRV(options.ResourceVersion, []watch.Event{
   191  							makeTestEvent(2),
   192  						}))), nil
   193  					}
   194  				}(),
   195  			},
   196  			watchCount: 2,
   197  			expected: []watch.Event{
   198  				makeTestEvent(2),
   199  			},
   200  		},
   201  		{
   202  			name:      "recovers if watchClient returns nil watcher",
   203  			initialRV: "1",
   204  			watchClient: &cache.ListWatch{
   205  				WatchFunc: func() func(options metav1.ListOptions) (watch.Interface, error) {
   206  					firstRun := true
   207  					return func(options metav1.ListOptions) (watch.Interface, error) {
   208  						if firstRun {
   209  							firstRun = false
   210  							return nil, nil
   211  						}
   212  
   213  						return watch.NewProxyWatcher(arrayToChannel(fromRV(options.ResourceVersion, []watch.Event{
   214  							makeTestEvent(2),
   215  						}))), nil
   216  					}
   217  				}(),
   218  			},
   219  			watchCount: 2,
   220  			expected: []watch.Event{
   221  				makeTestEvent(2),
   222  			},
   223  		},
   224  		{
   225  			name:      "works with empty initialRV",
   226  			initialRV: "1",
   227  			watchClient: &cache.ListWatch{
   228  				WatchFunc: func(options metav1.ListOptions) (watch.Interface, error) {
   229  					return watch.NewProxyWatcher(arrayToChannel(fromRV(options.ResourceVersion, []watch.Event{
   230  						makeTestEvent(2),
   231  					}))), nil
   232  				},
   233  			},
   234  			watchCount: 1,
   235  			expected: []watch.Event{
   236  				makeTestEvent(2),
   237  			},
   238  		},
   239  		{
   240  			name:      "works with initialRV set, skipping the preceding items but reading those directly following",
   241  			initialRV: "1",
   242  			watchClient: &cache.ListWatch{
   243  				WatchFunc: func(options metav1.ListOptions) (watch.Interface, error) {
   244  					return watch.NewProxyWatcher(arrayToChannel(fromRV(options.ResourceVersion, []watch.Event{
   245  						makeTestEvent(1),
   246  						makeTestEvent(2),
   247  					}))), nil
   248  				},
   249  			},
   250  			watchCount: 1,
   251  			expected: []watch.Event{
   252  				makeTestEvent(2),
   253  			},
   254  		},
   255  		{
   256  			name:      "works with initialRV set, skipping the preceding items with none following",
   257  			initialRV: "3",
   258  			watchClient: &cache.ListWatch{
   259  				WatchFunc: func(options metav1.ListOptions) (watch.Interface, error) {
   260  					return watch.NewProxyWatcher(arrayToChannel(fromRV(options.ResourceVersion, []watch.Event{
   261  						makeTestEvent(2),
   262  					}))), nil
   263  				},
   264  			},
   265  			watchCount: 1,
   266  			expected:   nil,
   267  		},
   268  		{
   269  			name:      "fails on Gone (RV too old error)",
   270  			initialRV: "5",
   271  			watchClient: &cache.ListWatch{
   272  				WatchFunc: func(options metav1.ListOptions) (watch.Interface, error) {
   273  					return watch.NewProxyWatcher(arrayToChannel(fromRV(options.ResourceVersion, []watch.Event{
   274  						makeTestEvent(5),
   275  						makeTestEvent(6),
   276  						{Type: watch.Error, Object: &apierrors.NewGone("").ErrStatus},
   277  						makeTestEvent(7),
   278  						makeTestEvent(8),
   279  					}))), nil
   280  				},
   281  			},
   282  			watchCount: 1,
   283  			expected: []watch.Event{
   284  				makeTestEvent(6),
   285  				{
   286  					Type:   watch.Error,
   287  					Object: &apierrors.NewGone("").ErrStatus,
   288  				},
   289  			},
   290  		},
   291  		{
   292  			name:      "recovers from timeout error",
   293  			initialRV: "5",
   294  			watchClient: &cache.ListWatch{
   295  				WatchFunc: func(options metav1.ListOptions) (watch.Interface, error) {
   296  					return watch.NewProxyWatcher(arrayToChannel(fromRV(options.ResourceVersion, []watch.Event{
   297  						makeTestEvent(6),
   298  						{
   299  							Type:   watch.Error,
   300  							Object: &apierrors.NewTimeoutError("", 0).ErrStatus,
   301  						},
   302  						makeTestEvent(7),
   303  					}))), nil
   304  				},
   305  			},
   306  			watchCount: 2,
   307  			expected: []watch.Event{
   308  				makeTestEvent(6),
   309  				makeTestEvent(7),
   310  			},
   311  		},
   312  		{
   313  			name:      "recovers from internal server error",
   314  			initialRV: "5",
   315  			watchClient: &cache.ListWatch{
   316  				WatchFunc: func(options metav1.ListOptions) (watch.Interface, error) {
   317  					return watch.NewProxyWatcher(arrayToChannel(fromRV(options.ResourceVersion, []watch.Event{
   318  						makeTestEvent(6),
   319  						{
   320  							Type:   watch.Error,
   321  							Object: &apierrors.NewInternalError(errors.New("")).ErrStatus,
   322  						},
   323  						makeTestEvent(7),
   324  					}))), nil
   325  				},
   326  			},
   327  			watchCount: 2,
   328  			expected: []watch.Event{
   329  				makeTestEvent(6),
   330  				makeTestEvent(7),
   331  			},
   332  		},
   333  		{
   334  			name:      "recovers from unexpected error code",
   335  			initialRV: "5",
   336  			watchClient: &cache.ListWatch{
   337  				WatchFunc: func(options metav1.ListOptions) (watch.Interface, error) {
   338  					return watch.NewProxyWatcher(arrayToChannel(fromRV(options.ResourceVersion, []watch.Event{
   339  						makeTestEvent(6),
   340  						{
   341  							Type: watch.Error,
   342  							Object: &metav1.Status{
   343  								Code: 666,
   344  							},
   345  						},
   346  						makeTestEvent(7),
   347  					}))), nil
   348  				},
   349  			},
   350  			watchCount: 2,
   351  			expected: []watch.Event{
   352  				makeTestEvent(6),
   353  				makeTestEvent(7),
   354  			},
   355  		},
   356  		{
   357  			name:      "recovers from unexpected error type",
   358  			initialRV: "5",
   359  			watchClient: &cache.ListWatch{
   360  				WatchFunc: func(options metav1.ListOptions) (watch.Interface, error) {
   361  					return watch.NewProxyWatcher(arrayToChannel(fromRV(options.ResourceVersion, []watch.Event{
   362  						makeTestEvent(6),
   363  						{
   364  							Type:   watch.Error,
   365  							Object: &unexpectedError{},
   366  						},
   367  						makeTestEvent(7),
   368  					}))), nil
   369  				},
   370  			},
   371  			watchCount: 2,
   372  			expected: []watch.Event{
   373  				makeTestEvent(6),
   374  				makeTestEvent(7),
   375  			},
   376  		},
   377  		{
   378  			name:      "survives 1 closed watch and reads 1 item",
   379  			initialRV: "5",
   380  			watchClient: &cache.ListWatch{
   381  				WatchFunc: func(options metav1.ListOptions) (watch.Interface, error) {
   382  					return watch.NewProxyWatcher(closeAfterN(1, arrayToChannel(fromRV(options.ResourceVersion, []watch.Event{
   383  						makeTestEvent(6),
   384  					})))), nil
   385  				},
   386  			},
   387  			watchCount: 2,
   388  			expected: []watch.Event{
   389  				makeTestEvent(6),
   390  			},
   391  		},
   392  		{
   393  			name:      "survives 2 closed watches and reads 2 items",
   394  			initialRV: "4",
   395  			watchClient: &cache.ListWatch{
   396  				WatchFunc: func(options metav1.ListOptions) (watch.Interface, error) {
   397  					return watch.NewProxyWatcher(closeAfterN(1, arrayToChannel(fromRV(options.ResourceVersion, []watch.Event{
   398  						makeTestEvent(5),
   399  						makeTestEvent(6),
   400  					})))), nil
   401  				},
   402  			},
   403  			watchCount: 3,
   404  			expected: []watch.Event{
   405  				makeTestEvent(5),
   406  				makeTestEvent(6),
   407  			},
   408  		},
   409  		{
   410  			name:      "survives 2 closed watches and reads 2 items for nonconsecutive RVs",
   411  			initialRV: "4",
   412  			watchClient: &cache.ListWatch{
   413  				WatchFunc: func(options metav1.ListOptions) (watch.Interface, error) {
   414  					return watch.NewProxyWatcher(closeAfterN(1, arrayToChannel(fromRV(options.ResourceVersion, []watch.Event{
   415  						makeTestEvent(5),
   416  						makeTestEvent(7),
   417  					})))), nil
   418  				},
   419  			},
   420  			watchCount: 3,
   421  			expected: []watch.Event{
   422  				makeTestEvent(5),
   423  				makeTestEvent(7),
   424  			},
   425  		},
   426  		{
   427  			name:      "survives 2 closed watches and reads 2 items for nonconsecutive RVs starting at much lower RV",
   428  			initialRV: "2",
   429  			watchClient: &cache.ListWatch{
   430  				WatchFunc: func(options metav1.ListOptions) (watch.Interface, error) {
   431  					return watch.NewProxyWatcher(closeAfterN(1, arrayToChannel(fromRV(options.ResourceVersion, []watch.Event{
   432  						makeTestEvent(5),
   433  						makeTestEvent(7),
   434  					})))), nil
   435  				},
   436  			},
   437  			watchCount: 3,
   438  			expected: []watch.Event{
   439  				makeTestEvent(5),
   440  				makeTestEvent(7),
   441  			},
   442  		},
   443  		{
   444  			name:      "survives 4 closed watches and reads 4 items for nonconsecutive, spread RVs",
   445  			initialRV: "2",
   446  			watchClient: &cache.ListWatch{
   447  				WatchFunc: func(options metav1.ListOptions) (watch.Interface, error) {
   448  					return watch.NewProxyWatcher(closeAfterN(1, arrayToChannel(fromRV(options.ResourceVersion, []watch.Event{
   449  						makeTestEvent(5),
   450  						makeTestEvent(6),
   451  						makeTestEvent(7),
   452  						makeTestEvent(11),
   453  					})))), nil
   454  				},
   455  			},
   456  			watchCount: 5,
   457  			expected: []watch.Event{
   458  				makeTestEvent(5),
   459  				makeTestEvent(6),
   460  				makeTestEvent(7),
   461  				makeTestEvent(11),
   462  			},
   463  		},
   464  		{
   465  			name:      "survives 4 closed watches and reads 4 items for nonconsecutive, spread RVs and skips those with lower or equal RV",
   466  			initialRV: "2",
   467  			watchClient: &cache.ListWatch{
   468  				WatchFunc: func(options metav1.ListOptions) (watch.Interface, error) {
   469  					return watch.NewProxyWatcher(closeAfterN(1, arrayToChannel(fromRV(options.ResourceVersion, []watch.Event{
   470  						makeTestEvent(1),
   471  						makeTestEvent(2),
   472  						makeTestEvent(5),
   473  						makeTestEvent(6),
   474  						makeTestEvent(7),
   475  						makeTestEvent(11),
   476  					})))), nil
   477  				},
   478  			},
   479  			watchCount: 5,
   480  			expected: []watch.Event{
   481  				makeTestEvent(5),
   482  				makeTestEvent(6),
   483  				makeTestEvent(7),
   484  				makeTestEvent(11),
   485  			},
   486  		},
   487  		{
   488  			name:      "survives 2 closed watches and reads 2+2+1 items skipping those with equal RV",
   489  			initialRV: "1",
   490  			watchClient: &cache.ListWatch{
   491  				WatchFunc: func(options metav1.ListOptions) (watch.Interface, error) {
   492  					return watch.NewProxyWatcher(closeAfterN(2, arrayToChannel(fromRV(options.ResourceVersion, []watch.Event{
   493  						makeTestEvent(1),
   494  						makeTestEvent(2),
   495  						makeTestEvent(5),
   496  						makeTestEvent(6),
   497  						makeTestEvent(7),
   498  						makeTestEvent(11),
   499  					})))), nil
   500  				},
   501  			},
   502  			watchCount: 3,
   503  			expected: []watch.Event{
   504  				makeTestEvent(2),
   505  				makeTestEvent(5),
   506  				makeTestEvent(6),
   507  				makeTestEvent(7),
   508  				makeTestEvent(11),
   509  			},
   510  		},
   511  	}
   512  
   513  	for _, tc := range tt {
   514  		tc := tc
   515  		t.Run(tc.name, func(t *testing.T) {
   516  			t.Parallel()
   517  
   518  			atomicCounter, watchFunc := withCounter(tc.watchClient)
   519  			watcher, err := newRetryWatcher(tc.initialRV, watchFunc, time.Duration(0))
   520  			if err != nil {
   521  				t.Fatalf("failed to create a RetryWatcher: %v", err)
   522  			}
   523  			defer func() {
   524  				watcher.Stop()
   525  				t.Log("Waiting on RetryWatcher to stop...")
   526  				<-watcher.Done()
   527  			}()
   528  
   529  			var got []watch.Event
   530  			for i := 0; i < len(tc.expected); i++ {
   531  				event, ok := <-watcher.ResultChan()
   532  				if !ok {
   533  					t.Errorf("expected event %s, but channel is closed", dump.Pretty(tc.expected[i]))
   534  					break
   535  				}
   536  
   537  				got = append(got, event)
   538  			}
   539  
   540  			// (Sanity check, best effort) Make sure there are no more events to be received
   541  			// RetryWatcher proxies the source channel so we can't try reading it immediately
   542  			// but have to tolerate some delay. Given this is best effort detection we can use short duration.
   543  			// It also makes sure that for 0 events the watchFunc has time to be called.
   544  			select {
   545  			case event, ok := <-watcher.ResultChan():
   546  				if ok {
   547  					t.Errorf("Unexpected event received after reading all the expected ones: %s", dump.Pretty(event))
   548  				}
   549  			case <-time.After(10 * time.Millisecond):
   550  				break
   551  			}
   552  
   553  			var counter uint32
   554  			// We always count with the last watch reestablishing which is imminent but still a race.
   555  			// We will wait for the last watch to reestablish to avoid it.
   556  			err = wait.PollImmediate(10*time.Millisecond, 10*time.Second, func() (done bool, err error) {
   557  				counter = atomic.LoadUint32(atomicCounter)
   558  				return counter == tc.watchCount, nil
   559  			})
   560  			if err == wait.ErrWaitTimeout {
   561  				t.Errorf("expected %d watcher starts, but it has started %d times", tc.watchCount, counter)
   562  			} else if err != nil {
   563  				t.Fatal(err)
   564  			}
   565  
   566  			if !reflect.DeepEqual(tc.expected, got) {
   567  				t.Fatalf("expected %s, got %s;\ndiff: %s", dump.Pretty(tc.expected), dump.Pretty(got), cmp.Diff(tc.expected, got))
   568  			}
   569  		})
   570  	}
   571  }
   572  
   573  func TestRetryWatcherToFinishWithUnreadEvents(t *testing.T) {
   574  	watcher, err := NewRetryWatcher("1", &cache.ListWatch{
   575  		WatchFunc: func(options metav1.ListOptions) (watch.Interface, error) {
   576  			return watch.NewProxyWatcher(arrayToChannel([]watch.Event{
   577  				makeTestEvent(2),
   578  			})), nil
   579  		},
   580  	})
   581  	if err != nil {
   582  		t.Fatalf("failed to create a RetryWatcher: %v", err)
   583  	}
   584  
   585  	// Give the watcher a chance to get to sending events (blocking)
   586  	time.Sleep(10 * time.Millisecond)
   587  
   588  	watcher.Stop()
   589  
   590  	maxTime := time.Second
   591  	select {
   592  	case <-watcher.Done():
   593  		break
   594  	case <-time.After(maxTime):
   595  		t.Errorf("The watcher failed to be closed in %s", maxTime)
   596  	}
   597  
   598  	// RetryWatcher result channel should be closed
   599  	_, ok := <-watcher.ResultChan()
   600  	if ok {
   601  		t.Error("ResultChan is not closed")
   602  	}
   603  }
   604  

View as plain text