...

Text file src/github.com/emissary-ingress/emissary/v3/python/tests/unit/test_fetch.py

Documentation: github.com/emissary-ingress/emissary/v3/python/tests/unit

     1import logging
     2import os
     3import sys
     4
     5import pytest
     6
     7from ambassador.utils import NullSecretHandler, parse_bool
     8
     9logging.basicConfig(
    10    level=logging.INFO,
    11    format="%(asctime)s test %(levelname)s: %(message)s",
    12    datefmt="%Y-%m-%d %H:%M:%S",
    13)
    14
    15logger = logging.getLogger("ambassador")
    16
    17from ambassador import Config
    18from ambassador.fetch import ResourceFetcher
    19from ambassador.fetch.ambassador import AmbassadorProcessor
    20from ambassador.fetch.dependency import (
    21    DependencyManager,
    22    IngressClassesDependency,
    23    SecretDependency,
    24    ServiceDependency,
    25)
    26from ambassador.fetch.k8sobject import (
    27    KubernetesGVK,
    28    KubernetesObject,
    29    KubernetesObjectKey,
    30    KubernetesObjectScope,
    31)
    32from ambassador.fetch.k8sprocessor import (
    33    AggregateKubernetesProcessor,
    34    CountingKubernetesProcessor,
    35    DeduplicatingKubernetesProcessor,
    36    KubernetesProcessor,
    37)
    38from ambassador.fetch.location import LocationManager
    39from ambassador.fetch.resource import NormalizedResource, ResourceManager
    40from ambassador.utils import parse_yaml
    41
    42
    43def k8s_object_from_yaml(yaml: str) -> KubernetesObject:
    44    return KubernetesObject(parse_yaml(yaml)[0])
    45
    46
    47valid_knative_ingress = k8s_object_from_yaml(
    48    """
    49---
    50apiVersion: networking.internal.knative.dev/v1alpha1
    51kind: Ingress
    52metadata:
    53  annotations:
    54    getambassador.io/ambassador-id: webhook
    55    networking.knative.dev/ingress.class: ambassador.ingress.networking.knative.dev
    56  generation: 2
    57  labels:
    58    serving.knative.dev/route: helloworld-go
    59    serving.knative.dev/routeNamespace: test
    60    serving.knative.dev/service: helloworld-go
    61  name: helloworld-go
    62  namespace: test
    63spec:
    64  rules:
    65  - hosts:
    66    - helloworld-go.test.svc.cluster.local
    67    http:
    68      paths:
    69      - retries:
    70          attempts: 3
    71          perTryTimeout: 10m0s
    72        splits:
    73        - appendHeaders:
    74            Knative-Serving-Namespace: test
    75            Knative-Serving-Revision: helloworld-go-qf94m
    76          percent: 100
    77          serviceName: helloworld-go-qf94m
    78          serviceNamespace: test
    79          servicePort: 80
    80        timeout: 10m0s
    81    visibility: ClusterLocal
    82  visibility: ExternalIP
    83status:
    84  loadBalancer:
    85    ingress:
    86    - domainInternal: ambassador.ambassador-webhook.svc.cluster.local
    87  observedGeneration: 2
    88"""
    89)
    90
    91valid_ingress_class = k8s_object_from_yaml(
    92    """
    93apiVersion: networking.k8s.io/v1
    94kind: IngressClass
    95metadata:
    96  name: external-lb
    97spec:
    98  controller: getambassador.io/ingress-controller
    99"""
   100)
   101
   102valid_mapping = k8s_object_from_yaml(
   103    """
   104---
   105apiVersion: getambassador.io/v3alpha1
   106kind: Mapping
   107metadata:
   108  name: test
   109  namespace: default
   110spec:
   111  hostname: "*"
   112  prefix: /test/
   113  service: test.default
   114"""
   115)
   116
   117valid_mapping_v1 = k8s_object_from_yaml(
   118    """
   119---
   120apiVersion: getambassador.io/v3alpha1
   121kind: Mapping
   122metadata:
   123  name: test
   124  namespace: default
   125spec:
   126  hostname: "*"
   127  prefix: /test/
   128  service: test.default
   129"""
   130)
   131
   132
   133class TestKubernetesGVK:
   134    def test_legacy(self):
   135        gvk = KubernetesGVK("v1", "Service")
   136
   137        assert gvk.api_version == "v1"
   138        assert gvk.kind == "Service"
   139        assert gvk.api_group is None
   140        assert gvk.version == "v1"
   141        assert gvk.domain == "service"
   142
   143    def test_group(self):
   144        gvk = KubernetesGVK.for_ambassador("Mapping", version="v3alpha1")
   145
   146        assert gvk.api_version == "getambassador.io/v3alpha1"
   147        assert gvk.kind == "Mapping"
   148        assert gvk.api_group == "getambassador.io"
   149        assert gvk.version == "v3alpha1"
   150        assert gvk.domain == "mapping.getambassador.io"
   151
   152
   153class TestKubernetesObject:
   154    def test_valid(self):
   155        assert valid_knative_ingress.gvk == KubernetesGVK.for_knative_networking("Ingress")
   156        assert valid_knative_ingress.namespace == "test"
   157        assert valid_knative_ingress.name == "helloworld-go"
   158        assert valid_knative_ingress.scope == KubernetesObjectScope.NAMESPACE
   159        assert valid_knative_ingress.key == KubernetesObjectKey(
   160            valid_knative_ingress.gvk, "test", "helloworld-go"
   161        )
   162        assert valid_knative_ingress.generation == 2
   163        assert len(valid_knative_ingress.annotations) == 2
   164        assert valid_knative_ingress.ambassador_id == "webhook"
   165        assert len(valid_knative_ingress.labels) == 3
   166        assert (
   167            valid_knative_ingress.spec["rules"][0]["hosts"][0]
   168            == "helloworld-go.test.svc.cluster.local"
   169        )
   170        assert valid_knative_ingress.status["observedGeneration"] == 2
   171
   172    def test_valid_cluster_scoped(self):
   173        assert valid_ingress_class.name == "external-lb"
   174        assert valid_ingress_class.scope == KubernetesObjectScope.CLUSTER
   175        assert valid_ingress_class.key == KubernetesObjectKey(
   176            valid_ingress_class.gvk, None, "external-lb"
   177        )
   178        assert valid_ingress_class.key.namespace is None
   179
   180        with pytest.raises(AttributeError):
   181            valid_ingress_class.namespace
   182
   183    def test_invalid(self):
   184        with pytest.raises(ValueError, match="not a valid Kubernetes object"):
   185            k8s_object_from_yaml("apiVersion: v1")
   186
   187
   188class TestNormalizedResource:
   189    def test_kubernetes_object_conversion(self):
   190        resource = NormalizedResource.from_kubernetes_object(valid_mapping)
   191
   192        assert resource.rkey == f"{valid_mapping.name}.{valid_mapping.namespace}"
   193        assert resource.object["apiVersion"] == valid_mapping.gvk.api_version
   194        assert resource.object["kind"] == valid_mapping.kind
   195        assert resource.object["name"] == valid_mapping.name
   196        assert resource.object["namespace"] == valid_mapping.namespace
   197        assert resource.object["generation"] == valid_mapping.generation
   198        assert len(resource.object["metadata_labels"]) == 1
   199        assert resource.object["metadata_labels"]["ambassador_crd"] == resource.rkey
   200        assert resource.object["prefix"] == valid_mapping.spec["prefix"]
   201        assert resource.object["service"] == valid_mapping.spec["service"]
   202
   203
   204class TestLocationManager:
   205    def test_context_manager(self):
   206        lm = LocationManager()
   207
   208        assert len(lm.previous) == 0
   209
   210        assert lm.current.filename is None
   211        assert lm.current.ocount == 1
   212
   213        with lm.push(filename="test", ocount=2) as loc:
   214            assert len(lm.previous) == 1
   215            assert lm.current == loc
   216
   217            assert loc.filename == "test"
   218            assert loc.ocount == 2
   219
   220            with lm.push_reset() as rloc:
   221                assert len(lm.previous) == 2
   222                assert lm.current == rloc
   223
   224                assert rloc.filename == "test"
   225                assert rloc.ocount == 1
   226
   227        assert len(lm.previous) == 0
   228
   229        assert lm.current.filename is None
   230        assert lm.current.ocount == 1
   231
   232
   233class FinalizingKubernetesProcessor(KubernetesProcessor):
   234
   235    finalized: bool = False
   236
   237    def finalize(self):
   238        self.finalized = True
   239
   240
   241class TestAmbassadorProcessor:
   242    def test_mapping(self):
   243        aconf = Config()
   244        mgr = ResourceManager(logger, aconf, DependencyManager([]))
   245
   246        assert AmbassadorProcessor(mgr).try_process(valid_mapping)
   247        assert len(mgr.elements) == 1
   248
   249        aconf.load_all(mgr.elements)
   250        assert len(aconf.errors) == 0
   251
   252        mappings = aconf.get_config("mappings")
   253        assert mappings
   254        assert len(mappings) == 1
   255
   256        mapping = next(iter(mappings.values()))
   257        assert mapping.apiVersion == valid_mapping.gvk.api_version
   258        assert mapping.name == valid_mapping.name
   259        assert mapping.namespace == valid_mapping.namespace
   260        assert mapping.prefix == valid_mapping.spec["prefix"]
   261        assert mapping.service == valid_mapping.spec["service"]
   262
   263    def test_mapping_v1(self):
   264        aconf = Config()
   265        mgr = ResourceManager(logger, aconf, DependencyManager([]))
   266
   267        assert AmbassadorProcessor(mgr).try_process(valid_mapping_v1)
   268        assert len(mgr.elements) == 1
   269        print(f"mgr.elements[0]={mgr.elements[0].apiVersion}")
   270
   271        aconf.load_all(mgr.elements)
   272        assert len(aconf.errors) == 0
   273
   274        mappings = aconf.get_config("mappings")
   275        assert mappings
   276        assert len(mappings) == 1
   277
   278        mapping = next(iter(mappings.values()))
   279        assert mapping.apiVersion == valid_mapping_v1.gvk.api_version
   280        assert mapping.name == valid_mapping_v1.name
   281        assert mapping.namespace == valid_mapping_v1.namespace
   282        assert mapping.prefix == valid_mapping_v1.spec["prefix"]
   283        assert mapping.service == valid_mapping_v1.spec["service"]
   284
   285    def test_ingress_with_named_port(self):
   286        isEdgeStack = parse_bool(os.environ.get("EDGE_STACK", "false"))
   287
   288        yaml = """
   289---
   290apiVersion: v1
   291kind: Service
   292metadata:
   293  name: quote
   294  namespace: default
   295spec:
   296  type: ClusterIP
   297  ports:
   298  - name: http
   299    port: 3000
   300    protocol: TCP
   301    targetPort: 3000
   302  selector:
   303    app: quote
   304---
   305apiVersion: extensions/v1beta1
   306kind: Ingress
   307metadata:
   308  annotations:
   309    getambassador.io/ambassador-id: default
   310    kubernetes.io/ingress.class: ambassador
   311  name: quote
   312  namespace: default
   313spec:
   314  rules:
   315  - http:
   316      paths:
   317      - path: /
   318        pathType: ImplementationSpecific
   319        backend:
   320          serviceName: quote
   321          servicePort: http
   322      - path: /metrics
   323        pathType: ImplementationSpecific
   324        backend:
   325          serviceName: quote
   326          servicePort: metrics
   327      - path: /health
   328        pathType: ImplementationSpecific
   329        backend:
   330          serviceName: quote
   331          servicePort: 9000
   332      - path: /missed-name
   333        pathType: ImplementationSpecific
   334        backend:
   335          serviceName: missed
   336          servicePort: missed
   337      - path: /missed-number
   338        pathType: ImplementationSpecific
   339        backend:
   340          serviceName: missed
   341          servicePort: 8080
   342status:
   343  loadBalancer: {}
   344"""
   345        aconf = Config()
   346        logger.setLevel(logging.DEBUG)
   347
   348        fetcher = ResourceFetcher(logger, aconf)
   349        fetcher.parse_yaml(yaml, True)
   350
   351        mgr = fetcher.manager
   352
   353        expectedElements = 7 if isEdgeStack else 6
   354        assert len(mgr.elements) == expectedElements
   355
   356        aconf.load_all(fetcher.sorted())
   357        assert len(aconf.errors) == 0
   358
   359        mappings = aconf.get_config("mappings")
   360        assert mappings
   361
   362        expectedMappings = 6 if isEdgeStack else 5
   363        assert len(mappings) == expectedMappings
   364
   365        mapping_root = mappings.get("quote-0-0")
   366        assert mapping_root
   367        assert mapping_root.prefix == "/"
   368        assert mapping_root.service == "quote.default:3000"
   369
   370        mapping_metrics = mappings.get("quote-0-1")
   371        assert mapping_metrics
   372        assert mapping_metrics.prefix == "/metrics"
   373        assert mapping_metrics.service == "quote.default:metrics"
   374
   375        mapping_health = mappings.get("quote-0-2")
   376        assert mapping_health
   377        assert mapping_health.prefix == "/health"
   378        assert mapping_health.service == "quote.default:9000"
   379
   380        mapping_missed_name = mappings.get("quote-0-3")
   381        assert mapping_missed_name
   382        assert mapping_missed_name.prefix == "/missed-name"
   383        assert mapping_missed_name.service == "missed.default:missed"
   384
   385        mapping_missed_number = mappings.get("quote-0-4")
   386        assert mapping_missed_number
   387        assert mapping_missed_number.prefix == "/missed-number"
   388        assert mapping_missed_number.service == "missed.default:8080"
   389
   390
   391class TestAggregateKubernetesProcessor:
   392    def test_aggregation(self):
   393        aconf = Config()
   394
   395        fp = FinalizingKubernetesProcessor()
   396
   397        p = AggregateKubernetesProcessor(
   398            [
   399                CountingKubernetesProcessor(aconf, valid_knative_ingress.gvk, "test_1"),
   400                CountingKubernetesProcessor(aconf, valid_mapping.gvk, "test_2"),
   401                fp,
   402            ]
   403        )
   404
   405        assert len(p.kinds()) == 2
   406
   407        assert p.try_process(valid_knative_ingress)
   408        assert p.try_process(valid_mapping)
   409
   410        assert aconf.get_count("test_1") == 1
   411        assert aconf.get_count("test_2") == 1
   412
   413        p.finalize()
   414        assert fp.finalized, "Aggregate processor did not call finalizers"
   415
   416
   417class TestDeduplicatingKubernetesProcessor:
   418    def test_deduplication(self):
   419        aconf = Config()
   420
   421        p = DeduplicatingKubernetesProcessor(
   422            CountingKubernetesProcessor(aconf, valid_mapping.gvk, "test")
   423        )
   424
   425        assert p.try_process(valid_mapping)
   426        assert p.try_process(valid_mapping)
   427        assert p.try_process(valid_mapping)
   428
   429        assert aconf.get_count("test") == 1
   430
   431
   432class TestCountingKubernetesProcessor:
   433    def test_count(self):
   434        aconf = Config()
   435
   436        p = CountingKubernetesProcessor(aconf, valid_mapping.gvk, "test")
   437
   438        assert p.try_process(valid_mapping), "Processor rejected matching resource"
   439        assert p.try_process(valid_mapping), "Processor rejected matching resource (again)"
   440        assert not p.try_process(valid_knative_ingress), "Processor accepted non-matching resource"
   441
   442        assert aconf.get_count("test") == 2, "Processor did not increment counter"
   443
   444
   445class TestDependencyManager:
   446    def setup(self):
   447        self.deps = DependencyManager(
   448            [
   449                SecretDependency(),
   450                ServiceDependency(),
   451                IngressClassesDependency(),
   452            ]
   453        )
   454
   455    def test_cyclic(self):
   456        a = self.deps.for_instance(object())
   457        b = self.deps.for_instance(object())
   458        c = self.deps.for_instance(object())
   459
   460        a.provide(SecretDependency)
   461        a.want(ServiceDependency)
   462        b.provide(ServiceDependency)
   463        b.want(IngressClassesDependency)
   464        c.provide(IngressClassesDependency)
   465        c.want(SecretDependency)
   466
   467        with pytest.raises(ValueError):
   468            self.deps.sorted_watt_keys()
   469
   470    def test_sort(self):
   471        a = self.deps.for_instance(object())
   472        b = self.deps.for_instance(object())
   473        c = self.deps.for_instance(object())
   474
   475        a.want(SecretDependency)
   476        a.want(ServiceDependency)
   477        a.provide(IngressClassesDependency)
   478        b.provide(SecretDependency)
   479        c.provide(ServiceDependency)
   480
   481        assert self.deps.sorted_watt_keys() == ["secret", "service", "ingressclasses"]
   482
   483
   484if __name__ == "__main__":
   485    pytest.main(sys.argv)

View as plain text