...
1
18
19 package test
20
21 import (
22 "context"
23 "fmt"
24 "net"
25 "sync"
26 "testing"
27
28 "google.golang.org/grpc"
29 "google.golang.org/grpc/credentials/insecure"
30 "google.golang.org/grpc/interop"
31 testgrpc "google.golang.org/grpc/interop/grpc_testing"
32 "google.golang.org/grpc/peer"
33 "google.golang.org/grpc/stats"
34 )
35
36
37
38
39 func (s) TestPeerForClientStatsHandler(t *testing.T) {
40 psh := &peerStatsHandler{}
41
42
43
44
45
46 expectedCallouts := map[stats.RPCStats]bool{
47 &stats.OutPayload{}: true,
48 &stats.InHeader{}: true,
49 &stats.OutHeader{}: true,
50 &stats.InTrailer{}: true,
51 &stats.OutTrailer{}: true,
52 &stats.End{}: true,
53 &stats.Begin{}: false,
54 &stats.PickerUpdated{}: false,
55 }
56
57
58 l, err := net.Listen("tcp", "localhost:0")
59 if err != nil {
60 t.Fatal(err)
61 }
62 s := grpc.NewServer()
63 testgrpc.RegisterTestServiceServer(s, interop.NewTestServer())
64 errCh := make(chan error)
65 go func() {
66 errCh <- s.Serve(l)
67 }()
68 defer func() {
69 s.Stop()
70 if err := <-errCh; err != nil {
71 t.Error(err)
72 }
73 }()
74
75
76 cc, err := grpc.NewClient(
77 l.Addr().String(),
78 grpc.WithTransportCredentials(insecure.NewCredentials()),
79 grpc.WithStatsHandler(psh))
80 if err != nil {
81 t.Fatal(err)
82 }
83 defer cc.Close()
84 ctx, cancel := context.WithTimeout(context.Background(), defaultTestTimeout)
85 defer cancel()
86 client := testgrpc.NewTestServiceClient(cc)
87 interop.DoEmptyUnaryCall(ctx, client)
88
89 psh.mu.Lock()
90 pshArgs := psh.args
91 psh.mu.Unlock()
92
93
94 uniqueStatsTypes := make(map[string]struct{})
95 for _, callbackArgs := range pshArgs {
96 key := fmt.Sprintf("%T", callbackArgs.rpcStats)
97 if _, exists := uniqueStatsTypes[key]; exists {
98 continue
99 }
100 uniqueStatsTypes[fmt.Sprintf("%T", callbackArgs.rpcStats)] = struct{}{}
101 }
102 if len(uniqueStatsTypes) != len(expectedCallouts) {
103 t.Errorf("Unexpected number of stats handler callouts. Got %v, want %v", len(uniqueStatsTypes), len(expectedCallouts))
104 }
105
106 for _, callbackArgs := range pshArgs {
107 expectedPeer, found := expectedCallouts[callbackArgs.rpcStats]
108
109
110 if found && expectedPeer && callbackArgs.peer != nil {
111 continue
112 } else if expectedPeer && callbackArgs.peer == nil {
113 t.Errorf("peer not populated for: %T", callbackArgs.rpcStats)
114 }
115 }
116 }
117
118 type peerStats struct {
119 rpcStats stats.RPCStats
120 peer *peer.Peer
121 }
122
123 type peerStatsHandler struct {
124 args []peerStats
125 mu sync.Mutex
126 }
127
128 func (h *peerStatsHandler) TagRPC(ctx context.Context, info *stats.RPCTagInfo) context.Context {
129 return ctx
130 }
131
132 func (h *peerStatsHandler) HandleRPC(ctx context.Context, rs stats.RPCStats) {
133 p, _ := peer.FromContext(ctx)
134 h.mu.Lock()
135 defer h.mu.Unlock()
136 h.args = append(h.args, peerStats{rs, p})
137 }
138
139 func (h *peerStatsHandler) TagConn(ctx context.Context, info *stats.ConnTagInfo) context.Context {
140 return ctx
141 }
142
143 func (h *peerStatsHandler) HandleConn(context.Context, stats.ConnStats) {}
144
View as plain text