1# Copyright 2018 Datawire. All rights reserved.
2#
3# Licensed under the Apache License, Version 2.0 (the "License");
4# you may not use this file except in compliance with the License.
5# You may obtain a copy of the License at
6#
7# http://www.apache.org/licenses/LICENSE-2.0
8#
9# Unless required by applicable law or agreed to in writing, software
10# distributed under the License is distributed on an "AS IS" BASIS,
11# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12# See the License for the specific language governing permissions and
13# limitations under the License
14
15import logging
16import os
17import uuid
18from pathlib import Path
19from typing import Dict, List
20
21import click
22from kubernetes import client, config
23from kubernetes.client.rest import ApiException
24
25from ambassador.VERSION import Version
26
27__version__ = Version
28ambassador_id = os.getenv("AMBASSADOR_ID", "default")
29ambassador_namespace = os.environ.get("AMBASSADOR_NAMESPACE", "default")
30ambassador_single_namespace = bool("AMBASSADOR_SINGLE_NAMESPACE" in os.environ)
31ambassador_basedir = os.environ.get("AMBASSADOR_CONFIG_BASE_DIR", "/ambassador")
32
33logging.basicConfig(
34 level=logging.INFO, # if appDebug else logging.INFO,
35 format="%%(asctime)s kubewatch [%%(process)d T%%(threadName)s] %s %%(levelname)s: %%(message)s"
36 % __version__,
37 datefmt="%Y-%m-%d %H:%M:%S",
38)
39
40# logging.getLogger("datawire.scout").setLevel(logging.DEBUG)
41logger = logging.getLogger("kubewatch")
42logger.setLevel(logging.INFO)
43
44
45def kube_v1():
46 # Assume we got nothin'.
47 k8s_api = None
48
49 # XXX: is there a better way to check if we are inside a cluster or not?
50 if "KUBERNETES_SERVICE_HOST" in os.environ:
51 # If this goes horribly wrong and raises an exception (it shouldn't),
52 # we'll crash, and Kubernetes will kill the pod. That's probably not an
53 # unreasonable response.
54 config.load_incluster_config()
55 if "AMBASSADOR_VERIFY_SSL_FALSE" in os.environ:
56 configuration = client.Configuration()
57 configuration.verify_ssl = False
58 client.Configuration.set_default(configuration)
59 k8s_api = client.CoreV1Api()
60 else:
61 # Here, we might be running in docker, in which case we'll likely not
62 # have any Kube secrets, and that's OK.
63 try:
64 config.load_kube_config()
65 k8s_api = client.CoreV1Api()
66 except FileNotFoundError:
67 # Meh, just ride through.
68 logger.info("No K8s")
69
70 return k8s_api
71
72
73def hack_stored_versions(self):
74 return self._stored_versions
75
76
77def hack_stored_versions_setter(self, stored_versions):
78 self._stored_versions = stored_versions
79
80
81def hack_accepted_names(self):
82 return self._accepted_names
83
84
85def hack_accepted_names_setter(self, accepted_names):
86 self._accepted_names = accepted_names
87
88
89def hack_conditions(self):
90 return self._conditions
91
92
93def hack_conditions_setter(self, conditions):
94 self._conditions = conditions
95
96
97def check_crd_type(crd):
98 status = False
99
100 try:
101 client.apis.ApiextensionsV1beta1Api().read_custom_resource_definition(crd)
102 status = True
103 except client.rest.ApiException as e:
104 if e.status == 404:
105 logger.debug(f"CRD type definition not found for {crd}")
106 else:
107 logger.debug(f"CRD type definition unreadable for {crd}: {e.reason}")
108
109 return status
110
111
112def check_ingresses():
113 status = False
114
115 k8s_v1b1 = client.ExtensionsV1beta1Api(client.ApiClient(client.Configuration()))
116
117 if k8s_v1b1:
118 try:
119 if ambassador_single_namespace:
120 k8s_v1b1.list_namespaced_ingress(ambassador_namespace)
121 else:
122 k8s_v1b1.list_ingress_for_all_namespaces()
123 status = True
124 except ApiException as e:
125 logger.debug(f"Ingress check got {e.status}")
126
127 return status
128
129
130def check_ingress_classes():
131 status = False
132
133 api_client = client.ApiClient(client.Configuration())
134
135 if api_client:
136 try:
137 # Sadly, the Kubernetes Python library is not built with forward-compatibility in mind.
138 # Since IngressClass is a new resource, it is not discoverable through the python wrapper apis.
139 # Here, we extracted (read copy/pasted) a sample call from k8s_v1b1.list_ingress_for_all_namespaces()
140 # where we use the rest ApiClient to read ingressclasses.
141
142 path_params: Dict[str, str] = {}
143 query_params: List[str] = []
144 header_params: Dict[str, str] = {}
145
146 header_params["Accept"] = api_client.select_header_accept(
147 [
148 "application/json",
149 "application/yaml",
150 "application/vnd.kubernetes.protobuf",
151 "application/json;stream=watch",
152 "application/vnd.kubernetes.protobuf;stream=watch",
153 ]
154 )
155
156 header_params["Content-Type"] = api_client.select_header_content_type(["*/*"])
157
158 auth_settings = ["BearerToken"]
159
160 api_client.call_api(
161 "/apis/networking.k8s.io/v1beta1/ingressclasses",
162 "GET",
163 path_params,
164 query_params,
165 header_params,
166 auth_settings=auth_settings,
167 )
168 status = True
169 except ApiException as e:
170 logger.debug(f"IngressClass check got {e.status}")
171
172 return status
173
174
175def get_api_resources(group, version):
176 api_client = client.ApiClient(client.Configuration())
177
178 if api_client:
179 try:
180 # Sadly, the Kubernetes Python library supports a method equivalent to `kubectl api-versions`
181 # but nothing for `kubectl api-resources`.
182 # Here, we extracted (read copy/pasted) a sample call from ApisApi().get_api_versions()
183 # where we use the rest ApiClient to list api resources specific to a group.
184
185 path_params: Dict[str, str] = {}
186 query_params: List[str] = []
187 header_params: Dict[str, str] = {}
188
189 header_params["Accept"] = api_client.select_header_accept(["application/json"])
190
191 auth_settings = ["BearerToken"]
192
193 (data) = api_client.call_api(
194 f"/apis/{group}/{version}",
195 "GET",
196 path_params,
197 query_params,
198 header_params,
199 auth_settings=auth_settings,
200 response_type="V1APIResourceList",
201 )
202 return data[0]
203 except ApiException as e:
204 logger.error(f"get_api_resources {e.status}")
205
206 return None
207
208
209def touch_file(touchfile):
210 touchpath = Path(ambassador_basedir, touchfile)
211 try:
212 touchpath.touch()
213 except PermissionError as e:
214 logger.error(e)
215
216
217@click.command()
218@click.option("--debug", is_flag=True, help="Enable debugging")
219def main(debug):
220 if debug:
221 logger.setLevel(logging.DEBUG)
222
223 found = None
224 root_id = None
225
226 cluster_id = os.environ.get(
227 "AMBASSADOR_CLUSTER_ID", os.environ.get("AMBASSADOR_SCOUT_ID", None)
228 )
229 wanted = ambassador_namespace if ambassador_single_namespace else "default"
230
231 # Go ahead and try connecting to Kube.
232 v1 = kube_v1()
233
234 # OK. Do the cluster ID dance. If we already have one from the environment,
235 # we're good.
236
237 if cluster_id:
238 found = "environment"
239 else:
240 if v1:
241 # No ID from the environment, but we can try a lookup using Kube.
242 logger.debug("looking up ID for namespace %s" % wanted)
243
244 try:
245 ret = v1.read_namespace(wanted)
246 root_id = ret.metadata.uid
247 found = "namespace %s" % wanted
248 except ApiException as e:
249 # This means our namespace wasn't found?
250 logger.error("couldn't read namespace %s? %s" % (wanted, e))
251
252 if not root_id:
253 # OK, so we had a crack at this and something went wrong. Give up and hardcode
254 # something.
255 root_id = "00000000-0000-0000-0000-000000000000"
256 found = "hardcoded ID"
257
258 # One way or the other, we need to generate an ID here.
259 cluster_url = "d6e_id://%s/%s" % (root_id, ambassador_id)
260 logger.debug("cluster ID URL is %s" % cluster_url)
261
262 cluster_id = str(uuid.uuid5(uuid.NAMESPACE_URL, cluster_url)).lower()
263
264 # How about CRDs?
265
266 if v1:
267 # We were able to connect to Kube, so let's try to check for missing CRDs too.
268
269 required_crds = [
270 (
271 ".ambassador_ignore_crds",
272 "Main CRDs",
273 [
274 "authservices.getambassador.io",
275 "mappings.getambassador.io",
276 "modules.getambassador.io",
277 "ratelimitservices.getambassador.io",
278 "tcpmappings.getambassador.io",
279 "tlscontexts.getambassador.io",
280 "tracingservices.getambassador.io",
281 ],
282 ),
283 (
284 ".ambassador_ignore_crds_2",
285 "Resolver CRDs",
286 [
287 "consulresolvers.getambassador.io",
288 "kubernetesendpointresolvers.getambassador.io",
289 "kubernetesserviceresolvers.getambassador.io",
290 ],
291 ),
292 (".ambassador_ignore_crds_3", "Host CRDs", ["hosts.getambassador.io"]),
293 (".ambassador_ignore_crds_4", "LogService CRDs", ["logservices.getambassador.io"]),
294 (".ambassador_ignore_crds_5", "DevPortal CRDs", ["devportals.getambassador.io"]),
295 ]
296
297 # Flynn would say "Ew.", but we need to patch this till https://github.com/kubernetes-client/python/issues/376
298 # and https://github.com/kubernetes-client/gen/issues/52 are fixed \_(0.0)_/
299 client.models.V1beta1CustomResourceDefinitionStatus.accepted_names = property(
300 hack_accepted_names, hack_accepted_names_setter
301 )
302
303 client.models.V1beta1CustomResourceDefinitionStatus.conditions = property(
304 hack_conditions, hack_conditions_setter
305 )
306
307 client.models.V1beta1CustomResourceDefinitionStatus.stored_versions = property(
308 hack_stored_versions, hack_stored_versions_setter
309 )
310
311 known_api_resources = []
312 api_resources = get_api_resources("getambassador.io", "v2")
313 if api_resources:
314 known_api_resources = list(
315 map(lambda r: r.name + ".getambassador.io", api_resources.resources)
316 )
317
318 for touchfile, description, required in required_crds:
319 for crd in required:
320 if not crd in known_api_resources:
321 touch_file(touchfile)
322
323 logger.debug(
324 f"{description} are not available."
325 + " To enable CRD support, configure the Ambassador CRD type definitions and RBAC,"
326 + " then restart the Ambassador pod."
327 )
328 # logger.debug(f'touched {touchpath}')
329
330 if not check_ingress_classes():
331 touch_file(".ambassador_ignore_ingress_class")
332
333 logger.debug(
334 f"Ambassador does not have permission to read IngressClass resources."
335 + " To enable IngressClass support, configure RBAC to allow Ambassador to read IngressClass"
336 " resources, then restart the Ambassador pod."
337 )
338
339 if not check_ingresses():
340 touch_file(".ambassador_ignore_ingress")
341
342 logger.debug(
343 f"Ambassador does not have permission to read Ingress resources."
344 + " To enable Ingress support, configure RBAC to allow Ambassador to read Ingress resources,"
345 + " then restart the Ambassador pod."
346 )
347
348 # Check for our operator's CRD now
349 if check_crd_type("ambassadorinstallations.getambassador.io"):
350 touch_file(".ambassadorinstallations_ok")
351 logger.debug("ambassadorinstallations.getambassador.io CRD available")
352 else:
353 logger.debug("ambassadorinstallations.getambassador.io CRD not available")
354
355 # Have we been asked to do Knative support?
356 if os.environ.get("AMBASSADOR_KNATIVE_SUPPORT", "").lower() == "true":
357 # Yes. Check for their CRD types.
358
359 if check_crd_type("clusteringresses.networking.internal.knative.dev"):
360 touch_file(".knative_clusteringress_ok")
361 logger.debug("Knative clusteringresses available")
362 else:
363 logger.debug("Knative clusteringresses not available")
364
365 if check_crd_type("ingresses.networking.internal.knative.dev"):
366 touch_file(".knative_ingress_ok")
367 logger.debug("Knative ingresses available")
368 else:
369 logger.debug("Knative ingresses not available")
370 else:
371 # If we couldn't talk to Kube, log that, but broadly we'll expect our caller
372 # to DTRT around CRDs.
373
374 logger.debug("Kubernetes is not available, so not doing CRD check")
375
376 # Finally, spit out the cluster ID for our caller.
377 logger.debug("cluster ID is %s (from %s)" % (cluster_id, found))
378
379 print(cluster_id)
380
381
382if __name__ == "__main__":
383 main()
View as plain text