...

Text file src/github.com/datawire/ambassador/v2/python/kat/harness.py

Documentation: github.com/datawire/ambassador/v2/python/kat

     1import base64
     2import fnmatch
     3import functools
     4import inspect
     5import json
     6import os
     7import subprocess
     8import sys
     9import threading
    10import time
    11import traceback
    12from abc import ABC
    13from collections import OrderedDict
    14from hashlib import sha256
    15from typing import (
    16    Any,
    17    Callable,
    18    Dict,
    19    List,
    20    NamedTuple,
    21    Optional,
    22    Sequence,
    23    Tuple,
    24    Type,
    25    Union,
    26    cast,
    27)
    28
    29import pytest
    30import yaml as pyyaml
    31from packaging import version
    32from yaml.parser import ParserError as YAMLParseError
    33from yaml.scanner import ScannerError as YAMLScanError
    34
    35import tests.integration.manifests as integration_manifests
    36from ambassador.utils import parse_bool
    37from multi import multi
    38from tests.kubeutils import apply_kube_artifacts
    39from tests.manifests import cleartext_host_manifest, default_listener_manifest
    40
    41from .parser import Tag, dump, load
    42from .utils import ShellCommand
    43
    44pyyaml_loader: Any = pyyaml.SafeLoader
    45pyyaml_dumper: Any = pyyaml.SafeDumper
    46
    47try:
    48    pyyaml_loader = pyyaml.CSafeLoader
    49    pyyaml_dumper = pyyaml.CSafeDumper
    50except AttributeError:
    51    pass
    52
    53# Run mode can be local (don't do any Envoy stuff), envoy (only do Envoy stuff),
    54# or all (allow both). Default is all.
    55RUN_MODE = os.environ.get("KAT_RUN_MODE", "all").lower()
    56
    57# We may have a SOURCE_ROOT override from the environment
    58SOURCE_ROOT = os.environ.get("SOURCE_ROOT", "")
    59
    60# Figure out if we're running in Edge Stack or what.
    61if os.path.exists("/buildroot/apro.version"):
    62    # We let /buildroot/apro.version remain a source of truth to minimize the
    63    # chances that we break anything that currently uses the builder shell.
    64    EDGE_STACK = True
    65else:
    66    # If we do not see concrete evidence of running in an apro builder shell,
    67    # then try to decide if the user wants us to assume we're running Edge Stack
    68    # from an environment variable. And if that isn't set, just assume OSS.
    69    EDGE_STACK = parse_bool(os.environ.get("EDGE_STACK", "false"))
    70
    71if EDGE_STACK:
    72    # Hey look, we're running inside Edge Stack!
    73    print("RUNNING IN EDGE STACK")
    74    # SOURCE_ROOT is optional, and we assume that if it isn't set, the user is
    75    # running in a build shell and we should look for sources in the usual location.
    76    if not SOURCE_ROOT:
    77        SOURCE_ROOT = "/buildroot/apro"
    78    GOLD_ROOT = os.path.join(SOURCE_ROOT, "tests/pytest/gold")
    79else:
    80    # We're either not running in Edge Stack or we're not sure, so just assume OSS.
    81    print("RUNNING IN OSS")
    82    # SOURCE_ROOT is optional, and we assume that if it isn't set, the user is
    83    # running in a build shell and we should look for sources in the usual location.
    84    if not SOURCE_ROOT:
    85        SOURCE_ROOT = "/buildroot/ambassador"
    86    GOLD_ROOT = os.path.join(SOURCE_ROOT, "python/tests/gold")
    87
    88
    89def run(cmd):
    90    status = os.system(cmd)
    91    if status != 0:
    92        raise RuntimeError("command failed[%s]: %s" % (status, cmd))
    93
    94
    95def kube_version_json():
    96    result = subprocess.Popen(
    97        "tools/bin/kubectl version -o json", stdout=subprocess.PIPE, shell=True
    98    )
    99    stdout, _ = result.communicate()
   100    return json.loads(stdout)
   101
   102
   103def strip_version(ver: str):
   104    """
   105    strip_version is needed to strip a major/minor version of non-standard symbols. For example, when working with GKE,
   106    `kubectl version` returns a minor version like '14+', which is not semver or any standard version, for that matter.
   107    So we handle exceptions like that here.
   108    :param ver: version string
   109    :return: stripped version
   110    """
   111
   112    try:
   113        return int(ver)
   114    except ValueError as e:
   115        # GKE returns weird versions with '+' in the end
   116        if ver[-1] == "+":
   117            return int(ver[:-1])
   118
   119        # If we still have not taken care of this, raise the error
   120        raise ValueError(e)
   121
   122
   123def kube_server_version(version_json=None):
   124    if not version_json:
   125        version_json = kube_version_json()
   126
   127    server_json = version_json.get("serverVersion", {})
   128
   129    if server_json:
   130        server_major = strip_version(server_json.get("major", None))
   131        server_minor = strip_version(server_json.get("minor", None))
   132
   133        return f"{server_major}.{server_minor}"
   134    else:
   135        return None
   136
   137
   138def kube_client_version(version_json=None):
   139    if not version_json:
   140        version_json = kube_version_json()
   141
   142    client_json = version_json.get("clientVersion", {})
   143
   144    if client_json:
   145        client_major = strip_version(client_json.get("major", None))
   146        client_minor = strip_version(client_json.get("minor", None))
   147
   148        return f"{client_major}.{client_minor}"
   149    else:
   150        return None
   151
   152
   153def is_kube_server_client_compatible(
   154    debug_desc: str, requested_server_version: str, requested_client_version: str
   155) -> bool:
   156    is_cluster_compatible = True
   157    kube_json = kube_version_json()
   158
   159    server_version = kube_server_version(kube_json)
   160    client_version = kube_client_version(kube_json)
   161
   162    if server_version:
   163        if version.parse(server_version) < version.parse(requested_server_version):
   164            print(f"server version {server_version} is incompatible with {debug_desc}")
   165            is_cluster_compatible = False
   166        else:
   167            print(f"server version {server_version} is compatible with {debug_desc}")
   168    else:
   169        print("could not determine Kubernetes server version?")
   170
   171    if client_version:
   172        if version.parse(client_version) < version.parse(requested_client_version):
   173            print(f"client version {client_version} is incompatible with {debug_desc}")
   174            is_cluster_compatible = False
   175        else:
   176            print(f"client version {client_version} is compatible with {debug_desc}")
   177    else:
   178        print("could not determine Kubernetes client version?")
   179
   180    return is_cluster_compatible
   181
   182
   183def is_ingress_class_compatible() -> bool:
   184    return is_kube_server_client_compatible("IngressClass", "1.18", "1.14")
   185
   186
   187def is_knative_compatible() -> bool:
   188    # Skip KNative immediately for run_mode local.
   189    if RUN_MODE == "local":
   190        return False
   191
   192    return is_kube_server_client_compatible("Knative", "1.14", "1.14")
   193
   194
   195def get_digest(data: str) -> str:
   196    s = sha256()
   197    s.update(data.encode("utf-8"))
   198    return s.hexdigest()
   199
   200
   201def has_changed(data: str, path: str) -> Tuple[bool, str]:
   202    cur_size = len(data.strip()) if data else 0
   203    cur_hash = get_digest(data)
   204
   205    # print(f'has_changed: data size {cur_size} - {cur_hash}')
   206
   207    prev_data = None
   208    changed = True
   209    reason = f"no {path} present"
   210
   211    if os.path.exists(path):
   212        with open(path) as f:
   213            prev_data = f.read()
   214
   215    prev_size = len(prev_data.strip()) if prev_data else 0
   216    prev_hash = None
   217
   218    if prev_data:
   219        prev_hash = get_digest(prev_data)
   220
   221    # print(f'has_changed: prev_data size {prev_size} - {prev_hash}')
   222
   223    if data:
   224        if data != prev_data:
   225            reason = f"different data in {path}"
   226        else:
   227            changed = False
   228            reason = f"same data in {path}"
   229
   230        if changed:
   231            # print(f'has_changed: updating {path}')
   232            with open(path, "w") as f:
   233                f.write(data)
   234
   235    # For now, we always have to reapply with split testing.
   236    if not changed:
   237        changed = True
   238        reason = "always reapply for split test"
   239
   240    return (changed, reason)
   241
   242
   243COUNTERS: Dict[Type, int] = {}
   244
   245SANITIZATIONS = OrderedDict(
   246    (
   247        ("://", "SCHEME"),
   248        (":", "COLON"),
   249        (" ", "SPACE"),
   250        ("/t", "TAB"),
   251        (".", "DOT"),
   252        ("?", "QMARK"),
   253        ("/", "SLASH"),
   254    )
   255)
   256
   257
   258def sanitize(obj):
   259    if isinstance(obj, str):
   260        for k, v in SANITIZATIONS.items():
   261            if obj.startswith(k):
   262                obj = obj.replace(k, v + "-")
   263            elif obj.endswith(k):
   264                obj = obj.replace(k, "-" + v)
   265            else:
   266                obj = obj.replace(k, "-" + v + "-")
   267        return obj
   268    elif isinstance(obj, dict):
   269        if "value" in obj:
   270            return obj["value"]
   271        else:
   272            return "-".join("%s-%s" % (sanitize(k), sanitize(v)) for k, v in sorted(obj.items()))
   273    else:
   274        cls = obj.__class__
   275        count = COUNTERS.get(cls, 0)
   276        COUNTERS[cls] = count + 1
   277        if count == 0:
   278            return cls.__name__
   279        else:
   280            return "%s-%s" % (cls.__name__, count)
   281
   282
   283def abstract_test(cls: type):
   284    cls.abstract_test = True  # type: ignore
   285    return cls
   286
   287
   288def get_nodes(node_type: type):
   289    if not inspect.isabstract(node_type) and not node_type.__dict__.get("abstract_test", False):
   290        yield node_type
   291    for sc in node_type.__subclasses__():
   292        if not sc.__dict__.get("skip_variant", False):
   293            for ssc in get_nodes(sc):
   294                yield ssc
   295
   296
   297def variants(cls, *args, **kwargs) -> Tuple[Any]:
   298    return tuple(a for n in get_nodes(cls) for a in n.variants(*args, **kwargs))  # type: ignore
   299
   300
   301class Name(NamedTuple):
   302    name: str
   303    namespace: str
   304
   305    @property
   306    def k8s(self) -> str:
   307        return self.name.replace(".", "-").lower()
   308
   309    @property
   310    def fqdn(self) -> str:
   311        return ".".join([self.k8s, self.namespace, "svc", "cluster", "local"])
   312
   313
   314class NodeLocal(threading.local):
   315    def __init__(self):
   316        self.current = None
   317
   318
   319_local = NodeLocal()
   320
   321
   322def _argprocess(o):
   323    if isinstance(o, Node):
   324        return o.clone()
   325    elif isinstance(o, tuple):
   326        return tuple(_argprocess(i) for i in o)
   327    elif isinstance(o, list):
   328        return [_argprocess(i) for i in o]
   329    elif isinstance(o, dict):
   330        return {_argprocess(k): _argprocess(v) for k, v in o.items()}
   331    else:
   332        return o
   333
   334
   335class Node(ABC):
   336
   337    parent: "Node"
   338    children: List["Node"]
   339    name: str
   340    ambassador_id: str
   341    namespace: str
   342    is_ambassador = False
   343    local_result: Optional[Dict[str, str]] = None
   344    xfail: Optional[str]
   345
   346    def __init__(
   347        self,
   348        *args,
   349        name: Optional[str] = None,
   350        namespace: Optional[str] = None,
   351        _clone: Optional["Node"] = None,
   352        **kwargs,
   353    ) -> None:
   354        # If self.skip is set to true, this node is skipped
   355        self.skip_node = False
   356        self.xfail = None
   357
   358        if _clone:
   359            args = _clone._args  # type: ignore
   360            kwargs = _clone._kwargs  # type: ignore
   361            if name:
   362                name = "-".join((_clone.name, name))
   363            else:
   364                name = _clone.name
   365            self._args = _clone._args  # type: ignore
   366            self._kwargs = _clone._kwargs  # type: ignore
   367        else:
   368            self._args = args
   369            self._kwargs = kwargs
   370            if name:
   371                name = "-".join((self.__class__.__name__, name))
   372            else:
   373                name = self.__class__.__name__
   374
   375        saved = _local.current
   376        self.parent = _local.current
   377
   378        if namespace:
   379            self.namespace = namespace
   380        if not getattr(self, "namespace", ""):
   381            # We do the above `getattr` instead of just an `else` because subclasses might set a
   382            # default namespace; so `self.namespace` might already be set before calling __init__().
   383            if self.parent and self.parent.namespace:
   384                # We have no namespace assigned, but our parent does have a namespace
   385                # defined. Copy the namespace down from our parent.
   386                self.namespace = self.parent.namespace
   387            else:
   388                self.namespace = "default"
   389
   390        _local.current = self
   391        self.children = []
   392        if self.parent is not None:
   393            self.parent.children.append(self)
   394        try:
   395            init = getattr(self, "init", lambda *a, **kw: None)
   396            init(*_argprocess(args), **_argprocess(kwargs))
   397        finally:
   398            _local.current = saved
   399
   400        # This has to come after the above to init(), because the format-string might reference
   401        # things that get set by init().
   402        self.name = self.format(name)
   403
   404        names = {}  # type: ignore
   405        for c in self.children:
   406            assert (
   407                c.name not in names
   408            ), "test %s of type %s has duplicate children: %s of type %s, %s" % (
   409                self.name,
   410                self.__class__.__name__,
   411                c.name,
   412                c.__class__.__name__,
   413                names[c.name].__class__.__name__,
   414            )
   415            names[c.name] = c
   416
   417    def clone(self, name=None):
   418        return self.__class__(_clone=self, name=name)
   419
   420    def find_local_result(self, stop_at_first_ambassador: bool = False) -> Optional[Dict[str, str]]:
   421        test_name = self.format("{self.path.k8s}")
   422
   423        # print(f"{test_name} {type(self)} FIND_LOCAL_RESULT")
   424
   425        end_result: Optional[Dict[str, str]] = None
   426
   427        n: Optional[Node] = self
   428
   429        while n:
   430            node_name = n.format("{self.path.k8s}")
   431            parent = n.parent
   432            parent_name = parent.format("{self.path.k8s}") if parent else "-none-"
   433
   434            end_result = getattr(n, "local_result", None)
   435            result_str = end_result["result"] if end_result else "-none-"
   436            # print(f"{test_name}: {'ambassador' if n.is_ambassador else 'node'} {node_name}, parent {parent_name}, local_result = {result_str}")
   437
   438            if end_result is not None:
   439                break
   440
   441            if n.is_ambassador and stop_at_first_ambassador:
   442                # This is an Ambassador: don't continue past it.
   443                break
   444
   445            n = n.parent
   446
   447        return end_result
   448
   449    def check_local(self, gold_root: str, k8s_yaml_path: str) -> Tuple[bool, bool]:
   450        testname = self.format("{self.path.k8s}")
   451
   452        if self.xfail:
   453            # XFail early -- but still return True, True so that we don't try to run Envoy on it.
   454            self.local_result = {"result": "xfail", "reason": self.xfail}
   455            # print(f"==== XFAIL: {testname} local: {self.xfail}")
   456            return (True, True)
   457
   458        if not self.is_ambassador:
   459            # print(f"{testname} ({type(self)}) is not an Ambassador")
   460            return (False, False)
   461
   462        if not self.ambassador_id:
   463            print(f"{testname} ({type(self)}) is an Ambassador but has no ambassador_id?")
   464            return (False, False)
   465
   466        ambassador_namespace = getattr(self, "namespace", "default")
   467        ambassador_single_namespace = getattr(self, "single_namespace", False)
   468
   469        no_local_mode: bool = getattr(self, "no_local_mode", False)
   470        skip_local_reason: Optional[str] = getattr(self, "skip_local_instead_of_xfail", None)
   471
   472        # print(f"{testname}: ns {ambassador_namespace} ({'single' if ambassador_single_namespace else 'multi'})")
   473
   474        gold_path = os.path.join(gold_root, testname)
   475
   476        if os.path.isdir(gold_path) and not no_local_mode:
   477            # print(f"==== {testname} running locally from {gold_path}")
   478
   479            # Yeah, I know, brutal hack.
   480            #
   481            # XXX (Flynn) This code isn't used and we don't know if it works. If you try
   482            # it, bring it up-to-date with the environment created in abstract_tests.py
   483            envstuff = ["env", f"AMBASSADOR_NAMESPACE={ambassador_namespace}"]
   484
   485            cmd = [
   486                "mockery",
   487                "--debug",
   488                k8s_yaml_path,
   489                "-w",
   490                "python /ambassador/watch_hook.py",
   491                "--kat",
   492                self.ambassador_id,
   493                "--diff",
   494                gold_path,
   495            ]
   496
   497            if ambassador_single_namespace:
   498                envstuff.append("AMBASSADOR_SINGLE_NAMESPACE=yes")
   499                cmd += ["-n", ambassador_namespace]
   500
   501            if not getattr(self, "allow_edge_stack_redirect", False):
   502                envstuff.append("AMBASSADOR_NO_TLS_REDIRECT=yes")
   503
   504            cmd = envstuff + cmd
   505
   506            w = ShellCommand(*cmd)
   507
   508            if w.status():
   509                print(f"==== GOOD: {testname} local against {gold_path}")
   510                self.local_result = {"result": "pass"}
   511            else:
   512                print(f"==== FAIL: {testname} local against {gold_path}")
   513
   514                self.local_result = {"result": "fail", "stdout": w.stdout, "stderr": w.stderr}
   515
   516            return (True, True)
   517        else:
   518            # If we have a local reason, has a parent already subsumed us?
   519            #
   520            # XXX The way KAT works, our parent will have always run earlier than us, so
   521            # it's not clear if we can ever not have been subsumed.
   522
   523            if skip_local_reason:
   524                local_result = self.find_local_result()
   525
   526                if local_result:
   527                    self.local_result = {
   528                        "result": "skip",
   529                        "reason": f"subsumed by {skip_local_reason} -- {local_result['result']}",
   530                    }
   531                    # print(f"==== {self.local_result['result'].upper()} {testname} {self.local_result['reason']}")
   532                    return (True, True)
   533
   534            # OK, we weren't already subsumed. If we're in local mode, we'll skip or xfail
   535            # depending on skip_local_reason.
   536
   537            if RUN_MODE == "local":
   538                if skip_local_reason:
   539                    self.local_result = {
   540                        "result": "skip",
   541                        # 'reason': f"subsumed by {skip_local_reason} without result in local mode"
   542                    }
   543                    print(
   544                        f"==== {self.local_result['result'].upper()} {testname} {self.local_result['reason']}"
   545                    )
   546                    return (True, True)
   547                else:
   548                    # XFail -- but still return True, True so that we don't try to run Envoy on it.
   549                    self.local_result = {
   550                        "result": "xfail",
   551                        "reason": f"missing local cache {gold_path}",
   552                    }
   553                    # print(f"==== {self.local_result['result'].upper()} {testname} {self.local_result['reason']}")
   554                    return (True, True)
   555
   556            # If here, we're not in local mode. Allow Envoy to run.
   557            self.local_result = None
   558            # print(f"==== IGNORE {testname} no local cache")
   559            return (True, False)
   560
   561    def has_local_result(self) -> bool:
   562        return bool(self.local_result)
   563
   564    @classmethod
   565    def variants(cls):
   566        yield cls()
   567
   568    @property
   569    def path(self) -> Name:
   570        if self.parent is None:
   571            return Name(self.name, self.namespace)
   572        else:
   573            return Name(self.parent.path.name + "." + self.name, self.namespace)
   574
   575    @property
   576    def traversal(self):
   577        yield self
   578        for c in self.children:
   579            for d in c.traversal:
   580                yield d
   581
   582    @property
   583    def ancestors(self):
   584        yield self
   585        if self.parent is not None:
   586            for a in self.parent.ancestors:
   587                yield a
   588
   589    @property
   590    def depth(self):
   591        if self.parent is None:
   592            return 0
   593        else:
   594            return self.parent.depth + 1
   595
   596    def format(self, st, **kwargs):
   597        return integration_manifests.format(st, self=self, **kwargs)
   598
   599    @functools.lru_cache()
   600    def matches(self, pattern):
   601        if fnmatch.fnmatch(self.path.name, "*%s*" % pattern):
   602            return True
   603        for c in self.children:
   604            if c.matches(pattern):
   605                return True
   606        return False
   607
   608    def requirements(self):
   609        yield from ()
   610
   611    # log_kube_artifacts writes various logs about our underlying Kubernetes objects to
   612    # a place where the artifact publisher can find them. See run-tests.sh.
   613    def log_kube_artifacts(self):
   614        if not getattr(self, "already_logged", False):
   615            self.already_logged = True
   616
   617            print(f"logging kube artifacts for {self.path.k8s}")
   618            sys.stdout.flush()
   619
   620            DEV = os.environ.get("AMBASSADOR_DEV", "0").lower() in ("1", "yes", "true")
   621
   622            log_path = f"/tmp/kat-logs-{self.path.k8s}"
   623
   624            if DEV:
   625                os.system(f"docker logs {self.path.k8s} >{log_path} 2>&1")
   626            else:
   627                os.system(
   628                    f"tools/bin/kubectl logs -n {self.namespace} {self.path.k8s} >{log_path} 2>&1"
   629                )
   630
   631                event_path = f"/tmp/kat-events-{self.path.k8s}"
   632
   633                fs1 = f"involvedObject.name={self.path.k8s}"
   634                fs2 = f"involvedObject.namespace={self.namespace}"
   635
   636                cmd = f'tools/bin/kubectl get events -o json --field-selector "{fs1}" --field-selector "{fs2}"'
   637                os.system(f'echo ==== "{cmd}" >{event_path}')
   638                os.system(f"{cmd} >>{event_path} 2>&1")
   639
   640
   641class Test(Node):
   642
   643    results: Sequence["Result"]
   644
   645    __test__ = False
   646
   647    def config(self):
   648        yield from ()
   649
   650    def manifests(self):
   651        return None
   652
   653    def queries(self):
   654        yield from ()
   655
   656    def check(self):
   657        pass
   658
   659    def handle_local_result(self) -> bool:
   660        test_name = self.format("{self.path.k8s}")
   661
   662        # print(f"{test_name} {type(self)} HANDLE_LOCAL_RESULT")
   663
   664        end_result = self.find_local_result()
   665
   666        if end_result is not None:
   667            result_type = end_result["result"]
   668
   669            if result_type == "pass":
   670                pass
   671            elif result_type == "skip":
   672                pytest.skip(end_result["reason"])
   673            elif result_type == "fail":
   674                sys.stdout.write(end_result["stdout"])
   675
   676                if os.environ.get("KAT_VERBOSE", None):
   677                    sys.stderr.write(end_result["stderr"])
   678
   679                pytest.fail("local check failed")
   680            elif result_type == "xfail":
   681                pytest.xfail(end_result["reason"])
   682
   683            return True
   684
   685        return False
   686
   687    @property
   688    def ambassador_id(self):
   689        if self.parent is None:
   690            return self.path.k8s
   691        else:
   692            return self.parent.ambassador_id
   693
   694
   695@multi
   696def encode_body(obj):
   697    yield type(obj)
   698
   699
   700@encode_body.when(bytes)  # type: ignore
   701def encode_body(b):
   702    return base64.encodebytes(b).decode("utf-8")
   703
   704
   705@encode_body.when(str)  # type: ignore
   706def encode_body(s):
   707    return encode_body(s.encode("utf-8"))
   708
   709
   710@encode_body.default  # type: ignore
   711def encode_body(obj):
   712    return encode_body(json.dumps(obj))
   713
   714
   715class Query:
   716    def __init__(
   717        self,
   718        url,
   719        expected=None,
   720        method="GET",
   721        headers=None,
   722        messages=None,
   723        insecure=False,
   724        skip=None,
   725        xfail=None,
   726        phase=1,
   727        debug=False,
   728        sni=False,
   729        error=None,
   730        client_crt=None,
   731        client_key=None,
   732        client_cert_required=False,
   733        ca_cert=None,
   734        grpc_type=None,
   735        cookies=None,
   736        ignore_result=False,
   737        body=None,
   738        minTLSv="",
   739        maxTLSv="",
   740        cipherSuites=[],
   741        ecdhCurves=[],
   742    ):
   743        self.method = method
   744        self.url = url
   745        self.headers = headers
   746        self.body = body
   747        self.cookies = cookies
   748        self.messages = messages
   749        self.insecure = insecure
   750        self.minTLSv = minTLSv
   751        self.maxTLSv = maxTLSv
   752        self.cipherSuites = cipherSuites
   753        self.ecdhCurves = ecdhCurves
   754        if expected is None:
   755            if url.lower().startswith("ws:"):
   756                self.expected = 101
   757            else:
   758                self.expected = 200
   759        else:
   760            self.expected = expected
   761        self.skip = skip
   762        self.xfail = xfail
   763        self.ignore_result = ignore_result
   764        self.phase = phase
   765        self.parent = None
   766        self.result = None
   767        self.debug = debug
   768        self.sni = sni
   769        self.error = error
   770        self.client_cert_required = client_cert_required
   771        self.client_cert = client_crt
   772        self.client_key = client_key
   773        self.ca_cert = ca_cert
   774        assert grpc_type in (None, "real", "bridge", "web"), grpc_type
   775        self.grpc_type = grpc_type
   776
   777    def as_json(self):
   778        result = {
   779            "test": self.parent.path.name,
   780            "id": id(self),
   781            "url": self.url,
   782            "insecure": self.insecure,
   783        }
   784        if self.sni:
   785            result["sni"] = self.sni
   786        if self.method:
   787            result["method"] = self.method
   788        if self.method:
   789            result["maxTLSv"] = self.maxTLSv
   790        if self.method:
   791            result["minTLSv"] = self.minTLSv
   792        if self.cipherSuites:
   793            result["cipherSuites"] = self.cipherSuites
   794        if self.ecdhCurves:
   795            result["ecdhCurves"] = self.ecdhCurves
   796        if self.headers:
   797            result["headers"] = self.headers
   798        if self.body is not None:
   799            result["body"] = encode_body(self.body)
   800        if self.cookies:
   801            result["cookies"] = self.cookies
   802        if self.messages is not None:
   803            result["messages"] = self.messages
   804        if self.client_cert is not None:
   805            result["client_cert"] = self.client_cert
   806        if self.client_key is not None:
   807            result["client_key"] = self.client_key
   808        if self.ca_cert is not None:
   809            result["ca_cert"] = self.ca_cert
   810        if self.client_cert_required:
   811            result["client_cert_required"] = self.client_cert_required
   812        if self.grpc_type:
   813            result["grpc_type"] = self.grpc_type
   814
   815        return result
   816
   817
   818class Result:
   819    def __init__(self, query, res):
   820        self.query = query
   821        query.result = self
   822        self.parent = query.parent
   823        self.status = res.get("status")
   824        self.headers = res.get("headers")
   825        self.messages = res.get("messages")
   826        self.tls = res.get("tls")
   827        if "body" in res:
   828            self.body = base64.decodebytes(bytes(res["body"], "ASCII"))
   829        else:
   830            self.body = None
   831        self.text = res.get("text")
   832        self.json = res.get("json")
   833        self.backend = BackendResult(self.json) if self.json else None
   834        self.error = res.get("error")
   835
   836    def __repr__(self):
   837        return str(self.as_dict())
   838
   839    def check(self):
   840        if self.query.skip:
   841            pytest.skip(self.query.skip)
   842
   843        if self.query.xfail:
   844            pytest.xfail(self.query.xfail)
   845
   846        if not self.query.ignore_result:
   847            if self.query.error is not None:
   848                found = False
   849                errors = self.query.error
   850
   851                if isinstance(self.query.error, str):
   852                    errors = [self.query.error]
   853
   854                if self.error is not None:
   855                    for error in errors:
   856                        if error in self.error:
   857                            found = True
   858                            break
   859
   860                assert found, "{}: expected error to contain any of {}; got {} instead".format(
   861                    self.query.url,
   862                    ", ".join(["'%s'" % x for x in errors]),
   863                    ("'%s'" % self.error) if self.error else "no error",
   864                )
   865            else:
   866                if self.query.expected != self.status:
   867                    self.parent.log_kube_artifacts()
   868
   869                assert (
   870                    self.query.expected == self.status
   871                ), "%s: expected status code %s, got %s instead with error %s" % (
   872                    self.query.url,
   873                    self.query.expected,
   874                    self.status,
   875                    self.error,
   876                )
   877
   878    def as_dict(self) -> Dict[str, Any]:
   879        od = {
   880            "query": self.query.as_json(),
   881            "status": self.status,
   882            "error": self.error,
   883            "headers": self.headers,
   884        }
   885
   886        if self.backend and self.backend.name:
   887            od["backend"] = self.backend.as_dict()
   888        else:
   889            od["json"] = self.json
   890            od["text"] = self.text
   891
   892        return od
   893
   894        # 'RENDERED': {
   895        #     'client': {
   896        #         'request': self.query.as_json(),
   897        #         'response': {
   898        #             'status': self.status,
   899        #             'error': self.error,
   900        #             'headers': self.headers
   901        #         }
   902        #     },
   903        #     'upstream': {
   904        #         'name': self.backend.name,
   905        #         'request': {
   906        #             'headers': self.backend.request.headers,
   907        #             'url': {
   908        #                 'fragment': self.backend.request.url.fragment,
   909        #                 'host': self.backend.request.url.host,
   910        #                 'opaque': self.backend.request.url.opaque,
   911        #                 'path': self.backend.request.url.path,
   912        #                 'query': self.backend.request.url.query,
   913        #                 'rawQuery': self.backend.request.url.rawQuery,
   914        #                 'scheme': self.backend.request.url.scheme,
   915        #                 'username': self.backend.request.url.username,
   916        #                 'password': self.backend.request.url.password,
   917        #             },
   918        #             'host': self.backend.request.host,
   919        #             'tls': {
   920        #                 'enabled': self.backend.request.tls.enabled,
   921        #                 'server_name': self.backend.request.tls.server_name,
   922        #                 'version': self.backend.request.tls.version,
   923        #                 'negotiated_protocol': self.backend.request.tls.negotiated_protocol,
   924        #             },
   925        #         },
   926        #         'response': {
   927        #             'headers': self.backend.response.headers
   928        #         }
   929        #     }
   930        # }
   931
   932
   933class BackendURL:
   934    def __init__(
   935        self,
   936        fragment=None,
   937        host=None,
   938        opaque=None,
   939        path=None,
   940        query=None,
   941        rawQuery=None,
   942        scheme=None,
   943        username=None,
   944        password=None,
   945    ):
   946        self.fragment = fragment
   947        self.host = host
   948        self.opaque = opaque
   949        self.path = path
   950        self.query = query
   951        self.rawQuery = rawQuery
   952        self.scheme = scheme
   953        self.username = username
   954        self.password = password
   955
   956    def as_dict(self) -> Dict["str", Any]:
   957        return {
   958            "fragment": self.fragment,
   959            "host": self.host,
   960            "opaque": self.opaque,
   961            "path": self.path,
   962            "query": self.query,
   963            "rawQuery": self.rawQuery,
   964            "scheme": self.scheme,
   965            "username": self.username,
   966            "password": self.password,
   967        }
   968
   969
   970class BackendRequest:
   971    def __init__(self, req):
   972        self.url = BackendURL(**req.get("url"))
   973        self.headers = req.get("headers", {})
   974        self.host = req.get("host", None)
   975        self.tls = BackendTLS(req.get("tls", {}))
   976
   977    def as_dict(self) -> Dict[str, Any]:
   978        od = {
   979            "headers": self.headers,
   980            "host": self.host,
   981        }
   982
   983        if self.url:
   984            od["url"] = self.url.as_dict()
   985
   986        if self.tls:
   987            od["tls"] = self.tls.as_dict()
   988
   989        return od
   990
   991
   992class BackendTLS:
   993    def __init__(self, tls):
   994        self.enabled = tls["enabled"]
   995        self.server_name = tls.get("server-name")
   996        self.version = tls.get("version")
   997        self.negotiated_protocol = tls.get("negotiated-protocol")
   998        self.negotiated_protocol_version = tls.get("negotiated-protocol-version")
   999
  1000    def as_dict(self) -> Dict[str, Any]:
  1001        return {
  1002            "enabled": self.enabled,
  1003            "server_name": self.server_name,
  1004            "version": self.version,
  1005            "negotiated_protocol": self.negotiated_protocol,
  1006            "negotiated_protocol_version": self.negotiated_protocol_version,
  1007        }
  1008
  1009
  1010class BackendResponse:
  1011    def __init__(self, resp):
  1012        self.headers = resp.get("headers", {})
  1013
  1014    def as_dict(self) -> Dict[str, Any]:
  1015        return {"headers": self.headers}
  1016
  1017
  1018def dictify(obj):
  1019    if getattr(obj, "as_dict", None):
  1020        return obj.as_dict()
  1021    else:
  1022        return obj
  1023
  1024
  1025class BackendResult:
  1026    def __init__(self, bres):
  1027        self.name = "raw"
  1028        self.request = None
  1029        self.response = bres
  1030
  1031        if isinstance(bres, dict):
  1032            self.name = bres.get("backend")
  1033            self.request = BackendRequest(bres["request"]) if "request" in bres else None
  1034            self.response = BackendResponse(bres["response"]) if "response" in bres else None
  1035
  1036    def as_dict(self) -> Dict[str, Any]:
  1037        od = {"name": self.name}
  1038
  1039        if self.request:
  1040            od["request"] = dictify(self.request)
  1041
  1042        if self.response:
  1043            od["response"] = dictify(self.response)
  1044
  1045        return od
  1046
  1047
  1048def label(yaml, scope):
  1049    for obj in yaml:
  1050        md = obj["metadata"]
  1051
  1052        if "labels" not in md:
  1053            md["labels"] = {}
  1054
  1055        obj["metadata"]["labels"]["scope"] = scope
  1056    return yaml
  1057
  1058
  1059CLIENT_GO = "kat_client"
  1060
  1061
  1062def run_queries(name: str, queries: Sequence[Query]) -> Sequence[Result]:
  1063    jsonified = []
  1064    byid = {}
  1065
  1066    for q in queries:
  1067        jsonified.append(q.as_json())
  1068        byid[id(q)] = q
  1069
  1070    path_urls = f"/tmp/kat-client-{name}-urls.json"
  1071    path_results = f"/tmp/kat-client-{name}-results.json"
  1072    path_log = f"/tmp/kat-client-{name}.log"
  1073
  1074    with open(path_urls, "w") as f:
  1075        json.dump(jsonified, f)
  1076
  1077    # run(f"{CLIENT_GO} -input {path_urls} -output {path_results} 2> {path_log}")
  1078    res = ShellCommand.run(
  1079        "Running queries",
  1080        f"tools/bin/kubectl exec -n default -i kat /work/kat_client < '{path_urls}' > '{path_results}' 2> '{path_log}'",
  1081        shell=True,
  1082    )
  1083
  1084    if not res:
  1085        ret = [Result(q, {"error": "Command execution error"}) for q in queries]
  1086        return ret
  1087
  1088    with open(path_results, "r") as f:
  1089        content = f.read()
  1090        try:
  1091            json_results = json.loads(content)
  1092        except Exception as e:
  1093            ret = [
  1094                Result(q, {"error": "Could not parse JSON content after running KAT queries"})
  1095                for q in queries
  1096            ]
  1097            return ret
  1098
  1099    results = []
  1100
  1101    for r in json_results:
  1102        res = r["result"]
  1103        q = byid[r["id"]]
  1104        results.append(Result(q, res))
  1105
  1106    return results
  1107
  1108
  1109# yuck
  1110DOCTEST = False
  1111
  1112
  1113class Superpod:
  1114    def __init__(self, namespace: str) -> None:
  1115        self.namespace = namespace
  1116        self.next_clear = 8080
  1117        self.next_tls = 8443
  1118        self.service_names: Dict[int, str] = {}
  1119        self.name = "superpod-%s" % (self.namespace or "default")
  1120
  1121    def allocate(self, service_name) -> List[int]:
  1122        ports = [self.next_clear, self.next_tls]
  1123        self.service_names[self.next_clear] = service_name
  1124        self.service_names[self.next_tls] = service_name
  1125
  1126        self.next_clear += 1
  1127        self.next_tls += 1
  1128
  1129        return ports
  1130
  1131    def get_manifest_list(self) -> List[Dict[str, Any]]:
  1132        manifest = load(
  1133            "superpod",
  1134            integration_manifests.format(integration_manifests.load("superpod_pod")),
  1135            Tag.MAPPING,
  1136        )
  1137
  1138        assert len(manifest) == 1, "SUPERPOD manifest must have exactly one object"
  1139
  1140        m = manifest[0]
  1141
  1142        template = m["spec"]["template"]
  1143
  1144        ports: List[Dict[str, int]] = []
  1145        envs: List[Dict[str, Union[str, int]]] = template["spec"]["containers"][0]["env"]
  1146
  1147        for p in sorted(self.service_names.keys()):
  1148            ports.append({"containerPort": p})
  1149            envs.append({"name": f"BACKEND_{p}", "value": self.service_names[p]})
  1150
  1151        template["spec"]["containers"][0]["ports"] = ports
  1152
  1153        if "metadata" not in m:
  1154            m["metadata"] = {}
  1155
  1156        metadata = m["metadata"]
  1157        metadata["name"] = self.name
  1158
  1159        m["spec"]["selector"]["matchLabels"]["backend"] = self.name
  1160        template["metadata"]["labels"]["backend"] = self.name
  1161
  1162        if self.namespace:
  1163            # Fix up the namespace.
  1164            if "namespace" not in metadata:
  1165                metadata["namespace"] = self.namespace
  1166
  1167        return list(manifest)
  1168
  1169
  1170class Runner:
  1171    def __init__(self, *classes, scope=None):
  1172        self.scope = scope or "-".join(c.__name__ for c in classes)
  1173        self.roots = tuple(v for c in classes for v in variants(c))
  1174        self.nodes = [n for r in self.roots for n in r.traversal if not n.skip_node]
  1175        self.tests = [n for n in self.nodes if isinstance(n, Test)]
  1176        self.ids = [t.path.name for t in self.tests]
  1177        self.done = False
  1178        self.skip_nonlocal_tests = False
  1179        self.ids_to_strip: Dict[str, bool] = {}
  1180        self.names_to_ignore: Dict[str, bool] = {}
  1181
  1182        @pytest.mark.parametrize("t", self.tests, ids=self.ids)
  1183        def test(request, capsys, t):
  1184            if t.xfail:
  1185                pytest.xfail(t.xfail)
  1186            else:
  1187                selected = set(
  1188                    item.callspec.getparam("t")
  1189                    for item in request.session.items
  1190                    if item.function == test
  1191                )
  1192
  1193                with capsys.disabled():
  1194                    self.setup(selected)
  1195
  1196                if not t.handle_local_result():
  1197                    # XXX: should aggregate the result of url checks
  1198                    i = 0
  1199                    for r in t.results:
  1200                        try:
  1201                            r.check()
  1202                        except AssertionError as e:
  1203                            # Add some context so that you can tell which query is failing.
  1204                            e.args = (f"query[{i}]: {e.args[0]}", *e.args[1:])
  1205                            raise
  1206                        i += 1
  1207
  1208                    t.check()
  1209
  1210        self.__func__ = test
  1211        self.__test__ = True
  1212
  1213    def __call__(self):
  1214        assert False, "this is here for py.test discovery purposes only"
  1215
  1216    def setup(self, selected):
  1217        if not self.done:
  1218            if not DOCTEST:
  1219                print()
  1220
  1221            expanded_up = set(selected)
  1222
  1223            for s in selected:
  1224                for n in s.ancestors:
  1225                    if not n.xfail:
  1226                        expanded_up.add(n)
  1227
  1228            expanded = set(expanded_up)
  1229
  1230            for s in selected:
  1231                for n in s.traversal:
  1232                    if not n.xfail:
  1233                        expanded.add(n)
  1234
  1235            try:
  1236                self._setup_k8s(expanded)
  1237
  1238                if self.skip_nonlocal_tests:
  1239                    self.done = True
  1240                    return
  1241
  1242                for t in self.tests:
  1243                    if t.has_local_result():
  1244                        # print(f"{t.name}: SKIP due to local result")
  1245                        continue
  1246
  1247                    if t in expanded_up:
  1248                        pre_query: Callable = getattr(t, "pre_query", None)
  1249
  1250                        if pre_query:
  1251                            pre_query()
  1252
  1253                self._query(expanded_up)
  1254            except:
  1255                traceback.print_exc()
  1256                pytest.exit("setup failed")
  1257            finally:
  1258                self.done = True
  1259
  1260    def get_manifests_and_namespaces(self, selected) -> Tuple[Any, List[str]]:
  1261        manifests: OrderedDict[Any, list] = OrderedDict()  # type: ignore
  1262        superpods: Dict[str, Superpod] = {}
  1263        namespaces = []
  1264        for n in (n for n in self.nodes if n in selected and not n.xfail):
  1265            manifest = None
  1266            nsp = None
  1267            ambassador_id = None
  1268
  1269            # print('manifesting for {n.path}')
  1270
  1271            # Walk up the parent chain to find our namespace and ambassador_id.
  1272            cur = n
  1273
  1274            while cur:
  1275                if not nsp:
  1276                    nsp = getattr(cur, "namespace", None)
  1277                    # print(f'... {cur.name} has namespace {nsp}')
  1278
  1279                if not ambassador_id:
  1280                    ambassador_id = getattr(cur, "ambassador_id", None)
  1281                    # print(f'... {cur.name} has ambassador_id {ambassador_id}')
  1282
  1283                if nsp and ambassador_id:
  1284                    # print(f'... good for namespace and ambassador_id')
  1285                    break
  1286
  1287                cur = cur.parent
  1288
  1289            # OK. Does this node want to use a superpod?
  1290            if getattr(n, "use_superpod", False):
  1291                # Yup. OK. Do we already have a superpod for this namespace?
  1292                superpod = superpods.get(nsp, None)  # type: ignore
  1293
  1294                if not superpod:
  1295                    # We don't have one, so we need to create one.
  1296                    superpod = Superpod(nsp)  # type: ignore
  1297                    superpods[nsp] = superpod  # type: ignore
  1298
  1299                # print(f'superpodifying {n.name}')
  1300
  1301                # Next up: use the backend_service.yaml manifest as a template...
  1302                yaml = n.format(integration_manifests.load("backend_service"))
  1303                manifest = load(n.path, yaml, Tag.MAPPING)
  1304
  1305                assert (
  1306                    len(manifest) == 1
  1307                ), "backend_service.yaml manifest must have exactly one object"
  1308
  1309                m = manifest[0]
  1310
  1311                # Update the manifest's selector...
  1312                m["spec"]["selector"]["backend"] = superpod.name
  1313
  1314                # ...and labels if needed...
  1315                if ambassador_id:
  1316                    m["metadata"]["labels"] = {"kat-ambassador-id": ambassador_id}
  1317
  1318                # ...and target ports.
  1319                superpod_ports = superpod.allocate(n.path.k8s)
  1320
  1321                m["spec"]["ports"][0]["targetPort"] = superpod_ports[0]
  1322                m["spec"]["ports"][1]["targetPort"] = superpod_ports[1]
  1323            else:
  1324                # The non-superpod case...
  1325                yaml = n.manifests()
  1326
  1327                if yaml is not None:
  1328                    is_plain_test = n.path.k8s.startswith("plain-")
  1329
  1330                    if n.is_ambassador and not is_plain_test:
  1331                        add_default_http_listener = getattr(n, "add_default_http_listener", True)
  1332                        add_default_https_listener = getattr(n, "add_default_https_listener", True)
  1333                        add_cleartext_host = getattr(n, "edge_stack_cleartext_host", False)
  1334
  1335                        if add_default_http_listener:
  1336                            # print(f"{n.path.k8s} adding default HTTP Listener")
  1337                            yaml += default_listener_manifest % {
  1338                                "namespace": nsp,
  1339                                "port": 8080,
  1340                                "protocol": "HTTPS",
  1341                                "securityModel": "XFP",
  1342                            }
  1343
  1344                        if add_default_https_listener:
  1345                            # print(f"{n.path.k8s} adding default HTTPS Listener")
  1346                            yaml += default_listener_manifest % {
  1347                                "namespace": nsp,
  1348                                "port": 8443,
  1349                                "protocol": "HTTPS",
  1350                                "securityModel": "XFP",
  1351                            }
  1352
  1353                        if EDGE_STACK and add_cleartext_host:
  1354                            # print(f"{n.path.k8s} adding Host")
  1355
  1356                            host_yaml = cleartext_host_manifest % nsp
  1357                            yaml += host_yaml
  1358
  1359                    yaml = n.format(yaml)
  1360
  1361                    try:
  1362                        manifest = load(n.path, yaml, Tag.MAPPING)
  1363                    except Exception as e:
  1364                        print(f"parse failure! {e}")
  1365                        print(yaml)
  1366
  1367            if manifest:
  1368                # print(manifest)
  1369
  1370                # Make sure namespaces and labels are properly set.
  1371                for m in manifest:
  1372                    if "metadata" not in m:
  1373                        m["metadata"] = {}
  1374
  1375                    metadata = m["metadata"]
  1376
  1377                    if "labels" not in metadata:
  1378                        metadata["labels"] = {}
  1379
  1380                    if ambassador_id:
  1381                        metadata["labels"]["kat-ambassador-id"] = ambassador_id
  1382
  1383                    if nsp:
  1384                        if "namespace" not in metadata:
  1385                            metadata["namespace"] = nsp
  1386
  1387                # ...and, finally, save the manifest list.
  1388                manifests[n] = list(manifest)
  1389                if str(nsp) not in namespaces:
  1390                    namespaces.append(str(nsp))
  1391
  1392        for superpod in superpods.values():
  1393            manifests[superpod] = superpod.get_manifest_list()
  1394
  1395        return manifests, namespaces
  1396
  1397    def do_local_checks(self, selected, fname) -> bool:
  1398        if RUN_MODE == "envoy":
  1399            print("Local mode not allowed, continuing to Envoy mode")
  1400            return False
  1401
  1402        all_valid = True
  1403        self.ids_to_strip = {}
  1404        # This feels a bit wrong?
  1405        self.names_to_ignore = {}
  1406
  1407        for n in (n for n in self.nodes if n in selected):
  1408            local_possible, local_checked = n.check_local(GOLD_ROOT, fname)
  1409
  1410            if local_possible:
  1411                if local_checked:
  1412                    self.ids_to_strip[n.ambassador_id] = True
  1413                else:
  1414                    all_valid = False
  1415
  1416        return all_valid
  1417
  1418    def _setup_k8s(self, selected):
  1419        # First up, get the full manifest and save it to disk.
  1420        manifests, namespaces = self.get_manifests_and_namespaces(selected)
  1421
  1422        configs = OrderedDict()
  1423        for n in (n for n in self.nodes if n in selected and not n.xfail):
  1424            configs[n] = []
  1425            for cfg in n.config():
  1426                if isinstance(cfg, str):
  1427                    parent_config = configs[n.parent][0][1][0]
  1428
  1429                    try:
  1430                        for o in load(n.path, cfg, Tag.MAPPING):
  1431                            parent_config.merge(o)
  1432                    except (YAMLScanError, YAMLParseError) as e:
  1433                        raise Exception("Parse Error: %s, input text:\n%s" % (e, cfg))
  1434                else:
  1435                    target = cfg[0]
  1436
  1437                    try:
  1438                        yaml = load(n.path, cfg[1], Tag.MAPPING)
  1439
  1440                        if n.ambassador_id:
  1441                            for obj in yaml:
  1442                                if "ambassador_id" not in obj:
  1443                                    obj["ambassador_id"] = [n.ambassador_id]
  1444
  1445                        configs[n].append((target, yaml))
  1446                    except (YAMLScanError, YAMLParseError) as e:
  1447                        raise Exception("Parse Error: %s, input text:\n%s" % (e, cfg[1]))
  1448
  1449        for tgt_cfgs in configs.values():
  1450            for target, cfg in tgt_cfgs:
  1451                for t in target.traversal:
  1452                    if t in manifests:
  1453                        k8s_yaml = manifests[t]
  1454                        for item in k8s_yaml:
  1455                            if item["kind"].lower() == "service":
  1456                                md = item["metadata"]
  1457                                if "annotations" not in md:
  1458                                    md["annotations"] = {}
  1459
  1460                                anns = md["annotations"]
  1461
  1462                                if "getambassador.io/config" in anns:
  1463                                    anns["getambassador.io/config"] += "\n" + dump(cfg)
  1464                                else:
  1465                                    anns["getambassador.io/config"] = dump(cfg)
  1466
  1467                                break
  1468                        else:
  1469                            continue
  1470                        break
  1471                else:
  1472                    assert False, "no service found for target: %s" % target.path.name
  1473
  1474        yaml = ""
  1475
  1476        for v in manifests.values():
  1477            yaml += dump(label(v, self.scope)) + "\n"
  1478
  1479        fname = "/tmp/k8s-%s.yaml" % self.scope
  1480
  1481        self.applied_manifests = False
  1482
  1483        # Always apply at this point, since we're doing the multi-run thing.
  1484        manifest_changed, manifest_reason = has_changed(yaml, fname)
  1485
  1486        # OK. Try running local stuff.
  1487        if self.do_local_checks(selected, fname):
  1488            # Everything that could run locally did. Good enough.
  1489            self.skip_nonlocal_tests = True
  1490            return True
  1491
  1492        # Something didn't work out quite right.
  1493        print(f"Continuing with Kube tests...")
  1494        # print(f"ids_to_strip {self.ids_to_strip}")
  1495
  1496        # XXX It is _so stupid_ that we're reparsing the whole manifest here.
  1497        xxx_crap = pyyaml.load_all(open(fname, "r").read(), Loader=pyyaml_loader)
  1498
  1499        # Strip things we don't need from the manifest.
  1500        trimmed_manifests = []
  1501        trimmed = 0
  1502        kept = 0
  1503
  1504        for obj in xxx_crap:
  1505            keep = True
  1506
  1507            kind = "-nokind-"
  1508            name = "-noname-"
  1509            metadata: Dict[str, Any] = {}
  1510            labels: Dict[str, str] = {}
  1511            id_to_check: Optional[str] = None
  1512
  1513            if "kind" in obj:
  1514                kind = obj["kind"]
  1515
  1516            if "metadata" in obj:
  1517                metadata = obj["metadata"]
  1518
  1519            if "name" in metadata:
  1520                name = metadata["name"]
  1521
  1522            if "labels" in metadata:
  1523                labels = metadata["labels"]
  1524
  1525            if "kat-ambassador-id" in labels:
  1526                id_to_check = labels["kat-ambassador-id"]
  1527
  1528            # print(f"metadata {metadata} id_to_check {id_to_check} obj {obj}")
  1529
  1530            # Keep namespaces, just in case.
  1531            if kind == "Namespace":
  1532                keep = True
  1533            else:
  1534                if id_to_check and (id_to_check in self.ids_to_strip):
  1535                    keep = False
  1536                    # print(f"...drop {kind} {name} (ID {id_to_check})")
  1537                    self.names_to_ignore[name] = True
  1538
  1539            if keep:
  1540                kept += 1
  1541                trimmed_manifests.append(obj)
  1542            else:
  1543                trimmed += 1
  1544
  1545        if trimmed:
  1546            print(f"After trimming: kept {kept}, trimmed {trimmed}")
  1547
  1548        yaml = pyyaml.dump_all(trimmed_manifests, Dumper=pyyaml_dumper)
  1549
  1550        fname = "/tmp/k8s-%s-trimmed.yaml" % self.scope
  1551
  1552        self.applied_manifests = False
  1553
  1554        # Always apply at this point, since we're doing the multi-run thing.
  1555        manifest_changed, manifest_reason = has_changed(yaml, fname)
  1556
  1557        # First up: CRDs.
  1558        input_crds = integration_manifests.crd_manifests()
  1559        if is_knative_compatible():
  1560            input_crds += integration_manifests.load("knative_serving_crds")
  1561
  1562        # Strip out all of the schema validation, so that we can test with broken CRDs.
  1563        # (KAT isn't really in the business of testing to be sure that Kubernetes can
  1564        # run the K8s validators...)
  1565        crds = pyyaml.load_all(input_crds, Loader=pyyaml_loader)
  1566
  1567        # Collect the CRDs with schema validation stripped in stripped_crds, because
  1568        # pyyaml.load_all actually returns something more complex than a simple list,
  1569        # so it doesn't reserialize well after being modified.
  1570        stripped_crds = []
  1571
  1572        for crd in crds:
  1573            # Guard against empty CRDs (the KNative files have some blank lines at
  1574            # the end).
  1575            if not crd:
  1576                continue
  1577
  1578            if crd["apiVersion"] == "apiextensions.k8s.io/v1":
  1579                # We can't naively strip the schema validation from apiextensions.k8s.io/v1 CRDs
  1580                # because it is required; otherwise the API server would refuse to create the CRD,
  1581                # telling us:
  1582                #
  1583                #     CustomResourceDefinition.apiextensions.k8s.io "…" is invalid: spec.versions[0].schema.openAPIV3Schema: Required value: schemas are required
  1584                #
  1585                # So instead we must replace it with a schema that allows anything.
  1586                for version in crd["spec"]["versions"]:
  1587                    if "schema" in version:
  1588                        version["schema"] = {
  1589                            "openAPIV3Schema": {
  1590                                "type": "object",
  1591                                "properties": {
  1592                                    "apiVersion": {"type": "string"},
  1593                                    "kind": {"type": "string"},
  1594                                    "metadata": {"type": "object"},
  1595                                    "spec": {
  1596                                        "type": "object",
  1597                                        "x-kubernetes-preserve-unknown-fields": True,
  1598                                    },
  1599                                },
  1600                            },
  1601                        }
  1602            elif crd["apiVersion"] == "apiextensions.k8s.io/v1beta1":
  1603                crd["spec"].pop("validation", None)
  1604                for version in crd["spec"]["versions"]:
  1605                    version.pop("schema", None)
  1606            stripped_crds.append(crd)
  1607
  1608        final_crds = pyyaml.dump_all(stripped_crds, Dumper=pyyaml_dumper)
  1609        changed, reason = has_changed(final_crds, "/tmp/k8s-CRDs.yaml")
  1610
  1611        if changed:
  1612            print(f"CRDS changed ({reason}), applying.")
  1613            if not ShellCommand.run_with_retry(
  1614                "Apply CRDs",
  1615                "tools/bin/kubectl",
  1616                "apply",
  1617                "-f",
  1618                "/tmp/k8s-CRDs.yaml",
  1619                retries=5,
  1620                sleep_seconds=10,
  1621            ):
  1622                raise RuntimeError("Failed applying CRDs")
  1623
  1624            tries_left = 10
  1625
  1626            while (
  1627                os.system("tools/bin/kubectl get crd mappings.getambassador.io > /dev/null 2>&1")
  1628                != 0
  1629            ):
  1630                tries_left -= 1
  1631
  1632                if tries_left <= 0:
  1633                    raise RuntimeError("CRDs never became available")
  1634
  1635                print("sleeping for CRDs... (%d)" % tries_left)
  1636                time.sleep(5)
  1637        else:
  1638            print(f"CRDS unchanged {reason}, skipping apply.")
  1639
  1640        # Next up: the KAT pod.
  1641        kat_client_manifests = integration_manifests.load("kat_client_pod")
  1642        if os.environ.get("DEV_USE_IMAGEPULLSECRET", False):
  1643            kat_client_manifests = (
  1644                integration_manifests.namespace_manifest("default") + kat_client_manifests
  1645            )
  1646        changed, reason = has_changed(
  1647            integration_manifests.format(kat_client_manifests), "/tmp/k8s-kat-pod.yaml"
  1648        )
  1649
  1650        if changed:
  1651            print(f"KAT pod definition changed ({reason}), applying")
  1652            if not ShellCommand.run_with_retry(
  1653                "Apply KAT pod",
  1654                "tools/bin/kubectl",
  1655                "apply",
  1656                "-f",
  1657                "/tmp/k8s-kat-pod.yaml",
  1658                "-n",
  1659                "default",
  1660                retries=5,
  1661                sleep_seconds=10,
  1662            ):
  1663                raise RuntimeError("Could not apply manifest for KAT pod")
  1664
  1665            tries_left = 3
  1666            time.sleep(1)
  1667
  1668            while True:
  1669                if ShellCommand.run(
  1670                    "wait for KAT pod",
  1671                    "tools/bin/kubectl",
  1672                    "-n",
  1673                    "default",
  1674                    "wait",
  1675                    "--timeout=30s",
  1676                    "--for=condition=Ready",
  1677                    "pod",
  1678                    "kat",
  1679                ):
  1680                    print("KAT pod ready")
  1681                    break
  1682
  1683                tries_left -= 1
  1684
  1685                if tries_left <= 0:
  1686                    raise RuntimeError("KAT pod never became available")
  1687
  1688                print("sleeping for KAT pod... (%d)" % tries_left)
  1689                time.sleep(5)
  1690        else:
  1691            print(f"KAT pod definition unchanged {reason}, skipping apply.")
  1692
  1693        # Use a dummy pod to get around the !*@&#$!*@&# DockerHub rate limit.
  1694        # XXX Better: switch to GCR.
  1695        dummy_pod = integration_manifests.load("dummy_pod")
  1696        if os.environ.get("DEV_USE_IMAGEPULLSECRET", False):
  1697            dummy_pod = integration_manifests.namespace_manifest("default") + dummy_pod
  1698        changed, reason = has_changed(
  1699            integration_manifests.format(dummy_pod), "/tmp/k8s-dummy-pod.yaml"
  1700        )
  1701
  1702        if changed:
  1703            print(f"Dummy pod definition changed ({reason}), applying")
  1704            if not ShellCommand.run_with_retry(
  1705                "Apply dummy pod",
  1706                "tools/bin/kubectl",
  1707                "apply",
  1708                "-f",
  1709                "/tmp/k8s-dummy-pod.yaml",
  1710                "-n",
  1711                "default",
  1712                retries=5,
  1713                sleep_seconds=10,
  1714            ):
  1715                raise RuntimeError("Could not apply manifest for dummy pod")
  1716
  1717            tries_left = 3
  1718            time.sleep(1)
  1719
  1720            while True:
  1721                if ShellCommand.run(
  1722                    "wait for dummy pod",
  1723                    "tools/bin/kubectl",
  1724                    "-n",
  1725                    "default",
  1726                    "wait",
  1727                    "--timeout=30s",
  1728                    "--for=condition=Ready",
  1729                    "pod",
  1730                    "dummy-pod",
  1731                ):
  1732                    print("Dummy pod ready")
  1733                    break
  1734
  1735                tries_left -= 1
  1736
  1737                if tries_left <= 0:
  1738                    raise RuntimeError("Dummy pod never became available")
  1739
  1740                print("sleeping for dummy pod... (%d)" % tries_left)
  1741                time.sleep(5)
  1742        else:
  1743            print(f"Dummy pod definition unchanged {reason}, skipping apply.")
  1744
  1745        # # Clear out old stuff.
  1746        if os.environ.get("DEV_CLEAN_K8S_RESOURCES", False):
  1747            print("Clearing cluster...")
  1748            ShellCommand.run(
  1749                "clear old Kubernetes namespaces",
  1750                "tools/bin/kubectl",
  1751                "delete",
  1752                "namespaces",
  1753                "-l",
  1754                "scope=AmbassadorTest",
  1755                verbose=True,
  1756            )
  1757            ShellCommand.run(
  1758                "clear old Kubernetes pods etc.",
  1759                "tools/bin/kubectl",
  1760                "delete",
  1761                "all",
  1762                "-l",
  1763                "scope=AmbassadorTest",
  1764                "--all-namespaces",
  1765                verbose=True,
  1766            )
  1767
  1768        # XXX: better prune selector label
  1769        if manifest_changed:
  1770            print(f"manifest changed ({manifest_reason}), applying...")
  1771            if not ShellCommand.run_with_retry(
  1772                "Applying k8s manifests",
  1773                "tools/bin/kubectl",
  1774                "apply",
  1775                "--prune",
  1776                "-l",
  1777                "scope=%s" % self.scope,
  1778                "-f",
  1779                fname,
  1780                retries=5,
  1781                sleep_seconds=10,
  1782            ):
  1783                raise RuntimeError("Could not apply manifests")
  1784            self.applied_manifests = True
  1785
  1786        for n in self.nodes:
  1787            if n in selected and not n.xfail:
  1788                action = getattr(n, "post_manifest", None)
  1789                if action:
  1790                    action()
  1791
  1792        self._wait(selected)
  1793
  1794        print("Waiting 5s after requirements, just because...")
  1795        time.sleep(5)
  1796
  1797    @staticmethod
  1798    def _req_str(kind, req) -> str:
  1799        printable = req
  1800
  1801        if kind == "url":
  1802            printable = req.url
  1803
  1804        return printable
  1805
  1806    def _wait(self, selected):
  1807        requirements = []
  1808
  1809        for node in selected:
  1810            if node.xfail:
  1811                continue
  1812
  1813            node_name = node.format("{self.path.k8s}")
  1814            ambassador_id = getattr(node, "ambassador_id", None)
  1815
  1816            # print(f"{node_name} {ambassador_id}")
  1817
  1818            if node.has_local_result():
  1819                # print(f"{node_name} has local result, skipping")
  1820                continue
  1821
  1822            if ambassador_id and ambassador_id in self.ids_to_strip:
  1823                # print(f"{node_name} has id {ambassador_id}, stripping")
  1824                continue
  1825
  1826            if node_name in self.names_to_ignore:
  1827                # print(f"{node_name} marked to ignore, stripping")
  1828                continue
  1829
  1830            # if RUN_MODE != "envoy":
  1831            #     print(f"{node_name}: including in nonlocal tests")
  1832
  1833            for kind, req in node.requirements():
  1834                # print(f"{node_name} add req ({node_name}, {kind}, {self._req_str(kind, req)})")
  1835                requirements.append((node, kind, req))
  1836
  1837        homogenous = {}
  1838
  1839        for node, kind, name in requirements:
  1840            if kind not in homogenous:
  1841                homogenous[kind] = []
  1842
  1843            homogenous[kind].append((node, name))
  1844
  1845        kinds = ["pod", "url"]
  1846        delay = 5
  1847        start = time.time()
  1848        limit = int(os.environ.get("KAT_REQ_LIMIT", "900"))
  1849
  1850        print("Starting requirements check (limit %ds)... " % limit)
  1851
  1852        holdouts = {}
  1853
  1854        while time.time() - start < limit:
  1855            for kind in kinds:
  1856                if kind not in homogenous:
  1857                    continue
  1858
  1859                reqs = homogenous[kind]
  1860
  1861                print("Checking %s %s requirements... " % (len(reqs), kind), end="")
  1862
  1863                # print("\n")
  1864                # for node, req in reqs:
  1865                #     print(f"...{node.format('{self.path.k8s}')} - {self._req_str(kind, req)}")
  1866
  1867                sys.stdout.flush()
  1868
  1869                is_ready, _holdouts = self._ready(kind, reqs)
  1870
  1871                if not is_ready:
  1872                    holdouts[kind] = _holdouts
  1873                    delay = int(min(delay * 2, 10))
  1874                    print("sleeping %ss..." % delay)
  1875                    sys.stdout.flush()
  1876                    time.sleep(delay)
  1877                else:
  1878                    print("satisfied.")
  1879                    sys.stdout.flush()
  1880                    kinds.remove(kind)
  1881
  1882                break
  1883            else:
  1884                return
  1885
  1886        print("requirements not satisfied in %s seconds:" % limit)
  1887
  1888        for kind in kinds:
  1889            _holdouts = holdouts.get(kind, [])
  1890
  1891            if _holdouts:
  1892                print(f"  {kind}:")
  1893
  1894                for node, text in _holdouts:
  1895                    print(f"    {node.path.k8s} ({text})")
  1896                    node.log_kube_artifacts()
  1897
  1898        assert False, "requirements not satisfied in %s seconds" % limit
  1899
  1900    @multi
  1901    def _ready(self, kind, _):
  1902        return kind
  1903
  1904    @_ready.when("pod")  # type: ignore
  1905    def _ready(self, _, requirements):
  1906        pods = self._pods(self.scope)
  1907        not_ready = []
  1908
  1909        for node, name in requirements:
  1910            if not pods.get(name, False):
  1911                not_ready.append((node, name))
  1912
  1913        if not_ready:
  1914            print("%d not ready (%s), " % (len(not_ready), name), end="")
  1915            return (False, not_ready)
  1916
  1917        return (True, None)
  1918
  1919    @_ready.when("url")  # type: ignore
  1920    def _ready(self, _, requirements):
  1921        queries = []
  1922
  1923        for node, q in requirements:
  1924            q.insecure = True
  1925            q.parent = node
  1926            queries.append(q)
  1927
  1928        # print("URL Reqs:")
  1929        # print("\n".join([ f'{q.parent.name}: {q.url}' for q in queries ]))
  1930
  1931        result = run_queries("reqcheck", queries)
  1932
  1933        not_ready = [r for r in result if r.status != r.query.expected]
  1934
  1935        if not_ready:
  1936            first = not_ready[0]
  1937            print(
  1938                "%d not ready (%s: %s) "
  1939                % (len(not_ready), first.query.url, first.status or first.error),
  1940                end="",
  1941            )
  1942            return (
  1943                False,
  1944                [
  1945                    (x.query.parent, "%s -- %s" % (x.query.url, x.status or x.error))
  1946                    for x in not_ready
  1947                ],
  1948            )
  1949        else:
  1950            return (True, None)
  1951
  1952    def _pods(self, scope=None):
  1953        scope_for_path = scope if scope else "global"
  1954        label_for_scope = f"-l scope={scope}" if scope else ""
  1955
  1956        fname = f"/tmp/pods-{scope_for_path}.json"
  1957        if not ShellCommand.run_with_retry(
  1958            "Getting pods",
  1959            f"tools/bin/kubectl get pod {label_for_scope} --all-namespaces -o json > {fname}",
  1960            shell=True,
  1961            retries=5,
  1962            sleep_seconds=10,
  1963        ):
  1964            raise RuntimeError("Could not get pods")
  1965
  1966        with open(fname) as f:
  1967            raw_pods = json.load(f)
  1968
  1969        pods = {}
  1970
  1971        for p in raw_pods["items"]:
  1972            name = p["metadata"]["name"]
  1973
  1974            cstats = p["status"].get("containerStatuses", [])
  1975
  1976            all_ready = True
  1977
  1978            for status in cstats:
  1979                ready = status.get("ready", False)
  1980
  1981                if not ready:
  1982                    all_ready = False
  1983                    # print(f'pod {name} is not ready: {status.get("state", "unknown state")}')
  1984
  1985            pods[name] = all_ready
  1986
  1987        return pods
  1988
  1989    def _query(self, selected) -> None:
  1990        queries = []
  1991
  1992        for t in self.tests:
  1993            t_name = t.format("{self.path.k8s}")
  1994
  1995            if t in selected:
  1996                t.pending = []
  1997                t.queried = []
  1998                t.results = []
  1999            else:
  2000                continue
  2001
  2002            if t.has_local_result():
  2003                # print(f"{t_name}: SKIP QUERY due to local result")
  2004                continue
  2005
  2006            ambassador_id = getattr(t, "ambassador_id", None)
  2007
  2008            if ambassador_id and ambassador_id in self.ids_to_strip:
  2009                # print(f"{t_name}: SKIP QUERY due to ambassador_id {ambassador_id}")
  2010                continue
  2011
  2012            # print(f"{t_name}: INCLUDE QUERY")
  2013            for q in t.queries():
  2014                q.parent = t
  2015                t.pending.append(q)
  2016                queries.append(q)
  2017
  2018        phases = sorted(set([q.phase for q in queries]))
  2019        first = True
  2020
  2021        for phase in phases:
  2022            if not first:
  2023                phase_delay = int(os.environ.get("KAT_PHASE_DELAY", 10))
  2024                print(
  2025                    "Waiting for {} seconds before starting phase {}...".format(phase_delay, phase)
  2026                )
  2027                time.sleep(phase_delay)
  2028
  2029            first = False
  2030
  2031            phase_queries = [q for q in queries if q.phase == phase]
  2032
  2033            print("Querying %s urls in phase %s..." % (len(phase_queries), phase), end="")
  2034            sys.stdout.flush()
  2035
  2036            results = run_queries(f"phase{phase}", phase_queries)
  2037
  2038            print(" done.")
  2039
  2040            for r in results:
  2041                t = r.parent
  2042                t.queried.append(r.query)
  2043
  2044                if getattr(t, "debug", False) or getattr(r.query, "debug", False):
  2045                    print(
  2046                        "%s result: %s"
  2047                        % (t.name, json.dumps(r.as_dict(), sort_keys=True, indent=4))
  2048                    )
  2049
  2050                t.results.append(r)
  2051                t.pending.remove(r.query)

View as plain text