...

Text file src/github.com/emissary-ingress/emissary/v3/python/kat/harness.py

Documentation: github.com/emissary-ingress/emissary/v3/python/kat

     1import base64
     2import fnmatch
     3import functools
     4import inspect
     5import json
     6import os
     7import subprocess
     8import sys
     9import threading
    10import time
    11import traceback
    12from abc import ABC
    13from collections import OrderedDict
    14from functools import singledispatch
    15from hashlib import sha256
    16from typing import Any, Dict, List, NamedTuple, Optional, Sequence, Tuple, Type, Union, cast
    17
    18import pytest
    19import yaml as pyyaml
    20from packaging import version
    21from yaml.parser import ParserError as YAMLParseError
    22from yaml.scanner import ScannerError as YAMLScanError
    23
    24import tests.integration.manifests as integration_manifests
    25from ambassador.utils import parse_bool
    26from tests.manifests import cleartext_host_manifest, default_listener_manifest
    27
    28from .parser import SequenceView, Tag, dump, load
    29from .utils import ShellCommand
    30
# Start from PyYAML's pure-Python "safe" loader and dumper, which are always
# available.
pyyaml_loader: Any = pyyaml.SafeLoader
pyyaml_dumper: Any = pyyaml.SafeDumper

# Switch to the CSafe* variants when this PyYAML build exposes them. The two
# assignments run in order: if an attribute lookup raises AttributeError, any
# name assigned before it keeps the CSafe* class and the remaining name(s)
# keep the pure-Python default set above.
try:
    pyyaml_loader = pyyaml.CSafeLoader
    pyyaml_dumper = pyyaml.CSafeDumper
except AttributeError:
    pass
    39
    40# We may have a SOURCE_ROOT override from the environment
    41SOURCE_ROOT = os.environ.get("SOURCE_ROOT", "")
    42
    43# Figure out if we're running in Edge Stack or what.
    44if os.path.exists("/buildroot/apro.version"):
    45    # We let /buildroot/apro.version remain a source of truth to minimize the
    46    # chances that we break anything that currently uses the builder shell.
    47    EDGE_STACK = True
    48else:
    49    # If we do not see concrete evidence of running in an apro builder shell,
    50    # then try to decide if the user wants us to assume we're running Edge Stack
    51    # from an environment variable. And if that isn't set, just assume OSS.
    52    EDGE_STACK = parse_bool(os.environ.get("EDGE_STACK", "false"))
    53
    54if EDGE_STACK:
    55    # Hey look, we're running inside Edge Stack!
    56    print("RUNNING IN EDGE STACK")
    57    # SOURCE_ROOT is optional, and we assume that if it isn't set, the user is
    58    # running in a build shell and we should look for sources in the usual location.
    59    if not SOURCE_ROOT:
    60        SOURCE_ROOT = "/buildroot/apro"
    61else:
    62    # We're either not running in Edge Stack or we're not sure, so just assume OSS.
    63    print("RUNNING IN OSS")
    64    # SOURCE_ROOT is optional, and we assume that if it isn't set, the user is
    65    # running in a build shell and we should look for sources in the usual location.
    66    if not SOURCE_ROOT:
    67        SOURCE_ROOT = "/buildroot/ambassador"
    68
    69
    70def run(cmd):
    71    status = os.system(cmd)
    72    if status != 0:
    73        raise RuntimeError("command failed[%s]: %s" % (status, cmd))
    74
    75
    76def kube_version_json():
    77    result = subprocess.Popen(
    78        "tools/bin/kubectl version -o json", stdout=subprocess.PIPE, shell=True
    79    )
    80    stdout, _ = result.communicate()
    81    return json.loads(stdout)
    82
    83
    84def strip_version(ver: str):
    85    """
    86    strip_version is needed to strip a major/minor version of non-standard symbols. For example, when working with GKE,
    87    `kubectl version` returns a minor version like '14+', which is not semver or any standard version, for that matter.
    88    So we handle exceptions like that here.
    89    :param ver: version string
    90    :return: stripped version
    91    """
    92
    93    try:
    94        return int(ver)
    95    except ValueError as e:
    96        # GKE returns weird versions with '+' in the end
    97        if ver[-1] == "+":
    98            return int(ver[:-1])
    99
   100        # If we still have not taken care of this, raise the error
   101        raise ValueError(e)
   102
   103
   104def kube_server_version(version_json=None):
   105    if not version_json:
   106        version_json = kube_version_json()
   107
   108    server_json = version_json.get("serverVersion", {})
   109
   110    if server_json:
   111        server_major = strip_version(server_json.get("major", None))
   112        server_minor = strip_version(server_json.get("minor", None))
   113
   114        return f"{server_major}.{server_minor}"
   115    else:
   116        return None
   117
   118
   119def kube_client_version(version_json=None):
   120    if not version_json:
   121        version_json = kube_version_json()
   122
   123    client_json = version_json.get("clientVersion", {})
   124
   125    if client_json:
   126        client_major = strip_version(client_json.get("major", None))
   127        client_minor = strip_version(client_json.get("minor", None))
   128
   129        return f"{client_major}.{client_minor}"
   130    else:
   131        return None
   132
   133
   134def is_kube_server_client_compatible(
   135    debug_desc: str, requested_server_version: str, requested_client_version: str
   136) -> bool:
   137    is_cluster_compatible = True
   138    kube_json = kube_version_json()
   139
   140    server_version = kube_server_version(kube_json)
   141    client_version = kube_client_version(kube_json)
   142
   143    if server_version:
   144        if version.parse(server_version) < version.parse(requested_server_version):
   145            print(f"server version {server_version} is incompatible with {debug_desc}")
   146            is_cluster_compatible = False
   147        else:
   148            print(f"server version {server_version} is compatible with {debug_desc}")
   149    else:
   150        print("could not determine Kubernetes server version?")
   151
   152    if client_version:
   153        if version.parse(client_version) < version.parse(requested_client_version):
   154            print(f"client version {client_version} is incompatible with {debug_desc}")
   155            is_cluster_compatible = False
   156        else:
   157            print(f"client version {client_version} is compatible with {debug_desc}")
   158    else:
   159        print("could not determine Kubernetes client version?")
   160
   161    return is_cluster_compatible
   162
   163
   164def is_ingress_class_compatible() -> bool:
   165    return is_kube_server_client_compatible("IngressClass", "1.18", "1.14")
   166
   167
   168def is_knative_compatible() -> bool:
   169    return is_kube_server_client_compatible("Knative", "1.14", "1.14")
   170
   171
   172def get_digest(data: str) -> str:
   173    s = sha256()
   174    s.update(data.encode("utf-8"))
   175    return s.hexdigest()
   176
   177
   178def has_changed(data: str, path: str) -> Tuple[bool, str]:
   179    cur_size = len(data.strip()) if data else 0
   180    cur_hash = get_digest(data)
   181
   182    # print(f'has_changed: data size {cur_size} - {cur_hash}')
   183
   184    prev_data = None
   185    changed = True
   186    reason = f"no {path} present"
   187
   188    if os.path.exists(path):
   189        with open(path) as f:
   190            prev_data = f.read()
   191
   192    prev_size = len(prev_data.strip()) if prev_data else 0
   193    prev_hash = None
   194
   195    if prev_data:
   196        prev_hash = get_digest(prev_data)
   197
   198    # print(f'has_changed: prev_data size {prev_size} - {prev_hash}')
   199
   200    if data:
   201        if data != prev_data:
   202            reason = f"different data in {path}"
   203        else:
   204            changed = False
   205            reason = f"same data in {path}"
   206
   207        if changed:
   208            # print(f'has_changed: updating {path}')
   209            with open(path, "w") as f:
   210                f.write(data)
   211
   212    # For now, we always have to reapply with split testing.
   213    if not changed:
   214        changed = True
   215        reason = "always reapply for split test"
   216
   217    return (changed, reason)
   218
   219
   220COUNTERS: Dict[Type, int] = {}
   221
   222SANITIZATIONS = OrderedDict(
   223    (
   224        ("://", "SCHEME"),
   225        (":", "COLON"),
   226        (" ", "SPACE"),
   227        ("/t", "TAB"),
   228        (".", "DOT"),
   229        ("?", "QMARK"),
   230        ("/", "SLASH"),
   231    )
   232)
   233
   234
   235def sanitize(obj):
   236    if isinstance(obj, str):
   237        for k, v in SANITIZATIONS.items():
   238            if obj.startswith(k):
   239                obj = obj.replace(k, v + "-")
   240            elif obj.endswith(k):
   241                obj = obj.replace(k, "-" + v)
   242            else:
   243                obj = obj.replace(k, "-" + v + "-")
   244        return obj
   245    elif isinstance(obj, dict):
   246        if "value" in obj:
   247            return obj["value"]
   248        else:
   249            return "-".join("%s-%s" % (sanitize(k), sanitize(v)) for k, v in sorted(obj.items()))
   250    else:
   251        cls = obj.__class__
   252        count = COUNTERS.get(cls, 0)
   253        COUNTERS[cls] = count + 1
   254        if count == 0:
   255            return cls.__name__
   256        else:
   257            return "%s-%s" % (cls.__name__, count)
   258
   259
   260def abstract_test(cls: type):
   261    cls.abstract_test = True  # type: ignore
   262    return cls
   263
   264
   265def get_nodes(node_type: type):
   266    if not inspect.isabstract(node_type) and not node_type.__dict__.get("abstract_test", False):
   267        yield node_type
   268    for sc in node_type.__subclasses__():
   269        if not sc.__dict__.get("skip_variant", False):
   270            for ssc in get_nodes(sc):
   271                yield ssc
   272
   273
   274def variants(cls, *args, **kwargs) -> Tuple[Any]:
   275    return tuple(a for n in get_nodes(cls) for a in n.variants(*args, **kwargs))  # type: ignore
   276
   277
   278class Name(NamedTuple):
   279    name: str
   280    namespace: str
   281
   282    @property
   283    def k8s(self) -> str:
   284        return self.name.replace(".", "-").lower()
   285
   286    @property
   287    def fqdn(self) -> str:
   288        return ".".join([self.k8s, self.namespace, "svc", "cluster", "local"])
   289
   290
class NodeLocal(threading.local):
    """
    Thread-local holder for the Node whose ``init()`` hook is currently
    running on this thread (None when there is none).

    Node.__init__ reads ``current`` to find the parent of a new node, and
    sets/restores it around the call to the node's ``init()`` hook.
    """

    current: Optional["Node"]

    def __init__(self):
        # Start out with no node being initialised.
        self.current = None


# The single module-level instance consulted by Node.__init__.
_local = NodeLocal()
   299
   300
   301def _argprocess(o):
   302    if isinstance(o, Node):
   303        return o.clone()
   304    elif isinstance(o, tuple):
   305        return tuple(_argprocess(i) for i in o)
   306    elif isinstance(o, list):
   307        return [_argprocess(i) for i in o]
   308    elif isinstance(o, dict):
   309        return {_argprocess(k): _argprocess(v) for k, v in o.items()}
   310    else:
   311        return o
   312
   313
   314class Node(ABC):
   315
   316    parent: Optional["Node"]
   317    children: List["Node"]
   318    name: str
   319    ambassador_id: str
   320    namespace: str
   321    is_ambassador = False
   322    xfail: Optional[str]
   323
   324    def __init__(
   325        self,
   326        *args,
   327        name: Optional[str] = None,
   328        namespace: Optional[str] = None,
   329        _clone: Optional["Node"] = None,
   330        **kwargs,
   331    ) -> None:
   332        # If self.skip is set to true, this node is skipped
   333        self.skip_node = False
   334        self.xfail: Optional[str] = None
   335
   336        if _clone:
   337            args = _clone._args  # type: ignore
   338            kwargs = _clone._kwargs  # type: ignore
   339            if name:
   340                name = "-".join((_clone.name, name))
   341            else:
   342                name = _clone.name
   343            self._args = _clone._args  # type: ignore
   344            self._kwargs = _clone._kwargs  # type: ignore
   345        else:
   346            self._args = args
   347            self._kwargs = kwargs
   348            if name:
   349                name = "-".join((self.__class__.__name__, name))
   350            else:
   351                name = self.__class__.__name__
   352
   353        saved = _local.current
   354        self.parent = _local.current
   355
   356        if namespace:
   357            self.namespace = namespace
   358        if not getattr(self, "namespace", ""):
   359            # We do the above `getattr` instead of just an `else` because subclasses might set a
   360            # default namespace; so `self.namespace` might already be set before calling __init__().
   361            if self.parent and self.parent.namespace:
   362                # We have no namespace assigned, but our parent does have a namespace
   363                # defined. Copy the namespace down from our parent.
   364                self.namespace = self.parent.namespace
   365            else:
   366                self.namespace = "default"
   367
   368        _local.current = self
   369        self.children = []
   370        if self.parent is not None:
   371            self.parent.children.append(self)
   372        try:
   373            init = getattr(self, "init", lambda *a, **kw: None)
   374            init(*_argprocess(args), **_argprocess(kwargs))
   375        finally:
   376            _local.current = saved
   377
   378        # This has to come after the above to init(), because the format-string might reference
   379        # things that get set by init().
   380        self.name = self.format(name)
   381
   382        names = {}  # type: ignore
   383        for c in self.children:
   384            assert (
   385                c.name not in names
   386            ), "test %s of type %s has duplicate children: %s of type %s, %s" % (
   387                self.name,
   388                self.__class__.__name__,
   389                c.name,
   390                c.__class__.__name__,
   391                names[c.name].__class__.__name__,
   392            )
   393            names[c.name] = c
   394
   395    def clone(self, name=None):
   396        return self.__class__(_clone=self, name=name)
   397
   398    @classmethod
   399    def variants(cls):
   400        yield cls()
   401
   402    @property
   403    def path(self) -> Name:
   404        if self.parent is None:
   405            return Name(self.name, self.namespace)
   406        else:
   407            return Name(self.parent.path.name + "." + self.name, self.namespace)
   408
   409    @property
   410    def traversal(self):
   411        yield self
   412        for c in self.children:
   413            for d in c.traversal:
   414                yield d
   415
   416    @property
   417    def ancestors(self):
   418        yield self
   419        if self.parent is not None:
   420            for a in self.parent.ancestors:
   421                yield a
   422
   423    @property
   424    def depth(self):
   425        if self.parent is None:
   426            return 0
   427        else:
   428            return self.parent.depth + 1
   429
   430    def format(self, st, **kwargs):
   431        return integration_manifests.format(st, self=self, **kwargs)
   432
   433    @functools.lru_cache()
   434    def matches(self, pattern):
   435        if fnmatch.fnmatch(self.path.name, "*%s*" % pattern):
   436            return True
   437        for c in self.children:
   438            if c.matches(pattern):
   439                return True
   440        return False
   441
   442    def requirements(self):
   443        yield from ()
   444
   445    # log_kube_artifacts writes various logs about our underlying Kubernetes objects to
   446    # a place where the artifact publisher can find them. See run-tests.sh.
   447    def log_kube_artifacts(self):
   448        if not getattr(self, "already_logged", False):
   449            self.already_logged = True
   450
   451            print(f"logging kube artifacts for {self.path.k8s}")
   452            sys.stdout.flush()
   453
   454            DEV = os.environ.get("AMBASSADOR_DEV", "0").lower() in ("1", "yes", "true")
   455
   456            log_path = f"/tmp/kat-logs-{self.path.k8s}"
   457
   458            if DEV:
   459                os.system(f"docker logs {self.path.k8s} >{log_path} 2>&1")
   460            else:
   461                os.system(
   462                    f"tools/bin/kubectl logs -n {self.namespace} {self.path.k8s} >{log_path} 2>&1"
   463                )
   464
   465                event_path = f"/tmp/kat-events-{self.path.k8s}"
   466
   467                fs1 = f"involvedObject.name={self.path.k8s}"
   468                fs2 = f"involvedObject.namespace={self.namespace}"
   469
   470                cmd = f'tools/bin/kubectl get events -o json --field-selector "{fs1}" --field-selector "{fs2}"'
   471                os.system(f'echo ==== "{cmd}" >{event_path}')
   472                os.system(f"{cmd} >>{event_path} 2>&1")
   473
   474
   475class Test(Node):
   476
   477    results: List["Result"] = []
   478    pending: List["Query"] = []
   479    queried: List["Query"] = []
   480
   481    __test__ = False
   482
   483    def config(self):
   484        yield from ()
   485
   486    def manifests(self):
   487        return None
   488
   489    def queries(self):
   490        yield from ()
   491
   492    def check(self):
   493        pass
   494
   495    @property
   496    def ambassador_id(self):
   497        if self.parent is None:
   498            return self.path.k8s
   499        else:
   500            return self.parent.ambassador_id
   501
   502
   503@singledispatch
   504def encode_body(obj):
   505    return encode_body(json.dumps(obj))
   506
   507
   508@encode_body.register
   509def encode_body_bytes(b: bytes):
   510    return base64.encodebytes(b).decode("utf-8")
   511
   512
   513@encode_body.register
   514def encode_body_str(s: str):
   515    return encode_body(s.encode("utf-8"))
   516
   517
   518class Query:
   519    def __init__(
   520        self,
   521        url,
   522        expected=None,
   523        method="GET",
   524        headers=None,
   525        messages=None,
   526        insecure=False,
   527        skip=None,
   528        xfail=None,
   529        phase=1,
   530        debug=False,
   531        sni=False,
   532        error=None,
   533        client_crt=None,
   534        client_key=None,
   535        client_cert_required=False,
   536        ca_cert=None,
   537        grpc_type=None,
   538        cookies=None,
   539        ignore_result=False,
   540        body=None,
   541        minTLSv="",
   542        maxTLSv="",
   543        cipherSuites=[],
   544        ecdhCurves=[],
   545    ):
   546        self.method = method
   547        self.url = url
   548        self.headers = headers
   549        self.body = body
   550        self.cookies = cookies
   551        self.messages = messages
   552        self.insecure = insecure
   553        self.minTLSv = minTLSv
   554        self.maxTLSv = maxTLSv
   555        self.cipherSuites = cipherSuites
   556        self.ecdhCurves = ecdhCurves
   557        if expected is None:
   558            if url.lower().startswith("ws:"):
   559                self.expected = 101
   560            else:
   561                self.expected = 200
   562        else:
   563            self.expected = expected
   564        self.skip = skip
   565        self.xfail = xfail
   566        self.ignore_result = ignore_result
   567        self.phase = phase
   568        self.parent = None
   569        self.result = None
   570        self.debug = debug
   571        self.sni = sni
   572        self.error = error
   573        self.client_cert_required = client_cert_required
   574        self.client_cert = client_crt
   575        self.client_key = client_key
   576        self.ca_cert = ca_cert
   577        assert grpc_type in (None, "real", "bridge", "web"), grpc_type
   578        self.grpc_type = grpc_type
   579
   580    def as_json(self):
   581        assert self.parent
   582        result = {
   583            "test": self.parent.path.name,
   584            "id": id(self),
   585            "url": self.url,
   586            "insecure": self.insecure,
   587        }
   588        if self.sni:
   589            result["sni"] = self.sni
   590        if self.method:
   591            result["method"] = self.method
   592        if self.method:
   593            result["maxTLSv"] = self.maxTLSv
   594        if self.method:
   595            result["minTLSv"] = self.minTLSv
   596        if self.cipherSuites:
   597            result["cipherSuites"] = self.cipherSuites
   598        if self.ecdhCurves:
   599            result["ecdhCurves"] = self.ecdhCurves
   600        if self.headers:
   601            result["headers"] = self.headers
   602        if self.body is not None:
   603            result["body"] = encode_body(self.body)
   604        if self.cookies:
   605            result["cookies"] = self.cookies
   606        if self.messages is not None:
   607            result["messages"] = self.messages
   608        if self.client_cert is not None:
   609            result["client_cert"] = self.client_cert
   610        if self.client_key is not None:
   611            result["client_key"] = self.client_key
   612        if self.ca_cert is not None:
   613            result["ca_cert"] = self.ca_cert
   614        if self.client_cert_required:
   615            result["client_cert_required"] = self.client_cert_required
   616        if self.grpc_type:
   617            result["grpc_type"] = self.grpc_type
   618
   619        return result
   620
   621
   622class Result:
   623    body: Optional[bytes]
   624
   625    def __init__(self, query, res):
   626        self.query = query
   627        query.result = self
   628        self.parent = query.parent
   629        self.status = res.get("status")
   630        self.headers = res.get("headers")
   631        self.messages = res.get("messages")
   632        self.tls = res.get("tls")
   633        if "body" in res:
   634            self.body = base64.decodebytes(bytes(res["body"], "ASCII"))
   635        else:
   636            self.body = None
   637        self.text = res.get("text")
   638        self.json = res.get("json")
   639        self.backend = BackendResult(self.json) if self.json else None
   640        self.error = res.get("error")
   641
   642    def __repr__(self):
   643        return str(self.as_dict())
   644
   645    def check(self):
   646        if self.query.skip:
   647            pytest.skip(self.query.skip)
   648
   649        if self.query.xfail:
   650            pytest.xfail(self.query.xfail)
   651
   652        if not self.query.ignore_result:
   653            if self.query.error is not None:
   654                found = False
   655                errors = self.query.error
   656
   657                if isinstance(self.query.error, str):
   658                    errors = [self.query.error]
   659
   660                if self.error is not None:
   661                    for error in errors:
   662                        if error in self.error:
   663                            found = True
   664                            break
   665
   666                assert found, "{}: expected error to contain any of {}; got {} instead".format(
   667                    self.query.url,
   668                    ", ".join(["'%s'" % x for x in errors]),
   669                    ("'%s'" % self.error) if self.error else "no error",
   670                )
   671            else:
   672                if isinstance(self.query.expected, list):
   673                    if self.status not in self.query.expected:
   674                        self.parent.log_kube_artifacts()
   675                    assert (
   676                        self.status in self.query.expected
   677                    ), "%s: expected status code %s, got %s instead with error %s" % (
   678                        self.query.url,
   679                        self.query.expected,
   680                        self.status,
   681                        self.error,
   682                    )
   683                else:
   684
   685                    if self.query.expected != self.status:
   686                        self.parent.log_kube_artifacts()
   687                    assert (
   688                        self.query.expected == self.status
   689                    ), "%s: expected status code %s, got %s instead with error %s" % (
   690                        self.query.url,
   691                        self.query.expected,
   692                        self.status,
   693                        self.error,
   694                    )
   695
   696    def as_dict(self) -> Dict[str, Any]:
   697        od = {
   698            "query": self.query.as_json(),
   699            "status": self.status,
   700            "error": self.error,
   701            "headers": self.headers,
   702        }
   703
   704        if self.backend and self.backend.name:
   705            od["backend"] = self.backend.as_dict()
   706        else:
   707            od["json"] = self.json
   708            od["text"] = self.text
   709
   710        return od
   711
   712        # 'RENDERED': {
   713        #     'client': {
   714        #         'request': self.query.as_json(),
   715        #         'response': {
   716        #             'status': self.status,
   717        #             'error': self.error,
   718        #             'headers': self.headers
   719        #         }
   720        #     },
   721        #     'upstream': {
   722        #         'name': self.backend.name,
   723        #         'request': {
   724        #             'headers': self.backend.request.headers,
   725        #             'url': {
   726        #                 'fragment': self.backend.request.url.fragment,
   727        #                 'host': self.backend.request.url.host,
   728        #                 'opaque': self.backend.request.url.opaque,
   729        #                 'path': self.backend.request.url.path,
   730        #                 'query': self.backend.request.url.query,
   731        #                 'rawQuery': self.backend.request.url.rawQuery,
   732        #                 'scheme': self.backend.request.url.scheme,
   733        #                 'username': self.backend.request.url.username,
   734        #                 'password': self.backend.request.url.password,
   735        #             },
   736        #             'host': self.backend.request.host,
   737        #             'tls': {
   738        #                 'enabled': self.backend.request.tls.enabled,
   739        #                 'server_name': self.backend.request.tls.server_name,
   740        #                 'version': self.backend.request.tls.version,
   741        #                 'negotiated_protocol': self.backend.request.tls.negotiated_protocol,
   742        #             },
   743        #         },
   744        #         'response': {
   745        #             'headers': self.backend.response.headers
   746        #         }
   747        #     }
   748        # }
   749
   750
   751class BackendURL:
   752    def __init__(
   753        self,
   754        fragment=None,
   755        host=None,
   756        opaque=None,
   757        path=None,
   758        query=None,
   759        rawQuery=None,
   760        scheme=None,
   761        username=None,
   762        password=None,
   763    ):
   764        self.fragment = fragment
   765        self.host = host
   766        self.opaque = opaque
   767        self.path = path
   768        self.query = query
   769        self.rawQuery = rawQuery
   770        self.scheme = scheme
   771        self.username = username
   772        self.password = password
   773
   774    def as_dict(self) -> Dict["str", Any]:
   775        return {
   776            "fragment": self.fragment,
   777            "host": self.host,
   778            "opaque": self.opaque,
   779            "path": self.path,
   780            "query": self.query,
   781            "rawQuery": self.rawQuery,
   782            "scheme": self.scheme,
   783            "username": self.username,
   784            "password": self.password,
   785        }
   786
   787
   788class BackendRequest:
   789    def __init__(self, req):
   790        self.url = BackendURL(**req.get("url"))
   791        self.headers = req.get("headers", {})
   792        self.host = req.get("host", None)
   793        self.tls = BackendTLS(req.get("tls", {}))
   794
   795    def as_dict(self) -> Dict[str, Any]:
   796        od = {
   797            "headers": self.headers,
   798            "host": self.host,
   799        }
   800
   801        if self.url:
   802            od["url"] = self.url.as_dict()
   803
   804        if self.tls:
   805            od["tls"] = self.tls.as_dict()
   806
   807        return od
   808
   809
   810class BackendTLS:
   811    def __init__(self, tls):
   812        self.enabled = tls["enabled"]
   813        self.server_name = tls.get("server-name")
   814        self.version = tls.get("version")
   815        self.negotiated_protocol = tls.get("negotiated-protocol")
   816        self.negotiated_protocol_version = tls.get("negotiated-protocol-version")
   817
   818    def as_dict(self) -> Dict[str, Any]:
   819        return {
   820            "enabled": self.enabled,
   821            "server_name": self.server_name,
   822            "version": self.version,
   823            "negotiated_protocol": self.negotiated_protocol,
   824            "negotiated_protocol_version": self.negotiated_protocol_version,
   825        }
   826
   827
   828class BackendResponse:
   829    def __init__(self, resp):
   830        self.headers = resp.get("headers", {})
   831
   832    def as_dict(self) -> Dict[str, Any]:
   833        return {"headers": self.headers}
   834
   835
   836def dictify(obj):
   837    if getattr(obj, "as_dict", None):
   838        return obj.as_dict()
   839    else:
   840        return obj
   841
   842
   843class BackendResult:
   844    def __init__(self, bres):
   845        self.name = "raw"
   846        self.request = None
   847        self.response = bres
   848
   849        if isinstance(bres, dict):
   850            self.name = cast(str, bres.get("backend"))
   851            self.request = BackendRequest(bres["request"]) if "request" in bres else None
   852            self.response = BackendResponse(bres["response"]) if "response" in bres else None
   853
   854    def as_dict(self) -> Dict[str, Any]:
   855        od = {"name": self.name}
   856
   857        if self.request:
   858            od["request"] = dictify(self.request)
   859
   860        if self.response:
   861            od["response"] = dictify(self.response)
   862
   863        return od
   864
   865
   866def label(yaml, scope):
   867    for obj in yaml:
   868        md = obj["metadata"]
   869
   870        if "labels" not in md:
   871            md["labels"] = {}
   872
   873        obj["metadata"]["labels"]["scope"] = scope
   874    return yaml
   875
   876
# Name of the KAT client executable.
# NOTE(review): no live code in the reviewed portion of this file reads this
# constant -- check for other users before removing it.
CLIENT_GO = "kat_client"
   878
   879
   880def run_queries(name: str, queries: Sequence[Query]) -> Sequence[Result]:
   881    jsonified = []
   882    byid = {}
   883
   884    for q in queries:
   885        jsonified.append(q.as_json())
   886        byid[id(q)] = q
   887
   888    path_urls = f"/tmp/kat-client-{name}-urls.json"
   889    path_results = f"/tmp/kat-client-{name}-results.json"
   890    path_log = f"/tmp/kat-client-{name}.log"
   891
   892    with open(path_urls, "w") as f:
   893        json.dump(jsonified, f)
   894
   895    # run(f"{CLIENT_GO} -input {path_urls} -output {path_results} 2> {path_log}")
   896    res = ShellCommand.run(
   897        "Running queries",
   898        f"tools/bin/kubectl exec -n default -i kat -- /work/kat_client < '{path_urls}' > '{path_results}' 2> '{path_log}'",
   899        shell=True,
   900    )
   901
   902    if not res:
   903        ret = [Result(q, {"error": "Command execution error"}) for q in queries]
   904        return ret
   905
   906    with open(path_results, "r") as f:
   907        content = f.read()
   908        try:
   909            json_results = json.loads(content)
   910        except Exception as e:
   911            ret = [
   912                Result(q, {"error": "Could not parse JSON content after running KAT queries"})
   913                for q in queries
   914            ]
   915            return ret
   916
   917    results = []
   918
   919    for r in json_results:
   920        res = r["result"]
   921        q = byid[r["id"]]
   922        results.append(Result(q, res))
   923
   924    return results
   925
   926
# Module-level switch read by Runner.setup: when it is false, setup prints a
# blank line before doing anything else. (Original author's note: "yuck".)
DOCTEST = False
   929
   930
   931class Superpod:
   932    def __init__(self, namespace: str) -> None:
   933        self.namespace = namespace
   934        self.next_clear = 8080
   935        self.next_tls = 8443
   936        self.service_names: Dict[int, str] = {}
   937        self.name = "superpod-%s" % (self.namespace or "default")
   938
   939    def allocate(self, service_name) -> List[int]:
   940        ports = [self.next_clear, self.next_tls]
   941        self.service_names[self.next_clear] = service_name
   942        self.service_names[self.next_tls] = service_name
   943
   944        self.next_clear += 1
   945        self.next_tls += 1
   946
   947        return ports
   948
   949    def get_manifest_list(self) -> List[Dict[str, Any]]:
   950        manifest = load(
   951            "superpod",
   952            integration_manifests.format(integration_manifests.load("superpod_pod")),
   953            Tag.MAPPING,
   954        )
   955
   956        assert len(manifest) == 1, "SUPERPOD manifest must have exactly one object"
   957
   958        m = manifest[0]
   959
   960        template = m["spec"]["template"]
   961
   962        ports: List[Dict[str, int]] = []
   963        envs: List[Dict[str, Union[str, int]]] = template["spec"]["containers"][0]["env"]
   964
   965        for p in sorted(self.service_names.keys()):
   966            ports.append({"containerPort": p})
   967            envs.append({"name": f"BACKEND_{p}", "value": self.service_names[p]})
   968
   969        template["spec"]["containers"][0]["ports"] = ports
   970
   971        if "metadata" not in m:
   972            m["metadata"] = {}
   973
   974        metadata = m["metadata"]
   975        metadata["name"] = self.name
   976
   977        m["spec"]["selector"]["matchLabels"]["backend"] = self.name
   978        template["metadata"]["labels"]["backend"] = self.name
   979
   980        if self.namespace:
   981            # Fix up the namespace.
   982            if "namespace" not in metadata:
   983                metadata["namespace"] = self.namespace
   984
   985        return list(manifest)
   986
   987
   988class Runner:
   989    def __init__(self, *classes, scope=None):
   990        self.scope = scope or "-".join(c.__name__ for c in classes)
   991        self.roots = tuple(v for c in classes for v in variants(c))
   992        self.nodes = [n for r in self.roots for n in r.traversal if not n.skip_node]
   993        self.tests = [n for n in self.nodes if isinstance(n, Test)]
   994        self.ids = [t.path.name for t in self.tests]
   995        self.done = False
   996        self.ids_to_strip: Dict[str, bool] = {}
   997        self.names_to_ignore: Dict[str, bool] = {}
   998
   999        @pytest.mark.parametrize("t", self.tests, ids=self.ids)
  1000        def test(request, capsys, t):
  1001            if t.xfail:
  1002                pytest.xfail(t.xfail)
  1003            else:
  1004                selected = set(
  1005                    item.callspec.getparam("t")
  1006                    for item in request.session.items
  1007                    if item.function == test
  1008                )
  1009
  1010                with capsys.disabled():
  1011                    self.setup(selected)
  1012
  1013                # XXX: should aggregate the result of url checks
  1014                i = 0
  1015                for r in t.results:
  1016                    try:
  1017                        r.check()
  1018                    except AssertionError as e:
  1019                        # Add some context so that you can tell which query is failing.
  1020                        e.args = (f"query[{i}]: {e.args[0]}", *e.args[1:])
  1021                        raise
  1022                    i += 1
  1023
  1024                t.check()
  1025
  1026        self.__func__ = test
  1027        self.__test__ = True
  1028
  1029    def __call__(self):
  1030        assert False, "this is here for py.test discovery purposes only"
  1031
  1032    def setup(self, selected):
  1033        if not self.done:
  1034            if not DOCTEST:
  1035                print()
  1036
  1037            expanded_up = set(selected)
  1038
  1039            for s in selected:
  1040                for n in s.ancestors:
  1041                    if not n.xfail:
  1042                        expanded_up.add(n)
  1043
  1044            expanded = set(expanded_up)
  1045
  1046            for s in selected:
  1047                for n in s.traversal:
  1048                    if not n.xfail:
  1049                        expanded.add(n)
  1050
  1051            try:
  1052                self._setup_k8s(expanded)
  1053                self._query(expanded_up)
  1054            except:
  1055                traceback.print_exc()
  1056                pytest.exit("setup failed")
  1057            finally:
  1058                self.done = True
  1059
  1060    def get_manifests_and_namespaces(self, selected) -> Tuple[Any, List[str]]:
  1061        manifests: OrderedDict[Any, list] = OrderedDict()  # type: ignore
  1062        superpods: Dict[str, Superpod] = {}
  1063        namespaces = []
  1064        for n in (n for n in self.nodes if n in selected and not n.xfail):
  1065            manifest = None
  1066            nsp = None
  1067            ambassador_id = None
  1068
  1069            # print('manifesting for {n.path}')
  1070
  1071            # Walk up the parent chain to find our namespace and ambassador_id.
  1072            cur = n
  1073
  1074            while cur:
  1075                if not nsp:
  1076                    nsp = getattr(cur, "namespace", None)
  1077                    # print(f'... {cur.name} has namespace {nsp}')
  1078
  1079                if not ambassador_id:
  1080                    ambassador_id = getattr(cur, "ambassador_id", None)
  1081                    # print(f'... {cur.name} has ambassador_id {ambassador_id}')
  1082
  1083                if nsp and ambassador_id:
  1084                    # print(f'... good for namespace and ambassador_id')
  1085                    break
  1086
  1087                cur = cur.parent
  1088
  1089            # OK. Does this node want to use a superpod?
  1090            if getattr(n, "use_superpod", False):
  1091                # Yup. OK. Do we already have a superpod for this namespace?
  1092                superpod = superpods.get(nsp, None)  # type: ignore
  1093
  1094                if not superpod:
  1095                    # We don't have one, so we need to create one.
  1096                    superpod = Superpod(nsp)  # type: ignore
  1097                    superpods[nsp] = superpod  # type: ignore
  1098
  1099                # print(f'superpodifying {n.name}')
  1100
  1101                # Next up: use the backend_service.yaml manifest as a template...
  1102                yaml = n.format(integration_manifests.load("backend_service"))
  1103                manifest = load(n.path, yaml, Tag.MAPPING)
  1104
  1105                assert (
  1106                    len(manifest) == 1
  1107                ), "backend_service.yaml manifest must have exactly one object"
  1108
  1109                m = manifest[0]
  1110
  1111                # Update the manifest's selector...
  1112                m["spec"]["selector"]["backend"] = superpod.name
  1113
  1114                # ...and labels if needed...
  1115                if ambassador_id:
  1116                    m["metadata"]["labels"] = {"kat-ambassador-id": ambassador_id}
  1117
  1118                # ...and target ports.
  1119                superpod_ports = superpod.allocate(n.path.k8s)
  1120
  1121                m["spec"]["ports"][0]["targetPort"] = superpod_ports[0]
  1122                m["spec"]["ports"][1]["targetPort"] = superpod_ports[1]
  1123            else:
  1124                # The non-superpod case...
  1125                yaml = n.manifests()
  1126
  1127                if yaml is not None:
  1128                    is_plain_test = n.path.k8s.startswith("plain-")
  1129
  1130                    if n.is_ambassador and not is_plain_test:
  1131                        add_default_http_listener = getattr(n, "add_default_http_listener", True)
  1132                        add_default_https_listener = getattr(n, "add_default_https_listener", True)
  1133                        add_cleartext_host = getattr(n, "edge_stack_cleartext_host", False)
  1134
  1135                        if add_default_http_listener:
  1136                            # print(f"{n.path.k8s} adding default HTTP Listener")
  1137                            yaml += default_listener_manifest % {
  1138                                "namespace": nsp,
  1139                                "port": 8080,
  1140                                "protocol": "HTTPS",
  1141                                "securityModel": "XFP",
  1142                            }
  1143
  1144                        if add_default_https_listener:
  1145                            # print(f"{n.path.k8s} adding default HTTPS Listener")
  1146                            yaml += default_listener_manifest % {
  1147                                "namespace": nsp,
  1148                                "port": 8443,
  1149                                "protocol": "HTTPS",
  1150                                "securityModel": "XFP",
  1151                            }
  1152
  1153                        if EDGE_STACK and add_cleartext_host:
  1154                            # print(f"{n.path.k8s} adding Host")
  1155
  1156                            host_yaml = cleartext_host_manifest % nsp
  1157                            yaml += host_yaml
  1158
  1159                    yaml = n.format(yaml)
  1160
  1161                    try:
  1162                        manifest = load(n.path, yaml, Tag.MAPPING)
  1163                    except Exception as e:
  1164                        print(f"parse failure! {e}")
  1165                        print(yaml)
  1166
  1167            if manifest:
  1168                # print(manifest)
  1169
  1170                # Make sure namespaces and labels are properly set.
  1171                for m in manifest:
  1172                    if "metadata" not in m:
  1173                        m["metadata"] = {}
  1174
  1175                    metadata = m["metadata"]
  1176
  1177                    if "labels" not in metadata:
  1178                        metadata["labels"] = {}
  1179
  1180                    if ambassador_id:
  1181                        metadata["labels"]["kat-ambassador-id"] = ambassador_id
  1182
  1183                    if nsp:
  1184                        if "namespace" not in metadata:
  1185                            metadata["namespace"] = nsp
  1186
  1187                # ...and, finally, save the manifest list.
  1188                manifests[n] = list(manifest)
  1189                if str(nsp) not in namespaces:
  1190                    namespaces.append(str(nsp))
  1191
  1192        for superpod in superpods.values():
  1193            manifests[superpod] = superpod.get_manifest_list()
  1194
  1195        return manifests, namespaces
  1196
  1197    def _setup_k8s(self, selected):
  1198        # First up, get the full manifest and save it to disk.
  1199        manifests, namespaces = self.get_manifests_and_namespaces(selected)
  1200
  1201        configs: Dict[Node, List[Tuple[str, SequenceView]]] = OrderedDict()
  1202        for n in (n for n in self.nodes if n in selected and not n.xfail):
  1203            configs[n] = []
  1204            for cfg in n.config():
  1205                if isinstance(cfg, str):
  1206                    parent_config = configs[n.parent][0][1][0]
  1207
  1208                    try:
  1209                        for o in load(n.path, cfg, Tag.MAPPING):
  1210                            parent_config.merge(o)
  1211                    except (YAMLScanError, YAMLParseError) as e:
  1212                        raise Exception("Parse Error: %s, input text:\n%s" % (e, cfg))
  1213                else:
  1214                    target = cfg[0]
  1215
  1216                    try:
  1217                        yaml_view = load(n.path, cfg[1], Tag.MAPPING)
  1218
  1219                        if n.ambassador_id:
  1220                            for obj in yaml_view:
  1221                                if "ambassador_id" not in obj:
  1222                                    obj["ambassador_id"] = [n.ambassador_id]
  1223
  1224                        configs[n].append((target, yaml_view))
  1225                    except (YAMLScanError, YAMLParseError) as e:
  1226                        raise Exception("Parse Error: %s, input text:\n%s" % (e, cfg[1]))
  1227
  1228        for tgt_cfgs in configs.values():
  1229            for target, cfg in tgt_cfgs:
  1230                for t in target.traversal:
  1231                    if t in manifests:
  1232                        k8s_yaml = manifests[t]
  1233                        for item in k8s_yaml:
  1234                            if item["kind"].lower() == "service":
  1235                                md = item["metadata"]
  1236                                if "annotations" not in md:
  1237                                    md["annotations"] = {}
  1238
  1239                                anns = md["annotations"]
  1240
  1241                                if "getambassador.io/config" in anns:
  1242                                    anns["getambassador.io/config"] += "\n" + dump(cfg)
  1243                                else:
  1244                                    anns["getambassador.io/config"] = dump(cfg)
  1245
  1246                                break
  1247                        else:
  1248                            continue
  1249                        break
  1250                else:
  1251                    assert False, "no service found for target: %s" % target.path.name
  1252
  1253        yaml = ""
  1254
  1255        for v in manifests.values():
  1256            yaml += dump(label(v, self.scope)) + "\n"
  1257
  1258        fname = "/tmp/k8s-%s.yaml" % self.scope
  1259
  1260        self.applied_manifests = False
  1261
  1262        # Always apply at this point, since we're doing the multi-run thing.
  1263        manifest_changed, manifest_reason = has_changed(yaml, fname)
  1264
  1265        # XXX It is _so stupid_ that we're reparsing the whole manifest here.
  1266        xxx_crap = pyyaml.load_all(open(fname, "r").read(), Loader=pyyaml_loader)
  1267
  1268        # Strip things we don't need from the manifest.
  1269        trimmed_manifests = []
  1270        trimmed = 0
  1271        kept = 0
  1272
  1273        for obj in xxx_crap:
  1274            keep = True
  1275
  1276            kind = "-nokind-"
  1277            name = "-noname-"
  1278            metadata: Dict[str, Any] = {}
  1279            labels: Dict[str, str] = {}
  1280            id_to_check: Optional[str] = None
  1281
  1282            if "kind" in obj:
  1283                kind = obj["kind"]
  1284
  1285            if "metadata" in obj:
  1286                metadata = obj["metadata"]
  1287
  1288            if "name" in metadata:
  1289                name = metadata["name"]
  1290
  1291            if "labels" in metadata:
  1292                labels = metadata["labels"]
  1293
  1294            if "kat-ambassador-id" in labels:
  1295                id_to_check = labels["kat-ambassador-id"]
  1296
  1297            # print(f"metadata {metadata} id_to_check {id_to_check} obj {obj}")
  1298
  1299            # Keep namespaces, just in case.
  1300            if kind == "Namespace":
  1301                keep = True
  1302            else:
  1303                if id_to_check and (id_to_check in self.ids_to_strip):
  1304                    keep = False
  1305                    # print(f"...drop {kind} {name} (ID {id_to_check})")
  1306                    self.names_to_ignore[name] = True
  1307
  1308            if keep:
  1309                kept += 1
  1310                trimmed_manifests.append(obj)
  1311            else:
  1312                trimmed += 1
  1313
  1314        if trimmed:
  1315            print(f"After trimming: kept {kept}, trimmed {trimmed}")
  1316
  1317        yaml = pyyaml.dump_all(trimmed_manifests, Dumper=pyyaml_dumper)
  1318
  1319        fname = "/tmp/k8s-%s-trimmed.yaml" % self.scope
  1320
  1321        self.applied_manifests = False
  1322
  1323        # Always apply at this point, since we're doing the multi-run thing.
  1324        manifest_changed, manifest_reason = has_changed(yaml, fname)
  1325
  1326        # First up: CRDs.
  1327        input_crds = integration_manifests.crd_manifests()
  1328        if is_knative_compatible():
  1329            input_crds += integration_manifests.load("knative_serving_crds")
  1330
  1331        # Strip out all of the schema validation, so that we can test with broken CRDs.
  1332        # (KAT isn't really in the business of testing to be sure that Kubernetes can
  1333        # run the K8s validators...)
  1334        crds = pyyaml.load_all(input_crds, Loader=pyyaml_loader)
  1335
  1336        # Collect the CRDs with schema validation stripped in stripped_crds, because
  1337        # pyyaml.load_all actually returns something more complex than a simple list,
  1338        # so it doesn't reserialize well after being modified.
  1339        stripped_crds = []
  1340
  1341        for crd in crds:
  1342            # Guard against empty CRDs (the KNative files have some blank lines at
  1343            # the end).
  1344            if not crd:
  1345                continue
  1346
  1347            if crd["apiVersion"] == "apiextensions.k8s.io/v1":
  1348                # We can't naively strip the schema validation from apiextensions.k8s.io/v1 CRDs
  1349                # because it is required; otherwise the API server would refuse to create the CRD,
  1350                # telling us:
  1351                #
  1352                #     CustomResourceDefinition.apiextensions.k8s.io "…" is invalid: spec.versions[0].schema.openAPIV3Schema: Required value: schemas are required
  1353                #
  1354                # So instead we must replace it with a schema that allows anything.
  1355                for version in crd["spec"]["versions"]:
  1356                    if "schema" in version:
  1357                        version["schema"] = {
  1358                            "openAPIV3Schema": {
  1359                                "type": "object",
  1360                                "properties": {
  1361                                    "apiVersion": {"type": "string"},
  1362                                    "kind": {"type": "string"},
  1363                                    "metadata": {"type": "object"},
  1364                                    "spec": {
  1365                                        "type": "object",
  1366                                        "x-kubernetes-preserve-unknown-fields": True,
  1367                                    },
  1368                                },
  1369                            },
  1370                        }
  1371            elif crd["apiVersion"] == "apiextensions.k8s.io/v1beta1":
  1372                crd["spec"].pop("validation", None)
  1373                for version in crd["spec"]["versions"]:
  1374                    version.pop("schema", None)
  1375            stripped_crds.append(crd)
  1376
  1377        final_crds = pyyaml.dump_all(stripped_crds, Dumper=pyyaml_dumper)
  1378        changed, reason = has_changed(final_crds, "/tmp/k8s-CRDs.yaml")
  1379
  1380        if changed:
  1381            print(f"CRDS changed ({reason}), applying.")
  1382            if not ShellCommand.run_with_retry(
  1383                "Apply CRDs",
  1384                "tools/bin/kubectl",
  1385                "apply",
  1386                "-f",
  1387                "/tmp/k8s-CRDs.yaml",
  1388                retries=5,
  1389                sleep_seconds=10,
  1390            ):
  1391                raise RuntimeError("Failed applying CRDs")
  1392
  1393            print("waiting for emissary-apiext server to become available")
  1394            if os.system(
  1395                "kubectl wait --timeout=90s --for=condition=available deployment emissary-apiext -n emissary-system > /dev/null 2>&1"
  1396            ):
  1397                raise RuntimeError(
  1398                    "emissary-apiext server did not become available within 90 seconds"
  1399                )
  1400            print("emissary-apiext server is available")
  1401
  1402            tries_left = 10
  1403
  1404            while (
  1405                os.system("tools/bin/kubectl get crd mappings.getambassador.io > /dev/null 2>&1")
  1406                != 0
  1407            ):
  1408                tries_left -= 1
  1409
  1410                if tries_left <= 0:
  1411                    raise RuntimeError("CRDs never became available")
  1412
  1413                print("sleeping for CRDs... (%d)" % tries_left)
  1414                time.sleep(5)
  1415        else:
  1416            print(f"CRDS unchanged {reason}, skipping apply.")
  1417
  1418        # Next up: the KAT pod.
  1419        kat_client_manifests = integration_manifests.load("kat_client_pod")
  1420        if os.environ.get("DEV_USE_IMAGEPULLSECRET", False):
  1421            kat_client_manifests = (
  1422                integration_manifests.namespace_manifest("default") + kat_client_manifests
  1423            )
  1424        changed, reason = has_changed(
  1425            integration_manifests.format(kat_client_manifests), "/tmp/k8s-kat-pod.yaml"
  1426        )
  1427
  1428        if changed:
  1429            print(f"KAT pod definition changed ({reason}), applying")
  1430            if not ShellCommand.run_with_retry(
  1431                "Apply KAT pod",
  1432                "tools/bin/kubectl",
  1433                "apply",
  1434                "-f",
  1435                "/tmp/k8s-kat-pod.yaml",
  1436                "-n",
  1437                "default",
  1438                retries=5,
  1439                sleep_seconds=10,
  1440            ):
  1441                raise RuntimeError("Could not apply manifest for KAT pod")
  1442
  1443            tries_left = 3
  1444            time.sleep(1)
  1445
  1446            while True:
  1447                if ShellCommand.run(
  1448                    "wait for KAT pod",
  1449                    "tools/bin/kubectl",
  1450                    "-n",
  1451                    "default",
  1452                    "wait",
  1453                    "--timeout=30s",
  1454                    "--for=condition=Ready",
  1455                    "pod",
  1456                    "kat",
  1457                ):
  1458                    print("KAT pod ready")
  1459                    break
  1460
  1461                tries_left -= 1
  1462
  1463                if tries_left <= 0:
  1464                    raise RuntimeError("KAT pod never became available")
  1465
  1466                print("sleeping for KAT pod... (%d)" % tries_left)
  1467                time.sleep(5)
  1468        else:
  1469            print(f"KAT pod definition unchanged {reason}, skipping apply.")
  1470
  1471        # Use a dummy pod to get around the !*@&#$!*@&# DockerHub rate limit.
  1472        # XXX Better: switch to GCR.
  1473        dummy_pod = integration_manifests.load("dummy_pod")
  1474        if os.environ.get("DEV_USE_IMAGEPULLSECRET", False):
  1475            dummy_pod = integration_manifests.namespace_manifest("default") + dummy_pod
  1476        changed, reason = has_changed(
  1477            integration_manifests.format(dummy_pod), "/tmp/k8s-dummy-pod.yaml"
  1478        )
  1479
  1480        if changed:
  1481            print(f"Dummy pod definition changed ({reason}), applying")
  1482            if not ShellCommand.run_with_retry(
  1483                "Apply dummy pod",
  1484                "tools/bin/kubectl",
  1485                "apply",
  1486                "-f",
  1487                "/tmp/k8s-dummy-pod.yaml",
  1488                "-n",
  1489                "default",
  1490                retries=5,
  1491                sleep_seconds=10,
  1492            ):
  1493                raise RuntimeError("Could not apply manifest for dummy pod")
  1494
  1495            tries_left = 3
  1496            time.sleep(1)
  1497
  1498            while True:
  1499                if ShellCommand.run(
  1500                    "wait for dummy pod",
  1501                    "tools/bin/kubectl",
  1502                    "-n",
  1503                    "default",
  1504                    "wait",
  1505                    "--timeout=30s",
  1506                    "--for=condition=Ready",
  1507                    "pod",
  1508                    "dummy-pod",
  1509                ):
  1510                    print("Dummy pod ready")
  1511                    break
  1512
  1513                tries_left -= 1
  1514
  1515                if tries_left <= 0:
  1516                    raise RuntimeError("Dummy pod never became available")
  1517
  1518                print("sleeping for dummy pod... (%d)" % tries_left)
  1519                time.sleep(5)
  1520        else:
  1521            print(f"Dummy pod definition unchanged {reason}, skipping apply.")
  1522
  1523        # # Clear out old stuff.
  1524        if os.environ.get("DEV_CLEAN_K8S_RESOURCES", False):
  1525            print("Clearing cluster...")
  1526            ShellCommand.run(
  1527                "clear old Kubernetes namespaces",
  1528                "tools/bin/kubectl",
  1529                "delete",
  1530                "namespaces",
  1531                "-l",
  1532                "scope=AmbassadorTest",
  1533                verbose=True,
  1534            )
  1535            ShellCommand.run(
  1536                "clear old Kubernetes pods etc.",
  1537                "tools/bin/kubectl",
  1538                "delete",
  1539                "all",
  1540                "-l",
  1541                "scope=AmbassadorTest",
  1542                "--all-namespaces",
  1543                verbose=True,
  1544            )
  1545
  1546        # XXX: better prune selector label
  1547        if manifest_changed:
  1548            print(f"manifest changed ({manifest_reason}), applying...")
  1549            if not ShellCommand.run_with_retry(
  1550                "Applying k8s manifests",
  1551                "tools/bin/kubectl",
  1552                "apply",
  1553                "--prune",
  1554                "-l",
  1555                "scope=%s" % self.scope,
  1556                "-f",
  1557                fname,
  1558                retries=5,
  1559                sleep_seconds=10,
  1560            ):
  1561                raise RuntimeError("Could not apply manifests")
  1562            self.applied_manifests = True
  1563
  1564        for n in self.nodes:
  1565            if n in selected and not n.xfail:
  1566                action = getattr(n, "post_manifest", None)
  1567                if action:
  1568                    action()
  1569
  1570        self._wait(selected)
  1571
  1572        print("Waiting 5s after requirements, just because...")
  1573        time.sleep(5)
  1574
  1575    @staticmethod
  1576    def _req_str(kind, req) -> str:
  1577        printable = req
  1578
  1579        if kind == "url":
  1580            printable = req.url
  1581
  1582        return printable
  1583
  1584    def _wait(self, selected: Sequence[Node]):
  1585        requirements: List[Tuple[Node, str, Query]] = []
  1586
  1587        for node in selected:
  1588            if node.xfail:
  1589                continue
  1590
  1591            node_name = node.format("{self.path.k8s}")
  1592            ambassador_id = getattr(node, "ambassador_id", None)
  1593
  1594            # print(f"{node_name} {ambassador_id}")
  1595
  1596            if ambassador_id and ambassador_id in self.ids_to_strip:
  1597                # print(f"{node_name} has id {ambassador_id}, stripping")
  1598                continue
  1599
  1600            if node_name in self.names_to_ignore:
  1601                # print(f"{node_name} marked to ignore, stripping")
  1602                continue
  1603
  1604            for kind, req in node.requirements():
  1605                # print(f"{node_name} add req ({node_name}, {kind}, {self._req_str(kind, req)})")
  1606                requirements.append((node, kind, req))
  1607
  1608        homogenous: Dict[str, List[Tuple[Node, Query]]] = {}
  1609
  1610        for node, kind, name in requirements:
  1611            if kind not in homogenous:
  1612                homogenous[kind] = []
  1613
  1614            homogenous[kind].append((node, name))
  1615
  1616        kinds = ["pod", "url"]
  1617        delay = 5
  1618        start = time.time()
  1619        limit = int(os.environ.get("KAT_REQ_LIMIT", "900"))
  1620
  1621        print("Starting requirements check (limit %ds)... " % limit)
  1622
  1623        holdouts = {}
  1624
  1625        while time.time() - start < limit:
  1626            for kind in kinds:
  1627                if kind not in homogenous:
  1628                    continue
  1629
  1630                reqs = homogenous[kind]
  1631
  1632                print("Checking %s %s requirements... " % (len(reqs), kind), end="")
  1633
  1634                # print("\n")
  1635                # for node, req in reqs:
  1636                #     print(f"...{node.format('{self.path.k8s}')} - {self._req_str(kind, req)}")
  1637
  1638                sys.stdout.flush()
  1639
  1640                is_ready, _holdouts = self._ready(kind, reqs)
  1641
  1642                if not is_ready:
  1643                    holdouts[kind] = _holdouts
  1644                    delay = int(min(delay * 2, 10))
  1645                    print("sleeping %ss..." % delay)
  1646                    sys.stdout.flush()
  1647                    time.sleep(delay)
  1648                else:
  1649                    print("satisfied.")
  1650                    sys.stdout.flush()
  1651                    kinds.remove(kind)
  1652
  1653                break
  1654            else:
  1655                return
  1656
  1657        print("requirements not satisfied in %s seconds:" % limit)
  1658
  1659        for kind in kinds:
  1660            _holdouts = holdouts.get(kind, [])
  1661
  1662            if _holdouts:
  1663                print(f"  {kind}:")
  1664
  1665                for node, text in _holdouts:
  1666                    print(f"    {node.path.k8s} ({text})")
  1667                    node.log_kube_artifacts()
  1668
  1669        assert False, "requirements not satisfied in %s seconds" % limit
  1670
  1671    def _ready(self, kind, requirements):
  1672        fn = {
  1673            "pod": self._ready_pod,
  1674            "url": self._ready_url,
  1675        }[kind]
  1676
  1677        return fn(kind, requirements)
  1678
  1679    def _ready_pod(self, _, requirements):
  1680        pods = self._pods(self.scope)
  1681        not_ready = []
  1682
  1683        for node, name in requirements:
  1684            if not pods.get(name, False):
  1685                not_ready.append((node, name))
  1686
  1687        if not_ready:
  1688            print("%d not ready (%s), " % (len(not_ready), name), end="")
  1689            return (False, not_ready)
  1690
  1691        return (True, None)
  1692
  1693    def _ready_url(self, _, requirements):
  1694        queries = []
  1695
  1696        for node, q in requirements:
  1697            q.insecure = True
  1698            q.parent = node
  1699            queries.append(q)
  1700
  1701        # print("URL Reqs:")
  1702        # print("\n".join([ f'{q.parent.name}: {q.url}' for q in queries ]))
  1703
  1704        result = run_queries("reqcheck", queries)
  1705
  1706        not_ready = [r for r in result if r.status != r.query.expected]
  1707
  1708        if not_ready:
  1709            first = not_ready[0]
  1710            print(
  1711                "%d not ready (%s: %s) "
  1712                % (len(not_ready), first.query.url, first.status or first.error),
  1713                end="",
  1714            )
  1715            return (
  1716                False,
  1717                [
  1718                    (x.query.parent, "%s -- %s" % (x.query.url, x.status or x.error))
  1719                    for x in not_ready
  1720                ],
  1721            )
  1722        else:
  1723            return (True, None)
  1724
  1725    def _pods(self, scope=None):
  1726        scope_for_path = scope if scope else "global"
  1727        label_for_scope = f"-l scope={scope}" if scope else ""
  1728
  1729        fname = f"/tmp/pods-{scope_for_path}.json"
  1730        if not ShellCommand.run_with_retry(
  1731            "Getting pods",
  1732            f"tools/bin/kubectl get pod {label_for_scope} --all-namespaces -o json > {fname}",
  1733            shell=True,
  1734            retries=5,
  1735            sleep_seconds=10,
  1736        ):
  1737            raise RuntimeError("Could not get pods")
  1738
  1739        with open(fname) as f:
  1740            raw_pods = json.load(f)
  1741
  1742        pods = {}
  1743
  1744        for p in raw_pods["items"]:
  1745            name = p["metadata"]["name"]
  1746
  1747            cstats = p["status"].get("containerStatuses", [])
  1748
  1749            all_ready = True
  1750
  1751            for status in cstats:
  1752                ready = status.get("ready", False)
  1753
  1754                if not ready:
  1755                    all_ready = False
  1756                    # print(f'pod {name} is not ready: {status.get("state", "unknown state")}')
  1757
  1758            pods[name] = all_ready
  1759
  1760        return pods
  1761
  1762    def _query(self, selected) -> None:
  1763        queries = []
  1764
  1765        for t in self.tests:
  1766            t_name = t.format("{self.path.k8s}")
  1767
  1768            if t in selected:
  1769                t.pending = []
  1770                t.queried = []
  1771                t.results = []
  1772            else:
  1773                continue
  1774
  1775            ambassador_id = getattr(t, "ambassador_id", None)
  1776
  1777            if ambassador_id and ambassador_id in self.ids_to_strip:
  1778                # print(f"{t_name}: SKIP QUERY due to ambassador_id {ambassador_id}")
  1779                continue
  1780
  1781            # print(f"{t_name}: INCLUDE QUERY")
  1782            for q in t.queries():
  1783                q.parent = t
  1784                t.pending.append(q)
  1785                queries.append(q)
  1786
  1787        phases = sorted(set([q.phase for q in queries]))
  1788        first = True
  1789
  1790        for phase in phases:
  1791            if not first:
  1792                phase_delay = int(os.environ.get("KAT_PHASE_DELAY", 10))
  1793                print(
  1794                    "Waiting for {} seconds before starting phase {}...".format(phase_delay, phase)
  1795                )
  1796                time.sleep(phase_delay)
  1797
  1798            first = False
  1799
  1800            phase_queries = [q for q in queries if q.phase == phase]
  1801
  1802            print("Querying %s urls in phase %s..." % (len(phase_queries), phase), end="")
  1803            sys.stdout.flush()
  1804
  1805            results = run_queries(f"phase{phase}", phase_queries)
  1806
  1807            print(" done.")
  1808
  1809            for r in results:
  1810                t = r.parent
  1811                t.queried.append(r.query)
  1812
  1813                if getattr(t, "debug", False) or getattr(r.query, "debug", False):
  1814                    print(
  1815                        "%s result: %s"
  1816                        % (t.name, json.dumps(r.as_dict(), sort_keys=True, indent=4))
  1817                    )
  1818
  1819                t.results.append(r)
  1820                t.pending.remove(r.query)

View as plain text