...

Text file src/github.com/letsencrypt/boulder/test/v2_integration.py

Documentation: github.com/letsencrypt/boulder/test

     1# -*- coding: utf-8 -*-
     2"""
     3Integration test cases for ACMEv2 as implemented by boulder-wfe2.
     4"""
     5import subprocess
     6import requests
     7import datetime
     8import time
     9import os
    10import json
    11import re
    12
    13import OpenSSL
    14
    15from cryptography import x509
    16from cryptography.hazmat.backends import default_backend
    17from cryptography.hazmat.primitives.asymmetric import rsa
    18from cryptography.hazmat.primitives import serialization
    19
    20import chisel2
    21from helpers import *
    22
    23from acme import errors as acme_errors
    24
    25from acme.messages import Status, CertificateRequest, Directory, NewRegistration
    26from acme import crypto_util as acme_crypto_util
    27from acme import client as acme_client
    28from acme import messages
    29from acme import challenges
    30from acme import errors
    31
    32import josepy
    33
    34import tempfile
    35import shutil
    36import atexit
    37import random
    38import string
    39
    40import threading
    41from http.server import HTTPServer, BaseHTTPRequestHandler
    42import socketserver
    43import socket
    44
    45import challtestsrv
    46challSrv = challtestsrv.ChallTestServer()
    47
    48def test_multidomain():
    49    chisel2.auth_and_issue([random_domain(), random_domain()])
    50
    51def test_wildcardmultidomain():
    52    """
    53    Test issuance for a random domain and a random wildcard domain using DNS-01.
    54    """
    55    chisel2.auth_and_issue([random_domain(), "*."+random_domain()], chall_type="dns-01")
    56
    57def test_http_challenge():
    58    chisel2.auth_and_issue([random_domain(), random_domain()], chall_type="http-01")
    59
    60def rand_http_chall(client):
    61    d = random_domain()
    62    csr_pem = chisel2.make_csr([d])
    63    order = client.new_order(csr_pem)
    64    authzs = order.authorizations
    65    for a in authzs:
    66        for c in a.body.challenges:
    67            if isinstance(c.chall, challenges.HTTP01):
    68                return d, c.chall
    69    raise(Exception("No HTTP-01 challenge found for random domain authz"))
    70
    71def check_challenge_dns_err(chalType):
    72    """
    73    check_challenge_dns_err tests that performing an ACME challenge of the
    74    specified type to a hostname that is configured to return SERVFAIL for all
    75    queries produces the correct problem type and detail message.
    76    """
    77    client = chisel2.make_client()
    78
    79    # Create a random domains.
    80    d = random_domain()
    81
    82    # Configure the chall srv to SERVFAIL all queries for that domain.
    83    challSrv.add_servfail_response(d)
    84
    85    # Expect a DNS problem with a detail that matches a regex
    86    expectedProbType = "dns"
    87    expectedProbRegex = re.compile(r"SERVFAIL looking up (A|AAAA|TXT|CAA) for {0}".format(d))
    88
    89    # Try and issue for the domain with the given challenge type.
    90    failed = False
    91    try:
    92        chisel2.auth_and_issue([d], client=client, chall_type=chalType)
    93    except acme_errors.ValidationError as e:
    94        # Mark that the auth_and_issue failed
    95        failed = True
    96        # Extract the failed challenge from each failed authorization
    97        for authzr in e.failed_authzrs:
    98            c = None
    99            if chalType == "http-01":
   100                c = chisel2.get_chall(authzr, challenges.HTTP01)
   101            elif chalType == "dns-01":
   102                c = chisel2.get_chall(authzr, challenges.DNS01)
   103            elif chalType == "tls-alpn-01":
   104                c = chisel2.get_chall(authzr, challenges.TLSALPN01)
   105            else:
   106                raise(Exception("Invalid challenge type requested: {0}".format(challType)))
   107
   108            # The failed challenge's error should match expected
   109            error = c.error
   110            if error is None or error.typ != "urn:ietf:params:acme:error:{0}".format(expectedProbType):
   111                raise(Exception("Expected {0} prob, got {1}".format(expectedProbType, error.typ)))
   112            if not expectedProbRegex.search(error.detail):
   113                raise(Exception("Prob detail did not match expectedProbRegex, got \"{0}\"".format(error.detail)))
   114    finally:
   115        challSrv.remove_servfail_response(d)
   116
   117    # If there was no exception that means something went wrong. The test should fail.
   118    if failed is False:
   119        raise(Exception("No problem generated issuing for broken DNS identifier"))
   120
   121def test_http_challenge_dns_err():
   122    """
   123    test_http_challenge_dns_err tests that a HTTP-01 challenge for a domain
   124    with broken DNS produces the correct problem response.
   125    """
   126    check_challenge_dns_err("http-01")
   127
   128def test_dns_challenge_dns_err():
   129    """
   130    test_dns_challenge_dns_err tests that a DNS-01 challenge for a domain
   131    with broken DNS produces the correct problem response.
   132    """
   133    check_challenge_dns_err("dns-01")
   134
   135def test_tls_alpn_challenge_dns_err():
   136    """
   137    test_tls_alpn_challenge_dns_err tests that a TLS-ALPN-01 challenge for a domain
   138    with broken DNS produces the correct problem response.
   139    """
   140    check_challenge_dns_err("tls-alpn-01")
   141
   142def test_http_challenge_broken_redirect():
   143    """
   144    test_http_challenge_broken_redirect tests that a common webserver
   145    mis-configuration receives the correct specialized error message when attempting
   146    an HTTP-01 challenge.
   147    """
   148    client = chisel2.make_client()
   149
   150    # Create an authz for a random domain and get its HTTP-01 challenge token
   151    d, chall = rand_http_chall(client)
   152    token = chall.encode("token")
   153
   154    # Create a broken HTTP redirect similar to a sort we see frequently "in the wild"
   155    challengePath = "/.well-known/acme-challenge/{0}".format(token)
   156    redirect = "http://{0}.well-known/acme-challenge/bad-bad-bad".format(d)
   157    challSrv.add_http_redirect(
   158        challengePath,
   159        redirect)
   160
   161    # Expect the specialized error message
   162    expectedError = "10.77.77.77: Fetching {0}: Invalid host in redirect target \"{1}.well-known\". Check webserver config for missing '/' in redirect target.".format(redirect, d)
   163
   164    # NOTE(@cpu): Can't use chisel2.expect_problem here because it doesn't let
   165    # us interrogate the detail message easily.
   166    try:
   167        chisel2.auth_and_issue([d], client=client, chall_type="http-01")
   168    except acme_errors.ValidationError as e:
   169        for authzr in e.failed_authzrs:
   170            c = chisel2.get_chall(authzr, challenges.HTTP01)
   171            error = c.error
   172            if error is None or error.typ != "urn:ietf:params:acme:error:connection":
   173                raise(Exception("Expected connection prob, got %s" % (error.__str__())))
   174            if error.detail != expectedError:
   175                raise(Exception("Expected prob detail %s, got %s" % (expectedError, error.detail)))
   176
   177    challSrv.remove_http_redirect(challengePath)
   178
   179def test_failed_validation_limit():
   180    """
   181    Fail a challenge repeatedly for the same domain, with the same account. Once
   182    we reach the rate limit we should get a rateLimitedError. Note that this
   183    depends on the specific threshold configured in rate-limit-policies.yml.
   184
   185    This also incidentally tests a fix for
   186    https://github.com/letsencrypt/boulder/issues/4329. We expect to get
   187    ValidationErrors, eventually followed by a rate limit error.
   188    """
   189    domain = "fail." + random_domain()
   190    csr_pem = chisel2.make_csr([domain])
   191    client = chisel2.make_client()
   192    threshold = 3
   193    for _ in range(threshold):
   194        order = client.new_order(csr_pem)
   195        chall = order.authorizations[0].body.challenges[0]
   196        client.answer_challenge(chall, chall.response(client.net.key))
   197        try:
   198            client.poll_and_finalize(order)
   199        except errors.ValidationError as e:
   200            pass
   201    chisel2.expect_problem("urn:ietf:params:acme:error:rateLimited",
   202        lambda: chisel2.auth_and_issue([domain], client=client))
   203
   204
   205def test_http_challenge_loop_redirect():
   206    client = chisel2.make_client()
   207
   208    # Create an authz for a random domain and get its HTTP-01 challenge token
   209    d, chall = rand_http_chall(client)
   210    token = chall.encode("token")
   211
   212    # Create a HTTP redirect from the challenge's validation path to itself
   213    challengePath = "/.well-known/acme-challenge/{0}".format(token)
   214    challSrv.add_http_redirect(
   215        challengePath,
   216        "http://{0}{1}".format(d, challengePath))
   217
   218    # Issuing for the the name should fail because of the challenge domains's
   219    # redirect loop.
   220    chisel2.expect_problem("urn:ietf:params:acme:error:connection",
   221        lambda: chisel2.auth_and_issue([d], client=client, chall_type="http-01"))
   222
   223    challSrv.remove_http_redirect(challengePath)
   224
   225def test_http_challenge_badport_redirect():
   226    client = chisel2.make_client()
   227
   228    # Create an authz for a random domain and get its HTTP-01 challenge token
   229    d, chall = rand_http_chall(client)
   230    token = chall.encode("token")
   231
   232    # Create a HTTP redirect from the challenge's validation path to a host with
   233    # an invalid port.
   234    challengePath = "/.well-known/acme-challenge/{0}".format(token)
   235    challSrv.add_http_redirect(
   236        challengePath,
   237        "http://{0}:1337{1}".format(d, challengePath))
   238
   239    # Issuing for the name should fail because of the challenge domain's
   240    # invalid port redirect.
   241    chisel2.expect_problem("urn:ietf:params:acme:error:connection",
   242        lambda: chisel2.auth_and_issue([d], client=client, chall_type="http-01"))
   243
   244    challSrv.remove_http_redirect(challengePath)
   245
   246def test_http_challenge_badhost_redirect():
   247    client = chisel2.make_client()
   248
   249    # Create an authz for a random domain and get its HTTP-01 challenge token
   250    d, chall = rand_http_chall(client)
   251    token = chall.encode("token")
   252
   253    # Create a HTTP redirect from the challenge's validation path to a bare IP
   254    # hostname.
   255    challengePath = "/.well-known/acme-challenge/{0}".format(token)
   256    challSrv.add_http_redirect(
   257        challengePath,
   258        "https://127.0.0.1{0}".format(challengePath))
   259
   260    # Issuing for the name should cause a connection error because the redirect
   261    # domain name is an IP address.
   262    chisel2.expect_problem("urn:ietf:params:acme:error:connection",
   263        lambda: chisel2.auth_and_issue([d], client=client, chall_type="http-01"))
   264
   265    challSrv.remove_http_redirect(challengePath)
   266
   267def test_http_challenge_badproto_redirect():
   268    client = chisel2.make_client()
   269
   270    # Create an authz for a random domain and get its HTTP-01 challenge token
   271    d, chall = rand_http_chall(client)
   272    token = chall.encode("token")
   273
   274    # Create a HTTP redirect from the challenge's validation path to whacky
   275    # non-http/https protocol URL.
   276    challengePath = "/.well-known/acme-challenge/{0}".format(token)
   277    challSrv.add_http_redirect(
   278        challengePath,
   279        "gopher://{0}{1}".format(d, challengePath))
   280
   281    # Issuing for the name should cause a connection error because the redirect
   282    # domain name is an IP address.
   283    chisel2.expect_problem("urn:ietf:params:acme:error:connection",
   284        lambda: chisel2.auth_and_issue([d], client=client, chall_type="http-01"))
   285
   286    challSrv.remove_http_redirect(challengePath)
   287
   288def test_http_challenge_http_redirect():
   289    client = chisel2.make_client()
   290
   291    # Create an authz for a random domain and get its HTTP-01 challenge token
   292    d, chall = rand_http_chall(client)
   293    token = chall.encode("token")
   294    # Calculate its keyauth so we can add it in a special non-standard location
   295    # for the redirect result
   296    resp = chall.response(client.net.key)
   297    keyauth = resp.key_authorization
   298    challSrv.add_http01_response("http-redirect", keyauth)
   299
   300    # Create a HTTP redirect from the challenge's validation path to some other
   301    # token path where we have registered the key authorization.
   302    challengePath = "/.well-known/acme-challenge/{0}".format(token)
   303    redirectPath = "/.well-known/acme-challenge/http-redirect?params=are&important=to&not=lose"
   304    challSrv.add_http_redirect(
   305        challengePath,
   306        "http://{0}{1}".format(d, redirectPath))
   307
   308    chisel2.auth_and_issue([d], client=client, chall_type="http-01")
   309
   310    challSrv.remove_http_redirect(challengePath)
   311    challSrv.remove_http01_response("http-redirect")
   312
   313    history = challSrv.http_request_history(d)
   314    challSrv.clear_http_request_history(d)
   315
   316    # There should have been at least two GET requests made to the
   317    # challtestsrv. There may have been more if remote VAs were configured.
   318    if len(history) < 2:
   319        raise(Exception("Expected at least 2 HTTP request events on challtestsrv, found {1}".format(len(history))))
   320
   321    initialRequests = []
   322    redirectedRequests = []
   323
   324    for request in history:
   325      # All requests should have been over HTTP
   326      if request['HTTPS'] is True:
   327        raise(Exception("Expected all requests to be HTTP"))
   328      # Initial requests should have the expected initial HTTP-01 URL for the challenge
   329      if request['URL'] == challengePath:
   330        initialRequests.append(request)
   331      # Redirected requests should have the expected redirect path URL with all
   332      # its parameters
   333      elif request['URL'] == redirectPath:
   334        redirectedRequests.append(request)
   335      else:
   336        raise(Exception("Unexpected request URL {0} in challtestsrv history: {1}".format(request['URL'], request)))
   337
   338    # There should have been at least 1 initial HTTP-01 validation request.
   339    if len(initialRequests) < 1:
   340        raise(Exception("Expected {0} initial HTTP-01 request events on challtestsrv, found {1}".format(validation_attempts, len(initialRequests))))
   341
   342    # There should have been at least 1 redirected HTTP request for each VA
   343    if len(redirectedRequests) < 1:
   344        raise(Exception("Expected {0} redirected HTTP-01 request events on challtestsrv, found {1}".format(validation_attempts, len(redirectedRequests))))
   345
   346def test_http_challenge_https_redirect():
   347    client = chisel2.make_client()
   348
   349    # Create an authz for a random domain and get its HTTP-01 challenge token
   350    d, chall = rand_http_chall(client)
   351    token = chall.encode("token")
   352    # Calculate its keyauth so we can add it in a special non-standard location
   353    # for the redirect result
   354    resp = chall.response(client.net.key)
   355    keyauth = resp.key_authorization
   356    challSrv.add_http01_response("https-redirect", keyauth)
   357
   358    # Create a HTTP redirect from the challenge's validation path to an HTTPS
   359    # path with some parameters
   360    challengePath = "/.well-known/acme-challenge/{0}".format(token)
   361    redirectPath = "/.well-known/acme-challenge/https-redirect?params=are&important=to&not=lose"
   362    challSrv.add_http_redirect(
   363        challengePath,
   364        "https://{0}{1}".format(d, redirectPath))
   365
   366    # Also add an A record for the domain pointing to the interface that the
   367    # HTTPS HTTP-01 challtestsrv is bound.
   368    challSrv.add_a_record(d, ["10.77.77.77"])
   369
   370    try:
   371        chisel2.auth_and_issue([d], client=client, chall_type="http-01")
   372    except errors.ValidationError as e:
   373        problems = []
   374        for authzr in e.failed_authzrs:
   375            for chall in authzr.body.challenges:
   376                error = chall.error
   377                if error:
   378                    problems.append(error.__str__())
   379        raise(Exception("validation problem: %s" % "; ".join(problems)))
   380
   381    challSrv.remove_http_redirect(challengePath)
   382    challSrv.remove_a_record(d)
   383
   384    history = challSrv.http_request_history(d)
   385    challSrv.clear_http_request_history(d)
   386
   387    # There should have been at least two GET requests made to the challtestsrv by the VA
   388    if len(history) < 2:
   389        raise(Exception("Expected 2 HTTP request events on challtestsrv, found {0}".format(len(history))))
   390
   391    initialRequests = []
   392    redirectedRequests = []
   393
   394    for request in history:
   395      # Initial requests should have the expected initial HTTP-01 URL for the challenge
   396      if request['URL'] == challengePath:
   397        initialRequests.append(request)
   398      # Redirected requests should have the expected redirect path URL with all
   399      # its parameters
   400      elif request['URL'] == redirectPath:
   401        redirectedRequests.append(request)
   402      else:
   403        raise(Exception("Unexpected request URL {0} in challtestsrv history: {1}".format(request['URL'], request)))
   404
   405    # There should have been at least 1 initial HTTP-01 validation request.
   406    if len(initialRequests) < 1:
   407        raise(Exception("Expected {0} initial HTTP-01 request events on challtestsrv, found {1}".format(validation_attempts, len(initialRequests))))
   408     # All initial requests should have been over HTTP
   409    for r in initialRequests:
   410      if r['HTTPS'] is True:
   411        raise(Exception("Expected all initial requests to be HTTP, got %s" % r))
   412
   413    # There should have been at least 1 redirected HTTP request for each VA
   414    if len(redirectedRequests) < 1:
   415        raise(Exception("Expected {0} redirected HTTP-01 request events on challtestsrv, found {1}".format(validation_attempts, len(redirectedRequests))))
   416    # All the redirected requests should have been over HTTPS with the correct
   417    # SNI value
   418    for r in redirectedRequests:
   419      if r['HTTPS'] is False:
   420        raise(Exception("Expected all redirected requests to be HTTPS"))
   421      if r['ServerName'] != d:
   422        raise(Exception("Expected all redirected requests to have ServerName {0} got \"{1}\"".format(d, r['ServerName'])))
   423
   424class SlowHTTPRequestHandler(BaseHTTPRequestHandler):
   425    def do_GET(self):
   426        try:
   427            # Sleeptime needs to be larger than the RA->VA timeout (20s at the
   428            # time of writing)
   429            sleeptime = 22
   430            print("SlowHTTPRequestHandler: sleeping for {0}s\n".format(sleeptime))
   431            time.sleep(sleeptime)
   432            self.send_response(200)
   433            self.end_headers()
   434            self.wfile.write(b"this is not an ACME key authorization")
   435        except:
   436            pass
   437
   438class SlowHTTPServer(HTTPServer):
   439    # Override handle_error so we don't print a misleading stack trace when the
   440    # VA terminates the connection due to timeout.
   441    def handle_error(self, request, client_address):
   442        pass
   443
   444def test_http_challenge_timeout():
   445    """
   446    test_http_challenge_timeout tests that the VA times out challenge requests
   447    to a slow HTTP server appropriately.
   448    """
   449    # Start a simple python HTTP server on port 80 in its own thread.
   450    # NOTE(@cpu): The pebble-challtestsrv binds 10.77.77.77:80 for HTTP-01
   451    # challenges so we must use the 10.88.88.88 address for the throw away
   452    # server for this test and add a mock DNS entry that directs the VA to it.
   453    httpd = SlowHTTPServer(("10.88.88.88", 80), SlowHTTPRequestHandler)
   454    thread = threading.Thread(target = httpd.serve_forever)
   455    thread.daemon = False
   456    thread.start()
   457
   458    # Pick a random domain
   459    hostname = random_domain()
   460
   461    # Add A record for the domains to ensure the VA's requests are directed
   462    # to the interface that we bound the HTTPServer to.
   463    challSrv.add_a_record(hostname, ["10.88.88.88"])
   464
   465    start = datetime.datetime.utcnow()
   466    end = 0
   467
   468    try:
   469        # We expect a connection timeout error to occur
   470        chisel2.expect_problem("urn:ietf:params:acme:error:connection",
   471            lambda: chisel2.auth_and_issue([hostname], chall_type="http-01"))
   472        end = datetime.datetime.utcnow()
   473    finally:
   474        # Shut down the HTTP server gracefully and join on its thread.
   475        httpd.shutdown()
   476        httpd.server_close()
   477        thread.join()
   478
   479    delta = end - start
   480    # Expected duration should be the RA->VA timeout plus some padding (At
   481    # present the timeout is 20s so adding 2s of padding = 22s)
   482    expectedDuration = 22
   483    if delta.total_seconds() == 0 or delta.total_seconds() > expectedDuration:
   484        raise(Exception("expected timeout to occur in under {0} seconds. Took {1}".format(expectedDuration, delta.total_seconds())))
   485
   486
   487def test_tls_alpn_challenge():
   488    # Pick two random domains
   489    domains = [random_domain(),random_domain()]
   490
   491    # Add A records for these domains to ensure the VA's requests are directed
   492    # to the interface that the challtestsrv has bound for TLS-ALPN-01 challenge
   493    # responses
   494    for host in domains:
   495        challSrv.add_a_record(host, ["10.88.88.88"])
   496    chisel2.auth_and_issue(domains, chall_type="tls-alpn-01")
   497
   498    for host in domains:
   499        challSrv.remove_a_record(host)
   500
   501def test_overlapping_wildcard():
   502    """
   503    Test issuance for a random domain and a wildcard version of the same domain
   504    using DNS-01. This should result in *two* distinct authorizations.
   505    """
   506    domain = random_domain()
   507    domains = [ domain, "*."+domain ]
   508    client = chisel2.make_client(None)
   509    csr_pem = chisel2.make_csr(domains)
   510    order = client.new_order(csr_pem)
   511    authzs = order.authorizations
   512
   513    if len(authzs) != 2:
   514        raise(Exception("order for %s had %d authorizations, expected 2" %
   515                (domains, len(authzs))))
   516
   517    cleanup = chisel2.do_dns_challenges(client, authzs)
   518    try:
   519        order = client.poll_and_finalize(order)
   520    finally:
   521        cleanup()
   522
   523def test_highrisk_blocklist():
   524    """
   525    Test issuance for a subdomain of a HighRiskBlockedNames entry. It should
   526    fail with a policy error.
   527    """
   528
   529    # We include "example.org" in `test/hostname-policy.yaml` in the
   530    # HighRiskBlockedNames list so issuing for "foo.example.org" should be
   531    # blocked.
   532    domain = "foo.example.org"
   533    # We expect this to produce a policy problem
   534    chisel2.expect_problem("urn:ietf:params:acme:error:rejectedIdentifier",
   535        lambda: chisel2.auth_and_issue([domain], chall_type="dns-01"))
   536
   537def test_wildcard_exactblacklist():
   538    """
   539    Test issuance for a wildcard that would cover an exact blacklist entry. It
   540    should fail with a policy error.
   541    """
   542
   543    # We include "highrisk.le-test.hoffman-andrews.com" in `test/hostname-policy.yaml`
   544    # Issuing for "*.le-test.hoffman-andrews.com" should be blocked
   545    domain = "*.le-test.hoffman-andrews.com"
   546    # We expect this to produce a policy problem
   547    chisel2.expect_problem("urn:ietf:params:acme:error:rejectedIdentifier",
   548        lambda: chisel2.auth_and_issue([domain], chall_type="dns-01"))
   549
   550def test_wildcard_authz_reuse():
   551    """
   552    Test that an authorization for a base domain obtained via HTTP-01 isn't
   553    reused when issuing a wildcard for that base domain later on.
   554    """
   555
   556    # Create one client to reuse across multiple issuances
   557    client = chisel2.make_client(None)
   558
   559    # Pick a random domain to issue for
   560    domains = [ random_domain() ]
   561    csr_pem = chisel2.make_csr(domains)
   562
   563    # Submit an order for the name
   564    order = client.new_order(csr_pem)
   565    # Complete the order via an HTTP-01 challenge
   566    cleanup = chisel2.do_http_challenges(client, order.authorizations)
   567    try:
   568        order = client.poll_and_finalize(order)
   569    finally:
   570        cleanup()
   571
   572    # Now try to issue a wildcard for the random domain
   573    domains[0] = "*." + domains[0]
   574    csr_pem = chisel2.make_csr(domains)
   575    order = client.new_order(csr_pem)
   576
   577    # We expect all of the returned authorizations to be pending status
   578    for authz in order.authorizations:
   579        if authz.body.status != Status("pending"):
   580            raise(Exception("order for %s included a non-pending authorization (status: %s) from a previous HTTP-01 order" %
   581                    ((domains), str(authz.body.status))))
   582
   583def test_bad_overlap_wildcard():
   584    chisel2.expect_problem("urn:ietf:params:acme:error:malformed",
   585        lambda: chisel2.auth_and_issue(["*.example.com", "www.example.com"]))
   586
   587def test_duplicate_orders():
   588    """
   589    Test that the same client issuing for the same domain names twice in a row
   590    works without error.
   591    """
   592    client = chisel2.make_client(None)
   593    domains = [ random_domain() ]
   594    chisel2.auth_and_issue(domains, client=client)
   595    chisel2.auth_and_issue(domains, client=client)
   596
   597def test_order_reuse_failed_authz():
   598    """
   599    Test that creating an order for a domain name, failing an authorization in
   600    that order, and submitting another new order request for the same name
   601    doesn't reuse a failed authorizaton in the new order.
   602    """
   603
   604    client = chisel2.make_client(None)
   605    domains = [ random_domain() ]
   606    csr_pem = chisel2.make_csr(domains)
   607
   608    order = client.new_order(csr_pem)
   609    firstOrderURI = order.uri
   610
   611    # Pick the first authz's first challenge, doesn't matter what type it is
   612    chall_body = order.authorizations[0].body.challenges[0]
   613    # Answer it, but with nothing set up to solve the challenge request
   614    client.answer_challenge(chall_body, chall_body.response(client.net.key))
   615
   616    deadline = datetime.datetime.now() + datetime.timedelta(seconds=60)
   617    authzFailed = False
   618    try:
   619        # Poll the order's authorizations until they are non-pending, a timeout
   620        # occurs, or there is an invalid authorization status.
   621        client.poll_authorizations(order, deadline)
   622    except acme_errors.ValidationError as e:
   623        # We expect there to be a ValidationError from one of the authorizations
   624        # being invalid.
   625        authzFailed = True
   626
   627    # If the poll ended and an authz's status isn't invalid then we reached the
   628    # deadline, fail the test
   629    if not authzFailed:
   630        raise(Exception("timed out waiting for order %s to become invalid" % firstOrderURI))
   631
   632    # Make another order with the same domains
   633    order = client.new_order(csr_pem)
   634
   635    # It should not be the same order as before
   636    if order.uri == firstOrderURI:
   637        raise(Exception("new-order for %s returned a , now-invalid, order" % domains))
   638
   639    # We expect all of the returned authorizations to be pending status
   640    for authz in order.authorizations:
   641        if authz.body.status != Status("pending"):
   642            raise(Exception("order for %s included a non-pending authorization (status: %s) from a previous order" %
   643                    ((domains), str(authz.body.status))))
   644
   645    # We expect the new order can be fulfilled
   646    cleanup = chisel2.do_http_challenges(client, order.authorizations)
   647    try:
   648        order = client.poll_and_finalize(order)
   649    finally:
   650        cleanup()
   651
   652def test_order_finalize_early():
   653    """
   654    Test that finalizing an order before its fully authorized results in the
   655    order having an error set and the status being invalid.
   656    """
   657    # Create a client
   658    client = chisel2.make_client(None)
   659
   660    # Create a random domain and a csr
   661    domains = [ random_domain() ]
   662    csr_pem = chisel2.make_csr(domains)
   663
   664    # Create an order for the domain
   665    order = client.new_order(csr_pem)
   666
   667    deadline = datetime.datetime.now() + datetime.timedelta(seconds=5)
   668
   669    # Finalizing an order early should generate an orderNotReady error.
   670    chisel2.expect_problem("urn:ietf:params:acme:error:orderNotReady",
   671        lambda: client.finalize_order(order, deadline))
   672
   673def test_revoke_by_account_unspecified():
   674    client = chisel2.make_client()
   675    cert_file = temppath('test_revoke_by_account_0.pem')
   676    order = chisel2.auth_and_issue([random_domain()], client=client, cert_output=cert_file.name)
   677    cert = OpenSSL.crypto.load_certificate(OpenSSL.crypto.FILETYPE_PEM, order.fullchain_pem)
   678
   679    reset_akamai_purges()
   680    client.revoke(josepy.ComparableX509(cert), 0)
   681
   682    verify_ocsp(cert_file.name, "/hierarchy/intermediate-cert-rsa-a.pem", "http://localhost:4002", "revoked")
   683    verify_akamai_purge()
   684
   685def test_revoke_by_account_with_reason():
   686    client = chisel2.make_client(None)
   687    cert_file = temppath('test_revoke_by_account_1.pem')
   688    order = chisel2.auth_and_issue([random_domain()], client=client, cert_output=cert_file.name)
   689    cert = OpenSSL.crypto.load_certificate(OpenSSL.crypto.FILETYPE_PEM, order.fullchain_pem)
   690
   691    reset_akamai_purges()
   692
   693    # Requesting revocation for keyCompromise should work, but not block the
   694    # key.
   695    client.revoke(josepy.ComparableX509(cert), 1)
   696    verify_ocsp(cert_file.name, "/hierarchy/intermediate-cert-rsa-a.pem", "http://localhost:4002", "revoked", "keyCompromise")
   697
   698    verify_akamai_purge()
   699
   700def test_revoke_by_authz():
   701    domains = [random_domain()]
   702    cert_file = temppath('test_revoke_by_authz.pem')
   703    order = chisel2.auth_and_issue(domains, cert_output=cert_file.name)
   704    cert = OpenSSL.crypto.load_certificate(OpenSSL.crypto.FILETYPE_PEM, order.fullchain_pem)
   705
   706    # create a new client and re-authz
   707    client = chisel2.make_client(None)
   708    chisel2.auth_and_issue(domains, client=client)
   709
   710    reset_akamai_purges()
   711
   712    # Even though we requested reason 1 ("keyCompromise"), the result should be
   713    # 5 ("cessationOfOperation") due to the authorization method.
   714    client.revoke(josepy.ComparableX509(cert), 1)
   715    verify_ocsp(cert_file.name, "/hierarchy/intermediate-cert-rsa-a.pem", "http://localhost:4002", "revoked", "cessationOfOperation")
   716
   717    verify_akamai_purge()
   718
   719def test_revoke_by_privkey():
   720    domains = [random_domain()]
   721
   722    # We have to make our own CSR so that we can hold on to the private key
   723    # for revocation later.
   724    key = rsa.generate_private_key(65537, 2048, default_backend())
   725    key_pem = key.private_bytes(
   726        encoding=serialization.Encoding.PEM,
   727        format=serialization.PrivateFormat.TraditionalOpenSSL,
   728        encryption_algorithm=serialization.NoEncryption()
   729    )
   730    csr_pem = acme_crypto_util.make_csr(key_pem, domains, False)
   731
   732    # We have to do our own issuance because we made our own CSR.
   733    issue_client = chisel2.make_client(None)
   734    order = issue_client.new_order(csr_pem)
   735    cleanup = chisel2.do_http_challenges(issue_client, order.authorizations)
   736    try:
   737        order = issue_client.poll_and_finalize(order)
   738    finally:
   739        cleanup()
   740    cert = OpenSSL.crypto.load_certificate(OpenSSL.crypto.FILETYPE_PEM, order.fullchain_pem)
   741
   742    cert_file = tempfile.NamedTemporaryFile(
   743        dir=tempdir, suffix='.test_revoke_by_privkey.pem',
   744        mode='w+', delete=False)
   745    cert_file.write(OpenSSL.crypto.dump_certificate(
   746        OpenSSL.crypto.FILETYPE_PEM, cert).decode())
   747    cert_file.close()
   748
   749    # Create a new client with the cert key as the account key. We don't
   750    # register a server-side account with this client, as we don't need one.
   751    revoke_client = chisel2.uninitialized_client(key=josepy.JWKRSA(key=key))
   752
   753    reset_akamai_purges()
   754
   755    # Even though we requested reason 0 ("unspecified"), the result should be
   756    # 1 ("keyCompromise") due to the authorization method.
   757    revoke_client.revoke(josepy.ComparableX509(cert), 0)
   758    verify_ocsp(cert_file.name, "/hierarchy/intermediate-cert-rsa-a.pem", "http://localhost:4002", "revoked", "keyCompromise")
   759
   760    verify_akamai_purge()
   761
   762def test_double_revocation():
   763    domains = [random_domain()]
   764
   765    # We have to make our own CSR so that we can hold on to the private key
   766    # for revocation later.
   767    key = rsa.generate_private_key(65537, 2048, default_backend())
   768    key_pem = key.private_bytes(
   769        encoding=serialization.Encoding.PEM,
   770        format=serialization.PrivateFormat.TraditionalOpenSSL,
   771        encryption_algorithm=serialization.NoEncryption()
   772    )
   773    csr_pem = acme_crypto_util.make_csr(key_pem, domains, False)
   774
   775    # We have to do our own issuance because we made our own CSR.
   776    sub_client = chisel2.make_client(None)
   777    order = sub_client.new_order(csr_pem)
   778    cleanup = chisel2.do_http_challenges(sub_client, order.authorizations)
   779    try:
   780        order = sub_client.poll_and_finalize(order)
   781    finally:
   782        cleanup()
   783    cert = OpenSSL.crypto.load_certificate(OpenSSL.crypto.FILETYPE_PEM, order.fullchain_pem)
   784
   785    cert_file = tempfile.NamedTemporaryFile(
   786        dir=tempdir, suffix='.test_double_revoke.pem',
   787        mode='w+', delete=False)
   788    cert_file.write(OpenSSL.crypto.dump_certificate(
   789        OpenSSL.crypto.FILETYPE_PEM, cert).decode())
   790    cert_file.close()
   791
   792    # Create a new client with the cert key as the account key. We don't
   793    # register a server-side account with this client, as we don't need one.
   794    cert_client = chisel2.uninitialized_client(key=josepy.JWKRSA(key=key))
   795
   796    reset_akamai_purges()
   797
   798    # First revoke for any reason.
   799    sub_client.revoke(josepy.ComparableX509(cert), 0)
   800    verify_ocsp(cert_file.name, "/hierarchy/intermediate-cert-rsa-a.pem", "http://localhost:4002", "revoked")
   801    verify_akamai_purge()
   802
   803    # Re-revocation for anything other than keyCompromise should fail.
   804    try:
   805        sub_client.revoke(josepy.ComparableX509(cert), 3)
   806    except messages.Error:
   807        pass
   808    else:
   809        raise(Exception("Re-revoked for a bad reason"))
   810
   811    # Re-revocation for keyCompromise should work, as long as it is done
   812    # via the cert key to demonstrate said compromise.
   813    reset_akamai_purges()
   814    cert_client.revoke(josepy.ComparableX509(cert), 1)
   815    verify_ocsp(cert_file.name, "/hierarchy/intermediate-cert-rsa-a.pem", "http://localhost:4002", "revoked", "keyCompromise")
   816    verify_akamai_purge()
   817
   818    # A subsequent attempt should fail, because the cert is already revoked
   819    # for keyCompromise.
   820    try:
   821        cert_client.revoke(josepy.ComparableX509(cert), 1)
   822    except messages.Error:
   823        pass
   824    else:
   825        raise(Exception("Re-revoked already keyCompromise'd cert"))
   826
   827    # The same is true even when using the cert key.
   828    try:
   829        cert_client.revoke(josepy.ComparableX509(cert), 1)
   830    except messages.Error:
   831        pass
   832    else:
   833        raise(Exception("Re-revoked already keyCompromise'd cert"))
   834
   835def test_sct_embedding():
   836    order = chisel2.auth_and_issue([random_domain()])
   837    print(order.fullchain_pem.encode())
   838    cert = parse_cert(order)
   839
   840    # make sure there is no poison extension
   841    try:
   842        cert.extensions.get_extension_for_oid(x509.ObjectIdentifier("1.3.6.1.4.1.11129.2.4.3"))
   843        raise(Exception("certificate contains CT poison extension"))
   844    except x509.ExtensionNotFound:
   845        # do nothing
   846        pass
   847
   848    # make sure there is a SCT list extension
   849    try:
   850        sctList = cert.extensions.get_extension_for_oid(x509.ObjectIdentifier("1.3.6.1.4.1.11129.2.4.2"))
   851    except x509.ExtensionNotFound:
   852        raise(Exception("certificate doesn't contain SCT list extension"))
   853    if len(sctList.value) != 2:
   854        raise(Exception("SCT list contains wrong number of SCTs"))
   855    for sct in sctList.value:
   856        if sct.version != x509.certificate_transparency.Version.v1:
   857            raise(Exception("SCT contains wrong version"))
   858        if sct.entry_type != x509.certificate_transparency.LogEntryType.PRE_CERTIFICATE:
   859            raise(Exception("SCT contains wrong entry type"))
   860
   861def test_only_return_existing_reg():
   862    client = chisel2.uninitialized_client()
   863    email = "test@not-example.com"
   864    client.new_account(messages.NewRegistration.from_data(email=email,
   865            terms_of_service_agreed=True))
   866
   867    client = chisel2.uninitialized_client(key=client.net.key)
   868    class extendedAcct(dict):
   869        def json_dumps(self, indent=None):
   870            return json.dumps(self)
   871    acct = extendedAcct({
   872        "termsOfServiceAgreed": True,
   873        "contact": [email],
   874        "onlyReturnExisting": True
   875    })
   876    resp = client.net.post(client.directory['newAccount'], acct)
   877    if resp.status_code != 200:
   878        raise(Exception("incorrect response returned for onlyReturnExisting"))
   879
   880    other_client = chisel2.uninitialized_client()
   881    newAcct = extendedAcct({
   882        "termsOfServiceAgreed": True,
   883        "contact": [email],
   884        "onlyReturnExisting": True
   885    })
   886    chisel2.expect_problem("urn:ietf:params:acme:error:accountDoesNotExist",
   887        lambda: other_client.net.post(other_client.directory['newAccount'], newAcct))
   888
   889def BouncerHTTPRequestHandler(redirect, guestlist):
   890    """
   891    BouncerHTTPRequestHandler returns a BouncerHandler class that acts like
   892    a club bouncer in front of another server. The bouncer will respond to
   893    GET requests by looking up the allowed number of requests in the guestlist
   894    for the User-Agent making the request. If there is at least one guestlist
   895    spot for that UA it will be redirected to the real server and the
   896    guestlist will be decremented. Once the guestlist spots for a UA are
   897    expended requests will get a bogus result and have to stand outside in the
   898    cold
   899    """
   900    class BouncerHandler(BaseHTTPRequestHandler):
   901        def __init__(self, *args, **kwargs):
   902            BaseHTTPRequestHandler.__init__(self, *args, **kwargs)
   903
   904        def do_HEAD(self):
   905            # This is used by wait_for_server
   906            self.send_response(200)
   907            self.end_headers()
   908
   909        def do_GET(self):
   910            ua = self.headers['User-Agent']
   911            guestlistAllows = BouncerHandler.guestlist.get(ua, 0)
   912            # If there is still space on the guestlist for this UA then redirect
   913            # the request and decrement the guestlist.
   914            if guestlistAllows > 0:
   915                BouncerHandler.guestlist[ua] -= 1
   916                self.log_message("BouncerHandler UA {0} is on the Guestlist. {1} requests remaining.".format(ua, BouncerHandler.guestlist[ua]))
   917                self.send_response(302)
   918                self.send_header("Location", BouncerHandler.redirect)
   919                self.end_headers()
   920            # Otherwise return a bogus result
   921            else:
   922                self.log_message("BouncerHandler UA {0} has no requests on the Guestlist. Sending request to the curb".format(ua))
   923                self.send_response(200)
   924                self.end_headers()
   925                self.wfile.write(u"(• ◡ •) <( VIPs only! )".encode())
   926
   927    BouncerHandler.guestlist = guestlist
   928    BouncerHandler.redirect = redirect
   929    return BouncerHandler
   930
   931def wait_for_server(addr):
   932    while True:
   933        try:
   934            # NOTE(@cpu): Using HEAD here instead of GET because the
   935            # BouncerHandler modifies its state for GET requests.
   936            status = requests.head(addr).status_code
   937            if status == 200:
   938                return
   939        except requests.exceptions.ConnectionError:
   940            pass
   941        time.sleep(0.5)
   942
   943def multiva_setup(client, guestlist):
   944    """
   945    Setup a testing domain and backing multiva server setup. This will block
   946    until the server is ready. The returned cleanup function should be used to
   947    stop the server. The first bounceFirst requests to the server will be sent
   948    to the real challtestsrv for a good answer, the rest will get a bad
   949    answer. Domain name is randomly chosen with random_domain().
   950    """
   951    hostname = random_domain()
   952
   953    csr_pem = chisel2.make_csr([hostname])
   954    order = client.new_order(csr_pem)
   955    authz = order.authorizations[0]
   956    chall = None
   957    for c in authz.body.challenges:
   958        if isinstance(c.chall, challenges.HTTP01):
   959            chall = c.chall
   960    if chall is None:
   961        raise(Exception("No HTTP-01 challenge found for random domain authz"))
   962
   963    token = chall.encode("token")
   964
   965    # Calculate the challenge's keyauth so we can add a good keyauth response on
   966    # the real challtestsrv that we redirect VIP requests to.
   967    resp = chall.response(client.net.key)
   968    keyauth = resp.key_authorization
   969    challSrv.add_http01_response(token, keyauth)
   970
   971    # Add an A record for the domains to ensure the VA's requests are directed
   972    # to the interface that we bound the HTTPServer to.
   973    challSrv.add_a_record(hostname, ["10.88.88.88"])
   974
   975    # Add an A record for the redirect target that sends it to the real chall
   976    # test srv for a valid HTTP-01 response.
   977    redirHostname = "pebble-challtestsrv.example.com"
   978    challSrv.add_a_record(redirHostname, ["10.77.77.77"])
   979
   980    # Start a simple python HTTP server on port 80 in its own thread.
   981    # NOTE(@cpu): The pebble-challtestsrv binds 10.77.77.77:80 for HTTP-01
   982    # challenges so we must use the 10.88.88.88 address for the throw away
   983    # server for this test and add a mock DNS entry that directs the VA to it.
   984    redirect = "http://{0}/.well-known/acme-challenge/{1}".format(
   985            redirHostname, token)
   986    httpd = HTTPServer(("10.88.88.88", 80), BouncerHTTPRequestHandler(redirect, guestlist))
   987    thread = threading.Thread(target = httpd.serve_forever)
   988    thread.daemon = False
   989    thread.start()
   990
   991    def cleanup():
   992        # Remove the challtestsrv mocks
   993        challSrv.remove_a_record(hostname)
   994        challSrv.remove_a_record(redirHostname)
   995        challSrv.remove_http01_response(token)
   996        # Shut down the HTTP server gracefully and join on its thread.
   997        httpd.shutdown()
   998        httpd.server_close()
   999        thread.join()
  1000
  1001    return hostname, cleanup
  1002
  1003def test_http_multiva_threshold_pass():
  1004    client = chisel2.make_client()
  1005
  1006    # Configure a guestlist that will pass the multiVA threshold test by
  1007    # allowing the primary VA and one remote.
  1008    guestlist = {"boulder": 1, "boulder-remote-b": 1}
  1009
  1010    hostname, cleanup = multiva_setup(client, guestlist)
  1011
  1012    try:
  1013        # With the maximum number of allowed remote VA failures the overall
  1014        # challenge should still succeed.
  1015        chisel2.auth_and_issue([hostname], client=client, chall_type="http-01")
  1016    finally:
  1017        cleanup()
  1018
  1019def test_http_multiva_primary_fail_remote_pass():
  1020    client = chisel2.make_client()
  1021
  1022    # Configure a guestlist that will fail the primary VA check but allow the
  1023    # remote VAs
  1024    guestlist = {"boulder": 0, "boulder-remote-a": 1, "boulder-remote-b": 1}
  1025
  1026    hostname, cleanup = multiva_setup(client, guestlist)
  1027
  1028    foundException = False
  1029
  1030    try:
  1031        # The overall validation should fail even if the remotes are allowed
  1032        # because the primary VA result cannot be overridden.
  1033        chisel2.auth_and_issue([hostname], client=client, chall_type="http-01")
  1034    except acme_errors.ValidationError as e:
  1035        # NOTE(@cpu): Chisel2's expect_problem doesn't work in this case so this
  1036        # test needs to unpack an `acme_errors.ValidationError` on its own. It
  1037        # might be possible to clean this up in the future.
  1038        if len(e.failed_authzrs) != 1:
  1039            raise(Exception("expected one failed authz, found {0}".format(len(e.failed_authzrs))))
  1040        challs = e.failed_authzrs[0].body.challenges
  1041        httpChall = None
  1042        for chall_body in challs:
  1043            if isinstance(chall_body.chall, challenges.HTTP01):
  1044                httpChall = chall_body
  1045        if httpChall is None:
  1046            raise(Exception("no HTTP-01 challenge in failed authz"))
  1047        if httpChall.error.typ != "urn:ietf:params:acme:error:unauthorized":
  1048            raise(Exception("expected unauthorized prob, found {0}".format(httpChall.error.typ)))
  1049        foundException = True
  1050    finally:
  1051        cleanup()
  1052        if foundException is False:
  1053            raise(Exception("Overall validation did not fail"))
  1054
  1055def test_http_multiva_threshold_fail():
  1056    client = chisel2.make_client()
  1057
  1058    # Configure a guestlist that will fail the multiVA threshold test by
  1059    # only allowing the primary VA.
  1060    guestlist = {"boulder": 1}
  1061
  1062    hostname, cleanup = multiva_setup(client, guestlist)
  1063
  1064    failed_authzrs = []
  1065    try:
  1066        chisel2.auth_and_issue([hostname], client=client, chall_type="http-01")
  1067    except acme_errors.ValidationError as e:
  1068        # NOTE(@cpu): Chisel2's expect_problem doesn't work in this case so this
  1069        # test needs to unpack an `acme_errors.ValidationError` on its own. It
  1070        # might be possible to clean this up in the future.
  1071        failed_authzrs = e.failed_authzrs
  1072    finally:
  1073        cleanup()
  1074    if len(failed_authzrs) != 1:
  1075        raise(Exception("expected one failed authz, found {0}".format(len(failed_authzrs))))
  1076    challs = failed_authzrs[0].body.challenges
  1077    httpChall = None
  1078    for chall_body in challs:
  1079        if isinstance(chall_body.chall, challenges.HTTP01):
  1080            httpChall = chall_body
  1081    if httpChall is None:
  1082        raise(Exception("no HTTP-01 challenge in failed authz"))
  1083    if httpChall.error.typ != "urn:ietf:params:acme:error:unauthorized":
  1084        raise(Exception("expected unauthorized prob, found {0}".format(httpChall.error.typ)))
  1085    if not httpChall.error.detail.startswith("During secondary validation: "):
  1086        raise(Exception("expected 'During secondary validation' problem detail, found {0}".format(httpChall.error.detail)))
  1087
  1088class FakeH2ServerHandler(socketserver.BaseRequestHandler):
  1089    """
  1090    FakeH2ServerHandler is a TCP socket handler that writes data representing an
  1091    initial HTTP/2 SETTINGS frame as a response to all received data.
  1092    """
  1093    def handle(self):
  1094        # Read whatever the HTTP request was so that the response isn't seen as
  1095        # unsolicited.
  1096        self.data = self.request.recv(1024).strip()
  1097        # Blast some HTTP/2 bytes onto the socket
  1098        # Truncated example data from taken from the community forum:
  1099        # https://community.letsencrypt.org/t/le-validation-error-if-server-is-in-google-infrastructure/51841
  1100        self.request.sendall(b"\x00\x00\x12\x04\x00\x00\x00\x00\x00\x00\x03\x00\x00\x00\x80\x00")
  1101
  1102def wait_for_tcp_server(addr, port):
  1103    """
  1104    wait_for_tcp_server attempts to make a TCP connection to the given
  1105    address/port every 0.5s until it succeeds.
  1106    """
  1107    while True:
  1108        sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
  1109        try:
  1110            sock.connect((addr, port))
  1111            sock.sendall(b"\n")
  1112            return
  1113        except socket.error:
  1114            time.sleep(0.5)
  1115            pass
  1116
  1117def test_http2_http01_challenge():
  1118    """
  1119    test_http2_http01_challenge tests that an HTTP-01 challenge made to a HTTP/2
  1120    server fails with a specific error message for this case.
  1121    """
  1122    client = chisel2.make_client()
  1123    hostname = "fake.h2.example.com"
  1124
  1125    # Add an A record for the test server to ensure the VA's requests are directed
  1126    # to the interface that we bind the FakeH2ServerHandler to.
  1127    challSrv.add_a_record(hostname, ["10.88.88.88"])
  1128
  1129    # Allow socket address reuse on the base TCPServer class. Failing to do this
  1130    # causes subsequent integration tests to fail with "Address in use" errors even
  1131    # though this test _does_ call shutdown() and server_close(). Even though the
  1132    # server was shut-down Python's socket will be in TIME_WAIT because of prev. client
  1133    # connections. Having the TCPServer set SO_REUSEADDR on the socket solves
  1134    # the problem.
  1135    socketserver.TCPServer.allow_reuse_address = True
  1136    # Create, start, and wait for a fake HTTP/2 server.
  1137    server = socketserver.TCPServer(("10.88.88.88", 80), FakeH2ServerHandler)
  1138    thread = threading.Thread(target = server.serve_forever)
  1139    thread.daemon = False
  1140    thread.start()
  1141    wait_for_tcp_server("10.88.88.88", 80)
  1142
  1143    # Issuing an HTTP-01 challenge for this hostname should produce a connection
  1144    # problem with an error specific to the HTTP/2 misconfiguration.
  1145    expectedError = "Server is speaking HTTP/2 over HTTP"
  1146    try:
  1147        chisel2.auth_and_issue([hostname], client=client, chall_type="http-01")
  1148    except acme_errors.ValidationError as e:
  1149        for authzr in e.failed_authzrs:
  1150            c = chisel2.get_chall(authzr, challenges.HTTP01)
  1151            error = c.error
  1152            if error is None or error.typ != "urn:ietf:params:acme:error:connection":
  1153                raise(Exception("Expected connection prob, got %s" % (error.__str__())))
  1154            if not error.detail.endswith(expectedError):
  1155                raise(Exception("Expected prob detail ending in %s, got %s" % (expectedError, error.detail)))
  1156    finally:
  1157        server.shutdown()
  1158        server.server_close()
  1159        thread.join()
  1160
  1161def test_new_order_policy_errs():
  1162    """
  1163    Test that creating an order with policy blocked identifiers returns
  1164    a problem with subproblems.
  1165    """
  1166    client = chisel2.make_client(None)
  1167
  1168    # 'in-addr.arpa' is present in `test/hostname-policy.yaml`'s
  1169    # HighRiskBlockedNames list.
  1170    csr_pem = chisel2.make_csr(["out-addr.in-addr.arpa", "between-addr.in-addr.arpa"])
  1171
  1172    # With two policy blocked names in the order we expect to get back a top
  1173    # level rejectedIdentifier with a detail message that references
  1174    # subproblems.
  1175    #
  1176    # TODO(@cpu): After https://github.com/certbot/certbot/issues/7046 is
  1177    # implemented in the upstream `acme` module this test should also ensure the
  1178    # subproblems are properly represented.
  1179    ok = False
  1180    try:
  1181        order = client.new_order(csr_pem)
  1182    except messages.Error as e:
  1183        ok = True
  1184        if e.typ != "urn:ietf:params:acme:error:rejectedIdentifier":
  1185            raise(Exception("Expected rejectedIdentifier type problem, got {0}".format(e.typ)))
  1186        if e.detail != 'Error creating new order :: Cannot issue for "between-addr.in-addr.arpa": The ACME server refuses to issue a certificate for this domain name, because it is forbidden by policy (and 1 more problems. Refer to sub-problems for more information.)':
  1187            raise(Exception("Order problem detail did not match expected"))
  1188    if not ok:
  1189        raise(Exception("Expected problem, got no error"))
  1190
  1191def test_long_san_no_cn():
  1192    if CONFIG_NEXT:
  1193        return
  1194    try:
  1195        chisel2.auth_and_issue(["".join(random.choice(string.ascii_uppercase) for x in range(61)) + ".com"])
  1196        # if we get to this raise the auth_and_issue call didn't fail, so fail the test
  1197        raise(Exception("Issuance didn't fail when the only SAN in a certificate was longer than the max CN length"))
  1198    except messages.Error as e:
  1199        if e.typ != "urn:ietf:params:acme:error:rejectedIdentifier":
  1200            raise(Exception("Expected malformed type problem, got {0}".format(e.typ)))
  1201        if e.detail != "NewOrder request did not include a SAN short enough to fit in CN":
  1202            raise(Exception("Problem detail did not match expected"))
  1203
  1204def test_delete_unused_challenges():
  1205    order = chisel2.auth_and_issue([random_domain()], chall_type="dns-01")
  1206    a = order.authorizations[0]
  1207    if len(a.body.challenges) != 1:
  1208        raise(Exception("too many challenges (%d) left after validation" % len(a.body.challenges)))
  1209    if not isinstance(a.body.challenges[0].chall, challenges.DNS01):
  1210        raise(Exception("wrong challenge type left after validation"))
  1211
  1212    # intentionally fail a challenge
  1213    client = chisel2.make_client()
  1214    csr_pem = chisel2.make_csr([random_domain()])
  1215    order = client.new_order(csr_pem)
  1216    c = chisel2.get_chall(order.authorizations[0], challenges.DNS01)
  1217    client.answer_challenge(c, c.response(client.net.key))
  1218    for _ in range(5):
  1219        a, _ = client.poll(order.authorizations[0])
  1220        if a.body.status == Status("invalid"):
  1221            break
  1222        time.sleep(1)
  1223    if len(a.body.challenges) != 1:
  1224        raise(Exception("too many challenges (%d) left after failed validation" %
  1225            len(a.body.challenges)))
  1226    if not isinstance(a.body.challenges[0].chall, challenges.DNS01):
  1227        raise(Exception("wrong challenge type left after validation"))
  1228
  1229def test_auth_deactivation_v2():
  1230    client = chisel2.make_client(None)
  1231    csr_pem = chisel2.make_csr([random_domain()])
  1232    order = client.new_order(csr_pem)
  1233    resp = client.deactivate_authorization(order.authorizations[0])
  1234    if resp.body.status is not messages.STATUS_DEACTIVATED:
  1235        raise(Exception("unexpected authorization status"))
  1236
  1237    order = chisel2.auth_and_issue([random_domain()], client=client)
  1238    resp = client.deactivate_authorization(order.authorizations[0])
  1239    if resp.body.status is not messages.STATUS_DEACTIVATED:
  1240        raise(Exception("unexpected authorization status"))
  1241
  1242def test_ocsp():
  1243    cert_file = temppath('test_ocsp.pem')
  1244    chisel2.auth_and_issue([random_domain()], cert_output=cert_file.name)
  1245    verify_ocsp(cert_file.name, "/hierarchy/intermediate-cert-rsa-a.pem", "http://localhost:4002", "good")
  1246
  1247def test_ct_submission():
  1248    hostname = random_domain()
  1249
  1250    chisel2.auth_and_issue([hostname])
  1251
  1252    # These should correspond to the configured logs in ra.json.
  1253    log_groups = [
  1254        ["http://boulder.service.consul:4600/submissions", "http://boulder.service.consul:4601/submissions", "http://boulder.service.consul:4602/submissions", "http://boulder.service.consul:4603/submissions"],
  1255        ["http://boulder.service.consul:4604/submissions", "http://boulder.service.consul:4605/submissions"],
  1256        ["http://boulder.service.consul:4606/submissions"],
  1257        ["http://boulder.service.consul:4607/submissions"],
  1258        ["http://boulder.service.consul:4608/submissions"],
  1259        ["http://boulder.service.consul:4609/submissions"],
  1260    ]
  1261
  1262    # These should correspond to the logs with `submitFinal` in ra.json.
  1263    final_logs = [
  1264        "http://boulder.service.consul:4600/submissions",
  1265        "http://boulder.service.consul:4601/submissions",
  1266        "http://boulder.service.consul:4606/submissions",
  1267        "http://boulder.service.consul:4609/submissions",
  1268     ]
  1269
  1270    # We'd like to enforce strict limits here (exactly 1 submission per group,
  1271    # exactly two submissions overall) but the async nature of the race system
  1272    # means we can't -- a slowish submission to one log in a group could trigger
  1273    # a very fast submission to a different log in the same group, and then both
  1274    # submissions could succeed at the same time. Although the Go code will only
  1275    # use one of the SCTs, both logs will still have been submitted to, and it
  1276    # will show up here.
  1277    total_count = 0
  1278    for i in range(len(log_groups)):
  1279        group_count = 0
  1280        for j in range(len(log_groups[i])):
  1281            log = log_groups[i][j]
  1282            count = int(requests.get(log + "?hostnames=%s" % hostname).text)
  1283            threshold = 1
  1284            if log in final_logs:
  1285                threshold += 1
  1286            if count > threshold:
  1287                raise(Exception("Got %d submissions for log %s, expected at most %d" % (count, log, threshold)))
  1288            group_count += count
  1289        total_count += group_count
  1290    if total_count < 2:
  1291        raise(Exception("Got %d total submissions, expected at least 2" % total_count))
  1292
  1293def check_ocsp_basic_oid(cert_file, issuer_file, url):
  1294    """
  1295    This function checks if an OCSP response was successful, but doesn't verify
  1296    the signature or timestamp. This is useful when simulating the past, so we
  1297    don't incorrectly reject a response for being in the past.
  1298    """
  1299    ocsp_request = make_ocsp_req(cert_file, issuer_file)
  1300    responses = fetch_ocsp(ocsp_request, url)
  1301    # An unauthorized response (for instance, if the OCSP responder doesn't know
  1302    # about this cert) will just be 30 03 0A 01 06. A "good" or "revoked"
  1303    # response will contain, among other things, the id-pkix-ocsp-basic OID
  1304    # identifying the response type. We look for that OID to confirm we got a
  1305    # successful response.
  1306    expected = bytearray.fromhex("06 09 2B 06 01 05 05 07 30 01 01")
  1307    for resp in responses:
  1308        if not expected in bytearray(resp):
  1309            raise(Exception("Did not receive successful OCSP response: %s doesn't contain %s" %
  1310                (base64.b64encode(resp), base64.b64encode(expected))))
  1311
  1312ocsp_exp_unauth_setup_data = {}
  1313@register_six_months_ago
  1314def ocsp_exp_unauth_setup():
  1315    client = chisel2.make_client(None)
  1316    cert_file = temppath('ocsp_exp_unauth_setup.pem')
  1317    order = chisel2.auth_and_issue([random_domain()], client=client, cert_output=cert_file.name)
  1318    cert = OpenSSL.crypto.load_certificate(OpenSSL.crypto.FILETYPE_PEM, order.fullchain_pem)
  1319
  1320    # Since our servers are pretending to be in the past, but the openssl cli
  1321    # isn't, we'll get an expired OCSP response. Just check that it exists;
  1322    # don't do the full verification (which would fail).
  1323    check_ocsp_basic_oid(cert_file.name, "/hierarchy/intermediate-cert-rsa-a.pem", "http://localhost:4002")
  1324    global ocsp_exp_unauth_setup_data
  1325    ocsp_exp_unauth_setup_data['cert_file'] = cert_file.name
  1326
  1327def test_ocsp_exp_unauth():
  1328    tries = 0
  1329    if 'cert_file' not in ocsp_exp_unauth_setup_data:
  1330        raise Exception("ocsp_exp_unauth_setup didn't run")
  1331    cert_file = ocsp_exp_unauth_setup_data['cert_file']
  1332    last_error = ""
  1333    while tries < 5:
  1334        try:
  1335            verify_ocsp(cert_file, "/hierarchy/intermediate-cert-rsa-a.pem", "http://localhost:4002", "XXX")
  1336            raise(Exception("Unexpected return from verify_ocsp"))
  1337        except subprocess.CalledProcessError as cpe:
  1338            last_error = cpe.output
  1339            if cpe.output == b"Responder Error: unauthorized (6)\n":
  1340                break
  1341        except e:
  1342            last_error = e
  1343            pass
  1344        tries += 1
  1345        time.sleep(0.25)
  1346    else:
  1347        raise(Exception("timed out waiting for unauthorized OCSP response for expired certificate. Last error: {}".format(last_error)))
  1348
  1349def test_blocked_key_account():
  1350    # Only config-next has a blocked keys file configured.
  1351    if not CONFIG_NEXT:
  1352        return
  1353
  1354    with open("test/test-ca.key", "rb") as key_file:
  1355        key = serialization.load_pem_private_key(key_file.read(), password=None, backend=default_backend())
  1356
  1357    # Create a client with the JWK set to a blocked private key
  1358    jwk = josepy.JWKRSA(key=key)
  1359    client = chisel2.uninitialized_client(jwk)
  1360    email = "test@not-example.com"
  1361
  1362    # Try to create an account
  1363    testPass = False
  1364    try:
  1365        client.new_account(messages.NewRegistration.from_data(email=email,
  1366                terms_of_service_agreed=True))
  1367    except acme_errors.Error as e:
  1368        if e.typ != "urn:ietf:params:acme:error:badPublicKey":
  1369            raise(Exception("problem did not have correct error type, had {0}".format(e.typ)))
  1370        if e.detail != "public key is forbidden":
  1371            raise(Exception("problem did not have correct error detail, had {0}".format(e.detail)))
  1372        testPass = True
  1373
  1374    if testPass is False:
  1375        raise(Exception("expected account creation to fail with Error when using blocked key"))
  1376
  1377def test_blocked_key_cert():
  1378    # Only config-next has a blocked keys file configured.
  1379    if not CONFIG_NEXT:
  1380        return
  1381
  1382    with open("test/test-ca.key", "r") as f:
  1383        pemBytes = f.read()
  1384
  1385    domains = [random_domain(), random_domain()]
  1386    csr = acme_crypto_util.make_csr(pemBytes, domains, False)
  1387
  1388    client = chisel2.make_client(None)
  1389    order = client.new_order(csr)
  1390    authzs = order.authorizations
  1391
  1392    testPass = False
  1393    cleanup = chisel2.do_http_challenges(client, authzs)
  1394    try:
  1395        order = client.poll_and_finalize(order)
  1396    except acme_errors.Error as e:
  1397        if e.typ != "urn:ietf:params:acme:error:badCSR":
  1398            raise(Exception("problem did not have correct error type, had {0}".format(e.typ)))
  1399        if e.detail != "Error finalizing order :: invalid public key in CSR: public key is forbidden":
  1400            raise(Exception("problem did not have correct error detail, had {0}".format(e.detail)))
  1401        testPass = True
  1402
  1403    if testPass is False:
  1404        raise(Exception("expected cert creation to fail with Error when using blocked key"))
  1405
  1406def test_expiration_mailer():
  1407    email_addr = "integration.%x@letsencrypt.org" % random.randrange(2**16)
  1408    order = chisel2.auth_and_issue([random_domain()], email=email_addr)
  1409    cert = parse_cert(order)
  1410    # Check that the expiration mailer sends a reminder
  1411    expiry = cert.not_valid_after
  1412    no_reminder = expiry + datetime.timedelta(days=-31)
  1413    first_reminder = expiry + datetime.timedelta(days=-13)
  1414    last_reminder = expiry + datetime.timedelta(days=-2)
  1415
  1416    requests.post("http://localhost:9381/clear", data='')
  1417    for time in (no_reminder, first_reminder, last_reminder):
  1418        print(get_future_output(
  1419            ["./bin/boulder", "expiration-mailer", "--config", "%s/expiration-mailer.json" % config_dir],
  1420            time))
  1421    resp = requests.get("http://localhost:9381/count?to=%s" % email_addr)
  1422    mailcount = int(resp.text)
  1423    if mailcount != 2:
  1424        raise(Exception("\nExpiry mailer failed: expected 2 emails, got %d" % mailcount))
  1425
  1426caa_recheck_setup_data = {}
  1427@register_twenty_days_ago
  1428def caa_recheck_setup():
  1429    client = chisel2.make_client()
  1430    # Issue a certificate with the clock set back, and save the authzs to check
  1431    # later that they are valid (200). They should however require rechecking for
  1432    # CAA purposes.
  1433    numNames = 10
  1434    # Generate numNames subdomains of a random domain
  1435    base_domain = random_domain()
  1436    domains = [ "{0}.{1}".format(str(n),base_domain) for n in range(numNames) ]
  1437    order = chisel2.auth_and_issue(domains, client=client)
  1438
  1439    global caa_recheck_setup_data
  1440    caa_recheck_setup_data = {
  1441        'client': client,
  1442        'authzs': order.authorizations,
  1443    }
  1444
  1445def test_recheck_caa():
  1446    """Request issuance for a domain where we have a old cached authz from when CAA
  1447       was good. We'll set a new CAA record forbidding issuance; the CAA should
  1448       recheck CAA and reject the request.
  1449    """
  1450    if 'authzs' not in caa_recheck_setup_data:
  1451        raise(Exception("CAA authzs not prepared for test_caa"))
  1452    domains = []
  1453    for a in caa_recheck_setup_data['authzs']:
  1454        response = caa_recheck_setup_data['client']._post(a.uri, None)
  1455        if response.status_code != 200:
  1456            raise(Exception("Unexpected response for CAA authz: ",
  1457                response.status_code))
  1458        domain = a.body.identifier.value
  1459        domains.append(domain)
  1460
  1461    # Set a forbidding CAA record on just one domain
  1462    challSrv.add_caa_issue(domains[3], ";")
  1463
  1464    # Request issuance for the previously-issued domain name, which should
  1465    # now be denied due to CAA.
  1466    chisel2.expect_problem("urn:ietf:params:acme:error:caa",
  1467        lambda: chisel2.auth_and_issue(domains, client=caa_recheck_setup_data['client']))
  1468
  1469def test_caa_good():
  1470    domain = random_domain()
  1471    challSrv.add_caa_issue(domain, "happy-hacker-ca.invalid")
  1472    chisel2.auth_and_issue([domain])
  1473
  1474def test_caa_reject():
  1475    domain = random_domain()
  1476    challSrv.add_caa_issue(domain, "sad-hacker-ca.invalid")
  1477    chisel2.expect_problem("urn:ietf:params:acme:error:caa",
  1478        lambda: chisel2.auth_and_issue([domain]))
  1479
  1480def test_caa_extensions():
  1481    goodCAA = "happy-hacker-ca.invalid"
  1482
  1483    client = chisel2.make_client()
  1484    caa_account_uri = client.net.account.uri
  1485    caa_records = [
  1486        {"domain": "accounturi.good-caa-reserved.com", "value":"{0}; accounturi={1}".format(goodCAA, caa_account_uri)},
  1487        {"domain": "dns-01-only.good-caa-reserved.com", "value": "{0}; validationmethods=dns-01".format(goodCAA)},
  1488        {"domain": "http-01-only.good-caa-reserved.com", "value": "{0}; validationmethods=http-01".format(goodCAA)},
  1489        {"domain": "dns-01-or-http01.good-caa-reserved.com", "value": "{0}; validationmethods=dns-01,http-01".format(goodCAA)},
  1490    ]
  1491    for policy in caa_records:
  1492        challSrv.add_caa_issue(policy["domain"], policy["value"])
  1493
  1494    chisel2.expect_problem("urn:ietf:params:acme:error:caa",
  1495        lambda: chisel2.auth_and_issue(["dns-01-only.good-caa-reserved.com"], chall_type="http-01"))
  1496
  1497    chisel2.expect_problem("urn:ietf:params:acme:error:caa",
  1498        lambda: chisel2.auth_and_issue(["http-01-only.good-caa-reserved.com"], chall_type="dns-01"))
  1499
  1500    ## Note: the additional names are to avoid rate limiting...
  1501    chisel2.auth_and_issue(["dns-01-only.good-caa-reserved.com", "www.dns-01-only.good-caa-reserved.com"], chall_type="dns-01")
  1502    chisel2.auth_and_issue(["http-01-only.good-caa-reserved.com", "www.http-01-only.good-caa-reserved.com"], chall_type="http-01")
  1503    chisel2.auth_and_issue(["dns-01-or-http-01.good-caa-reserved.com", "dns-01-only.good-caa-reserved.com"], chall_type="dns-01")
  1504    chisel2.auth_and_issue(["dns-01-or-http-01.good-caa-reserved.com", "http-01-only.good-caa-reserved.com"], chall_type="http-01")
  1505
  1506    ## CAA should fail with an arbitrary account, but succeed with the CAA client.
  1507    chisel2.expect_problem("urn:ietf:params:acme:error:caa", lambda: chisel2.auth_and_issue(["accounturi.good-caa-reserved.com"]))
  1508    chisel2.auth_and_issue(["accounturi.good-caa-reserved.com"], client=client)
  1509
  1510def test_new_account():
  1511    """
  1512    Test creating new accounts with no email, empty email, one email, and a
  1513    tuple of multiple emails.
  1514    """
  1515    for contact in (None, (), ("mailto:single@chisel.com",), ("mailto:one@chisel.com", "mailto:two@chisel.com")):
  1516        # We don't use `chisel2.make_client` or `messages.NewRegistration.from_data`
  1517        # here because they do too much client-side processing to make the
  1518        # contact addresses look "nice".
  1519        client = chisel2.uninitialized_client()
  1520        result = client.new_account(messages.NewRegistration(contact=contact, terms_of_service_agreed=True))
  1521        actual = result.body.contact
  1522        if contact is not None and contact != actual:
  1523            raise(Exception("New Account failed: expected contact %s, got %s" % (contact, actual)))
  1524
  1525def test_account_update():
  1526    """
  1527    Create a new ACME client/account with one contact email. Then update the
  1528    account to a different contact emails.
  1529    """
  1530    for contact in (None, (), ("mailto:single@chisel.com",), ("mailto:one@chisel.com", "mailto:two@chisel.com")):
  1531        # We don't use `chisel2.update_email` or `messages.NewRegistration.from_data`
  1532        # here because they do too much client-side processing to make the
  1533        # contact addresses look "nice".
  1534        print()
  1535        client = chisel2.make_client()
  1536        update = client.net.account.update(body=client.net.account.body.update(contact=contact))
  1537        result = client.update_registration(update)
  1538        actual = result.body.contact
  1539        if contact is not None and contact != actual:
  1540            raise(Exception("New Account failed: expected contact %s, got %s" % (contact, actual)))
  1541
  1542def test_renewal_exemption():
  1543    """
  1544    Under a single domain, issue two certificates for different subdomains of
  1545    the same name, then renewals of each of them. Since the certificatesPerName
  1546    rate limit in testing is 2 per 90 days, and the renewals should not be
  1547    counted under the renewal exemption, each of these issuances should succeed.
  1548    Then do one last issuance (for a third subdomain of the same name) that we
  1549    expect to be rate limited, just to check that the rate limit is actually 2,
  1550    and we are testing what we think we are testing. See
  1551    https://letsencrypt.org/docs/rate-limits/ for more details.
  1552    """
  1553    base_domain = random_domain()
  1554    # First issuance
  1555    chisel2.auth_and_issue(["www." + base_domain])
  1556    # First Renewal
  1557    chisel2.auth_and_issue(["www." + base_domain])
  1558    # Issuance of a different cert
  1559    chisel2.auth_and_issue(["blog." + base_domain])
  1560    # Renew that one
  1561    chisel2.auth_and_issue(["blog." + base_domain])
  1562    # Final, failed issuance, for another different cert
  1563    chisel2.expect_problem("urn:ietf:params:acme:error:rateLimited",
  1564        lambda: chisel2.auth_and_issue(["mail." + base_domain]))
  1565
  1566def test_certificates_per_name():
  1567    chisel2.expect_problem("urn:ietf:params:acme:error:rateLimited",
  1568        lambda: chisel2.auth_and_issue([random_domain() + ".lim.it"]))
  1569
  1570def test_oversized_csr():
  1571    # Number of names is chosen to be one greater than the configured RA/CA maxNames
  1572    numNames = 101
  1573    # Generate numNames subdomains of a random domain
  1574    base_domain = random_domain()
  1575    domains = [ "{0}.{1}".format(str(n),base_domain) for n in range(numNames) ]
  1576    # We expect issuing for these domains to produce a malformed error because
  1577    # there are too many names in the request.
  1578    chisel2.expect_problem("urn:ietf:params:acme:error:malformed",
  1579            lambda: chisel2.auth_and_issue(domains))
  1580
  1581def parse_cert(order):
  1582    return x509.load_pem_x509_certificate(order.fullchain_pem.encode(), default_backend())
  1583
  1584def test_admin_revoker_cert():
  1585    cert_file = temppath('test_admin_revoker_cert.pem')
  1586    order = chisel2.auth_and_issue([random_domain()], cert_output=cert_file.name)
  1587    parsed_cert = parse_cert(order)
  1588
  1589    # Revoke certificate by serial
  1590    reset_akamai_purges()
  1591    run(["./bin/boulder", "admin-revoker", "serial-revoke",
  1592        "--config", "%s/admin-revoker.json" % config_dir,
  1593        '%x' % parsed_cert.serial_number, '1'])
  1594
  1595    # Wait for OCSP response to indicate revocation took place
  1596    verify_ocsp(cert_file.name, "/hierarchy/intermediate-cert-rsa-a.pem", "http://localhost:4002", "revoked")
  1597    verify_akamai_purge()
  1598
  1599def test_admin_revoker_batched():
  1600    serialFile = tempfile.NamedTemporaryFile(
  1601        dir=tempdir, suffix='.test_admin_revoker_batched.serials.hex',
  1602        mode='w+', delete=False)
  1603    cert_files = [
  1604        temppath('test_admin_revoker_batched.%d.pem' % x) for x in range(3)
  1605    ]
  1606
  1607    for cert_file in cert_files:
  1608        order = chisel2.auth_and_issue([random_domain()], cert_output=cert_file.name)
  1609        serialFile.write("%x\n" % parse_cert(order).serial_number)
  1610    serialFile.close()
  1611
  1612    run(["./bin/boulder", "admin-revoker", "batched-serial-revoke",
  1613        "--config", "%s/admin-revoker.json" % config_dir,
  1614        serialFile.name, '0', '2'])
  1615
  1616    for cert_file in cert_files:
  1617        verify_ocsp(cert_file.name, "/hierarchy/intermediate-cert-rsa-a.pem", "http://localhost:4002", "revoked")
  1618
  1619def test_sct_embedding():
  1620    order = chisel2.auth_and_issue([random_domain()])
  1621    cert = parse_cert(order)
  1622
  1623    # make sure there is no poison extension
  1624    try:
  1625        cert.extensions.get_extension_for_oid(x509.ObjectIdentifier("1.3.6.1.4.1.11129.2.4.3"))
  1626        raise(Exception("certificate contains CT poison extension"))
  1627    except x509.ExtensionNotFound:
  1628        # do nothing
  1629        pass
  1630
  1631    # make sure there is a SCT list extension
  1632    try:
  1633        sctList = cert.extensions.get_extension_for_oid(x509.ObjectIdentifier("1.3.6.1.4.1.11129.2.4.2"))
  1634    except x509.ExtensionNotFound:
  1635        raise(Exception("certificate doesn't contain SCT list extension"))
  1636    if len(sctList.value) != 2:
  1637        raise(Exception("SCT list contains wrong number of SCTs"))
  1638    for sct in sctList.value:
  1639        if sct.version != x509.certificate_transparency.Version.v1:
  1640            raise(Exception("SCT contains wrong version"))
  1641        if sct.entry_type != x509.certificate_transparency.LogEntryType.PRE_CERTIFICATE:
  1642            raise(Exception("SCT contains wrong entry type"))
  1643        delta = sct.timestamp - datetime.datetime.now()
  1644        if abs(delta) > datetime.timedelta(hours=1):
  1645            raise(Exception("Delta between SCT timestamp and now was too great "
  1646                "%s vs %s (%s)" % (sct.timestamp, datetime.datetime.now(), delta)))
  1647
  1648def test_auth_deactivation():
  1649    client = chisel2.make_client(None)
  1650    d = random_domain()
  1651    csr_pem = chisel2.make_csr([d])
  1652    order = client.new_order(csr_pem)
  1653
  1654    resp = client.deactivate_authorization(order.authorizations[0])
  1655    if resp.body.status is not messages.STATUS_DEACTIVATED:
  1656        raise Exception("unexpected authorization status")
  1657
  1658    order = chisel2.auth_and_issue([random_domain()], client=client)
  1659    resp = client.deactivate_authorization(order.authorizations[0])
  1660    if resp.body.status is not messages.STATUS_DEACTIVATED:
  1661        raise Exception("unexpected authorization status")
  1662
  1663def get_ocsp_response_and_reason(cert_file, issuer_file, url):
  1664    """Returns the ocsp response output and revocation reason."""
  1665    output = verify_ocsp(cert_file, issuer_file, url, None)
  1666    m = re.search('Reason: (\w+)', output)
  1667    reason = m.group(1) if m is not None else ""
  1668    return output, reason
  1669
  1670ocsp_resigning_setup_data = {}
  1671@register_twenty_days_ago
  1672def ocsp_resigning_setup():
  1673    """Issue and then revoke a cert in the past.
  1674
  1675    Useful setup for test_ocsp_resigning, which needs to check that the
  1676    revocation reason is still correctly set after re-signing and old OCSP
  1677    response.
  1678    """
  1679    client = chisel2.make_client(None)
  1680    cert_file = temppath('ocsp_resigning_setup.pem')
  1681    order = chisel2.auth_and_issue([random_domain()], client=client, cert_output=cert_file.name)
  1682
  1683    cert = OpenSSL.crypto.load_certificate(
  1684        OpenSSL.crypto.FILETYPE_PEM, order.fullchain_pem)
  1685    # Revoke for reason 5: cessationOfOperation
  1686    client.revoke(josepy.ComparableX509(cert), 5)
  1687
  1688    ocsp_response, reason = get_ocsp_response_and_reason(
  1689        cert_file.name, "/hierarchy/intermediate-cert-rsa-a.pem", "http://localhost:4002")
  1690    global ocsp_resigning_setup_data
  1691    ocsp_resigning_setup_data = {
  1692        'cert_file': cert_file.name,
  1693        'response': ocsp_response,
  1694        'reason': reason
  1695    }
  1696
  1697def test_ocsp_resigning():
  1698    """Check that, after re-signing an OCSP, the reason is still set."""
  1699    if 'response' not in ocsp_resigning_setup_data:
  1700        raise Exception("ocsp_resigning_setup didn't run")
  1701
  1702    tries = 0
  1703    while tries < 5:
  1704        resp, reason = get_ocsp_response_and_reason(
  1705            ocsp_resigning_setup_data['cert_file'], "/hierarchy/intermediate-cert-rsa-a.pem", "http://localhost:4002")
  1706        if resp != ocsp_resigning_setup_data['response']:
  1707            break
  1708        tries += 1
  1709        time.sleep(0.25)
  1710    else:
  1711        raise(Exception("timed out waiting for re-signed OCSP response for certificate"))
  1712
  1713    if reason != ocsp_resigning_setup_data['reason']:
  1714        raise(Exception("re-signed ocsp response has different reason %s expected %s" % (
  1715            reason, ocsp_resigning_setup_data['reason'])))
  1716    if reason != "cessationOfOperation":
  1717        raise(Exception("re-signed ocsp response has wrong reason %s" % reason))

View as plain text