1from typing import Generator, Tuple, Union
2
3import pytest
4
5from abstract_tests import HTTP, AmbassadorTest, Node, ServiceType, StatsDSink
6from kat.harness import Query
7
8
9class CircuitBreakingTest(AmbassadorTest):
10 target: ServiceType
11 statsd: ServiceType
12
13 TARGET_CLUSTER = "cluster_circuitbreakingtest_http_cbdc1p1"
14
15 def init(self):
16 self.target = HTTP()
17 self.statsd = StatsDSink(target_cluster=self.TARGET_CLUSTER)
18
19 def manifests(self) -> str:
20 self.manifest_envs += f"""
21 - name: STATSD_ENABLED
22 value: 'true'
23 - name: STATSD_HOST
24 value: '{self.statsd.path.fqdn}'
25"""
26 return super().manifests()
27
28 def config(self) -> Generator[Union[str, Tuple[Node, str]], None, None]:
29 yield self, self.format(
30 """
31---
32apiVersion: getambassador.io/v3alpha1
33kind: Mapping
34name: {self.target.path.k8s}-pr
35hostname: "*"
36prefix: /{self.name}-pr/
37service: {self.target.path.fqdn}
38circuit_breakers:
39- priority: default
40 max_pending_requests: 1
41 max_connections: 1
42---
43apiVersion: getambassador.io/v3alpha1
44kind: Mapping
45name: {self.name}-reset
46case_sensitive: false
47hostname: "*"
48prefix: /reset/
49rewrite: /RESET/
50service: {self.statsd.path.fqdn}
51---
52apiVersion: getambassador.io/v3alpha1
53kind: Mapping
54name: {self.name}-dump
55case_sensitive: false
56hostname: "*"
57prefix: /dump/
58rewrite: /DUMP/
59service: {self.statsd.path.fqdn}
60"""
61 )
62
63 def requirements(self):
64 yield from super().requirements()
65 yield ("url", Query(self.url(self.name) + "-pr/"))
66 yield ("url", Query(self.url("RESET/")))
67
68 def queries(self):
69 # Reset the statsd setup in phase 1...
70 yield Query(self.url("RESET/"), phase=1)
71
72 # Run 200 queries in phase 2, after the reset...
73 for i in range(200):
74 yield Query(
75 self.url(self.name) + "-pr/",
76 headers={"Requested-Backend-Delay": "1000"},
77 ignore_result=True,
78 phase=2,
79 )
80
81 # ...then 200 more queries in phase 3. Why the split? Because we get flakes if we
82 # try to ram 500 through at once (in the middle of the run, we get some connections
83 # that time out).
84
85 for i in range(200):
86 yield Query(
87 self.url(self.name) + "-pr/",
88 headers={"Requested-Backend-Delay": "1000"},
89 ignore_result=True,
90 phase=3,
91 )
92
93 # Dump the results in phase 4, after the queries.
94 yield Query(self.url("DUMP/"), phase=4)
95
96 def check(self):
97 result_count = len(self.results)
98
99 failures = []
100
101 if result_count != 402:
102 failures.append(f"wanted 402 results, got {result_count}")
103 else:
104 pending_results = self.results[1:400]
105 stats = self.results[401].json or {}
106
107 # pending requests tests
108 pending_overloaded = 0
109 error = 0
110
111 # printed = False
112
113 for result in pending_results:
114 # if not printed:
115 # import json
116 # print(json.dumps(result.as_dict(), sort_keys=True, indent=2))
117 # printed = True
118
119 if result.error:
120 error += 1
121 elif "X-Envoy-Overloaded" in result.headers:
122 pending_overloaded += 1
123
124 failed = False
125
126 if not 300 < pending_overloaded < 400:
127 failures.append(
128 f"Expected between 300 and 400 overloaded, got {pending_overloaded}"
129 )
130
131 cluster_stats = stats.get(self.__class__.TARGET_CLUSTER, {})
132 rq_completed = cluster_stats.get("upstream_rq_completed", -1)
133 rq_pending_overflow = cluster_stats.get("upstream_rq_pending_overflow", -1)
134
135 if error != 0:
136 failures.append(f"Expected no errors but got {error}")
137
138 if rq_completed != 400:
139 failures.append(
140 f"Expected 400 completed requests to {self.__class__.TARGET_CLUSTER}, got {rq_completed}"
141 )
142
143 if abs(pending_overloaded - rq_pending_overflow) >= 2:
144 failures.append(
145 f"Expected {pending_overloaded} rq_pending_overflow, got {rq_pending_overflow}"
146 )
147
148 if failures:
149 print("%s FAILED:\n %s" % (self.name, "\n ".join(failures)))
150 pytest.xfail(f"FFS {self.name}")
151
152
153class GlobalCircuitBreakingTest(AmbassadorTest):
154 target: ServiceType
155
156 def init(self):
157 self.target = HTTP()
158
159 def config(self) -> Generator[Union[str, Tuple[Node, str]], None, None]:
160 yield self, self.format(
161 """
162---
163apiVersion: getambassador.io/v3alpha1
164kind: Host
165name: cleartext-host
166port: 8080
167protocol: HTTP
168securityModel: INSECURE
169requestPolicy:
170 insecure:
171 action: Route
172---
173apiVersion: getambassador.io/v3alpha1
174kind: Mapping
175name: {self.target.path.k8s}-pr
176hostname: "*"
177prefix: /{self.name}-pr/
178service: {self.target.path.fqdn}
179circuit_breakers:
180- priority: default
181 max_pending_requests: 1024
182 max_connections: 1024
183---
184apiVersion: getambassador.io/v3alpha1
185kind: Mapping
186name: {self.target.path.k8s}-normal
187hostname: "*"
188prefix: /{self.name}-normal/
189service: {self.target.path.fqdn}
190---
191apiVersion: getambassador.io/v3alpha1
192kind: Module
193name: ambassador
194config:
195 circuit_breakers:
196 - priority: default
197 max_pending_requests: 5
198 max_connections: 5
199"""
200 )
201
202 def requirements(self):
203 yield from super().requirements()
204 yield ("url", Query(self.url(self.name) + "-pr/"))
205 yield ("url", Query(self.url(self.name) + "-normal/"))
206
207 def queries(self):
208 for i in range(200):
209 yield Query(
210 self.url(self.name) + "-pr/",
211 headers={"Requested-Backend-Delay": "1000"},
212 ignore_result=True,
213 phase=1,
214 )
215 for i in range(200):
216 yield Query(
217 self.url(self.name) + "-normal/",
218 headers={"Requested-Backend-Delay": "1000"},
219 ignore_result=True,
220 phase=1,
221 )
222
223 def check(self):
224 failures = []
225
226 if len(self.results) != 400:
227 failures.append(f"wanted 400 results, got {len(self.results)}")
228 else:
229 cb_mapping_results = self.results[0:200]
230 normal_mapping_results = self.results[200:400]
231
232 # '-pr' mapping tests: this is a priority class of connection
233 pr_mapping_overloaded = 0
234
235 for result in cb_mapping_results:
236 if "X-Envoy-Overloaded" in result.headers:
237 pr_mapping_overloaded += 1
238
239 if pr_mapping_overloaded != 0:
240 failures.append(f"[GCR] expected no -pr overloaded, got {pr_mapping_overloaded}")
241
242 # '-normal' mapping tests: global configuration should be in effect
243 normal_overloaded = 0
244 # printed = False
245
246 for result in normal_mapping_results:
247 # if not printed:
248 # import json
249 # print(json.dumps(result.as_dict(), sort_keys=True, indent=2))
250 # printed = True
251
252 if "X-Envoy-Overloaded" in result.headers:
253 normal_overloaded += 1
254
255 if not 100 < normal_overloaded < 200:
256 failures.append(
257 f"[GCF] expected 100-200 normal_overloaded, got {normal_overloaded}"
258 )
259
260 if failures:
261 print("%s FAILED:\n %s" % (self.name, "\n ".join(failures)))
262 pytest.xfail(f"FFS {self.name}")
263
264
265class CircuitBreakingTCPTest(AmbassadorTest):
266 extra_ports = [6789, 6790]
267
268 target1: ServiceType
269 target2: ServiceType
270
271 def init(self):
272 self.target1 = HTTP(name="target1")
273 self.target2 = HTTP(name="target2")
274
275 # config() must _yield_ tuples of Node, Ambassador-YAML where the
276 # Ambassador-YAML will be annotated onto the Node.
277
278 def config(self) -> Generator[Union[str, Tuple[Node, str]], None, None]:
279 yield self.target1, self.format(
280 """
281---
282apiVersion: getambassador.io/v3alpha1
283kind: TCPMapping
284name: {self.name}-1
285port: 6789
286service: {self.target1.path.fqdn}:80
287"""
288 )
289 yield self.target2, self.format(
290 """
291---
292apiVersion: getambassador.io/v3alpha1
293kind: TCPMapping
294name: {self.name}-2
295port: 6790
296service: {self.target2.path.fqdn}:80
297circuit_breakers:
298- priority: default
299 max_pending_requests: 1
300 max_connections: 1
301"""
302 )
303
304 def queries(self):
305 for i in range(200):
306 yield Query(
307 self.url(self.name, port=6789),
308 headers={"Requested-Backend-Delay": "1000"},
309 ignore_result=True,
310 phase=1,
311 )
312 for i in range(200):
313 yield Query(
314 self.url(self.name, port=6790),
315 headers={"Requested-Backend-Delay": "1000"},
316 ignore_result=True,
317 phase=1,
318 )
319
320 def check(self):
321 failures = []
322
323 if len(self.results) != 400:
324 failures.append(f"wanted 400 results, got {len(self.results)}")
325 else:
326 default_limit_result = self.results[0:200]
327 low_limit_results = self.results[200:400]
328
329 default_limit_failure = 0
330
331 for result in default_limit_result:
332 if result.error:
333 default_limit_failure += 1
334
335 if default_limit_failure != 0:
336 failures.append(
337 f"expected no failure with default limit, got {default_limit_failure}"
338 )
339
340 low_limit_failure = 0
341
342 for result in low_limit_results:
343 if result.error:
344 low_limit_failure += 1
345
346 if not 100 < low_limit_failure < 200:
347 failures.append(f"expected 100-200 failure with low limit, got {low_limit_failure}")
348
349 if failures:
350 print("%s FAILED:\n %s" % (self.name, "\n ".join(failures)))
351 pytest.xfail(f"FFS {self.name}")
View as plain text