...

Source file src/github.com/emissary-ingress/emissary/v3/pkg/gateway/dispatcher.go

Documentation: github.com/emissary-ingress/emissary/v3/pkg/gateway

     1  package gateway
     2  
     3  import (
     4  	"context"
     5  	"encoding/json"
     6  	"fmt"
     7  
     8  	"github.com/pkg/errors"
     9  	"google.golang.org/protobuf/types/known/durationpb"
    10  
    11  	// Envoy API v3
    12  
    13  	v3cluster "github.com/emissary-ingress/emissary/v3/pkg/api/envoy/config/cluster/v3"
    14  	v3core "github.com/emissary-ingress/emissary/v3/pkg/api/envoy/config/core/v3"
    15  	v3endpoint "github.com/emissary-ingress/emissary/v3/pkg/api/envoy/config/endpoint/v3"
    16  	v3listener "github.com/emissary-ingress/emissary/v3/pkg/api/envoy/config/listener/v3"
    17  	v3route "github.com/emissary-ingress/emissary/v3/pkg/api/envoy/config/route/v3"
    18  
    19  	// Envoy control plane API's
    20  	ecp_cache_types "github.com/emissary-ingress/emissary/v3/pkg/envoy-control-plane/cache/types"
    21  	ecp_v3_cache "github.com/emissary-ingress/emissary/v3/pkg/envoy-control-plane/cache/v3"
    22  	ecp_v3_resource "github.com/emissary-ingress/emissary/v3/pkg/envoy-control-plane/resource/v3"
    23  	ecp_wellknown "github.com/emissary-ingress/emissary/v3/pkg/envoy-control-plane/wellknown"
    24  
    25  	"github.com/datawire/dlib/dlog"
    26  	"github.com/emissary-ingress/emissary/v3/pkg/kates"
    27  )
    28  
// The Dispatcher struct allows transforms to be registered for different kinds of kubernetes
// resources and invokes those transforms to produce compiled envoy configurations. It also knows
// how to assemble the compiled envoy configuration into a complete snapshot.
//
// Currently the dispatch process is relatively simple, each resource is processed as an independent
// unit. This is sufficient for the gateway API since the currently implemented resources are
// conveniently defined in such a way as to make them independent.
//
// Consistency is guaranteed assuming transform functions don't use out of band communication to
// include information from other resources. This guarantee is achieved because each transform is
// only passed a single resource and can therefore only use information from that one
// resource. Changes to any other resource cannot impact the result of that transform.
//
// Not all the edgestack resources are defined as conveniently, so the Dispatcher design is expected
// to be extended in two ways to handle resources with more complex interdependencies:
//
//  1. Grouping -- This feature would cover resources that need to be processed as a group,
//     e.g. Mappings that get grouped together based on prefix. Instead of dispatching at the
//     granularity of a single resource, the dispatcher will track groups of resources that need to
//     be processed together via a logical "hash" function provided at registration. Whenever any
//     item in a given bucket changes, the dispatcher will transform the entire bucket.
//
//  2. Dependencies -- This feature would cover resources that need to look up the contents of other
//     resources in order to properly implement their transform. This would be done by passing the
//     transform function a Query API. Any resources queried by the transform would be
//     automatically tracked as a dependency of that resource. The dependencies would then be used
//     to perform invalidation whenever a resource is Upsert()ed.
type Dispatcher struct {
	// Map from kind to transform function.
	transforms map[string]func(kates.Object) (*CompiledConfig, error)
	// Map from resource key (see resourceKey, "Kind:Namespace:Name") to the compiled config most
	// recently produced for that resource by Upsert.
	configs    map[string]*CompiledConfig

	// Version string assigned by the most recent buildSnapshot call ("v" + changeCount).
	version         string
	// Number of times buildSnapshot has run; incremented at the start of every build.
	changeCount     int
	// Cached snapshot. Reset to nil by Upsert, Delete and DeleteKey so that the next GetSnapshot
	// rebuilds it; only assigned when a freshly built snapshot passes its consistency check.
	snapshot        *ecp_v3_cache.Snapshot
	// Set of "namespace:name" keys produced by buildClusterMap for the cached snapshot; consulted
	// by IsWatched.
	endpointWatches map[string]bool
}
    66  
// ResourceRef identifies a kubernetes resource by its kind, namespace, and name.
type ResourceRef struct {
	Kind      string
	Namespace string
	Name      string
}
    72  
    73  // resourceKey produces a fully qualified key for a kubernetes resource.
    74  func resourceKey(resource kates.Object) string {
    75  	gvk := resource.GetObjectKind().GroupVersionKind()
    76  	return resourceKeyFromParts(gvk.Kind, resource.GetNamespace(), resource.GetName())
    77  }
    78  
    79  func resourceKeyFromParts(kind, namespace, name string) string {
    80  	return fmt.Sprintf("%s:%s:%s", kind, namespace, name)
    81  }
    82  
    83  // NewDispatcher creates a new and empty *Dispatcher struct.
    84  func NewDispatcher() *Dispatcher {
    85  	return &Dispatcher{
    86  		transforms: map[string]func(kates.Object) (*CompiledConfig, error){},
    87  		configs:    map[string]*CompiledConfig{},
    88  	}
    89  }
    90  
    91  // Register registers a transform function for the specified kubernetes resource. The transform
    92  // argument must be a function that takes a single resource of the supplied "kind" and returns a
    93  // single CompiledConfig object, i.e.: `func(Kind) *CompiledConfig`
    94  func (d *Dispatcher) Register(kind string, transform func(kates.Object) (*CompiledConfig, error)) error {
    95  	_, ok := d.transforms[kind]
    96  	if ok {
    97  		return errors.Errorf("duplicate transform: %q", kind)
    98  	}
    99  
   100  	d.transforms[kind] = transform
   101  
   102  	return nil
   103  }
   104  
   105  // IsRegistered returns true if the given kind can be processed by this dispatcher.
   106  func (d *Dispatcher) IsRegistered(kind string) bool {
   107  	_, ok := d.transforms[kind]
   108  	return ok
   109  }
   110  
   111  // Upsert processes the given kubernetes resource whether it is new or just updated.
   112  func (d *Dispatcher) Upsert(resource kates.Object) error {
   113  	gvk := resource.GetObjectKind().GroupVersionKind()
   114  	xform, ok := d.transforms[gvk.Kind]
   115  	if !ok {
   116  		return errors.Errorf("no transform for kind: %q", gvk.Kind)
   117  	}
   118  
   119  	key := resourceKey(resource)
   120  
   121  	config, err := xform(resource)
   122  	if err != nil {
   123  		return errors.Wrapf(err, "internal error processing %s", key)
   124  	}
   125  
   126  	d.configs[key] = config
   127  	// Clear out the snapshot so we regenerate one.
   128  	d.snapshot = nil
   129  	return nil
   130  }
   131  
   132  // Delete processes the deletion of the given kubernetes resource.
   133  func (d *Dispatcher) Delete(resource kates.Object) {
   134  	key := resourceKey(resource)
   135  	delete(d.configs, key)
   136  
   137  	// Clear out the snapshot so we regenerate one.
   138  	d.snapshot = nil
   139  }
   140  
   141  func (d *Dispatcher) DeleteKey(kind, namespace, name string) {
   142  	key := resourceKeyFromParts(kind, namespace, name)
   143  	delete(d.configs, key)
   144  	d.snapshot = nil
   145  }
   146  
   147  // UpsertYaml parses the supplied yaml and invokes Upsert on the result.
   148  func (d *Dispatcher) UpsertYaml(manifests string) error {
   149  	objs, err := kates.ParseManifests(manifests)
   150  	if err != nil {
   151  		return err
   152  	}
   153  	for _, obj := range objs {
   154  		err := d.Upsert(obj)
   155  		if err != nil {
   156  			return err
   157  		}
   158  	}
   159  	return nil
   160  }
   161  
   162  // GetErrors returns all compiled items with errors.
   163  func (d *Dispatcher) GetErrors() []*CompiledItem {
   164  	var result []*CompiledItem
   165  	for _, config := range d.configs {
   166  		if config.Error != "" {
   167  			result = append(result, &config.CompiledItem)
   168  		}
   169  		for _, l := range config.Listeners {
   170  			if l.Error != "" {
   171  				result = append(result, &l.CompiledItem)
   172  			}
   173  		}
   174  		for _, r := range config.Routes {
   175  			if r.Error != "" {
   176  				result = append(result, &r.CompiledItem)
   177  			}
   178  			for _, cr := range r.ClusterRefs {
   179  				if cr.Error != "" {
   180  					result = append(result, &cr.CompiledItem)
   181  				}
   182  			}
   183  		}
   184  		for _, c := range config.Clusters {
   185  			if c.Error != "" {
   186  				result = append(result, &c.CompiledItem)
   187  			}
   188  		}
   189  		for _, la := range config.LoadAssignments {
   190  			if la.Error != "" {
   191  				result = append(result, &la.CompiledItem)
   192  			}
   193  		}
   194  	}
   195  	return result
   196  }
   197  
   198  // GetSnapshot returns a version and a snapshot if the snapshot is consistent
   199  // Important: a nil snapshot can be returned so you must check to to make sure it exists
   200  func (d *Dispatcher) GetSnapshot(ctx context.Context) (string, *ecp_v3_cache.Snapshot) {
   201  	if d.snapshot == nil {
   202  		d.buildSnapshot(ctx)
   203  	}
   204  	return d.version, d.snapshot
   205  }
   206  
   207  // GetListener returns a *v3listener.Listener with the specified name or nil if none exists.
   208  func (d *Dispatcher) GetListener(ctx context.Context, name string) *v3listener.Listener {
   209  	_, snapshot := d.GetSnapshot(ctx)
   210  	// ensure that snapshot is not nil before trying to use
   211  	if snapshot == nil {
   212  		return nil
   213  	}
   214  
   215  	for _, rsrc := range snapshot.Resources[ecp_cache_types.Listener].Items {
   216  		l := rsrc.Resource.(*v3listener.Listener)
   217  		if l.Name == name {
   218  			return l
   219  		}
   220  	}
   221  	return nil
   222  
   223  }
   224  
   225  // GetRouteConfiguration returns a *apiv2.RouteConfiguration with the specified name or nil if none
   226  // exists.
   227  func (d *Dispatcher) GetRouteConfiguration(ctx context.Context, name string) *v3route.RouteConfiguration {
   228  	_, snapshot := d.GetSnapshot(ctx)
   229  	// ensure snapshot is valid before attempting to access members to prevent panic
   230  	if snapshot == nil {
   231  		return nil
   232  	}
   233  
   234  	for _, rsrc := range snapshot.Resources[ecp_cache_types.Route].Items {
   235  		r := rsrc.Resource.(*v3route.RouteConfiguration)
   236  		if r.Name == name {
   237  			return r
   238  		}
   239  	}
   240  	return nil
   241  }
   242  
   243  // IsWatched is a temporary hack for dealing with the way endpoint data currenttly flows from
   244  // watcher -> ambex.n
   245  func (d *Dispatcher) IsWatched(namespace, name string) bool {
   246  	key := fmt.Sprintf("%s:%s", namespace, name)
   247  	_, ok := d.endpointWatches[key]
   248  	return ok
   249  }
   250  
   251  func (d *Dispatcher) buildClusterMap() (map[string]string, map[string]bool) {
   252  	refs := map[string]string{}
   253  	watches := map[string]bool{}
   254  	for _, config := range d.configs {
   255  		for _, route := range config.Routes {
   256  			for _, ref := range route.ClusterRefs {
   257  				refs[ref.Name] = ref.EndpointPath
   258  				if route.Namespace != "" {
   259  					key := fmt.Sprintf("%s:%s", route.Namespace, ref.Name)
   260  					watches[key] = true
   261  				}
   262  			}
   263  		}
   264  	}
   265  	return refs, watches
   266  }
   267  
   268  func (d *Dispatcher) buildEndpointMap() map[string]*v3endpoint.ClusterLoadAssignment {
   269  	endpoints := map[string]*v3endpoint.ClusterLoadAssignment{}
   270  	for _, config := range d.configs {
   271  		for _, la := range config.LoadAssignments {
   272  			endpoints[la.LoadAssignment.ClusterName] = la.LoadAssignment
   273  		}
   274  	}
   275  	return endpoints
   276  }
   277  
   278  func (d *Dispatcher) buildRouteConfigurations() ([]ecp_cache_types.Resource, []ecp_cache_types.Resource) {
   279  	listeners := []ecp_cache_types.Resource{}
   280  	routes := []ecp_cache_types.Resource{}
   281  	for _, config := range d.configs {
   282  		for _, lst := range config.Listeners {
   283  			listeners = append(listeners, lst.Listener)
   284  			r := d.buildRouteConfiguration(lst)
   285  			if r != nil {
   286  				routes = append(routes, r)
   287  			}
   288  		}
   289  	}
   290  	return listeners, routes
   291  }
   292  
   293  func (d *Dispatcher) buildRouteConfiguration(lst *CompiledListener) *v3route.RouteConfiguration {
   294  	rdsName, isRds := getRdsName(lst.Listener)
   295  	if !isRds {
   296  		return nil
   297  	}
   298  
   299  	var routes []*v3route.Route
   300  	for _, config := range d.configs {
   301  		for _, route := range config.Routes {
   302  			if lst.Predicate(route) {
   303  				routes = append(routes, route.Routes...)
   304  			}
   305  		}
   306  	}
   307  
   308  	return &v3route.RouteConfiguration{
   309  		Name: rdsName,
   310  		VirtualHosts: []*v3route.VirtualHost{
   311  			{
   312  				Name:    rdsName,
   313  				Domains: lst.Domains,
   314  				Routes:  routes,
   315  			},
   316  		},
   317  	}
   318  }
   319  
   320  // getRdsName returns the RDS route configuration name configured for the listener and a flag
   321  // indicating whether the listener uses Rds.
   322  func getRdsName(l *v3listener.Listener) (string, bool) {
   323  	for _, fc := range l.FilterChains {
   324  		for _, f := range fc.Filters {
   325  			if f.Name != ecp_wellknown.HTTPConnectionManager {
   326  				continue
   327  			}
   328  
   329  			hcm := ecp_v3_resource.GetHTTPConnectionManager(f)
   330  			if hcm != nil {
   331  				rds := hcm.GetRds()
   332  				if rds != nil {
   333  					return rds.RouteConfigName, true
   334  				}
   335  			}
   336  		}
   337  	}
   338  	return "", false
   339  }
   340  
   341  func (d *Dispatcher) buildSnapshot(ctx context.Context) {
   342  	d.changeCount++
   343  	d.version = fmt.Sprintf("v%d", d.changeCount)
   344  
   345  	endpointMap := d.buildEndpointMap()
   346  	clusterMap, endpointWatches := d.buildClusterMap()
   347  
   348  	clusters := []ecp_cache_types.Resource{}
   349  	endpoints := []ecp_cache_types.Resource{}
   350  	for name, path := range clusterMap {
   351  		clusters = append(clusters, makeCluster(name, path))
   352  		key := path
   353  		if key == "" {
   354  			key = name
   355  		}
   356  		la, ok := endpointMap[key]
   357  		if ok {
   358  			endpoints = append(endpoints, la)
   359  		} else {
   360  			endpoints = append(endpoints, &v3endpoint.ClusterLoadAssignment{
   361  				ClusterName: key,
   362  				Endpoints:   []*v3endpoint.LocalityLbEndpoints{},
   363  			})
   364  		}
   365  	}
   366  
   367  	listeners, routes := d.buildRouteConfigurations()
   368  
   369  	snapshotResources := map[ecp_v3_resource.Type][]ecp_cache_types.Resource{
   370  		ecp_v3_resource.EndpointType: endpoints,
   371  		ecp_v3_resource.ClusterType:  clusters,
   372  		ecp_v3_resource.RouteType:    routes,
   373  		ecp_v3_resource.ListenerType: listeners,
   374  	}
   375  
   376  	snapshot, err := ecp_v3_cache.NewSnapshot(d.version, snapshotResources)
   377  	if err != nil {
   378  		dlog.Errorf(ctx, "Dispatcher Snapshot Error: %v", err)
   379  	}
   380  
   381  	if err := snapshot.Consistent(); err != nil {
   382  		bs, _ := json.MarshalIndent(snapshot, "", "  ")
   383  		dlog.Errorf(ctx, "Dispatcher Snapshot inconsistency: %v: %s", err, bs)
   384  	} else {
   385  		d.snapshot = snapshot
   386  		d.endpointWatches = endpointWatches
   387  	}
   388  }
   389  
   390  func makeCluster(name, path string) *v3cluster.Cluster {
   391  	return &v3cluster.Cluster{
   392  		Name:                 name,
   393  		ConnectTimeout:       &durationpb.Duration{Seconds: 10},
   394  		ClusterDiscoveryType: &v3cluster.Cluster_Type{Type: v3cluster.Cluster_EDS},
   395  		EdsClusterConfig: &v3cluster.Cluster_EdsClusterConfig{
   396  			EdsConfig:   &v3core.ConfigSource{ConfigSourceSpecifier: &v3core.ConfigSource_Ads{}},
   397  			ServiceName: path,
   398  		},
   399  	}
   400  }
   401  

View as plain text