...

Text file src/github.com/datawire/ambassador/v2/python/tests/kat/t_grpc_bridge.py

Documentation: github.com/datawire/ambassador/v2/python/tests/kat

     1from typing import Generator, Tuple, Union
     2
     3from abstract_tests import EGRPC, AmbassadorTest, Node, ServiceType
     4from kat.harness import Query
     5
     6
     7class AcceptanceGrpcBridgeTest(AmbassadorTest):
     8
     9    target: ServiceType
    10
    11    def init(self):
    12        self.target = EGRPC()
    13
    14    def config(self) -> Generator[Union[str, Tuple[Node, str]], None, None]:
    15        yield self, self.format(
    16            """
    17---
    18apiVersion: getambassador.io/v3alpha1
    19kind:  Module
    20name:  ambassador
    21config:
    22    enable_grpc_http11_bridge: True
    23"""
    24        )
    25
    26        yield self, self.format(
    27            """
    28---
    29apiVersion: getambassador.io/v3alpha1
    30kind: Mapping
    31grpc: True
    32hostname: "*"
    33prefix: /echo.EchoService/
    34rewrite: /echo.EchoService/
    35name:  {self.target.path.k8s}
    36service: {self.target.path.k8s}
    37"""
    38        )
    39
    40    def queries(self):
    41        # [0]
    42        yield Query(
    43            self.url("echo.EchoService/Echo"),
    44            headers={"content-type": "application/grpc", "requested-status": "0"},
    45            expected=200,
    46            grpc_type="bridge",
    47        )
    48
    49        # [1]
    50        yield Query(
    51            self.url("echo.EchoService/Echo"),
    52            headers={"content-type": "application/grpc", "requested-status": "7"},
    53            expected=503,
    54            grpc_type="bridge",
    55        )
    56
    57    def check(self):
    58        # [0]
    59        assert self.results[0].headers["Grpc-Status"] == ["0"]
    60
    61        # [1]
    62        assert self.results[1].headers["Grpc-Status"] == ["7"]

View as plain text