...

Source file src/k8s.io/client-go/tools/cache/reflector.go

Documentation: k8s.io/client-go/tools/cache

     1  /*
     2  Copyright 2014 The Kubernetes Authors.
     3  
     4  Licensed under the Apache License, Version 2.0 (the "License");
     5  you may not use this file except in compliance with the License.
     6  You may obtain a copy of the License at
     7  
     8      http://www.apache.org/licenses/LICENSE-2.0
     9  
    10  Unless required by applicable law or agreed to in writing, software
    11  distributed under the License is distributed on an "AS IS" BASIS,
    12  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    13  See the License for the specific language governing permissions and
    14  limitations under the License.
    15  */
    16  
    17  package cache
    18  
    19  import (
    20  	"context"
    21  	"errors"
    22  	"fmt"
    23  	"io"
    24  	"math/rand"
    25  	"os"
    26  	"reflect"
    27  	"strings"
    28  	"sync"
    29  	"time"
    30  
    31  	apierrors "k8s.io/apimachinery/pkg/api/errors"
    32  	"k8s.io/apimachinery/pkg/api/meta"
    33  	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
    34  	"k8s.io/apimachinery/pkg/apis/meta/v1/unstructured"
    35  	"k8s.io/apimachinery/pkg/runtime"
    36  	"k8s.io/apimachinery/pkg/runtime/schema"
    37  	"k8s.io/apimachinery/pkg/util/naming"
    38  	utilnet "k8s.io/apimachinery/pkg/util/net"
    39  	utilruntime "k8s.io/apimachinery/pkg/util/runtime"
    40  	"k8s.io/apimachinery/pkg/util/wait"
    41  	"k8s.io/apimachinery/pkg/watch"
    42  	"k8s.io/client-go/tools/pager"
    43  	"k8s.io/klog/v2"
    44  	"k8s.io/utils/clock"
    45  	"k8s.io/utils/pointer"
    46  	"k8s.io/utils/ptr"
    47  	"k8s.io/utils/trace"
    48  )
    49  
// defaultExpectedTypeName is the type description used for a reflector that
// was created with a nil expectedType.
const defaultExpectedTypeName = "<unspecified>"
    51  
// Reflector watches a specified resource and causes all changes to be reflected in the given store.
type Reflector struct {
	// name identifies this reflector. By default it will be a file:line if possible.
	name string
	// typeDescription is the name of the type we expect to place in the
	// store. The name will be the stringification of expectedGVK if
	// provided, and the stringification of expectedType otherwise. It is
	// for display only, and should not be used for parsing or comparison.
	typeDescription string
	// expectedType is the reflect.Type of the example object handed to the
	// constructor, i.e. the type we expect to place in the store.
	// Only the type needs to be right, except that when that is
	// `unstructured.Unstructured` the object's `"apiVersion"` and
	// `"kind"` must also be right.
	expectedType reflect.Type
	// The GVK of the object we expect to place in the store if unstructured.
	expectedGVK *schema.GroupVersionKind
	// The destination to sync up with the watch source
	store Store
	// listerWatcher is used to perform lists and watches.
	listerWatcher ListerWatcher
	// backoffManager manages backoff of ListWatch.
	// resyncPeriod is the interval at which a resync is considered; zero
	// means the resync channel never fires.
	backoffManager wait.BackoffManager
	resyncPeriod   time.Duration
	// clock allows tests to manipulate time
	clock clock.Clock
	// paginatedResult defines whether pagination should be forced for list calls.
	// It is set based on the result of the initial list call.
	paginatedResult bool
	// lastSyncResourceVersion is the resource version token last
	// observed when doing a sync with the underlying store
	// it is thread safe, but not synchronized with the underlying store
	lastSyncResourceVersion string
	// isLastSyncResourceVersionUnavailable is true if the previous list or watch request with
	// lastSyncResourceVersion failed with an "expired" or "too large resource version" error.
	isLastSyncResourceVersionUnavailable bool
	// lastSyncResourceVersionMutex guards read/write access to lastSyncResourceVersion
	lastSyncResourceVersionMutex sync.RWMutex
	// Called whenever the ListAndWatch drops the connection with an error.
	watchErrorHandler WatchErrorHandler
	// WatchListPageSize is the requested chunk size of initial and resync watch lists.
	// If unset, for consistent reads (RV="") or reads that opt-into arbitrarily old data
	// (RV="0") it will default to pager.PageSize, for the rest (RV != "" && RV != "0")
	// it will turn off pagination to allow serving them from watch cache.
	// NOTE: It should be used carefully as paginated lists are always served directly from
	// etcd, which is significantly less efficient and may lead to serious performance and
	// scalability problems.
	WatchListPageSize int64
	// ShouldResync is invoked periodically and whenever it returns `true` the Store's Resync operation is invoked.
	// A nil ShouldResync is treated as always returning `true`.
	ShouldResync func() bool
	// MaxInternalErrorRetryDuration defines how long we should retry internal errors returned by watch.
	MaxInternalErrorRetryDuration time.Duration
	// UseWatchList, if turned on, instructs the reflector to open a stream to bring data from the API server.
	// Streaming has the primary advantage of using fewer server resources to fetch data.
	//
	// The old behaviour establishes a LIST request which gets data in chunks.
	// Paginated list is less efficient and depending on the actual size of objects
	// might result in an increased memory consumption of the APIServer.
	//
	// See https://github.com/kubernetes/enhancements/tree/master/keps/sig-api-machinery/3157-watch-list#design-details
	//
	// TODO(#115478): Consider making reflector.UseWatchList a private field. Since we implemented "api streaming" on the etcd storage layer it should work.
	UseWatchList *bool
}
   115  
// ResourceVersionUpdater is an interface that allows a store implementation to
// track the current resource version of the reflector. This is especially
// important if storage bookmarks are enabled.
type ResourceVersionUpdater interface {
	// UpdateResourceVersion is called each time the current resource version
	// of the reflector is updated.
	UpdateResourceVersion(resourceVersion string)
}
   124  
// WatchErrorHandler is called whenever ListAndWatch drops the
// connection with an error. After calling this handler, the informer
// will back off and retry.
//
// The default implementation looks at the error type and tries to log
// the error message at an appropriate level.
//
// Implementations of this handler may display the error message in other
// ways. Implementations should return quickly - any expensive processing
// should be offloaded.
type WatchErrorHandler func(r *Reflector, err error)
   136  
   137  // DefaultWatchErrorHandler is the default implementation of WatchErrorHandler
   138  func DefaultWatchErrorHandler(r *Reflector, err error) {
   139  	switch {
   140  	case isExpiredError(err):
   141  		// Don't set LastSyncResourceVersionUnavailable - LIST call with ResourceVersion=RV already
   142  		// has a semantic that it returns data at least as fresh as provided RV.
   143  		// So first try to LIST with setting RV to resource version of last observed object.
   144  		klog.V(4).Infof("%s: watch of %v closed with: %v", r.name, r.typeDescription, err)
   145  	case err == io.EOF:
   146  		// watch closed normally
   147  	case err == io.ErrUnexpectedEOF:
   148  		klog.V(1).Infof("%s: Watch for %v closed with unexpected EOF: %v", r.name, r.typeDescription, err)
   149  	default:
   150  		utilruntime.HandleError(fmt.Errorf("%s: Failed to watch %v: %v", r.name, r.typeDescription, err))
   151  	}
   152  }
   153  
var (
	// minWatchTimeout is the lower bound of the timeout sent with watch
	// requests. We try to spread the load on apiserver by randomizing the
	// timeout of each watch request between minWatchTimeout and
	// 2*minWatchTimeout.
	minWatchTimeout = 5 * time.Minute
)
   159  
   160  // NewNamespaceKeyedIndexerAndReflector creates an Indexer and a Reflector
   161  // The indexer is configured to key on namespace
   162  func NewNamespaceKeyedIndexerAndReflector(lw ListerWatcher, expectedType interface{}, resyncPeriod time.Duration) (indexer Indexer, reflector *Reflector) {
   163  	indexer = NewIndexer(MetaNamespaceKeyFunc, Indexers{NamespaceIndex: MetaNamespaceIndexFunc})
   164  	reflector = NewReflector(lw, expectedType, indexer, resyncPeriod)
   165  	return indexer, reflector
   166  }
   167  
   168  // NewReflector creates a new Reflector with its name defaulted to the closest source_file.go:line in the call stack
   169  // that is outside this package. See NewReflectorWithOptions for further information.
   170  func NewReflector(lw ListerWatcher, expectedType interface{}, store Store, resyncPeriod time.Duration) *Reflector {
   171  	return NewReflectorWithOptions(lw, expectedType, store, ReflectorOptions{ResyncPeriod: resyncPeriod})
   172  }
   173  
   174  // NewNamedReflector creates a new Reflector with the specified name. See NewReflectorWithOptions for further
   175  // information.
   176  func NewNamedReflector(name string, lw ListerWatcher, expectedType interface{}, store Store, resyncPeriod time.Duration) *Reflector {
   177  	return NewReflectorWithOptions(lw, expectedType, store, ReflectorOptions{Name: name, ResyncPeriod: resyncPeriod})
   178  }
   179  
// ReflectorOptions configures a Reflector.
type ReflectorOptions struct {
	// Name is the Reflector's name. If unset/unspecified, the name defaults to the closest source_file.go:line
	// in the call stack that is outside this package.
	Name string

	// TypeDescription is the Reflector's type description. If unset/unspecified, the type description is defaulted
	// using the following rules: if the expectedType passed to NewReflectorWithOptions was nil, the type description is
	// "<unspecified>". If the expectedType is an instance of *unstructured.Unstructured and its apiVersion and kind fields
	// are set, the type description is the string encoding of those. Otherwise, the type description is set to the
	// Go type of expectedType.
	TypeDescription string

	// ResyncPeriod is the Reflector's resync period. If unset/unspecified, the resync period defaults to 0
	// (do not resync).
	ResyncPeriod time.Duration

	// Clock allows tests to control time. If unset, it defaults to clock.RealClock{}.
	Clock clock.Clock
}
   200  
   201  // NewReflectorWithOptions creates a new Reflector object which will keep the
   202  // given store up to date with the server's contents for the given
   203  // resource. Reflector promises to only put things in the store that
   204  // have the type of expectedType, unless expectedType is nil. If
   205  // resyncPeriod is non-zero, then the reflector will periodically
   206  // consult its ShouldResync function to determine whether to invoke
   207  // the Store's Resync operation; `ShouldResync==nil` means always
   208  // "yes".  This enables you to use reflectors to periodically process
   209  // everything as well as incrementally processing the things that
   210  // change.
   211  func NewReflectorWithOptions(lw ListerWatcher, expectedType interface{}, store Store, options ReflectorOptions) *Reflector {
   212  	reflectorClock := options.Clock
   213  	if reflectorClock == nil {
   214  		reflectorClock = clock.RealClock{}
   215  	}
   216  	r := &Reflector{
   217  		name:            options.Name,
   218  		resyncPeriod:    options.ResyncPeriod,
   219  		typeDescription: options.TypeDescription,
   220  		listerWatcher:   lw,
   221  		store:           store,
   222  		// We used to make the call every 1sec (1 QPS), the goal here is to achieve ~98% traffic reduction when
   223  		// API server is not healthy. With these parameters, backoff will stop at [30,60) sec interval which is
   224  		// 0.22 QPS. If we don't backoff for 2min, assume API server is healthy and we reset the backoff.
   225  		backoffManager:    wait.NewExponentialBackoffManager(800*time.Millisecond, 30*time.Second, 2*time.Minute, 2.0, 1.0, reflectorClock),
   226  		clock:             reflectorClock,
   227  		watchErrorHandler: WatchErrorHandler(DefaultWatchErrorHandler),
   228  		expectedType:      reflect.TypeOf(expectedType),
   229  	}
   230  
   231  	if r.name == "" {
   232  		r.name = naming.GetNameFromCallsite(internalPackages...)
   233  	}
   234  
   235  	if r.typeDescription == "" {
   236  		r.typeDescription = getTypeDescriptionFromObject(expectedType)
   237  	}
   238  
   239  	if r.expectedGVK == nil {
   240  		r.expectedGVK = getExpectedGVKFromObject(expectedType)
   241  	}
   242  
   243  	// don't overwrite UseWatchList if already set
   244  	// because the higher layers (e.g. storage/cacher) disabled it on purpose
   245  	if r.UseWatchList == nil {
   246  		if s := os.Getenv("ENABLE_CLIENT_GO_WATCH_LIST_ALPHA"); len(s) > 0 {
   247  			r.UseWatchList = ptr.To(true)
   248  		}
   249  	}
   250  
   251  	return r
   252  }
   253  
   254  func getTypeDescriptionFromObject(expectedType interface{}) string {
   255  	if expectedType == nil {
   256  		return defaultExpectedTypeName
   257  	}
   258  
   259  	reflectDescription := reflect.TypeOf(expectedType).String()
   260  
   261  	obj, ok := expectedType.(*unstructured.Unstructured)
   262  	if !ok {
   263  		return reflectDescription
   264  	}
   265  
   266  	gvk := obj.GroupVersionKind()
   267  	if gvk.Empty() {
   268  		return reflectDescription
   269  	}
   270  
   271  	return gvk.String()
   272  }
   273  
   274  func getExpectedGVKFromObject(expectedType interface{}) *schema.GroupVersionKind {
   275  	obj, ok := expectedType.(*unstructured.Unstructured)
   276  	if !ok {
   277  		return nil
   278  	}
   279  
   280  	gvk := obj.GroupVersionKind()
   281  	if gvk.Empty() {
   282  		return nil
   283  	}
   284  
   285  	return &gvk
   286  }
   287  
// internalPackages are packages that are ignored when creating a default reflector name. These packages are in the common
// call chains to NewReflector, so they'd be low entropy names for reflectors.
var internalPackages = []string{"client-go/tools/cache/"}
   291  
   292  // Run repeatedly uses the reflector's ListAndWatch to fetch all the
   293  // objects and subsequent deltas.
   294  // Run will exit when stopCh is closed.
   295  func (r *Reflector) Run(stopCh <-chan struct{}) {
   296  	klog.V(3).Infof("Starting reflector %s (%s) from %s", r.typeDescription, r.resyncPeriod, r.name)
   297  	wait.BackoffUntil(func() {
   298  		if err := r.ListAndWatch(stopCh); err != nil {
   299  			r.watchErrorHandler(r, err)
   300  		}
   301  	}, r.backoffManager, true, stopCh)
   302  	klog.V(3).Infof("Stopping reflector %s (%s) from %s", r.typeDescription, r.resyncPeriod, r.name)
   303  }
   304  
var (
	// neverExitWatch is a channel on which nothing will ever be sent; it is
	// handed out as the resync channel when resyncing is disabled.
	neverExitWatch <-chan time.Time = make(chan time.Time)

	// errorStopRequested is used to indicate that watching stopped because of
	// a signal from the stop channel passed in from a client of the reflector.
	errorStopRequested = errors.New("stop requested")
)
   313  
   314  // resyncChan returns a channel which will receive something when a resync is
   315  // required, and a cleanup function.
   316  func (r *Reflector) resyncChan() (<-chan time.Time, func() bool) {
   317  	if r.resyncPeriod == 0 {
   318  		return neverExitWatch, func() bool { return false }
   319  	}
   320  	// The cleanup function is required: imagine the scenario where watches
   321  	// always fail so we end up listing frequently. Then, if we don't
   322  	// manually stop the timer, we could end up with many timers active
   323  	// concurrently.
   324  	t := r.clock.NewTimer(r.resyncPeriod)
   325  	return t.C(), t.Stop
   326  }
   327  
// ListAndWatch first lists all items and gets the resource version at the moment of the call,
// and then uses that resource version to watch.
//
// When UseWatchList is enabled, the initial state is fetched through watchList
// instead; if that fails, ListAndWatch falls back to a regular list.
//
// It returns an error if ListAndWatch didn't even try to initialize the watch.
func (r *Reflector) ListAndWatch(stopCh <-chan struct{}) error {
	klog.V(3).Infof("Listing and watching %v from %s", r.typeDescription, r.name)
	var err error
	var w watch.Interface
	useWatchList := ptr.Deref(r.UseWatchList, false)
	fallbackToList := !useWatchList

	if useWatchList {
		w, err = r.watchList(stopCh)
		if w == nil && err == nil {
			// stopCh was closed
			return nil
		}
		if err != nil {
			klog.Warningf("The watchlist request ended with an error, falling back to the standard LIST/WATCH semantics because making progress is better than deadlocking, err = %v", err)
			fallbackToList = true
			// ensure that we won't accidentally pass some garbage down the watch.
			w = nil
		}
	}

	if fallbackToList {
		err = r.list(stopCh)
		if err != nil {
			return err
		}
	}

	klog.V(2).Infof("Caches populated for %v from %s", r.typeDescription, r.name)

	// resyncerrc carries a Resync failure from the resync goroutine to the
	// watch; the buffer of one lets that goroutine send without a receiver.
	resyncerrc := make(chan error, 1)
	// cancelCh is closed when this function returns, which ends the resync
	// goroutine started below.
	cancelCh := make(chan struct{})
	defer close(cancelCh)
	go r.startResync(stopCh, cancelCh, resyncerrc)
	return r.watch(w, stopCh, resyncerrc)
}
   367  
   368  // startResync periodically calls r.store.Resync() method.
   369  // Note that this method is blocking and should be
   370  // called in a separate goroutine.
   371  func (r *Reflector) startResync(stopCh <-chan struct{}, cancelCh <-chan struct{}, resyncerrc chan error) {
   372  	resyncCh, cleanup := r.resyncChan()
   373  	defer func() {
   374  		cleanup() // Call the last one written into cleanup
   375  	}()
   376  	for {
   377  		select {
   378  		case <-resyncCh:
   379  		case <-stopCh:
   380  			return
   381  		case <-cancelCh:
   382  			return
   383  		}
   384  		if r.ShouldResync == nil || r.ShouldResync() {
   385  			klog.V(4).Infof("%s: forcing resync", r.name)
   386  			if err := r.store.Resync(); err != nil {
   387  				resyncerrc <- err
   388  				return
   389  			}
   390  		}
   391  		cleanup()
   392  		resyncCh, cleanup = r.resyncChan()
   393  	}
   394  }
   395  
// watch processes events from w and, once a watch ends without an error,
// opens a new one starting at the last synced resource version. If w is nil
// a new watch is opened right away.
//
// It returns nil when stopCh is closed and for errors reported by the watch
// handler that are not retried here (they are only logged). It returns a
// non-nil error only when opening a watch fails with a non-retriable error.
func (r *Reflector) watch(w watch.Interface, stopCh <-chan struct{}, resyncerrc chan error) error {
	var err error
	// retry bounds how long internal errors from the watch keep being retried.
	retry := NewRetryWithDeadline(r.MaxInternalErrorRetryDuration, time.Minute, apierrors.IsInternalError, r.clock)

	for {
		// give the stopCh a chance to stop the loop, even in case of continue statements further down on errors
		select {
		case <-stopCh:
			// we can only end up here when the stopCh
			// was closed after a successful watchlist or list request
			if w != nil {
				w.Stop()
			}
			return nil
		default:
		}

		// start the clock before sending the request, since some proxies won't flush headers until after the first watch event is sent
		start := r.clock.Now()

		if w == nil {
			// Randomize the timeout between minWatchTimeout and twice that value.
			timeoutSeconds := int64(minWatchTimeout.Seconds() * (rand.Float64() + 1.0))
			options := metav1.ListOptions{
				ResourceVersion: r.LastSyncResourceVersion(),
				// We want to avoid situations of hanging watchers. Stop any watchers that do not
				// receive any events within the timeout window.
				TimeoutSeconds: &timeoutSeconds,
				// To reduce load on kube-apiserver on watch restarts, you may enable watch bookmarks.
				// Reflector doesn't assume bookmarks are returned at all (if the server does not support
				// watch bookmarks, it will ignore this field).
				AllowWatchBookmarks: true,
			}

			w, err = r.listerWatcher.Watch(options)
			if err != nil {
				if canRetry := isWatchErrorRetriable(err); canRetry {
					klog.V(4).Infof("%s: watch of %v returned %v - backing off", r.name, r.typeDescription, err)
					// Wait out the backoff, unless a stop is requested first.
					select {
					case <-stopCh:
						return nil
					case <-r.backoffManager.Backoff().C():
						continue
					}
				}
				return err
			}
		}

		err = watchHandler(start, w, r.store, r.expectedType, r.expectedGVK, r.name, r.typeDescription, r.setLastSyncResourceVersion, nil, r.clock, resyncerrc, stopCh)
		// Ensure that watch will not be reused across iterations.
		w.Stop()
		w = nil
		retry.After(err)
		if err != nil {
			if err != errorStopRequested {
				switch {
				case isExpiredError(err):
					// Don't set LastSyncResourceVersionUnavailable - LIST call with ResourceVersion=RV already
					// has a semantic that it returns data at least as fresh as provided RV.
					// So first try to LIST with setting RV to resource version of last observed object.
					klog.V(4).Infof("%s: watch of %v closed with: %v", r.name, r.typeDescription, err)
				case apierrors.IsTooManyRequests(err):
					klog.V(2).Infof("%s: watch of %v returned 429 - backing off", r.name, r.typeDescription)
					select {
					case <-stopCh:
						return nil
					case <-r.backoffManager.Backoff().C():
						continue
					}
				case apierrors.IsInternalError(err) && retry.ShouldRetry():
					klog.V(2).Infof("%s: retrying watch of %v internal error: %v", r.name, r.typeDescription, err)
					continue
				default:
					klog.Warningf("%s: watch of %v ended with: %v", r.name, r.typeDescription, err)
				}
			}
			// Every error that was not retried above ends the watch without
			// being returned to the caller.
			return nil
		}
	}
}
   477  
   478  // list simply lists all items and records a resource version obtained from the server at the moment of the call.
   479  // the resource version can be used for further progress notification (aka. watch).
   480  func (r *Reflector) list(stopCh <-chan struct{}) error {
   481  	var resourceVersion string
   482  	options := metav1.ListOptions{ResourceVersion: r.relistResourceVersion()}
   483  
   484  	initTrace := trace.New("Reflector ListAndWatch", trace.Field{Key: "name", Value: r.name})
   485  	defer initTrace.LogIfLong(10 * time.Second)
   486  	var list runtime.Object
   487  	var paginatedResult bool
   488  	var err error
   489  	listCh := make(chan struct{}, 1)
   490  	panicCh := make(chan interface{}, 1)
   491  	go func() {
   492  		defer func() {
   493  			if r := recover(); r != nil {
   494  				panicCh <- r
   495  			}
   496  		}()
   497  		// Attempt to gather list in chunks, if supported by listerWatcher, if not, the first
   498  		// list request will return the full response.
   499  		pager := pager.New(pager.SimplePageFunc(func(opts metav1.ListOptions) (runtime.Object, error) {
   500  			return r.listerWatcher.List(opts)
   501  		}))
   502  		switch {
   503  		case r.WatchListPageSize != 0:
   504  			pager.PageSize = r.WatchListPageSize
   505  		case r.paginatedResult:
   506  			// We got a paginated result initially. Assume this resource and server honor
   507  			// paging requests (i.e. watch cache is probably disabled) and leave the default
   508  			// pager size set.
   509  		case options.ResourceVersion != "" && options.ResourceVersion != "0":
   510  			// User didn't explicitly request pagination.
   511  			//
   512  			// With ResourceVersion != "", we have a possibility to list from watch cache,
   513  			// but we do that (for ResourceVersion != "0") only if Limit is unset.
   514  			// To avoid thundering herd on etcd (e.g. on master upgrades), we explicitly
   515  			// switch off pagination to force listing from watch cache (if enabled).
   516  			// With the existing semantic of RV (result is at least as fresh as provided RV),
   517  			// this is correct and doesn't lead to going back in time.
   518  			//
   519  			// We also don't turn off pagination for ResourceVersion="0", since watch cache
   520  			// is ignoring Limit in that case anyway, and if watch cache is not enabled
   521  			// we don't introduce regression.
   522  			pager.PageSize = 0
   523  		}
   524  
   525  		list, paginatedResult, err = pager.ListWithAlloc(context.Background(), options)
   526  		if isExpiredError(err) || isTooLargeResourceVersionError(err) {
   527  			r.setIsLastSyncResourceVersionUnavailable(true)
   528  			// Retry immediately if the resource version used to list is unavailable.
   529  			// The pager already falls back to full list if paginated list calls fail due to an "Expired" error on
   530  			// continuation pages, but the pager might not be enabled, the full list might fail because the
   531  			// resource version it is listing at is expired or the cache may not yet be synced to the provided
   532  			// resource version. So we need to fallback to resourceVersion="" in all to recover and ensure
   533  			// the reflector makes forward progress.
   534  			list, paginatedResult, err = pager.ListWithAlloc(context.Background(), metav1.ListOptions{ResourceVersion: r.relistResourceVersion()})
   535  		}
   536  		close(listCh)
   537  	}()
   538  	select {
   539  	case <-stopCh:
   540  		return nil
   541  	case r := <-panicCh:
   542  		panic(r)
   543  	case <-listCh:
   544  	}
   545  	initTrace.Step("Objects listed", trace.Field{Key: "error", Value: err})
   546  	if err != nil {
   547  		klog.Warningf("%s: failed to list %v: %v", r.name, r.typeDescription, err)
   548  		return fmt.Errorf("failed to list %v: %w", r.typeDescription, err)
   549  	}
   550  
   551  	// We check if the list was paginated and if so set the paginatedResult based on that.
   552  	// However, we want to do that only for the initial list (which is the only case
   553  	// when we set ResourceVersion="0"). The reasoning behind it is that later, in some
   554  	// situations we may force listing directly from etcd (by setting ResourceVersion="")
   555  	// which will return paginated result, even if watch cache is enabled. However, in
   556  	// that case, we still want to prefer sending requests to watch cache if possible.
   557  	//
   558  	// Paginated result returned for request with ResourceVersion="0" mean that watch
   559  	// cache is disabled and there are a lot of objects of a given type. In such case,
   560  	// there is no need to prefer listing from watch cache.
   561  	if options.ResourceVersion == "0" && paginatedResult {
   562  		r.paginatedResult = true
   563  	}
   564  
   565  	r.setIsLastSyncResourceVersionUnavailable(false) // list was successful
   566  	listMetaInterface, err := meta.ListAccessor(list)
   567  	if err != nil {
   568  		return fmt.Errorf("unable to understand list result %#v: %v", list, err)
   569  	}
   570  	resourceVersion = listMetaInterface.GetResourceVersion()
   571  	initTrace.Step("Resource version extracted")
   572  	items, err := meta.ExtractListWithAlloc(list)
   573  	if err != nil {
   574  		return fmt.Errorf("unable to understand list result %#v (%v)", list, err)
   575  	}
   576  	initTrace.Step("Objects extracted")
   577  	if err := r.syncWith(items, resourceVersion); err != nil {
   578  		return fmt.Errorf("unable to sync list result: %v", err)
   579  	}
   580  	initTrace.Step("SyncWith done")
   581  	r.setLastSyncResourceVersion(resourceVersion)
   582  	initTrace.Step("Resource version updated")
   583  	return nil
   584  }
   585  
   586  // watchList establishes a stream to get a consistent snapshot of data
   587  // from the server as described in https://github.com/kubernetes/enhancements/tree/master/keps/sig-api-machinery/3157-watch-list#proposal
   588  //
   589  // case 1: start at Most Recent (RV="", ResourceVersionMatch=ResourceVersionMatchNotOlderThan)
   590  // Establishes a consistent stream with the server.
   591  // That means the returned data is consistent, as if, served directly from etcd via a quorum read.
   592  // It begins with synthetic "Added" events of all resources up to the most recent ResourceVersion.
   593  // It ends with a synthetic "Bookmark" event containing the most recent ResourceVersion.
   594  // After receiving a "Bookmark" event the reflector is considered to be synchronized.
   595  // It replaces its internal store with the collected items and
   596  // reuses the current watch requests for getting further events.
   597  //
   598  // case 2: start at Exact (RV>"0", ResourceVersionMatch=ResourceVersionMatchNotOlderThan)
   599  // Establishes a stream with the server at the provided resource version.
   600  // To establish the initial state the server begins with synthetic "Added" events.
   601  // It ends with a synthetic "Bookmark" event containing the provided or newer resource version.
   602  // After receiving a "Bookmark" event the reflector is considered to be synchronized.
   603  // It replaces its internal store with the collected items and
   604  // reuses the current watch requests for getting further events.
   605  func (r *Reflector) watchList(stopCh <-chan struct{}) (watch.Interface, error) {
   606  	var w watch.Interface
   607  	var err error
   608  	var temporaryStore Store
   609  	var resourceVersion string
   610  	// TODO(#115478): see if this function could be turned
   611  	//  into a method and see if error handling
   612  	//  could be unified with the r.watch method
   613  	isErrorRetriableWithSideEffectsFn := func(err error) bool {
   614  		if canRetry := isWatchErrorRetriable(err); canRetry {
   615  			klog.V(2).Infof("%s: watch-list of %v returned %v - backing off", r.name, r.typeDescription, err)
   616  			<-r.backoffManager.Backoff().C()
   617  			return true
   618  		}
   619  		if isExpiredError(err) || isTooLargeResourceVersionError(err) {
   620  			// we tried to re-establish a watch request but the provided RV
   621  			// has either expired or it is greater than the server knows about.
   622  			// In that case we reset the RV and
   623  			// try to get a consistent snapshot from the watch cache (case 1)
   624  			r.setIsLastSyncResourceVersionUnavailable(true)
   625  			return true
   626  		}
   627  		return false
   628  	}
   629  
   630  	initTrace := trace.New("Reflector WatchList", trace.Field{Key: "name", Value: r.name})
   631  	defer initTrace.LogIfLong(10 * time.Second)
   632  	for {
   633  		select {
   634  		case <-stopCh:
   635  			return nil, nil
   636  		default:
   637  		}
   638  
   639  		resourceVersion = ""
   640  		lastKnownRV := r.rewatchResourceVersion()
   641  		temporaryStore = NewStore(DeletionHandlingMetaNamespaceKeyFunc)
   642  		// TODO(#115478): large "list", slow clients, slow network, p&f
   643  		//  might slow down streaming and eventually fail.
   644  		//  maybe in such a case we should retry with an increased timeout?
   645  		timeoutSeconds := int64(minWatchTimeout.Seconds() * (rand.Float64() + 1.0))
   646  		options := metav1.ListOptions{
   647  			ResourceVersion:      lastKnownRV,
   648  			AllowWatchBookmarks:  true,
   649  			SendInitialEvents:    pointer.Bool(true),
   650  			ResourceVersionMatch: metav1.ResourceVersionMatchNotOlderThan,
   651  			TimeoutSeconds:       &timeoutSeconds,
   652  		}
   653  		start := r.clock.Now()
   654  
   655  		w, err = r.listerWatcher.Watch(options)
   656  		if err != nil {
   657  			if isErrorRetriableWithSideEffectsFn(err) {
   658  				continue
   659  			}
   660  			return nil, err
   661  		}
   662  		bookmarkReceived := pointer.Bool(false)
   663  		err = watchHandler(start, w, temporaryStore, r.expectedType, r.expectedGVK, r.name, r.typeDescription,
   664  			func(rv string) { resourceVersion = rv },
   665  			bookmarkReceived,
   666  			r.clock, make(chan error), stopCh)
   667  		if err != nil {
   668  			w.Stop() // stop and retry with clean state
   669  			if err == errorStopRequested {
   670  				return nil, nil
   671  			}
   672  			if isErrorRetriableWithSideEffectsFn(err) {
   673  				continue
   674  			}
   675  			return nil, err
   676  		}
   677  		if *bookmarkReceived {
   678  			break
   679  		}
   680  	}
   681  	// We successfully got initial state from watch-list confirmed by the
   682  	// "k8s.io/initial-events-end" bookmark.
   683  	initTrace.Step("Objects streamed", trace.Field{Key: "count", Value: len(temporaryStore.List())})
   684  	r.setIsLastSyncResourceVersionUnavailable(false)
   685  
   686  	// we utilize the temporaryStore to ensure independence from the current store implementation.
   687  	// as of today, the store is implemented as a queue and will be drained by the higher-level
   688  	// component as soon as it finishes replacing the content.
   689  	checkWatchListConsistencyIfRequested(stopCh, r.name, resourceVersion, r.listerWatcher, temporaryStore)
   690  
   691  	if err = r.store.Replace(temporaryStore.List(), resourceVersion); err != nil {
   692  		return nil, fmt.Errorf("unable to sync watch-list result: %v", err)
   693  	}
   694  	initTrace.Step("SyncWith done")
   695  	r.setLastSyncResourceVersion(resourceVersion)
   696  
   697  	return w, nil
   698  }
   699  
   700  // syncWith replaces the store's items with the given list.
   701  func (r *Reflector) syncWith(items []runtime.Object, resourceVersion string) error {
   702  	found := make([]interface{}, 0, len(items))
   703  	for _, item := range items {
   704  		found = append(found, item)
   705  	}
   706  	return r.store.Replace(found, resourceVersion)
   707  }
   708  
   709  // watchHandler watches w and sets setLastSyncResourceVersion
   710  func watchHandler(start time.Time,
   711  	w watch.Interface,
   712  	store Store,
   713  	expectedType reflect.Type,
   714  	expectedGVK *schema.GroupVersionKind,
   715  	name string,
   716  	expectedTypeName string,
   717  	setLastSyncResourceVersion func(string),
   718  	exitOnInitialEventsEndBookmark *bool,
   719  	clock clock.Clock,
   720  	errc chan error,
   721  	stopCh <-chan struct{},
   722  ) error {
   723  	eventCount := 0
   724  	if exitOnInitialEventsEndBookmark != nil {
   725  		// set it to false just in case somebody
   726  		// made it positive
   727  		*exitOnInitialEventsEndBookmark = false
   728  	}
   729  
   730  loop:
   731  	for {
   732  		select {
   733  		case <-stopCh:
   734  			return errorStopRequested
   735  		case err := <-errc:
   736  			return err
   737  		case event, ok := <-w.ResultChan():
   738  			if !ok {
   739  				break loop
   740  			}
   741  			if event.Type == watch.Error {
   742  				return apierrors.FromObject(event.Object)
   743  			}
   744  			if expectedType != nil {
   745  				if e, a := expectedType, reflect.TypeOf(event.Object); e != a {
   746  					utilruntime.HandleError(fmt.Errorf("%s: expected type %v, but watch event object had type %v", name, e, a))
   747  					continue
   748  				}
   749  			}
   750  			if expectedGVK != nil {
   751  				if e, a := *expectedGVK, event.Object.GetObjectKind().GroupVersionKind(); e != a {
   752  					utilruntime.HandleError(fmt.Errorf("%s: expected gvk %v, but watch event object had gvk %v", name, e, a))
   753  					continue
   754  				}
   755  			}
   756  			meta, err := meta.Accessor(event.Object)
   757  			if err != nil {
   758  				utilruntime.HandleError(fmt.Errorf("%s: unable to understand watch event %#v", name, event))
   759  				continue
   760  			}
   761  			resourceVersion := meta.GetResourceVersion()
   762  			switch event.Type {
   763  			case watch.Added:
   764  				err := store.Add(event.Object)
   765  				if err != nil {
   766  					utilruntime.HandleError(fmt.Errorf("%s: unable to add watch event object (%#v) to store: %v", name, event.Object, err))
   767  				}
   768  			case watch.Modified:
   769  				err := store.Update(event.Object)
   770  				if err != nil {
   771  					utilruntime.HandleError(fmt.Errorf("%s: unable to update watch event object (%#v) to store: %v", name, event.Object, err))
   772  				}
   773  			case watch.Deleted:
   774  				// TODO: Will any consumers need access to the "last known
   775  				// state", which is passed in event.Object? If so, may need
   776  				// to change this.
   777  				err := store.Delete(event.Object)
   778  				if err != nil {
   779  					utilruntime.HandleError(fmt.Errorf("%s: unable to delete watch event object (%#v) from store: %v", name, event.Object, err))
   780  				}
   781  			case watch.Bookmark:
   782  				// A `Bookmark` means watch has synced here, just update the resourceVersion
   783  				if meta.GetAnnotations()["k8s.io/initial-events-end"] == "true" {
   784  					if exitOnInitialEventsEndBookmark != nil {
   785  						*exitOnInitialEventsEndBookmark = true
   786  					}
   787  				}
   788  			default:
   789  				utilruntime.HandleError(fmt.Errorf("%s: unable to understand watch event %#v", name, event))
   790  			}
   791  			setLastSyncResourceVersion(resourceVersion)
   792  			if rvu, ok := store.(ResourceVersionUpdater); ok {
   793  				rvu.UpdateResourceVersion(resourceVersion)
   794  			}
   795  			eventCount++
   796  			if exitOnInitialEventsEndBookmark != nil && *exitOnInitialEventsEndBookmark {
   797  				watchDuration := clock.Since(start)
   798  				klog.V(4).Infof("exiting %v Watch because received the bookmark that marks the end of initial events stream, total %v items received in %v", name, eventCount, watchDuration)
   799  				return nil
   800  			}
   801  		}
   802  	}
   803  
   804  	watchDuration := clock.Since(start)
   805  	if watchDuration < 1*time.Second && eventCount == 0 {
   806  		return fmt.Errorf("very short watch: %s: Unexpected watch close - watch lasted less than a second and no items received", name)
   807  	}
   808  	klog.V(4).Infof("%s: Watch close - %v total %v items received", name, expectedTypeName, eventCount)
   809  	return nil
   810  }
   811  
   812  // LastSyncResourceVersion is the resource version observed when last sync with the underlying store
   813  // The value returned is not synchronized with access to the underlying store and is not thread-safe
   814  func (r *Reflector) LastSyncResourceVersion() string {
   815  	r.lastSyncResourceVersionMutex.RLock()
   816  	defer r.lastSyncResourceVersionMutex.RUnlock()
   817  	return r.lastSyncResourceVersion
   818  }
   819  
   820  func (r *Reflector) setLastSyncResourceVersion(v string) {
   821  	r.lastSyncResourceVersionMutex.Lock()
   822  	defer r.lastSyncResourceVersionMutex.Unlock()
   823  	r.lastSyncResourceVersion = v
   824  }
   825  
   826  // relistResourceVersion determines the resource version the reflector should list or relist from.
   827  // Returns either the lastSyncResourceVersion so that this reflector will relist with a resource
   828  // versions no older than has already been observed in relist results or watch events, or, if the last relist resulted
   829  // in an HTTP 410 (Gone) status code, returns "" so that the relist will use the latest resource version available in
   830  // etcd via a quorum read.
   831  func (r *Reflector) relistResourceVersion() string {
   832  	r.lastSyncResourceVersionMutex.RLock()
   833  	defer r.lastSyncResourceVersionMutex.RUnlock()
   834  
   835  	if r.isLastSyncResourceVersionUnavailable {
   836  		// Since this reflector makes paginated list requests, and all paginated list requests skip the watch cache
   837  		// if the lastSyncResourceVersion is unavailable, we set ResourceVersion="" and list again to re-establish reflector
   838  		// to the latest available ResourceVersion, using a consistent read from etcd.
   839  		return ""
   840  	}
   841  	if r.lastSyncResourceVersion == "" {
   842  		// For performance reasons, initial list performed by reflector uses "0" as resource version to allow it to
   843  		// be served from the watch cache if it is enabled.
   844  		return "0"
   845  	}
   846  	return r.lastSyncResourceVersion
   847  }
   848  
   849  // rewatchResourceVersion determines the resource version the reflector should start streaming from.
   850  func (r *Reflector) rewatchResourceVersion() string {
   851  	r.lastSyncResourceVersionMutex.RLock()
   852  	defer r.lastSyncResourceVersionMutex.RUnlock()
   853  	if r.isLastSyncResourceVersionUnavailable {
   854  		// initial stream should return data at the most recent resource version.
   855  		// the returned data must be consistent i.e. as if served from etcd via a quorum read
   856  		return ""
   857  	}
   858  	return r.lastSyncResourceVersion
   859  }
   860  
   861  // setIsLastSyncResourceVersionUnavailable sets if the last list or watch request with lastSyncResourceVersion returned
   862  // "expired" or "too large resource version" error.
   863  func (r *Reflector) setIsLastSyncResourceVersionUnavailable(isUnavailable bool) {
   864  	r.lastSyncResourceVersionMutex.Lock()
   865  	defer r.lastSyncResourceVersionMutex.Unlock()
   866  	r.isLastSyncResourceVersionUnavailable = isUnavailable
   867  }
   868  
   869  func isExpiredError(err error) bool {
   870  	// In Kubernetes 1.17 and earlier, the api server returns both apierrors.StatusReasonExpired and
   871  	// apierrors.StatusReasonGone for HTTP 410 (Gone) status code responses. In 1.18 the kube server is more consistent
   872  	// and always returns apierrors.StatusReasonExpired. For backward compatibility we can only remove the apierrors.IsGone
   873  	// check when we fully drop support for Kubernetes 1.17 servers from reflectors.
   874  	return apierrors.IsResourceExpired(err) || apierrors.IsGone(err)
   875  }
   876  
   877  func isTooLargeResourceVersionError(err error) bool {
   878  	if apierrors.HasStatusCause(err, metav1.CauseTypeResourceVersionTooLarge) {
   879  		return true
   880  	}
   881  	// In Kubernetes 1.17.0-1.18.5, the api server doesn't set the error status cause to
   882  	// metav1.CauseTypeResourceVersionTooLarge to indicate that the requested minimum resource
   883  	// version is larger than the largest currently available resource version. To ensure backward
   884  	// compatibility with these server versions we also need to detect the error based on the content
   885  	// of the error message field.
   886  	if !apierrors.IsTimeout(err) {
   887  		return false
   888  	}
   889  	apierr, ok := err.(apierrors.APIStatus)
   890  	if !ok || apierr == nil || apierr.Status().Details == nil {
   891  		return false
   892  	}
   893  	for _, cause := range apierr.Status().Details.Causes {
   894  		// Matches the message returned by api server 1.17.0-1.18.5 for this error condition
   895  		if cause.Message == "Too large resource version" {
   896  			return true
   897  		}
   898  	}
   899  
   900  	// Matches the message returned by api server before 1.17.0
   901  	if strings.Contains(apierr.Status().Message, "Too large resource version") {
   902  		return true
   903  	}
   904  
   905  	return false
   906  }
   907  
   908  // isWatchErrorRetriable determines if it is safe to retry
   909  // a watch error retrieved from the server.
   910  func isWatchErrorRetriable(err error) bool {
   911  	// If this is "connection refused" error, it means that most likely apiserver is not responsive.
   912  	// It doesn't make sense to re-list all objects because most likely we will be able to restart
   913  	// watch where we ended.
   914  	// If that's the case begin exponentially backing off and resend watch request.
   915  	// Do the same for "429" errors.
   916  	if utilnet.IsConnectionRefused(err) || apierrors.IsTooManyRequests(err) {
   917  		return true
   918  	}
   919  	return false
   920  }
   921  

View as plain text