...

Text file src/github.com/emissary-ingress/emissary/v3/python/tests/kat/t_grpc.py

Documentation: github.com/emissary-ingress/emissary/v3/python/tests/kat

     1from typing import Generator, Tuple, Union
     2
     3from abstract_tests import EGRPC, AmbassadorTest, Node, ServiceType
     4from kat.harness import Query
     5
     6
     7class AcceptanceGrpcTest(AmbassadorTest):
     8    target: ServiceType
     9
    10    def init(self):
    11        self.target = EGRPC()
    12
    13    def config(self) -> Generator[Union[str, Tuple[Node, str]], None, None]:
    14        #         yield self, self.format("""
    15        # ---
    16        # apiVersion: getambassador.io/v3alpha1
    17        # kind:  Module
    18        # name:  ambassador
    19        # # """)
    20
    21        yield self, self.format(
    22            """
    23---
    24apiVersion: getambassador.io/v3alpha1
    25kind: Mapping
    26grpc: True
    27hostname: "*"
    28prefix: /echo.EchoService/
    29rewrite: ""   # This means to leave the prefix unaltered.
    30name:  {self.target.path.k8s}
    31service: {self.target.path.k8s}
    32"""
    33        )
    34
    35    def queries(self):
    36        # [0]
    37        yield Query(
    38            self.url("echo.EchoService/Echo"),
    39            headers={"content-type": "application/grpc", "kat-req-echo-requested-status": "0"},
    40            expected=200,
    41            grpc_type="real",
    42        )
    43
    44        # [1]
    45        yield Query(
    46            self.url("echo.EchoService/Echo"),
    47            headers={"content-type": "application/grpc", "kat-req-echo-requested-status": "7"},
    48            expected=200,
    49            grpc_type="real",
    50        )
    51
    52        # [2] -- PHASE 2
    53        yield Query(self.url("ambassador/v0/diag/?json=true&filter=errors"), phase=2)
    54
    55    def check(self):
    56        # [0]
    57        assert self.results[0].headers["Grpc-Status"] == [
    58            "0"
    59        ], f'0 expected ["0"], got {self.results[0].headers["Grpc-Status"]}'
    60
    61        # [1]
    62        assert self.results[1].headers["Grpc-Status"] == [
    63            "7"
    64        ], f'0 expected ["0"], got {self.results[0].headers["Grpc-Status"]}'
    65
    66        # [2]
    67        # XXX Ew. If self.results[2].json is empty, the harness won't convert it to a response.
    68        errors = self.results[2].json
    69        assert len(errors) == 0
    70
    71
    72class EndpointGrpcTest(AmbassadorTest):
    73    target: ServiceType
    74
    75    def init(self):
    76        self.target = EGRPC()
    77
    78    def manifests(self) -> str:
    79        return (
    80            self.format(
    81                """
    82---
    83apiVersion: getambassador.io/v3alpha1
    84kind: KubernetesEndpointResolver
    85metadata:
    86    name: my-endpoint
    87spec:
    88    ambassador_id: ["endpointgrpctest"]
    89---
    90apiVersion: getambassador.io/v3alpha1
    91kind: Mapping
    92metadata:
    93    name: {self.target.path.k8s}
    94spec:
    95    ambassador_id: ["endpointgrpctest"]
    96    grpc: True
    97    hostname: "*"
    98    prefix: /echo.EchoService/
    99    rewrite: ""   # This means to leave the prefix unaltered.
   100    service: {self.target.path.k8s}
   101    resolver: my-endpoint
   102    load_balancer:
   103        policy: round_robin
   104"""
   105            )
   106            + super().manifests()
   107        )
   108
   109    def queries(self):
   110        # [0]
   111        yield Query(
   112            self.url("echo.EchoService/Echo"),
   113            headers={"content-type": "application/grpc", "kat-req-echo-requested-status": "0"},
   114            expected=200,
   115            grpc_type="real",
   116        )
   117
   118        # [1]
   119        yield Query(
   120            self.url("echo.EchoService/Echo"),
   121            headers={"content-type": "application/grpc", "kat-req-echo-requested-status": "7"},
   122            expected=200,
   123            grpc_type="real",
   124        )
   125
   126        # [2] -- PHASE 2
   127        yield Query(self.url("ambassador/v0/diag/?json=true&filter=errors"), phase=2)
   128
   129    def check(self):
   130        # [0]
   131        assert self.results[0].headers["Grpc-Status"] == [
   132            "0"
   133        ], f'results[0]: expected ["0"], got {self.results[0].headers["Grpc-Status"]}'
   134
   135        # [1]
   136        assert self.results[1].headers["Grpc-Status"] == [
   137            "7"
   138        ], f'results[1]: expected ["7"], got {self.results[0].headers["Grpc-Status"]}'
   139
   140        # [2]
   141        # XXX Ew. If self.results[2].json is empty, the harness won't convert it to a response.
   142        errors = self.results[2].json
   143        assert len(errors) == 0

View as plain text