1import json
2from random import random
3from typing import ClassVar, Generator, Tuple, Union
4
5from abstract_tests import AHTTP, HTTP, AmbassadorTest, Node, ServiceType
6from kat.harness import EDGE_STACK, Query
7
# Phase in which every test below runs its Zipkin (and diag) check queries.
# Phase 2 -- 10 seconds after the first wave of queries -- would be the normal
# choice, but Zipkin and other tracers need _plenty_ of time to receive spans
# from Envoy and index them before they can be retrieved through the API.
# Checking in phase 2 has been seen to flake; waiting until phase 3 is meant to
# make that much less likely.
check_phase: int = 3
15
16
class Zipkin(ServiceType):
    """A private Zipkin server (ClusterIP Service + one-replica Deployment) for a tracing test."""

    skip_variant: ClassVar[bool] = True

    def __init__(self, *args, **kwargs) -> None:
        # Zipkin keeps its traces in the Pod, and unlike StatsD it has no "reset" call,
        # so the only way to start each invocation clean is to roll the Pod over.
        # Writing a fresh random value into the Pod template's env (`_nonce`) is the
        # (admittedly horrible) hack that forces that rollover.
        self.nonce = random()

        # NOTE: deliberately a plain string, not an f-string -- the {self...} placeholders
        # are left in place, presumably to be filled in later by the harness (TODO confirm).
        manifests = """
---
apiVersion: v1
kind: Service
metadata:
  name: {self.path.k8s}
spec:
  selector:
    backend: {self.path.k8s}
  ports:
  - port: 9411
    name: http
    targetPort: http
  type: ClusterIP
---
apiVersion: apps/v1
kind: Deployment
metadata:
  name: {self.path.k8s}
spec:
  selector:
    matchLabels:
      backend: {self.path.k8s}
  replicas: 1
  strategy:
    type: Recreate # rolling would be bad with the nonce hack
  template:
    metadata:
      labels:
        backend: {self.path.k8s}
    spec:
      containers:
      - name: zipkin
        image: openzipkin/zipkin:2.17
        ports:
        - name: http
          containerPort: 9411
        env:
          - name: _nonce
            value: '{self.nonce}'
"""
        kwargs["service_manifests"] = manifests
        super().__init__(*args, **kwargs)

    def requirements(self):
        # Readiness requirement: Zipkin's services-listing endpoint must answer.
        services_url = f"http://{self.path.fqdn}:9411/api/v2/services"
        yield ("url", Query(services_url))
72
73
class TracingTest(AmbassadorTest):
    """
    Basic Zipkin tracing: requests routed through Ambassador must appear in Zipkin
    as spans and traces, with 128-bit trace IDs by default and the configured
    tag_headers value recorded as a span tag.
    """

    def init(self):
        self.target = HTTP()
        self.zipkin = Zipkin()

    def config(self) -> Generator[Union[str, Tuple[Node, str]], None, None]:
        # Use self.target here, because we want this mapping to be annotated
        # on the service, not the Ambassador.
        yield self.target, self.format(
            """
---
apiVersion: getambassador.io/v3alpha1
kind: Mapping
name: tracing_target_mapping
hostname: "*"
prefix: /target/
service: {self.target.path.fqdn}
"""
        )

        # Configure the TracingService; the x-watsup request header is listed in
        # tag_headers, and check() looks for it as a tag on the resulting spans.
        yield self, self.format(
            """
---
apiVersion: getambassador.io/v3alpha1
kind: TracingService
name: tracing
service: {self.zipkin.path.fqdn}:9411
driver: zipkin
tag_headers:
  - "x-watsup"
"""
        )

    def queries(self):
        # Results 0-99: speak through Ambassador to the traced service...
        for _ in range(100):
            yield Query(self.url("target/"), headers={"x-watsup": "nothin"}, phase=1)

        # ...then ask Zipkin for services (result 100), span names (result 101) and
        # traces (result 102). Including debug=True in these queries is particularly helpful.
        yield Query(f"http://{self.zipkin.path.fqdn}:9411/api/v2/services", phase=check_phase)
        yield Query(
            f"http://{self.zipkin.path.fqdn}:9411/api/v2/spans?serviceName=tracingtest-default",
            phase=check_phase,
        )
        yield Query(
            f"http://{self.zipkin.path.fqdn}:9411/api/v2/traces?serviceName=tracingtest-default",
            phase=check_phase,
        )

        # Result 103: the diagnostics page should load properly.
        yield Query(self.url("ambassador/v0/diag/"), phase=check_phase)

    def check(self):
        for i in range(100):
            assert self.results[i].backend.name == self.target.path.k8s

        services = self.results[100]
        print(f"self.results[100] = {services}")
        assert (
            services.backend is not None and services.backend.name == "raw"
        ), f"unexpected self.results[100] = {services}"
        assert len(services.backend.response) == 1
        assert services.backend.response[0] == "tracingtest-default"

        # Guard result 101 the same way as result 100, so a missing backend fails
        # with the result in the message instead of an AttributeError on None.
        spans = self.results[101]
        assert (
            spans.backend is not None and spans.backend.name == "raw"
        ), f"unexpected self.results[101] = {spans}"

        tracelist = set(spans.backend.response)
        print(f"tracelist = {tracelist}")
        assert (
            "router cluster_tracingtest_http_default_svc_cluster_local_default egress" in tracelist
        )

        # Look for the host that we actually queried, since that's what appears in the spans.
        assert self.results[0].backend.request.host in tracelist

        # An empty trace list (e.g. Zipkin has not indexed the spans yet -- see
        # check_phase) should fail with the raw result, not an IndexError.
        traces = self.results[102].json
        assert traces and traces[0], f"no traces from Zipkin: self.results[102] = {self.results[102]}"

        # Ensure we generate 128-bit traceids by default
        trace_id = traces[0][0]["traceId"]
        assert len(trace_id) == 32, f"expected a 128-bit trace ID, got {trace_id!r}"

        # Spans emitted by our Envoy (node_id "test-id") must carry the x-watsup tag.
        for span in traces[0]:
            if span.get("tags", {}).get("node_id") == "test-id":
                assert "x-watsup" in span["tags"]
                assert span["tags"]["x-watsup"] == "nothin"
160
161
class TracingTestLongClusterName(AmbassadorTest):
    """
    Zipkin tracing where the Zipkin service name is long enough to force cluster-name
    compression; tracing and the diag page must still work
    (https://github.com/datawire/ambassador/issues/3021).
    """

    def init(self):
        self.target = HTTP()
        # The full name ends up being `{testname}-{zipkin}-{name}`; so the name we pass in doesn't
        # need to be as long as you'd think. Down in check() we'll do some asserts on
        # self.zipkin.path.fqdn to make sure we got the desired length correct (we can't do those
        # checks here because .path isn't populated yet during init()).
        self.zipkin = Zipkin(name="longnamethatforcescompression")

    def config(self) -> Generator[Union[str, Tuple[Node, str]], None, None]:
        # Use self.target here, because we want this mapping to be annotated
        # on the service, not the Ambassador.
        yield self.target, self.format(
            """
---
apiVersion: getambassador.io/v3alpha1
kind: Mapping
name: tracing_target_mapping_longclustername
hostname: "*"
prefix: /target/
service: {self.target.path.fqdn}
"""
        )

        # Configure the TracingService.
        yield self, self.format(
            """
---
apiVersion: getambassador.io/v3alpha1
kind: TracingService
name: tracing-longclustername
service: {self.zipkin.path.fqdn}:9411
driver: zipkin
"""
        )

    def queries(self):
        # Results 0-99: speak through Ambassador to the traced service...
        for _ in range(100):
            yield Query(self.url("target/"), phase=1)

        # ...then ask Zipkin for services (result 100), span names (result 101) and
        # traces (result 102). Including debug=True in these queries is particularly helpful.
        yield Query(
            f"http://{self.zipkin.path.fqdn}:9411/api/v2/services",
            phase=check_phase,
        )
        yield Query(
            f"http://{self.zipkin.path.fqdn}:9411/api/v2/spans?serviceName=tracingtestlongclustername-default",
            phase=check_phase,
        )
        yield Query(
            f"http://{self.zipkin.path.fqdn}:9411/api/v2/traces?serviceName=tracingtestlongclustername-default",
            phase=check_phase,
        )

        # Result 103: the diagnostics page should load properly, even though our Tracing Service
        # has a long cluster name https://github.com/datawire/ambassador/issues/3021
        yield Query(self.url("ambassador/v0/diag/"), phase=check_phase)

    def check(self):
        # The Zipkin host label must be 61-63 characters: long enough to force
        # compression, yet still a legal DNS label (at most 63 characters).
        zipkin_label = self.zipkin.path.fqdn.split(".")[0]
        assert (
            60 < len(zipkin_label) < 64
        ), f"zipkin label {zipkin_label!r} has length {len(zipkin_label)}"

        for i in range(100):
            assert self.results[i].backend.name == self.target.path.k8s

        services = self.results[100]
        print(f"self.results[100] = {services}")
        assert (
            services.backend is not None and services.backend.name == "raw"
        ), f"unexpected self.results[100] = {services}"
        assert len(services.backend.response) == 1
        assert services.backend.response[0] == "tracingtestlongclustername-default"

        # Guard result 101 the same way as result 100, so a missing backend fails
        # with the result in the message instead of an AttributeError on None.
        spans = self.results[101]
        assert (
            spans.backend is not None and spans.backend.name == "raw"
        ), f"unexpected self.results[101] = {spans}"

        tracelist = set(spans.backend.response)
        print(f"tracelist = {tracelist}")
        assert "router cluster_tracingtestlongclustername_http_-0 egress" in tracelist

        # Look for the host that we actually queried, since that's what appears in the spans.
        assert self.results[0].backend.request.host in tracelist

        # An empty trace list (e.g. Zipkin has not indexed the spans yet -- see
        # check_phase) should fail with the raw result, not an IndexError.
        traces = self.results[102].json
        assert traces and traces[0], f"no traces from Zipkin: self.results[102] = {self.results[102]}"

        # Ensure we generate 128-bit traceids by default
        trace_id = traces[0][0]["traceId"]
        assert len(trace_id) == 32, f"expected a 128-bit trace ID, got {trace_id!r}"
251
252
class TracingTestShortTraceId(AmbassadorTest):
    """
    With ``trace_id_128bit: false`` in the TracingService config, Zipkin must
    receive 64-bit (16-character) trace IDs.
    """

    def init(self):
        self.target = HTTP()
        self.zipkin = Zipkin()

    def config(self) -> Generator[Union[str, Tuple[Node, str]], None, None]:
        # Use self.target here, because we want this mapping to be annotated
        # on the service, not the Ambassador.
        yield self.target, self.format(
            """
---
apiVersion: getambassador.io/v3alpha1
kind: Mapping
name: tracing_target_mapping_64
hostname: "*"
prefix: /target-64/
service: {self.target.path.fqdn}
"""
        )

        # Configure the TracingService with 128-bit trace IDs turned off.
        yield self, self.format(
            """
---
apiVersion: getambassador.io/v3alpha1
kind: TracingService
name: tracing-64
service: {self.zipkin.path.fqdn}:9411
driver: zipkin
config:
  trace_id_128bit: false
"""
        )

    def queries(self):
        # Result 0: speak through Ambassador to the traced service...
        yield Query(self.url("target-64/"), phase=1)

        # ...result 1: then ask Zipkin for its traces. Including debug=True in these
        # queries is particularly helpful.
        yield Query(f"http://{self.zipkin.path.fqdn}:9411/api/v2/traces", phase=check_phase)

        # Result 2: the diagnostics page should load properly.
        yield Query(self.url("ambassador/v0/diag/"), phase=check_phase)

    def check(self):
        # An empty trace list (e.g. Zipkin has not indexed the span yet -- see
        # check_phase) should fail with the raw result, not an IndexError.
        traces = self.results[1].json
        assert traces and traces[0], f"no traces from Zipkin: self.results[1] = {self.results[1]}"

        # Ensure we generated 64-bit traceids
        trace_id = traces[0][0]["traceId"]
        assert len(trace_id) == 16, f"expected a 64-bit trace ID, got {trace_id!r}"
304
305
# This test asserts that the external authorization server receives the proper tracing
# headers when Ambassador is configured with an HTTP AuthService.
class TracingExternalAuthTest(AmbassadorTest):
    def init(self):
        if EDGE_STACK:
            self.xfail = "XFailing for now, custom AuthServices not supported in Edge Stack"
        self.target = HTTP()
        self.auth = AHTTP(name="auth")
        self.zipkin = Zipkin()

    def config(self) -> Generator[Union[str, Tuple[Node, str]], None, None]:
        yield self.target, self.format(
            """
---
apiVersion: getambassador.io/v3alpha1
kind: Mapping
name: tracing_target_mapping
hostname: "*"
prefix: /target/
service: {self.target.path.fqdn}
"""
        )

        # Reference Zipkin by fqdn, like every other TracingService in this file,
        # rather than relying on the short k8s name resolving in-namespace.
        yield self, self.format(
            """
---
apiVersion: getambassador.io/v3alpha1
kind: TracingService
name: tracing-auth
service: {self.zipkin.path.fqdn}:9411
driver: zipkin
"""
        )

        yield self, self.format(
            """
---
apiVersion: getambassador.io/v3alpha1
kind: AuthService
name: {self.auth.path.k8s}
auth_service: "{self.auth.path.fqdn}"
path_prefix: "/extauth"
allowed_request_headers:
- Requested-Status
- Requested-Header
"""
        )

    def queries(self):
        # Result 0: a request the auth service is asked to allow (Requested-Status: 200).
        yield Query(self.url("target/"), headers={"Requested-Status": "200"}, expected=200)

    def check(self):
        result = self.results[0]

        # Check the outcome first: if the request did not succeed, the Extauth header
        # may well be missing, and parsing it first would hide the real failure
        # behind a KeyError.
        assert result.status == 200, f"unexpected self.results[0] = {result}"
        assert result.headers["Server"] == ["envoy"]

        # The Extauth response header carries, as JSON, the request the auth service saw.
        extauth_res = json.loads(result.headers["Extauth"][0])
        auth_headers = extauth_res["request"]["headers"]
        backend_headers = result.backend.request.headers

        # The auth service and the upstream must see the same B3 tracing context
        # and the same request ID.
        for name in (
            "x-b3-parentspanid",
            "x-b3-sampled",
            "x-b3-spanid",
            "x-b3-traceid",
            "x-request-id",
        ):
            assert (
                auth_headers[name] == backend_headers[name]
            ), f"{name}: auth service saw {auth_headers[name]!r}, backend saw {backend_headers[name]!r}"
371
372
class TracingTestSampling(AmbassadorTest):
    """
    Check that a TracingService's ``sampling.overall`` percentage limits how many
    requests end up traced in Zipkin.
    """

    def init(self):
        self.target = HTTP()
        self.zipkin = Zipkin()

    def config(self) -> Generator[Union[str, Tuple[Node, str]], None, None]:
        # The Mapping is attached to self.target so it is annotated on the
        # service rather than on the Ambassador.
        mapping = self.format(
            """
---
apiVersion: getambassador.io/v3alpha1
kind: Mapping
name: tracing_target_mapping_65
hostname: "*"
prefix: /target-65/
service: {self.target.path.fqdn}
"""
        )
        yield self.target, mapping

        # TracingService sampling 10% of requests overall.
        tracing_service = self.format(
            """
---
apiVersion: getambassador.io/v3alpha1
kind: TracingService
name: tracing-65
service: {self.zipkin.path.fqdn}:9411
driver: zipkin
sampling:
  overall: 10
"""
        )
        yield self, tracing_service

    def queries(self):
        # Results 0-99: traffic through Ambassador. Only Zipkin's view of it is
        # checked, so the individual results are ignored.
        for _ in range(100):
            yield Query(self.url("target-65/"), phase=1, ignore_result=True)

        # Result 100: every trace Zipkin holds (limit raised well above 100).
        # Including debug=True in these queries is particularly helpful.
        yield Query(
            f"http://{self.zipkin.path.fqdn}:9411/api/v2/traces?limit=10000", phase=check_phase
        )

        # Result 101: the diagnostics page should load properly.
        yield Query(self.url("ambassador/v0/diag/"), phase=check_phase)

    def check(self):
        traces = self.results[100].json
        trace_count = len(traces)
        print("%d traces obtained" % trace_count)

        # Envoy's sampling RNG is noisy at small sample sizes: 10% of 100 requests
        # "should" be 10 traces, but anything from 1 to 50 is accepted.
        assert 1 <= trace_count <= 50
438
439
class TracingTestZipkinV2(AmbassadorTest):
    """
    Test for the "collector_endpoint_version" Zipkin config in TracingServices
    (HTTP_JSON, posting spans to /api/v2/spans).
    """

    def init(self):
        self.target = HTTP()
        self.zipkin = Zipkin()

    def config(self) -> Generator[Union[str, Tuple[Node, str]], None, None]:
        # Use self.target here, because we want this mapping to be annotated
        # on the service, not the Ambassador.
        yield self.target, self.format(
            """
---
apiVersion: getambassador.io/v3alpha1
kind: Mapping
name: tracing_target_mapping
hostname: "*"
prefix: /target/
service: {self.target.path.fqdn}
"""
        )

        # Configure the TracingService to use the v2 JSON collector endpoint.
        yield self, self.format(
            """
---
apiVersion: getambassador.io/v3alpha1
kind: TracingService
name: tracing
service: {self.zipkin.path.fqdn}:9411
driver: zipkin
config:
  collector_endpoint: /api/v2/spans
  collector_endpoint_version: HTTP_JSON
  collector_hostname: {self.zipkin.path.fqdn}
"""
        )

    def requirements(self):
        # In addition to the default requirements, wait for Zipkin's services endpoint
        # (the Zipkin node itself yields the same requirement).
        yield from super().requirements()
        yield ("url", Query(f"http://{self.zipkin.path.fqdn}:9411/api/v2/services"))

    def queries(self):
        # Results 0-99: speak through Ambassador to the traced service...
        for _ in range(100):
            yield Query(self.url("target/"), phase=1)

        # ...then ask Zipkin for services (result 100), span names (result 101) and
        # traces (result 102). Including debug=True in these queries is particularly helpful.
        yield Query(f"http://{self.zipkin.path.fqdn}:9411/api/v2/services", phase=check_phase)
        yield Query(
            f"http://{self.zipkin.path.fqdn}:9411/api/v2/spans?serviceName=tracingtestzipkinv2-default",
            phase=check_phase,
        )
        yield Query(
            f"http://{self.zipkin.path.fqdn}:9411/api/v2/traces?serviceName=tracingtestzipkinv2-default",
            phase=check_phase,
        )

        # Result 103: the diagnostics page should load properly.
        yield Query(self.url("ambassador/v0/diag/"), phase=check_phase)

    def check(self):
        for i in range(100):
            assert self.results[i].backend.name == self.target.path.k8s

        services = self.results[100]
        print(f"self.results[100] = {services}")
        assert (
            services.backend is not None and services.backend.name == "raw"
        ), f"unexpected self.results[100] = {services}"
        assert len(services.backend.response) == 1
        assert services.backend.response[0] == "tracingtestzipkinv2-default"

        # Guard result 101 the same way as result 100, so a missing backend fails
        # with the result in the message instead of an AttributeError on None.
        spans = self.results[101]
        assert (
            spans.backend is not None and spans.backend.name == "raw"
        ), f"unexpected self.results[101] = {spans}"

        tracelist = set(spans.backend.response)
        print(f"tracelist = {tracelist}")
        assert "router cluster_tracingtestzipkinv2_http_default-0 egress" in tracelist

        # Look for the host that we actually queried, since that's what appears in the spans.
        assert self.results[0].backend.request.host in tracelist

        # An empty trace list (e.g. Zipkin has not indexed the spans yet -- see
        # check_phase) should fail with the raw result, not an IndexError.
        traces = self.results[102].json
        assert traces and traces[0], f"no traces from Zipkin: self.results[102] = {self.results[102]}"

        # Ensure we generate 128-bit traceids by default
        trace_id = traces[0][0]["traceId"]
        assert len(trace_id) == 32, f"expected a 128-bit trace ID, got {trace_id!r}"
529
530
class TracingTestZipkinV1(AmbassadorTest):
    """
    Test for the "collector_endpoint_version" Zipkin config in TracingServices
    (HTTP_JSON_V1, posting spans to /api/v1/spans).
    """

    def init(self):
        self.target = HTTP()
        self.zipkin = Zipkin()

    def config(self) -> Generator[Union[str, Tuple[Node, str]], None, None]:
        # Use self.target here, because we want this mapping to be annotated
        # on the service, not the Ambassador.
        yield self.target, self.format(
            """
---
apiVersion: getambassador.io/v3alpha1
kind: Mapping
name: tracing_target_mapping
hostname: "*"
prefix: /target/
service: {self.target.path.fqdn}
"""
        )

        # Configure the TracingService to use the legacy v1 JSON collector endpoint.
        yield self, self.format(
            """
---
apiVersion: getambassador.io/v3alpha1
kind: TracingService
name: tracing
service: {self.zipkin.path.fqdn}:9411
driver: zipkin
config:
  collector_endpoint: /api/v1/spans
  collector_endpoint_version: HTTP_JSON_V1
  collector_hostname: {self.zipkin.path.fqdn}
"""
        )

    def queries(self):
        # Results 0-99: speak through Ambassador to the traced service...
        for _ in range(100):
            yield Query(self.url("target/"), phase=1)

        # ...then read back through Zipkin's v2 query API: services (result 100),
        # span names (result 101) and traces (result 102).
        yield Query(f"http://{self.zipkin.path.fqdn}:9411/api/v2/services", phase=check_phase)
        yield Query(
            f"http://{self.zipkin.path.fqdn}:9411/api/v2/spans?serviceName=tracingtestzipkinv1-default",
            phase=check_phase,
        )
        yield Query(
            f"http://{self.zipkin.path.fqdn}:9411/api/v2/traces?serviceName=tracingtestzipkinv1-default",
            phase=check_phase,
        )

        # Result 103: the diagnostics page should load properly.
        yield Query(self.url("ambassador/v0/diag/"), phase=check_phase)

    def check(self):
        for i in range(100):
            assert self.results[i].backend.name == self.target.path.k8s

        services = self.results[100]
        print(f"self.results[100] = {services}")
        assert (
            services.backend is not None and services.backend.name == "raw"
        ), f"unexpected self.results[100] = {services}"
        assert len(services.backend.response) == 1
        assert services.backend.response[0] == "tracingtestzipkinv1-default"

        # Guard result 101 the same way as result 100, so a missing backend fails
        # with the result in the message instead of an AttributeError on None.
        spans = self.results[101]
        assert (
            spans.backend is not None and spans.backend.name == "raw"
        ), f"unexpected self.results[101] = {spans}"

        tracelist = set(spans.backend.response)
        print(f"tracelist = {tracelist}")
        assert "router cluster_tracingtestzipkinv1_http_default-0 egress" in tracelist

        # Look for the host that we actually queried, since that's what appears in the spans.
        assert self.results[0].backend.request.host in tracelist

        # An empty trace list (e.g. Zipkin has not indexed the spans yet -- see
        # check_phase) should fail with the raw result, not an IndexError.
        traces = self.results[102].json
        assert traces and traces[0], f"no traces from Zipkin: self.results[102] = {self.results[102]}"

        # Ensure we generate 128-bit traceids by default
        trace_id = traces[0][0]["traceId"]
        assert len(trace_id) == 32, f"expected a 128-bit trace ID, got {trace_id!r}"
View as plain text