1import logging
2import os
3import sys
4
5import pytest
6
7from ambassador.utils import NullSecretHandler, parse_bool
8
# Configure root logging for the test run: INFO level, timestamped records
# tagged with the literal word "test".
# NOTE(review): this call sits above the remaining `ambassador` imports,
# presumably so the format is installed before those modules log -- confirm
# before reordering.
logging.basicConfig(
    level=logging.INFO,
    format="%(asctime)s test %(levelname)s: %(message)s",
    datefmt="%Y-%m-%d %H:%M:%S",
)

# Logger handed to ResourceManager and ResourceFetcher by the tests below.
logger = logging.getLogger("ambassador")
16
17from ambassador import Config
18from ambassador.fetch import ResourceFetcher
19from ambassador.fetch.ambassador import AmbassadorProcessor
20from ambassador.fetch.dependency import (
21 DependencyManager,
22 IngressClassesDependency,
23 SecretDependency,
24 ServiceDependency,
25)
26from ambassador.fetch.k8sobject import (
27 KubernetesGVK,
28 KubernetesObject,
29 KubernetesObjectKey,
30 KubernetesObjectScope,
31)
32from ambassador.fetch.k8sprocessor import (
33 AggregateKubernetesProcessor,
34 CountingKubernetesProcessor,
35 DeduplicatingKubernetesProcessor,
36 KubernetesProcessor,
37)
38from ambassador.fetch.location import LocationManager
39from ambassador.fetch.resource import NormalizedResource, ResourceManager
40from ambassador.utils import parse_yaml
41
42
def k8s_object_from_yaml(yaml: str) -> KubernetesObject:
    """Wrap the first item that ``parse_yaml`` returns for *yaml* in a ``KubernetesObject``.

    Any further items in the parsed result are ignored.
    """
    parsed = parse_yaml(yaml)
    first_item = parsed[0]
    return KubernetesObject(first_item)
45
46
# Namespaced fixture: a Knative networking Ingress in namespace "test".
# It carries two annotations (one of them an ambassador-id of "webhook"),
# three labels, generation 2, and a status block with observedGeneration 2.
# Parsed once, at import time, and shared by several test classes below.
valid_knative_ingress = k8s_object_from_yaml(
    """
---
apiVersion: networking.internal.knative.dev/v1alpha1
kind: Ingress
metadata:
  annotations:
    getambassador.io/ambassador-id: webhook
    networking.knative.dev/ingress.class: ambassador.ingress.networking.knative.dev
  generation: 2
  labels:
    serving.knative.dev/route: helloworld-go
    serving.knative.dev/routeNamespace: test
    serving.knative.dev/service: helloworld-go
  name: helloworld-go
  namespace: test
spec:
  rules:
  - hosts:
    - helloworld-go.test.svc.cluster.local
    http:
      paths:
      - retries:
          attempts: 3
          perTryTimeout: 10m0s
        splits:
        - appendHeaders:
            Knative-Serving-Namespace: test
            Knative-Serving-Revision: helloworld-go-qf94m
          percent: 100
          serviceName: helloworld-go-qf94m
          serviceNamespace: test
          servicePort: 80
        timeout: 10m0s
    visibility: ClusterLocal
  visibility: ExternalIP
status:
  loadBalancer:
    ingress:
    - domainInternal: ambassador.ambassador-webhook.svc.cluster.local
  observedGeneration: 2
"""
)
90
# Fixture without a metadata.namespace: an IngressClass named "external-lb".
# TestKubernetesObject.test_valid_cluster_scoped expects it to be treated as
# cluster-scoped.
valid_ingress_class = k8s_object_from_yaml(
    """
apiVersion: networking.k8s.io/v1
kind: IngressClass
metadata:
  name: external-lb
spec:
  controller: getambassador.io/ingress-controller
"""
)
101
# Fixture: a getambassador.io/v3alpha1 Mapping named "test" in namespace
# "default", routing prefix /test/ to service test.default.
valid_mapping = k8s_object_from_yaml(
    """
---
apiVersion: getambassador.io/v3alpha1
kind: Mapping
metadata:
  name: test
  namespace: default
spec:
  hostname: "*"
  prefix: /test/
  service: test.default
"""
)
116
# Fixture used by TestAmbassadorProcessor.test_mapping_v1.
# NOTE(review): despite the `_v1` suffix, this manifest declares
# apiVersion getambassador.io/v3alpha1 and is otherwise identical to
# `valid_mapping` -- confirm whether an older apiVersion was intended.
valid_mapping_v1 = k8s_object_from_yaml(
    """
---
apiVersion: getambassador.io/v3alpha1
kind: Mapping
metadata:
  name: test
  namespace: default
spec:
  hostname: "*"
  prefix: /test/
  service: test.default
"""
)
131
132
class TestKubernetesGVK:
    """Attribute checks for ``KubernetesGVK`` built with and without an API group."""

    def test_legacy(self):
        """A bare ``v1`` apiVersion has no API group; the domain is the lowercased kind."""
        service_gvk = KubernetesGVK("v1", "Service")

        expected = {
            "api_version": "v1",
            "kind": "Service",
            "version": "v1",
            "domain": "service",
        }
        for attribute, value in expected.items():
            assert getattr(service_gvk, attribute) == value

        assert service_gvk.api_group is None

    def test_group(self):
        """``for_ambassador`` places the kind in the ``getambassador.io`` group."""
        mapping_gvk = KubernetesGVK.for_ambassador("Mapping", version="v3alpha1")

        expected = {
            "api_version": "getambassador.io/v3alpha1",
            "kind": "Mapping",
            "api_group": "getambassador.io",
            "version": "v3alpha1",
            "domain": "mapping.getambassador.io",
        }
        for attribute, value in expected.items():
            assert getattr(mapping_gvk, attribute) == value
151
152
class TestKubernetesObject:
    """Accessor checks for ``KubernetesObject`` using the module-level fixtures."""

    def test_valid(self):
        """The namespaced Knative Ingress fixture exposes its metadata, spec and status."""
        ingress = valid_knative_ingress

        # Identity: GVK, namespace/name, scope and the composite key.
        assert ingress.gvk == KubernetesGVK.for_knative_networking("Ingress")
        assert ingress.namespace == "test"
        assert ingress.name == "helloworld-go"
        assert ingress.scope == KubernetesObjectScope.NAMESPACE
        expected_key = KubernetesObjectKey(ingress.gvk, "test", "helloworld-go")
        assert ingress.key == expected_key

        # Metadata extras.
        assert ingress.generation == 2
        assert len(ingress.annotations) == 2
        assert ingress.ambassador_id == "webhook"
        assert len(ingress.labels) == 3

        # Spec and status are reachable as nested containers.
        first_rule = ingress.spec["rules"][0]
        assert first_rule["hosts"][0] == "helloworld-go.test.svc.cluster.local"
        assert ingress.status["observedGeneration"] == 2

    def test_valid_cluster_scoped(self):
        """An object without a namespace is cluster-scoped and has no ``namespace`` attribute."""
        ingress_class = valid_ingress_class

        assert ingress_class.name == "external-lb"
        assert ingress_class.scope == KubernetesObjectScope.CLUSTER
        expected_key = KubernetesObjectKey(ingress_class.gvk, None, "external-lb")
        assert ingress_class.key == expected_key
        assert ingress_class.key.namespace is None

        # Reading .namespace on a cluster-scoped object must raise.
        with pytest.raises(AttributeError):
            ingress_class.namespace

    def test_invalid(self):
        """A document with only ``apiVersion`` is rejected at construction time."""
        incomplete_manifest = "apiVersion: v1"

        with pytest.raises(ValueError, match="not a valid Kubernetes object"):
            k8s_object_from_yaml(incomplete_manifest)
186
187
class TestNormalizedResource:
    """Checks the flattening done by ``NormalizedResource.from_kubernetes_object``."""

    def test_kubernetes_object_conversion(self):
        """Metadata and spec fields of the Mapping fixture surface as top-level keys."""
        source = valid_mapping
        normalized = NormalizedResource.from_kubernetes_object(source)

        # The resource key is "<name>.<namespace>".
        assert normalized.rkey == f"{source.name}.{source.namespace}"

        # Identity and metadata copied from the Kubernetes object.
        assert normalized.object["apiVersion"] == source.gvk.api_version
        assert normalized.object["kind"] == source.kind
        assert normalized.object["name"] == source.name
        assert normalized.object["namespace"] == source.namespace
        assert normalized.object["generation"] == source.generation

        # Exactly one metadata label is expected, pointing back at the rkey.
        labels = normalized.object["metadata_labels"]
        assert len(labels) == 1
        assert normalized.object["metadata_labels"]["ambassador_crd"] == normalized.rkey

        # Spec fields are hoisted to the top level.
        assert normalized.object["prefix"] == source.spec["prefix"]
        assert normalized.object["service"] == source.spec["service"]
202
203
class TestLocationManager:
    """Checks the push/unwind behaviour of ``LocationManager`` context managers."""

    def test_context_manager(self):
        """Nested pushes stack up and are fully unwound once the outer block exits."""
        locations = LocationManager()

        # Fresh manager: nothing stacked, default location in effect.
        assert len(locations.previous) == 0
        assert locations.current.filename is None
        assert locations.current.ocount == 1

        with locations.push(filename="test", ocount=2) as pushed:
            assert len(locations.previous) == 1
            assert locations.current == pushed
            assert pushed.filename == "test"
            assert pushed.ocount == 2

            # push_reset() is expected to keep the filename and restart ocount at 1.
            with locations.push_reset() as reset:
                assert len(locations.previous) == 2
                assert locations.current == reset
                assert reset.filename == "test"
                assert reset.ocount == 1

        # Both context managers have exited: back to the initial state.
        assert len(locations.previous) == 0
        assert locations.current.filename is None
        assert locations.current.ocount == 1
231
232
class FinalizingKubernetesProcessor(KubernetesProcessor):
    """Test double that records whether ``finalize()`` has been called on it."""

    # Class-level default; finalize() shadows it with an instance attribute.
    finalized: bool = False

    def finalize(self):
        """Mark this processor instance as finalized."""
        self.finalized = True
239
240
class TestAmbassadorProcessor:
    """Checks that processed Kubernetes resources end up in ``Config`` as mappings."""

    @staticmethod
    def _process_single(k8s_object):
        """Run *k8s_object* through an ``AmbassadorProcessor``; return ``(config, manager)``.

        Asserts that the processor accepted the object and emitted exactly one element.
        """
        config = Config()
        manager = ResourceManager(logger, config, DependencyManager([]))

        assert AmbassadorProcessor(manager).try_process(k8s_object)
        assert len(manager.elements) == 1

        return config, manager

    @staticmethod
    def _assert_loaded_mapping_matches(config, manager, k8s_object):
        """Load the manager's elements into *config* and compare the sole mapping to *k8s_object*."""
        config.load_all(manager.elements)
        assert len(config.errors) == 0

        loaded = config.get_config("mappings")
        assert loaded
        assert len(loaded) == 1

        only_mapping = next(iter(loaded.values()))
        assert only_mapping.apiVersion == k8s_object.gvk.api_version
        assert only_mapping.name == k8s_object.name
        assert only_mapping.namespace == k8s_object.namespace
        assert only_mapping.prefix == k8s_object.spec["prefix"]
        assert only_mapping.service == k8s_object.spec["service"]

    def test_mapping(self):
        """The ``valid_mapping`` fixture round-trips into a single loaded mapping."""
        config, manager = self._process_single(valid_mapping)
        self._assert_loaded_mapping_matches(config, manager, valid_mapping)

    def test_mapping_v1(self):
        """Same round-trip for ``valid_mapping_v1``, printing the element's apiVersion."""
        config, manager = self._process_single(valid_mapping_v1)
        print(f"mgr.elements[0]={manager.elements[0].apiVersion}")
        self._assert_loaded_mapping_matches(config, manager, valid_mapping_v1)

    def test_ingress_with_named_port(self):
        """Ingress backends produce one mapping per path, with the expected service strings.

        The expected element and mapping counts are one higher when the
        ``EDGE_STACK`` environment variable parses as true.
        """
        edge_stack = parse_bool(os.environ.get("EDGE_STACK", "false"))

        manifests = """
---
apiVersion: v1
kind: Service
metadata:
  name: quote
  namespace: default
spec:
  type: ClusterIP
  ports:
  - name: http
    port: 3000
    protocol: TCP
    targetPort: 3000
  selector:
    app: quote
---
apiVersion: extensions/v1beta1
kind: Ingress
metadata:
  annotations:
    getambassador.io/ambassador-id: default
    kubernetes.io/ingress.class: ambassador
  name: quote
  namespace: default
spec:
  rules:
  - http:
      paths:
      - path: /
        pathType: ImplementationSpecific
        backend:
          serviceName: quote
          servicePort: http
      - path: /metrics
        pathType: ImplementationSpecific
        backend:
          serviceName: quote
          servicePort: metrics
      - path: /health
        pathType: ImplementationSpecific
        backend:
          serviceName: quote
          servicePort: 9000
      - path: /missed-name
        pathType: ImplementationSpecific
        backend:
          serviceName: missed
          servicePort: missed
      - path: /missed-number
        pathType: ImplementationSpecific
        backend:
          serviceName: missed
          servicePort: 8080
status:
  loadBalancer: {}
"""
        config = Config()
        # NOTE(review): this raises the shared "ambassador" logger to DEBUG and
        # never restores it, so the level persists for whatever runs afterwards.
        logger.setLevel(logging.DEBUG)

        fetcher = ResourceFetcher(logger, config)
        fetcher.parse_yaml(manifests, True)

        manager = fetcher.manager

        if edge_stack:
            expected_elements = 7
        else:
            expected_elements = 6
        assert len(manager.elements) == expected_elements

        config.load_all(fetcher.sorted())
        assert len(config.errors) == 0

        loaded = config.get_config("mappings")
        assert loaded

        if edge_stack:
            expected_mappings = 6
        else:
            expected_mappings = 5
        assert len(loaded) == expected_mappings

        # Mapping name -> (prefix, service), in the order the Ingress lists its paths.
        expectations = (
            ("quote-0-0", "/", "quote.default:3000"),
            ("quote-0-1", "/metrics", "quote.default:metrics"),
            ("quote-0-2", "/health", "quote.default:9000"),
            ("quote-0-3", "/missed-name", "missed.default:missed"),
            ("quote-0-4", "/missed-number", "missed.default:8080"),
        )
        for mapping_name, prefix, service in expectations:
            found = loaded.get(mapping_name)
            assert found
            assert found.prefix == prefix
            assert found.service == service
389
390
class TestAggregateKubernetesProcessor:
    """Checks dispatch and finalization through ``AggregateKubernetesProcessor``."""

    def test_aggregation(self):
        """Each object reaches its matching delegate, and ``finalize`` is forwarded."""
        config = Config()
        finalizing = FinalizingKubernetesProcessor()
        ingress_counter = CountingKubernetesProcessor(
            config, valid_knative_ingress.gvk, "test_1"
        )
        mapping_counter = CountingKubernetesProcessor(config, valid_mapping.gvk, "test_2")

        aggregate = AggregateKubernetesProcessor([ingress_counter, mapping_counter, finalizing])

        # Only the two counting delegates are expected to contribute kinds.
        assert len(aggregate.kinds()) == 2

        for k8s_object in (valid_knative_ingress, valid_mapping):
            assert aggregate.try_process(k8s_object)

        for counter_name in ("test_1", "test_2"):
            assert config.get_count(counter_name) == 1

        aggregate.finalize()
        assert finalizing.finalized, "Aggregate processor did not call finalizers"
415
416
class TestDeduplicatingKubernetesProcessor:
    """Checks that ``DeduplicatingKubernetesProcessor`` forwards each object only once."""

    def test_deduplication(self):
        """Three submissions of the same object are all accepted but counted once."""
        config = Config()
        counter = CountingKubernetesProcessor(config, valid_mapping.gvk, "test")
        deduplicating = DeduplicatingKubernetesProcessor(counter)

        for _ in range(3):
            assert deduplicating.try_process(valid_mapping)

        assert config.get_count("test") == 1
430
431
class TestCountingKubernetesProcessor:
    """Checks matching and counting in ``CountingKubernetesProcessor``."""

    def test_count(self):
        """Matching objects bump the named counter; a non-matching object is refused."""
        config = Config()
        counter = CountingKubernetesProcessor(config, valid_mapping.gvk, "test")

        accepted_first = counter.try_process(valid_mapping)
        assert accepted_first, "Processor rejected matching resource"

        accepted_second = counter.try_process(valid_mapping)
        assert accepted_second, "Processor rejected matching resource (again)"

        accepted_other = counter.try_process(valid_knative_ingress)
        assert not accepted_other, "Processor accepted non-matching resource"

        assert config.get_count("test") == 2, "Processor did not increment counter"
443
444
class TestDependencyManager:
    """Checks ordering and cycle detection in ``DependencyManager``."""

    def setup_method(self):
        """Give every test a fresh ``DependencyManager`` holding the three dependency kinds.

        This is pytest's xunit-style per-test hook. The nose-style name
        ``setup`` was deprecated in pytest 7.2 and is no longer called from
        pytest 8.0 on, which would leave ``self.deps`` undefined.
        """
        self.deps = DependencyManager(
            [
                SecretDependency(),
                ServiceDependency(),
                IngressClassesDependency(),
            ]
        )

    def test_cyclic(self):
        """A provide/want loop across three instances makes sorting raise ``ValueError``."""
        a = self.deps.for_instance(object())
        b = self.deps.for_instance(object())
        c = self.deps.for_instance(object())

        # a -> b -> c -> a: each instance wants what the next one provides.
        a.provide(SecretDependency)
        a.want(ServiceDependency)
        b.provide(ServiceDependency)
        b.want(IngressClassesDependency)
        c.provide(IngressClassesDependency)
        c.want(SecretDependency)

        with pytest.raises(ValueError):
            self.deps.sorted_watt_keys()

    def test_sort(self):
        """Keys come back with wanted dependencies ahead of what their consumer provides."""
        a = self.deps.for_instance(object())
        b = self.deps.for_instance(object())
        c = self.deps.for_instance(object())

        # a needs secrets and services before it can provide ingress classes.
        a.want(SecretDependency)
        a.want(ServiceDependency)
        a.provide(IngressClassesDependency)
        b.provide(SecretDependency)
        c.provide(ServiceDependency)

        assert self.deps.sorted_watt_keys() == ["secret", "service", "ingressclasses"]
482
483
if __name__ == "__main__":
    # Pass pytest's return code on as the process exit status; otherwise a
    # direct run of this file exits 0 even when tests fail.
    sys.exit(pytest.main(sys.argv))
View as plain text