...
1 package pkg
2
3 import (
4 "bufio"
5 "bytes"
6 "context"
7 "fmt"
8 "io"
9 "net/http"
10 "net/url"
11
12 "github.com/linkerd/linkerd2/pkg/k8s"
13 "github.com/linkerd/linkerd2/pkg/protohttp"
14 pb "github.com/linkerd/linkerd2/viz/tap/gen/tap"
15 log "github.com/sirupsen/logrus"
16 "google.golang.org/protobuf/proto"
17 )
18
// ErrClosedResponseBody is the error text "http2: response body closed".
// NOTE(review): presumably compared against the message of errors seen while
// reading a tap response whose body was already closed; confirm against
// callers.
const ErrClosedResponseBody = "http2: response body closed"
23
24
25
const (
	// TapRbacURL is the linkerd.io address of the tap RBAC page.
	// NOTE(review): presumably surfaced to users in authorization-related
	// messages; confirm against callers.
	TapRbacURL = "https://linkerd.io/tap-rbac"
)
27
28
29
30 func Reader(ctx context.Context, k8sAPI *k8s.KubernetesAPI, req *pb.TapByResourceRequest) (*bufio.Reader, io.ReadCloser, error) {
31 client, err := k8sAPI.NewClient()
32 if err != nil {
33 return nil, nil, err
34 }
35
36 reqBytes, err := proto.Marshal(req)
37 if err != nil {
38 return nil, nil, err
39 }
40
41 url, err := url.Parse(k8sAPI.Host)
42 if err != nil {
43 return nil, nil, err
44 }
45 url.Path = fmt.Sprintf("%s%s", url.Path, TapReqToURL(req))
46
47 httpReq, err := http.NewRequest(
48 http.MethodPost,
49 url.String(),
50 bytes.NewReader(reqBytes),
51 )
52 if err != nil {
53 return nil, nil, err
54 }
55
56 httpRsp, err := client.Do(httpReq.WithContext(ctx))
57 if err != nil {
58 log.Debugf("Error invoking [%s]: %v", url, err)
59 return nil, nil, err
60 }
61
62 log.Debugf("Response from [%s] had headers: %v", url, httpRsp.Header)
63
64 if err := protohttp.CheckIfResponseHasError(httpRsp); err != nil {
65 httpRsp.Body.Close()
66 return nil, nil, err
67 }
68
69 reader := bufio.NewReader(httpRsp.Body)
70
71 return reader, httpRsp.Body, nil
72 }
73
View as plain text