1 package srv
2
3 import (
4 "bytes"
5 "encoding/json"
6 "errors"
7 "fmt"
8 "io"
9 "net/http"
10 "regexp"
11 "strings"
12 "time"
13
14 "github.com/gorilla/websocket"
15 "github.com/julienschmidt/httprouter"
16 "github.com/linkerd/linkerd2/pkg/healthcheck"
17 "github.com/linkerd/linkerd2/pkg/k8s"
18 "github.com/linkerd/linkerd2/pkg/protohttp"
19 metricsPb "github.com/linkerd/linkerd2/viz/metrics-api/gen/viz"
20 vizUtil "github.com/linkerd/linkerd2/viz/metrics-api/util"
21 tapPb "github.com/linkerd/linkerd2/viz/tap/gen/tap"
22 tappkg "github.com/linkerd/linkerd2/viz/tap/pkg"
23 log "github.com/sirupsen/logrus"
24 "google.golang.org/protobuf/encoding/protojson"
25 "google.golang.org/protobuf/proto"
26 kerrors "k8s.io/apimachinery/pkg/api/errors"
27 metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
28 "sigs.k8s.io/yaml"
29 )
30
31
32
33 const maxControlFrameMsgSize = 123
34
35 type (
36 jsonError struct {
37 Error string `json:"error"`
38 }
39 )
40
41 var (
42 defaultResourceType = k8s.Deployment
43 pbMarshaler = protojson.MarshalOptions{EmitUnpopulated: true}
44 maxMessageSize = 2048
45 websocketUpgrader = websocket.Upgrader{
46 ReadBufferSize: maxMessageSize,
47 WriteBufferSize: maxMessageSize,
48 }
49
50
51
52
53
54
55
56 excludedChecksRE = regexp.MustCompile(`(?i)cli|(?i)kubectl`)
57 )
58
59 func renderJSONError(w http.ResponseWriter, err error, status int) {
60 w.Header().Set("Content-Type", "application/json")
61 log.Error(err.Error())
62 rsp, _ := json.Marshal(jsonError{Error: err.Error()})
63 w.WriteHeader(status)
64 w.Write(rsp)
65 }
66
67 func renderJSON(w http.ResponseWriter, resp interface{}) {
68 w.Header().Set("Content-Type", "application/json")
69 jsonResp, err := json.Marshal(resp)
70 if err != nil {
71 renderJSONError(w, err, http.StatusInternalServerError)
72 return
73 }
74 w.Write(jsonResp)
75 }
76
77 func renderJSONPb(w http.ResponseWriter, msg proto.Message) {
78 w.Header().Set("Content-Type", "application/json")
79 json, err := pbMarshaler.Marshal(msg)
80 if err != nil {
81 renderJSONError(w, err, http.StatusBadRequest)
82 }
83 w.Write(json)
84 }
85
86 func renderJSONBytes(w http.ResponseWriter, b []byte) {
87 w.Header().Set("Content-Type", "application/json")
88 w.Write(b)
89 }
90
91 func (h *handler) handleAPIVersion(w http.ResponseWriter, req *http.Request, p httprouter.Params) {
92 resp := map[string]interface{}{
93 "version": h.version,
94 }
95 renderJSON(w, resp)
96 }
97
98 func (h *handler) handleAPIPods(w http.ResponseWriter, req *http.Request, p httprouter.Params) {
99 pods, err := h.apiClient.ListPods(req.Context(), &metricsPb.ListPodsRequest{
100 Selector: &metricsPb.ResourceSelection{
101 Resource: &metricsPb.Resource{
102 Namespace: req.FormValue("namespace"),
103 },
104 },
105 })
106
107 if err != nil {
108 renderJSONError(w, err, http.StatusInternalServerError)
109 return
110 }
111
112 renderJSONPb(w, pods)
113 }
114
115 func (h *handler) handleAPIServices(w http.ResponseWriter, req *http.Request, p httprouter.Params) {
116 services, err := h.apiClient.ListServices(req.Context(), &metricsPb.ListServicesRequest{
117 Namespace: req.FormValue("namespace"),
118 })
119
120 if err != nil {
121 renderJSONError(w, err, http.StatusInternalServerError)
122 return
123 }
124
125 renderJSONPb(w, services)
126 }
127
128 func (h *handler) handleAPIStat(w http.ResponseWriter, req *http.Request, p httprouter.Params) {
129
130 cachedResultJSON, ok := h.statCache.Get(req.URL.RawQuery)
131 if ok {
132
133 renderJSONBytes(w, cachedResultJSON.([]byte))
134 return
135 }
136
137 trueStr := fmt.Sprintf("%t", true)
138
139 requestParams := vizUtil.StatsSummaryRequestParams{
140 StatsBaseRequestParams: vizUtil.StatsBaseRequestParams{
141 TimeWindow: req.FormValue("window"),
142 ResourceName: req.FormValue("resource_name"),
143 ResourceType: req.FormValue("resource_type"),
144 Namespace: req.FormValue("namespace"),
145 AllNamespaces: req.FormValue("all_namespaces") == trueStr,
146 },
147 ToName: req.FormValue("to_name"),
148 ToType: req.FormValue("to_type"),
149 ToNamespace: req.FormValue("to_namespace"),
150 FromName: req.FormValue("from_name"),
151 FromType: req.FormValue("from_type"),
152 FromNamespace: req.FormValue("from_namespace"),
153 SkipStats: req.FormValue("skip_stats") == trueStr,
154 TCPStats: req.FormValue("tcp_stats") == trueStr,
155 }
156
157
158 if requestParams.ResourceType == "" {
159 requestParams.ResourceType = defaultResourceType
160 }
161
162 statRequest, err := vizUtil.BuildStatSummaryRequest(requestParams)
163 if err != nil {
164 renderJSONError(w, err, http.StatusInternalServerError)
165 return
166 }
167
168 result, err := h.apiClient.StatSummary(req.Context(), statRequest)
169 if err != nil {
170 renderJSONError(w, err, http.StatusInternalServerError)
171 return
172 }
173
174
175 json, err := pbMarshaler.Marshal(result)
176 if err != nil {
177 renderJSONError(w, err, http.StatusInternalServerError)
178 return
179 }
180 var resultJSON bytes.Buffer
181 resultJSON.Write(json)
182
183 h.statCache.SetDefault(req.URL.RawQuery, resultJSON.Bytes())
184
185 renderJSONBytes(w, resultJSON.Bytes())
186 }
187
188 func (h *handler) handleAPITopRoutes(w http.ResponseWriter, req *http.Request, p httprouter.Params) {
189 requestParams := vizUtil.TopRoutesRequestParams{
190 StatsBaseRequestParams: vizUtil.StatsBaseRequestParams{
191 TimeWindow: req.FormValue("window"),
192 ResourceName: req.FormValue("resource_name"),
193 ResourceType: req.FormValue("resource_type"),
194 Namespace: req.FormValue("namespace"),
195 },
196 ToName: req.FormValue("to_name"),
197 ToType: req.FormValue("to_type"),
198 ToNamespace: req.FormValue("to_namespace"),
199 }
200
201 topReq, err := vizUtil.BuildTopRoutesRequest(requestParams)
202 if err != nil {
203 renderJSONError(w, err, http.StatusBadRequest)
204 return
205 }
206
207 result, err := h.apiClient.TopRoutes(req.Context(), topReq)
208 if err != nil {
209 renderJSONError(w, err, http.StatusInternalServerError)
210 return
211 }
212
213 renderJSONPb(w, result)
214 }
215
216
217
218
219
220 func validateControlFrameMsg(err error) string {
221 log.Debugf("tap error: %s", err.Error())
222
223 msg := err.Error()
224 if len(msg) > maxControlFrameMsgSize {
225 return msg[:maxControlFrameMsgSize]
226 }
227
228 return msg
229 }
230
231 func websocketError(ws *websocket.Conn, wsError int, err error) {
232 msg := validateControlFrameMsg(err)
233
234 err = ws.WriteControl(websocket.CloseMessage,
235 websocket.FormatCloseMessage(wsError, msg),
236 time.Time{})
237 if err != nil {
238 log.Errorf("Unexpected websocket error: %s", err)
239 }
240 }
241
242 func (h *handler) handleAPITap(w http.ResponseWriter, req *http.Request, p httprouter.Params) {
243 ws, err := websocketUpgrader.Upgrade(w, req, nil)
244 if err != nil {
245 renderJSONError(w, err, http.StatusInternalServerError)
246 return
247 }
248 defer ws.Close()
249
250 messageType, message, err := ws.ReadMessage()
251 if err != nil {
252 websocketError(ws, websocket.CloseInternalServerErr, err)
253 return
254 }
255
256 if messageType != websocket.TextMessage {
257 websocketError(ws, websocket.CloseUnsupportedData, errors.New("messageType not supported"))
258 return
259 }
260
261 var requestParams tappkg.TapRequestParams
262 err = json.Unmarshal(message, &requestParams)
263 if err != nil {
264 websocketError(ws, websocket.CloseInternalServerErr, err)
265 return
266 }
267
268 tapReq, err := tappkg.BuildTapByResourceRequest(requestParams)
269 if err != nil {
270 websocketError(ws, websocket.CloseInternalServerErr, err)
271 return
272 }
273
274 go func() {
275 reader, body, err := tappkg.Reader(req.Context(), h.k8sAPI, tapReq)
276 if err != nil {
277
278
279
280 var he protohttp.HTTPError
281 if errors.Is(err, &he) && he.Code == http.StatusForbidden {
282 err := fmt.Errorf("missing authorization, visit %s to remedy", tappkg.TapRbacURL)
283 websocketError(ws, websocket.ClosePolicyViolation, err)
284 return
285 }
286
287
288
289 websocketError(ws, websocket.CloseInternalServerErr, err)
290 return
291 }
292 defer body.Close()
293
294 for {
295 event := tapPb.TapEvent{}
296 err := protohttp.FromByteStreamToProtocolBuffers(reader, &event)
297 if err != nil {
298 if errors.Is(err, io.EOF) {
299 break
300 }
301 websocketError(ws, websocket.CloseInternalServerErr, err)
302 break
303 }
304
305 json, err := pbMarshaler.Marshal(&event)
306 if err != nil {
307 websocketError(ws, websocket.CloseUnsupportedData, err)
308 break
309 }
310 buf := new(bytes.Buffer)
311 buf.Write(json)
312
313 if err := ws.WriteMessage(websocket.TextMessage, buf.Bytes()); err != nil {
314 if websocket.IsUnexpectedCloseError(err, websocket.CloseNormalClosure) {
315 log.Error(err)
316 }
317 break
318 }
319 }
320 }()
321
322 for {
323 _, _, err := ws.ReadMessage()
324 if err != nil {
325 log.Debugf("Received close frame: %v", err)
326 if websocket.IsUnexpectedCloseError(err, websocket.CloseNormalClosure) {
327 log.Errorf("Unexpected close error: %s", err)
328 }
329 return
330 }
331 }
332 }
333
334 func (h *handler) handleAPIEdges(w http.ResponseWriter, req *http.Request, p httprouter.Params) {
335 requestParams := vizUtil.EdgesRequestParams{
336 Namespace: req.FormValue("namespace"),
337 ResourceType: req.FormValue("resource_type"),
338 }
339
340 edgesRequest, err := vizUtil.BuildEdgesRequest(requestParams)
341 if err != nil {
342 renderJSONError(w, err, http.StatusInternalServerError)
343 return
344 }
345
346 result, err := h.apiClient.Edges(req.Context(), edgesRequest)
347 if err != nil {
348 renderJSONError(w, err, http.StatusInternalServerError)
349 return
350 }
351 renderJSONPb(w, result)
352 }
353
354 func (h *handler) handleAPICheck(w http.ResponseWriter, req *http.Request, p httprouter.Params) {
355 type CheckResult struct {
356 *healthcheck.CheckResult
357 ErrMsg string `json:",omitempty"`
358 HintURL string `json:",omitempty"`
359 }
360
361 success := true
362 results := make(map[healthcheck.CategoryID][]*CheckResult)
363
364 collectResults := func(result *healthcheck.CheckResult) {
365 if result.Retry || excludedChecksRE.MatchString(result.Description) {
366 return
367 }
368 var errMsg, hintURL string
369 if result.Err != nil {
370 if !result.Warning {
371 success = false
372 }
373 errMsg = result.Err.Error()
374 hintURL = result.HintURL
375 }
376 results[result.Category] = append(results[result.Category], &CheckResult{
377 CheckResult: result,
378 ErrMsg: errMsg,
379 HintURL: hintURL,
380 })
381 }
382
383
384 _, _ = h.hc.RunChecks(collectResults)
385
386 renderJSON(w, map[string]interface{}{
387 "success": success,
388 "results": results,
389 })
390 }
391
392 func (h *handler) handleAPIResourceDefinition(w http.ResponseWriter, req *http.Request, _ httprouter.Params) {
393 var missingParams []string
394 requiredParams := []string{"namespace", "resource_type", "resource_name"}
395 for _, param := range requiredParams {
396 if req.FormValue(param) == "" {
397 missingParams = append(missingParams, param)
398 }
399 }
400 if len(missingParams) != 0 {
401 renderJSONError(w, fmt.Errorf("required params not provided: %s", strings.Join(missingParams, ", ")), http.StatusBadRequest)
402 return
403 }
404
405 namespace := req.FormValue("namespace")
406 resourceType := req.FormValue("resource_type")
407 resourceName := req.FormValue("resource_name")
408
409 var resource interface{}
410 var err error
411 options := metav1.GetOptions{}
412 switch resourceType {
413 case k8s.CronJob:
414 resource, err = h.k8sAPI.BatchV1beta1().CronJobs(namespace).Get(req.Context(), resourceName, options)
415 case k8s.DaemonSet:
416 resource, err = h.k8sAPI.AppsV1().DaemonSets(namespace).Get(req.Context(), resourceName, options)
417 case k8s.Deployment:
418 resource, err = h.k8sAPI.AppsV1().Deployments(namespace).Get(req.Context(), resourceName, options)
419 case k8s.Service:
420 resource, err = h.k8sAPI.CoreV1().Services(namespace).Get(req.Context(), resourceName, options)
421 case k8s.Job:
422 resource, err = h.k8sAPI.BatchV1().Jobs(namespace).Get(req.Context(), resourceName, options)
423 case k8s.Pod:
424 resource, err = h.k8sAPI.CoreV1().Pods(namespace).Get(req.Context(), resourceName, options)
425 case k8s.ReplicationController:
426 resource, err = h.k8sAPI.CoreV1().ReplicationControllers(namespace).Get(req.Context(), resourceName, options)
427 case k8s.ReplicaSet:
428 resource, err = h.k8sAPI.AppsV1().ReplicaSets(namespace).Get(req.Context(), resourceName, options)
429 default:
430 renderJSONError(w, errors.New("Invalid resource type: "+resourceType), http.StatusBadRequest)
431 return
432 }
433 if err != nil {
434 renderJSONError(w, err, http.StatusInternalServerError)
435 return
436 }
437
438 resourceDefinition, err := yaml.Marshal(resource)
439 if err != nil {
440 renderJSONError(w, err, http.StatusInternalServerError)
441 return
442 }
443 w.Header().Set("Content-Type", "text/yaml")
444 w.Write(resourceDefinition)
445 }
446
447 func (h *handler) handleGetExtensions(w http.ResponseWriter, req *http.Request, _ httprouter.Params) {
448 ctx := req.Context()
449 extensionName := req.FormValue("extension_name")
450
451 type Extension struct {
452 Name string `json:"name"`
453 UID string `json:"uid"`
454 Namespace string `json:"namespace"`
455 }
456
457 resp := map[string]interface{}{}
458 if extensionName != "" {
459 ns, err := h.k8sAPI.GetNamespaceWithExtensionLabel(ctx, extensionName)
460 if err != nil && kerrors.IsNotFound(err) {
461 renderJSON(w, resp)
462 return
463 } else if err != nil {
464 renderJSONError(w, err, http.StatusInternalServerError)
465 return
466 }
467
468 resp["data"] = Extension{
469 UID: string(ns.UID),
470 Name: ns.GetLabels()[k8s.LinkerdExtensionLabel],
471 Namespace: ns.Name,
472 }
473
474 renderJSON(w, resp)
475 return
476 }
477
478 installedExtensions, err := h.k8sAPI.GetAllNamespacesWithExtensionLabel(ctx)
479 if err != nil {
480 renderJSONError(w, err, http.StatusInternalServerError)
481 return
482 }
483
484 extensionList := make([]Extension, len(installedExtensions))
485
486 for i, installedExtension := range installedExtensions {
487 extensionList[i] = Extension{
488 UID: string(installedExtension.GetObjectMeta().GetUID()),
489 Name: installedExtension.GetLabels()[k8s.LinkerdExtensionLabel],
490 Namespace: installedExtension.GetName(),
491 }
492 }
493
494 resp["extensions"] = extensionList
495 renderJSON(w, resp)
496 }
497
498 func (h *handler) handleAPIGateways(w http.ResponseWriter, req *http.Request, _ httprouter.Params) {
499 window := req.FormValue("window")
500 if window == "" {
501 window = "1m"
502 }
503 _, err := time.ParseDuration(window)
504 if err != nil {
505 renderJSONError(w, err, http.StatusInternalServerError)
506 return
507 }
508 gatewayRequest := &metricsPb.GatewaysRequest{
509 TimeWindow: window,
510 GatewayNamespace: req.FormValue("gatewayNamespace"),
511 RemoteClusterName: req.FormValue("remoteClusterName"),
512 }
513 result, err := h.apiClient.Gateways(req.Context(), gatewayRequest)
514 if err != nil {
515 renderJSONError(w, err, http.StatusInternalServerError)
516 return
517 }
518 renderJSONPb(w, result)
519 }
520
View as plain text