1 package reproducer
2
3 import (
4 "archive/tar"
5 "bytes"
6 "compress/gzip"
7 "context"
8 "encoding/json"
9 "fmt"
10 "io"
11 "os"
12 "sort"
13 "strings"
14 "sync"
15 "time"
16
17 "github.com/pkg/errors"
18 "github.com/spf13/cobra"
19
20 "github.com/datawire/dlib/dexec"
21 "github.com/datawire/dlib/dlog"
22 "github.com/emissary-ingress/emissary/v3/pkg/kates"
23 )
24
25 var extractCmd = &cobra.Command{
26 Use: "extract [<out-file>]",
27 Short: "extract a redacted set of Ambassador Edge Stack inputs/configuration and logs/debug",
28 Long: `The extract subcommand is inteded to help extract as much info as possible from a source cluster to aid in creation of a reproducer. This source info is redacted and then bundled up in a single archive for ease of uploading.
29
30 The extract subcommand is designed to be run from both outside the cluster and from in the ambassador pod itself. In each case it will capture as much as it can, however it is preferrable to run it from outside the cluster as it will likely have more expansive rbac privileges and therefore be able to capture more relevant details.
31
32 Currently the extract command when run with sufficient rbac privileges captures:
33
34 - The previous and current logs for all ambassador pods.
35 - The output of grab-snapshots for all ambassador pods.
36 - Additional resources not included in the snapshot.
37 + All apro resources.
38 + All pod info/states.
39 + The cluster Event log.
40 - The environment variables for the ambassador pods (with AUTH and PASSWORDs redacted).
41 `,
42 Args: cobra.RangeArgs(0, 1),
43 RunE: extract,
44 }
45
46 func extract(cmd *cobra.Command, args []string) error {
47 var filename string
48 if len(args) > 0 {
49 filename = args[0]
50 } else {
51 filename = fmt.Sprintf("extraction-%s.tgz", time.Now().Format(time.RFC3339))
52 }
53
54 ctx := cmd.Context()
55 cli, err := kates.NewClient(kates.ClientConfig{})
56 if err != nil {
57 return errors.Wrapf(err, "initializing kubernetes client")
58 }
59
60 ex := NewExtraction(cli)
61
62
63 pods := ex.ListAmbassadorPods(ctx)
64
65
66 podLogsFunc := ex.CaptureLogs(ctx, pods)
67
68
69 err = ex.CaptureRemoteSnapshots(ctx, pods)
70 if err != nil {
71 return err
72 }
73
74
75 err = ex.CaptureResources(ctx)
76 if err != nil {
77 return err
78 }
79
80
81 if kates.InCluster() {
82 ex.CaptureEnviron(ctx)
83 ex.CaptureSnapshot(ctx)
84 }
85
86
87 return ex.WriteArchive(ctx, filename, podLogsFunc())
88 }
89
// PodLogs maps a pod's QName to the log events captured for it. Logs of the
// pod's previous container instance use the key "<qname>:previous".
type PodLogs = map[string][]kates.LogEvent
91
// Extraction accumulates everything gathered by the extract subcommand until
// WriteArchive bundles it into the output archive.
type Extraction struct {
	client *kates.Client
	// ExtractionLog records every message emitted through Printf/Warnf; it is
	// archived as extraction.log.
	ExtractionLog []*LogEntry
	// Snapshots holds snapshot archives keyed by pod QName, or by "local" for
	// the snapshot taken from the current process.
	Snapshots map[string][]byte
	// Resources holds the sanitized cluster resources archived as
	// manifests.yaml.
	Resources []*kates.Unstructured
	// Environ holds the redacted process environment; it is set by
	// CaptureEnviron.
	Environ map[string]string
}
99
// LogEntry is one message in the extraction log, optionally carrying the error
// that caused it.
type LogEntry struct {
	Timestamp time.Time `json:"timestamp"`
	Message   string    `json:"message"`
	Error     error     `json:"error,omitempty"`
}

// MarshalJSON encodes the entry with Error rendered as its message text.
//
// Without this, encoding/json would marshal the concrete error type. Most
// error types (including those from github.com/pkg/errors) have only
// unexported fields, so the error would be recorded as an empty object and
// its message lost. A nil Error is omitted, as before.
func (le LogEntry) MarshalJSON() ([]byte, error) {
	var errText string
	if le.Error != nil {
		errText = le.Error.Error()
	}
	return json.Marshal(struct {
		Timestamp time.Time `json:"timestamp"`
		Message   string    `json:"message"`
		Error     string    `json:"error,omitempty"`
	}{le.Timestamp, le.Message, errText})
}
105
106 func NewExtraction(client *kates.Client) *Extraction {
107 return &Extraction{client: client, Snapshots: map[string][]byte{}}
108 }
109
110 func (ex *Extraction) add(entry *LogEntry) {
111 ex.ExtractionLog = append(ex.ExtractionLog, entry)
112 }
113
114 func (ex *Extraction) Printf(ctx context.Context, format string, args ...interface{}) {
115 fmt.Fprintf(os.Stderr, "%s\n", fmt.Sprintf(format, args...))
116 ex.add(&LogEntry{Timestamp: time.Now(), Message: fmt.Sprintf(format, args...)})
117 }
118
119 func (ex *Extraction) Warnf(ctx context.Context, err error, format string, args ...interface{}) {
120 fmt.Fprintf(os.Stderr, "%s: %+v\n", fmt.Sprintf(format, args...), err)
121 ex.add(&LogEntry{Timestamp: time.Now(), Message: fmt.Sprintf(format, args...), Error: err})
122 }
123
124
125
126
127
128
129 func (ex *Extraction) ListAmbassadorPods(ctx context.Context) []*kates.Pod {
130 var result []*kates.Pod
131 for _, sel := range []string{"service=ambassador", "product=aes"} {
132 var pods []*kates.Pod
133 err := ex.client.List(ctx, kates.Query{Kind: "Pod", Namespace: kates.NamespaceAll, LabelSelector: sel}, &pods)
134 if err != nil {
135 ex.Warnf(ctx, err, "error listing pods, no logs will be available")
136 continue
137 }
138 result = append(result, pods...)
139 }
140
141 var podNames []string
142 for _, p := range result {
143 podNames = append(podNames, QName(p))
144 }
145 sort.Strings(podNames)
146 if len(result) > 0 {
147 ex.Printf(ctx, "found ambassador pods: %s", strings.Join(podNames, ", "))
148 } else {
149 ex.Printf(ctx, "unable to find ambassador pods")
150 }
151 return result
152 }
153
154
155
156
157 func (ex *Extraction) CaptureLogs(ctx context.Context, pods []*kates.Pod) func() PodLogs {
158 previousEvents := make(chan kates.LogEvent)
159 currentEvents := make(chan kates.LogEvent)
160 wg := sync.WaitGroup{}
161 byID := map[string]string{}
162 for _, pod := range pods {
163 byID[string(pod.GetUID())] = QName(pod)
164 err := ex.client.PodLogs(ctx, pod, &kates.PodLogOptions{Previous: true}, previousEvents)
165 if err != nil {
166 ex.Warnf(ctx, err, "error listing previous logs for pod %s in namespaces %s", pod.Name, pod.Namespace)
167 } else {
168 wg.Add(1)
169 }
170 err = ex.client.PodLogs(ctx, pod, &kates.PodLogOptions{}, currentEvents)
171 if err != nil {
172 ex.Warnf(ctx, err, "error listing current logs for pod %s in namespaces %s", pod.Name, pod.Namespace)
173 } else {
174 wg.Add(1)
175 }
176 }
177 podLogs := PodLogs{}
178 go func() {
179 for {
180 var ev kates.LogEvent
181 var name string
182 select {
183 case ev = <-previousEvents:
184 name = fmt.Sprintf("%s:previous", byID[ev.PodID])
185 case ev = <-currentEvents:
186 name = byID[ev.PodID]
187 }
188 podLogs[name] = append(podLogs[name], ev)
189 if ev.Closed {
190 wg.Done()
191 }
192 }
193 }()
194
195 return func() PodLogs {
196 wg.Wait()
197 return podLogs
198 }
199 }
200
201
202 func (ex *Extraction) CaptureRemoteSnapshots(ctx context.Context, pods []*kates.Pod) error {
203 for _, p := range pods {
204 cmd := dexec.CommandContext(ctx, "kubectl", "exec", "-i", "-n", p.GetNamespace(), p.GetName(), "--", "grab-snapshots", "-o", "/tmp/sanitized.tgz")
205 cmd.DisableLogging = true
206 cmd.Stdout = os.Stdout
207 cmd.Stderr = os.Stderr
208 err := cmd.Run()
209 if err != nil {
210 ex.Warnf(ctx, err, "error grabbing snapshot for pod %s", QName(p))
211 continue
212 }
213
214 cmd = dexec.CommandContext(ctx, "kubectl", "cp", "-n", p.GetNamespace(), fmt.Sprintf("%s:/tmp/sanitized.tgz", p.GetName()), "/dev/stdout")
215 cmd.DisableLogging = true
216 cmd.Stderr = nil
217 snapshot, err := cmd.Output()
218 if err != nil {
219 ex.Warnf(ctx, err, "error copying snapshot for pod %s", QName(p))
220 continue
221 }
222 ex.Snapshots[QName(p)] = snapshot
223 }
224
225 return nil
226 }
227
228
229
230
231
232 func (ex *Extraction) CaptureResources(ctx context.Context) error {
233 preferredResources, err := ex.client.ServerPreferredResources()
234 if err != nil {
235 return errors.Wrapf(err, "querying server resources")
236 }
237
238 for _, r := range preferredResources {
239 hasList := false
240 for _, v := range r.Verbs {
241 if v == "list" {
242 hasList = true
243 }
244 }
245 if hasList {
246 ex.capture(ctx, kates.Query{Kind: r.Kind, Namespace: kates.NamespaceAll})
247 }
248 }
249
250 ex.Printf(ctx, "extracted %d total resources", len(ex.Resources))
251 return nil
252 }
253
254 func (ex *Extraction) capture(ctx context.Context, query kates.Query) {
255 var rsrcs []*kates.Unstructured
256 err := ex.client.List(ctx, query, &rsrcs)
257 if err != nil {
258 ex.Warnf(ctx, err, "error extracting resource %s", query.Kind)
259 return
260 }
261
262 sanitized := []*kates.Unstructured{}
263 for _, r := range rsrcs {
264 s := ex.callSanitize(ctx, r)
265 if s != nil {
266 sanitized = append(sanitized, s)
267 }
268 }
269
270 ex.Printf(ctx, "extracted %d of %d %s", len(sanitized), len(rsrcs), query.Kind)
271 ex.Resources = append(ex.Resources, sanitized...)
272 }
273
274 func (ex *Extraction) callSanitize(ctx context.Context, resource *kates.Unstructured) *kates.Unstructured {
275 obj, err := kates.NewObjectFromUnstructured(resource)
276 if err != nil {
277 ex.Printf(ctx, "error sanitizing object: %+v", err)
278 return nil
279 }
280
281 obj = ex.sanitize(ctx, obj)
282
283 result, err := kates.NewUnstructuredFromObject(obj)
284 if err != nil {
285 dlog.Printf(ctx, "error converting resource to Unstructured: %+v", err)
286 return nil
287 }
288 return result
289 }
290
291 func (ex *Extraction) sanitize(ctx context.Context, object kates.Object) kates.Object {
292
293 switch obj := object.(type) {
294 case *kates.Secret:
295 if obj.Type == kates.SecretTypeServiceAccountToken {
296 return nil
297 }
298
299 ex.Printf(ctx, "redacting secret %s", QName(obj))
300 data := map[string][]byte{}
301 for k := range obj.Data {
302 data[k] = []byte("<redacted>")
303 }
304 obj.Data = data
305 obj.StringData = nil
306 case *kates.ConfigMap:
307 ex.Printf(ctx, "redacting configmap %s", QName(obj))
308 data := map[string]string{}
309 for k := range obj.Data {
310 data[k] = "<redacted>"
311 }
312 obj.Data = data
313 obj.BinaryData = nil
314 case *kates.Deployment:
315 for _, c := range obj.Spec.Template.Spec.Containers {
316 filtered := []kates.EnvVar{}
317 for _, e := range c.Env {
318 copy := e
319 if e.Value != "" {
320 if isAmbassadorResource(obj) {
321 if strings.Contains(e.Name, "AUTH") || strings.Contains(e.Name, "PASSWORD") {
322 ex.Printf(ctx, "redacting env var %s", e.Name)
323 copy.Value = "<redacted>"
324 }
325 } else {
326 ex.Printf(ctx, "redacting env var %s", e.Name)
327 copy.Value = "<redacted>"
328 }
329 }
330 filtered = append(filtered, copy)
331 }
332 c.Env = filtered
333 }
334 }
335
336 return object
337 }
338
339 func isAmbassadorResource(object kates.Object) bool {
340 labels := object.GetLabels()
341 if labels["product"] == "aes" {
342 return true
343 }
344
345 return false
346 }
347
348
349 func (ex *Extraction) CaptureEnviron(ctx context.Context) {
350 ex.Environ = map[string]string{}
351 for _, e := range os.Environ() {
352 parts := strings.SplitN(e, "=", 2)
353 if len(parts) != 2 {
354 ex.Printf(ctx, "unable to split os.Environ() result %v", e)
355 continue
356 }
357 k := parts[0]
358 v := parts[1]
359 if strings.Contains(k, "AUTH") || strings.Contains(k, "PASSWORD") {
360 v = "<redacted>"
361 ex.Printf(ctx, "redacting %s environmen variable", k)
362 }
363 ex.Environ[k] = v
364 }
365 ex.Printf(ctx, "extracted %d environment variables", len(ex.Environ))
366 }
367
368
369 func (ex *Extraction) CaptureSnapshot(ctx context.Context) {
370 cmd := dexec.CommandContext(ctx, "grab-snapshots", "-o", "/dev/stdout")
371 cmd.DisableLogging = true
372 cmd.Stderr = nil
373 snapshot, err := cmd.Output()
374 if err != nil {
375 ex.Warnf(ctx, err, "error extracting local snapshot")
376 return
377 }
378 ex.Snapshots["local"] = snapshot
379 }
380
381
382 func (ex *Extraction) WriteArchive(ctx context.Context, filename string, podLogs PodLogs) error {
383 manifests, err := marshalManifests(ex.Resources)
384 if err != nil {
385 return errors.Wrapf(err, "marshalling resources")
386 }
387
388 logTotal := 0
389 for k, v := range podLogs {
390 logTotal += len(v)
391 ex.Printf(ctx, "extracted %d log entries from pod %s", len(v), k)
392 }
393 ex.Printf(ctx, "extracted %d total log entries", logTotal)
394
395 out, err := os.Create(filename)
396 if err != nil {
397 return errors.Wrapf(err, "creating output")
398 }
399 ex.Printf(ctx, "created %s", filename)
400 defer func() {
401 out.Close()
402 ex.Printf(ctx, "closed %s", filename)
403 }()
404
405 gw := gzip.NewWriter(out)
406 defer gw.Close()
407 tw := tar.NewWriter(gw)
408 defer tw.Close()
409
410 archive := func(name string, content []byte) error {
411 ex.Printf(ctx, "%s: adding %s (%d bytes)", filename, name, len(content))
412 header := &tar.Header{
413 Name: name,
414 Size: int64(len(content)),
415 Mode: 0777,
416 ModTime: time.Now(),
417 }
418
419 err = tw.WriteHeader(header)
420 if err != nil {
421 return errors.Wrapf(err, "writing archive header %s", name)
422 }
423 _, err = io.Copy(tw, bytes.NewReader(content))
424 if err != nil {
425 return errors.Wrapf(err, "writing archive entry %s", name)
426 }
427
428 return nil
429 }
430
431 err = archive("manifests.yaml", manifests)
432 if err != nil {
433 return err
434 }
435
436 archiveJson := func(name string, value interface{}) error {
437 bytes, err := json.MarshalIndent(value, "", " ")
438 if err != nil {
439 return err
440 }
441 return archive(name, bytes)
442 }
443
444 err = archiveJson("pods.log", podLogs)
445 if err != nil {
446 return err
447 }
448
449 for k, v := range ex.Snapshots {
450 err = archive(fmt.Sprintf("%s.snapshot.tgz", k), v)
451 if err != nil {
452 return err
453 }
454 }
455
456 if kates.InCluster() {
457 err = archiveJson("environ.json", ex.Environ)
458 if err != nil {
459 return err
460 }
461 }
462
463 return archiveJson("extraction.log", ex.ExtractionLog)
464 }
465
466 func QName(obj kates.Object) string {
467 return fmt.Sprintf("%s.%s", obj.GetName(), obj.GetNamespace())
468 }
469
View as plain text