...

Source file src/github.com/datawire/ambassador/v2/pkg/envoy-control-plane/integration/ttl_integration_test.go

Documentation: github.com/datawire/ambassador/v2/pkg/envoy-control-plane/integration

     1  package integration
     2  
     3  import (
     4  	"context"
     5  	"net"
     6  	"testing"
     7  	"time"
     8  
     9  	envoy_config_core_v3 "github.com/datawire/ambassador/v2/pkg/api/envoy/config/core/v3"
    10  	envoy_config_endpoint_v3 "github.com/datawire/ambassador/v2/pkg/api/envoy/config/endpoint/v3"
    11  	envoy_service_discovery_v3 "github.com/datawire/ambassador/v2/pkg/api/envoy/service/discovery/v3"
    12  	endpointservice "github.com/datawire/ambassador/v2/pkg/api/envoy/service/endpoint/v3"
    13  	"github.com/datawire/ambassador/v2/pkg/envoy-control-plane/cache/types"
    14  	"github.com/datawire/ambassador/v2/pkg/envoy-control-plane/cache/v3"
    15  	"github.com/datawire/ambassador/v2/pkg/envoy-control-plane/resource/v3"
    16  	"github.com/datawire/ambassador/v2/pkg/envoy-control-plane/server/v3"
    17  	"github.com/golang/protobuf/ptypes"
    18  	"github.com/stretchr/testify/assert"
    19  	"google.golang.org/grpc"
    20  )
    21  
    22  type logger struct {
    23  	t *testing.T
    24  }
    25  
    26  func (log logger) Debugf(format string, args ...interface{}) { log.t.Logf(format, args...) }
    27  func (log logger) Infof(format string, args ...interface{})  { log.t.Logf(format, args...) }
    28  func (log logger) Warnf(format string, args ...interface{})  { log.t.Logf(format, args...) }
    29  func (log logger) Errorf(format string, args ...interface{}) { log.t.Logf(format, args...) }
    30  
    31  func TestTtlResponse(t *testing.T) {
    32  
    33  	ctx, cancel := context.WithCancel(context.Background())
    34  	defer cancel()
    35  
    36  	snapshotCache := cache.NewSnapshotCacheWithHeartbeating(ctx, false, cache.IDHash{}, logger{t: t}, time.Second)
    37  
    38  	server := server.NewServer(ctx, snapshotCache, nil)
    39  
    40  	grpcServer := grpc.NewServer()
    41  	endpointservice.RegisterEndpointDiscoveryServiceServer(grpcServer, server)
    42  
    43  	l, err := net.Listen("tcp", ":9999")
    44  	assert.NoError(t, err)
    45  
    46  	go func() {
    47  		grpcServer.Serve(l)
    48  	}()
    49  	defer grpcServer.Stop()
    50  
    51  	conn, err := grpc.Dial(":9999", grpc.WithInsecure())
    52  	assert.NoError(t, err)
    53  	client := endpointservice.NewEndpointDiscoveryServiceClient(conn)
    54  
    55  	sclient, err := client.StreamEndpoints(ctx)
    56  	assert.NoError(t, err)
    57  
    58  	err = sclient.Send(&envoy_service_discovery_v3.DiscoveryRequest{
    59  		Node: &envoy_config_core_v3.Node{
    60  			Id: "test",
    61  		},
    62  		ResourceNames: []string{"resource"},
    63  		TypeUrl:       resource.EndpointType,
    64  	})
    65  	assert.NoError(t, err)
    66  
    67  	oneSecond := time.Second
    68  	cla := &envoy_config_endpoint_v3.ClusterLoadAssignment{ClusterName: "resource"}
    69  	err = snapshotCache.SetSnapshot("test", cache.NewSnapshotWithTtls("1", []types.ResourceWithTtl{{
    70  		Resource: cla,
    71  		Ttl:      &oneSecond,
    72  	}}, nil, nil, nil, nil, nil))
    73  	assert.NoError(t, err)
    74  
    75  	timeout := time.NewTimer(5 * time.Second)
    76  
    77  	awaitResponse := func() *envoy_service_discovery_v3.DiscoveryResponse {
    78  		t.Helper()
    79  		doneCh := make(chan *envoy_service_discovery_v3.DiscoveryResponse)
    80  		go func() {
    81  
    82  			r, err := sclient.Recv()
    83  			assert.NoError(t, err)
    84  
    85  			doneCh <- r
    86  		}()
    87  
    88  		select {
    89  		case <-timeout.C:
    90  			assert.Fail(t, "timed out")
    91  			return nil
    92  		case r := <-doneCh:
    93  			return r
    94  		}
    95  	}
    96  
    97  	response := awaitResponse()
    98  	isFullResponseWithTTL(t, response)
    99  
   100  	err = sclient.Send(&envoy_service_discovery_v3.DiscoveryRequest{
   101  		Node: &envoy_config_core_v3.Node{
   102  			Id: "test",
   103  		},
   104  		ResourceNames: []string{"resource"},
   105  		TypeUrl:       resource.EndpointType,
   106  		VersionInfo:   "1",
   107  		ResponseNonce: response.Nonce,
   108  	})
   109  	assert.NoError(t, err)
   110  
   111  	response = awaitResponse()
   112  	isHeartbeatResponseWithTTL(t, response)
   113  }
   114  
   115  func isFullResponseWithTTL(t *testing.T, response *envoy_service_discovery_v3.DiscoveryResponse) {
   116  	t.Helper()
   117  
   118  	assert.Len(t, response.Resources, 1)
   119  	r := response.Resources[0]
   120  	resource := &envoy_service_discovery_v3.Resource{}
   121  	err := ptypes.UnmarshalAny(r, resource)
   122  	assert.NoError(t, err)
   123  
   124  	assert.NotNil(t, resource.Ttl)
   125  	assert.NotNil(t, resource.Resource)
   126  }
   127  
   128  func isHeartbeatResponseWithTTL(t *testing.T, response *envoy_service_discovery_v3.DiscoveryResponse) {
   129  	t.Helper()
   130  
   131  	assert.Len(t, response.Resources, 1)
   132  	r := response.Resources[0]
   133  	resource := &envoy_service_discovery_v3.Resource{}
   134  	err := ptypes.UnmarshalAny(r, resource)
   135  	assert.NoError(t, err)
   136  
   137  	assert.NotNil(t, resource.Ttl)
   138  	assert.Nil(t, resource.Resource)
   139  }
   140  

View as plain text