...

Source file src/github.com/onsi/ginkgo/v2/internal/parallel_support/server_handler.go

Documentation: github.com/onsi/ginkgo/v2/internal/parallel_support

     1  package parallel_support
     2  
     3  import (
     4  	"io"
     5  	"os"
     6  	"sync"
     7  
     8  	"github.com/onsi/ginkgo/v2/reporters"
     9  	"github.com/onsi/ginkgo/v2/types"
    10  )
    11  
    12  type Void struct{}
    13  
    14  var voidReceiver *Void = &Void{}
    15  var voidSender Void
    16  
    17  // ServerHandler is an RPC-compatible handler that is shared between the http server and the rpc server.
    18  // It handles all the business logic to avoid duplication between the two servers
    19  
    20  type ServerHandler struct {
    21  	done                   chan interface{}
    22  	outputDestination      io.Writer
    23  	reporter               reporters.Reporter
    24  	alives                 []func() bool
    25  	lock                   *sync.Mutex
    26  	beforeSuiteState       BeforeSuiteState
    27  	reportBeforeSuiteState types.SpecState
    28  	parallelTotal          int
    29  	counter                int
    30  	counterLock            *sync.Mutex
    31  	shouldAbort            bool
    32  
    33  	numSuiteDidBegins int
    34  	numSuiteDidEnds   int
    35  	aggregatedReport  types.Report
    36  	reportHoldingArea []types.SpecReport
    37  }
    38  
    39  func newServerHandler(parallelTotal int, reporter reporters.Reporter) *ServerHandler {
    40  	return &ServerHandler{
    41  		reporter:         reporter,
    42  		lock:             &sync.Mutex{},
    43  		counterLock:      &sync.Mutex{},
    44  		alives:           make([]func() bool, parallelTotal),
    45  		beforeSuiteState: BeforeSuiteState{Data: nil, State: types.SpecStateInvalid},
    46  
    47  		parallelTotal:     parallelTotal,
    48  		outputDestination: os.Stdout,
    49  		done:              make(chan interface{}),
    50  	}
    51  }
    52  
    53  func (handler *ServerHandler) SpecSuiteWillBegin(report types.Report, _ *Void) error {
    54  	handler.lock.Lock()
    55  	defer handler.lock.Unlock()
    56  
    57  	handler.numSuiteDidBegins += 1
    58  
    59  	// all summaries are identical, so it's fine to simply emit the last one of these
    60  	if handler.numSuiteDidBegins == handler.parallelTotal {
    61  		handler.reporter.SuiteWillBegin(report)
    62  
    63  		for _, summary := range handler.reportHoldingArea {
    64  			handler.reporter.WillRun(summary)
    65  			handler.reporter.DidRun(summary)
    66  		}
    67  
    68  		handler.reportHoldingArea = nil
    69  	}
    70  
    71  	return nil
    72  }
    73  
    74  func (handler *ServerHandler) DidRun(report types.SpecReport, _ *Void) error {
    75  	handler.lock.Lock()
    76  	defer handler.lock.Unlock()
    77  
    78  	if handler.numSuiteDidBegins == handler.parallelTotal {
    79  		handler.reporter.WillRun(report)
    80  		handler.reporter.DidRun(report)
    81  	} else {
    82  		handler.reportHoldingArea = append(handler.reportHoldingArea, report)
    83  	}
    84  
    85  	return nil
    86  }
    87  
    88  func (handler *ServerHandler) SpecSuiteDidEnd(report types.Report, _ *Void) error {
    89  	handler.lock.Lock()
    90  	defer handler.lock.Unlock()
    91  
    92  	handler.numSuiteDidEnds += 1
    93  	if handler.numSuiteDidEnds == 1 {
    94  		handler.aggregatedReport = report
    95  	} else {
    96  		handler.aggregatedReport = handler.aggregatedReport.Add(report)
    97  	}
    98  
    99  	if handler.numSuiteDidEnds == handler.parallelTotal {
   100  		handler.reporter.SuiteDidEnd(handler.aggregatedReport)
   101  		close(handler.done)
   102  	}
   103  
   104  	return nil
   105  }
   106  
   107  func (handler *ServerHandler) EmitOutput(output []byte, n *int) error {
   108  	var err error
   109  	*n, err = handler.outputDestination.Write(output)
   110  	return err
   111  }
   112  
   113  func (handler *ServerHandler) EmitProgressReport(report types.ProgressReport, _ *Void) error {
   114  	handler.lock.Lock()
   115  	defer handler.lock.Unlock()
   116  	handler.reporter.EmitProgressReport(report)
   117  	return nil
   118  }
   119  
   120  func (handler *ServerHandler) registerAlive(proc int, alive func() bool) {
   121  	handler.lock.Lock()
   122  	defer handler.lock.Unlock()
   123  	handler.alives[proc-1] = alive
   124  }
   125  
   126  func (handler *ServerHandler) procIsAlive(proc int) bool {
   127  	handler.lock.Lock()
   128  	defer handler.lock.Unlock()
   129  	alive := handler.alives[proc-1]
   130  	if alive == nil {
   131  		return true
   132  	}
   133  	return alive()
   134  }
   135  
   136  func (handler *ServerHandler) haveNonprimaryProcsFinished() bool {
   137  	for i := 2; i <= handler.parallelTotal; i++ {
   138  		if handler.procIsAlive(i) {
   139  			return false
   140  		}
   141  	}
   142  	return true
   143  }
   144  
   145  func (handler *ServerHandler) ReportBeforeSuiteCompleted(reportBeforeSuiteState types.SpecState, _ *Void) error {
   146  	handler.lock.Lock()
   147  	defer handler.lock.Unlock()
   148  	handler.reportBeforeSuiteState = reportBeforeSuiteState
   149  
   150  	return nil
   151  }
   152  
   153  func (handler *ServerHandler) ReportBeforeSuiteState(_ Void, reportBeforeSuiteState *types.SpecState) error {
   154  	proc1IsAlive := handler.procIsAlive(1)
   155  	handler.lock.Lock()
   156  	defer handler.lock.Unlock()
   157  	if handler.reportBeforeSuiteState == types.SpecStateInvalid {
   158  		if proc1IsAlive {
   159  			return ErrorEarly
   160  		} else {
   161  			return ErrorGone
   162  		}
   163  	}
   164  	*reportBeforeSuiteState = handler.reportBeforeSuiteState
   165  	return nil
   166  }
   167  
   168  func (handler *ServerHandler) BeforeSuiteCompleted(beforeSuiteState BeforeSuiteState, _ *Void) error {
   169  	handler.lock.Lock()
   170  	defer handler.lock.Unlock()
   171  	handler.beforeSuiteState = beforeSuiteState
   172  
   173  	return nil
   174  }
   175  
   176  func (handler *ServerHandler) BeforeSuiteState(_ Void, beforeSuiteState *BeforeSuiteState) error {
   177  	proc1IsAlive := handler.procIsAlive(1)
   178  	handler.lock.Lock()
   179  	defer handler.lock.Unlock()
   180  	if handler.beforeSuiteState.State == types.SpecStateInvalid {
   181  		if proc1IsAlive {
   182  			return ErrorEarly
   183  		} else {
   184  			return ErrorGone
   185  		}
   186  	}
   187  	*beforeSuiteState = handler.beforeSuiteState
   188  	return nil
   189  }
   190  
   191  func (handler *ServerHandler) HaveNonprimaryProcsFinished(_ Void, _ *Void) error {
   192  	if handler.haveNonprimaryProcsFinished() {
   193  		return nil
   194  	} else {
   195  		return ErrorEarly
   196  	}
   197  }
   198  
   199  func (handler *ServerHandler) AggregatedNonprimaryProcsReport(_ Void, report *types.Report) error {
   200  	if handler.haveNonprimaryProcsFinished() {
   201  		handler.lock.Lock()
   202  		defer handler.lock.Unlock()
   203  		if handler.numSuiteDidEnds == handler.parallelTotal-1 {
   204  			*report = handler.aggregatedReport
   205  			return nil
   206  		} else {
   207  			return ErrorGone
   208  		}
   209  	} else {
   210  		return ErrorEarly
   211  	}
   212  }
   213  
   214  func (handler *ServerHandler) Counter(_ Void, counter *int) error {
   215  	handler.counterLock.Lock()
   216  	defer handler.counterLock.Unlock()
   217  	*counter = handler.counter
   218  	handler.counter++
   219  	return nil
   220  }
   221  
   222  func (handler *ServerHandler) Abort(_ Void, _ *Void) error {
   223  	handler.lock.Lock()
   224  	defer handler.lock.Unlock()
   225  	handler.shouldAbort = true
   226  	return nil
   227  }
   228  
   229  func (handler *ServerHandler) ShouldAbort(_ Void, shouldAbort *bool) error {
   230  	handler.lock.Lock()
   231  	defer handler.lock.Unlock()
   232  	*shouldAbort = handler.shouldAbort
   233  	return nil
   234  }
   235  

View as plain text