...

Text file src/github.com/emissary-ingress/emissary/v3/python/tests/kat/t_chunked_length.py

Documentation: github.com/emissary-ingress/emissary/v3/python/tests/kat

     1from typing import Generator, Tuple, Union
     2
     3from abstract_tests import HTTP, AmbassadorTest, Node, ServiceType
     4from kat.harness import Query
     5
     6
     7class AllowChunkedLengthTestTrue(AmbassadorTest):
     8    target: ServiceType
     9
    10    def init(self):
    11        self.target = HTTP(name="target")
    12
    13    def config(self) -> Generator[Union[str, Tuple[Node, str]], None, None]:
    14        yield self, self.format(
    15            """
    16---
    17apiVersion: ambassador
    18kind:  Module
    19name:  ambassador
    20config:
    21  allow_chunked_length: true
    22---
    23apiVersion: getambassador.io/v3alpha1
    24kind:  Mapping
    25name:  {self.target.path.k8s}-foo
    26prefix: /foo/
    27hostname: "*"
    28service: {self.target.path.fqdn}
    29"""
    30        )
    31
    32    def queries(self):
    33        yield Query(self.url("foo/"))
    34        yield Query(self.url("ambassador/v0/diag/"))
    35        yield Query(self.url("foo/"), headers={"content-length": "0", "transfer-encoding": "gzip"})
    36        yield Query(
    37            self.url("ambassador/v0/diag/"),
    38            headers={"content-length": "0", "transfer-encoding": "gzip"},
    39        )
    40
    41    def check(self):
    42        # Not getting a 400 bad request is confirmation that this setting works as long as the request can reach the upstream
    43        assert self.results[0].status == 200
    44        assert self.results[1].status == 200
    45        assert self.results[2].status == 200
    46        assert self.results[3].status == 200

View as plain text