1
16
17 package kube
18
19 import (
20 "context"
21 "fmt"
22
23 appsv1 "k8s.io/api/apps/v1"
24 appsv1beta1 "k8s.io/api/apps/v1beta1"
25 appsv1beta2 "k8s.io/api/apps/v1beta2"
26 batchv1 "k8s.io/api/batch/v1"
27 corev1 "k8s.io/api/core/v1"
28 extensionsv1beta1 "k8s.io/api/extensions/v1beta1"
29 apiextv1 "k8s.io/apiextensions-apiserver/pkg/apis/apiextensions/v1"
30 apiextv1beta1 "k8s.io/apiextensions-apiserver/pkg/apis/apiextensions/v1beta1"
31 metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
32 "k8s.io/apimachinery/pkg/runtime"
33 "k8s.io/apimachinery/pkg/util/intstr"
34 "k8s.io/cli-runtime/pkg/resource"
35 "k8s.io/client-go/kubernetes"
36 "k8s.io/client-go/kubernetes/scheme"
37
38 deploymentutil "helm.sh/helm/v3/internal/third_party/k8s.io/kubernetes/deployment/util"
39 )
40
41
42 type ReadyCheckerOption func(*ReadyChecker)
43
44
45
46
47 func PausedAsReady(pausedAsReady bool) ReadyCheckerOption {
48 return func(c *ReadyChecker) {
49 c.pausedAsReady = pausedAsReady
50 }
51 }
52
53
54
55 func CheckJobs(checkJobs bool) ReadyCheckerOption {
56 return func(c *ReadyChecker) {
57 c.checkJobs = checkJobs
58 }
59 }
60
61
62
63 func NewReadyChecker(cl kubernetes.Interface, log func(string, ...interface{}), opts ...ReadyCheckerOption) ReadyChecker {
64 c := ReadyChecker{
65 client: cl,
66 log: log,
67 }
68 if c.log == nil {
69 c.log = nopLogger
70 }
71 for _, opt := range opts {
72 opt(&c)
73 }
74 return c
75 }
76
77
78 type ReadyChecker struct {
79 client kubernetes.Interface
80 log func(string, ...interface{})
81 checkJobs bool
82 pausedAsReady bool
83 }
84
85
86
87
88
89
90
91
92 func (c *ReadyChecker) IsReady(ctx context.Context, v *resource.Info) (bool, error) {
93 switch value := AsVersioned(v).(type) {
94 case *corev1.Pod:
95 pod, err := c.client.CoreV1().Pods(v.Namespace).Get(ctx, v.Name, metav1.GetOptions{})
96 if err != nil || !c.isPodReady(pod) {
97 return false, err
98 }
99 case *batchv1.Job:
100 if c.checkJobs {
101 job, err := c.client.BatchV1().Jobs(v.Namespace).Get(ctx, v.Name, metav1.GetOptions{})
102 if err != nil {
103 return false, err
104 }
105 ready, err := c.jobReady(job)
106 return ready, err
107 }
108 case *appsv1.Deployment, *appsv1beta1.Deployment, *appsv1beta2.Deployment, *extensionsv1beta1.Deployment:
109 currentDeployment, err := c.client.AppsV1().Deployments(v.Namespace).Get(ctx, v.Name, metav1.GetOptions{})
110 if err != nil {
111 return false, err
112 }
113
114 if currentDeployment.Spec.Paused {
115 return c.pausedAsReady, nil
116 }
117
118 newReplicaSet, err := deploymentutil.GetNewReplicaSet(currentDeployment, c.client.AppsV1())
119 if err != nil || newReplicaSet == nil {
120 return false, err
121 }
122 if !c.deploymentReady(newReplicaSet, currentDeployment) {
123 return false, nil
124 }
125 case *corev1.PersistentVolumeClaim:
126 claim, err := c.client.CoreV1().PersistentVolumeClaims(v.Namespace).Get(ctx, v.Name, metav1.GetOptions{})
127 if err != nil {
128 return false, err
129 }
130 if !c.volumeReady(claim) {
131 return false, nil
132 }
133 case *corev1.Service:
134 svc, err := c.client.CoreV1().Services(v.Namespace).Get(ctx, v.Name, metav1.GetOptions{})
135 if err != nil {
136 return false, err
137 }
138 if !c.serviceReady(svc) {
139 return false, nil
140 }
141 case *extensionsv1beta1.DaemonSet, *appsv1.DaemonSet, *appsv1beta2.DaemonSet:
142 ds, err := c.client.AppsV1().DaemonSets(v.Namespace).Get(ctx, v.Name, metav1.GetOptions{})
143 if err != nil {
144 return false, err
145 }
146 if !c.daemonSetReady(ds) {
147 return false, nil
148 }
149 case *apiextv1beta1.CustomResourceDefinition:
150 if err := v.Get(); err != nil {
151 return false, err
152 }
153 crd := &apiextv1beta1.CustomResourceDefinition{}
154 if err := scheme.Scheme.Convert(v.Object, crd, nil); err != nil {
155 return false, err
156 }
157 if !c.crdBetaReady(*crd) {
158 return false, nil
159 }
160 case *apiextv1.CustomResourceDefinition:
161 if err := v.Get(); err != nil {
162 return false, err
163 }
164 crd := &apiextv1.CustomResourceDefinition{}
165 if err := scheme.Scheme.Convert(v.Object, crd, nil); err != nil {
166 return false, err
167 }
168 if !c.crdReady(*crd) {
169 return false, nil
170 }
171 case *appsv1.StatefulSet, *appsv1beta1.StatefulSet, *appsv1beta2.StatefulSet:
172 sts, err := c.client.AppsV1().StatefulSets(v.Namespace).Get(ctx, v.Name, metav1.GetOptions{})
173 if err != nil {
174 return false, err
175 }
176 if !c.statefulSetReady(sts) {
177 return false, nil
178 }
179 case *corev1.ReplicationController:
180 rc, err := c.client.CoreV1().ReplicationControllers(v.Namespace).Get(ctx, v.Name, metav1.GetOptions{})
181 if err != nil {
182 return false, err
183 }
184 if !c.replicationControllerReady(rc) {
185 return false, nil
186 }
187 ready, err := c.podsReadyForObject(ctx, v.Namespace, value)
188 if !ready || err != nil {
189 return false, err
190 }
191 case *extensionsv1beta1.ReplicaSet, *appsv1beta2.ReplicaSet, *appsv1.ReplicaSet:
192 rs, err := c.client.AppsV1().ReplicaSets(v.Namespace).Get(ctx, v.Name, metav1.GetOptions{})
193 if err != nil {
194 return false, err
195 }
196 if !c.replicaSetReady(rs) {
197 return false, nil
198 }
199 ready, err := c.podsReadyForObject(ctx, v.Namespace, value)
200 if !ready || err != nil {
201 return false, err
202 }
203 }
204 return true, nil
205 }
206
207 func (c *ReadyChecker) podsReadyForObject(ctx context.Context, namespace string, obj runtime.Object) (bool, error) {
208 pods, err := c.podsforObject(ctx, namespace, obj)
209 if err != nil {
210 return false, err
211 }
212 for _, pod := range pods {
213 if !c.isPodReady(&pod) {
214 return false, nil
215 }
216 }
217 return true, nil
218 }
219
220 func (c *ReadyChecker) podsforObject(ctx context.Context, namespace string, obj runtime.Object) ([]corev1.Pod, error) {
221 selector, err := SelectorsForObject(obj)
222 if err != nil {
223 return nil, err
224 }
225 list, err := getPods(ctx, c.client, namespace, selector.String())
226 return list, err
227 }
228
229
230 func (c *ReadyChecker) isPodReady(pod *corev1.Pod) bool {
231 for _, c := range pod.Status.Conditions {
232 if c.Type == corev1.PodReady && c.Status == corev1.ConditionTrue {
233 return true
234 }
235 }
236 c.log("Pod is not ready: %s/%s", pod.GetNamespace(), pod.GetName())
237 return false
238 }
239
240 func (c *ReadyChecker) jobReady(job *batchv1.Job) (bool, error) {
241 if job.Status.Failed > *job.Spec.BackoffLimit {
242 c.log("Job is failed: %s/%s", job.GetNamespace(), job.GetName())
243
244 return false, fmt.Errorf("job is failed: %s/%s", job.GetNamespace(), job.GetName())
245 }
246 if job.Spec.Completions != nil && job.Status.Succeeded < *job.Spec.Completions {
247 c.log("Job is not completed: %s/%s", job.GetNamespace(), job.GetName())
248 return false, nil
249 }
250 return true, nil
251 }
252
253 func (c *ReadyChecker) serviceReady(s *corev1.Service) bool {
254
255 if s.Spec.Type == corev1.ServiceTypeExternalName {
256 return true
257 }
258
259
260 if s.Spec.ClusterIP == "" {
261 c.log("Service does not have cluster IP address: %s/%s", s.GetNamespace(), s.GetName())
262 return false
263 }
264
265
266 if s.Spec.Type == corev1.ServiceTypeLoadBalancer {
267
268 if len(s.Spec.ExternalIPs) > 0 {
269 c.log("Service %s/%s has external IP addresses (%v), marking as ready", s.GetNamespace(), s.GetName(), s.Spec.ExternalIPs)
270 return true
271 }
272
273 if s.Status.LoadBalancer.Ingress == nil {
274 c.log("Service does not have load balancer ingress IP address: %s/%s", s.GetNamespace(), s.GetName())
275 return false
276 }
277 }
278
279 return true
280 }
281
282 func (c *ReadyChecker) volumeReady(v *corev1.PersistentVolumeClaim) bool {
283 if v.Status.Phase != corev1.ClaimBound {
284 c.log("PersistentVolumeClaim is not bound: %s/%s", v.GetNamespace(), v.GetName())
285 return false
286 }
287 return true
288 }
289
290 func (c *ReadyChecker) deploymentReady(rs *appsv1.ReplicaSet, dep *appsv1.Deployment) bool {
291
292 if !c.replicaSetReady(rs) {
293 return false
294 }
295
296 if dep.Status.ObservedGeneration != dep.ObjectMeta.Generation {
297 c.log("Deployment is not ready: %s/%s. observedGeneration (%d) does not match spec generation (%d).", dep.Namespace, dep.Name, dep.Status.ObservedGeneration, dep.ObjectMeta.Generation)
298 return false
299 }
300
301 expectedReady := *dep.Spec.Replicas - deploymentutil.MaxUnavailable(*dep)
302 if !(rs.Status.ReadyReplicas >= expectedReady) {
303 c.log("Deployment is not ready: %s/%s. %d out of %d expected pods are ready", dep.Namespace, dep.Name, rs.Status.ReadyReplicas, expectedReady)
304 return false
305 }
306 return true
307 }
308
309 func (c *ReadyChecker) daemonSetReady(ds *appsv1.DaemonSet) bool {
310
311 if ds.Status.ObservedGeneration != ds.ObjectMeta.Generation {
312 c.log("DaemonSet is not ready: %s/%s. observedGeneration (%d) does not match spec generation (%d).", ds.Namespace, ds.Name, ds.Status.ObservedGeneration, ds.ObjectMeta.Generation)
313 return false
314 }
315
316
317 if ds.Spec.UpdateStrategy.Type != appsv1.RollingUpdateDaemonSetStrategyType {
318 return true
319 }
320
321
322 if ds.Status.UpdatedNumberScheduled != ds.Status.DesiredNumberScheduled {
323 c.log("DaemonSet is not ready: %s/%s. %d out of %d expected pods have been scheduled", ds.Namespace, ds.Name, ds.Status.UpdatedNumberScheduled, ds.Status.DesiredNumberScheduled)
324 return false
325 }
326 maxUnavailable, err := intstr.GetScaledValueFromIntOrPercent(ds.Spec.UpdateStrategy.RollingUpdate.MaxUnavailable, int(ds.Status.DesiredNumberScheduled), true)
327 if err != nil {
328
329
330
331 maxUnavailable = int(ds.Status.DesiredNumberScheduled)
332 }
333
334 expectedReady := int(ds.Status.DesiredNumberScheduled) - maxUnavailable
335 if !(int(ds.Status.NumberReady) >= expectedReady) {
336 c.log("DaemonSet is not ready: %s/%s. %d out of %d expected pods are ready", ds.Namespace, ds.Name, ds.Status.NumberReady, expectedReady)
337 return false
338 }
339 return true
340 }
341
342
343
344
345 func (c *ReadyChecker) crdBetaReady(crd apiextv1beta1.CustomResourceDefinition) bool {
346 for _, cond := range crd.Status.Conditions {
347 switch cond.Type {
348 case apiextv1beta1.Established:
349 if cond.Status == apiextv1beta1.ConditionTrue {
350 return true
351 }
352 case apiextv1beta1.NamesAccepted:
353 if cond.Status == apiextv1beta1.ConditionFalse {
354
355
356
357
358 return true
359 }
360 }
361 }
362 return false
363 }
364
365 func (c *ReadyChecker) crdReady(crd apiextv1.CustomResourceDefinition) bool {
366 for _, cond := range crd.Status.Conditions {
367 switch cond.Type {
368 case apiextv1.Established:
369 if cond.Status == apiextv1.ConditionTrue {
370 return true
371 }
372 case apiextv1.NamesAccepted:
373 if cond.Status == apiextv1.ConditionFalse {
374
375
376
377
378 return true
379 }
380 }
381 }
382 return false
383 }
384
385 func (c *ReadyChecker) statefulSetReady(sts *appsv1.StatefulSet) bool {
386
387 if sts.Status.ObservedGeneration != sts.ObjectMeta.Generation {
388 c.log("StatefulSet is not ready: %s/%s. observedGeneration (%d) does not match spec generation (%d).", sts.Namespace, sts.Name, sts.Status.ObservedGeneration, sts.ObjectMeta.Generation)
389 return false
390 }
391
392
393 if sts.Spec.UpdateStrategy.Type != appsv1.RollingUpdateStatefulSetStrategyType {
394 c.log("StatefulSet skipped ready check: %s/%s. updateStrategy is %v", sts.Namespace, sts.Name, sts.Spec.UpdateStrategy.Type)
395 return true
396 }
397
398
399 var partition int
400
401 replicas := 1
402
403
404
405 if sts.Spec.UpdateStrategy.RollingUpdate != nil && sts.Spec.UpdateStrategy.RollingUpdate.Partition != nil {
406 partition = int(*sts.Spec.UpdateStrategy.RollingUpdate.Partition)
407 }
408 if sts.Spec.Replicas != nil {
409 replicas = int(*sts.Spec.Replicas)
410 }
411
412
413
414
415
416 expectedReplicas := replicas - partition
417
418
419 if int(sts.Status.UpdatedReplicas) < expectedReplicas {
420 c.log("StatefulSet is not ready: %s/%s. %d out of %d expected pods have been scheduled", sts.Namespace, sts.Name, sts.Status.UpdatedReplicas, expectedReplicas)
421 return false
422 }
423
424 if int(sts.Status.ReadyReplicas) != replicas {
425 c.log("StatefulSet is not ready: %s/%s. %d out of %d expected pods are ready", sts.Namespace, sts.Name, sts.Status.ReadyReplicas, replicas)
426 return false
427 }
428
429
430
431 if partition == 0 && sts.Status.CurrentRevision != sts.Status.UpdateRevision {
432 c.log("StatefulSet is not ready: %s/%s. currentRevision %s does not yet match updateRevision %s", sts.Namespace, sts.Name, sts.Status.CurrentRevision, sts.Status.UpdateRevision)
433 return false
434 }
435
436 c.log("StatefulSet is ready: %s/%s. %d out of %d expected pods are ready", sts.Namespace, sts.Name, sts.Status.ReadyReplicas, replicas)
437 return true
438 }
439
440 func (c *ReadyChecker) replicationControllerReady(rc *corev1.ReplicationController) bool {
441
442 if rc.Status.ObservedGeneration != rc.ObjectMeta.Generation {
443 c.log("ReplicationController is not ready: %s/%s. observedGeneration (%d) does not match spec generation (%d).", rc.Namespace, rc.Name, rc.Status.ObservedGeneration, rc.ObjectMeta.Generation)
444 return false
445 }
446 return true
447 }
448
449 func (c *ReadyChecker) replicaSetReady(rs *appsv1.ReplicaSet) bool {
450
451 if rs.Status.ObservedGeneration != rs.ObjectMeta.Generation {
452 c.log("ReplicaSet is not ready: %s/%s. observedGeneration (%d) does not match spec generation (%d).", rs.Namespace, rs.Name, rs.Status.ObservedGeneration, rs.ObjectMeta.Generation)
453 return false
454 }
455 return true
456 }
457
458 func getPods(ctx context.Context, client kubernetes.Interface, namespace, selector string) ([]corev1.Pod, error) {
459 list, err := client.CoreV1().Pods(namespace).List(ctx, metav1.ListOptions{
460 LabelSelector: selector,
461 })
462 return list.Items, err
463 }
464
View as plain text