...

Source file src/github.com/onsi/ginkgo/v2/internal/parallel_support/rpc_client.go

Documentation: github.com/onsi/ginkgo/v2/internal/parallel_support

     1  package parallel_support
     2  
     3  import (
     4  	"net/rpc"
     5  	"time"
     6  
     7  	"github.com/onsi/ginkgo/v2/types"
     8  )
     9  
    10  type rpcClient struct {
    11  	serverHost string
    12  	client     *rpc.Client
    13  }
    14  
    15  func newRPCClient(serverHost string) *rpcClient {
    16  	return &rpcClient{
    17  		serverHost: serverHost,
    18  	}
    19  }
    20  
    21  func (client *rpcClient) Connect() bool {
    22  	var err error
    23  	if client.client != nil {
    24  		return true
    25  	}
    26  	client.client, err = rpc.DialHTTPPath("tcp", client.serverHost, "/")
    27  	if err != nil {
    28  		client.client = nil
    29  		return false
    30  	}
    31  	return true
    32  }
    33  
    34  func (client *rpcClient) Close() error {
    35  	return client.client.Close()
    36  }
    37  
    38  func (client *rpcClient) poll(method string, data interface{}) error {
    39  	for {
    40  		err := client.client.Call(method, voidSender, data)
    41  		if err == nil {
    42  			return nil
    43  		}
    44  		switch err.Error() {
    45  		case ErrorEarly.Error():
    46  			time.Sleep(POLLING_INTERVAL)
    47  		case ErrorGone.Error():
    48  			return ErrorGone
    49  		case ErrorFailed.Error():
    50  			return ErrorFailed
    51  		default:
    52  			return err
    53  		}
    54  	}
    55  }
    56  
    57  func (client *rpcClient) PostSuiteWillBegin(report types.Report) error {
    58  	return client.client.Call("Server.SpecSuiteWillBegin", report, voidReceiver)
    59  }
    60  
    61  func (client *rpcClient) PostDidRun(report types.SpecReport) error {
    62  	return client.client.Call("Server.DidRun", report, voidReceiver)
    63  }
    64  
    65  func (client *rpcClient) PostSuiteDidEnd(report types.Report) error {
    66  	return client.client.Call("Server.SpecSuiteDidEnd", report, voidReceiver)
    67  }
    68  
    69  func (client *rpcClient) Write(p []byte) (int, error) {
    70  	var n int
    71  	err := client.client.Call("Server.EmitOutput", p, &n)
    72  	return n, err
    73  }
    74  
    75  func (client *rpcClient) PostEmitProgressReport(report types.ProgressReport) error {
    76  	return client.client.Call("Server.EmitProgressReport", report, voidReceiver)
    77  }
    78  
    79  func (client *rpcClient) PostReportBeforeSuiteCompleted(state types.SpecState) error {
    80  	return client.client.Call("Server.ReportBeforeSuiteCompleted", state, voidReceiver)
    81  }
    82  
    83  func (client *rpcClient) BlockUntilReportBeforeSuiteCompleted() (types.SpecState, error) {
    84  	var state types.SpecState
    85  	err := client.poll("Server.ReportBeforeSuiteState", &state)
    86  	if err == ErrorGone {
    87  		return types.SpecStateFailed, nil
    88  	}
    89  	return state, err
    90  }
    91  
    92  func (client *rpcClient) PostSynchronizedBeforeSuiteCompleted(state types.SpecState, data []byte) error {
    93  	beforeSuiteState := BeforeSuiteState{
    94  		State: state,
    95  		Data:  data,
    96  	}
    97  	return client.client.Call("Server.BeforeSuiteCompleted", beforeSuiteState, voidReceiver)
    98  }
    99  
   100  func (client *rpcClient) BlockUntilSynchronizedBeforeSuiteData() (types.SpecState, []byte, error) {
   101  	var beforeSuiteState BeforeSuiteState
   102  	err := client.poll("Server.BeforeSuiteState", &beforeSuiteState)
   103  	if err == ErrorGone {
   104  		return types.SpecStateInvalid, nil, types.GinkgoErrors.SynchronizedBeforeSuiteDisappearedOnProc1()
   105  	}
   106  	return beforeSuiteState.State, beforeSuiteState.Data, err
   107  }
   108  
   109  func (client *rpcClient) BlockUntilNonprimaryProcsHaveFinished() error {
   110  	return client.poll("Server.HaveNonprimaryProcsFinished", voidReceiver)
   111  }
   112  
   113  func (client *rpcClient) BlockUntilAggregatedNonprimaryProcsReport() (types.Report, error) {
   114  	var report types.Report
   115  	err := client.poll("Server.AggregatedNonprimaryProcsReport", &report)
   116  	if err == ErrorGone {
   117  		return types.Report{}, types.GinkgoErrors.AggregatedReportUnavailableDueToNodeDisappearing()
   118  	}
   119  	return report, err
   120  }
   121  
   122  func (client *rpcClient) FetchNextCounter() (int, error) {
   123  	var counter int
   124  	err := client.client.Call("Server.Counter", voidSender, &counter)
   125  	return counter, err
   126  }
   127  
   128  func (client *rpcClient) PostAbort() error {
   129  	return client.client.Call("Server.Abort", voidSender, voidReceiver)
   130  }
   131  
   132  func (client *rpcClient) ShouldAbort() bool {
   133  	var shouldAbort bool
   134  	client.client.Call("Server.ShouldAbort", voidSender, &shouldAbort)
   135  	return shouldAbort
   136  }
   137  

View as plain text