1
16
17 package deployment
18
19 import (
20 "context"
21 "fmt"
22 "reflect"
23 "sort"
24 "strconv"
25
26 apps "k8s.io/api/apps/v1"
27 v1 "k8s.io/api/core/v1"
28 "k8s.io/apimachinery/pkg/api/errors"
29 metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
30 "k8s.io/klog/v2"
31 "k8s.io/kubernetes/pkg/controller"
32 deploymentutil "k8s.io/kubernetes/pkg/controller/deployment/util"
33 labelsutil "k8s.io/kubernetes/pkg/util/labels"
34 )
35
36
37 func (dc *DeploymentController) syncStatusOnly(ctx context.Context, d *apps.Deployment, rsList []*apps.ReplicaSet) error {
38 newRS, oldRSs, err := dc.getAllReplicaSetsAndSyncRevision(ctx, d, rsList, false)
39 if err != nil {
40 return err
41 }
42
43 allRSs := append(oldRSs, newRS)
44 return dc.syncDeploymentStatus(ctx, allRSs, newRS, d)
45 }
46
47
48
49 func (dc *DeploymentController) sync(ctx context.Context, d *apps.Deployment, rsList []*apps.ReplicaSet) error {
50 newRS, oldRSs, err := dc.getAllReplicaSetsAndSyncRevision(ctx, d, rsList, false)
51 if err != nil {
52 return err
53 }
54 if err := dc.scale(ctx, d, newRS, oldRSs); err != nil {
55
56
57 return err
58 }
59
60
61 if d.Spec.Paused && getRollbackTo(d) == nil {
62 if err := dc.cleanupDeployment(ctx, oldRSs, d); err != nil {
63 return err
64 }
65 }
66
67 allRSs := append(oldRSs, newRS)
68 return dc.syncDeploymentStatus(ctx, allRSs, newRS, d)
69 }
70
71
72
73
74 func (dc *DeploymentController) checkPausedConditions(ctx context.Context, d *apps.Deployment) error {
75 if !deploymentutil.HasProgressDeadline(d) {
76 return nil
77 }
78 cond := deploymentutil.GetDeploymentCondition(d.Status, apps.DeploymentProgressing)
79 if cond != nil && cond.Reason == deploymentutil.TimedOutReason {
80
81 return nil
82 }
83 pausedCondExists := cond != nil && cond.Reason == deploymentutil.PausedDeployReason
84
85 needsUpdate := false
86 if d.Spec.Paused && !pausedCondExists {
87 condition := deploymentutil.NewDeploymentCondition(apps.DeploymentProgressing, v1.ConditionUnknown, deploymentutil.PausedDeployReason, "Deployment is paused")
88 deploymentutil.SetDeploymentCondition(&d.Status, *condition)
89 needsUpdate = true
90 } else if !d.Spec.Paused && pausedCondExists {
91 condition := deploymentutil.NewDeploymentCondition(apps.DeploymentProgressing, v1.ConditionUnknown, deploymentutil.ResumedDeployReason, "Deployment is resumed")
92 deploymentutil.SetDeploymentCondition(&d.Status, *condition)
93 needsUpdate = true
94 }
95
96 if !needsUpdate {
97 return nil
98 }
99
100 var err error
101 _, err = dc.client.AppsV1().Deployments(d.Namespace).UpdateStatus(ctx, d, metav1.UpdateOptions{})
102 return err
103 }
104
105
106
107
108
109
110
111
112
113
114
115
116 func (dc *DeploymentController) getAllReplicaSetsAndSyncRevision(ctx context.Context, d *apps.Deployment, rsList []*apps.ReplicaSet, createIfNotExisted bool) (*apps.ReplicaSet, []*apps.ReplicaSet, error) {
117 _, allOldRSs := deploymentutil.FindOldReplicaSets(d, rsList)
118
119
120 newRS, err := dc.getNewReplicaSet(ctx, d, rsList, allOldRSs, createIfNotExisted)
121 if err != nil {
122 return nil, nil, err
123 }
124
125 return newRS, allOldRSs, nil
126 }
127
// maxRevHistoryLengthInChars is the limit passed to
// deploymentutil.SetNewReplicaSetAnnotations; presumably it caps the length,
// in characters, of the revision-history annotation (confirm in the util
// package).
const maxRevHistoryLengthInChars = 2000
132
133
134
135
136
137
138 func (dc *DeploymentController) getNewReplicaSet(ctx context.Context, d *apps.Deployment, rsList, oldRSs []*apps.ReplicaSet, createIfNotExisted bool) (*apps.ReplicaSet, error) {
139 logger := klog.FromContext(ctx)
140 existingNewRS := deploymentutil.FindNewReplicaSet(d, rsList)
141
142
143 maxOldRevision := deploymentutil.MaxRevision(logger, oldRSs)
144
145 newRevision := strconv.FormatInt(maxOldRevision+1, 10)
146
147
148
149
150
151 if existingNewRS != nil {
152 rsCopy := existingNewRS.DeepCopy()
153
154
155 annotationsUpdated := deploymentutil.SetNewReplicaSetAnnotations(ctx, d, rsCopy, newRevision, true, maxRevHistoryLengthInChars)
156 minReadySecondsNeedsUpdate := rsCopy.Spec.MinReadySeconds != d.Spec.MinReadySeconds
157 if annotationsUpdated || minReadySecondsNeedsUpdate {
158 rsCopy.Spec.MinReadySeconds = d.Spec.MinReadySeconds
159 return dc.client.AppsV1().ReplicaSets(rsCopy.ObjectMeta.Namespace).Update(ctx, rsCopy, metav1.UpdateOptions{})
160 }
161
162
163 needsUpdate := deploymentutil.SetDeploymentRevision(d, rsCopy.Annotations[deploymentutil.RevisionAnnotation])
164
165
166
167 cond := deploymentutil.GetDeploymentCondition(d.Status, apps.DeploymentProgressing)
168 if deploymentutil.HasProgressDeadline(d) && cond == nil {
169 msg := fmt.Sprintf("Found new replica set %q", rsCopy.Name)
170 condition := deploymentutil.NewDeploymentCondition(apps.DeploymentProgressing, v1.ConditionTrue, deploymentutil.FoundNewRSReason, msg)
171 deploymentutil.SetDeploymentCondition(&d.Status, *condition)
172 needsUpdate = true
173 }
174
175 if needsUpdate {
176 var err error
177 if _, err = dc.client.AppsV1().Deployments(d.Namespace).UpdateStatus(ctx, d, metav1.UpdateOptions{}); err != nil {
178 return nil, err
179 }
180 }
181 return rsCopy, nil
182 }
183
184 if !createIfNotExisted {
185 return nil, nil
186 }
187
188
189 newRSTemplate := *d.Spec.Template.DeepCopy()
190 podTemplateSpecHash := controller.ComputeHash(&newRSTemplate, d.Status.CollisionCount)
191 newRSTemplate.Labels = labelsutil.CloneAndAddLabel(d.Spec.Template.Labels, apps.DefaultDeploymentUniqueLabelKey, podTemplateSpecHash)
192
193 newRSSelector := labelsutil.CloneSelectorAndAddLabel(d.Spec.Selector, apps.DefaultDeploymentUniqueLabelKey, podTemplateSpecHash)
194
195
196 newRS := apps.ReplicaSet{
197 ObjectMeta: metav1.ObjectMeta{
198
199 Name: d.Name + "-" + podTemplateSpecHash,
200 Namespace: d.Namespace,
201 OwnerReferences: []metav1.OwnerReference{*metav1.NewControllerRef(d, controllerKind)},
202 Labels: newRSTemplate.Labels,
203 },
204 Spec: apps.ReplicaSetSpec{
205 Replicas: new(int32),
206 MinReadySeconds: d.Spec.MinReadySeconds,
207 Selector: newRSSelector,
208 Template: newRSTemplate,
209 },
210 }
211 allRSs := append(oldRSs, &newRS)
212 newReplicasCount, err := deploymentutil.NewRSNewReplicas(d, allRSs, &newRS)
213 if err != nil {
214 return nil, err
215 }
216
217 *(newRS.Spec.Replicas) = newReplicasCount
218
219 deploymentutil.SetNewReplicaSetAnnotations(ctx, d, &newRS, newRevision, false, maxRevHistoryLengthInChars)
220
221
222
223 alreadyExists := false
224 createdRS, err := dc.client.AppsV1().ReplicaSets(d.Namespace).Create(ctx, &newRS, metav1.CreateOptions{})
225 switch {
226
227 case errors.IsAlreadyExists(err):
228 alreadyExists = true
229
230
231 rs, rsErr := dc.rsLister.ReplicaSets(newRS.Namespace).Get(newRS.Name)
232 if rsErr != nil {
233 return nil, rsErr
234 }
235
236
237
238
239
240 controllerRef := metav1.GetControllerOf(rs)
241 if controllerRef != nil && controllerRef.UID == d.UID && deploymentutil.EqualIgnoreHash(&d.Spec.Template, &rs.Spec.Template) {
242 createdRS = rs
243 err = nil
244 break
245 }
246
247
248
249 if d.Status.CollisionCount == nil {
250 d.Status.CollisionCount = new(int32)
251 }
252 preCollisionCount := *d.Status.CollisionCount
253 *d.Status.CollisionCount++
254
255
256 _, dErr := dc.client.AppsV1().Deployments(d.Namespace).UpdateStatus(ctx, d, metav1.UpdateOptions{})
257 if dErr == nil {
258 logger.V(2).Info("Found a hash collision for deployment - bumping collisionCount to resolve it", "deployment", klog.KObj(d), "oldCollisionCount", preCollisionCount, "newCollisionCount", *d.Status.CollisionCount)
259 }
260 return nil, err
261 case errors.HasStatusCause(err, v1.NamespaceTerminatingCause):
262
263 return nil, err
264 case err != nil:
265 msg := fmt.Sprintf("Failed to create new replica set %q: %v", newRS.Name, err)
266 if deploymentutil.HasProgressDeadline(d) {
267 cond := deploymentutil.NewDeploymentCondition(apps.DeploymentProgressing, v1.ConditionFalse, deploymentutil.FailedRSCreateReason, msg)
268 deploymentutil.SetDeploymentCondition(&d.Status, *cond)
269
270
271
272 _, _ = dc.client.AppsV1().Deployments(d.Namespace).UpdateStatus(ctx, d, metav1.UpdateOptions{})
273 }
274 dc.eventRecorder.Eventf(d, v1.EventTypeWarning, deploymentutil.FailedRSCreateReason, msg)
275 return nil, err
276 }
277 if !alreadyExists && newReplicasCount > 0 {
278 dc.eventRecorder.Eventf(d, v1.EventTypeNormal, "ScalingReplicaSet", "Scaled up replica set %s to %d", createdRS.Name, newReplicasCount)
279 }
280
281 needsUpdate := deploymentutil.SetDeploymentRevision(d, newRevision)
282 if !alreadyExists && deploymentutil.HasProgressDeadline(d) {
283 msg := fmt.Sprintf("Created new replica set %q", createdRS.Name)
284 condition := deploymentutil.NewDeploymentCondition(apps.DeploymentProgressing, v1.ConditionTrue, deploymentutil.NewReplicaSetReason, msg)
285 deploymentutil.SetDeploymentCondition(&d.Status, *condition)
286 needsUpdate = true
287 }
288 if needsUpdate {
289 _, err = dc.client.AppsV1().Deployments(d.Namespace).UpdateStatus(ctx, d, metav1.UpdateOptions{})
290 }
291 return createdRS, err
292 }
293
294
295
296
297
298
299 func (dc *DeploymentController) scale(ctx context.Context, deployment *apps.Deployment, newRS *apps.ReplicaSet, oldRSs []*apps.ReplicaSet) error {
300
301
302 if activeOrLatest := deploymentutil.FindActiveOrLatest(newRS, oldRSs); activeOrLatest != nil {
303 if *(activeOrLatest.Spec.Replicas) == *(deployment.Spec.Replicas) {
304 return nil
305 }
306 _, _, err := dc.scaleReplicaSetAndRecordEvent(ctx, activeOrLatest, *(deployment.Spec.Replicas), deployment)
307 return err
308 }
309
310
311
312 if deploymentutil.IsSaturated(deployment, newRS) {
313 for _, old := range controller.FilterActiveReplicaSets(oldRSs) {
314 if _, _, err := dc.scaleReplicaSetAndRecordEvent(ctx, old, 0, deployment); err != nil {
315 return err
316 }
317 }
318 return nil
319 }
320
321
322
323
324 if deploymentutil.IsRollingUpdate(deployment) {
325 allRSs := controller.FilterActiveReplicaSets(append(oldRSs, newRS))
326 allRSsReplicas := deploymentutil.GetReplicaCountForReplicaSets(allRSs)
327
328 allowedSize := int32(0)
329 if *(deployment.Spec.Replicas) > 0 {
330 allowedSize = *(deployment.Spec.Replicas) + deploymentutil.MaxSurge(*deployment)
331 }
332
333
334
335
336 deploymentReplicasToAdd := allowedSize - allRSsReplicas
337
338
339
340
341
342
343 var scalingOperation string
344 switch {
345 case deploymentReplicasToAdd > 0:
346 sort.Sort(controller.ReplicaSetsBySizeNewer(allRSs))
347 scalingOperation = "up"
348
349 case deploymentReplicasToAdd < 0:
350 sort.Sort(controller.ReplicaSetsBySizeOlder(allRSs))
351 scalingOperation = "down"
352 }
353
354
355
356
357 deploymentReplicasAdded := int32(0)
358 nameToSize := make(map[string]int32)
359 logger := klog.FromContext(ctx)
360 for i := range allRSs {
361 rs := allRSs[i]
362
363
364
365 if deploymentReplicasToAdd != 0 {
366 proportion := deploymentutil.GetProportion(logger, rs, *deployment, deploymentReplicasToAdd, deploymentReplicasAdded)
367
368 nameToSize[rs.Name] = *(rs.Spec.Replicas) + proportion
369 deploymentReplicasAdded += proportion
370 } else {
371 nameToSize[rs.Name] = *(rs.Spec.Replicas)
372 }
373 }
374
375
376 for i := range allRSs {
377 rs := allRSs[i]
378
379
380 if i == 0 && deploymentReplicasToAdd != 0 {
381 leftover := deploymentReplicasToAdd - deploymentReplicasAdded
382 nameToSize[rs.Name] = nameToSize[rs.Name] + leftover
383 if nameToSize[rs.Name] < 0 {
384 nameToSize[rs.Name] = 0
385 }
386 }
387
388
389 if _, _, err := dc.scaleReplicaSet(ctx, rs, nameToSize[rs.Name], deployment, scalingOperation); err != nil {
390
391 return err
392 }
393 }
394 }
395 return nil
396 }
397
398 func (dc *DeploymentController) scaleReplicaSetAndRecordEvent(ctx context.Context, rs *apps.ReplicaSet, newScale int32, deployment *apps.Deployment) (bool, *apps.ReplicaSet, error) {
399
400 if *(rs.Spec.Replicas) == newScale {
401 return false, rs, nil
402 }
403 var scalingOperation string
404 if *(rs.Spec.Replicas) < newScale {
405 scalingOperation = "up"
406 } else {
407 scalingOperation = "down"
408 }
409 scaled, newRS, err := dc.scaleReplicaSet(ctx, rs, newScale, deployment, scalingOperation)
410 return scaled, newRS, err
411 }
412
413 func (dc *DeploymentController) scaleReplicaSet(ctx context.Context, rs *apps.ReplicaSet, newScale int32, deployment *apps.Deployment, scalingOperation string) (bool, *apps.ReplicaSet, error) {
414
415 sizeNeedsUpdate := *(rs.Spec.Replicas) != newScale
416
417 annotationsNeedUpdate := deploymentutil.ReplicasAnnotationsNeedUpdate(rs, *(deployment.Spec.Replicas), *(deployment.Spec.Replicas)+deploymentutil.MaxSurge(*deployment))
418
419 scaled := false
420 var err error
421 if sizeNeedsUpdate || annotationsNeedUpdate {
422 oldScale := *(rs.Spec.Replicas)
423 rsCopy := rs.DeepCopy()
424 *(rsCopy.Spec.Replicas) = newScale
425 deploymentutil.SetReplicasAnnotations(rsCopy, *(deployment.Spec.Replicas), *(deployment.Spec.Replicas)+deploymentutil.MaxSurge(*deployment))
426 rs, err = dc.client.AppsV1().ReplicaSets(rsCopy.Namespace).Update(ctx, rsCopy, metav1.UpdateOptions{})
427 if err == nil && sizeNeedsUpdate {
428 scaled = true
429 dc.eventRecorder.Eventf(deployment, v1.EventTypeNormal, "ScalingReplicaSet", "Scaled %s replica set %s to %d from %d", scalingOperation, rs.Name, newScale, oldScale)
430 }
431 }
432 return scaled, rs, err
433 }
434
435
436
437
438 func (dc *DeploymentController) cleanupDeployment(ctx context.Context, oldRSs []*apps.ReplicaSet, deployment *apps.Deployment) error {
439 logger := klog.FromContext(ctx)
440 if !deploymentutil.HasRevisionHistoryLimit(deployment) {
441 return nil
442 }
443
444
445 aliveFilter := func(rs *apps.ReplicaSet) bool {
446 return rs != nil && rs.ObjectMeta.DeletionTimestamp == nil
447 }
448 cleanableRSes := controller.FilterReplicaSets(oldRSs, aliveFilter)
449
450 diff := int32(len(cleanableRSes)) - *deployment.Spec.RevisionHistoryLimit
451 if diff <= 0 {
452 return nil
453 }
454
455 sort.Sort(deploymentutil.ReplicaSetsByRevision(cleanableRSes))
456 logger.V(4).Info("Looking to cleanup old replica sets for deployment", "deployment", klog.KObj(deployment))
457
458 for i := int32(0); i < diff; i++ {
459 rs := cleanableRSes[i]
460
461 if rs.Status.Replicas != 0 || *(rs.Spec.Replicas) != 0 || rs.Generation > rs.Status.ObservedGeneration || rs.DeletionTimestamp != nil {
462 continue
463 }
464 logger.V(4).Info("Trying to cleanup replica set for deployment", "replicaSet", klog.KObj(rs), "deployment", klog.KObj(deployment))
465 if err := dc.client.AppsV1().ReplicaSets(rs.Namespace).Delete(ctx, rs.Name, metav1.DeleteOptions{}); err != nil && !errors.IsNotFound(err) {
466
467
468 return err
469 }
470 }
471
472 return nil
473 }
474
475
476 func (dc *DeploymentController) syncDeploymentStatus(ctx context.Context, allRSs []*apps.ReplicaSet, newRS *apps.ReplicaSet, d *apps.Deployment) error {
477 newStatus := calculateStatus(allRSs, newRS, d)
478
479 if reflect.DeepEqual(d.Status, newStatus) {
480 return nil
481 }
482
483 newDeployment := d
484 newDeployment.Status = newStatus
485 _, err := dc.client.AppsV1().Deployments(newDeployment.Namespace).UpdateStatus(ctx, newDeployment, metav1.UpdateOptions{})
486 return err
487 }
488
489
490 func calculateStatus(allRSs []*apps.ReplicaSet, newRS *apps.ReplicaSet, deployment *apps.Deployment) apps.DeploymentStatus {
491 availableReplicas := deploymentutil.GetAvailableReplicaCountForReplicaSets(allRSs)
492 totalReplicas := deploymentutil.GetReplicaCountForReplicaSets(allRSs)
493 unavailableReplicas := totalReplicas - availableReplicas
494
495
496 if unavailableReplicas < 0 {
497 unavailableReplicas = 0
498 }
499
500 status := apps.DeploymentStatus{
501
502 ObservedGeneration: deployment.Generation,
503 Replicas: deploymentutil.GetActualReplicaCountForReplicaSets(allRSs),
504 UpdatedReplicas: deploymentutil.GetActualReplicaCountForReplicaSets([]*apps.ReplicaSet{newRS}),
505 ReadyReplicas: deploymentutil.GetReadyReplicaCountForReplicaSets(allRSs),
506 AvailableReplicas: availableReplicas,
507 UnavailableReplicas: unavailableReplicas,
508 CollisionCount: deployment.Status.CollisionCount,
509 }
510
511
512 conditions := deployment.Status.Conditions
513 for i := range conditions {
514 status.Conditions = append(status.Conditions, conditions[i])
515 }
516
517 if availableReplicas >= *(deployment.Spec.Replicas)-deploymentutil.MaxUnavailable(*deployment) {
518 minAvailability := deploymentutil.NewDeploymentCondition(apps.DeploymentAvailable, v1.ConditionTrue, deploymentutil.MinimumReplicasAvailable, "Deployment has minimum availability.")
519 deploymentutil.SetDeploymentCondition(&status, *minAvailability)
520 } else {
521 noMinAvailability := deploymentutil.NewDeploymentCondition(apps.DeploymentAvailable, v1.ConditionFalse, deploymentutil.MinimumReplicasUnavailable, "Deployment does not have minimum availability.")
522 deploymentutil.SetDeploymentCondition(&status, *noMinAvailability)
523 }
524
525 return status
526 }
527
528
529
530
531
532 func (dc *DeploymentController) isScalingEvent(ctx context.Context, d *apps.Deployment, rsList []*apps.ReplicaSet) (bool, error) {
533 newRS, oldRSs, err := dc.getAllReplicaSetsAndSyncRevision(ctx, d, rsList, false)
534 if err != nil {
535 return false, err
536 }
537 allRSs := append(oldRSs, newRS)
538 logger := klog.FromContext(ctx)
539 for _, rs := range controller.FilterActiveReplicaSets(allRSs) {
540 desired, ok := deploymentutil.GetDesiredReplicasAnnotation(logger, rs)
541 if !ok {
542 continue
543 }
544 if desired != *(d.Spec.Replicas) {
545 return true, nil
546 }
547 }
548 return false, nil
549 }
550
View as plain text