1# Copyright 2018 Datawire. All rights reserved.
2#
3# Licensed under the Apache License, Version 2.0 (the "License");
4# you may not use this file except in compliance with the License.
5# You may obtain a copy of the License at
6#
7# http://www.apache.org/licenses/LICENSE-2.0
8#
9# Unless required by applicable law or agreed to in writing, software
10# distributed under the License is distributed on an "AS IS" BASIS,
11# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12# See the License for the specific language governing permissions and
13# limitations under the License
14
15import re
16import urllib.parse
17from typing import TYPE_CHECKING, Any, Dict, List, Optional, Union
18from typing import cast as typecast
19
20from ..config import Config
21from ..utils import RichStatus
22from .irhealthchecks import IRHealthChecks
23from .irresource import IRResource
24from .irtlscontext import IRTLSContext
25
26if TYPE_CHECKING:
27 from .ir import IR # pragma: no cover
28 from .ir.irserviceresolver import IRServiceResolver # pragma: no cover
29
30#############################################################################
31## ircluster.py -- the ircluster configuration object for Ambassador
32##
33## IRCluster represents an Envoy cluster: a collection of endpoints that
34## provide a single service. IRClusters get used for quite a few different
35## things in Ambassador -- they are basically the generic "upstream service"
36## entity.
37
38
class IRCluster(IRResource):
    """An Envoy cluster: a collection of endpoints that provide a single upstream service.

    ``__init__`` parses the service string, decides whether to originate TLS and
    derives a deterministic cluster name from the inputs. Target resolution is
    deferred to ``setup``.
    """

    def __init__(
        self,
        ir: "IR",
        aconf: Config,
        parent_ir_resource: "IRResource",
        location: str,  # REQUIRED
        service: str,  # REQUIRED
        resolver: Optional[str] = None,
        connect_timeout_ms: Optional[int] = 3000,
        cluster_idle_timeout_ms: Optional[int] = None,
        cluster_max_connection_lifetime_ms: Optional[int] = None,
        marker: Optional[str] = None,  # extra marker for this context name
        stats_name: Optional[str] = None,  # Override the stats name for this cluster
        ctx_name: Optional[Union[str, bool]] = None,
        host_rewrite: Optional[str] = None,
        dns_type: Optional[str] = "strict_dns",
        enable_ipv4: Optional[bool] = None,
        enable_ipv6: Optional[bool] = None,
        lb_type: str = "round_robin",
        grpc: Optional[bool] = False,
        allow_scheme: Optional[bool] = True,
        load_balancer: Optional[dict] = None,
        keepalive: Optional[dict] = None,
        circuit_breakers: Optional[list] = None,
        respect_dns_ttl: Optional[bool] = False,
        health_checks: Optional[IRHealthChecks] = None,
        rkey: str = "-override-",
        kind: str = "IRCluster",
        apiVersion: str = "getambassador.io/v0",  # Not a typo! See below.
        **kwargs,
    ) -> None:
        """Build an IRCluster for ``service``.

        TLS origination is decided as follows:

        - If ``allow_scheme`` is true and the service starts with ``https://``,
          TLS is originated.
        - Else, if ``allow_scheme`` is true and the service starts with ``http://``,
          TLS is not originated. If a context was also supplied, an error is
          recorded and TLS is originated anyway.
        - Else, if we have a context (a string naming a valid context, or the
          boolean ``True`` for the null context), TLS is originated.

        If the service has no port, 443 is used when originating TLS and 80
        otherwise.

        Problems found during construction are collected in a local list. They
        are posted through ``ir.post_error`` after the base resource has been
        initialized. A missing hostname or an invalid port also sets
        ``self.ignore_cluster``, which makes ``setup`` return False.

        :param ir: the IR being built. It supplies the logger, the TLS contexts
            and the Ambassador module defaults.
        :param aconf: configuration object, passed through to ``IRResource``.
        :param parent_ir_resource: owning resource. Its ``namespace`` becomes
            part of the cluster name and is later used for target resolution.
        :param location: passed through to ``IRResource``.
        :param service: upstream service, optionally with a scheme and port
            (e.g. ``https://host:8443``).
        :param resolver: resolver name, handed to ``ir.resolve_targets`` and
            ``ir.resolve_resolver``.
        :param marker: extra component appended to the cluster name.
        :param stats_name: stats name override. Defaults to ``service`` with
            non-alphanumeric characters replaced by ``_``.
        :param ctx_name: TLS context name, or ``True`` to use the null context.
        :param host_rewrite: host rewrite value. It contributes to the cluster
            name only when TLS is originated.
        :param allow_scheme: whether an ``http://``/``https://`` prefix is honored.
        :param load_balancer: per-cluster load-balancer config. Falls back to
            the Ambassador module's ``load_balancer`` when falsy.
        :param circuit_breakers: circuit-breaker configs. Each ``_name`` is
            added to the cluster name.
        :param rkey: resource key. ``"-override-"`` means "use the cluster name".
        :param kwargs: accepted but ignored.
        """
        # Step one: look at the service and such and figure out a cluster name
        # and TLS origination info.

        originate_tls: bool = False
        # The cluster name is "_".join(name_fields) (sanitized); the ORDER of the
        # appends below is therefore significant.
        name_fields: List[str] = ["cluster"]
        ctx: Optional[IRTLSContext] = None
        errors: List[str] = []
        unknown_breakers = 0

        # Do we have a marker?
        if marker:
            name_fields.append(marker)

        # Set this flag to True if you discover something that's grave enough to warrant ignoring this cluster
        self.ignore_cluster = False

        self.logger = ir.logger

        # Toss in the original service before we mess with it, too.
        name_fields.append(service)

        # If we have a ctx_name, does it match a real context?
        if ctx_name:
            if ctx_name is True:
                ir.logger.debug("using null context")
                ctx = IRTLSContext.null_context(ir=ir)
            else:
                ir.logger.debug("seeking named context %s" % ctx_name)
                ctx = ir.get_tls_context(typecast(str, ctx_name))

            if not ctx:
                ir.logger.debug("no named context %s" % ctx_name)
                errors.append("Originate-TLS context %s is not defined" % ctx_name)
            else:
                ir.logger.debug("found context %s" % ctx)

        # TODO: lots of duplication here, need to replace with broken down functions

        if allow_scheme and service.lower().startswith("https://"):
            service = service[len("https://") :]

            originate_tls = True
            name_fields.append("otls")

        elif allow_scheme and service.lower().startswith("http://"):
            service = service[len("http://") :]

            if ctx:
                errors.append(
                    "Originate-TLS context %s being used even though service %s lists HTTP"
                    % (ctx_name, service)
                )
                originate_tls = True
                name_fields.append("otls")
            else:
                originate_tls = False

        elif ctx:
            # No scheme (or schemes are ignored), but we have a context.
            originate_tls = True
            name_fields.append("otls")
            name_fields.append(ctx.name)

        if "://" in service:
            # A scheme we did not strip above: either an unknown scheme, or any
            # scheme at all when allow_scheme is false. Report it and drop it.
            idx = service.index("://")
            scheme = service[0:idx]

            if allow_scheme:
                errors.append(
                    "service %s has unknown scheme %s, assuming %s"
                    % (service, scheme, "HTTPS" if originate_tls else "HTTP")
                )
            else:
                errors.append(
                    "ignoring scheme %s for service %s, since it is being used for a non-HTTP mapping"
                    % (scheme, service)
                )

            service = service[idx + 3 :]

        # XXX Should this be checking originate_tls? Why does it do that?
        if originate_tls and host_rewrite:
            name_fields.append("hr-%s" % host_rewrite)

        # Parse the service as a URL. urllib only recognizes host:port as the
        # netloc when it follows "//", so a dummy scheme is prepended.

        ir.logger.debug("cluster setup: service %s otls %s ctx %s" % (service, originate_tls, ctx))
        p = urllib.parse.urlparse("random://" + service)

        # Is there any junk after the host?

        if p.path or p.params or p.query or p.fragment:
            errors.append(
                "service %s has extra URL components; ignoring everything but the host and port"
                % service
            )

        # p is read-only, so break stuff out.

        hostname = p.hostname
        namespace = parent_ir_resource.namespace
        # Make sure we save the namespace in the cluster name, to prevent clashes with non-fully qualified service resolution
        name_fields.append(namespace)

        # Do we actually have a hostname?
        if not hostname:
            # We don't. That ain't good.
            errors.append(
                "service %s has no hostname and will be ignored; please re-configure" % service
            )
            self.ignore_cluster = True
            hostname = "unknown"

        # ParseResult.port raises ValueError for a non-numeric or out-of-range port.
        try:
            port = p.port
        except ValueError as e:
            errors.append(
                "found invalid port for service {}. Please specify a valid port between 0 and 65535 - {}. Service {} cluster will be ignored, please re-configure".format(
                    service, e, service
                )
            )
            self.ignore_cluster = True
            port = 0

        # If the port is unset, fix it up.
        if not port:
            port = 443 if originate_tls else 80

        # Rebuild the URL with the 'tcp' scheme and our changed info.
        # (Yes, really, TCP. Envoy uses the TLS context to determine whether to originate
        # TLS. Kind of odd, but there we go.)
        url = "tcp://%s:%d" % (hostname, port)

        # Is there a circuit breaker involved here?
        if circuit_breakers:
            for breaker in circuit_breakers:
                name = breaker.get("_name", None)

                if name:
                    name_fields.append(name)
                else:
                    # This is "impossible", but... let it go I guess?
                    errors.append(f"{service}: unvalidated circuit breaker {breaker}!")
                    name_fields.append(f"cbu{unknown_breakers}")
                    unknown_breakers += 1

        # The Ambassador module will always have a load_balancer (which may be None).
        global_load_balancer = ir.ambassador_module.load_balancer

        if not load_balancer:
            load_balancer = global_load_balancer

        self.logger.debug(f"Load balancer for {url} is {load_balancer}")

        enable_endpoints = False

        if self.endpoints_required(load_balancer):
            if not Config.enable_endpoints:
                # Bzzt.
                errors.append(
                    f"{service}: endpoint routing is not enabled, falling back to {global_load_balancer}"
                )
                load_balancer = global_load_balancer
            else:
                enable_endpoints = True

        if load_balancer:
            # key_fields is used only for cluster naming, so the policy here
            # doesn't need to be a real load balancer policy. Note that the
            # overwritten lb_type is also stored in new_args["lb_type"] below.

            lb_type = load_balancer.get("policy", "default")

            key_fields = ["er", lb_type.lower()]

            # XXX Should we really include these things?
            if "header" in load_balancer:
                key_fields.append("hdr")
                key_fields.append(load_balancer["header"])

            if "cookie" in load_balancer:
                key_fields.append("cookie")
                key_fields.append(load_balancer["cookie"]["name"])

            if "source_ip" in load_balancer:
                key_fields.append("srcip")

            name_fields.append("-".join(key_fields))

        # Finally we can construct the cluster name.
        name = "_".join(name_fields)
        name = re.sub(r"[^0-9A-Za-z_]", "_", name)

        # OK. Build our default args.
        #
        # XXX We should really save the hostname and the port, not the URL.

        if enable_ipv4 is None:
            enable_ipv4 = ir.ambassador_module.enable_ipv4
            ir.logger.debug(
                "%s: copying enable_ipv4 %s from Ambassador Module" % (name, enable_ipv4)
            )

        if enable_ipv6 is None:
            enable_ipv6 = ir.ambassador_module.enable_ipv6
            ir.logger.debug(
                "%s: copying enable_ipv6 %s from Ambassador Module" % (name, enable_ipv6)
            )

        new_args: Dict[str, Any] = {
            "type": dns_type,
            "lb_type": lb_type,
            "urls": [url],  # TODO: Should we completely eliminate `urls` in favor of `targets`?
            "load_balancer": load_balancer,
            "keepalive": keepalive,
            "circuit_breakers": circuit_breakers,
            "service": service,
            "enable_ipv4": enable_ipv4,
            "enable_ipv6": enable_ipv6,
            "enable_endpoints": enable_endpoints,
            "connect_timeout_ms": connect_timeout_ms,
            "cluster_idle_timeout_ms": cluster_idle_timeout_ms,
            "cluster_max_connection_lifetime_ms": cluster_max_connection_lifetime_ms,
            "respect_dns_ttl": respect_dns_ttl,
            "health_checks": health_checks,
        }

        # If we have a stats_name, use it. If not, default it to the service to make life
        # easier for people trying to find stats later -- but translate unusual characters
        # to underscores, just in case.

        if stats_name:
            new_args["stats_name"] = stats_name
        else:
            new_args["stats_name"] = re.sub(r"[^0-9A-Za-z_]", "_", service)

        if grpc:
            new_args["grpc"] = True

        if host_rewrite:
            new_args["host_rewrite"] = host_rewrite

        if originate_tls:
            if ctx:
                new_args["tls_context"] = typecast(IRTLSContext, ctx)
            else:
                new_args["tls_context"] = IRTLSContext.null_context(ir=ir)

        if rkey == "-override-":
            rkey = name

        # Stash the resolver, hostname, and port for setup.
        self._resolver = resolver
        self._hostname = hostname
        self._namespace = namespace
        self._port = port
        self._is_sidecar = False

        # NOTE(review): 127.0.0.1:8500 is presumably the Edge Stack sidecar's
        # address (see is_edge_stack_sidecar) -- confirm.
        if self._hostname == "127.0.0.1" and self._port == 8500:
            self._is_sidecar = True

        super().__init__(
            ir=ir,
            aconf=aconf,
            rkey=rkey,
            location=location,
            kind=kind,
            name=name,
            apiVersion=apiVersion,
            **new_args,
        )

        if ctx:
            ctx.referenced_by(self)

        # Errors are posted only now, because post_error needs the fully
        # initialized resource.
        if errors:
            for error in errors:
                ir.post_error(error, resource=self)

    def setup(self, ir: "IR", aconf: Config) -> bool:
        """Resolve this cluster's targets and build its health-check IR.

        :return: False if construction marked the cluster ``ignore_cluster``.
            Otherwise True, even when no targets were found.
        """
        self._cache_key = f"Cluster-{self.name}"

        if self.ignore_cluster:
            return False

        # Resolve our actual targets.
        targets = ir.resolve_targets(
            self, self._resolver, self._hostname, self._namespace, self._port
        )

        self.targets = targets

        if not targets:
            self.ir.logger.debug("accepting cluster with no endpoints: %s" % self.name)

        # If we have health checking config then generate IR for it.
        # NOTE(review): __init__ always passes a "health_checks" key (possibly
        # None), so unless IRResource drops None values this always runs -- confirm.
        if "health_checks" in self:
            self.health_checks = IRHealthChecks(ir, aconf, self.get("health_checks", None))
        return True

    def is_edge_stack_sidecar(self) -> bool:
        """Return True if this cluster is active and targets 127.0.0.1:8500."""
        return self.is_active() and self._is_sidecar

    def endpoints_required(self, load_balancer) -> bool:
        """Return True if ``load_balancer``'s policy needs endpoint routing.

        :param load_balancer: load-balancer config dict, or a falsy value.
        """
        required = False

        if load_balancer:
            lb_policy = load_balancer.get("policy")

            if lb_policy in ["round_robin", "least_request", "ring_hash", "maglev"]:
                self.logger.debug(
                    "Endpoints are required for load balancing policy {}".format(lb_policy)
                )
                required = True

        return required

    def add_url(self, url: str) -> List[str]:
        """Append ``url`` to this cluster's URL list and return the updated list."""
        self.urls.append(url)

        return self.urls

    def merge(self, other: "IRCluster") -> bool:
        """Merge ``other``'s URLs and targets into this cluster.

        :return: False, after posting an error, if any of the compared
            attributes differ between the two clusters. True otherwise.
        """
        # Is this mergeable?

        mismatches = []

        # NOTE(review): "originate_tls" is never stored by this class's __init__,
        # so it compares None == None unless set elsewhere.
        for key in [
            "type",
            "lb_type",
            "host_rewrite",
            "tls_context",
            "originate_tls",
            "grpc",
            "connect_timeout_ms",
            "cluster_idle_timeout_ms",
            "cluster_max_connection_lifetime_ms",
        ]:
            if self.get(key, None) != other.get(key, None):
                mismatches.append(key)

        if mismatches:
            self.post_error(
                RichStatus.fromError(
                    "cannot merge cluster %s: mismatched attributes %s"
                    % (other.name, ", ".join(mismatches))
                )
            )
            return False

        # All good.
        if other.urls:
            self.referenced_by(other)

            for url in other.urls:
                self.add_url(url)

        if other.targets:
            self.referenced_by(other)

            if self.targets is None:
                self.targets = other.targets
            else:
                self.targets = (
                    typecast(List[Dict[str, Union[int, str]]], self.targets) + other.targets
                )

        return True

    def get_resolver(self) -> "IRServiceResolver":
        """Return the service resolver named by this cluster's ``resolver``."""
        return self.ir.resolve_resolver(self, self._resolver)

    def clustermap_entry(self) -> Dict:
        """Return this cluster's clustermap entry, as built by its resolver."""
        return self.get_resolver().clustermap_entry(
            self.ir, self, self._hostname, self._namespace, self._port
        )
View as plain text