1#!/usr/bin/python
2
3# Copyright 2019-2020 Datawire. All rights reserved.
4#
5# Licensed under the Apache License, Version 2.0 (the "License");
6# you may not use this file except in compliance with the License.
7# You may obtain a copy of the License at
8#
9# http://www.apache.org/licenses/LICENSE-2.0
10#
11# Unless required by applicable law or agreed to in writing, software
12# distributed under the License is distributed on an "AS IS" BASIS,
13# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14# See the License for the specific language governing permissions and
15# limitations under the License
16
17########
18# This is a debugging and testing tool that simulates the configuration
19# cycle of watt -> watch_hook -> Ambassador, given a set of Kubernetes
20# inputs. It's the basis of KAT local mode, and also a primary development
21# tool at Datawire.
22########
23
24import difflib
25import errno
26import filecmp
27import functools
28import io
29import logging
30import os
31import shutil
32import sys
33from typing import TYPE_CHECKING, Any, Dict, List, Optional, Tuple
34
35import click
36
37from watch_hook import WatchHook
38
# Use these instead of click.option: click_option passes show_default=True to
# every option it builds, and click_option_no_default passes show_default=False.
click_option = functools.partial(click.option, show_default=True)
click_option_no_default = functools.partial(click.option, show_default=False)
42
43from ambassador import IR, Config, Diagnostics, EnvoyConfig
44from ambassador.fetch import ResourceFetcher
45from ambassador.utils import SecretHandler, SecretInfo, dump_json, parse_bool, parse_yaml
46
47if TYPE_CHECKING:
48 from ambassador.ir import IRResource
49
# A single Kubernetes resource, represented as a plain dictionary.
KubeResource = Dict[str, Any]
# A list of Kubernetes resources (e.g. a parsed manifest).
KubeList = List[KubeResource]
# Lists of Kubernetes resources grouped under string keys; Mockery.load keys
# them by the kind of the watch that matched.
WattDict = Dict[str, KubeList]
53
54
class LabelSpec:
    """One ``key=value`` term of a label selector.

    The serialization is split on the first ``=`` only, so the value part may
    itself contain ``=`` characters.
    """

    def __init__(self, serialization: str) -> None:
        """Parse ``serialization``; raise ``Exception`` if it contains no ``=``."""
        key, separator, value = serialization.partition("=")

        if not separator:
            raise Exception(f"label serialization must be key=value, not {serialization}")

        self.key = key
        self.value = value

    def __str__(self) -> str:
        """Return the term in its ``key=value`` form."""
        return "=".join((self.key, self.value))

    def match(self, labels: Dict[str, str]) -> bool:
        """Return True when ``labels`` maps this term's key to exactly its value."""
        found = labels.get(self.key)

        return bool(found == self.value)
70
71
class FieldSpec:
    """One ``dotted.path=value`` term of a field selector.

    The part before the first ``=`` is split on ``.`` into the sequence of
    dictionary keys to walk; the part after it is the string the value found
    at the end of that walk must equal.
    """

    def __init__(self, serialization: str) -> None:
        """Parse ``serialization``; raise ``Exception`` if it contains no ``=``."""
        if "=" not in serialization:
            raise Exception(f"field serialization must be key=value, not {serialization}")

        key, value = serialization.split("=", 1)

        self.elements = key.split(".")
        self.value = value

    def __str__(self) -> str:
        """Return the term in its ``dotted.path=value`` form."""
        return f"{'.'.join(self.elements)}={self.value}"

    def match(self, resource: Dict[str, Any]) -> bool:
        """Return True when walking ``self.elements`` through ``resource`` ends at ``self.value``.

        A missing key, or anything other than a dictionary found where the
        path still needs to descend, is a mismatch rather than an error.
        """
        node: Any = resource

        for el in self.elements:
            # Covers both "previous key was absent" (node is None) and "the
            # path runs through a scalar or list": neither has a .get to call.
            if not isinstance(node, dict):
                return False

            node = node.get(el)

        return bool(node == self.value)
95
96
class WatchResult:
    """Plain record pairing a resource kind with the ID of a watch."""

    def __init__(self, kind: str, watch_id: str) -> None:
        """Store ``kind`` and ``watch_id`` unchanged as attributes."""
        self.kind, self.watch_id = kind, watch_id
101
102
class WatchSpec:
    """A watch on one resource kind, optionally narrowed by namespace and selectors.

    ``labels`` and ``fields`` arrive as comma-separated ``key=value`` strings
    and are parsed into ``LabelSpec`` / ``FieldSpec`` lists. ``str(spec)`` is
    the watch's ID.
    """

    def __init__(
        self,
        logger: logging.Logger,
        kind: str,
        namespace: Optional[str],
        labels: Optional[str],
        fields: Optional[str] = None,
        bootstrap: Optional[bool] = False,
    ):
        self.logger = logger
        self.kind = kind
        self.namespace = namespace
        self.bootstrap = bootstrap

        # Kinds are compared lowercased; a watch on "ingresses" also accepts
        # objects whose kind is "ingress".
        self.match_kinds = {self.kind.lower(): True}

        if self.kind == "ingresses":
            self.match_kinds["ingress"] = True

        # An empty or missing selector string leaves the attribute as None.
        self.labels: Optional[List[LabelSpec]] = (
            [LabelSpec(term) for term in labels.split(",")] if labels else None
        )
        self.fields: Optional[List[FieldSpec]] = (
            [FieldSpec(term) for term in fields.split(",")] if fields else None
        )

    def _labelstr(self) -> str:
        """Return the label terms joined with commas ("" when there are none)."""
        return ",".join(map(str, self.labels or []))

    def _fieldstr(self) -> str:
        """Return the field terms joined with commas ("" when there are none)."""
        return ",".join(map(str, self.fields or []))

    @staticmethod
    def _star(s: Optional[str]) -> str:
        """Return ``s``, or "*" when ``s`` is empty or None."""
        return s or "*"

    def _selector_str(self) -> str:
        """Return ``kind|namespace|fields|labels`` with "*" for each unset part."""
        parts = (
            self.kind,
            self._star(self.namespace),
            self._star(self._fieldstr()),
            self._star(self._labelstr()),
        )

        return "|".join(parts)

    def __repr__(self) -> str:
        suffix = " (bootstrap)" if self.bootstrap else ""

        return f"<{self._selector_str()}{suffix}>"

    def __str__(self) -> str:
        # Bootstrap watches are identified by kind alone.
        if self.bootstrap:
            return f"{self.kind}|bootstrap"

        return self._selector_str()

    def match(self, obj: KubeResource) -> Optional[WatchResult]:
        """Return a ``WatchResult`` if ``obj`` satisfies this watch, else None.

        Objects without a kind or a name are logged as errors and rejected.
        An object with no namespace is treated as being in "default".
        """
        metadata: Dict[str, Any] = obj.get("metadata") or {}
        kind: Optional[str] = obj.get("kind") or None
        name: Optional[str] = metadata.get("name") or None

        if not (kind and name):
            self.logger.error(f"K8s object requires kind and name: {obj}")
            return None

        # OK. Does the kind match up?
        if kind.lower() not in self.match_kinds:
            return None

        # How about namespace (only checked when this watch names one)?
        obj_namespace: str = metadata.get("namespace") or "default"

        if self.namespace and obj_namespace != self.namespace:
            return None

        # Every label term, then every field term, has to match.
        obj_labels: Dict[str, str] = metadata.get("labels") or {}

        if not all(term.match(obj_labels) for term in self.labels or []):
            return None

        if not all(term.match(obj) for term in self.fields or []):
            return None

        # Woo, it worked!
        match_kind_str = ",".join(sorted(self.match_kinds))
        self.logger.debug(f"match {self} - {match_kind_str}: good!")

        return WatchResult(kind=self.kind, watch_id=str(self))
198
199
class Mockery:
    """Simulates the watch side of the configuration cycle for a static manifest.

    Holds a set of ``WatchSpec`` objects keyed by their string ID. ``load``
    filters a manifest through those watches into a JSON snapshot, and
    ``run_hook`` feeds that snapshot to ``WatchHook`` and adds any watches it
    asks for.
    """

    def __init__(
        self,
        logger: logging.Logger,
        debug: bool,
        sources: List[str],
        labels: Optional[str],
        namespace: Optional[str],
        watch: str,
    ) -> None:
        """Register one bootstrap watch per entry in ``sources``.

        ``labels`` is a comma-separated selector string applied to every
        bootstrap watch. ``debug`` and ``watch`` are stored but not otherwise
        used by this class. Exits the process with status 1 if a bootstrap
        watch turns out to be a duplicate (e.g. a repeated source).
        """
        self.logger = logger
        self.debug = debug
        self.sources = sources
        self.namespace = namespace
        self.watch = watch

        self.watch_specs: Dict[str, WatchSpec] = {}

        # JSON snapshot produced by load(); empty until load() has run.
        self.snapshot: str = ""

        # Set up bootstrap sources.
        for source in sources:
            bootstrap_watch = WatchSpec(
                logger=self.logger,
                kind=source,
                namespace=self.namespace,
                labels=labels,
                bootstrap=True,
            )

            if not self.maybe_add(bootstrap_watch):
                self.logger.error(f"how is a bootstrap watch not new? {bootstrap_watch}")
                sys.exit(1)

    def maybe_add(self, w: WatchSpec) -> bool:
        """Add ``w`` unless a watch with the same ID exists; return True if it was added."""
        key = str(w)

        if key in self.watch_specs:
            return False

        self.watch_specs[key] = w
        return True

    def load(self, manifest: KubeList) -> WattDict:
        """Filter ``manifest`` through the current watches and store the snapshot.

        Returns the matched objects grouped by the kind of the watch that
        matched them, and sets ``self.snapshot`` to the JSON document
        ``{"Consul": {}, "Kubernetes": <that grouping>}``. Objects without a
        name are skipped.
        """
        # kind -> (namespace, name) -> object. Keying on the namespace as well
        # as the name keeps same-named objects in different namespaces apart,
        # while still collapsing one object matched by several watches.
        collected: Dict[str, Dict[Tuple[str, str], KubeResource]] = {}

        self.logger.info("LOADING:")

        for spec in self.watch_specs.values():
            self.logger.debug(f"{repr(spec)}")

        for obj in manifest:
            metadata = obj.get("metadata") or {}
            name = metadata.get("name")

            if not name:
                self.logger.debug(f"skipping unnamed object {obj}")
                continue

            # Same namespace defaulting as WatchSpec.match.
            ident = (metadata.get("namespace") or "default", name)

            for w in self.watch_specs.values():
                m = w.match(obj)

                if m:
                    # First find wins; later finds of the same object are duplicates.
                    collected.setdefault(m.kind, {}).setdefault(ident, obj)

        # Once that's all done, flatten everything.
        watt_k8s: WattDict = {kind: list(by_id.values()) for kind, by_id in collected.items()}

        self.snapshot = dump_json({"Consul": {}, "Kubernetes": watt_k8s}, pretty=True)

        return watt_k8s

    def run_hook(self) -> Tuple[bool, bool]:
        """Run ``WatchHook`` over the current snapshot and register the watches it requests.

        Returns ``(hook_ok, any_changes)``: ``hook_ok`` is always True, and
        ``any_changes`` is True when at least one new watch was added.

        Raises ``RuntimeError`` if ``load`` has not produced a snapshot yet.
        """
        if not self.snapshot:
            raise RuntimeError("Mockery.load() must be called before Mockery.run_hook()")

        self.logger.info("RUNNING HOOK")

        yaml_stream = io.StringIO(self.snapshot)

        wh = WatchHook(self.logger, yaml_stream)

        any_changes = False

        if wh.watchset:
            for w in wh.watchset.get("kubernetes-watches") or []:
                potential = WatchSpec(
                    logger=self.logger,
                    kind=w["kind"],
                    namespace=w.get("namespace"),
                    labels=w.get("label-selector"),
                    fields=w.get("field-selector"),
                    bootstrap=False,
                )

                if self.maybe_add(potential):
                    any_changes = True

        return True, any_changes
304
305
class MockSecretHandler(SecretHandler):
    """Secret handler that loads nothing except a fake Edge Stack fallback cert."""

    def load_secret(
        self, resource: "IRResource", secret_name: str, namespace: str
    ) -> Optional[SecretInfo]:
        """Return a mocked fallback ``SecretInfo``, or None for every other secret.

        The mock is only produced when we look like Edge Stack and the request
        is for ``fallback-self-signed-cert`` in ``Config.ambassador_namespace``.
        """
        # The EDGE_STACK environment variable can declare that we're in Edge
        # Stack, but the marker file on its own remains sufficient so that a
        # container which only has the file keeps working.
        in_edge_stack = parse_bool(os.environ.get("EDGE_STACK", "false")) or os.path.exists(
            "/ambassador/.edge_stack"
        )

        if (
            not in_edge_stack
            or secret_name != "fallback-self-signed-cert"
            or namespace != Config.ambassador_namespace
        ):
            self.logger.debug(f"MockSecretHandler: cannot load {secret_name}.{namespace}")
            return None

        # This is Edge Stack. Force the fake TLS secret.
        self.logger.info(f"MockSecretHandler: mocking fallback secret {secret_name}.{namespace}")

        return SecretInfo(
            secret_name,
            namespace,
            "mocked-fallback-secret",
            "-fallback-cert-",
            "-fallback-key-",
            decode_b64=False,
        )
335
336
@click.command(
    help="Mock the watt/watch_hook/diagd cycle to generate an IR from a Kubernetes YAML manifest."
)
@click_option("--debug/--no-debug", default=True, help="enable debugging")
@click_option(
    "-n", "--namespace", type=click.STRING, help="namespace to watch [default: all namespaces])"
)
@click_option(
    "-s",
    "--source",
    type=click.STRING,
    multiple=True,
    help="define initial source types [default: all Ambassador resources]",
)
@click_option("--labels", type=click.STRING, multiple=True, help="define initial label selector")
@click_option(
    "--force-pod-labels/--no-force-pod-labels",
    default=True,
    help="copy initial label selector to /tmp/ambassador-pod-info/labels",
)
@click_option(
    "--kat-name", "--kat", type=click.STRING, help="emulate a running KAT test with this name"
)
@click_option(
    "-w",
    "--watch",
    type=click.STRING,
    default="python /ambassador/watch_hook.py",
    help="define a watch hook",
)
@click_option("--diff-path", "--diff", type=click.STRING, help="directory to diff against")
@click_option(
    "--include-ir/--no-include-ir",
    "--ir/--no-ir",
    default=False,
    help="include IR in diff when using --diff-path",
)
@click_option(
    "--include-aconf/--no-include-aconf",
    "--aconf/--no-aconf",
    default=False,
    help="include AConf in diff when using --diff-path",
)
@click_option("--update/--no-update", default=False, help="update the diff path when finished")
@click.argument("k8s-yaml-paths", nargs=-1)
def main(
    k8s_yaml_paths: List[str],
    debug: bool,
    force_pod_labels: bool,
    update: bool,
    source: List[str],
    labels: List[str],
    namespace: Optional[str],
    watch: str,
    include_ir: bool,
    include_aconf: bool,
    diff_path: Optional[str] = None,
    kat_name: Optional[str] = None,
) -> None:
    """Run the mock configuration cycle over the given Kubernetes YAML files.

    Repeats Mockery.load / Mockery.run_hook until the hook adds no new watches
    (giving up with exit status 1 after 10 iterations), then builds the
    Ambassador config, IR, Envoy config and diagnostics from the stable
    snapshot and writes them as JSON under /tmp/ambassador/snapshots.

    With ``diff_path`` set, the generated files are either compared against
    the gold files in that directory (printing unified diffs and exiting with
    status 1 on any difference) or, when ``update`` is true, moved over them.
    """
    snapshot_dir = "/tmp/ambassador/snapshots"
    pod_info_dir = "/tmp/ambassador-pod-info"

    loglevel = logging.DEBUG if debug else logging.INFO

    logging.basicConfig(
        level=loglevel,
        format="%(asctime)s mockery %(levelname)s: %(message)s",
        datefmt="%Y-%m-%d %H:%M:%S",
    )

    logger = logging.getLogger("mockery")

    logger.debug(f"reading from {k8s_yaml_paths}")

    if not source:
        source = [
            "Host",
            "service",
            "ingresses",
            "AuthService",
            "Listener",
            "LogService",
            "Mapping",
            "Module",
            "RateLimitService",
            "TCPMapping",
            "TLSContext",
            "TracingService",
            "ConsulResolver",
            "KubernetesEndpointResolver",
            "KubernetesServiceResolver",
        ]

    if namespace:
        os.environ["AMBASSADOR_NAMESPACE"] = namespace

    # Make labels a list, instead of a tuple.
    labels = list(labels)

    # A dict is used as an insertion-ordered set of "key=value" label strings.
    labels_to_force = {label: True for label in labels or []}

    if kat_name:
        logger.debug(f"KAT name {kat_name}")

        # First set up some labels to force.
        labels_to_force["scope=AmbassadorTest"] = True
        labels_to_force[f"service={kat_name}"] = True

        kat_amb_id_label = f"kat-ambassador-id={kat_name}"

        if kat_amb_id_label not in labels_to_force:
            labels_to_force[kat_amb_id_label] = True
            labels.append(kat_amb_id_label)

        os.environ["AMBASSADOR_ID"] = kat_name
        os.environ["AMBASSADOR_LABEL_SELECTOR"] = kat_amb_id_label

        # Forcibly override the cached ambassador_id.
        Config.ambassador_id = kat_name

    logger.debug(f"namespace {namespace or '*'}")
    logger.debug(f"labels to watch {', '.join(labels)}")
    logger.debug(f"labels to force {', '.join(sorted(labels_to_force.keys()))}")
    logger.debug(f"watch hook {watch}")
    logger.debug(f"sources {', '.join(source)}")

    for key in sorted(os.environ.keys()):
        if key.startswith("AMBASSADOR"):
            logger.debug(f"${key}={os.environ[key]}")

    if force_pod_labels:
        os.makedirs(pod_info_dir, exist_ok=True)

        with open(os.path.join(pod_info_dir, "labels"), "w", encoding="utf-8") as outfile:
            outfile.writelines(f"{label}\n" for label in labels_to_force)

    # Pull in the YAML. The files are concatenated as-is, so each one is
    # closed as soon as it has been read.
    yaml_chunks = []

    for path in k8s_yaml_paths:
        with open(path, "r", encoding="utf-8") as infile:
            yaml_chunks.append(infile.read())

    manifest = parse_yaml("".join(yaml_chunks))

    w = Mockery(logger, debug, source, ",".join(labels), namespace, watch)

    iteration = 0

    while True:
        iteration += 1

        if iteration > 10:
            print("!!!! Not stable after 10 iterations, failing")
            logger.error("Not stable after 10 iterations, failing")
            sys.exit(1)

        logger.info(f"======== START ITERATION {iteration}")

        w.load(manifest)

        logger.info(f"WATT_K8S: {w.snapshot}")

        hook_ok, any_changes = w.run_hook()

        if not hook_ok:
            raise Exception("hook failed")

        if any_changes:
            logger.info(f"======== END ITERATION {iteration}: watches changed!")
        else:
            logger.info(f"======== END ITERATION {iteration}: stable!")
            break

    # Once here, we should be good to go.
    os.makedirs(snapshot_dir, exist_ok=True)

    scc = MockSecretHandler(logger, "mockery", snapshot_dir, f"v{iteration}")

    aconf = Config()

    logger.debug(f"Config.ambassador_id {Config.ambassador_id}")
    logger.debug(f"Config.ambassador_namespace {Config.ambassador_namespace}")

    logger.info(f"STABLE WATT_K8S: {w.snapshot}")

    fetcher = ResourceFetcher(logger, aconf)
    fetcher.parse_watt(w.snapshot)
    aconf.load_all(fetcher.sorted())

    aconf_path = os.path.join(snapshot_dir, "aconf.json")
    ir_path = os.path.join(snapshot_dir, "ir.json")
    econf_path = os.path.join(snapshot_dir, "econf.json")
    bootstrap_path = os.path.join(snapshot_dir, "bootstrap.json")

    with open(aconf_path, "w", encoding="utf-8") as outfile:
        outfile.write(aconf.as_json())

    ir = IR(aconf, secret_handler=scc)

    with open(ir_path, "w", encoding="utf-8") as outfile:
        outfile.write(ir.as_json())

    econf = EnvoyConfig.generate(ir, Config.envoy_api_version)
    bootstrap_config, ads_config, clustermap = econf.split_config()

    ads_config.pop("@type", None)
    ads_json = dump_json(ads_config, pretty=True)

    # The ADS config is written twice: once under a fixed name and once under
    # a name that includes the ambassador ID.
    for path in (
        econf_path,
        os.path.join(snapshot_dir, f"econf-{Config.ambassador_id}.json"),
    ):
        with open(path, "w", encoding="utf-8") as outfile:
            outfile.write(ads_json)

    with open(bootstrap_path, "w", encoding="utf-8") as outfile:
        outfile.write(dump_json(bootstrap_config, pretty=True))

    diag = Diagnostics(ir, econf)

    with open(os.path.join(snapshot_dir, "diag.json"), "w", encoding="utf-8") as outfile:
        outfile.write(dump_json(diag.as_dict(), pretty=True))

    if diff_path:
        diffs = False

        # (gold file, freshly generated file) pairs.
        pairs_to_check = [
            (os.path.join(diff_path, "snapshots", "econf.json"), econf_path),
            (os.path.join(diff_path, "bootstrap-ads.json"), bootstrap_path),
        ]

        if include_ir:
            pairs_to_check.append((os.path.join(diff_path, "snapshots", "ir.json"), ir_path))

        if include_aconf:
            pairs_to_check.append(
                (os.path.join(diff_path, "snapshots", "aconf.json"), aconf_path)
            )

        for gold_path, check_path in pairs_to_check:
            if update:
                logger.info(f"mv {check_path} {gold_path}")
                shutil.move(check_path, gold_path)
            elif not filecmp.cmp(gold_path, check_path):
                diffs = True

                with open(gold_path, "r", encoding="utf-8") as gold_file:
                    gold_lines = gold_file.readlines()

                with open(check_path, "r", encoding="utf-8") as check_file:
                    check_lines = check_file.readlines()

                sys.stdout.writelines(
                    difflib.unified_diff(
                        gold_lines, check_lines, fromfile=gold_path, tofile=check_path
                    )
                )

        if diffs:
            sys.exit(1)
602
603
# Run the click command only when this file is executed as a script.
if __name__ == "__main__":
    main()
View as plain text