...
1
16
17 package remotecommand
18
19 import (
20 "fmt"
21 "io"
22 "net/http"
23
24 "k8s.io/api/core/v1"
25 "k8s.io/apimachinery/pkg/util/httpstream"
26 "k8s.io/klog/v2"
27 )
28
29
30
31
32
33 type streamProtocolV1 struct {
34 StreamOptions
35
36 errorStream httpstream.Stream
37 remoteStdin httpstream.Stream
38 remoteStdout httpstream.Stream
39 remoteStderr httpstream.Stream
40 }
41
42 var _ streamProtocolHandler = &streamProtocolV1{}
43
44 func newStreamProtocolV1(options StreamOptions) streamProtocolHandler {
45 return &streamProtocolV1{
46 StreamOptions: options,
47 }
48 }
49
50 func (p *streamProtocolV1) stream(conn streamCreator) error {
51 doneChan := make(chan struct{}, 2)
52 errorChan := make(chan error)
53
54 cp := func(s string, dst io.Writer, src io.Reader) {
55 klog.V(6).Infof("Copying %s", s)
56 defer klog.V(6).Infof("Done copying %s", s)
57 if _, err := io.Copy(dst, src); err != nil && err != io.EOF {
58 klog.Errorf("Error copying %s: %v", s, err)
59 }
60 if s == v1.StreamTypeStdout || s == v1.StreamTypeStderr {
61 doneChan <- struct{}{}
62 }
63 }
64
65
66 var err error
67 headers := http.Header{}
68 headers.Set(v1.StreamType, v1.StreamTypeError)
69 p.errorStream, err = conn.CreateStream(headers)
70 if err != nil {
71 return err
72 }
73 defer p.errorStream.Reset()
74
75
76
77
78
79
80
81
82 if p.Stdin != nil {
83 headers.Set(v1.StreamType, v1.StreamTypeStdin)
84 p.remoteStdin, err = conn.CreateStream(headers)
85 if err != nil {
86 return err
87 }
88 defer p.remoteStdin.Reset()
89 }
90
91 if p.Stdout != nil {
92 headers.Set(v1.StreamType, v1.StreamTypeStdout)
93 p.remoteStdout, err = conn.CreateStream(headers)
94 if err != nil {
95 return err
96 }
97 defer p.remoteStdout.Reset()
98 }
99
100 if p.Stderr != nil && !p.Tty {
101 headers.Set(v1.StreamType, v1.StreamTypeStderr)
102 p.remoteStderr, err = conn.CreateStream(headers)
103 if err != nil {
104 return err
105 }
106 defer p.remoteStderr.Reset()
107 }
108
109
110
111
112 go func() {
113 message, err := io.ReadAll(p.errorStream)
114 if err != nil && err != io.EOF {
115 errorChan <- fmt.Errorf("Error reading from error stream: %s", err)
116 return
117 }
118 if len(message) > 0 {
119 errorChan <- fmt.Errorf("Error executing remote command: %s", message)
120 return
121 }
122 }()
123
124 if p.Stdin != nil {
125
126
127
128
129 go cp(v1.StreamTypeStdin, p.remoteStdin, readerWrapper{p.Stdin})
130 }
131
132 waitCount := 0
133 completedStreams := 0
134
135 if p.Stdout != nil {
136 waitCount++
137 go cp(v1.StreamTypeStdout, p.Stdout, p.remoteStdout)
138 }
139
140 if p.Stderr != nil && !p.Tty {
141 waitCount++
142 go cp(v1.StreamTypeStderr, p.Stderr, p.remoteStderr)
143 }
144
145 Loop:
146 for {
147 select {
148 case <-doneChan:
149 completedStreams++
150 if completedStreams == waitCount {
151 break Loop
152 }
153 case err := <-errorChan:
154 return err
155 }
156 }
157
158 return nil
159 }
160
View as plain text