...

Text file src/github.com/emissary-ingress/emissary/v3/python/tests/kat/t_tracing.py

Documentation: github.com/emissary-ingress/emissary/v3/python/tests/kat

     1import json
     2from random import random
     3from typing import ClassVar, Generator, Tuple, Union
     4
     5from abstract_tests import AHTTP, HTTP, AmbassadorTest, Node, ServiceType
     6from kat.harness import EDGE_STACK, Query
     7
# The phase that we should wait until before performing test checks. Normally
# this would be phase 2, which is 10 seconds after the first wave of queries,
# but we increase it to phase 3 here to make sure that Zipkin and other tracers
# have _plenty_ of time to receive traces from Envoy and index them for retrieval
# through the API. We've seen this test flake when the check is performed in phase
# 2, so the hope is that phase 3 reduces the likelihood of the test flaking again.
#
# Every query in this module that reads data back from a tracer, or that loads
# the diagnostics page, is issued with phase=check_phase.
check_phase = 3
    15
    16
    17class Zipkin(ServiceType):
    18    skip_variant: ClassVar[bool] = True
    19
    20    def __init__(self, *args, **kwargs) -> None:
    21        # We want to reset Zipkin between test runs.  StatsD has a handy "reset" call that can do
    22        # this... but the only way to reset Zipkin is to roll over the Pod.  So, 'nonce' is a
    23        # horrible hack to get the Pod to roll over each invocation.
    24        self.nonce = random()
    25        kwargs[
    26            "service_manifests"
    27        ] = """
    28---
    29apiVersion: v1
    30kind: Service
    31metadata:
    32  name: {self.path.k8s}
    33spec:
    34  selector:
    35    backend: {self.path.k8s}
    36  ports:
    37  - port: 9411
    38    name: http
    39    targetPort: http
    40  type: ClusterIP
    41---
    42apiVersion: apps/v1
    43kind: Deployment
    44metadata:
    45  name: {self.path.k8s}
    46spec:
    47  selector:
    48    matchLabels:
    49      backend: {self.path.k8s}
    50  replicas: 1
    51  strategy:
    52    type: Recreate # rolling would be bad with the nonce hack
    53  template:
    54    metadata:
    55      labels:
    56        backend: {self.path.k8s}
    57    spec:
    58      containers:
    59      - name: zipkin
    60        image: openzipkin/zipkin:2.17
    61        ports:
    62        - name: http
    63          containerPort: 9411
    64        env:
    65        - name: _nonce
    66          value: '{self.nonce}'
    67"""
    68        super().__init__(*args, **kwargs)
    69
    70    def requirements(self):
    71        yield ("url", Query(f"http://{self.path.fqdn}:9411/api/v2/services"))
    72
    73
    74class Jaeger(ServiceType):
    75    skip_variant: ClassVar[bool] = True
    76
    77    def __init__(self, *args, **kwargs) -> None:
    78        # We want to reset Jaeger between test runs.  StatsD has a handy "reset" call that can do
    79        # this... but the only way to reset Jaeger is to roll over the Pod.  So, 'nonce' is a
    80        # horrible hack to get the Pod to roll over each invocation.
    81        self.nonce = random()
    82        kwargs[
    83            "service_manifests"
    84        ] = """
    85---
    86apiVersion: v1
    87kind: Service
    88metadata:
    89  name: {self.path.k8s}
    90spec:
    91  selector:
    92    backend: {self.path.k8s}
    93  ports:
    94  - port: 16686
    95    name: http-json
    96    targetPort: http-json
    97  - port: 4317
    98    name: otlp-grpc
    99    targetPort: otlp-grpc
   100  type: ClusterIP
   101---
   102apiVersion: apps/v1
   103kind: Deployment
   104metadata:
   105  name: {self.path.k8s}
   106spec:
   107  selector:
   108    matchLabels:
   109      backend: {self.path.k8s}
   110  replicas: 1
   111  strategy:
   112    type: Recreate # rolling would be bad with the nonce hack
   113  template:
   114    metadata:
   115      labels:
   116        backend: {self.path.k8s}
   117    spec:
   118      containers:
   119      - name: jaeger
   120        image: jaegertracing/all-in-one:1.42.0
   121        ports:
   122        - name: http-json
   123          containerPort: 16686
   124        - name: otlp-grpc
   125          containerPort: 4317
   126        env:
   127        - name: _nonce
   128          value: '{self.nonce}'
   129        - name: COLLECTOR_OTLP_ENABLED
   130          value: "true"
   131"""
   132        super().__init__(*args, **kwargs)
   133
   134    def requirements(self):
   135        yield ("url", Query(f"http://{self.path.fqdn}:16686/api/services"))
   136
   137
   138class TracingTest(AmbassadorTest):
   139    def init(self):
   140        self.target = HTTP()
   141        self.zipkin = Zipkin()
   142
   143    def config(self) -> Generator[Union[str, Tuple[Node, str]], None, None]:
   144        # Use self.target here, because we want this mapping to be annotated
   145        # on the service, not the Ambassador.
   146
   147        yield self.target, self.format(
   148            """
   149---
   150apiVersion: getambassador.io/v3alpha1
   151kind: Mapping
   152name:  tracing_target_mapping
   153hostname: "*"
   154prefix: /target/
   155service: {self.target.path.fqdn}
   156"""
   157        )
   158
   159        # Configure the TracingService.
   160        yield self, self.format(
   161            """
   162---
   163apiVersion: getambassador.io/v3alpha1
   164kind: TracingService
   165name: tracing
   166service: {self.zipkin.path.fqdn}:9411
   167driver: zipkin
   168tag_headers:
   169  - "x-watsup"
   170custom_tags:
   171  - tag: ltag
   172    literal:
   173      value: lvalue
   174  - tag: etag
   175    environment:
   176      name: UNKNOWN_ENV_VAR
   177      default_value: efallback
   178  - tag: htag
   179    request_header:
   180      name: x-something
   181      default_value: hfallback
   182"""
   183        )
   184
   185    def queries(self):
   186        # Speak through each Ambassador to the traced service...
   187
   188        for i in range(100):
   189            yield Query(
   190                self.url("target/"),
   191                headers={"x-watsup": "nothin", "x-something": "something"},
   192                phase=1,
   193            )
   194
   195        # ...then ask the Zipkin for services and spans. Including debug=True in these queries
   196        # is particularly helpful.
   197        yield Query(f"http://{self.zipkin.path.fqdn}:9411/api/v2/services", phase=check_phase)
   198        yield Query(
   199            f"http://{self.zipkin.path.fqdn}:9411/api/v2/spans?serviceName=tracingtest-default",
   200            phase=check_phase,
   201        )
   202        yield Query(
   203            f"http://{self.zipkin.path.fqdn}:9411/api/v2/traces?serviceName=tracingtest-default",
   204            phase=check_phase,
   205        )
   206
   207        # The diagnostics page should load properly
   208        yield Query(self.url("ambassador/v0/diag/"), phase=check_phase)
   209
   210    def check(self):
   211        for i in range(100):
   212            result = self.results[i]
   213            assert result.backend
   214            assert result.backend.name == self.target.path.k8s
   215
   216        print(f"self.results[100] = {self.results[100]}")
   217        assert (
   218            self.results[100].backend is not None and self.results[100].backend.name == "raw"
   219        ), f"unexpected self.results[100] = {self.results[100]}"
   220        assert len(self.results[100].backend.response) == 1
   221        assert self.results[100].backend.response[0] == "tracingtest-default"
   222
   223        assert self.results[101].backend
   224        assert self.results[101].backend.name == "raw"
   225
   226        tracelist = set(x for x in self.results[101].backend.response)
   227        print(f"tracelist = {tracelist}")
   228        assert "router tracingtest_http_default_svc_cluster_local egress" in tracelist
   229
   230        # Look for the host that we actually queried, since that's what appears in the spans.
   231        assert self.results[0].backend
   232        assert self.results[0].backend.request
   233        assert self.results[0].backend.request.host in tracelist
   234
   235        # Ensure we generate 128-bit traceids by default
   236        trace = self.results[102].json[0][0]
   237        traceId = trace["traceId"]
   238        assert len(traceId) == 32
   239        for t in self.results[102].json[0]:
   240            if t.get("tags", {}).get("node_id") == "test-id":
   241                assert "ltag" in t["tags"]
   242                assert t["tags"]["ltag"] == "lvalue"
   243                assert "etag" in t["tags"]
   244                assert t["tags"]["etag"] == "efallback"
   245                assert "htag" in t["tags"]
   246                assert t["tags"]["htag"] == "something"
   247
   248
   249class TracingTestLongClusterName(AmbassadorTest):
   250    def init(self):
   251        self.target = HTTP()
   252        # The full name ends up being `{testname}-{zipkin}-{name}`; so the name we pass in doesn't
   253        # need to be as long as you'd think.  Down in check() we'll do some asserts on
   254        # self.zipkin.path.fqdn to make sure we got the desired length correct (we can't do those
   255        # checks here because .path isn't populated yet during init()).
   256        self.zipkin = Zipkin(name="longnamethatforcescompression")
   257
   258    def config(self) -> Generator[Union[str, Tuple[Node, str]], None, None]:
   259        # Use self.target here, because we want this mapping to be annotated
   260        # on the service, not the Ambassador.
   261
   262        yield self.target, self.format(
   263            """
   264---
   265apiVersion: getambassador.io/v3alpha1
   266kind: Mapping
   267name:  tracing_target_mapping_longclustername
   268hostname: "*"
   269prefix: /target/
   270service: {self.target.path.fqdn}
   271"""
   272        )
   273
   274        # Configure the TracingService.
   275        yield self, self.format(
   276            """
   277---
   278apiVersion: getambassador.io/v3alpha1
   279kind: TracingService
   280name: tracing-longclustername
   281service: {self.zipkin.path.fqdn}:9411
   282driver: zipkin
   283"""
   284        )
   285
   286    def queries(self):
   287        # Speak through each Ambassador to the traced service...
   288
   289        for i in range(100):
   290            yield Query(self.url("target/"), phase=1)
   291
   292        # ...then ask the Zipkin for services and spans. Including debug=True in these queries
   293        # is particularly helpful.
   294        yield Query(
   295            f"http://{self.zipkin.path.fqdn}:9411/api/v2/services",
   296            phase=check_phase,
   297        )
   298        yield Query(
   299            f"http://{self.zipkin.path.fqdn}:9411/api/v2/spans?serviceName=tracingtestlongclustername-default",
   300            phase=check_phase,
   301        )
   302        yield Query(
   303            f"http://{self.zipkin.path.fqdn}:9411/api/v2/traces?serviceName=tracingtestlongclustername-default",
   304            phase=check_phase,
   305        )
   306
   307        # The diagnostics page should load properly, even though our Tracing Service
   308        # has a long cluster name https://github.com/datawire/ambassador/issues/3021
   309        yield Query(self.url("ambassador/v0/diag/"), phase=check_phase)
   310
   311    def check(self):
   312        assert len(self.zipkin.path.fqdn.split(".")[0]) > 60
   313        assert len(self.zipkin.path.fqdn.split(".")[0]) < 64
   314
   315        for i in range(100):
   316            result = self.results[i]
   317            assert result.backend
   318            assert result.backend.name == self.target.path.k8s
   319
   320        print(f"self.results[100] = {self.results[100]}")
   321        assert (
   322            self.results[100].backend is not None and self.results[100].backend.name == "raw"
   323        ), f"unexpected self.results[100] = {self.results[100]}"
   324        assert len(self.results[100].backend.response) == 1
   325        assert self.results[100].backend.response[0] == "tracingtestlongclustername-default"
   326
   327        assert self.results[101].backend
   328        assert self.results[101].backend.name == "raw"
   329
   330        tracelist = set(x for x in self.results[101].backend.response)
   331        print(f"tracelist = {tracelist}")
   332        assert (
   333            "router tracingtestlongclustername_http_default_svc_cluster_local egress" in tracelist
   334        )
   335
   336        # Look for the host that we actually queried, since that's what appears in the spans.
   337        assert self.results[0].backend
   338        assert self.results[0].backend.request
   339        assert self.results[0].backend.request.host in tracelist
   340
   341        # Ensure we generate 128-bit traceids by default
   342        trace = self.results[102].json[0][0]
   343        traceId = trace["traceId"]
   344        assert len(traceId) == 32
   345
   346
   347class TracingTestShortTraceId(AmbassadorTest):
   348    def init(self):
   349        self.target = HTTP()
   350        self.zipkin = Zipkin()
   351
   352    def config(self) -> Generator[Union[str, Tuple[Node, str]], None, None]:
   353        # Use self.target here, because we want this mapping to be annotated
   354        # on the service, not the Ambassador.
   355
   356        yield self.target, self.format(
   357            """
   358---
   359apiVersion: getambassador.io/v3alpha1
   360kind: Mapping
   361name:  tracing_target_mapping_64
   362hostname: "*"
   363prefix: /target-64/
   364service: {self.target.path.fqdn}
   365"""
   366        )
   367
   368        # Configure the TracingService.
   369        yield self, self.format(
   370            """
   371---
   372apiVersion: getambassador.io/v3alpha1
   373kind: TracingService
   374name: tracing-64
   375service: {self.zipkin.path.fqdn}:9411
   376driver: zipkin
   377config:
   378  trace_id_128bit: false
   379"""
   380        )
   381
   382    def queries(self):
   383        # Speak through each Ambassador to the traced service...
   384        yield Query(self.url("target-64/"), phase=1)
   385
   386        # ...then ask the Zipkin for services and spans. Including debug=True in these queries
   387        # is particularly helpful.
   388        yield Query(f"http://{self.zipkin.path.fqdn}:9411/api/v2/traces", phase=check_phase)
   389
   390        # The diagnostics page should load properly
   391        yield Query(self.url("ambassador/v0/diag/"), phase=check_phase)
   392
   393    def check(self):
   394        # Ensure we generated 64-bit traceids
   395        trace = self.results[1].json[0][0]
   396        traceId = trace["traceId"]
   397        assert len(traceId) == 16
   398
   399
   400# This test asserts that the external authorization server receives the proper tracing
   401# headers when Ambassador is configured with an HTTP AuthService.
   402class TracingExternalAuthTest(AmbassadorTest):
   403    def init(self):
   404        if EDGE_STACK:
   405            self.xfail = "XFailing for now, custom AuthServices not supported in Edge Stack"
   406        self.target = HTTP()
   407        self.auth = AHTTP(name="auth")
   408        self.zipkin = Zipkin()
   409
   410    def config(self) -> Generator[Union[str, Tuple[Node, str]], None, None]:
   411        yield self.target, self.format(
   412            """
   413---
   414apiVersion: getambassador.io/v3alpha1
   415kind: Mapping
   416name:  tracing_target_mapping
   417hostname: "*"
   418prefix: /target/
   419service: {self.target.path.fqdn}
   420"""
   421        )
   422
   423        yield self, self.format(
   424            """
   425---
   426apiVersion: getambassador.io/v3alpha1
   427kind: TracingService
   428name: tracing-auth
   429service: {self.zipkin.path.k8s}:9411
   430driver: zipkin
   431"""
   432        )
   433
   434        yield self, self.format(
   435            """
   436---
   437apiVersion: getambassador.io/v3alpha1
   438kind: AuthService
   439name:  {self.auth.path.k8s}
   440auth_service: "{self.auth.path.fqdn}"
   441path_prefix: "/extauth"
   442allowed_request_headers:
   443- Kat-Req-Extauth-Requested-Status
   444- Kat-Req-Extauth-Requested-Header
   445"""
   446        )
   447
   448    def queries(self):
   449        yield Query(
   450            self.url("target/"), headers={"Kat-Req-Extuath-Requested-Status": "200"}, expected=200
   451        )
   452
   453    def check(self):
   454        extauth_res = json.loads(self.results[0].headers["Extauth"][0])
   455        assert self.results[0].backend
   456        assert self.results[0].backend.request
   457        request_headers = self.results[0].backend.request.headers
   458
   459        assert self.results[0].status == 200
   460        assert self.results[0].headers["Server"] == ["envoy"]
   461        assert (
   462            extauth_res["request"]["headers"]["x-b3-parentspanid"]
   463            == request_headers["x-b3-parentspanid"]
   464        )
   465        assert extauth_res["request"]["headers"]["x-b3-sampled"] == request_headers["x-b3-sampled"]
   466        assert extauth_res["request"]["headers"]["x-b3-spanid"] == request_headers["x-b3-spanid"]
   467        assert extauth_res["request"]["headers"]["x-b3-traceid"] == request_headers["x-b3-traceid"]
   468        assert extauth_res["request"]["headers"]["x-request-id"] == request_headers["x-request-id"]
   469
   470
   471class TracingTestSampling(AmbassadorTest):
   472    """
   473    Test for the "sampling" in TracingServices
   474    """
   475
   476    def init(self):
   477        self.target = HTTP()
   478        self.zipkin = Zipkin()
   479
   480    def config(self) -> Generator[Union[str, Tuple[Node, str]], None, None]:
   481        # Use self.target here, because we want this mapping to be annotated
   482        # on the service, not the Ambassador.
   483
   484        yield self.target, self.format(
   485            """
   486---
   487apiVersion: getambassador.io/v3alpha1
   488kind: Mapping
   489name:  tracing_target_mapping_65
   490hostname: "*"
   491prefix: /target-65/
   492service: {self.target.path.fqdn}
   493"""
   494        )
   495
   496        # Configure the TracingService.
   497        yield self, self.format(
   498            """
   499---
   500apiVersion: getambassador.io/v3alpha1
   501kind: TracingService
   502name: tracing-65
   503service: {self.zipkin.path.fqdn}:9411
   504driver: zipkin
   505sampling:
   506  overall: 10
   507"""
   508        )
   509
   510    def queries(self):
   511        # Speak through each Ambassador to the traced service...
   512        for i in range(0, 100):
   513            yield Query(self.url("target-65/"), phase=1, ignore_result=True)
   514
   515        # ...then ask the Zipkin for services and spans. Including debug=True in these queries
   516        # is particularly helpful.
   517        yield Query(
   518            f"http://{self.zipkin.path.fqdn}:9411/api/v2/traces?limit=10000", phase=check_phase
   519        )
   520
   521        # The diagnostics page should load properly
   522        yield Query(self.url("ambassador/v0/diag/"), phase=check_phase)
   523
   524    def check(self):
   525        traces = self.results[100].json
   526
   527        print("%d traces obtained" % len(traces))
   528
   529        # import json
   530        # print(json.dumps(traces, indent=4, sort_keys=True))
   531
   532        # We constantly find that Envoy's RNG isn't exactly predictable with small sample
   533        # sizes, so even though 10% of 100 is 10, we'll make this pass as long as we don't
   534        # go over 50 or under 1.
   535        assert 1 <= len(traces) <= 50
   536
   537
   538class TracingTestZipkinV2(AmbassadorTest):
   539    """
   540    Test for the "collector_endpoint_version" Zipkin config in TracingServices
   541    """
   542
   543    def init(self):
   544        self.target = HTTP()
   545        self.zipkin = Zipkin()
   546
   547    def config(self) -> Generator[Union[str, Tuple[Node, str]], None, None]:
   548        # Use self.target here, because we want this mapping to be annotated
   549        # on the service, not the Ambassador.
   550        yield self.target, self.format(
   551            """
   552---
   553apiVersion: getambassador.io/v3alpha1
   554kind: Mapping
   555name:  tracing_target_mapping
   556hostname: "*"
   557prefix: /target/
   558service: {self.target.path.fqdn}
   559"""
   560        )
   561
   562        # Configure the TracingService.
   563        yield self, self.format(
   564            """
   565---
   566apiVersion: getambassador.io/v3alpha1
   567kind: TracingService
   568name: tracing
   569service: {self.zipkin.path.fqdn}:9411
   570driver: zipkin
   571config:
   572  collector_endpoint: /api/v2/spans
   573  collector_endpoint_version: HTTP_JSON
   574  collector_hostname: {self.zipkin.path.fqdn}
   575"""
   576        )
   577
   578    def requirements(self):
   579        yield from super().requirements()
   580        yield ("url", Query(f"http://{self.zipkin.path.fqdn}:9411/api/v2/services"))
   581
   582    def queries(self):
   583        # Speak through each Ambassador to the traced service...
   584
   585        for i in range(100):
   586            yield Query(self.url("target/"), phase=1)
   587
   588        # ...then ask the Zipkin for services and spans. Including debug=True in these queries
   589        # is particularly helpful.
   590        yield Query(f"http://{self.zipkin.path.fqdn}:9411/api/v2/services", phase=check_phase)
   591        yield Query(
   592            f"http://{self.zipkin.path.fqdn}:9411/api/v2/spans?serviceName=tracingtestzipkinv2-default",
   593            phase=check_phase,
   594        )
   595        yield Query(
   596            f"http://{self.zipkin.path.fqdn}:9411/api/v2/traces?serviceName=tracingtestzipkinv2-default",
   597            phase=check_phase,
   598        )
   599
   600        # The diagnostics page should load properly
   601        yield Query(self.url("ambassador/v0/diag/"), phase=check_phase)
   602
   603    def check(self):
   604        for i in range(100):
   605            result = self.results[i]
   606            assert result.backend
   607            assert result.backend.name == self.target.path.k8s
   608
   609        print(f"self.results[100] = {self.results[100]}")
   610        assert (
   611            self.results[100].backend is not None and self.results[100].backend.name == "raw"
   612        ), f"unexpected self.results[100] = {self.results[100]}"
   613        assert len(self.results[100].backend.response) == 1
   614        assert self.results[100].backend.response[0] == "tracingtestzipkinv2-default"
   615
   616        assert self.results[101].backend
   617        assert self.results[101].backend.name == "raw"
   618
   619        tracelist = set(x for x in self.results[101].backend.response)
   620        print(f"tracelist = {tracelist}")
   621        assert "router tracingtestzipkinv2_http_default_svc_cluster_local egress" in tracelist
   622
   623        # Look for the host that we actually queried, since that's what appears in the spans.
   624        assert self.results[0].backend
   625        assert self.results[0].backend.request
   626        assert self.results[0].backend.request.host in tracelist
   627
   628        # Ensure we generate 128-bit traceids by default
   629        trace = self.results[102].json[0][0]
   630        traceId = trace["traceId"]
   631        assert len(traceId) == 32
   632
   633
   634class TracingTestZipkinV1(AmbassadorTest):
   635    """
   636    Test for the "collector_endpoint_version" Zipkin config in TracingServices
   637    """
   638
   639    def init(self):
   640        self.target = HTTP()
   641        self.zipkin = Zipkin()
   642
   643    def config(self) -> Generator[Union[str, Tuple[Node, str]], None, None]:
   644        # Use self.target here, because we want this mapping to be annotated
   645        # on the service, not the Ambassador.
   646
   647        yield self.target, self.format(
   648            """
   649---
   650apiVersion: getambassador.io/v3alpha1
   651kind: Mapping
   652name:  tracing_target_mapping
   653hostname: "*"
   654prefix: /target/
   655service: {self.target.path.fqdn}
   656"""
   657        )
   658
   659        # Configure the TracingService.
   660        yield self, self.format(
   661            """
   662---
   663apiVersion: getambassador.io/v3alpha1
   664kind: TracingService
   665name: tracing
   666service: {self.zipkin.path.fqdn}:9411
   667driver: zipkin
   668config:
   669  collector_endpoint: /api/v1/spans
   670  collector_endpoint_version: HTTP_JSON_V1
   671  collector_hostname: {self.zipkin.path.fqdn}
   672"""
   673        )
   674
   675    def queries(self):
   676        # Speak through each Ambassador to the traced service...
   677
   678        for i in range(100):
   679            yield Query(self.url("target/"), phase=1)
   680
   681        # result 100
   682        yield Query(f"http://{self.zipkin.path.fqdn}:9411/api/v2/services", phase=check_phase)
   683        # result 101
   684        yield Query(
   685            f"http://{self.zipkin.path.fqdn}:9411/api/v2/spans?serviceName=tracingtestzipkinv1-default",
   686            phase=check_phase,
   687        )
   688        # result 102
   689        yield Query(
   690            f"http://{self.zipkin.path.fqdn}:9411/api/v2/traces?serviceName=tracingtestzipkinv1-default",
   691            phase=check_phase,
   692        )
   693
   694        # The diagnostics page should load properly
   695        yield Query(self.url("ambassador/v0/diag/"), phase=check_phase)
   696
   697    def check(self):
   698        for i in range(100):
   699            result = self.results[i]
   700            assert result.backend
   701            assert result.backend.name == self.target.path.k8s
   702
   703        # verify no services were captured
   704        services = self.results[100].json
   705        assert len(services) == 0
   706
   707        # verify no spans were captured
   708        spans = self.results[101].json
   709        assert len(spans) == 0
   710
   711        # verify no traces were captured
   712        traces = self.results[102].json
   713        assert len(traces) == 0
   714
   715
   716class TracingTestOpentelemetry(AmbassadorTest):
   717    def init(self):
   718        self.target = HTTP()
   719        self.jaeger = Jaeger()
   720
   721    def config(self) -> Generator[Union[str, Tuple[Node, str]], None, None]:
   722        # Use self.target here, because we want this mapping to be annotated
   723        # on the service, not the Ambassador.
   724
   725        yield self.target, self.format(
   726            """
   727---
   728apiVersion: getambassador.io/v3alpha1
   729kind: Mapping
   730name:  tracing_target_mapping
   731hostname: "*"
   732prefix: /target/
   733service: {self.target.path.fqdn}
   734"""
   735        )
   736
   737        # Configure the TracingService.
   738        yield self, self.format(
   739            """
   740---
   741apiVersion: getambassador.io/v3alpha1
   742kind: TracingService
   743name: tracing
   744service: {self.jaeger.path.fqdn}:4317
   745driver: opentelemetry
   746custom_tags:
   747  - tag: ltag
   748    literal:
   749      value: lvalue
   750  - tag: htag
   751    request_header:
   752      name: x-something
   753      default_value: hfallback
   754"""
   755        )
   756
   757    def queries(self):
   758        # Speak through each Ambassador to the traced service...
   759
   760        for i in range(20):
   761            yield Query(
   762                self.url("target/"),
   763                headers={"x-watsup": "nothin", "x-something": "something"},
   764                phase=1,
   765            )
   766
   767        # query index-20: ask Jaeger for services
   768        yield Query(f"http://{self.jaeger.path.fqdn}:16686/api/services", phase=check_phase)
   769
   770        # query index-21: ask for envoy traces for ambassador service
   771        # since the check_readiness also creates spans we need to pull more than 20 to ensure
   772        # we capture all
   773        yield Query(
   774            f"http://{self.jaeger.path.fqdn}:16686/api/traces?service=ambassador&limit=100",
   775            phase=check_phase,
   776        )
   777
   778    def check(self):
   779        for i in range(20):
   780            result = self.results[i]
   781            assert result.backend
   782            assert result.backend.name == self.target.path.k8s
   783
   784        # verify "ambassador" is the list of services from jaeger
   785        print(f"self.results[20] = {self.results[20]}")
   786        assert (
   787            self.results[20].json is not None and "ambassador" in self.results[20].json["data"]
   788        ), f"unexpected self.results[20] = {self.results[20]}"
   789
   790        # verify traces for /target egress and its route
   791        upstream_tracelist = self.results[21].json["data"]
   792
   793        for trace in upstream_tracelist:
   794            spans = trace.get("spans", [])
   795
   796            for span in spans:
   797                # Check if the egress span contains expected tags.
   798                # For some reason the router span isn't resolving the htag request_header,
   799                # and it's being set to hfallback. Leaving it out of scope for this test.
   800                # this may be due to experimental nature of otel driver
   801                isEgress = (
   802                    span["operationName"]
   803                    == "egress tracingtestopentelemetry.default.svc.cluster.local"
   804                )
   805
   806                isTargetPath = any(
   807                    t
   808                    for t in span.get("tags", [])
   809                    if t["key"] == "http.url" and "/target" in t["value"]
   810                )
   811
   812                if isEgress and isTargetPath:
   813                    tags = {x["key"]: x["value"] for x in span.get("tags", [])}
   814                    assert "ltag" in tags, tags
   815                    assert tags["ltag"] == "lvalue", tags
   816                    assert "htag" in tags, tags
   817                    assert tags["htag"] == "something", tags

View as plain text