1 package integration
2
3 import (
4 "context"
5 "net"
6 "testing"
7 "time"
8
9 "github.com/stretchr/testify/assert"
10 "google.golang.org/grpc"
11 "google.golang.org/grpc/credentials/insecure"
12 "google.golang.org/protobuf/proto"
13 "google.golang.org/protobuf/types/known/anypb"
14
15 envoy_config_core_v3 "github.com/emissary-ingress/emissary/v3/pkg/api/envoy/config/core/v3"
16 envoy_config_endpoint_v3 "github.com/emissary-ingress/emissary/v3/pkg/api/envoy/config/endpoint/v3"
17 envoy_service_discovery_v3 "github.com/emissary-ingress/emissary/v3/pkg/api/envoy/service/discovery/v3"
18 endpointservice "github.com/emissary-ingress/emissary/v3/pkg/api/envoy/service/endpoint/v3"
19 "github.com/emissary-ingress/emissary/v3/pkg/envoy-control-plane/cache/types"
20 "github.com/emissary-ingress/emissary/v3/pkg/envoy-control-plane/cache/v3"
21 "github.com/emissary-ingress/emissary/v3/pkg/envoy-control-plane/resource/v3"
22 "github.com/emissary-ingress/emissary/v3/pkg/envoy-control-plane/server/v3"
23 )
24
25 type logger struct {
26 t *testing.T
27 }
28
29 func (log logger) Debugf(format string, args ...interface{}) { log.t.Logf(format, args...) }
30 func (log logger) Infof(format string, args ...interface{}) { log.t.Logf(format, args...) }
31 func (log logger) Warnf(format string, args ...interface{}) { log.t.Logf(format, args...) }
32 func (log logger) Errorf(format string, args ...interface{}) { log.t.Logf(format, args...) }
33
34 func TestTTLResponse(t *testing.T) {
35
36 ctx, cancel := context.WithCancel(context.Background())
37 defer cancel()
38
39 snapshotCache := cache.NewSnapshotCacheWithHeartbeating(ctx, false, cache.IDHash{}, logger{t: t}, time.Second)
40
41 server := server.NewServer(ctx, snapshotCache, nil)
42
43 grpcServer := grpc.NewServer()
44 endpointservice.RegisterEndpointDiscoveryServiceServer(grpcServer, server)
45
46 l, err := net.Listen("tcp", ":9999")
47 assert.NoError(t, err)
48
49 go func() {
50 assert.NoError(t, grpcServer.Serve(l))
51 }()
52 defer grpcServer.Stop()
53
54 conn, err := grpc.Dial(":9999", grpc.WithTransportCredentials(insecure.NewCredentials()))
55 assert.NoError(t, err)
56 client := endpointservice.NewEndpointDiscoveryServiceClient(conn)
57
58 sclient, err := client.StreamEndpoints(ctx)
59 assert.NoError(t, err)
60
61 err = sclient.Send(&envoy_service_discovery_v3.DiscoveryRequest{
62 Node: &envoy_config_core_v3.Node{
63 Id: "test",
64 },
65 ResourceNames: []string{"resource"},
66 TypeUrl: resource.EndpointType,
67 })
68 assert.NoError(t, err)
69
70 oneSecond := time.Second
71 cla := &envoy_config_endpoint_v3.ClusterLoadAssignment{ClusterName: "resource"}
72 snap, _ := cache.NewSnapshotWithTTLs("1", map[resource.Type][]types.ResourceWithTTL{
73 resource.EndpointType: {{
74 Resource: cla,
75 TTL: &oneSecond,
76 }},
77 })
78 err = snapshotCache.SetSnapshot(context.Background(), "test", snap)
79 assert.NoError(t, err)
80
81 timeout := time.NewTimer(5 * time.Second)
82
83 awaitResponse := func() *envoy_service_discovery_v3.DiscoveryResponse {
84 t.Helper()
85 doneCh := make(chan *envoy_service_discovery_v3.DiscoveryResponse)
86 go func() {
87
88 r, err := sclient.Recv()
89 assert.NoError(t, err)
90
91 doneCh <- r
92 }()
93
94 select {
95 case <-timeout.C:
96 assert.Fail(t, "timed out")
97 return nil
98 case r := <-doneCh:
99 return r
100 }
101 }
102
103 response := awaitResponse()
104 isFullResponseWithTTL(t, response)
105
106 err = sclient.Send(&envoy_service_discovery_v3.DiscoveryRequest{
107 Node: &envoy_config_core_v3.Node{
108 Id: "test",
109 },
110 ResourceNames: []string{"resource"},
111 TypeUrl: resource.EndpointType,
112 VersionInfo: "1",
113 ResponseNonce: response.Nonce,
114 })
115 assert.NoError(t, err)
116
117 response = awaitResponse()
118 isHeartbeatResponseWithTTL(t, response)
119 }
120
121 func isFullResponseWithTTL(t *testing.T, response *envoy_service_discovery_v3.DiscoveryResponse) {
122 t.Helper()
123
124 assert.Len(t, response.Resources, 1)
125 r := response.Resources[0]
126 resource := &envoy_service_discovery_v3.Resource{}
127 err := anypb.UnmarshalTo(r, resource, proto.UnmarshalOptions{})
128 assert.NoError(t, err)
129
130 assert.NotNil(t, resource.Ttl)
131 assert.NotNil(t, resource.Resource)
132 }
133
134 func isHeartbeatResponseWithTTL(t *testing.T, response *envoy_service_discovery_v3.DiscoveryResponse) {
135 t.Helper()
136
137 assert.Len(t, response.Resources, 1)
138 r := response.Resources[0]
139 resource := &envoy_service_discovery_v3.Resource{}
140 err := anypb.UnmarshalTo(r, resource, proto.UnmarshalOptions{})
141 assert.NoError(t, err)
142
143 assert.NotNil(t, resource.Ttl)
144 assert.Nil(t, resource.Resource)
145 }
146
View as plain text