1
7
8 package parallel_support
9
10 import (
11 "encoding/json"
12 "io"
13 "net"
14 "net/http"
15
16 "github.com/onsi/ginkgo/v2/reporters"
17 "github.com/onsi/ginkgo/v2/types"
18 )
19
20
24 type httpServer struct {
25 listener net.Listener
26 handler *ServerHandler
27 }
28
29
30 func newHttpServer(parallelTotal int, reporter reporters.Reporter) (*httpServer, error) {
31 listener, err := net.Listen("tcp", "127.0.0.1:0")
32 if err != nil {
33 return nil, err
34 }
35 return &httpServer{
36 listener: listener,
37 handler: newServerHandler(parallelTotal, reporter),
38 }, nil
39 }
40
41
42 func (server *httpServer) Start() {
43 httpServer := &http.Server{}
44 mux := http.NewServeMux()
45 httpServer.Handler = mux
46
47
48 mux.HandleFunc("/suite-will-begin", server.specSuiteWillBegin)
49 mux.HandleFunc("/did-run", server.didRun)
50 mux.HandleFunc("/suite-did-end", server.specSuiteDidEnd)
51 mux.HandleFunc("/emit-output", server.emitOutput)
52 mux.HandleFunc("/progress-report", server.emitProgressReport)
53
54
55 mux.HandleFunc("/report-before-suite-completed", server.handleReportBeforeSuiteCompleted)
56 mux.HandleFunc("/report-before-suite-state", server.handleReportBeforeSuiteState)
57 mux.HandleFunc("/before-suite-completed", server.handleBeforeSuiteCompleted)
58 mux.HandleFunc("/before-suite-state", server.handleBeforeSuiteState)
59 mux.HandleFunc("/have-nonprimary-procs-finished", server.handleHaveNonprimaryProcsFinished)
60 mux.HandleFunc("/aggregated-nonprimary-procs-report", server.handleAggregatedNonprimaryProcsReport)
61 mux.HandleFunc("/counter", server.handleCounter)
62 mux.HandleFunc("/up", server.handleUp)
63 mux.HandleFunc("/abort", server.handleAbort)
64
65 go httpServer.Serve(server.listener)
66 }
67
68
69 func (server *httpServer) Close() {
70 server.listener.Close()
71 }
72
73
74 func (server *httpServer) Address() string {
75 return "http://" + server.listener.Addr().String()
76 }
77
78 func (server *httpServer) GetSuiteDone() chan interface{} {
79 return server.handler.done
80 }
81
82 func (server *httpServer) GetOutputDestination() io.Writer {
83 return server.handler.outputDestination
84 }
85
86 func (server *httpServer) SetOutputDestination(w io.Writer) {
87 server.handler.outputDestination = w
88 }
89
90 func (server *httpServer) RegisterAlive(node int, alive func() bool) {
91 server.handler.registerAlive(node, alive)
92 }
93
94
95
96
97
98
99 func (server *httpServer) decode(writer http.ResponseWriter, request *http.Request, object interface{}) bool {
100 defer request.Body.Close()
101 if json.NewDecoder(request.Body).Decode(object) != nil {
102 writer.WriteHeader(http.StatusBadRequest)
103 return false
104 }
105 return true
106 }
107
108 func (server *httpServer) handleError(err error, writer http.ResponseWriter) bool {
109 if err == nil {
110 return false
111 }
112 switch err {
113 case ErrorEarly:
114 writer.WriteHeader(http.StatusTooEarly)
115 case ErrorGone:
116 writer.WriteHeader(http.StatusGone)
117 case ErrorFailed:
118 writer.WriteHeader(http.StatusFailedDependency)
119 default:
120 writer.WriteHeader(http.StatusInternalServerError)
121 }
122 return true
123 }
124
125 func (server *httpServer) specSuiteWillBegin(writer http.ResponseWriter, request *http.Request) {
126 var report types.Report
127 if !server.decode(writer, request, &report) {
128 return
129 }
130
131 server.handleError(server.handler.SpecSuiteWillBegin(report, voidReceiver), writer)
132 }
133
134 func (server *httpServer) didRun(writer http.ResponseWriter, request *http.Request) {
135 var report types.SpecReport
136 if !server.decode(writer, request, &report) {
137 return
138 }
139
140 server.handleError(server.handler.DidRun(report, voidReceiver), writer)
141 }
142
143 func (server *httpServer) specSuiteDidEnd(writer http.ResponseWriter, request *http.Request) {
144 var report types.Report
145 if !server.decode(writer, request, &report) {
146 return
147 }
148 server.handleError(server.handler.SpecSuiteDidEnd(report, voidReceiver), writer)
149 }
150
151 func (server *httpServer) emitOutput(writer http.ResponseWriter, request *http.Request) {
152 output, err := io.ReadAll(request.Body)
153 if err != nil {
154 writer.WriteHeader(http.StatusInternalServerError)
155 return
156 }
157 var n int
158 server.handleError(server.handler.EmitOutput(output, &n), writer)
159 }
160
161 func (server *httpServer) emitProgressReport(writer http.ResponseWriter, request *http.Request) {
162 var report types.ProgressReport
163 if !server.decode(writer, request, &report) {
164 return
165 }
166 server.handleError(server.handler.EmitProgressReport(report, voidReceiver), writer)
167 }
168
169 func (server *httpServer) handleReportBeforeSuiteCompleted(writer http.ResponseWriter, request *http.Request) {
170 var state types.SpecState
171 if !server.decode(writer, request, &state) {
172 return
173 }
174
175 server.handleError(server.handler.ReportBeforeSuiteCompleted(state, voidReceiver), writer)
176 }
177
178 func (server *httpServer) handleReportBeforeSuiteState(writer http.ResponseWriter, request *http.Request) {
179 var state types.SpecState
180 if server.handleError(server.handler.ReportBeforeSuiteState(voidSender, &state), writer) {
181 return
182 }
183 json.NewEncoder(writer).Encode(state)
184 }
185
186 func (server *httpServer) handleBeforeSuiteCompleted(writer http.ResponseWriter, request *http.Request) {
187 var beforeSuiteState BeforeSuiteState
188 if !server.decode(writer, request, &beforeSuiteState) {
189 return
190 }
191
192 server.handleError(server.handler.BeforeSuiteCompleted(beforeSuiteState, voidReceiver), writer)
193 }
194
195 func (server *httpServer) handleBeforeSuiteState(writer http.ResponseWriter, request *http.Request) {
196 var beforeSuiteState BeforeSuiteState
197 if server.handleError(server.handler.BeforeSuiteState(voidSender, &beforeSuiteState), writer) {
198 return
199 }
200 json.NewEncoder(writer).Encode(beforeSuiteState)
201 }
202
203 func (server *httpServer) handleHaveNonprimaryProcsFinished(writer http.ResponseWriter, request *http.Request) {
204 if server.handleError(server.handler.HaveNonprimaryProcsFinished(voidSender, voidReceiver), writer) {
205 return
206 }
207 writer.WriteHeader(http.StatusOK)
208 }
209
210 func (server *httpServer) handleAggregatedNonprimaryProcsReport(writer http.ResponseWriter, request *http.Request) {
211 var aggregatedReport types.Report
212 if server.handleError(server.handler.AggregatedNonprimaryProcsReport(voidSender, &aggregatedReport), writer) {
213 return
214 }
215 json.NewEncoder(writer).Encode(aggregatedReport)
216 }
217
218 func (server *httpServer) handleCounter(writer http.ResponseWriter, request *http.Request) {
219 var n int
220 if server.handleError(server.handler.Counter(voidSender, &n), writer) {
221 return
222 }
223 json.NewEncoder(writer).Encode(ParallelIndexCounter{Index: n})
224 }
225
226 func (server *httpServer) handleUp(writer http.ResponseWriter, request *http.Request) {
227 writer.WriteHeader(http.StatusOK)
228 }
229
230 func (server *httpServer) handleAbort(writer http.ResponseWriter, request *http.Request) {
231 if request.Method == "GET" {
232 var shouldAbort bool
233 server.handler.ShouldAbort(voidSender, &shouldAbort)
234 if shouldAbort {
235 writer.WriteHeader(http.StatusGone)
236 } else {
237 writer.WriteHeader(http.StatusOK)
238 }
239 } else {
240 server.handler.Abort(voidSender, voidReceiver)
241 }
242 }
243
View as plain text