...

Source file src/github.com/linkerd/linkerd2/viz/cmd/edges.go

Documentation: github.com/linkerd/linkerd2/viz/cmd

     1  package cmd
     2  
     3  import (
     4  	"bytes"
     5  	"context"
     6  	"encoding/json"
     7  	"fmt"
     8  	"os"
     9  	"sort"
    10  	"strings"
    11  	"text/tabwriter"
    12  
    13  	"github.com/fatih/color"
    14  	pkgcmd "github.com/linkerd/linkerd2/pkg/cmd"
    15  	"github.com/linkerd/linkerd2/pkg/healthcheck"
    16  	"github.com/linkerd/linkerd2/pkg/k8s"
    17  	pb "github.com/linkerd/linkerd2/viz/metrics-api/gen/viz"
    18  	"github.com/linkerd/linkerd2/viz/metrics-api/util"
    19  	"github.com/linkerd/linkerd2/viz/pkg/api"
    20  	hc "github.com/linkerd/linkerd2/viz/pkg/healthcheck"
    21  	pkgUtil "github.com/linkerd/linkerd2/viz/pkg/util"
    22  	"github.com/spf13/cobra"
    23  	v1 "k8s.io/api/core/v1"
    24  )
    25  
    26  var (
    27  	okStatus = color.New(color.FgGreen, color.Bold).SprintFunc()("\u221A") // √
    28  
    29  )
    30  
    31  type edgesOptions struct {
    32  	namespace     string
    33  	outputFormat  string
    34  	allNamespaces bool
    35  }
    36  
    37  func newEdgesOptions() *edgesOptions {
    38  	return &edgesOptions{
    39  		outputFormat:  tableOutput,
    40  		allNamespaces: false,
    41  	}
    42  }
    43  
    44  type indexedEdgeResults struct {
    45  	ix   int
    46  	rows []*pb.Edge
    47  	err  error
    48  }
    49  
    50  // NewCmdEdges creates a new cobra command `edges` for edges functionality
    51  func NewCmdEdges() *cobra.Command {
    52  	options := newEdgesOptions()
    53  
    54  	cmd := &cobra.Command{
    55  		Use:   "edges [flags] (RESOURCETYPE)",
    56  		Short: "Display connections between resources, and Linkerd proxy identities",
    57  		Long: `Display connections between resources, and Linkerd proxy identities.
    58  
    59    The RESOURCETYPE argument specifies the type of resource to display edges within.
    60  
    61    Examples:
    62    * cronjob
    63    * deploy
    64    * ds
    65    * job
    66    * po
    67    * rc
    68    * rs
    69    * sts
    70  
    71    Valid resource types include:
    72    * cronjobs
    73    * daemonsets
    74    * deployments
    75    * jobs
    76    * pods
    77    * replicasets
    78    * replicationcontrollers
    79    * statefulsets`,
    80  		Example: `  # Get all edges between pods that either originate from or terminate in the test namespace.
    81    linkerd viz edges po -n test
    82  
    83    # Get all edges between pods that either originate from or terminate in the default namespace.
    84    linkerd viz edges po
    85  
    86    # Get all edges between pods in all namespaces.
    87    linkerd viz edges po --all-namespaces`,
    88  		Args: cobra.ExactArgs(1),
    89  		ValidArgsFunction: func(cmd *cobra.Command, args []string, toComplete string) ([]string, cobra.ShellCompDirective) {
    90  			// This command requires only one argument. If we already have
    91  			// one after requesting autocompletion i.e. [tab][tab]
    92  			// skip running validArgsFunction
    93  			if len(args) > 0 {
    94  				return []string{}, cobra.ShellCompDirectiveError
    95  			}
    96  
    97  			k8sAPI, err := k8s.NewAPI(kubeconfigPath, kubeContext, impersonate, impersonateGroup, 0)
    98  			if err != nil {
    99  				return nil, cobra.ShellCompDirectiveError
   100  			}
   101  
   102  			if options.namespace == "" {
   103  				options.namespace = pkgcmd.GetDefaultNamespace(kubeconfigPath, kubeContext)
   104  			}
   105  
   106  			if options.allNamespaces {
   107  				options.namespace = v1.NamespaceAll
   108  			}
   109  
   110  			cc := k8s.NewCommandCompletion(k8sAPI, options.namespace)
   111  
   112  			results, err := cc.Complete(args, toComplete)
   113  			if err != nil {
   114  				return nil, cobra.ShellCompDirectiveError
   115  			}
   116  
   117  			return results, cobra.ShellCompDirectiveDefault
   118  		},
   119  		RunE: func(cmd *cobra.Command, args []string) error {
   120  			if options.namespace == "" {
   121  				options.namespace = pkgcmd.GetDefaultNamespace(kubeconfigPath, kubeContext)
   122  			}
   123  
   124  			reqs, err := buildEdgesRequests(args, options)
   125  			if err != nil {
   126  				return fmt.Errorf("Error creating edges request: %w", err)
   127  			}
   128  
   129  			// The gRPC client is concurrency-safe, so we can reuse it in all the following goroutines
   130  			// https://github.com/grpc/grpc-go/issues/682
   131  			client := api.CheckClientOrExit(hc.VizOptions{
   132  				Options: &healthcheck.Options{
   133  					ControlPlaneNamespace: controlPlaneNamespace,
   134  					KubeConfig:            kubeconfigPath,
   135  					Impersonate:           impersonate,
   136  					ImpersonateGroup:      impersonateGroup,
   137  					KubeContext:           kubeContext,
   138  					APIAddr:               apiAddr,
   139  				},
   140  				VizNamespaceOverride: vizNamespace,
   141  			})
   142  
   143  			c := make(chan indexedEdgeResults, len(reqs))
   144  			for num, req := range reqs {
   145  				go func(num int, req *pb.EdgesRequest) {
   146  					resp, err := requestEdgesFromAPI(client, req)
   147  					rows := edgesRespToRows(resp)
   148  					c <- indexedEdgeResults{num, rows, err}
   149  				}(num, req)
   150  			}
   151  
   152  			totalRows := make([]*pb.Edge, 0)
   153  			i := 0
   154  			for res := range c {
   155  				if res.err != nil {
   156  					fmt.Fprint(os.Stderr, res.err.Error())
   157  					os.Exit(1)
   158  				}
   159  				totalRows = append(totalRows, res.rows...)
   160  				if i++; i == len(reqs) {
   161  					close(c)
   162  				}
   163  			}
   164  
   165  			output := renderEdgeStats(totalRows, options)
   166  			_, err = fmt.Print(output)
   167  
   168  			return err
   169  		},
   170  	}
   171  
   172  	cmd.PersistentFlags().StringVarP(&options.namespace, "namespace", "n", options.namespace, "Namespace of the specified resource")
   173  	cmd.PersistentFlags().StringVarP(&options.outputFormat, "output", "o", options.outputFormat, "Output format; one of: \"table\" or \"json\" or \"wide\"")
   174  	cmd.PersistentFlags().BoolVarP(&options.allNamespaces, "all-namespaces", "A", options.allNamespaces, "If present, returns edges across all namespaces, ignoring the \"--namespace\" flag")
   175  
   176  	pkgcmd.ConfigureNamespaceFlagCompletion(
   177  		cmd, []string{"namespace"},
   178  		kubeconfigPath, impersonate, impersonateGroup, kubeContext)
   179  	return cmd
   180  }
   181  
   182  // validateEdgesRequestInputs ensures that the resource type and output format are both supported
   183  // by the edges command, since the edges command does not support all k8s resource types.
   184  func validateEdgesRequestInputs(targets []*pb.Resource, options *edgesOptions) error {
   185  	for _, target := range targets {
   186  		if target.Name != "" {
   187  			return fmt.Errorf("Edges cannot be returned for a specific resource name; remove %s from query", target.Name)
   188  		}
   189  		switch target.Type {
   190  		case "authority", "service", "all":
   191  			return fmt.Errorf("Resource type is not supported: %s", target.Type)
   192  		}
   193  	}
   194  
   195  	switch options.outputFormat {
   196  	case tableOutput, jsonOutput, wideOutput:
   197  		return nil
   198  	default:
   199  		return fmt.Errorf("--output supports %s, %s and %s", tableOutput, jsonOutput, wideOutput)
   200  	}
   201  }
   202  
   203  func buildEdgesRequests(resources []string, options *edgesOptions) ([]*pb.EdgesRequest, error) {
   204  	targets, err := pkgUtil.BuildResources(options.namespace, resources)
   205  
   206  	if err != nil {
   207  		return nil, err
   208  	}
   209  	err = validateEdgesRequestInputs(targets, options)
   210  	if err != nil {
   211  		return nil, err
   212  	}
   213  
   214  	requests := make([]*pb.EdgesRequest, 0)
   215  	for _, target := range targets {
   216  		requestParams := util.EdgesRequestParams{
   217  			ResourceType:  target.Type,
   218  			Namespace:     options.namespace,
   219  			AllNamespaces: options.allNamespaces,
   220  		}
   221  
   222  		req, err := util.BuildEdgesRequest(requestParams)
   223  		if err != nil {
   224  			return nil, err
   225  		}
   226  		requests = append(requests, req)
   227  	}
   228  	return requests, nil
   229  }
   230  
   231  func edgesRespToRows(resp *pb.EdgesResponse) []*pb.Edge {
   232  	rows := make([]*pb.Edge, 0)
   233  	if resp != nil {
   234  		rows = append(rows, resp.GetOk().Edges...)
   235  	}
   236  	return rows
   237  }
   238  
   239  func requestEdgesFromAPI(client pb.ApiClient, req *pb.EdgesRequest) (*pb.EdgesResponse, error) {
   240  	resp, err := client.Edges(context.Background(), req)
   241  	if err != nil {
   242  		return nil, fmt.Errorf("Edges API error: %w", err)
   243  	}
   244  	if e := resp.GetError(); e != nil {
   245  		return nil, fmt.Errorf("Edges API response error: %s", e.Error)
   246  	}
   247  	return resp, nil
   248  }
   249  
   250  func renderEdgeStats(rows []*pb.Edge, options *edgesOptions) string {
   251  	var buffer bytes.Buffer
   252  	w := tabwriter.NewWriter(&buffer, 0, 0, padding, ' ', tabwriter.AlignRight)
   253  	writeEdgesToBuffer(rows, w, options)
   254  	w.Flush()
   255  
   256  	return renderEdges(buffer, options)
   257  }
   258  
   259  type edgeRow struct {
   260  	src          string
   261  	srcNamespace string
   262  	dst          string
   263  	dstNamespace string
   264  	client       string
   265  	server       string
   266  	msg          string
   267  }
   268  
   269  const (
   270  	srcHeader          = "SRC"
   271  	dstHeader          = "DST"
   272  	srcNamespaceHeader = "SRC_NS"
   273  	dstNamespaceHeader = "DST_NS"
   274  	clientHeader       = "CLIENT_ID"
   275  	serverHeader       = "SERVER_ID"
   276  	msgHeader          = "SECURED"
   277  )
   278  
   279  func writeEdgesToBuffer(rows []*pb.Edge, w *tabwriter.Writer, options *edgesOptions) {
   280  	maxSrcLength := len(srcHeader)
   281  	maxDstLength := len(dstHeader)
   282  	maxSrcNamespaceLength := len(srcNamespaceHeader)
   283  	maxDstNamespaceLength := len(dstNamespaceHeader)
   284  	maxClientLength := len(clientHeader)
   285  	maxServerLength := len(serverHeader)
   286  	maxMsgLength := len(msgHeader)
   287  
   288  	edgeRows := []edgeRow{}
   289  	for _, r := range rows {
   290  		clientID := r.ClientId
   291  		serverID := r.ServerId
   292  		msg := r.NoIdentityMsg
   293  		if msg == "" && options.outputFormat != jsonOutput {
   294  			msg = okStatus
   295  		}
   296  		if len(clientID) > 0 {
   297  			parts := strings.Split(clientID, ".")
   298  			clientID = parts[0] + "." + parts[1]
   299  		}
   300  		if len(serverID) > 0 {
   301  			parts := strings.Split(serverID, ".")
   302  			serverID = parts[0] + "." + parts[1]
   303  		}
   304  
   305  		row := edgeRow{
   306  			client:       clientID,
   307  			server:       serverID,
   308  			msg:          msg,
   309  			src:          r.Src.Name,
   310  			srcNamespace: r.Src.Namespace,
   311  			dst:          r.Dst.Name,
   312  			dstNamespace: r.Dst.Namespace,
   313  		}
   314  
   315  		edgeRows = append(edgeRows, row)
   316  
   317  		if len(r.Src.Name) > maxSrcLength {
   318  			maxSrcLength = len(r.Src.Name)
   319  		}
   320  		if len(r.Src.Namespace) > maxSrcNamespaceLength {
   321  			maxSrcNamespaceLength = len(r.Src.Namespace)
   322  		}
   323  		if len(r.Dst.Name) > maxDstLength {
   324  			maxDstLength = len(r.Dst.Name)
   325  		}
   326  		if len(r.Dst.Namespace) > maxDstNamespaceLength {
   327  			maxDstNamespaceLength = len(r.Dst.Namespace)
   328  		}
   329  		if len(clientID) > maxClientLength {
   330  			maxClientLength = len(clientID)
   331  		}
   332  		if len(serverID) > maxServerLength {
   333  			maxServerLength = len(serverID)
   334  		}
   335  		if len(msg) > maxMsgLength {
   336  			maxMsgLength = len(msg)
   337  		}
   338  	}
   339  
   340  	// ordering the rows first by SRC/DST namespace, then by SRC/DST resource
   341  	sort.Slice(edgeRows, func(i, j int) bool {
   342  		keyI := edgeRows[i].srcNamespace + edgeRows[i].dstNamespace + edgeRows[i].src + edgeRows[i].dst
   343  		keyJ := edgeRows[j].srcNamespace + edgeRows[j].dstNamespace + edgeRows[j].src + edgeRows[j].dst
   344  		return keyI < keyJ
   345  	})
   346  
   347  	switch options.outputFormat {
   348  	case tableOutput, wideOutput:
   349  		if len(edgeRows) == 0 {
   350  			fmt.Fprintln(os.Stderr, "No edges found.")
   351  			os.Exit(0)
   352  		}
   353  		printEdgeTable(edgeRows, w, maxSrcLength, maxSrcNamespaceLength, maxDstLength, maxDstNamespaceLength, maxClientLength, maxServerLength, maxMsgLength, options.outputFormat)
   354  	case jsonOutput:
   355  		printEdgesJSON(edgeRows, w)
   356  	}
   357  }
   358  
   359  func printEdgeTable(edgeRows []edgeRow, w *tabwriter.Writer, maxSrcLength, maxSrcNamespaceLength, maxDstLength, maxDstNamespaceLength, maxClientLength, maxServerLength, maxMsgLength int, outputFormat string) {
   360  	srcTemplate := fmt.Sprintf("%%-%ds", maxSrcLength)
   361  	dstTemplate := fmt.Sprintf("%%-%ds", maxDstLength)
   362  	srcNamespaceTemplate := fmt.Sprintf("%%-%ds", maxSrcNamespaceLength)
   363  	dstNamespaceTemplate := fmt.Sprintf("%%-%ds", maxDstNamespaceLength)
   364  	msgTemplate := fmt.Sprintf("%%-%ds", maxMsgLength)
   365  	clientTemplate := fmt.Sprintf("%%-%ds", maxClientLength)
   366  	serverTemplate := fmt.Sprintf("%%-%ds", maxServerLength)
   367  
   368  	headers := []string{
   369  		fmt.Sprintf(srcTemplate, srcHeader),
   370  		fmt.Sprintf(dstTemplate, dstHeader),
   371  		fmt.Sprintf(srcNamespaceTemplate, srcNamespaceHeader),
   372  		fmt.Sprintf(dstNamespaceTemplate, dstNamespaceHeader),
   373  	}
   374  
   375  	if outputFormat == wideOutput {
   376  		headers = append(headers, fmt.Sprintf(clientTemplate, clientHeader), fmt.Sprintf(serverTemplate, serverHeader))
   377  	}
   378  
   379  	headers = append(headers, fmt.Sprintf(msgTemplate, msgHeader)+"\t")
   380  
   381  	fmt.Fprintln(w, strings.Join(headers, "\t"))
   382  
   383  	for _, row := range edgeRows {
   384  		values := []interface{}{
   385  			row.src,
   386  			row.dst,
   387  			row.srcNamespace,
   388  			row.dstNamespace,
   389  		}
   390  		templateString := fmt.Sprintf("%s\t%s\t%s\t%s\t", srcTemplate, dstTemplate, srcNamespaceTemplate, dstNamespaceTemplate)
   391  
   392  		if outputFormat == wideOutput {
   393  			templateString += fmt.Sprintf("%s\t%s\t", clientTemplate, serverTemplate)
   394  			values = append(values, row.client, row.server)
   395  		}
   396  
   397  		templateString += fmt.Sprintf("%s\t\n", msgTemplate)
   398  		values = append(values, row.msg)
   399  
   400  		fmt.Fprintf(w, templateString, values...)
   401  	}
   402  }
   403  
   404  func renderEdges(buffer bytes.Buffer, options *edgesOptions) string {
   405  	var out string
   406  	switch options.outputFormat {
   407  	case jsonOutput:
   408  		out = buffer.String()
   409  	default:
   410  		// strip left padding on the first column
   411  		out = string(buffer.Bytes()[padding:])
   412  		out = strings.ReplaceAll(out, "\n"+strings.Repeat(" ", padding), "\n")
   413  	}
   414  
   415  	return out
   416  }
   417  
   418  type edgesJSONStats struct {
   419  	Src          string `json:"src"`
   420  	SrcNamespace string `json:"src_namespace"`
   421  	Dst          string `json:"dst"`
   422  	DstNamespace string `json:"dst_namespace"`
   423  	Client       string `json:"client_id"`
   424  	Server       string `json:"server_id"`
   425  	Msg          string `json:"no_tls_reason"`
   426  }
   427  
   428  func printEdgesJSON(edgeRows []edgeRow, w *tabwriter.Writer) {
   429  	// avoid nil initialization so that if there are not stats it gets marshalled as an empty array vs null
   430  	entries := []*edgesJSONStats{}
   431  
   432  	for _, row := range edgeRows {
   433  		entry := &edgesJSONStats{
   434  			Src:          row.src,
   435  			SrcNamespace: row.srcNamespace,
   436  			Dst:          row.dst,
   437  			DstNamespace: row.dstNamespace,
   438  			Client:       row.client,
   439  			Server:       row.server,
   440  			Msg:          row.msg}
   441  		entries = append(entries, entry)
   442  	}
   443  
   444  	b, err := json.MarshalIndent(entries, "", "  ")
   445  	if err != nil {
   446  		fmt.Fprintf(os.Stderr, "Error marshalling JSON: %s\n", err)
   447  		return
   448  	}
   449  	fmt.Fprintf(w, "%s\n", b)
   450  }
   451  

View as plain text