...

Source file src/google.golang.org/grpc/interop/xds/client/client.go

Documentation: google.golang.org/grpc/interop/xds/client

     1  /*
     2   *
     3   * Copyright 2020 gRPC authors.
     4   *
     5   * Licensed under the Apache License, Version 2.0 (the "License");
     6   * you may not use this file except in compliance with the License.
     7   * You may obtain a copy of the License at
     8   *
     9   *     http://www.apache.org/licenses/LICENSE-2.0
    10   *
    11   * Unless required by applicable law or agreed to in writing, software
    12   * distributed under the License is distributed on an "AS IS" BASIS,
    13   * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    14   * See the License for the specific language governing permissions and
    15   * limitations under the License.
    16   *
    17   */
    18  
    19  // Binary client for xDS interop tests.
    20  package main
    21  
    22  import (
    23  	"context"
    24  	"flag"
    25  	"fmt"
    26  	"log"
    27  	"net"
    28  	"strings"
    29  	"sync"
    30  	"sync/atomic"
    31  	"time"
    32  
    33  	"google.golang.org/grpc"
    34  	"google.golang.org/grpc/admin"
    35  	"google.golang.org/grpc/credentials/insecure"
    36  	"google.golang.org/grpc/credentials/xds"
    37  	"google.golang.org/grpc/grpclog"
    38  	"google.golang.org/grpc/metadata"
    39  	"google.golang.org/grpc/peer"
    40  	"google.golang.org/grpc/reflection"
    41  	"google.golang.org/grpc/status"
    42  	_ "google.golang.org/grpc/xds"
    43  
    44  	testgrpc "google.golang.org/grpc/interop/grpc_testing"
    45  	testpb "google.golang.org/grpc/interop/grpc_testing"
    46  	_ "google.golang.org/grpc/interop/xds" // to register Custom LB.
    47  )
    48  
    49  func init() {
    50  	rpcCfgs.Store([]*rpcConfig{{typ: unaryCall}})
    51  }
    52  
    53  type statsWatcherKey struct {
    54  	startID int32
    55  	endID   int32
    56  }
    57  
    58  // rpcInfo contains the rpc type and the hostname where the response is received
    59  // from.
    60  type rpcInfo struct {
    61  	typ      string
    62  	hostname string
    63  }
    64  
    65  type statsWatcher struct {
    66  	rpcsByPeer    map[string]int32
    67  	rpcsByType    map[string]map[string]int32
    68  	numFailures   int32
    69  	remainingRPCs int32
    70  	chanHosts     chan *rpcInfo
    71  }
    72  
    73  func (watcher *statsWatcher) buildResp() *testpb.LoadBalancerStatsResponse {
    74  	rpcsByType := make(map[string]*testpb.LoadBalancerStatsResponse_RpcsByPeer, len(watcher.rpcsByType))
    75  	for t, rpcsByPeer := range watcher.rpcsByType {
    76  		rpcsByType[t] = &testpb.LoadBalancerStatsResponse_RpcsByPeer{
    77  			RpcsByPeer: rpcsByPeer,
    78  		}
    79  	}
    80  
    81  	return &testpb.LoadBalancerStatsResponse{
    82  		NumFailures:  watcher.numFailures + watcher.remainingRPCs,
    83  		RpcsByPeer:   watcher.rpcsByPeer,
    84  		RpcsByMethod: rpcsByType,
    85  	}
    86  }
    87  
    88  type accumulatedStats struct {
    89  	mu                       sync.Mutex
    90  	numRPCsStartedByMethod   map[string]int32
    91  	numRPCsSucceededByMethod map[string]int32
    92  	numRPCsFailedByMethod    map[string]int32
    93  	rpcStatusByMethod        map[string]map[int32]int32
    94  }
    95  
    96  func convertRPCName(in string) string {
    97  	switch in {
    98  	case unaryCall:
    99  		return testpb.ClientConfigureRequest_UNARY_CALL.String()
   100  	case emptyCall:
   101  		return testpb.ClientConfigureRequest_EMPTY_CALL.String()
   102  	}
   103  	logger.Warningf("unrecognized rpc type: %s", in)
   104  	return in
   105  }
   106  
   107  // copyStatsMap makes a copy of the map.
   108  func copyStatsMap(originalMap map[string]int32) map[string]int32 {
   109  	newMap := make(map[string]int32, len(originalMap))
   110  	for k, v := range originalMap {
   111  		newMap[k] = v
   112  	}
   113  	return newMap
   114  }
   115  
   116  // copyStatsIntMap makes a copy of the map.
   117  func copyStatsIntMap(originalMap map[int32]int32) map[int32]int32 {
   118  	newMap := make(map[int32]int32, len(originalMap))
   119  	for k, v := range originalMap {
   120  		newMap[k] = v
   121  	}
   122  	return newMap
   123  }
   124  
   125  func (as *accumulatedStats) makeStatsMap() map[string]*testpb.LoadBalancerAccumulatedStatsResponse_MethodStats {
   126  	m := make(map[string]*testpb.LoadBalancerAccumulatedStatsResponse_MethodStats)
   127  	for k, v := range as.numRPCsStartedByMethod {
   128  		m[k] = &testpb.LoadBalancerAccumulatedStatsResponse_MethodStats{RpcsStarted: v}
   129  	}
   130  	for k, v := range as.rpcStatusByMethod {
   131  		if m[k] == nil {
   132  			m[k] = &testpb.LoadBalancerAccumulatedStatsResponse_MethodStats{}
   133  		}
   134  		m[k].Result = copyStatsIntMap(v)
   135  	}
   136  	return m
   137  }
   138  
   139  func (as *accumulatedStats) buildResp() *testpb.LoadBalancerAccumulatedStatsResponse {
   140  	as.mu.Lock()
   141  	defer as.mu.Unlock()
   142  	return &testpb.LoadBalancerAccumulatedStatsResponse{
   143  		NumRpcsStartedByMethod:   copyStatsMap(as.numRPCsStartedByMethod),
   144  		NumRpcsSucceededByMethod: copyStatsMap(as.numRPCsSucceededByMethod),
   145  		NumRpcsFailedByMethod:    copyStatsMap(as.numRPCsFailedByMethod),
   146  		StatsPerMethod:           as.makeStatsMap(),
   147  	}
   148  }
   149  
   150  func (as *accumulatedStats) startRPC(rpcType string) {
   151  	as.mu.Lock()
   152  	defer as.mu.Unlock()
   153  	as.numRPCsStartedByMethod[convertRPCName(rpcType)]++
   154  }
   155  
   156  func (as *accumulatedStats) finishRPC(rpcType string, err error) {
   157  	as.mu.Lock()
   158  	defer as.mu.Unlock()
   159  	name := convertRPCName(rpcType)
   160  	if as.rpcStatusByMethod[name] == nil {
   161  		as.rpcStatusByMethod[name] = make(map[int32]int32)
   162  	}
   163  	as.rpcStatusByMethod[name][int32(status.Convert(err).Code())]++
   164  	if err != nil {
   165  		as.numRPCsFailedByMethod[name]++
   166  		return
   167  	}
   168  	as.numRPCsSucceededByMethod[name]++
   169  }
   170  
   171  var (
   172  	failOnFailedRPC = flag.Bool("fail_on_failed_rpc", false, "Fail client if any RPCs fail after first success")
   173  	numChannels     = flag.Int("num_channels", 1, "Num of channels")
   174  	printResponse   = flag.Bool("print_response", false, "Write RPC response to stdout")
   175  	qps             = flag.Int("qps", 1, "QPS per channel, for each type of RPC")
   176  	rpc             = flag.String("rpc", "UnaryCall", "Types of RPCs to make, ',' separated string. RPCs can be EmptyCall or UnaryCall. Deprecated: Use Configure RPC to XdsUpdateClientConfigureServiceServer instead.")
   177  	rpcMetadata     = flag.String("metadata", "", "The metadata to send with RPC, in format EmptyCall:key1:value1,UnaryCall:key2:value2. Deprecated: Use Configure RPC to XdsUpdateClientConfigureServiceServer instead.")
   178  	rpcTimeout      = flag.Duration("rpc_timeout", 20*time.Second, "Per RPC timeout")
   179  	server          = flag.String("server", "localhost:8080", "Address of server to connect to")
   180  	statsPort       = flag.Int("stats_port", 8081, "Port to expose peer distribution stats service")
   181  	secureMode      = flag.Bool("secure_mode", false, "If true, retrieve security configuration from the management server. Else, use insecure credentials.")
   182  
   183  	rpcCfgs atomic.Value
   184  
   185  	mu               sync.Mutex
   186  	currentRequestID int32
   187  	watchers         = make(map[statsWatcherKey]*statsWatcher)
   188  
   189  	accStats = accumulatedStats{
   190  		numRPCsStartedByMethod:   make(map[string]int32),
   191  		numRPCsSucceededByMethod: make(map[string]int32),
   192  		numRPCsFailedByMethod:    make(map[string]int32),
   193  		rpcStatusByMethod:        make(map[string]map[int32]int32),
   194  	}
   195  
   196  	// 0 or 1 representing an RPC has succeeded. Use hasRPCSucceeded and
   197  	// setRPCSucceeded to access in a safe manner.
   198  	rpcSucceeded uint32
   199  
   200  	logger = grpclog.Component("interop")
   201  )
   202  
   203  type statsService struct {
   204  	testgrpc.UnimplementedLoadBalancerStatsServiceServer
   205  }
   206  
   207  func hasRPCSucceeded() bool {
   208  	return atomic.LoadUint32(&rpcSucceeded) > 0
   209  }
   210  
   211  func setRPCSucceeded() {
   212  	atomic.StoreUint32(&rpcSucceeded, 1)
   213  }
   214  
   215  // Wait for the next LoadBalancerStatsRequest.GetNumRpcs to start and complete,
   216  // and return the distribution of remote peers. This is essentially a clientside
   217  // LB reporting mechanism that is designed to be queried by an external test
   218  // driver when verifying that the client is distributing RPCs as expected.
   219  func (s *statsService) GetClientStats(ctx context.Context, in *testpb.LoadBalancerStatsRequest) (*testpb.LoadBalancerStatsResponse, error) {
   220  	mu.Lock()
   221  	watcherKey := statsWatcherKey{currentRequestID, currentRequestID + in.GetNumRpcs()}
   222  	watcher, ok := watchers[watcherKey]
   223  	if !ok {
   224  		watcher = &statsWatcher{
   225  			rpcsByPeer:    make(map[string]int32),
   226  			rpcsByType:    make(map[string]map[string]int32),
   227  			numFailures:   0,
   228  			remainingRPCs: in.GetNumRpcs(),
   229  			chanHosts:     make(chan *rpcInfo),
   230  		}
   231  		watchers[watcherKey] = watcher
   232  	}
   233  	mu.Unlock()
   234  
   235  	ctx, cancel := context.WithTimeout(ctx, time.Duration(in.GetTimeoutSec())*time.Second)
   236  	defer cancel()
   237  
   238  	defer func() {
   239  		mu.Lock()
   240  		delete(watchers, watcherKey)
   241  		mu.Unlock()
   242  	}()
   243  
   244  	// Wait until the requested RPCs have all been recorded or timeout occurs.
   245  	for {
   246  		select {
   247  		case info := <-watcher.chanHosts:
   248  			if info != nil {
   249  				watcher.rpcsByPeer[info.hostname]++
   250  
   251  				rpcsByPeerForType := watcher.rpcsByType[info.typ]
   252  				if rpcsByPeerForType == nil {
   253  					rpcsByPeerForType = make(map[string]int32)
   254  					watcher.rpcsByType[info.typ] = rpcsByPeerForType
   255  				}
   256  				rpcsByPeerForType[info.hostname]++
   257  			} else {
   258  				watcher.numFailures++
   259  			}
   260  			watcher.remainingRPCs--
   261  			if watcher.remainingRPCs == 0 {
   262  				return watcher.buildResp(), nil
   263  			}
   264  		case <-ctx.Done():
   265  			logger.Info("Timed out, returning partial stats")
   266  			return watcher.buildResp(), nil
   267  		}
   268  	}
   269  }
   270  
   271  func (s *statsService) GetClientAccumulatedStats(ctx context.Context, in *testpb.LoadBalancerAccumulatedStatsRequest) (*testpb.LoadBalancerAccumulatedStatsResponse, error) {
   272  	return accStats.buildResp(), nil
   273  }
   274  
   275  type configureService struct {
   276  	testgrpc.UnimplementedXdsUpdateClientConfigureServiceServer
   277  }
   278  
   279  func (s *configureService) Configure(ctx context.Context, in *testpb.ClientConfigureRequest) (*testpb.ClientConfigureResponse, error) {
   280  	rpcsToMD := make(map[testpb.ClientConfigureRequest_RpcType][]string)
   281  	for _, typ := range in.GetTypes() {
   282  		rpcsToMD[typ] = nil
   283  	}
   284  	for _, md := range in.GetMetadata() {
   285  		typ := md.GetType()
   286  		strs, ok := rpcsToMD[typ]
   287  		if !ok {
   288  			continue
   289  		}
   290  		rpcsToMD[typ] = append(strs, md.GetKey(), md.GetValue())
   291  	}
   292  	cfgs := make([]*rpcConfig, 0, len(rpcsToMD))
   293  	for typ, md := range rpcsToMD {
   294  		var rpcType string
   295  		switch typ {
   296  		case testpb.ClientConfigureRequest_UNARY_CALL:
   297  			rpcType = unaryCall
   298  		case testpb.ClientConfigureRequest_EMPTY_CALL:
   299  			rpcType = emptyCall
   300  		default:
   301  			return nil, fmt.Errorf("unsupported RPC type: %v", typ)
   302  		}
   303  		cfgs = append(cfgs, &rpcConfig{
   304  			typ:     rpcType,
   305  			md:      metadata.Pairs(md...),
   306  			timeout: in.GetTimeoutSec(),
   307  		})
   308  	}
   309  	rpcCfgs.Store(cfgs)
   310  	return &testpb.ClientConfigureResponse{}, nil
   311  }
   312  
   313  const (
   314  	unaryCall string = "UnaryCall"
   315  	emptyCall string = "EmptyCall"
   316  )
   317  
   318  func parseRPCTypes(rpcStr string) []string {
   319  	if len(rpcStr) == 0 {
   320  		return []string{unaryCall}
   321  	}
   322  
   323  	rpcs := strings.Split(rpcStr, ",")
   324  	ret := make([]string, 0, len(rpcStr))
   325  	for _, r := range rpcs {
   326  		switch r {
   327  		case unaryCall, emptyCall:
   328  			ret = append(ret, r)
   329  		default:
   330  			flag.PrintDefaults()
   331  			log.Fatalf("unsupported RPC type: %v", r)
   332  		}
   333  	}
   334  	return ret
   335  }
   336  
   337  type rpcConfig struct {
   338  	typ     string
   339  	md      metadata.MD
   340  	timeout int32
   341  }
   342  
   343  // parseRPCMetadata turns EmptyCall:key1:value1 into
   344  //
   345  //	{typ: emptyCall, md: {key1:value1}}.
   346  func parseRPCMetadata(rpcMetadataStr string, rpcs []string) []*rpcConfig {
   347  	rpcMetadataSplit := strings.Split(rpcMetadataStr, ",")
   348  	rpcsToMD := make(map[string][]string)
   349  	for _, rm := range rpcMetadataSplit {
   350  		rmSplit := strings.Split(rm, ":")
   351  		if len(rmSplit)%2 != 1 {
   352  			log.Fatalf("invalid metadata config %v, want EmptyCall:key1:value1", rm)
   353  		}
   354  		rpcsToMD[rmSplit[0]] = append(rpcsToMD[rmSplit[0]], rmSplit[1:]...)
   355  	}
   356  	ret := make([]*rpcConfig, 0, len(rpcs))
   357  	for _, rpcT := range rpcs {
   358  		rpcC := &rpcConfig{
   359  			typ: rpcT,
   360  		}
   361  		if md := rpcsToMD[string(rpcT)]; len(md) > 0 {
   362  			rpcC.md = metadata.Pairs(md...)
   363  		}
   364  		ret = append(ret, rpcC)
   365  	}
   366  	return ret
   367  }
   368  
   369  func main() {
   370  	flag.Parse()
   371  	rpcCfgs.Store(parseRPCMetadata(*rpcMetadata, parseRPCTypes(*rpc)))
   372  
   373  	lis, err := net.Listen("tcp", fmt.Sprintf(":%d", *statsPort))
   374  	if err != nil {
   375  		logger.Fatalf("failed to listen: %v", err)
   376  	}
   377  	s := grpc.NewServer()
   378  	defer s.Stop()
   379  	testgrpc.RegisterLoadBalancerStatsServiceServer(s, &statsService{})
   380  	testgrpc.RegisterXdsUpdateClientConfigureServiceServer(s, &configureService{})
   381  	reflection.Register(s)
   382  	cleanup, err := admin.Register(s)
   383  	if err != nil {
   384  		logger.Fatalf("Failed to register admin: %v", err)
   385  	}
   386  	defer cleanup()
   387  	go s.Serve(lis)
   388  
   389  	creds := insecure.NewCredentials()
   390  	if *secureMode {
   391  		var err error
   392  		creds, err = xds.NewClientCredentials(xds.ClientOptions{FallbackCreds: insecure.NewCredentials()})
   393  		if err != nil {
   394  			logger.Fatalf("Failed to create xDS credentials: %v", err)
   395  		}
   396  	}
   397  
   398  	clients := make([]testgrpc.TestServiceClient, *numChannels)
   399  	for i := 0; i < *numChannels; i++ {
   400  		conn, err := grpc.Dial(*server, grpc.WithTransportCredentials(creds))
   401  		if err != nil {
   402  			logger.Fatalf("Fail to dial: %v", err)
   403  		}
   404  		defer conn.Close()
   405  		clients[i] = testgrpc.NewTestServiceClient(conn)
   406  	}
   407  	ticker := time.NewTicker(time.Second / time.Duration(*qps**numChannels))
   408  	defer ticker.Stop()
   409  	sendRPCs(clients, ticker)
   410  }
   411  
   412  func makeOneRPC(c testgrpc.TestServiceClient, cfg *rpcConfig) (*peer.Peer, *rpcInfo, error) {
   413  	timeout := *rpcTimeout
   414  	if cfg.timeout != 0 {
   415  		timeout = time.Duration(cfg.timeout) * time.Second
   416  	}
   417  	ctx, cancel := context.WithTimeout(context.Background(), timeout)
   418  	defer cancel()
   419  
   420  	if len(cfg.md) != 0 {
   421  		ctx = metadata.NewOutgoingContext(ctx, cfg.md)
   422  	}
   423  	info := rpcInfo{typ: cfg.typ}
   424  
   425  	var (
   426  		p      peer.Peer
   427  		header metadata.MD
   428  		err    error
   429  	)
   430  	accStats.startRPC(cfg.typ)
   431  	switch cfg.typ {
   432  	case unaryCall:
   433  		var resp *testpb.SimpleResponse
   434  		resp, err = c.UnaryCall(ctx, &testpb.SimpleRequest{FillServerId: true}, grpc.Peer(&p), grpc.Header(&header))
   435  		// For UnaryCall, also read hostname from response, in case the server
   436  		// isn't updated to send headers.
   437  		if resp != nil {
   438  			info.hostname = resp.Hostname
   439  		}
   440  	case emptyCall:
   441  		_, err = c.EmptyCall(ctx, &testpb.Empty{}, grpc.Peer(&p), grpc.Header(&header))
   442  	}
   443  	accStats.finishRPC(cfg.typ, err)
   444  	if err != nil {
   445  		return nil, nil, err
   446  	}
   447  
   448  	hosts := header["hostname"]
   449  	if len(hosts) > 0 {
   450  		info.hostname = hosts[0]
   451  	}
   452  	return &p, &info, err
   453  }
   454  
   455  func sendRPCs(clients []testgrpc.TestServiceClient, ticker *time.Ticker) {
   456  	var i int
   457  	for range ticker.C {
   458  		// Get and increment request ID, and save a list of watchers that are
   459  		// interested in this RPC.
   460  		mu.Lock()
   461  		savedRequestID := currentRequestID
   462  		currentRequestID++
   463  		savedWatchers := []*statsWatcher{}
   464  		for key, value := range watchers {
   465  			if key.startID <= savedRequestID && savedRequestID < key.endID {
   466  				savedWatchers = append(savedWatchers, value)
   467  			}
   468  		}
   469  		mu.Unlock()
   470  
   471  		// Get the RPC metadata configurations from the Configure RPC.
   472  		cfgs := rpcCfgs.Load().([]*rpcConfig)
   473  
   474  		c := clients[i]
   475  		for _, cfg := range cfgs {
   476  			go func(cfg *rpcConfig) {
   477  				p, info, err := makeOneRPC(c, cfg)
   478  
   479  				for _, watcher := range savedWatchers {
   480  					// This sends an empty string if the RPC failed.
   481  					watcher.chanHosts <- info
   482  				}
   483  				if err != nil && *failOnFailedRPC && hasRPCSucceeded() {
   484  					logger.Fatalf("RPC failed: %v", err)
   485  				}
   486  				if err == nil {
   487  					setRPCSucceeded()
   488  				}
   489  				if *printResponse {
   490  					if err == nil {
   491  						if cfg.typ == unaryCall {
   492  							// Need to keep this format, because some tests are
   493  							// relying on stdout.
   494  							fmt.Printf("Greeting: Hello world, this is %s, from %v\n", info.hostname, p.Addr)
   495  						} else {
   496  							fmt.Printf("RPC %q, from host %s, addr %v\n", cfg.typ, info.hostname, p.Addr)
   497  						}
   498  					} else {
   499  						fmt.Printf("RPC %q, failed with %v\n", cfg.typ, err)
   500  					}
   501  				}
   502  			}(cfg)
   503  		}
   504  		i = (i + 1) % len(clients)
   505  	}
   506  }
   507  

View as plain text