...

Source file src/google.golang.org/grpc/xds/csds/csds_e2e_test.go

Documentation: google.golang.org/grpc/xds/csds

     1  /*
     2   *
     3   * Copyright 2021 gRPC authors.
     4   *
     5   * Licensed under the Apache License, Version 2.0 (the "License");
     6   * you may not use this file except in compliance with the License.
     7   * You may obtain a copy of the License at
     8   *
     9   *     http://www.apache.org/licenses/LICENSE-2.0
    10   *
    11   * Unless required by applicable law or agreed to in writing, software
    12   * distributed under the License is distributed on an "AS IS" BASIS,
    13   * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    14   * See the License for the specific language governing permissions and
    15   * limitations under the License.
    16   *
    17   */
    18  
    19  package csds_test
    20  
    21  import (
    22  	"context"
    23  	"fmt"
    24  	"io"
    25  	"sort"
    26  	"strings"
    27  	"testing"
    28  	"time"
    29  
    30  	"github.com/google/go-cmp/cmp"
    31  	"github.com/google/uuid"
    32  	"google.golang.org/grpc"
    33  	"google.golang.org/grpc/credentials/insecure"
    34  	"google.golang.org/grpc/internal/grpctest"
    35  	"google.golang.org/grpc/internal/testutils"
    36  	"google.golang.org/grpc/internal/testutils/xds/bootstrap"
    37  	"google.golang.org/grpc/internal/testutils/xds/e2e"
    38  	"google.golang.org/grpc/xds/csds"
    39  	"google.golang.org/grpc/xds/internal/xdsclient"
    40  	"google.golang.org/grpc/xds/internal/xdsclient/xdsresource"
    41  	"google.golang.org/protobuf/encoding/prototext"
    42  	"google.golang.org/protobuf/testing/protocmp"
    43  	"google.golang.org/protobuf/types/known/anypb"
    44  
    45  	v3adminpb "github.com/envoyproxy/go-control-plane/envoy/admin/v3"
    46  	v3clusterpb "github.com/envoyproxy/go-control-plane/envoy/config/cluster/v3"
    47  	v3endpointpb "github.com/envoyproxy/go-control-plane/envoy/config/endpoint/v3"
    48  	v3listenerpb "github.com/envoyproxy/go-control-plane/envoy/config/listener/v3"
    49  	v3routepb "github.com/envoyproxy/go-control-plane/envoy/config/route/v3"
    50  	v3statuspb "github.com/envoyproxy/go-control-plane/envoy/service/status/v3"
    51  	v3statuspbgrpc "github.com/envoyproxy/go-control-plane/envoy/service/status/v3"
    52  
    53  	_ "google.golang.org/grpc/xds/internal/httpfilter/router" // Register the router filter
    54  )
    55  
    56  const defaultTestTimeout = 5 * time.Second
    57  
    58  var cmpOpts = cmp.Options{
    59  	cmp.Transformer("sort", func(in []*v3statuspb.ClientConfig_GenericXdsConfig) []*v3statuspb.ClientConfig_GenericXdsConfig {
    60  		out := append([]*v3statuspb.ClientConfig_GenericXdsConfig(nil), in...)
    61  		sort.Slice(out, func(i, j int) bool {
    62  			a, b := out[i], out[j]
    63  			if a == nil {
    64  				return true
    65  			}
    66  			if b == nil {
    67  				return false
    68  			}
    69  			if strings.Compare(a.TypeUrl, b.TypeUrl) == 0 {
    70  				return strings.Compare(a.Name, b.Name) < 0
    71  			}
    72  			return strings.Compare(a.TypeUrl, b.TypeUrl) < 0
    73  		})
    74  		return out
    75  	}),
    76  	protocmp.Transform(),
    77  	protocmp.IgnoreFields((*v3statuspb.ClientConfig_GenericXdsConfig)(nil), "last_updated"),
    78  	protocmp.IgnoreFields((*v3adminpb.UpdateFailureState)(nil), "last_update_attempt", "details"),
    79  }
    80  
// s is the receiver type for the tests in this file. It embeds
// grpctest.Tester and is handed to grpctest.RunSubTests by Test.
type s struct {
	grpctest.Tester
}
    84  
    85  func Test(t *testing.T) {
    86  	grpctest.RunSubTests(t, s{})
    87  }
    88  
    89  // The following watcher implementations are no-ops since we don't really care
    90  // about the callback received by these watchers in the test. We only care
    91  // whether CSDS reports the expected state.
    92  
    93  type unimplementedListenerWatcher struct{}
    94  
    95  func (unimplementedListenerWatcher) OnUpdate(*xdsresource.ListenerResourceData) {}
    96  func (unimplementedListenerWatcher) OnError(error)                              {}
    97  func (unimplementedListenerWatcher) OnResourceDoesNotExist()                    {}
    98  
    99  type unimplementedRouteConfigWatcher struct{}
   100  
   101  func (unimplementedRouteConfigWatcher) OnUpdate(*xdsresource.RouteConfigResourceData) {}
   102  func (unimplementedRouteConfigWatcher) OnError(error)                                 {}
   103  func (unimplementedRouteConfigWatcher) OnResourceDoesNotExist()                       {}
   104  
   105  type unimplementedClusterWatcher struct{}
   106  
   107  func (unimplementedClusterWatcher) OnUpdate(*xdsresource.ClusterResourceData) {}
   108  func (unimplementedClusterWatcher) OnError(error)                             {}
   109  func (unimplementedClusterWatcher) OnResourceDoesNotExist()                   {}
   110  
   111  type unimplementedEndpointsWatcher struct{}
   112  
   113  func (unimplementedEndpointsWatcher) OnUpdate(*xdsresource.EndpointsResourceData) {}
   114  func (unimplementedEndpointsWatcher) OnError(error)                               {}
   115  func (unimplementedEndpointsWatcher) OnResourceDoesNotExist()                     {}
   116  
   117  func (s) TestCSDS(t *testing.T) {
   118  	// Spin up a xDS management server on a local port.
   119  	nodeID := uuid.New().String()
   120  	mgmtServer, err := e2e.StartManagementServer(e2e.ManagementServerOptions{})
   121  	if err != nil {
   122  		t.Fatal(err)
   123  	}
   124  	defer mgmtServer.Stop()
   125  
   126  	// Create a bootstrap file in a temporary directory.
   127  	bootstrapCleanup, err := bootstrap.CreateFile(bootstrap.Options{
   128  		NodeID:    nodeID,
   129  		ServerURI: mgmtServer.Address,
   130  	})
   131  	if err != nil {
   132  		t.Fatal(err)
   133  	}
   134  	defer bootstrapCleanup()
   135  
   136  	// Create an xDS client. This will end up using the same singleton as used
   137  	// by the CSDS service.
   138  	xdsC, close, err := xdsclient.New()
   139  	if err != nil {
   140  		t.Fatalf("Failed to create xDS client: %v", err)
   141  	}
   142  	defer close()
   143  
   144  	// Initialize an gRPC server and register CSDS on it.
   145  	server := grpc.NewServer()
   146  	csdss, err := csds.NewClientStatusDiscoveryServer()
   147  	if err != nil {
   148  		t.Fatal(err)
   149  	}
   150  	v3statuspbgrpc.RegisterClientStatusDiscoveryServiceServer(server, csdss)
   151  	defer func() {
   152  		server.Stop()
   153  		csdss.Close()
   154  	}()
   155  
   156  	// Create a local listener and pass it to Serve().
   157  	lis, err := testutils.LocalTCPListener()
   158  	if err != nil {
   159  		t.Fatalf("testutils.LocalTCPListener() failed: %v", err)
   160  	}
   161  	go func() {
   162  		if err := server.Serve(lis); err != nil {
   163  			t.Errorf("Serve() failed: %v", err)
   164  		}
   165  	}()
   166  
   167  	// Create a client to the CSDS server.
   168  	conn, err := grpc.NewClient(lis.Addr().String(), grpc.WithTransportCredentials(insecure.NewCredentials()))
   169  	if err != nil {
   170  		t.Fatalf("Failed to dial CSDS server %q: %v", lis.Addr().String(), err)
   171  	}
   172  	c := v3statuspbgrpc.NewClientStatusDiscoveryServiceClient(conn)
   173  	ctx, cancel := context.WithTimeout(context.Background(), defaultTestTimeout)
   174  	defer cancel()
   175  	stream, err := c.StreamClientStatus(ctx, grpc.WaitForReady(true))
   176  	if err != nil {
   177  		t.Fatalf("Failed to create a stream for CSDS: %v", err)
   178  	}
   179  	defer conn.Close()
   180  
   181  	// Verify that the xDS client reports an empty config.
   182  	if err := checkClientStatusResponse(stream, nil); err != nil {
   183  		t.Fatal(err)
   184  	}
   185  
   186  	// Initialize the xDS resources to be used in this test.
   187  	ldsTargets := []string{"lds.target.good:0000", "lds.target.good:1111"}
   188  	rdsTargets := []string{"route-config-0", "route-config-1"}
   189  	cdsTargets := []string{"cluster-0", "cluster-1"}
   190  	edsTargets := []string{"endpoints-0", "endpoints-1"}
   191  	listeners := make([]*v3listenerpb.Listener, len(ldsTargets))
   192  	listenerAnys := make([]*anypb.Any, len(ldsTargets))
   193  	for i := range ldsTargets {
   194  		listeners[i] = e2e.DefaultClientListener(ldsTargets[i], rdsTargets[i])
   195  		listenerAnys[i] = testutils.MarshalAny(t, listeners[i])
   196  	}
   197  	routes := make([]*v3routepb.RouteConfiguration, len(rdsTargets))
   198  	routeAnys := make([]*anypb.Any, len(rdsTargets))
   199  	for i := range rdsTargets {
   200  		routes[i] = e2e.DefaultRouteConfig(rdsTargets[i], ldsTargets[i], cdsTargets[i])
   201  		routeAnys[i] = testutils.MarshalAny(t, routes[i])
   202  	}
   203  	clusters := make([]*v3clusterpb.Cluster, len(cdsTargets))
   204  	clusterAnys := make([]*anypb.Any, len(cdsTargets))
   205  	for i := range cdsTargets {
   206  		clusters[i] = e2e.DefaultCluster(cdsTargets[i], edsTargets[i], e2e.SecurityLevelNone)
   207  		clusterAnys[i] = testutils.MarshalAny(t, clusters[i])
   208  	}
   209  	endpoints := make([]*v3endpointpb.ClusterLoadAssignment, len(edsTargets))
   210  	endpointAnys := make([]*anypb.Any, len(edsTargets))
   211  	ips := []string{"0.0.0.0", "1.1.1.1"}
   212  	ports := []uint32{123, 456}
   213  	for i := range edsTargets {
   214  		endpoints[i] = e2e.DefaultEndpoint(edsTargets[i], ips[i], ports[i:i+1])
   215  		endpointAnys[i] = testutils.MarshalAny(t, endpoints[i])
   216  	}
   217  
   218  	// Register watches on the xDS client for two resources of each type.
   219  	for _, target := range ldsTargets {
   220  		xdsresource.WatchListener(xdsC, target, unimplementedListenerWatcher{})
   221  	}
   222  	for _, target := range rdsTargets {
   223  		xdsresource.WatchRouteConfig(xdsC, target, unimplementedRouteConfigWatcher{})
   224  	}
   225  	for _, target := range cdsTargets {
   226  		xdsresource.WatchCluster(xdsC, target, unimplementedClusterWatcher{})
   227  	}
   228  	for _, target := range edsTargets {
   229  		xdsresource.WatchEndpoints(xdsC, target, unimplementedEndpointsWatcher{})
   230  	}
   231  
   232  	// Verify that the xDS client reports the resources as being in "Requested"
   233  	// state.
   234  	want := []*v3statuspb.ClientConfig_GenericXdsConfig{}
   235  	for i := range ldsTargets {
   236  		want = append(want, makeGenericXdsConfig("type.googleapis.com/envoy.config.listener.v3.Listener", ldsTargets[i], "", v3adminpb.ClientResourceStatus_REQUESTED, nil))
   237  	}
   238  	for i := range rdsTargets {
   239  		want = append(want, makeGenericXdsConfig("type.googleapis.com/envoy.config.route.v3.RouteConfiguration", rdsTargets[i], "", v3adminpb.ClientResourceStatus_REQUESTED, nil))
   240  	}
   241  	for i := range cdsTargets {
   242  		want = append(want, makeGenericXdsConfig("type.googleapis.com/envoy.config.cluster.v3.Cluster", cdsTargets[i], "", v3adminpb.ClientResourceStatus_REQUESTED, nil))
   243  	}
   244  	for i := range edsTargets {
   245  		want = append(want, makeGenericXdsConfig("type.googleapis.com/envoy.config.endpoint.v3.ClusterLoadAssignment", edsTargets[i], "", v3adminpb.ClientResourceStatus_REQUESTED, nil))
   246  	}
   247  	for {
   248  		if err := ctx.Err(); err != nil {
   249  			t.Fatalf("Timeout when waiting for resources in \"Requested\" state: %v", err)
   250  		}
   251  		if err := checkClientStatusResponse(stream, want); err == nil {
   252  			break
   253  		}
   254  		time.Sleep(time.Millisecond * 100)
   255  	}
   256  
   257  	// Configure the management server with two resources of each type,
   258  	// corresponding to the watches registered above.
   259  	if err := mgmtServer.Update(ctx, e2e.UpdateOptions{
   260  		NodeID:    nodeID,
   261  		Listeners: listeners,
   262  		Routes:    routes,
   263  		Clusters:  clusters,
   264  		Endpoints: endpoints,
   265  	}); err != nil {
   266  		t.Fatal(err)
   267  	}
   268  
   269  	// Verify that the xDS client reports the resources as being in "ACKed"
   270  	// state, and in version "1".
   271  	want = nil
   272  	for i := range ldsTargets {
   273  		want = append(want, makeGenericXdsConfig("type.googleapis.com/envoy.config.listener.v3.Listener", ldsTargets[i], "1", v3adminpb.ClientResourceStatus_ACKED, listenerAnys[i]))
   274  	}
   275  	for i := range rdsTargets {
   276  		want = append(want, makeGenericXdsConfig("type.googleapis.com/envoy.config.route.v3.RouteConfiguration", rdsTargets[i], "1", v3adminpb.ClientResourceStatus_ACKED, routeAnys[i]))
   277  	}
   278  	for i := range cdsTargets {
   279  		want = append(want, makeGenericXdsConfig("type.googleapis.com/envoy.config.cluster.v3.Cluster", cdsTargets[i], "1", v3adminpb.ClientResourceStatus_ACKED, clusterAnys[i]))
   280  	}
   281  	for i := range edsTargets {
   282  		want = append(want, makeGenericXdsConfig("type.googleapis.com/envoy.config.endpoint.v3.ClusterLoadAssignment", edsTargets[i], "1", v3adminpb.ClientResourceStatus_ACKED, endpointAnys[i]))
   283  	}
   284  	for {
   285  		if err := ctx.Err(); err != nil {
   286  			t.Fatalf("Timeout when waiting for resources in \"ACKed\" state: %v", err)
   287  		}
   288  		err := checkClientStatusResponse(stream, want)
   289  		if err == nil {
   290  			break
   291  		}
   292  		time.Sleep(time.Millisecond * 100)
   293  	}
   294  
   295  	// Update the first resource of each type in the management server to a
   296  	// value which is expected to be NACK'ed by the xDS client.
   297  	const nackResourceIdx = 0
   298  	listeners[nackResourceIdx].ApiListener = &v3listenerpb.ApiListener{}
   299  	routes[nackResourceIdx].VirtualHosts = []*v3routepb.VirtualHost{{Routes: []*v3routepb.Route{{}}}}
   300  	clusters[nackResourceIdx].ClusterDiscoveryType = &v3clusterpb.Cluster_Type{Type: v3clusterpb.Cluster_STATIC}
   301  	endpoints[nackResourceIdx].Endpoints = []*v3endpointpb.LocalityLbEndpoints{{}}
   302  	if err := mgmtServer.Update(ctx, e2e.UpdateOptions{
   303  		NodeID:         nodeID,
   304  		Listeners:      listeners,
   305  		Routes:         routes,
   306  		Clusters:       clusters,
   307  		Endpoints:      endpoints,
   308  		SkipValidation: true,
   309  	}); err != nil {
   310  		t.Fatal(err)
   311  	}
   312  
   313  	// Verify that the xDS client reports the first resource of each type as
   314  	// being in "NACKed" state, and the second resource of each type to be in
   315  	// "ACKed" state. The version for the ACKed resource would be "2", while
   316  	// that for the NACKed resource would be "1". In the NACKed resource, the
   317  	// version which is NACKed is stored in the ErrorState field.
   318  	want = nil
   319  	for i := range ldsTargets {
   320  		config := makeGenericXdsConfig("type.googleapis.com/envoy.config.listener.v3.Listener", ldsTargets[i], "2", v3adminpb.ClientResourceStatus_ACKED, listenerAnys[i])
   321  		if i == nackResourceIdx {
   322  			config.VersionInfo = "1"
   323  			config.ClientStatus = v3adminpb.ClientResourceStatus_NACKED
   324  			config.ErrorState = &v3adminpb.UpdateFailureState{VersionInfo: "2"}
   325  		}
   326  		want = append(want, config)
   327  	}
   328  	for i := range rdsTargets {
   329  		config := makeGenericXdsConfig("type.googleapis.com/envoy.config.route.v3.RouteConfiguration", rdsTargets[i], "2", v3adminpb.ClientResourceStatus_ACKED, routeAnys[i])
   330  		if i == nackResourceIdx {
   331  			config.VersionInfo = "1"
   332  			config.ClientStatus = v3adminpb.ClientResourceStatus_NACKED
   333  			config.ErrorState = &v3adminpb.UpdateFailureState{VersionInfo: "2"}
   334  		}
   335  		want = append(want, config)
   336  	}
   337  	for i := range cdsTargets {
   338  		config := makeGenericXdsConfig("type.googleapis.com/envoy.config.cluster.v3.Cluster", cdsTargets[i], "2", v3adminpb.ClientResourceStatus_ACKED, clusterAnys[i])
   339  		if i == nackResourceIdx {
   340  			config.VersionInfo = "1"
   341  			config.ClientStatus = v3adminpb.ClientResourceStatus_NACKED
   342  			config.ErrorState = &v3adminpb.UpdateFailureState{VersionInfo: "2"}
   343  		}
   344  		want = append(want, config)
   345  	}
   346  	for i := range edsTargets {
   347  		config := makeGenericXdsConfig("type.googleapis.com/envoy.config.endpoint.v3.ClusterLoadAssignment", edsTargets[i], "2", v3adminpb.ClientResourceStatus_ACKED, endpointAnys[i])
   348  		if i == nackResourceIdx {
   349  			config.VersionInfo = "1"
   350  			config.ClientStatus = v3adminpb.ClientResourceStatus_NACKED
   351  			config.ErrorState = &v3adminpb.UpdateFailureState{VersionInfo: "2"}
   352  		}
   353  		want = append(want, config)
   354  	}
   355  	for {
   356  		if err := ctx.Err(); err != nil {
   357  			t.Fatalf("Timeout when waiting for resources in \"NACKed\" state: %v", err)
   358  		}
   359  		err := checkClientStatusResponse(stream, want)
   360  		if err == nil {
   361  			break
   362  		}
   363  		time.Sleep(time.Millisecond * 100)
   364  	}
   365  }
   366  
   367  func makeGenericXdsConfig(typeURL, name, version string, status v3adminpb.ClientResourceStatus, config *anypb.Any) *v3statuspb.ClientConfig_GenericXdsConfig {
   368  	return &v3statuspb.ClientConfig_GenericXdsConfig{
   369  		TypeUrl:      typeURL,
   370  		Name:         name,
   371  		VersionInfo:  version,
   372  		ClientStatus: status,
   373  		XdsConfig:    config,
   374  	}
   375  }
   376  
   377  func checkClientStatusResponse(stream v3statuspbgrpc.ClientStatusDiscoveryService_StreamClientStatusClient, want []*v3statuspb.ClientConfig_GenericXdsConfig) error {
   378  	if err := stream.Send(&v3statuspb.ClientStatusRequest{Node: nil}); err != nil {
   379  		if err != io.EOF {
   380  			return fmt.Errorf("failed to send ClientStatusRequest: %v", err)
   381  		}
   382  		// If the stream has closed, we call Recv() until it returns a non-nil
   383  		// error to get the actual error on the stream.
   384  		for {
   385  			if _, err := stream.Recv(); err != nil {
   386  				return fmt.Errorf("failed to recv ClientStatusResponse: %v", err)
   387  			}
   388  		}
   389  	}
   390  	resp, err := stream.Recv()
   391  	if err != nil {
   392  		return fmt.Errorf("failed to recv ClientStatusResponse: %v", err)
   393  	}
   394  
   395  	if n := len(resp.Config); n != 1 {
   396  		return fmt.Errorf("got %d configs, want 1: %v", n, prototext.Format(resp))
   397  	}
   398  
   399  	if diff := cmp.Diff(resp.Config[0].GenericXdsConfigs, want, cmpOpts); diff != "" {
   400  		return fmt.Errorf(diff)
   401  	}
   402  	return nil
   403  }
   404  
   405  func (s) TestCSDSNoXDSClient(t *testing.T) {
   406  	// Create a bootstrap file in a temporary directory. Since we pass empty
   407  	// options, it would end up creating a bootstrap file with an empty
   408  	// serverURI which will fail xDS client creation.
   409  	bootstrapCleanup, err := bootstrap.CreateFile(bootstrap.Options{})
   410  	if err != nil {
   411  		t.Fatal(err)
   412  	}
   413  	t.Cleanup(func() { bootstrapCleanup() })
   414  
   415  	// Initialize an gRPC server and register CSDS on it.
   416  	server := grpc.NewServer()
   417  	csdss, err := csds.NewClientStatusDiscoveryServer()
   418  	if err != nil {
   419  		t.Fatal(err)
   420  	}
   421  	defer csdss.Close()
   422  	v3statuspbgrpc.RegisterClientStatusDiscoveryServiceServer(server, csdss)
   423  
   424  	// Create a local listener and pass it to Serve().
   425  	lis, err := testutils.LocalTCPListener()
   426  	if err != nil {
   427  		t.Fatalf("testutils.LocalTCPListener() failed: %v", err)
   428  	}
   429  	go func() {
   430  		if err := server.Serve(lis); err != nil {
   431  			t.Errorf("Serve() failed: %v", err)
   432  		}
   433  	}()
   434  	defer server.Stop()
   435  
   436  	// Create a client to the CSDS server.
   437  	conn, err := grpc.NewClient(lis.Addr().String(), grpc.WithTransportCredentials(insecure.NewCredentials()))
   438  	if err != nil {
   439  		t.Fatalf("Failed to dial CSDS server %q: %v", lis.Addr().String(), err)
   440  	}
   441  	defer conn.Close()
   442  	c := v3statuspbgrpc.NewClientStatusDiscoveryServiceClient(conn)
   443  	ctx, cancel := context.WithTimeout(context.Background(), defaultTestTimeout)
   444  	defer cancel()
   445  	stream, err := c.StreamClientStatus(ctx, grpc.WaitForReady(true))
   446  	if err != nil {
   447  		t.Fatalf("Failed to create a stream for CSDS: %v", err)
   448  	}
   449  
   450  	if err := stream.Send(&v3statuspb.ClientStatusRequest{Node: nil}); err != nil {
   451  		t.Fatalf("Failed to send ClientStatusRequest: %v", err)
   452  	}
   453  	r, err := stream.Recv()
   454  	if err != nil {
   455  		// io.EOF is not ok.
   456  		t.Fatalf("Failed to recv ClientStatusResponse: %v", err)
   457  	}
   458  	if n := len(r.Config); n != 0 {
   459  		t.Fatalf("got %d configs, want 0: %v", n, prototext.Format(r))
   460  	}
   461  }
   462  

View as plain text