1
18
19 package main
20
21 import (
22 "context"
23 "flag"
24 "math"
25 "runtime"
26 "sync"
27 "time"
28
29 "google.golang.org/grpc"
30 "google.golang.org/grpc/benchmark"
31 "google.golang.org/grpc/benchmark/stats"
32 "google.golang.org/grpc/codes"
33 "google.golang.org/grpc/credentials"
34 "google.golang.org/grpc/credentials/insecure"
35 "google.golang.org/grpc/internal/grpcrand"
36 "google.golang.org/grpc/internal/syscall"
37 "google.golang.org/grpc/status"
38 "google.golang.org/grpc/testdata"
39
40 testgrpc "google.golang.org/grpc/interop/grpc_testing"
41 testpb "google.golang.org/grpc/interop/grpc_testing"
42
43 _ "google.golang.org/grpc/xds"
44 )
45
46 var caFile = flag.String("ca_file", "", "The file containing the CA root cert file")
47
48 type lockingHistogram struct {
49 mu sync.Mutex
50 histogram *stats.Histogram
51 }
52
53 func (h *lockingHistogram) add(value int64) {
54 h.mu.Lock()
55 defer h.mu.Unlock()
56 h.histogram.Add(value)
57 }
58
59
60 func (h *lockingHistogram) swap(o *stats.Histogram) *stats.Histogram {
61 h.mu.Lock()
62 defer h.mu.Unlock()
63 old := h.histogram
64 h.histogram = o
65 return old
66 }
67
68 func (h *lockingHistogram) mergeInto(merged *stats.Histogram) {
69 h.mu.Lock()
70 defer h.mu.Unlock()
71 merged.Merge(h.histogram)
72 }
73
74 type benchmarkClient struct {
75 closeConns func()
76 stop chan bool
77 lastResetTime time.Time
78 histogramOptions stats.HistogramOptions
79 lockingHistograms []lockingHistogram
80 rusageLastReset *syscall.Rusage
81 }
82
83 func printClientConfig(config *testpb.ClientConfig) {
84
85
86
87
88
89 logger.Infof(" * client type: %v (ignored, always creates sync client)", config.ClientType)
90 logger.Infof(" * async client threads: %v (ignored)", config.AsyncClientThreads)
91
92 logger.Infof(" * core list: %v (ignored)", config.CoreList)
93
94 logger.Infof(" - security params: %v", config.SecurityParams)
95 logger.Infof(" - core limit: %v", config.CoreLimit)
96 logger.Infof(" - payload config: %v", config.PayloadConfig)
97 logger.Infof(" - rpcs per chann: %v", config.OutstandingRpcsPerChannel)
98 logger.Infof(" - channel number: %v", config.ClientChannels)
99 logger.Infof(" - load params: %v", config.LoadParams)
100 logger.Infof(" - rpc type: %v", config.RpcType)
101 logger.Infof(" - histogram params: %v", config.HistogramParams)
102 logger.Infof(" - server targets: %v", config.ServerTargets)
103 }
104
105 func setupClientEnv(config *testpb.ClientConfig) {
106
107
108 if config.CoreLimit > 0 {
109 runtime.GOMAXPROCS(int(config.CoreLimit))
110 } else {
111 runtime.GOMAXPROCS(runtime.NumCPU())
112 }
113 }
114
115
116
117
118 func createConns(config *testpb.ClientConfig) ([]*grpc.ClientConn, func(), error) {
119 opts := []grpc.DialOption{
120 grpc.WithWriteBufferSize(128 * 1024),
121 grpc.WithReadBufferSize(128 * 1024),
122 }
123
124
125 switch config.ClientType {
126 case testpb.ClientType_SYNC_CLIENT:
127 case testpb.ClientType_ASYNC_CLIENT:
128 default:
129 return nil, nil, status.Errorf(codes.InvalidArgument, "unknown client type: %v", config.ClientType)
130 }
131
132
133 if config.SecurityParams != nil {
134 if *caFile == "" {
135 *caFile = testdata.Path("ca.pem")
136 }
137 creds, err := credentials.NewClientTLSFromFile(*caFile, config.SecurityParams.ServerHostOverride)
138 if err != nil {
139 return nil, nil, status.Errorf(codes.InvalidArgument, "failed to create TLS credentials: %v", err)
140 }
141 opts = append(opts, grpc.WithTransportCredentials(creds))
142 } else {
143 opts = append(opts, grpc.WithTransportCredentials(insecure.NewCredentials()))
144 }
145
146
147 if config.PayloadConfig != nil {
148 switch config.PayloadConfig.Payload.(type) {
149 case *testpb.PayloadConfig_BytebufParams:
150 opts = append(opts, grpc.WithDefaultCallOptions(grpc.CallCustomCodec(byteBufCodec{})))
151 case *testpb.PayloadConfig_SimpleParams:
152 default:
153 return nil, nil, status.Errorf(codes.InvalidArgument, "unknown payload config: %v", config.PayloadConfig)
154 }
155 }
156
157
158 connCount := int(config.ClientChannels)
159 conns := make([]*grpc.ClientConn, connCount)
160 for connIndex := 0; connIndex < connCount; connIndex++ {
161 conns[connIndex] = benchmark.NewClientConn(config.ServerTargets[connIndex%len(config.ServerTargets)], opts...)
162 }
163
164 return conns, func() {
165 for _, conn := range conns {
166 conn.Close()
167 }
168 }, nil
169 }
170
171 func performRPCs(config *testpb.ClientConfig, conns []*grpc.ClientConn, bc *benchmarkClient) error {
172
173 var (
174 payloadReqSize, payloadRespSize int
175 payloadType string
176 )
177 if config.PayloadConfig != nil {
178 switch c := config.PayloadConfig.Payload.(type) {
179 case *testpb.PayloadConfig_BytebufParams:
180 payloadReqSize = int(c.BytebufParams.ReqSize)
181 payloadRespSize = int(c.BytebufParams.RespSize)
182 payloadType = "bytebuf"
183 case *testpb.PayloadConfig_SimpleParams:
184 payloadReqSize = int(c.SimpleParams.ReqSize)
185 payloadRespSize = int(c.SimpleParams.RespSize)
186 payloadType = "protobuf"
187 default:
188 return status.Errorf(codes.InvalidArgument, "unknown payload config: %v", config.PayloadConfig)
189 }
190 }
191
192
193
194
195
196 var poissonLambda *float64
197 switch t := config.LoadParams.Load.(type) {
198 case *testpb.LoadParams_ClosedLoop:
199 case *testpb.LoadParams_Poisson:
200 if t.Poisson == nil {
201 return status.Errorf(codes.InvalidArgument, "poisson is nil, needs to be set")
202 }
203 if t.Poisson.OfferedLoad <= 0 {
204 return status.Errorf(codes.InvalidArgument, "poisson.offered is <= 0: %v, needs to be >0", t.Poisson.OfferedLoad)
205 }
206 poissonLambda = &t.Poisson.OfferedLoad
207 default:
208 return status.Errorf(codes.InvalidArgument, "unknown load params: %v", config.LoadParams)
209 }
210
211 rpcCountPerConn := int(config.OutstandingRpcsPerChannel)
212
213 switch config.RpcType {
214 case testpb.RpcType_UNARY:
215 bc.unaryLoop(conns, rpcCountPerConn, payloadReqSize, payloadRespSize, poissonLambda)
216 case testpb.RpcType_STREAMING:
217 bc.streamingLoop(conns, rpcCountPerConn, payloadReqSize, payloadRespSize, payloadType, poissonLambda)
218 default:
219 return status.Errorf(codes.InvalidArgument, "unknown rpc type: %v", config.RpcType)
220 }
221
222 return nil
223 }
224
225 func startBenchmarkClient(config *testpb.ClientConfig) (*benchmarkClient, error) {
226 printClientConfig(config)
227
228
229 setupClientEnv(config)
230
231 conns, closeConns, err := createConns(config)
232 if err != nil {
233 return nil, err
234 }
235
236 rpcCountPerConn := int(config.OutstandingRpcsPerChannel)
237 bc := &benchmarkClient{
238 histogramOptions: stats.HistogramOptions{
239 NumBuckets: int(math.Log(config.HistogramParams.MaxPossible)/math.Log(1+config.HistogramParams.Resolution)) + 1,
240 GrowthFactor: config.HistogramParams.Resolution,
241 BaseBucketSize: (1 + config.HistogramParams.Resolution),
242 MinValue: 0,
243 },
244 lockingHistograms: make([]lockingHistogram, rpcCountPerConn*len(conns)),
245
246 stop: make(chan bool),
247 lastResetTime: time.Now(),
248 closeConns: closeConns,
249 rusageLastReset: syscall.GetRusage(),
250 }
251
252 if err = performRPCs(config, conns, bc); err != nil {
253
254 closeConns()
255 return nil, err
256 }
257
258 return bc, nil
259 }
260
261 func (bc *benchmarkClient) unaryLoop(conns []*grpc.ClientConn, rpcCountPerConn int, reqSize int, respSize int, poissonLambda *float64) {
262 for ic, conn := range conns {
263 client := testgrpc.NewBenchmarkServiceClient(conn)
264
265 for j := 0; j < rpcCountPerConn; j++ {
266
267 idx := ic*rpcCountPerConn + j
268 bc.lockingHistograms[idx].histogram = stats.NewHistogram(bc.histogramOptions)
269
270 go func(idx int) {
271
272
273
274
275 if poissonLambda == nil {
276 done := make(chan bool)
277 for {
278 go func() {
279 start := time.Now()
280 if err := benchmark.DoUnaryCall(client, reqSize, respSize); err != nil {
281 select {
282 case <-bc.stop:
283 case done <- false:
284 }
285 return
286 }
287 elapse := time.Since(start)
288 bc.lockingHistograms[idx].add(int64(elapse))
289 select {
290 case <-bc.stop:
291 case done <- true:
292 }
293 }()
294 select {
295 case <-bc.stop:
296 return
297 case <-done:
298 }
299 }
300 } else {
301 timeBetweenRPCs := time.Duration((grpcrand.ExpFloat64() / *poissonLambda) * float64(time.Second))
302 time.AfterFunc(timeBetweenRPCs, func() {
303 bc.poissonUnary(client, idx, reqSize, respSize, *poissonLambda)
304 })
305 }
306
307 }(idx)
308 }
309 }
310 }
311
312 func (bc *benchmarkClient) streamingLoop(conns []*grpc.ClientConn, rpcCountPerConn int, reqSize int, respSize int, payloadType string, poissonLambda *float64) {
313 var doRPC func(testgrpc.BenchmarkService_StreamingCallClient, int, int) error
314 if payloadType == "bytebuf" {
315 doRPC = benchmark.DoByteBufStreamingRoundTrip
316 } else {
317 doRPC = benchmark.DoStreamingRoundTrip
318 }
319 for ic, conn := range conns {
320
321 for j := 0; j < rpcCountPerConn; j++ {
322 c := testgrpc.NewBenchmarkServiceClient(conn)
323 stream, err := c.StreamingCall(context.Background())
324 if err != nil {
325 logger.Fatalf("%v.StreamingCall(_) = _, %v", c, err)
326 }
327 idx := ic*rpcCountPerConn + j
328 bc.lockingHistograms[idx].histogram = stats.NewHistogram(bc.histogramOptions)
329 if poissonLambda == nil {
330
331 go func(idx int) {
332
333
334
335
336 for {
337 start := time.Now()
338 if err := doRPC(stream, reqSize, respSize); err != nil {
339 return
340 }
341 elapse := time.Since(start)
342 bc.lockingHistograms[idx].add(int64(elapse))
343 select {
344 case <-bc.stop:
345 return
346 default:
347 }
348 }
349 }(idx)
350 } else {
351 timeBetweenRPCs := time.Duration((grpcrand.ExpFloat64() / *poissonLambda) * float64(time.Second))
352 time.AfterFunc(timeBetweenRPCs, func() {
353 bc.poissonStreaming(stream, idx, reqSize, respSize, *poissonLambda, doRPC)
354 })
355 }
356 }
357 }
358 }
359
360 func (bc *benchmarkClient) poissonUnary(client testgrpc.BenchmarkServiceClient, idx int, reqSize int, respSize int, lambda float64) {
361 go func() {
362 start := time.Now()
363 if err := benchmark.DoUnaryCall(client, reqSize, respSize); err != nil {
364 return
365 }
366 elapse := time.Since(start)
367 bc.lockingHistograms[idx].add(int64(elapse))
368 }()
369 timeBetweenRPCs := time.Duration((grpcrand.ExpFloat64() / lambda) * float64(time.Second))
370 time.AfterFunc(timeBetweenRPCs, func() {
371 bc.poissonUnary(client, idx, reqSize, respSize, lambda)
372 })
373 }
374
375 func (bc *benchmarkClient) poissonStreaming(stream testgrpc.BenchmarkService_StreamingCallClient, idx int, reqSize int, respSize int, lambda float64, doRPC func(testgrpc.BenchmarkService_StreamingCallClient, int, int) error) {
376 go func() {
377 start := time.Now()
378 if err := doRPC(stream, reqSize, respSize); err != nil {
379 return
380 }
381 elapse := time.Since(start)
382 bc.lockingHistograms[idx].add(int64(elapse))
383 }()
384 timeBetweenRPCs := time.Duration((grpcrand.ExpFloat64() / lambda) * float64(time.Second))
385 time.AfterFunc(timeBetweenRPCs, func() {
386 bc.poissonStreaming(stream, idx, reqSize, respSize, lambda, doRPC)
387 })
388 }
389
390
391
392 func (bc *benchmarkClient) getStats(reset bool) *testpb.ClientStats {
393 var wallTimeElapsed, uTimeElapsed, sTimeElapsed float64
394 mergedHistogram := stats.NewHistogram(bc.histogramOptions)
395
396 if reset {
397
398
399 toMerge := make([]*stats.Histogram, len(bc.lockingHistograms))
400 for i := range bc.lockingHistograms {
401 toMerge[i] = bc.lockingHistograms[i].swap(stats.NewHistogram(bc.histogramOptions))
402 }
403
404 for i := 0; i < len(toMerge); i++ {
405 mergedHistogram.Merge(toMerge[i])
406 }
407
408 wallTimeElapsed = time.Since(bc.lastResetTime).Seconds()
409 latestRusage := syscall.GetRusage()
410 uTimeElapsed, sTimeElapsed = syscall.CPUTimeDiff(bc.rusageLastReset, latestRusage)
411
412 bc.rusageLastReset = latestRusage
413 bc.lastResetTime = time.Now()
414 } else {
415
416 for i := range bc.lockingHistograms {
417 bc.lockingHistograms[i].mergeInto(mergedHistogram)
418 }
419
420 wallTimeElapsed = time.Since(bc.lastResetTime).Seconds()
421 uTimeElapsed, sTimeElapsed = syscall.CPUTimeDiff(bc.rusageLastReset, syscall.GetRusage())
422 }
423
424 b := make([]uint32, len(mergedHistogram.Buckets))
425 for i, v := range mergedHistogram.Buckets {
426 b[i] = uint32(v.Count)
427 }
428 return &testpb.ClientStats{
429 Latencies: &testpb.HistogramData{
430 Bucket: b,
431 MinSeen: float64(mergedHistogram.Min),
432 MaxSeen: float64(mergedHistogram.Max),
433 Sum: float64(mergedHistogram.Sum),
434 SumOfSquares: float64(mergedHistogram.SumOfSquares),
435 Count: float64(mergedHistogram.Count),
436 },
437 TimeElapsed: wallTimeElapsed,
438 TimeUser: uTimeElapsed,
439 TimeSystem: sTimeElapsed,
440 }
441 }
442
443 func (bc *benchmarkClient) shutdown() {
444 close(bc.stop)
445 bc.closeConns()
446 }
447
View as plain text