/* * * Copyright 2023 gRPC authors. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. * You may obtain a copy of the License at * * http://www.apache.org/licenses/LICENSE-2.0 * * Unless required by applicable law or agreed to in writing, software * distributed under the License is distributed on an "AS IS" BASIS, * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. * See the License for the specific language governing permissions and * limitations under the License. * */ package xdsclient import ( "context" "fmt" "testing" "time" "github.com/google/uuid" "google.golang.org/grpc/internal/grpcsync" util "google.golang.org/grpc/internal/testutils" "google.golang.org/grpc/internal/testutils/xds/e2e" "google.golang.org/grpc/xds/internal" "google.golang.org/grpc/internal/xds/bootstrap" "google.golang.org/grpc/xds/internal/testutils" "google.golang.org/grpc/xds/internal/xdsclient/xdsresource" "google.golang.org/grpc/xds/internal/xdsclient/xdsresource/version" v3corepb "github.com/envoyproxy/go-control-plane/envoy/config/core/v3" v3listenerpb "github.com/envoyproxy/go-control-plane/envoy/config/listener/v3" _ "google.golang.org/grpc/xds/internal/httpfilter/router" // Register the router filter. ) var emptyServerOpts = e2e.ManagementServerOptions{} var ( // Listener resource type implementation retrieved from the resource type map // in the internal package, which is initialized when the individual resource // types are created. listenerResourceType = internal.ResourceTypeMapForTesting[version.V3ListenerURL].(xdsresource.Type) rtRegistry = newResourceTypeRegistry() ) func init() { // Simulating maybeRegister for listenerResourceType. The getter to this registry // is passed to the authority for accessing the resource type. rtRegistry.types[listenerResourceType.TypeURL()] = listenerResourceType } func setupTest(ctx context.Context, t *testing.T, opts e2e.ManagementServerOptions, watchExpiryTimeout time.Duration) (*authority, *e2e.ManagementServer, string) { t.Helper() nodeID := uuid.New().String() ms, err := e2e.StartManagementServer(opts) if err != nil { t.Fatalf("Failed to spin up the xDS management server: %q", err) } a, err := newAuthority(authorityArgs{ serverCfg: testutils.ServerConfigForAddress(t, ms.Address), bootstrapCfg: &bootstrap.Config{ NodeProto: &v3corepb.Node{Id: nodeID}, }, serializer: grpcsync.NewCallbackSerializer(ctx), resourceTypeGetter: rtRegistry.get, watchExpiryTimeout: watchExpiryTimeout, logger: nil, }) if err != nil { t.Fatalf("Failed to create authority: %q", err) } return a, ms, nodeID } // This tests verifies watch and timer state for the scenario where a watch for // an LDS resource is registered and the management server sends an update the // same resource. func (s) TestTimerAndWatchStateOnSendCallback(t *testing.T) { ctx, cancel := context.WithTimeout(context.Background(), defaultTestTimeout) defer cancel() a, ms, nodeID := setupTest(ctx, t, emptyServerOpts, defaultTestTimeout) defer ms.Stop() defer a.close() rn := "xdsclient-test-lds-resource" w := testutils.NewTestResourceWatcher() cancelResource := a.watchResource(listenerResourceType, rn, w) defer cancelResource() // Looping until the underlying transport has successfully sent the request to // the server, which would call the onSend callback and transition the watchState // to `watchStateRequested`. for ctx.Err() == nil { if err := compareWatchState(a, rn, watchStateRequested); err == nil { break } } if ctx.Err() != nil { t.Fatalf("Test timed out before state transition to %q was verified.", watchStateRequested) } // Updating mgmt server with the same lds resource. Blocking on watcher's update // ch to verify the watch state transition to `watchStateReceived`. if err := updateResourceInServer(ctx, ms, rn, nodeID); err != nil { t.Fatalf("Failed to update server with resource: %q; err: %q", rn, err) } for { select { case <-ctx.Done(): t.Fatal("Test timed out before watcher received an update from server.") case <-w.ErrorCh: case <-w.UpdateCh: // This means the OnUpdate callback was invoked and the watcher was notified. if err := compareWatchState(a, rn, watchStateReceived); err != nil { t.Fatal(err) } return } } } // This tests the resource's watch state transition when the ADS stream is closed // by the management server. After the test calls `watchResource` api to register // a watch for a resource, it stops the management server, and verifies the resource's // watch state transitions to `watchStateStarted` and timer ready to be restarted. func (s) TestTimerAndWatchStateOnErrorCallback(t *testing.T) { ctx, cancel := context.WithTimeout(context.Background(), defaultTestTimeout) defer cancel() a, ms, _ := setupTest(ctx, t, emptyServerOpts, defaultTestTimeout) defer a.close() rn := "xdsclient-test-lds-resource" w := testutils.NewTestResourceWatcher() cancelResource := a.watchResource(listenerResourceType, rn, w) defer cancelResource() // Stopping the server and blocking on watcher's err channel to be notified. // This means the onErr callback should be invoked which transitions the watch // state to `watchStateStarted`. ms.Stop() select { case <-ctx.Done(): t.Fatal("Test timed out before verifying error propagation.") case err := <-w.ErrorCh: if xdsresource.ErrType(err) != xdsresource.ErrorTypeConnection { t.Fatal("Connection error not propagated to watchers.") } } if err := compareWatchState(a, rn, watchStateStarted); err != nil { t.Fatal(err) } } // This tests the case where the ADS stream breaks after successfully receiving // a message on the stream. The test performs the following: // - configures the management server with the ability to dropRequests based on // a boolean flag. // - update the mgmt server with resourceA. // - registers a watch for resourceA and verifies that the watcher's update // callback is invoked. // - registers a watch for resourceB and verifies that the watcher's update // callback is not invoked. This is because the management server does not // contain resourceB. // - force mgmt server to drop requests. Verify that watcher for resourceB gets // connection error. // - resume mgmt server to accept requests. // - update the mgmt server with resourceB and verifies that the watcher's // update callback is invoked. func (s) TestWatchResourceTimerCanRestartOnIgnoredADSRecvError(t *testing.T) { ctx, cancel := context.WithTimeout(context.Background(), defaultTestTimeout) defer cancel() // Create a restartable listener which can close existing connections. l, err := util.LocalTCPListener() if err != nil { t.Fatalf("testutils.LocalTCPListener() failed: %v", err) } lis := util.NewRestartableListener(l) defer lis.Close() streamRestarted := grpcsync.NewEvent() serverOpt := e2e.ManagementServerOptions{ Listener: lis, OnStreamClosed: func(int64, *v3corepb.Node) { streamRestarted.Fire() }, } a, ms, nodeID := setupTest(ctx, t, serverOpt, defaultTestTimeout) defer ms.Stop() defer a.close() nameA := "xdsclient-test-lds-resourceA" watcherA := testutils.NewTestResourceWatcher() cancelA := a.watchResource(listenerResourceType, nameA, watcherA) if err := updateResourceInServer(ctx, ms, nameA, nodeID); err != nil { t.Fatalf("Failed to update server with resource: %q; err: %q", nameA, err) } // Blocking on resource A watcher's update Channel to verify that there is // more than one msg(s) received the ADS stream. select { case <-ctx.Done(): t.Fatal("Test timed out before watcher received the update.") case err := <-watcherA.ErrorCh: t.Fatalf("Watch got an unexpected error update: %q; want: valid update.", err) case <-watcherA.UpdateCh: } cancelA() lis.Stop() nameB := "xdsclient-test-lds-resourceB" watcherB := testutils.NewTestResourceWatcher() cancelB := a.watchResource(listenerResourceType, nameB, watcherB) defer cancelB() // Blocking on resource B watcher's error channel. This error should be due to // connectivity issue when reconnecting because the mgmt server was already been // stopped. Also verifying that OnResourceDoesNotExist() method was not invoked // on the watcher. select { case <-ctx.Done(): t.Fatal("Test timed out before mgmt server got the request.") case u := <-watcherB.UpdateCh: t.Fatalf("Watch got an unexpected resource update: %v.", u) case <-watcherB.ResourceDoesNotExistCh: t.Fatalf("Illegal invocation of OnResourceDoesNotExist() method on the watcher.") case gotErr := <-watcherB.ErrorCh: wantErr := xdsresource.ErrorTypeConnection if xdsresource.ErrType(gotErr) != wantErr { t.Fatalf("Watch got an unexpected error:%q. Want: %q.", gotErr, wantErr) } } // Updating server with resource B and also re-enabling requests on the server. if err := updateResourceInServer(ctx, ms, nameB, nodeID); err != nil { t.Fatalf("Failed to update server with resource: %q; err: %q", nameB, err) } lis.Restart() for { select { case <-ctx.Done(): t.Fatal("Test timed out before watcher received the update.") case <-watcherB.UpdateCh: return } } } func compareWatchState(a *authority, rn string, wantState watchState) error { a.resourcesMu.Lock() defer a.resourcesMu.Unlock() gotState := a.resources[listenerResourceType][rn].wState if gotState != wantState { return fmt.Errorf("Got %v. Want: %v", gotState, wantState) } wTimer := a.resources[listenerResourceType][rn].wTimer switch gotState { case watchStateRequested: if wTimer == nil { return fmt.Errorf("got nil timer, want active timer") } case watchStateStarted: if wTimer != nil { return fmt.Errorf("got active timer, want nil timer") } default: if wTimer.Stop() { // This means that the timer was running but could be successfully stopped. return fmt.Errorf("got active timer, want stopped timer") } } return nil } func updateResourceInServer(ctx context.Context, ms *e2e.ManagementServer, rn string, nID string) error { l := e2e.DefaultClientListener(rn, "new-rds-resource") resources := e2e.UpdateOptions{ NodeID: nID, Listeners: []*v3listenerpb.Listener{l}, SkipValidation: true, } return ms.Update(ctx, resources) }