...

Source file src/k8s.io/apimachinery/pkg/util/httpstream/httpstream.go

Documentation: k8s.io/apimachinery/pkg/util/httpstream

     1  /*
     2  Copyright 2015 The Kubernetes Authors.
     3  
     4  Licensed under the Apache License, Version 2.0 (the "License");
     5  you may not use this file except in compliance with the License.
     6  You may obtain a copy of the License at
     7  
     8      http://www.apache.org/licenses/LICENSE-2.0
     9  
    10  Unless required by applicable law or agreed to in writing, software
    11  distributed under the License is distributed on an "AS IS" BASIS,
    12  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    13  See the License for the specific language governing permissions and
    14  limitations under the License.
    15  */
    16  
    17  package httpstream
    18  
    19  import (
    20  	"errors"
    21  	"fmt"
    22  	"io"
    23  	"net/http"
    24  	"strings"
    25  	"time"
    26  )
    27  
// HTTP header names used by this package when detecting upgrade requests and
// negotiating a stream protocol:
//   - HeaderConnection is the request header inspected by IsUpgradeRequest.
//   - HeaderUpgrade is the value IsUpgradeRequest looks for (case-insensitively)
//     inside the HeaderConnection values.
//   - HeaderProtocolVersion carries the protocols requested by the client and,
//     on the response, the protocol selected by Handshake.
//   - HeaderAcceptedProtocolVersions is added to the response by Handshake,
//     once per server protocol, when no common protocol could be negotiated.
const (
	HeaderConnection               = "Connection"
	HeaderUpgrade                  = "Upgrade"
	HeaderProtocolVersion          = "X-Stream-Protocol-Version"
	HeaderAcceptedProtocolVersions = "X-Accepted-Stream-Protocol-Versions"
)
    34  
// NewStreamHandler is the signature of the callback invoked when a new Stream
// is received. Returning nil accepts the stream; returning a non-nil error
// rejects it. replySent is closed after the reply frame has been sent.
type NewStreamHandler func(stream Stream, replySent <-chan struct{}) error
    39  
    40  // NoOpNewStreamHandler is a stream handler that accepts a new stream and
    41  // performs no other logic.
    42  func NoOpNewStreamHandler(stream Stream, replySent <-chan struct{}) error { return nil }
    43  
// Dialer knows how to open a streaming connection to a server.
type Dialer interface {

	// Dial opens a streaming connection to a server using one of the protocols
	// specified (in order of most preferred to least preferred). It returns the
	// opened Connection, a string, and an error.
	// NOTE(review): the string result is presumably the protocol that was
	// negotiated — confirm against the implementations.
	Dial(protocols ...string) (Connection, string, error)
}
    51  
// UpgradeRoundTripper is a type of http.RoundTripper that is able to upgrade
// HTTP requests to support multiplexed bidirectional streams. After RoundTrip()
// is invoked, if the upgrade is successful, clients may obtain the upgraded
// connection by passing the response to NewConnection().
type UpgradeRoundTripper interface {
	http.RoundTripper
	// NewConnection validates the response and creates a new Connection.
	// It returns an error if a Connection cannot be created from resp.
	NewConnection(resp *http.Response) (Connection, error)
}
    61  
// ResponseUpgrader knows how to upgrade HTTP requests and responses to
// add streaming support to them.
type ResponseUpgrader interface {
	// UpgradeResponse upgrades an HTTP response to one that supports multiplexed
	// streams. newStreamHandler will be called asynchronously whenever the
	// other end of the upgraded connection creates a new stream. The upgraded
	// Connection is returned.
	// NOTE(review): there is no error result; how a failed upgrade is reported
	// (for example via a nil Connection) is up to the implementation — confirm.
	UpgradeResponse(w http.ResponseWriter, req *http.Request, newStreamHandler NewStreamHandler) Connection
}
    70  
// Connection represents an upgraded HTTP connection on which multiple
// Streams can be created.
type Connection interface {
	// CreateStream creates a new Stream with the supplied headers.
	CreateStream(headers http.Header) (Stream, error)
	// Close resets all streams and closes the connection.
	Close() error
	// CloseChan returns a receive-only channel that is closed when the
	// underlying connection is closed.
	CloseChan() <-chan bool
	// SetIdleTimeout sets the amount of time the connection may remain idle before
	// it is automatically closed.
	SetIdleTimeout(timeout time.Duration)
	// RemoveStreams removes the given set of streams from the Connection.
	RemoveStreams(streams ...Stream)
}
    85  
// Stream represents a bidirectional communications channel that is part of an
// upgraded connection. It can be read from, written to, and closed like any
// io.ReadWriteCloser.
type Stream interface {
	io.ReadWriteCloser
	// Reset closes both directions of the stream, indicating that neither the
	// client nor the server can use it any more.
	Reset() error
	// Headers returns the headers used to create the stream.
	Headers() http.Header
	// Identifier returns the stream's ID.
	Identifier() uint32
}
    98  
    99  // UpgradeFailureError encapsulates the cause for why the streaming
   100  // upgrade request failed. Implements error interface.
   101  type UpgradeFailureError struct {
   102  	Cause error
   103  }
   104  
   105  func (u *UpgradeFailureError) Error() string {
   106  	return fmt.Sprintf("unable to upgrade streaming request: %s", u.Cause)
   107  }
   108  
   109  // IsUpgradeFailure returns true if the passed error is (or wrapped error contains)
   110  // the UpgradeFailureError.
   111  func IsUpgradeFailure(err error) bool {
   112  	if err == nil {
   113  		return false
   114  	}
   115  	var upgradeErr *UpgradeFailureError
   116  	return errors.As(err, &upgradeErr)
   117  }
   118  
   119  // IsUpgradeRequest returns true if the given request is a connection upgrade request
   120  func IsUpgradeRequest(req *http.Request) bool {
   121  	for _, h := range req.Header[http.CanonicalHeaderKey(HeaderConnection)] {
   122  		if strings.Contains(strings.ToLower(h), strings.ToLower(HeaderUpgrade)) {
   123  			return true
   124  		}
   125  	}
   126  	return false
   127  }
   128  
   129  func negotiateProtocol(clientProtocols, serverProtocols []string) string {
   130  	for i := range clientProtocols {
   131  		for j := range serverProtocols {
   132  			if clientProtocols[i] == serverProtocols[j] {
   133  				return clientProtocols[i]
   134  			}
   135  		}
   136  	}
   137  	return ""
   138  }
   139  
   140  func commaSeparatedHeaderValues(header []string) []string {
   141  	var parsedClientProtocols []string
   142  	for i := range header {
   143  		for _, clientProtocol := range strings.Split(header[i], ",") {
   144  			if proto := strings.Trim(clientProtocol, " "); len(proto) > 0 {
   145  				parsedClientProtocols = append(parsedClientProtocols, proto)
   146  			}
   147  		}
   148  	}
   149  	return parsedClientProtocols
   150  }
   151  
   152  // Handshake performs a subprotocol negotiation. If the client did request a
   153  // subprotocol, Handshake will select the first common value found in
   154  // serverProtocols. If a match is found, Handshake adds a response header
   155  // indicating the chosen subprotocol. If no match is found, HTTP forbidden is
   156  // returned, along with a response header containing the list of protocols the
   157  // server can accept.
   158  func Handshake(req *http.Request, w http.ResponseWriter, serverProtocols []string) (string, error) {
   159  	clientProtocols := commaSeparatedHeaderValues(req.Header[http.CanonicalHeaderKey(HeaderProtocolVersion)])
   160  	if len(clientProtocols) == 0 {
   161  		return "", fmt.Errorf("unable to upgrade: %s is required", HeaderProtocolVersion)
   162  	}
   163  
   164  	if len(serverProtocols) == 0 {
   165  		panic(fmt.Errorf("unable to upgrade: serverProtocols is required"))
   166  	}
   167  
   168  	negotiatedProtocol := negotiateProtocol(clientProtocols, serverProtocols)
   169  	if len(negotiatedProtocol) == 0 {
   170  		for i := range serverProtocols {
   171  			w.Header().Add(HeaderAcceptedProtocolVersions, serverProtocols[i])
   172  		}
   173  		err := fmt.Errorf("unable to upgrade: unable to negotiate protocol: client supports %v, server accepts %v", clientProtocols, serverProtocols)
   174  		http.Error(w, err.Error(), http.StatusForbidden)
   175  		return "", err
   176  	}
   177  
   178  	w.Header().Add(HeaderProtocolVersion, negotiatedProtocol)
   179  	return negotiatedProtocol, nil
   180  }
   181  

View as plain text