...

Source file src/github.com/linkerd/linkerd2/viz/cmd/profile.go

Documentation: github.com/linkerd/linkerd2/viz/cmd

     1  package cmd
     2  
     3  import (
     4  	"bufio"
     5  	"context"
     6  	"encoding/json"
     7  	"errors"
     8  	"fmt"
     9  	"io"
    10  	"net"
    11  	"os"
    12  	"sort"
    13  	"strings"
    14  	"time"
    15  
    16  	sp "github.com/linkerd/linkerd2/controller/gen/apis/serviceprofile/v1alpha2"
    17  	pkgcmd "github.com/linkerd/linkerd2/pkg/cmd"
    18  	"github.com/linkerd/linkerd2/pkg/healthcheck"
    19  	"github.com/linkerd/linkerd2/pkg/k8s"
    20  	"github.com/linkerd/linkerd2/pkg/profiles"
    21  	"github.com/linkerd/linkerd2/pkg/protohttp"
    22  	"github.com/linkerd/linkerd2/viz/pkg/api"
    23  	hc "github.com/linkerd/linkerd2/viz/pkg/healthcheck"
    24  	vizutil "github.com/linkerd/linkerd2/viz/pkg/util"
    25  	pb "github.com/linkerd/linkerd2/viz/tap/gen/tap"
    26  	"github.com/linkerd/linkerd2/viz/tap/pkg"
    27  	log "github.com/sirupsen/logrus"
    28  	"github.com/spf13/cobra"
    29  	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
    30  	"k8s.io/apimachinery/pkg/util/validation"
    31  	"sigs.k8s.io/yaml"
    32  )
    33  
    34  type profileOptions struct {
    35  	name          string
    36  	namespace     string
    37  	tap           string
    38  	tapDuration   time.Duration
    39  	tapRouteLimit uint
    40  	output        string
    41  }
    42  
    43  func newProfileOptions() *profileOptions {
    44  	return &profileOptions{
    45  		tapDuration:   5 * time.Second,
    46  		tapRouteLimit: 20,
    47  		output:        "yaml",
    48  	}
    49  }
    50  
    51  func (options *profileOptions) validate() error {
    52  	if options.tap == "" {
    53  		return errors.New("The --tap flag must be specified")
    54  	}
    55  	// a DNS-1035 label must consist of lower case alphanumeric characters or '-',
    56  	// start with an alphabetic character, and end with an alphanumeric character
    57  	if errs := validation.IsDNS1035Label(options.name); len(errs) != 0 {
    58  		return fmt.Errorf("invalid service %q: %v", options.name, errs)
    59  	}
    60  	// a DNS-1123 label must consist of lower case alphanumeric characters or '-',
    61  	// and must start and end with an alphanumeric character
    62  	if errs := validation.IsDNS1123Label(options.namespace); len(errs) != 0 {
    63  		return fmt.Errorf("invalid namespace %q: %v", options.namespace, errs)
    64  	}
    65  	return nil
    66  }
    67  
    68  // newCmdProfile creates a new cobra command for the Profile subcommand which
    69  // generates Linkerd service profile based off tap data.
    70  func newCmdProfile() *cobra.Command {
    71  	options := newProfileOptions()
    72  
    73  	cmd := &cobra.Command{
    74  		Use:   "profile [flags] --tap resource (SERVICE)",
    75  		Short: "Output service profile config for Kubernetes based off tap data",
    76  		Long:  "Output service profile config for Kubernetes based off tap data.",
    77  		Example: `  # Generate a profile by watching live traffic.
    78    linkerd viz profile -n emojivoto web-svc --tap deploy/web --tap-duration 10s --tap-route-limit 5
    79  `,
    80  		Args: cobra.ExactArgs(1),
    81  		ValidArgsFunction: func(cmd *cobra.Command, args []string, toComplete string) ([]string, cobra.ShellCompDirective) {
    82  			// Skip providing suggestions if one or more arguments are provided
    83  			// We either have a suggestion selected or more multiple args are provided
    84  			// which is not allowed for this command.
    85  			if len(args) > 0 {
    86  				return nil, cobra.ShellCompDirectiveError
    87  			}
    88  
    89  			k8sAPI, err := k8s.NewAPI(kubeconfigPath, kubeContext, impersonate, impersonateGroup, 0)
    90  			if err != nil {
    91  				return nil, cobra.ShellCompDirectiveError
    92  			}
    93  
    94  			if options.namespace == "" {
    95  				options.namespace = pkgcmd.GetDefaultNamespace(kubeconfigPath, kubeContext)
    96  			}
    97  
    98  			cc := k8s.NewCommandCompletion(k8sAPI, options.namespace)
    99  
   100  			// Profiles require completion for only services so prepend the service resource
   101  			// type to the list of args
   102  			results, err := cc.Complete([]string{k8s.Service}, toComplete)
   103  			if err != nil {
   104  				return nil, cobra.ShellCompDirectiveError
   105  			}
   106  
   107  			return results, cobra.ShellCompDirectiveDefault
   108  		},
   109  		RunE: func(cmd *cobra.Command, args []string) error {
   110  			api.CheckClientOrExit(hc.VizOptions{
   111  				Options: &healthcheck.Options{
   112  					ControlPlaneNamespace: controlPlaneNamespace,
   113  					KubeConfig:            kubeconfigPath,
   114  					Impersonate:           impersonate,
   115  					ImpersonateGroup:      impersonateGroup,
   116  					KubeContext:           kubeContext,
   117  					APIAddr:               apiAddr,
   118  				},
   119  				VizNamespaceOverride: vizNamespace,
   120  			})
   121  			if options.namespace == "" {
   122  				options.namespace = pkgcmd.GetDefaultNamespace(kubeconfigPath, kubeContext)
   123  			}
   124  			options.name = args[0]
   125  			clusterDomain := "cluster.local"
   126  			var k8sAPI *k8s.KubernetesAPI
   127  			err := options.validate()
   128  			if err != nil {
   129  				return err
   130  			}
   131  			k8sAPI, err = k8s.NewAPI(kubeconfigPath, kubeContext, impersonate, impersonateGroup, 0)
   132  			if err != nil {
   133  				return err
   134  			}
   135  			_, values, err := healthcheck.FetchCurrentConfiguration(cmd.Context(), k8sAPI, controlPlaneNamespace)
   136  			if err != nil {
   137  				return err
   138  			}
   139  			if cd := values.ClusterDomain; cd != "" {
   140  				clusterDomain = cd
   141  			}
   142  			return renderTapOutputProfile(cmd.Context(), k8sAPI, options.tap, options.namespace, options.name, clusterDomain, options.tapDuration, int(options.tapRouteLimit), options.output, os.Stdout)
   143  		},
   144  	}
   145  	cmd.PersistentFlags().StringVar(&options.tap, "tap", options.tap, "Output a service profile based on tap data for the given target resource")
   146  	cmd.PersistentFlags().DurationVar(&options.tapDuration, "tap-duration", options.tapDuration, "Duration over which tap data is collected (for example: \"10s\", \"1m\", \"10m\")")
   147  	cmd.PersistentFlags().UintVar(&options.tapRouteLimit, "tap-route-limit", options.tapRouteLimit, "Max number of routes to add to the profile")
   148  	cmd.PersistentFlags().StringVarP(&options.namespace, "namespace", "n", options.namespace, "Namespace of the service")
   149  	cmd.PersistentFlags().StringVarP(&options.output, "output", "o", options.output, "Output format. One of: yaml, json")
   150  
   151  	pkgcmd.ConfigureNamespaceFlagCompletion(
   152  		cmd, []string{"namespace"},
   153  		kubeconfigPath, impersonate, impersonateGroup, kubeContext)
   154  	return cmd
   155  }
   156  
   157  // renderTapOutputProfile performs a tap on the desired resource and generates
   158  // a service profile with routes pre-populated from the tap data
   159  // Only inbound tap traffic is considered.
   160  func renderTapOutputProfile(ctx context.Context, k8sAPI *k8s.KubernetesAPI, tapResource, namespace, name, clusterDomain string, tapDuration time.Duration, routeLimit int, format string, w io.Writer) error {
   161  	requestParams := pkg.TapRequestParams{
   162  		Resource:  tapResource,
   163  		Namespace: namespace,
   164  	}
   165  	log.Debugf("Running `linkerd tap %s --namespace %s`", tapResource, namespace)
   166  	req, err := pkg.BuildTapByResourceRequest(requestParams)
   167  	if err != nil {
   168  		return err
   169  	}
   170  	profile, err := tapToServiceProfile(ctx, k8sAPI, req, namespace, name, clusterDomain, tapDuration, routeLimit)
   171  	if err != nil {
   172  		return err
   173  	}
   174  	var output []byte
   175  	if format == "yaml" {
   176  		output, err = yaml.Marshal(profile)
   177  	} else if format == "json" {
   178  		output, err = json.Marshal(profile)
   179  	} else {
   180  		return errors.New("output format must be one of yaml or json")
   181  	}
   182  	if err != nil {
   183  		return fmt.Errorf("Error writing Service Profile: %w", err)
   184  	}
   185  	w.Write(output)
   186  	return nil
   187  }
   188  
   189  func tapToServiceProfile(ctx context.Context, k8sAPI *k8s.KubernetesAPI, tapReq *pb.TapByResourceRequest, namespace, name, clusterDomain string, tapDuration time.Duration, routeLimit int) (sp.ServiceProfile, error) {
   190  	profile := sp.ServiceProfile{
   191  		ObjectMeta: metav1.ObjectMeta{
   192  			Name:      fmt.Sprintf("%s.%s.svc.%s", name, namespace, clusterDomain),
   193  			Namespace: namespace,
   194  		},
   195  		TypeMeta: profiles.ServiceProfileMeta,
   196  	}
   197  	ctxWithTime, cancel := context.WithTimeout(ctx, tapDuration)
   198  	defer cancel()
   199  	reader, body, err := pkg.Reader(ctxWithTime, k8sAPI, tapReq)
   200  	if err != nil {
   201  		return profile, err
   202  	}
   203  	defer body.Close()
   204  	routes := routeSpecFromTap(reader, routeLimit)
   205  	profile.Spec.Routes = routes
   206  	return profile, nil
   207  }
   208  
   209  func routeSpecFromTap(tapByteStream *bufio.Reader, routeLimit int) []*sp.RouteSpec {
   210  	routes := make([]*sp.RouteSpec, 0)
   211  	routesMap := make(map[string]*sp.RouteSpec)
   212  	for {
   213  		log.Debug("Waiting for data...")
   214  		event := pb.TapEvent{}
   215  		err := protohttp.FromByteStreamToProtocolBuffers(tapByteStream, &event)
   216  		if err != nil {
   217  			// expected errors when hitting the tapDuration deadline
   218  			var e net.Error
   219  			if !errors.Is(err, io.EOF) &&
   220  				!(errors.As(err, &e) && e.Timeout()) &&
   221  				!errors.Is(err, context.DeadlineExceeded) &&
   222  				!strings.HasSuffix(err.Error(), pkg.ErrClosedResponseBody) {
   223  				fmt.Fprintln(os.Stderr, err)
   224  			}
   225  			break
   226  		}
   227  		routeSpec := getPathDataFromTap(&event)
   228  		log.Debugf("Created route spec: %v", routeSpec)
   229  		if routeSpec != nil {
   230  			routesMap[routeSpec.Name] = routeSpec
   231  			if len(routesMap) >= routeLimit {
   232  				break
   233  			}
   234  		}
   235  	}
   236  	for _, path := range sortMapKeys(routesMap) {
   237  		routes = append(routes, routesMap[path])
   238  	}
   239  	return routes
   240  }
   241  
   242  func sortMapKeys(m map[string]*sp.RouteSpec) (keys []string) {
   243  	for key := range m {
   244  		keys = append(keys, key)
   245  	}
   246  	sort.Strings(keys)
   247  	return
   248  }
   249  
   250  func getPathDataFromTap(event *pb.TapEvent) *sp.RouteSpec {
   251  	if event.GetProxyDirection() != pb.TapEvent_INBOUND {
   252  		return nil
   253  	}
   254  	switch ev := event.GetHttp().GetEvent().(type) {
   255  	case *pb.TapEvent_Http_RequestInit_:
   256  		path := ev.RequestInit.GetPath()
   257  		if path == "/" {
   258  			return nil
   259  		}
   260  
   261  		return profiles.MkRouteSpec(
   262  			path,
   263  			profiles.PathToRegex(path), // for now, no path consolidation
   264  			vizutil.HTTPMethodToString(ev.RequestInit.GetMethod()),
   265  			nil)
   266  	default:
   267  		return nil
   268  	}
   269  }
   270  

View as plain text