...

Source file src/github.com/datawire/ambassador/v2/cmd/entrypoint/watcher.go

Documentation: github.com/datawire/ambassador/v2/cmd/entrypoint

     1  package entrypoint
     2  
     3  import (
     4  	"context"
     5  	"encoding/json"
     6  	"fmt"
     7  	"os"
     8  	"strconv"
     9  	"sync"
    10  	"sync/atomic"
    11  	"time"
    12  
    13  	gw "sigs.k8s.io/gateway-api/apis/v1alpha1"
    14  
    15  	"github.com/datawire/ambassador/v2/pkg/acp"
    16  	"github.com/datawire/ambassador/v2/pkg/ambex"
    17  	"github.com/datawire/ambassador/v2/pkg/debug"
    18  	ecp_v2_cache "github.com/datawire/ambassador/v2/pkg/envoy-control-plane/cache/v2"
    19  	"github.com/datawire/ambassador/v2/pkg/gateway"
    20  	"github.com/datawire/ambassador/v2/pkg/kates"
    21  	"github.com/datawire/ambassador/v2/pkg/snapshot/v1"
    22  	"github.com/datawire/dlib/dgroup"
    23  	"github.com/datawire/dlib/dlog"
    24  )
    25  
    26  func WatchAllTheThings(
    27  	ctx context.Context,
    28  	ambwatch *acp.AmbassadorWatcher,
    29  	encoded *atomic.Value,
    30  	fastpathCh chan<- *ambex.FastpathSnapshot,
    31  	clusterID string,
    32  	version string,
    33  ) error {
    34  	client, err := kates.NewClient(kates.ClientConfig{})
    35  	if err != nil {
    36  		return err
    37  	}
    38  	intv, err := strconv.Atoi(env("AMBASSADOR_RECONFIG_MAX_DELAY", "1"))
    39  	if err != nil {
    40  		return err
    41  	}
    42  	maxInterval := time.Duration(intv) * time.Second
    43  	err = client.MaxAccumulatorInterval(maxInterval)
    44  	if err != nil {
    45  		return err
    46  	}
    47  	dlog.Infof(ctx, "AMBASSADOR_RECONFIG_MAX_DELAY set to %d", intv)
    48  
    49  	serverTypeList, err := client.ServerResources()
    50  	if err != nil {
    51  		// It's possible that an error prevented listing some apigroups, but not all; so
    52  		// process the output even if there is an error.
    53  		dlog.Infof(ctx, "Warning, unable to list api-resources: %v", err)
    54  	}
    55  
    56  	interestingTypes := GetInterestingTypes(ctx, serverTypeList)
    57  	queries := GetQueries(ctx, interestingTypes)
    58  
    59  	ambassadorMeta := getAmbassadorMeta(GetAmbassadorID(), clusterID, version, client)
    60  
    61  	// **** SETUP DONE for the Kubernetes Watcher
    62  
    63  	notify := func(ctx context.Context, disposition SnapshotDisposition, _ []byte) error {
    64  		if disposition == SnapshotReady {
    65  			return notifyReconfigWebhooks(ctx, ambwatch)
    66  		}
    67  		return nil
    68  	}
    69  
    70  	fastpathUpdate := func(ctx context.Context, fastpathSnapshot *ambex.FastpathSnapshot) {
    71  		fastpathCh <- fastpathSnapshot
    72  	}
    73  
    74  	k8sSrc := newK8sSource(client)
    75  	consulSrc := watchConsul
    76  	istioCertSrc := newIstioCertSource()
    77  
    78  	return watchAllTheThingsInternal(
    79  		ctx,
    80  		encoded,
    81  		k8sSrc,
    82  		queries,
    83  		consulSrc, // watchConsulFunc
    84  		istioCertSrc,
    85  		notify,         // snapshotProcessor
    86  		fastpathUpdate, // fastpathProcessor
    87  		ambassadorMeta,
    88  	)
    89  }
    90  
    91  func getAmbassadorMeta(ambassadorID string, clusterID string, version string, client *kates.Client) *snapshot.AmbassadorMetaInfo {
    92  	ambMeta := &snapshot.AmbassadorMetaInfo{
    93  		ClusterID:         clusterID,
    94  		AmbassadorID:      ambassadorID,
    95  		AmbassadorVersion: version,
    96  	}
    97  	kubeServerVer, err := client.ServerVersion()
    98  	if err == nil {
    99  		ambMeta.KubeVersion = kubeServerVer.GitVersion
   100  	}
   101  	return ambMeta
   102  }
   103  
// SnapshotProcessor is the callback signature used to hand a snapshot onward. It receives a
// context, the disposition describing the state of the snapshot, and a byte slice (within this
// file, the JSON-encoded snapshot), and reports failure through its error result.
type SnapshotProcessor func(context.Context, SnapshotDisposition, []byte) error
   105  
   106  type SnapshotDisposition int
   107  
   108  const (
   109  	// Indicates the watcher is still in the booting process and the snapshot has dangling pointers.
   110  	SnapshotIncomplete SnapshotDisposition = iota
   111  	// Indicates that the watcher is deferring processing of the snapshot because it is considered
   112  	// to be a product of churn.
   113  	SnapshotDefer
   114  	// Indicates that the watcher is dropping the snapshot because it has determined that it is
   115  	// logically a noop.
   116  	SnapshotDrop
   117  	// Indicates that the snapshot is ready to be processed.
   118  	SnapshotReady
   119  )
   120  
   121  func (disposition SnapshotDisposition) String() string {
   122  	ret, ok := map[SnapshotDisposition]string{
   123  		SnapshotIncomplete: "SnapshotIncomplete",
   124  		SnapshotDefer:      "SnapshotDefer",
   125  		SnapshotDrop:       "SnapshotDrop",
   126  		SnapshotReady:      "SnapshotReady",
   127  	}[disposition]
   128  	if !ok {
   129  		return fmt.Sprintf("%[1]T(%[1]d)", disposition)
   130  	}
   131  	return ret
   132  }
   133  
// FastpathProcessor is the callback signature used to deliver a fastpath snapshot
// (*ambex.FastpathSnapshot). It receives a context and the snapshot and returns nothing.
type FastpathProcessor func(context.Context, *ambex.FastpathSnapshot)
   135  
// watchAllTheThingsInternal is _the_ thing that watches all the different kinds of Ambassador
// configuration events that we care about. This right here is pretty much the root of
// everything flowing into Ambassador from the outside world, so:
//
// ******** READ THE FOLLOWING COMMENT CAREFULLY! ********
//
// Since this is where _all_ the different kinds of these events (K8s, Consul, filesystem,
// whatever) are brought together and examined, and where we pass judgement on whether or
// not a given event is worth reconfiguring Ambassador or not, the interactions between
// this function and other pieces of the system can be quite a bit more complex than you
// might expect. There are several really huge things you should be bearing in mind if you
// need to work on this:
//
//  1. The set of things we're watching is not static, but it must converge.
//
//     An example: you can set up a Kubernetes watch that finds a KubernetesConsulResolver
//     resource, which will then prompt a new Consul watch to happen. At present, nothing
//     that that Consul watch could find is capable of prompting a new Kubernetes watch to
//     be created. This is important: it would be fairly easy to change things such that
//     there is a feedback loop where the set of things we watch does not converge on a
//     stable set. If such a loop exists, fixing it will probably require grokking this
//     watcher function, kates.Accumulator, and maybe the reconcilers in consul.go and
//     endpoints.go as well.
//
//  2. No one source of input events can be allowed to alter the event stream for another
//     source.
//
//     An example: at one point, a bug in the watcher function resulted in the Kubernetes
//     watcher being able to decide to short-circuit a watcher iteration -- which had the
//     effect of allowing the K8s watcher to cause _Consul_ events to be ignored. That's
//     not OK. To guard against this:
//
//     A. Refrain from adding state to the watcher loop.
//
//     B. Try very very hard to keep logic that applies to a single source within that
//     source's specific case in the watcher's select statement.
//
//     C. Don't add any more select statements, so that B. above is unambiguous.
//
//  3. If you add a new channel to watch, you MUST make sure it has a way to let the loop
//     know whether it saw real changes, so that the short-circuit logic works correctly.
//     That said, recognize that the way it works now, with the state for the individual
//     watchers in the watcher function itself, is a crock, and the path forward is to
//     refactor them into classes that can separate things more cleanly.
//
//  4. If you don't fully understand everything above, _do not touch this function without
//     guidance_.
//
// The function starts the Kubernetes, Consul and Istio-cert watchers, creates the
// SnapshotHolder, and then runs two goroutines in a dgroup: "loop", which folds input events
// into the holder, and "notifyCh", which calls SnapshotHolder.Notify for every holder it is
// sent. It returns a setup error, or the result of waiting on the group.
func watchAllTheThingsInternal(
	ctx context.Context,
	encoded *atomic.Value,
	k8sSrc K8sSource,
	queries []kates.Query,
	watchConsulFunc watchConsulFunc,
	istioCertSrc IstioCertSource,
	snapshotProcessor SnapshotProcessor,
	fastpathProcessor FastpathProcessor,
	ambassadorMeta *snapshot.AmbassadorMetaInfo,
) error {
	// Ambassador has three sources of inputs: kubernetes, consul, and the filesystem. The job
	// of the watchAllTheThingsInternal loop is to read updates from all three of these sources,
	// assemble them into a single coherent configuration, and pass them along to other parts of
	// ambassador for processing.

	// The watchAllTheThingsInternal loop must decide what information is relevant to solicit
	// from each source. This is decided a bit differently for each source.
	//
	// For kubernetes the set of subscriptions is basically hardcoded to the set of resources
	// defined in interesting_types.go, this is filtered down at boot based on RBAC
	// limitations. The filtered list is used to construct the queries that are passed into this
	// function, and that set of queries remains fixed for the lifetime of the loop, i.e. the
	// lifetime of the ambassador process (unless we are testing, in which case we may run the
	// watchAllTheThingsInternal loop more than once in a single process).
	//
	// For the consul source we derive the set of resources to watch based on the configuration in
	// kubernetes, i.e. we watch the services defined in Mappings that are configured to use a
	// consul resolver. We use the ConsulResolver that a given Mapping is configured with to find
	// the datacenter to query.
	//
	// The filesystem datasource is for istio secrets. XXX fill in more

	grp := dgroup.NewGroup(ctx, dgroup.GroupConfig{})

	// Each time the watcher loop wakes up, it assembles updates from whatever source woke it up
	// into its view of the world. It then determines if enough information has been assembled to
	// consider ambassador "booted" and if so passes the updated view along to its output (the
	// SnapshotProcessor).

	// Setup our three sources of ambassador inputs: kubernetes, consul, and the filesystem. Each of
	// these have interfaces that enable us to run with the "real" implementation or a mock
	// implementation for our Fake test harness.
	k8sWatcher, err := k8sSrc.Watch(ctx, queries...)
	if err != nil {
		return err
	}
	consulWatcher := newConsulWatcher(watchConsulFunc)
	grp.Go("consul", consulWatcher.run)
	istioCertWatcher, err := istioCertSrc.Watch(ctx)
	if err != nil {
		return err
	}
	istio := newIstioCertWatchManager(ctx, istioCertWatcher)

	// SnapshotHolder tracks all the data structures that get updated by the various sources of
	// information. It also holds the business logic that converts the data as received to a more
	// amenable form for processing. It not only serves to group these together, but it also
	// provides a mutex to protect access to the data.
	snapshots, err := NewSnapshotHolder(ambassadorMeta)
	if err != nil {
		return err
	}

	// This points to notifyCh when we have updated information to send and nil when we have no new
	// information. This is deliberately nil to begin with as we have nothing to send yet.
	var out chan *SnapshotHolder
	notifyCh := make(chan *SnapshotHolder)
	// The "notifyCh" goroutine performs the (potentially slow) notification for each holder it
	// receives, so that the main loop below is free to keep consuming input events meanwhile.
	grp.Go("notifyCh", func(ctx context.Context) error {
		for {
			select {
			case sh := <-notifyCh:
				if err := sh.Notify(ctx, encoded, consulWatcher, snapshotProcessor); err != nil {
					return err
				}
			case <-ctx.Done():
				return nil
			}
		}
	})

	grp.Go("loop", func(ctx context.Context) error {
		for {
			dlog.Debugf(ctx, "WATCHER: --------")

			// XXX Hack: the istioCertWatchManager needs to reset at the start of the
			// loop, for now. A better way, I think, will be to instead track deltas in
			// ReconcileSecrets -- that way we can ditch this crap and Istio-cert changes
			// that somehow don't generate an actual change will still not trigger a
			// reconfigure.
			istio.StartLoop(ctx)

			select {
			case <-k8sWatcher.Changed():
				// Kubernetes has some changes, so we need to handle them.
				changed, err := snapshots.K8sUpdate(ctx, k8sWatcher, consulWatcher, fastpathProcessor)
				if err != nil {
					return err
				}
				// Nothing relevant changed: go around again without arming the send case.
				if !changed {
					continue
				}
				out = notifyCh
			case <-consulWatcher.changed():
				dlog.Debugf(ctx, "WATCHER: Consul fired")
				snapshots.ConsulUpdate(ctx, consulWatcher, fastpathProcessor)
				out = notifyCh
			case icertUpdate := <-istio.Changed():
				// The Istio cert has some changes, so we need to handle them.
				if _, err := snapshots.IstioUpdate(ctx, istio, icertUpdate); err != nil {
					return err
				}
				out = notifyCh
			case out <- snapshots:
				// A send on a nil channel blocks forever, so this case can only be chosen
				// while out points at notifyCh. Once the holder has been handed over, disarm
				// the case until one of the sources reports a new change.
				out = nil
			case <-ctx.Done():
				return nil
			}
		}
	})

	return grp.Wait()
}
   306  
// SnapshotHolder is responsible for holding the accumulated view of the Kubernetes and Consul
// worlds, together with the bookkeeping needed to decide whether there is anything new to
// send. All fields are guarded by mutex.
type SnapshotHolder struct {
	// This protects the entire struct.
	mutex sync.Mutex

	// The thing that knows how to validate kubernetes resources. This is always calling into the
	// kates validator even when we are being driven by the Fake harness.
	validator *resourceValidator

	// Ambassador meta info to pass along in the snapshot.
	ambassadorMeta *snapshot.AmbassadorMetaInfo

	// These two fields represent the view of the kubernetes world and the view of the consul
	// world. This view is constructed from the raw data given to us from each respective source,
	// plus additional fields that are computed based on the raw data. These are cumulative values,
	// they always represent the entire state of their respective worlds.
	k8sSnapshot    *snapshot.KubernetesSnapshot
	consulSnapshot *snapshot.ConsulSnapshot
	// XXX: you would expect there to be an analogous snapshot for istio secrets, however the istio
	// source works by directly munging the k8sSnapshot.

	// The unsentDeltas field tracks the stream of deltas that have occurred in between each
	// kubernetes snapshot. This is a passthrough of the full stream of deltas reported by kates
	// which is in turn a facade for the deltas reported by client-go.
	unsentDeltas []*kates.Delta

	// endpointRoutingInfo tracks which Endpoints are being watched, and dispatcher holds the
	// registered Gateway/HTTPRoute compilers; both feed the fastpath snapshot.
	endpointRoutingInfo endpointRoutingInfo
	dispatcher          *gateway.Dispatcher

	// Serial number that tracks if we need to send snapshot changes or not. snapshotChangeCount
	// is incremented when a change worth sending is made, and we copy it over to
	// snapshotChangeNotified when the change is sent.
	snapshotChangeCount    int
	snapshotChangeNotified int

	// Are we still waiting for the very first reconfig? This starts out true and is cleared the
	// first time a bootstrapped snapshot is sent.
	firstReconfig bool
}
   345  
   346  func NewSnapshotHolder(ambassadorMeta *snapshot.AmbassadorMetaInfo) (*SnapshotHolder, error) {
   347  	disp := gateway.NewDispatcher()
   348  	err := disp.Register("Gateway", func(untyped kates.Object) (*gateway.CompiledConfig, error) {
   349  		return gateway.Compile_Gateway(untyped.(*gw.Gateway))
   350  	})
   351  	if err != nil {
   352  		return nil, err
   353  	}
   354  	err = disp.Register("HTTPRoute", func(untyped kates.Object) (*gateway.CompiledConfig, error) {
   355  		return gateway.Compile_HTTPRoute(untyped.(*gw.HTTPRoute))
   356  	})
   357  	if err != nil {
   358  		return nil, err
   359  	}
   360  	validator, err := newResourceValidator()
   361  	if err != nil {
   362  		return nil, err
   363  	}
   364  	return &SnapshotHolder{
   365  		validator:           validator,
   366  		ambassadorMeta:      ambassadorMeta,
   367  		k8sSnapshot:         NewKubernetesSnapshot(),
   368  		consulSnapshot:      &snapshot.ConsulSnapshot{},
   369  		endpointRoutingInfo: newEndpointRoutingInfo(),
   370  		dispatcher:          disp,
   371  		firstReconfig:       true,
   372  	}, nil
   373  }
   374  
// K8sUpdate gets the raw update from the kubernetes watcher, then redoes our computed view.
//
// While holding the holder's mutex it pulls the filtered update and its deltas from watcher,
// interpolates environment variables into changed ConsulResolvers, repopulates annotations,
// reconciles Secrets, Consul watches and AuthServices, refreshes the endpoint watches, and
// appends the deltas to unsentDeltas. If any delta is for something other than Endpoints,
// snapshotChangeCount is bumped so that a later Notify sends a full snapshot.
//
// When watched endpoints or dispatcher-registered kinds changed, a fastpath snapshot is built
// under the lock and handed to fastpathProcessor after the lock has been released.
//
// The boolean result is false when the watcher reported no relevant change (or when an error
// occurred) and true otherwise. The error result is the first failure from the watcher update
// or from one of the reconcile steps; annotation parsing failures are only logged.
func (sh *SnapshotHolder) K8sUpdate(
	ctx context.Context,
	watcher K8sWatcher,
	consulWatcher *consulWatcher,
	fastpathProcessor FastpathProcessor,
) (bool, error) {
	dbg := debug.FromContext(ctx)

	katesUpdateTimer := dbg.Timer("katesUpdate")
	parseAnnotationsTimer := dbg.Timer("parseAnnotations")
	reconcileSecretsTimer := dbg.Timer("reconcileSecrets")
	reconcileConsulTimer := dbg.Timer("reconcileConsul")
	reconcileAuthServicesTimer := dbg.Timer("reconcileAuthServices")

	// These are written inside the locked closure below and read after it returns, so that the
	// fastpath processor can be invoked without holding the mutex.
	endpointsChanged := false
	dispatcherChanged := false
	var endpoints *ambex.Endpoints
	var dispSnapshot *ecp_v2_cache.Snapshot
	changed, err := func() (bool, error) {
		dlog.Debugf(ctx, "[WATCHER]: processing cluster changes detected by the kubernetes watcher")
		sh.mutex.Lock()
		defer sh.mutex.Unlock()

		// We could probably get a win in some scenarios by using this filtered update thing to
		// pre-exclude based on ambassador-id.
		var deltas []*kates.Delta
		var changed bool
		var err error
		katesUpdateTimer.Time(func() {
			changed, err = watcher.FilteredUpdate(ctx, sh.k8sSnapshot, &deltas, func(un *kates.Unstructured) bool {
				return sh.validator.isValid(ctx, un)
			})
		})

		if err != nil {
			dlog.Errorf(ctx, "[WATCHER]: ERROR calculating changes in an update to the cluster config: %v", err)
			return false, err
		}
		if !changed {
			dlog.Debugf(ctx, "[WATCHER]: K8sUpdate did not detected any change to the resources relevant to this instance of Ambassador")
			// err is nil here; this is the "nothing to do" exit.
			return false, err
		}

		// ConsulResolvers are special in that people like to be able to interpolate environment
		// variables in their Spec.Address field (e.g. "address: $CONSULHOST:8500" or the like),
		// so we need to handle that, but we need to also not interpolate the same thing multiple
		// times (it's probably unlikely to cause trouble, but you just know eventually it'll
		// bite us). So we'll look through deltas for changing ConsulResolvers, and then only
		// interpolate the ones that've changed.
		//
		// Also note that legacy mode supported interpolation literally anywhere in the
		// input, but let's not do that here.
		for _, delta := range deltas {
			if (delta.Kind == "ConsulResolver") && (delta.DeltaType != kates.ObjectDelete) {
				// Oh, look, a ConsulResolver changed, and it wasn't deleted. Go find the object
				// in the snapshot so we can update it.
				//
				// XXX Yes, I know, linear searches suck. We don't expect there to be many
				// ConsulResolvers, though, and we also don't expect them to change often.
				//
				// NOTE(review): the match below is on name only, not namespace, so resolvers
				// that share a name across namespaces would all be interpolated -- confirm
				// whether that is intended.
				for _, resolver := range sh.k8sSnapshot.ConsulResolvers {
					if resolver.ObjectMeta.Name == delta.Name {
						// Found it! Go do the environment variable interpolation and update
						// resolver.Spec.Address in place, so that the change makes it into
						// the snapshot.
						resolver.Spec.Address = os.ExpandEnv(resolver.Spec.Address)
					}
				}
			}
		}

		// A failure to parse annotations is logged but does not abort the update.
		parseAnnotationsTimer.Time(func() {
			if err := sh.k8sSnapshot.PopulateAnnotations(ctx); err != nil {
				dlog.Errorf(ctx, "[WATCHER]: ERROR parsing annotations in configuration change: %v", err)
			}
		})

		reconcileSecretsTimer.Time(func() {
			err = ReconcileSecrets(ctx, sh)
		})
		if err != nil {
			dlog.Errorf(ctx, "[WATCHER]: ERROR reconciling Secrets: %v", err)
			return false, err
		}
		reconcileConsulTimer.Time(func() {
			err = ReconcileConsul(ctx, consulWatcher, sh.k8sSnapshot)
		})
		if err != nil {
			dlog.Errorf(ctx, "[WATCHER]: ERROR reconciling Consul resources: %v", err)
			return false, err
		}
		reconcileAuthServicesTimer.Time(func() {
			err = ReconcileAuthServices(ctx, sh, &deltas)
		})
		if err != nil {
			dlog.Errorf(ctx, "[WATCHER]: ERROR reconciling AuthServices: %v", err)
			return false, err
		}

		sh.endpointRoutingInfo.reconcileEndpointWatches(ctx, sh.k8sSnapshot)
		// Check if the set of endpoints we are interested in has changed. If so we need to send
		// endpoint info again even if endpoints have not changed.
		if sh.endpointRoutingInfo.watchesChanged() {
			dlog.Infof(ctx, "[WATCHER]: endpoint watches changed: %v", sh.endpointRoutingInfo.endpointWatches)
			endpointsChanged = true
		}

		// Classify the deltas: Endpoints deltas only matter to the fastpath (and only if they
		// are watched), anything else warrants a full snapshot, and deltas for kinds registered
		// with the dispatcher require the dispatcher's snapshot to be rebuilt.
		endpointsOnly := true
		for _, delta := range deltas {
			sh.unsentDeltas = append(sh.unsentDeltas, delta)

			if delta.Kind == "Endpoints" {
				key := fmt.Sprintf("%s:%s", delta.Namespace, delta.Name)
				if sh.endpointRoutingInfo.endpointWatches[key] || sh.dispatcher.IsWatched(delta.Namespace, delta.Name) {
					endpointsChanged = true
				}
			} else {
				endpointsOnly = false
			}

			if sh.dispatcher.IsRegistered(delta.Kind) {
				dispatcherChanged = true
				if delta.DeltaType == kates.ObjectDelete {
					sh.dispatcher.DeleteKey(delta.Kind, delta.Namespace, delta.Name)
				}
			}
		}
		if !endpointsOnly {
			sh.snapshotChangeCount += 1
		}

		if endpointsChanged || dispatcherChanged {
			endpoints = makeEndpoints(ctx, sh.k8sSnapshot, sh.consulSnapshot.Endpoints)
			// Re-upsert every gateway-related object; upsert failures are logged and skipped.
			for _, gwc := range sh.k8sSnapshot.GatewayClasses {
				if err := sh.dispatcher.Upsert(gwc); err != nil {
					// TODO: Should this be more severe?
					dlog.Error(ctx, err)
				}
			}
			for _, gw := range sh.k8sSnapshot.Gateways {
				if err := sh.dispatcher.Upsert(gw); err != nil {
					// TODO: Should this be more severe?
					dlog.Error(ctx, err)
				}

			}
			for _, hr := range sh.k8sSnapshot.HTTPRoutes {
				if err := sh.dispatcher.Upsert(hr); err != nil {
					// TODO: Should this be more severe?
					dlog.Error(ctx, err)
				}
			}
			_, dispSnapshot = sh.dispatcher.GetSnapshot(ctx)
		}
		return true, nil
	}()
	if err != nil {
		dlog.Errorf(ctx, "[WATCHER]: ERROR checking changes from a cluster config update: %v", err)
		return changed, err
	}

	// The mutex has been released by now, so the fastpath processor cannot block other users of
	// the holder.
	if endpointsChanged || dispatcherChanged {
		fastpath := &ambex.FastpathSnapshot{
			Endpoints: endpoints,
			Snapshot:  dispSnapshot,
		}
		fastpathProcessor(ctx, fastpath)
	}
	return changed, nil
}
   545  
   546  func (sh *SnapshotHolder) ConsulUpdate(ctx context.Context, consulWatcher *consulWatcher, fastpathProcessor FastpathProcessor) bool {
   547  	var endpoints *ambex.Endpoints
   548  	var dispSnapshot *ecp_v2_cache.Snapshot
   549  	func() {
   550  		sh.mutex.Lock()
   551  		defer sh.mutex.Unlock()
   552  		consulWatcher.update(sh.consulSnapshot)
   553  		endpoints = makeEndpoints(ctx, sh.k8sSnapshot, sh.consulSnapshot.Endpoints)
   554  		_, dispSnapshot = sh.dispatcher.GetSnapshot(ctx)
   555  	}()
   556  	fastpathProcessor(ctx, &ambex.FastpathSnapshot{
   557  		Endpoints: endpoints,
   558  		Snapshot:  dispSnapshot,
   559  	})
   560  	return true
   561  }
   562  
   563  func (sh *SnapshotHolder) IstioUpdate(ctx context.Context, istio *istioCertWatchManager,
   564  	icertUpdate IstioCertUpdate) (bool, error) {
   565  	dbg := debug.FromContext(ctx)
   566  
   567  	istioCertUpdateTimer := dbg.Timer("istioCertUpdate")
   568  	reconcileSecretsTimer := dbg.Timer("reconcileSecrets")
   569  
   570  	sh.mutex.Lock()
   571  	defer sh.mutex.Unlock()
   572  
   573  	istioCertUpdateTimer.Time(func() {
   574  		istio.Update(ctx, icertUpdate, sh.k8sSnapshot)
   575  	})
   576  
   577  	var err error
   578  	reconcileSecretsTimer.Time(func() {
   579  		err = ReconcileSecrets(ctx, sh)
   580  	})
   581  	if err != nil {
   582  		return false, err
   583  	}
   584  
   585  	sh.snapshotChangeCount += 1
   586  	return true, nil
   587  }
   588  
   589  func (sh *SnapshotHolder) Notify(
   590  	ctx context.Context,
   591  	encoded *atomic.Value,
   592  	consulWatcher *consulWatcher,
   593  	snapshotProcessor SnapshotProcessor,
   594  ) error {
   595  	dbg := debug.FromContext(ctx)
   596  
   597  	notifyWebhooksTimer := dbg.Timer("notifyWebhooks")
   598  
   599  	// If the change is solely endpoints we don't bother making a snapshot.
   600  	var snapshotJSON []byte
   601  	var bootstrapped bool
   602  	changed := true
   603  
   604  	err := func() error {
   605  		sh.mutex.Lock()
   606  		defer sh.mutex.Unlock()
   607  
   608  		if sh.snapshotChangeNotified == sh.snapshotChangeCount {
   609  			changed = false
   610  			return nil
   611  		}
   612  
   613  		sn := &snapshot.Snapshot{
   614  			Kubernetes:     sh.k8sSnapshot,
   615  			Consul:         sh.consulSnapshot,
   616  			Invalid:        sh.validator.getInvalid(),
   617  			Deltas:         sh.unsentDeltas,
   618  			AmbassadorMeta: sh.ambassadorMeta,
   619  		}
   620  
   621  		var err error
   622  		snapshotJSON, err = json.MarshalIndent(sn, "", "  ")
   623  		if err != nil {
   624  			return err
   625  		}
   626  
   627  		bootstrapped = consulWatcher.isBootstrapped()
   628  		if bootstrapped {
   629  			sh.unsentDeltas = nil
   630  			if sh.firstReconfig {
   631  				dlog.Debugf(ctx, "WATCHER: Bootstrapped! Computing initial configuration...")
   632  				sh.firstReconfig = false
   633  			}
   634  			sh.snapshotChangeNotified = sh.snapshotChangeCount
   635  		}
   636  		return nil
   637  	}()
   638  	if err != nil {
   639  		return err
   640  	}
   641  	if !changed {
   642  		return nil
   643  	}
   644  
   645  	if bootstrapped {
   646  		// ...then stash this snapshot and fire off webhooks.
   647  		encoded.Store(snapshotJSON)
   648  
   649  		// Finally, use the reconfigure webhooks to let the rest of Ambassador
   650  		// know about the new configuration.
   651  		var err error
   652  		notifyWebhooksTimer.Time(func() {
   653  			err = snapshotProcessor(ctx, SnapshotReady, snapshotJSON)
   654  		})
   655  		if err != nil {
   656  			return err
   657  		}
   658  	}
   659  	return snapshotProcessor(ctx, SnapshotIncomplete, snapshotJSON)
   660  }
   661  
// k8sSource is the kates aka "real" version of our injected Kubernetes dependency. It wraps
// the kates client that its Watch method delegates to.
type k8sSource struct {
	// client is the kates client used to start watches.
	client *kates.Client
}
   666  
   667  func (k *k8sSource) Watch(ctx context.Context, queries ...kates.Query) (K8sWatcher, error) {
   668  	return k.client.Watch(ctx, queries...)
   669  }
   670  
   671  func newK8sSource(client *kates.Client) *k8sSource {
   672  	return &k8sSource{
   673  		client: client,
   674  	}
   675  }
   676  

View as plain text