1 package entrypoint
2
3 import (
4 "context"
5 "reflect"
6 "sync"
7
8 consulapi "github.com/hashicorp/consul/api"
9
10 amb "github.com/emissary-ingress/emissary/v3/pkg/api/getambassador.io/v3alpha1"
11 "github.com/emissary-ingress/emissary/v3/pkg/consulwatch"
12 snapshotTypes "github.com/emissary-ingress/emissary/v3/pkg/snapshot/v1"
13 )
14
15
16
// consulMapping is the subset of a Mapping or TCPMapping that the Consul
// watcher works with: the service being routed to and the name of the
// resolver used to locate it.
type consulMapping struct {
	// Service is copied from the mapping's Spec.Service; it is the service
	// name handed to the watch function.
	Service string
	// Resolver is copied from the mapping's Spec.Resolver. reconcile ignores
	// mappings whose Resolver is empty or does not name a known ConsulResolver.
	Resolver string
}
21
22 func ReconcileConsul(ctx context.Context, consulWatcher *consulWatcher, s *snapshotTypes.KubernetesSnapshot) error {
23 envAmbID := GetAmbassadorID()
24
25 var mappings []consulMapping
26 for _, list := range s.Annotations {
27 for _, a := range list {
28 switch m := a.(type) {
29 case *amb.Mapping:
30 if m.Spec.AmbassadorID.Matches(envAmbID) {
31 mappings = append(mappings, consulMapping{Service: m.Spec.Service, Resolver: m.Spec.Resolver})
32 }
33 case *amb.TCPMapping:
34 if m.Spec.AmbassadorID.Matches(envAmbID) {
35 mappings = append(mappings, consulMapping{Service: m.Spec.Service, Resolver: m.Spec.Resolver})
36 }
37 }
38 }
39 }
40
41 var resolvers []*amb.ConsulResolver
42 for _, cr := range s.ConsulResolvers {
43 if cr.Spec.AmbassadorID.Matches(envAmbID) {
44 resolvers = append(resolvers, cr)
45 }
46 }
47
48 for _, m := range s.Mappings {
49 if m.Spec.AmbassadorID.Matches(envAmbID) {
50 mappings = append(mappings, consulMapping{Service: m.Spec.Service, Resolver: m.Spec.Resolver})
51 }
52 }
53
54 for _, tm := range s.TCPMappings {
55 if tm.Spec.AmbassadorID.Matches(envAmbID) {
56 mappings = append(mappings, consulMapping{Service: tm.Spec.Service, Resolver: tm.Spec.Resolver})
57 }
58 }
59
60 return consulWatcher.reconcile(ctx, s.ConsulResolvers, mappings)
61 }
62
// consulWatcher tracks the active Consul resolvers and their watches, and
// keeps the most recent endpoints reported for each watched service.
type consulWatcher struct {
	// watchFunc starts a watch for one service; reconcile passes it on to
	// each resolver.
	watchFunc watchConsulFunc
	// resolvers holds the active resolvers keyed by ConsulResolver name.
	resolvers map[string]*resolver
	// firstReconcileHasHappened is set by the first reconcile call that
	// finishes reconciling every resolver without error; isBootstrapped
	// reports false until then.
	firstReconcileHasHappened bool

	// coalescedDirty is the channel run sends on once one or more endpoint
	// updates have been received since the previous send. It is exposed to
	// consumers through changed.
	coalescedDirty chan struct{}

	// endpointsCh carries endpoint updates from the watches to run.
	endpointsCh chan consulwatch.Endpoints

	// mutex guards endpoints, keysForBootstrap and bootstrapped.
	mutex sync.Mutex
	// endpoints is the latest update per service, keyed by Endpoints.Service.
	endpoints map[string]consulwatch.Endpoints
	// keysForBootstrap lists the services captured by the first reconcile;
	// isBootstrapped requires an endpoints entry for each of them.
	keysForBootstrap []string
	// bootstrapped latches to true the first time isBootstrapped succeeds.
	bootstrapped bool
}
81
82 func newConsulWatcher(watchFunc watchConsulFunc) *consulWatcher {
83 return &consulWatcher{
84 watchFunc: watchFunc,
85 resolvers: make(map[string]*resolver),
86 coalescedDirty: make(chan struct{}),
87 endpointsCh: make(chan consulwatch.Endpoints),
88 endpoints: make(map[string]consulwatch.Endpoints),
89 }
90 }
91
92 func (c *consulWatcher) run(ctx context.Context) error {
93 dirty := false
94 for {
95 if dirty {
96 select {
97 case c.coalescedDirty <- struct{}{}:
98 dirty = false
99 case ep := <-c.endpointsCh:
100 c.updateEndpoints(ep)
101 dirty = true
102 case <-ctx.Done():
103 return c.cleanup(ctx)
104 }
105 } else {
106 select {
107 case ep := <-c.endpointsCh:
108 c.updateEndpoints(ep)
109 dirty = true
110 case <-ctx.Done():
111 return c.cleanup(ctx)
112 }
113 }
114 }
115 }
116
117 func (c *consulWatcher) updateEndpoints(endpoints consulwatch.Endpoints) {
118 c.mutex.Lock()
119 defer c.mutex.Unlock()
120 c.endpoints[endpoints.Service] = endpoints
121 }
122
123 func (c *consulWatcher) changed() chan struct{} {
124 return c.coalescedDirty
125 }
126
127 func (c *consulWatcher) update(snap *snapshotTypes.ConsulSnapshot) {
128 c.mutex.Lock()
129 defer c.mutex.Unlock()
130 snap.Endpoints = make(map[string]consulwatch.Endpoints, len(c.endpoints))
131 for k, v := range c.endpoints {
132 snap.Endpoints[k] = v
133 }
134 }
135
136 func (c *consulWatcher) isBootstrapped() bool {
137 if !c.firstReconcileHasHappened {
138 return false
139 }
140 c.mutex.Lock()
141 defer c.mutex.Unlock()
142
143
144 if c.bootstrapped {
145 return true
146 }
147
148 for _, key := range c.keysForBootstrap {
149 if _, ok := c.endpoints[key]; !ok {
150 return false
151 }
152 }
153
154 c.bootstrapped = true
155
156 return true
157 }
158
159
160 func (c *consulWatcher) cleanup(ctx context.Context) error {
161
162
166
167 return c.reconcile(ctx, nil, nil)
168 }
169
170
171
172 func (c *consulWatcher) reconcile(ctx context.Context, resolvers []*amb.ConsulResolver, mappings []consulMapping) error {
173
174 resolversByName := make(map[string]*amb.ConsulResolver)
175 for _, cr := range resolvers {
176
177
178 resolversByName[cr.GetName()] = cr
179 }
180
181 mappingsByResolver := make(map[string][]consulMapping)
182 for _, m := range mappings {
183
184
185
186
187
188 rname := m.Resolver
189
190 if rname == "" {
191 continue
192 }
193
194 _, ok := resolversByName[rname]
195 if !ok {
196 continue
197 }
198 mappingsByResolver[rname] = append(mappingsByResolver[rname], m)
199 }
200
201
202 for name := range resolversByName {
203 _, ok := mappingsByResolver[name]
204 if !ok {
205 delete(resolversByName, name)
206 }
207 }
208
209
210
211
212 for name, cr := range resolversByName {
213 oldr, ok := c.resolvers[name]
214
215
216 if ok && reflect.DeepEqual(oldr.resolver.Spec, cr.Spec) {
217 continue
218 }
219
220 if ok {
221 oldr.deleted()
222 }
223 c.resolvers[name] = newResolver(cr)
224 }
225
226
227 for name, resolver := range c.resolvers {
228 _, ok := resolversByName[name]
229 if !ok {
230 resolver.deleted()
231 delete(c.resolvers, name)
232 }
233 }
234
235
236 for rname, mappings := range mappingsByResolver {
237 res := c.resolvers[rname]
238 if err := res.reconcile(ctx, c.watchFunc, mappings, c.endpointsCh); err != nil {
239 return err
240 }
241 }
242
243
244
245 if !c.firstReconcileHasHappened {
246 c.firstReconcileHasHappened = true
247 var keysForBootstrap []string
248 for _, mappings := range mappingsByResolver {
249 for _, m := range mappings {
250 keysForBootstrap = append(keysForBootstrap, m.Service)
251 }
252 }
253 c.mutex.Lock()
254 defer c.mutex.Unlock()
255 c.keysForBootstrap = keysForBootstrap
256 }
257 return nil
258 }
259
// resolver pairs a ConsulResolver with the watches that were started on its
// behalf.
type resolver struct {
	// resolver is the ConsulResolver resource this entry was created from.
	resolver *amb.ConsulResolver
	// watches holds the running watches keyed by service name.
	watches map[string]Stopper
}
264
265 func newResolver(spec *amb.ConsulResolver) *resolver {
266 return &resolver{resolver: spec, watches: make(map[string]Stopper)}
267 }
268
269 func (r *resolver) deleted() {
270 for _, w := range r.watches {
271 w.Stop()
272 }
273 }
274
275 func (r *resolver) reconcile(ctx context.Context, watchFunc watchConsulFunc, mappings []consulMapping, endpoints chan consulwatch.Endpoints) error {
276 servicesByName := make(map[string]bool)
277 for _, m := range mappings {
278
279 svc := m.Service
280 servicesByName[svc] = true
281 w, ok := r.watches[svc]
282 if !ok {
283 var err error
284 w, err = watchFunc(ctx, r.resolver, svc, endpoints)
285 if err != nil {
286 return err
287 }
288 r.watches[svc] = w
289 }
290 }
291
292 for name, w := range r.watches {
293 _, ok := servicesByName[name]
294 if !ok {
295 w.Stop()
296 delete(r.watches, name)
297 }
298 }
299 return nil
300 }
301
// watchConsulFunc starts a watch on service svc using resolver and returns a
// Stopper for it. The endpoints channel is handed to the watch so it can
// deliver updates on it, as watchConsul does. Keeping this a function type
// lets a consulWatcher be built with any implementation.
type watchConsulFunc func(ctx context.Context, resolver *amb.ConsulResolver, svc string, endpoints chan consulwatch.Endpoints) (Stopper, error)
303
// Stopper is anything that can be told to stop; it is what a watchConsulFunc
// returns for a running watch.
type Stopper interface {
	// Stop ends the watch.
	Stop()
}
307
308 func watchConsul(
309 ctx context.Context,
310 resolver *amb.ConsulResolver,
311 svc string,
312 endpointsCh chan consulwatch.Endpoints,
313 ) (Stopper, error) {
314
315 consulConfig := consulapi.DefaultConfig()
316 consulConfig.Address = resolver.Spec.Address
317 consul, err := consulapi.NewClient(consulConfig)
318 if err != nil {
319 return nil, err
320 }
321
322
323 w, err := consulwatch.New(consul, resolver.Spec.Datacenter, svc, true)
324 if err != nil {
325 return nil, err
326 }
327
328 w.Watch(func(endpoints consulwatch.Endpoints, e error) {
329 if endpoints.Id == "" {
330
331
332 endpoints.Id = resolver.Spec.Datacenter
333 }
334
335 endpointsCh <- endpoints
336 })
337
338 go func() {
339 if err := w.Start(ctx); err != nil {
340 panic(err)
341 }
342 }()
343
344 return w, nil
345 }
346
View as plain text