...
1from typing import Generator, Tuple, Union
2
3from abstract_tests import EGRPC, AmbassadorTest, Node, ServiceType
4from kat.harness import Query
5
6
class AcceptanceGrpcTest(AmbassadorTest):
    """Check that a gRPC Mapping relays the backend's ``Grpc-Status`` header.

    A single Mapping with ``grpc: True`` routes ``/echo.EchoService/`` to an
    ``EGRPC`` target. Two requests are sent, carrying
    ``kat-req-echo-requested-status`` values of ``"0"`` and ``"7"``; both are
    expected to return HTTP 200, with the matching value in the
    ``Grpc-Status`` response header. A phase-2 query then verifies that the
    diagnostics endpoint reports no errors.
    """

    target: ServiceType

    def init(self):
        """Create the EGRPC service that the Mapping routes to."""
        self.target = EGRPC()

    def config(self) -> Generator[Union[str, Tuple[Node, str]], None, None]:
        """Yield the Mapping that routes the echo service prefix to the target."""
        # yield self, self.format("""
        # ---
        # apiVersion: getambassador.io/v3alpha1
        # kind: Module
        # name: ambassador
        # # """)

        yield self, self.format(
            """
---
apiVersion: getambassador.io/v3alpha1
kind: Mapping
grpc: True
hostname: "*"
prefix: /echo.EchoService/
rewrite: "" # This means to leave the prefix unaltered.
name: {self.target.path.k8s}
service: {self.target.path.k8s}
"""
        )

    def queries(self):
        """Yield the two gRPC echo requests and the phase-2 diagnostics query.

        The index of each query (noted in the comments) is the index used to
        look up its result in ``check``.
        """
        # [0] -- HTTP status is expected to be 200; the gRPC status is checked
        # separately via the Grpc-Status header in check().
        yield Query(
            self.url("echo.EchoService/Echo"),
            headers={"content-type": "application/grpc", "kat-req-echo-requested-status": "0"},
            expected=200,
            grpc_type="real",
        )

        # [1] -- same request, but with a requested status of 7.
        yield Query(
            self.url("echo.EchoService/Echo"),
            headers={"content-type": "application/grpc", "kat-req-echo-requested-status": "7"},
            expected=200,
            grpc_type="real",
        )

        # [2] -- PHASE 2
        yield Query(self.url("ambassador/v0/diag/?json=true&filter=errors"), phase=2)

    def check(self):
        """Assert the Grpc-Status headers and the absence of diag errors."""
        # [0]
        status = self.results[0].headers["Grpc-Status"]
        assert status == ["0"], f'results[0]: expected ["0"], got {status}'

        # [1]
        # Bind the header once so the failure message always reports the same
        # result that the assertion actually tested.
        status = self.results[1].headers["Grpc-Status"]
        assert status == ["7"], f'results[1]: expected ["7"], got {status}'

        # [2]
        # XXX Ew. If self.results[2].json is empty, the harness won't convert it to a response.
        errors = self.results[2].json
        assert len(errors) == 0, f"results[2]: expected no diag errors, got {errors}"
70
71
class EndpointGrpcTest(AmbassadorTest):
    """Check gRPC status relaying when the Mapping uses an endpoint resolver.

    The manifests declare a ``KubernetesEndpointResolver`` named
    ``my-endpoint`` and a gRPC Mapping for ``/echo.EchoService/`` that uses
    that resolver with a ``round_robin`` load-balancer policy. Two requests
    are sent, carrying ``kat-req-echo-requested-status`` values of ``"0"``
    and ``"7"``; both are expected to return HTTP 200, with the matching
    value in the ``Grpc-Status`` response header. A phase-2 query then
    verifies that the diagnostics endpoint reports no errors.
    """

    target: ServiceType

    def init(self):
        """Create the EGRPC service that the Mapping routes to."""
        self.target = EGRPC()

    def manifests(self) -> str:
        """Return the resolver and Mapping manifests followed by the base manifests."""
        return (
            self.format(
                """
---
apiVersion: getambassador.io/v3alpha1
kind: KubernetesEndpointResolver
metadata:
  name: my-endpoint
spec:
  ambassador_id: ["endpointgrpctest"]
---
apiVersion: getambassador.io/v3alpha1
kind: Mapping
metadata:
  name: {self.target.path.k8s}
spec:
  ambassador_id: ["endpointgrpctest"]
  grpc: True
  hostname: "*"
  prefix: /echo.EchoService/
  rewrite: "" # This means to leave the prefix unaltered.
  service: {self.target.path.k8s}
  resolver: my-endpoint
  load_balancer:
    policy: round_robin
"""
            )
            + super().manifests()
        )

    def queries(self):
        """Yield the two gRPC echo requests and the phase-2 diagnostics query.

        The index of each query (noted in the comments) is the index used to
        look up its result in ``check``.
        """
        # [0] -- HTTP status is expected to be 200; the gRPC status is checked
        # separately via the Grpc-Status header in check().
        yield Query(
            self.url("echo.EchoService/Echo"),
            headers={"content-type": "application/grpc", "kat-req-echo-requested-status": "0"},
            expected=200,
            grpc_type="real",
        )

        # [1] -- same request, but with a requested status of 7.
        yield Query(
            self.url("echo.EchoService/Echo"),
            headers={"content-type": "application/grpc", "kat-req-echo-requested-status": "7"},
            expected=200,
            grpc_type="real",
        )

        # [2] -- PHASE 2
        yield Query(self.url("ambassador/v0/diag/?json=true&filter=errors"), phase=2)

    def check(self):
        """Assert the Grpc-Status headers and the absence of diag errors."""
        # [0]
        status = self.results[0].headers["Grpc-Status"]
        assert status == ["0"], f'results[0]: expected ["0"], got {status}'

        # [1]
        # Bind the header once so the failure message always reports the same
        # result that the assertion actually tested.
        status = self.results[1].headers["Grpc-Status"]
        assert status == ["7"], f'results[1]: expected ["7"], got {status}'

        # [2]
        # XXX Ew. If self.results[2].json is empty, the harness won't convert it to a response.
        errors = self.results[2].json
        assert len(errors) == 0, f"results[2]: expected no diag errors, got {errors}"
View as plain text