...

Text file src/github.com/emissary-ingress/emissary/v3/python/kubewatch.py

Documentation: github.com/emissary-ingress/emissary/v3/python

     1# Copyright 2018 Datawire. All rights reserved.
     2#
     3# Licensed under the Apache License, Version 2.0 (the "License");
     4# you may not use this file except in compliance with the License.
     5# You may obtain a copy of the License at
     6#
     7#     http://www.apache.org/licenses/LICENSE-2.0
     8#
     9# Unless required by applicable law or agreed to in writing, software
    10# distributed under the License is distributed on an "AS IS" BASIS,
    11# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    12# See the License for the specific language governing permissions and
    13# limitations under the License
    14
    15import logging
    16import os
    17import uuid
    18from pathlib import Path
    19from typing import Dict, List
    20
    21import click
    22from kubernetes import client, config
    23from kubernetes.client.rest import ApiException
    24
    25from ambassador.VERSION import Version
    26
    27__version__ = Version
    28ambassador_id = os.getenv("AMBASSADOR_ID", "default")
    29ambassador_namespace = os.environ.get("AMBASSADOR_NAMESPACE", "default")
    30ambassador_single_namespace = bool("AMBASSADOR_SINGLE_NAMESPACE" in os.environ)
    31ambassador_basedir = os.environ.get("AMBASSADOR_CONFIG_BASE_DIR", "/ambassador")
    32
    33logging.basicConfig(
    34    level=logging.INFO,  # if appDebug else logging.INFO,
    35    format="%%(asctime)s kubewatch [%%(process)d T%%(threadName)s] %s %%(levelname)s: %%(message)s"
    36    % __version__,
    37    datefmt="%Y-%m-%d %H:%M:%S",
    38)
    39
    40# logging.getLogger("datawire.scout").setLevel(logging.DEBUG)
    41logger = logging.getLogger("kubewatch")
    42logger.setLevel(logging.INFO)
    43
    44
    45def kube_v1():
    46    # Assume we got nothin'.
    47    k8s_api = None
    48
    49    # XXX: is there a better way to check if we are inside a cluster or not?
    50    if "KUBERNETES_SERVICE_HOST" in os.environ:
    51        # If this goes horribly wrong and raises an exception (it shouldn't),
    52        # we'll crash, and Kubernetes will kill the pod. That's probably not an
    53        # unreasonable response.
    54        config.load_incluster_config()
    55        if "AMBASSADOR_VERIFY_SSL_FALSE" in os.environ:
    56            configuration = client.Configuration()
    57            configuration.verify_ssl = False
    58            client.Configuration.set_default(configuration)
    59        k8s_api = client.CoreV1Api()
    60    else:
    61        # Here, we might be running in docker, in which case we'll likely not
    62        # have any Kube secrets, and that's OK.
    63        try:
    64            config.load_kube_config()
    65            k8s_api = client.CoreV1Api()
    66        except FileNotFoundError:
    67            # Meh, just ride through.
    68            logger.info("No K8s")
    69
    70    return k8s_api
    71
    72
    73def hack_stored_versions(self):
    74    return self._stored_versions
    75
    76
    77def hack_stored_versions_setter(self, stored_versions):
    78    self._stored_versions = stored_versions
    79
    80
    81def hack_accepted_names(self):
    82    return self._accepted_names
    83
    84
    85def hack_accepted_names_setter(self, accepted_names):
    86    self._accepted_names = accepted_names
    87
    88
    89def hack_conditions(self):
    90    return self._conditions
    91
    92
    93def hack_conditions_setter(self, conditions):
    94    self._conditions = conditions
    95
    96
    97def check_crd_type(crd):
    98    status = False
    99
   100    try:
   101        client.apis.ApiextensionsV1beta1Api().read_custom_resource_definition(crd)
   102        status = True
   103    except client.rest.ApiException as e:
   104        if e.status == 404:
   105            logger.debug(f"CRD type definition not found for {crd}")
   106        else:
   107            logger.debug(f"CRD type definition unreadable for {crd}: {e.reason}")
   108
   109    return status
   110
   111
   112def check_ingresses():
   113    status = False
   114
   115    k8s_v1b1 = client.ExtensionsV1beta1Api(client.ApiClient(client.Configuration()))
   116
   117    if k8s_v1b1:
   118        try:
   119            if ambassador_single_namespace:
   120                k8s_v1b1.list_namespaced_ingress(ambassador_namespace)
   121            else:
   122                k8s_v1b1.list_ingress_for_all_namespaces()
   123            status = True
   124        except ApiException as e:
   125            logger.debug(f"Ingress check got {e.status}")
   126
   127    return status
   128
   129
   130def check_ingress_classes():
   131    status = False
   132
   133    api_client = client.ApiClient(client.Configuration())
   134
   135    if api_client:
   136        try:
   137            # Sadly, the Kubernetes Python library is not built with forward-compatibility in mind.
   138            # Since IngressClass is a new resource, it is not discoverable through the python wrapper apis.
   139            # Here, we extracted (read copy/pasted) a sample call from k8s_v1b1.list_ingress_for_all_namespaces()
   140            # where we use the rest ApiClient to read ingressclasses.
   141
   142            path_params: Dict[str, str] = {}
   143            query_params: List[str] = []
   144            header_params: Dict[str, str] = {}
   145
   146            header_params["Accept"] = api_client.select_header_accept(
   147                [
   148                    "application/json",
   149                    "application/yaml",
   150                    "application/vnd.kubernetes.protobuf",
   151                    "application/json;stream=watch",
   152                    "application/vnd.kubernetes.protobuf;stream=watch",
   153                ]
   154            )
   155
   156            header_params["Content-Type"] = api_client.select_header_content_type(["*/*"])
   157
   158            auth_settings = ["BearerToken"]
   159
   160            api_client.call_api(
   161                "/apis/networking.k8s.io/v1beta1/ingressclasses",
   162                "GET",
   163                path_params,
   164                query_params,
   165                header_params,
   166                auth_settings=auth_settings,
   167            )
   168            status = True
   169        except ApiException as e:
   170            logger.debug(f"IngressClass check got {e.status}")
   171
   172    return status
   173
   174
   175def get_api_resources(group, version):
   176    api_client = client.ApiClient(client.Configuration())
   177
   178    if api_client:
   179        try:
   180            # Sadly, the Kubernetes Python library supports a method equivalent to `kubectl api-versions`
   181            # but nothing for `kubectl api-resources`.
   182            # Here, we extracted (read copy/pasted) a sample call from ApisApi().get_api_versions()
   183            # where we use the rest ApiClient to list api resources specific to a group.
   184
   185            path_params: Dict[str, str] = {}
   186            query_params: List[str] = []
   187            header_params: Dict[str, str] = {}
   188
   189            header_params["Accept"] = api_client.select_header_accept(["application/json"])
   190
   191            auth_settings = ["BearerToken"]
   192
   193            (data) = api_client.call_api(
   194                f"/apis/{group}/{version}",
   195                "GET",
   196                path_params,
   197                query_params,
   198                header_params,
   199                auth_settings=auth_settings,
   200                response_type="V1APIResourceList",
   201            )
   202            return data[0]
   203        except ApiException as e:
   204            logger.error(f"get_api_resources {e.status}")
   205
   206    return None
   207
   208
   209def touch_file(touchfile):
   210    touchpath = Path(ambassador_basedir, touchfile)
   211    try:
   212        touchpath.touch()
   213    except PermissionError as e:
   214        logger.error(e)
   215
   216
   217@click.command()
   218@click.option("--debug", is_flag=True, help="Enable debugging")
   219def main(debug):
   220    if debug:
   221        logger.setLevel(logging.DEBUG)
   222
   223    found = None
   224    root_id = None
   225
   226    cluster_id = os.environ.get(
   227        "AMBASSADOR_CLUSTER_ID", os.environ.get("AMBASSADOR_SCOUT_ID", None)
   228    )
   229    wanted = ambassador_namespace if ambassador_single_namespace else "default"
   230
   231    # Go ahead and try connecting to Kube.
   232    v1 = kube_v1()
   233
   234    # OK. Do the cluster ID dance. If we already have one from the environment,
   235    # we're good.
   236
   237    if cluster_id:
   238        found = "environment"
   239    else:
   240        if v1:
   241            # No ID from the environment, but we can try a lookup using Kube.
   242            logger.debug("looking up ID for namespace %s" % wanted)
   243
   244            try:
   245                ret = v1.read_namespace(wanted)
   246                root_id = ret.metadata.uid
   247                found = "namespace %s" % wanted
   248            except ApiException as e:
   249                # This means our namespace wasn't found?
   250                logger.error("couldn't read namespace %s? %s" % (wanted, e))
   251
   252        if not root_id:
   253            # OK, so we had a crack at this and something went wrong. Give up and hardcode
   254            # something.
   255            root_id = "00000000-0000-0000-0000-000000000000"
   256            found = "hardcoded ID"
   257
   258        # One way or the other, we need to generate an ID here.
   259        cluster_url = "d6e_id://%s/%s" % (root_id, ambassador_id)
   260        logger.debug("cluster ID URL is %s" % cluster_url)
   261
   262        cluster_id = str(uuid.uuid5(uuid.NAMESPACE_URL, cluster_url)).lower()
   263
   264    # How about CRDs?
   265
   266    if v1:
   267        # We were able to connect to Kube, so let's try to check for missing CRDs too.
   268
   269        required_crds = [
   270            (
   271                ".ambassador_ignore_crds",
   272                "Main CRDs",
   273                [
   274                    "authservices.getambassador.io",
   275                    "mappings.getambassador.io",
   276                    "modules.getambassador.io",
   277                    "ratelimitservices.getambassador.io",
   278                    "tcpmappings.getambassador.io",
   279                    "tlscontexts.getambassador.io",
   280                    "tracingservices.getambassador.io",
   281                ],
   282            ),
   283            (
   284                ".ambassador_ignore_crds_2",
   285                "Resolver CRDs",
   286                [
   287                    "consulresolvers.getambassador.io",
   288                    "kubernetesendpointresolvers.getambassador.io",
   289                    "kubernetesserviceresolvers.getambassador.io",
   290                ],
   291            ),
   292            (".ambassador_ignore_crds_3", "Host CRDs", ["hosts.getambassador.io"]),
   293            (".ambassador_ignore_crds_4", "LogService CRDs", ["logservices.getambassador.io"]),
   294            (".ambassador_ignore_crds_5", "DevPortal CRDs", ["devportals.getambassador.io"]),
   295        ]
   296
   297        # Flynn would say "Ew.", but we need to patch this till https://github.com/kubernetes-client/python/issues/376
   298        # and https://github.com/kubernetes-client/gen/issues/52 are fixed \_(0.0)_/
   299        client.models.V1beta1CustomResourceDefinitionStatus.accepted_names = property(
   300            hack_accepted_names, hack_accepted_names_setter
   301        )
   302
   303        client.models.V1beta1CustomResourceDefinitionStatus.conditions = property(
   304            hack_conditions, hack_conditions_setter
   305        )
   306
   307        client.models.V1beta1CustomResourceDefinitionStatus.stored_versions = property(
   308            hack_stored_versions, hack_stored_versions_setter
   309        )
   310
   311        known_api_resources = []
   312        api_resources = get_api_resources("getambassador.io", "v2")
   313        if api_resources:
   314            known_api_resources = list(
   315                map(lambda r: r.name + ".getambassador.io", api_resources.resources)
   316            )
   317
   318        for touchfile, description, required in required_crds:
   319            for crd in required:
   320                if not crd in known_api_resources:
   321                    touch_file(touchfile)
   322
   323                    logger.debug(
   324                        f"{description} are not available."
   325                        + " To enable CRD support, configure the Ambassador CRD type definitions and RBAC,"
   326                        + " then restart the Ambassador pod."
   327                    )
   328                    # logger.debug(f'touched {touchpath}')
   329
   330        if not check_ingress_classes():
   331            touch_file(".ambassador_ignore_ingress_class")
   332
   333            logger.debug(
   334                f"Ambassador does not have permission to read IngressClass resources."
   335                + " To enable IngressClass support, configure RBAC to allow Ambassador to read IngressClass"
   336                " resources, then restart the Ambassador pod."
   337            )
   338
   339        if not check_ingresses():
   340            touch_file(".ambassador_ignore_ingress")
   341
   342            logger.debug(
   343                f"Ambassador does not have permission to read Ingress resources."
   344                + " To enable Ingress support, configure RBAC to allow Ambassador to read Ingress resources,"
   345                + " then restart the Ambassador pod."
   346            )
   347
   348        # Check for our operator's CRD now
   349        if check_crd_type("ambassadorinstallations.getambassador.io"):
   350            touch_file(".ambassadorinstallations_ok")
   351            logger.debug("ambassadorinstallations.getambassador.io CRD available")
   352        else:
   353            logger.debug("ambassadorinstallations.getambassador.io CRD not available")
   354
   355        # Have we been asked to do Knative support?
   356        if os.environ.get("AMBASSADOR_KNATIVE_SUPPORT", "").lower() == "true":
   357            # Yes. Check for their CRD types.
   358
   359            if check_crd_type("clusteringresses.networking.internal.knative.dev"):
   360                touch_file(".knative_clusteringress_ok")
   361                logger.debug("Knative clusteringresses available")
   362            else:
   363                logger.debug("Knative clusteringresses not available")
   364
   365            if check_crd_type("ingresses.networking.internal.knative.dev"):
   366                touch_file(".knative_ingress_ok")
   367                logger.debug("Knative ingresses available")
   368            else:
   369                logger.debug("Knative ingresses not available")
   370    else:
   371        # If we couldn't talk to Kube, log that, but broadly we'll expect our caller
   372        # to DTRT around CRDs.
   373
   374        logger.debug("Kubernetes is not available, so not doing CRD check")
   375
   376    # Finally, spit out the cluster ID for our caller.
   377    logger.debug("cluster ID is %s (from %s)" % (cluster_id, found))
   378
   379    print(cluster_id)
   380
   381
   382if __name__ == "__main__":
   383    main()

View as plain text