...

Source file src/google.golang.org/grpc/benchmark/worker/benchmark_server.go

Documentation: google.golang.org/grpc/benchmark/worker

     1  /*
     2   *
     3   * Copyright 2016 gRPC authors.
     4   *
     5   * Licensed under the Apache License, Version 2.0 (the "License");
     6   * you may not use this file except in compliance with the License.
     7   * You may obtain a copy of the License at
     8   *
     9   *     http://www.apache.org/licenses/LICENSE-2.0
    10   *
    11   * Unless required by applicable law or agreed to in writing, software
    12   * distributed under the License is distributed on an "AS IS" BASIS,
    13   * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    14   * See the License for the specific language governing permissions and
    15   * limitations under the License.
    16   *
    17   */
    18  
    19  package main
    20  
    21  import (
    22  	"flag"
    23  	"fmt"
    24  	"net"
    25  	"runtime"
    26  	"strconv"
    27  	"strings"
    28  	"sync"
    29  	"time"
    30  
    31  	"google.golang.org/grpc"
    32  	"google.golang.org/grpc/benchmark"
    33  	"google.golang.org/grpc/codes"
    34  	"google.golang.org/grpc/credentials"
    35  	"google.golang.org/grpc/internal/syscall"
    36  	testpb "google.golang.org/grpc/interop/grpc_testing"
    37  	"google.golang.org/grpc/status"
    38  	"google.golang.org/grpc/testdata"
    39  )
    40  
    41  var (
    42  	certFile = flag.String("tls_cert_file", "", "The TLS cert file")
    43  	keyFile  = flag.String("tls_key_file", "", "The TLS key file")
    44  )
    45  
    46  type benchmarkServer struct {
    47  	port            int
    48  	cores           int
    49  	closeFunc       func()
    50  	mu              sync.RWMutex
    51  	lastResetTime   time.Time
    52  	rusageLastReset *syscall.Rusage
    53  }
    54  
    55  func printServerConfig(config *testpb.ServerConfig) {
    56  	// Some config options are ignored:
    57  	// - server type:
    58  	//     will always start sync server
    59  	// - async server threads
    60  	// - core list
    61  	logger.Infof(" * server type: %v (ignored, always starts sync server)", config.ServerType)
    62  	logger.Infof(" * async server threads: %v (ignored)", config.AsyncServerThreads)
    63  	// TODO: use cores specified by CoreList when setting list of cores is supported in go.
    64  	logger.Infof(" * core list: %v (ignored)", config.CoreList)
    65  
    66  	logger.Infof(" - security params: %v", config.SecurityParams)
    67  	logger.Infof(" - core limit: %v", config.CoreLimit)
    68  	logger.Infof(" - port: %v", config.Port)
    69  	logger.Infof(" - payload config: %v", config.PayloadConfig)
    70  }
    71  
    72  func startBenchmarkServer(config *testpb.ServerConfig, serverPort int) (*benchmarkServer, error) {
    73  	printServerConfig(config)
    74  
    75  	// Use all cpu cores available on machine by default.
    76  	// TODO: Revisit this for the optimal default setup.
    77  	numOfCores := runtime.NumCPU()
    78  	if config.CoreLimit > 0 {
    79  		numOfCores = int(config.CoreLimit)
    80  	}
    81  	runtime.GOMAXPROCS(numOfCores)
    82  
    83  	opts := []grpc.ServerOption{
    84  		grpc.WriteBufferSize(128 * 1024),
    85  		grpc.ReadBufferSize(128 * 1024),
    86  	}
    87  
    88  	// Sanity check for server type.
    89  	switch config.ServerType {
    90  	case testpb.ServerType_SYNC_SERVER:
    91  	case testpb.ServerType_ASYNC_SERVER:
    92  	case testpb.ServerType_ASYNC_GENERIC_SERVER:
    93  	default:
    94  		return nil, status.Errorf(codes.InvalidArgument, "unknown server type: %v", config.ServerType)
    95  	}
    96  
    97  	// Set security options.
    98  	if config.SecurityParams != nil {
    99  		if *certFile == "" {
   100  			*certFile = testdata.Path("server1.pem")
   101  		}
   102  		if *keyFile == "" {
   103  			*keyFile = testdata.Path("server1.key")
   104  		}
   105  		creds, err := credentials.NewServerTLSFromFile(*certFile, *keyFile)
   106  		if err != nil {
   107  			logger.Fatalf("failed to generate credentials: %v", err)
   108  		}
   109  		opts = append(opts, grpc.Creds(creds))
   110  	}
   111  
   112  	// Priority: config.Port > serverPort > default (0).
   113  	port := int(config.Port)
   114  	if port == 0 {
   115  		port = serverPort
   116  	}
   117  	lis, err := net.Listen("tcp", fmt.Sprintf(":%d", port))
   118  	if err != nil {
   119  		logger.Fatalf("Failed to listen: %v", err)
   120  	}
   121  	addr := lis.Addr().String()
   122  
   123  	// Create different benchmark server according to config.
   124  	var closeFunc func()
   125  	if config.PayloadConfig != nil {
   126  		switch payload := config.PayloadConfig.Payload.(type) {
   127  		case *testpb.PayloadConfig_BytebufParams:
   128  			opts = append(opts, grpc.CustomCodec(byteBufCodec{}))
   129  			closeFunc = benchmark.StartServer(benchmark.ServerInfo{
   130  				Type:     "bytebuf",
   131  				Metadata: payload.BytebufParams.RespSize,
   132  				Listener: lis,
   133  			}, opts...)
   134  		case *testpb.PayloadConfig_SimpleParams:
   135  			closeFunc = benchmark.StartServer(benchmark.ServerInfo{
   136  				Type:     "protobuf",
   137  				Listener: lis,
   138  			}, opts...)
   139  		case *testpb.PayloadConfig_ComplexParams:
   140  			return nil, status.Errorf(codes.Unimplemented, "unsupported payload config: %v", config.PayloadConfig)
   141  		default:
   142  			return nil, status.Errorf(codes.InvalidArgument, "unknown payload config: %v", config.PayloadConfig)
   143  		}
   144  	} else {
   145  		// Start protobuf server if payload config is nil.
   146  		closeFunc = benchmark.StartServer(benchmark.ServerInfo{
   147  			Type:     "protobuf",
   148  			Listener: lis,
   149  		}, opts...)
   150  	}
   151  
   152  	logger.Infof("benchmark server listening at %v", addr)
   153  	addrSplitted := strings.Split(addr, ":")
   154  	p, err := strconv.Atoi(addrSplitted[len(addrSplitted)-1])
   155  	if err != nil {
   156  		logger.Fatalf("failed to get port number from server address: %v", err)
   157  	}
   158  
   159  	return &benchmarkServer{
   160  		port:            p,
   161  		cores:           numOfCores,
   162  		closeFunc:       closeFunc,
   163  		lastResetTime:   time.Now(),
   164  		rusageLastReset: syscall.GetRusage(),
   165  	}, nil
   166  }
   167  
   168  // getStats returns the stats for benchmark server.
   169  // It resets lastResetTime if argument reset is true.
   170  func (bs *benchmarkServer) getStats(reset bool) *testpb.ServerStats {
   171  	bs.mu.RLock()
   172  	defer bs.mu.RUnlock()
   173  	wallTimeElapsed := time.Since(bs.lastResetTime).Seconds()
   174  	rusageLatest := syscall.GetRusage()
   175  	uTimeElapsed, sTimeElapsed := syscall.CPUTimeDiff(bs.rusageLastReset, rusageLatest)
   176  
   177  	if reset {
   178  		bs.lastResetTime = time.Now()
   179  		bs.rusageLastReset = rusageLatest
   180  	}
   181  	return &testpb.ServerStats{
   182  		TimeElapsed: wallTimeElapsed,
   183  		TimeUser:    uTimeElapsed,
   184  		TimeSystem:  sTimeElapsed,
   185  	}
   186  }
   187  

View as plain text