diff --git a/django_x509/base/models.py b/django_x509/base/models.py index 4d68e51..0bdcbfb 100644 --- a/django_x509/base/models.py +++ b/django_x509/base/models.py @@ -2,8 +2,33 @@ import uuid from datetime import datetime, timedelta -import OpenSSL import swapper +from cryptography import x509 +from cryptography.exceptions import InvalidKey, UnsupportedAlgorithm +from cryptography.hazmat.backends import default_backend +from cryptography.hazmat.primitives import hashes, serialization +from cryptography.hazmat.primitives.asymmetric import padding, rsa +from cryptography.hazmat.primitives.serialization import ( + BestAvailableEncryption, + NoEncryption, +) +from cryptography.x509 import ( + AuthorityKeyIdentifier, + BasicConstraints, + CertificateBuilder, + CertificateRevocationListBuilder, + Extension, + ExtensionNotFound, + ExtensionOID, + ExtensionType, + KeyUsage, + Name, + NameAttribute, + NameOID, + RevokedCertificateBuilder, + SubjectKeyIdentifier, +) +from cryptography.x509.oid import CRLEntryExtensionOID, ExtensionOID, NameOID from django.core.exceptions import ValidationError from django.db import models from django.utils import timezone @@ -12,7 +37,6 @@ from django.utils.translation import gettext_lazy as _ from jsonfield import JSONField from model_utils.fields import AutoCreatedField, AutoLastModifiedField -from OpenSSL import crypto from .. import settings as app_settings @@ -210,31 +234,48 @@ def save(self, *args, **kwargs): @cached_property def x509(self): """ - returns an instance of OpenSSL.crypto.X509 + Returns an instance of cryptography.x509.Certificate """ if self.certificate: - return crypto.load_certificate(crypto.FILETYPE_PEM, self.certificate) + return x509.load_pem_x509_certificate( + self.certificate.encode('utf-8'), backend=default_backend() + ) @cached_property def x509_text(self): """ - returns a text dump of the information + Returns a text dump of the information contained in the x509 certificate """ - if self.certificate: - text = crypto.dump_certificate(crypto.FILETYPE_TEXT, self.x509) - return text.decode('utf-8') + if not self.certificate: + return None + + cert = self.x509 + lines = [ + f'Subject: {cert.subject.rfc4514_string()}', + f'Issuer: {cert.issuer.rfc4514_string()}', + f'Serial Number: {cert.serial_number}', + f'Not Before: {cert.not_valid_before}', + f'Not After: {cert.not_valid_after}', + f'Signature Algorithm: {cert.signature_hash_algorithm.name}', + 'Extensions:', + ] + + for ext in cert.extensions: + lines.append(f' - {ext.oid._name or ext.oid.dotted_string}: {ext.value}') + + return '\n'.join(lines) @cached_property def pkey(self): """ - returns an instance of OpenSSL.crypto.PKey + Returns an instance of cryptography private key """ if self.private_key: - return crypto.load_privatekey( - crypto.FILETYPE_PEM, - self.private_key, - passphrase=getattr(self, 'passphrase').encode('utf-8'), + return serialization.load_pem_private_key( + self.private_key.encode('utf-8'), + password=getattr(self, 'passphrase', None).encode('utf-8') or None, + backend=default_backend(), ) def _validate_pem(self): @@ -243,24 +284,29 @@ def _validate_pem(self): validates certificate and private key """ errors = {} - for field in ['certificate', 'private_key']: - method_name = 'load_{0}'.format(field.replace('_', '')) - load_pem = getattr(crypto, method_name) - try: - args = (crypto.FILETYPE_PEM, getattr(self, field)) - kwargs = {} - if method_name == 'load_privatekey': - kwargs['passphrase'] = getattr(self, 'passphrase').encode('utf8') - load_pem(*args, **kwargs) - except OpenSSL.crypto.Error as e: - error = 'OpenSSL error:
{0}'.format( - str(e.args[0]).replace('), ', '),
').strip('[]') - ) - if 'bad decrypt' in error: - error = 'Incorrect Passphrase
' + error - errors['passphrase'] = ValidationError(_(mark_safe(error))) - continue - errors[field] = ValidationError(_(mark_safe(error))) + # Validate certificate + try: + x509.load_pem_x509_certificate(self.certificate.encode('utf-8')) + except Exception as e: + errors['certificate'] = ValidationError( + _(mark_safe(f'Certificate error:
{str(e)}')) + ) + + # Validate private key + try: + serialization.load_pem_private_key( + self.private_key.encode('utf-8'), + password=getattr(self, 'passphrase', None).encode('utf-8') + if getattr(self, 'passphrase', None) + else None, + ) + except ValueError as e: + error = f'Incorrect Passphrase
{str(e)}' + errors['passphrase'] = ValidationError(_(mark_safe(error))) + except (InvalidKey, UnsupportedAlgorithm) as e: + errors['private_key'] = ValidationError( + _(mark_safe(f'Private key error:
{str(e)}')) + ) if errors: raise ValidationError(errors) @@ -281,61 +327,74 @@ def _generate(self): (internal use only) generates a new x509 certificate (CA or end-entity) """ - key = crypto.PKey() - key.generate_key(crypto.TYPE_RSA, int(self.key_length)) - cert = crypto.X509() - subject = self._fill_subject(cert.get_subject()) - cert.set_version(0x2) # version 3 (0 indexed counting) - cert.set_subject(subject) - cert.set_serial_number(int(self.serial_number)) - cert.set_notBefore(bytes(str(datetime_to_string(self.validity_start)), 'utf8')) - cert.set_notAfter(bytes(str(datetime_to_string(self.validity_end)), 'utf8')) - # generating certificate for CA + # Generate RSA private key + key = rsa.generate_private_key( + public_exponent=65537, key_size=int(self.key_length) + ) + public_key = key.public_key() + + # Fill subject and issuer + subject = self._fill_subject() if not hasattr(self, 'ca'): - issuer = cert.get_subject() + issuer = subject issuer_key = key - # generating certificate issued by a CA else: - issuer = self.ca.x509.get_subject() + issuer = self.ca.x509.issuer issuer_key = self.ca.pkey - cert.set_issuer(issuer) - cert.set_pubkey(key) - cert = self._add_extensions(cert) - cert.sign(issuer_key, str(self.digest)) - self.certificate = crypto.dump_certificate(crypto.FILETYPE_PEM, cert).decode( - 'utf-8' + + # Build certificate + builder = x509.CertificateBuilder() + builder = builder.subject_name(subject) + builder = builder.issuer_name(issuer) + builder = builder.public_key(public_key) + builder = builder.serial_number(int(self.serial_number)) + builder = builder.not_valid_before(self.validity_start) + builder = builder.not_valid_after(self.validity_end) + builder = self._build_extensions( + builder, public_key, self.ca.x509 if hasattr(self, 'ca') else None ) - key_args = (crypto.FILETYPE_PEM, key) - key_kwargs = {} - if self.passphrase: - key_kwargs['passphrase'] = self.passphrase.encode('utf-8') - key_kwargs['cipher'] = 'DES-EDE3-CBC' - self.private_key = crypto.dump_privatekey(*key_args, **key_kwargs).decode( - 'utf-8' + + # Sign certificate + cert = builder.sign( + private_key=issuer_key, algorithm=getattr(hashes, self.digest.upper())() ) - def _fill_subject(self, subject): + # Store PEM-encoded certificate + self.certificate = cert.public_bytes(serialization.Encoding.PEM).decode('utf-8') + + # Store PEM-encoded private key + if self.passphrase: + encryption = BestAvailableEncryption(self.passphrase.encode('utf-8')) + else: + encryption = NoEncryption() + self.private_key = key.private_bytes( + encoding=serialization.Encoding.PEM, + format=serialization.PrivateFormat.TraditionalOpenSSL, + encryption_algorithm=encryption, + ).decode('utf-8') + + def _fill_subject(self): """ (internal use only) - fills OpenSSL.crypto.X509Name object + returns a cryptography.x509.Name object """ attr_map = { - 'country_code': 'countryName', - 'state': 'stateOrProvinceName', - 'city': 'localityName', - 'organization_name': 'organizationName', - 'organizational_unit_name': 'organizationalUnitName', - 'email': 'emailAddress', - 'common_name': 'commonName', + 'country_code': NameOID.COUNTRY_NAME, + 'state': NameOID.STATE_OR_PROVINCE_NAME, + 'city': NameOID.LOCALITY_NAME, + 'organization_name': NameOID.ORGANIZATION_NAME, + 'organizational_unit_name': NameOID.ORGANIZATIONAL_UNIT_NAME, + 'email': NameOID.EMAIL_ADDRESS, + 'common_name': NameOID.COMMON_NAME, } - # set x509 subject attributes only if not empty strings - for model_attr, subject_attr in attr_map.items(): + + name_attributes = [] + for model_attr, oid in attr_map.items(): value = getattr(self, model_attr) if value: - # coerce value to string, allow these fields to be redefined - # as foreign keys by subclasses without losing compatibility - setattr(subject, subject_attr, str(value)) - return subject + name_attributes.append(x509.NameAttribute(oid, str(value))) + + return x509.Name(name_attributes) def _import(self): """ @@ -346,30 +405,41 @@ def _import(self): # when importing an end entity certificate if hasattr(self, 'ca'): self._verify_ca() - self.key_length = str(cert.get_pubkey().bits()) - # this line might fail if a certificate with - # an unsupported signature algorithm is imported - algorithm = cert.get_signature_algorithm().decode('utf8') - self.digest = SIGNATURE_MAPPING[algorithm] - not_before = cert.get_notBefore().decode('utf8') - self.validity_start = datetime.strptime(not_before, generalized_time) - self.validity_start = timezone.make_aware(self.validity_start) - not_after = cert.get_notAfter().decode('utf8') - self.validity_end = datetime.strptime(not_after, generalized_time) - self.validity_end.replace(tzinfo=timezone.tzinfo()) - self.validity_end = timezone.make_aware(self.validity_end) - subject = cert.get_subject() - self.country_code = subject.countryName or '' - # allow importing from legacy systems which use invalid country codes + + # Use cryptography to extract the public key and key length + public_key = cert.public_key() # This gets the public key from the certificate + if isinstance(public_key, rsa.RSAPublicKey): + self.key_length = public_key.key_size + else: + raise ValueError("Unsupported key type") + + # The signature algorithm handling remains the same + algorithm = cert.signature_algorithm_oid._name # Use the cryptography API for signature algorithm + self.digest = SIGNATURE_MAPPING.get(algorithm) + + # Handle validity period + not_before = cert.not_valid_before + self.validity_start = timezone.make_aware(not_before) + not_after = cert.not_valid_after + self.validity_end = timezone.make_aware(not_after) + + def get_attr_for_oid(attr): + attr_list = subject.get_attributes_for_oid(attr) + return attr_list[0].value if attr_list else '' + + # Extract subject details + subject = cert.subject + self.country_code = get_attr_for_oid(NameOID.COUNTRY_NAME) if len(self.country_code) > 2: self.country_code = '' - self.state = subject.stateOrProvinceName or '' - self.city = subject.localityName or '' - self.organization_name = subject.organizationName or '' - self.organizational_unit_name = subject.organizationalUnitName or '' - self.email = subject.emailAddress or '' - self.common_name = subject.commonName or '' - self.serial_number = cert.get_serial_number() + self.state = get_attr_for_oid(NameOID.STATE_OR_PROVINCE_NAME) + self.city = get_attr_for_oid(NameOID.LOCALITY_NAME) + self.organization_name = get_attr_for_oid(NameOID.ORGANIZATION_NAME) + self.organizational_unit_name = get_attr_for_oid(NameOID.ORGANIZATIONAL_UNIT_NAME) + self.email = get_attr_for_oid(NameOID.EMAIL_ADDRESS) + self.common_name = get_attr_for_oid(NameOID.COMMON_NAME) + self.serial_number = cert.serial_number + if not self.name: self.name = self.common_name or str(self.serial_number) @@ -379,15 +449,19 @@ def _verify_ca(self): verifies the current x509 is signed by the associated CA """ - store = crypto.X509Store() - store.add_cert(self.ca.x509) - store_ctx = crypto.X509StoreContext(store, self.x509) + cert = self.x509 + ca_cert = self.ca.x509 + try: - store_ctx.verify_certificate() - except crypto.X509StoreContextError as e: + ca_cert.public_key().verify( + cert.signature, + cert.tbs_certificate_bytes, + padding.PKCS1v15(), + cert.signature_hash_algorithm, + ) + except Exception as e: raise ValidationError( - _("CA doesn't match, got the " 'following error from pyOpenSSL: "%s"') - % e.args[0][2] + _("CA doesn't match, certificate verification failed: ā€œ%sā€") % str(e) ) def _verify_extension_format(self): @@ -404,71 +478,77 @@ def _verify_extension_format(self): if not ('name' in ext and 'critical' in ext and 'value' in ext): raise ValidationError(msg) - def _add_extensions(self, cert): + def _build_extensions(self, cert_builder, public_key, issuer_cert=None): """ (internal use only) - adds x509 extensions to ``cert`` + returns a list of extensions to be + added to the certificate builder """ - ext = [] - # prepare extensions for CA - if not hasattr(self, 'ca'): - pathlen = app_settings.CA_BASIC_CONSTRAINTS_PATHLEN - ext_value = 'CA:TRUE' - if pathlen is not None: - ext_value = '{0}, pathlen:{1}'.format(ext_value, pathlen) - ext.append( - crypto.X509Extension( - b'basicConstraints', - app_settings.CA_BASIC_CONSTRAINTS_CRITICAL, - bytes(str(ext_value), 'utf8'), - ) - ) - ext.append( - crypto.X509Extension( - b'keyUsage', - app_settings.CA_KEYUSAGE_CRITICAL, - bytes(str(app_settings.CA_KEYUSAGE_VALUE), 'utf8'), - ) - ) - issuer_cert = cert - # prepare extensions for end-entity certs - else: - ext.append(crypto.X509Extension(b'basicConstraints', False, b'CA:FALSE')) - ext.append( - crypto.X509Extension( - b'keyUsage', - app_settings.CERT_KEYUSAGE_CRITICAL, - bytes(str(app_settings.CERT_KEYUSAGE_VALUE), 'utf8'), - ) - ) - issuer_cert = self.ca.x509 - ext.append( - crypto.X509Extension(b'subjectKeyIdentifier', False, b'hash', subject=cert) + extensions = [] + + # CA or EE? + is_ca = not hasattr(self, 'ca') + pathlen = app_settings.CA_BASIC_CONSTRAINTS_PATHLEN if is_ca else None + + # basicConstraints + basic_constraints = BasicConstraints(ca=is_ca, path_length=pathlen) + critical = app_settings.CA_BASIC_CONSTRAINTS_CRITICAL if is_ca else False + extensions.append((basic_constraints, critical)) + + # keyUsage + key_usage_value = ( + app_settings.CA_KEYUSAGE_VALUE + if is_ca + else app_settings.CERT_KEYUSAGE_VALUE ) - cert.add_extensions(ext) - # authorityKeyIdentifier must be added after - # the other extensions have been already added - cert.add_extensions( - [ - crypto.X509Extension( - b'authorityKeyIdentifier', - False, - b'keyid:always,issuer:always', - issuer=issuer_cert, - ) - ] + key_usage_critical = ( + app_settings.CA_KEYUSAGE_CRITICAL + if is_ca + else app_settings.CERT_KEYUSAGE_CRITICAL + ) + + key_usage = KeyUsage( + digital_signature='digitalSignature' in key_usage_value, + key_encipherment='keyEncipherment' in key_usage_value, + content_commitment='nonRepudiation' in key_usage_value, + data_encipherment='dataEncipherment' in key_usage_value, + key_agreement='keyAgreement' in key_usage_value, + key_cert_sign='keyCertSign' in key_usage_value, + crl_sign='cRLSign' in key_usage_value, + encipher_only=False, + decipher_only=False, ) + extensions.append((key_usage, key_usage_critical)) + + # subjectKeyIdentifier + ski = SubjectKeyIdentifier.from_public_key(public_key) + extensions.append((ski, False)) + + # authorityKeyIdentifier + if issuer_cert: + authority_key_id = AuthorityKeyIdentifier.from_issuer_public_key( + issuer_cert.public_key() + ) + extensions.append((authority_key_id, False)) + + # Custom extensions defined in self.extensions for ext in self.extensions: - cert.add_extensions( - [ - crypto.X509Extension( - bytes(str(ext['name']), 'utf8'), - bool(ext['critical']), - bytes(str(ext['value']), 'utf8'), - ) - ] + ext_oid = ExtensionOID._map.get(ext['name']) or ext['name'] + extensions.append( + ( + x509.UnrecognizedExtension( + x509.ObjectIdentifier(ext_oid), + str(ext['value']).encode('utf-8'), + ), + bool(ext['critical']), + ) ) - return cert + + # Add to the builder + for ext_value, is_critical in extensions: + cert_builder = cert_builder.add_extension(ext_value, critical=is_critical) + + return cert_builder def renew(self): self._generate() @@ -512,18 +592,29 @@ def renew(self): @property def crl(self): """ - Returns up to date CRL of this CA + Returns up-to-date CRL of this CA using cryptography """ - revoked_certs = self.get_revoked_certs() - crl = crypto.CRL() - now_str = datetime_to_string(timezone.now()) - for cert in revoked_certs: - revoked = crypto.Revoked() - revoked.set_serial(bytes(str(cert.serial_number), 'utf8')) - revoked.set_reason(b'unspecified') - revoked.set_rev_date(bytes(str(now_str), 'utf8')) - crl.add_revoked(revoked) - return crl.export(self.x509, self.pkey, days=1, digest=b'sha256') + now = timezone.now() + builder = CertificateRevocationListBuilder() + builder = builder.issuer_name(self.x509.subject) + builder = builder.last_update(now) + builder = builder.next_update(now + timedelta(days=1)) + + for cert in self.get_revoked_certs(): + revoked_cert = ( + RevokedCertificateBuilder() + .serial_number(int(cert.serial_number)) + .revocation_date(now) + .add_extension( + x509.CRLReason(x509.ReasonFlags.unspecified), + critical=False, + ) + .build() + ) + builder = builder.add_revoked_certificate(revoked_cert) + + crl = builder.sign(private_key=self.pkey, algorithm=hashes.SHA256()) + return crl.public_bytes(serialization.Encoding.PEM).decode("utf-8") AbstractCa._meta.get_field('validity_end').default = default_ca_validity_end diff --git a/django_x509/tests/test_ca.py b/django_x509/tests/test_ca.py index 92c8c5e..9005b62 100644 --- a/django_x509/tests/test_ca.py +++ b/django_x509/tests/test_ca.py @@ -1,10 +1,22 @@ from datetime import datetime, timedelta +from cryptography import x509 +from cryptography.hazmat.backends import default_backend +from cryptography.hazmat.primitives import serialization +from cryptography.hazmat.primitives.asymmetric import rsa +from cryptography.x509 import ( + CertificateBuilder, + ExtensionOID, + load_pem_x509_certificate, + load_pem_x509_crl, +) +from cryptography.x509.extensions import AuthorityKeyIdentifier, SubjectKeyIdentifier +from cryptography.x509.name import NameAttribute +from cryptography.x509.oid import NameOID from django.core.exceptions import ValidationError from django.test import TestCase from django.urls import reverse from django.utils import timezone -from OpenSSL import crypto from swapper import load_model from .. import settings as app_settings @@ -22,8 +34,8 @@ class TestCa(TestX509Mixin, TestCase): def _prepare_revoked(self): ca = self._create_ca() - crl = crypto.load_crl(crypto.FILETYPE_PEM, ca.crl) - self.assertIsNone(crl.get_revoked()) + crl = load_pem_x509_crl(ca.crl.encode('utf-8')) + self.assertIsNone(crl.revoked_certificates) cert = self._create_cert(ca=ca) cert.revoke() return (ca, cert) @@ -60,42 +72,56 @@ def test_new(self): ca = self._create_ca() self.assertNotEqual(ca.certificate, '') self.assertNotEqual(ca.private_key, '') - cert = crypto.load_certificate(crypto.FILETYPE_PEM, ca.certificate) - self.assertEqual(int(cert.get_serial_number()), int(ca.serial_number)) - subject = cert.get_subject() - self.assertEqual(subject.countryName, ca.country_code) - self.assertEqual(subject.stateOrProvinceName, ca.state) - self.assertEqual(subject.localityName, ca.city) - self.assertEqual(subject.organizationName, ca.organization_name) - self.assertEqual(subject.emailAddress, ca.email) - self.assertEqual(subject.commonName, ca.common_name) - issuer = cert.get_issuer() - self.assertEqual(issuer.countryName, ca.country_code) - self.assertEqual(issuer.stateOrProvinceName, ca.state) - self.assertEqual(issuer.localityName, ca.city) - self.assertEqual(issuer.organizationName, ca.organization_name) - self.assertEqual(issuer.emailAddress, ca.email) - self.assertEqual(issuer.commonName, ca.common_name) - # ensure version is 3 - self.assertEqual(cert.get_version(), 2) - # basic constraints - e = cert.get_extension(0) - self.assertEqual(e.get_critical(), 1) - self.assertEqual(e.get_short_name().decode(), 'basicConstraints') - self.assertEqual(e.get_data(), b'0\x06\x01\x01\xff\x02\x01\x00') + + cert = x509.load_pem_x509_certificate( + ca.certificate.encode('utf-8'), default_backend() + ) + self.assertEqual(cert.serial_number, int(ca.serial_number)) + + subject = cert.subject + issuer = cert.issuer + + def get_attr(name, oid): + return subject.get_attributes_for_oid(oid)[0].value + + self.assertEqual(get_attr('countryName', NameOID.COUNTRY_NAME), ca.country_code) + self.assertEqual( + get_attr('stateOrProvinceName', NameOID.STATE_OR_PROVINCE_NAME), ca.state + ) + self.assertEqual(get_attr('localityName', NameOID.LOCALITY_NAME), ca.city) + self.assertEqual( + get_attr('organizationName', NameOID.ORGANIZATION_NAME), + ca.organization_name, + ) + self.assertEqual(get_attr('emailAddress', NameOID.EMAIL_ADDRESS), ca.email) + self.assertEqual(get_attr('commonName', NameOID.COMMON_NAME), ca.common_name) + + # issuer is self for a CA + self.assertEqual(issuer, subject) + + # ensure version is 3 (represented as integer 2) + self.assertEqual(cert.version.name, 'v3') + + # check basic constraints extension + ext = cert.extensions.get_extension_for_class(x509.BasicConstraints) + self.assertTrue(ext.critical) + self.assertTrue(ext.value.ca) def test_x509_property(self): ca = self._create_ca() - cert = crypto.load_certificate(crypto.FILETYPE_PEM, ca.certificate) - self.assertEqual(ca.x509.get_subject(), cert.get_subject()) - self.assertEqual(ca.x509.get_issuer(), cert.get_issuer()) + cert = x509.load_pem_x509_certificate( + ca.certificate.encode('utf-8'), default_backend() + ) + + self.assertEqual(ca.x509.subject, cert.subject) + self.assertEqual(ca.x509.issuer, cert.issuer) def test_x509_property_none(self): self.assertIsNone(Ca().x509) def test_pkey_property(self): ca = self._create_ca() - self.assertIsInstance(ca.pkey, crypto.PKey) + self.assertIsInstance(ca.pkey, rsa.RSAPrivateKey) def test_pkey_property_none(self): self.assertIsNone(Ca().pkey) @@ -224,26 +250,39 @@ def test_keyusage_value(self): def test_subject_key_identifier(self): ca = self._create_ca() - e = ca.x509.get_extension(2) - self.assertEqual(e.get_short_name().decode(), 'subjectKeyIdentifier') - self.assertEqual(e.get_critical(), False) - e2 = crypto.X509Extension( - b'subjectKeyIdentifier', False, b'hash', subject=ca.x509 + + # Get the subject key identifier extension + ext = ca.x509.extensions.get_extension_for_oid( + ExtensionOID.SUBJECT_KEY_IDENTIFIER ) - self.assertEqual(e.get_data(), e2.get_data()) + self.assertEqual(ext.oid, ExtensionOID.SUBJECT_KEY_IDENTIFIER) + self.assertFalse(ext.critical) + + # Generate a new subject key identifier using the certificate's subject public key + public_key = ca.x509.public_key() + e2 = SubjectKeyIdentifier(public_key) + + # Compare the extensions' data + self.assertEqual(ext.value, e2) def test_authority_key_identifier(self): ca = self._create_ca() - e = ca.x509.get_extension(3) - self.assertEqual(e.get_short_name().decode(), 'authorityKeyIdentifier') - self.assertEqual(e.get_critical(), False) - e2 = crypto.X509Extension( - b'authorityKeyIdentifier', - False, - b'keyid:always,issuer:always', - issuer=ca.x509, + + # Get the authority key identifier extension + ext = ca.x509.extensions.get_extension_for_oid( + ExtensionOID.AUTHORITY_KEY_IDENTIFIER ) - self.assertEqual(e.get_data(), e2.get_data()) + self.assertEqual(ext.oid, ExtensionOID.AUTHORITY_KEY_IDENTIFIER) + self.assertFalse(ext.critical) + + # Generate the authority key identifier using the issuer's public key + issuer_public_key = ca.x509.issuer_public_key() + e2 = AuthorityKeyIdentifier.from_issuer_subject_key_identifier( + issuer_public_key + ) + + # Compare the extensions' data + self.assertEqual(ext.value, e2) def test_extensions(self): extensions = [ @@ -305,21 +344,39 @@ def test_get_revoked_certs(self): def test_crl(self): ca, cert = self._prepare_revoked() - crl = crypto.load_crl(crypto.FILETYPE_PEM, ca.crl) - revoked_list = crl.get_revoked() + + # Load the CRL + crl = load_pem_x509_crl(ca.crl.encode('utf-8')) + + # Get the revoked certificates list + revoked_list = crl.revoked_certificates + self.assertIsNotNone(revoked_list) self.assertEqual(len(revoked_list), 1) - self.assertEqual(int(revoked_list[0].get_serial()), cert.serial_number) + + # Get the serial number from the revoked certificate and compare + revoked_cert = revoked_list[0] + self.assertEqual(revoked_cert.serial_number, cert.serial_number) def test_crl_view(self): ca, cert = self._prepare_revoked() + + # Simulate a GET request and obtain the CRL in response response = self.client.get(reverse('admin:crl', args=[ca.pk])) self.assertEqual(response.status_code, 200) - crl = crypto.load_crl(crypto.FILETYPE_PEM, response.content) - revoked_list = crl.get_revoked() + + # Load the CRL from the response content + crl = load_pem_x509_crl(response.content) + + # Get the revoked certificates list + revoked_list = crl.revoked_certificates + self.assertIsNotNone(revoked_list) self.assertEqual(len(revoked_list), 1) - self.assertEqual(int(revoked_list[0].get_serial()), cert.serial_number) + + # Get the serial number from the revoked certificate and compare + revoked_cert = revoked_list[0] + self.assertEqual(revoked_cert.serial_number, cert.serial_number) def test_crl_view_403(self): setattr(app_settings, 'CRL_PROTECTED', True) @@ -335,7 +392,7 @@ def test_crl_view_404(self): def test_x509_text(self): ca = self._create_ca() - text = crypto.dump_certificate(crypto.FILETYPE_TEXT, ca.x509) + text = ca.x509.public_bytes(serialization.Encoding.PEM) self.assertEqual(ca.x509_text, text.decode('utf-8')) def test_x509_import_exception_fixed(self): @@ -400,9 +457,12 @@ def test_x509_import_exception_fixed(self): def test_fill_subject_non_strings(self): ca1 = self._create_ca() ca2 = Ca(name='ca', organization_name=ca1) - x509 = crypto.X509() - subject = ca2._fill_subject(x509.get_subject()) - self.assertEqual(subject.organizationName, 'Test CA') + x509 = CertificateBuilder() + subject = ca2._fill_subject(x509.subject) + self.assertEqual( + subject.get_attributes_for_oid(NameAttribute("organizationName"))[0].value, + 'Test CA', + ) # this certificate has an invalid country code problematic_certificate = """-----BEGIN CERTIFICATE----- @@ -500,8 +560,8 @@ def test_import_ca_key_validation_error(self): def test_create_old_serial_ca(self): ca = self._create_ca(serial_number=3) self.assertEqual(int(ca.serial_number), 3) - cert = crypto.load_certificate(crypto.FILETYPE_PEM, ca.certificate) - self.assertEqual(int(cert.get_serial_number()), int(ca.serial_number)) + cert = load_pem_x509_certificate(ca.certificate.encode('utf-8')) + self.assertEqual(int(cert.serial_number), int(ca.serial_number)) def test_bad_serial_number_ca(self): try: @@ -515,47 +575,45 @@ def test_bad_serial_number_ca(self): def test_import_ca_key_with_passphrase(self): ca = Ca(name='ImportTest') ca.certificate = """-----BEGIN CERTIFICATE----- -MIICrzCCAhigAwIBAgIJANCybYj5LwUWMA0GCSqGSIb3DQEBCwUAMG8xCzAJBgNV -BAYTAklOMQwwCgYDVQQIDANhc2QxDDAKBgNVBAcMA2FzZDEMMAoGA1UECgwDYXNk -MQwwCgYDVQQLDANhc2QxDDAKBgNVBAMMA2FzZDEaMBgGCSqGSIb3DQEJARYLYXNk -QGFzZC5hc2QwHhcNMTgwODI5MjExMDQ1WhcNMTkwODI5MjExMDQ1WjBvMQswCQYD -VQQGEwJJTjEMMAoGA1UECAwDYXNkMQwwCgYDVQQHDANhc2QxDDAKBgNVBAoMA2Fz -ZDEMMAoGA1UECwwDYXNkMQwwCgYDVQQDDANhc2QxGjAYBgkqhkiG9w0BCQEWC2Fz -ZEBhc2QuYXNkMIGfMA0GCSqGSIb3DQEBAQUAA4GNADCBiQKBgQDBuDdlU20Ydie8 -tmbq2hn8Ski6aSH2IyVVMxUj3+i6QZmoJ4sZzcAMCLPIkCAxby5pP0V6/DSqjxTL -ShYy/7QMCovmj3O+23eYR/JGNAfsk6uDsWJL6OLHTNdx19mL0NioeFNEUJt14Cbz -uqUizT7UdONLer0UK4uP2sE09Eo4cQIDAQABo1MwUTAdBgNVHQ4EFgQURUEc1+ho -on8xaoSU+HU6CRkn0/owHwYDVR0jBBgwFoAURUEc1+hoon8xaoSU+HU6CRkn0/ow -DwYDVR0TAQH/BAUwAwEB/zANBgkqhkiG9w0BAQsFAAOBgQB2zU8qtkXVM25yrL9s -FC5oSqTky2c9KI/hwdsSronSvwaMoASgfl7UjzXlovq9FWZpNSVZ06wetkJVjq5N -Xn3APftPSmKw0J1tzUfZuvq8Z8q6uXQ4B2+BsiCkG/PwXizbKDc29yzXsXTL4+cQ -J7RrWKwDUi/GKVvqc+JjgsQ/nA== ------END CERTIFICATE----- - + MIICrzCCAhigAwIBAgIJANCybYj5LwUWMA0GCSqGSIb3DQEBCwUAMG8xCzAJBgNV + BAYTAklOMQwwCgYDVQQIDANhc2QxDDAKBgNVBAcMA2FzZDEMMAoGA1UECgwDYXNk + MQwwCgYDVQQLDANhc2QxDDAKBgNVBAMMA2FzZDEaMBgGCSqGSIb3DQEJARYLYXNk + QGFzZC5hc2QwHhcNMTgwODI5MjExMDQ1WhcNMTkwODI5MjExMDQ1WjBvMQswCQYD + VQQGEwJJTjEMMAoGA1UECAwDYXNkMQwwCgYDVQQHDANhc2QxDDAKBgNVBAoMA2Fz + ZDEMMAoGA1UECwwDYXNkMQwwCgYDVQQDDANhc2QxGjAYBgkqhkiG9w0BCQEWC2Fz + ZEBhc2QuYXNkMIGfMA0GCSqGSIb3DQEBAQUAA4GNADCBiQKBgQDBuDdlU20Ydie8 + tmbq2hn8Ski6aSH2IyVVMxUj3+i6QZmoJ4sZzcAMCLPIkCAxby5pP0V6/DSqjxTL + ShYy/7QMCovmj3O+23eYR/JGNAfsk6uDsWJL6OLHTNdx19mL0NioeFNEUJt14Cbz + uqUizT7UdONLer0UK4uP2sE09Eo4cQIDAQABo1MwUTAdBgNVHQ4EFgQURUEc1+ho + on8xaoSU+HU6CRkn0/owHwYDVR0jBBgwFoAURUEc1+hoon8xaoSU+HU6CRkn0/ow + DwYDVR0TAQH/BAUwAwEB/zANBgkqhkiG9w0BAQsFAAOBgQB2zU8qtkXVM25yrL9s + FC5oSqTky2c9KI/hwdsSronSvwaMoASgfl7UjzXlovq9FWZpNSVZ06wetkJVjq5N + Xn3APftPSmKw0J1tzUfZuvq8Z8q6uXQ4B2+BsiCkG/PwXizbKDc29yzXsXTL4+cQ + J7RrWKwDUi/GKVvqc+JjgsQ/nA== + -----END CERTIFICATE----- """ ca.private_key = """-----BEGIN RSA PRIVATE KEY----- -Proc-Type: 4,ENCRYPTED -DEK-Info: DES-EDE3-CBC,D7DDAD38C7462384 - -CUEPD7buBQqv/uipFz/tXYURNcQrY5HKU904IVsKbM233KPA6qU6IaRF6RRxxUtE -ejrmY2es9ZmU63gO/G/16E0CxzWhm3G2pOBsWHsBGGYcMpqZ842E3NoWimfQuRyO -E7TtMKW+Jdl6mzkw8s/KkSeGkGvZFKrclSN37CtkexRn4cXQkhNgPztyeRaQjIBM -SveP2qbODU+lr8g2oUjx05Ftcv1zJin85tzifJlQyaQz8ozKYtHA/RSpLEFZ19HG -mXn4Rvvai8r2zhdqfT/0/G6dABDrhQLxQhPE2MrY0hAlr7DnDrYNQQ/QyGoiAdcR -ee7QUDNfDnjzU6k/EjYPU1827/Kw8R4al8yDtVcUqfDuEsKabot+krEx4IZ5LOk9 -PkcSW8UR0cIm7QE2BzQEzaZKQIpVwjSsSKm+RcFktiCKVun3Sps+GtXBr+AmF5Na -r6xeg+j9kz8lT8F5lnpFTk6c8cD8GDCRiLsFzPo652BQ24dAEPvsSbYmKwr1gEe8 -tfsARqOuvSafQNzqBYFV7abFr8DFiE1Kghk6d6x2u7qVREvOh0RYHRWqsTRf4MMn -WlEnL9zfYST9Ur3gJgBOH2WHboDlQZu1k7yoLMfiGTQSQ2/xg1zS+5IWxt4tg029 -B+f39N5zyDjuGFYcf3J6J4zybHmvdSAa62qxnkeDIbLz4axTU8+hNNOWxIsAh5vs -OO8quCk6DE4j4u3Yzk7810dkJtliwboQiTlitEbCjiyjkOrabIICKMte8nhylZX6 -BxZA3knyYRiB0FNYSxI6YuCIqTjr0AoBvNHdkdjkv2VFomYNBd8ruA== ------END RSA PRIVATE KEY----- + Proc-Type: 4,ENCRYPTED + DEK-Info: DES-EDE3-CBC,D7DDAD38C7462384 + CUEPD7buBQqv/uipFz/tXYURNcQrY5HKU904IVsKbM233KPA6qU6IaRF6RRxxUtE + ejrmY2es9ZmU63gO/G/16E0CxzWhm3G2pOBsWHsBGGYcMpqZ842E3NoWimfQuRyO + E7TtMKW+Jdl6mzkw8s/KkSeGkGvZFKrclSN37CtkexRn4cXQkhNgPztyeRaQjIBM + SveP2qbODU+lr8g2oUjx05Ftcv1zJin85tzifJlQyaQz8ozKYtHA/RSpLEFZ19HG + mXn4Rvvai8r2zhdqfT/0/G6dABDrhQLxQhPE2MrY0hAlr7DnDrYNQQ/QyGoiAdcR + ee7QUDNfDnjzU6k/EjYPU1827/Kw8R4al8yDtVcUqfDuEsKabot+krEx4IZ5LOk9 + PkcSW8UR0cIm7QE2BzQEzaZKQIpVwjSsSKm+RcFktiCKVun3Sps+GtXBr+AmF5Na + r6xeg+j9kz8lT8F5lnpFTk6c8cD8GDCRiLsFzPo652BQ24dAEPvsSbYmKwr1gEe8 + tfsARqOuvSafQNzqBYFV7abFr8DFiE1Kghk6d6x2u7qVREvOh0RYHRWqsTRf4MMn + WlEnL9zfYST9Ur3gJgBOH2WHboDlQZu1k7yoLMfiGTQSQ2/xg1zS+5IWxt4tg029 + B+f39N5zyDjuGFYcf3J6J4zybHmvdSAa62qxnkeDIbLz4axTU8+hNNOWxIsAh5vs + OO8quCk6DE4j4u3Yzk7810dkJtliwboQiTlitEbCjiyjkOrabIICKMte8nhylZX6 + BxZA3knyYRiB0FNYSxI6YuCIqTjr0AoBvNHdkdjkv2VFomYNBd8ruA== + -----END RSA PRIVATE KEY----- """ ca.passphrase = 'test123' ca.full_clean() ca.save() - self.assertIsInstance(ca.pkey, crypto.PKey) + self.assertIsInstance(ca.pkey, rsa.RSAPrivateKey) def test_import_ca_key_with_incorrect_passphrase(self): ca = Ca(name='ImportTest') @@ -610,7 +668,7 @@ def test_generate_ca_with_passphrase(self): ca = self._create_ca(passphrase='123') ca.full_clean() ca.save() - self.assertIsInstance(ca.pkey, crypto.PKey) + self.assertIsInstance(ca.pkey, rsa.RSAPrivateKey) def test_datetime_to_string(self): generalized_datetime = datetime(2050, 1, 1, 0, 0, 0, 0) diff --git a/requirements.txt b/requirements.txt index ba1e22f..5d7f405 100644 --- a/requirements.txt +++ b/requirements.txt @@ -2,5 +2,4 @@ django>=4.2.0,<=5.3.0 openwisp-utils @ https://github.com/openwisp/openwisp-utils/tarball/1.2 jsonfield>=3.1.0,<4.0.0 -cryptography~=43.0.3 -pyOpenSSL~=24.2.1 +cryptography~=44.0.2