...

Source file src/github.com/linkerd/linkerd2/viz/metrics-api/prometheus.go

Documentation: github.com/linkerd/linkerd2/viz/metrics-api

     1  package api
     2  
     3  import (
     4  	"context"
     5  	"errors"
     6  	"fmt"
     7  	"math"
     8  	"sort"
     9  	"strings"
    10  	"time"
    11  
    12  	"github.com/linkerd/linkerd2/pkg/k8s"
    13  	pb "github.com/linkerd/linkerd2/viz/metrics-api/gen/viz"
    14  	"github.com/prometheus/common/model"
    15  	log "github.com/sirupsen/logrus"
    16  	"go.opencensus.io/trace"
    17  )
    18  
    19  type promType string
    20  type promResult struct {
    21  	prom promType
    22  	vec  model.Vector
    23  	err  error
    24  }
    25  
    26  const (
    27  	promGatewayAlive    = promType("QUERY_GATEWAY_ALIVE")
    28  	promRequests        = promType("QUERY_REQUESTS")
    29  	promAllowedRequests = promType("QUERY_ALLOWED_REQUESTS")
    30  	promDeniedRequests  = promType("QUERY_DENIED_REQUESTS")
    31  	promActualRequests  = promType("QUERY_ACTUAL_REQUESTS")
    32  	promTCPConnections  = promType("QUERY_TCP_CONNECTIONS")
    33  	promTCPReadBytes    = promType("QUERY_TCP_READ_BYTES")
    34  	promTCPWriteBytes   = promType("QUERY_TCP_WRITE_BYTES")
    35  	promLatencyP50      = promType("0.5")
    36  	promLatencyP95      = promType("0.95")
    37  	promLatencyP99      = promType("0.99")
    38  
    39  	namespaceLabel         = model.LabelName("namespace")
    40  	dstNamespaceLabel      = model.LabelName("dst_namespace")
    41  	gatewayNameLabel       = model.LabelName("gateway_name")
    42  	gatewayNamespaceLabel  = model.LabelName("gateway_namespace")
    43  	remoteClusterNameLabel = model.LabelName("target_cluster_name")
    44  	authorityLabel         = model.LabelName("authority")
    45  	serverKindLabel        = model.LabelName("srv_kind")
    46  	serverNameLabel        = model.LabelName("srv_name")
    47  	authorizationKindLabel = model.LabelName("authz_kind")
    48  	authorizationNameLabel = model.LabelName("authz_name")
    49  	routeKindLabel         = model.LabelName("route_kind")
    50  	routeNameLabel         = model.LabelName("route_name")
    51  )
    52  
    53  var (
    54  	// ErrNoPrometheusInstance is returned when there is no prometheus instance configured
    55  	ErrNoPrometheusInstance = errors.New("No prometheus instance to connect")
    56  )
    57  
    58  func extractSampleValue(sample *model.Sample) uint64 {
    59  	value := uint64(0)
    60  	if !math.IsNaN(float64(sample.Value)) {
    61  		value = uint64(math.Round(float64(sample.Value)))
    62  	}
    63  	return value
    64  }
    65  
    66  func (s *grpcServer) queryProm(ctx context.Context, query string) (model.Vector, error) {
    67  	log.Debugf("Query request: %q", query)
    68  
    69  	_, span := trace.StartSpan(ctx, "query.prometheus")
    70  	defer span.End()
    71  	span.AddAttributes(trace.StringAttribute("queryString", query))
    72  
    73  	if s.prometheusAPI == nil {
    74  		return nil, ErrNoPrometheusInstance
    75  	}
    76  
    77  	// single data point (aka summary) query
    78  	res, warn, err := s.prometheusAPI.Query(ctx, query, time.Time{})
    79  	if err != nil {
    80  		return nil, fmt.Errorf("Query failed: %q: %w", query, err)
    81  	}
    82  	if warn != nil {
    83  		log.Warnf("%v", warn)
    84  	}
    85  	log.Debugf("Query response:\n\t%+v", res)
    86  
    87  	if res.Type() != model.ValVector {
    88  		return nil, fmt.Errorf("Unexpected query result type (expected Vector): %s", res.Type())
    89  	}
    90  
    91  	return res.(model.Vector), nil
    92  }
    93  
    94  // add filtering by resource type
    95  // note that metricToKey assumes the label ordering (namespace, name)
    96  func promGroupByLabelNames(resource *pb.Resource) model.LabelNames {
    97  	names := model.LabelNames{namespaceLabel}
    98  
    99  	if resource.Type != k8s.Namespace {
   100  		names = append(names, promResourceType(resource))
   101  	}
   102  	return names
   103  }
   104  
   105  // add filtering by resource type
   106  // note that metricToKey assumes the label ordering (namespace, name)
   107  func promDstGroupByLabelNames(resource *pb.Resource) model.LabelNames {
   108  	names := model.LabelNames{dstNamespaceLabel}
   109  
   110  	if isNonK8sResourceQuery(resource.GetType()) {
   111  		names = append(names, promResourceType(resource))
   112  	} else if resource.Type != k8s.Namespace {
   113  		names = append(names, "dst_"+promResourceType(resource))
   114  	}
   115  	return names
   116  }
   117  
   118  // query a named resource
   119  func promQueryLabels(resource *pb.Resource) model.LabelSet {
   120  	set := model.LabelSet{}
   121  	if resource != nil {
   122  		if resource.Name != "" {
   123  			if resource.GetType() == k8s.Server {
   124  				set[serverKindLabel] = model.LabelValue("server")
   125  				set[serverNameLabel] = model.LabelValue(resource.GetName())
   126  			} else if resource.GetType() == k8s.ServerAuthorization {
   127  				set[authorizationKindLabel] = model.LabelValue("serverauthorization")
   128  				set[authorizationNameLabel] = model.LabelValue(resource.GetName())
   129  			} else if resource.GetType() == k8s.AuthorizationPolicy {
   130  				set[authorizationKindLabel] = model.LabelValue("authorizationpolicy")
   131  				set[authorizationNameLabel] = model.LabelValue(resource.GetName())
   132  			} else if resource.GetType() == k8s.HTTPRoute {
   133  				set[routeNameLabel] = model.LabelValue(resource.GetName())
   134  			} else if resource.GetType() != k8s.Service {
   135  				set[promResourceType(resource)] = model.LabelValue(resource.Name)
   136  			}
   137  		}
   138  		if shouldAddNamespaceLabel(resource) {
   139  			set[namespaceLabel] = model.LabelValue(resource.Namespace)
   140  		}
   141  	}
   142  	return set
   143  }
   144  
   145  // query a named resource
   146  func promDstQueryLabels(resource *pb.Resource) model.LabelSet {
   147  	set := model.LabelSet{}
   148  	if resource.Name != "" {
   149  		if isNonK8sResourceQuery(resource.GetType()) {
   150  			set[promResourceType(resource)] = model.LabelValue(resource.Name)
   151  		} else {
   152  			set["dst_"+promResourceType(resource)] = model.LabelValue(resource.Name)
   153  			if shouldAddNamespaceLabel(resource) {
   154  				set[dstNamespaceLabel] = model.LabelValue(resource.Namespace)
   155  			}
   156  		}
   157  	}
   158  
   159  	return set
   160  }
   161  
   162  // insert a not-nil check into a LabelSet to verify that data for a specified
   163  // label name exists. due to the `!=` this must be inserted as a string. the
   164  // structure of this code is taken from the Prometheus labelset.go library.
   165  func generateLabelStringWithExclusion(l model.LabelSet, labelNames ...string) string {
   166  	lstrs := make([]string, 0, len(l))
   167  	for l, v := range l {
   168  		lstrs = append(lstrs, fmt.Sprintf("%s=%q", l, v))
   169  	}
   170  	for _, labelName := range labelNames {
   171  		lstrs = append(lstrs, fmt.Sprintf(`%s!=""`, labelName))
   172  	}
   173  
   174  	sort.Strings(lstrs)
   175  	return fmt.Sprintf("{%s}", strings.Join(lstrs, ", "))
   176  }
   177  
   178  // insert a regex-match check into a LabelSet for labels that match the provided
   179  // string. this is modeled on generateLabelStringWithExclusion().
   180  func generateLabelStringWithRegex(l model.LabelSet, labelName string, stringToMatch string) string {
   181  	lstrs := make([]string, 0, len(l))
   182  	for l, v := range l {
   183  		lstrs = append(lstrs, fmt.Sprintf("%s=%q", l, v))
   184  	}
   185  	lstrs = append(lstrs, fmt.Sprintf(`%s=~"^%s.*"`, labelName, stringToMatch))
   186  
   187  	sort.Strings(lstrs)
   188  	return fmt.Sprintf("{%s}", strings.Join(lstrs, ", "))
   189  }
   190  
   191  // generate Prometheus queries for latency quantiles, based on a quantile query
   192  // template, query labels, a time window and grouping.
   193  func generateQuantileQueries(quantileQuery, labels, timeWindow, groupBy string) map[promType]string {
   194  	return map[promType]string{
   195  		promLatencyP50: fmt.Sprintf(quantileQuery, promLatencyP50, labels, timeWindow, groupBy),
   196  		promLatencyP95: fmt.Sprintf(quantileQuery, promLatencyP95, labels, timeWindow, groupBy),
   197  		promLatencyP99: fmt.Sprintf(quantileQuery, promLatencyP99, labels, timeWindow, groupBy),
   198  	}
   199  }
   200  
   201  // determine if we should add "namespace=<namespace>" to a named query
   202  func shouldAddNamespaceLabel(resource *pb.Resource) bool {
   203  	return resource.Type != k8s.Namespace && resource.Namespace != ""
   204  }
   205  
   206  // query for inbound or outbound requests
   207  func promDirectionLabels(direction string) model.LabelSet {
   208  	return model.LabelSet{
   209  		model.LabelName("direction"): model.LabelValue(direction),
   210  	}
   211  }
   212  
   213  func promPeerLabel(peer string) model.LabelSet {
   214  	return model.LabelSet{
   215  		model.LabelName("peer"): model.LabelValue(peer),
   216  	}
   217  }
   218  
   219  func promResourceType(resource *pb.Resource) model.LabelName {
   220  	l5dLabel := k8s.KindToL5DLabel(resource.Type)
   221  	return model.LabelName(l5dLabel)
   222  }
   223  
   224  func (s *grpcServer) getPrometheusMetrics(ctx context.Context, requestQueries map[promType]string, latencyQueries map[promType]string) ([]promResult, error) {
   225  	resultChan := make(chan promResult)
   226  
   227  	for pt, query := range requestQueries {
   228  		go func(typ promType, promQuery string) {
   229  			resultVector, err := s.queryProm(ctx, promQuery)
   230  			resultChan <- promResult{
   231  				prom: typ,
   232  				vec:  resultVector,
   233  				err:  err,
   234  			}
   235  		}(pt, query)
   236  	}
   237  
   238  	for quantile, query := range latencyQueries {
   239  		go func(qt promType, promQuery string) {
   240  			resultVector, err := s.queryProm(ctx, promQuery)
   241  			resultChan <- promResult{
   242  				prom: qt,
   243  				vec:  resultVector,
   244  				err:  err,
   245  			}
   246  		}(quantile, query)
   247  	}
   248  	// process results, receive one message per prometheus query type
   249  	var err error
   250  	results := []promResult{}
   251  	for i := 0; i < len(latencyQueries)+len(requestQueries); i++ {
   252  		result := <-resultChan
   253  		if result.err != nil {
   254  			log.Errorf("queryProm failed with: %s", result.err)
   255  			err = result.err
   256  		} else {
   257  			results = append(results, result)
   258  		}
   259  	}
   260  	if err != nil {
   261  		return nil, err
   262  	}
   263  
   264  	return results, nil
   265  }
   266  

View as plain text