...

Source file src/k8s.io/client-go/tools/remotecommand/v1.go

Documentation: k8s.io/client-go/tools/remotecommand

     1  /*
     2  Copyright 2015 The Kubernetes Authors.
     3  
     4  Licensed under the Apache License, Version 2.0 (the "License");
     5  you may not use this file except in compliance with the License.
     6  You may obtain a copy of the License at
     7  
     8      http://www.apache.org/licenses/LICENSE-2.0
     9  
    10  Unless required by applicable law or agreed to in writing, software
    11  distributed under the License is distributed on an "AS IS" BASIS,
    12  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    13  See the License for the specific language governing permissions and
    14  limitations under the License.
    15  */
    16  
    17  package remotecommand
    18  
    19  import (
    20  	"fmt"
    21  	"io"
    22  	"net/http"
    23  
    24  	"k8s.io/api/core/v1"
    25  	"k8s.io/apimachinery/pkg/util/httpstream"
    26  	"k8s.io/klog/v2"
    27  )
    28  
    29  // streamProtocolV1 implements the first version of the streaming exec & attach
    30  // protocol. This version has some bugs, such as not being able to detect when
    31  // non-interactive stdin data has ended. See https://issues.k8s.io/13394 and
    32  // https://issues.k8s.io/13395 for more details.
    33  type streamProtocolV1 struct {
    34  	StreamOptions
    35  
    36  	errorStream  httpstream.Stream
    37  	remoteStdin  httpstream.Stream
    38  	remoteStdout httpstream.Stream
    39  	remoteStderr httpstream.Stream
    40  }
    41  
    42  var _ streamProtocolHandler = &streamProtocolV1{}
    43  
    44  func newStreamProtocolV1(options StreamOptions) streamProtocolHandler {
    45  	return &streamProtocolV1{
    46  		StreamOptions: options,
    47  	}
    48  }
    49  
    50  func (p *streamProtocolV1) stream(conn streamCreator) error {
    51  	doneChan := make(chan struct{}, 2)
    52  	errorChan := make(chan error)
    53  
    54  	cp := func(s string, dst io.Writer, src io.Reader) {
    55  		klog.V(6).Infof("Copying %s", s)
    56  		defer klog.V(6).Infof("Done copying %s", s)
    57  		if _, err := io.Copy(dst, src); err != nil && err != io.EOF {
    58  			klog.Errorf("Error copying %s: %v", s, err)
    59  		}
    60  		if s == v1.StreamTypeStdout || s == v1.StreamTypeStderr {
    61  			doneChan <- struct{}{}
    62  		}
    63  	}
    64  
    65  	// set up all the streams first
    66  	var err error
    67  	headers := http.Header{}
    68  	headers.Set(v1.StreamType, v1.StreamTypeError)
    69  	p.errorStream, err = conn.CreateStream(headers)
    70  	if err != nil {
    71  		return err
    72  	}
    73  	defer p.errorStream.Reset()
    74  
    75  	// Create all the streams first, then start the copy goroutines. The server doesn't start its copy
    76  	// goroutines until it's received all of the streams. If the client creates the stdin stream and
    77  	// immediately begins copying stdin data to the server, it's possible to overwhelm and wedge the
    78  	// spdy frame handler in the server so that it is full of unprocessed frames. The frames aren't
    79  	// getting processed because the server hasn't started its copying, and it won't do that until it
    80  	// gets all the streams. By creating all the streams first, we ensure that the server is ready to
    81  	// process data before the client starts sending any. See https://issues.k8s.io/16373 for more info.
    82  	if p.Stdin != nil {
    83  		headers.Set(v1.StreamType, v1.StreamTypeStdin)
    84  		p.remoteStdin, err = conn.CreateStream(headers)
    85  		if err != nil {
    86  			return err
    87  		}
    88  		defer p.remoteStdin.Reset()
    89  	}
    90  
    91  	if p.Stdout != nil {
    92  		headers.Set(v1.StreamType, v1.StreamTypeStdout)
    93  		p.remoteStdout, err = conn.CreateStream(headers)
    94  		if err != nil {
    95  			return err
    96  		}
    97  		defer p.remoteStdout.Reset()
    98  	}
    99  
   100  	if p.Stderr != nil && !p.Tty {
   101  		headers.Set(v1.StreamType, v1.StreamTypeStderr)
   102  		p.remoteStderr, err = conn.CreateStream(headers)
   103  		if err != nil {
   104  			return err
   105  		}
   106  		defer p.remoteStderr.Reset()
   107  	}
   108  
   109  	// now that all the streams have been created, proceed with reading & copying
   110  
   111  	// always read from errorStream
   112  	go func() {
   113  		message, err := io.ReadAll(p.errorStream)
   114  		if err != nil && err != io.EOF {
   115  			errorChan <- fmt.Errorf("Error reading from error stream: %s", err)
   116  			return
   117  		}
   118  		if len(message) > 0 {
   119  			errorChan <- fmt.Errorf("Error executing remote command: %s", message)
   120  			return
   121  		}
   122  	}()
   123  
   124  	if p.Stdin != nil {
   125  		// TODO this goroutine will never exit cleanly (the io.Copy never unblocks)
   126  		// because stdin is not closed until the process exits. If we try to call
   127  		// stdin.Close(), it returns no error but doesn't unblock the copy. It will
   128  		// exit when the process exits, instead.
   129  		go cp(v1.StreamTypeStdin, p.remoteStdin, readerWrapper{p.Stdin})
   130  	}
   131  
   132  	waitCount := 0
   133  	completedStreams := 0
   134  
   135  	if p.Stdout != nil {
   136  		waitCount++
   137  		go cp(v1.StreamTypeStdout, p.Stdout, p.remoteStdout)
   138  	}
   139  
   140  	if p.Stderr != nil && !p.Tty {
   141  		waitCount++
   142  		go cp(v1.StreamTypeStderr, p.Stderr, p.remoteStderr)
   143  	}
   144  
   145  Loop:
   146  	for {
   147  		select {
   148  		case <-doneChan:
   149  			completedStreams++
   150  			if completedStreams == waitCount {
   151  				break Loop
   152  			}
   153  		case err := <-errorChan:
   154  			return err
   155  		}
   156  	}
   157  
   158  	return nil
   159  }
   160  

View as plain text