1 package api
2
3 import (
4 "context"
5 "encoding/json"
6 "fmt"
7 "net/http"
8 "strings"
9
10 "github.com/go-openapi/spec"
11 "github.com/julienschmidt/httprouter"
12 "github.com/linkerd/linkerd2/controller/k8s"
13 pkgK8s "github.com/linkerd/linkerd2/pkg/k8s"
14 "github.com/linkerd/linkerd2/pkg/protohttp"
15 pb "github.com/linkerd/linkerd2/viz/tap/gen/tap"
16 "github.com/linkerd/linkerd2/viz/tap/pkg"
17 "github.com/prometheus/client_golang/prometheus/promhttp"
18 "github.com/sirupsen/logrus"
19 "google.golang.org/grpc/metadata"
20 metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
21 "k8s.io/apimachinery/pkg/runtime/schema"
22 "k8s.io/apimachinery/pkg/version"
23 )
24
25 type handler struct {
26 k8sAPI *k8s.API
27 usernameHeader string
28 groupHeader string
29 grpcTapServer pb.TapServer
30 log *logrus.Entry
31 }
32
33
34 type jsonError struct {
35 Error string `json:"error"`
36 }
37
38 var (
39 gvk = &schema.GroupVersionKind{
40 Group: "tap.linkerd.io",
41 Version: "v1alpha1",
42 Kind: "Tap",
43 }
44
45 gvfd = metav1.GroupVersionForDiscovery{
46 GroupVersion: gvk.GroupVersion().String(),
47 Version: gvk.Version,
48 }
49
50 apiGroup = metav1.APIGroup{
51 Name: gvk.Group,
52 Versions: []metav1.GroupVersionForDiscovery{gvfd},
53 PreferredVersion: gvfd,
54 }
55
56 resources = []struct {
57 name string
58 namespaced bool
59 }{
60 {"namespaces", false},
61 {"pods", true},
62 {"replicationcontrollers", true},
63 {"services", true},
64 {"daemonsets", true},
65 {"deployments", true},
66 {"replicasets", true},
67 {"statefulsets", true},
68 {"jobs", true},
69 {"cronjobs", true},
70 }
71 )
72
73 func initRouter(h *handler) *httprouter.Router {
74 router := &httprouter.Router{}
75
76 router.GET("/", handleRoot)
77 router.GET("/apis", handleAPIs)
78 router.GET("/apis/"+gvk.Group, handleAPIGroup)
79 router.GET("/apis/"+gvk.GroupVersion().String(), handleAPIResourceList)
80 router.GET("/healthz", handleHealthz)
81 router.GET("/healthz/log", handleHealthz)
82 router.GET("/healthz/ping", handleHealthz)
83 router.GET("/metrics", handleMetrics)
84 router.GET("/openapi/v2", handleOpenAPI)
85 router.GET("/version", handleVersion)
86 router.NotFound = handleNotFound()
87
88 for _, res := range resources {
89 route := ""
90 if !res.namespaced {
91 route = fmt.Sprintf("/apis/%s/watch/%s/:namespace", gvk.GroupVersion().String(), res.name)
92 } else {
93 route = fmt.Sprintf("/apis/%s/watch/namespaces/:namespace/%s/:name", gvk.GroupVersion().String(), res.name)
94 }
95
96 router.GET(route, handleRoot)
97 router.POST(route+"/tap", h.handleTap)
98 }
99
100 return router
101 }
102
103
104
105 func (h *handler) handleTap(w http.ResponseWriter, req *http.Request, p httprouter.Params) {
106 namespace := p.ByName("namespace")
107 name := p.ByName("name")
108 resource := ""
109
110 path := strings.Split(req.URL.Path, "/")
111 if len(path) == 8 {
112 resource = path[5]
113 } else if len(path) == 10 {
114 resource = path[7]
115 } else {
116 err := fmt.Errorf("invalid path: %q", req.URL.Path)
117 h.log.Error(err)
118 renderJSONError(w, err, http.StatusBadRequest)
119 return
120 }
121
122 h.log.Debugf("SubjectAccessReview: namespace: %q, resource: %q, name: %q, user: %q, group: %q",
123 namespace, resource, name, h.usernameHeader, h.groupHeader,
124 )
125
126
127
128 err := pkgK8s.ResourceAuthzForUser(
129 req.Context(),
130 h.k8sAPI.Client,
131 namespace,
132 "watch",
133 gvk.Group,
134 gvk.Version,
135 resource,
136 "tap",
137 name,
138 req.Header.Get(h.usernameHeader),
139 req.Header.Values(h.groupHeader),
140 )
141 if err != nil {
142 err = fmt.Errorf("tap authorization failed (%w), visit %s for more information", err, pkg.TapRbacURL)
143 h.log.Error(err)
144 renderJSONError(w, err, http.StatusForbidden)
145 return
146 }
147
148 tapReq := pb.TapByResourceRequest{}
149 err = protohttp.HTTPRequestToProto(req, &tapReq)
150 if err != nil {
151 err = fmt.Errorf("Error decoding Tap Request proto: %w", err)
152 h.log.Error(err)
153 protohttp.WriteErrorToHTTPResponse(w, err)
154 return
155 }
156
157 url := pkg.TapReqToURL(&tapReq)
158 if url != req.URL.Path {
159 err = fmt.Errorf("tap request body did not match APIServer URL: %q != %q", url, req.URL.Path)
160 h.log.Error(err)
161 protohttp.WriteErrorToHTTPResponse(w, err)
162 return
163 }
164
165 flushableWriter, err := protohttp.NewStreamingWriter(w)
166 if err != nil {
167 h.log.Error(err)
168 protohttp.WriteErrorToHTTPResponse(w, err)
169 return
170 }
171
172 serverStream := serverStream{w: flushableWriter, req: req, log: h.log}
173
174
175 err = h.grpcTapServer.TapByResource(&tapReq, &serverStream)
176 if err != nil {
177 h.log.Errorf("TapByResource failed: %q", err)
178 protohttp.WriteErrorToHTTPResponse(flushableWriter, err)
179 return
180 }
181 }
182
183
184 func handleNotFound() http.Handler {
185 return http.HandlerFunc(func(w http.ResponseWriter, _ *http.Request) {
186 handlePaths(w, http.StatusNotFound)
187 })
188
189 }
190
191
192
193
194 func handleRoot(w http.ResponseWriter, _ *http.Request, _ httprouter.Params) {
195 handlePaths(w, http.StatusOK)
196 }
197
198
199
200 func handlePaths(w http.ResponseWriter, status int) {
201 paths := map[string][]string{
202 "paths": {
203 "/apis",
204 "/apis/" + gvk.Group,
205 "/apis/" + gvk.GroupVersion().String(),
206 "/healthz",
207 "/healthz/log",
208 "/healthz/ping",
209 "/metrics",
210 "/openapi/v2",
211 "/version",
212 },
213 }
214
215 renderJSON(w, paths, status)
216 }
217
218
219 func handleAPIs(w http.ResponseWriter, _ *http.Request, _ httprouter.Params) {
220 groupList := metav1.APIGroupList{
221 TypeMeta: metav1.TypeMeta{
222 Kind: "APIGroupList",
223 },
224 Groups: []metav1.APIGroup{
225 apiGroup,
226 },
227 }
228
229 renderJSON(w, groupList, http.StatusOK)
230 }
231
232
233 func handleAPIGroup(w http.ResponseWriter, _ *http.Request, _ httprouter.Params) {
234 groupWithType := apiGroup
235 groupWithType.TypeMeta = metav1.TypeMeta{
236 Kind: "APIGroup",
237 APIVersion: "v1",
238 }
239
240 renderJSON(w, groupWithType, http.StatusOK)
241 }
242
243
244
245 func handleAPIResourceList(w http.ResponseWriter, _ *http.Request, _ httprouter.Params) {
246 resList := metav1.APIResourceList{
247 TypeMeta: metav1.TypeMeta{
248 Kind: "APIResourceList",
249 APIVersion: "v1",
250 },
251 GroupVersion: gvk.GroupVersion().String(),
252 APIResources: []metav1.APIResource{},
253 }
254
255 for _, res := range resources {
256 resList.APIResources = append(resList.APIResources,
257 metav1.APIResource{
258 Name: res.name,
259 Namespaced: res.namespaced,
260 Kind: gvk.Kind,
261 Verbs: metav1.Verbs{"watch"},
262 })
263 resList.APIResources = append(resList.APIResources,
264 metav1.APIResource{
265 Name: fmt.Sprintf("%s/tap", res.name),
266 Namespaced: res.namespaced,
267 Kind: gvk.Kind,
268 Verbs: metav1.Verbs{"watch"},
269 })
270 }
271
272 renderJSON(w, resList, http.StatusOK)
273 }
274
275
276
277
278 func handleHealthz(w http.ResponseWriter, _ *http.Request, _ httprouter.Params) {
279 w.Header().Set("Content-Type", "text/plain; charset=utf-8")
280 w.Write([]byte("ok"))
281 }
282
283
284 func handleMetrics(w http.ResponseWriter, req *http.Request, _ httprouter.Params) {
285 promhttp.Handler().ServeHTTP(w, req)
286 }
287
288
289 func handleOpenAPI(w http.ResponseWriter, _ *http.Request, _ httprouter.Params) {
290 swagger := spec.Swagger{
291 SwaggerProps: spec.SwaggerProps{
292 Swagger: "2.0",
293 Info: &spec.Info{
294 InfoProps: spec.InfoProps{
295 Title: "Api",
296 Version: "v0",
297 },
298 },
299 Paths: &spec.Paths{
300 Paths: map[string]spec.PathItem{
301 "/": mkPathItem("get all paths"),
302 "/apis": mkPathItem("get available API versions"),
303 "/apis/" + gvk.Group: mkPathItem("get information of a group"),
304 "/apis/" + gvk.GroupVersion().String(): mkPathItem("get available resources"),
305 },
306 },
307 },
308 }
309
310 renderJSON(w, swagger, http.StatusOK)
311 }
312
313 func mkPathItem(desc string) spec.PathItem {
314 return spec.PathItem{
315 PathItemProps: spec.PathItemProps{
316 Get: &spec.Operation{
317 OperationProps: spec.OperationProps{
318 Description: desc,
319 Consumes: []string{"application/json"},
320 Produces: []string{"application/json"},
321 Responses: &spec.Responses{
322 ResponsesProps: spec.ResponsesProps{
323 StatusCodeResponses: map[int]spec.Response{
324 200: spec.Response{
325 Refable: spec.Refable{Ref: spec.MustCreateRef("n/a")},
326 ResponseProps: spec.ResponseProps{
327 Description: "OK response",
328 },
329 },
330 },
331 },
332 },
333 ID: "tapResourceV0",
334 },
335 },
336 },
337 }
338 }
339
340
341 func handleVersion(w http.ResponseWriter, _ *http.Request, _ httprouter.Params) {
342 renderJSON(w, version.Info{}, http.StatusOK)
343 }
344
345 func renderJSON(w http.ResponseWriter, obj interface{}, status int) {
346 bytes, err := json.MarshalIndent(obj, "", " ")
347 if err != nil {
348 renderJSONError(w, err, http.StatusInternalServerError)
349 return
350 }
351
352 w.Header().Set("Content-Type", "application/json")
353 w.WriteHeader(status)
354 w.Write(bytes)
355 }
356
357
358 func renderJSONError(w http.ResponseWriter, err error, status int) {
359 w.Header().Set("Content-Type", "application/json")
360 rsp, _ := json.Marshal(jsonError{Error: err.Error()})
361 w.WriteHeader(status)
362 w.Write(rsp)
363 }
364
365
366
367
368
369
370
371
372 type serverStream struct {
373 w protohttp.FlushableResponseWriter
374 req *http.Request
375 log *logrus.Entry
376 }
377
378
379 func (s serverStream) SetHeader(metadata.MD) error { return nil }
380 func (s serverStream) SendHeader(metadata.MD) error { return nil }
381 func (s serverStream) SetTrailer(metadata.MD) {}
382 func (s serverStream) Context() context.Context { return s.req.Context() }
383 func (s serverStream) SendMsg(interface{}) error { return nil }
384 func (s serverStream) RecvMsg(interface{}) error { return nil }
385
386
387 func (s *serverStream) Send(m *pb.TapEvent) error {
388 err := protohttp.WriteProtoToHTTPResponse(s.w, m)
389 if err != nil {
390 s.log.Errorf("Error writing proto to HTTP Response: %s", err)
391 protohttp.WriteErrorToHTTPResponse(s.w, err)
392 return err
393 }
394
395 s.w.Flush()
396 return nil
397 }
398
View as plain text