1# Copyright 2018 Datawire. All rights reserved.
2#
3# Licensed under the Apache License, Version 2.0 (the "License");
4# you may not use this file except in compliance with the License.
5# You may obtain a copy of the License at
6#
7# http://www.apache.org/licenses/LICENSE-2.0
8#
9# Unless required by applicable law or agreed to in writing, software
10# distributed under the License is distributed on an "AS IS" BASIS,
11# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12# See the License for the specific language governing permissions and
13# limitations under the License
14import hashlib
15import logging
16import os
17from ipaddress import ip_address
18from typing import Any, Callable, Dict, Iterable, List, Optional, Tuple, Union, ValuesView
19from typing import cast as typecast
20
21from ..cache import Cache, NullCache
22from ..config import Config
23from ..constants import Constants
24from ..fetch import ResourceFetcher
25from ..utils import RichStatus, SavedSecret, SecretHandler, SecretInfo, dump_json, parse_bool
26from ..VERSION import Commit, Version
27from .irambassador import IRAmbassador
28from .irauth import IRAuth
29from .irbasemapping import IRBaseMapping
30from .irbasemappinggroup import IRBaseMappingGroup
31from .ircluster import IRCluster
32from .irerrorresponse import IRErrorResponse
33from .irfilter import IRFilter
34from .irhost import HostFactory, IRHost
35from .irhttpmapping import IRHTTPMapping
36from .irlistener import IRListener, ListenerFactory
37from .irlogservice import IRLogService, IRLogServiceFactory
38from .irmappingfactory import MappingFactory
39from .irratelimit import IRRateLimit
40from .irresource import IRResource
41from .irserviceresolver import IRServiceResolver, IRServiceResolverFactory, SvcEndpointSet
42from .irtls import IRAmbassadorTLS, TLSModuleFactory
43from .irtlscontext import IRTLSContext, TLSContextFactory
44from .irtracing import IRTracing
45
46#############################################################################
47## ir.py -- the Ambassador Intermediate Representation (IR)
48##
49## After getting an ambassador.Config, you can create an ambassador.IR. The
50## IR is the basis for everything else: you can use it to configure an Envoy
51## or to run diagnostics.
52##
53## IRs are not meant to be terribly long-lived: if anything at all changes
54## in your world, you should toss the IR and make a new one. In particular,
55## it is _absolutely not OK_ to try to edit the contents of an IR and then
56## re-run any of the generators -- IRs are to be considered immutable once
57## created.
58##
## This goes double in the incremental-reconfiguration world: the IRResources
## that make up the IR all point back to their IR to make life easier on the
## generators, so - to ease the transition to the incremental-reconfiguration
## world - right now we reset the IR pointer when we pull these objects out
## of the cache. In the future this should be fixed, but at present, you can
## really mess up your world if you try to have two active IRs sharing a
## cache.
66
67
# Type of the callable the IR uses as its file checker: it takes a single string
# and returns a bool. IR.__init__ falls back to os.path.isfile when the caller
# doesn't supply one.
IRFileChecker = Callable[[str], bool]
69
70
71class IR:
72 ambassador_module: IRAmbassador
73 ambassador_id: str
74 ambassador_namespace: str
75 ambassador_nodename: str
76 aconf: Config
77 cache: Cache
78 clusters: Dict[str, IRCluster]
79 agent_active: bool
80 agent_service: Optional[str]
81 agent_origination_ctx: Optional[IRTLSContext]
82 edge_stack_allowed: bool
83 file_checker: IRFileChecker
84 filters: List[IRFilter]
85 groups: Dict[str, IRBaseMappingGroup]
86 grpc_services: Dict[str, IRCluster]
87 hosts: Dict[str, IRHost]
88 invalid: List[Dict]
89 invalidate_groups_for: List[str]
90 # The key for listeners is "{socket_protocol}-{bindaddr}-{port}" (see IRListener.bind_to())
91 listeners: Dict[str, IRListener]
92 log_services: Dict[str, IRLogService]
93 ratelimit: Optional[IRRateLimit]
94 redirect_cleartext_from: Optional[int]
95 resolvers: Dict[str, IRServiceResolver]
96 router_config: Dict[str, Any]
97 saved_resources: Dict[str, IRResource]
98 saved_secrets: Dict[str, SavedSecret]
99 secret_handler: SecretHandler
100 secret_root: str
101 sidecar_cluster_name: Optional[str]
102 tls_contexts: Dict[str, IRTLSContext]
103 tls_module: Optional[IRAmbassadorTLS]
104 tracing: Optional[IRTracing]
105
@classmethod
def check_deltas(
    cls, logger: logging.Logger, fetcher: "ResourceFetcher", cache: Optional[Cache] = None
) -> Tuple[str, bool, List[str]]:
    """
    Look over the fetcher's deltas and work out what kind of reconfigure this is.

    :param logger: logger used for debug and error output
    :param fetcher: ResourceFetcher whose deltas should be examined
    :param cache: Cache to invalidate entries in, or None if we're running uncached
    :return: a tuple of (config_type, reset_cache, invalidate_groups_for). config_type
        is "complete" or "incremental"; reset_cache says whether the cache needs to be
        reset; invalidate_groups_for holds the cache keys of every well-formed Mapping
        or TCPMapping delta.
    """

    # Mapping deltas are the messy case: invalidating the Mapping isn't enough, because
    # any cached Group containing it has to go as well, and the Delta doesn't carry what
    # we'd need to compute the Group's cache key. So the Mapping keys are collected here
    # and handed to the IR, which lets the MappingFactory do the right thing.
    invalidate_groups_for: List[str] = []

    # No cache at all? Then this is a complete reconfigure, full stop.
    if cache is None:
        return ("complete", True, invalidate_groups_for)

    # Keys we can invalidate individually. Only Mappings and TCPMappings land here for
    # now, but it's kept separate from invalidate_groups_for because use of the cache
    # is expected to broaden.
    to_invalidate: List[str] = []

    # With a cache, we only need to think about resetting it if something changed.
    reset_cache = len(fetcher.deltas) > 0

    if not fetcher.deltas:
        # Nothing changed, so this is trivially incremental.
        config_type = "incremental"
    else:
        config_type = "complete"
        delta_errors = 0

        for delta in fetcher.deltas:
            logger.debug(f"Delta: {delta}")

            # A Delta's kind must be a string; the assertion keeps mypy content.
            delta_kind = delta["kind"]
            assert isinstance(delta_kind, str)

            # Everything other than Mappings and TCPMappings is ignored for now.
            if delta_kind not in ("Mapping", "TCPMapping"):
                continue

            metadata = typecast(Dict[str, str], delta.get("metadata", {}))
            name = metadata.get("name", "")
            namespace = metadata.get("namespace", "")

            if not (name and namespace):
                # Can't build a cache key without both of these.
                delta_errors += 1
                logger.error(f"Delta object needs name and namespace: {delta}")
                continue

            key = IRBaseMapping.make_cache_key(delta_kind, name, namespace)
            to_invalidate.append(key)

            # Invalidating a Mapping means invalidating its Group, too.
            invalidate_groups_for.append(key)

        # Targeted invalidation only works if we found something to invalidate and
        # every delta we cared about was well-formed. Otherwise the cache gets reset.
        if to_invalidate and not delta_errors:
            reset_cache = False

            for stale_key in to_invalidate:
                logger.debug(f"Delta: invalidating {stale_key}")
                cache.invalidate(stale_key)

        # It's an incremental reconfigure exactly when the cache survives.
        if not reset_cache:
            config_type = "incremental"

    cache.dump("Checking incoming deltas (reset_cache %s)", reset_cache)

    return (config_type, reset_cache, invalidate_groups_for)
201
def __init__(
    self,
    aconf: Config,
    secret_handler: SecretHandler,
    file_checker: Optional[IRFileChecker] = None,
    logger: Optional[logging.Logger] = None,
    invalidate_groups_for: Optional[List[str]] = None,
    cache: Optional[Cache] = None,
    watch_only: bool = False,
) -> None:
    """
    Build an IR from an ambassador Config.

    :param aconf: the Config to build from; it is saved as self.aconf
    :param secret_handler: the SecretHandler used to resolve secrets (required)
    :param file_checker: callable taking a string and returning a bool; defaults to
        os.path.isfile
    :param logger: logger to use; defaults to logging.getLogger("ambassador.ir")
    :param invalidate_groups_for: cache keys saved as self.invalidate_groups_for;
        defaults to an empty list
    :param cache: Cache to use; a NullCache is created if this is None
    :param watch_only: if True, return once the resolvers and the intercept agent
        have been set up, without setting up filters or Mappings and without
        finalizing Listeners or cluster names
    :raises AssertionError: if secret_handler is falsy
    """

    # Initialize the basics...
    self.ambassador_id = Config.ambassador_id
    self.ambassador_namespace = Config.ambassador_namespace
    self.ambassador_nodename = aconf.ambassador_nodename
    self.statsd = aconf.statsd

    # ...then make sure we have a logger...
    self.logger = logger or logging.getLogger("ambassador.ir")

    # ...then make sure we have a cache (which might be a NullCache). This checks
    # against None rather than truthiness, so that a cache the caller handed us is
    # never swapped out just because it happens to evaluate as false.
    self.cache = cache if cache is not None else NullCache(self.logger)
    self.invalidate_groups_for = invalidate_groups_for or []

    # ...then, finally, grab all the invalid objects from the aconf. This is for metrics later.
    self.invalid = aconf.invalid

    self.cache.dump("Fetcher")

    # Remember where our configuration lives, so that the secret_root attribute
    # declared on the class is actually populated.
    self.secret_root = os.environ.get("AMBASSADOR_CONFIG_BASE_DIR", "/ambassador")

    # This setattr business is because mypy seems to think that, since self.file_checker is
    # callable, any mention of self.file_checker must be a function call. Sigh.
    setattr(self, "file_checker", file_checker if file_checker is not None else os.path.isfile)

    # The secret_handler is _required_.
    self.secret_handler = secret_handler

    assert self.secret_handler, "Ambassador.IR requires a SecretHandler at initialization"

    self.logger.debug("IR __init__:")
    self.logger.debug("IR: Version %s built from commit %s", Version, Commit)
    self.logger.debug("IR: AMBASSADOR_ID %s", self.ambassador_id)
    self.logger.debug("IR: Namespace %s", self.ambassador_namespace)
    self.logger.debug("IR: Nodename %s", self.ambassador_nodename)

    # The conditional has to be its own argument: `%` binds tighter than `if/else`, so
    # formatting first and choosing second would log a bare "disabled".
    self.logger.debug(
        "IR: Endpoints %s", "enabled" if Config.enable_endpoints else "disabled"
    )

    # Not every callable has a __name__ (functools.partial objects don't, for one), and
    # a debug message is no reason to blow up.
    checker = getattr(self, "file_checker")
    self.logger.debug("IR: file checker: %s", getattr(checker, "__name__", repr(checker)))
    self.logger.debug("IR: secret handler: %s", type(self.secret_handler).__name__)

    # First up: save the Config object. Its source map may be necessary later.
    self.aconf = aconf

    # Next, we'll want a way to keep track of resources we end up working
    # with. It starts out empty.
    self.saved_resources = {}

    # Also, we have no saved secret stuff yet...
    self.saved_secrets = {}
    self.secret_info: Dict[str, SecretInfo] = {}

    # ...and the initial IR state is empty _except for k8s_status_updates_.
    #
    # Note that we use a map for clusters, not a list -- the reason is that
    # multiple mappings can use the same service, and we don't want multiple
    # clusters.

    self.breakers = {}
    self.clusters = {}
    self.filters = []
    self.groups = {}
    self.grpc_services = {}
    self.hosts = {}
    # self.invalidate_groups_for is handled above.
    # self.k8s_status_updates is handled below.
    self.listeners = {}
    self.log_services = {}
    self.outliers = {}
    self.ratelimit = None
    self.redirect_cleartext_from = None
    self.resolvers = {}
    # self.saved_secrets and self.secret_info are handled above.
    self.services = {}
    self.sidecar_cluster_name = None
    self.tls_contexts = {}
    self.tls_module = None
    self.tracing = None

    # Copy k8s_status_updates from our aconf.
    self.k8s_status_updates = aconf.k8s_status_updates

    # Check on the intercept agent and edge stack. Note that the Edge Stack touchfile is _not_
    # within $AMBASSADOR_CONFIG_BASE_DIR: it stays in /ambassador no matter what.

    self.agent_active = os.environ.get("AGENT_SERVICE", None) is not None
    # Allow an environment variable to state whether we're in Edge Stack. But keep the
    # existing condition as sufficient, so that there is less of a chance of breaking
    # things running in a container with this file present.
    self.edge_stack_allowed = parse_bool(
        os.environ.get("EDGE_STACK", "false")
    ) or os.path.exists("/ambassador/.edge_stack")

    # agent_init() fills these in when the agent is active. Start them at None so that
    # they always exist.
    self.agent_service = None
    self.agent_origination_ctx = None

    # OK, time to get this show on the road. First things first: set up the
    # Ambassador module.
    #
    # The Ambassador module is special: it doesn't do anything in its setup() method, but
    # instead defers all its heavy lifting to its finalize() method. Why? Because we need
    # to create the Ambassador module very early to allow IRResource.lookup() to work, but
    # we need to go pull in secrets and such before we can get all the Ambassador-module
    # stuff fully set up.
    #
    # So. First, create the module.
    self.ambassador_module = typecast(
        IRAmbassador, self.save_resource(IRAmbassador(self, aconf))
    )

    # Next, grab whatever information our aconf has about secrets...
    self.save_secret_info(aconf)

    # ...and then it's on to default TLS stuff, both from the TLS module and from
    # any TLS contexts.
    #
    # XXX This feels like a hack -- shouldn't it be class-wide initialization
    # in TLSModule or TLSContext? So far it's the only place we need anything like
    # this though.

    TLSModuleFactory.load_all(self, aconf)
    TLSContextFactory.load_all(self, aconf)

    # After TLSContexts, grab Listeners...
    ListenerFactory.load_all(self, aconf)

    # ...and, now that we know all of them, sort out which ones get HTTP/3.
    self._enable_http3_on_shared_bindings()

    # ...then grab whatever we know about Hosts...
    HostFactory.load_all(self, aconf)

    # ...then set up for the intercept agent, if that's a thing.
    self.agent_init(aconf)

    # Finally, finalize all the Host stuff (including the !*@#&!* fallback context)...
    HostFactory.finalize(self, aconf)

    # Now we can finalize the Ambassador module, to tidy up secrets et al. We do this
    # here so that secrets and TLS contexts are available.
    if not self.ambassador_module.finalize(self, aconf):
        # Uhoh.
        self.ambassador_module.set_active(False)  # This can't be good.

    _activity_str = "watching" if watch_only else "starting"
    _mode_str = "OSS"

    if self.agent_active:
        _mode_str = "Intercept Agent"
    elif self.edge_stack_allowed:
        _mode_str = "Edge Stack"

    self.logger.debug(f"IR: {_activity_str} {_mode_str}")

    # Next up, initialize our IRServiceResolvers...
    IRServiceResolverFactory.load_all(self, aconf)

    # ...and then we can finalize the agent, if that's a thing.
    self.agent_finalize(aconf)

    # Once here, if we're only watching, we're done.
    if watch_only:
        return

    # REMEMBER FOR SAVING YOU NEED TO CALL save_resource!
    # THIS IS VERY IMPORTANT!

    # Save circuit breakers, outliers, and services.
    self.breakers = aconf.get_config("CircuitBreaker") or {}
    self.outliers = aconf.get_config("OutlierDetection") or {}
    self.services = aconf.get_config("service") or {}

    # Save tracing, ratelimit, and logging settings.
    self.tracing = typecast(IRTracing, self.save_resource(IRTracing(self, aconf)))
    self.ratelimit = typecast(IRRateLimit, self.save_resource(IRRateLimit(self, aconf)))
    IRLogServiceFactory.load_all(self, aconf)

    # After the Ambassador and TLS modules are done, we need to set up the
    # filter chains.
    self._setup_filter_chain(aconf)

    # We would handle other modules here -- but guess what? There aren't any.
    # At this point ambassador, tls, and the deprecated auth module are all there
    # are, and they're handled above. So. At this point go sort out all the Mappings.
    MappingFactory.load_all(self, aconf)

    self.walk_saved_resources(aconf, "add_mappings")

    TLSModuleFactory.finalize(self, aconf)
    MappingFactory.finalize(self, aconf)

    # We can't finalize the listeners until _after_ we have all the TCPMapping
    # information we might need, so that happens here.
    ListenerFactory.finalize(self, aconf)

    # At this point we should know the full set of clusters, so we can generate
    # appropriate envoy names.
    self._assign_envoy_cluster_names()

    # After we have the cluster names fixed up, go finalize filters.
    if self.tracing:
        self.tracing.finalize()

    if self.ratelimit:
        self.ratelimit.finalize()

    for ir_filter in self.filters:
        ir_filter.finalize()

def _enable_http3_on_shared_bindings(self) -> None:
    """
    Mark every HTTP-capable TCP Listener that shares its address and port with a UDP
    Listener as http3_enabled.

    When a TCP/HTTP Listener binds to the same address and port as a UDP/HTTP Listener,
    it gets marked http3_enabled=True. This causes the `alt-svc` header to be
    auto-injected into HTTP responses on the TCP side, which tells clients (browsers,
    curl, libraries) that they can upgrade from TCP to UDP (HTTP/3) connections.

    At first glance this logic would seem to belong in the Listener class, but a UDP
    Listener may be parsed before its "companion" TCP Listener exists. So it has to
    wait until every Listener has been loaded.
    """

    for listener in self.listeners.values():
        if listener.socket_protocol != "UDP":
            continue

        # This matches the `listener.bind_to` key of the companion TCP Listener.
        tcp_listener = self.listeners.get(f"tcp-{listener.bind_address}-{listener.port}")

        # No companion, or a companion that doesn't speak HTTP, means nothing to do.
        if tcp_listener is not None and "HTTP" in tcp_listener.protocolStack:
            tcp_listener.http3_enabled = True

def _setup_filter_chain(self, aconf: Config) -> None:
    """
    Save the IR's filters, in the order in which they must run.

    :param aconf: Config to hand to the filters being created
    """

    # Note that order of the filters matters. Start with CORS, so that preflights
    # will work even for things behind auth.
    self.save_filter(
        IRFilter(ir=self, aconf=aconf, rkey="ir.cors", kind="ir.cors", name="cors", config={})
    )

    # Next is auth...
    self.save_filter(IRAuth(self, aconf))

    # ...then the ratelimit filter...
    if self.ratelimit:
        self.save_filter(self.ratelimit, already_saved=True)

    # ...and the error response filter...
    self.save_filter(
        IRErrorResponse(
            self,
            aconf,
            self.ambassador_module.get("error_response_overrides", None),
            referenced_by_obj=self.ambassador_module,
        )
    )

    # ...and, finally, the barely-configurable router filter.
    router_config = {}

    if self.tracing:
        router_config["start_child_span"] = True

    self.save_filter(
        IRFilter(
            ir=self,
            aconf=aconf,
            rkey="ir.router",
            kind="ir.router",
            name="router",
            type="decoder",
            config=router_config,
        )
    )

def _assign_envoy_cluster_names(self) -> None:
    """
    Give every cluster in self.clusters an envoy_name.

    Envoy cluster name generation happens in two steps. First, we check every
    cluster and set the envoy name to the cluster name if it is short enough.
    If it isn't, we group all of the long cluster names by a common shortened
    name, then number the members of each group.

    This ensures that:
    - All IRCluster objects have an envoy_name
    - All envoy_name fields are valid cluster names, ie: they are short enough
    """

    collisions: Dict[str, List[str]] = {}

    for name in sorted(self.clusters):
        if len(name) <= 60:
            # Short enough, set the envoy name to the cluster name.
            self.clusters[name]["envoy_name"] = name
            continue

        # Too long. Gather this cluster by its shortened name and normalize
        # its name below.
        h = hashlib.new("sha1")
        h.update(name.encode("utf-8"))
        hd = h.hexdigest()[0:16].upper()

        short_name = name[0:40] + "-" + hd

        self.logger.debug(f"COLLISION: compress {name} to {short_name}")
        collisions.setdefault(short_name, []).append(name)

    for short_name in sorted(collisions):
        for i, name in enumerate(sorted(collisions[short_name])):
            mangled_name = "%s-%d" % (short_name, i)

            cluster = self.clusters[name]
            self.logger.debug("COLLISION: mangle %s => %s", name, mangled_name)

            # We must not modify a cluster's name (nor its rkey, for that matter)
            # because our object caching implementation depends on stable object
            # names and keys. If we were to update it, we could lose track of an
            # existing object and accidentally create a duplicate (tested in
            # python/tests/test_cache.py test_long_cluster_1).
            #
            # Instead, the resulting IR must set envoy_name to the mangled name, which
            # is guaranteed to be valid in envoy configuration.
            #
            # An important consequence of this choice is that we must never read back
            # envoy config to create IRCluster config, since the cluster names are
            # not necessarily the same. This is currently fine, since we never use
            # envoy config as a source of truth - we leave that to the cluster annotations
            # and CRDs.
            #
            # Another important consideration is that when the cache is active, we need
            # to shred any cached cluster with this mangled_name, because the mangled_name
            # can change as new clusters appear! This is obviously not ideal.
            #
            # XXX This is doubly a hack because it's duplicating this magic format from
            # v2cluster.py and v3cluster.py.
            self.cache.invalidate(f"V2-{cluster.cache_key}")
            self.cache.invalidate(f"V3-{cluster.cache_key}")
            self.cache.dump(
                "Invalidate clusters V2-%s, V3-%s", cluster.cache_key, cluster.cache_key
            )

            # OK. Finally, we can update the envoy_name.
            cluster["envoy_name"] = mangled_name
            self.logger.debug("COLLISION: envoy_name %s", cluster["envoy_name"])
550
551 # XXX Brutal hackery here! Probably this is a clue that Config and IR and such should have
552 # a common container that can hold errors.
def post_error(
    self,
    rc: Union[str, RichStatus],
    resource: Optional[IRResource] = None,
    rkey: Optional[str] = None,
    log_level=logging.INFO,
):
    """
    Post an error by handing it straight to our Config's post_error.

    :param rc: the error, as a string or a RichStatus
    :param resource: optional IRResource passed along with the error
    :param rkey: optional resource key passed along with the error
    :param log_level: log level passed along with the error
    """
    poster = self.aconf.post_error
    poster(rc, resource=resource, rkey=rkey, log_level=log_level)
561
def agent_init(self, aconf: Config) -> None:
    """
    Set ourselves up as the Intercept Agent, if that's what we're doing.

    This synthesizes a Host named "agent-host" and, if AGENT_TLS_ORIG_SECRET is set,
    a TLSContext named "agent-origination-context".

    (This method really shouldn't exist: the agent should be configured with CRDs
    like anything else. That's too painful without `edgectl inject-agent`, though.)

    :param aconf: Config to work with
    :return: None
    """

    # Intercept stuff is an Edge Stack thing.
    if not self.edge_stack_allowed or not self.agent_active:
        self.logger.debug("Intercept agent not active, skipping initialization")
        return

    self.agent_service = os.environ.get("AGENT_SERVICE")

    if self.agent_service is None:
        # This is technically impossible, but whatever.
        self.logger.info("Intercept agent active but no AGENT_SERVICE? skipping initialization")
        self.agent_active = False
        return

    self.logger.debug(f"Intercept agent active for {self.agent_service}, initializing")

    # The Host we create either terminates TLS or does cleartext. Either way, no ACME,
    # and additionalPort is -1 so that we don't grab 8080 in the TLS case.
    insecure_policy: Dict[str, Any] = {"additionalPort": -1}

    host_args: Dict[str, Any] = {
        "hostname": "*",
        "selector": {"matchLabels": {"intercept": self.agent_service}},
        "acmeProvider": {"authority": "none"},
        "requestPolicy": {"insecure": insecure_policy},
    }

    # Have they asked us to terminate TLS?
    termination_secret = os.environ.get("AGENT_TLS_TERM_SECRET")

    if termination_secret:
        # Yup.
        host_args["tlsSecret"] = {"name": termination_secret}
    else:
        # No termination secret, so route cleartext.
        insecure_policy["action"] = "Route"

    agent_module = self.ambassador_module

    host = IRHost(
        self,
        aconf,
        rkey=agent_module.rkey,
        location=agent_module.location,
        name="agent-host",
        **host_args,
    )

    if not host.is_active():
        self.logger.debug(f"Intercept agent: not saving inactive host {host}")
    else:
        host.referenced_by(agent_module)
        host.sourced_by(agent_module)

        self.logger.debug(f"Intercept agent: saving host {host}")
        self.save_host(host)

    # How about originating TLS?
    origination_secret = os.environ.get("AGENT_TLS_ORIG_SECRET")

    if not origination_secret:
        return

    # Synthesize a TLSContext for origination.
    #
    # XXX What if they already have a context with this name?
    ctx = IRTLSContext(
        self,
        aconf,
        rkey=agent_module.rkey,
        location=agent_module.location,
        name="agent-origination-context",
        secret=origination_secret,
    )

    ctx.referenced_by(agent_module)
    self.save_tls_context(ctx)

    self.logger.debug(f"Intercept agent: saving origination TLSContext {ctx.name}")

    self.agent_origination_ctx = ctx
653
def agent_finalize(self, aconf: Config) -> None:
    """
    Finish setting up the Intercept Agent, if we're doing that.

    This sets the Ambassador module's service_port (from AGENT_LISTEN_PORT, or
    Constants.SERVICE_PORT_AGENT if that's unset), then adds a fallback Mapping named
    "agent-fallback-mapping" that routes "/" to 127.0.0.1 on AGENT_PORT.

    If AGENT_LISTEN_PORT or AGENT_PORT isn't a valid integer, or if AGENT_PORT isn't
    set at all, an error is posted and the agent is marked inactive.

    :param aconf: Config to work with
    :return: None
    """

    if not (self.edge_stack_allowed and self.agent_active):
        self.logger.debug("Intercept agent not active, skipping finalization")
        return

    # We don't want to listen on the default AES ports (8080, 8443) as that is likely to
    # conflict with the user's application running in the same Pod.
    agent_listen_port_str = os.environ.get("AGENT_LISTEN_PORT", None)

    # NOTE(review): this is handed to the Mapping as a string, not a bool, so the default
    # "false" is a truthy value. Confirm how IRHTTPMapping interprets `grpc` before
    # changing it.
    agent_grpc = os.environ.get("AGENT_ENABLE_GRPC", "false")

    if agent_listen_port_str is None:
        self.ambassador_module.service_port = Constants.SERVICE_PORT_AGENT
    else:
        try:
            self.ambassador_module.service_port = int(agent_listen_port_str)
        except ValueError:
            self.post_error(f"Intercept agent listen port {agent_listen_port_str} is not valid")
            self.agent_active = False
            return

    agent_port_str = os.environ.get("AGENT_PORT", None)

    if agent_port_str is None:
        self.post_error("Intercept agent requires both AGENT_SERVICE and AGENT_PORT to be set")
        self.agent_active = False
        return

    # Only a malformed number should count as "not valid". Anything else that goes
    # wrong here shouldn't be silently swallowed.
    try:
        agent_port = int(agent_port_str)
    except ValueError:
        self.post_error(f"Intercept agent port {agent_port_str} is not valid")
        self.agent_active = False
        return

    # XXX OMG this is a crock. Don't use precedence -1000000 for this, because otherwise Edge
    # Stack might decide it's the Edge Policy Console fallback mapping and force it to be
    # routed insecure. !*@&#*!@&#* We need per-mapping security settings.
    #
    # XXX What if they already have a mapping with this name?

    ctx_name = None

    if self.agent_origination_ctx:
        ctx_name = self.agent_origination_ctx.name

    mapping = IRHTTPMapping(
        self,
        aconf,
        rkey=self.ambassador_module.rkey,
        location=self.ambassador_module.location,
        name="agent-fallback-mapping",
        metadata_labels={"ambassador_diag_class": "private"},
        prefix="/",
        rewrite="/",
        service=f"127.0.0.1:{agent_port}",
        grpc=agent_grpc,
        # Making sure we don't have shorter timeouts on intercepts than the original Mapping
        timeout_ms=60000,
        idle_timeout_ms=60000,
        tls=ctx_name,
        precedence=-999999,
    )  # No, really. See comment above.

    mapping.referenced_by(self.ambassador_module)
    self.add_mapping(aconf, mapping)
726
def cache_fetch(self, key: str) -> Optional[IRResource]:
    """
    Look a key up in our cache, and claim whatever comes back for this IR.

    The cache can easily outlive the IR, so odds are good that a cached object
    was originally part of some other IR. Its IR pointer gets reset to us before
    we hand it back -- which, yes, means that sharing one cache between several
    live IRs is a Very Bad Idea.

    :param key: cache key to look up
    :return: the cached IRResource, or None if the cache gave us nothing
    """

    cached = self.cache[key]

    # Nothing there? Nothing to fix up.
    if cached is None:
        return None

    # By definition, anything the IR layer pulls from the cache must be
    # an IRResource...
    assert isinstance(cached, IRResource)

    # ...which means it has an IR pointer for us to reset.
    cached.ir = self

    return cached
750
def cache_add(self, rsrc: IRResource) -> None:
    """
    Hand an IRResource to our cache's add().

    This exists mostly so that mypy can check that whatever the IR layer caches
    is an IRResource.

    :param rsrc: the IRResource to cache
    """
    cache = self.cache
    cache.add(rsrc)
757
def cache_link(self, owner: IRResource, owned: IRResource) -> None:
    """
    Hand a pair of IRResources to our cache's link().

    This exists mostly so that mypy can check that whatever the IR layer links
    is an IRResource.

    :param owner: the owning IRResource
    :param owned: the IRResource being owned
    """
    cache = self.cache
    cache.link(owner, owned)
764
def save_resource(self, resource: IRResource) -> IRResource:
    """
    Remember a resource in saved_resources, keyed by its rkey, if it's active.

    :param resource: the IRResource to save
    :return: the same resource, whether or not it was saved
    """
    # Inactive resources are handed back untouched.
    if not resource.is_active():
        return resource

    self.saved_resources[resource.rkey] = resource
    return resource
770
def save_host(self, host: IRHost) -> None:
    """
    Save a Host under its name, unless we already have a Host by that name.

    A duplicate posts an error and leaves the existing definition in place.

    :param host: the IRHost to save
    """
    already_saved = self.hosts.get(host.name)

    if already_saved:
        # First definition wins.
        self.post_error(
            "Duplicate Host %s; keeping definition from %s" % (host.name, already_saved.location)
        )
        return

    self.hosts[host.name] = host
783
784 # Get saved hosts.
def get_hosts(self) -> List[IRHost]:
    """Return a new list holding every saved Host."""
    return [*self.hosts.values()]
787
788 # Save secrets from our aconf.
def save_secret_info(self, aconf):
    """
    Copy secret information out of the aconf's "secrets" config into self.secret_info.

    Each saved entry is keyed as "{name}.{namespace}". Secrets with no public half
    are skipped.

    :param aconf: Config to pull secrets from
    """
    secrets = aconf.get_config("secrets") or {}
    self.logger.debug(f"IR: aconf has secrets: {secrets.keys()}")

    # Any one of these being set counts as having a public half. 'user_key' is here
    # because ACME private keys use it, and 'crl_pem' because CRL secrets use it;
    # neither should generate errors.
    public_half_keys = ("tls_crt", "cert-chain_pem", "user_key", "crl_pem")

    for secret_key, aconf_secret in secrets.items():
        if not any(aconf_secret.get(field) for field in public_half_keys):
            self.logger.debug(
                "not saving secret_info from %s because there is no public half", secret_key
            )
            continue

        info = SecretInfo.from_aconf_secret(aconf_secret)
        name, namespace = info.name, info.namespace

        self.logger.debug('saving "%s.%s" (from %s) in secret_info', name, namespace, secret_key)
        self.secret_info[f"{name}.{namespace}"] = info
821
def save_tls_context(self, ctx: IRTLSContext) -> None:
    """
    Save a TLSContext under its name, if it's acceptable.

    A context is rejected, with an error posted, if we already have a context by
    that name, or if its redirect_cleartext_from differs from the one an earlier
    context set for the whole IR.

    :param ctx: the IRTLSContext to save
    """
    acceptable = True

    previous = self.tls_contexts.get(ctx.name)

    if previous:
        self.post_error(
            "Duplicate TLSContext %s; keeping definition from %s" % (ctx.name, previous.location)
        )
        acceptable = False

    if ctx.get("redirect_cleartext_from") is not None:
        redirect_port = ctx.redirect_cleartext_from

        if self.redirect_cleartext_from is None:
            # First context to ask for a redirect gets to pick the port.
            self.redirect_cleartext_from = redirect_port
        elif self.redirect_cleartext_from != redirect_port:
            self.post_error(
                "TLSContext: %s; configured conflicting redirect_from port: %s"
                % (ctx.name, redirect_port)
            )
            acceptable = False

    if acceptable:
        self.tls_contexts[ctx.name] = ctx
846
def get_resolver(self, name: str) -> Optional[IRServiceResolver]:
    """Return the saved resolver called `name`, or None if there isn't one."""
    return self.resolvers.get(name)
849
def add_resolver(self, resolver: IRServiceResolver) -> None:
    """Save a resolver under its name, replacing any resolver already saved by that name."""
    resolver_name = resolver.name
    self.resolvers[resolver_name] = resolver
852
def has_tls_context(self, name: str) -> bool:
    """Return True if get_tls_context() gives back something truthy for `name`."""
    found = self.get_tls_context(name)
    return True if found else False
855
def get_tls_context(self, name: str) -> Optional[IRTLSContext]:
    """Return the saved TLSContext called `name`, or None if there isn't one."""
    return self.tls_contexts.get(name)
858
def get_tls_contexts(self) -> ValuesView[IRTLSContext]:
    """Return a live view over every saved TLSContext."""
    contexts = self.tls_contexts
    return contexts.values()
861
def resolve_secret(self, resource: IRResource, secret_name: str, namespace: str):
    """
    Resolve a secret to a SavedSecret, consulting and filling the local cache.

    Lookup order:
      1. self.saved_secrets (already-resolved secrets),
      2. self.secret_info (secret data recorded earlier),
      3. the secret handler's load_secret().

    Both caches are keyed by "<secret_name>.<namespace>". The result is stored
    back under that same key, so a repeat lookup hits the cache and secrets
    with the same name in different namespaces stay separate.

    :param resource: the resource that needs the secret; passed to the secret handler
    :param secret_name: name of the secret
    :param namespace: namespace of the secret
    :return: the handler's cached secret, or a SavedSecret with no contents
             when the secret cannot be found
    """
    ss_key = f"{secret_name}.{namespace}"

    # OK. Do we already have a SavedSecret for this?
    ss = self.saved_secrets.get(ss_key, None)

    if ss:
        # Done. Tell the handler it is still in use and return it.
        self.logger.debug(f"resolve_secret {ss_key}: using cached SavedSecret")
        self.secret_handler.still_needed(resource, secret_name, namespace)
        return ss

    # No SavedSecret: do we have a secret_info for it?
    secret_info = self.secret_info.get(ss_key, None)

    if secret_info:
        self.logger.debug(f"resolve_secret {ss_key}: found secret_info")
        self.secret_handler.still_needed(resource, secret_name, namespace)
    else:
        # No secret_info, so ask the secret_handler to find us one.
        self.logger.debug(f"resolve_secret {ss_key}: no secret_info, asking handler to load")
        secret_info = self.secret_handler.load_secret(resource, secret_name, namespace)

    if not secret_info:
        self.logger.error(f"Secret {ss_key} unknown")

        # Hand back a placeholder so callers always get a SavedSecret.
        ss = SavedSecret(secret_name, namespace, None, None, None, None, None)
    else:
        self.logger.debug(f"resolve_secret {ss_key}: found secret, asking handler to cache")

        # OK, we got a secret_info. Cache that using the secret handler.
        ss = self.secret_handler.cache_secret(resource, secret_info)

    # Save this for next time, under the same key used for the lookup above.
    self.saved_secrets[ss_key] = ss
    return ss
900
def resolve_resolver(
    self, cluster: IRCluster, resolver_name: Optional[str]
) -> IRServiceResolver:
    """
    Pick the resolver to use, defaulting to the Ambassador module's choice.

    When `resolver_name` is empty, the Ambassador module's "resolver" setting
    is used, falling back to "kubernetes-service". The named resolver must
    already be saved; this is asserted.
    """
    chosen_name = resolver_name or self.ambassador_module.get(
        "resolver", "kubernetes-service"
    )

    # The cast is safe: the Ambassador module's resolver must be a string, so
    # every path to this point leaves us holding a string.
    found = self.get_resolver(typecast(str, chosen_name))
    assert found is not None
    return found
913
def resolve_targets(
    self,
    cluster: IRCluster,
    resolver_name: Optional[str],
    hostname: str,
    namespace: str,
    port: int,
) -> Optional[SvcEndpointSet]:
    """
    Work out the endpoints a cluster should talk to.

    A hostname that parses as an IP address is returned directly as a single
    "IPaddr" target. Anything else is handed to the resolver selected by
    resolve_resolver(). Returns None, after posting an error, if no resolver
    comes back.
    """
    try:
        ip_address(hostname)
    except ValueError:
        # Not an IP literal; fall through to the resolver.
        pass
    else:
        # Already an IP address, great.
        self.logger.debug(f"cluster {cluster.name}: {hostname} is already an IP address")

        return [{"ip": hostname, "port": port, "target_kind": "IPaddr"}]

    chosen = self.resolve_resolver(cluster, resolver_name)

    # It should not be possible for the resolver to be unset here.
    if not chosen:
        self.post_error(
            f"cluster {cluster.name} has invalid resolver {resolver_name}?", rkey=cluster.rkey
        )
        return None

    # The mechanics of resolution, load balancer policy and so on are the
    # resolver's business: just ask it for the target list.
    return chosen.resolve(self, cluster, hostname, namespace, port)
949
def save_filter(self, resource: IRFilter, already_saved=False) -> None:
    """
    Append an active filter to self.filters; inactive filters are ignored.

    Unless `already_saved` is true, the filter is first passed through
    save_resource() and the value that returns is what gets appended.
    """
    if not resource.is_active():
        return

    if not already_saved:
        saved = self.save_resource(resource)
        resource = typecast(IRFilter, saved)

    self.filters.append(resource)
956
def walk_saved_resources(self, aconf, method_name):
    """Call the method named `method_name` on every saved resource, passing (self, aconf)."""
    for saved in self.saved_resources.values():
        handler = getattr(saved, method_name)
        handler(self, aconf)
960
def save_listener(self, listener: IRListener) -> None:
    """
    Save a listener under its bind_to() key.

    When a listener is already saved under that key, the existing one is
    kept and an error describing the duplicate is posted.
    """
    key = listener.bind_to()
    holder = self.listeners.get(key, None)

    if holder:
        self.post_error(
            f"Duplicate listener {listener.name} on {listener.socket_protocol.lower()}://{listener.bind_address}:{listener.port};"
            f" keeping definition from {holder.location}"
        )
        return

    self.listeners[key] = listener
976
def add_mapping(self, aconf: Config, mapping: IRBaseMapping) -> Optional[IRBaseMappingGroup]:
    """
    Attach an active mapping to its mapping group and return that group.

    The group is taken from self.groups when present (and the mapping is
    added to it). Otherwise it is fetched from the cache, or synthesized
    from the mapping, and then recorded in self.groups. The mapping and
    group are then added to the cache and linked.

    Returns None, after check_status(), when the mapping is not active.
    """
    mapping.check_status()

    if not mapping.is_active():
        return None

    if mapping.group_id in self.groups:
        self.logger.debug(f"IR: already have group for {mapping.name}")
        group = self.groups[mapping.group_id]
        group.add_mapping(aconf, mapping)
    else:
        # Not known locally: is this group in our external cache?
        cache_key = mapping.group_class().key_for_id(mapping.group_id)
        group = self.cache_fetch(cache_key)

        if group is None:
            self.logger.debug(f"IR: synthesizing group for {mapping.name}")
            synthesized_name = "GROUP: %s" % mapping.name
            make_group = mapping.group_class()
            group = make_group(
                ir=self,
                aconf=aconf,
                location=mapping.location,
                name=synthesized_name,
                mapping=mapping,
            )
        else:
            self.logger.debug(f"IR: got group from cache for {mapping.name}")

        # Group cannot be anything but a non-None IRBaseMappingGroup at this
        # point; the assert is there so that mypy understands it too.
        assert isinstance(group, IRBaseMappingGroup)  # for mypy
        self.groups[group.group_id] = group

    self.cache_add(mapping)
    self.cache_add(group)
    self.cache_link(mapping, group)

    return group
1016
def ordered_groups(self) -> Iterable[IRBaseMappingGroup]:
    """
    Iterate over the mapping groups from highest "group_weight" to lowest.

    The groups are sorted ascending and then walked backwards, so groups of
    equal weight come out in the reverse of their order in self.groups.
    """
    ascending = sorted(self.groups.values(), key=lambda grp: grp["group_weight"])
    return reversed(ascending)
1019
def has_cluster(self, name: str) -> bool:
    """Report whether a cluster called `name` has been saved."""
    known = self.clusters
    return name in known
1022
def get_cluster(self, name: str) -> Optional[IRCluster]:
    """Return the saved cluster called `name`, or None if there is none."""
    cluster = self.clusters.get(name)
    return cluster
1025
def add_cluster(self, cluster: IRCluster) -> IRCluster:
    """
    Save a cluster unless one with the same name exists, then return the saved one.

    A newly saved cluster that reports is_edge_stack_sidecar() also has its
    name recorded in self.sidecar_cluster_name. When the name is already
    taken, the existing cluster is left in place and returned.
    """
    if self.has_cluster(cluster.name):
        self.logger.debug(
            "IR: add_cluster: extant cluster %s (%s)"
            % (cluster.name, cluster.get("envoy_name", "-"))
        )
    else:
        self.logger.debug("IR: add_cluster: new cluster %s" % cluster.name)
        self.clusters[cluster.name] = cluster

        if cluster.is_edge_stack_sidecar():
            self.logger.debug(f"IR: cluster {cluster.name} is the sidecar cluster name")
            self.sidecar_cluster_name = cluster.name

    # Either way, hand back whatever is stored under this name.
    return self.clusters[cluster.name]
1041
def merge_cluster(self, cluster: IRCluster) -> bool:
    """
    Merge a cluster into the saved cluster of the same name, or add it if new.

    Returns the result of the existing cluster's merge(), or True when the
    cluster was simply added.
    """
    extant = self.get_cluster(cluster.name)

    if not extant:
        self.add_cluster(cluster)
        return True

    return extant.merge(cluster)
1050
def has_grpc_service(self, name: str) -> bool:
    """Report whether a gRPC service called `name` has been registered."""
    registered = self.grpc_services
    return name in registered
1053
def add_grpc_service(self, name: str, cluster: IRCluster) -> IRCluster:
    """
    Register `cluster` as the gRPC service `name` and return the registered cluster.

    If the service is already registered, nothing changes and the existing
    cluster is returned. Otherwise the cluster is also saved in
    self.clusters when no cluster of that name is there yet.
    """
    if self.has_grpc_service(name):
        return self.grpc_services[name]

    if not self.has_cluster(cluster.name):
        self.clusters[cluster.name] = cluster

    self.grpc_services[name] = cluster
    return self.grpc_services[name]
1062
def as_dict(self) -> Dict[str, Any]:
    """
    Render the IR as a plain dictionary.

    Listeners are emitted sorted by key and groups in ordered_groups() order.
    The "log_services", "tracing" and "ratelimit" entries appear only when
    the corresponding attribute is truthy.
    """
    rendered = {
        "identity": {
            "ambassador_id": self.ambassador_id,
            "ambassador_namespace": self.ambassador_namespace,
            "ambassador_nodename": self.ambassador_nodename,
        },
        "ambassador": self.ambassador_module.as_dict(),
    }

    rendered["clusters"] = {name: clus.as_dict() for name, clus in self.clusters.items()}
    rendered["grpc_services"] = {
        name: clus.as_dict() for name, clus in self.grpc_services.items()
    }
    rendered["hosts"] = [host.as_dict() for host in self.hosts.values()]
    rendered["listeners"] = [self.listeners[key].as_dict() for key in sorted(self.listeners)]
    rendered["filters"] = [filt.as_dict() for filt in self.filters]
    rendered["groups"] = [group.as_dict() for group in self.ordered_groups()]
    rendered["tls_contexts"] = [ctx.as_dict() for ctx in self.tls_contexts.values()]
    rendered["services"] = self.services
    rendered["k8s_status_updates"] = self.k8s_status_updates

    # Optional sections: only present when configured.
    if self.log_services:
        rendered["log_services"] = [svc.as_dict() for svc in self.log_services.values()]

    if self.tracing:
        rendered["tracing"] = self.tracing.as_dict()

    if self.ratelimit:
        rendered["ratelimit"] = self.ratelimit.as_dict()

    return rendered
1096
def as_json(self) -> str:
    """Serialize as_dict() with dump_json(), asking for pretty output."""
    payload = self.as_dict()
    return dump_json(payload, pretty=True)
1099
def features(self) -> Dict[str, Any]:
    """
    Summarize which features this IR's configuration is using.

    Returns a flat dictionary of booleans, counters and a few raw settings:
    TLS usage, Ambassador module flags, cluster and endpoint routing counts,
    auth / rate limit / tracing usage, mapping-group statistics, listener
    and host counts, and per-kind counts of invalid objects.
    """
    od: Dict[str, Union[bool, int, Optional[str], Dict]] = {}

    if self.aconf.helm_chart:
        od["helm_chart"] = self.aconf.helm_chart
    od["managed_by"] = self.aconf.pod_labels.get("app.kubernetes.io/managed-by", "")

    # ---- TLS contexts ----
    tls_termination_count = 0  # TLS termination contexts
    tls_origination_count = 0  # TLS origination contexts
    tls_crl_file_count = 0  # CRL files used

    using_tls_module = False
    using_tls_contexts = False

    for ctx in self.get_tls_contexts():
        if ctx:
            secret_info = ctx.get("secret_info", {})

            if secret_info:
                using_tls_contexts = True

                if secret_info.get("certificate_chain_file", None):
                    tls_termination_count += 1

                if secret_info.get("cacert_chain_file", None):
                    tls_origination_count += 1

                if secret_info.get("crl_file", None):
                    tls_crl_file_count += 1

            if ctx.get("_legacy", False):
                using_tls_module = True

    od["tls_using_module"] = using_tls_module
    od["tls_using_contexts"] = using_tls_contexts
    od["tls_termination_count"] = tls_termination_count
    od["tls_origination_count"] = tls_origination_count
    od["tls_crl_file_count"] = tls_crl_file_count

    # ---- Ambassador module settings ----
    for key in ["diagnostics", "liveness_probe", "readiness_probe", "statsd"]:
        od[key] = self.ambassador_module.get(key, {}).get("enabled", False)

    for key in [
        "use_proxy_proto",
        "use_remote_address",
        "x_forwarded_proto_redirect",
        "enable_http10",
        "add_linkerd_headers",
        "use_ambassador_namespace_for_service_resolution",
        "proper_case",
        "preserve_external_request_id",
    ]:
        od[key] = self.ambassador_module.get(key, False)

    od["service_resource_total"] = len(self.services.keys())

    od["listener_idle_timeout_ms"] = self.ambassador_module.get(
        "listener_idle_timeout_ms", None
    )
    od["headers_with_underscores_action"] = self.ambassador_module.get(
        "headers_with_underscores_action", None
    )
    od["max_request_headers_kb"] = self.ambassador_module.get("max_request_headers_kb", None)

    od["server_name"] = bool(self.ambassador_module.server_name != "envoy")

    od["custom_ambassador_id"] = bool(self.ambassador_id != "default")

    od["buffer_limit_bytes"] = self.ambassador_module.get("buffer_limit_bytes", None)

    # The expected default port depends on whether anything terminates TLS.
    default_port = (
        Constants.SERVICE_PORT_HTTPS if tls_termination_count else Constants.SERVICE_PORT_HTTP
    )

    od["custom_listener_port"] = bool(self.ambassador_module.service_port != default_port)

    od["allow_chunked_length"] = self.ambassador_module.get("allow_chunked_length", None)

    # ---- Clusters and endpoints ----
    cluster_count = 0
    cluster_grpc_count = 0  # clusters using GRPC upstream
    cluster_http_count = 0  # clusters using HTTP or HTTPS upstream
    cluster_tls_count = 0  # clusters using TLS origination

    cluster_routing_kube_count = 0  # clusters routing using kube
    cluster_routing_envoy_rr_count = 0  # clusters routing using envoy round robin
    cluster_routing_envoy_rh_count = 0  # clusters routing using envoy ring hash
    cluster_routing_envoy_maglev_count = 0  # clusters routing using envoy maglev
    cluster_routing_envoy_lr_count = 0  # clusters routing using envoy least request

    endpoint_grpc_count = 0  # endpoints using GRPC upstream
    endpoint_http_count = 0  # endpoints using HTTP/HTTPS upstream
    endpoint_tls_count = 0  # endpoints using TLS origination

    endpoint_routing_kube_count = 0  # endpoints Kube is routing to
    endpoint_routing_envoy_rr_count = 0  # endpoints Envoy round robin is routing to
    endpoint_routing_envoy_rh_count = 0  # endpoints Envoy ring hash is routing to
    endpoint_routing_envoy_maglev_count = 0  # endpoints Envoy maglev is routing to
    endpoint_routing_envoy_lr_count = 0  # endpoints Envoy least request is routing to

    for cluster in self.clusters.values():
        cluster_count += 1
        using_tls = False
        using_http = False
        using_grpc = False

        # Without endpoint routing the cluster counts as "kube" routed;
        # with it, the cluster's lb_type (default round_robin) decides.
        lb_type = "kube"

        if cluster.get("enable_endpoints", False):
            lb_type = cluster.get("lb_type", "round_robin")

        if lb_type == "kube":
            cluster_routing_kube_count += 1
        elif lb_type == "ring_hash":
            cluster_routing_envoy_rh_count += 1
        elif lb_type == "maglev":
            cluster_routing_envoy_maglev_count += 1
        elif lb_type == "least_request":
            cluster_routing_envoy_lr_count += 1
        else:
            cluster_routing_envoy_rr_count += 1

        if cluster.get("tls_context", None):
            using_tls = True
            cluster_tls_count += 1

        if cluster.get("grpc", False):
            using_grpc = True
            cluster_grpc_count += 1
        else:
            using_http = True
            cluster_http_count += 1

        cluster_endpoints = cluster.urls if (lb_type == "kube") else cluster.get("targets", [])

        # Paranoia, really.
        if not cluster_endpoints:
            cluster_endpoints = []

        num_endpoints = len(cluster_endpoints)

        if using_tls:
            endpoint_tls_count += num_endpoints

        if using_http:
            endpoint_http_count += num_endpoints

        if using_grpc:
            endpoint_grpc_count += num_endpoints

        if lb_type == "kube":
            endpoint_routing_kube_count += num_endpoints
        elif lb_type == "ring_hash":
            endpoint_routing_envoy_rh_count += num_endpoints
        elif lb_type == "maglev":
            endpoint_routing_envoy_maglev_count += num_endpoints
        elif lb_type == "least_request":
            endpoint_routing_envoy_lr_count += num_endpoints
        else:
            endpoint_routing_envoy_rr_count += num_endpoints

    od["cluster_count"] = cluster_count
    od["cluster_grpc_count"] = cluster_grpc_count
    od["cluster_http_count"] = cluster_http_count
    od["cluster_tls_count"] = cluster_tls_count
    od["cluster_routing_kube_count"] = cluster_routing_kube_count
    od["cluster_routing_envoy_rr_count"] = cluster_routing_envoy_rr_count
    od["cluster_routing_envoy_rh_count"] = cluster_routing_envoy_rh_count
    od["cluster_routing_envoy_maglev_count"] = cluster_routing_envoy_maglev_count
    od["cluster_routing_envoy_lr_count"] = cluster_routing_envoy_lr_count

    od["endpoint_routing"] = Config.enable_endpoints

    od["endpoint_grpc_count"] = endpoint_grpc_count
    od["endpoint_http_count"] = endpoint_http_count
    od["endpoint_tls_count"] = endpoint_tls_count
    od["endpoint_routing_kube_count"] = endpoint_routing_kube_count
    od["endpoint_routing_envoy_rr_count"] = endpoint_routing_envoy_rr_count
    od["endpoint_routing_envoy_rh_count"] = endpoint_routing_envoy_rh_count
    od["endpoint_routing_envoy_maglev_count"] = endpoint_routing_envoy_maglev_count
    od["endpoint_routing_envoy_lr_count"] = endpoint_routing_envoy_lr_count

    od["cluster_ingress_count"] = 0  # Provided for backward compatibility only.
    od["knative_ingress_count"] = self.aconf.get_count("knative_ingress")

    od["k8s_ingress_count"] = self.aconf.get_count("k8s_ingress")
    od["k8s_ingress_class_count"] = self.aconf.get_count("k8s_ingress_class")

    # ---- Auth, rate limiting, tracing ----
    extauth = False
    extauth_proto: Optional[str] = None
    extauth_allow_body = False
    extauth_host_count = 0

    ratelimit = False
    ratelimit_data_plane_proto = False
    ratelimit_custom_domain = False

    tracing = False
    tracing_driver: Optional[str] = None

    # If several IRAuth filters are present, the last one's settings are reported.
    for filt in self.filters:
        if filt.kind == "IRAuth":
            extauth = True
            extauth_proto = filt.get("proto", "http")
            extauth_allow_body = filt.get("allow_request_body", False)
            extauth_host_count = len(filt.hosts.keys())

    if self.ratelimit:
        ratelimit = True
        ratelimit_data_plane_proto = self.ratelimit.get("data_plane_proto", False)
        ratelimit_custom_domain = bool(self.ratelimit.domain != "ambassador")

    if self.tracing:
        tracing = True
        tracing_driver = self.tracing.driver

    od["extauth"] = extauth
    od["extauth_proto"] = extauth_proto
    od["extauth_allow_body"] = extauth_allow_body
    od["extauth_host_count"] = extauth_host_count
    od["ratelimit"] = ratelimit
    od["ratelimit_data_plane_proto"] = ratelimit_data_plane_proto
    od["ratelimit_custom_domain"] = ratelimit_custom_domain
    od["tracing"] = tracing
    od["tracing_driver"] = tracing_driver

    # ---- Mapping groups ----
    group_count = 0
    group_http_count = 0  # HTTPMappingGroups
    group_tcp_count = 0  # TCPMappingGroups
    group_precedence_count = 0  # groups using explicit precedence
    group_header_match_count = 0  # groups using header matches
    group_regex_header_count = 0  # groups using regex header matches
    # NOTE(review): nothing below ever increments this counter, so it is
    # always reported as 0 -- confirm whether that is intended.
    group_regex_prefix_count = 0  # groups using regex prefix matches
    group_shadow_count = 0  # groups using shadows
    group_shadow_weighted_count = 0  # groups using shadows with non-100% weights
    group_host_redirect_count = 0  # groups using host_redirect
    group_host_rewrite_count = 0  # groups using host_rewrite
    group_canary_count = 0  # groups coalescing multiple mappings
    group_resolver_kube_service = 0  # groups using the KubernetesServiceResolver
    group_resolver_kube_endpoint = 0  # groups using the KubernetesEndpointResolver
    group_resolver_consul = 0  # groups using the ConsulResolver
    mapping_count = 0  # total mappings

    for group in self.ordered_groups():
        group_count += 1

        if group.get("kind", "IRHTTPMappingGroup") == "IRTCPMappingGroup":
            group_tcp_count += 1
        else:
            group_http_count += 1

        if group.get("precedence", 0) != 0:
            group_precedence_count += 1

        using_headers = False
        using_regex_headers = False

        for header in group.get("headers", []):
            using_headers = True

            # One regex header is enough to classify the group.
            if header["regex"]:
                using_regex_headers = True
                break

        if using_headers:
            group_header_match_count += 1

        if using_regex_headers:
            group_regex_header_count += 1

        if len(group.mappings) > 1:
            group_canary_count += 1

        mapping_count += len(group.mappings)

        if group.get("shadows", []):
            group_shadow_count += 1

            if group.get("weight", 100) != 100:
                group_shadow_weighted_count += 1

        if group.get("host_redirect", {}):
            group_host_redirect_count += 1

        if group.get("host_rewrite", None):
            group_host_rewrite_count += 1

        res_name = group.get(
            "resolver", self.ambassador_module.get("resolver", "kubernetes-service")
        )
        resolver = self.get_resolver(res_name)

        if resolver:
            if resolver.kind == "KubernetesServiceResolver":
                group_resolver_kube_service += 1
            elif resolver.kind == "KubernetesEndpointResolver":
                group_resolver_kube_endpoint += 1
            elif resolver.kind == "ConsulResolver":
                group_resolver_consul += 1

    od["group_count"] = group_count
    od["group_http_count"] = group_http_count
    od["group_tcp_count"] = group_tcp_count
    od["group_precedence_count"] = group_precedence_count
    od["group_header_match_count"] = group_header_match_count
    od["group_regex_header_count"] = group_regex_header_count
    od["group_regex_prefix_count"] = group_regex_prefix_count
    od["group_shadow_count"] = group_shadow_count
    od["group_shadow_weighted_count"] = group_shadow_weighted_count
    od["group_host_redirect_count"] = group_host_redirect_count
    od["group_host_rewrite_count"] = group_host_rewrite_count
    od["group_canary_count"] = group_canary_count
    od["group_resolver_kube_service"] = group_resolver_kube_service
    od["group_resolver_kube_endpoint"] = group_resolver_kube_endpoint
    od["group_resolver_consul"] = group_resolver_consul
    od["mapping_count"] = mapping_count

    od["listener_count"] = len(self.listeners)
    od["host_count"] = len(self.hosts)

    # ---- Invalid objects, counted per kind ----
    invalid_counts: Dict[str, int] = {}

    if self.invalid:
        for obj in self.invalid:
            kind = obj.get("kind") or "(unknown)"

            invalid_counts[kind] = invalid_counts.get(kind, 0) + 1

    od["invalid_counts"] = invalid_counts

    # Fast reconfiguration information is supplied in check_scout in diagd.py.

    return od
View as plain text