1 package entrypoint
2
3 import (
4 "context"
5 "reflect"
6 "sync"
7
8 consulapi "github.com/hashicorp/consul/api"
9
10 amb "github.com/datawire/ambassador/v2/pkg/api/getambassador.io/v3alpha1"
11 "github.com/datawire/ambassador/v2/pkg/consulwatch"
12 snapshotTypes "github.com/datawire/ambassador/v2/pkg/snapshot/v1"
13 )
14
15
16
// consulMapping is the slice of a Mapping or TCPMapping that the Consul
// reconciliation logic cares about.
type consulMapping struct {
	// Service is the mapping's Spec.Service and Resolver is its Spec.Resolver
	// (the name of the resolver the mapping asks for; may be empty).
	Service, Resolver string
}
21
22 func ReconcileConsul(ctx context.Context, consulWatcher *consulWatcher, s *snapshotTypes.KubernetesSnapshot) error {
23 var mappings []consulMapping
24 for _, list := range s.Annotations {
25 for _, a := range list {
26 switch m := a.(type) {
27 case *amb.Mapping:
28 if include(m.Spec.AmbassadorID) {
29 mappings = append(mappings, consulMapping{Service: m.Spec.Service, Resolver: m.Spec.Resolver})
30 }
31 case *amb.TCPMapping:
32 if include(m.Spec.AmbassadorID) {
33 mappings = append(mappings, consulMapping{Service: m.Spec.Service, Resolver: m.Spec.Resolver})
34 }
35 }
36 }
37 }
38
39 var resolvers []*amb.ConsulResolver
40 for _, cr := range s.ConsulResolvers {
41 if include(cr.Spec.AmbassadorID) {
42 resolvers = append(resolvers, cr)
43 }
44 }
45
46 for _, m := range s.Mappings {
47 if include(m.Spec.AmbassadorID) {
48 mappings = append(mappings, consulMapping{Service: m.Spec.Service, Resolver: m.Spec.Resolver})
49 }
50 }
51
52 for _, tm := range s.TCPMappings {
53 if include(tm.Spec.AmbassadorID) {
54 mappings = append(mappings, consulMapping{Service: tm.Spec.Service, Resolver: tm.Spec.Resolver})
55 }
56 }
57
58 return consulWatcher.reconcile(ctx, s.ConsulResolvers, mappings)
59 }
60
61 type consulWatcher struct {
62 watchFunc watchConsulFunc
63 resolvers map[string]*resolver
64 firstReconcileHasHappened bool
65
66
67
68 coalescedDirty chan struct{}
69
70
71 endpointsCh chan consulwatch.Endpoints
72
73
74 mutex sync.Mutex
75 endpoints map[string]consulwatch.Endpoints
76 keysForBootstrap []string
77 bootstrapped bool
78 }
79
80 func newConsulWatcher(watchFunc watchConsulFunc) *consulWatcher {
81 return &consulWatcher{
82 watchFunc: watchFunc,
83 resolvers: make(map[string]*resolver),
84 coalescedDirty: make(chan struct{}),
85 endpointsCh: make(chan consulwatch.Endpoints),
86 endpoints: make(map[string]consulwatch.Endpoints),
87 }
88 }
89
90 func (c *consulWatcher) run(ctx context.Context) error {
91 dirty := false
92 for {
93 if dirty {
94 select {
95 case c.coalescedDirty <- struct{}{}:
96 dirty = false
97 case ep := <-c.endpointsCh:
98 c.updateEndpoints(ep)
99 dirty = true
100 case <-ctx.Done():
101 return c.cleanup(ctx)
102 }
103 } else {
104 select {
105 case ep := <-c.endpointsCh:
106 c.updateEndpoints(ep)
107 dirty = true
108 case <-ctx.Done():
109 return c.cleanup(ctx)
110 }
111 }
112 }
113 }
114
115 func (c *consulWatcher) updateEndpoints(endpoints consulwatch.Endpoints) {
116 c.mutex.Lock()
117 defer c.mutex.Unlock()
118 c.endpoints[endpoints.Service] = endpoints
119 }
120
121 func (c *consulWatcher) changed() chan struct{} {
122 return c.coalescedDirty
123 }
124
125 func (c *consulWatcher) update(snap *snapshotTypes.ConsulSnapshot) {
126 c.mutex.Lock()
127 defer c.mutex.Unlock()
128 snap.Endpoints = make(map[string]consulwatch.Endpoints, len(c.endpoints))
129 for k, v := range c.endpoints {
130 snap.Endpoints[k] = v
131 }
132 }
133
134 func (c *consulWatcher) isBootstrapped() bool {
135 if !c.firstReconcileHasHappened {
136 return false
137 }
138 c.mutex.Lock()
139 defer c.mutex.Unlock()
140
141
142 if c.bootstrapped {
143 return true
144 }
145
146 for _, key := range c.keysForBootstrap {
147 if _, ok := c.endpoints[key]; !ok {
148 return false
149 }
150 }
151
152 c.bootstrapped = true
153
154 return true
155 }
156
157
158 func (c *consulWatcher) cleanup(ctx context.Context) error {
159
160
164
165 return c.reconcile(ctx, nil, nil)
166 }
167
168
169
170 func (c *consulWatcher) reconcile(ctx context.Context, resolvers []*amb.ConsulResolver, mappings []consulMapping) error {
171
172 resolversByName := make(map[string]*amb.ConsulResolver)
173 for _, cr := range resolvers {
174
175
176 resolversByName[cr.GetName()] = cr
177 }
178
179 mappingsByResolver := make(map[string][]consulMapping)
180 for _, m := range mappings {
181
182
183
184
185
186 rname := m.Resolver
187
188 if rname == "" {
189 continue
190 }
191
192 _, ok := resolversByName[rname]
193 if !ok {
194 continue
195 }
196 mappingsByResolver[rname] = append(mappingsByResolver[rname], m)
197 }
198
199
200 for name := range resolversByName {
201 _, ok := mappingsByResolver[name]
202 if !ok {
203 delete(resolversByName, name)
204 }
205 }
206
207
208
209
210 for name, cr := range resolversByName {
211 oldr, ok := c.resolvers[name]
212
213
214 if ok && reflect.DeepEqual(oldr.resolver.Spec, cr.Spec) {
215 continue
216 }
217
218 if ok {
219 oldr.deleted()
220 }
221 c.resolvers[name] = newResolver(cr)
222 }
223
224
225 for name, resolver := range c.resolvers {
226 _, ok := resolversByName[name]
227 if !ok {
228 resolver.deleted()
229 delete(c.resolvers, name)
230 }
231 }
232
233
234 for rname, mappings := range mappingsByResolver {
235 res := c.resolvers[rname]
236 if err := res.reconcile(ctx, c.watchFunc, mappings, c.endpointsCh); err != nil {
237 return err
238 }
239 }
240
241
242
243 if !c.firstReconcileHasHappened {
244 c.firstReconcileHasHappened = true
245 var keysForBootstrap []string
246 for _, mappings := range mappingsByResolver {
247 for _, m := range mappings {
248 keysForBootstrap = append(keysForBootstrap, m.Service)
249 }
250 }
251 c.mutex.Lock()
252 defer c.mutex.Unlock()
253 c.keysForBootstrap = keysForBootstrap
254 }
255 return nil
256 }
257
258 type resolver struct {
259 resolver *amb.ConsulResolver
260 watches map[string]Stopper
261 }
262
263 func newResolver(spec *amb.ConsulResolver) *resolver {
264 return &resolver{resolver: spec, watches: make(map[string]Stopper)}
265 }
266
267 func (r *resolver) deleted() {
268 for _, w := range r.watches {
269 w.Stop()
270 }
271 }
272
273 func (r *resolver) reconcile(ctx context.Context, watchFunc watchConsulFunc, mappings []consulMapping, endpoints chan consulwatch.Endpoints) error {
274 servicesByName := make(map[string]bool)
275 for _, m := range mappings {
276
277 svc := m.Service
278 servicesByName[svc] = true
279 w, ok := r.watches[svc]
280 if !ok {
281 var err error
282 w, err = watchFunc(ctx, r.resolver, svc, endpoints)
283 if err != nil {
284 return err
285 }
286 r.watches[svc] = w
287 }
288 }
289
290 for name, w := range r.watches {
291 _, ok := servicesByName[name]
292 if !ok {
293 w.Stop()
294 delete(r.watches, name)
295 }
296 }
297 return nil
298 }
299
300 type watchConsulFunc func(ctx context.Context, resolver *amb.ConsulResolver, svc string, endpoints chan consulwatch.Endpoints) (Stopper, error)
301
// Stopper is anything that can be told to stop; the watcher uses it as the
// handle for a running watch.
type Stopper interface {
	// Stop ends the activity this handle stands for.
	Stop()
}
305
306 func watchConsul(
307 ctx context.Context,
308 resolver *amb.ConsulResolver,
309 svc string,
310 endpointsCh chan consulwatch.Endpoints,
311 ) (Stopper, error) {
312
313 consulConfig := consulapi.DefaultConfig()
314 consulConfig.Address = resolver.Spec.Address
315 consul, err := consulapi.NewClient(consulConfig)
316 if err != nil {
317 return nil, err
318 }
319
320
321 w, err := consulwatch.New(consul, resolver.Spec.Datacenter, svc, true)
322 if err != nil {
323 return nil, err
324 }
325
326 w.Watch(func(endpoints consulwatch.Endpoints, e error) {
327 if endpoints.Id == "" {
328
329
330 endpoints.Id = resolver.Spec.Datacenter
331 }
332
333 endpointsCh <- endpoints
334 })
335
336 go func() {
337 if err := w.Start(ctx); err != nil {
338 panic(err)
339 }
340 }()
341
342 return w, nil
343 }
344
View as plain text