...

Source file src/google.golang.org/grpc/benchmark/worker/main.go

Documentation: google.golang.org/grpc/benchmark/worker

     1  /*
     2   *
     3   * Copyright 2016 gRPC authors.
     4   *
     5   * Licensed under the Apache License, Version 2.0 (the "License");
     6   * you may not use this file except in compliance with the License.
     7   * You may obtain a copy of the License at
     8   *
     9   *     http://www.apache.org/licenses/LICENSE-2.0
    10   *
    11   * Unless required by applicable law or agreed to in writing, software
    12   * distributed under the License is distributed on an "AS IS" BASIS,
    13   * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    14   * See the License for the specific language governing permissions and
    15   * limitations under the License.
    16   *
    17   */
    18  
    19  // Binary worker implements the benchmark worker that can turn into a benchmark
    20  // client or server.
    21  package main
    22  
    23  import (
    24  	"context"
    25  	"flag"
    26  	"fmt"
    27  	"io"
    28  	"net"
    29  	"net/http"
    30  	_ "net/http/pprof"
    31  	"runtime"
    32  	"strconv"
    33  	"time"
    34  
    35  	"google.golang.org/grpc"
    36  	"google.golang.org/grpc/codes"
    37  	"google.golang.org/grpc/grpclog"
    38  	"google.golang.org/grpc/status"
    39  
    40  	testgrpc "google.golang.org/grpc/interop/grpc_testing"
    41  	testpb "google.golang.org/grpc/interop/grpc_testing"
    42  )
    43  
    44  var (
    45  	driverPort    = flag.Int("driver_port", 10000, "port for communication with driver")
    46  	serverPort    = flag.Int("server_port", 0, "port for benchmark server if not specified by server config message")
    47  	pprofPort     = flag.Int("pprof_port", -1, "Port for pprof debug server to listen on. Pprof server doesn't start if unset")
    48  	blockProfRate = flag.Int("block_prof_rate", 0, "fraction of goroutine blocking events to report in blocking profile")
    49  
    50  	logger = grpclog.Component("benchmark")
    51  )
    52  
    53  type byteBufCodec struct {
    54  }
    55  
    56  func (byteBufCodec) Marshal(v any) ([]byte, error) {
    57  	b, ok := v.(*[]byte)
    58  	if !ok {
    59  		return nil, fmt.Errorf("failed to marshal: %v is not type of *[]byte", v)
    60  	}
    61  	return *b, nil
    62  }
    63  
    64  func (byteBufCodec) Unmarshal(data []byte, v any) error {
    65  	b, ok := v.(*[]byte)
    66  	if !ok {
    67  		return fmt.Errorf("failed to marshal: %v is not type of *[]byte", v)
    68  	}
    69  	*b = data
    70  	return nil
    71  }
    72  
    73  func (byteBufCodec) String() string {
    74  	return "bytebuffer"
    75  }
    76  
    77  // workerServer implements WorkerService rpc handlers.
    78  // It can create benchmarkServer or benchmarkClient on demand.
    79  type workerServer struct {
    80  	testgrpc.UnimplementedWorkerServiceServer
    81  	stop       chan<- bool
    82  	serverPort int
    83  }
    84  
    85  func (s *workerServer) RunServer(stream testgrpc.WorkerService_RunServerServer) error {
    86  	var bs *benchmarkServer
    87  	defer func() {
    88  		// Close benchmark server when stream ends.
    89  		logger.Infof("closing benchmark server")
    90  		if bs != nil {
    91  			bs.closeFunc()
    92  		}
    93  	}()
    94  	for {
    95  		in, err := stream.Recv()
    96  		if err == io.EOF {
    97  			return nil
    98  		}
    99  		if err != nil {
   100  			return err
   101  		}
   102  
   103  		var out *testpb.ServerStatus
   104  		switch argtype := in.Argtype.(type) {
   105  		case *testpb.ServerArgs_Setup:
   106  			logger.Infof("server setup received:")
   107  			if bs != nil {
   108  				logger.Infof("server setup received when server already exists, closing the existing server")
   109  				bs.closeFunc()
   110  			}
   111  			bs, err = startBenchmarkServer(argtype.Setup, s.serverPort)
   112  			if err != nil {
   113  				return err
   114  			}
   115  			out = &testpb.ServerStatus{
   116  				Stats: bs.getStats(false),
   117  				Port:  int32(bs.port),
   118  				Cores: int32(bs.cores),
   119  			}
   120  
   121  		case *testpb.ServerArgs_Mark:
   122  			logger.Infof("server mark received:")
   123  			logger.Infof(" - %v", argtype)
   124  			if bs == nil {
   125  				return status.Error(codes.InvalidArgument, "server does not exist when mark received")
   126  			}
   127  			out = &testpb.ServerStatus{
   128  				Stats: bs.getStats(argtype.Mark.Reset_),
   129  				Port:  int32(bs.port),
   130  				Cores: int32(bs.cores),
   131  			}
   132  		}
   133  
   134  		if err := stream.Send(out); err != nil {
   135  			return err
   136  		}
   137  	}
   138  }
   139  
   140  func (s *workerServer) RunClient(stream testgrpc.WorkerService_RunClientServer) error {
   141  	var bc *benchmarkClient
   142  	defer func() {
   143  		// Shut down benchmark client when stream ends.
   144  		logger.Infof("shuting down benchmark client")
   145  		if bc != nil {
   146  			bc.shutdown()
   147  		}
   148  	}()
   149  	for {
   150  		in, err := stream.Recv()
   151  		if err == io.EOF {
   152  			return nil
   153  		}
   154  		if err != nil {
   155  			return err
   156  		}
   157  
   158  		var out *testpb.ClientStatus
   159  		switch t := in.Argtype.(type) {
   160  		case *testpb.ClientArgs_Setup:
   161  			logger.Infof("client setup received:")
   162  			if bc != nil {
   163  				logger.Infof("client setup received when client already exists, shuting down the existing client")
   164  				bc.shutdown()
   165  			}
   166  			bc, err = startBenchmarkClient(t.Setup)
   167  			if err != nil {
   168  				return err
   169  			}
   170  			out = &testpb.ClientStatus{
   171  				Stats: bc.getStats(false),
   172  			}
   173  
   174  		case *testpb.ClientArgs_Mark:
   175  			logger.Infof("client mark received:")
   176  			logger.Infof(" - %v", t)
   177  			if bc == nil {
   178  				return status.Error(codes.InvalidArgument, "client does not exist when mark received")
   179  			}
   180  			out = &testpb.ClientStatus{
   181  				Stats: bc.getStats(t.Mark.Reset_),
   182  			}
   183  		}
   184  
   185  		if err := stream.Send(out); err != nil {
   186  			return err
   187  		}
   188  	}
   189  }
   190  
   191  func (s *workerServer) CoreCount(ctx context.Context, in *testpb.CoreRequest) (*testpb.CoreResponse, error) {
   192  	logger.Infof("core count: %v", runtime.NumCPU())
   193  	return &testpb.CoreResponse{Cores: int32(runtime.NumCPU())}, nil
   194  }
   195  
   196  func (s *workerServer) QuitWorker(ctx context.Context, in *testpb.Void) (*testpb.Void, error) {
   197  	logger.Infof("quitting worker")
   198  	s.stop <- true
   199  	return &testpb.Void{}, nil
   200  }
   201  
   202  func main() {
   203  	grpc.EnableTracing = false
   204  
   205  	flag.Parse()
   206  	lis, err := net.Listen("tcp", ":"+strconv.Itoa(*driverPort))
   207  	if err != nil {
   208  		logger.Fatalf("failed to listen: %v", err)
   209  	}
   210  	logger.Infof("worker listening at port %v", *driverPort)
   211  
   212  	s := grpc.NewServer()
   213  	stop := make(chan bool)
   214  	testgrpc.RegisterWorkerServiceServer(s, &workerServer{
   215  		stop:       stop,
   216  		serverPort: *serverPort,
   217  	})
   218  
   219  	go func() {
   220  		<-stop
   221  		// Wait for 1 second before stopping the server to make sure the return value of QuitWorker is sent to client.
   222  		// TODO revise this once server graceful stop is supported in gRPC.
   223  		time.Sleep(time.Second)
   224  		s.Stop()
   225  	}()
   226  
   227  	runtime.SetBlockProfileRate(*blockProfRate)
   228  
   229  	if *pprofPort >= 0 {
   230  		go func() {
   231  			logger.Infoln("Starting pprof server on port " + strconv.Itoa(*pprofPort))
   232  			logger.Infoln(http.ListenAndServe("localhost:"+strconv.Itoa(*pprofPort), nil))
   233  		}()
   234  	}
   235  
   236  	s.Serve(lis)
   237  }
   238  

View as plain text