1 package envoytest
2
3 import (
4
5 "context"
6 "fmt"
7 "net"
8 "sync"
9 "testing"
10
11
12 "google.golang.org/genproto/googleapis/rpc/status"
13 "google.golang.org/grpc"
14
15
16 apiv2 "github.com/datawire/ambassador/v2/pkg/api/envoy/api/v2"
17 apiv2_core "github.com/datawire/ambassador/v2/pkg/api/envoy/api/v2/core"
18 apiv2_discovery "github.com/datawire/ambassador/v2/pkg/api/envoy/service/discovery/v2"
19
20
21 ecp_cache_types "github.com/datawire/ambassador/v2/pkg/envoy-control-plane/cache/types"
22 ecp_v2_cache "github.com/datawire/ambassador/v2/pkg/envoy-control-plane/cache/v2"
23 ecp_v2_server "github.com/datawire/ambassador/v2/pkg/envoy-control-plane/server/v2"
24
25
26 "github.com/datawire/dlib/dhttp"
27 "github.com/datawire/dlib/dlog"
28 )
29
30
31
32
// EnvoyController is a test helper that serves xDS configuration over gRPC
// (see Run) and records, per configuration version, the status reported back
// for each resource type (see OnStreamRequest).
type EnvoyController struct {
	// address is the TCP address that Run listens on.
	address string

	// configCache stores the snapshots set by Configure and is handed to the
	// xDS server created in Run.
	configCache ecp_v2_cache.SnapshotCache

	// cond's lock guards errors and outstanding. It is broadcast every time
	// OnStreamRequest runs so that waitFor can re-check its condition.
	cond *sync.Cond
	// errors maps a configuration version to the statuses recorded for it.
	errors map[string]*errorInfo
	// outstanding maps a response nonce to the version and type URL of the
	// response that carried it.
	outstanding map[string]ackInfo

	// logCtx is the context used by the Debugf/Infof/Warnf/Errorf methods.
	// It is assigned at the start of Run.
	logCtx context.Context
}
46
47
48
// ackInfo identifies a response that has been sent on a stream: the
// configuration version it carried and the type URL of its resources.
//
// The field order is significant because the type is also built with a
// positional (unkeyed) composite literal.
type ackInfo struct {
	version, typeUrl string
}
53
54
// errorInfo collects what has been recorded for one configuration version.
type errorInfo struct {
	// version is the configuration version this entry belongs to.
	version string
	// details maps a resource type URL to the ErrorDetail of the request that
	// answered the response of that type. Entries may hold a nil status.
	// NOTE(review): a nil status presumably means the update was accepted —
	// confirm against the xDS protocol.
	details map[string]*status.Status
}
59
60 func (e *errorInfo) String() string {
61 return fmt.Sprintf("%s %v", e.version, e.details)
62 }
63
64
65 func NewEnvoyController(address string) *EnvoyController {
66 result := &EnvoyController{
67 address: address,
68 cond: sync.NewCond(&sync.Mutex{}),
69 errors: map[string]*errorInfo{},
70 outstanding: map[string]ackInfo{},
71 }
72 result.configCache = ecp_v2_cache.NewSnapshotCache(true, result, result)
73 return result
74 }
75
76
77
78 func (e *EnvoyController) Configure(node, version string, snapshot ecp_v2_cache.Snapshot) (*status.Status, error) {
79 err := e.configCache.SetSnapshot(node, snapshot)
80 if err != nil {
81 return nil, err
82 }
83
84
85
86
87 typeUrls := []string{}
88 if len(snapshot.Resources[ecp_cache_types.Endpoint].Items) > 0 {
89 typeUrls = append(typeUrls, "type.googleapis.com/envoy.api.v2.ClusterLoadAssignment")
90 }
91 if len(snapshot.Resources[ecp_cache_types.Cluster].Items) > 0 {
92 typeUrls = append(typeUrls, "type.googleapis.com/envoy.api.v2.Cluster")
93 }
94 if len(snapshot.Resources[ecp_cache_types.Route].Items) > 0 {
95 typeUrls = append(typeUrls, "type.googleapis.com/envoy.api.v2.RouteConfiguration")
96 }
97 if len(snapshot.Resources[ecp_cache_types.Listener].Items) > 0 {
98 typeUrls = append(typeUrls, "type.googleapis.com/envoy.api.v2.Listener")
99 }
100
101 for _, t := range typeUrls {
102 status := e.waitFor(version, t)
103 if status != nil {
104 return status, nil
105 }
106 }
107
108 return nil, nil
109 }
110
111
112
113 func (e *EnvoyController) waitFor(version string, typeUrl string) *status.Status {
114 e.cond.L.Lock()
115 defer e.cond.L.Unlock()
116 for {
117 error, ok := e.errors[version]
118 if ok {
119 for k, v := range error.details {
120 if v != nil {
121 return v
122 }
123 if k == typeUrl {
124 return v
125 }
126 }
127 }
128 e.cond.Wait()
129 }
130 }
131
132
133 func (e *EnvoyController) Run(ctx context.Context) error {
134
135 e.logCtx = ctx
136
137 grpcServer := grpc.NewServer()
138 srv := ecp_v2_server.NewServer(ctx, e.configCache, e)
139
140 apiv2_discovery.RegisterAggregatedDiscoveryServiceServer(grpcServer, srv)
141 apiv2.RegisterEndpointDiscoveryServiceServer(grpcServer, srv)
142 apiv2.RegisterClusterDiscoveryServiceServer(grpcServer, srv)
143 apiv2.RegisterRouteDiscoveryServiceServer(grpcServer, srv)
144 apiv2.RegisterListenerDiscoveryServiceServer(grpcServer, srv)
145
146 lis, err := net.Listen("tcp", e.address)
147 if err != nil {
148 return err
149 }
150
151 sc := &dhttp.ServerConfig{
152 Handler: grpcServer,
153 }
154 if err := sc.Serve(ctx, lis); err != nil {
155 if err != nil && err != context.Canceled {
156 return err
157 }
158 }
159 return nil
160 }
161
162
163
164 func SetupEnvoyController(t *testing.T, address string) *EnvoyController {
165 e := NewEnvoyController(address)
166 ctx, cancel := context.WithCancel(dlog.NewTestContext(t, false))
167 done := make(chan struct{})
168 t.Cleanup(func() {
169 cancel()
170 <-done
171 })
172 go func() {
173 err := e.Run(ctx)
174 if err != nil {
175 t.Errorf("envoy controller exited with error: %+v", err)
176 }
177 close(done)
178 }()
179 return e
180 }
181
182
183 func (e EnvoyController) ID(node *apiv2_core.Node) string {
184 if node == nil {
185 return "unknown"
186 }
187 return node.Id
188 }
189
190
191 func (e *EnvoyController) OnStreamOpen(_ context.Context, sid int64, stype string) error {
192
193 return nil
194 }
195
196
197 func (e *EnvoyController) OnStreamClosed(sid int64) {
198
199 }
200
201
202 func (e *EnvoyController) OnStreamRequest(sid int64, req *apiv2.DiscoveryRequest) error {
203
204
205 func() {
206 e.cond.L.Lock()
207 defer e.cond.L.Unlock()
208 ackInfo, ok := e.outstanding[req.ResponseNonce]
209 if ok {
210 errors, ok := e.errors[ackInfo.version]
211 if !ok {
212 errors = &errorInfo{version: ackInfo.version, details: map[string]*status.Status{}}
213 e.errors[ackInfo.version] = errors
214 }
215 errors.details[ackInfo.typeUrl] = req.ErrorDetail
216 delete(e.outstanding, req.ResponseNonce)
217 }
218 e.cond.Broadcast()
219 }()
220
221 return nil
222 }
223
224
225 func (e *EnvoyController) OnStreamResponse(sid int64, req *apiv2.DiscoveryRequest, res *apiv2.DiscoveryResponse) {
226
227 func() {
228 e.cond.L.Lock()
229 defer e.cond.L.Unlock()
230 e.outstanding[res.Nonce] = ackInfo{res.VersionInfo, res.TypeUrl}
231 }()
232 }
233
234
235 func (e *EnvoyController) OnFetchRequest(_ context.Context, r *apiv2.DiscoveryRequest) error {
236
237 return nil
238 }
239
240
241 func (e *EnvoyController) OnFetchResponse(req *apiv2.DiscoveryRequest, res *apiv2.DiscoveryResponse) {
242
243 }
244
245
246
247 func (e *EnvoyController) Debugf(format string, args ...interface{}) {
248 dlog.Debugf(e.logCtx, format, args...)
249 }
250 func (e *EnvoyController) Infof(format string, args ...interface{}) {
251 dlog.Infof(e.logCtx, format, args...)
252 }
253 func (e *EnvoyController) Warnf(format string, args ...interface{}) {
254 dlog.Warnf(e.logCtx, format, args...)
255 }
256 func (e *EnvoyController) Errorf(format string, args ...interface{}) {
257 dlog.Errorf(e.logCtx, format, args...)
258 }
259
View as plain text