...

Source file src/helm.sh/helm/v3/pkg/kube/client.go

Documentation: helm.sh/helm/v3/pkg/kube

     1  /*
     2  Copyright The Helm Authors.
     3  
     4  Licensed under the Apache License, Version 2.0 (the "License");
     5  you may not use this file except in compliance with the License.
     6  You may obtain a copy of the License at
     7  
     8      http://www.apache.org/licenses/LICENSE-2.0
     9  
    10  Unless required by applicable law or agreed to in writing, software
    11  distributed under the License is distributed on an "AS IS" BASIS,
    12  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    13  See the License for the specific language governing permissions and
    14  limitations under the License.
    15  */
    16  
    17  package kube // import "helm.sh/helm/v3/pkg/kube"
    18  
    19  import (
    20  	"bytes"
    21  	"context"
    22  	"encoding/json"
    23  	"fmt"
    24  	"io"
    25  	"os"
    26  	"path/filepath"
    27  	"reflect"
    28  	"strings"
    29  	"sync"
    30  	"time"
    31  
    32  	jsonpatch "github.com/evanphx/json-patch"
    33  	"github.com/pkg/errors"
    34  	batch "k8s.io/api/batch/v1"
    35  	v1 "k8s.io/api/core/v1"
    36  	apiextv1 "k8s.io/apiextensions-apiserver/pkg/apis/apiextensions/v1"
    37  	apiextv1beta1 "k8s.io/apiextensions-apiserver/pkg/apis/apiextensions/v1beta1"
    38  	apierrors "k8s.io/apimachinery/pkg/api/errors"
    39  
    40  	multierror "github.com/hashicorp/go-multierror"
    41  	"k8s.io/apimachinery/pkg/api/meta"
    42  	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
    43  	"k8s.io/apimachinery/pkg/apis/meta/v1/unstructured"
    44  	metav1beta1 "k8s.io/apimachinery/pkg/apis/meta/v1beta1"
    45  	"k8s.io/apimachinery/pkg/fields"
    46  	"k8s.io/apimachinery/pkg/labels"
    47  	"k8s.io/apimachinery/pkg/runtime"
    48  	"k8s.io/apimachinery/pkg/types"
    49  	"k8s.io/apimachinery/pkg/util/strategicpatch"
    50  	"k8s.io/apimachinery/pkg/watch"
    51  	"k8s.io/cli-runtime/pkg/genericclioptions"
    52  	"k8s.io/cli-runtime/pkg/resource"
    53  	"k8s.io/client-go/kubernetes"
    54  	"k8s.io/client-go/kubernetes/scheme"
    55  	"k8s.io/client-go/rest"
    56  	cachetools "k8s.io/client-go/tools/cache"
    57  	watchtools "k8s.io/client-go/tools/watch"
    58  	cmdutil "k8s.io/kubectl/pkg/cmd/util"
    59  )
    60  
    61  // ErrNoObjectsVisited indicates that during a visit operation, no matching objects were found.
    62  var ErrNoObjectsVisited = errors.New("no objects visited")
    63  
    64  var metadataAccessor = meta.NewAccessor()
    65  
    66  // ManagedFieldsManager is the name of the manager of Kubernetes managedFields
    67  // first introduced in Kubernetes 1.18
    68  var ManagedFieldsManager string
    69  
    70  // Client represents a client capable of communicating with the Kubernetes API.
    71  type Client struct {
    72  	// Factory provides a minimal version of the kubectl Factory interface. If
    73  	// you need the full Factory you can type switch to the full interface.
    74  	// Since Kubernetes Go API does not provide backwards compatibility across
    75  	// minor versions, this API does not follow Helm backwards compatibility.
    76  	// Helm is exposing Kubernetes in this property and cannot guarantee this
    77  	// will not change. The minimal interface only has the functions that Helm
    78  	// needs. The smaller surface area of the interface means there is a lower
    79  	// chance of it changing.
    80  	Factory Factory
    81  	Log     func(string, ...interface{})
    82  	// Namespace allows to bypass the kubeconfig file for the choice of the namespace
    83  	Namespace string
    84  
    85  	kubeClient *kubernetes.Clientset
    86  }
    87  
    88  var addToScheme sync.Once
    89  
    90  // New creates a new Client.
    91  func New(getter genericclioptions.RESTClientGetter) *Client {
    92  	if getter == nil {
    93  		getter = genericclioptions.NewConfigFlags(true)
    94  	}
    95  	// Add CRDs to the scheme. They are missing by default.
    96  	addToScheme.Do(func() {
    97  		if err := apiextv1.AddToScheme(scheme.Scheme); err != nil {
    98  			// This should never happen.
    99  			panic(err)
   100  		}
   101  		if err := apiextv1beta1.AddToScheme(scheme.Scheme); err != nil {
   102  			panic(err)
   103  		}
   104  	})
   105  	return &Client{
   106  		Factory: cmdutil.NewFactory(getter),
   107  		Log:     nopLogger,
   108  	}
   109  }
   110  
   111  var nopLogger = func(_ string, _ ...interface{}) {}
   112  
   113  // getKubeClient get or create a new KubernetesClientSet
   114  func (c *Client) getKubeClient() (*kubernetes.Clientset, error) {
   115  	var err error
   116  	if c.kubeClient == nil {
   117  		c.kubeClient, err = c.Factory.KubernetesClientSet()
   118  	}
   119  
   120  	return c.kubeClient, err
   121  }
   122  
   123  // IsReachable tests connectivity to the cluster.
   124  func (c *Client) IsReachable() error {
   125  	client, err := c.getKubeClient()
   126  	if err == genericclioptions.ErrEmptyConfig {
   127  		// re-replace kubernetes ErrEmptyConfig error with a friendy error
   128  		// moar workarounds for Kubernetes API breaking.
   129  		return errors.New("Kubernetes cluster unreachable")
   130  	}
   131  	if err != nil {
   132  		return errors.Wrap(err, "Kubernetes cluster unreachable")
   133  	}
   134  	if _, err := client.ServerVersion(); err != nil {
   135  		return errors.Wrap(err, "Kubernetes cluster unreachable")
   136  	}
   137  	return nil
   138  }
   139  
   140  // Create creates Kubernetes resources specified in the resource list.
   141  func (c *Client) Create(resources ResourceList) (*Result, error) {
   142  	c.Log("creating %d resource(s)", len(resources))
   143  	if err := perform(resources, createResource); err != nil {
   144  		return nil, err
   145  	}
   146  	return &Result{Created: resources}, nil
   147  }
   148  
   149  func transformRequests(req *rest.Request) {
   150  	tableParam := strings.Join([]string{
   151  		fmt.Sprintf("application/json;as=Table;v=%s;g=%s", metav1.SchemeGroupVersion.Version, metav1.GroupName),
   152  		fmt.Sprintf("application/json;as=Table;v=%s;g=%s", metav1beta1.SchemeGroupVersion.Version, metav1beta1.GroupName),
   153  		"application/json",
   154  	}, ",")
   155  	req.SetHeader("Accept", tableParam)
   156  
   157  	// if sorting, ensure we receive the full object in order to introspect its fields via jsonpath
   158  	req.Param("includeObject", "Object")
   159  }
   160  
   161  // Get retrieves the resource objects supplied. If related is set to true the
   162  // related pods are fetched as well. If the passed in resources are a table kind
   163  // the related resources will also be fetched as kind=table.
   164  func (c *Client) Get(resources ResourceList, related bool) (map[string][]runtime.Object, error) {
   165  	buf := new(bytes.Buffer)
   166  	objs := make(map[string][]runtime.Object)
   167  
   168  	podSelectors := []map[string]string{}
   169  	err := resources.Visit(func(info *resource.Info, err error) error {
   170  		if err != nil {
   171  			return err
   172  		}
   173  
   174  		gvk := info.ResourceMapping().GroupVersionKind
   175  		vk := gvk.Version + "/" + gvk.Kind
   176  		obj, err := getResource(info)
   177  		if err != nil {
   178  			fmt.Fprintf(buf, "Get resource %s failed, err:%v\n", info.Name, err)
   179  		} else {
   180  			objs[vk] = append(objs[vk], obj)
   181  
   182  			// Only fetch related pods if they are requested
   183  			if related {
   184  				// Discover if the existing object is a table. If it is, request
   185  				// the pods as Tables. Otherwise request them normally.
   186  				objGVK := obj.GetObjectKind().GroupVersionKind()
   187  				var isTable bool
   188  				if objGVK.Kind == "Table" {
   189  					isTable = true
   190  				}
   191  
   192  				objs, err = c.getSelectRelationPod(info, objs, isTable, &podSelectors)
   193  				if err != nil {
   194  					c.Log("Warning: get the relation pod is failed, err:%s", err.Error())
   195  				}
   196  			}
   197  		}
   198  
   199  		return nil
   200  	})
   201  	if err != nil {
   202  		return nil, err
   203  	}
   204  
   205  	return objs, nil
   206  }
   207  
   208  func (c *Client) getSelectRelationPod(info *resource.Info, objs map[string][]runtime.Object, table bool, podSelectors *[]map[string]string) (map[string][]runtime.Object, error) {
   209  	if info == nil {
   210  		return objs, nil
   211  	}
   212  	c.Log("get relation pod of object: %s/%s/%s", info.Namespace, info.Mapping.GroupVersionKind.Kind, info.Name)
   213  	selector, ok, _ := getSelectorFromObject(info.Object)
   214  	if !ok {
   215  		return objs, nil
   216  	}
   217  
   218  	for index := range *podSelectors {
   219  		if reflect.DeepEqual((*podSelectors)[index], selector) {
   220  			// check if pods for selectors are already added. This avoids duplicate printing of pods
   221  			return objs, nil
   222  		}
   223  	}
   224  
   225  	*podSelectors = append(*podSelectors, selector)
   226  
   227  	var infos []*resource.Info
   228  	var err error
   229  	if table {
   230  		infos, err = c.Factory.NewBuilder().
   231  			Unstructured().
   232  			ContinueOnError().
   233  			NamespaceParam(info.Namespace).
   234  			DefaultNamespace().
   235  			ResourceTypes("pods").
   236  			LabelSelector(labels.Set(selector).AsSelector().String()).
   237  			TransformRequests(transformRequests).
   238  			Do().Infos()
   239  		if err != nil {
   240  			return objs, err
   241  		}
   242  	} else {
   243  		infos, err = c.Factory.NewBuilder().
   244  			Unstructured().
   245  			ContinueOnError().
   246  			NamespaceParam(info.Namespace).
   247  			DefaultNamespace().
   248  			ResourceTypes("pods").
   249  			LabelSelector(labels.Set(selector).AsSelector().String()).
   250  			Do().Infos()
   251  		if err != nil {
   252  			return objs, err
   253  		}
   254  	}
   255  	vk := "v1/Pod(related)"
   256  
   257  	for _, info := range infos {
   258  		objs[vk] = append(objs[vk], info.Object)
   259  	}
   260  	return objs, nil
   261  }
   262  
   263  func getSelectorFromObject(obj runtime.Object) (map[string]string, bool, error) {
   264  	typed := obj.(*unstructured.Unstructured)
   265  	kind := typed.Object["kind"]
   266  	switch kind {
   267  	case "ReplicaSet", "Deployment", "StatefulSet", "DaemonSet", "Job":
   268  		return unstructured.NestedStringMap(typed.Object, "spec", "selector", "matchLabels")
   269  	case "ReplicationController":
   270  		return unstructured.NestedStringMap(typed.Object, "spec", "selector")
   271  	default:
   272  		return nil, false, nil
   273  	}
   274  }
   275  
   276  func getResource(info *resource.Info) (runtime.Object, error) {
   277  	obj, err := resource.NewHelper(info.Client, info.Mapping).Get(info.Namespace, info.Name)
   278  	if err != nil {
   279  		return nil, err
   280  	}
   281  	return obj, nil
   282  }
   283  
   284  // Wait waits up to the given timeout for the specified resources to be ready.
   285  func (c *Client) Wait(resources ResourceList, timeout time.Duration) error {
   286  	cs, err := c.getKubeClient()
   287  	if err != nil {
   288  		return err
   289  	}
   290  	checker := NewReadyChecker(cs, c.Log, PausedAsReady(true))
   291  	w := waiter{
   292  		c:       checker,
   293  		log:     c.Log,
   294  		timeout: timeout,
   295  	}
   296  	return w.waitForResources(resources)
   297  }
   298  
   299  // WaitWithJobs wait up to the given timeout for the specified resources to be ready, including jobs.
   300  func (c *Client) WaitWithJobs(resources ResourceList, timeout time.Duration) error {
   301  	cs, err := c.getKubeClient()
   302  	if err != nil {
   303  		return err
   304  	}
   305  	checker := NewReadyChecker(cs, c.Log, PausedAsReady(true), CheckJobs(true))
   306  	w := waiter{
   307  		c:       checker,
   308  		log:     c.Log,
   309  		timeout: timeout,
   310  	}
   311  	return w.waitForResources(resources)
   312  }
   313  
   314  // WaitForDelete wait up to the given timeout for the specified resources to be deleted.
   315  func (c *Client) WaitForDelete(resources ResourceList, timeout time.Duration) error {
   316  	w := waiter{
   317  		log:     c.Log,
   318  		timeout: timeout,
   319  	}
   320  	return w.waitForDeletedResources(resources)
   321  }
   322  
   323  func (c *Client) namespace() string {
   324  	if c.Namespace != "" {
   325  		return c.Namespace
   326  	}
   327  	if ns, _, err := c.Factory.ToRawKubeConfigLoader().Namespace(); err == nil {
   328  		return ns
   329  	}
   330  	return v1.NamespaceDefault
   331  }
   332  
   333  // newBuilder returns a new resource builder for structured api objects.
   334  func (c *Client) newBuilder() *resource.Builder {
   335  	return c.Factory.NewBuilder().
   336  		ContinueOnError().
   337  		NamespaceParam(c.namespace()).
   338  		DefaultNamespace().
   339  		Flatten()
   340  }
   341  
   342  // Build validates for Kubernetes objects and returns unstructured infos.
   343  func (c *Client) Build(reader io.Reader, validate bool) (ResourceList, error) {
   344  	validationDirective := metav1.FieldValidationIgnore
   345  	if validate {
   346  		validationDirective = metav1.FieldValidationStrict
   347  	}
   348  
   349  	schema, err := c.Factory.Validator(validationDirective)
   350  	if err != nil {
   351  		return nil, err
   352  	}
   353  	result, err := c.newBuilder().
   354  		Unstructured().
   355  		Schema(schema).
   356  		Stream(reader, "").
   357  		Do().Infos()
   358  	return result, scrubValidationError(err)
   359  }
   360  
   361  // BuildTable validates for Kubernetes objects and returns unstructured infos.
   362  // The returned kind is a Table.
   363  func (c *Client) BuildTable(reader io.Reader, validate bool) (ResourceList, error) {
   364  	validationDirective := metav1.FieldValidationIgnore
   365  	if validate {
   366  		validationDirective = metav1.FieldValidationStrict
   367  	}
   368  
   369  	schema, err := c.Factory.Validator(validationDirective)
   370  	if err != nil {
   371  		return nil, err
   372  	}
   373  	result, err := c.newBuilder().
   374  		Unstructured().
   375  		Schema(schema).
   376  		Stream(reader, "").
   377  		TransformRequests(transformRequests).
   378  		Do().Infos()
   379  	return result, scrubValidationError(err)
   380  }
   381  
   382  // Update takes the current list of objects and target list of objects and
   383  // creates resources that don't already exist, updates resources that have been
   384  // modified in the target configuration, and deletes resources from the current
   385  // configuration that are not present in the target configuration. If an error
   386  // occurs, a Result will still be returned with the error, containing all
   387  // resource updates, creations, and deletions that were attempted. These can be
   388  // used for cleanup or other logging purposes.
   389  func (c *Client) Update(original, target ResourceList, force bool) (*Result, error) {
   390  	updateErrors := []string{}
   391  	res := &Result{}
   392  
   393  	c.Log("checking %d resources for changes", len(target))
   394  	err := target.Visit(func(info *resource.Info, err error) error {
   395  		if err != nil {
   396  			return err
   397  		}
   398  
   399  		helper := resource.NewHelper(info.Client, info.Mapping).WithFieldManager(getManagedFieldsManager())
   400  		if _, err := helper.Get(info.Namespace, info.Name); err != nil {
   401  			if !apierrors.IsNotFound(err) {
   402  				return errors.Wrap(err, "could not get information about the resource")
   403  			}
   404  
   405  			// Append the created resource to the results, even if something fails
   406  			res.Created = append(res.Created, info)
   407  
   408  			// Since the resource does not exist, create it.
   409  			if err := createResource(info); err != nil {
   410  				return errors.Wrap(err, "failed to create resource")
   411  			}
   412  
   413  			kind := info.Mapping.GroupVersionKind.Kind
   414  			c.Log("Created a new %s called %q in %s\n", kind, info.Name, info.Namespace)
   415  			return nil
   416  		}
   417  
   418  		originalInfo := original.Get(info)
   419  		if originalInfo == nil {
   420  			kind := info.Mapping.GroupVersionKind.Kind
   421  			return errors.Errorf("no %s with the name %q found", kind, info.Name)
   422  		}
   423  
   424  		if err := updateResource(c, info, originalInfo.Object, force); err != nil {
   425  			c.Log("error updating the resource %q:\n\t %v", info.Name, err)
   426  			updateErrors = append(updateErrors, err.Error())
   427  		}
   428  		// Because we check for errors later, append the info regardless
   429  		res.Updated = append(res.Updated, info)
   430  
   431  		return nil
   432  	})
   433  
   434  	switch {
   435  	case err != nil:
   436  		return res, err
   437  	case len(updateErrors) != 0:
   438  		return res, errors.Errorf(strings.Join(updateErrors, " && "))
   439  	}
   440  
   441  	for _, info := range original.Difference(target) {
   442  		c.Log("Deleting %s %q in namespace %s...", info.Mapping.GroupVersionKind.Kind, info.Name, info.Namespace)
   443  
   444  		if err := info.Get(); err != nil {
   445  			c.Log("Unable to get obj %q, err: %s", info.Name, err)
   446  			continue
   447  		}
   448  		annotations, err := metadataAccessor.Annotations(info.Object)
   449  		if err != nil {
   450  			c.Log("Unable to get annotations on %q, err: %s", info.Name, err)
   451  		}
   452  		if annotations != nil && annotations[ResourcePolicyAnno] == KeepPolicy {
   453  			c.Log("Skipping delete of %q due to annotation [%s=%s]", info.Name, ResourcePolicyAnno, KeepPolicy)
   454  			continue
   455  		}
   456  		if err := deleteResource(info, metav1.DeletePropagationBackground); err != nil {
   457  			c.Log("Failed to delete %q, err: %s", info.ObjectName(), err)
   458  			continue
   459  		}
   460  		res.Deleted = append(res.Deleted, info)
   461  	}
   462  	return res, nil
   463  }
   464  
   465  // Delete deletes Kubernetes resources specified in the resources list with
   466  // background cascade deletion. It will attempt to delete all resources even
   467  // if one or more fail and collect any errors. All successfully deleted items
   468  // will be returned in the `Deleted` ResourceList that is part of the result.
   469  func (c *Client) Delete(resources ResourceList) (*Result, []error) {
   470  	return rdelete(c, resources, metav1.DeletePropagationBackground)
   471  }
   472  
   473  // Delete deletes Kubernetes resources specified in the resources list with
   474  // given deletion propagation policy. It will attempt to delete all resources even
   475  // if one or more fail and collect any errors. All successfully deleted items
   476  // will be returned in the `Deleted` ResourceList that is part of the result.
   477  func (c *Client) DeleteWithPropagationPolicy(resources ResourceList, policy metav1.DeletionPropagation) (*Result, []error) {
   478  	return rdelete(c, resources, policy)
   479  }
   480  
   481  func rdelete(c *Client, resources ResourceList, propagation metav1.DeletionPropagation) (*Result, []error) {
   482  	var errs []error
   483  	res := &Result{}
   484  	mtx := sync.Mutex{}
   485  	err := perform(resources, func(info *resource.Info) error {
   486  		c.Log("Starting delete for %q %s", info.Name, info.Mapping.GroupVersionKind.Kind)
   487  		err := deleteResource(info, propagation)
   488  		if err == nil || apierrors.IsNotFound(err) {
   489  			if err != nil {
   490  				c.Log("Ignoring delete failure for %q %s: %v", info.Name, info.Mapping.GroupVersionKind, err)
   491  			}
   492  			mtx.Lock()
   493  			defer mtx.Unlock()
   494  			res.Deleted = append(res.Deleted, info)
   495  			return nil
   496  		}
   497  		mtx.Lock()
   498  		defer mtx.Unlock()
   499  		// Collect the error and continue on
   500  		errs = append(errs, err)
   501  		return nil
   502  	})
   503  	if err != nil {
   504  		if errors.Is(err, ErrNoObjectsVisited) {
   505  			err = fmt.Errorf("object not found, skipping delete: %w", err)
   506  		}
   507  		errs = append(errs, err)
   508  	}
   509  	if errs != nil {
   510  		return nil, errs
   511  	}
   512  	return res, nil
   513  }
   514  
   515  func (c *Client) watchTimeout(t time.Duration) func(*resource.Info) error {
   516  	return func(info *resource.Info) error {
   517  		return c.watchUntilReady(t, info)
   518  	}
   519  }
   520  
   521  // WatchUntilReady watches the resources given and waits until it is ready.
   522  //
   523  // This method is mainly for hook implementations. It watches for a resource to
   524  // hit a particular milestone. The milestone depends on the Kind.
   525  //
   526  // For most kinds, it checks to see if the resource is marked as Added or Modified
   527  // by the Kubernetes event stream. For some kinds, it does more:
   528  //
   529  //   - Jobs: A job is marked "Ready" when it has successfully completed. This is
   530  //     ascertained by watching the Status fields in a job's output.
   531  //   - Pods: A pod is marked "Ready" when it has successfully completed. This is
   532  //     ascertained by watching the status.phase field in a pod's output.
   533  //
   534  // Handling for other kinds will be added as necessary.
   535  func (c *Client) WatchUntilReady(resources ResourceList, timeout time.Duration) error {
   536  	// For jobs, there's also the option to do poll c.Jobs(namespace).Get():
   537  	// https://github.com/adamreese/kubernetes/blob/master/test/e2e/job.go#L291-L300
   538  	return perform(resources, c.watchTimeout(timeout))
   539  }
   540  
   541  func perform(infos ResourceList, fn func(*resource.Info) error) error {
   542  	var result error
   543  
   544  	if len(infos) == 0 {
   545  		return ErrNoObjectsVisited
   546  	}
   547  
   548  	errs := make(chan error)
   549  	go batchPerform(infos, fn, errs)
   550  
   551  	for range infos {
   552  		err := <-errs
   553  		if err != nil {
   554  			result = multierror.Append(result, err)
   555  		}
   556  	}
   557  
   558  	return result
   559  }
   560  
   561  // getManagedFieldsManager returns the manager string. If one was set it will be returned.
   562  // Otherwise, one is calculated based on the name of the binary.
   563  func getManagedFieldsManager() string {
   564  
   565  	// When a manager is explicitly set use it
   566  	if ManagedFieldsManager != "" {
   567  		return ManagedFieldsManager
   568  	}
   569  
   570  	// When no manager is set and no calling application can be found it is unknown
   571  	if len(os.Args[0]) == 0 {
   572  		return "unknown"
   573  	}
   574  
   575  	// When there is an application that can be determined and no set manager
   576  	// use the base name. This is one of the ways Kubernetes libs handle figuring
   577  	// names out.
   578  	return filepath.Base(os.Args[0])
   579  }
   580  
   581  func batchPerform(infos ResourceList, fn func(*resource.Info) error, errs chan<- error) {
   582  	var kind string
   583  	var wg sync.WaitGroup
   584  	for _, info := range infos {
   585  		currentKind := info.Object.GetObjectKind().GroupVersionKind().Kind
   586  		if kind != currentKind {
   587  			wg.Wait()
   588  			kind = currentKind
   589  		}
   590  		wg.Add(1)
   591  		go func(i *resource.Info) {
   592  			errs <- fn(i)
   593  			wg.Done()
   594  		}(info)
   595  	}
   596  }
   597  
   598  func createResource(info *resource.Info) error {
   599  	obj, err := resource.NewHelper(info.Client, info.Mapping).WithFieldManager(getManagedFieldsManager()).Create(info.Namespace, true, info.Object)
   600  	if err != nil {
   601  		return err
   602  	}
   603  	return info.Refresh(obj, true)
   604  }
   605  
   606  func deleteResource(info *resource.Info, policy metav1.DeletionPropagation) error {
   607  	opts := &metav1.DeleteOptions{PropagationPolicy: &policy}
   608  	_, err := resource.NewHelper(info.Client, info.Mapping).WithFieldManager(getManagedFieldsManager()).DeleteWithOptions(info.Namespace, info.Name, opts)
   609  	return err
   610  }
   611  
   612  func createPatch(target *resource.Info, current runtime.Object) ([]byte, types.PatchType, error) {
   613  	oldData, err := json.Marshal(current)
   614  	if err != nil {
   615  		return nil, types.StrategicMergePatchType, errors.Wrap(err, "serializing current configuration")
   616  	}
   617  	newData, err := json.Marshal(target.Object)
   618  	if err != nil {
   619  		return nil, types.StrategicMergePatchType, errors.Wrap(err, "serializing target configuration")
   620  	}
   621  
   622  	// Fetch the current object for the three way merge
   623  	helper := resource.NewHelper(target.Client, target.Mapping).WithFieldManager(getManagedFieldsManager())
   624  	currentObj, err := helper.Get(target.Namespace, target.Name)
   625  	if err != nil && !apierrors.IsNotFound(err) {
   626  		return nil, types.StrategicMergePatchType, errors.Wrapf(err, "unable to get data for current object %s/%s", target.Namespace, target.Name)
   627  	}
   628  
   629  	// Even if currentObj is nil (because it was not found), it will marshal just fine
   630  	currentData, err := json.Marshal(currentObj)
   631  	if err != nil {
   632  		return nil, types.StrategicMergePatchType, errors.Wrap(err, "serializing live configuration")
   633  	}
   634  
   635  	// Get a versioned object
   636  	versionedObject := AsVersioned(target)
   637  
   638  	// Unstructured objects, such as CRDs, may not have an not registered error
   639  	// returned from ConvertToVersion. Anything that's unstructured should
   640  	// use the jsonpatch.CreateMergePatch. Strategic Merge Patch is not supported
   641  	// on objects like CRDs.
   642  	_, isUnstructured := versionedObject.(runtime.Unstructured)
   643  
   644  	// On newer K8s versions, CRDs aren't unstructured but has this dedicated type
   645  	_, isCRD := versionedObject.(*apiextv1beta1.CustomResourceDefinition)
   646  
   647  	if isUnstructured || isCRD {
   648  		// fall back to generic JSON merge patch
   649  		patch, err := jsonpatch.CreateMergePatch(oldData, newData)
   650  		return patch, types.MergePatchType, err
   651  	}
   652  
   653  	patchMeta, err := strategicpatch.NewPatchMetaFromStruct(versionedObject)
   654  	if err != nil {
   655  		return nil, types.StrategicMergePatchType, errors.Wrap(err, "unable to create patch metadata from object")
   656  	}
   657  
   658  	patch, err := strategicpatch.CreateThreeWayMergePatch(oldData, newData, currentData, patchMeta, true)
   659  	return patch, types.StrategicMergePatchType, err
   660  }
   661  
   662  func updateResource(c *Client, target *resource.Info, currentObj runtime.Object, force bool) error {
   663  	var (
   664  		obj    runtime.Object
   665  		helper = resource.NewHelper(target.Client, target.Mapping).WithFieldManager(getManagedFieldsManager())
   666  		kind   = target.Mapping.GroupVersionKind.Kind
   667  	)
   668  
   669  	// if --force is applied, attempt to replace the existing resource with the new object.
   670  	if force {
   671  		var err error
   672  		obj, err = helper.Replace(target.Namespace, target.Name, true, target.Object)
   673  		if err != nil {
   674  			return errors.Wrap(err, "failed to replace object")
   675  		}
   676  		c.Log("Replaced %q with kind %s for kind %s", target.Name, currentObj.GetObjectKind().GroupVersionKind().Kind, kind)
   677  	} else {
   678  		patch, patchType, err := createPatch(target, currentObj)
   679  		if err != nil {
   680  			return errors.Wrap(err, "failed to create patch")
   681  		}
   682  
   683  		if patch == nil || string(patch) == "{}" {
   684  			c.Log("Looks like there are no changes for %s %q", kind, target.Name)
   685  			// This needs to happen to make sure that Helm has the latest info from the API
   686  			// Otherwise there will be no labels and other functions that use labels will panic
   687  			if err := target.Get(); err != nil {
   688  				return errors.Wrap(err, "failed to refresh resource information")
   689  			}
   690  			return nil
   691  		}
   692  		// send patch to server
   693  		c.Log("Patch %s %q in namespace %s", kind, target.Name, target.Namespace)
   694  		obj, err = helper.Patch(target.Namespace, target.Name, patchType, patch, nil)
   695  		if err != nil {
   696  			return errors.Wrapf(err, "cannot patch %q with kind %s", target.Name, kind)
   697  		}
   698  	}
   699  
   700  	target.Refresh(obj, true)
   701  	return nil
   702  }
   703  
   704  func (c *Client) watchUntilReady(timeout time.Duration, info *resource.Info) error {
   705  	kind := info.Mapping.GroupVersionKind.Kind
   706  	switch kind {
   707  	case "Job", "Pod":
   708  	default:
   709  		return nil
   710  	}
   711  
   712  	c.Log("Watching for changes to %s %s with timeout of %v", kind, info.Name, timeout)
   713  
   714  	// Use a selector on the name of the resource. This should be unique for the
   715  	// given version and kind
   716  	selector, err := fields.ParseSelector(fmt.Sprintf("metadata.name=%s", info.Name))
   717  	if err != nil {
   718  		return err
   719  	}
   720  	lw := cachetools.NewListWatchFromClient(info.Client, info.Mapping.Resource.Resource, info.Namespace, selector)
   721  
   722  	// What we watch for depends on the Kind.
   723  	// - For a Job, we watch for completion.
   724  	// - For all else, we watch until Ready.
   725  	// In the future, we might want to add some special logic for types
   726  	// like Ingress, Volume, etc.
   727  
   728  	ctx, cancel := watchtools.ContextWithOptionalTimeout(context.Background(), timeout)
   729  	defer cancel()
   730  	_, err = watchtools.UntilWithSync(ctx, lw, &unstructured.Unstructured{}, nil, func(e watch.Event) (bool, error) {
   731  		// Make sure the incoming object is versioned as we use unstructured
   732  		// objects when we build manifests
   733  		obj := convertWithMapper(e.Object, info.Mapping)
   734  		switch e.Type {
   735  		case watch.Added, watch.Modified:
   736  			// For things like a secret or a config map, this is the best indicator
   737  			// we get. We care mostly about jobs, where what we want to see is
   738  			// the status go into a good state. For other types, like ReplicaSet
   739  			// we don't really do anything to support these as hooks.
   740  			c.Log("Add/Modify event for %s: %v", info.Name, e.Type)
   741  			switch kind {
   742  			case "Job":
   743  				return c.waitForJob(obj, info.Name)
   744  			case "Pod":
   745  				return c.waitForPodSuccess(obj, info.Name)
   746  			}
   747  			return true, nil
   748  		case watch.Deleted:
   749  			c.Log("Deleted event for %s", info.Name)
   750  			return true, nil
   751  		case watch.Error:
   752  			// Handle error and return with an error.
   753  			c.Log("Error event for %s", info.Name)
   754  			return true, errors.Errorf("failed to deploy %s", info.Name)
   755  		default:
   756  			return false, nil
   757  		}
   758  	})
   759  	return err
   760  }
   761  
   762  // waitForJob is a helper that waits for a job to complete.
   763  //
   764  // This operates on an event returned from a watcher.
   765  func (c *Client) waitForJob(obj runtime.Object, name string) (bool, error) {
   766  	o, ok := obj.(*batch.Job)
   767  	if !ok {
   768  		return true, errors.Errorf("expected %s to be a *batch.Job, got %T", name, obj)
   769  	}
   770  
   771  	for _, c := range o.Status.Conditions {
   772  		if c.Type == batch.JobComplete && c.Status == "True" {
   773  			return true, nil
   774  		} else if c.Type == batch.JobFailed && c.Status == "True" {
   775  			return true, errors.Errorf("job %s failed: %s", name, c.Reason)
   776  		}
   777  	}
   778  
   779  	c.Log("%s: Jobs active: %d, jobs failed: %d, jobs succeeded: %d", name, o.Status.Active, o.Status.Failed, o.Status.Succeeded)
   780  	return false, nil
   781  }
   782  
   783  // waitForPodSuccess is a helper that waits for a pod to complete.
   784  //
   785  // This operates on an event returned from a watcher.
   786  func (c *Client) waitForPodSuccess(obj runtime.Object, name string) (bool, error) {
   787  	o, ok := obj.(*v1.Pod)
   788  	if !ok {
   789  		return true, errors.Errorf("expected %s to be a *v1.Pod, got %T", name, obj)
   790  	}
   791  
   792  	switch o.Status.Phase {
   793  	case v1.PodSucceeded:
   794  		c.Log("Pod %s succeeded", o.Name)
   795  		return true, nil
   796  	case v1.PodFailed:
   797  		return true, errors.Errorf("pod %s failed", o.Name)
   798  	case v1.PodPending:
   799  		c.Log("Pod %s pending", o.Name)
   800  	case v1.PodRunning:
   801  		c.Log("Pod %s running", o.Name)
   802  	}
   803  
   804  	return false, nil
   805  }
   806  
   807  // scrubValidationError removes kubectl info from the message.
   808  func scrubValidationError(err error) error {
   809  	if err == nil {
   810  		return nil
   811  	}
   812  	const stopValidateMessage = "if you choose to ignore these errors, turn validation off with --validate=false"
   813  
   814  	if strings.Contains(err.Error(), stopValidateMessage) {
   815  		return errors.New(strings.ReplaceAll(err.Error(), "; "+stopValidateMessage, ""))
   816  	}
   817  	return err
   818  }
   819  
   820  // WaitAndGetCompletedPodPhase waits up to a timeout until a pod enters a completed phase
   821  // and returns said phase (PodSucceeded or PodFailed qualify).
   822  func (c *Client) WaitAndGetCompletedPodPhase(name string, timeout time.Duration) (v1.PodPhase, error) {
   823  	client, err := c.getKubeClient()
   824  	if err != nil {
   825  		return v1.PodUnknown, err
   826  	}
   827  	to := int64(timeout)
   828  	watcher, err := client.CoreV1().Pods(c.namespace()).Watch(context.Background(), metav1.ListOptions{
   829  		FieldSelector:  fmt.Sprintf("metadata.name=%s", name),
   830  		TimeoutSeconds: &to,
   831  	})
   832  	if err != nil {
   833  		return v1.PodUnknown, err
   834  	}
   835  
   836  	for event := range watcher.ResultChan() {
   837  		p, ok := event.Object.(*v1.Pod)
   838  		if !ok {
   839  			return v1.PodUnknown, fmt.Errorf("%s not a pod", name)
   840  		}
   841  		switch p.Status.Phase {
   842  		case v1.PodFailed:
   843  			return v1.PodFailed, nil
   844  		case v1.PodSucceeded:
   845  			return v1.PodSucceeded, nil
   846  		}
   847  	}
   848  
   849  	return v1.PodUnknown, err
   850  }
   851  

View as plain text