1 package gateway
2
3 import (
4 "context"
5 "encoding/json"
6 "fmt"
7
8 "github.com/pkg/errors"
9 "google.golang.org/protobuf/types/known/durationpb"
10
11
12
13 v3cluster "github.com/emissary-ingress/emissary/v3/pkg/api/envoy/config/cluster/v3"
14 v3core "github.com/emissary-ingress/emissary/v3/pkg/api/envoy/config/core/v3"
15 v3endpoint "github.com/emissary-ingress/emissary/v3/pkg/api/envoy/config/endpoint/v3"
16 v3listener "github.com/emissary-ingress/emissary/v3/pkg/api/envoy/config/listener/v3"
17 v3route "github.com/emissary-ingress/emissary/v3/pkg/api/envoy/config/route/v3"
18
19
20 ecp_cache_types "github.com/emissary-ingress/emissary/v3/pkg/envoy-control-plane/cache/types"
21 ecp_v3_cache "github.com/emissary-ingress/emissary/v3/pkg/envoy-control-plane/cache/v3"
22 ecp_v3_resource "github.com/emissary-ingress/emissary/v3/pkg/envoy-control-plane/resource/v3"
23 ecp_wellknown "github.com/emissary-ingress/emissary/v3/pkg/envoy-control-plane/wellknown"
24
25 "github.com/datawire/dlib/dlog"
26 "github.com/emissary-ingress/emissary/v3/pkg/kates"
27 )
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56 type Dispatcher struct {
57
58 transforms map[string]func(kates.Object) (*CompiledConfig, error)
59 configs map[string]*CompiledConfig
60
61 version string
62 changeCount int
63 snapshot *ecp_v3_cache.Snapshot
64 endpointWatches map[string]bool
65 }
66
67 type ResourceRef struct {
68 Kind string
69 Namespace string
70 Name string
71 }
72
73
74 func resourceKey(resource kates.Object) string {
75 gvk := resource.GetObjectKind().GroupVersionKind()
76 return resourceKeyFromParts(gvk.Kind, resource.GetNamespace(), resource.GetName())
77 }
78
79 func resourceKeyFromParts(kind, namespace, name string) string {
80 return fmt.Sprintf("%s:%s:%s", kind, namespace, name)
81 }
82
83
84 func NewDispatcher() *Dispatcher {
85 return &Dispatcher{
86 transforms: map[string]func(kates.Object) (*CompiledConfig, error){},
87 configs: map[string]*CompiledConfig{},
88 }
89 }
90
91
92
93
94 func (d *Dispatcher) Register(kind string, transform func(kates.Object) (*CompiledConfig, error)) error {
95 _, ok := d.transforms[kind]
96 if ok {
97 return errors.Errorf("duplicate transform: %q", kind)
98 }
99
100 d.transforms[kind] = transform
101
102 return nil
103 }
104
105
106 func (d *Dispatcher) IsRegistered(kind string) bool {
107 _, ok := d.transforms[kind]
108 return ok
109 }
110
111
112 func (d *Dispatcher) Upsert(resource kates.Object) error {
113 gvk := resource.GetObjectKind().GroupVersionKind()
114 xform, ok := d.transforms[gvk.Kind]
115 if !ok {
116 return errors.Errorf("no transform for kind: %q", gvk.Kind)
117 }
118
119 key := resourceKey(resource)
120
121 config, err := xform(resource)
122 if err != nil {
123 return errors.Wrapf(err, "internal error processing %s", key)
124 }
125
126 d.configs[key] = config
127
128 d.snapshot = nil
129 return nil
130 }
131
132
133 func (d *Dispatcher) Delete(resource kates.Object) {
134 key := resourceKey(resource)
135 delete(d.configs, key)
136
137
138 d.snapshot = nil
139 }
140
141 func (d *Dispatcher) DeleteKey(kind, namespace, name string) {
142 key := resourceKeyFromParts(kind, namespace, name)
143 delete(d.configs, key)
144 d.snapshot = nil
145 }
146
147
148 func (d *Dispatcher) UpsertYaml(manifests string) error {
149 objs, err := kates.ParseManifests(manifests)
150 if err != nil {
151 return err
152 }
153 for _, obj := range objs {
154 err := d.Upsert(obj)
155 if err != nil {
156 return err
157 }
158 }
159 return nil
160 }
161
162
163 func (d *Dispatcher) GetErrors() []*CompiledItem {
164 var result []*CompiledItem
165 for _, config := range d.configs {
166 if config.Error != "" {
167 result = append(result, &config.CompiledItem)
168 }
169 for _, l := range config.Listeners {
170 if l.Error != "" {
171 result = append(result, &l.CompiledItem)
172 }
173 }
174 for _, r := range config.Routes {
175 if r.Error != "" {
176 result = append(result, &r.CompiledItem)
177 }
178 for _, cr := range r.ClusterRefs {
179 if cr.Error != "" {
180 result = append(result, &cr.CompiledItem)
181 }
182 }
183 }
184 for _, c := range config.Clusters {
185 if c.Error != "" {
186 result = append(result, &c.CompiledItem)
187 }
188 }
189 for _, la := range config.LoadAssignments {
190 if la.Error != "" {
191 result = append(result, &la.CompiledItem)
192 }
193 }
194 }
195 return result
196 }
197
198
199
200 func (d *Dispatcher) GetSnapshot(ctx context.Context) (string, *ecp_v3_cache.Snapshot) {
201 if d.snapshot == nil {
202 d.buildSnapshot(ctx)
203 }
204 return d.version, d.snapshot
205 }
206
207
208 func (d *Dispatcher) GetListener(ctx context.Context, name string) *v3listener.Listener {
209 _, snapshot := d.GetSnapshot(ctx)
210
211 if snapshot == nil {
212 return nil
213 }
214
215 for _, rsrc := range snapshot.Resources[ecp_cache_types.Listener].Items {
216 l := rsrc.Resource.(*v3listener.Listener)
217 if l.Name == name {
218 return l
219 }
220 }
221 return nil
222
223 }
224
225
226
227 func (d *Dispatcher) GetRouteConfiguration(ctx context.Context, name string) *v3route.RouteConfiguration {
228 _, snapshot := d.GetSnapshot(ctx)
229
230 if snapshot == nil {
231 return nil
232 }
233
234 for _, rsrc := range snapshot.Resources[ecp_cache_types.Route].Items {
235 r := rsrc.Resource.(*v3route.RouteConfiguration)
236 if r.Name == name {
237 return r
238 }
239 }
240 return nil
241 }
242
243
244
245 func (d *Dispatcher) IsWatched(namespace, name string) bool {
246 key := fmt.Sprintf("%s:%s", namespace, name)
247 _, ok := d.endpointWatches[key]
248 return ok
249 }
250
251 func (d *Dispatcher) buildClusterMap() (map[string]string, map[string]bool) {
252 refs := map[string]string{}
253 watches := map[string]bool{}
254 for _, config := range d.configs {
255 for _, route := range config.Routes {
256 for _, ref := range route.ClusterRefs {
257 refs[ref.Name] = ref.EndpointPath
258 if route.Namespace != "" {
259 key := fmt.Sprintf("%s:%s", route.Namespace, ref.Name)
260 watches[key] = true
261 }
262 }
263 }
264 }
265 return refs, watches
266 }
267
268 func (d *Dispatcher) buildEndpointMap() map[string]*v3endpoint.ClusterLoadAssignment {
269 endpoints := map[string]*v3endpoint.ClusterLoadAssignment{}
270 for _, config := range d.configs {
271 for _, la := range config.LoadAssignments {
272 endpoints[la.LoadAssignment.ClusterName] = la.LoadAssignment
273 }
274 }
275 return endpoints
276 }
277
278 func (d *Dispatcher) buildRouteConfigurations() ([]ecp_cache_types.Resource, []ecp_cache_types.Resource) {
279 listeners := []ecp_cache_types.Resource{}
280 routes := []ecp_cache_types.Resource{}
281 for _, config := range d.configs {
282 for _, lst := range config.Listeners {
283 listeners = append(listeners, lst.Listener)
284 r := d.buildRouteConfiguration(lst)
285 if r != nil {
286 routes = append(routes, r)
287 }
288 }
289 }
290 return listeners, routes
291 }
292
293 func (d *Dispatcher) buildRouteConfiguration(lst *CompiledListener) *v3route.RouteConfiguration {
294 rdsName, isRds := getRdsName(lst.Listener)
295 if !isRds {
296 return nil
297 }
298
299 var routes []*v3route.Route
300 for _, config := range d.configs {
301 for _, route := range config.Routes {
302 if lst.Predicate(route) {
303 routes = append(routes, route.Routes...)
304 }
305 }
306 }
307
308 return &v3route.RouteConfiguration{
309 Name: rdsName,
310 VirtualHosts: []*v3route.VirtualHost{
311 {
312 Name: rdsName,
313 Domains: lst.Domains,
314 Routes: routes,
315 },
316 },
317 }
318 }
319
320
321
322 func getRdsName(l *v3listener.Listener) (string, bool) {
323 for _, fc := range l.FilterChains {
324 for _, f := range fc.Filters {
325 if f.Name != ecp_wellknown.HTTPConnectionManager {
326 continue
327 }
328
329 hcm := ecp_v3_resource.GetHTTPConnectionManager(f)
330 if hcm != nil {
331 rds := hcm.GetRds()
332 if rds != nil {
333 return rds.RouteConfigName, true
334 }
335 }
336 }
337 }
338 return "", false
339 }
340
341 func (d *Dispatcher) buildSnapshot(ctx context.Context) {
342 d.changeCount++
343 d.version = fmt.Sprintf("v%d", d.changeCount)
344
345 endpointMap := d.buildEndpointMap()
346 clusterMap, endpointWatches := d.buildClusterMap()
347
348 clusters := []ecp_cache_types.Resource{}
349 endpoints := []ecp_cache_types.Resource{}
350 for name, path := range clusterMap {
351 clusters = append(clusters, makeCluster(name, path))
352 key := path
353 if key == "" {
354 key = name
355 }
356 la, ok := endpointMap[key]
357 if ok {
358 endpoints = append(endpoints, la)
359 } else {
360 endpoints = append(endpoints, &v3endpoint.ClusterLoadAssignment{
361 ClusterName: key,
362 Endpoints: []*v3endpoint.LocalityLbEndpoints{},
363 })
364 }
365 }
366
367 listeners, routes := d.buildRouteConfigurations()
368
369 snapshotResources := map[ecp_v3_resource.Type][]ecp_cache_types.Resource{
370 ecp_v3_resource.EndpointType: endpoints,
371 ecp_v3_resource.ClusterType: clusters,
372 ecp_v3_resource.RouteType: routes,
373 ecp_v3_resource.ListenerType: listeners,
374 }
375
376 snapshot, err := ecp_v3_cache.NewSnapshot(d.version, snapshotResources)
377 if err != nil {
378 dlog.Errorf(ctx, "Dispatcher Snapshot Error: %v", err)
379 }
380
381 if err := snapshot.Consistent(); err != nil {
382 bs, _ := json.MarshalIndent(snapshot, "", " ")
383 dlog.Errorf(ctx, "Dispatcher Snapshot inconsistency: %v: %s", err, bs)
384 } else {
385 d.snapshot = snapshot
386 d.endpointWatches = endpointWatches
387 }
388 }
389
390 func makeCluster(name, path string) *v3cluster.Cluster {
391 return &v3cluster.Cluster{
392 Name: name,
393 ConnectTimeout: &durationpb.Duration{Seconds: 10},
394 ClusterDiscoveryType: &v3cluster.Cluster_Type{Type: v3cluster.Cluster_EDS},
395 EdsClusterConfig: &v3cluster.Cluster_EdsClusterConfig{
396 EdsConfig: &v3core.ConfigSource{ConfigSourceSpecifier: &v3core.ConfigSource_Ads{}},
397 ServiceName: path,
398 },
399 }
400 }
401
View as plain text