...

Source file src/google.golang.org/grpc/xds/internal/xdsclient/loadreport_test.go

Documentation: google.golang.org/grpc/xds/internal/xdsclient

     1  /*
     2   *
     3   * Copyright 2020 gRPC authors.
     4   *
     5   * Licensed under the Apache License, Version 2.0 (the "License");
     6   * you may not use this file except in compliance with the License.
     7   * You may obtain a copy of the License at
     8   *
     9   *     http://www.apache.org/licenses/LICENSE-2.0
    10   *
    11   * Unless required by applicable law or agreed to in writing, software
    12   * distributed under the License is distributed on an "AS IS" BASIS,
    13   * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    14   * See the License for the specific language governing permissions and
    15   * limitations under the License.
    16   *
    17   */
    18  
    19  package xdsclient
    20  
    21  import (
    22  	"context"
    23  	"testing"
    24  	"time"
    25  
    26  	"github.com/google/go-cmp/cmp"
    27  	"google.golang.org/grpc/codes"
    28  	"google.golang.org/grpc/internal/testutils/xds/fakeserver"
    29  	"google.golang.org/grpc/internal/xds/bootstrap"
    30  	"google.golang.org/grpc/status"
    31  	xdstestutils "google.golang.org/grpc/xds/internal/testutils"
    32  	"google.golang.org/protobuf/testing/protocmp"
    33  
    34  	v3corepb "github.com/envoyproxy/go-control-plane/envoy/config/core/v3"
    35  	v3endpointpb "github.com/envoyproxy/go-control-plane/envoy/config/endpoint/v3"
    36  	v3lrspb "github.com/envoyproxy/go-control-plane/envoy/service/load_stats/v3"
    37  	"google.golang.org/protobuf/types/known/durationpb"
    38  )
    39  
    40  const (
    41  	defaultClientWatchExpiryTimeout = 15 * time.Second
    42  )
    43  
    44  func (s) TestLRSClient(t *testing.T) {
    45  	fs1, sCleanup, err := fakeserver.StartServer(nil)
    46  	if err != nil {
    47  		t.Fatalf("failed to start fake xDS server: %v", err)
    48  	}
    49  	defer sCleanup()
    50  
    51  	serverCfg1 := xdstestutils.ServerConfigForAddress(t, fs1.Address)
    52  	xdsC, close, err := NewWithConfigForTesting(&bootstrap.Config{
    53  		XDSServer: serverCfg1,
    54  		NodeProto: &v3corepb.Node{},
    55  	}, defaultClientWatchExpiryTimeout, time.Duration(0))
    56  	if err != nil {
    57  		t.Fatalf("failed to create xds client: %v", err)
    58  	}
    59  	defer close()
    60  
    61  	// Report to the same address should not create new ClientConn.
    62  	store1, lrsCancel1 := xdsC.ReportLoad(serverCfg1)
    63  	defer lrsCancel1()
    64  
    65  	ctx, cancel := context.WithTimeout(context.Background(), defaultTestTimeout)
    66  	defer cancel()
    67  	if u, err := fs1.NewConnChan.Receive(ctx); err != nil {
    68  		t.Errorf("unexpected timeout: %v, %v, want NewConn", u, err)
    69  	}
    70  
    71  	sCtx, sCancel := context.WithTimeout(context.Background(), defaultTestShortTimeout)
    72  	defer sCancel()
    73  	if u, err := fs1.NewConnChan.Receive(sCtx); err != context.DeadlineExceeded {
    74  		t.Errorf("unexpected NewConn: %v, %v, want channel recv timeout", u, err)
    75  	}
    76  
    77  	fs2, sCleanup2, err := fakeserver.StartServer(nil)
    78  	if err != nil {
    79  		t.Fatalf("failed to start fake xDS server: %v", err)
    80  	}
    81  	defer sCleanup2()
    82  
    83  	// Report to a different address should create new ClientConn.
    84  	serverCgf2 := xdstestutils.ServerConfigForAddress(t, fs2.Address)
    85  	store2, lrsCancel2 := xdsC.ReportLoad(serverCgf2)
    86  	defer lrsCancel2()
    87  	if u, err := fs2.NewConnChan.Receive(ctx); err != nil {
    88  		t.Errorf("unexpected timeout: %v, %v, want NewConn", u, err)
    89  	}
    90  
    91  	if store1 == store2 {
    92  		t.Fatalf("got same store for different servers, want different")
    93  	}
    94  
    95  	if u, err := fs2.LRSRequestChan.Receive(ctx); err != nil {
    96  		t.Errorf("unexpected timeout: %v, %v, want NewConn", u, err)
    97  	}
    98  	store2.PerCluster("cluster", "eds").CallDropped("test")
    99  
   100  	// Send one resp to the client.
   101  	fs2.LRSResponseChan <- &fakeserver.Response{
   102  		Resp: &v3lrspb.LoadStatsResponse{
   103  			SendAllClusters:       true,
   104  			LoadReportingInterval: &durationpb.Duration{Nanos: 50000000},
   105  		},
   106  	}
   107  
   108  	// Server should receive a req with the loads.
   109  	u, err := fs2.LRSRequestChan.Receive(ctx)
   110  	if err != nil {
   111  		t.Fatalf("unexpected LRS request: %v, %v, want error canceled", u, err)
   112  	}
   113  	receivedLoad := u.(*fakeserver.Request).Req.(*v3lrspb.LoadStatsRequest).ClusterStats
   114  	if len(receivedLoad) <= 0 {
   115  		t.Fatalf("unexpected load received, want load for cluster, eds, dropped for test")
   116  	}
   117  	receivedLoad[0].LoadReportInterval = nil
   118  	want := &v3endpointpb.ClusterStats{
   119  		ClusterName:          "cluster",
   120  		ClusterServiceName:   "eds",
   121  		TotalDroppedRequests: 1,
   122  		DroppedRequests:      []*v3endpointpb.ClusterStats_DroppedRequests{{Category: "test", DroppedCount: 1}},
   123  	}
   124  	if d := cmp.Diff(want, receivedLoad[0], protocmp.Transform()); d != "" {
   125  		t.Fatalf("unexpected load received, want load for cluster, eds, dropped for test, diff (-want +got):\n%s", d)
   126  	}
   127  
   128  	// Cancel this load reporting stream, server should see error canceled.
   129  	lrsCancel2()
   130  
   131  	// Server should receive a stream canceled error.
   132  	if u, err := fs2.LRSRequestChan.Receive(ctx); err != nil || status.Code(u.(*fakeserver.Request).Err) != codes.Canceled {
   133  		t.Errorf("unexpected LRS request: %v, %v, want error canceled", u, err)
   134  	}
   135  }
   136  

View as plain text