1
16
17 package flowcontrol
18
19 import (
20 "math/rand"
21 "sync"
22 "time"
23
24 "k8s.io/utils/clock"
25 testingclock "k8s.io/utils/clock/testing"
26 )
27
28 type backoffEntry struct {
29 backoff time.Duration
30 lastUpdate time.Time
31 }
32
33 type Backoff struct {
34 sync.RWMutex
35 Clock clock.Clock
36 defaultDuration time.Duration
37 maxDuration time.Duration
38 perItemBackoff map[string]*backoffEntry
39 rand *rand.Rand
40
41
42
43
44 maxJitterFactor float64
45 }
46
47 func NewFakeBackOff(initial, max time.Duration, tc *testingclock.FakeClock) *Backoff {
48 return newBackoff(tc, initial, max, 0.0)
49 }
50
51 func NewBackOff(initial, max time.Duration) *Backoff {
52 return NewBackOffWithJitter(initial, max, 0.0)
53 }
54
55 func NewFakeBackOffWithJitter(initial, max time.Duration, tc *testingclock.FakeClock, maxJitterFactor float64) *Backoff {
56 return newBackoff(tc, initial, max, maxJitterFactor)
57 }
58
59 func NewBackOffWithJitter(initial, max time.Duration, maxJitterFactor float64) *Backoff {
60 clock := clock.RealClock{}
61 return newBackoff(clock, initial, max, maxJitterFactor)
62 }
63
64 func newBackoff(clock clock.Clock, initial, max time.Duration, maxJitterFactor float64) *Backoff {
65 var random *rand.Rand
66 if maxJitterFactor > 0 {
67 random = rand.New(rand.NewSource(clock.Now().UnixNano()))
68 }
69 return &Backoff{
70 perItemBackoff: map[string]*backoffEntry{},
71 Clock: clock,
72 defaultDuration: initial,
73 maxDuration: max,
74 maxJitterFactor: maxJitterFactor,
75 rand: random,
76 }
77 }
78
79
80 func (p *Backoff) Get(id string) time.Duration {
81 p.RLock()
82 defer p.RUnlock()
83 var delay time.Duration
84 entry, ok := p.perItemBackoff[id]
85 if ok {
86 delay = entry.backoff
87 }
88 return delay
89 }
90
91
92 func (p *Backoff) Next(id string, eventTime time.Time) {
93 p.Lock()
94 defer p.Unlock()
95 entry, ok := p.perItemBackoff[id]
96 if !ok || hasExpired(eventTime, entry.lastUpdate, p.maxDuration) {
97 entry = p.initEntryUnsafe(id)
98 entry.backoff += p.jitter(entry.backoff)
99 } else {
100 delay := entry.backoff * 2
101 delay += p.jitter(entry.backoff)
102 entry.backoff = min(delay, p.maxDuration)
103 }
104 entry.lastUpdate = p.Clock.Now()
105 }
106
107
108 func (p *Backoff) Reset(id string) {
109 p.Lock()
110 defer p.Unlock()
111 delete(p.perItemBackoff, id)
112 }
113
114
115 func (p *Backoff) IsInBackOffSince(id string, eventTime time.Time) bool {
116 p.RLock()
117 defer p.RUnlock()
118 entry, ok := p.perItemBackoff[id]
119 if !ok {
120 return false
121 }
122 if hasExpired(eventTime, entry.lastUpdate, p.maxDuration) {
123 return false
124 }
125 return p.Clock.Since(eventTime) < entry.backoff
126 }
127
128
129 func (p *Backoff) IsInBackOffSinceUpdate(id string, eventTime time.Time) bool {
130 p.RLock()
131 defer p.RUnlock()
132 entry, ok := p.perItemBackoff[id]
133 if !ok {
134 return false
135 }
136 if hasExpired(eventTime, entry.lastUpdate, p.maxDuration) {
137 return false
138 }
139 return eventTime.Sub(entry.lastUpdate) < entry.backoff
140 }
141
142
143
144 func (p *Backoff) GC() {
145 p.Lock()
146 defer p.Unlock()
147 now := p.Clock.Now()
148 for id, entry := range p.perItemBackoff {
149 if now.Sub(entry.lastUpdate) > p.maxDuration*2 {
150
151 delete(p.perItemBackoff, id)
152 }
153 }
154 }
155
156 func (p *Backoff) DeleteEntry(id string) {
157 p.Lock()
158 defer p.Unlock()
159 delete(p.perItemBackoff, id)
160 }
161
162
163 func (p *Backoff) initEntryUnsafe(id string) *backoffEntry {
164 entry := &backoffEntry{backoff: p.defaultDuration}
165 p.perItemBackoff[id] = entry
166 return entry
167 }
168
169 func (p *Backoff) jitter(delay time.Duration) time.Duration {
170 if p.rand == nil {
171 return 0
172 }
173
174 return time.Duration(p.rand.Float64() * p.maxJitterFactor * float64(delay))
175 }
176
177
178 func hasExpired(eventTime time.Time, lastUpdate time.Time, maxDuration time.Duration) bool {
179 return eventTime.Sub(lastUpdate) > maxDuration*2
180 }
181
View as plain text