...
1from typing import Generator, Tuple, Union
2
3from abstract_tests import EGRPC, AmbassadorTest, Node, ServiceType
4from kat.harness import Query
5
6
class AcceptanceGrpcWebTest(AmbassadorTest):
    """Acceptance test for gRPC-Web requests sent through Ambassador.

    The test turns on ``enable_grpc_web`` in the ``ambassador`` Module, maps
    the ``/echo.EchoService/`` prefix to the target service, and issues two
    gRPC-Web queries whose ``requested-status`` headers are ``"0"`` and
    ``"7"``.  ``check`` then verifies that each response carries the
    matching ``Grpc-Status`` header.
    """

    target: ServiceType

    def init(self) -> None:
        """Create the backend service that the Mapping routes to."""
        # NOTE(review): EGRPC is presumably the gRPC echo backend provided by
        # abstract_tests -- confirm against that module.
        self.target = EGRPC()

    def config(self) -> Generator[Union[str, Tuple[Node, str]], None, None]:
        """Yield the Ambassador configuration used by this test.

        Two resources are yielded, each paired with this test node: a Module
        enabling gRPC-Web, and a gRPC Mapping for ``/echo.EchoService/``.
        """
        # Module: switch on gRPC-Web support.
        yield self, self.format(
            """
---
apiVersion: getambassador.io/v3alpha1
kind: Module
name: ambassador
config:
 enable_grpc_web: True
"""
        )

        # Mapping: route the echo service prefix to the target.
        # NOTE(review): self.format presumably fills in
        # {self.target.path.k8s} -- confirm in the harness.
        yield self, self.format(
            """
---
apiVersion: getambassador.io/v3alpha1
kind: Mapping
grpc: True
hostname: "*"
prefix: /echo.EchoService/
rewrite: /echo.EchoService/
name: {self.target.path.k8s}
service: {self.target.path.k8s}
"""
        )

    def queries(self) -> Generator[Query, None, None]:
        """Yield the gRPC-Web queries, in the order ``check`` indexes them.

        Both queries expect HTTP 200; the gRPC-level status is verified
        separately in ``check`` through the ``Grpc-Status`` response header.
        """
        # [0] requested-status 0
        yield Query(
            self.url("echo.EchoService/Echo"),
            headers={
                "content-type": "application/grpc-web-text",
                "accept": "application/grpc-web-text",
                "requested-status": "0",
            },
            expected=200,
            grpc_type="web",
        )

        # [1] requested-status 7
        yield Query(
            self.url("echo.EchoService/Echo"),
            headers={
                "content-type": "application/grpc-web-text",
                "accept": "application/grpc-web-text",
                "requested-status": "7",
            },
            expected=200,
            grpc_type="web",
        )

    def check(self) -> None:
        """Assert that each result reports the gRPC status its query requested.

        Raises:
            AssertionError: if a result's ``Grpc-Status`` header is missing
                or differs from the expected single-element list.
        """
        # Expected statuses, positionally matching the queries yielded by
        # queries(): result [0] -> "0", result [1] -> "7".
        expected_statuses = ("0", "7")

        for index, status in enumerate(expected_statuses):
            # Fall back to a sentinel so that a missing header produces a
            # readable assertion message rather than a bare KeyError.
            got = self.results[index].headers.get("Grpc-Status", "-none-")

            # Header values are compared against a single-element list.
            assert got == [status], f'{index}: got {got} instead of ["{status}"]'
View as plain text