1
16
17 package utils
18
19 import (
20 "context"
21 "fmt"
22 "time"
23
24 apps "k8s.io/api/apps/v1"
25 v1 "k8s.io/api/core/v1"
26 metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
27 "k8s.io/apimachinery/pkg/util/dump"
28 "k8s.io/apimachinery/pkg/util/wait"
29 clientset "k8s.io/client-go/kubernetes"
30 podutil "k8s.io/kubernetes/pkg/api/v1/pod"
31 deploymentutil "k8s.io/kubernetes/pkg/controller/deployment/util"
32 labelsutil "k8s.io/kubernetes/pkg/util/labels"
33 )
34
35 type LogfFn func(format string, args ...interface{})
36
37 func LogReplicaSetsOfDeployment(deployment *apps.Deployment, allOldRSs []*apps.ReplicaSet, newRS *apps.ReplicaSet, logf LogfFn) {
38 if newRS != nil {
39 logf("New ReplicaSet %q of Deployment %q:\n%s", newRS.Name, deployment.Name, dump.Pretty(*newRS))
40 } else {
41 logf("New ReplicaSet of Deployment %q is nil.", deployment.Name)
42 }
43 if len(allOldRSs) > 0 {
44 logf("All old ReplicaSets of Deployment %q:", deployment.Name)
45 }
46 for i := range allOldRSs {
47 logf(dump.Pretty(*allOldRSs[i]))
48 }
49 }
50
51 func LogPodsOfDeployment(c clientset.Interface, deployment *apps.Deployment, rsList []*apps.ReplicaSet, logf LogfFn) {
52 minReadySeconds := deployment.Spec.MinReadySeconds
53 podListFunc := func(namespace string, options metav1.ListOptions) (*v1.PodList, error) {
54 return c.CoreV1().Pods(namespace).List(context.TODO(), options)
55 }
56
57 podList, err := deploymentutil.ListPods(deployment, rsList, podListFunc)
58 if err != nil {
59 logf("Failed to list Pods of Deployment %q: %v", deployment.Name, err)
60 return
61 }
62 for _, pod := range podList.Items {
63 availability := "not available"
64 if podutil.IsPodAvailable(&pod, minReadySeconds, metav1.Now()) {
65 availability = "available"
66 }
67 logf("Pod %q is %s:\n%s", pod.Name, availability, dump.Pretty(pod))
68 }
69 }
70
71
72
73
74
75 func waitForDeploymentCompleteMaybeCheckRolling(c clientset.Interface, d *apps.Deployment, rolling bool, logf LogfFn, pollInterval, pollTimeout time.Duration) error {
76 var (
77 deployment *apps.Deployment
78 reason string
79 )
80
81 err := wait.PollImmediate(pollInterval, pollTimeout, func() (bool, error) {
82 var err error
83 deployment, err = c.AppsV1().Deployments(d.Namespace).Get(context.TODO(), d.Name, metav1.GetOptions{})
84 if err != nil {
85 return false, err
86 }
87
88
89 if rolling {
90 reason, err = checkRollingUpdateStatus(c, deployment, logf)
91 if err != nil {
92 return false, err
93 }
94 logf(reason)
95 }
96
97
98 if deploymentutil.DeploymentComplete(d, &deployment.Status) {
99 return true, nil
100 }
101
102 reason = fmt.Sprintf("deployment status: %#v", deployment.Status)
103 logf(reason)
104
105 return false, nil
106 })
107
108 if wait.Interrupted(err) {
109 err = fmt.Errorf("%s", reason)
110 }
111 if err != nil {
112 return fmt.Errorf("error waiting for deployment %q status to match expectation: %v", d.Name, err)
113 }
114 return nil
115 }
116
117 func checkRollingUpdateStatus(c clientset.Interface, deployment *apps.Deployment, logf LogfFn) (string, error) {
118 var reason string
119 oldRSs, allOldRSs, newRS, err := GetAllReplicaSets(deployment, c)
120 if err != nil {
121 return "", err
122 }
123 if newRS == nil {
124
125 reason = "new replica set hasn't been created yet"
126 return reason, nil
127 }
128 allRSs := append(oldRSs, newRS)
129
130 for i := range allRSs {
131 if !labelsutil.SelectorHasLabel(allRSs[i].Spec.Selector, apps.DefaultDeploymentUniqueLabelKey) {
132 reason = "all replica sets need to contain the pod-template-hash label"
133 return reason, nil
134 }
135 }
136
137
138 totalCreated := deploymentutil.GetReplicaCountForReplicaSets(allRSs)
139 maxCreated := *(deployment.Spec.Replicas) + deploymentutil.MaxSurge(*deployment)
140 if totalCreated > maxCreated {
141 LogReplicaSetsOfDeployment(deployment, allOldRSs, newRS, logf)
142 LogPodsOfDeployment(c, deployment, allRSs, logf)
143 return "", fmt.Errorf("total pods created: %d, more than the max allowed: %d", totalCreated, maxCreated)
144 }
145 minAvailable := deploymentutil.MinAvailable(deployment)
146 if deployment.Status.AvailableReplicas < minAvailable {
147 LogReplicaSetsOfDeployment(deployment, allOldRSs, newRS, logf)
148 LogPodsOfDeployment(c, deployment, allRSs, logf)
149 return "", fmt.Errorf("total pods available: %d, less than the min required: %d", deployment.Status.AvailableReplicas, minAvailable)
150 }
151 return "", nil
152 }
153
154
155
156
157 func GetAllReplicaSets(deployment *apps.Deployment, c clientset.Interface) ([]*apps.ReplicaSet, []*apps.ReplicaSet, *apps.ReplicaSet, error) {
158 rsList, err := deploymentutil.ListReplicaSets(deployment, deploymentutil.RsListFromClient(c.AppsV1()))
159 if err != nil {
160 return nil, nil, nil, err
161 }
162 oldRSes, allOldRSes := deploymentutil.FindOldReplicaSets(deployment, rsList)
163 newRS := deploymentutil.FindNewReplicaSet(deployment, rsList)
164 return oldRSes, allOldRSes, newRS, nil
165 }
166
167
168
169 func GetOldReplicaSets(deployment *apps.Deployment, c clientset.Interface) ([]*apps.ReplicaSet, []*apps.ReplicaSet, error) {
170 rsList, err := deploymentutil.ListReplicaSets(deployment, deploymentutil.RsListFromClient(c.AppsV1()))
171 if err != nil {
172 return nil, nil, err
173 }
174 oldRSes, allOldRSes := deploymentutil.FindOldReplicaSets(deployment, rsList)
175 return oldRSes, allOldRSes, nil
176 }
177
178
179
180 func GetNewReplicaSet(deployment *apps.Deployment, c clientset.Interface) (*apps.ReplicaSet, error) {
181 rsList, err := deploymentutil.ListReplicaSets(deployment, deploymentutil.RsListFromClient(c.AppsV1()))
182 if err != nil {
183 return nil, err
184 }
185 return deploymentutil.FindNewReplicaSet(deployment, rsList), nil
186 }
187
188
189
190 func WaitForDeploymentCompleteAndCheckRolling(c clientset.Interface, d *apps.Deployment, logf LogfFn, pollInterval, pollTimeout time.Duration) error {
191 rolling := true
192 return waitForDeploymentCompleteMaybeCheckRolling(c, d, rolling, logf, pollInterval, pollTimeout)
193 }
194
195
196
197
198 func WaitForDeploymentComplete(c clientset.Interface, d *apps.Deployment, logf LogfFn, pollInterval, pollTimeout time.Duration) error {
199 rolling := false
200 return waitForDeploymentCompleteMaybeCheckRolling(c, d, rolling, logf, pollInterval, pollTimeout)
201 }
202
203
204
205 func WaitForDeploymentRevisionAndImage(c clientset.Interface, ns, deploymentName string, revision, image string, logf LogfFn, pollInterval, pollTimeout time.Duration) error {
206 var deployment *apps.Deployment
207 var newRS *apps.ReplicaSet
208 var reason string
209 err := wait.PollImmediate(pollInterval, pollTimeout, func() (bool, error) {
210 var err error
211 deployment, err = c.AppsV1().Deployments(ns).Get(context.TODO(), deploymentName, metav1.GetOptions{})
212 if err != nil {
213 return false, err
214 }
215
216 newRS, err = GetNewReplicaSet(deployment, c)
217 if err != nil {
218 return false, err
219 }
220 if err := checkRevisionAndImage(deployment, newRS, revision, image); err != nil {
221 reason = err.Error()
222 logf(reason)
223 return false, nil
224 }
225 return true, nil
226 })
227 if wait.Interrupted(err) {
228 LogReplicaSetsOfDeployment(deployment, nil, newRS, logf)
229 err = fmt.Errorf(reason)
230 }
231 if newRS == nil {
232 return fmt.Errorf("deployment %q failed to create new replica set", deploymentName)
233 }
234 if err != nil {
235 if deployment == nil {
236 return fmt.Errorf("error creating new replica set for deployment %q: %w", deploymentName, err)
237 }
238 deploymentImage := ""
239 if len(deployment.Spec.Template.Spec.Containers) > 0 {
240 deploymentImage = deployment.Spec.Template.Spec.Containers[0].Image
241 }
242 newRSImage := ""
243 if len(newRS.Spec.Template.Spec.Containers) > 0 {
244 newRSImage = newRS.Spec.Template.Spec.Containers[0].Image
245 }
246 return fmt.Errorf("error waiting for deployment %q (got %s / %s) and new replica set %q (got %s / %s) revision and image to match expectation (expected %s / %s): %v", deploymentName, deployment.Annotations[deploymentutil.RevisionAnnotation], deploymentImage, newRS.Name, newRS.Annotations[deploymentutil.RevisionAnnotation], newRSImage, revision, image, err)
247 }
248 return nil
249 }
250
251
252 func CheckDeploymentRevisionAndImage(c clientset.Interface, ns, deploymentName, revision, image string) error {
253 deployment, err := c.AppsV1().Deployments(ns).Get(context.TODO(), deploymentName, metav1.GetOptions{})
254 if err != nil {
255 return fmt.Errorf("unable to get deployment %s during revision check: %v", deploymentName, err)
256 }
257
258
259 newRS, err := GetNewReplicaSet(deployment, c)
260 if err != nil {
261 return fmt.Errorf("unable to get new replicaset of deployment %s during revision check: %v", deploymentName, err)
262 }
263 return checkRevisionAndImage(deployment, newRS, revision, image)
264 }
265
266 func checkRevisionAndImage(deployment *apps.Deployment, newRS *apps.ReplicaSet, revision, image string) error {
267
268 if newRS == nil {
269 return fmt.Errorf("new replicaset for deployment %q is yet to be created", deployment.Name)
270 }
271 if !labelsutil.SelectorHasLabel(newRS.Spec.Selector, apps.DefaultDeploymentUniqueLabelKey) {
272 return fmt.Errorf("new replica set %q doesn't have %q label selector", newRS.Name, apps.DefaultDeploymentUniqueLabelKey)
273 }
274
275 if deployment.Annotations == nil || deployment.Annotations[deploymentutil.RevisionAnnotation] != revision {
276 return fmt.Errorf("deployment %q doesn't have the required revision set", deployment.Name)
277 }
278 if newRS.Annotations == nil || newRS.Annotations[deploymentutil.RevisionAnnotation] != revision {
279 return fmt.Errorf("new replicaset %q doesn't have the required revision set", newRS.Name)
280 }
281
282 if !containsImage(deployment.Spec.Template.Spec.Containers, image) {
283 return fmt.Errorf("deployment %q doesn't have the required image %s set", deployment.Name, image)
284 }
285 if !containsImage(newRS.Spec.Template.Spec.Containers, image) {
286 return fmt.Errorf("new replica set %q doesn't have the required image %s.", newRS.Name, image)
287 }
288 return nil
289 }
290
291 func containsImage(containers []v1.Container, imageName string) bool {
292 for _, container := range containers {
293 if container.Image == imageName {
294 return true
295 }
296 }
297 return false
298 }
299
300 type UpdateDeploymentFunc func(d *apps.Deployment)
301
302 func UpdateDeploymentWithRetries(c clientset.Interface, namespace, name string, applyUpdate UpdateDeploymentFunc, logf LogfFn, pollInterval, pollTimeout time.Duration) (*apps.Deployment, error) {
303 var deployment *apps.Deployment
304 var updateErr error
305 pollErr := wait.PollImmediate(pollInterval, pollTimeout, func() (bool, error) {
306 var err error
307 if deployment, err = c.AppsV1().Deployments(namespace).Get(context.TODO(), name, metav1.GetOptions{}); err != nil {
308 return false, err
309 }
310
311 applyUpdate(deployment)
312 if deployment, err = c.AppsV1().Deployments(namespace).Update(context.TODO(), deployment, metav1.UpdateOptions{}); err == nil {
313 logf("Updating deployment %s", name)
314 return true, nil
315 }
316 updateErr = err
317 return false, nil
318 })
319 if wait.Interrupted(pollErr) {
320 pollErr = fmt.Errorf("couldn't apply the provided updated to deployment %q: %v", name, updateErr)
321 }
322 return deployment, pollErr
323 }
324
325 func WaitForObservedDeployment(c clientset.Interface, ns, deploymentName string, desiredGeneration int64) error {
326 return deploymentutil.WaitForObservedDeployment(func() (*apps.Deployment, error) {
327 return c.AppsV1().Deployments(ns).Get(context.TODO(), deploymentName, metav1.GetOptions{})
328 }, desiredGeneration, 2*time.Second, 1*time.Minute)
329 }
330
331
332 func WaitForDeploymentRollbackCleared(c clientset.Interface, ns, deploymentName string, pollInterval, pollTimeout time.Duration) error {
333 err := wait.PollImmediate(pollInterval, pollTimeout, func() (bool, error) {
334 deployment, err := c.AppsV1().Deployments(ns).Get(context.TODO(), deploymentName, metav1.GetOptions{})
335 if err != nil {
336 return false, err
337 }
338
339 if deployment.Annotations[apps.DeprecatedRollbackTo] == "" {
340 return true, nil
341 }
342 return false, nil
343 })
344 if err != nil {
345 return fmt.Errorf("error waiting for deployment %s rollbackTo to be cleared: %v", deploymentName, err)
346 }
347 return nil
348 }
349
350
351 func WaitForDeploymentUpdatedReplicasGTE(c clientset.Interface, ns, deploymentName string, minUpdatedReplicas int32, desiredGeneration int64, pollInterval, pollTimeout time.Duration) error {
352 var deployment *apps.Deployment
353 err := wait.PollImmediate(pollInterval, pollTimeout, func() (bool, error) {
354 d, err := c.AppsV1().Deployments(ns).Get(context.TODO(), deploymentName, metav1.GetOptions{})
355 if err != nil {
356 return false, err
357 }
358 deployment = d
359 return deployment.Status.ObservedGeneration >= desiredGeneration && deployment.Status.UpdatedReplicas >= minUpdatedReplicas, nil
360 })
361 if err != nil {
362 return fmt.Errorf("error waiting for deployment %q to have at least %d updatedReplicas: %v; latest .status.updatedReplicas: %d", deploymentName, minUpdatedReplicas, err, deployment.Status.UpdatedReplicas)
363 }
364 return nil
365 }
366
367 func WaitForDeploymentWithCondition(c clientset.Interface, ns, deploymentName, reason string, condType apps.DeploymentConditionType, logf LogfFn, pollInterval, pollTimeout time.Duration) error {
368 var deployment *apps.Deployment
369 pollErr := wait.PollImmediate(pollInterval, pollTimeout, func() (bool, error) {
370 d, err := c.AppsV1().Deployments(ns).Get(context.TODO(), deploymentName, metav1.GetOptions{})
371 if err != nil {
372 return false, err
373 }
374 deployment = d
375 cond := deploymentutil.GetDeploymentCondition(deployment.Status, condType)
376 return cond != nil && cond.Reason == reason, nil
377 })
378 if wait.Interrupted(pollErr) {
379 pollErr = fmt.Errorf("deployment %q never updated with the desired condition and reason, latest deployment conditions: %+v", deployment.Name, deployment.Status.Conditions)
380 _, allOldRSs, newRS, err := GetAllReplicaSets(deployment, c)
381 if err == nil {
382 LogReplicaSetsOfDeployment(deployment, allOldRSs, newRS, logf)
383 LogPodsOfDeployment(c, deployment, append(allOldRSs, newRS), logf)
384 }
385 }
386 return pollErr
387 }
388
View as plain text