
Source file src/github.com/datawire/ambassador/v2/cmd/entrypoint/consul.go

Documentation: github.com/datawire/ambassador/v2/cmd/entrypoint

     1  package entrypoint
     2  
     3  import (
     4  	"context"
     5  	"reflect"
     6  	"sync"
     7  
     8  	consulapi "github.com/hashicorp/consul/api"
     9  
    10  	amb "github.com/datawire/ambassador/v2/pkg/api/getambassador.io/v3alpha1"
    11  	"github.com/datawire/ambassador/v2/pkg/consulwatch"
    12  	snapshotTypes "github.com/datawire/ambassador/v2/pkg/snapshot/v1"
    13  )
    14  
    15  // consulMapping contains the subset of the Ambassador Mapping and TCPMapping
    16  // definitions needed for Consul reconciliation and watching.
    17  type consulMapping struct {
    18  	Service  string
    19  	Resolver string
    20  }
    21  
    22  func ReconcileConsul(ctx context.Context, consulWatcher *consulWatcher, s *snapshotTypes.KubernetesSnapshot) error {
    23  	var mappings []consulMapping
    24  	for _, list := range s.Annotations {
    25  		for _, a := range list {
    26  			switch m := a.(type) {
    27  			case *amb.Mapping:
    28  				if include(m.Spec.AmbassadorID) {
    29  					mappings = append(mappings, consulMapping{Service: m.Spec.Service, Resolver: m.Spec.Resolver})
    30  				}
    31  			case *amb.TCPMapping:
    32  				if include(m.Spec.AmbassadorID) {
    33  					mappings = append(mappings, consulMapping{Service: m.Spec.Service, Resolver: m.Spec.Resolver})
    34  				}
    35  			}
    36  		}
    37  	}
    38  
    39  	var resolvers []*amb.ConsulResolver
    40  	for _, cr := range s.ConsulResolvers {
    41  		if include(cr.Spec.AmbassadorID) {
    42  			resolvers = append(resolvers, cr)
    43  		}
    44  	}
    45  
    46  	for _, m := range s.Mappings {
    47  		if include(m.Spec.AmbassadorID) {
    48  			mappings = append(mappings, consulMapping{Service: m.Spec.Service, Resolver: m.Spec.Resolver})
    49  		}
    50  	}
    51  
    52  	for _, tm := range s.TCPMappings {
    53  		if include(tm.Spec.AmbassadorID) {
    54  			mappings = append(mappings, consulMapping{Service: tm.Spec.Service, Resolver: tm.Spec.Resolver})
    55  		}
    56  	}
    57  
    58  	return consulWatcher.reconcile(ctx, resolvers, mappings)
    59  }
    60  
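ReconcileConsul gathers the Consul-resolved Mappings and TCPMappings from both annotations and CRDs, filters them by AmbassadorID, and hands them to the watcher. A minimal sketch of driving it by hand follows, written as if it lived in this package; it assumes KubernetesSnapshot.Mappings holds *amb.Mapping values (as the range above suggests) and that an empty AmbassadorID passes include().

	// Hedged sketch: exercise ReconcileConsul with a hand-built snapshot.
	// Only the fields the code above actually reads are set.
	func reconcileOnce(ctx context.Context, w *consulWatcher) error {
		cr := &amb.ConsulResolver{}
		cr.SetName("consul-dc1") // reconcile keys resolvers by name only
		cr.Spec.Address = "consul-server.consul:8500"
		cr.Spec.Datacenter = "dc1"

		m := &amb.Mapping{}
		m.Spec.Service = "payments"
		m.Spec.Resolver = "consul-dc1"

		snap := &snapshotTypes.KubernetesSnapshot{
			ConsulResolvers: []*amb.ConsulResolver{cr},
			Mappings:        []*amb.Mapping{m},
		}
		// Starts (or keeps) a Consul watch on "payments" through "consul-dc1".
		return ReconcileConsul(ctx, w, snap)
	}
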
    61  type consulWatcher struct {
    62  	watchFunc                 watchConsulFunc
    63  	resolvers                 map[string]*resolver
    64  	firstReconcileHasHappened bool
    65  
    66  	// The changed method returns this channel. We write to this channel to signal that a new
    67  	// snapshot has become available since the last time the update method was invoked.
    68  	coalescedDirty chan struct{}
    69  	// Individual watches write to this when new endpoint data is available. It is always being read
    70  	// by the implementation, so writing will never block.
    71  	endpointsCh chan consulwatch.Endpoints
    72  
    73  	// The mutex protects access to endpoints, keysForBootstrap, and bootstrapped.
    74  	mutex            sync.Mutex
    75  	endpoints        map[string]consulwatch.Endpoints
    76  	keysForBootstrap []string
    77  	bootstrapped     bool
    78  }
    79  
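The two channels above give the watcher its coalescing behaviour: endpoint updates are folded into the endpoints map as they arrive, and coalescedDirty fires at most once per batch, no matter how many updates were folded in. A short sketch of the consuming side, using only the methods defined below (consumeOnce is a hypothetical name):

	// Hedged sketch of a consumer: wait for a change signal, then pull the
	// coalesced endpoint data into a snapshot.
	func consumeOnce(ctx context.Context, c *consulWatcher, snap *snapshotTypes.ConsulSnapshot) bool {
		select {
		case <-c.changed():
			// Copy whatever endpoint data has accumulated since the last call.
			c.update(snap)
			return c.isBootstrapped()
		case <-ctx.Done():
			return false
		}
	}
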
    80  func newConsulWatcher(watchFunc watchConsulFunc) *consulWatcher {
    81  	return &consulWatcher{
    82  		watchFunc:      watchFunc,
    83  		resolvers:      make(map[string]*resolver),
    84  		coalescedDirty: make(chan struct{}),
    85  		endpointsCh:    make(chan consulwatch.Endpoints),
    86  		endpoints:      make(map[string]consulwatch.Endpoints),
    87  	}
    88  }
    89  
    90  func (c *consulWatcher) run(ctx context.Context) error {
    91  	dirty := false
    92  	for {
    93  		if dirty {
    94  			select {
    95  			case c.coalescedDirty <- struct{}{}:
    96  				dirty = false
    97  			case ep := <-c.endpointsCh:
    98  				c.updateEndpoints(ep)
    99  				dirty = true
   100  			case <-ctx.Done():
   101  				return c.cleanup(ctx)
   102  			}
   103  		} else {
   104  			select {
   105  			case ep := <-c.endpointsCh:
   106  				c.updateEndpoints(ep)
   107  				dirty = true
   108  			case <-ctx.Done():
   109  				return c.cleanup(ctx)
   110  			}
   111  		}
   112  	}
   113  }
   114  
   115  func (c *consulWatcher) updateEndpoints(endpoints consulwatch.Endpoints) {
   116  	c.mutex.Lock()
   117  	defer c.mutex.Unlock()
   118  	c.endpoints[endpoints.Service] = endpoints
   119  }
   120  
   121  func (c *consulWatcher) changed() chan struct{} {
   122  	return c.coalescedDirty
   123  }
   124  
   125  func (c *consulWatcher) update(snap *snapshotTypes.ConsulSnapshot) {
   126  	c.mutex.Lock()
   127  	defer c.mutex.Unlock()
   128  	snap.Endpoints = make(map[string]consulwatch.Endpoints, len(c.endpoints))
   129  	for k, v := range c.endpoints {
   130  		snap.Endpoints[k] = v
   131  	}
   132  }
   133  
   134  func (c *consulWatcher) isBootstrapped() bool {
   135  	if !c.firstReconcileHasHappened {
   136  		return false
   137  	}
   138  	c.mutex.Lock()
   139  	defer c.mutex.Unlock()
   140  
   141  	// Bootstrapping is sticky: once we have reported bootstrapped, stay bootstrapped.
   142  	if c.bootstrapped {
   143  		return true
   144  	}
   145  
   146  	for _, key := range c.keysForBootstrap {
   147  		if _, ok := c.endpoints[key]; !ok {
   148  			return false
   149  		}
   150  	}
   151  
   152  	c.bootstrapped = true
   153  
   154  	return true
   155  }
   156  
   157  // Stop all service watches.
   158  func (c *consulWatcher) cleanup(ctx context.Context) error {
   159  	// XXX: do we care about a clean shutdown?
   160  	/*go func() {
   161  		<-ctx.Done()
   162  		w.Stop()
   163  	}()*/
   164  
   165  	return c.reconcile(ctx, nil, nil)
   166  }
   167  
   168  // Start and stop consul service watches as needed in order to match the supplied set of resolvers
   169  // and mappings.
   170  func (c *consulWatcher) reconcile(ctx context.Context, resolvers []*amb.ConsulResolver, mappings []consulMapping) error {
   171  	// ==First we compute resolvers and their related mappings without actually changing anything.==
   172  	resolversByName := make(map[string]*amb.ConsulResolver)
   173  	for _, cr := range resolvers {
   174  		// Ambassador can find resolvers in any namespace, but they're not partitioned
   175  		// by namespace once located, so just save using the name.
   176  		resolversByName[cr.GetName()] = cr
   177  	}
   178  
   179  	mappingsByResolver := make(map[string][]consulMapping)
   180  	for _, m := range mappings {
   181  		// Everything here is keyed off m.Spec.Resolver -- again, it's fine to use a resolver
   182  		// from any namespace, as long as it was loaded.
   183  		//
   184  		// (This implies that if you typo a resolver name, things won't work.)
   185  
   186  		rname := m.Resolver
   187  
   188  		if rname == "" {
   189  			continue
   190  		}
   191  
   192  		_, ok := resolversByName[rname]
   193  		if !ok {
   194  			continue
   195  		}
   196  		mappingsByResolver[rname] = append(mappingsByResolver[rname], m)
   197  	}
   198  
   199  	// Prune any resolvers that don't actually have mappings
   200  	for name := range resolversByName {
   201  		_, ok := mappingsByResolver[name]
   202  		if !ok {
   203  			delete(resolversByName, name)
   204  		}
   205  	}
   206  
   207  	// ==Now we implement the changes implied by resolversByName and mappingsByResolver.==
   208  
   209  	// First we (re)create any new or modified resolvers.
   210  	for name, cr := range resolversByName {
   211  		oldr, ok := c.resolvers[name]
   212  		// The resolver hasn't changed, so continue. Make sure we only compare the spec, since we
   213  		// don't want to delete/recreate resolvers on things like label changes.
   214  		if ok && reflect.DeepEqual(oldr.resolver.Spec, cr.Spec) {
   215  			continue
   216  		}
   217  		// It exists but is different, so we delete and recreate it.
   218  		if ok {
   219  			oldr.deleted()
   220  		}
   221  		c.resolvers[name] = newResolver(cr)
   222  	}
   223  
   224  	// Now we delete unneeded resolvers.
   225  	for name, resolver := range c.resolvers {
   226  		_, ok := resolversByName[name]
   227  		if !ok {
   228  			resolver.deleted()
   229  			delete(c.resolvers, name)
   230  		}
   231  	}
   232  
   233  	// Finally we reconcile each mapping.
   234  	for rname, mappings := range mappingsByResolver {
   235  		res := c.resolvers[rname]
   236  		if err := res.reconcile(ctx, c.watchFunc, mappings, c.endpointsCh); err != nil {
   237  			return err
   238  		}
   239  	}
   240  
   241  	// If this is the first time we are reconciling, we need to compute conditions for being
   242  	// bootstrapped.
   243  	if !c.firstReconcileHasHappened {
   244  		c.firstReconcileHasHappened = true
   245  		var keysForBootstrap []string
   246  		for _, mappings := range mappingsByResolver {
   247  			for _, m := range mappings {
   248  				keysForBootstrap = append(keysForBootstrap, m.Service)
   249  			}
   250  		}
   251  		c.mutex.Lock()
   252  		defer c.mutex.Unlock()
   253  		c.keysForBootstrap = keysForBootstrap
   254  	}
   255  	return nil
   256  }
   257  
   258  type resolver struct {
   259  	resolver *amb.ConsulResolver
   260  	watches  map[string]Stopper
   261  }
   262  
   263  func newResolver(spec *amb.ConsulResolver) *resolver {
   264  	return &resolver{resolver: spec, watches: make(map[string]Stopper)}
   265  }
   266  
   267  func (r *resolver) deleted() {
   268  	for _, w := range r.watches {
   269  		w.Stop()
   270  	}
   271  }
   272  
   273  func (r *resolver) reconcile(ctx context.Context, watchFunc watchConsulFunc, mappings []consulMapping, endpoints chan consulwatch.Endpoints) error {
   274  	servicesByName := make(map[string]bool)
   275  	for _, m := range mappings {
   276  		// XXX: how to parse this?
   277  		svc := m.Service
   278  		servicesByName[svc] = true
   279  		w, ok := r.watches[svc]
   280  		if !ok {
   281  			var err error
   282  			w, err = watchFunc(ctx, r.resolver, svc, endpoints)
   283  			if err != nil {
   284  				return err
   285  			}
   286  			r.watches[svc] = w
   287  		}
   288  	}
   289  
   290  	for name, w := range r.watches {
   291  		_, ok := servicesByName[name]
   292  		if !ok {
   293  			w.Stop()
   294  			delete(r.watches, name)
   295  		}
   296  	}
   297  	return nil
   298  }
   299  
   300  type watchConsulFunc func(ctx context.Context, resolver *amb.ConsulResolver, svc string, endpoints chan consulwatch.Endpoints) (Stopper, error)
   301  
   302  type Stopper interface {
   303  	Stop()
   304  }
   305  
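watchConsulFunc and Stopper are the seams that keep consulWatcher testable: production code plugs in watchConsul below, while a test can plug in a stub that never dials Consul. A minimal sketch of such a stub (fakeWatch and fakeWatchConsul are hypothetical names, not part of this package):

	// fakeWatch is a hypothetical stand-in for a real Consul watch; Stop only
	// records that the watch was torn down.
	type fakeWatch struct{ stopped bool }

	func (f *fakeWatch) Stop() { f.stopped = true }

	// fakeWatchConsul satisfies watchConsulFunc without dialing Consul: it
	// reports one empty Endpoints record for the service and then goes quiet.
	func fakeWatchConsul(ctx context.Context, resolver *amb.ConsulResolver, svc string, endpointsCh chan consulwatch.Endpoints) (Stopper, error) {
		go func() {
			select {
			case endpointsCh <- consulwatch.Endpoints{Service: svc, Id: resolver.Spec.Datacenter}:
			case <-ctx.Done():
			}
		}()
		return &fakeWatch{}, nil
	}

A watcher built as newConsulWatcher(fakeWatchConsul) then sees every watched service report immediately, which is enough to drive isBootstrapped to true in a test.
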
   306  func watchConsul(
   307  	ctx context.Context,
   308  	resolver *amb.ConsulResolver,
   309  	svc string,
   310  	endpointsCh chan consulwatch.Endpoints,
   311  ) (Stopper, error) {
   312  	// XXX: should this part be shared?
   313  	consulConfig := consulapi.DefaultConfig()
   314  	consulConfig.Address = resolver.Spec.Address
   315  	consul, err := consulapi.NewClient(consulConfig)
   316  	if err != nil {
   317  		return nil, err
   318  	}
   319  
   320  	// this part is per service
   321  	w, err := consulwatch.New(consul, resolver.Spec.Datacenter, svc, true)
   322  	if err != nil {
   323  		return nil, err
   324  	}
   325  
   326  	w.Watch(func(endpoints consulwatch.Endpoints, e error) {
   327  		if endpoints.Id == "" {
   328  			// For Ambassador, overwrite the ID with the resolver's datacenter -- the
   329  			// Consul watcher doesn't actually hand back the DC, and we need it.
   330  			endpoints.Id = resolver.Spec.Datacenter
   331  		}
   332  
   333  		endpointsCh <- endpoints
   334  	})
   335  
   336  	go func() {
   337  		if err := w.Start(ctx); err != nil {
   338  			panic(err) // TODO: Find a better way of reporting errors from goroutines.
   339  		}
   340  	}()
   341  
   342  	return w, nil
   343  }
   344  
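Taken together, the intended lifecycle is: build the watcher around watchConsul, run its loop on a goroutine, reconcile whenever the Kubernetes side of the snapshot changes, and pull endpoint data into a ConsulSnapshot whenever changed() fires. A rough sketch of that wiring, with the real snapshot plumbing elided (runConsulWatches is a hypothetical name):

	// Hedged sketch of the overall lifecycle; the loop below is illustrative only.
	func runConsulWatches(ctx context.Context, k8sSnapshots <-chan *snapshotTypes.KubernetesSnapshot) error {
		w := newConsulWatcher(watchConsul)
		go func() {
			// run exits (after stopping all watches) once ctx is cancelled.
			_ = w.run(ctx)
		}()

		consul := &snapshotTypes.ConsulSnapshot{}
		for {
			select {
			case s := <-k8sSnapshots:
				// Start/stop watches so they match the Mappings and ConsulResolvers in s.
				if err := ReconcileConsul(ctx, w, s); err != nil {
					return err
				}
			case <-w.changed():
				// Fold the coalesced endpoint data into the Consul side of the snapshot.
				w.update(consul)
				_ = w.isBootstrapped() // true once every watched service has reported in
			case <-ctx.Done():
				return nil
			}
		}
	}
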
