1 package gateway
2
3 import (
4 "context"
5 "encoding/json"
6 "fmt"
7
8 "github.com/pkg/errors"
9 "google.golang.org/protobuf/types/known/durationpb"
10
11 apiv2 "github.com/datawire/ambassador/v2/pkg/api/envoy/api/v2"
12 apiv2_core "github.com/datawire/ambassador/v2/pkg/api/envoy/api/v2/core"
13 apiv2_endpoint "github.com/datawire/ambassador/v2/pkg/api/envoy/api/v2/endpoint"
14 apiv2_route "github.com/datawire/ambassador/v2/pkg/api/envoy/api/v2/route"
15
16 ecp_cache_types "github.com/datawire/ambassador/v2/pkg/envoy-control-plane/cache/types"
17 ecp_v2_cache "github.com/datawire/ambassador/v2/pkg/envoy-control-plane/cache/v2"
18 ecp_v2_resource "github.com/datawire/ambassador/v2/pkg/envoy-control-plane/resource/v2"
19 ecp_wellknown "github.com/datawire/ambassador/v2/pkg/envoy-control-plane/wellknown"
20
21 "github.com/datawire/ambassador/v2/pkg/kates"
22 "github.com/datawire/dlib/dlog"
23 )
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52 type Dispatcher struct {
53
54 transforms map[string]func(kates.Object) (*CompiledConfig, error)
55 configs map[string]*CompiledConfig
56
57 version string
58 changeCount int
59 snapshot *ecp_v2_cache.Snapshot
60 endpointWatches map[string]bool
61 }
62
// ResourceRef names a resource by its kind, namespace, and name.
type ResourceRef struct {
	// Kind is the resource kind.
	Kind string
	// Namespace is the namespace the resource lives in.
	Namespace string
	// Name is the resource name.
	Name string
}
68
69
70 func resourceKey(resource kates.Object) string {
71 gvk := resource.GetObjectKind().GroupVersionKind()
72 return resourceKeyFromParts(gvk.Kind, resource.GetNamespace(), resource.GetName())
73 }
74
// resourceKeyFromParts joins kind, namespace, and name with ':' separators,
// yielding "kind:namespace:name". Empty parts are kept as empty segments.
func resourceKeyFromParts(kind, namespace, name string) string {
	const layout = "%s:%s:%s"
	key := fmt.Sprintf(layout, kind, namespace, name)
	return key
}
78
79
80 func NewDispatcher() *Dispatcher {
81 return &Dispatcher{
82 transforms: map[string]func(kates.Object) (*CompiledConfig, error){},
83 configs: map[string]*CompiledConfig{},
84 }
85 }
86
87
88
89
90 func (d *Dispatcher) Register(kind string, transform func(kates.Object) (*CompiledConfig, error)) error {
91 _, ok := d.transforms[kind]
92 if ok {
93 return errors.Errorf("duplicate transform: %q", kind)
94 }
95
96 d.transforms[kind] = transform
97
98 return nil
99 }
100
101
102 func (d *Dispatcher) IsRegistered(kind string) bool {
103 _, ok := d.transforms[kind]
104 return ok
105 }
106
107
108 func (d *Dispatcher) Upsert(resource kates.Object) error {
109 gvk := resource.GetObjectKind().GroupVersionKind()
110 xform, ok := d.transforms[gvk.Kind]
111 if !ok {
112 return errors.Errorf("no transform for kind: %q", gvk.Kind)
113 }
114
115 key := resourceKey(resource)
116
117 config, err := xform(resource)
118 if err != nil {
119 return errors.Wrapf(err, "internal error processing %s", key)
120 }
121
122 d.configs[key] = config
123
124 d.snapshot = nil
125 return nil
126 }
127
128
129 func (d *Dispatcher) Delete(resource kates.Object) {
130 key := resourceKey(resource)
131 delete(d.configs, key)
132
133
134 d.snapshot = nil
135 }
136
137 func (d *Dispatcher) DeleteKey(kind, namespace, name string) {
138 key := resourceKeyFromParts(kind, namespace, name)
139 delete(d.configs, key)
140 d.snapshot = nil
141 }
142
143
144 func (d *Dispatcher) UpsertYaml(manifests string) error {
145 objs, err := kates.ParseManifests(manifests)
146 if err != nil {
147 return err
148 }
149 for _, obj := range objs {
150 err := d.Upsert(obj)
151 if err != nil {
152 return err
153 }
154 }
155 return nil
156 }
157
158
159 func (d *Dispatcher) GetErrors() []*CompiledItem {
160 var result []*CompiledItem
161 for _, config := range d.configs {
162 if config.Error != "" {
163 result = append(result, &config.CompiledItem)
164 }
165 for _, l := range config.Listeners {
166 if l.Error != "" {
167 result = append(result, &l.CompiledItem)
168 }
169 }
170 for _, r := range config.Routes {
171 if r.Error != "" {
172 result = append(result, &r.CompiledItem)
173 }
174 for _, cr := range r.ClusterRefs {
175 if cr.Error != "" {
176 result = append(result, &cr.CompiledItem)
177 }
178 }
179 }
180 for _, c := range config.Clusters {
181 if c.Error != "" {
182 result = append(result, &c.CompiledItem)
183 }
184 }
185 for _, la := range config.LoadAssignments {
186 if la.Error != "" {
187 result = append(result, &la.CompiledItem)
188 }
189 }
190 }
191 return result
192 }
193
194
195 func (d *Dispatcher) GetSnapshot(ctx context.Context) (string, *ecp_v2_cache.Snapshot) {
196 if d.snapshot == nil {
197 d.buildSnapshot(ctx)
198 }
199 return d.version, d.snapshot
200 }
201
202
203 func (d *Dispatcher) GetListener(ctx context.Context, name string) *apiv2.Listener {
204 _, snap := d.GetSnapshot(ctx)
205 for _, rsrc := range snap.Resources[ecp_cache_types.Listener].Items {
206 l := rsrc.Resource.(*apiv2.Listener)
207 if l.Name == name {
208 return l
209 }
210 }
211 return nil
212
213 }
214
215
216
217 func (d *Dispatcher) GetRouteConfiguration(ctx context.Context, name string) *apiv2.RouteConfiguration {
218 _, snap := d.GetSnapshot(ctx)
219 for _, rsrc := range snap.Resources[ecp_cache_types.Route].Items {
220 r := rsrc.Resource.(*apiv2.RouteConfiguration)
221 if r.Name == name {
222 return r
223 }
224 }
225 return nil
226 }
227
228
229
230 func (d *Dispatcher) IsWatched(namespace, name string) bool {
231 key := fmt.Sprintf("%s:%s", namespace, name)
232 _, ok := d.endpointWatches[key]
233 return ok
234 }
235
236 func (d *Dispatcher) buildClusterMap() (map[string]string, map[string]bool) {
237 refs := map[string]string{}
238 watches := map[string]bool{}
239 for _, config := range d.configs {
240 for _, route := range config.Routes {
241 for _, ref := range route.ClusterRefs {
242 refs[ref.Name] = ref.EndpointPath
243 if route.Namespace != "" {
244 key := fmt.Sprintf("%s:%s", route.Namespace, ref.Name)
245 watches[key] = true
246 }
247 }
248 }
249 }
250 return refs, watches
251 }
252
253 func (d *Dispatcher) buildEndpointMap() map[string]*apiv2.ClusterLoadAssignment {
254 endpoints := map[string]*apiv2.ClusterLoadAssignment{}
255 for _, config := range d.configs {
256 for _, la := range config.LoadAssignments {
257 endpoints[la.LoadAssignment.ClusterName] = la.LoadAssignment
258 }
259 }
260 return endpoints
261 }
262
263 func (d *Dispatcher) buildRouteConfigurations() ([]ecp_cache_types.Resource, []ecp_cache_types.Resource) {
264 listeners := []ecp_cache_types.Resource{}
265 routes := []ecp_cache_types.Resource{}
266 for _, config := range d.configs {
267 for _, lst := range config.Listeners {
268 listeners = append(listeners, lst.Listener)
269 r := d.buildRouteConfiguration(lst)
270 if r != nil {
271 routes = append(routes, r)
272 }
273 }
274 }
275 return listeners, routes
276 }
277
278 func (d *Dispatcher) buildRouteConfiguration(lst *CompiledListener) *apiv2.RouteConfiguration {
279 rdsName, isRds := getRdsName(lst.Listener)
280 if !isRds {
281 return nil
282 }
283
284 var routes []*apiv2_route.Route
285 for _, config := range d.configs {
286 for _, route := range config.Routes {
287 if lst.Predicate(route) {
288 routes = append(routes, route.Routes...)
289 }
290 }
291 }
292
293 return &apiv2.RouteConfiguration{
294 Name: rdsName,
295 VirtualHosts: []*apiv2_route.VirtualHost{
296 {
297 Name: rdsName,
298 Domains: lst.Domains,
299 Routes: routes,
300 },
301 },
302 }
303 }
304
305
306
307 func getRdsName(l *apiv2.Listener) (string, bool) {
308 for _, fc := range l.FilterChains {
309 for _, f := range fc.Filters {
310 if f.Name != ecp_wellknown.HTTPConnectionManager {
311 continue
312 }
313
314 hcm := ecp_v2_resource.GetHTTPConnectionManager(f)
315 if hcm != nil {
316 rds := hcm.GetRds()
317 if rds != nil {
318 return rds.RouteConfigName, true
319 }
320 }
321 }
322 }
323 return "", false
324 }
325
326 func (d *Dispatcher) buildSnapshot(ctx context.Context) {
327 d.changeCount++
328 d.version = fmt.Sprintf("v%d", d.changeCount)
329
330 endpointMap := d.buildEndpointMap()
331 clusterMap, endpointWatches := d.buildClusterMap()
332
333 clusters := []ecp_cache_types.Resource{}
334 endpoints := []ecp_cache_types.Resource{}
335 for name, path := range clusterMap {
336 clusters = append(clusters, makeCluster(name, path))
337 key := path
338 if key == "" {
339 key = name
340 }
341 la, ok := endpointMap[key]
342 if ok {
343 endpoints = append(endpoints, la)
344 } else {
345 endpoints = append(endpoints, &apiv2.ClusterLoadAssignment{
346 ClusterName: key,
347 Endpoints: []*apiv2_endpoint.LocalityLbEndpoints{},
348 })
349 }
350 }
351
352 listeners, routes := d.buildRouteConfigurations()
353
354 snapshot := ecp_v2_cache.NewSnapshot(
355 d.version,
356 endpoints,
357 clusters,
358 routes,
359 listeners,
360 nil,
361 nil,
362 )
363 if err := snapshot.Consistent(); err != nil {
364 bs, _ := json.MarshalIndent(snapshot, "", " ")
365 dlog.Errorf(ctx, "Dispatcher Snapshot inconsistency: %v: %s", err, bs)
366 } else {
367 d.snapshot = &snapshot
368 d.endpointWatches = endpointWatches
369 }
370 }
371
372 func makeCluster(name, path string) *apiv2.Cluster {
373 return &apiv2.Cluster{
374 Name: name,
375 ConnectTimeout: &durationpb.Duration{Seconds: 10},
376 ClusterDiscoveryType: &apiv2.Cluster_Type{Type: apiv2.Cluster_EDS},
377 EdsClusterConfig: &apiv2.Cluster_EdsClusterConfig{
378 EdsConfig: &apiv2_core.ConfigSource{ConfigSourceSpecifier: &apiv2_core.ConfigSource_Ads{}},
379 ServiceName: path,
380 },
381 }
382 }
383
View as plain text