1 package api
2
3 import (
4 "context"
5 "errors"
6 "fmt"
7 "math"
8 "sort"
9 "strings"
10 "time"
11
12 "github.com/linkerd/linkerd2/pkg/k8s"
13 pb "github.com/linkerd/linkerd2/viz/metrics-api/gen/viz"
14 "github.com/prometheus/common/model"
15 log "github.com/sirupsen/logrus"
16 "go.opencensus.io/trace"
17 )
18
19 type promType string
20 type promResult struct {
21 prom promType
22 vec model.Vector
23 err error
24 }
25
26 const (
27 promGatewayAlive = promType("QUERY_GATEWAY_ALIVE")
28 promRequests = promType("QUERY_REQUESTS")
29 promAllowedRequests = promType("QUERY_ALLOWED_REQUESTS")
30 promDeniedRequests = promType("QUERY_DENIED_REQUESTS")
31 promActualRequests = promType("QUERY_ACTUAL_REQUESTS")
32 promTCPConnections = promType("QUERY_TCP_CONNECTIONS")
33 promTCPReadBytes = promType("QUERY_TCP_READ_BYTES")
34 promTCPWriteBytes = promType("QUERY_TCP_WRITE_BYTES")
35 promLatencyP50 = promType("0.5")
36 promLatencyP95 = promType("0.95")
37 promLatencyP99 = promType("0.99")
38
39 namespaceLabel = model.LabelName("namespace")
40 dstNamespaceLabel = model.LabelName("dst_namespace")
41 gatewayNameLabel = model.LabelName("gateway_name")
42 gatewayNamespaceLabel = model.LabelName("gateway_namespace")
43 remoteClusterNameLabel = model.LabelName("target_cluster_name")
44 authorityLabel = model.LabelName("authority")
45 serverKindLabel = model.LabelName("srv_kind")
46 serverNameLabel = model.LabelName("srv_name")
47 authorizationKindLabel = model.LabelName("authz_kind")
48 authorizationNameLabel = model.LabelName("authz_name")
49 routeKindLabel = model.LabelName("route_kind")
50 routeNameLabel = model.LabelName("route_name")
51 )
52
// ErrNoPrometheusInstance is the sentinel error returned by queryProm when
// the server has no Prometheus API client configured.
var ErrNoPrometheusInstance = errors.New("No prometheus instance to connect")
57
58 func extractSampleValue(sample *model.Sample) uint64 {
59 value := uint64(0)
60 if !math.IsNaN(float64(sample.Value)) {
61 value = uint64(math.Round(float64(sample.Value)))
62 }
63 return value
64 }
65
66 func (s *grpcServer) queryProm(ctx context.Context, query string) (model.Vector, error) {
67 log.Debugf("Query request: %q", query)
68
69 _, span := trace.StartSpan(ctx, "query.prometheus")
70 defer span.End()
71 span.AddAttributes(trace.StringAttribute("queryString", query))
72
73 if s.prometheusAPI == nil {
74 return nil, ErrNoPrometheusInstance
75 }
76
77
78 res, warn, err := s.prometheusAPI.Query(ctx, query, time.Time{})
79 if err != nil {
80 return nil, fmt.Errorf("Query failed: %q: %w", query, err)
81 }
82 if warn != nil {
83 log.Warnf("%v", warn)
84 }
85 log.Debugf("Query response:\n\t%+v", res)
86
87 if res.Type() != model.ValVector {
88 return nil, fmt.Errorf("Unexpected query result type (expected Vector): %s", res.Type())
89 }
90
91 return res.(model.Vector), nil
92 }
93
94
95
96 func promGroupByLabelNames(resource *pb.Resource) model.LabelNames {
97 names := model.LabelNames{namespaceLabel}
98
99 if resource.Type != k8s.Namespace {
100 names = append(names, promResourceType(resource))
101 }
102 return names
103 }
104
105
106
107 func promDstGroupByLabelNames(resource *pb.Resource) model.LabelNames {
108 names := model.LabelNames{dstNamespaceLabel}
109
110 if isNonK8sResourceQuery(resource.GetType()) {
111 names = append(names, promResourceType(resource))
112 } else if resource.Type != k8s.Namespace {
113 names = append(names, "dst_"+promResourceType(resource))
114 }
115 return names
116 }
117
118
119 func promQueryLabels(resource *pb.Resource) model.LabelSet {
120 set := model.LabelSet{}
121 if resource != nil {
122 if resource.Name != "" {
123 if resource.GetType() == k8s.Server {
124 set[serverKindLabel] = model.LabelValue("server")
125 set[serverNameLabel] = model.LabelValue(resource.GetName())
126 } else if resource.GetType() == k8s.ServerAuthorization {
127 set[authorizationKindLabel] = model.LabelValue("serverauthorization")
128 set[authorizationNameLabel] = model.LabelValue(resource.GetName())
129 } else if resource.GetType() == k8s.AuthorizationPolicy {
130 set[authorizationKindLabel] = model.LabelValue("authorizationpolicy")
131 set[authorizationNameLabel] = model.LabelValue(resource.GetName())
132 } else if resource.GetType() == k8s.HTTPRoute {
133 set[routeNameLabel] = model.LabelValue(resource.GetName())
134 } else if resource.GetType() != k8s.Service {
135 set[promResourceType(resource)] = model.LabelValue(resource.Name)
136 }
137 }
138 if shouldAddNamespaceLabel(resource) {
139 set[namespaceLabel] = model.LabelValue(resource.Namespace)
140 }
141 }
142 return set
143 }
144
145
146 func promDstQueryLabels(resource *pb.Resource) model.LabelSet {
147 set := model.LabelSet{}
148 if resource.Name != "" {
149 if isNonK8sResourceQuery(resource.GetType()) {
150 set[promResourceType(resource)] = model.LabelValue(resource.Name)
151 } else {
152 set["dst_"+promResourceType(resource)] = model.LabelValue(resource.Name)
153 if shouldAddNamespaceLabel(resource) {
154 set[dstNamespaceLabel] = model.LabelValue(resource.Namespace)
155 }
156 }
157 }
158
159 return set
160 }
161
162
163
164
165 func generateLabelStringWithExclusion(l model.LabelSet, labelNames ...string) string {
166 lstrs := make([]string, 0, len(l))
167 for l, v := range l {
168 lstrs = append(lstrs, fmt.Sprintf("%s=%q", l, v))
169 }
170 for _, labelName := range labelNames {
171 lstrs = append(lstrs, fmt.Sprintf(`%s!=""`, labelName))
172 }
173
174 sort.Strings(lstrs)
175 return fmt.Sprintf("{%s}", strings.Join(lstrs, ", "))
176 }
177
178
179
180 func generateLabelStringWithRegex(l model.LabelSet, labelName string, stringToMatch string) string {
181 lstrs := make([]string, 0, len(l))
182 for l, v := range l {
183 lstrs = append(lstrs, fmt.Sprintf("%s=%q", l, v))
184 }
185 lstrs = append(lstrs, fmt.Sprintf(`%s=~"^%s.*"`, labelName, stringToMatch))
186
187 sort.Strings(lstrs)
188 return fmt.Sprintf("{%s}", strings.Join(lstrs, ", "))
189 }
190
191
192
193 func generateQuantileQueries(quantileQuery, labels, timeWindow, groupBy string) map[promType]string {
194 return map[promType]string{
195 promLatencyP50: fmt.Sprintf(quantileQuery, promLatencyP50, labels, timeWindow, groupBy),
196 promLatencyP95: fmt.Sprintf(quantileQuery, promLatencyP95, labels, timeWindow, groupBy),
197 promLatencyP99: fmt.Sprintf(quantileQuery, promLatencyP99, labels, timeWindow, groupBy),
198 }
199 }
200
201
202 func shouldAddNamespaceLabel(resource *pb.Resource) bool {
203 return resource.Type != k8s.Namespace && resource.Namespace != ""
204 }
205
206
207 func promDirectionLabels(direction string) model.LabelSet {
208 return model.LabelSet{
209 model.LabelName("direction"): model.LabelValue(direction),
210 }
211 }
212
213 func promPeerLabel(peer string) model.LabelSet {
214 return model.LabelSet{
215 model.LabelName("peer"): model.LabelValue(peer),
216 }
217 }
218
219 func promResourceType(resource *pb.Resource) model.LabelName {
220 l5dLabel := k8s.KindToL5DLabel(resource.Type)
221 return model.LabelName(l5dLabel)
222 }
223
224 func (s *grpcServer) getPrometheusMetrics(ctx context.Context, requestQueries map[promType]string, latencyQueries map[promType]string) ([]promResult, error) {
225 resultChan := make(chan promResult)
226
227 for pt, query := range requestQueries {
228 go func(typ promType, promQuery string) {
229 resultVector, err := s.queryProm(ctx, promQuery)
230 resultChan <- promResult{
231 prom: typ,
232 vec: resultVector,
233 err: err,
234 }
235 }(pt, query)
236 }
237
238 for quantile, query := range latencyQueries {
239 go func(qt promType, promQuery string) {
240 resultVector, err := s.queryProm(ctx, promQuery)
241 resultChan <- promResult{
242 prom: qt,
243 vec: resultVector,
244 err: err,
245 }
246 }(quantile, query)
247 }
248
249 var err error
250 results := []promResult{}
251 for i := 0; i < len(latencyQueries)+len(requestQueries); i++ {
252 result := <-resultChan
253 if result.err != nil {
254 log.Errorf("queryProm failed with: %s", result.err)
255 err = result.err
256 } else {
257 results = append(results, result)
258 }
259 }
260 if err != nil {
261 return nil, err
262 }
263
264 return results, nil
265 }
266
View as plain text