1 package entrypoint
2
3 import (
4 "context"
5 "encoding/json"
6 "fmt"
7 "os"
8 "strconv"
9 "sync"
10 "sync/atomic"
11 "time"
12
13 gw "sigs.k8s.io/gateway-api/apis/v1alpha1"
14
15 "github.com/datawire/dlib/dgroup"
16 "github.com/datawire/dlib/dlog"
17 "github.com/emissary-ingress/emissary/v3/pkg/acp"
18 "github.com/emissary-ingress/emissary/v3/pkg/ambex"
19 "github.com/emissary-ingress/emissary/v3/pkg/debug"
20 ecp_v3_cache "github.com/emissary-ingress/emissary/v3/pkg/envoy-control-plane/cache/v3"
21 "github.com/emissary-ingress/emissary/v3/pkg/gateway"
22 "github.com/emissary-ingress/emissary/v3/pkg/kates"
23 "github.com/emissary-ingress/emissary/v3/pkg/snapshot/v1"
24 )
25
26 func WatchAllTheThings(
27 ctx context.Context,
28 ambwatch *acp.AmbassadorWatcher,
29 encoded *atomic.Value,
30 fastpathCh chan<- *ambex.FastpathSnapshot,
31 clusterID string,
32 version string,
33 ) error {
34 client, err := kates.NewClient(kates.ClientConfig{})
35 if err != nil {
36 return err
37 }
38 intv, err := strconv.Atoi(env("AMBASSADOR_RECONFIG_MAX_DELAY", "1"))
39 if err != nil {
40 return err
41 }
42 maxInterval := time.Duration(intv) * time.Second
43 err = client.MaxAccumulatorInterval(maxInterval)
44 if err != nil {
45 return err
46 }
47 dlog.Infof(ctx, "AMBASSADOR_RECONFIG_MAX_DELAY set to %d", intv)
48
49 serverTypeList, err := client.ServerResources()
50 if err != nil {
51
52
53 dlog.Infof(ctx, "Warning, unable to list api-resources: %v", err)
54 }
55
56 interestingTypes := GetInterestingTypes(ctx, serverTypeList)
57 queries := GetQueries(ctx, interestingTypes)
58
59 ambassadorMeta := getAmbassadorMeta(GetAmbassadorID(), clusterID, version, client)
60
61
62
63 notify := func(ctx context.Context, disposition SnapshotDisposition, _ []byte) error {
64 if disposition == SnapshotReady {
65 return notifyReconfigWebhooks(ctx, ambwatch)
66 }
67 return nil
68 }
69
70 fastpathUpdate := func(ctx context.Context, fastpathSnapshot *ambex.FastpathSnapshot) {
71 fastpathCh <- fastpathSnapshot
72 }
73
74 k8sSrc := newK8sSource(client)
75 consulSrc := watchConsul
76 istioCertSrc := newIstioCertSource()
77
78 return watchAllTheThingsInternal(
79 ctx,
80 encoded,
81 k8sSrc,
82 queries,
83 consulSrc,
84 istioCertSrc,
85 notify,
86 fastpathUpdate,
87 ambassadorMeta,
88 )
89 }
90
91 func getAmbassadorMeta(ambassadorID string, clusterID string, version string, client *kates.Client) *snapshot.AmbassadorMetaInfo {
92 ambMeta := &snapshot.AmbassadorMetaInfo{
93 ClusterID: clusterID,
94 AmbassadorID: ambassadorID,
95 AmbassadorVersion: version,
96 }
97 kubeServerVer, err := client.ServerVersion()
98 if err == nil {
99 ambMeta.KubeVersion = kubeServerVer.GitVersion
100 }
101 return ambMeta
102 }
103
104 type SnapshotProcessor func(context.Context, SnapshotDisposition, []byte) error
105
// SnapshotDisposition tells a SnapshotProcessor how the watcher classified the
// snapshot it is being handed.
type SnapshotDisposition int

const (
	// SnapshotIncomplete is what SnapshotHolder.Notify reports for a snapshot
	// taken before the Consul watcher is bootstrapped.
	SnapshotIncomplete SnapshotDisposition = iota

	// SnapshotDefer is not produced by the code in this file.
	// NOTE(review): presumably marks a snapshot whose processing is postponed — confirm.
	SnapshotDefer

	// SnapshotDrop is not produced by the code in this file.
	// NOTE(review): presumably marks a snapshot that is discarded — confirm.
	SnapshotDrop

	// SnapshotReady is what SnapshotHolder.Notify reports for a snapshot taken
	// once the Consul watcher is bootstrapped; it is the only disposition on
	// which WatchAllTheThings fires the reconfigure webhooks.
	SnapshotReady
)

// String returns the name of the constant, or "<type>(<number>)" for a value
// that does not correspond to any declared disposition.
//
// A switch is used instead of a map literal so that no map is allocated on
// every call.
func (disposition SnapshotDisposition) String() string {
	switch disposition {
	case SnapshotIncomplete:
		return "SnapshotIncomplete"
	case SnapshotDefer:
		return "SnapshotDefer"
	case SnapshotDrop:
		return "SnapshotDrop"
	case SnapshotReady:
		return "SnapshotReady"
	default:
		// %d bypasses String(), so this cannot recurse.
		return fmt.Sprintf("%[1]T(%[1]d)", disposition)
	}
}
133
134 type FastpathProcessor func(context.Context, *ambex.FastpathSnapshot)
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183 func watchAllTheThingsInternal(
184 ctx context.Context,
185 encoded *atomic.Value,
186 k8sSrc K8sSource,
187 queries []kates.Query,
188 watchConsulFunc watchConsulFunc,
189 istioCertSrc IstioCertSource,
190 snapshotProcessor SnapshotProcessor,
191 fastpathProcessor FastpathProcessor,
192 ambassadorMeta *snapshot.AmbassadorMetaInfo,
193 ) error {
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216 grp := dgroup.NewGroup(ctx, dgroup.GroupConfig{})
217
218
219
220
221
222
223
224
225
226 k8sWatcher, err := k8sSrc.Watch(ctx, queries...)
227 if err != nil {
228 return err
229 }
230 consulWatcher := newConsulWatcher(watchConsulFunc)
231 grp.Go("consul", consulWatcher.run)
232 istioCertWatcher, err := istioCertSrc.Watch(ctx)
233 if err != nil {
234 return err
235 }
236 istio := newIstioCertWatchManager(ctx, istioCertWatcher)
237
238
239
240
241
242 snapshots, err := NewSnapshotHolder(ambassadorMeta)
243 if err != nil {
244 return err
245 }
246
247
248
249 var out chan *SnapshotHolder
250 notifyCh := make(chan *SnapshotHolder)
251 grp.Go("notifyCh", func(ctx context.Context) error {
252 for {
253 select {
254 case sh := <-notifyCh:
255 if err := sh.Notify(ctx, encoded, consulWatcher, snapshotProcessor); err != nil {
256 return err
257 }
258 case <-ctx.Done():
259 return nil
260 }
261 }
262 })
263
264 grp.Go("loop", func(ctx context.Context) error {
265 for {
266 dlog.Debugf(ctx, "WATCHER: --------")
267
268
269
270
271
272
273 istio.StartLoop(ctx)
274
275 select {
276 case <-k8sWatcher.Changed():
277
278 changed, err := snapshots.K8sUpdate(ctx, k8sWatcher, consulWatcher, fastpathProcessor)
279 if err != nil {
280 return err
281 }
282 if !changed {
283 continue
284 }
285 out = notifyCh
286 case <-consulWatcher.changed():
287 dlog.Debugf(ctx, "WATCHER: Consul fired")
288 snapshots.ConsulUpdate(ctx, consulWatcher, fastpathProcessor)
289 out = notifyCh
290 case icertUpdate := <-istio.Changed():
291
292 if _, err := snapshots.IstioUpdate(ctx, istio, icertUpdate); err != nil {
293 return err
294 }
295 out = notifyCh
296 case out <- snapshots:
297 out = nil
298 case <-ctx.Done():
299 return nil
300 }
301 }
302 })
303
304 return grp.Wait()
305 }
306
307
308 type SnapshotHolder struct {
309
310 mutex sync.Mutex
311
312
313
314 validator *resourceValidator
315
316
317 ambassadorMeta *snapshot.AmbassadorMetaInfo
318
319
320
321
322
323 k8sSnapshot *snapshot.KubernetesSnapshot
324 consulSnapshot *snapshot.ConsulSnapshot
325
326
327
328
329
330
331 unsentDeltas []*kates.Delta
332
333 endpointRoutingInfo endpointRoutingInfo
334 dispatcher *gateway.Dispatcher
335
336
337
338
339 snapshotChangeCount int
340 snapshotChangeNotified int
341
342
343 firstReconfig bool
344 }
345
346 func NewSnapshotHolder(ambassadorMeta *snapshot.AmbassadorMetaInfo) (*SnapshotHolder, error) {
347 disp := gateway.NewDispatcher()
348 err := disp.Register("Gateway", func(untyped kates.Object) (*gateway.CompiledConfig, error) {
349 return gateway.Compile_Gateway(untyped.(*gw.Gateway))
350 })
351 if err != nil {
352 return nil, err
353 }
354 err = disp.Register("HTTPRoute", func(untyped kates.Object) (*gateway.CompiledConfig, error) {
355 return gateway.Compile_HTTPRoute(untyped.(*gw.HTTPRoute))
356 })
357 if err != nil {
358 return nil, err
359 }
360 validator, err := newResourceValidator()
361 if err != nil {
362 return nil, err
363 }
364 return &SnapshotHolder{
365 validator: validator,
366 ambassadorMeta: ambassadorMeta,
367 k8sSnapshot: NewKubernetesSnapshot(),
368 consulSnapshot: &snapshot.ConsulSnapshot{},
369 endpointRoutingInfo: newEndpointRoutingInfo(),
370 dispatcher: disp,
371 firstReconfig: true,
372 }, nil
373 }
374
375
376 func (sh *SnapshotHolder) K8sUpdate(
377 ctx context.Context,
378 watcher K8sWatcher,
379 consulWatcher *consulWatcher,
380 fastpathProcessor FastpathProcessor,
381 ) (bool, error) {
382 dbg := debug.FromContext(ctx)
383
384 katesUpdateTimer := dbg.Timer("katesUpdate")
385 parseAnnotationsTimer := dbg.Timer("parseAnnotations")
386 reconcileSecretsTimer := dbg.Timer("reconcileSecrets")
387 reconcileConsulTimer := dbg.Timer("reconcileConsul")
388 reconcileAuthServicesTimer := dbg.Timer("reconcileAuthServices")
389 reconcileRateLimitServicesTimer := dbg.Timer("reconcileRateLimitServices")
390
391 endpointsChanged := false
392 dispatcherChanged := false
393 var endpoints *ambex.Endpoints
394 var dispSnapshot *ecp_v3_cache.Snapshot
395 changed, err := func() (bool, error) {
396 dlog.Debugf(ctx, "[WATCHER]: processing cluster changes detected by the kubernetes watcher")
397 sh.mutex.Lock()
398 defer sh.mutex.Unlock()
399
400
401
402 var deltas []*kates.Delta
403 var changed bool
404 var err error
405 katesUpdateTimer.Time(func() {
406 changed, err = watcher.FilteredUpdate(ctx, sh.k8sSnapshot, &deltas, func(un *kates.Unstructured) bool {
407 return sh.validator.isValid(ctx, un)
408 })
409 })
410
411 if err != nil {
412 dlog.Errorf(ctx, "[WATCHER]: ERROR calculating changes in an update to the cluster config: %v", err)
413 return false, err
414 }
415 if !changed {
416 dlog.Debugf(ctx, "[WATCHER]: K8sUpdate did not detected any change to the resources relevant to this instance of Ambassador")
417 return false, err
418 }
419
420
421
422
423
424
425
426
427
428
429 for _, delta := range deltas {
430 if (delta.Kind == "ConsulResolver") && (delta.DeltaType != kates.ObjectDelete) {
431
432
433
434
435
436 for _, resolver := range sh.k8sSnapshot.ConsulResolvers {
437 if resolver.ObjectMeta.Name == delta.Name {
438
439
440
441 resolver.Spec.Address = os.ExpandEnv(resolver.Spec.Address)
442 }
443 }
444 }
445 }
446
447 parseAnnotationsTimer.Time(func() {
448 if err := sh.k8sSnapshot.PopulateAnnotations(ctx); err != nil {
449 dlog.Errorf(ctx, "[WATCHER]: ERROR parsing annotations in configuration change: %v", err)
450 }
451 })
452
453 reconcileSecretsTimer.Time(func() {
454 err = ReconcileSecrets(ctx, sh)
455 })
456 if err != nil {
457 dlog.Errorf(ctx, "[WATCHER]: ERROR reconciling Secrets: %v", err)
458 return false, err
459 }
460 reconcileConsulTimer.Time(func() {
461 err = ReconcileConsul(ctx, consulWatcher, sh.k8sSnapshot)
462 })
463 if err != nil {
464 dlog.Errorf(ctx, "[WATCHER]: ERROR reconciling Consul resources: %v", err)
465 return false, err
466 }
467 reconcileAuthServicesTimer.Time(func() {
468 err = ReconcileAuthServices(ctx, sh, &deltas)
469 })
470 if err != nil {
471 dlog.Errorf(ctx, "[WATCHER]: ERROR reconciling AuthServices: %v", err)
472 return false, err
473 }
474 reconcileRateLimitServicesTimer.Time(func() {
475 err = ReconcileRateLimit(ctx, sh, &deltas)
476 })
477 if err != nil {
478 dlog.Errorf(ctx, "[WATCHER]: ERROR reconciling RateLimitServices: %v", err)
479 return false, err
480 }
481
482 sh.endpointRoutingInfo.reconcileEndpointWatches(ctx, sh.k8sSnapshot)
483
484
485 if sh.endpointRoutingInfo.watchesChanged() {
486 dlog.Infof(ctx, "[WATCHER]: endpoint watches changed: %v", sh.endpointRoutingInfo.endpointWatches)
487 endpointsChanged = true
488 }
489
490 endpointsOnly := true
491 for _, delta := range deltas {
492 sh.unsentDeltas = append(sh.unsentDeltas, delta)
493
494 if delta.Kind == "Endpoints" {
495 key := fmt.Sprintf("%s:%s", delta.Namespace, delta.Name)
496 if sh.endpointRoutingInfo.endpointWatches[key] || sh.dispatcher.IsWatched(delta.Namespace, delta.Name) {
497 endpointsChanged = true
498 }
499 } else {
500 endpointsOnly = false
501 }
502
503 if sh.dispatcher.IsRegistered(delta.Kind) {
504 dispatcherChanged = true
505 if delta.DeltaType == kates.ObjectDelete {
506 sh.dispatcher.DeleteKey(delta.Kind, delta.Namespace, delta.Name)
507 }
508 }
509 }
510 if !endpointsOnly {
511 sh.snapshotChangeCount += 1
512 }
513
514 if endpointsChanged || dispatcherChanged {
515 endpoints = makeEndpoints(ctx, sh.k8sSnapshot, sh.consulSnapshot.Endpoints)
516 for _, gwc := range sh.k8sSnapshot.GatewayClasses {
517 if err := sh.dispatcher.Upsert(gwc); err != nil {
518
519 dlog.Error(ctx, err)
520 }
521 }
522 for _, gw := range sh.k8sSnapshot.Gateways {
523 if err := sh.dispatcher.Upsert(gw); err != nil {
524
525 dlog.Error(ctx, err)
526 }
527
528 }
529 for _, hr := range sh.k8sSnapshot.HTTPRoutes {
530 if err := sh.dispatcher.Upsert(hr); err != nil {
531
532 dlog.Error(ctx, err)
533 }
534 }
535
536 _, dispSnapshot = sh.dispatcher.GetSnapshot(ctx)
537 if dispSnapshot == nil {
538 err := fmt.Errorf("[Dispatch Snapshot]: unable to get valid snapshot")
539 dlog.Error(ctx, err)
540 return false, err
541 }
542 }
543 return true, nil
544 }()
545 if err != nil {
546 dlog.Errorf(ctx, "[WATCHER]: ERROR checking changes from a cluster config update: %v", err)
547 return changed, err
548 }
549
550 if endpointsChanged || dispatcherChanged {
551 fastpath := &ambex.FastpathSnapshot{
552 Endpoints: endpoints,
553 Snapshot: dispSnapshot,
554 }
555 fastpathProcessor(ctx, fastpath)
556 }
557 return changed, nil
558 }
559
560 func (sh *SnapshotHolder) ConsulUpdate(ctx context.Context, consulWatcher *consulWatcher, fastpathProcessor FastpathProcessor) bool {
561 var endpoints *ambex.Endpoints
562 var dispSnapshot *ecp_v3_cache.Snapshot
563 func() {
564 sh.mutex.Lock()
565 defer sh.mutex.Unlock()
566 consulWatcher.update(sh.consulSnapshot)
567 endpoints = makeEndpoints(ctx, sh.k8sSnapshot, sh.consulSnapshot.Endpoints)
568 _, dispSnapshot = sh.dispatcher.GetSnapshot(ctx)
569 }()
570 fastpathProcessor(ctx, &ambex.FastpathSnapshot{
571 Endpoints: endpoints,
572 Snapshot: dispSnapshot,
573 })
574 return true
575 }
576
577 func (sh *SnapshotHolder) IstioUpdate(ctx context.Context, istio *istioCertWatchManager,
578 icertUpdate IstioCertUpdate) (bool, error) {
579 dbg := debug.FromContext(ctx)
580
581 istioCertUpdateTimer := dbg.Timer("istioCertUpdate")
582 reconcileSecretsTimer := dbg.Timer("reconcileSecrets")
583
584 sh.mutex.Lock()
585 defer sh.mutex.Unlock()
586
587 istioCertUpdateTimer.Time(func() {
588 istio.Update(ctx, icertUpdate, sh.k8sSnapshot)
589 })
590
591 var err error
592 reconcileSecretsTimer.Time(func() {
593 err = ReconcileSecrets(ctx, sh)
594 })
595 if err != nil {
596 return false, err
597 }
598
599 sh.snapshotChangeCount += 1
600 return true, nil
601 }
602
603 func (sh *SnapshotHolder) Notify(
604 ctx context.Context,
605 encoded *atomic.Value,
606 consulWatcher *consulWatcher,
607 snapshotProcessor SnapshotProcessor,
608 ) error {
609 dbg := debug.FromContext(ctx)
610
611 notifyWebhooksTimer := dbg.Timer("notifyWebhooks")
612
613
614 var snapshotJSON []byte
615 var bootstrapped bool
616 changed := true
617
618 err := func() error {
619 sh.mutex.Lock()
620 defer sh.mutex.Unlock()
621
622 if sh.snapshotChangeNotified == sh.snapshotChangeCount {
623 changed = false
624 return nil
625 }
626
627 sn := &snapshot.Snapshot{
628 Kubernetes: sh.k8sSnapshot,
629 Consul: sh.consulSnapshot,
630 Invalid: sh.validator.getInvalid(),
631 Deltas: sh.unsentDeltas,
632 AmbassadorMeta: sh.ambassadorMeta,
633 }
634
635 var err error
636 snapshotJSON, err = json.MarshalIndent(sn, "", " ")
637 if err != nil {
638 return err
639 }
640
641 bootstrapped = consulWatcher.isBootstrapped()
642 if bootstrapped {
643 sh.unsentDeltas = nil
644 if sh.firstReconfig {
645 dlog.Debugf(ctx, "WATCHER: Bootstrapped! Computing initial configuration...")
646 sh.firstReconfig = false
647 }
648 sh.snapshotChangeNotified = sh.snapshotChangeCount
649 }
650 return nil
651 }()
652 if err != nil {
653 return err
654 }
655 if !changed {
656 return nil
657 }
658
659 if bootstrapped {
660
661 encoded.Store(snapshotJSON)
662
663
664
665 var err error
666 notifyWebhooksTimer.Time(func() {
667 err = snapshotProcessor(ctx, SnapshotReady, snapshotJSON)
668 })
669 if err != nil {
670 return err
671 }
672 }
673 return snapshotProcessor(ctx, SnapshotIncomplete, snapshotJSON)
674 }
675
676
677 type k8sSource struct {
678 client *kates.Client
679 }
680
681 func (k *k8sSource) Watch(ctx context.Context, queries ...kates.Query) (K8sWatcher, error) {
682 return k.client.Watch(ctx, queries...)
683 }
684
685 func newK8sSource(client *kates.Client) *k8sSource {
686 return &k8sSource{
687 client: client,
688 }
689 }
690
View as plain text