...

Text file src/github.com/emissary-ingress/emissary/v3/python/ambassador/ir/irbasemapping.py

Documentation: github.com/emissary-ingress/emissary/v3/python/ambassador/ir

     1import re
     2from typing import TYPE_CHECKING, Any, Dict, List, Optional, Union
     3from urllib.parse import quote as urlquote
     4from urllib.parse import scheme_chars
     5from urllib.parse import unquote as urlunquote
     6from urllib.parse import urlparse
     7
     8from ..config import Config
     9from ..utils import dump_json
    10from .irresource import IRResource
    11
    12if TYPE_CHECKING:
    13    from .ir import IR  # pragma: no cover
    14
    15
    16def would_confuse_urlparse(url: str) -> bool:
    17    """Returns whether an URL-ish string would be interpretted by urlparse()
    18    differently than we want, by parsing it as a non-URL URI ("scheme:path")
    19    instead of as a URL ("[scheme:]//authority[:port]/path").  We don't want to
    20    interpret "myhost:8080" as "ParseResult(scheme='myhost', path='8080')"!
    21
    22    Note: This has a Go equivalent in github.com/emissary-ingress/emissary/v3/pkg/emissaryutil.  Please
    23    keep them in-sync.
    24    """
    25    if url.find(":") > 0 and url.lstrip(scheme_chars).startswith("://"):
    26        # has a scheme
    27        return False
    28    if url.startswith("//"):
    29        # does not have a scheme, but has the "//" URL authority marker
    30        return False
    31    return True
    32
    33
    34def normalize_service_name(
    35    ir: "IR",
    36    in_service: str,
    37    mapping_namespace: Optional[str],
    38    resolver_kind: str,
    39    rkey: Optional[str] = None,
    40) -> str:
    41    """
    42    Note: This has a Go equivalent in github.com/emissary-ingress/emissary/v3/pkg/emissaryutil.  Please
    43    keep them in-sync.
    44    """
    45    try:
    46        parsed = urlparse(f"//{in_service}" if would_confuse_urlparse(in_service) else in_service)
    47
    48        if not parsed.hostname:
    49            raise ValueError("No hostname")
    50        # urlib.parse.unquote is permissive, but we want to be strict
    51        bad_seqs = [
    52            seq
    53            for seq in re.findall(r"%.{,2}", parsed.hostname)
    54            if not re.fullmatch(r"%[0-9a-fA-F]{2}", seq)
    55        ]
    56        if bad_seqs:
    57            raise ValueError(f"Invalid percent-escape in hostname: {bad_seqs[0]}")
    58        hostname = urlunquote(parsed.hostname)
    59        scheme = parsed.scheme
    60        port = parsed.port
    61    except ValueError as e:
    62        # This could happen with mismatched [] in a scheme://[IPv6], or with a port that can't
    63        # cast to int, or a port outside [0,2^16), or...
    64        #
    65        # The best we can do here is probably just to log the error, return the original string
    66        # and hope for the best. I guess.
    67
    68        errstr = f"Malformed service {repr(in_service)}: {e}"
    69        if rkey:
    70            errstr = f"{rkey}: {errstr}"
    71        ir.post_error(errstr)
    72
    73        return in_service
    74
    75    # Consul Resolvers don't allow service names to include subdomains, but
    76    # Kubernetes Resolvers _require_ subdomains to correctly handle namespaces.
    77    want_qualified = (
    78        not ir.ambassador_module.use_ambassador_namespace_for_service_resolution
    79        and resolver_kind.startswith("Kubernetes")
    80    )
    81
    82    is_qualified = "." in hostname or ":" in hostname or "localhost" == hostname
    83
    84    if (
    85        mapping_namespace
    86        and mapping_namespace != ir.ambassador_namespace
    87        and want_qualified
    88        and not is_qualified
    89    ):
    90        hostname += "." + mapping_namespace
    91
    92    out_service = urlquote(
    93        hostname, safe="!$&'()*+,;=:[]<>\""
    94    )  # match 'encodeHost' behavior of Go stdlib net/url/url.go
    95    if ":" in out_service:
    96        out_service = f"[{out_service}]"
    97    if scheme:
    98        out_service = f"{scheme}://{out_service}"
    99    if port:
   100        out_service += f":{port}"
   101
   102    ir.logger.debug(
   103        "%s use_ambassador_namespace_for_service_resolution %s, fully qualified %s, upstream hostname %s"
   104        % (
   105            resolver_kind,
   106            ir.ambassador_module.use_ambassador_namespace_for_service_resolution,
   107            is_qualified,
   108            out_service,
   109        )
   110    )
   111
   112    return out_service
   113
   114
   115class IRBaseMapping(IRResource):
   116    group_id: str
   117    host: Optional[str]
   118    route_weight: List[Union[str, int]]
   119    cached_status: Optional[Dict[str, str]]
   120    status_update: Optional[Dict[str, str]]
   121    cluster_key: Optional[str]
   122    _weight: int
   123
   124    def __init__(
   125        self,
   126        ir: "IR",
   127        aconf: Config,
   128        rkey: str,  # REQUIRED
   129        name: str,  # REQUIRED
   130        location: str,  # REQUIRED
   131        kind: str,  # REQUIRED
   132        namespace: Optional[str] = None,
   133        metadata_labels: Optional[Dict[str, str]] = None,
   134        apiVersion: str = "getambassador.io/v3alpha1",
   135        precedence: int = 0,
   136        cluster_tag: Optional[str] = None,
   137        **kwargs,
   138    ) -> None:
   139        # Default status...
   140        self.cached_status = None
   141        self.status_update = None
   142
   143        # Start by assuming that we don't know the cluster key for this Mapping.
   144        self.cluster_key = None
   145
   146        # We don't know the calculated weight yet, so set it to 0.
   147        self._weight = 0
   148
   149        # Init the superclass...
   150        super().__init__(
   151            ir=ir,
   152            aconf=aconf,
   153            rkey=rkey,
   154            location=location,
   155            kind=kind,
   156            name=name,
   157            namespace=namespace,
   158            metadata_labels=metadata_labels,
   159            apiVersion=apiVersion,
   160            precedence=precedence,
   161            cluster_tag=cluster_tag,
   162            **kwargs,
   163        )
   164
   165    @classmethod
   166    def make_cache_key(cls, kind: str, name: str, namespace: str, version: str = "v2") -> str:
   167        # Why is this split on the name necessary?
   168        # the name of a Mapping when we fetch it from the aconf will match the metadata.name of
   169        # the Mapping that the config comes from _only if_ it is the only Mapping with that exact name.
   170        # If there are multiple Mappings with the same name in different namespaces then the name
   171        # becomes `name.namespace` for all mappings of the same name after the first one.
   172        # The first one just gets to be `name` for "reasons".
   173        #
   174        # This behaviour is needed by other places in the code, but for the cache key, we need it to match the
   175        # below format regardless of how many Mappings there are with that name. This is necessary for the cache
   176        # specifically because there are places where we interact with the cache that have access to the
   177        # metadata.name and metadata.namespace of the Mapping, but do not have access to the aconf representation
   178        # of the Mapping name and thus have no way of knowing whether a specific name is mangled due to multiple
   179        # Mappings sharing the same name or not.
   180        name = name.split(".")[0]
   181        return f"{kind}-{version}-{name}-{namespace}"
   182
   183    def setup(self, ir: "IR", aconf: Config) -> bool:
   184        # Set up our cache key. We're using this format so that it'll be easy
   185        # to generate it just from the Mapping's K8s metadata.
   186        self._cache_key = IRBaseMapping.make_cache_key(self.kind, self.name, self.namespace)
   187
   188        # ...and start without a cluster key for this Mapping.
   189        self.cluster_key = None
   190
   191        # We assume that any subclass madness is managed already, so we can compute the group ID...
   192        self.group_id = self._group_id()
   193
   194        # ...and the route weight.
   195        self.route_weight = self._route_weight()
   196
   197        # We can also default the resolver, and scream if it doesn't match a resolver we
   198        # know about.
   199        if not self.get("resolver"):
   200            self.resolver = self.ir.ambassador_module.get("resolver", "kubernetes-service")
   201
   202        resolver = self.ir.get_resolver(self.resolver)
   203
   204        if not resolver:
   205            self.post_error(f"resolver {self.resolver} is unknown!")
   206            return False
   207
   208        self.ir.logger.debug(
   209            "%s: GID %s route_weight %s, resolver %s"
   210            % (self, self.group_id, self.route_weight, resolver)
   211        )
   212
   213        # And, of course, we can make sure that the resolver thinks that this Mapping is OK.
   214        if not resolver.valid_mapping(ir, self):
   215            # If there's trouble, the resolver should've already posted about it.
   216            return False
   217
   218        if self.get("circuit_breakers", None) is None:
   219            self["circuit_breakers"] = ir.ambassador_module.circuit_breakers
   220
   221        if self.get("circuit_breakers", None) is not None:
   222            if not self.validate_circuit_breakers(ir, self["circuit_breakers"]):
   223                self.post_error(
   224                    "Invalid circuit_breakers specified: {}, invalidating mapping".format(
   225                        self["circuit_breakers"]
   226                    )
   227                )
   228                return False
   229
   230        return True
   231
   232    @staticmethod
   233    def validate_circuit_breakers(ir: "IR", circuit_breakers) -> bool:
   234        if not isinstance(circuit_breakers, (list, tuple)):
   235            return False
   236
   237        for circuit_breaker in circuit_breakers:
   238            if "_name" in circuit_breaker:
   239                # Already reconciled.
   240                ir.logger.debug(f'Breaker validation: good breaker {circuit_breaker["_name"]}')
   241                continue
   242
   243            ir.logger.debug(f"Breaker validation: {dump_json(circuit_breakers, pretty=True)}")
   244
   245            name_fields = ["cb"]
   246
   247            if "priority" in circuit_breaker:
   248                prio = circuit_breaker.get("priority").lower()
   249                if prio not in ["default", "high"]:
   250                    return False
   251
   252                name_fields.append(prio[0])
   253            else:
   254                name_fields.append("n")
   255
   256            digit_fields = [
   257                ("max_connections", "c"),
   258                ("max_pending_requests", "p"),
   259                ("max_requests", "r"),
   260                ("max_retries", "t"),
   261            ]
   262
   263            for field, abbrev in digit_fields:
   264                if field in circuit_breaker:
   265                    try:
   266                        value = int(circuit_breaker[field])
   267                        name_fields.append(f"{abbrev}{value}")
   268                    except ValueError:
   269                        return False
   270
   271            circuit_breaker["_name"] = "".join(name_fields)
   272            ir.logger.debug(f'Breaker valid: {circuit_breaker["_name"]}')
   273
   274        return True
   275
   276    def get_label(self, key: str) -> Optional[str]:
   277        labels = self.get("metadata_labels") or {}
   278        return labels.get(key) or None
   279
   280    def status(self) -> Optional[Dict[str, Any]]:
   281        """
   282        Return the new status we should have. Subclasses would typically override
   283        this.
   284
   285        :return: new status (may be None)
   286        """
   287        return None
   288
   289    def check_status(self) -> None:
   290        crd_name = self.get_label("ambassador_crd")
   291
   292        if not crd_name:
   293            return
   294
   295        # OK, we're supposed to be a CRD. What status do we want, and
   296        # what do we have?
   297
   298        wanted = self.status()
   299
   300        if wanted != self.cached_status:
   301            self.ir.k8s_status_updates[crd_name] = ("Mapping", self.namespace, wanted)
   302
   303    def _group_id(self) -> str:
   304        """Compute the group ID for this Mapping. Must be defined by subclasses."""
   305        raise NotImplementedError("%s._group_id is not implemented?" % self.__class__.__name__)
   306
   307    def _route_weight(self) -> List[Union[str, int]]:
   308        """Compute the route weight for this Mapping. Must be defined by subclasses."""
   309        raise NotImplementedError("%s._route_weight is not implemented?" % self.__class__.__name__)

View as plain text