1import logging
2import sys
3
4import pytest
5
# Root logging configuration for this test module: INFO and above, with each
# line carrying a timestamp, the literal tag "test", and the record's level.
_LOGGING_OPTIONS = {
    "level": logging.INFO,
    "format": "%(asctime)s test %(levelname)s: %(message)s",
    "datefmt": "%Y-%m-%d %H:%M:%S",
}
logging.basicConfig(**_LOGGING_OPTIONS)

# Logger passed to the ResourceManager instances constructed by the tests.
logger = logging.getLogger("ambassador")
13
14from ambassador import Config
15from ambassador.fetch.ambassador import AmbassadorProcessor
16from ambassador.fetch.dependency import (
17 DependencyManager,
18 IngressClassesDependency,
19 SecretDependency,
20 ServiceDependency,
21)
22from ambassador.fetch.k8sobject import (
23 KubernetesGVK,
24 KubernetesObject,
25 KubernetesObjectKey,
26 KubernetesObjectScope,
27)
28from ambassador.fetch.k8sprocessor import (
29 AggregateKubernetesProcessor,
30 CountingKubernetesProcessor,
31 DeduplicatingKubernetesProcessor,
32 KubernetesProcessor,
33)
34from ambassador.fetch.location import LocationManager
35from ambassador.fetch.resource import NormalizedResource, ResourceManager
36from ambassador.utils import parse_yaml
37
38
def k8s_object_from_yaml(yaml: str) -> KubernetesObject:
    """Wrap the first entry parsed from a YAML string in a KubernetesObject.

    Only index 0 of what ``parse_yaml`` returns is used; any further entries
    are ignored.
    """
    documents = parse_yaml(yaml)
    first_document = documents[0]
    return KubernetesObject(first_document)
41
42
# Module-level fixtures shared by the test classes in this file.
#
# The YAML nesting matters: the tests read metadata.annotations (2 entries),
# metadata.labels (3 entries), spec.rules[0].hosts[0] and
# status.observedGeneration from the parsed objects, so the documents are
# written with explicit two-space indentation.

# Namespaced Knative networking Ingress with ambassador-id "webhook".
valid_knative_ingress = k8s_object_from_yaml(
    """
---
apiVersion: networking.internal.knative.dev/v1alpha1
kind: Ingress
metadata:
  annotations:
    getambassador.io/ambassador-id: webhook
    networking.knative.dev/ingress.class: ambassador.ingress.networking.knative.dev
  generation: 2
  labels:
    serving.knative.dev/route: helloworld-go
    serving.knative.dev/routeNamespace: test
    serving.knative.dev/service: helloworld-go
  name: helloworld-go
  namespace: test
spec:
  rules:
  - hosts:
    - helloworld-go.test.svc.cluster.local
    http:
      paths:
      - retries:
          attempts: 3
          perTryTimeout: 10m0s
        splits:
        - appendHeaders:
            Knative-Serving-Namespace: test
            Knative-Serving-Revision: helloworld-go-qf94m
          percent: 100
          serviceName: helloworld-go-qf94m
          serviceNamespace: test
          servicePort: 80
        timeout: 10m0s
    visibility: ClusterLocal
  visibility: ExternalIP
status:
  loadBalancer:
    ingress:
    - domainInternal: ambassador.ambassador-webhook.svc.cluster.local
  observedGeneration: 2
"""
)

# IngressClass manifest with no metadata.namespace; used by the
# cluster-scoped assertions in TestKubernetesObject.
valid_ingress_class = k8s_object_from_yaml(
    """
apiVersion: networking.k8s.io/v1
kind: IngressClass
metadata:
  name: external-lb
spec:
  controller: getambassador.io/ingress-controller
"""
)

# Minimal getambassador.io/v3alpha1 Mapping in the "default" namespace.
valid_mapping = k8s_object_from_yaml(
    """
---
apiVersion: getambassador.io/v3alpha1
kind: Mapping
metadata:
  name: test
  namespace: default
spec:
  hostname: "*"
  prefix: /test/
  service: test.default
"""
)

# NOTE(review): despite its name, this manifest is identical to valid_mapping
# (apiVersion is v3alpha1, not v1) -- confirm whether a v1 manifest was
# intended before changing it.
valid_mapping_v1 = k8s_object_from_yaml(
    """
---
apiVersion: getambassador.io/v3alpha1
kind: Mapping
metadata:
  name: test
  namespace: default
spec:
  hostname: "*"
  prefix: /test/
  service: test.default
"""
)
127
128
class TestKubernetesGVK:
    """Checks the attributes KubernetesGVK derives from apiVersion and kind."""

    def test_legacy(self):
        """An apiVersion without a group ("v1") reports no API group."""
        service_gvk = KubernetesGVK("v1", "Service")

        observed = (
            service_gvk.api_version,
            service_gvk.kind,
            service_gvk.version,
            service_gvk.domain,
        )
        assert observed == ("v1", "Service", "v1", "service")
        assert service_gvk.api_group is None

    def test_group(self):
        """The for_ambassador constructor fills in the getambassador.io group."""
        mapping_gvk = KubernetesGVK.for_ambassador("Mapping", version="v3alpha1")

        observed = (
            mapping_gvk.api_version,
            mapping_gvk.kind,
            mapping_gvk.api_group,
            mapping_gvk.version,
            mapping_gvk.domain,
        )
        assert observed == (
            "getambassador.io/v3alpha1",
            "Mapping",
            "getambassador.io",
            "v3alpha1",
            "mapping.getambassador.io",
        )
147
148
class TestKubernetesObject:
    """Checks the accessors KubernetesObject exposes over a parsed manifest."""

    def test_valid(self):
        """A namespaced object exposes its identity, metadata, spec and status."""
        ingress = valid_knative_ingress
        expected_key = KubernetesObjectKey(ingress.gvk, "test", "helloworld-go")

        assert ingress.gvk == KubernetesGVK.for_knative_networking("Ingress")
        assert (ingress.namespace, ingress.name) == ("test", "helloworld-go")
        assert ingress.scope == KubernetesObjectScope.NAMESPACE
        assert ingress.key == expected_key
        assert ingress.generation == 2
        assert ingress.ambassador_id == "webhook"
        assert (len(ingress.annotations), len(ingress.labels)) == (2, 3)

        first_rule = ingress.spec["rules"][0]
        assert first_rule["hosts"][0] == "helloworld-go.test.svc.cluster.local"
        assert ingress.status["observedGeneration"] == 2

    def test_valid_cluster_scoped(self):
        """An object without a namespace is cluster-scoped and keyed with None."""
        ingress_class = valid_ingress_class
        key = ingress_class.key

        assert ingress_class.name == "external-lb"
        assert ingress_class.scope == KubernetesObjectScope.CLUSTER
        assert key == KubernetesObjectKey(ingress_class.gvk, None, "external-lb")
        assert key.namespace is None

        # Reading .namespace on this cluster-scoped object is expected to raise.
        with pytest.raises(AttributeError):
            ingress_class.namespace

    def test_invalid(self):
        """A manifest containing only apiVersion is rejected with ValueError."""
        incomplete_manifest = "apiVersion: v1"

        with pytest.raises(ValueError, match="not a valid Kubernetes object"):
            k8s_object_from_yaml(incomplete_manifest)
182
183
class TestNormalizedResource:
    """Checks the conversion of a KubernetesObject into a NormalizedResource."""

    def test_kubernetes_object_conversion(self):
        """Identity, generation and spec fields are all carried into the object."""
        source = valid_mapping
        resource = NormalizedResource.from_kubernetes_object(source)
        converted = resource.object

        assert resource.rkey == f"{source.name}.{source.namespace}"

        # Each converted field and the source value it must equal.
        expected_fields = {
            "apiVersion": source.gvk.api_version,
            "kind": source.kind,
            "name": source.name,
            "namespace": source.namespace,
            "generation": source.generation,
            "prefix": source.spec["prefix"],
            "service": source.spec["service"],
        }
        for field, wanted in expected_fields.items():
            assert converted[field] == wanted

        labels = converted["metadata_labels"]
        assert len(labels) == 1
        assert labels["ambassador_crd"] == resource.rkey
198
199
class TestLocationManager:
    """Checks LocationManager's push/push_reset context managers."""

    def test_context_manager(self):
        """Pushed locations stack while active and unwind fully on exit."""
        locations = LocationManager()

        def assert_at_root():
            # No saved locations, and the current one has no filename and
            # an object count of 1.
            assert len(locations.previous) == 0
            assert locations.current.filename is None
            assert locations.current.ocount == 1

        assert_at_root()

        with locations.push(filename="test", ocount=2) as pushed:
            assert len(locations.previous) == 1
            assert locations.current == pushed
            assert (pushed.filename, pushed.ocount) == ("test", 2)

            # push_reset keeps the filename and restarts the count at 1.
            with locations.push_reset() as reset:
                assert len(locations.previous) == 2
                assert locations.current == reset
                assert (reset.filename, reset.ocount) == ("test", 1)

        assert_at_root()
227
228
class FinalizingKubernetesProcessor(KubernetesProcessor):
    """KubernetesProcessor subclass that records whether finalize() was called.

    Used by TestAggregateKubernetesProcessor to check that finalization is
    forwarded to member processors.
    """

    # Class-level default; finalize() sets an instance attribute over it.
    finalized: bool = False

    def finalize(self) -> None:
        """Mark this processor instance as finalized."""
        self.finalized = True
235
236
class TestAmbassadorProcessor:
    """Runs Mapping fixtures through AmbassadorProcessor and into a Config."""

    @staticmethod
    def _process(k8s_obj):
        """Feed one object to a fresh processor; return its (aconf, mgr) pair."""
        aconf = Config()
        mgr = ResourceManager(logger, aconf, DependencyManager([]))

        assert AmbassadorProcessor(mgr).try_process(k8s_obj)
        assert len(mgr.elements) == 1

        return aconf, mgr

    @staticmethod
    def _assert_single_mapping(aconf, mgr, k8s_obj):
        """Load mgr's elements into aconf and compare the one mapping to k8s_obj."""
        aconf.load_all(mgr.elements)
        assert len(aconf.errors) == 0

        mappings = aconf.get_config("mappings")
        assert len(mappings) == 1

        (loaded,) = mappings.values()
        assert loaded.apiVersion == k8s_obj.gvk.api_version
        assert loaded.name == k8s_obj.name
        assert loaded.namespace == k8s_obj.namespace
        assert loaded.prefix == k8s_obj.spec["prefix"]
        assert loaded.service == k8s_obj.spec["service"]

    def test_mapping(self):
        """valid_mapping is accepted and loads as exactly one mapping."""
        aconf, mgr = self._process(valid_mapping)
        self._assert_single_mapping(aconf, mgr, valid_mapping)

    def test_mapping_v1(self):
        """valid_mapping_v1 is accepted and loads as exactly one mapping."""
        aconf, mgr = self._process(valid_mapping_v1)
        print(f"mgr.elements[0]={mgr.elements[0].apiVersion}")
        self._assert_single_mapping(aconf, mgr, valid_mapping_v1)
278
279
class TestAggregateKubernetesProcessor:
    """Checks that AggregateKubernetesProcessor dispatches to its members."""

    def test_aggregation(self):
        """Each object reaches its matching counter and finalize is forwarded."""
        aconf = Config()

        finalizer_probe = FinalizingKubernetesProcessor()
        ingress_counter = CountingKubernetesProcessor(aconf, valid_knative_ingress.gvk, "test_1")
        mapping_counter = CountingKubernetesProcessor(aconf, valid_mapping.gvk, "test_2")
        aggregate = AggregateKubernetesProcessor(
            [ingress_counter, mapping_counter, finalizer_probe]
        )

        assert len(aggregate.kinds()) == 2

        for k8s_obj in (valid_knative_ingress, valid_mapping):
            assert aggregate.try_process(k8s_obj)

        for counter_name in ("test_1", "test_2"):
            assert aconf.get_count(counter_name) == 1

        aggregate.finalize()
        assert finalizer_probe.finalized, "Aggregate processor did not call finalizers"
304
305
class TestDeduplicatingKubernetesProcessor:
    """Checks that DeduplicatingKubernetesProcessor forwards an object once."""

    def test_deduplication(self):
        """Three submissions of the same object are accepted but counted once."""
        aconf = Config()
        counter = CountingKubernetesProcessor(aconf, valid_mapping.gvk, "test")
        deduplicator = DeduplicatingKubernetesProcessor(counter)

        for _ in range(3):
            assert deduplicator.try_process(valid_mapping)

        assert aconf.get_count("test") == 1
319
320
class TestCountingKubernetesProcessor:
    """Checks that CountingKubernetesProcessor counts only its own kind."""

    def test_count(self):
        """Matching objects bump the counter; a different kind is rejected."""
        aconf = Config()
        counter = CountingKubernetesProcessor(aconf, valid_mapping.gvk, "test")

        accepted_first = counter.try_process(valid_mapping)
        assert accepted_first, "Processor rejected matching resource"

        accepted_second = counter.try_process(valid_mapping)
        assert accepted_second, "Processor rejected matching resource (again)"

        rejected_other = not counter.try_process(valid_knative_ingress)
        assert rejected_other, "Processor accepted non-matching resource"

        assert aconf.get_count("test") == 2, "Processor did not increment counter"
332
333
class TestDependencyManager:
    """Checks DependencyManager's ordering of provided and wanted dependencies."""

    def setup_method(self):
        """Build a fresh DependencyManager before each test method.

        This uses pytest's native ``setup_method`` hook. The nose-style
        ``setup`` name is deprecated since pytest 7.2 and is no longer called
        by pytest 8, which would leave ``self.deps`` undefined in every test.
        """
        self.deps = DependencyManager(
            [
                SecretDependency(),
                ServiceDependency(),
                IngressClassesDependency(),
            ]
        )

    def test_cyclic(self):
        """A provide/want cycle across three instances raises ValueError."""
        a = self.deps.for_instance(object())
        b = self.deps.for_instance(object())
        c = self.deps.for_instance(object())

        # a -> b -> c -> a: each instance wants what the next one provides.
        a.provide(SecretDependency)
        a.want(ServiceDependency)
        b.provide(ServiceDependency)
        b.want(IngressClassesDependency)
        c.provide(IngressClassesDependency)
        c.want(SecretDependency)

        with pytest.raises(ValueError):
            self.deps.sorted_watt_keys()

    def test_sort(self):
        """Keys wanted by an instance sort ahead of the key it provides."""
        a = self.deps.for_instance(object())
        b = self.deps.for_instance(object())
        c = self.deps.for_instance(object())

        # a needs secrets and services before it can provide ingress classes.
        a.want(SecretDependency)
        a.want(ServiceDependency)
        a.provide(IngressClassesDependency)
        b.provide(SecretDependency)
        c.provide(ServiceDependency)

        assert self.deps.sorted_watt_keys() == ["secret", "service", "ingressclasses"]
371
372
if __name__ == "__main__":
    # sys.argv is passed through unchanged, so when this file is run as a
    # script its own path (argv[0]) is the test target given to pytest.
    # Exit with pytest's return code so failures are visible to the caller
    # rather than always exiting 0.
    sys.exit(pytest.main(sys.argv))