...

Source file src/k8s.io/client-go/util/flowcontrol/backoff.go

Documentation: k8s.io/client-go/util/flowcontrol

     1  /*
     2  Copyright 2015 The Kubernetes Authors.
     3  
     4  Licensed under the Apache License, Version 2.0 (the "License");
     5  you may not use this file except in compliance with the License.
     6  You may obtain a copy of the License at
     7  
     8      http://www.apache.org/licenses/LICENSE-2.0
     9  
    10  Unless required by applicable law or agreed to in writing, software
    11  distributed under the License is distributed on an "AS IS" BASIS,
    12  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    13  See the License for the specific language governing permissions and
    14  limitations under the License.
    15  */
    16  
    17  package flowcontrol
    18  
    19  import (
    20  	"math/rand"
    21  	"sync"
    22  	"time"
    23  
    24  	"k8s.io/utils/clock"
    25  	testingclock "k8s.io/utils/clock/testing"
    26  )
    27  
// backoffEntry is the per-id state kept by Backoff.
type backoffEntry struct {
	// backoff is the backoff duration currently in effect for the id.
	backoff    time.Duration
	// lastUpdate is the clock time at which Next last touched this entry.
	lastUpdate time.Time
}
    32  
// Backoff tracks a separate, exponentially growing backoff duration for each
// string id. The embedded RWMutex guards perItemBackoff; the exported methods
// acquire it themselves.
type Backoff struct {
	sync.RWMutex
	// Clock supplies the current time used to stamp entries and to measure
	// elapsed time.
	Clock           clock.Clock
	// defaultDuration is the backoff given to a newly initialized entry.
	defaultDuration time.Duration
	// maxDuration caps a growing backoff; twice this value is the idle
	// threshold used for expiry and garbage collection.
	maxDuration     time.Duration
	// perItemBackoff maps an id to its backoff state.
	perItemBackoff  map[string]*backoffEntry
	// rand is the jitter source; newBackoff leaves it nil when
	// maxJitterFactor is not positive.
	rand            *rand.Rand

	// maxJitterFactor scales the random jitter added to the exponentially
	// backed off delay. If maxJitterFactor is zero, no jitter is added to the
	// delay, which preserves the behavior from before jitter was introduced.
	maxJitterFactor float64
}
    46  
    47  func NewFakeBackOff(initial, max time.Duration, tc *testingclock.FakeClock) *Backoff {
    48  	return newBackoff(tc, initial, max, 0.0)
    49  }
    50  
    51  func NewBackOff(initial, max time.Duration) *Backoff {
    52  	return NewBackOffWithJitter(initial, max, 0.0)
    53  }
    54  
    55  func NewFakeBackOffWithJitter(initial, max time.Duration, tc *testingclock.FakeClock, maxJitterFactor float64) *Backoff {
    56  	return newBackoff(tc, initial, max, maxJitterFactor)
    57  }
    58  
    59  func NewBackOffWithJitter(initial, max time.Duration, maxJitterFactor float64) *Backoff {
    60  	clock := clock.RealClock{}
    61  	return newBackoff(clock, initial, max, maxJitterFactor)
    62  }
    63  
    64  func newBackoff(clock clock.Clock, initial, max time.Duration, maxJitterFactor float64) *Backoff {
    65  	var random *rand.Rand
    66  	if maxJitterFactor > 0 {
    67  		random = rand.New(rand.NewSource(clock.Now().UnixNano()))
    68  	}
    69  	return &Backoff{
    70  		perItemBackoff:  map[string]*backoffEntry{},
    71  		Clock:           clock,
    72  		defaultDuration: initial,
    73  		maxDuration:     max,
    74  		maxJitterFactor: maxJitterFactor,
    75  		rand:            random,
    76  	}
    77  }
    78  
    79  // Get the current backoff Duration
    80  func (p *Backoff) Get(id string) time.Duration {
    81  	p.RLock()
    82  	defer p.RUnlock()
    83  	var delay time.Duration
    84  	entry, ok := p.perItemBackoff[id]
    85  	if ok {
    86  		delay = entry.backoff
    87  	}
    88  	return delay
    89  }
    90  
    91  // move backoff to the next mark, capping at maxDuration
    92  func (p *Backoff) Next(id string, eventTime time.Time) {
    93  	p.Lock()
    94  	defer p.Unlock()
    95  	entry, ok := p.perItemBackoff[id]
    96  	if !ok || hasExpired(eventTime, entry.lastUpdate, p.maxDuration) {
    97  		entry = p.initEntryUnsafe(id)
    98  		entry.backoff += p.jitter(entry.backoff)
    99  	} else {
   100  		delay := entry.backoff * 2       // exponential
   101  		delay += p.jitter(entry.backoff) // add some jitter to the delay
   102  		entry.backoff = min(delay, p.maxDuration)
   103  	}
   104  	entry.lastUpdate = p.Clock.Now()
   105  }
   106  
   107  // Reset forces clearing of all backoff data for a given key.
   108  func (p *Backoff) Reset(id string) {
   109  	p.Lock()
   110  	defer p.Unlock()
   111  	delete(p.perItemBackoff, id)
   112  }
   113  
   114  // Returns True if the elapsed time since eventTime is smaller than the current backoff window
   115  func (p *Backoff) IsInBackOffSince(id string, eventTime time.Time) bool {
   116  	p.RLock()
   117  	defer p.RUnlock()
   118  	entry, ok := p.perItemBackoff[id]
   119  	if !ok {
   120  		return false
   121  	}
   122  	if hasExpired(eventTime, entry.lastUpdate, p.maxDuration) {
   123  		return false
   124  	}
   125  	return p.Clock.Since(eventTime) < entry.backoff
   126  }
   127  
   128  // Returns True if time since lastupdate is less than the current backoff window.
   129  func (p *Backoff) IsInBackOffSinceUpdate(id string, eventTime time.Time) bool {
   130  	p.RLock()
   131  	defer p.RUnlock()
   132  	entry, ok := p.perItemBackoff[id]
   133  	if !ok {
   134  		return false
   135  	}
   136  	if hasExpired(eventTime, entry.lastUpdate, p.maxDuration) {
   137  		return false
   138  	}
   139  	return eventTime.Sub(entry.lastUpdate) < entry.backoff
   140  }
   141  
   142  // Garbage collect records that have aged past maxDuration. Backoff users are expected
   143  // to invoke this periodically.
   144  func (p *Backoff) GC() {
   145  	p.Lock()
   146  	defer p.Unlock()
   147  	now := p.Clock.Now()
   148  	for id, entry := range p.perItemBackoff {
   149  		if now.Sub(entry.lastUpdate) > p.maxDuration*2 {
   150  			// GC when entry has not been updated for 2*maxDuration
   151  			delete(p.perItemBackoff, id)
   152  		}
   153  	}
   154  }
   155  
   156  func (p *Backoff) DeleteEntry(id string) {
   157  	p.Lock()
   158  	defer p.Unlock()
   159  	delete(p.perItemBackoff, id)
   160  }
   161  
   162  // Take a lock on *Backoff, before calling initEntryUnsafe
   163  func (p *Backoff) initEntryUnsafe(id string) *backoffEntry {
   164  	entry := &backoffEntry{backoff: p.defaultDuration}
   165  	p.perItemBackoff[id] = entry
   166  	return entry
   167  }
   168  
   169  func (p *Backoff) jitter(delay time.Duration) time.Duration {
   170  	if p.rand == nil {
   171  		return 0
   172  	}
   173  
   174  	return time.Duration(p.rand.Float64() * p.maxJitterFactor * float64(delay))
   175  }
   176  
   177  // After 2*maxDuration we restart the backoff factor to the beginning
   178  func hasExpired(eventTime time.Time, lastUpdate time.Time, maxDuration time.Duration) bool {
   179  	return eventTime.Sub(lastUpdate) > maxDuration*2 // consider stable if it's ok for twice the maxDuration
   180  }
   181  

View as plain text