...

Source file src/github.com/datawire/ambassador/v2/cmd/entrypoint/endpoints.go

Documentation: github.com/datawire/ambassador/v2/cmd/entrypoint

     1  package entrypoint
     2  
     3  import (
     4  	"context"
     5  	"fmt"
     6  	"net"
     7  	"reflect"
     8  	"strconv"
     9  	"strings"
    10  
    11  	amb "github.com/datawire/ambassador/v2/pkg/api/getambassador.io/v3alpha1"
    12  	"github.com/datawire/ambassador/v2/pkg/kates"
    13  	snapshotTypes "github.com/datawire/ambassador/v2/pkg/snapshot/v1"
    14  	"github.com/datawire/dlib/dlog"
    15  )
    16  
    17  // endpointRoutingInfo keeps track of everything we need to know to figure out if
    18  // endpoint routing is active.
    19  type endpointRoutingInfo struct {
    20  	// Map from resolver name to resolver type.
    21  	resolverTypes   map[string]ResolverType
    22  	module          moduleResolver
    23  	endpointWatches map[string]bool // A set to track the subset of kubernetes endpoints we care about.
    24  	previousWatches map[string]bool
    25  }
    26  
    27  type ResolverType int
    28  
    29  const (
    30  	KubernetesServiceResolver ResolverType = iota
    31  	KubernetesEndpointResolver
    32  	ConsulResolver
    33  )
    34  
    35  func (rt ResolverType) String() string {
    36  	switch rt {
    37  	case KubernetesServiceResolver:
    38  		return "KubernetesServiceResolver"
    39  	case KubernetesEndpointResolver:
    40  		return "KubernetesEndpointResolver"
    41  	case ConsulResolver:
    42  		return "ConsulResolver"
    43  	default:
    44  		panic(fmt.Errorf("ResolverType.String: invalid enum value: %d", rt))
    45  	}
    46  }
    47  
    48  // newEndpointRoutingInfo creates a shiny new struct to hold information about
    49  // resolvers in use and such.
    50  func newEndpointRoutingInfo() endpointRoutingInfo {
    51  	return endpointRoutingInfo{
    52  		// resolverTypes keeps track of the type of every resolver in the system.
    53  		// It starts out empty.
    54  		//
    55  		// Why do we need to look at all the resolvers? Because, unless the user
    56  		// overrides them, resolvers "endpoint" and "kubernetes-endpoint" are
    57  		// implicitly endpoint resolvers -- but they won't show up in the snapshot.
    58  		// So we need to track whether they've been redefined. Sigh.
    59  		resolverTypes: make(map[string]ResolverType),
    60  		// Track which endpoints we actually want to watch.
    61  		endpointWatches: make(map[string]bool),
    62  	}
    63  }
    64  
    65  func (eri *endpointRoutingInfo) reconcileEndpointWatches(ctx context.Context, s *snapshotTypes.KubernetesSnapshot) {
    66  	// Reset our state except for the previous endpoint watches. We keep them so we can detect if
    67  	// the set of things we are interested in has changed.
    68  	eri.resolverTypes = map[string]ResolverType{}
    69  	eri.module = moduleResolver{}
    70  	eri.previousWatches = eri.endpointWatches
    71  	eri.endpointWatches = map[string]bool{}
    72  
    73  	// Phase one processes all the configuration stuff that Mappings depend on. Right now this
    74  	// includes Modules and Resolvers. When we are done with Phase one we have processed enough
    75  	// resources to correctly interpret Mappings.
    76  	for _, list := range s.Annotations {
    77  		for _, a := range list {
    78  			if _, isInvalid := a.(*kates.Unstructured); isInvalid {
    79  				continue
    80  			}
    81  			if include(GetAmbID(ctx, a)) {
    82  				eri.checkResourcePhase1(ctx, a, "annotation")
    83  			}
    84  		}
    85  	}
    86  
    87  	// After that, walk all the other resources. We do this with separate loops
    88  	// for each type -- since we know a priori what type they are, there's no
    89  	// need to test every resource, and no need to walk over things we're not
    90  	// interested in.
    91  	for _, m := range s.Modules {
    92  		if include(m.Spec.AmbassadorID) {
    93  			eri.checkModule(ctx, m, "CRD")
    94  		}
    95  	}
    96  
    97  	for _, r := range s.KubernetesServiceResolvers {
    98  		if include(r.Spec.AmbassadorID) {
    99  			eri.saveResolver(ctx, r.GetName(), KubernetesServiceResolver, "CRD")
   100  		}
   101  	}
   102  
   103  	for _, r := range s.KubernetesEndpointResolvers {
   104  		if include(r.Spec.AmbassadorID) {
   105  			eri.saveResolver(ctx, r.GetName(), KubernetesEndpointResolver, "CRD")
   106  		}
   107  	}
   108  
   109  	for _, r := range s.ConsulResolvers {
   110  		if include(r.Spec.AmbassadorID) {
   111  			eri.saveResolver(ctx, r.GetName(), ConsulResolver, "CRD")
   112  		}
   113  	}
   114  
   115  	// Once all THAT is done, make sure to define the default "endpoint" and
   116  	// "kubernetes-endpoint" resolvers if they don't exist.
   117  	for _, rName := range []string{"endpoint", "kubernetes-endpoint"} {
   118  		_, found := eri.resolverTypes[rName]
   119  
   120  		if !found {
   121  			dlog.Debugf(ctx, "WATCHER: endpoint resolver %s exists by default", rName)
   122  			eri.resolverTypes[rName] = KubernetesEndpointResolver
   123  		}
   124  	}
   125  
   126  	for _, list := range s.Annotations {
   127  		for _, a := range list {
   128  			if _, isInvalid := a.(*kates.Unstructured); isInvalid {
   129  				continue
   130  			}
   131  			if include(GetAmbID(ctx, a)) {
   132  				eri.checkResourcePhase2(ctx, a, "annotation")
   133  			}
   134  		}
   135  	}
   136  
   137  	for _, m := range s.Mappings {
   138  		if include(m.Spec.AmbassadorID) {
   139  			eri.checkMapping(ctx, m, "CRD")
   140  		}
   141  	}
   142  
   143  	for _, t := range s.TCPMappings {
   144  		if include(t.Spec.AmbassadorID) {
   145  			eri.checkTCPMapping(ctx, t, "CRD")
   146  		}
   147  	}
   148  }
   149  
   150  func (eri *endpointRoutingInfo) watchesChanged() bool {
   151  	return !reflect.DeepEqual(eri.endpointWatches, eri.previousWatches)
   152  }
   153  
   154  // checkResourcePhase1 processes Modules and Resolvers and calls the correct type specific handler.
   155  func (eri *endpointRoutingInfo) checkResourcePhase1(ctx context.Context, obj kates.Object, source string) {
   156  	switch v := obj.(type) {
   157  	case *amb.Module:
   158  		eri.checkModule(ctx, v, source)
   159  	case *amb.KubernetesServiceResolver:
   160  		eri.saveResolver(ctx, v.GetName(), KubernetesServiceResolver, "CRD")
   161  	case *amb.KubernetesEndpointResolver:
   162  		eri.saveResolver(ctx, v.GetName(), KubernetesEndpointResolver, "CRD")
   163  	case *amb.ConsulResolver:
   164  		eri.saveResolver(ctx, v.GetName(), ConsulResolver, "CRD")
   165  	}
   166  }
   167  
   168  // checkResourcePhase2 processes both regular and tcp Mappings and calls the correct type specific handler.
   169  func (eri *endpointRoutingInfo) checkResourcePhase2(ctx context.Context, obj kates.Object, source string) {
   170  	switch v := obj.(type) {
   171  	case *amb.Mapping:
   172  		eri.checkMapping(ctx, v, source)
   173  	case *amb.TCPMapping:
   174  		eri.checkTCPMapping(ctx, v, source)
   175  	}
   176  }
   177  
   178  type moduleResolver struct {
   179  	Resolver                                   string `json:"resolver"`
   180  	UseAmbassadorNamespaceForServiceResolution bool   `json:"use_ambassador_namespace_for_service_resolution"`
   181  }
   182  
   183  // checkModule parses the stuff we care about out of the ambassador Module.
   184  func (eri *endpointRoutingInfo) checkModule(ctx context.Context, mod *amb.Module, source string) {
   185  	if mod.GetName() != "ambassador" {
   186  		return
   187  	}
   188  
   189  	mr := moduleResolver{}
   190  	err := convert(mod.Spec.Config, &mr)
   191  
   192  	if err != nil {
   193  		dlog.Errorf(ctx, "error parsing ambassador module: %v", err)
   194  		return
   195  	}
   196  
   197  	// The default resolver is the kubernetes service resolver.
   198  	if mr.Resolver == "" {
   199  		mr.Resolver = "kubernetes-service"
   200  	}
   201  
   202  	eri.module = mr
   203  }
   204  
   205  // saveResolver saves an active resolver in our resolver-type map. This is used for
   206  // all kinds of resolvers, hence the resType parameter.
   207  func (eri *endpointRoutingInfo) saveResolver(ctx context.Context, name string, resType ResolverType, source string) {
   208  	// No magic here, just save the silly thing.
   209  	eri.resolverTypes[name] = resType
   210  
   211  	dlog.Debugf(ctx, "WATCHER: %s resolver %s is active (%s)", resType.String(), name, source)
   212  }
   213  
   214  // checkMapping figures out what resolver is in use for a given Mapping.
   215  func (eri *endpointRoutingInfo) checkMapping(ctx context.Context, mapping *amb.Mapping, source string) {
   216  	// Grab the name and the (possibly-empty) resolver.
   217  	name := mapping.GetName()
   218  	resolver := mapping.Spec.Resolver
   219  	service := mapping.Spec.Service
   220  
   221  	if resolver == "" {
   222  		// No specified resolver means "use the default resolver".
   223  		resolver = eri.module.Resolver
   224  		dlog.Debugf(ctx, "WATCHER: Mapping %s uses the default resolver (%s)", name, source)
   225  	}
   226  
   227  	if eri.resolverTypes[resolver] == KubernetesEndpointResolver {
   228  		svc, ns, _ := eri.module.parseService(ctx, mapping, service, mapping.GetNamespace())
   229  		eri.endpointWatches[fmt.Sprintf("%s:%s", ns, svc)] = true
   230  	}
   231  }
   232  
   233  // checkTCPMapping figures out what resolver is in use for a given TCPMapping.
   234  func (eri *endpointRoutingInfo) checkTCPMapping(ctx context.Context, tcpmapping *amb.TCPMapping, source string) {
   235  	// Grab the name and the (possibly-empty) resolver.
   236  	name := tcpmapping.GetName()
   237  	resolver := tcpmapping.Spec.Resolver
   238  	service := tcpmapping.Spec.Service
   239  
   240  	if resolver == "" {
   241  		// No specified resolver means "use the default resolver".
   242  		dlog.Debugf(ctx, "WATCHER: TCPMapping %s uses the default resolver (%s)", name, source)
   243  		resolver = eri.module.Resolver
   244  	}
   245  
   246  	if eri.resolverTypes[resolver] == KubernetesEndpointResolver {
   247  		svc, ns, _ := eri.module.parseService(ctx, tcpmapping, service, tcpmapping.GetNamespace())
   248  		eri.endpointWatches[fmt.Sprintf("%s:%s", ns, svc)] = true
   249  	}
   250  }
   251  
   252  func (m *moduleResolver) parseService(ctx context.Context, resource kates.Object, svcName, svcNamespace string) (name string, namespace string, port string) {
   253  	// First strip off the scheme if it exists.
   254  	parts := strings.SplitN(svcName, "://", 2)
   255  	if len(parts) > 1 {
   256  		svcName = parts[1]
   257  	}
   258  
   259  	// Next split off the port if it exists.
   260  	parts = strings.SplitN(svcName, ":", 2)
   261  	if len(parts) > 1 {
   262  		_, err := strconv.Atoi(parts[1])
   263  		if err == nil {
   264  			port = parts[1]
   265  			svcName = parts[0]
   266  		}
   267  	}
   268  
   269  	// Next check to see if it is an IP address.
   270  	ip := net.ParseIP(svcName)
   271  	if ip != nil {
   272  		name = svcName
   273  	} else if strings.Contains(svcName, ".") {
   274  		// If it's not an ip address but does have a dot then we split it up to find the namespace.
   275  		parts := strings.Split(svcName, ".")
   276  		if len(parts) > 2 {
   277  			using := strings.Join(parts[:2], ".")
   278  			dlog.Errorf(ctx, "mapping %s in namespace %s: ignoring extra domain parts in service, using %q",
   279  				resource.GetName(), resource.GetNamespace(), using)
   280  		}
   281  		name = parts[0]
   282  		namespace = parts[1]
   283  		return
   284  	} else {
   285  		name = svcName
   286  	}
   287  
   288  	if m.UseAmbassadorNamespaceForServiceResolution || svcNamespace == "" {
   289  		namespace = GetAmbassadorNamespace()
   290  	} else {
   291  		namespace = svcNamespace
   292  	}
   293  
   294  	return
   295  }
   296  

View as plain text