...

Source file src/github.com/emissary-ingress/emissary/v3/cmd/entrypoint/endpoints.go

Documentation: github.com/emissary-ingress/emissary/v3/cmd/entrypoint

     1  package entrypoint
     2  
     3  import (
     4  	"context"
     5  	"fmt"
     6  	"net"
     7  	"reflect"
     8  	"strconv"
     9  	"strings"
    10  
    11  	"github.com/datawire/dlib/dlog"
    12  	amb "github.com/emissary-ingress/emissary/v3/pkg/api/getambassador.io/v3alpha1"
    13  	"github.com/emissary-ingress/emissary/v3/pkg/kates"
    14  	snapshotTypes "github.com/emissary-ingress/emissary/v3/pkg/snapshot/v1"
    15  )
    16  
    17  // endpointRoutingInfo keeps track of everything we need to know to figure out if
    18  // endpoint routing is active.
    19  type endpointRoutingInfo struct {
    20  	// Map from resolver name to resolver type.
    21  	resolverTypes   map[string]ResolverType
    22  	module          moduleResolver
    23  	endpointWatches map[string]bool // A set to track the subset of kubernetes endpoints we care about.
    24  	previousWatches map[string]bool
    25  }
    26  
    27  type ResolverType int
    28  
    29  const (
    30  	KubernetesServiceResolver ResolverType = iota
    31  	KubernetesEndpointResolver
    32  	ConsulResolver
    33  )
    34  
    35  func (rt ResolverType) String() string {
    36  	switch rt {
    37  	case KubernetesServiceResolver:
    38  		return "KubernetesServiceResolver"
    39  	case KubernetesEndpointResolver:
    40  		return "KubernetesEndpointResolver"
    41  	case ConsulResolver:
    42  		return "ConsulResolver"
    43  	default:
    44  		panic(fmt.Errorf("ResolverType.String: invalid enum value: %d", rt))
    45  	}
    46  }
    47  
    48  // newEndpointRoutingInfo creates a shiny new struct to hold information about
    49  // resolvers in use and such.
    50  func newEndpointRoutingInfo() endpointRoutingInfo {
    51  	return endpointRoutingInfo{
    52  		// resolverTypes keeps track of the type of every resolver in the system.
    53  		// It starts out empty.
    54  		//
    55  		// Why do we need to look at all the resolvers? Because, unless the user
    56  		// overrides them, resolvers "endpoint" and "kubernetes-endpoint" are
    57  		// implicitly endpoint resolvers -- but they won't show up in the snapshot.
    58  		// So we need to track whether they've been redefined. Sigh.
    59  		resolverTypes: make(map[string]ResolverType),
    60  		// Track which endpoints we actually want to watch.
    61  		endpointWatches: make(map[string]bool),
    62  	}
    63  }
    64  
    65  func (eri *endpointRoutingInfo) reconcileEndpointWatches(ctx context.Context, s *snapshotTypes.KubernetesSnapshot) {
    66  	envAmbID := GetAmbassadorID()
    67  
    68  	// Reset our state except for the previous endpoint watches. We keep them so we can detect if
    69  	// the set of things we are interested in has changed.
    70  	eri.resolverTypes = map[string]ResolverType{}
    71  	eri.module = moduleResolver{}
    72  	eri.previousWatches = eri.endpointWatches
    73  	eri.endpointWatches = map[string]bool{}
    74  
    75  	// Phase one processes all the configuration stuff that Mappings depend on. Right now this
    76  	// includes Modules and Resolvers. When we are done with Phase one we have processed enough
    77  	// resources to correctly interpret Mappings.
    78  	for _, list := range s.Annotations {
    79  		for _, a := range list {
    80  			if _, isInvalid := a.(*kates.Unstructured); isInvalid {
    81  				continue
    82  			}
    83  			if GetAmbID(ctx, a).Matches(envAmbID) {
    84  				eri.checkResourcePhase1(ctx, a, "annotation")
    85  			}
    86  		}
    87  	}
    88  
    89  	// After that, walk all the other resources. We do this with separate loops
    90  	// for each type -- since we know a priori what type they are, there's no
    91  	// need to test every resource, and no need to walk over things we're not
    92  	// interested in.
    93  	for _, m := range s.Modules {
    94  		if m.Spec.AmbassadorID.Matches(envAmbID) {
    95  			eri.checkModule(ctx, m, "CRD")
    96  		}
    97  	}
    98  
    99  	for _, r := range s.KubernetesServiceResolvers {
   100  		if r.Spec.AmbassadorID.Matches(envAmbID) {
   101  			eri.saveResolver(ctx, r.GetName(), KubernetesServiceResolver, "CRD")
   102  		}
   103  	}
   104  
   105  	for _, r := range s.KubernetesEndpointResolvers {
   106  		if r.Spec.AmbassadorID.Matches(envAmbID) {
   107  			eri.saveResolver(ctx, r.GetName(), KubernetesEndpointResolver, "CRD")
   108  		}
   109  	}
   110  
   111  	for _, r := range s.ConsulResolvers {
   112  		if r.Spec.AmbassadorID.Matches(envAmbID) {
   113  			eri.saveResolver(ctx, r.GetName(), ConsulResolver, "CRD")
   114  		}
   115  	}
   116  
   117  	// Once all THAT is done, make sure to define the default "endpoint" and
   118  	// "kubernetes-endpoint" resolvers if they don't exist.
   119  	for _, rName := range []string{"endpoint", "kubernetes-endpoint"} {
   120  		_, found := eri.resolverTypes[rName]
   121  
   122  		if !found {
   123  			dlog.Debugf(ctx, "WATCHER: endpoint resolver %s exists by default", rName)
   124  			eri.resolverTypes[rName] = KubernetesEndpointResolver
   125  		}
   126  	}
   127  
   128  	for _, list := range s.Annotations {
   129  		for _, a := range list {
   130  			if _, isInvalid := a.(*kates.Unstructured); isInvalid {
   131  				continue
   132  			}
   133  			if GetAmbID(ctx, a).Matches(envAmbID) {
   134  				eri.checkResourcePhase2(ctx, a, "annotation")
   135  			}
   136  		}
   137  	}
   138  
   139  	for _, m := range s.Mappings {
   140  		if m.Spec.AmbassadorID.Matches(envAmbID) {
   141  			eri.checkMapping(ctx, m, "CRD")
   142  		}
   143  	}
   144  
   145  	for _, t := range s.TCPMappings {
   146  		if t.Spec.AmbassadorID.Matches(envAmbID) {
   147  			eri.checkTCPMapping(ctx, t, "CRD")
   148  		}
   149  	}
   150  }
   151  
   152  func (eri *endpointRoutingInfo) watchesChanged() bool {
   153  	return !reflect.DeepEqual(eri.endpointWatches, eri.previousWatches)
   154  }
   155  
   156  // checkResourcePhase1 processes Modules and Resolvers and calls the correct type specific handler.
   157  func (eri *endpointRoutingInfo) checkResourcePhase1(ctx context.Context, obj kates.Object, source string) {
   158  	switch v := obj.(type) {
   159  	case *amb.Module:
   160  		eri.checkModule(ctx, v, source)
   161  	case *amb.KubernetesServiceResolver:
   162  		eri.saveResolver(ctx, v.GetName(), KubernetesServiceResolver, "CRD")
   163  	case *amb.KubernetesEndpointResolver:
   164  		eri.saveResolver(ctx, v.GetName(), KubernetesEndpointResolver, "CRD")
   165  	case *amb.ConsulResolver:
   166  		eri.saveResolver(ctx, v.GetName(), ConsulResolver, "CRD")
   167  	}
   168  }
   169  
   170  // checkResourcePhase2 processes both regular and tcp Mappings and calls the correct type specific handler.
   171  func (eri *endpointRoutingInfo) checkResourcePhase2(ctx context.Context, obj kates.Object, source string) {
   172  	switch v := obj.(type) {
   173  	case *amb.Mapping:
   174  		eri.checkMapping(ctx, v, source)
   175  	case *amb.TCPMapping:
   176  		eri.checkTCPMapping(ctx, v, source)
   177  	}
   178  }
   179  
   180  type moduleResolver struct {
   181  	Resolver                                   string `json:"resolver"`
   182  	UseAmbassadorNamespaceForServiceResolution bool   `json:"use_ambassador_namespace_for_service_resolution"`
   183  }
   184  
   185  // checkModule parses the stuff we care about out of the ambassador Module.
   186  func (eri *endpointRoutingInfo) checkModule(ctx context.Context, mod *amb.Module, source string) {
   187  	if mod.GetName() != "ambassador" {
   188  		return
   189  	}
   190  
   191  	mr := moduleResolver{}
   192  	err := convert(mod.Spec.Config, &mr)
   193  
   194  	if err != nil {
   195  		dlog.Errorf(ctx, "error parsing ambassador module: %v", err)
   196  		return
   197  	}
   198  
   199  	// The default resolver is the kubernetes service resolver.
   200  	if mr.Resolver == "" {
   201  		mr.Resolver = "kubernetes-service"
   202  	}
   203  
   204  	eri.module = mr
   205  }
   206  
   207  // saveResolver saves an active resolver in our resolver-type map. This is used for
   208  // all kinds of resolvers, hence the resType parameter.
   209  func (eri *endpointRoutingInfo) saveResolver(ctx context.Context, name string, resType ResolverType, source string) {
   210  	// No magic here, just save the silly thing.
   211  	eri.resolverTypes[name] = resType
   212  
   213  	dlog.Debugf(ctx, "WATCHER: %s resolver %s is active (%s)", resType.String(), name, source)
   214  }
   215  
   216  // checkMapping figures out what resolver is in use for a given Mapping.
   217  func (eri *endpointRoutingInfo) checkMapping(ctx context.Context, mapping *amb.Mapping, source string) {
   218  	// Grab the name and the (possibly-empty) resolver.
   219  	name := mapping.GetName()
   220  	resolver := mapping.Spec.Resolver
   221  	service := mapping.Spec.Service
   222  
   223  	if resolver == "" {
   224  		// No specified resolver means "use the default resolver".
   225  		resolver = eri.module.Resolver
   226  		dlog.Debugf(ctx, "WATCHER: Mapping %s uses the default resolver (%s)", name, source)
   227  	}
   228  
   229  	if eri.resolverTypes[resolver] == KubernetesEndpointResolver {
   230  		svc, ns, _ := eri.module.parseService(ctx, mapping, service, mapping.GetNamespace())
   231  		eri.endpointWatches[fmt.Sprintf("%s:%s", ns, svc)] = true
   232  	}
   233  }
   234  
   235  // checkTCPMapping figures out what resolver is in use for a given TCPMapping.
   236  func (eri *endpointRoutingInfo) checkTCPMapping(ctx context.Context, tcpmapping *amb.TCPMapping, source string) {
   237  	// Grab the name and the (possibly-empty) resolver.
   238  	name := tcpmapping.GetName()
   239  	resolver := tcpmapping.Spec.Resolver
   240  	service := tcpmapping.Spec.Service
   241  
   242  	if resolver == "" {
   243  		// No specified resolver means "use the default resolver".
   244  		dlog.Debugf(ctx, "WATCHER: TCPMapping %s uses the default resolver (%s)", name, source)
   245  		resolver = eri.module.Resolver
   246  	}
   247  
   248  	if eri.resolverTypes[resolver] == KubernetesEndpointResolver {
   249  		svc, ns, _ := eri.module.parseService(ctx, tcpmapping, service, tcpmapping.GetNamespace())
   250  		eri.endpointWatches[fmt.Sprintf("%s:%s", ns, svc)] = true
   251  	}
   252  }
   253  
   254  func (m *moduleResolver) parseService(ctx context.Context, resource kates.Object, svcName, svcNamespace string) (name string, namespace string, port string) {
   255  	// First strip off the scheme if it exists.
   256  	parts := strings.SplitN(svcName, "://", 2)
   257  	if len(parts) > 1 {
   258  		svcName = parts[1]
   259  	}
   260  
   261  	// Next split off the port if it exists.
   262  	parts = strings.SplitN(svcName, ":", 2)
   263  	if len(parts) > 1 {
   264  		_, err := strconv.Atoi(parts[1])
   265  		if err == nil {
   266  			port = parts[1]
   267  			svcName = parts[0]
   268  		}
   269  	}
   270  
   271  	// Next check to see if it is an IP address.
   272  	ip := net.ParseIP(svcName)
   273  	if ip != nil {
   274  		name = svcName
   275  	} else if strings.Contains(svcName, ".") {
   276  		// If it's not an ip address but does have a dot then we split it up to find the namespace.
   277  		parts := strings.Split(svcName, ".")
   278  		if len(parts) > 2 {
   279  			using := strings.Join(parts[:2], ".")
   280  			dlog.Errorf(ctx, "mapping %s in namespace %s: ignoring extra domain parts in service, using %q",
   281  				resource.GetName(), resource.GetNamespace(), using)
   282  		}
   283  		name = parts[0]
   284  		namespace = parts[1]
   285  		return
   286  	} else {
   287  		name = svcName
   288  	}
   289  
   290  	if m.UseAmbassadorNamespaceForServiceResolution || svcNamespace == "" {
   291  		namespace = GetAmbassadorNamespace()
   292  	} else {
   293  		namespace = svcNamespace
   294  	}
   295  
   296  	return
   297  }
   298  

View as plain text