...

Source file src/google.golang.org/grpc/benchmark/client/main.go

Documentation: google.golang.org/grpc/benchmark/client

/*
 *
 * Copyright 2017 gRPC authors.
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 *
 */

/*
Package main provides a client used for benchmarking. Before running the
client, the user must launch the gRPC server.

The command to start the server can be found in the following file:

	benchmark/server/main.go

After starting the server, the client can be run. For example:

	go run benchmark/client/main.go -test_name=grpc_test

If the server is listening on a port other than the default of 50051, use the
-port flag so the client hits the server on the correct port. For example:

	go run benchmark/client/main.go -test_name=grpc_test -port=8080
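
The client also supports streaming RPCs and multiple parallel connections,
configured with the flags defined below. For example (flag values here are
illustrative):

	go run benchmark/client/main.go -test_name=grpc_test -rpc_type=streaming -c=4 -r=10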
*/
package main

import (
	"context"
	"flag"
	"fmt"
	"os"
	"runtime"
	"runtime/pprof"
	"sync"
	"time"

	"google.golang.org/grpc"
	"google.golang.org/grpc/benchmark"
	"google.golang.org/grpc/benchmark/stats"
	"google.golang.org/grpc/credentials/insecure"
	"google.golang.org/grpc/grpclog"
	"google.golang.org/grpc/internal/syscall"

	testgrpc "google.golang.org/grpc/interop/grpc_testing"
	testpb "google.golang.org/grpc/interop/grpc_testing"
)

var (
	port      = flag.String("port", "50051", "Localhost port to connect to.")
	numRPC    = flag.Int("r", 1, "The number of concurrent RPCs on each connection.")
	numConn   = flag.Int("c", 1, "The number of parallel connections.")
	warmupDur = flag.Int("w", 10, "Warm-up duration in seconds.")
	duration  = flag.Int("d", 60, "Benchmark duration in seconds.")
	rqSize    = flag.Int("req", 1, "Request message size in bytes.")
	rspSize   = flag.Int("resp", 1, "Response message size in bytes.")
	rpcType   = flag.String("rpc_type", "unary",
		`Configure different client rpc type. Valid options are:
		   unary;
		   streaming.`)
	testName = flag.String("test_name", "", "Name of the test used for creating profiles.")
	wg       sync.WaitGroup
	hopts    = stats.HistogramOptions{
		NumBuckets:   2495,
		GrowthFactor: .01,
	}
	mu    sync.Mutex
	hists []*stats.Histogram

	logger = grpclog.Component("benchmark")
)

func main() {
	flag.Parse()
	if *testName == "" {
		logger.Fatal("-test_name not set")
	}
	req := &testpb.SimpleRequest{
		ResponseType: testpb.PayloadType_COMPRESSABLE,
		ResponseSize: int32(*rspSize),
		Payload: &testpb.Payload{
			Type: testpb.PayloadType_COMPRESSABLE,
			Body: make([]byte, *rqSize),
		},
	}
	connectCtx, connectCancel := context.WithDeadline(context.Background(), time.Now().Add(5*time.Second))
	defer connectCancel()
	ccs := buildConnections(connectCtx)
	warmDeadline := time.Now().Add(time.Duration(*warmupDur) * time.Second)
	endDeadline := warmDeadline.Add(time.Duration(*duration) * time.Second)
	cf, err := os.Create("/tmp/" + *testName + ".cpu")
	if err != nil {
		logger.Fatalf("Error creating file: %v", err)
	}
	defer cf.Close()
	if err := pprof.StartCPUProfile(cf); err != nil {
		logger.Fatalf("Error starting CPU profile: %v", err)
	}
	cpuBeg := syscall.GetCPUTime()
	for _, cc := range ccs {
		runWithConn(cc, req, warmDeadline, endDeadline)
	}
	wg.Wait()
	cpu := time.Duration(syscall.GetCPUTime() - cpuBeg)
	pprof.StopCPUProfile()
	mf, err := os.Create("/tmp/" + *testName + ".mem")
	if err != nil {
		logger.Fatalf("Error creating file: %v", err)
	}
	defer mf.Close()
	runtime.GC() // materialize all statistics
	if err := pprof.WriteHeapProfile(mf); err != nil {
		logger.Fatalf("Error writing memory profile: %v", err)
	}
	hist := stats.NewHistogram(hopts)
	for _, h := range hists {
		hist.Merge(h)
	}
	parseHist(hist)
	fmt.Println("Client CPU utilization:", cpu)
	fmt.Println("Client CPU profile:", cf.Name())
	fmt.Println("Client Mem Profile:", mf.Name())
}

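// buildConnections dials *numConn parallel connections to the local server,
// blocking until each connection is established or ctx expires.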
func buildConnections(ctx context.Context) []*grpc.ClientConn {
	ccs := make([]*grpc.ClientConn, *numConn)
	for i := range ccs {
		ccs[i] = benchmark.NewClientConnWithContext(ctx, "localhost:"+*port,
			grpc.WithTransportCredentials(insecure.NewCredentials()),
			grpc.WithBlock(),
			grpc.WithWriteBufferSize(128*1024),
			grpc.WithReadBufferSize(128*1024),
		)
	}
	return ccs
}

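// runWithConn starts *numRPC goroutines on the given connection. Each
// goroutine issues RPCs back to back until endDeadline and records per-RPC
// latency in its own histogram once warmDeadline has passed.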
func runWithConn(cc *grpc.ClientConn, req *testpb.SimpleRequest, warmDeadline, endDeadline time.Time) {
	for i := 0; i < *numRPC; i++ {
		wg.Add(1)
		go func() {
			defer wg.Done()
			caller := makeCaller(cc, req)
			hist := stats.NewHistogram(hopts)
			for {
				start := time.Now()
				if start.After(endDeadline) {
					mu.Lock()
					hists = append(hists, hist)
					mu.Unlock()
					return
				}
				caller()
				elapsed := time.Since(start)
				if start.After(warmDeadline) {
					hist.Add(elapsed.Nanoseconds())
				}
			}
		}()
	}
}

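// makeCaller returns a closure that performs one blocking RPC. For the unary
// case each call issues an independent UnaryCall; for the streaming case a
// single stream is established up front and each call sends one request and
// reads one response on it.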
func makeCaller(cc *grpc.ClientConn, req *testpb.SimpleRequest) func() {
	client := testgrpc.NewBenchmarkServiceClient(cc)
	if *rpcType == "unary" {
		return func() {
			if _, err := client.UnaryCall(context.Background(), req); err != nil {
				logger.Fatalf("RPC failed: %v", err)
			}
		}
	}
	if *rpcType != "streaming" {
		logger.Fatalf("Unknown rpc_type: %q", *rpcType)
	}
	stream, err := client.StreamingCall(context.Background())
	if err != nil {
		logger.Fatalf("RPC failed: %v", err)
	}
	return func() {
		if err := stream.Send(req); err != nil {
			logger.Fatalf("Streaming RPC failed to send: %v", err)
		}
		if _, err := stream.Recv(); err != nil {
			logger.Fatalf("Streaming RPC failed to read: %v", err)
		}
	}
}

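// parseHist prints the overall QPS and the 50th/90th/99th percentile
// latencies from the merged histogram.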
func parseHist(hist *stats.Histogram) {
	fmt.Println("qps:", float64(hist.Count)/float64(*duration))
	fmt.Printf("Latency: (50/90/99 %%ile): %v/%v/%v\n",
		time.Duration(median(.5, hist)),
		time.Duration(median(.9, hist)),
		time.Duration(median(.99, hist)))
}

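// median returns the latency at the given percentile by scanning the
// histogram buckets and interpolating linearly inside the bucket that
// contains the target sample count. (Despite its name, it computes an
// arbitrary percentile, not only the median.)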
func median(percentile float64, h *stats.Histogram) int64 {
	need := int64(float64(h.Count) * percentile)
	have := int64(0)
	for _, bucket := range h.Buckets {
		count := bucket.Count
		if have+count >= need {
			// Interpolate between this bucket's lower bound and the next
			// bucket's lower bound, which is LowBound * (1 + GrowthFactor).
			percent := float64(need-have) / float64(count)
			return int64((1.0-percent)*bucket.LowBound + percent*bucket.LowBound*(1.0+hopts.GrowthFactor))
		}
		have += count
	}
	panic("should have found a bound")
}

