...
1
16
17 package eventratelimit
18
19 import (
20 "fmt"
21 "strings"
22
23 "k8s.io/apiserver/pkg/admission"
24 "k8s.io/client-go/util/flowcontrol"
25 api "k8s.io/kubernetes/pkg/apis/core"
26 eventratelimitapi "k8s.io/kubernetes/plugin/pkg/admission/eventratelimit/apis/eventratelimit"
27 "k8s.io/utils/lru"
28 )
29
30 const (
31
32 defaultCacheSize = 4096
33 )
34
35
36 type limitEnforcer struct {
37
38 limitType eventratelimitapi.LimitType
39
40 cache cache
41
42 keyFunc func(admission.Attributes) string
43 }
44
45 func newLimitEnforcer(config eventratelimitapi.Limit, clock flowcontrol.Clock) (*limitEnforcer, error) {
46 rateLimiterFactory := func() flowcontrol.RateLimiter {
47 return flowcontrol.NewTokenBucketRateLimiterWithClock(float32(config.QPS), int(config.Burst), clock)
48 }
49
50 if config.Type == eventratelimitapi.ServerLimitType {
51 return &limitEnforcer{
52 limitType: config.Type,
53 cache: &singleCache{
54 rateLimiter: rateLimiterFactory(),
55 },
56 keyFunc: getServerKey,
57 }, nil
58 }
59
60 cacheSize := int(config.CacheSize)
61 if cacheSize == 0 {
62 cacheSize = defaultCacheSize
63 }
64 underlyingCache := lru.New(cacheSize)
65 cache := &lruCache{
66 rateLimiterFactory: rateLimiterFactory,
67 cache: underlyingCache,
68 }
69
70 var keyFunc func(admission.Attributes) string
71 switch t := config.Type; t {
72 case eventratelimitapi.NamespaceLimitType:
73 keyFunc = getNamespaceKey
74 case eventratelimitapi.UserLimitType:
75 keyFunc = getUserKey
76 case eventratelimitapi.SourceAndObjectLimitType:
77 keyFunc = getSourceAndObjectKey
78 default:
79 return nil, fmt.Errorf("unknown event rate limit type: %v", t)
80 }
81
82 return &limitEnforcer{
83 limitType: config.Type,
84 cache: cache,
85 keyFunc: keyFunc,
86 }, nil
87 }
88
89 func (enforcer *limitEnforcer) accept(attr admission.Attributes) error {
90 key := enforcer.keyFunc(attr)
91 rateLimiter := enforcer.cache.get(key)
92
93
94 allow := rateLimiter.TryAccept()
95
96 if !allow {
97 return fmt.Errorf("limit reached on type %v for key %v", enforcer.limitType, key)
98 }
99
100 return nil
101 }
102
103 func getServerKey(attr admission.Attributes) string {
104 return ""
105 }
106
107
108 func getNamespaceKey(attr admission.Attributes) string {
109 return attr.GetNamespace()
110 }
111
112
113 func getUserKey(attr admission.Attributes) string {
114 userInfo := attr.GetUserInfo()
115 if userInfo == nil {
116 return ""
117 }
118 return userInfo.GetName()
119 }
120
121
122 func getSourceAndObjectKey(attr admission.Attributes) string {
123 object := attr.GetObject()
124 if object == nil {
125 return ""
126 }
127 event, ok := object.(*api.Event)
128 if !ok {
129 return ""
130 }
131 return strings.Join([]string{
132 event.Source.Component,
133 event.Source.Host,
134 event.InvolvedObject.Kind,
135 event.InvolvedObject.Namespace,
136 event.InvolvedObject.Name,
137 string(event.InvolvedObject.UID),
138 event.InvolvedObject.APIVersion,
139 }, "")
140 }
141
View as plain text