...

Text file src/github.com/datawire/ambassador/v2/python/tests/kat/t_grpc_web.py

Documentation: github.com/datawire/ambassador/v2/python/tests/kat

     1from typing import Generator, Tuple, Union
     2
     3from abstract_tests import EGRPC, AmbassadorTest, Node, ServiceType
     4from kat.harness import Query
     5
     6
     7class AcceptanceGrpcWebTest(AmbassadorTest):
     8
     9    target: ServiceType
    10
    11    def init(self):
    12        self.target = EGRPC()
    13
    14    def config(self) -> Generator[Union[str, Tuple[Node, str]], None, None]:
    15        yield self, self.format(
    16            """
    17---
    18apiVersion: getambassador.io/v3alpha1
    19kind:  Module
    20name:  ambassador
    21config:
    22    enable_grpc_web: True
    23"""
    24        )
    25
    26        yield self, self.format(
    27            """
    28---
    29apiVersion: getambassador.io/v3alpha1
    30kind: Mapping
    31grpc: True
    32hostname: "*"
    33prefix: /echo.EchoService/
    34rewrite: /echo.EchoService/
    35name:  {self.target.path.k8s}
    36service: {self.target.path.k8s}
    37"""
    38        )
    39
    40    def queries(self):
    41        # [0]
    42        yield Query(
    43            self.url("echo.EchoService/Echo"),
    44            headers={
    45                "content-type": "application/grpc-web-text",
    46                "accept": "application/grpc-web-text",
    47                "requested-status": "0",
    48            },
    49            expected=200,
    50            grpc_type="web",
    51        )
    52
    53        # [1]
    54        yield Query(
    55            self.url("echo.EchoService/Echo"),
    56            headers={
    57                "content-type": "application/grpc-web-text",
    58                "accept": "application/grpc-web-text",
    59                "requested-status": "7",
    60            },
    61            expected=200,
    62            grpc_type="web",
    63        )
    64
    65    def check(self):
    66        # print('AcceptanceGrpcWebTest results:')
    67        #
    68        # i = 0
    69        #
    70        # for result in self.results:
    71        #     print(f'{i}: {json.dumps(result.as_dict(), sort_keys=True, indent=4)}')
    72        #     print(f'    headers {json.dumps(result.headers, sort_keys=True)}')
    73        #     print(f'    grpc-status {json.dumps(result.headers.get("Grpc-Status", "-none-"), sort_keys=True)}')
    74        #     i += 1
    75
    76        # [0]
    77        gstat = self.results[0].headers.get("Grpc-Status", "-none-")
    78        # print(f'    grpc-status {gstat}')
    79
    80        if gstat == ["0"]:
    81            assert True
    82        else:
    83            assert False, f'0: got {gstat} instead of ["0"]'
    84
    85        # # [0]
    86        # assert self.results[0].headers["Grpc-Status"] == ["0"]
    87
    88        # [1]
    89        assert self.results[1].headers["Grpc-Status"] == ["7"]

View as plain text