...

Source file src/github.com/onsi/ginkgo/v2/internal/parallel_support/http_server.go

Documentation: github.com/onsi/ginkgo/v2/internal/parallel_support

     1  /*
     2  
     3  The parallel_support package provides the pieces to allow Ginkgo test suites to report to remote listeners.
     4  This is used, primarily, to enable streaming parallel test output but has, in principle, broader applications (e.g. streaming test output to a browser).
     5  
     6  */
     7  
     8  package parallel_support
     9  
    10  import (
    11  	"encoding/json"
    12  	"io"
    13  	"net"
    14  	"net/http"
    15  
    16  	"github.com/onsi/ginkgo/v2/reporters"
    17  	"github.com/onsi/ginkgo/v2/types"
    18  )
    19  
/*
httpServer spins up on an automatically selected port and listens for communication from the forwarding reporter.
It then forwards that communication to attached reporters.
*/
type httpServer struct {
	// listener is the TCP listener the HTTP server is served on; its address
	// is what Address reports and it is what Close closes.
	listener net.Listener
	// handler receives every decoded request; each HTTP endpoint delegates
	// to one of its methods.
	handler *ServerHandler
}
    28  
    29  // Create a new server, automatically selecting a port
    30  func newHttpServer(parallelTotal int, reporter reporters.Reporter) (*httpServer, error) {
    31  	listener, err := net.Listen("tcp", "127.0.0.1:0")
    32  	if err != nil {
    33  		return nil, err
    34  	}
    35  	return &httpServer{
    36  		listener: listener,
    37  		handler:  newServerHandler(parallelTotal, reporter),
    38  	}, nil
    39  }
    40  
    41  // Start the server.  You don't need to `go s.Start()`, just `s.Start()`
    42  func (server *httpServer) Start() {
    43  	httpServer := &http.Server{}
    44  	mux := http.NewServeMux()
    45  	httpServer.Handler = mux
    46  
    47  	//streaming endpoints
    48  	mux.HandleFunc("/suite-will-begin", server.specSuiteWillBegin)
    49  	mux.HandleFunc("/did-run", server.didRun)
    50  	mux.HandleFunc("/suite-did-end", server.specSuiteDidEnd)
    51  	mux.HandleFunc("/emit-output", server.emitOutput)
    52  	mux.HandleFunc("/progress-report", server.emitProgressReport)
    53  
    54  	//synchronization endpoints
    55  	mux.HandleFunc("/report-before-suite-completed", server.handleReportBeforeSuiteCompleted)
    56  	mux.HandleFunc("/report-before-suite-state", server.handleReportBeforeSuiteState)
    57  	mux.HandleFunc("/before-suite-completed", server.handleBeforeSuiteCompleted)
    58  	mux.HandleFunc("/before-suite-state", server.handleBeforeSuiteState)
    59  	mux.HandleFunc("/have-nonprimary-procs-finished", server.handleHaveNonprimaryProcsFinished)
    60  	mux.HandleFunc("/aggregated-nonprimary-procs-report", server.handleAggregatedNonprimaryProcsReport)
    61  	mux.HandleFunc("/counter", server.handleCounter)
    62  	mux.HandleFunc("/up", server.handleUp)
    63  	mux.HandleFunc("/abort", server.handleAbort)
    64  
    65  	go httpServer.Serve(server.listener)
    66  }
    67  
    68  // Stop the server
    69  func (server *httpServer) Close() {
    70  	server.listener.Close()
    71  }
    72  
    73  // The address the server can be reached it.  Pass this into the `ForwardingReporter`.
    74  func (server *httpServer) Address() string {
    75  	return "http://" + server.listener.Addr().String()
    76  }
    77  
    78  func (server *httpServer) GetSuiteDone() chan interface{} {
    79  	return server.handler.done
    80  }
    81  
    82  func (server *httpServer) GetOutputDestination() io.Writer {
    83  	return server.handler.outputDestination
    84  }
    85  
    86  func (server *httpServer) SetOutputDestination(w io.Writer) {
    87  	server.handler.outputDestination = w
    88  }
    89  
    90  func (server *httpServer) RegisterAlive(node int, alive func() bool) {
    91  	server.handler.registerAlive(node, alive)
    92  }
    93  
    94  //
    95  // Streaming Endpoints
    96  //
    97  
    98  // The server will forward all received messages to Ginkgo reporters registered with `RegisterReporters`
    99  func (server *httpServer) decode(writer http.ResponseWriter, request *http.Request, object interface{}) bool {
   100  	defer request.Body.Close()
   101  	if json.NewDecoder(request.Body).Decode(object) != nil {
   102  		writer.WriteHeader(http.StatusBadRequest)
   103  		return false
   104  	}
   105  	return true
   106  }
   107  
   108  func (server *httpServer) handleError(err error, writer http.ResponseWriter) bool {
   109  	if err == nil {
   110  		return false
   111  	}
   112  	switch err {
   113  	case ErrorEarly:
   114  		writer.WriteHeader(http.StatusTooEarly)
   115  	case ErrorGone:
   116  		writer.WriteHeader(http.StatusGone)
   117  	case ErrorFailed:
   118  		writer.WriteHeader(http.StatusFailedDependency)
   119  	default:
   120  		writer.WriteHeader(http.StatusInternalServerError)
   121  	}
   122  	return true
   123  }
   124  
   125  func (server *httpServer) specSuiteWillBegin(writer http.ResponseWriter, request *http.Request) {
   126  	var report types.Report
   127  	if !server.decode(writer, request, &report) {
   128  		return
   129  	}
   130  
   131  	server.handleError(server.handler.SpecSuiteWillBegin(report, voidReceiver), writer)
   132  }
   133  
   134  func (server *httpServer) didRun(writer http.ResponseWriter, request *http.Request) {
   135  	var report types.SpecReport
   136  	if !server.decode(writer, request, &report) {
   137  		return
   138  	}
   139  
   140  	server.handleError(server.handler.DidRun(report, voidReceiver), writer)
   141  }
   142  
   143  func (server *httpServer) specSuiteDidEnd(writer http.ResponseWriter, request *http.Request) {
   144  	var report types.Report
   145  	if !server.decode(writer, request, &report) {
   146  		return
   147  	}
   148  	server.handleError(server.handler.SpecSuiteDidEnd(report, voidReceiver), writer)
   149  }
   150  
   151  func (server *httpServer) emitOutput(writer http.ResponseWriter, request *http.Request) {
   152  	output, err := io.ReadAll(request.Body)
   153  	if err != nil {
   154  		writer.WriteHeader(http.StatusInternalServerError)
   155  		return
   156  	}
   157  	var n int
   158  	server.handleError(server.handler.EmitOutput(output, &n), writer)
   159  }
   160  
   161  func (server *httpServer) emitProgressReport(writer http.ResponseWriter, request *http.Request) {
   162  	var report types.ProgressReport
   163  	if !server.decode(writer, request, &report) {
   164  		return
   165  	}
   166  	server.handleError(server.handler.EmitProgressReport(report, voidReceiver), writer)
   167  }
   168  
   169  func (server *httpServer) handleReportBeforeSuiteCompleted(writer http.ResponseWriter, request *http.Request) {
   170  	var state types.SpecState
   171  	if !server.decode(writer, request, &state) {
   172  		return
   173  	}
   174  
   175  	server.handleError(server.handler.ReportBeforeSuiteCompleted(state, voidReceiver), writer)
   176  }
   177  
   178  func (server *httpServer) handleReportBeforeSuiteState(writer http.ResponseWriter, request *http.Request) {
   179  	var state types.SpecState
   180  	if server.handleError(server.handler.ReportBeforeSuiteState(voidSender, &state), writer) {
   181  		return
   182  	}
   183  	json.NewEncoder(writer).Encode(state)
   184  }
   185  
   186  func (server *httpServer) handleBeforeSuiteCompleted(writer http.ResponseWriter, request *http.Request) {
   187  	var beforeSuiteState BeforeSuiteState
   188  	if !server.decode(writer, request, &beforeSuiteState) {
   189  		return
   190  	}
   191  
   192  	server.handleError(server.handler.BeforeSuiteCompleted(beforeSuiteState, voidReceiver), writer)
   193  }
   194  
   195  func (server *httpServer) handleBeforeSuiteState(writer http.ResponseWriter, request *http.Request) {
   196  	var beforeSuiteState BeforeSuiteState
   197  	if server.handleError(server.handler.BeforeSuiteState(voidSender, &beforeSuiteState), writer) {
   198  		return
   199  	}
   200  	json.NewEncoder(writer).Encode(beforeSuiteState)
   201  }
   202  
   203  func (server *httpServer) handleHaveNonprimaryProcsFinished(writer http.ResponseWriter, request *http.Request) {
   204  	if server.handleError(server.handler.HaveNonprimaryProcsFinished(voidSender, voidReceiver), writer) {
   205  		return
   206  	}
   207  	writer.WriteHeader(http.StatusOK)
   208  }
   209  
   210  func (server *httpServer) handleAggregatedNonprimaryProcsReport(writer http.ResponseWriter, request *http.Request) {
   211  	var aggregatedReport types.Report
   212  	if server.handleError(server.handler.AggregatedNonprimaryProcsReport(voidSender, &aggregatedReport), writer) {
   213  		return
   214  	}
   215  	json.NewEncoder(writer).Encode(aggregatedReport)
   216  }
   217  
   218  func (server *httpServer) handleCounter(writer http.ResponseWriter, request *http.Request) {
   219  	var n int
   220  	if server.handleError(server.handler.Counter(voidSender, &n), writer) {
   221  		return
   222  	}
   223  	json.NewEncoder(writer).Encode(ParallelIndexCounter{Index: n})
   224  }
   225  
   226  func (server *httpServer) handleUp(writer http.ResponseWriter, request *http.Request) {
   227  	writer.WriteHeader(http.StatusOK)
   228  }
   229  
   230  func (server *httpServer) handleAbort(writer http.ResponseWriter, request *http.Request) {
   231  	if request.Method == "GET" {
   232  		var shouldAbort bool
   233  		server.handler.ShouldAbort(voidSender, &shouldAbort)
   234  		if shouldAbort {
   235  			writer.WriteHeader(http.StatusGone)
   236  		} else {
   237  			writer.WriteHeader(http.StatusOK)
   238  		}
   239  	} else {
   240  		server.handler.Abort(voidSender, voidReceiver)
   241  	}
   242  }
   243  

View as plain text