
Source file src/google.golang.org/grpc/benchmark/worker/benchmark_client.go

Documentation: google.golang.org/grpc/benchmark/worker

     1  /*
     2   *
     3   * Copyright 2016 gRPC authors.
     4   *
     5   * Licensed under the Apache License, Version 2.0 (the "License");
     6   * you may not use this file except in compliance with the License.
     7   * You may obtain a copy of the License at
     8   *
     9   *     http://www.apache.org/licenses/LICENSE-2.0
    10   *
    11   * Unless required by applicable law or agreed to in writing, software
    12   * distributed under the License is distributed on an "AS IS" BASIS,
    13   * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    14   * See the License for the specific language governing permissions and
    15   * limitations under the License.
    16   *
    17   */
    18  
    19  package main
    20  
    21  import (
    22  	"context"
    23  	"flag"
    24  	"math"
    25  	"runtime"
    26  	"sync"
    27  	"time"
    28  
    29  	"google.golang.org/grpc"
    30  	"google.golang.org/grpc/benchmark"
    31  	"google.golang.org/grpc/benchmark/stats"
    32  	"google.golang.org/grpc/codes"
    33  	"google.golang.org/grpc/credentials"
    34  	"google.golang.org/grpc/credentials/insecure"
    35  	"google.golang.org/grpc/internal/grpcrand"
    36  	"google.golang.org/grpc/internal/syscall"
    37  	"google.golang.org/grpc/status"
    38  	"google.golang.org/grpc/testdata"
    39  
    40  	testgrpc "google.golang.org/grpc/interop/grpc_testing"
    41  	testpb "google.golang.org/grpc/interop/grpc_testing"
    42  
    43  	_ "google.golang.org/grpc/xds" // To install the xds resolvers and balancers.
    44  )
    45  
    46  var caFile = flag.String("ca_file", "", "The file containing the CA root cert file")
    47  
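        // lockingHistogram pairs a latency histogram with a mutex so that the
        // per-RPC goroutines and the stats collector can use it concurrently.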
    48  type lockingHistogram struct {
    49  	mu        sync.Mutex
    50  	histogram *stats.Histogram
    51  }
    52  
    53  func (h *lockingHistogram) add(value int64) {
    54  	h.mu.Lock()
    55  	defer h.mu.Unlock()
    56  	h.histogram.Add(value)
    57  }
    58  
    59  // swap sets h.histogram to o and returns its old value.
    60  func (h *lockingHistogram) swap(o *stats.Histogram) *stats.Histogram {
    61  	h.mu.Lock()
    62  	defer h.mu.Unlock()
    63  	old := h.histogram
    64  	h.histogram = o
    65  	return old
    66  }
    67  
    68  func (h *lockingHistogram) mergeInto(merged *stats.Histogram) {
    69  	h.mu.Lock()
    70  	defer h.mu.Unlock()
    71  	merged.Merge(h.histogram)
    72  }
    73  
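        // benchmarkClient holds the state of a running benchmark: the function to
        // close its connections, a stop channel, one locking histogram per
        // (channel, outstanding RPC) slot, and the wall-clock and rusage snapshots
        // taken at the last stats reset.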
    74  type benchmarkClient struct {
    75  	closeConns        func()
    76  	stop              chan bool
    77  	lastResetTime     time.Time
    78  	histogramOptions  stats.HistogramOptions
    79  	lockingHistograms []lockingHistogram
    80  	rusageLastReset   *syscall.Rusage
    81  }
    82  
    83  func printClientConfig(config *testpb.ClientConfig) {
    84  	// Some config options are ignored:
    85  	// - client type:
    86  	//     a sync client is always created
    87  	// - async client threads
    88  	// - core list
    89  	logger.Infof(" * client type: %v (ignored, always creates sync client)", config.ClientType)
    90  	logger.Infof(" * async client threads: %v (ignored)", config.AsyncClientThreads)
    91  	// TODO: use cores specified by CoreList when setting list of cores is supported in go.
    92  	logger.Infof(" * core list: %v (ignored)", config.CoreList)
    93  
    94  	logger.Infof(" - security params: %v", config.SecurityParams)
    95  	logger.Infof(" - core limit: %v", config.CoreLimit)
    96  	logger.Infof(" - payload config: %v", config.PayloadConfig)
    97  	logger.Infof(" - rpcs per channel: %v", config.OutstandingRpcsPerChannel)
    98  	logger.Infof(" - channel number: %v", config.ClientChannels)
    99  	logger.Infof(" - load params: %v", config.LoadParams)
   100  	logger.Infof(" - rpc type: %v", config.RpcType)
   101  	logger.Infof(" - histogram params: %v", config.HistogramParams)
   102  	logger.Infof(" - server targets: %v", config.ServerTargets)
   103  }
   104  
   105  func setupClientEnv(config *testpb.ClientConfig) {
   106  	// Use all CPU cores available on the machine by default.
   107  	// TODO: Revisit this for the optimal default setup.
   108  	if config.CoreLimit > 0 {
   109  		runtime.GOMAXPROCS(int(config.CoreLimit))
   110  	} else {
   111  		runtime.GOMAXPROCS(runtime.NumCPU())
   112  	}
   113  }
   114  
   115  // createConns creates connections according to the given config.
   116  // It returns the connections and a corresponding function to close them.
   117  // It returns a non-nil error if anything goes wrong.
   118  func createConns(config *testpb.ClientConfig) ([]*grpc.ClientConn, func(), error) {
   119  	opts := []grpc.DialOption{
   120  		grpc.WithWriteBufferSize(128 * 1024),
   121  		grpc.WithReadBufferSize(128 * 1024),
   122  	}
   123  
   124  	// Sanity check for client type.
   125  	switch config.ClientType {
   126  	case testpb.ClientType_SYNC_CLIENT:
   127  	case testpb.ClientType_ASYNC_CLIENT:
   128  	default:
   129  		return nil, nil, status.Errorf(codes.InvalidArgument, "unknown client type: %v", config.ClientType)
   130  	}
   131  
   132  	// Check and set security options.
   133  	if config.SecurityParams != nil {
   134  		if *caFile == "" {
   135  			*caFile = testdata.Path("ca.pem")
   136  		}
   137  		creds, err := credentials.NewClientTLSFromFile(*caFile, config.SecurityParams.ServerHostOverride)
   138  		if err != nil {
   139  			return nil, nil, status.Errorf(codes.InvalidArgument, "failed to create TLS credentials: %v", err)
   140  		}
   141  		opts = append(opts, grpc.WithTransportCredentials(creds))
   142  	} else {
   143  		opts = append(opts, grpc.WithTransportCredentials(insecure.NewCredentials()))
   144  	}
   145  
   146  	// Use byteBufCodec if it is required.
   147  	if config.PayloadConfig != nil {
   148  		switch config.PayloadConfig.Payload.(type) {
   149  		case *testpb.PayloadConfig_BytebufParams:
   150  			opts = append(opts, grpc.WithDefaultCallOptions(grpc.CallCustomCodec(byteBufCodec{})))
   151  		case *testpb.PayloadConfig_SimpleParams:
   152  		default:
   153  			return nil, nil, status.Errorf(codes.InvalidArgument, "unknown payload config: %v", config.PayloadConfig)
   154  		}
   155  	}
   156  
   157  	// Create connections.
   158  	connCount := int(config.ClientChannels)
   159  	conns := make([]*grpc.ClientConn, connCount)
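        	// Spread the channels across the configured server targets round-robin.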
   160  	for connIndex := 0; connIndex < connCount; connIndex++ {
   161  		conns[connIndex] = benchmark.NewClientConn(config.ServerTargets[connIndex%len(config.ServerTargets)], opts...)
   162  	}
   163  
   164  	return conns, func() {
   165  		for _, conn := range conns {
   166  			conn.Close()
   167  		}
   168  	}, nil
   169  }
   170  
   171  func performRPCs(config *testpb.ClientConfig, conns []*grpc.ClientConn, bc *benchmarkClient) error {
   172  	// Read payload size and type from config.
   173  	var (
   174  		payloadReqSize, payloadRespSize int
   175  		payloadType                     string
   176  	)
   177  	if config.PayloadConfig != nil {
   178  		switch c := config.PayloadConfig.Payload.(type) {
   179  		case *testpb.PayloadConfig_BytebufParams:
   180  			payloadReqSize = int(c.BytebufParams.ReqSize)
   181  			payloadRespSize = int(c.BytebufParams.RespSize)
   182  			payloadType = "bytebuf"
   183  		case *testpb.PayloadConfig_SimpleParams:
   184  			payloadReqSize = int(c.SimpleParams.ReqSize)
   185  			payloadRespSize = int(c.SimpleParams.RespSize)
   186  			payloadType = "protobuf"
   187  		default:
   188  			return status.Errorf(codes.InvalidArgument, "unknown payload config: %v", config.PayloadConfig)
   189  		}
   190  	}
   191  
   192  	// If Poisson load parameters are set, perform an open loop; otherwise,
   193  	// perform a closed loop. An open loop asynchronously starts RPCs at random
   194  	// times drawn from a Poisson process. A closed loop performs RPCs in a
   195  	// blocking manner, starting the next RPC only after the previous one completes.
   196  	var poissonLambda *float64
   197  	switch t := config.LoadParams.Load.(type) {
   198  	case *testpb.LoadParams_ClosedLoop:
   199  	case *testpb.LoadParams_Poisson:
   200  		if t.Poisson == nil {
   201  			return status.Errorf(codes.InvalidArgument, "poisson is nil, needs to be set")
   202  		}
   203  		if t.Poisson.OfferedLoad <= 0 {
   204  			return status.Errorf(codes.InvalidArgument, "poisson.offered is <= 0: %v, needs to be >0", t.Poisson.OfferedLoad)
   205  		}
   206  		poissonLambda = &t.Poisson.OfferedLoad
   207  	default:
   208  		return status.Errorf(codes.InvalidArgument, "unknown load params: %v", config.LoadParams)
   209  	}
   210  
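        	// Each channel drives OutstandingRpcsPerChannel concurrent RPC loops, so
        	// the total number of in-flight RPCs is len(conns) * rpcCountPerConn.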
   211  	rpcCountPerConn := int(config.OutstandingRpcsPerChannel)
   212  
   213  	switch config.RpcType {
   214  	case testpb.RpcType_UNARY:
   215  		bc.unaryLoop(conns, rpcCountPerConn, payloadReqSize, payloadRespSize, poissonLambda)
   216  	case testpb.RpcType_STREAMING:
   217  		bc.streamingLoop(conns, rpcCountPerConn, payloadReqSize, payloadRespSize, payloadType, poissonLambda)
   218  	default:
   219  		return status.Errorf(codes.InvalidArgument, "unknown rpc type: %v", config.RpcType)
   220  	}
   221  
   222  	return nil
   223  }
   224  
   225  func startBenchmarkClient(config *testpb.ClientConfig) (*benchmarkClient, error) {
   226  	printClientConfig(config)
   227  
   228  	// Set up the running environment, e.g. how many cores to use.
   229  	setupClientEnv(config)
   230  
   231  	conns, closeConns, err := createConns(config)
   232  	if err != nil {
   233  		return nil, err
   234  	}
   235  
   236  	rpcCountPerConn := int(config.OutstandingRpcsPerChannel)
   237  	bc := &benchmarkClient{
   238  		histogramOptions: stats.HistogramOptions{
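        			// Bucket sizes grow geometrically by the configured resolution, so
        			// log_{1+Resolution}(MaxPossible)+1 buckets are enough to cover the
        			// largest latency value the driver declares possible.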
   239  			NumBuckets:     int(math.Log(config.HistogramParams.MaxPossible)/math.Log(1+config.HistogramParams.Resolution)) + 1,
   240  			GrowthFactor:   config.HistogramParams.Resolution,
   241  			BaseBucketSize: (1 + config.HistogramParams.Resolution),
   242  			MinValue:       0,
   243  		},
   244  		lockingHistograms: make([]lockingHistogram, rpcCountPerConn*len(conns)),
   245  
   246  		stop:            make(chan bool),
   247  		lastResetTime:   time.Now(),
   248  		closeConns:      closeConns,
   249  		rusageLastReset: syscall.GetRusage(),
   250  	}
   251  
   252  	if err = performRPCs(config, conns, bc); err != nil {
   253  		// Close all connections if performRPCs failed.
   254  		closeConns()
   255  		return nil, err
   256  	}
   257  
   258  	return bc, nil
   259  }
   260  
   261  func (bc *benchmarkClient) unaryLoop(conns []*grpc.ClientConn, rpcCountPerConn int, reqSize int, respSize int, poissonLambda *float64) {
   262  	for ic, conn := range conns {
   263  		client := testgrpc.NewBenchmarkServiceClient(conn)
   264  		// For each connection, create rpcCountPerConn goroutines to issue RPCs.
   265  		for j := 0; j < rpcCountPerConn; j++ {
   266  			// Create histogram for each goroutine.
   267  			idx := ic*rpcCountPerConn + j
   268  			bc.lockingHistograms[idx].histogram = stats.NewHistogram(bc.histogramOptions)
   269  			// Start a goroutine that records latencies into the histogram created above.
   270  			go func(idx int) {
   271  				// TODO: do warm-up if necessary.
   272  				// For now we rely on the worker client to reserve time for warm-up:
   273  				// it needs to wait for some time after the client is created
   274  				// before starting the benchmark.
   275  				if poissonLambda == nil { // Closed loop.
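        					// Closed loop: each iteration launches one RPC in a goroutine and
        					// waits for it (or for bc.stop) before starting the next, so this
        					// goroutine keeps exactly one RPC outstanding at a time.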
   276  					done := make(chan bool)
   277  					for {
   278  						go func() {
   279  							start := time.Now()
   280  							if err := benchmark.DoUnaryCall(client, reqSize, respSize); err != nil {
   281  								select {
   282  								case <-bc.stop:
   283  								case done <- false:
   284  								}
   285  								return
   286  							}
   287  							elapse := time.Since(start)
   288  							bc.lockingHistograms[idx].add(int64(elapse))
   289  							select {
   290  							case <-bc.stop:
   291  							case done <- true:
   292  							}
   293  						}()
   294  						select {
   295  						case <-bc.stop:
   296  							return
   297  						case <-done:
   298  						}
   299  					}
   300  				} else { // Open loop.
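        					// Open loop: wait an exponentially distributed delay (mean
        					// 1/lambda seconds) before the first call; poissonUnary then
        					// reschedules itself the same way, so RPC start times form a
        					// Poisson process at the offered load.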
   301  					timeBetweenRPCs := time.Duration((grpcrand.ExpFloat64() / *poissonLambda) * float64(time.Second))
   302  					time.AfterFunc(timeBetweenRPCs, func() {
   303  						bc.poissonUnary(client, idx, reqSize, respSize, *poissonLambda)
   304  					})
   305  				}
   306  
   307  			}(idx)
   308  		}
   309  	}
   310  }
   311  
   312  func (bc *benchmarkClient) streamingLoop(conns []*grpc.ClientConn, rpcCountPerConn int, reqSize int, respSize int, payloadType string, poissonLambda *float64) {
   313  	var doRPC func(testgrpc.BenchmarkService_StreamingCallClient, int, int) error
   314  	if payloadType == "bytebuf" {
   315  		doRPC = benchmark.DoByteBufStreamingRoundTrip
   316  	} else {
   317  		doRPC = benchmark.DoStreamingRoundTrip
   318  	}
   319  	for ic, conn := range conns {
   320  		// For each connection, create rpcCountPerConn goroutines to issue RPCs.
   321  		for j := 0; j < rpcCountPerConn; j++ {
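        			// Each worker goroutine opens its own stream up front and reuses it
        			// for every round trip.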
   322  			c := testgrpc.NewBenchmarkServiceClient(conn)
   323  			stream, err := c.StreamingCall(context.Background())
   324  			if err != nil {
   325  				logger.Fatalf("%v.StreamingCall(_) = _, %v", c, err)
   326  			}
   327  			idx := ic*rpcCountPerConn + j
   328  			bc.lockingHistograms[idx].histogram = stats.NewHistogram(bc.histogramOptions)
   329  			if poissonLambda == nil { // Closed loop.
   330  				// Start a goroutine that records latencies into the histogram created above.
   331  				go func(idx int) {
   332  					// TODO: do warm-up if necessary.
   333  					// For now we rely on the worker client to reserve time for warm-up:
   334  					// it needs to wait for some time after the client is created
   335  					// before starting the benchmark.
   336  					for {
   337  						start := time.Now()
   338  						if err := doRPC(stream, reqSize, respSize); err != nil {
   339  							return
   340  						}
   341  						elapse := time.Since(start)
   342  						bc.lockingHistograms[idx].add(int64(elapse))
   343  						select {
   344  						case <-bc.stop:
   345  							return
   346  						default:
   347  						}
   348  					}
   349  				}(idx)
   350  			} else { // Open loop.
   351  				timeBetweenRPCs := time.Duration((grpcrand.ExpFloat64() / *poissonLambda) * float64(time.Second))
   352  				time.AfterFunc(timeBetweenRPCs, func() {
   353  					bc.poissonStreaming(stream, idx, reqSize, respSize, *poissonLambda, doRPC)
   354  				})
   355  			}
   356  		}
   357  	}
   358  }
   359  
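        // poissonUnary issues one unary call asynchronously, records its latency on
        // success, and schedules the next call after an exponentially distributed
        // delay, keeping the arrival process Poisson with rate lambda. The same
        // pattern is used by poissonStreaming below.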
   360  func (bc *benchmarkClient) poissonUnary(client testgrpc.BenchmarkServiceClient, idx int, reqSize int, respSize int, lambda float64) {
   361  	go func() {
   362  		start := time.Now()
   363  		if err := benchmark.DoUnaryCall(client, reqSize, respSize); err != nil {
   364  			return
   365  		}
   366  		elapse := time.Since(start)
   367  		bc.lockingHistograms[idx].add(int64(elapse))
   368  	}()
   369  	timeBetweenRPCs := time.Duration((grpcrand.ExpFloat64() / lambda) * float64(time.Second))
   370  	time.AfterFunc(timeBetweenRPCs, func() {
   371  		bc.poissonUnary(client, idx, reqSize, respSize, lambda)
   372  	})
   373  }
   374  
   375  func (bc *benchmarkClient) poissonStreaming(stream testgrpc.BenchmarkService_StreamingCallClient, idx int, reqSize int, respSize int, lambda float64, doRPC func(testgrpc.BenchmarkService_StreamingCallClient, int, int) error) {
   376  	go func() {
   377  		start := time.Now()
   378  		if err := doRPC(stream, reqSize, respSize); err != nil {
   379  			return
   380  		}
   381  		elapse := time.Since(start)
   382  		bc.lockingHistograms[idx].add(int64(elapse))
   383  	}()
   384  	timeBetweenRPCs := time.Duration((grpcrand.ExpFloat64() / lambda) * float64(time.Second))
   385  	time.AfterFunc(timeBetweenRPCs, func() {
   386  		bc.poissonStreaming(stream, idx, reqSize, respSize, lambda, doRPC)
   387  	})
   388  }
   389  
   390  // getStats returns the stats for the benchmark client.
   391  // It resets lastResetTime and all histograms if the reset argument is true.
   392  func (bc *benchmarkClient) getStats(reset bool) *testpb.ClientStats {
   393  	var wallTimeElapsed, uTimeElapsed, sTimeElapsed float64
   394  	mergedHistogram := stats.NewHistogram(bc.histogramOptions)
   395  
   396  	if reset {
   397  		// Merging histograms may take some time.
   398  		// Swap them all out first and merge them afterwards.
   399  		toMerge := make([]*stats.Histogram, len(bc.lockingHistograms))
   400  		for i := range bc.lockingHistograms {
   401  			toMerge[i] = bc.lockingHistograms[i].swap(stats.NewHistogram(bc.histogramOptions))
   402  		}
   403  
   404  		for i := 0; i < len(toMerge); i++ {
   405  			mergedHistogram.Merge(toMerge[i])
   406  		}
   407  
   408  		wallTimeElapsed = time.Since(bc.lastResetTime).Seconds()
   409  		latestRusage := syscall.GetRusage()
   410  		uTimeElapsed, sTimeElapsed = syscall.CPUTimeDiff(bc.rusageLastReset, latestRusage)
   411  
   412  		bc.rusageLastReset = latestRusage
   413  		bc.lastResetTime = time.Now()
   414  	} else {
   415  		// Merge only, not reset.
   416  		for i := range bc.lockingHistograms {
   417  			bc.lockingHistograms[i].mergeInto(mergedHistogram)
   418  		}
   419  
   420  		wallTimeElapsed = time.Since(bc.lastResetTime).Seconds()
   421  		uTimeElapsed, sTimeElapsed = syscall.CPUTimeDiff(bc.rusageLastReset, syscall.GetRusage())
   422  	}
   423  
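        	// Convert the merged histogram into the HistogramData message that is
        	// reported back to the benchmark driver.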
   424  	b := make([]uint32, len(mergedHistogram.Buckets))
   425  	for i, v := range mergedHistogram.Buckets {
   426  		b[i] = uint32(v.Count)
   427  	}
   428  	return &testpb.ClientStats{
   429  		Latencies: &testpb.HistogramData{
   430  			Bucket:       b,
   431  			MinSeen:      float64(mergedHistogram.Min),
   432  			MaxSeen:      float64(mergedHistogram.Max),
   433  			Sum:          float64(mergedHistogram.Sum),
   434  			SumOfSquares: float64(mergedHistogram.SumOfSquares),
   435  			Count:        float64(mergedHistogram.Count),
   436  		},
   437  		TimeElapsed: wallTimeElapsed,
   438  		TimeUser:    uTimeElapsed,
   439  		TimeSystem:  sTimeElapsed,
   440  	}
   441  }
   442  
   443  func (bc *benchmarkClient) shutdown() {
   444  	close(bc.stop)
   445  	bc.closeConns()
   446  }
   447  
