1# -*- coding: utf-8 -*-
2"""
3Integration test cases for ACMEv2 as implemented by boulder-wfe2.
4"""
5import subprocess
6import requests
7import datetime
8import time
9import os
10import json
11import re
12
13import OpenSSL
14
15from cryptography import x509
16from cryptography.hazmat.backends import default_backend
17from cryptography.hazmat.primitives.asymmetric import rsa
18from cryptography.hazmat.primitives import serialization
19
20import chisel2
21from helpers import *
22
23from acme import errors as acme_errors
24
25from acme.messages import Status, CertificateRequest, Directory, NewRegistration
26from acme import crypto_util as acme_crypto_util
27from acme import client as acme_client
28from acme import messages
29from acme import challenges
30from acme import errors
31
32import josepy
33
34import tempfile
35import shutil
36import atexit
37import random
38import string
39
40import threading
41from http.server import HTTPServer, BaseHTTPRequestHandler
42import socketserver
43import socket
44
45import challtestsrv
# Module-wide challtestsrv.ChallTestServer instance shared by the tests below,
# which use it to register and remove mock DNS records, HTTP redirects and
# HTTP-01 responses.
challSrv = challtestsrv.ChallTestServer()
47
def test_multidomain():
    """Run chisel2.auth_and_issue for a list of two random domains."""
    names = [random_domain() for _ in range(2)]
    chisel2.auth_and_issue(names)
50
def test_wildcardmultidomain():
    """
    Run auth_and_issue over DNS-01 for one random domain plus a wildcard name
    built from a second random domain.
    """
    plain_name = random_domain()
    wildcard_name = "*." + random_domain()
    chisel2.auth_and_issue([plain_name, wildcard_name], chall_type="dns-01")
56
def test_http_challenge():
    """Run auth_and_issue for two random domains using HTTP-01 challenges."""
    names = [random_domain() for _ in range(2)]
    chisel2.auth_and_issue(names, chall_type="http-01")
59
def rand_http_chall(client):
    """
    Create an order for a fresh random domain with the given client and
    return a (domain, challenge) tuple holding the first HTTP-01 challenge
    found among the order's authorizations.

    Raises an Exception when no authorization offers an HTTP-01 challenge.
    """
    domain = random_domain()
    order = client.new_order(chisel2.make_csr([domain]))

    # Walk authorizations and their challenges in order, stopping at the
    # first HTTP-01 one.
    http01 = next(
        (
            chall_body.chall
            for authz in order.authorizations
            for chall_body in authz.body.challenges
            if isinstance(chall_body.chall, challenges.HTTP01)
        ),
        None,
    )
    if http01 is None:
        raise Exception("No HTTP-01 challenge found for random domain authz")
    return domain, http01
70
def check_challenge_dns_err(chalType):
    """
    check_challenge_dns_err tests that performing an ACME challenge of the
    specified type to a hostname that is configured to return SERVFAIL for all
    queries produces the correct problem type and detail message.

    chalType is one of "http-01", "dns-01" or "tls-alpn-01".

    Raises an Exception when chalType is not one of those values, when a
    failed challenge carries no error, the wrong problem type or an
    unexpected detail, or when issuance does not fail at all.
    """
    client = chisel2.make_client()

    # Create a random domain.
    d = random_domain()

    # Expect a DNS problem with a detail that matches a regex. The domain is
    # escaped so that its dots only match literal dots.
    expectedProbType = "dns"
    expectedTyp = "urn:ietf:params:acme:error:{0}".format(expectedProbType)
    expectedProbRegex = re.compile(
        r"SERVFAIL looking up (A|AAAA|TXT|CAA) for {0}".format(re.escape(d)))

    # Configure the chall srv to SERVFAIL all queries for that domain.
    challSrv.add_servfail_response(d)

    # Try and issue for the domain with the given challenge type.
    failed = False
    try:
        chisel2.auth_and_issue([d], client=client, chall_type=chalType)
    except acme_errors.ValidationError as e:
        # Mark that the auth_and_issue failed
        failed = True
        # Extract the failed challenge from each failed authorization
        for authzr in e.failed_authzrs:
            if chalType == "http-01":
                c = chisel2.get_chall(authzr, challenges.HTTP01)
            elif chalType == "dns-01":
                c = chisel2.get_chall(authzr, challenges.DNS01)
            elif chalType == "tls-alpn-01":
                c = chisel2.get_chall(authzr, challenges.TLSALPN01)
            else:
                raise(Exception("Invalid challenge type requested: {0}".format(chalType)))

            # The failed challenge's error should match expected. Check for a
            # missing error first so the failure message itself cannot crash.
            error = c.error
            if error is None:
                raise(Exception("Expected {0} prob, got {1}".format(expectedProbType, None)))
            if error.typ != expectedTyp:
                raise(Exception("Expected {0} prob, got {1}".format(expectedProbType, error.typ)))
            if error.detail is None or not expectedProbRegex.search(error.detail):
                raise(Exception("Prob detail did not match expectedProbRegex, got \"{0}\"".format(error.detail)))
    finally:
        challSrv.remove_servfail_response(d)

    # If there was no exception that means something went wrong. The test should fail.
    if not failed:
        raise(Exception("No problem generated issuing for broken DNS identifier"))
120
def test_http_challenge_dns_err():
    """
    Check, through check_challenge_dns_err, the problem response produced by
    an HTTP-01 challenge for a domain with broken DNS.
    """
    check_challenge_dns_err(chalType="http-01")
127
def test_dns_challenge_dns_err():
    """
    Check, through check_challenge_dns_err, the problem response produced by
    a DNS-01 challenge for a domain with broken DNS.
    """
    check_challenge_dns_err(chalType="dns-01")
134
def test_tls_alpn_challenge_dns_err():
    """
    Check, through check_challenge_dns_err, the problem response produced by
    a TLS-ALPN-01 challenge for a domain with broken DNS.
    """
    check_challenge_dns_err(chalType="tls-alpn-01")
141
def test_http_challenge_broken_redirect():
    """
    test_http_challenge_broken_redirect tests that a common webserver
    mis-configuration receives the correct specialized error message when attempting
    an HTTP-01 challenge.

    Raises an Exception when a failed challenge has the wrong problem type or
    detail, or when issuance unexpectedly succeeds.
    """
    client = chisel2.make_client()

    # Create an authz for a random domain and get its HTTP-01 challenge token
    d, chall = rand_http_chall(client)
    token = chall.encode("token")

    # Create a broken HTTP redirect similar to a sort we see frequently "in the wild"
    challengePath = "/.well-known/acme-challenge/{0}".format(token)
    redirect = "http://{0}.well-known/acme-challenge/bad-bad-bad".format(d)
    challSrv.add_http_redirect(
        challengePath,
        redirect)

    # Expect the specialized error message
    expectedError = "10.77.77.77: Fetching {0}: Invalid host in redirect target \"{1}.well-known\". Check webserver config for missing '/' in redirect target.".format(redirect, d)

    # NOTE(@cpu): Can't use chisel2.expect_problem here because it doesn't let
    # us interrogate the detail message easily.
    failed = False
    try:
        chisel2.auth_and_issue([d], client=client, chall_type="http-01")
    except acme_errors.ValidationError as e:
        failed = True
        for authzr in e.failed_authzrs:
            c = chisel2.get_chall(authzr, challenges.HTTP01)
            error = c.error
            if error is None or error.typ != "urn:ietf:params:acme:error:connection":
                raise(Exception("Expected connection prob, got %s" % (str(error))))
            if error.detail != expectedError:
                raise(Exception("Expected prob detail %s, got %s" % (expectedError, error.detail)))
    finally:
        # Unregister the redirect even when one of the checks above raised.
        challSrv.remove_http_redirect(challengePath)

    # Issuance through a broken redirect must not succeed.
    if not failed:
        raise(Exception("No problem generated issuing through a broken HTTP redirect"))
178
def test_failed_validation_limit():
    """
    Repeatedly fail a challenge for one domain using one account, then expect
    the next issuance attempt to be refused with a rateLimited problem. The
    number of failures used here depends on the threshold configured in
    rate-limit-policies.yml.

    This also incidentally tests a fix for
    https://github.com/letsencrypt/boulder/issues/4329: ValidationErrors are
    expected first, eventually followed by a rate limit error.
    """
    failing_name = "fail." + random_domain()
    csr = chisel2.make_csr([failing_name])
    client = chisel2.make_client()

    failures_before_limit = 3
    attempt = 0
    while attempt < failures_before_limit:
        order = client.new_order(csr)
        first_chall = order.authorizations[0].body.challenges[0]
        client.answer_challenge(first_chall, first_chall.response(client.net.key))
        try:
            client.poll_and_finalize(order)
        except errors.ValidationError:
            # Each of these validations is expected to fail.
            pass
        attempt += 1

    def issue_again():
        return chisel2.auth_and_issue([failing_name], client=client)

    chisel2.expect_problem("urn:ietf:params:acme:error:rateLimited", issue_again)
203
204
def test_http_challenge_loop_redirect():
    """
    Test that an HTTP-01 challenge whose validation path redirects to itself
    fails with a connection problem.
    """
    client = chisel2.make_client()

    # Create an authz for a random domain and get its HTTP-01 challenge token
    d, chall = rand_http_chall(client)
    token = chall.encode("token")

    # Create a HTTP redirect from the challenge's validation path to itself
    challengePath = "/.well-known/acme-challenge/{0}".format(token)
    challSrv.add_http_redirect(
        challengePath,
        "http://{0}{1}".format(d, challengePath))

    try:
        # Issuing for the name should fail because of the challenge domain's
        # redirect loop.
        chisel2.expect_problem("urn:ietf:params:acme:error:connection",
            lambda: chisel2.auth_and_issue([d], client=client, chall_type="http-01"))
    finally:
        # Unregister the redirect even when the expectation above fails.
        challSrv.remove_http_redirect(challengePath)
224
def test_http_challenge_badport_redirect():
    """
    Test that an HTTP-01 challenge whose validation path redirects to a URL
    with port 1337 fails with a connection problem.
    """
    client = chisel2.make_client()

    # Create an authz for a random domain and get its HTTP-01 challenge token
    d, chall = rand_http_chall(client)
    token = chall.encode("token")

    # Create a HTTP redirect from the challenge's validation path to a host with
    # an invalid port.
    challengePath = "/.well-known/acme-challenge/{0}".format(token)
    challSrv.add_http_redirect(
        challengePath,
        "http://{0}:1337{1}".format(d, challengePath))

    try:
        # Issuing for the name should fail because of the challenge domain's
        # invalid port redirect.
        chisel2.expect_problem("urn:ietf:params:acme:error:connection",
            lambda: chisel2.auth_and_issue([d], client=client, chall_type="http-01"))
    finally:
        # Unregister the redirect even when the expectation above fails.
        challSrv.remove_http_redirect(challengePath)
245
def test_http_challenge_badhost_redirect():
    """
    Test that an HTTP-01 challenge whose validation path redirects to a bare
    IP address host fails with a connection problem.
    """
    client = chisel2.make_client()

    # Create an authz for a random domain and get its HTTP-01 challenge token
    d, chall = rand_http_chall(client)
    token = chall.encode("token")

    # Create a HTTP redirect from the challenge's validation path to a bare IP
    # hostname.
    challengePath = "/.well-known/acme-challenge/{0}".format(token)
    challSrv.add_http_redirect(
        challengePath,
        "https://127.0.0.1{0}".format(challengePath))

    try:
        # Issuing for the name should cause a connection error because the
        # redirect domain name is an IP address.
        chisel2.expect_problem("urn:ietf:params:acme:error:connection",
            lambda: chisel2.auth_and_issue([d], client=client, chall_type="http-01"))
    finally:
        # Unregister the redirect even when the expectation above fails.
        challSrv.remove_http_redirect(challengePath)
266
def test_http_challenge_badproto_redirect():
    """
    Test that an HTTP-01 challenge whose validation path redirects to a URL
    with a scheme other than http/https fails with a connection problem.
    """
    client = chisel2.make_client()

    # Create an authz for a random domain and get its HTTP-01 challenge token
    d, chall = rand_http_chall(client)
    token = chall.encode("token")

    # Create a HTTP redirect from the challenge's validation path to a whacky
    # non-http/https protocol URL.
    challengePath = "/.well-known/acme-challenge/{0}".format(token)
    challSrv.add_http_redirect(
        challengePath,
        "gopher://{0}{1}".format(d, challengePath))

    try:
        # Issuing for the name should cause a connection error because the
        # redirect target uses a protocol other than http/https.
        chisel2.expect_problem("urn:ietf:params:acme:error:connection",
            lambda: chisel2.auth_and_issue([d], client=client, chall_type="http-01"))
    finally:
        # Unregister the redirect even when the expectation above fails.
        challSrv.remove_http_redirect(challengePath)
287
def test_http_challenge_http_redirect():
    """
    Test that HTTP-01 validation succeeds when the challenge path redirects
    over HTTP to another path (with query parameters) on the same host, and
    that the challtestsrv request history contains both the initial and the
    redirected requests, all made over plain HTTP.

    Raises an Exception when the recorded request history does not match.
    """
    client = chisel2.make_client()

    # Create an authz for a random domain and get its HTTP-01 challenge token
    d, chall = rand_http_chall(client)
    token = chall.encode("token")
    # Calculate its keyauth so we can add it in a special non-standard location
    # for the redirect result
    resp = chall.response(client.net.key)
    keyauth = resp.key_authorization

    challengePath = "/.well-known/acme-challenge/{0}".format(token)
    # The query string is part of what must survive the redirect. The
    # "&not=lose" parameter was previously mangled into a "¬" character.
    redirectPath = "/.well-known/acme-challenge/http-redirect?params=are&important=to&not=lose"

    challSrv.add_http01_response("http-redirect", keyauth)
    # Create a HTTP redirect from the challenge's validation path to some other
    # token path where we have registered the key authorization.
    challSrv.add_http_redirect(
        challengePath,
        "http://{0}{1}".format(d, redirectPath))

    try:
        chisel2.auth_and_issue([d], client=client, chall_type="http-01")
    finally:
        # Unregister the mock data even when issuance fails.
        challSrv.remove_http_redirect(challengePath)
        challSrv.remove_http01_response("http-redirect")

    history = challSrv.http_request_history(d)
    challSrv.clear_http_request_history(d)

    # There should have been at least two GET requests made to the
    # challtestsrv. There may have been more if remote VAs were configured.
    if len(history) < 2:
        raise(Exception("Expected at least 2 HTTP request events on challtestsrv, found {0}".format(len(history))))

    initialRequests = []
    redirectedRequests = []

    for request in history:
        # All requests should have been over HTTP
        if request['HTTPS'] is True:
            raise(Exception("Expected all requests to be HTTP"))
        # Initial requests should have the expected initial HTTP-01 URL for the challenge
        if request['URL'] == challengePath:
            initialRequests.append(request)
        # Redirected requests should have the expected redirect path URL with all
        # its parameters
        elif request['URL'] == redirectPath:
            redirectedRequests.append(request)
        else:
            raise(Exception("Unexpected request URL {0} in challtestsrv history: {1}".format(request['URL'], request)))

    # There should have been at least 1 initial HTTP-01 validation request.
    if len(initialRequests) < 1:
        raise(Exception("Expected at least 1 initial HTTP-01 request event on challtestsrv, found {0}".format(len(initialRequests))))

    # There should have been at least 1 redirected HTTP request for each VA
    if len(redirectedRequests) < 1:
        raise(Exception("Expected at least 1 redirected HTTP-01 request event on challtestsrv, found {0}".format(len(redirectedRequests))))
345
def test_http_challenge_https_redirect():
    """
    Test that HTTP-01 validation succeeds when the challenge path redirects
    to an HTTPS URL (with query parameters) on the same host. The
    challtestsrv request history must show the initial requests over HTTP
    and the redirected requests over HTTPS with the domain as ServerName.

    Raises an Exception when validation fails or when the recorded request
    history does not match.
    """
    client = chisel2.make_client()

    # Create an authz for a random domain and get its HTTP-01 challenge token
    d, chall = rand_http_chall(client)
    token = chall.encode("token")
    # Calculate its keyauth so we can add it in a special non-standard location
    # for the redirect result
    resp = chall.response(client.net.key)
    keyauth = resp.key_authorization

    challengePath = "/.well-known/acme-challenge/{0}".format(token)
    # The query string is part of what must survive the redirect. The
    # "&not=lose" parameter was previously mangled into a "¬" character.
    redirectPath = "/.well-known/acme-challenge/https-redirect?params=are&important=to&not=lose"

    challSrv.add_http01_response("https-redirect", keyauth)
    # Create a HTTP redirect from the challenge's validation path to an HTTPS
    # path with some parameters
    challSrv.add_http_redirect(
        challengePath,
        "https://{0}{1}".format(d, redirectPath))

    # Also add an A record for the domain pointing to the interface that the
    # HTTPS HTTP-01 challtestsrv is bound.
    challSrv.add_a_record(d, ["10.77.77.77"])

    try:
        chisel2.auth_and_issue([d], client=client, chall_type="http-01")
    except errors.ValidationError as e:
        # Collect every challenge error so the failure message is useful.
        problems = []
        for authzr in e.failed_authzrs:
            for challb in authzr.body.challenges:
                error = challb.error
                if error:
                    problems.append(str(error))
        raise(Exception("validation problem: %s" % "; ".join(problems)))
    finally:
        # Unregister the mock data even when issuance fails.
        challSrv.remove_http_redirect(challengePath)
        challSrv.remove_a_record(d)
        challSrv.remove_http01_response("https-redirect")

    history = challSrv.http_request_history(d)
    challSrv.clear_http_request_history(d)

    # There should have been at least two GET requests made to the challtestsrv by the VA
    if len(history) < 2:
        raise(Exception("Expected 2 HTTP request events on challtestsrv, found {0}".format(len(history))))

    initialRequests = []
    redirectedRequests = []

    for request in history:
        # Initial requests should have the expected initial HTTP-01 URL for the challenge
        if request['URL'] == challengePath:
            initialRequests.append(request)
        # Redirected requests should have the expected redirect path URL with all
        # its parameters
        elif request['URL'] == redirectPath:
            redirectedRequests.append(request)
        else:
            raise(Exception("Unexpected request URL {0} in challtestsrv history: {1}".format(request['URL'], request)))

    # There should have been at least 1 initial HTTP-01 validation request.
    if len(initialRequests) < 1:
        raise(Exception("Expected at least 1 initial HTTP-01 request event on challtestsrv, found {0}".format(len(initialRequests))))
    # All initial requests should have been over HTTP
    for r in initialRequests:
        if r['HTTPS'] is True:
            raise(Exception("Expected all initial requests to be HTTP, got %s" % r))

    # There should have been at least 1 redirected HTTP request for each VA
    if len(redirectedRequests) < 1:
        raise(Exception("Expected at least 1 redirected HTTP-01 request event on challtestsrv, found {0}".format(len(redirectedRequests))))
    # All the redirected requests should have been over HTTPS with the correct
    # SNI value
    for r in redirectedRequests:
        if r['HTTPS'] is False:
            raise(Exception("Expected all redirected requests to be HTTPS"))
        if r['ServerName'] != d:
            raise(Exception("Expected all redirected requests to have ServerName {0} got \"{1}\"".format(d, r['ServerName'])))
423
class SlowHTTPRequestHandler(BaseHTTPRequestHandler):
    """
    Request handler that stalls every GET for `sleeptime` seconds and then
    answers 200 with a body that is not an ACME key authorization.
    """

    # Seconds to sleep before responding. Needs to be larger than the RA->VA
    # timeout (20s at the time of writing). Kept as a class attribute so it
    # can be overridden on a subclass or instance.
    sleeptime = 22

    def do_GET(self):
        """Sleep for `sleeptime` seconds, then send the 200 response."""
        try:
            print("SlowHTTPRequestHandler: sleeping for {0}s\n".format(self.sleeptime))
            time.sleep(self.sleeptime)
            self.send_response(200)
            self.end_headers()
            self.wfile.write(b"this is not an ACME key authorization")
        except Exception:
            # Best effort only: failures while responding are ignored, but
            # SystemExit and KeyboardInterrupt are no longer swallowed.
            pass
437
class SlowHTTPServer(HTTPServer):
    """HTTPServer whose per-request error hook does nothing."""

    def handle_error(self, request, client_address):
        # Intentionally a no-op so that no misleading stack trace is printed
        # when the VA terminates the connection due to timeout.
        return None
443
def test_http_challenge_timeout():
    """
    test_http_challenge_timeout tests that the VA times out challenge requests
    to a slow HTTP server appropriately.

    Raises an Exception when the expected connection problem takes longer
    than the allowed duration to be reported.
    """
    # Start a simple python HTTP server on port 80 in its own thread.
    # NOTE(@cpu): The pebble-challtestsrv binds 10.77.77.77:80 for HTTP-01
    # challenges so we must use the 10.88.88.88 address for the throw away
    # server for this test and add a mock DNS entry that directs the VA to it.
    httpd = SlowHTTPServer(("10.88.88.88", 80), SlowHTTPRequestHandler)
    thread = threading.Thread(target=httpd.serve_forever)
    thread.daemon = False
    thread.start()

    # Everything after thread.start() runs inside try/finally: the server
    # thread is not a daemon, so it must always be shut down.
    try:
        # Pick a random domain
        hostname = random_domain()

        # Add A record for the domain to ensure the VA's requests are directed
        # to the interface that we bound the HTTPServer to.
        challSrv.add_a_record(hostname, ["10.88.88.88"])
        try:
            # A monotonic clock is used so the measurement is unaffected by
            # wall-clock adjustments.
            start = time.monotonic()
            # We expect a connection timeout error to occur
            chisel2.expect_problem("urn:ietf:params:acme:error:connection",
                lambda: chisel2.auth_and_issue([hostname], chall_type="http-01"))
            elapsed = time.monotonic() - start
        finally:
            # Remove the mock A record whether or not the expectation held.
            challSrv.remove_a_record(hostname)
    finally:
        # Shut down the HTTP server gracefully and join on its thread.
        httpd.shutdown()
        httpd.server_close()
        thread.join()

    # Expected duration should be the RA->VA timeout plus some padding (At
    # present the timeout is 20s so adding 2s of padding = 22s)
    expectedDuration = 22
    if elapsed <= 0 or elapsed > expectedDuration:
        raise(Exception("expected timeout to occur in under {0} seconds. Took {1}".format(expectedDuration, elapsed)))
485
486
def test_tls_alpn_challenge():
    """
    Run auth_and_issue for two random domains using TLS-ALPN-01, with mock A
    records for both names pointing at 10.88.88.88 for the duration of the
    test.
    """
    # Pick two random domains
    domains = [random_domain(), random_domain()]

    # Only records that were actually added get removed again.
    added = []
    try:
        # Add A records for these domains to ensure the VA's requests are
        # directed to the interface that the challtestsrv has bound for
        # TLS-ALPN-01 challenge responses
        for host in domains:
            challSrv.add_a_record(host, ["10.88.88.88"])
            added.append(host)
        chisel2.auth_and_issue(domains, chall_type="tls-alpn-01")
    finally:
        # Remove the mock records even when issuance fails.
        for host in added:
            challSrv.remove_a_record(host)
500
def test_overlapping_wildcard():
    """
    Order a certificate for a random domain together with the wildcard of
    that same domain, completing the authorizations with DNS-01. The order
    must carry exactly *two* authorizations.
    """
    base = random_domain()
    names = [base, "*." + base]
    client = chisel2.make_client(None)
    order = client.new_order(chisel2.make_csr(names))

    authz_list = order.authorizations
    authz_count = len(authz_list)
    if authz_count != 2:
        raise Exception(
            "order for %s had %d authorizations, expected 2" % (names, authz_count))

    undo_challenges = chisel2.do_dns_challenges(client, authz_list)
    try:
        client.poll_and_finalize(order)
    finally:
        undo_challenges()
522
def test_highrisk_blocklist():
    """
    Attempt issuance for a subdomain of a HighRiskBlockedNames entry and
    expect it to be refused with a rejectedIdentifier problem.
    """

    def issue_blocked_name():
        # "example.org" is listed under HighRiskBlockedNames in
        # `test/hostname-policy.yaml`, so "foo.example.org" should be blocked.
        return chisel2.auth_and_issue(["foo.example.org"], chall_type="dns-01")

    # We expect this to produce a policy problem
    chisel2.expect_problem(
        "urn:ietf:params:acme:error:rejectedIdentifier", issue_blocked_name)
536
def test_wildcard_exactblacklist():
    """
    Attempt issuance for a wildcard that would cover an exact blacklist entry
    and expect it to be refused with a rejectedIdentifier problem.
    """

    def issue_covering_wildcard():
        # "highrisk.le-test.hoffman-andrews.com" is listed in
        # `test/hostname-policy.yaml`, so issuing for
        # "*.le-test.hoffman-andrews.com" should be blocked.
        return chisel2.auth_and_issue(
            ["*.le-test.hoffman-andrews.com"], chall_type="dns-01")

    # We expect this to produce a policy problem
    chisel2.expect_problem(
        "urn:ietf:params:acme:error:rejectedIdentifier", issue_covering_wildcard)
549
def test_wildcard_authz_reuse():
    """
    Check that an authorization obtained for a base domain through HTTP-01 is
    not reused when a wildcard for that base domain is ordered afterwards.
    """
    # A single client is shared by both orders
    client = chisel2.make_client(None)

    # First order: the plain random domain, completed through HTTP-01
    base = random_domain()
    order = client.new_order(chisel2.make_csr([base]))
    undo_challenges = chisel2.do_http_challenges(client, order.authorizations)
    try:
        order = client.poll_and_finalize(order)
    finally:
        undo_challenges()

    # Second order: the wildcard of the same domain
    wildcard_names = ["*." + base]
    order = client.new_order(chisel2.make_csr(wildcard_names))

    # Every authorization on the wildcard order must still be pending
    pending = Status("pending")
    for authz in order.authorizations:
        status = authz.body.status
        if status != pending:
            raise Exception(
                "order for %s included a non-pending authorization (status: %s) from a previous HTTP-01 order" %
                (wildcard_names, str(status)))
582
def test_bad_overlap_wildcard():
    """
    Expect a malformed problem when issuing for "*.example.com" together
    with "www.example.com".
    """
    def issue_overlapping_names():
        return chisel2.auth_and_issue(["*.example.com", "www.example.com"])

    chisel2.expect_problem(
        "urn:ietf:params:acme:error:malformed", issue_overlapping_names)
586
def test_duplicate_orders():
    """
    Issue for the same domain names twice in a row with the same client and
    require both issuances to complete without error.
    """
    client = chisel2.make_client(None)
    names = [random_domain()]
    for _ in range(2):
        chisel2.auth_and_issue(names, client=client)
596
def test_order_reuse_failed_authz():
    """
    Create an order, fail one of its authorizations, then place a new order
    for the same name and check that neither the old order nor its failed
    authorization is reused, and that the new order can be fulfilled.
    """
    client = chisel2.make_client(None)
    names = [random_domain()]
    csr = chisel2.make_csr(names)

    order = client.new_order(csr)
    original_uri = order.uri

    # Answer the first challenge of the first authz (its type is irrelevant)
    # without setting anything up to satisfy the validation request.
    doomed_chall = order.authorizations[0].body.challenges[0]
    client.answer_challenge(doomed_chall, doomed_chall.response(client.net.key))

    poll_until = datetime.datetime.now() + datetime.timedelta(seconds=60)
    try:
        # Poll until the authorizations leave pending, the deadline passes,
        # or an authorization turns out invalid.
        client.poll_authorizations(order, poll_until)
    except acme_errors.ValidationError:
        # Expected: one of the authorizations is invalid.
        pass
    else:
        # Polling ended without an invalid authorization, so the deadline
        # was reached; fail the test.
        raise Exception("timed out waiting for order %s to become invalid" % original_uri)

    # Place a second order for the same names
    order = client.new_order(csr)

    # It must be a different order from the first one
    if order.uri == original_uri:
        raise Exception("new-order for %s returned a , now-invalid, order" % names)

    # Every authorization on the new order must be pending
    pending = Status("pending")
    for authz in order.authorizations:
        status = authz.body.status
        if status != pending:
            raise Exception(
                "order for %s included a non-pending authorization (status: %s) from a previous order" %
                (names, str(status)))

    # The new order should be fulfillable
    undo_challenges = chisel2.do_http_challenges(client, order.authorizations)
    try:
        order = client.poll_and_finalize(order)
    finally:
        undo_challenges()
651
def test_order_finalize_early():
    """
    Finalize an order before any of its authorizations were completed and
    expect the server to answer with an orderNotReady problem.
    """
    client = chisel2.make_client(None)

    # Order for a single random domain; no challenges are answered
    csr = chisel2.make_csr([random_domain()])
    order = client.new_order(csr)

    finalize_by = datetime.datetime.now() + datetime.timedelta(seconds=5)

    def finalize_now():
        return client.finalize_order(order, finalize_by)

    # Finalizing an order early should generate an orderNotReady error.
    chisel2.expect_problem(
        "urn:ietf:params:acme:error:orderNotReady", finalize_now)
672
def test_revoke_by_account_unspecified():
    """
    Issue a certificate, revoke it with the issuing client using reason 0,
    then verify the OCSP status is "revoked" and check the Akamai purge.
    """
    client = chisel2.make_client()
    cert_file = temppath('test_revoke_by_account_0.pem')
    names = [random_domain()]
    order = chisel2.auth_and_issue(names, client=client, cert_output=cert_file.name)
    cert = OpenSSL.crypto.load_certificate(
        OpenSSL.crypto.FILETYPE_PEM, order.fullchain_pem)

    reset_akamai_purges()
    revocation_reason = 0
    client.revoke(josepy.ComparableX509(cert), revocation_reason)

    issuer_path = "/hierarchy/intermediate-cert-rsa-a.pem"
    ocsp_url = "http://localhost:4002"
    verify_ocsp(cert_file.name, issuer_path, ocsp_url, "revoked")
    verify_akamai_purge()
684
def test_revoke_by_account_with_reason():
    """
    Issue a certificate, revoke it with the issuing client using reason 1
    (keyCompromise), then verify the OCSP status and reason and check the
    Akamai purge.
    """
    client = chisel2.make_client(None)
    cert_file = temppath('test_revoke_by_account_1.pem')
    names = [random_domain()]
    order = chisel2.auth_and_issue(names, client=client, cert_output=cert_file.name)
    cert = OpenSSL.crypto.load_certificate(
        OpenSSL.crypto.FILETYPE_PEM, order.fullchain_pem)

    reset_akamai_purges()

    # Requesting revocation for keyCompromise should work, but not block the
    # key.
    key_compromise = 1
    client.revoke(josepy.ComparableX509(cert), key_compromise)

    issuer_path = "/hierarchy/intermediate-cert-rsa-a.pem"
    ocsp_url = "http://localhost:4002"
    verify_ocsp(cert_file.name, issuer_path, ocsp_url, "revoked", "keyCompromise")

    verify_akamai_purge()
699
def test_revoke_by_authz():
    """
    Issue a certificate, then revoke it from a second client that has issued
    for the same names, and verify the OCSP status, the recorded reason and
    the Akamai purge.
    """
    names = [random_domain()]
    cert_file = temppath('test_revoke_by_authz.pem')
    order = chisel2.auth_and_issue(names, cert_output=cert_file.name)
    cert = OpenSSL.crypto.load_certificate(
        OpenSSL.crypto.FILETYPE_PEM, order.fullchain_pem)

    # A second, new client re-authorizes the same names
    second_client = chisel2.make_client(None)
    chisel2.auth_and_issue(names, client=second_client)

    reset_akamai_purges()

    # Even though we requested reason 1 ("keyCompromise"), the result should be
    # 5 ("cessationOfOperation") due to the authorization method.
    requested_reason = 1
    second_client.revoke(josepy.ComparableX509(cert), requested_reason)

    issuer_path = "/hierarchy/intermediate-cert-rsa-a.pem"
    ocsp_url = "http://localhost:4002"
    verify_ocsp(cert_file.name, issuer_path, ocsp_url, "revoked", "cessationOfOperation")

    verify_akamai_purge()
718
def test_revoke_by_privkey():
    """
    Issue a certificate from a locally generated key, then revoke it with a
    client whose account key is the certificate's private key, and verify
    the OCSP status, the recorded reason and the Akamai purge.
    """
    names = [random_domain()]

    # The CSR is built here (instead of by chisel2) so that the private key
    # stays available for the revocation further down.
    cert_key = rsa.generate_private_key(65537, 2048, default_backend())
    cert_key_pem = cert_key.private_bytes(
        encoding=serialization.Encoding.PEM,
        format=serialization.PrivateFormat.TraditionalOpenSSL,
        encryption_algorithm=serialization.NoEncryption(),
    )
    csr = acme_crypto_util.make_csr(cert_key_pem, names, False)

    # Issuance is done by hand because the CSR is our own.
    issuer = chisel2.make_client(None)
    order = issuer.new_order(csr)
    undo_challenges = chisel2.do_http_challenges(issuer, order.authorizations)
    try:
        order = issuer.poll_and_finalize(order)
    finally:
        undo_challenges()
    pem_type = OpenSSL.crypto.FILETYPE_PEM
    cert = OpenSSL.crypto.load_certificate(pem_type, order.fullchain_pem)

    # Write the certificate to a file that outlives this block
    # (delete=False) so that verify_ocsp can read it by name.
    with tempfile.NamedTemporaryFile(
            dir=tempdir, suffix='.test_revoke_by_privkey.pem',
            mode='w+', delete=False) as cert_file:
        cert_file.write(OpenSSL.crypto.dump_certificate(pem_type, cert).decode())

    # Create a new client with the cert key as the account key. We don't
    # register a server-side account with this client, as we don't need one.
    revoker = chisel2.uninitialized_client(key=josepy.JWKRSA(key=cert_key))

    reset_akamai_purges()

    # Even though we requested reason 0 ("unspecified"), the result should be
    # 1 ("keyCompromise") due to the authorization method.
    requested_reason = 0
    revoker.revoke(josepy.ComparableX509(cert), requested_reason)

    issuer_path = "/hierarchy/intermediate-cert-rsa-a.pem"
    ocsp_url = "http://localhost:4002"
    verify_ocsp(cert_file.name, issuer_path, ocsp_url, "revoked", "keyCompromise")

    verify_akamai_purge()
761
def test_double_revocation():
    """
    Test which re-revocations of an already revoked certificate are allowed.

    The certificate is first revoked by the subscriber account with reason 0.
    A re-revocation by that account with reason 3 must fail. A re-revocation
    with reason 1 (keyCompromise) made with the certificate's own key must
    succeed. After that, further keyCompromise revocations must fail both
    from the subscriber account and from the certificate key.

    Raises an Exception when a revocation that should be refused succeeds.
    """
    domains = [random_domain()]

    # We have to make our own CSR so that we can hold on to the private key
    # for revocation later.
    key = rsa.generate_private_key(65537, 2048, default_backend())
    key_pem = key.private_bytes(
        encoding=serialization.Encoding.PEM,
        format=serialization.PrivateFormat.TraditionalOpenSSL,
        encryption_algorithm=serialization.NoEncryption()
    )
    csr_pem = acme_crypto_util.make_csr(key_pem, domains, False)

    # We have to do our own issuance because we made our own CSR.
    sub_client = chisel2.make_client(None)
    order = sub_client.new_order(csr_pem)
    cleanup = chisel2.do_http_challenges(sub_client, order.authorizations)
    try:
        order = sub_client.poll_and_finalize(order)
    finally:
        cleanup()
    cert = OpenSSL.crypto.load_certificate(OpenSSL.crypto.FILETYPE_PEM, order.fullchain_pem)

    cert_file = tempfile.NamedTemporaryFile(
        dir=tempdir, suffix='.test_double_revoke.pem',
        mode='w+', delete=False)
    cert_file.write(OpenSSL.crypto.dump_certificate(
        OpenSSL.crypto.FILETYPE_PEM, cert).decode())
    cert_file.close()

    # Create a new client with the cert key as the account key. We don't
    # register a server-side account with this client, as we don't need one.
    cert_client = chisel2.uninitialized_client(key=josepy.JWKRSA(key=key))

    def expect_revoke_rejected(revoker, reason, failure):
        # Attempt a revocation that must be refused with an ACME error;
        # raise an Exception carrying `failure` if it goes through.
        try:
            revoker.revoke(josepy.ComparableX509(cert), reason)
        except messages.Error:
            return
        raise(Exception(failure))

    reset_akamai_purges()

    # First revoke for any reason.
    sub_client.revoke(josepy.ComparableX509(cert), 0)
    verify_ocsp(cert_file.name, "/hierarchy/intermediate-cert-rsa-a.pem", "http://localhost:4002", "revoked")
    verify_akamai_purge()

    # Re-revocation for anything other than keyCompromise should fail.
    expect_revoke_rejected(sub_client, 3, "Re-revoked for a bad reason")

    # Re-revocation for keyCompromise should work, as long as it is done
    # via the cert key to demonstrate said compromise.
    reset_akamai_purges()
    cert_client.revoke(josepy.ComparableX509(cert), 1)
    verify_ocsp(cert_file.name, "/hierarchy/intermediate-cert-rsa-a.pem", "http://localhost:4002", "revoked", "keyCompromise")
    verify_akamai_purge()

    # A subsequent attempt should fail, because the cert is already revoked
    # for keyCompromise. This attempt uses the subscriber account; it
    # previously used the cert key, which duplicated the check below.
    expect_revoke_rejected(sub_client, 1, "Re-revoked already keyCompromise'd cert")

    # The same is true even when using the cert key.
    expect_revoke_rejected(cert_client, 1, "Re-revoked already keyCompromise'd cert")
834
def test_sct_embedding():
    """
    Issue a certificate for a random domain and check its Certificate
    Transparency extensions: the CT poison extension must be absent, and the
    SCT list extension must hold exactly two SCTs, each version 1 with the
    pre-certificate entry type.
    """
    issued = chisel2.auth_and_issue([random_domain()])
    print(issued.fullchain_pem.encode())
    cert = parse_cert(issued)

    # The CT poison extension must not be present in the final certificate.
    poison_oid = x509.ObjectIdentifier("1.3.6.1.4.1.11129.2.4.3")
    try:
        cert.extensions.get_extension_for_oid(poison_oid)
    except x509.ExtensionNotFound:
        pass
    else:
        raise Exception("certificate contains CT poison extension")

    # The SCT list extension must be present.
    sct_list_oid = x509.ObjectIdentifier("1.3.6.1.4.1.11129.2.4.2")
    try:
        sct_list = cert.extensions.get_extension_for_oid(sct_list_oid)
    except x509.ExtensionNotFound:
        raise Exception("certificate doesn't contain SCT list extension")

    scts = sct_list.value
    if len(scts) != 2:
        raise Exception("SCT list contains wrong number of SCTs")

    ct = x509.certificate_transparency
    for sct in scts:
        if sct.version != ct.Version.v1:
            raise Exception("SCT contains wrong version")
        if sct.entry_type != ct.LogEntryType.PRE_CERTIFICATE:
            raise Exception("SCT contains wrong entry type")
860
def test_only_return_existing_reg():
    """
    Check newAccount requests that set onlyReturnExisting: a request signed
    with the key of an already registered account must get a 200 response,
    while a request from a client with a fresh key must produce an
    accountDoesNotExist problem.
    """
    email = "test@not-example.com"

    class JSONDict(dict):
        # Plain dict carrying a json_dumps method; presumably this is what
        # client.net.post uses to serialize its payload (verify against the
        # acme client).
        def json_dumps(self, indent=None):
            return json.dumps(self)

    def only_existing_request():
        # Build a fresh payload for each request.
        return JSONDict({
            "termsOfServiceAgreed": True,
            "contact": [email],
            "onlyReturnExisting": True
        })

    # Register an account, then create a second client sharing its key.
    registered = chisel2.uninitialized_client()
    registered.new_account(messages.NewRegistration.from_data(
        email=email, terms_of_service_agreed=True))
    same_key_client = chisel2.uninitialized_client(key=registered.net.key)

    resp = same_key_client.net.post(
        same_key_client.directory['newAccount'], only_existing_request())
    if resp.status_code != 200:
        raise Exception("incorrect response returned for onlyReturnExisting")

    # A client with a different key has no existing account to return.
    other_client = chisel2.uninitialized_client()
    other_request = only_existing_request()
    chisel2.expect_problem(
        "urn:ietf:params:acme:error:accountDoesNotExist",
        lambda: other_client.net.post(
            other_client.directory['newAccount'], other_request))
888
def BouncerHTTPRequestHandler(redirect, guestlist):
    """
    Build and return a BouncerHandler class that acts like a club bouncer in
    front of another server.

    redirect is the URL placed in the Location header of 302 responses.
    guestlist is a dict mapping a User-Agent string to the number of GET
    requests that User-Agent may still have redirected; the dict is mutated in
    place as spots are used up.

    GET requests from a User-Agent with at least one remaining spot are
    redirected to the real server and the spot count is decremented. Any other
    GET request receives a 200 response with a bogus body. HEAD requests always
    receive an empty 200 response and never touch the guestlist.
    """
    class BouncerHandler(BaseHTTPRequestHandler):
        def do_HEAD(self):
            # wait_for_server polls with HEAD, so this must not consume spots.
            self.send_response(200)
            self.end_headers()

        def do_GET(self):
            ua = self.headers['User-Agent']
            spots_left = BouncerHandler.guestlist.get(ua, 0)

            if spots_left <= 0:
                # No spots left for this UA: hand back a bogus result.
                self.log_message("BouncerHandler UA {0} has no requests on the Guestlist. Sending request to the curb".format(ua))
                self.send_response(200)
                self.end_headers()
                self.wfile.write(u"(• ◡ •) <( VIPs only! )".encode())
                return

            # Use up one spot and send the request on to the real server.
            BouncerHandler.guestlist[ua] -= 1
            self.log_message("BouncerHandler UA {0} is on the Guestlist. {1} requests remaining.".format(ua, BouncerHandler.guestlist[ua]))
            self.send_response(302)
            self.send_header("Location", BouncerHandler.redirect)
            self.end_headers()

    # Shared state lives on the freshly created class, so each call to this
    # factory yields a handler with its own guestlist and redirect target.
    BouncerHandler.redirect = redirect
    BouncerHandler.guestlist = guestlist
    return BouncerHandler
930
def wait_for_server(addr):
    """
    Block until a HEAD request to addr returns status 200, retrying every 0.5s.

    Connection errors are treated as "not ready yet". There is no upper bound
    on the number of attempts.
    """
    ready = False
    while not ready:
        try:
            # NOTE(@cpu): Using HEAD here instead of GET because the
            # BouncerHandler modifies its state for GET requests.
            ready = requests.head(addr).status_code == 200
        except requests.exceptions.ConnectionError:
            pass
        if not ready:
            time.sleep(0.5)
942
def multiva_setup(client, guestlist):
    """
    Set up a random test hostname fronted by a BouncerHandler HTTP server.

    A new order for a random_domain() hostname is created with client, the
    order's HTTP-01 key authorization is registered with the real challtestsrv,
    and an HTTPServer bound to 10.88.88.88:80 is started in a non-daemon
    thread. GET requests to that server are redirected to the real challtestsrv
    while the requesting User-Agent still has spots in guestlist; afterwards
    they receive a bad answer.

    The listening socket is created by the HTTPServer constructor before this
    function returns; no readiness polling is done here.

    Returns a (hostname, cleanup) tuple. cleanup removes the challtestsrv mocks
    and stops the HTTP server, and should be called once the test is done.
    Raises an Exception if the first authorization has no HTTP-01 challenge.
    """
    hostname = random_domain()

    order = client.new_order(chisel2.make_csr([hostname]))
    first_authz = order.authorizations[0]
    http_challs = [
        body.chall for body in first_authz.body.challenges
        if isinstance(body.chall, challenges.HTTP01)
    ]
    if not http_challs:
        raise Exception("No HTTP-01 challenge found for random domain authz")
    chall = http_challs[-1]

    token = chall.encode("token")

    # Register the correct key authorization on the real challtestsrv so that
    # redirected (VIP) requests receive a good answer.
    keyauth = chall.response(client.net.key).key_authorization
    challSrv.add_http01_response(token, keyauth)

    # Point the test hostname at the interface the bouncer server binds to.
    challSrv.add_a_record(hostname, ["10.88.88.88"])

    # Point the redirect target at the real challtestsrv.
    redirHostname = "pebble-challtestsrv.example.com"
    challSrv.add_a_record(redirHostname, ["10.77.77.77"])

    # NOTE(@cpu): The pebble-challtestsrv binds 10.77.77.77:80 for HTTP-01
    # challenges so the throwaway server for this test must use the
    # 10.88.88.88 address, with a mock DNS entry directing the VA to it.
    redirect = "http://{0}/.well-known/acme-challenge/{1}".format(
        redirHostname, token)
    handler_cls = BouncerHTTPRequestHandler(redirect, guestlist)
    httpd = HTTPServer(("10.88.88.88", 80), handler_cls)
    server_thread = threading.Thread(target=httpd.serve_forever, daemon=False)
    server_thread.start()

    def cleanup():
        # Drop the challtestsrv mocks first.
        challSrv.remove_a_record(hostname)
        challSrv.remove_a_record(redirHostname)
        challSrv.remove_http01_response(token)
        # Then stop the HTTP server and wait for its thread to exit.
        httpd.shutdown()
        httpd.server_close()
        server_thread.join()

    return hostname, cleanup
1002
def test_http_multiva_threshold_pass():
    """
    With the primary VA and one remote VA allowed through the bouncer, the
    number of remote failures stays within the allowed maximum and HTTP-01
    issuance must succeed.
    """
    client = chisel2.make_client()

    # One guestlist spot each for the primary VA and a single remote.
    allowed = {"boulder": 1, "boulder-remote-b": 1}
    hostname, cleanup = multiva_setup(client, allowed)

    try:
        chisel2.auth_and_issue([hostname], client=client, chall_type="http-01")
    finally:
        cleanup()
1018
def test_http_multiva_primary_fail_remote_pass():
    """
    When the primary VA is refused by the bouncer but the remote VAs are let
    through, overall validation must still fail with an unauthorized problem:
    the primary VA result cannot be overridden by the remotes.
    """
    client = chisel2.make_client()

    # No spots for the primary VA, one spot for each remote.
    allowed = {"boulder": 0, "boulder-remote-a": 1, "boulder-remote-b": 1}
    hostname, cleanup = multiva_setup(client, allowed)

    saw_validation_error = False
    try:
        chisel2.auth_and_issue([hostname], client=client, chall_type="http-01")
    except acme_errors.ValidationError as err:
        # NOTE(@cpu): Chisel2's expect_problem doesn't work in this case so this
        # test needs to unpack an `acme_errors.ValidationError` on its own. It
        # might be possible to clean this up in the future.
        failed = err.failed_authzrs
        if len(failed) != 1:
            raise Exception("expected one failed authz, found {0}".format(len(failed)))
        http_challs = [
            body for body in failed[0].body.challenges
            if isinstance(body.chall, challenges.HTTP01)
        ]
        if not http_challs:
            raise Exception("no HTTP-01 challenge in failed authz")
        problem_type = http_challs[-1].error.typ
        if problem_type != "urn:ietf:params:acme:error:unauthorized":
            raise Exception("expected unauthorized prob, found {0}".format(problem_type))
        saw_validation_error = True
    finally:
        cleanup()

    if not saw_validation_error:
        raise Exception("Overall validation did not fail")
1054
def test_http_multiva_threshold_fail():
    """
    When only the primary VA is let through the bouncer, validation must fail
    with an unauthorized problem whose detail starts with
    "During secondary validation: ".
    """
    client = chisel2.make_client()

    # Only the primary VA gets a guestlist spot, so the remotes all fail.
    hostname, cleanup = multiva_setup(client, {"boulder": 1})

    failed = []
    try:
        chisel2.auth_and_issue([hostname], client=client, chall_type="http-01")
    except acme_errors.ValidationError as err:
        # NOTE(@cpu): Chisel2's expect_problem doesn't work in this case so this
        # test needs to unpack an `acme_errors.ValidationError` on its own. It
        # might be possible to clean this up in the future.
        failed = err.failed_authzrs
    finally:
        cleanup()

    if len(failed) != 1:
        raise Exception("expected one failed authz, found {0}".format(len(failed)))

    http_challs = [
        body for body in failed[0].body.challenges
        if isinstance(body.chall, challenges.HTTP01)
    ]
    if not http_challs:
        raise Exception("no HTTP-01 challenge in failed authz")

    problem = http_challs[-1].error
    if problem.typ != "urn:ietf:params:acme:error:unauthorized":
        raise Exception("expected unauthorized prob, found {0}".format(problem.typ))
    if not problem.detail.startswith("During secondary validation: "):
        raise Exception("expected 'During secondary validation' problem detail, found {0}".format(problem.detail))
1087
class FakeH2ServerHandler(socketserver.BaseRequestHandler):
    """
    TCP socket handler that answers whatever it receives with bytes
    representing an initial HTTP/2 SETTINGS frame.
    """
    def handle(self):
        # Consume the incoming HTTP request first so the reply isn't seen as
        # unsolicited. The stripped request is kept on self.data.
        received = self.request.recv(1024)
        self.data = received.strip()
        # Reply with HTTP/2 bytes. This is truncated example data taken from
        # the community forum:
        # https://community.letsencrypt.org/t/le-validation-error-if-server-is-in-google-infrastructure/51841
        settings_frame = b"\x00\x00\x12\x04\x00\x00\x00\x00\x00\x00\x03\x00\x00\x00\x80\x00"
        self.request.sendall(settings_frame)
1101
def wait_for_tcp_server(addr, port):
    """
    wait_for_tcp_server attempts to make a TCP connection to the given
    address/port every 0.5s until it succeeds.

    On success a single newline byte is sent over the connection and the
    function returns None. There is no upper bound on the number of attempts.
    Every socket created here is closed before the next attempt or before
    returning.
    """
    while True:
        # A fresh socket is needed for each attempt; the `with` block closes
        # it on both the success and the failure path so descriptors are not
        # leaked while polling.
        with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as sock:
            try:
                sock.connect((addr, port))
                sock.sendall(b"\n")
                return
            except OSError:
                # socket.error is an alias of OSError; the server is not
                # accepting connections yet.
                pass
        time.sleep(0.5)
1116
def test_http2_http01_challenge():
    """
    test_http2_http01_challenge tests that an HTTP-01 challenge made to a HTTP/2
    server fails with a specific error message for this case.

    Raises an Exception if issuance does not fail with a ValidationError, or if
    a failed authorization's HTTP-01 challenge does not carry a connection
    problem whose detail ends with the expected HTTP/2 message.
    """
    client = chisel2.make_client()
    hostname = "fake.h2.example.com"

    # Add an A record for the test server to ensure the VA's requests are directed
    # to the interface that we bind the FakeH2ServerHandler to.
    challSrv.add_a_record(hostname, ["10.88.88.88"])
    try:
        # Allow socket address reuse on the base TCPServer class. Failing to do
        # this causes subsequent integration tests to fail with "Address in use"
        # errors even though this test _does_ call shutdown() and
        # server_close(). Even though the server was shut down, Python's socket
        # will be in TIME_WAIT because of previous client connections. Having
        # the TCPServer set SO_REUSEADDR on the socket solves the problem.
        socketserver.TCPServer.allow_reuse_address = True
        # Create, start, and wait for a fake HTTP/2 server.
        server = socketserver.TCPServer(("10.88.88.88", 80), FakeH2ServerHandler)
        thread = threading.Thread(target=server.serve_forever)
        thread.daemon = False
        thread.start()
        try:
            wait_for_tcp_server("10.88.88.88", 80)

            # Issuing an HTTP-01 challenge for this hostname should produce a
            # connection problem with an error specific to the HTTP/2
            # misconfiguration.
            expectedError = "Server is speaking HTTP/2 over HTTP"
            validation_failed = False
            try:
                chisel2.auth_and_issue([hostname], client=client, chall_type="http-01")
            except acme_errors.ValidationError as e:
                validation_failed = True
                for authzr in e.failed_authzrs:
                    c = chisel2.get_chall(authzr, challenges.HTTP01)
                    error = c.error
                    if error is None or error.typ != "urn:ietf:params:acme:error:connection":
                        raise Exception("Expected connection prob, got %s" % str(error))
                    if not error.detail.endswith(expectedError):
                        raise Exception("Expected prob detail ending in %s, got %s" % (expectedError, error.detail))
            # Without this check the test would pass silently if issuance
            # against the fake HTTP/2 server unexpectedly succeeded.
            if not validation_failed:
                raise Exception("Expected HTTP-01 validation against an HTTP/2 server to fail")
        finally:
            server.shutdown()
            server.server_close()
            thread.join()
    finally:
        # Remove the mock DNS entry so it does not linger for later tests.
        challSrv.remove_a_record(hostname)
1160
def test_new_order_policy_errs():
    """
    Creating an order whose identifiers are blocked by policy must return a
    rejectedIdentifier problem whose detail refers to sub-problems.
    """
    client = chisel2.make_client(None)

    # 'in-addr.arpa' is present in `test/hostname-policy.yaml`'s
    # HighRiskBlockedNames list.
    blocked_names = ["out-addr.in-addr.arpa", "between-addr.in-addr.arpa"]
    csr_pem = chisel2.make_csr(blocked_names)

    # With two policy blocked names in the order the expected result is a top
    # level rejectedIdentifier with a detail message that references
    # subproblems.
    #
    # TODO(@cpu): After https://github.com/certbot/certbot/issues/7046 is
    # implemented in the upstream `acme` module this test should also ensure the
    # subproblems are properly represented.
    expected_detail = 'Error creating new order :: Cannot issue for "between-addr.in-addr.arpa": The ACME server refuses to issue a certificate for this domain name, because it is forbidden by policy (and 1 more problems. Refer to sub-problems for more information.)'
    try:
        client.new_order(csr_pem)
    except messages.Error as e:
        if e.typ != "urn:ietf:params:acme:error:rejectedIdentifier":
            raise Exception("Expected rejectedIdentifier type problem, got {0}".format(e.typ))
        if e.detail != expected_detail:
            raise Exception("Order problem detail did not match expected")
    else:
        raise Exception("Expected problem, got no error")
1190
def test_long_san_no_cn():
    """
    Issuance for a single name of 61 random uppercase letters plus ".com" must
    be rejected with a rejectedIdentifier problem stating that no SAN is short
    enough to fit in the CN.

    The test is a no-op when CONFIG_NEXT is set.
    """
    if CONFIG_NEXT:
        return
    long_name = "".join(random.choice(string.ascii_uppercase) for _ in range(61)) + ".com"
    try:
        chisel2.auth_and_issue([long_name])
    except messages.Error as e:
        # The message names the problem type that is actually checked
        # (it previously said "malformed").
        if e.typ != "urn:ietf:params:acme:error:rejectedIdentifier":
            raise Exception("Expected rejectedIdentifier type problem, got {0}".format(e.typ))
        if e.detail != "NewOrder request did not include a SAN short enough to fit in CN":
            raise Exception("Problem detail did not match expected")
    else:
        # auth_and_issue returned normally, so issuance wrongly succeeded.
        raise Exception("Issuance didn't fail when the only SAN in a certificate was longer than the max CN length")
1203
def test_delete_unused_challenges():
    """
    After an authorization reaches a final state only the challenge that was
    attempted should remain on it.

    Checked twice: once after a successful DNS-01 validation, and once after a
    DNS-01 challenge that is answered without being provisioned so that it
    fails. Raises an Exception if more than one challenge remains, if the
    remaining challenge is not DNS-01, or if the second authorization does not
    become invalid within five polls.
    """
    order = chisel2.auth_and_issue([random_domain()], chall_type="dns-01")
    a = order.authorizations[0]
    if len(a.body.challenges) != 1:
        raise Exception("too many challenges (%d) left after validation" % len(a.body.challenges))
    if not isinstance(a.body.challenges[0].chall, challenges.DNS01):
        raise Exception("wrong challenge type left after validation")

    # intentionally fail a challenge
    client = chisel2.make_client()
    csr_pem = chisel2.make_csr([random_domain()])
    order = client.new_order(csr_pem)
    c = chisel2.get_chall(order.authorizations[0], challenges.DNS01)
    client.answer_challenge(c, c.response(client.net.key))
    for _attempt in range(5):
        a, _response = client.poll(order.authorizations[0])
        # Compare against the module's predefined constant rather than
        # constructing a new Status object on every poll.
        if a.body.status == messages.STATUS_INVALID:
            break
        time.sleep(1)
    else:
        # Without this the checks below would run against an authorization
        # that never failed, reporting a misleading error or none at all.
        raise Exception("authorization did not become invalid after failed validation")
    if len(a.body.challenges) != 1:
        raise Exception("too many challenges (%d) left after failed validation" %
            len(a.body.challenges))
    if not isinstance(a.body.challenges[0].chall, challenges.DNS01):
        raise Exception("wrong challenge type left after validation")
1228
def test_auth_deactivation_v2():
    """
    Deactivating an authorization must report the deactivated status, both for
    the authorization of a freshly created order and for one taken from a
    completed issuance.
    """
    client = chisel2.make_client(None)
    csr_pem = chisel2.make_csr([random_domain()])
    order = client.new_order(csr_pem)
    resp = client.deactivate_authorization(order.authorizations[0])
    # Compare by value: an identity check only holds while the client hands
    # back the very same constant object.
    if resp.body.status != messages.STATUS_DEACTIVATED:
        raise Exception("unexpected authorization status")

    order = chisel2.auth_and_issue([random_domain()], client=client)
    resp = client.deactivate_authorization(order.authorizations[0])
    if resp.body.status != messages.STATUS_DEACTIVATED:
        raise Exception("unexpected authorization status")
1241
def test_ocsp():
    """
    Issue a certificate to a temporary file and verify that its OCSP status is
    "good".
    """
    issued_cert = temppath('test_ocsp.pem')
    cert_path = issued_cert.name
    chisel2.auth_and_issue([random_domain()], cert_output=cert_path)
    verify_ocsp(
        cert_path,
        "/hierarchy/intermediate-cert-rsa-a.pem",
        "http://localhost:4002",
        "good")
1246
def test_ct_submission():
    """
    Issue a certificate and check the per-log submission counts reported by
    the test CT log servers: no log may exceed its allowed count, and there
    must be at least two submissions in total.
    """
    hostname = random_domain()

    chisel2.auth_and_issue([hostname])

    # These should correspond to the configured logs in ra.json.
    log_groups = [
        ["http://boulder.service.consul:4600/submissions", "http://boulder.service.consul:4601/submissions", "http://boulder.service.consul:4602/submissions", "http://boulder.service.consul:4603/submissions"],
        ["http://boulder.service.consul:4604/submissions", "http://boulder.service.consul:4605/submissions"],
        ["http://boulder.service.consul:4606/submissions"],
        ["http://boulder.service.consul:4607/submissions"],
        ["http://boulder.service.consul:4608/submissions"],
        ["http://boulder.service.consul:4609/submissions"],
    ]

    # These should correspond to the logs with `submitFinal` in ra.json.
    final_logs = [
        "http://boulder.service.consul:4600/submissions",
        "http://boulder.service.consul:4601/submissions",
        "http://boulder.service.consul:4606/submissions",
        "http://boulder.service.consul:4609/submissions",
    ]

    # Strict limits (exactly 1 submission per group, exactly two submissions
    # overall) can't be enforced because of the async nature of the race
    # system: a slowish submission to one log in a group could trigger a very
    # fast submission to a different log in the same group, and then both
    # submissions could succeed at the same time. Although the Go code will
    # only use one of the SCTs, both logs will still have been submitted to,
    # and it will show up here.
    total_count = 0
    for group in log_groups:
        for log in group:
            count = int(requests.get(log + "?hostnames=%s" % hostname).text)
            # Logs that also receive the final certificate are allowed one
            # extra submission.
            threshold = 2 if log in final_logs else 1
            if count > threshold:
                raise Exception("Got %d submissions for log %s, expected at most %d" % (count, log, threshold))
            total_count += count
    if total_count < 2:
        raise Exception("Got %d total submissions, expected at least 2" % total_count)
1292
def check_ocsp_basic_oid(cert_file, issuer_file, url):
    """
    This function checks if an OCSP response was successful, but doesn't verify
    the signature or timestamp. This is useful when simulating the past, so we
    don't incorrectly reject a response for being in the past.

    cert_file and issuer_file are passed to make_ocsp_req to build the request,
    which is then sent to url via fetch_ocsp. Raises an Exception naming the
    base64-encoded response if any response lacks the id-pkix-ocsp-basic OID.
    """
    # Imported locally so that the failure path below cannot itself fail with
    # a NameError when base64 is not otherwise in this module's namespace.
    import base64

    ocsp_request = make_ocsp_req(cert_file, issuer_file)
    responses = fetch_ocsp(ocsp_request, url)
    # An unauthorized response (for instance, if the OCSP responder doesn't know
    # about this cert) will just be 30 03 0A 01 06. A "good" or "revoked"
    # response will contain, among other things, the id-pkix-ocsp-basic OID
    # identifying the response type. We look for that OID to confirm we got a
    # successful response.
    expected = bytearray.fromhex("06 09 2B 06 01 05 05 07 30 01 01")
    for resp in responses:
        if expected not in bytearray(resp):
            # Decode the base64 so the message shows plain text rather than a
            # bytes repr.
            raise Exception("Did not receive successful OCSP response: %s doesn't contain %s" %
                (base64.b64encode(resp).decode("ascii"),
                 base64.b64encode(expected).decode("ascii")))
1311
# Filled in by ocsp_exp_unauth_setup and read by test_ocsp_exp_unauth.
ocsp_exp_unauth_setup_data = {}
@register_six_months_ago
def ocsp_exp_unauth_setup():
    """
    Issue a certificate to a temporary file, check that an OCSP response for
    it exists, and record the file's path under the 'cert_file' key of
    ocsp_exp_unauth_setup_data.
    """
    setup_client = chisel2.make_client(None)
    cert_file = temppath('ocsp_exp_unauth_setup.pem')
    cert_path = cert_file.name
    issued = chisel2.auth_and_issue(
        [random_domain()], client=setup_client, cert_output=cert_path)
    # The parsed certificate is not used further; the call is kept so that a
    # chain which fails to load still raises here.
    OpenSSL.crypto.load_certificate(OpenSSL.crypto.FILETYPE_PEM, issued.fullchain_pem)

    # Since our servers are pretending to be in the past, but the openssl cli
    # isn't, we'll get an expired OCSP response. Just check that it exists;
    # don't do the full verification (which would fail).
    check_ocsp_basic_oid(
        cert_path,
        "/hierarchy/intermediate-cert-rsa-a.pem",
        "http://localhost:4002")
    ocsp_exp_unauth_setup_data['cert_file'] = cert_path
1326
def test_ocsp_exp_unauth():
    """
    Check that the OCSP responder answers "unauthorized" for the certificate
    recorded by ocsp_exp_unauth_setup.

    verify_ocsp is attempted up to five times, 0.25s apart. The test passes
    as soon as it fails with the exact "Responder Error: unauthorized (6)"
    output. Any other outcome is remembered as the last error and retried.
    Raises an Exception if the setup did not run or if no unauthorized
    response is seen within the allowed attempts.
    """
    tries = 0
    if 'cert_file' not in ocsp_exp_unauth_setup_data:
        raise Exception("ocsp_exp_unauth_setup didn't run")
    cert_file = ocsp_exp_unauth_setup_data['cert_file']
    last_error = ""
    while tries < 5:
        try:
            verify_ocsp(cert_file, "/hierarchy/intermediate-cert-rsa-a.pem", "http://localhost:4002", "XXX")
            # verify_ocsp is expected to fail; a normal return is recorded by
            # the generic handler below and retried like any other error.
            raise Exception("Unexpected return from verify_ocsp")
        except subprocess.CalledProcessError as cpe:
            last_error = cpe.output
            if cpe.output == b"Responder Error: unauthorized (6)\n":
                break
        except Exception as e:
            # This was `except e:`, which evaluates the undefined name `e` and
            # turns every other failure into a NameError.
            last_error = e
        tries += 1
        time.sleep(0.25)
    else:
        raise Exception("timed out waiting for unauthorized OCSP response for expired certificate. Last error: {}".format(last_error))
1348
def test_blocked_key_account():
    """
    Creating an account whose key is the blocked test CA key must fail with a
    badPublicKey problem and the detail "public key is forbidden".
    """
    # Only config-next has a blocked keys file configured.
    if not CONFIG_NEXT:
        return

    with open("test/test-ca.key", "rb") as key_file:
        blocked_key = serialization.load_pem_private_key(
            key_file.read(), password=None, backend=default_backend())

    # Build a client whose JWK is the blocked private key.
    client = chisel2.uninitialized_client(josepy.JWKRSA(key=blocked_key))
    email = "test@not-example.com"
    registration = messages.NewRegistration.from_data(
        email=email, terms_of_service_agreed=True)

    # Account creation is expected to be refused.
    try:
        client.new_account(registration)
    except acme_errors.Error as e:
        if e.typ != "urn:ietf:params:acme:error:badPublicKey":
            raise Exception("problem did not have correct error type, had {0}".format(e.typ))
        if e.detail != "public key is forbidden":
            raise Exception("problem did not have correct error detail, had {0}".format(e.detail))
    else:
        raise Exception("expected account creation to fail with Error when using blocked key")
1376
def test_blocked_key_cert():
    """
    Finalizing an order whose CSR uses the blocked test CA key must fail with
    a badCSR problem stating that the public key is forbidden.

    The HTTP-01 challenge responses set up for the order are always cleaned
    up, whether or not finalization fails as expected.
    """
    # Only config-next has a blocked keys file configured.
    if not CONFIG_NEXT:
        return

    # Read the key as bytes, matching the bytes PEM handed to make_csr
    # elsewhere in this file.
    with open("test/test-ca.key", "rb") as f:
        pemBytes = f.read()

    domains = [random_domain(), random_domain()]
    csr = acme_crypto_util.make_csr(pemBytes, domains, False)

    client = chisel2.make_client(None)
    order = client.new_order(csr)
    authzs = order.authorizations

    testPass = False
    cleanup = chisel2.do_http_challenges(client, authzs)
    try:
        order = client.poll_and_finalize(order)
    except acme_errors.Error as e:
        if e.typ != "urn:ietf:params:acme:error:badCSR":
            raise Exception("problem did not have correct error type, had {0}".format(e.typ))
        if e.detail != "Error finalizing order :: invalid public key in CSR: public key is forbidden":
            raise Exception("problem did not have correct error detail, had {0}".format(e.detail))
        testPass = True
    finally:
        # The cleanup callable was previously never invoked, leaving the
        # challenge responses behind for later tests.
        cleanup()

    if not testPass:
        raise Exception("expected cert creation to fail with Error when using blocked key")
1405
def test_expiration_mailer():
    """
    Issue a certificate for a random integration email address, run the
    expiration mailer at three simulated times (31, 13 and 2 days before the
    certificate's expiry), and check that exactly two emails were sent to that
    address.
    """
    email_addr = "integration.%x@letsencrypt.org" % random.randrange(2**16)
    order = chisel2.auth_and_issue([random_domain()], email=email_addr)
    cert = parse_cert(order)

    # Simulated run times relative to expiry, in the order they are run.
    expiry = cert.not_valid_after
    no_reminder = expiry + datetime.timedelta(days=-31)
    first_reminder = expiry + datetime.timedelta(days=-13)
    last_reminder = expiry + datetime.timedelta(days=-2)
    run_times = (no_reminder, first_reminder, last_reminder)

    # Reset the mail test server's counters before running the mailer.
    requests.post("http://localhost:9381/clear", data='')
    for run_at in run_times:
        mailer_cmd = ["./bin/boulder", "expiration-mailer", "--config", "%s/expiration-mailer.json" % config_dir]
        print(get_future_output(mailer_cmd, run_at))

    resp = requests.get("http://localhost:9381/count?to=%s" % email_addr)
    mailcount = int(resp.text)
    if mailcount != 2:
        raise Exception("\nExpiry mailer failed: expected 2 emails, got %d" % mailcount)
1425
# Rebound by caa_recheck_setup and read by test_recheck_caa.
caa_recheck_setup_data = {}
@register_twenty_days_ago
def caa_recheck_setup():
    """
    Issue a certificate for ten subdomains of a random domain and store the
    client and the resulting authorizations in caa_recheck_setup_data.

    Registered via register_twenty_days_ago, so the issuance happens with the
    clock set back. The saved authzs are checked later to be valid (200); they
    should however require rechecking for CAA purposes.
    """
    global caa_recheck_setup_data

    client = chisel2.make_client()
    numNames = 10
    # Build numNames subdomains ("0.<base>", "1.<base>", ...) of one random
    # domain.
    base_domain = random_domain()
    domains = []
    for n in range(numNames):
        domains.append("{0}.{1}".format(str(n), base_domain))
    order = chisel2.auth_and_issue(domains, client=client)

    caa_recheck_setup_data = {
        'client': client,
        'authzs': order.authorizations,
    }
1444
def test_recheck_caa():
    """Request issuance for domains that have old cached authzs from when CAA
    was good. A new CAA record forbidding issuance is set on one of them; the
    CA should recheck CAA and reject the request.
    """
    setup = caa_recheck_setup_data
    if 'authzs' not in setup:
        raise Exception("CAA authzs not prepared for test_caa")

    # Each saved authz must still be retrievable; collect its domain name.
    domains = []
    for authz in setup['authzs']:
        response = setup['client']._post(authz.uri, None)
        if response.status_code != 200:
            raise Exception("Unexpected response for CAA authz: ",
                response.status_code)
        domains.append(authz.body.identifier.value)

    # Set a forbidding CAA record on just one domain
    challSrv.add_caa_issue(domains[3], ";")

    # Request issuance for the previously-issued domain names, which should
    # now be denied due to CAA.
    def reissue():
        return chisel2.auth_and_issue(domains, client=setup['client'])

    chisel2.expect_problem("urn:ietf:params:acme:error:caa", reissue)
1468
def test_caa_good():
    """
    Issuance must succeed for a domain whose CAA issue record is set to
    "happy-hacker-ca.invalid".
    """
    hostname = random_domain()
    challSrv.add_caa_issue(hostname, "happy-hacker-ca.invalid")
    chisel2.auth_and_issue([hostname])
1473
def test_caa_reject():
    """
    Issuance must fail with a caa problem for a domain whose CAA issue record
    is set to "sad-hacker-ca.invalid".
    """
    hostname = random_domain()
    challSrv.add_caa_issue(hostname, "sad-hacker-ca.invalid")

    def issue():
        return chisel2.auth_and_issue([hostname])

    chisel2.expect_problem("urn:ietf:params:acme:error:caa", issue)
1479
def test_caa_extensions():
    """
    Exercise the CAA `accounturi` and `validationmethods` parameters.

    CAA issue records are registered for four reserved hostnames: one bound to
    this test's account URI, one restricted to dns-01, one restricted to
    http-01, and one allowing both methods. Issuance must fail with a caa
    problem when the challenge type or account does not match the record, and
    succeed otherwise.
    """
    goodCAA = "happy-hacker-ca.invalid"

    client = chisel2.make_client()
    caa_account_uri = client.net.account.uri
    caa_records = [
        {"domain": "accounturi.good-caa-reserved.com", "value":"{0}; accounturi={1}".format(goodCAA, caa_account_uri)},
        {"domain": "dns-01-only.good-caa-reserved.com", "value": "{0}; validationmethods=dns-01".format(goodCAA)},
        {"domain": "http-01-only.good-caa-reserved.com", "value": "{0}; validationmethods=http-01".format(goodCAA)},
        # This hostname must match the one used in the issuance calls below.
        # It was previously spelled "dns-01-or-http01...", so the multi-method
        # record was set on a name that was never issued for.
        {"domain": "dns-01-or-http-01.good-caa-reserved.com", "value": "{0}; validationmethods=dns-01,http-01".format(goodCAA)},
    ]
    for policy in caa_records:
        challSrv.add_caa_issue(policy["domain"], policy["value"])

    # Using a validation method the record does not allow must be rejected.
    chisel2.expect_problem("urn:ietf:params:acme:error:caa",
        lambda: chisel2.auth_and_issue(["dns-01-only.good-caa-reserved.com"], chall_type="http-01"))

    chisel2.expect_problem("urn:ietf:params:acme:error:caa",
        lambda: chisel2.auth_and_issue(["http-01-only.good-caa-reserved.com"], chall_type="dns-01"))

    ## Note: the additional names are to avoid rate limiting...
    chisel2.auth_and_issue(["dns-01-only.good-caa-reserved.com", "www.dns-01-only.good-caa-reserved.com"], chall_type="dns-01")
    chisel2.auth_and_issue(["http-01-only.good-caa-reserved.com", "www.http-01-only.good-caa-reserved.com"], chall_type="http-01")
    chisel2.auth_and_issue(["dns-01-or-http-01.good-caa-reserved.com", "dns-01-only.good-caa-reserved.com"], chall_type="dns-01")
    chisel2.auth_and_issue(["dns-01-or-http-01.good-caa-reserved.com", "http-01-only.good-caa-reserved.com"], chall_type="http-01")

    ## CAA should fail with an arbitrary account, but succeed with the CAA client.
    chisel2.expect_problem("urn:ietf:params:acme:error:caa", lambda: chisel2.auth_and_issue(["accounturi.good-caa-reserved.com"]))
    chisel2.auth_and_issue(["accounturi.good-caa-reserved.com"], client=client)
1509
def test_new_account():
    """
    Create new accounts with no contact, an empty contact tuple, one email,
    and a tuple of several emails. Whenever a contact value was supplied, the
    account returned by the server must report the same contact.
    """
    contact_cases = (
        None,
        (),
        ("mailto:single@chisel.com",),
        ("mailto:one@chisel.com", "mailto:two@chisel.com"),
    )
    for contact in contact_cases:
        # `chisel2.make_client` and `messages.NewRegistration.from_data` are
        # avoided here because they do too much client-side processing to make
        # the contact addresses look "nice".
        client = chisel2.uninitialized_client()
        registration = messages.NewRegistration(
            contact=contact, terms_of_service_agreed=True)
        actual = client.new_account(registration).body.contact
        if contact is None:
            # Nothing was requested, so there is nothing to compare.
            continue
        if contact != actual:
            raise Exception("New Account failed: expected contact %s, got %s" % (contact, actual))
1524
def test_account_update():
    """
    Create a new ACME client/account, then update the account's contact to
    each of: no contact, an empty tuple, one email, and several emails.
    Whenever a contact value was supplied, the updated registration returned
    by the server must report the same contact.
    """
    for contact in (None, (), ("mailto:single@chisel.com",), ("mailto:one@chisel.com", "mailto:two@chisel.com")):
        # We don't use `chisel2.update_email` or `messages.NewRegistration.from_data`
        # here because they do too much client-side processing to make the
        # contact addresses look "nice".
        client = chisel2.make_client()
        update = client.net.account.update(body=client.net.account.body.update(contact=contact))
        result = client.update_registration(update)
        actual = result.body.contact
        if contact is not None and contact != actual:
            # The message names the operation that actually failed (it was
            # copied from the new-account test).
            raise Exception("Account update failed: expected contact %s, got %s" % (contact, actual))
1541
def test_renewal_exemption():
    """
    Exercise the renewal exemption of the certificatesPerName rate limit.

    Under one random base domain, issue a certificate for each of two
    different subdomains and then renew each of them. The testing limit is 2
    certificates per name per 90 days and renewals should not count against
    it, so all four issuances should succeed. A final issuance for a third
    subdomain is expected to be rate limited, which confirms that the limit
    really is 2 and that the test exercises what it is meant to. See
    https://letsencrypt.org/docs/rate-limits/ for more details.
    """
    base_domain = random_domain()

    # For each subdomain the first pass is the initial issuance and the
    # second pass is its renewal.
    for prefix in ("www.", "blog."):
        for _attempt in range(2):
            chisel2.auth_and_issue([prefix + base_domain])

    # Final, failed issuance, for another different cert
    def issue_third_name():
        return chisel2.auth_and_issue(["mail." + base_domain])

    chisel2.expect_problem(
        "urn:ietf:params:acme:error:rateLimited", issue_third_name)
1565
def test_certificates_per_name():
    """
    Issuing for a random name ending in ".lim.it" is expected to be rejected
    with a rateLimited problem.
    """
    def issue_limited_name():
        return chisel2.auth_and_issue([random_domain() + ".lim.it"])

    chisel2.expect_problem(
        "urn:ietf:params:acme:error:rateLimited", issue_limited_name)
1569
def test_oversized_csr():
    """
    Request issuance for more names than are allowed in one request and
    expect the attempt to be rejected with a malformed problem.
    """
    # One greater than the configured RA/CA maxNames.
    name_count = 101
    # All names are numbered subdomains ("0.", "1.", ...) of one random domain.
    parent = random_domain()
    domains = [str(index) + "." + parent for index in range(name_count)]

    # There are too many names in the request, so issuance should produce a
    # malformed error.
    def issue_too_many_names():
        return chisel2.auth_and_issue(domains)

    chisel2.expect_problem(
        "urn:ietf:params:acme:error:malformed", issue_too_many_names)
1580
def parse_cert(order):
    """
    Parse the PEM text in `order.fullchain_pem` and return the resulting
    `cryptography` x509 certificate object.
    """
    pem_bytes = order.fullchain_pem.encode()
    backend = default_backend()
    return x509.load_pem_x509_certificate(pem_bytes, backend)
1583
def test_admin_revoker_cert():
    """
    Issue a certificate, revoke it by serial with the `admin-revoker
    serial-revoke` command, then check that OCSP reports it as revoked and
    that the Akamai purge check passes.
    """
    cert_file = temppath('test_admin_revoker_cert.pem')
    order = chisel2.auth_and_issue([random_domain()], cert_output=cert_file.name)
    serial_hex = '%x' % parse_cert(order).serial_number

    # Revoke certificate by serial
    reset_akamai_purges()
    revoke_cmd = [
        "./bin/boulder", "admin-revoker", "serial-revoke",
        "--config", "%s/admin-revoker.json" % config_dir,
        # The trailing '1' is presumably the revocation reason code -- confirm
        # against the admin-revoker usage text.
        serial_hex, '1',
    ]
    run(revoke_cmd)

    # Wait for OCSP response to indicate revocation took place
    issuer = "/hierarchy/intermediate-cert-rsa-a.pem"
    verify_ocsp(cert_file.name, issuer, "http://localhost:4002", "revoked")
    verify_akamai_purge()
1598
def test_admin_revoker_batched():
    """
    Issue three certificates, write their serials (hex, one per line) to a
    file, revoke them all with `admin-revoker batched-serial-revoke`, then
    check that OCSP reports each certificate as revoked.
    """
    # delete=False keeps the file on disk after the handle is closed, so the
    # admin-revoker process can read it by name. The `with` block guarantees
    # the handle is closed (and the serials flushed) even if an issuance
    # fails part-way through the loop.
    with tempfile.NamedTemporaryFile(
            dir=tempdir, suffix='.test_admin_revoker_batched.serials.hex',
            mode='w+', delete=False) as serial_file:
        cert_files = [
            temppath('test_admin_revoker_batched.%d.pem' % x) for x in range(3)
        ]

        for cert_file in cert_files:
            order = chisel2.auth_and_issue([random_domain()], cert_output=cert_file.name)
            serial_file.write("%x\n" % parse_cert(order).serial_number)

    # The trailing '0' and '2' are presumably the revocation reason code and
    # the parallelism -- confirm against the admin-revoker usage text.
    run(["./bin/boulder", "admin-revoker", "batched-serial-revoke",
        "--config", "%s/admin-revoker.json" % config_dir,
        serial_file.name, '0', '2'])

    for cert_file in cert_files:
        verify_ocsp(cert_file.name, "/hierarchy/intermediate-cert-rsa-a.pem", "http://localhost:4002", "revoked")
1618
def test_sct_embedding():
    """
    Issue a certificate and check its embedded Certificate Transparency data.

    The certificate must not carry the CT poison extension, must carry an SCT
    list extension with exactly two SCTs, and each SCT must be version 1, be
    of the precertificate entry type, and have a timestamp within one hour of
    the current time.

    Raises an Exception describing the first check that fails.
    """
    order = chisel2.auth_and_issue([random_domain()])
    cert = parse_cert(order)

    # make sure there is no poison extension
    try:
        cert.extensions.get_extension_for_oid(x509.ObjectIdentifier("1.3.6.1.4.1.11129.2.4.3"))
    except x509.ExtensionNotFound:
        # Expected: a final certificate has no poison extension.
        pass
    else:
        raise(Exception("certificate contains CT poison extension"))

    # make sure there is a SCT list extension
    try:
        sctList = cert.extensions.get_extension_for_oid(x509.ObjectIdentifier("1.3.6.1.4.1.11129.2.4.2"))
    except x509.ExtensionNotFound:
        raise(Exception("certificate doesn't contain SCT list extension"))
    if len(sctList.value) != 2:
        raise(Exception("SCT list contains wrong number of SCTs"))
    for sct in sctList.value:
        if sct.version != x509.certificate_transparency.Version.v1:
            raise(Exception("SCT contains wrong version"))
        if sct.entry_type != x509.certificate_transparency.LogEntryType.PRE_CERTIFICATE:
            raise(Exception("SCT contains wrong entry type"))
        # sct.timestamp is a naive datetime expressed in UTC, so it has to be
        # compared with a naive UTC "now". Local time would skew the delta by
        # the host's UTC offset and fail on machines not running in UTC.
        now = datetime.datetime.now(datetime.timezone.utc).replace(tzinfo=None)
        delta = sct.timestamp - now
        if abs(delta) > datetime.timedelta(hours=1):
            raise(Exception("Delta between SCT timestamp and now was too great "
                "%s vs %s (%s)" % (sct.timestamp, now, delta)))
1647
def test_auth_deactivation():
    """
    Deactivate the first authorization of a freshly created order, then the
    first authorization of an order returned by `chisel2.auth_and_issue`. Both
    deactivation responses must report the deactivated status.
    """
    client = chisel2.make_client(None)

    # Authorization belonging to a brand-new order.
    domain = random_domain()
    new_order = client.new_order(chisel2.make_csr([domain]))
    new_authz = new_order.authorizations[0]
    status = client.deactivate_authorization(new_authz).body.status
    if status is not messages.STATUS_DEACTIVATED:
        raise Exception("unexpected authorization status")

    # Authorization belonging to an order that went through auth_and_issue.
    issued_order = chisel2.auth_and_issue([random_domain()], client=client)
    issued_authz = issued_order.authorizations[0]
    status = client.deactivate_authorization(issued_authz).body.status
    if status is not messages.STATUS_DEACTIVATED:
        raise Exception("unexpected authorization status")
1662
def get_ocsp_response_and_reason(cert_file, issuer_file, url):
    """Returns the ocsp response output and revocation reason.

    The arguments are passed straight through to `verify_ocsp`, with None as
    its fourth argument. The reason is the word following "Reason: " in that
    output, or the empty string when the output contains no such text.
    """
    output = verify_ocsp(cert_file, issuer_file, url, None)
    # Raw string: in a plain literal '\w' is an invalid escape sequence, which
    # Python warns about (and newer versions flag at compile time).
    m = re.search(r'Reason: (\w+)', output)
    reason = m.group(1) if m is not None else ""
    return output, reason
1669
# Filled in by ocsp_resigning_setup and read back by test_ocsp_resigning.
ocsp_resigning_setup_data = {}


@register_twenty_days_ago
def ocsp_resigning_setup():
    """Issue a cert, revoke it, and record its OCSP response.

    Setup for test_ocsp_resigning, which needs to check that the revocation
    reason is still correctly set after re-signing an old OCSP response. The
    function is registered through `register_twenty_days_ago`, so it is
    presumably run with the clock set in the past -- confirm in helpers.

    Stores the cert file path, the OCSP response output and the revocation
    reason in the module-level `ocsp_resigning_setup_data` dict.
    """
    global ocsp_resigning_setup_data

    client = chisel2.make_client(None)
    cert_file = temppath('ocsp_resigning_setup.pem')
    order = chisel2.auth_and_issue([random_domain()], client=client, cert_output=cert_file.name)

    # Revoke for reason 5: cessationOfOperation
    parsed = OpenSSL.crypto.load_certificate(
        OpenSSL.crypto.FILETYPE_PEM, order.fullchain_pem)
    client.revoke(josepy.ComparableX509(parsed), 5)

    issuer = "/hierarchy/intermediate-cert-rsa-a.pem"
    responder = "http://localhost:4002"
    ocsp_response, reason = get_ocsp_response_and_reason(
        cert_file.name, issuer, responder)

    ocsp_resigning_setup_data = dict(
        cert_file=cert_file.name,
        response=ocsp_response,
        reason=reason,
    )
1696
def test_ocsp_resigning():
    """
    Check that, after re-signing an OCSP response, the reason is still set.

    Fetches the OCSP response for the certificate recorded by
    ocsp_resigning_setup up to five times, a quarter of a second apart, until
    it differs from the recorded one. The new response must then carry the
    same revocation reason as before, and that reason must be
    cessationOfOperation.
    """
    saved = ocsp_resigning_setup_data
    if 'response' not in saved:
        raise Exception("ocsp_resigning_setup didn't run")

    issuer = "/hierarchy/intermediate-cert-rsa-a.pem"
    responder = "http://localhost:4002"
    for _attempt in range(5):
        resp, reason = get_ocsp_response_and_reason(
            saved['cert_file'], issuer, responder)
        # A response that differs from the recorded one has been re-signed.
        if resp != saved['response']:
            break
        time.sleep(0.25)
    else:
        raise(Exception("timed out waiting for re-signed OCSP response for certificate"))

    expected_reason = saved['reason']
    if reason != expected_reason:
        raise(Exception("re-signed ocsp response has different reason %s expected %s" % (
            reason, expected_reason)))
    if reason != "cessationOfOperation":
        raise(Exception("re-signed ocsp response has wrong reason %s" % reason))
View as plain text