...

Source file src/k8s.io/client-go/tools/remotecommand/v2.go

Documentation: k8s.io/client-go/tools/remotecommand

     1  /*
     2  Copyright 2015 The Kubernetes Authors.
     3  
     4  Licensed under the Apache License, Version 2.0 (the "License");
     5  you may not use this file except in compliance with the License.
     6  You may obtain a copy of the License at
     7  
     8      http://www.apache.org/licenses/LICENSE-2.0
     9  
    10  Unless required by applicable law or agreed to in writing, software
    11  distributed under the License is distributed on an "AS IS" BASIS,
    12  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    13  See the License for the specific language governing permissions and
    14  limitations under the License.
    15  */
    16  
    17  package remotecommand
    18  
    19  import (
    20  	"fmt"
    21  	"io"
    22  	"net/http"
    23  	"sync"
    24  
    25  	"k8s.io/api/core/v1"
    26  	"k8s.io/apimachinery/pkg/util/runtime"
    27  )
    28  
    29  // streamProtocolV2 implements version 2 of the streaming protocol for attach
    30  // and exec. The original streaming protocol was metav1. As a result, this
    31  // version is referred to as version 2, even though it is the first actual
    32  // numbered version.
    33  type streamProtocolV2 struct {
    34  	StreamOptions
    35  
    36  	errorStream  io.Reader
    37  	remoteStdin  io.ReadWriteCloser
    38  	remoteStdout io.Reader
    39  	remoteStderr io.Reader
    40  }
    41  
    42  var _ streamProtocolHandler = &streamProtocolV2{}
    43  
    44  func newStreamProtocolV2(options StreamOptions) streamProtocolHandler {
    45  	return &streamProtocolV2{
    46  		StreamOptions: options,
    47  	}
    48  }
    49  
    50  func (p *streamProtocolV2) createStreams(conn streamCreator) error {
    51  	var err error
    52  	headers := http.Header{}
    53  
    54  	// set up error stream
    55  	headers.Set(v1.StreamType, v1.StreamTypeError)
    56  	p.errorStream, err = conn.CreateStream(headers)
    57  	if err != nil {
    58  		return err
    59  	}
    60  
    61  	// set up stdin stream
    62  	if p.Stdin != nil {
    63  		headers.Set(v1.StreamType, v1.StreamTypeStdin)
    64  		p.remoteStdin, err = conn.CreateStream(headers)
    65  		if err != nil {
    66  			return err
    67  		}
    68  	}
    69  
    70  	// set up stdout stream
    71  	if p.Stdout != nil {
    72  		headers.Set(v1.StreamType, v1.StreamTypeStdout)
    73  		p.remoteStdout, err = conn.CreateStream(headers)
    74  		if err != nil {
    75  			return err
    76  		}
    77  	}
    78  
    79  	// set up stderr stream
    80  	if p.Stderr != nil && !p.Tty {
    81  		headers.Set(v1.StreamType, v1.StreamTypeStderr)
    82  		p.remoteStderr, err = conn.CreateStream(headers)
    83  		if err != nil {
    84  			return err
    85  		}
    86  	}
    87  	return nil
    88  }
    89  
    90  func (p *streamProtocolV2) copyStdin() {
    91  	if p.Stdin != nil {
    92  		var once sync.Once
    93  
    94  		// copy from client's stdin to container's stdin
    95  		go func() {
    96  			defer runtime.HandleCrash()
    97  
    98  			// if p.stdin is noninteractive, p.g. `echo abc | kubectl exec -i <pod> -- cat`, make sure
    99  			// we close remoteStdin as soon as the copy from p.stdin to remoteStdin finishes. Otherwise
   100  			// the executed command will remain running.
   101  			defer once.Do(func() { p.remoteStdin.Close() })
   102  
   103  			if _, err := io.Copy(p.remoteStdin, readerWrapper{p.Stdin}); err != nil {
   104  				runtime.HandleError(err)
   105  			}
   106  		}()
   107  
   108  		// read from remoteStdin until the stream is closed. this is essential to
   109  		// be able to exit interactive sessions cleanly and not leak goroutines or
   110  		// hang the client's terminal.
   111  		//
   112  		// TODO we aren't using go-dockerclient any more; revisit this to determine if it's still
   113  		// required by engine-api.
   114  		//
   115  		// go-dockerclient's current hijack implementation
   116  		// (https://github.com/fsouza/go-dockerclient/blob/89f3d56d93788dfe85f864a44f85d9738fca0670/client.go#L564)
   117  		// waits for all three streams (stdin/stdout/stderr) to finish copying
   118  		// before returning. When hijack finishes copying stdout/stderr, it calls
   119  		// Close() on its side of remoteStdin, which allows this copy to complete.
   120  		// When that happens, we must Close() on our side of remoteStdin, to
   121  		// allow the copy in hijack to complete, and hijack to return.
   122  		go func() {
   123  			defer runtime.HandleCrash()
   124  			defer once.Do(func() { p.remoteStdin.Close() })
   125  
   126  			// this "copy" doesn't actually read anything - it's just here to wait for
   127  			// the server to close remoteStdin.
   128  			if _, err := io.Copy(io.Discard, p.remoteStdin); err != nil {
   129  				runtime.HandleError(err)
   130  			}
   131  		}()
   132  	}
   133  }
   134  
   135  func (p *streamProtocolV2) copyStdout(wg *sync.WaitGroup) {
   136  	if p.Stdout == nil {
   137  		return
   138  	}
   139  
   140  	wg.Add(1)
   141  	go func() {
   142  		defer runtime.HandleCrash()
   143  		defer wg.Done()
   144  		// make sure, packet in queue can be consumed.
   145  		// block in queue may lead to deadlock in conn.server
   146  		// issue: https://github.com/kubernetes/kubernetes/issues/96339
   147  		defer io.Copy(io.Discard, p.remoteStdout)
   148  
   149  		if _, err := io.Copy(p.Stdout, p.remoteStdout); err != nil {
   150  			runtime.HandleError(err)
   151  		}
   152  	}()
   153  }
   154  
   155  func (p *streamProtocolV2) copyStderr(wg *sync.WaitGroup) {
   156  	if p.Stderr == nil || p.Tty {
   157  		return
   158  	}
   159  
   160  	wg.Add(1)
   161  	go func() {
   162  		defer runtime.HandleCrash()
   163  		defer wg.Done()
   164  		defer io.Copy(io.Discard, p.remoteStderr)
   165  
   166  		if _, err := io.Copy(p.Stderr, p.remoteStderr); err != nil {
   167  			runtime.HandleError(err)
   168  		}
   169  	}()
   170  }
   171  
   172  func (p *streamProtocolV2) stream(conn streamCreator) error {
   173  	if err := p.createStreams(conn); err != nil {
   174  		return err
   175  	}
   176  
   177  	// now that all the streams have been created, proceed with reading & copying
   178  
   179  	errorChan := watchErrorStream(p.errorStream, &errorDecoderV2{})
   180  
   181  	p.copyStdin()
   182  
   183  	var wg sync.WaitGroup
   184  	p.copyStdout(&wg)
   185  	p.copyStderr(&wg)
   186  
   187  	// we're waiting for stdout/stderr to finish copying
   188  	wg.Wait()
   189  
   190  	// waits for errorStream to finish reading with an error or nil
   191  	return <-errorChan
   192  }
   193  
   194  // errorDecoderV2 interprets the error channel data as plain text.
   195  type errorDecoderV2 struct{}
   196  
   197  func (d *errorDecoderV2) decode(message []byte) error {
   198  	return fmt.Errorf("error executing remote command: %s", message)
   199  }
   200  

View as plain text