diff --git a/sunbeam-python/sunbeam/features/interface/utils.py b/sunbeam-python/sunbeam/features/interface/utils.py index af68f811c..8b1804b6c 100644 --- a/sunbeam-python/sunbeam/features/interface/utils.py +++ b/sunbeam-python/sunbeam/features/interface/utils.py @@ -28,6 +28,11 @@ LOG = logging.getLogger() +def normalize_pem(pem_bytes: bytes) -> bytes: + """Normalize pem line endings to LF.""" + return pem_bytes.replace(b"\r\n", b"\n").replace(b"\r", b"\n") + + def get_all_registered_groups(cli: click.Group) -> dict: """Get all the registered groups from cli object. @@ -91,9 +96,9 @@ def validate_ca_certificate( ctx: click.core.Context, param: click.core.Option, value: str ) -> str: try: - ca_bytes = base64.b64decode(value) + ca_bytes = normalize_pem(base64.b64decode(value)) x509.load_pem_x509_certificate(ca_bytes) - return value + return base64.b64encode(ca_bytes).decode() except (binascii.Error, TypeError, ValueError) as e: LOG.debug(e) raise click.BadParameter(str(e)) @@ -106,7 +111,7 @@ def validate_ca_chain( return None try: - chain_bytes = base64.b64decode(value) + chain_bytes = normalize_pem(base64.b64decode(value)) chain_list = re.findall( pattern=( "(?=-----BEGIN CERTIFICATE-----)(.*?)(?<=-----END CERTIFICATE-----)" @@ -120,7 +125,7 @@ def validate_ca_chain( for cert in chain_list: x509.load_pem_x509_certificate(cert.encode()) - return value + return base64.b64encode(chain_bytes).decode() # Check if the chain is in correct order for i in range(len(chain_list) - 1): @@ -128,7 +133,7 @@ def validate_ca_chain( issuer = x509.load_pem_x509_certificate(chain_list[i + 1].encode()) cert.verify_directly_issued_by(issuer) - return value + return base64.b64encode(chain_bytes).decode() except ( binascii.Error, TypeError, @@ -211,13 +216,6 @@ def generate_ca_chain(certificate: str, ca_certificate: str, ca_chain: str) -> s if not certificate_decoded or not ca_certificate_decoded or not ca_chain_decoded: raise binascii.Error("Unable to decode one of the certificates") - # Normalize line endings to LF to ensure consistent comparison and output. - # Certificates with CRLF line endings may otherwise be treated as different - # from equivalent certificates with LF line endings. - certificate_decoded = certificate_decoded.replace("\r\n", "\n") - ca_certificate_decoded = ca_certificate_decoded.replace("\r\n", "\n") - ca_chain_decoded = ca_chain_decoded.replace("\r\n", "\n") - # If ca_certificate is already part of ca_chain, do not add it to the final ca chain # manual-tls-certificates checks if the final ca_chain is in proper order and each # certificate is signed by the successor one. diff --git a/sunbeam-python/sunbeam/features/tls/common.py b/sunbeam-python/sunbeam/features/tls/common.py index 3ef7b147b..8746bf01d 100644 --- a/sunbeam-python/sunbeam/features/tls/common.py +++ b/sunbeam-python/sunbeam/features/tls/common.py @@ -1,6 +1,7 @@ # SPDX-FileCopyrightText: 2024 - Canonical Ltd # SPDX-License-Identifier: Apache-2.0 +import base64 import binascii import json import logging @@ -40,6 +41,7 @@ generate_ca_chain, get_subject_from_csr, is_certificate_valid, + normalize_pem, ) from sunbeam.features.interface.v1.base import BaseFeatureGroup from sunbeam.features.interface.v1.openstack import ( @@ -625,15 +627,20 @@ def prompt( if not cert or not is_certificate_valid(cert): raise click.ClickException("Not a valid certificate") + # Normalize CRLF to LF in the leaf cert before storing + cert_normalized = base64.b64encode( + normalize_pem(base64.b64decode(cert)) + ).decode() + self.process_certs[subject] = { "app": app, "unit": unit_name, "relation_id": relation_id, "csr": csr, - "certificate": cert, + "certificate": cert_normalized, } variables["certificates"].setdefault(subject, {}) - variables["certificates"][subject]["certificate"] = cert + variables["certificates"][subject]["certificate"] = cert_normalized questions.write_answers(self.client, self._CONFIG, variables) diff --git a/sunbeam-python/tests/unit/sunbeam/features/test_tls.py b/sunbeam-python/tests/unit/sunbeam/features/test_tls.py index f687760d2..ce475bcfe 100644 --- a/sunbeam-python/tests/unit/sunbeam/features/test_tls.py +++ b/sunbeam-python/tests/unit/sunbeam/features/test_tls.py @@ -46,6 +46,21 @@ def is_certificate_valid(): yield p +@pytest.fixture() +def normalize_pem(): + with patch.object(tls, "normalize_pem", return_value=b"fake-cert-bytes") as p: + yield p + + +@pytest.fixture() +def b64_encode_decode(): + with ( + patch.object(tls.base64, "b64decode", return_value=b"fake-cert-bytes"), + patch.object(tls.base64, "b64encode", return_value=b"fake-cert-encoded"), + ): + yield + + @pytest.fixture() def get_outstanding(): with patch.object( @@ -236,6 +251,7 @@ def test_prompt( write_answers, get_subject_from_csr, is_certificate_valid, + b64_encode_decode, ): certs_to_process = [ { diff --git a/sunbeam-python/tests/unit/sunbeam/features/test_utils.py b/sunbeam-python/tests/unit/sunbeam/features/test_utils.py index c24071cac..a5f9c35ce 100644 --- a/sunbeam-python/tests/unit/sunbeam/features/test_utils.py +++ b/sunbeam-python/tests/unit/sunbeam/features/test_utils.py @@ -1,10 +1,15 @@ # SPDX-FileCopyrightText: 2023 - Canonical Ltd # SPDX-License-Identifier: Apache-2.0 +import base64 +from unittest.mock import patch + from sunbeam.features.interface.utils import ( decode_base64_as_string, encode_base64_as_string, generate_ca_chain, + validate_ca_certificate, + validate_ca_chain, ) @@ -23,27 +28,31 @@ def test_generate_ca_chain(): assert ca_chain_decoded == expected_chain -def test_generate_ca_chain_deduplicates_ca_cert_with_crlf(): - """Test that ca_certificate with CRLF endings is not duplicated when +def test_validate_ca_certificate_normalizes_crlf(): + """validate_ca_certificate strips CRLF and returns clean base64.""" + cert_crlf = b"-----BEGIN CERTIFICATE-----\r\nDATA\r\n-----END CERTIFICATE-----\r\n" + cert_lf = b"-----BEGIN CERTIFICATE-----\nDATA\n-----END CERTIFICATE-----\n" + encoded = base64.b64encode(cert_crlf).decode() - ca_chain already contains an equivalent certificate with LF endings. - """ - cert1 = "CERT1" - # ca_certificate uses CRLF line endings - cert2_crlf = ( - "-----BEGIN CERTIFICATE-----\r\nISSUINGCA\r\n-----END CERTIFICATE-----\r\n" - ) - # ca_chain already contains the same cert with LF line endings - cert2_lf = "-----BEGIN CERTIFICATE-----\nISSUINGCA\n-----END CERTIFICATE-----\n" - cert3 = "-----BEGIN CERTIFICATE-----\nROOTCA\n-----END CERTIFICATE-----\n" - ca_chain_input = cert2_lf + "\n" + cert3 + with patch("sunbeam.features.interface.utils.x509.load_pem_x509_certificate"): + result = validate_ca_certificate(None, None, encoded) - ca_chain = generate_ca_chain( - encode_base64_as_string(cert1), - encode_base64_as_string(cert2_crlf), - encode_base64_as_string(ca_chain_input), + assert base64.b64decode(result) == cert_lf + + +def test_validate_ca_chain_normalizes_crlf(): + """validate_ca_chain strips CRLF and returns clean base64.""" + chain_crlf = ( + b"-----BEGIN CERTIFICATE-----\r\nISSUINGCA\r\n-----END CERTIFICATE-----\r\n" + b"-----BEGIN CERTIFICATE-----\r\nROOTCA\r\n-----END CERTIFICATE-----\r\n" ) - ca_chain_decoded = decode_base64_as_string(ca_chain) - # cert2 must appear only once in the output - assert ca_chain_decoded.count("ISSUINGCA") == 1 - assert ca_chain_decoded.count("ROOTCA") == 1 + chain_lf = ( + b"-----BEGIN CERTIFICATE-----\nISSUINGCA\n-----END CERTIFICATE-----\n" + b"-----BEGIN CERTIFICATE-----\nROOTCA\n-----END CERTIFICATE-----\n" + ) + encoded = base64.b64encode(chain_crlf).decode() + + with patch("sunbeam.features.interface.utils.x509.load_pem_x509_certificate"): + result = validate_ca_chain(None, None, encoded) + + assert base64.b64decode(result) == chain_lf