#!/usr/bin/python
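# watch_hook: given a watt snapshot on stdin (or in a file named on the
# command line), work out which Kubernetes and Consul resources need to be
# watched, then write the resulting watch set to stdout as JSON (see the
# mainline at the bottom of this file).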

import logging
import os
import sys
from typing import TYPE_CHECKING, Dict, List, Optional, Tuple

from ambassador import IR, Config
from ambassador.fetch import ResourceFetcher
from ambassador.utils import ParsedService as Service
from ambassador.utils import SavedSecret, SecretHandler, SecretInfo, dump_json

if TYPE_CHECKING:
    from ambassador.ir.irresource import IRResource  # pragma: no cover

16# default AES's Secret name
17# (by default, we assume it will be in the same namespace as Ambassador)
18DEFAULT_AES_SECRET_NAME = "ambassador-edge-stack"
19
20# the name of some env vars that can be used for overriding
21# the AES's Secret name/namespace
22ENV_AES_SECRET_NAME = "AMBASSADOR_AES_SECRET_NAME"
23ENV_AES_SECRET_NAMESPACE = "AMBASSADOR_AES_SECRET_NAMESPACE"
24
25# the name of some env vars that can be used for overriding
26# the Cloud Connect Token resource name/namespace
27ENV_CLOUD_CONNECT_TOKEN_RESOURCE_NAME = "AGENT_CONFIG_RESOURCE_NAME"
28ENV_CLOUD_CONNECT_TOKEN_RESOURCE_NAMESPACE = "AGENT_NAMESPACE"
29DEFAULT_CLOUD_CONNECT_TOKEN_RESOURCE_NAME = "ambassador-agent-cloud-token"

# Fake SecretHandler for our fake IR, below.


class SecretRecorder(SecretHandler):
    def __init__(self, logger: logging.Logger) -> None:
        super().__init__(logger, "-source_root-", "-cache_dir-", "0")
        self.needed: Dict[Tuple[str, str], SecretInfo] = {}

    # Record what was requested, and always return success.
    def load_secret(
        self, resource: "IRResource", secret_name: str, namespace: str
    ) -> Optional[SecretInfo]:
        self.logger.debug(
            "SecretRecorder (%s %s): load secret %s in namespace %s"
            % (resource.kind, resource.name, secret_name, namespace)
        )

        return self.record_secret(secret_name, namespace)

    def record_secret(self, secret_name: str, namespace: str) -> Optional[SecretInfo]:
        secret_key = (secret_name, namespace)

        if secret_key not in self.needed:
            self.needed[secret_key] = SecretInfo(
                secret_name, namespace, "needed-secret", "-crt-", "-key-", decode_b64=False
            )
        return self.needed[secret_key]

    # Secrets that are still needed also get recorded.
    def still_needed(self, resource: "IRResource", secret_name: str, namespace: str) -> None:
        self.logger.debug(
            "SecretRecorder (%s %s): secret %s in namespace %s is still needed"
            % (resource.kind, resource.name, secret_name, namespace)
        )

        self.record_secret(secret_name, namespace)

    # Never cache anything.
    def cache_secret(self, resource: "IRResource", secret_info: SecretInfo) -> SavedSecret:
        self.logger.debug(
            "SecretRecorder (%s %s): skipping cache step for secret %s in namespace %s"
            % (resource.kind, resource.name, secret_info.name, secret_info.namespace)
        )

        return SavedSecret(
            secret_info.name,
            secret_info.namespace,
            "-crt-path-",
            "-key-path-",
            "-user-path-",
            "-root-crt-path",
            {"tls.crt": "-crt-", "tls.key": "-key-", "user.key": "-user-"},
        )


# XXX Sooooo there's some ugly stuff here.
#
# We need to do a little bit of the same work that the IR does for things like
# managing Resolvers and parsing service names. However, we really don't want to
# do all the work of instantiating an IR.
#
# The solution here is to subclass the IR and take advantage of the watch_only
# initialization keyword, which skips the hard parts of building an IR.


class FakeIR(IR):
    def __init__(self, aconf: Config, logger=None) -> None:
        # If we're asked about a secret, record interest in that secret.
        self.secret_recorder = SecretRecorder(logger)

        # If we're asked about a file, it's good.
        file_checker = lambda path: True

        super().__init__(
            aconf,
            logger=logger,
            watch_only=True,
            secret_handler=self.secret_recorder,
            file_checker=file_checker,
        )

    # Don't bother actually saving resources that come up when working with
    # the faked modules.
    def save_resource(self, resource: "IRResource") -> "IRResource":
        return resource


class WatchHook:
    def __init__(self, logger, yaml_stream) -> None:
        # Watch management

        self.logger = logger

        self.consul_watches: List[Dict[str, str]] = []
        self.kube_watches: List[Dict[str, str]] = []

        self.load_yaml(yaml_stream)

    def add_kube_watch(
        self,
        what: str,
        kind: str,
        namespace: Optional[str],
        field_selector: Optional[str] = None,
        label_selector: Optional[str] = None,
    ) -> None:
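        # Build a single watch dict for the kube-watch list, e.g.
        #   {"kind": "secret", "namespace": "default",
        #    "field-selector": "metadata.name=some-secret"}
        # (the values shown are illustrative, not from a real cluster).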
        watch = {"kind": kind}

        if namespace:
            watch["namespace"] = namespace

        if field_selector:
            watch["field-selector"] = field_selector

        if label_selector:
            watch["label-selector"] = label_selector

        self.logger.debug(f"{what}: add watch {watch}")
        self.kube_watches.append(watch)

    def load_yaml(self, yaml_stream):
        self.aconf = Config()

        fetcher = ResourceFetcher(self.logger, self.aconf, watch_only=True)
        fetcher.parse_watt(yaml_stream.read())

        self.aconf.load_all(fetcher.sorted())

        # We can lift mappings straight from the aconf...
        mappings = self.aconf.get_config("mappings") or {}

        # ...but we need the fake IR to deal with resolvers and TLS contexts.
        self.fake = FakeIR(self.aconf, logger=self.logger)

        self.logger.debug("IR: %s" % self.fake.as_json())

        resolvers = self.fake.resolvers
        contexts = self.fake.tls_contexts

        self.logger.debug(f"mappings: {len(mappings)}")
        self.logger.debug(f"resolvers: {len(resolvers)}")
        self.logger.debug(f"contexts: {len(contexts)}")

        global_resolver = self.fake.ambassador_module.get("resolver", None)

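        # An optional global label selector (standard Kubernetes selector
        # syntax, e.g. "app=ambassador") gets folded into several of the
        # watches created below.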
        global_label_selector = os.environ.get("AMBASSADOR_LABEL_SELECTOR", "")
        self.logger.debug("label-selector: %s" % global_label_selector)

        cloud_connect_token_resource_name = os.getenv(
            ENV_CLOUD_CONNECT_TOKEN_RESOURCE_NAME, DEFAULT_CLOUD_CONNECT_TOKEN_RESOURCE_NAME
        )
        cloud_connect_token_resource_namespace = os.getenv(
            ENV_CLOUD_CONNECT_TOKEN_RESOURCE_NAMESPACE, Config.ambassador_namespace
        )
        self.logger.debug(
            f"cloud-connect-token: need configmap/secret {cloud_connect_token_resource_name}.{cloud_connect_token_resource_namespace}"
        )
        self.add_kube_watch(
            f"ConfigMap {cloud_connect_token_resource_name}",
            "configmap",
            namespace=cloud_connect_token_resource_namespace,
            field_selector=f"metadata.name={cloud_connect_token_resource_name}",
        )
        self.add_kube_watch(
            f"Secret {cloud_connect_token_resource_name}",
            "secret",
            namespace=cloud_connect_token_resource_namespace,
            field_selector=f"metadata.name={cloud_connect_token_resource_name}",
        )

        # Watch the AES Secret if the edge stack is running.
        if self.fake.edge_stack_allowed:
            aes_secret_name = os.getenv(ENV_AES_SECRET_NAME, DEFAULT_AES_SECRET_NAME)
            aes_secret_namespace = os.getenv(ENV_AES_SECRET_NAMESPACE, Config.ambassador_namespace)
            self.logger.debug(
                f"edge stack detected: need secret {aes_secret_name}.{aes_secret_namespace}"
            )
            self.add_kube_watch(
                f"Secret {aes_secret_name}",
                "secret",
                namespace=aes_secret_namespace,
                field_selector=f"metadata.name={aes_secret_name}",
            )

        # Walk hosts.
        for host in self.fake.get_hosts():
            sel = host.get("selector") or {}
            match_labels = sel.get("matchLabels") or {}

            label_selectors: List[str] = []

            if global_label_selector:
                label_selectors.append(global_label_selector)

            if match_labels:
                label_selectors += [f"{k}={v}" for k, v in match_labels.items()]

            label_selector = ",".join(label_selectors) if label_selectors else None

            for wanted_kind in ["service", "secret"]:
                self.add_kube_watch(
                    f"Host {host.name}", wanted_kind, host.namespace, label_selector=label_selector
                )

        for mname, mapping in mappings.items():
            res_name = mapping.get("resolver", None)
            res_source = "mapping"

            if not res_name:
                res_name = global_resolver
                res_source = "defaults"

            ctx_name = mapping.get("tls", None)

            self.logger.debug(
                f"Mapping {mname}: resolver {res_name} from {res_source}, service {mapping.service}, tls {ctx_name}"
            )

            if res_name:
                resolver = resolvers.get(res_name, None)
                self.logger.debug(f"-> resolver {resolver}")

                if resolver:
                    svc = Service(self.logger, mapping.service, ctx_name)

                    if resolver.kind == "ConsulResolver":
                        self.logger.debug(f"Mapping {mname} uses Consul resolver {res_name}")

                        # At the moment, we stuff the resolver's datacenter into the association
                        # ID for this watch. The ResourceFetcher relies on that.
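                        # The resulting watch looks like, e.g.:
                        #   {"id": "dc1", "consul-address": "consul:8500",
                        #    "datacenter": "dc1", "service-name": "my-service"}
                        # (values illustrative only).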

                        self.consul_watches.append(
                            {
                                "id": resolver.datacenter,
                                "consul-address": resolver.address,
                                "datacenter": resolver.datacenter,
                                "service-name": svc.hostname,
                            }
                        )
                    elif resolver.kind == "KubernetesEndpointResolver":
                        host = svc.hostname
                        namespace = Config.ambassador_namespace

                        if not host:
                            # This is really kind of impossible.
                            self.logger.error(
                                f"KubernetesEndpointResolver {res_name} has no 'hostname'"
                            )
                            continue

                        if "." in host:
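                            # A dotted service name carries its namespace:
                            # e.g. "my-svc.my-ns" (or "my-svc.my-ns.svc.cluster.local")
                            # splits into host "my-svc", namespace "my-ns".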
                            (host, namespace) = host.split(".", 2)[0:2]

                        self.logger.debug(
                            f"...kube endpoints: svc {svc.hostname} -> host {host} namespace {namespace}"
                        )

                        self.add_kube_watch(
                            "endpoint",
                            "endpoints",
                            namespace,
                            label_selector=global_label_selector,
                            field_selector=f"metadata.name={host}",
                        )

        for secret_info in self.fake.secret_recorder.needed.values():
            self.logger.debug(f"need secret {secret_info.name}.{secret_info.namespace}")

            self.add_kube_watch(
                "needed secret",
                "secret",
                secret_info.namespace,
                label_selector=global_label_selector,
                field_selector=f"metadata.name={secret_info.name}",
            )

        if self.fake.edge_stack_allowed:
            # If the edge stack is allowed, make sure we watch for our fallback context.
            self.add_kube_watch(
                "Fallback TLSContext", "TLSContext", namespace=Config.ambassador_namespace
            )

        ambassador_basedir = os.environ.get("AMBASSADOR_CONFIG_BASE_DIR", "/ambassador")

        if os.path.exists(os.path.join(ambassador_basedir, ".ambassadorinstallations_ok")):
            self.add_kube_watch(
                "AmbassadorInstallations",
                "ambassadorinstallations.getambassador.io",
                Config.ambassador_namespace,
            )

        ambassador_knative_requested = (
            os.environ.get("AMBASSADOR_KNATIVE_SUPPORT", "-unset-").lower() == "true"
        )

        if ambassador_knative_requested:
            self.logger.debug("Looking for Knative support...")

            if os.path.exists(os.path.join(ambassador_basedir, ".knative_clusteringress_ok")):
                # Watch for clusteringresses.networking.internal.knative.dev in any namespace
                # and with any labels.

                self.logger.debug("watching for clusteringresses.networking.internal.knative.dev")
                self.add_kube_watch(
                    "Knative clusteringresses",
                    "clusteringresses.networking.internal.knative.dev",
                    None,
                )

            if os.path.exists(os.path.join(ambassador_basedir, ".knative_ingress_ok")):
                # Watch for ingresses.networking.internal.knative.dev in any namespace and
                # with any labels.

                self.add_kube_watch(
                    "Knative ingresses", "ingresses.networking.internal.knative.dev", None
                )

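        # Assemble the final watch set. The mainline writes this to stdout;
        # we also save a copy below (by default to /tmp/watch.json).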
        self.watchset = {
            "kubernetes-watches": self.kube_watches,
            "consul-watches": self.consul_watches,
        }

        save_dir = os.environ.get("AMBASSADOR_WATCH_DIR", "/tmp")

        if save_dir:
            watchset = dump_json(self.watchset)
            with open(os.path.join(save_dir, "watch.json"), "w") as output:
                output.write(watchset)


#### Mainline.

if __name__ == "__main__":
    loglevel = logging.INFO

    args = sys.argv[1:]

    if args:
        if args[0] == "--debug":
            loglevel = logging.DEBUG
            args.pop(0)
        elif args[0].startswith("--"):
            raise Exception(f"Usage: {os.path.basename(sys.argv[0])} [--debug] [path]")

    logging.basicConfig(
        level=loglevel,
        format="%(asctime)s watch-hook %(levelname)s: %(message)s",
        datefmt="%Y-%m-%d %H:%M:%S",
    )

    alogger = logging.getLogger("ambassador")
    alogger.setLevel(logging.INFO)

    logger = logging.getLogger("watch_hook")
    logger.setLevel(loglevel)

    yaml_stream = sys.stdin

    if args:
        yaml_stream = open(args[0], "r")

    wh = WatchHook(logger, yaml_stream)

    watchset = dump_json(wh.watchset)
    sys.stdout.write(watchset)
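
# Example (hypothetical filenames): feed a saved watt snapshot through the
# hook and capture the watch set:
#
#     python watch_hook.py --debug snapshot.yaml > watchset.json
#
# With no path argument, the snapshot is read from stdin instead.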