...

Text file src/github.com/datawire/ambassador/v2/python/tests/kat/t_circuitbreaker.py

Documentation: github.com/datawire/ambassador/v2/python/tests/kat

     1from typing import Generator, Tuple, Union
     2
     3import pytest
     4
     5from abstract_tests import HTTP, AmbassadorTest, Node, ServiceType, StatsDSink
     6from kat.harness import Query
     7
     8
     9class CircuitBreakingTest(AmbassadorTest):
    10    target: ServiceType
    11    statsd: ServiceType
    12
    13    TARGET_CLUSTER = "cluster_circuitbreakingtest_http_cbdc1p1"
    14
    15    def init(self):
    16        self.target = HTTP()
    17        self.statsd = StatsDSink(target_cluster=self.TARGET_CLUSTER)
    18
    19    def manifests(self) -> str:
    20        self.manifest_envs += f"""
    21    - name: STATSD_ENABLED
    22      value: 'true'
    23    - name: STATSD_HOST
    24      value: '{self.statsd.path.fqdn}'
    25"""
    26        return super().manifests()
    27
    28    def config(self) -> Generator[Union[str, Tuple[Node, str]], None, None]:
    29        yield self, self.format(
    30            """
    31---
    32apiVersion: getambassador.io/v3alpha1
    33kind: Mapping
    34name:  {self.target.path.k8s}-pr
    35hostname: "*"
    36prefix: /{self.name}-pr/
    37service: {self.target.path.fqdn}
    38circuit_breakers:
    39- priority: default
    40  max_pending_requests: 1
    41  max_connections: 1
    42---
    43apiVersion: getambassador.io/v3alpha1
    44kind: Mapping
    45name: {self.name}-reset
    46case_sensitive: false
    47hostname: "*"
    48prefix: /reset/
    49rewrite: /RESET/
    50service: {self.statsd.path.fqdn}
    51---
    52apiVersion: getambassador.io/v3alpha1
    53kind: Mapping
    54name: {self.name}-dump
    55case_sensitive: false
    56hostname: "*"
    57prefix: /dump/
    58rewrite: /DUMP/
    59service: {self.statsd.path.fqdn}
    60"""
    61        )
    62
    63    def requirements(self):
    64        yield from super().requirements()
    65        yield ("url", Query(self.url(self.name) + "-pr/"))
    66        yield ("url", Query(self.url("RESET/")))
    67
    68    def queries(self):
    69        # Reset the statsd setup in phase 1...
    70        yield Query(self.url("RESET/"), phase=1)
    71
    72        # Run 200 queries in phase 2, after the reset...
    73        for i in range(200):
    74            yield Query(
    75                self.url(self.name) + "-pr/",
    76                headers={"Requested-Backend-Delay": "1000"},
    77                ignore_result=True,
    78                phase=2,
    79            )
    80
    81        # ...then 200 more queries in phase 3. Why the split? Because we get flakes if we
    82        # try to ram 500 through at once (in the middle of the run, we get some connections
    83        # that time out).
    84
    85        for i in range(200):
    86            yield Query(
    87                self.url(self.name) + "-pr/",
    88                headers={"Requested-Backend-Delay": "1000"},
    89                ignore_result=True,
    90                phase=3,
    91            )
    92
    93        # Dump the results in phase 4, after the queries.
    94        yield Query(self.url("DUMP/"), phase=4)
    95
    96    def check(self):
    97        result_count = len(self.results)
    98
    99        failures = []
   100
   101        if result_count != 402:
   102            failures.append(f"wanted 402 results, got {result_count}")
   103        else:
   104            pending_results = self.results[1:400]
   105            stats = self.results[401].json or {}
   106
   107            # pending requests tests
   108            pending_overloaded = 0
   109            error = 0
   110
   111            # printed = False
   112
   113            for result in pending_results:
   114                # if not printed:
   115                #     import json
   116                #     print(json.dumps(result.as_dict(), sort_keys=True, indent=2))
   117                #     printed = True
   118
   119                if result.error:
   120                    error += 1
   121                elif "X-Envoy-Overloaded" in result.headers:
   122                    pending_overloaded += 1
   123
   124            failed = False
   125
   126            if not 300 < pending_overloaded < 400:
   127                failures.append(
   128                    f"Expected between 300 and 400 overloaded, got {pending_overloaded}"
   129                )
   130
   131            cluster_stats = stats.get(self.__class__.TARGET_CLUSTER, {})
   132            rq_completed = cluster_stats.get("upstream_rq_completed", -1)
   133            rq_pending_overflow = cluster_stats.get("upstream_rq_pending_overflow", -1)
   134
   135            if error != 0:
   136                failures.append(f"Expected no errors but got {error}")
   137
   138            if rq_completed != 400:
   139                failures.append(
   140                    f"Expected 400 completed requests to {self.__class__.TARGET_CLUSTER}, got {rq_completed}"
   141                )
   142
   143            if abs(pending_overloaded - rq_pending_overflow) >= 2:
   144                failures.append(
   145                    f"Expected {pending_overloaded} rq_pending_overflow, got {rq_pending_overflow}"
   146                )
   147
   148        if failures:
   149            print("%s FAILED:\n  %s" % (self.name, "\n  ".join(failures)))
   150            pytest.xfail(f"FFS {self.name}")
   151
   152
   153class GlobalCircuitBreakingTest(AmbassadorTest):
   154    target: ServiceType
   155
   156    def init(self):
   157        self.target = HTTP()
   158
   159    def config(self) -> Generator[Union[str, Tuple[Node, str]], None, None]:
   160        yield self, self.format(
   161            """
   162---
   163apiVersion: getambassador.io/v3alpha1
   164kind: Host
   165name: cleartext-host
   166port: 8080
   167protocol: HTTP
   168securityModel: INSECURE
   169requestPolicy:
   170  insecure:
   171    action: Route
   172---
   173apiVersion: getambassador.io/v3alpha1
   174kind: Mapping
   175name:  {self.target.path.k8s}-pr
   176hostname: "*"
   177prefix: /{self.name}-pr/
   178service: {self.target.path.fqdn}
   179circuit_breakers:
   180- priority: default
   181  max_pending_requests: 1024
   182  max_connections: 1024
   183---
   184apiVersion: getambassador.io/v3alpha1
   185kind: Mapping
   186name:  {self.target.path.k8s}-normal
   187hostname: "*"
   188prefix: /{self.name}-normal/
   189service: {self.target.path.fqdn}
   190---
   191apiVersion: getambassador.io/v3alpha1
   192kind:  Module
   193name:  ambassador
   194config:
   195  circuit_breakers:
   196  - priority: default
   197    max_pending_requests: 5
   198    max_connections: 5
   199"""
   200        )
   201
   202    def requirements(self):
   203        yield from super().requirements()
   204        yield ("url", Query(self.url(self.name) + "-pr/"))
   205        yield ("url", Query(self.url(self.name) + "-normal/"))
   206
   207    def queries(self):
   208        for i in range(200):
   209            yield Query(
   210                self.url(self.name) + "-pr/",
   211                headers={"Requested-Backend-Delay": "1000"},
   212                ignore_result=True,
   213                phase=1,
   214            )
   215        for i in range(200):
   216            yield Query(
   217                self.url(self.name) + "-normal/",
   218                headers={"Requested-Backend-Delay": "1000"},
   219                ignore_result=True,
   220                phase=1,
   221            )
   222
   223    def check(self):
   224        failures = []
   225
   226        if len(self.results) != 400:
   227            failures.append(f"wanted 400 results, got {len(self.results)}")
   228        else:
   229            cb_mapping_results = self.results[0:200]
   230            normal_mapping_results = self.results[200:400]
   231
   232            # '-pr' mapping tests: this is a priority class of connection
   233            pr_mapping_overloaded = 0
   234
   235            for result in cb_mapping_results:
   236                if "X-Envoy-Overloaded" in result.headers:
   237                    pr_mapping_overloaded += 1
   238
   239            if pr_mapping_overloaded != 0:
   240                failures.append(f"[GCR] expected no -pr overloaded, got {pr_mapping_overloaded}")
   241
   242            # '-normal' mapping tests: global configuration should be in effect
   243            normal_overloaded = 0
   244            # printed = False
   245
   246            for result in normal_mapping_results:
   247                # if not printed:
   248                #     import json
   249                #     print(json.dumps(result.as_dict(), sort_keys=True, indent=2))
   250                #     printed = True
   251
   252                if "X-Envoy-Overloaded" in result.headers:
   253                    normal_overloaded += 1
   254
   255            if not 100 < normal_overloaded < 200:
   256                failures.append(
   257                    f"[GCF] expected 100-200 normal_overloaded, got {normal_overloaded}"
   258                )
   259
   260        if failures:
   261            print("%s FAILED:\n  %s" % (self.name, "\n  ".join(failures)))
   262            pytest.xfail(f"FFS {self.name}")
   263
   264
   265class CircuitBreakingTCPTest(AmbassadorTest):
   266    extra_ports = [6789, 6790]
   267
   268    target1: ServiceType
   269    target2: ServiceType
   270
   271    def init(self):
   272        self.target1 = HTTP(name="target1")
   273        self.target2 = HTTP(name="target2")
   274
   275    # config() must _yield_ tuples of Node, Ambassador-YAML where the
   276    # Ambassador-YAML will be annotated onto the Node.
   277
   278    def config(self) -> Generator[Union[str, Tuple[Node, str]], None, None]:
   279        yield self.target1, self.format(
   280            """
   281---
   282apiVersion: getambassador.io/v3alpha1
   283kind: TCPMapping
   284name:  {self.name}-1
   285port: 6789
   286service: {self.target1.path.fqdn}:80
   287"""
   288        )
   289        yield self.target2, self.format(
   290            """
   291---
   292apiVersion: getambassador.io/v3alpha1
   293kind: TCPMapping
   294name:  {self.name}-2
   295port: 6790
   296service: {self.target2.path.fqdn}:80
   297circuit_breakers:
   298- priority: default
   299  max_pending_requests: 1
   300  max_connections: 1
   301"""
   302        )
   303
   304    def queries(self):
   305        for i in range(200):
   306            yield Query(
   307                self.url(self.name, port=6789),
   308                headers={"Requested-Backend-Delay": "1000"},
   309                ignore_result=True,
   310                phase=1,
   311            )
   312        for i in range(200):
   313            yield Query(
   314                self.url(self.name, port=6790),
   315                headers={"Requested-Backend-Delay": "1000"},
   316                ignore_result=True,
   317                phase=1,
   318            )
   319
   320    def check(self):
   321        failures = []
   322
   323        if len(self.results) != 400:
   324            failures.append(f"wanted 400 results, got {len(self.results)}")
   325        else:
   326            default_limit_result = self.results[0:200]
   327            low_limit_results = self.results[200:400]
   328
   329            default_limit_failure = 0
   330
   331            for result in default_limit_result:
   332                if result.error:
   333                    default_limit_failure += 1
   334
   335            if default_limit_failure != 0:
   336                failures.append(
   337                    f"expected no failure with default limit, got {default_limit_failure}"
   338                )
   339
   340            low_limit_failure = 0
   341
   342            for result in low_limit_results:
   343                if result.error:
   344                    low_limit_failure += 1
   345
   346            if not 100 < low_limit_failure < 200:
   347                failures.append(f"expected 100-200 failure with low limit, got {low_limit_failure}")
   348
   349        if failures:
   350            print("%s FAILED:\n  %s" % (self.name, "\n  ".join(failures)))
   351            pytest.xfail(f"FFS {self.name}")

View as plain text