1import base64
2import fnmatch
3import functools
4import inspect
5import json
6import os
7import subprocess
8import sys
9import threading
10import time
11import traceback
12from abc import ABC
13from collections import OrderedDict
14from functools import singledispatch
15from hashlib import sha256
16from typing import Any, Dict, List, NamedTuple, Optional, Sequence, Tuple, Type, Union, cast
17
18import pytest
19import yaml as pyyaml
20from packaging import version
21from yaml.parser import ParserError as YAMLParseError
22from yaml.scanner import ScannerError as YAMLScanError
23
24import tests.integration.manifests as integration_manifests
25from ambassador.utils import parse_bool
26from tests.manifests import cleartext_host_manifest, default_listener_manifest
27
28from .parser import SequenceView, Tag, dump, load
29from .utils import ShellCommand
30
# Prefer PyYAML's C-accelerated loader/dumper (available when PyYAML was
# built against libyaml); fall back to the pure-Python implementations.
pyyaml_loader: Any = getattr(pyyaml, "CSafeLoader", pyyaml.SafeLoader)
pyyaml_dumper: Any = getattr(pyyaml, "CSafeDumper", pyyaml.SafeDumper)
39
# We may have a SOURCE_ROOT override from the environment
SOURCE_ROOT = os.environ.get("SOURCE_ROOT", "")

# Figure out if we're running in Edge Stack or what.
# The sentinel file wins over the EDGE_STACK env var.
if os.path.exists("/buildroot/apro.version"):
    # We let /buildroot/apro.version remain a source of truth to minimize the
    # chances that we break anything that currently uses the builder shell.
    EDGE_STACK = True
else:
    # If we do not see concrete evidence of running in an apro builder shell,
    # then try to decide if the user wants us to assume we're running Edge Stack
    # from an environment variable. And if that isn't set, just assume OSS.
    EDGE_STACK = parse_bool(os.environ.get("EDGE_STACK", "false"))

if EDGE_STACK:
    # Hey look, we're running inside Edge Stack!
    print("RUNNING IN EDGE STACK")
    # SOURCE_ROOT is optional, and we assume that if it isn't set, the user is
    # running in a build shell and we should look for sources in the usual location.
    if not SOURCE_ROOT:
        SOURCE_ROOT = "/buildroot/apro"
else:
    # We're either not running in Edge Stack or we're not sure, so just assume OSS.
    print("RUNNING IN OSS")
    # SOURCE_ROOT is optional, and we assume that if it isn't set, the user is
    # running in a build shell and we should look for sources in the usual location.
    if not SOURCE_ROOT:
        SOURCE_ROOT = "/buildroot/ambassador"
68
69
def run(cmd):
    """Run *cmd* through the shell, raising RuntimeError on a nonzero exit."""
    exit_status = os.system(cmd)
    if exit_status == 0:
        return
    raise RuntimeError("command failed[%s]: %s" % (exit_status, cmd))
74
75
def kube_version_json():
    """Return the parsed JSON output of `kubectl version -o json`.

    Raises json.JSONDecodeError if kubectl produced no/invalid output
    (e.g. because the command itself failed).
    """
    # subprocess.run is the modern replacement for Popen + communicate();
    # stderr is deliberately left inherited, matching the previous behavior.
    completed = subprocess.run(
        "tools/bin/kubectl version -o json",
        stdout=subprocess.PIPE,
        shell=True,
        check=False,
    )
    return json.loads(completed.stdout)
82
83
def strip_version(ver: str):
    """
    strip_version is needed to strip a major/minor version of non-standard symbols. For example, when working with GKE,
    `kubectl version` returns a minor version like '14+', which is not semver or any standard version, for that matter.
    So we handle exceptions like that here.
    :param ver: version string
    :return: stripped version, as an int
    :raises ValueError: if the version cannot be parsed even after stripping
    """

    try:
        return int(ver)
    except ValueError as e:
        # GKE returns weird versions with '+' in the end.
        # endswith() is safe for the empty string, where ver[-1] would have
        # raised IndexError instead of the ValueError callers expect.
        if ver.endswith("+"):
            return int(ver[:-1])

        # If we still have not taken care of this, re-raise as ValueError,
        # chained to the original parse failure for context.
        raise ValueError(e) from e
102
103
def kube_server_version(version_json=None):
    """Return the Kubernetes server version as "major.minor", or None if the
    serverVersion block is missing from the `kubectl version` output."""
    info = version_json or kube_version_json()
    server = info.get("serverVersion", {})

    if not server:
        return None

    major = strip_version(server.get("major", None))
    minor = strip_version(server.get("minor", None))
    return f"{major}.{minor}"
117
118
def kube_client_version(version_json=None):
    """Return the kubectl client version as "major.minor", or None if the
    clientVersion block is missing from the `kubectl version` output."""
    info = version_json or kube_version_json()
    client = info.get("clientVersion", {})

    if not client:
        return None

    major = strip_version(client.get("major", None))
    minor = strip_version(client.get("minor", None))
    return f"{major}.{minor}"
132
133
def is_kube_server_client_compatible(
    debug_desc: str, requested_server_version: str, requested_client_version: str
) -> bool:
    """Check whether both the cluster (server) and kubectl (client) versions
    meet the given minimums for the feature described by *debug_desc*.

    Prints one status line per side; an undeterminable version counts as
    compatible (only an explicitly lower version flips the result to False).
    """
    kube_json = kube_version_json()
    compatible = True

    checks = (
        ("server", kube_server_version(kube_json), requested_server_version),
        ("client", kube_client_version(kube_json), requested_client_version),
    )

    for kind, actual, wanted in checks:
        if not actual:
            print(f"could not determine Kubernetes {kind} version?")
            continue

        if version.parse(actual) < version.parse(wanted):
            print(f"{kind} version {actual} is incompatible with {debug_desc}")
            compatible = False
        else:
            print(f"{kind} version {actual} is compatible with {debug_desc}")

    return compatible
162
163
def is_ingress_class_compatible() -> bool:
    """True if the cluster supports IngressClass (server 1.18+, client 1.14+)."""
    compatible = is_kube_server_client_compatible("IngressClass", "1.18", "1.14")
    return compatible
166
167
def is_knative_compatible() -> bool:
    """True if the cluster supports Knative (server and client both 1.14+)."""
    compatible = is_kube_server_client_compatible("Knative", "1.14", "1.14")
    return compatible
170
171
def get_digest(data: str) -> str:
    """Return the hex SHA-256 digest of *data* (UTF-8 encoded)."""
    return sha256(data.encode("utf-8")).hexdigest()
176
177
def has_changed(data: str, path: str) -> Tuple[bool, str]:
    """Write *data* to *path* if it differs from the file's current contents.

    Returns (changed, reason). NOTE: the tail of this function currently
    forces `changed` to True unconditionally ("always reapply for split
    test"), so the comparison above only affects `reason` and whether the
    file is rewritten.
    """
    cur_size = len(data.strip()) if data else 0
    cur_hash = get_digest(data)

    # print(f'has_changed: data size {cur_size} - {cur_hash}')

    prev_data = None
    changed = True
    reason = f"no {path} present"

    if os.path.exists(path):
        with open(path) as f:
            prev_data = f.read()

    prev_size = len(prev_data.strip()) if prev_data else 0
    prev_hash = None

    if prev_data:
        prev_hash = get_digest(prev_data)

    # print(f'has_changed: prev_data size {prev_size} - {prev_hash}')

    if data:
        if data != prev_data:
            reason = f"different data in {path}"
        else:
            changed = False
            reason = f"same data in {path}"

        # Only rewrite the file when the content actually differs.
        if changed:
            # print(f'has_changed: updating {path}')
            with open(path, "w") as f:
                f.write(data)

    # For now, we always have to reapply with split testing.
    if not changed:
        changed = True
        reason = "always reapply for split test"

    return (changed, reason)
218
219
# Per-class instance counters used by sanitize() to generate unique names
# for objects that are neither strings nor dicts.
COUNTERS: Dict[Type, int] = {}

# Ordered substring -> token substitutions applied by sanitize(). Order
# matters: "://" must be matched before ":" and "/".
SANITIZATIONS = OrderedDict(
    (
        ("://", "SCHEME"),
        (":", "COLON"),
        (" ", "SPACE"),
        ("/t", "TAB"),  # NOTE(review): "/t" looks like a typo for "\t" — confirm
        (".", "DOT"),
        ("?", "QMARK"),
        ("/", "SLASH"),
    )
)
233
234
def sanitize(obj):
    """Turn an arbitrary test argument into a name-safe string fragment.

    Strings have SANITIZATIONS substrings replaced (note: str.replace swaps
    *every* occurrence; the position of the first/last match only chooses
    which replacement form is used). Dicts either use their "value" key or
    a sorted key-value join. Anything else is named after its class, with a
    per-class counter (COUNTERS) to keep repeated instances distinct.
    """
    if isinstance(obj, str):
        for k, v in SANITIZATIONS.items():
            if obj.startswith(k):
                obj = obj.replace(k, v + "-")
            elif obj.endswith(k):
                obj = obj.replace(k, "-" + v)
            else:
                obj = obj.replace(k, "-" + v + "-")
        return obj
    elif isinstance(obj, dict):
        if "value" in obj:
            return obj["value"]
        else:
            return "-".join("%s-%s" % (sanitize(k), sanitize(v)) for k, v in sorted(obj.items()))
    else:
        # Fall back to the class name, numbering instances after the first.
        cls = obj.__class__
        count = COUNTERS.get(cls, 0)
        COUNTERS[cls] = count + 1
        if count == 0:
            return cls.__name__
        else:
            return "%s-%s" % (cls.__name__, count)
258
259
def abstract_test(cls: type):
    """Class decorator marking *cls* as abstract so get_nodes() skips it."""
    setattr(cls, "abstract_test", True)
    return cls
263
264
def get_nodes(node_type: type):
    """Yield *node_type* (unless abstract) and all of its concrete,
    non-skipped subclasses, depth-first in subclass order."""
    is_concrete = not inspect.isabstract(node_type) and not node_type.__dict__.get(
        "abstract_test", False
    )
    if is_concrete:
        yield node_type
    for subclass in node_type.__subclasses__():
        if subclass.__dict__.get("skip_variant", False):
            continue
        yield from get_nodes(subclass)
272
273
def variants(cls, *args, **kwargs) -> Tuple[Any]:
    """Collect every variant of every concrete node class under *cls*."""
    collected = []
    for node in get_nodes(cls):
        collected.extend(node.variants(*args, **kwargs))  # type: ignore
    return tuple(collected)
276
277
class Name(NamedTuple):
    """A test node's dotted path name, together with its Kubernetes namespace."""

    name: str
    namespace: str

    @property
    def k8s(self) -> str:
        """Kubernetes-safe form of the name: lowercased, dots become dashes."""
        return self.name.lower().replace(".", "-")

    @property
    def fqdn(self) -> str:
        """In-cluster DNS name: <k8s>.<namespace>.svc.cluster.local."""
        return f"{self.k8s}.{self.namespace}.svc.cluster.local"
289
290
class NodeLocal(threading.local):
    """Thread-local holder for the Node currently under construction.

    Node.__init__ reads `current` to find its parent: whichever Node is
    mid-construction on this thread when a child Node is created.
    """

    # The Node being constructed on this thread, if any.
    current: Optional["Node"]

    def __init__(self):
        self.current = None


# Module-wide construction context: one `current` slot per thread.
_local = NodeLocal()
299
300
def _argprocess(o):
    """Recursively copy a test-argument structure, cloning any embedded Node
    so each test variant gets its own instances."""
    if isinstance(o, Node):
        return o.clone()
    if isinstance(o, tuple):
        return tuple(map(_argprocess, o))
    if isinstance(o, list):
        return list(map(_argprocess, o))
    if isinstance(o, dict):
        return {_argprocess(key): _argprocess(value) for key, value in o.items()}
    return o
312
313
class Node(ABC):
    """Base class for every node in the KAT test tree.

    Nodes form a tree implicitly: constructing a Node while another Node's
    init() is running (tracked through the thread-local `_local.current`)
    makes the new Node a child of the running one. Namespaces are inherited
    from the parent when not given explicitly.
    """

    parent: Optional["Node"]  # set from _local.current during construction
    children: List["Node"]  # appended to by children as they construct
    name: str  # formatted display name, set at the end of __init__
    ambassador_id: str
    namespace: str
    is_ambassador = False
    xfail: Optional[str]  # reason string if this node is expected to fail

    def __init__(
        self,
        *args,
        name: Optional[str] = None,
        namespace: Optional[str] = None,
        _clone: Optional["Node"] = None,
        **kwargs,
    ) -> None:
        """Build this node, wiring it into the tree rooted at _local.current.

        When `_clone` is given, args/kwargs/name are taken from the clone
        source instead of the caller. The subclass's init() hook (if any) is
        called with deep-copied arguments via _argprocess.
        """
        # If self.skip is set to true, this node is skipped
        self.skip_node = False
        self.xfail: Optional[str] = None

        if _clone:
            # Cloning: reuse the source node's saved constructor arguments.
            args = _clone._args  # type: ignore
            kwargs = _clone._kwargs  # type: ignore
            if name:
                name = "-".join((_clone.name, name))
            else:
                name = _clone.name
            self._args = _clone._args  # type: ignore
            self._kwargs = _clone._kwargs  # type: ignore
        else:
            # Save the arguments so this node can itself be cloned later.
            self._args = args
            self._kwargs = kwargs
            if name:
                name = "-".join((self.__class__.__name__, name))
            else:
                name = self.__class__.__name__

        saved = _local.current
        self.parent = _local.current

        if namespace:
            self.namespace = namespace
        if not getattr(self, "namespace", ""):
            # We do the above `getattr` instead of just an `else` because subclasses might set a
            # default namespace; so `self.namespace` might already be set before calling __init__().
            if self.parent and self.parent.namespace:
                # We have no namespace assigned, but our parent does have a namespace
                # defined. Copy the namespace down from our parent.
                self.namespace = self.parent.namespace
            else:
                self.namespace = "default"

        # Make ourselves the "current" node while init() runs, so that any
        # Nodes constructed inside init() become our children.
        _local.current = self
        self.children = []
        if self.parent is not None:
            self.parent.children.append(self)
        try:
            init = getattr(self, "init", lambda *a, **kw: None)
            init(*_argprocess(args), **_argprocess(kwargs))
        finally:
            _local.current = saved

        # This has to come after the above to init(), because the format-string might reference
        # things that get set by init().
        self.name = self.format(name)

        # Child names must be unique within a parent.
        names = {}  # type: ignore
        for c in self.children:
            assert (
                c.name not in names
            ), "test %s of type %s has duplicate children: %s of type %s, %s" % (
                self.name,
                self.__class__.__name__,
                c.name,
                c.__class__.__name__,
                names[c.name].__class__.__name__,
            )
            names[c.name] = c

    def clone(self, name=None):
        """Return a fresh instance built from this node's saved constructor args."""
        return self.__class__(_clone=self, name=name)

    @classmethod
    def variants(cls):
        """Yield the test variants for this class (default: a single instance)."""
        yield cls()

    @property
    def path(self) -> Name:
        """Dotted path from the root node down to this one, plus namespace."""
        if self.parent is None:
            return Name(self.name, self.namespace)
        else:
            return Name(self.parent.path.name + "." + self.name, self.namespace)

    @property
    def traversal(self):
        """Yield this node and all descendants, depth-first."""
        yield self
        for c in self.children:
            for d in c.traversal:
                yield d

    @property
    def ancestors(self):
        """Yield this node and each ancestor up to the root."""
        yield self
        if self.parent is not None:
            for a in self.parent.ancestors:
                yield a

    @property
    def depth(self):
        """Distance from the root node (root is 0)."""
        if self.parent is None:
            return 0
        else:
            return self.parent.depth + 1

    def format(self, st, **kwargs):
        """Interpolate a manifest format-string with self in scope."""
        return integration_manifests.format(st, self=self, **kwargs)

    # NOTE(review): lru_cache on a method keys on `self` and keeps every node
    # alive for the cache's lifetime — acceptable here since nodes live for
    # the whole test session, but confirm if node lifetimes ever change.
    @functools.lru_cache()
    def matches(self, pattern):
        """True if this node's path, or any descendant's, matches *pattern* (fnmatch)."""
        if fnmatch.fnmatch(self.path.name, "*%s*" % pattern):
            return True
        for c in self.children:
            if c.matches(pattern):
                return True
        return False

    def requirements(self):
        """Yield readiness requirements for this node (default: none)."""
        yield from ()

    # log_kube_artifacts writes various logs about our underlying Kubernetes objects to
    # a place where the artifact publisher can find them. See run-tests.sh.
    def log_kube_artifacts(self):
        if not getattr(self, "already_logged", False):
            self.already_logged = True

            print(f"logging kube artifacts for {self.path.k8s}")
            sys.stdout.flush()

            DEV = os.environ.get("AMBASSADOR_DEV", "0").lower() in ("1", "yes", "true")

            log_path = f"/tmp/kat-logs-{self.path.k8s}"

            if DEV:
                os.system(f"docker logs {self.path.k8s} >{log_path} 2>&1")
            else:
                os.system(
                    f"tools/bin/kubectl logs -n {self.namespace} {self.path.k8s} >{log_path} 2>&1"
                )

            event_path = f"/tmp/kat-events-{self.path.k8s}"

            fs1 = f"involvedObject.name={self.path.k8s}"
            fs2 = f"involvedObject.namespace={self.namespace}"

            cmd = f'tools/bin/kubectl get events -o json --field-selector "{fs1}" --field-selector "{fs2}"'
            os.system(f'echo ==== "{cmd}" >{event_path}')
            os.system(f"{cmd} >>{event_path} 2>&1")
473
474
class Test(Node):
    """A runnable KAT test node: contributes config, manifests, and queries,
    and checks the results.

    NOTE(review): results/pending/queried are class-level mutable lists, so
    they are shared across all Test instances until an instance assigns its
    own — confirm the harness always reassigns them per-instance.
    """

    results: List["Result"] = []
    pending: List["Query"] = []
    queried: List["Query"] = []

    # Tell pytest not to collect this class itself as a test.
    __test__ = False

    def config(self):
        """Yield Ambassador config fragments for this test (default: none)."""
        yield from ()

    def manifests(self):
        """Return raw Kubernetes manifest YAML for this test, or None."""
        return None

    def queries(self):
        """Yield the Query objects this test wants run (default: none)."""
        yield from ()

    def check(self):
        """Post-query assertions; subclasses override (default: no-op)."""
        pass

    @property
    def ambassador_id(self):
        # Root tests derive their ID from their own path; children inherit.
        if self.parent is None:
            return self.path.k8s
        else:
            return self.parent.ambassador_id
501
502
@singledispatch
def encode_body(obj):
    """Base64-encode a request body for the KAT client.

    Anything that is not bytes or str is JSON-serialized first; strings are
    UTF-8 encoded; bytes are base64-encoded directly.
    """
    serialized = json.dumps(obj)
    return encode_body(serialized)


@encode_body.register
def encode_body_str(s: str):
    """Strings: UTF-8 encode, then defer to the bytes encoder."""
    return encode_body(s.encode("utf-8"))


@encode_body.register
def encode_body_bytes(b: bytes):
    """Bytes: base64 (trailing newline per encodebytes), decoded to text."""
    raw = base64.encodebytes(b)
    return raw.decode("utf-8")
516
517
class Query:
    """A single request (HTTP, WebSocket, or gRPC) for the KAT client to run,
    together with the expectations used to check its Result.
    """

    def __init__(
        self,
        url,
        expected=None,
        method="GET",
        headers=None,
        messages=None,
        insecure=False,
        skip=None,
        xfail=None,
        phase=1,
        debug=False,
        sni=False,
        error=None,
        client_crt=None,
        client_key=None,
        client_cert_required=False,
        ca_cert=None,
        grpc_type=None,
        cookies=None,
        ignore_result=False,
        body=None,
        minTLSv="",
        maxTLSv="",
        cipherSuites=None,
        ecdhCurves=None,
    ):
        self.method = method
        self.url = url
        self.headers = headers
        self.body = body
        self.cookies = cookies
        self.messages = messages
        self.insecure = insecure
        self.minTLSv = minTLSv
        self.maxTLSv = maxTLSv
        # These two were mutable default arguments ([]); normalize here so
        # each Query gets its own list. Passing a list explicitly still
        # behaves exactly as before.
        self.cipherSuites = cipherSuites if cipherSuites is not None else []
        self.ecdhCurves = ecdhCurves if ecdhCurves is not None else []
        if expected is None:
            # WebSocket upgrades answer 101 Switching Protocols; everything
            # else defaults to a plain 200.
            if url.lower().startswith("ws:"):
                self.expected = 101
            else:
                self.expected = 200
        else:
            self.expected = expected
        self.skip = skip
        self.xfail = xfail
        self.ignore_result = ignore_result
        self.phase = phase
        self.parent = None  # owning Test; assigned by the harness
        self.result = None  # back-filled by Result.__init__
        self.debug = debug
        self.sni = sni
        self.error = error
        self.client_cert_required = client_cert_required
        self.client_cert = client_crt
        self.client_key = client_key
        self.ca_cert = ca_cert
        assert grpc_type in (None, "real", "bridge", "web"), grpc_type
        self.grpc_type = grpc_type

    def as_json(self):
        """Serialize this query to the JSON shape the KAT client consumes.

        Only fields that are actually set are emitted. Requires self.parent
        to have been assigned, so the owning test's name can be recorded.
        """
        assert self.parent
        result = {
            "test": self.parent.path.name,
            "id": id(self),
            "url": self.url,
            "insecure": self.insecure,
        }
        if self.sni:
            result["sni"] = self.sni
        if self.method:
            result["method"] = self.method
        # BUG FIX: these two conditions previously tested `self.method`
        # (copy/paste), so empty maxTLSv/minTLSv strings were always emitted.
        # Gate them on their own values like every other optional field.
        if self.maxTLSv:
            result["maxTLSv"] = self.maxTLSv
        if self.minTLSv:
            result["minTLSv"] = self.minTLSv
        if self.cipherSuites:
            result["cipherSuites"] = self.cipherSuites
        if self.ecdhCurves:
            result["ecdhCurves"] = self.ecdhCurves
        if self.headers:
            result["headers"] = self.headers
        if self.body is not None:
            result["body"] = encode_body(self.body)
        if self.cookies:
            result["cookies"] = self.cookies
        if self.messages is not None:
            result["messages"] = self.messages
        if self.client_cert is not None:
            result["client_cert"] = self.client_cert
        if self.client_key is not None:
            result["client_key"] = self.client_key
        if self.ca_cert is not None:
            result["ca_cert"] = self.ca_cert
        if self.client_cert_required:
            result["client_cert_required"] = self.client_cert_required
        if self.grpc_type:
            result["grpc_type"] = self.grpc_type

        return result
620
621
class Result:
    """The KAT client's response for one Query, plus the checks against the
    Query's expectations."""

    # Decoded response body, or None if the client reported none.
    body: Optional[bytes]

    def __init__(self, query, res):
        """Bind this result to *query* (also back-filling query.result) and
        unpack the client's JSON response dict *res*."""
        self.query = query
        query.result = self
        self.parent = query.parent
        self.status = res.get("status")
        self.headers = res.get("headers")
        self.messages = res.get("messages")
        self.tls = res.get("tls")
        if "body" in res:
            # The client ships the body base64-encoded.
            self.body = base64.decodebytes(bytes(res["body"], "ASCII"))
        else:
            self.body = None
        self.text = res.get("text")
        self.json = res.get("json")
        self.backend = BackendResult(self.json) if self.json else None
        self.error = res.get("error")

    def __repr__(self):
        return str(self.as_dict())

    def check(self):
        """Assert that this result matches its query's expectations.

        Honors skip/xfail/ignore_result on the query; otherwise checks either
        the expected error substring(s) or the expected status code(s),
        logging kube artifacts on a status mismatch.
        """
        if self.query.skip:
            pytest.skip(self.query.skip)

        if self.query.xfail:
            pytest.xfail(self.query.xfail)

        if not self.query.ignore_result:
            if self.query.error is not None:
                # Error expectation: any one of the given substrings must
                # appear in the client-reported error.
                found = False
                errors = self.query.error

                if isinstance(self.query.error, str):
                    errors = [self.query.error]

                if self.error is not None:
                    for error in errors:
                        if error in self.error:
                            found = True
                            break

                assert found, "{}: expected error to contain any of {}; got {} instead".format(
                    self.query.url,
                    ", ".join(["'%s'" % x for x in errors]),
                    ("'%s'" % self.error) if self.error else "no error",
                )
            else:
                # Status expectation: a list means "any of these".
                if isinstance(self.query.expected, list):
                    if self.status not in self.query.expected:
                        self.parent.log_kube_artifacts()
                    assert (
                        self.status in self.query.expected
                    ), "%s: expected status code %s, got %s instead with error %s" % (
                        self.query.url,
                        self.query.expected,
                        self.status,
                        self.error,
                    )
                else:

                    if self.query.expected != self.status:
                        self.parent.log_kube_artifacts()
                    assert (
                        self.query.expected == self.status
                    ), "%s: expected status code %s, got %s instead with error %s" % (
                        self.query.url,
                        self.query.expected,
                        self.status,
                        self.error,
                    )

    def as_dict(self) -> Dict[str, Any]:
        """Summarize this result (query, status, error, headers, backend/body)."""
        od = {
            "query": self.query.as_json(),
            "status": self.status,
            "error": self.error,
            "headers": self.headers,
        }

        if self.backend and self.backend.name:
            od["backend"] = self.backend.as_dict()
        else:
            od["json"] = self.json
            od["text"] = self.text

        return od

        # 'RENDERED': {
        #     'client': {
        #         'request': self.query.as_json(),
        #         'response': {
        #             'status': self.status,
        #             'error': self.error,
        #             'headers': self.headers
        #         }
        #     },
        #     'upstream': {
        #         'name': self.backend.name,
        #         'request': {
        #             'headers': self.backend.request.headers,
        #             'url': {
        #                 'fragment': self.backend.request.url.fragment,
        #                 'host': self.backend.request.url.host,
        #                 'opaque': self.backend.request.url.opaque,
        #                 'path': self.backend.request.url.path,
        #                 'query': self.backend.request.url.query,
        #                 'rawQuery': self.backend.request.url.rawQuery,
        #                 'scheme': self.backend.request.url.scheme,
        #                 'username': self.backend.request.url.username,
        #                 'password': self.backend.request.url.password,
        #             },
        #             'host': self.backend.request.host,
        #             'tls': {
        #                 'enabled': self.backend.request.tls.enabled,
        #                 'server_name': self.backend.request.tls.server_name,
        #                 'version': self.backend.request.tls.version,
        #                 'negotiated_protocol': self.backend.request.tls.negotiated_protocol,
        #             },
        #         },
        #         'response': {
        #             'headers': self.backend.response.headers
        #         }
        #     }
        # }
749
750
class BackendURL:
    """The URL the backend saw, decomposed into its components."""

    # Component names, in the order as_dict() reports them.
    _COMPONENTS = (
        "fragment",
        "host",
        "opaque",
        "path",
        "query",
        "rawQuery",
        "scheme",
        "username",
        "password",
    )

    def __init__(
        self,
        fragment=None,
        host=None,
        opaque=None,
        path=None,
        query=None,
        rawQuery=None,
        scheme=None,
        username=None,
        password=None,
    ):
        values = locals()
        for component in self._COMPONENTS:
            setattr(self, component, values[component])

    def as_dict(self) -> Dict["str", Any]:
        """Return all URL components as a plain dict."""
        return {component: getattr(self, component) for component in self._COMPONENTS}
786
787
class BackendRequest:
    """The request as observed by the backend service, unpacked from the
    KAT client's JSON report."""

    def __init__(self, req):
        self.url = BackendURL(**req.get("url"))
        self.headers = req.get("headers", {})
        self.host = req.get("host", None)
        self.tls = BackendTLS(req.get("tls", {}))

    def as_dict(self) -> Dict[str, Any]:
        """Summarize the request; url/tls are included only when present."""
        summary = {
            "headers": self.headers,
            "host": self.host,
        }
        if self.url:
            summary["url"] = self.url.as_dict()
        if self.tls:
            summary["tls"] = self.tls.as_dict()
        return summary
808
809
class BackendTLS:
    """TLS details of the connection the backend saw."""

    # (attribute, JSON key) pairs for the optional fields.
    _OPTIONAL = (
        ("server_name", "server-name"),
        ("version", "version"),
        ("negotiated_protocol", "negotiated-protocol"),
        ("negotiated_protocol_version", "negotiated-protocol-version"),
    )

    def __init__(self, tls):
        # "enabled" is required; everything else defaults to None.
        self.enabled = tls["enabled"]
        for attr, key in self._OPTIONAL:
            setattr(self, attr, tls.get(key))

    def as_dict(self) -> Dict[str, Any]:
        """Return all TLS fields as a plain dict."""
        summary: Dict[str, Any] = {"enabled": self.enabled}
        for attr, _ in self._OPTIONAL:
            summary[attr] = getattr(self, attr)
        return summary
826
827
class BackendResponse:
    """The response the backend sent, as reported by the KAT client."""

    def __init__(self, resp):
        # Only headers are captured; absent headers become an empty dict.
        self.headers = resp.get("headers", {})

    def as_dict(self) -> Dict[str, Any]:
        """Return the captured response fields as a plain dict."""
        return {"headers": self.headers}
834
835
def dictify(obj):
    """Return obj.as_dict() when the object provides one, else obj unchanged."""
    as_dict = getattr(obj, "as_dict", None)
    return as_dict() if as_dict else obj
841
842
class BackendResult:
    """What the backend saw for one query: its name, the request it received,
    and the response it sent. A non-dict payload is kept raw."""

    def __init__(self, bres):
        if not isinstance(bres, dict):
            # Raw (non-JSON-object) payload: keep it as the response.
            self.name = "raw"
            self.request = None
            self.response = bres
            return

        self.name = cast(str, bres.get("backend"))
        self.request = BackendRequest(bres["request"]) if "request" in bres else None
        self.response = BackendResponse(bres["response"]) if "response" in bres else None

    def as_dict(self) -> Dict[str, Any]:
        """Summarize; request/response are included only when present."""
        summary: Dict[str, Any] = {"name": self.name}
        if self.request:
            summary["request"] = dictify(self.request)
        if self.response:
            summary["response"] = dictify(self.response)
        return summary
864
865
def label(yaml, scope):
    """Stamp every manifest object's metadata.labels.scope with *scope*,
    creating the labels dict when missing. Mutates and returns *yaml*."""
    for obj in yaml:
        labels = obj["metadata"].setdefault("labels", {})
        labels["scope"] = scope
    return yaml
875
876
# Name of the Go KAT client binary; referenced only by the commented-out
# local invocation in run_queries() (queries normally run via `kubectl exec`).
CLIENT_GO = "kat_client"
878
879
def run_queries(name: str, queries: Sequence[Query]) -> Sequence[Result]:
    """Run *queries* through the in-cluster KAT client and return Results.

    Queries are serialized to /tmp/kat-client-<name>-urls.json, piped through
    `kubectl exec` into the `kat` pod, and the JSON results are matched back
    to their Query objects by id(). If the command fails or its output can't
    be parsed, every query gets an error Result instead.
    """
    jsonified = []
    byid = {}

    for q in queries:
        jsonified.append(q.as_json())
        byid[id(q)] = q

    path_urls = f"/tmp/kat-client-{name}-urls.json"
    path_results = f"/tmp/kat-client-{name}-results.json"
    path_log = f"/tmp/kat-client-{name}.log"

    with open(path_urls, "w") as f:
        json.dump(jsonified, f)

    # run(f"{CLIENT_GO} -input {path_urls} -output {path_results} 2> {path_log}")
    res = ShellCommand.run(
        "Running queries",
        f"tools/bin/kubectl exec -n default -i kat -- /work/kat_client < '{path_urls}' > '{path_results}' 2> '{path_log}'",
        shell=True,
    )

    if not res:
        ret = [Result(q, {"error": "Command execution error"}) for q in queries]
        return ret

    with open(path_results, "r") as f:
        content = f.read()
        try:
            json_results = json.loads(content)
        except Exception as e:
            # Note: the exception itself is not surfaced; every query just
            # gets the generic parse-failure Result below.
            ret = [
                Result(q, {"error": "Could not parse JSON content after running KAT queries"})
                for q in queries
            ]
            return ret

    results = []

    # Pair each client result back up with its originating Query via id().
    for r in json_results:
        res = r["result"]
        q = byid[r["id"]]
        results.append(Result(q, res))

    return results
925
926
# Module-global toggle (yuck): set when running under doctest; Runner.setup()
# checks it to skip printing a leading blank line.
DOCTEST = False
929
930
class Superpod:
    """One shared pod per namespace that serves many test backends, each on
    its own cleartext/TLS port pair (starting at 8080/8443)."""

    def __init__(self, namespace: str) -> None:
        self.namespace = namespace
        self.next_clear = 8080
        self.next_tls = 8443
        self.service_names: Dict[int, str] = {}
        self.name = f"superpod-{self.namespace or 'default'}"

    def allocate(self, service_name) -> List[int]:
        """Reserve the next [cleartext, TLS] port pair for *service_name*."""
        clear_port, tls_port = self.next_clear, self.next_tls
        self.service_names[clear_port] = service_name
        self.service_names[tls_port] = service_name
        self.next_clear = clear_port + 1
        self.next_tls = tls_port + 1
        return [clear_port, tls_port]

    def get_manifest_list(self) -> List[Dict[str, Any]]:
        """Render the superpod's pod manifest with all allocated ports and
        BACKEND_<port> environment variables filled in."""
        manifest = load(
            "superpod",
            integration_manifests.format(integration_manifests.load("superpod_pod")),
            Tag.MAPPING,
        )

        assert len(manifest) == 1, "SUPERPOD manifest must have exactly one object"

        pod = manifest[0]
        template = pod["spec"]["template"]

        container_ports: List[Dict[str, int]] = []
        env_vars: List[Dict[str, Union[str, int]]] = template["spec"]["containers"][0]["env"]

        # One containerPort plus one BACKEND_<port> env entry per allocation.
        for port in sorted(self.service_names.keys()):
            container_ports.append({"containerPort": port})
            env_vars.append({"name": f"BACKEND_{port}", "value": self.service_names[port]})

        template["spec"]["containers"][0]["ports"] = container_ports

        metadata = pod.setdefault("metadata", {})
        metadata["name"] = self.name

        # Point the selector and pod-template labels at this superpod.
        pod["spec"]["selector"]["matchLabels"]["backend"] = self.name
        template["metadata"]["labels"]["backend"] = self.name

        if self.namespace:
            # Fix up the namespace.
            metadata.setdefault("namespace", self.namespace)

        return list(manifest)
986
987
988class Runner:
    def __init__(self, *classes, scope=None):
        """Expand *classes* into their test variants and build the
        parametrized pytest entry point that drives them."""
        self.scope = scope or "-".join(c.__name__ for c in classes)
        self.roots = tuple(v for c in classes for v in variants(c))
        self.nodes = [n for r in self.roots for n in r.traversal if not n.skip_node]
        self.tests = [n for n in self.nodes if isinstance(n, Test)]
        self.ids = [t.path.name for t in self.tests]
        self.done = False
        self.ids_to_strip: Dict[str, bool] = {}
        self.names_to_ignore: Dict[str, bool] = {}

        @pytest.mark.parametrize("t", self.tests, ids=self.ids)
        def test(request, capsys, t):
            if t.xfail:
                pytest.xfail(t.xfail)
            else:
                # Determine which tests pytest actually collected for this
                # session, so setup only deploys/queries what's needed.
                selected = set(
                    item.callspec.getparam("t")
                    for item in request.session.items
                    if item.function == test
                )

                with capsys.disabled():
                    self.setup(selected)

                # XXX: should aggregate the result of url checks
                i = 0
                for r in t.results:
                    try:
                        r.check()
                    except AssertionError as e:
                        # Add some context so that you can tell which query is failing.
                        e.args = (f"query[{i}]: {e.args[0]}", *e.args[1:])
                        raise
                    i += 1

                t.check()

        # Expose the closure so pytest discovers this Runner instance as a test.
        self.__func__ = test
        self.__test__ = True
1028
    def __call__(self):
        """Never actually invoked; being callable is what lets py.test pick
        this object up during collection."""
        assert False, "this is here for py.test discovery purposes only"
1031
1032 def setup(self, selected):
1033 if not self.done:
1034 if not DOCTEST:
1035 print()
1036
1037 expanded_up = set(selected)
1038
1039 for s in selected:
1040 for n in s.ancestors:
1041 if not n.xfail:
1042 expanded_up.add(n)
1043
1044 expanded = set(expanded_up)
1045
1046 for s in selected:
1047 for n in s.traversal:
1048 if not n.xfail:
1049 expanded.add(n)
1050
1051 try:
1052 self._setup_k8s(expanded)
1053 self._query(expanded_up)
1054 except:
1055 traceback.print_exc()
1056 pytest.exit("setup failed")
1057 finally:
1058 self.done = True
1059
    def get_manifests_and_namespaces(self, selected) -> Tuple[Any, List[str]]:
        """Build the Kubernetes manifest lists for every selected node.

        Returns (manifests, namespaces): an ordered mapping from each node
        (plus any Superpods created along the way) to its list of manifest
        objects, and the list of namespace names touched. xfail nodes are
        skipped entirely.
        """
        manifests: OrderedDict[Any, list] = OrderedDict()  # type: ignore
        superpods: Dict[str, Superpod] = {}
        namespaces = []
        for n in (n for n in self.nodes if n in selected and not n.xfail):
            manifest = None
            nsp = None
            ambassador_id = None

            # print('manifesting for {n.path}')

            # Walk up the parent chain to find our namespace and ambassador_id.
            cur = n

            while cur:
                if not nsp:
                    nsp = getattr(cur, "namespace", None)
                    # print(f'... {cur.name} has namespace {nsp}')

                if not ambassador_id:
                    ambassador_id = getattr(cur, "ambassador_id", None)
                    # print(f'... {cur.name} has ambassador_id {ambassador_id}')

                if nsp and ambassador_id:
                    # print(f'... good for namespace and ambassador_id')
                    break

                cur = cur.parent

            # OK. Does this node want to use a superpod?
            if getattr(n, "use_superpod", False):
                # Yup. OK. Do we already have a superpod for this namespace?
                superpod = superpods.get(nsp, None)  # type: ignore

                if not superpod:
                    # We don't have one, so we need to create one.
                    superpod = Superpod(nsp)  # type: ignore
                    superpods[nsp] = superpod  # type: ignore

                # print(f'superpodifying {n.name}')

                # Next up: use the backend_service.yaml manifest as a template...
                yaml = n.format(integration_manifests.load("backend_service"))
                manifest = load(n.path, yaml, Tag.MAPPING)

                assert (
                    len(manifest) == 1
                ), "backend_service.yaml manifest must have exactly one object"

                m = manifest[0]

                # Update the manifest's selector...
                m["spec"]["selector"]["backend"] = superpod.name

                # ...and labels if needed...
                if ambassador_id:
                    m["metadata"]["labels"] = {"kat-ambassador-id": ambassador_id}

                # ...and target ports.
                superpod_ports = superpod.allocate(n.path.k8s)

                m["spec"]["ports"][0]["targetPort"] = superpod_ports[0]
                m["spec"]["ports"][1]["targetPort"] = superpod_ports[1]
            else:
                # The non-superpod case...
                yaml = n.manifests()

                if yaml is not None:
                    is_plain_test = n.path.k8s.startswith("plain-")

                    if n.is_ambassador and not is_plain_test:
                        # Ambassador nodes get default Listeners (and, on Edge
                        # Stack, optionally a cleartext Host) appended.
                        add_default_http_listener = getattr(n, "add_default_http_listener", True)
                        add_default_https_listener = getattr(n, "add_default_https_listener", True)
                        add_cleartext_host = getattr(n, "edge_stack_cleartext_host", False)

                        if add_default_http_listener:
                            # print(f"{n.path.k8s} adding default HTTP Listener")
                            yaml += default_listener_manifest % {
                                "namespace": nsp,
                                "port": 8080,
                                "protocol": "HTTPS",
                                "securityModel": "XFP",
                            }

                        if add_default_https_listener:
                            # print(f"{n.path.k8s} adding default HTTPS Listener")
                            yaml += default_listener_manifest % {
                                "namespace": nsp,
                                "port": 8443,
                                "protocol": "HTTPS",
                                "securityModel": "XFP",
                            }

                        if EDGE_STACK and add_cleartext_host:
                            # print(f"{n.path.k8s} adding Host")

                            host_yaml = cleartext_host_manifest % nsp
                            yaml += host_yaml

                    yaml = n.format(yaml)

                    try:
                        manifest = load(n.path, yaml, Tag.MAPPING)
                    except Exception as e:
                        print(f"parse failure! {e}")
                        print(yaml)

            if manifest:
                # print(manifest)

                # Make sure namespaces and labels are properly set.
                for m in manifest:
                    if "metadata" not in m:
                        m["metadata"] = {}

                    metadata = m["metadata"]

                    if "labels" not in metadata:
                        metadata["labels"] = {}

                    if ambassador_id:
                        metadata["labels"]["kat-ambassador-id"] = ambassador_id

                    if nsp:
                        if "namespace" not in metadata:
                            metadata["namespace"] = nsp

                # ...and, finally, save the manifest list.
                manifests[n] = list(manifest)
                if str(nsp) not in namespaces:
                    namespaces.append(str(nsp))

        for superpod in superpods.values():
            manifests[superpod] = superpod.get_manifest_list()

        return manifests, namespaces
1196
1197 def _setup_k8s(self, selected):
1198 # First up, get the full manifest and save it to disk.
1199 manifests, namespaces = self.get_manifests_and_namespaces(selected)
1200
1201 configs: Dict[Node, List[Tuple[str, SequenceView]]] = OrderedDict()
1202 for n in (n for n in self.nodes if n in selected and not n.xfail):
1203 configs[n] = []
1204 for cfg in n.config():
1205 if isinstance(cfg, str):
1206 parent_config = configs[n.parent][0][1][0]
1207
1208 try:
1209 for o in load(n.path, cfg, Tag.MAPPING):
1210 parent_config.merge(o)
1211 except (YAMLScanError, YAMLParseError) as e:
1212 raise Exception("Parse Error: %s, input text:\n%s" % (e, cfg))
1213 else:
1214 target = cfg[0]
1215
1216 try:
1217 yaml_view = load(n.path, cfg[1], Tag.MAPPING)
1218
1219 if n.ambassador_id:
1220 for obj in yaml_view:
1221 if "ambassador_id" not in obj:
1222 obj["ambassador_id"] = [n.ambassador_id]
1223
1224 configs[n].append((target, yaml_view))
1225 except (YAMLScanError, YAMLParseError) as e:
1226 raise Exception("Parse Error: %s, input text:\n%s" % (e, cfg[1]))
1227
1228 for tgt_cfgs in configs.values():
1229 for target, cfg in tgt_cfgs:
1230 for t in target.traversal:
1231 if t in manifests:
1232 k8s_yaml = manifests[t]
1233 for item in k8s_yaml:
1234 if item["kind"].lower() == "service":
1235 md = item["metadata"]
1236 if "annotations" not in md:
1237 md["annotations"] = {}
1238
1239 anns = md["annotations"]
1240
1241 if "getambassador.io/config" in anns:
1242 anns["getambassador.io/config"] += "\n" + dump(cfg)
1243 else:
1244 anns["getambassador.io/config"] = dump(cfg)
1245
1246 break
1247 else:
1248 continue
1249 break
1250 else:
1251 assert False, "no service found for target: %s" % target.path.name
1252
1253 yaml = ""
1254
1255 for v in manifests.values():
1256 yaml += dump(label(v, self.scope)) + "\n"
1257
1258 fname = "/tmp/k8s-%s.yaml" % self.scope
1259
1260 self.applied_manifests = False
1261
1262 # Always apply at this point, since we're doing the multi-run thing.
1263 manifest_changed, manifest_reason = has_changed(yaml, fname)
1264
1265 # XXX It is _so stupid_ that we're reparsing the whole manifest here.
1266 xxx_crap = pyyaml.load_all(open(fname, "r").read(), Loader=pyyaml_loader)
1267
1268 # Strip things we don't need from the manifest.
1269 trimmed_manifests = []
1270 trimmed = 0
1271 kept = 0
1272
1273 for obj in xxx_crap:
1274 keep = True
1275
1276 kind = "-nokind-"
1277 name = "-noname-"
1278 metadata: Dict[str, Any] = {}
1279 labels: Dict[str, str] = {}
1280 id_to_check: Optional[str] = None
1281
1282 if "kind" in obj:
1283 kind = obj["kind"]
1284
1285 if "metadata" in obj:
1286 metadata = obj["metadata"]
1287
1288 if "name" in metadata:
1289 name = metadata["name"]
1290
1291 if "labels" in metadata:
1292 labels = metadata["labels"]
1293
1294 if "kat-ambassador-id" in labels:
1295 id_to_check = labels["kat-ambassador-id"]
1296
1297 # print(f"metadata {metadata} id_to_check {id_to_check} obj {obj}")
1298
1299 # Keep namespaces, just in case.
1300 if kind == "Namespace":
1301 keep = True
1302 else:
1303 if id_to_check and (id_to_check in self.ids_to_strip):
1304 keep = False
1305 # print(f"...drop {kind} {name} (ID {id_to_check})")
1306 self.names_to_ignore[name] = True
1307
1308 if keep:
1309 kept += 1
1310 trimmed_manifests.append(obj)
1311 else:
1312 trimmed += 1
1313
1314 if trimmed:
1315 print(f"After trimming: kept {kept}, trimmed {trimmed}")
1316
1317 yaml = pyyaml.dump_all(trimmed_manifests, Dumper=pyyaml_dumper)
1318
1319 fname = "/tmp/k8s-%s-trimmed.yaml" % self.scope
1320
1321 self.applied_manifests = False
1322
1323 # Always apply at this point, since we're doing the multi-run thing.
1324 manifest_changed, manifest_reason = has_changed(yaml, fname)
1325
1326 # First up: CRDs.
1327 input_crds = integration_manifests.crd_manifests()
1328 if is_knative_compatible():
1329 input_crds += integration_manifests.load("knative_serving_crds")
1330
1331 # Strip out all of the schema validation, so that we can test with broken CRDs.
1332 # (KAT isn't really in the business of testing to be sure that Kubernetes can
1333 # run the K8s validators...)
1334 crds = pyyaml.load_all(input_crds, Loader=pyyaml_loader)
1335
1336 # Collect the CRDs with schema validation stripped in stripped_crds, because
1337 # pyyaml.load_all actually returns something more complex than a simple list,
1338 # so it doesn't reserialize well after being modified.
1339 stripped_crds = []
1340
1341 for crd in crds:
1342 # Guard against empty CRDs (the KNative files have some blank lines at
1343 # the end).
1344 if not crd:
1345 continue
1346
1347 if crd["apiVersion"] == "apiextensions.k8s.io/v1":
1348 # We can't naively strip the schema validation from apiextensions.k8s.io/v1 CRDs
1349 # because it is required; otherwise the API server would refuse to create the CRD,
1350 # telling us:
1351 #
1352 # CustomResourceDefinition.apiextensions.k8s.io "…" is invalid: spec.versions[0].schema.openAPIV3Schema: Required value: schemas are required
1353 #
1354 # So instead we must replace it with a schema that allows anything.
1355 for version in crd["spec"]["versions"]:
1356 if "schema" in version:
1357 version["schema"] = {
1358 "openAPIV3Schema": {
1359 "type": "object",
1360 "properties": {
1361 "apiVersion": {"type": "string"},
1362 "kind": {"type": "string"},
1363 "metadata": {"type": "object"},
1364 "spec": {
1365 "type": "object",
1366 "x-kubernetes-preserve-unknown-fields": True,
1367 },
1368 },
1369 },
1370 }
1371 elif crd["apiVersion"] == "apiextensions.k8s.io/v1beta1":
1372 crd["spec"].pop("validation", None)
1373 for version in crd["spec"]["versions"]:
1374 version.pop("schema", None)
1375 stripped_crds.append(crd)
1376
1377 final_crds = pyyaml.dump_all(stripped_crds, Dumper=pyyaml_dumper)
1378 changed, reason = has_changed(final_crds, "/tmp/k8s-CRDs.yaml")
1379
1380 if changed:
1381 print(f"CRDS changed ({reason}), applying.")
1382 if not ShellCommand.run_with_retry(
1383 "Apply CRDs",
1384 "tools/bin/kubectl",
1385 "apply",
1386 "-f",
1387 "/tmp/k8s-CRDs.yaml",
1388 retries=5,
1389 sleep_seconds=10,
1390 ):
1391 raise RuntimeError("Failed applying CRDs")
1392
1393 print("waiting for emissary-apiext server to become available")
1394 if os.system(
1395 "kubectl wait --timeout=90s --for=condition=available deployment emissary-apiext -n emissary-system > /dev/null 2>&1"
1396 ):
1397 raise RuntimeError(
1398 "emissary-apiext server did not become available within 90 seconds"
1399 )
1400 print("emissary-apiext server is available")
1401
1402 tries_left = 10
1403
1404 while (
1405 os.system("tools/bin/kubectl get crd mappings.getambassador.io > /dev/null 2>&1")
1406 != 0
1407 ):
1408 tries_left -= 1
1409
1410 if tries_left <= 0:
1411 raise RuntimeError("CRDs never became available")
1412
1413 print("sleeping for CRDs... (%d)" % tries_left)
1414 time.sleep(5)
1415 else:
1416 print(f"CRDS unchanged {reason}, skipping apply.")
1417
1418 # Next up: the KAT pod.
1419 kat_client_manifests = integration_manifests.load("kat_client_pod")
1420 if os.environ.get("DEV_USE_IMAGEPULLSECRET", False):
1421 kat_client_manifests = (
1422 integration_manifests.namespace_manifest("default") + kat_client_manifests
1423 )
1424 changed, reason = has_changed(
1425 integration_manifests.format(kat_client_manifests), "/tmp/k8s-kat-pod.yaml"
1426 )
1427
1428 if changed:
1429 print(f"KAT pod definition changed ({reason}), applying")
1430 if not ShellCommand.run_with_retry(
1431 "Apply KAT pod",
1432 "tools/bin/kubectl",
1433 "apply",
1434 "-f",
1435 "/tmp/k8s-kat-pod.yaml",
1436 "-n",
1437 "default",
1438 retries=5,
1439 sleep_seconds=10,
1440 ):
1441 raise RuntimeError("Could not apply manifest for KAT pod")
1442
1443 tries_left = 3
1444 time.sleep(1)
1445
1446 while True:
1447 if ShellCommand.run(
1448 "wait for KAT pod",
1449 "tools/bin/kubectl",
1450 "-n",
1451 "default",
1452 "wait",
1453 "--timeout=30s",
1454 "--for=condition=Ready",
1455 "pod",
1456 "kat",
1457 ):
1458 print("KAT pod ready")
1459 break
1460
1461 tries_left -= 1
1462
1463 if tries_left <= 0:
1464 raise RuntimeError("KAT pod never became available")
1465
1466 print("sleeping for KAT pod... (%d)" % tries_left)
1467 time.sleep(5)
1468 else:
1469 print(f"KAT pod definition unchanged {reason}, skipping apply.")
1470
1471 # Use a dummy pod to get around the !*@&#$!*@&# DockerHub rate limit.
1472 # XXX Better: switch to GCR.
1473 dummy_pod = integration_manifests.load("dummy_pod")
1474 if os.environ.get("DEV_USE_IMAGEPULLSECRET", False):
1475 dummy_pod = integration_manifests.namespace_manifest("default") + dummy_pod
1476 changed, reason = has_changed(
1477 integration_manifests.format(dummy_pod), "/tmp/k8s-dummy-pod.yaml"
1478 )
1479
1480 if changed:
1481 print(f"Dummy pod definition changed ({reason}), applying")
1482 if not ShellCommand.run_with_retry(
1483 "Apply dummy pod",
1484 "tools/bin/kubectl",
1485 "apply",
1486 "-f",
1487 "/tmp/k8s-dummy-pod.yaml",
1488 "-n",
1489 "default",
1490 retries=5,
1491 sleep_seconds=10,
1492 ):
1493 raise RuntimeError("Could not apply manifest for dummy pod")
1494
1495 tries_left = 3
1496 time.sleep(1)
1497
1498 while True:
1499 if ShellCommand.run(
1500 "wait for dummy pod",
1501 "tools/bin/kubectl",
1502 "-n",
1503 "default",
1504 "wait",
1505 "--timeout=30s",
1506 "--for=condition=Ready",
1507 "pod",
1508 "dummy-pod",
1509 ):
1510 print("Dummy pod ready")
1511 break
1512
1513 tries_left -= 1
1514
1515 if tries_left <= 0:
1516 raise RuntimeError("Dummy pod never became available")
1517
1518 print("sleeping for dummy pod... (%d)" % tries_left)
1519 time.sleep(5)
1520 else:
1521 print(f"Dummy pod definition unchanged {reason}, skipping apply.")
1522
1523 # # Clear out old stuff.
1524 if os.environ.get("DEV_CLEAN_K8S_RESOURCES", False):
1525 print("Clearing cluster...")
1526 ShellCommand.run(
1527 "clear old Kubernetes namespaces",
1528 "tools/bin/kubectl",
1529 "delete",
1530 "namespaces",
1531 "-l",
1532 "scope=AmbassadorTest",
1533 verbose=True,
1534 )
1535 ShellCommand.run(
1536 "clear old Kubernetes pods etc.",
1537 "tools/bin/kubectl",
1538 "delete",
1539 "all",
1540 "-l",
1541 "scope=AmbassadorTest",
1542 "--all-namespaces",
1543 verbose=True,
1544 )
1545
1546 # XXX: better prune selector label
1547 if manifest_changed:
1548 print(f"manifest changed ({manifest_reason}), applying...")
1549 if not ShellCommand.run_with_retry(
1550 "Applying k8s manifests",
1551 "tools/bin/kubectl",
1552 "apply",
1553 "--prune",
1554 "-l",
1555 "scope=%s" % self.scope,
1556 "-f",
1557 fname,
1558 retries=5,
1559 sleep_seconds=10,
1560 ):
1561 raise RuntimeError("Could not apply manifests")
1562 self.applied_manifests = True
1563
1564 for n in self.nodes:
1565 if n in selected and not n.xfail:
1566 action = getattr(n, "post_manifest", None)
1567 if action:
1568 action()
1569
1570 self._wait(selected)
1571
1572 print("Waiting 5s after requirements, just because...")
1573 time.sleep(5)
1574
1575 @staticmethod
1576 def _req_str(kind, req) -> str:
1577 printable = req
1578
1579 if kind == "url":
1580 printable = req.url
1581
1582 return printable
1583
    def _wait(self, selected: Sequence[Node]):
        """Block until every requirement of every selected node is satisfied.

        Each non-xfail node contributes (kind, requirement) pairs -- "pod"
        readiness checks and "url" probe queries. Nodes whose ambassador_id is
        in self.ids_to_strip, or whose k8s name is in self.names_to_ignore,
        are skipped entirely. Requirements are then checked kind-by-kind with
        an exponentially growing delay (capped at 10s) until all pass, or
        until KAT_REQ_LIMIT seconds (default 900) elapse -- in which case the
        remaining holdouts are printed, their Kube artifacts are logged, and
        an assertion fails.
        """
        requirements: List[Tuple[Node, str, Query]] = []

        # Gather requirements from the selected nodes, skipping anything we
        # were told to strip or ignore.
        for node in selected:
            if node.xfail:
                continue

            node_name = node.format("{self.path.k8s}")
            ambassador_id = getattr(node, "ambassador_id", None)

            # print(f"{node_name} {ambassador_id}")

            if ambassador_id and ambassador_id in self.ids_to_strip:
                # print(f"{node_name} has id {ambassador_id}, stripping")
                continue

            if node_name in self.names_to_ignore:
                # print(f"{node_name} marked to ignore, stripping")
                continue

            for kind, req in node.requirements():
                # print(f"{node_name} add req ({node_name}, {kind}, {self._req_str(kind, req)})")
                requirements.append((node, kind, req))

        # Bucket requirements by kind so each kind can be checked -- and
        # retired -- as a unit.
        homogenous: Dict[str, List[Tuple[Node, Query]]] = {}

        for node, kind, name in requirements:
            if kind not in homogenous:
                homogenous[kind] = []

            homogenous[kind].append((node, name))

        kinds = ["pod", "url"]
        delay = 5
        start = time.time()
        limit = int(os.environ.get("KAT_REQ_LIMIT", "900"))

        print("Starting requirements check (limit %ds)... " % limit)

        holdouts = {}

        while time.time() - start < limit:
            for kind in kinds:
                if kind not in homogenous:
                    continue

                reqs = homogenous[kind]

                print("Checking %s %s requirements... " % (len(reqs), kind), end="")

                # print("\n")
                # for node, req in reqs:
                #     print(f"...{node.format('{self.path.k8s}')} - {self._req_str(kind, req)}")

                sys.stdout.flush()

                is_ready, _holdouts = self._ready(kind, reqs)

                if not is_ready:
                    # Remember the failures for the timeout report, back off
                    # (doubling the delay, capped at 10s), and retry.
                    holdouts[kind] = _holdouts
                    delay = int(min(delay * 2, 10))
                    print("sleeping %ss..." % delay)
                    sys.stdout.flush()
                    time.sleep(delay)
                else:
                    print("satisfied.")
                    sys.stdout.flush()
                    kinds.remove(kind)

                # Restart the scan over kinds after every single check; the
                # kinds.remove() above is only safe because of this break.
                break
            else:
                # The for loop found no kind left to check: all satisfied.
                return

        # Timed out: report whatever was still failing, per kind.
        print("requirements not satisfied in %s seconds:" % limit)

        for kind in kinds:
            _holdouts = holdouts.get(kind, [])

            if _holdouts:
                print(f"  {kind}:")

                for node, text in _holdouts:
                    print(f"    {node.path.k8s} ({text})")
                    node.log_kube_artifacts()

        assert False, "requirements not satisfied in %s seconds" % limit
1670
1671 def _ready(self, kind, requirements):
1672 fn = {
1673 "pod": self._ready_pod,
1674 "url": self._ready_url,
1675 }[kind]
1676
1677 return fn(kind, requirements)
1678
1679 def _ready_pod(self, _, requirements):
1680 pods = self._pods(self.scope)
1681 not_ready = []
1682
1683 for node, name in requirements:
1684 if not pods.get(name, False):
1685 not_ready.append((node, name))
1686
1687 if not_ready:
1688 print("%d not ready (%s), " % (len(not_ready), name), end="")
1689 return (False, not_ready)
1690
1691 return (True, None)
1692
1693 def _ready_url(self, _, requirements):
1694 queries = []
1695
1696 for node, q in requirements:
1697 q.insecure = True
1698 q.parent = node
1699 queries.append(q)
1700
1701 # print("URL Reqs:")
1702 # print("\n".join([ f'{q.parent.name}: {q.url}' for q in queries ]))
1703
1704 result = run_queries("reqcheck", queries)
1705
1706 not_ready = [r for r in result if r.status != r.query.expected]
1707
1708 if not_ready:
1709 first = not_ready[0]
1710 print(
1711 "%d not ready (%s: %s) "
1712 % (len(not_ready), first.query.url, first.status or first.error),
1713 end="",
1714 )
1715 return (
1716 False,
1717 [
1718 (x.query.parent, "%s -- %s" % (x.query.url, x.status or x.error))
1719 for x in not_ready
1720 ],
1721 )
1722 else:
1723 return (True, None)
1724
1725 def _pods(self, scope=None):
1726 scope_for_path = scope if scope else "global"
1727 label_for_scope = f"-l scope={scope}" if scope else ""
1728
1729 fname = f"/tmp/pods-{scope_for_path}.json"
1730 if not ShellCommand.run_with_retry(
1731 "Getting pods",
1732 f"tools/bin/kubectl get pod {label_for_scope} --all-namespaces -o json > {fname}",
1733 shell=True,
1734 retries=5,
1735 sleep_seconds=10,
1736 ):
1737 raise RuntimeError("Could not get pods")
1738
1739 with open(fname) as f:
1740 raw_pods = json.load(f)
1741
1742 pods = {}
1743
1744 for p in raw_pods["items"]:
1745 name = p["metadata"]["name"]
1746
1747 cstats = p["status"].get("containerStatuses", [])
1748
1749 all_ready = True
1750
1751 for status in cstats:
1752 ready = status.get("ready", False)
1753
1754 if not ready:
1755 all_ready = False
1756 # print(f'pod {name} is not ready: {status.get("state", "unknown state")}')
1757
1758 pods[name] = all_ready
1759
1760 return pods
1761
1762 def _query(self, selected) -> None:
1763 queries = []
1764
1765 for t in self.tests:
1766 t_name = t.format("{self.path.k8s}")
1767
1768 if t in selected:
1769 t.pending = []
1770 t.queried = []
1771 t.results = []
1772 else:
1773 continue
1774
1775 ambassador_id = getattr(t, "ambassador_id", None)
1776
1777 if ambassador_id and ambassador_id in self.ids_to_strip:
1778 # print(f"{t_name}: SKIP QUERY due to ambassador_id {ambassador_id}")
1779 continue
1780
1781 # print(f"{t_name}: INCLUDE QUERY")
1782 for q in t.queries():
1783 q.parent = t
1784 t.pending.append(q)
1785 queries.append(q)
1786
1787 phases = sorted(set([q.phase for q in queries]))
1788 first = True
1789
1790 for phase in phases:
1791 if not first:
1792 phase_delay = int(os.environ.get("KAT_PHASE_DELAY", 10))
1793 print(
1794 "Waiting for {} seconds before starting phase {}...".format(phase_delay, phase)
1795 )
1796 time.sleep(phase_delay)
1797
1798 first = False
1799
1800 phase_queries = [q for q in queries if q.phase == phase]
1801
1802 print("Querying %s urls in phase %s..." % (len(phase_queries), phase), end="")
1803 sys.stdout.flush()
1804
1805 results = run_queries(f"phase{phase}", phase_queries)
1806
1807 print(" done.")
1808
1809 for r in results:
1810 t = r.parent
1811 t.queried.append(r.query)
1812
1813 if getattr(t, "debug", False) or getattr(r.query, "debug", False):
1814 print(
1815 "%s result: %s"
1816 % (t.name, json.dumps(r.as_dict(), sort_keys=True, indent=4))
1817 )
1818
1819 t.results.append(r)
1820 t.pending.remove(r.query)
View as plain text