...

Source file src/k8s.io/apimachinery/pkg/util/wait/backoff.go

Documentation: k8s.io/apimachinery/pkg/util/wait

     1  /*
     2  Copyright 2023 The Kubernetes Authors.
     3  
     4  Licensed under the Apache License, Version 2.0 (the "License");
     5  you may not use this file except in compliance with the License.
     6  You may obtain a copy of the License at
     7  
     8      http://www.apache.org/licenses/LICENSE-2.0
     9  
    10  Unless required by applicable law or agreed to in writing, software
    11  distributed under the License is distributed on an "AS IS" BASIS,
    12  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    13  See the License for the specific language governing permissions and
    14  limitations under the License.
    15  */
    16  
    17  package wait
    18  
    19  import (
    20  	"context"
    21  	"math"
    22  	"sync"
    23  	"time"
    24  
    25  	"k8s.io/apimachinery/pkg/util/runtime"
    26  	"k8s.io/utils/clock"
    27  )
    28  
// Backoff holds parameters applied to a Backoff function.
type Backoff struct {
	// The initial duration.
	Duration time.Duration
	// Duration is multiplied by factor each iteration, if factor is not zero
	// and the limits imposed by Steps and Cap have not been reached.
	// Should not be negative.
	// The jitter does not contribute to the updates to the duration parameter.
	Factor float64
	// The sleep at each iteration is the duration plus an additional
	// amount chosen uniformly at random from the interval between
	// zero and `jitter*duration`. Jitter is only applied when it is
	// greater than zero.
	Jitter float64
	// The remaining number of iterations in which the duration
	// parameter may change (but progress can be stopped earlier by
	// hitting the cap). If not positive, the duration is not
	// changed. Used for exponential backoff in combination with
	// Factor and Cap.
	Steps int
	// A limit on revised values of the duration parameter. If a
	// multiplication by the factor parameter would make the duration
	// exceed the cap then the duration is set to the cap and the
	// steps parameter is set to zero. A Cap that is not positive
	// imposes no limit.
	Cap time.Duration
}
    54  
    55  // Step returns an amount of time to sleep determined by the original
    56  // Duration and Jitter. The backoff is mutated to update its Steps and
    57  // Duration. A nil Backoff always has a zero-duration step.
    58  func (b *Backoff) Step() time.Duration {
    59  	if b == nil {
    60  		return 0
    61  	}
    62  	var nextDuration time.Duration
    63  	nextDuration, b.Duration, b.Steps = delay(b.Steps, b.Duration, b.Cap, b.Factor, b.Jitter)
    64  	return nextDuration
    65  }
    66  
    67  // DelayFunc returns a function that will compute the next interval to
    68  // wait given the arguments in b. It does not mutate the original backoff
    69  // but the function is safe to use only from a single goroutine.
    70  func (b Backoff) DelayFunc() DelayFunc {
    71  	steps := b.Steps
    72  	duration := b.Duration
    73  	cap := b.Cap
    74  	factor := b.Factor
    75  	jitter := b.Jitter
    76  
    77  	return func() time.Duration {
    78  		var nextDuration time.Duration
    79  		// jitter is applied per step and is not cumulative over multiple steps
    80  		nextDuration, duration, steps = delay(steps, duration, cap, factor, jitter)
    81  		return nextDuration
    82  	}
    83  }
    84  
    85  // Timer returns a timer implementation appropriate to this backoff's parameters
    86  // for use with wait functions.
    87  func (b Backoff) Timer() Timer {
    88  	if b.Steps > 1 || b.Jitter != 0 {
    89  		return &variableTimer{new: internalClock.NewTimer, fn: b.DelayFunc()}
    90  	}
    91  	if b.Duration > 0 {
    92  		return &fixedTimer{new: internalClock.NewTicker, interval: b.Duration}
    93  	}
    94  	return newNoopTimer()
    95  }
    96  
    97  // delay implements the core delay algorithm used in this package.
    98  func delay(steps int, duration, cap time.Duration, factor, jitter float64) (_ time.Duration, next time.Duration, nextSteps int) {
    99  	// when steps is non-positive, do not alter the base duration
   100  	if steps < 1 {
   101  		if jitter > 0 {
   102  			return Jitter(duration, jitter), duration, 0
   103  		}
   104  		return duration, duration, 0
   105  	}
   106  	steps--
   107  
   108  	// calculate the next step's interval
   109  	if factor != 0 {
   110  		next = time.Duration(float64(duration) * factor)
   111  		if cap > 0 && next > cap {
   112  			next = cap
   113  			steps = 0
   114  		}
   115  	} else {
   116  		next = duration
   117  	}
   118  
   119  	// add jitter for this step
   120  	if jitter > 0 {
   121  		duration = Jitter(duration, jitter)
   122  	}
   123  
   124  	return duration, next, steps
   125  
   126  }
   127  
   128  // DelayWithReset returns a DelayFunc that will return the appropriate next interval to
   129  // wait. Every resetInterval the backoff parameters are reset to their initial state.
   130  // This method is safe to invoke from multiple goroutines, but all calls will advance
   131  // the backoff state when Factor is set. If Factor is zero, this method is the same as
   132  // invoking b.DelayFunc() since Steps has no impact without Factor. If resetInterval is
   133  // zero no backoff will be performed as the same calling DelayFunc with a zero factor
   134  // and steps.
   135  func (b Backoff) DelayWithReset(c clock.Clock, resetInterval time.Duration) DelayFunc {
   136  	if b.Factor <= 0 {
   137  		return b.DelayFunc()
   138  	}
   139  	if resetInterval <= 0 {
   140  		b.Steps = 0
   141  		b.Factor = 0
   142  		return b.DelayFunc()
   143  	}
   144  	return (&backoffManager{
   145  		backoff:        b,
   146  		initialBackoff: b,
   147  		resetInterval:  resetInterval,
   148  
   149  		clock:     c,
   150  		lastStart: c.Now(),
   151  		timer:     nil,
   152  	}).Step
   153  }
   154  
   155  // Until loops until stop channel is closed, running f every period.
   156  //
   157  // Until is syntactic sugar on top of JitterUntil with zero jitter factor and
   158  // with sliding = true (which means the timer for period starts after the f
   159  // completes).
   160  func Until(f func(), period time.Duration, stopCh <-chan struct{}) {
   161  	JitterUntil(f, period, 0.0, true, stopCh)
   162  }
   163  
   164  // UntilWithContext loops until context is done, running f every period.
   165  //
   166  // UntilWithContext is syntactic sugar on top of JitterUntilWithContext
   167  // with zero jitter factor and with sliding = true (which means the timer
   168  // for period starts after the f completes).
   169  func UntilWithContext(ctx context.Context, f func(context.Context), period time.Duration) {
   170  	JitterUntilWithContext(ctx, f, period, 0.0, true)
   171  }
   172  
   173  // NonSlidingUntil loops until stop channel is closed, running f every
   174  // period.
   175  //
   176  // NonSlidingUntil is syntactic sugar on top of JitterUntil with zero jitter
   177  // factor, with sliding = false (meaning the timer for period starts at the same
   178  // time as the function starts).
   179  func NonSlidingUntil(f func(), period time.Duration, stopCh <-chan struct{}) {
   180  	JitterUntil(f, period, 0.0, false, stopCh)
   181  }
   182  
   183  // NonSlidingUntilWithContext loops until context is done, running f every
   184  // period.
   185  //
   186  // NonSlidingUntilWithContext is syntactic sugar on top of JitterUntilWithContext
   187  // with zero jitter factor, with sliding = false (meaning the timer for period
   188  // starts at the same time as the function starts).
   189  func NonSlidingUntilWithContext(ctx context.Context, f func(context.Context), period time.Duration) {
   190  	JitterUntilWithContext(ctx, f, period, 0.0, false)
   191  }
   192  
   193  // JitterUntil loops until stop channel is closed, running f every period.
   194  //
   195  // If jitterFactor is positive, the period is jittered before every run of f.
   196  // If jitterFactor is not positive, the period is unchanged and not jittered.
   197  //
   198  // If sliding is true, the period is computed after f runs. If it is false then
   199  // period includes the runtime for f.
   200  //
   201  // Close stopCh to stop. f may not be invoked if stop channel is already
   202  // closed. Pass NeverStop to if you don't want it stop.
   203  func JitterUntil(f func(), period time.Duration, jitterFactor float64, sliding bool, stopCh <-chan struct{}) {
   204  	BackoffUntil(f, NewJitteredBackoffManager(period, jitterFactor, &clock.RealClock{}), sliding, stopCh)
   205  }
   206  
   207  // BackoffUntil loops until stop channel is closed, run f every duration given by BackoffManager.
   208  //
   209  // If sliding is true, the period is computed after f runs. If it is false then
   210  // period includes the runtime for f.
   211  func BackoffUntil(f func(), backoff BackoffManager, sliding bool, stopCh <-chan struct{}) {
   212  	var t clock.Timer
   213  	for {
   214  		select {
   215  		case <-stopCh:
   216  			return
   217  		default:
   218  		}
   219  
   220  		if !sliding {
   221  			t = backoff.Backoff()
   222  		}
   223  
   224  		func() {
   225  			defer runtime.HandleCrash()
   226  			f()
   227  		}()
   228  
   229  		if sliding {
   230  			t = backoff.Backoff()
   231  		}
   232  
   233  		// NOTE: b/c there is no priority selection in golang
   234  		// it is possible for this to race, meaning we could
   235  		// trigger t.C and stopCh, and t.C select falls through.
   236  		// In order to mitigate we re-check stopCh at the beginning
   237  		// of every loop to prevent extra executions of f().
   238  		select {
   239  		case <-stopCh:
   240  			if !t.Stop() {
   241  				<-t.C()
   242  			}
   243  			return
   244  		case <-t.C():
   245  		}
   246  	}
   247  }
   248  
   249  // JitterUntilWithContext loops until context is done, running f every period.
   250  //
   251  // If jitterFactor is positive, the period is jittered before every run of f.
   252  // If jitterFactor is not positive, the period is unchanged and not jittered.
   253  //
   254  // If sliding is true, the period is computed after f runs. If it is false then
   255  // period includes the runtime for f.
   256  //
   257  // Cancel context to stop. f may not be invoked if context is already expired.
   258  func JitterUntilWithContext(ctx context.Context, f func(context.Context), period time.Duration, jitterFactor float64, sliding bool) {
   259  	JitterUntil(func() { f(ctx) }, period, jitterFactor, sliding, ctx.Done())
   260  }
   261  
// backoffManager provides simple backoff behavior in a threadsafe manner to a caller.
type backoffManager struct {
	// backoff is the current, mutable backoff state advanced by Step.
	backoff Backoff
	// initialBackoff is the snapshot copied back into backoff on reset.
	initialBackoff Backoff
	// resetInterval is how long after lastStart the state is reset by Step.
	// When it is zero, Step resets the state on every call.
	resetInterval time.Duration

	// clock supplies the current time and creates the timer.
	clock clock.Clock

	// lock is held by Step and Backoff while they run.
	lock sync.Mutex
	// lastStart is the time of the most recent interval-based reset
	// (initially the construction time).
	lastStart time.Time
	// timer is created on the first call to Backoff and Reset on later calls.
	timer clock.Timer
}
   274  
   275  // Step returns the expected next duration to wait.
   276  func (b *backoffManager) Step() time.Duration {
   277  	b.lock.Lock()
   278  	defer b.lock.Unlock()
   279  
   280  	switch {
   281  	case b.resetInterval == 0:
   282  		b.backoff = b.initialBackoff
   283  	case b.clock.Now().Sub(b.lastStart) > b.resetInterval:
   284  		b.backoff = b.initialBackoff
   285  		b.lastStart = b.clock.Now()
   286  	}
   287  	return b.backoff.Step()
   288  }
   289  
   290  // Backoff implements BackoffManager.Backoff, it returns a timer so caller can block on the timer
   291  // for exponential backoff. The returned timer must be drained before calling Backoff() the second
   292  // time.
   293  func (b *backoffManager) Backoff() clock.Timer {
   294  	b.lock.Lock()
   295  	defer b.lock.Unlock()
   296  	if b.timer == nil {
   297  		b.timer = b.clock.NewTimer(b.Step())
   298  	} else {
   299  		b.timer.Reset(b.Step())
   300  	}
   301  	return b.timer
   302  }
   303  
   304  // Timer returns a new Timer instance that shares the clock and the reset behavior with all other
   305  // timers.
   306  func (b *backoffManager) Timer() Timer {
   307  	return DelayFunc(b.Step).Timer(b.clock)
   308  }
   309  
// BackoffManager manages backoff with a particular scheme based on its underlying implementation.
type BackoffManager interface {
	// Backoff returns a shared clock.Timer that is Reset on every invocation. This method is not
	// safe for use from multiple threads. It returns a timer for backoff, and the caller shall wait
	// until Timer.C() drains. If Backoff() is called a second time before the timer from the first
	// Backoff() call has finished, the first timer will NOT be drained and the result is
	// undetermined behavior.
	Backoff() clock.Timer
}
   319  
// exponentialBackoffManagerImpl is the BackoffManager returned by
// NewExponentialBackoffManager.
//
// Deprecated: Will be removed when the legacy polling functions are removed.
type exponentialBackoffManagerImpl struct {
	// backoff is the current backoff state, advanced by getNextBackoff.
	backoff *Backoff
	// backoffTimer is created on the first call to Backoff and Reset afterwards.
	backoffTimer clock.Timer
	// lastBackoffStart is the time of the most recent getNextBackoff call
	// (initially the construction time).
	lastBackoffStart time.Time
	// initialBackoff is the duration restored into backoff on reset.
	initialBackoff time.Duration
	// backoffResetDuration is the gap between getNextBackoff calls beyond
	// which the backoff is reset.
	backoffResetDuration time.Duration
	// clock supplies the current time and creates the timer.
	clock clock.Clock
}
   329  
   330  // NewExponentialBackoffManager returns a manager for managing exponential backoff. Each backoff is jittered and
   331  // backoff will not exceed the given max. If the backoff is not called within resetDuration, the backoff is reset.
   332  // This backoff manager is used to reduce load during upstream unhealthiness.
   333  //
   334  // Deprecated: Will be removed when the legacy Poll methods are removed. Callers should construct a
   335  // Backoff struct, use DelayWithReset() to get a DelayFunc that periodically resets itself, and then
   336  // invoke Timer() when calling wait.BackoffUntil.
   337  //
   338  // Instead of:
   339  //
   340  //	bm := wait.NewExponentialBackoffManager(init, max, reset, factor, jitter, clock)
   341  //	...
   342  //	wait.BackoffUntil(..., bm.Backoff, ...)
   343  //
   344  // Use:
   345  //
   346  //	delayFn := wait.Backoff{
   347  //	  Duration: init,
   348  //	  Cap:      max,
   349  //	  Steps:    int(math.Ceil(float64(max) / float64(init))), // now a required argument
   350  //	  Factor:   factor,
   351  //	  Jitter:   jitter,
   352  //	}.DelayWithReset(reset, clock)
   353  //	wait.BackoffUntil(..., delayFn.Timer(), ...)
   354  func NewExponentialBackoffManager(initBackoff, maxBackoff, resetDuration time.Duration, backoffFactor, jitter float64, c clock.Clock) BackoffManager {
   355  	return &exponentialBackoffManagerImpl{
   356  		backoff: &Backoff{
   357  			Duration: initBackoff,
   358  			Factor:   backoffFactor,
   359  			Jitter:   jitter,
   360  
   361  			// the current impl of wait.Backoff returns Backoff.Duration once steps are used up, which is not
   362  			// what we ideally need here, we set it to max int and assume we will never use up the steps
   363  			Steps: math.MaxInt32,
   364  			Cap:   maxBackoff,
   365  		},
   366  		backoffTimer:         nil,
   367  		initialBackoff:       initBackoff,
   368  		lastBackoffStart:     c.Now(),
   369  		backoffResetDuration: resetDuration,
   370  		clock:                c,
   371  	}
   372  }
   373  
   374  func (b *exponentialBackoffManagerImpl) getNextBackoff() time.Duration {
   375  	if b.clock.Now().Sub(b.lastBackoffStart) > b.backoffResetDuration {
   376  		b.backoff.Steps = math.MaxInt32
   377  		b.backoff.Duration = b.initialBackoff
   378  	}
   379  	b.lastBackoffStart = b.clock.Now()
   380  	return b.backoff.Step()
   381  }
   382  
   383  // Backoff implements BackoffManager.Backoff, it returns a timer so caller can block on the timer for exponential backoff.
   384  // The returned timer must be drained before calling Backoff() the second time
   385  func (b *exponentialBackoffManagerImpl) Backoff() clock.Timer {
   386  	if b.backoffTimer == nil {
   387  		b.backoffTimer = b.clock.NewTimer(b.getNextBackoff())
   388  	} else {
   389  		b.backoffTimer.Reset(b.getNextBackoff())
   390  	}
   391  	return b.backoffTimer
   392  }
   393  
// jitteredBackoffManagerImpl is the BackoffManager returned by
// NewJitteredBackoffManager.
//
// Deprecated: Will be removed when the legacy polling functions are removed.
type jitteredBackoffManagerImpl struct {
	// clock creates the timer.
	clock clock.Clock
	// duration is the base interval between backoffs.
	duration time.Duration
	// jitter is the jitter factor; it is only applied when greater than zero.
	jitter float64
	// backoffTimer is created on the first call to Backoff and Reset afterwards.
	backoffTimer clock.Timer
}
   401  
   402  // NewJitteredBackoffManager returns a BackoffManager that backoffs with given duration plus given jitter. If the jitter
   403  // is negative, backoff will not be jittered.
   404  //
   405  // Deprecated: Will be removed when the legacy Poll methods are removed. Callers should construct a
   406  // Backoff struct and invoke Timer() when calling wait.BackoffUntil.
   407  //
   408  // Instead of:
   409  //
   410  //	bm := wait.NewJitteredBackoffManager(duration, jitter, clock)
   411  //	...
   412  //	wait.BackoffUntil(..., bm.Backoff, ...)
   413  //
   414  // Use:
   415  //
   416  //	wait.BackoffUntil(..., wait.Backoff{Duration: duration, Jitter: jitter}.Timer(), ...)
   417  func NewJitteredBackoffManager(duration time.Duration, jitter float64, c clock.Clock) BackoffManager {
   418  	return &jitteredBackoffManagerImpl{
   419  		clock:        c,
   420  		duration:     duration,
   421  		jitter:       jitter,
   422  		backoffTimer: nil,
   423  	}
   424  }
   425  
   426  func (j *jitteredBackoffManagerImpl) getNextBackoff() time.Duration {
   427  	jitteredPeriod := j.duration
   428  	if j.jitter > 0.0 {
   429  		jitteredPeriod = Jitter(j.duration, j.jitter)
   430  	}
   431  	return jitteredPeriod
   432  }
   433  
   434  // Backoff implements BackoffManager.Backoff, it returns a timer so caller can block on the timer for jittered backoff.
   435  // The returned timer must be drained before calling Backoff() the second time
   436  func (j *jitteredBackoffManagerImpl) Backoff() clock.Timer {
   437  	backoff := j.getNextBackoff()
   438  	if j.backoffTimer == nil {
   439  		j.backoffTimer = j.clock.NewTimer(backoff)
   440  	} else {
   441  		j.backoffTimer.Reset(backoff)
   442  	}
   443  	return j.backoffTimer
   444  }
   445  
   446  // ExponentialBackoff repeats a condition check with exponential backoff.
   447  //
   448  // It repeatedly checks the condition and then sleeps, using `backoff.Step()`
   449  // to determine the length of the sleep and adjust Duration and Steps.
   450  // Stops and returns as soon as:
   451  // 1. the condition check returns true or an error,
   452  // 2. `backoff.Steps` checks of the condition have been done, or
   453  // 3. a sleep truncated by the cap on duration has been completed.
   454  // In case (1) the returned error is what the condition function returned.
   455  // In all other cases, ErrWaitTimeout is returned.
   456  //
   457  // Since backoffs are often subject to cancellation, we recommend using
   458  // ExponentialBackoffWithContext and passing a context to the method.
   459  func ExponentialBackoff(backoff Backoff, condition ConditionFunc) error {
   460  	for backoff.Steps > 0 {
   461  		if ok, err := runConditionWithCrashProtection(condition); err != nil || ok {
   462  			return err
   463  		}
   464  		if backoff.Steps == 1 {
   465  			break
   466  		}
   467  		time.Sleep(backoff.Step())
   468  	}
   469  	return ErrWaitTimeout
   470  }
   471  
   472  // ExponentialBackoffWithContext repeats a condition check with exponential backoff.
   473  // It immediately returns an error if the condition returns an error, the context is cancelled
   474  // or hits the deadline, or if the maximum attempts defined in backoff is exceeded (ErrWaitTimeout).
   475  // If an error is returned by the condition the backoff stops immediately. The condition will
   476  // never be invoked more than backoff.Steps times.
   477  func ExponentialBackoffWithContext(ctx context.Context, backoff Backoff, condition ConditionWithContextFunc) error {
   478  	for backoff.Steps > 0 {
   479  		select {
   480  		case <-ctx.Done():
   481  			return ctx.Err()
   482  		default:
   483  		}
   484  
   485  		if ok, err := runConditionWithCrashProtectionWithContext(ctx, condition); err != nil || ok {
   486  			return err
   487  		}
   488  
   489  		if backoff.Steps == 1 {
   490  			break
   491  		}
   492  
   493  		waitBeforeRetry := backoff.Step()
   494  		select {
   495  		case <-ctx.Done():
   496  			return ctx.Err()
   497  		case <-time.After(waitBeforeRetry):
   498  		}
   499  	}
   500  
   501  	return ErrWaitTimeout
   502  }
   503  

View as plain text