1from ipaddress import ip_address
2from typing import TYPE_CHECKING, Dict, List, Optional, Tuple, Union
3
4from ..config import Config
5from .irresource import IRResource
6
7if TYPE_CHECKING:
8 from .ir import IR # pragma: no cover
9 from .irbasemapping import IRBaseMapping # pragma: no cover
10 from .ircluster import IRCluster # pragma: no cover
11
12#############################################################################
13## irserviceresolver.py -- resolve endpoints for services
14##
15## IRServiceResolver does the work of looking into Service data structures.
16## There are, naturally, some weirdnesses.
17##
18## Here's the way this goes:
19##
20## When you create an AConf, you must hand in Service objects and Resolver
21## objects. (This will generally happen by virtue of the ResourceFetcher
22## finding them someplace.) There can be multiple kinds of Resolver objects
23## (e.g. ConsulResolver, KubernetesEndpointResolver, etc.).
24##
25## When you create an IR from that AConf, the various kinds of Resolvers
26## all get turned into IRServiceResolvers, and the IR uses those to handle
27## the mechanics of finding the upstream endpoints for a service.
28
29SvcEndpoint = Dict[str, Union[int, str]]
30SvcEndpointSet = List[SvcEndpoint]
31ClustermapEntry = Dict[str, Union[int, str]]
32
33
34class IRServiceResolver(IRResource):
35 def __init__(
36 self,
37 ir: "IR",
38 aconf: Config,
39 rkey: str = "ir.resolver",
40 kind: str = "IRServiceResolver",
41 name: str = "ir.resolver",
42 location: str = "--internal--",
43 **kwargs,
44 ) -> None:
45 super().__init__(
46 ir=ir, aconf=aconf, rkey=rkey, kind=kind, name=name, location=location, **kwargs
47 )
48
49 def setup(self, ir: "IR", aconf: Config) -> bool:
50 if self.kind == "ConsulResolver":
51 self.resolve_with = "consul"
52
53 if not self.get("datacenter"):
54 self.post_error("ConsulResolver is required to have a datacenter")
55 return False
56 elif self.kind == "KubernetesServiceResolver":
57 self.resolve_with = "k8s"
58 elif self.kind == "KubernetesEndpointResolver":
59 self.resolve_with = "k8s"
60 else:
61 self.post_error(f"Resolver kind {self.kind} unknown")
62 return False
63
64 return True
65
66 def valid_mapping(self, ir: "IR", mapping: "IRBaseMapping") -> bool:
67 fn = {
68 "KubernetesServiceResolver": self._k8s_svc_valid_mapping,
69 "KubernetesEndpointResolver": self._k8s_valid_mapping,
70 "ConsulResolver": self._consul_valid_mapping,
71 }[self.kind]
72
73 return fn(ir, mapping)
74
75 def _k8s_svc_valid_mapping(self, ir: "IR", mapping: "IRBaseMapping"):
76 # You're not allowed to specific a load balancer with a KubernetesServiceResolver.
77 if mapping.get("load_balancer"):
78 mapping.post_error(
79 "No load_balancer setting is allowed with the KubernetesServiceResolver"
80 )
81 return False
82
83 return True
84
85 def _k8s_valid_mapping(self, ir: "IR", mapping: "IRBaseMapping"):
86 # There's no real validation to do here beyond what the Mapping already does.
87 return True
88
89 def _consul_valid_mapping(self, ir: "IR", mapping: "IRBaseMapping"):
90 # Mappings using the Consul resolver can't use service names with '.', or port
91 # override. We currently do this the cheap & sleazy way.
92
93 valid = True
94
95 if mapping.service.find(".") >= 0:
96 mapping.post_error("The Consul resolver does not allow dots in service names")
97 valid = False
98
99 if mapping.service.find(":") >= 0:
100 # This is not an _error_ per se -- we'll accept the mapping and just ignore the port.
101 ir.aconf.post_notice(
102 "The Consul resolver does not allow overriding service port; ignoring requested port",
103 resource=mapping,
104 )
105
106 return valid
107
108 def resolve(
109 self, ir: "IR", cluster: "IRCluster", svc_name: str, svc_namespace: str, port: int
110 ) -> Optional[SvcEndpointSet]:
111 fn = {
112 "KubernetesServiceResolver": self._k8s_svc_resolver,
113 "KubernetesEndpointResolver": self._k8s_resolver,
114 "ConsulResolver": self._consul_resolver,
115 }[self.kind]
116
117 return fn(ir, cluster, svc_name, svc_namespace, port)
118
119 def _k8s_svc_resolver(
120 self, ir: "IR", cluster: "IRCluster", svc_name: str, svc_namespace: str, port: int
121 ) -> Optional[SvcEndpointSet]:
122 # The K8s service resolver always returns a single endpoint.
123 return [{"ip": svc_name, "port": port, "target_kind": "DNSname"}]
124
125 def _k8s_resolver(
126 self, ir: "IR", cluster: "IRCluster", svc_name: str, svc_namespace: str, port: int
127 ) -> Optional[SvcEndpointSet]:
128 svc, namespace = self.parse_service(ir, svc_name, svc_namespace)
129 # Find endpoints, and try for a port match!
130 return self.get_endpoints(ir, f"k8s-{svc}-{namespace}", port)
131
132 def parse_service(self, ir: "IR", svc_name: str, svc_namespace: str) -> Tuple[str, str]:
133 # K8s service names can be 'svc' or 'svc.namespace'. Which does this look like?
134 svc = svc_name
135 namespace = Config.ambassador_namespace
136
137 if "." in svc and not is_ip_address(svc):
138 # OK, cool. Peel off the service and the namespace.
139 #
140 # Note that some people may use service.namespace.cluster.svc.local or
141 # some such crap. The [0:2] is to restrict this to just the first two
142 # elements if there are more, but still work if there are not.
143
144 (svc, namespace) = svc.split(".", 2)[0:2]
145 elif (
146 not ir.ambassador_module.use_ambassador_namespace_for_service_resolution
147 and svc_namespace
148 ):
149 namespace = svc_namespace
150 ir.logger.debug(
151 "KubernetesEndpointResolver use_ambassador_namespace_for_service_resolution %s, upstream key %s"
152 % (
153 ir.ambassador_module.use_ambassador_namespace_for_service_resolution,
154 f"{svc}-{namespace}",
155 )
156 )
157
158 return svc, namespace
159
160 def _consul_resolver(
161 self, ir: "IR", cluster: "IRCluster", svc_name: str, svc_namespace: str, port: int
162 ) -> Optional[SvcEndpointSet]:
163 # For Consul, we look things up with the service name and the datacenter at present.
164 # We ignore the port in the lookup (we should've already posted a warning about the port
165 # being present, actually).
166
167 return self.get_endpoints(ir, f"consul-{svc_name}-{self.datacenter}", None)
168
169 def get_endpoints(self, ir: "IR", key: str, port: Optional[int]) -> Optional[SvcEndpointSet]:
170 # OK. Do we have a Service by this key?
171 service = ir.services.get(key)
172
173 if not service:
174 self.logger.debug(f"Resolver {self.name}: {key} matches no Service for endpoints")
175 return None
176
177 self.logger.debug(f"Resolver {self.name}: {key} matches %s" % service.as_json())
178
179 endpoints = service.get("endpoints")
180
181 if not endpoints:
182 self.logger.debug(f"Resolver {self.name}: {key} has no endpoints")
183 return None
184
185 # Do we have a match for the port they're asking for (y'know, if they're asking for one)?
186
187 targets = endpoints.get(port or "*")
188
189 if targets:
190 # Yes!
191 tstr = ", ".join([f'{x["ip"]}:{x["port"]}' for x in targets])
192
193 self.logger.debug(f"Resolver {self.name}: {key}:{port} matches {tstr}")
194
195 return targets
196 else:
197 hrtype = "Kubernetes" if (self.resolve_with == "k8s") else self.resolve_with
198
199 # This is ugly. We're almost certainly being called from _within_ the initialization
200 # of the cluster here -- so I guess we'll report the error against the service. Sigh.
201 self.ir.aconf.post_error(
202 f"Service {service.name}: {key}:{port} matches no endpoints from {hrtype}",
203 resource=service,
204 )
205
206 return None
207
208 def clustermap_entry(
209 self, ir: "IR", cluster: "IRCluster", svc_name: str, svc_namespace: str, port: int
210 ) -> ClustermapEntry:
211 fn = {
212 "KubernetesServiceResolver": self._k8s_svc_clustermap_entry,
213 "KubernetesEndpointResolver": self._k8s_clustermap_entry,
214 "ConsulResolver": self._consul_clustermap_entry,
215 }[self.kind]
216
217 return fn(ir, cluster, svc_name, svc_namespace, port)
218
219 def _k8s_svc_clustermap_entry(
220 self, ir: "IR", cluster: "IRCluster", svc_name: str, svc_namespace: str, port: int
221 ) -> ClustermapEntry:
222 # The K8s service resolver always returns a single endpoint.
223 svc, namespace = self.parse_service(ir, svc_name, svc_namespace)
224 return {"port": port, "kind": self.kind, "service": svc, "namespace": namespace}
225
226 def _k8s_clustermap_entry(
227 self, ir: "IR", cluster: "IRCluster", svc_name: str, svc_namespace: str, port: int
228 ) -> ClustermapEntry:
229 # Fallback to the KubernetesServiceResolver for IP addresses or if the service doesn't exist.
230 if is_ip_address(svc_name):
231 return {
232 "service": svc_name,
233 "namespace": svc_namespace,
234 "port": port,
235 "kind": "KubernetesServiceResolver",
236 }
237
238 if port:
239 portstr = "/%s" % port
240 else:
241 portstr = ""
242 svc, namespace = self.parse_service(ir, svc_name, svc_namespace)
243 # Find endpoints, and try for a port match!
244 return {
245 "service": svc,
246 "namespace": namespace,
247 "port": port,
248 "kind": self.kind,
249 "endpoint_path": "k8s/%s/%s%s" % (namespace, svc, portstr),
250 }
251
252 def _consul_clustermap_entry(
253 self, ir: "IR", cluster: "IRCluster", svc_name: str, svc_namespace: str, port: int
254 ) -> ClustermapEntry:
255 # Fallback to the KubernetesServiceResolver for ip addresses.
256 if is_ip_address(svc_name):
257 return {
258 "service": svc_name,
259 "namespace": svc_namespace,
260 "port": port,
261 "kind": "KubernetesServiceResolver",
262 }
263
264 # For Consul, we look things up with the service name and the datacenter at present.
265 # We ignore the port in the lookup (we should've already posted a warning about the port
266 # being present, actually).
267 return {
268 "service": svc_name,
269 "datacenter": self.datacenter,
270 "kind": self.kind,
271 "endpoint_path": "consul/%s/%s" % (self.datacenter, svc_name),
272 }
273
274
275class IRServiceResolverFactory:
276 @classmethod
277 def load_all(cls, ir: "IR", aconf: Config) -> None:
278 config_info = aconf.get_config("resolvers")
279
280 if config_info:
281 assert len(config_info) > 0 # really rank paranoia on my part...
282
283 for config in config_info.values():
284 cdict = config.as_dict()
285 cdict["rkey"] = config.rkey
286 cdict["location"] = config.location
287
288 ir.add_resolver(IRServiceResolver(ir, aconf, **cdict))
289
290 if not ir.get_resolver("kubernetes-service"):
291 # Default the K8s service resolver.
292 resolver_config = {
293 "apiVersion": "getambassador.io/v3alpha1",
294 "kind": "KubernetesServiceResolver",
295 "name": "kubernetes-service",
296 }
297
298 if Config.single_namespace:
299 resolver_config["namespace"] = Config.ambassador_namespace
300
301 ir.add_resolver(IRServiceResolver(ir, aconf, **resolver_config))
302
303 # Ugh, the aliasing for the K8s and Consul endpoint resolvers is annoying.
304 res_e = ir.get_resolver("endpoint")
305 res_k_e = ir.get_resolver("kubernetes-endpoint")
306
307 if not res_e and not res_k_e:
308 # Neither exists. Create them from scratch.
309
310 resolver_config = {
311 "apiVersion": "getambassador.io/v3alpha1",
312 "kind": "KubernetesEndpointResolver",
313 "name": "kubernetes-endpoint",
314 }
315
316 if Config.single_namespace:
317 resolver_config["namespace"] = Config.ambassador_namespace
318
319 ir.add_resolver(IRServiceResolver(ir, aconf, **resolver_config))
320
321 resolver_config["name"] = "endpoint"
322
323 ir.add_resolver(IRServiceResolver(ir, aconf, **resolver_config))
324 else:
325 cls.check_aliases(ir, aconf, "endpoint", res_e, "kubernetes-endpoint", res_k_e)
326
327 res_c = ir.get_resolver("consul")
328 res_c_e = ir.get_resolver("consul-endpoint")
329
330 if not res_c and not res_c_e:
331 # Neither exists. Create them from scratch.
332
333 resolver_config = {
334 "apiVersion": "getambassador.io/v3alpha1",
335 "kind": "ConsulResolver",
336 "name": "consul-endpoint",
337 "datacenter": "dc1",
338 }
339
340 ir.add_resolver(IRServiceResolver(ir, aconf, **resolver_config))
341
342 resolver_config["name"] = "consul"
343
344 ir.add_resolver(IRServiceResolver(ir, aconf, **resolver_config))
345 else:
346 cls.check_aliases(ir, aconf, "consul", res_c, "consul-endpoint", res_c_e)
347
348 @classmethod
349 def check_aliases(
350 cls,
351 ir: "IR",
352 aconf: Config,
353 n1: str,
354 r1: Optional[IRServiceResolver],
355 n2: str,
356 r2: Optional[IRServiceResolver],
357 ) -> None:
358 source = None
359 name = None
360
361 if not r1:
362 # r2 must exist to be here.
363 source = r2
364 name = n1
365 elif not r2:
366 # r1 must exist to be here.
367 source = r1
368 name = n2
369
370 if source:
371 config = dict(**source.as_dict())
372
373 # Fix up this dict. Sigh.
374 config["rkey"] = config.pop("_rkey", config.get("rkey", None)) # Kludge, I know...
375 config.pop("_errored", None)
376 config.pop("_active", None)
377 config.pop("resolve_with", None)
378
379 config["name"] = name
380
381 ir.add_resolver(IRServiceResolver(ir, aconf, **config))
382
383
384def is_ip_address(addr: str) -> bool:
385 try:
386 x = ip_address(addr)
387 return True
388 except ValueError:
389 return False
View as plain text