1
18
19
20
21 package main
22
23 import (
24 "context"
25 "flag"
26 "fmt"
27 "io"
28 "net"
29 "net/http"
30 _ "net/http/pprof"
31 "runtime"
32 "strconv"
33 "time"
34
35 "google.golang.org/grpc"
36 "google.golang.org/grpc/codes"
37 "google.golang.org/grpc/grpclog"
38 "google.golang.org/grpc/status"
39
40 testgrpc "google.golang.org/grpc/interop/grpc_testing"
41 testpb "google.golang.org/grpc/interop/grpc_testing"
42 )
43
44 var (
45 driverPort = flag.Int("driver_port", 10000, "port for communication with driver")
46 serverPort = flag.Int("server_port", 0, "port for benchmark server if not specified by server config message")
47 pprofPort = flag.Int("pprof_port", -1, "Port for pprof debug server to listen on. Pprof server doesn't start if unset")
48 blockProfRate = flag.Int("block_prof_rate", 0, "fraction of goroutine blocking events to report in blocking profile")
49
50 logger = grpclog.Component("benchmark")
51 )
52
53 type byteBufCodec struct {
54 }
55
56 func (byteBufCodec) Marshal(v any) ([]byte, error) {
57 b, ok := v.(*[]byte)
58 if !ok {
59 return nil, fmt.Errorf("failed to marshal: %v is not type of *[]byte", v)
60 }
61 return *b, nil
62 }
63
64 func (byteBufCodec) Unmarshal(data []byte, v any) error {
65 b, ok := v.(*[]byte)
66 if !ok {
67 return fmt.Errorf("failed to marshal: %v is not type of *[]byte", v)
68 }
69 *b = data
70 return nil
71 }
72
73 func (byteBufCodec) String() string {
74 return "bytebuffer"
75 }
76
77
78
79 type workerServer struct {
80 testgrpc.UnimplementedWorkerServiceServer
81 stop chan<- bool
82 serverPort int
83 }
84
85 func (s *workerServer) RunServer(stream testgrpc.WorkerService_RunServerServer) error {
86 var bs *benchmarkServer
87 defer func() {
88
89 logger.Infof("closing benchmark server")
90 if bs != nil {
91 bs.closeFunc()
92 }
93 }()
94 for {
95 in, err := stream.Recv()
96 if err == io.EOF {
97 return nil
98 }
99 if err != nil {
100 return err
101 }
102
103 var out *testpb.ServerStatus
104 switch argtype := in.Argtype.(type) {
105 case *testpb.ServerArgs_Setup:
106 logger.Infof("server setup received:")
107 if bs != nil {
108 logger.Infof("server setup received when server already exists, closing the existing server")
109 bs.closeFunc()
110 }
111 bs, err = startBenchmarkServer(argtype.Setup, s.serverPort)
112 if err != nil {
113 return err
114 }
115 out = &testpb.ServerStatus{
116 Stats: bs.getStats(false),
117 Port: int32(bs.port),
118 Cores: int32(bs.cores),
119 }
120
121 case *testpb.ServerArgs_Mark:
122 logger.Infof("server mark received:")
123 logger.Infof(" - %v", argtype)
124 if bs == nil {
125 return status.Error(codes.InvalidArgument, "server does not exist when mark received")
126 }
127 out = &testpb.ServerStatus{
128 Stats: bs.getStats(argtype.Mark.Reset_),
129 Port: int32(bs.port),
130 Cores: int32(bs.cores),
131 }
132 }
133
134 if err := stream.Send(out); err != nil {
135 return err
136 }
137 }
138 }
139
140 func (s *workerServer) RunClient(stream testgrpc.WorkerService_RunClientServer) error {
141 var bc *benchmarkClient
142 defer func() {
143
144 logger.Infof("shuting down benchmark client")
145 if bc != nil {
146 bc.shutdown()
147 }
148 }()
149 for {
150 in, err := stream.Recv()
151 if err == io.EOF {
152 return nil
153 }
154 if err != nil {
155 return err
156 }
157
158 var out *testpb.ClientStatus
159 switch t := in.Argtype.(type) {
160 case *testpb.ClientArgs_Setup:
161 logger.Infof("client setup received:")
162 if bc != nil {
163 logger.Infof("client setup received when client already exists, shuting down the existing client")
164 bc.shutdown()
165 }
166 bc, err = startBenchmarkClient(t.Setup)
167 if err != nil {
168 return err
169 }
170 out = &testpb.ClientStatus{
171 Stats: bc.getStats(false),
172 }
173
174 case *testpb.ClientArgs_Mark:
175 logger.Infof("client mark received:")
176 logger.Infof(" - %v", t)
177 if bc == nil {
178 return status.Error(codes.InvalidArgument, "client does not exist when mark received")
179 }
180 out = &testpb.ClientStatus{
181 Stats: bc.getStats(t.Mark.Reset_),
182 }
183 }
184
185 if err := stream.Send(out); err != nil {
186 return err
187 }
188 }
189 }
190
191 func (s *workerServer) CoreCount(ctx context.Context, in *testpb.CoreRequest) (*testpb.CoreResponse, error) {
192 logger.Infof("core count: %v", runtime.NumCPU())
193 return &testpb.CoreResponse{Cores: int32(runtime.NumCPU())}, nil
194 }
195
196 func (s *workerServer) QuitWorker(ctx context.Context, in *testpb.Void) (*testpb.Void, error) {
197 logger.Infof("quitting worker")
198 s.stop <- true
199 return &testpb.Void{}, nil
200 }
201
202 func main() {
203 grpc.EnableTracing = false
204
205 flag.Parse()
206 lis, err := net.Listen("tcp", ":"+strconv.Itoa(*driverPort))
207 if err != nil {
208 logger.Fatalf("failed to listen: %v", err)
209 }
210 logger.Infof("worker listening at port %v", *driverPort)
211
212 s := grpc.NewServer()
213 stop := make(chan bool)
214 testgrpc.RegisterWorkerServiceServer(s, &workerServer{
215 stop: stop,
216 serverPort: *serverPort,
217 })
218
219 go func() {
220 <-stop
221
222
223 time.Sleep(time.Second)
224 s.Stop()
225 }()
226
227 runtime.SetBlockProfileRate(*blockProfRate)
228
229 if *pprofPort >= 0 {
230 go func() {
231 logger.Infoln("Starting pprof server on port " + strconv.Itoa(*pprofPort))
232 logger.Infoln(http.ListenAndServe("localhost:"+strconv.Itoa(*pprofPort), nil))
233 }()
234 }
235
236 s.Serve(lis)
237 }
238
View as plain text