1
17
18 package e2e
19
20 import (
21 "bytes"
22 "context"
23 "flag"
24 "fmt"
25 "os"
26 "strconv"
27 "testing"
28 "time"
29
30 "google.golang.org/grpc/internal/testutils/xds/e2e"
31
32 v3clusterpb "github.com/envoyproxy/go-control-plane/envoy/config/cluster/v3"
33 v3routepb "github.com/envoyproxy/go-control-plane/envoy/config/route/v3"
34 channelzpb "google.golang.org/grpc/channelz/grpc_channelz_v1"
35 testpb "google.golang.org/grpc/interop/grpc_testing"
36 )
37
38 var (
39 clientPath = flag.String("client", "./binaries/client", "The interop client")
40 serverPath = flag.String("server", "./binaries/server", "The interop server")
41 )
42
43 type testOpts struct {
44 testName string
45 backendCount int
46 clientFlags []string
47 }
48
49 func setup(t *testing.T, opts testOpts) (*controlPlane, *client, []*server) {
50 t.Helper()
51 if _, err := os.Stat(*clientPath); os.IsNotExist(err) {
52 t.Skip("skipped because client is not found")
53 }
54 if _, err := os.Stat(*serverPath); os.IsNotExist(err) {
55 t.Skip("skipped because server is not found")
56 }
57 backendCount := 1
58 if opts.backendCount != 0 {
59 backendCount = opts.backendCount
60 }
61
62 cp, err := newControlPlane()
63 if err != nil {
64 t.Fatalf("failed to start control-plane: %v", err)
65 }
66 t.Cleanup(cp.stop)
67
68 var clientLog bytes.Buffer
69 c, err := newClient(fmt.Sprintf("xds:///%s", opts.testName), *clientPath, cp.bootstrapContent, &clientLog, opts.clientFlags...)
70 if err != nil {
71 t.Fatalf("failed to start client: %v", err)
72 }
73 t.Cleanup(c.stop)
74
75 var serverLog bytes.Buffer
76 servers, err := newServers(opts.testName, *serverPath, cp.bootstrapContent, &serverLog, backendCount)
77 if err != nil {
78 t.Fatalf("failed to start server: %v", err)
79 }
80 t.Cleanup(func() {
81 for _, s := range servers {
82 s.stop()
83 }
84 })
85 t.Cleanup(func() {
86
87 t.Logf("\n----- client logs -----\n%v", clientLog.String())
88 t.Logf("\n----- server logs -----\n%v", serverLog.String())
89 })
90 return cp, c, servers
91 }
92
93 func TestPingPong(t *testing.T) {
94 const testName = "pingpong"
95 cp, c, _ := setup(t, testOpts{testName: testName})
96
97 resources := e2e.DefaultClientResources(e2e.ResourceParams{
98 DialTarget: testName,
99 NodeID: cp.nodeID,
100 Host: "localhost",
101 Port: serverPort,
102 SecLevel: e2e.SecurityLevelNone,
103 })
104
105 ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second)
106 defer cancel()
107 if err := cp.server.Update(ctx, resources); err != nil {
108 t.Fatalf("failed to update control plane resources: %v", err)
109 }
110
111 st, err := c.clientStats(ctx)
112 if err != nil {
113 t.Fatalf("failed to get client stats: %v", err)
114 }
115 if st.NumFailures != 0 {
116 t.Fatalf("Got %v failures: %+v", st.NumFailures, st)
117 }
118 }
119
120
121
122
123
124
125
126
127
128
129 func TestAffinity(t *testing.T) {
130 const (
131 testName = "affinity"
132 backendCount = 3
133 testMDKey = "xds_md"
134 testMDValue = "unary_yranu"
135 )
136 cp, c, servers := setup(t, testOpts{
137 testName: testName,
138 backendCount: backendCount,
139 clientFlags: []string{"--rpc=EmptyCall", fmt.Sprintf("--metadata=EmptyCall:%s:%s", testMDKey, testMDValue)},
140 })
141
142 resources := e2e.DefaultClientResources(e2e.ResourceParams{
143 DialTarget: testName,
144 NodeID: cp.nodeID,
145 Host: "localhost",
146 Port: serverPort,
147 SecLevel: e2e.SecurityLevelNone,
148 })
149
150
151 var ports []uint32
152 for _, s := range servers {
153 ports = append(ports, uint32(s.port))
154 }
155 edsMsg := resources.Endpoints[0]
156 resources.Endpoints[0] = e2e.DefaultEndpoint(
157 edsMsg.ClusterName,
158 "localhost",
159 ports,
160 )
161
162
163 cdsMsg := resources.Clusters[0]
164 cdsMsg.LbPolicy = v3clusterpb.Cluster_RING_HASH
165
166
167 rdsMsg := resources.Routes[0]
168 rdsMsg.VirtualHosts[0].Routes[0].Action = &v3routepb.Route_Route{Route: &v3routepb.RouteAction{
169 ClusterSpecifier: &v3routepb.RouteAction_Cluster{Cluster: cdsMsg.Name},
170 HashPolicy: []*v3routepb.RouteAction_HashPolicy{{
171 PolicySpecifier: &v3routepb.RouteAction_HashPolicy_Header_{
172 Header: &v3routepb.RouteAction_HashPolicy_Header{
173 HeaderName: testMDKey,
174 },
175 },
176 }},
177 }}
178
179 ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second)
180 defer cancel()
181 if err := cp.server.Update(ctx, resources); err != nil {
182 t.Fatalf("failed to update control plane resources: %v", err)
183 }
184
185
186
187
188
189
190
191
192
193
194
195 st, err := c.clientStats(ctx)
196 if err != nil {
197 t.Fatalf("failed to get client stats: %v", err)
198 }
199 if st.NumFailures != 0 {
200 t.Fatalf("Got %v failures: %+v", st.NumFailures, st)
201 }
202 if len(st.RpcsByPeer) != 1 {
203 t.Fatalf("more than 1 backends got traffic: %v, want 1", st.RpcsByPeer)
204 }
205
206
207 scs, err := c.channelzSubChannels(ctx)
208 if err != nil {
209 t.Fatalf("failed to fetch channelz: %v", err)
210 }
211 verifySubConnStates(t, scs, map[channelzpb.ChannelConnectivityState_State]int{
212 channelzpb.ChannelConnectivityState_READY: 1,
213 channelzpb.ChannelConnectivityState_IDLE: 2,
214 })
215
216
217
218 var (
219 diffPeerPicked bool
220 mdValue int
221 )
222 for !diffPeerPicked {
223 if err := c.configRPCs(ctx, &testpb.ClientConfigureRequest{
224 Types: []testpb.ClientConfigureRequest_RpcType{
225 testpb.ClientConfigureRequest_EMPTY_CALL,
226 testpb.ClientConfigureRequest_UNARY_CALL,
227 },
228 Metadata: []*testpb.ClientConfigureRequest_Metadata{
229 {Type: testpb.ClientConfigureRequest_EMPTY_CALL, Key: testMDKey, Value: testMDValue},
230 {Type: testpb.ClientConfigureRequest_UNARY_CALL, Key: testMDKey, Value: strconv.Itoa(mdValue)},
231 },
232 }); err != nil {
233 t.Fatalf("failed to configure RPC: %v", err)
234 }
235
236 st, err := c.clientStats(ctx)
237 if err != nil {
238 t.Fatalf("failed to get client stats: %v", err)
239 }
240 if st.NumFailures != 0 {
241 t.Fatalf("Got %v failures: %+v", st.NumFailures, st)
242 }
243 if len(st.RpcsByPeer) == 2 {
244 break
245 }
246
247 mdValue++
248 }
249
250
251 scs2, err := c.channelzSubChannels(ctx)
252 if err != nil {
253 t.Fatalf("failed to fetch channelz: %v", err)
254 }
255 verifySubConnStates(t, scs2, map[channelzpb.ChannelConnectivityState_State]int{
256 channelzpb.ChannelConnectivityState_READY: 2,
257 channelzpb.ChannelConnectivityState_IDLE: 1,
258 })
259 }
260
View as plain text