package disruption

import (
	"context"
	"fmt"
	"time"

	apps "k8s.io/api/apps/v1beta1"
	v1 "k8s.io/api/core/v1"
	"k8s.io/api/extensions/v1beta1"
	policy "k8s.io/api/policy/v1"
	apiequality "k8s.io/apimachinery/pkg/api/equality"
	"k8s.io/apimachinery/pkg/api/errors"
	apimeta "k8s.io/apimachinery/pkg/api/meta"
	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
	"k8s.io/apimachinery/pkg/runtime/schema"
	"k8s.io/apimachinery/pkg/types"
	"k8s.io/apimachinery/pkg/util/intstr"
	utilruntime "k8s.io/apimachinery/pkg/util/runtime"
	"k8s.io/apimachinery/pkg/util/wait"
	"k8s.io/client-go/discovery"
	appsv1informers "k8s.io/client-go/informers/apps/v1"
	coreinformers "k8s.io/client-go/informers/core/v1"
	policyinformers "k8s.io/client-go/informers/policy/v1"
	clientset "k8s.io/client-go/kubernetes"
	"k8s.io/client-go/kubernetes/scheme"
	v1core "k8s.io/client-go/kubernetes/typed/core/v1"
	appsv1listers "k8s.io/client-go/listers/apps/v1"
	corelisters "k8s.io/client-go/listers/core/v1"
	policylisters "k8s.io/client-go/listers/policy/v1"
	scaleclient "k8s.io/client-go/scale"
	"k8s.io/client-go/tools/cache"
	"k8s.io/client-go/tools/record"
	"k8s.io/client-go/util/workqueue"
	pdbhelper "k8s.io/component-helpers/apps/poddisruptionbudget"
	"k8s.io/klog/v2"
	apipod "k8s.io/kubernetes/pkg/api/v1/pod"
	"k8s.io/kubernetes/pkg/controller"
	"k8s.io/utils/clock"
)

const (
	// DeletionTimeout sets the maximum time the controller waits for a pod
	// recorded in pdb.Status.DisruptedPods to actually be marked for deletion.
	// If the pod has not received a deletion timestamp within this window, the
	// entry is dropped from DisruptedPods on the next sync and a warning event
	// is emitted.
	DeletionTimeout = 2 * time.Minute

	// stalePodDisruptionTimeout sets how long a non-terminating pod may keep a
	// DisruptionTarget condition with status True before the controller resets
	// that condition to False.
	stalePodDisruptionTimeout = 2 * time.Minute
)

type updater func(context.Context, *policy.PodDisruptionBudget) error

// DisruptionController watches PodDisruptionBudget objects and the pods they
// select, and keeps each budget's status (allowed disruptions, healthy pod
// counts, disrupted pods) up to date.
type DisruptionController struct {
	kubeClient clientset.Interface
	mapper     apimeta.RESTMapper

	scaleNamespacer scaleclient.ScalesGetter
	discoveryClient discovery.DiscoveryInterface

	pdbLister       policylisters.PodDisruptionBudgetLister
	pdbListerSynced cache.InformerSynced

	podLister       corelisters.PodLister
	podListerSynced cache.InformerSynced

	rcLister       corelisters.ReplicationControllerLister
	rcListerSynced cache.InformerSynced

	rsLister       appsv1listers.ReplicaSetLister
	rsListerSynced cache.InformerSynced

	dLister       appsv1listers.DeploymentLister
	dListerSynced cache.InformerSynced

	ssLister       appsv1listers.StatefulSetLister
	ssListerSynced cache.InformerSynced

	// PodDisruptionBudget keys that need to be synced.
	queue        workqueue.RateLimitingInterface
	recheckQueue workqueue.DelayingInterface

	// Pod keys that need to be checked for a stale DisruptionTarget condition.
	stalePodDisruptionQueue   workqueue.RateLimitingInterface
	stalePodDisruptionTimeout time.Duration

	broadcaster record.EventBroadcaster
	recorder    record.EventRecorder

	getUpdater func() updater

	clock clock.Clock
}

// controllerAndScale is used to return (controller UID, scale) pairs from the
// controller finder functions.
type controllerAndScale struct {
	types.UID
	scale int32
}

// podControllerFinder is a function type that maps a pod's controllerRef to a
// controllerAndScale, or nil if the reference cannot be resolved.
type podControllerFinder func(ctx context.Context, controllerRef *metav1.OwnerReference, namespace string) (*controllerAndScale, error)

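// NewDisruptionController builds a DisruptionController wired to the given
// informers and clients, using the real clock and the default
// stalePodDisruptionTimeout.
//
// A rough wiring sketch (the informer factory and the restMapper,
// scaleNamespacer and discoveryClient variables below are illustrative
// placeholders, not defined in this file):
//
//	factory := informers.NewSharedInformerFactory(kubeClient, 0)
//	dc := NewDisruptionController(
//		ctx,
//		factory.Core().V1().Pods(),
//		factory.Policy().V1().PodDisruptionBudgets(),
//		factory.Core().V1().ReplicationControllers(),
//		factory.Apps().V1().ReplicaSets(),
//		factory.Apps().V1().Deployments(),
//		factory.Apps().V1().StatefulSets(),
//		kubeClient, restMapper, scaleNamespacer, discoveryClient,
//	)
//	factory.Start(ctx.Done())
//	go dc.Run(ctx)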
func NewDisruptionController(
	ctx context.Context,
	podInformer coreinformers.PodInformer,
	pdbInformer policyinformers.PodDisruptionBudgetInformer,
	rcInformer coreinformers.ReplicationControllerInformer,
	rsInformer appsv1informers.ReplicaSetInformer,
	dInformer appsv1informers.DeploymentInformer,
	ssInformer appsv1informers.StatefulSetInformer,
	kubeClient clientset.Interface,
	restMapper apimeta.RESTMapper,
	scaleNamespacer scaleclient.ScalesGetter,
	discoveryClient discovery.DiscoveryInterface,
) *DisruptionController {
	return NewDisruptionControllerInternal(
		ctx,
		podInformer,
		pdbInformer,
		rcInformer,
		rsInformer,
		dInformer,
		ssInformer,
		kubeClient,
		restMapper,
		scaleNamespacer,
		discoveryClient,
		clock.RealClock{},
		stalePodDisruptionTimeout)
}

// NewDisruptionControllerInternal is like NewDisruptionController but allows
// injecting the clock and the stalePodDisruptionTimeout; it exists primarily
// for tests.
func NewDisruptionControllerInternal(ctx context.Context,
	podInformer coreinformers.PodInformer,
	pdbInformer policyinformers.PodDisruptionBudgetInformer,
	rcInformer coreinformers.ReplicationControllerInformer,
	rsInformer appsv1informers.ReplicaSetInformer,
	dInformer appsv1informers.DeploymentInformer,
	ssInformer appsv1informers.StatefulSetInformer,
	kubeClient clientset.Interface,
	restMapper apimeta.RESTMapper,
	scaleNamespacer scaleclient.ScalesGetter,
	discoveryClient discovery.DiscoveryInterface,
	clock clock.WithTicker,
	stalePodDisruptionTimeout time.Duration,
) *DisruptionController {
	logger := klog.FromContext(ctx)
	dc := &DisruptionController{
		kubeClient:                kubeClient,
		queue:                     workqueue.NewRateLimitingQueueWithDelayingInterface(workqueue.NewDelayingQueueWithCustomClock(clock, "disruption"), workqueue.DefaultControllerRateLimiter()),
		recheckQueue:              workqueue.NewDelayingQueueWithCustomClock(clock, "disruption_recheck"),
		stalePodDisruptionQueue:   workqueue.NewRateLimitingQueueWithDelayingInterface(workqueue.NewDelayingQueueWithCustomClock(clock, "stale_pod_disruption"), workqueue.DefaultControllerRateLimiter()),
		broadcaster:               record.NewBroadcaster(record.WithContext(ctx)),
		stalePodDisruptionTimeout: stalePodDisruptionTimeout,
	}
	dc.recorder = dc.broadcaster.NewRecorder(scheme.Scheme, v1.EventSource{Component: "controllermanager"})

	dc.getUpdater = func() updater { return dc.writePdbStatus }

	podInformer.Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{
		AddFunc: func(obj interface{}) {
			dc.addPod(logger, obj)
		},
		UpdateFunc: func(oldObj, newObj interface{}) {
			dc.updatePod(logger, oldObj, newObj)
		},
		DeleteFunc: func(obj interface{}) {
			dc.deletePod(logger, obj)
		},
	})
	dc.podLister = podInformer.Lister()
	dc.podListerSynced = podInformer.Informer().HasSynced

	pdbInformer.Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{
		AddFunc: func(obj interface{}) {
			dc.addDB(logger, obj)
		},
		UpdateFunc: func(oldObj, newObj interface{}) {
			dc.updateDB(logger, oldObj, newObj)
		},
		DeleteFunc: func(obj interface{}) {
			dc.removeDB(logger, obj)
		},
	})
	dc.pdbLister = pdbInformer.Lister()
	dc.pdbListerSynced = pdbInformer.Informer().HasSynced

	dc.rcLister = rcInformer.Lister()
	dc.rcListerSynced = rcInformer.Informer().HasSynced

	dc.rsLister = rsInformer.Lister()
	dc.rsListerSynced = rsInformer.Informer().HasSynced

	dc.dLister = dInformer.Lister()
	dc.dListerSynced = dInformer.Informer().HasSynced

	dc.ssLister = ssInformer.Lister()
	dc.ssListerSynced = ssInformer.Informer().HasSynced

	dc.mapper = restMapper
	dc.scaleNamespacer = scaleNamespacer
	dc.discoveryClient = discoveryClient

	dc.clock = clock

	return dc
}

// The built-in workload controllers (ReplicationController, Deployment,
// ReplicaSet, StatefulSet) are resolved through their listers; anything else
// falls back to the generic scale subresource via getScaleController.
func (dc *DisruptionController) finders() []podControllerFinder {
	return []podControllerFinder{dc.getPodReplicationController, dc.getPodDeployment, dc.getPodReplicaSet,
		dc.getPodStatefulSet, dc.getScaleController}
}

var (
	controllerKindRS  = v1beta1.SchemeGroupVersion.WithKind("ReplicaSet")
	controllerKindSS  = apps.SchemeGroupVersion.WithKind("StatefulSet")
	controllerKindRC  = v1.SchemeGroupVersion.WithKind("ReplicationController")
	controllerKindDep = v1beta1.SchemeGroupVersion.WithKind("Deployment")
)

// getPodReplicaSet returns the scale of a ReplicaSet that is not itself owned
// by a Deployment (Deployment-owned ReplicaSets are handled by getPodDeployment).
func (dc *DisruptionController) getPodReplicaSet(ctx context.Context, controllerRef *metav1.OwnerReference, namespace string) (*controllerAndScale, error) {
	ok, err := verifyGroupKind(controllerRef, controllerKindRS.Kind, []string{"apps", "extensions"})
	if !ok || err != nil {
		return nil, err
	}
	rs, err := dc.rsLister.ReplicaSets(namespace).Get(controllerRef.Name)
	if err != nil {
		// The only possible error is NotFound, which is ok here.
		return nil, nil
	}
	if rs.UID != controllerRef.UID {
		return nil, nil
	}
	controllerRef = metav1.GetControllerOf(rs)
	if controllerRef != nil && controllerRef.Kind == controllerKindDep.Kind {
		// Skip ReplicaSets owned by a Deployment; getPodDeployment accounts
		// for them using the Deployment's scale.
		return nil, nil
	}
	return &controllerAndScale{rs.UID, *(rs.Spec.Replicas)}, nil
}

// getPodStatefulSet returns the StatefulSet referenced by controllerRef, or
// nil if it no longer exists or the UID does not match.
func (dc *DisruptionController) getPodStatefulSet(ctx context.Context, controllerRef *metav1.OwnerReference, namespace string) (*controllerAndScale, error) {
	ok, err := verifyGroupKind(controllerRef, controllerKindSS.Kind, []string{"apps"})
	if !ok || err != nil {
		return nil, err
	}
	ss, err := dc.ssLister.StatefulSets(namespace).Get(controllerRef.Name)
	if err != nil {
		// The only possible error is NotFound, which is ok here.
		return nil, nil
	}
	if ss.UID != controllerRef.UID {
		return nil, nil
	}

	return &controllerAndScale{ss.UID, *(ss.Spec.Replicas)}, nil
}

// getPodDeployment finds the Deployment that (via a ReplicaSet) manages the
// pod, and returns the Deployment's scale.
func (dc *DisruptionController) getPodDeployment(ctx context.Context, controllerRef *metav1.OwnerReference, namespace string) (*controllerAndScale, error) {
	ok, err := verifyGroupKind(controllerRef, controllerKindRS.Kind, []string{"apps", "extensions"})
	if !ok || err != nil {
		return nil, err
	}
	rs, err := dc.rsLister.ReplicaSets(namespace).Get(controllerRef.Name)
	if err != nil {
		// The only possible error is NotFound, which is ok here.
		return nil, nil
	}
	if rs.UID != controllerRef.UID {
		return nil, nil
	}
	controllerRef = metav1.GetControllerOf(rs)
	if controllerRef == nil {
		return nil, nil
	}

	ok, err = verifyGroupKind(controllerRef, controllerKindDep.Kind, []string{"apps", "extensions"})
	if !ok || err != nil {
		return nil, err
	}
	deployment, err := dc.dLister.Deployments(rs.Namespace).Get(controllerRef.Name)
	if err != nil {
		// The only possible error is NotFound, which is ok here.
		return nil, nil
	}
	if deployment.UID != controllerRef.UID {
		return nil, nil
	}
	return &controllerAndScale{deployment.UID, *(deployment.Spec.Replicas)}, nil
}

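// getPodReplicationController returns the ReplicationController referenced by
// controllerRef, or nil if it no longer exists or the UID does not match.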
func (dc *DisruptionController) getPodReplicationController(ctx context.Context, controllerRef *metav1.OwnerReference, namespace string) (*controllerAndScale, error) {
	ok, err := verifyGroupKind(controllerRef, controllerKindRC.Kind, []string{""})
	if !ok || err != nil {
		return nil, err
	}
	rc, err := dc.rcLister.ReplicationControllers(namespace).Get(controllerRef.Name)
	if err != nil {
		// The only possible error is NotFound, which is ok here.
		return nil, nil
	}
	if rc.UID != controllerRef.UID {
		return nil, nil
	}
	return &controllerAndScale{rc.UID, *(rc.Spec.Replicas)}, nil
}

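// getScaleController resolves any other controller kind through its scale
// subresource, so custom workload resources can also back a PDB.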
func (dc *DisruptionController) getScaleController(ctx context.Context, controllerRef *metav1.OwnerReference, namespace string) (*controllerAndScale, error) {
	gv, err := schema.ParseGroupVersion(controllerRef.APIVersion)
	if err != nil {
		return nil, err
	}

	gk := schema.GroupKind{
		Group: gv.Group,
		Kind:  controllerRef.Kind,
	}

	mapping, err := dc.mapper.RESTMapping(gk, gv.Version)
	if err != nil {
		return nil, err
	}
	gr := mapping.Resource.GroupResource()
	scale, err := dc.scaleNamespacer.Scales(namespace).Get(ctx, gr, controllerRef.Name, metav1.GetOptions{})
	if err != nil {
		if errors.IsNotFound(err) {
			// A NotFound error can mean either that the resource does not
			// exist, or that it exists but does not implement the scale
			// subresource. Distinguish the two so we can report a useful error.
			isScale, err := dc.implementsScale(mapping.Resource)
			if err != nil {
				return nil, err
			}
			if !isScale {
				return nil, fmt.Errorf("%s does not implement the scale subresource", gr.String())
			}
			return nil, nil
		}
		return nil, err
	}
	if scale.UID != controllerRef.UID {
		return nil, nil
	}
	return &controllerAndScale{scale.UID, scale.Spec.Replicas}, nil
}

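// implementsScale reports whether the given resource exposes a scale
// subresource, based on API discovery.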
func (dc *DisruptionController) implementsScale(gvr schema.GroupVersionResource) (bool, error) {
	resourceList, err := dc.discoveryClient.ServerResourcesForGroupVersion(gvr.GroupVersion().String())
	if err != nil {
		return false, err
	}

	scaleSubresourceName := fmt.Sprintf("%s/scale", gvr.Resource)
	for _, resource := range resourceList.APIResources {
		if resource.Name != scaleSubresourceName {
			continue
		}

		for _, scaleGv := range scaleclient.NewScaleConverter().ScaleVersions() {
			if resource.Group == scaleGv.Group &&
				resource.Version == scaleGv.Version &&
				resource.Kind == "Scale" {
				return true, nil
			}
		}
	}
	return false, nil
}

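// verifyGroupKind reports whether controllerRef matches the expected kind and
// one of the expected API groups.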
func verifyGroupKind(controllerRef *metav1.OwnerReference, expectedKind string, expectedGroups []string) (bool, error) {
	gv, err := schema.ParseGroupVersion(controllerRef.APIVersion)
	if err != nil {
		return false, err
	}

	if controllerRef.Kind != expectedKind {
		return false, nil
	}

	for _, group := range expectedGroups {
		if group == gv.Group {
			return true, nil
		}
	}

	return false, nil
}

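// Run starts the event broadcaster and the worker goroutines, and blocks until
// ctx is cancelled.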
func (dc *DisruptionController) Run(ctx context.Context) {
	defer utilruntime.HandleCrash()

	logger := klog.FromContext(ctx)

	if dc.kubeClient != nil {
		logger.Info("Sending events to api server.")
		dc.broadcaster.StartRecordingToSink(&v1core.EventSinkImpl{Interface: dc.kubeClient.CoreV1().Events("")})
	} else {
		logger.Info("No api server defined - no events will be sent to API server.")
	}
	defer dc.broadcaster.Shutdown()

	defer dc.queue.ShutDown()
	defer dc.recheckQueue.ShutDown()
	defer dc.stalePodDisruptionQueue.ShutDown()

	logger.Info("Starting disruption controller")
	defer logger.Info("Shutting down disruption controller")

	if !cache.WaitForNamedCacheSync("disruption", ctx.Done(), dc.podListerSynced, dc.pdbListerSynced, dc.rcListerSynced, dc.rsListerSynced, dc.dListerSynced, dc.ssListerSynced) {
		return
	}

	go wait.UntilWithContext(ctx, dc.worker, time.Second)
	go wait.Until(dc.recheckWorker, time.Second, ctx.Done())
	go wait.UntilWithContext(ctx, dc.stalePodDisruptionWorker, time.Second)

	<-ctx.Done()
}

func (dc *DisruptionController) addDB(logger klog.Logger, obj interface{}) {
	pdb := obj.(*policy.PodDisruptionBudget)
	logger.V(4).Info("Add DB", "podDisruptionBudget", klog.KObj(pdb))
	dc.enqueuePdb(logger, pdb)
}

func (dc *DisruptionController) updateDB(logger klog.Logger, old, cur interface{}) {
	pdb := cur.(*policy.PodDisruptionBudget)
	logger.V(4).Info("Update DB", "podDisruptionBudget", klog.KObj(pdb))
	dc.enqueuePdb(logger, pdb)
}

func (dc *DisruptionController) removeDB(logger klog.Logger, obj interface{}) {
	pdb, ok := obj.(*policy.PodDisruptionBudget)
	if !ok {
		tombstone, ok := obj.(cache.DeletedFinalStateUnknown)
		if !ok {
			logger.Error(nil, "Couldn't get object from tombstone", "obj", obj)
			return
		}
		pdb, ok = tombstone.Obj.(*policy.PodDisruptionBudget)
		if !ok {
			logger.Error(nil, "Tombstone contained object that is not a PDB", "obj", obj)
			return
		}
	}
	logger.V(4).Info("Remove DB", "podDisruptionBudget", klog.KObj(pdb))
	dc.enqueuePdb(logger, pdb)
}

func (dc *DisruptionController) addPod(logger klog.Logger, obj interface{}) {
	pod := obj.(*v1.Pod)
	logger.V(4).Info("AddPod called on pod", "pod", klog.KObj(pod))
	pdb := dc.getPdbForPod(logger, pod)
	if pdb == nil {
		logger.V(4).Info("No matching PDB for pod", "pod", klog.KObj(pod))
	} else {
		logger.V(4).Info("addPod -> PDB", "pod", klog.KObj(pod), "podDisruptionBudget", klog.KObj(pdb))
		dc.enqueuePdb(logger, pdb)
	}
	if has, cleanAfter := dc.nonTerminatingPodHasStaleDisruptionCondition(pod); has {
		dc.enqueueStalePodDisruptionCleanup(logger, pod, cleanAfter)
	}
}

func (dc *DisruptionController) updatePod(logger klog.Logger, _, cur interface{}) {
	pod := cur.(*v1.Pod)
	logger.V(4).Info("UpdatePod called on pod", "pod", klog.KObj(pod))
	pdb := dc.getPdbForPod(logger, pod)
	if pdb == nil {
		logger.V(4).Info("No matching PDB for pod", "pod", klog.KObj(pod))
	} else {
		logger.V(4).Info("updatePod -> PDB", "pod", klog.KObj(pod), "podDisruptionBudget", klog.KObj(pdb))
		dc.enqueuePdb(logger, pdb)
	}
	if has, cleanAfter := dc.nonTerminatingPodHasStaleDisruptionCondition(pod); has {
		dc.enqueueStalePodDisruptionCleanup(logger, pod, cleanAfter)
	}
}

func (dc *DisruptionController) deletePod(logger klog.Logger, obj interface{}) {
	pod, ok := obj.(*v1.Pod)
	// When a delete is dropped, the relist will notice a pod in the store not
	// in the list, leading to the insertion of a tombstone object which
	// contains the deleted key/value. Note that this value might be stale.
	if !ok {
		tombstone, ok := obj.(cache.DeletedFinalStateUnknown)
		if !ok {
			logger.Error(nil, "Couldn't get object from tombstone", "obj", obj)
			return
		}
		pod, ok = tombstone.Obj.(*v1.Pod)
		if !ok {
			logger.Error(nil, "Tombstone contained object that is not a pod", "obj", obj)
			return
		}
	}
	logger.V(4).Info("DeletePod called on pod", "pod", klog.KObj(pod))
	pdb := dc.getPdbForPod(logger, pod)
	if pdb == nil {
		logger.V(4).Info("No matching PDB for pod", "pod", klog.KObj(pod))
		return
	}
	logger.V(4).Info("DeletePod -> PDB", "pod", klog.KObj(pod), "podDisruptionBudget", klog.KObj(pdb))
	dc.enqueuePdb(logger, pdb)
}

func (dc *DisruptionController) enqueuePdb(logger klog.Logger, pdb *policy.PodDisruptionBudget) {
	key, err := controller.KeyFunc(pdb)
	if err != nil {
		logger.Error(err, "Couldn't get key for PodDisruptionBudget", "podDisruptionBudget", klog.KObj(pdb))
		return
	}
	dc.queue.Add(key)
}

func (dc *DisruptionController) enqueuePdbForRecheck(logger klog.Logger, pdb *policy.PodDisruptionBudget, delay time.Duration) {
	key, err := controller.KeyFunc(pdb)
	if err != nil {
		logger.Error(err, "Couldn't get key for PodDisruptionBudget", "podDisruptionBudget", klog.KObj(pdb))
		return
	}
	dc.recheckQueue.AddAfter(key, delay)
}

func (dc *DisruptionController) enqueueStalePodDisruptionCleanup(logger klog.Logger, pod *v1.Pod, d time.Duration) {
	key, err := controller.KeyFunc(pod)
	if err != nil {
		logger.Error(err, "Couldn't get key for Pod object", "pod", klog.KObj(pod))
		return
	}
	dc.stalePodDisruptionQueue.AddAfter(key, d)
	logger.V(4).Info("Enqueued pod to cleanup stale DisruptionTarget condition", "pod", klog.KObj(pod))
}

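// getPdbForPod returns a PDB that selects the pod, or nil if none does. If
// more than one PDB matches, the first one returned by the lister is chosen
// arbitrarily and a warning event is recorded on the pod.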
func (dc *DisruptionController) getPdbForPod(logger klog.Logger, pod *v1.Pod) *policy.PodDisruptionBudget {
	// GetPodPodDisruptionBudgets returns an error only if no
	// PodDisruptionBudgets are found. We don't return that as an error to the
	// caller.
	pdbs, err := dc.pdbLister.GetPodPodDisruptionBudgets(pod)
	if err != nil {
		logger.V(4).Info("No PodDisruptionBudgets found for pod, PodDisruptionBudget controller will avoid syncing.", "pod", klog.KObj(pod))
		return nil
	}

	if len(pdbs) > 1 {
		msg := fmt.Sprintf("Pod %q/%q matches multiple PodDisruptionBudgets. Chose %q arbitrarily.", pod.Namespace, pod.Name, pdbs[0].Name)
		logger.Info(msg)
		dc.recorder.Event(pod, v1.EventTypeWarning, "MultiplePodDisruptionBudgets", msg)
	}
	return pdbs[0]
}

// getPodsForPdb lists the pods matched by the PDB's selector. The returned
// pods come from the informer cache and must NOT be modified.
func (dc *DisruptionController) getPodsForPdb(pdb *policy.PodDisruptionBudget) ([]*v1.Pod, error) {
	sel, err := metav1.LabelSelectorAsSelector(pdb.Spec.Selector)
	if err != nil {
		return []*v1.Pod{}, err
	}
	pods, err := dc.podLister.Pods(pdb.Namespace).List(sel)
	if err != nil {
		return []*v1.Pod{}, err
	}
	return pods, nil
}

func (dc *DisruptionController) worker(ctx context.Context) {
	for dc.processNextWorkItem(ctx) {
	}
}

func (dc *DisruptionController) processNextWorkItem(ctx context.Context) bool {
	dKey, quit := dc.queue.Get()
	if quit {
		return false
	}
	defer dc.queue.Done(dKey)

	err := dc.sync(ctx, dKey.(string))
	if err == nil {
		dc.queue.Forget(dKey)
		return true
	}

	utilruntime.HandleError(fmt.Errorf("error syncing PodDisruptionBudget %v, requeuing: %v", dKey.(string), err))
	dc.queue.AddRateLimited(dKey)

	return true
}

func (dc *DisruptionController) recheckWorker() {
	for dc.processNextRecheckWorkItem() {
	}
}

// processNextRecheckWorkItem moves a key whose recheck delay has elapsed from
// the recheck queue back onto the main work queue.
func (dc *DisruptionController) processNextRecheckWorkItem() bool {
	dKey, quit := dc.recheckQueue.Get()
	if quit {
		return false
	}
	defer dc.recheckQueue.Done(dKey)
	dc.queue.AddRateLimited(dKey)
	return true
}

func (dc *DisruptionController) stalePodDisruptionWorker(ctx context.Context) {
	for dc.processNextStalePodDisruptionWorkItem(ctx) {
	}
}

func (dc *DisruptionController) processNextStalePodDisruptionWorkItem(ctx context.Context) bool {
	key, quit := dc.stalePodDisruptionQueue.Get()
	if quit {
		return false
	}
	defer dc.stalePodDisruptionQueue.Done(key)
	err := dc.syncStalePodDisruption(ctx, key.(string))
	if err == nil {
		dc.stalePodDisruptionQueue.Forget(key)
		return true
	}
	utilruntime.HandleError(fmt.Errorf("error syncing Pod %v to clear DisruptionTarget condition, requeueing: %v", key.(string), err))
	dc.stalePodDisruptionQueue.AddRateLimited(key)
	return true
}

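// sync processes a single PodDisruptionBudget key from the work queue. Sync
// failures other than update conflicts fall through to failSafe, which blocks
// further disruptions until a subsequent sync succeeds.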
func (dc *DisruptionController) sync(ctx context.Context, key string) error {
	logger := klog.FromContext(ctx)
	startTime := dc.clock.Now()
	defer func() {
		logger.V(4).Info("Finished syncing PodDisruptionBudget", "key", key, "duration", dc.clock.Since(startTime))
	}()

	namespace, name, err := cache.SplitMetaNamespaceKey(key)
	if err != nil {
		return err
	}
	pdb, err := dc.pdbLister.PodDisruptionBudgets(namespace).Get(name)
	if errors.IsNotFound(err) {
		logger.V(4).Info("podDisruptionBudget has been deleted", "key", key)
		return nil
	}
	if err != nil {
		return err
	}

	err = dc.trySync(ctx, pdb)
	// If the reason for failure was a conflict, allow this PDB update to be
	// requeued without triggering the failSafe logic.
	if errors.IsConflict(err) {
		return err
	}
	if err != nil {
		logger.Error(err, "Failed to sync PDB", "podDisruptionBudget", klog.KRef(namespace, name))
		return dc.failSafe(ctx, pdb, err)
	}

	return nil
}

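// trySync recomputes the PDB status from the currently selected pods and
// writes it back. If any disrupted pod is still waiting to be deleted, the PDB
// is re-enqueued to be rechecked when that pod's deletion deadline expires.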
func (dc *DisruptionController) trySync(ctx context.Context, pdb *policy.PodDisruptionBudget) error {
	logger := klog.FromContext(ctx)
	pods, err := dc.getPodsForPdb(pdb)
	if err != nil {
		dc.recorder.Eventf(pdb, v1.EventTypeWarning, "NoPods", "Failed to get pods: %v", err)
		return err
	}
	if len(pods) == 0 {
		dc.recorder.Eventf(pdb, v1.EventTypeNormal, "NoPods", "No matching pods found")
	}

	expectedCount, desiredHealthy, unmanagedPods, err := dc.getExpectedPodCount(ctx, pdb, pods)
	if err != nil {
		dc.recorder.Eventf(pdb, v1.EventTypeWarning, "CalculateExpectedPodCountFailed", "Failed to calculate the number of expected pods: %v", err)
		return err
	}

	if len(unmanagedPods) > 0 {
		logger.V(4).Info("Found unmanaged pods associated with this PDB", "pods", unmanagedPods)
		dc.recorder.Eventf(pdb, v1.EventTypeWarning, "UnmanagedPods", "Pods selected by this PodDisruptionBudget (selector: %v) were found "+
			"to be unmanaged. As a result, the status of the PDB cannot be calculated correctly, which may result in undefined behavior. "+
			"To account for these pods please set \".spec.minAvailable\" "+
			"field of the PDB to an integer value.", pdb.Spec.Selector)
	}

	currentTime := dc.clock.Now()
	disruptedPods, recheckTime := dc.buildDisruptedPodMap(logger, pods, pdb, currentTime)
	currentHealthy := countHealthyPods(pods, disruptedPods, currentTime)
	err = dc.updatePdbStatus(ctx, pdb, currentHealthy, desiredHealthy, expectedCount, disruptedPods)

	if err == nil && recheckTime != nil {
		// The recheck queue is keyed by PDB name, so only the earliest
		// requested recheck time for this PDB is effectively waiting in the
		// queue at any moment.
		dc.enqueuePdbForRecheck(logger, pdb, recheckTime.Sub(currentTime))
	}
	return err
}

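// syncStalePodDisruption resets a stale DisruptionTarget condition on the pod
// to False once stalePodDisruptionTimeout has elapsed since the condition's
// last transition; if the timeout has not elapsed yet, the pod is re-enqueued.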
func (dc *DisruptionController) syncStalePodDisruption(ctx context.Context, key string) error {
	logger := klog.FromContext(ctx)
	startTime := dc.clock.Now()
	namespace, name, err := cache.SplitMetaNamespaceKey(key)
	if err != nil {
		return err
	}
	defer func() {
		logger.V(4).Info("Finished syncing Pod to clear DisruptionTarget condition", "pod", klog.KRef(namespace, name), "duration", dc.clock.Since(startTime))
	}()
	pod, err := dc.podLister.Pods(namespace).Get(name)
	if errors.IsNotFound(err) {
		logger.V(4).Info("Skipping clearing DisruptionTarget condition because pod was deleted", "pod", klog.KRef(namespace, name))
		return nil
	}
	if err != nil {
		return err
	}

	hasCond, cleanAfter := dc.nonTerminatingPodHasStaleDisruptionCondition(pod)
	if !hasCond {
		return nil
	}
	if cleanAfter > 0 {
		dc.enqueueStalePodDisruptionCleanup(logger, pod, cleanAfter)
		return nil
	}

	newPod := pod.DeepCopy()
	updated := apipod.UpdatePodCondition(&newPod.Status, &v1.PodCondition{
		Type:   v1.DisruptionTarget,
		Status: v1.ConditionFalse,
	})
	if !updated {
		return nil
	}
	if _, err := dc.kubeClient.CoreV1().Pods(pod.Namespace).UpdateStatus(ctx, newPod, metav1.UpdateOptions{}); err != nil {
		return err
	}
	logger.V(2).Info("Reset stale DisruptionTarget condition to False", "pod", klog.KObj(pod))
	return nil
}

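// getExpectedPodCount derives expectedCount (the denominator for the budget)
// and desiredHealthy from the PDB spec. maxUnavailable and a percentage
// minAvailable are evaluated against the owning controllers' total desired
// scale; an integer minAvailable is evaluated against the selected pods.
//
// For example, if a PDB with minAvailable: 50% selects pods owned by a
// Deployment scaled to 4 and a StatefulSet scaled to 2, then expectedCount is
// 6 and desiredHealthy is 3.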
func (dc *DisruptionController) getExpectedPodCount(ctx context.Context, pdb *policy.PodDisruptionBudget, pods []*v1.Pod) (expectedCount, desiredHealthy int32, unmanagedPods []string, err error) {
	if pdb.Spec.MaxUnavailable != nil {
		expectedCount, unmanagedPods, err = dc.getExpectedScale(ctx, pdb, pods)
		if err != nil {
			return
		}
		var maxUnavailable int
		maxUnavailable, err = intstr.GetScaledValueFromIntOrPercent(pdb.Spec.MaxUnavailable, int(expectedCount), true)
		if err != nil {
			return
		}
		desiredHealthy = expectedCount - int32(maxUnavailable)
		if desiredHealthy < 0 {
			desiredHealthy = 0
		}
	} else if pdb.Spec.MinAvailable != nil {
		if pdb.Spec.MinAvailable.Type == intstr.Int {
			desiredHealthy = pdb.Spec.MinAvailable.IntVal
			expectedCount = int32(len(pods))
		} else if pdb.Spec.MinAvailable.Type == intstr.String {
			expectedCount, unmanagedPods, err = dc.getExpectedScale(ctx, pdb, pods)
			if err != nil {
				return
			}

			var minAvailable int
			minAvailable, err = intstr.GetScaledValueFromIntOrPercent(pdb.Spec.MinAvailable, int(expectedCount), true)
			if err != nil {
				return
			}
			desiredHealthy = int32(minAvailable)
		}
	}
	return
}

// getExpectedScale computes the denominator used for fractional budgets: the
// sum of the desired scale of every distinct controller owning a selected pod.
// Pods without a controller are returned in unmanagedPods rather than treated
// as an error.
func (dc *DisruptionController) getExpectedScale(ctx context.Context, pdb *policy.PodDisruptionBudget, pods []*v1.Pod) (expectedCount int32, unmanagedPods []string, err error) {
	// A mapping from controller UIDs to their desired scale.
	controllerScale := map[types.UID]int32{}

	// 1. Find the controller for each pod. With ControllerRef, a pod can have
	// at most one controller. Pods without any controller are collected as
	// unmanaged instead of failing the sync.
	for _, pod := range pods {
		controllerRef := metav1.GetControllerOf(pod)
		if controllerRef == nil {
			unmanagedPods = append(unmanagedPods, pod.Name)
			continue
		}

		// If we already know the scale of the controller there is no need to
		// look it up again.
		if _, found := controllerScale[controllerRef.UID]; found {
			continue
		}

		// Check all the supported controllers to find the desired scale.
		foundController := false
		for _, finder := range dc.finders() {
			var controllerNScale *controllerAndScale
			controllerNScale, err = finder(ctx, controllerRef, pod.Namespace)
			if err != nil {
				return
			}
			if controllerNScale != nil {
				controllerScale[controllerNScale.UID] = controllerNScale.scale
				foundController = true
				break
			}
		}
		if !foundController {
			err = fmt.Errorf("found no controllers for pod %q", pod.Name)
			return
		}
	}

	// 2. Add up the scales of all the controllers.
	expectedCount = 0
	for _, count := range controllerScale {
		expectedCount += count
	}

	return
}

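// countHealthyPods counts the ready pods that are neither being deleted nor
// already recorded as disrupted within the last DeletionTimeout.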
func countHealthyPods(pods []*v1.Pod, disruptedPods map[string]metav1.Time, currentTime time.Time) (currentHealthy int32) {
	for _, pod := range pods {
		// Pod is being deleted.
		if pod.DeletionTimestamp != nil {
			continue
		}
		// Pod is expected to be deleted soon.
		if disruptionTime, found := disruptedPods[pod.Name]; found && disruptionTime.Time.Add(DeletionTimeout).After(currentTime) {
			continue
		}
		if apipod.IsPodReady(pod) {
			currentHealthy++
		}
	}

	return
}

// buildDisruptedPodMap rebuilds the DisruptedPods map, dropping entries for
// pods that no longer exist, are already being deleted, or were not deleted
// within DeletionTimeout. It also returns the earliest time at which the PDB
// should be rechecked, if any entry is still pending deletion.
func (dc *DisruptionController) buildDisruptedPodMap(logger klog.Logger, pods []*v1.Pod, pdb *policy.PodDisruptionBudget, currentTime time.Time) (map[string]metav1.Time, *time.Time) {
	disruptedPods := pdb.Status.DisruptedPods
	result := make(map[string]metav1.Time)
	var recheckTime *time.Time

	if disruptedPods == nil {
		return result, recheckTime
	}
	for _, pod := range pods {
		if pod.DeletionTimestamp != nil {
			// Already being deleted.
			continue
		}
		disruptionTime, found := disruptedPods[pod.Name]
		if !found {
			// Pod is not on the disrupted list.
			continue
		}
		expectedDeletion := disruptionTime.Time.Add(DeletionTimeout)
		if expectedDeletion.Before(currentTime) {
			logger.V(1).Info("pod was expected to be deleted but it wasn't, updating PDB",
				"pod", klog.KObj(pod), "deletionTime", disruptionTime, "podDisruptionBudget", klog.KObj(pdb))
			dc.recorder.Eventf(pod, v1.EventTypeWarning, "NotDeleted", "Pod was expected by PDB %s/%s to be deleted but it wasn't",
				pdb.Namespace, pdb.Name)
		} else {
			if recheckTime == nil || expectedDeletion.Before(*recheckTime) {
				recheckTime = &expectedDeletion
			}
			result[pod.Name] = disruptionTime
		}
	}
	return result, recheckTime
}

// failSafe is a last resort when a sync fails: it sets DisruptionsAllowed to 0
// and marks the DisruptionAllowed condition False with reason SyncFailed, so
// that evictions are blocked while the controller cannot compute a reliable
// status.
func (dc *DisruptionController) failSafe(ctx context.Context, pdb *policy.PodDisruptionBudget, err error) error {
	newPdb := pdb.DeepCopy()
	newPdb.Status.DisruptionsAllowed = 0

	if newPdb.Status.Conditions == nil {
		newPdb.Status.Conditions = make([]metav1.Condition, 0)
	}
	apimeta.SetStatusCondition(&newPdb.Status.Conditions, metav1.Condition{
		Type:               policy.DisruptionAllowedCondition,
		Status:             metav1.ConditionFalse,
		Reason:             policy.SyncFailedReason,
		Message:            err.Error(),
		ObservedGeneration: newPdb.Status.ObservedGeneration,
	})

	return dc.getUpdater()(ctx, newPdb)
}

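// updatePdbStatus computes DisruptionsAllowed as currentHealthy minus
// desiredHealthy (never negative) and writes the status only if something
// actually changed.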
func (dc *DisruptionController) updatePdbStatus(ctx context.Context, pdb *policy.PodDisruptionBudget, currentHealthy, desiredHealthy, expectedCount int32,
	disruptedPods map[string]metav1.Time) error {

	// We require expectedCount to be > 0 so that PDBs which currently match no
	// pods are in a safe state when their first pods appear but this controller
	// has not updated their status yet. This isn't the only race, but it's a
	// common one that's easy to detect.
	disruptionsAllowed := currentHealthy - desiredHealthy
	if expectedCount <= 0 || disruptionsAllowed <= 0 {
		disruptionsAllowed = 0
	}

	if pdb.Status.CurrentHealthy == currentHealthy &&
		pdb.Status.DesiredHealthy == desiredHealthy &&
		pdb.Status.ExpectedPods == expectedCount &&
		pdb.Status.DisruptionsAllowed == disruptionsAllowed &&
		apiequality.Semantic.DeepEqual(pdb.Status.DisruptedPods, disruptedPods) &&
		pdb.Status.ObservedGeneration == pdb.Generation &&
		pdbhelper.ConditionsAreUpToDate(pdb) {
		return nil
	}

	newPdb := pdb.DeepCopy()
	newPdb.Status = policy.PodDisruptionBudgetStatus{
		CurrentHealthy:     currentHealthy,
		DesiredHealthy:     desiredHealthy,
		ExpectedPods:       expectedCount,
		DisruptionsAllowed: disruptionsAllowed,
		DisruptedPods:      disruptedPods,
		ObservedGeneration: pdb.Generation,
		Conditions:         newPdb.Status.Conditions,
	}

	pdbhelper.UpdateDisruptionAllowedCondition(newPdb)

	return dc.getUpdater()(ctx, newPdb)
}

func (dc *DisruptionController) writePdbStatus(ctx context.Context, pdb *policy.PodDisruptionBudget) error {
	// If this update fails, don't retry it here. Allow the failure to get
	// handled and retried in processNextWorkItem.
	_, err := dc.kubeClient.PolicyV1().PodDisruptionBudgets(pdb.Namespace).UpdateStatus(ctx, pdb, metav1.UpdateOptions{})
	return err
}

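// nonTerminatingPodHasStaleDisruptionCondition reports whether the pod carries
// a DisruptionTarget condition with status True that this controller should
// eventually reset, and how long to wait before doing so.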
func (dc *DisruptionController) nonTerminatingPodHasStaleDisruptionCondition(pod *v1.Pod) (bool, time.Duration) {
	if pod.DeletionTimestamp != nil {
		return false, 0
	}
	_, cond := apipod.GetPodCondition(&pod.Status, v1.DisruptionTarget)
	// Conditions added by the kubelet (TerminationByKubelet) and conditions on
	// pods in a terminal phase are never considered stale, so they are not
	// reset by this controller.
	if cond == nil || cond.Status != v1.ConditionTrue || cond.Reason == v1.PodReasonTerminationByKubelet || apipod.IsPodPhaseTerminal(pod.Status.Phase) {
		return false, 0
	}
	waitFor := dc.stalePodDisruptionTimeout - dc.clock.Since(cond.LastTransitionTime.Time)
	if waitFor < 0 {
		waitFor = 0
	}
	return true, waitFor
}