1from typing import Generator, Literal, Tuple, Union
2
3from abstract_tests import HTTP, RLSGRPC, AmbassadorTest, Node, ServiceType
4from ambassador import Config
5from kat.harness import Query
6from tests.selfsigned import TLSCerts
7
8
class RateLimitV0Test(AmbassadorTest):
    """Rate limit test driven by request-header labels on a Mapping.

    Three requests are sent to the ``/target/`` prefix: one without any of
    the labelled headers, one whose headers tell the dummy rate limit
    service to allow it, and one whose headers tell it to reject the
    request (expected status 429) and to append response headers.
    """

    # debug = True
    target: ServiceType
    rls: ServiceType

    def init(self):
        """Create the HTTP backend and the rate limit service for this test."""
        self.target = HTTP()
        self.rls = RLSGRPC()

    def config(self) -> Generator[Union[str, Tuple[Node, str]], None, None]:
        """Yield the Mapping resources, then the RateLimitService resource."""
        # The Mappings are yielded with self.target so that they are
        # annotated on the backend service, not on the Ambassador.
        mappings_yaml = self.format(
            """
---
apiVersion: getambassador.io/v3alpha1
kind: Mapping
name: ratelimit_target_mapping
hostname: "*"
prefix: /target/
service: {self.target.path.fqdn}
labels:
  ambassador:
    - request_label_group:
      - request_headers:
          key: x-ambassador-test-allow
          header_name: "x-ambassador-test-allow"
          omit_if_not_present: true
      - request_headers:
          key: x-ambassador-test-headers-append
          header_name: "x-ambassador-test-headers-append"
          omit_if_not_present: true
---
apiVersion: getambassador.io/v3alpha1
kind: Mapping
name: ratelimit_label_mapping
hostname: "*"
prefix: /labels/
service: {self.target.path.fqdn}
labels:
  ambassador:
    - host_and_user:
      - request_headers:
          key: custom-label
          header_name: ":authority"
          omit_if_not_present: true
      - request_headers:
          key: user
          header_name: "x-user"
          omit_if_not_present: true

    - omg_header:
      - request_headers:
          key: custom-label
          header_name: "x-omg"
          default: "OMFG!"
"""
        )
        yield self.target, mappings_yaml

        # The RateLimitService is yielded with the test node itself.
        rate_limit_yaml = self.format(
            """
---
apiVersion: getambassador.io/v3alpha1
kind: RateLimitService
name: {self.rls.path.k8s}
service: "{self.rls.path.fqdn}"
timeout_ms: 500
"""
        )
        yield self, rate_limit_yaml

    def queries(self):
        """Yield the three requests; ``check`` looks them up by index."""
        # [0]
        # None of the labelled headers are present, so the request won't
        # even go through the ratelimit-service filter.
        yield Query(self.url("target/"))

        # [1]
        # Headers instructing the dummy ratelimit-service to allow the request.
        allow_headers = {
            "x-ambassador-test-allow": "true",
            "x-ambassador-test-headers-append": "no header",
        }
        yield Query(self.url("target/"), expected=200, headers=allow_headers)

        # [2]
        # Headers instructing the dummy ratelimit-service to reject the
        # request with a custom response body.
        reject_headers = {
            "x-ambassador-test-allow": "over my dead body",
            "x-ambassador-test-headers-append": "Hello=Foo; Hi=Baz",
        }
        yield Query(self.url("target/"), expected=429, headers=reject_headers)

    def check(self):
        """Check the response headers of the rejected request, query [2]."""
        # The kat-server gRPC ratelimit implementation explicitly overrides
        # the content-type to json (the response is in fact json), so this
        # also verifies that such an override is possible and correct.
        rejected = self.results[2].headers
        assert rejected["Hello"] == ["Foo"]
        assert rejected["Hi"] == ["Baz"]
        assert rejected["Content-Type"] == ["application/json"]
        assert rejected["X-Grpc-Service-Protocol-Version"] == ["v2"]
122
123
class RateLimitV1Test(AmbassadorTest):
    """Rate limit test using a single label group on the ``/target/`` Mapping.

    Sends one request without labelled headers, one that the dummy rate
    limit service is told to allow, and one that it is told to reject
    (expected status 429) while appending response headers.
    """

    # debug = True
    target: ServiceType
    # Declared for consistency with the sibling rate limit tests; the
    # attribute itself is assigned in init().
    rls: ServiceType

    def init(self):
        """Create the HTTP backend and the rate limit service for this test."""
        self.target = HTTP()
        self.rls = RLSGRPC()

    def config(self) -> Generator[Union[str, Tuple[Node, str]], None, None]:
        """Yield the Mapping resource, then the RateLimitService resource."""
        # Use self.target here, because we want this mapping to be annotated
        # on the service, not the Ambassador.
        yield self.target, self.format(
            """
---
apiVersion: getambassador.io/v3alpha1
kind: Mapping
name: ratelimit_target_mapping
hostname: "*"
prefix: /target/
service: {self.target.path.fqdn}
labels:
  ambassador:
    - request_label_group:
      - request_headers:
          key: x-ambassador-test-allow
          header_name: "x-ambassador-test-allow"
          omit_if_not_present: true
      - request_headers:
          key: x-ambassador-test-headers-append
          header_name: "x-ambassador-test-headers-append"
          omit_if_not_present: true
"""
        )

        # The RateLimitService is yielded with the test node itself.
        yield self, self.format(
            """
---
apiVersion: getambassador.io/v3alpha1
kind: RateLimitService
name: {self.rls.path.k8s}
service: "{self.rls.path.fqdn}"
timeout_ms: 500
"""
        )

    def queries(self):
        """Yield the three requests; ``check`` looks them up by index."""
        # [0]
        # No matching headers, won't even go through ratelimit-service filter
        yield Query(self.url("target/"))

        # [1]
        # Header instructing dummy ratelimit-service to allow request
        yield Query(
            self.url("target/"),
            expected=200,
            headers={
                "x-ambassador-test-allow": "true",
                "x-ambassador-test-headers-append": "no header",
            },
        )

        # [2]
        # Header instructing dummy ratelimit-service to reject request
        yield Query(
            self.url("target/"),
            expected=429,
            headers={
                "x-ambassador-test-allow": "over my dead body",
                "x-ambassador-test-headers-append": "Hello=Foo; Hi=Baz",
            },
        )

    def check(self):
        """Check the response headers of the rejected request, query [2]."""
        # [2] Verifies the 429 response and the proper content-type.
        # The kat-server gRPC ratelimit implementation explicitly overrides
        # the content-type to json, because the response is in fact json
        # and we need to verify that this override is possible/correct.
        assert self.results[2].headers["Hello"] == ["Foo"]
        assert self.results[2].headers["Hi"] == ["Baz"]
        assert self.results[2].headers["Content-Type"] == ["application/json"]
        assert self.results[2].headers["X-Grpc-Service-Protocol-Version"] == ["v2"]
205
206
class RateLimitV1WithTLSTest(AmbassadorTest):
    """Rate limit test whose RateLimitService references a TLSContext.

    A ``kubernetes.io/tls`` Secret is added to the manifests, a TLSContext
    named ``ratelimit-tls-context`` points at that Secret, and the
    RateLimitService selects the context through its ``tls`` field. The
    requests and checks mirror the other rate limit tests in this file.
    """

    # debug = True
    target: ServiceType
    # Declared for consistency with the sibling rate limit tests; the
    # attribute itself is assigned in init().
    rls: ServiceType

    def init(self):
        """Create the HTTP backend and the rate limit service for this test."""
        self.target = HTTP()
        self.rls = RLSGRPC()

    def manifests(self) -> str:
        """Return the TLS Secret manifest followed by the inherited manifests."""
        return (
            f"""
---
apiVersion: v1
data:
  tls.crt: {TLSCerts["ratelimit.datawire.io"].k8s_crt}
  tls.key: {TLSCerts["ratelimit.datawire.io"].k8s_key}
kind: Secret
metadata:
  name: ratelimit-tls-secret
type: kubernetes.io/tls
"""
            + super().manifests()
        )

    def config(self) -> Generator[Union[str, Tuple[Node, str]], None, None]:
        """Yield the TLSContext and Mapping, then the RateLimitService."""
        # Use self.target here, because we want this mapping to be annotated
        # on the service, not the Ambassador.
        yield self.target, self.format(
            """
---
apiVersion: getambassador.io/v3alpha1
kind: TLSContext
name: ratelimit-tls-context
secret: ratelimit-tls-secret
alpn_protocols: h2
---
apiVersion: getambassador.io/v3alpha1
kind: Mapping
name: ratelimit_target_mapping
hostname: "*"
prefix: /target/
service: {self.target.path.fqdn}
labels:
  ambassador:
    - request_label_group:
      - request_headers:
          key: x-ambassador-test-allow
          header_name: "x-ambassador-test-allow"
          omit_if_not_present: true
      - request_headers:
          key: x-ambassador-test-headers-append
          header_name: "x-ambassador-test-headers-append"
          omit_if_not_present: true
"""
        )

        # The RateLimitService is yielded with the test node itself and
        # names the TLSContext defined above in its `tls` field.
        yield self, self.format(
            """
---
apiVersion: getambassador.io/v3alpha1
kind: RateLimitService
name: {self.rls.path.k8s}
service: "{self.rls.path.fqdn}"
timeout_ms: 500
tls: ratelimit-tls-context
"""
        )

    def queries(self):
        """Yield the three requests; ``check`` looks them up by index."""
        # [0]
        # No matching headers, won't even go through ratelimit-service filter
        yield Query(self.url("target/"))

        # [1]
        # Header instructing dummy ratelimit-service to allow request
        yield Query(self.url("target/"), expected=200, headers={"x-ambassador-test-allow": "true"})

        # [2]
        # Header instructing dummy ratelimit-service to reject request
        yield Query(
            self.url("target/"),
            expected=429,
            headers={
                "x-ambassador-test-allow": "nope",
                "x-ambassador-test-headers-append": "Hello=Foo; Hi=Baz",
            },
        )

    def check(self):
        """Check the response headers of the rejected request, query [2]."""
        # [2] Verifies the 429 response and the proper content-type.
        # The kat-server gRPC ratelimit implementation explicitly overrides
        # the content-type to json, because the response is in fact json
        # and we need to verify that this override is possible/correct.
        assert self.results[2].headers["Hello"] == ["Foo"]
        assert self.results[2].headers["Hi"] == ["Baz"]
        assert self.results[2].headers["Content-Type"] == ["application/json"]
        assert self.results[2].headers["X-Grpc-Service-Protocol-Version"] == ["v2"]
301
302
class RateLimitVerTest(AmbassadorTest):
    """Rate limit test parameterized over the RateLimitService protocol version.

    One variant is produced for each of ``"v2"``, ``"v3"`` and ``"default"``.
    The ``"default"`` variant leaves ``protocol_version`` out of the
    RateLimitService and expects the same result as ``"v2"``.
    """

    # debug = True
    target: ServiceType
    specified_protocol_version: Literal["v2", "v3", "default"]
    expected_protocol_version: Literal["v2", "v3"]
    rls: ServiceType

    @classmethod
    def variants(cls) -> Generator[Node, None, None]:
        """Yield one test node per protocol version setting."""
        for version in ("v2", "v3", "default"):
            # `name` is passed as a template string, not an f-string.
            yield cls(version, name="{self.specified_protocol_version}")

    def init(self, protocol_version: Literal["v2", "v3", "default"]):
        """Record the requested version and create the backend services.

        Args:
            protocol_version: The value to configure on the RateLimitService,
                or ``"default"`` to leave the field unset.
        """
        self.target = HTTP()
        self.specified_protocol_version = protocol_version

        # An unspecified protocol version is expected to behave like v2.
        if protocol_version == "default":
            self.expected_protocol_version = "v2"
        else:
            self.expected_protocol_version = protocol_version

        # The v3 variant is skipped when Envoy is configured for the V2 API.
        wants_v3 = self.expected_protocol_version == "v3"
        if Config.envoy_api_version == "V2" and wants_v3:
            self.skip_node = True

        self.rls = RLSGRPC(protocol_version=self.expected_protocol_version)

    def config(self) -> Generator[Union[str, Tuple[Node, str]], None, None]:
        """Yield the Mapping resource, then the RateLimitService resource."""
        # The Mapping is yielded with self.target so that it is annotated
        # on the backend service, not on the Ambassador.
        mapping_yaml = self.format(
            """
---
apiVersion: getambassador.io/v3alpha1
kind: Mapping
name: ratelimit_target_mapping
hostname: "*"
prefix: /target/
service: {self.target.path.fqdn}
labels:
  ambassador:
    - request_label_group:
      - request_headers:
          key: x-ambassador-test-allow
          header_name: "x-ambassador-test-allow"
          omit_if_not_present: true
      - request_headers:
          key: x-ambassador-test-headers-append
          header_name: "x-ambassador-test-headers-append"
          omit_if_not_present: true
"""
        )
        yield self.target, mapping_yaml

        rate_limit_yaml = self.format(
            """
---
apiVersion: getambassador.io/v3alpha1
kind: RateLimitService
name: {self.rls.path.k8s}
service: "{self.rls.path.fqdn}"
timeout_ms: 500
"""
        )
        # Only name a protocol version when the variant asks for one.
        if self.specified_protocol_version != "default":
            rate_limit_yaml += f"protocol_version: '{self.specified_protocol_version}'"
        yield self, rate_limit_yaml

    def queries(self):
        """Yield the three requests; ``check`` looks them up by index."""
        # [0]
        # None of the labelled headers are present, so the request won't
        # even go through the ratelimit-service filter.
        yield Query(self.url("target/"))

        # [1]
        # Headers instructing the dummy ratelimit-service to allow the request.
        allow_headers = {
            "x-ambassador-test-allow": "true",
            "x-ambassador-test-headers-append": "no header",
        }
        yield Query(self.url("target/"), expected=200, headers=allow_headers)

        # [2]
        # Headers instructing the dummy ratelimit-service to reject the request.
        reject_headers = {
            "x-ambassador-test-allow": "over my dead body",
            "x-ambassador-test-headers-append": "Hello=Foo; Hi=Baz",
        }
        yield Query(self.url("target/"), expected=429, headers=reject_headers)

    def check(self):
        """Check the response headers of the rejected request, query [2]."""
        # The kat-server gRPC ratelimit implementation explicitly overrides
        # the content-type to json (the response is in fact json), so this
        # also verifies that such an override is possible and correct.
        rejected = self.results[2].headers
        assert rejected["Hello"] == ["Foo"]
        assert rejected["Hi"] == ["Baz"]
        assert rejected["Content-Type"] == ["application/json"]
        # The reported protocol version must match what this variant expects.
        expected_version = [self.expected_protocol_version]
        assert rejected["X-Grpc-Service-Protocol-Version"] == expected_version
View as plain text