...

Text file src/github.com/datawire/ambassador/v2/python/tests/kat/t_tracing.py

Documentation: github.com/datawire/ambassador/v2/python/tests/kat

     1import json
     2from random import random
     3from typing import ClassVar, Generator, Tuple, Union
     4
     5from abstract_tests import AHTTP, HTTP, AmbassadorTest, Node, ServiceType
     6from kat.harness import EDGE_STACK, Query
     7
# The phase that the tests wait for before querying Zipkin and running their
# checks. Normally this would be phase 2, which is 10 seconds after the first
# wave of queries, but we use phase 3 here so that Zipkin and other tracers
# have _plenty_ of time to receive traces from Envoy and index them for
# retrieval through the API. We've seen this test flake when the check is
# performed in phase 2, so the hope is that phase 3 makes a flake less likely.
check_phase = 3
    15
    16
    17class Zipkin(ServiceType):
    18    skip_variant: ClassVar[bool] = True
    19
    20    def __init__(self, *args, **kwargs) -> None:
    21        # We want to reset Zipkin between test runs.  StatsD has a handy "reset" call that can do
    22        # this... but the only way to reset Zipkin is to roll over the Pod.  So, 'nonce' is a
    23        # horrible hack to get the Pod to roll over each invocation.
    24        self.nonce = random()
    25        kwargs[
    26            "service_manifests"
    27        ] = """
    28---
    29apiVersion: v1
    30kind: Service
    31metadata:
    32  name: {self.path.k8s}
    33spec:
    34  selector:
    35    backend: {self.path.k8s}
    36  ports:
    37  - port: 9411
    38    name: http
    39    targetPort: http
    40  type: ClusterIP
    41---
    42apiVersion: apps/v1
    43kind: Deployment
    44metadata:
    45  name: {self.path.k8s}
    46spec:
    47  selector:
    48    matchLabels:
    49      backend: {self.path.k8s}
    50  replicas: 1
    51  strategy:
    52    type: Recreate # rolling would be bad with the nonce hack
    53  template:
    54    metadata:
    55      labels:
    56        backend: {self.path.k8s}
    57    spec:
    58      containers:
    59      - name: zipkin
    60        image: openzipkin/zipkin:2.17
    61        ports:
    62        - name: http
    63          containerPort: 9411
    64        env:
    65        - name: _nonce
    66          value: '{self.nonce}'
    67"""
    68        super().__init__(*args, **kwargs)
    69
    70    def requirements(self):
    71        yield ("url", Query(f"http://{self.path.fqdn}:9411/api/v2/services"))
    72
    73
    74class TracingTest(AmbassadorTest):
    75    def init(self):
    76        self.target = HTTP()
    77        self.zipkin = Zipkin()
    78
    79    def config(self) -> Generator[Union[str, Tuple[Node, str]], None, None]:
    80        # Use self.target here, because we want this mapping to be annotated
    81        # on the service, not the Ambassador.
    82
    83        yield self.target, self.format(
    84            """
    85---
    86apiVersion: getambassador.io/v3alpha1
    87kind: Mapping
    88name:  tracing_target_mapping
    89hostname: "*"
    90prefix: /target/
    91service: {self.target.path.fqdn}
    92"""
    93        )
    94
    95        # Configure the TracingService.
    96        yield self, self.format(
    97            """
    98---
    99apiVersion: getambassador.io/v3alpha1
   100kind: TracingService
   101name: tracing
   102service: {self.zipkin.path.fqdn}:9411
   103driver: zipkin
   104tag_headers:
   105  - "x-watsup"
   106"""
   107        )
   108
   109    def queries(self):
   110        # Speak through each Ambassador to the traced service...
   111
   112        for i in range(100):
   113            yield Query(self.url("target/"), headers={"x-watsup": "nothin"}, phase=1)
   114
   115        # ...then ask the Zipkin for services and spans. Including debug=True in these queries
   116        # is particularly helpful.
   117        yield Query(f"http://{self.zipkin.path.fqdn}:9411/api/v2/services", phase=check_phase)
   118        yield Query(
   119            f"http://{self.zipkin.path.fqdn}:9411/api/v2/spans?serviceName=tracingtest-default",
   120            phase=check_phase,
   121        )
   122        yield Query(
   123            f"http://{self.zipkin.path.fqdn}:9411/api/v2/traces?serviceName=tracingtest-default",
   124            phase=check_phase,
   125        )
   126
   127        # The diagnostics page should load properly
   128        yield Query(self.url("ambassador/v0/diag/"), phase=check_phase)
   129
   130    def check(self):
   131        for i in range(100):
   132            assert self.results[i].backend.name == self.target.path.k8s
   133
   134        print(f"self.results[100] = {self.results[100]}")
   135        assert (
   136            self.results[100].backend is not None and self.results[100].backend.name == "raw"
   137        ), f"unexpected self.results[100] = {self.results[100]}"
   138        assert len(self.results[100].backend.response) == 1
   139        assert self.results[100].backend.response[0] == "tracingtest-default"
   140
   141        assert self.results[101].backend.name == "raw"
   142
   143        tracelist = set(x for x in self.results[101].backend.response)
   144        print(f"tracelist = {tracelist}")
   145        assert (
   146            "router cluster_tracingtest_http_default_svc_cluster_local_default egress" in tracelist
   147        )
   148
   149        # Look for the host that we actually queried, since that's what appears in the spans.
   150        assert self.results[0].backend.request.host in tracelist
   151
   152        # Ensure we generate 128-bit traceids by default
   153        trace = self.results[102].json[0][0]
   154        traceId = trace["traceId"]
   155        assert len(traceId) == 32
   156        for t in self.results[102].json[0]:
   157            if t.get("tags", {}).get("node_id") == "test-id":
   158                assert "x-watsup" in t["tags"]
   159                assert t["tags"]["x-watsup"] == "nothin"
   160
   161
   162class TracingTestLongClusterName(AmbassadorTest):
   163    def init(self):
   164        self.target = HTTP()
   165        # The full name ends up being `{testname}-{zipkin}-{name}`; so the name we pass in doesn't
   166        # need to be as long as you'd think.  Down in check() we'll do some asserts on
   167        # self.zipkin.path.fqdn to make sure we got the desired length correct (we can't do those
   168        # checks here because .path isn't populated yet during init()).
   169        self.zipkin = Zipkin(name="longnamethatforcescompression")
   170
   171    def config(self) -> Generator[Union[str, Tuple[Node, str]], None, None]:
   172        # Use self.target here, because we want this mapping to be annotated
   173        # on the service, not the Ambassador.
   174
   175        yield self.target, self.format(
   176            """
   177---
   178apiVersion: getambassador.io/v3alpha1
   179kind: Mapping
   180name:  tracing_target_mapping_longclustername
   181hostname: "*"
   182prefix: /target/
   183service: {self.target.path.fqdn}
   184"""
   185        )
   186
   187        # Configure the TracingService.
   188        yield self, self.format(
   189            """
   190---
   191apiVersion: getambassador.io/v3alpha1
   192kind: TracingService
   193name: tracing-longclustername
   194service: {self.zipkin.path.fqdn}:9411
   195driver: zipkin
   196"""
   197        )
   198
   199    def queries(self):
   200        # Speak through each Ambassador to the traced service...
   201
   202        for i in range(100):
   203            yield Query(self.url("target/"), phase=1)
   204
   205        # ...then ask the Zipkin for services and spans. Including debug=True in these queries
   206        # is particularly helpful.
   207        yield Query(
   208            f"http://{self.zipkin.path.fqdn}:9411/api/v2/services",
   209            phase=check_phase,
   210        )
   211        yield Query(
   212            f"http://{self.zipkin.path.fqdn}:9411/api/v2/spans?serviceName=tracingtestlongclustername-default",
   213            phase=check_phase,
   214        )
   215        yield Query(
   216            f"http://{self.zipkin.path.fqdn}:9411/api/v2/traces?serviceName=tracingtestlongclustername-default",
   217            phase=check_phase,
   218        )
   219
   220        # The diagnostics page should load properly, even though our Tracing Service
   221        # has a long cluster name https://github.com/datawire/ambassador/issues/3021
   222        yield Query(self.url("ambassador/v0/diag/"), phase=check_phase)
   223
   224    def check(self):
   225        assert len(self.zipkin.path.fqdn.split(".")[0]) > 60
   226        assert len(self.zipkin.path.fqdn.split(".")[0]) < 64
   227
   228        for i in range(100):
   229            assert self.results[i].backend.name == self.target.path.k8s
   230
   231        print(f"self.results[100] = {self.results[100]}")
   232        assert (
   233            self.results[100].backend is not None and self.results[100].backend.name == "raw"
   234        ), f"unexpected self.results[100] = {self.results[100]}"
   235        assert len(self.results[100].backend.response) == 1
   236        assert self.results[100].backend.response[0] == "tracingtestlongclustername-default"
   237
   238        assert self.results[101].backend.name == "raw"
   239
   240        tracelist = set(x for x in self.results[101].backend.response)
   241        print(f"tracelist = {tracelist}")
   242        assert "router cluster_tracingtestlongclustername_http_-0 egress" in tracelist
   243
   244        # Look for the host that we actually queried, since that's what appears in the spans.
   245        assert self.results[0].backend.request.host in tracelist
   246
   247        # Ensure we generate 128-bit traceids by default
   248        trace = self.results[102].json[0][0]
   249        traceId = trace["traceId"]
   250        assert len(traceId) == 32
   251
   252
   253class TracingTestShortTraceId(AmbassadorTest):
   254    def init(self):
   255        self.target = HTTP()
   256        self.zipkin = Zipkin()
   257
   258    def config(self) -> Generator[Union[str, Tuple[Node, str]], None, None]:
   259        # Use self.target here, because we want this mapping to be annotated
   260        # on the service, not the Ambassador.
   261
   262        yield self.target, self.format(
   263            """
   264---
   265apiVersion: getambassador.io/v3alpha1
   266kind: Mapping
   267name:  tracing_target_mapping_64
   268hostname: "*"
   269prefix: /target-64/
   270service: {self.target.path.fqdn}
   271"""
   272        )
   273
   274        # Configure the TracingService.
   275        yield self, self.format(
   276            """
   277---
   278apiVersion: getambassador.io/v3alpha1
   279kind: TracingService
   280name: tracing-64
   281service: {self.zipkin.path.fqdn}:9411
   282driver: zipkin
   283config:
   284  trace_id_128bit: false
   285"""
   286        )
   287
   288    def queries(self):
   289        # Speak through each Ambassador to the traced service...
   290        yield Query(self.url("target-64/"), phase=1)
   291
   292        # ...then ask the Zipkin for services and spans. Including debug=True in these queries
   293        # is particularly helpful.
   294        yield Query(f"http://{self.zipkin.path.fqdn}:9411/api/v2/traces", phase=check_phase)
   295
   296        # The diagnostics page should load properly
   297        yield Query(self.url("ambassador/v0/diag/"), phase=check_phase)
   298
   299    def check(self):
   300        # Ensure we generated 64-bit traceids
   301        trace = self.results[1].json[0][0]
   302        traceId = trace["traceId"]
   303        assert len(traceId) == 16
   304
   305
   306# This test asserts that the external authorization server receives the proper tracing
   307# headers when Ambassador is configured with an HTTP AuthService.
   308class TracingExternalAuthTest(AmbassadorTest):
   309    def init(self):
   310        if EDGE_STACK:
   311            self.xfail = "XFailing for now, custom AuthServices not supported in Edge Stack"
   312        self.target = HTTP()
   313        self.auth = AHTTP(name="auth")
   314        self.zipkin = Zipkin()
   315
   316    def config(self) -> Generator[Union[str, Tuple[Node, str]], None, None]:
   317        yield self.target, self.format(
   318            """
   319---
   320apiVersion: getambassador.io/v3alpha1
   321kind: Mapping
   322name:  tracing_target_mapping
   323hostname: "*"
   324prefix: /target/
   325service: {self.target.path.fqdn}
   326"""
   327        )
   328
   329        yield self, self.format(
   330            """
   331---
   332apiVersion: getambassador.io/v3alpha1
   333kind: TracingService
   334name: tracing-auth
   335service: {self.zipkin.path.k8s}:9411
   336driver: zipkin
   337"""
   338        )
   339
   340        yield self, self.format(
   341            """
   342---
   343apiVersion: getambassador.io/v3alpha1
   344kind: AuthService
   345name:  {self.auth.path.k8s}
   346auth_service: "{self.auth.path.fqdn}"
   347path_prefix: "/extauth"
   348allowed_request_headers:
   349- Requested-Status
   350- Requested-Header
   351"""
   352        )
   353
   354    def queries(self):
   355        yield Query(self.url("target/"), headers={"Requested-Status": "200"}, expected=200)
   356
   357    def check(self):
   358        extauth_res = json.loads(self.results[0].headers["Extauth"][0])
   359        request_headers = self.results[0].backend.request.headers
   360
   361        assert self.results[0].status == 200
   362        assert self.results[0].headers["Server"] == ["envoy"]
   363        assert (
   364            extauth_res["request"]["headers"]["x-b3-parentspanid"]
   365            == request_headers["x-b3-parentspanid"]
   366        )
   367        assert extauth_res["request"]["headers"]["x-b3-sampled"] == request_headers["x-b3-sampled"]
   368        assert extauth_res["request"]["headers"]["x-b3-spanid"] == request_headers["x-b3-spanid"]
   369        assert extauth_res["request"]["headers"]["x-b3-traceid"] == request_headers["x-b3-traceid"]
   370        assert extauth_res["request"]["headers"]["x-request-id"] == request_headers["x-request-id"]
   371
   372
   373class TracingTestSampling(AmbassadorTest):
   374    """
   375    Test for the "sampling" in TracingServices
   376    """
   377
   378    def init(self):
   379        self.target = HTTP()
   380        self.zipkin = Zipkin()
   381
   382    def config(self) -> Generator[Union[str, Tuple[Node, str]], None, None]:
   383        # Use self.target here, because we want this mapping to be annotated
   384        # on the service, not the Ambassador.
   385
   386        yield self.target, self.format(
   387            """
   388---
   389apiVersion: getambassador.io/v3alpha1
   390kind: Mapping
   391name:  tracing_target_mapping_65
   392hostname: "*"
   393prefix: /target-65/
   394service: {self.target.path.fqdn}
   395"""
   396        )
   397
   398        # Configure the TracingService.
   399        yield self, self.format(
   400            """
   401---
   402apiVersion: getambassador.io/v3alpha1
   403kind: TracingService
   404name: tracing-65
   405service: {self.zipkin.path.fqdn}:9411
   406driver: zipkin
   407sampling:
   408  overall: 10
   409"""
   410        )
   411
   412    def queries(self):
   413        # Speak through each Ambassador to the traced service...
   414        for i in range(0, 100):
   415            yield Query(self.url("target-65/"), phase=1, ignore_result=True)
   416
   417        # ...then ask the Zipkin for services and spans. Including debug=True in these queries
   418        # is particularly helpful.
   419        yield Query(
   420            f"http://{self.zipkin.path.fqdn}:9411/api/v2/traces?limit=10000", phase=check_phase
   421        )
   422
   423        # The diagnostics page should load properly
   424        yield Query(self.url("ambassador/v0/diag/"), phase=check_phase)
   425
   426    def check(self):
   427        traces = self.results[100].json
   428
   429        print("%d traces obtained" % len(traces))
   430
   431        # import json
   432        # print(json.dumps(traces, indent=4, sort_keys=True))
   433
   434        # We constantly find that Envoy's RNG isn't exactly predictable with small sample
   435        # sizes, so even though 10% of 100 is 10, we'll make this pass as long as we don't
   436        # go over 50 or under 1.
   437        assert 1 <= len(traces) <= 50
   438
   439
   440class TracingTestZipkinV2(AmbassadorTest):
   441    """
   442    Test for the "collector_endpoint_version" Zipkin config in TracingServices
   443    """
   444
   445    def init(self):
   446        self.target = HTTP()
   447        self.zipkin = Zipkin()
   448
   449    def config(self) -> Generator[Union[str, Tuple[Node, str]], None, None]:
   450        # Use self.target here, because we want this mapping to be annotated
   451        # on the service, not the Ambassador.
   452        yield self.target, self.format(
   453            """
   454---
   455apiVersion: getambassador.io/v3alpha1
   456kind: Mapping
   457name:  tracing_target_mapping
   458hostname: "*"
   459prefix: /target/
   460service: {self.target.path.fqdn}
   461"""
   462        )
   463
   464        # Configure the TracingService.
   465        yield self, self.format(
   466            """
   467---
   468apiVersion: getambassador.io/v3alpha1
   469kind: TracingService
   470name: tracing
   471service: {self.zipkin.path.fqdn}:9411
   472driver: zipkin
   473config:
   474  collector_endpoint: /api/v2/spans
   475  collector_endpoint_version: HTTP_JSON
   476  collector_hostname: {self.zipkin.path.fqdn}
   477"""
   478        )
   479
   480    def requirements(self):
   481        yield from super().requirements()
   482        yield ("url", Query(f"http://{self.zipkin.path.fqdn}:9411/api/v2/services"))
   483
   484    def queries(self):
   485        # Speak through each Ambassador to the traced service...
   486
   487        for i in range(100):
   488            yield Query(self.url("target/"), phase=1)
   489
   490        # ...then ask the Zipkin for services and spans. Including debug=True in these queries
   491        # is particularly helpful.
   492        yield Query(f"http://{self.zipkin.path.fqdn}:9411/api/v2/services", phase=check_phase)
   493        yield Query(
   494            f"http://{self.zipkin.path.fqdn}:9411/api/v2/spans?serviceName=tracingtestzipkinv2-default",
   495            phase=check_phase,
   496        )
   497        yield Query(
   498            f"http://{self.zipkin.path.fqdn}:9411/api/v2/traces?serviceName=tracingtestzipkinv2-default",
   499            phase=check_phase,
   500        )
   501
   502        # The diagnostics page should load properly
   503        yield Query(self.url("ambassador/v0/diag/"), phase=check_phase)
   504
   505    def check(self):
   506        for i in range(100):
   507            assert self.results[i].backend.name == self.target.path.k8s
   508
   509        print(f"self.results[100] = {self.results[100]}")
   510        assert (
   511            self.results[100].backend is not None and self.results[100].backend.name == "raw"
   512        ), f"unexpected self.results[100] = {self.results[100]}"
   513        assert len(self.results[100].backend.response) == 1
   514        assert self.results[100].backend.response[0] == "tracingtestzipkinv2-default"
   515
   516        assert self.results[101].backend.name == "raw"
   517
   518        tracelist = set(x for x in self.results[101].backend.response)
   519        print(f"tracelist = {tracelist}")
   520        assert "router cluster_tracingtestzipkinv2_http_default-0 egress" in tracelist
   521
   522        # Look for the host that we actually queried, since that's what appears in the spans.
   523        assert self.results[0].backend.request.host in tracelist
   524
   525        # Ensure we generate 128-bit traceids by default
   526        trace = self.results[102].json[0][0]
   527        traceId = trace["traceId"]
   528        assert len(traceId) == 32
   529
   530
   531class TracingTestZipkinV1(AmbassadorTest):
   532    """
   533    Test for the "collector_endpoint_version" Zipkin config in TracingServices
   534    """
   535
   536    def init(self):
   537        self.target = HTTP()
   538        self.zipkin = Zipkin()
   539
   540    def config(self) -> Generator[Union[str, Tuple[Node, str]], None, None]:
   541        # Use self.target here, because we want this mapping to be annotated
   542        # on the service, not the Ambassador.
   543
   544        yield self.target, self.format(
   545            """
   546---
   547apiVersion: getambassador.io/v3alpha1
   548kind: Mapping
   549name:  tracing_target_mapping
   550hostname: "*"
   551prefix: /target/
   552service: {self.target.path.fqdn}
   553"""
   554        )
   555
   556        # Configure the TracingService.
   557        yield self, self.format(
   558            """
   559---
   560apiVersion: getambassador.io/v3alpha1
   561kind: TracingService
   562name: tracing
   563service: {self.zipkin.path.fqdn}:9411
   564driver: zipkin
   565config:
   566  collector_endpoint: /api/v1/spans
   567  collector_endpoint_version: HTTP_JSON_V1
   568  collector_hostname: {self.zipkin.path.fqdn}
   569"""
   570        )
   571
   572    def queries(self):
   573        # Speak through each Ambassador to the traced service...
   574
   575        for i in range(100):
   576            yield Query(self.url("target/"), phase=1)
   577
   578        # result 100
   579        yield Query(f"http://{self.zipkin.path.fqdn}:9411/api/v2/services", phase=check_phase)
   580        # result 101
   581        yield Query(
   582            f"http://{self.zipkin.path.fqdn}:9411/api/v2/spans?serviceName=tracingtestzipkinv1-default",
   583            phase=check_phase,
   584        )
   585        yield Query(
   586            f"http://{self.zipkin.path.fqdn}:9411/api/v2/traces?serviceName=tracingtestzipkinv1-default",
   587            phase=check_phase,
   588        )
   589
   590        # The diagnostics page should load properly
   591        yield Query(self.url("ambassador/v0/diag/"), phase=check_phase)
   592
   593    def check(self):
   594        for i in range(100):
   595            assert self.results[i].backend.name == self.target.path.k8s
   596
   597        print(f"self.results[100] = {self.results[100]}")
   598        assert (
   599            self.results[100].backend is not None and self.results[100].backend.name == "raw"
   600        ), f"unexpected self.results[100] = {self.results[100]}"
   601        assert len(self.results[100].backend.response) == 1
   602        assert self.results[100].backend.response[0] == "tracingtestzipkinv1-default"
   603
   604        assert self.results[101].backend.name == "raw"
   605
   606        tracelist = set(x for x in self.results[101].backend.response)
   607        print(f"tracelist = {tracelist}")
   608
   609        assert "router cluster_tracingtestzipkinv1_http_default-0 egress" in tracelist
   610
   611        # Look for the host that we actually queried, since that's what appears in the spans.
   612        assert self.results[0].backend.request.host in tracelist
   613
   614        # Ensure we generate 128-bit traceids by default
   615        trace = self.results[102].json[0][0]
   616        traceId = trace["traceId"]
   617        assert len(traceId) == 32

View as plain text