...

Text file src/github.com/datawire/ambassador/v2/python/tests/unit/test_fetch.py

Documentation: github.com/datawire/ambassador/v2/python/tests/unit

     1import logging
     2import sys
     3
     4import pytest
     5
     6logging.basicConfig(
     7    level=logging.INFO,
     8    format="%(asctime)s test %(levelname)s: %(message)s",
     9    datefmt="%Y-%m-%d %H:%M:%S",
    10)
    11
    12logger = logging.getLogger("ambassador")
    13
    14from ambassador import Config
    15from ambassador.fetch.ambassador import AmbassadorProcessor
    16from ambassador.fetch.dependency import (
    17    DependencyManager,
    18    IngressClassesDependency,
    19    SecretDependency,
    20    ServiceDependency,
    21)
    22from ambassador.fetch.k8sobject import (
    23    KubernetesGVK,
    24    KubernetesObject,
    25    KubernetesObjectKey,
    26    KubernetesObjectScope,
    27)
    28from ambassador.fetch.k8sprocessor import (
    29    AggregateKubernetesProcessor,
    30    CountingKubernetesProcessor,
    31    DeduplicatingKubernetesProcessor,
    32    KubernetesProcessor,
    33)
    34from ambassador.fetch.location import LocationManager
    35from ambassador.fetch.resource import NormalizedResource, ResourceManager
    36from ambassador.utils import parse_yaml
    37
    38
    39def k8s_object_from_yaml(yaml: str) -> KubernetesObject:
    40    return KubernetesObject(parse_yaml(yaml)[0])
    41
    42
    43valid_knative_ingress = k8s_object_from_yaml(
    44    """
    45---
    46apiVersion: networking.internal.knative.dev/v1alpha1
    47kind: Ingress
    48metadata:
    49  annotations:
    50    getambassador.io/ambassador-id: webhook
    51    networking.knative.dev/ingress.class: ambassador.ingress.networking.knative.dev
    52  generation: 2
    53  labels:
    54    serving.knative.dev/route: helloworld-go
    55    serving.knative.dev/routeNamespace: test
    56    serving.knative.dev/service: helloworld-go
    57  name: helloworld-go
    58  namespace: test
    59spec:
    60  rules:
    61  - hosts:
    62    - helloworld-go.test.svc.cluster.local
    63    http:
    64      paths:
    65      - retries:
    66          attempts: 3
    67          perTryTimeout: 10m0s
    68        splits:
    69        - appendHeaders:
    70            Knative-Serving-Namespace: test
    71            Knative-Serving-Revision: helloworld-go-qf94m
    72          percent: 100
    73          serviceName: helloworld-go-qf94m
    74          serviceNamespace: test
    75          servicePort: 80
    76        timeout: 10m0s
    77    visibility: ClusterLocal
    78  visibility: ExternalIP
    79status:
    80  loadBalancer:
    81    ingress:
    82    - domainInternal: ambassador.ambassador-webhook.svc.cluster.local
    83  observedGeneration: 2
    84"""
    85)
    86
    87valid_ingress_class = k8s_object_from_yaml(
    88    """
    89apiVersion: networking.k8s.io/v1
    90kind: IngressClass
    91metadata:
    92  name: external-lb
    93spec:
    94  controller: getambassador.io/ingress-controller
    95"""
    96)
    97
    98valid_mapping = k8s_object_from_yaml(
    99    """
   100---
   101apiVersion: getambassador.io/v3alpha1
   102kind: Mapping
   103metadata:
   104  name: test
   105  namespace: default
   106spec:
   107  hostname: "*"
   108  prefix: /test/
   109  service: test.default
   110"""
   111)
   112
   113valid_mapping_v1 = k8s_object_from_yaml(
   114    """
   115---
   116apiVersion: getambassador.io/v3alpha1
   117kind: Mapping
   118metadata:
   119  name: test
   120  namespace: default
   121spec:
   122  hostname: "*"
   123  prefix: /test/
   124  service: test.default
   125"""
   126)
   127
   128
   129class TestKubernetesGVK:
   130    def test_legacy(self):
   131        gvk = KubernetesGVK("v1", "Service")
   132
   133        assert gvk.api_version == "v1"
   134        assert gvk.kind == "Service"
   135        assert gvk.api_group is None
   136        assert gvk.version == "v1"
   137        assert gvk.domain == "service"
   138
   139    def test_group(self):
   140        gvk = KubernetesGVK.for_ambassador("Mapping", version="v3alpha1")
   141
   142        assert gvk.api_version == "getambassador.io/v3alpha1"
   143        assert gvk.kind == "Mapping"
   144        assert gvk.api_group == "getambassador.io"
   145        assert gvk.version == "v3alpha1"
   146        assert gvk.domain == "mapping.getambassador.io"
   147
   148
   149class TestKubernetesObject:
   150    def test_valid(self):
   151        assert valid_knative_ingress.gvk == KubernetesGVK.for_knative_networking("Ingress")
   152        assert valid_knative_ingress.namespace == "test"
   153        assert valid_knative_ingress.name == "helloworld-go"
   154        assert valid_knative_ingress.scope == KubernetesObjectScope.NAMESPACE
   155        assert valid_knative_ingress.key == KubernetesObjectKey(
   156            valid_knative_ingress.gvk, "test", "helloworld-go"
   157        )
   158        assert valid_knative_ingress.generation == 2
   159        assert len(valid_knative_ingress.annotations) == 2
   160        assert valid_knative_ingress.ambassador_id == "webhook"
   161        assert len(valid_knative_ingress.labels) == 3
   162        assert (
   163            valid_knative_ingress.spec["rules"][0]["hosts"][0]
   164            == "helloworld-go.test.svc.cluster.local"
   165        )
   166        assert valid_knative_ingress.status["observedGeneration"] == 2
   167
   168    def test_valid_cluster_scoped(self):
   169        assert valid_ingress_class.name == "external-lb"
   170        assert valid_ingress_class.scope == KubernetesObjectScope.CLUSTER
   171        assert valid_ingress_class.key == KubernetesObjectKey(
   172            valid_ingress_class.gvk, None, "external-lb"
   173        )
   174        assert valid_ingress_class.key.namespace is None
   175
   176        with pytest.raises(AttributeError):
   177            valid_ingress_class.namespace
   178
   179    def test_invalid(self):
   180        with pytest.raises(ValueError, match="not a valid Kubernetes object"):
   181            k8s_object_from_yaml("apiVersion: v1")
   182
   183
   184class TestNormalizedResource:
   185    def test_kubernetes_object_conversion(self):
   186        resource = NormalizedResource.from_kubernetes_object(valid_mapping)
   187
   188        assert resource.rkey == f"{valid_mapping.name}.{valid_mapping.namespace}"
   189        assert resource.object["apiVersion"] == valid_mapping.gvk.api_version
   190        assert resource.object["kind"] == valid_mapping.kind
   191        assert resource.object["name"] == valid_mapping.name
   192        assert resource.object["namespace"] == valid_mapping.namespace
   193        assert resource.object["generation"] == valid_mapping.generation
   194        assert len(resource.object["metadata_labels"]) == 1
   195        assert resource.object["metadata_labels"]["ambassador_crd"] == resource.rkey
   196        assert resource.object["prefix"] == valid_mapping.spec["prefix"]
   197        assert resource.object["service"] == valid_mapping.spec["service"]
   198
   199
   200class TestLocationManager:
   201    def test_context_manager(self):
   202        lm = LocationManager()
   203
   204        assert len(lm.previous) == 0
   205
   206        assert lm.current.filename is None
   207        assert lm.current.ocount == 1
   208
   209        with lm.push(filename="test", ocount=2) as loc:
   210            assert len(lm.previous) == 1
   211            assert lm.current == loc
   212
   213            assert loc.filename == "test"
   214            assert loc.ocount == 2
   215
   216            with lm.push_reset() as rloc:
   217                assert len(lm.previous) == 2
   218                assert lm.current == rloc
   219
   220                assert rloc.filename == "test"
   221                assert rloc.ocount == 1
   222
   223        assert len(lm.previous) == 0
   224
   225        assert lm.current.filename is None
   226        assert lm.current.ocount == 1
   227
   228
   229class FinalizingKubernetesProcessor(KubernetesProcessor):
   230
   231    finalized: bool = False
   232
   233    def finalize(self):
   234        self.finalized = True
   235
   236
   237class TestAmbassadorProcessor:
   238    def test_mapping(self):
   239        aconf = Config()
   240        mgr = ResourceManager(logger, aconf, DependencyManager([]))
   241
   242        assert AmbassadorProcessor(mgr).try_process(valid_mapping)
   243        assert len(mgr.elements) == 1
   244
   245        aconf.load_all(mgr.elements)
   246        assert len(aconf.errors) == 0
   247
   248        mappings = aconf.get_config("mappings")
   249        assert len(mappings) == 1
   250
   251        mapping = next(iter(mappings.values()))
   252        assert mapping.apiVersion == valid_mapping.gvk.api_version
   253        assert mapping.name == valid_mapping.name
   254        assert mapping.namespace == valid_mapping.namespace
   255        assert mapping.prefix == valid_mapping.spec["prefix"]
   256        assert mapping.service == valid_mapping.spec["service"]
   257
   258    def test_mapping_v1(self):
   259        aconf = Config()
   260        mgr = ResourceManager(logger, aconf, DependencyManager([]))
   261
   262        assert AmbassadorProcessor(mgr).try_process(valid_mapping_v1)
   263        assert len(mgr.elements) == 1
   264        print(f"mgr.elements[0]={mgr.elements[0].apiVersion}")
   265
   266        aconf.load_all(mgr.elements)
   267        assert len(aconf.errors) == 0
   268
   269        mappings = aconf.get_config("mappings")
   270        assert len(mappings) == 1
   271
   272        mapping = next(iter(mappings.values()))
   273        assert mapping.apiVersion == valid_mapping_v1.gvk.api_version
   274        assert mapping.name == valid_mapping_v1.name
   275        assert mapping.namespace == valid_mapping_v1.namespace
   276        assert mapping.prefix == valid_mapping_v1.spec["prefix"]
   277        assert mapping.service == valid_mapping_v1.spec["service"]
   278
   279
   280class TestAggregateKubernetesProcessor:
   281    def test_aggregation(self):
   282        aconf = Config()
   283
   284        fp = FinalizingKubernetesProcessor()
   285
   286        p = AggregateKubernetesProcessor(
   287            [
   288                CountingKubernetesProcessor(aconf, valid_knative_ingress.gvk, "test_1"),
   289                CountingKubernetesProcessor(aconf, valid_mapping.gvk, "test_2"),
   290                fp,
   291            ]
   292        )
   293
   294        assert len(p.kinds()) == 2
   295
   296        assert p.try_process(valid_knative_ingress)
   297        assert p.try_process(valid_mapping)
   298
   299        assert aconf.get_count("test_1") == 1
   300        assert aconf.get_count("test_2") == 1
   301
   302        p.finalize()
   303        assert fp.finalized, "Aggregate processor did not call finalizers"
   304
   305
   306class TestDeduplicatingKubernetesProcessor:
   307    def test_deduplication(self):
   308        aconf = Config()
   309
   310        p = DeduplicatingKubernetesProcessor(
   311            CountingKubernetesProcessor(aconf, valid_mapping.gvk, "test")
   312        )
   313
   314        assert p.try_process(valid_mapping)
   315        assert p.try_process(valid_mapping)
   316        assert p.try_process(valid_mapping)
   317
   318        assert aconf.get_count("test") == 1
   319
   320
   321class TestCountingKubernetesProcessor:
   322    def test_count(self):
   323        aconf = Config()
   324
   325        p = CountingKubernetesProcessor(aconf, valid_mapping.gvk, "test")
   326
   327        assert p.try_process(valid_mapping), "Processor rejected matching resource"
   328        assert p.try_process(valid_mapping), "Processor rejected matching resource (again)"
   329        assert not p.try_process(valid_knative_ingress), "Processor accepted non-matching resource"
   330
   331        assert aconf.get_count("test") == 2, "Processor did not increment counter"
   332
   333
   334class TestDependencyManager:
   335    def setup(self):
   336        self.deps = DependencyManager(
   337            [
   338                SecretDependency(),
   339                ServiceDependency(),
   340                IngressClassesDependency(),
   341            ]
   342        )
   343
   344    def test_cyclic(self):
   345        a = self.deps.for_instance(object())
   346        b = self.deps.for_instance(object())
   347        c = self.deps.for_instance(object())
   348
   349        a.provide(SecretDependency)
   350        a.want(ServiceDependency)
   351        b.provide(ServiceDependency)
   352        b.want(IngressClassesDependency)
   353        c.provide(IngressClassesDependency)
   354        c.want(SecretDependency)
   355
   356        with pytest.raises(ValueError):
   357            self.deps.sorted_watt_keys()
   358
   359    def test_sort(self):
   360        a = self.deps.for_instance(object())
   361        b = self.deps.for_instance(object())
   362        c = self.deps.for_instance(object())
   363
   364        a.want(SecretDependency)
   365        a.want(ServiceDependency)
   366        a.provide(IngressClassesDependency)
   367        b.provide(SecretDependency)
   368        c.provide(ServiceDependency)
   369
   370        assert self.deps.sorted_watt_keys() == ["secret", "service", "ingressclasses"]
   371
   372
   373if __name__ == "__main__":
   374    pytest.main(sys.argv)

View as plain text