...

Source file src/k8s.io/client-go/tools/remotecommand/spdy.go

Documentation: k8s.io/client-go/tools/remotecommand

     1  /*
     2  Copyright 2023 The Kubernetes Authors.
     3  
     4  Licensed under the Apache License, Version 2.0 (the "License");
     5  you may not use this file except in compliance with the License.
     6  You may obtain a copy of the License at
     7  
     8      http://www.apache.org/licenses/LICENSE-2.0
     9  
    10  Unless required by applicable law or agreed to in writing, software
    11  distributed under the License is distributed on an "AS IS" BASIS,
    12  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    13  See the License for the specific language governing permissions and
    14  limitations under the License.
    15  */
    16  
    17  package remotecommand
    18  
    19  import (
    20  	"context"
    21  	"fmt"
    22  	"net/http"
    23  	"net/url"
    24  
    25  	"k8s.io/apimachinery/pkg/util/httpstream"
    26  	"k8s.io/apimachinery/pkg/util/remotecommand"
    27  	restclient "k8s.io/client-go/rest"
    28  	"k8s.io/client-go/transport/spdy"
    29  	"k8s.io/klog/v2"
    30  )
    31  
    32  // spdyStreamExecutor handles transporting standard shell streams over an httpstream connection.
    33  type spdyStreamExecutor struct {
    34  	upgrader  spdy.Upgrader
    35  	transport http.RoundTripper
    36  
    37  	method          string
    38  	url             *url.URL
    39  	protocols       []string
    40  	rejectRedirects bool // if true, receiving redirect from upstream is an error
    41  }
    42  
    43  // NewSPDYExecutor connects to the provided server and upgrades the connection to
    44  // multiplexed bidirectional streams.
    45  func NewSPDYExecutor(config *restclient.Config, method string, url *url.URL) (Executor, error) {
    46  	wrapper, upgradeRoundTripper, err := spdy.RoundTripperFor(config)
    47  	if err != nil {
    48  		return nil, err
    49  	}
    50  	return NewSPDYExecutorForTransports(wrapper, upgradeRoundTripper, method, url)
    51  }
    52  
    53  // NewSPDYExecutorRejectRedirects returns an Executor that will upgrade the future
    54  // connection to a SPDY bi-directional streaming connection when calling "Stream" (deprecated)
    55  // or "StreamWithContext" (preferred). Additionally, if the upstream server returns a redirect
    56  // during the attempted upgrade in these "Stream" calls, an error is returned.
    57  func NewSPDYExecutorRejectRedirects(transport http.RoundTripper, upgrader spdy.Upgrader, method string, url *url.URL) (Executor, error) {
    58  	executor, err := NewSPDYExecutorForTransports(transport, upgrader, method, url)
    59  	if err != nil {
    60  		return nil, err
    61  	}
    62  	spdyExecutor := executor.(*spdyStreamExecutor)
    63  	spdyExecutor.rejectRedirects = true
    64  	return spdyExecutor, nil
    65  }
    66  
    67  // NewSPDYExecutorForTransports connects to the provided server using the given transport,
    68  // upgrades the response using the given upgrader to multiplexed bidirectional streams.
    69  func NewSPDYExecutorForTransports(transport http.RoundTripper, upgrader spdy.Upgrader, method string, url *url.URL) (Executor, error) {
    70  	return NewSPDYExecutorForProtocols(
    71  		transport, upgrader, method, url,
    72  		remotecommand.StreamProtocolV5Name,
    73  		remotecommand.StreamProtocolV4Name,
    74  		remotecommand.StreamProtocolV3Name,
    75  		remotecommand.StreamProtocolV2Name,
    76  		remotecommand.StreamProtocolV1Name,
    77  	)
    78  }
    79  
    80  // NewSPDYExecutorForProtocols connects to the provided server and upgrades the connection to
    81  // multiplexed bidirectional streams using only the provided protocols. Exposed for testing, most
    82  // callers should use NewSPDYExecutor or NewSPDYExecutorForTransports.
    83  func NewSPDYExecutorForProtocols(transport http.RoundTripper, upgrader spdy.Upgrader, method string, url *url.URL, protocols ...string) (Executor, error) {
    84  	return &spdyStreamExecutor{
    85  		upgrader:  upgrader,
    86  		transport: transport,
    87  		method:    method,
    88  		url:       url,
    89  		protocols: protocols,
    90  	}, nil
    91  }
    92  
    93  // Stream opens a protocol streamer to the server and streams until a client closes
    94  // the connection or the server disconnects.
    95  func (e *spdyStreamExecutor) Stream(options StreamOptions) error {
    96  	return e.StreamWithContext(context.Background(), options)
    97  }
    98  
    99  // newConnectionAndStream creates a new SPDY connection and a stream protocol handler upon it.
   100  func (e *spdyStreamExecutor) newConnectionAndStream(ctx context.Context, options StreamOptions) (httpstream.Connection, streamProtocolHandler, error) {
   101  	req, err := http.NewRequestWithContext(ctx, e.method, e.url.String(), nil)
   102  	if err != nil {
   103  		return nil, nil, fmt.Errorf("error creating request: %v", err)
   104  	}
   105  
   106  	client := http.Client{Transport: e.transport}
   107  	if e.rejectRedirects {
   108  		client.CheckRedirect = func(req *http.Request, via []*http.Request) error {
   109  			return fmt.Errorf("redirect not allowed")
   110  		}
   111  	}
   112  	conn, protocol, err := spdy.Negotiate(
   113  		e.upgrader,
   114  		&client,
   115  		req,
   116  		e.protocols...,
   117  	)
   118  	if err != nil {
   119  		return nil, nil, err
   120  	}
   121  
   122  	var streamer streamProtocolHandler
   123  
   124  	switch protocol {
   125  	case remotecommand.StreamProtocolV5Name:
   126  		streamer = newStreamProtocolV5(options)
   127  	case remotecommand.StreamProtocolV4Name:
   128  		streamer = newStreamProtocolV4(options)
   129  	case remotecommand.StreamProtocolV3Name:
   130  		streamer = newStreamProtocolV3(options)
   131  	case remotecommand.StreamProtocolV2Name:
   132  		streamer = newStreamProtocolV2(options)
   133  	case "":
   134  		klog.V(4).Infof("The server did not negotiate a streaming protocol version. Falling back to %s", remotecommand.StreamProtocolV1Name)
   135  		fallthrough
   136  	case remotecommand.StreamProtocolV1Name:
   137  		streamer = newStreamProtocolV1(options)
   138  	}
   139  
   140  	return conn, streamer, nil
   141  }
   142  
   143  // StreamWithContext opens a protocol streamer to the server and streams until a client closes
   144  // the connection or the server disconnects or the context is done.
   145  func (e *spdyStreamExecutor) StreamWithContext(ctx context.Context, options StreamOptions) error {
   146  	conn, streamer, err := e.newConnectionAndStream(ctx, options)
   147  	if err != nil {
   148  		return err
   149  	}
   150  	defer conn.Close()
   151  
   152  	panicChan := make(chan any, 1)
   153  	errorChan := make(chan error, 1)
   154  	go func() {
   155  		defer func() {
   156  			if p := recover(); p != nil {
   157  				panicChan <- p
   158  			}
   159  		}()
   160  		errorChan <- streamer.stream(conn)
   161  	}()
   162  
   163  	select {
   164  	case p := <-panicChan:
   165  		panic(p)
   166  	case err := <-errorChan:
   167  		return err
   168  	case <-ctx.Done():
   169  		return ctx.Err()
   170  	}
   171  }
   172  

View as plain text