...

Source file src/github.com/emissary-ingress/emissary/v3/pkg/envoy-control-plane/integration/ttl_integration_test.go

Documentation: github.com/emissary-ingress/emissary/v3/pkg/envoy-control-plane/integration

     1  package integration
     2  
     3  import (
     4  	"context"
     5  	"net"
     6  	"testing"
     7  	"time"
     8  
     9  	"github.com/stretchr/testify/assert"
    10  	"google.golang.org/grpc"
    11  	"google.golang.org/grpc/credentials/insecure"
    12  	"google.golang.org/protobuf/proto"
    13  	"google.golang.org/protobuf/types/known/anypb"
    14  
    15  	envoy_config_core_v3 "github.com/emissary-ingress/emissary/v3/pkg/api/envoy/config/core/v3"
    16  	envoy_config_endpoint_v3 "github.com/emissary-ingress/emissary/v3/pkg/api/envoy/config/endpoint/v3"
    17  	envoy_service_discovery_v3 "github.com/emissary-ingress/emissary/v3/pkg/api/envoy/service/discovery/v3"
    18  	endpointservice "github.com/emissary-ingress/emissary/v3/pkg/api/envoy/service/endpoint/v3"
    19  	"github.com/emissary-ingress/emissary/v3/pkg/envoy-control-plane/cache/types"
    20  	"github.com/emissary-ingress/emissary/v3/pkg/envoy-control-plane/cache/v3"
    21  	"github.com/emissary-ingress/emissary/v3/pkg/envoy-control-plane/resource/v3"
    22  	"github.com/emissary-ingress/emissary/v3/pkg/envoy-control-plane/server/v3"
    23  )
    24  
    25  type logger struct {
    26  	t *testing.T
    27  }
    28  
    29  func (log logger) Debugf(format string, args ...interface{}) { log.t.Logf(format, args...) }
    30  func (log logger) Infof(format string, args ...interface{})  { log.t.Logf(format, args...) }
    31  func (log logger) Warnf(format string, args ...interface{})  { log.t.Logf(format, args...) }
    32  func (log logger) Errorf(format string, args ...interface{}) { log.t.Logf(format, args...) }
    33  
    34  func TestTTLResponse(t *testing.T) {
    35  
    36  	ctx, cancel := context.WithCancel(context.Background())
    37  	defer cancel()
    38  
    39  	snapshotCache := cache.NewSnapshotCacheWithHeartbeating(ctx, false, cache.IDHash{}, logger{t: t}, time.Second)
    40  
    41  	server := server.NewServer(ctx, snapshotCache, nil)
    42  
    43  	grpcServer := grpc.NewServer()
    44  	endpointservice.RegisterEndpointDiscoveryServiceServer(grpcServer, server)
    45  
    46  	l, err := net.Listen("tcp", ":9999") // nolint:gosec
    47  	assert.NoError(t, err)
    48  
    49  	go func() {
    50  		assert.NoError(t, grpcServer.Serve(l))
    51  	}()
    52  	defer grpcServer.Stop()
    53  
    54  	conn, err := grpc.Dial(":9999", grpc.WithTransportCredentials(insecure.NewCredentials()))
    55  	assert.NoError(t, err)
    56  	client := endpointservice.NewEndpointDiscoveryServiceClient(conn)
    57  
    58  	sclient, err := client.StreamEndpoints(ctx)
    59  	assert.NoError(t, err)
    60  
    61  	err = sclient.Send(&envoy_service_discovery_v3.DiscoveryRequest{
    62  		Node: &envoy_config_core_v3.Node{
    63  			Id: "test",
    64  		},
    65  		ResourceNames: []string{"resource"},
    66  		TypeUrl:       resource.EndpointType,
    67  	})
    68  	assert.NoError(t, err)
    69  
    70  	oneSecond := time.Second
    71  	cla := &envoy_config_endpoint_v3.ClusterLoadAssignment{ClusterName: "resource"}
    72  	snap, _ := cache.NewSnapshotWithTTLs("1", map[resource.Type][]types.ResourceWithTTL{
    73  		resource.EndpointType: {{
    74  			Resource: cla,
    75  			TTL:      &oneSecond,
    76  		}},
    77  	})
    78  	err = snapshotCache.SetSnapshot(context.Background(), "test", snap)
    79  	assert.NoError(t, err)
    80  
    81  	timeout := time.NewTimer(5 * time.Second)
    82  
    83  	awaitResponse := func() *envoy_service_discovery_v3.DiscoveryResponse {
    84  		t.Helper()
    85  		doneCh := make(chan *envoy_service_discovery_v3.DiscoveryResponse)
    86  		go func() {
    87  
    88  			r, err := sclient.Recv()
    89  			assert.NoError(t, err)
    90  
    91  			doneCh <- r
    92  		}()
    93  
    94  		select {
    95  		case <-timeout.C:
    96  			assert.Fail(t, "timed out")
    97  			return nil
    98  		case r := <-doneCh:
    99  			return r
   100  		}
   101  	}
   102  
   103  	response := awaitResponse()
   104  	isFullResponseWithTTL(t, response)
   105  
   106  	err = sclient.Send(&envoy_service_discovery_v3.DiscoveryRequest{
   107  		Node: &envoy_config_core_v3.Node{
   108  			Id: "test",
   109  		},
   110  		ResourceNames: []string{"resource"},
   111  		TypeUrl:       resource.EndpointType,
   112  		VersionInfo:   "1",
   113  		ResponseNonce: response.Nonce,
   114  	})
   115  	assert.NoError(t, err)
   116  
   117  	response = awaitResponse()
   118  	isHeartbeatResponseWithTTL(t, response)
   119  }
   120  
   121  func isFullResponseWithTTL(t *testing.T, response *envoy_service_discovery_v3.DiscoveryResponse) {
   122  	t.Helper()
   123  
   124  	assert.Len(t, response.Resources, 1)
   125  	r := response.Resources[0]
   126  	resource := &envoy_service_discovery_v3.Resource{}
   127  	err := anypb.UnmarshalTo(r, resource, proto.UnmarshalOptions{})
   128  	assert.NoError(t, err)
   129  
   130  	assert.NotNil(t, resource.Ttl)
   131  	assert.NotNil(t, resource.Resource)
   132  }
   133  
   134  func isHeartbeatResponseWithTTL(t *testing.T, response *envoy_service_discovery_v3.DiscoveryResponse) {
   135  	t.Helper()
   136  
   137  	assert.Len(t, response.Resources, 1)
   138  	r := response.Resources[0]
   139  	resource := &envoy_service_discovery_v3.Resource{}
   140  	err := anypb.UnmarshalTo(r, resource, proto.UnmarshalOptions{})
   141  	assert.NoError(t, err)
   142  
   143  	assert.NotNil(t, resource.Ttl)
   144  	assert.Nil(t, resource.Resource)
   145  }
   146  

View as plain text