...

Source file src/google.golang.org/grpc/test/xds/xds_client_ack_nack_test.go

Documentation: google.golang.org/grpc/test/xds

     1  /*
     2   *
     3   * Copyright 2022 gRPC authors.
     4   *
     5   * Licensed under the Apache License, Version 2.0 (the "License");
     6   * you may not use this file except in compliance with the License.
     7   * You may obtain a copy of the License at
     8   *
     9   *     http://www.apache.org/licenses/LICENSE-2.0
    10   *
    11   * Unless required by applicable law or agreed to in writing, software
    12   * distributed under the License is distributed on an "AS IS" BASIS,
    13   * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    14   * See the License for the specific language governing permissions and
    15   * limitations under the License.
    16   *
    17   */
    18  
    19  package xds_test
    20  
    21  import (
    22  	"context"
    23  	"fmt"
    24  	"testing"
    25  
    26  	"github.com/google/go-cmp/cmp"
    27  	"google.golang.org/grpc"
    28  	"google.golang.org/grpc/credentials/insecure"
    29  	"google.golang.org/grpc/internal/grpcsync"
    30  	"google.golang.org/grpc/internal/stubserver"
    31  	"google.golang.org/grpc/internal/testutils"
    32  	"google.golang.org/grpc/internal/testutils/xds/e2e"
    33  
    34  	v3corepb "github.com/envoyproxy/go-control-plane/envoy/config/core/v3"
    35  	v3discoverypb "github.com/envoyproxy/go-control-plane/envoy/service/discovery/v3"
    36  	testgrpc "google.golang.org/grpc/interop/grpc_testing"
    37  	testpb "google.golang.org/grpc/interop/grpc_testing"
    38  )
    39  
    40  // We are interested in LDS, RDS, CDS and EDS resources as part of the regular
    41  // xDS flow on the client.
    42  const wantResources = 4
    43  
    44  // seenAllACKs returns true if the provided ackVersions map contains valid acks
    45  // for all the resources that we are interested in. If `wantNonEmpty` is true,
    46  // only non-empty ack versions are considered valid.
    47  func seenAllACKs(acksVersions map[string]string, wantNonEmpty bool) bool {
    48  	if len(acksVersions) != wantResources {
    49  		return false
    50  	}
    51  	for _, ack := range acksVersions {
    52  		if wantNonEmpty && ack == "" {
    53  			return false
    54  		}
    55  	}
    56  	return true
    57  }
    58  
    59  // TestClientResourceVersionAfterStreamRestart tests the scenario where the
    60  // xdsClient's ADS stream to the management server gets broken. This test
    61  // verifies that the version number on the initial request on the new stream
    62  // indicates the most recent version seen by the client on the previous stream.
    63  func (s) TestClientResourceVersionAfterStreamRestart(t *testing.T) {
    64  	// Create a restartable listener which can close existing connections.
    65  	l, err := testutils.LocalTCPListener()
    66  	if err != nil {
    67  		t.Fatalf("testutils.LocalTCPListener() failed: %v", err)
    68  	}
    69  	lis := testutils.NewRestartableListener(l)
    70  
    71  	// We depend on the fact that the management server assigns monotonically
    72  	// increasing stream IDs starting at 1.
    73  	const (
    74  		idBeforeRestart = 1
    75  		idAfterRestart  = 2
    76  	)
    77  
    78  	// Events of importance in the test, in the order in which they are expected
    79  	// to happen.
    80  	acksReceivedBeforeRestart := grpcsync.NewEvent()
    81  	streamRestarted := grpcsync.NewEvent()
    82  	acksReceivedAfterRestart := grpcsync.NewEvent()
    83  
    84  	// Map from stream id to a map of resource type to resource version.
    85  	ackVersionsMap := make(map[int64]map[string]string)
    86  	managementServer, nodeID, _, resolver, cleanup1 := e2e.SetupManagementServer(t, e2e.ManagementServerOptions{
    87  		Listener: lis,
    88  		OnStreamRequest: func(id int64, req *v3discoverypb.DiscoveryRequest) error {
    89  			// Return early under the following circumstances:
    90  			// - Received all the requests we wanted to see. This is to avoid
    91  			//   any stray requests leading to test flakes.
    92  			// - Request contains no resource names. Such requests are usually
    93  			//   seen when the xdsclient is shutting down and is no longer
    94  			//   interested in the resources that it had subscribed to earlier.
    95  			if acksReceivedAfterRestart.HasFired() || len(req.GetResourceNames()) == 0 {
    96  				return nil
    97  			}
    98  			// Create a stream specific map to store ack versions if this is the
    99  			// first time we are seeing this stream id.
   100  			if ackVersionsMap[id] == nil {
   101  				ackVersionsMap[id] = make(map[string]string)
   102  			}
   103  			ackVersionsMap[id][req.GetTypeUrl()] = req.GetVersionInfo()
   104  			// Prior to stream restart, we are interested only in non-empty
   105  			// resource versions. The xdsclient first sends out requests with an
   106  			// empty version string. After receipt of requested resource, it
   107  			// sends out another request for the same resource, but this time
   108  			// with a non-empty version string, to serve as an ACK.
   109  			if seenAllACKs(ackVersionsMap[idBeforeRestart], true) {
   110  				acksReceivedBeforeRestart.Fire()
   111  			}
   112  			// After stream restart, we expect the xdsclient to send out
   113  			// requests with version string set to the previously ACKed
   114  			// versions. If it sends out requests with empty version string, it
   115  			// is a bug and we want this test to catch it.
   116  			if seenAllACKs(ackVersionsMap[idAfterRestart], false) {
   117  				acksReceivedAfterRestart.Fire()
   118  			}
   119  			return nil
   120  		},
   121  		OnStreamClosed: func(int64, *v3corepb.Node) {
   122  			streamRestarted.Fire()
   123  		},
   124  	})
   125  	defer cleanup1()
   126  
   127  	server := stubserver.StartTestService(t, nil)
   128  	defer server.Stop()
   129  
   130  	const serviceName = "my-service-client-side-xds"
   131  	resources := e2e.DefaultClientResources(e2e.ResourceParams{
   132  		DialTarget: serviceName,
   133  		NodeID:     nodeID,
   134  		Host:       "localhost",
   135  		Port:       testutils.ParsePort(t, server.Address),
   136  		SecLevel:   e2e.SecurityLevelNone,
   137  	})
   138  	ctx, cancel := context.WithTimeout(context.Background(), defaultTestTimeout)
   139  	defer cancel()
   140  	if err := managementServer.Update(ctx, resources); err != nil {
   141  		t.Fatal(err)
   142  	}
   143  
   144  	// Create a ClientConn and make a successful RPC.
   145  	cc, err := grpc.NewClient(fmt.Sprintf("xds:///%s", serviceName), grpc.WithTransportCredentials(insecure.NewCredentials()), grpc.WithResolvers(resolver))
   146  	if err != nil {
   147  		t.Fatalf("failed to dial local test server: %v", err)
   148  	}
   149  	defer cc.Close()
   150  
   151  	client := testgrpc.NewTestServiceClient(cc)
   152  	if _, err := client.EmptyCall(ctx, &testpb.Empty{}); err != nil {
   153  		t.Fatalf("rpc EmptyCall() failed: %v", err)
   154  	}
   155  
   156  	// A successful RPC means that the xdsclient received all requested
   157  	// resources. The ACKs from the xdsclient may get a little delayed. So, we
   158  	// need to wait for all ACKs to be received on the management server before
   159  	// restarting the stream.
   160  	select {
   161  	case <-ctx.Done():
   162  		t.Fatal("Timeout when waiting for all resources to be ACKed prior to stream restart")
   163  	case <-acksReceivedBeforeRestart.Done():
   164  	}
   165  
   166  	// Stop the listener on the management server. This will cause the client to
   167  	// backoff and recreate the stream.
   168  	lis.Stop()
   169  
   170  	// Wait for the stream to be closed on the server.
   171  	<-streamRestarted.Done()
   172  
   173  	// Restart the listener on the management server to be able to accept
   174  	// reconnect attempts from the client.
   175  	lis.Restart()
   176  
   177  	// Wait for all the previously sent resources to be re-requested.
   178  	select {
   179  	case <-ctx.Done():
   180  		t.Fatal("Timeout when waiting for all resources to be ACKed post stream restart")
   181  	case <-acksReceivedAfterRestart.Done():
   182  	}
   183  
   184  	if diff := cmp.Diff(ackVersionsMap[idBeforeRestart], ackVersionsMap[idAfterRestart]); diff != "" {
   185  		t.Fatalf("unexpected diff in ack versions before and after stream restart (-want, +got):\n%s", diff)
   186  	}
   187  	if _, err := client.EmptyCall(ctx, &testpb.Empty{}, grpc.WaitForReady(true)); err != nil {
   188  		t.Fatalf("rpc EmptyCall() failed: %v", err)
   189  	}
   190  }
   191  

View as plain text