...

Source file src/github.com/emissary-ingress/emissary/v3/cmd/reproducer/extract.go

Documentation: github.com/emissary-ingress/emissary/v3/cmd/reproducer

     1  package reproducer
     2  
     3  import (
     4  	"archive/tar"
     5  	"bytes"
     6  	"compress/gzip"
     7  	"context"
     8  	"encoding/json"
     9  	"fmt"
    10  	"io"
    11  	"os"
    12  	"sort"
    13  	"strings"
    14  	"sync"
    15  	"time"
    16  
    17  	"github.com/pkg/errors"
    18  	"github.com/spf13/cobra"
    19  
    20  	"github.com/datawire/dlib/dexec"
    21  	"github.com/datawire/dlib/dlog"
    22  	"github.com/emissary-ingress/emissary/v3/pkg/kates"
    23  )
    24  
    25  var extractCmd = &cobra.Command{
    26  	Use:   "extract [<out-file>]",
    27  	Short: "extract a redacted set of Ambassador Edge Stack inputs/configuration and logs/debug",
    28  	Long: `The extract subcommand is inteded to help extract as much info as possible from a source cluster to aid in creation of a reproducer. This source info is redacted and then bundled up in a single archive for ease of uploading.
    29  
    30  The extract subcommand is designed to be run from both outside the cluster and from in the ambassador pod itself. In each case it will capture as much as it can, however it is preferrable to run it from outside the cluster as it will likely have more expansive rbac privileges and therefore be able to capture more relevant details.
    31  
    32  Currently the extract command when run with sufficient rbac privileges captures:
    33  
    34    - The previous and current logs for all ambassador pods.
    35    - The output of grab-snapshots for all ambassador pods.
    36    - Additional resources not included in the snapshot.
    37      + All apro resources.
    38      + All pod info/states.
    39      + The cluster Event log.
    40    - The environment variables for the ambassador pods (with AUTH and PASSWORDs redacted).
    41  `,
    42  	Args: cobra.RangeArgs(0, 1),
    43  	RunE: extract,
    44  }
    45  
    46  func extract(cmd *cobra.Command, args []string) error {
    47  	var filename string
    48  	if len(args) > 0 {
    49  		filename = args[0]
    50  	} else {
    51  		filename = fmt.Sprintf("extraction-%s.tgz", time.Now().Format(time.RFC3339))
    52  	}
    53  
    54  	ctx := cmd.Context()
    55  	cli, err := kates.NewClient(kates.ClientConfig{})
    56  	if err != nil {
    57  		return errors.Wrapf(err, "initializing kubernetes client")
    58  	}
    59  
    60  	ex := NewExtraction(cli)
    61  
    62  	// Find interesting pods.
    63  	pods := ex.ListAmbassadorPods(ctx)
    64  
    65  	// Kick off async log capture for those pods.
    66  	podLogsFunc := ex.CaptureLogs(ctx, pods)
    67  
    68  	// Capture snapshots from pods
    69  	err = ex.CaptureRemoteSnapshots(ctx, pods)
    70  	if err != nil {
    71  		return err
    72  	}
    73  
    74  	// Capture interesting resources.
    75  	err = ex.CaptureResources(ctx)
    76  	if err != nil {
    77  		return err
    78  	}
    79  
    80  	// Capture the environment if we are inside the cluster.
    81  	if kates.InCluster() {
    82  		ex.CaptureEnviron(ctx)
    83  		ex.CaptureSnapshot(ctx)
    84  	}
    85  
    86  	// Save all the results in a tarball.
    87  	return ex.WriteArchive(ctx, filename, podLogsFunc())
    88  }
    89  
    90  type PodLogs = map[string][]kates.LogEvent
    91  
    92  type Extraction struct {
    93  	client        *kates.Client
    94  	ExtractionLog []*LogEntry           // A log of the extraction process itsef.
    95  	Snapshots     map[string][]byte     // Snapshots from all pods in the cluster.
    96  	Resources     []*kates.Unstructured // Interesting resources in the cluster that may not be included in snapshots.
    97  	Environ       map[string]string     // Capture the environment if we are invoked in the cluster.
    98  }
    99  
   100  type LogEntry struct {
   101  	Timestamp time.Time `json:"timestamp"`
   102  	Message   string    `json:"message"`
   103  	Error     error     `json:"error,omitempty"`
   104  }
   105  
   106  func NewExtraction(client *kates.Client) *Extraction {
   107  	return &Extraction{client: client, Snapshots: map[string][]byte{}}
   108  }
   109  
   110  func (ex *Extraction) add(entry *LogEntry) {
   111  	ex.ExtractionLog = append(ex.ExtractionLog, entry)
   112  }
   113  
   114  func (ex *Extraction) Printf(ctx context.Context, format string, args ...interface{}) {
   115  	fmt.Fprintf(os.Stderr, "%s\n", fmt.Sprintf(format, args...))
   116  	ex.add(&LogEntry{Timestamp: time.Now(), Message: fmt.Sprintf(format, args...)})
   117  }
   118  
   119  func (ex *Extraction) Warnf(ctx context.Context, err error, format string, args ...interface{}) {
   120  	fmt.Fprintf(os.Stderr, "%s: %+v\n", fmt.Sprintf(format, args...), err)
   121  	ex.add(&LogEntry{Timestamp: time.Now(), Message: fmt.Sprintf(format, args...), Error: err})
   122  }
   123  
   124  // ListAmbassadorPods will search the entire cluster for ambassador pods resulting from either a
   125  // helm based or manifest based install. It does this by using the service=ambassador labeling used
   126  // by the manifests and the product=aes labeling used by the helm charts. This may get more pods
   127  // than just edge-stack, e.g. it may pull in the redis logs since they have the product=aes label,
   128  // but that is ok we would rather grab more debugging info than less.
   129  func (ex *Extraction) ListAmbassadorPods(ctx context.Context) []*kates.Pod {
   130  	var result []*kates.Pod
   131  	for _, sel := range []string{"service=ambassador", "product=aes"} {
   132  		var pods []*kates.Pod
   133  		err := ex.client.List(ctx, kates.Query{Kind: "Pod", Namespace: kates.NamespaceAll, LabelSelector: sel}, &pods)
   134  		if err != nil {
   135  			ex.Warnf(ctx, err, "error listing pods, no logs will be available")
   136  			continue
   137  		}
   138  		result = append(result, pods...)
   139  	}
   140  
   141  	var podNames []string
   142  	for _, p := range result {
   143  		podNames = append(podNames, QName(p))
   144  	}
   145  	sort.Strings(podNames)
   146  	if len(result) > 0 {
   147  		ex.Printf(ctx, "found ambassador pods: %s", strings.Join(podNames, ", "))
   148  	} else {
   149  		ex.Printf(ctx, "unable to find ambassador pods")
   150  	}
   151  	return result
   152  }
   153  
   154  // CaptureLogs will capture the current and previous logs from all the listed pods. It operates
   155  // asynchronously and returns a function that can be used to access the final result (i.e. a poor
   156  // mans future).
   157  func (ex *Extraction) CaptureLogs(ctx context.Context, pods []*kates.Pod) func() PodLogs {
   158  	previousEvents := make(chan kates.LogEvent)
   159  	currentEvents := make(chan kates.LogEvent)
   160  	wg := sync.WaitGroup{}
   161  	byID := map[string]string{}
   162  	for _, pod := range pods {
   163  		byID[string(pod.GetUID())] = QName(pod)
   164  		err := ex.client.PodLogs(ctx, pod, &kates.PodLogOptions{Previous: true}, previousEvents)
   165  		if err != nil {
   166  			ex.Warnf(ctx, err, "error listing previous logs for pod %s in namespaces %s", pod.Name, pod.Namespace)
   167  		} else {
   168  			wg.Add(1)
   169  		}
   170  		err = ex.client.PodLogs(ctx, pod, &kates.PodLogOptions{}, currentEvents)
   171  		if err != nil {
   172  			ex.Warnf(ctx, err, "error listing current logs for pod %s in namespaces %s", pod.Name, pod.Namespace)
   173  		} else {
   174  			wg.Add(1)
   175  		}
   176  	}
   177  	podLogs := PodLogs{}
   178  	go func() {
   179  		for {
   180  			var ev kates.LogEvent
   181  			var name string
   182  			select {
   183  			case ev = <-previousEvents:
   184  				name = fmt.Sprintf("%s:previous", byID[ev.PodID])
   185  			case ev = <-currentEvents:
   186  				name = byID[ev.PodID]
   187  			}
   188  			podLogs[name] = append(podLogs[name], ev)
   189  			if ev.Closed {
   190  				wg.Done()
   191  			}
   192  		}
   193  	}()
   194  
   195  	return func() PodLogs {
   196  		wg.Wait()
   197  		return podLogs
   198  	}
   199  }
   200  
   201  // CaptureRemoteSnapshots will exec grab-snapshots on all the supplied pods and capture the resulting tarballs.
   202  func (ex *Extraction) CaptureRemoteSnapshots(ctx context.Context, pods []*kates.Pod) error {
   203  	for _, p := range pods {
   204  		cmd := dexec.CommandContext(ctx, "kubectl", "exec", "-i", "-n", p.GetNamespace(), p.GetName(), "--", "grab-snapshots", "-o", "/tmp/sanitized.tgz")
   205  		cmd.DisableLogging = true
   206  		cmd.Stdout = os.Stdout
   207  		cmd.Stderr = os.Stderr
   208  		err := cmd.Run()
   209  		if err != nil {
   210  			ex.Warnf(ctx, err, "error grabbing snapshot for pod %s", QName(p))
   211  			continue
   212  		}
   213  
   214  		cmd = dexec.CommandContext(ctx, "kubectl", "cp", "-n", p.GetNamespace(), fmt.Sprintf("%s:/tmp/sanitized.tgz", p.GetName()), "/dev/stdout")
   215  		cmd.DisableLogging = true
   216  		cmd.Stderr = nil
   217  		snapshot, err := cmd.Output()
   218  		if err != nil {
   219  			ex.Warnf(ctx, err, "error copying snapshot for pod %s", QName(p))
   220  			continue
   221  		}
   222  		ex.Snapshots[QName(p)] = snapshot
   223  	}
   224  
   225  	return nil
   226  }
   227  
   228  // CaptureResources will capture and sanitize as many resources as permitted by the RBAC of the
   229  // system account running the extraction. The code is careful to redact both secrets and config maps
   230  // as well as the enviornment variables of any unrecognized deployments. For ambassador deployments
   231  // only the environment variables that contain AUTH and/or PASSWORD are redacted.
   232  func (ex *Extraction) CaptureResources(ctx context.Context) error {
   233  	preferredResources, err := ex.client.ServerPreferredResources()
   234  	if err != nil {
   235  		return errors.Wrapf(err, "querying server resources")
   236  	}
   237  
   238  	for _, r := range preferredResources {
   239  		hasList := false
   240  		for _, v := range r.Verbs {
   241  			if v == "list" {
   242  				hasList = true
   243  			}
   244  		}
   245  		if hasList {
   246  			ex.capture(ctx, kates.Query{Kind: r.Kind, Namespace: kates.NamespaceAll})
   247  		}
   248  	}
   249  
   250  	ex.Printf(ctx, "extracted %d total resources", len(ex.Resources))
   251  	return nil
   252  }
   253  
   254  func (ex *Extraction) capture(ctx context.Context, query kates.Query) {
   255  	var rsrcs []*kates.Unstructured
   256  	err := ex.client.List(ctx, query, &rsrcs)
   257  	if err != nil {
   258  		ex.Warnf(ctx, err, "error extracting resource %s", query.Kind)
   259  		return
   260  	}
   261  
   262  	sanitized := []*kates.Unstructured{}
   263  	for _, r := range rsrcs {
   264  		s := ex.callSanitize(ctx, r)
   265  		if s != nil {
   266  			sanitized = append(sanitized, s)
   267  		}
   268  	}
   269  
   270  	ex.Printf(ctx, "extracted %d of %d %s", len(sanitized), len(rsrcs), query.Kind)
   271  	ex.Resources = append(ex.Resources, sanitized...)
   272  }
   273  
   274  func (ex *Extraction) callSanitize(ctx context.Context, resource *kates.Unstructured) *kates.Unstructured {
   275  	obj, err := kates.NewObjectFromUnstructured(resource)
   276  	if err != nil {
   277  		ex.Printf(ctx, "error sanitizing object: %+v", err)
   278  		return nil
   279  	}
   280  
   281  	obj = ex.sanitize(ctx, obj)
   282  
   283  	result, err := kates.NewUnstructuredFromObject(obj)
   284  	if err != nil {
   285  		dlog.Printf(ctx, "error converting resource to Unstructured: %+v", err)
   286  		return nil
   287  	}
   288  	return result
   289  }
   290  
   291  func (ex *Extraction) sanitize(ctx context.Context, object kates.Object) kates.Object {
   292  	// Don't capture secrets and don't capture ConfigMaps because the latter often has secrets.
   293  	switch obj := object.(type) {
   294  	case *kates.Secret:
   295  		if obj.Type == kates.SecretTypeServiceAccountToken {
   296  			return nil
   297  		}
   298  
   299  		ex.Printf(ctx, "redacting secret %s", QName(obj))
   300  		data := map[string][]byte{}
   301  		for k := range obj.Data {
   302  			data[k] = []byte("<redacted>")
   303  		}
   304  		obj.Data = data
   305  		obj.StringData = nil
   306  	case *kates.ConfigMap:
   307  		ex.Printf(ctx, "redacting configmap %s", QName(obj))
   308  		data := map[string]string{}
   309  		for k := range obj.Data {
   310  			data[k] = "<redacted>"
   311  		}
   312  		obj.Data = data
   313  		obj.BinaryData = nil
   314  	case *kates.Deployment:
   315  		for _, c := range obj.Spec.Template.Spec.Containers {
   316  			filtered := []kates.EnvVar{}
   317  			for _, e := range c.Env {
   318  				copy := e
   319  				if e.Value != "" {
   320  					if isAmbassadorResource(obj) {
   321  						if strings.Contains(e.Name, "AUTH") || strings.Contains(e.Name, "PASSWORD") {
   322  							ex.Printf(ctx, "redacting env var %s", e.Name)
   323  							copy.Value = "<redacted>"
   324  						}
   325  					} else {
   326  						ex.Printf(ctx, "redacting env var %s", e.Name)
   327  						copy.Value = "<redacted>"
   328  					}
   329  				}
   330  				filtered = append(filtered, copy)
   331  			}
   332  			c.Env = filtered
   333  		}
   334  	}
   335  
   336  	return object
   337  }
   338  
   339  func isAmbassadorResource(object kates.Object) bool {
   340  	labels := object.GetLabels()
   341  	if labels["product"] == "aes" {
   342  		return true
   343  	}
   344  
   345  	return false
   346  }
   347  
   348  // CaptureEnviron captures the environment while redacting any secrets.
   349  func (ex *Extraction) CaptureEnviron(ctx context.Context) {
   350  	ex.Environ = map[string]string{}
   351  	for _, e := range os.Environ() {
   352  		parts := strings.SplitN(e, "=", 2)
   353  		if len(parts) != 2 {
   354  			ex.Printf(ctx, "unable to split os.Environ() result %v", e)
   355  			continue
   356  		}
   357  		k := parts[0]
   358  		v := parts[1]
   359  		if strings.Contains(k, "AUTH") || strings.Contains(k, "PASSWORD") {
   360  			v = "<redacted>"
   361  			ex.Printf(ctx, "redacting %s environmen variable", k)
   362  		}
   363  		ex.Environ[k] = v
   364  	}
   365  	ex.Printf(ctx, "extracted %d environment variables", len(ex.Environ))
   366  }
   367  
   368  // CaptureSnapshot captures the local snapshot if we are in cluster.
   369  func (ex *Extraction) CaptureSnapshot(ctx context.Context) {
   370  	cmd := dexec.CommandContext(ctx, "grab-snapshots", "-o", "/dev/stdout")
   371  	cmd.DisableLogging = true
   372  	cmd.Stderr = nil
   373  	snapshot, err := cmd.Output()
   374  	if err != nil {
   375  		ex.Warnf(ctx, err, "error extracting local snapshot")
   376  		return
   377  	}
   378  	ex.Snapshots["local"] = snapshot
   379  }
   380  
   381  // WriteArchive saves all the extracted info into a tarball.
   382  func (ex *Extraction) WriteArchive(ctx context.Context, filename string, podLogs PodLogs) error {
   383  	manifests, err := marshalManifests(ex.Resources)
   384  	if err != nil {
   385  		return errors.Wrapf(err, "marshalling resources")
   386  	}
   387  
   388  	logTotal := 0
   389  	for k, v := range podLogs {
   390  		logTotal += len(v)
   391  		ex.Printf(ctx, "extracted %d log entries from pod %s", len(v), k)
   392  	}
   393  	ex.Printf(ctx, "extracted %d total log entries", logTotal)
   394  
   395  	out, err := os.Create(filename)
   396  	if err != nil {
   397  		return errors.Wrapf(err, "creating output")
   398  	}
   399  	ex.Printf(ctx, "created %s", filename)
   400  	defer func() {
   401  		out.Close()
   402  		ex.Printf(ctx, "closed %s", filename)
   403  	}()
   404  
   405  	gw := gzip.NewWriter(out)
   406  	defer gw.Close()
   407  	tw := tar.NewWriter(gw)
   408  	defer tw.Close()
   409  
   410  	archive := func(name string, content []byte) error {
   411  		ex.Printf(ctx, "%s: adding %s (%d bytes)", filename, name, len(content))
   412  		header := &tar.Header{
   413  			Name:    name,
   414  			Size:    int64(len(content)),
   415  			Mode:    0777,
   416  			ModTime: time.Now(),
   417  		}
   418  
   419  		err = tw.WriteHeader(header)
   420  		if err != nil {
   421  			return errors.Wrapf(err, "writing archive header %s", name)
   422  		}
   423  		_, err = io.Copy(tw, bytes.NewReader(content))
   424  		if err != nil {
   425  			return errors.Wrapf(err, "writing archive entry %s", name)
   426  		}
   427  
   428  		return nil
   429  	}
   430  
   431  	err = archive("manifests.yaml", manifests)
   432  	if err != nil {
   433  		return err
   434  	}
   435  
   436  	archiveJson := func(name string, value interface{}) error {
   437  		bytes, err := json.MarshalIndent(value, "", "  ")
   438  		if err != nil {
   439  			return err
   440  		}
   441  		return archive(name, bytes)
   442  	}
   443  
   444  	err = archiveJson("pods.log", podLogs)
   445  	if err != nil {
   446  		return err
   447  	}
   448  
   449  	for k, v := range ex.Snapshots {
   450  		err = archive(fmt.Sprintf("%s.snapshot.tgz", k), v)
   451  		if err != nil {
   452  			return err
   453  		}
   454  	}
   455  
   456  	if kates.InCluster() {
   457  		err = archiveJson("environ.json", ex.Environ)
   458  		if err != nil {
   459  			return err
   460  		}
   461  	}
   462  
   463  	return archiveJson("extraction.log", ex.ExtractionLog)
   464  }
   465  
   466  func QName(obj kates.Object) string {
   467  	return fmt.Sprintf("%s.%s", obj.GetName(), obj.GetNamespace())
   468  }
   469  

View as plain text