1
16
17 package deployment
18
19 import (
20 "context"
21 "fmt"
22 "sort"
23
24 apps "k8s.io/api/apps/v1"
25 "k8s.io/klog/v2"
26 "k8s.io/kubernetes/pkg/controller"
27 deploymentutil "k8s.io/kubernetes/pkg/controller/deployment/util"
28 )
29
30
31 func (dc *DeploymentController) rolloutRolling(ctx context.Context, d *apps.Deployment, rsList []*apps.ReplicaSet) error {
32 newRS, oldRSs, err := dc.getAllReplicaSetsAndSyncRevision(ctx, d, rsList, true)
33 if err != nil {
34 return err
35 }
36 allRSs := append(oldRSs, newRS)
37
38
39 scaledUp, err := dc.reconcileNewReplicaSet(ctx, allRSs, newRS, d)
40 if err != nil {
41 return err
42 }
43 if scaledUp {
44
45 return dc.syncRolloutStatus(ctx, allRSs, newRS, d)
46 }
47
48
49 scaledDown, err := dc.reconcileOldReplicaSets(ctx, allRSs, controller.FilterActiveReplicaSets(oldRSs), newRS, d)
50 if err != nil {
51 return err
52 }
53 if scaledDown {
54
55 return dc.syncRolloutStatus(ctx, allRSs, newRS, d)
56 }
57
58 if deploymentutil.DeploymentComplete(d, &d.Status) {
59 if err := dc.cleanupDeployment(ctx, oldRSs, d); err != nil {
60 return err
61 }
62 }
63
64
65 return dc.syncRolloutStatus(ctx, allRSs, newRS, d)
66 }
67
68 func (dc *DeploymentController) reconcileNewReplicaSet(ctx context.Context, allRSs []*apps.ReplicaSet, newRS *apps.ReplicaSet, deployment *apps.Deployment) (bool, error) {
69 if *(newRS.Spec.Replicas) == *(deployment.Spec.Replicas) {
70
71 return false, nil
72 }
73 if *(newRS.Spec.Replicas) > *(deployment.Spec.Replicas) {
74
75 scaled, _, err := dc.scaleReplicaSetAndRecordEvent(ctx, newRS, *(deployment.Spec.Replicas), deployment)
76 return scaled, err
77 }
78 newReplicasCount, err := deploymentutil.NewRSNewReplicas(deployment, allRSs, newRS)
79 if err != nil {
80 return false, err
81 }
82 scaled, _, err := dc.scaleReplicaSetAndRecordEvent(ctx, newRS, newReplicasCount, deployment)
83 return scaled, err
84 }
85
86 func (dc *DeploymentController) reconcileOldReplicaSets(ctx context.Context, allRSs []*apps.ReplicaSet, oldRSs []*apps.ReplicaSet, newRS *apps.ReplicaSet, deployment *apps.Deployment) (bool, error) {
87 logger := klog.FromContext(ctx)
88 oldPodsCount := deploymentutil.GetReplicaCountForReplicaSets(oldRSs)
89 if oldPodsCount == 0 {
90
91 return false, nil
92 }
93 allPodsCount := deploymentutil.GetReplicaCountForReplicaSets(allRSs)
94 logger.V(4).Info("New replica set", "replicaSet", klog.KObj(newRS), "availableReplicas", newRS.Status.AvailableReplicas)
95 maxUnavailable := deploymentutil.MaxUnavailable(*deployment)
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127 minAvailable := *(deployment.Spec.Replicas) - maxUnavailable
128 newRSUnavailablePodCount := *(newRS.Spec.Replicas) - newRS.Status.AvailableReplicas
129 maxScaledDown := allPodsCount - minAvailable - newRSUnavailablePodCount
130 if maxScaledDown <= 0 {
131 return false, nil
132 }
133
134
135
136 oldRSs, cleanupCount, err := dc.cleanupUnhealthyReplicas(ctx, oldRSs, deployment, maxScaledDown)
137 if err != nil {
138 return false, nil
139 }
140 logger.V(4).Info("Cleaned up unhealthy replicas from old RSes", "count", cleanupCount)
141
142
143 allRSs = append(oldRSs, newRS)
144 scaledDownCount, err := dc.scaleDownOldReplicaSetsForRollingUpdate(ctx, allRSs, oldRSs, deployment)
145 if err != nil {
146 return false, nil
147 }
148 logger.V(4).Info("Scaled down old RSes", "deployment", klog.KObj(deployment), "count", scaledDownCount)
149
150 totalScaledDown := cleanupCount + scaledDownCount
151 return totalScaledDown > 0, nil
152 }
153
154
155 func (dc *DeploymentController) cleanupUnhealthyReplicas(ctx context.Context, oldRSs []*apps.ReplicaSet, deployment *apps.Deployment, maxCleanupCount int32) ([]*apps.ReplicaSet, int32, error) {
156 logger := klog.FromContext(ctx)
157 sort.Sort(controller.ReplicaSetsByCreationTimestamp(oldRSs))
158
159
160
161 totalScaledDown := int32(0)
162 for i, targetRS := range oldRSs {
163 if totalScaledDown >= maxCleanupCount {
164 break
165 }
166 if *(targetRS.Spec.Replicas) == 0 {
167
168 continue
169 }
170 logger.V(4).Info("Found available pods in old RS", "replicaSet", klog.KObj(targetRS), "availableReplicas", targetRS.Status.AvailableReplicas)
171 if *(targetRS.Spec.Replicas) == targetRS.Status.AvailableReplicas {
172
173 continue
174 }
175
176 scaledDownCount := min(maxCleanupCount-totalScaledDown, *(targetRS.Spec.Replicas)-targetRS.Status.AvailableReplicas)
177 newReplicasCount := *(targetRS.Spec.Replicas) - scaledDownCount
178 if newReplicasCount > *(targetRS.Spec.Replicas) {
179 return nil, 0, fmt.Errorf("when cleaning up unhealthy replicas, got invalid request to scale down %s/%s %d -> %d", targetRS.Namespace, targetRS.Name, *(targetRS.Spec.Replicas), newReplicasCount)
180 }
181 _, updatedOldRS, err := dc.scaleReplicaSetAndRecordEvent(ctx, targetRS, newReplicasCount, deployment)
182 if err != nil {
183 return nil, totalScaledDown, err
184 }
185 totalScaledDown += scaledDownCount
186 oldRSs[i] = updatedOldRS
187 }
188 return oldRSs, totalScaledDown, nil
189 }
190
191
192
193 func (dc *DeploymentController) scaleDownOldReplicaSetsForRollingUpdate(ctx context.Context, allRSs []*apps.ReplicaSet, oldRSs []*apps.ReplicaSet, deployment *apps.Deployment) (int32, error) {
194 logger := klog.FromContext(ctx)
195 maxUnavailable := deploymentutil.MaxUnavailable(*deployment)
196
197
198 minAvailable := *(deployment.Spec.Replicas) - maxUnavailable
199
200 availablePodCount := deploymentutil.GetAvailableReplicaCountForReplicaSets(allRSs)
201 if availablePodCount <= minAvailable {
202
203 return 0, nil
204 }
205 logger.V(4).Info("Found available pods in deployment, scaling down old RSes", "deployment", klog.KObj(deployment), "availableReplicas", availablePodCount)
206
207 sort.Sort(controller.ReplicaSetsByCreationTimestamp(oldRSs))
208
209 totalScaledDown := int32(0)
210 totalScaleDownCount := availablePodCount - minAvailable
211 for _, targetRS := range oldRSs {
212 if totalScaledDown >= totalScaleDownCount {
213
214 break
215 }
216 if *(targetRS.Spec.Replicas) == 0 {
217
218 continue
219 }
220
221 scaleDownCount := min(*(targetRS.Spec.Replicas), totalScaleDownCount-totalScaledDown)
222 newReplicasCount := *(targetRS.Spec.Replicas) - scaleDownCount
223 if newReplicasCount > *(targetRS.Spec.Replicas) {
224 return 0, fmt.Errorf("when scaling down old RS, got invalid request to scale down %s/%s %d -> %d", targetRS.Namespace, targetRS.Name, *(targetRS.Spec.Replicas), newReplicasCount)
225 }
226 _, _, err := dc.scaleReplicaSetAndRecordEvent(ctx, targetRS, newReplicasCount, deployment)
227 if err != nil {
228 return totalScaledDown, err
229 }
230
231 totalScaledDown += scaleDownCount
232 }
233
234 return totalScaledDown, nil
235 }
236
View as plain text