...

Source file src/github.com/onsi/ginkgo/v2/internal/parallel_support/client_server_test.go

Documentation: github.com/onsi/ginkgo/v2/internal/parallel_support

     1  package parallel_support_test
     2  
     3  import (
     4  	"fmt"
     5  	"os"
     6  	"time"
     7  
     8  	. "github.com/onsi/ginkgo/v2"
     9  	. "github.com/onsi/gomega"
    10  	"github.com/onsi/gomega/gbytes"
    11  
    12  	"github.com/onsi/ginkgo/v2/internal"
    13  	"github.com/onsi/ginkgo/v2/internal/parallel_support"
    14  	. "github.com/onsi/ginkgo/v2/internal/test_helpers"
    15  	"github.com/onsi/ginkgo/v2/types"
    16  )
    17  
    18  type ColorableStringerStruct struct {
    19  	Label string
    20  	Count int
    21  }
    22  
    23  func (s ColorableStringerStruct) String() string {
    24  	return fmt.Sprintf("%s %d", s.Label, s.Count)
    25  }
    26  
    27  func (s ColorableStringerStruct) ColorableString() string {
    28  	return fmt.Sprintf("{{red}}%s {{green}}%d{{/}}", s.Label, s.Count)
    29  }
    30  
    31  var _ = Describe("The Parallel Support Client & Server", func() {
    32  	for _, protocol := range []string{"RPC", "HTTP"} {
    33  		protocol := protocol
    34  		Describe(fmt.Sprintf("The %s protocol", protocol), Label(protocol), func() {
    35  			var (
    36  				server   parallel_support.Server
    37  				client   parallel_support.Client
    38  				reporter *FakeReporter
    39  				buffer   *gbytes.Buffer
    40  			)
    41  
    42  			BeforeEach(func() {
    43  				GinkgoT().Setenv("GINKGO_PARALLEL_PROTOCOL", protocol)
    44  
    45  				var err error
    46  				reporter = NewFakeReporter()
    47  				server, err = parallel_support.NewServer(3, reporter)
    48  				Ω(err).ShouldNot(HaveOccurred())
    49  				server.Start()
    50  
    51  				buffer = gbytes.NewBuffer()
    52  				server.SetOutputDestination(buffer)
    53  
    54  				client = parallel_support.NewClient(server.Address())
    55  				Eventually(client.Connect).Should(BeTrue())
    56  
    57  				DeferCleanup(server.Close)
    58  				DeferCleanup(client.Close)
    59  			})
    60  
    61  			Describe("Reporting endpoints", func() {
    62  				var beginReport, thirdBeginReport types.Report
    63  				var endReport1, endReport2, endReport3 types.Report
    64  				var specReportA, specReportB, specReportC types.SpecReport
    65  
    66  				var t time.Time
    67  
    68  				BeforeEach(func() {
    69  					beginReport = types.Report{SuiteDescription: "my sweet suite"}
    70  					thirdBeginReport = types.Report{SuiteDescription: "last one in gets forwarded"}
    71  
    72  					specReportA = types.SpecReport{LeafNodeText: "A"}
    73  					specReportB = types.SpecReport{LeafNodeText: "B"}
    74  					specReportC = types.SpecReport{LeafNodeText: "C"}
    75  
    76  					t = time.Now()
    77  
    78  					endReport1 = types.Report{StartTime: t.Add(-time.Second), EndTime: t.Add(time.Second), SuiteSucceeded: true, SpecReports: types.SpecReports{specReportA}}
    79  					endReport2 = types.Report{StartTime: t.Add(-2 * time.Second), EndTime: t.Add(time.Second), SuiteSucceeded: true, SpecReports: types.SpecReports{specReportB}}
    80  					endReport3 = types.Report{StartTime: t.Add(-time.Second), EndTime: t.Add(2 * time.Second), SuiteSucceeded: false, SpecReports: types.SpecReports{specReportC}}
    81  				})
    82  
    83  				Context("before all procs have reported SuiteWillBegin", func() {
    84  					BeforeEach(func() {
    85  						Ω(client.PostSuiteWillBegin(beginReport)).Should(Succeed())
    86  						Ω(client.PostDidRun(specReportA)).Should(Succeed())
    87  						Ω(client.PostSuiteWillBegin(beginReport)).Should(Succeed())
    88  						Ω(client.PostDidRun(specReportB)).Should(Succeed())
    89  					})
    90  
    91  					It("should not forward anything to the attached reporter", func() {
    92  						Ω(reporter.Begin).Should(BeZero())
    93  						Ω(reporter.Will).Should(BeEmpty())
    94  						Ω(reporter.Did).Should(BeEmpty())
    95  					})
    96  
    97  					Context("when the final proc reports SuiteWillBegin", func() {
    98  						BeforeEach(func() {
    99  							Ω(client.PostSuiteWillBegin(thirdBeginReport)).Should(Succeed())
   100  						})
   101  
   102  						It("forwards to SuiteWillBegin and catches up on any received summaries", func() {
   103  							Ω(reporter.Begin).Should(Equal(thirdBeginReport))
   104  							Ω(reporter.Will.Names()).Should(ConsistOf("A", "B"))
   105  							Ω(reporter.Did.Names()).Should(ConsistOf("A", "B"))
   106  						})
   107  
   108  						Context("any subsequent summaries", func() {
   109  							BeforeEach(func() {
   110  								Ω(client.PostDidRun(specReportC)).Should(Succeed())
   111  							})
   112  
   113  							It("are forwarded immediately", func() {
   114  								Ω(reporter.Will.Names()).Should(ConsistOf("A", "B", "C"))
   115  								Ω(reporter.Did.Names()).Should(ConsistOf("A", "B", "C"))
   116  							})
   117  						})
   118  
   119  						Context("when SuiteDidEnd start arriving", func() {
   120  							BeforeEach(func() {
   121  								Ω(client.PostSuiteDidEnd(endReport1)).Should(Succeed())
   122  								Ω(client.PostSuiteDidEnd(endReport2)).Should(Succeed())
   123  							})
   124  
   125  							It("does not forward them yet...", func() {
   126  								Ω(reporter.End).Should(BeZero())
   127  							})
   128  
   129  							It("doesn't signal it's done", func() {
   130  								Ω(server.GetSuiteDone()).ShouldNot(BeClosed())
   131  							})
   132  
   133  							Context("when the final SuiteDidEnd arrive", func() {
   134  								BeforeEach(func() {
   135  									Ω(client.PostSuiteDidEnd(endReport3)).Should(Succeed())
   136  								})
   137  
   138  								It("forwards the aggregation of all received end summaries", func() {
   139  									Ω(reporter.End.StartTime.Unix()).Should(BeNumerically("~", t.Add(-2*time.Second).Unix()))
   140  									Ω(reporter.End.EndTime.Unix()).Should(BeNumerically("~", t.Add(2*time.Second).Unix()))
   141  									Ω(reporter.End.RunTime).Should(BeNumerically("~", 4*time.Second))
   142  									Ω(reporter.End.SuiteSucceeded).Should(BeFalse())
   143  									Ω(reporter.End.SpecReports).Should(ConsistOf(specReportA, specReportB, specReportC))
   144  								})
   145  
   146  								It("should signal it's done", func() {
   147  									Ω(server.GetSuiteDone()).Should(BeClosed())
   148  								})
   149  							})
   150  						})
   151  					})
   152  				})
   153  			})
   154  
   155  			Describe("supporting ReportEntries (which RPC struggled with when I first implemented it)", func() {
   156  				BeforeEach(func() {
   157  					Ω(client.PostSuiteWillBegin(types.Report{SuiteDescription: "my sweet suite"})).Should(Succeed())
   158  					Ω(client.PostSuiteWillBegin(types.Report{SuiteDescription: "my sweet suite"})).Should(Succeed())
   159  					Ω(client.PostSuiteWillBegin(types.Report{SuiteDescription: "my sweet suite"})).Should(Succeed())
   160  				})
   161  				It("can pass in ReportEntries that include custom types", func() {
   162  					cl := types.NewCodeLocation(0)
   163  					entry, err := internal.NewReportEntry("No Value Entry", cl)
   164  					Ω(err).ShouldNot(HaveOccurred())
   165  					Ω(client.PostDidRun(types.SpecReport{
   166  						LeafNodeText:  "no-value",
   167  						ReportEntries: types.ReportEntries{entry},
   168  					})).Should(Succeed())
   169  
   170  					entry, err = internal.NewReportEntry("String Value Entry", cl, "The String")
   171  					Ω(err).ShouldNot(HaveOccurred())
   172  					Ω(client.PostDidRun(types.SpecReport{
   173  						LeafNodeText:  "string-value",
   174  						ReportEntries: types.ReportEntries{entry},
   175  					})).Should(Succeed())
   176  
   177  					entry, err = internal.NewReportEntry("Custom Type Value Entry", cl, ColorableStringerStruct{Label: "apples", Count: 17})
   178  					Ω(err).ShouldNot(HaveOccurred())
   179  					Ω(client.PostDidRun(types.SpecReport{
   180  						LeafNodeText:  "custom-value",
   181  						ReportEntries: types.ReportEntries{entry},
   182  					})).Should(Succeed())
   183  
   184  					Ω(reporter.Did.Find("no-value").ReportEntries[0].Name).Should(Equal("No Value Entry"))
   185  					Ω(reporter.Did.Find("no-value").ReportEntries[0].StringRepresentation()).Should(Equal(""))
   186  
   187  					Ω(reporter.Did.Find("string-value").ReportEntries[0].Name).Should(Equal("String Value Entry"))
   188  					Ω(reporter.Did.Find("string-value").ReportEntries[0].StringRepresentation()).Should(Equal("The String"))
   189  
   190  					Ω(reporter.Did.Find("custom-value").ReportEntries[0].Name).Should(Equal("Custom Type Value Entry"))
   191  					Ω(reporter.Did.Find("custom-value").ReportEntries[0].StringRepresentation()).Should(Equal("{{red}}apples {{green}}17{{/}}"))
   192  				})
   193  			})
   194  
   195  			Describe("Streaming output", func() {
   196  				It("is configured to stream to stdout", func() {
   197  					server, err := parallel_support.NewServer(3, reporter)
   198  					Ω(err).ShouldNot(HaveOccurred())
   199  					Ω(server.GetOutputDestination().(*os.File).Fd()).Should(Equal(uintptr(1)))
   200  				})
   201  
   202  				It("streams output to the provided buffer", func() {
   203  					n, err := client.Write([]byte("hello"))
   204  					Ω(n).Should(Equal(5))
   205  					Ω(err).ShouldNot(HaveOccurred())
   206  					Ω(buffer).Should(gbytes.Say("hello"))
   207  				})
   208  			})
   209  
   210  			Describe("progress reports", func() {
   211  				It("can emit progress reports", func() {
   212  					pr := types.ProgressReport{LeafNodeText: "hola"}
   213  					Ω(client.PostEmitProgressReport(pr)).Should(Succeed())
   214  					Ω(reporter.ProgressReports).Should(ConsistOf(pr))
   215  				})
   216  			})
   217  
   218  			Describe("Synchronization endpoints", func() {
   219  				var proc1Exited, proc2Exited, proc3Exited chan interface{}
   220  				BeforeEach(func() {
   221  					proc1Exited, proc2Exited, proc3Exited = make(chan interface{}), make(chan interface{}), make(chan interface{})
   222  					aliveFunc := func(c chan interface{}) func() bool {
   223  						return func() bool {
   224  							select {
   225  							case <-c:
   226  								return false
   227  							default:
   228  								return true
   229  							}
   230  						}
   231  					}
   232  					server.RegisterAlive(1, aliveFunc(proc1Exited))
   233  					server.RegisterAlive(2, aliveFunc(proc2Exited))
   234  					server.RegisterAlive(3, aliveFunc(proc3Exited))
   235  				})
   236  
				// Specs for the ReportBeforeSuite hand-off: a state posted with
				// PostReportBeforeSuiteCompleted is expected back from
				// BlockUntilReportBeforeSuiteCompleted.
				Describe("Managing ReportBeforeSuite synchronization", func() {
					Context("when proc 1 succeeds", func() {
						It("passes that success along to other procs", func() {
							Ω(client.PostReportBeforeSuiteCompleted(types.SpecStatePassed)).Should(Succeed())
							state, err := client.BlockUntilReportBeforeSuiteCompleted()
							Ω(state).Should(Equal(types.SpecStatePassed))
							Ω(err).ShouldNot(HaveOccurred())
						})
					})

					Context("when proc 1 fails", func() {
						It("passes that state information along to the other procs", func() {
							Ω(client.PostReportBeforeSuiteCompleted(types.SpecStateFailed)).Should(Succeed())
							state, err := client.BlockUntilReportBeforeSuiteCompleted()
							Ω(state).Should(Equal(types.SpecStateFailed))
							Ω(err).ShouldNot(HaveOccurred())
						})
					})

					Context("when proc 1 disappears before reporting back", func() {
						It("returns a meaningful error", func() {
							// Closing proc1Exited makes the alive-check registered for proc 1
							// return false; nothing has been posted at this point.
							close(proc1Exited)
							state, err := client.BlockUntilReportBeforeSuiteCompleted()
							// NOTE(review): the spec title promises an error, yet the assertions
							// below expect SpecStateFailed together with a nil error — confirm
							// which of the two reflects the intended contract.
							Ω(state).Should(Equal(types.SpecStateFailed))
							Ω(err).ShouldNot(HaveOccurred())
						})
					})

					Context("when proc 1 hasn't responded yet", func() {
						It("blocks until it does", func() {
							// done is closed only after the blocking call has returned and its
							// results have passed the assertions inside the goroutine.
							done := make(chan interface{})
							go func() {
								// Lets assertion failures raised in this goroutine be reported.
								defer GinkgoRecover()
								state, err := client.BlockUntilReportBeforeSuiteCompleted()
								Ω(state).Should(Equal(types.SpecStatePassed))
								Ω(err).ShouldNot(HaveOccurred())
								close(done)
							}()
							// Nothing has been posted yet, so the call must still be blocked.
							Consistently(done).ShouldNot(BeClosed())
							Ω(client.PostReportBeforeSuiteCompleted(types.SpecStatePassed)).Should(Succeed())
							Eventually(done).Should(BeClosed())
						})
					})
				})
   281  
				// Specs for the SynchronizedBeforeSuite hand-off: the state and data
				// posted with PostSynchronizedBeforeSuiteCompleted are expected back from
				// BlockUntilSynchronizedBeforeSuiteData.
				Describe("Managing SynchronizedBeforeSuite synchronization", func() {
					Context("when proc 1 succeeds and returns data", func() {
						It("passes that data along to other procs", func() {
							Ω(client.PostSynchronizedBeforeSuiteCompleted(types.SpecStatePassed, []byte("hello there"))).Should(Succeed())
							state, data, err := client.BlockUntilSynchronizedBeforeSuiteData()
							Ω(state).Should(Equal(types.SpecStatePassed))
							Ω(data).Should(Equal([]byte("hello there")))
							Ω(err).ShouldNot(HaveOccurred())
						})
					})

					Context("when proc 1 succeeds and the data happens to be nil", func() {
						It("passes reports success and returns nil", func() {
							Ω(client.PostSynchronizedBeforeSuiteCompleted(types.SpecStatePassed, nil)).Should(Succeed())
							state, data, err := client.BlockUntilSynchronizedBeforeSuiteData()
							Ω(state).Should(Equal(types.SpecStatePassed))
							// A nil payload must come back as nil.
							Ω(data).Should(BeNil())
							Ω(err).ShouldNot(HaveOccurred())
						})
					})

					Context("when proc 1 is skipped", func() {
						It("passes that state information along to the other procs", func() {
							Ω(client.PostSynchronizedBeforeSuiteCompleted(types.SpecStateSkipped, nil)).Should(Succeed())
							state, data, err := client.BlockUntilSynchronizedBeforeSuiteData()
							Ω(state).Should(Equal(types.SpecStateSkipped))
							Ω(data).Should(BeNil())
							Ω(err).ShouldNot(HaveOccurred())
						})
					})

					Context("when proc 1 fails", func() {
						It("passes that state information along to the other procs", func() {
							Ω(client.PostSynchronizedBeforeSuiteCompleted(types.SpecStateFailed, nil)).Should(Succeed())
							state, data, err := client.BlockUntilSynchronizedBeforeSuiteData()
							Ω(state).Should(Equal(types.SpecStateFailed))
							Ω(data).Should(BeNil())
							Ω(err).ShouldNot(HaveOccurred())
						})
					})

					Context("when proc 1 disappears before reporting back", func() {
						It("returns a meaningful error", func() {
							// Closing proc1Exited makes the alive-check registered for proc 1
							// return false; nothing has been posted at this point.
							close(proc1Exited)
							state, data, err := client.BlockUntilSynchronizedBeforeSuiteData()
							Ω(state).Should(Equal(types.SpecStateInvalid))
							Ω(data).Should(BeNil())
							Ω(err).Should(MatchError(types.GinkgoErrors.SynchronizedBeforeSuiteDisappearedOnProc1()))
						})
					})

					Context("when proc 1 hasn't responded yet", func() {
						It("blocks until it does", func() {
							// done is closed only after the blocking call has returned and its
							// results have passed the assertions inside the goroutine.
							done := make(chan interface{})
							go func() {
								// Lets assertion failures raised in this goroutine be reported.
								defer GinkgoRecover()
								state, data, err := client.BlockUntilSynchronizedBeforeSuiteData()
								Ω(state).Should(Equal(types.SpecStatePassed))
								Ω(data).Should(Equal([]byte("hello there")))
								Ω(err).ShouldNot(HaveOccurred())
								close(done)
							}()
							// Nothing has been posted yet, so the call must still be blocked.
							Consistently(done).ShouldNot(BeClosed())
							Ω(client.PostSynchronizedBeforeSuiteCompleted(types.SpecStatePassed, []byte("hello there"))).Should(Succeed())
							Eventually(done).Should(BeClosed())
						})
					})
				})
   350  
   351  				Describe("BlockUntilNonprimaryProcsHaveFinished", func() {
   352  					It("blocks until non-primary procs exit", func() {
   353  						done := make(chan interface{})
   354  						go func() {
   355  							defer GinkgoRecover()
   356  							Ω(client.BlockUntilNonprimaryProcsHaveFinished()).Should(Succeed())
   357  							close(done)
   358  						}()
   359  						Consistently(done).ShouldNot(BeClosed())
   360  						close(proc2Exited)
   361  						Consistently(done).ShouldNot(BeClosed())
   362  						close(proc3Exited)
   363  						Eventually(done).Should(BeClosed())
   364  					})
   365  				})
   366  
   367  				Describe("BlockUntilAggregatedNonprimaryProcsReport", func() {
   368  					var specReportA, specReportB types.SpecReport
   369  					var endReport2, endReport3 types.Report
   370  
   371  					BeforeEach(func() {
   372  						specReportA = types.SpecReport{LeafNodeText: "A"}
   373  						specReportB = types.SpecReport{LeafNodeText: "B"}
   374  						endReport2 = types.Report{SpecReports: types.SpecReports{specReportA}}
   375  						endReport3 = types.Report{SpecReports: types.SpecReports{specReportB}}
   376  					})
   377  
   378  					It("blocks until all non-primary procs exit, then returns the aggregated report", func() {
   379  						done := make(chan interface{})
   380  						go func() {
   381  							defer GinkgoRecover()
   382  							report, err := client.BlockUntilAggregatedNonprimaryProcsReport()
   383  							Ω(err).ShouldNot(HaveOccurred())
   384  							Ω(report.SpecReports).Should(ConsistOf(specReportA, specReportB))
   385  							close(done)
   386  						}()
   387  						Consistently(done).ShouldNot(BeClosed())
   388  
   389  						Ω(client.PostSuiteDidEnd(endReport2)).Should(Succeed())
   390  						close(proc2Exited)
   391  						Consistently(done).ShouldNot(BeClosed())
   392  
   393  						Ω(client.PostSuiteDidEnd(endReport3)).Should(Succeed())
   394  						close(proc3Exited)
   395  						Eventually(done).Should(BeClosed())
   396  					})
   397  
   398  					Context("when a non-primary proc disappears without reporting back", func() {
   399  						It("blocks returns an appropriate error", func() {
   400  							done := make(chan interface{})
   401  							go func() {
   402  								defer GinkgoRecover()
   403  								report, err := client.BlockUntilAggregatedNonprimaryProcsReport()
   404  								Ω(err).Should(Equal(types.GinkgoErrors.AggregatedReportUnavailableDueToNodeDisappearing()))
   405  								Ω(report).Should(BeZero())
   406  								close(done)
   407  							}()
   408  							Consistently(done).ShouldNot(BeClosed())
   409  
   410  							Ω(client.PostSuiteDidEnd(endReport2)).Should(Succeed())
   411  							close(proc2Exited)
   412  							Consistently(done).ShouldNot(BeClosed())
   413  
   414  							close(proc3Exited)
   415  							Eventually(done).Should(BeClosed())
   416  						})
   417  					})
   418  				})
   419  
   420  				Describe("Fetching counters", func() {
   421  					It("returns ascending counters", func() {
   422  						Ω(client.FetchNextCounter()).Should(Equal(0))
   423  						Ω(client.FetchNextCounter()).Should(Equal(1))
   424  						Ω(client.FetchNextCounter()).Should(Equal(2))
   425  						Ω(client.FetchNextCounter()).Should(Equal(3))
   426  					})
   427  				})
   428  
   429  				Describe("Aborting", func() {
   430  					It("should not abort by default", func() {
   431  						Ω(client.ShouldAbort()).Should(BeFalse())
   432  					})
   433  
   434  					Context("when told to abort", func() {
   435  						BeforeEach(func() {
   436  							Ω(client.PostAbort()).Should(Succeed())
   437  						})
   438  
   439  						It("should abort", func() {
   440  							Ω(client.ShouldAbort()).Should(BeTrue())
   441  						})
   442  					})
   443  				})
   444  
   445  			})
   446  		})
   447  	}
   448  })
   449  

View as plain text