...

Text file src/github.com/datawire/ambassador/v2/python/tests/kat/t_grpc.py

Documentation: github.com/datawire/ambassador/v2/python/tests/kat

     1from typing import Generator, Tuple, Union
     2
     3from abstract_tests import EGRPC, AmbassadorTest, Node, ServiceType
     4from kat.harness import Query
     5
     6
     7class AcceptanceGrpcTest(AmbassadorTest):
     8    target: ServiceType
     9
    10    def init(self):
    11        self.target = EGRPC()
    12
    13    def config(self) -> Generator[Union[str, Tuple[Node, str]], None, None]:
    14        #         yield self, self.format("""
    15        # ---
    16        # apiVersion: getambassador.io/v3alpha1
    17        # kind:  Module
    18        # name:  ambassador
    19        # # """)
    20
    21        yield self, self.format(
    22            """
    23---
    24apiVersion: getambassador.io/v3alpha1
    25kind: Mapping
    26grpc: True
    27hostname: "*"
    28prefix: /echo.EchoService/
    29rewrite: /echo.EchoService/
    30name:  {self.target.path.k8s}
    31service: {self.target.path.k8s}
    32"""
    33        )
    34
    35    def queries(self):
    36        # [0]
    37        yield Query(
    38            self.url("echo.EchoService/Echo"),
    39            headers={"content-type": "application/grpc", "requested-status": "0"},
    40            expected=200,
    41            grpc_type="real",
    42        )
    43
    44        # [1]
    45        yield Query(
    46            self.url("echo.EchoService/Echo"),
    47            headers={"content-type": "application/grpc", "requested-status": "7"},
    48            expected=200,
    49            grpc_type="real",
    50        )
    51
    52        # [2] -- PHASE 2
    53        yield Query(self.url("ambassador/v0/diag/?json=true&filter=errors"), phase=2)
    54
    55    def check(self):
    56        # [0]
    57        assert self.results[0].headers["Grpc-Status"] == [
    58            "0"
    59        ], f'0 expected ["0"], got {self.results[0].headers["Grpc-Status"]}'
    60
    61        # [1]
    62        assert self.results[1].headers["Grpc-Status"] == [
    63            "7"
    64        ], f'0 expected ["0"], got {self.results[0].headers["Grpc-Status"]}'
    65
    66        # [2]
    67        # XXX Ew. If self.results[2].json is empty, the harness won't convert it to a response.
    68        errors = self.results[2].json
    69        assert len(errors) == 0
    70
    71
    72class EndpointGrpcTest(AmbassadorTest):
    73    target: ServiceType
    74
    75    def init(self):
    76        self.target = EGRPC()
    77
    78    def manifests(self) -> str:
    79        return (
    80            self.format(
    81                """
    82---
    83apiVersion: getambassador.io/v3alpha1
    84kind: KubernetesEndpointResolver
    85metadata:
    86    name: my-endpoint
    87spec:
    88    ambassador_id: ["endpointgrpctest"]
    89"""
    90            )
    91            + super().manifests()
    92        )
    93
    94    def config(self) -> Generator[Union[str, Tuple[Node, str]], None, None]:
    95        yield self, self.format(
    96            """
    97---
    98apiVersion: getambassador.io/v3alpha1
    99kind: Mapping
   100grpc: True
   101hostname: "*"
   102prefix: /echo.EchoService/
   103rewrite: /echo.EchoService/
   104name:  {self.target.path.k8s}
   105service: {self.target.path.k8s}
   106resolver: my-endpoint
   107load_balancer:
   108  policy: round_robin
   109"""
   110        )
   111
   112    def queries(self):
   113        # [0]
   114        yield Query(
   115            self.url("echo.EchoService/Echo"),
   116            headers={"content-type": "application/grpc", "requested-status": "0"},
   117            expected=200,
   118            grpc_type="real",
   119        )
   120
   121        # [1]
   122        yield Query(
   123            self.url("echo.EchoService/Echo"),
   124            headers={"content-type": "application/grpc", "requested-status": "7"},
   125            expected=200,
   126            grpc_type="real",
   127        )
   128
   129        # [2] -- PHASE 2
   130        yield Query(self.url("ambassador/v0/diag/?json=true&filter=errors"), phase=2)
   131
   132    def check(self):
   133        # [0]
   134        assert self.results[0].headers["Grpc-Status"] == [
   135            "0"
   136        ], f'results[0]: expected ["0"], got {self.results[0].headers["Grpc-Status"]}'
   137
   138        # [1]
   139        assert self.results[1].headers["Grpc-Status"] == [
   140            "7"
   141        ], f'results[1]: expected ["7"], got {self.results[0].headers["Grpc-Status"]}'
   142
   143        # [2]
   144        # XXX Ew. If self.results[2].json is empty, the harness won't convert it to a response.
   145        errors = self.results[2].json
   146        assert len(errors) == 0

View as plain text