...

Source file src/google.golang.org/grpc/xds/internal/test/e2e/e2e_test.go

Documentation: google.golang.org/grpc/xds/internal/test/e2e

     1  /*
     2   *
     3   * Copyright 2021 gRPC authors.
     4   *
     5   * Licensed under the Apache License, Version 2.0 (the "License");
     6   * you may not use this file except in compliance with the License.
     7   * You may obtain a copy of the License at
     8   *
     9   *     http://www.apache.org/licenses/LICENSE-2.0
    10   *
    11   * Unless required by applicable law or agreed to in writing, software
    12   * distributed under the License is distributed on an "AS IS" BASIS,
    13   * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    14   * See the License for the specific language governing permissions and
    15   * limitations under the License.
    16   */
    17  
    18  package e2e
    19  
    20  import (
    21  	"bytes"
    22  	"context"
    23  	"flag"
    24  	"fmt"
    25  	"os"
    26  	"strconv"
    27  	"testing"
    28  	"time"
    29  
    30  	"google.golang.org/grpc/internal/testutils/xds/e2e"
    31  
    32  	v3clusterpb "github.com/envoyproxy/go-control-plane/envoy/config/cluster/v3"
    33  	v3routepb "github.com/envoyproxy/go-control-plane/envoy/config/route/v3"
    34  	channelzpb "google.golang.org/grpc/channelz/grpc_channelz_v1"
    35  	testpb "google.golang.org/grpc/interop/grpc_testing"
    36  )
    37  
    38  var (
    39  	clientPath = flag.String("client", "./binaries/client", "The interop client")
    40  	serverPath = flag.String("server", "./binaries/server", "The interop server")
    41  )
    42  
    43  type testOpts struct {
    44  	testName     string
    45  	backendCount int
    46  	clientFlags  []string
    47  }
    48  
    49  func setup(t *testing.T, opts testOpts) (*controlPlane, *client, []*server) {
    50  	t.Helper()
    51  	if _, err := os.Stat(*clientPath); os.IsNotExist(err) {
    52  		t.Skip("skipped because client is not found")
    53  	}
    54  	if _, err := os.Stat(*serverPath); os.IsNotExist(err) {
    55  		t.Skip("skipped because server is not found")
    56  	}
    57  	backendCount := 1
    58  	if opts.backendCount != 0 {
    59  		backendCount = opts.backendCount
    60  	}
    61  
    62  	cp, err := newControlPlane()
    63  	if err != nil {
    64  		t.Fatalf("failed to start control-plane: %v", err)
    65  	}
    66  	t.Cleanup(cp.stop)
    67  
    68  	var clientLog bytes.Buffer
    69  	c, err := newClient(fmt.Sprintf("xds:///%s", opts.testName), *clientPath, cp.bootstrapContent, &clientLog, opts.clientFlags...)
    70  	if err != nil {
    71  		t.Fatalf("failed to start client: %v", err)
    72  	}
    73  	t.Cleanup(c.stop)
    74  
    75  	var serverLog bytes.Buffer
    76  	servers, err := newServers(opts.testName, *serverPath, cp.bootstrapContent, &serverLog, backendCount)
    77  	if err != nil {
    78  		t.Fatalf("failed to start server: %v", err)
    79  	}
    80  	t.Cleanup(func() {
    81  		for _, s := range servers {
    82  			s.stop()
    83  		}
    84  	})
    85  	t.Cleanup(func() {
    86  		// TODO: find a better way to print the log. They are long, and hide the failure.
    87  		t.Logf("\n----- client logs -----\n%v", clientLog.String())
    88  		t.Logf("\n----- server logs -----\n%v", serverLog.String())
    89  	})
    90  	return cp, c, servers
    91  }
    92  
    93  func TestPingPong(t *testing.T) {
    94  	const testName = "pingpong"
    95  	cp, c, _ := setup(t, testOpts{testName: testName})
    96  
    97  	resources := e2e.DefaultClientResources(e2e.ResourceParams{
    98  		DialTarget: testName,
    99  		NodeID:     cp.nodeID,
   100  		Host:       "localhost",
   101  		Port:       serverPort,
   102  		SecLevel:   e2e.SecurityLevelNone,
   103  	})
   104  
   105  	ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second)
   106  	defer cancel()
   107  	if err := cp.server.Update(ctx, resources); err != nil {
   108  		t.Fatalf("failed to update control plane resources: %v", err)
   109  	}
   110  
   111  	st, err := c.clientStats(ctx)
   112  	if err != nil {
   113  		t.Fatalf("failed to get client stats: %v", err)
   114  	}
   115  	if st.NumFailures != 0 {
   116  		t.Fatalf("Got %v failures: %+v", st.NumFailures, st)
   117  	}
   118  }
   119  
   120  // TestAffinity covers the affinity tests with ringhash policy.
   121  // - client is configured to use ringhash, with 3 backends
   122  // - all RPCs will hash a specific metadata header
   123  // - verify that
   124  //   - all RPCs with the same metadata value are sent to the same backend
   125  //   - only one backend is Ready
   126  //
   127  // - send more RPCs with different metadata values until a new backend is picked, and verify that
   128  //   - only two backends are in Ready
   129  func TestAffinity(t *testing.T) {
   130  	const (
   131  		testName     = "affinity"
   132  		backendCount = 3
   133  		testMDKey    = "xds_md"
   134  		testMDValue  = "unary_yranu"
   135  	)
   136  	cp, c, servers := setup(t, testOpts{
   137  		testName:     testName,
   138  		backendCount: backendCount,
   139  		clientFlags:  []string{"--rpc=EmptyCall", fmt.Sprintf("--metadata=EmptyCall:%s:%s", testMDKey, testMDValue)},
   140  	})
   141  
   142  	resources := e2e.DefaultClientResources(e2e.ResourceParams{
   143  		DialTarget: testName,
   144  		NodeID:     cp.nodeID,
   145  		Host:       "localhost",
   146  		Port:       serverPort,
   147  		SecLevel:   e2e.SecurityLevelNone,
   148  	})
   149  
   150  	// Update EDS to multiple backends.
   151  	var ports []uint32
   152  	for _, s := range servers {
   153  		ports = append(ports, uint32(s.port))
   154  	}
   155  	edsMsg := resources.Endpoints[0]
   156  	resources.Endpoints[0] = e2e.DefaultEndpoint(
   157  		edsMsg.ClusterName,
   158  		"localhost",
   159  		ports,
   160  	)
   161  
   162  	// Update CDS lbpolicy to ringhash.
   163  	cdsMsg := resources.Clusters[0]
   164  	cdsMsg.LbPolicy = v3clusterpb.Cluster_RING_HASH
   165  
   166  	// Update RDS to hash the header.
   167  	rdsMsg := resources.Routes[0]
   168  	rdsMsg.VirtualHosts[0].Routes[0].Action = &v3routepb.Route_Route{Route: &v3routepb.RouteAction{
   169  		ClusterSpecifier: &v3routepb.RouteAction_Cluster{Cluster: cdsMsg.Name},
   170  		HashPolicy: []*v3routepb.RouteAction_HashPolicy{{
   171  			PolicySpecifier: &v3routepb.RouteAction_HashPolicy_Header_{
   172  				Header: &v3routepb.RouteAction_HashPolicy_Header{
   173  					HeaderName: testMDKey,
   174  				},
   175  			},
   176  		}},
   177  	}}
   178  
   179  	ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second)
   180  	defer cancel()
   181  	if err := cp.server.Update(ctx, resources); err != nil {
   182  		t.Fatalf("failed to update control plane resources: %v", err)
   183  	}
   184  
   185  	// Note: We can skip CSDS check because there's no long delay as in TD.
   186  	//
   187  	// The client stats check doesn't race with the xds resource update because
   188  	// there's only one version of xds resource, updated at the beginning of the
   189  	// test. So there's no need to retry the stats call.
   190  	//
   191  	// In the future, we may add tests that update xds in the middle. Then we
   192  	// either need to retry clientStats(), or make a CSDS check before so the
   193  	// result is stable.
   194  
   195  	st, err := c.clientStats(ctx)
   196  	if err != nil {
   197  		t.Fatalf("failed to get client stats: %v", err)
   198  	}
   199  	if st.NumFailures != 0 {
   200  		t.Fatalf("Got %v failures: %+v", st.NumFailures, st)
   201  	}
   202  	if len(st.RpcsByPeer) != 1 {
   203  		t.Fatalf("more than 1 backends got traffic: %v, want 1", st.RpcsByPeer)
   204  	}
   205  
   206  	// Call channelz to verify that only one subchannel is in state Ready.
   207  	scs, err := c.channelzSubChannels(ctx)
   208  	if err != nil {
   209  		t.Fatalf("failed to fetch channelz: %v", err)
   210  	}
   211  	verifySubConnStates(t, scs, map[channelzpb.ChannelConnectivityState_State]int{
   212  		channelzpb.ChannelConnectivityState_READY: 1,
   213  		channelzpb.ChannelConnectivityState_IDLE:  2,
   214  	})
   215  
   216  	// Send Unary call with different metadata value with integers starting from
   217  	// 0. Stop when a second peer is picked.
   218  	var (
   219  		diffPeerPicked bool
   220  		mdValue        int
   221  	)
   222  	for !diffPeerPicked {
   223  		if err := c.configRPCs(ctx, &testpb.ClientConfigureRequest{
   224  			Types: []testpb.ClientConfigureRequest_RpcType{
   225  				testpb.ClientConfigureRequest_EMPTY_CALL,
   226  				testpb.ClientConfigureRequest_UNARY_CALL,
   227  			},
   228  			Metadata: []*testpb.ClientConfigureRequest_Metadata{
   229  				{Type: testpb.ClientConfigureRequest_EMPTY_CALL, Key: testMDKey, Value: testMDValue},
   230  				{Type: testpb.ClientConfigureRequest_UNARY_CALL, Key: testMDKey, Value: strconv.Itoa(mdValue)},
   231  			},
   232  		}); err != nil {
   233  			t.Fatalf("failed to configure RPC: %v", err)
   234  		}
   235  
   236  		st, err := c.clientStats(ctx)
   237  		if err != nil {
   238  			t.Fatalf("failed to get client stats: %v", err)
   239  		}
   240  		if st.NumFailures != 0 {
   241  			t.Fatalf("Got %v failures: %+v", st.NumFailures, st)
   242  		}
   243  		if len(st.RpcsByPeer) == 2 {
   244  			break
   245  		}
   246  
   247  		mdValue++
   248  	}
   249  
   250  	// Call channelz to verify that only one subchannel is in state Ready.
   251  	scs2, err := c.channelzSubChannels(ctx)
   252  	if err != nil {
   253  		t.Fatalf("failed to fetch channelz: %v", err)
   254  	}
   255  	verifySubConnStates(t, scs2, map[channelzpb.ChannelConnectivityState_State]int{
   256  		channelzpb.ChannelConnectivityState_READY: 2,
   257  		channelzpb.ChannelConnectivityState_IDLE:  1,
   258  	})
   259  }
   260  

View as plain text