1from typing import Generator, Literal, Tuple, Union, cast
2
3from abstract_tests import HTTP, RLSGRPC, AmbassadorTest, Node, ServiceType
4from kat.harness import Query
5from tests.selfsigned import TLSCerts
6
7
class RateLimitV0Test(AmbassadorTest):
    """Exercise a gRPC RateLimitService configured with ``protocol_version: "v3"``.

    Two Mappings carrying rate-limit labels are attached to the target
    service; only the ``/target/`` Mapping is queried by this test.
    """

    # debug = True
    target: ServiceType
    rls: ServiceType

    def init(self):
        """Create the upstream HTTP target and the gRPC rate-limit backend."""
        self.target = HTTP()
        self.rls = RLSGRPC()

    def config(self) -> Generator[Union[str, Tuple[Node, str]], None, None]:
        """Yield the labelled Mappings (on the target) and the RateLimitService (on self)."""
        # The Mappings are yielded with self.target so that they are annotated
        # on the service rather than on the Ambassador.
        mappings_yaml = self.format(
            """
---
apiVersion: getambassador.io/v3alpha1
kind: Mapping
name: ratelimit_target_mapping
hostname: "*"
prefix: /target/
service: {self.target.path.fqdn}
labels:
  ambassador:
    - request_label_group:
      - request_headers:
          key: kat-req-rls-allow
          header_name: "kat-req-rls-allow"
          omit_if_not_present: true
      - request_headers:
          key: kat-req-rls-headers-append
          header_name: "kat-req-rls-headers-append"
          omit_if_not_present: true
---
apiVersion: getambassador.io/v3alpha1
kind: Mapping
name: ratelimit_label_mapping
hostname: "*"
prefix: /labels/
service: {self.target.path.fqdn}
labels:
  ambassador:
    - host_and_user:
      - request_headers:
          key: custom-label
          header_name: ":authority"
          omit_if_not_present: true
      - request_headers:
          key: user
          header_name: "x-user"
          omit_if_not_present: true

    - omg_header:
      - request_headers:
          key: custom-label
          header_name: "x-omg"
          default: "OMFG!"
"""
        )
        yield self.target, mappings_yaml

        # The RateLimitService itself is configured on the Ambassador.
        rate_limit_service_yaml = self.format(
            """
---
apiVersion: getambassador.io/v3alpha1
kind: RateLimitService
name: {self.rls.path.k8s}
service: "{self.rls.path.fqdn}"
timeout_ms: 500
protocol_version: "v3"
"""
        )
        yield self, rate_limit_service_yaml

    def queries(self):
        """Yield three requests to ``/target/``: unlabelled, allowed, and rejected."""
        # [0]
        # No matching headers, won't even go through ratelimit-service filter
        yield Query(self.url("target/"))

        # [1]
        # Header instructing dummy ratelimit-service to allow request
        allow_headers = {
            "kat-req-rls-allow": "true",
            "kat-req-rls-headers-append": "no header",
        }
        yield Query(self.url("target/"), expected=200, headers=allow_headers)

        # [2]
        # Header instructing dummy ratelimit-service to reject request with
        # a custom response body
        reject_headers = {
            "kat-req-rls-allow": "over my dead body",
            "kat-req-rls-headers-append": "Hello=Foo; Hi=Baz",
        }
        yield Query(self.url("target/"), expected=429, headers=reject_headers)

    def check(self):
        """Verify the headers returned with the rejected request, query [2]."""
        # [2] Verifies the 429 response and the proper content-type.
        # The kat-server gRPC ratelimit implementation explicitly overrides
        # the content-type to json, because the response is in fact json
        # and we need to verify that this override is possible/correct.
        rejected = self.results[2]
        expected_headers = (
            ("Hello", ["Foo"]),
            ("Hi", ["Baz"]),
            ("Content-Type", ["application/json"]),
            ("Kat-Resp-Rls-Protocol-Version", ["v3"]),
        )
        for header_name, header_values in expected_headers:
            assert rejected.headers[header_name] == header_values
122
123
class RateLimitV1Test(AmbassadorTest):
    """Exercise a gRPC RateLimitService (``protocol_version: "v3"``) behind one Mapping.

    A single ``/target/`` Mapping forwards the ``kat-req-rls-allow`` and
    ``kat-req-rls-headers-append`` request headers as rate-limit labels.
    """

    # debug = True
    target: ServiceType
    # Declared alongside `target` because init() assigns both attributes.
    rls: ServiceType

    def init(self):
        """Create the upstream HTTP target and the gRPC rate-limit backend."""
        self.target = HTTP()
        self.rls = RLSGRPC()

    def config(self) -> Generator[Union[str, Tuple[Node, str]], None, None]:
        """Yield the labelled Mapping (on the target) and the RateLimitService (on self)."""
        # Use self.target here, because we want this mapping to be annotated
        # on the service, not the Ambassador.
        yield self.target, self.format(
            """
---
apiVersion: getambassador.io/v3alpha1
kind: Mapping
name: ratelimit_target_mapping
hostname: "*"
prefix: /target/
service: {self.target.path.fqdn}
labels:
  ambassador:
    - request_label_group:
      - request_headers:
          key: kat-req-rls-allow
          header_name: "kat-req-rls-allow"
          omit_if_not_present: true
      - request_headers:
          key: kat-req-rls-headers-append
          header_name: "kat-req-rls-headers-append"
          omit_if_not_present: true
"""
        )

        # The RateLimitService itself is configured on the Ambassador.
        yield self, self.format(
            """
---
apiVersion: getambassador.io/v3alpha1
kind: RateLimitService
name: {self.rls.path.k8s}
service: "{self.rls.path.fqdn}"
timeout_ms: 500
protocol_version: "v3"
"""
        )

    def queries(self):
        """Yield three requests to ``/target/``: unlabelled, allowed, and rejected."""
        # [0]
        # No matching headers, won't even go through ratelimit-service filter
        yield Query(self.url("target/"))

        # [1]
        # Header instructing dummy ratelimit-service to allow request
        yield Query(
            self.url("target/"),
            expected=200,
            headers={
                "kat-req-rls-allow": "true",
                "kat-req-rls-headers-append": "no header",
            },
        )

        # [2]
        # Header instructing dummy ratelimit-service to reject request
        yield Query(
            self.url("target/"),
            expected=429,
            headers={
                "kat-req-rls-allow": "over my dead body",
                "kat-req-rls-headers-append": "Hello=Foo; Hi=Baz",
            },
        )

    def check(self):
        """Verify the headers returned with the rejected request, query [2]."""
        # [2] Verifies the 429 response and the proper content-type.
        # The kat-server gRPC ratelimit implementation explicitly overrides
        # the content-type to json, because the response is in fact json
        # and we need to verify that this override is possible/correct.
        assert self.results[2].headers["Hello"] == ["Foo"]
        assert self.results[2].headers["Hi"] == ["Baz"]
        assert self.results[2].headers["Content-Type"] == ["application/json"]
        assert self.results[2].headers["Kat-Resp-Rls-Protocol-Version"] == ["v3"]
206
207
class RateLimitV1WithTLSTest(AmbassadorTest):
    """Exercise a gRPC RateLimitService that references a TLSContext.

    The test adds a TLS Secret manifest, a TLSContext that uses it, and a
    RateLimitService whose ``tls`` field names that context.
    """

    # debug = True
    target: ServiceType
    # Declared alongside `target` because init() assigns both attributes.
    rls: ServiceType

    def init(self):
        """Create the upstream HTTP target and the gRPC rate-limit backend."""
        self.target = HTTP()
        self.rls = RLSGRPC()

    def manifests(self) -> str:
        """Return the TLS Secret manifest followed by the base class manifests."""
        return (
            f"""
---
apiVersion: v1
data:
  tls.crt: {TLSCerts["ratelimit.datawire.io"].k8s_crt}
  tls.key: {TLSCerts["ratelimit.datawire.io"].k8s_key}
kind: Secret
metadata:
  name: ratelimit-tls-secret
type: kubernetes.io/tls
"""
            + super().manifests()
        )

    def config(self) -> Generator[Union[str, Tuple[Node, str]], None, None]:
        """Yield the TLSContext and Mapping (on the target) and the RateLimitService (on self)."""
        # Use self.target here, because we want this mapping to be annotated
        # on the service, not the Ambassador.
        yield self.target, self.format(
            """
---
apiVersion: getambassador.io/v3alpha1
kind: TLSContext
name: ratelimit-tls-context
secret: ratelimit-tls-secret
alpn_protocols: h2
---
apiVersion: getambassador.io/v3alpha1
kind: Mapping
name: ratelimit_target_mapping
hostname: "*"
prefix: /target/
service: {self.target.path.fqdn}
labels:
  ambassador:
    - request_label_group:
      - request_headers:
          key: kat-req-rls-allow
          header_name: "kat-req-rls-allow"
          omit_if_not_present: true
      - request_headers:
          key: kat-req-rls-headers-append
          header_name: "kat-req-rls-headers-append"
          omit_if_not_present: true
"""
        )

        # The RateLimitService is configured on the Ambassador and points at
        # the TLSContext declared above through its `tls` field.
        yield self, self.format(
            """
---
apiVersion: getambassador.io/v3alpha1
kind: RateLimitService
name: {self.rls.path.k8s}
service: "{self.rls.path.fqdn}"
timeout_ms: 500
tls: ratelimit-tls-context
protocol_version: "v3"
"""
        )

    def queries(self):
        """Yield three requests to ``/target/``: unlabelled, allowed, and rejected."""
        # [0]
        # No matching headers, won't even go through ratelimit-service filter
        yield Query(self.url("target/"))

        # [1]
        # Header instructing dummy ratelimit-service to allow request
        yield Query(self.url("target/"), expected=200, headers={"kat-req-rls-allow": "true"})

        # [2]
        # Header instructing dummy ratelimit-service to reject request
        yield Query(
            self.url("target/"),
            expected=429,
            headers={
                "kat-req-rls-allow": "nope",
                "kat-req-rls-headers-append": "Hello=Foo; Hi=Baz",
            },
        )

    def check(self):
        """Verify the headers returned with the rejected request, query [2]."""
        # [2] Verifies the 429 response and the proper content-type.
        # The kat-server gRPC ratelimit implementation explicitly overrides
        # the content-type to json, because the response is in fact json
        # and we need to verify that this override is possible/correct.
        assert self.results[2].headers["Hello"] == ["Foo"]
        assert self.results[2].headers["Hi"] == ["Baz"]
        assert self.results[2].headers["Content-Type"] == ["application/json"]
        assert self.results[2].headers["Kat-Resp-Rls-Protocol-Version"] == ["v3"]
303
304
class RateLimitVerTest(AmbassadorTest):
    """Check how the RateLimitService ``protocol_version`` setting is handled.

    One variant is created per specified version ("v2", "v3", "default").
    Only "v3" is expected to work; every other value is recorded as
    "invalid", in which case the rejection query is expected to return 200
    without any of the rate-limit response headers.
    """

    # debug = True
    target: ServiceType
    specified_protocol_version: Literal["v2", "v3", "default"]
    expected_protocol_version: Literal["v3", "invalid"]
    rls: ServiceType

    @classmethod
    def variants(cls) -> Generator[Node, None, None]:
        """Yield one test instance per protocol version under test."""
        yield from (
            cls(version, name="{self.specified_protocol_version}")
            for version in ("v2", "v3", "default")
        )

    def init(self, protocol_version: Literal["v2", "v3", "default"]):
        """Record the specified version, derive the expected one, and create the services."""
        self.target = HTTP()
        self.specified_protocol_version = protocol_version

        outcome = "v3" if protocol_version == "v3" else "invalid"
        self.expected_protocol_version = cast(Literal["v3", "invalid"], outcome)

        # The backend is always created with a usable version: the expected
        # one when it is valid, otherwise "v3".
        backend_version = "v3" if self.expected_protocol_version == "invalid" else outcome
        self.rls = RLSGRPC(protocol_version=backend_version)

    def config(self) -> Generator[Union[str, Tuple[Node, str]], None, None]:
        """Yield the labelled Mapping (on the target) and the RateLimitService (on self)."""
        # The Mapping is yielded with self.target so that it is annotated on
        # the service rather than on the Ambassador.
        mapping_yaml = self.format(
            """
---
apiVersion: getambassador.io/v3alpha1
kind: Mapping
name: ratelimit_target_mapping
hostname: "*"
prefix: /target/
service: {self.target.path.fqdn}
labels:
  ambassador:
    - request_label_group:
      - request_headers:
          key: kat-req-rls-allow
          header_name: "kat-req-rls-allow"
          omit_if_not_present: true
      - request_headers:
          key: kat-req-rls-headers-append
          header_name: "kat-req-rls-headers-append"
          omit_if_not_present: true
"""
        )
        yield self.target, mapping_yaml

        rate_limit_service_yaml = self.format(
            """
---
apiVersion: getambassador.io/v3alpha1
kind: RateLimitService
name: {self.rls.path.k8s}
service: "{self.rls.path.fqdn}"
timeout_ms: 500
"""
        )
        # The "default" variant leaves protocol_version out of the resource
        # entirely; the other variants append it explicitly.
        if self.specified_protocol_version != "default":
            rate_limit_service_yaml += f"protocol_version: '{self.specified_protocol_version}'"
        yield self, rate_limit_service_yaml

    def queries(self):
        """Yield three requests to ``/target/``: unlabelled, allowed, and rejected."""
        # [0]
        # No matching headers, won't even go through ratelimit-service filter
        yield Query(self.url("target/"))

        # [1]
        # Header instructing dummy ratelimit-service to allow request
        allow_headers = {
            "kat-req-rls-allow": "true",
            "kat-req-rls-headers-append": "no header",
        }
        yield Query(self.url("target/"), expected=200, headers=allow_headers)

        # [2]
        # Header instructing dummy ratelimit-service to reject request; with
        # an invalid protocol version the request is expected to succeed.
        reject_status = 200 if self.expected_protocol_version == "invalid" else 429
        reject_headers = {
            "kat-req-rls-allow": "over my dead body",
            "kat-req-rls-headers-append": "Hello=Foo; Hi=Baz",
        }
        yield Query(self.url("target/"), expected=reject_status, headers=reject_headers)

    def check(self):
        """Verify the response headers of query [2] for the expected protocol outcome."""
        if self.expected_protocol_version == "invalid":
            # all queries should succeed because the rate-limit filter was dropped, due to bad protocol
            for absent_header in ("Hello", "Hi", "Kat-Resp-Rls-Protocol-Version"):
                assert absent_header not in self.results[2].headers
            return

        # [2] Verifies the 429 response and the proper content-type.
        # The kat-server gRPC ratelimit implementation explicitly overrides
        # the content-type to json, because the response is in fact json
        # and we need to verify that this override is possible/correct.
        rejected = self.results[2]
        expected_headers = (
            ("Hello", ["Foo"]),
            ("Hi", ["Baz"]),
            ("Content-Type", ["application/json"]),
            ("Kat-Resp-Rls-Protocol-Version", [self.expected_protocol_version]),
        )
        for header_name, header_values in expected_headers:
            assert rejected.headers[header_name] == header_values
View as plain text