1 package api
2
3 import (
4 "context"
5 "fmt"
6
7 "github.com/linkerd/linkerd2/pkg/k8s"
8 pb "github.com/linkerd/linkerd2/viz/metrics-api/gen/viz"
9 "github.com/prometheus/common/model"
10 corev1 "k8s.io/api/core/v1"
11 metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
12 )
13
// PromQL templates for the gateway queries issued by getGatewaysMetrics.
const (
	// gatewayAliveQuery sums the gateway_alive metric. Placeholders, in
	// order: the label selector appended to the metric name, and the list of
	// labels to group by.
	gatewayAliveQuery = "sum(gateway_alive%s) by (%s)"

	// gatewayLatencyQuantileQuery computes a quantile over the
	// gateway_probe_latency_ms histogram buckets. Placeholders, in order: the
	// quantile, the label selector appended to the metric name, the range
	// window used by irate, and the labels to group by in addition to "le".
	// NOTE(review): the placeholders are filled in by generateQuantileQueries,
	// which is not defined in this file — confirm the argument order there.
	gatewayLatencyQuantileQuery = "histogram_quantile(%s, sum(irate(gateway_probe_latency_ms_bucket%s[%s])) by (le, %s))"
)
18
19 func (s *grpcServer) Gateways(ctx context.Context, req *pb.GatewaysRequest) (*pb.GatewaysResponse, error) {
20 array := []*pb.GatewaysTable_Row{}
21 metrics, err := s.getGatewaysMetrics(ctx, req, req.TimeWindow)
22
23 if err != nil {
24 return nil, err
25 }
26
27 for _, v := range metrics {
28 array = append(array, v)
29 }
30 return &pb.GatewaysResponse{
31 Response: &pb.GatewaysResponse_Ok_{
32 Ok: &pb.GatewaysResponse_Ok{
33 GatewaysTable: &pb.GatewaysTable{
34 Rows: array,
35 },
36 },
37 },
38 }, nil
39 }
40
41 func buildGatewaysRequestLabels(req *pb.GatewaysRequest) (labels model.LabelSet, labelNames model.LabelNames) {
42 labels = model.LabelSet{}
43
44 if req.GatewayNamespace != "" {
45 labels[gatewayNamespaceLabel] = model.LabelValue(req.GatewayNamespace)
46 }
47
48 if req.RemoteClusterName != "" {
49 labels[remoteClusterNameLabel] = model.LabelValue(req.RemoteClusterName)
50 }
51
52 groupBy := model.LabelNames{gatewayNamespaceLabel, remoteClusterNameLabel, gatewayNameLabel}
53
54 return labels, groupBy
55 }
56
57
58
59 func (s *grpcServer) getNumServicesMap(ctx context.Context) (map[string]uint64, error) {
60
61 results := make(map[string]uint64)
62 selector := fmt.Sprintf("%s,!%s", k8s.MirroredResourceLabel, k8s.MirroredGatewayLabel)
63 services, err := s.k8sAPI.Client.CoreV1().Services(corev1.NamespaceAll).List(ctx, metav1.ListOptions{LabelSelector: selector})
64 if err != nil {
65 return nil, err
66 }
67
68 for _, svc := range services.Items {
69 clusterName := svc.Labels[k8s.RemoteClusterNameLabel]
70 results[clusterName]++
71 }
72
73 return results, nil
74 }
75
76 func processPrometheusResult(results []promResult, numSvcMap map[string]uint64) map[string]*pb.GatewaysTable_Row {
77
78 rows := make(map[string]*pb.GatewaysTable_Row)
79
80 for _, result := range results {
81 for _, sample := range result.vec {
82
83 clusterName := string(sample.Metric[remoteClusterNameLabel])
84 numPairedSvc := numSvcMap[clusterName]
85
86 addRow := func() {
87 if rows[clusterName] == nil {
88 rows[clusterName] = &pb.GatewaysTable_Row{}
89 rows[clusterName].ClusterName = clusterName
90 rows[clusterName].PairedServices = numPairedSvc
91 }
92 }
93
94 value := extractSampleValue(sample)
95
96 switch result.prom {
97 case promGatewayAlive:
98 addRow()
99 rows[clusterName].Alive = value > 0
100 case promLatencyP50:
101 addRow()
102 rows[clusterName].LatencyMsP50 = value
103 case promLatencyP95:
104 addRow()
105 rows[clusterName].LatencyMsP95 = value
106 case promLatencyP99:
107 addRow()
108 rows[clusterName].LatencyMsP99 = value
109 }
110 }
111 }
112
113 return rows
114 }
115
116 func (s *grpcServer) getGatewaysMetrics(ctx context.Context, req *pb.GatewaysRequest, timeWindow string) (map[string]*pb.GatewaysTable_Row, error) {
117 labels, groupBy := buildGatewaysRequestLabels(req)
118
119 promQueries := map[promType]string{
120 promGatewayAlive: fmt.Sprintf(gatewayAliveQuery, labels.String(), groupBy.String()),
121 }
122
123 quantileQueries := generateQuantileQueries(gatewayLatencyQuantileQuery, labels.String(), timeWindow, groupBy.String())
124 metricsResp, err := s.getPrometheusMetrics(ctx, promQueries, quantileQueries)
125
126 if err != nil {
127 return nil, err
128 }
129 numSvcMap, err := s.getNumServicesMap(ctx)
130
131 if err != nil {
132 return nil, err
133 }
134
135 rowsMap := processPrometheusResult(metricsResp, numSvcMap)
136
137 return rowsMap, nil
138 }
139
View as plain text