...

Text file src/github.com/emissary-ingress/emissary/v3/python/ambassador/ir/ircluster.py

Documentation: github.com/emissary-ingress/emissary/v3/python/ambassador/ir

     1# Copyright 2018 Datawire. All rights reserved.
     2#
     3# Licensed under the Apache License, Version 2.0 (the "License");
     4# you may not use this file except in compliance with the License.
     5# You may obtain a copy of the License at
     6#
     7#     http://www.apache.org/licenses/LICENSE-2.0
     8#
     9# Unless required by applicable law or agreed to in writing, software
    10# distributed under the License is distributed on an "AS IS" BASIS,
    11# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    12# See the License for the specific language governing permissions and
    13# limitations under the License
    14
    15import re
    16import urllib.parse
    17from typing import TYPE_CHECKING, Any, Dict, List, Optional, Union
    18from typing import cast as typecast
    19
    20from ..config import Config
    21from ..utils import RichStatus
    22from .irhealthchecks import IRHealthChecks
    23from .irresource import IRResource
    24from .irtlscontext import IRTLSContext
    25
    26if TYPE_CHECKING:
    27    from .ir import IR  # pragma: no cover
    28    from .ir.irserviceresolver import IRServiceResolver  # pragma: no cover
    29
    30#############################################################################
    31## ircluster.py -- the ircluster configuration object for Ambassador
    32##
    33## IRCluster represents an Envoy cluster: a collection of endpoints that
    34## provide a single service. IRClusters get used for quite a few different
    35## things in Ambassador -- they are basically the generic "upstream service"
    36## entity.
    37
    38
    39class IRCluster(IRResource):
    40    def __init__(
    41        self,
    42        ir: "IR",
    43        aconf: Config,
    44        parent_ir_resource: "IRResource",
    45        location: str,  # REQUIRED
    46        service: str,  # REQUIRED
    47        resolver: Optional[str] = None,
    48        connect_timeout_ms: Optional[int] = 3000,
    49        cluster_idle_timeout_ms: Optional[int] = None,
    50        cluster_max_connection_lifetime_ms: Optional[int] = None,
    51        marker: Optional[str] = None,  # extra marker for this context name
    52        stats_name: Optional[str] = None,  # Override the stats name for this cluster
    53        ctx_name: Optional[Union[str, bool]] = None,
    54        host_rewrite: Optional[str] = None,
    55        dns_type: Optional[str] = "strict_dns",
    56        enable_ipv4: Optional[bool] = None,
    57        enable_ipv6: Optional[bool] = None,
    58        lb_type: str = "round_robin",
    59        grpc: Optional[bool] = False,
    60        allow_scheme: Optional[bool] = True,
    61        load_balancer: Optional[dict] = None,
    62        keepalive: Optional[dict] = None,
    63        circuit_breakers: Optional[list] = None,
    64        respect_dns_ttl: Optional[bool] = False,
    65        health_checks: Optional[IRHealthChecks] = None,
    66        rkey: str = "-override-",
    67        kind: str = "IRCluster",
    68        apiVersion: str = "getambassador.io/v0",  # Not a typo! See below.
    69        **kwargs,
    70    ) -> None:
    71        # Step one: look at the service and such and figure out a cluster name
    72        # and TLS origination info.
    73
    74        # Here's how it goes:
    75        # - If allow_scheme is True and the service starts with https://, it is forced
    76        #   to originate TLS.
    77        # - Else, if allow_scheme is True and the service starts with http://, it is
    78        #   forced to _not_ originate TLS.
    79        # - Else, if we have a context (either a string that names a valid context,
    80        #   or the boolean value True), it will originate TLS.
    81        #
    82        # After figuring that out, if we have a context which is a string value,
    83        # we try to use that context name to look up certs to use. If we can't
    84        # find any, we won't send any originating cert.
    85        #
    86        # Finally, if no port is present in the service already, we force port 443
    87        # if we're originating TLS, 80 if not.
    88
    89        originate_tls: bool = False
    90        name_fields: List[str] = ["cluster"]
    91        ctx: Optional[IRTLSContext] = None
    92        errors: List[str] = []
    93        unknown_breakers = 0
    94
    95        # Do we have a marker?
    96        if marker:
    97            name_fields.append(marker)
    98
    99        # Set this flag to True if you discover something that's grave enough to warrant ignoring this cluster
   100        self.ignore_cluster = False
   101
   102        self.logger = ir.logger
   103
   104        # Toss in the original service before we mess with it, too.
   105        name_fields.append(service)
   106
   107        # If we have a ctx_name, does it match a real context?
   108        if ctx_name:
   109            if ctx_name is True:
   110                ir.logger.debug("using null context")
   111                ctx = IRTLSContext.null_context(ir=ir)
   112            else:
   113                ir.logger.debug("seeking named context %s" % ctx_name)
   114                ctx = ir.get_tls_context(typecast(str, ctx_name))
   115
   116            if not ctx:
   117                ir.logger.debug("no named context %s" % ctx_name)
   118                errors.append("Originate-TLS context %s is not defined" % ctx_name)
   119            else:
   120                ir.logger.debug("found context %s" % ctx)
   121
   122        # TODO: lots of duplication of here, need to replace with broken down functions
   123
   124        if allow_scheme and service.lower().startswith("https://"):
   125            service = service[len("https://") :]
   126
   127            originate_tls = True
   128            name_fields.append("otls")
   129
   130        elif allow_scheme and service.lower().startswith("http://"):
   131            service = service[len("http://") :]
   132
   133            if ctx:
   134                errors.append(
   135                    "Originate-TLS context %s being used even though service %s lists HTTP"
   136                    % (ctx_name, service)
   137                )
   138                originate_tls = True
   139                name_fields.append("otls")
   140            else:
   141                originate_tls = False
   142
   143        elif ctx:
   144            # No scheme (or schemes are ignored), but we have a context.
   145            originate_tls = True
   146            name_fields.append("otls")
   147            name_fields.append(ctx.name)
   148
   149        if "://" in service:
   150            # WTF is this?
   151            idx = service.index("://")
   152            scheme = service[0:idx]
   153
   154            if allow_scheme:
   155                errors.append(
   156                    "service %s has unknown scheme %s, assuming %s"
   157                    % (service, scheme, "HTTPS" if originate_tls else "HTTP")
   158                )
   159            else:
   160                errors.append(
   161                    "ignoring scheme %s for service %s, since it is being used for a non-HTTP mapping"
   162                    % (scheme, service)
   163                )
   164
   165            service = service[idx + 3 :]
   166
   167        # XXX Should this be checking originate_tls? Why does it do that?
   168        if originate_tls and host_rewrite:
   169            name_fields.append("hr-%s" % host_rewrite)
   170
   171        # Parse the service as a URL. Note that we have to supply a scheme to urllib's
   172        # parser, because it's kind of stupid.
   173
   174        ir.logger.debug("cluster setup: service %s otls %s ctx %s" % (service, originate_tls, ctx))
   175        p = urllib.parse.urlparse("random://" + service)
   176
   177        # Is there any junk after the host?
   178
   179        if p.path or p.params or p.query or p.fragment:
   180            errors.append(
   181                "service %s has extra URL components; ignoring everything but the host and port"
   182                % service
   183            )
   184
   185        # p is read-only, so break stuff out.
   186
   187        hostname = p.hostname
   188        namespace = parent_ir_resource.namespace
   189        # Make sure we save the namespace in the cluster name, to prevent clashes with non-fully qualified service resolution
   190        name_fields.append(namespace)
   191
   192        # Do we actually have a hostname?
   193        if not hostname:
   194            # We don't. That ain't good.
   195            errors.append(
   196                "service %s has no hostname and will be ignored; please re-configure" % service
   197            )
   198            self.ignore_cluster = True
   199            hostname = "unknown"
   200
   201        try:
   202            port = p.port
   203        except ValueError as e:
   204            errors.append(
   205                "found invalid port for service {}. Please specify a valid port between 0 and 65535 - {}. Service {} cluster will be ignored, please re-configure".format(
   206                    service, e, service
   207                )
   208            )
   209            self.ignore_cluster = True
   210            port = 0
   211
   212        # If the port is unset, fix it up.
   213        if not port:
   214            port = 443 if originate_tls else 80
   215
   216        # Rebuild the URL with the 'tcp' scheme and our changed info.
   217        # (Yes, really, TCP. Envoy uses the TLS context to determine whether to originate
   218        # TLS. Kind of odd, but there we go.)
   219        url = "tcp://%s:%d" % (hostname, port)
   220
   221        # Is there a circuit breaker involved here?
   222        if circuit_breakers:
   223            for breaker in circuit_breakers:
   224                name = breaker.get("_name", None)
   225
   226                if name:
   227                    name_fields.append(name)
   228                else:
   229                    # This is "impossible", but... let it go I guess?
   230                    errors.append(f"{service}: unvalidated circuit breaker {breaker}!")
   231                    name_fields.append(f"cbu{unknown_breakers}")
   232                    unknown_breakers += 1
   233
   234        # The Ambassador module will always have a load_balancer (which may be None).
   235        global_load_balancer = ir.ambassador_module.load_balancer
   236
   237        if not load_balancer:
   238            load_balancer = global_load_balancer
   239
   240        self.logger.debug(f"Load balancer for {url} is {load_balancer}")
   241
   242        enable_endpoints = False
   243
   244        if self.endpoints_required(load_balancer):
   245            if not Config.enable_endpoints:
   246                # Bzzt.
   247                errors.append(
   248                    f"{service}: endpoint routing is not enabled, falling back to {global_load_balancer}"
   249                )
   250                load_balancer = global_load_balancer
   251            else:
   252                enable_endpoints = True
   253
   254                if load_balancer:
   255                    # This is used only for cluster naming; it doesn't need to be a real
   256                    # load balancer policy.
   257
   258                    lb_type = load_balancer.get("policy", "default")
   259
   260                    key_fields = ["er", lb_type.lower()]
   261
   262                    # XXX Should we really include these things?
   263                    if "header" in load_balancer:
   264                        key_fields.append("hdr")
   265                        key_fields.append(load_balancer["header"])
   266
   267                    if "cookie" in load_balancer:
   268                        key_fields.append("cookie")
   269                        key_fields.append(load_balancer["cookie"]["name"])
   270
   271                    if "source_ip" in load_balancer:
   272                        key_fields.append("srcip")
   273
   274                    name_fields.append("-".join(key_fields))
   275
   276        # Finally we can construct the cluster name.
   277        name = "_".join(name_fields)
   278        name = re.sub(r"[^0-9A-Za-z_]", "_", name)
   279
   280        # OK. Build our default args.
   281        #
   282        # XXX We should really save the hostname and the port, not the URL.
   283
   284        if enable_ipv4 is None:
   285            enable_ipv4 = ir.ambassador_module.enable_ipv4
   286            ir.logger.debug(
   287                "%s: copying enable_ipv4 %s from Ambassador Module" % (name, enable_ipv4)
   288            )
   289
   290        if enable_ipv6 is None:
   291            enable_ipv6 = ir.ambassador_module.enable_ipv6
   292            ir.logger.debug(
   293                "%s: copying enable_ipv6 %s from Ambassador Module" % (name, enable_ipv6)
   294            )
   295
   296        new_args: Dict[str, Any] = {
   297            "type": dns_type,
   298            "lb_type": lb_type,
   299            "urls": [url],  # TODO: Should we completely eliminate `urls` in favor of `targets`?
   300            "load_balancer": load_balancer,
   301            "keepalive": keepalive,
   302            "circuit_breakers": circuit_breakers,
   303            "service": service,
   304            "enable_ipv4": enable_ipv4,
   305            "enable_ipv6": enable_ipv6,
   306            "enable_endpoints": enable_endpoints,
   307            "connect_timeout_ms": connect_timeout_ms,
   308            "cluster_idle_timeout_ms": cluster_idle_timeout_ms,
   309            "cluster_max_connection_lifetime_ms": cluster_max_connection_lifetime_ms,
   310            "respect_dns_ttl": respect_dns_ttl,
   311            "health_checks": health_checks,
   312        }
   313
   314        # If we have a stats_name, use it. If not, default it to the service to make life
   315        # easier for people trying to find stats later -- but translate unusual characters
   316        # to underscores, just in case.
   317
   318        if stats_name:
   319            new_args["stats_name"] = stats_name
   320        else:
   321            new_args["stats_name"] = re.sub(r"[^0-9A-Za-z_]", "_", service)
   322
   323        if grpc:
   324            new_args["grpc"] = True
   325
   326        if host_rewrite:
   327            new_args["host_rewrite"] = host_rewrite
   328
   329        if originate_tls:
   330            if ctx:
   331                new_args["tls_context"] = typecast(IRTLSContext, ctx)
   332            else:
   333                new_args["tls_context"] = IRTLSContext.null_context(ir=ir)
   334
   335        if rkey == "-override-":
   336            rkey = name
   337
   338        # Stash the resolver, hostname, and port for setup.
   339        self._resolver = resolver
   340        self._hostname = hostname
   341        self._namespace = namespace
   342        self._port = port
   343        self._is_sidecar = False
   344
   345        if self._hostname == "127.0.0.1" and self._port == 8500:
   346            self._is_sidecar = True
   347
   348        super().__init__(
   349            ir=ir,
   350            aconf=aconf,
   351            rkey=rkey,
   352            location=location,
   353            kind=kind,
   354            name=name,
   355            apiVersion=apiVersion,
   356            **new_args,
   357        )
   358
   359        if ctx:
   360            ctx.referenced_by(self)
   361
   362        if errors:
   363            for error in errors:
   364                ir.post_error(error, resource=self)
   365
   366    def setup(self, ir: "IR", aconf: Config) -> bool:
   367        self._cache_key = f"Cluster-{self.name}"
   368
   369        if self.ignore_cluster:
   370            return False
   371
   372        # Resolve our actual targets.
   373        targets = ir.resolve_targets(
   374            self, self._resolver, self._hostname, self._namespace, self._port
   375        )
   376
   377        self.targets = targets
   378
   379        if not targets:
   380            self.ir.logger.debug("accepting cluster with no endpoints: %s" % self.name)
   381
   382        # If we have health checking config then generate IR for it
   383        if "health_checks" in self:
   384            self.health_checks = IRHealthChecks(ir, aconf, self.get("health_checks", None))
   385        return True
   386
   387    def is_edge_stack_sidecar(self) -> bool:
   388        return self.is_active() and self._is_sidecar
   389
   390    def endpoints_required(self, load_balancer) -> bool:
   391        required = False
   392
   393        if load_balancer:
   394            lb_policy = load_balancer.get("policy")
   395
   396            if lb_policy in ["round_robin", "least_request", "ring_hash", "maglev"]:
   397                self.logger.debug(
   398                    "Endpoints are required for load balancing policy {}".format(lb_policy)
   399                )
   400                required = True
   401
   402        return required
   403
   404    def add_url(self, url: str) -> List[str]:
   405        self.urls.append(url)
   406
   407        return self.urls
   408
   409    def merge(self, other: "IRCluster") -> bool:
   410        # Is this mergeable?
   411
   412        mismatches = []
   413
   414        for key in [
   415            "type",
   416            "lb_type",
   417            "host_rewrite",
   418            "tls_context",
   419            "originate_tls",
   420            "grpc",
   421            "connect_timeout_ms",
   422            "cluster_idle_timeout_ms",
   423            "cluster_max_connection_lifetime_ms",
   424        ]:
   425            if self.get(key, None) != other.get(key, None):
   426                mismatches.append(key)
   427
   428        if mismatches:
   429            self.post_error(
   430                RichStatus.fromError(
   431                    "cannot merge cluster %s: mismatched attributes %s"
   432                    % (other.name, ", ".join(mismatches))
   433                )
   434            )
   435            return False
   436
   437        # All good.
   438        if other.urls:
   439            self.referenced_by(other)
   440
   441            for url in other.urls:
   442                self.add_url(url)
   443
   444        if other.targets:
   445            self.referenced_by(other)
   446            if self.targets == None:
   447                self.targets = other.targets
   448            else:
   449                self.targets = (
   450                    typecast(List[Dict[str, Union[int, str]]], self.targets) + other.targets
   451                )
   452
   453        return True
   454
   455    def get_resolver(self) -> "IRServiceResolver":
   456        return self.ir.resolve_resolver(self, self._resolver)
   457
   458    def clustermap_entry(self) -> Dict:
   459        return self.get_resolver().clustermap_entry(
   460            self.ir, self, self._hostname, self._namespace, self._port
   461        )

View as plain text