1 package cmd
2
3 import (
4 "bytes"
5 "context"
6 "encoding/json"
7 "fmt"
8 "os"
9 "sort"
10 "strings"
11 "text/tabwriter"
12 "time"
13
14 pkgcmd "github.com/linkerd/linkerd2/pkg/cmd"
15 "github.com/linkerd/linkerd2/pkg/healthcheck"
16 "github.com/linkerd/linkerd2/pkg/k8s"
17 pb "github.com/linkerd/linkerd2/viz/metrics-api/gen/viz"
18 "github.com/linkerd/linkerd2/viz/metrics-api/util"
19 "github.com/linkerd/linkerd2/viz/pkg/api"
20 hc "github.com/linkerd/linkerd2/viz/pkg/healthcheck"
21 pkgUtil "github.com/linkerd/linkerd2/viz/pkg/util"
22 log "github.com/sirupsen/logrus"
23 "github.com/spf13/cobra"
24 v1 "k8s.io/api/core/v1"
25 )
26
27 type statOptions struct {
28 statOptionsBase
29 toNamespace string
30 toResource string
31 fromNamespace string
32 fromResource string
33 allNamespaces bool
34 labelSelector string
35 unmeshed bool
36 }
37
// statOptionsBase carries the options embedded by statOptions: the target
// namespace, the stat time window and the output format.
type statOptionsBase struct {
	// namespace is the value of the --namespace flag.
	namespace string
	// timeWindow is the value of the --time-window flag (for example "1m").
	timeWindow string
	// outputFormat is the value of the --output flag.
	outputFormat string
}
48
49 func newStatOptionsBase() *statOptionsBase {
50 return &statOptionsBase{
51 timeWindow: "1m",
52 outputFormat: tableOutput,
53 }
54 }
55
56 func (o *statOptionsBase) validateOutputFormat() error {
57 switch o.outputFormat {
58 case tableOutput, jsonOutput, wideOutput:
59 return nil
60 default:
61 return fmt.Errorf("--output currently only supports %s, %s and %s", tableOutput, jsonOutput, wideOutput)
62 }
63 }
64
65 type indexedResults struct {
66 ix int
67 rows []*pb.StatTable_PodGroup_Row
68 err error
69 }
70
71 func newStatOptions() *statOptions {
72 return &statOptions{
73 statOptionsBase: *newStatOptionsBase(),
74 toNamespace: "",
75 toResource: "",
76 fromNamespace: "",
77 fromResource: "",
78 allNamespaces: false,
79 labelSelector: "",
80 unmeshed: false,
81 }
82 }
83
84
85 func NewCmdStat() *cobra.Command {
86 options := newStatOptions()
87
88 cmd := &cobra.Command{
89 Use: "stat [flags] (RESOURCES)",
90 Short: "Display traffic stats about one or many resources",
91 Long: `Display traffic stats about one or many resources.
92
93 The RESOURCES argument specifies the target resource(s) to aggregate stats over:
94 (TYPE [NAME] | TYPE/NAME)
95 or (TYPE [NAME1] [NAME2]...)
96 or (TYPE1/NAME1 TYPE2/NAME2...)
97
98 Examples:
99 * cronjob/my-cronjob
100 * deploy
101 * deploy/my-deploy
102 * deploy/ po/
103 * ds/my-daemonset
104 * job/my-job
105 * ns/my-ns
106 * po/mypod1 rc/my-replication-controller
107 * po mypod1 mypod2
108 * rc/my-replication-controller
109 * rs
110 * rs/my-replicaset
111 * sts/my-statefulset
112 * ts/my-split
113 * authority
114 * au/my-authority
115 * httproute/my-route
116 * route/my-route
117 * all
118
119 Valid resource types include:
120 * cronjobs
121 * daemonsets
122 * deployments
123 * namespaces
124 * jobs
125 * pods
126 * replicasets
127 * replicationcontrollers
128 * statefulsets
129 * authorities (not supported in --from)
130 * authorizationpolicies (not supported in --from)
131 * httproutes (not supported in --from)
132 * services (not supported in --from)
133 * servers (not supported in --from)
134 * serverauthorizations (not supported in --from)
135 * all (all resource types, not supported in --from or --to)
136
137 This command will hide resources that have completed, such as pods that are in the Succeeded or Failed phases.
138 If no resource name is specified, displays stats about all resources of the specified RESOURCETYPE`,
139 Example: ` # Get all deployments in the test namespace.
140 linkerd viz stat deployments -n test
141
142 # Get the hello1 replication controller in the test namespace.
143 linkerd viz stat replicationcontrollers hello1 -n test
144
145 # Get all namespaces.
146 linkerd viz stat namespaces
147
148 # Get all inbound stats to the web deployment.
149 linkerd viz stat deploy/web
150
151 # Get all inbound stats to the pod1 and pod2 pods
152 linkerd viz stat po pod1 pod2
153
154 # Get all inbound stats to the pod1 pod and the web deployment
155 linkerd viz stat po/pod1 deploy/web
156
157 # Get all pods in all namespaces that call the hello1 deployment in the test namespace.
158 linkerd viz stat pods --to deploy/hello1 --to-namespace test --all-namespaces
159
160 # Get all pods in all namespaces that call the hello1 service in the test namespace.
161 linkerd viz stat pods --to svc/hello1 --to-namespace test --all-namespaces
162
163 # Get the web service. With Services, metrics are generated from the outbound metrics
164 # of clients, and thus will not include unmeshed client request metrics.
165 linkerd viz stat svc/web
166
167 # Get the web services and metrics for any traffic coming to the service from the hello1 deployment
168 # in the test namespace.
169 linkerd viz stat svc/web --from deploy/hello1 --from-namespace test
170
171 # Get the web services and metrics for all the traffic that reaches the web-pod1 pod
172 # in the test namespace exclusively.
173 linkerd viz stat svc/web --to pod/web-pod1 --to-namespace test
174
175 # Get all services in all namespaces that receive calls from hello1 deployment in the test namespace.
176 linkerd viz stat services --from deploy/hello1 --from-namespace test --all-namespaces
177
178 # Get all namespaces that receive traffic from the default namespace.
179 linkerd viz stat namespaces --from ns/default
180
181 # Get all inbound stats to the test namespace.
182 linkerd viz stat ns/test
183
184 # Get all inbound stats to the emoji-grpc server
185 linkerd viz stat server/emoji-grpc
186
187 # Get all inbound stats to the web-public server authorization resource
188 linkerd viz stat serverauthorization/web-public
189
190 # Get all inbound stats to the web-get and web-delete HTTP route resources
191 linkerd viz stat route/web-get route/web-delete
192
193 # Get all inbound stats to the web-authz authorization policy resource
194 linkerd viz stat authorizationpolicy/web-authz
195 `,
196 Args: cobra.MinimumNArgs(1),
197 ValidArgsFunction: func(cmd *cobra.Command, args []string, toComplete string) ([]string, cobra.ShellCompDirective) {
198
199 k8sAPI, err := k8s.NewAPI(kubeconfigPath, kubeContext, impersonate, impersonateGroup, 0)
200 if err != nil {
201 return nil, cobra.ShellCompDirectiveError
202 }
203
204 if options.namespace == "" {
205 options.namespace = pkgcmd.GetDefaultNamespace(kubeconfigPath, kubeContext)
206 }
207
208 if options.allNamespaces {
209 options.namespace = v1.NamespaceAll
210 }
211
212 cc := k8s.NewCommandCompletion(k8sAPI, options.namespace)
213
214 results, err := cc.Complete(args, toComplete)
215 if err != nil {
216 return nil, cobra.ShellCompDirectiveError
217 }
218
219 return results, cobra.ShellCompDirectiveDefault
220 },
221 RunE: func(cmd *cobra.Command, args []string) error {
222 if options.namespace == "" {
223 options.namespace = pkgcmd.GetDefaultNamespace(kubeconfigPath, kubeContext)
224 }
225
226 reqs, err := buildStatSummaryRequests(args, options)
227 if err != nil {
228 return fmt.Errorf("error creating metrics request while making stats request: %w", err)
229 }
230
231
232
233 client := api.CheckClientOrExit(hc.VizOptions{
234 Options: &healthcheck.Options{
235 ControlPlaneNamespace: controlPlaneNamespace,
236 KubeConfig: kubeconfigPath,
237 Impersonate: impersonate,
238 ImpersonateGroup: impersonateGroup,
239 KubeContext: kubeContext,
240 APIAddr: apiAddr,
241 },
242 VizNamespaceOverride: vizNamespace,
243 })
244
245 c := make(chan indexedResults, len(reqs))
246 for num, req := range reqs {
247 go func(num int, req *pb.StatSummaryRequest) {
248 resp, err := requestStatsFromAPI(client, req)
249 rows := respToRows(resp)
250 c <- indexedResults{num, rows, err}
251 }(num, req)
252 }
253
254 totalRows := make([]*pb.StatTable_PodGroup_Row, 0)
255 i := 0
256 for res := range c {
257 if res.err != nil {
258 fmt.Fprint(os.Stderr, res.err.Error())
259 os.Exit(1)
260 }
261 totalRows = append(totalRows, res.rows...)
262 if i++; i == len(reqs) {
263 close(c)
264 }
265 }
266
267 output := renderStatStats(totalRows, options)
268 _, err = fmt.Print(output)
269
270 return err
271 },
272 }
273
274 cmd.PersistentFlags().StringVarP(&options.namespace, "namespace", "n", options.namespace, "Namespace of the specified resource")
275 cmd.PersistentFlags().StringVarP(&options.timeWindow, "time-window", "t", options.timeWindow, "Stat window (for example: \"15s\", \"1m\", \"10m\", \"1h\"). Needs to be at least 15s.")
276 cmd.PersistentFlags().StringVar(&options.toResource, "to", options.toResource, "If present, restricts outbound stats to the specified resource name")
277 cmd.PersistentFlags().StringVar(&options.toNamespace, "to-namespace", options.toNamespace, "Sets the namespace used to lookup the \"--to\" resource; by default the current \"--namespace\" is used")
278 cmd.PersistentFlags().StringVar(&options.fromResource, "from", options.fromResource, "If present, restricts outbound stats from the specified resource name")
279 cmd.PersistentFlags().StringVar(&options.fromNamespace, "from-namespace", options.fromNamespace, "Sets the namespace used from lookup the \"--from\" resource; by default the current \"--namespace\" is used")
280 cmd.PersistentFlags().BoolVarP(&options.allNamespaces, "all-namespaces", "A", options.allNamespaces, "If present, returns stats across all namespaces, ignoring the \"--namespace\" flag")
281 cmd.PersistentFlags().StringVarP(&options.outputFormat, "output", "o", options.outputFormat, "Output format; one of: \"table\" or \"json\" or \"wide\"")
282 cmd.PersistentFlags().StringVarP(&options.labelSelector, "selector", "l", options.labelSelector, "Selector (label query) to filter on, supports '=', '==', and '!='")
283 cmd.PersistentFlags().BoolVar(&options.unmeshed, "unmeshed", options.unmeshed, "If present, include unmeshed resources in the output")
284
285 pkgcmd.ConfigureNamespaceFlagCompletion(
286 cmd, []string{"namespace", "to-namespace", "from-namespace"},
287 kubeconfigPath, impersonate, impersonateGroup, kubeContext)
288 return cmd
289 }
290
291 func respToRows(resp *pb.StatSummaryResponse) []*pb.StatTable_PodGroup_Row {
292 rows := make([]*pb.StatTable_PodGroup_Row, 0)
293 if resp != nil {
294 for _, statTable := range resp.GetOk().StatTables {
295 rows = append(rows, statTable.GetPodGroup().Rows...)
296 }
297 }
298 return rows
299 }
300
301 func requestStatsFromAPI(client pb.ApiClient, req *pb.StatSummaryRequest) (*pb.StatSummaryResponse, error) {
302 resp, err := client.StatSummary(context.Background(), req)
303 if err != nil {
304 return nil, fmt.Errorf("StatSummary API error: %w", err)
305 }
306 if e := resp.GetError(); e != nil {
307 return nil, fmt.Errorf("StatSummary API response error: %v", e.Error)
308 }
309
310 return resp, nil
311 }
312
313 func renderStatStats(rows []*pb.StatTable_PodGroup_Row, options *statOptions) string {
314 var buffer bytes.Buffer
315 w := tabwriter.NewWriter(&buffer, 0, 0, padding, ' ', tabwriter.AlignRight)
316 writeStatsToBuffer(rows, w, options)
317 w.Flush()
318
319 return renderStats(buffer, &options.statOptionsBase)
320 }
321
// padding is the cell padding handed to the tabwriter; renderStats strips the
// same number of leading characters from each rendered line.
const (
	padding = 3
)
323
// rowStats holds the request, latency and TCP figures displayed for one row.
type rowStats struct {
	route string
	dst   string

	// Request figures.
	requestRate float64
	successRate float64

	// Latency figures (rendered with an "ms" suffix in table output).
	latencyP50 uint64
	latencyP95 uint64
	latencyP99 uint64

	// TCP figures.
	tcpOpenConnections uint64
	tcpReadBytes       float64
	tcpWriteBytes      float64
}
336
// srvStats holds the server-related figures of a row: the rate shown in the
// UNAUTHORIZED column and the name shown in the SERVER column.
type srvStats struct {
	unauthorizedRate float64
	server           string
}
341
342 type row struct {
343 meshed string
344 status string
345 *rowStats
346 *tsStats
347 *dstStats
348 *srvStats
349 }
350
// tsStats holds the values of the APEX, LEAF and WEIGHT columns of a row.
type tsStats struct {
	apex, leaf, weight string
}
356
// dstStats holds the values of the destination and WEIGHT columns of a row.
type dstStats struct {
	dst, weight string
}
361
// Column header texts; their lengths seed the minimum column widths.
var (
	// Identification columns.
	nameHeader      = "NAME"
	namespaceHeader = "NAMESPACE"

	// Split-related columns.
	apexHeader   = "APEX"
	leafHeader   = "LEAF"
	weightHeader = "WEIGHT"
)
369
370 func statHasRequestData(stat *pb.BasicStats) bool {
371 return stat.GetSuccessCount() != 0 || stat.GetFailureCount() != 0 || stat.GetActualSuccessCount() != 0 || stat.GetActualFailureCount() != 0
372 }
373
374 func isPodOwnerResource(typ string) bool {
375 return typ != k8s.Authority && typ != k8s.Service && typ != k8s.Server && typ != k8s.ServerAuthorization && typ != k8s.AuthorizationPolicy && typ != k8s.HTTPRoute
376 }
377
378 func writeStatsToBuffer(rows []*pb.StatTable_PodGroup_Row, w *tabwriter.Writer, options *statOptions) {
379 maxNameLength := len(nameHeader)
380 maxNamespaceLength := len(namespaceHeader)
381 maxApexLength := len(apexHeader)
382 maxLeafLength := len(leafHeader)
383 maxDstLength := len(dstHeader)
384 maxWeightLength := len(weightHeader)
385
386 statTables := make(map[string]map[string]*row)
387
388 prefixTypes := make(map[string]bool)
389 for _, r := range rows {
390 prefixTypes[r.Resource.Type] = true
391 }
392 usePrefix := false
393 if len(prefixTypes) > 1 {
394 usePrefix = true
395 }
396
397 for _, r := range rows {
398
399
400 if !options.unmeshed && r.GetMeshedPodCount() == 0 &&
401
402 isPodOwnerResource(r.Resource.Type) &&
403
404
405
406 options.fromResource == "" {
407 continue
408 }
409
410 name := r.Resource.Name
411 nameWithPrefix := name
412 if usePrefix {
413 nameWithPrefix = getNamePrefix(r.Resource.Type) + nameWithPrefix
414 }
415
416 namespace := r.Resource.Namespace
417 key := fmt.Sprintf("%s/%s", namespace, name)
418
419 resourceKey := r.Resource.Type
420
421 if _, ok := statTables[resourceKey]; !ok {
422 statTables[resourceKey] = make(map[string]*row)
423 }
424
425 if len(nameWithPrefix) > maxNameLength {
426 maxNameLength = len(nameWithPrefix)
427 }
428
429 if len(namespace) > maxNamespaceLength {
430 maxNamespaceLength = len(namespace)
431 }
432
433 statTables[resourceKey][key] = &row{}
434 if resourceKey != k8s.Server && resourceKey != k8s.ServerAuthorization {
435 meshedCount := fmt.Sprintf("%d/%d", r.MeshedPodCount, r.RunningPodCount)
436 if resourceKey == k8s.Authority || resourceKey == k8s.Service {
437 meshedCount = "-"
438 }
439 statTables[resourceKey][key] = &row{
440 meshed: meshedCount,
441 status: r.Status,
442 }
443 }
444
445 if r.Stats != nil && statHasRequestData(r.Stats) {
446 statTables[resourceKey][key].rowStats = &rowStats{
447 requestRate: getRequestRate(r.Stats.GetSuccessCount(), r.Stats.GetFailureCount(), r.TimeWindow),
448 successRate: getSuccessRate(r.Stats.GetSuccessCount(), r.Stats.GetFailureCount()),
449 latencyP50: r.Stats.LatencyMsP50,
450 latencyP95: r.Stats.LatencyMsP95,
451 latencyP99: r.Stats.LatencyMsP99,
452 tcpOpenConnections: r.GetTcpStats().GetOpenConnections(),
453 tcpReadBytes: getByteRate(r.GetTcpStats().GetReadBytesTotal(), r.TimeWindow),
454 tcpWriteBytes: getByteRate(r.GetTcpStats().GetWriteBytesTotal(), r.TimeWindow),
455 }
456 }
457
458 if r.SrvStats != nil {
459 statTables[resourceKey][key].srvStats = &srvStats{
460 unauthorizedRate: getSuccessRate(r.SrvStats.GetDeniedCount(), r.SrvStats.GetAllowedCount()),
461 server: r.GetSrvStats().GetSrv().GetName(),
462 }
463 }
464 }
465
466 switch options.outputFormat {
467 case tableOutput, wideOutput:
468 if len(statTables) == 0 {
469 fmt.Fprintln(os.Stderr, "No traffic found.")
470 return
471 }
472 printStatTables(statTables, w, maxNameLength, maxNamespaceLength, maxLeafLength, maxApexLength, maxDstLength, maxWeightLength, options)
473 case jsonOutput:
474 printStatJSON(statTables, w)
475 }
476 }
477
478 func printStatTables(statTables map[string]map[string]*row, w *tabwriter.Writer, maxNameLength, maxNamespaceLength, maxLeafLength, maxApexLength, maxDstLength, maxWeightLength int, options *statOptions) {
479 usePrefix := false
480 if len(statTables) > 1 {
481 usePrefix = true
482 }
483
484 firstDisplayedStat := true
485 for _, resourceType := range k8s.AllResources {
486 if stats, ok := statTables[resourceType]; ok {
487 if !firstDisplayedStat {
488 fmt.Fprint(w, "\n")
489 }
490 firstDisplayedStat = false
491 resourceTypeLabel := resourceType
492 if !usePrefix {
493 resourceTypeLabel = ""
494 }
495 printSingleStatTable(stats, resourceTypeLabel, resourceType, w, maxNameLength, maxNamespaceLength, maxLeafLength, maxApexLength, maxDstLength, maxWeightLength, options)
496 }
497 }
498 }
499
500 func showTCPBytes(options *statOptions, resourceType string) bool {
501 return (options.outputFormat == wideOutput || options.outputFormat == jsonOutput) &&
502 showTCPConns(resourceType)
503 }
504
505 func showTCPConns(resourceType string) bool {
506 return resourceType != k8s.Authority && resourceType != k8s.ServerAuthorization && resourceType != k8s.AuthorizationPolicy && resourceType != k8s.HTTPRoute
507 }
508
509 func printSingleStatTable(stats map[string]*row, resourceTypeLabel, resourceType string, w *tabwriter.Writer, maxNameLength, maxNamespaceLength, maxLeafLength, maxApexLength, maxDstLength, maxWeightLength int, options *statOptions) {
510 headers := make([]string, 0)
511 nameTemplate := fmt.Sprintf("%%-%ds", maxNameLength)
512 namespaceTemplate := fmt.Sprintf("%%-%ds", maxNamespaceLength)
513 apexTemplate := fmt.Sprintf("%%-%ds", maxApexLength)
514 leafTemplate := fmt.Sprintf("%%-%ds", maxLeafLength)
515 dstTemplate := fmt.Sprintf("%%-%ds", maxDstLength)
516 weightTemplate := fmt.Sprintf("%%-%ds", maxWeightLength)
517
518 hasDstStats := false
519 for _, r := range stats {
520 if r.dstStats != nil {
521 hasDstStats = true
522 }
523 }
524
525 hasTsStats := false
526 for _, r := range stats {
527 if r.tsStats != nil {
528 hasTsStats = true
529 }
530 }
531
532 if options.allNamespaces {
533 headers = append(headers,
534 fmt.Sprintf(namespaceTemplate, namespaceHeader))
535 }
536
537 headers = append(headers,
538 fmt.Sprintf(nameTemplate, nameHeader))
539
540 if resourceType == k8s.Pod {
541 headers = append(headers, "STATUS")
542 }
543
544 if resourceType == k8s.HTTPRoute {
545 headers = append(headers, "SERVER")
546 }
547
548 if hasDstStats {
549 headers = append(headers,
550 fmt.Sprintf(dstTemplate, dstHeader),
551 fmt.Sprintf(weightTemplate, weightHeader))
552 } else if hasTsStats {
553 headers = append(headers,
554 fmt.Sprintf(apexTemplate, apexHeader),
555 fmt.Sprintf(leafTemplate, leafHeader),
556 fmt.Sprintf(weightTemplate, weightHeader))
557 } else if resourceType != k8s.Server && resourceType != k8s.ServerAuthorization && resourceType != k8s.AuthorizationPolicy && resourceType != k8s.HTTPRoute {
558 headers = append(headers, "MESHED")
559 }
560
561 if resourceType == k8s.Server || resourceType == k8s.HTTPRoute {
562 headers = append(headers, "UNAUTHORIZED")
563 }
564
565 headers = append(headers, []string{
566 "SUCCESS",
567 "RPS",
568 "LATENCY_P50",
569 "LATENCY_P95",
570 "LATENCY_P99",
571 }...)
572
573 if showTCPConns(resourceType) {
574 headers = append(headers, "TCP_CONN")
575 }
576
577 if showTCPBytes(options, resourceType) {
578 headers = append(headers, []string{
579 "READ_BYTES/SEC",
580 "WRITE_BYTES/SEC",
581 }...)
582 }
583
584 headers[len(headers)-1] = headers[len(headers)-1] + "\t"
585
586 fmt.Fprintln(w, strings.Join(headers, "\t"))
587
588 sortedKeys := sortStatsKeys(stats)
589 for _, key := range sortedKeys {
590 namespace, name := namespaceName(resourceTypeLabel, key)
591 values := make([]interface{}, 0)
592 templateString := "%s\t%s\t%.2f%%\t%.1frps\t%dms\t%dms\t%dms\t"
593 templateStringEmpty := "%s\t%s\t-\t-\t-\t-\t-\t"
594 if resourceType == k8s.Pod {
595 templateString = "%s\t" + templateString
596 templateStringEmpty = "%s\t" + templateStringEmpty
597 }
598
599 if hasTsStats {
600 templateString = "%s\t%s\t%s\t%s\t%.2f%%\t%.1frps\t%dms\t%dms\t%dms\t"
601 templateStringEmpty = "%s\t%s\t%s\t%s\t-\t-\t-\t-\t-\t"
602 } else if hasDstStats {
603 templateString = "%s\t%s\t%s\t%.2f%%\t%.1frps\t%dms\t%dms\t%dms\t"
604 templateStringEmpty = "%s\t%s\t%s\t-\t-\t-\t-\t-\t"
605 } else if resourceType == k8s.ServerAuthorization || resourceType == k8s.AuthorizationPolicy {
606 templateString = "%s\t%.2f%%\t%.1frps\t%dms\t%dms\t%dms\t"
607 templateStringEmpty = "%s\t-\t-\t-\t-\t-\t"
608 } else if resourceType == k8s.Server {
609 templateString = "%s\t%.1frps\t%.2f%%\t%.1frps\t%dms\t%dms\t%dms\t"
610 templateStringEmpty = "%s\t%.1frps\t-\t-\t-\t-\t-\t"
611 } else if resourceType == k8s.HTTPRoute {
612 templateString = "%s\t%s\t%.1frps\t%.2f%%\t%.1frps\t%dms\t%dms\t%dms\t"
613 templateStringEmpty = "%s\t%s\t%.1frps\t-\t-\t-\t-\t-\t"
614 }
615
616 if showTCPConns(resourceType) {
617 templateString += "%d\t"
618 templateStringEmpty += "-\t"
619 }
620
621 if showTCPBytes(options, resourceType) {
622 templateString += "%.1fB/s\t%.1fB/s\t"
623 templateStringEmpty += "-\t-\t"
624 }
625
626 if options.allNamespaces {
627 values = append(values,
628 namespace+strings.Repeat(" ", maxNamespaceLength-len(namespace)))
629 templateString = "%s\t" + templateString
630 templateStringEmpty = "%s\t" + templateStringEmpty
631 }
632
633 templateString += "\n"
634 templateStringEmpty += "\n"
635
636 padding := 0
637 if maxNameLength > len(name) {
638 padding = maxNameLength - len(name)
639 }
640
641 apexPadding := 0
642 leafPadding := 0
643 dstPadding := 0
644
645 if stats[key].tsStats != nil {
646 if maxApexLength > len(stats[key].tsStats.apex) {
647 apexPadding = maxApexLength - len(stats[key].tsStats.apex)
648 }
649 if maxLeafLength > len(stats[key].tsStats.leaf) {
650 leafPadding = maxLeafLength - len(stats[key].tsStats.leaf)
651 }
652 } else if stats[key].dstStats != nil {
653 if maxDstLength > len(stats[key].dstStats.dst) {
654 dstPadding = maxDstLength - len(stats[key].dstStats.dst)
655 }
656 }
657
658 values = append(values, name+strings.Repeat(" ", padding))
659 if resourceType == k8s.Pod {
660 values = append(values, stats[key].status)
661 }
662
663 if hasTsStats {
664 values = append(values,
665 stats[key].tsStats.apex+strings.Repeat(" ", apexPadding),
666 stats[key].tsStats.leaf+strings.Repeat(" ", leafPadding),
667 stats[key].tsStats.weight,
668 )
669 } else if hasDstStats {
670 values = append(values,
671 stats[key].dstStats.dst+strings.Repeat(" ", dstPadding),
672 stats[key].dstStats.weight,
673 )
674 } else if resourceType != k8s.ServerAuthorization && resourceType != k8s.Server && resourceType != k8s.AuthorizationPolicy && resourceType != k8s.HTTPRoute {
675 values = append(values, []interface{}{
676 stats[key].meshed,
677 }...)
678 }
679
680 if resourceType == k8s.HTTPRoute {
681 values = append(values, stats[key].srvStats.server)
682 }
683
684 if resourceType == k8s.Server || resourceType == k8s.HTTPRoute {
685 var unauthorizedRate float64
686 if stats[key].srvStats != nil {
687 unauthorizedRate = stats[key].srvStats.unauthorizedRate
688 }
689 values = append(values, []interface{}{
690 unauthorizedRate,
691 }...)
692 }
693
694 if stats[key].rowStats != nil {
695 values = append(values, []interface{}{
696 stats[key].successRate * 100,
697 stats[key].requestRate,
698 stats[key].latencyP50,
699 stats[key].latencyP95,
700 stats[key].latencyP99,
701 }...)
702
703 if showTCPConns(resourceType) {
704 values = append(values, stats[key].tcpOpenConnections)
705 }
706
707 if showTCPBytes(options, resourceType) {
708 values = append(values, []interface{}{
709 stats[key].tcpReadBytes,
710 stats[key].tcpWriteBytes,
711 }...)
712 }
713
714 fmt.Fprintf(w, templateString, values...)
715 } else {
716 fmt.Fprintf(w, templateStringEmpty, values...)
717 }
718 }
719 }
720
721 func namespaceName(resourceType string, key string) (string, string) {
722 parts := strings.Split(key, "/")
723 namespace := parts[0]
724 namePrefix := getNamePrefix(resourceType)
725 name := namePrefix + parts[1]
726 return namespace, name
727 }
728
729
// jsonStats is the shape of one entry in the JSON output. Pointer fields
// without omitempty are emitted as null when unset; fields tagged omitempty
// are left out entirely.
type jsonStats struct {
	// Identification of the resource.
	Namespace string `json:"namespace"`
	Kind      string `json:"kind"`
	Name      string `json:"name"`
	Meshed    string `json:"meshed,omitempty"`

	// Request and latency figures.
	Success      *float64 `json:"success"`
	Rps          *float64 `json:"rps"`
	LatencyMSp50 *uint64  `json:"latency_ms_p50"`
	LatencyMSp95 *uint64  `json:"latency_ms_p95"`
	LatencyMSp99 *uint64  `json:"latency_ms_p99"`

	// TCP figures.
	TCPConnections *uint64  `json:"tcp_open_connections,omitempty"`
	TCPReadBytes   *float64 `json:"tcp_read_bytes_rate,omitempty"`
	TCPWriteBytes  *float64 `json:"tcp_write_bytes_rate,omitempty"`

	// Split and destination columns.
	Apex   string `json:"apex,omitempty"`
	Leaf   string `json:"leaf,omitempty"`
	Dst    string `json:"dst,omitempty"`
	Weight string `json:"weight,omitempty"`

	// Server figures.
	Unauthorized *float64 `json:"unauthorized,omitempty"`
}
749
750 func printStatJSON(statTables map[string]map[string]*row, w *tabwriter.Writer) {
751
752 entries := []*jsonStats{}
753 for _, resourceType := range k8s.AllResources {
754 if stats, ok := statTables[resourceType]; ok {
755 sortedKeys := sortStatsKeys(stats)
756 for _, key := range sortedKeys {
757 namespace, name := namespaceName("", key)
758 entry := &jsonStats{
759 Namespace: namespace,
760 Kind: resourceType,
761 Name: name,
762 }
763
764 if stats[key].rowStats != nil {
765 entry.Success = &stats[key].successRate
766 entry.Rps = &stats[key].requestRate
767 entry.LatencyMSp50 = &stats[key].latencyP50
768 entry.LatencyMSp95 = &stats[key].latencyP95
769 entry.LatencyMSp99 = &stats[key].latencyP99
770
771 if showTCPConns(resourceType) {
772 entry.TCPConnections = &stats[key].tcpOpenConnections
773 entry.TCPReadBytes = &stats[key].tcpReadBytes
774 entry.TCPWriteBytes = &stats[key].tcpWriteBytes
775 }
776 }
777
778 if stats[key].tsStats != nil {
779 entry.Apex = stats[key].apex
780 entry.Leaf = stats[key].leaf
781 entry.Weight = stats[key].tsStats.weight
782 } else if stats[key].dstStats != nil {
783 entry.Dst = stats[key].dstStats.dst
784 entry.Weight = stats[key].dstStats.weight
785 }
786
787 if resourceType == k8s.Server {
788 if stats[key].srvStats != nil {
789 entry.Unauthorized = &stats[key].srvStats.unauthorizedRate
790 }
791 }
792 entries = append(entries, entry)
793 }
794 }
795 }
796 b, err := json.MarshalIndent(entries, "", " ")
797 if err != nil {
798 log.Error(err.Error())
799 return
800 }
801 fmt.Fprintf(w, "%s\n", b)
802 }
803
804 func getNamePrefix(resourceType string) string {
805 if resourceType == "" {
806 return ""
807 }
808
809 canonicalType := k8s.ShortNameFromCanonicalResourceName(resourceType)
810 return canonicalType + "/"
811 }
812
813 func buildStatSummaryRequests(resources []string, options *statOptions) ([]*pb.StatSummaryRequest, error) {
814 targets, err := pkgUtil.BuildResources(options.namespace, resources)
815 if err != nil {
816 return nil, err
817 }
818
819 var toRes, fromRes *pb.Resource
820 if options.toResource != "" {
821 toRes, err = pkgUtil.BuildResource(options.toNamespace, options.toResource)
822 if err != nil {
823 return nil, err
824 }
825 }
826 if options.fromResource != "" {
827 fromRes, err = pkgUtil.BuildResource(options.fromNamespace, options.fromResource)
828 if err != nil {
829 return nil, err
830 }
831 }
832
833 requests := make([]*pb.StatSummaryRequest, 0)
834 for _, target := range targets {
835 err = options.validate(target.Type)
836 if err != nil {
837 return nil, err
838 }
839
840 requestParams := util.StatsSummaryRequestParams{
841 StatsBaseRequestParams: util.StatsBaseRequestParams{
842 TimeWindow: options.timeWindow,
843 ResourceName: target.Name,
844 ResourceType: target.Type,
845 Namespace: options.namespace,
846 AllNamespaces: options.allNamespaces,
847 },
848 ToNamespace: options.toNamespace,
849 FromNamespace: options.fromNamespace,
850 TCPStats: true,
851 LabelSelector: options.labelSelector,
852 }
853 if fromRes != nil {
854 requestParams.FromName = fromRes.Name
855 requestParams.FromType = fromRes.Type
856 }
857 if toRes != nil {
858 requestParams.ToName = toRes.Name
859 requestParams.ToType = toRes.Type
860 }
861
862 req, err := util.BuildStatSummaryRequest(requestParams)
863 if err != nil {
864 return nil, err
865 }
866 requests = append(requests, req)
867 }
868
869 return requests, nil
870 }
871
872 func sortStatsKeys(stats map[string]*row) []string {
873 var sortedKeys []string
874 for key := range stats {
875 sortedKeys = append(sortedKeys, key)
876 }
877 sort.Strings(sortedKeys)
878 return sortedKeys
879 }
880
881
882
883 func (o *statOptions) validate(resourceType string) error {
884 err := o.validateConflictingFlags()
885 if err != nil {
886 return err
887 }
888
889 if resourceType == k8s.Namespace {
890 err := o.validateNamespaceFlags()
891 if err != nil {
892 return err
893 }
894 }
895
896 return o.validateOutputFormat()
897 }
898
899
900
901 func (o *statOptions) validateConflictingFlags() error {
902 if o.toResource != "" && o.fromResource != "" {
903 return fmt.Errorf("--to and --from flags are mutually exclusive")
904 }
905
906 if o.toNamespace != "" && o.fromNamespace != "" {
907 return fmt.Errorf("--to-namespace and --from-namespace flags are mutually exclusive")
908 }
909
910 if o.allNamespaces && o.namespace != pkgcmd.GetDefaultNamespace(kubeconfigPath, kubeContext) {
911 return fmt.Errorf("--all-namespaces and --namespace flags are mutually exclusive")
912 }
913
914 return nil
915 }
916
917
918
919 func (o *statOptions) validateNamespaceFlags() error {
920 if o.toNamespace != "" {
921 return fmt.Errorf("--to-namespace flag is incompatible with namespace resource type")
922 }
923
924 if o.fromNamespace != "" {
925 return fmt.Errorf("--from-namespace flag is incompatible with namespace resource type")
926 }
927
928
929
930 if o.namespace != pkgcmd.GetDefaultNamespace(kubeconfigPath, kubeContext) {
931 return fmt.Errorf("--namespace flag is incompatible with namespace resource type")
932 }
933
934 return nil
935 }
936
937
938 func getByteRate(bytes uint64, timeWindow string) float64 {
939 windowLength, err := time.ParseDuration(timeWindow)
940 if err != nil {
941 log.Error(err.Error())
942 return 0.0
943 }
944 return float64(bytes) / windowLength.Seconds()
945 }
946
947 func renderStats(buffer bytes.Buffer, options *statOptionsBase) string {
948 var out string
949 switch options.outputFormat {
950 case jsonOutput:
951 out = buffer.String()
952 default:
953
954 b := buffer.Bytes()
955 if len(b) > padding {
956 out = string(b[padding:])
957 }
958 out = strings.ReplaceAll(out, "\n"+strings.Repeat(" ", padding), "\n")
959 }
960
961 return out
962 }
963
View as plain text