1
18
19
41 package main
42
43 import (
44 "context"
45 "encoding/gob"
46 "flag"
47 "fmt"
48 "io"
49 "log"
50 "math/rand"
51 "net"
52 "os"
53 "reflect"
54 "runtime"
55 "runtime/pprof"
56 "strconv"
57 "strings"
58 "sync"
59 "sync/atomic"
60 "time"
61
62 "google.golang.org/grpc"
63 "google.golang.org/grpc/benchmark"
64 "google.golang.org/grpc/benchmark/flags"
65 "google.golang.org/grpc/benchmark/latency"
66 "google.golang.org/grpc/benchmark/stats"
67 "google.golang.org/grpc/credentials/insecure"
68 "google.golang.org/grpc/experimental"
69 "google.golang.org/grpc/grpclog"
70 "google.golang.org/grpc/internal"
71 "google.golang.org/grpc/internal/channelz"
72 "google.golang.org/grpc/keepalive"
73 "google.golang.org/grpc/metadata"
74 "google.golang.org/grpc/test/bufconn"
75
76 testgrpc "google.golang.org/grpc/interop/grpc_testing"
77 testpb "google.golang.org/grpc/interop/grpc_testing"
78 )
79
80 var (
81 workloads = flags.StringWithAllowedValues("workloads", workloadsAll,
82 fmt.Sprintf("Workloads to execute - One of: %v", strings.Join(allWorkloads, ", ")), allWorkloads)
83 traceMode = flags.StringWithAllowedValues("trace", toggleModeOff,
84 fmt.Sprintf("Trace mode - One of: %v", strings.Join(allToggleModes, ", ")), allToggleModes)
85 preloaderMode = flags.StringWithAllowedValues("preloader", toggleModeOff,
86 fmt.Sprintf("Preloader mode - One of: %v, preloader works only in streaming and unconstrained modes and will be ignored in unary mode",
87 strings.Join(allToggleModes, ", ")), allToggleModes)
88 channelzOn = flags.StringWithAllowedValues("channelz", toggleModeOff,
89 fmt.Sprintf("Channelz mode - One of: %v", strings.Join(allToggleModes, ", ")), allToggleModes)
90 compressorMode = flags.StringWithAllowedValues("compression", compModeOff,
91 fmt.Sprintf("Compression mode - One of: %v", strings.Join(allCompModes, ", ")), allCompModes)
92 networkMode = flags.StringWithAllowedValues("networkMode", networkModeNone,
93 "Network mode includes LAN, WAN, Local and Longhaul", allNetworkModes)
94 readLatency = flags.DurationSlice("latency", defaultReadLatency, "Simulated one-way network latency - may be a comma-separated list")
95 readKbps = flags.IntSlice("kbps", defaultReadKbps, "Simulated network throughput (in kbps) - may be a comma-separated list")
96 readMTU = flags.IntSlice("mtu", defaultReadMTU, "Simulated network MTU (Maximum Transmission Unit) - may be a comma-separated list")
97 maxConcurrentCalls = flags.IntSlice("maxConcurrentCalls", defaultMaxConcurrentCalls, "Number of concurrent RPCs during benchmarks")
98 readReqSizeBytes = flags.IntSlice("reqSizeBytes", nil, "Request size in bytes - may be a comma-separated list")
99 readRespSizeBytes = flags.IntSlice("respSizeBytes", nil, "Response size in bytes - may be a comma-separated list")
100 reqPayloadCurveFiles = flags.StringSlice("reqPayloadCurveFiles", nil, "comma-separated list of CSV files describing the shape a random distribution of request payload sizes")
101 respPayloadCurveFiles = flags.StringSlice("respPayloadCurveFiles", nil, "comma-separated list of CSV files describing the shape a random distribution of response payload sizes")
102 benchTime = flag.Duration("benchtime", time.Second, "Configures the amount of time to run each benchmark")
103 memProfile = flag.String("memProfile", "", "Enables memory profiling output to the filename provided.")
104 memProfileRate = flag.Int("memProfileRate", 512*1024, "Configures the memory profiling rate. \n"+
105 "memProfile should be set before setting profile rate. To include every allocated block in the profile, "+
106 "set MemProfileRate to 1. To turn off profiling entirely, set MemProfileRate to 0. 512 * 1024 by default.")
107 cpuProfile = flag.String("cpuProfile", "", "Enables CPU profiling output to the filename provided")
108 benchmarkResultFile = flag.String("resultFile", "", "Save the benchmark result into a binary file")
109 useBufconn = flag.Bool("bufconn", false, "Use in-memory connection instead of system network I/O")
110 enableKeepalive = flag.Bool("enable_keepalive", false, "Enable client keepalive. \n"+
111 "Keepalive.Time is set to 10s, Keepalive.Timeout is set to 1s, Keepalive.PermitWithoutStream is set to true.")
112 clientReadBufferSize = flags.IntSlice("clientReadBufferSize", []int{-1}, "Configures the client read buffer size in bytes. If negative, use the default - may be a a comma-separated list")
113 clientWriteBufferSize = flags.IntSlice("clientWriteBufferSize", []int{-1}, "Configures the client write buffer size in bytes. If negative, use the default - may be a a comma-separated list")
114 serverReadBufferSize = flags.IntSlice("serverReadBufferSize", []int{-1}, "Configures the server read buffer size in bytes. If negative, use the default - may be a a comma-separated list")
115 serverWriteBufferSize = flags.IntSlice("serverWriteBufferSize", []int{-1}, "Configures the server write buffer size in bytes. If negative, use the default - may be a a comma-separated list")
116 sleepBetweenRPCs = flags.DurationSlice("sleepBetweenRPCs", []time.Duration{0}, "Configures the maximum amount of time the client should sleep between consecutive RPCs - may be a a comma-separated list")
117 connections = flag.Int("connections", 1, "The number of connections. Each connection will handle maxConcurrentCalls RPC streams")
118 recvBufferPool = flags.StringWithAllowedValues("recvBufferPool", recvBufferPoolNil, "Configures the shared receive buffer pool. One of: nil, simple, all", allRecvBufferPools)
119 sharedWriteBuffer = flags.StringWithAllowedValues("sharedWriteBuffer", toggleModeOff,
120 fmt.Sprintf("Configures both client and server to share write buffer - One of: %v", strings.Join(allToggleModes, ", ")), allToggleModes)
121
122 logger = grpclog.Component("benchmark")
123 )
124
125 const (
126 workloadsUnary = "unary"
127 workloadsStreaming = "streaming"
128 workloadsUnconstrained = "unconstrained"
129 workloadsAll = "all"
130
131 compModeOff = "off"
132 compModeGzip = "gzip"
133 compModeNop = "nop"
134 compModeAll = "all"
135
136 toggleModeOff = "off"
137 toggleModeOn = "on"
138 toggleModeBoth = "both"
139
140 networkModeNone = "none"
141 networkModeLocal = "Local"
142 networkModeLAN = "LAN"
143 networkModeWAN = "WAN"
144 networkLongHaul = "Longhaul"
145
146 recvBufferPoolNil = "nil"
147 recvBufferPoolSimple = "simple"
148 recvBufferPoolAll = "all"
149
150 numStatsBuckets = 10
151 warmupCallCount = 10
152 warmuptime = time.Second
153 )
154
155 var (
156 allWorkloads = []string{workloadsUnary, workloadsStreaming, workloadsUnconstrained, workloadsAll}
157 allCompModes = []string{compModeOff, compModeGzip, compModeNop, compModeAll}
158 allToggleModes = []string{toggleModeOff, toggleModeOn, toggleModeBoth}
159 allNetworkModes = []string{networkModeNone, networkModeLocal, networkModeLAN, networkModeWAN, networkLongHaul}
160 allRecvBufferPools = []string{recvBufferPoolNil, recvBufferPoolSimple, recvBufferPoolAll}
161 defaultReadLatency = []time.Duration{0, 40 * time.Millisecond}
162 defaultReadKbps = []int{0, 10240}
163 defaultReadMTU = []int{0}
164 defaultMaxConcurrentCalls = []int{1, 8, 64, 512}
165 defaultReqSizeBytes = []int{1, 1024, 1024 * 1024}
166 defaultRespSizeBytes = []int{1, 1024, 1024 * 1024}
167 networks = map[string]latency.Network{
168 networkModeLocal: latency.Local,
169 networkModeLAN: latency.LAN,
170 networkModeWAN: latency.WAN,
171 networkLongHaul: latency.Longhaul,
172 }
173 keepaliveTime = 10 * time.Second
174 keepaliveTimeout = 1 * time.Second
175
176
177 keepaliveMinTime = 8 * time.Second
178 )
179
180
181
182 type runModes struct {
183 unary, streaming, unconstrained bool
184 }
185
186
187
188 func runModesFromWorkloads(workload string) runModes {
189 r := runModes{}
190 switch workload {
191 case workloadsUnary:
192 r.unary = true
193 case workloadsStreaming:
194 r.streaming = true
195 case workloadsUnconstrained:
196 r.unconstrained = true
197 case workloadsAll:
198 r.unary = true
199 r.streaming = true
200 r.unconstrained = true
201 default:
202 log.Fatalf("Unknown workloads setting: %v (want one of: %v)",
203 workloads, strings.Join(allWorkloads, ", "))
204 }
205 return r
206 }
207
208 type startFunc func(mode string, bf stats.Features)
209 type stopFunc func(count uint64)
210 type ucStopFunc func(req uint64, resp uint64)
211 type rpcCallFunc func(cn, pos int)
212 type rpcSendFunc func(cn, pos int)
213 type rpcRecvFunc func(cn, pos int)
214 type rpcCleanupFunc func()
215
216 func unaryBenchmark(start startFunc, stop stopFunc, bf stats.Features, s *stats.Stats) {
217 caller, cleanup := makeFuncUnary(bf)
218 defer cleanup()
219 runBenchmark(caller, start, stop, bf, s, workloadsUnary)
220 }
221
222 func streamBenchmark(start startFunc, stop stopFunc, bf stats.Features, s *stats.Stats) {
223 caller, cleanup := makeFuncStream(bf)
224 defer cleanup()
225 runBenchmark(caller, start, stop, bf, s, workloadsStreaming)
226 }
227
228 func unconstrainedStreamBenchmark(start startFunc, stop ucStopFunc, bf stats.Features) {
229 var sender rpcSendFunc
230 var recver rpcRecvFunc
231 var cleanup rpcCleanupFunc
232 if bf.EnablePreloader {
233 sender, recver, cleanup = makeFuncUnconstrainedStreamPreloaded(bf)
234 } else {
235 sender, recver, cleanup = makeFuncUnconstrainedStream(bf)
236 }
237 defer cleanup()
238
239 var req, resp uint64
240 go func() {
241
242 <-time.NewTimer(warmuptime).C
243 atomic.StoreUint64(&req, 0)
244 atomic.StoreUint64(&resp, 0)
245 start(workloadsUnconstrained, bf)
246 }()
247
248 bmEnd := time.Now().Add(bf.BenchTime + warmuptime)
249 var wg sync.WaitGroup
250 wg.Add(2 * bf.Connections * bf.MaxConcurrentCalls)
251 maxSleep := int(bf.SleepBetweenRPCs)
252 for cn := 0; cn < bf.Connections; cn++ {
253 for pos := 0; pos < bf.MaxConcurrentCalls; pos++ {
254 go func(cn, pos int) {
255 defer wg.Done()
256 for {
257 if maxSleep > 0 {
258 time.Sleep(time.Duration(rand.Intn(maxSleep)))
259 }
260 t := time.Now()
261 if t.After(bmEnd) {
262 return
263 }
264 sender(cn, pos)
265 atomic.AddUint64(&req, 1)
266 }
267 }(cn, pos)
268 go func(cn, pos int) {
269 defer wg.Done()
270 for {
271 t := time.Now()
272 if t.After(bmEnd) {
273 return
274 }
275 recver(cn, pos)
276 atomic.AddUint64(&resp, 1)
277 }
278 }(cn, pos)
279 }
280 }
281 wg.Wait()
282 stop(req, resp)
283 }
284
285
286
287
288
289 func makeClients(bf stats.Features) ([]testgrpc.BenchmarkServiceClient, func()) {
290 nw := &latency.Network{Kbps: bf.Kbps, Latency: bf.Latency, MTU: bf.MTU}
291 opts := []grpc.DialOption{}
292 sopts := []grpc.ServerOption{}
293 if bf.ModeCompressor == compModeNop {
294 sopts = append(sopts,
295 grpc.RPCCompressor(nopCompressor{}),
296 grpc.RPCDecompressor(nopDecompressor{}),
297 )
298 opts = append(opts,
299 grpc.WithCompressor(nopCompressor{}),
300 grpc.WithDecompressor(nopDecompressor{}),
301 )
302 }
303 if bf.ModeCompressor == compModeGzip {
304 sopts = append(sopts,
305 grpc.RPCCompressor(grpc.NewGZIPCompressor()),
306 grpc.RPCDecompressor(grpc.NewGZIPDecompressor()),
307 )
308 opts = append(opts,
309 grpc.WithCompressor(grpc.NewGZIPCompressor()),
310 grpc.WithDecompressor(grpc.NewGZIPDecompressor()),
311 )
312 }
313 if bf.EnableKeepalive {
314 sopts = append(sopts,
315 grpc.KeepaliveParams(keepalive.ServerParameters{
316 Time: keepaliveTime,
317 Timeout: keepaliveTimeout,
318 }),
319 grpc.KeepaliveEnforcementPolicy(keepalive.EnforcementPolicy{
320 MinTime: keepaliveMinTime,
321 PermitWithoutStream: true,
322 }),
323 )
324 opts = append(opts,
325 grpc.WithKeepaliveParams(keepalive.ClientParameters{
326 Time: keepaliveTime,
327 Timeout: keepaliveTimeout,
328 PermitWithoutStream: true,
329 }),
330 )
331 }
332 if bf.ClientReadBufferSize >= 0 {
333 opts = append(opts, grpc.WithReadBufferSize(bf.ClientReadBufferSize))
334 }
335 if bf.ClientWriteBufferSize >= 0 {
336 opts = append(opts, grpc.WithWriteBufferSize(bf.ClientWriteBufferSize))
337 }
338 if bf.ServerReadBufferSize >= 0 {
339 sopts = append(sopts, grpc.ReadBufferSize(bf.ServerReadBufferSize))
340 }
341 if bf.SharedWriteBuffer {
342 opts = append(opts, grpc.WithSharedWriteBuffer(true))
343 sopts = append(sopts, grpc.SharedWriteBuffer(true))
344 }
345 if bf.ServerWriteBufferSize >= 0 {
346 sopts = append(sopts, grpc.WriteBufferSize(bf.ServerWriteBufferSize))
347 }
348 switch bf.RecvBufferPool {
349 case recvBufferPoolNil:
350
351 case recvBufferPoolSimple:
352 opts = append(opts, experimental.WithRecvBufferPool(grpc.NewSharedBufferPool()))
353 sopts = append(sopts, experimental.RecvBufferPool(grpc.NewSharedBufferPool()))
354 default:
355 logger.Fatalf("Unknown shared recv buffer pool type: %v", bf.RecvBufferPool)
356 }
357
358 sopts = append(sopts, grpc.MaxConcurrentStreams(uint32(bf.MaxConcurrentCalls+1)))
359 opts = append(opts, grpc.WithTransportCredentials(insecure.NewCredentials()))
360
361 var lis net.Listener
362 if bf.UseBufConn {
363 bcLis := bufconn.Listen(256 * 1024)
364 lis = bcLis
365 opts = append(opts, grpc.WithContextDialer(func(ctx context.Context, address string) (net.Conn, error) {
366 return nw.ContextDialer(func(context.Context, string, string) (net.Conn, error) {
367 return bcLis.Dial()
368 })(ctx, "", "")
369 }))
370 } else {
371 var err error
372 lis, err = net.Listen("tcp", "localhost:0")
373 if err != nil {
374 logger.Fatalf("Failed to listen: %v", err)
375 }
376 opts = append(opts, grpc.WithContextDialer(func(ctx context.Context, address string) (net.Conn, error) {
377 return nw.ContextDialer((internal.NetDialerWithTCPKeepalive().DialContext))(ctx, "tcp", lis.Addr().String())
378 }))
379 }
380 lis = nw.Listener(lis)
381 stopper := benchmark.StartServer(benchmark.ServerInfo{Type: "protobuf", Listener: lis}, sopts...)
382 conns := make([]*grpc.ClientConn, bf.Connections)
383 clients := make([]testgrpc.BenchmarkServiceClient, bf.Connections)
384 for cn := 0; cn < bf.Connections; cn++ {
385 conns[cn] = benchmark.NewClientConn("" , opts...)
386 clients[cn] = testgrpc.NewBenchmarkServiceClient(conns[cn])
387 }
388
389 return clients, func() {
390 for _, conn := range conns {
391 conn.Close()
392 }
393 stopper()
394 }
395 }
396
397 func makeFuncUnary(bf stats.Features) (rpcCallFunc, rpcCleanupFunc) {
398 clients, cleanup := makeClients(bf)
399 return func(cn, pos int) {
400 reqSizeBytes := bf.ReqSizeBytes
401 respSizeBytes := bf.RespSizeBytes
402 if bf.ReqPayloadCurve != nil {
403 reqSizeBytes = bf.ReqPayloadCurve.ChooseRandom()
404 }
405 if bf.RespPayloadCurve != nil {
406 respSizeBytes = bf.RespPayloadCurve.ChooseRandom()
407 }
408 unaryCaller(clients[cn], reqSizeBytes, respSizeBytes)
409 }, cleanup
410 }
411
412 func makeFuncStream(bf stats.Features) (rpcCallFunc, rpcCleanupFunc) {
413 streams, req, cleanup := setupStream(bf, false)
414
415 var preparedMsg [][]*grpc.PreparedMsg
416 if bf.EnablePreloader {
417 preparedMsg = prepareMessages(streams, req)
418 }
419
420 return func(cn, pos int) {
421 reqSizeBytes := bf.ReqSizeBytes
422 respSizeBytes := bf.RespSizeBytes
423 if bf.ReqPayloadCurve != nil {
424 reqSizeBytes = bf.ReqPayloadCurve.ChooseRandom()
425 }
426 if bf.RespPayloadCurve != nil {
427 respSizeBytes = bf.RespPayloadCurve.ChooseRandom()
428 }
429 var req any
430 if bf.EnablePreloader {
431 req = preparedMsg[cn][pos]
432 } else {
433 pl := benchmark.NewPayload(testpb.PayloadType_COMPRESSABLE, reqSizeBytes)
434 req = &testpb.SimpleRequest{
435 ResponseType: pl.Type,
436 ResponseSize: int32(respSizeBytes),
437 Payload: pl,
438 }
439 }
440 streamCaller(streams[cn][pos], req)
441 }, cleanup
442 }
443
444 func makeFuncUnconstrainedStreamPreloaded(bf stats.Features) (rpcSendFunc, rpcRecvFunc, rpcCleanupFunc) {
445 streams, req, cleanup := setupStream(bf, true)
446
447 preparedMsg := prepareMessages(streams, req)
448
449 return func(cn, pos int) {
450 streams[cn][pos].SendMsg(preparedMsg[cn][pos])
451 }, func(cn, pos int) {
452 streams[cn][pos].Recv()
453 }, cleanup
454 }
455
456 func makeFuncUnconstrainedStream(bf stats.Features) (rpcSendFunc, rpcRecvFunc, rpcCleanupFunc) {
457 streams, req, cleanup := setupStream(bf, true)
458
459 return func(cn, pos int) {
460 streams[cn][pos].Send(req)
461 }, func(cn, pos int) {
462 streams[cn][pos].Recv()
463 }, cleanup
464 }
465
466 func setupStream(bf stats.Features, unconstrained bool) ([][]testgrpc.BenchmarkService_StreamingCallClient, *testpb.SimpleRequest, rpcCleanupFunc) {
467 clients, cleanup := makeClients(bf)
468
469 streams := make([][]testgrpc.BenchmarkService_StreamingCallClient, bf.Connections)
470 ctx := context.Background()
471 if unconstrained {
472 md := metadata.Pairs(benchmark.UnconstrainedStreamingHeader, "1", benchmark.UnconstrainedStreamingDelayHeader, bf.SleepBetweenRPCs.String())
473 ctx = metadata.NewOutgoingContext(ctx, md)
474 }
475 if bf.EnablePreloader {
476 md := metadata.Pairs(benchmark.PreloadMsgSizeHeader, strconv.Itoa(bf.RespSizeBytes), benchmark.UnconstrainedStreamingDelayHeader, bf.SleepBetweenRPCs.String())
477 ctx = metadata.NewOutgoingContext(ctx, md)
478 }
479 for cn := 0; cn < bf.Connections; cn++ {
480 tc := clients[cn]
481 streams[cn] = make([]testgrpc.BenchmarkService_StreamingCallClient, bf.MaxConcurrentCalls)
482 for pos := 0; pos < bf.MaxConcurrentCalls; pos++ {
483 stream, err := tc.StreamingCall(ctx)
484 if err != nil {
485 logger.Fatalf("%v.StreamingCall(_) = _, %v", tc, err)
486 }
487 streams[cn][pos] = stream
488 }
489 }
490
491 pl := benchmark.NewPayload(testpb.PayloadType_COMPRESSABLE, bf.ReqSizeBytes)
492 req := &testpb.SimpleRequest{
493 ResponseType: pl.Type,
494 ResponseSize: int32(bf.RespSizeBytes),
495 Payload: pl,
496 }
497
498 return streams, req, cleanup
499 }
500
501 func prepareMessages(streams [][]testgrpc.BenchmarkService_StreamingCallClient, req *testpb.SimpleRequest) [][]*grpc.PreparedMsg {
502 preparedMsg := make([][]*grpc.PreparedMsg, len(streams))
503 for cn, connStreams := range streams {
504 preparedMsg[cn] = make([]*grpc.PreparedMsg, len(connStreams))
505 for pos, stream := range connStreams {
506 preparedMsg[cn][pos] = &grpc.PreparedMsg{}
507 if err := preparedMsg[cn][pos].Encode(stream, req); err != nil {
508 logger.Fatalf("%v.Encode(%v, %v) = %v", preparedMsg[cn][pos], req, stream, err)
509 }
510 }
511 }
512 return preparedMsg
513 }
514
515
516
517 func unaryCaller(client testgrpc.BenchmarkServiceClient, reqSize, respSize int) {
518 if err := benchmark.DoUnaryCall(client, reqSize, respSize); err != nil {
519 logger.Fatalf("DoUnaryCall failed: %v", err)
520 }
521 }
522
523 func streamCaller(stream testgrpc.BenchmarkService_StreamingCallClient, req any) {
524 if err := benchmark.DoStreamingRoundTripPreloaded(stream, req); err != nil {
525 logger.Fatalf("DoStreamingRoundTrip failed: %v", err)
526 }
527 }
528
529 func runBenchmark(caller rpcCallFunc, start startFunc, stop stopFunc, bf stats.Features, s *stats.Stats, mode string) {
530
531
532
533 if bf.SleepBetweenRPCs == 0 {
534
535 for i := 0; i < warmupCallCount; i++ {
536 for cn := 0; cn < bf.Connections; cn++ {
537 caller(cn, 0)
538 }
539 }
540 }
541
542
543 start(mode, bf)
544 var wg sync.WaitGroup
545 wg.Add(bf.Connections * bf.MaxConcurrentCalls)
546 bmEnd := time.Now().Add(bf.BenchTime)
547 maxSleep := int(bf.SleepBetweenRPCs)
548 var count uint64
549 for cn := 0; cn < bf.Connections; cn++ {
550 for pos := 0; pos < bf.MaxConcurrentCalls; pos++ {
551 go func(cn, pos int) {
552 defer wg.Done()
553 for {
554 if maxSleep > 0 {
555 time.Sleep(time.Duration(rand.Intn(maxSleep)))
556 }
557 t := time.Now()
558 if t.After(bmEnd) {
559 return
560 }
561 start := time.Now()
562 caller(cn, pos)
563 elapse := time.Since(start)
564 atomic.AddUint64(&count, 1)
565 s.AddDuration(elapse)
566 }
567 }(cn, pos)
568 }
569 }
570 wg.Wait()
571 stop(count)
572 }
573
574
575
576 type benchOpts struct {
577 rModes runModes
578 benchTime time.Duration
579 memProfileRate int
580 memProfile string
581 cpuProfile string
582 networkMode string
583 benchmarkResultFile string
584 useBufconn bool
585 enableKeepalive bool
586 connections int
587 features *featureOpts
588 }
589
590
591
592
593
594 type featureOpts struct {
595 enableTrace []bool
596 readLatencies []time.Duration
597 readKbps []int
598 readMTU []int
599 maxConcurrentCalls []int
600 reqSizeBytes []int
601 respSizeBytes []int
602 reqPayloadCurves []*stats.PayloadCurve
603 respPayloadCurves []*stats.PayloadCurve
604 compModes []string
605 enableChannelz []bool
606 enablePreloader []bool
607 clientReadBufferSize []int
608 clientWriteBufferSize []int
609 serverReadBufferSize []int
610 serverWriteBufferSize []int
611 sleepBetweenRPCs []time.Duration
612 recvBufferPools []string
613 sharedWriteBuffer []bool
614 }
615
616
617
618
619
620
621
622 func makeFeaturesNum(b *benchOpts) []int {
623 featuresNum := make([]int, stats.MaxFeatureIndex)
624 for i := 0; i < len(featuresNum); i++ {
625 switch stats.FeatureIndex(i) {
626 case stats.EnableTraceIndex:
627 featuresNum[i] = len(b.features.enableTrace)
628 case stats.ReadLatenciesIndex:
629 featuresNum[i] = len(b.features.readLatencies)
630 case stats.ReadKbpsIndex:
631 featuresNum[i] = len(b.features.readKbps)
632 case stats.ReadMTUIndex:
633 featuresNum[i] = len(b.features.readMTU)
634 case stats.MaxConcurrentCallsIndex:
635 featuresNum[i] = len(b.features.maxConcurrentCalls)
636 case stats.ReqSizeBytesIndex:
637 featuresNum[i] = len(b.features.reqSizeBytes)
638 case stats.RespSizeBytesIndex:
639 featuresNum[i] = len(b.features.respSizeBytes)
640 case stats.ReqPayloadCurveIndex:
641 featuresNum[i] = len(b.features.reqPayloadCurves)
642 case stats.RespPayloadCurveIndex:
643 featuresNum[i] = len(b.features.respPayloadCurves)
644 case stats.CompModesIndex:
645 featuresNum[i] = len(b.features.compModes)
646 case stats.EnableChannelzIndex:
647 featuresNum[i] = len(b.features.enableChannelz)
648 case stats.EnablePreloaderIndex:
649 featuresNum[i] = len(b.features.enablePreloader)
650 case stats.ClientReadBufferSize:
651 featuresNum[i] = len(b.features.clientReadBufferSize)
652 case stats.ClientWriteBufferSize:
653 featuresNum[i] = len(b.features.clientWriteBufferSize)
654 case stats.ServerReadBufferSize:
655 featuresNum[i] = len(b.features.serverReadBufferSize)
656 case stats.ServerWriteBufferSize:
657 featuresNum[i] = len(b.features.serverWriteBufferSize)
658 case stats.SleepBetweenRPCs:
659 featuresNum[i] = len(b.features.sleepBetweenRPCs)
660 case stats.RecvBufferPool:
661 featuresNum[i] = len(b.features.recvBufferPools)
662 case stats.SharedWriteBuffer:
663 featuresNum[i] = len(b.features.sharedWriteBuffer)
664 default:
665 log.Fatalf("Unknown feature index %v in generateFeatures. maxFeatureIndex is %v", i, stats.MaxFeatureIndex)
666 }
667 }
668 return featuresNum
669 }
670
671
672
673
674
675 func sharedFeatures(featuresNum []int) []bool {
676 result := make([]bool, len(featuresNum))
677 for i, num := range featuresNum {
678 if num <= 1 {
679 result[i] = true
680 }
681 }
682 return result
683 }
684
685
686
687
688
689
690
691
692
693
694
695
696
697
698 func (b *benchOpts) generateFeatures(featuresNum []int) []stats.Features {
699
700
701
702
703
704 var result []stats.Features
705 var curPos []int
706 initialPos := make([]int, stats.MaxFeatureIndex)
707 for !reflect.DeepEqual(initialPos, curPos) {
708 if curPos == nil {
709 curPos = make([]int, stats.MaxFeatureIndex)
710 }
711 f := stats.Features{
712
713 NetworkMode: b.networkMode,
714 UseBufConn: b.useBufconn,
715 EnableKeepalive: b.enableKeepalive,
716 BenchTime: b.benchTime,
717 Connections: b.connections,
718
719 EnableTrace: b.features.enableTrace[curPos[stats.EnableTraceIndex]],
720 Latency: b.features.readLatencies[curPos[stats.ReadLatenciesIndex]],
721 Kbps: b.features.readKbps[curPos[stats.ReadKbpsIndex]],
722 MTU: b.features.readMTU[curPos[stats.ReadMTUIndex]],
723 MaxConcurrentCalls: b.features.maxConcurrentCalls[curPos[stats.MaxConcurrentCallsIndex]],
724 ModeCompressor: b.features.compModes[curPos[stats.CompModesIndex]],
725 EnableChannelz: b.features.enableChannelz[curPos[stats.EnableChannelzIndex]],
726 EnablePreloader: b.features.enablePreloader[curPos[stats.EnablePreloaderIndex]],
727 ClientReadBufferSize: b.features.clientReadBufferSize[curPos[stats.ClientReadBufferSize]],
728 ClientWriteBufferSize: b.features.clientWriteBufferSize[curPos[stats.ClientWriteBufferSize]],
729 ServerReadBufferSize: b.features.serverReadBufferSize[curPos[stats.ServerReadBufferSize]],
730 ServerWriteBufferSize: b.features.serverWriteBufferSize[curPos[stats.ServerWriteBufferSize]],
731 SleepBetweenRPCs: b.features.sleepBetweenRPCs[curPos[stats.SleepBetweenRPCs]],
732 RecvBufferPool: b.features.recvBufferPools[curPos[stats.RecvBufferPool]],
733 SharedWriteBuffer: b.features.sharedWriteBuffer[curPos[stats.SharedWriteBuffer]],
734 }
735 if len(b.features.reqPayloadCurves) == 0 {
736 f.ReqSizeBytes = b.features.reqSizeBytes[curPos[stats.ReqSizeBytesIndex]]
737 } else {
738 f.ReqPayloadCurve = b.features.reqPayloadCurves[curPos[stats.ReqPayloadCurveIndex]]
739 }
740 if len(b.features.respPayloadCurves) == 0 {
741 f.RespSizeBytes = b.features.respSizeBytes[curPos[stats.RespSizeBytesIndex]]
742 } else {
743 f.RespPayloadCurve = b.features.respPayloadCurves[curPos[stats.RespPayloadCurveIndex]]
744 }
745 result = append(result, f)
746 addOne(curPos, featuresNum)
747 }
748 return result
749 }
750
751
752
753
754
755 func addOne(features []int, featuresMaxPosition []int) {
756 for i := len(features) - 1; i >= 0; i-- {
757 if featuresMaxPosition[i] == 0 {
758 continue
759 }
760 features[i] = (features[i] + 1)
761 if features[i]/featuresMaxPosition[i] == 0 {
762 break
763 }
764 features[i] = features[i] % featuresMaxPosition[i]
765 }
766 }
767
768
769
770
771
772
773 func processFlags() *benchOpts {
774 flag.Parse()
775 if flag.NArg() != 0 {
776 log.Fatal("Error: unparsed arguments: ", flag.Args())
777 }
778
779 opts := &benchOpts{
780 rModes: runModesFromWorkloads(*workloads),
781 benchTime: *benchTime,
782 memProfileRate: *memProfileRate,
783 memProfile: *memProfile,
784 cpuProfile: *cpuProfile,
785 networkMode: *networkMode,
786 benchmarkResultFile: *benchmarkResultFile,
787 useBufconn: *useBufconn,
788 enableKeepalive: *enableKeepalive,
789 connections: *connections,
790 features: &featureOpts{
791 enableTrace: setToggleMode(*traceMode),
792 readLatencies: append([]time.Duration(nil), *readLatency...),
793 readKbps: append([]int(nil), *readKbps...),
794 readMTU: append([]int(nil), *readMTU...),
795 maxConcurrentCalls: append([]int(nil), *maxConcurrentCalls...),
796 reqSizeBytes: append([]int(nil), *readReqSizeBytes...),
797 respSizeBytes: append([]int(nil), *readRespSizeBytes...),
798 compModes: setCompressorMode(*compressorMode),
799 enableChannelz: setToggleMode(*channelzOn),
800 enablePreloader: setToggleMode(*preloaderMode),
801 clientReadBufferSize: append([]int(nil), *clientReadBufferSize...),
802 clientWriteBufferSize: append([]int(nil), *clientWriteBufferSize...),
803 serverReadBufferSize: append([]int(nil), *serverReadBufferSize...),
804 serverWriteBufferSize: append([]int(nil), *serverWriteBufferSize...),
805 sleepBetweenRPCs: append([]time.Duration(nil), *sleepBetweenRPCs...),
806 recvBufferPools: setRecvBufferPool(*recvBufferPool),
807 sharedWriteBuffer: setToggleMode(*sharedWriteBuffer),
808 },
809 }
810
811 if len(*reqPayloadCurveFiles) == 0 {
812 if len(opts.features.reqSizeBytes) == 0 {
813 opts.features.reqSizeBytes = defaultReqSizeBytes
814 }
815 } else {
816 if len(opts.features.reqSizeBytes) != 0 {
817 log.Fatalf("you may not specify -reqPayloadCurveFiles and -reqSizeBytes at the same time")
818 }
819 if len(opts.features.enablePreloader) != 0 {
820 log.Fatalf("you may not specify -reqPayloadCurveFiles and -preloader at the same time")
821 }
822 for _, file := range *reqPayloadCurveFiles {
823 pc, err := stats.NewPayloadCurve(file)
824 if err != nil {
825 log.Fatalf("cannot load payload curve file %s: %v", file, err)
826 }
827 opts.features.reqPayloadCurves = append(opts.features.reqPayloadCurves, pc)
828 }
829 opts.features.reqSizeBytes = nil
830 }
831 if len(*respPayloadCurveFiles) == 0 {
832 if len(opts.features.respSizeBytes) == 0 {
833 opts.features.respSizeBytes = defaultRespSizeBytes
834 }
835 } else {
836 if len(opts.features.respSizeBytes) != 0 {
837 log.Fatalf("you may not specify -respPayloadCurveFiles and -respSizeBytes at the same time")
838 }
839 if len(opts.features.enablePreloader) != 0 {
840 log.Fatalf("you may not specify -respPayloadCurveFiles and -preloader at the same time")
841 }
842 for _, file := range *respPayloadCurveFiles {
843 pc, err := stats.NewPayloadCurve(file)
844 if err != nil {
845 log.Fatalf("cannot load payload curve file %s: %v", file, err)
846 }
847 opts.features.respPayloadCurves = append(opts.features.respPayloadCurves, pc)
848 }
849 opts.features.respSizeBytes = nil
850 }
851
852
853 if network, ok := networks[opts.networkMode]; ok {
854 opts.features.readLatencies = []time.Duration{network.Latency}
855 opts.features.readKbps = []int{network.Kbps}
856 opts.features.readMTU = []int{network.MTU}
857 }
858 return opts
859 }
860
861 func setToggleMode(val string) []bool {
862 switch val {
863 case toggleModeOn:
864 return []bool{true}
865 case toggleModeOff:
866 return []bool{false}
867 case toggleModeBoth:
868 return []bool{false, true}
869 default:
870
871
872 return []bool{}
873 }
874 }
875
876 func setCompressorMode(val string) []string {
877 switch val {
878 case compModeNop, compModeGzip, compModeOff:
879 return []string{val}
880 case compModeAll:
881 return []string{compModeNop, compModeGzip, compModeOff}
882 default:
883
884
885 return []string{}
886 }
887 }
888
889 func setRecvBufferPool(val string) []string {
890 switch val {
891 case recvBufferPoolNil, recvBufferPoolSimple:
892 return []string{val}
893 case recvBufferPoolAll:
894 return []string{recvBufferPoolNil, recvBufferPoolSimple}
895 default:
896
897
898 return []string{}
899 }
900 }
901
902 func main() {
903 opts := processFlags()
904 before(opts)
905
906 s := stats.NewStats(numStatsBuckets)
907 featuresNum := makeFeaturesNum(opts)
908 sf := sharedFeatures(featuresNum)
909
910 var (
911 start = func(mode string, bf stats.Features) { s.StartRun(mode, bf, sf) }
912 stop = func(count uint64) { s.EndRun(count) }
913 ucStop = func(req uint64, resp uint64) { s.EndUnconstrainedRun(req, resp) }
914 )
915
916 for _, bf := range opts.generateFeatures(featuresNum) {
917 grpc.EnableTracing = bf.EnableTrace
918 if bf.EnableChannelz {
919 channelz.TurnOn()
920 }
921 if opts.rModes.unary {
922 unaryBenchmark(start, stop, bf, s)
923 }
924 if opts.rModes.streaming {
925 streamBenchmark(start, stop, bf, s)
926 }
927 if opts.rModes.unconstrained {
928 unconstrainedStreamBenchmark(start, ucStop, bf)
929 }
930 }
931 after(opts, s.GetResults())
932 }
933
934 func before(opts *benchOpts) {
935 if opts.memProfile != "" {
936 runtime.MemProfileRate = opts.memProfileRate
937 }
938 if opts.cpuProfile != "" {
939 f, err := os.Create(opts.cpuProfile)
940 if err != nil {
941 fmt.Fprintf(os.Stderr, "testing: %s\n", err)
942 return
943 }
944 if err := pprof.StartCPUProfile(f); err != nil {
945 fmt.Fprintf(os.Stderr, "testing: can't start cpu profile: %s\n", err)
946 f.Close()
947 return
948 }
949 }
950 }
951
952 func after(opts *benchOpts, data []stats.BenchResults) {
953 if opts.cpuProfile != "" {
954 pprof.StopCPUProfile()
955 }
956 if opts.memProfile != "" {
957 f, err := os.Create(opts.memProfile)
958 if err != nil {
959 fmt.Fprintf(os.Stderr, "testing: %s\n", err)
960 os.Exit(2)
961 }
962 runtime.GC()
963 if err = pprof.WriteHeapProfile(f); err != nil {
964 fmt.Fprintf(os.Stderr, "testing: can't write heap profile %s: %s\n", opts.memProfile, err)
965 os.Exit(2)
966 }
967 f.Close()
968 }
969 if opts.benchmarkResultFile != "" {
970 f, err := os.Create(opts.benchmarkResultFile)
971 if err != nil {
972 log.Fatalf("testing: can't write benchmark result %s: %s\n", opts.benchmarkResultFile, err)
973 }
974 dataEncoder := gob.NewEncoder(f)
975 dataEncoder.Encode(data)
976 f.Close()
977 }
978 }
979
980
981 type nopCompressor struct{}
982
983 func (nopCompressor) Do(w io.Writer, p []byte) error {
984 n, err := w.Write(p)
985 if err != nil {
986 return err
987 }
988 if n != len(p) {
989 return fmt.Errorf("nopCompressor.Write: wrote %d bytes; want %d", n, len(p))
990 }
991 return nil
992 }
993
994 func (nopCompressor) Type() string { return compModeNop }
995
996
997 type nopDecompressor struct{}
998
999 func (nopDecompressor) Do(r io.Reader) ([]byte, error) { return io.ReadAll(r) }
1000 func (nopDecompressor) Type() string { return compModeNop }
1001
View as plain text