...
1from typing import Generator, Tuple, Union
2
3from abstract_tests import EGRPC, AmbassadorTest, Node, ServiceType
4from kat.harness import Query
5
6
class AcceptanceGrpcBridgeTest(AmbassadorTest):
    """Acceptance test that sends bridged gRPC queries to an echo service.

    The test turns on ``enable_grpc_http11_bridge`` in the ``ambassador``
    Module, maps ``/echo.EchoService/`` to an ``EGRPC`` target, and then
    verifies the ``Grpc-Status`` response header for two requested statuses.
    """

    # Upstream service the Mapping routes to; assigned in init().
    target: ServiceType

    def init(self):
        """Create the EGRPC upstream used as the Mapping's service."""
        self.target = EGRPC()

    def config(self) -> Generator[Union[str, Tuple[Node, str]], None, None]:
        """Yield ``(self, manifest)`` pairs for the Module and the Mapping.

        Each template is passed through ``self.format`` immediately before it
        is yielded, so ``{self.target.path.k8s}`` is resolved at that point.
        """
        # Module enabling the gRPC HTTP/1.1 bridge setting.
        bridge_module = """
---
apiVersion: getambassador.io/v3alpha1
kind: Module
name: ambassador
config:
 enable_grpc_http11_bridge: True
"""

        # gRPC Mapping for the echo service prefix, named after the target.
        echo_mapping = """
---
apiVersion: getambassador.io/v3alpha1
kind: Mapping
grpc: True
hostname: "*"
prefix: /echo.EchoService/
rewrite: /echo.EchoService/
name: {self.target.path.k8s}
service: {self.target.path.k8s}
"""

        for manifest in (bridge_module, echo_mapping):
            yield self, self.format(manifest)

    def queries(self):
        """Yield one ``bridge`` query per requested status, in result order."""
        # (value of the kat-req-echo-requested-status header, expected HTTP
        # status). The position of each pair is the index check() reads from
        # self.results.
        # NOTE(review): presumably the echo backend answers with the requested
        # gRPC status -- confirm against the EGRPC service.
        cases = (
            ("0", 200),  # [0]
            ("7", 503),  # [1]
        )

        for requested_status, http_status in cases:
            yield Query(
                self.url("echo.EchoService/Echo"),
                headers={
                    "content-type": "application/grpc",
                    "kat-req-echo-requested-status": requested_status,
                },
                expected=http_status,
                grpc_type="bridge",
            )

    def check(self):
        """Assert each result's ``Grpc-Status`` header matches its query."""
        # Expected header values, listed in the same order as queries().
        for index, grpc_status in enumerate(("0", "7")):
            assert self.results[index].headers["Grpc-Status"] == [grpc_status]
View as plain text