1 package api
2
3 import (
4 "context"
5 "fmt"
6 "strings"
7
8 "github.com/linkerd/linkerd2/pkg/k8s"
9 pb "github.com/linkerd/linkerd2/viz/metrics-api/gen/viz"
10 "github.com/prometheus/common/model"
11 metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
12 "k8s.io/apimachinery/pkg/apis/meta/v1/unstructured"
13 "k8s.io/apimachinery/pkg/runtime/schema"
14 )
15
const (
	// httpAuthzDenyQuery is the Prometheus query template summing the increase
	// of inbound_http_authz_deny_total. Its three %s verbs are filled, in
	// order, with the label selector, the range window and the group-by labels.
	httpAuthzDenyQuery = "sum(increase(inbound_http_authz_deny_total%s[%s])) by (%s)"
	// httpAuthzAllowQuery is the counterpart of httpAuthzDenyQuery for
	// inbound_http_authz_allow_total; it takes the same three arguments.
	httpAuthzAllowQuery = "sum(increase(inbound_http_authz_allow_total%s[%s])) by (%s)"
)
20
21 func isPolicyResource(resource *pb.Resource) bool {
22 if resource != nil {
23 if resource.GetType() == k8s.Server ||
24 resource.GetType() == k8s.ServerAuthorization ||
25 resource.GetType() == k8s.AuthorizationPolicy ||
26 resource.GetType() == k8s.HTTPRoute {
27 return true
28 }
29 }
30 return false
31 }
32
33 func (s *grpcServer) policyResourceQuery(ctx context.Context, req *pb.StatSummaryRequest) resourceResult {
34
35 policyResources, err := s.getPolicyResourceKeys(req)
36 if err != nil {
37 return resourceResult{res: nil, err: err}
38 }
39
40 var requestMetrics map[rKey]*pb.BasicStats
41 var tcpMetrics map[rKey]*pb.TcpStats
42 var authzMetrics map[rKey]*pb.ServerStats
43 if !req.SkipStats {
44 requestMetrics, tcpMetrics, authzMetrics, err = s.getPolicyMetrics(ctx, req, req.TimeWindow)
45 if err != nil {
46 return resourceResult{res: nil, err: err}
47 }
48 }
49
50 rows := make([]*pb.StatTable_PodGroup_Row, 0)
51 for _, key := range policyResources {
52 row := pb.StatTable_PodGroup_Row{
53 Resource: &pb.Resource{
54 Name: key.Name,
55 Namespace: key.Namespace,
56 Type: req.GetSelector().GetResource().GetType(),
57 },
58 TimeWindow: req.TimeWindow,
59 Stats: requestMetrics[key],
60 TcpStats: tcpMetrics[key],
61 SrvStats: authzMetrics[key],
62 }
63
64 rows = append(rows, &row)
65 }
66
67 rsp := pb.StatTable{
68 Table: &pb.StatTable_PodGroup_{
69 PodGroup: &pb.StatTable_PodGroup{
70 Rows: rows,
71 },
72 },
73 }
74 return resourceResult{res: &rsp, err: nil}
75 }
76
77 func (s *grpcServer) getPolicyResourceKeys(req *pb.StatSummaryRequest) ([]rKey, error) {
78 var err error
79 var unstructuredResources *unstructured.UnstructuredList
80
81
82 var gvr schema.GroupVersionResource
83 if req.GetSelector().Resource.GetType() == k8s.Server {
84 gvr = k8s.ServerGVR
85 } else if req.GetSelector().Resource.GetType() == k8s.ServerAuthorization {
86 gvr = k8s.SazGVR
87 } else if req.GetSelector().Resource.GetType() == k8s.AuthorizationPolicy {
88 gvr = k8s.AuthorizationPolicyGVR
89 } else if req.GetSelector().Resource.GetType() == k8s.HTTPRoute {
90 gvr = k8s.HTTPRouteGVR
91 }
92
93 res := req.GetSelector().GetResource()
94 labelSelector, err := getLabelSelector(req)
95 if err != nil {
96 return nil, err
97 }
98
99 if res.GetNamespace() == "" {
100 unstructuredResources, err = s.k8sAPI.DynamicClient.Resource(gvr).Namespace("").
101 List(context.TODO(), metav1.ListOptions{LabelSelector: labelSelector.String()})
102 } else if res.GetName() == "" {
103 unstructuredResources, err = s.k8sAPI.DynamicClient.Resource(gvr).Namespace(res.GetNamespace()).
104 List(context.TODO(), metav1.ListOptions{LabelSelector: labelSelector.String()})
105 } else {
106 var ts *unstructured.Unstructured
107 ts, err = s.k8sAPI.DynamicClient.Resource(gvr).Namespace(res.GetNamespace()).
108 Get(context.TODO(), res.GetName(), metav1.GetOptions{})
109 if err != nil {
110 return nil, err
111 }
112 unstructuredResources = &unstructured.UnstructuredList{Items: []unstructured.Unstructured{*ts}}
113 }
114 if err != nil {
115 return nil, err
116 }
117
118 var resourceKeys []rKey
119 for _, resource := range unstructuredResources.Items {
120
121 resourceKeys = append(resourceKeys, rKey{
122 Namespace: resource.GetNamespace(),
123
124 Type: strings.ToLower(resource.GetKind()[0:len(resource.GetKind())]),
125 Name: resource.GetName(),
126 })
127 }
128 return resourceKeys, nil
129 }
130
131 func (s *grpcServer) getPolicyMetrics(
132 ctx context.Context,
133 req *pb.StatSummaryRequest,
134 timeWindow string,
135 ) (map[rKey]*pb.BasicStats, map[rKey]*pb.TcpStats, map[rKey]*pb.ServerStats, error) {
136 labels, groupBy := buildServerRequestLabels(req)
137
138 reqLabels := labels.Merge(model.LabelSet{
139 "direction": model.LabelValue("inbound"),
140 })
141
142 promQueries := make(map[promType]string)
143 if req.GetSelector().GetResource().GetType() == k8s.Server {
144
145 if req.TcpStats {
146
147 tcpLabels := reqLabels.Merge(promPeerLabel("src"))
148 promQueries[promTCPConnections] = fmt.Sprintf(tcpConnectionsQuery, tcpLabels.String(), groupBy.String())
149 promQueries[promTCPReadBytes] = fmt.Sprintf(tcpReadBytesQuery, tcpLabels.String(), timeWindow, groupBy.String())
150 promQueries[promTCPWriteBytes] = fmt.Sprintf(tcpWriteBytesQuery, tcpLabels.String(), timeWindow, groupBy.String())
151 }
152 }
153
154 promQueries[promRequests] = fmt.Sprintf(reqQuery, reqLabels, timeWindow, groupBy.String())
155
156 promQueries[promAllowedRequests] = fmt.Sprintf(httpAuthzAllowQuery, labels, timeWindow, groupBy.String())
157 promQueries[promDeniedRequests] = fmt.Sprintf(httpAuthzDenyQuery, labels, timeWindow, groupBy.String())
158 quantileQueries := generateQuantileQueries(latencyQuantileQuery, reqLabels.String(), timeWindow, groupBy.String())
159 results, err := s.getPrometheusMetrics(ctx, promQueries, quantileQueries)
160 if err != nil {
161 return nil, nil, nil, err
162 }
163
164 basicStats, tcpStats, authzStats := processPrometheusMetrics(req, results, groupBy)
165 return basicStats, tcpStats, authzStats, nil
166 }
167
168 func buildServerRequestLabels(req *pb.StatSummaryRequest) (labels model.LabelSet, labelNames model.LabelNames) {
169 if req.GetSelector().GetResource().GetNamespace() != "" {
170 labels = labels.Merge(model.LabelSet{
171 namespaceLabel: model.LabelValue(req.GetSelector().GetResource().GetNamespace()),
172 })
173 }
174 var groupBy model.LabelNames
175 if req.GetSelector().GetResource().GetType() == k8s.Server {
176
177 groupBy = model.LabelNames{serverKindLabel, namespaceLabel, serverNameLabel}
178 labels = labels.Merge(model.LabelSet{
179 serverKindLabel: model.LabelValue("server"),
180 })
181 if req.GetSelector().GetResource().GetName() != "" {
182 labels = labels.Merge(model.LabelSet{
183 serverNameLabel: model.LabelValue(req.GetSelector().GetResource().GetName()),
184 })
185 }
186 } else if req.GetSelector().GetResource().GetType() == k8s.ServerAuthorization {
187
188 groupBy = model.LabelNames{namespaceLabel, authorizationNameLabel}
189 labels = labels.Merge(model.LabelSet{
190 authorizationKindLabel: model.LabelValue("serverauthorization"),
191 })
192 if req.GetSelector().GetResource().GetName() != "" {
193 labels = labels.Merge(model.LabelSet{
194 authorizationNameLabel: model.LabelValue(req.GetSelector().GetResource().GetName()),
195 })
196 }
197 } else if req.GetSelector().GetResource().GetType() == k8s.AuthorizationPolicy {
198
199 groupBy = model.LabelNames{namespaceLabel, authorizationNameLabel}
200 labels = labels.Merge(model.LabelSet{
201 authorizationKindLabel: model.LabelValue("authorizationpolicy"),
202 })
203 if req.GetSelector().GetResource().GetName() != "" {
204 labels = labels.Merge(model.LabelSet{
205 authorizationNameLabel: model.LabelValue(req.GetSelector().GetResource().GetName()),
206 })
207 }
208 } else if req.GetSelector().GetResource().GetType() == k8s.HTTPRoute {
209
210 groupBy = model.LabelNames{serverNameLabel, serverKindLabel, routeNameLabel, routeKindLabel, namespaceLabel, routeNameLabel}
211 if req.GetSelector().GetResource().GetName() != "" {
212 labels = labels.Merge(model.LabelSet{
213 routeNameLabel: model.LabelValue(req.GetSelector().GetResource().GetName()),
214 })
215 }
216 }
217
218 switch out := req.Outbound.(type) {
219 case *pb.StatSummaryRequest_ToResource:
220
221
222
223 labels = labels.Merge(promQueryLabels(out.ToResource))
224
225
226 default:
227
228 }
229
230 return labels, groupBy
231 }
232
View as plain text