1
16
17
18
19
20
21 package deployment
22
23 import (
24 "context"
25 "fmt"
26 "reflect"
27 "time"
28
29 apps "k8s.io/api/apps/v1"
30 v1 "k8s.io/api/core/v1"
31 "k8s.io/apimachinery/pkg/api/errors"
32 metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
33 "k8s.io/apimachinery/pkg/labels"
34 "k8s.io/apimachinery/pkg/types"
35 utilruntime "k8s.io/apimachinery/pkg/util/runtime"
36 "k8s.io/apimachinery/pkg/util/wait"
37 appsinformers "k8s.io/client-go/informers/apps/v1"
38 coreinformers "k8s.io/client-go/informers/core/v1"
39 clientset "k8s.io/client-go/kubernetes"
40 "k8s.io/client-go/kubernetes/scheme"
41 v1core "k8s.io/client-go/kubernetes/typed/core/v1"
42 appslisters "k8s.io/client-go/listers/apps/v1"
43 corelisters "k8s.io/client-go/listers/core/v1"
44 "k8s.io/client-go/tools/cache"
45 "k8s.io/client-go/tools/record"
46 "k8s.io/client-go/util/workqueue"
47 "k8s.io/klog/v2"
48 "k8s.io/kubernetes/pkg/controller"
49 "k8s.io/kubernetes/pkg/controller/deployment/util"
50 )
51
52 const (
53
54
55
56
57
58 maxRetries = 15
59 )
60
61
62 var controllerKind = apps.SchemeGroupVersion.WithKind("Deployment")
63
64
65
66 type DeploymentController struct {
67
68 rsControl controller.RSControlInterface
69 client clientset.Interface
70
71 eventBroadcaster record.EventBroadcaster
72 eventRecorder record.EventRecorder
73
74
75 syncHandler func(ctx context.Context, dKey string) error
76
77 enqueueDeployment func(deployment *apps.Deployment)
78
79
80 dLister appslisters.DeploymentLister
81
82 rsLister appslisters.ReplicaSetLister
83
84 podLister corelisters.PodLister
85
86
87
88 dListerSynced cache.InformerSynced
89
90
91 rsListerSynced cache.InformerSynced
92
93
94 podListerSynced cache.InformerSynced
95
96
97 queue workqueue.RateLimitingInterface
98 }
99
100
101 func NewDeploymentController(ctx context.Context, dInformer appsinformers.DeploymentInformer, rsInformer appsinformers.ReplicaSetInformer, podInformer coreinformers.PodInformer, client clientset.Interface) (*DeploymentController, error) {
102 eventBroadcaster := record.NewBroadcaster(record.WithContext(ctx))
103 logger := klog.FromContext(ctx)
104 dc := &DeploymentController{
105 client: client,
106 eventBroadcaster: eventBroadcaster,
107 eventRecorder: eventBroadcaster.NewRecorder(scheme.Scheme, v1.EventSource{Component: "deployment-controller"}),
108 queue: workqueue.NewNamedRateLimitingQueue(workqueue.DefaultControllerRateLimiter(), "deployment"),
109 }
110 dc.rsControl = controller.RealRSControl{
111 KubeClient: client,
112 Recorder: dc.eventRecorder,
113 }
114
115 dInformer.Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{
116 AddFunc: func(obj interface{}) {
117 dc.addDeployment(logger, obj)
118 },
119 UpdateFunc: func(oldObj, newObj interface{}) {
120 dc.updateDeployment(logger, oldObj, newObj)
121 },
122
123 DeleteFunc: func(obj interface{}) {
124 dc.deleteDeployment(logger, obj)
125 },
126 })
127 rsInformer.Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{
128 AddFunc: func(obj interface{}) {
129 dc.addReplicaSet(logger, obj)
130 },
131 UpdateFunc: func(oldObj, newObj interface{}) {
132 dc.updateReplicaSet(logger, oldObj, newObj)
133 },
134 DeleteFunc: func(obj interface{}) {
135 dc.deleteReplicaSet(logger, obj)
136 },
137 })
138 podInformer.Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{
139 DeleteFunc: func(obj interface{}) {
140 dc.deletePod(logger, obj)
141 },
142 })
143
144 dc.syncHandler = dc.syncDeployment
145 dc.enqueueDeployment = dc.enqueue
146
147 dc.dLister = dInformer.Lister()
148 dc.rsLister = rsInformer.Lister()
149 dc.podLister = podInformer.Lister()
150 dc.dListerSynced = dInformer.Informer().HasSynced
151 dc.rsListerSynced = rsInformer.Informer().HasSynced
152 dc.podListerSynced = podInformer.Informer().HasSynced
153 return dc, nil
154 }
155
156
157 func (dc *DeploymentController) Run(ctx context.Context, workers int) {
158 defer utilruntime.HandleCrash()
159
160
161 dc.eventBroadcaster.StartStructuredLogging(3)
162 dc.eventBroadcaster.StartRecordingToSink(&v1core.EventSinkImpl{Interface: dc.client.CoreV1().Events("")})
163 defer dc.eventBroadcaster.Shutdown()
164
165 defer dc.queue.ShutDown()
166
167 logger := klog.FromContext(ctx)
168 logger.Info("Starting controller", "controller", "deployment")
169 defer logger.Info("Shutting down controller", "controller", "deployment")
170
171 if !cache.WaitForNamedCacheSync("deployment", ctx.Done(), dc.dListerSynced, dc.rsListerSynced, dc.podListerSynced) {
172 return
173 }
174
175 for i := 0; i < workers; i++ {
176 go wait.UntilWithContext(ctx, dc.worker, time.Second)
177 }
178
179 <-ctx.Done()
180 }
181
182 func (dc *DeploymentController) addDeployment(logger klog.Logger, obj interface{}) {
183 d := obj.(*apps.Deployment)
184 logger.V(4).Info("Adding deployment", "deployment", klog.KObj(d))
185 dc.enqueueDeployment(d)
186 }
187
188 func (dc *DeploymentController) updateDeployment(logger klog.Logger, old, cur interface{}) {
189 oldD := old.(*apps.Deployment)
190 curD := cur.(*apps.Deployment)
191 logger.V(4).Info("Updating deployment", "deployment", klog.KObj(oldD))
192 dc.enqueueDeployment(curD)
193 }
194
195 func (dc *DeploymentController) deleteDeployment(logger klog.Logger, obj interface{}) {
196 d, ok := obj.(*apps.Deployment)
197 if !ok {
198 tombstone, ok := obj.(cache.DeletedFinalStateUnknown)
199 if !ok {
200 utilruntime.HandleError(fmt.Errorf("couldn't get object from tombstone %#v", obj))
201 return
202 }
203 d, ok = tombstone.Obj.(*apps.Deployment)
204 if !ok {
205 utilruntime.HandleError(fmt.Errorf("tombstone contained object that is not a Deployment %#v", obj))
206 return
207 }
208 }
209 logger.V(4).Info("Deleting deployment", "deployment", klog.KObj(d))
210 dc.enqueueDeployment(d)
211 }
212
213
214 func (dc *DeploymentController) addReplicaSet(logger klog.Logger, obj interface{}) {
215 rs := obj.(*apps.ReplicaSet)
216
217 if rs.DeletionTimestamp != nil {
218
219
220 dc.deleteReplicaSet(logger, rs)
221 return
222 }
223
224 if controllerRef := metav1.GetControllerOf(rs); controllerRef != nil {
225 d := dc.resolveControllerRef(rs.Namespace, controllerRef)
226 if d == nil {
227 return
228 }
229 logger.V(4).Info("ReplicaSet added", "replicaSet", klog.KObj(rs))
230 dc.enqueueDeployment(d)
231 return
232 }
233
234
235
236 ds := dc.getDeploymentsForReplicaSet(logger, rs)
237 if len(ds) == 0 {
238 return
239 }
240 logger.V(4).Info("Orphan ReplicaSet added", "replicaSet", klog.KObj(rs))
241 for _, d := range ds {
242 dc.enqueueDeployment(d)
243 }
244 }
245
246
247
248 func (dc *DeploymentController) getDeploymentsForReplicaSet(logger klog.Logger, rs *apps.ReplicaSet) []*apps.Deployment {
249 deployments, err := util.GetDeploymentsForReplicaSet(dc.dLister, rs)
250 if err != nil || len(deployments) == 0 {
251 return nil
252 }
253
254
255
256
257 if len(deployments) > 1 {
258
259
260 logger.V(4).Info("user error! more than one deployment is selecting replica set",
261 "replicaSet", klog.KObj(rs), "labels", rs.Labels, "deployment", klog.KObj(deployments[0]))
262 }
263 return deployments
264 }
265
266
267
268
269
270 func (dc *DeploymentController) updateReplicaSet(logger klog.Logger, old, cur interface{}) {
271 curRS := cur.(*apps.ReplicaSet)
272 oldRS := old.(*apps.ReplicaSet)
273 if curRS.ResourceVersion == oldRS.ResourceVersion {
274
275
276 return
277 }
278
279 curControllerRef := metav1.GetControllerOf(curRS)
280 oldControllerRef := metav1.GetControllerOf(oldRS)
281 controllerRefChanged := !reflect.DeepEqual(curControllerRef, oldControllerRef)
282 if controllerRefChanged && oldControllerRef != nil {
283
284 if d := dc.resolveControllerRef(oldRS.Namespace, oldControllerRef); d != nil {
285 dc.enqueueDeployment(d)
286 }
287 }
288
289 if curControllerRef != nil {
290 d := dc.resolveControllerRef(curRS.Namespace, curControllerRef)
291 if d == nil {
292 return
293 }
294 logger.V(4).Info("ReplicaSet updated", "replicaSet", klog.KObj(curRS))
295 dc.enqueueDeployment(d)
296 return
297 }
298
299
300
301 labelChanged := !reflect.DeepEqual(curRS.Labels, oldRS.Labels)
302 if labelChanged || controllerRefChanged {
303 ds := dc.getDeploymentsForReplicaSet(logger, curRS)
304 if len(ds) == 0 {
305 return
306 }
307 logger.V(4).Info("Orphan ReplicaSet updated", "replicaSet", klog.KObj(curRS))
308 for _, d := range ds {
309 dc.enqueueDeployment(d)
310 }
311 }
312 }
313
314
315
316
317 func (dc *DeploymentController) deleteReplicaSet(logger klog.Logger, obj interface{}) {
318 rs, ok := obj.(*apps.ReplicaSet)
319
320
321
322
323
324 if !ok {
325 tombstone, ok := obj.(cache.DeletedFinalStateUnknown)
326 if !ok {
327 utilruntime.HandleError(fmt.Errorf("couldn't get object from tombstone %#v", obj))
328 return
329 }
330 rs, ok = tombstone.Obj.(*apps.ReplicaSet)
331 if !ok {
332 utilruntime.HandleError(fmt.Errorf("tombstone contained object that is not a ReplicaSet %#v", obj))
333 return
334 }
335 }
336
337 controllerRef := metav1.GetControllerOf(rs)
338 if controllerRef == nil {
339
340 return
341 }
342 d := dc.resolveControllerRef(rs.Namespace, controllerRef)
343 if d == nil {
344 return
345 }
346 logger.V(4).Info("ReplicaSet deleted", "replicaSet", klog.KObj(rs))
347 dc.enqueueDeployment(d)
348 }
349
350
351 func (dc *DeploymentController) deletePod(logger klog.Logger, obj interface{}) {
352 pod, ok := obj.(*v1.Pod)
353
354
355
356
357
358 if !ok {
359 tombstone, ok := obj.(cache.DeletedFinalStateUnknown)
360 if !ok {
361 utilruntime.HandleError(fmt.Errorf("couldn't get object from tombstone %#v", obj))
362 return
363 }
364 pod, ok = tombstone.Obj.(*v1.Pod)
365 if !ok {
366 utilruntime.HandleError(fmt.Errorf("tombstone contained object that is not a pod %#v", obj))
367 return
368 }
369 }
370 d := dc.getDeploymentForPod(logger, pod)
371 if d == nil {
372 return
373 }
374 logger.V(4).Info("Pod deleted", "pod", klog.KObj(pod))
375 if d.Spec.Strategy.Type == apps.RecreateDeploymentStrategyType {
376
377 rsList, err := util.ListReplicaSets(d, util.RsListFromClient(dc.client.AppsV1()))
378 if err != nil {
379 return
380 }
381 podMap, err := dc.getPodMapForDeployment(d, rsList)
382 if err != nil {
383 return
384 }
385 numPods := 0
386 for _, podList := range podMap {
387 numPods += len(podList)
388 }
389 if numPods == 0 {
390 dc.enqueueDeployment(d)
391 }
392 }
393 }
394
395 func (dc *DeploymentController) enqueue(deployment *apps.Deployment) {
396 key, err := controller.KeyFunc(deployment)
397 if err != nil {
398 utilruntime.HandleError(fmt.Errorf("couldn't get key for object %#v: %v", deployment, err))
399 return
400 }
401
402 dc.queue.Add(key)
403 }
404
405 func (dc *DeploymentController) enqueueRateLimited(deployment *apps.Deployment) {
406 key, err := controller.KeyFunc(deployment)
407 if err != nil {
408 utilruntime.HandleError(fmt.Errorf("couldn't get key for object %#v: %v", deployment, err))
409 return
410 }
411
412 dc.queue.AddRateLimited(key)
413 }
414
415
416 func (dc *DeploymentController) enqueueAfter(deployment *apps.Deployment, after time.Duration) {
417 key, err := controller.KeyFunc(deployment)
418 if err != nil {
419 utilruntime.HandleError(fmt.Errorf("couldn't get key for object %#v: %v", deployment, err))
420 return
421 }
422
423 dc.queue.AddAfter(key, after)
424 }
425
426
427 func (dc *DeploymentController) getDeploymentForPod(logger klog.Logger, pod *v1.Pod) *apps.Deployment {
428
429 var rs *apps.ReplicaSet
430 var err error
431 controllerRef := metav1.GetControllerOf(pod)
432 if controllerRef == nil {
433
434 return nil
435 }
436 if controllerRef.Kind != apps.SchemeGroupVersion.WithKind("ReplicaSet").Kind {
437
438 return nil
439 }
440 rs, err = dc.rsLister.ReplicaSets(pod.Namespace).Get(controllerRef.Name)
441 if err != nil || rs.UID != controllerRef.UID {
442 logger.V(4).Info("Cannot get replicaset for pod", "ownerReference", controllerRef.Name, "pod", klog.KObj(pod), "err", err)
443 return nil
444 }
445
446
447 controllerRef = metav1.GetControllerOf(rs)
448 if controllerRef == nil {
449 return nil
450 }
451 return dc.resolveControllerRef(rs.Namespace, controllerRef)
452 }
453
454
455
456
457 func (dc *DeploymentController) resolveControllerRef(namespace string, controllerRef *metav1.OwnerReference) *apps.Deployment {
458
459
460 if controllerRef.Kind != controllerKind.Kind {
461 return nil
462 }
463 d, err := dc.dLister.Deployments(namespace).Get(controllerRef.Name)
464 if err != nil {
465 return nil
466 }
467 if d.UID != controllerRef.UID {
468
469
470 return nil
471 }
472 return d
473 }
474
475
476
477 func (dc *DeploymentController) worker(ctx context.Context) {
478 for dc.processNextWorkItem(ctx) {
479 }
480 }
481
482 func (dc *DeploymentController) processNextWorkItem(ctx context.Context) bool {
483 key, quit := dc.queue.Get()
484 if quit {
485 return false
486 }
487 defer dc.queue.Done(key)
488
489 err := dc.syncHandler(ctx, key.(string))
490 dc.handleErr(ctx, err, key)
491
492 return true
493 }
494
495 func (dc *DeploymentController) handleErr(ctx context.Context, err error, key interface{}) {
496 logger := klog.FromContext(ctx)
497 if err == nil || errors.HasStatusCause(err, v1.NamespaceTerminatingCause) {
498 dc.queue.Forget(key)
499 return
500 }
501 ns, name, keyErr := cache.SplitMetaNamespaceKey(key.(string))
502 if keyErr != nil {
503 logger.Error(err, "Failed to split meta namespace cache key", "cacheKey", key)
504 }
505
506 if dc.queue.NumRequeues(key) < maxRetries {
507 logger.V(2).Info("Error syncing deployment", "deployment", klog.KRef(ns, name), "err", err)
508 dc.queue.AddRateLimited(key)
509 return
510 }
511
512 utilruntime.HandleError(err)
513 logger.V(2).Info("Dropping deployment out of the queue", "deployment", klog.KRef(ns, name), "err", err)
514 dc.queue.Forget(key)
515 }
516
517
518
519
520 func (dc *DeploymentController) getReplicaSetsForDeployment(ctx context.Context, d *apps.Deployment) ([]*apps.ReplicaSet, error) {
521
522
523 rsList, err := dc.rsLister.ReplicaSets(d.Namespace).List(labels.Everything())
524 if err != nil {
525 return nil, err
526 }
527 deploymentSelector, err := metav1.LabelSelectorAsSelector(d.Spec.Selector)
528 if err != nil {
529 return nil, fmt.Errorf("deployment %s/%s has invalid label selector: %v", d.Namespace, d.Name, err)
530 }
531
532
533 canAdoptFunc := controller.RecheckDeletionTimestamp(func(ctx context.Context) (metav1.Object, error) {
534 fresh, err := dc.client.AppsV1().Deployments(d.Namespace).Get(ctx, d.Name, metav1.GetOptions{})
535 if err != nil {
536 return nil, err
537 }
538 if fresh.UID != d.UID {
539 return nil, fmt.Errorf("original Deployment %v/%v is gone: got uid %v, wanted %v", d.Namespace, d.Name, fresh.UID, d.UID)
540 }
541 return fresh, nil
542 })
543 cm := controller.NewReplicaSetControllerRefManager(dc.rsControl, d, deploymentSelector, controllerKind, canAdoptFunc)
544 return cm.ClaimReplicaSets(ctx, rsList)
545 }
546
547
548
549
550
551
552
553 func (dc *DeploymentController) getPodMapForDeployment(d *apps.Deployment, rsList []*apps.ReplicaSet) (map[types.UID][]*v1.Pod, error) {
554
555 selector, err := metav1.LabelSelectorAsSelector(d.Spec.Selector)
556 if err != nil {
557 return nil, err
558 }
559 pods, err := dc.podLister.Pods(d.Namespace).List(selector)
560 if err != nil {
561 return nil, err
562 }
563
564 podMap := make(map[types.UID][]*v1.Pod, len(rsList))
565 for _, rs := range rsList {
566 podMap[rs.UID] = []*v1.Pod{}
567 }
568 for _, pod := range pods {
569
570
571 controllerRef := metav1.GetControllerOf(pod)
572 if controllerRef == nil {
573 continue
574 }
575
576 if _, ok := podMap[controllerRef.UID]; ok {
577 podMap[controllerRef.UID] = append(podMap[controllerRef.UID], pod)
578 }
579 }
580 return podMap, nil
581 }
582
583
584
585 func (dc *DeploymentController) syncDeployment(ctx context.Context, key string) error {
586 logger := klog.FromContext(ctx)
587 namespace, name, err := cache.SplitMetaNamespaceKey(key)
588 if err != nil {
589 logger.Error(err, "Failed to split meta namespace cache key", "cacheKey", key)
590 return err
591 }
592
593 startTime := time.Now()
594 logger.V(4).Info("Started syncing deployment", "deployment", klog.KRef(namespace, name), "startTime", startTime)
595 defer func() {
596 logger.V(4).Info("Finished syncing deployment", "deployment", klog.KRef(namespace, name), "duration", time.Since(startTime))
597 }()
598
599 deployment, err := dc.dLister.Deployments(namespace).Get(name)
600 if errors.IsNotFound(err) {
601 logger.V(2).Info("Deployment has been deleted", "deployment", klog.KRef(namespace, name))
602 return nil
603 }
604 if err != nil {
605 return err
606 }
607
608
609
610 d := deployment.DeepCopy()
611
612 everything := metav1.LabelSelector{}
613 if reflect.DeepEqual(d.Spec.Selector, &everything) {
614 dc.eventRecorder.Eventf(d, v1.EventTypeWarning, "SelectingAll", "This deployment is selecting all pods. A non-empty selector is required.")
615 if d.Status.ObservedGeneration < d.Generation {
616 d.Status.ObservedGeneration = d.Generation
617 dc.client.AppsV1().Deployments(d.Namespace).UpdateStatus(ctx, d, metav1.UpdateOptions{})
618 }
619 return nil
620 }
621
622
623
624 rsList, err := dc.getReplicaSetsForDeployment(ctx, d)
625 if err != nil {
626 return err
627 }
628
629
630
631
632
633 podMap, err := dc.getPodMapForDeployment(d, rsList)
634 if err != nil {
635 return err
636 }
637
638 if d.DeletionTimestamp != nil {
639 return dc.syncStatusOnly(ctx, d, rsList)
640 }
641
642
643
644
645 if err = dc.checkPausedConditions(ctx, d); err != nil {
646 return err
647 }
648
649 if d.Spec.Paused {
650 return dc.sync(ctx, d, rsList)
651 }
652
653
654
655
656 if getRollbackTo(d) != nil {
657 return dc.rollback(ctx, d, rsList)
658 }
659
660 scalingEvent, err := dc.isScalingEvent(ctx, d, rsList)
661 if err != nil {
662 return err
663 }
664 if scalingEvent {
665 return dc.sync(ctx, d, rsList)
666 }
667
668 switch d.Spec.Strategy.Type {
669 case apps.RecreateDeploymentStrategyType:
670 return dc.rolloutRecreate(ctx, d, rsList, podMap)
671 case apps.RollingUpdateDeploymentStrategyType:
672 return dc.rolloutRolling(ctx, d, rsList)
673 }
674 return fmt.Errorf("unexpected deployment strategy type: %s", d.Spec.Strategy.Type)
675 }
676
View as plain text