1import json
2from typing import ClassVar, Dict, Generator, Sequence, Tuple, Union
3
4import pytest
5
6from abstract_tests import MappingTest, Node, OptionTest
7from kat.harness import Query, Test
8
9# This is the place to add new OptionTests.
10
11
class AddRequestHeaders(OptionTest):
    """Option test that emits an ``add_request_headers`` setting.

    Each entry of ``VALUES`` maps one header name to a spec dict holding the
    header ``value`` and, for some entries, an ``append`` flag. ``check``
    compares only the ``value`` part against what the backend received.
    """

    parent: Test

    VALUES: ClassVar[Sequence[Dict[str, Dict[str, Union[str, bool]]]]] = [
        {"foo": {"value": "bar"}},
        {"moo": {"value": "arf"}},
        {"zoo": {"append": True, "value": "bar"}},
        {"xoo": {"append": False, "value": "dwe"}},
        {"aoo": {"value": "tyu"}},
    ]

    def config(self) -> Generator[Union[str, Tuple[Node, str]], None, None]:
        """Yield the ``add_request_headers`` line with ``self.value`` as JSON."""
        yield f"add_request_headers: {json.dumps(self.value)}"

    def check(self):
        """Assert each parent result's backend request carries every header.

        The backend's request headers are looked up by lower-cased name and
        are expected to be a one-element list containing the configured value.
        """
        for result in self.parent.results:
            for name, spec in self.value.items():
                received = result.backend.request.headers.get(name.lower())
                # A spec is normally a dict with a "value" key; a bare value
                # is also accepted and compared as-is.
                wanted = [spec["value"] if isinstance(spec, dict) else spec]
                assert received == wanted, (received, wanted)
35
36
class AddResponseHeaders(OptionTest):
    """Option test that emits an ``add_response_headers`` setting.

    Each entry of ``VALUES`` maps one header name to a spec dict holding the
    header ``value`` and, for some entries, an ``append`` flag. ``check``
    compares only the ``value`` part against the response headers.
    """

    parent: Test

    VALUES: ClassVar[Sequence[Dict[str, Dict[str, Union[str, bool]]]]] = [
        {"foo": {"value": "bar"}},
        {"moo": {"value": "arf"}},
        {"zoo": {"append": True, "value": "bar"}},
        {"xoo": {"append": False, "value": "dwe"}},
        {"aoo": {"value": "tyu"}},
    ]

    def config(self) -> Generator[Union[str, Tuple[Node, str]], None, None]:
        """Yield the ``add_response_headers`` line with ``self.value`` as JSON."""
        yield f"add_response_headers: {json.dumps(self.value)}"

    def check(self):
        """Assert every parent result's response carries each configured header.

        Header names are compared case-insensitively; each header is expected
        to be a one-element list containing the configured value.
        """
        for result in self.parent.results:
            # Response header names come back capitalized (reason unknown),
            # so index them by lower-cased name before looking anything up.
            by_lower_name = {name.lower(): vals for name, vals in result.headers.items()}

            for name, spec in self.value.items():
                received = by_lower_name.get(name.lower())
                # A spec is normally a dict with a "value" key; a bare value
                # is also accepted and compared as-is.
                wanted = spec["value"] if isinstance(spec, dict) else spec
                assert received == [wanted], "expected %s: %s but got %s" % (
                    name,
                    wanted,
                    by_lower_name,
                )
67
68
class UseWebsocket(OptionTest):
    """Option test that emits ``use_websocket: true``.

    This class defines no ``check`` of its own; see the TODO below.
    """

    # TODO: add a check with a websocket client as soon as we have backend support for it

    def config(self) -> Generator[Union[str, Tuple[Node, str]], None, None]:
        """Yield the single ``use_websocket`` config line."""
        option_line = "use_websocket: true"
        yield option_line
74
75
class CORS(OptionTest):
    """Option test that emits a ``cors`` setting allowing every origin.

    For each parent query it issues two requests to the same URL: one plain
    and one with an ``Origin`` header, then checks the CORS response header
    is present only on the second.
    """

    # isolated = True
    # debug = True

    # Note that there's also a GlobalCORSTest in t_cors.py.

    parent: MappingTest

    def config(self) -> Generator[Union[str, Tuple[Node, str]], None, None]:
        """Yield the ``cors`` config line with a wildcard origin list."""
        yield 'cors: { origins: ["*"] }'

    def queries(self):
        """Yield a plain query and an ``Origin``-bearing query per parent query."""
        for parent_query in self.parent.queries():
            url = parent_query.url
            # The plain query duplicates what the parent already sends.
            yield Query(url)
            yield Query(url, headers={"Origin": "https://www.test-cors.org"})

    def check(self):
        """Assert only the request with an ``Origin`` gets the CORS header back.

        Only ``self.results[0]`` (no ``Origin``) and ``self.results[1]``
        (with ``Origin``) are inspected. ``self.parent.results`` is also
        available for assertions but is not used here.
        """
        without_origin = self.results[0]
        assert without_origin.backend.name == self.parent.target.path.k8s
        # NOTE(review): this membership test is case-sensitive; open question
        # whether that is OK.
        assert "Access-Control-Allow-Origin" not in without_origin.headers

        with_origin = self.results[1]
        assert with_origin.backend.name == self.parent.target.path.k8s
        # NOTE(review): this lookup is case-sensitive too; same open question.
        allowed_origin = with_origin.headers["Access-Control-Allow-Origin"]
        assert allowed_origin == ["https://www.test-cors.org"]
103
104
class CaseSensitive(OptionTest):
    """Option test that emits ``case_sensitive: false``.

    It re-issues each parent query with everything from the first ``/`` after
    the scheme separator onward upper-cased.
    """

    parent: MappingTest

    def config(self) -> Generator[Union[str, Tuple[Node, str]], None, None]:
        """Yield the ``case_sensitive`` config line."""
        yield "case_sensitive: false"

    def queries(self):
        """Yield one query per parent query, with the URL tail upper-cased."""
        for parent_query in self.parent.queries():
            url = parent_query.url
            # Skip past "://" so the search finds the slash that follows the
            # authority rather than one of the scheme's own slashes.
            after_scheme = url.find("://") + 3
            path_start = url.find("/", after_scheme)
            prefix, tail = url[:path_start], url[path_start:]
            upped = prefix + tail.upper()
            # Guard against a URL whose tail has nothing to upper-case; the
            # query would then be identical to the parent's.
            assert upped != url
            yield Query(upped)
118
119
class AutoHostRewrite(OptionTest):
    """Option test that emits ``auto_host_rewrite: true``.

    ``check`` compares the host the backend saw, and the backend's own name,
    against the parent's target path.
    """

    parent: MappingTest

    def config(self) -> Generator[Union[str, Tuple[Node, str]], None, None]:
        """Yield the ``auto_host_rewrite`` config line."""
        yield "auto_host_rewrite: true"

    def check(self):
        """Assert each parent result reports the target's FQDN as request host.

        Every result must have a truthy ``backend`` and ``backend.request``.
        The request's ``host`` must equal ``parent.target.path.fqdn`` and the
        backend's ``name`` must equal ``parent.target.path.k8s``.
        """
        for result in self.parent.results:
            backend = result.backend
            assert backend
            assert backend.request
            echoed_host, responder = backend.request.host, backend.name

            target_path = self.parent.target.path
            assert echoed_host == target_path.fqdn
            assert responder == self.parent.target.path.k8s
136
137
class Rewrite(OptionTest):
    """Option test that emits a ``rewrite`` setting for each of ``VALUES``.

    Values that do not begin with ``/`` are treated as known-broken: the
    parent's pending queries are marked xfail and ``check`` xfails itself.
    """

    parent: MappingTest

    VALUES = ("/foo", "foo")

    def config(self) -> Generator[Union[str, Tuple[Node, str]], None, None]:
        """Yield the ``rewrite`` config line, formatted via ``self.format``."""
        yield self.format("rewrite: {self.value}")

    def queries(self):
        """Mark parent queries xfail for slash-less values, then defer upward.

        Returns whatever the ``queries`` implementation found after
        ``OptionTest`` in the MRO returns.
        """
        has_leading_slash = self.value[0] == "/"
        if not has_leading_slash:
            for pending_query in self.parent.pending:
                pending_query.xfail = "rewrite option is broken for values not beginning in slash"

        # NOTE(review): super(OptionTest, self) starts the lookup *after*
        # OptionTest, so any queries() defined on OptionTest itself is
        # skipped — confirm this is intended.
        return super(OptionTest, self).queries()

    def check(self):
        """Assert the backend saw the rewritten path on every parent result."""
        has_leading_slash = self.value[0] == "/"
        if not has_leading_slash:
            # pytest.xfail raises, so nothing below runs for these values.
            pytest.xfail("this is broken")

        expected_path = self.value
        for result in self.parent.results:
            assert result.backend.request.url.path == expected_path
160
161
class RemoveResponseHeaders(OptionTest):
    """Option test that emits ``remove_response_headers``.

    The setting lists ``x-envoy-upstream-service-time``; ``check`` asserts
    that header is absent from every parent result's response.
    """

    parent: Test

    def config(self) -> Generator[Union[str, Tuple[Node, str]], None, None]:
        """Yield the ``remove_response_headers`` config line."""
        yield "remove_response_headers: [x-envoy-upstream-service-time]"

    def check(self):
        """Assert the removed header is missing from every parent response.

        Header names are lower-cased before the lookup. Response header names
        can come back capitalized, in which case a lookup by the lower-case
        name on the raw mapping would miss a header that is still present and
        the assertion would pass without checking anything.
        """
        for r in self.parent.results:
            lowercased_headers = {k.lower(): v for k, v in r.headers.items()}
            assert (
                lowercased_headers.get("x-envoy-upstream-service-time") is None
            ), "x-envoy-upstream-service-time header was meant to be dropped but wasn't"
View as plain text