...

Source file src/k8s.io/apimachinery/pkg/util/httpstream/wsstream/stream.go

Documentation: k8s.io/apimachinery/pkg/util/httpstream/wsstream

     1  /*
     2  Copyright 2015 The Kubernetes Authors.
     3  
     4  Licensed under the Apache License, Version 2.0 (the "License");
     5  you may not use this file except in compliance with the License.
     6  You may obtain a copy of the License at
     7  
     8      http://www.apache.org/licenses/LICENSE-2.0
     9  
    10  Unless required by applicable law or agreed to in writing, software
    11  distributed under the License is distributed on an "AS IS" BASIS,
    12  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    13  See the License for the specific language governing permissions and
    14  limitations under the License.
    15  */
    16  
    17  package wsstream
    18  
    19  import (
    20  	"encoding/base64"
    21  	"io"
    22  	"net/http"
    23  	"sync"
    24  	"time"
    25  
    26  	"golang.org/x/net/websocket"
    27  
    28  	"k8s.io/apimachinery/pkg/util/runtime"
    29  )
    30  
    31  // The WebSocket subprotocol "binary.k8s.io" will only send messages to the
    32  // client and ignore messages sent to the server. The received messages are
    33  // the exact bytes written to the stream. Zero byte messages are possible.
    34  const binaryWebSocketProtocol = "binary.k8s.io"
    35  
    36  // The WebSocket subprotocol "base64.binary.k8s.io" will only send messages to the
    37  // client and ignore messages sent to the server. The received messages are
    38  // a base64 version of the bytes written to the stream. Zero byte messages are
    39  // possible.
    40  const base64BinaryWebSocketProtocol = "base64.binary.k8s.io"
    41  
    42  // ReaderProtocolConfig describes a websocket subprotocol with one stream.
    43  type ReaderProtocolConfig struct {
    44  	Binary bool
    45  }
    46  
    47  // NewDefaultReaderProtocols returns a stream protocol map with the
    48  // subprotocols "", "channel.k8s.io", "base64.channel.k8s.io".
    49  func NewDefaultReaderProtocols() map[string]ReaderProtocolConfig {
    50  	return map[string]ReaderProtocolConfig{
    51  		"":                            {Binary: true},
    52  		binaryWebSocketProtocol:       {Binary: true},
    53  		base64BinaryWebSocketProtocol: {Binary: false},
    54  	}
    55  }
    56  
    57  // Reader supports returning an arbitrary byte stream over a websocket channel.
    58  type Reader struct {
    59  	err              chan error
    60  	r                io.Reader
    61  	ping             bool
    62  	timeout          time.Duration
    63  	protocols        map[string]ReaderProtocolConfig
    64  	selectedProtocol string
    65  
    66  	handleCrash func(additionalHandlers ...func(interface{})) // overridable for testing
    67  }
    68  
    69  // NewReader creates a WebSocket pipe that will copy the contents of r to a provided
    70  // WebSocket connection. If ping is true, a zero length message will be sent to the client
    71  // before the stream begins reading.
    72  //
    73  // The protocols parameter maps subprotocol names to StreamProtocols. The empty string
    74  // subprotocol name is used if websocket.Config.Protocol is empty.
    75  func NewReader(r io.Reader, ping bool, protocols map[string]ReaderProtocolConfig) *Reader {
    76  	return &Reader{
    77  		r:           r,
    78  		err:         make(chan error),
    79  		ping:        ping,
    80  		protocols:   protocols,
    81  		handleCrash: runtime.HandleCrash,
    82  	}
    83  }
    84  
    85  // SetIdleTimeout sets the interval for both reads and writes before timeout. If not specified,
    86  // there is no timeout on the reader.
    87  func (r *Reader) SetIdleTimeout(duration time.Duration) {
    88  	r.timeout = duration
    89  }
    90  
    91  func (r *Reader) handshake(config *websocket.Config, req *http.Request) error {
    92  	supportedProtocols := make([]string, 0, len(r.protocols))
    93  	for p := range r.protocols {
    94  		supportedProtocols = append(supportedProtocols, p)
    95  	}
    96  	return handshake(config, req, supportedProtocols)
    97  }
    98  
    99  // Copy the reader to the response. The created WebSocket is closed after this
   100  // method completes.
   101  func (r *Reader) Copy(w http.ResponseWriter, req *http.Request) error {
   102  	go func() {
   103  		defer r.handleCrash()
   104  		websocket.Server{Handshake: r.handshake, Handler: r.handle}.ServeHTTP(w, req)
   105  	}()
   106  	return <-r.err
   107  }
   108  
   109  // handle implements a WebSocket handler.
   110  func (r *Reader) handle(ws *websocket.Conn) {
   111  	// Close the connection when the client requests it, or when we finish streaming, whichever happens first
   112  	closeConnOnce := &sync.Once{}
   113  	closeConn := func() {
   114  		closeConnOnce.Do(func() {
   115  			ws.Close()
   116  		})
   117  	}
   118  
   119  	negotiated := ws.Config().Protocol
   120  	r.selectedProtocol = negotiated[0]
   121  	defer close(r.err)
   122  	defer closeConn()
   123  
   124  	go func() {
   125  		defer runtime.HandleCrash()
   126  		// This blocks until the connection is closed.
   127  		// Client should not send anything.
   128  		IgnoreReceives(ws, r.timeout)
   129  		// Once the client closes, we should also close
   130  		closeConn()
   131  	}()
   132  
   133  	r.err <- messageCopy(ws, r.r, !r.protocols[r.selectedProtocol].Binary, r.ping, r.timeout)
   134  }
   135  
   136  func resetTimeout(ws *websocket.Conn, timeout time.Duration) {
   137  	if timeout > 0 {
   138  		ws.SetDeadline(time.Now().Add(timeout))
   139  	}
   140  }
   141  
   142  func messageCopy(ws *websocket.Conn, r io.Reader, base64Encode, ping bool, timeout time.Duration) error {
   143  	buf := make([]byte, 2048)
   144  	if ping {
   145  		resetTimeout(ws, timeout)
   146  		if base64Encode {
   147  			if err := websocket.Message.Send(ws, ""); err != nil {
   148  				return err
   149  			}
   150  		} else {
   151  			if err := websocket.Message.Send(ws, []byte{}); err != nil {
   152  				return err
   153  			}
   154  		}
   155  	}
   156  	for {
   157  		resetTimeout(ws, timeout)
   158  		n, err := r.Read(buf)
   159  		if err != nil {
   160  			if err == io.EOF {
   161  				return nil
   162  			}
   163  			return err
   164  		}
   165  		if n > 0 {
   166  			if base64Encode {
   167  				if err := websocket.Message.Send(ws, base64.StdEncoding.EncodeToString(buf[:n])); err != nil {
   168  					return err
   169  				}
   170  			} else {
   171  				if err := websocket.Message.Send(ws, buf[:n]); err != nil {
   172  					return err
   173  				}
   174  			}
   175  		}
   176  	}
   177  }
   178  

View as plain text