...

Source file src/github.com/datawire/ambassador/v2/cmd/reproducer/create.go

Documentation: github.com/datawire/ambassador/v2/cmd/reproducer

     1  package reproducer
     2  
     3  import (
     4  	"context"
     5  	"encoding/json"
     6  	"fmt"
     7  	"path/filepath"
     8  	"reflect"
     9  	"sort"
    10  	"strings"
    11  	"time"
    12  
    13  	"github.com/pkg/errors"
    14  	"github.com/spf13/cobra"
    15  
    16  	"github.com/datawire/ambassador/v2/pkg/kates"
    17  	"github.com/datawire/dlib/dlog"
    18  )
    19  
    20  var createCmd = &cobra.Command{
    21  	Use:   "create ( <snapshot> | <extraction> | <archive> )",
    22  	Short: "create produces a working set of manifests based on snapshots and/or extractions produce by the extract subcommand",
    23  	Long: `The create subcommand is designed for creating high fidelity reproductions of a source cluster using edge-stack. All of the ambassador inputs are recreated exactly as they are in the source cluster. All the services in the source cluster are also recreated, but they are transformed to point to a single set of pods with a "service: reproducer" label. This allows for a high fidelity working reproduction of the source cluster without requiring access to any of the propriety deployments in the source cluster.
    24  
    25  The output of the create command can be passed directly to kubectl, e.g.:
    26  
    27      reproducer create sanitized.tgz | kubectl apply -f -
    28  
    29  You can also save the output and hand edit it should you need to tweak some of the details:
    30  
    31      reproducer create sanitized.tgz > repro.yaml
    32      vi repro.yaml
    33      kubectl apply -f repro.yaml
    34  `,
    35  	Args: cobra.ExactArgs(1),
    36  	RunE: create,
    37  }
    38  
    39  func create(cmd *cobra.Command, args []string) error {
    40  	ctx := cmd.Context()
    41  	filename := args[0]
    42  
    43  	extensions := map[string]bool{
    44  		".yml":  true,
    45  		".yaml": true,
    46  		".json": true,
    47  	}
    48  
    49  	repro := NewRepro()
    50  
    51  	err := search(filename, func(path, contentType, encoding string, content []byte) error {
    52  		base := filepath.Base(path)
    53  		ext := filepath.Ext(base)
    54  		name := base[:len(base)-len(ext)]
    55  
    56  		if !extensions[ext] {
    57  			dlog.Printf(ctx, "skipping %s", path)
    58  			return nil
    59  		}
    60  
    61  		if !(name == "snapshot" || name == "manifests") {
    62  			dlog.Printf(ctx, "skipping %s", path)
    63  			return nil
    64  		}
    65  
    66  		dlog.Printf(ctx, "found resources from %s", path)
    67  
    68  		switch name {
    69  		case "snapshot":
    70  			var snapshot struct {
    71  				Kubernetes map[string][]*kates.Unstructured
    72  			}
    73  			err := json.Unmarshal(content, &snapshot)
    74  			if err != nil {
    75  				return errors.Wrapf(err, "decoding snapshot at %s", path)
    76  			}
    77  
    78  			for _, values := range snapshot.Kubernetes {
    79  				for _, resource := range values {
    80  					err := repro.Add(resource)
    81  					if err != nil {
    82  						return errors.Wrapf(err, "adding resource")
    83  					}
    84  				}
    85  			}
    86  		case "manifests":
    87  			resources, err := kates.ParseManifests(string(content))
    88  			if err != nil {
    89  				return errors.Wrapf(err, "decoding manifests at %s", path)
    90  			}
    91  
    92  			for _, resource := range resources {
    93  				err := repro.Add(resource)
    94  				if err != nil {
    95  					return errors.Wrapf(err, "adding resource")
    96  				}
    97  			}
    98  		}
    99  
   100  		return breakSearch
   101  	})
   102  
   103  	if err == nil {
   104  		return errors.Errorf("unable to find suitable snapshot in %s", filename)
   105  	}
   106  
   107  	if err != breakSearch {
   108  		return err
   109  	}
   110  
   111  	// Process all the resources we found.
   112  	err = repro.Process(ctx)
   113  	if err != nil {
   114  		return err
   115  	}
   116  
   117  	// Marshal all the transformed resources.
   118  	marshalled, err := repro.Marshal()
   119  	if err != nil {
   120  		return err
   121  	}
   122  
   123  	fmt.Print(string(marshalled))
   124  
   125  	return nil
   126  }
   127  
// Repro collects resources taken from a source cluster (via Add), transforms them (via
// Process), and serializes the transformed set (via Marshal).
type Repro struct {
	// Resources holds the input resources supplied through Add, keyed by Kind. Process
	// removes each kind from this map as it handles it.
	Resources  map[string][]*kates.Unstructured
	// Namespaces is the set of namespace names recorded by process. Process drops the
	// names for which a Namespace resource was itself included and creates Namespace
	// objects for the ones that remain.
	Namespaces map[string]bool
	// Ports is the set of target ports (in string form) of the Services whose selector
	// process rewrites to "service: reproducer".
	Ports      map[string]bool
	// Processed is the ordered list of output resources built by Process and
	// serialized by Marshal.
	Processed  []*kates.Unstructured
}
   134  
   135  func NewRepro() *Repro {
   136  	return &Repro{
   137  		Resources:  map[string][]*kates.Unstructured{},
   138  		Namespaces: map[string]bool{},
   139  		Ports:      map[string]bool{},
   140  	}
   141  }
   142  
   143  // Add an input resource from the source cluster.
   144  func (r *Repro) Add(resource kates.Object) error {
   145  	un, err := kates.NewUnstructuredFromObject(resource)
   146  	if err != nil {
   147  		return err
   148  	}
   149  	gvk := resource.GetObjectKind().GroupVersionKind()
   150  	r.Resources[gvk.Kind] = append(r.Resources[gvk.Kind], un)
   151  	return nil
   152  
   153  }
   154  
   155  func (r *Repro) Process(ctx context.Context) error {
   156  	// Process resources in order.
   157  	for _, key := range r.OrderedKinds() {
   158  		values, ok := r.Resources[key]
   159  		if ok {
   160  			delete(r.Resources, key)
   161  			for _, resource := range values {
   162  				p := r.callProcess(ctx, resource)
   163  				if p != nil {
   164  					r.Processed = append(r.Processed, p)
   165  				}
   166  			}
   167  		}
   168  	}
   169  
   170  	// Remove any included namespaces
   171  	for _, p := range r.Processed {
   172  		if p.GetObjectKind().GroupVersionKind().Kind == "Namespace" {
   173  			delete(r.Namespaces, p.GetName())
   174  		}
   175  	}
   176  
   177  	// Auto create any missing namespaces and prepend so they are defined before being used.
   178  	ns := []*kates.Unstructured{}
   179  	for _, k := range sortedKeys(r.Namespaces) {
   180  		un, err := kates.NewUnstructuredFromObject(&kates.Namespace{
   181  			TypeMeta:   kates.TypeMeta{APIVersion: "v1", Kind: "Namespace"},
   182  			ObjectMeta: kates.ObjectMeta{Name: k},
   183  		})
   184  		if err != nil {
   185  			return errors.Wrapf(err, "error creating namespace %s", k)
   186  		}
   187  		ns = append(ns, un)
   188  	}
   189  
   190  	r.Processed = append(ns, r.Processed...)
   191  
   192  	return nil
   193  }
   194  
   195  // OrderedKinds returns all the k8s kinds in the proper order to avoid dangling references.
   196  func (r *Repro) OrderedKinds() []string {
   197  	return append([]string{
   198  		"CustomResourceDefinition",
   199  		"Namespace",
   200  		"ServiceAccount",
   201  		"ClusterRole",
   202  		"ClusterRoleBinding",
   203  		"Role",
   204  		"RoleBinding",
   205  		"Secret",
   206  	},
   207  		sortedKeys(r.Resources)...)
   208  }
   209  
   210  func (r *Repro) callProcess(ctx context.Context, resource *kates.Unstructured) *kates.Unstructured {
   211  	if len(resource.GetOwnerReferences()) > 0 {
   212  		return nil
   213  	}
   214  	if resource.GetNamespace() == "kube-system" {
   215  		return nil
   216  	}
   217  
   218  	gvk := resource.GetObjectKind().GroupVersionKind()
   219  	switch gvk.Kind {
   220  	case "APIService":
   221  		return nil
   222  	case "ComponentStatus":
   223  		return nil
   224  	case "EndpointSlice":
   225  		return nil
   226  	case "Endpoints":
   227  		return nil
   228  	case "Event":
   229  		return nil
   230  	case "Lease":
   231  		return nil
   232  	case "Node":
   233  		return nil
   234  	case "NodeMetrics":
   235  		return nil
   236  	case "PodMetrics":
   237  		return nil
   238  	case "StorageClass":
   239  		return nil
   240  	case "PriorityClass":
   241  		return nil
   242  	}
   243  
   244  	obj, err := kates.NewObjectFromUnstructured(resource)
   245  	if err != nil {
   246  		dlog.Printf(ctx, "error processing object: %+v", err)
   247  		return nil
   248  	}
   249  
   250  	obj = r.process(obj)
   251  
   252  	// convert back to unstructured so we serialize prettier, e.g. no creationTimestamp: null
   253  	result, err := kates.NewUnstructuredFromObject(obj)
   254  	if err != nil {
   255  		dlog.Printf(ctx, "error making unstructured from object: %+v", err)
   256  		return nil
   257  	}
   258  
   259  	return clean(result)
   260  }
   261  
   262  func (r *Repro) process(object kates.Object) kates.Object {
   263  	r.Namespaces[object.GetNamespace()] = true
   264  
   265  	rbac := false
   266  	switch obj := object.(type) {
   267  	case *kates.Service:
   268  		obj.Spec.ClusterIP = ""
   269  		if !isAmbassadorResource(object) {
   270  			obj.Spec.Selector = map[string]string{
   271  				"service": "reproducer",
   272  			}
   273  			for _, port := range obj.Spec.Ports {
   274  				r.Ports[port.TargetPort.String()] = true
   275  			}
   276  		}
   277  	case *kates.ClusterRole:
   278  		rbac = true
   279  	case *kates.ClusterRoleBinding:
   280  		rbac = true
   281  	case *kates.Role:
   282  		rbac = true
   283  	case *kates.RoleBinding:
   284  		rbac = true
   285  	case *kates.ServiceAccount:
   286  		rbac = true
   287  		if obj.GetName() == "default" {
   288  			return nil
   289  		}
   290  	}
   291  
   292  	if rbac && strings.Contains(object.GetName(), "system:") {
   293  		return nil
   294  	}
   295  
   296  	return object
   297  }
   298  
   299  const lastApplied = "kubectl.kubernetes.io/last-applied-configuration"
   300  const bootstrappingLabel = "kubernetes.io/bootstrapping"
   301  
   302  // Clean does generic cleanup of resources from the source cluster. Kubectl and/or the API server
   303  // will add a bunch of annotations about last-applied-configurations and managed fields and what
   304  // not, and these annotations will make kubectl and/or the API server barf if present on a resource
   305  // supplied to `kubectl apply`.
   306  func clean(resource *kates.Unstructured) *kates.Unstructured {
   307  	if resource == nil {
   308  		return nil
   309  	}
   310  
   311  	ann := resource.GetAnnotations()
   312  	if ann == nil {
   313  		ann = map[string]string{}
   314  	}
   315  	delete(ann, lastApplied)
   316  
   317  	labels := resource.GetLabels()
   318  	_, ok := labels[bootstrappingLabel]
   319  	if ok {
   320  		return nil
   321  	}
   322  
   323  	if len(ann) > 0 {
   324  		resource.SetAnnotations(ann)
   325  	} else {
   326  		resource.SetAnnotations(nil)
   327  	}
   328  	resource.SetManagedFields(nil)
   329  	resource.SetCreationTimestamp(kates.Time{Time: time.Time{}})
   330  	resource.SetUID("")
   331  	resource.SetResourceVersion("")
   332  	resource.SetSelfLink("")
   333  	resource.SetGeneration(0)
   334  	delete(resource.Object, "status")
   335  	return resource
   336  }
   337  
// Marshal serializes the resources in r.Processed by passing them to marshalManifests and
// returns its result unchanged.
func (r *Repro) Marshal() ([]byte, error) {
	return marshalManifests(r.Processed)
}
   341  
   342  func sortedKeys(m interface{}) (result []string) {
   343  	mval := reflect.ValueOf(m)
   344  	for _, v := range mval.MapKeys() {
   345  		result = append(result, v.Interface().(string))
   346  	}
   347  	sort.Strings(result)
   348  	return
   349  }
   350  

View as plain text