1 package entrypoint
2
3 import (
4 "context"
5 "encoding/json"
6 "fmt"
7 "os"
8 "strconv"
9 "sync"
10 "sync/atomic"
11 "time"
12
13 gw "sigs.k8s.io/gateway-api/apis/v1alpha1"
14
15 "github.com/datawire/ambassador/v2/pkg/acp"
16 "github.com/datawire/ambassador/v2/pkg/ambex"
17 "github.com/datawire/ambassador/v2/pkg/debug"
18 ecp_v2_cache "github.com/datawire/ambassador/v2/pkg/envoy-control-plane/cache/v2"
19 "github.com/datawire/ambassador/v2/pkg/gateway"
20 "github.com/datawire/ambassador/v2/pkg/kates"
21 "github.com/datawire/ambassador/v2/pkg/snapshot/v1"
22 "github.com/datawire/dlib/dgroup"
23 "github.com/datawire/dlib/dlog"
24 )
25
26 func WatchAllTheThings(
27 ctx context.Context,
28 ambwatch *acp.AmbassadorWatcher,
29 encoded *atomic.Value,
30 fastpathCh chan<- *ambex.FastpathSnapshot,
31 clusterID string,
32 version string,
33 ) error {
34 client, err := kates.NewClient(kates.ClientConfig{})
35 if err != nil {
36 return err
37 }
38 intv, err := strconv.Atoi(env("AMBASSADOR_RECONFIG_MAX_DELAY", "1"))
39 if err != nil {
40 return err
41 }
42 maxInterval := time.Duration(intv) * time.Second
43 err = client.MaxAccumulatorInterval(maxInterval)
44 if err != nil {
45 return err
46 }
47 dlog.Infof(ctx, "AMBASSADOR_RECONFIG_MAX_DELAY set to %d", intv)
48
49 serverTypeList, err := client.ServerResources()
50 if err != nil {
51
52
53 dlog.Infof(ctx, "Warning, unable to list api-resources: %v", err)
54 }
55
56 interestingTypes := GetInterestingTypes(ctx, serverTypeList)
57 queries := GetQueries(ctx, interestingTypes)
58
59 ambassadorMeta := getAmbassadorMeta(GetAmbassadorID(), clusterID, version, client)
60
61
62
63 notify := func(ctx context.Context, disposition SnapshotDisposition, _ []byte) error {
64 if disposition == SnapshotReady {
65 return notifyReconfigWebhooks(ctx, ambwatch)
66 }
67 return nil
68 }
69
70 fastpathUpdate := func(ctx context.Context, fastpathSnapshot *ambex.FastpathSnapshot) {
71 fastpathCh <- fastpathSnapshot
72 }
73
74 k8sSrc := newK8sSource(client)
75 consulSrc := watchConsul
76 istioCertSrc := newIstioCertSource()
77
78 return watchAllTheThingsInternal(
79 ctx,
80 encoded,
81 k8sSrc,
82 queries,
83 consulSrc,
84 istioCertSrc,
85 notify,
86 fastpathUpdate,
87 ambassadorMeta,
88 )
89 }
90
91 func getAmbassadorMeta(ambassadorID string, clusterID string, version string, client *kates.Client) *snapshot.AmbassadorMetaInfo {
92 ambMeta := &snapshot.AmbassadorMetaInfo{
93 ClusterID: clusterID,
94 AmbassadorID: ambassadorID,
95 AmbassadorVersion: version,
96 }
97 kubeServerVer, err := client.ServerVersion()
98 if err == nil {
99 ambMeta.KubeVersion = kubeServerVer.GitVersion
100 }
101 return ambMeta
102 }
103
104 type SnapshotProcessor func(context.Context, SnapshotDisposition, []byte) error
105
// SnapshotDisposition tells a SnapshotProcessor how the snapshot it is being
// handed should be treated.
type SnapshotDisposition int

// The dispositions are numbered from zero in declaration order.
const (
	// SnapshotIncomplete is the zero value.
	// NOTE(review): presumably marks a snapshot produced before the watcher
	// finished bootstrapping — confirm against SnapshotHolder.Notify.
	SnapshotIncomplete SnapshotDisposition = iota

	// SnapshotDefer is not referenced in this part of the file.
	// NOTE(review): exact semantics to be confirmed with its users.
	SnapshotDefer

	// SnapshotDrop is not referenced in this part of the file.
	// NOTE(review): exact semantics to be confirmed with its users.
	SnapshotDrop

	// SnapshotReady is the disposition under which a reconfiguration is
	// triggered by the processor installed in WatchAllTheThings.
	SnapshotReady
)

// String returns the constant's name for the known dispositions and a
// "<type>(<number>)" rendering for any other value.
func (disposition SnapshotDisposition) String() string {
	switch disposition {
	case SnapshotIncomplete:
		return "SnapshotIncomplete"
	case SnapshotDefer:
		return "SnapshotDefer"
	case SnapshotDrop:
		return "SnapshotDrop"
	case SnapshotReady:
		return "SnapshotReady"
	default:
		return fmt.Sprintf("%[1]T(%[1]d)", disposition)
	}
}
133
134 type FastpathProcessor func(context.Context, *ambex.FastpathSnapshot)
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
// watchAllTheThingsInternal runs the watcher: it reads change notifications
// from the Kubernetes watcher, the Consul watcher and the Istio certificate
// watch manager, folds each of them into a single SnapshotHolder, and hands
// the holder to a separate goroutine that calls SnapshotHolder.Notify.
//
// Parameters:
//   - encoded: passed through to SnapshotHolder.Notify.
//   - k8sSrc, queries: source of the Kubernetes watcher and what it watches.
//   - watchConsulFunc: used to construct the Consul watcher.
//   - istioCertSrc: source of the Istio certificate watcher.
//   - snapshotProcessor: passed through to SnapshotHolder.Notify.
//   - fastpathProcessor: passed to K8sUpdate and ConsulUpdate.
//   - ambassadorMeta: stored in the SnapshotHolder.
//
// It returns an error if any source or the SnapshotHolder cannot be set up;
// otherwise it returns the result of waiting for the goroutine group.
func watchAllTheThingsInternal(
	ctx context.Context,
	encoded *atomic.Value,
	k8sSrc K8sSource,
	queries []kates.Query,
	watchConsulFunc watchConsulFunc,
	istioCertSrc IstioCertSource,
	snapshotProcessor SnapshotProcessor,
	fastpathProcessor FastpathProcessor,
	ambassadorMeta *snapshot.AmbassadorMetaInfo,
) error {
	// All goroutines started below belong to this group.
	grp := dgroup.NewGroup(ctx, dgroup.GroupConfig{})

	// Set up the three input sources.
	//
	// NOTE(review): the "consul" goroutine is started before the Istio
	// watcher and the SnapshotHolder are created; the early returns that
	// follow do not call grp.Wait(), so on those paths that goroutine is not
	// waited for here — confirm this is acceptable.
	k8sWatcher, err := k8sSrc.Watch(ctx, queries...)
	if err != nil {
		return err
	}
	consulWatcher := newConsulWatcher(watchConsulFunc)
	grp.Go("consul", consulWatcher.run)
	istioCertWatcher, err := istioCertSrc.Watch(ctx)
	if err != nil {
		return err
	}
	istio := newIstioCertWatchManager(ctx, istioCertWatcher)

	// The holder accumulates the state contributed by all of the sources.
	snapshots, err := NewSnapshotHolder(ambassadorMeta)
	if err != nil {
		return err
	}

	// out is nil while there is nothing to notify about. A send on a nil
	// channel never proceeds, so the "out <- snapshots" case in the select
	// below is effectively disabled until an update sets out to notifyCh.
	var out chan *SnapshotHolder
	notifyCh := make(chan *SnapshotHolder)
	grp.Go("notifyCh", func(ctx context.Context) error {
		for {
			select {
			case sh := <-notifyCh:
				if err := sh.Notify(ctx, encoded, consulWatcher, snapshotProcessor); err != nil {
					return err
				}
			case <-ctx.Done():
				return nil
			}
		}
	})

	grp.Go("loop", func(ctx context.Context) error {
		for {
			dlog.Debugf(ctx, "WATCHER: --------")

			// Called at the top of every iteration, before waiting for
			// the next event.
			istio.StartLoop(ctx)

			select {
			case <-k8sWatcher.Changed():
				// Kubernetes changed; only schedule a notification if the
				// update reports a relevant change.
				changed, err := snapshots.K8sUpdate(ctx, k8sWatcher, consulWatcher, fastpathProcessor)
				if err != nil {
					return err
				}
				if !changed {
					continue
				}
				out = notifyCh
			case <-consulWatcher.changed():
				dlog.Debugf(ctx, "WATCHER: Consul fired")
				snapshots.ConsulUpdate(ctx, consulWatcher, fastpathProcessor)
				out = notifyCh
			case icertUpdate := <-istio.Changed():
				// An Istio certificate update arrived.
				if _, err := snapshots.IstioUpdate(ctx, istio, icertUpdate); err != nil {
					return err
				}
				out = notifyCh
			case out <- snapshots:
				// The notifier goroutine accepted the holder; disable this
				// case again until the next update.
				out = nil
			case <-ctx.Done():
				return nil
			}
		}
	})

	return grp.Wait()
}
306
307
// SnapshotHolder accumulates the watcher's view of the world (Kubernetes,
// Consul and Istio certificate state) and tracks which changes have already
// been reported through Notify.
type SnapshotHolder struct {
	// mutex is held by K8sUpdate, ConsulUpdate, IstioUpdate and Notify while
	// they read or modify the fields below.
	mutex sync.Mutex

	// validator filters resources during K8sUpdate and supplies the list of
	// invalid resources that Notify includes in the snapshot.
	validator *resourceValidator

	// ambassadorMeta is included in every snapshot built by Notify.
	ambassadorMeta *snapshot.AmbassadorMetaInfo

	// k8sSnapshot is updated by K8sUpdate (and handed to the Istio manager in
	// IstioUpdate); consulSnapshot is updated by ConsulUpdate. Both are
	// serialized by Notify.
	k8sSnapshot    *snapshot.KubernetesSnapshot
	consulSnapshot *snapshot.ConsulSnapshot

	// unsentDeltas collects the deltas seen by K8sUpdate. Notify includes
	// them in the snapshot and clears them once the Consul watcher reports
	// that it is bootstrapped.
	unsentDeltas []*kates.Delta

	// endpointRoutingInfo and dispatcher are consulted by K8sUpdate to decide
	// whether a change needs to be sent down the fastpath.
	endpointRoutingInfo endpointRoutingInfo
	dispatcher          *gateway.Dispatcher

	// snapshotChangeCount is incremented by K8sUpdate (when a batch contains
	// anything other than Endpoints deltas) and by IstioUpdate.
	// snapshotChangeNotified is set to snapshotChangeCount by Notify once
	// bootstrapped; Notify does nothing while the two are equal.
	snapshotChangeCount    int
	snapshotChangeNotified int

	// firstReconfig is true until the first bootstrapped Notify, which logs a
	// one-time debug message and clears it.
	firstReconfig bool
}
345
346 func NewSnapshotHolder(ambassadorMeta *snapshot.AmbassadorMetaInfo) (*SnapshotHolder, error) {
347 disp := gateway.NewDispatcher()
348 err := disp.Register("Gateway", func(untyped kates.Object) (*gateway.CompiledConfig, error) {
349 return gateway.Compile_Gateway(untyped.(*gw.Gateway))
350 })
351 if err != nil {
352 return nil, err
353 }
354 err = disp.Register("HTTPRoute", func(untyped kates.Object) (*gateway.CompiledConfig, error) {
355 return gateway.Compile_HTTPRoute(untyped.(*gw.HTTPRoute))
356 })
357 if err != nil {
358 return nil, err
359 }
360 validator, err := newResourceValidator()
361 if err != nil {
362 return nil, err
363 }
364 return &SnapshotHolder{
365 validator: validator,
366 ambassadorMeta: ambassadorMeta,
367 k8sSnapshot: NewKubernetesSnapshot(),
368 consulSnapshot: &snapshot.ConsulSnapshot{},
369 endpointRoutingInfo: newEndpointRoutingInfo(),
370 dispatcher: disp,
371 firstReconfig: true,
372 }, nil
373 }
374
375
// K8sUpdate folds the pending changes from the Kubernetes watcher into the
// holder's Kubernetes snapshot, reconciles secrets, Consul resources and
// AuthServices, and, if endpoints or dispatcher-registered kinds changed,
// sends a fastpath snapshot to fastpathProcessor.
//
// It returns true when the watcher reported a relevant change and all
// reconciliation steps succeeded, false (with a nil error) when nothing
// relevant changed, and false with the error when any step failed.
//
// The holder's mutex is held while the state is being updated; the call to
// fastpathProcessor happens after the mutex has been released.
func (sh *SnapshotHolder) K8sUpdate(
	ctx context.Context,
	watcher K8sWatcher,
	consulWatcher *consulWatcher,
	fastpathProcessor FastpathProcessor,
) (bool, error) {
	dbg := debug.FromContext(ctx)

	katesUpdateTimer := dbg.Timer("katesUpdate")
	parseAnnotationsTimer := dbg.Timer("parseAnnotations")
	reconcileSecretsTimer := dbg.Timer("reconcileSecrets")
	reconcileConsulTimer := dbg.Timer("reconcileConsul")
	reconcileAuthServicesTimer := dbg.Timer("reconcileAuthServices")

	// These are written inside the locked closure below and read after it
	// returns, once the lock has been dropped.
	endpointsChanged := false
	dispatcherChanged := false
	var endpoints *ambex.Endpoints
	var dispSnapshot *ecp_v2_cache.Snapshot
	changed, err := func() (bool, error) {
		dlog.Debugf(ctx, "[WATCHER]: processing cluster changes detected by the kubernetes watcher")
		sh.mutex.Lock()
		defer sh.mutex.Unlock()

		// Pull the pending changes into sh.k8sSnapshot, keeping only the
		// resources that the validator accepts, and collect the deltas.
		var deltas []*kates.Delta
		var changed bool
		var err error
		katesUpdateTimer.Time(func() {
			changed, err = watcher.FilteredUpdate(ctx, sh.k8sSnapshot, &deltas, func(un *kates.Unstructured) bool {
				return sh.validator.isValid(ctx, un)
			})
		})

		if err != nil {
			dlog.Errorf(ctx, "[WATCHER]: ERROR calculating changes in an update to the cluster config: %v", err)
			return false, err
		}
		if !changed {
			dlog.Debugf(ctx, "[WATCHER]: K8sUpdate did not detected any change to the resources relevant to this instance of Ambassador")
			return false, err
		}

		// For every ConsulResolver that was added or updated, expand
		// environment variable references in its address.
		//
		// NOTE(review): resolvers are matched to deltas by name only, not by
		// namespace — confirm that this is intended.
		for _, delta := range deltas {
			if (delta.Kind == "ConsulResolver") && (delta.DeltaType != kates.ObjectDelete) {
				for _, resolver := range sh.k8sSnapshot.ConsulResolvers {
					if resolver.ObjectMeta.Name == delta.Name {
						resolver.Spec.Address = os.ExpandEnv(resolver.Spec.Address)
					}
				}
			}
		}

		// A failure to parse annotations is logged but does not abort the
		// update.
		parseAnnotationsTimer.Time(func() {
			if err := sh.k8sSnapshot.PopulateAnnotations(ctx); err != nil {
				dlog.Errorf(ctx, "[WATCHER]: ERROR parsing annotations in configuration change: %v", err)
			}
		})

		reconcileSecretsTimer.Time(func() {
			err = ReconcileSecrets(ctx, sh)
		})
		if err != nil {
			dlog.Errorf(ctx, "[WATCHER]: ERROR reconciling Secrets: %v", err)
			return false, err
		}
		reconcileConsulTimer.Time(func() {
			err = ReconcileConsul(ctx, consulWatcher, sh.k8sSnapshot)
		})
		if err != nil {
			dlog.Errorf(ctx, "[WATCHER]: ERROR reconciling Consul resources: %v", err)
			return false, err
		}
		reconcileAuthServicesTimer.Time(func() {
			err = ReconcileAuthServices(ctx, sh, &deltas)
		})
		if err != nil {
			dlog.Errorf(ctx, "[WATCHER]: ERROR reconciling AuthServices: %v", err)
			return false, err
		}

		sh.endpointRoutingInfo.reconcileEndpointWatches(ctx, sh.k8sSnapshot)

		// A change in the set of watched endpoints requires a fastpath
		// update even if no Endpoints object itself changed.
		if sh.endpointRoutingInfo.watchesChanged() {
			dlog.Infof(ctx, "[WATCHER]: endpoint watches changed: %v", sh.endpointRoutingInfo.endpointWatches)
			endpointsChanged = true
		}

		// Record every delta as unsent and classify the batch: does it touch
		// watched endpoints, does it touch a kind handled by the dispatcher,
		// and does it consist of Endpoints deltas only?
		endpointsOnly := true
		for _, delta := range deltas {
			sh.unsentDeltas = append(sh.unsentDeltas, delta)

			if delta.Kind == "Endpoints" {
				key := fmt.Sprintf("%s:%s", delta.Namespace, delta.Name)
				if sh.endpointRoutingInfo.endpointWatches[key] || sh.dispatcher.IsWatched(delta.Namespace, delta.Name) {
					endpointsChanged = true
				}
			} else {
				endpointsOnly = false
			}

			if sh.dispatcher.IsRegistered(delta.Kind) {
				dispatcherChanged = true
				if delta.DeltaType == kates.ObjectDelete {
					sh.dispatcher.DeleteKey(delta.Kind, delta.Namespace, delta.Name)
				}
			}
		}
		// A batch made up solely of Endpoints deltas does not count as a
		// snapshot change, so Notify will not produce a new snapshot for it.
		if !endpointsOnly {
			sh.snapshotChangeCount += 1
		}

		if endpointsChanged || dispatcherChanged {
			endpoints = makeEndpoints(ctx, sh.k8sSnapshot, sh.consulSnapshot.Endpoints)
			for _, gwc := range sh.k8sSnapshot.GatewayClasses {
				if err := sh.dispatcher.Upsert(gwc); err != nil {
					// Upsert failures are logged and otherwise ignored.
					dlog.Error(ctx, err)
				}
			}
			for _, gw := range sh.k8sSnapshot.Gateways {
				if err := sh.dispatcher.Upsert(gw); err != nil {
					// Upsert failures are logged and otherwise ignored.
					dlog.Error(ctx, err)
				}

			}
			for _, hr := range sh.k8sSnapshot.HTTPRoutes {
				if err := sh.dispatcher.Upsert(hr); err != nil {
					// Upsert failures are logged and otherwise ignored.
					dlog.Error(ctx, err)
				}
			}
			_, dispSnapshot = sh.dispatcher.GetSnapshot(ctx)
		}
		return true, nil
	}()
	if err != nil {
		// NOTE(review): the error has already been logged inside the closure
		// above, so it is reported twice.
		dlog.Errorf(ctx, "[WATCHER]: ERROR checking changes from a cluster config update: %v", err)
		return changed, err
	}

	// Deliver the fastpath snapshot outside of the lock.
	if endpointsChanged || dispatcherChanged {
		fastpath := &ambex.FastpathSnapshot{
			Endpoints: endpoints,
			Snapshot:  dispSnapshot,
		}
		fastpathProcessor(ctx, fastpath)
	}
	return changed, nil
}
545
546 func (sh *SnapshotHolder) ConsulUpdate(ctx context.Context, consulWatcher *consulWatcher, fastpathProcessor FastpathProcessor) bool {
547 var endpoints *ambex.Endpoints
548 var dispSnapshot *ecp_v2_cache.Snapshot
549 func() {
550 sh.mutex.Lock()
551 defer sh.mutex.Unlock()
552 consulWatcher.update(sh.consulSnapshot)
553 endpoints = makeEndpoints(ctx, sh.k8sSnapshot, sh.consulSnapshot.Endpoints)
554 _, dispSnapshot = sh.dispatcher.GetSnapshot(ctx)
555 }()
556 fastpathProcessor(ctx, &ambex.FastpathSnapshot{
557 Endpoints: endpoints,
558 Snapshot: dispSnapshot,
559 })
560 return true
561 }
562
563 func (sh *SnapshotHolder) IstioUpdate(ctx context.Context, istio *istioCertWatchManager,
564 icertUpdate IstioCertUpdate) (bool, error) {
565 dbg := debug.FromContext(ctx)
566
567 istioCertUpdateTimer := dbg.Timer("istioCertUpdate")
568 reconcileSecretsTimer := dbg.Timer("reconcileSecrets")
569
570 sh.mutex.Lock()
571 defer sh.mutex.Unlock()
572
573 istioCertUpdateTimer.Time(func() {
574 istio.Update(ctx, icertUpdate, sh.k8sSnapshot)
575 })
576
577 var err error
578 reconcileSecretsTimer.Time(func() {
579 err = ReconcileSecrets(ctx, sh)
580 })
581 if err != nil {
582 return false, err
583 }
584
585 sh.snapshotChangeCount += 1
586 return true, nil
587 }
588
589 func (sh *SnapshotHolder) Notify(
590 ctx context.Context,
591 encoded *atomic.Value,
592 consulWatcher *consulWatcher,
593 snapshotProcessor SnapshotProcessor,
594 ) error {
595 dbg := debug.FromContext(ctx)
596
597 notifyWebhooksTimer := dbg.Timer("notifyWebhooks")
598
599
600 var snapshotJSON []byte
601 var bootstrapped bool
602 changed := true
603
604 err := func() error {
605 sh.mutex.Lock()
606 defer sh.mutex.Unlock()
607
608 if sh.snapshotChangeNotified == sh.snapshotChangeCount {
609 changed = false
610 return nil
611 }
612
613 sn := &snapshot.Snapshot{
614 Kubernetes: sh.k8sSnapshot,
615 Consul: sh.consulSnapshot,
616 Invalid: sh.validator.getInvalid(),
617 Deltas: sh.unsentDeltas,
618 AmbassadorMeta: sh.ambassadorMeta,
619 }
620
621 var err error
622 snapshotJSON, err = json.MarshalIndent(sn, "", " ")
623 if err != nil {
624 return err
625 }
626
627 bootstrapped = consulWatcher.isBootstrapped()
628 if bootstrapped {
629 sh.unsentDeltas = nil
630 if sh.firstReconfig {
631 dlog.Debugf(ctx, "WATCHER: Bootstrapped! Computing initial configuration...")
632 sh.firstReconfig = false
633 }
634 sh.snapshotChangeNotified = sh.snapshotChangeCount
635 }
636 return nil
637 }()
638 if err != nil {
639 return err
640 }
641 if !changed {
642 return nil
643 }
644
645 if bootstrapped {
646
647 encoded.Store(snapshotJSON)
648
649
650
651 var err error
652 notifyWebhooksTimer.Time(func() {
653 err = snapshotProcessor(ctx, SnapshotReady, snapshotJSON)
654 })
655 if err != nil {
656 return err
657 }
658 }
659 return snapshotProcessor(ctx, SnapshotIncomplete, snapshotJSON)
660 }
661
662
// k8sSource is the K8sSource backed by a real kates client; its Watch method
// delegates directly to that client.
type k8sSource struct {
	// client is the kates client used to create watchers.
	client *kates.Client
}
666
667 func (k *k8sSource) Watch(ctx context.Context, queries ...kates.Query) (K8sWatcher, error) {
668 return k.client.Watch(ctx, queries...)
669 }
670
671 func newK8sSource(client *kates.Client) *k8sSource {
672 return &k8sSource{
673 client: client,
674 }
675 }
676
View as plain text