...

Source file src/github.com/emissary-ingress/emissary/v3/cmd/entrypoint/consul.go

Documentation: github.com/emissary-ingress/emissary/v3/cmd/entrypoint

     1  package entrypoint
     2  
     3  import (
     4  	"context"
     5  	"reflect"
     6  	"sync"
     7  
     8  	consulapi "github.com/hashicorp/consul/api"
     9  
    10  	amb "github.com/emissary-ingress/emissary/v3/pkg/api/getambassador.io/v3alpha1"
    11  	"github.com/emissary-ingress/emissary/v3/pkg/consulwatch"
    12  	snapshotTypes "github.com/emissary-ingress/emissary/v3/pkg/snapshot/v1"
    13  )
    14  
    15  // consulMapping contains the necessary subset of Ambassador Mapping and TCPMapping
    16  // definitions needed for consul reconcilation and watching to happen.
    17  type consulMapping struct {
    18  	Service  string
    19  	Resolver string
    20  }
    21  
    22  func ReconcileConsul(ctx context.Context, consulWatcher *consulWatcher, s *snapshotTypes.KubernetesSnapshot) error {
    23  	envAmbID := GetAmbassadorID()
    24  
    25  	var mappings []consulMapping
    26  	for _, list := range s.Annotations {
    27  		for _, a := range list {
    28  			switch m := a.(type) {
    29  			case *amb.Mapping:
    30  				if m.Spec.AmbassadorID.Matches(envAmbID) {
    31  					mappings = append(mappings, consulMapping{Service: m.Spec.Service, Resolver: m.Spec.Resolver})
    32  				}
    33  			case *amb.TCPMapping:
    34  				if m.Spec.AmbassadorID.Matches(envAmbID) {
    35  					mappings = append(mappings, consulMapping{Service: m.Spec.Service, Resolver: m.Spec.Resolver})
    36  				}
    37  			}
    38  		}
    39  	}
    40  
    41  	var resolvers []*amb.ConsulResolver
    42  	for _, cr := range s.ConsulResolvers {
    43  		if cr.Spec.AmbassadorID.Matches(envAmbID) {
    44  			resolvers = append(resolvers, cr)
    45  		}
    46  	}
    47  
    48  	for _, m := range s.Mappings {
    49  		if m.Spec.AmbassadorID.Matches(envAmbID) {
    50  			mappings = append(mappings, consulMapping{Service: m.Spec.Service, Resolver: m.Spec.Resolver})
    51  		}
    52  	}
    53  
    54  	for _, tm := range s.TCPMappings {
    55  		if tm.Spec.AmbassadorID.Matches(envAmbID) {
    56  			mappings = append(mappings, consulMapping{Service: tm.Spec.Service, Resolver: tm.Spec.Resolver})
    57  		}
    58  	}
    59  
    60  	return consulWatcher.reconcile(ctx, s.ConsulResolvers, mappings)
    61  }
    62  
    63  type consulWatcher struct {
    64  	watchFunc                 watchConsulFunc
    65  	resolvers                 map[string]*resolver
    66  	firstReconcileHasHappened bool
    67  
    68  	// The changed method returns this channel. We write down this channel to signal that a new
    69  	// snapshot is available since the last time the update method was invoke.
    70  	coalescedDirty chan struct{}
    71  	// Individual watches write to this when new endpoint data is available. It is always being read
    72  	// by the implementation, so writing will never block.
    73  	endpointsCh chan consulwatch.Endpoints
    74  
    75  	// The mutex protects access to endpoints, keysForBootstrap, and bootstrapped.
    76  	mutex            sync.Mutex
    77  	endpoints        map[string]consulwatch.Endpoints
    78  	keysForBootstrap []string
    79  	bootstrapped     bool
    80  }
    81  
    82  func newConsulWatcher(watchFunc watchConsulFunc) *consulWatcher {
    83  	return &consulWatcher{
    84  		watchFunc:      watchFunc,
    85  		resolvers:      make(map[string]*resolver),
    86  		coalescedDirty: make(chan struct{}),
    87  		endpointsCh:    make(chan consulwatch.Endpoints),
    88  		endpoints:      make(map[string]consulwatch.Endpoints),
    89  	}
    90  }
    91  
    92  func (c *consulWatcher) run(ctx context.Context) error {
    93  	dirty := false
    94  	for {
    95  		if dirty {
    96  			select {
    97  			case c.coalescedDirty <- struct{}{}:
    98  				dirty = false
    99  			case ep := <-c.endpointsCh:
   100  				c.updateEndpoints(ep)
   101  				dirty = true
   102  			case <-ctx.Done():
   103  				return c.cleanup(ctx)
   104  			}
   105  		} else {
   106  			select {
   107  			case ep := <-c.endpointsCh:
   108  				c.updateEndpoints(ep)
   109  				dirty = true
   110  			case <-ctx.Done():
   111  				return c.cleanup(ctx)
   112  			}
   113  		}
   114  	}
   115  }
   116  
   117  func (c *consulWatcher) updateEndpoints(endpoints consulwatch.Endpoints) {
   118  	c.mutex.Lock()
   119  	defer c.mutex.Unlock()
   120  	c.endpoints[endpoints.Service] = endpoints
   121  }
   122  
   123  func (c *consulWatcher) changed() chan struct{} {
   124  	return c.coalescedDirty
   125  }
   126  
   127  func (c *consulWatcher) update(snap *snapshotTypes.ConsulSnapshot) {
   128  	c.mutex.Lock()
   129  	defer c.mutex.Unlock()
   130  	snap.Endpoints = make(map[string]consulwatch.Endpoints, len(c.endpoints))
   131  	for k, v := range c.endpoints {
   132  		snap.Endpoints[k] = v
   133  	}
   134  }
   135  
   136  func (c *consulWatcher) isBootstrapped() bool {
   137  	if !c.firstReconcileHasHappened {
   138  		return false
   139  	}
   140  	c.mutex.Lock()
   141  	defer c.mutex.Unlock()
   142  
   143  	// we want bootstrappedness to be idempotent
   144  	if c.bootstrapped {
   145  		return true
   146  	}
   147  
   148  	for _, key := range c.keysForBootstrap {
   149  		if _, ok := c.endpoints[key]; !ok {
   150  			return false
   151  		}
   152  	}
   153  
   154  	c.bootstrapped = true
   155  
   156  	return true
   157  }
   158  
   159  // Stop all service watches.
   160  func (c *consulWatcher) cleanup(ctx context.Context) error {
   161  	// XXX: do we care about a clean shutdown
   162  	/*go func() {
   163  		<-ctx.Done()
   164  		w.Stop()
   165  	}()*/
   166  
   167  	return c.reconcile(ctx, nil, nil)
   168  }
   169  
   170  // Start and stop consul service watches as needed in order to match the supplied set of resolvers
   171  // and mappings.
   172  func (c *consulWatcher) reconcile(ctx context.Context, resolvers []*amb.ConsulResolver, mappings []consulMapping) error {
   173  	// ==First we compute resolvers and their related mappings without actualy changing anything.==
   174  	resolversByName := make(map[string]*amb.ConsulResolver)
   175  	for _, cr := range resolvers {
   176  		// Ambassador can find resolvers in any namespace, but they're not partitioned
   177  		// by namespace once located, so just save using the name.
   178  		resolversByName[cr.GetName()] = cr
   179  	}
   180  
   181  	mappingsByResolver := make(map[string][]consulMapping)
   182  	for _, m := range mappings {
   183  		// Everything here is keyed off m.Spec.Resolver -- again, it's fine to use a resolver
   184  		// from any namespace, as long as it was loaded.
   185  		//
   186  		// (This implies that if you typo a resolver name, things won't work.)
   187  
   188  		rname := m.Resolver
   189  
   190  		if rname == "" {
   191  			continue
   192  		}
   193  
   194  		_, ok := resolversByName[rname]
   195  		if !ok {
   196  			continue
   197  		}
   198  		mappingsByResolver[rname] = append(mappingsByResolver[rname], m)
   199  	}
   200  
   201  	// Prune any resolvers that don't actually have mappings
   202  	for name := range resolversByName {
   203  		_, ok := mappingsByResolver[name]
   204  		if !ok {
   205  			delete(resolversByName, name)
   206  		}
   207  	}
   208  
   209  	// ==Now we implement the changes implied by resolversByName and mappingsByResolver.==
   210  
   211  	// First we (re)create any new or modified resolvers.
   212  	for name, cr := range resolversByName {
   213  		oldr, ok := c.resolvers[name]
   214  		// The resolver hasn't change so continue. Make sure we only compare the spec, since we
   215  		// don't want to delete/recreate resolvers on things like label changes.
   216  		if ok && reflect.DeepEqual(oldr.resolver.Spec, cr.Spec) {
   217  			continue
   218  		}
   219  		// It exists, but is different, so we delete/recreate i.
   220  		if ok {
   221  			oldr.deleted()
   222  		}
   223  		c.resolvers[name] = newResolver(cr)
   224  	}
   225  
   226  	// Now we delete unneeded resolvers.
   227  	for name, resolver := range c.resolvers {
   228  		_, ok := resolversByName[name]
   229  		if !ok {
   230  			resolver.deleted()
   231  			delete(c.resolvers, name)
   232  		}
   233  	}
   234  
   235  	// Finally we reconcile each mapping.
   236  	for rname, mappings := range mappingsByResolver {
   237  		res := c.resolvers[rname]
   238  		if err := res.reconcile(ctx, c.watchFunc, mappings, c.endpointsCh); err != nil {
   239  			return err
   240  		}
   241  	}
   242  
   243  	// If this is the first time we are reconciling, we need to compute conditions for being
   244  	// bootstrapped.
   245  	if !c.firstReconcileHasHappened {
   246  		c.firstReconcileHasHappened = true
   247  		var keysForBootstrap []string
   248  		for _, mappings := range mappingsByResolver {
   249  			for _, m := range mappings {
   250  				keysForBootstrap = append(keysForBootstrap, m.Service)
   251  			}
   252  		}
   253  		c.mutex.Lock()
   254  		defer c.mutex.Unlock()
   255  		c.keysForBootstrap = keysForBootstrap
   256  	}
   257  	return nil
   258  }
   259  
   260  type resolver struct {
   261  	resolver *amb.ConsulResolver
   262  	watches  map[string]Stopper
   263  }
   264  
   265  func newResolver(spec *amb.ConsulResolver) *resolver {
   266  	return &resolver{resolver: spec, watches: make(map[string]Stopper)}
   267  }
   268  
   269  func (r *resolver) deleted() {
   270  	for _, w := range r.watches {
   271  		w.Stop()
   272  	}
   273  }
   274  
   275  func (r *resolver) reconcile(ctx context.Context, watchFunc watchConsulFunc, mappings []consulMapping, endpoints chan consulwatch.Endpoints) error {
   276  	servicesByName := make(map[string]bool)
   277  	for _, m := range mappings {
   278  		// XXX: how to parse this?
   279  		svc := m.Service
   280  		servicesByName[svc] = true
   281  		w, ok := r.watches[svc]
   282  		if !ok {
   283  			var err error
   284  			w, err = watchFunc(ctx, r.resolver, svc, endpoints)
   285  			if err != nil {
   286  				return err
   287  			}
   288  			r.watches[svc] = w
   289  		}
   290  	}
   291  
   292  	for name, w := range r.watches {
   293  		_, ok := servicesByName[name]
   294  		if !ok {
   295  			w.Stop()
   296  			delete(r.watches, name)
   297  		}
   298  	}
   299  	return nil
   300  }
   301  
   302  type watchConsulFunc func(ctx context.Context, resolver *amb.ConsulResolver, svc string, endpoints chan consulwatch.Endpoints) (Stopper, error)
   303  
   304  type Stopper interface {
   305  	Stop()
   306  }
   307  
   308  func watchConsul(
   309  	ctx context.Context,
   310  	resolver *amb.ConsulResolver,
   311  	svc string,
   312  	endpointsCh chan consulwatch.Endpoints,
   313  ) (Stopper, error) {
   314  	// XXX: should this part be shared?
   315  	consulConfig := consulapi.DefaultConfig()
   316  	consulConfig.Address = resolver.Spec.Address
   317  	consul, err := consulapi.NewClient(consulConfig)
   318  	if err != nil {
   319  		return nil, err
   320  	}
   321  
   322  	// this part is per service
   323  	w, err := consulwatch.New(consul, resolver.Spec.Datacenter, svc, true)
   324  	if err != nil {
   325  		return nil, err
   326  	}
   327  
   328  	w.Watch(func(endpoints consulwatch.Endpoints, e error) {
   329  		if endpoints.Id == "" {
   330  			// For Ambassador, overwrite the ID with the resolver's datacenter -- the
   331  			// Consul watcher doesn't actually hand back the DC, and we need it.
   332  			endpoints.Id = resolver.Spec.Datacenter
   333  		}
   334  
   335  		endpointsCh <- endpoints
   336  	})
   337  
   338  	go func() {
   339  		if err := w.Start(ctx); err != nil {
   340  			panic(err) // TODO: Find a better way of reporting errors from goroutines.
   341  		}
   342  	}()
   343  
   344  	return w, nil
   345  }
   346  

View as plain text