1import base64
2import os
3import shutil
4import subprocess
5import sys
6from typing import Any, ClassVar, Generator, List, Optional, Sequence, Tuple, Union
7from typing import cast as typecast
8
9import pytest
10import yaml
11
12# These type: ignores are because, weirdly, the yaml.CSafe* variants don't share
13# a type with their non-C variants. No clue why not.
14yaml_loader = yaml.SafeLoader # type: ignore
15yaml_dumper = yaml.SafeDumper # type: ignore
16
17try:
18 yaml_loader = yaml.CSafeLoader # type: ignore
19 yaml_dumper = yaml.CSafeDumper # type: ignore
20except AttributeError:
21 pass
22
23import tests.integration.manifests as integration_manifests
24from kat.harness import Name, Node, Query, Test, abstract_test, sanitize
25from kat.utils import ShellCommand
26
27AMBASSADOR_LOCAL = """
28---
29apiVersion: v1
30kind: Secret
31metadata:
32 name: {self.path.k8s}
33 annotations:
34 kubernetes.io/service-account.name: {self.path.k8s}
35type: kubernetes.io/service-account-token
36"""
37
38
39def assert_default_errors(errors, include_ingress_errors=True):
40 default_errors = [
41 [
42 "",
43 "Ambassador could not find core CRD definitions. Please visit https://www.getambassador.io/docs/edge-stack/latest/topics/install/upgrade-to-edge-stack/#5-update-and-restart for more information. You can continue using Ambassador via Kubernetes annotations, any configuration via CRDs will be ignored...",
44 ],
45 [
46 "",
47 "Ambassador could not find Resolver type CRD definitions. Please visit https://www.getambassador.io/docs/edge-stack/latest/topics/install/upgrade-to-edge-stack/#5-update-and-restart for more information. You can continue using Ambassador via Kubernetes annotations, any configuration via CRDs will be ignored...",
48 ],
49 [
50 "",
51 "Ambassador could not find the Host CRD definition. Please visit https://www.getambassador.io/docs/edge-stack/latest/topics/install/upgrade-to-edge-stack/#5-update-and-restart for more information. You can continue using Ambassador via Kubernetes annotations, any configuration via CRDs will be ignored...",
52 ],
53 [
54 "",
55 "Ambassador could not find the LogService CRD definition. Please visit https://www.getambassador.io/docs/edge-stack/latest/topics/install/upgrade-to-edge-stack/#5-update-and-restart for more information. You can continue using Ambassador via Kubernetes annotations, any configuration via CRDs will be ignored...",
56 ],
57 ]
58
59 if include_ingress_errors:
60 default_errors.append(
61 [
62 "",
63 "Ambassador is not permitted to read Ingress resources. Please visit https://www.getambassador.io/docs/edge-stack/latest/topics/running/ingress-controller/#ambassador-as-an-ingress-controller for more information. You can continue using Ambassador, but Ingress resources will be ignored...",
64 ]
65 )
66
67 number_of_default_errors = len(default_errors)
68
69 if errors[:number_of_default_errors] != default_errors:
70 assert False, f"default error table mismatch: got\n{errors}"
71
72 for error in errors[number_of_default_errors:]:
73 assert (
74 "found invalid port" in error[1]
75 ), "Could not find 'found invalid port' in the error {}".format(error[1])
76
77
78DEV = os.environ.get("AMBASSADOR_DEV", "0").lower() in ("1", "yes", "true")
79
80
81@abstract_test
82class AmbassadorTest(Test):
83
84 """
85 AmbassadorTest is a top level ambassador test.
86 """
87
88 OFFSET: ClassVar[int] = 0
89 IMAGE_BUILT: ClassVar[bool] = False
90
91 _index: Optional[int] = None
92 _ambassador_id: Optional[str] = None
93 single_namespace: bool = False
94 disable_endpoints: bool = False
95 name: str
96 path: Name
97 extra_ports: Optional[List[int]] = None
98 debug_diagd: bool = True
99 debug_envoy: bool = False
100 manifest_envs = ""
101 is_ambassador = True
102 allow_edge_stack_redirect = False
103 edge_stack_cleartext_host = True
104 envoy_api_version: Optional[str] = None
105
106 env: List[str] = []
107
108 def manifests(self) -> str:
109 rbac = integration_manifests.load("rbac_cluster_scope")
110
111 self.manifest_envs += """
112 - name: POLL_EVERY_SECS
113 value: "0"
114 - name: CONSUL_WATCHER_PORT
115 value: "8500"
116"""
117
118 if os.environ.get("AMBASSADOR_FAST_RECONFIGURE", "true").lower() == "false":
119 self.manifest_envs += """
120 - name: AMBASSADOR_FAST_RECONFIGURE
121 value: "false"
122"""
123
124 amb_debug = []
125 if self.debug_diagd:
126 amb_debug.append("diagd")
127 if self.debug_envoy:
128 amb_debug.append("envoy")
129 if amb_debug:
130 self.manifest_envs += """
131 - name: AMBASSADOR_DEBUG
132 value: "%s"
133 - name: AES_LOG_LEVEL
134 value: "debug"
135""" % ":".join(
136 amb_debug
137 )
138
139 if self.ambassador_id:
140 self.manifest_envs += f"""
141 - name: AMBASSADOR_LABEL_SELECTOR
142 value: "kat-ambassador-id={self.ambassador_id}"
143"""
144
145 if self.single_namespace:
146 self.manifest_envs += """
147 - name: AMBASSADOR_SINGLE_NAMESPACE
148 value: "yes"
149"""
150 rbac = integration_manifests.load("rbac_namespace_scope")
151
152 if self.disable_endpoints:
153 self.manifest_envs += """
154 - name: AMBASSADOR_DISABLE_ENDPOINTS
155 value: "yes"
156"""
157 if not self.allow_edge_stack_redirect:
158 self.manifest_envs += """
159 - name: AMBASSADOR_NO_TLS_REDIRECT
160 value: "yes"
161"""
162
163 if self.envoy_api_version is not None:
164 self.manifest_envs += f"""
165 - name: AMBASSADOR_ENVOY_API_VERSION
166 value: "{self.envoy_api_version}"
167"""
168 elif os.environ.get("AMBASSADOR_ENVOY_API_VERSION", "") != "":
169 self.manifest_envs += (
170 """
171 - name: AMBASSADOR_ENVOY_API_VERSION
172 value: "%s"
173"""
174 % os.environ["AMBASSADOR_ENVOY_API_VERSION"]
175 )
176
177 eports = ""
178
179 if self.extra_ports:
180 for port in self.extra_ports:
181 eports += f"""
182 - name: extra-{port}
183 protocol: TCP
184 port: {port}
185 targetPort: {port}
186"""
187
188 if DEV:
189 return self.format(rbac + AMBASSADOR_LOCAL, extra_ports=eports)
190 else:
191 return self.format(
192 rbac + integration_manifests.load("ambassador"),
193 envs=self.manifest_envs,
194 extra_ports=eports,
195 capabilities_block="",
196 )
197
198 @property
199 def index(self) -> int:
200 if self._index is None:
201 # lock here?
202 self._index = AmbassadorTest.OFFSET
203 AmbassadorTest.OFFSET += 1
204
205 return typecast(int, self._index)
206
207 def post_manifest(self):
208 if not DEV:
209 return
210
211 if os.environ.get("KAT_SKIP_DOCKER"):
212 return
213
214 image = os.environ["AMBASSADOR_DOCKER_IMAGE"]
215 cached_image = os.environ["BASE_PY_IMAGE"]
216 ambassador_base_image = os.environ["BASE_GO_IMAGE"]
217
218 if not AmbassadorTest.IMAGE_BUILT:
219 AmbassadorTest.IMAGE_BUILT = True
220
221 cmd = ShellCommand(
222 "docker", "ps", "-a", "-f", "label=kat-family=ambassador", "--format", "{{.ID}}"
223 )
224
225 if cmd.check("find old docker container IDs"):
226 ids = cmd.stdout.split("\n")
227
228 while ids:
229 if ids[-1]:
230 break
231
232 ids.pop()
233
234 if ids:
235 print("Killing old containers...")
236 ShellCommand.run("kill old containers", "docker", "kill", *ids, verbose=True)
237 ShellCommand.run("rm old containers", "docker", "rm", *ids, verbose=True)
238
239 context = os.path.dirname(os.path.dirname(os.path.dirname(__file__)))
240
241 print("Starting docker build...", end="")
242 sys.stdout.flush()
243
244 cmd = ShellCommand(
245 "docker",
246 "build",
247 "--build-arg",
248 "BASE_PY_IMAGE={}".format(cached_image),
249 "--build-arg",
250 "BASE_GO_IMAGE={}".format(ambassador_base_image),
251 context,
252 "-t",
253 image,
254 )
255
256 if cmd.check("docker build Ambassador image"):
257 print("done.")
258 else:
259 pytest.exit("container failed to build")
260
261 fname = "/tmp/k8s-%s.yaml" % self.path.k8s
262 if os.path.exists(fname):
263 with open(fname) as fd:
264 content = fd.read()
265 else:
266 nsp = getattr(self, "namespace", None) or "default"
267
268 cmd = ShellCommand(
269 "tools/bin/kubectl", "get", "-n", nsp, "-o", "yaml", "secret", self.path.k8s
270 )
271
272 if not cmd.check(f"fetch secret for {self.path.k8s}"):
273 pytest.exit(f"could not fetch secret for {self.path.k8s}")
274
275 content = cmd.stdout
276
277 with open(fname, "wb") as fd:
278 fd.write(content.encode("utf-8"))
279
280 try:
281 secret = yaml.load(content, Loader=yaml_loader)
282 except Exception as e:
283 print("could not parse YAML:\n%s" % content)
284 raise e
285
286 data = secret["data"]
287 # secret_dir = tempfile.mkdtemp(prefix=self.path.k8s, suffix="secret")
288 secret_dir = "/tmp/%s-ambassadormixin-%s" % (self.path.k8s, "secret")
289
290 shutil.rmtree(secret_dir, ignore_errors=True)
291 os.mkdir(secret_dir, 0o777)
292
293 for k, v in data.items():
294 with open(os.path.join(secret_dir, k), "wb") as f:
295 f.write(base64.decodebytes(bytes(v, "utf8")))
296 print("Launching %s container." % self.path.k8s)
297 command = ["docker", "run", "-d", "-l", "kat-family=ambassador", "--name", self.path.k8s]
298
299 envs = [
300 "KUBERNETES_SERVICE_HOST=kubernetes",
301 "KUBERNETES_SERVICE_PORT=443",
302 "AMBASSADOR_SNAPSHOT_COUNT=1",
303 "AMBASSADOR_CONFIG_BASE_DIR=/tmp/ambassador",
304 "POLL_EVERY_SECS=0",
305 "CONSUL_WATCHER_PORT=8500",
306 "AMBASSADOR_UPDATE_MAPPING_STATUS=false",
307 "AMBASSADOR_ID=%s" % self.ambassador_id,
308 ]
309
310 if self.namespace:
311 envs.append("AMBASSADOR_NAMESPACE=%s" % self.namespace)
312
313 if self.single_namespace:
314 envs.append("AMBASSADOR_SINGLE_NAMESPACE=yes")
315
316 if self.disable_endpoints:
317 envs.append("AMBASSADOR_DISABLE_ENDPOINTS=yes")
318
319 amb_debug = []
320 if self.debug_diagd:
321 amb_debug.append("diagd")
322 if self.debug_envoy:
323 amb_debug.append("envoy")
324 if amb_debug:
325 envs.append("AMBASSADOR_DEBUG=%s" % ":".join(amb_debug))
326
327 envs.extend(self.env)
328 [command.extend(["-e", env]) for env in envs]
329
330 ports = [
331 "%s:8877" % (8877 + self.index),
332 "%s:8001" % (8001 + self.index),
333 "%s:8080" % (8080 + self.index),
334 "%s:8443" % (8443 + self.index),
335 ]
336
337 if self.extra_ports:
338 for port in self.extra_ports:
339 ports.append(f"{port}:{port}")
340
341 [command.extend(["-p", port]) for port in ports]
342
343 volumes = ["%s:/var/run/secrets/kubernetes.io/serviceaccount" % secret_dir]
344 [command.extend(["-v", volume]) for volume in volumes]
345
346 command.append(image)
347
348 if os.environ.get("KAT_SHOW_DOCKER"):
349 print(" ".join(command))
350
351 cmd = ShellCommand(*command)
352
353 if not cmd.check(f"start container for {self.path.k8s}"):
354 pytest.exit(f"could not start container for {self.path.k8s}")
355
356 def queries(self):
357 if DEV:
358 cmd = ShellCommand("docker", "ps", "-qf", "name=%s" % self.path.k8s)
359
360 if not cmd.check(f"docker check for {self.path.k8s}"):
361 if not cmd.stdout.strip():
362 log_cmd = ShellCommand(
363 "docker", "logs", self.path.k8s, stderr=subprocess.STDOUT
364 )
365
366 if log_cmd.check(f"docker logs for {self.path.k8s}"):
367 print(cmd.stdout)
368
369 pytest.exit(f"container failed to start for {self.path.k8s}")
370
371 return ()
372
373 def scheme(self) -> str:
374 return "http"
375
376 def url(self, prefix, scheme=None, port=None) -> str:
377 if scheme is None:
378 scheme = self.scheme()
379
380 if DEV:
381 if not port:
382 port = 8443 if scheme == "https" else 8080
383 port += self.index
384
385 return "%s://%s/%s" % (scheme, "localhost:%s" % port, prefix)
386 else:
387 host_and_port = self.path.fqdn
388
389 if port:
390 host_and_port += f":{port}"
391
392 return "%s://%s/%s" % (scheme, host_and_port, prefix)
393
394 def requirements(self):
395 yield ("url", Query(self.url("ambassador/v0/check_ready")))
396 yield ("url", Query(self.url("ambassador/v0/check_alive")))
397
398
399@abstract_test
400class ServiceType(Node):
401
402 path: Name
403 _manifests: Optional[str]
404 use_superpod: bool = True
405
406 def __init__(
407 self, service_manifests: str = None, namespace: str = None, *args, **kwargs
408 ) -> None:
409 super().__init__(namespace=namespace, *args, **kwargs)
410
411 self._manifests = service_manifests
412
413 if self._manifests:
414 self.use_superpod = False
415
416 def config(self) -> Generator[Union[str, Tuple[Node, str]], None, None]:
417 yield from ()
418
419 def manifests(self):
420 if self.use_superpod:
421 return None
422
423 return self.format(self._manifests)
424
425 def requirements(self):
426 if self.use_superpod:
427 yield from ()
428
429 yield ("url", Query("http://%s" % self.path.fqdn))
430 yield ("url", Query("https://%s" % self.path.fqdn))
431
432
433@abstract_test
434class ServiceTypeGrpc(Node):
435
436 path: Name
437
438 def __init__(self, service_manifests: str = None, *args, **kwargs) -> None:
439 super().__init__(*args, **kwargs)
440 self._manifests = service_manifests or integration_manifests.load("backend")
441
442 def config(self) -> Generator[Union[str, Tuple[Node, str]], None, None]:
443 yield from ()
444
445 def manifests(self):
446 return self.format(self._manifests)
447
448 def requirements(self):
449 yield ("url", Query("http://%s" % self.path.fqdn))
450 yield ("url", Query("https://%s" % self.path.fqdn))
451
452
453class HTTP(ServiceType):
454 pass
455
456
457class GRPC(ServiceType):
458 pass
459
460
461class EGRPC(ServiceType):
462 skip_variant: ClassVar[bool] = True
463
464 def __init__(self, *args, **kwargs) -> None:
465 # Do this unconditionally, because that's the point of this class.
466 kwargs["service_manifests"] = integration_manifests.load("grpc_echo_backend")
467 super().__init__(*args, **kwargs)
468
469 def requirements(self):
470 yield (
471 "url",
472 Query(
473 "http://%s/echo.EchoService/Echo" % self.path.fqdn,
474 headers={"content-type": "application/grpc", "requested-status": "0"},
475 expected=200,
476 grpc_type="real",
477 ),
478 )
479
480
481class AHTTP(ServiceType):
482 skip_variant: ClassVar[bool] = True
483
484 def __init__(self, *args, **kwargs) -> None:
485 # Do this unconditionally, because that's the point of this class.
486 kwargs["service_manifests"] = integration_manifests.load("auth_backend")
487 super().__init__(*args, **kwargs)
488
489
490class AGRPC(ServiceType):
491 skip_variant: ClassVar[bool] = True
492
493 def __init__(self, protocol_version: str = "v2", *args, **kwargs) -> None:
494 self.protocol_version = protocol_version
495
496 # Do this unconditionally, because that's the point of this class.
497 kwargs["service_manifests"] = integration_manifests.load("grpc_auth_backend")
498 super().__init__(*args, **kwargs)
499
500 def requirements(self):
501 yield ("pod", self.path.k8s)
502
503
504class RLSGRPC(ServiceType):
505 skip_variant: ClassVar[bool] = True
506
507 def __init__(self, protocol_version: str = "v2", *args, **kwargs) -> None:
508 self.protocol_version = protocol_version
509
510 # Do this unconditionally, because that's the point of this class.
511 kwargs["service_manifests"] = integration_manifests.load("grpc_rls_backend")
512 super().__init__(*args, **kwargs)
513
514 def requirements(self):
515 yield ("pod", self.path.k8s)
516
517
518class ALSGRPC(ServiceType):
519 skip_variant: ClassVar[bool] = True
520
521 def __init__(self, *args, **kwargs) -> None:
522 # Do this unconditionally, because that's the point of this class.
523 kwargs["service_manifests"] = integration_manifests.load("grpc_als_backend")
524 super().__init__(*args, **kwargs)
525
526 def requirements(self):
527 yield ("pod", self.path.k8s)
528
529
530class HTTPBin(ServiceType):
531 skip_variant: ClassVar[bool] = True
532
533 def __init__(self, *args, **kwargs) -> None:
534 # Do this unconditionally, because that's the point of this class.
535 kwargs["service_manifests"] = integration_manifests.load("httpbin_backend")
536 super().__init__(*args, **kwargs)
537
538 def requirements(self):
539 yield ("url", Query("http://%s/status/200" % self.path.fqdn))
540
541
542class WebsocketEcho(ServiceType):
543 skip_variant: ClassVar[bool] = True
544
545 def __init__(self, *args, **kwargs) -> None:
546 # Do this unconditionally, because that's the point of this class.
547 kwargs["service_manifests"] = integration_manifests.load("websocket_echo_backend")
548 super().__init__(*args, **kwargs)
549
550 def requirements(self):
551 yield ("url", Query("http://%s/" % self.path.fqdn, expected=404))
552
553
554class StatsDSink(ServiceType):
555 skip_variant: ClassVar[bool] = True
556 target_cluster: str
557
558 def __init__(self, target_cluster: str, *args, **kwargs) -> None:
559 self.target_cluster = target_cluster
560 # Do this unconditionally, because that's the point of this class.
561 kwargs["service_manifests"] = integration_manifests.load("statsd_backend")
562 super().__init__(*args, **kwargs)
563
564 def requirements(self):
565 yield ("url", Query("http://%s/SUMMARY" % self.path.fqdn))
566
567
568@abstract_test
569class MappingTest(Test):
570
571 target: ServiceType
572 options: Sequence["OptionTest"]
573 parent: AmbassadorTest
574
575 no_local_mode = True
576 skip_local_instead_of_xfail = "Plain (MappingTest)"
577
578 def init(self, target: ServiceType, options=()) -> None:
579 self.target = target
580 self.options = list(options)
581 self.is_ambassador = True
582
583
584@abstract_test
585class OptionTest(Test):
586
587 VALUES: ClassVar[Any] = None
588 value: Any
589 parent: Test
590
591 no_local_mode = True
592 skip_local_instead_of_xfail = "Plain (OptionTests)"
593
594 @classmethod
595 def variants(cls) -> Generator[Node, None, None]:
596 if cls.VALUES is None:
597 yield cls()
598 else:
599 for val in cls.VALUES:
600 yield cls(val, name=sanitize(val))
601
602 def init(self, value=None):
603 self.value = value
View as plain text