1
18
19
20 package stats
21
22 import (
23 "bytes"
24 "fmt"
25 "log"
26 "math"
27 "runtime"
28 "sort"
29 "strconv"
30 "sync"
31 "time"
32
33 "google.golang.org/grpc"
34 )
35
36
37
38
39 type FeatureIndex int
40
41
42 const (
43 EnableTraceIndex FeatureIndex = iota
44 ReadLatenciesIndex
45 ReadKbpsIndex
46 ReadMTUIndex
47 MaxConcurrentCallsIndex
48 ReqSizeBytesIndex
49 RespSizeBytesIndex
50 ReqPayloadCurveIndex
51 RespPayloadCurveIndex
52 CompModesIndex
53 EnableChannelzIndex
54 EnablePreloaderIndex
55 ClientReadBufferSize
56 ClientWriteBufferSize
57 ServerReadBufferSize
58 ServerWriteBufferSize
59 SleepBetweenRPCs
60 RecvBufferPool
61 SharedWriteBuffer
62
63
64
65 MaxFeatureIndex
66 )
67
68
69
70
71
72 type Features struct {
73
74
75 NetworkMode string
76
77
78 UseBufConn bool
79
80
81 EnableKeepalive bool
82
83 BenchTime time.Duration
84
85 Connections int
86
87
88
89
90
91
92
93 EnableTrace bool
94
95 Latency time.Duration
96
97 Kbps int
98
99 MTU int
100
101
102 MaxConcurrentCalls int
103
104
105 ReqSizeBytes int
106
107
108 RespSizeBytes int
109
110
111 ReqPayloadCurve *PayloadCurve
112
113
114 RespPayloadCurve *PayloadCurve
115
116 ModeCompressor string
117
118 EnableChannelz bool
119
120 EnablePreloader bool
121
122 ClientReadBufferSize int
123
124 ClientWriteBufferSize int
125
126 ServerReadBufferSize int
127
128 ServerWriteBufferSize int
129
130 SleepBetweenRPCs time.Duration
131
132 RecvBufferPool string
133
134 SharedWriteBuffer bool
135 }
136
137
138 func (f Features) String() string {
139 var reqPayloadString, respPayloadString string
140 if f.ReqPayloadCurve != nil {
141 reqPayloadString = fmt.Sprintf("reqPayloadCurve_%s", f.ReqPayloadCurve.ShortHash())
142 } else {
143 reqPayloadString = fmt.Sprintf("reqSize_%vB", f.ReqSizeBytes)
144 }
145 if f.RespPayloadCurve != nil {
146 respPayloadString = fmt.Sprintf("respPayloadCurve_%s", f.RespPayloadCurve.ShortHash())
147 } else {
148 respPayloadString = fmt.Sprintf("respSize_%vB", f.RespSizeBytes)
149 }
150 return fmt.Sprintf("networkMode_%v-bufConn_%v-keepalive_%v-benchTime_%v-"+
151 "trace_%v-latency_%v-kbps_%v-MTU_%v-maxConcurrentCalls_%v-%s-%s-"+
152 "compressor_%v-channelz_%v-preloader_%v-clientReadBufferSize_%v-"+
153 "clientWriteBufferSize_%v-serverReadBufferSize_%v-serverWriteBufferSize_%v-"+
154 "sleepBetweenRPCs_%v-connections_%v-recvBufferPool_%v-sharedWriteBuffer_%v",
155 f.NetworkMode, f.UseBufConn, f.EnableKeepalive, f.BenchTime, f.EnableTrace,
156 f.Latency, f.Kbps, f.MTU, f.MaxConcurrentCalls, reqPayloadString,
157 respPayloadString, f.ModeCompressor, f.EnableChannelz, f.EnablePreloader,
158 f.ClientReadBufferSize, f.ClientWriteBufferSize, f.ServerReadBufferSize,
159 f.ServerWriteBufferSize, f.SleepBetweenRPCs, f.Connections,
160 f.RecvBufferPool, f.SharedWriteBuffer)
161 }
162
163
164
165 func (f Features) SharedFeatures(wantFeatures []bool) string {
166 var b bytes.Buffer
167 if f.NetworkMode != "" {
168 b.WriteString(fmt.Sprintf("Network: %v\n", f.NetworkMode))
169 }
170 if f.UseBufConn {
171 b.WriteString(fmt.Sprintf("UseBufConn: %v\n", f.UseBufConn))
172 }
173 if f.EnableKeepalive {
174 b.WriteString(fmt.Sprintf("EnableKeepalive: %v\n", f.EnableKeepalive))
175 }
176 b.WriteString(fmt.Sprintf("BenchTime: %v\n", f.BenchTime))
177 f.partialString(&b, wantFeatures, ": ", "\n")
178 return b.String()
179 }
180
181
182
183
184 func (f Features) PrintableName(wantFeatures []bool) string {
185 var b bytes.Buffer
186 f.partialString(&b, wantFeatures, "_", "-")
187 return b.String()
188 }
189
190
191
192 func (f Features) partialString(b *bytes.Buffer, wantFeatures []bool, sep, delim string) {
193 for i, sf := range wantFeatures {
194 if sf {
195 switch FeatureIndex(i) {
196 case EnableTraceIndex:
197 b.WriteString(fmt.Sprintf("Trace%v%v%v", sep, f.EnableTrace, delim))
198 case ReadLatenciesIndex:
199 b.WriteString(fmt.Sprintf("Latency%v%v%v", sep, f.Latency, delim))
200 case ReadKbpsIndex:
201 b.WriteString(fmt.Sprintf("Kbps%v%v%v", sep, f.Kbps, delim))
202 case ReadMTUIndex:
203 b.WriteString(fmt.Sprintf("MTU%v%v%v", sep, f.MTU, delim))
204 case MaxConcurrentCallsIndex:
205 b.WriteString(fmt.Sprintf("Callers%v%v%v", sep, f.MaxConcurrentCalls, delim))
206 case ReqSizeBytesIndex:
207 b.WriteString(fmt.Sprintf("ReqSize%v%vB%v", sep, f.ReqSizeBytes, delim))
208 case RespSizeBytesIndex:
209 b.WriteString(fmt.Sprintf("RespSize%v%vB%v", sep, f.RespSizeBytes, delim))
210 case ReqPayloadCurveIndex:
211 if f.ReqPayloadCurve != nil {
212 b.WriteString(fmt.Sprintf("ReqPayloadCurve%vSHA-256:%v%v", sep, f.ReqPayloadCurve.Hash(), delim))
213 }
214 case RespPayloadCurveIndex:
215 if f.RespPayloadCurve != nil {
216 b.WriteString(fmt.Sprintf("RespPayloadCurve%vSHA-256:%v%v", sep, f.RespPayloadCurve.Hash(), delim))
217 }
218 case CompModesIndex:
219 b.WriteString(fmt.Sprintf("Compressor%v%v%v", sep, f.ModeCompressor, delim))
220 case EnableChannelzIndex:
221 b.WriteString(fmt.Sprintf("Channelz%v%v%v", sep, f.EnableChannelz, delim))
222 case EnablePreloaderIndex:
223 b.WriteString(fmt.Sprintf("Preloader%v%v%v", sep, f.EnablePreloader, delim))
224 case ClientReadBufferSize:
225 b.WriteString(fmt.Sprintf("ClientReadBufferSize%v%v%v", sep, f.ClientReadBufferSize, delim))
226 case ClientWriteBufferSize:
227 b.WriteString(fmt.Sprintf("ClientWriteBufferSize%v%v%v", sep, f.ClientWriteBufferSize, delim))
228 case ServerReadBufferSize:
229 b.WriteString(fmt.Sprintf("ServerReadBufferSize%v%v%v", sep, f.ServerReadBufferSize, delim))
230 case ServerWriteBufferSize:
231 b.WriteString(fmt.Sprintf("ServerWriteBufferSize%v%v%v", sep, f.ServerWriteBufferSize, delim))
232 case SleepBetweenRPCs:
233 b.WriteString(fmt.Sprintf("SleepBetweenRPCs%v%v%v", sep, f.SleepBetweenRPCs, delim))
234 case RecvBufferPool:
235 b.WriteString(fmt.Sprintf("RecvBufferPool%v%v%v", sep, f.RecvBufferPool, delim))
236 case SharedWriteBuffer:
237 b.WriteString(fmt.Sprintf("SharedWriteBuffer%v%v%v", sep, f.SharedWriteBuffer, delim))
238 default:
239 log.Fatalf("Unknown feature index %v. maxFeatureIndex is %v", i, MaxFeatureIndex)
240 }
241 }
242 }
243 }
244
245
246
247
248
249 type BenchResults struct {
250
251 GoVersion string
252
253 GrpcVersion string
254
255
256 RunMode string
257
258 Features Features
259
260
261
262
263 SharedFeatures []bool
264
265 Data RunData
266 }
267
268
269 type RunData struct {
270
271
272 TotalOps uint64
273
274
275 SendOps uint64
276
277
278 RecvOps uint64
279
280 AllocedBytes float64
281
282 Allocs float64
283
284 ReqT float64
285
286 RespT float64
287
288
289
290
291
292
293 Fiftieth time.Duration
294
295 Ninetieth time.Duration
296
297 NinetyNinth time.Duration
298
299 Average time.Duration
300 }
301
302 type durationSlice []time.Duration
303
304 func (a durationSlice) Len() int { return len(a) }
305 func (a durationSlice) Swap(i, j int) { a[i], a[j] = a[j], a[i] }
306 func (a durationSlice) Less(i, j int) bool { return a[i] < a[j] }
307
308
309 type Stats struct {
310 mu sync.Mutex
311 numBuckets int
312 hw *histWrapper
313 results []BenchResults
314 startMS runtime.MemStats
315 stopMS runtime.MemStats
316 }
317
318 type histWrapper struct {
319 unit time.Duration
320 histogram *Histogram
321 durations durationSlice
322 }
323
324
325
326 func NewStats(numBuckets int) *Stats {
327 if numBuckets <= 0 {
328 numBuckets = 16
329 }
330
331 s := &Stats{numBuckets: numBuckets + 1}
332 s.hw = &histWrapper{}
333 return s
334 }
335
336
337 func (s *Stats) StartRun(mode string, f Features, sf []bool) {
338 s.mu.Lock()
339 defer s.mu.Unlock()
340
341 runtime.ReadMemStats(&s.startMS)
342 s.results = append(s.results, BenchResults{
343 GoVersion: runtime.Version(),
344 GrpcVersion: grpc.Version,
345 RunMode: mode,
346 Features: f,
347 SharedFeatures: sf,
348 })
349 }
350
351
352
353 func (s *Stats) EndRun(count uint64) {
354 s.mu.Lock()
355 defer s.mu.Unlock()
356
357 runtime.ReadMemStats(&s.stopMS)
358 r := &s.results[len(s.results)-1]
359 r.Data = RunData{
360 TotalOps: count,
361 AllocedBytes: float64(s.stopMS.TotalAlloc-s.startMS.TotalAlloc) / float64(count),
362 Allocs: float64(s.stopMS.Mallocs-s.startMS.Mallocs) / float64(count),
363 ReqT: float64(count) * float64(r.Features.ReqSizeBytes) * 8 / r.Features.BenchTime.Seconds(),
364 RespT: float64(count) * float64(r.Features.RespSizeBytes) * 8 / r.Features.BenchTime.Seconds(),
365 }
366 s.computeLatencies(r)
367 s.dump(r)
368 s.hw = &histWrapper{}
369 }
370
371
372
373 func (s *Stats) EndUnconstrainedRun(req uint64, resp uint64) {
374 s.mu.Lock()
375 defer s.mu.Unlock()
376
377 runtime.ReadMemStats(&s.stopMS)
378 r := &s.results[len(s.results)-1]
379 r.Data = RunData{
380 SendOps: req,
381 RecvOps: resp,
382 AllocedBytes: float64(s.stopMS.TotalAlloc-s.startMS.TotalAlloc) / float64((req+resp)/2),
383 Allocs: float64(s.stopMS.Mallocs-s.startMS.Mallocs) / float64((req+resp)/2),
384 ReqT: float64(req) * float64(r.Features.ReqSizeBytes) * 8 / r.Features.BenchTime.Seconds(),
385 RespT: float64(resp) * float64(r.Features.RespSizeBytes) * 8 / r.Features.BenchTime.Seconds(),
386 }
387 s.computeLatencies(r)
388 s.dump(r)
389 s.hw = &histWrapper{}
390 }
391
392
393
394 func (s *Stats) AddDuration(d time.Duration) {
395 s.mu.Lock()
396 defer s.mu.Unlock()
397
398 s.hw.durations = append(s.hw.durations, d)
399 }
400
401
402 func (s *Stats) GetResults() []BenchResults {
403 s.mu.Lock()
404 defer s.mu.Unlock()
405
406 return s.results
407 }
408
409
410
411 func (s *Stats) computeLatencies(result *BenchResults) {
412 if len(s.hw.durations) == 0 {
413 return
414 }
415 sort.Sort(s.hw.durations)
416 minDuration := int64(s.hw.durations[0])
417 maxDuration := int64(s.hw.durations[len(s.hw.durations)-1])
418
419
420 s.hw.unit = time.Nanosecond
421 for _, u := range []time.Duration{time.Microsecond, time.Millisecond, time.Second} {
422 if minDuration <= int64(u) {
423 break
424 }
425 s.hw.unit = u
426 }
427
428 numBuckets := s.numBuckets
429 if n := int(maxDuration - minDuration + 1); n < numBuckets {
430 numBuckets = n
431 }
432 s.hw.histogram = NewHistogram(HistogramOptions{
433 NumBuckets: numBuckets,
434
435 GrowthFactor: math.Pow(float64(maxDuration-minDuration), 1/float64(numBuckets-2)) - 1,
436 BaseBucketSize: 1.0,
437 MinValue: minDuration,
438 })
439 for _, d := range s.hw.durations {
440 s.hw.histogram.Add(int64(d))
441 }
442 result.Data.Fiftieth = s.hw.durations[max(s.hw.histogram.Count*int64(50)/100-1, 0)]
443 result.Data.Ninetieth = s.hw.durations[max(s.hw.histogram.Count*int64(90)/100-1, 0)]
444 result.Data.NinetyNinth = s.hw.durations[max(s.hw.histogram.Count*int64(99)/100-1, 0)]
445 result.Data.Average = time.Duration(float64(s.hw.histogram.Sum) / float64(s.hw.histogram.Count))
446 }
447
448
449 func (s *Stats) dump(result *BenchResults) {
450 var b bytes.Buffer
451
452
453 b.WriteString(fmt.Sprintf("%s/grpc%s\n", result.GoVersion, result.GrpcVersion))
454
455
456 b.WriteString(fmt.Sprintf("%s-%s:\n", result.RunMode, result.Features.String()))
457
458 unit := s.hw.unit
459 tUnit := fmt.Sprintf("%v", unit)[1:]
460
461 if l := result.Data.Fiftieth; l != 0 {
462 b.WriteString(fmt.Sprintf("50_Latency: %s%s\t", strconv.FormatFloat(float64(l)/float64(unit), 'f', 4, 64), tUnit))
463 }
464 if l := result.Data.Ninetieth; l != 0 {
465 b.WriteString(fmt.Sprintf("90_Latency: %s%s\t", strconv.FormatFloat(float64(l)/float64(unit), 'f', 4, 64), tUnit))
466 }
467 if l := result.Data.NinetyNinth; l != 0 {
468 b.WriteString(fmt.Sprintf("99_Latency: %s%s\t", strconv.FormatFloat(float64(l)/float64(unit), 'f', 4, 64), tUnit))
469 }
470 if l := result.Data.Average; l != 0 {
471 b.WriteString(fmt.Sprintf("Avg_Latency: %s%s\t", strconv.FormatFloat(float64(l)/float64(unit), 'f', 4, 64), tUnit))
472 }
473 b.WriteString(fmt.Sprintf("Bytes/op: %v\t", result.Data.AllocedBytes))
474 b.WriteString(fmt.Sprintf("Allocs/op: %v\t\n", result.Data.Allocs))
475
476
477 if s.hw.histogram == nil {
478 b.WriteString("Histogram (empty)\n")
479 } else {
480 b.WriteString(fmt.Sprintf("Histogram (unit: %s)\n", tUnit))
481 s.hw.histogram.PrintWithUnit(&b, float64(unit))
482 }
483
484
485 req := result.Data.SendOps
486 if req == 0 {
487 req = result.Data.TotalOps
488 }
489 resp := result.Data.RecvOps
490 if resp == 0 {
491 resp = result.Data.TotalOps
492 }
493 b.WriteString(fmt.Sprintf("Number of requests: %v\tRequest throughput: %v bit/s\n", req, result.Data.ReqT))
494 b.WriteString(fmt.Sprintf("Number of responses: %v\tResponse throughput: %v bit/s\n", resp, result.Data.RespT))
495 fmt.Println(b.String())
496 }
497
498 func max(a, b int64) int64 {
499 if a > b {
500 return a
501 }
502 return b
503 }
504
View as plain text