...

Source file src/helm.sh/helm/v3/pkg/kube/ready.go

Documentation: helm.sh/helm/v3/pkg/kube

     1  /*
     2  Copyright The Helm Authors.
     3  
     4  Licensed under the Apache License, Version 2.0 (the "License");
     5  you may not use this file except in compliance with the License.
     6  You may obtain a copy of the License at
     7  
     8      http://www.apache.org/licenses/LICENSE-2.0
     9  
    10  Unless required by applicable law or agreed to in writing, software
    11  distributed under the License is distributed on an "AS IS" BASIS,
    12  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    13  See the License for the specific language governing permissions and
    14  limitations under the License.
    15  */
    16  
    17  package kube // import "helm.sh/helm/v3/pkg/kube"
    18  
    19  import (
    20  	"context"
    21  	"fmt"
    22  
    23  	appsv1 "k8s.io/api/apps/v1"
    24  	appsv1beta1 "k8s.io/api/apps/v1beta1"
    25  	appsv1beta2 "k8s.io/api/apps/v1beta2"
    26  	batchv1 "k8s.io/api/batch/v1"
    27  	corev1 "k8s.io/api/core/v1"
    28  	extensionsv1beta1 "k8s.io/api/extensions/v1beta1"
    29  	apiextv1 "k8s.io/apiextensions-apiserver/pkg/apis/apiextensions/v1"
    30  	apiextv1beta1 "k8s.io/apiextensions-apiserver/pkg/apis/apiextensions/v1beta1"
    31  	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
    32  	"k8s.io/apimachinery/pkg/runtime"
    33  	"k8s.io/apimachinery/pkg/util/intstr"
    34  	"k8s.io/cli-runtime/pkg/resource"
    35  	"k8s.io/client-go/kubernetes"
    36  	"k8s.io/client-go/kubernetes/scheme"
    37  
    38  	deploymentutil "helm.sh/helm/v3/internal/third_party/k8s.io/kubernetes/deployment/util"
    39  )
    40  
    41  // ReadyCheckerOption is a function that configures a ReadyChecker.
    42  type ReadyCheckerOption func(*ReadyChecker)
    43  
    44  // PausedAsReady returns a ReadyCheckerOption that configures a ReadyChecker
    45  // to consider paused resources to be ready. For example a Deployment
    46  // with spec.paused equal to true would be considered ready.
    47  func PausedAsReady(pausedAsReady bool) ReadyCheckerOption {
    48  	return func(c *ReadyChecker) {
    49  		c.pausedAsReady = pausedAsReady
    50  	}
    51  }
    52  
    53  // CheckJobs returns a ReadyCheckerOption that configures a ReadyChecker
    54  // to consider readiness of Job resources.
    55  func CheckJobs(checkJobs bool) ReadyCheckerOption {
    56  	return func(c *ReadyChecker) {
    57  		c.checkJobs = checkJobs
    58  	}
    59  }
    60  
    61  // NewReadyChecker creates a new checker. Passed ReadyCheckerOptions can
    62  // be used to override defaults.
    63  func NewReadyChecker(cl kubernetes.Interface, log func(string, ...interface{}), opts ...ReadyCheckerOption) ReadyChecker {
    64  	c := ReadyChecker{
    65  		client: cl,
    66  		log:    log,
    67  	}
    68  	if c.log == nil {
    69  		c.log = nopLogger
    70  	}
    71  	for _, opt := range opts {
    72  		opt(&c)
    73  	}
    74  	return c
    75  }
    76  
    77  // ReadyChecker is a type that can check core Kubernetes types for readiness.
    78  type ReadyChecker struct {
    79  	client        kubernetes.Interface
    80  	log           func(string, ...interface{})
    81  	checkJobs     bool
    82  	pausedAsReady bool
    83  }
    84  
    85  // IsReady checks if v is ready. It supports checking readiness for pods,
    86  // deployments, persistent volume claims, services, daemon sets, custom
    87  // resource definitions, stateful sets, replication controllers, jobs (optional),
    88  // and replica sets. All other resource kinds are always considered ready.
    89  //
    90  // IsReady will fetch the latest state of the object from the server prior to
    91  // performing readiness checks, and it will return any error encountered.
    92  func (c *ReadyChecker) IsReady(ctx context.Context, v *resource.Info) (bool, error) {
    93  	switch value := AsVersioned(v).(type) {
    94  	case *corev1.Pod:
    95  		pod, err := c.client.CoreV1().Pods(v.Namespace).Get(ctx, v.Name, metav1.GetOptions{})
    96  		if err != nil || !c.isPodReady(pod) {
    97  			return false, err
    98  		}
    99  	case *batchv1.Job:
   100  		if c.checkJobs {
   101  			job, err := c.client.BatchV1().Jobs(v.Namespace).Get(ctx, v.Name, metav1.GetOptions{})
   102  			if err != nil {
   103  				return false, err
   104  			}
   105  			ready, err := c.jobReady(job)
   106  			return ready, err
   107  		}
   108  	case *appsv1.Deployment, *appsv1beta1.Deployment, *appsv1beta2.Deployment, *extensionsv1beta1.Deployment:
   109  		currentDeployment, err := c.client.AppsV1().Deployments(v.Namespace).Get(ctx, v.Name, metav1.GetOptions{})
   110  		if err != nil {
   111  			return false, err
   112  		}
   113  		// If paused deployment will never be ready
   114  		if currentDeployment.Spec.Paused {
   115  			return c.pausedAsReady, nil
   116  		}
   117  		// Find RS associated with deployment
   118  		newReplicaSet, err := deploymentutil.GetNewReplicaSet(currentDeployment, c.client.AppsV1())
   119  		if err != nil || newReplicaSet == nil {
   120  			return false, err
   121  		}
   122  		if !c.deploymentReady(newReplicaSet, currentDeployment) {
   123  			return false, nil
   124  		}
   125  	case *corev1.PersistentVolumeClaim:
   126  		claim, err := c.client.CoreV1().PersistentVolumeClaims(v.Namespace).Get(ctx, v.Name, metav1.GetOptions{})
   127  		if err != nil {
   128  			return false, err
   129  		}
   130  		if !c.volumeReady(claim) {
   131  			return false, nil
   132  		}
   133  	case *corev1.Service:
   134  		svc, err := c.client.CoreV1().Services(v.Namespace).Get(ctx, v.Name, metav1.GetOptions{})
   135  		if err != nil {
   136  			return false, err
   137  		}
   138  		if !c.serviceReady(svc) {
   139  			return false, nil
   140  		}
   141  	case *extensionsv1beta1.DaemonSet, *appsv1.DaemonSet, *appsv1beta2.DaemonSet:
   142  		ds, err := c.client.AppsV1().DaemonSets(v.Namespace).Get(ctx, v.Name, metav1.GetOptions{})
   143  		if err != nil {
   144  			return false, err
   145  		}
   146  		if !c.daemonSetReady(ds) {
   147  			return false, nil
   148  		}
   149  	case *apiextv1beta1.CustomResourceDefinition:
   150  		if err := v.Get(); err != nil {
   151  			return false, err
   152  		}
   153  		crd := &apiextv1beta1.CustomResourceDefinition{}
   154  		if err := scheme.Scheme.Convert(v.Object, crd, nil); err != nil {
   155  			return false, err
   156  		}
   157  		if !c.crdBetaReady(*crd) {
   158  			return false, nil
   159  		}
   160  	case *apiextv1.CustomResourceDefinition:
   161  		if err := v.Get(); err != nil {
   162  			return false, err
   163  		}
   164  		crd := &apiextv1.CustomResourceDefinition{}
   165  		if err := scheme.Scheme.Convert(v.Object, crd, nil); err != nil {
   166  			return false, err
   167  		}
   168  		if !c.crdReady(*crd) {
   169  			return false, nil
   170  		}
   171  	case *appsv1.StatefulSet, *appsv1beta1.StatefulSet, *appsv1beta2.StatefulSet:
   172  		sts, err := c.client.AppsV1().StatefulSets(v.Namespace).Get(ctx, v.Name, metav1.GetOptions{})
   173  		if err != nil {
   174  			return false, err
   175  		}
   176  		if !c.statefulSetReady(sts) {
   177  			return false, nil
   178  		}
   179  	case *corev1.ReplicationController:
   180  		rc, err := c.client.CoreV1().ReplicationControllers(v.Namespace).Get(ctx, v.Name, metav1.GetOptions{})
   181  		if err != nil {
   182  			return false, err
   183  		}
   184  		if !c.replicationControllerReady(rc) {
   185  			return false, nil
   186  		}
   187  		ready, err := c.podsReadyForObject(ctx, v.Namespace, value)
   188  		if !ready || err != nil {
   189  			return false, err
   190  		}
   191  	case *extensionsv1beta1.ReplicaSet, *appsv1beta2.ReplicaSet, *appsv1.ReplicaSet:
   192  		rs, err := c.client.AppsV1().ReplicaSets(v.Namespace).Get(ctx, v.Name, metav1.GetOptions{})
   193  		if err != nil {
   194  			return false, err
   195  		}
   196  		if !c.replicaSetReady(rs) {
   197  			return false, nil
   198  		}
   199  		ready, err := c.podsReadyForObject(ctx, v.Namespace, value)
   200  		if !ready || err != nil {
   201  			return false, err
   202  		}
   203  	}
   204  	return true, nil
   205  }
   206  
   207  func (c *ReadyChecker) podsReadyForObject(ctx context.Context, namespace string, obj runtime.Object) (bool, error) {
   208  	pods, err := c.podsforObject(ctx, namespace, obj)
   209  	if err != nil {
   210  		return false, err
   211  	}
   212  	for _, pod := range pods {
   213  		if !c.isPodReady(&pod) {
   214  			return false, nil
   215  		}
   216  	}
   217  	return true, nil
   218  }
   219  
   220  func (c *ReadyChecker) podsforObject(ctx context.Context, namespace string, obj runtime.Object) ([]corev1.Pod, error) {
   221  	selector, err := SelectorsForObject(obj)
   222  	if err != nil {
   223  		return nil, err
   224  	}
   225  	list, err := getPods(ctx, c.client, namespace, selector.String())
   226  	return list, err
   227  }
   228  
   229  // isPodReady returns true if a pod is ready; false otherwise.
   230  func (c *ReadyChecker) isPodReady(pod *corev1.Pod) bool {
   231  	for _, c := range pod.Status.Conditions {
   232  		if c.Type == corev1.PodReady && c.Status == corev1.ConditionTrue {
   233  			return true
   234  		}
   235  	}
   236  	c.log("Pod is not ready: %s/%s", pod.GetNamespace(), pod.GetName())
   237  	return false
   238  }
   239  
   240  func (c *ReadyChecker) jobReady(job *batchv1.Job) (bool, error) {
   241  	if job.Status.Failed > *job.Spec.BackoffLimit {
   242  		c.log("Job is failed: %s/%s", job.GetNamespace(), job.GetName())
   243  		// If a job is failed, it can't recover, so throw an error
   244  		return false, fmt.Errorf("job is failed: %s/%s", job.GetNamespace(), job.GetName())
   245  	}
   246  	if job.Spec.Completions != nil && job.Status.Succeeded < *job.Spec.Completions {
   247  		c.log("Job is not completed: %s/%s", job.GetNamespace(), job.GetName())
   248  		return false, nil
   249  	}
   250  	return true, nil
   251  }
   252  
   253  func (c *ReadyChecker) serviceReady(s *corev1.Service) bool {
   254  	// ExternalName Services are external to cluster so helm shouldn't be checking to see if they're 'ready' (i.e. have an IP Set)
   255  	if s.Spec.Type == corev1.ServiceTypeExternalName {
   256  		return true
   257  	}
   258  
   259  	// Ensure that the service cluster IP is not empty
   260  	if s.Spec.ClusterIP == "" {
   261  		c.log("Service does not have cluster IP address: %s/%s", s.GetNamespace(), s.GetName())
   262  		return false
   263  	}
   264  
   265  	// This checks if the service has a LoadBalancer and that balancer has an Ingress defined
   266  	if s.Spec.Type == corev1.ServiceTypeLoadBalancer {
   267  		// do not wait when at least 1 external IP is set
   268  		if len(s.Spec.ExternalIPs) > 0 {
   269  			c.log("Service %s/%s has external IP addresses (%v), marking as ready", s.GetNamespace(), s.GetName(), s.Spec.ExternalIPs)
   270  			return true
   271  		}
   272  
   273  		if s.Status.LoadBalancer.Ingress == nil {
   274  			c.log("Service does not have load balancer ingress IP address: %s/%s", s.GetNamespace(), s.GetName())
   275  			return false
   276  		}
   277  	}
   278  
   279  	return true
   280  }
   281  
   282  func (c *ReadyChecker) volumeReady(v *corev1.PersistentVolumeClaim) bool {
   283  	if v.Status.Phase != corev1.ClaimBound {
   284  		c.log("PersistentVolumeClaim is not bound: %s/%s", v.GetNamespace(), v.GetName())
   285  		return false
   286  	}
   287  	return true
   288  }
   289  
   290  func (c *ReadyChecker) deploymentReady(rs *appsv1.ReplicaSet, dep *appsv1.Deployment) bool {
   291  	// Verify the replicaset readiness
   292  	if !c.replicaSetReady(rs) {
   293  		return false
   294  	}
   295  	// Verify the generation observed by the deployment controller matches the spec generation
   296  	if dep.Status.ObservedGeneration != dep.ObjectMeta.Generation {
   297  		c.log("Deployment is not ready: %s/%s. observedGeneration (%d) does not match spec generation (%d).", dep.Namespace, dep.Name, dep.Status.ObservedGeneration, dep.ObjectMeta.Generation)
   298  		return false
   299  	}
   300  
   301  	expectedReady := *dep.Spec.Replicas - deploymentutil.MaxUnavailable(*dep)
   302  	if !(rs.Status.ReadyReplicas >= expectedReady) {
   303  		c.log("Deployment is not ready: %s/%s. %d out of %d expected pods are ready", dep.Namespace, dep.Name, rs.Status.ReadyReplicas, expectedReady)
   304  		return false
   305  	}
   306  	return true
   307  }
   308  
   309  func (c *ReadyChecker) daemonSetReady(ds *appsv1.DaemonSet) bool {
   310  	// Verify the generation observed by the daemonSet controller matches the spec generation
   311  	if ds.Status.ObservedGeneration != ds.ObjectMeta.Generation {
   312  		c.log("DaemonSet is not ready: %s/%s. observedGeneration (%d) does not match spec generation (%d).", ds.Namespace, ds.Name, ds.Status.ObservedGeneration, ds.ObjectMeta.Generation)
   313  		return false
   314  	}
   315  
   316  	// If the update strategy is not a rolling update, there will be nothing to wait for
   317  	if ds.Spec.UpdateStrategy.Type != appsv1.RollingUpdateDaemonSetStrategyType {
   318  		return true
   319  	}
   320  
   321  	// Make sure all the updated pods have been scheduled
   322  	if ds.Status.UpdatedNumberScheduled != ds.Status.DesiredNumberScheduled {
   323  		c.log("DaemonSet is not ready: %s/%s. %d out of %d expected pods have been scheduled", ds.Namespace, ds.Name, ds.Status.UpdatedNumberScheduled, ds.Status.DesiredNumberScheduled)
   324  		return false
   325  	}
   326  	maxUnavailable, err := intstr.GetScaledValueFromIntOrPercent(ds.Spec.UpdateStrategy.RollingUpdate.MaxUnavailable, int(ds.Status.DesiredNumberScheduled), true)
   327  	if err != nil {
   328  		// If for some reason the value is invalid, set max unavailable to the
   329  		// number of desired replicas. This is the same behavior as the
   330  		// `MaxUnavailable` function in deploymentutil
   331  		maxUnavailable = int(ds.Status.DesiredNumberScheduled)
   332  	}
   333  
   334  	expectedReady := int(ds.Status.DesiredNumberScheduled) - maxUnavailable
   335  	if !(int(ds.Status.NumberReady) >= expectedReady) {
   336  		c.log("DaemonSet is not ready: %s/%s. %d out of %d expected pods are ready", ds.Namespace, ds.Name, ds.Status.NumberReady, expectedReady)
   337  		return false
   338  	}
   339  	return true
   340  }
   341  
   342  // Because the v1 extensions API is not available on all supported k8s versions
   343  // yet and because Go doesn't support generics, we need to have a duplicate
   344  // function to support the v1beta1 types
   345  func (c *ReadyChecker) crdBetaReady(crd apiextv1beta1.CustomResourceDefinition) bool {
   346  	for _, cond := range crd.Status.Conditions {
   347  		switch cond.Type {
   348  		case apiextv1beta1.Established:
   349  			if cond.Status == apiextv1beta1.ConditionTrue {
   350  				return true
   351  			}
   352  		case apiextv1beta1.NamesAccepted:
   353  			if cond.Status == apiextv1beta1.ConditionFalse {
   354  				// This indicates a naming conflict, but it's probably not the
   355  				// job of this function to fail because of that. Instead,
   356  				// we treat it as a success, since the process should be able to
   357  				// continue.
   358  				return true
   359  			}
   360  		}
   361  	}
   362  	return false
   363  }
   364  
   365  func (c *ReadyChecker) crdReady(crd apiextv1.CustomResourceDefinition) bool {
   366  	for _, cond := range crd.Status.Conditions {
   367  		switch cond.Type {
   368  		case apiextv1.Established:
   369  			if cond.Status == apiextv1.ConditionTrue {
   370  				return true
   371  			}
   372  		case apiextv1.NamesAccepted:
   373  			if cond.Status == apiextv1.ConditionFalse {
   374  				// This indicates a naming conflict, but it's probably not the
   375  				// job of this function to fail because of that. Instead,
   376  				// we treat it as a success, since the process should be able to
   377  				// continue.
   378  				return true
   379  			}
   380  		}
   381  	}
   382  	return false
   383  }
   384  
   385  func (c *ReadyChecker) statefulSetReady(sts *appsv1.StatefulSet) bool {
   386  	// Verify the generation observed by the statefulSet controller matches the spec generation
   387  	if sts.Status.ObservedGeneration != sts.ObjectMeta.Generation {
   388  		c.log("StatefulSet is not ready: %s/%s. observedGeneration (%d) does not match spec generation (%d).", sts.Namespace, sts.Name, sts.Status.ObservedGeneration, sts.ObjectMeta.Generation)
   389  		return false
   390  	}
   391  
   392  	// If the update strategy is not a rolling update, there will be nothing to wait for
   393  	if sts.Spec.UpdateStrategy.Type != appsv1.RollingUpdateStatefulSetStrategyType {
   394  		c.log("StatefulSet skipped ready check: %s/%s. updateStrategy is %v", sts.Namespace, sts.Name, sts.Spec.UpdateStrategy.Type)
   395  		return true
   396  	}
   397  
   398  	// Dereference all the pointers because StatefulSets like them
   399  	var partition int
   400  	// 1 is the default for replicas if not set
   401  	replicas := 1
   402  	// For some reason, even if the update strategy is a rolling update, the
   403  	// actual rollingUpdate field can be nil. If it is, we can safely assume
   404  	// there is no partition value
   405  	if sts.Spec.UpdateStrategy.RollingUpdate != nil && sts.Spec.UpdateStrategy.RollingUpdate.Partition != nil {
   406  		partition = int(*sts.Spec.UpdateStrategy.RollingUpdate.Partition)
   407  	}
   408  	if sts.Spec.Replicas != nil {
   409  		replicas = int(*sts.Spec.Replicas)
   410  	}
   411  
   412  	// Because an update strategy can use partitioning, we need to calculate the
   413  	// number of updated replicas we should have. For example, if the replicas
   414  	// is set to 3 and the partition is 2, we'd expect only one pod to be
   415  	// updated
   416  	expectedReplicas := replicas - partition
   417  
   418  	// Make sure all the updated pods have been scheduled
   419  	if int(sts.Status.UpdatedReplicas) < expectedReplicas {
   420  		c.log("StatefulSet is not ready: %s/%s. %d out of %d expected pods have been scheduled", sts.Namespace, sts.Name, sts.Status.UpdatedReplicas, expectedReplicas)
   421  		return false
   422  	}
   423  
   424  	if int(sts.Status.ReadyReplicas) != replicas {
   425  		c.log("StatefulSet is not ready: %s/%s. %d out of %d expected pods are ready", sts.Namespace, sts.Name, sts.Status.ReadyReplicas, replicas)
   426  		return false
   427  	}
   428  	// This check only makes sense when all partitions are being upgraded otherwise during a
   429  	// partioned rolling upgrade, this condition will never evaluate to true, leading to
   430  	// error.
   431  	if partition == 0 && sts.Status.CurrentRevision != sts.Status.UpdateRevision {
   432  		c.log("StatefulSet is not ready: %s/%s. currentRevision %s does not yet match updateRevision %s", sts.Namespace, sts.Name, sts.Status.CurrentRevision, sts.Status.UpdateRevision)
   433  		return false
   434  	}
   435  
   436  	c.log("StatefulSet is ready: %s/%s. %d out of %d expected pods are ready", sts.Namespace, sts.Name, sts.Status.ReadyReplicas, replicas)
   437  	return true
   438  }
   439  
   440  func (c *ReadyChecker) replicationControllerReady(rc *corev1.ReplicationController) bool {
   441  	// Verify the generation observed by the replicationController controller matches the spec generation
   442  	if rc.Status.ObservedGeneration != rc.ObjectMeta.Generation {
   443  		c.log("ReplicationController is not ready: %s/%s. observedGeneration (%d) does not match spec generation (%d).", rc.Namespace, rc.Name, rc.Status.ObservedGeneration, rc.ObjectMeta.Generation)
   444  		return false
   445  	}
   446  	return true
   447  }
   448  
   449  func (c *ReadyChecker) replicaSetReady(rs *appsv1.ReplicaSet) bool {
   450  	// Verify the generation observed by the replicaSet controller matches the spec generation
   451  	if rs.Status.ObservedGeneration != rs.ObjectMeta.Generation {
   452  		c.log("ReplicaSet is not ready: %s/%s. observedGeneration (%d) does not match spec generation (%d).", rs.Namespace, rs.Name, rs.Status.ObservedGeneration, rs.ObjectMeta.Generation)
   453  		return false
   454  	}
   455  	return true
   456  }
   457  
   458  func getPods(ctx context.Context, client kubernetes.Interface, namespace, selector string) ([]corev1.Pod, error) {
   459  	list, err := client.CoreV1().Pods(namespace).List(ctx, metav1.ListOptions{
   460  		LabelSelector: selector,
   461  	})
   462  	return list.Items, err
   463  }
   464  

View as plain text