...

Source file src/github.com/datawire/ambassador/v2/pkg/gateway/dispatcher.go

Documentation: github.com/datawire/ambassador/v2/pkg/gateway

     1  package gateway
     2  
     3  import (
     4  	"context"
     5  	"encoding/json"
     6  	"fmt"
     7  
     8  	"github.com/pkg/errors"
     9  	"google.golang.org/protobuf/types/known/durationpb"
    10  
    11  	apiv2 "github.com/datawire/ambassador/v2/pkg/api/envoy/api/v2"
    12  	apiv2_core "github.com/datawire/ambassador/v2/pkg/api/envoy/api/v2/core"
    13  	apiv2_endpoint "github.com/datawire/ambassador/v2/pkg/api/envoy/api/v2/endpoint"
    14  	apiv2_route "github.com/datawire/ambassador/v2/pkg/api/envoy/api/v2/route"
    15  
    16  	ecp_cache_types "github.com/datawire/ambassador/v2/pkg/envoy-control-plane/cache/types"
    17  	ecp_v2_cache "github.com/datawire/ambassador/v2/pkg/envoy-control-plane/cache/v2"
    18  	ecp_v2_resource "github.com/datawire/ambassador/v2/pkg/envoy-control-plane/resource/v2"
    19  	ecp_wellknown "github.com/datawire/ambassador/v2/pkg/envoy-control-plane/wellknown"
    20  
    21  	"github.com/datawire/ambassador/v2/pkg/kates"
    22  	"github.com/datawire/dlib/dlog"
    23  )
    24  
// The Dispatcher struct allows transforms to be registered for different kinds of kubernetes
// resources and invokes those transforms to produce compiled envoy configurations. It also knows
// how to assemble the compiled envoy configuration into a complete snapshot.
//
// Currently the dispatch process is relatively simple, each resource is processed as an independent
// unit. This is sufficient for the gateway API since the currently implemented resources are
// conveniently defined in such a way as to make them independent.
//
// Consistency is guaranteed assuming transform functions don't use out of band communication to
// include information from other resources. This guarantee is achieved because each transform is
// only passed a single resource and can therefore only use information from that one
// resource. Changes to any other resource cannot impact the result of that transform.
//
// Not all the edgestack resources are defined as conveniently, so the Dispatcher design is expected
// to be extended in two ways to handle resources with more complex interdependencies:
//
//  1. Grouping -- This feature would cover resources that need to be processed as a group,
//     e.g. Mappings that get grouped together based on prefix. Instead of dispatching at the
//     granularity of a single resource, the dispatcher will track groups of resources that need to
//     be processed together via a logical "hash" function provided at registration. Whenever any
//     item in a given bucket changes, the dispatcher will transform the entire bucket.
//
//  2. Dependencies -- This feature would cover resources that need to look up the contents of
//     other resources in order to properly implement their transform. This would be done by
//     passing the transform function a Query API. Any resources queried by the transform would be
//     automatically tracked as a dependency of that resource. The dependencies would then be used
//     to perform invalidation whenever a resource is Upsert()ed.
type Dispatcher struct {
	// transforms maps a resource kind to the transform function registered for it via Register.
	// configs holds the CompiledConfig most recently produced by Upsert for each resource, keyed
	// by the "kind:namespace:name" string built by resourceKey.
	transforms map[string]func(kates.Object) (*CompiledConfig, error)
	configs    map[string]*CompiledConfig

	// version is the "v<changeCount>" string assigned by the most recent buildSnapshot call, and
	// changeCount is the number of times buildSnapshot has run. snapshot caches the last
	// consistent snapshot; Upsert, Delete, and DeleteKey reset it to nil so that GetSnapshot
	// rebuilds it. endpointWatches is the set of "namespace:name" keys consulted by IsWatched; it
	// is only replaced when a consistent snapshot is built.
	version         string
	changeCount     int
	snapshot        *ecp_v2_cache.Snapshot
	endpointWatches map[string]bool
}
    62  
// ResourceRef holds the kind, namespace, and name that together refer to a single resource.
type ResourceRef struct {
	Kind      string
	Namespace string
	Name      string
}
    68  
    69  // resourceKey produces a fully qualified key for a kubernetes resource.
    70  func resourceKey(resource kates.Object) string {
    71  	gvk := resource.GetObjectKind().GroupVersionKind()
    72  	return resourceKeyFromParts(gvk.Kind, resource.GetNamespace(), resource.GetName())
    73  }
    74  
    75  func resourceKeyFromParts(kind, namespace, name string) string {
    76  	return fmt.Sprintf("%s:%s:%s", kind, namespace, name)
    77  }
    78  
    79  // NewDispatcher creates a new and empty *Dispatcher struct.
    80  func NewDispatcher() *Dispatcher {
    81  	return &Dispatcher{
    82  		transforms: map[string]func(kates.Object) (*CompiledConfig, error){},
    83  		configs:    map[string]*CompiledConfig{},
    84  	}
    85  }
    86  
    87  // Register registers a transform function for the specified kubernetes resource. The transform
    88  // argument must be a function that takes a single resource of the supplied "kind" and returns a
    89  // single CompiledConfig object, i.e.: `func(Kind) *CompiledConfig`
    90  func (d *Dispatcher) Register(kind string, transform func(kates.Object) (*CompiledConfig, error)) error {
    91  	_, ok := d.transforms[kind]
    92  	if ok {
    93  		return errors.Errorf("duplicate transform: %q", kind)
    94  	}
    95  
    96  	d.transforms[kind] = transform
    97  
    98  	return nil
    99  }
   100  
   101  // IsRegistered returns true if the given kind can be processed by this dispatcher.
   102  func (d *Dispatcher) IsRegistered(kind string) bool {
   103  	_, ok := d.transforms[kind]
   104  	return ok
   105  }
   106  
   107  // Upsert processes the given kubernetes resource whether it is new or just updated.
   108  func (d *Dispatcher) Upsert(resource kates.Object) error {
   109  	gvk := resource.GetObjectKind().GroupVersionKind()
   110  	xform, ok := d.transforms[gvk.Kind]
   111  	if !ok {
   112  		return errors.Errorf("no transform for kind: %q", gvk.Kind)
   113  	}
   114  
   115  	key := resourceKey(resource)
   116  
   117  	config, err := xform(resource)
   118  	if err != nil {
   119  		return errors.Wrapf(err, "internal error processing %s", key)
   120  	}
   121  
   122  	d.configs[key] = config
   123  	// Clear out the snapshot so we regenerate one.
   124  	d.snapshot = nil
   125  	return nil
   126  }
   127  
   128  // Delete processes the deletion of the given kubernetes resource.
   129  func (d *Dispatcher) Delete(resource kates.Object) {
   130  	key := resourceKey(resource)
   131  	delete(d.configs, key)
   132  
   133  	// Clear out the snapshot so we regenerate one.
   134  	d.snapshot = nil
   135  }
   136  
   137  func (d *Dispatcher) DeleteKey(kind, namespace, name string) {
   138  	key := resourceKeyFromParts(kind, namespace, name)
   139  	delete(d.configs, key)
   140  	d.snapshot = nil
   141  }
   142  
   143  // UpsertYaml parses the supplied yaml and invokes Upsert on the result.
   144  func (d *Dispatcher) UpsertYaml(manifests string) error {
   145  	objs, err := kates.ParseManifests(manifests)
   146  	if err != nil {
   147  		return err
   148  	}
   149  	for _, obj := range objs {
   150  		err := d.Upsert(obj)
   151  		if err != nil {
   152  			return err
   153  		}
   154  	}
   155  	return nil
   156  }
   157  
   158  // GetErrors returns all compiled items with errors.
   159  func (d *Dispatcher) GetErrors() []*CompiledItem {
   160  	var result []*CompiledItem
   161  	for _, config := range d.configs {
   162  		if config.Error != "" {
   163  			result = append(result, &config.CompiledItem)
   164  		}
   165  		for _, l := range config.Listeners {
   166  			if l.Error != "" {
   167  				result = append(result, &l.CompiledItem)
   168  			}
   169  		}
   170  		for _, r := range config.Routes {
   171  			if r.Error != "" {
   172  				result = append(result, &r.CompiledItem)
   173  			}
   174  			for _, cr := range r.ClusterRefs {
   175  				if cr.Error != "" {
   176  					result = append(result, &cr.CompiledItem)
   177  				}
   178  			}
   179  		}
   180  		for _, c := range config.Clusters {
   181  			if c.Error != "" {
   182  				result = append(result, &c.CompiledItem)
   183  			}
   184  		}
   185  		for _, la := range config.LoadAssignments {
   186  			if la.Error != "" {
   187  				result = append(result, &la.CompiledItem)
   188  			}
   189  		}
   190  	}
   191  	return result
   192  }
   193  
   194  // GetSnapshot returns a version and a snapshot.
   195  func (d *Dispatcher) GetSnapshot(ctx context.Context) (string, *ecp_v2_cache.Snapshot) {
   196  	if d.snapshot == nil {
   197  		d.buildSnapshot(ctx)
   198  	}
   199  	return d.version, d.snapshot
   200  }
   201  
   202  // GetListener returns a *apiv2.Listener with the specified name or nil if none exists.
   203  func (d *Dispatcher) GetListener(ctx context.Context, name string) *apiv2.Listener {
   204  	_, snap := d.GetSnapshot(ctx)
   205  	for _, rsrc := range snap.Resources[ecp_cache_types.Listener].Items {
   206  		l := rsrc.Resource.(*apiv2.Listener)
   207  		if l.Name == name {
   208  			return l
   209  		}
   210  	}
   211  	return nil
   212  
   213  }
   214  
   215  // GetRouteConfiguration returns a *apiv2.RouteConfiguration with the specified name or nil if none
   216  // exists.
   217  func (d *Dispatcher) GetRouteConfiguration(ctx context.Context, name string) *apiv2.RouteConfiguration {
   218  	_, snap := d.GetSnapshot(ctx)
   219  	for _, rsrc := range snap.Resources[ecp_cache_types.Route].Items {
   220  		r := rsrc.Resource.(*apiv2.RouteConfiguration)
   221  		if r.Name == name {
   222  			return r
   223  		}
   224  	}
   225  	return nil
   226  }
   227  
   228  // IsWatched is a temporary hack for dealing with the way endpoint data currenttly flows from
   229  // watcher -> ambex.n
   230  func (d *Dispatcher) IsWatched(namespace, name string) bool {
   231  	key := fmt.Sprintf("%s:%s", namespace, name)
   232  	_, ok := d.endpointWatches[key]
   233  	return ok
   234  }
   235  
   236  func (d *Dispatcher) buildClusterMap() (map[string]string, map[string]bool) {
   237  	refs := map[string]string{}
   238  	watches := map[string]bool{}
   239  	for _, config := range d.configs {
   240  		for _, route := range config.Routes {
   241  			for _, ref := range route.ClusterRefs {
   242  				refs[ref.Name] = ref.EndpointPath
   243  				if route.Namespace != "" {
   244  					key := fmt.Sprintf("%s:%s", route.Namespace, ref.Name)
   245  					watches[key] = true
   246  				}
   247  			}
   248  		}
   249  	}
   250  	return refs, watches
   251  }
   252  
   253  func (d *Dispatcher) buildEndpointMap() map[string]*apiv2.ClusterLoadAssignment {
   254  	endpoints := map[string]*apiv2.ClusterLoadAssignment{}
   255  	for _, config := range d.configs {
   256  		for _, la := range config.LoadAssignments {
   257  			endpoints[la.LoadAssignment.ClusterName] = la.LoadAssignment
   258  		}
   259  	}
   260  	return endpoints
   261  }
   262  
   263  func (d *Dispatcher) buildRouteConfigurations() ([]ecp_cache_types.Resource, []ecp_cache_types.Resource) {
   264  	listeners := []ecp_cache_types.Resource{}
   265  	routes := []ecp_cache_types.Resource{}
   266  	for _, config := range d.configs {
   267  		for _, lst := range config.Listeners {
   268  			listeners = append(listeners, lst.Listener)
   269  			r := d.buildRouteConfiguration(lst)
   270  			if r != nil {
   271  				routes = append(routes, r)
   272  			}
   273  		}
   274  	}
   275  	return listeners, routes
   276  }
   277  
   278  func (d *Dispatcher) buildRouteConfiguration(lst *CompiledListener) *apiv2.RouteConfiguration {
   279  	rdsName, isRds := getRdsName(lst.Listener)
   280  	if !isRds {
   281  		return nil
   282  	}
   283  
   284  	var routes []*apiv2_route.Route
   285  	for _, config := range d.configs {
   286  		for _, route := range config.Routes {
   287  			if lst.Predicate(route) {
   288  				routes = append(routes, route.Routes...)
   289  			}
   290  		}
   291  	}
   292  
   293  	return &apiv2.RouteConfiguration{
   294  		Name: rdsName,
   295  		VirtualHosts: []*apiv2_route.VirtualHost{
   296  			{
   297  				Name:    rdsName,
   298  				Domains: lst.Domains,
   299  				Routes:  routes,
   300  			},
   301  		},
   302  	}
   303  }
   304  
   305  // getRdsName returns the RDS route configuration name configured for the listener and a flag
   306  // indicating whether the listener uses Rds.
   307  func getRdsName(l *apiv2.Listener) (string, bool) {
   308  	for _, fc := range l.FilterChains {
   309  		for _, f := range fc.Filters {
   310  			if f.Name != ecp_wellknown.HTTPConnectionManager {
   311  				continue
   312  			}
   313  
   314  			hcm := ecp_v2_resource.GetHTTPConnectionManager(f)
   315  			if hcm != nil {
   316  				rds := hcm.GetRds()
   317  				if rds != nil {
   318  					return rds.RouteConfigName, true
   319  				}
   320  			}
   321  		}
   322  	}
   323  	return "", false
   324  }
   325  
   326  func (d *Dispatcher) buildSnapshot(ctx context.Context) {
   327  	d.changeCount++
   328  	d.version = fmt.Sprintf("v%d", d.changeCount)
   329  
   330  	endpointMap := d.buildEndpointMap()
   331  	clusterMap, endpointWatches := d.buildClusterMap()
   332  
   333  	clusters := []ecp_cache_types.Resource{}
   334  	endpoints := []ecp_cache_types.Resource{}
   335  	for name, path := range clusterMap {
   336  		clusters = append(clusters, makeCluster(name, path))
   337  		key := path
   338  		if key == "" {
   339  			key = name
   340  		}
   341  		la, ok := endpointMap[key]
   342  		if ok {
   343  			endpoints = append(endpoints, la)
   344  		} else {
   345  			endpoints = append(endpoints, &apiv2.ClusterLoadAssignment{
   346  				ClusterName: key,
   347  				Endpoints:   []*apiv2_endpoint.LocalityLbEndpoints{},
   348  			})
   349  		}
   350  	}
   351  
   352  	listeners, routes := d.buildRouteConfigurations()
   353  
   354  	snapshot := ecp_v2_cache.NewSnapshot(
   355  		d.version,
   356  		endpoints,
   357  		clusters,
   358  		routes,
   359  		listeners,
   360  		nil, // runtimes
   361  		nil, // secrets
   362  	)
   363  	if err := snapshot.Consistent(); err != nil {
   364  		bs, _ := json.MarshalIndent(snapshot, "", "  ")
   365  		dlog.Errorf(ctx, "Dispatcher Snapshot inconsistency: %v: %s", err, bs)
   366  	} else {
   367  		d.snapshot = &snapshot
   368  		d.endpointWatches = endpointWatches
   369  	}
   370  }
   371  
   372  func makeCluster(name, path string) *apiv2.Cluster {
   373  	return &apiv2.Cluster{
   374  		Name:                 name,
   375  		ConnectTimeout:       &durationpb.Duration{Seconds: 10},
   376  		ClusterDiscoveryType: &apiv2.Cluster_Type{Type: apiv2.Cluster_EDS},
   377  		EdsClusterConfig: &apiv2.Cluster_EdsClusterConfig{
   378  			EdsConfig:   &apiv2_core.ConfigSource{ConfigSourceSpecifier: &apiv2_core.ConfigSource_Ads{}},
   379  			ServiceName: path,
   380  		},
   381  	}
   382  }
   383  

View as plain text