Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
9 changes: 7 additions & 2 deletions redis/ocsp.py
Original file line number Diff line number Diff line change
Expand Up @@ -56,9 +56,14 @@ def _check_certificate(issuer_cert, ocsp_bytes, validate=True):
raise AuthorizationError("you are not authorized to view this ocsp certificate")
if ocsp_response.response_status == ocsp.OCSPResponseStatus.SUCCESSFUL:
if ocsp_response.certificate_status != ocsp.OCSPCertStatus.GOOD:
return False
raise ConnectionError(
f'Received an {str(ocsp_response.certificate_status).split(".")[1]} '
"ocsp certificate status"
)
else:
return False
raise ConnectionError(
"failed to retrieve a sucessful response from the ocsp responder"
)

if ocsp_response.this_update >= datetime.datetime.now():
raise ConnectionError("ocsp certificate was issued in the future")
Expand Down
10 changes: 7 additions & 3 deletions tests/test_ssl.py
Original file line number Diff line number Diff line change
Expand Up @@ -107,7 +107,7 @@ def test_ssl_ocsp_called_withcrypto(self, request):
def test_valid_ocsp_cert_http(self):
from redis.ocsp import OCSPVerifier

hostnames = ["github.com", "aws.amazon.com", "ynet.co.il", "microsoft.com"]
hostnames = ["github.com", "aws.amazon.com", "ynet.co.il"]
for hostname in hostnames:
context = ssl.create_default_context()
with socket.create_connection((hostname, 443)) as sock:
Expand All @@ -124,7 +124,9 @@ def test_revoked_ocsp_certificate(self):
with socket.create_connection((hostname, 443)) as sock:
with context.wrap_socket(sock, server_hostname=hostname) as wrapped:
ocsp = OCSPVerifier(wrapped, hostname, 443)
assert ocsp.is_valid() is False
with pytest.raises(ConnectionError) as e:
assert ocsp.is_valid()
assert "REVOKED" in str(e)

@skip_if_nocryptography()
def test_unauthorized_ocsp(self):
Expand All @@ -147,7 +149,9 @@ def test_ocsp_not_present_in_response(self):
with socket.create_connection((hostname, 443)) as sock:
with context.wrap_socket(sock, server_hostname=hostname) as wrapped:
ocsp = OCSPVerifier(wrapped, hostname, 443)
assert ocsp.is_valid() is False
with pytest.raises(ConnectionError) as e:
assert ocsp.is_valid()
assert "from the" in str(e)

@skip_if_nocryptography()
def test_unauthorized_then_direct(self):
Expand Down