...

Source file src/google.golang.org/grpc/benchmark/benchmark.go

Documentation: google.golang.org/grpc/benchmark

     1  /*
     2   *
     3   * Copyright 2014 gRPC authors.
     4   *
     5   * Licensed under the Apache License, Version 2.0 (the "License");
     6   * you may not use this file except in compliance with the License.
     7   * You may obtain a copy of the License at
     8   *
     9   *     http://www.apache.org/licenses/LICENSE-2.0
    10   *
    11   * Unless required by applicable law or agreed to in writing, software
    12   * distributed under the License is distributed on an "AS IS" BASIS,
    13   * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    14   * See the License for the specific language governing permissions and
    15   * limitations under the License.
    16   *
    17   */
    18  
    19  /*
    20  Package benchmark implements the building blocks to setup end-to-end gRPC benchmarks.
    21  */
    22  package benchmark
    23  
    24  import (
    25  	"context"
    26  	"fmt"
    27  	"io"
    28  	"log"
    29  	"math/rand"
    30  	"net"
    31  	"strconv"
    32  	"time"
    33  
    34  	"google.golang.org/grpc"
    35  	"google.golang.org/grpc/codes"
    36  	"google.golang.org/grpc/grpclog"
    37  	"google.golang.org/grpc/metadata"
    38  	"google.golang.org/grpc/status"
    39  
    40  	testgrpc "google.golang.org/grpc/interop/grpc_testing"
    41  	testpb "google.golang.org/grpc/interop/grpc_testing"
    42  )
    43  
    44  var logger = grpclog.Component("benchmark")
    45  
    46  // Allows reuse of the same testpb.Payload object.
    47  func setPayload(p *testpb.Payload, t testpb.PayloadType, size int) {
    48  	if size < 0 {
    49  		logger.Fatalf("Requested a response with invalid length %d", size)
    50  	}
    51  	body := make([]byte, size)
    52  	switch t {
    53  	case testpb.PayloadType_COMPRESSABLE:
    54  	default:
    55  		logger.Fatalf("Unsupported payload type: %d", t)
    56  	}
    57  	p.Type = t
    58  	p.Body = body
    59  }
    60  
    61  // NewPayload creates a payload with the given type and size.
    62  func NewPayload(t testpb.PayloadType, size int) *testpb.Payload {
    63  	p := new(testpb.Payload)
    64  	setPayload(p, t, size)
    65  	return p
    66  }
    67  
    68  type testServer struct {
    69  	testgrpc.UnimplementedBenchmarkServiceServer
    70  }
    71  
    72  func (s *testServer) UnaryCall(ctx context.Context, in *testpb.SimpleRequest) (*testpb.SimpleResponse, error) {
    73  	return &testpb.SimpleResponse{
    74  		Payload: NewPayload(in.ResponseType, int(in.ResponseSize)),
    75  	}, nil
    76  }
    77  
    78  // UnconstrainedStreamingHeader indicates to the StreamingCall handler that its
    79  // behavior should be unconstrained (constant send/receive in parallel) instead
    80  // of ping-pong.
    81  const UnconstrainedStreamingHeader = "unconstrained-streaming"
    82  
    83  // UnconstrainedStreamingDelayHeader is used to pass the maximum amount of time
    84  // the server should sleep between consecutive RPC responses.
    85  const UnconstrainedStreamingDelayHeader = "unconstrained-streaming-delay"
    86  
    87  // PreloadMsgSizeHeader indicates that the client is going to ask for
    88  // a fixed response size and passes this size to the server.
    89  // The server is expected to preload the response on startup.
    90  const PreloadMsgSizeHeader = "preload-msg-size"
    91  
    92  func (s *testServer) StreamingCall(stream testgrpc.BenchmarkService_StreamingCallServer) error {
    93  	preloadMsgSize := 0
    94  	if md, ok := metadata.FromIncomingContext(stream.Context()); ok && len(md[PreloadMsgSizeHeader]) != 0 {
    95  		val := md[PreloadMsgSizeHeader][0]
    96  		var err error
    97  		preloadMsgSize, err = strconv.Atoi(val)
    98  		if err != nil {
    99  			return fmt.Errorf("%q header value is not an integer: %s", PreloadMsgSizeHeader, err)
   100  		}
   101  	}
   102  
   103  	if md, ok := metadata.FromIncomingContext(stream.Context()); ok && len(md[UnconstrainedStreamingHeader]) != 0 {
   104  		return s.UnconstrainedStreamingCall(stream, preloadMsgSize)
   105  	}
   106  	response := &testpb.SimpleResponse{
   107  		Payload: new(testpb.Payload),
   108  	}
   109  	preloadedResponse := &grpc.PreparedMsg{}
   110  	if preloadMsgSize > 0 {
   111  		setPayload(response.Payload, testpb.PayloadType_COMPRESSABLE, preloadMsgSize)
   112  		if err := preloadedResponse.Encode(stream, response); err != nil {
   113  			return err
   114  		}
   115  	}
   116  	in := new(testpb.SimpleRequest)
   117  	for {
   118  		// use ServerStream directly to reuse the same testpb.SimpleRequest object
   119  		err := stream.(grpc.ServerStream).RecvMsg(in)
   120  		if err == io.EOF {
   121  			// read done.
   122  			return nil
   123  		}
   124  		if err != nil {
   125  			return err
   126  		}
   127  		if preloadMsgSize > 0 {
   128  			err = stream.SendMsg(preloadedResponse)
   129  		} else {
   130  			setPayload(response.Payload, in.ResponseType, int(in.ResponseSize))
   131  			err = stream.Send(response)
   132  		}
   133  		if err != nil {
   134  			return err
   135  		}
   136  	}
   137  }
   138  
   139  func (s *testServer) UnconstrainedStreamingCall(stream testgrpc.BenchmarkService_StreamingCallServer, preloadMsgSize int) error {
   140  	maxSleep := 0
   141  	if md, ok := metadata.FromIncomingContext(stream.Context()); ok && len(md[UnconstrainedStreamingDelayHeader]) != 0 {
   142  		val := md[UnconstrainedStreamingDelayHeader][0]
   143  		d, err := time.ParseDuration(val)
   144  		if err != nil {
   145  			return fmt.Errorf("can't parse %q header: %s", UnconstrainedStreamingDelayHeader, err)
   146  		}
   147  		maxSleep = int(d)
   148  	}
   149  
   150  	in := new(testpb.SimpleRequest)
   151  	// Receive a message to learn response type and size.
   152  	err := stream.RecvMsg(in)
   153  	if err == io.EOF {
   154  		// read done.
   155  		return nil
   156  	}
   157  	if err != nil {
   158  		return err
   159  	}
   160  
   161  	response := &testpb.SimpleResponse{
   162  		Payload: new(testpb.Payload),
   163  	}
   164  	setPayload(response.Payload, in.ResponseType, int(in.ResponseSize))
   165  
   166  	preloadedResponse := &grpc.PreparedMsg{}
   167  	if preloadMsgSize > 0 {
   168  		if err := preloadedResponse.Encode(stream, response); err != nil {
   169  			return err
   170  		}
   171  	}
   172  
   173  	go func() {
   174  		for {
   175  			// Using RecvMsg rather than Recv to prevent reallocation of SimpleRequest.
   176  			err := stream.RecvMsg(in)
   177  			switch status.Code(err) {
   178  			case codes.Canceled:
   179  				return
   180  			case codes.OK:
   181  			default:
   182  				log.Fatalf("server recv error: %v", err)
   183  			}
   184  		}
   185  	}()
   186  
   187  	go func() {
   188  		for {
   189  			if maxSleep > 0 {
   190  				time.Sleep(time.Duration(rand.Intn(maxSleep)))
   191  			}
   192  			var err error
   193  			if preloadMsgSize > 0 {
   194  				err = stream.SendMsg(preloadedResponse)
   195  			} else {
   196  				err = stream.Send(response)
   197  			}
   198  			switch status.Code(err) {
   199  			case codes.Unavailable, codes.Canceled:
   200  				return
   201  			case codes.OK:
   202  			default:
   203  				log.Fatalf("server send error: %v", err)
   204  			}
   205  		}
   206  	}()
   207  
   208  	<-stream.Context().Done()
   209  	return stream.Context().Err()
   210  }
   211  
   212  // byteBufServer is a gRPC server that sends and receives byte buffer.
   213  // The purpose is to benchmark the gRPC performance without protobuf serialization/deserialization overhead.
   214  type byteBufServer struct {
   215  	testgrpc.UnimplementedBenchmarkServiceServer
   216  	respSize int32
   217  }
   218  
   219  // UnaryCall is an empty function and is not used for benchmark.
   220  // If bytebuf UnaryCall benchmark is needed later, the function body needs to be updated.
   221  func (s *byteBufServer) UnaryCall(ctx context.Context, in *testpb.SimpleRequest) (*testpb.SimpleResponse, error) {
   222  	return &testpb.SimpleResponse{}, nil
   223  }
   224  
   225  func (s *byteBufServer) StreamingCall(stream testgrpc.BenchmarkService_StreamingCallServer) error {
   226  	for {
   227  		var in []byte
   228  		err := stream.(grpc.ServerStream).RecvMsg(&in)
   229  		if err == io.EOF {
   230  			return nil
   231  		}
   232  		if err != nil {
   233  			return err
   234  		}
   235  		out := make([]byte, s.respSize)
   236  		if err := stream.(grpc.ServerStream).SendMsg(&out); err != nil {
   237  			return err
   238  		}
   239  	}
   240  }
   241  
   242  // ServerInfo contains the information to create a gRPC benchmark server.
   243  type ServerInfo struct {
   244  	// Type is the type of the server.
   245  	// It should be "protobuf" or "bytebuf".
   246  	Type string
   247  
   248  	// Metadata is an optional configuration.
   249  	// For "protobuf", it's ignored.
   250  	// For "bytebuf", it should be an int representing response size.
   251  	Metadata any
   252  
   253  	// Listener is the network listener for the server to use
   254  	Listener net.Listener
   255  }
   256  
   257  // StartServer starts a gRPC server serving a benchmark service according to info.
   258  // It returns a function to stop the server.
   259  func StartServer(info ServerInfo, opts ...grpc.ServerOption) func() {
   260  	s := grpc.NewServer(opts...)
   261  	switch info.Type {
   262  	case "protobuf":
   263  		testgrpc.RegisterBenchmarkServiceServer(s, &testServer{})
   264  	case "bytebuf":
   265  		respSize, ok := info.Metadata.(int32)
   266  		if !ok {
   267  			logger.Fatalf("failed to StartServer, invalid metadata: %v, for Type: %v", info.Metadata, info.Type)
   268  		}
   269  		testgrpc.RegisterBenchmarkServiceServer(s, &byteBufServer{respSize: respSize})
   270  	default:
   271  		logger.Fatalf("failed to StartServer, unknown Type: %v", info.Type)
   272  	}
   273  	go s.Serve(info.Listener)
   274  	return func() {
   275  		s.Stop()
   276  	}
   277  }
   278  
   279  // DoUnaryCall performs an unary RPC with given stub and request and response sizes.
   280  func DoUnaryCall(tc testgrpc.BenchmarkServiceClient, reqSize, respSize int) error {
   281  	pl := NewPayload(testpb.PayloadType_COMPRESSABLE, reqSize)
   282  	req := &testpb.SimpleRequest{
   283  		ResponseType: pl.Type,
   284  		ResponseSize: int32(respSize),
   285  		Payload:      pl,
   286  	}
   287  	if _, err := tc.UnaryCall(context.Background(), req); err != nil {
   288  		return fmt.Errorf("/BenchmarkService/UnaryCall(_, _) = _, %v, want _, <nil>", err)
   289  	}
   290  	return nil
   291  }
   292  
   293  // DoStreamingRoundTrip performs a round trip for a single streaming rpc.
   294  func DoStreamingRoundTrip(stream testgrpc.BenchmarkService_StreamingCallClient, reqSize, respSize int) error {
   295  	pl := NewPayload(testpb.PayloadType_COMPRESSABLE, reqSize)
   296  	req := &testpb.SimpleRequest{
   297  		ResponseType: pl.Type,
   298  		ResponseSize: int32(respSize),
   299  		Payload:      pl,
   300  	}
   301  	return DoStreamingRoundTripPreloaded(stream, req)
   302  }
   303  
   304  // DoStreamingRoundTripPreloaded performs a round trip for a single streaming rpc with preloaded payload.
   305  func DoStreamingRoundTripPreloaded(stream testgrpc.BenchmarkService_StreamingCallClient, req any) error {
   306  	// req could be either *testpb.SimpleRequest or *grpc.PreparedMsg
   307  	if err := stream.SendMsg(req); err != nil {
   308  		return fmt.Errorf("/BenchmarkService/StreamingCall.Send(_) = %v, want <nil>", err)
   309  	}
   310  	if _, err := stream.Recv(); err != nil {
   311  		// EOF is a valid error here.
   312  		if err == io.EOF {
   313  			return nil
   314  		}
   315  		return fmt.Errorf("/BenchmarkService/StreamingCall.Recv(_) = %v, want <nil>", err)
   316  	}
   317  	return nil
   318  }
   319  
   320  // DoByteBufStreamingRoundTrip performs a round trip for a single streaming rpc, using a custom codec for byte buffer.
   321  func DoByteBufStreamingRoundTrip(stream testgrpc.BenchmarkService_StreamingCallClient, reqSize, respSize int) error {
   322  	out := make([]byte, reqSize)
   323  	if err := stream.(grpc.ClientStream).SendMsg(&out); err != nil {
   324  		return fmt.Errorf("/BenchmarkService/StreamingCall.(ClientStream).SendMsg(_) = %v, want <nil>", err)
   325  	}
   326  	var in []byte
   327  	if err := stream.(grpc.ClientStream).RecvMsg(&in); err != nil {
   328  		// EOF is a valid error here.
   329  		if err == io.EOF {
   330  			return nil
   331  		}
   332  		return fmt.Errorf("/BenchmarkService/StreamingCall.(ClientStream).RecvMsg(_) = %v, want <nil>", err)
   333  	}
   334  	return nil
   335  }
   336  
   337  // NewClientConn creates a gRPC client connection to addr.
   338  func NewClientConn(addr string, opts ...grpc.DialOption) *grpc.ClientConn {
   339  	return NewClientConnWithContext(context.Background(), addr, opts...)
   340  }
   341  
   342  // NewClientConnWithContext creates a gRPC client connection to addr using ctx.
   343  func NewClientConnWithContext(ctx context.Context, addr string, opts ...grpc.DialOption) *grpc.ClientConn {
   344  	conn, err := grpc.DialContext(ctx, addr, opts...)
   345  	if err != nil {
   346  		logger.Fatalf("NewClientConn(%q) failed to create a ClientConn: %v", addr, err)
   347  	}
   348  	return conn
   349  }
   350  

View as plain text