1
18
19
39 package main
40
41 import (
42 "context"
43 "flag"
44 "fmt"
45 "os"
46 "runtime"
47 "runtime/pprof"
48 "sync"
49 "time"
50
51 "google.golang.org/grpc"
52 "google.golang.org/grpc/benchmark"
53 "google.golang.org/grpc/benchmark/stats"
54 "google.golang.org/grpc/credentials/insecure"
55 "google.golang.org/grpc/grpclog"
56 "google.golang.org/grpc/internal/syscall"
57
58 testgrpc "google.golang.org/grpc/interop/grpc_testing"
59 testpb "google.golang.org/grpc/interop/grpc_testing"
60 )
61
62 var (
63 port = flag.String("port", "50051", "Localhost port to connect to.")
64 numRPC = flag.Int("r", 1, "The number of concurrent RPCs on each connection.")
65 numConn = flag.Int("c", 1, "The number of parallel connections.")
66 warmupDur = flag.Int("w", 10, "Warm-up duration in seconds")
67 duration = flag.Int("d", 60, "Benchmark duration in seconds")
68 rqSize = flag.Int("req", 1, "Request message size in bytes.")
69 rspSize = flag.Int("resp", 1, "Response message size in bytes.")
70 rpcType = flag.String("rpc_type", "unary",
71 `Configure different client rpc type. Valid options are:
72 unary;
73 streaming.`)
74 testName = flag.String("test_name", "", "Name of the test used for creating profiles.")
75 wg sync.WaitGroup
76 hopts = stats.HistogramOptions{
77 NumBuckets: 2495,
78 GrowthFactor: .01,
79 }
80 mu sync.Mutex
81 hists []*stats.Histogram
82
83 logger = grpclog.Component("benchmark")
84 )
85
86 func main() {
87 flag.Parse()
88 if *testName == "" {
89 logger.Fatal("-test_name not set")
90 }
91 req := &testpb.SimpleRequest{
92 ResponseType: testpb.PayloadType_COMPRESSABLE,
93 ResponseSize: int32(*rspSize),
94 Payload: &testpb.Payload{
95 Type: testpb.PayloadType_COMPRESSABLE,
96 Body: make([]byte, *rqSize),
97 },
98 }
99 connectCtx, connectCancel := context.WithDeadline(context.Background(), time.Now().Add(5*time.Second))
100 defer connectCancel()
101 ccs := buildConnections(connectCtx)
102 warmDeadline := time.Now().Add(time.Duration(*warmupDur) * time.Second)
103 endDeadline := warmDeadline.Add(time.Duration(*duration) * time.Second)
104 cf, err := os.Create("/tmp/" + *testName + ".cpu")
105 if err != nil {
106 logger.Fatalf("Error creating file: %v", err)
107 }
108 defer cf.Close()
109 pprof.StartCPUProfile(cf)
110 cpuBeg := syscall.GetCPUTime()
111 for _, cc := range ccs {
112 runWithConn(cc, req, warmDeadline, endDeadline)
113 }
114 wg.Wait()
115 cpu := time.Duration(syscall.GetCPUTime() - cpuBeg)
116 pprof.StopCPUProfile()
117 mf, err := os.Create("/tmp/" + *testName + ".mem")
118 if err != nil {
119 logger.Fatalf("Error creating file: %v", err)
120 }
121 defer mf.Close()
122 runtime.GC()
123 if err := pprof.WriteHeapProfile(mf); err != nil {
124 logger.Fatalf("Error writing memory profile: %v", err)
125 }
126 hist := stats.NewHistogram(hopts)
127 for _, h := range hists {
128 hist.Merge(h)
129 }
130 parseHist(hist)
131 fmt.Println("Client CPU utilization:", cpu)
132 fmt.Println("Client CPU profile:", cf.Name())
133 fmt.Println("Client Mem Profile:", mf.Name())
134 }
135
136 func buildConnections(ctx context.Context) []*grpc.ClientConn {
137 ccs := make([]*grpc.ClientConn, *numConn)
138 for i := range ccs {
139 ccs[i] = benchmark.NewClientConnWithContext(ctx, "localhost:"+*port,
140 grpc.WithTransportCredentials(insecure.NewCredentials()),
141 grpc.WithBlock(),
142 grpc.WithWriteBufferSize(128*1024),
143 grpc.WithReadBufferSize(128*1024),
144 )
145 }
146 return ccs
147 }
148
149 func runWithConn(cc *grpc.ClientConn, req *testpb.SimpleRequest, warmDeadline, endDeadline time.Time) {
150 for i := 0; i < *numRPC; i++ {
151 wg.Add(1)
152 go func() {
153 defer wg.Done()
154 caller := makeCaller(cc, req)
155 hist := stats.NewHistogram(hopts)
156 for {
157 start := time.Now()
158 if start.After(endDeadline) {
159 mu.Lock()
160 hists = append(hists, hist)
161 mu.Unlock()
162 return
163 }
164 caller()
165 elapsed := time.Since(start)
166 if start.After(warmDeadline) {
167 hist.Add(elapsed.Nanoseconds())
168 }
169 }
170 }()
171 }
172 }
173
174 func makeCaller(cc *grpc.ClientConn, req *testpb.SimpleRequest) func() {
175 client := testgrpc.NewBenchmarkServiceClient(cc)
176 if *rpcType == "unary" {
177 return func() {
178 if _, err := client.UnaryCall(context.Background(), req); err != nil {
179 logger.Fatalf("RPC failed: %v", err)
180 }
181 }
182 }
183 stream, err := client.StreamingCall(context.Background())
184 if err != nil {
185 logger.Fatalf("RPC failed: %v", err)
186 }
187 return func() {
188 if err := stream.Send(req); err != nil {
189 logger.Fatalf("Streaming RPC failed to send: %v", err)
190 }
191 if _, err := stream.Recv(); err != nil {
192 logger.Fatalf("Streaming RPC failed to read: %v", err)
193 }
194 }
195 }
196
197 func parseHist(hist *stats.Histogram) {
198 fmt.Println("qps:", float64(hist.Count)/float64(*duration))
199 fmt.Printf("Latency: (50/90/99 %%ile): %v/%v/%v\n",
200 time.Duration(median(.5, hist)),
201 time.Duration(median(.9, hist)),
202 time.Duration(median(.99, hist)))
203 }
204
205 func median(percentile float64, h *stats.Histogram) int64 {
206 need := int64(float64(h.Count) * percentile)
207 have := int64(0)
208 for _, bucket := range h.Buckets {
209 count := bucket.Count
210 if have+count >= need {
211 percent := float64(need-have) / float64(count)
212 return int64((1.0-percent)*bucket.LowBound + percent*bucket.LowBound*(1.0+hopts.GrowthFactor))
213 }
214 have += bucket.Count
215 }
216 panic("should have found a bound")
217 }
218
View as plain text