...

Text file src/github.com/datawire/ambassador/v2/python/kubewatch.py

Documentation: github.com/datawire/ambassador/v2/python

     1# Copyright 2018 Datawire. All rights reserved.
     2#
     3# Licensed under the Apache License, Version 2.0 (the "License");
     4# you may not use this file except in compliance with the License.
     5# You may obtain a copy of the License at
     6#
     7#     http://www.apache.org/licenses/LICENSE-2.0
     8#
     9# Unless required by applicable law or agreed to in writing, software
    10# distributed under the License is distributed on an "AS IS" BASIS,
    11# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    12# See the License for the specific language governing permissions and
    13# limitations under the License.
    14
    15import logging
    16import os
    17import uuid
    18from pathlib import Path
    19
    20import click
    21from kubernetes import client, config
    22from kubernetes.client.rest import ApiException
    23
    24from ambassador.VERSION import Version
    25
    26__version__ = Version
    27ambassador_id = os.getenv("AMBASSADOR_ID", "default")
    28ambassador_namespace = os.environ.get("AMBASSADOR_NAMESPACE", "default")
    29ambassador_single_namespace = bool("AMBASSADOR_SINGLE_NAMESPACE" in os.environ)
    30ambassador_basedir = os.environ.get("AMBASSADOR_CONFIG_BASE_DIR", "/ambassador")
    31
    32logging.basicConfig(
    33    level=logging.INFO,  # if appDebug else logging.INFO,
    34    format="%%(asctime)s kubewatch [%%(process)d T%%(threadName)s] %s %%(levelname)s: %%(message)s"
    35    % __version__,
    36    datefmt="%Y-%m-%d %H:%M:%S",
    37)
    38
    39# logging.getLogger("datawire.scout").setLevel(logging.DEBUG)
    40logger = logging.getLogger("kubewatch")
    41logger.setLevel(logging.INFO)
    42
    43
    44def kube_v1():
    45    # Assume we got nothin'.
    46    k8s_api = None
    47
    48    # XXX: is there a better way to check if we are inside a cluster or not?
    49    if "KUBERNETES_SERVICE_HOST" in os.environ:
    50        # If this goes horribly wrong and raises an exception (it shouldn't),
    51        # we'll crash, and Kubernetes will kill the pod. That's probably not an
    52        # unreasonable response.
    53        config.load_incluster_config()
    54        if "AMBASSADOR_VERIFY_SSL_FALSE" in os.environ:
    55            configuration = client.Configuration()
    56            configuration.verify_ssl = False
    57            client.Configuration.set_default(configuration)
    58        k8s_api = client.CoreV1Api()
    59    else:
    60        # Here, we might be running in docker, in which case we'll likely not
    61        # have any Kube secrets, and that's OK.
    62        try:
    63            config.load_kube_config()
    64            k8s_api = client.CoreV1Api()
    65        except FileNotFoundError:
    66            # Meh, just ride through.
    67            logger.info("No K8s")
    68
    69    return k8s_api
    70
    71
    72def hack_stored_versions(self):
    73    return self._stored_versions
    74
    75
    76def hack_stored_versions_setter(self, stored_versions):
    77    self._stored_versions = stored_versions
    78
    79
    80def hack_accepted_names(self):
    81    return self._accepted_names
    82
    83
    84def hack_accepted_names_setter(self, accepted_names):
    85    self._accepted_names = accepted_names
    86
    87
    88def hack_conditions(self):
    89    return self._conditions
    90
    91
    92def hack_conditions_setter(self, conditions):
    93    self._conditions = conditions
    94
    95
    96def check_crd_type(crd):
    97    status = False
    98
    99    try:
   100        client.apis.ApiextensionsV1beta1Api().read_custom_resource_definition(crd)
   101        status = True
   102    except client.rest.ApiException as e:
   103        if e.status == 404:
   104            logger.debug(f"CRD type definition not found for {crd}")
   105        else:
   106            logger.debug(f"CRD type definition unreadable for {crd}: {e.reason}")
   107
   108    return status
   109
   110
   111def check_ingresses():
   112    status = False
   113
   114    k8s_v1b1 = client.ExtensionsV1beta1Api(client.ApiClient(client.Configuration()))
   115
   116    if k8s_v1b1:
   117        try:
   118            if ambassador_single_namespace:
   119                k8s_v1b1.list_namespaced_ingress(ambassador_namespace)
   120            else:
   121                k8s_v1b1.list_ingress_for_all_namespaces()
   122            status = True
   123        except ApiException as e:
   124            logger.debug(f"Ingress check got {e.status}")
   125
   126    return status
   127
   128
   129def check_ingress_classes():
   130    status = False
   131
   132    api_client = client.ApiClient(client.Configuration())
   133
   134    if api_client:
   135        try:
   136            # Sadly, the Kubernetes Python library is not built with forward-compatibility in mind.
   137            # Since IngressClass is a new resource, it is not discoverable through the python wrapper apis.
   138            # Here, we extracted (read copy/pasted) a sample call from k8s_v1b1.list_ingress_for_all_namespaces()
   139            # where we use the rest ApiClient to read ingressclasses.
   140
   141            path_params = {}
   142            query_params = []
   143            header_params = {}
   144
   145            header_params["Accept"] = api_client.select_header_accept(
   146                [
   147                    "application/json",
   148                    "application/yaml",
   149                    "application/vnd.kubernetes.protobuf",
   150                    "application/json;stream=watch",
   151                    "application/vnd.kubernetes.protobuf;stream=watch",
   152                ]
   153            )
   154
   155            header_params["Content-Type"] = api_client.select_header_content_type(["*/*"])
   156
   157            auth_settings = ["BearerToken"]
   158
   159            api_client.call_api(
   160                "/apis/networking.k8s.io/v1beta1/ingressclasses",
   161                "GET",
   162                path_params,
   163                query_params,
   164                header_params,
   165                auth_settings=auth_settings,
   166            )
   167            status = True
   168        except ApiException as e:
   169            logger.debug(f"IngressClass check got {e.status}")
   170
   171    return status
   172
   173
   174def get_api_resources(group, version):
   175    api_client = client.ApiClient(client.Configuration())
   176
   177    if api_client:
   178        try:
   179            # Sadly, the Kubernetes Python library supports a method equivalent to `kubectl api-versions`
   180            # but nothing for `kubectl api-resources`.
   181            # Here, we extracted (read copy/pasted) a sample call from ApisApi().get_api_versions()
   182            # where we use the rest ApiClient to list api resources specific to a group.
   183
   184            path_params = {}
   185            query_params = []
   186            header_params = {}
   187
   188            header_params["Accept"] = api_client.select_header_accept(["application/json"])
   189
   190            auth_settings = ["BearerToken"]
   191
   192            (data) = api_client.call_api(
   193                f"/apis/{group}/{version}",
   194                "GET",
   195                path_params,
   196                query_params,
   197                header_params,
   198                auth_settings=auth_settings,
   199                response_type="V1APIResourceList",
   200            )
   201            return data[0]
   202        except ApiException as e:
   203            logger.error(f"get_api_resources {e.status}")
   204
   205    return None
   206
   207
   208def touch_file(touchfile):
   209    touchpath = Path(ambassador_basedir, touchfile)
   210    try:
   211        touchpath.touch()
   212    except PermissionError as e:
   213        logger.error(e)
   214
   215
   216@click.command()
   217@click.option("--debug", is_flag=True, help="Enable debugging")
   218def main(debug):
   219    if debug:
   220        logger.setLevel(logging.DEBUG)
   221
   222    found = None
   223    root_id = None
   224
   225    cluster_id = os.environ.get(
   226        "AMBASSADOR_CLUSTER_ID", os.environ.get("AMBASSADOR_SCOUT_ID", None)
   227    )
   228    wanted = ambassador_namespace if ambassador_single_namespace else "default"
   229
   230    # Go ahead and try connecting to Kube.
   231    v1 = kube_v1()
   232
   233    # OK. Do the cluster ID dance. If we already have one from the environment,
   234    # we're good.
   235
   236    if cluster_id:
   237        found = "environment"
   238    else:
   239        if v1:
   240            # No ID from the environment, but we can try a lookup using Kube.
   241            logger.debug("looking up ID for namespace %s" % wanted)
   242
   243            try:
   244                ret = v1.read_namespace(wanted)
   245                root_id = ret.metadata.uid
   246                found = "namespace %s" % wanted
   247            except ApiException as e:
   248                # This means our namespace wasn't found?
   249                logger.error("couldn't read namespace %s? %s" % (wanted, e))
   250
   251        if not root_id:
   252            # OK, so we had a crack at this and something went wrong. Give up and hardcode
   253            # something.
   254            root_id = "00000000-0000-0000-0000-000000000000"
   255            found = "hardcoded ID"
   256
   257        # One way or the other, we need to generate an ID here.
   258        cluster_url = "d6e_id://%s/%s" % (root_id, ambassador_id)
   259        logger.debug("cluster ID URL is %s" % cluster_url)
   260
   261        cluster_id = str(uuid.uuid5(uuid.NAMESPACE_URL, cluster_url)).lower()
   262
   263    # How about CRDs?
   264
   265    if v1:
   266        # We were able to connect to Kube, so let's try to check for missing CRDs too.
   267
   268        required_crds = [
   269            (
   270                ".ambassador_ignore_crds",
   271                "Main CRDs",
   272                [
   273                    "authservices.getambassador.io",
   274                    "mappings.getambassador.io",
   275                    "modules.getambassador.io",
   276                    "ratelimitservices.getambassador.io",
   277                    "tcpmappings.getambassador.io",
   278                    "tlscontexts.getambassador.io",
   279                    "tracingservices.getambassador.io",
   280                ],
   281            ),
   282            (
   283                ".ambassador_ignore_crds_2",
   284                "Resolver CRDs",
   285                [
   286                    "consulresolvers.getambassador.io",
   287                    "kubernetesendpointresolvers.getambassador.io",
   288                    "kubernetesserviceresolvers.getambassador.io",
   289                ],
   290            ),
   291            (".ambassador_ignore_crds_3", "Host CRDs", ["hosts.getambassador.io"]),
   292            (".ambassador_ignore_crds_4", "LogService CRDs", ["logservices.getambassador.io"]),
   293            (".ambassador_ignore_crds_5", "DevPortal CRDs", ["devportals.getambassador.io"]),
   294        ]
   295
   296        # Flynn would say "Ew.", but we need to patch this till https://github.com/kubernetes-client/python/issues/376
   297        # and https://github.com/kubernetes-client/gen/issues/52 are fixed \_(0.0)_/
   298        client.models.V1beta1CustomResourceDefinitionStatus.accepted_names = property(
   299            hack_accepted_names, hack_accepted_names_setter
   300        )
   301
   302        client.models.V1beta1CustomResourceDefinitionStatus.conditions = property(
   303            hack_conditions, hack_conditions_setter
   304        )
   305
   306        client.models.V1beta1CustomResourceDefinitionStatus.stored_versions = property(
   307            hack_stored_versions, hack_stored_versions_setter
   308        )
   309
   310        known_api_resources = []
   311        api_resources = get_api_resources("getambassador.io", "v2")
   312        if api_resources:
   313            known_api_resources = list(
   314                map(lambda r: r.name + ".getambassador.io", api_resources.resources)
   315            )
   316
   317        for touchfile, description, required in required_crds:
   318            for crd in required:
   319                if not crd in known_api_resources:
   320                    touch_file(touchfile)
   321
   322                    logger.debug(
   323                        f"{description} are not available."
   324                        + " To enable CRD support, configure the Ambassador CRD type definitions and RBAC,"
   325                        + " then restart the Ambassador pod."
   326                    )
   327                    # logger.debug(f'touched {touchpath}')
   328
   329        if not check_ingress_classes():
   330            touch_file(".ambassador_ignore_ingress_class")
   331
   332            logger.debug(
   333                f"Ambassador does not have permission to read IngressClass resources."
   334                + " To enable IngressClass support, configure RBAC to allow Ambassador to read IngressClass"
   335                " resources, then restart the Ambassador pod."
   336            )
   337
   338        if not check_ingresses():
   339            touch_file(".ambassador_ignore_ingress")
   340
   341            logger.debug(
   342                f"Ambassador does not have permission to read Ingress resources."
   343                + " To enable Ingress support, configure RBAC to allow Ambassador to read Ingress resources,"
   344                + " then restart the Ambassador pod."
   345            )
   346
   347        # Check for our operator's CRD now
   348        if check_crd_type("ambassadorinstallations.getambassador.io"):
   349            touch_file(".ambassadorinstallations_ok")
   350            logger.debug("ambassadorinstallations.getambassador.io CRD available")
   351        else:
   352            logger.debug("ambassadorinstallations.getambassador.io CRD not available")
   353
   354        # Have we been asked to do Knative support?
   355        if os.environ.get("AMBASSADOR_KNATIVE_SUPPORT", "").lower() == "true":
   356            # Yes. Check for their CRD types.
   357
   358            if check_crd_type("clusteringresses.networking.internal.knative.dev"):
   359                touch_file(".knative_clusteringress_ok")
   360                logger.debug("Knative clusteringresses available")
   361            else:
   362                logger.debug("Knative clusteringresses not available")
   363
   364            if check_crd_type("ingresses.networking.internal.knative.dev"):
   365                touch_file(".knative_ingress_ok")
   366                logger.debug("Knative ingresses available")
   367            else:
   368                logger.debug("Knative ingresses not available")
   369    else:
   370        # If we couldn't talk to Kube, log that, but broadly we'll expect our caller
   371        # to DTRT around CRDs.
   372
   373        logger.debug("Kubernetes is not available, so not doing CRD check")
   374
   375    # Finally, spit out the cluster ID for our caller.
   376    logger.debug("cluster ID is %s (from %s)" % (cluster_id, found))
   377
   378    print(cluster_id)
   379
   380
# Script entry point: main is a click command, so calling it with no
# arguments lets click parse the command line (including --debug).
if __name__ == "__main__":
    main()

View as plain text