1 package entrypoint
2
3 import (
4 "context"
5 "encoding/json"
6 "fmt"
7 "io/ioutil"
8 "net/http"
9 "os"
10 "reflect"
11 "sync"
12 "sync/atomic"
13 "testing"
14 "time"
15
16 "github.com/datawire/ambassador/v2/cmd/entrypoint/internal/testqueue"
17 "github.com/datawire/ambassador/v2/pkg/ambex"
18 v3bootstrap "github.com/datawire/ambassador/v2/pkg/api/envoy/config/bootstrap/v3"
19 amb "github.com/datawire/ambassador/v2/pkg/api/getambassador.io/v3alpha1"
20 "github.com/datawire/ambassador/v2/pkg/consulwatch"
21 "github.com/datawire/ambassador/v2/pkg/kates"
22 "github.com/datawire/ambassador/v2/pkg/snapshot/v1"
23 "github.com/datawire/dlib/dexec"
24 "github.com/datawire/dlib/dgroup"
25 "github.com/datawire/dlib/dlog"
26 )
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54 type Fake struct {
55
56
57 config FakeConfig
58 T *testing.T
59 group *dgroup.Group
60 cancel context.CancelFunc
61
62 k8sSource *fakeK8sSource
63 watcher *fakeWatcher
64 istioCertSource *fakeIstioCertSource
65
66
67 k8sStore *K8sStore
68 consulStore *ConsulStore
69 k8sNotifier *Notifier
70 consulNotifier *Notifier
71
72
73 currentSnapshot *atomic.Value
74
75 fastpath *testqueue.Queue
76 snapshots *testqueue.Queue
77 envoyConfigs *testqueue.Queue
78
79
80 teardownOnce sync.Once
81
82 ambassadorMeta *snapshot.AmbassadorMetaInfo
83
84 DiagdBindPort string
85 }
86
87
88 type FakeConfig struct {
89 EnvoyConfig bool
90 DiagdDebug bool
91 Timeout time.Duration
92 }
93
94 func (fc *FakeConfig) fillDefaults() {
95 if fc.Timeout == 0 {
96 fc.Timeout = 10 * time.Second
97 }
98 }
99
100
101
102 func NewFake(t *testing.T, config FakeConfig) *Fake {
103 config.fillDefaults()
104 ctx, cancel := context.WithCancel(dlog.NewTestContext(t, false))
105 k8sStore := NewK8sStore()
106 consulStore := NewConsulStore()
107
108 fake := &Fake{
109 config: config,
110 T: t,
111 cancel: cancel,
112 group: dgroup.NewGroup(ctx, dgroup.GroupConfig{EnableWithSoftness: true}),
113
114 k8sStore: k8sStore,
115 consulStore: consulStore,
116 k8sNotifier: NewNotifier(),
117 consulNotifier: NewNotifier(),
118
119 currentSnapshot: &atomic.Value{},
120
121 fastpath: testqueue.NewQueue(t, config.Timeout),
122 snapshots: testqueue.NewQueue(t, config.Timeout),
123 envoyConfigs: testqueue.NewQueue(t, config.Timeout),
124 }
125
126 fake.k8sSource = &fakeK8sSource{fake: fake, store: k8sStore}
127 fake.watcher = &fakeWatcher{fake: fake, store: consulStore}
128 fake.istioCertSource = &fakeIstioCertSource{}
129
130 return fake
131 }
132
133
134
135 func RunFake(t *testing.T, config FakeConfig, ambMeta *snapshot.AmbassadorMetaInfo) *Fake {
136 fake := NewFake(t, config)
137 fake.SetAmbassadorMeta(ambMeta)
138 fake.Setup()
139 fake.T.Cleanup(fake.Teardown)
140 return fake
141 }
142
143
144
145
146 func (f *Fake) Setup() {
147 if f.config.EnvoyConfig {
148 _, err := dexec.LookPath("diagd")
149 if err != nil {
150 f.T.Fatal("unable to find diagd, cannot run")
151 }
152
153 f.group.Go("snapshot_server", func(ctx context.Context) error {
154 return snapshotServer(ctx, f.currentSnapshot)
155 })
156
157 f.DiagdBindPort = GetDiagdBindPort()
158
159 f.group.Go("diagd", func(ctx context.Context) error {
160 args := []string{
161 "diagd",
162 "/tmp",
163 "/tmp/bootstrap-ads.json",
164 "/tmp/envoy.json",
165 "--no-envoy",
166 "--host", "127.0.0.1",
167 "--port", f.DiagdBindPort,
168 }
169
170 if f.config.DiagdDebug {
171 args = append(args, "--debug")
172 }
173
174 cmd := dexec.CommandContext(ctx, args[0], args[1:]...)
175 if envbool("DEV_SHUTUP_DIAGD") {
176 devnull, _ := os.OpenFile(os.DevNull, os.O_WRONLY, 0)
177 cmd.Stdout = devnull
178 cmd.Stderr = devnull
179 }
180 err := cmd.Run()
181 if err != nil {
182 exErr, ok := err.(*dexec.ExitError)
183 if ok {
184 f.T.Logf("diagd exited with error: %+v", exErr)
185 return nil
186 }
187 }
188 return err
189 })
190 }
191 f.group.Go("fake-watcher", f.runWatcher)
192
193 }
194
195
196 func (f *Fake) GetFeatures(ctx context.Context, features interface{}) error {
197
198
199 if !f.config.EnvoyConfig {
200 return fmt.Errorf("Features are not available with EnvoyConfig false")
201 }
202
203
204
205
206
207
208 featuresURL := fmt.Sprintf("http://localhost:%s/_internal/v0/features", f.DiagdBindPort)
209
210 req, err := http.NewRequestWithContext(ctx, "GET", featuresURL, nil)
211
212 if err != nil {
213 return err
214 }
215
216
217
218 req.Header.Set("X-Ambassador-Diag-IP", "127.0.0.1")
219 req.Header.Set("content-type", "application/json")
220 resp, err := http.DefaultClient.Do(req)
221
222 if err != nil {
223 return err
224 }
225
226 defer resp.Body.Close()
227
228 if resp.StatusCode != 200 {
229 return fmt.Errorf("unexpected status code: %d", resp.StatusCode)
230 }
231
232 body, err := ioutil.ReadAll(resp.Body)
233
234 if err != nil {
235 return err
236 }
237
238
239
240
241
242
243
244
245
246
247 return json.Unmarshal(body, features)
248 }
249
250
251
252
253 func (f *Fake) Teardown() {
254 f.teardownOnce.Do(func() {
255 f.cancel()
256 err := f.group.Wait()
257 if err != nil && err != context.Canceled {
258 f.T.Fatalf("fake edgestack errored out: %+v", err)
259 }
260 })
261 }
262
263 func (f *Fake) runWatcher(ctx context.Context) error {
264 interestingTypes := GetInterestingTypes(ctx, nil)
265 queries := GetQueries(ctx, interestingTypes)
266
267 return watchAllTheThingsInternal(
268 ctx,
269 f.currentSnapshot,
270 f.k8sSource,
271 queries,
272 f.watcher.Watch,
273 f.istioCertSource,
274 f.notifySnapshot,
275 f.notifyFastpath,
276 f.ambassadorMeta,
277 )
278 }
279
280 func (f *Fake) notifyFastpath(ctx context.Context, fastpath *ambex.FastpathSnapshot) {
281 f.fastpath.Add(f.T, fastpath)
282 }
283
284 func (f *Fake) GetEndpoints(predicate func(*ambex.Endpoints) bool) (*ambex.Endpoints, error) {
285 f.T.Helper()
286 untyped, err := f.fastpath.Get(f.T, func(obj interface{}) bool {
287 fastpath := obj.(*ambex.FastpathSnapshot)
288 return predicate(fastpath.Endpoints)
289 })
290 if err != nil {
291 return nil, err
292 }
293 return untyped.(*ambex.FastpathSnapshot).Endpoints, nil
294 }
295
296 func (f *Fake) AssertEndpointsEmpty(timeout time.Duration) {
297 f.T.Helper()
298 f.fastpath.AssertEmpty(f.T, timeout, "endpoints queue not empty")
299 }
300
301 type SnapshotEntry struct {
302 Disposition SnapshotDisposition
303 Snapshot *snapshot.Snapshot
304 }
305
306 func (entry SnapshotEntry) String() string {
307 snapshot := "nil"
308 if entry.Snapshot != nil {
309 snapshot = fmt.Sprintf("&%#v", *entry.Snapshot)
310 }
311 return fmt.Sprintf("{Disposition: %v, Snapshot: %s}", entry.Disposition, snapshot)
312 }
313
314
315 func (f *Fake) notifySnapshot(ctx context.Context, disp SnapshotDisposition, snapJSON []byte) error {
316 if disp == SnapshotReady && f.config.EnvoyConfig {
317 if err := notifyReconfigWebhooksFunc(ctx, &noopNotable{}, false); err != nil {
318 return err
319 }
320 f.appendEnvoyConfig(ctx)
321 }
322
323 var snap *snapshot.Snapshot
324 err := json.Unmarshal(snapJSON, &snap)
325 if err != nil {
326 f.T.Fatalf("error decoding snapshot: %+v", err)
327 }
328
329 f.snapshots.Add(f.T, SnapshotEntry{disp, snap})
330 return nil
331 }
332
333
334 func (f *Fake) GetSnapshotEntry(predicate func(SnapshotEntry) bool) (SnapshotEntry, error) {
335 f.T.Helper()
336 untyped, err := f.snapshots.Get(f.T, func(obj interface{}) bool {
337 entry := obj.(SnapshotEntry)
338 return predicate(entry)
339 })
340 if err != nil {
341 return SnapshotEntry{}, err
342 }
343 return untyped.(SnapshotEntry), nil
344 }
345
346
347 func (f *Fake) GetSnapshot(predicate func(*snapshot.Snapshot) bool) (*snapshot.Snapshot, error) {
348 f.T.Helper()
349 entry, err := f.GetSnapshotEntry(func(entry SnapshotEntry) bool {
350 return entry.Disposition == SnapshotReady && predicate(entry.Snapshot)
351 })
352 if err != nil {
353 return nil, err
354 }
355 return entry.Snapshot, nil
356 }
357
358 func (f *Fake) appendEnvoyConfig(ctx context.Context) {
359 msg, err := ambex.Decode(ctx, "/tmp/envoy.json")
360 if err != nil {
361 f.T.Fatalf("error decoding envoy.json after sending snapshot to python: %+v", err)
362 }
363 bs := msg.(*v3bootstrap.Bootstrap)
364 f.envoyConfigs.Add(f.T, bs)
365 }
366
367
368 func (f *Fake) GetEnvoyConfig(predicate func(*v3bootstrap.Bootstrap) bool) (*v3bootstrap.Bootstrap, error) {
369 f.T.Helper()
370 untyped, err := f.envoyConfigs.Get(f.T, func(obj interface{}) bool {
371 return predicate(obj.(*v3bootstrap.Bootstrap))
372 })
373 if err != nil {
374 return nil, err
375 }
376 return untyped.(*v3bootstrap.Bootstrap), nil
377 }
378
379
380 func (f *Fake) AutoFlush(enabled bool) {
381 f.k8sNotifier.AutoNotify(enabled)
382 f.consulNotifier.AutoNotify(enabled)
383 }
384
385
386 func (f *Fake) Flush() {
387 f.k8sNotifier.Notify()
388 f.consulNotifier.Notify()
389 }
390
391
392 func (f *Fake) SetAmbassadorMeta(ambMeta *snapshot.AmbassadorMetaInfo) {
393 f.ambassadorMeta = ambMeta
394 }
395
396
397
398 func (f *Fake) UpsertFile(filename string) error {
399 if err := f.k8sStore.UpsertFile(filename); err != nil {
400 return err
401 }
402 f.k8sNotifier.Changed()
403 return nil
404 }
405
406
407
408 func (f *Fake) UpsertYAML(yaml string) error {
409 if err := f.k8sStore.UpsertYAML(yaml); err != nil {
410 return err
411 }
412 f.k8sNotifier.Changed()
413 return nil
414 }
415
416
417 func (f *Fake) Upsert(resource kates.Object) error {
418 if err := f.k8sStore.Upsert(resource); err != nil {
419 return err
420 }
421 f.k8sNotifier.Changed()
422 return nil
423 }
424
425
426 func (f *Fake) Delete(kind, namespace, name string) error {
427 if err := f.k8sStore.Delete(kind, namespace, name); err != nil {
428 return err
429 }
430 f.k8sNotifier.Changed()
431 return nil
432 }
433
434
435 func (f *Fake) ConsulEndpoint(datacenter, service, address string, port int, tags ...string) {
436 f.consulStore.ConsulEndpoint(datacenter, service, address, port, tags...)
437 f.consulNotifier.Changed()
438 }
439
440
441 func (f *Fake) SendIstioCertUpdate(update IstioCertUpdate) {
442 f.istioCertSource.updateChannel <- update
443 }
444
445 type fakeK8sSource struct {
446 fake *Fake
447 store *K8sStore
448 }
449
450 func (fs *fakeK8sSource) Watch(ctx context.Context, queries ...kates.Query) (K8sWatcher, error) {
451 fw := &fakeK8sWatcher{fs.store.Cursor(), make(chan struct{}), queries}
452 fs.fake.k8sNotifier.Listen(func() {
453 go func() {
454 fw.notifyCh <- struct{}{}
455 }()
456 })
457 return fw, nil
458 }
459
460 type fakeK8sWatcher struct {
461 cursor *K8sStoreCursor
462 notifyCh chan struct{}
463 queries []kates.Query
464 }
465
466 func (f *fakeK8sWatcher) Changed() <-chan struct{} {
467 return f.notifyCh
468 }
469
470 func (f *fakeK8sWatcher) FilteredUpdate(_ context.Context, target interface{}, deltas *[]*kates.Delta, predicate func(*kates.Unstructured) bool) (bool, error) {
471 byname := map[string][]kates.Object{}
472 resources, newDeltas, err := f.cursor.Get()
473 if err != nil {
474 return false, err
475 }
476 for _, obj := range resources {
477 for _, q := range f.queries {
478 var un *kates.Unstructured
479 err := convert(obj, &un)
480 if err != nil {
481 return false, err
482 }
483 doesMatch, err := matches(q, obj)
484 if err != nil {
485 return false, err
486 }
487 if doesMatch && predicate(un) {
488 byname[q.Name] = append(byname[q.Name], obj)
489 }
490 }
491 }
492
493
494 targetVal := reflect.ValueOf(target)
495 targetType := targetVal.Type().Elem()
496 for _, q := range f.queries {
497 name := q.Name
498 v := byname[q.Name]
499 fieldEntry, ok := targetType.FieldByName(name)
500 if !ok {
501 return false, fmt.Errorf("no such field: %q", name)
502 }
503 val := reflect.New(fieldEntry.Type)
504 err := convert(v, val.Interface())
505 if err != nil {
506 return false, err
507 }
508 targetVal.Elem().FieldByName(name).Set(reflect.Indirect(val))
509 }
510
511 *deltas = newDeltas
512
513 return len(newDeltas) > 0, nil
514 }
515
516 func matches(query kates.Query, obj kates.Object) (bool, error) {
517 queryKind, err := canon(query.Kind)
518 if err != nil {
519 return false, err
520 }
521 objKind, err := canon(obj.GetObjectKind().GroupVersionKind().Kind)
522 if err != nil {
523 return false, err
524 }
525 return queryKind == objKind, nil
526 }
527
528 type fakeWatcher struct {
529 fake *Fake
530 store *ConsulStore
531 }
532
533 func (f *fakeWatcher) Watch(ctx context.Context, resolver *amb.ConsulResolver, svc string, endpoints chan consulwatch.Endpoints) (Stopper, error) {
534 var sent consulwatch.Endpoints
535 stop := f.fake.consulNotifier.Listen(func() {
536 ep, ok := f.store.Get(resolver.Spec.Datacenter, svc)
537 if ok && !reflect.DeepEqual(ep, sent) {
538 endpoints <- ep
539 sent = ep
540 }
541 })
542 return &fakeStopper{stop}, nil
543 }
544
545 type fakeStopper struct {
546 stop StopFunc
547 }
548
549 func (f *fakeStopper) Stop() {
550 f.stop()
551 }
552
553 type fakeIstioCertSource struct {
554 updateChannel chan IstioCertUpdate
555 }
556
557 func (src *fakeIstioCertSource) Watch(ctx context.Context) (IstioCertWatcher, error) {
558 src.updateChannel = make(chan IstioCertUpdate)
559
560 return &istioCertWatcher{
561 updateChannel: src.updateChannel,
562 }, nil
563 }
564
View as plain text