1
16
17 package eventratelimit
18
19 import (
20 "context"
21 "net/http"
22 "testing"
23 "time"
24
25 "k8s.io/apimachinery/pkg/api/errors"
26 metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
27 "k8s.io/apimachinery/pkg/types"
28 "k8s.io/apiserver/pkg/admission"
29 "k8s.io/apiserver/pkg/authentication/user"
30 api "k8s.io/kubernetes/pkg/apis/core"
31 eventratelimitapi "k8s.io/kubernetes/plugin/pkg/admission/eventratelimit/apis/eventratelimit"
32 testingclock "k8s.io/utils/clock/testing"
33 )
34
35 const (
36 qps = 1
37 eventKind = "Event"
38 nonEventKind = "NonEvent"
39 )
40
41
42 func attributesForRequest(rq request) admission.Attributes {
43 return admission.NewAttributesRecord(
44 rq.event,
45 nil,
46 api.Kind(rq.kind).WithVersion("version"),
47 rq.namespace,
48 "name",
49 api.Resource("resource").WithVersion("version"),
50 "",
51 admission.Create,
52 &metav1.CreateOptions{},
53 rq.dryRun,
54 &user.DefaultInfo{Name: rq.username})
55 }
56
57 type request struct {
58 kind string
59 namespace string
60 username string
61 event *api.Event
62 delay time.Duration
63 accepted bool
64 dryRun bool
65 }
66
67 func newRequest(kind string) request {
68 return request{
69 kind: kind,
70 accepted: true,
71 }
72 }
73
74 func newEventRequest() request {
75 return newRequest(eventKind)
76 }
77
78 func newNonEventRequest() request {
79 return newRequest(nonEventKind)
80 }
81
82 func (r request) withNamespace(namespace string) request {
83 r.namespace = namespace
84 return r
85 }
86
87 func (r request) withEvent(event *api.Event) request {
88 r.event = event
89 return r
90 }
91
92 func (r request) withEventComponent(component string) request {
93 return r.withEvent(&api.Event{
94 Source: api.EventSource{
95 Component: component,
96 },
97 })
98 }
99
100 func (r request) withDryRun(dryRun bool) request {
101 r.dryRun = dryRun
102 return r
103 }
104
105 func (r request) withUser(name string) request {
106 r.username = name
107 return r
108 }
109
110 func (r request) blocked() request {
111 r.accepted = false
112 return r
113 }
114
115
116 func (r request) withDelay(delayInSeconds int) request {
117 r.delay = time.Duration(delayInSeconds) * time.Second
118 return r
119 }
120
121
122
123 func createSourceAndObjectKeyInclusionRequests(eventFactory func(label string) *api.Event) []request {
124 return []request{
125 newEventRequest().withEvent(eventFactory("A")),
126 newEventRequest().withEvent(eventFactory("A")).blocked(),
127 newEventRequest().withEvent(eventFactory("B")),
128 }
129 }
130
131 func TestEventRateLimiting(t *testing.T) {
132 cases := []struct {
133 name string
134 serverBurst int32
135 namespaceBurst int32
136 namespaceCacheSize int32
137 sourceAndObjectBurst int32
138 sourceAndObjectCacheSize int32
139 userBurst int32
140 userCacheSize int32
141 requests []request
142 }{
143 {
144 name: "event not blocked when tokens available",
145 serverBurst: 3,
146 requests: []request{
147 newEventRequest(),
148 },
149 },
150 {
151 name: "non-event not blocked",
152 serverBurst: 3,
153 requests: []request{
154 newNonEventRequest(),
155 },
156 },
157 {
158 name: "event blocked after tokens exhausted",
159 serverBurst: 3,
160 requests: []request{
161 newEventRequest(),
162 newEventRequest(),
163 newEventRequest(),
164 newEventRequest().blocked(),
165 },
166 },
167 {
168 name: "event not blocked by dry-run requests",
169 serverBurst: 3,
170 requests: []request{
171 newEventRequest(),
172 newEventRequest(),
173 newEventRequest().withDryRun(true),
174 newEventRequest().withDryRun(true),
175 newEventRequest().withDryRun(true),
176 newEventRequest().withDryRun(true),
177 newEventRequest(),
178 newEventRequest().blocked(),
179 newEventRequest().withDryRun(true),
180 },
181 },
182 {
183 name: "non-event not blocked after tokens exhausted",
184 serverBurst: 3,
185 requests: []request{
186 newEventRequest(),
187 newEventRequest(),
188 newEventRequest(),
189 newNonEventRequest(),
190 },
191 },
192 {
193 name: "non-events should not count against limit",
194 serverBurst: 3,
195 requests: []request{
196 newEventRequest(),
197 newEventRequest(),
198 newNonEventRequest(),
199 newEventRequest(),
200 },
201 },
202 {
203 name: "event accepted after token refill",
204 serverBurst: 3,
205 requests: []request{
206 newEventRequest(),
207 newEventRequest(),
208 newEventRequest(),
209 newEventRequest().blocked(),
210 newEventRequest().withDelay(1),
211 },
212 },
213 {
214 name: "event blocked by namespace limits",
215 serverBurst: 100,
216 namespaceBurst: 3,
217 namespaceCacheSize: 10,
218 requests: []request{
219 newEventRequest().withNamespace("A"),
220 newEventRequest().withNamespace("A"),
221 newEventRequest().withNamespace("A"),
222 newEventRequest().withNamespace("A").blocked(),
223 },
224 },
225 {
226 name: "event from other namespace not blocked",
227 serverBurst: 100,
228 namespaceBurst: 3,
229 namespaceCacheSize: 10,
230 requests: []request{
231 newEventRequest().withNamespace("A"),
232 newEventRequest().withNamespace("A"),
233 newEventRequest().withNamespace("A"),
234 newEventRequest().withNamespace("B"),
235 },
236 },
237 {
238 name: "events from other namespaces should not count against limit",
239 serverBurst: 100,
240 namespaceBurst: 3,
241 namespaceCacheSize: 10,
242 requests: []request{
243 newEventRequest().withNamespace("A"),
244 newEventRequest().withNamespace("A"),
245 newEventRequest().withNamespace("B"),
246 newEventRequest().withNamespace("A"),
247 },
248 },
249 {
250 name: "event accepted after namespace token refill",
251 serverBurst: 100,
252 namespaceBurst: 3,
253 namespaceCacheSize: 10,
254 requests: []request{
255 newEventRequest().withNamespace("A"),
256 newEventRequest().withNamespace("A"),
257 newEventRequest().withNamespace("A"),
258 newEventRequest().withNamespace("A").blocked(),
259 newEventRequest().withNamespace("A").withDelay(1),
260 },
261 },
262 {
263 name: "event from other namespaces should not clear namespace limits",
264 serverBurst: 100,
265 namespaceBurst: 3,
266 namespaceCacheSize: 10,
267 requests: []request{
268 newEventRequest().withNamespace("A"),
269 newEventRequest().withNamespace("A"),
270 newEventRequest().withNamespace("A"),
271 newEventRequest().withNamespace("B"),
272 newEventRequest().withNamespace("A").blocked(),
273 },
274 },
275 {
276 name: "namespace limits from lru namespace should clear when cache size exceeded",
277 serverBurst: 100,
278 namespaceBurst: 3,
279 namespaceCacheSize: 2,
280 requests: []request{
281 newEventRequest().withNamespace("A"),
282 newEventRequest().withNamespace("A"),
283 newEventRequest().withNamespace("B"),
284 newEventRequest().withNamespace("B"),
285 newEventRequest().withNamespace("B"),
286 newEventRequest().withNamespace("A"),
287 newEventRequest().withNamespace("B").blocked(),
288 newEventRequest().withNamespace("A").blocked(),
289
290 newEventRequest().withNamespace("C"),
291 newEventRequest().withNamespace("A").blocked(),
292 newEventRequest().withNamespace("B"),
293 },
294 },
295 {
296 name: "event blocked by source+object limits",
297 serverBurst: 100,
298 sourceAndObjectBurst: 3,
299 sourceAndObjectCacheSize: 10,
300 requests: []request{
301 newEventRequest().withEventComponent("A"),
302 newEventRequest().withEventComponent("A"),
303 newEventRequest().withEventComponent("A"),
304 newEventRequest().withEventComponent("A").blocked(),
305 },
306 },
307 {
308 name: "event from other source+object not blocked",
309 serverBurst: 100,
310 sourceAndObjectBurst: 3,
311 sourceAndObjectCacheSize: 10,
312 requests: []request{
313 newEventRequest().withEventComponent("A"),
314 newEventRequest().withEventComponent("A"),
315 newEventRequest().withEventComponent("A"),
316 newEventRequest().withEventComponent("B"),
317 },
318 },
319 {
320 name: "events from other source+object should not count against limit",
321 serverBurst: 100,
322 sourceAndObjectBurst: 3,
323 sourceAndObjectCacheSize: 10,
324 requests: []request{
325 newEventRequest().withEventComponent("A"),
326 newEventRequest().withEventComponent("A"),
327 newEventRequest().withEventComponent("B"),
328 newEventRequest().withEventComponent("A"),
329 },
330 },
331 {
332 name: "event accepted after source+object token refill",
333 serverBurst: 100,
334 sourceAndObjectBurst: 3,
335 sourceAndObjectCacheSize: 10,
336 requests: []request{
337 newEventRequest().withEventComponent("A"),
338 newEventRequest().withEventComponent("A"),
339 newEventRequest().withEventComponent("A"),
340 newEventRequest().withEventComponent("A").blocked(),
341 newEventRequest().withEventComponent("A").withDelay(1),
342 },
343 },
344 {
345 name: "event from other source+object should not clear source+object limits",
346 serverBurst: 100,
347 sourceAndObjectBurst: 3,
348 sourceAndObjectCacheSize: 10,
349 requests: []request{
350 newEventRequest().withEventComponent("A"),
351 newEventRequest().withEventComponent("A"),
352 newEventRequest().withEventComponent("A"),
353 newEventRequest().withEventComponent("B"),
354 newEventRequest().withEventComponent("A").blocked(),
355 },
356 },
357 {
358 name: "source+object limits from lru source+object should clear when cache size exceeded",
359 serverBurst: 100,
360 sourceAndObjectBurst: 3,
361 sourceAndObjectCacheSize: 2,
362 requests: []request{
363 newEventRequest().withEventComponent("A"),
364 newEventRequest().withEventComponent("A"),
365 newEventRequest().withEventComponent("B"),
366 newEventRequest().withEventComponent("B"),
367 newEventRequest().withEventComponent("B"),
368 newEventRequest().withEventComponent("A"),
369 newEventRequest().withEventComponent("B").blocked(),
370 newEventRequest().withEventComponent("A").blocked(),
371
372 newEventRequest().withEventComponent("C"),
373 newEventRequest().withEventComponent("A").blocked(),
374 newEventRequest().withEventComponent("B"),
375 },
376 },
377 {
378 name: "source host should be included in source+object key",
379 serverBurst: 100,
380 sourceAndObjectBurst: 1,
381 sourceAndObjectCacheSize: 10,
382 requests: createSourceAndObjectKeyInclusionRequests(func(label string) *api.Event {
383 return &api.Event{Source: api.EventSource{Host: label}}
384 }),
385 },
386 {
387 name: "involved object kind should be included in source+object key",
388 serverBurst: 100,
389 sourceAndObjectBurst: 1,
390 sourceAndObjectCacheSize: 10,
391 requests: createSourceAndObjectKeyInclusionRequests(func(label string) *api.Event {
392 return &api.Event{InvolvedObject: api.ObjectReference{Kind: label}}
393 }),
394 },
395 {
396 name: "involved object namespace should be included in source+object key",
397 serverBurst: 100,
398 sourceAndObjectBurst: 1,
399 sourceAndObjectCacheSize: 10,
400 requests: createSourceAndObjectKeyInclusionRequests(func(label string) *api.Event {
401 return &api.Event{InvolvedObject: api.ObjectReference{Namespace: label}}
402 }),
403 },
404 {
405 name: "involved object name should be included in source+object key",
406 serverBurst: 100,
407 sourceAndObjectBurst: 1,
408 sourceAndObjectCacheSize: 10,
409 requests: createSourceAndObjectKeyInclusionRequests(func(label string) *api.Event {
410 return &api.Event{InvolvedObject: api.ObjectReference{Name: label}}
411 }),
412 },
413 {
414 name: "involved object UID should be included in source+object key",
415 serverBurst: 100,
416 sourceAndObjectBurst: 1,
417 sourceAndObjectCacheSize: 10,
418 requests: createSourceAndObjectKeyInclusionRequests(func(label string) *api.Event {
419 return &api.Event{InvolvedObject: api.ObjectReference{UID: types.UID(label)}}
420 }),
421 },
422 {
423 name: "involved object APIVersion should be included in source+object key",
424 serverBurst: 100,
425 sourceAndObjectBurst: 1,
426 sourceAndObjectCacheSize: 10,
427 requests: createSourceAndObjectKeyInclusionRequests(func(label string) *api.Event {
428 return &api.Event{InvolvedObject: api.ObjectReference{APIVersion: label}}
429 }),
430 },
431 {
432 name: "event blocked by user limits",
433 userBurst: 3,
434 userCacheSize: 10,
435 requests: []request{
436 newEventRequest().withUser("A"),
437 newEventRequest().withUser("A"),
438 newEventRequest().withUser("A"),
439 newEventRequest().withUser("A").blocked(),
440 },
441 },
442 {
443 name: "event from other user not blocked",
444 requests: []request{
445 newEventRequest().withUser("A"),
446 newEventRequest().withUser("A"),
447 newEventRequest().withUser("A"),
448 newEventRequest().withUser("B"),
449 },
450 },
451 {
452 name: "events from other user should not count against limit",
453 requests: []request{
454 newEventRequest().withUser("A"),
455 newEventRequest().withUser("A"),
456 newEventRequest().withUser("B"),
457 newEventRequest().withUser("A"),
458 },
459 },
460 }
461
462 for _, tc := range cases {
463 t.Run(tc.name, func(t *testing.T) {
464 clock := testingclock.NewFakeClock(time.Now())
465 config := &eventratelimitapi.Configuration{}
466 if tc.serverBurst > 0 {
467 serverLimit := eventratelimitapi.Limit{
468 Type: eventratelimitapi.ServerLimitType,
469 QPS: qps,
470 Burst: tc.serverBurst,
471 }
472 config.Limits = append(config.Limits, serverLimit)
473 }
474 if tc.namespaceBurst > 0 {
475 namespaceLimit := eventratelimitapi.Limit{
476 Type: eventratelimitapi.NamespaceLimitType,
477 Burst: tc.namespaceBurst,
478 QPS: qps,
479 CacheSize: tc.namespaceCacheSize,
480 }
481 config.Limits = append(config.Limits, namespaceLimit)
482 }
483 if tc.userBurst > 0 {
484 userLimit := eventratelimitapi.Limit{
485 Type: eventratelimitapi.UserLimitType,
486 Burst: tc.userBurst,
487 QPS: qps,
488 CacheSize: tc.userCacheSize,
489 }
490 config.Limits = append(config.Limits, userLimit)
491 }
492 if tc.sourceAndObjectBurst > 0 {
493 sourceAndObjectLimit := eventratelimitapi.Limit{
494 Type: eventratelimitapi.SourceAndObjectLimitType,
495 Burst: tc.sourceAndObjectBurst,
496 QPS: qps,
497 CacheSize: tc.sourceAndObjectCacheSize,
498 }
499 config.Limits = append(config.Limits, sourceAndObjectLimit)
500 }
501 eventratelimit, err := newEventRateLimit(config, clock)
502 if err != nil {
503 t.Fatalf("%v: Could not create EventRateLimit: %v", tc.name, err)
504 }
505
506 for rqIndex, rq := range tc.requests {
507 if rq.delay > 0 {
508 clock.Step(rq.delay)
509 }
510 attributes := attributesForRequest(rq)
511 err = eventratelimit.Validate(context.TODO(), attributes, nil)
512 if rq.accepted != (err == nil) {
513 expectedAction := "admitted"
514 if !rq.accepted {
515 expectedAction = "blocked"
516 }
517 t.Fatalf("%v: Request %v should have been %v: %v", tc.name, rqIndex, expectedAction, err)
518 }
519 if err != nil {
520 statusErr, ok := err.(*errors.StatusError)
521 if ok && statusErr.ErrStatus.Code != http.StatusTooManyRequests {
522 t.Fatalf("%v: Request %v should yield a 429 response: %v", tc.name, rqIndex, err)
523 }
524 }
525 }
526 })
527 }
528 }
529
View as plain text