1
18
19 package xdsclient
20
21 import (
22 "context"
23 "fmt"
24 "testing"
25 "time"
26
27 "github.com/google/uuid"
28 "google.golang.org/grpc/internal/grpcsync"
29 util "google.golang.org/grpc/internal/testutils"
30 "google.golang.org/grpc/internal/testutils/xds/e2e"
31 "google.golang.org/grpc/xds/internal"
32
33 "google.golang.org/grpc/internal/xds/bootstrap"
34 "google.golang.org/grpc/xds/internal/testutils"
35 "google.golang.org/grpc/xds/internal/xdsclient/xdsresource"
36 "google.golang.org/grpc/xds/internal/xdsclient/xdsresource/version"
37
38 v3corepb "github.com/envoyproxy/go-control-plane/envoy/config/core/v3"
39 v3listenerpb "github.com/envoyproxy/go-control-plane/envoy/config/listener/v3"
40 _ "google.golang.org/grpc/xds/internal/httpfilter/router"
41 )
42
43 var emptyServerOpts = e2e.ManagementServerOptions{}
44
45 var (
46
47
48
49 listenerResourceType = internal.ResourceTypeMapForTesting[version.V3ListenerURL].(xdsresource.Type)
50 rtRegistry = newResourceTypeRegistry()
51 )
52
53 func init() {
54
55
56 rtRegistry.types[listenerResourceType.TypeURL()] = listenerResourceType
57 }
58
59 func setupTest(ctx context.Context, t *testing.T, opts e2e.ManagementServerOptions, watchExpiryTimeout time.Duration) (*authority, *e2e.ManagementServer, string) {
60 t.Helper()
61 nodeID := uuid.New().String()
62 ms, err := e2e.StartManagementServer(opts)
63 if err != nil {
64 t.Fatalf("Failed to spin up the xDS management server: %q", err)
65 }
66
67 a, err := newAuthority(authorityArgs{
68 serverCfg: testutils.ServerConfigForAddress(t, ms.Address),
69 bootstrapCfg: &bootstrap.Config{
70 NodeProto: &v3corepb.Node{Id: nodeID},
71 },
72 serializer: grpcsync.NewCallbackSerializer(ctx),
73 resourceTypeGetter: rtRegistry.get,
74 watchExpiryTimeout: watchExpiryTimeout,
75 logger: nil,
76 })
77 if err != nil {
78 t.Fatalf("Failed to create authority: %q", err)
79 }
80 return a, ms, nodeID
81 }
82
83
84
85
86 func (s) TestTimerAndWatchStateOnSendCallback(t *testing.T) {
87 ctx, cancel := context.WithTimeout(context.Background(), defaultTestTimeout)
88 defer cancel()
89 a, ms, nodeID := setupTest(ctx, t, emptyServerOpts, defaultTestTimeout)
90 defer ms.Stop()
91 defer a.close()
92
93 rn := "xdsclient-test-lds-resource"
94 w := testutils.NewTestResourceWatcher()
95 cancelResource := a.watchResource(listenerResourceType, rn, w)
96 defer cancelResource()
97
98
99
100
101 for ctx.Err() == nil {
102 if err := compareWatchState(a, rn, watchStateRequested); err == nil {
103 break
104 }
105 }
106 if ctx.Err() != nil {
107 t.Fatalf("Test timed out before state transition to %q was verified.", watchStateRequested)
108 }
109
110
111
112 if err := updateResourceInServer(ctx, ms, rn, nodeID); err != nil {
113 t.Fatalf("Failed to update server with resource: %q; err: %q", rn, err)
114 }
115 for {
116 select {
117 case <-ctx.Done():
118 t.Fatal("Test timed out before watcher received an update from server.")
119 case <-w.ErrorCh:
120 case <-w.UpdateCh:
121
122 if err := compareWatchState(a, rn, watchStateReceived); err != nil {
123 t.Fatal(err)
124 }
125 return
126 }
127 }
128 }
129
130
131
132
133
134 func (s) TestTimerAndWatchStateOnErrorCallback(t *testing.T) {
135 ctx, cancel := context.WithTimeout(context.Background(), defaultTestTimeout)
136 defer cancel()
137 a, ms, _ := setupTest(ctx, t, emptyServerOpts, defaultTestTimeout)
138 defer a.close()
139
140 rn := "xdsclient-test-lds-resource"
141 w := testutils.NewTestResourceWatcher()
142 cancelResource := a.watchResource(listenerResourceType, rn, w)
143 defer cancelResource()
144
145
146
147
148 ms.Stop()
149
150 select {
151 case <-ctx.Done():
152 t.Fatal("Test timed out before verifying error propagation.")
153 case err := <-w.ErrorCh:
154 if xdsresource.ErrType(err) != xdsresource.ErrorTypeConnection {
155 t.Fatal("Connection error not propagated to watchers.")
156 }
157 }
158
159 if err := compareWatchState(a, rn, watchStateStarted); err != nil {
160 t.Fatal(err)
161 }
162 }
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179 func (s) TestWatchResourceTimerCanRestartOnIgnoredADSRecvError(t *testing.T) {
180 ctx, cancel := context.WithTimeout(context.Background(), defaultTestTimeout)
181 defer cancel()
182
183 l, err := util.LocalTCPListener()
184 if err != nil {
185 t.Fatalf("testutils.LocalTCPListener() failed: %v", err)
186 }
187 lis := util.NewRestartableListener(l)
188 defer lis.Close()
189 streamRestarted := grpcsync.NewEvent()
190 serverOpt := e2e.ManagementServerOptions{
191 Listener: lis,
192 OnStreamClosed: func(int64, *v3corepb.Node) {
193 streamRestarted.Fire()
194 },
195 }
196
197 a, ms, nodeID := setupTest(ctx, t, serverOpt, defaultTestTimeout)
198 defer ms.Stop()
199 defer a.close()
200
201 nameA := "xdsclient-test-lds-resourceA"
202 watcherA := testutils.NewTestResourceWatcher()
203 cancelA := a.watchResource(listenerResourceType, nameA, watcherA)
204
205 if err := updateResourceInServer(ctx, ms, nameA, nodeID); err != nil {
206 t.Fatalf("Failed to update server with resource: %q; err: %q", nameA, err)
207 }
208
209
210
211 select {
212 case <-ctx.Done():
213 t.Fatal("Test timed out before watcher received the update.")
214 case err := <-watcherA.ErrorCh:
215 t.Fatalf("Watch got an unexpected error update: %q; want: valid update.", err)
216 case <-watcherA.UpdateCh:
217 }
218
219 cancelA()
220 lis.Stop()
221
222 nameB := "xdsclient-test-lds-resourceB"
223 watcherB := testutils.NewTestResourceWatcher()
224 cancelB := a.watchResource(listenerResourceType, nameB, watcherB)
225 defer cancelB()
226
227
228
229
230
231 select {
232 case <-ctx.Done():
233 t.Fatal("Test timed out before mgmt server got the request.")
234 case u := <-watcherB.UpdateCh:
235 t.Fatalf("Watch got an unexpected resource update: %v.", u)
236 case <-watcherB.ResourceDoesNotExistCh:
237 t.Fatalf("Illegal invocation of OnResourceDoesNotExist() method on the watcher.")
238 case gotErr := <-watcherB.ErrorCh:
239 wantErr := xdsresource.ErrorTypeConnection
240 if xdsresource.ErrType(gotErr) != wantErr {
241 t.Fatalf("Watch got an unexpected error:%q. Want: %q.", gotErr, wantErr)
242 }
243 }
244
245
246 if err := updateResourceInServer(ctx, ms, nameB, nodeID); err != nil {
247 t.Fatalf("Failed to update server with resource: %q; err: %q", nameB, err)
248 }
249 lis.Restart()
250
251 for {
252 select {
253 case <-ctx.Done():
254 t.Fatal("Test timed out before watcher received the update.")
255 case <-watcherB.UpdateCh:
256 return
257 }
258 }
259 }
260
261 func compareWatchState(a *authority, rn string, wantState watchState) error {
262 a.resourcesMu.Lock()
263 defer a.resourcesMu.Unlock()
264 gotState := a.resources[listenerResourceType][rn].wState
265 if gotState != wantState {
266 return fmt.Errorf("Got %v. Want: %v", gotState, wantState)
267 }
268
269 wTimer := a.resources[listenerResourceType][rn].wTimer
270 switch gotState {
271 case watchStateRequested:
272 if wTimer == nil {
273 return fmt.Errorf("got nil timer, want active timer")
274 }
275 case watchStateStarted:
276 if wTimer != nil {
277 return fmt.Errorf("got active timer, want nil timer")
278 }
279 default:
280 if wTimer.Stop() {
281
282 return fmt.Errorf("got active timer, want stopped timer")
283 }
284 }
285 return nil
286 }
287
288 func updateResourceInServer(ctx context.Context, ms *e2e.ManagementServer, rn string, nID string) error {
289 l := e2e.DefaultClientListener(rn, "new-rds-resource")
290 resources := e2e.UpdateOptions{
291 NodeID: nID,
292 Listeners: []*v3listenerpb.Listener{l},
293 SkipValidation: true,
294 }
295 return ms.Update(ctx, resources)
296 }
297
View as plain text