1#!/usr/bin/python
2
3import logging
4import os
5import sys
6from typing import TYPE_CHECKING, Dict, List, Optional, Tuple
7
8from ambassador import IR, Config
9from ambassador.fetch import ResourceFetcher
10from ambassador.utils import ParsedService as Service
11from ambassador.utils import SavedSecret, SecretHandler, SecretInfo, dump_json
12
13if TYPE_CHECKING:
14 from ambassador.ir.irresource import IRResource # pragma: no cover
15
# Default name of the AES Secret.
# (By default, we assume it will be in the same namespace as Ambassador.)
DEFAULT_AES_SECRET_NAME = "ambassador-edge-stack"

# Names of the env vars that can be used to override the AES Secret's
# name and namespace.
ENV_AES_SECRET_NAME = "AMBASSADOR_AES_SECRET_NAME"
ENV_AES_SECRET_NAMESPACE = "AMBASSADOR_AES_SECRET_NAMESPACE"

# Names of the env vars that can be used to override the Cloud Connect Token
# resource's name and namespace, plus the default resource name.
ENV_CLOUD_CONNECT_TOKEN_RESOURCE_NAME = "AGENT_CONFIG_RESOURCE_NAME"
ENV_CLOUD_CONNECT_TOKEN_RESOURCE_NAMESPACE = "AGENT_NAMESPACE"
DEFAULT_CLOUD_CONNECT_TOKEN_RESOURCE_NAME = "ambassador-agent-cloud-token"

# Fake SecretHandler for our fake IR, below.
32
33
class SecretRecorder(SecretHandler):
    """A SecretHandler that never actually loads anything: it simply records
    every (secret name, namespace) pair it is asked about, keyed in
    self.needed, and always reports success."""

    def __init__(self, logger: logging.Logger) -> None:
        super().__init__(logger, "-source_root-", "-cache_dir-", "0")
        self.needed: Dict[Tuple[str, str], SecretInfo] = {}

    # Record what was requested, and always return success.
    def load_secret(
        self, resource: "IRResource", secret_name: str, namespace: str
    ) -> Optional[SecretInfo]:
        self.logger.debug(
            "SecretRecorder (%s %s): load secret %s in namespace %s",
            resource.kind,
            resource.name,
            secret_name,
            namespace,
        )
        return self.record_secret(secret_name, namespace)

    def record_secret(self, secret_name: str, namespace: str) -> Optional[SecretInfo]:
        # setdefault keeps the first SecretInfo minted for a given
        # (name, namespace) key; later requests return the same object.
        return self.needed.setdefault(
            (secret_name, namespace),
            SecretInfo(
                secret_name, namespace, "needed-secret", "-crt-", "-key-", decode_b64=False
            ),
        )

    # Secrets that're still needed also get recorded.
    def still_needed(self, resource: "IRResource", secret_name: str, namespace: str) -> None:
        self.logger.debug(
            "SecretRecorder (%s %s): secret %s in namespace %s is still needed",
            resource.kind,
            resource.name,
            secret_name,
            namespace,
        )
        self.record_secret(secret_name, namespace)

    # Never cache anything.
    def cache_secret(self, resource: "IRResource", secret_info: SecretInfo):
        self.logger.debug(
            "SecretRecorder (%s %s): skipping cache step for secret %s in namespace %s",
            resource.kind,
            resource.name,
            secret_info.name,
            secret_info.namespace,
        )
        return SavedSecret(
            secret_info.name,
            secret_info.namespace,
            "-crt-path-",
            "-key-path-",
            "-user-path-",
            "-root-crt-path",
            {"tls.crt": "-crt-", "tls.key": "-key-", "user.key": "-user-"},
        )
84
85
86# XXX Sooooo there's some ugly stuff here.
87#
88# We need to do a little bit of the same work that the IR does for things like
89# managing Resolvers and parsing service names. However, we really don't want to
90# do all the work of instantiating an IR.
91#
92# The solution here is to subclass the IR and take advantage of the watch_only
93# initialization keyword, which skips the hard parts of building an IR.
94
95
class FakeIR(IR):
    """IR subclass that leans on the watch_only initialization keyword to
    skip the hard parts of building an IR, wiring in a SecretRecorder so
    secret lookups are recorded rather than performed."""

    def __init__(self, aconf: Config, logger=None) -> None:
        # If we're asked about a secret, record interest in that secret.
        self.secret_recorder = SecretRecorder(logger)

        # If we're asked about a file, it's good. (A named function rather
        # than an assigned lambda, per PEP 8.)
        def file_checker(path) -> bool:
            return True

        super().__init__(
            aconf,
            logger=logger,
            watch_only=True,
            secret_handler=self.secret_recorder,
            file_checker=file_checker,
        )

    # Don't bother actually saving resources that come up when working with
    # the faked modules.
    def save_resource(self, resource: "IRResource") -> "IRResource":
        return resource
116
117
class WatchHook:
    """Compute the set of Kubernetes and Consul watches implied by a watt
    snapshot of Ambassador configuration.

    The snapshot is parsed immediately on construction; the result lands in
    self.watchset and is also written to $AMBASSADOR_WATCH_DIR/watch.json
    (default /tmp).
    """

    def __init__(self, logger, yaml_stream) -> None:
        # Watch management

        self.logger = logger

        self.consul_watches: List[Dict[str, str]] = []
        self.kube_watches: List[Dict[str, str]] = []

        self.load_yaml(yaml_stream)

    def add_kube_watch(
        self,
        what: str,
        kind: str,
        namespace: Optional[str],
        field_selector: Optional[str] = None,
        label_selector: Optional[str] = None,
    ) -> None:
        """Record a single Kubernetes watch.

        'what' is used only for logging. A falsy namespace or selector is
        simply omitted from the watch spec (i.e. no restriction).
        """
        watch = {"kind": kind}

        if namespace:
            watch["namespace"] = namespace

        if field_selector:
            watch["field-selector"] = field_selector

        if label_selector:
            watch["label-selector"] = label_selector

        self.logger.debug(f"{what}: add watch {watch}")
        self.kube_watches.append(watch)

    def load_yaml(self, yaml_stream):
        """Parse a watt snapshot from yaml_stream and populate
        self.watchset (also saving it to disk -- see class docstring)."""
        self.aconf = Config()

        fetcher = ResourceFetcher(self.logger, self.aconf, watch_only=True)
        fetcher.parse_watt(yaml_stream.read())

        self.aconf.load_all(fetcher.sorted())

        # We can lift mappings straight from the aconf...
        mappings = self.aconf.get_config("mappings") or {}

        # ...but we need the fake IR to deal with resolvers and TLS contexts.
        self.fake = FakeIR(self.aconf, logger=self.logger)

        self.logger.debug("IR: %s" % self.fake.as_json())

        resolvers = self.fake.resolvers
        contexts = self.fake.tls_contexts

        self.logger.debug(f"mappings: {len(mappings)}")
        self.logger.debug(f"resolvers: {len(resolvers)}")
        self.logger.debug(f"contexts: {len(contexts)}")

        global_resolver = self.fake.ambassador_module.get("resolver", None)

        global_label_selector = os.environ.get("AMBASSADOR_LABEL_SELECTOR", "")
        self.logger.debug("label-selector: %s" % global_label_selector)

        # Always watch the Cloud Connect Token resource, as either a
        # ConfigMap or a Secret (we can't know which flavor is in use).
        cloud_connect_token_resource_name = os.getenv(
            ENV_CLOUD_CONNECT_TOKEN_RESOURCE_NAME, DEFAULT_CLOUD_CONNECT_TOKEN_RESOURCE_NAME
        )
        cloud_connect_token_resource_namespace = os.getenv(
            ENV_CLOUD_CONNECT_TOKEN_RESOURCE_NAMESPACE, Config.ambassador_namespace
        )
        self.logger.debug(
            f"cloud-connect-token: need configmap/secret {cloud_connect_token_resource_name}.{cloud_connect_token_resource_namespace}"
        )
        self.add_kube_watch(
            f"ConfigMap {cloud_connect_token_resource_name}",
            "configmap",
            namespace=cloud_connect_token_resource_namespace,
            field_selector=f"metadata.name={cloud_connect_token_resource_name}",
        )
        self.add_kube_watch(
            f"Secret {cloud_connect_token_resource_name}",
            "secret",
            namespace=cloud_connect_token_resource_namespace,
            field_selector=f"metadata.name={cloud_connect_token_resource_name}",
        )

        # watch the AES Secret if the edge stack is running
        if self.fake.edge_stack_allowed:
            aes_secret_name = os.getenv(ENV_AES_SECRET_NAME, DEFAULT_AES_SECRET_NAME)
            aes_secret_namespace = os.getenv(ENV_AES_SECRET_NAMESPACE, Config.ambassador_namespace)
            self.logger.debug(
                f"edge stack detected: need secret {aes_secret_name}.{aes_secret_namespace}"
            )
            self.add_kube_watch(
                f"Secret {aes_secret_name}",
                "secret",
                namespace=aes_secret_namespace,
                field_selector=f"metadata.name={aes_secret_name}",
            )

        # Walk hosts: each Host implies watches for services and secrets in
        # its namespace, filtered by its selector (plus the global one).
        for host in self.fake.get_hosts():
            sel = host.get("selector") or {}
            match_labels = sel.get("matchLabels") or {}

            label_selectors: List[str] = []

            if global_label_selector:
                label_selectors.append(global_label_selector)

            if match_labels:
                label_selectors += [f"{k}={v}" for k, v in match_labels.items()]

            label_selector = ",".join(label_selectors) if label_selectors else None

            for wanted_kind in ["service", "secret"]:
                self.add_kube_watch(
                    f"Host {host.name}", wanted_kind, host.namespace, label_selector=label_selector
                )

        for mname, mapping in mappings.items():
            res_name = mapping.get("resolver", None)
            res_source = "mapping"

            if not res_name:
                res_name = global_resolver
                res_source = "defaults"

            ctx_name = mapping.get("tls", None)

            self.logger.debug(
                f"Mapping {mname}: resolver {res_name} from {res_source}, service {mapping.service}, tls {ctx_name}"
            )

            if res_name:
                resolver = resolvers.get(res_name, None)
                self.logger.debug(f"-> resolver {resolver}")

                if resolver:
                    # BUGFIX: this previously used the module-global 'logger'
                    # (defined only under __main__), which would raise a
                    # NameError if this module were imported.
                    svc = Service(self.logger, mapping.service, ctx_name)

                    if resolver.kind == "ConsulResolver":
                        self.logger.debug(f"Mapping {mname} uses Consul resolver {res_name}")

                        # At the moment, we stuff the resolver's datacenter into the association
                        # ID for this watch. The ResourceFetcher relies on that.

                        assert resolver.datacenter
                        assert resolver.address
                        assert svc.hostname

                        self.consul_watches.append(
                            {
                                "id": resolver.datacenter,
                                "consul-address": resolver.address,
                                "datacenter": resolver.datacenter,
                                "service-name": svc.hostname,
                            }
                        )
                    elif resolver.kind == "KubernetesEndpointResolver":
                        hostname = svc.hostname
                        namespace = Config.ambassador_namespace

                        if not hostname:
                            # This is really kind of impossible.
                            self.logger.error(
                                f"KubernetesEndpointResolver {res_name} has no 'hostname'"
                            )
                            continue

                        if "." in hostname:
                            (hostname, namespace) = hostname.split(".", 2)[0:2]

                        self.logger.debug(
                            f"...kube endpoints: svc {svc.hostname} -> host {hostname} namespace {namespace}"
                        )

                        self.add_kube_watch(
                            "endpoint",
                            "endpoints",
                            namespace,
                            label_selector=global_label_selector,
                            field_selector=f"metadata.name={hostname}",
                        )

        # Watch every secret the fake IR recorded interest in. (Keys are
        # unused here, so iterate values only.)
        for secret_info in self.fake.secret_recorder.needed.values():
            self.logger.debug(f"need secret {secret_info.name}.{secret_info.namespace}")

            self.add_kube_watch(
                "needed secret",
                "secret",
                secret_info.namespace,
                label_selector=global_label_selector,
                field_selector=f"metadata.name={secret_info.name}",
            )

        if self.fake.edge_stack_allowed:
            # If the edge stack is allowed, make sure we watch for our fallback context.
            self.add_kube_watch(
                "Fallback TLSContext", "TLSContext", namespace=Config.ambassador_namespace
            )

        ambassador_basedir = os.environ.get("AMBASSADOR_CONFIG_BASE_DIR", "/ambassador")

        # Marker files in the base dir indicate which optional CRDs were
        # found to be available at startup.
        if os.path.exists(os.path.join(ambassador_basedir, ".ambassadorinstallations_ok")):
            self.add_kube_watch(
                "AmbassadorInstallations",
                "ambassadorinstallations.getambassador.io",
                Config.ambassador_namespace,
            )

        ambassador_knative_requested = (
            os.environ.get("AMBASSADOR_KNATIVE_SUPPORT", "-unset-").lower() == "true"
        )

        if ambassador_knative_requested:
            self.logger.debug("Looking for Knative support...")

            if os.path.exists(os.path.join(ambassador_basedir, ".knative_clusteringress_ok")):
                # Watch for clusteringresses.networking.internal.knative.dev in any namespace and with any labels.

                self.logger.debug("watching for clusteringresses.networking.internal.knative.dev")
                self.add_kube_watch(
                    "Knative clusteringresses",
                    "clusteringresses.networking.internal.knative.dev",
                    None,
                )

            if os.path.exists(os.path.join(ambassador_basedir, ".knative_ingress_ok")):
                # Watch for ingresses.networking.internal.knative.dev in any namespace and
                # with any labels.

                self.add_kube_watch(
                    "Knative ingresses", "ingresses.networking.internal.knative.dev", None
                )

        self.watchset: Dict[str, List[Dict[str, str]]] = {
            "kubernetes-watches": self.kube_watches,
            "consul-watches": self.consul_watches,
        }

        save_dir = os.environ.get("AMBASSADOR_WATCH_DIR", "/tmp")

        if save_dir:
            watchset = dump_json(self.watchset)
            with open(os.path.join(save_dir, "watch.json"), "w") as output:
                output.write(watchset)
362
363
#### Mainline.
#
# Usage: watch_hook.py [--debug] [path]
#
# Reads a watt snapshot from 'path' (or stdin) and writes the computed
# watchset, as JSON, to stdout.

if __name__ == "__main__":
    loglevel = logging.INFO

    args = sys.argv[1:]

    if args:
        if args[0] == "--debug":
            loglevel = logging.DEBUG
            args.pop(0)
        elif args[0].startswith("--"):
            raise Exception(f"Usage: {os.path.basename(sys.argv[0])} [--debug] [path]")

    logging.basicConfig(
        level=loglevel,
        format="%(asctime)s watch-hook %(levelname)s: %(message)s",
        datefmt="%Y-%m-%d %H:%M:%S",
    )

    # Keep the ambassador package's own logging pinned at INFO...
    alogger = logging.getLogger("ambassador")
    alogger.setLevel(logging.INFO)

    # ...while --debug controls our own verbosity.
    logger = logging.getLogger("watch_hook")
    logger.setLevel(loglevel)

    if args:
        # BUGFIX: use a context manager so the input file is closed; the
        # original opened it and never closed it.
        with open(args[0], "r") as yaml_stream:
            wh = WatchHook(logger, yaml_stream)
    else:
        wh = WatchHook(logger, sys.stdin)

    watchset = dump_json(wh.watchset)
    sys.stdout.write(watchset)