...

Text file src/github.com/emissary-ingress/emissary/v3/python/ambassador/ir/irtcpmappinggroup.py

Documentation: github.com/emissary-ingress/emissary/v3/python/ambassador/ir

     1from typing import TYPE_CHECKING, ClassVar, Dict, List, Optional, Tuple
     2
     3from ..config import Config
     4from .irbasemapping import IRBaseMapping
     5from .irbasemappinggroup import IRBaseMappingGroup
     6from .ircluster import IRCluster
     7from .irresource import IRResource
     8
     9if TYPE_CHECKING:
    10    from .ir import IR  # pragma: no cover
    11
    12
    13########
    14## IRTCPMappingGroup is a collection of IRTCPMappings. We'll use it to build Envoy routes later,
    15## so the group itself ends up with some of the group-wide attributes of its Mappings.
    16
    17
    18class IRTCPMappingGroup(IRBaseMappingGroup):
    19    CoreMappingKeys: ClassVar[Dict[str, bool]] = {
    20        "address": True,
    21        "circuit_breakers": True,
    22        "enable_ipv4": True,
    23        "enable_ipv6": True,
    24        "group_id": True,
    25        "host": True,
    26        "idle_timeout_ms": True,
    27        # 'labels' doesn't appear in the TransparentKeys list for IRMapping, but it's still
    28        # a CoreMappingKey -- if it appears, it can't have multiple values within an IRTCPMappingGroup.
    29        "labels": True,
    30        "port": True,
    31        "tls": True,
    32    }
    33
    34    DoNotFlattenKeys: ClassVar[Dict[str, bool]] = dict(CoreMappingKeys)
    35    DoNotFlattenKeys.update(
    36        {
    37            "cluster": True,
    38            "cluster_key": True,
    39            "kind": True,
    40            "location": True,
    41            "name": True,
    42            "rkey": True,
    43            "route_weight": True,
    44            "service": True,
    45            "weight": True,
    46        }
    47    )
    48
    49    @staticmethod
    50    def helper_mappings(res: IRResource, k: str) -> Tuple[str, List[dict]]:
    51        return k, list(
    52            reversed(sorted([x.as_dict() for x in res.mappings], key=lambda x: x["route_weight"]))
    53        )
    54
    55    def __init__(
    56        self,
    57        ir: "IR",
    58        aconf: Config,
    59        location: str,
    60        mapping: IRBaseMapping,
    61        rkey: str = "ir.mappinggroup",
    62        kind: str = "IRTCPMappingGroup",
    63        name: str = "ir.mappinggroup",
    64        **kwargs,
    65    ) -> None:
    66        # print("IRTCPMappingGroup __init__ (%s %s %s)" % (kind, name, kwargs))
    67        del rkey  # silence unused-variable warning
    68
    69        super().__init__(
    70            ir=ir, aconf=aconf, rkey=mapping.rkey, location=location, kind=kind, name=name, **kwargs
    71        )
    72
    73        self.add_dict_helper("mappings", IRTCPMappingGroup.helper_mappings)
    74
    75        # Time to lift a bunch of core stuff from the first mapping up into the
    76        # group.
    77
    78        if ("group_weight" not in self) and ("route_weight" in mapping):
    79            self.group_weight = mapping.route_weight
    80
    81        for k in IRTCPMappingGroup.CoreMappingKeys:
    82            if (k not in self) and (k in mapping):
    83                self[k] = mapping[k]
    84
    85        self.add_mapping(aconf, mapping)
    86
    87    def add_mapping(self, aconf: Config, mapping: IRBaseMapping) -> None:
    88        mismatches = []
    89
    90        for k in IRTCPMappingGroup.CoreMappingKeys:
    91            if (k in mapping) and ((k not in self) or (mapping[k] != self[k])):
    92                mismatches.append((k, mapping[k], self.get(k, "-unset-")))
    93
    94        if mismatches:
    95            self.post_error(
    96                "cannot accept new mapping %s with mismatched %s"
    97                % (mapping.name, ", ".join(["%s: %s != %s" % (x, y, z) for x, y, z in mismatches]))
    98            )
    99            return
   100
   101        self.mappings.append(mapping)
   102
   103        if mapping.route_weight > self.group_weight:
   104            self.group_weight = mapping.group_weight
   105
   106        self.referenced_by(mapping)
   107
   108    # Deliberately matches IRListener.bind_to()
   109    def bind_to(self) -> str:
   110        bind_addr = self.get("address") or Config.envoy_bind_address
   111        return f"tcp-{bind_addr}-{self.port}"
   112
   113    def add_cluster_for_mapping(
   114        self, mapping: IRBaseMapping, marker: Optional[str] = None
   115    ) -> IRCluster:
   116        cluster: Optional[IRCluster] = None
   117
   118        if mapping.cluster_key:
   119            # Aha. Is our cluster already in the cache?
   120            cached_cluster = self.ir.cache_fetch(mapping.cluster_key)
   121
   122            if cached_cluster is not None:
   123                # We know a priori that anything in the cache under a cluster key must be
   124                # an IRCluster, but let's assert that rather than casting.
   125                assert isinstance(cached_cluster, IRCluster)
   126                cluster = cached_cluster
   127
   128                self.ir.logger.debug(
   129                    f"IRTCPMappingGroup: got Cluster from cache for {mapping.cluster_key}"
   130                )
   131
   132        if not cluster:
   133            # Find or create the cluster for this Mapping...
   134            cluster = IRCluster(
   135                ir=self.ir,
   136                aconf=self.ir.aconf,
   137                parent_ir_resource=mapping,
   138                location=mapping.location,
   139                service=mapping.service,
   140                resolver=mapping.resolver,
   141                ctx_name=mapping.get("tls", None),
   142                host_rewrite=mapping.get("host_rewrite", False),
   143                enable_ipv4=mapping.get("enable_ipv4", None),
   144                enable_ipv6=mapping.get("enable_ipv6", None),
   145                circuit_breakers=mapping.get("circuit_breakers", None),
   146                marker=marker,
   147                stats_name=self.get("stats_name", None),
   148            )
   149
   150        # Make sure that the cluster is really in our IR...
   151        stored = self.ir.add_cluster(cluster)
   152        stored.referenced_by(mapping)
   153
   154        # ...and then check if we just synthesized this cluster.
   155        if not mapping.cluster_key:
   156            # Yes. The mapping is already in the cache, but we need to cache the cluster...
   157            self.ir.cache_add(stored)
   158
   159            # ...and link the Group to the cluster.
   160            #
   161            # Right now, I'm going for maximum safety, which means a single chain linking
   162            # Mapping -> Group -> Cluster. That means that deleting a single Mapping deletes
   163            # the Group to which that Mapping is attached, which in turn deletes all the
   164            # Clusters for that Group.
   165            #
   166            # Performance might dictate linking Mapping -> Group and Mapping -> Cluster, so
   167            # that deleting a Mapping deletes the Group but only the single Cluster. Needs
   168            # testing.
   169
   170            self.ir.cache_link(self, stored)
   171
   172            # Finally, save the cluster's cache_key in this Mapping.
   173            mapping.cluster_key = stored.cache_key
   174
   175        # Finally, return the stored cluster. Done.
   176        return stored
   177
   178    def finalize(self, ir: "IR", aconf: Config) -> List[IRCluster]:
   179        """
   180        Finalize a MappingGroup based on the attributes of its Mappings. Core elements get lifted into
   181        the Group so we can more easily build Envoy routes; host-redirect and shadow get handled, etc.
   182
   183        :param ir: the IR we're working from
   184        :param aconf: the Config we're working from
   185        :return: a list of the IRClusters this Group uses
   186        """
   187
   188        metadata_labels: Dict[str, str] = {}
   189
   190        for mapping in sorted(self.mappings, key=lambda m: m.route_weight):
   191            self.ir.logger.debug("%s mapping %s" % (self, mapping.as_json()))
   192
   193            for k in mapping.keys():
   194                if (
   195                    k.startswith("_")
   196                    or mapping.skip_key(k)
   197                    or (k in IRTCPMappingGroup.DoNotFlattenKeys)
   198                ):
   199                    # self.ir.logger.debug("%s: don't flatten %s" % (self, k))
   200                    continue
   201
   202                # self.ir.logger.debug("%s: flatten %s" % (self, k))
   203
   204                self[k] = mapping[k]
   205
   206            # Should we have higher weights win over lower if there are conflicts?
   207            # Should we disallow conflicts?
   208            metadata_labels.update(mapping.get("metadata_labels") or {})
   209
   210        if metadata_labels:
   211            self.metadata_labels = metadata_labels
   212
   213        # self.ir.logger.debug("%s after flattening %s" % (self, self.as_json()))
   214
   215        total_weight = 0.0
   216        unspecified_mappings = 0
   217
   218        # # OK. Save some typing with local variables for default labels and our labels...
   219        # labels: Dict[str, Any] = self.get('labels', None)
   220        #
   221        # if not labels:
   222        #     # No labels. Use the default label domain to see if we have some valid defaults.
   223        #     defaults = ir.ambassador_module.get_default_labels()
   224        #
   225        #     if defaults:
   226        #         domain = ir.ambassador_module.get_default_label_domain()
   227        #
   228        #         self.labels = {
   229        #             domain: [
   230        #                 {
   231        #                     'defaults': defaults
   232        #                 }
   233        #             ]
   234        #         }
   235        # else:
   236        #     # Walk all the domains in our labels, and prepend the defaults, if any.
   237        #     # ir.logger.info("%s: labels %s" % (self.as_json(), labels))
   238        #
   239        #     for domain in labels.keys():
   240        #         defaults = ir.ambassador_module.get_default_labels(domain)
   241        #         ir.logger.debug("%s: defaults %s" % (domain, defaults))
   242        #
   243        #         if defaults:
   244        #             ir.logger.debug("%s: labels %s" % (domain, labels[domain]))
   245        #
   246        #             for label in labels[domain]:
   247        #                 ir.logger.debug("%s: label %s" % (domain, label))
   248        #
   249        #                 lkeys = label.keys()
   250        #                 if len(lkeys) > 1:
   251        #                     err = RichStatus.fromError("label has multiple entries (%s) instead of just one" %
   252        #                                                lkeys)
   253        #                     aconf.post_error(err, self)
   254        #
   255        #                 lkey = list(lkeys)[0]
   256        #
   257        #                 if lkey.startswith('v0_ratelimit_'):
   258        #                     # Don't prepend defaults, as this was imported from a V0 rate_limit.
   259        #                     continue
   260        #
   261        #                 label[lkey] = defaults + label[lkey]
   262
   263        for mapping in self.mappings:
   264            mapping.cluster = self.add_cluster_for_mapping(mapping, mapping.cluster_tag)
   265
   266        self.logger.debug(f"Normalizing weights in mappings now...")
   267        if not self.normalize_weights_in_mappings():
   268            self.post_error(f"Could not normalize mapping weights, ignoring...")
   269            return []
   270
   271        return list([mapping.cluster for mapping in self.mappings])

View as plain text