1import base64
2import os
3import shutil
4import subprocess
5import sys
6from random import random
7from typing import Any, ClassVar, Generator, List, Optional, Sequence, Tuple, Union
8from typing import cast as typecast
9
10import pytest
11import yaml
12
# Pick the YAML loader/dumper classes used by this module: start from the
# pure-Python Safe* classes and switch to the CSafe* variants when this PyYAML
# build exposes them.
#
# The type: ignores are because, weirdly, the yaml.CSafe* variants don't share
# a type with their non-C variants. No clue why not.
yaml_loader = yaml.SafeLoader  # type: ignore
yaml_dumper = yaml.SafeDumper  # type: ignore

if hasattr(yaml, "CSafeLoader"):
    yaml_loader = yaml.CSafeLoader  # type: ignore

    # The dumper is only probed once the C loader has been found, so a build
    # without CSafeLoader keeps both pure-Python classes.
    if hasattr(yaml, "CSafeDumper"):
        yaml_dumper = yaml.CSafeDumper  # type: ignore
23
24import tests.integration.manifests as integration_manifests
25from kat.harness import Name, Node, Query, Test, abstract_test, sanitize
26from kat.utils import ShellCommand
27
# Manifest template used by AmbassadorTest.manifests() in DEV mode instead of
# the "ambassador" manifest: a service-account-token Secret named after the
# test. The `{self.path.k8s}` placeholders are filled in by `self.format(...)`.
#
# YAML nesting is significant here: `name` and `annotations` belong to
# `metadata`, and the service-account annotation key must be indented under
# `annotations:` to be read as an annotation rather than a metadata field.
AMBASSADOR_LOCAL = """
---
apiVersion: v1
kind: Secret
metadata:
  name: {self.path.k8s}
  annotations:
    kubernetes.io/service-account.name: {self.path.k8s}
type: kubernetes.io/service-account-token
"""
38
39
def assert_default_errors(errors, include_ingress_errors=True):
    """
    Assert that ``errors`` starts with the stock Ambassador error table.

    ``errors`` is a sequence of two-element entries. Its leading entries must
    equal, in order, the four missing-CRD errors (core, Resolver, Host,
    LogService), followed by the Ingress-permission error when
    ``include_ingress_errors`` is true. Every entry after that prefix must
    contain "found invalid port" in its second element.

    Raises AssertionError on any mismatch; returns None otherwise.
    """
    expected = [
        [
            "",
            "Ambassador could not find core CRD definitions. Please visit https://www.getambassador.io/docs/edge-stack/latest/topics/install/upgrade-to-edge-stack/#5-update-and-restart for more information. You can continue using Ambassador via Kubernetes annotations, any configuration via CRDs will be ignored...",
        ],
        [
            "",
            "Ambassador could not find Resolver type CRD definitions. Please visit https://www.getambassador.io/docs/edge-stack/latest/topics/install/upgrade-to-edge-stack/#5-update-and-restart for more information. You can continue using Ambassador via Kubernetes annotations, any configuration via CRDs will be ignored...",
        ],
        [
            "",
            "Ambassador could not find the Host CRD definition. Please visit https://www.getambassador.io/docs/edge-stack/latest/topics/install/upgrade-to-edge-stack/#5-update-and-restart for more information. You can continue using Ambassador via Kubernetes annotations, any configuration via CRDs will be ignored...",
        ],
        [
            "",
            "Ambassador could not find the LogService CRD definition. Please visit https://www.getambassador.io/docs/edge-stack/latest/topics/install/upgrade-to-edge-stack/#5-update-and-restart for more information. You can continue using Ambassador via Kubernetes annotations, any configuration via CRDs will be ignored...",
        ],
    ]

    if include_ingress_errors:
        expected += [
            [
                "",
                "Ambassador is not permitted to read Ingress resources. Please visit https://www.getambassador.io/docs/edge-stack/latest/topics/running/ingress-controller/#ambassador-as-an-ingress-controller for more information. You can continue using Ambassador, but Ingress resources will be ignored...",
            ]
        ]

    # Split the table into the part that must match exactly and the remainder.
    prefix_len = len(expected)
    leading, remainder = errors[:prefix_len], errors[prefix_len:]

    assert leading == expected, f"default error table mismatch: got\n{errors}"

    # Anything beyond the default table is only tolerated if it is an
    # invalid-port complaint.
    for entry in remainder:
        message = entry[1]
        assert (
            "found invalid port" in message
        ), f"Could not find 'found invalid port' in the error {message}"
77
78
# DEV mode switch: true when AMBASSADOR_DEV is "1", "yes" or "true"
# (case-insensitive); an unset variable is treated as "0".
DEV = os.environ.get("AMBASSADOR_DEV", "0").lower() in {"1", "yes", "true"}
80
81
@abstract_test
class AmbassadorTest(Test):
    """
    AmbassadorTest is a top level ambassador test.

    It renders the manifests for one Ambassador instance (manifests()), and in
    DEV mode builds the Ambassador image and runs it as a local docker
    container instead (post_manifest()). url() builds URLs that reach the
    instance in either mode.
    """

    # Next free port offset handed out by the `index` property.
    OFFSET: ClassVar[int] = 0
    # Set once the DEV-mode docker image build has been attempted.
    IMAGE_BUILT: ClassVar[bool] = False

    _index: Optional[int] = None
    _ambassador_id: Optional[str] = None
    # Adds AMBASSADOR_SINGLE_NAMESPACE and selects the namespace-scoped RBAC.
    single_namespace: bool = False
    # Adds AMBASSADOR_DISABLE_ENDPOINTS.
    disable_endpoints: bool = False
    name: str
    path: Name
    # Extra TCP ports exposed in the manifests / published by docker run.
    extra_ports: Optional[List[int]] = None
    # Which components are listed in AMBASSADOR_DEBUG.
    debug_diagd: bool = True
    debug_envoy: bool = False
    # YAML env-entry fragments substituted for `envs` in the manifest template.
    manifest_envs = ""
    is_ambassador = True
    # When false, manifests() adds AMBASSADOR_NO_TLS_REDIRECT.
    allow_edge_stack_redirect = False
    edge_stack_cleartext_host = True

    # Extra "NAME=value" entries passed to `docker run -e` in DEV mode.
    env: List[str] = []

    def manifests(self) -> str:
        """
        Render the Kubernetes manifests for this Ambassador instance.

        Env-var fragments are appended to ``self.manifest_envs`` according to
        the instance flags and the AMBASSADOR_FAST_RECONFIGURE environment
        variable. In DEV mode the result is the RBAC manifest plus
        AMBASSADOR_LOCAL; otherwise it is the RBAC manifest plus the
        "ambassador" manifest, formatted with the collected envs and ports.

        NOTE(review): every call appends to ``self.manifest_envs`` again, so
        calling this more than once on one instance repeats the entries.

        NOTE(review): the YAML fragments below are list items whose ``value:``
        / ``protocol:`` keys must be nested under the ``- name:`` entry. The
        absolute indentation (4 spaces for env entries, 2 for ports) is assumed
        to line up with the ``envs`` / ``extra_ports`` placeholders in the
        loaded templates; confirm against the template files.
        """
        rbac = integration_manifests.load("rbac_cluster_scope")

        self.manifest_envs += """
    - name: POLL_EVERY_SECS
      value: "0"
    - name: CONSUL_WATCHER_PORT
      value: "8500"
"""

        # Only an explicit "false" is forwarded to the instance.
        if os.environ.get("AMBASSADOR_FAST_RECONFIGURE", "true").lower() == "false":
            self.manifest_envs += """
    - name: AMBASSADOR_FAST_RECONFIGURE
      value: "false"
"""

        amb_debug = []
        if self.debug_diagd:
            amb_debug.append("diagd")
        if self.debug_envoy:
            amb_debug.append("envoy")
        if amb_debug:
            self.manifest_envs += """
    - name: AMBASSADOR_DEBUG
      value: "%s"
    - name: AES_LOG_LEVEL
      value: "debug"
""" % ":".join(
                amb_debug
            )

        if self.ambassador_id:
            self.manifest_envs += f"""
    - name: AMBASSADOR_LABEL_SELECTOR
      value: "kat-ambassador-id={self.ambassador_id}"
"""

        if self.single_namespace:
            self.manifest_envs += """
    - name: AMBASSADOR_SINGLE_NAMESPACE
      value: "yes"
"""
            # Single-namespace instances get namespace-scoped RBAC instead.
            rbac = integration_manifests.load("rbac_namespace_scope")

        if self.disable_endpoints:
            self.manifest_envs += """
    - name: AMBASSADOR_DISABLE_ENDPOINTS
      value: "yes"
"""
        if not self.allow_edge_stack_redirect:
            self.manifest_envs += """
    - name: AMBASSADOR_NO_TLS_REDIRECT
      value: "yes"
"""

        eports = ""

        if self.extra_ports:
            for port in self.extra_ports:
                eports += f"""
  - name: extra-{port}
    protocol: TCP
    port: {port}
    targetPort: {port}
"""

        if DEV:
            return self.format(rbac + AMBASSADOR_LOCAL, extra_ports=eports)
        else:
            return self.format(
                rbac + integration_manifests.load("ambassador"),
                envs=self.manifest_envs,
                extra_ports=eports,
                capabilities_block="",
            )

    @property
    def index(self) -> int:
        """
        Return this instance's port offset, allocating it on first access.

        Offsets come from the class-wide OFFSET counter, so each instance that
        asks for one gets a distinct value.
        """
        if self._index is None:
            # lock here?
            self._index = AmbassadorTest.OFFSET
            AmbassadorTest.OFFSET += 1

        return typecast(int, self._index)

    def post_manifest(self):
        """
        In DEV mode, run this Ambassador instance as a local docker container.

        Does nothing unless DEV is set, or when KAT_SKIP_DOCKER is set. The
        first call (guarded by IMAGE_BUILT) removes containers labelled
        kat-family=ambassador and builds the image named by
        AMBASSADOR_DOCKER_IMAGE. Each call then obtains the test's secret
        (from a /tmp cache file or via kubectl), writes its decoded entries to
        a directory mounted as the service-account directory, and starts the
        container. Build, fetch and start failures end the run via
        pytest.exit().
        """
        if not DEV:
            return

        if os.environ.get("KAT_SKIP_DOCKER"):
            return

        image = os.environ["AMBASSADOR_DOCKER_IMAGE"]
        cached_image = os.environ["BASE_PY_IMAGE"]
        ambassador_base_image = os.environ["BASE_GO_IMAGE"]

        if not AmbassadorTest.IMAGE_BUILT:
            # Flip the flag first so the build is attempted at most once.
            AmbassadorTest.IMAGE_BUILT = True

            cmd = ShellCommand(
                "docker", "ps", "-a", "-f", "label=kat-family=ambassador", "--format", "{{.ID}}"
            )

            if cmd.check("find old docker container IDs"):
                # One ID per line; drop blank entries so no empty argument is
                # ever handed to docker kill / docker rm.
                ids = [cid for cid in cmd.stdout.split("\n") if cid.strip()]

                if ids:
                    print("Killing old containers...")
                    ShellCommand.run("kill old containers", "docker", "kill", *ids, verbose=True)
                    ShellCommand.run("rm old containers", "docker", "rm", *ids, verbose=True)

            # Build context is the directory three levels above this file.
            context = os.path.dirname(os.path.dirname(os.path.dirname(__file__)))

            print("Starting docker build...", end="")
            sys.stdout.flush()

            cmd = ShellCommand(
                "docker",
                "build",
                "--build-arg",
                "BASE_PY_IMAGE={}".format(cached_image),
                "--build-arg",
                "BASE_GO_IMAGE={}".format(ambassador_base_image),
                context,
                "-t",
                image,
            )

            if cmd.check("docker build Ambassador image"):
                print("done.")
            else:
                pytest.exit("container failed to build")

        # The fetched secret is cached on disk and reused when present.
        fname = "/tmp/k8s-%s.yaml" % self.path.k8s
        if os.path.exists(fname):
            # Read back with the same encoding the cache is written with.
            with open(fname, encoding="utf-8") as fd:
                content = fd.read()
        else:
            nsp = getattr(self, "namespace", None) or "default"

            cmd = ShellCommand(
                "tools/bin/kubectl", "get", "-n", nsp, "-o", "yaml", "secret", self.path.k8s
            )

            if not cmd.check(f"fetch secret for {self.path.k8s}"):
                pytest.exit(f"could not fetch secret for {self.path.k8s}")

            content = cmd.stdout

            with open(fname, "wb") as fd:
                fd.write(content.encode("utf-8"))

        try:
            secret = yaml.load(content, Loader=yaml_loader)
        except Exception:
            # Show what failed to parse, then re-raise with the original traceback.
            print("could not parse YAML:\n%s" % content)
            raise

        data = secret["data"]
        # secret_dir = tempfile.mkdtemp(prefix=self.path.k8s, suffix="secret")
        secret_dir = "/tmp/%s-ambassadormixin-%s" % (self.path.k8s, "secret")

        # Start from an empty directory on every launch.
        shutil.rmtree(secret_dir, ignore_errors=True)
        os.mkdir(secret_dir, 0o777)

        # Each secret entry is base64-encoded; write the decoded bytes to a
        # file named after its key.
        for k, v in data.items():
            with open(os.path.join(secret_dir, k), "wb") as f:
                f.write(base64.decodebytes(bytes(v, "utf8")))

        print("Launching %s container." % self.path.k8s)
        command = ["docker", "run", "-d", "-l", "kat-family=ambassador", "--name", self.path.k8s]

        envs = [
            "KUBERNETES_SERVICE_HOST=kubernetes",
            "KUBERNETES_SERVICE_PORT=443",
            "AMBASSADOR_SNAPSHOT_COUNT=1",
            "AMBASSADOR_CONFIG_BASE_DIR=/tmp/ambassador",
            "POLL_EVERY_SECS=0",
            "CONSUL_WATCHER_PORT=8500",
            "AMBASSADOR_UPDATE_MAPPING_STATUS=false",
            "AMBASSADOR_ID=%s" % self.ambassador_id,
        ]

        if self.namespace:
            envs.append("AMBASSADOR_NAMESPACE=%s" % self.namespace)

        if self.single_namespace:
            envs.append("AMBASSADOR_SINGLE_NAMESPACE=yes")

        if self.disable_endpoints:
            envs.append("AMBASSADOR_DISABLE_ENDPOINTS=yes")

        amb_debug = []
        if self.debug_diagd:
            amb_debug.append("diagd")
        if self.debug_envoy:
            amb_debug.append("envoy")
        if amb_debug:
            envs.append("AMBASSADOR_DEBUG=%s" % ":".join(amb_debug))

        envs.extend(self.env)
        for env in envs:
            command.extend(["-e", env])

        # The standard ports are published shifted by this instance's index so
        # several containers can coexist on one host.
        ports = [
            "%s:8877" % (8877 + self.index),
            "%s:8001" % (8001 + self.index),
            "%s:8080" % (8080 + self.index),
            "%s:8443" % (8443 + self.index),
        ]

        # Extra ports are published unshifted.
        if self.extra_ports:
            for port in self.extra_ports:
                ports.append(f"{port}:{port}")

        for port_str in ports:
            command.extend(["-p", port_str])

        volumes = ["%s:/var/run/secrets/kubernetes.io/serviceaccount" % secret_dir]
        for volume in volumes:
            command.extend(["-v", volume])

        command.append(image)

        if os.environ.get("KAT_SHOW_DOCKER"):
            print(" ".join(command))

        cmd = ShellCommand(*command)

        if not cmd.check(f"start container for {self.path.k8s}"):
            pytest.exit(f"could not start container for {self.path.k8s}")

    def queries(self):
        """
        In DEV mode, check on the local container; always return no queries.

        If the ``docker ps`` check fails and printed nothing, the container's
        logs are printed (when they can be fetched) and the run is ended via
        pytest.exit().

        NOTE(review): the container lookup is only treated as a failure when
        the ``docker ps`` command itself fails; confirm that an empty result
        from a successful command should not also count.
        """
        if DEV:
            cmd = ShellCommand("docker", "ps", "-qf", "name=%s" % self.path.k8s)

            if not cmd.check(f"docker check for {self.path.k8s}"):
                if not cmd.stdout.strip():
                    log_cmd = ShellCommand(
                        "docker", "logs", self.path.k8s, stderr=subprocess.STDOUT
                    )

                    if log_cmd.check(f"docker logs for {self.path.k8s}"):
                        # Print the fetched logs; cmd.stdout is known to be
                        # blank on this path.
                        print(log_cmd.stdout)

                    pytest.exit(f"container failed to start for {self.path.k8s}")

        return ()

    def scheme(self) -> str:
        """Return the default URL scheme used by url()."""
        return "http"

    def url(self, prefix, scheme=None, port=None) -> str:
        """
        Build a URL for ``prefix`` on this Ambassador instance.

        ``scheme`` defaults to self.scheme(). In DEV mode the host is
        localhost: with no ``port`` given, the port is 8443 (https) or 8080
        (otherwise) plus this instance's index; an explicit ``port`` is used
        as-is. Outside DEV mode the host is ``self.path.fqdn``, with
        ``:port`` appended only when ``port`` is given.
        """
        if scheme is None:
            scheme = self.scheme()

        if DEV:
            if not port:
                port = 8443 if scheme == "https" else 8080
                # Default ports are published shifted by the instance index
                # (see post_manifest); explicit ports are not.
                port += self.index

            return "%s://%s/%s" % (scheme, "localhost:%s" % port, prefix)
        else:
            host_and_port = self.path.fqdn

            if port:
                host_and_port += f":{port}"

            return "%s://%s/%s" % (scheme, host_and_port, prefix)

    def requirements(self):
        """Yield ("url", Query) requirements for the check_ready and check_alive endpoints."""
        yield ("url", Query(self.url("ambassador/v0/check_ready")))
        yield ("url", Query(self.url("ambassador/v0/check_alive")))
386
387
@abstract_test
class ServiceType(Node):
    """
    Base class for the service nodes defined in this module.

    A service created without ``service_manifests`` keeps ``use_superpod``
    set and renders no manifests of its own. Passing ``service_manifests``
    clears ``use_superpod`` and makes manifests() render that template.
    """

    path: Name
    _manifests: Optional[str]
    use_superpod: bool = True

    def __init__(
        self, service_manifests: str = None, namespace: str = None, *args, **kwargs
    ) -> None:
        """Initialise the node and record the optional manifest template."""
        super().__init__(*args, namespace=namespace, **kwargs)

        self._manifests = service_manifests

        # A non-empty template means this service supplies its own manifests.
        if service_manifests:
            self.use_superpod = False

    def config(self) -> Generator[Union[str, Tuple[Node, str]], None, None]:
        """Yield no configuration."""
        yield from ()

    def manifests(self):
        """Return the formatted manifest template, or None when use_superpod is set."""
        return None if self.use_superpod else self.format(self._manifests)

    def requirements(self):
        """
        Yield ("url", Query) requirements for the http and https URLs of this service.

        NOTE(review): services with use_superpod set still yield both URL
        checks; if they were meant to be skipped, an explicit ``return`` is
        needed here. Confirm the intent.
        """
        for scheme in ("http", "https"):
            yield ("url", Query("%s://%s" % (scheme, self.path.fqdn)))
420
421
@abstract_test
class ServiceTypeGrpc(Node):
    """
    Service node that always renders manifests.

    Uses the ``service_manifests`` template when one is given and the
    "backend" integration manifest otherwise.
    """

    path: Name

    def __init__(self, service_manifests: str = None, *args, **kwargs) -> None:
        """Initialise the node and choose the manifest template."""
        super().__init__(*args, **kwargs)

        # An empty or missing template falls back to the stock backend.
        if service_manifests:
            self._manifests = service_manifests
        else:
            self._manifests = integration_manifests.load("backend")

    def config(self) -> Generator[Union[str, Tuple[Node, str]], None, None]:
        """Yield no configuration."""
        yield from ()

    def manifests(self):
        """Return the formatted manifest template."""
        return self.format(self._manifests)

    def requirements(self):
        """Yield ("url", Query) requirements for the http and https URLs of this service."""
        for scheme in ("http", "https"):
            yield ("url", Query("%s://%s" % (scheme, self.path.fqdn)))
440
441
class HTTP(ServiceType):
    """ServiceType subclass that adds no behavior of its own."""
444
445
class GRPC(ServiceType):
    """ServiceType subclass that adds no behavior of its own."""
448
449
class EGRPC(ServiceType):
    """ServiceType that always uses the "grpc_echo_backend" manifests."""

    skip_variant: ClassVar[bool] = True

    def __init__(self, *args, **kwargs) -> None:
        """Initialise with the echo backend manifests, replacing any passed in."""
        # Do this unconditionally, because that's the point of this class.
        backend = integration_manifests.load("grpc_echo_backend")
        kwargs["service_manifests"] = backend
        super().__init__(*args, **kwargs)

    def requirements(self):
        """Yield one ("url", Query) requirement targeting the Echo method and expecting 200."""
        echo_url = "http://%s/echo.EchoService/Echo" % self.path.fqdn
        probe_headers = {
            "content-type": "application/grpc",
            "kat-req-echo-requested-status": "0",
        }

        yield (
            "url",
            Query(echo_url, headers=probe_headers, expected=200, grpc_type="real"),
        )
468
469
class HealthCheckServer(ServiceType):
    """ServiceType that always uses the "health_check_server" manifests."""

    skip_variant: ClassVar[bool] = True

    def __init__(self, *args, **kwargs) -> None:
        """Initialise with a fresh nonce and the health check server manifests."""
        # The test makes one of the pods unhealthy, so the health check server
        # has to be reset between runs. 'nonce' is a horrible hack to get the
        # Pod to roll over each invocation.
        self.nonce = random()
        self.use_superpod = False

        # Do this unconditionally, because that's the point of this class.
        backend = integration_manifests.load("health_check_server")
        kwargs["service_manifests"] = backend
        super().__init__(*args, **kwargs)

    def requirements(self):
        """Yield a "deployment" requirement, then the http and https URL requirements."""
        yield ("deployment", self.path.k8s)

        for scheme in ("http", "https"):
            yield ("url", Query("%s://%s" % (scheme, self.path.fqdn)))
487
488
class AHTTP(ServiceType):
    """ServiceType that always uses the "auth_backend" manifests."""

    skip_variant: ClassVar[bool] = True

    def __init__(self, *args, **kwargs) -> None:
        """Initialise with the auth backend manifests, replacing any passed in."""
        # Do this unconditionally, because that's the point of this class.
        backend = integration_manifests.load("auth_backend")
        kwargs["service_manifests"] = backend
        super().__init__(*args, **kwargs)
496
497
class AGRPC(ServiceType):
    """ServiceType that always uses the "grpc_auth_backend" manifests."""

    skip_variant: ClassVar[bool] = True

    def __init__(self, protocol_version: str = "v3", *args, **kwargs) -> None:
        """Record ``protocol_version`` and initialise with the gRPC auth backend manifests."""
        self.protocol_version = protocol_version

        # Do this unconditionally, because that's the point of this class.
        backend = integration_manifests.load("grpc_auth_backend")
        kwargs["service_manifests"] = backend
        super().__init__(*args, **kwargs)

    def requirements(self):
        """Yield a single "pod" requirement for this service."""
        pod_name = self.path.k8s
        yield ("pod", pod_name)
510
511
class RLSGRPC(ServiceType):
    """ServiceType that always uses the "grpc_rls_backend" manifests."""

    skip_variant: ClassVar[bool] = True

    def __init__(self, protocol_version: str = "v3", *args, **kwargs) -> None:
        """Record ``protocol_version`` and initialise with the gRPC RLS backend manifests."""
        self.protocol_version = protocol_version

        # Do this unconditionally, because that's the point of this class.
        backend = integration_manifests.load("grpc_rls_backend")
        kwargs["service_manifests"] = backend
        super().__init__(*args, **kwargs)

    def requirements(self):
        """Yield a single "pod" requirement for this service."""
        pod_name = self.path.k8s
        yield ("pod", pod_name)
524
525
class ALSGRPC(ServiceType):
    """ServiceType that always uses the "grpc_als_backend" manifests."""

    skip_variant: ClassVar[bool] = True

    def __init__(self, *args, **kwargs) -> None:
        """Initialise with the gRPC ALS backend manifests, replacing any passed in."""
        # Do this unconditionally, because that's the point of this class.
        backend = integration_manifests.load("grpc_als_backend")
        kwargs["service_manifests"] = backend
        super().__init__(*args, **kwargs)

    def requirements(self):
        """Yield a single "pod" requirement for this service."""
        pod_name = self.path.k8s
        yield ("pod", pod_name)
536
537
class HTTPBin(ServiceType):
    """ServiceType that always uses the "httpbin_backend" manifests."""

    skip_variant: ClassVar[bool] = True

    def __init__(self, *args, **kwargs) -> None:
        """Initialise with the httpbin backend manifests, replacing any passed in."""
        # Do this unconditionally, because that's the point of this class.
        backend = integration_manifests.load("httpbin_backend")
        kwargs["service_manifests"] = backend
        super().__init__(*args, **kwargs)

    def requirements(self):
        """Yield one ("url", Query) requirement for the service's /status/200 path."""
        status_url = "http://%s/status/200" % self.path.fqdn
        yield ("url", Query(status_url))
548
549
class WebsocketEcho(ServiceType):
    """ServiceType that always uses the "websocket_echo_backend" manifests."""

    skip_variant: ClassVar[bool] = True

    def __init__(self, *args, **kwargs) -> None:
        """Initialise with the websocket echo backend manifests, replacing any passed in."""
        # Do this unconditionally, because that's the point of this class.
        backend = integration_manifests.load("websocket_echo_backend")
        kwargs["service_manifests"] = backend
        super().__init__(*args, **kwargs)

    def requirements(self):
        """Yield one ("url", Query) requirement for the root path, expecting a 404."""
        root_url = "http://%s/" % self.path.fqdn
        yield ("url", Query(root_url, expected=404))
560
561
class StatsDSink(ServiceType):
    """ServiceType that always uses the "statsd_backend" manifests."""

    skip_variant: ClassVar[bool] = True
    target_cluster: str

    def __init__(self, target_cluster: str, *args, **kwargs) -> None:
        """Record ``target_cluster`` and initialise with the statsd backend manifests."""
        self.target_cluster = target_cluster

        # Do this unconditionally, because that's the point of this class.
        backend = integration_manifests.load("statsd_backend")
        kwargs["service_manifests"] = backend
        super().__init__(*args, **kwargs)

    def requirements(self):
        """Yield one ("url", Query) requirement for the service's /SUMMARY path."""
        summary_url = "http://%s/SUMMARY" % self.path.fqdn
        yield ("url", Query(summary_url))
574
575
@abstract_test
class MappingTest(Test):
    """Abstract test that holds a target service and a list of option tests."""

    target: ServiceType
    options: Sequence["OptionTest"]
    parent: AmbassadorTest

    no_local_mode = True
    skip_local_instead_of_xfail = "Plain (MappingTest)"

    def init(self, target: ServiceType, options=()) -> None:
        """Store ``target``, a list copy of ``options``, and set is_ambassador."""
        self.is_ambassador = True
        self.target = target
        # Copy so the stored options are always a list of their own.
        self.options = [*options]
590
591
@abstract_test
class OptionTest(Test):
    """
    Abstract test carrying a single option value.

    Subclasses may set VALUES to an iterable; variants() then yields one
    instance per value instead of a single value-less instance.
    """

    VALUES: ClassVar[Any] = None
    value: Any
    parent: Test

    no_local_mode = True
    skip_local_instead_of_xfail = "Plain (OptionTests)"

    @classmethod
    def variants(cls) -> Generator[Node, None, None]:
        """Yield one instance when VALUES is None, else one per entry named via sanitize()."""
        if cls.VALUES is None:
            yield cls()
            return

        for candidate in cls.VALUES:
            yield cls(candidate, name=sanitize(candidate))

    def init(self, value=None):
        """Store ``value`` on the instance."""
        self.value = value
View as plain text