1
16
17 package resourcequota
18
19 import (
20 "context"
21 "fmt"
22 "sync"
23 "time"
24
25 "k8s.io/klog/v2"
26
27 "k8s.io/apimachinery/pkg/api/meta"
28 "k8s.io/apimachinery/pkg/runtime/schema"
29 utilerrors "k8s.io/apimachinery/pkg/util/errors"
30 utilruntime "k8s.io/apimachinery/pkg/util/runtime"
31 "k8s.io/apimachinery/pkg/util/wait"
32 quota "k8s.io/apiserver/pkg/quota/v1"
33 "k8s.io/apiserver/pkg/quota/v1/generic"
34 "k8s.io/client-go/tools/cache"
35 "k8s.io/client-go/util/workqueue"
36 "k8s.io/controller-manager/pkg/informerfactory"
37 "k8s.io/kubernetes/pkg/controller"
38 )
39
40 type eventType int
41
42 func (e eventType) String() string {
43 switch e {
44 case addEvent:
45 return "add"
46 case updateEvent:
47 return "update"
48 case deleteEvent:
49 return "delete"
50 default:
51 return fmt.Sprintf("unknown(%d)", int(e))
52 }
53 }
54
55 const (
56 addEvent eventType = iota
57 updateEvent
58 deleteEvent
59 )
60
61 type event struct {
62 eventType eventType
63 obj interface{}
64 oldObj interface{}
65 gvr schema.GroupVersionResource
66 }
67
68
69 type QuotaMonitor struct {
70
71 monitors monitors
72 monitorLock sync.RWMutex
73
74
75 informersStarted <-chan struct{}
76
77
78
79 stopCh <-chan struct{}
80
81
82
83 running bool
84
85
86 resourceChanges workqueue.RateLimitingInterface
87
88
89 informerFactory informerfactory.InformerFactory
90
91
92 ignoredResources map[schema.GroupResource]struct{}
93
94
95 resyncPeriod controller.ResyncPeriodFunc
96
97
98 replenishmentFunc ReplenishmentFunc
99
100
101 registry quota.Registry
102
103 updateFilter UpdateFilter
104 }
105
106
107 func NewMonitor(informersStarted <-chan struct{}, informerFactory informerfactory.InformerFactory, ignoredResources map[schema.GroupResource]struct{}, resyncPeriod controller.ResyncPeriodFunc, replenishmentFunc ReplenishmentFunc, registry quota.Registry, updateFilter UpdateFilter) *QuotaMonitor {
108 return &QuotaMonitor{
109 informersStarted: informersStarted,
110 informerFactory: informerFactory,
111 ignoredResources: ignoredResources,
112 resourceChanges: workqueue.NewNamedRateLimitingQueue(workqueue.DefaultControllerRateLimiter(), "resource_quota_controller_resource_changes"),
113 resyncPeriod: resyncPeriod,
114 replenishmentFunc: replenishmentFunc,
115 registry: registry,
116 updateFilter: updateFilter,
117 }
118 }
119
120
121 type monitor struct {
122 controller cache.Controller
123
124
125
126 stopCh chan struct{}
127 }
128
129
130
131 func (m *monitor) Run() {
132 m.controller.Run(m.stopCh)
133 }
134
135 type monitors map[schema.GroupVersionResource]*monitor
136
137
138 type UpdateFilter func(resource schema.GroupVersionResource, oldObj, newObj interface{}) bool
139
140 func (qm *QuotaMonitor) controllerFor(ctx context.Context, resource schema.GroupVersionResource) (cache.Controller, error) {
141 logger := klog.FromContext(ctx)
142
143 handlers := cache.ResourceEventHandlerFuncs{
144 UpdateFunc: func(oldObj, newObj interface{}) {
145 if qm.updateFilter != nil && qm.updateFilter(resource, oldObj, newObj) {
146 event := &event{
147 eventType: updateEvent,
148 obj: newObj,
149 oldObj: oldObj,
150 gvr: resource,
151 }
152 qm.resourceChanges.Add(event)
153 }
154 },
155 DeleteFunc: func(obj interface{}) {
156
157 if deletedFinalStateUnknown, ok := obj.(cache.DeletedFinalStateUnknown); ok {
158 obj = deletedFinalStateUnknown.Obj
159 }
160 event := &event{
161 eventType: deleteEvent,
162 obj: obj,
163 gvr: resource,
164 }
165 qm.resourceChanges.Add(event)
166 },
167 }
168 shared, err := qm.informerFactory.ForResource(resource)
169 if err == nil {
170 logger.V(4).Info("QuotaMonitor using a shared informer", "resource", resource.String())
171 shared.Informer().AddEventHandlerWithResyncPeriod(handlers, qm.resyncPeriod())
172 return shared.Informer().GetController(), nil
173 }
174 logger.V(4).Error(err, "QuotaMonitor unable to use a shared informer", "resource", resource.String())
175
176
177
178 return nil, fmt.Errorf("unable to monitor quota for resource %q", resource.String())
179 }
180
181
182
183
184
185
186
187 func (qm *QuotaMonitor) SyncMonitors(ctx context.Context, resources map[schema.GroupVersionResource]struct{}) error {
188 logger := klog.FromContext(ctx)
189
190 qm.monitorLock.Lock()
191 defer qm.monitorLock.Unlock()
192
193 toRemove := qm.monitors
194 if toRemove == nil {
195 toRemove = monitors{}
196 }
197 current := monitors{}
198 var errs []error
199 kept := 0
200 added := 0
201 for resource := range resources {
202 if _, ok := qm.ignoredResources[resource.GroupResource()]; ok {
203 continue
204 }
205 if m, ok := toRemove[resource]; ok {
206 current[resource] = m
207 delete(toRemove, resource)
208 kept++
209 continue
210 }
211 c, err := qm.controllerFor(ctx, resource)
212 if err != nil {
213 errs = append(errs, fmt.Errorf("couldn't start monitor for resource %q: %v", resource, err))
214 continue
215 }
216
217
218 evaluator := qm.registry.Get(resource.GroupResource())
219 if evaluator == nil {
220 listerFunc := generic.ListerFuncForResourceFunc(qm.informerFactory.ForResource)
221 listResourceFunc := generic.ListResourceUsingListerFunc(listerFunc, resource)
222 evaluator = generic.NewObjectCountEvaluator(resource.GroupResource(), listResourceFunc, "")
223 qm.registry.Add(evaluator)
224 logger.Info("QuotaMonitor created object count evaluator", "resource", resource.GroupResource())
225 }
226
227
228 current[resource] = &monitor{controller: c}
229 added++
230 }
231 qm.monitors = current
232
233 for _, monitor := range toRemove {
234 if monitor.stopCh != nil {
235 close(monitor.stopCh)
236 }
237 }
238
239 logger.V(4).Info("quota synced monitors", "added", added, "kept", kept, "removed", len(toRemove))
240
241 return utilerrors.NewAggregate(errs)
242 }
243
244
245
246
247
248
249 func (qm *QuotaMonitor) StartMonitors(ctx context.Context) {
250 qm.monitorLock.Lock()
251 defer qm.monitorLock.Unlock()
252
253 if !qm.running {
254 return
255 }
256
257
258
259 <-qm.informersStarted
260
261 monitors := qm.monitors
262 started := 0
263 for _, monitor := range monitors {
264 if monitor.stopCh == nil {
265 monitor.stopCh = make(chan struct{})
266 qm.informerFactory.Start(qm.stopCh)
267 go monitor.Run()
268 started++
269 }
270 }
271 klog.FromContext(ctx).V(4).Info("QuotaMonitor finished starting monitors", "new", started, "total", len(monitors))
272 }
273
274
275
276
277
278 func (qm *QuotaMonitor) IsSynced(ctx context.Context) bool {
279 logger := klog.FromContext(ctx)
280
281 qm.monitorLock.RLock()
282 defer qm.monitorLock.RUnlock()
283
284 if len(qm.monitors) == 0 {
285 logger.V(4).Info("quota monitor not synced: no monitors")
286 return false
287 }
288
289 for resource, monitor := range qm.monitors {
290 if !monitor.controller.HasSynced() {
291 logger.V(4).Info("quota monitor not synced", "resource", resource)
292 return false
293 }
294 }
295 return true
296 }
297
298
299
300 func (qm *QuotaMonitor) Run(ctx context.Context) {
301 defer utilruntime.HandleCrash()
302
303 logger := klog.FromContext(ctx)
304
305 logger.Info("QuotaMonitor running")
306 defer logger.Info("QuotaMonitor stopping")
307
308
309 qm.monitorLock.Lock()
310 qm.stopCh = ctx.Done()
311 qm.running = true
312 qm.monitorLock.Unlock()
313
314
315
316 qm.StartMonitors(ctx)
317
318
319
320 go func() {
321 defer utilruntime.HandleCrash()
322 defer qm.resourceChanges.ShutDown()
323
324 <-ctx.Done()
325 }()
326 wait.UntilWithContext(ctx, qm.runProcessResourceChanges, 1*time.Second)
327
328
329 qm.monitorLock.Lock()
330 defer qm.monitorLock.Unlock()
331 monitors := qm.monitors
332 stopped := 0
333 for _, monitor := range monitors {
334 if monitor.stopCh != nil {
335 stopped++
336 close(monitor.stopCh)
337 }
338 }
339 logger.Info("QuotaMonitor stopped monitors", "stopped", stopped, "total", len(monitors))
340 }
341
342 func (qm *QuotaMonitor) runProcessResourceChanges(ctx context.Context) {
343 for qm.processResourceChanges(ctx) {
344 }
345 }
346
347
348 func (qm *QuotaMonitor) processResourceChanges(ctx context.Context) bool {
349 item, quit := qm.resourceChanges.Get()
350 if quit {
351 return false
352 }
353 defer qm.resourceChanges.Done(item)
354 event, ok := item.(*event)
355 if !ok {
356 utilruntime.HandleError(fmt.Errorf("expect a *event, got %v", item))
357 return true
358 }
359 obj := event.obj
360 accessor, err := meta.Accessor(obj)
361 if err != nil {
362 utilruntime.HandleError(fmt.Errorf("cannot access obj: %v", err))
363 return true
364 }
365 klog.FromContext(ctx).V(4).Info("QuotaMonitor process object",
366 "resource", event.gvr.String(),
367 "namespace", accessor.GetNamespace(),
368 "name", accessor.GetName(),
369 "uid", string(accessor.GetUID()),
370 "eventType", event.eventType,
371 )
372 qm.replenishmentFunc(ctx, event.gvr.GroupResource(), accessor.GetNamespace())
373 return true
374 }
375
View as plain text