...

Source file src/sigs.k8s.io/cli-utils/pkg/apply/mutator/apply_time_mutator.go

Documentation: sigs.k8s.io/cli-utils/pkg/apply/mutator

     1  // Copyright 2021 The Kubernetes Authors.
     2  // SPDX-License-Identifier: Apache-2.0
     3  
     4  package mutator
     5  
     6  import (
     7  	"context"
     8  	"encoding/json"
     9  	"errors"
    10  	"fmt"
    11  	"strings"
    12  
    13  	apierrors "k8s.io/apimachinery/pkg/api/errors"
    14  	"k8s.io/apimachinery/pkg/api/meta"
    15  	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
    16  	"k8s.io/apimachinery/pkg/apis/meta/v1/unstructured"
    17  	"k8s.io/client-go/dynamic"
    18  	"k8s.io/klog/v2"
    19  	"sigs.k8s.io/cli-utils/pkg/apply/cache"
    20  	"sigs.k8s.io/cli-utils/pkg/jsonpath"
    21  	"sigs.k8s.io/cli-utils/pkg/kstatus/status"
    22  	"sigs.k8s.io/cli-utils/pkg/object"
    23  	"sigs.k8s.io/cli-utils/pkg/object/mutation"
    24  )
    25  
    26  // ApplyTimeMutator mutates an object by injecting values specified by the
    27  // apply-time-mutation annotation.
    28  // The optional ResourceCache will be used to speed up source object lookups,
    29  // if specified.
    30  // Implements the Mutator interface
    31  type ApplyTimeMutator struct {
    32  	Client        dynamic.Interface
    33  	Mapper        meta.RESTMapper
    34  	ResourceCache cache.ResourceCache
    35  }
    36  
    37  // Name returns a mutator identifier for logging.
    38  func (atm *ApplyTimeMutator) Name() string {
    39  	return "ApplyTimeMutator"
    40  }
    41  
    42  // Mutate parses the apply-time-mutation annotation and loops through the
    43  // substitutions, applying each of them to the supplied target object.
    44  // Returns true with a reason, if mutation was performed.
    45  func (atm *ApplyTimeMutator) Mutate(ctx context.Context, obj *unstructured.Unstructured) (bool, string, error) {
    46  	mutated := false
    47  	reason := ""
    48  
    49  	targetRef := mutation.ResourceReferenceFromUnstructured(obj)
    50  
    51  	if !mutation.HasAnnotation(obj) {
    52  		return mutated, reason, nil
    53  	}
    54  
    55  	subs, err := mutation.ReadAnnotation(obj)
    56  	if err != nil {
    57  		return mutated, reason, fmt.Errorf("failed to read annotation in object (%s): %w", targetRef, err)
    58  	}
    59  
    60  	klog.V(4).Infof("target object: %s", targetRef)
    61  	klog.V(7).Infof("target object YAML:\n%s", object.YamlStringer{O: obj})
    62  
    63  	// validate no self-references
    64  	// Early validation to avoid GETs, but won't catch sources with implicit namespace.
    65  	for _, sub := range subs {
    66  		if targetRef.Equal(sub.SourceRef) {
    67  			return mutated, reason, fmt.Errorf("invalid self-reference (%s)", sub.SourceRef)
    68  		}
    69  	}
    70  
    71  	for _, sub := range subs {
    72  		sourceRef := sub.SourceRef
    73  
    74  		// lookup REST mapping
    75  		sourceMapping, err := atm.getMapping(sourceRef)
    76  		if err != nil {
    77  			return mutated, reason, fmt.Errorf("failed to identify source object mapping (%s): %w", sourceRef, err)
    78  		}
    79  
    80  		// Default source namespace to target namesapce, if namespace-scoped
    81  		if sourceRef.Namespace == "" && sourceMapping.Scope.Name() == meta.RESTScopeNameNamespace {
    82  			sourceRef.Namespace = targetRef.Namespace
    83  		}
    84  
    85  		// validate no self-references
    86  		// Re-check to catch sources with implicit namespace.
    87  		if targetRef.Equal(sub.SourceRef) {
    88  			return mutated, reason, fmt.Errorf("invalid self-reference (%s)", sub.SourceRef)
    89  		}
    90  
    91  		// lookup source object from cache or cluster
    92  		sourceObj, err := atm.getObject(ctx, sourceMapping, sourceRef)
    93  		if err != nil {
    94  			return mutated, reason, fmt.Errorf("failed to get source object (%s): %w", sourceRef, err)
    95  		}
    96  
    97  		klog.V(4).Infof("source object: %s", sourceRef)
    98  		klog.V(7).Infof("source object YAML:\n%s", object.YamlStringer{O: sourceObj})
    99  
   100  		// lookup target field in target object
   101  		targetValue, _, err := readFieldValue(obj, sub.TargetPath)
   102  		if err != nil {
   103  			return mutated, reason, fmt.Errorf("failed to read field (%s) from target object (%s): %w", sub.TargetPath, targetRef, err)
   104  		}
   105  
   106  		// lookup source field in source object
   107  		sourceValue, found, err := readFieldValue(sourceObj, sub.SourcePath)
   108  		if err != nil {
   109  			return mutated, reason, fmt.Errorf("failed to read field (%s) from source object (%s): %w", sub.SourcePath, sourceRef, err)
   110  		}
   111  		if !found {
   112  			return mutated, reason, fmt.Errorf("source field (%s) not present in source object (%s)", sub.SourcePath, sourceRef)
   113  		}
   114  
   115  		var newValue interface{}
   116  		if sub.Token == "" {
   117  			// token not specified, replace the entire target value with the source value
   118  			newValue = sourceValue
   119  		} else {
   120  			// token specified, substitute token for source field value in target field value
   121  			targetValueString, ok := targetValue.(string)
   122  			if !ok {
   123  				return mutated, reason, fmt.Errorf("token is specified, but target field value is %T, expected string", targetValue)
   124  			}
   125  
   126  			sourceValueString, err := valueToString(sourceValue)
   127  			if err != nil {
   128  				return mutated, reason, fmt.Errorf("failed to stringify source field value (%s): %w", targetRef, err)
   129  			}
   130  
   131  			// Substitute token for source field value, if present.
   132  			// If not present, do nothing. This is common on updates.
   133  			newValue = strings.ReplaceAll(targetValueString, sub.Token, sourceValueString)
   134  		}
   135  
   136  		klog.V(5).Infof("substitution: targetRef=(%s), sourceRef=(%s): sourceValue=(%v), token=(%s), oldTargetValue=(%v), newTargetValue=(%v)",
   137  			targetRef, sourceRef, sourceValue, sub.Token, targetValue, newValue)
   138  
   139  		// update target field in target object
   140  		err = writeFieldValue(obj, sub.TargetPath, newValue)
   141  		if err != nil {
   142  			return mutated, reason, fmt.Errorf("failed to set field in target object (%s): %w", targetRef, err)
   143  		}
   144  
   145  		mutated = true
   146  		reason = fmt.Sprintf("object contained annotation: %s", mutation.Annotation)
   147  	}
   148  
   149  	if mutated {
   150  		klog.V(4).Infof("mutated target object: %s", targetRef)
   151  		klog.V(7).Infof("mutated target object YAML:\n%s", object.YamlStringer{O: obj})
   152  	}
   153  
   154  	return mutated, reason, nil
   155  }
   156  
   157  func (atm *ApplyTimeMutator) getMapping(ref mutation.ResourceReference) (*meta.RESTMapping, error) {
   158  	// lookup object using group api version, if specified
   159  	sourceGvk := ref.GroupVersionKind()
   160  	var mapping *meta.RESTMapping
   161  	var err error
   162  	if sourceGvk.Version != "" {
   163  		mapping, err = atm.Mapper.RESTMapping(sourceGvk.GroupKind(), sourceGvk.Version)
   164  	} else {
   165  		mapping, err = atm.Mapper.RESTMapping(sourceGvk.GroupKind())
   166  	}
   167  	if err != nil {
   168  		return nil, err
   169  	}
   170  	return mapping, nil
   171  }
   172  
   173  // getObject returns a cached object, if cached and cache exists, otherwise
   174  // the object is retrieved from the cluster.
   175  func (atm *ApplyTimeMutator) getObject(ctx context.Context, mapping *meta.RESTMapping, ref mutation.ResourceReference) (*unstructured.Unstructured, error) {
   176  	// validate source object
   177  	if ref.Name == "" {
   178  		return nil, fmt.Errorf("invalid source object: empty name")
   179  	}
   180  	if ref.Kind == "" {
   181  		return nil, fmt.Errorf("invalid source object: empty kind")
   182  	}
   183  	id := ref.ToObjMetadata()
   184  
   185  	// get object from cache
   186  	if atm.ResourceCache != nil {
   187  		result := atm.ResourceCache.Get(id)
   188  		// Use the cached version, if current/reconciled.
   189  		// Otherwise, get it from the cluster.
   190  		if result.Resource != nil && result.Status == status.CurrentStatus {
   191  			return result.Resource, nil
   192  		}
   193  	}
   194  
   195  	// get object from cluster
   196  	namespacedClient := atm.Client.Resource(mapping.Resource).Namespace(ref.Namespace)
   197  	obj, err := namespacedClient.Get(ctx, ref.Name, metav1.GetOptions{})
   198  	if err != nil && !apierrors.IsNotFound(err) {
   199  		// Skip NotFound so the cache gets updated.
   200  		return nil, fmt.Errorf("failed to retrieve object from cluster: %w", err)
   201  	}
   202  
   203  	// add object to cache
   204  	if atm.ResourceCache != nil {
   205  		// If it's not cached or not current, update the cache.
   206  		// This will add external objects to the cache,
   207  		// but the user won't get status events for them.
   208  		atm.ResourceCache.Put(id, computeStatus(obj))
   209  	}
   210  
   211  	if err != nil {
   212  		// NotFound
   213  		return nil, fmt.Errorf("object not found: %w", err)
   214  	}
   215  
   216  	return obj, nil
   217  }
   218  
   219  // computeStatus compares the spec to the status and returns the result.
   220  func computeStatus(obj *unstructured.Unstructured) cache.ResourceStatus {
   221  	if obj == nil {
   222  		return cache.ResourceStatus{
   223  			Resource:      obj,
   224  			Status:        status.NotFoundStatus,
   225  			StatusMessage: "Object not found",
   226  		}
   227  	}
   228  	result, err := status.Compute(obj)
   229  	if err != nil {
   230  		if klog.V(3).Enabled() {
   231  			ref := mutation.ResourceReferenceFromUnstructured(obj)
   232  			klog.Info("failed to compute object status (%s): %d", ref, err)
   233  		}
   234  		return cache.ResourceStatus{
   235  			Resource: obj,
   236  			Status:   status.UnknownStatus,
   237  			//StatusMessage: fmt.Sprintf("Failed to compute status: %s", err),
   238  		}
   239  	}
   240  	return cache.ResourceStatus{
   241  		Resource:      obj,
   242  		Status:        result.Status,
   243  		StatusMessage: result.Message,
   244  	}
   245  }
   246  
   247  func readFieldValue(obj *unstructured.Unstructured, path string) (interface{}, bool, error) {
   248  	if path == "" {
   249  		return nil, false, errors.New("empty path expression")
   250  	}
   251  
   252  	values, err := jsonpath.Get(obj.Object, path)
   253  	if err != nil {
   254  		return nil, false, err
   255  	}
   256  	if len(values) != 1 {
   257  		return nil, false, fmt.Errorf("expected 1 match, but found %d)", len(values))
   258  	}
   259  	return values[0], true, nil
   260  }
   261  
   262  func writeFieldValue(obj *unstructured.Unstructured, path string, value interface{}) error {
   263  	if path == "" {
   264  		return errors.New("empty path expression")
   265  	}
   266  
   267  	found, err := jsonpath.Set(obj.Object, path, value)
   268  	if err != nil {
   269  		return err
   270  	}
   271  	if found != 1 {
   272  		return fmt.Errorf("expected 1 match, but found %d)", found)
   273  	}
   274  	return nil
   275  }
   276  
   277  // valueToString converts an interface{} to a string, formatting as json for
   278  // maps, lists. Designed to handle yaml/json/krm primitives.
   279  func valueToString(value interface{}) (string, error) {
   280  	var valueString string
   281  	switch valueTyped := value.(type) {
   282  	case string:
   283  		valueString = valueTyped
   284  	case int, int32, int64, float32, float64, bool:
   285  		valueString = fmt.Sprintf("%v", valueTyped)
   286  	default:
   287  		jsonBytes, err := json.Marshal(valueTyped)
   288  		if err != nil {
   289  			return "", fmt.Errorf("failed to marshal value to json: %#v", value)
   290  		}
   291  		valueString = string(jsonBytes)
   292  	}
   293  	return valueString, nil
   294  }
   295  

View as plain text