...

Text file src/github.com/emissary-ingress/emissary/v3/python/ambassador/ir/ir.py

     1# Copyright 2018 Datawire. All rights reserved.
     2#
     3# Licensed under the Apache License, Version 2.0 (the "License");
     4# you may not use this file except in compliance with the License.
     5# You may obtain a copy of the License at
     6#
     7#     http://www.apache.org/licenses/LICENSE-2.0
     8#
     9# Unless required by applicable law or agreed to in writing, software
    10# distributed under the License is distributed on an "AS IS" BASIS,
    11# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    12# See the License for the specific language governing permissions and
    13# limitations under the License.
    14import hashlib
    15import logging
    16import os
    17from ipaddress import ip_address
    18from typing import Any, Callable, Dict, Iterable, List, Optional, Tuple, Union, ValuesView
    19from typing import cast as typecast
    20
    21from ..cache import Cache, NullCache
    22from ..config import Config
    23from ..constants import Constants
    24from ..fetch import ResourceFetcher
    25from ..utils import RichStatus, SavedSecret, SecretHandler, SecretInfo, dump_json, parse_bool
    26from ..VERSION import Commit, Version
    27from .irambassador import IRAmbassador
    28from .irauth import IRAuth
    29from .irbasemapping import IRBaseMapping
    30from .irbasemappinggroup import IRBaseMappingGroup
    31from .ircluster import IRCluster
    32from .irerrorresponse import IRErrorResponse
    33from .irfilter import IRFilter
    34from .irhost import HostFactory, IRHost
    35from .irhttpmapping import IRHTTPMapping
    36from .irlistener import IRListener, ListenerFactory
    37from .irlogservice import IRLogService, IRLogServiceFactory
    38from .irmappingfactory import MappingFactory
    39from .irratelimit import IRRateLimit
    40from .irresource import IRResource
    41from .irserviceresolver import IRServiceResolver, IRServiceResolverFactory, SvcEndpointSet
    42from .irtls import IRAmbassadorTLS, TLSModuleFactory
    43from .irtlscontext import IRTLSContext, TLSContextFactory
    44from .irtracing import IRTracing
    45
    46#############################################################################
    47## ir.py -- the Ambassador Intermediate Representation (IR)
    48##
    49## After getting an ambassador.Config, you can create an ambassador.IR. The
    50## IR is the basis for everything else: you can use it to configure an Envoy
    51## or to run diagnostics.
    52##
    53## IRs are not meant to be terribly long-lived: if anything at all changes
    54## in your world, you should toss the IR and make a new one. In particular,
    55## it is _absolutely not OK_ to try to edit the contents of an IR and then
    56## re-run any of the generators -- IRs are to be considered immutable once
    57## created.
    58##
    59## This goes double in the incremental-reconfiguration world: the IRResources
    60## that make up the IR all point back to their IR to make life easier on the
    61## generators, so - to ease the transition to the incremental-reconfiguration
    62## world - right now we reset the IR pointer when we pull these objects out
    63## the cache. In the future this should be fixed, but at present, you can
    64## really mess up your world if you try to have two active IRs sharing a
    65## cache.
    66
    67
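       # A rough usage sketch (hypothetical wiring -- the real entry points live in
       # diagd and the test suite), assuming the usual Config/ResourceFetcher flow:
       #
       #     aconf = Config()
       #     fetcher = ResourceFetcher(logger, aconf)
       #     fetcher.parse_yaml(watt_snapshot_yaml, k8s=True)
       #     aconf.load_all(fetcher.sorted())
       #
       #     ir = IR(aconf, secret_handler=my_secret_handler)  # a SecretHandler is required
       #     print(ir.as_json())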
    68IRFileChecker = Callable[[str], bool]
    69
    70
    71class IR:
    72    ambassador_module: IRAmbassador
    73    ambassador_id: str
    74    ambassador_namespace: str
    75    ambassador_nodename: str
    76    aconf: Config
    77    cache: Cache
    78    clusters: Dict[str, IRCluster]
    79    agent_active: bool
    80    agent_service: Optional[str]
    81    agent_origination_ctx: Optional[IRTLSContext]
    82    edge_stack_allowed: bool
    83    file_checker: IRFileChecker
    84    filters: List[IRFilter]
    85    groups: Dict[str, IRBaseMappingGroup]
    86    grpc_services: Dict[str, IRCluster]
    87    hosts: Dict[str, IRHost]
    88    invalid: List[Dict]
    89    invalidate_groups_for: List[str]
    90    # The key for listeners is "{socket_protocol}-{bindaddr}-{port}" (see IRListener.bind_to())
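           # (for example, "tcp-0.0.0.0-8443" for a TCP listener bound to 0.0.0.0:8443)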
    91    listeners: Dict[str, IRListener]
    92    log_services: Dict[str, IRLogService]
    93    ratelimit: Optional[IRRateLimit]
    94    redirect_cleartext_from: Optional[int]
    95    resolvers: Dict[str, IRServiceResolver]
    96    router_config: Dict[str, Any]
    97    saved_resources: Dict[str, IRResource]
    98    saved_secrets: Dict[str, SavedSecret]
    99    secret_handler: SecretHandler
   100    secret_root: str
   101    sidecar_cluster_name: Optional[str]
   102    tls_contexts: Dict[str, IRTLSContext]
   103    tls_module: Optional[IRAmbassadorTLS]
   104    tracing: Optional[IRTracing]
   105
   106    @classmethod
   107    def check_deltas(
   108        cls, logger: logging.Logger, fetcher: "ResourceFetcher", cache: Optional[Cache] = None
   109    ) -> Tuple[str, bool, List[str]]:
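               # Returns a tuple of (config_type, reset_cache, invalidate_groups_for):
               #   - config_type: "complete" or "incremental"
               #   - reset_cache: True if the caller should throw away its cache and
               #     start over
               #   - invalidate_groups_for: cache keys of Mappings whose cached Groups
               #     must also be invalidated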
   110        # Assume that this should be marked as a complete reconfigure, and that we'll be
   111        # resetting the cache.
   112        config_type = "complete"
   113        reset_cache = True
   114
   115        # to_invalidate is the list of things we can invalidate right now. If we're
   116        # running with a cache, every valid Delta will get its cache key added into
   117        # to_invalidate; after we finish looking at all the deltas, we'll invalidate
   118        # all the entries in this list.
   119        #
   120        # Mapping deltas, though, are more complex: not only must we invalidate the
   121        # Mapping, but we _also_ need to invalidate any cached Group that contains
   122        # the Mapping (otherwise, adding a new Mapping to a cached Group won't work).
   123        # This is messy, because the Delta doesn't have the information we need to
   124        # compute the Group's cache key.
   125        #
   126        # We deal with this by adding the cache keys of any Mapping deltas to the
   127        # invalidate_groups_for list, and then handing that to the IR so that the
   128        # MappingFactory can use it to do the right thing.
   129        #
   130        # "But wait," I hear you cry, "you're only checking Mappings and TCPMappings
   131        # right now anyway, so why bother separating these things?" That's because
   132        # we expect the use of the cache to broaden, so we'll just go ahead and do
   133        # this.
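               #
               # As a concrete (hypothetical) example: a delta like
               #     {"kind": "Mapping", "metadata": {"name": "foo", "namespace": "default"}}
               # contributes IRBaseMapping.make_cache_key("Mapping", "foo", "default") to
               # both to_invalidate and invalidate_groups_for.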
   134        to_invalidate: List[str] = []
   135        invalidate_groups_for: List[str] = []
   136
   137        # OK. If we don't have a cache, just skip all this crap.
   138        if cache is not None:
   139            # We have a cache. Start by assuming that we'll need to reset it,
   140            # unless there are no deltas at all.
   141            reset_cache = len(fetcher.deltas) > 0
   142
   143            # Next up: are there any deltas?
   144            if fetcher.deltas:
   145                # Yes. We're going to walk over them all and assemble a list
   146                # of things to delete and a count of errors while processing our
   147                # list.
   148
   149                delta_errors = 0
   150
   151                for delta in fetcher.deltas:
   152                    logger.debug(f"Delta: {delta}")
   153
   154                    # The "kind" of a Delta must be a string; assert that to make
   155                    # mypy happy.
   156
   157                    delta_kind = delta["kind"]
   158                    assert isinstance(delta_kind, str)
   159
   160                    # Only worry about Mappings and TCPMappings right now.
   161                    if (delta_kind == "Mapping") or (delta_kind == "TCPMapping"):
   162                        # XXX C'mon, mypy, is this cast really necessary?
   163                        metadata = typecast(Dict[str, str], delta.get("metadata", {}))
   164                        name = metadata.get("name", "")
   165                        namespace = metadata.get("namespace", "")
   166
   167                        if not name or not namespace:
   168                            # This is an error.
   169                            delta_errors += 1
   170
   171                            logger.error(f"Delta object needs name and namespace: {delta}")
   172                        else:
   173                            key = IRBaseMapping.make_cache_key(delta_kind, name, namespace)
   174                            to_invalidate.append(key)
   175
   176                            # If we're invalidating the Mapping, we need to invalidate its Group.
   177                            invalidate_groups_for.append(key)
   178
   179                # OK. If we have things to invalidate, and we have NO ERRORS...
   180                if to_invalidate and not delta_errors:
   181                    # ...then we can invalidate all those things instead of clearing the cache.
   182                    reset_cache = False
   183
   184                    for key in to_invalidate:
   185                        logger.debug(f"Delta: invalidating {key}")
   186                        cache.invalidate(key)
   187
    188        # When all is said and done, it's an incremental reconfigure if we don't
    189        # need to reset the cache. If we _do_ need to reset it, config_type stays
    190        # "complete", and the reset_cache flag we return tells the caller to
    191        # rebuild the cache from scratch.
    192        if not reset_cache:
    193            config_type = "incremental"
   197
   198            cache.dump("Checking incoming deltas (reset_cache %s)", reset_cache)
   199
   200        return (config_type, reset_cache, invalidate_groups_for)
   201
   202    def __init__(
   203        self,
   204        aconf: Config,
   205        secret_handler: SecretHandler,
   206        file_checker: Optional[IRFileChecker] = None,
   207        logger: Optional[logging.Logger] = None,
   208        invalidate_groups_for: Optional[List[str]] = None,
   209        cache: Optional[Cache] = None,
   210        watch_only=False,
   211    ) -> None:
   212        # Initialize the basics...
   213        self.ambassador_id = Config.ambassador_id
   214        self.ambassador_namespace = Config.ambassador_namespace
   215        self.ambassador_nodename = aconf.ambassador_nodename
   216        self.statsd = aconf.statsd
   217
   218        # ...then make sure we have a logger...
   219        self.logger = logger or logging.getLogger("ambassador.ir")
   220
   221        # ...then make sure we have a cache (which might be a NullCache)...
   222        self.cache = cache or NullCache(self.logger)
   223        self.invalidate_groups_for = invalidate_groups_for or []
   224
   225        # ...then, finally, grab all the invalid objects from the aconf. This is for metrics later.
   226        self.invalid = aconf.invalid
   227
   228        self.cache.dump("Fetcher")
   229
    230        # ...then figure out where secrets live on disk...
    231        self.secret_root = os.environ.get("AMBASSADOR_CONFIG_BASE_DIR", "/ambassador")
   232
   233        # This setattr business is because mypy seems to think that, since self.file_checker is
   234        # callable, any mention of self.file_checker must be a function call. Sigh.
   235        setattr(self, "file_checker", file_checker if file_checker is not None else os.path.isfile)
   236
   237        # The secret_handler is _required_.
   238        self.secret_handler = secret_handler
   239
   240        assert self.secret_handler, "Ambassador.IR requires a SecretHandler at initialization"
   241
   242        self.logger.debug("IR __init__:")
   243        self.logger.debug("IR: Version         %s built from commit %s" % (Version, Commit))
   244        self.logger.debug("IR: AMBASSADOR_ID   %s" % self.ambassador_id)
   245        self.logger.debug("IR: Namespace       %s" % self.ambassador_namespace)
   246        self.logger.debug("IR: Nodename        %s" % self.ambassador_nodename)
   247        self.logger.debug(
    248            "IR: Endpoints       %s" % ("enabled" if Config.enable_endpoints else "disabled")
   249        )
   250
   251        self.logger.debug("IR: file checker:   %s" % getattr(self, "file_checker").__name__)
   252        self.logger.debug("IR: secret handler: %s" % type(self.secret_handler).__name__)
   253
   254        # First up: save the Config object. Its source map may be necessary later.
   255        self.aconf = aconf
   256
   257        # Next, we'll want a way to keep track of resources we end up working
   258        # with. It starts out empty.
   259        self.saved_resources = {}
   260
   261        # Also, we have no saved secret stuff yet...
   262        self.saved_secrets = {}
   263        self.secret_info: Dict[str, SecretInfo] = {}
   264
   265        # ...and the initial IR state is empty _except for k8s_status_updates_.
   266        #
   267        # Note that we use a map for clusters, not a list -- the reason is that
   268        # multiple mappings can use the same service, and we don't want multiple
   269        # clusters.
   270
   271        self.breakers = {}
   272        self.clusters = {}
   273        self.filters = []
   274        self.groups = {}
   275        self.grpc_services = {}
   276        self.hosts = {}
   277        # self.invalidate_groups_for is handled above.
   278        # self.k8s_status_updates is handled below.
   279        self.listeners = {}
   280        self.log_services = {}
   281        self.outliers = {}
   282        self.ratelimit = None
   283        self.redirect_cleartext_from = None
   284        self.resolvers = {}
    285        # self.saved_secrets is handled above.
    286        # self.secret_info is handled above.
   287        self.services = {}
   288        self.sidecar_cluster_name = None
   289        self.tls_contexts = {}
   290        self.tls_module = None
   291        self.tracing = None
   292
   293        # Copy k8s_status_updates from our aconf.
   294        self.k8s_status_updates = aconf.k8s_status_updates
   295
   296        # Check on the intercept agent and edge stack. Note that the Edge Stack touchfile is _not_
   297        # within $AMBASSADOR_CONFIG_BASE_DIR: it stays in /ambassador no matter what.
   298
    299        self.agent_active = os.environ.get("AGENT_SERVICE", None) is not None
   300        # Allow an environment variable to state whether we're in Edge Stack. But keep the
   301        # existing condition as sufficient, so that there is less of a chance of breaking
   302        # things running in a container with this file present.
   303        self.edge_stack_allowed = parse_bool(
   304            os.environ.get("EDGE_STACK", "false")
   305        ) or os.path.exists("/ambassador/.edge_stack")
   306        self.agent_origination_ctx = None
   307
   308        # OK, time to get this show on the road. First things first: set up the
   309        # Ambassador module.
   310        #
   311        # The Ambassador module is special: it doesn't do anything in its setup() method, but
   312        # instead defers all its heavy lifting to its finalize() method. Why? Because we need
   313        # to create the Ambassador module very early to allow IRResource.lookup() to work, but
   314        # we need to go pull in secrets and such before we can get all the Ambassador-module
   315        # stuff fully set up.
   316        #
   317        # So. First, create the module.
   318        self.ambassador_module = typecast(
   319            IRAmbassador, self.save_resource(IRAmbassador(self, aconf))
   320        )
   321
   322        # Next, grab whatever information our aconf has about secrets...
   323        self.save_secret_info(aconf)
   324
   325        # ...and then it's on to default TLS stuff, both from the TLS module and from
   326        # any TLS contexts.
   327        #
   328        # XXX This feels like a hack -- shouldn't it be class-wide initialization
   329        # in TLSModule or TLSContext? So far it's the only place we need anything like
   330        # this though.
   331
   332        TLSModuleFactory.load_all(self, aconf)
   333        TLSContextFactory.load_all(self, aconf)
   334
   335        # After TLSContexts, grab Listeners...
   336        ListenerFactory.load_all(self, aconf)
   337
    338        # Now that we know all of the listeners, we can check to see if there are any shared
    339        # bindings across protocols (TCP & UDP sharing the same address & port). When a TCP/HTTP
    340        # listener binds to the same address and port as a UDP/HTTP listener, it is marked
    341        # http3_enabled=True, which causes the `alt-svc` header to be auto-injected into HTTP
    342        # responses on the TCP side. The alt-svc header notifies clients (browsers, curl,
    343        # libraries) that they can upgrade TCP connections to UDP (HTTP/3) connections.
    344        #
    345        # Note: at first glance it would seem this logic should sit inside the Listener class, but
    346        # we wait until all the listeners are loaded so that we can check for the existence of a
    347        # "companion" TCP listener. If a UDP listener happened to be parsed first, we wouldn't
    348        # know about its companion at that point, so we wait until all of them have been loaded.
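               #
               # For example (hypothetical binding): a UDP listener stored under the
               # key "udp-0.0.0.0-8443" has "tcp-0.0.0.0-8443" as its companion TCP
               # listener key.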
   349        udp_listeners = (l for l in self.listeners.values() if l.socket_protocol == "UDP")
   350        for udp_listener in udp_listeners:
   351            ## this matches the `listener.bind_to` for the tcp listener
   352            tcp_listener_key = f"tcp-{udp_listener.bind_address}-{udp_listener.port}"
   353            tcp_listener = self.listeners.get(tcp_listener_key, None)
   354
    355            if tcp_listener is not None:
    356                tcp_listener.http3_enabled = True
   360
   361        # ...then grab whatever we know about Hosts...
   362        HostFactory.load_all(self, aconf)
   363
   364        # ...then set up for the intercept agent, if that's a thing.
   365        self.agent_init(aconf)
   366
   367        # Finally, finalize all the Host stuff (including the !*@#&!* fallback context)...
   368        HostFactory.finalize(self, aconf)
   369
   370        # Now we can finalize the Ambassador module, to tidy up secrets et al. We do this
   371        # here so that secrets and TLS contexts are available.
   372        if not self.ambassador_module.finalize(self, aconf):
   373            # Uhoh.
   374            self.ambassador_module.set_active(False)  # This can't be good.
   375
   376        _activity_str = "watching" if watch_only else "starting"
   377        _mode_str = "OSS"
   378
   379        if self.agent_active:
   380            _mode_str = "Intercept Agent"
   381        elif self.edge_stack_allowed:
   382            _mode_str = "Edge Stack"
   383
   384        self.logger.debug(f"IR: {_activity_str} {_mode_str}")
   385
   386        # Next up, initialize our IRServiceResolvers...
   387        IRServiceResolverFactory.load_all(self, aconf)
   388
   389        # ...and then we can finalize the agent, if that's a thing.
   390        self.agent_finalize(aconf)
   391
   392        # Once here, if we're only watching, we're done.
   393        if watch_only:
   394            return
   395
   396        # REMEMBER FOR SAVING YOU NEED TO CALL save_resource!
   397        # THIS IS VERY IMPORTANT!
   398
   399        # Save circuit breakers, outliers, and services.
   400        self.breakers = aconf.get_config("CircuitBreaker") or {}
   401        self.outliers = aconf.get_config("OutlierDetection") or {}
   402        self.services = aconf.get_config("service") or {}
   403
   404        # Save tracing, ratelimit, and logging settings.
   405        self.tracing = typecast(IRTracing, self.save_resource(IRTracing(self, aconf)))
   406        self.ratelimit = typecast(IRRateLimit, self.save_resource(IRRateLimit(self, aconf)))
   407        IRLogServiceFactory.load_all(self, aconf)
   408
   409        # After the Ambassador and TLS modules are done, we need to set up the
   410        # filter chains. Note that order of the filters matters. Start with CORS,
   411        # so that preflights will work even for things behind auth.
   412
   413        self.save_filter(
   414            IRFilter(ir=self, aconf=aconf, rkey="ir.cors", kind="ir.cors", name="cors", config={})
   415        )
   416
   417        # Next is auth...
   418        self.save_filter(IRAuth(self, aconf))
   419
   420        # ...then the ratelimit filter...
   421        if self.ratelimit:
   422            self.save_filter(self.ratelimit, already_saved=True)
   423
   424        # ...and the error response filter...
   425        self.save_filter(
   426            IRErrorResponse(
   427                self,
   428                aconf,
   429                self.ambassador_module.get("error_response_overrides", None),
   430                referenced_by_obj=self.ambassador_module,
   431            )
   432        )
   433
   434        # ...and, finally, the barely-configurable router filter.
   435        router_config = {}
   436
   437        if self.tracing:
   438            router_config["start_child_span"] = True
   439
   440        self.save_filter(
   441            IRFilter(
   442                ir=self,
   443                aconf=aconf,
   444                rkey="ir.router",
   445                kind="ir.router",
   446                name="router",
   447                type="decoder",
   448                config=router_config,
   449            )
   450        )
   451
   452        # We would handle other modules here -- but guess what? There aren't any.
   453        # At this point ambassador, tls, and the deprecated auth module are all there
   454        # are, and they're handled above. So. At this point go sort out all the Mappings.
   455        MappingFactory.load_all(self, aconf)
   456
   457        self.walk_saved_resources(aconf, "add_mappings")
   458
   459        TLSModuleFactory.finalize(self, aconf)
   460        MappingFactory.finalize(self, aconf)
   461
   462        # We can't finalize the listeners until _after_ we have all the TCPMapping
   463        # information we might need, so that happens here.
   464        ListenerFactory.finalize(self, aconf)
   465
   466        # At this point we should know the full set of clusters, so we can generate
   467        # appropriate envoy names.
   468        #
   469        # Envoy cluster name generation happens in two steps. First, we check every
   470        # cluster and set the envoy name to the cluster name if it is short enough.
   471        # If it isn't, we group all of the long cluster names by a common prefix
   472        # and normalize them later.
   473        #
   474        # This ensures that:
   475        # - All IRCluster objects have an envoy_name
    476        # - All envoy_name fields are valid cluster names, i.e. they are short enough
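               #
               # As a (hypothetical) example: a 70-character cluster name is compressed
               # to its first 40 characters plus "-" plus the first 16 hex digits of its
               # SHA-1, and collisions within that short name are disambiguated below by
               # appending "-0", "-1", ... to the envoy_name.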
   477        collisions: Dict[str, List[str]] = {}
   478
   479        for name in sorted(self.clusters.keys()):
   480            if len(name) > 60:
   481                # Too long. Gather this cluster by name prefix and normalize
   482                # its name below.
   483                h = hashlib.new("sha1")
   484                h.update(name.encode("utf-8"))
   485                hd = h.hexdigest()[0:16].upper()
   486
   487                short_name = name[0:40] + "-" + hd
   488
   489                cluster = self.clusters[name]
   490                self.logger.debug(f"COLLISION: compress {name} to {short_name}")
   491
   492                collision_list = collisions.setdefault(short_name, [])
   493                collision_list.append(name)
   494            else:
   495                # Short enough, set the envoy name to the cluster name.
   496                self.clusters[name]["envoy_name"] = name
   497
   498        for short_name in sorted(collisions.keys()):
   499            name_list = collisions[short_name]
   500
   501            i = 0
   502
   503            for name in sorted(name_list):
   504                mangled_name = "%s-%d" % (short_name, i)
   505                i += 1
   506
   507                cluster = self.clusters[name]
   508                self.logger.debug("COLLISION: mangle %s => %s" % (name, mangled_name))
   509
   510                # We must not modify a cluster's name (nor its rkey, for that matter)
   511                # because our object caching implementation depends on stable object
   512                # names and keys. If we were to update it, we could lose track of an
   513                # existing object and accidentally create a duplicate (tested in
   514                # python/tests/test_cache.py test_long_cluster_1).
   515                #
   516                # Instead, the resulting IR must set envoy_name to the mangled name, which
   517                # is guaranteed to be valid in envoy configuration.
   518                #
   519                # An important consequence of this choice is that we must never read back
   520                # envoy config to create IRCluster config, since the cluster names are
   521                # not necessarily the same. This is currently fine, since we never use
   522                # envoy config as a source of truth - we leave that to the cluster annotations
   523                # and CRDs.
   524                #
   525                # Another important consideration is that when the cache is active, we need
   526                # to shred any cached cluster with this mangled_name, because the mangled_name
   527                # can change as new clusters appear! This is obviously not ideal.
   528                #
   529                # XXX This is doubly a hack because it's duplicating this magic format from
   530                # v2cluster.py and v3cluster.py.
   531                self.cache.invalidate(f"V2-{cluster.cache_key}")
   532                self.cache.invalidate(f"V3-{cluster.cache_key}")
   533                self.cache.dump(
   534                    "Invalidate clusters V2-%s, V3-%s", cluster.cache_key, cluster.cache_key
   535                )
   536
   537                # OK. Finally, we can update the envoy_name.
   538                cluster["envoy_name"] = mangled_name
   539                self.logger.debug("COLLISION: envoy_name %s" % cluster["envoy_name"])
   540
   541        # After we have the cluster names fixed up, go finalize filters.
   542        if self.tracing:
   543            self.tracing.finalize()
   544
   545        if self.ratelimit:
   546            self.ratelimit.finalize()
   547
   548        for filter in self.filters:
   549            filter.finalize()
   550
   551    # XXX Brutal hackery here! Probably this is a clue that Config and IR and such should have
   552    # a common container that can hold errors.
   553    def post_error(
   554        self,
   555        rc: Union[str, RichStatus],
   556        resource: Optional[IRResource] = None,
   557        rkey: Optional[str] = None,
   558        log_level=logging.INFO,
   559    ):
   560        self.aconf.post_error(rc, resource=resource, rkey=rkey, log_level=log_level)
   561
   562    def agent_init(self, aconf: Config) -> None:
   563        """
   564        Initialize as the Intercept Agent, if we're doing that.
   565
   566        THIS WHOLE METHOD NEEDS TO GO AWAY: instead, just configure the agent with CRDs as usual.
   567        However, that's just too painful to contemplate without `edgectl inject-agent`.
   568
   569        :param aconf: Config to work with
   570        :return: None
   571        """
   572
   573        # Intercept stuff is an Edge Stack thing.
   574        if not (self.edge_stack_allowed and self.agent_active):
   575            self.logger.debug("Intercept agent not active, skipping initialization")
   576            return
   577
   578        self.agent_service = os.environ.get("AGENT_SERVICE", None)
   579
   580        if self.agent_service is None:
   581            # This is technically impossible, but whatever.
   582            self.logger.info("Intercept agent active but no AGENT_SERVICE? skipping initialization")
   583            self.agent_active = False
   584            return
   585
   586        self.logger.debug(f"Intercept agent active for {self.agent_service}, initializing")
   587
   588        # We're going to either create a Host to terminate TLS, or to do cleartext. In neither
   589        # case will we do ACME. Set additionalPort to -1 so we don't grab 8080 in the TLS case.
   590        host_args: Dict[str, Any] = {
   591            "hostname": "*",
   592            "selector": {"matchLabels": {"intercept": self.agent_service}},
   593            "acmeProvider": {"authority": "none"},
   594            "requestPolicy": {
   595                "insecure": {
   596                    "additionalPort": -1,
   597                },
   598            },
   599        }
   600
   601        # Have they asked us to do TLS?
   602        agent_termination_secret = os.environ.get("AGENT_TLS_TERM_SECRET", None)
   603
   604        if agent_termination_secret:
   605            # Yup.
   606            host_args["tlsSecret"] = {"name": agent_termination_secret}
   607        else:
   608            # No termination secret, so do cleartext.
   609            host_args["requestPolicy"]["insecure"]["action"] = "Route"
   610
   611        host = IRHost(
   612            self,
   613            aconf,
   614            rkey=self.ambassador_module.rkey,
   615            location=self.ambassador_module.location,
   616            name="agent-host",
   617            **host_args,
   618        )
   619
   620        if host.is_active():
   621            host.referenced_by(self.ambassador_module)
   622            host.sourced_by(self.ambassador_module)
   623
   624            self.logger.debug(f"Intercept agent: saving host {host}")
   625            # self.logger.debug(host.as_json())
   626            self.save_host(host)
   627        else:
   628            self.logger.debug(f"Intercept agent: not saving inactive host {host}")
   629
   630        # How about originating TLS?
   631        agent_origination_secret = os.environ.get("AGENT_TLS_ORIG_SECRET", None)
   632
   633        if agent_origination_secret:
   634            # Uhhhh. Synthesize a TLSContext for this, I guess.
   635            #
   636            # XXX What if they already have a context with this name?
   637            ctx = IRTLSContext(
   638                self,
   639                aconf,
   640                rkey=self.ambassador_module.rkey,
   641                location=self.ambassador_module.location,
   642                name="agent-origination-context",
   643                secret=agent_origination_secret,
   644            )
   645
   646            ctx.referenced_by(self.ambassador_module)
   647            self.save_tls_context(ctx)
   648
   649            self.logger.debug(f"Intercept agent: saving origination TLSContext {ctx.name}")
   650            # self.logger.debug(ctx.as_json())
   651
   652            self.agent_origination_ctx = ctx
   653
   654    def agent_finalize(self, aconf) -> None:
   655        if not (self.edge_stack_allowed and self.agent_active):
    656        self.logger.debug("Intercept agent not active, skipping finalization")
   657            return
   658
   659        # self.logger.info(f"Intercept agent active for {self.agent_service}, finalizing")
   660
   661        # We don't want to listen on the default AES ports (8080, 8443) as that is likely to
   662        # conflict with the user's application running in the same Pod.
   663        agent_listen_port_str = os.environ.get("AGENT_LISTEN_PORT", None)
   664
   665        agent_grpc = os.environ.get("AGENT_ENABLE_GRPC", "false")
   666
   667        if agent_listen_port_str is None:
   668            self.ambassador_module.service_port = Constants.SERVICE_PORT_AGENT
   669        else:
   670            try:
   671                self.ambassador_module.service_port = int(agent_listen_port_str)
   672            except ValueError:
   673                self.post_error(f"Intercept agent listen port {agent_listen_port_str} is not valid")
   674                self.agent_active = False
   675                return
   676
   677        agent_port_str = os.environ.get("AGENT_PORT", None)
   678
   679        if agent_port_str is None:
   680            self.post_error("Intercept agent requires both AGENT_SERVICE and AGENT_PORT to be set")
   681            self.agent_active = False
   682            return
   683
   684        agent_port = -1
   685
   686        try:
   687            agent_port = int(agent_port_str)
    688        except ValueError:
   689            self.post_error(f"Intercept agent port {agent_port_str} is not valid")
   690            self.agent_active = False
   691            return
   692
   693        # self.logger.info(f"Intercept agent active for {self.agent_service}:{agent_port}, adding fallback mapping")
   694
   695        # XXX OMG this is a crock. Don't use precedence -1000000 for this, because otherwise Edge
   696        # Stack might decide it's the Edge Policy Console fallback mapping and force it to be
   697        # routed insecure. !*@&#*!@&#* We need per-mapping security settings.
   698        #
   699        # XXX What if they already have a mapping with this name?
   700
   701        ctx_name = None
   702
   703        if self.agent_origination_ctx:
   704            ctx_name = self.agent_origination_ctx.name
   705
   706        mapping = IRHTTPMapping(
   707            self,
   708            aconf,
   709            rkey=self.ambassador_module.rkey,
   710            location=self.ambassador_module.location,
   711            name="agent-fallback-mapping",
   712            metadata_labels={"ambassador_diag_class": "private"},
   713            prefix="/",
   714            rewrite="/",
   715            service=f"127.0.0.1:{agent_port}",
   716            grpc=agent_grpc,
   717            # Making sure we don't have shorter timeouts on intercepts than the original Mapping
   718            timeout_ms=60000,
   719            idle_timeout_ms=60000,
   720            tls=ctx_name,
   721            precedence=-999999,
   722        )  # No, really. See comment above.
   723
   724        mapping.referenced_by(self.ambassador_module)
   725        self.add_mapping(aconf, mapping)
   726
   727    def cache_fetch(self, key: str) -> Optional[IRResource]:
   728        """
   729        Fetch a key from our cache. If we get anything, make sure that its
   730        IR pointer is set back to us -- since the cache can easily outlive
   731        the IR, chances are pretty high that the object might've originally
   732        been part of a different IR.
   733
   734        Yes, this implies that trying to use the cache for multiple IRs at
   735        the same time is a Very Bad Idea.
   736        """
   737
   738        rsrc = self.cache[key]
   739
   740        # Did we get anything?
   741        if rsrc is not None:
   742            # By definition, anything the IR layer pulls from the cache must be
   743            # an IRResource.
   744            assert isinstance(rsrc, IRResource)
   745
   746            # Since it's an IRResource, it has a pointer to the IR. Reset that.
   747            rsrc.ir = self
   748
   749        return rsrc
   750
   751    def cache_add(self, rsrc: IRResource) -> None:
   752        """
   753        Add an IRResource to our cache. Mostly this is here to let mypy check
   754        that everything cached by the IR layer is an IRResource.
   755        """
   756        self.cache.add(rsrc)
   757
   758    def cache_link(self, owner: IRResource, owned: IRResource) -> None:
   759        """
   760        Link two IRResources in our cache. Mostly this is here to let mypy check
   761        that everything linked by the IR layer is an IRResource.
   762        """
   763        self.cache.link(owner, owned)
   764
   765    def save_resource(self, resource: IRResource) -> IRResource:
   766        if resource.is_active():
   767            self.saved_resources[resource.rkey] = resource
   768
   769        return resource
   770
   771    def save_host(self, host: IRHost) -> None:
   772        extant_host = self.hosts.get(host.name, None)
   773        is_valid = True
   774
   775        if extant_host:
   776            self.post_error(
   777                "Duplicate Host %s; keeping definition from %s" % (host.name, extant_host.location)
   778            )
   779            is_valid = False
   780
   781        if is_valid:
   782            self.hosts[host.name] = host
   783
   784    # Get saved hosts.
   785    def get_hosts(self) -> List[IRHost]:
   786        return list(self.hosts.values())
   787
   788    # Save secrets from our aconf.
   789    def save_secret_info(self, aconf):
   790        aconf_secrets = aconf.get_config("secrets") or {}
   791        self.logger.debug(f"IR: aconf has secrets: {aconf_secrets.keys()}")
   792
   793        for secret_key, aconf_secret in aconf_secrets.items():
   794            # Ignore anything that doesn't at least have a public half.
   795            #
   796            # (We include 'user_key' here because ACME private keys use that, and they
   797            # should not generate errors.)
   798            # (We include 'crl_pem' here because CRL secrets use that, and they
   799            # should not generate errors.)
   800            if (
   801                aconf_secret.get("tls_crt")
   802                or aconf_secret.get("cert-chain_pem")
   803                or aconf_secret.get("user_key")
   804                or aconf_secret.get("crl_pem")
   805            ):
   806                secret_info = SecretInfo.from_aconf_secret(aconf_secret)
   807                secret_name = secret_info.name
   808                secret_namespace = secret_info.namespace
   809
   810                self.logger.debug(
   811                    'saving "%s.%s" (from %s) in secret_info',
   812                    secret_name,
   813                    secret_namespace,
   814                    secret_key,
   815                )
   816                self.secret_info[f"{secret_name}.{secret_namespace}"] = secret_info
   817            else:
   818                self.logger.debug(
   819                    "not saving secret_info from %s because there is no public half", secret_key
   820                )
   821
   822    def save_tls_context(self, ctx: IRTLSContext) -> None:
   823        extant_ctx = self.tls_contexts.get(ctx.name, None)
   824        is_valid = True
   825
   826        if extant_ctx:
   827            self.post_error(
   828                "Duplicate TLSContext %s; keeping definition from %s"
   829                % (ctx.name, extant_ctx.location)
   830            )
   831            is_valid = False
   832
   833        if ctx.get("redirect_cleartext_from", None) is not None:
   834            if self.redirect_cleartext_from is None:
   835                self.redirect_cleartext_from = ctx.redirect_cleartext_from
   836            else:
   837                if self.redirect_cleartext_from != ctx.redirect_cleartext_from:
   838                    self.post_error(
    839                        "TLSContext %s: configured conflicting redirect_cleartext_from port %s"
   840                        % (ctx.name, ctx.redirect_cleartext_from)
   841                    )
   842                    is_valid = False
   843
   844        if is_valid:
   845            self.tls_contexts[ctx.name] = ctx
   846
   847    def get_resolver(self, name: str) -> Optional[IRServiceResolver]:
   848        return self.resolvers.get(name, None)
   849
   850    def add_resolver(self, resolver: IRServiceResolver) -> None:
   851        self.resolvers[resolver.name] = resolver
   852
   853    def has_tls_context(self, name: str) -> bool:
   854        return bool(self.get_tls_context(name))
   855
   856    def get_tls_context(self, name: str) -> Optional[IRTLSContext]:
   857        return self.tls_contexts.get(name, None)
   858
   859    def get_tls_contexts(self) -> ValuesView[IRTLSContext]:
   860        return self.tls_contexts.values()
   861
   862    def resolve_secret(self, resource: IRResource, secret_name: str, namespace: str):
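               # Resolution order: a previously-cached SavedSecret wins; then secret_info
               # gathered from the aconf; finally, a live load via the SecretHandler.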
   863        # OK. Do we already have a SavedSecret for this?
   864        ss_key = f"{secret_name}.{namespace}"
   865
   866        ss = self.saved_secrets.get(ss_key, None)
   867
   868        if ss:
   869            # Done. Return it.
   870            self.logger.debug(f"resolve_secret {ss_key}: using cached SavedSecret")
   871            self.secret_handler.still_needed(resource, secret_name, namespace)
   872            return ss
   873
   874        # OK, do we have a secret_info for it??
   875        # self.logger.debug(f"resolve_secret {ss_key}: checking secret_info")
   876
   877        secret_info = self.secret_info.get(ss_key, None)
   878
   879        if secret_info:
   880            self.logger.debug(f"resolve_secret {ss_key}: found secret_info")
   881            self.secret_handler.still_needed(resource, secret_name, namespace)
   882        else:
   883            # No secret_info, so ask the secret_handler to find us one.
   884            self.logger.debug(f"resolve_secret {ss_key}: no secret_info, asking handler to load")
   885            secret_info = self.secret_handler.load_secret(resource, secret_name, namespace)
   886
   887        if not secret_info:
   888            self.logger.error(f"Secret {ss_key} unknown")
   889
   890            ss = SavedSecret(secret_name, namespace, None, None, None, None, None)
   891        else:
   892            self.logger.debug(f"resolve_secret {ss_key}: found secret, asking handler to cache")
   893
   894            # OK, we got a secret_info. Cache that using the secret handler.
   895            ss = self.secret_handler.cache_secret(resource, secret_info)
   896
   897            # Save this for next time.
    898            self.saved_secrets[ss_key] = ss
   899        return ss
   900
   901    def resolve_resolver(
   902        self, cluster: IRCluster, resolver_name: Optional[str]
   903    ) -> IRServiceResolver:
   904        # Which resolver should we use?
   905        if not resolver_name:
   906            resolver_name = self.ambassador_module.get("resolver", "kubernetes-service")
   907
   908        # Casting to str is OK because the Ambassador module's resolver must be a string,
   909        # so all the paths for resolver_name land with it being a string.
   910        resolver = self.get_resolver(typecast(str, resolver_name))
   911        assert resolver is not None
   912        return resolver
   913
   914    def resolve_targets(
   915        self,
   916        cluster: IRCluster,
   917        resolver_name: Optional[str],
   918        hostname: str,
   919        namespace: str,
   920        port: int,
   921    ) -> Optional[SvcEndpointSet]:
   922        # Is the host already an IP address?
   923        is_ip_address = False
   924
   925        try:
    926            ip_address(hostname)  # raises ValueError if hostname isn't an IP
   927            is_ip_address = True
   928        except ValueError:
   929            pass
   930
   931        if is_ip_address:
   932            # Already an IP address, great.
   933            self.logger.debug(f"cluster {cluster.name}: {hostname} is already an IP address")
   934
   935            return [{"ip": hostname, "port": port, "target_kind": "IPaddr"}]
   936
   937        resolver = self.resolve_resolver(cluster, resolver_name)
   938
   939        # It should not be possible for resolver to be unset here.
   940        if not resolver:
   941            self.post_error(
   942                f"cluster {cluster.name} has invalid resolver {resolver_name}?", rkey=cluster.rkey
   943            )
   944            return None
   945
   946        # OK, ask the resolver for the target list. Understanding the mechanics of resolution
   947        # and the load balancer policy and all that is up to the resolver.
   948        return resolver.resolve(self, cluster, hostname, namespace, port)
   949
   950    def save_filter(self, resource: IRFilter, already_saved=False) -> None:
   951        if resource.is_active():
   952            if not already_saved:
   953                resource = typecast(IRFilter, self.save_resource(resource))
   954
   955            self.filters.append(resource)
   956
   957    def walk_saved_resources(self, aconf, method_name):
   958        for res in self.saved_resources.values():
   959            getattr(res, method_name)(self, aconf)
   960
   961    def save_listener(self, listener: IRListener) -> None:
   962        listener_key = listener.bind_to()
   963
   964        extant_listener = self.listeners.get(listener_key, None)
   965        is_valid = True
   966        if extant_listener:
   967            err_msg = (
   968                f"Duplicate listener {listener.name} on {listener.socket_protocol.lower()}://{listener.bind_address}:{listener.port};"
   969                f" keeping definition from {extant_listener.location}"
   970            )
   971            self.post_error(err_msg)
   972            is_valid = False
   973
   974        if is_valid:
   975            self.listeners[listener_key] = listener
   976
   977    def add_mapping(self, aconf: Config, mapping: IRBaseMapping) -> Optional[IRBaseMappingGroup]:
   978        mapping.check_status()
   979
   980        if mapping.is_active():
   981            if mapping.group_id not in self.groups:
   982                # Is this group in our external cache?
   983                group_key = mapping.group_class().key_for_id(mapping.group_id)
   984                group = self.cache_fetch(group_key)
   985
   986                if group is not None:
   987                    self.logger.debug(f"IR: got group from cache for {mapping.name}")
   988                else:
   989                    self.logger.debug(f"IR: synthesizing group for {mapping.name}")
   990                    group_name = "GROUP: %s" % mapping.name
   991                    group_class = mapping.group_class()
   992                    group = group_class(
   993                        ir=self,
   994                        aconf=aconf,
   995                        location=mapping.location,
   996                        name=group_name,
   997                        mapping=mapping,
   998                    )
   999
  1000                # There's no way group can be anything but a non-None IRBaseMappingGroup
  1001                # here. assert() that so that mypy understands it.
  1002                assert isinstance(group, IRBaseMappingGroup)  # for mypy
  1003                self.groups[group.group_id] = group
  1004            else:
  1005                self.logger.debug(f"IR: already have group for {mapping.name}")
  1006                group = self.groups[mapping.group_id]
  1007                group.add_mapping(aconf, mapping)
  1008
  1009            self.cache_add(mapping)
  1010            self.cache_add(group)
  1011            self.cache_link(mapping, group)
  1012
  1013            return group
  1014        else:
  1015            return None
  1016
  1017    def ordered_groups(self) -> Iterable[IRBaseMappingGroup]:
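               # Iterate groups from highest group_weight to lowest.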
  1018        return reversed(sorted(self.groups.values(), key=lambda x: x["group_weight"]))
  1019
  1020    def has_cluster(self, name: str) -> bool:
  1021        return name in self.clusters
  1022
  1023    def get_cluster(self, name: str) -> Optional[IRCluster]:
  1024        return self.clusters.get(name, None)
  1025
  1026    def add_cluster(self, cluster: IRCluster) -> IRCluster:
  1027        if not self.has_cluster(cluster.name):
  1028            self.logger.debug("IR: add_cluster: new cluster %s" % cluster.name)
  1029            self.clusters[cluster.name] = cluster
  1030
  1031            if cluster.is_edge_stack_sidecar():
  1032                self.logger.debug(f"IR: cluster {cluster.name} is the sidecar cluster name")
  1033                self.sidecar_cluster_name = cluster.name
  1034        else:
  1035            self.logger.debug(
  1036                "IR: add_cluster: extant cluster %s (%s)"
  1037                % (cluster.name, cluster.get("envoy_name", "-"))
  1038            )
  1039
  1040        return self.clusters[cluster.name]
  1041
  1042    def merge_cluster(self, cluster: IRCluster) -> bool:
  1043        extant = self.get_cluster(cluster.name)
  1044
  1045        if extant:
  1046            return extant.merge(cluster)
  1047        else:
  1048            self.add_cluster(cluster)
  1049            return True
  1050
  1051    def has_grpc_service(self, name: str) -> bool:
  1052        return name in self.grpc_services
  1053
  1054    def add_grpc_service(self, name: str, cluster: IRCluster) -> IRCluster:
  1055        if not self.has_grpc_service(name):
  1056            if not self.has_cluster(cluster.name):
  1057                self.clusters[cluster.name] = cluster
  1058
  1059            self.grpc_services[name] = cluster
  1060
  1061        return self.grpc_services[name]
  1062
  1063    def as_dict(self) -> Dict[str, Any]:
  1064        od = {
  1065            "identity": {
  1066                "ambassador_id": self.ambassador_id,
  1067                "ambassador_namespace": self.ambassador_namespace,
  1068                "ambassador_nodename": self.ambassador_nodename,
  1069            },
  1070            "ambassador": self.ambassador_module.as_dict(),
  1071            "clusters": {
  1072                cluster_name: cluster.as_dict() for cluster_name, cluster in self.clusters.items()
  1073            },
  1074            "grpc_services": {
  1075                svc_name: cluster.as_dict() for svc_name, cluster in self.grpc_services.items()
  1076            },
  1077            "hosts": [host.as_dict() for host in self.hosts.values()],
  1078            "listeners": [self.listeners[x].as_dict() for x in sorted(self.listeners.keys())],
  1079            "filters": [filt.as_dict() for filt in self.filters],
  1080            "groups": [group.as_dict() for group in self.ordered_groups()],
  1081            "tls_contexts": [context.as_dict() for context in self.tls_contexts.values()],
  1082            "services": self.services,
  1083            "k8s_status_updates": self.k8s_status_updates,
  1084        }
  1085
  1086        if self.log_services:
  1087            od["log_services"] = [srv.as_dict() for srv in self.log_services.values()]
  1088
  1089        if self.tracing:
  1090            od["tracing"] = self.tracing.as_dict()
  1091
  1092        if self.ratelimit:
  1093            od["ratelimit"] = self.ratelimit.as_dict()
  1094
  1095        return od
  1096
  1097    def as_json(self) -> str:
  1098        return dump_json(self.as_dict(), pretty=True)
  1099
  1100    def features(self) -> Dict[str, Any]:
  1101        od: Dict[str, Union[bool, int, Optional[str], Dict]] = {}
  1102
  1103        if self.aconf.helm_chart:
  1104            od["helm_chart"] = self.aconf.helm_chart
  1105        od["managed_by"] = self.aconf.pod_labels.get("app.kubernetes.io/managed-by", "")
  1106
  1107        tls_termination_count = 0  # TLS termination contexts
  1108        tls_origination_count = 0  # TLS origination contexts
  1109        tls_crl_file_count = 0  # CRL files used
  1110
  1111        using_tls_module = False
  1112        using_tls_contexts = False
  1113
  1114        for ctx in self.get_tls_contexts():
  1115            if ctx:
  1116                secret_info = ctx.get("secret_info", {})
  1117
  1118                if secret_info:
  1119                    using_tls_contexts = True
  1120
  1121                    if secret_info.get("certificate_chain_file", None):
  1122                        tls_termination_count += 1
  1123
  1124                    if secret_info.get("cacert_chain_file", None):
  1125                        tls_origination_count += 1
  1126
  1127                    if secret_info.get("crl_file", None):
  1128                        tls_crl_file_count += 1
  1129
  1130                if ctx.get("_legacy", False):
  1131                    using_tls_module = True
  1132
  1133        od["tls_using_module"] = using_tls_module
  1134        od["tls_using_contexts"] = using_tls_contexts
  1135        od["tls_termination_count"] = tls_termination_count
  1136        od["tls_origination_count"] = tls_origination_count
  1137        od["tls_crl_file_count"] = tls_crl_file_count
  1138
  1139        for key in ["diagnostics", "liveness_probe", "readiness_probe", "statsd"]:
  1140            od[key] = self.ambassador_module.get(key, {}).get("enabled", False)
  1141
  1142        for key in [
  1143            "use_proxy_proto",
  1144            "use_remote_address",
  1145            "x_forwarded_proto_redirect",
  1146            "enable_http10",
  1147            "add_linkerd_headers",
  1148            "use_ambassador_namespace_for_service_resolution",
  1149            "proper_case",
  1150            "preserve_external_request_id",
  1151        ]:
  1152            od[key] = self.ambassador_module.get(key, False)
  1153
  1154        od["service_resource_total"] = len(list(self.services.keys()))
  1155
  1156        od["listener_idle_timeout_ms"] = self.ambassador_module.get(
  1157            "listener_idle_timeout_ms", None
  1158        )
  1159        od["headers_with_underscores_action"] = self.ambassador_module.get(
  1160            "headers_with_underscores_action", None
  1161        )
  1162        od["max_request_headers_kb"] = self.ambassador_module.get("max_request_headers_kb", None)
  1163
  1164        od["server_name"] = bool(self.ambassador_module.server_name != "envoy")
  1165
  1166        od["custom_ambassador_id"] = bool(self.ambassador_id != "default")
  1167
  1168        od["buffer_limit_bytes"] = self.ambassador_module.get("buffer_limit_bytes", None)
  1169
  1170        default_port = (
  1171            Constants.SERVICE_PORT_HTTPS if tls_termination_count else Constants.SERVICE_PORT_HTTP
  1172        )
  1173
  1174        od["custom_listener_port"] = bool(self.ambassador_module.service_port != default_port)
  1175
  1176        od["allow_chunked_length"] = self.ambassador_module.get("allow_chunked_length", None)
  1177
  1178        cluster_count = 0
  1179        cluster_grpc_count = 0  # clusters using GRPC upstream
  1180        cluster_http_count = 0  # clusters using HTTP or HTTPS upstream
  1181        cluster_tls_count = 0  # clusters using TLS origination
  1182
  1183        cluster_routing_kube_count = 0  # clusters routing using kube
  1184        cluster_routing_envoy_rr_count = 0  # clusters routing using envoy round robin
  1185        cluster_routing_envoy_rh_count = 0  # clusters routing using envoy ring hash
  1186        cluster_routing_envoy_maglev_count = 0  # clusters routing using envoy maglev
  1187        cluster_routing_envoy_lr_count = 0  # clusters routing using envoy least request
  1188
  1189        endpoint_grpc_count = 0  # endpoints using GRPC upstream
  1190        endpoint_http_count = 0  # endpoints using HTTP/HTTPS upstream
  1191        endpoint_tls_count = 0  # endpoints using TLS origination
  1192
  1193        endpoint_routing_kube_count = 0  # endpoints Kube is routing to
  1194        endpoint_routing_envoy_rr_count = 0  # endpoints Envoy round robin is routing to
  1195        endpoint_routing_envoy_rh_count = 0  # endpoints Envoy ring hash is routing to
  1196        endpoint_routing_envoy_maglev_count = 0  # endpoints Envoy maglev is routing to
  1197        endpoint_routing_envoy_lr_count = 0  # endpoints Envoy least request is routing to
  1198
  1199        for cluster in self.clusters.values():
  1200            cluster_count += 1
  1201            using_tls = False
  1202            using_http = False
  1203            using_grpc = False
  1204
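                  # "kube" is the fallback lb_type: with endpoint routing disabled,
                  # load balancing is left to Kubernetes itself. Only when
                  # enable_endpoints is set does the cluster's own lb_type apply
                  # (defaulting to round_robin).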
  1205            lb_type = "kube"
  1206
  1207            if cluster.get("enable_endpoints", False):
  1208                lb_type = cluster.get("lb_type", "round_robin")
  1209
  1210            if lb_type == "kube":
  1211                cluster_routing_kube_count += 1
  1212            elif lb_type == "ring_hash":
  1213                cluster_routing_envoy_rh_count += 1
  1214            elif lb_type == "maglev":
  1215                cluster_routing_envoy_maglev_count += 1
  1216            elif lb_type == "least_request":
  1217                cluster_routing_envoy_lr_count += 1
  1218            else:
  1219                cluster_routing_envoy_rr_count += 1
  1220
  1221            if cluster.get("tls_context", None):
  1222                using_tls = True
  1223                cluster_tls_count += 1
  1224
  1225            if cluster.get("grpc", False):
  1226                using_grpc = True
  1227                cluster_grpc_count += 1
  1228            else:
  1229                using_http = True
  1230                cluster_http_count += 1
  1231
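                  # With kube routing, the cluster's URLs stand in for its endpoints;
                  # with endpoint routing, count the resolved targets instead.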
  1232            cluster_endpoints = cluster.urls if (lb_type == "kube") else cluster.get("targets", [])
  1233
  1234            # Paranoia: the endpoint list can be None; normalize so len() is safe.
  1235            if not cluster_endpoints:
  1236                cluster_endpoints = []
  1237
  1238            num_endpoints = len(cluster_endpoints)
  1239
  1240            # self.logger.debug(f'cluster {cluster.name}: lb_type {lb_type}, endpoints {cluster_endpoints} ({num_endpoints})')
  1241
  1242            if using_tls:
  1243                endpoint_tls_count += num_endpoints
  1244
  1245            if using_http:
  1246                endpoint_http_count += num_endpoints
  1247
  1248            if using_grpc:
  1249                endpoint_grpc_count += num_endpoints
  1250
  1251            if lb_type == "kube":
  1252                endpoint_routing_kube_count += num_endpoints
  1253            elif lb_type == "ring_hash":
  1254                endpoint_routing_envoy_rh_count += num_endpoints
  1255            elif lb_type == "maglev":
  1256                endpoint_routing_envoy_maglev_count += num_endpoints
  1257            elif lb_type == "least_request":
  1258                endpoint_routing_envoy_lr_count += num_endpoints
  1259            else:
  1260                endpoint_routing_envoy_rr_count += num_endpoints
  1261
  1262        od["cluster_count"] = cluster_count
  1263        od["cluster_grpc_count"] = cluster_grpc_count
  1264        od["cluster_http_count"] = cluster_http_count
  1265        od["cluster_tls_count"] = cluster_tls_count
  1266        od["cluster_routing_kube_count"] = cluster_routing_kube_count
  1267        od["cluster_routing_envoy_rr_count"] = cluster_routing_envoy_rr_count
  1268        od["cluster_routing_envoy_rh_count"] = cluster_routing_envoy_rh_count
  1269        od["cluster_routing_envoy_maglev_count"] = cluster_routing_envoy_maglev_count
  1270        od["cluster_routing_envoy_lr_count"] = cluster_routing_envoy_lr_count
  1271
  1272        od["endpoint_routing"] = Config.enable_endpoints
  1273
  1274        od["endpoint_grpc_count"] = endpoint_grpc_count
  1275        od["endpoint_http_count"] = endpoint_http_count
  1276        od["endpoint_tls_count"] = endpoint_tls_count
  1277        od["endpoint_routing_kube_count"] = endpoint_routing_kube_count
  1278        od["endpoint_routing_envoy_rr_count"] = endpoint_routing_envoy_rr_count
  1279        od["endpoint_routing_envoy_rh_count"] = endpoint_routing_envoy_rh_count
  1280        od["endpoint_routing_envoy_maglev_count"] = endpoint_routing_envoy_maglev_count
  1281        od["endpoint_routing_envoy_lr_count"] = endpoint_routing_envoy_lr_count
  1282
  1283        od["cluster_ingress_count"] = 0  # Provided for backward compatibility only.
  1284        od["knative_ingress_count"] = self.aconf.get_count("knative_ingress")
  1285
  1286        od["k8s_ingress_count"] = self.aconf.get_count("k8s_ingress")
  1287        od["k8s_ingress_class_count"] = self.aconf.get_count("k8s_ingress_class")
  1288
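              # Extension telemetry: record whether external auth, rate limiting, and
              # tracing are configured, plus a few shape details for each.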
  1289        extauth = False
  1290        extauth_proto: Optional[str] = None
  1291        extauth_allow_body = False
  1292        extauth_host_count = 0
  1293
  1294        ratelimit = False
  1295        ratelimit_data_plane_proto = False
  1296        ratelimit_custom_domain = False
  1297
  1298        tracing = False
  1299        tracing_driver: Optional[str] = None
  1300
  1301        for fltr in self.filters:
  1302            if fltr.kind == "IRAuth":
  1303                extauth = True
  1304                extauth_proto = fltr.get("proto", "http")
  1305                extauth_allow_body = fltr.get("allow_request_body", False)
  1306                extauth_host_count = len(fltr.hosts)
  1307
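              # A RateLimitService is flagged as using a custom domain when its domain
              # differs from the default "ambassador".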
  1308        if self.ratelimit:
  1309            ratelimit = True
  1310            ratelimit_data_plane_proto = self.ratelimit.get("data_plane_proto", False)
  1311            ratelimit_custom_domain = bool(self.ratelimit.domain != "ambassador")
  1312
  1313        if self.tracing:
  1314            tracing = True
  1315            tracing_driver = self.tracing.driver
  1316
  1317        od["extauth"] = extauth
  1318        od["extauth_proto"] = extauth_proto
  1319        od["extauth_allow_body"] = extauth_allow_body
  1320        od["extauth_host_count"] = extauth_host_count
  1321        od["ratelimit"] = ratelimit
  1322        od["ratelimit_data_plane_proto"] = ratelimit_data_plane_proto
  1323        od["ratelimit_custom_domain"] = ratelimit_custom_domain
  1324        od["tracing"] = tracing
  1325        od["tracing_driver"] = tracing_driver
  1326
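              # Mapping-group census: walk every ordered group and tally its match and
              # routing features; each counter below is annotated with what it tracks.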
  1327        group_count = 0
  1328        group_http_count = 0  # HTTPMappingGroups
  1329        group_tcp_count = 0  # TCPMappingGroups
  1330        group_precedence_count = 0  # groups using explicit precedence
  1331        group_header_match_count = 0  # groups using header matches
  1332        group_regex_header_count = 0  # groups using regex header matches
  1333        group_regex_prefix_count = 0  # groups using regex prefix matches
  1334        group_shadow_count = 0  # groups using shadows
  1335        group_shadow_weighted_count = 0  # groups using shadows with non-100% weights
  1336        group_host_redirect_count = 0  # groups using host_redirect
  1337        group_host_rewrite_count = 0  # groups using host_rewrite
  1338        group_canary_count = 0  # groups coalescing multiple mappings
  1339        group_resolver_kube_service = 0  # groups using the KubernetesServiceResolver
  1340        group_resolver_kube_endpoint = 0  # groups using the KubernetesEndpointResolver
  1341        group_resolver_consul = 0  # groups using the ConsulResolver
  1342        mapping_count = 0  # total mappings
  1343
  1344        for group in self.ordered_groups():
  1345            group_count += 1
  1346
  1347            if group.get("kind", "IRHTTPMappingGroup") == "IRTCPMappingGroup":
  1348                group_tcp_count += 1
  1349            else:
  1350                group_http_count += 1
  1351
  1352            if group.get("precedence", 0) != 0:
  1353                group_precedence_count += 1
  1354
  1355            using_headers = False
  1356            using_regex_headers = False
  1357
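                  # Any header criterion makes this a header-match group; a single
                  # regex header additionally makes it a regex-header-match group.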
  1358            for header in group.get("headers", []):
  1359                using_headers = True
  1360
  1361                if header["regex"]:
  1362                    using_regex_headers = True
  1363                    break
  1364
  1365            if using_headers:
  1366                group_header_match_count += 1
  1367
  1368            if using_regex_headers:
  1369                group_regex_header_count += 1
  1370
  1371            if len(group.mappings) > 1:
  1372                group_canary_count += 1
  1373
  1374            mapping_count += len(group.mappings)
  1375
  1376            if group.get("shadows", []):
  1377                group_shadow_count += 1
  1378
  1379                if group.get("weight", 100) != 100:
  1380                    group_shadow_weighted_count += 1
  1381
  1382            if group.get("host_redirect", {}):
  1383                group_host_redirect_count += 1
  1384
  1385            if group.get("host_rewrite", None):
  1386                group_host_rewrite_count += 1
  1387
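                  # Resolver attribution: a group without an explicit resolver inherits
                  # the module's, which itself defaults to "kubernetes-service".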
  1388            res_name = group.get(
  1389                "resolver", self.ambassador_module.get("resolver", "kubernetes-service")
  1390            )
  1391            resolver = self.get_resolver(res_name)
  1392
  1393            if resolver:
  1394                if resolver.kind == "KubernetesServiceResolver":
  1395                    group_resolver_kube_service += 1
  1396                elif resolver.kind == "KubernetesEndpointResolver":
  1397                    group_resolver_kube_endpoint += 1
  1398                elif resolver.kind == "ConsulResolver":
  1399                    group_resolver_consul += 1
  1400
  1401        od["group_count"] = group_count
  1402        od["group_http_count"] = group_http_count
  1403        od["group_tcp_count"] = group_tcp_count
  1404        od["group_precedence_count"] = group_precedence_count
  1405        od["group_header_match_count"] = group_header_match_count
  1406        od["group_regex_header_count"] = group_regex_header_count
  1407        od["group_regex_prefix_count"] = group_regex_prefix_count
  1408        od["group_shadow_count"] = group_shadow_count
  1409        od["group_shadow_weighted_count"] = group_shadow_weighted_count
  1410        od["group_host_redirect_count"] = group_host_redirect_count
  1411        od["group_host_rewrite_count"] = group_host_rewrite_count
  1412        od["group_canary_count"] = group_canary_count
  1413        od["group_resolver_kube_service"] = group_resolver_kube_service
  1414        od["group_resolver_kube_endpoint"] = group_resolver_kube_endpoint
  1415        od["group_resolver_consul"] = group_resolver_consul
  1416        od["mapping_count"] = mapping_count
  1417
  1418        od["listener_count"] = len(self.listeners)
  1419        od["host_count"] = len(self.hosts)
  1420
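              # Tally input objects that were rejected as invalid, bucketed by kind
              # (or "(unknown)" when the object carries no kind at all).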
  1421        invalid_counts: Dict[str, int] = {}
  1422
  1423        if self.invalid:
  1424            for obj in self.invalid:
  1425                kind = obj.get("kind") or "(unknown)"
  1426
  1427                invalid_counts[kind] = invalid_counts.get(kind, 0) + 1
  1428
  1429        od["invalid_counts"] = invalid_counts
  1430
  1431        # Fast reconfiguration information is supplied in check_scout in diagd.py.
  1432
  1433        return od
