...

Source file src/k8s.io/client-go/tools/watch/retrywatcher.go

Documentation: k8s.io/client-go/tools/watch

     1  /*
     2  Copyright 2017 The Kubernetes Authors.
     3  
     4  Licensed under the Apache License, Version 2.0 (the "License");
     5  you may not use this file except in compliance with the License.
     6  You may obtain a copy of the License at
     7  
     8      http://www.apache.org/licenses/LICENSE-2.0
     9  
    10  Unless required by applicable law or agreed to in writing, software
    11  distributed under the License is distributed on an "AS IS" BASIS,
    12  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    13  See the License for the specific language governing permissions and
    14  limitations under the License.
    15  */
    16  
    17  package watch
    18  
    19  import (
    20  	"context"
    21  	"errors"
    22  	"fmt"
    23  	"io"
    24  	"net/http"
    25  	"time"
    26  
    27  	apierrors "k8s.io/apimachinery/pkg/api/errors"
    28  	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
    29  	"k8s.io/apimachinery/pkg/util/dump"
    30  	"k8s.io/apimachinery/pkg/util/net"
    31  	"k8s.io/apimachinery/pkg/util/wait"
    32  	"k8s.io/apimachinery/pkg/watch"
    33  	"k8s.io/client-go/tools/cache"
    34  	"k8s.io/klog/v2"
    35  )
    36  
    37  // resourceVersionGetter is an interface used to get resource version from events.
    38  // We can't reuse an interface from meta otherwise it would be a cyclic dependency and we need just this one method
    39  type resourceVersionGetter interface {
    40  	GetResourceVersion() string
    41  }
    42  
    43  // RetryWatcher will make sure that in case the underlying watcher is closed (e.g. due to API timeout or etcd timeout)
    44  // it will get restarted from the last point without the consumer even knowing about it.
    45  // RetryWatcher does that by inspecting events and keeping track of resourceVersion.
    46  // Especially useful when using watch.UntilWithoutRetry where premature termination is causing issues and flakes.
    47  // Please note that this is not resilient to etcd cache not having the resource version anymore - you would need to
    48  // use Informers for that.
    49  type RetryWatcher struct {
    50  	lastResourceVersion string
    51  	watcherClient       cache.Watcher
    52  	resultChan          chan watch.Event
    53  	stopChan            chan struct{}
    54  	doneChan            chan struct{}
    55  	minRestartDelay     time.Duration
    56  }
    57  
    58  // NewRetryWatcher creates a new RetryWatcher.
    59  // It will make sure that watches gets restarted in case of recoverable errors.
    60  // The initialResourceVersion will be given to watch method when first called.
    61  func NewRetryWatcher(initialResourceVersion string, watcherClient cache.Watcher) (*RetryWatcher, error) {
    62  	return newRetryWatcher(initialResourceVersion, watcherClient, 1*time.Second)
    63  }
    64  
    65  func newRetryWatcher(initialResourceVersion string, watcherClient cache.Watcher, minRestartDelay time.Duration) (*RetryWatcher, error) {
    66  	switch initialResourceVersion {
    67  	case "", "0":
    68  		// TODO: revisit this if we ever get WATCH v2 where it means start "now"
    69  		//       without doing the synthetic list of objects at the beginning (see #74022)
    70  		return nil, fmt.Errorf("initial RV %q is not supported due to issues with underlying WATCH", initialResourceVersion)
    71  	default:
    72  		break
    73  	}
    74  
    75  	rw := &RetryWatcher{
    76  		lastResourceVersion: initialResourceVersion,
    77  		watcherClient:       watcherClient,
    78  		stopChan:            make(chan struct{}),
    79  		doneChan:            make(chan struct{}),
    80  		resultChan:          make(chan watch.Event, 0),
    81  		minRestartDelay:     minRestartDelay,
    82  	}
    83  
    84  	go rw.receive()
    85  	return rw, nil
    86  }
    87  
    88  func (rw *RetryWatcher) send(event watch.Event) bool {
    89  	// Writing to an unbuffered channel is blocking operation
    90  	// and we need to check if stop wasn't requested while doing so.
    91  	select {
    92  	case rw.resultChan <- event:
    93  		return true
    94  	case <-rw.stopChan:
    95  		return false
    96  	}
    97  }
    98  
    99  // doReceive returns true when it is done, false otherwise.
   100  // If it is not done the second return value holds the time to wait before calling it again.
   101  func (rw *RetryWatcher) doReceive() (bool, time.Duration) {
   102  	watcher, err := rw.watcherClient.Watch(metav1.ListOptions{
   103  		ResourceVersion:     rw.lastResourceVersion,
   104  		AllowWatchBookmarks: true,
   105  	})
   106  	// We are very unlikely to hit EOF here since we are just establishing the call,
   107  	// but it may happen that the apiserver is just shutting down (e.g. being restarted)
   108  	// This is consistent with how it is handled for informers
   109  	switch err {
   110  	case nil:
   111  		break
   112  
   113  	case io.EOF:
   114  		// watch closed normally
   115  		return false, 0
   116  
   117  	case io.ErrUnexpectedEOF:
   118  		klog.V(1).InfoS("Watch closed with unexpected EOF", "err", err)
   119  		return false, 0
   120  
   121  	default:
   122  		msg := "Watch failed"
   123  		if net.IsProbableEOF(err) || net.IsTimeout(err) {
   124  			klog.V(5).InfoS(msg, "err", err)
   125  			// Retry
   126  			return false, 0
   127  		}
   128  
   129  		klog.ErrorS(err, msg)
   130  		// Retry
   131  		return false, 0
   132  	}
   133  
   134  	if watcher == nil {
   135  		klog.ErrorS(nil, "Watch returned nil watcher")
   136  		// Retry
   137  		return false, 0
   138  	}
   139  
   140  	ch := watcher.ResultChan()
   141  	defer watcher.Stop()
   142  
   143  	for {
   144  		select {
   145  		case <-rw.stopChan:
   146  			klog.V(4).InfoS("Stopping RetryWatcher.")
   147  			return true, 0
   148  		case event, ok := <-ch:
   149  			if !ok {
   150  				klog.V(4).InfoS("Failed to get event! Re-creating the watcher.", "resourceVersion", rw.lastResourceVersion)
   151  				return false, 0
   152  			}
   153  
   154  			// We need to inspect the event and get ResourceVersion out of it
   155  			switch event.Type {
   156  			case watch.Added, watch.Modified, watch.Deleted, watch.Bookmark:
   157  				metaObject, ok := event.Object.(resourceVersionGetter)
   158  				if !ok {
   159  					_ = rw.send(watch.Event{
   160  						Type:   watch.Error,
   161  						Object: &apierrors.NewInternalError(errors.New("retryWatcher: doesn't support resourceVersion")).ErrStatus,
   162  					})
   163  					// We have to abort here because this might cause lastResourceVersion inconsistency by skipping a potential RV with valid data!
   164  					return true, 0
   165  				}
   166  
   167  				resourceVersion := metaObject.GetResourceVersion()
   168  				if resourceVersion == "" {
   169  					_ = rw.send(watch.Event{
   170  						Type:   watch.Error,
   171  						Object: &apierrors.NewInternalError(fmt.Errorf("retryWatcher: object %#v doesn't support resourceVersion", event.Object)).ErrStatus,
   172  					})
   173  					// We have to abort here because this might cause lastResourceVersion inconsistency by skipping a potential RV with valid data!
   174  					return true, 0
   175  				}
   176  
   177  				// All is fine; send the non-bookmark events and update resource version.
   178  				if event.Type != watch.Bookmark {
   179  					ok = rw.send(event)
   180  					if !ok {
   181  						return true, 0
   182  					}
   183  				}
   184  				rw.lastResourceVersion = resourceVersion
   185  
   186  				continue
   187  
   188  			case watch.Error:
   189  				// This round trip allows us to handle unstructured status
   190  				errObject := apierrors.FromObject(event.Object)
   191  				statusErr, ok := errObject.(*apierrors.StatusError)
   192  				if !ok {
   193  					klog.Error(fmt.Sprintf("Received an error which is not *metav1.Status but %s", dump.Pretty(event.Object)))
   194  					// Retry unknown errors
   195  					return false, 0
   196  				}
   197  
   198  				status := statusErr.ErrStatus
   199  
   200  				statusDelay := time.Duration(0)
   201  				if status.Details != nil {
   202  					statusDelay = time.Duration(status.Details.RetryAfterSeconds) * time.Second
   203  				}
   204  
   205  				switch status.Code {
   206  				case http.StatusGone:
   207  					// Never retry RV too old errors
   208  					_ = rw.send(event)
   209  					return true, 0
   210  
   211  				case http.StatusGatewayTimeout, http.StatusInternalServerError:
   212  					// Retry
   213  					return false, statusDelay
   214  
   215  				default:
   216  					// We retry by default. RetryWatcher is meant to proceed unless it is certain
   217  					// that it can't. If we are not certain, we proceed with retry and leave it
   218  					// up to the user to timeout if needed.
   219  
   220  					// Log here so we have a record of hitting the unexpected error
   221  					// and we can whitelist some error codes if we missed any that are expected.
   222  					klog.V(5).Info(fmt.Sprintf("Retrying after unexpected error: %s", dump.Pretty(event.Object)))
   223  
   224  					// Retry
   225  					return false, statusDelay
   226  				}
   227  
   228  			default:
   229  				klog.Errorf("Failed to recognize Event type %q", event.Type)
   230  				_ = rw.send(watch.Event{
   231  					Type:   watch.Error,
   232  					Object: &apierrors.NewInternalError(fmt.Errorf("retryWatcher failed to recognize Event type %q", event.Type)).ErrStatus,
   233  				})
   234  				// We are unable to restart the watch and have to stop the loop or this might cause lastResourceVersion inconsistency by skipping a potential RV with valid data!
   235  				return true, 0
   236  			}
   237  		}
   238  	}
   239  }
   240  
   241  // receive reads the result from a watcher, restarting it if necessary.
   242  func (rw *RetryWatcher) receive() {
   243  	defer close(rw.doneChan)
   244  	defer close(rw.resultChan)
   245  
   246  	klog.V(4).Info("Starting RetryWatcher.")
   247  	defer klog.V(4).Info("Stopping RetryWatcher.")
   248  
   249  	ctx, cancel := context.WithCancel(context.Background())
   250  	defer cancel()
   251  	go func() {
   252  		select {
   253  		case <-rw.stopChan:
   254  			cancel()
   255  			return
   256  		case <-ctx.Done():
   257  			return
   258  		}
   259  	}()
   260  
   261  	// We use non sliding until so we don't introduce delays on happy path when WATCH call
   262  	// timeouts or gets closed and we need to reestablish it while also avoiding hot loops.
   263  	wait.NonSlidingUntilWithContext(ctx, func(ctx context.Context) {
   264  		done, retryAfter := rw.doReceive()
   265  		if done {
   266  			cancel()
   267  			return
   268  		}
   269  
   270  		timer := time.NewTimer(retryAfter)
   271  		select {
   272  		case <-ctx.Done():
   273  			timer.Stop()
   274  			return
   275  		case <-timer.C:
   276  		}
   277  
   278  		klog.V(4).Infof("Restarting RetryWatcher at RV=%q", rw.lastResourceVersion)
   279  	}, rw.minRestartDelay)
   280  }
   281  
   282  // ResultChan implements Interface.
   283  func (rw *RetryWatcher) ResultChan() <-chan watch.Event {
   284  	return rw.resultChan
   285  }
   286  
   287  // Stop implements Interface.
   288  func (rw *RetryWatcher) Stop() {
   289  	close(rw.stopChan)
   290  }
   291  
   292  // Done allows the caller to be notified when Retry watcher stops.
   293  func (rw *RetryWatcher) Done() <-chan struct{} {
   294  	return rw.doneChan
   295  }
   296  

View as plain text