1 package api
2
3 import (
4 "context"
5 "errors"
6 "fmt"
7 "sort"
8 "strings"
9
10 sp "github.com/linkerd/linkerd2/controller/gen/apis/serviceprofile/v1alpha2"
11 api "github.com/linkerd/linkerd2/controller/k8s"
12 "github.com/linkerd/linkerd2/pkg/k8s"
13 pb "github.com/linkerd/linkerd2/viz/metrics-api/gen/viz"
14 "github.com/prometheus/common/model"
15 log "github.com/sirupsen/logrus"
16 "k8s.io/apimachinery/pkg/labels"
17 "k8s.io/apimachinery/pkg/runtime"
18 )
19
const (
	// routeReqQuery is the PromQL template for per-route response counts read
	// from route_response_total. Its placeholders are, in order: the label
	// matcher block, the time window and the extra group-by label.
	routeReqQuery = "sum(increase(route_response_total%s[%s])) by (%s, dst, classification)"
	// actualRouteReqQuery has the same shape as routeReqQuery but reads
	// route_actual_response_total instead.
	actualRouteReqQuery = "sum(increase(route_actual_response_total%s[%s])) by (%s, dst, classification)"
	// routeLatencyQuantileQuery is the PromQL template for per-route latency
	// quantiles. It is passed to generateQuantileQueries together with the label
	// matcher, time window and group-by label; presumably that helper fills the
	// first placeholder with the quantile — confirm against the helper.
	routeLatencyQuantileQuery = "histogram_quantile(%s, sum(irate(route_response_latency_ms_bucket%s[%s])) by (le, dst, %s))"
	// dstLabel is the matcher template for the dst label: the alternation that
	// replaces %s, optionally followed by a ":<digits>" port suffix.
	dstLabel = `dst=~"(%s)(:\\d+)?"`

	// DefaultRouteName is the route name given to the extra row created for
	// every service profile under the empty route key.
	DefaultRouteName = "[DEFAULT]"
)
28
// dstAndRoute is the key used to index route rows: a destination name paired
// with a route name. An empty route denotes the default row.
type dstAndRoute struct {
	dst   string
	route string
}
33
// indexedTable maps each (dst, route) pair to its row of route stats.
type indexedTable = map[dstAndRoute]*pb.RouteTable_Row
35
// resourceTable pairs a resource label with the route rows collected for it.
// topRoutesFor sets resource to "<type>/<name>".
type resourceTable struct {
	resource string
	table    indexedTable
}
40
41 func (s *grpcServer) TopRoutes(ctx context.Context, req *pb.TopRoutesRequest) (*pb.TopRoutesResponse, error) {
42 log.Debugf("TopRoutes request: %+v", req)
43
44 if !s.k8sAPI.SPAvailable() {
45 return topRoutesError(req, "Routes are not available"), nil
46 }
47
48 errRsp := validateRequest(req)
49 if errRsp != nil {
50 return errRsp, nil
51 }
52
53
54 tables := make([]resourceTable, 0)
55 targetResource := req.GetSelector().GetResource()
56 labelSelector, err := getTopLabelSelector(req)
57 if err != nil {
58 return nil, err
59 }
60 if targetResource.GetType() == k8s.Authority {
61
62
63 return topRoutesError(req, "Authority cannot be the target of a routes query; try using an authority in the --to flag instead"), nil
64 }
65
66 err = s.validateTimeWindow(ctx, req.TimeWindow)
67 if err != nil {
68 return topRoutesError(req, fmt.Sprintf("invalid time window: %s", err)), nil
69 }
70
71
72 objects, err := s.k8sAPI.GetObjects(targetResource.Namespace, targetResource.Type, targetResource.Name, labelSelector)
73 if err != nil {
74 return nil, err
75 }
76
77
78 for _, obj := range objects {
79 table, err := s.topRoutesFor(ctx, req, obj)
80 if err != nil {
81
82 continue
83 }
84 tables = append(tables, *table)
85 }
86
87 if len(tables) == 0 {
88 return topRoutesError(req, "No Service Profiles found for selected resources"), nil
89 }
90
91
92 routeTables := make([]*pb.RouteTable, 0)
93
94 for _, t := range tables {
95 rows := make([]*pb.RouteTable_Row, 0)
96 for _, row := range t.table {
97 rows = append(rows, row)
98 }
99 routeTables = append(routeTables, &pb.RouteTable{
100 Resource: t.resource,
101 Rows: rows,
102 })
103 }
104
105 return &pb.TopRoutesResponse{
106 Response: &pb.TopRoutesResponse_Ok_{
107 Ok: &pb.TopRoutesResponse_Ok{
108 Routes: routeTables,
109 },
110 },
111 }, nil
112 }
113
114
115 func (s *grpcServer) topRoutesFor(ctx context.Context, req *pb.TopRoutesRequest, object runtime.Object) (*resourceTable, error) {
116
117
118
119 name, err := api.GetNameOf(object)
120 if err != nil {
121 return nil, err
122 }
123 clientNs := req.GetSelector().GetResource().GetNamespace()
124 typ := req.GetSelector().GetResource().GetType()
125 labelSelector, err := getTopLabelSelector(req)
126 if err != nil {
127 return nil, err
128 }
129 targetResource := &pb.Resource{
130 Name: name,
131 Namespace: req.GetSelector().GetResource().GetNamespace(),
132 Type: typ,
133 }
134 requestedResource := targetResource
135 if req.GetToResource() != nil {
136 requestedResource = req.GetToResource()
137 }
138
139 profiles := make(map[string]*sp.ServiceProfile)
140
141 if requestedResource.GetType() == k8s.Authority {
142
143 profiles, err = s.getProfilesForAuthority(requestedResource.GetName(), clientNs, labelSelector)
144 if err != nil {
145 return nil, err
146 }
147 } else {
148
149
150 objects, err := s.k8sAPI.GetObjects(requestedResource.Namespace, requestedResource.Type, requestedResource.Name, labelSelector)
151 if err != nil {
152 return nil, err
153 }
154
155 for _, obj := range objects {
156
157 services, err := s.k8sAPI.GetServicesFor(obj, false)
158 if err != nil {
159 return nil, err
160 }
161
162 for _, svc := range services {
163 p := s.k8sAPI.GetServiceProfileFor(svc, clientNs, s.clusterDomain)
164 profiles[svc.GetName()] = p
165 }
166 }
167 }
168
169 metrics, err := s.getRouteMetrics(ctx, req, profiles, targetResource)
170 if err != nil {
171 return nil, err
172 }
173
174 return &resourceTable{
175 resource: fmt.Sprintf("%s/%s", typ, name),
176 table: metrics,
177 }, nil
178 }
179
180 func topRoutesError(req *pb.TopRoutesRequest, message string) *pb.TopRoutesResponse {
181 return &pb.TopRoutesResponse{
182 Response: &pb.TopRoutesResponse_Error{
183 Error: &pb.ResourceError{
184 Resource: req.GetSelector().GetResource(),
185 Error: message,
186 },
187 },
188 }
189 }
190
191 func validateRequest(req *pb.TopRoutesRequest) *pb.TopRoutesResponse {
192 if req.GetSelector().GetResource() == nil {
193 return topRoutesError(req, "TopRoutes request missing Selector Resource")
194 }
195
196 if req.GetNone() == nil {
197
198 targetType := req.GetSelector().GetResource().GetType()
199 if targetType == k8s.Service || targetType == k8s.Authority {
200 return topRoutesError(req, fmt.Sprintf("The %s resource type is not supported with 'to' queries", targetType))
201 }
202 }
203 return nil
204 }
205
206 func (s *grpcServer) getProfilesForAuthority(authority string, clientNs string, labelSelector labels.Selector) (map[string]*sp.ServiceProfile, error) {
207 if authority == "" {
208
209 ps, err := s.k8sAPI.SP().Lister().ServiceProfiles(clientNs).List(labelSelector)
210 if err != nil {
211 return nil, err
212 }
213
214 if len(ps) == 0 {
215 return nil, errors.New("No ServiceProfiles found")
216 }
217
218 profiles := make(map[string]*sp.ServiceProfile)
219
220 for _, p := range ps {
221 profiles[p.Name] = p
222 }
223
224 return profiles, nil
225 }
226
227 p, err := s.k8sAPI.SP().Lister().ServiceProfiles(clientNs).Get(authority)
228 if err != nil {
229 return nil, err
230 }
231 return map[string]*sp.ServiceProfile{
232 p.Name: p,
233 }, nil
234 }
235
236 func (s *grpcServer) getRouteMetrics(ctx context.Context, req *pb.TopRoutesRequest, profiles map[string]*sp.ServiceProfile, resource *pb.Resource) (indexedTable, error) {
237 timeWindow := req.TimeWindow
238
239 dsts := make([]string, 0)
240 for _, p := range profiles {
241 dsts = append(dsts, p.GetName())
242 }
243
244 reqLabels := s.buildRouteLabels(req, dsts, resource)
245 groupBy := "rt_route"
246
247 queries := map[promType]string{
248 promRequests: fmt.Sprintf(routeReqQuery, reqLabels, timeWindow, groupBy),
249 }
250
251 if req.GetOutbound() != nil && req.GetNone() == nil {
252
253 queries[promActualRequests] = fmt.Sprintf(actualRouteReqQuery, reqLabels, timeWindow, groupBy)
254 }
255
256 quantileQueries := generateQuantileQueries(routeLatencyQuantileQuery, reqLabels, timeWindow, groupBy)
257 results, err := s.getPrometheusMetrics(ctx, queries, quantileQueries)
258 if err != nil {
259 return nil, err
260 }
261
262 table := make(indexedTable)
263 for service, profile := range profiles {
264 for _, route := range profile.Spec.Routes {
265 key := dstAndRoute{
266 dst: profile.GetName(),
267 route: route.Name,
268 }
269 table[key] = &pb.RouteTable_Row{
270 Authority: service,
271 Route: route.Name,
272 Stats: &pb.BasicStats{},
273 }
274 }
275 defaultKey := dstAndRoute{
276 dst: profile.GetName(),
277 route: "",
278 }
279 table[defaultKey] = &pb.RouteTable_Row{
280 Authority: service,
281 Route: DefaultRouteName,
282 Stats: &pb.BasicStats{},
283 }
284 }
285
286 processRouteMetrics(results, timeWindow, table)
287
288 return table, nil
289 }
290
291 func (s *grpcServer) buildRouteLabels(req *pb.TopRoutesRequest, dsts []string, resource *pb.Resource) string {
292
293 var labels model.LabelSet
294
295 switch req.Outbound.(type) {
296
297 case *pb.TopRoutesRequest_ToResource:
298 labels = labels.Merge(promQueryLabels(resource))
299 labels = labels.Merge(promDirectionLabels("outbound"))
300 return renderLabels(labels, dsts)
301
302 default:
303 labels = labels.Merge(promDirectionLabels("inbound"))
304 labels = labels.Merge(promQueryLabels(resource))
305 return renderLabels(labels, dsts)
306 }
307 }
308
309 func renderLabels(labels model.LabelSet, services []string) string {
310 pairs := make([]string, 0)
311 for k, v := range labels {
312 pairs = append(pairs, fmt.Sprintf("%s=%q", k, v))
313 }
314 if len(services) > 0 {
315 pairs = append(pairs, fmt.Sprintf(dstLabel, strings.Join(services, "|")))
316 }
317 sort.Strings(pairs)
318 return fmt.Sprintf("{%s}", strings.Join(pairs, ", "))
319 }
320
321 func processRouteMetrics(results []promResult, timeWindow string, table indexedTable) {
322 for _, result := range results {
323 for _, sample := range result.vec {
324 route := string(sample.Metric[model.LabelName("rt_route")])
325 dst := string(sample.Metric[model.LabelName("dst")])
326 dst = strings.Split(dst, ":")[0]
327
328 key := dstAndRoute{dst, route}
329
330 if table[key] == nil {
331 log.Warnf("Found stats for unknown route: %s:%s", dst, route)
332 continue
333 }
334
335 table[key].TimeWindow = timeWindow
336 value := extractSampleValue(sample)
337
338 switch result.prom {
339 case promRequests:
340 switch string(sample.Metric[model.LabelName("classification")]) {
341 case success:
342 table[key].Stats.SuccessCount += value
343 case failure:
344 table[key].Stats.FailureCount += value
345 }
346 case promActualRequests:
347 switch string(sample.Metric[model.LabelName("classification")]) {
348 case success:
349 table[key].Stats.ActualSuccessCount += value
350 case failure:
351 table[key].Stats.ActualFailureCount += value
352 }
353 case promLatencyP50:
354 table[key].Stats.LatencyMsP50 = value
355 case promLatencyP95:
356 table[key].Stats.LatencyMsP95 = value
357 case promLatencyP99:
358 table[key].Stats.LatencyMsP99 = value
359 }
360 }
361 }
362 }
363
364
365 func getTopLabelSelector(req *pb.TopRoutesRequest) (labels.Selector, error) {
366 labelSelector := labels.Everything()
367 if s := req.GetSelector().GetLabelSelector(); s != "" {
368 var err error
369 labelSelector, err = labels.Parse(s)
370 if err != nil {
371 return nil, fmt.Errorf("invalid label selector %q: %w", s, err)
372 }
373 }
374 return labelSelector, nil
375 }
376
View as plain text