...
1from typing import Generator, Tuple, Union
2
3from abstract_tests import HTTP, AmbassadorTest, Node, ServiceType
4from kat.harness import Query
5
6
7class AllowChunkedLengthTestTrue(AmbassadorTest):
8 target: ServiceType
9
10 def init(self):
11 self.target = HTTP(name="target")
12
13 def config(self) -> Generator[Union[str, Tuple[Node, str]], None, None]:
14 yield self, self.format(
15 """
16---
17apiVersion: ambassador
18kind: Module
19name: ambassador
20config:
21 allow_chunked_length: true
22---
23apiVersion: getambassador.io/v3alpha1
24kind: Mapping
25name: {self.target.path.k8s}-foo
26prefix: /foo/
27hostname: "*"
28service: {self.target.path.fqdn}
29"""
30 )
31
32 def queries(self):
33 yield Query(self.url("foo/"))
34 yield Query(self.url("ambassador/v0/diag/"))
35 yield Query(self.url("foo/"), headers={"content-length": "0", "transfer-encoding": "gzip"})
36 yield Query(
37 self.url("ambassador/v0/diag/"),
38 headers={"content-length": "0", "transfer-encoding": "gzip"},
39 )
40
41 def check(self):
42 # Not getting a 400 bad request is confirmation that this setting works as long as the request can reach the upstream
43 assert self.results[0].status == 200
44 assert self.results[1].status == 200
45 assert self.results[2].status == 200
46 assert self.results[3].status == 200
View as plain text