...

Text file src/github.com/datawire/ambassador/v2/python/tests/kat/abstract_tests.py

Documentation: github.com/datawire/ambassador/v2/python/tests/kat

     1import base64
     2import os
     3import shutil
     4import subprocess
     5import sys
     6from typing import Any, ClassVar, Generator, List, Optional, Sequence, Tuple, Union
     7from typing import cast as typecast
     8
     9import pytest
    10import yaml
    11
    12# These type: ignores are because, weirdly, the yaml.CSafe* variants don't share
    13# a type with their non-C variants. No clue why not.
    14yaml_loader = yaml.SafeLoader  # type: ignore
    15yaml_dumper = yaml.SafeDumper  # type: ignore
    16
    17try:
    18    yaml_loader = yaml.CSafeLoader  # type: ignore
    19    yaml_dumper = yaml.CSafeDumper  # type: ignore
    20except AttributeError:
    21    pass
    22
    23import tests.integration.manifests as integration_manifests
    24from kat.harness import Name, Node, Query, Test, abstract_test, sanitize
    25from kat.utils import ShellCommand
    26
# Manifest template used instead of the full Ambassador deployment when DEV is
# set: a service-account-token Secret. The `{self.path.k8s}` placeholders are
# filled in by `self.format(...)` in AmbassadorTest.manifests().
AMBASSADOR_LOCAL = """
---
apiVersion: v1
kind: Secret
metadata:
  name: {self.path.k8s}
  annotations:
    kubernetes.io/service-account.name: {self.path.k8s}
type: kubernetes.io/service-account-token
"""
    37
    38
    39def assert_default_errors(errors, include_ingress_errors=True):
    40    default_errors = [
    41        [
    42            "",
    43            "Ambassador could not find core CRD definitions. Please visit https://www.getambassador.io/docs/edge-stack/latest/topics/install/upgrade-to-edge-stack/#5-update-and-restart for more information. You can continue using Ambassador via Kubernetes annotations, any configuration via CRDs will be ignored...",
    44        ],
    45        [
    46            "",
    47            "Ambassador could not find Resolver type CRD definitions. Please visit https://www.getambassador.io/docs/edge-stack/latest/topics/install/upgrade-to-edge-stack/#5-update-and-restart for more information. You can continue using Ambassador via Kubernetes annotations, any configuration via CRDs will be ignored...",
    48        ],
    49        [
    50            "",
    51            "Ambassador could not find the Host CRD definition. Please visit https://www.getambassador.io/docs/edge-stack/latest/topics/install/upgrade-to-edge-stack/#5-update-and-restart for more information. You can continue using Ambassador via Kubernetes annotations, any configuration via CRDs will be ignored...",
    52        ],
    53        [
    54            "",
    55            "Ambassador could not find the LogService CRD definition. Please visit https://www.getambassador.io/docs/edge-stack/latest/topics/install/upgrade-to-edge-stack/#5-update-and-restart for more information. You can continue using Ambassador via Kubernetes annotations, any configuration via CRDs will be ignored...",
    56        ],
    57    ]
    58
    59    if include_ingress_errors:
    60        default_errors.append(
    61            [
    62                "",
    63                "Ambassador is not permitted to read Ingress resources. Please visit https://www.getambassador.io/docs/edge-stack/latest/topics/running/ingress-controller/#ambassador-as-an-ingress-controller for more information. You can continue using Ambassador, but Ingress resources will be ignored...",
    64            ]
    65        )
    66
    67    number_of_default_errors = len(default_errors)
    68
    69    if errors[:number_of_default_errors] != default_errors:
    70        assert False, f"default error table mismatch: got\n{errors}"
    71
    72    for error in errors[number_of_default_errors:]:
    73        assert (
    74            "found invalid port" in error[1]
    75        ), "Could not find 'found invalid port' in the error {}".format(error[1])
    76
    77
    78DEV = os.environ.get("AMBASSADOR_DEV", "0").lower() in ("1", "yes", "true")
    79
    80
    81@abstract_test
    82class AmbassadorTest(Test):
    83
    84    """
    85    AmbassadorTest is a top level ambassador test.
    86    """
    87
    88    OFFSET: ClassVar[int] = 0
    89    IMAGE_BUILT: ClassVar[bool] = False
    90
    91    _index: Optional[int] = None
    92    _ambassador_id: Optional[str] = None
    93    single_namespace: bool = False
    94    disable_endpoints: bool = False
    95    name: str
    96    path: Name
    97    extra_ports: Optional[List[int]] = None
    98    debug_diagd: bool = True
    99    debug_envoy: bool = False
   100    manifest_envs = ""
   101    is_ambassador = True
   102    allow_edge_stack_redirect = False
   103    edge_stack_cleartext_host = True
   104    envoy_api_version: Optional[str] = None
   105
   106    env: List[str] = []
   107
   108    def manifests(self) -> str:
   109        rbac = integration_manifests.load("rbac_cluster_scope")
   110
   111        self.manifest_envs += """
   112    - name: POLL_EVERY_SECS
   113      value: "0"
   114    - name: CONSUL_WATCHER_PORT
   115      value: "8500"
   116"""
   117
   118        if os.environ.get("AMBASSADOR_FAST_RECONFIGURE", "true").lower() == "false":
   119            self.manifest_envs += """
   120    - name: AMBASSADOR_FAST_RECONFIGURE
   121      value: "false"
   122"""
   123
   124        amb_debug = []
   125        if self.debug_diagd:
   126            amb_debug.append("diagd")
   127        if self.debug_envoy:
   128            amb_debug.append("envoy")
   129        if amb_debug:
   130            self.manifest_envs += """
   131    - name: AMBASSADOR_DEBUG
   132      value: "%s"
   133    - name: AES_LOG_LEVEL
   134      value: "debug"
   135""" % ":".join(
   136                amb_debug
   137            )
   138
   139        if self.ambassador_id:
   140            self.manifest_envs += f"""
   141    - name: AMBASSADOR_LABEL_SELECTOR
   142      value: "kat-ambassador-id={self.ambassador_id}"
   143"""
   144
   145        if self.single_namespace:
   146            self.manifest_envs += """
   147    - name: AMBASSADOR_SINGLE_NAMESPACE
   148      value: "yes"
   149"""
   150            rbac = integration_manifests.load("rbac_namespace_scope")
   151
   152        if self.disable_endpoints:
   153            self.manifest_envs += """
   154    - name: AMBASSADOR_DISABLE_ENDPOINTS
   155      value: "yes"
   156"""
   157        if not self.allow_edge_stack_redirect:
   158            self.manifest_envs += """
   159    - name: AMBASSADOR_NO_TLS_REDIRECT
   160      value: "yes"
   161"""
   162
   163        if self.envoy_api_version is not None:
   164            self.manifest_envs += f"""
   165    - name: AMBASSADOR_ENVOY_API_VERSION
   166      value: "{self.envoy_api_version}"
   167"""
   168        elif os.environ.get("AMBASSADOR_ENVOY_API_VERSION", "") != "":
   169            self.manifest_envs += (
   170                """
   171    - name: AMBASSADOR_ENVOY_API_VERSION
   172      value: "%s"
   173"""
   174                % os.environ["AMBASSADOR_ENVOY_API_VERSION"]
   175            )
   176
   177        eports = ""
   178
   179        if self.extra_ports:
   180            for port in self.extra_ports:
   181                eports += f"""
   182  - name: extra-{port}
   183    protocol: TCP
   184    port: {port}
   185    targetPort: {port}
   186"""
   187
   188        if DEV:
   189            return self.format(rbac + AMBASSADOR_LOCAL, extra_ports=eports)
   190        else:
   191            return self.format(
   192                rbac + integration_manifests.load("ambassador"),
   193                envs=self.manifest_envs,
   194                extra_ports=eports,
   195                capabilities_block="",
   196            )
   197
   198    @property
   199    def index(self) -> int:
   200        if self._index is None:
   201            # lock here?
   202            self._index = AmbassadorTest.OFFSET
   203            AmbassadorTest.OFFSET += 1
   204
   205        return typecast(int, self._index)
   206
   207    def post_manifest(self):
   208        if not DEV:
   209            return
   210
   211        if os.environ.get("KAT_SKIP_DOCKER"):
   212            return
   213
   214        image = os.environ["AMBASSADOR_DOCKER_IMAGE"]
   215        cached_image = os.environ["BASE_PY_IMAGE"]
   216        ambassador_base_image = os.environ["BASE_GO_IMAGE"]
   217
   218        if not AmbassadorTest.IMAGE_BUILT:
   219            AmbassadorTest.IMAGE_BUILT = True
   220
   221            cmd = ShellCommand(
   222                "docker", "ps", "-a", "-f", "label=kat-family=ambassador", "--format", "{{.ID}}"
   223            )
   224
   225            if cmd.check("find old docker container IDs"):
   226                ids = cmd.stdout.split("\n")
   227
   228                while ids:
   229                    if ids[-1]:
   230                        break
   231
   232                    ids.pop()
   233
   234                if ids:
   235                    print("Killing old containers...")
   236                    ShellCommand.run("kill old containers", "docker", "kill", *ids, verbose=True)
   237                    ShellCommand.run("rm old containers", "docker", "rm", *ids, verbose=True)
   238
   239            context = os.path.dirname(os.path.dirname(os.path.dirname(__file__)))
   240
   241            print("Starting docker build...", end="")
   242            sys.stdout.flush()
   243
   244            cmd = ShellCommand(
   245                "docker",
   246                "build",
   247                "--build-arg",
   248                "BASE_PY_IMAGE={}".format(cached_image),
   249                "--build-arg",
   250                "BASE_GO_IMAGE={}".format(ambassador_base_image),
   251                context,
   252                "-t",
   253                image,
   254            )
   255
   256            if cmd.check("docker build Ambassador image"):
   257                print("done.")
   258            else:
   259                pytest.exit("container failed to build")
   260
   261        fname = "/tmp/k8s-%s.yaml" % self.path.k8s
   262        if os.path.exists(fname):
   263            with open(fname) as fd:
   264                content = fd.read()
   265        else:
   266            nsp = getattr(self, "namespace", None) or "default"
   267
   268            cmd = ShellCommand(
   269                "tools/bin/kubectl", "get", "-n", nsp, "-o", "yaml", "secret", self.path.k8s
   270            )
   271
   272            if not cmd.check(f"fetch secret for {self.path.k8s}"):
   273                pytest.exit(f"could not fetch secret for {self.path.k8s}")
   274
   275            content = cmd.stdout
   276
   277            with open(fname, "wb") as fd:
   278                fd.write(content.encode("utf-8"))
   279
   280        try:
   281            secret = yaml.load(content, Loader=yaml_loader)
   282        except Exception as e:
   283            print("could not parse YAML:\n%s" % content)
   284            raise e
   285
   286        data = secret["data"]
   287        # secret_dir = tempfile.mkdtemp(prefix=self.path.k8s, suffix="secret")
   288        secret_dir = "/tmp/%s-ambassadormixin-%s" % (self.path.k8s, "secret")
   289
   290        shutil.rmtree(secret_dir, ignore_errors=True)
   291        os.mkdir(secret_dir, 0o777)
   292
   293        for k, v in data.items():
   294            with open(os.path.join(secret_dir, k), "wb") as f:
   295                f.write(base64.decodebytes(bytes(v, "utf8")))
   296        print("Launching %s container." % self.path.k8s)
   297        command = ["docker", "run", "-d", "-l", "kat-family=ambassador", "--name", self.path.k8s]
   298
   299        envs = [
   300            "KUBERNETES_SERVICE_HOST=kubernetes",
   301            "KUBERNETES_SERVICE_PORT=443",
   302            "AMBASSADOR_SNAPSHOT_COUNT=1",
   303            "AMBASSADOR_CONFIG_BASE_DIR=/tmp/ambassador",
   304            "POLL_EVERY_SECS=0",
   305            "CONSUL_WATCHER_PORT=8500",
   306            "AMBASSADOR_UPDATE_MAPPING_STATUS=false",
   307            "AMBASSADOR_ID=%s" % self.ambassador_id,
   308        ]
   309
   310        if self.namespace:
   311            envs.append("AMBASSADOR_NAMESPACE=%s" % self.namespace)
   312
   313        if self.single_namespace:
   314            envs.append("AMBASSADOR_SINGLE_NAMESPACE=yes")
   315
   316        if self.disable_endpoints:
   317            envs.append("AMBASSADOR_DISABLE_ENDPOINTS=yes")
   318
   319        amb_debug = []
   320        if self.debug_diagd:
   321            amb_debug.append("diagd")
   322        if self.debug_envoy:
   323            amb_debug.append("envoy")
   324        if amb_debug:
   325            envs.append("AMBASSADOR_DEBUG=%s" % ":".join(amb_debug))
   326
   327        envs.extend(self.env)
   328        [command.extend(["-e", env]) for env in envs]
   329
   330        ports = [
   331            "%s:8877" % (8877 + self.index),
   332            "%s:8001" % (8001 + self.index),
   333            "%s:8080" % (8080 + self.index),
   334            "%s:8443" % (8443 + self.index),
   335        ]
   336
   337        if self.extra_ports:
   338            for port in self.extra_ports:
   339                ports.append(f"{port}:{port}")
   340
   341        [command.extend(["-p", port]) for port in ports]
   342
   343        volumes = ["%s:/var/run/secrets/kubernetes.io/serviceaccount" % secret_dir]
   344        [command.extend(["-v", volume]) for volume in volumes]
   345
   346        command.append(image)
   347
   348        if os.environ.get("KAT_SHOW_DOCKER"):
   349            print(" ".join(command))
   350
   351        cmd = ShellCommand(*command)
   352
   353        if not cmd.check(f"start container for {self.path.k8s}"):
   354            pytest.exit(f"could not start container for {self.path.k8s}")
   355
   356    def queries(self):
   357        if DEV:
   358            cmd = ShellCommand("docker", "ps", "-qf", "name=%s" % self.path.k8s)
   359
   360            if not cmd.check(f"docker check for {self.path.k8s}"):
   361                if not cmd.stdout.strip():
   362                    log_cmd = ShellCommand(
   363                        "docker", "logs", self.path.k8s, stderr=subprocess.STDOUT
   364                    )
   365
   366                    if log_cmd.check(f"docker logs for {self.path.k8s}"):
   367                        print(cmd.stdout)
   368
   369                    pytest.exit(f"container failed to start for {self.path.k8s}")
   370
   371        return ()
   372
   373    def scheme(self) -> str:
   374        return "http"
   375
   376    def url(self, prefix, scheme=None, port=None) -> str:
   377        if scheme is None:
   378            scheme = self.scheme()
   379
   380        if DEV:
   381            if not port:
   382                port = 8443 if scheme == "https" else 8080
   383                port += self.index
   384
   385            return "%s://%s/%s" % (scheme, "localhost:%s" % port, prefix)
   386        else:
   387            host_and_port = self.path.fqdn
   388
   389            if port:
   390                host_and_port += f":{port}"
   391
   392            return "%s://%s/%s" % (scheme, host_and_port, prefix)
   393
   394    def requirements(self):
   395        yield ("url", Query(self.url("ambassador/v0/check_ready")))
   396        yield ("url", Query(self.url("ambassador/v0/check_alive")))
   397
   398
   399@abstract_test
   400class ServiceType(Node):
   401
   402    path: Name
   403    _manifests: Optional[str]
   404    use_superpod: bool = True
   405
   406    def __init__(
   407        self, service_manifests: str = None, namespace: str = None, *args, **kwargs
   408    ) -> None:
   409        super().__init__(namespace=namespace, *args, **kwargs)
   410
   411        self._manifests = service_manifests
   412
   413        if self._manifests:
   414            self.use_superpod = False
   415
   416    def config(self) -> Generator[Union[str, Tuple[Node, str]], None, None]:
   417        yield from ()
   418
   419    def manifests(self):
   420        if self.use_superpod:
   421            return None
   422
   423        return self.format(self._manifests)
   424
   425    def requirements(self):
   426        if self.use_superpod:
   427            yield from ()
   428
   429        yield ("url", Query("http://%s" % self.path.fqdn))
   430        yield ("url", Query("https://%s" % self.path.fqdn))
   431
   432
   433@abstract_test
   434class ServiceTypeGrpc(Node):
   435
   436    path: Name
   437
   438    def __init__(self, service_manifests: str = None, *args, **kwargs) -> None:
   439        super().__init__(*args, **kwargs)
   440        self._manifests = service_manifests or integration_manifests.load("backend")
   441
   442    def config(self) -> Generator[Union[str, Tuple[Node, str]], None, None]:
   443        yield from ()
   444
   445    def manifests(self):
   446        return self.format(self._manifests)
   447
   448    def requirements(self):
   449        yield ("url", Query("http://%s" % self.path.fqdn))
   450        yield ("url", Query("https://%s" % self.path.fqdn))
   451
   452
   453class HTTP(ServiceType):
   454    pass
   455
   456
   457class GRPC(ServiceType):
   458    pass
   459
   460
   461class EGRPC(ServiceType):
   462    skip_variant: ClassVar[bool] = True
   463
   464    def __init__(self, *args, **kwargs) -> None:
   465        # Do this unconditionally, because that's the point of this class.
   466        kwargs["service_manifests"] = integration_manifests.load("grpc_echo_backend")
   467        super().__init__(*args, **kwargs)
   468
   469    def requirements(self):
   470        yield (
   471            "url",
   472            Query(
   473                "http://%s/echo.EchoService/Echo" % self.path.fqdn,
   474                headers={"content-type": "application/grpc", "requested-status": "0"},
   475                expected=200,
   476                grpc_type="real",
   477            ),
   478        )
   479
   480
   481class AHTTP(ServiceType):
   482    skip_variant: ClassVar[bool] = True
   483
   484    def __init__(self, *args, **kwargs) -> None:
   485        # Do this unconditionally, because that's the point of this class.
   486        kwargs["service_manifests"] = integration_manifests.load("auth_backend")
   487        super().__init__(*args, **kwargs)
   488
   489
   490class AGRPC(ServiceType):
   491    skip_variant: ClassVar[bool] = True
   492
   493    def __init__(self, protocol_version: str = "v2", *args, **kwargs) -> None:
   494        self.protocol_version = protocol_version
   495
   496        # Do this unconditionally, because that's the point of this class.
   497        kwargs["service_manifests"] = integration_manifests.load("grpc_auth_backend")
   498        super().__init__(*args, **kwargs)
   499
   500    def requirements(self):
   501        yield ("pod", self.path.k8s)
   502
   503
   504class RLSGRPC(ServiceType):
   505    skip_variant: ClassVar[bool] = True
   506
   507    def __init__(self, protocol_version: str = "v2", *args, **kwargs) -> None:
   508        self.protocol_version = protocol_version
   509
   510        # Do this unconditionally, because that's the point of this class.
   511        kwargs["service_manifests"] = integration_manifests.load("grpc_rls_backend")
   512        super().__init__(*args, **kwargs)
   513
   514    def requirements(self):
   515        yield ("pod", self.path.k8s)
   516
   517
   518class ALSGRPC(ServiceType):
   519    skip_variant: ClassVar[bool] = True
   520
   521    def __init__(self, *args, **kwargs) -> None:
   522        # Do this unconditionally, because that's the point of this class.
   523        kwargs["service_manifests"] = integration_manifests.load("grpc_als_backend")
   524        super().__init__(*args, **kwargs)
   525
   526    def requirements(self):
   527        yield ("pod", self.path.k8s)
   528
   529
   530class HTTPBin(ServiceType):
   531    skip_variant: ClassVar[bool] = True
   532
   533    def __init__(self, *args, **kwargs) -> None:
   534        # Do this unconditionally, because that's the point of this class.
   535        kwargs["service_manifests"] = integration_manifests.load("httpbin_backend")
   536        super().__init__(*args, **kwargs)
   537
   538    def requirements(self):
   539        yield ("url", Query("http://%s/status/200" % self.path.fqdn))
   540
   541
   542class WebsocketEcho(ServiceType):
   543    skip_variant: ClassVar[bool] = True
   544
   545    def __init__(self, *args, **kwargs) -> None:
   546        # Do this unconditionally, because that's the point of this class.
   547        kwargs["service_manifests"] = integration_manifests.load("websocket_echo_backend")
   548        super().__init__(*args, **kwargs)
   549
   550    def requirements(self):
   551        yield ("url", Query("http://%s/" % self.path.fqdn, expected=404))
   552
   553
   554class StatsDSink(ServiceType):
   555    skip_variant: ClassVar[bool] = True
   556    target_cluster: str
   557
   558    def __init__(self, target_cluster: str, *args, **kwargs) -> None:
   559        self.target_cluster = target_cluster
   560        # Do this unconditionally, because that's the point of this class.
   561        kwargs["service_manifests"] = integration_manifests.load("statsd_backend")
   562        super().__init__(*args, **kwargs)
   563
   564    def requirements(self):
   565        yield ("url", Query("http://%s/SUMMARY" % self.path.fqdn))
   566
   567
   568@abstract_test
   569class MappingTest(Test):
   570
   571    target: ServiceType
   572    options: Sequence["OptionTest"]
   573    parent: AmbassadorTest
   574
   575    no_local_mode = True
   576    skip_local_instead_of_xfail = "Plain (MappingTest)"
   577
   578    def init(self, target: ServiceType, options=()) -> None:
   579        self.target = target
   580        self.options = list(options)
   581        self.is_ambassador = True
   582
   583
   584@abstract_test
   585class OptionTest(Test):
   586
   587    VALUES: ClassVar[Any] = None
   588    value: Any
   589    parent: Test
   590
   591    no_local_mode = True
   592    skip_local_instead_of_xfail = "Plain (OptionTests)"
   593
   594    @classmethod
   595    def variants(cls) -> Generator[Node, None, None]:
   596        if cls.VALUES is None:
   597            yield cls()
   598        else:
   599            for val in cls.VALUES:
   600                yield cls(val, name=sanitize(val))
   601
   602    def init(self, value=None):
   603        self.value = value

View as plain text