...

Source file src/github.com/linkerd/linkerd2/viz/tap/api/handlers.go

Documentation: github.com/linkerd/linkerd2/viz/tap/api

     1  package api
     2  
     3  import (
     4  	"context"
     5  	"encoding/json"
     6  	"fmt"
     7  	"net/http"
     8  	"strings"
     9  
    10  	"github.com/go-openapi/spec"
    11  	"github.com/julienschmidt/httprouter"
    12  	"github.com/linkerd/linkerd2/controller/k8s"
    13  	pkgK8s "github.com/linkerd/linkerd2/pkg/k8s"
    14  	"github.com/linkerd/linkerd2/pkg/protohttp"
    15  	pb "github.com/linkerd/linkerd2/viz/tap/gen/tap"
    16  	"github.com/linkerd/linkerd2/viz/tap/pkg"
    17  	"github.com/prometheus/client_golang/prometheus/promhttp"
    18  	"github.com/sirupsen/logrus"
    19  	"google.golang.org/grpc/metadata"
    20  	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
    21  	"k8s.io/apimachinery/pkg/runtime/schema"
    22  	"k8s.io/apimachinery/pkg/version"
    23  )
    24  
// handler bundles the dependencies used by the tap APIServer's HTTP handlers.
//
// Fields:
//   - k8sAPI: its Client is passed to the SubjectAccessReview check in handleTap.
//   - usernameHeader: name of the request header that carries the user name.
//   - groupHeader: name of the request header that carries the user's groups.
//   - grpcTapServer: tap server whose TapByResource is called in-process by
//     handleTap.
//   - log: logger used by handleTap and handed to the serverStream it creates.
type handler struct {
	k8sAPI         *k8s.API
	usernameHeader string
	groupHeader    string
	grpcTapServer  pb.TapServer
	log            *logrus.Entry
}
    32  
// jsonError is the response body written by renderJSONError. It serializes as
// a JSON object with a single "error" key holding the error message.
//
// TODO: share with api_handlers.go
type jsonError struct {
	Error string `json:"error"`
}
    37  
    38  var (
    39  	gvk = &schema.GroupVersionKind{
    40  		Group:   "tap.linkerd.io",
    41  		Version: "v1alpha1",
    42  		Kind:    "Tap",
    43  	}
    44  
    45  	gvfd = metav1.GroupVersionForDiscovery{
    46  		GroupVersion: gvk.GroupVersion().String(),
    47  		Version:      gvk.Version,
    48  	}
    49  
    50  	apiGroup = metav1.APIGroup{
    51  		Name:             gvk.Group,
    52  		Versions:         []metav1.GroupVersionForDiscovery{gvfd},
    53  		PreferredVersion: gvfd,
    54  	}
    55  
    56  	resources = []struct {
    57  		name       string
    58  		namespaced bool
    59  	}{
    60  		{"namespaces", false},
    61  		{"pods", true},
    62  		{"replicationcontrollers", true},
    63  		{"services", true},
    64  		{"daemonsets", true},
    65  		{"deployments", true},
    66  		{"replicasets", true},
    67  		{"statefulsets", true},
    68  		{"jobs", true},
    69  		{"cronjobs", true},
    70  	}
    71  )
    72  
    73  func initRouter(h *handler) *httprouter.Router {
    74  	router := &httprouter.Router{}
    75  
    76  	router.GET("/", handleRoot)
    77  	router.GET("/apis", handleAPIs)
    78  	router.GET("/apis/"+gvk.Group, handleAPIGroup)
    79  	router.GET("/apis/"+gvk.GroupVersion().String(), handleAPIResourceList)
    80  	router.GET("/healthz", handleHealthz)
    81  	router.GET("/healthz/log", handleHealthz)
    82  	router.GET("/healthz/ping", handleHealthz)
    83  	router.GET("/metrics", handleMetrics)
    84  	router.GET("/openapi/v2", handleOpenAPI)
    85  	router.GET("/version", handleVersion)
    86  	router.NotFound = handleNotFound()
    87  
    88  	for _, res := range resources {
    89  		route := ""
    90  		if !res.namespaced {
    91  			route = fmt.Sprintf("/apis/%s/watch/%s/:namespace", gvk.GroupVersion().String(), res.name)
    92  		} else {
    93  			route = fmt.Sprintf("/apis/%s/watch/namespaces/:namespace/%s/:name", gvk.GroupVersion().String(), res.name)
    94  		}
    95  
    96  		router.GET(route, handleRoot)
    97  		router.POST(route+"/tap", h.handleTap)
    98  	}
    99  
   100  	return router
   101  }
   102  
   103  // POST /apis/tap.linkerd.io/v1alpha1/watch/namespaces/:namespace/tap
   104  // POST /apis/tap.linkerd.io/v1alpha1/watch/namespaces/:namespace/:resource/:name/tap
   105  func (h *handler) handleTap(w http.ResponseWriter, req *http.Request, p httprouter.Params) {
   106  	namespace := p.ByName("namespace")
   107  	name := p.ByName("name")
   108  	resource := ""
   109  
   110  	path := strings.Split(req.URL.Path, "/")
   111  	if len(path) == 8 {
   112  		resource = path[5]
   113  	} else if len(path) == 10 {
   114  		resource = path[7]
   115  	} else {
   116  		err := fmt.Errorf("invalid path: %q", req.URL.Path)
   117  		h.log.Error(err)
   118  		renderJSONError(w, err, http.StatusBadRequest)
   119  		return
   120  	}
   121  
   122  	h.log.Debugf("SubjectAccessReview: namespace: %q, resource: %q, name: %q, user: %q, group: %q",
   123  		namespace, resource, name, h.usernameHeader, h.groupHeader,
   124  	)
   125  
   126  	// TODO: it's possible this SubjectAccessReview is redundant, consider
   127  	// removing, more info at https://github.com/linkerd/linkerd2/issues/3182
   128  	err := pkgK8s.ResourceAuthzForUser(
   129  		req.Context(),
   130  		h.k8sAPI.Client,
   131  		namespace,
   132  		"watch",
   133  		gvk.Group,
   134  		gvk.Version,
   135  		resource,
   136  		"tap",
   137  		name,
   138  		req.Header.Get(h.usernameHeader),
   139  		req.Header.Values(h.groupHeader),
   140  	)
   141  	if err != nil {
   142  		err = fmt.Errorf("tap authorization failed (%w), visit %s for more information", err, pkg.TapRbacURL)
   143  		h.log.Error(err)
   144  		renderJSONError(w, err, http.StatusForbidden)
   145  		return
   146  	}
   147  
   148  	tapReq := pb.TapByResourceRequest{}
   149  	err = protohttp.HTTPRequestToProto(req, &tapReq)
   150  	if err != nil {
   151  		err = fmt.Errorf("Error decoding Tap Request proto: %w", err)
   152  		h.log.Error(err)
   153  		protohttp.WriteErrorToHTTPResponse(w, err)
   154  		return
   155  	}
   156  
   157  	url := pkg.TapReqToURL(&tapReq)
   158  	if url != req.URL.Path {
   159  		err = fmt.Errorf("tap request body did not match APIServer URL: %q != %q", url, req.URL.Path)
   160  		h.log.Error(err)
   161  		protohttp.WriteErrorToHTTPResponse(w, err)
   162  		return
   163  	}
   164  
   165  	flushableWriter, err := protohttp.NewStreamingWriter(w)
   166  	if err != nil {
   167  		h.log.Error(err)
   168  		protohttp.WriteErrorToHTTPResponse(w, err)
   169  		return
   170  	}
   171  
   172  	serverStream := serverStream{w: flushableWriter, req: req, log: h.log}
   173  	// This API endpoint is marked as deprecated but it's still used.
   174  	//nolint:staticcheck
   175  	err = h.grpcTapServer.TapByResource(&tapReq, &serverStream)
   176  	if err != nil {
   177  		h.log.Errorf("TapByResource failed: %q", err)
   178  		protohttp.WriteErrorToHTTPResponse(flushableWriter, err)
   179  		return
   180  	}
   181  }
   182  
   183  // GET (not found)
   184  func handleNotFound() http.Handler {
   185  	return http.HandlerFunc(func(w http.ResponseWriter, _ *http.Request) {
   186  		handlePaths(w, http.StatusNotFound)
   187  	})
   188  
   189  }
   190  
   191  // GET /
   192  // GET /apis/tap.linkerd.io/v1alpha1/watch/namespaces/:namespace
   193  // GET /apis/tap.linkerd.io/v1alpha1/watch/namespaces/:namespace/:resource/:name
   194  func handleRoot(w http.ResponseWriter, _ *http.Request, _ httprouter.Params) {
   195  	handlePaths(w, http.StatusOK)
   196  }
   197  
   198  // GET /
   199  // GET (not found)
   200  func handlePaths(w http.ResponseWriter, status int) {
   201  	paths := map[string][]string{
   202  		"paths": {
   203  			"/apis",
   204  			"/apis/" + gvk.Group,
   205  			"/apis/" + gvk.GroupVersion().String(),
   206  			"/healthz",
   207  			"/healthz/log",
   208  			"/healthz/ping",
   209  			"/metrics",
   210  			"/openapi/v2",
   211  			"/version",
   212  		},
   213  	}
   214  
   215  	renderJSON(w, paths, status)
   216  }
   217  
   218  // GET /apis
   219  func handleAPIs(w http.ResponseWriter, _ *http.Request, _ httprouter.Params) {
   220  	groupList := metav1.APIGroupList{
   221  		TypeMeta: metav1.TypeMeta{
   222  			Kind: "APIGroupList",
   223  		},
   224  		Groups: []metav1.APIGroup{
   225  			apiGroup,
   226  		},
   227  	}
   228  
   229  	renderJSON(w, groupList, http.StatusOK)
   230  }
   231  
   232  // GET /apis/tap.linkerd.io
   233  func handleAPIGroup(w http.ResponseWriter, _ *http.Request, _ httprouter.Params) {
   234  	groupWithType := apiGroup
   235  	groupWithType.TypeMeta = metav1.TypeMeta{
   236  		Kind:       "APIGroup",
   237  		APIVersion: "v1",
   238  	}
   239  
   240  	renderJSON(w, groupWithType, http.StatusOK)
   241  }
   242  
   243  // GET /apis/tap.linkerd.io/v1alpha1
   244  // this is required for `kubectl api-resources` to work
   245  func handleAPIResourceList(w http.ResponseWriter, _ *http.Request, _ httprouter.Params) {
   246  	resList := metav1.APIResourceList{
   247  		TypeMeta: metav1.TypeMeta{
   248  			Kind:       "APIResourceList",
   249  			APIVersion: "v1",
   250  		},
   251  		GroupVersion: gvk.GroupVersion().String(),
   252  		APIResources: []metav1.APIResource{},
   253  	}
   254  
   255  	for _, res := range resources {
   256  		resList.APIResources = append(resList.APIResources,
   257  			metav1.APIResource{
   258  				Name:       res.name,
   259  				Namespaced: res.namespaced,
   260  				Kind:       gvk.Kind,
   261  				Verbs:      metav1.Verbs{"watch"},
   262  			})
   263  		resList.APIResources = append(resList.APIResources,
   264  			metav1.APIResource{
   265  				Name:       fmt.Sprintf("%s/tap", res.name),
   266  				Namespaced: res.namespaced,
   267  				Kind:       gvk.Kind,
   268  				Verbs:      metav1.Verbs{"watch"},
   269  			})
   270  	}
   271  
   272  	renderJSON(w, resList, http.StatusOK)
   273  }
   274  
   275  // GET /healthz
   276  // GET /healthz/logs
   277  // GET /healthz/ping
   278  func handleHealthz(w http.ResponseWriter, _ *http.Request, _ httprouter.Params) {
   279  	w.Header().Set("Content-Type", "text/plain; charset=utf-8")
   280  	w.Write([]byte("ok"))
   281  }
   282  
   283  // GET /metrics
   284  func handleMetrics(w http.ResponseWriter, req *http.Request, _ httprouter.Params) {
   285  	promhttp.Handler().ServeHTTP(w, req)
   286  }
   287  
   288  // GET /openapi/v2
   289  func handleOpenAPI(w http.ResponseWriter, _ *http.Request, _ httprouter.Params) {
   290  	swagger := spec.Swagger{
   291  		SwaggerProps: spec.SwaggerProps{
   292  			Swagger: "2.0",
   293  			Info: &spec.Info{
   294  				InfoProps: spec.InfoProps{
   295  					Title:   "Api",
   296  					Version: "v0",
   297  				},
   298  			},
   299  			Paths: &spec.Paths{
   300  				Paths: map[string]spec.PathItem{
   301  					"/":                                    mkPathItem("get all paths"),
   302  					"/apis":                                mkPathItem("get available API versions"),
   303  					"/apis/" + gvk.Group:                   mkPathItem("get information of a group"),
   304  					"/apis/" + gvk.GroupVersion().String(): mkPathItem("get available resources"),
   305  				},
   306  			},
   307  		},
   308  	}
   309  
   310  	renderJSON(w, swagger, http.StatusOK)
   311  }
   312  
   313  func mkPathItem(desc string) spec.PathItem {
   314  	return spec.PathItem{
   315  		PathItemProps: spec.PathItemProps{
   316  			Get: &spec.Operation{
   317  				OperationProps: spec.OperationProps{
   318  					Description: desc,
   319  					Consumes:    []string{"application/json"},
   320  					Produces:    []string{"application/json"},
   321  					Responses: &spec.Responses{
   322  						ResponsesProps: spec.ResponsesProps{
   323  							StatusCodeResponses: map[int]spec.Response{
   324  								200: spec.Response{
   325  									Refable: spec.Refable{Ref: spec.MustCreateRef("n/a")},
   326  									ResponseProps: spec.ResponseProps{
   327  										Description: "OK response",
   328  									},
   329  								},
   330  							},
   331  						},
   332  					},
   333  					ID: "tapResourceV0",
   334  				},
   335  			},
   336  		},
   337  	}
   338  }
   339  
   340  // GET /version
   341  func handleVersion(w http.ResponseWriter, _ *http.Request, _ httprouter.Params) {
   342  	renderJSON(w, version.Info{}, http.StatusOK)
   343  }
   344  
   345  func renderJSON(w http.ResponseWriter, obj interface{}, status int) {
   346  	bytes, err := json.MarshalIndent(obj, "", "  ")
   347  	if err != nil {
   348  		renderJSONError(w, err, http.StatusInternalServerError)
   349  		return
   350  	}
   351  
   352  	w.Header().Set("Content-Type", "application/json")
   353  	w.WriteHeader(status)
   354  	w.Write(bytes)
   355  }
   356  
   357  // TODO: share with api_handlers.go
   358  func renderJSONError(w http.ResponseWriter, err error, status int) {
   359  	w.Header().Set("Content-Type", "application/json")
   360  	rsp, _ := json.Marshal(jsonError{Error: err.Error()})
   361  	w.WriteHeader(status)
   362  	w.Write(rsp)
   363  }
   364  
// serverStream provides functionality that satisfies the
// tap.Tap_TapByResourceServer interface. This allows the tap APIServer to call
// GRPCTapServer.TapByResource() directly, rather than making the request to an
// actual gRPC server over the network.
//
// Fields:
//   - w: flushable response writer that Send writes each tap event to.
//   - req: the originating HTTP request; its context is returned by Context.
//   - log: logger used by Send to report write failures.
//
// TODO: Share this code with streamServer and destinationServer in
// http_server.go.
type serverStream struct {
	w   protohttp.FlushableResponseWriter
	req *http.Request
	log *logrus.Entry
}
   377  
   378  // Satisfy the grpc.ServerStream interface
   379  func (s serverStream) SetHeader(metadata.MD) error  { return nil }
   380  func (s serverStream) SendHeader(metadata.MD) error { return nil }
   381  func (s serverStream) SetTrailer(metadata.MD)       {}
   382  func (s serverStream) Context() context.Context     { return s.req.Context() }
   383  func (s serverStream) SendMsg(interface{}) error    { return nil }
   384  func (s serverStream) RecvMsg(interface{}) error    { return nil }
   385  
   386  // Satisfy the tap.Tap_TapByResourceServer interface
   387  func (s *serverStream) Send(m *pb.TapEvent) error {
   388  	err := protohttp.WriteProtoToHTTPResponse(s.w, m)
   389  	if err != nil {
   390  		s.log.Errorf("Error writing proto to HTTP Response: %s", err)
   391  		protohttp.WriteErrorToHTTPResponse(s.w, err)
   392  		return err
   393  	}
   394  
   395  	s.w.Flush()
   396  	return nil
   397  }
   398  

View as plain text