1 package api
2
3 import (
4 "context"
5 "fmt"
6 "reflect"
7 "sort"
8 "strings"
9
10 "github.com/linkerd/linkerd2/pkg/k8s"
11 pb "github.com/linkerd/linkerd2/viz/metrics-api/gen/viz"
12 vizutil "github.com/linkerd/linkerd2/viz/pkg/util"
13 "github.com/prometheus/common/model"
14 log "github.com/sirupsen/logrus"
15 "google.golang.org/protobuf/proto"
16 corev1 "k8s.io/api/core/v1"
17 kerrors "k8s.io/apimachinery/pkg/api/errors"
18 "k8s.io/apimachinery/pkg/api/meta"
19 metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
20 "k8s.io/apimachinery/pkg/labels"
21 "k8s.io/apimachinery/pkg/runtime"
22 )
23
24 type resourceResult struct {
25 res *pb.StatTable
26 err error
27 }
28 type k8sStat struct {
29 object metav1.Object
30 podStats *podStats
31 }
32
33 type rKey struct {
34 Namespace string
35 Type string
36 Name string
37 }
38
39 type dstKey struct {
40 Namespace string
41 Service string
42 Dst string
43 Weight string
44 }
45
46 const (
47 success = "success"
48 failure = "failure"
49
50 reqQuery = "sum(increase(response_total%s[%s])) by (%s, classification, tls)"
51 latencyQuantileQuery = "histogram_quantile(%s, sum(irate(response_latency_ms_bucket%s[%s])) by (le, %s))"
52 tcpConnectionsQuery = "sum(tcp_open_connections%s) by (%s)"
53 tcpReadBytesQuery = "sum(increase(tcp_read_bytes_total%s[%s])) by (%s)"
54 tcpWriteBytesQuery = "sum(increase(tcp_write_bytes_total%s[%s])) by (%s)"
55
56 regexAny = ".+"
57 )
58
59 type podStats struct {
60 status string
61 inMesh uint64
62 total uint64
63 failed uint64
64 errors map[string]*pb.PodErrors
65 }
66
67 func (s *grpcServer) StatSummary(ctx context.Context, req *pb.StatSummaryRequest) (*pb.StatSummaryResponse, error) {
68
69
70 if req.GetSelector().GetResource() == nil {
71 return statSummaryError(req, "StatSummary request missing Selector Resource"), nil
72 }
73
74
75 if req.GetFromResource() != nil && req.GetFromResource().GetType() == k8s.Service {
76 return statSummaryError(req, "service is not supported as a target on 'from' queries, or as a target with 'to' queries"), nil
77 }
78
79
80 if req.GetFromResource() != nil {
81 if isPolicyResource(req.GetSelector().GetResource()) ||
82 isPolicyResource(req.GetFromResource()) {
83 return statSummaryError(req, "'from' queries are not supported with policy resources, as they have inbound metrics only"), nil
84 }
85 }
86
87 if req.GetToResource() != nil {
88 if isPolicyResource(req.GetSelector().GetResource()) ||
89 isPolicyResource(req.GetToResource()) {
90 return statSummaryError(req, "'to' queries are not supported with policy resources, as they have inbound metrics only"), nil
91 }
92 }
93
94 switch ob := req.Outbound.(type) {
95 case *pb.StatSummaryRequest_ToResource:
96 if ob.ToResource.Type == k8s.All {
97 return statSummaryError(req, "resource type 'all' is not supported as a filter"), nil
98 }
99 case *pb.StatSummaryRequest_FromResource:
100 if ob.FromResource.Type == k8s.All {
101 return statSummaryError(req, "resource type 'all' is not supported as a filter"), nil
102 }
103 }
104
105 err := s.validateTimeWindow(ctx, req.TimeWindow)
106 if err != nil {
107 return statSummaryError(req, fmt.Sprintf("invalid time window: %s", err)), nil
108 }
109
110 statTables := make([]*pb.StatTable, 0)
111
112 var resourcesToQuery []string
113 if req.Selector.Resource.Type == k8s.All {
114 resourcesToQuery = k8s.StatAllResourceTypes
115 } else {
116 resourcesToQuery = []string{req.Selector.Resource.Type}
117 }
118
119
120 resultChan := make(chan resourceResult)
121
122 for _, resource := range resourcesToQuery {
123 statReq := proto.Clone(req).(*pb.StatSummaryRequest)
124 statReq.Selector.Resource.Type = resource
125
126 go func() {
127 if isNonK8sResourceQuery(statReq.GetSelector().GetResource().GetType()) {
128 resultChan <- s.nonK8sResourceQuery(ctx, statReq)
129 } else if statReq.GetSelector().GetResource().GetType() == k8s.Service {
130 resultChan <- s.serviceResourceQuery(ctx, statReq)
131 } else if isPolicyResource(statReq.GetSelector().GetResource()) {
132 resultChan <- s.policyResourceQuery(ctx, statReq)
133 } else {
134 resultChan <- s.k8sResourceQuery(ctx, statReq)
135 }
136 }()
137 }
138
139 for i := 0; i < len(resourcesToQuery); i++ {
140 result := <-resultChan
141 if result.err != nil {
142 return nil, vizutil.GRPCError(result.err)
143 }
144 statTables = append(statTables, result.res)
145 }
146
147 rsp := pb.StatSummaryResponse{
148 Response: &pb.StatSummaryResponse_Ok_{
149 Ok: &pb.StatSummaryResponse_Ok{
150 StatTables: statTables,
151 },
152 },
153 }
154
155 log.Debugf("Sent response as %+v\n", statTables)
156 return &rsp, nil
157 }
158
159 func statSummaryError(req *pb.StatSummaryRequest, message string) *pb.StatSummaryResponse {
160 return &pb.StatSummaryResponse{
161 Response: &pb.StatSummaryResponse_Error{
162 Error: &pb.ResourceError{
163 Resource: req.GetSelector().GetResource(),
164 Error: message,
165 },
166 },
167 }
168 }
169
170 func (s *grpcServer) getKubernetesObjectStats(req *pb.StatSummaryRequest) (map[rKey]k8sStat, error) {
171 requestedResource := req.GetSelector().GetResource()
172
173 labelSelector, err := getLabelSelector(req)
174 if err != nil {
175 return nil, err
176 }
177
178 objects, err := s.k8sAPI.GetObjects(requestedResource.Namespace, requestedResource.Type, requestedResource.Name, labelSelector)
179 if err != nil {
180 return nil, err
181 }
182
183 objectMap := map[rKey]k8sStat{}
184
185 for _, object := range objects {
186 metaObj, err := meta.Accessor(object)
187 if err != nil {
188 return nil, err
189 }
190
191 key := rKey{
192 Name: metaObj.GetName(),
193 Namespace: metaObj.GetNamespace(),
194 Type: requestedResource.GetType(),
195 }
196
197 podStats, err := s.getPodStats(object)
198 if err != nil {
199 return nil, err
200 }
201
202 objectMap[key] = k8sStat{
203 object: metaObj,
204 podStats: podStats,
205 }
206 }
207 return objectMap, nil
208 }
209
210 func (s *grpcServer) k8sResourceQuery(ctx context.Context, req *pb.StatSummaryRequest) resourceResult {
211
212 k8sObjects, err := s.getKubernetesObjectStats(req)
213 if err != nil {
214 return resourceResult{res: nil, err: err}
215 }
216
217 var requestMetrics map[rKey]*pb.BasicStats
218 var tcpMetrics map[rKey]*pb.TcpStats
219 if !req.SkipStats {
220 requestMetrics, tcpMetrics, err = s.getStatMetrics(ctx, req, req.TimeWindow)
221 if err != nil {
222 return resourceResult{res: nil, err: err}
223 }
224 }
225
226 rows := make([]*pb.StatTable_PodGroup_Row, 0)
227 keys := getResultKeys(req, k8sObjects, requestMetrics)
228
229 for _, key := range keys {
230 objInfo, ok := k8sObjects[key]
231 if !ok {
232 continue
233 }
234
235 var tcpStats *pb.TcpStats
236 if req.TcpStats {
237 tcpStats = tcpMetrics[key]
238 }
239
240 var basicStats *pb.BasicStats
241 if !reflect.DeepEqual(requestMetrics[key], &pb.BasicStats{}) {
242 basicStats = requestMetrics[key]
243 }
244
245 k8sResource := objInfo.object
246 row := pb.StatTable_PodGroup_Row{
247 Resource: &pb.Resource{
248 Name: k8sResource.GetName(),
249 Namespace: k8sResource.GetNamespace(),
250 Type: req.GetSelector().GetResource().GetType(),
251 },
252 TimeWindow: req.TimeWindow,
253 Stats: basicStats,
254 TcpStats: tcpStats,
255 }
256
257 podStat := objInfo.podStats
258 row.Status = podStat.status
259 row.MeshedPodCount = podStat.inMesh
260 row.RunningPodCount = podStat.total
261 row.FailedPodCount = podStat.failed
262 row.ErrorsByPod = podStat.errors
263
264 rows = append(rows, &row)
265 }
266
267 rsp := pb.StatTable{
268 Table: &pb.StatTable_PodGroup_{
269 PodGroup: &pb.StatTable_PodGroup{
270 Rows: rows,
271 },
272 },
273 }
274
275 return resourceResult{res: &rsp, err: nil}
276 }
277
278 func (s *grpcServer) serviceResourceQuery(ctx context.Context, req *pb.StatSummaryRequest) resourceResult {
279
280 rows := make([]*pb.StatTable_PodGroup_Row, 0)
281 dstBasicStats := make(map[dstKey]*pb.BasicStats)
282 dstTCPStats := make(map[dstKey]*pb.TcpStats)
283
284 if !req.SkipStats {
285 var err error
286 dstBasicStats, dstTCPStats, err = s.getServiceMetrics(ctx, req, req.TimeWindow)
287 if err != nil {
288 return resourceResult{res: nil, err: err}
289 }
290 }
291
292 weights := make(map[dstKey]string)
293 for k := range dstBasicStats {
294 weights[k] = ""
295 }
296
297 name := req.GetSelector().GetResource().GetName()
298 namespace := req.GetSelector().GetResource().GetNamespace()
299
300
301 spName := fmt.Sprintf("%s.%s.svc.%s", name, namespace, s.clusterDomain)
302 sp, err := s.k8sAPI.SP().Lister().ServiceProfiles(namespace).Get(spName)
303 if err == nil {
304 for _, weightedDst := range sp.Spec.DstOverrides {
305 weights[dstKey{
306 Namespace: namespace,
307 Service: name,
308 Dst: dstFromAuthority(weightedDst.Authority),
309 }] = weightedDst.Weight.String()
310 }
311 } else if !kerrors.IsNotFound(err) {
312 log.Errorf("Failed to get weights from ServiceProfile %q: %v", spName, err)
313 }
314
315 for k, weight := range weights {
316 row := pb.StatTable_PodGroup_Row{
317 Resource: &pb.Resource{
318 Name: k.Service,
319 Namespace: k.Namespace,
320 Type: req.GetSelector().GetResource().GetType(),
321 },
322 TimeWindow: req.TimeWindow,
323 Stats: dstBasicStats[k],
324 TcpStats: dstTCPStats[k],
325 }
326
327
328 if weight != "" {
329 row.TsStats = &pb.TrafficSplitStats{
330 Apex: k.Service,
331 Leaf: k.Dst,
332 Weight: weight,
333 }
334 }
335 rows = append(rows, &row)
336 }
337
338
339 rows = sortTrafficSplitRows(rows)
340
341 rsp := pb.StatTable{
342 Table: &pb.StatTable_PodGroup_{
343 PodGroup: &pb.StatTable_PodGroup{
344 Rows: rows,
345 },
346 },
347 }
348
349 return resourceResult{res: &rsp, err: nil}
350 }
351
352 func sortTrafficSplitRows(rows []*pb.StatTable_PodGroup_Row) []*pb.StatTable_PodGroup_Row {
353 sort.Slice(rows, func(i, j int) bool {
354 if rows[i].TsStats != nil && rows[j].TsStats != nil {
355 key1 := rows[i].TsStats.Apex + rows[i].TsStats.Leaf
356 key2 := rows[j].TsStats.Apex + rows[j].TsStats.Leaf
357 return key1 < key2
358 }
359 return false
360 })
361 return rows
362 }
363
364 func (s *grpcServer) nonK8sResourceQuery(ctx context.Context, req *pb.StatSummaryRequest) resourceResult {
365 var requestMetrics map[rKey]*pb.BasicStats
366 if !req.SkipStats {
367 var err error
368 requestMetrics, _, err = s.getStatMetrics(ctx, req, req.TimeWindow)
369 if err != nil {
370 return resourceResult{res: nil, err: err}
371 }
372 }
373 rows := make([]*pb.StatTable_PodGroup_Row, 0)
374
375 for rkey, metrics := range requestMetrics {
376 rkey.Type = req.GetSelector().GetResource().GetType()
377
378 row := pb.StatTable_PodGroup_Row{
379 Resource: &pb.Resource{
380 Type: rkey.Type,
381 Namespace: rkey.Namespace,
382 Name: rkey.Name,
383 },
384 TimeWindow: req.TimeWindow,
385 Stats: metrics,
386 }
387 rows = append(rows, &row)
388 }
389
390 rsp := pb.StatTable{
391 Table: &pb.StatTable_PodGroup_{
392 PodGroup: &pb.StatTable_PodGroup{
393 Rows: rows,
394 },
395 },
396 }
397 return resourceResult{res: &rsp, err: nil}
398 }
399
400 func isNonK8sResourceQuery(resourceType string) bool {
401 return resourceType == k8s.Authority
402 }
403
404
405 func getResultKeys(
406 req *pb.StatSummaryRequest,
407 k8sObjects map[rKey]k8sStat,
408 metricResults map[rKey]*pb.BasicStats,
409 ) []rKey {
410 var keys []rKey
411
412 if req.GetOutbound() == nil || req.GetNone() != nil {
413
414 for key := range k8sObjects {
415 keys = append(keys, key)
416 }
417 } else {
418
419
420 for key := range metricResults {
421 keys = append(keys, key)
422 }
423 }
424 return keys
425 }
426
427 func buildRequestLabels(req *pb.StatSummaryRequest) (labels model.LabelSet, labelNames model.LabelNames) {
428
429
430
431 switch out := req.Outbound.(type) {
432 case *pb.StatSummaryRequest_ToResource:
433 labelNames = promGroupByLabelNames(req.Selector.Resource)
434
435 labels = labels.Merge(promDstQueryLabels(out.ToResource))
436 labels = labels.Merge(promQueryLabels(req.Selector.Resource))
437 labels = labels.Merge(promDirectionLabels("outbound"))
438
439 case *pb.StatSummaryRequest_FromResource:
440 labelNames = promDstGroupByLabelNames(req.Selector.Resource)
441
442 labels = labels.Merge(promQueryLabels(out.FromResource))
443 labels = labels.Merge(promDstQueryLabels(req.Selector.Resource))
444 labels = labels.Merge(promDirectionLabels("outbound"))
445
446 default:
447 labelNames = promGroupByLabelNames(req.Selector.Resource)
448
449 labels = labels.Merge(promQueryLabels(req.Selector.Resource))
450 labels = labels.Merge(promDirectionLabels("inbound"))
451 }
452
453 return
454 }
455
456 func buildServiceRequestLabels(req *pb.StatSummaryRequest) (labels model.LabelSet, labelNames model.LabelNames) {
457
458
459
460 labels = model.LabelSet{
461 "direction": model.LabelValue("outbound"),
462 }
463
464 switch out := req.Outbound.(type) {
465 case *pb.StatSummaryRequest_ToResource:
466
467
468
469 labels = labels.Merge(promDstQueryLabels(out.ToResource))
470
471 case *pb.StatSummaryRequest_FromResource:
472
473 labels = labels.Merge(promQueryLabels(out.FromResource))
474
475 default:
476
477 }
478
479 groupBy := model.LabelNames{model.LabelName("dst_namespace"), model.LabelName("dst_service")}
480
481 return labels, groupBy
482 }
483
484 func buildTCPStatsRequestLabels(req *pb.StatSummaryRequest, reqLabels model.LabelSet) string {
485 switch req.Outbound.(type) {
486 case *pb.StatSummaryRequest_ToResource, *pb.StatSummaryRequest_FromResource:
487
488 reqLabels = reqLabels.Merge(promPeerLabel("dst"))
489
490 default:
491
492 reqLabels = reqLabels.Merge(promPeerLabel("src"))
493 }
494 return reqLabels.String()
495 }
496
497 func (s *grpcServer) getStatMetrics(ctx context.Context, req *pb.StatSummaryRequest, timeWindow string) (map[rKey]*pb.BasicStats, map[rKey]*pb.TcpStats, error) {
498 reqLabels, groupBy := buildRequestLabels(req)
499 promQueries := map[promType]string{
500 promRequests: fmt.Sprintf(reqQuery, reqLabels.String(), timeWindow, groupBy.String()),
501 }
502
503 if req.TcpStats {
504 promQueries[promTCPConnections] = fmt.Sprintf(tcpConnectionsQuery, reqLabels.String(), groupBy.String())
505
506 tcpLabels := buildTCPStatsRequestLabels(req, reqLabels)
507 promQueries[promTCPReadBytes] = fmt.Sprintf(tcpReadBytesQuery, tcpLabels, timeWindow, groupBy.String())
508 promQueries[promTCPWriteBytes] = fmt.Sprintf(tcpWriteBytesQuery, tcpLabels, timeWindow, groupBy.String())
509 }
510
511 quantileQueries := generateQuantileQueries(latencyQuantileQuery, reqLabels.String(), timeWindow, groupBy.String())
512 results, err := s.getPrometheusMetrics(ctx, promQueries, quantileQueries)
513
514 if err != nil {
515 return nil, nil, err
516 }
517
518 basicStats, tcpStats, _ := processPrometheusMetrics(req, results, groupBy)
519 return basicStats, tcpStats, nil
520 }
521
522 func (s *grpcServer) getServiceMetrics(ctx context.Context, req *pb.StatSummaryRequest, timeWindow string) (map[dstKey]*pb.BasicStats, map[dstKey]*pb.TcpStats, error) {
523 dstBasicStats := make(map[dstKey]*pb.BasicStats)
524 dstTCPStats := make(map[dstKey]*pb.TcpStats)
525 labels, groupBy := buildServiceRequestLabels(req)
526
527 service := req.GetSelector().GetResource().GetName()
528 namespace := req.GetSelector().GetResource().GetNamespace()
529
530 if service == "" {
531 service = regexAny
532 }
533 authority := fmt.Sprintf("%s.%s.svc.%s", service, namespace, s.clusterDomain)
534
535 reqLabels := generateLabelStringWithRegex(labels, string(authorityLabel), authority)
536
537 promQueries := map[promType]string{
538 promRequests: fmt.Sprintf(reqQuery, reqLabels, timeWindow, groupBy.String()),
539 }
540
541 if req.TcpStats {
542
543 tcpLabels := labels.Merge(promPeerLabel("dst"))
544 tcpLabelString := generateLabelStringWithRegex(tcpLabels, string(authorityLabel), authority)
545 promQueries[promTCPConnections] = fmt.Sprintf(tcpConnectionsQuery, tcpLabelString, groupBy.String())
546 promQueries[promTCPReadBytes] = fmt.Sprintf(tcpReadBytesQuery, tcpLabelString, timeWindow, groupBy.String())
547 promQueries[promTCPWriteBytes] = fmt.Sprintf(tcpWriteBytesQuery, tcpLabelString, timeWindow, groupBy.String())
548 }
549
550 quantileQueries := generateQuantileQueries(latencyQuantileQuery, reqLabels, timeWindow, groupBy.String())
551 results, err := s.getPrometheusMetrics(ctx, promQueries, quantileQueries)
552 if err != nil {
553 return nil, nil, err
554 }
555
556 basicStats, tcpStats, _ := processPrometheusMetrics(req, results, groupBy)
557
558 for rKey, basicStatsVal := range basicStats {
559
560
561 svcName := service
562 if svcName == regexAny {
563 svcName = rKey.Name
564 }
565
566 dstBasicStats[dstKey{
567 Namespace: rKey.Namespace,
568 Service: svcName,
569 Dst: rKey.Name,
570 }] = basicStatsVal
571 }
572
573 for rKey, tcpStatsVal := range tcpStats {
574
575
576 svcName := service
577 if svcName == regexAny {
578 svcName = rKey.Name
579 }
580
581 dstTCPStats[dstKey{
582 Namespace: rKey.Namespace,
583 Service: svcName,
584 Dst: rKey.Name,
585 }] = tcpStatsVal
586 }
587
588 return dstBasicStats, dstTCPStats, nil
589 }
590
591 func processPrometheusMetrics(req *pb.StatSummaryRequest, results []promResult, groupBy model.LabelNames) (map[rKey]*pb.BasicStats, map[rKey]*pb.TcpStats, map[rKey]*pb.ServerStats) {
592 basicStats := make(map[rKey]*pb.BasicStats)
593 tcpStats := make(map[rKey]*pb.TcpStats)
594 authzStats := make(map[rKey]*pb.ServerStats)
595
596 for _, result := range results {
597 for _, sample := range result.vec {
598 resource := metricToKey(req, sample.Metric, groupBy)
599
600 addBasicStats := func() {
601 if basicStats[resource] == nil {
602 basicStats[resource] = &pb.BasicStats{}
603 }
604 }
605 addTCPStats := func() {
606 if tcpStats[resource] == nil {
607 tcpStats[resource] = &pb.TcpStats{}
608 }
609 }
610
611 if authzStats[resource] == nil {
612 srv := pb.Resource{
613 Type: string(sample.Metric[serverKindLabel]),
614 Name: string(sample.Metric[serverNameLabel]),
615 }
616 route := pb.Resource{
617 Type: string(sample.Metric[routeKindLabel]),
618 Name: string(sample.Metric[routeNameLabel]),
619 }
620 authz := pb.Resource{
621 Type: string(sample.Metric[authorizationKindLabel]),
622 Name: string(sample.Metric[authorizationNameLabel]),
623 }
624 authzStats[resource] = &pb.ServerStats{
625 Srv: &srv,
626 Route: &route,
627 Authz: &authz,
628 }
629 }
630
631 value := extractSampleValue(sample)
632
633 switch result.prom {
634 case promRequests:
635 addBasicStats()
636 switch string(sample.Metric[model.LabelName("classification")]) {
637 case success:
638 basicStats[resource].SuccessCount += value
639 case failure:
640 basicStats[resource].FailureCount += value
641 }
642 case promLatencyP50:
643 addBasicStats()
644 basicStats[resource].LatencyMsP50 = value
645 case promLatencyP95:
646 addBasicStats()
647 basicStats[resource].LatencyMsP95 = value
648 case promLatencyP99:
649 addBasicStats()
650 basicStats[resource].LatencyMsP99 = value
651 case promTCPConnections:
652 addTCPStats()
653 tcpStats[resource].OpenConnections = value
654 case promTCPReadBytes:
655 addTCPStats()
656 tcpStats[resource].ReadBytesTotal = value
657 case promTCPWriteBytes:
658 addTCPStats()
659 tcpStats[resource].WriteBytesTotal = value
660 case promAllowedRequests:
661 authzStats[resource].AllowedCount = value
662 case promDeniedRequests:
663 authzStats[resource].DeniedCount = value
664 }
665 }
666 }
667
668 return basicStats, tcpStats, authzStats
669 }
670
671 func metricToKey(req *pb.StatSummaryRequest, metric model.Metric, groupBy model.LabelNames) rKey {
672
673
674
675 key := rKey{
676 Type: req.GetSelector().GetResource().GetType(),
677 Name: string(metric[groupBy[len(groupBy)-1]]),
678 }
679
680 if len(groupBy) >= 2 {
681 key.Namespace = string(metric[groupBy[len(groupBy)-2]])
682 }
683
684 return key
685 }
686
687 func (s *grpcServer) getPodStats(obj runtime.Object) (*podStats, error) {
688 pods, err := s.k8sAPI.GetPodsFor(obj, true)
689 if err != nil {
690 return nil, err
691 }
692 podErrors := make(map[string]*pb.PodErrors)
693 meshCount := &podStats{}
694
695 if pod, ok := obj.(*corev1.Pod); ok {
696 meshCount.status = k8s.GetPodStatus(*pod)
697 }
698
699 for _, pod := range pods {
700 if pod.Status.Phase == corev1.PodFailed {
701 meshCount.failed++
702 } else {
703 meshCount.total++
704 if k8s.IsMeshed(pod, s.controllerNamespace) {
705 meshCount.inMesh++
706 }
707 }
708
709 errors := checkContainerErrors(pod.Status.ContainerStatuses)
710 errors = append(errors, checkContainerErrors(pod.Status.InitContainerStatuses)...)
711
712 if len(errors) > 0 {
713 podErrors[pod.Name] = &pb.PodErrors{Errors: errors}
714 }
715 }
716 meshCount.errors = podErrors
717 return meshCount, nil
718 }
719
720 func toPodError(container, image, reason, message string) *pb.PodErrors_PodError {
721 return &pb.PodErrors_PodError{
722 Error: &pb.PodErrors_PodError_Container{
723 Container: &pb.PodErrors_PodError_ContainerError{
724 Message: message,
725 Container: container,
726 Image: image,
727 Reason: reason,
728 },
729 },
730 }
731 }
732
733 func checkContainerErrors(containerStatuses []corev1.ContainerStatus) []*pb.PodErrors_PodError {
734 errors := []*pb.PodErrors_PodError{}
735 for _, st := range containerStatuses {
736 if !st.Ready {
737 if st.State.Waiting != nil {
738 errors = append(errors, toPodError(st.Name, st.Image, st.State.Waiting.Reason, st.State.Waiting.Message))
739 }
740
741 if st.State.Terminated != nil && (st.State.Terminated.ExitCode != 0 || st.State.Terminated.Signal != 0) {
742 errors = append(errors, toPodError(st.Name, st.Image, st.State.Terminated.Reason, st.State.Terminated.Message))
743 }
744
745 if st.LastTerminationState.Waiting != nil {
746 errors = append(errors, toPodError(st.Name, st.Image, st.LastTerminationState.Waiting.Reason, st.LastTerminationState.Waiting.Message))
747 }
748
749 if st.LastTerminationState.Terminated != nil {
750 errors = append(errors, toPodError(st.Name, st.Image, st.LastTerminationState.Terminated.Reason, st.LastTerminationState.Terminated.Message))
751 }
752 }
753 }
754 return errors
755 }
756
757 func getLabelSelector(req *pb.StatSummaryRequest) (labels.Selector, error) {
758 labelSelector := labels.Everything()
759 if s := req.GetSelector().GetLabelSelector(); s != "" {
760 var err error
761 labelSelector, err = labels.Parse(s)
762 if err != nil {
763 return nil, fmt.Errorf("invalid label selector %q: %w", s, err)
764 }
765 }
766 return labelSelector, nil
767 }
768
769 func dstFromAuthority(authority string) string {
770
771 labels := strings.Split(authority, ".")
772 if len(labels) >= 3 && labels[2] == "svc" {
773
774 return labels[0]
775 }
776 return authority
777 }
778
View as plain text