...

Text file src/github.com/emissary-ingress/emissary/v3/python/tests/kat/t_mappingtests_plain.py

Documentation: github.com/emissary-ingress/emissary/v3/python/tests/kat

     1from typing import Dict, Generator, Tuple, Union
     2
     3from abstract_tests import (
     4    HTTP,
     5    AmbassadorTest,
     6    HTTPBin,
     7    MappingTest,
     8    Node,
     9    OptionTest,
    10    ServiceType,
    11    WebsocketEcho,
    12)
    13from ambassador.constants import Constants
    14from kat.harness import EDGE_STACK, Query, variants
    15
    16# This is the place to add new MappingTests.
    17
    18
    19def unique(options):
    20    added = set()
    21    result = []
    22    for o in options:
    23        if o.__class__ not in added:
    24            added.add(o.__class__)
    25            result.append(o)
    26    return tuple(result)
    27
    28
    29class SimpleMapping(MappingTest):
    30
    31    parent: AmbassadorTest
    32    target: ServiceType
    33
    34    @classmethod
    35    def variants(cls) -> Generator[Node, None, None]:
    36        for st in variants(ServiceType):
    37            yield cls(st, name="{self.target.name}")
    38
    39            for mot in variants(OptionTest):
    40                yield cls(st, (mot,), name="{self.target.name}-{self.options[0].name}")
    41
    42            yield cls(
    43                st,
    44                unique(v for v in variants(OptionTest) if not getattr(v, "isolated", False)),
    45                name="{self.target.name}-all",
    46            )
    47
    48    def config(self) -> Generator[Union[str, Tuple[Node, str]], None, None]:
    49        yield self, self.format(
    50            """
    51---
    52apiVersion: getambassador.io/v3alpha1
    53kind: Mapping
    54name:  {self.name}
    55hostname: "*"
    56prefix: /{self.name}/
    57service: http://{self.target.path.fqdn}
    58"""
    59        )
    60
    61    def queries(self):
    62        yield Query(self.parent.url(self.name + "/"))
    63        yield Query(self.parent.url(f"need-normalization/../{self.name}/"))
    64
    65    def check(self):
    66        for r in self.results:
    67            if r.backend:
    68                assert r.backend.name == self.target.path.k8s, (
    69                    r.backend.name,
    70                    self.target.path.k8s,
    71                )
    72                assert r.backend.request
    73                assert r.backend.request.headers["x-envoy-original-path"][0] == f"/{self.name}/"
    74
    75
    76class SimpleMappingIngress(MappingTest):
    77
    78    parent: AmbassadorTest
    79    target: ServiceType
    80
    81    @classmethod
    82    def variants(cls) -> Generator[Node, None, None]:
    83        for st in variants(ServiceType):
    84            yield cls(st, name="{self.target.name}")
    85
    86    def manifests(self) -> str:
    87        return f"""
    88apiVersion: networking.k8s.io/v1
    89kind: Ingress
    90metadata:
    91  annotations:
    92    kubernetes.io/ingress.class: ambassador
    93    getambassador.io/ambassador-id: plain
    94  name: {self.name.lower()}
    95spec:
    96  rules:
    97  - http:
    98      paths:
    99      - backend:
   100          service:
   101            name: {self.target.path.k8s}
   102            port:
   103              number: 80
   104        path: /{self.name}/
   105        pathType: Prefix
   106"""
   107
   108    def queries(self):
   109        yield Query(self.parent.url(self.name + "/"))  # , xfail="IHA hostglob")
   110        yield Query(
   111            self.parent.url(f"need-normalization/../{self.name}/")
   112        )  # , xfail="IHA hostglob")
   113
   114    def check(self):
   115        for r in self.results:
   116            if r.backend:
   117                assert r.backend.name == self.target.path.k8s, (
   118                    r.backend.name,
   119                    self.target.path.k8s,
   120                )
   121                assert r.backend.request
   122                assert r.backend.request.headers["x-envoy-original-path"][0] == f"/{self.name}/"
   123
   124
   125# Disabled SimpleMappingIngressDefaultBackend since adding a default fallback mapping would break other
   126# assertions, expecting to 404 if mappings don't match in Plain.
   127# class SimpleMappingIngressDefaultBackend(MappingTest):
   128#
   129#     parent: AmbassadorTest
   130#     target: ServiceType
   131#
   132#     @classmethod
   133#     def variants(cls) -> Generator[Node, None, None]:
   134#         for st in variants(ServiceType):
   135#             yield cls(st, name="{self.target.name}")
   136#
   137#     def manifests(self) -> str:
   138#         return f"""
   139# apiVersion: networking.k8s.io/v1
   140# kind: Ingress
   141# metadata:
   142#   annotations:
   143#     kubernetes.io/ingress.class: ambassador
   144#     getambassador.io/ambassador-id: plain
   145#   name: {self.name.lower()}
   146# spec:
   147#   backend:
   148#     service:
   149#       name: {self.target.path.k8s}
   150#       port:
   151#         number: 80
   152# """
   153#
   154#     def queries(self):
   155#         yield Query(self.parent.url(self.name))
   156#
   157#     def check(self):
   158#         for r in self.results:
   159#             if r.backend:
   160#                 assert r.backend.name == self.target.path.k8s, (r.backend.name, self.target.path.k8s)
   161#                 assert r.backend.request.headers['x-envoy-original-path'][0] == f'/{self.name}'
   162
   163
   164class SimpleIngressWithAnnotations(MappingTest):
   165
   166    parent: AmbassadorTest
   167    target: ServiceType
   168
   169    @classmethod
   170    def variants(cls) -> Generator[Node, None, None]:
   171        for st in variants(ServiceType):
   172            yield cls(st, name="{self.target.name}")
   173
   174    def manifests(self) -> str:
   175        return f"""
   176apiVersion: networking.k8s.io/v1
   177kind: Ingress
   178metadata:
   179  annotations:
   180    kubernetes.io/ingress.class: ambassador
   181    getambassador.io/ambassador-id: plain
   182    getambassador.io/config: |
   183      ---
   184      apiVersion: getambassador.io/v3alpha1
   185      kind: Mapping
   186      name:  {self.name}-nested
   187      hostname: "*"
   188      prefix: /{self.name}-nested/
   189      service: http://{self.target.path.fqdn}
   190      ambassador_id: [plain]
   191  name: {self.name.lower()}
   192spec:
   193  rules:
   194  - http:
   195      paths:
   196      - backend:
   197          service:
   198            name: {self.target.path.k8s}
   199            port:
   200              number: 80
   201        path: /{self.name}/
   202        pathType: Prefix
   203"""
   204
   205    def queries(self):
   206        yield Query(self.parent.url(self.name + "/"))  # , xfail="IHA hostglob")
   207        yield Query(
   208            self.parent.url(f"need-normalization/../{self.name}/")
   209        )  # , xfail="IHA hostglob")
   210        yield Query(self.parent.url(self.name + "-nested/"))  # , xfail="IHA hostglob")
   211        yield Query(
   212            self.parent.url(self.name + "-non-existent/"), expected=404
   213        )  # , xfail="IHA hostglob")
   214
   215    def check(self):
   216        for r in self.results:
   217            if r.backend:
   218                assert r.backend.name == self.target.path.k8s, (
   219                    r.backend.name,
   220                    self.target.path.k8s,
   221                )
   222                assert r.backend.request
   223                assert r.backend.request.headers["x-envoy-original-path"][0] in (
   224                    f"/{self.name}/",
   225                    f"/{self.name}-nested/",
   226                )
   227
   228
   229class HostHeaderMappingIngress(MappingTest):
   230
   231    parent: AmbassadorTest
   232
   233    @classmethod
   234    def variants(cls) -> Generator[Node, None, None]:
   235        for st in variants(ServiceType):
   236            yield cls(st, name="{self.target.name}")
   237
   238    def manifests(self) -> str:
   239        return f"""
   240apiVersion: networking.k8s.io/v1
   241kind: Ingress
   242metadata:
   243  annotations:
   244    kubernetes.io/ingress.class: ambassador
   245    getambassador.io/ambassador-id: plain
   246  name: {self.name.lower()}
   247spec:
   248  rules:
   249  - host: inspector.external
   250    http:
   251      paths:
   252      - backend:
   253          service:
   254            name: {self.target.path.k8s}
   255            port:
   256              number: 80
   257        path: /{self.name}/
   258        pathType: Prefix
   259"""
   260
   261    def queries(self):
   262        yield Query(self.parent.url(self.name + "/"), expected=404)
   263        yield Query(
   264            self.parent.url(self.name + "/"), headers={"Host": "inspector.internal"}, expected=404
   265        )
   266        yield Query(self.parent.url(self.name + "/"), headers={"Host": "inspector.external"})
   267
   268
   269class HostHeaderMapping(MappingTest):
   270
   271    parent: AmbassadorTest
   272
   273    @classmethod
   274    def variants(cls) -> Generator[Node, None, None]:
   275        for st in variants(ServiceType):
   276            yield cls(st, name="{self.target.name}")
   277
   278    def config(self) -> Generator[Union[str, Tuple[Node, str]], None, None]:
   279        yield self, self.format(
   280            """
   281---
   282apiVersion: getambassador.io/v3alpha1
   283kind: Mapping
   284name:  {self.name}
   285prefix: /{self.name}/
   286service: http://{self.target.path.fqdn}
   287host: inspector.external
   288"""
   289        )
   290
   291    def queries(self):
   292        yield Query(self.parent.url(self.name + "/"), expected=404)
   293        yield Query(
   294            self.parent.url(self.name + "/"), headers={"Host": "inspector.internal"}, expected=404
   295        )
   296        yield Query(self.parent.url(self.name + "/"), headers={"Host": "inspector.external"})
   297        # Test that a host header with a port value that does match the listener's configured port is not
   298        # stripped for the purpose of routing, so it does not match the Mapping. This is the default behavior,
   299        # and can be overridden using `strip_matching_host_port`, tested below.
   300        yield Query(
   301            self.parent.url(self.name + "/"),
   302            headers={"Host": "inspector.external:" + str(Constants.SERVICE_PORT_HTTP)},
   303            expected=404,
   304        )
   305
   306
   307class InvalidPortMapping(MappingTest):
   308
   309    parent: AmbassadorTest
   310
   311    @classmethod
   312    def variants(cls) -> Generator[Node, None, None]:
   313        for st in variants(ServiceType):
   314            yield cls(st, name="{self.target.name}")
   315
   316    def config(self) -> Generator[Union[str, Tuple[Node, str]], None, None]:
   317        yield self, self.format(
   318            """
   319---
   320apiVersion: getambassador.io/v3alpha1
   321kind: Mapping
   322name:  {self.name}
   323hostname: "*"
   324prefix: /{self.name}/
   325service: http://{self.target.path.fqdn}:80.invalid
   326"""
   327        )
   328
   329    def queries(self):
   330        yield Query(self.parent.url("ambassador/v0/diag/?json=true&filter=errors"))
   331
   332    def check(self):
   333        error_string = "found invalid port for service"
   334        found_error = False
   335        for error_list in self.results[0].json:
   336            for error in error_list:
   337                if error.find(error_string) != -1:
   338                    found_error = True
   339        assert found_error, "could not find the relevant error - {}".format(error_string)
   340
   341
   342class WebSocketMapping(MappingTest):
   343
   344    parent: AmbassadorTest
   345    target: ServiceType
   346
   347    def init(self):
   348        MappingTest.init(self, WebsocketEcho())
   349
   350    def config(self) -> Generator[Union[str, Tuple[Node, str]], None, None]:
   351        yield self, self.format(
   352            """
   353---
   354apiVersion: getambassador.io/v3alpha1
   355kind: Mapping
   356name:  {self.name}
   357hostname: "*"
   358prefix: /{self.name}/
   359service: {self.target.path.fqdn}
   360use_websocket: true
   361"""
   362        )
   363
   364    def queries(self):
   365        yield Query(self.parent.url(self.name + "/"), expected=404)
   366
   367        yield Query(self.parent.url(self.name + "/", scheme="ws"), messages=["one", "two", "three"])
   368
   369    def check(self):
   370        assert self.results[-1].messages == ["one", "two", "three"], "invalid messages: %s" % repr(
   371            self.results[-1].messages
   372        )
   373
   374
   375class TLSOrigination(MappingTest):
   376
   377    parent: AmbassadorTest
   378    definition: str
   379
   380    IMPLICIT = """
   381---
   382apiVersion: getambassador.io/v3alpha1
   383kind: Mapping
   384name:  {self.name}
   385hostname: "*"
   386prefix: /{self.name}/
   387service: https://{self.target.path.fqdn}
   388"""
   389
   390    EXPLICIT = """
   391---
   392apiVersion: getambassador.io/v3alpha1
   393kind: Mapping
   394name:  {self.name}
   395hostname: "*"
   396prefix: /{self.name}/
   397service: https://{self.target.path.fqdn}
   398"""
   399
   400    @classmethod
   401    def variants(cls) -> Generator[Node, None, None]:
   402        for v in variants(ServiceType):
   403            for name, dfn in ("IMPLICIT", cls.IMPLICIT), ("EXPLICIT", cls.EXPLICIT):
   404                yield cls(v, dfn, name="{self.target.name}-%s" % name)
   405
   406    def init(self, target, definition):
   407        MappingTest.init(self, target)
   408        self.definition = definition
   409
   410    def config(self) -> Generator[Union[str, Tuple[Node, str]], None, None]:
   411        yield self.target, self.format(self.definition)
   412
   413    def queries(self):
   414        yield Query(self.parent.url(self.name + "/"))
   415
   416    def check(self):
   417        for r in self.results:
   418            assert r.backend
   419            assert r.backend.request
   420            assert r.backend.request.tls.enabled
   421
   422
   423class HostRedirectMapping(MappingTest):
   424    parent: AmbassadorTest
   425    target: ServiceType
   426
   427    def init(self):
   428        MappingTest.init(self, HTTP())
   429
   430    def config(self) -> Generator[Union[str, Tuple[Node, str]], None, None]:
   431        yield self.target, self.format(
   432            """
   433---
   434apiVersion: getambassador.io/v3alpha1
   435kind: Mapping
   436name:  {self.name}
   437hostname: "*"
   438prefix: /{self.name}/
   439service: foobar.com
   440host_redirect: true
   441---
   442apiVersion: getambassador.io/v3alpha1
   443kind: Mapping
   444name:  {self.name}-2
   445hostname: "*"
   446prefix: /{self.name}-2/
   447case_sensitive: false
   448service: foobar.com
   449host_redirect: true
   450---
   451apiVersion: getambassador.io/v3alpha1
   452kind: Mapping
   453name:  {self.name}-3
   454hostname: "*"
   455prefix: /{self.name}-3/foo/
   456service: foobar.com
   457host_redirect: true
   458path_redirect: /redirect/
   459redirect_response_code: 302
   460---
   461apiVersion: getambassador.io/v3alpha1
   462kind: Mapping
   463name:  {self.name}-4
   464hostname: "*"
   465prefix: /{self.name}-4/foo/bar/baz
   466service: foobar.com
   467host_redirect: true
   468prefix_redirect: /foobar/baz
   469redirect_response_code: 307
   470---
   471apiVersion: getambassador.io/v3alpha1
   472kind: Mapping
   473name:  {self.name}-5
   474hostname: "*"
   475prefix: /{self.name}-5/assets/([a-f0-9]{{12}})/images
   476prefix_regex: true
   477service: foobar.com
   478host_redirect: true
   479regex_redirect:
   480  pattern: /{self.name}-5/assets/([a-f0-9]{{12}})/images
   481  substitution: /images/\\1
   482redirect_response_code: 308
   483"""
   484        )
   485
   486    def queries(self):
   487        # [0]
   488        yield Query(self.parent.url(self.name + "/anything?itworked=true"), expected=301)
   489
   490        # [1]
   491        yield Query(self.parent.url(self.name.upper() + "/anything?itworked=true"), expected=404)
   492
   493        # [2]
   494        yield Query(self.parent.url(self.name + "-2/anything?itworked=true"), expected=301)
   495
   496        # [3]
   497        yield Query(self.parent.url(self.name.upper() + "-2/anything?itworked=true"), expected=301)
   498
   499        # [4]
   500        yield Query(self.parent.url(self.name + "-3/foo/anything"), expected=302)
   501
   502        # [5]
   503        yield Query(self.parent.url(self.name + "-4/foo/bar/baz/anything"), expected=307)
   504
   505        # [6]
   506        yield Query(self.parent.url(self.name + "-5/assets/abcd0000f123/images"), expected=308)
   507
   508        # [7]
   509        yield Query(
   510            self.parent.url(self.name + "-5/assets/abcd0000f123/images?itworked=true"), expected=308
   511        )
   512
   513    def check(self):
   514        # [0]
   515        assert self.results[0].headers["Location"] == [
   516            self.format("http://foobar.com/{self.name}/anything?itworked=true")
   517        ], f"Unexpected Location {self.results[0].headers['Location']}"
   518
   519        # [1]
   520        assert self.results[1].status == 404
   521
   522        # [2]
   523        assert self.results[2].headers["Location"] == [
   524            self.format("http://foobar.com/{self.name}-2/anything?itworked=true")
   525        ], f"Unexpected Location {self.results[2].headers['Location']}"
   526
   527        # [3]
   528        assert self.results[3].headers["Location"] == [
   529            self.format("http://foobar.com/" + self.name.upper() + "-2/anything?itworked=true")
   530        ], f"Unexpected Location {self.results[3].headers['Location']}"
   531
   532        # [4]
   533        assert self.results[4].headers["Location"] == [
   534            self.format("http://foobar.com/redirect/")
   535        ], f"Unexpected Location {self.results[4].headers['Location']}"
   536
   537        # [5]
   538        assert self.results[5].headers["Location"] == [
   539            self.format("http://foobar.com/foobar/baz/anything")
   540        ], f"Unexpected Location {self.results[5].headers['Location']}"
   541
   542        # [6]
   543        assert self.results[6].headers["Location"] == [
   544            self.format("http://foobar.com/images/abcd0000f123")
   545        ], f"Unexpected Location {self.results[6].headers['Location']}"
   546
   547        # [7]
   548        assert self.results[7].headers["Location"] == [
   549            self.format("http://foobar.com/images/abcd0000f123?itworked=true")
   550        ], f"Unexpected Location {self.results[7].headers['Location']}"
   551
   552
   553class CanaryMapping(MappingTest):
   554
   555    parent: AmbassadorTest
   556    target: ServiceType
   557    canary: ServiceType
   558    weight: int
   559
   560    @classmethod
   561    def variants(cls) -> Generator[Node, None, None]:
   562        for v in variants(ServiceType):
   563            for w in (0, 10, 50, 100):
   564                yield cls(v, v.clone("canary"), w, name="{self.target.name}-{self.weight}")
   565
   566    # XXX This type: ignore is here because we're deliberately overriding the
   567    # parent's init to have a different signature... but it's also intimately
   568    # (nay, incestuously) related to the variant()'s yield() above, and I really
   569    # don't want to deal with that right now. So. We'll deal with it later.
   570    def init(self, target: ServiceType, canary: ServiceType, weight):  # type: ignore
   571        MappingTest.init(self, target)
   572        self.canary = canary
   573        self.weight = weight
   574
   575    def config(self) -> Generator[Union[str, Tuple[Node, str]], None, None]:
   576        yield self.target, self.format(
   577            """
   578---
   579apiVersion: getambassador.io/v3alpha1
   580kind: Mapping
   581name:  {self.name}
   582hostname: "*"
   583prefix: /{self.name}/
   584service: http://{self.target.path.fqdn}
   585"""
   586        )
   587        yield self.canary, self.format(
   588            """
   589---
   590apiVersion: getambassador.io/v3alpha1
   591kind: Mapping
   592name:  {self.name}-canary
   593hostname: "*"
   594prefix: /{self.name}/
   595service: http://{self.canary.path.fqdn}
   596weight: {self.weight}
   597"""
   598        )
   599
   600    def queries(self):
   601        for i in range(100):
   602            yield Query(self.parent.url(self.name + "/"))
   603
   604    def check(self):
   605        hist: Dict[str, int] = {}
   606
   607        for r in self.results:
   608            assert r.backend
   609            hist[r.backend.name] = hist.get(r.backend.name, 0) + 1
   610
   611        if self.weight == 0:
   612            assert hist.get(self.canary.path.k8s, 0) == 0
   613            assert hist.get(self.target.path.k8s, 0) == 100
   614        elif self.weight == 100:
   615            assert hist.get(self.canary.path.k8s, 0) == 100
   616            assert hist.get(self.target.path.k8s, 0) == 0
   617        else:
   618            canary = 100 * hist.get(self.canary.path.k8s, 0) / len(self.results)
   619            main = 100 * hist.get(self.target.path.k8s, 0) / len(self.results)
   620
   621            assert (
   622                abs(self.weight - canary) < 25
   623            ), f"weight {self.weight} routed {canary}% to canary"
   624            assert (
   625                abs(100 - (canary + main)) < 2
   626            ), f"weight {self.weight} routed only {canary + main}% at all?"
   627
   628
   629class CanaryDiffMapping(MappingTest):
   630
   631    parent: AmbassadorTest
   632    target: ServiceType
   633    canary: ServiceType
   634    weight: int
   635
   636    @classmethod
   637    def variants(cls) -> Generator[Node, None, None]:
   638        for v in variants(ServiceType):
   639            for w in (0, 10, 50, 100):
   640                yield cls(v, v.clone("canary"), w, name="{self.target.name}-{self.weight}")
   641
   642    # XXX This type: ignore is here because we're deliberately overriding the
   643    # parent's init to have a different signature... but it's also intimately
   644    # (nay, incestuously) related to the variant()'s yield() above, and I really
   645    # don't want to deal with that right now. So. We'll deal with it later.
   646    def init(self, target: ServiceType, canary: ServiceType, weight):  # type: ignore
   647        MappingTest.init(self, target)
   648        self.canary = canary
   649        self.weight = weight
   650
   651    def config(self) -> Generator[Union[str, Tuple[Node, str]], None, None]:
   652        yield self.target, self.format(
   653            """
   654---
   655apiVersion: getambassador.io/v3alpha1
   656kind: Mapping
   657name:  {self.name}
   658hostname: "*"
   659prefix: /{self.name}/
   660service: http://{self.target.path.fqdn}
   661host_rewrite: canary.1.example.com
   662"""
   663        )
   664        yield self.canary, self.format(
   665            """
   666---
   667apiVersion: getambassador.io/v3alpha1
   668kind: Mapping
   669name:  {self.name}-canary
   670hostname: "*"
   671prefix: /{self.name}/
   672service: http://{self.canary.path.fqdn}
   673host_rewrite: canary.2.example.com
   674weight: {self.weight}
   675"""
   676        )
   677
   678    def queries(self):
   679        for i in range(100):
   680            yield Query(self.parent.url(self.name + "/"))
   681
   682    def check(self):
   683        request_hosts = ["canary.1.example.com", "canary.2.example.com"]
   684
   685        hist: Dict[str, int] = {}
   686
   687        for r in self.results:
   688            assert r.backend
   689            hist[r.backend.name] = hist.get(r.backend.name, 0) + 1
   690            assert r.backend.request
   691            assert (
   692                r.backend.request.host in request_hosts
   693            ), f"Expected host {request_hosts}, got {r.backend.request.host}"
   694
   695        if self.weight == 0:
   696            assert hist.get(self.canary.path.k8s, 0) == 0
   697            assert hist.get(self.target.path.k8s, 0) == 100
   698        elif self.weight == 100:
   699            assert hist.get(self.canary.path.k8s, 0) == 100
   700            assert hist.get(self.target.path.k8s, 0) == 0
   701        else:
   702            canary = 100 * hist.get(self.canary.path.k8s, 0) / len(self.results)
   703            main = 100 * hist.get(self.target.path.k8s, 0) / len(self.results)
   704
   705            assert (
   706                abs(self.weight - canary) < 25
   707            ), f"weight {self.weight} routed {canary}% to canary"
   708            assert (
   709                abs(100 - (canary + main)) < 2
   710            ), f"weight {self.weight} routed only {canary + main}% at all?"
   711
   712
   713class AddRespHeadersMapping(MappingTest):
   714    parent: AmbassadorTest
   715    target: ServiceType
   716
   717    def init(self):
   718        MappingTest.init(self, HTTPBin())
   719
   720    def config(self) -> Generator[Union[str, Tuple[Node, str]], None, None]:
   721        yield self, self.format(
   722            """
   723---
   724apiVersion: getambassador.io/v3alpha1
   725kind: Mapping
   726name:  {self.name}
   727hostname: "*"
   728prefix: /{self.name}/
   729service: {self.target.path.fqdn}
   730add_response_headers:
   731    koo:
   732        append: False
   733        value: KooK
   734    zoo:
   735        append: True
   736        value: ZooZ
   737    test:
   738        value: boo
   739    foo:
   740        value: Foo
   741"""
   742        )
   743
   744    def queries(self):
   745        yield Query(self.parent.url(self.name) + "/response-headers?zoo=Zoo&test=Test&koo=Koot")
   746
   747    def check(self):
   748        for r in self.results:
   749            if r.headers:
   750                # print(r.headers)
   751                assert r.headers["Koo"] == ["KooK"]
   752                assert r.headers["Zoo"] == ["Zoo", "ZooZ"]
   753                assert r.headers["Test"] == ["Test", "boo"]
   754                assert r.headers["Foo"] == ["Foo"]
   755
   756
   757# To make sure queries to Edge stack related paths adds X-Content-Type-Options = nosniff in the response header
   758# and not to any other mappings/routes
   759class EdgeStackMapping(MappingTest):
   760    parent: AmbassadorTest
   761    target: ServiceType
   762
   763    def init(self):
   764        MappingTest.init(self, HTTP())
   765
   766        if not EDGE_STACK:
   767            self.skip_node = True
   768
   769    def config(self) -> Generator[Union[str, Tuple[Node, str]], None, None]:
   770        yield self.target, self.format(
   771            """
   772---
   773apiVersion: getambassador.io/v3alpha1
   774kind: Mapping
   775name:  {self.name}
   776hostname: "*"
   777prefix: /{self.name}/
   778service: http://{self.target.path.fqdn}
   779"""
   780        )
   781
   782    def queries(self):
   783        yield Query(self.parent.url("edge_stack/admin/"), expected=404)
   784        yield Query(self.parent.url(self.name + "/"), expected=200)
   785
   786    def check(self):
   787        # assert self.results[0].headers['X-Content-Type-Options'] == ['nosniff']
   788        assert "X-Content-Type-Options" not in self.results[1].headers
   789
   790
   791class RemoveReqHeadersMapping(MappingTest):
   792    parent: AmbassadorTest
   793    target: ServiceType
   794
   795    def init(self):
   796        MappingTest.init(self, HTTPBin())
   797
   798    def config(self) -> Generator[Union[str, Tuple[Node, str]], None, None]:
   799        yield self, self.format(
   800            """
   801---
   802apiVersion: getambassador.io/v3alpha1
   803kind: Mapping
   804name:  {self.name}
   805hostname: "*"
   806prefix: /{self.name}/
   807service: {self.target.path.fqdn}
   808remove_request_headers:
   809- zoo
   810- aoo
   811"""
   812        )
   813
   814    def queries(self):
   815        yield Query(
   816            self.parent.url(self.name + "/headers"),
   817            headers={"zoo": "ZooZ", "aoo": "AooA", "foo": "FooF"},
   818        )
   819
   820    def check(self):
   821        for r in self.results:
   822            # print(r.json)
   823            if "headers" in r.json:
   824                assert r.json["headers"]["Foo"] == "FooF"
   825                assert "Zoo" not in r.json["headers"]
   826                assert "Aoo" not in r.json["headers"]
   827
   828
   829class AddReqHeadersMapping(MappingTest):
   830    parent: AmbassadorTest
   831    target: ServiceType
   832
   833    @classmethod
   834    def variants(cls) -> Generator[Node, None, None]:
   835        for st in variants(ServiceType):
   836            yield cls(st, name="{self.target.name}")
   837
   838    def config(self) -> Generator[Union[str, Tuple[Node, str]], None, None]:
   839        yield self, self.format(
   840            """
   841---
   842apiVersion: getambassador.io/v3alpha1
   843kind: Mapping
   844name:  {self.name}
   845hostname: "*"
   846prefix: /{self.name}/
   847service: http://{self.target.path.fqdn}
   848add_request_headers:
   849    zoo:
   850        append: False
   851        value: Zoo
   852    aoo:
   853        append: True
   854        value: aoo
   855    boo:
   856        value: boo
   857    foo:
   858        value: Foo
   859"""
   860        )
   861
   862    def queries(self):
   863        yield Query(
   864            self.parent.url(self.name + "/"),
   865            headers={"zoo": "ZooZ", "aoo": "AooA", "boo": "BooB", "foo": "FooF"},
   866        )
   867
   868    def check(self):
   869        for r in self.results:
   870            if r.backend:
   871                assert r.backend.request
   872                assert r.backend.request.headers["zoo"] == ["Zoo"]
   873                assert r.backend.request.headers["aoo"] == ["AooA", "aoo"]
   874                assert r.backend.request.headers["boo"] == ["BooB", "boo"]
   875                assert r.backend.request.headers["foo"] == ["FooF", "Foo"]

View as plain text