...

Text file src/github.com/letsencrypt/boulder/test/helpers.py

Documentation: github.com/letsencrypt/boulder/test

     1import base64
     2import os
     3import urllib
     4import time
     5import re
     6import random
     7import json
     8import requests
     9import socket
    10import tempfile
    11import shutil
    12import atexit
    13import errno
    14import subprocess
    15
    16import challtestsrv
    17
    18challSrv = challtestsrv.ChallTestServer()
    19tempdir = tempfile.mkdtemp()
    20
    21@atexit.register
    22def stop():
    23    shutil.rmtree(tempdir)
    24
    25config_dir = os.environ.get('BOULDER_CONFIG_DIR', '')
    26if config_dir == '':
    27    raise Exception("BOULDER_CONFIG_DIR was not set")
    28CONFIG_NEXT = config_dir.startswith("test/config-next")
    29
    30def temppath(name):
    31    """Creates and returns a closed file inside the tempdir."""
    32    f = tempfile.NamedTemporaryFile(
    33        dir=tempdir,
    34        suffix='.{0}'.format(name),
    35        mode='w+',
    36        delete=False
    37    )
    38    f.close()
    39    return f
    40
    41def fakeclock(date):
    42    return date.strftime("%a %b %d %H:%M:%S UTC %Y")
    43
    44def get_future_output(cmd, date):
    45    return subprocess.check_output(cmd, stderr=subprocess.STDOUT,
    46        env={'FAKECLOCK': fakeclock(date)}).decode()
    47
    48def random_domain():
    49    """Generate a random domain for testing (to avoid rate limiting)."""
    50    return "rand.%x.xyz" % random.randrange(2**32)
    51
    52def run(cmd, **kwargs):
    53    return subprocess.check_call(cmd, stderr=subprocess.STDOUT, **kwargs)
    54
    55def fetch_ocsp(request_bytes, url):
    56    """Fetch an OCSP response using POST, GET, and GET with URL encoding.
    57
    58    Returns a tuple of the responses.
    59    """
    60    ocsp_req_b64 = base64.b64encode(request_bytes).decode()
    61
    62    # Make the OCSP request three different ways: by POST, by GET, and by GET with
    63    # URL-encoded parameters. All three should have an identical response.
    64    get_response = requests.get("%s/%s" % (url, ocsp_req_b64)).content
    65    get_encoded_response = requests.get("%s/%s" % (url, urllib.parse.quote(ocsp_req_b64, safe = ""))).content
    66    post_response = requests.post("%s/" % (url), data=request_bytes).content
    67
    68    return (post_response, get_response, get_encoded_response)
    69
    70def make_ocsp_req(cert_file, issuer_file):
    71    """Return the bytes of an OCSP request for the given certificate file."""
    72    with tempfile.NamedTemporaryFile(dir=tempdir) as f:
    73        run(["openssl", "ocsp", "-no_nonce",
    74            "-issuer", issuer_file,
    75            "-cert", cert_file,
    76            "-reqout", f.name])
    77        ocsp_req = f.read()
    78    return ocsp_req
    79
    80def ocsp_verify(cert_file, issuer_file, ocsp_response):
    81    with tempfile.NamedTemporaryFile(dir=tempdir, delete=False) as f:
    82        f.write(ocsp_response)
    83        f.close()
    84        output = subprocess.check_output([
    85            'openssl', 'ocsp', '-no_nonce',
    86            '-issuer', issuer_file,
    87            '-cert', cert_file,
    88            '-verify_other', issuer_file,
    89            '-CAfile', '/hierarchy/root-cert-rsa.pem',
    90            '-respin', f.name], stderr=subprocess.STDOUT).decode()
    91    # OpenSSL doesn't always return non-zero when response verify fails, so we
    92    # also look for the string "Response Verify Failure"
    93    verify_failure = "Response Verify Failure"
    94    if re.search(verify_failure, output):
    95        print(output)
    96        raise(Exception("OCSP verify failure"))
    97    return output
    98
    99def verify_ocsp(cert_file, issuer_file, url, status="revoked", reason=None):
   100    ocsp_request = make_ocsp_req(cert_file, issuer_file)
   101    responses = fetch_ocsp(ocsp_request, url)
   102
   103    # Verify all responses are the same
   104    for resp in responses:
   105        if resp != responses[0]:
   106            raise(Exception("OCSP responses differed: %s vs %s" %(
   107                base64.b64encode(responses[0]), base64.b64encode(resp))))
   108
   109    # Check response is for the correct certificate and is correct
   110    # status
   111    resp = responses[0]
   112    verify_output = ocsp_verify(cert_file, issuer_file, resp)
   113    if status is not None:
   114        if not re.search("%s: %s" % (cert_file, status), verify_output):
   115            print(verify_output)
   116            raise(Exception("OCSP response wasn't '%s'" % status))
   117    if reason is not None:
   118        if not re.search("Reason: %s" % reason, verify_output):
   119            print(verify_output)
   120            raise(Exception("OCSP response wasn't '%s'" % reason))
   121    return verify_output
   122
   123def reset_akamai_purges():
   124    requests.post("http://localhost:6789/debug/reset-purges", data="{}")
   125
   126def verify_akamai_purge():
   127    deadline = time.time() + .4
   128    while True:
   129        time.sleep(0.05)
   130        if time.time() > deadline:
   131            raise(Exception("Timed out waiting for Akamai purge"))
   132        response = requests.get("http://localhost:6789/debug/get-purges")
   133        purgeData = response.json()
   134        if len(purgeData["V3"]) == 0:
   135            continue
   136        break
   137    reset_akamai_purges()
   138
   139twenty_days_ago_functions = [ ]
   140
   141def register_twenty_days_ago(f):
   142    """Register a function to be run during "setup_twenty_days_ago." This allows
   143       test cases to define their own custom setup.
   144    """
   145    twenty_days_ago_functions.append(f)
   146
   147def setup_twenty_days_ago():
   148    """Do any setup that needs to happen 20 day in the past, for tests that
   149       will run in the 'present'.
   150    """
   151    for f in twenty_days_ago_functions:
   152        f()
   153
   154six_months_ago_functions = []
   155
   156def register_six_months_ago(f):
   157    six_months_ago_functions.append(f)
   158
   159def setup_six_months_ago():
   160    [f() for f in six_months_ago_functions]
   161
   162def waitport(port, prog, perTickCheck=None):
   163    """Wait until a port on localhost is open."""
   164    for _ in range(1000):
   165        try:
   166            time.sleep(0.1)
   167            if perTickCheck is not None and not perTickCheck():
   168                return False
   169            s = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
   170            s.connect(('localhost', port))
   171            s.close()
   172            return True
   173        except socket.error as e:
   174            if e.errno == errno.ECONNREFUSED:
   175                print("Waiting for debug port %d (%s)" % (port, prog))
   176            else:
   177                raise
   178    raise(Exception("timed out waiting for debug port %d (%s)" % (port, prog)))
   179
   180def waithealth(prog, addr):
   181    subprocess.check_call([
   182        './bin/health-checker',
   183        '-addr', addr,
   184        '-config', os.path.join(config_dir, 'health-checker.json')])

View as plain text