
Source file src/google.golang.org/grpc/benchmark/benchmain/main.go

Documentation: google.golang.org/grpc/benchmark/benchmain

     1  /*
     2   *
     3   * Copyright 2017 gRPC authors.
     4   *
     5   * Licensed under the Apache License, Version 2.0 (the "License");
     6   * you may not use this file except in compliance with the License.
     7   * You may obtain a copy of the License at
     8   *
     9   *     http://www.apache.org/licenses/LICENSE-2.0
    10   *
    11   * Unless required by applicable law or agreed to in writing, software
    12   * distributed under the License is distributed on an "AS IS" BASIS,
    13   * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    14   * See the License for the specific language governing permissions and
    15   * limitations under the License.
    16   *
    17   */
    18  
    19  /*
    20  Package main provides a gRPC benchmark tool configured via command-line flags.
    21  
    22  An example to run some benchmarks with profiling enabled:
    23  
    24  	go run benchmark/benchmain/main.go -benchtime=10s -workloads=all \
    25  	  -compression=gzip -maxConcurrentCalls=1 -trace=off \
    26  	  -reqSizeBytes=1,1048576 -respSizeBytes=1,1048576 -networkMode=Local \
    27  	  -cpuProfile=cpuProf -memProfile=memProf -memProfileRate=10000 -resultFile=result
    28  
    29  As a suggestion, when creating a branch, run this benchmark and save the result
    30  file with "-resultFile=basePerf". Later, while the work is in progress or once it is
    31  finished, rerun the benchmark and compare the new result with the base at any time.
    32  
    33  Assume there are two result files named "basePerf" and "curPerf", created by adding
    34  -resultFile=basePerf and -resultFile=curPerf.
    35  
    36  		To format the curPerf, run:
    37  	  	go run benchmark/benchresult/main.go curPerf
    38  		To observe how the performance changes based on a base result, run:
    39  	  	go run benchmark/benchresult/main.go basePerf curPerf
    40  */
    41  package main
    42  
    43  import (
    44  	"context"
    45  	"encoding/gob"
    46  	"flag"
    47  	"fmt"
    48  	"io"
    49  	"log"
    50  	"math/rand"
    51  	"net"
    52  	"os"
    53  	"reflect"
    54  	"runtime"
    55  	"runtime/pprof"
    56  	"strconv"
    57  	"strings"
    58  	"sync"
    59  	"sync/atomic"
    60  	"time"
    61  
    62  	"google.golang.org/grpc"
    63  	"google.golang.org/grpc/benchmark"
    64  	"google.golang.org/grpc/benchmark/flags"
    65  	"google.golang.org/grpc/benchmark/latency"
    66  	"google.golang.org/grpc/benchmark/stats"
    67  	"google.golang.org/grpc/credentials/insecure"
    68  	"google.golang.org/grpc/experimental"
    69  	"google.golang.org/grpc/grpclog"
    70  	"google.golang.org/grpc/internal"
    71  	"google.golang.org/grpc/internal/channelz"
    72  	"google.golang.org/grpc/keepalive"
    73  	"google.golang.org/grpc/metadata"
    74  	"google.golang.org/grpc/test/bufconn"
    75  
    76  	testgrpc "google.golang.org/grpc/interop/grpc_testing"
    77  	testpb "google.golang.org/grpc/interop/grpc_testing"
    78  )
    79  
    80  var (
    81  	workloads = flags.StringWithAllowedValues("workloads", workloadsAll,
    82  		fmt.Sprintf("Workloads to execute - One of: %v", strings.Join(allWorkloads, ", ")), allWorkloads)
    83  	traceMode = flags.StringWithAllowedValues("trace", toggleModeOff,
    84  		fmt.Sprintf("Trace mode - One of: %v", strings.Join(allToggleModes, ", ")), allToggleModes)
    85  	preloaderMode = flags.StringWithAllowedValues("preloader", toggleModeOff,
    86  		fmt.Sprintf("Preloader mode - One of: %v, preloader works only in streaming and unconstrained modes and will be ignored in unary mode",
    87  			strings.Join(allToggleModes, ", ")), allToggleModes)
    88  	channelzOn = flags.StringWithAllowedValues("channelz", toggleModeOff,
    89  		fmt.Sprintf("Channelz mode - One of: %v", strings.Join(allToggleModes, ", ")), allToggleModes)
    90  	compressorMode = flags.StringWithAllowedValues("compression", compModeOff,
    91  		fmt.Sprintf("Compression mode - One of: %v", strings.Join(allCompModes, ", ")), allCompModes)
    92  	networkMode = flags.StringWithAllowedValues("networkMode", networkModeNone,
    93  		"Network mode includes LAN, WAN, Local and Longhaul", allNetworkModes)
    94  	readLatency           = flags.DurationSlice("latency", defaultReadLatency, "Simulated one-way network latency - may be a comma-separated list")
    95  	readKbps              = flags.IntSlice("kbps", defaultReadKbps, "Simulated network throughput (in kbps) - may be a comma-separated list")
    96  	readMTU               = flags.IntSlice("mtu", defaultReadMTU, "Simulated network MTU (Maximum Transmission Unit) - may be a comma-separated list")
    97  	maxConcurrentCalls    = flags.IntSlice("maxConcurrentCalls", defaultMaxConcurrentCalls, "Number of concurrent RPCs during benchmarks")
    98  	readReqSizeBytes      = flags.IntSlice("reqSizeBytes", nil, "Request size in bytes - may be a comma-separated list")
    99  	readRespSizeBytes     = flags.IntSlice("respSizeBytes", nil, "Response size in bytes - may be a comma-separated list")
   100  	reqPayloadCurveFiles  = flags.StringSlice("reqPayloadCurveFiles", nil, "comma-separated list of CSV files describing the shape of a random distribution of request payload sizes")
   101  	respPayloadCurveFiles = flags.StringSlice("respPayloadCurveFiles", nil, "comma-separated list of CSV files describing the shape of a random distribution of response payload sizes")
   102  	benchTime             = flag.Duration("benchtime", time.Second, "Configures the amount of time to run each benchmark")
   103  	memProfile            = flag.String("memProfile", "", "Enables memory profiling output to the filename provided.")
   104  	memProfileRate        = flag.Int("memProfileRate", 512*1024, "Configures the memory profiling rate. \n"+
   105  		"memProfile should be set before setting profile rate. To include every allocated block in the profile, "+
   106  		"set MemProfileRate to 1. To turn off profiling entirely, set MemProfileRate to 0. 512 * 1024 by default.")
   107  	cpuProfile          = flag.String("cpuProfile", "", "Enables CPU profiling output to the filename provided")
   108  	benchmarkResultFile = flag.String("resultFile", "", "Save the benchmark result into a binary file")
   109  	useBufconn          = flag.Bool("bufconn", false, "Use in-memory connection instead of system network I/O")
   110  	enableKeepalive     = flag.Bool("enable_keepalive", false, "Enable client keepalive. \n"+
   111  		"Keepalive.Time is set to 10s, Keepalive.Timeout is set to 1s, Keepalive.PermitWithoutStream is set to true.")
   112  	clientReadBufferSize  = flags.IntSlice("clientReadBufferSize", []int{-1}, "Configures the client read buffer size in bytes. If negative, use the default - may be a comma-separated list")
   113  	clientWriteBufferSize = flags.IntSlice("clientWriteBufferSize", []int{-1}, "Configures the client write buffer size in bytes. If negative, use the default - may be a comma-separated list")
   114  	serverReadBufferSize  = flags.IntSlice("serverReadBufferSize", []int{-1}, "Configures the server read buffer size in bytes. If negative, use the default - may be a comma-separated list")
   115  	serverWriteBufferSize = flags.IntSlice("serverWriteBufferSize", []int{-1}, "Configures the server write buffer size in bytes. If negative, use the default - may be a comma-separated list")
   116  	sleepBetweenRPCs      = flags.DurationSlice("sleepBetweenRPCs", []time.Duration{0}, "Configures the maximum amount of time the client should sleep between consecutive RPCs - may be a comma-separated list")
   117  	connections           = flag.Int("connections", 1, "The number of connections. Each connection will handle maxConcurrentCalls RPC streams")
   118  	recvBufferPool        = flags.StringWithAllowedValues("recvBufferPool", recvBufferPoolNil, "Configures the shared receive buffer pool. One of: nil, simple, all", allRecvBufferPools)
   119  	sharedWriteBuffer     = flags.StringWithAllowedValues("sharedWriteBuffer", toggleModeOff,
   120  		fmt.Sprintf("Configures both client and server to share write buffer - One of: %v", strings.Join(allToggleModes, ", ")), allToggleModes)
   121  
   122  	logger = grpclog.Component("benchmark")
   123  )
   124  
   125  const (
   126  	workloadsUnary         = "unary"
   127  	workloadsStreaming     = "streaming"
   128  	workloadsUnconstrained = "unconstrained"
   129  	workloadsAll           = "all"
   130  	// Compression modes.
   131  	compModeOff  = "off"
   132  	compModeGzip = "gzip"
   133  	compModeNop  = "nop"
   134  	compModeAll  = "all"
   135  	// Toggle modes.
   136  	toggleModeOff  = "off"
   137  	toggleModeOn   = "on"
   138  	toggleModeBoth = "both"
   139  	// Network modes.
   140  	networkModeNone  = "none"
   141  	networkModeLocal = "Local"
   142  	networkModeLAN   = "LAN"
   143  	networkModeWAN   = "WAN"
   144  	networkLongHaul  = "Longhaul"
   145  	// Shared recv buffer pool modes.
   146  	recvBufferPoolNil    = "nil"
   147  	recvBufferPoolSimple = "simple"
   148  	recvBufferPoolAll    = "all"
   149  
   150  	numStatsBuckets = 10
   151  	warmupCallCount = 10
   152  	warmuptime      = time.Second
   153  )
   154  
   155  var (
   156  	allWorkloads              = []string{workloadsUnary, workloadsStreaming, workloadsUnconstrained, workloadsAll}
   157  	allCompModes              = []string{compModeOff, compModeGzip, compModeNop, compModeAll}
   158  	allToggleModes            = []string{toggleModeOff, toggleModeOn, toggleModeBoth}
   159  	allNetworkModes           = []string{networkModeNone, networkModeLocal, networkModeLAN, networkModeWAN, networkLongHaul}
   160  	allRecvBufferPools        = []string{recvBufferPoolNil, recvBufferPoolSimple, recvBufferPoolAll}
   161  	defaultReadLatency        = []time.Duration{0, 40 * time.Millisecond} // if non-positive, no delay.
   162  	defaultReadKbps           = []int{0, 10240}                           // if non-positive, infinite
   163  	defaultReadMTU            = []int{0}                                  // if non-positive, infinite
   164  	defaultMaxConcurrentCalls = []int{1, 8, 64, 512}
   165  	defaultReqSizeBytes       = []int{1, 1024, 1024 * 1024}
   166  	defaultRespSizeBytes      = []int{1, 1024, 1024 * 1024}
   167  	networks                  = map[string]latency.Network{
   168  		networkModeLocal: latency.Local,
   169  		networkModeLAN:   latency.LAN,
   170  		networkModeWAN:   latency.WAN,
   171  		networkLongHaul:  latency.Longhaul,
   172  	}
   173  	keepaliveTime    = 10 * time.Second
   174  	keepaliveTimeout = 1 * time.Second
   175  	// This is 0.8*keepaliveTime to prevent connection issues because of server
   176  	// keepalive enforcement.
   177  	keepaliveMinTime = 8 * time.Second
   178  )
   179  
   180  // runModes indicates the workloads to run. This is initialized with a call to
   181  // `runModesFromWorkloads`, passing the workloads flag set by the user.
   182  type runModes struct {
   183  	unary, streaming, unconstrained bool
   184  }
   185  
   186  // runModesFromWorkloads determines the runModes based on the value of
   187  // the workloads flag set by the user.
   188  func runModesFromWorkloads(workload string) runModes {
   189  	r := runModes{}
   190  	switch workload {
   191  	case workloadsUnary:
   192  		r.unary = true
   193  	case workloadsStreaming:
   194  		r.streaming = true
   195  	case workloadsUnconstrained:
   196  		r.unconstrained = true
   197  	case workloadsAll:
   198  		r.unary = true
   199  		r.streaming = true
   200  		r.unconstrained = true
   201  	default:
   202  		log.Fatalf("Unknown workloads setting: %v (want one of: %v)",
   203  			workloads, strings.Join(allWorkloads, ", "))
   204  	}
   205  	return r
   206  }
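
        // For example, runModesFromWorkloads(workloadsStreaming) yields
        // runModes{streaming: true}, while runModesFromWorkloads(workloadsAll)
        // enables all three modes.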
   207  
   208  type startFunc func(mode string, bf stats.Features)
   209  type stopFunc func(count uint64)
   210  type ucStopFunc func(req uint64, resp uint64)
   211  type rpcCallFunc func(cn, pos int)
   212  type rpcSendFunc func(cn, pos int)
   213  type rpcRecvFunc func(cn, pos int)
   214  type rpcCleanupFunc func()
   215  
   216  func unaryBenchmark(start startFunc, stop stopFunc, bf stats.Features, s *stats.Stats) {
   217  	caller, cleanup := makeFuncUnary(bf)
   218  	defer cleanup()
   219  	runBenchmark(caller, start, stop, bf, s, workloadsUnary)
   220  }
   221  
   222  func streamBenchmark(start startFunc, stop stopFunc, bf stats.Features, s *stats.Stats) {
   223  	caller, cleanup := makeFuncStream(bf)
   224  	defer cleanup()
   225  	runBenchmark(caller, start, stop, bf, s, workloadsStreaming)
   226  }
   227  
   228  func unconstrainedStreamBenchmark(start startFunc, stop ucStopFunc, bf stats.Features) {
   229  	var sender rpcSendFunc
   230  	var recver rpcRecvFunc
   231  	var cleanup rpcCleanupFunc
   232  	if bf.EnablePreloader {
   233  		sender, recver, cleanup = makeFuncUnconstrainedStreamPreloaded(bf)
   234  	} else {
   235  		sender, recver, cleanup = makeFuncUnconstrainedStream(bf)
   236  	}
   237  	defer cleanup()
   238  
   239  	var req, resp uint64
   240  	go func() {
   241  		// Resets the counters once warmed up
   242  		<-time.NewTimer(warmuptime).C
   243  		atomic.StoreUint64(&req, 0)
   244  		atomic.StoreUint64(&resp, 0)
   245  		start(workloadsUnconstrained, bf)
   246  	}()
   247  
   248  	bmEnd := time.Now().Add(bf.BenchTime + warmuptime)
   249  	var wg sync.WaitGroup
   250  	wg.Add(2 * bf.Connections * bf.MaxConcurrentCalls)
   251  	maxSleep := int(bf.SleepBetweenRPCs)
   252  	for cn := 0; cn < bf.Connections; cn++ {
   253  		for pos := 0; pos < bf.MaxConcurrentCalls; pos++ {
   254  			go func(cn, pos int) {
   255  				defer wg.Done()
   256  				for {
   257  					if maxSleep > 0 {
   258  						time.Sleep(time.Duration(rand.Intn(maxSleep)))
   259  					}
   260  					t := time.Now()
   261  					if t.After(bmEnd) {
   262  						return
   263  					}
   264  					sender(cn, pos)
   265  					atomic.AddUint64(&req, 1)
   266  				}
   267  			}(cn, pos)
   268  			go func(cn, pos int) {
   269  				defer wg.Done()
   270  				for {
   271  					t := time.Now()
   272  					if t.After(bmEnd) {
   273  						return
   274  					}
   275  					recver(cn, pos)
   276  					atomic.AddUint64(&resp, 1)
   277  				}
   278  			}(cn, pos)
   279  		}
   280  	}
   281  	wg.Wait()
   282  	stop(req, resp)
   283  }
   284  
   285  // makeClients returns one gRPC client per connection for the
   286  // grpc.testing.BenchmarkService service, each configured according to the
   287  // options in the passed 'bf'. It also returns a cleanup function that closes
   288  // the clients and releases resources.
   289  func makeClients(bf stats.Features) ([]testgrpc.BenchmarkServiceClient, func()) {
   290  	nw := &latency.Network{Kbps: bf.Kbps, Latency: bf.Latency, MTU: bf.MTU}
   291  	opts := []grpc.DialOption{}
   292  	sopts := []grpc.ServerOption{}
   293  	if bf.ModeCompressor == compModeNop {
   294  		sopts = append(sopts,
   295  			grpc.RPCCompressor(nopCompressor{}),
   296  			grpc.RPCDecompressor(nopDecompressor{}),
   297  		)
   298  		opts = append(opts,
   299  			grpc.WithCompressor(nopCompressor{}),
   300  			grpc.WithDecompressor(nopDecompressor{}),
   301  		)
   302  	}
   303  	if bf.ModeCompressor == compModeGzip {
   304  		sopts = append(sopts,
   305  			grpc.RPCCompressor(grpc.NewGZIPCompressor()),
   306  			grpc.RPCDecompressor(grpc.NewGZIPDecompressor()),
   307  		)
   308  		opts = append(opts,
   309  			grpc.WithCompressor(grpc.NewGZIPCompressor()),
   310  			grpc.WithDecompressor(grpc.NewGZIPDecompressor()),
   311  		)
   312  	}
   313  	if bf.EnableKeepalive {
   314  		sopts = append(sopts,
   315  			grpc.KeepaliveParams(keepalive.ServerParameters{
   316  				Time:    keepaliveTime,
   317  				Timeout: keepaliveTimeout,
   318  			}),
   319  			grpc.KeepaliveEnforcementPolicy(keepalive.EnforcementPolicy{
   320  				MinTime:             keepaliveMinTime,
   321  				PermitWithoutStream: true,
   322  			}),
   323  		)
   324  		opts = append(opts,
   325  			grpc.WithKeepaliveParams(keepalive.ClientParameters{
   326  				Time:                keepaliveTime,
   327  				Timeout:             keepaliveTimeout,
   328  				PermitWithoutStream: true,
   329  			}),
   330  		)
   331  	}
   332  	if bf.ClientReadBufferSize >= 0 {
   333  		opts = append(opts, grpc.WithReadBufferSize(bf.ClientReadBufferSize))
   334  	}
   335  	if bf.ClientWriteBufferSize >= 0 {
   336  		opts = append(opts, grpc.WithWriteBufferSize(bf.ClientWriteBufferSize))
   337  	}
   338  	if bf.ServerReadBufferSize >= 0 {
   339  		sopts = append(sopts, grpc.ReadBufferSize(bf.ServerReadBufferSize))
   340  	}
   341  	if bf.SharedWriteBuffer {
   342  		opts = append(opts, grpc.WithSharedWriteBuffer(true))
   343  		sopts = append(sopts, grpc.SharedWriteBuffer(true))
   344  	}
   345  	if bf.ServerWriteBufferSize >= 0 {
   346  		sopts = append(sopts, grpc.WriteBufferSize(bf.ServerWriteBufferSize))
   347  	}
   348  	switch bf.RecvBufferPool {
   349  	case recvBufferPoolNil:
   350  		// Do nothing.
   351  	case recvBufferPoolSimple:
   352  		opts = append(opts, experimental.WithRecvBufferPool(grpc.NewSharedBufferPool()))
   353  		sopts = append(sopts, experimental.RecvBufferPool(grpc.NewSharedBufferPool()))
   354  	default:
   355  		logger.Fatalf("Unknown shared recv buffer pool type: %v", bf.RecvBufferPool)
   356  	}
   357  
   358  	sopts = append(sopts, grpc.MaxConcurrentStreams(uint32(bf.MaxConcurrentCalls+1)))
   359  	opts = append(opts, grpc.WithTransportCredentials(insecure.NewCredentials()))
   360  
   361  	var lis net.Listener
   362  	if bf.UseBufConn {
   363  		bcLis := bufconn.Listen(256 * 1024)
   364  		lis = bcLis
   365  		opts = append(opts, grpc.WithContextDialer(func(ctx context.Context, address string) (net.Conn, error) {
   366  			return nw.ContextDialer(func(context.Context, string, string) (net.Conn, error) {
   367  				return bcLis.Dial()
   368  			})(ctx, "", "")
   369  		}))
   370  	} else {
   371  		var err error
   372  		lis, err = net.Listen("tcp", "localhost:0")
   373  		if err != nil {
   374  			logger.Fatalf("Failed to listen: %v", err)
   375  		}
   376  		opts = append(opts, grpc.WithContextDialer(func(ctx context.Context, address string) (net.Conn, error) {
   377  			return nw.ContextDialer((internal.NetDialerWithTCPKeepalive().DialContext))(ctx, "tcp", lis.Addr().String())
   378  		}))
   379  	}
   380  	lis = nw.Listener(lis)
   381  	stopper := benchmark.StartServer(benchmark.ServerInfo{Type: "protobuf", Listener: lis}, sopts...)
   382  	conns := make([]*grpc.ClientConn, bf.Connections)
   383  	clients := make([]testgrpc.BenchmarkServiceClient, bf.Connections)
   384  	for cn := 0; cn < bf.Connections; cn++ {
   385  		conns[cn] = benchmark.NewClientConn("" /* target not used */, opts...)
   386  		clients[cn] = testgrpc.NewBenchmarkServiceClient(conns[cn])
   387  	}
   388  
   389  	return clients, func() {
   390  		for _, conn := range conns {
   391  			conn.Close()
   392  		}
   393  		stopper()
   394  	}
   395  }
   396  
   397  func makeFuncUnary(bf stats.Features) (rpcCallFunc, rpcCleanupFunc) {
   398  	clients, cleanup := makeClients(bf)
   399  	return func(cn, pos int) {
   400  		reqSizeBytes := bf.ReqSizeBytes
   401  		respSizeBytes := bf.RespSizeBytes
   402  		if bf.ReqPayloadCurve != nil {
   403  			reqSizeBytes = bf.ReqPayloadCurve.ChooseRandom()
   404  		}
   405  		if bf.RespPayloadCurve != nil {
   406  			respSizeBytes = bf.RespPayloadCurve.ChooseRandom()
   407  		}
   408  		unaryCaller(clients[cn], reqSizeBytes, respSizeBytes)
   409  	}, cleanup
   410  }
   411  
   412  func makeFuncStream(bf stats.Features) (rpcCallFunc, rpcCleanupFunc) {
   413  	streams, req, cleanup := setupStream(bf, false)
   414  
   415  	var preparedMsg [][]*grpc.PreparedMsg
   416  	if bf.EnablePreloader {
   417  		preparedMsg = prepareMessages(streams, req)
   418  	}
   419  
   420  	return func(cn, pos int) {
   421  		reqSizeBytes := bf.ReqSizeBytes
   422  		respSizeBytes := bf.RespSizeBytes
   423  		if bf.ReqPayloadCurve != nil {
   424  			reqSizeBytes = bf.ReqPayloadCurve.ChooseRandom()
   425  		}
   426  		if bf.RespPayloadCurve != nil {
   427  			respSizeBytes = bf.RespPayloadCurve.ChooseRandom()
   428  		}
   429  		var req any
   430  		if bf.EnablePreloader {
   431  			req = preparedMsg[cn][pos]
   432  		} else {
   433  			pl := benchmark.NewPayload(testpb.PayloadType_COMPRESSABLE, reqSizeBytes)
   434  			req = &testpb.SimpleRequest{
   435  				ResponseType: pl.Type,
   436  				ResponseSize: int32(respSizeBytes),
   437  				Payload:      pl,
   438  			}
   439  		}
   440  		streamCaller(streams[cn][pos], req)
   441  	}, cleanup
   442  }
   443  
   444  func makeFuncUnconstrainedStreamPreloaded(bf stats.Features) (rpcSendFunc, rpcRecvFunc, rpcCleanupFunc) {
   445  	streams, req, cleanup := setupStream(bf, true)
   446  
   447  	preparedMsg := prepareMessages(streams, req)
   448  
   449  	return func(cn, pos int) {
   450  			streams[cn][pos].SendMsg(preparedMsg[cn][pos])
   451  		}, func(cn, pos int) {
   452  			streams[cn][pos].Recv()
   453  		}, cleanup
   454  }
   455  
   456  func makeFuncUnconstrainedStream(bf stats.Features) (rpcSendFunc, rpcRecvFunc, rpcCleanupFunc) {
   457  	streams, req, cleanup := setupStream(bf, true)
   458  
   459  	return func(cn, pos int) {
   460  			streams[cn][pos].Send(req)
   461  		}, func(cn, pos int) {
   462  			streams[cn][pos].Recv()
   463  		}, cleanup
   464  }
   465  
   466  func setupStream(bf stats.Features, unconstrained bool) ([][]testgrpc.BenchmarkService_StreamingCallClient, *testpb.SimpleRequest, rpcCleanupFunc) {
   467  	clients, cleanup := makeClients(bf)
   468  
   469  	streams := make([][]testgrpc.BenchmarkService_StreamingCallClient, bf.Connections)
   470  	ctx := context.Background()
   471  	if unconstrained {
   472  		md := metadata.Pairs(benchmark.UnconstrainedStreamingHeader, "1", benchmark.UnconstrainedStreamingDelayHeader, bf.SleepBetweenRPCs.String())
   473  		ctx = metadata.NewOutgoingContext(ctx, md)
   474  	}
   475  	if bf.EnablePreloader {
   476  		md := metadata.Pairs(benchmark.PreloadMsgSizeHeader, strconv.Itoa(bf.RespSizeBytes), benchmark.UnconstrainedStreamingDelayHeader, bf.SleepBetweenRPCs.String())
   477  		ctx = metadata.NewOutgoingContext(ctx, md)
   478  	}
   479  	for cn := 0; cn < bf.Connections; cn++ {
   480  		tc := clients[cn]
   481  		streams[cn] = make([]testgrpc.BenchmarkService_StreamingCallClient, bf.MaxConcurrentCalls)
   482  		for pos := 0; pos < bf.MaxConcurrentCalls; pos++ {
   483  			stream, err := tc.StreamingCall(ctx)
   484  			if err != nil {
   485  				logger.Fatalf("%v.StreamingCall(_) = _, %v", tc, err)
   486  			}
   487  			streams[cn][pos] = stream
   488  		}
   489  	}
   490  
   491  	pl := benchmark.NewPayload(testpb.PayloadType_COMPRESSABLE, bf.ReqSizeBytes)
   492  	req := &testpb.SimpleRequest{
   493  		ResponseType: pl.Type,
   494  		ResponseSize: int32(bf.RespSizeBytes),
   495  		Payload:      pl,
   496  	}
   497  
   498  	return streams, req, cleanup
   499  }
   500  
   501  func prepareMessages(streams [][]testgrpc.BenchmarkService_StreamingCallClient, req *testpb.SimpleRequest) [][]*grpc.PreparedMsg {
   502  	preparedMsg := make([][]*grpc.PreparedMsg, len(streams))
   503  	for cn, connStreams := range streams {
   504  		preparedMsg[cn] = make([]*grpc.PreparedMsg, len(connStreams))
   505  		for pos, stream := range connStreams {
   506  			preparedMsg[cn][pos] = &grpc.PreparedMsg{}
   507  			if err := preparedMsg[cn][pos].Encode(stream, req); err != nil {
   508  				logger.Fatalf("%v.Encode(%v, %v) = %v", preparedMsg[cn][pos], req, stream, err)
   509  			}
   510  		}
   511  	}
   512  	return preparedMsg
   513  }
   514  
   515  // Makes a UnaryCall gRPC request using the given BenchmarkServiceClient and
   516  // request and response sizes.
   517  func unaryCaller(client testgrpc.BenchmarkServiceClient, reqSize, respSize int) {
   518  	if err := benchmark.DoUnaryCall(client, reqSize, respSize); err != nil {
   519  		logger.Fatalf("DoUnaryCall failed: %v", err)
   520  	}
   521  }
   522  
   523  func streamCaller(stream testgrpc.BenchmarkService_StreamingCallClient, req any) {
   524  	if err := benchmark.DoStreamingRoundTripPreloaded(stream, req); err != nil {
   525  		logger.Fatalf("DoStreamingRoundTrip failed: %v", err)
   526  	}
   527  }
   528  
   529  func runBenchmark(caller rpcCallFunc, start startFunc, stop stopFunc, bf stats.Features, s *stats.Stats, mode string) {
   530  	// If SleepBetweenRPCs > 0, skip the warmup: it would send a burst of
   531  	// simultaneous requests on every connection, which is exactly what
   532  	// SleepBetweenRPCs is meant to avoid.
   533  	if bf.SleepBetweenRPCs == 0 {
   534  		// Warm up connections.
   535  		for i := 0; i < warmupCallCount; i++ {
   536  			for cn := 0; cn < bf.Connections; cn++ {
   537  				caller(cn, 0)
   538  			}
   539  		}
   540  	}
   541  
   542  	// Run benchmark.
   543  	start(mode, bf)
   544  	var wg sync.WaitGroup
   545  	wg.Add(bf.Connections * bf.MaxConcurrentCalls)
   546  	bmEnd := time.Now().Add(bf.BenchTime)
   547  	maxSleep := int(bf.SleepBetweenRPCs)
   548  	var count uint64
   549  	for cn := 0; cn < bf.Connections; cn++ {
   550  		for pos := 0; pos < bf.MaxConcurrentCalls; pos++ {
   551  			go func(cn, pos int) {
   552  				defer wg.Done()
   553  				for {
   554  					if maxSleep > 0 {
   555  						time.Sleep(time.Duration(rand.Intn(maxSleep)))
   556  					}
   557  					t := time.Now()
   558  					if t.After(bmEnd) {
   559  						return
   560  					}
   561  					start := time.Now()
   562  					caller(cn, pos)
   563  					elapse := time.Since(start)
   564  					atomic.AddUint64(&count, 1)
   565  					s.AddDuration(elapse)
   566  				}
   567  			}(cn, pos)
   568  		}
   569  	}
   570  	wg.Wait()
   571  	stop(count)
   572  }
   573  
   574  // benchOpts represents all configurable options available while running this
   575  // benchmark. This is built from the values passed as flags.
   576  type benchOpts struct {
   577  	rModes              runModes
   578  	benchTime           time.Duration
   579  	memProfileRate      int
   580  	memProfile          string
   581  	cpuProfile          string
   582  	networkMode         string
   583  	benchmarkResultFile string
   584  	useBufconn          bool
   585  	enableKeepalive     bool
   586  	connections         int
   587  	features            *featureOpts
   588  }
   589  
   590  // featureOpts represents options which can have multiple values. The user
   591  // usually provides a comma-separated list of options for each of these
   592  // features through command line flags. We generate all possible combinations
   593  // for the provided values and run the benchmarks for each combination.
   594  type featureOpts struct {
   595  	enableTrace           []bool
   596  	readLatencies         []time.Duration
   597  	readKbps              []int
   598  	readMTU               []int
   599  	maxConcurrentCalls    []int
   600  	reqSizeBytes          []int
   601  	respSizeBytes         []int
   602  	reqPayloadCurves      []*stats.PayloadCurve
   603  	respPayloadCurves     []*stats.PayloadCurve
   604  	compModes             []string
   605  	enableChannelz        []bool
   606  	enablePreloader       []bool
   607  	clientReadBufferSize  []int
   608  	clientWriteBufferSize []int
   609  	serverReadBufferSize  []int
   610  	serverWriteBufferSize []int
   611  	sleepBetweenRPCs      []time.Duration
   612  	recvBufferPools       []string
   613  	sharedWriteBuffer     []bool
   614  }
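
        // For example, with -maxConcurrentCalls=1,8 and -reqSizeBytes=1,1024 (and every
        // other feature taking a single value), the benchmark runs 2 x 2 = 4 feature
        // combinations for each selected workload.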
   615  
   616  // makeFeaturesNum returns a slice of ints of size 'maxFeatureIndex' where each
   617  // element of the slice (indexed by 'featuresIndex' enum) contains the number
   618  // of features to be exercised by the benchmark code.
   619  // For example: Index 0 of the returned slice contains the number of values for
   620  // the enableTrace feature, while index 1 contains the number of values of
   621  // the readLatencies feature, and so on.
   622  func makeFeaturesNum(b *benchOpts) []int {
   623  	featuresNum := make([]int, stats.MaxFeatureIndex)
   624  	for i := 0; i < len(featuresNum); i++ {
   625  		switch stats.FeatureIndex(i) {
   626  		case stats.EnableTraceIndex:
   627  			featuresNum[i] = len(b.features.enableTrace)
   628  		case stats.ReadLatenciesIndex:
   629  			featuresNum[i] = len(b.features.readLatencies)
   630  		case stats.ReadKbpsIndex:
   631  			featuresNum[i] = len(b.features.readKbps)
   632  		case stats.ReadMTUIndex:
   633  			featuresNum[i] = len(b.features.readMTU)
   634  		case stats.MaxConcurrentCallsIndex:
   635  			featuresNum[i] = len(b.features.maxConcurrentCalls)
   636  		case stats.ReqSizeBytesIndex:
   637  			featuresNum[i] = len(b.features.reqSizeBytes)
   638  		case stats.RespSizeBytesIndex:
   639  			featuresNum[i] = len(b.features.respSizeBytes)
   640  		case stats.ReqPayloadCurveIndex:
   641  			featuresNum[i] = len(b.features.reqPayloadCurves)
   642  		case stats.RespPayloadCurveIndex:
   643  			featuresNum[i] = len(b.features.respPayloadCurves)
   644  		case stats.CompModesIndex:
   645  			featuresNum[i] = len(b.features.compModes)
   646  		case stats.EnableChannelzIndex:
   647  			featuresNum[i] = len(b.features.enableChannelz)
   648  		case stats.EnablePreloaderIndex:
   649  			featuresNum[i] = len(b.features.enablePreloader)
   650  		case stats.ClientReadBufferSize:
   651  			featuresNum[i] = len(b.features.clientReadBufferSize)
   652  		case stats.ClientWriteBufferSize:
   653  			featuresNum[i] = len(b.features.clientWriteBufferSize)
   654  		case stats.ServerReadBufferSize:
   655  			featuresNum[i] = len(b.features.serverReadBufferSize)
   656  		case stats.ServerWriteBufferSize:
   657  			featuresNum[i] = len(b.features.serverWriteBufferSize)
   658  		case stats.SleepBetweenRPCs:
   659  			featuresNum[i] = len(b.features.sleepBetweenRPCs)
   660  		case stats.RecvBufferPool:
   661  			featuresNum[i] = len(b.features.recvBufferPools)
   662  		case stats.SharedWriteBuffer:
   663  			featuresNum[i] = len(b.features.sharedWriteBuffer)
   664  		default:
   665  			log.Fatalf("Unknown feature index %v in generateFeatures. maxFeatureIndex is %v", i, stats.MaxFeatureIndex)
   666  		}
   667  	}
   668  	return featuresNum
   669  }
   670  
   671  // sharedFeatures returns a bool slice which acts as a bitmask. Each item in
   672  // the slice represents a feature, indexed by 'featureIndex' enum.  The bit is
   673  // set to 1 if the corresponding feature does not have multiple values and is therefore
   674  // shared amongst all benchmarks.
   675  func sharedFeatures(featuresNum []int) []bool {
   676  	result := make([]bool, len(featuresNum))
   677  	for i, num := range featuresNum {
   678  		if num <= 1 {
   679  			result[i] = true
   680  		}
   681  	}
   682  	return result
   683  }
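
        // For example, sharedFeatures([]int{1, 3, 1}) returns [true false true]:
        // only the feature with three allowed values varies across benchmark runs.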
   684  
   685  // generateFeatures generates all combinations of the provided feature options.
   686  // While all the feature options are stored in the benchOpts struct, the input
   687  // parameter 'featuresNum' is a slice indexed by 'featureIndex' enum containing
   688  // the number of values for each feature.
   689  // For example, let's say the user sets -workloads=all and
   690  // -maxConcurrentCalls=1,100, this would end up with the following
   691  // combinations:
   692  // [workloads: unary, maxConcurrentCalls=1]
   693  // [workloads: unary, maxConcurrentCalls=100]
   694  // [workloads: streaming, maxConcurrentCalls=1]
   695  // [workloads: streaming, maxConcurrentCalls=100]
   696  // [workloads: unconstrained, maxConcurrentCalls=1]
   697  // [workloads: unconstrained, maxConcurrentCalls=100]
   698  func (b *benchOpts) generateFeatures(featuresNum []int) []stats.Features {
   699  	// curPos and initialPos are two slices where each value acts as an index
   700  	// into the appropriate feature slice maintained in benchOpts.features. This
   701  	// loop generates all possible combinations of features by changing one value
   702  	// at a time, and once curPos becomes equal to initialPos, we have explored
   703  	// all options.
   704  	var result []stats.Features
   705  	var curPos []int
   706  	initialPos := make([]int, stats.MaxFeatureIndex)
   707  	for !reflect.DeepEqual(initialPos, curPos) {
   708  		if curPos == nil {
   709  			curPos = make([]int, stats.MaxFeatureIndex)
   710  		}
   711  		f := stats.Features{
   712  			// These features stay the same for each iteration.
   713  			NetworkMode:     b.networkMode,
   714  			UseBufConn:      b.useBufconn,
   715  			EnableKeepalive: b.enableKeepalive,
   716  			BenchTime:       b.benchTime,
   717  			Connections:     b.connections,
   718  			// These features can potentially change for each iteration.
   719  			EnableTrace:           b.features.enableTrace[curPos[stats.EnableTraceIndex]],
   720  			Latency:               b.features.readLatencies[curPos[stats.ReadLatenciesIndex]],
   721  			Kbps:                  b.features.readKbps[curPos[stats.ReadKbpsIndex]],
   722  			MTU:                   b.features.readMTU[curPos[stats.ReadMTUIndex]],
   723  			MaxConcurrentCalls:    b.features.maxConcurrentCalls[curPos[stats.MaxConcurrentCallsIndex]],
   724  			ModeCompressor:        b.features.compModes[curPos[stats.CompModesIndex]],
   725  			EnableChannelz:        b.features.enableChannelz[curPos[stats.EnableChannelzIndex]],
   726  			EnablePreloader:       b.features.enablePreloader[curPos[stats.EnablePreloaderIndex]],
   727  			ClientReadBufferSize:  b.features.clientReadBufferSize[curPos[stats.ClientReadBufferSize]],
   728  			ClientWriteBufferSize: b.features.clientWriteBufferSize[curPos[stats.ClientWriteBufferSize]],
   729  			ServerReadBufferSize:  b.features.serverReadBufferSize[curPos[stats.ServerReadBufferSize]],
   730  			ServerWriteBufferSize: b.features.serverWriteBufferSize[curPos[stats.ServerWriteBufferSize]],
   731  			SleepBetweenRPCs:      b.features.sleepBetweenRPCs[curPos[stats.SleepBetweenRPCs]],
   732  			RecvBufferPool:        b.features.recvBufferPools[curPos[stats.RecvBufferPool]],
   733  			SharedWriteBuffer:     b.features.sharedWriteBuffer[curPos[stats.SharedWriteBuffer]],
   734  		}
   735  		if len(b.features.reqPayloadCurves) == 0 {
   736  			f.ReqSizeBytes = b.features.reqSizeBytes[curPos[stats.ReqSizeBytesIndex]]
   737  		} else {
   738  			f.ReqPayloadCurve = b.features.reqPayloadCurves[curPos[stats.ReqPayloadCurveIndex]]
   739  		}
   740  		if len(b.features.respPayloadCurves) == 0 {
   741  			f.RespSizeBytes = b.features.respSizeBytes[curPos[stats.RespSizeBytesIndex]]
   742  		} else {
   743  			f.RespPayloadCurve = b.features.respPayloadCurves[curPos[stats.RespPayloadCurveIndex]]
   744  		}
   745  		result = append(result, f)
   746  		addOne(curPos, featuresNum)
   747  	}
   748  	return result
   749  }
   750  
   751  // addOne mutates the input slice 'features' by changing one feature, thus
   752  // arriving at the next combination of feature values. 'featuresMaxPosition'
   753  // provides the numbers of allowed values for each feature, indexed by
   754  // 'featureIndex' enum.
   755  func addOne(features []int, featuresMaxPosition []int) {
   756  	for i := len(features) - 1; i >= 0; i-- {
   757  		if featuresMaxPosition[i] == 0 {
   758  			continue
   759  		}
   760  		features[i] = (features[i] + 1)
   761  		if features[i]/featuresMaxPosition[i] == 0 {
   762  			break
   763  		}
   764  		features[i] = features[i] % featuresMaxPosition[i]
   765  	}
   766  }
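
        // A minimal sketch of the enumeration above: with two features allowing 2 and 3
        // values respectively, addOne advances the position slice like an odometer,
        // visiting every combination exactly once before wrapping back to all zeros:
        //
        //	pos := make([]int, 2)
        //	max := []int{2, 3}
        //	for i := 0; i < 6; i++ {
        //		fmt.Println(pos) // [0 0] [0 1] [0 2] [1 0] [1 1] [1 2]
        //		addOne(pos, max)
        //	}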
   767  
   768  // processFlags reads the command line flags and builds benchOpts. Specifying
   769  // invalid values for certain flags will cause flag.Parse() to fail, and the
   770  // program to terminate.
   771  // This *SHOULD* be the only place where the flags are accessed. All other
   772  // parts of the benchmark code should rely on the returned benchOpts.
   773  func processFlags() *benchOpts {
   774  	flag.Parse()
   775  	if flag.NArg() != 0 {
   776  		log.Fatal("Error: unparsed arguments: ", flag.Args())
   777  	}
   778  
   779  	opts := &benchOpts{
   780  		rModes:              runModesFromWorkloads(*workloads),
   781  		benchTime:           *benchTime,
   782  		memProfileRate:      *memProfileRate,
   783  		memProfile:          *memProfile,
   784  		cpuProfile:          *cpuProfile,
   785  		networkMode:         *networkMode,
   786  		benchmarkResultFile: *benchmarkResultFile,
   787  		useBufconn:          *useBufconn,
   788  		enableKeepalive:     *enableKeepalive,
   789  		connections:         *connections,
   790  		features: &featureOpts{
   791  			enableTrace:           setToggleMode(*traceMode),
   792  			readLatencies:         append([]time.Duration(nil), *readLatency...),
   793  			readKbps:              append([]int(nil), *readKbps...),
   794  			readMTU:               append([]int(nil), *readMTU...),
   795  			maxConcurrentCalls:    append([]int(nil), *maxConcurrentCalls...),
   796  			reqSizeBytes:          append([]int(nil), *readReqSizeBytes...),
   797  			respSizeBytes:         append([]int(nil), *readRespSizeBytes...),
   798  			compModes:             setCompressorMode(*compressorMode),
   799  			enableChannelz:        setToggleMode(*channelzOn),
   800  			enablePreloader:       setToggleMode(*preloaderMode),
   801  			clientReadBufferSize:  append([]int(nil), *clientReadBufferSize...),
   802  			clientWriteBufferSize: append([]int(nil), *clientWriteBufferSize...),
   803  			serverReadBufferSize:  append([]int(nil), *serverReadBufferSize...),
   804  			serverWriteBufferSize: append([]int(nil), *serverWriteBufferSize...),
   805  			sleepBetweenRPCs:      append([]time.Duration(nil), *sleepBetweenRPCs...),
   806  			recvBufferPools:       setRecvBufferPool(*recvBufferPool),
   807  			sharedWriteBuffer:     setToggleMode(*sharedWriteBuffer),
   808  		},
   809  	}
   810  
   811  	if len(*reqPayloadCurveFiles) == 0 {
   812  		if len(opts.features.reqSizeBytes) == 0 {
   813  			opts.features.reqSizeBytes = defaultReqSizeBytes
   814  		}
   815  	} else {
   816  		if len(opts.features.reqSizeBytes) != 0 {
   817  			log.Fatalf("you may not specify -reqPayloadCurveFiles and -reqSizeBytes at the same time")
   818  		}
   819  		if len(opts.features.enablePreloader) != 0 {
   820  			log.Fatalf("you may not specify -reqPayloadCurveFiles and -preloader at the same time")
   821  		}
   822  		for _, file := range *reqPayloadCurveFiles {
   823  			pc, err := stats.NewPayloadCurve(file)
   824  			if err != nil {
   825  				log.Fatalf("cannot load payload curve file %s: %v", file, err)
   826  			}
   827  			opts.features.reqPayloadCurves = append(opts.features.reqPayloadCurves, pc)
   828  		}
   829  		opts.features.reqSizeBytes = nil
   830  	}
   831  	if len(*respPayloadCurveFiles) == 0 {
   832  		if len(opts.features.respSizeBytes) == 0 {
   833  			opts.features.respSizeBytes = defaultRespSizeBytes
   834  		}
   835  	} else {
   836  		if len(opts.features.respSizeBytes) != 0 {
   837  			log.Fatalf("you may not specify -respPayloadCurveFiles and -respSizeBytes at the same time")
   838  		}
   839  		if len(opts.features.enablePreloader) != 0 {
   840  			log.Fatalf("you may not specify -respPayloadCurveFiles and -preloader at the same time")
   841  		}
   842  		for _, file := range *respPayloadCurveFiles {
   843  			pc, err := stats.NewPayloadCurve(file)
   844  			if err != nil {
   845  				log.Fatalf("cannot load payload curve file %s: %v", file, err)
   846  			}
   847  			opts.features.respPayloadCurves = append(opts.features.respPayloadCurves, pc)
   848  		}
   849  		opts.features.respSizeBytes = nil
   850  	}
   851  
   852  	// Re-write latency, kbps and mtu if network mode is set.
   853  	if network, ok := networks[opts.networkMode]; ok {
   854  		opts.features.readLatencies = []time.Duration{network.Latency}
   855  		opts.features.readKbps = []int{network.Kbps}
   856  		opts.features.readMTU = []int{network.MTU}
   857  	}
   858  	return opts
   859  }
   860  
   861  func setToggleMode(val string) []bool {
   862  	switch val {
   863  	case toggleModeOn:
   864  		return []bool{true}
   865  	case toggleModeOff:
   866  		return []bool{false}
   867  	case toggleModeBoth:
   868  		return []bool{false, true}
   869  	default:
   870  		// This should never happen because a wrong value passed to this flag would
   871  		// be caught during flag.Parse().
   872  		return []bool{}
   873  	}
   874  }
   875  
   876  func setCompressorMode(val string) []string {
   877  	switch val {
   878  	case compModeNop, compModeGzip, compModeOff:
   879  		return []string{val}
   880  	case compModeAll:
   881  		return []string{compModeNop, compModeGzip, compModeOff}
   882  	default:
   883  		// This should never happen because a wrong value passed to this flag would
   884  		// be caught during flag.Parse().
   885  		return []string{}
   886  	}
   887  }
   888  
   889  func setRecvBufferPool(val string) []string {
   890  	switch val {
   891  	case recvBufferPoolNil, recvBufferPoolSimple:
   892  		return []string{val}
   893  	case recvBufferPoolAll:
   894  		return []string{recvBufferPoolNil, recvBufferPoolSimple}
   895  	default:
   896  		// This should never happen because a wrong value passed to this flag would
   897  		// be caught during flag.Parse().
   898  		return []string{}
   899  	}
   900  }
   901  
   902  func main() {
   903  	opts := processFlags()
   904  	before(opts)
   905  
   906  	s := stats.NewStats(numStatsBuckets)
   907  	featuresNum := makeFeaturesNum(opts)
   908  	sf := sharedFeatures(featuresNum)
   909  
   910  	var (
   911  		start  = func(mode string, bf stats.Features) { s.StartRun(mode, bf, sf) }
   912  		stop   = func(count uint64) { s.EndRun(count) }
   913  		ucStop = func(req uint64, resp uint64) { s.EndUnconstrainedRun(req, resp) }
   914  	)
   915  
   916  	for _, bf := range opts.generateFeatures(featuresNum) {
   917  		grpc.EnableTracing = bf.EnableTrace
   918  		if bf.EnableChannelz {
   919  			channelz.TurnOn()
   920  		}
   921  		if opts.rModes.unary {
   922  			unaryBenchmark(start, stop, bf, s)
   923  		}
   924  		if opts.rModes.streaming {
   925  			streamBenchmark(start, stop, bf, s)
   926  		}
   927  		if opts.rModes.unconstrained {
   928  			unconstrainedStreamBenchmark(start, ucStop, bf)
   929  		}
   930  	}
   931  	after(opts, s.GetResults())
   932  }
   933  
   934  func before(opts *benchOpts) {
   935  	if opts.memProfile != "" {
   936  		runtime.MemProfileRate = opts.memProfileRate
   937  	}
   938  	if opts.cpuProfile != "" {
   939  		f, err := os.Create(opts.cpuProfile)
   940  		if err != nil {
   941  			fmt.Fprintf(os.Stderr, "testing: %s\n", err)
   942  			return
   943  		}
   944  		if err := pprof.StartCPUProfile(f); err != nil {
   945  			fmt.Fprintf(os.Stderr, "testing: can't start cpu profile: %s\n", err)
   946  			f.Close()
   947  			return
   948  		}
   949  	}
   950  }
   951  
   952  func after(opts *benchOpts, data []stats.BenchResults) {
   953  	if opts.cpuProfile != "" {
   954  		pprof.StopCPUProfile() // flushes profile to disk
   955  	}
   956  	if opts.memProfile != "" {
   957  		f, err := os.Create(opts.memProfile)
   958  		if err != nil {
   959  			fmt.Fprintf(os.Stderr, "testing: %s\n", err)
   960  			os.Exit(2)
   961  		}
   962  		runtime.GC() // materialize all statistics
   963  		if err = pprof.WriteHeapProfile(f); err != nil {
   964  			fmt.Fprintf(os.Stderr, "testing: can't write heap profile %s: %s\n", opts.memProfile, err)
   965  			os.Exit(2)
   966  		}
   967  		f.Close()
   968  	}
   969  	if opts.benchmarkResultFile != "" {
   970  		f, err := os.Create(opts.benchmarkResultFile)
   971  		if err != nil {
   972  			log.Fatalf("testing: can't write benchmark result %s: %s\n", opts.benchmarkResultFile, err)
   973  		}
   974  		dataEncoder := gob.NewEncoder(f)
   975  		dataEncoder.Encode(data)
   976  		f.Close()
   977  	}
   978  }
   979  
   980  // nopCompressor is a compressor that just copies data.
   981  type nopCompressor struct{}
   982  
   983  func (nopCompressor) Do(w io.Writer, p []byte) error {
   984  	n, err := w.Write(p)
   985  	if err != nil {
   986  		return err
   987  	}
   988  	if n != len(p) {
   989  		return fmt.Errorf("nopCompressor.Write: wrote %d bytes; want %d", n, len(p))
   990  	}
   991  	return nil
   992  }
   993  
   994  func (nopCompressor) Type() string { return compModeNop }
   995  
   996  // nopDecompressor is a decompressor that just copies data.
   997  type nopDecompressor struct{}
   998  
   999  func (nopDecompressor) Do(r io.Reader) ([]byte, error) { return io.ReadAll(r) }
  1000  func (nopDecompressor) Type() string                   { return compModeNop }
  1001  
