...

Source file src/github.com/linkerd/linkerd2/viz/metrics-api/top_routes.go

Documentation: github.com/linkerd/linkerd2/viz/metrics-api

     1  package api
     2  
     3  import (
     4  	"context"
     5  	"errors"
     6  	"fmt"
     7  	"sort"
     8  	"strings"
     9  
    10  	sp "github.com/linkerd/linkerd2/controller/gen/apis/serviceprofile/v1alpha2"
    11  	api "github.com/linkerd/linkerd2/controller/k8s"
    12  	"github.com/linkerd/linkerd2/pkg/k8s"
    13  	pb "github.com/linkerd/linkerd2/viz/metrics-api/gen/viz"
    14  	"github.com/prometheus/common/model"
    15  	log "github.com/sirupsen/logrus"
    16  	"k8s.io/apimachinery/pkg/labels"
    17  	"k8s.io/apimachinery/pkg/runtime"
    18  )
    19  
    // PromQL templates for per-route statistics. In the request-count templates
// the %s verbs are, in order: the rendered label matcher, the time window and
// the extra grouping label. The latency template takes the quantile first.
const (
	// routeReqQuery sums route responses by route, destination and
	// classification over the time window.
	routeReqQuery = "sum(increase(route_response_total%s[%s])) by (%s, dst, classification)"

	// actualRouteReqQuery is like routeReqQuery but reads
	// route_actual_response_total.
	actualRouteReqQuery = "sum(increase(route_actual_response_total%s[%s])) by (%s, dst, classification)"

	// routeLatencyQuantileQuery computes a latency quantile from the per-route
	// latency histogram.
	routeLatencyQuantileQuery = "histogram_quantile(%s, sum(irate(route_response_latency_ms_bucket%s[%s])) by (le, dst, %s))"

	// dstLabel is a regex matcher on the dst label: any of the given
	// "|"-separated destinations, optionally followed by ":port".
	dstLabel = `dst=~"(%s)(:\\d+)?"`
)

// DefaultRouteName is the name to display for requests that don't match any routes.
const DefaultRouteName = "[DEFAULT]"
    28  
    29  type dstAndRoute struct {
    30  	dst   string
    31  	route string
    32  }
    33  
    34  type indexedTable = map[dstAndRoute]*pb.RouteTable_Row
    35  
    36  type resourceTable struct {
    37  	resource string
    38  	table    indexedTable
    39  }
    40  
    41  func (s *grpcServer) TopRoutes(ctx context.Context, req *pb.TopRoutesRequest) (*pb.TopRoutesResponse, error) {
    42  	log.Debugf("TopRoutes request: %+v", req)
    43  
    44  	if !s.k8sAPI.SPAvailable() {
    45  		return topRoutesError(req, "Routes are not available"), nil
    46  	}
    47  
    48  	errRsp := validateRequest(req)
    49  	if errRsp != nil {
    50  		return errRsp, nil
    51  	}
    52  
    53  	// TopRoutes will return one table for each resource object requested.
    54  	tables := make([]resourceTable, 0)
    55  	targetResource := req.GetSelector().GetResource()
    56  	labelSelector, err := getTopLabelSelector(req)
    57  	if err != nil {
    58  		return nil, err
    59  	}
    60  	if targetResource.GetType() == k8s.Authority {
    61  		// Authority cannot be the target because authorities don't have namespaces,
    62  		// therefore there is no namespace in which to look for a service profile.
    63  		return topRoutesError(req, "Authority cannot be the target of a routes query; try using an authority in the --to flag instead"), nil
    64  	}
    65  
    66  	err = s.validateTimeWindow(ctx, req.TimeWindow)
    67  	if err != nil {
    68  		return topRoutesError(req, fmt.Sprintf("invalid time window: %s", err)), nil
    69  	}
    70  
    71  	// Non-authority resource
    72  	objects, err := s.k8sAPI.GetObjects(targetResource.Namespace, targetResource.Type, targetResource.Name, labelSelector)
    73  	if err != nil {
    74  		return nil, err
    75  	}
    76  
    77  	// Create a table for each object in the resource.
    78  	for _, obj := range objects {
    79  		table, err := s.topRoutesFor(ctx, req, obj)
    80  		if err != nil {
    81  			// No samples for this object, skip it.
    82  			continue
    83  		}
    84  		tables = append(tables, *table)
    85  	}
    86  
    87  	if len(tables) == 0 {
    88  		return topRoutesError(req, "No Service Profiles found for selected resources"), nil
    89  	}
    90  
    91  	// Construct response.
    92  	routeTables := make([]*pb.RouteTable, 0)
    93  
    94  	for _, t := range tables {
    95  		rows := make([]*pb.RouteTable_Row, 0)
    96  		for _, row := range t.table {
    97  			rows = append(rows, row)
    98  		}
    99  		routeTables = append(routeTables, &pb.RouteTable{
   100  			Resource: t.resource,
   101  			Rows:     rows,
   102  		})
   103  	}
   104  
   105  	return &pb.TopRoutesResponse{
   106  		Response: &pb.TopRoutesResponse_Ok_{
   107  			Ok: &pb.TopRoutesResponse_Ok{
   108  				Routes: routeTables,
   109  			},
   110  		},
   111  	}, nil
   112  }
   113  
   114  // topRoutesFor constructs a resource table for the given resource object.
   115  func (s *grpcServer) topRoutesFor(ctx context.Context, req *pb.TopRoutesRequest, object runtime.Object) (*resourceTable, error) {
   116  	// requestedResource is the destination resource.  For inbound queries, it is the target resource.
   117  	// For outbound (i.e. --to) queries, it is the ToResource.  We will look at the service profiles
   118  	// of this destination resource.
   119  	name, err := api.GetNameOf(object)
   120  	if err != nil {
   121  		return nil, err
   122  	}
   123  	clientNs := req.GetSelector().GetResource().GetNamespace()
   124  	typ := req.GetSelector().GetResource().GetType()
   125  	labelSelector, err := getTopLabelSelector(req)
   126  	if err != nil {
   127  		return nil, err
   128  	}
   129  	targetResource := &pb.Resource{
   130  		Name:      name,
   131  		Namespace: req.GetSelector().GetResource().GetNamespace(),
   132  		Type:      typ,
   133  	}
   134  	requestedResource := targetResource
   135  	if req.GetToResource() != nil {
   136  		requestedResource = req.GetToResource()
   137  	}
   138  
   139  	profiles := make(map[string]*sp.ServiceProfile)
   140  
   141  	if requestedResource.GetType() == k8s.Authority {
   142  		// Authorities may not be a source, so we know this is a ToResource.
   143  		profiles, err = s.getProfilesForAuthority(requestedResource.GetName(), clientNs, labelSelector)
   144  		if err != nil {
   145  			return nil, err
   146  		}
   147  	} else {
   148  		// Non-authority resource.
   149  		// Lookup individual resource objects.
   150  		objects, err := s.k8sAPI.GetObjects(requestedResource.Namespace, requestedResource.Type, requestedResource.Name, labelSelector)
   151  		if err != nil {
   152  			return nil, err
   153  		}
   154  		// Find service profiles for all services in all objects in the resource.
   155  		for _, obj := range objects {
   156  			// Lookup services for each object.
   157  			services, err := s.k8sAPI.GetServicesFor(obj, false)
   158  			if err != nil {
   159  				return nil, err
   160  			}
   161  
   162  			for _, svc := range services {
   163  				p := s.k8sAPI.GetServiceProfileFor(svc, clientNs, s.clusterDomain)
   164  				profiles[svc.GetName()] = p
   165  			}
   166  		}
   167  	}
   168  
   169  	metrics, err := s.getRouteMetrics(ctx, req, profiles, targetResource)
   170  	if err != nil {
   171  		return nil, err
   172  	}
   173  
   174  	return &resourceTable{
   175  		resource: fmt.Sprintf("%s/%s", typ, name),
   176  		table:    metrics,
   177  	}, nil
   178  }
   179  
   180  func topRoutesError(req *pb.TopRoutesRequest, message string) *pb.TopRoutesResponse {
   181  	return &pb.TopRoutesResponse{
   182  		Response: &pb.TopRoutesResponse_Error{
   183  			Error: &pb.ResourceError{
   184  				Resource: req.GetSelector().GetResource(),
   185  				Error:    message,
   186  			},
   187  		},
   188  	}
   189  }
   190  
   191  func validateRequest(req *pb.TopRoutesRequest) *pb.TopRoutesResponse {
   192  	if req.GetSelector().GetResource() == nil {
   193  		return topRoutesError(req, "TopRoutes request missing Selector Resource")
   194  	}
   195  
   196  	if req.GetNone() == nil {
   197  		// This is an outbound (--to) request.
   198  		targetType := req.GetSelector().GetResource().GetType()
   199  		if targetType == k8s.Service || targetType == k8s.Authority {
   200  			return topRoutesError(req, fmt.Sprintf("The %s resource type is not supported with 'to' queries", targetType))
   201  		}
   202  	}
   203  	return nil
   204  }
   205  
   206  func (s *grpcServer) getProfilesForAuthority(authority string, clientNs string, labelSelector labels.Selector) (map[string]*sp.ServiceProfile, error) {
   207  	if authority == "" {
   208  		// All authorities
   209  		ps, err := s.k8sAPI.SP().Lister().ServiceProfiles(clientNs).List(labelSelector)
   210  		if err != nil {
   211  			return nil, err
   212  		}
   213  
   214  		if len(ps) == 0 {
   215  			return nil, errors.New("No ServiceProfiles found")
   216  		}
   217  
   218  		profiles := make(map[string]*sp.ServiceProfile)
   219  
   220  		for _, p := range ps {
   221  			profiles[p.Name] = p
   222  		}
   223  
   224  		return profiles, nil
   225  	}
   226  	// Specific authority
   227  	p, err := s.k8sAPI.SP().Lister().ServiceProfiles(clientNs).Get(authority)
   228  	if err != nil {
   229  		return nil, err
   230  	}
   231  	return map[string]*sp.ServiceProfile{
   232  		p.Name: p,
   233  	}, nil
   234  }
   235  
   236  func (s *grpcServer) getRouteMetrics(ctx context.Context, req *pb.TopRoutesRequest, profiles map[string]*sp.ServiceProfile, resource *pb.Resource) (indexedTable, error) {
   237  	timeWindow := req.TimeWindow
   238  
   239  	dsts := make([]string, 0)
   240  	for _, p := range profiles {
   241  		dsts = append(dsts, p.GetName())
   242  	}
   243  
   244  	reqLabels := s.buildRouteLabels(req, dsts, resource)
   245  	groupBy := "rt_route"
   246  
   247  	queries := map[promType]string{
   248  		promRequests: fmt.Sprintf(routeReqQuery, reqLabels, timeWindow, groupBy),
   249  	}
   250  
   251  	if req.GetOutbound() != nil && req.GetNone() == nil {
   252  		// If this req has an Outbound, then query the actual request counts as well.
   253  		queries[promActualRequests] = fmt.Sprintf(actualRouteReqQuery, reqLabels, timeWindow, groupBy)
   254  	}
   255  
   256  	quantileQueries := generateQuantileQueries(routeLatencyQuantileQuery, reqLabels, timeWindow, groupBy)
   257  	results, err := s.getPrometheusMetrics(ctx, queries, quantileQueries)
   258  	if err != nil {
   259  		return nil, err
   260  	}
   261  
   262  	table := make(indexedTable)
   263  	for service, profile := range profiles {
   264  		for _, route := range profile.Spec.Routes {
   265  			key := dstAndRoute{
   266  				dst:   profile.GetName(),
   267  				route: route.Name,
   268  			}
   269  			table[key] = &pb.RouteTable_Row{
   270  				Authority: service,
   271  				Route:     route.Name,
   272  				Stats:     &pb.BasicStats{},
   273  			}
   274  		}
   275  		defaultKey := dstAndRoute{
   276  			dst:   profile.GetName(),
   277  			route: "",
   278  		}
   279  		table[defaultKey] = &pb.RouteTable_Row{
   280  			Authority: service,
   281  			Route:     DefaultRouteName,
   282  			Stats:     &pb.BasicStats{},
   283  		}
   284  	}
   285  
   286  	processRouteMetrics(results, timeWindow, table)
   287  
   288  	return table, nil
   289  }
   290  
   291  func (s *grpcServer) buildRouteLabels(req *pb.TopRoutesRequest, dsts []string, resource *pb.Resource) string {
   292  	// labels: the labels for the resource we want to query for
   293  	var labels model.LabelSet
   294  
   295  	switch req.Outbound.(type) {
   296  
   297  	case *pb.TopRoutesRequest_ToResource:
   298  		labels = labels.Merge(promQueryLabels(resource))
   299  		labels = labels.Merge(promDirectionLabels("outbound"))
   300  		return renderLabels(labels, dsts)
   301  
   302  	default:
   303  		labels = labels.Merge(promDirectionLabels("inbound"))
   304  		labels = labels.Merge(promQueryLabels(resource))
   305  		return renderLabels(labels, dsts)
   306  	}
   307  }
   308  
   309  func renderLabels(labels model.LabelSet, services []string) string {
   310  	pairs := make([]string, 0)
   311  	for k, v := range labels {
   312  		pairs = append(pairs, fmt.Sprintf("%s=%q", k, v))
   313  	}
   314  	if len(services) > 0 {
   315  		pairs = append(pairs, fmt.Sprintf(dstLabel, strings.Join(services, "|")))
   316  	}
   317  	sort.Strings(pairs)
   318  	return fmt.Sprintf("{%s}", strings.Join(pairs, ", "))
   319  }
   320  
   321  func processRouteMetrics(results []promResult, timeWindow string, table indexedTable) {
   322  	for _, result := range results {
   323  		for _, sample := range result.vec {
   324  			route := string(sample.Metric[model.LabelName("rt_route")])
   325  			dst := string(sample.Metric[model.LabelName("dst")])
   326  			dst = strings.Split(dst, ":")[0] // Truncate port, if there is one.
   327  
   328  			key := dstAndRoute{dst, route}
   329  
   330  			if table[key] == nil {
   331  				log.Warnf("Found stats for unknown route: %s:%s", dst, route)
   332  				continue
   333  			}
   334  
   335  			table[key].TimeWindow = timeWindow
   336  			value := extractSampleValue(sample)
   337  
   338  			switch result.prom {
   339  			case promRequests:
   340  				switch string(sample.Metric[model.LabelName("classification")]) {
   341  				case success:
   342  					table[key].Stats.SuccessCount += value
   343  				case failure:
   344  					table[key].Stats.FailureCount += value
   345  				}
   346  			case promActualRequests:
   347  				switch string(sample.Metric[model.LabelName("classification")]) {
   348  				case success:
   349  					table[key].Stats.ActualSuccessCount += value
   350  				case failure:
   351  					table[key].Stats.ActualFailureCount += value
   352  				}
   353  			case promLatencyP50:
   354  				table[key].Stats.LatencyMsP50 = value
   355  			case promLatencyP95:
   356  				table[key].Stats.LatencyMsP95 = value
   357  			case promLatencyP99:
   358  				table[key].Stats.LatencyMsP99 = value
   359  			}
   360  		}
   361  	}
   362  }
   363  
   364  // generate correct label.Selector object according to the request
   365  func getTopLabelSelector(req *pb.TopRoutesRequest) (labels.Selector, error) {
   366  	labelSelector := labels.Everything()
   367  	if s := req.GetSelector().GetLabelSelector(); s != "" {
   368  		var err error
   369  		labelSelector, err = labels.Parse(s)
   370  		if err != nil {
   371  			return nil, fmt.Errorf("invalid label selector %q: %w", s, err)
   372  		}
   373  	}
   374  	return labelSelector, nil
   375  }
   376  

View as plain text