...

Source file src/github.com/linkerd/linkerd2/pkg/protohttp/protohttp.go

Documentation: github.com/linkerd/linkerd2/pkg/protohttp

     1  package protohttp
     2  
     3  import (
     4  	"bufio"
     5  	"encoding/binary"
     6  	"errors"
     7  	"fmt"
     8  	"io"
     9  	"net/http"
    10  
    11  	"github.com/linkerd/linkerd2/pkg/k8s"
    12  	"github.com/linkerd/linkerd2/pkg/util"
    13  	metricsPb "github.com/linkerd/linkerd2/viz/metrics-api/gen/viz"
    14  	log "github.com/sirupsen/logrus"
    15  	"google.golang.org/grpc/status"
    16  	"google.golang.org/protobuf/proto"
    17  	kerrors "k8s.io/apimachinery/pkg/api/errors"
    18  )
    19  
    20  const (
    21  	errorHeader                = "linkerd-error"
    22  	defaultHTTPErrorStatusCode = http.StatusInternalServerError
    23  	contentTypeHeader          = "Content-Type"
    24  	protobufContentType        = "application/octet-stream"
    25  	numBytesForMessageLength   = 4
    26  )
    27  
    28  // HTTPError is an error which indicates the HTTP response contained an error
    29  type HTTPError struct {
    30  	Code         int
    31  	WrappedError error
    32  }
    33  
    34  // FlushableResponseWriter wraps a ResponseWriter for use in streaming
    35  // responses
    36  type FlushableResponseWriter interface {
    37  	http.ResponseWriter
    38  	http.Flusher
    39  }
    40  
    41  // Error satisfies the error interface for HTTPError.
    42  func (e HTTPError) Error() string {
    43  	return fmt.Sprintf("HTTP error, status Code [%d] (%v)", e.Code, e.WrappedError)
    44  }
    45  
    46  // HTTPRequestToProto converts an HTTP Request to a protobuf request.
    47  func HTTPRequestToProto(req *http.Request, protoRequestOut proto.Message) error {
    48  	bytes, err := util.ReadAllLimit(req.Body, 100*util.MB)
    49  	if err != nil {
    50  		return HTTPError{
    51  			Code:         http.StatusBadRequest,
    52  			WrappedError: err,
    53  		}
    54  	}
    55  
    56  	err = proto.Unmarshal(bytes, protoRequestOut)
    57  	if err != nil {
    58  		return HTTPError{
    59  			Code:         http.StatusBadRequest,
    60  			WrappedError: err,
    61  		}
    62  	}
    63  
    64  	return nil
    65  }
    66  
    67  // WriteErrorToHTTPResponse writes a protobuf-encoded error to an HTTP Response.
    68  func WriteErrorToHTTPResponse(w http.ResponseWriter, errorObtained error) {
    69  	statusCode := defaultHTTPErrorStatusCode
    70  	errorToReturn := errorObtained
    71  
    72  	var he HTTPError
    73  	if errors.As(errorObtained, &he) {
    74  		statusCode = he.Code
    75  		errorToReturn = he.WrappedError
    76  	}
    77  
    78  	w.Header().Set(errorHeader, http.StatusText(statusCode))
    79  
    80  	errorMessageToReturn := errorToReturn.Error()
    81  	if grpcError, ok := status.FromError(errorObtained); ok {
    82  		errorMessageToReturn = grpcError.Message()
    83  	}
    84  
    85  	errorAsProto := &metricsPb.ApiError{Error: errorMessageToReturn}
    86  
    87  	err := WriteProtoToHTTPResponse(w, errorAsProto)
    88  	if err != nil {
    89  		log.Errorf("Error writing error to http response: %v", err)
    90  		w.Header().Set(errorHeader, err.Error())
    91  	}
    92  }
    93  
    94  // WriteProtoToHTTPResponse writes a protobuf-encoded message to an HTTP
    95  // Response.
    96  func WriteProtoToHTTPResponse(w http.ResponseWriter, msg proto.Message) error {
    97  	w.Header().Set(contentTypeHeader, protobufContentType)
    98  	marshalledProtobufMessage, err := proto.Marshal(msg)
    99  	if err != nil {
   100  		return err
   101  	}
   102  
   103  	fullPayload := SerializeAsPayload(marshalledProtobufMessage)
   104  	_, err = w.Write(fullPayload)
   105  	return err
   106  }
   107  
   108  // NewStreamingWriter takes a ResponseWriter and returns it wrapped in a
   109  // FlushableResponseWriter.
   110  func NewStreamingWriter(w http.ResponseWriter) (FlushableResponseWriter, error) {
   111  	flushableWriter, ok := w.(FlushableResponseWriter)
   112  	if !ok {
   113  		return nil, fmt.Errorf("streaming not supported by this writer")
   114  	}
   115  
   116  	flushableWriter.Header().Set("Connection", "keep-alive")
   117  	flushableWriter.Header().Set("Transfer-Encoding", "chunked")
   118  	return flushableWriter, nil
   119  }
   120  
   121  // SerializeAsPayload appends a 4-byte length in front of a byte slice.
   122  func SerializeAsPayload(messageContentsInBytes []byte) []byte {
   123  	lengthOfThePayload := uint32(len(messageContentsInBytes))
   124  
   125  	messageLengthInBytes := make([]byte, numBytesForMessageLength)
   126  	binary.LittleEndian.PutUint32(messageLengthInBytes, lengthOfThePayload)
   127  
   128  	return append(messageLengthInBytes, messageContentsInBytes...)
   129  }
   130  
   131  func deserializePayloadFromReader(reader *bufio.Reader) ([]byte, error) {
   132  	messageLengthAsBytes := make([]byte, numBytesForMessageLength)
   133  	_, err := io.ReadFull(reader, messageLengthAsBytes)
   134  	if err != nil {
   135  		return nil, fmt.Errorf("error while reading message length: %w", err)
   136  	}
   137  	messageLength := int(binary.LittleEndian.Uint32(messageLengthAsBytes))
   138  
   139  	messageContentsAsBytes := make([]byte, messageLength)
   140  	_, err = io.ReadFull(reader, messageContentsAsBytes)
   141  	if err != nil {
   142  		return nil, fmt.Errorf("error while reading bytes from message: %w", err)
   143  	}
   144  
   145  	return messageContentsAsBytes, nil
   146  }
   147  
   148  // CheckIfResponseHasError checks an HTTP Response for errors and returns error
   149  // information with the following precedence:
   150  // 1. "linkerd-error" header, with protobuf-encoded apiError
   151  // 2. non-200 Status Code, with Kubernetes StatusError
   152  // 3. non-200 Status Code
   153  func CheckIfResponseHasError(rsp *http.Response) error {
   154  	// check for protobuf-encoded error
   155  	errorMsg := rsp.Header.Get(errorHeader)
   156  	if errorMsg != "" {
   157  		reader := bufio.NewReader(rsp.Body)
   158  		var apiError metricsPb.ApiError
   159  
   160  		err := FromByteStreamToProtocolBuffers(reader, &apiError)
   161  		if err != nil {
   162  			return fmt.Errorf("response has %s header [%s], but response body didn't contain protobuf error: %w", errorHeader, errorMsg, err)
   163  		}
   164  
   165  		return errors.New(apiError.Error)
   166  	}
   167  
   168  	// check for JSON-encoded error
   169  	if rsp.StatusCode != http.StatusOK {
   170  		if rsp.Body != nil {
   171  			bytes, err := util.ReadAllLimit(rsp.Body, 100*util.MB)
   172  			if err == nil && len(bytes) > 0 {
   173  				body := string(bytes)
   174  				obj, err := k8s.ToRuntimeObject(body)
   175  				if err == nil {
   176  					return HTTPError{Code: rsp.StatusCode, WrappedError: kerrors.FromObject(obj)}
   177  				}
   178  
   179  				body = fmt.Sprintf("unexpected API response: %s", body)
   180  				return HTTPError{Code: rsp.StatusCode, WrappedError: errors.New(body)}
   181  			}
   182  		}
   183  
   184  		return HTTPError{Code: rsp.StatusCode, WrappedError: errors.New("unexpected API response")}
   185  	}
   186  
   187  	return nil
   188  }
   189  
   190  // FromByteStreamToProtocolBuffers converts a byte stream to a protobuf message.
   191  func FromByteStreamToProtocolBuffers(byteStreamContainingMessage *bufio.Reader, out proto.Message) error {
   192  	messageAsBytes, err := deserializePayloadFromReader(byteStreamContainingMessage)
   193  	if err != nil {
   194  		return fmt.Errorf("error reading byte stream header: %w", err)
   195  	}
   196  
   197  	err = proto.Unmarshal(messageAsBytes, out)
   198  	if err != nil {
   199  		return fmt.Errorf("error unmarshalling array of [%d] bytes error: %w", len(messageAsBytes), err)
   200  	}
   201  
   202  	return nil
   203  }
   204  

View as plain text