diff --git a/utils/utils.py b/utils/utils.py index 4a859fa..5b94dcc 100644 --- a/utils/utils.py +++ b/utils/utils.py @@ -170,8 +170,55 @@ def create_pfx(certpath, keypath, pfxpath): return def extract_pfx(pfxpath, certpath, keypath): - subprocess.run(f'openssl pkcs12 -in {pfxpath} -nodes -password pass:password -out {certpath} -clcerts', shell=True, stdout=subprocess.DEVNULL, stderr=subprocess.STDOUT) - subprocess.run(f'openssl pkcs12 -in {pfxpath} -nodes -password pass:password -out {keypath} -nocerts -nodes', shell=True, stdout=subprocess.DEVNULL, stderr=subprocess.STDOUT) + """ + Extract certificate and private key from a PKCS12 (PFX) file. + Uses Python's cryptography library for reliable PEM formatting. + """ + # Read the PFX file + with open(pfxpath, 'rb') as pfx_file: + pfx_data = pfx_file.read() + + # Load the PKCS12 file - try with password first, then without + private_key = None + certificate = None + additional_certificates = None + + try: + private_key, certificate, additional_certificates = serialization.pkcs12.load_key_and_certificates( + pfx_data, + b'password', + backend=default_backend() + ) + except ValueError: + # Try without password if password-protected attempt fails + try: + private_key, certificate, additional_certificates = serialization.pkcs12.load_key_and_certificates( + pfx_data, + None, + backend=default_backend() + ) + except ValueError as e: + raise Exception(f'Failed to load PFX file (wrong password or corrupted file): {str(e)}') + + if private_key is None: + raise Exception('No private key found in PFX file') + if certificate is None: + raise Exception('No certificate found in PFX file') + + # Write certificate to PEM file (only the client certificate, not additional CA certs) + with open(certpath, 'wb') as cert_file: + cert_pem = certificate.public_bytes(serialization.Encoding.PEM) + cert_file.write(cert_pem) + + # Write private key to PEM file (unencrypted) + with open(keypath, 'wb') as key_file: + key_pem = private_key.private_bytes( + encoding=serialization.Encoding.PEM, + format=serialization.PrivateFormat.PKCS8, + encryption_algorithm=serialization.NoEncryption() + ) + key_file.write(key_pem) + return def get_str_and_next(blob, start): @@ -207,4 +254,4 @@ def aes_decrypt(key, iv, content): cipher = Cipher(algorithms.AES(base64.b64decode(key)), modes.CBC(base64.b64decode(iv)), backend=default_backend()) decryptor = cipher.decryptor() decrypted_data = decryptor.update(content) + decryptor.finalize() - return decrypted_data \ No newline at end of file + return decrypted_data