...

Text file src/github.com/emissary-ingress/emissary/v3/python/ambassador/ir/irserviceresolver.py

Documentation: github.com/emissary-ingress/emissary/v3/python/ambassador/ir

     1from ipaddress import ip_address
     2from typing import TYPE_CHECKING, Dict, List, Optional, Tuple, Union
     3
     4from ..config import Config
     5from .irresource import IRResource
     6
     7if TYPE_CHECKING:
     8    from .ir import IR  # pragma: no cover
     9    from .irbasemapping import IRBaseMapping  # pragma: no cover
    10    from .ircluster import IRCluster  # pragma: no cover
    11
    12#############################################################################
    13## irserviceresolver.py -- resolve endpoints for services
    14##
    15## IRServiceResolver does the work of looking into Service data structures.
    16## There are, naturally, some weirdnesses.
    17##
    18## Here's the way this goes:
    19##
    20## When you create an AConf, you must hand in Service objects and Resolver
    21## objects. (This will generally happen by virtue of the ResourceFetcher
    22## finding them someplace.) There can be multiple kinds of Resolver objects
    23## (e.g. ConsulResolver, KubernetesEndpointResolver, etc.).
    24##
    25## When you create an IR from that AConf, the various kinds of Resolvers
    26## all get turned into IRServiceResolvers, and the IR uses those to handle
    27## the mechanics of finding the upstream endpoints for a service.
    28
# A single resolved upstream endpoint: a small dict of string keys to int/str
# values. The resolvers in this file produce and read the keys "ip", "port"
# and (for the Kubernetes service resolver) "target_kind".
SvcEndpoint = Dict[str, Union[int, str]]
# The list of endpoints handed back by a resolver lookup.
SvcEndpointSet = List[SvcEndpoint]
# A description of how a cluster's service gets resolved. The resolvers in this
# file fill in "service" and "kind", plus resolver-specific keys such as
# "namespace", "port", "datacenter" and "endpoint_path".
ClustermapEntry = Dict[str, Union[int, str]]
    32
    33
    34class IRServiceResolver(IRResource):
    35    def __init__(
    36        self,
    37        ir: "IR",
    38        aconf: Config,
    39        rkey: str = "ir.resolver",
    40        kind: str = "IRServiceResolver",
    41        name: str = "ir.resolver",
    42        location: str = "--internal--",
    43        **kwargs,
    44    ) -> None:
    45        super().__init__(
    46            ir=ir, aconf=aconf, rkey=rkey, kind=kind, name=name, location=location, **kwargs
    47        )
    48
    49    def setup(self, ir: "IR", aconf: Config) -> bool:
    50        if self.kind == "ConsulResolver":
    51            self.resolve_with = "consul"
    52
    53            if not self.get("datacenter"):
    54                self.post_error("ConsulResolver is required to have a datacenter")
    55                return False
    56        elif self.kind == "KubernetesServiceResolver":
    57            self.resolve_with = "k8s"
    58        elif self.kind == "KubernetesEndpointResolver":
    59            self.resolve_with = "k8s"
    60        else:
    61            self.post_error(f"Resolver kind {self.kind} unknown")
    62            return False
    63
    64        return True
    65
    66    def valid_mapping(self, ir: "IR", mapping: "IRBaseMapping") -> bool:
    67        fn = {
    68            "KubernetesServiceResolver": self._k8s_svc_valid_mapping,
    69            "KubernetesEndpointResolver": self._k8s_valid_mapping,
    70            "ConsulResolver": self._consul_valid_mapping,
    71        }[self.kind]
    72
    73        return fn(ir, mapping)
    74
    75    def _k8s_svc_valid_mapping(self, ir: "IR", mapping: "IRBaseMapping"):
    76        # You're not allowed to specific a load balancer with a KubernetesServiceResolver.
    77        if mapping.get("load_balancer"):
    78            mapping.post_error(
    79                "No load_balancer setting is allowed with the KubernetesServiceResolver"
    80            )
    81            return False
    82
    83        return True
    84
    85    def _k8s_valid_mapping(self, ir: "IR", mapping: "IRBaseMapping"):
    86        # There's no real validation to do here beyond what the Mapping already does.
    87        return True
    88
    89    def _consul_valid_mapping(self, ir: "IR", mapping: "IRBaseMapping"):
    90        # Mappings using the Consul resolver can't use service names with '.', or port
    91        # override. We currently do this the cheap & sleazy way.
    92
    93        valid = True
    94
    95        if mapping.service.find(".") >= 0:
    96            mapping.post_error("The Consul resolver does not allow dots in service names")
    97            valid = False
    98
    99        if mapping.service.find(":") >= 0:
   100            # This is not an _error_ per se -- we'll accept the mapping and just ignore the port.
   101            ir.aconf.post_notice(
   102                "The Consul resolver does not allow overriding service port; ignoring requested port",
   103                resource=mapping,
   104            )
   105
   106        return valid
   107
   108    def resolve(
   109        self, ir: "IR", cluster: "IRCluster", svc_name: str, svc_namespace: str, port: int
   110    ) -> Optional[SvcEndpointSet]:
   111        fn = {
   112            "KubernetesServiceResolver": self._k8s_svc_resolver,
   113            "KubernetesEndpointResolver": self._k8s_resolver,
   114            "ConsulResolver": self._consul_resolver,
   115        }[self.kind]
   116
   117        return fn(ir, cluster, svc_name, svc_namespace, port)
   118
   119    def _k8s_svc_resolver(
   120        self, ir: "IR", cluster: "IRCluster", svc_name: str, svc_namespace: str, port: int
   121    ) -> Optional[SvcEndpointSet]:
   122        # The K8s service resolver always returns a single endpoint.
   123        return [{"ip": svc_name, "port": port, "target_kind": "DNSname"}]
   124
   125    def _k8s_resolver(
   126        self, ir: "IR", cluster: "IRCluster", svc_name: str, svc_namespace: str, port: int
   127    ) -> Optional[SvcEndpointSet]:
   128        svc, namespace = self.parse_service(ir, svc_name, svc_namespace)
   129        # Find endpoints, and try for a port match!
   130        return self.get_endpoints(ir, f"k8s-{svc}-{namespace}", port)
   131
   132    def parse_service(self, ir: "IR", svc_name: str, svc_namespace: str) -> Tuple[str, str]:
   133        # K8s service names can be 'svc' or 'svc.namespace'. Which does this look like?
   134        svc = svc_name
   135        namespace = Config.ambassador_namespace
   136
   137        if "." in svc and not is_ip_address(svc):
   138            # OK, cool. Peel off the service and the namespace.
   139            #
   140            # Note that some people may use service.namespace.cluster.svc.local or
   141            # some such crap. The [0:2] is to restrict this to just the first two
   142            # elements if there are more, but still work if there are not.
   143
   144            (svc, namespace) = svc.split(".", 2)[0:2]
   145        elif (
   146            not ir.ambassador_module.use_ambassador_namespace_for_service_resolution
   147            and svc_namespace
   148        ):
   149            namespace = svc_namespace
   150            ir.logger.debug(
   151                "KubernetesEndpointResolver use_ambassador_namespace_for_service_resolution %s, upstream key %s"
   152                % (
   153                    ir.ambassador_module.use_ambassador_namespace_for_service_resolution,
   154                    f"{svc}-{namespace}",
   155                )
   156            )
   157
   158        return svc, namespace
   159
   160    def _consul_resolver(
   161        self, ir: "IR", cluster: "IRCluster", svc_name: str, svc_namespace: str, port: int
   162    ) -> Optional[SvcEndpointSet]:
   163        # For Consul, we look things up with the service name and the datacenter at present.
   164        # We ignore the port in the lookup (we should've already posted a warning about the port
   165        # being present, actually).
   166
   167        return self.get_endpoints(ir, f"consul-{svc_name}-{self.datacenter}", None)
   168
   169    def get_endpoints(self, ir: "IR", key: str, port: Optional[int]) -> Optional[SvcEndpointSet]:
   170        # OK. Do we have a Service by this key?
   171        service = ir.services.get(key)
   172
   173        if not service:
   174            self.logger.debug(f"Resolver {self.name}: {key} matches no Service for endpoints")
   175            return None
   176
   177        self.logger.debug(f"Resolver {self.name}: {key} matches %s" % service.as_json())
   178
   179        endpoints = service.get("endpoints")
   180
   181        if not endpoints:
   182            self.logger.debug(f"Resolver {self.name}: {key} has no endpoints")
   183            return None
   184
   185        # Do we have a match for the port they're asking for (y'know, if they're asking for one)?
   186
   187        targets = endpoints.get(port or "*")
   188
   189        if targets:
   190            # Yes!
   191            tstr = ", ".join([f'{x["ip"]}:{x["port"]}' for x in targets])
   192
   193            self.logger.debug(f"Resolver {self.name}: {key}:{port} matches {tstr}")
   194
   195            return targets
   196        else:
   197            hrtype = "Kubernetes" if (self.resolve_with == "k8s") else self.resolve_with
   198
   199            # This is ugly. We're almost certainly being called from _within_ the initialization
   200            # of the cluster here -- so I guess we'll report the error against the service. Sigh.
   201            self.ir.aconf.post_error(
   202                f"Service {service.name}: {key}:{port} matches no endpoints from {hrtype}",
   203                resource=service,
   204            )
   205
   206            return None
   207
   208    def clustermap_entry(
   209        self, ir: "IR", cluster: "IRCluster", svc_name: str, svc_namespace: str, port: int
   210    ) -> ClustermapEntry:
   211        fn = {
   212            "KubernetesServiceResolver": self._k8s_svc_clustermap_entry,
   213            "KubernetesEndpointResolver": self._k8s_clustermap_entry,
   214            "ConsulResolver": self._consul_clustermap_entry,
   215        }[self.kind]
   216
   217        return fn(ir, cluster, svc_name, svc_namespace, port)
   218
   219    def _k8s_svc_clustermap_entry(
   220        self, ir: "IR", cluster: "IRCluster", svc_name: str, svc_namespace: str, port: int
   221    ) -> ClustermapEntry:
   222        # The K8s service resolver always returns a single endpoint.
   223        svc, namespace = self.parse_service(ir, svc_name, svc_namespace)
   224        return {"port": port, "kind": self.kind, "service": svc, "namespace": namespace}
   225
   226    def _k8s_clustermap_entry(
   227        self, ir: "IR", cluster: "IRCluster", svc_name: str, svc_namespace: str, port: int
   228    ) -> ClustermapEntry:
   229        # Fallback to the KubernetesServiceResolver for IP addresses or if the service doesn't exist.
   230        if is_ip_address(svc_name):
   231            return {
   232                "service": svc_name,
   233                "namespace": svc_namespace,
   234                "port": port,
   235                "kind": "KubernetesServiceResolver",
   236            }
   237
   238        if port:
   239            portstr = "/%s" % port
   240        else:
   241            portstr = ""
   242        svc, namespace = self.parse_service(ir, svc_name, svc_namespace)
   243        # Find endpoints, and try for a port match!
   244        return {
   245            "service": svc,
   246            "namespace": namespace,
   247            "port": port,
   248            "kind": self.kind,
   249            "endpoint_path": "k8s/%s/%s%s" % (namespace, svc, portstr),
   250        }
   251
   252    def _consul_clustermap_entry(
   253        self, ir: "IR", cluster: "IRCluster", svc_name: str, svc_namespace: str, port: int
   254    ) -> ClustermapEntry:
   255        # Fallback to the KubernetesServiceResolver for ip addresses.
   256        if is_ip_address(svc_name):
   257            return {
   258                "service": svc_name,
   259                "namespace": svc_namespace,
   260                "port": port,
   261                "kind": "KubernetesServiceResolver",
   262            }
   263
   264        # For Consul, we look things up with the service name and the datacenter at present.
   265        # We ignore the port in the lookup (we should've already posted a warning about the port
   266        # being present, actually).
   267        return {
   268            "service": svc_name,
   269            "datacenter": self.datacenter,
   270            "kind": self.kind,
   271            "endpoint_path": "consul/%s/%s" % (self.datacenter, svc_name),
   272        }
   273
   274
   275class IRServiceResolverFactory:
   276    @classmethod
   277    def load_all(cls, ir: "IR", aconf: Config) -> None:
   278        config_info = aconf.get_config("resolvers")
   279
   280        if config_info:
   281            assert len(config_info) > 0  # really rank paranoia on my part...
   282
   283            for config in config_info.values():
   284                cdict = config.as_dict()
   285                cdict["rkey"] = config.rkey
   286                cdict["location"] = config.location
   287
   288                ir.add_resolver(IRServiceResolver(ir, aconf, **cdict))
   289
   290        if not ir.get_resolver("kubernetes-service"):
   291            # Default the K8s service resolver.
   292            resolver_config = {
   293                "apiVersion": "getambassador.io/v3alpha1",
   294                "kind": "KubernetesServiceResolver",
   295                "name": "kubernetes-service",
   296            }
   297
   298            if Config.single_namespace:
   299                resolver_config["namespace"] = Config.ambassador_namespace
   300
   301            ir.add_resolver(IRServiceResolver(ir, aconf, **resolver_config))
   302
   303        # Ugh, the aliasing for the K8s and Consul endpoint resolvers is annoying.
   304        res_e = ir.get_resolver("endpoint")
   305        res_k_e = ir.get_resolver("kubernetes-endpoint")
   306
   307        if not res_e and not res_k_e:
   308            # Neither exists. Create them from scratch.
   309
   310            resolver_config = {
   311                "apiVersion": "getambassador.io/v3alpha1",
   312                "kind": "KubernetesEndpointResolver",
   313                "name": "kubernetes-endpoint",
   314            }
   315
   316            if Config.single_namespace:
   317                resolver_config["namespace"] = Config.ambassador_namespace
   318
   319            ir.add_resolver(IRServiceResolver(ir, aconf, **resolver_config))
   320
   321            resolver_config["name"] = "endpoint"
   322
   323            ir.add_resolver(IRServiceResolver(ir, aconf, **resolver_config))
   324        else:
   325            cls.check_aliases(ir, aconf, "endpoint", res_e, "kubernetes-endpoint", res_k_e)
   326
   327        res_c = ir.get_resolver("consul")
   328        res_c_e = ir.get_resolver("consul-endpoint")
   329
   330        if not res_c and not res_c_e:
   331            # Neither exists. Create them from scratch.
   332
   333            resolver_config = {
   334                "apiVersion": "getambassador.io/v3alpha1",
   335                "kind": "ConsulResolver",
   336                "name": "consul-endpoint",
   337                "datacenter": "dc1",
   338            }
   339
   340            ir.add_resolver(IRServiceResolver(ir, aconf, **resolver_config))
   341
   342            resolver_config["name"] = "consul"
   343
   344            ir.add_resolver(IRServiceResolver(ir, aconf, **resolver_config))
   345        else:
   346            cls.check_aliases(ir, aconf, "consul", res_c, "consul-endpoint", res_c_e)
   347
   348    @classmethod
   349    def check_aliases(
   350        cls,
   351        ir: "IR",
   352        aconf: Config,
   353        n1: str,
   354        r1: Optional[IRServiceResolver],
   355        n2: str,
   356        r2: Optional[IRServiceResolver],
   357    ) -> None:
   358        source = None
   359        name = None
   360
   361        if not r1:
   362            # r2 must exist to be here.
   363            source = r2
   364            name = n1
   365        elif not r2:
   366            # r1 must exist to be here.
   367            source = r1
   368            name = n2
   369
   370        if source:
   371            config = dict(**source.as_dict())
   372
   373            # Fix up this dict. Sigh.
   374            config["rkey"] = config.pop("_rkey", config.get("rkey", None))  # Kludge, I know...
   375            config.pop("_errored", None)
   376            config.pop("_active", None)
   377            config.pop("resolve_with", None)
   378
   379            config["name"] = name
   380
   381            ir.add_resolver(IRServiceResolver(ir, aconf, **config))
   382
   383
   384def is_ip_address(addr: str) -> bool:
   385    try:
   386        x = ip_address(addr)
   387        return True
   388    except ValueError:
   389        return False

View as plain text