1import json
2from random import random
3from typing import ClassVar, Generator, Tuple, Union
4
5from abstract_tests import AHTTP, HTTP, AmbassadorTest, Node, ServiceType
6from kat.harness import EDGE_STACK, Query
7
# Phase in which the verification queries of the tests below are issued (they
# all pass ``phase=check_phase``).
#
# Phase 2 -- which is 10 seconds after the first wave of queries -- would be
# the normal choice, but this test has been seen to flake when the check is
# performed that early. Phase 3 is used instead so that Zipkin and the other
# tracers have _plenty_ of time to receive traces from Envoy and index them for
# retrieval through the API, which will hopefully keep the flake from recurring.
check_phase: int = 3
15
16
class Zipkin(ServiceType):
    """Zipkin (``openzipkin/zipkin:2.17``) service used as a trace collector.

    The constructor always supplies its own ``service_manifests``: a ClusterIP
    Service plus a single-replica Deployment, both exposing port 9411.
    """

    skip_variant: ClassVar[bool] = True

    def __init__(self, *args, **kwargs) -> None:
        """Install the Zipkin manifests into ``kwargs`` and defer to the base class.

        Any ``service_manifests`` value passed by the caller is overwritten.
        """
        # We want to reset Zipkin between test runs. StatsD has a handy "reset"
        # call that can do this... but the only way to reset Zipkin is to roll
        # over the Pod. So 'nonce' is a horrible hack: a fresh random value is
        # written into the Pod's environment on every invocation to force that
        # rollover.
        self.nonce = random()

        # Deliberately a plain string, not an f-string: the {self...}
        # placeholders are left in the text as-is.
        # NOTE(review): presumably they are expanded later by the harness --
        # confirm against ServiceType.
        manifests = """
---
apiVersion: v1
kind: Service
metadata:
  name: {self.path.k8s}
spec:
  selector:
    backend: {self.path.k8s}
  ports:
  - port: 9411
    name: http
    targetPort: http
  type: ClusterIP
---
apiVersion: apps/v1
kind: Deployment
metadata:
  name: {self.path.k8s}
spec:
  selector:
    matchLabels:
      backend: {self.path.k8s}
  replicas: 1
  strategy:
    type: Recreate # rolling would be bad with the nonce hack
  template:
    metadata:
      labels:
        backend: {self.path.k8s}
    spec:
      containers:
      - name: zipkin
        image: openzipkin/zipkin:2.17
        ports:
        - name: http
          containerPort: 9411
        env:
        - name: _nonce
          value: '{self.nonce}'
"""
        kwargs["service_manifests"] = manifests
        super().__init__(*args, **kwargs)

    def requirements(self):
        """Yield one ``("url", Query)`` requirement for Zipkin's services endpoint."""
        services_url = f"http://{self.path.fqdn}:9411/api/v2/services"
        yield ("url", Query(services_url))
72
73
class Jaeger(ServiceType):
    """Jaeger all-in-one (``jaegertracing/all-in-one:1.42.0``) trace collector.

    The constructor always supplies its own ``service_manifests``: a ClusterIP
    Service plus a single-replica Deployment exposing port 16686 (named
    ``http-json``) and port 4317 (named ``otlp-grpc``), with
    ``COLLECTOR_OTLP_ENABLED`` set to ``"true"``.
    """

    skip_variant: ClassVar[bool] = True

    def __init__(self, *args, **kwargs) -> None:
        """Install the Jaeger manifests into ``kwargs`` and defer to the base class.

        Any ``service_manifests`` value passed by the caller is overwritten.
        """
        # We want to reset Jaeger between test runs. StatsD has a handy "reset"
        # call that can do this... but the only way to reset Jaeger is to roll
        # over the Pod. So 'nonce' is a horrible hack: a fresh random value is
        # written into the Pod's environment on every invocation to force that
        # rollover.
        self.nonce = random()

        # Deliberately a plain string, not an f-string: the {self...}
        # placeholders are left in the text as-is.
        # NOTE(review): presumably they are expanded later by the harness --
        # confirm against ServiceType.
        manifests = """
---
apiVersion: v1
kind: Service
metadata:
  name: {self.path.k8s}
spec:
  selector:
    backend: {self.path.k8s}
  ports:
    - port: 16686
      name: http-json
      targetPort: http-json
    - port: 4317
      name: otlp-grpc
      targetPort: otlp-grpc
  type: ClusterIP
---
apiVersion: apps/v1
kind: Deployment
metadata:
  name: {self.path.k8s}
spec:
  selector:
    matchLabels:
      backend: {self.path.k8s}
  replicas: 1
  strategy:
    type: Recreate # rolling would be bad with the nonce hack
  template:
    metadata:
      labels:
        backend: {self.path.k8s}
    spec:
      containers:
      - name: jaeger
        image: jaegertracing/all-in-one:1.42.0
        ports:
          - name: http-json
            containerPort: 16686
          - name: otlp-grpc
            containerPort: 4317
        env:
        - name: _nonce
          value: '{self.nonce}'
        - name: COLLECTOR_OTLP_ENABLED
          value: "true"
"""
        kwargs["service_manifests"] = manifests
        super().__init__(*args, **kwargs)

    def requirements(self):
        """Yield one ``("url", Query)`` requirement for Jaeger's services endpoint."""
        services_url = f"http://{self.path.fqdn}:16686/api/services"
        yield ("url", Query(services_url))
136
137
class TracingTest(AmbassadorTest):
    """Zipkin-driver TracingService with ``tag_headers`` and ``custom_tags``.

    Sends 100 requests through Ambassador to the target, then queries Zipkin
    for the recorded services, span names and traces.
    """

    def init(self):
        """Create the HTTP target and the Zipkin collector."""
        self.target = HTTP()
        self.zipkin = Zipkin()

    def config(self) -> Generator[Union[str, Tuple[Node, str]], None, None]:
        """Yield the Mapping (on the target) and the TracingService (on self)."""
        # The Mapping is yielded with self.target, because we want it to be
        # annotated on the service, not the Ambassador.
        mapping_yaml = self.format(
            """
---
apiVersion: getambassador.io/v3alpha1
kind: Mapping
name: tracing_target_mapping
hostname: "*"
prefix: /target/
service: {self.target.path.fqdn}
"""
        )
        yield self.target, mapping_yaml

        # Configure the TracingService.
        tracing_yaml = self.format(
            """
---
apiVersion: getambassador.io/v3alpha1
kind: TracingService
name: tracing
service: {self.zipkin.path.fqdn}:9411
driver: zipkin
tag_headers:
  - "x-watsup"
custom_tags:
  - tag: ltag
    literal:
      value: lvalue
  - tag: etag
    environment:
      name: UNKNOWN_ENV_VAR
      default_value: efallback
  - tag: htag
    request_header:
      name: x-something
      default_value: hfallback
"""
        )
        yield self, tracing_yaml

    def queries(self):
        """Yield 100 traced requests, three Zipkin API queries and a diag query."""
        # Speak through each Ambassador to the traced service...
        traced_headers = {"x-watsup": "nothin", "x-something": "something"}
        for _ in range(100):
            # Every Query gets its own copy of the header dict.
            yield Query(self.url("target/"), headers=dict(traced_headers), phase=1)

        # ...then ask the Zipkin for services and spans. Including debug=True
        # in these queries is particularly helpful.
        zipkin_api = f"http://{self.zipkin.path.fqdn}:9411/api/v2"
        yield Query(f"{zipkin_api}/services", phase=check_phase)
        yield Query(f"{zipkin_api}/spans?serviceName=tracingtest-default", phase=check_phase)
        yield Query(f"{zipkin_api}/traces?serviceName=tracingtest-default", phase=check_phase)

        # The diagnostics page should load properly
        yield Query(self.url("ambassador/v0/diag/"), phase=check_phase)

    def check(self):
        """Verify routing, the Zipkin service/span listings, trace-id width and tags."""
        # Results 0-99: every traced request must have reached the target.
        for idx in range(100):
            traced = self.results[idx]
            assert traced.backend
            assert traced.backend.name == self.target.path.k8s

        # Result 100: Zipkin's service list must hold exactly our service.
        services = self.results[100]
        print(f"self.results[100] = {services}")
        assert (
            services.backend is not None and services.backend.name == "raw"
        ), f"unexpected self.results[100] = {services}"
        assert len(services.backend.response) == 1
        assert services.backend.response[0] == "tracingtest-default"

        # Result 101: the span names recorded for our service.
        span_names = self.results[101]
        assert span_names.backend
        assert span_names.backend.name == "raw"

        tracelist = set(span_names.backend.response)
        print(f"tracelist = {tracelist}")
        assert "router tracingtest_http_default_svc_cluster_local egress" in tracelist

        # Look for the host that we actually queried, since that's what appears in the spans.
        first = self.results[0]
        assert first.backend
        assert first.backend.request
        assert first.backend.request.host in tracelist

        # Result 102: ensure we generate 128-bit traceids by default
        # (32 characters in the traceId string).
        first_trace = self.results[102].json[0]
        assert len(first_trace[0]["traceId"]) == 32

        # Spans tagged node_id == "test-id" must carry all three custom tags.
        # htag is expected to be "something" (the x-something value sent in
        # queries()), not its default.
        expected_tags = (("ltag", "lvalue"), ("etag", "efallback"), ("htag", "something"))
        for span in first_trace:
            if span.get("tags", {}).get("node_id") != "test-id":
                continue
            for tag, value in expected_tags:
                assert tag in span["tags"]
                assert span["tags"][tag] == value
247
248
class TracingTestLongClusterName(AmbassadorTest):
    """Zipkin-driver TracingService whose collector has a very long service name.

    See https://github.com/datawire/ambassador/issues/3021: the diagnostics
    page should still load when the tracing cluster name is long.
    """

    def init(self):
        """Create the HTTP target and a Zipkin collector with a long name."""
        self.target = HTTP()
        # The full name ends up being `{testname}-{zipkin}-{name}`; so the name we pass in doesn't
        # need to be as long as you'd think. Down in check() we'll do some asserts on
        # self.zipkin.path.fqdn to make sure we got the desired length correct (we can't do those
        # checks here because .path isn't populated yet during init()).
        self.zipkin = Zipkin(name="longnamethatforcescompression")

    def config(self) -> Generator[Union[str, Tuple[Node, str]], None, None]:
        """Yield the Mapping (on the target) and the TracingService (on self)."""
        # The Mapping is yielded with self.target, because we want it to be
        # annotated on the service, not the Ambassador.
        mapping_yaml = self.format(
            """
---
apiVersion: getambassador.io/v3alpha1
kind: Mapping
name: tracing_target_mapping_longclustername
hostname: "*"
prefix: /target/
service: {self.target.path.fqdn}
"""
        )
        yield self.target, mapping_yaml

        # Configure the TracingService.
        tracing_yaml = self.format(
            """
---
apiVersion: getambassador.io/v3alpha1
kind: TracingService
name: tracing-longclustername
service: {self.zipkin.path.fqdn}:9411
driver: zipkin
"""
        )
        yield self, tracing_yaml

    def queries(self):
        """Yield 100 traced requests, three Zipkin API queries and a diag query."""
        # Speak through each Ambassador to the traced service...
        for _ in range(100):
            yield Query(self.url("target/"), phase=1)

        # ...then ask the Zipkin for services and spans. Including debug=True
        # in these queries is particularly helpful.
        zipkin_api = f"http://{self.zipkin.path.fqdn}:9411/api/v2"
        service_filter = "serviceName=tracingtestlongclustername-default"
        yield Query(f"{zipkin_api}/services", phase=check_phase)
        yield Query(f"{zipkin_api}/spans?{service_filter}", phase=check_phase)
        yield Query(f"{zipkin_api}/traces?{service_filter}", phase=check_phase)

        # The diagnostics page should load properly, even though our Tracing Service
        # has a long cluster name https://github.com/datawire/ambassador/issues/3021
        yield Query(self.url("ambassador/v0/diag/"), phase=check_phase)

    def check(self):
        """Verify the name length, routing, Zipkin listings and trace-id width."""
        # The first DNS label of the collector must be 61-63 characters long.
        host_label = self.zipkin.path.fqdn.split(".")[0]
        assert len(host_label) > 60
        assert len(host_label) < 64

        # Results 0-99: every traced request must have reached the target.
        for idx in range(100):
            traced = self.results[idx]
            assert traced.backend
            assert traced.backend.name == self.target.path.k8s

        # Result 100: Zipkin's service list must hold exactly our service.
        services = self.results[100]
        print(f"self.results[100] = {services}")
        assert (
            services.backend is not None and services.backend.name == "raw"
        ), f"unexpected self.results[100] = {services}"
        assert len(services.backend.response) == 1
        assert services.backend.response[0] == "tracingtestlongclustername-default"

        # Result 101: the span names recorded for our service.
        span_names = self.results[101]
        assert span_names.backend
        assert span_names.backend.name == "raw"

        tracelist = set(span_names.backend.response)
        print(f"tracelist = {tracelist}")
        assert (
            "router tracingtestlongclustername_http_default_svc_cluster_local egress" in tracelist
        )

        # Look for the host that we actually queried, since that's what appears in the spans.
        first = self.results[0]
        assert first.backend
        assert first.backend.request
        assert first.backend.request.host in tracelist

        # Result 102: ensure we generate 128-bit traceids by default
        # (32 characters in the traceId string).
        first_span = self.results[102].json[0][0]
        assert len(first_span["traceId"]) == 32
345
346
class TracingTestShortTraceId(AmbassadorTest):
    """Zipkin-driver TracingService configured with ``trace_id_128bit: false``."""

    def init(self):
        """Create the HTTP target and the Zipkin collector."""
        self.target = HTTP()
        self.zipkin = Zipkin()

    def config(self) -> Generator[Union[str, Tuple[Node, str]], None, None]:
        """Yield the Mapping (on the target) and the TracingService (on self)."""
        # The Mapping is yielded with self.target, because we want it to be
        # annotated on the service, not the Ambassador.
        mapping_yaml = self.format(
            """
---
apiVersion: getambassador.io/v3alpha1
kind: Mapping
name: tracing_target_mapping_64
hostname: "*"
prefix: /target-64/
service: {self.target.path.fqdn}
"""
        )
        yield self.target, mapping_yaml

        # Configure the TracingService.
        tracing_yaml = self.format(
            """
---
apiVersion: getambassador.io/v3alpha1
kind: TracingService
name: tracing-64
service: {self.zipkin.path.fqdn}:9411
driver: zipkin
config:
  trace_id_128bit: false
"""
        )
        yield self, tracing_yaml

    def queries(self):
        """Yield one traced request, one Zipkin traces query and a diag query."""
        # Speak through each Ambassador to the traced service...
        yield Query(self.url("target-64/"), phase=1)

        # ...then ask the Zipkin for services and spans. Including debug=True
        # in these queries is particularly helpful.
        traces_url = f"http://{self.zipkin.path.fqdn}:9411/api/v2/traces"
        yield Query(traces_url, phase=check_phase)

        # The diagnostics page should load properly
        yield Query(self.url("ambassador/v0/diag/"), phase=check_phase)

    def check(self):
        """Verify that the first recorded span carries a 16-character traceId."""
        # Ensure we generated 64-bit traceids
        first_span = self.results[1].json[0][0]
        assert len(first_span["traceId"]) == 16
398
399
# This test asserts that the external authorization server receives the proper tracing
# headers when Ambassador is configured with an HTTP AuthService.
class TracingExternalAuthTest(AmbassadorTest):
    """Compare the tracing headers seen by the auth service and by the target."""

    def init(self):
        """Create the HTTP target, the auth service and the Zipkin collector."""
        if EDGE_STACK:
            self.xfail = "XFailing for now, custom AuthServices not supported in Edge Stack"
        self.target = HTTP()
        self.auth = AHTTP(name="auth")
        self.zipkin = Zipkin()

    def config(self) -> Generator[Union[str, Tuple[Node, str]], None, None]:
        """Yield the Mapping (on the target), then the TracingService and AuthService."""
        yield self.target, self.format(
            """
---
apiVersion: getambassador.io/v3alpha1
kind: Mapping
name: tracing_target_mapping
hostname: "*"
prefix: /target/
service: {self.target.path.fqdn}
"""
        )

        yield self, self.format(
            """
---
apiVersion: getambassador.io/v3alpha1
kind: TracingService
name: tracing-auth
service: {self.zipkin.path.k8s}:9411
driver: zipkin
"""
        )

        yield self, self.format(
            """
---
apiVersion: getambassador.io/v3alpha1
kind: AuthService
name: {self.auth.path.k8s}
auth_service: "{self.auth.path.fqdn}"
path_prefix: "/extauth"
allowed_request_headers:
- Kat-Req-Extauth-Requested-Status
- Kat-Req-Extauth-Requested-Header
"""
        )

    def queries(self):
        """Yield a single request to the target that asks the auth service for a 200."""
        # The header name has to be spelled exactly like the
        # "Kat-Req-Extauth-Requested-Status" entry in allowed_request_headers
        # in config(); a misspelled name does not match that entry.
        yield Query(
            self.url("target/"), headers={"Kat-Req-Extauth-Requested-Status": "200"}, expected=200
        )

    def check(self):
        """Verify the auth service saw the same tracing headers as the target."""
        result = self.results[0]

        # The "Extauth" response header holds a JSON document; its
        # ["request"]["headers"] entry is compared with the target's request below.
        extauth_res = json.loads(result.headers["Extauth"][0])
        assert result.backend
        assert result.backend.request
        request_headers = result.backend.request.headers

        assert result.status == 200
        assert result.headers["Server"] == ["envoy"]

        # Each tracing header must be identical on the auth request and on the
        # request that reached the target.
        extauth_headers = extauth_res["request"]["headers"]
        for header in (
            "x-b3-parentspanid",
            "x-b3-sampled",
            "x-b3-spanid",
            "x-b3-traceid",
            "x-request-id",
        ):
            assert extauth_headers[header] == request_headers[header]
469
470
class TracingTestSampling(AmbassadorTest):
    """
    Test for the "sampling" in TracingServices

    The TracingService sets ``sampling.overall: 10``; 100 requests are sent and
    the number of traces Zipkin recorded is then checked against a loose range.
    """

    def init(self):
        """Create the HTTP target and the Zipkin collector."""
        self.target = HTTP()
        self.zipkin = Zipkin()

    def config(self) -> Generator[Union[str, Tuple[Node, str]], None, None]:
        """Yield the Mapping (on the target) and the TracingService (on self)."""
        # The Mapping is yielded with self.target, because we want it to be
        # annotated on the service, not the Ambassador.
        mapping_yaml = self.format(
            """
---
apiVersion: getambassador.io/v3alpha1
kind: Mapping
name: tracing_target_mapping_65
hostname: "*"
prefix: /target-65/
service: {self.target.path.fqdn}
"""
        )
        yield self.target, mapping_yaml

        # Configure the TracingService.
        tracing_yaml = self.format(
            """
---
apiVersion: getambassador.io/v3alpha1
kind: TracingService
name: tracing-65
service: {self.zipkin.path.fqdn}:9411
driver: zipkin
sampling:
  overall: 10
"""
        )
        yield self, tracing_yaml

    def queries(self):
        """Yield 100 traced requests, one Zipkin traces query and a diag query."""
        # Speak through each Ambassador to the traced service...
        for _ in range(100):
            yield Query(self.url("target-65/"), phase=1, ignore_result=True)

        # ...then ask the Zipkin for services and spans. Including debug=True
        # in these queries is particularly helpful.
        traces_url = f"http://{self.zipkin.path.fqdn}:9411/api/v2/traces?limit=10000"
        yield Query(traces_url, phase=check_phase)

        # The diagnostics page should load properly
        yield Query(self.url("ambassador/v0/diag/"), phase=check_phase)

    def check(self):
        """Verify that between 1 and 50 traces were recorded for the 100 requests."""
        traces = self.results[100].json
        trace_count = len(traces)

        print(f"{trace_count} traces obtained")

        # Debugging aid: to inspect the raw traces, import json and
        # print(json.dumps(traces, indent=4, sort_keys=True)).

        # We constantly find that Envoy's RNG isn't exactly predictable with small sample
        # sizes, so even though 10% of 100 is 10, we'll make this pass as long as we don't
        # go over 50 or under 1.
        assert 1 <= trace_count <= 50
536
537
class TracingTestZipkinV2(AmbassadorTest):
    """
    Test for the "collector_endpoint_version" Zipkin config in TracingServices

    This variant uses ``collector_endpoint_version: HTTP_JSON`` with the
    ``/api/v2/spans`` collector endpoint and expects traces to be recorded.
    """

    def init(self):
        """Create the HTTP target and the Zipkin collector."""
        self.target = HTTP()
        self.zipkin = Zipkin()

    def config(self) -> Generator[Union[str, Tuple[Node, str]], None, None]:
        """Yield the Mapping (on the target) and the TracingService (on self)."""
        # The Mapping is yielded with self.target, because we want it to be
        # annotated on the service, not the Ambassador.
        mapping_yaml = self.format(
            """
---
apiVersion: getambassador.io/v3alpha1
kind: Mapping
name: tracing_target_mapping
hostname: "*"
prefix: /target/
service: {self.target.path.fqdn}
"""
        )
        yield self.target, mapping_yaml

        # Configure the TracingService.
        tracing_yaml = self.format(
            """
---
apiVersion: getambassador.io/v3alpha1
kind: TracingService
name: tracing
service: {self.zipkin.path.fqdn}:9411
driver: zipkin
config:
  collector_endpoint: /api/v2/spans
  collector_endpoint_version: HTTP_JSON
  collector_hostname: {self.zipkin.path.fqdn}
"""
        )
        yield self, tracing_yaml

    def requirements(self):
        """Yield the inherited requirements plus one for Zipkin's services endpoint."""
        yield from super().requirements()
        zipkin_services = f"http://{self.zipkin.path.fqdn}:9411/api/v2/services"
        yield ("url", Query(zipkin_services))

    def queries(self):
        """Yield 100 traced requests, three Zipkin API queries and a diag query."""
        # Speak through each Ambassador to the traced service...
        for _ in range(100):
            yield Query(self.url("target/"), phase=1)

        # ...then ask the Zipkin for services and spans. Including debug=True
        # in these queries is particularly helpful.
        zipkin_api = f"http://{self.zipkin.path.fqdn}:9411/api/v2"
        service_filter = "serviceName=tracingtestzipkinv2-default"
        yield Query(f"{zipkin_api}/services", phase=check_phase)
        yield Query(f"{zipkin_api}/spans?{service_filter}", phase=check_phase)
        yield Query(f"{zipkin_api}/traces?{service_filter}", phase=check_phase)

        # The diagnostics page should load properly
        yield Query(self.url("ambassador/v0/diag/"), phase=check_phase)

    def check(self):
        """Verify routing, the Zipkin service/span listings and trace-id width."""
        # Results 0-99: every traced request must have reached the target.
        for idx in range(100):
            traced = self.results[idx]
            assert traced.backend
            assert traced.backend.name == self.target.path.k8s

        # Result 100: Zipkin's service list must hold exactly our service.
        services = self.results[100]
        print(f"self.results[100] = {services}")
        assert (
            services.backend is not None and services.backend.name == "raw"
        ), f"unexpected self.results[100] = {services}"
        assert len(services.backend.response) == 1
        assert services.backend.response[0] == "tracingtestzipkinv2-default"

        # Result 101: the span names recorded for our service.
        span_names = self.results[101]
        assert span_names.backend
        assert span_names.backend.name == "raw"

        tracelist = set(span_names.backend.response)
        print(f"tracelist = {tracelist}")
        assert "router tracingtestzipkinv2_http_default_svc_cluster_local egress" in tracelist

        # Look for the host that we actually queried, since that's what appears in the spans.
        first = self.results[0]
        assert first.backend
        assert first.backend.request
        assert first.backend.request.host in tracelist

        # Result 102: ensure we generate 128-bit traceids by default
        # (32 characters in the traceId string).
        first_span = self.results[102].json[0][0]
        assert len(first_span["traceId"]) == 32
632
633
class TracingTestZipkinV1(AmbassadorTest):
    """
    Test for the "collector_endpoint_version" Zipkin config in TracingServices

    This variant uses ``collector_endpoint_version: HTTP_JSON_V1`` with the
    ``/api/v1/spans`` collector endpoint; check() expects the requests to
    succeed but Zipkin to have recorded no services, spans or traces.
    NOTE(review): presumably the V1 format is no longer emitted -- confirm.
    """

    def init(self):
        """Create the HTTP target and the Zipkin collector."""
        self.target = HTTP()
        self.zipkin = Zipkin()

    def config(self) -> Generator[Union[str, Tuple[Node, str]], None, None]:
        """Yield the Mapping (on the target) and the TracingService (on self)."""
        # The Mapping is yielded with self.target, because we want it to be
        # annotated on the service, not the Ambassador.
        mapping_yaml = self.format(
            """
---
apiVersion: getambassador.io/v3alpha1
kind: Mapping
name: tracing_target_mapping
hostname: "*"
prefix: /target/
service: {self.target.path.fqdn}
"""
        )
        yield self.target, mapping_yaml

        # Configure the TracingService.
        tracing_yaml = self.format(
            """
---
apiVersion: getambassador.io/v3alpha1
kind: TracingService
name: tracing
service: {self.zipkin.path.fqdn}:9411
driver: zipkin
config:
  collector_endpoint: /api/v1/spans
  collector_endpoint_version: HTTP_JSON_V1
  collector_hostname: {self.zipkin.path.fqdn}
"""
        )
        yield self, tracing_yaml

    def queries(self):
        """Yield 100 traced requests, three Zipkin API queries and a diag query."""
        # Speak through each Ambassador to the traced service...
        for _ in range(100):
            yield Query(self.url("target/"), phase=1)

        zipkin_api = f"http://{self.zipkin.path.fqdn}:9411/api/v2"
        service_filter = "serviceName=tracingtestzipkinv1-default"

        # result 100
        yield Query(f"{zipkin_api}/services", phase=check_phase)
        # result 101
        yield Query(f"{zipkin_api}/spans?{service_filter}", phase=check_phase)
        # result 102
        yield Query(f"{zipkin_api}/traces?{service_filter}", phase=check_phase)

        # The diagnostics page should load properly
        yield Query(self.url("ambassador/v0/diag/"), phase=check_phase)

    def check(self):
        """Verify the requests reached the target and Zipkin recorded nothing."""
        # Results 0-99: every request must have reached the target.
        for idx in range(100):
            traced = self.results[idx]
            assert traced.backend
            assert traced.backend.name == self.target.path.k8s

        # verify no services were captured
        captured_services = self.results[100].json
        assert len(captured_services) == 0

        # verify no spans were captured
        captured_spans = self.results[101].json
        assert len(captured_spans) == 0

        # verify no traces were captured
        captured_traces = self.results[102].json
        assert len(captured_traces) == 0
714
715
class TracingTestOpentelemetry(AmbassadorTest):
    """OpenTelemetry-driver TracingService with ``custom_tags``, backed by Jaeger.

    Sends 20 requests through Ambassador to the target, then queries Jaeger for
    its service list and for the traces of the "ambassador" service.
    """

    def init(self):
        """Create the HTTP target and the Jaeger collector."""
        self.target = HTTP()
        self.jaeger = Jaeger()

    def config(self) -> Generator[Union[str, Tuple[Node, str]], None, None]:
        """Yield the Mapping (on the target) and the TracingService (on self)."""
        # The Mapping is yielded with self.target, because we want it to be
        # annotated on the service, not the Ambassador.
        mapping_yaml = self.format(
            """
---
apiVersion: getambassador.io/v3alpha1
kind: Mapping
name: tracing_target_mapping
hostname: "*"
prefix: /target/
service: {self.target.path.fqdn}
"""
        )
        yield self.target, mapping_yaml

        # Configure the TracingService.
        tracing_yaml = self.format(
            """
---
apiVersion: getambassador.io/v3alpha1
kind: TracingService
name: tracing
service: {self.jaeger.path.fqdn}:4317
driver: opentelemetry
custom_tags:
  - tag: ltag
    literal:
      value: lvalue
  - tag: htag
    request_header:
      name: x-something
      default_value: hfallback
"""
        )
        yield self, tracing_yaml

    def queries(self):
        """Yield 20 traced requests followed by two Jaeger API queries."""
        # Speak through each Ambassador to the traced service...
        traced_headers = {"x-watsup": "nothin", "x-something": "something"}
        for _ in range(20):
            # Every Query gets its own copy of the header dict.
            yield Query(self.url("target/"), headers=dict(traced_headers), phase=1)

        jaeger_api = f"http://{self.jaeger.path.fqdn}:16686/api"

        # query index-20: ask Jaeger for services
        yield Query(f"{jaeger_api}/services", phase=check_phase)

        # query index-21: ask for envoy traces for ambassador service
        # since the check_readiness also creates spans we need to pull more than 20 to ensure
        # we capture all
        yield Query(f"{jaeger_api}/traces?service=ambassador&limit=100", phase=check_phase)

    def check(self):
        """Verify routing, Jaeger's service list and the custom tags on egress spans."""
        # Results 0-19: every traced request must have reached the target.
        for idx in range(20):
            traced = self.results[idx]
            assert traced.backend
            assert traced.backend.name == self.target.path.k8s

        # verify "ambassador" is the list of services from jaeger
        services = self.results[20]
        print(f"self.results[20] = {services}")
        assert (
            services.json is not None and "ambassador" in services.json["data"]
        ), f"unexpected self.results[20] = {services}"

        # verify traces for /target egress and its route
        egress_operation = "egress tracingtestopentelemetry.default.svc.cluster.local"
        expected_tags = (("ltag", "lvalue"), ("htag", "something"))

        # NOTE(review): if no span is both an egress span and a /target span,
        # none of the tag assertions below run and the check still passes.
        for trace in self.results[21].json["data"]:
            for span in trace.get("spans", []):
                # Check if the egress span contains expected tags.
                # For some reason the router span isn't resolving the htag request_header,
                # and it's being set to hfallback. Leaving it out of scope for this test.
                # this may be due to experimental nature of otel driver
                is_egress = span["operationName"] == egress_operation

                is_target_path = any(
                    entry["key"] == "http.url" and "/target" in entry["value"]
                    for entry in span.get("tags", [])
                )

                if not (is_egress and is_target_path):
                    continue

                tags = {entry["key"]: entry["value"] for entry in span.get("tags", [])}
                for tag, value in expected_tags:
                    assert tag in tags, tags
                    assert tags[tag] == value, tags
View as plain text