import hashlib from base64 import b64decode from typing import Generator, List, Tuple, Union from abstract_tests import HTTP, AmbassadorTest, Node, ServiceType from kat.harness import EDGE_STACK, Query from tests.integration.manifests import namespace_manifest from tests.selfsigned import TLSCerts from tests.utils import create_crl_pem_b64 bug_404_routes = ( True # Do we erroneously send 404 responses directly instead of redirect-to-tls first? ) class TLSContextsTest(AmbassadorTest): """ This test makes sure that TLS is not turned on when it's not intended to. For example, when an 'upstream' TLS configuration is passed, the port is not supposed to switch to 443 """ def init(self): self.target = HTTP() if EDGE_STACK: self.xfail = "Not yet supported in Edge Stack" self.xfail = "FIXME: IHA" def manifests(self) -> str: return ( f""" --- apiVersion: v1 metadata: name: test-tlscontexts-secret labels: kat-ambassador-id: tlscontextstest data: tls.crt: {TLSCerts["master.datawire.io"].k8s_crt} kind: Secret type: Opaque """ + super().manifests() ) def config(self) -> Generator[Union[str, Tuple[Node, str]], None, None]: yield self, self.format( """ --- apiVersion: getambassador.io/v3alpha1 kind: Module name: tls ambassador_id: [{self.ambassador_id}] config: upstream: enabled: True secret: test-tlscontexts-secret """ ) yield self, self.format( """ --- apiVersion: getambassador.io/v3alpha1 kind: Mapping name: {self.target.path.k8s} prefix: /{self.name}/ service: {self.target.path.fqdn} """ ) def scheme(self) -> str: return "https" def queries(self): yield Query( self.url(self.name + "/"), error=["connection refused", "connection reset by peer", "EOF", "request canceled"], ) def requirements(self): yield from ( r for r in super().requirements() if r[0] == "url" and r[1].url.startswith("http://") ) class ClientCertificateAuthentication(AmbassadorTest): def init(self): self.xfail = "FIXME: IHA" self.target = HTTP() def manifests(self) -> str: return ( f""" --- apiVersion: v1 metadata: name: test-clientcert-client-secret labels: kat-ambassador-id: clientcertificateauthentication data: tls.crt: {TLSCerts["master.datawire.io"].k8s_crt} kind: Secret type: Opaque --- apiVersion: v1 kind: Secret metadata: name: test-clientcert-server-secret labels: kat-ambassador-id: clientcertificateauthentication type: kubernetes.io/tls data: tls.crt: {TLSCerts["ambassador.example.com"].k8s_crt} tls.key: {TLSCerts["ambassador.example.com"].k8s_key} """ + super().manifests() ) def config(self) -> Generator[Union[str, Tuple[Node, str]], None, None]: yield self, self.format( """ --- apiVersion: getambassador.io/v3alpha1 kind: Module name: ambassador config: forward_client_cert_details: SANITIZE_SET set_current_client_cert_details: subject: true --- apiVersion: getambassador.io/v3alpha1 kind: Module ambassador_id: [{self.ambassador_id}] name: tls config: server: enabled: True secret: test-clientcert-server-secret client: enabled: True secret: test-clientcert-client-secret cert_required: True """ ) yield self, self.format( """ --- apiVersion: getambassador.io/v3alpha1 kind: Mapping name: {self.target.path.k8s} prefix: /{self.name}/ service: {self.target.path.fqdn} add_request_headers: x-cert-start: { value: "%DOWNSTREAM_PEER_CERT_V_START%" } x-cert-end: { value: "%DOWNSTREAM_PEER_CERT_V_END%" } x-cert-start-custom: { value: "%DOWNSTREAM_PEER_CERT_V_START(%b %e %H:%M:%S %Y %Z)%" } x-cert-end-custom: { value: "%DOWNSTREAM_PEER_CERT_V_END(%b %e %H:%M:%S %Y %Z)%" } """ ) def scheme(self) -> str: return "https" def queries(self): yield Query( self.url(self.name + "/"), insecure=True, client_crt=TLSCerts["presto.example.com"].pubcert, client_key=TLSCerts["presto.example.com"].privkey, client_cert_required=True, ca_cert=TLSCerts["master.datawire.io"].pubcert, ) # In TLS < 1.3, there's not a dedicated alert code for "the client forgot to include a certificate", # so we get a generic alert=40 ("handshake_failure"). We also include "write: connection reset by peer" # because we've seen cases where Envoy and the client library don't play nicely, so the error report doesn't # get back before the connection closes. yield Query( self.url(self.name + "/"), insecure=True, maxTLSv="v1.2", error=["tls: handshake failure", "write: connection reset by peer"], ) # TLS 1.3 added a dedicated alert=116 ("certificate_required") for that scenario. See above for why # "write: connection reset by peer " is also accepted. yield Query( self.url(self.name + "/"), insecure=True, minTLSv="v1.3", error=["tls: certificate required", "write: connection reset by peer"], ) def check(self): cert = TLSCerts["presto.example.com"].pubcert # base64-decode the cert data after removing the "---BEGIN CERTIFICATE---" / "---END CERTIFICATE---" lines. certraw = b64decode("\n".join(l for l in cert.split("\n") if not l.startswith("-"))) # take the sha256 sum aof that. certhash = hashlib.sha256(certraw).hexdigest() assert self.results[0].backend assert self.results[0].backend.request assert self.results[0].backend.request.headers["x-forwarded-client-cert"] == [ f'Hash={certhash};Subject="CN=presto.example.com,OU=Engineering,O=Ambassador Labs,L=Boston,ST=MA,C=US"' ], ( "unexpected x-forwarded-client-cert value: %s" % self.results[0].backend.request.headers["x-forwarded-client-cert"] ) assert self.results[0].backend.request.headers["x-cert-start"] == [ "2021-11-10T13:12:00.000Z" ], ( "unexpected x-cert-start value: %s" % self.results[0].backend.request.headers["x-cert-start"] ) assert self.results[0].backend.request.headers["x-cert-end"] == [ "2099-11-10T13:12:00.000Z" ], ( "unexpected x-cert-end value: %s" % self.results[0].backend.request.headers["x-cert-end"] ) assert self.results[1].backend assert self.results[1].backend.request assert self.results[0].backend.request.headers["x-cert-start-custom"] == [ "Nov 10 13:12:00 2021 UTC" ], ( "unexpected x-cert-start-custom value: %s" % self.results[1].backend.request.headers["x-cert-start-custom"] ) assert self.results[0].backend.request.headers["x-cert-end-custom"] == [ "Nov 10 13:12:00 2099 UTC" ], ( "unexpected x-cert-end-custom value: %s" % self.results[0].backend.request.headers["x-cert-end-custom"] ) def requirements(self): for r in super().requirements(): query = r[1] query.insecure = True query.client_cert = TLSCerts["presto.example.com"].pubcert query.client_key = TLSCerts["presto.example.com"].privkey query.client_cert_required = True query.ca_cert = TLSCerts["master.datawire.io"].pubcert yield (r[0], query) class ClientCertificateAuthenticationContext(AmbassadorTest): def init(self): self.xfail = "FIXME: IHA" self.target = HTTP() def manifests(self) -> str: return ( self.format( f""" --- apiVersion: v1 metadata: name: ccauthctx-client-secret labels: kat-ambassador-id: {self.ambassador_id} data: tls.crt: {TLSCerts["master.datawire.io"].k8s_crt} kind: Secret type: Opaque --- apiVersion: v1 kind: Secret metadata: name: ccauthctx-server-secret labels: kat-ambassador-id: {self.ambassador_id} type: kubernetes.io/tls data: tls.crt: {TLSCerts["ambassador.example.com"].k8s_crt} tls.key: {TLSCerts["ambassador.example.com"].k8s_key} --- apiVersion: getambassador.io/v3alpha1 kind: TLSContext metadata: name: ccauthctx-tls labels: kat-ambassador-id: {self.ambassador_id} spec: ambassador_id: [{self.ambassador_id}] hosts: [ "*" ] secret: ccauthctx-server-secret ca_secret: ccauthctx-client-secret cert_required: True """ ) + super().manifests() ) def config(self) -> Generator[Union[str, Tuple[Node, str]], None, None]: yield self, self.format( """ --- apiVersion: getambassador.io/v3alpha1 kind: Mapping name: {self.target.path.k8s} prefix: /{self.name}/ service: {self.target.path.fqdn} """ ) def scheme(self) -> str: return "https" def queries(self): yield Query( self.url(self.name + "/"), insecure=True, client_crt=TLSCerts["presto.example.com"].pubcert, client_key=TLSCerts["presto.example.com"].privkey, client_cert_required=True, ca_cert=TLSCerts["master.datawire.io"].pubcert, ) # In TLS < 1.3, there's not a dedicated alert code for "the client forgot to include a certificate", # so we get a generic alert=40 ("handshake_failure"). We also include "write: connection reset by peer" # because we've seen cases where Envoy and the client library don't play nicely, so the error report doesn't # get back before the connection closes. yield Query( self.url(self.name + "/"), insecure=True, maxTLSv="v1.2", error=["tls: handshake failure", "write: connection reset by peer"], ) # TLS 1.3 added a dedicated alert=116 ("certificate_required") for that scenario. See above for why # "write: connection reset by peer" is also accepted. yield Query( self.url(self.name + "/"), insecure=True, minTLSv="v1.3", error=["tls: certificate required", "write: connection reset by peer"], ) def requirements(self): for r in super().requirements(): query = r[1] query.insecure = True query.client_cert = TLSCerts["presto.example.com"].pubcert query.client_key = TLSCerts["presto.example.com"].privkey query.client_cert_required = True query.ca_cert = TLSCerts["master.datawire.io"].pubcert yield (r[0], query) class ClientCertificateAuthenticationContextCRL(AmbassadorTest): def init(self): self.xfail = "FIXME: IHA" # This test should cover TLSContext with a crl_secret self.target = HTTP() def manifests(self) -> str: return ( self.format( f""" --- apiVersion: v1 metadata: name: ccauthctxcrl-client-secret labels: kat-ambassador-id: {self.ambassador_id} data: tls.crt: {TLSCerts["master.datawire.io"].k8s_crt} kind: Secret type: Opaque --- apiVersion: v1 kind: Secret metadata: name: ccauthctxcrl-server-secret labels: kat-ambassador-id: {self.ambassador_id} type: kubernetes.io/tls data: tls.crt: {TLSCerts["ambassador.example.com"].k8s_crt} tls.key: {TLSCerts["ambassador.example.com"].k8s_key} --- apiVersion: v1 kind: Secret metadata: name: ccauthctxcrl-crl-secret labels: kat-ambassador-id: {self.ambassador_id} type: Opaque data: crl.pem: {create_crl_pem_b64(TLSCerts["master.datawire.io"].pubcert, TLSCerts["master.datawire.io"].privkey, [TLSCerts["presto.example.com"].pubcert])} --- apiVersion: getambassador.io/v3alpha1 kind: TLSContext metadata: name: ccauthctxcrl-tls labels: kat-ambassador-id: {self.ambassador_id} spec: ambassador_id: [{self.ambassador_id}] hosts: [ "*" ] secret: ccauthctxcrl-server-secret ca_secret: ccauthctxcrl-client-secret crl_secret: ccauthctxcrl-crl-secret cert_required: True """ ) + super().manifests() ) def config(self) -> Generator[Union[str, Tuple[Node, str]], None, None]: yield self, self.format( """ --- apiVersion: getambassador.io/v3alpha1 kind: Mapping name: {self.target.path.k8s} prefix: / service: {self.target.path.fqdn} hostname: "*" """ ) def scheme(self) -> str: return "https" def queries(self): yield Query( self.url(self.name + "/"), insecure=True, client_crt=TLSCerts["presto.example.com"].pubcert, client_key=TLSCerts["presto.example.com"].privkey, client_cert_required=True, ca_cert=TLSCerts["master.datawire.io"].pubcert, error=["tls: revoked certificate"], ) def requirements(self): yield ("pod", self.path.k8s) class TLSOriginationSecret(AmbassadorTest): def init(self): self.xfail = "FIXME: IHA" self.target = HTTP() def manifests(self) -> str: return ( f""" --- apiVersion: v1 kind: Secret metadata: name: test-origination-secret labels: kat-ambassador-id: tlsoriginationsecret type: kubernetes.io/tls data: tls.crt: {TLSCerts["localhost"].k8s_crt} tls.key: {TLSCerts["localhost"].k8s_key} """ + super().manifests() ) def config(self) -> Generator[Union[str, Tuple[Node, str]], None, None]: fingerprint = ( hashlib.sha1( ( TLSCerts["localhost"].pubcert + "\n" + TLSCerts["localhost"].privkey + "\n" ).encode("utf-8") ) .hexdigest() .upper() ) yield self, f""" --- apiVersion: getambassador.io/v3alpha1 kind: Module ambassador_id: [{self.ambassador_id}] name: tls config: upstream: secret: test-origination-secret upstream-files: cert_chain_file: /tmp/ambassador/snapshots/default/secrets-decoded/test-origination-secret/{fingerprint}.crt private_key_file: /tmp/ambassador/snapshots/default/secrets-decoded/test-origination-secret/{fingerprint}.key """ yield self, self.format( """ --- apiVersion: getambassador.io/v3alpha1 kind: Mapping name: {self.target.path.k8s} prefix: /{self.name}/ service: {self.target.path.fqdn} tls: upstream """ ) yield self, self.format( """ --- apiVersion: getambassador.io/v3alpha1 kind: Mapping name: {self.target.path.k8s}-files prefix: /{self.name}-files/ service: {self.target.path.fqdn} tls: upstream-files """ ) def queries(self): yield Query(self.url(self.name + "/")) yield Query(self.url(self.name + "-files/")) def check(self): for r in self.results: assert r.backend assert r.backend.request assert r.backend.request.tls.enabled class TLS(AmbassadorTest): target: ServiceType def init(self): self.xfail = "FIXME: IHA" self.target = HTTP() def manifests(self) -> str: return ( f""" --- apiVersion: v1 kind: Secret metadata: name: test-tls-secret labels: kat-ambassador-id: tls type: kubernetes.io/tls data: tls.crt: {TLSCerts["localhost"].k8s_crt} tls.key: {TLSCerts["localhost"].k8s_key} --- apiVersion: v1 kind: Secret metadata: name: ambassador-certs labels: kat-ambassador-id: tls type: kubernetes.io/tls data: tls.crt: {TLSCerts["localhost"].k8s_crt} tls.key: {TLSCerts["localhost"].k8s_key} --- apiVersion: getambassador.io/v3alpha1 kind: Host metadata: name: tls-host labels: kat-ambassador-id: tls spec: ambassador_id: [tls] tlsSecret: name: test-tls-secret requestPolicy: insecure: action: Reject """ + super().manifests() ) def config(self) -> Generator[Union[str, Tuple[Node, str]], None, None]: # # Use self here, not self.target, because we want the TLS module to # # be annotated on the Ambassador itself. # yield self, self.format(""" # --- # apiVersion: getambassador.io/v3alpha1 # kind: Module # name: tls # ambassador_id: [{self.ambassador_id}] # config: # server: # enabled: True # secret: test-tls-secret # """) # Use self.target _here_, because we want the mapping to be annotated # on the service, not the Ambassador. Also, you don't need to include # the ambassador_id unless you need some special ambassador_id that # isn't something that kat already knows about. # # If the test were more complex, we'd probably need to do some sort # of mangling for the mapping name and prefix. For this simple test, # it's not necessary. yield self.target, self.format( """ --- apiVersion: getambassador.io/v3alpha1 kind: Mapping name: tls_target_mapping prefix: /tls-target/ service: {self.target.path.fqdn} """ ) def scheme(self) -> str: return "https" def queries(self): yield Query(self.url("tls-target/"), insecure=True) class TLSInvalidSecret(AmbassadorTest): target: ServiceType def init(self): self.xfail = "FIXME: IHA" self.target = HTTP() def config(self) -> Generator[Union[str, Tuple[Node, str]], None, None]: yield self, self.format( """ --- apiVersion: getambassador.io/v3alpha1 kind: Module name: tls ambassador_id: [{self.ambassador_id}] config: server: enabled: True secret: test-certs-secret-invalid missing-secret-key: cert_chain_file: /nonesuch bad-path-info: cert_chain_file: /nonesuch private_key_file: /nonesuch validation-without-termination: enabled: True secret: test-certs-secret-invalid ca_secret: ambassador-certs """ ) yield self.target, self.format( """ --- apiVersion: getambassador.io/v3alpha1 kind: Mapping name: tls_target_mapping prefix: /tls-target/ service: {self.target.path.fqdn} """ ) def scheme(self) -> str: return "http" def queries(self): yield Query(self.url("ambassador/v0/diag/?json=true&filter=errors"), phase=2) def check(self): assert self.results[0].backend errors = self.results[0].backend.response expected = set( { "TLSContext server found no certificate in secret test-certs-secret-invalid in namespace default, ignoring...", "TLSContext bad-path-info found no cert_chain_file '/nonesuch'", "TLSContext bad-path-info found no private_key_file '/nonesuch'", "TLSContext validation-without-termination found no certificate in secret test-certs-secret-invalid in namespace default, ignoring...", "TLSContext missing-secret-key: 'cert_chain_file' requires 'private_key_file' as well", } ) current = set({}) for errsvc, errtext in errors: current.add(errtext) diff = expected - current assert len(diff) == 0, f"expected {len(expected)} errors, got {len(errors)}: Missing {diff}" class TLSContextTest(AmbassadorTest): # debug = True def init(self): self.xfail = "FIXME: IHA" self.target = HTTP() if EDGE_STACK: self.xfail = "XFailing for now" def manifests(self) -> str: return ( namespace_manifest("secret-namespace") + f""" --- apiVersion: v1 data: tls.crt: {TLSCerts["localhost"].k8s_crt} tls.key: {TLSCerts["localhost"].k8s_key} kind: Secret metadata: name: test-tlscontext-secret-0 labels: kat-ambassador-id: tlscontexttest type: kubernetes.io/tls --- apiVersion: v1 data: tls.crt: {TLSCerts["tls-context-host-1"].k8s_crt} tls.key: {TLSCerts["tls-context-host-1"].k8s_key} kind: Secret metadata: name: test-tlscontext-secret-1 namespace: secret-namespace labels: kat-ambassador-id: tlscontexttest type: kubernetes.io/tls --- apiVersion: v1 data: tls.crt: {TLSCerts["tls-context-host-2"].k8s_crt} tls.key: {TLSCerts["tls-context-host-2"].k8s_key} kind: Secret metadata: name: test-tlscontext-secret-2 labels: kat-ambassador-id: tlscontexttest type: kubernetes.io/tls """ + super().manifests() ) def config(self) -> Generator[Union[str, Tuple[Node, str]], None, None]: yield self, self.format( """ --- apiVersion: getambassador.io/v3alpha1 kind: Mapping name: {self.name}-same-prefix-1 prefix: /tls-context-same/ service: http://{self.target.path.fqdn} host: tls-context-host-1 """ ) yield self, self.format( """ --- apiVersion: getambassador.io/v3alpha1 kind: TLSContext name: {self.name}-same-context-1 hosts: - tls-context-host-1 secret: test-tlscontext-secret-1.secret-namespace min_tls_version: v1.0 max_tls_version: v1.3 redirect_cleartext_from: 8080 """ ) yield self, self.format( """ --- apiVersion: getambassador.io/v3alpha1 kind: Mapping name: {self.name}-same-prefix-2 prefix: /tls-context-same/ service: http://{self.target.path.fqdn} host: tls-context-host-2 """ ) yield self, self.format( """ --- apiVersion: getambassador.io/v3alpha1 kind: TLSContext name: {self.name}-same-context-2 hosts: - tls-context-host-2 secret: test-tlscontext-secret-2 alpn_protocols: h2,http/1.1 redirect_cleartext_from: 8080 """ ) yield self, self.format( """ --- apiVersion: getambassador.io/v3alpha1 kind: Module name: tls config: server: enabled: True secret: test-tlscontext-secret-0 """ ) yield self, self.format( """ --- apiVersion: getambassador.io/v3alpha1 kind: Mapping name: {self.name}-other-mapping prefix: /{self.name}/ service: https://{self.target.path.fqdn} """ ) # Ambassador should not return an error when hostname is not present. yield self, self.format( """ --- apiVersion: getambassador.io/v3alpha1 kind: TLSContext name: {self.name}-no-secret min_tls_version: v1.0 max_tls_version: v1.3 redirect_cleartext_from: 8080 """ ) # Ambassador should return an error for this configuration. yield self, self.format( """ --- apiVersion: getambassador.io/v3alpha1 kind: TLSContext name: {self.name}-same-context-error hosts: - tls-context-host-1 redirect_cleartext_from: 8080 """ ) # Ambassador should return an error for this configuration. yield self, self.format( """ --- apiVersion: getambassador.io/v3alpha1 kind: TLSContext name: {self.name}-rcf-error hosts: - tls-context-host-1 redirect_cleartext_from: 8081 """ ) def scheme(self) -> str: return "https" @staticmethod def _go_close_connection_error(url): """ :param url: url passed to the query :return: error message string that Go's net/http package throws when server closes connection """ return "Get {}: EOF".format(url) def queries(self): # 0 yield Query( self.url("ambassador/v0/diag/?json=true&filter=errors"), headers={"Host": "tls-context-host-2"}, insecure=True, sni=True, ) # 1 - Correct host #1 yield Query( self.url("tls-context-same/"), headers={"Host": "tls-context-host-1"}, expected=200, insecure=True, sni=True, ) # 2 - Correct host #2 yield Query( self.url("tls-context-same/"), headers={"Host": "tls-context-host-2"}, expected=200, insecure=True, sni=True, ) # 3 - Incorrect host yield Query( self.url("tls-context-same/"), headers={"Host": "tls-context-host-3"}, # error=self._go_close_connection_error(self.url("tls-context-same/")), expected=404, insecure=True, ) # 4 - Incorrect path, correct host yield Query( self.url("tls-context-different/"), headers={"Host": "tls-context-host-1"}, expected=404, insecure=True, sni=True, ) # Other mappings with no host will respond with the fallbock cert. # 5 - no Host header, fallback cert from the TLS module yield Query( self.url(self.name + "/"), # error=self._go_close_connection_error(self.url(self.name + "/")), insecure=True, ) # 6 - explicit Host header, fallback cert yield Query( self.url(self.name + "/"), # error=self._go_close_connection_error(self.url(self.name + "/")), # sni=True, headers={"Host": "tls-context-host-3"}, insecure=True, ) # 7 - explicit Host header 1 wins, we'll get the SNI cert for this overlapping path yield Query( self.url(self.name + "/"), headers={"Host": "tls-context-host-1"}, expected=200, insecure=True, sni=True, ) # 8 - explicit Host header 2 wins, we'll get the SNI cert for this overlapping path yield Query( self.url(self.name + "/"), headers={"Host": "tls-context-host-2"}, expected=200, insecure=True, sni=True, ) # 9 - Redirect cleartext from actually redirects. yield Query( self.url("tls-context-same/", scheme="http"), headers={"Host": "tls-context-host-1"}, expected=301, insecure=True, sni=True, ) def check(self): # XXX Ew. If self.results[0].json is empty, the harness won't convert it to a response. errors = self.results[0].json num_errors = len(errors) assert num_errors == 5, "expected 5 errors, got {} -\n{}".format(num_errors, errors) errors_that_should_be_found = { "TLSContext TLSContextTest-no-secret has no certificate information at all?": False, "TLSContext TLSContextTest-same-context-error has no certificate information at all?": False, "TLSContext TLSContextTest-same-context-error is missing cert_chain_file": False, "TLSContext TLSContextTest-same-context-error is missing private_key_file": False, "TLSContext: TLSContextTest-rcf-error; configured conflicting redirect_from port: 8081": False, } unknown_errors: List[str] = [] for err in errors: text = err[1] if text in errors_that_should_be_found: errors_that_should_be_found[text] = True else: unknown_errors.append(f"Unexpected error {text}") for err, found in errors_that_should_be_found.items(): if not found: unknown_errors.append(f"Missing error {err}") assert not unknown_errors, f"Problems with errors: {unknown_errors}" idx = 0 for result in self.results: if result.status == 200 and result.query.headers: host_header = result.query.headers["Host"] tls_common_name = result.tls[0]["Issuer"]["CommonName"] # XXX Weirdness with the fallback cert here! You see, if we use host # tls-context-host-3 (or, really, anything except -1 or -2), then the # fallback cert actually has CN 'localhost'. We should replace this with # a real fallback cert, but for now, just hack the host_header. # # Ew. if host_header == "tls-context-host-3": host_header = "localhost" assert host_header == tls_common_name, "test %d wanted CN %s, but got %s" % ( idx, host_header, tls_common_name, ) idx += 1 def requirements(self): # We're replacing super()'s requirements deliberately here. Without a Host header they can't work. yield ( "url", Query( self.url("ambassador/v0/check_ready"), headers={"Host": "tls-context-host-1"}, insecure=True, sni=True, ), ) yield ( "url", Query( self.url("ambassador/v0/check_alive"), headers={"Host": "tls-context-host-1"}, insecure=True, sni=True, ), ) yield ( "url", Query( self.url("ambassador/v0/check_ready"), headers={"Host": "tls-context-host-2"}, insecure=True, sni=True, ), ) yield ( "url", Query( self.url("ambassador/v0/check_alive"), headers={"Host": "tls-context-host-2"}, insecure=True, sni=True, ), ) class TLSIngressTest(AmbassadorTest): def init(self): self.xfail = "FIXME: IHA" self.target = HTTP() def manifests(self) -> str: self.manifest_envs = """ - name: AMBASSADOR_DEBUG value: "diagd" """ return ( namespace_manifest("secret-namespace-ingress") + f""" --- apiVersion: v1 data: tls.crt: {TLSCerts["localhost"].k8s_crt} tls.key: {TLSCerts["localhost"].k8s_key} kind: Secret metadata: name: test-tlscontext-secret-ingress-0 labels: kat-ambassador-id: tlsingresstest type: kubernetes.io/tls --- apiVersion: v1 data: tls.crt: {TLSCerts["tls-context-host-1"].k8s_crt} tls.key: {TLSCerts["tls-context-host-1"].k8s_key} kind: Secret metadata: name: test-tlscontext-secret-ingress-1 namespace: secret-namespace-ingress labels: kat-ambassador-id: tlsingresstest type: kubernetes.io/tls --- apiVersion: v1 data: tls.crt: {TLSCerts["tls-context-host-2"].k8s_crt} tls.key: {TLSCerts["tls-context-host-2"].k8s_key} kind: Secret metadata: name: test-tlscontext-secret-ingress-2 labels: kat-ambassador-id: tlsingresstest type: kubernetes.io/tls --- apiVersion: networking.k8s.io/v1 kind: Ingress metadata: annotations: kubernetes.io/ingress.class: ambassador getambassador.io/ambassador-id: tlsingresstest name: {self.name.lower()}-1 spec: tls: - secretName: test-tlscontext-secret-ingress-1.secret-namespace-ingress hosts: - tls-context-host-1 rules: - host: tls-context-host-1 http: paths: - backend: service: name: {self.target.path.k8s} port: number: 80 path: /tls-context-same/ pathType: Prefix --- apiVersion: networking.k8s.io/v1 kind: Ingress metadata: annotations: kubernetes.io/ingress.class: ambassador getambassador.io/ambassador-id: tlsingresstest name: {self.name.lower()}-2 spec: tls: - secretName: test-tlscontext-secret-ingress-2 hosts: - tls-context-host-2 rules: - host: tls-context-host-2 http: paths: - backend: service: name: {self.target.path.k8s} port: number: 80 path: /tls-context-same/ pathType: Prefix """ + super().manifests() ) def config(self) -> Generator[Union[str, Tuple[Node, str]], None, None]: yield self, self.format( """ --- apiVersion: getambassador.io/v3alpha1 kind: Module name: tls config: server: enabled: True secret: test-tlscontext-secret-ingress-0 """ ) yield self, self.format( """ --- apiVersion: getambassador.io/v3alpha1 kind: Mapping hostname: "*" name: {self.name}-other-mapping prefix: /{self.name}/ service: https://{self.target.path.fqdn} """ ) def scheme(self) -> str: return "https" @staticmethod def _go_close_connection_error(url): """ :param url: url passed to the query :return: error message string that Go's net/http package throws when server closes connection """ return "Get {}: EOF".format(url) def queries(self): # 0 yield Query( self.url("ambassador/v0/diag/?json=true&filter=errors"), headers={"Host": "tls-context-host-2"}, insecure=True, sni=True, ) # 1 - Correct host #1 yield Query( self.url("tls-context-same/"), headers={"Host": "tls-context-host-1"}, expected=200, insecure=True, sni=True, ) # 2 - Correct host #2 yield Query( self.url("tls-context-same/"), headers={"Host": "tls-context-host-2"}, expected=200, insecure=True, sni=True, ) # 3 - Incorrect host yield Query( self.url("tls-context-same/"), headers={"Host": "tls-context-host-3"}, # error=self._go_close_connection_error(self.url("tls-context-same/")), expected=404, insecure=True, ) # 4 - Incorrect path, correct host yield Query( self.url("tls-context-different/"), headers={"Host": "tls-context-host-1"}, expected=404, insecure=True, sni=True, ) # Other mappings with no host will respond with the fallbock cert. # 5 - no Host header, fallback cert from the TLS module yield Query( self.url(self.name + "/"), # error=self._go_close_connection_error(self.url(self.name + "/")), insecure=True, ) # 6 - explicit Host header, fallback cert yield Query( self.url(self.name + "/"), # error=self._go_close_connection_error(self.url(self.name + "/")), # sni=True, headers={"Host": "tls-context-host-3"}, insecure=True, ) # 7 - explicit Host header 1 wins, we'll get the SNI cert for this overlapping path yield Query( self.url(self.name + "/"), headers={"Host": "tls-context-host-1"}, expected=200, insecure=True, sni=True, ) # 7 - explicit Host header 2 wins, we'll get the SNI cert for this overlapping path yield Query( self.url(self.name + "/"), headers={"Host": "tls-context-host-2"}, expected=200, insecure=True, sni=True, ) def check(self): # XXX Ew. If self.results[0].json is empty, the harness won't convert it to a response. errors = self.results[0].json num_errors = len(errors) assert num_errors == 0, "expected 0 errors, got {} -\n{}".format(num_errors, errors) idx = 0 for result in self.results: if result.status == 200 and result.query.headers: host_header = result.query.headers["Host"] tls_common_name = result.tls[0]["Issuer"]["CommonName"] # XXX Weirdness with the fallback cert here! You see, if we use host # tls-context-host-3 (or, really, anything except -1 or -2), then the # fallback cert actually has CN 'localhost'. We should replace this with # a real fallback cert, but for now, just hack the host_header. # # Ew. if host_header == "tls-context-host-3": host_header = "localhost" # Yep, that's expected. Since the TLS secret for 'tls-context-host-1' is # not namespaced it should only resolve to the Ingress' own # namespace, and can't use the 'secret.namespace' Ambassador syntax if host_header == "tls-context-host-1": host_header = "localhost" assert host_header == tls_common_name, "test %d wanted CN %s, but got %s" % ( idx, host_header, tls_common_name, ) idx += 1 def requirements(self): yield ( "url", Query( self.url("ambassador/v0/check_ready"), headers={"Host": "tls-context-host-1"}, insecure=True, sni=True, ), ) yield ( "url", Query( self.url("ambassador/v0/check_alive"), headers={"Host": "tls-context-host-1"}, insecure=True, sni=True, ), ) yield ( "url", Query( self.url("ambassador/v0/check_ready"), headers={"Host": "tls-context-host-2"}, insecure=True, sni=True, ), ) yield ( "url", Query( self.url("ambassador/v0/check_alive"), headers={"Host": "tls-context-host-2"}, insecure=True, sni=True, ), ) class TLSContextProtocolMaxVersion(AmbassadorTest): # Here we're testing that the client can't exceed the maximum TLS version # configured. # # XXX 2019-09-11: vet that the test client's support for TLS v1.3 is up-to-date. # It appears not to be. # debug = True def init(self): self.target = HTTP() if EDGE_STACK: self.xfail = "Not yet supported in Edge Stack" self.xfail = "FIXME: IHA" def manifests(self) -> str: return ( f""" --- apiVersion: v1 data: tls.crt: {TLSCerts["tls-context-host-1"].k8s_crt} tls.key: {TLSCerts["tls-context-host-1"].k8s_key} kind: Secret metadata: name: secret.max-version labels: kat-ambassador-id: tlscontextprotocolmaxversion type: kubernetes.io/tls """ + super().manifests() ) def config(self) -> Generator[Union[str, Tuple[Node, str]], None, None]: yield self, self.format( """ --- apiVersion: getambassador.io/v3alpha1 kind: Module name: ambassador config: defaults: tls_secret_namespacing: False --- apiVersion: getambassador.io/v3alpha1 kind: Mapping name: {self.name}-same-prefix-1 prefix: /tls-context-same/ service: http://{self.target.path.fqdn} host: tls-context-host-1 --- apiVersion: getambassador.io/v3alpha1 kind: TLSContext name: {self.name}-same-context-1 hosts: - tls-context-host-1 secret: secret.max-version min_tls_version: v1.1 max_tls_version: v1.2 """ ) def scheme(self) -> str: return "https" @staticmethod def _go_close_connection_error(url): """ :param url: url passed to the query :return: error message string that Go's net/http package throws when server closes connection """ return "Get {}: EOF".format(url) def queries(self): # ---- # XXX 2019-09-11 # These aren't actually reporting the negotiated version, alhough correct # behavior can be verified with a custom log format. What, does the silly thing just not # report the negotiated version if it's the max you've requested?? # # For now, we're checking for the None result, but, ew. # ---- yield Query( self.url("tls-context-same/"), headers={"Host": "tls-context-host-1"}, expected=200, insecure=True, sni=True, minTLSv="v1.2", maxTLSv="v1.2", ) # This should give us TLS v1.1 yield Query( self.url("tls-context-same/"), headers={"Host": "tls-context-host-1"}, expected=200, insecure=True, sni=True, minTLSv="v1.0", maxTLSv="v1.1", ) # This should be an error. yield Query( self.url("tls-context-same/"), headers={"Host": "tls-context-host-1"}, expected=200, insecure=True, sni=True, minTLSv="v1.3", maxTLSv="v1.3", error=[ "tls: server selected unsupported protocol version 303", "tls: no supported versions satisfy MinVersion and MaxVersion", "tls: protocol version not supported", "read: connection reset by peer", ], ) # The TLS inspector just closes the connection. Wow. def check(self): assert self.results[0].backend assert self.results[0].backend.request tls_0_version = self.results[0].backend.request.tls.negotiated_protocol_version assert self.results[1].backend assert self.results[1].backend.request tls_1_version = self.results[1].backend.request.tls.negotiated_protocol_version # See comment in queries for why these are None. They should be v1.2 and v1.1 respectively. assert tls_0_version == None, f"requesting TLS v1.2 got TLS {tls_0_version}" assert tls_1_version == None, f"requesting TLS v1.0-v1.1 got TLS {tls_1_version}" def requirements(self): # We're replacing super()'s requirements deliberately here. Without a Host header they can't work. yield ( "url", Query( self.url("ambassador/v0/check_ready"), headers={"Host": "tls-context-host-1"}, insecure=True, sni=True, minTLSv="v1.2", ), ) yield ( "url", Query( self.url("ambassador/v0/check_alive"), headers={"Host": "tls-context-host-1"}, insecure=True, sni=True, minTLSv="v1.2", ), ) class TLSContextProtocolMinVersion(AmbassadorTest): # Here we're testing that the client can't drop below the minimum TLS version # configured. # # XXX 2019-09-11: vet that the test client's support for TLS v1.3 is up-to-date. # It appears not to be. # debug = True def init(self): self.xfail = "FIXME: IHA" self.target = HTTP() def manifests(self) -> str: return ( f""" --- apiVersion: v1 data: tls.crt: {TLSCerts["tls-context-host-1"].k8s_crt} tls.key: {TLSCerts["tls-context-host-1"].k8s_key} kind: Secret metadata: name: secret.min-version labels: kat-ambassador-id: tlscontextprotocolminversion type: kubernetes.io/tls """ + super().manifests() ) def config(self) -> Generator[Union[str, Tuple[Node, str]], None, None]: yield self, self.format( """ --- apiVersion: getambassador.io/v3alpha1 kind: Mapping name: {self.name}-same-prefix-1 prefix: /tls-context-same/ service: https://{self.target.path.fqdn} host: tls-context-host-1 --- apiVersion: getambassador.io/v3alpha1 kind: TLSContext name: {self.name}-same-context-1 hosts: - tls-context-host-1 secret: secret.min-version secret_namespacing: False min_tls_version: v1.2 max_tls_version: v1.3 """ ) def scheme(self) -> str: return "https" @staticmethod def _go_close_connection_error(url): """ :param url: url passed to the query :return: error message string that Go's net/http package throws when server closes connection """ return "Get {}: EOF".format(url) def queries(self): # This should give v1.3, but it currently seems to give 1.2. yield Query( self.url("tls-context-same/"), headers={"Host": "tls-context-host-1"}, expected=200, insecure=True, sni=True, minTLSv="v1.2", maxTLSv="v1.3", ) # This should give v1.2 yield Query( self.url("tls-context-same/"), headers={"Host": "tls-context-host-1"}, expected=200, insecure=True, sni=True, minTLSv="v1.1", maxTLSv="v1.2", ) # This should be an error. yield Query( self.url("tls-context-same/"), headers={"Host": "tls-context-host-1"}, expected=200, insecure=True, sni=True, minTLSv="v1.0", maxTLSv="v1.0", error=[ "tls: server selected unsupported protocol version 303", "tls: no supported versions satisfy MinVersion and MaxVersion", "tls: protocol version not supported", ], ) def check(self): assert self.results[0].backend assert self.results[0].backend.request tls_0_version = self.results[0].backend.request.tls.negotiated_protocol_version assert self.results[1].backend assert self.results[1].backend.request tls_1_version = self.results[1].backend.request.tls.negotiated_protocol_version # Hmmm. Why does Envoy prefer 1.2 to 1.3 here?? This may be a client thing -- have to # rebuild with Go 1.13. assert tls_0_version == "v1.2", f"requesting TLS v1.2-v1.3 got TLS {tls_0_version}" assert tls_1_version == "v1.2", f"requesting TLS v1.1-v1.2 got TLS {tls_1_version}" def requirements(self): # We're replacing super()'s requirements deliberately here. Without a Host header they can't work. yield ( "url", Query( self.url("ambassador/v0/check_ready"), headers={"Host": "tls-context-host-1"}, insecure=True, sni=True, ), ) yield ( "url", Query( self.url("ambassador/v0/check_alive"), headers={"Host": "tls-context-host-1"}, insecure=True, sni=True, ), ) class TLSContextCipherSuites(AmbassadorTest): # debug = True def init(self): self.xfail = "FIXME: IHA" self.target = HTTP() def manifests(self) -> str: return ( f""" --- apiVersion: v1 data: tls.crt: {TLSCerts["tls-context-host-1"].k8s_crt} tls.key: {TLSCerts["tls-context-host-1"].k8s_key} kind: Secret metadata: name: secret.cipher-suites labels: kat-ambassador-id: tlscontextciphersuites type: kubernetes.io/tls """ + super().manifests() ) def config(self) -> Generator[Union[str, Tuple[Node, str]], None, None]: yield self, self.format( """ --- apiVersion: getambassador.io/v3alpha1 kind: Mapping name: {self.name}-same-prefix-1 prefix: /tls-context-same/ service: https://{self.target.path.fqdn} host: tls-context-host-1 """ ) yield self, self.format( """ --- apiVersion: getambassador.io/v3alpha1 kind: TLSContext name: {self.name}-same-context-1 hosts: - tls-context-host-1 secret: secret.cipher-suites secret_namespacing: False max_tls_version: v1.2 cipher_suites: - ECDHE-RSA-AES128-GCM-SHA256 ecdh_curves: - P-256 """ ) def scheme(self) -> str: return "https" @staticmethod def _go_close_connection_error(url): """ :param url: url passed to the query :return: error message string that Go's net/http package throws when server closes connection """ return "Get {}: EOF".format(url) def queries(self): yield Query( self.url("tls-context-same/"), headers={"Host": "tls-context-host-1"}, expected=200, insecure=True, sni=True, cipherSuites=["TLS_ECDHE_RSA_WITH_AES_128_GCM_SHA256"], maxTLSv="v1.2", ) yield Query( self.url("tls-context-same/"), headers={"Host": "tls-context-host-1"}, expected=200, insecure=True, sni=True, cipherSuites=["TLS_ECDHE_ECDSA_WITH_AES_128_GCM_SHA256"], maxTLSv="v1.2", error="tls: handshake failure", ) yield Query( self.url("tls-context-same/"), headers={"Host": "tls-context-host-1"}, expected=200, insecure=True, sni=True, cipherSuites=["TLS_ECDHE_RSA_WITH_AES_128_GCM_SHA256"], ecdhCurves=["X25519"], maxTLSv="v1.2", error="tls: handshake failure", ) def check(self): assert self.results[0].backend assert self.results[0].backend.request tls_0_version = self.results[0].backend.request.tls.negotiated_protocol_version assert tls_0_version == "v1.2", f"requesting TLS v1.2 got TLS {tls_0_version}" def requirements(self): yield ( "url", Query( self.url("ambassador/v0/check_ready"), headers={"Host": "tls-context-host-1"}, insecure=True, sni=True, ), ) yield ( "url", Query( self.url("ambassador/v0/check_alive"), headers={"Host": "tls-context-host-1"}, insecure=True, sni=True, ), ) class TLSContextIstioSecretTest(AmbassadorTest): # debug = True def init(self): self.target = HTTP() if EDGE_STACK: self.xfail = "XFailing for now" def manifests(self) -> str: return ( namespace_manifest("secret-namespace") + """ --- apiVersion: v1 data: cert-chain.pem: LS0tLS1CRUdJTiBDRVJUSUZJQ0FURS0tLS0tCk1JSURJVENDQWdtZ0F3SUJBZ0lSQU8wbFh1OVhOYkNrejJJTEhiYlVBbDh3RFFZSktvWklodmNOQVFFTEJRQXcKR0RFV01CUUdBMVVFQ2hNTlkyeDFjM1JsY2k1c2IyTmhiREFlRncweU1EQXhNakF4TmpReE5EbGFGdzB5TURBMApNVGt4TmpReE5EbGFNQUF3Z2dFaU1BMEdDU3FHU0liM0RRRUJBUVVBQTRJQkR3QXdnZ0VLQW9JQkFRQ3h2RWxuCmd6SldTejR6RGM5TE5od0xCZm1nTStlY3k0T096UEFtSGhnZER2RFhLVE40Qll0bS8veTFRT2tGNG9JeHVMVnAKYW5ULzdHdUJHNzlrbUg1TkpkcWhzV0c1b1h0TWpiZnZnZFJ6dW50UVg1OFI5d0pWT2YwNlo4dHFUYmE4VVI3YQpYZFY1c2VSbGtINU1VWmhVNXkxNzA1ZVNycVBROGVBd1hiazdOejNlTUd4Ujc1NjZOK3g2UDIrcEZmTDF1dEJ3CnRhSVVpYlVNR0liODcwYmtxVmlzSHQ1aC95blkrV3FlclJLREhTLzVRQlZiMytZSXd4N3o1b3FPbDBvZ05YODkKVnlzNFM0NzdXNDBPWGRZaStHeGwwKzFVT2F3NEw2a0tTaWhjVTZJUm1YbWhiUXpRb0VvazN6TDNaR2hWS3FhbwpUaFdqTVhrMkZxS1pNSnBCQWdNQkFBR2pmakI4TUE0R0ExVWREd0VCL3dRRUF3SUZvREFkQmdOVkhTVUVGakFVCkJnZ3JCZ0VGQlFjREFRWUlLd1lCQlFVSEF3SXdEQVlEVlIwVEFRSC9CQUl3QURBOUJnTlZIUkVCQWY4RU16QXgKaGk5emNHbG1abVU2THk5amJIVnpkR1Z5TG14dlkyRnNMMjV6TDJGdFltRnpjMkZrYjNJdmMyRXZaR1ZtWVhWcwpkREFOQmdrcWhraUc5dzBCQVFzRkFBT0NBUUVBaHQ3c1dSOHEzeFNaM1BsTGFnS0REc1c2UlYyRUJCRkNhR08rCjlJb2lrQXZTdTV2b3VKS3EzVHE0WU9LRzJnbEpvVSs1c2lmL25DYzFva1ZTakNJSnh1UVFhdzd5QkV0WWJaZkYKSXI2WEkzbUtCVC9kWHpOM00yL1g4Q3RBNHI5SFQ4VmxmMitJMHNqb01hVE80WHdPNVQ5eXdoREJXdzdrdThVRApnMjdzTFlHVy9UNzIvT0JGUEcxa2VlRUpva3BhSXZQOVliWS9qSlRWZVVIYk1FODVOckJFMWNndUVnSlVod1VKCkhiam4xcEFKMHZsUWZrVW9mT3VRZkFtZGpHWjc2N2phOE5ldHZBdk9tRExPV2dzQWM4KzRsRXBKVURwcmhlVEoKazBrSFh6cUMyTzN4a250U0QxM2FFa2VUMXJocjM3MXc1OTVJUjgvR1llSis3a3JqRkE9PQotLS0tLUVORCBDRVJUSUZJQ0FURS0tLS0tCg== key.pem: LS0tLS1CRUdJTiBSU0EgUFJJVkFURSBLRVktLS0tLQpNSUlFb2dJQkFBS0NBUUVBc2J4Slo0TXlWa3MrTXczUFN6WWNDd1g1b0RQbm5NdURqc3p3Smg0WUhRN3cxeWt6CmVBV0xadi84dFVEcEJlS0NNYmkxYVdwMC8reHJnUnUvWkpoK1RTWGFvYkZodWFGN1RJMjM3NEhVYzdwN1VGK2YKRWZjQ1ZUbjlPbWZMYWsyMnZGRWUybDNWZWJIa1paQitURkdZVk9jdGU5T1hrcTZqMFBIZ01GMjVPemM5M2pCcwpVZStldWpmc2VqOXZxUlh5OWJyUWNMV2lGSW0xREJpRy9POUc1S2xZckI3ZVlmOHAyUGxxbnEwU2d4MHYrVUFWClc5L21DTU1lOCthS2pwZEtJRFYvUFZjck9FdU8rMXVORGwzV0l2aHNaZFB0VkRtc09DK3BDa29vWEZPaUVabDUKb1cwTTBLQktKTjh5OTJSb1ZTcW1xRTRWb3pGNU5oYWltVENhUVFJREFRQUJBb0lCQUI1bXdIK09OMnYvVHRKWQp5RjVyRVB6cHRyc3FaYkd5TmZ5VkhYYkhxd1E5YkFEQnNXWVVQTFlQajJCSmpCSlBua2wyK01EaFRzWC80SnVpCjdXZjlsWTBJcm83OTBtTjROYWp3ak1mUkExQVFVOHQ1cjdIWStITXZpaHNWYWZ2eTh4RGZKMUhldndjajRKZG0KMGRPb0dWQmNnckV0amoydTFhS0YzUDBvNnVndno2SmtSWld2SjZ4SGlya0NETk5MWlpzbHB5UzFHRjZmYm9aTwp1SmFTLzc2S25JS1FQT3hCaE83ME80WHF6am5wMVk1UzduTjRoM1Z2RmVPREcvQ2pWaGhOcE4xV0NadFNvSXBwCk9XOVdONVRvUnZhVDhnelljcG9TOEMzYXVqSzVvV1FiVzdRZys2NXRoWGNqcFpRM0VFSnNaLzNsTWRsbGE3TFcKT2k3Vkhpa0NnWUVBeHBUQjZodnBRcnNXUUhjcDhRdG94RitNUThVL2l5WjZ6dU5BNHZyWFdwUlFDVVg4d1ZiRwowTFNZN1lSVGhuOGtUZ09vWlNWMU9VcThUTjlnOG91UUh6bS9ta1FpV0p0bnNXWGJtNjF3SFozaWNlQ1FxWDU4CmoyUjM2eXBONGpuUENPREVwcDVKWExZLzNFTnZnYTBxSm9ZVWp4UUpHZDgyWUxKRmJrMHZmTzhDZ1lFQTVTQ0MKcHJTR0NBL0dUVkY4MjRmaW1YTkNMcllOVmV1TStqZFdqQUFBZkQzWHpUK1JWeFZsTENTVUluQUdtYjh2djZlcApreHYrdWlBZTg2TDBhUVVDTENSRFF2SjR3MnNPRWkwWWMwTGlKUGdBN1JLeFhwVGUrQ09vS1VmcTZyVi96TTdNCmhCbWtDT2ZoUnRDT3NENGNBcWQ0MzluQjZBVm01K21VV0FqNHU4OENnWUJFTXBSQi9TSG5xKzZoWndzOVgraTAKQUFoZ3dkM253T2hPSXRlRzNCU1hZL1gwcVZkN1luelc4aDdPK3pIZ0w4dmRDdjZLOWdsRENycU9QK3pBZjFPWQpsYkdLbmptWmFvMTY2L3MyaEtMTFdReUtoVS9KRmNwYlNHcXlsWTIzMHBpYWVPNndOZzRGekFVMGRPaFhoWXZEClBTclVWRkluMDNPT1U4cnFiWkdRZXdLQmdFMVJPaVZNOTRtUzRTVElJYXptM3NWUFNuNyt1ZU5MZUNnYk1sNU4KeGR3bTlrSnhkL2I5NWtVT0Z0ckVHTVlhNk43d2tkMXRiZmlhekRjRXZ4c05NSjE2b3lQZE5Ia2xEL3Q4TWlyNgozOXIvd1RnK3ZaR2dCTm1SRnJiUGFPdEkwZFpuMWtXaGJXUC84MW4xR0tGS1pDTlZKZ25Mcm80Ly9HaTN2bkl5Cm5OU3JBb0dBVGVidmRLamtENk5XQmJMWSsrVUVQN1ZLd0pOWlR1VWUvT0FBdlJIKzZaWEV4SkhtM3pjV280TVkKMG8vL2dyNzhBdDM4NEk5QVBwMnQwV3lmTmlaTStWUFh4a1lKTU5IU01mcXdGcVRVSmE3NGttNVUrYnB4Mm1ueAovUlR6aElHMDE4SXN3NHBGeUZ4ekpTSVdCK2VpVEF6NFZsMEw2ZU0yNUp5R3lyU2x0Q2M9Ci0tLS0tRU5EIFJTQSBQUklWQVRFIEtFWS0tLS0tCg== root-cert.pem: LS0tLS1CRUdJTiBDRVJUSUZJQ0FURS0tLS0tCk1JSUMzVENDQWNXZ0F3SUJBZ0lRVGRHUmJPampxWjBEZDUxOVJqdXlzREFOQmdrcWhraUc5dzBCQVFzRkFEQVkKTVJZd0ZBWURWUVFLRXcxamJIVnpkR1Z5TG14dlkyRnNNQjRYRFRJd01ERXlNREUyTkRBek5Gb1hEVE13TURFeApOekUyTkRBek5Gb3dHREVXTUJRR0ExVUVDaE1OWTJ4MWMzUmxjaTVzYjJOaGJEQ0NBU0l3RFFZSktvWklodmNOCkFRRUJCUUFEZ2dFUEFEQ0NBUW9DZ2dFQkFMNzhadlRtQ2hxYUM5Z0lFUFlWSWYrVkFsU0tJR2JsdktvUUJNNmwKWlNBTmxNQXg3elJQTjFQdVMrV2I5M1hxMXNzN1hEUEY4UmlIL2dCWE05aGZsNUpGTDErbmlLYWR3RHh5UUdXQQpPMUFBQXNmZlpud3NkWDhDOGdCcE5zUkVZYVo5SzExdDI5NmV5WUc1d3ozMW9rZVFYSTVrSU0vdWgxL2wwN3pKClU3eG8zSmVZbHpMZnJSVWhNRnc1Vk5ETkNCY3JldEoyOWgvZzRpS1plM2JDS3laVmJRUkN3VjR5ck12YTA4Z3kKYzRhSGJud1VtRThKT0JvcE5abW1uOHc0bFcwQjFsS1Q3aFhBRldJdW55WVhIOWFabUJJd1pPVk9kV0N4SmZnTQpKSWY1UVJSY0s5MVZGMjYvcUp2RHlwaVpxcHFJcEdQWHJHbHF2dGtTSmwxdHhYMENBd0VBQWFNak1DRXdEZ1lEClZSMFBBUUgvQkFRREFnSUVNQThHQTFVZEV3RUIvd1FGTUFNQkFmOHdEUVlKS29aSWh2Y05BUUVMQlFBRGdnRUIKQUpjWXl3WkoxeUZpQzRpT0xNbXY4MTZYZEhUSWdRTGlLaXNBdGRqb21TdXhLc0o1eXZ3M2lGdkROSklseEQ4SgoyVVROR2JJTFN2d29qQ1JzQVcyMlJtelpjZG95SXkvcFVIR25EVUpiMk14T0svaEVWU0x4cnN6RHlEK2YwR1liCjdhL1Q2ZmJFbUdYK0JHTnBKZ2lTKytwUm5JMzE3THN6aldtTUlmbVF3T1NtZXNvKzhMSXAxZS9STGVKcThoM0cKREZzcVA4c1BLaHNEM1M1RWNGYU5vSVg4OThVK3UvUWlKd3BoS2lDK3RRRzExeGJZanMxaURNcFJpUGsvSi9NRwpiaTZnQm8zZGdjZ1RWWFdOY2YzeHRiQWErMmkzK3k1V25ydHoyK1d4ZG96cEhpN3FLL1BEbGpwVG5JdkY2Nm0wCjBFYVA0T3ZOY29hNk12MUpoYkFVK0w0PQotLS0tLUVORCBDRVJUSUZJQ0FURS0tLS0tCg== kind: Secret metadata: name: istio.test-tlscontext-istio-secret-1 namespace: secret-namespace labels: kat-ambassador-id: tlscontextistiosecret type: istio.io/key-and-cert """ + super().manifests() ) def config(self) -> Generator[Union[str, Tuple[Node, str]], None, None]: yield self, self.format( """ --- apiVersion: getambassador.io/v3alpha1 kind: Mapping name: {self.name}-istio-prefix-1 prefix: /tls-context-istio/ service: https://{self.target.path.fqdn} tls: {self.name}-istio-context-1 """ ) yield self, self.format( """ --- apiVersion: getambassador.io/v3alpha1 kind: TLSContext name: {self.name}-istio-context-1 secret: istio.test-tlscontext-istio-secret-1 namespace: secret-namespace secret_namespacing: False """ ) def queries(self): yield Query(self.url("ambassador/v0/diag/?json=true&filter=errors"), phase=2) def check(self): assert ( self.results[0].backend is None ), f"expected 0 errors, got {len(self.results[0].backend.response)}: received {self.results[0].backend.response}" class TLSCoalescing(AmbassadorTest): def init(self): self.target = HTTP() if EDGE_STACK: self.xfail = "Not yet supported in Edge Stack" self.xfail = "FIXME: IHA" def manifests(self) -> str: return ( f""" --- apiVersion: v1 metadata: name: tlscoalescing-certs labels: kat-ambassador-id: tlscoalescing data: tls.crt: {TLSCerts["*.domain.com"].k8s_crt} tls.key: {TLSCerts["*.domain.com"].k8s_key} kind: Secret type: kubernetes.io/tls """ + super().manifests() ) def config(self) -> Generator[Union[str, Tuple[Node, str]], None, None]: yield self, self.format( """ apiVersion: getambassador.io/v3alpha1 kind: TLSContext name: tlscoalescing-context secret: tlscoalescing-certs alpn_protocols: h2, http/1.1 hosts: - domain.com - a.domain.com - b.domain.com """ ) def scheme(self) -> str: return "https" @staticmethod def _go_close_connection_error(url): """ :param url: url passed to the query :return: error message string that Go's net/http package throws when server closes connection """ return "Get {}: EOF".format(url) def queries(self): yield Query( self.url("ambassador/v0/diag/"), headers={"Host": "a.domain.com"}, insecure=True, sni=True, ) yield Query( self.url("ambassador/v0/diag/"), headers={"Host": "b.domain.com"}, insecure=True, sni=True, ) def requirements(self): yield ("url", Query(self.url("ambassador/v0/check_ready"), insecure=True, sni=True)) class TLSInheritFromModule(AmbassadorTest): target: ServiceType def init(self): self.xfail = "FIXME: IHA" self.edge_stack_cleartext_host = False self.target = HTTP() def config(self) -> Generator[Union[str, Tuple[Node, str]], None, None]: # These are annotations instead of resources because the name matters. yield self, self.format( """ --- apiVersion: getambassador.io/v3alpha1 kind: Module name: tls ambassador_id: [{self.ambassador_id}] config: server: enabled: True redirect_cleartext_from: 8080 """ ) def manifests(self) -> str: return ( self.format( """ --- apiVersion: getambassador.io/v3alpha1 kind: TLSContext metadata: name: {self.path.k8s} spec: ambassador_id: [ {self.ambassador_id} ] alpn_protocols: "h2,http/1.1" hosts: - a.domain.com secret: {self.path.k8s} --- apiVersion: v1 kind: Secret metadata: name: {self.path.k8s} labels: kat-ambassador-id: {self.ambassador_id} type: kubernetes.io/tls data: tls.crt: """ + TLSCerts["a.domain.com"].k8s_crt + """ tls.key: """ + TLSCerts["a.domain.com"].k8s_key + """ --- apiVersion: getambassador.io/v3alpha1 kind: Mapping metadata: name: {self.path.k8s}-target-mapping spec: ambassador_id: [ {self.ambassador_id} ] prefix: /foo service: {self.target.path.fqdn} """ ) + super().manifests() ) def scheme(self) -> str: return "https" def queries(self): yield Query(self.url("foo", scheme="http"), headers={"Host": "a.domain.com"}, expected=301) yield Query( self.url("bar", scheme="http"), headers={"Host": "a.domain.com"}, expected=(404 if bug_404_routes else 301), ) yield Query( self.url("foo", scheme="https"), headers={"Host": "a.domain.com"}, ca_cert=TLSCerts["a.domain.com"].pubcert, sni=True, expected=200, ) yield Query( self.url("bar", scheme="https"), headers={"Host": "a.domain.com"}, ca_cert=TLSCerts["a.domain.com"].pubcert, sni=True, expected=404, ) def requirements(self): for r in super().requirements(): query = r[1] query.headers = {"Host": "a.domain.com"} query.sni = ( True # Use query.headers["Host"] instead of urlparse(query.url).hostname for SNI ) query.ca_cert = TLSCerts["a.domain.com"].pubcert yield (r[0], query)