1import re
2from typing import TYPE_CHECKING, Any, Dict, List, Optional, Union
3from urllib.parse import quote as urlquote
4from urllib.parse import scheme_chars
5from urllib.parse import unquote as urlunquote
6from urllib.parse import urlparse
7
8from ..config import Config
9from ..utils import dump_json
10from .irresource import IRResource
11
12if TYPE_CHECKING:
13 from .ir import IR # pragma: no cover
14
15
16def would_confuse_urlparse(url: str) -> bool:
17 """Returns whether an URL-ish string would be interpretted by urlparse()
18 differently than we want, by parsing it as a non-URL URI ("scheme:path")
19 instead of as a URL ("[scheme:]//authority[:port]/path"). We don't want to
20 interpret "myhost:8080" as "ParseResult(scheme='myhost', path='8080')"!
21
22 Note: This has a Go equivalent in github.com/emissary-ingress/emissary/v3/pkg/emissaryutil. Please
23 keep them in-sync.
24 """
25 if url.find(":") > 0 and url.lstrip(scheme_chars).startswith("://"):
26 # has a scheme
27 return False
28 if url.startswith("//"):
29 # does not have a scheme, but has the "//" URL authority marker
30 return False
31 return True
32
33
34def normalize_service_name(
35 ir: "IR",
36 in_service: str,
37 mapping_namespace: Optional[str],
38 resolver_kind: str,
39 rkey: Optional[str] = None,
40) -> str:
41 """
42 Note: This has a Go equivalent in github.com/emissary-ingress/emissary/v3/pkg/emissaryutil. Please
43 keep them in-sync.
44 """
45 try:
46 parsed = urlparse(f"//{in_service}" if would_confuse_urlparse(in_service) else in_service)
47
48 if not parsed.hostname:
49 raise ValueError("No hostname")
50 # urlib.parse.unquote is permissive, but we want to be strict
51 bad_seqs = [
52 seq
53 for seq in re.findall(r"%.{,2}", parsed.hostname)
54 if not re.fullmatch(r"%[0-9a-fA-F]{2}", seq)
55 ]
56 if bad_seqs:
57 raise ValueError(f"Invalid percent-escape in hostname: {bad_seqs[0]}")
58 hostname = urlunquote(parsed.hostname)
59 scheme = parsed.scheme
60 port = parsed.port
61 except ValueError as e:
62 # This could happen with mismatched [] in a scheme://[IPv6], or with a port that can't
63 # cast to int, or a port outside [0,2^16), or...
64 #
65 # The best we can do here is probably just to log the error, return the original string
66 # and hope for the best. I guess.
67
68 errstr = f"Malformed service {repr(in_service)}: {e}"
69 if rkey:
70 errstr = f"{rkey}: {errstr}"
71 ir.post_error(errstr)
72
73 return in_service
74
75 # Consul Resolvers don't allow service names to include subdomains, but
76 # Kubernetes Resolvers _require_ subdomains to correctly handle namespaces.
77 want_qualified = (
78 not ir.ambassador_module.use_ambassador_namespace_for_service_resolution
79 and resolver_kind.startswith("Kubernetes")
80 )
81
82 is_qualified = "." in hostname or ":" in hostname or "localhost" == hostname
83
84 if (
85 mapping_namespace
86 and mapping_namespace != ir.ambassador_namespace
87 and want_qualified
88 and not is_qualified
89 ):
90 hostname += "." + mapping_namespace
91
92 out_service = urlquote(
93 hostname, safe="!$&'()*+,;=:[]<>\""
94 ) # match 'encodeHost' behavior of Go stdlib net/url/url.go
95 if ":" in out_service:
96 out_service = f"[{out_service}]"
97 if scheme:
98 out_service = f"{scheme}://{out_service}"
99 if port:
100 out_service += f":{port}"
101
102 ir.logger.debug(
103 "%s use_ambassador_namespace_for_service_resolution %s, fully qualified %s, upstream hostname %s"
104 % (
105 resolver_kind,
106 ir.ambassador_module.use_ambassador_namespace_for_service_resolution,
107 is_qualified,
108 out_service,
109 )
110 )
111
112 return out_service
113
114
115class IRBaseMapping(IRResource):
116 group_id: str
117 host: Optional[str]
118 route_weight: List[Union[str, int]]
119 cached_status: Optional[Dict[str, str]]
120 status_update: Optional[Dict[str, str]]
121 cluster_key: Optional[str]
122 _weight: int
123
124 def __init__(
125 self,
126 ir: "IR",
127 aconf: Config,
128 rkey: str, # REQUIRED
129 name: str, # REQUIRED
130 location: str, # REQUIRED
131 kind: str, # REQUIRED
132 namespace: Optional[str] = None,
133 metadata_labels: Optional[Dict[str, str]] = None,
134 apiVersion: str = "getambassador.io/v3alpha1",
135 precedence: int = 0,
136 cluster_tag: Optional[str] = None,
137 **kwargs,
138 ) -> None:
139 # Default status...
140 self.cached_status = None
141 self.status_update = None
142
143 # Start by assuming that we don't know the cluster key for this Mapping.
144 self.cluster_key = None
145
146 # We don't know the calculated weight yet, so set it to 0.
147 self._weight = 0
148
149 # Init the superclass...
150 super().__init__(
151 ir=ir,
152 aconf=aconf,
153 rkey=rkey,
154 location=location,
155 kind=kind,
156 name=name,
157 namespace=namespace,
158 metadata_labels=metadata_labels,
159 apiVersion=apiVersion,
160 precedence=precedence,
161 cluster_tag=cluster_tag,
162 **kwargs,
163 )
164
165 @classmethod
166 def make_cache_key(cls, kind: str, name: str, namespace: str, version: str = "v2") -> str:
167 # Why is this split on the name necessary?
168 # the name of a Mapping when we fetch it from the aconf will match the metadata.name of
169 # the Mapping that the config comes from _only if_ it is the only Mapping with that exact name.
170 # If there are multiple Mappings with the same name in different namespaces then the name
171 # becomes `name.namespace` for all mappings of the same name after the first one.
172 # The first one just gets to be `name` for "reasons".
173 #
174 # This behaviour is needed by other places in the code, but for the cache key, we need it to match the
175 # below format regardless of how many Mappings there are with that name. This is necessary for the cache
176 # specifically because there are places where we interact with the cache that have access to the
177 # metadata.name and metadata.namespace of the Mapping, but do not have access to the aconf representation
178 # of the Mapping name and thus have no way of knowing whether a specific name is mangled due to multiple
179 # Mappings sharing the same name or not.
180 name = name.split(".")[0]
181 return f"{kind}-{version}-{name}-{namespace}"
182
183 def setup(self, ir: "IR", aconf: Config) -> bool:
184 # Set up our cache key. We're using this format so that it'll be easy
185 # to generate it just from the Mapping's K8s metadata.
186 self._cache_key = IRBaseMapping.make_cache_key(self.kind, self.name, self.namespace)
187
188 # ...and start without a cluster key for this Mapping.
189 self.cluster_key = None
190
191 # We assume that any subclass madness is managed already, so we can compute the group ID...
192 self.group_id = self._group_id()
193
194 # ...and the route weight.
195 self.route_weight = self._route_weight()
196
197 # We can also default the resolver, and scream if it doesn't match a resolver we
198 # know about.
199 if not self.get("resolver"):
200 self.resolver = self.ir.ambassador_module.get("resolver", "kubernetes-service")
201
202 resolver = self.ir.get_resolver(self.resolver)
203
204 if not resolver:
205 self.post_error(f"resolver {self.resolver} is unknown!")
206 return False
207
208 self.ir.logger.debug(
209 "%s: GID %s route_weight %s, resolver %s"
210 % (self, self.group_id, self.route_weight, resolver)
211 )
212
213 # And, of course, we can make sure that the resolver thinks that this Mapping is OK.
214 if not resolver.valid_mapping(ir, self):
215 # If there's trouble, the resolver should've already posted about it.
216 return False
217
218 if self.get("circuit_breakers", None) is None:
219 self["circuit_breakers"] = ir.ambassador_module.circuit_breakers
220
221 if self.get("circuit_breakers", None) is not None:
222 if not self.validate_circuit_breakers(ir, self["circuit_breakers"]):
223 self.post_error(
224 "Invalid circuit_breakers specified: {}, invalidating mapping".format(
225 self["circuit_breakers"]
226 )
227 )
228 return False
229
230 return True
231
232 @staticmethod
233 def validate_circuit_breakers(ir: "IR", circuit_breakers) -> bool:
234 if not isinstance(circuit_breakers, (list, tuple)):
235 return False
236
237 for circuit_breaker in circuit_breakers:
238 if "_name" in circuit_breaker:
239 # Already reconciled.
240 ir.logger.debug(f'Breaker validation: good breaker {circuit_breaker["_name"]}')
241 continue
242
243 ir.logger.debug(f"Breaker validation: {dump_json(circuit_breakers, pretty=True)}")
244
245 name_fields = ["cb"]
246
247 if "priority" in circuit_breaker:
248 prio = circuit_breaker.get("priority").lower()
249 if prio not in ["default", "high"]:
250 return False
251
252 name_fields.append(prio[0])
253 else:
254 name_fields.append("n")
255
256 digit_fields = [
257 ("max_connections", "c"),
258 ("max_pending_requests", "p"),
259 ("max_requests", "r"),
260 ("max_retries", "t"),
261 ]
262
263 for field, abbrev in digit_fields:
264 if field in circuit_breaker:
265 try:
266 value = int(circuit_breaker[field])
267 name_fields.append(f"{abbrev}{value}")
268 except ValueError:
269 return False
270
271 circuit_breaker["_name"] = "".join(name_fields)
272 ir.logger.debug(f'Breaker valid: {circuit_breaker["_name"]}')
273
274 return True
275
276 def get_label(self, key: str) -> Optional[str]:
277 labels = self.get("metadata_labels") or {}
278 return labels.get(key) or None
279
280 def status(self) -> Optional[Dict[str, Any]]:
281 """
282 Return the new status we should have. Subclasses would typically override
283 this.
284
285 :return: new status (may be None)
286 """
287 return None
288
289 def check_status(self) -> None:
290 crd_name = self.get_label("ambassador_crd")
291
292 if not crd_name:
293 return
294
295 # OK, we're supposed to be a CRD. What status do we want, and
296 # what do we have?
297
298 wanted = self.status()
299
300 if wanted != self.cached_status:
301 self.ir.k8s_status_updates[crd_name] = ("Mapping", self.namespace, wanted)
302
303 def _group_id(self) -> str:
304 """Compute the group ID for this Mapping. Must be defined by subclasses."""
305 raise NotImplementedError("%s._group_id is not implemented?" % self.__class__.__name__)
306
307 def _route_weight(self) -> List[Union[str, int]]:
308 """Compute the route weight for this Mapping. Must be defined by subclasses."""
309 raise NotImplementedError("%s._route_weight is not implemented?" % self.__class__.__name__)
View as plain text