...
1 package internal
2
3 import (
4 "bytes"
5 "io"
6 "os"
7 "time"
8 )
9
// BAILOUT_TIME is how long PauseIntercepting waits for the intercepted content
// to be delivered before it gives up, closes the emergency bailout channel and
// appends BAILOUT_MESSAGE to the output.
const BAILOUT_TIME = 1 * time.Second
// BAILOUT_MESSAGE is the explanatory text PauseIntercepting appends to the
// captured output when collecting that output did not finish within
// BAILOUT_TIME.
const BAILOUT_MESSAGE = `Ginkgo detected an issue while intercepting output.

When running in parallel, Ginkgo captures stdout and stderr output
and attaches it to the running spec. It looks like that process is getting
stuck for this suite.

This usually happens if you, or a library you are using, spin up an external
process and set cmd.Stdout = os.Stdout and/or cmd.Stderr = os.Stderr. This
causes the external process to keep Ginkgo's output interceptor pipe open and
causes output interception to hang.

Ginkgo has detected this and shortcircuited the capture process. The specs
will continue running after this message however output from the external
process that caused this issue will not be captured.

You have several options to fix this. In preferred order they are:

1. Pass GinkgoWriter instead of os.Stdout or os.Stderr to your process.
2. Ensure your process exits before the current spec completes. If your
process is long-lived and must cross spec boundaries, this option won't
work for you.
3. Pause Ginkgo's output interceptor before starting your process and then
resume it after. Use PauseOutputInterception() and ResumeOutputInterception()
to do this.
4. Set --output-interceptor-mode=none when running your Ginkgo suite. This will
turn off all output interception but allow specs to run in parallel without this
issue. You may miss important output if you do this including output from Go's
race detector.

More details on issue #851 - https://github.com/onsi/ginkgo/issues/851
`
42
43
// OutputInterceptor is the contract for something that can capture output
// between a Start call and the matching StopInterceptingAndReturnOutput call,
// with the option of suspending the capture in between.
type OutputInterceptor interface {
	// StartInterceptingOutput begins a capture.
	StartInterceptingOutput()
	// StartInterceptingOutputAndForwardTo begins a capture and hands the
	// implementation a writer to which captured output may also be forwarded.
	StartInterceptingOutputAndForwardTo(w io.Writer)
	// StopInterceptingAndReturnOutput ends the capture and returns the
	// captured text.
	StopInterceptingAndReturnOutput() string

	// PauseIntercepting suspends an ongoing capture.
	PauseIntercepting()
	// ResumeIntercepting continues a capture.
	ResumeIntercepting()

	// Shutdown releases whatever the interceptor is holding on to.
	Shutdown()
}
57
// NoopOutputInterceptor provides every OutputInterceptor method as an empty
// stub: nothing is captured and StopInterceptingAndReturnOutput always
// returns the empty string.
type NoopOutputInterceptor struct{}

// StartInterceptingOutput does nothing.
func (NoopOutputInterceptor) StartInterceptingOutput() {}

// StartInterceptingOutputAndForwardTo does nothing; the writer is ignored.
func (NoopOutputInterceptor) StartInterceptingOutputAndForwardTo(io.Writer) {}

// StopInterceptingAndReturnOutput always returns "".
func (NoopOutputInterceptor) StopInterceptingAndReturnOutput() string {
	const nothingCaptured = ""
	return nothingCaptured
}

// PauseIntercepting does nothing.
func (NoopOutputInterceptor) PauseIntercepting() {}

// ResumeIntercepting does nothing.
func (NoopOutputInterceptor) ResumeIntercepting() {}

// Shutdown does nothing.
func (NoopOutputInterceptor) Shutdown() {}
66
// pipePair bundles the two ends of one pipe: the file to read from and the
// file to write to.
type pipePair struct {
	reader, writer *os.File
}
71
72 func startPipeFactory(pipeChannel chan pipePair, shutdown chan interface{}) {
73 for {
74
75 pair := pipePair{}
76 pair.reader, pair.writer, _ = os.Pipe()
77 select {
78
79 case pipeChannel <- pair:
80 continue
81
82 case <-shutdown:
83 pair.reader.Close()
84 pair.writer.Close()
85 return
86 }
87 }
88 }
89
// interceptorImplementation is the set of operations genericOutputInterceptor
// delegates to in order to actually redirect, and later restore, stdout and
// stderr.
type interceptorImplementation interface {
	// CreateStdoutStderrClones returns the two files the interceptor keeps
	// as its stdout and stderr clones.
	CreateStdoutStderrClones() (stdoutClone, stderrClone *os.File)
	// ConnectPipeToStdoutStderr is given the write end of the capture pipe.
	ConnectPipeToStdoutStderr(pipeWriter *os.File)
	// RestoreStdoutStderrFromClones is given back the clones when a capture
	// is paused.
	RestoreStdoutStderrFromClones(stdoutClone, stderrClone *os.File)
	// ShutdownClones is given the clones when the interceptor shuts down.
	ShutdownClones(stdoutClone, stderrClone *os.File)
}
96
97 type genericOutputInterceptor struct {
98 intercepting bool
99
100 stdoutClone *os.File
101 stderrClone *os.File
102 pipe pipePair
103
104 shutdown chan interface{}
105 emergencyBailout chan interface{}
106 pipeChannel chan pipePair
107 interceptedContent chan string
108
109 forwardTo io.Writer
110 accumulatedOutput string
111
112 implementation interceptorImplementation
113 }
114
115 func (interceptor *genericOutputInterceptor) StartInterceptingOutput() {
116 interceptor.StartInterceptingOutputAndForwardTo(io.Discard)
117 }
118
119 func (interceptor *genericOutputInterceptor) StartInterceptingOutputAndForwardTo(w io.Writer) {
120 if interceptor.intercepting {
121 return
122 }
123 interceptor.accumulatedOutput = ""
124 interceptor.forwardTo = w
125 interceptor.ResumeIntercepting()
126 }
127
128 func (interceptor *genericOutputInterceptor) StopInterceptingAndReturnOutput() string {
129 if interceptor.intercepting {
130 interceptor.PauseIntercepting()
131 }
132 return interceptor.accumulatedOutput
133 }
134
135 func (interceptor *genericOutputInterceptor) ResumeIntercepting() {
136 if interceptor.intercepting {
137 return
138 }
139 interceptor.intercepting = true
140 if interceptor.stdoutClone == nil {
141 interceptor.stdoutClone, interceptor.stderrClone = interceptor.implementation.CreateStdoutStderrClones()
142 interceptor.shutdown = make(chan interface{})
143 go startPipeFactory(interceptor.pipeChannel, interceptor.shutdown)
144 }
145
146
147
148 interceptor.pipe = <-interceptor.pipeChannel
149
150 interceptor.emergencyBailout = make(chan interface{})
151
152
153 go func() {
154 buffer := &bytes.Buffer{}
155 destination := io.MultiWriter(buffer, interceptor.forwardTo)
156 copyFinished := make(chan interface{})
157 reader := interceptor.pipe.reader
158 go func() {
159 io.Copy(destination, reader)
160 reader.Close()
161 close(copyFinished)
162 }()
163 select {
164 case <-copyFinished:
165 interceptor.interceptedContent <- buffer.String()
166 case <-interceptor.emergencyBailout:
167 interceptor.interceptedContent <- ""
168 }
169 }()
170
171 interceptor.implementation.ConnectPipeToStdoutStderr(interceptor.pipe.writer)
172 }
173
174 func (interceptor *genericOutputInterceptor) PauseIntercepting() {
175 if !interceptor.intercepting {
176 return
177 }
178
179
180 interceptor.pipe.writer.Close()
181
182
183
184 interceptor.implementation.RestoreStdoutStderrFromClones(interceptor.stdoutClone, interceptor.stderrClone)
185
186 var content string
187 select {
188 case content = <-interceptor.interceptedContent:
189 case <-time.After(BAILOUT_TIME):
190
203 close(interceptor.emergencyBailout)
204 content = <-interceptor.interceptedContent + BAILOUT_MESSAGE
205 }
206
207 interceptor.accumulatedOutput += content
208 interceptor.intercepting = false
209 }
210
211 func (interceptor *genericOutputInterceptor) Shutdown() {
212 interceptor.PauseIntercepting()
213
214 if interceptor.stdoutClone != nil {
215 close(interceptor.shutdown)
216 interceptor.implementation.ShutdownClones(interceptor.stdoutClone, interceptor.stderrClone)
217 interceptor.stdoutClone = nil
218 interceptor.stderrClone = nil
219 }
220 }
221
222
223 func NewOSGlobalReassigningOutputInterceptor() OutputInterceptor {
224 return &genericOutputInterceptor{
225 interceptedContent: make(chan string),
226 pipeChannel: make(chan pipePair),
227 shutdown: make(chan interface{}),
228 implementation: &osGlobalReassigningOutputInterceptorImpl{},
229 }
230 }
231
// osGlobalReassigningOutputInterceptorImpl redirects output by reassigning
// the os.Stdout and os.Stderr package variables.
type osGlobalReassigningOutputInterceptorImpl struct{}

// CreateStdoutStderrClones returns the current os.Stdout and os.Stderr values
// as they are; nothing is duplicated.
func (impl *osGlobalReassigningOutputInterceptorImpl) CreateStdoutStderrClones() (*os.File, *os.File) {
	stdout, stderr := os.Stdout, os.Stderr
	return stdout, stderr
}

// ConnectPipeToStdoutStderr points both os.Stdout and os.Stderr at pipeWriter.
func (impl *osGlobalReassigningOutputInterceptorImpl) ConnectPipeToStdoutStderr(pipeWriter *os.File) {
	os.Stdout, os.Stderr = pipeWriter, pipeWriter
}

// RestoreStdoutStderrFromClones assigns the given files back to os.Stdout and
// os.Stderr.
func (impl *osGlobalReassigningOutputInterceptorImpl) RestoreStdoutStderrFromClones(stdoutClone *os.File, stderrClone *os.File) {
	os.Stdout, os.Stderr = stdoutClone, stderrClone
}

// ShutdownClones does nothing and leaves both files open.
func (impl *osGlobalReassigningOutputInterceptorImpl) ShutdownClones(_ *os.File, _ *os.File) {
}
251
View as plain text