...

Source file src/github.com/linkerd/linkerd2/web/srv/api_handlers.go

Documentation: github.com/linkerd/linkerd2/web/srv

     1  package srv
     2  
     3  import (
     4  	"bytes"
     5  	"encoding/json"
     6  	"errors"
     7  	"fmt"
     8  	"io"
     9  	"net/http"
    10  	"regexp"
    11  	"strings"
    12  	"time"
    13  
    14  	"github.com/gorilla/websocket"
    15  	"github.com/julienschmidt/httprouter"
    16  	"github.com/linkerd/linkerd2/pkg/healthcheck"
    17  	"github.com/linkerd/linkerd2/pkg/k8s"
    18  	"github.com/linkerd/linkerd2/pkg/protohttp"
    19  	metricsPb "github.com/linkerd/linkerd2/viz/metrics-api/gen/viz"
    20  	vizUtil "github.com/linkerd/linkerd2/viz/metrics-api/util"
    21  	tapPb "github.com/linkerd/linkerd2/viz/tap/gen/tap"
    22  	tappkg "github.com/linkerd/linkerd2/viz/tap/pkg"
    23  	log "github.com/sirupsen/logrus"
    24  	"google.golang.org/protobuf/encoding/protojson"
    25  	"google.golang.org/protobuf/proto"
    26  	kerrors "k8s.io/apimachinery/pkg/api/errors"
    27  	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
    28  	"sigs.k8s.io/yaml"
    29  )
    30  
    31  // Control Frame payload size can be no bigger than 125 bytes. 2 bytes are
    32  // reserved for the status code when formatting the message.
    33  const maxControlFrameMsgSize = 123
    34  
    35  type (
    36  	jsonError struct {
    37  		Error string `json:"error"`
    38  	}
    39  )
    40  
    41  var (
    42  	defaultResourceType = k8s.Deployment
    43  	pbMarshaler         = protojson.MarshalOptions{EmitUnpopulated: true}
    44  	maxMessageSize      = 2048
    45  	websocketUpgrader   = websocket.Upgrader{
    46  		ReadBufferSize:  maxMessageSize,
    47  		WriteBufferSize: maxMessageSize,
    48  	}
    49  
    50  	// Checks whose description matches the following regexp won't be included
    51  	// in the handleApiCheck output. In the context of the dashboard, some
    52  	// checks like cli or kubectl versions ones may not be relevant.
    53  	//
    54  	// TODO(tegioz): use more reliable way to identify the checks that should
    55  	// not be displayed in the dashboard (hint anchor is not unique).
    56  	excludedChecksRE = regexp.MustCompile(`(?i)cli|(?i)kubectl`)
    57  )
    58  
    59  func renderJSONError(w http.ResponseWriter, err error, status int) {
    60  	w.Header().Set("Content-Type", "application/json")
    61  	log.Error(err.Error())
    62  	rsp, _ := json.Marshal(jsonError{Error: err.Error()})
    63  	w.WriteHeader(status)
    64  	w.Write(rsp)
    65  }
    66  
    67  func renderJSON(w http.ResponseWriter, resp interface{}) {
    68  	w.Header().Set("Content-Type", "application/json")
    69  	jsonResp, err := json.Marshal(resp)
    70  	if err != nil {
    71  		renderJSONError(w, err, http.StatusInternalServerError)
    72  		return
    73  	}
    74  	w.Write(jsonResp)
    75  }
    76  
    77  func renderJSONPb(w http.ResponseWriter, msg proto.Message) {
    78  	w.Header().Set("Content-Type", "application/json")
    79  	json, err := pbMarshaler.Marshal(msg)
    80  	if err != nil {
    81  		renderJSONError(w, err, http.StatusBadRequest)
    82  	}
    83  	w.Write(json)
    84  }
    85  
    86  func renderJSONBytes(w http.ResponseWriter, b []byte) {
    87  	w.Header().Set("Content-Type", "application/json")
    88  	w.Write(b)
    89  }
    90  
    91  func (h *handler) handleAPIVersion(w http.ResponseWriter, req *http.Request, p httprouter.Params) {
    92  	resp := map[string]interface{}{
    93  		"version": h.version,
    94  	}
    95  	renderJSON(w, resp)
    96  }
    97  
    98  func (h *handler) handleAPIPods(w http.ResponseWriter, req *http.Request, p httprouter.Params) {
    99  	pods, err := h.apiClient.ListPods(req.Context(), &metricsPb.ListPodsRequest{
   100  		Selector: &metricsPb.ResourceSelection{
   101  			Resource: &metricsPb.Resource{
   102  				Namespace: req.FormValue("namespace"),
   103  			},
   104  		},
   105  	})
   106  
   107  	if err != nil {
   108  		renderJSONError(w, err, http.StatusInternalServerError)
   109  		return
   110  	}
   111  
   112  	renderJSONPb(w, pods)
   113  }
   114  
   115  func (h *handler) handleAPIServices(w http.ResponseWriter, req *http.Request, p httprouter.Params) {
   116  	services, err := h.apiClient.ListServices(req.Context(), &metricsPb.ListServicesRequest{
   117  		Namespace: req.FormValue("namespace"),
   118  	})
   119  
   120  	if err != nil {
   121  		renderJSONError(w, err, http.StatusInternalServerError)
   122  		return
   123  	}
   124  
   125  	renderJSONPb(w, services)
   126  }
   127  
   128  func (h *handler) handleAPIStat(w http.ResponseWriter, req *http.Request, p httprouter.Params) {
   129  	// Try to get stat summary from cache using the query as key
   130  	cachedResultJSON, ok := h.statCache.Get(req.URL.RawQuery)
   131  	if ok {
   132  		// Cache hit, render cached json result
   133  		renderJSONBytes(w, cachedResultJSON.([]byte))
   134  		return
   135  	}
   136  
   137  	trueStr := fmt.Sprintf("%t", true)
   138  
   139  	requestParams := vizUtil.StatsSummaryRequestParams{
   140  		StatsBaseRequestParams: vizUtil.StatsBaseRequestParams{
   141  			TimeWindow:    req.FormValue("window"),
   142  			ResourceName:  req.FormValue("resource_name"),
   143  			ResourceType:  req.FormValue("resource_type"),
   144  			Namespace:     req.FormValue("namespace"),
   145  			AllNamespaces: req.FormValue("all_namespaces") == trueStr,
   146  		},
   147  		ToName:        req.FormValue("to_name"),
   148  		ToType:        req.FormValue("to_type"),
   149  		ToNamespace:   req.FormValue("to_namespace"),
   150  		FromName:      req.FormValue("from_name"),
   151  		FromType:      req.FormValue("from_type"),
   152  		FromNamespace: req.FormValue("from_namespace"),
   153  		SkipStats:     req.FormValue("skip_stats") == trueStr,
   154  		TCPStats:      req.FormValue("tcp_stats") == trueStr,
   155  	}
   156  
   157  	// default to returning deployment stats
   158  	if requestParams.ResourceType == "" {
   159  		requestParams.ResourceType = defaultResourceType
   160  	}
   161  
   162  	statRequest, err := vizUtil.BuildStatSummaryRequest(requestParams)
   163  	if err != nil {
   164  		renderJSONError(w, err, http.StatusInternalServerError)
   165  		return
   166  	}
   167  
   168  	result, err := h.apiClient.StatSummary(req.Context(), statRequest)
   169  	if err != nil {
   170  		renderJSONError(w, err, http.StatusInternalServerError)
   171  		return
   172  	}
   173  
   174  	// Marshal result into json and cache it
   175  	json, err := pbMarshaler.Marshal(result)
   176  	if err != nil {
   177  		renderJSONError(w, err, http.StatusInternalServerError)
   178  		return
   179  	}
   180  	var resultJSON bytes.Buffer
   181  	resultJSON.Write(json)
   182  
   183  	h.statCache.SetDefault(req.URL.RawQuery, resultJSON.Bytes())
   184  
   185  	renderJSONBytes(w, resultJSON.Bytes())
   186  }
   187  
   188  func (h *handler) handleAPITopRoutes(w http.ResponseWriter, req *http.Request, p httprouter.Params) {
   189  	requestParams := vizUtil.TopRoutesRequestParams{
   190  		StatsBaseRequestParams: vizUtil.StatsBaseRequestParams{
   191  			TimeWindow:   req.FormValue("window"),
   192  			ResourceName: req.FormValue("resource_name"),
   193  			ResourceType: req.FormValue("resource_type"),
   194  			Namespace:    req.FormValue("namespace"),
   195  		},
   196  		ToName:      req.FormValue("to_name"),
   197  		ToType:      req.FormValue("to_type"),
   198  		ToNamespace: req.FormValue("to_namespace"),
   199  	}
   200  
   201  	topReq, err := vizUtil.BuildTopRoutesRequest(requestParams)
   202  	if err != nil {
   203  		renderJSONError(w, err, http.StatusBadRequest)
   204  		return
   205  	}
   206  
   207  	result, err := h.apiClient.TopRoutes(req.Context(), topReq)
   208  	if err != nil {
   209  		renderJSONError(w, err, http.StatusInternalServerError)
   210  		return
   211  	}
   212  
   213  	renderJSONPb(w, result)
   214  }
   215  
   216  // Control frame payload size must be no longer than `maxControlFrameMsgSize`
   217  // bytes. In the case of an unexpected HTTP status code or unexpected error,
   218  // truncate the message after `maxControlFrameMsgSize` bytes so that the web
   219  // socket message is properly written.
   220  func validateControlFrameMsg(err error) string {
   221  	log.Debugf("tap error: %s", err.Error())
   222  
   223  	msg := err.Error()
   224  	if len(msg) > maxControlFrameMsgSize {
   225  		return msg[:maxControlFrameMsgSize]
   226  	}
   227  
   228  	return msg
   229  }
   230  
   231  func websocketError(ws *websocket.Conn, wsError int, err error) {
   232  	msg := validateControlFrameMsg(err)
   233  
   234  	err = ws.WriteControl(websocket.CloseMessage,
   235  		websocket.FormatCloseMessage(wsError, msg),
   236  		time.Time{})
   237  	if err != nil {
   238  		log.Errorf("Unexpected websocket error: %s", err)
   239  	}
   240  }
   241  
   242  func (h *handler) handleAPITap(w http.ResponseWriter, req *http.Request, p httprouter.Params) {
   243  	ws, err := websocketUpgrader.Upgrade(w, req, nil)
   244  	if err != nil {
   245  		renderJSONError(w, err, http.StatusInternalServerError)
   246  		return
   247  	}
   248  	defer ws.Close()
   249  
   250  	messageType, message, err := ws.ReadMessage()
   251  	if err != nil {
   252  		websocketError(ws, websocket.CloseInternalServerErr, err)
   253  		return
   254  	}
   255  
   256  	if messageType != websocket.TextMessage {
   257  		websocketError(ws, websocket.CloseUnsupportedData, errors.New("messageType not supported"))
   258  		return
   259  	}
   260  
   261  	var requestParams tappkg.TapRequestParams
   262  	err = json.Unmarshal(message, &requestParams)
   263  	if err != nil {
   264  		websocketError(ws, websocket.CloseInternalServerErr, err)
   265  		return
   266  	}
   267  
   268  	tapReq, err := tappkg.BuildTapByResourceRequest(requestParams)
   269  	if err != nil {
   270  		websocketError(ws, websocket.CloseInternalServerErr, err)
   271  		return
   272  	}
   273  
   274  	go func() {
   275  		reader, body, err := tappkg.Reader(req.Context(), h.k8sAPI, tapReq)
   276  		if err != nil {
   277  			// If there was a [403] error when initiating a tap, close the
   278  			// socket with `ClosePolicyViolation` status code so that the error
   279  			// renders without the error prefix in the banner
   280  			var he protohttp.HTTPError
   281  			if errors.Is(err, &he) && he.Code == http.StatusForbidden {
   282  				err := fmt.Errorf("missing authorization, visit %s to remedy", tappkg.TapRbacURL)
   283  				websocketError(ws, websocket.ClosePolicyViolation, err)
   284  				return
   285  			}
   286  
   287  			// All other errors from initiating a tap should close with
   288  			// `CloseInternalServerErr` status code
   289  			websocketError(ws, websocket.CloseInternalServerErr, err)
   290  			return
   291  		}
   292  		defer body.Close()
   293  
   294  		for {
   295  			event := tapPb.TapEvent{}
   296  			err := protohttp.FromByteStreamToProtocolBuffers(reader, &event)
   297  			if err != nil {
   298  				if errors.Is(err, io.EOF) {
   299  					break
   300  				}
   301  				websocketError(ws, websocket.CloseInternalServerErr, err)
   302  				break
   303  			}
   304  
   305  			json, err := pbMarshaler.Marshal(&event)
   306  			if err != nil {
   307  				websocketError(ws, websocket.CloseUnsupportedData, err)
   308  				break
   309  			}
   310  			buf := new(bytes.Buffer)
   311  			buf.Write(json)
   312  
   313  			if err := ws.WriteMessage(websocket.TextMessage, buf.Bytes()); err != nil {
   314  				if websocket.IsUnexpectedCloseError(err, websocket.CloseNormalClosure) {
   315  					log.Error(err)
   316  				}
   317  				break
   318  			}
   319  		}
   320  	}()
   321  
   322  	for {
   323  		_, _, err := ws.ReadMessage()
   324  		if err != nil {
   325  			log.Debugf("Received close frame: %v", err)
   326  			if websocket.IsUnexpectedCloseError(err, websocket.CloseNormalClosure) {
   327  				log.Errorf("Unexpected close error: %s", err)
   328  			}
   329  			return
   330  		}
   331  	}
   332  }
   333  
   334  func (h *handler) handleAPIEdges(w http.ResponseWriter, req *http.Request, p httprouter.Params) {
   335  	requestParams := vizUtil.EdgesRequestParams{
   336  		Namespace:    req.FormValue("namespace"),
   337  		ResourceType: req.FormValue("resource_type"),
   338  	}
   339  
   340  	edgesRequest, err := vizUtil.BuildEdgesRequest(requestParams)
   341  	if err != nil {
   342  		renderJSONError(w, err, http.StatusInternalServerError)
   343  		return
   344  	}
   345  
   346  	result, err := h.apiClient.Edges(req.Context(), edgesRequest)
   347  	if err != nil {
   348  		renderJSONError(w, err, http.StatusInternalServerError)
   349  		return
   350  	}
   351  	renderJSONPb(w, result)
   352  }
   353  
   354  func (h *handler) handleAPICheck(w http.ResponseWriter, req *http.Request, p httprouter.Params) {
   355  	type CheckResult struct {
   356  		*healthcheck.CheckResult
   357  		ErrMsg  string `json:",omitempty"`
   358  		HintURL string `json:",omitempty"`
   359  	}
   360  
   361  	success := true
   362  	results := make(map[healthcheck.CategoryID][]*CheckResult)
   363  
   364  	collectResults := func(result *healthcheck.CheckResult) {
   365  		if result.Retry || excludedChecksRE.MatchString(result.Description) {
   366  			return
   367  		}
   368  		var errMsg, hintURL string
   369  		if result.Err != nil {
   370  			if !result.Warning {
   371  				success = false
   372  			}
   373  			errMsg = result.Err.Error()
   374  			hintURL = result.HintURL
   375  		}
   376  		results[result.Category] = append(results[result.Category], &CheckResult{
   377  			CheckResult: result,
   378  			ErrMsg:      errMsg,
   379  			HintURL:     hintURL,
   380  		})
   381  	}
   382  	// TODO (tegioz): ignore runchecks results until we stop filtering checks
   383  	// in this method (see #3670 for more details)
   384  	_, _ = h.hc.RunChecks(collectResults)
   385  
   386  	renderJSON(w, map[string]interface{}{
   387  		"success": success,
   388  		"results": results,
   389  	})
   390  }
   391  
   392  func (h *handler) handleAPIResourceDefinition(w http.ResponseWriter, req *http.Request, _ httprouter.Params) {
   393  	var missingParams []string
   394  	requiredParams := []string{"namespace", "resource_type", "resource_name"}
   395  	for _, param := range requiredParams {
   396  		if req.FormValue(param) == "" {
   397  			missingParams = append(missingParams, param)
   398  		}
   399  	}
   400  	if len(missingParams) != 0 {
   401  		renderJSONError(w, fmt.Errorf("required params not provided: %s", strings.Join(missingParams, ", ")), http.StatusBadRequest)
   402  		return
   403  	}
   404  
   405  	namespace := req.FormValue("namespace")
   406  	resourceType := req.FormValue("resource_type")
   407  	resourceName := req.FormValue("resource_name")
   408  
   409  	var resource interface{}
   410  	var err error
   411  	options := metav1.GetOptions{}
   412  	switch resourceType {
   413  	case k8s.CronJob:
   414  		resource, err = h.k8sAPI.BatchV1beta1().CronJobs(namespace).Get(req.Context(), resourceName, options)
   415  	case k8s.DaemonSet:
   416  		resource, err = h.k8sAPI.AppsV1().DaemonSets(namespace).Get(req.Context(), resourceName, options)
   417  	case k8s.Deployment:
   418  		resource, err = h.k8sAPI.AppsV1().Deployments(namespace).Get(req.Context(), resourceName, options)
   419  	case k8s.Service:
   420  		resource, err = h.k8sAPI.CoreV1().Services(namespace).Get(req.Context(), resourceName, options)
   421  	case k8s.Job:
   422  		resource, err = h.k8sAPI.BatchV1().Jobs(namespace).Get(req.Context(), resourceName, options)
   423  	case k8s.Pod:
   424  		resource, err = h.k8sAPI.CoreV1().Pods(namespace).Get(req.Context(), resourceName, options)
   425  	case k8s.ReplicationController:
   426  		resource, err = h.k8sAPI.CoreV1().ReplicationControllers(namespace).Get(req.Context(), resourceName, options)
   427  	case k8s.ReplicaSet:
   428  		resource, err = h.k8sAPI.AppsV1().ReplicaSets(namespace).Get(req.Context(), resourceName, options)
   429  	default:
   430  		renderJSONError(w, errors.New("Invalid resource type: "+resourceType), http.StatusBadRequest)
   431  		return
   432  	}
   433  	if err != nil {
   434  		renderJSONError(w, err, http.StatusInternalServerError)
   435  		return
   436  	}
   437  
   438  	resourceDefinition, err := yaml.Marshal(resource)
   439  	if err != nil {
   440  		renderJSONError(w, err, http.StatusInternalServerError)
   441  		return
   442  	}
   443  	w.Header().Set("Content-Type", "text/yaml")
   444  	w.Write(resourceDefinition)
   445  }
   446  
   447  func (h *handler) handleGetExtensions(w http.ResponseWriter, req *http.Request, _ httprouter.Params) {
   448  	ctx := req.Context()
   449  	extensionName := req.FormValue("extension_name")
   450  
   451  	type Extension struct {
   452  		Name      string `json:"name"`
   453  		UID       string `json:"uid"`
   454  		Namespace string `json:"namespace"`
   455  	}
   456  
   457  	resp := map[string]interface{}{}
   458  	if extensionName != "" {
   459  		ns, err := h.k8sAPI.GetNamespaceWithExtensionLabel(ctx, extensionName)
   460  		if err != nil && kerrors.IsNotFound(err) {
   461  			renderJSON(w, resp)
   462  			return
   463  		} else if err != nil {
   464  			renderJSONError(w, err, http.StatusInternalServerError)
   465  			return
   466  		}
   467  
   468  		resp["data"] = Extension{
   469  			UID:       string(ns.UID),
   470  			Name:      ns.GetLabels()[k8s.LinkerdExtensionLabel],
   471  			Namespace: ns.Name,
   472  		}
   473  
   474  		renderJSON(w, resp)
   475  		return
   476  	}
   477  
   478  	installedExtensions, err := h.k8sAPI.GetAllNamespacesWithExtensionLabel(ctx)
   479  	if err != nil {
   480  		renderJSONError(w, err, http.StatusInternalServerError)
   481  		return
   482  	}
   483  
   484  	extensionList := make([]Extension, len(installedExtensions))
   485  
   486  	for i, installedExtension := range installedExtensions {
   487  		extensionList[i] = Extension{
   488  			UID:       string(installedExtension.GetObjectMeta().GetUID()),
   489  			Name:      installedExtension.GetLabels()[k8s.LinkerdExtensionLabel],
   490  			Namespace: installedExtension.GetName(),
   491  		}
   492  	}
   493  
   494  	resp["extensions"] = extensionList
   495  	renderJSON(w, resp)
   496  }
   497  
   498  func (h *handler) handleAPIGateways(w http.ResponseWriter, req *http.Request, _ httprouter.Params) {
   499  	window := req.FormValue("window")
   500  	if window == "" {
   501  		window = "1m"
   502  	}
   503  	_, err := time.ParseDuration(window)
   504  	if err != nil {
   505  		renderJSONError(w, err, http.StatusInternalServerError)
   506  		return
   507  	}
   508  	gatewayRequest := &metricsPb.GatewaysRequest{
   509  		TimeWindow:        window,
   510  		GatewayNamespace:  req.FormValue("gatewayNamespace"),
   511  		RemoteClusterName: req.FormValue("remoteClusterName"),
   512  	}
   513  	result, err := h.apiClient.Gateways(req.Context(), gatewayRequest)
   514  	if err != nil {
   515  		renderJSONError(w, err, http.StatusInternalServerError)
   516  		return
   517  	}
   518  	renderJSONPb(w, result)
   519  }
   520  

View as plain text