1
18
19 package server
20
21 import (
22 "context"
23 "fmt"
24 "testing"
25 "time"
26
27 "github.com/google/go-cmp/cmp"
28 "github.com/google/go-cmp/cmp/cmpopts"
29 "google.golang.org/grpc/internal/grpctest"
30 "google.golang.org/grpc/internal/testutils/xds/e2e"
31 "google.golang.org/grpc/xds/internal/xdsclient"
32 "google.golang.org/grpc/xds/internal/xdsclient/xdsresource/version"
33
34 v3routepb "github.com/envoyproxy/go-control-plane/envoy/config/route/v3"
35 v3discoverypb "github.com/envoyproxy/go-control-plane/envoy/service/discovery/v3"
36 )
37
38 type s struct {
39 grpctest.Tester
40 }
41
42 func Test(t *testing.T) {
43 grpctest.RunSubTests(t, s{})
44 }
45
// Timeouts shared by the tests in this file.
const (
	// defaultTestTimeout is the long timeout; TestRDSHandler uses it as the
	// deadline of its context.
	defaultTestTimeout = 10 * time.Second
	// defaultTestShortTimeout is the short pause used between polling
	// attempts in waitForResourceNames.
	defaultTestShortTimeout = 10 * time.Millisecond
)
50
// Resource names used when building xDS resources in these tests.
const (
	// listenerName and clusterName are plugged into every route
	// configuration built by routeConfigResourceForName.
	listenerName = "listener"
	clusterName  = "cluster"

	// Names of the route configuration resources watched by the tests.
	route1 = "route1"
	route2 = "route2"
	route3 = "route3"
	route4 = "route4"
)
60
61
62
63
64
65
66
67
68
69
70
71
72 func xdsSetupForTests(t *testing.T) (*e2e.ManagementServer, string, chan []string, chan []string, xdsclient.XDSClient) {
73 t.Helper()
74
75 ldsNamesCh := make(chan []string, 1)
76 rdsNamesCh := make(chan []string, 1)
77
78
79
80 mgmtServer, nodeID, bootstrapContents, _, cleanup := e2e.SetupManagementServer(t, e2e.ManagementServerOptions{
81 OnStreamRequest: func(_ int64, req *v3discoverypb.DiscoveryRequest) error {
82 switch req.GetTypeUrl() {
83 case version.V3ListenerURL:
84 select {
85 case <-ldsNamesCh:
86 default:
87 }
88 select {
89 case ldsNamesCh <- req.GetResourceNames():
90 default:
91 }
92 case version.V3RouteConfigURL:
93 select {
94 case <-rdsNamesCh:
95 default:
96 }
97 select {
98 case rdsNamesCh <- req.GetResourceNames():
99 default:
100 }
101 default:
102 return fmt.Errorf("unexpected resources %v of type %q requested", req.GetResourceNames(), req.GetTypeUrl())
103 }
104 return nil
105 },
106 AllowResourceSubset: true,
107 })
108 t.Cleanup(cleanup)
109
110 xdsC, cancel, err := xdsclient.NewWithBootstrapContentsForTesting(bootstrapContents)
111 if err != nil {
112 t.Fatal(err)
113 }
114 t.Cleanup(cancel)
115
116 return mgmtServer, nodeID, ldsNamesCh, rdsNamesCh, xdsC
117 }
118
119
120
121 func waitForResourceNames(ctx context.Context, t *testing.T, namesCh chan []string, wantNames []string) {
122 t.Helper()
123
124 for ; ctx.Err() == nil; <-time.After(defaultTestShortTimeout) {
125 select {
126 case <-ctx.Done():
127 case gotNames := <-namesCh:
128 if cmp.Equal(gotNames, wantNames, cmpopts.EquateEmpty(), cmpopts.SortSlices(func(s1, s2 string) bool { return s1 < s2 })) {
129 return
130 }
131 t.Logf("Received resource names %v, want %v", gotNames, wantNames)
132 }
133 }
134 t.Fatalf("Timeout waiting for resource to be requested from the management server")
135 }
136
137 func routeConfigResourceForName(name string) *v3routepb.RouteConfiguration {
138 return e2e.RouteConfigResourceWithOptions(e2e.RouteConfigOptions{
139 RouteConfigName: name,
140 ListenerName: listenerName,
141 ClusterSpecifierType: e2e.RouteConfigClusterSpecifierTypeCluster,
142 ClusterName: clusterName,
143 })
144 }
145
146 type testCallbackVerify struct {
147 ch chan callbackStruct
148 }
149
150 type callbackStruct struct {
151 routeName string
152 rwu rdsWatcherUpdate
153 }
154
155 func (tcv *testCallbackVerify) testCallback(routeName string, rwu rdsWatcherUpdate) {
156 tcv.ch <- callbackStruct{routeName: routeName, rwu: rwu}
157 }
158
159 func verifyRouteName(ctx context.Context, t *testing.T, ch chan callbackStruct, want callbackStruct) {
160 t.Helper()
161 select {
162 case got := <-ch:
163 if diff := cmp.Diff(got.routeName, want.routeName); diff != "" {
164 t.Fatalf("unexpected update received (-got, +want):%v, want: %v", got, want)
165 }
166 case <-ctx.Done():
167 t.Fatalf("timeout waiting for callback")
168 }
169 }
170
171
172
173
174
175
176
177
178
179 func (s) TestRDSHandler(t *testing.T) {
180 mgmtServer, nodeID, _, rdsNamesCh, xdsC := xdsSetupForTests(t)
181
182 ch := make(chan callbackStruct, 1)
183 tcv := &testCallbackVerify{ch: ch}
184 rh := newRDSHandler(tcv.testCallback, xdsC, nil)
185
186 ctx, cancel := context.WithTimeout(context.Background(), defaultTestTimeout)
187 defer cancel()
188
189
190 routeResource1 := routeConfigResourceForName(route1)
191 resources := e2e.UpdateOptions{
192 NodeID: nodeID,
193 Routes: []*v3routepb.RouteConfiguration{routeResource1},
194 SkipValidation: true,
195 }
196 if err := mgmtServer.Update(ctx, resources); err != nil {
197 t.Fatal(err)
198 }
199
200 rh.updateRouteNamesToWatch(map[string]bool{route1: true, route2: true})
201 waitForResourceNames(ctx, t, rdsNamesCh, []string{route1, route2})
202 verifyRouteName(ctx, t, ch, callbackStruct{routeName: route1})
203
204
205 if got := rh.determineRouteConfigurationReady(); got != false {
206 t.Fatalf("rh.determineRouteConfigurationReady: %v, want: false", false)
207 }
208
209
210 routeResource2 := routeConfigResourceForName(route2)
211 resources.Routes = []*v3routepb.RouteConfiguration{routeResource1, routeResource2}
212 if err := mgmtServer.Update(ctx, resources); err != nil {
213 t.Fatal(err)
214 }
215
216 verifyRouteName(ctx, t, ch, callbackStruct{routeName: route2})
217
218 if got := rh.determineRouteConfigurationReady(); got != true {
219 t.Fatalf("rh.determineRouteConfigurationReady: %v, want: true", got)
220 }
221
222 rh.updateRouteNamesToWatch(map[string]bool{route1: true, route2: true, route3: true})
223 waitForResourceNames(ctx, t, rdsNamesCh, []string{route1, route2, route3})
224 if got := rh.determineRouteConfigurationReady(); got != false {
225 t.Fatalf("rh.determineRouteConfigurationReady: %v, want: false", got)
226 }
227
228
229 routeResource3 := routeConfigResourceForName(route3)
230 resources.Routes = []*v3routepb.RouteConfiguration{routeResource1, routeResource2, routeResource3}
231 if err := mgmtServer.Update(ctx, resources); err != nil {
232 t.Fatal(err)
233 }
234 verifyRouteName(ctx, t, ch, callbackStruct{routeName: route3})
235
236 if got := rh.determineRouteConfigurationReady(); got != true {
237 t.Fatalf("rh.determineRouteConfigurationReady: %v, want: true", got)
238 }
239
240 rh.updateRouteNamesToWatch(map[string]bool{route1: true, route3: true})
241 if got := rh.determineRouteConfigurationReady(); got != true {
242 t.Fatalf("rh.determineRouteConfigurationReady: %v, want: true", got)
243 }
244
245
246 rh.updateRouteNamesToWatch(map[string]bool{route1: true, route4: true})
247 waitForResourceNames(ctx, t, rdsNamesCh, []string{route1, route4})
248 if got := rh.determineRouteConfigurationReady(); got != false {
249 t.Fatalf("rh.determineRouteConfigurationReady: %v, want: false", got)
250 }
251 routeResource4 := routeConfigResourceForName(route4)
252 resources.Routes = []*v3routepb.RouteConfiguration{routeResource1, routeResource2, routeResource3, routeResource4}
253 if err := mgmtServer.Update(ctx, resources); err != nil {
254 t.Fatal(err)
255 }
256 verifyRouteName(ctx, t, ch, callbackStruct{routeName: route4})
257 if got := rh.determineRouteConfigurationReady(); got != true {
258 t.Fatalf("rh.determineRouteConfigurationReady: %v, want: true", got)
259 }
260 }
261
View as plain text