1from typing import Generator, Tuple, Union
2
3from abstract_tests import EGRPC, AmbassadorTest, Node, ServiceType
4from kat.harness import Query
5
6
class AcceptanceGrpcTest(AmbassadorTest):
    """Acceptance test for a gRPC Mapping in front of an EGRPC target.

    Two gRPC requests are sent, each carrying a ``requested-status`` header
    ("0" and "7"). The test then asserts that the ``Grpc-Status`` response
    header carries that same value, and that the diagnostics endpoint reports
    no errors.
    """

    target: ServiceType

    def init(self):
        """Create the EGRPC service that the Mapping routes to."""
        self.target = EGRPC()

    def config(self) -> Generator[Union[str, Tuple[Node, str]], None, None]:
        """Yield a gRPC Mapping for ``/echo.EchoService/`` pointing at the target."""
        yield self, self.format(
            """
---
apiVersion: getambassador.io/v3alpha1
kind: Mapping
grpc: True
hostname: "*"
prefix: /echo.EchoService/
rewrite: /echo.EchoService/
name: {self.target.path.k8s}
service: {self.target.path.k8s}
"""
        )

    def queries(self):
        """Yield the two gRPC echo queries, then the phase-2 diagnostics query.

        The order matters: ``check`` reads ``self.results`` by index.
        """
        # [0] -- ask for gRPC status 0
        yield Query(
            self.url("echo.EchoService/Echo"),
            headers={"content-type": "application/grpc", "requested-status": "0"},
            expected=200,
            grpc_type="real",
        )

        # [1] -- ask for gRPC status 7; the HTTP status is still expected to be 200
        yield Query(
            self.url("echo.EchoService/Echo"),
            headers={"content-type": "application/grpc", "requested-status": "7"},
            expected=200,
            grpc_type="real",
        )

        # [2] -- PHASE 2
        yield Query(self.url("ambassador/v0/diag/?json=true&filter=errors"), phase=2)

    def check(self):
        """Assert the Grpc-Status headers match the request and no errors were logged."""
        # [0]
        status_0 = self.results[0].headers["Grpc-Status"]
        assert status_0 == ["0"], f'results[0]: expected ["0"], got {status_0}'

        # [1]
        # The failure message must describe results[1] and the expected "7";
        # it previously reported results[0] and "0".
        status_1 = self.results[1].headers["Grpc-Status"]
        assert status_1 == ["7"], f'results[1]: expected ["7"], got {status_1}'

        # [2]
        # XXX Ew. If self.results[2].json is empty, the harness won't convert it to a response.
        errors = self.results[2].json
        assert len(errors) == 0, f"results[2]: expected no errors, got {errors}"
70
71
class EndpointGrpcTest(AmbassadorTest):
    """gRPC Mapping test that routes through a KubernetesEndpointResolver.

    The Mapping uses the ``my-endpoint`` resolver with ``round_robin`` load
    balancing. Two gRPC requests are sent, each carrying a
    ``requested-status`` header ("0" and "7"). The test then asserts that the
    ``Grpc-Status`` response header carries that same value, and that the
    diagnostics endpoint reports no errors.
    """

    target: ServiceType

    def init(self):
        """Create the EGRPC service that the Mapping routes to."""
        self.target = EGRPC()

    def manifests(self) -> str:
        """Return the resolver manifest followed by the parent class's manifests."""
        # NOTE(review): the ambassador_id is hard-coded; presumably it must
        # match the ID the harness derives for this test class -- confirm.
        resolver_manifest = self.format(
            """
---
apiVersion: getambassador.io/v3alpha1
kind: KubernetesEndpointResolver
metadata:
 name: my-endpoint
spec:
 ambassador_id: ["endpointgrpctest"]
"""
        )
        return resolver_manifest + super().manifests()

    def config(self) -> Generator[Union[str, Tuple[Node, str]], None, None]:
        """Yield a gRPC Mapping for ``/echo.EchoService/`` that uses ``my-endpoint``."""
        yield self, self.format(
            """
---
apiVersion: getambassador.io/v3alpha1
kind: Mapping
grpc: True
hostname: "*"
prefix: /echo.EchoService/
rewrite: /echo.EchoService/
name: {self.target.path.k8s}
service: {self.target.path.k8s}
resolver: my-endpoint
load_balancer:
 policy: round_robin
"""
        )

    def queries(self):
        """Yield the two gRPC echo queries, then the phase-2 diagnostics query.

        The order matters: ``check`` reads ``self.results`` by index.
        """
        # [0] -- ask for gRPC status 0
        yield Query(
            self.url("echo.EchoService/Echo"),
            headers={"content-type": "application/grpc", "requested-status": "0"},
            expected=200,
            grpc_type="real",
        )

        # [1] -- ask for gRPC status 7; the HTTP status is still expected to be 200
        yield Query(
            self.url("echo.EchoService/Echo"),
            headers={"content-type": "application/grpc", "requested-status": "7"},
            expected=200,
            grpc_type="real",
        )

        # [2] -- PHASE 2
        yield Query(self.url("ambassador/v0/diag/?json=true&filter=errors"), phase=2)

    def check(self):
        """Assert the Grpc-Status headers match the request and no errors were logged."""
        # [0]
        status_0 = self.results[0].headers["Grpc-Status"]
        assert status_0 == ["0"], f'results[0]: expected ["0"], got {status_0}'

        # [1]
        # Report the value actually under test; the message previously printed
        # results[0]'s header.
        status_1 = self.results[1].headers["Grpc-Status"]
        assert status_1 == ["7"], f'results[1]: expected ["7"], got {status_1}'

        # [2]
        # XXX Ew. If self.results[2].json is empty, the harness won't convert it to a response.
        errors = self.results[2].json
        assert len(errors) == 0, f"results[2]: expected no errors, got {errors}"
View as plain text