1import base64
2import fnmatch
3import functools
4import inspect
5import json
6import os
7import subprocess
8import sys
9import threading
10import time
11import traceback
12from abc import ABC
13from collections import OrderedDict
14from hashlib import sha256
15from typing import (
16 Any,
17 Callable,
18 Dict,
19 List,
20 NamedTuple,
21 Optional,
22 Sequence,
23 Tuple,
24 Type,
25 Union,
26 cast,
27)
28
29import pytest
30import yaml as pyyaml
31from packaging import version
32from yaml.parser import ParserError as YAMLParseError
33from yaml.scanner import ScannerError as YAMLScanError
34
35import tests.integration.manifests as integration_manifests
36from ambassador.utils import parse_bool
37from multi import multi
38from tests.kubeutils import apply_kube_artifacts
39from tests.manifests import cleartext_host_manifest, default_listener_manifest
40
41from .parser import Tag, dump, load
42from .utils import ShellCommand
43
# Start from PyYAML's pure-Python safe loader/dumper, which are always available.
pyyaml_loader: Any = pyyaml.SafeLoader
pyyaml_dumper: Any = pyyaml.SafeDumper

# Switch to the libyaml-backed C variants when this PyYAML build provides them
# (the dumper is only considered once the loader has been swapped).
if hasattr(pyyaml, "CSafeLoader"):
    pyyaml_loader = pyyaml.CSafeLoader
    if hasattr(pyyaml, "CSafeDumper"):
        pyyaml_dumper = pyyaml.CSafeDumper
52
# Run mode: "local" (don't do any Envoy stuff), "envoy" (only do Envoy stuff),
# or "all" (allow both). Defaults to "all".
RUN_MODE = os.environ.get("KAT_RUN_MODE", "all").lower()

# Optional SOURCE_ROOT override from the environment; empty means "not set".
SOURCE_ROOT = os.environ.get("SOURCE_ROOT", "")

# /buildroot/apro.version stays the source of truth for an Edge Stack builder
# shell (so existing builder-shell users keep working). Without it, the
# EDGE_STACK environment variable decides, and the default is OSS.
EDGE_STACK = os.path.exists("/buildroot/apro.version") or parse_bool(
    os.environ.get("EDGE_STACK", "false")
)

if EDGE_STACK:
    print("RUNNING IN EDGE STACK")
    # Without an explicit SOURCE_ROOT, assume the usual build-shell location.
    SOURCE_ROOT = SOURCE_ROOT or "/buildroot/apro"
    GOLD_ROOT = os.path.join(SOURCE_ROOT, "tests/pytest/gold")
else:
    # Either not Edge Stack or we can't tell, so treat it as OSS.
    print("RUNNING IN OSS")
    SOURCE_ROOT = SOURCE_ROOT or "/buildroot/ambassador"
    GOLD_ROOT = os.path.join(SOURCE_ROOT, "python/tests/gold")
87
88
def run(cmd):
    """
    Run ``cmd`` through the shell and raise if it does not succeed.

    :param cmd: command line to execute (interpreted by the shell).
    :raises RuntimeError: if the command exits with a non-zero status. The status
        in the message is the command's real exit code (negative if it was
        killed by a signal), not the encoded wait status that ``os.system``
        returns on POSIX (e.g. 768 for ``exit 3``).
    """
    status = subprocess.run(cmd, shell=True).returncode
    if status != 0:
        raise RuntimeError("command failed[%s]: %s" % (status, cmd))
93
94
def kube_version_json():
    """
    Run ``tools/bin/kubectl version -o json`` and return the decoded output.

    The command runs through the shell, relative to the current working
    directory, and only stdout is captured. If kubectl prints nothing (for
    example because it is missing), ``json.loads`` raises a ``ValueError``
    (``json.JSONDecodeError``).

    :return: the parsed JSON document; callers read its ``serverVersion`` and
        ``clientVersion`` objects.
    """
    completed = subprocess.run(
        "tools/bin/kubectl version -o json", shell=True, stdout=subprocess.PIPE
    )
    return json.loads(completed.stdout)
101
102
def strip_version(ver: str) -> int:
    """
    Convert a Kubernetes major/minor version component to an int.

    ``kubectl version`` does not always report plain integers. GKE, for example,
    returns minor versions like ``'14+'``, which is not semver or any other
    standard format. A single trailing ``'+'`` is therefore tolerated and
    stripped.

    :param ver: version component string, e.g. ``'1'``, ``'14'`` or ``'14+'``
    :return: the numeric value of the component
    :raises ValueError: if ``ver`` is not accepted by ``int()``, even after
        removing one trailing ``'+'`` (this includes the empty string)
    """
    try:
        return int(ver)
    except ValueError:
        # GKE returns weird versions with '+' at the end.
        if ver.endswith("+"):
            return int(ver[:-1])
        # Anything else is genuinely invalid: re-raise the original error
        # unchanged, keeping its traceback.
        raise
121
122
def _kube_component_version(version_json, key: str) -> Optional[str]:
    """Return ``"major.minor"`` for ``version_json[key]``, or None if it is unavailable."""
    component = version_json.get(key, {})
    if not component:
        return None

    major = component.get("major")
    minor = component.get("minor")
    if not major or not minor:
        # kubectl did not report a usable version. Report "unknown" (None), as
        # callers already expect, instead of crashing inside strip_version(None).
        return None

    return f"{strip_version(major)}.{strip_version(minor)}"


def kube_server_version(version_json=None):
    """
    Return the Kubernetes server version as ``"major.minor"``, or None if unknown.

    :param version_json: parsed ``kubectl version -o json`` output; fetched
        with kube_version_json() when not supplied (or empty).
    """
    if not version_json:
        version_json = kube_version_json()

    return _kube_component_version(version_json, "serverVersion")


def kube_client_version(version_json=None):
    """
    Return the kubectl client version as ``"major.minor"``, or None if unknown.

    :param version_json: parsed ``kubectl version -o json`` output; fetched
        with kube_version_json() when not supplied (or empty).
    """
    if not version_json:
        version_json = kube_version_json()

    return _kube_component_version(version_json, "clientVersion")
151
152
def is_kube_server_client_compatible(
    debug_desc: str, requested_server_version: str, requested_client_version: str
) -> bool:
    """
    Check that both the cluster and kubectl meet minimum versions.

    A component whose version cannot be determined is reported but does not
    make the result False.

    :param debug_desc: name of the feature being checked, used only in messages.
    :param requested_server_version: minimum server version, e.g. ``"1.18"``.
    :param requested_client_version: minimum client version, e.g. ``"1.14"``.
    :return: False if any known version is older than requested, else True.
    """
    kube_json = kube_version_json()

    checks = (
        ("server", kube_server_version(kube_json), requested_server_version),
        ("client", kube_client_version(kube_json), requested_client_version),
    )

    compatible = True
    for kind, actual, required in checks:
        if not actual:
            print(f"could not determine Kubernetes {kind} version?")
            continue

        if version.parse(actual) < version.parse(required):
            print(f"{kind} version {actual} is incompatible with {debug_desc}")
            compatible = False
        else:
            print(f"{kind} version {actual} is compatible with {debug_desc}")

    return compatible
181
182
def is_ingress_class_compatible() -> bool:
    """Return True if the cluster (>= 1.18) and kubectl (>= 1.14) support IngressClass tests."""
    return is_kube_server_client_compatible(
        debug_desc="IngressClass",
        requested_server_version="1.18",
        requested_client_version="1.14",
    )
185
186
def is_knative_compatible() -> bool:
    """
    Return True if the cluster and kubectl (both >= 1.14) can run the Knative tests.

    In local run mode Knative is skipped immediately, without querying Kubernetes.
    """
    if RUN_MODE != "local":
        return is_kube_server_client_compatible("Knative", "1.14", "1.14")
    return False
193
194
def get_digest(data: str) -> str:
    """Return the hexadecimal SHA-256 digest of ``data`` encoded as UTF-8."""
    return sha256(data.encode("utf-8")).hexdigest()
199
200
def has_changed(data: str, path: str) -> Tuple[bool, str]:
    """
    Cache ``data`` in ``path`` and report how it compares to the cached copy.

    If ``path`` is missing or holds different contents, ``data`` is written to
    it. Split testing currently requires everything to be reapplied, so the
    returned flag is always True; only the reason varies.

    :param data: the new contents (e.g. a rendered manifest).
    :param path: file that caches the previously seen contents.
    :return: ``(changed, reason)``, where ``changed`` is currently always True.
    """
    prev_data: Optional[str] = None

    if os.path.exists(path):
        with open(path) as f:
            prev_data = f.read()

    if prev_data is None:
        changed, reason = True, f"no {path} present"
    elif data != prev_data:
        changed, reason = True, f"different data in {path}"
    else:
        changed, reason = False, f"same data in {path}"

    if changed:
        with open(path, "w") as f:
            f.write(data)

    # For now, we always have to reapply with split testing.
    if not changed:
        changed, reason = True, "always reapply for split test"

    return (changed, reason)
241
242
# Per-class counters used by sanitize() to give otherwise-unnamed objects
# distinct names ("Foo", "Foo-1", "Foo-2", ...).
COUNTERS: Dict[Type, int] = {}

# Substring -> token rewrites applied in this order by sanitize(). Order
# matters: "://" has to be rewritten before ":" and "/".
# NOTE(review): "/t" looks like it was meant to be "\t" (tab). It is left as-is
# because changing it would rename any generated test whose variant string
# contains "/t".
SANITIZATIONS = OrderedDict(
    [
        ("://", "SCHEME"),
        (":", "COLON"),
        (" ", "SPACE"),
        ("/t", "TAB"),
        (".", "DOT"),
        ("?", "QMARK"),
        ("/", "SLASH"),
    ]
)


def sanitize(obj):
    """
    Turn a test-variant argument into a fragment usable in a test name.

    - ``str``: each SANITIZATIONS key is replaced by its token. The token is
      joined with a dash on the right if the (partially rewritten) string
      starts with the key, on the left if it ends with it, and on both sides
      otherwise.
    - ``dict``: its ``"value"`` entry if present (returned unsanitized),
      otherwise ``key-value`` pairs in sorted order joined with dashes.
    - anything else: the class name, suffixed with ``-N`` for the N-th
      (1-based) repeat of that class.
    """
    if isinstance(obj, str):
        text = obj
        for needle, token in SANITIZATIONS.items():
            if text.startswith(needle):
                replacement = token + "-"
            elif text.endswith(needle):
                replacement = "-" + token
            else:
                replacement = "-" + token + "-"
            text = text.replace(needle, replacement)
        return text

    if isinstance(obj, dict):
        if "value" in obj:
            return obj["value"]
        pairs = sorted(obj.items())
        return "-".join("%s-%s" % (sanitize(key), sanitize(val)) for key, val in pairs)

    klass = obj.__class__
    seen = COUNTERS.get(klass, 0)
    COUNTERS[klass] = seen + 1
    return klass.__name__ if seen == 0 else "%s-%s" % (klass.__name__, seen)
281
282
def abstract_test(cls: type):
    """
    Class decorator: mark ``cls`` as an abstract test.

    get_nodes() skips classes that define ``abstract_test`` in their own
    ``__dict__``, so the marked class is not instantiated, but its subclasses
    still are.
    """
    setattr(cls, "abstract_test", True)
    return cls
286
287
def get_nodes(node_type: type):
    """
    Yield ``node_type`` and its subclasses that should produce test variants.

    A class is yielded unless it is abstract (per ``inspect.isabstract``) or
    defines ``abstract_test`` in its own ``__dict__``; its subclasses are still
    visited either way. Subclasses that define a truthy ``skip_variant`` in
    their own ``__dict__`` are pruned together with their whole subtree.
    The traversal is depth-first in ``__subclasses__()`` order.
    """
    is_concrete = not inspect.isabstract(node_type)
    is_marked_abstract = node_type.__dict__.get("abstract_test", False)
    if is_concrete and not is_marked_abstract:
        yield node_type

    for subclass in node_type.__subclasses__():
        if subclass.__dict__.get("skip_variant", False):
            continue
        yield from get_nodes(subclass)
295
296
def variants(cls, *args, **kwargs) -> Tuple[Any, ...]:
    """
    Collect every test variant produced by ``cls`` and its eligible subclasses.

    Each class yielded by get_nodes(cls) has its ``variants(*args, **kwargs)``
    classmethod called, and all results are concatenated in order.

    :return: a tuple of any length (the previous ``Tuple[Any]`` annotation
        wrongly described a 1-tuple).
    """
    return tuple(
        variant
        for node_cls in get_nodes(cls)
        for variant in node_cls.variants(*args, **kwargs)  # type: ignore
    )
299
300
class Name(NamedTuple):
    """A node's dotted path name together with its Kubernetes namespace."""

    name: str
    namespace: str

    @property
    def k8s(self) -> str:
        """The name made Kubernetes-friendly: lowercased, with dots turned into dashes."""
        return self.name.lower().replace(".", "-")

    @property
    def fqdn(self) -> str:
        """The cluster-local DNS name ``<k8s>.<namespace>.svc.cluster.local``."""
        return ".".join((self.k8s, self.namespace, "svc", "cluster", "local"))
312
313
314class NodeLocal(threading.local):
315 def __init__(self):
316 self.current = None
317
318
319_local = NodeLocal()
320
321
def _argprocess(o):
    """
    Return a copy of a constructor argument with every nested Node cloned.

    Tuples (including NamedTuples, which become plain tuples), lists and dicts
    (keys and values) are rebuilt recursively; any other value is returned as-is.
    """
    if isinstance(o, Node):
        return o.clone()
    if isinstance(o, tuple):
        return tuple(map(_argprocess, o))
    if isinstance(o, list):
        return list(map(_argprocess, o))
    if isinstance(o, dict):
        return dict((_argprocess(k), _argprocess(v)) for k, v in o.items())
    return o
333
334
class Node(ABC):
    """
    Base class for every element of a KAT test tree.

    Constructing a Node links it into a tree. The node currently being
    constructed on this thread (``_local.current``) becomes its ``parent``,
    and the new node is appended to that parent's ``children``. While the
    node's optional ``init()`` hook runs, the node itself is
    ``_local.current``, so Nodes created inside ``init()`` become its children.
    """

    parent: "Node"
    children: List["Node"]
    name: str
    ambassador_id: str
    namespace: str
    # True on nodes that represent an Ambassador instance (checked by check_local()).
    is_ambassador = False
    # Outcome of the local (non-Envoy) check, e.g. {"result": "pass"}; None if there is none.
    local_result: Optional[Dict[str, str]] = None
    # When set, the reason this node is expected to fail.
    xfail: Optional[str]

    def __init__(
        self,
        *args,
        name: Optional[str] = None,
        namespace: Optional[str] = None,
        _clone: Optional["Node"] = None,
        **kwargs,
    ) -> None:
        """
        Build this node and, through its ``init()`` hook, its subtree.

        :param args: positional arguments forwarded (with Nodes cloned) to ``init()``.
        :param name: optional suffix. The name becomes ``<ClassName>-<name>``, or
            ``<source-name>-<name>`` when cloning, and is then passed through
            ``self.format()`` after ``init()`` has run.
        :param namespace: explicit namespace. Otherwise a namespace already set by
            the subclass, the parent's namespace, or ``"default"`` is used.
        :param _clone: internal; when given, reuse that node's constructor arguments.
        :param kwargs: keyword arguments forwarded (with Nodes cloned) to ``init()``.
        """
        # Setting skip_node to True (typically in init()) makes the runner
        # leave this node out.
        self.skip_node = False
        self.xfail = None

        if _clone:
            # Cloning: reuse the source node's constructor arguments, and derive
            # the name from the source node's already-formatted name.
            args = _clone._args  # type: ignore
            kwargs = _clone._kwargs  # type: ignore
            if name:
                name = "-".join((_clone.name, name))
            else:
                name = _clone.name
            self._args = _clone._args  # type: ignore
            self._kwargs = _clone._kwargs  # type: ignore
        else:
            self._args = args
            self._kwargs = kwargs
            if name:
                name = "-".join((self.__class__.__name__, name))
            else:
                name = self.__class__.__name__

        saved = _local.current
        self.parent = _local.current

        if namespace:
            self.namespace = namespace
        if not getattr(self, "namespace", ""):
            # We do the above `getattr` instead of just an `else` because subclasses might set a
            # default namespace; so `self.namespace` might already be set before calling __init__().
            if self.parent and self.parent.namespace:
                # We have no namespace assigned, but our parent does have a namespace
                # defined. Copy the namespace down from our parent.
                self.namespace = self.parent.namespace
            else:
                self.namespace = "default"

        # Become the "current" node while init() runs, so that Nodes created
        # there (including clones made by _argprocess) attach as our children.
        # The previous value is always restored.
        _local.current = self
        self.children = []
        if self.parent is not None:
            self.parent.children.append(self)
        try:
            init = getattr(self, "init", lambda *a, **kw: None)
            init(*_argprocess(args), **_argprocess(kwargs))
        finally:
            _local.current = saved

        # This has to come after the above to init(), because the format-string might reference
        # things that get set by init().
        self.name = self.format(name)

        # Children must have unique names.
        names = {}  # type: ignore
        for c in self.children:
            assert (
                c.name not in names
            ), "test %s of type %s has duplicate children: %s of type %s, %s" % (
                self.name,
                self.__class__.__name__,
                c.name,
                c.__class__.__name__,
                names[c.name].__class__.__name__,
            )
            names[c.name] = c

    def clone(self, name=None):
        """Construct a new node of the same class from this node's constructor arguments."""
        return self.__class__(_clone=self, name=name)

    def find_local_result(self, stop_at_first_ambassador: bool = False) -> Optional[Dict[str, str]]:
        """
        Return the nearest ``local_result`` found walking up from this node.

        :param stop_at_first_ambassador: if True, stop after examining the first
            Ambassador node on the way up, even if it has no result.
        :return: the first non-None ``local_result``, or None.
        """
        end_result: Optional[Dict[str, str]] = None
        n: Optional[Node] = self

        while n:
            end_result = getattr(n, "local_result", None)

            if end_result is not None:
                break

            if n.is_ambassador and stop_at_first_ambassador:
                # This is an Ambassador: don't continue past it.
                break

            n = n.parent

        return end_result

    def check_local(self, gold_root: str, k8s_yaml_path: str) -> Tuple[bool, bool]:
        """
        Try to decide this node's outcome without running Envoy.

        Records the outcome in ``self.local_result`` when one is determined.

        :param gold_root: directory holding per-test gold directories.
        :param k8s_yaml_path: Kubernetes YAML handed to ``mockery`` for local runs.
        :return: ``(False, False)`` if this is not an Ambassador that can be
            checked locally. ``(True, True)`` if a local result was recorded (or
            the node is xfail), so Envoy should not be run for it.
            ``(True, False)`` if there is no local result and Envoy may run.
        """
        testname = self.format("{self.path.k8s}")

        if self.xfail:
            # XFail early -- but still return True, True so that we don't try to run Envoy on it.
            self.local_result = {"result": "xfail", "reason": self.xfail}
            return (True, True)

        if not self.is_ambassador:
            return (False, False)

        if not self.ambassador_id:
            print(f"{testname} ({type(self)}) is an Ambassador but has no ambassador_id?")
            return (False, False)

        ambassador_namespace = getattr(self, "namespace", "default")
        ambassador_single_namespace = getattr(self, "single_namespace", False)

        no_local_mode: bool = getattr(self, "no_local_mode", False)
        skip_local_reason: Optional[str] = getattr(self, "skip_local_instead_of_xfail", None)

        gold_path = os.path.join(gold_root, testname)

        if os.path.isdir(gold_path) and not no_local_mode:
            # Yeah, I know, brutal hack.
            #
            # XXX (Flynn) This code isn't used and we don't know if it works. If you try
            # it, bring it up-to-date with the environment created in abstract_tests.py
            envstuff = ["env", f"AMBASSADOR_NAMESPACE={ambassador_namespace}"]

            cmd = [
                "mockery",
                "--debug",
                k8s_yaml_path,
                "-w",
                "python /ambassador/watch_hook.py",
                "--kat",
                self.ambassador_id,
                "--diff",
                gold_path,
            ]

            if ambassador_single_namespace:
                envstuff.append("AMBASSADOR_SINGLE_NAMESPACE=yes")
                cmd += ["-n", ambassador_namespace]

            if not getattr(self, "allow_edge_stack_redirect", False):
                envstuff.append("AMBASSADOR_NO_TLS_REDIRECT=yes")

            cmd = envstuff + cmd

            w = ShellCommand(*cmd)

            if w.status():
                print(f"==== GOOD: {testname} local against {gold_path}")
                self.local_result = {"result": "pass"}
            else:
                print(f"==== FAIL: {testname} local against {gold_path}")

                self.local_result = {"result": "fail", "stdout": w.stdout, "stderr": w.stderr}

            return (True, True)
        else:
            # If we have a local reason, has a parent already subsumed us?
            #
            # XXX The way KAT works, our parent will have always run earlier than us, so
            # it's not clear if we can ever not have been subsumed.

            if skip_local_reason:
                local_result = self.find_local_result()

                if local_result:
                    self.local_result = {
                        "result": "skip",
                        "reason": f"subsumed by {skip_local_reason} -- {local_result['result']}",
                    }
                    return (True, True)

            # OK, we weren't already subsumed. If we're in local mode, we'll skip or xfail
            # depending on skip_local_reason.

            if RUN_MODE == "local":
                if skip_local_reason:
                    # Always carry a "reason": the print below and
                    # Test.handle_local_result() both read it.
                    self.local_result = {
                        "result": "skip",
                        "reason": f"subsumed by {skip_local_reason} without result in local mode",
                    }
                    print(
                        f"==== {self.local_result['result'].upper()} {testname} {self.local_result['reason']}"
                    )
                    return (True, True)
                else:
                    # XFail -- but still return True, True so that we don't try to run Envoy on it.
                    self.local_result = {
                        "result": "xfail",
                        "reason": f"missing local cache {gold_path}",
                    }
                    return (True, True)

            # If here, we're not in local mode. Allow Envoy to run.
            self.local_result = None
            return (True, False)

    def has_local_result(self) -> bool:
        """True if this node has a non-empty ``local_result``."""
        return bool(self.local_result)

    @classmethod
    def variants(cls):
        """Yield the instances to test for this class; by default, one default instance."""
        yield cls()

    @property
    def path(self) -> Name:
        """This node's dotted name from the root (``root.child.grandchild``) plus its namespace."""
        if self.parent is None:
            return Name(self.name, self.namespace)
        else:
            return Name(self.parent.path.name + "." + self.name, self.namespace)

    @property
    def traversal(self):
        """Yield this node and all of its descendants in depth-first pre-order."""
        yield self
        for c in self.children:
            for d in c.traversal:
                yield d

    @property
    def ancestors(self):
        """Yield this node, then its parent, grandparent, and so on up to the root."""
        yield self
        if self.parent is not None:
            for a in self.parent.ancestors:
                yield a

    @property
    def depth(self):
        """Number of ancestors above this node (0 for a root)."""
        if self.parent is None:
            return 0
        else:
            return self.parent.depth + 1

    def format(self, st, **kwargs):
        """Format template ``st`` with integration_manifests.format(), exposing this node as ``self``."""
        return integration_manifests.format(st, self=self, **kwargs)

    # NOTE(review): lru_cache on a method keys on ``self`` and keeps up to 128
    # (node, pattern) entries -- and thus those Nodes -- alive for the life of the cache.
    @functools.lru_cache()
    def matches(self, pattern):
        """True if this node's dotted path, or any descendant's, matches the glob ``*pattern*``."""
        if fnmatch.fnmatch(self.path.name, "*%s*" % pattern):
            return True
        for c in self.children:
            if c.matches(pattern):
                return True
        return False

    def requirements(self):
        """Hook for subclasses: yield this node's requirements. The default yields nothing."""
        yield from ()

    # log_kube_artifacts writes various logs about our underlying Kubernetes objects to
    # a place where the artifact publisher can find them. See run-tests.sh.
    def log_kube_artifacts(self):
        """
        Dump this node's logs and Kubernetes events, at most once per node.

        Logs go to ``/tmp/kat-logs-<k8s-name>``. They come from ``docker logs``
        when AMBASSADOR_DEV is 1/yes/true, otherwise from ``kubectl logs``.
        Events go to ``/tmp/kat-events-<k8s-name>``.
        """
        if not getattr(self, "already_logged", False):
            self.already_logged = True

            print(f"logging kube artifacts for {self.path.k8s}")
            sys.stdout.flush()

            DEV = os.environ.get("AMBASSADOR_DEV", "0").lower() in ("1", "yes", "true")

            log_path = f"/tmp/kat-logs-{self.path.k8s}"

            if DEV:
                os.system(f"docker logs {self.path.k8s} >{log_path} 2>&1")
            else:
                os.system(
                    f"tools/bin/kubectl logs -n {self.namespace} {self.path.k8s} >{log_path} 2>&1"
                )

            event_path = f"/tmp/kat-events-{self.path.k8s}"

            fs1 = f"involvedObject.name={self.path.k8s}"
            fs2 = f"involvedObject.namespace={self.namespace}"

            cmd = f'tools/bin/kubectl get events -o json --field-selector "{fs1}" --field-selector "{fs2}"'
            os.system(f'echo ==== "{cmd}" >{event_path}')
            os.system(f"{cmd} >>{event_path} 2>&1")
639
640
class Test(Node):
    """
    A Node that issues queries and checks their results under pytest.

    Subclasses override the hook methods below (config, manifests, queries,
    check) to describe what to deploy and what to verify.
    """

    results: Sequence["Result"]

    # Tell pytest not to collect this class even though its name starts with "Test".
    __test__ = False

    def config(self):
        """Hook: yield configuration for this test. The default yields nothing."""
        yield from ()

    def manifests(self):
        """Hook: return Kubernetes manifests for this test. The default is None."""
        return None

    def queries(self):
        """Hook: yield the Query objects to run. The default yields nothing."""
        yield from ()

    def check(self):
        """Hook: extra assertions run after every query result has been checked."""
        pass

    def handle_local_result(self) -> bool:
        """
        Apply the nearest local (non-Envoy) result, if any, to the running pytest test.

        ``"skip"`` and ``"xfail"`` results call pytest.skip / pytest.xfail, and
        ``"fail"`` writes the captured output and calls pytest.fail (all of
        these raise). ``"pass"`` and any other result type fall through.

        :return: True if a local result was found (so query checks should not
            run), False if there is none.
        """
        end_result = self.find_local_result()

        if end_result is None:
            return False

        result_type = end_result["result"]

        if result_type == "skip":
            # Not every producer of a result records a reason; don't KeyError on it.
            pytest.skip(end_result.get("reason", ""))
        elif result_type == "fail":
            sys.stdout.write(end_result["stdout"])

            if os.environ.get("KAT_VERBOSE", None):
                sys.stderr.write(end_result["stderr"])

            pytest.fail("local check failed")
        elif result_type == "xfail":
            pytest.xfail(end_result.get("reason", ""))

        return True

    @property
    def ambassador_id(self):
        """The root node's Kubernetes-friendly name, shared by every node in the tree."""
        if self.parent is None:
            return self.path.k8s
        else:
            return self.parent.ambassador_id
693
694
@multi
def encode_body(obj):
    """
    Encode a query body as base64 text (used by Query.as_json for the "body" field).

    This is the dispatch function for the ``multi`` decorator. It yields the
    argument's type, which presumably selects one of the implementations
    registered below with ``encode_body.when(...)`` / ``encode_body.default``.
    NOTE(review): confirm whether ``multi`` matches exact types or subclasses.
    """
    yield type(obj)


@encode_body.when(bytes)  # type: ignore
def encode_body(b):
    """Base64-encode raw bytes and return the result as str (base64.encodebytes adds newlines)."""
    return base64.encodebytes(b).decode("utf-8")


@encode_body.when(str)  # type: ignore
def encode_body(s):
    """Encode text as UTF-8, then re-dispatch to the bytes implementation."""
    return encode_body(s.encode("utf-8"))


@encode_body.default  # type: ignore
def encode_body(obj):
    """Serialize any other value with json.dumps, then re-dispatch the resulting str."""
    return encode_body(json.dumps(obj))
713
714
class Query:
    """
    One request for the KAT client to issue, plus the expectations for its result.

    Queries are serialized with as_json() and run in bulk by run_queries().
    Each is later paired with a Result, which stores itself in ``self.result``.
    """

    def __init__(
        self,
        url,
        expected=None,
        method="GET",
        headers=None,
        messages=None,
        insecure=False,
        skip=None,
        xfail=None,
        phase=1,
        debug=False,
        sni=False,
        error=None,
        client_crt=None,
        client_key=None,
        client_cert_required=False,
        ca_cert=None,
        grpc_type=None,
        cookies=None,
        ignore_result=False,
        body=None,
        minTLSv="",
        maxTLSv="",
        cipherSuites=None,
        ecdhCurves=None,
    ):
        """
        :param url: URL to request.
        :param expected: expected status code. Defaults to 101 for ``ws:`` URLs
            and 200 otherwise.
        :param method: HTTP method (default ``"GET"``).
        :param error: substring, or list of substrings, expected in the
            client-reported error. When set, the status code is not checked.
        :param skip: if set, the result check is skipped with this reason.
        :param xfail: if set, the result check is marked xfail with this reason.
        :param ignore_result: if True, neither error nor status is checked.
        :param body: request body; encoded with encode_body() in as_json().
        :param cipherSuites: cipher suites to offer; defaults to a new empty list.
        :param ecdhCurves: ECDH curves to offer; defaults to a new empty list.
        :param grpc_type: one of None, ``"real"``, ``"bridge"``, ``"web"``.
        The remaining parameters are stored as attributes of the same meaning.
        """
        self.method = method
        self.url = url
        self.headers = headers
        self.body = body
        self.cookies = cookies
        self.messages = messages
        self.insecure = insecure
        self.minTLSv = minTLSv
        self.maxTLSv = maxTLSv
        # Fresh lists per instance: a shared mutable default would let one
        # Query's mutations leak into every other Query.
        self.cipherSuites = [] if cipherSuites is None else cipherSuites
        self.ecdhCurves = [] if ecdhCurves is None else ecdhCurves
        if expected is None:
            if url.lower().startswith("ws:"):
                # A websocket upgrade answers 101 Switching Protocols.
                self.expected = 101
            else:
                self.expected = 200
        else:
            self.expected = expected
        self.skip = skip
        self.xfail = xfail
        self.ignore_result = ignore_result
        self.phase = phase
        # Set by the owning test before as_json() is called.
        self.parent = None
        self.result = None
        self.debug = debug
        self.sni = sni
        self.error = error
        self.client_cert_required = client_cert_required
        self.client_cert = client_crt
        self.client_key = client_key
        self.ca_cert = ca_cert
        assert grpc_type in (None, "real", "bridge", "web"), grpc_type
        self.grpc_type = grpc_type

    def as_json(self):
        """
        Serialize this query into the dict sent to the KAT client.

        Requires ``self.parent`` to be set. ``id`` is ``id(self)``, which
        run_queries() uses to match responses back to queries. Optional fields
        are only included when set.
        """
        result = {
            "test": self.parent.path.name,
            "id": id(self),
            "url": self.url,
            "insecure": self.insecure,
        }
        if self.sni:
            result["sni"] = self.sni
        if self.method:
            result["method"] = self.method
            # The TLS version bounds have always been emitted alongside the
            # method, even when empty, rather than only when set. Keep that, so
            # the client continues to receive both keys.
            result["maxTLSv"] = self.maxTLSv
            result["minTLSv"] = self.minTLSv
        if self.cipherSuites:
            result["cipherSuites"] = self.cipherSuites
        if self.ecdhCurves:
            result["ecdhCurves"] = self.ecdhCurves
        if self.headers:
            result["headers"] = self.headers
        if self.body is not None:
            result["body"] = encode_body(self.body)
        if self.cookies:
            result["cookies"] = self.cookies
        if self.messages is not None:
            result["messages"] = self.messages
        if self.client_cert is not None:
            result["client_cert"] = self.client_cert
        if self.client_key is not None:
            result["client_key"] = self.client_key
        if self.ca_cert is not None:
            result["ca_cert"] = self.ca_cert
        if self.client_cert_required:
            result["client_cert_required"] = self.client_cert_required
        if self.grpc_type:
            result["grpc_type"] = self.grpc_type

        return result
816
817
class Result:
    """The KAT client's response to one Query, plus the logic that checks it."""

    def __init__(self, query, res):
        """
        :param query: the Query this result answers; its ``result`` is set to this object.
        :param res: the client's result dict (``status``, ``headers``, ``messages``,
            ``tls``, base64 ``body``, ``text``, ``json``, ``error`` -- all optional).
        """
        self.query = query
        query.result = self
        self.parent = query.parent

        self.status = res.get("status")
        self.headers = res.get("headers")
        self.messages = res.get("messages")
        self.tls = res.get("tls")
        self.body = (
            base64.decodebytes(bytes(res["body"], "ASCII")) if "body" in res else None
        )
        self.text = res.get("text")
        self.json = res.get("json")
        # A JSON payload is interpreted as the backend's report of what it saw.
        self.backend = BackendResult(self.json) if self.json else None
        self.error = res.get("error")

    def __repr__(self):
        return str(self.as_dict())

    def check(self):
        """
        Assert that this result meets its query's expectations.

        A query's ``skip``/``xfail`` reason is applied first. Unless the query
        has ``ignore_result`` set, the error is checked when the query has an
        ``error`` expectation (one of the substrings must appear); otherwise the
        status code must equal ``expected``. Kube artifacts are logged on a
        status mismatch.
        """
        q = self.query

        if q.skip:
            pytest.skip(q.skip)

        if q.xfail:
            pytest.xfail(q.xfail)

        if q.ignore_result:
            return

        if q.error is not None:
            expected_errors = [q.error] if isinstance(q.error, str) else q.error
            found = self.error is not None and any(
                candidate in self.error for candidate in expected_errors
            )

            assert found, "{}: expected error to contain any of {}; got {} instead".format(
                q.url,
                ", ".join(["'%s'" % x for x in expected_errors]),
                ("'%s'" % self.error) if self.error else "no error",
            )
            return

        if q.expected != self.status:
            self.parent.log_kube_artifacts()

        assert (
            q.expected == self.status
        ), "%s: expected status code %s, got %s instead with error %s" % (
            q.url,
            q.expected,
            self.status,
            self.error,
        )

    def as_dict(self) -> Dict[str, Any]:
        """
        Summarize this result as a plain dict.

        Includes ``backend`` when the backend identified itself by name;
        otherwise includes the raw ``json`` and ``text`` fields.
        """
        summary: Dict[str, Any] = {
            "query": self.query.as_json(),
            "status": self.status,
            "error": self.error,
            "headers": self.headers,
        }

        has_named_backend = bool(self.backend and self.backend.name)
        if has_named_backend:
            summary["backend"] = self.backend.as_dict()
        else:
            summary.update(json=self.json, text=self.text)

        return summary
931
932
class BackendURL:
    """The request URL as the backend reported it, broken into its components."""

    def __init__(
        self,
        fragment=None,
        host=None,
        opaque=None,
        path=None,
        query=None,
        rawQuery=None,
        scheme=None,
        username=None,
        password=None,
    ):
        self.scheme = scheme
        self.username = username
        self.password = password
        self.host = host
        self.path = path
        self.rawQuery = rawQuery
        self.query = query
        self.fragment = fragment
        self.opaque = opaque

    def as_dict(self) -> Dict[str, Any]:
        """Return every URL component keyed by attribute name, in a fixed order."""
        field_order = (
            "fragment",
            "host",
            "opaque",
            "path",
            "query",
            "rawQuery",
            "scheme",
            "username",
            "password",
        )
        return {field: getattr(self, field) for field in field_order}
968
969
class BackendRequest:
    """The request as the backend saw it, parsed from the backend's JSON report."""

    def __init__(self, req):
        """
        :param req: dict with optional ``url`` (BackendURL keyword fields),
            ``headers``, ``host`` and ``tls`` entries.
        """
        # A missing or null "url" yields an empty BackendURL instead of a
        # TypeError from unpacking None.
        self.url = BackendURL(**(req.get("url") or {}))
        self.headers = req.get("headers", {})
        self.host = req.get("host", None)
        # Treat a null "tls" the same as a missing one.
        self.tls = BackendTLS(req.get("tls") or {})

    def as_dict(self) -> Dict[str, Any]:
        """Return headers, host, and (when present) the URL and TLS details as plain dicts."""
        od = {
            "headers": self.headers,
            "host": self.host,
        }

        if self.url:
            od["url"] = self.url.as_dict()

        if self.tls:
            od["tls"] = self.tls.as_dict()

        return od
990
991
class BackendTLS:
    """TLS details of the request as reported by the backend."""

    def __init__(self, tls):
        """
        :param tls: dict with optional ``enabled``, ``server-name``, ``version``,
            ``negotiated-protocol`` and ``negotiated-protocol-version`` entries.
            BackendRequest passes ``{}`` when the report has no TLS section, so a
            missing ``enabled`` means False rather than a KeyError.
        """
        self.enabled = tls.get("enabled", False)
        self.server_name = tls.get("server-name")
        self.version = tls.get("version")
        self.negotiated_protocol = tls.get("negotiated-protocol")
        self.negotiated_protocol_version = tls.get("negotiated-protocol-version")

    def as_dict(self) -> Dict[str, Any]:
        """Return the TLS details with snake_case keys."""
        return {
            "enabled": self.enabled,
            "server_name": self.server_name,
            "version": self.version,
            "negotiated_protocol": self.negotiated_protocol,
            "negotiated_protocol_version": self.negotiated_protocol_version,
        }
1008
1009
class BackendResponse:
    """The response headers the backend reported sending."""

    def __init__(self, resp):
        # No "headers" entry means no headers.
        self.headers = resp.get("headers", {})

    def as_dict(self) -> Dict[str, Any]:
        """Return ``{"headers": ...}``."""
        return dict(headers=self.headers)
1016
1017
def dictify(obj):
    """Return ``obj.as_dict()`` if ``obj`` has a truthy ``as_dict`` attribute, else ``obj`` unchanged."""
    to_dict = getattr(obj, "as_dict", None)
    return to_dict() if to_dict else obj
1023
1024
class BackendResult:
    """
    The backend's JSON report about a request it served.

    A dict report is parsed into ``name`` (its ``backend`` entry) plus
    optional BackendRequest / BackendResponse objects. Any other payload is
    kept verbatim as ``response``, with the name ``"raw"``.
    """

    def __init__(self, bres):
        if isinstance(bres, dict):
            self.name = bres.get("backend")
            self.request = BackendRequest(bres["request"]) if "request" in bres else None
            self.response = BackendResponse(bres["response"]) if "response" in bres else None
        else:
            self.name = "raw"
            self.request = None
            self.response = bres

    def as_dict(self) -> Dict[str, Any]:
        """Return the name plus any truthy request/response, converted with dictify()."""
        od: Dict[str, Any] = {"name": self.name}

        for key, value in (("request", self.request), ("response", self.response)):
            if value:
                od[key] = dictify(value)

        return od
1046
1047
def label(yaml, scope):
    """
    Add a ``scope`` label to every Kubernetes object in ``yaml``, in place.

    :param yaml: iterable of objects, each with a ``metadata`` mapping.
    :param scope: value for the ``scope`` label.
    :return: the same ``yaml`` object that was passed in.
    """
    for resource in yaml:
        metadata = resource["metadata"]
        if "labels" not in metadata:
            metadata["labels"] = {}
        metadata["labels"]["scope"] = scope
    return yaml
1057
1058
# Name of the KAT client binary; only referenced by the legacy command kept below.
CLIENT_GO = "kat_client"


def run_queries(name: str, queries: Sequence[Query]) -> Sequence[Result]:
    """
    Execute ``queries`` with the KAT client and wrap each response in a Result.

    The queries are serialized to ``/tmp/kat-client-<name>-urls.json`` and piped
    into ``/work/kat_client`` in the ``kat`` pod (namespace ``default``) via
    ``kubectl exec``. Its stdout goes to ``/tmp/kat-client-<name>-results.json``
    and its stderr to ``/tmp/kat-client-<name>.log``.

    :param name: tag used to build the temporary file names.
    :param queries: the queries to run.
    :return: one Result per entry in the client's output. If the command fails,
        or its output is not valid JSON, every query instead gets a Result whose
        ``error`` describes that failure.
    """
    jsonified = []
    byid = {}

    for q in queries:
        jsonified.append(q.as_json())
        byid[id(q)] = q

    path_urls = f"/tmp/kat-client-{name}-urls.json"
    path_results = f"/tmp/kat-client-{name}-results.json"
    path_log = f"/tmp/kat-client-{name}.log"

    with open(path_urls, "w") as f:
        json.dump(jsonified, f)

    # run(f"{CLIENT_GO} -input {path_urls} -output {path_results} 2> {path_log}")
    res = ShellCommand.run(
        "Running queries",
        f"tools/bin/kubectl exec -n default -i kat /work/kat_client < '{path_urls}' > '{path_results}' 2> '{path_log}'",
        shell=True,
    )

    if not res:
        return [Result(q, {"error": "Command execution error"}) for q in queries]

    # Read the client's JSON as UTF-8 instead of depending on the locale's default encoding.
    with open(path_results, "r", encoding="utf-8") as f:
        content = f.read()

    try:
        json_results = json.loads(content)
    except ValueError:
        # json.JSONDecodeError subclasses ValueError. Only parse failures are
        # reported this way; other exceptions are real bugs and should surface.
        return [
            Result(q, {"error": "Could not parse JSON content after running KAT queries"})
            for q in queries
        ]

    results = []

    for r in json_results:
        res = r["result"]
        # Each response echoes the id() that Query.as_json() sent, so map it back.
        q = byid[r["id"]]
        results.append(Result(q, res))

    return results
1107
1108
# yuck
# Module-level switch: when True, Runner.setup() skips the blank line it
# otherwise prints before setting up. Presumably set for doctest-style runs of
# this module -- TODO confirm where it is flipped.
DOCTEST = False
1111
1112
class Superpod:
    """
    One shared backend workload that serves many test services.

    Every allocated service gets one clear-text port (counting up from 8080)
    and one TLS port (counting up from 8443). get_manifest_list() renders the
    workload manifest exposing all allocated ports, and passes the
    port -> service mapping to the container as ``BACKEND_<port>``
    environment variables.
    """

    def __init__(self, namespace: str) -> None:
        self.namespace = namespace
        # Next ports to hand out; both advance by one per allocation.
        # NOTE(review): after 363 allocations the clear-text range reaches 8443
        # and overlaps the TLS range -- confirm that limit is never reached.
        self.next_clear = 8080
        self.next_tls = 8443
        # port -> name of the service that owns it
        self.service_names: Dict[int, str] = {}
        self.name = "superpod-%s" % (self.namespace or "default")

    def allocate(self, service_name) -> List[int]:
        """Reserve the next clear-text and TLS ports for ``service_name``; return ``[clear, tls]``."""
        clear_port, tls_port = self.next_clear, self.next_tls

        for port in (clear_port, tls_port):
            self.service_names[port] = service_name

        self.next_clear = clear_port + 1
        self.next_tls = tls_port + 1

        return [clear_port, tls_port]

    def get_manifest_list(self) -> List[Dict[str, Any]]:
        """
        Render the superpod manifest with every allocated port wired in.

        :return: the (single-object) manifest list, named after this superpod,
            with its selector/template labelled ``backend: <name>`` and, when
            this superpod has a namespace, that namespace filled in if the
            manifest did not already set one.
        """
        rendered = integration_manifests.format(integration_manifests.load("superpod_pod"))
        manifest = load("superpod", rendered, Tag.MAPPING)

        assert len(manifest) == 1, "SUPERPOD manifest must have exactly one object"

        resource = manifest[0]
        template = resource["spec"]["template"]
        container = template["spec"]["containers"][0]

        envs: List[Dict[str, Union[str, int]]] = container["env"]
        ports: List[Dict[str, int]] = []

        # Sorted so the rendered manifest is stable across runs.
        for port in sorted(self.service_names):
            ports.append({"containerPort": port})
            envs.append({"name": f"BACKEND_{port}", "value": self.service_names[port]})

        container["ports"] = ports

        if "metadata" not in resource:
            resource["metadata"] = {}

        metadata = resource["metadata"]
        metadata["name"] = self.name

        resource["spec"]["selector"]["matchLabels"]["backend"] = self.name
        template["metadata"]["labels"]["backend"] = self.name

        if self.namespace and "namespace" not in metadata:
            # Fix up the namespace, but never override one the manifest sets.
            metadata["namespace"] = self.namespace

        return list(manifest)
1168
1169
1170class Runner:
1171 def __init__(self, *classes, scope=None):
1172 self.scope = scope or "-".join(c.__name__ for c in classes)
1173 self.roots = tuple(v for c in classes for v in variants(c))
1174 self.nodes = [n for r in self.roots for n in r.traversal if not n.skip_node]
1175 self.tests = [n for n in self.nodes if isinstance(n, Test)]
1176 self.ids = [t.path.name for t in self.tests]
1177 self.done = False
1178 self.skip_nonlocal_tests = False
1179 self.ids_to_strip: Dict[str, bool] = {}
1180 self.names_to_ignore: Dict[str, bool] = {}
1181
1182 @pytest.mark.parametrize("t", self.tests, ids=self.ids)
1183 def test(request, capsys, t):
1184 if t.xfail:
1185 pytest.xfail(t.xfail)
1186 else:
1187 selected = set(
1188 item.callspec.getparam("t")
1189 for item in request.session.items
1190 if item.function == test
1191 )
1192
1193 with capsys.disabled():
1194 self.setup(selected)
1195
1196 if not t.handle_local_result():
1197 # XXX: should aggregate the result of url checks
1198 i = 0
1199 for r in t.results:
1200 try:
1201 r.check()
1202 except AssertionError as e:
1203 # Add some context so that you can tell which query is failing.
1204 e.args = (f"query[{i}]: {e.args[0]}", *e.args[1:])
1205 raise
1206 i += 1
1207
1208 t.check()
1209
1210 self.__func__ = test
1211 self.__test__ = True
1212
1213 def __call__(self):
1214 assert False, "this is here for py.test discovery purposes only"
1215
1216 def setup(self, selected):
1217 if not self.done:
1218 if not DOCTEST:
1219 print()
1220
1221 expanded_up = set(selected)
1222
1223 for s in selected:
1224 for n in s.ancestors:
1225 if not n.xfail:
1226 expanded_up.add(n)
1227
1228 expanded = set(expanded_up)
1229
1230 for s in selected:
1231 for n in s.traversal:
1232 if not n.xfail:
1233 expanded.add(n)
1234
1235 try:
1236 self._setup_k8s(expanded)
1237
1238 if self.skip_nonlocal_tests:
1239 self.done = True
1240 return
1241
1242 for t in self.tests:
1243 if t.has_local_result():
1244 # print(f"{t.name}: SKIP due to local result")
1245 continue
1246
1247 if t in expanded_up:
1248 pre_query: Callable = getattr(t, "pre_query", None)
1249
1250 if pre_query:
1251 pre_query()
1252
1253 self._query(expanded_up)
1254 except:
1255 traceback.print_exc()
1256 pytest.exit("setup failed")
1257 finally:
1258 self.done = True
1259
1260 def get_manifests_and_namespaces(self, selected) -> Tuple[Any, List[str]]:
1261 manifests: OrderedDict[Any, list] = OrderedDict() # type: ignore
1262 superpods: Dict[str, Superpod] = {}
1263 namespaces = []
1264 for n in (n for n in self.nodes if n in selected and not n.xfail):
1265 manifest = None
1266 nsp = None
1267 ambassador_id = None
1268
1269 # print('manifesting for {n.path}')
1270
1271 # Walk up the parent chain to find our namespace and ambassador_id.
1272 cur = n
1273
1274 while cur:
1275 if not nsp:
1276 nsp = getattr(cur, "namespace", None)
1277 # print(f'... {cur.name} has namespace {nsp}')
1278
1279 if not ambassador_id:
1280 ambassador_id = getattr(cur, "ambassador_id", None)
1281 # print(f'... {cur.name} has ambassador_id {ambassador_id}')
1282
1283 if nsp and ambassador_id:
1284 # print(f'... good for namespace and ambassador_id')
1285 break
1286
1287 cur = cur.parent
1288
1289 # OK. Does this node want to use a superpod?
1290 if getattr(n, "use_superpod", False):
1291 # Yup. OK. Do we already have a superpod for this namespace?
1292 superpod = superpods.get(nsp, None) # type: ignore
1293
1294 if not superpod:
1295 # We don't have one, so we need to create one.
1296 superpod = Superpod(nsp) # type: ignore
1297 superpods[nsp] = superpod # type: ignore
1298
1299 # print(f'superpodifying {n.name}')
1300
1301 # Next up: use the backend_service.yaml manifest as a template...
1302 yaml = n.format(integration_manifests.load("backend_service"))
1303 manifest = load(n.path, yaml, Tag.MAPPING)
1304
1305 assert (
1306 len(manifest) == 1
1307 ), "backend_service.yaml manifest must have exactly one object"
1308
1309 m = manifest[0]
1310
1311 # Update the manifest's selector...
1312 m["spec"]["selector"]["backend"] = superpod.name
1313
1314 # ...and labels if needed...
1315 if ambassador_id:
1316 m["metadata"]["labels"] = {"kat-ambassador-id": ambassador_id}
1317
1318 # ...and target ports.
1319 superpod_ports = superpod.allocate(n.path.k8s)
1320
1321 m["spec"]["ports"][0]["targetPort"] = superpod_ports[0]
1322 m["spec"]["ports"][1]["targetPort"] = superpod_ports[1]
1323 else:
1324 # The non-superpod case...
1325 yaml = n.manifests()
1326
1327 if yaml is not None:
1328 is_plain_test = n.path.k8s.startswith("plain-")
1329
1330 if n.is_ambassador and not is_plain_test:
1331 add_default_http_listener = getattr(n, "add_default_http_listener", True)
1332 add_default_https_listener = getattr(n, "add_default_https_listener", True)
1333 add_cleartext_host = getattr(n, "edge_stack_cleartext_host", False)
1334
1335 if add_default_http_listener:
1336 # print(f"{n.path.k8s} adding default HTTP Listener")
1337 yaml += default_listener_manifest % {
1338 "namespace": nsp,
1339 "port": 8080,
1340 "protocol": "HTTPS",
1341 "securityModel": "XFP",
1342 }
1343
1344 if add_default_https_listener:
1345 # print(f"{n.path.k8s} adding default HTTPS Listener")
1346 yaml += default_listener_manifest % {
1347 "namespace": nsp,
1348 "port": 8443,
1349 "protocol": "HTTPS",
1350 "securityModel": "XFP",
1351 }
1352
1353 if EDGE_STACK and add_cleartext_host:
1354 # print(f"{n.path.k8s} adding Host")
1355
1356 host_yaml = cleartext_host_manifest % nsp
1357 yaml += host_yaml
1358
1359 yaml = n.format(yaml)
1360
1361 try:
1362 manifest = load(n.path, yaml, Tag.MAPPING)
1363 except Exception as e:
1364 print(f"parse failure! {e}")
1365 print(yaml)
1366
1367 if manifest:
1368 # print(manifest)
1369
1370 # Make sure namespaces and labels are properly set.
1371 for m in manifest:
1372 if "metadata" not in m:
1373 m["metadata"] = {}
1374
1375 metadata = m["metadata"]
1376
1377 if "labels" not in metadata:
1378 metadata["labels"] = {}
1379
1380 if ambassador_id:
1381 metadata["labels"]["kat-ambassador-id"] = ambassador_id
1382
1383 if nsp:
1384 if "namespace" not in metadata:
1385 metadata["namespace"] = nsp
1386
1387 # ...and, finally, save the manifest list.
1388 manifests[n] = list(manifest)
1389 if str(nsp) not in namespaces:
1390 namespaces.append(str(nsp))
1391
1392 for superpod in superpods.values():
1393 manifests[superpod] = superpod.get_manifest_list()
1394
1395 return manifests, namespaces
1396
1397 def do_local_checks(self, selected, fname) -> bool:
1398 if RUN_MODE == "envoy":
1399 print("Local mode not allowed, continuing to Envoy mode")
1400 return False
1401
1402 all_valid = True
1403 self.ids_to_strip = {}
1404 # This feels a bit wrong?
1405 self.names_to_ignore = {}
1406
1407 for n in (n for n in self.nodes if n in selected):
1408 local_possible, local_checked = n.check_local(GOLD_ROOT, fname)
1409
1410 if local_possible:
1411 if local_checked:
1412 self.ids_to_strip[n.ambassador_id] = True
1413 else:
1414 all_valid = False
1415
1416 return all_valid
1417
1418 def _setup_k8s(self, selected):
1419 # First up, get the full manifest and save it to disk.
1420 manifests, namespaces = self.get_manifests_and_namespaces(selected)
1421
1422 configs = OrderedDict()
1423 for n in (n for n in self.nodes if n in selected and not n.xfail):
1424 configs[n] = []
1425 for cfg in n.config():
1426 if isinstance(cfg, str):
1427 parent_config = configs[n.parent][0][1][0]
1428
1429 try:
1430 for o in load(n.path, cfg, Tag.MAPPING):
1431 parent_config.merge(o)
1432 except (YAMLScanError, YAMLParseError) as e:
1433 raise Exception("Parse Error: %s, input text:\n%s" % (e, cfg))
1434 else:
1435 target = cfg[0]
1436
1437 try:
1438 yaml = load(n.path, cfg[1], Tag.MAPPING)
1439
1440 if n.ambassador_id:
1441 for obj in yaml:
1442 if "ambassador_id" not in obj:
1443 obj["ambassador_id"] = [n.ambassador_id]
1444
1445 configs[n].append((target, yaml))
1446 except (YAMLScanError, YAMLParseError) as e:
1447 raise Exception("Parse Error: %s, input text:\n%s" % (e, cfg[1]))
1448
1449 for tgt_cfgs in configs.values():
1450 for target, cfg in tgt_cfgs:
1451 for t in target.traversal:
1452 if t in manifests:
1453 k8s_yaml = manifests[t]
1454 for item in k8s_yaml:
1455 if item["kind"].lower() == "service":
1456 md = item["metadata"]
1457 if "annotations" not in md:
1458 md["annotations"] = {}
1459
1460 anns = md["annotations"]
1461
1462 if "getambassador.io/config" in anns:
1463 anns["getambassador.io/config"] += "\n" + dump(cfg)
1464 else:
1465 anns["getambassador.io/config"] = dump(cfg)
1466
1467 break
1468 else:
1469 continue
1470 break
1471 else:
1472 assert False, "no service found for target: %s" % target.path.name
1473
1474 yaml = ""
1475
1476 for v in manifests.values():
1477 yaml += dump(label(v, self.scope)) + "\n"
1478
1479 fname = "/tmp/k8s-%s.yaml" % self.scope
1480
1481 self.applied_manifests = False
1482
1483 # Always apply at this point, since we're doing the multi-run thing.
1484 manifest_changed, manifest_reason = has_changed(yaml, fname)
1485
1486 # OK. Try running local stuff.
1487 if self.do_local_checks(selected, fname):
1488 # Everything that could run locally did. Good enough.
1489 self.skip_nonlocal_tests = True
1490 return True
1491
1492 # Something didn't work out quite right.
1493 print(f"Continuing with Kube tests...")
1494 # print(f"ids_to_strip {self.ids_to_strip}")
1495
1496 # XXX It is _so stupid_ that we're reparsing the whole manifest here.
1497 xxx_crap = pyyaml.load_all(open(fname, "r").read(), Loader=pyyaml_loader)
1498
1499 # Strip things we don't need from the manifest.
1500 trimmed_manifests = []
1501 trimmed = 0
1502 kept = 0
1503
1504 for obj in xxx_crap:
1505 keep = True
1506
1507 kind = "-nokind-"
1508 name = "-noname-"
1509 metadata: Dict[str, Any] = {}
1510 labels: Dict[str, str] = {}
1511 id_to_check: Optional[str] = None
1512
1513 if "kind" in obj:
1514 kind = obj["kind"]
1515
1516 if "metadata" in obj:
1517 metadata = obj["metadata"]
1518
1519 if "name" in metadata:
1520 name = metadata["name"]
1521
1522 if "labels" in metadata:
1523 labels = metadata["labels"]
1524
1525 if "kat-ambassador-id" in labels:
1526 id_to_check = labels["kat-ambassador-id"]
1527
1528 # print(f"metadata {metadata} id_to_check {id_to_check} obj {obj}")
1529
1530 # Keep namespaces, just in case.
1531 if kind == "Namespace":
1532 keep = True
1533 else:
1534 if id_to_check and (id_to_check in self.ids_to_strip):
1535 keep = False
1536 # print(f"...drop {kind} {name} (ID {id_to_check})")
1537 self.names_to_ignore[name] = True
1538
1539 if keep:
1540 kept += 1
1541 trimmed_manifests.append(obj)
1542 else:
1543 trimmed += 1
1544
1545 if trimmed:
1546 print(f"After trimming: kept {kept}, trimmed {trimmed}")
1547
1548 yaml = pyyaml.dump_all(trimmed_manifests, Dumper=pyyaml_dumper)
1549
1550 fname = "/tmp/k8s-%s-trimmed.yaml" % self.scope
1551
1552 self.applied_manifests = False
1553
1554 # Always apply at this point, since we're doing the multi-run thing.
1555 manifest_changed, manifest_reason = has_changed(yaml, fname)
1556
1557 # First up: CRDs.
1558 input_crds = integration_manifests.crd_manifests()
1559 if is_knative_compatible():
1560 input_crds += integration_manifests.load("knative_serving_crds")
1561
1562 # Strip out all of the schema validation, so that we can test with broken CRDs.
1563 # (KAT isn't really in the business of testing to be sure that Kubernetes can
1564 # run the K8s validators...)
1565 crds = pyyaml.load_all(input_crds, Loader=pyyaml_loader)
1566
1567 # Collect the CRDs with schema validation stripped in stripped_crds, because
1568 # pyyaml.load_all actually returns something more complex than a simple list,
1569 # so it doesn't reserialize well after being modified.
1570 stripped_crds = []
1571
1572 for crd in crds:
1573 # Guard against empty CRDs (the KNative files have some blank lines at
1574 # the end).
1575 if not crd:
1576 continue
1577
1578 if crd["apiVersion"] == "apiextensions.k8s.io/v1":
1579 # We can't naively strip the schema validation from apiextensions.k8s.io/v1 CRDs
1580 # because it is required; otherwise the API server would refuse to create the CRD,
1581 # telling us:
1582 #
1583 # CustomResourceDefinition.apiextensions.k8s.io "…" is invalid: spec.versions[0].schema.openAPIV3Schema: Required value: schemas are required
1584 #
1585 # So instead we must replace it with a schema that allows anything.
1586 for version in crd["spec"]["versions"]:
1587 if "schema" in version:
1588 version["schema"] = {
1589 "openAPIV3Schema": {
1590 "type": "object",
1591 "properties": {
1592 "apiVersion": {"type": "string"},
1593 "kind": {"type": "string"},
1594 "metadata": {"type": "object"},
1595 "spec": {
1596 "type": "object",
1597 "x-kubernetes-preserve-unknown-fields": True,
1598 },
1599 },
1600 },
1601 }
1602 elif crd["apiVersion"] == "apiextensions.k8s.io/v1beta1":
1603 crd["spec"].pop("validation", None)
1604 for version in crd["spec"]["versions"]:
1605 version.pop("schema", None)
1606 stripped_crds.append(crd)
1607
1608 final_crds = pyyaml.dump_all(stripped_crds, Dumper=pyyaml_dumper)
1609 changed, reason = has_changed(final_crds, "/tmp/k8s-CRDs.yaml")
1610
1611 if changed:
1612 print(f"CRDS changed ({reason}), applying.")
1613 if not ShellCommand.run_with_retry(
1614 "Apply CRDs",
1615 "tools/bin/kubectl",
1616 "apply",
1617 "-f",
1618 "/tmp/k8s-CRDs.yaml",
1619 retries=5,
1620 sleep_seconds=10,
1621 ):
1622 raise RuntimeError("Failed applying CRDs")
1623
1624 tries_left = 10
1625
1626 while (
1627 os.system("tools/bin/kubectl get crd mappings.getambassador.io > /dev/null 2>&1")
1628 != 0
1629 ):
1630 tries_left -= 1
1631
1632 if tries_left <= 0:
1633 raise RuntimeError("CRDs never became available")
1634
1635 print("sleeping for CRDs... (%d)" % tries_left)
1636 time.sleep(5)
1637 else:
1638 print(f"CRDS unchanged {reason}, skipping apply.")
1639
1640 # Next up: the KAT pod.
1641 kat_client_manifests = integration_manifests.load("kat_client_pod")
1642 if os.environ.get("DEV_USE_IMAGEPULLSECRET", False):
1643 kat_client_manifests = (
1644 integration_manifests.namespace_manifest("default") + kat_client_manifests
1645 )
1646 changed, reason = has_changed(
1647 integration_manifests.format(kat_client_manifests), "/tmp/k8s-kat-pod.yaml"
1648 )
1649
1650 if changed:
1651 print(f"KAT pod definition changed ({reason}), applying")
1652 if not ShellCommand.run_with_retry(
1653 "Apply KAT pod",
1654 "tools/bin/kubectl",
1655 "apply",
1656 "-f",
1657 "/tmp/k8s-kat-pod.yaml",
1658 "-n",
1659 "default",
1660 retries=5,
1661 sleep_seconds=10,
1662 ):
1663 raise RuntimeError("Could not apply manifest for KAT pod")
1664
1665 tries_left = 3
1666 time.sleep(1)
1667
1668 while True:
1669 if ShellCommand.run(
1670 "wait for KAT pod",
1671 "tools/bin/kubectl",
1672 "-n",
1673 "default",
1674 "wait",
1675 "--timeout=30s",
1676 "--for=condition=Ready",
1677 "pod",
1678 "kat",
1679 ):
1680 print("KAT pod ready")
1681 break
1682
1683 tries_left -= 1
1684
1685 if tries_left <= 0:
1686 raise RuntimeError("KAT pod never became available")
1687
1688 print("sleeping for KAT pod... (%d)" % tries_left)
1689 time.sleep(5)
1690 else:
1691 print(f"KAT pod definition unchanged {reason}, skipping apply.")
1692
1693 # Use a dummy pod to get around the !*@&#$!*@&# DockerHub rate limit.
1694 # XXX Better: switch to GCR.
1695 dummy_pod = integration_manifests.load("dummy_pod")
1696 if os.environ.get("DEV_USE_IMAGEPULLSECRET", False):
1697 dummy_pod = integration_manifests.namespace_manifest("default") + dummy_pod
1698 changed, reason = has_changed(
1699 integration_manifests.format(dummy_pod), "/tmp/k8s-dummy-pod.yaml"
1700 )
1701
1702 if changed:
1703 print(f"Dummy pod definition changed ({reason}), applying")
1704 if not ShellCommand.run_with_retry(
1705 "Apply dummy pod",
1706 "tools/bin/kubectl",
1707 "apply",
1708 "-f",
1709 "/tmp/k8s-dummy-pod.yaml",
1710 "-n",
1711 "default",
1712 retries=5,
1713 sleep_seconds=10,
1714 ):
1715 raise RuntimeError("Could not apply manifest for dummy pod")
1716
1717 tries_left = 3
1718 time.sleep(1)
1719
1720 while True:
1721 if ShellCommand.run(
1722 "wait for dummy pod",
1723 "tools/bin/kubectl",
1724 "-n",
1725 "default",
1726 "wait",
1727 "--timeout=30s",
1728 "--for=condition=Ready",
1729 "pod",
1730 "dummy-pod",
1731 ):
1732 print("Dummy pod ready")
1733 break
1734
1735 tries_left -= 1
1736
1737 if tries_left <= 0:
1738 raise RuntimeError("Dummy pod never became available")
1739
1740 print("sleeping for dummy pod... (%d)" % tries_left)
1741 time.sleep(5)
1742 else:
1743 print(f"Dummy pod definition unchanged {reason}, skipping apply.")
1744
1745 # # Clear out old stuff.
1746 if os.environ.get("DEV_CLEAN_K8S_RESOURCES", False):
1747 print("Clearing cluster...")
1748 ShellCommand.run(
1749 "clear old Kubernetes namespaces",
1750 "tools/bin/kubectl",
1751 "delete",
1752 "namespaces",
1753 "-l",
1754 "scope=AmbassadorTest",
1755 verbose=True,
1756 )
1757 ShellCommand.run(
1758 "clear old Kubernetes pods etc.",
1759 "tools/bin/kubectl",
1760 "delete",
1761 "all",
1762 "-l",
1763 "scope=AmbassadorTest",
1764 "--all-namespaces",
1765 verbose=True,
1766 )
1767
1768 # XXX: better prune selector label
1769 if manifest_changed:
1770 print(f"manifest changed ({manifest_reason}), applying...")
1771 if not ShellCommand.run_with_retry(
1772 "Applying k8s manifests",
1773 "tools/bin/kubectl",
1774 "apply",
1775 "--prune",
1776 "-l",
1777 "scope=%s" % self.scope,
1778 "-f",
1779 fname,
1780 retries=5,
1781 sleep_seconds=10,
1782 ):
1783 raise RuntimeError("Could not apply manifests")
1784 self.applied_manifests = True
1785
1786 for n in self.nodes:
1787 if n in selected and not n.xfail:
1788 action = getattr(n, "post_manifest", None)
1789 if action:
1790 action()
1791
1792 self._wait(selected)
1793
1794 print("Waiting 5s after requirements, just because...")
1795 time.sleep(5)
1796
1797 @staticmethod
1798 def _req_str(kind, req) -> str:
1799 printable = req
1800
1801 if kind == "url":
1802 printable = req.url
1803
1804 return printable
1805
1806 def _wait(self, selected):
1807 requirements = []
1808
1809 for node in selected:
1810 if node.xfail:
1811 continue
1812
1813 node_name = node.format("{self.path.k8s}")
1814 ambassador_id = getattr(node, "ambassador_id", None)
1815
1816 # print(f"{node_name} {ambassador_id}")
1817
1818 if node.has_local_result():
1819 # print(f"{node_name} has local result, skipping")
1820 continue
1821
1822 if ambassador_id and ambassador_id in self.ids_to_strip:
1823 # print(f"{node_name} has id {ambassador_id}, stripping")
1824 continue
1825
1826 if node_name in self.names_to_ignore:
1827 # print(f"{node_name} marked to ignore, stripping")
1828 continue
1829
1830 # if RUN_MODE != "envoy":
1831 # print(f"{node_name}: including in nonlocal tests")
1832
1833 for kind, req in node.requirements():
1834 # print(f"{node_name} add req ({node_name}, {kind}, {self._req_str(kind, req)})")
1835 requirements.append((node, kind, req))
1836
1837 homogenous = {}
1838
1839 for node, kind, name in requirements:
1840 if kind not in homogenous:
1841 homogenous[kind] = []
1842
1843 homogenous[kind].append((node, name))
1844
1845 kinds = ["pod", "url"]
1846 delay = 5
1847 start = time.time()
1848 limit = int(os.environ.get("KAT_REQ_LIMIT", "900"))
1849
1850 print("Starting requirements check (limit %ds)... " % limit)
1851
1852 holdouts = {}
1853
1854 while time.time() - start < limit:
1855 for kind in kinds:
1856 if kind not in homogenous:
1857 continue
1858
1859 reqs = homogenous[kind]
1860
1861 print("Checking %s %s requirements... " % (len(reqs), kind), end="")
1862
1863 # print("\n")
1864 # for node, req in reqs:
1865 # print(f"...{node.format('{self.path.k8s}')} - {self._req_str(kind, req)}")
1866
1867 sys.stdout.flush()
1868
1869 is_ready, _holdouts = self._ready(kind, reqs)
1870
1871 if not is_ready:
1872 holdouts[kind] = _holdouts
1873 delay = int(min(delay * 2, 10))
1874 print("sleeping %ss..." % delay)
1875 sys.stdout.flush()
1876 time.sleep(delay)
1877 else:
1878 print("satisfied.")
1879 sys.stdout.flush()
1880 kinds.remove(kind)
1881
1882 break
1883 else:
1884 return
1885
1886 print("requirements not satisfied in %s seconds:" % limit)
1887
1888 for kind in kinds:
1889 _holdouts = holdouts.get(kind, [])
1890
1891 if _holdouts:
1892 print(f" {kind}:")
1893
1894 for node, text in _holdouts:
1895 print(f" {node.path.k8s} ({text})")
1896 node.log_kube_artifacts()
1897
1898 assert False, "requirements not satisfied in %s seconds" % limit
1899
    @multi
    def _ready(self, kind, _):
        """Dispatch key for the ``_ready`` multimethod.

        Returns *kind* unchanged. Presumably ``multi`` then routes the call to
        the implementation registered with ``@_ready.when(kind)`` ("pod" or
        "url"). Each implementation receives ``(kind, requirements)``, where
        requirements is a list of ``(node, requirement)`` pairs, and returns
        ``(is_ready, holdouts)``.
        """
        return kind
1903
1904 @_ready.when("pod") # type: ignore
1905 def _ready(self, _, requirements):
1906 pods = self._pods(self.scope)
1907 not_ready = []
1908
1909 for node, name in requirements:
1910 if not pods.get(name, False):
1911 not_ready.append((node, name))
1912
1913 if not_ready:
1914 print("%d not ready (%s), " % (len(not_ready), name), end="")
1915 return (False, not_ready)
1916
1917 return (True, None)
1918
1919 @_ready.when("url") # type: ignore
1920 def _ready(self, _, requirements):
1921 queries = []
1922
1923 for node, q in requirements:
1924 q.insecure = True
1925 q.parent = node
1926 queries.append(q)
1927
1928 # print("URL Reqs:")
1929 # print("\n".join([ f'{q.parent.name}: {q.url}' for q in queries ]))
1930
1931 result = run_queries("reqcheck", queries)
1932
1933 not_ready = [r for r in result if r.status != r.query.expected]
1934
1935 if not_ready:
1936 first = not_ready[0]
1937 print(
1938 "%d not ready (%s: %s) "
1939 % (len(not_ready), first.query.url, first.status or first.error),
1940 end="",
1941 )
1942 return (
1943 False,
1944 [
1945 (x.query.parent, "%s -- %s" % (x.query.url, x.status or x.error))
1946 for x in not_ready
1947 ],
1948 )
1949 else:
1950 return (True, None)
1951
1952 def _pods(self, scope=None):
1953 scope_for_path = scope if scope else "global"
1954 label_for_scope = f"-l scope={scope}" if scope else ""
1955
1956 fname = f"/tmp/pods-{scope_for_path}.json"
1957 if not ShellCommand.run_with_retry(
1958 "Getting pods",
1959 f"tools/bin/kubectl get pod {label_for_scope} --all-namespaces -o json > {fname}",
1960 shell=True,
1961 retries=5,
1962 sleep_seconds=10,
1963 ):
1964 raise RuntimeError("Could not get pods")
1965
1966 with open(fname) as f:
1967 raw_pods = json.load(f)
1968
1969 pods = {}
1970
1971 for p in raw_pods["items"]:
1972 name = p["metadata"]["name"]
1973
1974 cstats = p["status"].get("containerStatuses", [])
1975
1976 all_ready = True
1977
1978 for status in cstats:
1979 ready = status.get("ready", False)
1980
1981 if not ready:
1982 all_ready = False
1983 # print(f'pod {name} is not ready: {status.get("state", "unknown state")}')
1984
1985 pods[name] = all_ready
1986
1987 return pods
1988
1989 def _query(self, selected) -> None:
1990 queries = []
1991
1992 for t in self.tests:
1993 t_name = t.format("{self.path.k8s}")
1994
1995 if t in selected:
1996 t.pending = []
1997 t.queried = []
1998 t.results = []
1999 else:
2000 continue
2001
2002 if t.has_local_result():
2003 # print(f"{t_name}: SKIP QUERY due to local result")
2004 continue
2005
2006 ambassador_id = getattr(t, "ambassador_id", None)
2007
2008 if ambassador_id and ambassador_id in self.ids_to_strip:
2009 # print(f"{t_name}: SKIP QUERY due to ambassador_id {ambassador_id}")
2010 continue
2011
2012 # print(f"{t_name}: INCLUDE QUERY")
2013 for q in t.queries():
2014 q.parent = t
2015 t.pending.append(q)
2016 queries.append(q)
2017
2018 phases = sorted(set([q.phase for q in queries]))
2019 first = True
2020
2021 for phase in phases:
2022 if not first:
2023 phase_delay = int(os.environ.get("KAT_PHASE_DELAY", 10))
2024 print(
2025 "Waiting for {} seconds before starting phase {}...".format(phase_delay, phase)
2026 )
2027 time.sleep(phase_delay)
2028
2029 first = False
2030
2031 phase_queries = [q for q in queries if q.phase == phase]
2032
2033 print("Querying %s urls in phase %s..." % (len(phase_queries), phase), end="")
2034 sys.stdout.flush()
2035
2036 results = run_queries(f"phase{phase}", phase_queries)
2037
2038 print(" done.")
2039
2040 for r in results:
2041 t = r.parent
2042 t.queried.append(r.query)
2043
2044 if getattr(t, "debug", False) or getattr(r.query, "debug", False):
2045 print(
2046 "%s result: %s"
2047 % (t.name, json.dumps(r.as_dict(), sort_keys=True, indent=4))
2048 )
2049
2050 t.results.append(r)
2051 t.pending.remove(r.query)
View as plain text