Skip to content

Commit f39a11d

Browse files
committed
first python API proposition
first round-trip tests feat: made asn1 structures readable refacto: adapted existing functions accordingly feat/pkcs12: added symmetric_decrypt feat: deserialize 3 possible encodings feat: handling AES-128 feat: raise error when no recipient is found feat/pkcs7: added decanonicalize function feat/asn1: added decode_der_data feat/pkcs7: added smime_enveloped_decode tests are the round-trip (encrypt & decrypt) more tests for 100% python coverage test support pkcs7_encrypt with openssl added algorithm to pkcs7_encrypt signature refacto: decrypt function is clearer flow is more natural refacto: added all rust error tests refacto: added another CA chain for checking fix: const handling Refactor PKCS7Decryptor to pkcs7_decrypt refacto: removed SMIME_ENVELOPED_DECODE from rust code refacto: removed decode_der_data adapted tests accordingly removed the PEM tag check added tests for smime_decnonicalize one more test case Update src/rust/src/pkcs7.rs Co-authored-by: Alex Gaynor <[email protected]> took comments into account pem to der is now outside of decrypt fix: removed test_support pkcs7_encrypt added vector for aes_256_cbc encrypted pkcs7 feat: not using test_support decrypt anymore added new vectors for PKCS7 tests feat: using pkcs7 vectors removed previous ones fix: changed wrong function feat: added certificate issuer check test: generating the RSA chain removed the vectors accordingly moved symmetric_decrypt to pkcs7.rs
1 parent 4c72f36 commit f39a11d

File tree

6 files changed

+521
-98
lines changed

6 files changed

+521
-98
lines changed

src/cryptography/hazmat/bindings/_rust/pkcs7.pyi

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -6,6 +6,7 @@ import typing
66

77
from cryptography import x509
88
from cryptography.hazmat.primitives import serialization
9+
from cryptography.hazmat.primitives.asymmetric import rsa
910
from cryptography.hazmat.primitives.serialization import pkcs7
1011

1112
def serialize_certificates(
@@ -22,6 +23,13 @@ def sign_and_serialize(
2223
encoding: serialization.Encoding,
2324
options: typing.Iterable[pkcs7.PKCS7Options],
2425
) -> bytes: ...
26+
def pem_to_der(data: bytes) -> bytes: ...
27+
def deserialize_and_decrypt(
28+
data: bytes,
29+
certificate: x509.Certificate,
30+
private_key: rsa.RSAPrivateKey,
31+
options: typing.Iterable[pkcs7.PKCS7Options],
32+
) -> bytes: ...
2533
def load_pem_pkcs7_certificates(
2634
data: bytes,
2735
) -> list[x509.Certificate]: ...

src/cryptography/hazmat/bindings/_rust/test_support.pyi

Lines changed: 0 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -13,13 +13,6 @@ class TestCertificate:
1313
subject_value_tags: list[int]
1414

1515
def test_parse_certificate(data: bytes) -> TestCertificate: ...
16-
def pkcs7_decrypt(
17-
encoding: serialization.Encoding,
18-
msg: bytes,
19-
pkey: serialization.pkcs7.PKCS7PrivateKeyTypes,
20-
cert_recipient: x509.Certificate,
21-
options: list[pkcs7.PKCS7Options],
22-
) -> bytes: ...
2316
def pkcs7_verify(
2417
encoding: serialization.Encoding,
2518
sig: bytes,

src/cryptography/hazmat/primitives/serialization/pkcs7.py

Lines changed: 85 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -263,6 +263,81 @@ def encrypt(
263263
return rust_pkcs7.encrypt_and_serialize(self, encoding, options)
264264

265265

266+
def pkcs7_decrypt_der(
267+
data: bytes,
268+
certificate: x509.Certificate,
269+
private_key: rsa.RSAPrivateKey,
270+
options: typing.Iterable[PKCS7Options],
271+
) -> bytes:
272+
return _pkcs7_decrypt(data, certificate, private_key, options)
273+
274+
275+
def pkcs7_decrypt_pem(
276+
data: bytes,
277+
certificate: x509.Certificate,
278+
private_key: rsa.RSAPrivateKey,
279+
options: typing.Iterable[PKCS7Options],
280+
) -> bytes:
281+
data = rust_pkcs7.pem_to_der(data)
282+
return _pkcs7_decrypt(data, certificate, private_key, options)
283+
284+
285+
def pkcs7_decrypt_smime(
286+
data: bytes,
287+
certificate: x509.Certificate,
288+
private_key: rsa.RSAPrivateKey,
289+
options: typing.Iterable[PKCS7Options],
290+
) -> bytes:
291+
data = _smime_enveloped_decode(data)
292+
return _pkcs7_decrypt(data, certificate, private_key, options)
293+
294+
295+
def _pkcs7_decrypt(
296+
data: bytes,
297+
certificate: x509.Certificate,
298+
private_key: rsa.RSAPrivateKey,
299+
options: typing.Iterable[PKCS7Options],
300+
) -> bytes:
301+
from cryptography.hazmat.backends.openssl.backend import (
302+
backend as ossl,
303+
)
304+
305+
if not ossl.rsa_encryption_supported(padding=padding.PKCS1v15()):
306+
raise UnsupportedAlgorithm(
307+
"RSA with PKCS1 v1.5 padding is not supported by this version"
308+
" of OpenSSL.",
309+
_Reasons.UNSUPPORTED_PADDING,
310+
)
311+
312+
options = list(options)
313+
if not all(isinstance(x, PKCS7Options) for x in options):
314+
raise ValueError("options must be from the PKCS7Options enum")
315+
if any(
316+
opt not in [PKCS7Options.Text, PKCS7Options.Binary] for opt in options
317+
):
318+
raise ValueError(
319+
"Only the following options are supported for encryption: "
320+
"Text, Binary"
321+
)
322+
elif PKCS7Options.Text in options and PKCS7Options.Binary in options:
323+
# OpenSSL accepts both options at the same time, but ignores Text.
324+
# We fail defensively to avoid unexpected outputs.
325+
raise ValueError("Cannot use Binary and Text options at the same time")
326+
327+
if not isinstance(certificate, x509.Certificate):
328+
raise TypeError("certificate must be a x509.Certificate")
329+
330+
if not isinstance(certificate.public_key(), rsa.RSAPublicKey):
331+
raise TypeError("Only RSA keys are supported at this time.")
332+
333+
if not isinstance(private_key, rsa.RSAPrivateKey):
334+
raise TypeError("Only RSA private keys are supported at this time.")
335+
336+
return rust_pkcs7.deserialize_and_decrypt(
337+
data, certificate, private_key, options
338+
)
339+
340+
266341
def _smime_signed_encode(
267342
data: bytes, signature: bytes, micalg: str, text_mode: bool
268343
) -> bytes:
@@ -328,6 +403,16 @@ def _smime_enveloped_encode(data: bytes) -> bytes:
328403
return m.as_bytes(policy=m.policy.clone(linesep="\n", max_line_length=0))
329404

330405

406+
def _smime_enveloped_decode(data: bytes) -> bytes:
407+
m = email.message_from_bytes(data)
408+
if m.get_content_type() not in {
409+
"application/x-pkcs7-mime",
410+
"application/pkcs7-mime",
411+
}:
412+
raise ValueError("Not an S/MIME enveloped message")
413+
return bytes(m.get_payload(decode=True))
414+
415+
331416
class OpenSSLMimePart(email.message.MIMEPart):
332417
# A MIMEPart subclass that replicates OpenSSL's behavior of not including
333418
# a newline if there are no headers.

src/rust/src/pkcs7.rs

Lines changed: 210 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -16,12 +16,14 @@ use openssl::pkcs7::Pkcs7;
1616
use pyo3::types::{PyAnyMethods, PyBytesMethods, PyListMethods};
1717

1818
use crate::asn1::encode_der_data;
19+
use crate::backend::ciphers;
1920
use crate::buf::CffiBuf;
2021
use crate::error::{CryptographyError, CryptographyResult};
22+
use crate::padding::PKCS7UnpaddingContext;
2123
use crate::pkcs12::symmetric_encrypt;
2224
#[cfg(not(CRYPTOGRAPHY_IS_BORINGSSL))]
2325
use crate::x509::certificate::load_der_x509_certificate;
24-
use crate::{exceptions, types, x509};
26+
use crate::{backend, exceptions, types, x509};
2527

2628
const PKCS7_CONTENT_TYPE_OID: asn1::ObjectIdentifier = asn1::oid!(1, 2, 840, 113549, 1, 9, 3);
2729
const PKCS7_MESSAGE_DIGEST_OID: asn1::ObjectIdentifier = asn1::oid!(1, 2, 840, 113549, 1, 9, 4);
@@ -164,6 +166,153 @@ fn encrypt_and_serialize<'p>(
164166
}
165167
}
166168

169+
#[pyo3::pyfunction]
170+
fn pem_to_der<'p>(
171+
py: pyo3::Python<'p>,
172+
data: CffiBuf<'p>,
173+
) -> CryptographyResult<pyo3::Bound<'p, pyo3::types::PyBytes>> {
174+
let pem_str = std::str::from_utf8(data.as_bytes())
175+
.map_err(|_| pyo3::exceptions::PyValueError::new_err("Invalid PEM data"))?;
176+
let pem = pem::parse(pem_str)
177+
.map_err(|_| pyo3::exceptions::PyValueError::new_err("Failed to parse PEM data"))?;
178+
179+
Ok(pyo3::types::PyBytes::new_bound(py, &pem.into_contents()))
180+
}
181+
182+
#[pyo3::pyfunction]
183+
fn deserialize_and_decrypt<'p>(
184+
py: pyo3::Python<'p>,
185+
data: CffiBuf<'p>,
186+
certificate: pyo3::Bound<'p, x509::certificate::Certificate>,
187+
private_key: pyo3::Bound<'p, backend::rsa::RsaPrivateKey>,
188+
options: &pyo3::Bound<'p, pyo3::types::PyList>,
189+
) -> CryptographyResult<pyo3::Bound<'p, pyo3::types::PyBytes>> {
190+
// Deserialize the content info
191+
let content_info = asn1::parse_single::<pkcs7::ContentInfo<'_>>(data.as_bytes()).unwrap();
192+
let plain_content = match content_info.content {
193+
pkcs7::Content::EnvelopedData(data) => {
194+
// Extract enveloped data
195+
let enveloped_data = data.into_inner();
196+
197+
// Get recipients, and the one matching with the given certificate (if any)
198+
let mut recipient_infos = enveloped_data.recipient_infos.unwrap_read().clone();
199+
let recipient_certificate = certificate.get().raw.borrow_dependent();
200+
let recipient_serial_number = recipient_certificate.tbs_cert.serial;
201+
let recipient_issuer = recipient_certificate.tbs_cert.issuer.clone();
202+
let found_recipient_info = recipient_infos.find(|info| {
203+
info.issuer_and_serial_number.serial_number == recipient_serial_number
204+
&& info.issuer_and_serial_number.issuer == recipient_issuer
205+
});
206+
207+
// Raise error when no recipient is found
208+
// Unsure if this is the right exception to raise
209+
let recipient_info = match found_recipient_info {
210+
Some(info) => info,
211+
None => {
212+
return Err(CryptographyError::from(
213+
exceptions::AttributeNotFound::new_err((
214+
"No recipient found that matches the given certificate.",
215+
exceptions::Reasons::UNSUPPORTED_X509,
216+
)),
217+
));
218+
}
219+
};
220+
221+
// Decrypt the key using the private key
222+
let padding = types::PKCS1V15.get(py)?.call0()?;
223+
let key = private_key
224+
.call_method1(
225+
pyo3::intern!(py, "decrypt"),
226+
(recipient_info.encrypted_key, &padding),
227+
)?
228+
.extract::<pyo3::pybacked::PyBackedBytes>()?;
229+
230+
// Get algorithm
231+
// TODO: implement all the possible algorithms
232+
let algorithm_identifier = enveloped_data
233+
.encrypted_content_info
234+
.content_encryption_algorithm;
235+
let (algorithm, mode) = match algorithm_identifier.params {
236+
AlgorithmParameters::Aes128Cbc(iv) => (
237+
types::AES128.get(py)?.call1((key,))?,
238+
types::CBC
239+
.get(py)?
240+
.call1((pyo3::types::PyBytes::new_bound(py, &iv),))?,
241+
),
242+
_ => {
243+
return Err(CryptographyError::from(
244+
exceptions::UnsupportedAlgorithm::new_err((
245+
"Only AES-128-CBC is currently supported for decryption.",
246+
exceptions::Reasons::UNSUPPORTED_SERIALIZATION,
247+
)),
248+
));
249+
}
250+
};
251+
252+
// Decrypt the content using the key and proper algorithm
253+
let encrypted_content = enveloped_data
254+
.encrypted_content_info
255+
.encrypted_content
256+
.unwrap();
257+
let decrypted_content = symmetric_decrypt(py, algorithm, mode, encrypted_content)?;
258+
pyo3::types::PyBytes::new_bound(py, decrypted_content.as_slice())
259+
}
260+
_ => {
261+
return Err(CryptographyError::from(
262+
exceptions::UnsupportedAlgorithm::new_err((
263+
"Only EnvelopedData structures are currently supported.",
264+
exceptions::Reasons::UNSUPPORTED_SERIALIZATION,
265+
)),
266+
));
267+
}
268+
};
269+
270+
// Return content in different form, based on options
271+
let text_mode = options.contains(types::PKCS7_TEXT.get(py)?)?;
272+
let plain_data = if options.contains(types::PKCS7_BINARY.get(py)?)? {
273+
plain_content
274+
} else {
275+
let decanonicalized = smime_decanonicalize(plain_content.as_bytes(), text_mode);
276+
pyo3::types::PyBytes::new_bound(py, decanonicalized.into_owned().as_slice())
277+
};
278+
279+
Ok(plain_data)
280+
}
281+
282+
pub(crate) fn symmetric_decrypt(
283+
py: pyo3::Python<'_>,
284+
algorithm: pyo3::Bound<'_, pyo3::PyAny>,
285+
mode: pyo3::Bound<'_, pyo3::PyAny>,
286+
data: &[u8],
287+
) -> CryptographyResult<Vec<u8>> {
288+
let block_size = algorithm
289+
.getattr(pyo3::intern!(py, "block_size"))?
290+
.extract()?;
291+
292+
let mut cipher =
293+
ciphers::CipherContext::new(py, algorithm, mode, openssl::symm::Mode::Decrypt)?;
294+
295+
// Decrypt the data
296+
let mut decrypted_data = vec![0; data.len() + (block_size / 8)];
297+
let count = cipher.update_into(py, data, &mut decrypted_data)?;
298+
let final_block = cipher.finalize(py)?;
299+
assert!(final_block.as_bytes().is_empty());
300+
decrypted_data.truncate(count);
301+
302+
// Unpad the data
303+
let mut unpadder = PKCS7UnpaddingContext::new(block_size);
304+
let unpadded_first_blocks = unpadder.update(py, CffiBuf::from_bytes(py, &decrypted_data))?;
305+
let unpadded_last_block = unpadder.finalize(py)?;
306+
307+
let unpadded_data = [
308+
unpadded_first_blocks.as_bytes(),
309+
unpadded_last_block.as_bytes(),
310+
]
311+
.concat();
312+
313+
Ok(unpadded_data)
314+
}
315+
167316
#[pyo3::pyfunction]
168317
fn sign_and_serialize<'p>(
169318
py: pyo3::Python<'p>,
@@ -415,6 +564,39 @@ fn smime_canonicalize(data: &[u8], text_mode: bool) -> (Cow<'_, [u8]>, Cow<'_, [
415564
}
416565
}
417566

567+
fn smime_decanonicalize(data: &[u8], text_mode: bool) -> Cow<'_, [u8]> {
568+
let mut new_data = vec![];
569+
let mut last_idx = 0;
570+
571+
// Remove the header if text_mode is true
572+
let data = if text_mode {
573+
let header = b"Content-Type: text/plain\r\n\r\n";
574+
if data.starts_with(header) {
575+
&data[header.len()..]
576+
} else {
577+
data
578+
}
579+
} else {
580+
data
581+
};
582+
583+
// Remove the \r in the data
584+
for (i, c) in data.iter().copied().enumerate() {
585+
if c == b'\n' && i > 0 && data[i - 1] == b'\r' {
586+
new_data.extend_from_slice(&data[last_idx..i - 1]);
587+
new_data.push(b'\n');
588+
last_idx = i + 1;
589+
}
590+
}
591+
592+
// Copy the remaining data
593+
if last_idx < data.len() {
594+
new_data.extend_from_slice(&data[last_idx..]);
595+
}
596+
597+
Cow::Owned(new_data)
598+
}
599+
418600
#[cfg(not(CRYPTOGRAPHY_IS_BORINGSSL))]
419601
fn load_pkcs7_certificates(
420602
py: pyo3::Python<'_>,
@@ -507,8 +689,8 @@ fn load_der_pkcs7_certificates<'p>(
507689
pub(crate) mod pkcs7_mod {
508690
#[pymodule_export]
509691
use super::{
510-
encrypt_and_serialize, load_der_pkcs7_certificates, load_pem_pkcs7_certificates,
511-
serialize_certificates, sign_and_serialize,
692+
deserialize_and_decrypt, encrypt_and_serialize, load_der_pkcs7_certificates,
693+
load_pem_pkcs7_certificates, pem_to_der, serialize_certificates, sign_and_serialize,
512694
};
513695
}
514696

@@ -517,7 +699,7 @@ mod tests {
517699
use std::borrow::Cow;
518700
use std::ops::Deref;
519701

520-
use super::smime_canonicalize;
702+
use super::{smime_canonicalize, smime_decanonicalize};
521703

522704
#[test]
523705
fn test_smime_canonicalize() {
@@ -577,4 +759,28 @@ mod tests {
577759
);
578760
}
579761
}
762+
763+
#[test]
764+
fn test_smime_decanonicalize() {
765+
for (input, text_mode, expected_output) in [
766+
// Values with text_mode=false
767+
(b"" as &[u8], false, b"" as &[u8]),
768+
(b"abc\r\n", false, b"abc\n"),
769+
(b"\r\nabc\n", false, b"\nabc\n"),
770+
(b"abc\r\ndef\r\n", false, b"abc\ndef\n"),
771+
(b"abc\r\ndef\nabc", false, b"abc\ndef\nabc"),
772+
// Values with text_mode=true
773+
(b"abc\r\n", true, b"abc\n"),
774+
(b"Content-Type: text/plain\r\n\r\n", true, b""),
775+
(b"Content-Type: text/plain\r\n\r\nabc", true, b"abc"),
776+
(
777+
b"Content-Type: text/plain\r\n\r\nabc\r\ndef\nabc",
778+
true,
779+
b"abc\ndef\nabc",
780+
),
781+
] {
782+
let result = smime_decanonicalize(input, text_mode);
783+
assert_eq!(result.deref(), expected_output);
784+
}
785+
}
580786
}

0 commit comments

Comments
 (0)