1 package envoytest
2
3 import (
4
5 "context"
6 "fmt"
7 "sync"
8
9
10 "google.golang.org/genproto/googleapis/rpc/status"
11 "google.golang.org/grpc"
12
13
14 v3core "github.com/emissary-ingress/emissary/v3/pkg/api/envoy/config/core/v3"
15 v3cluster "github.com/emissary-ingress/emissary/v3/pkg/api/envoy/service/cluster/v3"
16 v3discovery "github.com/emissary-ingress/emissary/v3/pkg/api/envoy/service/discovery/v3"
17 v3endpoint "github.com/emissary-ingress/emissary/v3/pkg/api/envoy/service/endpoint/v3"
18 v3listener "github.com/emissary-ingress/emissary/v3/pkg/api/envoy/service/listener/v3"
19 v3route "github.com/emissary-ingress/emissary/v3/pkg/api/envoy/service/route/v3"
20
21
22 ecp_v3_cache "github.com/emissary-ingress/emissary/v3/pkg/envoy-control-plane/cache/v3"
23 ecp_log "github.com/emissary-ingress/emissary/v3/pkg/envoy-control-plane/log"
24 ecp_v3_resource "github.com/emissary-ingress/emissary/v3/pkg/envoy-control-plane/resource/v3"
25 ecp_v3_server "github.com/emissary-ingress/emissary/v3/pkg/envoy-control-plane/server/v3"
26
27
28 "github.com/datawire/dlib/dhttp"
29 "github.com/datawire/dlib/dlog"
30 )
31
32
33
34
35 type EnvoyController struct {
36 address string
37
38 configCache ecp_v3_cache.SnapshotCache
39
40 cond *sync.Cond
41 results map[string]*errorInfo
42 outstanding map[string]ackInfo
43
44
45 logCtx context.Context
46 }
47
48
49
// ackInfo remembers which version and type URL a sent response carried, so the
// request that later echoes the response's nonce can be attributed to them.
//
// The field order (version, then typeURL) must not change: values of this type
// are built with a positional composite literal.
type ackInfo struct {
	version, typeURL string
}
54
55
56
57 type errorInfo struct {
58 version string
59 details map[string]*status.Status
60 }
61
62 func (e *errorInfo) String() string {
63 return fmt.Sprintf("%s %v", e.version, e.details)
64 }
65
66
67 func NewEnvoyController(address string) *EnvoyController {
68 ret := &EnvoyController{
69 address: address,
70 cond: sync.NewCond(&sync.Mutex{}),
71 results: map[string]*errorInfo{},
72 outstanding: map[string]ackInfo{},
73 }
74
75 ret.configCache = ecp_v3_cache.NewSnapshotCache(
76 true,
77 ecNodeHash{},
78 ecLogger{ec: ret},
79 )
80 return ret
81 }
82
83
84
85 func (e *EnvoyController) Configure(ctx context.Context, node, version string, snapshot ecp_v3_cache.ResourceSnapshot) (*status.Status, error) {
86
87 err := e.configCache.SetSnapshot(ctx, node, snapshot)
88 if err != nil {
89 return nil, err
90 }
91
92
93
94
95 var typeURLs []string
96
97 if len(snapshot.GetResources(ecp_v3_resource.EndpointType)) > 0 {
98 typeURLs = append(typeURLs, ecp_v3_resource.EndpointType)
99 }
100 if len(snapshot.GetResources(ecp_v3_resource.ClusterType)) > 0 {
101 typeURLs = append(typeURLs, ecp_v3_resource.ClusterType)
102 }
103 if len(snapshot.GetResources(ecp_v3_resource.RouteType)) > 0 {
104 typeURLs = append(typeURLs, ecp_v3_resource.RouteType)
105 }
106 if len(snapshot.GetResources(ecp_v3_resource.ListenerType)) > 0 {
107 typeURLs = append(typeURLs, ecp_v3_resource.ListenerType)
108 }
109
110 for _, t := range typeURLs {
111 status, err := e.waitFor(ctx, version, t)
112 if err != nil {
113 return nil, err
114 }
115 if status != nil {
116 return status, nil
117 }
118 }
119
120 return nil, nil
121 }
122
123
124
125 func (e *EnvoyController) waitFor(ctx context.Context, version string, typeURL string) (*status.Status, error) {
126 ctx, cancel := context.WithCancel(ctx)
127 defer func() {
128 cancel()
129 }()
130 go func() {
131 <-ctx.Done()
132 e.cond.L.Lock()
133 defer e.cond.L.Unlock()
134 e.cond.Broadcast()
135 }()
136
137 var (
138 retStatus *status.Status
139 retErr error
140 )
141
142 condition := func() bool {
143
144 if err := ctx.Err(); err != nil {
145 retErr = err
146 return true
147 }
148
149 result, ok := e.results[version]
150 if !ok {
151 return false
152 }
153
154 if status, ok := result.details[typeURL]; ok {
155 retStatus = status
156 return true
157 }
158
159
160 for _, status := range result.details {
161 if status != nil {
162 retStatus = status
163 return true
164 }
165 }
166
167 return false
168 }
169
170 e.cond.L.Lock()
171 defer e.cond.L.Unlock()
172 for !condition() {
173 e.cond.Wait()
174 }
175 return retStatus, retErr
176 }
177
178
179 func (e *EnvoyController) Run(ctx context.Context) error {
180
181 e.logCtx = ctx
182
183 srv := ecp_v3_server.NewServer(ctx,
184 e.configCache,
185 ecCallbacks{ec: e},
186 )
187
188 grpcMux := grpc.NewServer()
189 v3discovery.RegisterAggregatedDiscoveryServiceServer(grpcMux, srv)
190 v3endpoint.RegisterEndpointDiscoveryServiceServer(grpcMux, srv)
191 v3cluster.RegisterClusterDiscoveryServiceServer(grpcMux, srv)
192 v3route.RegisterRouteDiscoveryServiceServer(grpcMux, srv)
193 v3listener.RegisterListenerDiscoveryServiceServer(grpcMux, srv)
194
195 sc := &dhttp.ServerConfig{
196 Handler: grpcMux,
197 }
198 return sc.ListenAndServe(ctx, e.address)
199 }
200
201
202
203 type ecNodeHash struct{}
204
205 var _ ecp_v3_cache.NodeHash = ecNodeHash{}
206
207
208 func (ecNodeHash) ID(node *v3core.Node) string {
209 if node == nil {
210 return "unknown"
211 }
212 return node.Id
213 }
214
215
216
217 type ecCallbacks struct {
218 ec *EnvoyController
219 }
220
221 var _ ecp_v3_server.Callbacks = ecCallbacks{}
222
223
224 func (ecc ecCallbacks) OnStreamOpen(_ context.Context, sid int64, stype string) error {
225
226 return nil
227 }
228
229
230 func (ecc ecCallbacks) OnStreamClosed(sid int64, node *v3core.Node) {
231
232 }
233
234
235 func (ecc ecCallbacks) OnStreamRequest(sid int64, req *v3discovery.DiscoveryRequest) error {
236
237
238 ecc.ec.cond.L.Lock()
239 defer ecc.ec.cond.L.Unlock()
240 defer ecc.ec.cond.Broadcast()
241
242 if ackInfo, ok := ecc.ec.outstanding[req.ResponseNonce]; ok {
243 results, ok := ecc.ec.results[ackInfo.version]
244 if !ok {
245 results = &errorInfo{version: ackInfo.version, details: map[string]*status.Status{}}
246 ecc.ec.results[ackInfo.version] = results
247 }
248 results.details[ackInfo.typeURL] = req.ErrorDetail
249 delete(ecc.ec.outstanding, req.ResponseNonce)
250 }
251
252 return nil
253 }
254
255
256 func (ecc ecCallbacks) OnStreamResponse(ctx context.Context, sid int64, req *v3discovery.DiscoveryRequest, res *v3discovery.DiscoveryResponse) {
257
258
259 ecc.ec.cond.L.Lock()
260 defer ecc.ec.cond.L.Unlock()
261 defer ecc.ec.cond.Broadcast()
262
263 ecc.ec.outstanding[res.Nonce] = ackInfo{res.VersionInfo, res.TypeUrl}
264
265 }
266
267
268 func (ecc ecCallbacks) OnFetchRequest(_ context.Context, r *v3discovery.DiscoveryRequest) error {
269
270 return nil
271 }
272
273
274 func (ecc ecCallbacks) OnFetchResponse(req *v3discovery.DiscoveryRequest, res *v3discovery.DiscoveryResponse) {
275
276 }
277
278
279 func (ecc ecCallbacks) OnDeltaStreamOpen(ctx context.Context, sid int64, stype string) error {
280 return nil
281 }
282
283
284 func (ecc ecCallbacks) OnDeltaStreamClosed(sid int64, node *v3core.Node) {
285 }
286
287
288 func (ecc ecCallbacks) OnStreamDeltaRequest(sid int64, req *v3discovery.DeltaDiscoveryRequest) error {
289 return nil
290 }
291
292
293 func (ecc ecCallbacks) OnStreamDeltaResponse(sid int64, req *v3discovery.DeltaDiscoveryRequest, res *v3discovery.DeltaDiscoveryResponse) {
294 }
295
296
297
298 type ecLogger struct {
299 ec *EnvoyController
300 }
301
302 var _ ecp_log.Logger = ecLogger{}
303
304
305 func (ecl ecLogger) Debugf(format string, args ...interface{}) {
306 dlog.Debugf(ecl.ec.logCtx, format, args...)
307 }
308
309
310 func (ecl ecLogger) Infof(format string, args ...interface{}) {
311 dlog.Infof(ecl.ec.logCtx, format, args...)
312 }
313
314
315 func (ecl ecLogger) Warnf(format string, args ...interface{}) {
316 dlog.Warnf(ecl.ec.logCtx, format, args...)
317 }
318
319
320 func (ecl ecLogger) Errorf(format string, args ...interface{}) {
321 dlog.Errorf(ecl.ec.logCtx, format, args...)
322 }
323
View as plain text