diff --git a/config.yaml b/config.yaml index 6a21a7c4..9482e762 100644 --- a/config.yaml +++ b/config.yaml @@ -183,6 +183,18 @@ options: description: | A cookie encryption key used for API authentication flows. The value should be 32 url-safe, base64 encoded bytes. If not set, one will be generated securely. + gpg_secret_id: + type: string + default: + description: | + Juju secret URI (e.g. secret:xxxx) containing the GPG credentials used by + Landscape Server to sign repository metadata. The secret must contain two fields: + - gpg-passphrase: the GPG key passphrase + - gpg-private-key: the ASCII-armored GPG private key + The secret must be granted to the landscape-server application before setting + this option. When set, the charm writes /etc/landscape/gpg-passphrase and + imports the private key into /etc/landscape/gpg on install and whenever the + secret is rotated. min_install: type: boolean default: False diff --git a/src/charm.py b/src/charm.py index 04cefc52..e9dff9f3 100755 --- a/src/charm.py +++ b/src/charm.py @@ -58,6 +58,7 @@ MaintenanceStatus, ModelError, Relation, + SecretNotFoundError, WaitingStatus, ) from pydantic import ValidationError @@ -100,6 +101,9 @@ HASH_ID_DATABASES = "/opt/canonical/landscape/hash-id-databases-ignore-maintenance" UPDATE_WSL_DISTRIBUTIONS_SCRIPT = "/opt/canonical/landscape/update-wsl-distributions" +GPG_HOME_DIR = "/etc/landscape-server/gpg" +GPG_PASSPHRASE_FILE = "/etc/landscape-server/gpg-passphrase.txt" + LANDSCAPE_SERVER = "landscape-server" LANDSCAPE_PACKAGES = ( LANDSCAPE_SERVER, @@ -274,6 +278,8 @@ def __init__(self, *args): self.on.get_service_conf_action, self._on_get_service_conf_action ) + # Secrets + self.framework.observe(self.on.secret_changed, self._on_secret_changed) # SMTP self.smtp = SmtpRequires(self) self.framework.observe( @@ -536,6 +542,8 @@ def _on_config_changed(self, _) -> None: self._write_cookie_encryption_key(cookie_encryption_key) self._stored.cookie_encryption_key = cookie_encryption_key + self._configure_gpg() + self._update_ready_status(restart_services=True) self._provide_all_haproxy_route_requirements() @@ -600,6 +608,93 @@ def _write_cookie_encryption_key(self, cookie_encryption_key): logger.info("Writing cookie encryption key") update_service_conf({"api": {"cookie-encryption-key": cookie_encryption_key}}) + def _configure_gpg(self) -> bool: + """Write GPG credentials from the configured Juju secret. + + Writes the passphrase to GPG_PASSPHRASE_FILE and imports the private + key into GPG_HOME_DIR. Returns True when GPG was configured + successfully, False when no secret is configured. Sets a BlockedStatus + and returns False on error. + """ + secret_id = self.charm_config.gpg_secret_id + if not secret_id: + logger.debug("gpg_secret_id not configured, skipping GPG setup") + return False + + self.unit.status = MaintenanceStatus("Configuring GPG credentials") + + try: + secret = self.model.get_secret(id=secret_id) + content = secret.get_content(refresh=True) + except (SecretNotFoundError, ModelError): + logger.warning("GPG secret '%s' not found or not accessible", secret_id) + self.unit.status = BlockedStatus("GPG secret not found or not accessible") + return False + + passphrase = content.get("gpg-passphrase") + private_key = content.get("gpg-private-key") + + if not passphrase or not private_key: + logger.warning( + "GPG secret is missing required fields: " + "'gpg-passphrase' and/or 'gpg-private-key'" + ) + self.unit.status = BlockedStatus("GPG secret missing required fields") + return False + + landscape_user = user_exists("landscape") + landscape_uid = landscape_user.pw_uid + landscape_gid = landscape_user.pw_gid + + os.makedirs(GPG_HOME_DIR, mode=0o700, exist_ok=True) + # Enforce 0700 explicitly — makedirs is subject to umask + os.chmod(GPG_HOME_DIR, 0o700) + os.chown(GPG_HOME_DIR, landscape_uid, landscape_gid) + + # Set umask to 0o177 so open() creates the file with 0o600 immediately, + # avoiding a window where it could be world-readable + old_umask = os.umask(0o177) + try: + with open(GPG_PASSPHRASE_FILE, "w") as fp: + fp.write(passphrase) + finally: + os.umask(old_umask) + os.chown(GPG_PASSPHRASE_FILE, landscape_uid, landscape_gid) + + try: + subprocess.run( + [ + "gpg", + "--homedir", + GPG_HOME_DIR, + "--batch", + "--passphrase-file", + GPG_PASSPHRASE_FILE, + "--import", + ], + input=private_key, + check=True, + text=True, + capture_output=True, + ) + except subprocess.CalledProcessError as e: + logger.error("Failed to import GPG private key: %s", e.stderr) + self.unit.status = BlockedStatus("Failed to import GPG key") + return False + + logger.info("GPG credentials configured successfully") + self.unit.status = WaitingStatus("Waiting on relations") + return True + + def _on_secret_changed(self, event) -> None: + """Re-configure GPG credentials when the GPG secret is rotated.""" + secret_id = self.charm_config.gpg_secret_id + if not secret_id or event.secret.id != secret_id: + return + + if self._configure_gpg(): + self._update_ready_status(restart_services=True) + def _on_upgrade_charm(self, _: UpgradeCharmEvent) -> None: self._provide_all_haproxy_route_requirements() @@ -661,12 +756,13 @@ def _on_install(self, event: InstallEvent) -> None: write_license_file( license_file, user_exists("landscape").pw_uid, self.root_gid ) - - self.unit.status = ActiveStatus("Unit is ready") + self.unit.status = WaitingStatus("Waiting on relations") # Indicate that this install is a charm install. prepend_default_settings({"DEPLOYED_FROM": "charm"}) + self._configure_gpg() + self._update_ready_status() def _update_status(self, event: UpdateStatusEvent) -> None: diff --git a/src/config.py b/src/config.py index e33705e8..4a899bb0 100644 --- a/src/config.py +++ b/src/config.py @@ -63,6 +63,7 @@ def landscape_ppas(self) -> list[str]: additional_service_config: str | None = None secret_token: str | None = None cookie_encryption_key: str | None = None + gpg_secret_id: str | None = None min_install: bool prometheus_scrape_interval: str autoregistration: bool diff --git a/tests/unit/test_charm.py b/tests/unit/test_charm.py index 4662435f..c2fe3f4b 100644 --- a/tests/unit/test_charm.py +++ b/tests/unit/test_charm.py @@ -8,10 +8,11 @@ import json import os from pwd import struct_passwd +import subprocess from subprocess import CalledProcessError from tempfile import TemporaryDirectory import unittest -from unittest.mock import ANY, call, DEFAULT, Mock, patch, PropertyMock +from unittest.mock import ANY, call, DEFAULT, Mock, mock_open, patch, PropertyMock from charms.operator_libs_linux.v0 import apt from charms.operator_libs_linux.v0.apt import PackageError, PackageNotFoundError @@ -23,6 +24,7 @@ MaintenanceStatus, PeerRelation, Relation, + Secret, State, StoredState, TCPPort, @@ -31,6 +33,8 @@ from charm import ( DEFAULT_SERVICES, get_modified_env_vars, + GPG_HOME_DIR, + GPG_PASSPHRASE_FILE, HASH_ID_DATABASES, LANDSCAPE_PACKAGES, LANDSCAPE_UBUNTU_INSTALLER_ATTACH, @@ -2126,6 +2130,204 @@ def test_action_get_service_conf(monkeypatch): assert json.loads(ctx.action_results["config"]) == conf +class TestGPGConfiguration: + """Tests for GPG credential management via Juju secrets.""" + + GPG_SECRET_ID = "secret:test-gpg-secret-id" + PASSPHRASE = "my-test-passphrase" + PRIVATE_KEY = ( + "-----BEGIN PGP PRIVATE KEY BLOCK-----\ntest" + "\n-----END PGP PRIVATE KEY BLOCK-----" + ) + + def _make_gpg_secret(self, secret_id=None): + return Secret( + id=secret_id or self.GPG_SECRET_ID, + tracked_content={ + "gpg-passphrase": self.PASSPHRASE, + "gpg-private-key": self.PRIVATE_KEY, + }, + ) + + def test_no_secret_configured_skips_gpg(self, replicas_network_state): + """When gpg_secret_id is not set, _configure_gpg does nothing.""" + ctx = Context(LandscapeServerCharm) + state_in = State(**replicas_network_state) + + with patch("charm.os.makedirs") as mock_makedirs: + state_out = ctx.run(ctx.on.config_changed(), state_in) + + mock_makedirs.assert_not_called() + assert not isinstance(state_out.unit_status, BlockedStatus) + + def test_secret_not_found_sets_blocked(self, replicas_network_state): + """When the secret does not exist, the unit enters BlockedStatus.""" + ctx = Context(LandscapeServerCharm) + state_in = State( + **replicas_network_state, + config={"gpg_secret_id": self.GPG_SECRET_ID}, + ) + + state_out = ctx.run(ctx.on.config_changed(), state_in) + + assert isinstance(state_out.unit_status, BlockedStatus) + assert "not found" in state_out.unit_status.message + + def test_secret_missing_fields_sets_blocked(self, replicas_network_state): + """When the secret exists but lacks required fields, BlockedStatus is set.""" + ctx = Context(LandscapeServerCharm) + incomplete_secret = Secret( + id=self.GPG_SECRET_ID, + tracked_content={"gpg-passphrase": self.PASSPHRASE}, + ) + state_in = State( + **replicas_network_state, + config={"gpg_secret_id": self.GPG_SECRET_ID}, + secrets=[incomplete_secret], + ) + + state_out = ctx.run(ctx.on.config_changed(), state_in) + + assert isinstance(state_out.unit_status, BlockedStatus) + assert "missing required fields" in state_out.unit_status.message + + def test_gpg_import_failure_sets_blocked(self, replicas_network_state): + """When gpg --import fails, BlockedStatus is set.""" + ctx = Context(LandscapeServerCharm) + secret = self._make_gpg_secret() + state_in = State( + **replicas_network_state, + config={"gpg_secret_id": self.GPG_SECRET_ID}, + secrets=[secret], + ) + mock_landscape = Mock() + mock_landscape.pw_uid = 1000 + mock_landscape.pw_gid = 1001 + + with ( + patch("charm.os.makedirs"), + patch("charm.os.chmod"), + patch("charm.os.chown"), + patch("charm.os.umask", return_value=0o022), + patch("charm.user_exists", return_value=mock_landscape), + patch("charm.subprocess.run") as mock_run, + patch("builtins.open", mock_open()), + ): + mock_run.side_effect = subprocess.CalledProcessError( + 1, "gpg", stderr="bad key" + ) + state_out = ctx.run(ctx.on.config_changed(), state_in) + + assert isinstance(state_out.unit_status, BlockedStatus) + assert "Failed to import GPG key" in state_out.unit_status.message + + def test_gpg_configured_successfully(self, replicas_network_state): + """When credentials are valid, files are written with correct permissions.""" + ctx = Context(LandscapeServerCharm) + secret = self._make_gpg_secret() + state_in = State( + **replicas_network_state, + config={"gpg_secret_id": self.GPG_SECRET_ID}, + secrets=[secret], + ) + mock_landscape = Mock() + mock_landscape.pw_uid = 1000 + mock_landscape.pw_gid = 1001 + + with ( + patch("charm.os.makedirs") as mock_makedirs, + patch("charm.os.chmod") as mock_chmod, + patch("charm.os.chown") as mock_chown, + patch("charm.os.umask", return_value=0o022) as mock_umask, + patch("charm.user_exists", return_value=mock_landscape), + patch("charm.subprocess.run") as mock_run, + patch("builtins.open", mock_open()) as mock_file, + ): + mock_run.return_value = subprocess.CompletedProcess([], 0) + ctx.run(ctx.on.config_changed(), state_in) + + mock_makedirs.assert_called_once_with(GPG_HOME_DIR, mode=0o700, exist_ok=True) + mock_chmod.assert_any_call(GPG_HOME_DIR, 0o700) + mock_chown.assert_any_call(GPG_HOME_DIR, 1000, 1001) + mock_umask.assert_any_call(0o177) + mock_file.assert_any_call(GPG_PASSPHRASE_FILE, "w") + mock_chown.assert_any_call(GPG_PASSPHRASE_FILE, 1000, 1001) + mock_run.assert_called_once_with( + [ + "gpg", + "--homedir", + GPG_HOME_DIR, + "--batch", + "--passphrase-file", + GPG_PASSPHRASE_FILE, + "--import", + ], + input=self.PRIVATE_KEY, + check=True, + text=True, + capture_output=True, + ) + + def test_secret_changed_ignores_different_secret(self, replicas_network_state): + """secret-changed for an unrelated secret ID is ignored.""" + ctx = Context(LandscapeServerCharm) + other_secret = Secret( + id="secret:other-secret-id", + tracked_content={"key": "value"}, + ) + state_in = State( + **replicas_network_state, + config={"gpg_secret_id": self.GPG_SECRET_ID}, + secrets=[other_secret], + ) + + with patch("charm.os.makedirs") as mock_makedirs: + ctx.run(ctx.on.secret_changed(other_secret), state_in) + + mock_makedirs.assert_not_called() + + def test_secret_changed_reconfigures_gpg(self, replicas_network_state): + """secret-changed for the GPG secret triggers reconfiguration.""" + ctx = Context(LandscapeServerCharm) + secret = self._make_gpg_secret() + state_in = State( + **replicas_network_state, + config={"gpg_secret_id": self.GPG_SECRET_ID}, + secrets=[secret], + ) + mock_landscape = Mock() + mock_landscape.pw_uid = 1000 + mock_landscape.pw_gid = 1001 + + with ( + patch("charm.os.makedirs"), + patch("charm.os.chmod"), + patch("charm.os.chown"), + patch("charm.os.umask", return_value=0o022), + patch("charm.user_exists", return_value=mock_landscape), + patch("charm.subprocess.run") as mock_run, + patch("builtins.open", mock_open()), + ): + mock_run.return_value = subprocess.CompletedProcess([], 0) + ctx.run(ctx.on.secret_changed(secret), state_in) + + mock_run.assert_called_once_with( + [ + "gpg", + "--homedir", + GPG_HOME_DIR, + "--batch", + "--passphrase-file", + GPG_PASSPHRASE_FILE, + "--import", + ], + input=self.PRIVATE_KEY, + check=True, + text=True, + capture_output=True, + ) + + class TestSmtpIntegration(unittest.TestCase): def setUp(self): self.harness = Harness(LandscapeServerCharm)