...

Text file src/github.com/emissary-ingress/emissary/v3/python/tests/kat/abstract_tests.py

Documentation: github.com/emissary-ingress/emissary/v3/python/tests/kat

     1import base64
     2import os
     3import shutil
     4import subprocess
     5import sys
     6from random import random
     7from typing import Any, ClassVar, Generator, List, Optional, Sequence, Tuple, Union
     8from typing import cast as typecast
     9
    10import pytest
    11import yaml
    12
    13# These type: ignores are because, weirdly, the yaml.CSafe* variants don't share
    14# a type with their non-C variants. No clue why not.
    15yaml_loader = yaml.SafeLoader  # type: ignore
    16yaml_dumper = yaml.SafeDumper  # type: ignore
    17
    18try:
    19    yaml_loader = yaml.CSafeLoader  # type: ignore
    20    yaml_dumper = yaml.CSafeDumper  # type: ignore
    21except AttributeError:
    22    pass
    23
    24import tests.integration.manifests as integration_manifests
    25from kat.harness import Name, Node, Query, Test, abstract_test, sanitize
    26from kat.utils import ShellCommand
    27
    28AMBASSADOR_LOCAL = """
    29---
    30apiVersion: v1
    31kind: Secret
    32metadata:
    33  name: {self.path.k8s}
    34  annotations:
    35    kubernetes.io/service-account.name: {self.path.k8s}
    36type: kubernetes.io/service-account-token
    37"""
    38
    39
    40def assert_default_errors(errors, include_ingress_errors=True):
    41    default_errors = [
    42        [
    43            "",
    44            "Ambassador could not find core CRD definitions. Please visit https://www.getambassador.io/docs/edge-stack/latest/topics/install/upgrade-to-edge-stack/#5-update-and-restart for more information. You can continue using Ambassador via Kubernetes annotations, any configuration via CRDs will be ignored...",
    45        ],
    46        [
    47            "",
    48            "Ambassador could not find Resolver type CRD definitions. Please visit https://www.getambassador.io/docs/edge-stack/latest/topics/install/upgrade-to-edge-stack/#5-update-and-restart for more information. You can continue using Ambassador via Kubernetes annotations, any configuration via CRDs will be ignored...",
    49        ],
    50        [
    51            "",
    52            "Ambassador could not find the Host CRD definition. Please visit https://www.getambassador.io/docs/edge-stack/latest/topics/install/upgrade-to-edge-stack/#5-update-and-restart for more information. You can continue using Ambassador via Kubernetes annotations, any configuration via CRDs will be ignored...",
    53        ],
    54        [
    55            "",
    56            "Ambassador could not find the LogService CRD definition. Please visit https://www.getambassador.io/docs/edge-stack/latest/topics/install/upgrade-to-edge-stack/#5-update-and-restart for more information. You can continue using Ambassador via Kubernetes annotations, any configuration via CRDs will be ignored...",
    57        ],
    58    ]
    59
    60    if include_ingress_errors:
    61        default_errors.append(
    62            [
    63                "",
    64                "Ambassador is not permitted to read Ingress resources. Please visit https://www.getambassador.io/docs/edge-stack/latest/topics/running/ingress-controller/#ambassador-as-an-ingress-controller for more information. You can continue using Ambassador, but Ingress resources will be ignored...",
    65            ]
    66        )
    67
    68    number_of_default_errors = len(default_errors)
    69
    70    if errors[:number_of_default_errors] != default_errors:
    71        assert False, f"default error table mismatch: got\n{errors}"
    72
    73    for error in errors[number_of_default_errors:]:
    74        assert (
    75            "found invalid port" in error[1]
    76        ), "Could not find 'found invalid port' in the error {}".format(error[1])
    77
    78
    79DEV = os.environ.get("AMBASSADOR_DEV", "0").lower() in ("1", "yes", "true")
    80
    81
    82@abstract_test
    83class AmbassadorTest(Test):
    84
    85    """
    86    AmbassadorTest is a top level ambassador test.
    87    """
    88
    89    OFFSET: ClassVar[int] = 0
    90    IMAGE_BUILT: ClassVar[bool] = False
    91
    92    _index: Optional[int] = None
    93    _ambassador_id: Optional[str] = None
    94    single_namespace: bool = False
    95    disable_endpoints: bool = False
    96    name: str
    97    path: Name
    98    extra_ports: Optional[List[int]] = None
    99    debug_diagd: bool = True
   100    debug_envoy: bool = False
   101    manifest_envs = ""
   102    is_ambassador = True
   103    allow_edge_stack_redirect = False
   104    edge_stack_cleartext_host = True
   105
   106    env: List[str] = []
   107
   108    def manifests(self) -> str:
   109        rbac = integration_manifests.load("rbac_cluster_scope")
   110
   111        self.manifest_envs += """
   112    - name: POLL_EVERY_SECS
   113      value: "0"
   114    - name: CONSUL_WATCHER_PORT
   115      value: "8500"
   116"""
   117
   118        if os.environ.get("AMBASSADOR_FAST_RECONFIGURE", "true").lower() == "false":
   119            self.manifest_envs += """
   120    - name: AMBASSADOR_FAST_RECONFIGURE
   121      value: "false"
   122"""
   123
   124        amb_debug = []
   125        if self.debug_diagd:
   126            amb_debug.append("diagd")
   127        if self.debug_envoy:
   128            amb_debug.append("envoy")
   129        if amb_debug:
   130            self.manifest_envs += """
   131    - name: AMBASSADOR_DEBUG
   132      value: "%s"
   133    - name: AES_LOG_LEVEL
   134      value: "debug"
   135""" % ":".join(
   136                amb_debug
   137            )
   138
   139        if self.ambassador_id:
   140            self.manifest_envs += f"""
   141    - name: AMBASSADOR_LABEL_SELECTOR
   142      value: "kat-ambassador-id={self.ambassador_id}"
   143"""
   144
   145        if self.single_namespace:
   146            self.manifest_envs += """
   147    - name: AMBASSADOR_SINGLE_NAMESPACE
   148      value: "yes"
   149"""
   150            rbac = integration_manifests.load("rbac_namespace_scope")
   151
   152        if self.disable_endpoints:
   153            self.manifest_envs += """
   154    - name: AMBASSADOR_DISABLE_ENDPOINTS
   155      value: "yes"
   156"""
   157        if not self.allow_edge_stack_redirect:
   158            self.manifest_envs += """
   159    - name: AMBASSADOR_NO_TLS_REDIRECT
   160      value: "yes"
   161"""
   162
   163        eports = ""
   164
   165        if self.extra_ports:
   166            for port in self.extra_ports:
   167                eports += f"""
   168  - name: extra-{port}
   169    protocol: TCP
   170    port: {port}
   171    targetPort: {port}
   172"""
   173
   174        if DEV:
   175            return self.format(rbac + AMBASSADOR_LOCAL, extra_ports=eports)
   176        else:
   177            return self.format(
   178                rbac + integration_manifests.load("ambassador"),
   179                envs=self.manifest_envs,
   180                extra_ports=eports,
   181                capabilities_block="",
   182            )
   183
   184    @property
   185    def index(self) -> int:
   186        if self._index is None:
   187            # lock here?
   188            self._index = AmbassadorTest.OFFSET
   189            AmbassadorTest.OFFSET += 1
   190
   191        return typecast(int, self._index)
   192
   193    def post_manifest(self):
   194        if not DEV:
   195            return
   196
   197        if os.environ.get("KAT_SKIP_DOCKER"):
   198            return
   199
   200        image = os.environ["AMBASSADOR_DOCKER_IMAGE"]
   201        cached_image = os.environ["BASE_PY_IMAGE"]
   202        ambassador_base_image = os.environ["BASE_GO_IMAGE"]
   203
   204        if not AmbassadorTest.IMAGE_BUILT:
   205            AmbassadorTest.IMAGE_BUILT = True
   206
   207            cmd = ShellCommand(
   208                "docker", "ps", "-a", "-f", "label=kat-family=ambassador", "--format", "{{.ID}}"
   209            )
   210
   211            if cmd.check("find old docker container IDs"):
   212                ids = cmd.stdout.split("\n")
   213
   214                while ids:
   215                    if ids[-1]:
   216                        break
   217
   218                    ids.pop()
   219
   220                if ids:
   221                    print("Killing old containers...")
   222                    ShellCommand.run("kill old containers", "docker", "kill", *ids, verbose=True)
   223                    ShellCommand.run("rm old containers", "docker", "rm", *ids, verbose=True)
   224
   225            context = os.path.dirname(os.path.dirname(os.path.dirname(__file__)))
   226
   227            print("Starting docker build...", end="")
   228            sys.stdout.flush()
   229
   230            cmd = ShellCommand(
   231                "docker",
   232                "build",
   233                "--build-arg",
   234                "BASE_PY_IMAGE={}".format(cached_image),
   235                "--build-arg",
   236                "BASE_GO_IMAGE={}".format(ambassador_base_image),
   237                context,
   238                "-t",
   239                image,
   240            )
   241
   242            if cmd.check("docker build Ambassador image"):
   243                print("done.")
   244            else:
   245                pytest.exit("container failed to build")
   246
   247        fname = "/tmp/k8s-%s.yaml" % self.path.k8s
   248        if os.path.exists(fname):
   249            with open(fname) as fd:
   250                content = fd.read()
   251        else:
   252            nsp = getattr(self, "namespace", None) or "default"
   253
   254            cmd = ShellCommand(
   255                "tools/bin/kubectl", "get", "-n", nsp, "-o", "yaml", "secret", self.path.k8s
   256            )
   257
   258            if not cmd.check(f"fetch secret for {self.path.k8s}"):
   259                pytest.exit(f"could not fetch secret for {self.path.k8s}")
   260
   261            content = cmd.stdout
   262
   263            with open(fname, "wb") as fd:
   264                fd.write(content.encode("utf-8"))
   265
   266        try:
   267            secret = yaml.load(content, Loader=yaml_loader)
   268        except Exception as e:
   269            print("could not parse YAML:\n%s" % content)
   270            raise e
   271
   272        data = secret["data"]
   273        # secret_dir = tempfile.mkdtemp(prefix=self.path.k8s, suffix="secret")
   274        secret_dir = "/tmp/%s-ambassadormixin-%s" % (self.path.k8s, "secret")
   275
   276        shutil.rmtree(secret_dir, ignore_errors=True)
   277        os.mkdir(secret_dir, 0o777)
   278
   279        for k, v in data.items():
   280            with open(os.path.join(secret_dir, k), "wb") as f:
   281                f.write(base64.decodebytes(bytes(v, "utf8")))
   282        print("Launching %s container." % self.path.k8s)
   283        command = ["docker", "run", "-d", "-l", "kat-family=ambassador", "--name", self.path.k8s]
   284
   285        envs = [
   286            "KUBERNETES_SERVICE_HOST=kubernetes",
   287            "KUBERNETES_SERVICE_PORT=443",
   288            "AMBASSADOR_SNAPSHOT_COUNT=1",
   289            "AMBASSADOR_CONFIG_BASE_DIR=/tmp/ambassador",
   290            "POLL_EVERY_SECS=0",
   291            "CONSUL_WATCHER_PORT=8500",
   292            "AMBASSADOR_UPDATE_MAPPING_STATUS=false",
   293            "AMBASSADOR_ID=%s" % self.ambassador_id,
   294        ]
   295
   296        if self.namespace:
   297            envs.append("AMBASSADOR_NAMESPACE=%s" % self.namespace)
   298
   299        if self.single_namespace:
   300            envs.append("AMBASSADOR_SINGLE_NAMESPACE=yes")
   301
   302        if self.disable_endpoints:
   303            envs.append("AMBASSADOR_DISABLE_ENDPOINTS=yes")
   304
   305        amb_debug = []
   306        if self.debug_diagd:
   307            amb_debug.append("diagd")
   308        if self.debug_envoy:
   309            amb_debug.append("envoy")
   310        if amb_debug:
   311            envs.append("AMBASSADOR_DEBUG=%s" % ":".join(amb_debug))
   312
   313        envs.extend(self.env)
   314        for env in envs:
   315            command.extend(["-e", env])
   316
   317        ports = [
   318            "%s:8877" % (8877 + self.index),
   319            "%s:8001" % (8001 + self.index),
   320            "%s:8080" % (8080 + self.index),
   321            "%s:8443" % (8443 + self.index),
   322        ]
   323
   324        if self.extra_ports:
   325            for port in self.extra_ports:
   326                ports.append(f"{port}:{port}")
   327
   328        for port_str in ports:
   329            command.extend(["-p", port_str])
   330
   331        volumes = ["%s:/var/run/secrets/kubernetes.io/serviceaccount" % secret_dir]
   332        for volume in volumes:
   333            command.extend(["-v", volume])
   334
   335        command.append(image)
   336
   337        if os.environ.get("KAT_SHOW_DOCKER"):
   338            print(" ".join(command))
   339
   340        cmd = ShellCommand(*command)
   341
   342        if not cmd.check(f"start container for {self.path.k8s}"):
   343            pytest.exit(f"could not start container for {self.path.k8s}")
   344
   345    def queries(self):
   346        if DEV:
   347            cmd = ShellCommand("docker", "ps", "-qf", "name=%s" % self.path.k8s)
   348
   349            if not cmd.check(f"docker check for {self.path.k8s}"):
   350                if not cmd.stdout.strip():
   351                    log_cmd = ShellCommand(
   352                        "docker", "logs", self.path.k8s, stderr=subprocess.STDOUT
   353                    )
   354
   355                    if log_cmd.check(f"docker logs for {self.path.k8s}"):
   356                        print(cmd.stdout)
   357
   358                    pytest.exit(f"container failed to start for {self.path.k8s}")
   359
   360        return ()
   361
   362    def scheme(self) -> str:
   363        return "http"
   364
   365    def url(self, prefix, scheme=None, port=None) -> str:
   366        if scheme is None:
   367            scheme = self.scheme()
   368
   369        if DEV:
   370            if not port:
   371                port = 8443 if scheme == "https" else 8080
   372                port += self.index
   373
   374            return "%s://%s/%s" % (scheme, "localhost:%s" % port, prefix)
   375        else:
   376            host_and_port = self.path.fqdn
   377
   378            if port:
   379                host_and_port += f":{port}"
   380
   381            return "%s://%s/%s" % (scheme, host_and_port, prefix)
   382
   383    def requirements(self):
   384        yield ("url", Query(self.url("ambassador/v0/check_ready")))
   385        yield ("url", Query(self.url("ambassador/v0/check_alive")))
   386
   387
   388@abstract_test
   389class ServiceType(Node):
   390
   391    path: Name
   392    _manifests: Optional[str]
   393    use_superpod: bool = True
   394
   395    def __init__(
   396        self, service_manifests: str = None, namespace: str = None, *args, **kwargs
   397    ) -> None:
   398        super().__init__(namespace=namespace, *args, **kwargs)
   399
   400        self._manifests = service_manifests
   401
   402        if self._manifests:
   403            self.use_superpod = False
   404
   405    def config(self) -> Generator[Union[str, Tuple[Node, str]], None, None]:
   406        yield from ()
   407
   408    def manifests(self):
   409        if self.use_superpod:
   410            return None
   411
   412        return self.format(self._manifests)
   413
   414    def requirements(self):
   415        if self.use_superpod:
   416            yield from ()
   417
   418        yield ("url", Query("http://%s" % self.path.fqdn))
   419        yield ("url", Query("https://%s" % self.path.fqdn))
   420
   421
   422@abstract_test
   423class ServiceTypeGrpc(Node):
   424
   425    path: Name
   426
   427    def __init__(self, service_manifests: str = None, *args, **kwargs) -> None:
   428        super().__init__(*args, **kwargs)
   429        self._manifests = service_manifests or integration_manifests.load("backend")
   430
   431    def config(self) -> Generator[Union[str, Tuple[Node, str]], None, None]:
   432        yield from ()
   433
   434    def manifests(self):
   435        return self.format(self._manifests)
   436
   437    def requirements(self):
   438        yield ("url", Query("http://%s" % self.path.fqdn))
   439        yield ("url", Query("https://%s" % self.path.fqdn))
   440
   441
   442class HTTP(ServiceType):
   443    pass
   444
   445
   446class GRPC(ServiceType):
   447    pass
   448
   449
   450class EGRPC(ServiceType):
   451    skip_variant: ClassVar[bool] = True
   452
   453    def __init__(self, *args, **kwargs) -> None:
   454        # Do this unconditionally, because that's the point of this class.
   455        kwargs["service_manifests"] = integration_manifests.load("grpc_echo_backend")
   456        super().__init__(*args, **kwargs)
   457
   458    def requirements(self):
   459        yield (
   460            "url",
   461            Query(
   462                "http://%s/echo.EchoService/Echo" % self.path.fqdn,
   463                headers={"content-type": "application/grpc", "kat-req-echo-requested-status": "0"},
   464                expected=200,
   465                grpc_type="real",
   466            ),
   467        )
   468
   469
   470class HealthCheckServer(ServiceType):
   471    skip_variant: ClassVar[bool] = True
   472
   473    def __init__(self, *args, **kwargs) -> None:
   474        # We want to reset the health check server between runs since the test involves making one
   475        # of the pods unhealthy. 'nonce' is a
   476        # horrible hack to get the Pod to roll over each invocation.
   477        self.nonce = random()
   478        self.use_superpod = False
   479        # Do this unconditionally, because that's the point of this class.
   480        kwargs["service_manifests"] = integration_manifests.load("health_check_server")
   481        super().__init__(*args, **kwargs)
   482
   483    def requirements(self):
   484        yield ("deployment", self.path.k8s)
   485        yield ("url", Query("http://%s" % self.path.fqdn))
   486        yield ("url", Query("https://%s" % self.path.fqdn))
   487
   488
   489class AHTTP(ServiceType):
   490    skip_variant: ClassVar[bool] = True
   491
   492    def __init__(self, *args, **kwargs) -> None:
   493        # Do this unconditionally, because that's the point of this class.
   494        kwargs["service_manifests"] = integration_manifests.load("auth_backend")
   495        super().__init__(*args, **kwargs)
   496
   497
   498class AGRPC(ServiceType):
   499    skip_variant: ClassVar[bool] = True
   500
   501    def __init__(self, protocol_version: str = "v3", *args, **kwargs) -> None:
   502        self.protocol_version = protocol_version
   503
   504        # Do this unconditionally, because that's the point of this class.
   505        kwargs["service_manifests"] = integration_manifests.load("grpc_auth_backend")
   506        super().__init__(*args, **kwargs)
   507
   508    def requirements(self):
   509        yield ("pod", self.path.k8s)
   510
   511
   512class RLSGRPC(ServiceType):
   513    skip_variant: ClassVar[bool] = True
   514
   515    def __init__(self, protocol_version: str = "v3", *args, **kwargs) -> None:
   516        self.protocol_version = protocol_version
   517
   518        # Do this unconditionally, because that's the point of this class.
   519        kwargs["service_manifests"] = integration_manifests.load("grpc_rls_backend")
   520        super().__init__(*args, **kwargs)
   521
   522    def requirements(self):
   523        yield ("pod", self.path.k8s)
   524
   525
   526class ALSGRPC(ServiceType):
   527    skip_variant: ClassVar[bool] = True
   528
   529    def __init__(self, *args, **kwargs) -> None:
   530        # Do this unconditionally, because that's the point of this class.
   531        kwargs["service_manifests"] = integration_manifests.load("grpc_als_backend")
   532        super().__init__(*args, **kwargs)
   533
   534    def requirements(self):
   535        yield ("pod", self.path.k8s)
   536
   537
   538class HTTPBin(ServiceType):
   539    skip_variant: ClassVar[bool] = True
   540
   541    def __init__(self, *args, **kwargs) -> None:
   542        # Do this unconditionally, because that's the point of this class.
   543        kwargs["service_manifests"] = integration_manifests.load("httpbin_backend")
   544        super().__init__(*args, **kwargs)
   545
   546    def requirements(self):
   547        yield ("url", Query("http://%s/status/200" % self.path.fqdn))
   548
   549
   550class WebsocketEcho(ServiceType):
   551    skip_variant: ClassVar[bool] = True
   552
   553    def __init__(self, *args, **kwargs) -> None:
   554        # Do this unconditionally, because that's the point of this class.
   555        kwargs["service_manifests"] = integration_manifests.load("websocket_echo_backend")
   556        super().__init__(*args, **kwargs)
   557
   558    def requirements(self):
   559        yield ("url", Query("http://%s/" % self.path.fqdn, expected=404))
   560
   561
   562class StatsDSink(ServiceType):
   563    skip_variant: ClassVar[bool] = True
   564    target_cluster: str
   565
   566    def __init__(self, target_cluster: str, *args, **kwargs) -> None:
   567        self.target_cluster = target_cluster
   568        # Do this unconditionally, because that's the point of this class.
   569        kwargs["service_manifests"] = integration_manifests.load("statsd_backend")
   570        super().__init__(*args, **kwargs)
   571
   572    def requirements(self):
   573        yield ("url", Query("http://%s/SUMMARY" % self.path.fqdn))
   574
   575
   576@abstract_test
   577class MappingTest(Test):
   578
   579    target: ServiceType
   580    options: Sequence["OptionTest"]
   581    parent: AmbassadorTest
   582
   583    no_local_mode = True
   584    skip_local_instead_of_xfail = "Plain (MappingTest)"
   585
   586    def init(self, target: ServiceType, options=()) -> None:
   587        self.target = target
   588        self.options = list(options)
   589        self.is_ambassador = True
   590
   591
   592@abstract_test
   593class OptionTest(Test):
   594
   595    VALUES: ClassVar[Any] = None
   596    value: Any
   597    parent: Test
   598
   599    no_local_mode = True
   600    skip_local_instead_of_xfail = "Plain (OptionTests)"
   601
   602    @classmethod
   603    def variants(cls) -> Generator[Node, None, None]:
   604        if cls.VALUES is None:
   605            yield cls()
   606        else:
   607            for val in cls.VALUES:
   608                yield cls(val, name=sanitize(val))
   609
   610    def init(self, value=None):
   611        self.value = value

View as plain text