...

Source file src/github.com/linkerd/linkerd2/viz/metrics-api/gateways.go

Documentation: github.com/linkerd/linkerd2/viz/metrics-api

     1  package api
     2  
import (
	"context"
	"fmt"
	"sort"

	"github.com/linkerd/linkerd2/pkg/k8s"
	pb "github.com/linkerd/linkerd2/viz/metrics-api/gen/viz"
	"github.com/prometheus/common/model"
	corev1 "k8s.io/api/core/v1"
	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
)
    13  
    14  const (
    15  	gatewayAliveQuery           = "sum(gateway_alive%s) by (%s)"
    16  	gatewayLatencyQuantileQuery = "histogram_quantile(%s, sum(irate(gateway_probe_latency_ms_bucket%s[%s])) by (le, %s))"
    17  )
    18  
    19  func (s *grpcServer) Gateways(ctx context.Context, req *pb.GatewaysRequest) (*pb.GatewaysResponse, error) {
    20  	array := []*pb.GatewaysTable_Row{}
    21  	metrics, err := s.getGatewaysMetrics(ctx, req, req.TimeWindow)
    22  
    23  	if err != nil {
    24  		return nil, err
    25  	}
    26  
    27  	for _, v := range metrics {
    28  		array = append(array, v)
    29  	}
    30  	return &pb.GatewaysResponse{
    31  		Response: &pb.GatewaysResponse_Ok_{
    32  			Ok: &pb.GatewaysResponse_Ok{
    33  				GatewaysTable: &pb.GatewaysTable{
    34  					Rows: array,
    35  				},
    36  			},
    37  		},
    38  	}, nil
    39  }
    40  
    41  func buildGatewaysRequestLabels(req *pb.GatewaysRequest) (labels model.LabelSet, labelNames model.LabelNames) {
    42  	labels = model.LabelSet{}
    43  
    44  	if req.GatewayNamespace != "" {
    45  		labels[gatewayNamespaceLabel] = model.LabelValue(req.GatewayNamespace)
    46  	}
    47  
    48  	if req.RemoteClusterName != "" {
    49  		labels[remoteClusterNameLabel] = model.LabelValue(req.RemoteClusterName)
    50  	}
    51  
    52  	groupBy := model.LabelNames{gatewayNamespaceLabel, remoteClusterNameLabel, gatewayNameLabel}
    53  
    54  	return labels, groupBy
    55  }
    56  
    57  // this function returns a map of target cluster to the number of services mirrored
    58  // from it
    59  func (s *grpcServer) getNumServicesMap(ctx context.Context) (map[string]uint64, error) {
    60  
    61  	results := make(map[string]uint64)
    62  	selector := fmt.Sprintf("%s,!%s", k8s.MirroredResourceLabel, k8s.MirroredGatewayLabel)
    63  	services, err := s.k8sAPI.Client.CoreV1().Services(corev1.NamespaceAll).List(ctx, metav1.ListOptions{LabelSelector: selector})
    64  	if err != nil {
    65  		return nil, err
    66  	}
    67  
    68  	for _, svc := range services.Items {
    69  		clusterName := svc.Labels[k8s.RemoteClusterNameLabel]
    70  		results[clusterName]++
    71  	}
    72  
    73  	return results, nil
    74  }
    75  
    76  func processPrometheusResult(results []promResult, numSvcMap map[string]uint64) map[string]*pb.GatewaysTable_Row {
    77  
    78  	rows := make(map[string]*pb.GatewaysTable_Row)
    79  
    80  	for _, result := range results {
    81  		for _, sample := range result.vec {
    82  
    83  			clusterName := string(sample.Metric[remoteClusterNameLabel])
    84  			numPairedSvc := numSvcMap[clusterName]
    85  
    86  			addRow := func() {
    87  				if rows[clusterName] == nil {
    88  					rows[clusterName] = &pb.GatewaysTable_Row{}
    89  					rows[clusterName].ClusterName = clusterName
    90  					rows[clusterName].PairedServices = numPairedSvc
    91  				}
    92  			}
    93  
    94  			value := extractSampleValue(sample)
    95  
    96  			switch result.prom {
    97  			case promGatewayAlive:
    98  				addRow()
    99  				rows[clusterName].Alive = value > 0
   100  			case promLatencyP50:
   101  				addRow()
   102  				rows[clusterName].LatencyMsP50 = value
   103  			case promLatencyP95:
   104  				addRow()
   105  				rows[clusterName].LatencyMsP95 = value
   106  			case promLatencyP99:
   107  				addRow()
   108  				rows[clusterName].LatencyMsP99 = value
   109  			}
   110  		}
   111  	}
   112  
   113  	return rows
   114  }
   115  
   116  func (s *grpcServer) getGatewaysMetrics(ctx context.Context, req *pb.GatewaysRequest, timeWindow string) (map[string]*pb.GatewaysTable_Row, error) {
   117  	labels, groupBy := buildGatewaysRequestLabels(req)
   118  
   119  	promQueries := map[promType]string{
   120  		promGatewayAlive: fmt.Sprintf(gatewayAliveQuery, labels.String(), groupBy.String()),
   121  	}
   122  
   123  	quantileQueries := generateQuantileQueries(gatewayLatencyQuantileQuery, labels.String(), timeWindow, groupBy.String())
   124  	metricsResp, err := s.getPrometheusMetrics(ctx, promQueries, quantileQueries)
   125  
   126  	if err != nil {
   127  		return nil, err
   128  	}
   129  	numSvcMap, err := s.getNumServicesMap(ctx)
   130  
   131  	if err != nil {
   132  		return nil, err
   133  	}
   134  
   135  	rowsMap := processPrometheusResult(metricsResp, numSvcMap)
   136  
   137  	return rowsMap, nil
   138  }
   139  

View as plain text