1 package integration
2
3 import (
4 "context"
5 "net"
6 "testing"
7 "time"
8
9 envoy_config_core_v3 "github.com/datawire/ambassador/v2/pkg/api/envoy/config/core/v3"
10 envoy_config_endpoint_v3 "github.com/datawire/ambassador/v2/pkg/api/envoy/config/endpoint/v3"
11 envoy_service_discovery_v3 "github.com/datawire/ambassador/v2/pkg/api/envoy/service/discovery/v3"
12 endpointservice "github.com/datawire/ambassador/v2/pkg/api/envoy/service/endpoint/v3"
13 "github.com/datawire/ambassador/v2/pkg/envoy-control-plane/cache/types"
14 "github.com/datawire/ambassador/v2/pkg/envoy-control-plane/cache/v3"
15 "github.com/datawire/ambassador/v2/pkg/envoy-control-plane/resource/v3"
16 "github.com/datawire/ambassador/v2/pkg/envoy-control-plane/server/v3"
17 "github.com/golang/protobuf/ptypes"
18 "github.com/stretchr/testify/assert"
19 "google.golang.org/grpc"
20 )
21
22 type logger struct {
23 t *testing.T
24 }
25
26 func (log logger) Debugf(format string, args ...interface{}) { log.t.Logf(format, args...) }
27 func (log logger) Infof(format string, args ...interface{}) { log.t.Logf(format, args...) }
28 func (log logger) Warnf(format string, args ...interface{}) { log.t.Logf(format, args...) }
29 func (log logger) Errorf(format string, args ...interface{}) { log.t.Logf(format, args...) }
30
31 func TestTtlResponse(t *testing.T) {
32
33 ctx, cancel := context.WithCancel(context.Background())
34 defer cancel()
35
36 snapshotCache := cache.NewSnapshotCacheWithHeartbeating(ctx, false, cache.IDHash{}, logger{t: t}, time.Second)
37
38 server := server.NewServer(ctx, snapshotCache, nil)
39
40 grpcServer := grpc.NewServer()
41 endpointservice.RegisterEndpointDiscoveryServiceServer(grpcServer, server)
42
43 l, err := net.Listen("tcp", ":9999")
44 assert.NoError(t, err)
45
46 go func() {
47 grpcServer.Serve(l)
48 }()
49 defer grpcServer.Stop()
50
51 conn, err := grpc.Dial(":9999", grpc.WithInsecure())
52 assert.NoError(t, err)
53 client := endpointservice.NewEndpointDiscoveryServiceClient(conn)
54
55 sclient, err := client.StreamEndpoints(ctx)
56 assert.NoError(t, err)
57
58 err = sclient.Send(&envoy_service_discovery_v3.DiscoveryRequest{
59 Node: &envoy_config_core_v3.Node{
60 Id: "test",
61 },
62 ResourceNames: []string{"resource"},
63 TypeUrl: resource.EndpointType,
64 })
65 assert.NoError(t, err)
66
67 oneSecond := time.Second
68 cla := &envoy_config_endpoint_v3.ClusterLoadAssignment{ClusterName: "resource"}
69 err = snapshotCache.SetSnapshot("test", cache.NewSnapshotWithTtls("1", []types.ResourceWithTtl{{
70 Resource: cla,
71 Ttl: &oneSecond,
72 }}, nil, nil, nil, nil, nil))
73 assert.NoError(t, err)
74
75 timeout := time.NewTimer(5 * time.Second)
76
77 awaitResponse := func() *envoy_service_discovery_v3.DiscoveryResponse {
78 t.Helper()
79 doneCh := make(chan *envoy_service_discovery_v3.DiscoveryResponse)
80 go func() {
81
82 r, err := sclient.Recv()
83 assert.NoError(t, err)
84
85 doneCh <- r
86 }()
87
88 select {
89 case <-timeout.C:
90 assert.Fail(t, "timed out")
91 return nil
92 case r := <-doneCh:
93 return r
94 }
95 }
96
97 response := awaitResponse()
98 isFullResponseWithTTL(t, response)
99
100 err = sclient.Send(&envoy_service_discovery_v3.DiscoveryRequest{
101 Node: &envoy_config_core_v3.Node{
102 Id: "test",
103 },
104 ResourceNames: []string{"resource"},
105 TypeUrl: resource.EndpointType,
106 VersionInfo: "1",
107 ResponseNonce: response.Nonce,
108 })
109 assert.NoError(t, err)
110
111 response = awaitResponse()
112 isHeartbeatResponseWithTTL(t, response)
113 }
114
115 func isFullResponseWithTTL(t *testing.T, response *envoy_service_discovery_v3.DiscoveryResponse) {
116 t.Helper()
117
118 assert.Len(t, response.Resources, 1)
119 r := response.Resources[0]
120 resource := &envoy_service_discovery_v3.Resource{}
121 err := ptypes.UnmarshalAny(r, resource)
122 assert.NoError(t, err)
123
124 assert.NotNil(t, resource.Ttl)
125 assert.NotNil(t, resource.Resource)
126 }
127
128 func isHeartbeatResponseWithTTL(t *testing.T, response *envoy_service_discovery_v3.DiscoveryResponse) {
129 t.Helper()
130
131 assert.Len(t, response.Resources, 1)
132 r := response.Resources[0]
133 resource := &envoy_service_discovery_v3.Resource{}
134 err := ptypes.UnmarshalAny(r, resource)
135 assert.NoError(t, err)
136
137 assert.NotNil(t, resource.Ttl)
138 assert.Nil(t, resource.Resource)
139 }
140
View as plain text