1 package parallel_support_test
2
3 import (
4 "fmt"
5 "os"
6 "time"
7
8 . "github.com/onsi/ginkgo/v2"
9 . "github.com/onsi/gomega"
10 "github.com/onsi/gomega/gbytes"
11
12 "github.com/onsi/ginkgo/v2/internal"
13 "github.com/onsi/ginkgo/v2/internal/parallel_support"
14 . "github.com/onsi/ginkgo/v2/internal/test_helpers"
15 "github.com/onsi/ginkgo/v2/types"
16 )
17
18 type ColorableStringerStruct struct {
19 Label string
20 Count int
21 }
22
23 func (s ColorableStringerStruct) String() string {
24 return fmt.Sprintf("%s %d", s.Label, s.Count)
25 }
26
27 func (s ColorableStringerStruct) ColorableString() string {
28 return fmt.Sprintf("{{red}}%s {{green}}%d{{/}}", s.Label, s.Count)
29 }
30
31 var _ = Describe("The Parallel Support Client & Server", func() {
32 for _, protocol := range []string{"RPC", "HTTP"} {
33 protocol := protocol
34 Describe(fmt.Sprintf("The %s protocol", protocol), Label(protocol), func() {
35 var (
36 server parallel_support.Server
37 client parallel_support.Client
38 reporter *FakeReporter
39 buffer *gbytes.Buffer
40 )
41
42 BeforeEach(func() {
43 GinkgoT().Setenv("GINKGO_PARALLEL_PROTOCOL", protocol)
44
45 var err error
46 reporter = NewFakeReporter()
47 server, err = parallel_support.NewServer(3, reporter)
48 Ω(err).ShouldNot(HaveOccurred())
49 server.Start()
50
51 buffer = gbytes.NewBuffer()
52 server.SetOutputDestination(buffer)
53
54 client = parallel_support.NewClient(server.Address())
55 Eventually(client.Connect).Should(BeTrue())
56
57 DeferCleanup(server.Close)
58 DeferCleanup(client.Close)
59 })
60
61 Describe("Reporting endpoints", func() {
62 var beginReport, thirdBeginReport types.Report
63 var endReport1, endReport2, endReport3 types.Report
64 var specReportA, specReportB, specReportC types.SpecReport
65
66 var t time.Time
67
68 BeforeEach(func() {
69 beginReport = types.Report{SuiteDescription: "my sweet suite"}
70 thirdBeginReport = types.Report{SuiteDescription: "last one in gets forwarded"}
71
72 specReportA = types.SpecReport{LeafNodeText: "A"}
73 specReportB = types.SpecReport{LeafNodeText: "B"}
74 specReportC = types.SpecReport{LeafNodeText: "C"}
75
76 t = time.Now()
77
78 endReport1 = types.Report{StartTime: t.Add(-time.Second), EndTime: t.Add(time.Second), SuiteSucceeded: true, SpecReports: types.SpecReports{specReportA}}
79 endReport2 = types.Report{StartTime: t.Add(-2 * time.Second), EndTime: t.Add(time.Second), SuiteSucceeded: true, SpecReports: types.SpecReports{specReportB}}
80 endReport3 = types.Report{StartTime: t.Add(-time.Second), EndTime: t.Add(2 * time.Second), SuiteSucceeded: false, SpecReports: types.SpecReports{specReportC}}
81 })
82
83 Context("before all procs have reported SuiteWillBegin", func() {
84 BeforeEach(func() {
85 Ω(client.PostSuiteWillBegin(beginReport)).Should(Succeed())
86 Ω(client.PostDidRun(specReportA)).Should(Succeed())
87 Ω(client.PostSuiteWillBegin(beginReport)).Should(Succeed())
88 Ω(client.PostDidRun(specReportB)).Should(Succeed())
89 })
90
91 It("should not forward anything to the attached reporter", func() {
92 Ω(reporter.Begin).Should(BeZero())
93 Ω(reporter.Will).Should(BeEmpty())
94 Ω(reporter.Did).Should(BeEmpty())
95 })
96
97 Context("when the final proc reports SuiteWillBegin", func() {
98 BeforeEach(func() {
99 Ω(client.PostSuiteWillBegin(thirdBeginReport)).Should(Succeed())
100 })
101
102 It("forwards to SuiteWillBegin and catches up on any received summaries", func() {
103 Ω(reporter.Begin).Should(Equal(thirdBeginReport))
104 Ω(reporter.Will.Names()).Should(ConsistOf("A", "B"))
105 Ω(reporter.Did.Names()).Should(ConsistOf("A", "B"))
106 })
107
108 Context("any subsequent summaries", func() {
109 BeforeEach(func() {
110 Ω(client.PostDidRun(specReportC)).Should(Succeed())
111 })
112
113 It("are forwarded immediately", func() {
114 Ω(reporter.Will.Names()).Should(ConsistOf("A", "B", "C"))
115 Ω(reporter.Did.Names()).Should(ConsistOf("A", "B", "C"))
116 })
117 })
118
119 Context("when SuiteDidEnd start arriving", func() {
120 BeforeEach(func() {
121 Ω(client.PostSuiteDidEnd(endReport1)).Should(Succeed())
122 Ω(client.PostSuiteDidEnd(endReport2)).Should(Succeed())
123 })
124
125 It("does not forward them yet...", func() {
126 Ω(reporter.End).Should(BeZero())
127 })
128
129 It("doesn't signal it's done", func() {
130 Ω(server.GetSuiteDone()).ShouldNot(BeClosed())
131 })
132
133 Context("when the final SuiteDidEnd arrive", func() {
134 BeforeEach(func() {
135 Ω(client.PostSuiteDidEnd(endReport3)).Should(Succeed())
136 })
137
138 It("forwards the aggregation of all received end summaries", func() {
139 Ω(reporter.End.StartTime.Unix()).Should(BeNumerically("~", t.Add(-2*time.Second).Unix()))
140 Ω(reporter.End.EndTime.Unix()).Should(BeNumerically("~", t.Add(2*time.Second).Unix()))
141 Ω(reporter.End.RunTime).Should(BeNumerically("~", 4*time.Second))
142 Ω(reporter.End.SuiteSucceeded).Should(BeFalse())
143 Ω(reporter.End.SpecReports).Should(ConsistOf(specReportA, specReportB, specReportC))
144 })
145
146 It("should signal it's done", func() {
147 Ω(server.GetSuiteDone()).Should(BeClosed())
148 })
149 })
150 })
151 })
152 })
153 })
154
155 Describe("supporting ReportEntries (which RPC struggled with when I first implemented it)", func() {
156 BeforeEach(func() {
157 Ω(client.PostSuiteWillBegin(types.Report{SuiteDescription: "my sweet suite"})).Should(Succeed())
158 Ω(client.PostSuiteWillBegin(types.Report{SuiteDescription: "my sweet suite"})).Should(Succeed())
159 Ω(client.PostSuiteWillBegin(types.Report{SuiteDescription: "my sweet suite"})).Should(Succeed())
160 })
161 It("can pass in ReportEntries that include custom types", func() {
162 cl := types.NewCodeLocation(0)
163 entry, err := internal.NewReportEntry("No Value Entry", cl)
164 Ω(err).ShouldNot(HaveOccurred())
165 Ω(client.PostDidRun(types.SpecReport{
166 LeafNodeText: "no-value",
167 ReportEntries: types.ReportEntries{entry},
168 })).Should(Succeed())
169
170 entry, err = internal.NewReportEntry("String Value Entry", cl, "The String")
171 Ω(err).ShouldNot(HaveOccurred())
172 Ω(client.PostDidRun(types.SpecReport{
173 LeafNodeText: "string-value",
174 ReportEntries: types.ReportEntries{entry},
175 })).Should(Succeed())
176
177 entry, err = internal.NewReportEntry("Custom Type Value Entry", cl, ColorableStringerStruct{Label: "apples", Count: 17})
178 Ω(err).ShouldNot(HaveOccurred())
179 Ω(client.PostDidRun(types.SpecReport{
180 LeafNodeText: "custom-value",
181 ReportEntries: types.ReportEntries{entry},
182 })).Should(Succeed())
183
184 Ω(reporter.Did.Find("no-value").ReportEntries[0].Name).Should(Equal("No Value Entry"))
185 Ω(reporter.Did.Find("no-value").ReportEntries[0].StringRepresentation()).Should(Equal(""))
186
187 Ω(reporter.Did.Find("string-value").ReportEntries[0].Name).Should(Equal("String Value Entry"))
188 Ω(reporter.Did.Find("string-value").ReportEntries[0].StringRepresentation()).Should(Equal("The String"))
189
190 Ω(reporter.Did.Find("custom-value").ReportEntries[0].Name).Should(Equal("Custom Type Value Entry"))
191 Ω(reporter.Did.Find("custom-value").ReportEntries[0].StringRepresentation()).Should(Equal("{{red}}apples {{green}}17{{/}}"))
192 })
193 })
194
195 Describe("Streaming output", func() {
196 It("is configured to stream to stdout", func() {
197 server, err := parallel_support.NewServer(3, reporter)
198 Ω(err).ShouldNot(HaveOccurred())
199 Ω(server.GetOutputDestination().(*os.File).Fd()).Should(Equal(uintptr(1)))
200 })
201
202 It("streams output to the provided buffer", func() {
203 n, err := client.Write([]byte("hello"))
204 Ω(n).Should(Equal(5))
205 Ω(err).ShouldNot(HaveOccurred())
206 Ω(buffer).Should(gbytes.Say("hello"))
207 })
208 })
209
210 Describe("progress reports", func() {
211 It("can emit progress reports", func() {
212 pr := types.ProgressReport{LeafNodeText: "hola"}
213 Ω(client.PostEmitProgressReport(pr)).Should(Succeed())
214 Ω(reporter.ProgressReports).Should(ConsistOf(pr))
215 })
216 })
217
218 Describe("Synchronization endpoints", func() {
219 var proc1Exited, proc2Exited, proc3Exited chan interface{}
220 BeforeEach(func() {
221 proc1Exited, proc2Exited, proc3Exited = make(chan interface{}), make(chan interface{}), make(chan interface{})
222 aliveFunc := func(c chan interface{}) func() bool {
223 return func() bool {
224 select {
225 case <-c:
226 return false
227 default:
228 return true
229 }
230 }
231 }
232 server.RegisterAlive(1, aliveFunc(proc1Exited))
233 server.RegisterAlive(2, aliveFunc(proc2Exited))
234 server.RegisterAlive(3, aliveFunc(proc3Exited))
235 })
236
237 Describe("Managing ReportBeforeSuite synchronization", func() {
238 Context("when proc 1 succeeds", func() {
239 It("passes that success along to other procs", func() {
240 Ω(client.PostReportBeforeSuiteCompleted(types.SpecStatePassed)).Should(Succeed())
241 state, err := client.BlockUntilReportBeforeSuiteCompleted()
242 Ω(state).Should(Equal(types.SpecStatePassed))
243 Ω(err).ShouldNot(HaveOccurred())
244 })
245 })
246
247 Context("when proc 1 fails", func() {
248 It("passes that state information along to the other procs", func() {
249 Ω(client.PostReportBeforeSuiteCompleted(types.SpecStateFailed)).Should(Succeed())
250 state, err := client.BlockUntilReportBeforeSuiteCompleted()
251 Ω(state).Should(Equal(types.SpecStateFailed))
252 Ω(err).ShouldNot(HaveOccurred())
253 })
254 })
255
256 Context("when proc 1 disappears before reporting back", func() {
257 It("returns a meaningful error", func() {
258 close(proc1Exited)
259 state, err := client.BlockUntilReportBeforeSuiteCompleted()
260 Ω(state).Should(Equal(types.SpecStateFailed))
261 Ω(err).ShouldNot(HaveOccurred())
262 })
263 })
264
265 Context("when proc 1 hasn't responded yet", func() {
266 It("blocks until it does", func() {
267 done := make(chan interface{})
268 go func() {
269 defer GinkgoRecover()
270 state, err := client.BlockUntilReportBeforeSuiteCompleted()
271 Ω(state).Should(Equal(types.SpecStatePassed))
272 Ω(err).ShouldNot(HaveOccurred())
273 close(done)
274 }()
275 Consistently(done).ShouldNot(BeClosed())
276 Ω(client.PostReportBeforeSuiteCompleted(types.SpecStatePassed)).Should(Succeed())
277 Eventually(done).Should(BeClosed())
278 })
279 })
280 })
281
282 Describe("Managing SynchronizedBeforeSuite synchronization", func() {
283 Context("when proc 1 succeeds and returns data", func() {
284 It("passes that data along to other procs", func() {
285 Ω(client.PostSynchronizedBeforeSuiteCompleted(types.SpecStatePassed, []byte("hello there"))).Should(Succeed())
286 state, data, err := client.BlockUntilSynchronizedBeforeSuiteData()
287 Ω(state).Should(Equal(types.SpecStatePassed))
288 Ω(data).Should(Equal([]byte("hello there")))
289 Ω(err).ShouldNot(HaveOccurred())
290 })
291 })
292
293 Context("when proc 1 succeeds and the data happens to be nil", func() {
294 It("passes reports success and returns nil", func() {
295 Ω(client.PostSynchronizedBeforeSuiteCompleted(types.SpecStatePassed, nil)).Should(Succeed())
296 state, data, err := client.BlockUntilSynchronizedBeforeSuiteData()
297 Ω(state).Should(Equal(types.SpecStatePassed))
298 Ω(data).Should(BeNil())
299 Ω(err).ShouldNot(HaveOccurred())
300 })
301 })
302
303 Context("when proc 1 is skipped", func() {
304 It("passes that state information along to the other procs", func() {
305 Ω(client.PostSynchronizedBeforeSuiteCompleted(types.SpecStateSkipped, nil)).Should(Succeed())
306 state, data, err := client.BlockUntilSynchronizedBeforeSuiteData()
307 Ω(state).Should(Equal(types.SpecStateSkipped))
308 Ω(data).Should(BeNil())
309 Ω(err).ShouldNot(HaveOccurred())
310 })
311 })
312
313 Context("when proc 1 fails", func() {
314 It("passes that state information along to the other procs", func() {
315 Ω(client.PostSynchronizedBeforeSuiteCompleted(types.SpecStateFailed, nil)).Should(Succeed())
316 state, data, err := client.BlockUntilSynchronizedBeforeSuiteData()
317 Ω(state).Should(Equal(types.SpecStateFailed))
318 Ω(data).Should(BeNil())
319 Ω(err).ShouldNot(HaveOccurred())
320 })
321 })
322
323 Context("when proc 1 disappears before reporting back", func() {
324 It("returns a meaningful error", func() {
325 close(proc1Exited)
326 state, data, err := client.BlockUntilSynchronizedBeforeSuiteData()
327 Ω(state).Should(Equal(types.SpecStateInvalid))
328 Ω(data).Should(BeNil())
329 Ω(err).Should(MatchError(types.GinkgoErrors.SynchronizedBeforeSuiteDisappearedOnProc1()))
330 })
331 })
332
333 Context("when proc 1 hasn't responded yet", func() {
334 It("blocks until it does", func() {
335 done := make(chan interface{})
336 go func() {
337 defer GinkgoRecover()
338 state, data, err := client.BlockUntilSynchronizedBeforeSuiteData()
339 Ω(state).Should(Equal(types.SpecStatePassed))
340 Ω(data).Should(Equal([]byte("hello there")))
341 Ω(err).ShouldNot(HaveOccurred())
342 close(done)
343 }()
344 Consistently(done).ShouldNot(BeClosed())
345 Ω(client.PostSynchronizedBeforeSuiteCompleted(types.SpecStatePassed, []byte("hello there"))).Should(Succeed())
346 Eventually(done).Should(BeClosed())
347 })
348 })
349 })
350
351 Describe("BlockUntilNonprimaryProcsHaveFinished", func() {
352 It("blocks until non-primary procs exit", func() {
353 done := make(chan interface{})
354 go func() {
355 defer GinkgoRecover()
356 Ω(client.BlockUntilNonprimaryProcsHaveFinished()).Should(Succeed())
357 close(done)
358 }()
359 Consistently(done).ShouldNot(BeClosed())
360 close(proc2Exited)
361 Consistently(done).ShouldNot(BeClosed())
362 close(proc3Exited)
363 Eventually(done).Should(BeClosed())
364 })
365 })
366
367 Describe("BlockUntilAggregatedNonprimaryProcsReport", func() {
368 var specReportA, specReportB types.SpecReport
369 var endReport2, endReport3 types.Report
370
371 BeforeEach(func() {
372 specReportA = types.SpecReport{LeafNodeText: "A"}
373 specReportB = types.SpecReport{LeafNodeText: "B"}
374 endReport2 = types.Report{SpecReports: types.SpecReports{specReportA}}
375 endReport3 = types.Report{SpecReports: types.SpecReports{specReportB}}
376 })
377
378 It("blocks until all non-primary procs exit, then returns the aggregated report", func() {
379 done := make(chan interface{})
380 go func() {
381 defer GinkgoRecover()
382 report, err := client.BlockUntilAggregatedNonprimaryProcsReport()
383 Ω(err).ShouldNot(HaveOccurred())
384 Ω(report.SpecReports).Should(ConsistOf(specReportA, specReportB))
385 close(done)
386 }()
387 Consistently(done).ShouldNot(BeClosed())
388
389 Ω(client.PostSuiteDidEnd(endReport2)).Should(Succeed())
390 close(proc2Exited)
391 Consistently(done).ShouldNot(BeClosed())
392
393 Ω(client.PostSuiteDidEnd(endReport3)).Should(Succeed())
394 close(proc3Exited)
395 Eventually(done).Should(BeClosed())
396 })
397
398 Context("when a non-primary proc disappears without reporting back", func() {
399 It("blocks returns an appropriate error", func() {
400 done := make(chan interface{})
401 go func() {
402 defer GinkgoRecover()
403 report, err := client.BlockUntilAggregatedNonprimaryProcsReport()
404 Ω(err).Should(Equal(types.GinkgoErrors.AggregatedReportUnavailableDueToNodeDisappearing()))
405 Ω(report).Should(BeZero())
406 close(done)
407 }()
408 Consistently(done).ShouldNot(BeClosed())
409
410 Ω(client.PostSuiteDidEnd(endReport2)).Should(Succeed())
411 close(proc2Exited)
412 Consistently(done).ShouldNot(BeClosed())
413
414 close(proc3Exited)
415 Eventually(done).Should(BeClosed())
416 })
417 })
418 })
419
420 Describe("Fetching counters", func() {
421 It("returns ascending counters", func() {
422 Ω(client.FetchNextCounter()).Should(Equal(0))
423 Ω(client.FetchNextCounter()).Should(Equal(1))
424 Ω(client.FetchNextCounter()).Should(Equal(2))
425 Ω(client.FetchNextCounter()).Should(Equal(3))
426 })
427 })
428
429 Describe("Aborting", func() {
430 It("should not abort by default", func() {
431 Ω(client.ShouldAbort()).Should(BeFalse())
432 })
433
434 Context("when told to abort", func() {
435 BeforeEach(func() {
436 Ω(client.PostAbort()).Should(Succeed())
437 })
438
439 It("should abort", func() {
440 Ω(client.ShouldAbort()).Should(BeTrue())
441 })
442 })
443 })
444
445 })
446 })
447 }
448 })
449
View as plain text