1 package cmd
2
3 import (
4 "bufio"
5 "context"
6 "errors"
7 "fmt"
8 "io"
9 "sort"
10 "strconv"
11 "strings"
12 "time"
13
14 "github.com/linkerd/linkerd2/pkg/addr"
15 pkgcmd "github.com/linkerd/linkerd2/pkg/cmd"
16 "github.com/linkerd/linkerd2/pkg/healthcheck"
17 "github.com/linkerd/linkerd2/pkg/k8s"
18 "github.com/linkerd/linkerd2/pkg/protohttp"
19 metricsAPI "github.com/linkerd/linkerd2/viz/metrics-api"
20 metricsPb "github.com/linkerd/linkerd2/viz/metrics-api/gen/viz"
21 "github.com/linkerd/linkerd2/viz/pkg/api"
22 hc "github.com/linkerd/linkerd2/viz/pkg/healthcheck"
23 vizutil "github.com/linkerd/linkerd2/viz/pkg/util"
24 tapPb "github.com/linkerd/linkerd2/viz/tap/gen/tap"
25 "github.com/linkerd/linkerd2/viz/tap/pkg"
26 runewidth "github.com/mattn/go-runewidth"
27 termbox "github.com/nsf/termbox-go"
28 log "github.com/sirupsen/logrus"
29 "github.com/spf13/cobra"
30 "google.golang.org/grpc/codes"
31 )
32
// topOptions holds the values of the command-line flags accepted by the
// top command.
type topOptions struct {
	// namespace is the namespace of the resource named on the command line.
	namespace string
	// toResource restricts the view to requests sent to this resource.
	toResource string
	// toNamespace is the namespace used to look up toResource.
	toNamespace string
	// maxRps is the maximum number of requests per second to tap.
	maxRps float32
	// scheme, method, authority and path filter the displayed requests;
	// path is matched as a prefix according to the flag help text.
	scheme    string
	method    string
	authority string
	path      string
	// hideSources removes the source column from the table.
	hideSources bool
	// routes aggregates rows per route instead of per method and path.
	routes bool
	// labelSelector is a label query used to filter the tapped resources.
	labelSelector string
}
46
// topRequest collects the tap events that belong to a single HTTP request
// as they arrive on the tap stream.
type topRequest struct {
	// event is the tap event that carried the request-init message.
	event *tapPb.TapEvent
	// reqInit, rspInit and rspEnd are filled in as the corresponding
	// events are observed; a request is complete once rspEnd is set.
	reqInit *tapPb.TapEvent_Http_RequestInit
	rspInit *tapPb.TapEvent_Http_ResponseInit
	rspEnd  *tapPb.TapEvent_Http_ResponseEnd
}
53
// topRequestID identifies an in-flight request by the addresses of its two
// endpoints and the stream number reported by the tap events.
type topRequestID struct {
	src, dst string
	stream   uint64
}

// String renders the identifier as "src->dst(stream)" for log messages.
func (id topRequestID) String() string {
	return fmt.Sprint(id.src, "->", id.dst, "(", id.stream, ")")
}
63
// tableColumn describes one column of the top table: how it is labelled,
// how wide it is, and how its cell text is derived from a row.
type tableColumn struct {
	// header is the title printed above the column.
	header string
	// width is the column width used when laying out headers and cells.
	width int
	// key marks the column as part of a row's identity: two requests are
	// merged into the same row only when all key columns have equal
	// values.
	key bool
	// display controls whether the column is rendered at all.
	display bool
	// flexible columns have their width recomputed from the header and
	// the current cell contents before each render.
	flexible bool
	// rightAlign pads the text on the left so it ends at the column edge.
	rightAlign bool
	// value returns the text shown in this column for the given row.
	value func(tableRow) string
}
78
// tableRow is one aggregated line of the top table: the identifying
// attributes of a group of requests plus their running statistics.
type tableRow struct {
	path        string
	method      string
	route       string
	source      string
	destination string
	count       int
	best        time.Duration
	worst       time.Duration
	last        time.Duration
	successes   int
	failures    int
}

// merge folds the statistics of other into a copy of r and returns it.
// Counts are summed, best/worst keep the lowest/highest latency seen, and
// last always takes the value from other. The identifying fields of r are
// kept as they are.
func (r tableRow) merge(other tableRow) tableRow {
	merged := r

	merged.count += other.count
	merged.successes += other.successes
	merged.failures += other.failures

	if other.best < merged.best {
		merged.best = other.best
	}
	if other.worst > merged.worst {
		merged.worst = other.worst
	}
	merged.last = other.last

	return merged
}
106
// column is the index of a column within topTable.columns.
type column int

// The columns of the top table, in the order they are laid out from left
// to right.
const (
	sourceColumn column = iota
	destinationColumn
	methodColumn
	pathColumn
	routeColumn
	countColumn
	bestColumn
	worstColumn
	lastColumn
	successRateColumn

	// columnCount is the number of columns declared above; it sizes the
	// topTable.columns array and must stay last.
	columnCount
)
123
// topTable is the state behind the top display: a fixed set of column
// definitions, indexed by the column constants, and the aggregated rows
// collected so far.
type topTable struct {
	columns [columnCount]tableColumn
	rows    []tableRow
}
128
129 func newTopTable() *topTable {
130 table := topTable{}
131
132 table.columns[sourceColumn] =
133 tableColumn{
134 header: "Source",
135 width: 23,
136 key: true,
137 display: true,
138 flexible: true,
139 value: func(r tableRow) string {
140 return r.source
141 },
142 }
143
144 table.columns[destinationColumn] =
145 tableColumn{
146 header: "Destination",
147 width: 23,
148 key: true,
149 display: true,
150 flexible: true,
151 value: func(r tableRow) string {
152 return r.destination
153 },
154 }
155
156 table.columns[methodColumn] =
157 tableColumn{
158 header: "Method",
159 width: 10,
160 key: true,
161 display: true,
162 flexible: false,
163 value: func(r tableRow) string {
164 return r.method
165 },
166 }
167
168 table.columns[pathColumn] =
169 tableColumn{
170 header: "Path",
171 width: 37,
172 key: true,
173 display: true,
174 flexible: true,
175 value: func(r tableRow) string {
176 return r.path
177 },
178 }
179
180 table.columns[routeColumn] =
181 tableColumn{
182 header: "Route",
183 width: 47,
184 key: false,
185 display: false,
186 flexible: true,
187 value: func(r tableRow) string {
188 return r.route
189 },
190 }
191
192 table.columns[countColumn] =
193 tableColumn{
194 header: "Count",
195 width: 6,
196 key: false,
197 display: true,
198 flexible: false,
199 rightAlign: true,
200 value: func(r tableRow) string {
201 return strconv.Itoa(r.count)
202 },
203 }
204
205 table.columns[bestColumn] =
206 tableColumn{
207 header: "Best",
208 width: 6,
209 key: false,
210 display: true,
211 flexible: false,
212 rightAlign: true,
213 value: func(r tableRow) string {
214 return formatDuration(r.best)
215 },
216 }
217
218 table.columns[worstColumn] =
219 tableColumn{
220 header: "Worst",
221 width: 6,
222 key: false,
223 display: true,
224 flexible: false,
225 rightAlign: true,
226 value: func(r tableRow) string {
227 return formatDuration(r.worst)
228 },
229 }
230
231 table.columns[lastColumn] =
232 tableColumn{
233 header: "Last",
234 width: 6,
235 key: false,
236 display: true,
237 flexible: false,
238 rightAlign: true,
239 value: func(r tableRow) string {
240 return formatDuration(r.last)
241 },
242 }
243
244 table.columns[successRateColumn] =
245 tableColumn{
246 header: "Success Rate",
247 width: 12,
248 key: false,
249 display: true,
250 flexible: false,
251 rightAlign: true,
252 value: func(r tableRow) string {
253 return fmt.Sprintf("%.2f%%", 100.0*float32(r.successes)/float32(r.successes+r.failures))
254 },
255 }
256
257 return &table
258 }
259
// Layout constants for the terminal rendering of the top table.
const (
	// headerHeight is the screen row of the first body row; the column
	// headers are drawn on the row just above it.
	headerHeight = 4
	// columnSpacing is the gap left between adjacent columns.
	columnSpacing = 2
	// xOffset is the distance moved by one horizontal scroll step.
	xOffset = 5
)
265
266 func newTopOptions() *topOptions {
267 return &topOptions{
268 toResource: "",
269 toNamespace: "",
270 maxRps: maxRps,
271 scheme: "",
272 method: "",
273 authority: "",
274 path: "",
275 hideSources: false,
276 routes: false,
277 labelSelector: "",
278 }
279 }
280
281
// NewCmdTop builds the cobra command for `top`, which taps live traffic
// for a resource and shows it as a continuously updated, sorted table.
// The returned command owns its own options and table instances.
func NewCmdTop() *cobra.Command {
	options := newTopOptions()

	table := newTopTable()

	cmd := &cobra.Command{
		Use:   "top [flags] (RESOURCE)",
		Short: "Display sorted information about live traffic",
		Long: `Display sorted information about live traffic.

The RESOURCE argument specifies the target resource(s) to view traffic for:
(TYPE [NAME] | TYPE/NAME)

Examples:
* cronjob/my-cronjob
* deploy
* deploy/my-deploy
* deploy my-deploy
* ds/my-daemonset
* job/my-job
* ns/my-ns
* rs
* rs/my-replicaset
* sts
* sts/my-statefulset

Valid resource types include:
* cronjobs
* daemonsets
* deployments
* jobs
* namespaces
* pods
* replicasets
* replicationcontrollers
* statefulsets
* services (only supported as a --to resource)`,
		Example: ` # display traffic for the web deployment in the default namespace
linkerd viz top deploy/web

# display traffic for the web-dlbvj pod in the default namespace
linkerd viz top pod/web-dlbvj`,
		Args: cobra.RangeArgs(1, 2),
		ValidArgsFunction: func(cmd *cobra.Command, args []string, toComplete string) ([]string, cobra.ShellCompDirective) {
			// Completion is offered only while fewer than two arguments
			// have been typed; with two or more the call is reported as
			// an error to the shell.
			if len(args) > 1 {
				return nil, cobra.ShellCompDirectiveError
			}

			k8sAPI, err := k8s.NewAPI(kubeconfigPath, kubeContext, impersonate, impersonateGroup, 0)
			if err != nil {
				return nil, cobra.ShellCompDirectiveError
			}

			// Fall back to the kubeconfig's default namespace when the
			// --namespace flag was not given.
			if options.namespace == "" {
				options.namespace = pkgcmd.GetDefaultNamespace(kubeconfigPath, kubeContext)
			}

			cc := k8s.NewCommandCompletion(k8sAPI, options.namespace)

			results, err := cc.Complete(args, toComplete)
			if err != nil {
				return nil, cobra.ShellCompDirectiveError
			}

			return results, cobra.ShellCompDirectiveDefault
		},
		RunE: func(cmd *cobra.Command, args []string) error {
			if options.namespace == "" {
				options.namespace = pkgcmd.GetDefaultNamespace(kubeconfigPath, kubeContext)
			}

			// NOTE(review): judging by its name this exits the process
			// when the client health checks fail - confirm in the api
			// package.
			api.CheckClientOrExit(hc.VizOptions{
				Options: &healthcheck.Options{
					ControlPlaneNamespace: controlPlaneNamespace,
					KubeConfig:            kubeconfigPath,
					Impersonate:           impersonate,
					ImpersonateGroup:      impersonateGroup,
					KubeContext:           kubeContext,
					APIAddr:               apiAddr,
				},
				VizNamespaceOverride: vizNamespace,
			})

			// "TYPE NAME" and "TYPE/NAME" are both accepted; joining the
			// positional arguments with "/" normalizes them to one form.
			requestParams := pkg.TapRequestParams{
				Resource:      strings.Join(args, "/"),
				Namespace:     options.namespace,
				ToResource:    options.toResource,
				ToNamespace:   options.toNamespace,
				MaxRps:        options.maxRps,
				Scheme:        options.scheme,
				Method:        options.method,
				Authority:     options.authority,
				Path:          options.path,
				LabelSelector: options.labelSelector,
			}

			// A hidden source column must also stop being a key so that
			// rows differing only by source are merged.
			if options.hideSources {
				table.columns[sourceColumn].key = false
				table.columns[sourceColumn].display = false
			}

			// In route mode rows are keyed and displayed by route
			// instead of by method and path.
			if options.routes {
				table.columns[methodColumn].key = false
				table.columns[methodColumn].display = false
				table.columns[pathColumn].key = false
				table.columns[pathColumn].display = false
				table.columns[routeColumn].key = true
				table.columns[routeColumn].display = true
			}

			req, err := pkg.BuildTapByResourceRequest(requestParams)
			if err != nil {
				return err
			}

			k8sAPI, err := k8s.NewAPI(kubeconfigPath, kubeContext, impersonate, impersonateGroup, 0)
			if err != nil {
				return err
			}

			return getTrafficByResourceFromAPI(cmd.Context(), k8sAPI, req, table)
		},
	}

	cmd.PersistentFlags().StringVarP(&options.namespace, "namespace", "n", options.namespace,
		"Namespace of the specified resource")
	cmd.PersistentFlags().StringVar(&options.toResource, "to", options.toResource,
		"Display requests to this resource")
	cmd.PersistentFlags().StringVar(&options.toNamespace, "to-namespace", options.toNamespace,
		"Sets the namespace used to lookup the \"--to\" resource; by default the current \"--namespace\" is used")
	cmd.PersistentFlags().Float32Var(&options.maxRps, "max-rps", options.maxRps,
		"Maximum requests per second to tap.")
	cmd.PersistentFlags().StringVar(&options.scheme, "scheme", options.scheme,
		"Display requests with this scheme")
	cmd.PersistentFlags().StringVar(&options.method, "method", options.method,
		"Display requests with this HTTP method")
	cmd.PersistentFlags().StringVar(&options.authority, "authority", options.authority,
		"Display requests with this :authority")
	cmd.PersistentFlags().StringVar(&options.path, "path", options.path,
		"Display requests with paths that start with this prefix")
	cmd.PersistentFlags().BoolVar(&options.hideSources, "hide-sources", options.hideSources, "Hide the source column")
	cmd.PersistentFlags().BoolVar(&options.routes, "routes", options.routes, "Display data per route instead of per path")
	cmd.PersistentFlags().StringVarP(&options.labelSelector, "selector", "l", options.labelSelector, "Selector (label query) to filter on, supports '=', '==', and '!='")

	// Register shell completion for the two namespace-valued flags.
	pkgcmd.ConfigureNamespaceFlagCompletion(
		cmd, []string{"namespace", "to-namespace"},
		kubeconfigPath, impersonate, impersonateGroup, kubeContext)
	return cmd
}
434
435 func getTrafficByResourceFromAPI(ctx context.Context, k8sAPI *k8s.KubernetesAPI, req *tapPb.TapByResourceRequest, table *topTable) error {
436 reader, body, err := pkg.Reader(ctx, k8sAPI, req)
437 if err != nil {
438 return err
439 }
440 defer body.Close()
441
442 err = termbox.Init()
443 if err != nil {
444 return err
445 }
446 defer termbox.Close()
447
448
449
450
451
452
453
454
455 eventCh := make(chan *tapPb.TapEvent)
456 requestCh := make(chan topRequest, 100)
457
458
459
460
461
462
463 closing := make(chan struct{}, 1)
464 done := make(chan struct{})
465 horizontalScroll := make(chan int)
466
467 go pollInput(done, horizontalScroll)
468 go recvEvents(reader, eventCh, closing)
469 go processEvents(eventCh, requestCh, done)
470
471 go func() {
472 <-closing
473 }()
474
475 renderTable(table, requestCh, done, horizontalScroll)
476
477 return nil
478 }
479
480 func recvEvents(tapByteStream *bufio.Reader, eventCh chan<- *tapPb.TapEvent, closing chan<- struct{}) {
481 for {
482 event := &tapPb.TapEvent{}
483 err := protohttp.FromByteStreamToProtocolBuffers(tapByteStream, event)
484 if err != nil {
485 if errors.Is(err, io.EOF) {
486 fmt.Println("Tap stream terminated")
487 } else if !strings.HasSuffix(err.Error(), pkg.ErrClosedResponseBody) {
488 fmt.Println(err.Error())
489 }
490
491 closing <- struct{}{}
492 return
493 }
494
495 eventCh <- event
496 }
497 }
498
499 func processEvents(eventCh <-chan *tapPb.TapEvent, requestCh chan<- topRequest, done <-chan struct{}) {
500 outstandingRequests := make(map[topRequestID]topRequest)
501
502 for {
503 select {
504 case <-done:
505 return
506 case event := <-eventCh:
507 id := topRequestID{
508 src: addr.PublicAddressToString(event.GetSource()),
509 dst: addr.PublicAddressToString(event.GetDestination()),
510 }
511 switch ev := event.GetHttp().GetEvent().(type) {
512 case *tapPb.TapEvent_Http_RequestInit_:
513 id.stream = ev.RequestInit.GetId().Stream
514 outstandingRequests[id] = topRequest{
515 event: event,
516 reqInit: ev.RequestInit,
517 }
518
519 case *tapPb.TapEvent_Http_ResponseInit_:
520 id.stream = ev.ResponseInit.GetId().Stream
521 if req, ok := outstandingRequests[id]; ok {
522 req.rspInit = ev.ResponseInit
523 outstandingRequests[id] = req
524 } else {
525 log.Warnf("Got ResponseInit for unknown stream: %s", id)
526 }
527
528 case *tapPb.TapEvent_Http_ResponseEnd_:
529 id.stream = ev.ResponseEnd.GetId().Stream
530 if req, ok := outstandingRequests[id]; ok {
531 req.rspEnd = ev.ResponseEnd
532 requestCh <- req
533 } else {
534 log.Warnf("Got ResponseEnd for unknown stream: %s", id)
535 }
536 }
537 }
538 }
539 }
540
541 func pollInput(done chan<- struct{}, horizontalScroll chan int) {
542 for {
543 switch ev := termbox.PollEvent(); ev.Type {
544 case termbox.EventKey:
545 if ev.Ch == 'q' || ev.Key == termbox.KeyCtrlC {
546 close(done)
547 return
548 }
549 if ev.Ch == 'a' || ev.Key == termbox.KeyArrowLeft {
550 horizontalScroll <- xOffset
551 }
552 if ev.Ch == 'd' || ev.Key == termbox.KeyArrowRight {
553 horizontalScroll <- -xOffset
554 }
555 }
556 }
557 }
558
559 func renderTable(table *topTable, requestCh <-chan topRequest, done <-chan struct{}, horizontalScroll chan int) {
560 scrollpos := 0
561 ticker := time.NewTicker(100 * time.Millisecond)
562 width, _ := termbox.Size()
563 tablewidth := table.tableWidthCalc()
564
565 for {
566 select {
567 case <-done:
568 return
569 case req := <-requestCh:
570 table.insert(req)
571 case <-ticker.C:
572 termbox.Clear(termbox.ColorDefault, termbox.ColorDefault)
573 width, _ = termbox.Size()
574 table.adjustColumnWidths()
575 tablewidth = table.tableWidthCalc()
576 table.renderHeaders(scrollpos)
577 table.renderBody(scrollpos)
578 termbox.Flush()
579 case offset := <-horizontalScroll:
580 if (offset > 0 && scrollpos < 0) || (offset < 0 && scrollpos > (width-tablewidth)) {
581 scrollpos += offset
582 }
583 }
584 }
585 }
586
587 func newRow(req topRequest) (tableRow, error) {
588 path := req.reqInit.GetPath()
589 route := req.event.GetRouteMeta().GetLabels()["route"]
590 if route == "" {
591 route = metricsAPI.DefaultRouteName
592 }
593
594 source := stripPort(addr.PublicAddressToString(req.event.GetSource()))
595 if pod := req.event.SourceMeta.Labels["pod"]; pod != "" {
596 source = pod
597 }
598 destination := stripPort(addr.PublicAddressToString(req.event.GetDestination()))
599 if pod := req.event.DestinationMeta.Labels["pod"]; pod != "" {
600 destination = pod
601 }
602
603 err := req.rspEnd.GetSinceRequestInit().CheckValid()
604 if err != nil {
605 return tableRow{}, fmt.Errorf("error parsing duration %v: %w", req.rspEnd.GetSinceRequestInit(), err)
606 }
607 latency := req.rspEnd.GetSinceRequestInit().AsDuration()
608
609
610
611 success := req.rspInit.GetHttpStatus() < 500
612 if success {
613 switch eos := req.rspEnd.GetEos().GetEnd().(type) {
614 case *metricsPb.Eos_GrpcStatusCode:
615 switch codes.Code(eos.GrpcStatusCode) {
616 case codes.Unknown,
617 codes.DeadlineExceeded,
618 codes.Internal,
619 codes.Unavailable,
620 codes.DataLoss:
621 success = false
622 default:
623 success = true
624 }
625
626 case *metricsPb.Eos_ResetErrorCode:
627 success = false
628 }
629 }
630
631 successes := 0
632 failures := 0
633 if success {
634 successes = 1
635 } else {
636 failures = 1
637 }
638
639 return tableRow{
640 path: path,
641 method: vizutil.HTTPMethodToString(req.reqInit.GetMethod()),
642 route: route,
643 source: source,
644 destination: destination,
645 best: latency,
646 worst: latency,
647 last: latency,
648 count: 1,
649 successes: successes,
650 failures: failures,
651 }, nil
652 }
653
654 func (t *topTable) insert(req topRequest) {
655 insert, err := newRow(req)
656 if err != nil {
657 log.Error(err.Error())
658 return
659 }
660
661 found := false
662
663 for i, row := range t.rows {
664 match := true
665
666 for _, col := range t.columns {
667 if col.key {
668 if col.value(row) != col.value(insert) {
669 match = false
670 break
671 }
672 }
673 }
674 if match {
675 found = true
676 t.rows[i] = t.rows[i].merge(insert)
677 break
678 }
679 }
680 if !found {
681 t.rows = append(t.rows, insert)
682 }
683 }
684
// stripPort returns the host part of a "host:port" address.
//
// A bracketed IPv6 address such as "[2001:db8::1]:8080" yields the
// address between the brackets. Any other input yields everything before
// the first colon, or the whole string when it contains no colon.
func stripPort(address string) string {
	if strings.HasPrefix(address, "[") {
		// Cutting at the first colon would split the IPv6 address
		// itself; cut at the closing bracket instead.
		if end := strings.Index(address, "]"); end > 0 {
			return address[1:end]
		}
	}
	if colon := strings.Index(address, ":"); colon >= 0 {
		return address[:colon]
	}
	return address
}
688
689 func (t *topTable) renderHeaders(scrollpos int) {
690 tbprint(0, 0, "(press q to quit)")
691 tbprint(0, 1, "(press a/LeftArrowKey to scroll left, d/RightArrowKey to scroll right)")
692 x := scrollpos
693 for _, col := range t.columns {
694 if !col.display {
695 continue
696 }
697 padding := 0
698 if col.rightAlign {
699 padding = col.width - runewidth.StringWidth(col.header)
700 }
701 tbprintBold(x+padding, headerHeight-1, col.header)
702 x += col.width + columnSpacing
703 }
704 }
705
706 func (t *topTable) tableWidthCalc() int {
707 tablewidth := 0
708 for i := range t.columns {
709 tablewidth = tablewidth + t.columns[i].width + columnSpacing
710 }
711 return tablewidth - columnSpacing
712 }
713
714 func (t *topTable) adjustColumnWidths() {
715 for i, col := range t.columns {
716 if !col.flexible {
717 continue
718 }
719 t.columns[i].width = runewidth.StringWidth(col.header)
720 for _, row := range t.rows {
721 cellWidth := runewidth.StringWidth(col.value(row))
722 if cellWidth > t.columns[i].width {
723 t.columns[i].width = cellWidth
724 }
725 }
726 }
727 }
728
729 func (t *topTable) renderBody(scrollpos int) {
730 sort.SliceStable(t.rows, func(i, j int) bool {
731 return t.rows[i].count > t.rows[j].count
732 })
733
734 for i, row := range t.rows {
735 x := scrollpos
736
737 for _, col := range t.columns {
738 if !col.display {
739 continue
740 }
741 value := col.value(row)
742 padding := 0
743 if col.rightAlign {
744 padding = col.width - runewidth.StringWidth(value)
745 }
746 tbprint(x+padding, i+headerHeight, value)
747 x += col.width + columnSpacing
748 }
749 }
750 }
751
752 func tbprint(x, y int, msg string) {
753 for _, c := range msg {
754 termbox.SetCell(x, y, c, termbox.ColorDefault, termbox.ColorDefault)
755 x += runewidth.RuneWidth(c)
756 }
757 }
758
759 func tbprintBold(x, y int, msg string) {
760 for _, c := range msg {
761 termbox.SetCell(x, y, c, termbox.AttrBold, termbox.ColorDefault)
762 x += runewidth.RuneWidth(c)
763 }
764 }
765
766 func formatDuration(d time.Duration) string {
767 if d < time.Millisecond {
768 return d.Round(time.Microsecond).String()
769 }
770 if d < time.Second {
771 return d.Round(time.Millisecond).String()
772 }
773 return d.Round(time.Second).String()
774 }
775
View as plain text