1
18
19
20 package main
21
22 import (
23 "context"
24 "flag"
25 "fmt"
26 "log"
27 "net"
28 "strings"
29 "sync"
30 "sync/atomic"
31 "time"
32
33 "google.golang.org/grpc"
34 "google.golang.org/grpc/admin"
35 "google.golang.org/grpc/credentials/insecure"
36 "google.golang.org/grpc/credentials/xds"
37 "google.golang.org/grpc/grpclog"
38 "google.golang.org/grpc/metadata"
39 "google.golang.org/grpc/peer"
40 "google.golang.org/grpc/reflection"
41 "google.golang.org/grpc/status"
42 _ "google.golang.org/grpc/xds"
43
44 testgrpc "google.golang.org/grpc/interop/grpc_testing"
45 testpb "google.golang.org/grpc/interop/grpc_testing"
46 _ "google.golang.org/grpc/interop/xds"
47 )
48
49 func init() {
50 rpcCfgs.Store([]*rpcConfig{{typ: unaryCall}})
51 }
52
// statsWatcherKey is the map key under which a statsWatcher is registered.
// sendRPCs treats it as the half-open request ID range [startID, endID): a
// watcher is notified about a request whose ID satisfies
// startID <= ID < endID.
type statsWatcherKey struct {
	startID, endID int32
}
57
58
59
// rpcInfo describes one successfully completed RPC: typ is the RPC type name
// taken from its rpcConfig and hostname is the server hostname reported for
// the call.
type rpcInfo struct {
	typ, hostname string
}
64
65 type statsWatcher struct {
66 rpcsByPeer map[string]int32
67 rpcsByType map[string]map[string]int32
68 numFailures int32
69 remainingRPCs int32
70 chanHosts chan *rpcInfo
71 }
72
73 func (watcher *statsWatcher) buildResp() *testpb.LoadBalancerStatsResponse {
74 rpcsByType := make(map[string]*testpb.LoadBalancerStatsResponse_RpcsByPeer, len(watcher.rpcsByType))
75 for t, rpcsByPeer := range watcher.rpcsByType {
76 rpcsByType[t] = &testpb.LoadBalancerStatsResponse_RpcsByPeer{
77 RpcsByPeer: rpcsByPeer,
78 }
79 }
80
81 return &testpb.LoadBalancerStatsResponse{
82 NumFailures: watcher.numFailures + watcher.remainingRPCs,
83 RpcsByPeer: watcher.rpcsByPeer,
84 RpcsByMethod: rpcsByType,
85 }
86 }
87
// accumulatedStats holds per-method RPC counters that are never reset.
//
// mu guards every map below. The three numRPCs* maps are keyed by the method
// name produced by convertRPCName; rpcStatusByMethod additionally maps each
// method to a count per gRPC status code.
type accumulatedStats struct {
	mu sync.Mutex

	numRPCsStartedByMethod, numRPCsSucceededByMethod, numRPCsFailedByMethod map[string]int32

	rpcStatusByMethod map[string]map[int32]int32
}
95
96 func convertRPCName(in string) string {
97 switch in {
98 case unaryCall:
99 return testpb.ClientConfigureRequest_UNARY_CALL.String()
100 case emptyCall:
101 return testpb.ClientConfigureRequest_EMPTY_CALL.String()
102 }
103 logger.Warningf("unrecognized rpc type: %s", in)
104 return in
105 }
106
107
// copyStatsMap returns a new map holding the same entries as originalMap.
// The result is always non-nil, even when originalMap is nil, and is
// independent of the input: later writes to either map do not affect the
// other.
func copyStatsMap(originalMap map[string]int32) map[string]int32 {
	dup := make(map[string]int32, len(originalMap))
	for key := range originalMap {
		dup[key] = originalMap[key]
	}
	return dup
}
115
116
// copyStatsIntMap returns a new map holding the same entries as originalMap.
// The result is always non-nil, even when originalMap is nil, and shares no
// storage with the input.
func copyStatsIntMap(originalMap map[int32]int32) map[int32]int32 {
	dup := make(map[int32]int32, len(originalMap))
	for code := range originalMap {
		dup[code] = originalMap[code]
	}
	return dup
}
124
125 func (as *accumulatedStats) makeStatsMap() map[string]*testpb.LoadBalancerAccumulatedStatsResponse_MethodStats {
126 m := make(map[string]*testpb.LoadBalancerAccumulatedStatsResponse_MethodStats)
127 for k, v := range as.numRPCsStartedByMethod {
128 m[k] = &testpb.LoadBalancerAccumulatedStatsResponse_MethodStats{RpcsStarted: v}
129 }
130 for k, v := range as.rpcStatusByMethod {
131 if m[k] == nil {
132 m[k] = &testpb.LoadBalancerAccumulatedStatsResponse_MethodStats{}
133 }
134 m[k].Result = copyStatsIntMap(v)
135 }
136 return m
137 }
138
139 func (as *accumulatedStats) buildResp() *testpb.LoadBalancerAccumulatedStatsResponse {
140 as.mu.Lock()
141 defer as.mu.Unlock()
142 return &testpb.LoadBalancerAccumulatedStatsResponse{
143 NumRpcsStartedByMethod: copyStatsMap(as.numRPCsStartedByMethod),
144 NumRpcsSucceededByMethod: copyStatsMap(as.numRPCsSucceededByMethod),
145 NumRpcsFailedByMethod: copyStatsMap(as.numRPCsFailedByMethod),
146 StatsPerMethod: as.makeStatsMap(),
147 }
148 }
149
150 func (as *accumulatedStats) startRPC(rpcType string) {
151 as.mu.Lock()
152 defer as.mu.Unlock()
153 as.numRPCsStartedByMethod[convertRPCName(rpcType)]++
154 }
155
156 func (as *accumulatedStats) finishRPC(rpcType string, err error) {
157 as.mu.Lock()
158 defer as.mu.Unlock()
159 name := convertRPCName(rpcType)
160 if as.rpcStatusByMethod[name] == nil {
161 as.rpcStatusByMethod[name] = make(map[int32]int32)
162 }
163 as.rpcStatusByMethod[name][int32(status.Convert(err).Code())]++
164 if err != nil {
165 as.numRPCsFailedByMethod[name]++
166 return
167 }
168 as.numRPCsSucceededByMethod[name]++
169 }
170
// Command-line flags and process-wide state shared by the stats service, the
// configure service and the RPC sending loop.
var (
	// Command-line flags; see each flag's usage string.
	failOnFailedRPC = flag.Bool("fail_on_failed_rpc", false, "Fail client if any RPCs fail after first success")
	numChannels     = flag.Int("num_channels", 1, "Num of channels")
	printResponse   = flag.Bool("print_response", false, "Write RPC response to stdout")
	qps             = flag.Int("qps", 1, "QPS per channel, for each type of RPC")
	rpc             = flag.String("rpc", "UnaryCall", "Types of RPCs to make, ',' separated string. RPCs can be EmptyCall or UnaryCall. Deprecated: Use Configure RPC to XdsUpdateClientConfigureServiceServer instead.")
	rpcMetadata     = flag.String("metadata", "", "The metadata to send with RPC, in format EmptyCall:key1:value1,UnaryCall:key2:value2. Deprecated: Use Configure RPC to XdsUpdateClientConfigureServiceServer instead.")
	rpcTimeout      = flag.Duration("rpc_timeout", 20*time.Second, "Per RPC timeout")
	server          = flag.String("server", "localhost:8080", "Address of server to connect to")
	statsPort       = flag.Int("stats_port", 8081, "Port to expose peer distribution stats service")
	secureMode      = flag.Bool("secure_mode", false, "If true, retrieve security configuration from the management server. Else, use insecure credentials.")

	// rpcCfgs holds the current []*rpcConfig. It is stored by init, main and
	// Configure, and loaded by sendRPCs on every tick.
	rpcCfgs atomic.Value

	// mu guards currentRequestID and watchers.
	mu               sync.Mutex
	currentRequestID int32
	watchers         = make(map[statsWatcherKey]*statsWatcher)

	// accStats accumulates per-method RPC counters for the lifetime of the
	// process; it carries its own mutex.
	accStats = accumulatedStats{
		numRPCsStartedByMethod:   make(map[string]int32),
		numRPCsSucceededByMethod: make(map[string]int32),
		numRPCsFailedByMethod:    make(map[string]int32),
		rpcStatusByMethod:        make(map[string]map[int32]int32),
	}

	// rpcSucceeded is 0 until some RPC has succeeded and 1 afterwards. Access
	// it only through hasRPCSucceeded and setRPCSucceeded, which use atomic
	// operations.
	rpcSucceeded uint32

	logger = grpclog.Component("interop")
)
202
// statsService is the server registered in main for the load balancer stats
// service. It defines GetClientStats and GetClientAccumulatedStats and embeds
// testgrpc.UnimplementedLoadBalancerStatsServiceServer.
type statsService struct {
	testgrpc.UnimplementedLoadBalancerStatsServiceServer
}
206
207 func hasRPCSucceeded() bool {
208 return atomic.LoadUint32(&rpcSucceeded) > 0
209 }
210
211 func setRPCSucceeded() {
212 atomic.StoreUint32(&rpcSucceeded, 1)
213 }
214
215
216
217
218
219 func (s *statsService) GetClientStats(ctx context.Context, in *testpb.LoadBalancerStatsRequest) (*testpb.LoadBalancerStatsResponse, error) {
220 mu.Lock()
221 watcherKey := statsWatcherKey{currentRequestID, currentRequestID + in.GetNumRpcs()}
222 watcher, ok := watchers[watcherKey]
223 if !ok {
224 watcher = &statsWatcher{
225 rpcsByPeer: make(map[string]int32),
226 rpcsByType: make(map[string]map[string]int32),
227 numFailures: 0,
228 remainingRPCs: in.GetNumRpcs(),
229 chanHosts: make(chan *rpcInfo),
230 }
231 watchers[watcherKey] = watcher
232 }
233 mu.Unlock()
234
235 ctx, cancel := context.WithTimeout(ctx, time.Duration(in.GetTimeoutSec())*time.Second)
236 defer cancel()
237
238 defer func() {
239 mu.Lock()
240 delete(watchers, watcherKey)
241 mu.Unlock()
242 }()
243
244
245 for {
246 select {
247 case info := <-watcher.chanHosts:
248 if info != nil {
249 watcher.rpcsByPeer[info.hostname]++
250
251 rpcsByPeerForType := watcher.rpcsByType[info.typ]
252 if rpcsByPeerForType == nil {
253 rpcsByPeerForType = make(map[string]int32)
254 watcher.rpcsByType[info.typ] = rpcsByPeerForType
255 }
256 rpcsByPeerForType[info.hostname]++
257 } else {
258 watcher.numFailures++
259 }
260 watcher.remainingRPCs--
261 if watcher.remainingRPCs == 0 {
262 return watcher.buildResp(), nil
263 }
264 case <-ctx.Done():
265 logger.Info("Timed out, returning partial stats")
266 return watcher.buildResp(), nil
267 }
268 }
269 }
270
271 func (s *statsService) GetClientAccumulatedStats(ctx context.Context, in *testpb.LoadBalancerAccumulatedStatsRequest) (*testpb.LoadBalancerAccumulatedStatsResponse, error) {
272 return accStats.buildResp(), nil
273 }
274
// configureService is the server registered in main for the client configure
// service. It defines Configure and embeds
// testgrpc.UnimplementedXdsUpdateClientConfigureServiceServer.
type configureService struct {
	testgrpc.UnimplementedXdsUpdateClientConfigureServiceServer
}
278
279 func (s *configureService) Configure(ctx context.Context, in *testpb.ClientConfigureRequest) (*testpb.ClientConfigureResponse, error) {
280 rpcsToMD := make(map[testpb.ClientConfigureRequest_RpcType][]string)
281 for _, typ := range in.GetTypes() {
282 rpcsToMD[typ] = nil
283 }
284 for _, md := range in.GetMetadata() {
285 typ := md.GetType()
286 strs, ok := rpcsToMD[typ]
287 if !ok {
288 continue
289 }
290 rpcsToMD[typ] = append(strs, md.GetKey(), md.GetValue())
291 }
292 cfgs := make([]*rpcConfig, 0, len(rpcsToMD))
293 for typ, md := range rpcsToMD {
294 var rpcType string
295 switch typ {
296 case testpb.ClientConfigureRequest_UNARY_CALL:
297 rpcType = unaryCall
298 case testpb.ClientConfigureRequest_EMPTY_CALL:
299 rpcType = emptyCall
300 default:
301 return nil, fmt.Errorf("unsupported RPC type: %v", typ)
302 }
303 cfgs = append(cfgs, &rpcConfig{
304 typ: rpcType,
305 md: metadata.Pairs(md...),
306 timeout: in.GetTimeoutSec(),
307 })
308 }
309 rpcCfgs.Store(cfgs)
310 return &testpb.ClientConfigureResponse{}, nil
311 }
312
// RPC type names accepted by the -rpc flag and stored in rpcConfig.typ.
const (
	unaryCall string = "UnaryCall"
	emptyCall string = "EmptyCall"
)

// parseRPCTypes splits the comma-separated rpcStr into a list of RPC type
// names. An empty rpcStr yields a single unaryCall. Each entry must be exactly
// unaryCall or emptyCall; any other entry prints the flag defaults and
// terminates the process via log.Fatalf.
func parseRPCTypes(rpcStr string) []string {
	if len(rpcStr) == 0 {
		return []string{unaryCall}
	}

	rpcs := strings.Split(rpcStr, ",")
	// Size the result by the number of entries, not by the byte length of the
	// flag string.
	ret := make([]string, 0, len(rpcs))
	for _, r := range rpcs {
		switch r {
		case unaryCall, emptyCall:
			ret = append(ret, r)
		default:
			flag.PrintDefaults()
			log.Fatalf("unsupported RPC type: %v", r)
		}
	}
	return ret
}
336
// rpcConfig describes one kind of RPC that sendRPCs issues on every tick.
type rpcConfig struct {
	typ     string      // RPC type name: unaryCall or emptyCall
	md      metadata.MD // outgoing metadata; attached only when non-empty
	timeout int32       // per-RPC timeout in seconds; 0 means use the -rpc_timeout flag
}
342
343
344
345
346 func parseRPCMetadata(rpcMetadataStr string, rpcs []string) []*rpcConfig {
347 rpcMetadataSplit := strings.Split(rpcMetadataStr, ",")
348 rpcsToMD := make(map[string][]string)
349 for _, rm := range rpcMetadataSplit {
350 rmSplit := strings.Split(rm, ":")
351 if len(rmSplit)%2 != 1 {
352 log.Fatalf("invalid metadata config %v, want EmptyCall:key1:value1", rm)
353 }
354 rpcsToMD[rmSplit[0]] = append(rpcsToMD[rmSplit[0]], rmSplit[1:]...)
355 }
356 ret := make([]*rpcConfig, 0, len(rpcs))
357 for _, rpcT := range rpcs {
358 rpcC := &rpcConfig{
359 typ: rpcT,
360 }
361 if md := rpcsToMD[string(rpcT)]; len(md) > 0 {
362 rpcC.md = metadata.Pairs(md...)
363 }
364 ret = append(ret, rpcC)
365 }
366 return ret
367 }
368
369 func main() {
370 flag.Parse()
371 rpcCfgs.Store(parseRPCMetadata(*rpcMetadata, parseRPCTypes(*rpc)))
372
373 lis, err := net.Listen("tcp", fmt.Sprintf(":%d", *statsPort))
374 if err != nil {
375 logger.Fatalf("failed to listen: %v", err)
376 }
377 s := grpc.NewServer()
378 defer s.Stop()
379 testgrpc.RegisterLoadBalancerStatsServiceServer(s, &statsService{})
380 testgrpc.RegisterXdsUpdateClientConfigureServiceServer(s, &configureService{})
381 reflection.Register(s)
382 cleanup, err := admin.Register(s)
383 if err != nil {
384 logger.Fatalf("Failed to register admin: %v", err)
385 }
386 defer cleanup()
387 go s.Serve(lis)
388
389 creds := insecure.NewCredentials()
390 if *secureMode {
391 var err error
392 creds, err = xds.NewClientCredentials(xds.ClientOptions{FallbackCreds: insecure.NewCredentials()})
393 if err != nil {
394 logger.Fatalf("Failed to create xDS credentials: %v", err)
395 }
396 }
397
398 clients := make([]testgrpc.TestServiceClient, *numChannels)
399 for i := 0; i < *numChannels; i++ {
400 conn, err := grpc.Dial(*server, grpc.WithTransportCredentials(creds))
401 if err != nil {
402 logger.Fatalf("Fail to dial: %v", err)
403 }
404 defer conn.Close()
405 clients[i] = testgrpc.NewTestServiceClient(conn)
406 }
407 ticker := time.NewTicker(time.Second / time.Duration(*qps**numChannels))
408 defer ticker.Stop()
409 sendRPCs(clients, ticker)
410 }
411
412 func makeOneRPC(c testgrpc.TestServiceClient, cfg *rpcConfig) (*peer.Peer, *rpcInfo, error) {
413 timeout := *rpcTimeout
414 if cfg.timeout != 0 {
415 timeout = time.Duration(cfg.timeout) * time.Second
416 }
417 ctx, cancel := context.WithTimeout(context.Background(), timeout)
418 defer cancel()
419
420 if len(cfg.md) != 0 {
421 ctx = metadata.NewOutgoingContext(ctx, cfg.md)
422 }
423 info := rpcInfo{typ: cfg.typ}
424
425 var (
426 p peer.Peer
427 header metadata.MD
428 err error
429 )
430 accStats.startRPC(cfg.typ)
431 switch cfg.typ {
432 case unaryCall:
433 var resp *testpb.SimpleResponse
434 resp, err = c.UnaryCall(ctx, &testpb.SimpleRequest{FillServerId: true}, grpc.Peer(&p), grpc.Header(&header))
435
436
437 if resp != nil {
438 info.hostname = resp.Hostname
439 }
440 case emptyCall:
441 _, err = c.EmptyCall(ctx, &testpb.Empty{}, grpc.Peer(&p), grpc.Header(&header))
442 }
443 accStats.finishRPC(cfg.typ, err)
444 if err != nil {
445 return nil, nil, err
446 }
447
448 hosts := header["hostname"]
449 if len(hosts) > 0 {
450 info.hostname = hosts[0]
451 }
452 return &p, &info, err
453 }
454
455 func sendRPCs(clients []testgrpc.TestServiceClient, ticker *time.Ticker) {
456 var i int
457 for range ticker.C {
458
459
460 mu.Lock()
461 savedRequestID := currentRequestID
462 currentRequestID++
463 savedWatchers := []*statsWatcher{}
464 for key, value := range watchers {
465 if key.startID <= savedRequestID && savedRequestID < key.endID {
466 savedWatchers = append(savedWatchers, value)
467 }
468 }
469 mu.Unlock()
470
471
472 cfgs := rpcCfgs.Load().([]*rpcConfig)
473
474 c := clients[i]
475 for _, cfg := range cfgs {
476 go func(cfg *rpcConfig) {
477 p, info, err := makeOneRPC(c, cfg)
478
479 for _, watcher := range savedWatchers {
480
481 watcher.chanHosts <- info
482 }
483 if err != nil && *failOnFailedRPC && hasRPCSucceeded() {
484 logger.Fatalf("RPC failed: %v", err)
485 }
486 if err == nil {
487 setRPCSucceeded()
488 }
489 if *printResponse {
490 if err == nil {
491 if cfg.typ == unaryCall {
492
493
494 fmt.Printf("Greeting: Hello world, this is %s, from %v\n", info.hostname, p.Addr)
495 } else {
496 fmt.Printf("RPC %q, from host %s, addr %v\n", cfg.typ, info.hostname, p.Addr)
497 }
498 } else {
499 fmt.Printf("RPC %q, failed with %v\n", cfg.typ, err)
500 }
501 }
502 }(cfg)
503 }
504 i = (i + 1) % len(clients)
505 }
506 }
507
View as plain text