...
1
16
17 package flowcontrol
18
19 import (
20 "context"
21 "errors"
22 "sync"
23 "time"
24
25 "golang.org/x/time/rate"
26 "k8s.io/utils/clock"
27 )
28
// PassiveRateLimiter is the polling half of a rate limiter: callers ask
// whether a request may proceed and get an immediate yes/no answer.
type PassiveRateLimiter interface {
	// TryAccept reports whether the request may proceed right now.
	// The implementations in this file answer without blocking.
	TryAccept() bool

	// Stop shuts the limiter down. In this file the token-bucket and
	// always-accept implementations treat it as a no-op, while the
	// never-accept fake uses it to release callers blocked in Accept.
	Stop()

	// QPS returns the queries-per-second value associated with the limiter.
	QPS() float32
}
38
// RateLimiter extends PassiveRateLimiter with operations that may block the
// caller until a request is allowed to proceed.
type RateLimiter interface {
	PassiveRateLimiter

	// Accept returns once the request is allowed to proceed; it has no
	// return value, so it cannot report failure.
	Accept()

	// Wait is the context-aware counterpart of Accept and returns an error
	// when the request could not be admitted.
	Wait(ctx context.Context) error
}
46
// tokenBucketPassiveRateLimiter implements PassiveRateLimiter on top of a
// golang.org/x/time/rate limiter.
type tokenBucketPassiveRateLimiter struct {
	// limiter is the underlying token bucket consulted by TryAccept.
	limiter *rate.Limiter
	// qps is the rate this limiter was constructed with; QPS returns it.
	qps float32
	// clock supplies the timestamp handed to the limiter in TryAccept.
	clock clock.PassiveClock
}
52
// tokenBucketRateLimiter implements RateLimiter by embedding the passive
// token-bucket limiter and adding the blocking operations.
type tokenBucketRateLimiter struct {
	tokenBucketPassiveRateLimiter
	// clock shadows the embedded PassiveClock field with a Clock that can
	// also Sleep, which Accept needs.
	clock Clock
}
57
58
59
60
61
62
63 func NewTokenBucketRateLimiter(qps float32, burst int) RateLimiter {
64 limiter := rate.NewLimiter(rate.Limit(qps), burst)
65 return newTokenBucketRateLimiterWithClock(limiter, clock.RealClock{}, qps)
66 }
67
68
69
70 func NewTokenBucketPassiveRateLimiter(qps float32, burst int) PassiveRateLimiter {
71 limiter := rate.NewLimiter(rate.Limit(qps), burst)
72 return newTokenBucketRateLimiterWithPassiveClock(limiter, clock.RealClock{}, qps)
73 }
74
75
// Clock is the time source required by the blocking rate limiter: a
// clock.PassiveClock that can additionally put the caller to sleep.
type Clock interface {
	clock.PassiveClock
	// Sleep pauses the caller for the given duration.
	Sleep(time.Duration)
}
80
// Compile-time check that *clock.RealClock satisfies Clock.
var _ Clock = (*clock.RealClock)(nil)
82
83
84
85 func NewTokenBucketRateLimiterWithClock(qps float32, burst int, c Clock) RateLimiter {
86 limiter := rate.NewLimiter(rate.Limit(qps), burst)
87 return newTokenBucketRateLimiterWithClock(limiter, c, qps)
88 }
89
90
91
92
93 func NewTokenBucketPassiveRateLimiterWithClock(qps float32, burst int, c clock.PassiveClock) PassiveRateLimiter {
94 limiter := rate.NewLimiter(rate.Limit(qps), burst)
95 return newTokenBucketRateLimiterWithPassiveClock(limiter, c, qps)
96 }
97
98 func newTokenBucketRateLimiterWithClock(limiter *rate.Limiter, c Clock, qps float32) *tokenBucketRateLimiter {
99 return &tokenBucketRateLimiter{
100 tokenBucketPassiveRateLimiter: *newTokenBucketRateLimiterWithPassiveClock(limiter, c, qps),
101 clock: c,
102 }
103 }
104
105 func newTokenBucketRateLimiterWithPassiveClock(limiter *rate.Limiter, c clock.PassiveClock, qps float32) *tokenBucketPassiveRateLimiter {
106 return &tokenBucketPassiveRateLimiter{
107 limiter: limiter,
108 qps: qps,
109 clock: c,
110 }
111 }
112
// Stop is a no-op for the token-bucket limiter; it exists to satisfy
// PassiveRateLimiter.
func (tbprl *tokenBucketPassiveRateLimiter) Stop() {
}
115
// QPS returns the qps value this limiter was constructed with.
func (tbprl *tokenBucketPassiveRateLimiter) QPS() float32 {
	return tbprl.qps
}
119
120 func (tbprl *tokenBucketPassiveRateLimiter) TryAccept() bool {
121 return tbprl.limiter.AllowN(tbprl.clock.Now(), 1)
122 }
123
124
125 func (tbrl *tokenBucketRateLimiter) Accept() {
126 now := tbrl.clock.Now()
127 tbrl.clock.Sleep(tbrl.limiter.ReserveN(now, 1).DelayFrom(now))
128 }
129
// Wait delegates to the underlying rate.Limiter's Wait with the given
// context and returns whatever error it reports.
// NOTE(review): unlike Accept and TryAccept, this path never references the
// injected clock — confirm that is intended when a fake clock is in use.
func (tbrl *tokenBucketRateLimiter) Wait(ctx context.Context) error {
	return tbrl.limiter.Wait(ctx)
}
133
// fakeAlwaysRateLimiter is a stateless RateLimiter that admits every request.
type fakeAlwaysRateLimiter struct{}
135
136 func NewFakeAlwaysRateLimiter() RateLimiter {
137 return &fakeAlwaysRateLimiter{}
138 }
139
// TryAccept always returns true.
func (t *fakeAlwaysRateLimiter) TryAccept() bool {
	return true
}
143
// Stop is a no-op.
func (t *fakeAlwaysRateLimiter) Stop() {}
145
// Accept returns immediately without waiting.
func (t *fakeAlwaysRateLimiter) Accept() {}
147
// QPS returns the fixed value 1; this fake does not track a real rate.
func (t *fakeAlwaysRateLimiter) QPS() float32 {
	return 1
}
151
// Wait returns nil immediately; ctx is not consulted.
func (t *fakeAlwaysRateLimiter) Wait(ctx context.Context) error {
	return nil
}
155
// fakeNeverRateLimiter is a RateLimiter that never admits a request.
type fakeNeverRateLimiter struct {
	// wg is what Accept blocks on; Stop calls Done on it.
	wg sync.WaitGroup
}
159
160 func NewFakeNeverRateLimiter() RateLimiter {
161 rl := fakeNeverRateLimiter{}
162 rl.wg.Add(1)
163 return &rl
164 }
165
// TryAccept always returns false.
func (t *fakeNeverRateLimiter) TryAccept() bool {
	return false
}
169
// Stop calls Done on the limiter's WaitGroup, which unblocks callers
// waiting in Accept. NewFakeNeverRateLimiter adds exactly one count, so a
// second Stop would drive the counter negative, which sync.WaitGroup
// reports by panicking.
func (t *fakeNeverRateLimiter) Stop() {
	t.wg.Done()
}
173
// Accept blocks on the limiter's WaitGroup, i.e. until Stop has been called.
func (t *fakeNeverRateLimiter) Accept() {
	t.wg.Wait()
}
177
// QPS returns the fixed value 1; this fake does not track a real rate.
func (t *fakeNeverRateLimiter) QPS() float32 {
	return 1
}
181
// Wait always fails immediately with a fixed error; ctx is not consulted
// and, unlike Accept, the call does not block.
func (t *fakeNeverRateLimiter) Wait(ctx context.Context) error {
	return errors.New("can not be accept")
}
185
// Compile-time checks that each limiter type satisfies RateLimiter.
var (
	_ RateLimiter = (*tokenBucketRateLimiter)(nil)
	_ RateLimiter = (*fakeAlwaysRateLimiter)(nil)
	_ RateLimiter = (*fakeNeverRateLimiter)(nil)
)
191
// Compile-time check that the passive token-bucket limiter satisfies
// PassiveRateLimiter.
var _ PassiveRateLimiter = (*tokenBucketPassiveRateLimiter)(nil)
193
View as plain text