1 package protohttp
2
3 import (
4 "bufio"
5 "encoding/binary"
6 "errors"
7 "fmt"
8 "io"
9 "net/http"
10
11 "github.com/linkerd/linkerd2/pkg/k8s"
12 "github.com/linkerd/linkerd2/pkg/util"
13 metricsPb "github.com/linkerd/linkerd2/viz/metrics-api/gen/viz"
14 log "github.com/sirupsen/logrus"
15 "google.golang.org/grpc/status"
16 "google.golang.org/protobuf/proto"
17 kerrors "k8s.io/apimachinery/pkg/api/errors"
18 )
19
20 const (
21 errorHeader = "linkerd-error"
22 defaultHTTPErrorStatusCode = http.StatusInternalServerError
23 contentTypeHeader = "Content-Type"
24 protobufContentType = "application/octet-stream"
25 numBytesForMessageLength = 4
26 )
27
28
29 type HTTPError struct {
30 Code int
31 WrappedError error
32 }
33
34
35
36 type FlushableResponseWriter interface {
37 http.ResponseWriter
38 http.Flusher
39 }
40
41
42 func (e HTTPError) Error() string {
43 return fmt.Sprintf("HTTP error, status Code [%d] (%v)", e.Code, e.WrappedError)
44 }
45
46
47 func HTTPRequestToProto(req *http.Request, protoRequestOut proto.Message) error {
48 bytes, err := util.ReadAllLimit(req.Body, 100*util.MB)
49 if err != nil {
50 return HTTPError{
51 Code: http.StatusBadRequest,
52 WrappedError: err,
53 }
54 }
55
56 err = proto.Unmarshal(bytes, protoRequestOut)
57 if err != nil {
58 return HTTPError{
59 Code: http.StatusBadRequest,
60 WrappedError: err,
61 }
62 }
63
64 return nil
65 }
66
67
68 func WriteErrorToHTTPResponse(w http.ResponseWriter, errorObtained error) {
69 statusCode := defaultHTTPErrorStatusCode
70 errorToReturn := errorObtained
71
72 var he HTTPError
73 if errors.As(errorObtained, &he) {
74 statusCode = he.Code
75 errorToReturn = he.WrappedError
76 }
77
78 w.Header().Set(errorHeader, http.StatusText(statusCode))
79
80 errorMessageToReturn := errorToReturn.Error()
81 if grpcError, ok := status.FromError(errorObtained); ok {
82 errorMessageToReturn = grpcError.Message()
83 }
84
85 errorAsProto := &metricsPb.ApiError{Error: errorMessageToReturn}
86
87 err := WriteProtoToHTTPResponse(w, errorAsProto)
88 if err != nil {
89 log.Errorf("Error writing error to http response: %v", err)
90 w.Header().Set(errorHeader, err.Error())
91 }
92 }
93
94
95
96 func WriteProtoToHTTPResponse(w http.ResponseWriter, msg proto.Message) error {
97 w.Header().Set(contentTypeHeader, protobufContentType)
98 marshalledProtobufMessage, err := proto.Marshal(msg)
99 if err != nil {
100 return err
101 }
102
103 fullPayload := SerializeAsPayload(marshalledProtobufMessage)
104 _, err = w.Write(fullPayload)
105 return err
106 }
107
108
109
110 func NewStreamingWriter(w http.ResponseWriter) (FlushableResponseWriter, error) {
111 flushableWriter, ok := w.(FlushableResponseWriter)
112 if !ok {
113 return nil, fmt.Errorf("streaming not supported by this writer")
114 }
115
116 flushableWriter.Header().Set("Connection", "keep-alive")
117 flushableWriter.Header().Set("Transfer-Encoding", "chunked")
118 return flushableWriter, nil
119 }
120
121
122 func SerializeAsPayload(messageContentsInBytes []byte) []byte {
123 lengthOfThePayload := uint32(len(messageContentsInBytes))
124
125 messageLengthInBytes := make([]byte, numBytesForMessageLength)
126 binary.LittleEndian.PutUint32(messageLengthInBytes, lengthOfThePayload)
127
128 return append(messageLengthInBytes, messageContentsInBytes...)
129 }
130
131 func deserializePayloadFromReader(reader *bufio.Reader) ([]byte, error) {
132 messageLengthAsBytes := make([]byte, numBytesForMessageLength)
133 _, err := io.ReadFull(reader, messageLengthAsBytes)
134 if err != nil {
135 return nil, fmt.Errorf("error while reading message length: %w", err)
136 }
137 messageLength := int(binary.LittleEndian.Uint32(messageLengthAsBytes))
138
139 messageContentsAsBytes := make([]byte, messageLength)
140 _, err = io.ReadFull(reader, messageContentsAsBytes)
141 if err != nil {
142 return nil, fmt.Errorf("error while reading bytes from message: %w", err)
143 }
144
145 return messageContentsAsBytes, nil
146 }
147
148
149
150
151
152
153 func CheckIfResponseHasError(rsp *http.Response) error {
154
155 errorMsg := rsp.Header.Get(errorHeader)
156 if errorMsg != "" {
157 reader := bufio.NewReader(rsp.Body)
158 var apiError metricsPb.ApiError
159
160 err := FromByteStreamToProtocolBuffers(reader, &apiError)
161 if err != nil {
162 return fmt.Errorf("response has %s header [%s], but response body didn't contain protobuf error: %w", errorHeader, errorMsg, err)
163 }
164
165 return errors.New(apiError.Error)
166 }
167
168
169 if rsp.StatusCode != http.StatusOK {
170 if rsp.Body != nil {
171 bytes, err := util.ReadAllLimit(rsp.Body, 100*util.MB)
172 if err == nil && len(bytes) > 0 {
173 body := string(bytes)
174 obj, err := k8s.ToRuntimeObject(body)
175 if err == nil {
176 return HTTPError{Code: rsp.StatusCode, WrappedError: kerrors.FromObject(obj)}
177 }
178
179 body = fmt.Sprintf("unexpected API response: %s", body)
180 return HTTPError{Code: rsp.StatusCode, WrappedError: errors.New(body)}
181 }
182 }
183
184 return HTTPError{Code: rsp.StatusCode, WrappedError: errors.New("unexpected API response")}
185 }
186
187 return nil
188 }
189
190
191 func FromByteStreamToProtocolBuffers(byteStreamContainingMessage *bufio.Reader, out proto.Message) error {
192 messageAsBytes, err := deserializePayloadFromReader(byteStreamContainingMessage)
193 if err != nil {
194 return fmt.Errorf("error reading byte stream header: %w", err)
195 }
196
197 err = proto.Unmarshal(messageAsBytes, out)
198 if err != nil {
199 return fmt.Errorf("error unmarshalling array of [%d] bytes error: %w", len(messageAsBytes), err)
200 }
201
202 return nil
203 }
204
View as plain text