...
1
16
17 package remotecommand
18
19 import (
20 "encoding/json"
21 "io"
22 "net/http"
23 "sync"
24
25 "k8s.io/api/core/v1"
26 "k8s.io/apimachinery/pkg/util/runtime"
27 )
28
29
30
31 type streamProtocolV3 struct {
32 *streamProtocolV2
33
34 resizeStream io.Writer
35 }
36
37 var _ streamProtocolHandler = &streamProtocolV3{}
38
39 func newStreamProtocolV3(options StreamOptions) streamProtocolHandler {
40 return &streamProtocolV3{
41 streamProtocolV2: newStreamProtocolV2(options).(*streamProtocolV2),
42 }
43 }
44
45 func (p *streamProtocolV3) createStreams(conn streamCreator) error {
46
47 if err := p.streamProtocolV2.createStreams(conn); err != nil {
48 return err
49 }
50
51
52 if p.Tty {
53 headers := http.Header{}
54 headers.Set(v1.StreamType, v1.StreamTypeResize)
55 var err error
56 p.resizeStream, err = conn.CreateStream(headers)
57 if err != nil {
58 return err
59 }
60 }
61
62 return nil
63 }
64
65 func (p *streamProtocolV3) handleResizes() {
66 if p.resizeStream == nil || p.TerminalSizeQueue == nil {
67 return
68 }
69 go func() {
70 defer runtime.HandleCrash()
71
72 encoder := json.NewEncoder(p.resizeStream)
73 for {
74 size := p.TerminalSizeQueue.Next()
75 if size == nil {
76 return
77 }
78 if err := encoder.Encode(&size); err != nil {
79 runtime.HandleError(err)
80 }
81 }
82 }()
83 }
84
85 func (p *streamProtocolV3) stream(conn streamCreator) error {
86 if err := p.createStreams(conn); err != nil {
87 return err
88 }
89
90
91
92 errorChan := watchErrorStream(p.errorStream, &errorDecoderV3{})
93
94 p.handleResizes()
95
96 p.copyStdin()
97
98 var wg sync.WaitGroup
99 p.copyStdout(&wg)
100 p.copyStderr(&wg)
101
102
103 wg.Wait()
104
105
106 return <-errorChan
107 }
108
109 type errorDecoderV3 struct {
110 errorDecoderV2
111 }
112
View as plain text