...

Source file src/github.com/linkerd/linkerd2/viz/cmd/stat.go

Documentation: github.com/linkerd/linkerd2/viz/cmd

     1  package cmd
     2  
     3  import (
     4  	"bytes"
     5  	"context"
     6  	"encoding/json"
     7  	"fmt"
     8  	"os"
     9  	"sort"
    10  	"strings"
    11  	"text/tabwriter"
    12  	"time"
    13  
    14  	pkgcmd "github.com/linkerd/linkerd2/pkg/cmd"
    15  	"github.com/linkerd/linkerd2/pkg/healthcheck"
    16  	"github.com/linkerd/linkerd2/pkg/k8s"
    17  	pb "github.com/linkerd/linkerd2/viz/metrics-api/gen/viz"
    18  	"github.com/linkerd/linkerd2/viz/metrics-api/util"
    19  	"github.com/linkerd/linkerd2/viz/pkg/api"
    20  	hc "github.com/linkerd/linkerd2/viz/pkg/healthcheck"
    21  	pkgUtil "github.com/linkerd/linkerd2/viz/pkg/util"
    22  	log "github.com/sirupsen/logrus"
    23  	"github.com/spf13/cobra"
    24  	v1 "k8s.io/api/core/v1"
    25  )
    26  
    27  type statOptions struct {
    28  	statOptionsBase
    29  	toNamespace   string
    30  	toResource    string
    31  	fromNamespace string
    32  	fromResource  string
    33  	allNamespaces bool
    34  	labelSelector string
    35  	unmeshed      bool
    36  }
    37  
    38  type statOptionsBase struct {
    39  	// namespace is only referenced from the outer struct statOptions (e.g.
    40  	// options.namespace where options's type is _not_ this struct).
    41  	// structcheck issues a false positive for this field as it does not think
    42  	// it's used.
    43  	//nolint:structcheck
    44  	namespace    string
    45  	timeWindow   string
    46  	outputFormat string
    47  }
    48  
    49  func newStatOptionsBase() *statOptionsBase {
    50  	return &statOptionsBase{
    51  		timeWindow:   "1m",
    52  		outputFormat: tableOutput,
    53  	}
    54  }
    55  
    56  func (o *statOptionsBase) validateOutputFormat() error {
    57  	switch o.outputFormat {
    58  	case tableOutput, jsonOutput, wideOutput:
    59  		return nil
    60  	default:
    61  		return fmt.Errorf("--output currently only supports %s, %s and %s", tableOutput, jsonOutput, wideOutput)
    62  	}
    63  }
    64  
    65  type indexedResults struct {
    66  	ix   int
    67  	rows []*pb.StatTable_PodGroup_Row
    68  	err  error
    69  }
    70  
    71  func newStatOptions() *statOptions {
    72  	return &statOptions{
    73  		statOptionsBase: *newStatOptionsBase(),
    74  		toNamespace:     "",
    75  		toResource:      "",
    76  		fromNamespace:   "",
    77  		fromResource:    "",
    78  		allNamespaces:   false,
    79  		labelSelector:   "",
    80  		unmeshed:        false,
    81  	}
    82  }
    83  
    84  // NewCmdStat creates a new cobra command `stat` for stat functionality
    85  func NewCmdStat() *cobra.Command {
    86  	options := newStatOptions()
    87  
    88  	cmd := &cobra.Command{
    89  		Use:   "stat [flags] (RESOURCES)",
    90  		Short: "Display traffic stats about one or many resources",
    91  		Long: `Display traffic stats about one or many resources.
    92  
    93    The RESOURCES argument specifies the target resource(s) to aggregate stats over:
    94    (TYPE [NAME] | TYPE/NAME)
    95    or (TYPE [NAME1] [NAME2]...)
    96    or (TYPE1/NAME1 TYPE2/NAME2...)
    97  
    98    Examples:
    99    * cronjob/my-cronjob
   100    * deploy
   101    * deploy/my-deploy
   102    * deploy/ po/
   103    * ds/my-daemonset
   104    * job/my-job
   105    * ns/my-ns
   106    * po/mypod1 rc/my-replication-controller
   107    * po mypod1 mypod2
   108    * rc/my-replication-controller
   109    * rs
   110    * rs/my-replicaset
   111    * sts/my-statefulset
   112    * ts/my-split
   113    * authority
   114    * au/my-authority
   115    * httproute/my-route
   116    * route/my-route
   117    * all
   118  
   119    Valid resource types include:
   120    * cronjobs
   121    * daemonsets
   122    * deployments
   123    * namespaces
   124    * jobs
   125    * pods
   126    * replicasets
   127    * replicationcontrollers
   128    * statefulsets
   129    * authorities (not supported in --from)
   130    * authorizationpolicies (not supported in --from)
   131    * httproutes (not supported in --from)
   132    * services (not supported in --from)
   133    * servers (not supported in --from)
   134    * serverauthorizations (not supported in --from)
   135    * all (all resource types, not supported in --from or --to)
   136  
   137  This command will hide resources that have completed, such as pods that are in the Succeeded or Failed phases.
   138  If no resource name is specified, displays stats about all resources of the specified RESOURCETYPE`,
   139  		Example: `  # Get all deployments in the test namespace.
   140    linkerd viz stat deployments -n test
   141  
   142    # Get the hello1 replication controller in the test namespace.
   143    linkerd viz stat replicationcontrollers hello1 -n test
   144  
   145    # Get all namespaces.
   146    linkerd viz stat namespaces
   147  
   148    # Get all inbound stats to the web deployment.
   149    linkerd viz stat deploy/web
   150  
   151    # Get all inbound stats to the pod1 and pod2 pods
   152    linkerd viz stat po pod1 pod2
   153  
   154    # Get all inbound stats to the pod1 pod and the web deployment
   155    linkerd viz stat po/pod1 deploy/web
   156  
   157    # Get all pods in all namespaces that call the hello1 deployment in the test namespace.
   158    linkerd viz stat pods --to deploy/hello1 --to-namespace test --all-namespaces
   159  
   160    # Get all pods in all namespaces that call the hello1 service in the test namespace.
   161    linkerd viz stat pods --to svc/hello1 --to-namespace test --all-namespaces
   162  
   163    # Get the web service. With Services, metrics are generated from the outbound metrics
   164    # of clients, and thus will not include unmeshed client request metrics.
   165    linkerd viz stat svc/web
   166  
   167    # Get the web services and metrics for any traffic coming to the service from the hello1 deployment
   168    # in the test namespace.
   169    linkerd viz stat svc/web --from deploy/hello1 --from-namespace test
   170  
   171    # Get the web services and metrics for all the traffic that reaches the web-pod1 pod
   172    # in the test namespace exclusively.
   173    linkerd viz stat svc/web --to pod/web-pod1 --to-namespace test
   174  
   175    # Get all services in all namespaces that receive calls from hello1 deployment in the test namespace.
   176    linkerd viz stat services --from deploy/hello1 --from-namespace test --all-namespaces
   177  
   178    # Get all namespaces that receive traffic from the default namespace.
   179    linkerd viz stat namespaces --from ns/default
   180  
   181    # Get all inbound stats to the test namespace.
   182    linkerd viz stat ns/test
   183  
   184    # Get all inbound stats to the emoji-grpc server
   185    linkerd viz stat server/emoji-grpc
   186  
   187    # Get all inbound stats to the web-public server authorization resource
   188    linkerd viz stat serverauthorization/web-public
   189  
   190    # Get all inbound stats to the web-get and web-delete HTTP route resources
   191    linkerd viz stat route/web-get route/web-delete
   192  
   193    # Get all inbound stats to the web-authz authorization policy resource
   194    linkerd viz stat authorizationpolicy/web-authz
   195    `,
   196  		Args: cobra.MinimumNArgs(1),
   197  		ValidArgsFunction: func(cmd *cobra.Command, args []string, toComplete string) ([]string, cobra.ShellCompDirective) {
   198  
   199  			k8sAPI, err := k8s.NewAPI(kubeconfigPath, kubeContext, impersonate, impersonateGroup, 0)
   200  			if err != nil {
   201  				return nil, cobra.ShellCompDirectiveError
   202  			}
   203  
   204  			if options.namespace == "" {
   205  				options.namespace = pkgcmd.GetDefaultNamespace(kubeconfigPath, kubeContext)
   206  			}
   207  
   208  			if options.allNamespaces {
   209  				options.namespace = v1.NamespaceAll
   210  			}
   211  
   212  			cc := k8s.NewCommandCompletion(k8sAPI, options.namespace)
   213  
   214  			results, err := cc.Complete(args, toComplete)
   215  			if err != nil {
   216  				return nil, cobra.ShellCompDirectiveError
   217  			}
   218  
   219  			return results, cobra.ShellCompDirectiveDefault
   220  		},
   221  		RunE: func(cmd *cobra.Command, args []string) error {
   222  			if options.namespace == "" {
   223  				options.namespace = pkgcmd.GetDefaultNamespace(kubeconfigPath, kubeContext)
   224  			}
   225  
   226  			reqs, err := buildStatSummaryRequests(args, options)
   227  			if err != nil {
   228  				return fmt.Errorf("error creating metrics request while making stats request: %w", err)
   229  			}
   230  
   231  			// The gRPC client is concurrency-safe, so we can reuse it in all the following goroutines
   232  			// https://github.com/grpc/grpc-go/issues/682
   233  			client := api.CheckClientOrExit(hc.VizOptions{
   234  				Options: &healthcheck.Options{
   235  					ControlPlaneNamespace: controlPlaneNamespace,
   236  					KubeConfig:            kubeconfigPath,
   237  					Impersonate:           impersonate,
   238  					ImpersonateGroup:      impersonateGroup,
   239  					KubeContext:           kubeContext,
   240  					APIAddr:               apiAddr,
   241  				},
   242  				VizNamespaceOverride: vizNamespace,
   243  			})
   244  
   245  			c := make(chan indexedResults, len(reqs))
   246  			for num, req := range reqs {
   247  				go func(num int, req *pb.StatSummaryRequest) {
   248  					resp, err := requestStatsFromAPI(client, req)
   249  					rows := respToRows(resp)
   250  					c <- indexedResults{num, rows, err}
   251  				}(num, req)
   252  			}
   253  
   254  			totalRows := make([]*pb.StatTable_PodGroup_Row, 0)
   255  			i := 0
   256  			for res := range c {
   257  				if res.err != nil {
   258  					fmt.Fprint(os.Stderr, res.err.Error())
   259  					os.Exit(1)
   260  				}
   261  				totalRows = append(totalRows, res.rows...)
   262  				if i++; i == len(reqs) {
   263  					close(c)
   264  				}
   265  			}
   266  
   267  			output := renderStatStats(totalRows, options)
   268  			_, err = fmt.Print(output)
   269  
   270  			return err
   271  		},
   272  	}
   273  
   274  	cmd.PersistentFlags().StringVarP(&options.namespace, "namespace", "n", options.namespace, "Namespace of the specified resource")
   275  	cmd.PersistentFlags().StringVarP(&options.timeWindow, "time-window", "t", options.timeWindow, "Stat window (for example: \"15s\", \"1m\", \"10m\", \"1h\"). Needs to be at least 15s.")
   276  	cmd.PersistentFlags().StringVar(&options.toResource, "to", options.toResource, "If present, restricts outbound stats to the specified resource name")
   277  	cmd.PersistentFlags().StringVar(&options.toNamespace, "to-namespace", options.toNamespace, "Sets the namespace used to lookup the \"--to\" resource; by default the current \"--namespace\" is used")
   278  	cmd.PersistentFlags().StringVar(&options.fromResource, "from", options.fromResource, "If present, restricts outbound stats from the specified resource name")
   279  	cmd.PersistentFlags().StringVar(&options.fromNamespace, "from-namespace", options.fromNamespace, "Sets the namespace used from lookup the \"--from\" resource; by default the current \"--namespace\" is used")
   280  	cmd.PersistentFlags().BoolVarP(&options.allNamespaces, "all-namespaces", "A", options.allNamespaces, "If present, returns stats across all namespaces, ignoring the \"--namespace\" flag")
   281  	cmd.PersistentFlags().StringVarP(&options.outputFormat, "output", "o", options.outputFormat, "Output format; one of: \"table\" or \"json\" or \"wide\"")
   282  	cmd.PersistentFlags().StringVarP(&options.labelSelector, "selector", "l", options.labelSelector, "Selector (label query) to filter on, supports '=', '==', and '!='")
   283  	cmd.PersistentFlags().BoolVar(&options.unmeshed, "unmeshed", options.unmeshed, "If present, include unmeshed resources in the output")
   284  
   285  	pkgcmd.ConfigureNamespaceFlagCompletion(
   286  		cmd, []string{"namespace", "to-namespace", "from-namespace"},
   287  		kubeconfigPath, impersonate, impersonateGroup, kubeContext)
   288  	return cmd
   289  }
   290  
   291  func respToRows(resp *pb.StatSummaryResponse) []*pb.StatTable_PodGroup_Row {
   292  	rows := make([]*pb.StatTable_PodGroup_Row, 0)
   293  	if resp != nil {
   294  		for _, statTable := range resp.GetOk().StatTables {
   295  			rows = append(rows, statTable.GetPodGroup().Rows...)
   296  		}
   297  	}
   298  	return rows
   299  }
   300  
   301  func requestStatsFromAPI(client pb.ApiClient, req *pb.StatSummaryRequest) (*pb.StatSummaryResponse, error) {
   302  	resp, err := client.StatSummary(context.Background(), req)
   303  	if err != nil {
   304  		return nil, fmt.Errorf("StatSummary API error: %w", err)
   305  	}
   306  	if e := resp.GetError(); e != nil {
   307  		return nil, fmt.Errorf("StatSummary API response error: %v", e.Error)
   308  	}
   309  
   310  	return resp, nil
   311  }
   312  
   313  func renderStatStats(rows []*pb.StatTable_PodGroup_Row, options *statOptions) string {
   314  	var buffer bytes.Buffer
   315  	w := tabwriter.NewWriter(&buffer, 0, 0, padding, ' ', tabwriter.AlignRight)
   316  	writeStatsToBuffer(rows, w, options)
   317  	w.Flush()
   318  
   319  	return renderStats(buffer, &options.statOptionsBase)
   320  }
   321  
   322  const padding = 3
   323  
   324  type rowStats struct {
   325  	route              string
   326  	dst                string
   327  	requestRate        float64
   328  	successRate        float64
   329  	latencyP50         uint64
   330  	latencyP95         uint64
   331  	latencyP99         uint64
   332  	tcpOpenConnections uint64
   333  	tcpReadBytes       float64
   334  	tcpWriteBytes      float64
   335  }
   336  
   337  type srvStats struct {
   338  	unauthorizedRate float64
   339  	server           string
   340  }
   341  
   342  type row struct {
   343  	meshed string
   344  	status string
   345  	*rowStats
   346  	*tsStats
   347  	*dstStats
   348  	*srvStats
   349  }
   350  
   351  type tsStats struct {
   352  	apex   string
   353  	leaf   string
   354  	weight string
   355  }
   356  
   357  type dstStats struct {
   358  	dst    string
   359  	weight string
   360  }
   361  
   362  var (
   363  	nameHeader      = "NAME"
   364  	namespaceHeader = "NAMESPACE"
   365  	apexHeader      = "APEX"
   366  	leafHeader      = "LEAF"
   367  	weightHeader    = "WEIGHT"
   368  )
   369  
   370  func statHasRequestData(stat *pb.BasicStats) bool {
   371  	return stat.GetSuccessCount() != 0 || stat.GetFailureCount() != 0 || stat.GetActualSuccessCount() != 0 || stat.GetActualFailureCount() != 0
   372  }
   373  
   374  func isPodOwnerResource(typ string) bool {
   375  	return typ != k8s.Authority && typ != k8s.Service && typ != k8s.Server && typ != k8s.ServerAuthorization && typ != k8s.AuthorizationPolicy && typ != k8s.HTTPRoute
   376  }
   377  
   378  func writeStatsToBuffer(rows []*pb.StatTable_PodGroup_Row, w *tabwriter.Writer, options *statOptions) {
   379  	maxNameLength := len(nameHeader)
   380  	maxNamespaceLength := len(namespaceHeader)
   381  	maxApexLength := len(apexHeader)
   382  	maxLeafLength := len(leafHeader)
   383  	maxDstLength := len(dstHeader)
   384  	maxWeightLength := len(weightHeader)
   385  
   386  	statTables := make(map[string]map[string]*row)
   387  
   388  	prefixTypes := make(map[string]bool)
   389  	for _, r := range rows {
   390  		prefixTypes[r.Resource.Type] = true
   391  	}
   392  	usePrefix := false
   393  	if len(prefixTypes) > 1 {
   394  		usePrefix = true
   395  	}
   396  
   397  	for _, r := range rows {
   398  
   399  		// Skip unmeshed pods if the unmeshed option isn't enabled.
   400  		if !options.unmeshed && r.GetMeshedPodCount() == 0 &&
   401  			// Skip only if the resource can own pods
   402  			isPodOwnerResource(r.Resource.Type) &&
   403  			// Skip only if --from isn't specified (unmeshed resources can show
   404  			// stats in --from mode because metrics are collected on the client
   405  			// side).
   406  			options.fromResource == "" {
   407  			continue
   408  		}
   409  
   410  		name := r.Resource.Name
   411  		nameWithPrefix := name
   412  		if usePrefix {
   413  			nameWithPrefix = getNamePrefix(r.Resource.Type) + nameWithPrefix
   414  		}
   415  
   416  		namespace := r.Resource.Namespace
   417  		key := fmt.Sprintf("%s/%s", namespace, name)
   418  
   419  		resourceKey := r.Resource.Type
   420  
   421  		if _, ok := statTables[resourceKey]; !ok {
   422  			statTables[resourceKey] = make(map[string]*row)
   423  		}
   424  
   425  		if len(nameWithPrefix) > maxNameLength {
   426  			maxNameLength = len(nameWithPrefix)
   427  		}
   428  
   429  		if len(namespace) > maxNamespaceLength {
   430  			maxNamespaceLength = len(namespace)
   431  		}
   432  
   433  		statTables[resourceKey][key] = &row{}
   434  		if resourceKey != k8s.Server && resourceKey != k8s.ServerAuthorization {
   435  			meshedCount := fmt.Sprintf("%d/%d", r.MeshedPodCount, r.RunningPodCount)
   436  			if resourceKey == k8s.Authority || resourceKey == k8s.Service {
   437  				meshedCount = "-"
   438  			}
   439  			statTables[resourceKey][key] = &row{
   440  				meshed: meshedCount,
   441  				status: r.Status,
   442  			}
   443  		}
   444  
   445  		if r.Stats != nil && statHasRequestData(r.Stats) {
   446  			statTables[resourceKey][key].rowStats = &rowStats{
   447  				requestRate:        getRequestRate(r.Stats.GetSuccessCount(), r.Stats.GetFailureCount(), r.TimeWindow),
   448  				successRate:        getSuccessRate(r.Stats.GetSuccessCount(), r.Stats.GetFailureCount()),
   449  				latencyP50:         r.Stats.LatencyMsP50,
   450  				latencyP95:         r.Stats.LatencyMsP95,
   451  				latencyP99:         r.Stats.LatencyMsP99,
   452  				tcpOpenConnections: r.GetTcpStats().GetOpenConnections(),
   453  				tcpReadBytes:       getByteRate(r.GetTcpStats().GetReadBytesTotal(), r.TimeWindow),
   454  				tcpWriteBytes:      getByteRate(r.GetTcpStats().GetWriteBytesTotal(), r.TimeWindow),
   455  			}
   456  		}
   457  
   458  		if r.SrvStats != nil {
   459  			statTables[resourceKey][key].srvStats = &srvStats{
   460  				unauthorizedRate: getSuccessRate(r.SrvStats.GetDeniedCount(), r.SrvStats.GetAllowedCount()),
   461  				server:           r.GetSrvStats().GetSrv().GetName(),
   462  			}
   463  		}
   464  	}
   465  
   466  	switch options.outputFormat {
   467  	case tableOutput, wideOutput:
   468  		if len(statTables) == 0 {
   469  			fmt.Fprintln(os.Stderr, "No traffic found.")
   470  			return
   471  		}
   472  		printStatTables(statTables, w, maxNameLength, maxNamespaceLength, maxLeafLength, maxApexLength, maxDstLength, maxWeightLength, options)
   473  	case jsonOutput:
   474  		printStatJSON(statTables, w)
   475  	}
   476  }
   477  
   478  func printStatTables(statTables map[string]map[string]*row, w *tabwriter.Writer, maxNameLength, maxNamespaceLength, maxLeafLength, maxApexLength, maxDstLength, maxWeightLength int, options *statOptions) {
   479  	usePrefix := false
   480  	if len(statTables) > 1 {
   481  		usePrefix = true
   482  	}
   483  
   484  	firstDisplayedStat := true // don't print a newline before the first stat
   485  	for _, resourceType := range k8s.AllResources {
   486  		if stats, ok := statTables[resourceType]; ok {
   487  			if !firstDisplayedStat {
   488  				fmt.Fprint(w, "\n")
   489  			}
   490  			firstDisplayedStat = false
   491  			resourceTypeLabel := resourceType
   492  			if !usePrefix {
   493  				resourceTypeLabel = ""
   494  			}
   495  			printSingleStatTable(stats, resourceTypeLabel, resourceType, w, maxNameLength, maxNamespaceLength, maxLeafLength, maxApexLength, maxDstLength, maxWeightLength, options)
   496  		}
   497  	}
   498  }
   499  
   500  func showTCPBytes(options *statOptions, resourceType string) bool {
   501  	return (options.outputFormat == wideOutput || options.outputFormat == jsonOutput) &&
   502  		showTCPConns(resourceType)
   503  }
   504  
   505  func showTCPConns(resourceType string) bool {
   506  	return resourceType != k8s.Authority && resourceType != k8s.ServerAuthorization && resourceType != k8s.AuthorizationPolicy && resourceType != k8s.HTTPRoute
   507  }
   508  
   509  func printSingleStatTable(stats map[string]*row, resourceTypeLabel, resourceType string, w *tabwriter.Writer, maxNameLength, maxNamespaceLength, maxLeafLength, maxApexLength, maxDstLength, maxWeightLength int, options *statOptions) {
   510  	headers := make([]string, 0)
   511  	nameTemplate := fmt.Sprintf("%%-%ds", maxNameLength)
   512  	namespaceTemplate := fmt.Sprintf("%%-%ds", maxNamespaceLength)
   513  	apexTemplate := fmt.Sprintf("%%-%ds", maxApexLength)
   514  	leafTemplate := fmt.Sprintf("%%-%ds", maxLeafLength)
   515  	dstTemplate := fmt.Sprintf("%%-%ds", maxDstLength)
   516  	weightTemplate := fmt.Sprintf("%%-%ds", maxWeightLength)
   517  
   518  	hasDstStats := false
   519  	for _, r := range stats {
   520  		if r.dstStats != nil {
   521  			hasDstStats = true
   522  		}
   523  	}
   524  
   525  	hasTsStats := false
   526  	for _, r := range stats {
   527  		if r.tsStats != nil {
   528  			hasTsStats = true
   529  		}
   530  	}
   531  
   532  	if options.allNamespaces {
   533  		headers = append(headers,
   534  			fmt.Sprintf(namespaceTemplate, namespaceHeader))
   535  	}
   536  
   537  	headers = append(headers,
   538  		fmt.Sprintf(nameTemplate, nameHeader))
   539  
   540  	if resourceType == k8s.Pod {
   541  		headers = append(headers, "STATUS")
   542  	}
   543  
   544  	if resourceType == k8s.HTTPRoute {
   545  		headers = append(headers, "SERVER")
   546  	}
   547  
   548  	if hasDstStats {
   549  		headers = append(headers,
   550  			fmt.Sprintf(dstTemplate, dstHeader),
   551  			fmt.Sprintf(weightTemplate, weightHeader))
   552  	} else if hasTsStats {
   553  		headers = append(headers,
   554  			fmt.Sprintf(apexTemplate, apexHeader),
   555  			fmt.Sprintf(leafTemplate, leafHeader),
   556  			fmt.Sprintf(weightTemplate, weightHeader))
   557  	} else if resourceType != k8s.Server && resourceType != k8s.ServerAuthorization && resourceType != k8s.AuthorizationPolicy && resourceType != k8s.HTTPRoute {
   558  		headers = append(headers, "MESHED")
   559  	}
   560  
   561  	if resourceType == k8s.Server || resourceType == k8s.HTTPRoute {
   562  		headers = append(headers, "UNAUTHORIZED")
   563  	}
   564  
   565  	headers = append(headers, []string{
   566  		"SUCCESS",
   567  		"RPS",
   568  		"LATENCY_P50",
   569  		"LATENCY_P95",
   570  		"LATENCY_P99",
   571  	}...)
   572  
   573  	if showTCPConns(resourceType) {
   574  		headers = append(headers, "TCP_CONN")
   575  	}
   576  
   577  	if showTCPBytes(options, resourceType) {
   578  		headers = append(headers, []string{
   579  			"READ_BYTES/SEC",
   580  			"WRITE_BYTES/SEC",
   581  		}...)
   582  	}
   583  
   584  	headers[len(headers)-1] = headers[len(headers)-1] + "\t" // trailing \t is required to format last column
   585  
   586  	fmt.Fprintln(w, strings.Join(headers, "\t"))
   587  
   588  	sortedKeys := sortStatsKeys(stats)
   589  	for _, key := range sortedKeys {
   590  		namespace, name := namespaceName(resourceTypeLabel, key)
   591  		values := make([]interface{}, 0)
   592  		templateString := "%s\t%s\t%.2f%%\t%.1frps\t%dms\t%dms\t%dms\t"
   593  		templateStringEmpty := "%s\t%s\t-\t-\t-\t-\t-\t"
   594  		if resourceType == k8s.Pod {
   595  			templateString = "%s\t" + templateString
   596  			templateStringEmpty = "%s\t" + templateStringEmpty
   597  		}
   598  
   599  		if hasTsStats {
   600  			templateString = "%s\t%s\t%s\t%s\t%.2f%%\t%.1frps\t%dms\t%dms\t%dms\t"
   601  			templateStringEmpty = "%s\t%s\t%s\t%s\t-\t-\t-\t-\t-\t"
   602  		} else if hasDstStats {
   603  			templateString = "%s\t%s\t%s\t%.2f%%\t%.1frps\t%dms\t%dms\t%dms\t"
   604  			templateStringEmpty = "%s\t%s\t%s\t-\t-\t-\t-\t-\t"
   605  		} else if resourceType == k8s.ServerAuthorization || resourceType == k8s.AuthorizationPolicy {
   606  			templateString = "%s\t%.2f%%\t%.1frps\t%dms\t%dms\t%dms\t"
   607  			templateStringEmpty = "%s\t-\t-\t-\t-\t-\t"
   608  		} else if resourceType == k8s.Server {
   609  			templateString = "%s\t%.1frps\t%.2f%%\t%.1frps\t%dms\t%dms\t%dms\t"
   610  			templateStringEmpty = "%s\t%.1frps\t-\t-\t-\t-\t-\t"
   611  		} else if resourceType == k8s.HTTPRoute {
   612  			templateString = "%s\t%s\t%.1frps\t%.2f%%\t%.1frps\t%dms\t%dms\t%dms\t"
   613  			templateStringEmpty = "%s\t%s\t%.1frps\t-\t-\t-\t-\t-\t"
   614  		}
   615  
   616  		if showTCPConns(resourceType) {
   617  			templateString += "%d\t"
   618  			templateStringEmpty += "-\t"
   619  		}
   620  
   621  		if showTCPBytes(options, resourceType) {
   622  			templateString += "%.1fB/s\t%.1fB/s\t"
   623  			templateStringEmpty += "-\t-\t"
   624  		}
   625  
   626  		if options.allNamespaces {
   627  			values = append(values,
   628  				namespace+strings.Repeat(" ", maxNamespaceLength-len(namespace)))
   629  			templateString = "%s\t" + templateString
   630  			templateStringEmpty = "%s\t" + templateStringEmpty
   631  		}
   632  
   633  		templateString += "\n"
   634  		templateStringEmpty += "\n"
   635  
   636  		padding := 0
   637  		if maxNameLength > len(name) {
   638  			padding = maxNameLength - len(name)
   639  		}
   640  
   641  		apexPadding := 0
   642  		leafPadding := 0
   643  		dstPadding := 0
   644  
   645  		if stats[key].tsStats != nil {
   646  			if maxApexLength > len(stats[key].tsStats.apex) {
   647  				apexPadding = maxApexLength - len(stats[key].tsStats.apex)
   648  			}
   649  			if maxLeafLength > len(stats[key].tsStats.leaf) {
   650  				leafPadding = maxLeafLength - len(stats[key].tsStats.leaf)
   651  			}
   652  		} else if stats[key].dstStats != nil {
   653  			if maxDstLength > len(stats[key].dstStats.dst) {
   654  				dstPadding = maxDstLength - len(stats[key].dstStats.dst)
   655  			}
   656  		}
   657  
   658  		values = append(values, name+strings.Repeat(" ", padding))
   659  		if resourceType == k8s.Pod {
   660  			values = append(values, stats[key].status)
   661  		}
   662  
   663  		if hasTsStats {
   664  			values = append(values,
   665  				stats[key].tsStats.apex+strings.Repeat(" ", apexPadding),
   666  				stats[key].tsStats.leaf+strings.Repeat(" ", leafPadding),
   667  				stats[key].tsStats.weight,
   668  			)
   669  		} else if hasDstStats {
   670  			values = append(values,
   671  				stats[key].dstStats.dst+strings.Repeat(" ", dstPadding),
   672  				stats[key].dstStats.weight,
   673  			)
   674  		} else if resourceType != k8s.ServerAuthorization && resourceType != k8s.Server && resourceType != k8s.AuthorizationPolicy && resourceType != k8s.HTTPRoute {
   675  			values = append(values, []interface{}{
   676  				stats[key].meshed,
   677  			}...)
   678  		}
   679  
   680  		if resourceType == k8s.HTTPRoute {
   681  			values = append(values, stats[key].srvStats.server)
   682  		}
   683  
   684  		if resourceType == k8s.Server || resourceType == k8s.HTTPRoute {
   685  			var unauthorizedRate float64
   686  			if stats[key].srvStats != nil {
   687  				unauthorizedRate = stats[key].srvStats.unauthorizedRate
   688  			}
   689  			values = append(values, []interface{}{
   690  				unauthorizedRate,
   691  			}...)
   692  		}
   693  
   694  		if stats[key].rowStats != nil {
   695  			values = append(values, []interface{}{
   696  				stats[key].successRate * 100,
   697  				stats[key].requestRate,
   698  				stats[key].latencyP50,
   699  				stats[key].latencyP95,
   700  				stats[key].latencyP99,
   701  			}...)
   702  
   703  			if showTCPConns(resourceType) {
   704  				values = append(values, stats[key].tcpOpenConnections)
   705  			}
   706  
   707  			if showTCPBytes(options, resourceType) {
   708  				values = append(values, []interface{}{
   709  					stats[key].tcpReadBytes,
   710  					stats[key].tcpWriteBytes,
   711  				}...)
   712  			}
   713  
   714  			fmt.Fprintf(w, templateString, values...)
   715  		} else {
   716  			fmt.Fprintf(w, templateStringEmpty, values...)
   717  		}
   718  	}
   719  }
   720  
   721  func namespaceName(resourceType string, key string) (string, string) {
   722  	parts := strings.Split(key, "/")
   723  	namespace := parts[0]
   724  	namePrefix := getNamePrefix(resourceType)
   725  	name := namePrefix + parts[1]
   726  	return namespace, name
   727  }
   728  
   729  // Using pointers where the value is NA and the corresponding json is null
   730  type jsonStats struct {
   731  	Namespace      string   `json:"namespace"`
   732  	Kind           string   `json:"kind"`
   733  	Name           string   `json:"name"`
   734  	Meshed         string   `json:"meshed,omitempty"`
   735  	Success        *float64 `json:"success"`
   736  	Rps            *float64 `json:"rps"`
   737  	LatencyMSp50   *uint64  `json:"latency_ms_p50"`
   738  	LatencyMSp95   *uint64  `json:"latency_ms_p95"`
   739  	LatencyMSp99   *uint64  `json:"latency_ms_p99"`
   740  	TCPConnections *uint64  `json:"tcp_open_connections,omitempty"`
   741  	TCPReadBytes   *float64 `json:"tcp_read_bytes_rate,omitempty"`
   742  	TCPWriteBytes  *float64 `json:"tcp_write_bytes_rate,omitempty"`
   743  	Apex           string   `json:"apex,omitempty"`
   744  	Leaf           string   `json:"leaf,omitempty"`
   745  	Dst            string   `json:"dst,omitempty"`
   746  	Weight         string   `json:"weight,omitempty"`
   747  	Unauthorized   *float64 `json:"unauthorized,omitempty"`
   748  }
   749  
   750  func printStatJSON(statTables map[string]map[string]*row, w *tabwriter.Writer) {
   751  	// avoid nil initialization so that if there are not stats it gets marshalled as an empty array vs null
   752  	entries := []*jsonStats{}
   753  	for _, resourceType := range k8s.AllResources {
   754  		if stats, ok := statTables[resourceType]; ok {
   755  			sortedKeys := sortStatsKeys(stats)
   756  			for _, key := range sortedKeys {
   757  				namespace, name := namespaceName("", key)
   758  				entry := &jsonStats{
   759  					Namespace: namespace,
   760  					Kind:      resourceType,
   761  					Name:      name,
   762  				}
   763  
   764  				if stats[key].rowStats != nil {
   765  					entry.Success = &stats[key].successRate
   766  					entry.Rps = &stats[key].requestRate
   767  					entry.LatencyMSp50 = &stats[key].latencyP50
   768  					entry.LatencyMSp95 = &stats[key].latencyP95
   769  					entry.LatencyMSp99 = &stats[key].latencyP99
   770  
   771  					if showTCPConns(resourceType) {
   772  						entry.TCPConnections = &stats[key].tcpOpenConnections
   773  						entry.TCPReadBytes = &stats[key].tcpReadBytes
   774  						entry.TCPWriteBytes = &stats[key].tcpWriteBytes
   775  					}
   776  				}
   777  
   778  				if stats[key].tsStats != nil {
   779  					entry.Apex = stats[key].apex
   780  					entry.Leaf = stats[key].leaf
   781  					entry.Weight = stats[key].tsStats.weight
   782  				} else if stats[key].dstStats != nil {
   783  					entry.Dst = stats[key].dstStats.dst
   784  					entry.Weight = stats[key].dstStats.weight
   785  				}
   786  
   787  				if resourceType == k8s.Server {
   788  					if stats[key].srvStats != nil {
   789  						entry.Unauthorized = &stats[key].srvStats.unauthorizedRate
   790  					}
   791  				}
   792  				entries = append(entries, entry)
   793  			}
   794  		}
   795  	}
   796  	b, err := json.MarshalIndent(entries, "", "  ")
   797  	if err != nil {
   798  		log.Error(err.Error())
   799  		return
   800  	}
   801  	fmt.Fprintf(w, "%s\n", b)
   802  }
   803  
   804  func getNamePrefix(resourceType string) string {
   805  	if resourceType == "" {
   806  		return ""
   807  	}
   808  
   809  	canonicalType := k8s.ShortNameFromCanonicalResourceName(resourceType)
   810  	return canonicalType + "/"
   811  }
   812  
   813  func buildStatSummaryRequests(resources []string, options *statOptions) ([]*pb.StatSummaryRequest, error) {
   814  	targets, err := pkgUtil.BuildResources(options.namespace, resources)
   815  	if err != nil {
   816  		return nil, err
   817  	}
   818  
   819  	var toRes, fromRes *pb.Resource
   820  	if options.toResource != "" {
   821  		toRes, err = pkgUtil.BuildResource(options.toNamespace, options.toResource)
   822  		if err != nil {
   823  			return nil, err
   824  		}
   825  	}
   826  	if options.fromResource != "" {
   827  		fromRes, err = pkgUtil.BuildResource(options.fromNamespace, options.fromResource)
   828  		if err != nil {
   829  			return nil, err
   830  		}
   831  	}
   832  
   833  	requests := make([]*pb.StatSummaryRequest, 0)
   834  	for _, target := range targets {
   835  		err = options.validate(target.Type)
   836  		if err != nil {
   837  			return nil, err
   838  		}
   839  
   840  		requestParams := util.StatsSummaryRequestParams{
   841  			StatsBaseRequestParams: util.StatsBaseRequestParams{
   842  				TimeWindow:    options.timeWindow,
   843  				ResourceName:  target.Name,
   844  				ResourceType:  target.Type,
   845  				Namespace:     options.namespace,
   846  				AllNamespaces: options.allNamespaces,
   847  			},
   848  			ToNamespace:   options.toNamespace,
   849  			FromNamespace: options.fromNamespace,
   850  			TCPStats:      true,
   851  			LabelSelector: options.labelSelector,
   852  		}
   853  		if fromRes != nil {
   854  			requestParams.FromName = fromRes.Name
   855  			requestParams.FromType = fromRes.Type
   856  		}
   857  		if toRes != nil {
   858  			requestParams.ToName = toRes.Name
   859  			requestParams.ToType = toRes.Type
   860  		}
   861  
   862  		req, err := util.BuildStatSummaryRequest(requestParams)
   863  		if err != nil {
   864  			return nil, err
   865  		}
   866  		requests = append(requests, req)
   867  	}
   868  
   869  	return requests, nil
   870  }
   871  
   872  func sortStatsKeys(stats map[string]*row) []string {
   873  	var sortedKeys []string
   874  	for key := range stats {
   875  		sortedKeys = append(sortedKeys, key)
   876  	}
   877  	sort.Strings(sortedKeys)
   878  	return sortedKeys
   879  }
   880  
   881  // validate performs all validation on the command-line options.
   882  // It returns the first error encountered, or `nil` if the options are valid.
   883  func (o *statOptions) validate(resourceType string) error {
   884  	err := o.validateConflictingFlags()
   885  	if err != nil {
   886  		return err
   887  	}
   888  
   889  	if resourceType == k8s.Namespace {
   890  		err := o.validateNamespaceFlags()
   891  		if err != nil {
   892  			return err
   893  		}
   894  	}
   895  
   896  	return o.validateOutputFormat()
   897  }
   898  
   899  // validateConflictingFlags validates that the options do not contain mutually
   900  // exclusive flags.
   901  func (o *statOptions) validateConflictingFlags() error {
   902  	if o.toResource != "" && o.fromResource != "" {
   903  		return fmt.Errorf("--to and --from flags are mutually exclusive")
   904  	}
   905  
   906  	if o.toNamespace != "" && o.fromNamespace != "" {
   907  		return fmt.Errorf("--to-namespace and --from-namespace flags are mutually exclusive")
   908  	}
   909  
   910  	if o.allNamespaces && o.namespace != pkgcmd.GetDefaultNamespace(kubeconfigPath, kubeContext) {
   911  		return fmt.Errorf("--all-namespaces and --namespace flags are mutually exclusive")
   912  	}
   913  
   914  	return nil
   915  }
   916  
   917  // validateNamespaceFlags performs additional validation for options when the target
   918  // resource type is a namespace.
   919  func (o *statOptions) validateNamespaceFlags() error {
   920  	if o.toNamespace != "" {
   921  		return fmt.Errorf("--to-namespace flag is incompatible with namespace resource type")
   922  	}
   923  
   924  	if o.fromNamespace != "" {
   925  		return fmt.Errorf("--from-namespace flag is incompatible with namespace resource type")
   926  	}
   927  
   928  	// Note: technically, this allows you to say `stat ns --namespace <default-namespace-from-kubectl-context>`, but that
   929  	// seems like an edge case.
   930  	if o.namespace != pkgcmd.GetDefaultNamespace(kubeconfigPath, kubeContext) {
   931  		return fmt.Errorf("--namespace flag is incompatible with namespace resource type")
   932  	}
   933  
   934  	return nil
   935  }
   936  
   937  // get byte rate calculates the read/write byte rate
   938  func getByteRate(bytes uint64, timeWindow string) float64 {
   939  	windowLength, err := time.ParseDuration(timeWindow)
   940  	if err != nil {
   941  		log.Error(err.Error())
   942  		return 0.0
   943  	}
   944  	return float64(bytes) / windowLength.Seconds()
   945  }
   946  
   947  func renderStats(buffer bytes.Buffer, options *statOptionsBase) string {
   948  	var out string
   949  	switch options.outputFormat {
   950  	case jsonOutput:
   951  		out = buffer.String()
   952  	default:
   953  		// strip left padding on the first column
   954  		b := buffer.Bytes()
   955  		if len(b) > padding {
   956  			out = string(b[padding:])
   957  		}
   958  		out = strings.ReplaceAll(out, "\n"+strings.Repeat(" ", padding), "\n")
   959  	}
   960  
   961  	return out
   962  }
   963  

View as plain text