1 package parallel_support
2
3 import (
4 "net/rpc"
5 "time"
6
7 "github.com/onsi/ginkgo/v2/types"
8 )
9
10 type rpcClient struct {
11 serverHost string
12 client *rpc.Client
13 }
14
15 func newRPCClient(serverHost string) *rpcClient {
16 return &rpcClient{
17 serverHost: serverHost,
18 }
19 }
20
21 func (client *rpcClient) Connect() bool {
22 var err error
23 if client.client != nil {
24 return true
25 }
26 client.client, err = rpc.DialHTTPPath("tcp", client.serverHost, "/")
27 if err != nil {
28 client.client = nil
29 return false
30 }
31 return true
32 }
33
34 func (client *rpcClient) Close() error {
35 return client.client.Close()
36 }
37
38 func (client *rpcClient) poll(method string, data interface{}) error {
39 for {
40 err := client.client.Call(method, voidSender, data)
41 if err == nil {
42 return nil
43 }
44 switch err.Error() {
45 case ErrorEarly.Error():
46 time.Sleep(POLLING_INTERVAL)
47 case ErrorGone.Error():
48 return ErrorGone
49 case ErrorFailed.Error():
50 return ErrorFailed
51 default:
52 return err
53 }
54 }
55 }
56
57 func (client *rpcClient) PostSuiteWillBegin(report types.Report) error {
58 return client.client.Call("Server.SpecSuiteWillBegin", report, voidReceiver)
59 }
60
61 func (client *rpcClient) PostDidRun(report types.SpecReport) error {
62 return client.client.Call("Server.DidRun", report, voidReceiver)
63 }
64
65 func (client *rpcClient) PostSuiteDidEnd(report types.Report) error {
66 return client.client.Call("Server.SpecSuiteDidEnd", report, voidReceiver)
67 }
68
69 func (client *rpcClient) Write(p []byte) (int, error) {
70 var n int
71 err := client.client.Call("Server.EmitOutput", p, &n)
72 return n, err
73 }
74
75 func (client *rpcClient) PostEmitProgressReport(report types.ProgressReport) error {
76 return client.client.Call("Server.EmitProgressReport", report, voidReceiver)
77 }
78
79 func (client *rpcClient) PostReportBeforeSuiteCompleted(state types.SpecState) error {
80 return client.client.Call("Server.ReportBeforeSuiteCompleted", state, voidReceiver)
81 }
82
83 func (client *rpcClient) BlockUntilReportBeforeSuiteCompleted() (types.SpecState, error) {
84 var state types.SpecState
85 err := client.poll("Server.ReportBeforeSuiteState", &state)
86 if err == ErrorGone {
87 return types.SpecStateFailed, nil
88 }
89 return state, err
90 }
91
92 func (client *rpcClient) PostSynchronizedBeforeSuiteCompleted(state types.SpecState, data []byte) error {
93 beforeSuiteState := BeforeSuiteState{
94 State: state,
95 Data: data,
96 }
97 return client.client.Call("Server.BeforeSuiteCompleted", beforeSuiteState, voidReceiver)
98 }
99
100 func (client *rpcClient) BlockUntilSynchronizedBeforeSuiteData() (types.SpecState, []byte, error) {
101 var beforeSuiteState BeforeSuiteState
102 err := client.poll("Server.BeforeSuiteState", &beforeSuiteState)
103 if err == ErrorGone {
104 return types.SpecStateInvalid, nil, types.GinkgoErrors.SynchronizedBeforeSuiteDisappearedOnProc1()
105 }
106 return beforeSuiteState.State, beforeSuiteState.Data, err
107 }
108
109 func (client *rpcClient) BlockUntilNonprimaryProcsHaveFinished() error {
110 return client.poll("Server.HaveNonprimaryProcsFinished", voidReceiver)
111 }
112
113 func (client *rpcClient) BlockUntilAggregatedNonprimaryProcsReport() (types.Report, error) {
114 var report types.Report
115 err := client.poll("Server.AggregatedNonprimaryProcsReport", &report)
116 if err == ErrorGone {
117 return types.Report{}, types.GinkgoErrors.AggregatedReportUnavailableDueToNodeDisappearing()
118 }
119 return report, err
120 }
121
122 func (client *rpcClient) FetchNextCounter() (int, error) {
123 var counter int
124 err := client.client.Call("Server.Counter", voidSender, &counter)
125 return counter, err
126 }
127
128 func (client *rpcClient) PostAbort() error {
129 return client.client.Call("Server.Abort", voidSender, voidReceiver)
130 }
131
132 func (client *rpcClient) ShouldAbort() bool {
133 var shouldAbort bool
134 client.client.Call("Server.ShouldAbort", voidSender, &shouldAbort)
135 return shouldAbort
136 }
137
View as plain text