...

Source file src/github.com/linkerd/linkerd2/cli/cmd/endpoints.go

Documentation: github.com/linkerd/linkerd2/cli/cmd

     1  package cmd
     2  
     3  import (
     4  	"bytes"
     5  	"context"
     6  	"encoding/json"
     7  	"errors"
     8  	"fmt"
     9  	"os"
    10  	"sort"
    11  	"strings"
    12  	"sync"
    13  	"text/tabwriter"
    14  
    15  	destinationPb "github.com/linkerd/linkerd2-proxy-api/go/destination"
    16  	netPb "github.com/linkerd/linkerd2-proxy-api/go/net"
    17  	"github.com/linkerd/linkerd2/controller/api/destination"
    18  	"github.com/linkerd/linkerd2/pkg/addr"
    19  	pkgcmd "github.com/linkerd/linkerd2/pkg/cmd"
    20  	"github.com/linkerd/linkerd2/pkg/k8s"
    21  	log "github.com/sirupsen/logrus"
    22  	"github.com/spf13/cobra"
    23  	"google.golang.org/grpc"
    24  	"google.golang.org/grpc/status"
    25  )
    26  
    27  type endpointsOptions struct {
    28  	outputFormat   string
    29  	destinationPod string
    30  	contextToken   string
    31  }
    32  
    33  type (
    34  	// map[ServiceID]map[Port][]podData
    35  	endpointsInfo map[string]map[uint32][]podData
    36  	podData       struct {
    37  		name    string
    38  		address string
    39  		ip      string
    40  		weight  uint32
    41  		labels  map[string]string
    42  		http2   *destinationPb.Http2ClientParams
    43  	}
    44  )
    45  
    46  const (
    47  	podHeader       = "POD"
    48  	namespaceHeader = "NAMESPACE"
    49  	padding         = 3
    50  )
    51  
    52  // validate performs all validation on the command-line options.
    53  // It returns the first error encountered, or `nil` if the options are valid.
    54  func (o *endpointsOptions) validate() error {
    55  	if o.outputFormat == tableOutput || o.outputFormat == jsonOutput {
    56  		return nil
    57  	}
    58  
    59  	return fmt.Errorf("--output currently only supports %s and %s", tableOutput, jsonOutput)
    60  }
    61  
    62  func newEndpointsOptions() *endpointsOptions {
    63  	return &endpointsOptions{
    64  		outputFormat: tableOutput,
    65  	}
    66  }
    67  
    68  func newCmdEndpoints() *cobra.Command {
    69  	options := newEndpointsOptions()
    70  
    71  	example := `  # get all endpoints for the authorities emoji-svc.emojivoto.svc.cluster.local:8080 and web-svc.emojivoto.svc.cluster.local:80
    72    linkerd diagnostics endpoints emoji-svc.emojivoto.svc.cluster.local:8080 web-svc.emojivoto.svc.cluster.local:80
    73  
    74    # get that same information in json format
    75    linkerd diagnostics endpoints -o json emoji-svc.emojivoto.svc.cluster.local:8080 web-svc.emojivoto.svc.cluster.local:80
    76  
    77    # get the endpoints for authorities in Linkerd's control-plane itself
    78    linkerd diagnostics endpoints web.linkerd-viz.svc.cluster.local:8084`
    79  
    80  	cmd := &cobra.Command{
    81  		Use:     "endpoints [flags] authorities",
    82  		Aliases: []string{"ep"},
    83  		Short:   "Introspect Linkerd's service discovery state",
    84  		Long: `Introspect Linkerd's service discovery state.
    85  
    86  This command provides debug information about the internal state of the
    87  control-plane's destination container. It queries the same Destination service
    88  endpoint as the linkerd-proxy's, and returns the addresses associated with that
    89  destination.`,
    90  		Example: example,
    91  		Args:    cobra.MinimumNArgs(1),
    92  		RunE: func(cmd *cobra.Command, args []string) error {
    93  			err := options.validate()
    94  			if err != nil {
    95  				return err
    96  			}
    97  
    98  			var client destinationPb.DestinationClient
    99  			var conn *grpc.ClientConn
   100  			if apiAddr != "" {
   101  				client, conn, err = destination.NewClient(apiAddr)
   102  				if err != nil {
   103  					fmt.Fprintf(os.Stderr, "Error creating destination client: %s\n", err)
   104  					os.Exit(1)
   105  				}
   106  			} else {
   107  				k8sAPI, err := k8s.NewAPI(kubeconfigPath, kubeContext, impersonate, impersonateGroup, 0)
   108  				if err != nil {
   109  					return err
   110  				}
   111  
   112  				client, conn, err = destination.NewExternalClient(cmd.Context(), controlPlaneNamespace, k8sAPI, options.destinationPod)
   113  				if err != nil {
   114  					fmt.Fprintf(os.Stderr, "Error creating destination client: %s\n", err)
   115  					os.Exit(1)
   116  				}
   117  			}
   118  
   119  			defer conn.Close()
   120  
   121  			endpoints, err := requestEndpointsFromAPI(client, options.contextToken, args)
   122  			if err != nil {
   123  				fmt.Fprintf(os.Stderr, "Destination API error: %s\n", err)
   124  				os.Exit(1)
   125  			}
   126  
   127  			output := renderEndpoints(endpoints, options)
   128  			_, err = fmt.Print(output)
   129  
   130  			return err
   131  		},
   132  	}
   133  
   134  	cmd.PersistentFlags().StringVarP(&options.outputFormat, "output", "o", options.outputFormat, fmt.Sprintf("Output format; one of: \"%s\" or \"%s\"", tableOutput, jsonOutput))
   135  	cmd.PersistentFlags().StringVar(&options.destinationPod, "destination-pod", "", "Target a specific destination Pod when there are multiple running")
   136  	cmd.PersistentFlags().StringVar(&options.contextToken, "token", "", "The context token to use when making the request to the destination API")
   137  
   138  	pkgcmd.ConfigureOutputFlagCompletion(cmd)
   139  
   140  	return cmd
   141  }
   142  
   143  func requestEndpointsFromAPI(client destinationPb.DestinationClient, token string, authorities []string) (endpointsInfo, error) {
   144  	info := make(endpointsInfo)
   145  	// buffered channels to avoid blocking
   146  	events := make(chan *destinationPb.Update, len(authorities))
   147  	errs := make(chan error, len(authorities))
   148  	var wg sync.WaitGroup
   149  
   150  	for _, authority := range authorities {
   151  		wg.Add(1)
   152  		go func(authority string) {
   153  			defer wg.Done()
   154  			if len(errs) == 0 {
   155  				dest := &destinationPb.GetDestination{
   156  					Scheme:       "http:",
   157  					Path:         authority,
   158  					ContextToken: token,
   159  				}
   160  
   161  				rsp, err := client.Get(context.Background(), dest)
   162  				if err != nil {
   163  					errs <- err
   164  					return
   165  				}
   166  
   167  				event, err := rsp.Recv()
   168  				if err != nil {
   169  					if grpcError, ok := status.FromError(err); ok {
   170  						err = errors.New(grpcError.Message())
   171  					}
   172  					errs <- err
   173  					return
   174  				}
   175  				events <- event
   176  			}
   177  		}(authority)
   178  	}
   179  	// Block till all goroutines above are done
   180  	wg.Wait()
   181  
   182  	for i := 0; i < len(authorities); i++ {
   183  		select {
   184  		case err := <-errs:
   185  			// we only care about the first error
   186  			return nil, err
   187  		case event := <-events:
   188  			addressSet := event.GetAdd()
   189  			labels := addressSet.GetMetricLabels()
   190  			serviceID := labels["service"] + "." + labels["namespace"]
   191  			if _, ok := info[serviceID]; !ok {
   192  				info[serviceID] = make(map[uint32][]podData)
   193  			}
   194  
   195  			for _, addr := range addressSet.GetAddrs() {
   196  				tcpAddr := addr.GetAddr()
   197  				port := tcpAddr.GetPort()
   198  
   199  				if info[serviceID][port] == nil {
   200  					info[serviceID][port] = make([]podData, 0)
   201  				}
   202  
   203  				labels := addr.GetMetricLabels()
   204  				info[serviceID][port] = append(info[serviceID][port], podData{
   205  					name:    labels["pod"],
   206  					address: tcpAddr.String(),
   207  					ip:      getIP(tcpAddr),
   208  					weight:  addr.GetWeight(),
   209  					labels:  addr.GetMetricLabels(),
   210  					http2:   addr.GetHttp2(),
   211  				})
   212  			}
   213  		}
   214  	}
   215  
   216  	return info, nil
   217  }
   218  
   219  func getIP(tcpAddr *netPb.TcpAddress) string {
   220  	ip := addr.FromProxyAPI(tcpAddr.GetIp())
   221  	if ip == nil {
   222  		return ""
   223  	}
   224  	return addr.PublicIPToString(ip)
   225  }
   226  
   227  func renderEndpoints(endpoints endpointsInfo, options *endpointsOptions) string {
   228  	var buffer bytes.Buffer
   229  	w := tabwriter.NewWriter(&buffer, 0, 0, padding, ' ', 0)
   230  	writeEndpointsToBuffer(endpoints, w, options)
   231  	w.Flush()
   232  
   233  	return buffer.String()
   234  }
   235  
   236  type rowEndpoint struct {
   237  	Namespace string `json:"namespace"`
   238  	IP        string `json:"ip"`
   239  	Port      uint32 `json:"port"`
   240  	Pod       string `json:"pod"`
   241  	Service   string `json:"service"`
   242  	Weight    uint32 `json:"weight"`
   243  
   244  	Http2 *destinationPb.Http2ClientParams `json:"http2,omitempty"`
   245  
   246  	Labels map[string]string `json:"labels"`
   247  }
   248  
   249  func writeEndpointsToBuffer(endpoints endpointsInfo, w *tabwriter.Writer, options *endpointsOptions) {
   250  	maxPodLength := len(podHeader)
   251  	maxNamespaceLength := len(namespaceHeader)
   252  	endpointsTables := map[string][]rowEndpoint{}
   253  
   254  	for serviceID, servicePort := range endpoints {
   255  		namespace := ""
   256  		parts := strings.SplitN(serviceID, ".", 2)
   257  		namespace = parts[1]
   258  
   259  		for port, podAddrs := range servicePort {
   260  			for _, pod := range podAddrs {
   261  				name := pod.name
   262  				parts := strings.SplitN(name, "/", 2)
   263  				if len(parts) == 2 {
   264  					name = parts[1]
   265  				}
   266  				row := rowEndpoint{
   267  					Namespace: namespace,
   268  					IP:        pod.ip,
   269  					Port:      port,
   270  					Pod:       name,
   271  					Service:   serviceID,
   272  					Weight:    pod.weight,
   273  					Labels:    pod.labels,
   274  					Http2:     pod.http2,
   275  				}
   276  
   277  				endpointsTables[namespace] = append(endpointsTables[namespace], row)
   278  
   279  				if len(name) > maxPodLength {
   280  					maxPodLength = len(name)
   281  				}
   282  				if len(namespace) > maxNamespaceLength {
   283  					maxNamespaceLength = len(namespace)
   284  				}
   285  			}
   286  
   287  			sort.Slice(endpointsTables[namespace], func(i, j int) bool {
   288  				return endpointsTables[namespace][i].Service < endpointsTables[namespace][j].Service
   289  			})
   290  		}
   291  	}
   292  
   293  	switch options.outputFormat {
   294  	case tableOutput:
   295  		if len(endpointsTables) == 0 {
   296  			fmt.Fprintln(os.Stderr, "No endpoints found.")
   297  			os.Exit(0)
   298  		}
   299  		printEndpointsTables(endpointsTables, w, maxPodLength, maxNamespaceLength)
   300  	case jsonOutput:
   301  		printEndpointsJSON(endpointsTables, w)
   302  	}
   303  }
   304  
   305  func printEndpointsTables(endpointsTables map[string][]rowEndpoint, w *tabwriter.Writer, maxPodLength int, maxNamespaceLength int) {
   306  	firstTable := true // don't print a newline before the first table
   307  
   308  	for _, ns := range sortNamespaceKeys(endpointsTables) {
   309  		if !firstTable {
   310  			fmt.Fprint(w, "\n")
   311  		}
   312  		firstTable = false
   313  		printEndpointsTable(ns, endpointsTables[ns], w, maxPodLength, maxNamespaceLength)
   314  	}
   315  }
   316  
   317  func printEndpointsTable(namespace string, rows []rowEndpoint, w *tabwriter.Writer, maxPodLength int, maxNamespaceLength int) {
   318  	headers := make([]string, 0)
   319  	templateString := "%s\t%d\t%s\t%s\n"
   320  
   321  	headers = append(headers, namespaceHeader+strings.Repeat(" ", maxNamespaceLength-len(namespaceHeader)))
   322  	templateString = "%s\t" + templateString
   323  
   324  	headers = append(headers, []string{
   325  		"IP",
   326  		"PORT",
   327  		podHeader + strings.Repeat(" ", maxPodLength-len(podHeader)),
   328  		"SERVICE",
   329  	}...)
   330  	fmt.Fprintln(w, strings.Join(headers, "\t"))
   331  
   332  	for _, row := range rows {
   333  		values := []interface{}{
   334  			namespace + strings.Repeat(" ", maxNamespaceLength-len(namespace)),
   335  			row.IP,
   336  			row.Port,
   337  			row.Pod,
   338  			row.Service,
   339  		}
   340  
   341  		fmt.Fprintf(w, templateString, values...)
   342  	}
   343  }
   344  
   345  func printEndpointsJSON(endpointsTables map[string][]rowEndpoint, w *tabwriter.Writer) {
   346  	entries := []rowEndpoint{}
   347  
   348  	for _, ns := range sortNamespaceKeys(endpointsTables) {
   349  		entries = append(entries, endpointsTables[ns]...)
   350  	}
   351  
   352  	b, err := json.MarshalIndent(entries, "", "  ")
   353  	if err != nil {
   354  		log.Error(err.Error())
   355  		return
   356  	}
   357  	fmt.Fprintf(w, "%s\n", b)
   358  }
   359  
   360  func sortNamespaceKeys(endpointsTables map[string][]rowEndpoint) []string {
   361  	var sortedKeys []string
   362  	for key := range endpointsTables {
   363  		sortedKeys = append(sortedKeys, key)
   364  	}
   365  	sort.Strings(sortedKeys)
   366  	return sortedKeys
   367  }
   368  

View as plain text