1 package api
2
3 import (
4 "context"
5 "fmt"
6 "sort"
7
8 "github.com/linkerd/linkerd2/pkg/k8s"
9 pb "github.com/linkerd/linkerd2/viz/metrics-api/gen/viz"
10 "github.com/prometheus/common/model"
11 log "github.com/sirupsen/logrus"
12 v1 "k8s.io/api/core/v1"
13 )
14
15 const (
16 edgesQuery = "sum(%s%s) by (%s, dst_%s, pod, server_id, namespace, dst_namespace, no_tls_reason)"
17 )
18
19 var formatMsg = map[string]string{
20 "disabled": "Disabled",
21 "loopback": "Loopback",
22 "no_authority_in_http_request": "No Authority In HTTP Request",
23 "not_http": "Not HTTP",
24 "not_provided_by_remote": "Not Provided By Remote",
25 "not_provided_by_service_discovery": "Not Provided By Service Discovery",
26 }
27
28 type edgeKey struct {
29 src string
30 srcNs string
31 dst string
32 dstNs string
33 }
34
35 func (s *grpcServer) Edges(ctx context.Context, req *pb.EdgesRequest) (*pb.EdgesResponse, error) {
36 log.Debugf("Edges request: %+v", req)
37 if req.GetSelector().GetResource() == nil {
38 return edgesError(req, "Edges request missing Selector Resource"), nil
39 }
40
41 resourceType := promResourceType(req.GetSelector().GetResource())
42 dstResourceType := "dst_" + resourceType
43 labelsOutbound := promDirectionLabels("outbound")
44 labelsOutboundStr := generateLabelStringWithExclusion(labelsOutbound, string(resourceType), string(dstResourceType))
45 query := fmt.Sprintf(edgesQuery, "tcp_open_connections", labelsOutboundStr, resourceType, resourceType)
46
47 promResult, err := s.queryProm(ctx, query)
48 if err != nil {
49 return edgesError(req, err.Error()), nil
50 }
51
52 edgeMap := make(map[edgeKey]*pb.Edge)
53
54 for _, sample := range promResult {
55 if sample.Value == 0.0 {
56 continue
57 }
58 key := edgeKey{
59 src: string(sample.Metric[resourceType]),
60 srcNs: string(sample.Metric[model.LabelName("namespace")]),
61 dst: string(sample.Metric[dstResourceType]),
62 dstNs: string(sample.Metric[model.LabelName("dst_namespace")]),
63 }
64 requestedNs := req.GetSelector().GetResource().GetNamespace()
65 if requestedNs != v1.NamespaceAll {
66 if requestedNs != key.srcNs && requestedNs != key.dstNs {
67 continue
68 }
69 }
70 if _, ok := edgeMap[key]; !ok {
71
72 clientID, err := s.getPodIdentity(string(sample.Metric[model.LabelName("pod")]), key.srcNs)
73 if err != nil {
74 log.Warnf("failed to get pod identity for %s: %v", sample.Metric[model.LabelName("pod")], err)
75 continue
76 }
77
78 edgeMap[key] = &pb.Edge{
79 Src: &pb.Resource{
80 Namespace: key.srcNs,
81 Name: key.src,
82 Type: string(resourceType),
83 },
84 Dst: &pb.Resource{
85 Namespace: key.dstNs,
86 Name: key.dst,
87 Type: string(resourceType),
88 },
89 ServerId: string(sample.Metric[model.LabelName("server_id")]),
90 ClientId: clientID,
91 NoIdentityMsg: formatMsg[string(sample.Metric[model.LabelName("no_tls_reason")])],
92 }
93 }
94 }
95
96 edges := []*pb.Edge{}
97 for _, edge := range edgeMap {
98 edges = append(edges, edge)
99 }
100 edges = sortEdgeRows(edges)
101
102 return &pb.EdgesResponse{
103 Response: &pb.EdgesResponse_Ok_{
104 Ok: &pb.EdgesResponse_Ok{
105 Edges: edges,
106 },
107 },
108 }, nil
109 }
110
111 func edgesError(req *pb.EdgesRequest, message string) *pb.EdgesResponse {
112 return &pb.EdgesResponse{
113 Response: &pb.EdgesResponse_Error{
114 Error: &pb.ResourceError{
115 Resource: req.GetSelector().GetResource(),
116 Error: message,
117 },
118 },
119 }
120 }
121
122 func (s *grpcServer) getPodIdentity(pod string, namespace string) (string, error) {
123 po, err := s.k8sAPI.Pod().Lister().Pods(namespace).Get(pod)
124 if err != nil {
125 return "", err
126 }
127 return k8s.PodIdentity(po)
128 }
129
130 func sortEdgeRows(rows []*pb.Edge) []*pb.Edge {
131 sort.Slice(rows, func(i, j int) bool {
132 keyI := rows[i].GetSrc().GetNamespace() + rows[i].GetDst().GetNamespace() + rows[i].GetSrc().GetName() + rows[i].GetDst().GetName()
133 keyJ := rows[j].GetSrc().GetNamespace() + rows[j].GetDst().GetNamespace() + rows[j].GetSrc().GetName() + rows[j].GetDst().GetName()
134 return keyI < keyJ
135 })
136 return rows
137 }
138
View as plain text