1 package cmd
2
3 import (
4 "bufio"
5 "context"
6 "encoding/json"
7 "errors"
8 "fmt"
9 "io"
10 "os"
11 "sort"
12 "strings"
13
14 "github.com/golang/protobuf/ptypes/duration"
15 netPb "github.com/linkerd/linkerd2/controller/gen/common/net"
16 "github.com/linkerd/linkerd2/pkg/addr"
17 pkgcmd "github.com/linkerd/linkerd2/pkg/cmd"
18 "github.com/linkerd/linkerd2/pkg/healthcheck"
19 "github.com/linkerd/linkerd2/pkg/k8s"
20 "github.com/linkerd/linkerd2/pkg/protohttp"
21 metricsPb "github.com/linkerd/linkerd2/viz/metrics-api/gen/viz"
22 "github.com/linkerd/linkerd2/viz/pkg/api"
23 hc "github.com/linkerd/linkerd2/viz/pkg/healthcheck"
24 "github.com/linkerd/linkerd2/viz/pkg/jsonpath"
25 vizutil "github.com/linkerd/linkerd2/viz/pkg/util"
26 tapPb "github.com/linkerd/linkerd2/viz/tap/gen/tap"
27 "github.com/linkerd/linkerd2/viz/tap/pkg"
28 log "github.com/sirupsen/logrus"
29 "github.com/spf13/cobra"
30 "google.golang.org/grpc/codes"
31 )
32
33 type renderTapEventFunc func(*tapPb.TapEvent, ...renderOptions) string
34
35 type tapOptions struct {
36 namespace string
37 toResource string
38 toNamespace string
39 maxRps float32
40 scheme string
41 method string
42 authority string
43 path string
44 output string
45 labelSelector string
46 }
47
48 type endpoint struct {
49 IP string `json:"ip"`
50 Port uint32 `json:"port"`
51 Metadata map[string]string `json:"metadata"`
52 }
53
54 type streamID struct {
55 Base uint32 `json:"base"`
56 Stream uint64 `json:"stream"`
57 }
58
59 type metadata interface {
60 isMetadata()
61 }
62
63 type metadataStr struct {
64 Name string `json:"name"`
65 ValueStr string `json:"valueStr"`
66 }
67
68 func (*metadataStr) isMetadata() {}
69
70 type metadataBin struct {
71 Name string `json:"name"`
72 ValueBin []byte `json:"valueBin"`
73 }
74
75 func (*metadataBin) isMetadata() {}
76
77 type requestInitEvent struct {
78 ID *streamID `json:"id"`
79 Method string `json:"method"`
80 Scheme string `json:"scheme"`
81 Authority string `json:"authority"`
82 Path string `json:"path"`
83 Headers []metadata `json:"headers"`
84 }
85
86 type responseInitEvent struct {
87 ID *streamID `json:"id"`
88 SinceRequestInit *duration.Duration `json:"sinceRequestInit"`
89 HTTPStatus uint32 `json:"httpStatus"`
90 Headers []metadata `json:"headers"`
91 }
92
93 type responseEndEvent struct {
94 ID *streamID `json:"id"`
95 SinceRequestInit *duration.Duration `json:"sinceRequestInit"`
96 SinceResponseInit *duration.Duration `json:"sinceResponseInit"`
97 ResponseBytes uint64 `json:"responseBytes"`
98 Trailers []metadata `json:"trailers"`
99 GrpcStatusCode uint32 `json:"grpcStatusCode"`
100 ResetErrorCode uint32 `json:"resetErrorCode,omitempty"`
101 }
102
103
104 type tapEvent struct {
105 Source *endpoint `json:"source"`
106 Destination *endpoint `json:"destination"`
107 RouteMeta map[string]string `json:"routeMeta"`
108 ProxyDirection string `json:"proxyDirection"`
109 RequestInitEvent *requestInitEvent `json:"requestInitEvent,omitempty"`
110 ResponseInitEvent *responseInitEvent `json:"responseInitEvent,omitempty"`
111 ResponseEndEvent *responseEndEvent `json:"responseEndEvent,omitempty"`
112 }
113
114 func newTapOptions() *tapOptions {
115 return &tapOptions{
116 toResource: "",
117 toNamespace: "",
118 maxRps: maxRps,
119 scheme: "",
120 method: "",
121 authority: "",
122 path: "",
123 output: "",
124 labelSelector: "",
125 }
126 }
127
128 type renderFilter struct {
129 JsonPath string
130 }
131
132 type renderOptions func(f *renderFilter)
133
134 func WithJsonPath(jsonPath string) renderOptions {
135 return func(r *renderFilter) {
136 r.JsonPath = jsonPath
137 }
138 }
139
140 func (o *tapOptions) validate() error {
141 if o.output == "" || o.output == wideOutput || o.output == jsonOutput || strings.HasPrefix(o.output, jsonPathOutput) {
142 return nil
143 }
144
145 return fmt.Errorf("output format \"%s\" not recognized", o.output)
146 }
147
148
149 func NewCmdTap() *cobra.Command {
150 options := newTapOptions()
151
152 cmd := &cobra.Command{
153 Use: "tap [flags] (RESOURCE)",
154 Short: "Listen to a traffic stream",
155 Long: `Listen to a traffic stream.
156
157 The RESOURCE argument specifies the target resource(s) to tap:
158 (TYPE [NAME] | TYPE/NAME)
159
160 Examples:
161 * cronjob/my-cronjob
162 * deploy
163 * deploy/my-deploy
164 * deploy my-deploy
165 * ds/my-daemonset
166 * job/my-job
167 * ns/my-ns
168 * rs
169 * rs/my-replicaset
170 * sts
171 * sts/my-statefulset
172
173 Valid resource types include:
174 * cronjobs
175 * daemonsets
176 * deployments
177 * jobs
178 * namespaces
179 * pods
180 * replicasets
181 * replicationcontrollers
182 * statefulsets
183 * services (only supported as a --to resource)`,
184 Example: ` # tap the web deployment in the default namespace
185 linkerd viz tap deploy/web
186
187 # tap the web-dlbvj pod in the default namespace
188 linkerd viz tap pod/web-dlbvj
189
190 # tap the test namespace, filter by request to prod namespace
191 linkerd viz tap ns/test --to ns/prod`,
192 Args: cobra.RangeArgs(1, 2),
193 ValidArgsFunction: func(cmd *cobra.Command, args []string, toComplete string) ([]string, cobra.ShellCompDirective) {
194
195
196
197 if len(args) > 1 {
198 return nil, cobra.ShellCompDirectiveError
199 }
200
201 k8sAPI, err := k8s.NewAPI(kubeconfigPath, kubeContext, impersonate, impersonateGroup, 0)
202 if err != nil {
203 return nil, cobra.ShellCompDirectiveError
204 }
205
206 if options.namespace == "" {
207 options.namespace = pkgcmd.GetDefaultNamespace(kubeconfigPath, kubeContext)
208 }
209
210 cc := k8s.NewCommandCompletion(k8sAPI, options.namespace)
211
212 results, err := cc.Complete(args, toComplete)
213 if err != nil {
214 return nil, cobra.ShellCompDirectiveError
215 }
216
217 return results, cobra.ShellCompDirectiveDefault
218 },
219 RunE: func(cmd *cobra.Command, args []string) error {
220 if options.namespace == "" {
221 options.namespace = pkgcmd.GetDefaultNamespace(kubeconfigPath, kubeContext)
222 }
223
224 api.CheckClientOrExit(hc.VizOptions{
225 Options: &healthcheck.Options{
226 ControlPlaneNamespace: controlPlaneNamespace,
227 KubeConfig: kubeconfigPath,
228 Impersonate: impersonate,
229 ImpersonateGroup: impersonateGroup,
230 KubeContext: kubeContext,
231 APIAddr: apiAddr,
232 },
233 VizNamespaceOverride: vizNamespace,
234 })
235
236 requestParams := pkg.TapRequestParams{
237 Resource: strings.Join(args, "/"),
238 Namespace: options.namespace,
239 ToResource: options.toResource,
240 ToNamespace: options.toNamespace,
241 MaxRps: options.maxRps,
242 Scheme: options.scheme,
243 Method: options.method,
244 Authority: options.authority,
245 Path: options.path,
246 Extract: options.output == jsonOutput,
247 LabelSelector: options.labelSelector,
248 }
249
250 err := options.validate()
251 if err != nil {
252 return fmt.Errorf("validation error when executing tap command: %w", err)
253 }
254
255 req, err := pkg.BuildTapByResourceRequest(requestParams)
256 if err != nil {
257 fmt.Fprintln(os.Stderr, err.Error())
258 os.Exit(1)
259 }
260
261 k8sAPI, err := k8s.NewAPI(kubeconfigPath, kubeContext, impersonate, impersonateGroup, 0)
262 if err != nil {
263 fmt.Fprintln(os.Stderr, err.Error())
264 os.Exit(1)
265 }
266
267 err = requestTapByResourceFromAPI(cmd.Context(), os.Stdout, k8sAPI, req, options)
268 if err != nil {
269 fmt.Fprintln(os.Stderr, err.Error())
270 os.Exit(1)
271 }
272
273 return nil
274 },
275 }
276
277 cmd.PersistentFlags().StringVarP(&options.namespace, "namespace", "n", options.namespace,
278 "Namespace of the specified resource")
279 cmd.PersistentFlags().StringVar(&options.toResource, "to", options.toResource,
280 "Display requests to this resource")
281 cmd.PersistentFlags().StringVar(&options.toNamespace, "to-namespace", options.toNamespace,
282 "Sets the namespace used to lookup the \"--to\" resource; by default the current \"--namespace\" is used")
283 cmd.PersistentFlags().Float32Var(&options.maxRps, "max-rps", options.maxRps,
284 "Maximum requests per second to tap.")
285 cmd.PersistentFlags().StringVar(&options.scheme, "scheme", options.scheme,
286 "Display requests with this scheme")
287 cmd.PersistentFlags().StringVar(&options.method, "method", options.method,
288 "Display requests with this HTTP method")
289 cmd.PersistentFlags().StringVar(&options.authority, "authority", options.authority,
290 "Display requests with this :authority")
291 cmd.PersistentFlags().StringVar(&options.path, "path", options.path,
292 "Display requests with paths that start with this prefix")
293 cmd.PersistentFlags().StringVarP(&options.output, "output", "o", options.output,
294 fmt.Sprintf("Output format. One of: \"%s\", \"%s\", \"%s\"", wideOutput, jsonOutput, jsonPathOutput))
295 cmd.PersistentFlags().StringVarP(&options.labelSelector, "selector", "l", options.labelSelector,
296 "Selector (label query) to filter on, supports '=', '==', and '!='")
297
298 pkgcmd.ConfigureNamespaceFlagCompletion(
299 cmd, []string{"namespace", "to-namespace"},
300 kubeconfigPath, impersonate, impersonateGroup, kubeContext)
301 return cmd
302 }
303
304 func requestTapByResourceFromAPI(ctx context.Context, w io.Writer, k8sAPI *k8s.KubernetesAPI, req *tapPb.TapByResourceRequest, options *tapOptions) error {
305 reader, body, err := pkg.Reader(ctx, k8sAPI, req)
306 if err != nil {
307 return err
308 }
309 defer body.Close()
310
311 return writeTapEventsToBuffer(w, reader, options)
312 }
313
314 func writeTapEventsToBuffer(w io.Writer, tapByteStream *bufio.Reader, options *tapOptions) error {
315 output := options.output
316
317 switch {
318 case output == "":
319 return renderTapEvents(tapByteStream, w, renderTapEvent)
320 case output == wideOutput:
321 return renderTapEvents(tapByteStream, w, renderTapEventWide)
322 case output == jsonOutput:
323 return renderTapEvents(tapByteStream, w, renderTapEventJSON)
324 case strings.HasPrefix(output, jsonPathOutput):
325 jPathFilter, err := jsonpath.GetJsonPathFlagVal(output)
326 if err != nil {
327 return err
328 }
329 return renderTapEvents(tapByteStream, w, renderTapEventJSON, WithJsonPath(jPathFilter))
330 default:
331 return fmt.Errorf("unknown output format: %q", options.output)
332 }
333 }
334
335 func renderTapEvents(tapByteStream *bufio.Reader, w io.Writer, render renderTapEventFunc, opts ...renderOptions) error {
336 for {
337 log.Debug("Waiting for data...")
338 event := tapPb.TapEvent{}
339 err := protohttp.FromByteStreamToProtocolBuffers(tapByteStream, &event)
340 if err != nil {
341 if errors.Is(err, io.EOF) {
342 break
343 }
344 fmt.Fprintln(os.Stderr, err)
345 break
346 }
347 _, err = fmt.Fprintln(w, render(&event, opts...))
348 if err != nil {
349 return err
350 }
351 }
352
353 return nil
354 }
355
356 func renderTapEventWide(event *tapPb.TapEvent, _ ...renderOptions) string {
357 dst := dst(event)
358 src := src(event)
359
360 out := []string{renderTapEvent(event)}
361 out = append(out, src.formatResource()...)
362 out = append(out, dst.formatResource()...)
363 out = append(out, routeLabels(event)...)
364 return strings.Join(out, " ")
365 }
366
367
368 func renderTapEvent(event *tapPb.TapEvent, _ ...renderOptions) string {
369 dst := dst(event)
370 src := src(event)
371
372 proxy := "???"
373 tls := ""
374 switch event.GetProxyDirection() {
375 case tapPb.TapEvent_INBOUND:
376 proxy = "in "
377 tls = src.tlsStatus()
378 case tapPb.TapEvent_OUTBOUND:
379 proxy = "out"
380 tls = dst.tlsStatus()
381 default:
382
383 }
384
385 flow := fmt.Sprintf("proxy=%s %s %s tls=%s",
386 proxy,
387 src.formatAddr(),
388 dst.formatAddr(),
389 tls,
390 )
391
392 switch ev := event.GetHttp().GetEvent().(type) {
393 case *tapPb.TapEvent_Http_RequestInit_:
394 return fmt.Sprintf("req id=%d:%d %s :method=%s :authority=%s :path=%s",
395 ev.RequestInit.GetId().GetBase(),
396 ev.RequestInit.GetId().GetStream(),
397 flow,
398 vizutil.HTTPMethodToString(ev.RequestInit.GetMethod()),
399 ev.RequestInit.GetAuthority(),
400 ev.RequestInit.GetPath(),
401 )
402
403 case *tapPb.TapEvent_Http_ResponseInit_:
404 return fmt.Sprintf("rsp id=%d:%d %s :status=%d latency=%dµs",
405 ev.ResponseInit.GetId().GetBase(),
406 ev.ResponseInit.GetId().GetStream(),
407 flow,
408 ev.ResponseInit.GetHttpStatus(),
409 ev.ResponseInit.GetSinceRequestInit().AsDuration().Microseconds(),
410 )
411
412 case *tapPb.TapEvent_Http_ResponseEnd_:
413 switch eos := ev.ResponseEnd.GetEos().GetEnd().(type) {
414 case *metricsPb.Eos_GrpcStatusCode:
415 return fmt.Sprintf(
416 "end id=%d:%d %s grpc-status=%s duration=%dµs response-length=%dB",
417 ev.ResponseEnd.GetId().GetBase(),
418 ev.ResponseEnd.GetId().GetStream(),
419 flow,
420 codes.Code(eos.GrpcStatusCode),
421 ev.ResponseEnd.GetSinceResponseInit().AsDuration().Microseconds(),
422 ev.ResponseEnd.GetResponseBytes(),
423 )
424
425 case *metricsPb.Eos_ResetErrorCode:
426 return fmt.Sprintf(
427 "end id=%d:%d %s reset-error=%+v duration=%dµs response-length=%dB",
428 ev.ResponseEnd.GetId().GetBase(),
429 ev.ResponseEnd.GetId().GetStream(),
430 flow,
431 eos.ResetErrorCode,
432 ev.ResponseEnd.GetSinceResponseInit().AsDuration().Microseconds(),
433 ev.ResponseEnd.GetResponseBytes(),
434 )
435
436 default:
437 return fmt.Sprintf("end id=%d:%d %s duration=%dµs response-length=%dB",
438 ev.ResponseEnd.GetId().GetBase(),
439 ev.ResponseEnd.GetId().GetStream(),
440 flow,
441 ev.ResponseEnd.GetSinceResponseInit().AsDuration().Microseconds(),
442 ev.ResponseEnd.GetResponseBytes(),
443 )
444 }
445
446 default:
447 return fmt.Sprintf("unknown %s", flow)
448 }
449 }
450
451
452 func renderTapEventJSON(event *tapPb.TapEvent, opts ...renderOptions) string {
453 filter := &renderFilter{}
454 for _, opt := range opts {
455 opt(filter)
456 }
457 m := mapPublicToDisplayTapEvent(event)
458 if filter.JsonPath != "" {
459 filteredJson, err := jsonpath.GetJsonFilteredByJPath(m, filter.JsonPath)
460 if err != nil {
461 return err.Error()
462 }
463 return filteredJson[0]
464 }
465 e, err := json.MarshalIndent(m, "", " ")
466 if err != nil {
467 return fmt.Sprintf("{\"error marshalling JSON\": \"%s\"}", err)
468 }
469 return string(e)
470 }
471
472
473 func mapPublicToDisplayTapEvent(event *tapPb.TapEvent) *tapEvent {
474
475 sip := addr.PublicIPToString(event.GetSource().GetIp())
476 src := &endpoint{
477 IP: sip,
478 Port: event.GetSource().GetPort(),
479 Metadata: event.GetSourceMeta().GetLabels(),
480 }
481
482
483 dip := addr.PublicIPToString(event.GetDestination().GetIp())
484 dst := &endpoint{
485 IP: dip,
486 Port: event.GetDestination().GetPort(),
487 Metadata: event.GetDestinationMeta().GetLabels(),
488 }
489
490 return &tapEvent{
491 Source: src,
492 Destination: dst,
493 RouteMeta: event.GetRouteMeta().GetLabels(),
494 ProxyDirection: event.GetProxyDirection().String(),
495 RequestInitEvent: getRequestInitEvent(event.GetHttp()),
496 ResponseInitEvent: getResponseInitEvent(event.GetHttp()),
497 ResponseEndEvent: getResponseEndEvent(event.GetHttp()),
498 }
499 }
500
501
502 func getRequestInitEvent(pubEv *tapPb.TapEvent_Http) *requestInitEvent {
503 reqI := pubEv.GetRequestInit()
504 if reqI == nil {
505 return nil
506 }
507 sid := &streamID{
508 Base: reqI.GetId().GetBase(),
509 Stream: reqI.GetId().GetStream(),
510 }
511 return &requestInitEvent{
512 ID: sid,
513 Method: formatMethod(reqI.GetMethod()),
514 Scheme: formatScheme(reqI.GetScheme()),
515 Authority: reqI.GetAuthority(),
516 Path: reqI.GetPath(),
517 Headers: formatHeadersTrailers(reqI.GetHeaders()),
518 }
519 }
520
521 func formatMethod(m *metricsPb.HttpMethod) string {
522 if x, ok := m.GetType().(*metricsPb.HttpMethod_Registered_); ok {
523 return x.Registered.String()
524 }
525 if s, ok := m.GetType().(*metricsPb.HttpMethod_Unregistered); ok {
526 return s.Unregistered
527 }
528 return ""
529 }
530
531 func formatScheme(s *metricsPb.Scheme) string {
532 if x, ok := s.GetType().(*metricsPb.Scheme_Registered_); ok {
533 return x.Registered.String()
534 }
535 if str, ok := s.GetType().(*metricsPb.Scheme_Unregistered); ok {
536 return str.Unregistered
537 }
538 return ""
539 }
540
541
542 func getResponseInitEvent(pubEv *tapPb.TapEvent_Http) *responseInitEvent {
543 resI := pubEv.GetResponseInit()
544 if resI == nil {
545 return nil
546 }
547 sid := &streamID{
548 Base: resI.GetId().GetBase(),
549 Stream: resI.GetId().GetStream(),
550 }
551 return &responseInitEvent{
552 ID: sid,
553 SinceRequestInit: resI.GetSinceRequestInit(),
554 HTTPStatus: resI.GetHttpStatus(),
555 Headers: formatHeadersTrailers(resI.GetHeaders()),
556 }
557 }
558
559
560 func getResponseEndEvent(pubEv *tapPb.TapEvent_Http) *responseEndEvent {
561 resE := pubEv.GetResponseEnd()
562 if resE == nil {
563 return nil
564 }
565 sid := &streamID{
566 Base: resE.GetId().GetBase(),
567 Stream: resE.GetId().GetStream(),
568 }
569 return &responseEndEvent{
570 ID: sid,
571 SinceRequestInit: resE.GetSinceRequestInit(),
572 SinceResponseInit: resE.GetSinceResponseInit(),
573 ResponseBytes: resE.GetResponseBytes(),
574 Trailers: formatHeadersTrailers(resE.GetTrailers()),
575 GrpcStatusCode: resE.GetEos().GetGrpcStatusCode(),
576 ResetErrorCode: resE.GetEos().GetResetErrorCode(),
577 }
578 }
579
580 func formatHeadersTrailers(hs *metricsPb.Headers) []metadata {
581 var fm []metadata
582 for _, h := range hs.GetHeaders() {
583 switch h.GetValue().(type) {
584 case *metricsPb.Headers_Header_ValueStr:
585 fht := &metadataStr{Name: h.GetName(), ValueStr: h.GetValueStr()}
586 fm = append(fm, fht)
587 continue
588 case *metricsPb.Headers_Header_ValueBin:
589 fht := &metadataBin{Name: h.GetName(), ValueBin: h.GetValueBin()}
590 fm = append(fm, fht)
591 continue
592 }
593 }
594 return fm
595 }
596
597
598 func src(event *tapPb.TapEvent) peer {
599 return peer{
600 address: event.GetSource(),
601 labels: event.GetSourceMeta().GetLabels(),
602 direction: "src",
603 }
604 }
605
606
607 func dst(event *tapPb.TapEvent) peer {
608 return peer{
609 address: event.GetDestination(),
610 labels: event.GetDestinationMeta().GetLabels(),
611 direction: "dst",
612 }
613 }
614
615 type peer struct {
616 address *netPb.TcpAddress
617 labels map[string]string
618 direction string
619 }
620
621
622
623 func (p *peer) formatAddr() string {
624 return fmt.Sprintf(
625 "%s=%s",
626 p.direction,
627 addr.PublicAddressToString(p.address),
628 )
629 }
630
631
632 func (p *peer) formatResource() []string {
633 labels := []string{}
634 for k, v := range p.labels {
635 labels = append(labels, fmt.Sprintf("%s_%s=%s", p.direction, k, v))
636 }
637 sort.Strings(labels)
638 return labels
639 }
640
641 func (p *peer) tlsStatus() string {
642 return p.labels["tls"]
643 }
644
645 func routeLabels(event *tapPb.TapEvent) []string {
646 out := []string{}
647 for key, val := range event.GetRouteMeta().GetLabels() {
648 out = append(out, fmt.Sprintf("rt_%s=%s", key, val))
649 }
650 return out
651 }
652
View as plain text