...

Text file src/github.com/datawire/ambassador/v2/python/tests/kat/t_mappingtests_plain.py

Documentation: github.com/datawire/ambassador/v2/python/tests/kat

     1from typing import Generator, Tuple, Union
     2
     3from abstract_tests import (
     4    HTTP,
     5    AmbassadorTest,
     6    HTTPBin,
     7    MappingTest,
     8    Node,
     9    OptionTest,
    10    ServiceType,
    11    WebsocketEcho,
    12)
    13from ambassador.constants import Constants
    14from kat.harness import EDGE_STACK, Query, variants
    15
    16# This is the place to add new MappingTests.
    17
    18
    19def unique(options):
    20    added = set()
    21    result = []
    22    for o in options:
    23        if o.__class__ not in added:
    24            added.add(o.__class__)
    25            result.append(o)
    26    return tuple(result)
    27
    28
    29class SimpleMapping(MappingTest):
    30
    31    parent: AmbassadorTest
    32    target: ServiceType
    33
    34    @classmethod
    35    def variants(cls) -> Generator[Node, None, None]:
    36        for st in variants(ServiceType):
    37            yield cls(st, name="{self.target.name}")
    38
    39            for mot in variants(OptionTest):
    40                yield cls(st, (mot,), name="{self.target.name}-{self.options[0].name}")
    41
    42            yield cls(
    43                st,
    44                unique(v for v in variants(OptionTest) if not getattr(v, "isolated", False)),
    45                name="{self.target.name}-all",
    46            )
    47
    48    def config(self) -> Generator[Union[str, Tuple[Node, str]], None, None]:
    49        yield self, self.format(
    50            """
    51---
    52apiVersion: getambassador.io/v3alpha1
    53kind: Mapping
    54name:  {self.name}
    55hostname: "*"
    56prefix: /{self.name}/
    57service: http://{self.target.path.fqdn}
    58"""
    59        )
    60
    61    def queries(self):
    62        yield Query(self.parent.url(self.name + "/"))
    63        yield Query(self.parent.url(f"need-normalization/../{self.name}/"))
    64
    65    def check(self):
    66        for r in self.results:
    67            if r.backend:
    68                assert r.backend.name == self.target.path.k8s, (
    69                    r.backend.name,
    70                    self.target.path.k8s,
    71                )
    72                assert r.backend.request.headers["x-envoy-original-path"][0] == f"/{self.name}/"
    73
    74
    75class SimpleMappingIngress(MappingTest):
    76
    77    parent: AmbassadorTest
    78    target: ServiceType
    79
    80    @classmethod
    81    def variants(cls) -> Generator[Node, None, None]:
    82        for st in variants(ServiceType):
    83            yield cls(st, name="{self.target.name}")
    84
    85    def manifests(self) -> str:
    86        return f"""
    87apiVersion: networking.k8s.io/v1
    88kind: Ingress
    89metadata:
    90  annotations:
    91    kubernetes.io/ingress.class: ambassador
    92    getambassador.io/ambassador-id: plain
    93  name: {self.name.lower()}
    94spec:
    95  rules:
    96  - http:
    97      paths:
    98      - backend:
    99          service:
   100            name: {self.target.path.k8s}
   101            port:
   102              number: 80
   103        path: /{self.name}/
   104        pathType: Prefix
   105"""
   106
   107    def queries(self):
   108        yield Query(self.parent.url(self.name + "/"))  # , xfail="IHA hostglob")
   109        yield Query(
   110            self.parent.url(f"need-normalization/../{self.name}/")
   111        )  # , xfail="IHA hostglob")
   112
   113    def check(self):
   114        for r in self.results:
   115            if r.backend:
   116                assert r.backend.name == self.target.path.k8s, (
   117                    r.backend.name,
   118                    self.target.path.k8s,
   119                )
   120                assert r.backend.request.headers["x-envoy-original-path"][0] == f"/{self.name}/"
   121
   122
   123# Disabled SimpleMappingIngressDefaultBackend since adding a default fallback mapping would break other
   124# assertions, expecting to 404 if mappings don't match in Plain.
   125# class SimpleMappingIngressDefaultBackend(MappingTest):
   126#
   127#     parent: AmbassadorTest
   128#     target: ServiceType
   129#
   130#     @classmethod
   131#    def variants(cls) -> Generator[Node, None, None]:
   132#         for st in variants(ServiceType):
   133#             yield cls(st, name="{self.target.name}")
   134#
   135#     def manifests(self) -> str:
   136#         return f"""
   137# apiVersion: networking.k8s.io/v1
   138# kind: Ingress
   139# metadata:
   140#   annotations:
   141#     kubernetes.io/ingress.class: ambassador
   142#     getambassador.io/ambassador-id: plain
   143#   name: {self.name.lower()}
   144# spec:
   145#   backend:
   146#     service:
   147#       name: {self.target.path.k8s}
   148#       port:
   149#         number: 80
   150# """
   151#
   152#     def queries(self):
   153#         yield Query(self.parent.url(self.name))
   154#
   155#     def check(self):
   156#         for r in self.results:
   157#             if r.backend:
   158#                 assert r.backend.name == self.target.path.k8s, (r.backend.name, self.target.path.k8s)
   159#                 assert r.backend.request.headers['x-envoy-original-path'][0] == f'/{self.name}'
   160
   161
   162class SimpleIngressWithAnnotations(MappingTest):
   163
   164    parent: AmbassadorTest
   165    target: ServiceType
   166
   167    @classmethod
   168    def variants(cls) -> Generator[Node, None, None]:
   169        for st in variants(ServiceType):
   170            yield cls(st, name="{self.target.name}")
   171
   172    def manifests(self) -> str:
   173        return f"""
   174apiVersion: networking.k8s.io/v1
   175kind: Ingress
   176metadata:
   177  annotations:
   178    kubernetes.io/ingress.class: ambassador
   179    getambassador.io/ambassador-id: plain
   180    getambassador.io/config: |
   181      ---
   182      apiVersion: getambassador.io/v3alpha1
   183      kind: Mapping
   184      name:  {self.name}-nested
   185      hostname: "*"
   186      prefix: /{self.name}-nested/
   187      service: http://{self.target.path.fqdn}
   188      ambassador_id: [plain]
   189  name: {self.name.lower()}
   190spec:
   191  rules:
   192  - http:
   193      paths:
   194      - backend:
   195          service:
   196            name: {self.target.path.k8s}
   197            port:
   198              number: 80
   199        path: /{self.name}/
   200        pathType: Prefix
   201"""
   202
   203    def queries(self):
   204        yield Query(self.parent.url(self.name + "/"))  # , xfail="IHA hostglob")
   205        yield Query(
   206            self.parent.url(f"need-normalization/../{self.name}/")
   207        )  # , xfail="IHA hostglob")
   208        yield Query(self.parent.url(self.name + "-nested/"))  # , xfail="IHA hostglob")
   209        yield Query(
   210            self.parent.url(self.name + "-non-existent/"), expected=404
   211        )  # , xfail="IHA hostglob")
   212
   213    def check(self):
   214        for r in self.results:
   215            if r.backend:
   216                assert r.backend.name == self.target.path.k8s, (
   217                    r.backend.name,
   218                    self.target.path.k8s,
   219                )
   220                assert r.backend.request.headers["x-envoy-original-path"][0] in (
   221                    f"/{self.name}/",
   222                    f"/{self.name}-nested/",
   223                )
   224
   225
   226class HostHeaderMappingIngress(MappingTest):
   227
   228    parent: AmbassadorTest
   229
   230    @classmethod
   231    def variants(cls) -> Generator[Node, None, None]:
   232        for st in variants(ServiceType):
   233            yield cls(st, name="{self.target.name}")
   234
   235    def manifests(self) -> str:
   236        return f"""
   237apiVersion: networking.k8s.io/v1
   238kind: Ingress
   239metadata:
   240  annotations:
   241    kubernetes.io/ingress.class: ambassador
   242    getambassador.io/ambassador-id: plain
   243  name: {self.name.lower()}
   244spec:
   245  rules:
   246  - host: inspector.external
   247    http:
   248      paths:
   249      - backend:
   250          service:
   251            name: {self.target.path.k8s}
   252            port:
   253              number: 80
   254        path: /{self.name}/
   255        pathType: Prefix
   256"""
   257
   258    def queries(self):
   259        yield Query(self.parent.url(self.name + "/"), expected=404)
   260        yield Query(
   261            self.parent.url(self.name + "/"), headers={"Host": "inspector.internal"}, expected=404
   262        )
   263        yield Query(self.parent.url(self.name + "/"), headers={"Host": "inspector.external"})
   264
   265
   266class HostHeaderMapping(MappingTest):
   267
   268    parent: AmbassadorTest
   269
   270    @classmethod
   271    def variants(cls) -> Generator[Node, None, None]:
   272        for st in variants(ServiceType):
   273            yield cls(st, name="{self.target.name}")
   274
   275    def config(self) -> Generator[Union[str, Tuple[Node, str]], None, None]:
   276        yield self, self.format(
   277            """
   278---
   279apiVersion: getambassador.io/v3alpha1
   280kind: Mapping
   281name:  {self.name}
   282prefix: /{self.name}/
   283service: http://{self.target.path.fqdn}
   284host: inspector.external
   285"""
   286        )
   287
   288    def queries(self):
   289        yield Query(self.parent.url(self.name + "/"), expected=404)
   290        yield Query(
   291            self.parent.url(self.name + "/"), headers={"Host": "inspector.internal"}, expected=404
   292        )
   293        yield Query(self.parent.url(self.name + "/"), headers={"Host": "inspector.external"})
   294        # Test that a host header with a port value that does match the listener's configured port is not
   295        # stripped for the purpose of routing, so it does not match the Mapping. This is the default behavior,
   296        # and can be overridden using `strip_matching_host_port`, tested below.
   297        yield Query(
   298            self.parent.url(self.name + "/"),
   299            headers={"Host": "inspector.external:" + str(Constants.SERVICE_PORT_HTTP)},
   300            expected=404,
   301        )
   302
   303
   304class InvalidPortMapping(MappingTest):
   305
   306    parent: AmbassadorTest
   307
   308    @classmethod
   309    def variants(cls) -> Generator[Node, None, None]:
   310        for st in variants(ServiceType):
   311            yield cls(st, name="{self.target.name}")
   312
   313    def config(self) -> Generator[Union[str, Tuple[Node, str]], None, None]:
   314        yield self, self.format(
   315            """
   316---
   317apiVersion: getambassador.io/v3alpha1
   318kind: Mapping
   319name:  {self.name}
   320hostname: "*"
   321prefix: /{self.name}/
   322service: http://{self.target.path.fqdn}:80.invalid
   323"""
   324        )
   325
   326    def queries(self):
   327        yield Query(self.parent.url("ambassador/v0/diag/?json=true&filter=errors"))
   328
   329    def check(self):
   330        error_string = "found invalid port for service"
   331        found_error = False
   332        for error_list in self.results[0].json:
   333            for error in error_list:
   334                if error.find(error_string) != -1:
   335                    found_error = True
   336        assert found_error, "could not find the relevant error - {}".format(error_string)
   337
   338
   339class WebSocketMapping(MappingTest):
   340
   341    parent: AmbassadorTest
   342    target: ServiceType
   343
   344    def init(self):
   345        MappingTest.init(self, WebsocketEcho())
   346
   347    def config(self) -> Generator[Union[str, Tuple[Node, str]], None, None]:
   348        yield self, self.format(
   349            """
   350---
   351apiVersion: getambassador.io/v3alpha1
   352kind: Mapping
   353name:  {self.name}
   354hostname: "*"
   355prefix: /{self.name}/
   356service: {self.target.path.fqdn}
   357use_websocket: true
   358"""
   359        )
   360
   361    def queries(self):
   362        yield Query(self.parent.url(self.name + "/"), expected=404)
   363
   364        yield Query(self.parent.url(self.name + "/", scheme="ws"), messages=["one", "two", "three"])
   365
   366    def check(self):
   367        assert self.results[-1].messages == ["one", "two", "three"], "invalid messages: %s" % repr(
   368            self.results[-1].messages
   369        )
   370
   371
   372class TLSOrigination(MappingTest):
   373
   374    parent: AmbassadorTest
   375    definition: str
   376
   377    IMPLICIT = """
   378---
   379apiVersion: getambassador.io/v3alpha1
   380kind: Mapping
   381name:  {self.name}
   382hostname: "*"
   383prefix: /{self.name}/
   384service: https://{self.target.path.fqdn}
   385"""
   386
   387    EXPLICIT = """
   388---
   389apiVersion: getambassador.io/v3alpha1
   390kind: Mapping
   391name:  {self.name}
   392hostname: "*"
   393prefix: /{self.name}/
   394service: https://{self.target.path.fqdn}
   395"""
   396
   397    @classmethod
   398    def variants(cls) -> Generator[Node, None, None]:
   399        for v in variants(ServiceType):
   400            for name, dfn in ("IMPLICIT", cls.IMPLICIT), ("EXPLICIT", cls.EXPLICIT):
   401                yield cls(v, dfn, name="{self.target.name}-%s" % name)
   402
   403    def init(self, target, definition):
   404        MappingTest.init(self, target)
   405        self.definition = definition
   406
   407    def config(self) -> Generator[Union[str, Tuple[Node, str]], None, None]:
   408        yield self.target, self.format(self.definition)
   409
   410    def queries(self):
   411        yield Query(self.parent.url(self.name + "/"))
   412
   413    def check(self):
   414        for r in self.results:
   415            assert r.backend.request.tls.enabled
   416
   417
   418class HostRedirectMapping(MappingTest):
   419    parent: AmbassadorTest
   420    target: ServiceType
   421
   422    def init(self):
   423        MappingTest.init(self, HTTP())
   424
   425    def config(self) -> Generator[Union[str, Tuple[Node, str]], None, None]:
   426        yield self.target, self.format(
   427            """
   428---
   429apiVersion: getambassador.io/v3alpha1
   430kind: Mapping
   431name:  {self.name}
   432hostname: "*"
   433prefix: /{self.name}/
   434service: foobar.com
   435host_redirect: true
   436---
   437apiVersion: getambassador.io/v3alpha1
   438kind: Mapping
   439name:  {self.name}-2
   440hostname: "*"
   441prefix: /{self.name}-2/
   442case_sensitive: false
   443service: foobar.com
   444host_redirect: true
   445---
   446apiVersion: getambassador.io/v3alpha1
   447kind: Mapping
   448name:  {self.name}-3
   449hostname: "*"
   450prefix: /{self.name}-3/foo/
   451service: foobar.com
   452host_redirect: true
   453path_redirect: /redirect/
   454redirect_response_code: 302
   455---
   456apiVersion: getambassador.io/v3alpha1
   457kind: Mapping
   458name:  {self.name}-4
   459hostname: "*"
   460prefix: /{self.name}-4/foo/bar/baz
   461service: foobar.com
   462host_redirect: true
   463prefix_redirect: /foobar/baz
   464redirect_response_code: 307
   465---
   466apiVersion: getambassador.io/v3alpha1
   467kind: Mapping
   468name:  {self.name}-5
   469hostname: "*"
   470prefix: /{self.name}-5/assets/([a-f0-9]{{12}})/images
   471prefix_regex: true
   472service: foobar.com
   473host_redirect: true
   474regex_redirect:
   475  pattern: /{self.name}-5/assets/([a-f0-9]{{12}})/images
   476  substitution: /images/\\1
   477redirect_response_code: 308
   478"""
   479        )
   480
   481    def queries(self):
   482        # [0]
   483        yield Query(self.parent.url(self.name + "/anything?itworked=true"), expected=301)
   484
   485        # [1]
   486        yield Query(self.parent.url(self.name.upper() + "/anything?itworked=true"), expected=404)
   487
   488        # [2]
   489        yield Query(self.parent.url(self.name + "-2/anything?itworked=true"), expected=301)
   490
   491        # [3]
   492        yield Query(self.parent.url(self.name.upper() + "-2/anything?itworked=true"), expected=301)
   493
   494        # [4]
   495        yield Query(self.parent.url(self.name + "-3/foo/anything"), expected=302)
   496
   497        # [5]
   498        yield Query(self.parent.url(self.name + "-4/foo/bar/baz/anything"), expected=307)
   499
   500        # [6]
   501        yield Query(self.parent.url(self.name + "-5/assets/abcd0000f123/images"), expected=308)
   502
   503        # [7]
   504        yield Query(
   505            self.parent.url(self.name + "-5/assets/abcd0000f123/images?itworked=true"), expected=308
   506        )
   507
   508    def check(self):
   509        # [0]
   510        assert self.results[0].headers["Location"] == [
   511            self.format("http://foobar.com/{self.name}/anything?itworked=true")
   512        ], f"Unexpected Location {self.results[0].headers['Location']}"
   513
   514        # [1]
   515        assert self.results[1].status == 404
   516
   517        # [2]
   518        assert self.results[2].headers["Location"] == [
   519            self.format("http://foobar.com/{self.name}-2/anything?itworked=true")
   520        ], f"Unexpected Location {self.results[2].headers['Location']}"
   521
   522        # [3]
   523        assert self.results[3].headers["Location"] == [
   524            self.format("http://foobar.com/" + self.name.upper() + "-2/anything?itworked=true")
   525        ], f"Unexpected Location {self.results[3].headers['Location']}"
   526
   527        # [4]
   528        assert self.results[4].headers["Location"] == [
   529            self.format("http://foobar.com/redirect/")
   530        ], f"Unexpected Location {self.results[4].headers['Location']}"
   531
   532        # [5]
   533        assert self.results[5].headers["Location"] == [
   534            self.format("http://foobar.com/foobar/baz/anything")
   535        ], f"Unexpected Location {self.results[5].headers['Location']}"
   536
   537        # [6]
   538        assert self.results[6].headers["Location"] == [
   539            self.format("http://foobar.com/images/abcd0000f123")
   540        ], f"Unexpected Location {self.results[6].headers['Location']}"
   541
   542        # [7]
   543        assert self.results[7].headers["Location"] == [
   544            self.format("http://foobar.com/images/abcd0000f123?itworked=true")
   545        ], f"Unexpected Location {self.results[7].headers['Location']}"
   546
   547
   548class CanaryMapping(MappingTest):
   549
   550    parent: AmbassadorTest
   551    target: ServiceType
   552    canary: ServiceType
   553    weight: int
   554
   555    @classmethod
   556    def variants(cls) -> Generator[Node, None, None]:
   557        for v in variants(ServiceType):
   558            for w in (0, 10, 50, 100):
   559                yield cls(v, v.clone("canary"), w, name="{self.target.name}-{self.weight}")
   560
   561    # XXX This type: ignore is here because we're deliberately overriding the
   562    # parent's init to have a different signature... but it's also intimately
   563    # (nay, incestuously) related to the variant()'s yield() above, and I really
   564    # don't want to deal with that right now. So. We'll deal with it later.
   565    def init(self, target: ServiceType, canary: ServiceType, weight):  # type: ignore
   566        MappingTest.init(self, target)
   567        self.canary = canary
   568        self.weight = weight
   569
   570    def config(self) -> Generator[Union[str, Tuple[Node, str]], None, None]:
   571        yield self.target, self.format(
   572            """
   573---
   574apiVersion: getambassador.io/v3alpha1
   575kind: Mapping
   576name:  {self.name}
   577hostname: "*"
   578prefix: /{self.name}/
   579service: http://{self.target.path.fqdn}
   580"""
   581        )
   582        yield self.canary, self.format(
   583            """
   584---
   585apiVersion: getambassador.io/v3alpha1
   586kind: Mapping
   587name:  {self.name}-canary
   588hostname: "*"
   589prefix: /{self.name}/
   590service: http://{self.canary.path.fqdn}
   591weight: {self.weight}
   592"""
   593        )
   594
   595    def queries(self):
   596        for i in range(100):
   597            yield Query(self.parent.url(self.name + "/"))
   598
   599    def check(self):
   600        hist = {}
   601
   602        for r in self.results:
   603            hist[r.backend.name] = hist.get(r.backend.name, 0) + 1
   604
   605        if self.weight == 0:
   606            assert hist.get(self.canary.path.k8s, 0) == 0
   607            assert hist.get(self.target.path.k8s, 0) == 100
   608        elif self.weight == 100:
   609            assert hist.get(self.canary.path.k8s, 0) == 100
   610            assert hist.get(self.target.path.k8s, 0) == 0
   611        else:
   612            canary = 100 * hist.get(self.canary.path.k8s, 0) / len(self.results)
   613            main = 100 * hist.get(self.target.path.k8s, 0) / len(self.results)
   614
   615            assert (
   616                abs(self.weight - canary) < 25
   617            ), f"weight {self.weight} routed {canary}% to canary"
   618            assert (
   619                abs(100 - (canary + main)) < 2
   620            ), f"weight {self.weight} routed only {canary + main}% at all?"
   621
   622
   623class CanaryDiffMapping(MappingTest):
   624
   625    parent: AmbassadorTest
   626    target: ServiceType
   627    canary: ServiceType
   628    weight: int
   629
   630    @classmethod
   631    def variants(cls) -> Generator[Node, None, None]:
   632        for v in variants(ServiceType):
   633            for w in (0, 10, 50, 100):
   634                yield cls(v, v.clone("canary"), w, name="{self.target.name}-{self.weight}")
   635
   636    # XXX This type: ignore is here because we're deliberately overriding the
   637    # parent's init to have a different signature... but it's also intimately
   638    # (nay, incestuously) related to the variant()'s yield() above, and I really
   639    # don't want to deal with that right now. So. We'll deal with it later.
   640    def init(self, target: ServiceType, canary: ServiceType, weight):  # type: ignore
   641        MappingTest.init(self, target)
   642        self.canary = canary
   643        self.weight = weight
   644
   645    def config(self) -> Generator[Union[str, Tuple[Node, str]], None, None]:
   646        yield self.target, self.format(
   647            """
   648---
   649apiVersion: getambassador.io/v3alpha1
   650kind: Mapping
   651name:  {self.name}
   652hostname: "*"
   653prefix: /{self.name}/
   654service: http://{self.target.path.fqdn}
   655host_rewrite: canary.1.example.com
   656"""
   657        )
   658        yield self.canary, self.format(
   659            """
   660---
   661apiVersion: getambassador.io/v3alpha1
   662kind: Mapping
   663name:  {self.name}-canary
   664hostname: "*"
   665prefix: /{self.name}/
   666service: http://{self.canary.path.fqdn}
   667host_rewrite: canary.2.example.com
   668weight: {self.weight}
   669"""
   670        )
   671
   672    def queries(self):
   673        for i in range(100):
   674            yield Query(self.parent.url(self.name + "/"))
   675
   676    def check(self):
   677        request_hosts = ["canary.1.example.com", "canary.2.example.com"]
   678
   679        hist = {}
   680
   681        for r in self.results:
   682            hist[r.backend.name] = hist.get(r.backend.name, 0) + 1
   683            assert (
   684                r.backend.request.host in request_hosts
   685            ), f"Expected host {request_hosts}, got {r.backend.request.host}"
   686
   687        if self.weight == 0:
   688            assert hist.get(self.canary.path.k8s, 0) == 0
   689            assert hist.get(self.target.path.k8s, 0) == 100
   690        elif self.weight == 100:
   691            assert hist.get(self.canary.path.k8s, 0) == 100
   692            assert hist.get(self.target.path.k8s, 0) == 0
   693        else:
   694            canary = 100 * hist.get(self.canary.path.k8s, 0) / len(self.results)
   695            main = 100 * hist.get(self.target.path.k8s, 0) / len(self.results)
   696
   697            assert (
   698                abs(self.weight - canary) < 25
   699            ), f"weight {self.weight} routed {canary}% to canary"
   700            assert (
   701                abs(100 - (canary + main)) < 2
   702            ), f"weight {self.weight} routed only {canary + main}% at all?"
   703
   704
   705class AddRespHeadersMapping(MappingTest):
   706    parent: AmbassadorTest
   707    target: ServiceType
   708
   709    def init(self):
   710        MappingTest.init(self, HTTPBin())
   711
   712    def config(self) -> Generator[Union[str, Tuple[Node, str]], None, None]:
   713        yield self, self.format(
   714            """
   715---
   716apiVersion: getambassador.io/v3alpha1
   717kind: Mapping
   718name:  {self.name}
   719hostname: "*"
   720prefix: /{self.name}/
   721service: {self.target.path.fqdn}
   722add_response_headers:
   723    koo:
   724        append: False
   725        value: KooK
   726    zoo:
   727        append: True
   728        value: ZooZ
   729    test:
   730        value: boo
   731    foo:
   732        value: Foo
   733"""
   734        )
   735
   736    def queries(self):
   737        yield Query(self.parent.url(self.name) + "/response-headers?zoo=Zoo&test=Test&koo=Koot")
   738
   739    def check(self):
   740        for r in self.results:
   741            if r.headers:
   742                # print(r.headers)
   743                assert r.headers["Koo"] == ["KooK"]
   744                assert r.headers["Zoo"] == ["Zoo", "ZooZ"]
   745                assert r.headers["Test"] == ["Test", "boo"]
   746                assert r.headers["Foo"] == ["Foo"]
   747
   748
   749# To make sure queries to Edge stack related paths adds X-Content-Type-Options = nosniff in the response header
   750# and not to any other mappings/routes
   751class EdgeStackMapping(MappingTest):
   752    parent: AmbassadorTest
   753    target: ServiceType
   754
   755    def init(self):
   756        MappingTest.init(self, HTTP())
   757
   758        if not EDGE_STACK:
   759            self.skip_node = True
   760
   761    def config(self) -> Generator[Union[str, Tuple[Node, str]], None, None]:
   762        yield self.target, self.format(
   763            """
   764---
   765apiVersion: getambassador.io/v3alpha1
   766kind: Mapping
   767name:  {self.name}
   768hostname: "*"
   769prefix: /{self.name}/
   770service: http://{self.target.path.fqdn}
   771"""
   772        )
   773
   774    def queries(self):
   775        yield Query(self.parent.url("edge_stack/admin/"), expected=404)
   776        yield Query(self.parent.url(self.name + "/"), expected=200)
   777
   778    def check(self):
   779        # assert self.results[0].headers['X-Content-Type-Options'] == ['nosniff']
   780        assert "X-Content-Type-Options" not in self.results[1].headers
   781
   782
   783class RemoveReqHeadersMapping(MappingTest):
   784    parent: AmbassadorTest
   785    target: ServiceType
   786
   787    def init(self):
   788        MappingTest.init(self, HTTPBin())
   789
   790    def config(self) -> Generator[Union[str, Tuple[Node, str]], None, None]:
   791        yield self, self.format(
   792            """
   793---
   794apiVersion: getambassador.io/v3alpha1
   795kind: Mapping
   796name:  {self.name}
   797hostname: "*"
   798prefix: /{self.name}/
   799service: {self.target.path.fqdn}
   800remove_request_headers:
   801- zoo
   802- aoo
   803"""
   804        )
   805
   806    def queries(self):
   807        yield Query(
   808            self.parent.url(self.name + "/headers"),
   809            headers={"zoo": "ZooZ", "aoo": "AooA", "foo": "FooF"},
   810        )
   811
   812    def check(self):
   813        for r in self.results:
   814            # print(r.json)
   815            if "headers" in r.json:
   816                assert r.json["headers"]["Foo"] == "FooF"
   817                assert "Zoo" not in r.json["headers"]
   818                assert "Aoo" not in r.json["headers"]
   819
   820
   821class AddReqHeadersMapping(MappingTest):
   822    parent: AmbassadorTest
   823    target: ServiceType
   824
   825    @classmethod
   826    def variants(cls) -> Generator[Node, None, None]:
   827        for st in variants(ServiceType):
   828            yield cls(st, name="{self.target.name}")
   829
   830    def config(self) -> Generator[Union[str, Tuple[Node, str]], None, None]:
   831        yield self, self.format(
   832            """
   833---
   834apiVersion: getambassador.io/v3alpha1
   835kind: Mapping
   836name:  {self.name}
   837hostname: "*"
   838prefix: /{self.name}/
   839service: http://{self.target.path.fqdn}
   840add_request_headers:
   841    zoo:
   842        append: False
   843        value: Zoo
   844    aoo:
   845        append: True
   846        value: aoo
   847    boo:
   848        value: boo
   849    foo:
   850        value: Foo
   851"""
   852        )
   853
   854    def queries(self):
   855        yield Query(
   856            self.parent.url(self.name + "/"),
   857            headers={"zoo": "ZooZ", "aoo": "AooA", "boo": "BooB", "foo": "FooF"},
   858        )
   859
   860    def check(self):
   861        for r in self.results:
   862            if r.backend:
   863                assert r.backend.request.headers["zoo"] == ["Zoo"]
   864                assert r.backend.request.headers["aoo"] == ["AooA", "aoo"]
   865                assert r.backend.request.headers["boo"] == ["BooB", "boo"]
   866                assert r.backend.request.headers["foo"] == ["FooF", "Foo"]

View as plain text