1 package parallel_support
2
3 import (
4 "bytes"
5 "encoding/json"
6 "fmt"
7 "io"
8 "net/http"
9 "time"
10
11 "github.com/onsi/ginkgo/v2/types"
12 )
13
// httpClient issues HTTP requests against a single server.
type httpClient struct {
	// serverHost is the base of every request URL; request paths such as
	// "/up" are appended to it verbatim.
	serverHost string
}
17
18 func newHttpClient(serverHost string) *httpClient {
19 return &httpClient{
20 serverHost: serverHost,
21 }
22 }
23
24 func (client *httpClient) Connect() bool {
25 resp, err := http.Get(client.serverHost + "/up")
26 if err != nil {
27 return false
28 }
29 resp.Body.Close()
30 return resp.StatusCode == http.StatusOK
31 }
32
33 func (client *httpClient) Close() error {
34 return nil
35 }
36
37 func (client *httpClient) post(path string, data interface{}) error {
38 var body io.Reader
39 if data != nil {
40 encoded, err := json.Marshal(data)
41 if err != nil {
42 return err
43 }
44 body = bytes.NewBuffer(encoded)
45 }
46 resp, err := http.Post(client.serverHost+path, "application/json", body)
47 if err != nil {
48 return err
49 }
50 defer resp.Body.Close()
51 if resp.StatusCode != http.StatusOK {
52 return fmt.Errorf("received unexpected status code %d", resp.StatusCode)
53 }
54 return nil
55 }
56
57 func (client *httpClient) poll(path string, data interface{}) error {
58 for {
59 resp, err := http.Get(client.serverHost + path)
60 if err != nil {
61 return err
62 }
63 if resp.StatusCode == http.StatusTooEarly {
64 resp.Body.Close()
65 time.Sleep(POLLING_INTERVAL)
66 continue
67 }
68 defer resp.Body.Close()
69 if resp.StatusCode == http.StatusGone {
70 return ErrorGone
71 }
72 if resp.StatusCode == http.StatusFailedDependency {
73 return ErrorFailed
74 }
75 if resp.StatusCode != http.StatusOK {
76 return fmt.Errorf("received unexpected status code %d", resp.StatusCode)
77 }
78 if data != nil {
79 return json.NewDecoder(resp.Body).Decode(data)
80 }
81 return nil
82 }
83 }
84
85 func (client *httpClient) PostSuiteWillBegin(report types.Report) error {
86 return client.post("/suite-will-begin", report)
87 }
88
89 func (client *httpClient) PostDidRun(report types.SpecReport) error {
90 return client.post("/did-run", report)
91 }
92
93 func (client *httpClient) PostSuiteDidEnd(report types.Report) error {
94 return client.post("/suite-did-end", report)
95 }
96
97 func (client *httpClient) PostEmitProgressReport(report types.ProgressReport) error {
98 return client.post("/progress-report", report)
99 }
100
101 func (client *httpClient) PostReportBeforeSuiteCompleted(state types.SpecState) error {
102 return client.post("/report-before-suite-completed", state)
103 }
104
105 func (client *httpClient) BlockUntilReportBeforeSuiteCompleted() (types.SpecState, error) {
106 var state types.SpecState
107 err := client.poll("/report-before-suite-state", &state)
108 if err == ErrorGone {
109 return types.SpecStateFailed, nil
110 }
111 return state, err
112 }
113
114 func (client *httpClient) PostSynchronizedBeforeSuiteCompleted(state types.SpecState, data []byte) error {
115 beforeSuiteState := BeforeSuiteState{
116 State: state,
117 Data: data,
118 }
119 return client.post("/before-suite-completed", beforeSuiteState)
120 }
121
122 func (client *httpClient) BlockUntilSynchronizedBeforeSuiteData() (types.SpecState, []byte, error) {
123 var beforeSuiteState BeforeSuiteState
124 err := client.poll("/before-suite-state", &beforeSuiteState)
125 if err == ErrorGone {
126 return types.SpecStateInvalid, nil, types.GinkgoErrors.SynchronizedBeforeSuiteDisappearedOnProc1()
127 }
128 return beforeSuiteState.State, beforeSuiteState.Data, err
129 }
130
131 func (client *httpClient) BlockUntilNonprimaryProcsHaveFinished() error {
132 return client.poll("/have-nonprimary-procs-finished", nil)
133 }
134
135 func (client *httpClient) BlockUntilAggregatedNonprimaryProcsReport() (types.Report, error) {
136 var report types.Report
137 err := client.poll("/aggregated-nonprimary-procs-report", &report)
138 if err == ErrorGone {
139 return types.Report{}, types.GinkgoErrors.AggregatedReportUnavailableDueToNodeDisappearing()
140 }
141 return report, err
142 }
143
144 func (client *httpClient) FetchNextCounter() (int, error) {
145 var counter ParallelIndexCounter
146 err := client.poll("/counter", &counter)
147 return counter.Index, err
148 }
149
150 func (client *httpClient) PostAbort() error {
151 return client.post("/abort", nil)
152 }
153
154 func (client *httpClient) ShouldAbort() bool {
155 err := client.poll("/abort", nil)
156 if err == ErrorGone {
157 return true
158 }
159 return false
160 }
161
162 func (client *httpClient) Write(p []byte) (int, error) {
163 resp, err := http.Post(client.serverHost+"/emit-output", "text/plain;charset=UTF-8 ", bytes.NewReader(p))
164 resp.Body.Close()
165 if resp.StatusCode != http.StatusOK {
166 return 0, fmt.Errorf("failed to emit output")
167 }
168 return len(p), err
169 }
170
View as plain text