1from typing import TYPE_CHECKING, ClassVar, Dict, List, Optional, Tuple
2
3from ..config import Config
4from .irbasemapping import IRBaseMapping
5from .irbasemappinggroup import IRBaseMappingGroup
6from .ircluster import IRCluster
7from .irresource import IRResource
8
9if TYPE_CHECKING:
10 from .ir import IR # pragma: no cover
11
12
13########
14## IRTCPMappingGroup is a collection of IRTCPMappings. We'll use it to build Envoy routes later,
15## so the group itself ends up with some of the group-wide attributes of its Mappings.
16
17
class IRTCPMappingGroup(IRBaseMappingGroup):
    """
    A collection of IRTCPMappings that agree on all of their core attributes.

    The first mapping handed to the constructor seeds the group's core
    attributes (see ``CoreMappingKeys``). Later mappings are accepted by
    ``add_mapping`` only if their core attributes match the group's.
    ``finalize`` flattens the remaining mapping attributes into the group and
    makes sure every mapping has an IRCluster.
    """

    # Keys that may only have a single value across all mappings in the group.
    CoreMappingKeys: ClassVar[Dict[str, bool]] = {
        "address": True,
        "circuit_breakers": True,
        "enable_ipv4": True,
        "enable_ipv6": True,
        "group_id": True,
        "host": True,
        "idle_timeout_ms": True,
        # 'labels' doesn't appear in the TransparentKeys list for IRMapping, but it's still
        # a CoreMappingKey -- if it appears, it can't have multiple values within an IRTCPMappingGroup.
        "labels": True,
        "port": True,
        "tls": True,
    }

    # Keys that finalize() never copies from a mapping into the group: every
    # core key, plus the additional keys listed here.
    DoNotFlattenKeys: ClassVar[Dict[str, bool]] = {
        **CoreMappingKeys,
        "cluster": True,
        "cluster_key": True,
        "kind": True,
        "location": True,
        "name": True,
        "rkey": True,
        "route_weight": True,
        "service": True,
        "weight": True,
    }

    @staticmethod
    def helper_mappings(res: IRResource, k: str) -> Tuple[str, List[dict]]:
        """
        Dict helper for the "mappings" key.

        :param res: the resource whose ``mappings`` are being rendered
        :param k: the key being rendered; returned unchanged
        :return: ``(k, mappings)`` where ``mappings`` are the ``as_dict()`` forms
                 of ``res.mappings`` in descending ``route_weight`` order
        """
        ordered = sorted(
            (mapping.as_dict() for mapping in res.mappings),
            key=lambda entry: entry["route_weight"],
        )

        # Sort ascending and then reverse (rather than sorted(..., reverse=True)):
        # the two differ in how entries with equal route_weight are ordered, and
        # this keeps the existing output order.
        ordered.reverse()

        return k, ordered

    def __init__(
        self,
        ir: "IR",
        aconf: Config,
        location: str,
        mapping: IRBaseMapping,
        rkey: str = "ir.mappinggroup",
        kind: str = "IRTCPMappingGroup",
        name: str = "ir.mappinggroup",
        **kwargs,
    ) -> None:
        """
        Create a group seeded from its first mapping.

        :param ir: the IR we're working from
        :param aconf: the Config we're working from
        :param location: location passed through to the base class
        :param mapping: the first mapping of the group; its ``rkey`` is used as
                        the group's ``rkey``
        :param rkey: accepted for signature compatibility but ignored
        :param kind: resource kind passed through to the base class
        :param name: resource name passed through to the base class
        :param kwargs: any further attributes, passed through to the base class
        """
        del rkey  # silence unused-variable warning

        super().__init__(
            ir=ir, aconf=aconf, rkey=mapping.rkey, location=location, kind=kind, name=name, **kwargs
        )

        self.add_dict_helper("mappings", IRTCPMappingGroup.helper_mappings)

        # Time to lift a bunch of core stuff from the first mapping up into the
        # group. Anything already set on the group (e.g. via kwargs) wins.

        if ("group_weight" not in self) and ("route_weight" in mapping):
            self.group_weight = mapping.route_weight

        for k in IRTCPMappingGroup.CoreMappingKeys:
            if (k not in self) and (k in mapping):
                self[k] = mapping[k]

        self.add_mapping(aconf, mapping)

    def add_mapping(self, aconf: Config, mapping: IRBaseMapping) -> None:
        """
        Add a mapping to this group if its core attributes match the group's.

        If any key in ``CoreMappingKeys`` is present on the mapping but is
        missing from, or different in, the group, an error is posted and the
        mapping is not added.

        :param aconf: the Config we're working from (not used directly here)
        :param mapping: the mapping to add
        """
        mismatches = [
            (k, mapping[k], self.get(k, "-unset-"))
            for k in IRTCPMappingGroup.CoreMappingKeys
            if (k in mapping) and ((k not in self) or (mapping[k] != self[k]))
        ]

        if mismatches:
            self.post_error(
                "cannot accept new mapping %s with mismatched %s"
                % (mapping.name, ", ".join(["%s: %s != %s" % (x, y, z) for x, y, z in mismatches]))
            )
            return

        self.mappings.append(mapping)

        # Keep the group's weight at the highest route_weight of its mappings.
        # The value assigned must be the same route_weight that was just
        # compared (and that __init__ uses to seed group_weight).
        if mapping.route_weight > self.group_weight:
            self.group_weight = mapping.route_weight

        self.referenced_by(mapping)

    # Deliberately matches IRListener.bind_to()
    def bind_to(self) -> str:
        """
        Return the bind key for this group, ``tcp-<address>-<port>``.

        The address is the group's "address" if it is set and truthy, otherwise
        ``Config.envoy_bind_address``.
        """
        bind_addr = self.get("address") or Config.envoy_bind_address
        return f"tcp-{bind_addr}-{self.port}"

    def add_cluster_for_mapping(
        self, mapping: IRBaseMapping, marker: Optional[str] = None
    ) -> IRCluster:
        """
        Find or create the IRCluster for a mapping and register it with the IR.

        If the mapping already carries a ``cluster_key`` and the IR cache holds
        a cluster under that key, the cached cluster is reused. Otherwise a new
        IRCluster is built from the mapping's attributes. When the mapping had
        no ``cluster_key`` yet, the stored cluster is also added to the cache,
        linked to this group, and its cache key is saved on the mapping.

        :param mapping: the mapping that needs a cluster
        :param marker: optional marker passed to the IRCluster constructor
        :return: the cluster as stored by ``self.ir.add_cluster``
        """
        cluster: Optional[IRCluster] = None

        if mapping.cluster_key:
            # Aha. Is our cluster already in the cache?
            cached_cluster = self.ir.cache_fetch(mapping.cluster_key)

            if cached_cluster is not None:
                # We know a priori that anything in the cache under a cluster key must be
                # an IRCluster, but let's assert that rather than casting.
                assert isinstance(cached_cluster, IRCluster)
                cluster = cached_cluster

                self.ir.logger.debug(
                    f"IRTCPMappingGroup: got Cluster from cache for {mapping.cluster_key}"
                )

        if not cluster:
            # Find or create the cluster for this Mapping...
            cluster = IRCluster(
                ir=self.ir,
                aconf=self.ir.aconf,
                parent_ir_resource=mapping,
                location=mapping.location,
                service=mapping.service,
                resolver=mapping.resolver,
                ctx_name=mapping.get("tls", None),
                host_rewrite=mapping.get("host_rewrite", False),
                enable_ipv4=mapping.get("enable_ipv4", None),
                enable_ipv6=mapping.get("enable_ipv6", None),
                circuit_breakers=mapping.get("circuit_breakers", None),
                marker=marker,
                stats_name=self.get("stats_name", None),
            )

        # Make sure that the cluster is really in our IR...
        stored = self.ir.add_cluster(cluster)
        stored.referenced_by(mapping)

        # ...and then check if we just synthesized this cluster.
        if not mapping.cluster_key:
            # Yes. The mapping is already in the cache, but we need to cache the cluster...
            self.ir.cache_add(stored)

            # ...and link the Group to the cluster.
            #
            # Right now, I'm going for maximum safety, which means a single chain linking
            # Mapping -> Group -> Cluster. That means that deleting a single Mapping deletes
            # the Group to which that Mapping is attached, which in turn deletes all the
            # Clusters for that Group.
            #
            # Performance might dictate linking Mapping -> Group and Mapping -> Cluster, so
            # that deleting a Mapping deletes the Group but only the single Cluster. Needs
            # testing.

            self.ir.cache_link(self, stored)

            # Finally, save the cluster's cache_key in this Mapping.
            mapping.cluster_key = stored.cache_key

        # Finally, return the stored cluster. Done.
        return stored

    def finalize(self, ir: "IR", aconf: Config) -> List[IRCluster]:
        """
        Finalize a MappingGroup based on the attributes of its Mappings.

        Every mapping attribute that is not private (leading underscore), not
        skipped by the mapping's ``skip_key``, and not in ``DoNotFlattenKeys``
        is copied up into the group, and the mappings' ``metadata_labels`` are
        merged. Each mapping then gets its cluster, and the mapping weights are
        normalized.

        :param ir: the IR we're working from (this method works through ``self.ir``)
        :param aconf: the Config we're working from (not used directly here)
        :return: a list of the IRClusters this Group uses, or an empty list if
                 the mapping weights could not be normalized
        """

        metadata_labels: Dict[str, str] = {}

        # Mappings are visited in ascending route_weight order, so where two
        # mappings set the same key, the value from the later one is what stays.
        for mapping in sorted(self.mappings, key=lambda m: m.route_weight):
            self.ir.logger.debug("%s mapping %s" % (self, mapping.as_json()))

            for k in mapping.keys():
                if (
                    k.startswith("_")
                    or mapping.skip_key(k)
                    or (k in IRTCPMappingGroup.DoNotFlattenKeys)
                ):
                    continue

                self[k] = mapping[k]

            # Should we have higher weights win over lower if there are conflicts?
            # Should we disallow conflicts?
            metadata_labels.update(mapping.get("metadata_labels") or {})

        if metadata_labels:
            self.metadata_labels = metadata_labels

        # NOTE: "labels" is a core key, so it is never flattened above; the
        # group's labels stay exactly as lifted from the mappings.

        for mapping in self.mappings:
            mapping.cluster = self.add_cluster_for_mapping(mapping, mapping.cluster_tag)

        self.logger.debug("Normalizing weights in mappings now...")
        if not self.normalize_weights_in_mappings():
            self.post_error("Could not normalize mapping weights, ignoring...")
            return []

        return [mapping.cluster for mapping in self.mappings]
View as plain text