diff --git a/src/backups.py b/src/backups.py
index e983c61ecf7..9e40ae26324 100644
--- a/src/backups.py
+++ b/src/backups.py
@@ -60,6 +60,18 @@
     FAILED_TO_ACCESS_CREATE_BUCKET_ERROR_MESSAGE,
     FAILED_TO_INITIALIZE_STANZA_ERROR_MESSAGE,
 ]
+STANDBY_CLUSTER_CREATE_BACKUP_ERROR_MESSAGE = (
+    "Backups are not supported on a standby cluster. "
+    "Run create-backup on the primary cluster instead."
+)
+STANDBY_CLUSTER_LIST_BACKUPS_ERROR_MESSAGE = (
+    "Backups are not supported on a standby cluster. "
+    "Run list-backups on the primary cluster instead."
+)
+STANDBY_CLUSTER_RESTORE_ERROR_MESSAGE = (
+    "Restoring backups is not supported on a standby cluster. "
+    "Run restore on the primary cluster instead."
+)
 
 
 class ListBackupsError(Exception):
@@ -153,6 +165,9 @@ def _can_initialise_stanza(self) -> bool:
 
     def _can_unit_perform_backup(self) -> tuple[bool, str | None]:
         """Validates whether this unit can perform a backup."""
+        if self._is_standby_cluster():
+            return False, STANDBY_CLUSTER_CREATE_BACKUP_ERROR_MESSAGE
+
         if self.charm.is_blocked:
             return False, "Unit is in a blocking state"
 
@@ -183,6 +198,16 @@ def _can_unit_perform_backup(self) -> tuple[bool, str | None]:
 
         return self._are_backup_settings_ok()
 
+    def _is_standby_cluster(self) -> bool:
+        """Return whether this unit belongs to a standby cluster."""
+        if (
+            self.model.get_relation("replication") is None
+            and self.model.get_relation("replication-offer") is None
+        ):
+            return False
+
+        return not self.charm.async_replication.is_primary_cluster()
+
     def can_use_s3_repository(self) -> tuple[bool, str | None]:
         """Returns whether the charm was configured to use another cluster repository."""
        # Check model uuid
@@ -1060,6 +1085,11 @@ def _run_backup(
 
     def _on_list_backups_action(self, event) -> None:
         """List the previously created backups."""
+        if self._is_standby_cluster():
+            logger.warning(STANDBY_CLUSTER_LIST_BACKUPS_ERROR_MESSAGE)
+            event.fail(STANDBY_CLUSTER_LIST_BACKUPS_ERROR_MESSAGE)
+            return
+
         are_backup_settings_ok, validation_message = self._are_backup_settings_ok()
         if not are_backup_settings_ok:
             logger.warning(validation_message)
@@ -1239,6 +1269,11 @@ def _pre_restore_checks(self, event: ActionEvent) -> bool:
         Returns:
             a boolean indicating whether restore should be run.
         """
+        if self._is_standby_cluster():
+            logger.error(f"Restore failed: {STANDBY_CLUSTER_RESTORE_ERROR_MESSAGE}")
+            event.fail(STANDBY_CLUSTER_RESTORE_ERROR_MESSAGE)
+            return False
+
         are_backup_settings_ok, validation_message = self._are_backup_settings_ok()
         if not are_backup_settings_ok:
             logger.error(f"Restore failed: {validation_message}")
diff --git a/src/relations/async_replication.py b/src/relations/async_replication.py
index 2939f06ca56..549202fdd1c 100644
--- a/src/relations/async_replication.py
+++ b/src/relations/async_replication.py
@@ -434,11 +434,22 @@ def _handle_replication_change(self, event: ActionEvent) -> bool:
             return False
         relation = self._relation
+        if relation is None:
+            event.fail("Replication relation not found")
+            return False
 
-        # Check if all units from the other cluster published their pod IPs in the relation data.
+        # Ensure the relation has at least one remote unit before trying to process unit data.
+        remote_units = [unit for unit in relation.units if unit.app == relation.app]
+        if len(remote_units) == 0:
+            event.fail(
+                "All units from the other cluster must publish their pod addresses in the relation data."
+            )
+            return False
+
+        # Check if all units from the other cluster published their IPs in the relation data.
         # If not, fail the action telling that all units must publish their pod addresses in the
         # relation data.
-        for unit in relation.units:
+        for unit in remote_units:
             if "unit-address" not in relation.data[unit]:
                 event.fail(
                     "All units from the other cluster must publish their pod addresses in the relation data."
                 )
@@ -641,12 +652,20 @@ def _primary_cluster_endpoint(self) -> str:
 
     def _re_emit_async_relation_changed_event(self) -> None:
         """Re-emit the async relation changed event."""
-        relation = self._relation
-        getattr(self.charm.on, f"{relation.name.replace('-', '_')}_relation_changed").emit(
-            relation,
-            app=relation.app,
-            unit=next(unit for unit in relation.units if unit.app == relation.app),
-        )
+        if relation := self._relation:
+            relation_unit = next(
+                (unit for unit in relation.units if unit.app == relation.app), None
+            )
+            if relation_unit is None:
+                logger.debug(
+                    "Skipping re-emitting relation-changed event: no related units found yet."
+                )
+                return
+            getattr(self.charm.on, f"{relation.name.replace('-', '_')}_relation_changed").emit(
+                relation,
+                app=relation.app,
+                unit=relation_unit,
+            )
 
     def _reinitialise_pgdata(self) -> None:
         """Reinitialise the pgdata folder."""
diff --git a/tests/integration/conftest.py b/tests/integration/conftest.py
index b70aa7a6f80..74746c14083 100644
--- a/tests/integration/conftest.py
+++ b/tests/integration/conftest.py
@@ -4,7 +4,6 @@
 import json
 import logging
 import os
-import socket
 import subprocess
 import uuid
 
@@ -21,6 +20,17 @@
 logger = logging.getLogger(__name__)
 
 
+def _get_non_loopback_host_ip() -> str:
+    """Return the host IP reachable by LXD containers."""
+    output = subprocess.run(
+        ["ip", "-4", "route", "get", "1.1.1.1"],
+        check=True,
+        capture_output=True,
+        text=True,
+    ).stdout
+    return output.split("src", maxsplit=1)[1].split()[0]
+
+
 @pytest.fixture(scope="session")
 def charm():
     # Return str instead of pathlib.Path since python-libjuju's model.deploy(), juju deploy, and
@@ -151,7 +161,7 @@ def microceph():
         ],
         check=True,
     )
-    host_ip = socket.gethostbyname(socket.gethostname())
+    host_ip = _get_non_loopback_host_ip()
     subprocess.run(
         f'echo "subjectAltName = IP:{host_ip}" > ./extfile.cnf',
         shell=True,
@@ -209,7 +219,7 @@ def microceph():
     key_id = key["access_key"]
     secret_key = key["secret_key"]
     logger.info("Set up microceph")
-    host_ip = socket.gethostbyname(socket.gethostname())
+    host_ip = _get_non_loopback_host_ip()
     result = subprocess.run(
         "base64 -w0 ./ca.crt", shell=True, check=True, stdout=subprocess.PIPE, text=True
    )
diff --git a/tests/integration/ha_tests/test_async_replication_backups_ceph.py b/tests/integration/ha_tests/test_async_replication_backups_ceph.py
new file mode 100644
index 00000000000..19b7db6134f
--- /dev/null
+++ b/tests/integration/ha_tests/test_async_replication_backups_ceph.py
@@ -0,0 +1,176 @@
+#!/usr/bin/env python3
+# Copyright 2026 Canonical Ltd.
+# See LICENSE file for licensing details.
+import logging
+from asyncio import gather
+
+import pytest
+from juju.model import Model
+from pytest_operator.plugin import OpsTest
+from tenacity import Retrying, retry_if_exception_message, stop_after_delay, wait_fixed
+
+from .. import markers
+from ..conftest import ConnectionInformation
+from ..helpers import DATABASE_APP_NAME, get_leader_unit, get_primary
+from .conftest import fast_forward
+
+logger = logging.getLogger(__name__)
+
+CLUSTER_SIZE = 3
+FAST_INTERVAL = "10s"
+IDLE_PERIOD = 5
+TIMEOUT = 2000
+
+PRIMARY_S3_APP = "s3-primary"
+STANDBY_S3_APP = "s3-standby"
+EXPECTED_STANDBY_BACKUP_MESSAGE = (
+    "Backups are not supported on a standby cluster. "
+    "Run create-backup on the primary cluster instead."
+)
+
+
+async def _ensure_postgresql_deployed(model: Model, charm: str) -> None:
+    """Deploy PostgreSQL in the given model when it is not already present."""
+    if DATABASE_APP_NAME not in model.applications:
+        await model.deploy(charm, num_units=CLUSTER_SIZE, config={"profile": "testing"})
+
+
+async def _configure_s3_integrator(
+    model: Model,
+    app_name_to_deploy: str,
+    microceph: ConnectionInformation,
+) -> None:
+    """Deploy and configure one S3 integrator app against the microceph RGW."""
+    if app_name_to_deploy not in model.applications:
+        await model.deploy(
+            "s3-integrator", application_name=app_name_to_deploy, channel="1/stable"
+        )
+        await model.wait_for_idle(
+            apps=[app_name_to_deploy], idle_period=IDLE_PERIOD, timeout=TIMEOUT
+        )
+
+    await model.applications[app_name_to_deploy].set_config({
+        "endpoint": f"https://{microceph.host}",
+        "bucket": f"{app_name_to_deploy}-bucket",
+        "path": "/pg",
+        "region": "",
+        "s3-uri-style": "path",
+        "tls-ca-chain": microceph.cert,
+    })
+
+    action = await model.units.get(f"{app_name_to_deploy}/0").run_action(
+        "sync-s3-credentials",
+        **{
+            "access-key": microceph.access_key_id,
+            "secret-key": microceph.secret_access_key,
+        },
+    )
+    await action.wait()
+
+    await model.relate(DATABASE_APP_NAME, app_name_to_deploy)
+
+
+@markers.juju3
+@pytest.mark.abort_on_fail
+async def test_standby_backup_rejected_with_clear_message(
+    ops_test: OpsTest,
+    first_model: Model,
+    second_model: Model,
+    microceph: ConnectionInformation,
+    charm: str,
+) -> None:
+    """Validate backup behavior with async replication and a Ceph-backed S3 configuration.
+
+    This test mirrors the live scenario:
+    1. Two PostgreSQL clusters in separate models.
+    2. A Ceph-backed s3-integrator configured in each model.
+    3. Async replication created between the clusters.
+    4. Backup succeeds on the primary cluster and fails with a clear message on the standby.
+    """
+    await _ensure_postgresql_deployed(first_model, charm)
+    await _ensure_postgresql_deployed(second_model, charm)
+
+    await _configure_s3_integrator(first_model, PRIMARY_S3_APP, microceph)
+    await _configure_s3_integrator(second_model, STANDBY_S3_APP, microceph)
+
+    async with ops_test.fast_forward(FAST_INTERVAL), fast_forward(second_model, FAST_INTERVAL):
+        await gather(
+            first_model.wait_for_idle(
+                apps=[DATABASE_APP_NAME, PRIMARY_S3_APP],
+                status="active",
+                idle_period=IDLE_PERIOD,
+                timeout=TIMEOUT,
+            ),
+            second_model.wait_for_idle(
+                apps=[DATABASE_APP_NAME, STANDBY_S3_APP],
+                status="active",
+                idle_period=IDLE_PERIOD,
+                timeout=TIMEOUT,
+            ),
+        )
+
+    # Cross-model replication wiring.
+ offer_command = f"offer {DATABASE_APP_NAME}:replication-offer replication-offer" + offer_rc, _, offer_stderr = await ops_test.juju(*offer_command.split()) + assert offer_rc == 0, f"offer failed: {offer_stderr}" + + consume_command = ( + f"consume -m {second_model.info.name} admin/{first_model.info.name}.replication-offer" + ) + consume_rc, consume_stdout, consume_stderr = await ops_test.juju(*consume_command.split()) + assert consume_rc == 0, f"consume failed: {consume_stderr or consume_stdout}" + + async with ops_test.fast_forward(FAST_INTERVAL), fast_forward(second_model, FAST_INTERVAL): + await gather( + first_model.wait_for_idle( + apps=[DATABASE_APP_NAME, PRIMARY_S3_APP], + status="active", + idle_period=IDLE_PERIOD, + timeout=TIMEOUT, + ), + second_model.wait_for_idle( + apps=[DATABASE_APP_NAME, STANDBY_S3_APP], + status="active", + idle_period=IDLE_PERIOD, + timeout=TIMEOUT, + ), + ) + + for attempt in Retrying( + stop=stop_after_delay(180), + wait=wait_fixed(5), + retry=retry_if_exception_message(match='application "replication-offer" not found'), + reraise=True, + ): + with attempt: + await second_model.relate(DATABASE_APP_NAME, "replication-offer") + + # Promote first model as primary cluster in async replication. + leader_unit = await get_leader_unit(ops_test, DATABASE_APP_NAME, model=first_model) + assert leader_unit is not None, "No leader unit found in primary model" + create_replication = await leader_unit.run_action("create-replication") + await create_replication.wait() + assert create_replication.results.get("return-code") == 0, "create-replication failed" + + # Primary backup should succeed. + primary_unit_name = await get_primary(ops_test, f"{DATABASE_APP_NAME}/0", model=first_model) + replica_unit_name = next( + unit.name + for unit in first_model.applications[DATABASE_APP_NAME].units + if unit.name != primary_unit_name + ) + primary_backup_action = await first_model.units[replica_unit_name].run_action("create-backup") + await primary_backup_action.wait() + assert primary_backup_action.results.get("return-code") == 0, ( + "create-backup failed on primary cluster" + ) + + # Standby backup should fail with explicit unsupported-operation message. 
+    standby_unit = second_model.units[f"{DATABASE_APP_NAME}/0"]
+    for attempt in Retrying(stop=stop_after_delay(180), wait=wait_fixed(5), reraise=True):
+        with attempt:
+            standby_backup_action = await standby_unit.run_action("create-backup")
+            await standby_backup_action.wait()
+            assert standby_backup_action.status == "failed"
+            action_message = standby_backup_action.data.get("message", "")
+            assert EXPECTED_STANDBY_BACKUP_MESSAGE in action_message
diff --git a/tests/spread/test_async_replication_backups_ceph.py/task.yaml b/tests/spread/test_async_replication_backups_ceph.py/task.yaml
new file mode 100644
index 00000000000..e4787b9a070
--- /dev/null
+++ b/tests/spread/test_async_replication_backups_ceph.py/task.yaml
@@ -0,0 +1,9 @@
+summary: test_async_replication_backups_ceph.py
+environment:
+  TEST_MODULE: ha_tests/test_async_replication_backups_ceph.py
+execute: |
+  tox run -e integration -- "tests/integration/$TEST_MODULE" --model testing --alluredir="$SPREAD_TASK/allure-results"
+artifacts:
+  - allure-results
+variants:
+  - -juju29
diff --git a/tests/unit/test_async_replication.py b/tests/unit/test_async_replication.py
index 0174a0080d1..34425e790ce 100644
--- a/tests/unit/test_async_replication.py
+++ b/tests/unit/test_async_replication.py
@@ -732,22 +732,61 @@ def test_handle_replication_change():
     relation._can_promote_cluster = MagicMock(return_value=False)
     result = relation._handle_replication_change(mock_event)
     assert result is False
 
+    # 2.
     mock_charm = MagicMock()
     mock_event = MagicMock()
+    mock_relation = MagicMock()
+    mock_relation.units = []
+
     relation = PostgreSQLAsyncReplication(mock_charm)
     relation._can_promote_cluster = MagicMock(return_value=True)
-    relation.get_system_identifier = MagicMock(return_value=(12345, "some error"))
-    result = relation._handle_replication_change(mock_event)
+    relation.get_system_identifier = MagicMock()
+    with patch.object(
+        PostgreSQLAsyncReplication,
+        "_relation",
+        new_callable=PropertyMock,
+        return_value=mock_relation,
+    ):
+        result = relation._handle_replication_change(mock_event)
+    assert result is False
+    relation.get_system_identifier.assert_not_called()
+    mock_event.fail.assert_called_once_with(
+        "All units from the other cluster must publish their pod addresses in the relation data."
+    )
 
     # 3.
     mock_charm = MagicMock()
     mock_event = MagicMock()
     mock_relation = MagicMock()
+    mock_unit = MagicMock()
+    mock_unit.app = mock_relation.app
+    mock_relation.units = [mock_unit]
+    mock_relation.data = {mock_unit: {"unit-address": "10.0.0.1"}, mock_charm.app: {}}
+
+    relation = PostgreSQLAsyncReplication(mock_charm)
+    relation._can_promote_cluster = MagicMock(return_value=True)
+    relation.get_system_identifier = MagicMock(return_value=(12345, "some error"))
+    with patch.object(
+        PostgreSQLAsyncReplication,
+        "_relation",
+        new_callable=PropertyMock,
+        return_value=mock_relation,
+    ):
+        result = relation._handle_replication_change(mock_event)
+
+    assert result is False
+
+    # 4.
+    mock_charm = MagicMock()
+    mock_event = MagicMock()
+    mock_relation = MagicMock()
     mock_unit1 = MagicMock()
     mock_unit2 = MagicMock()
+    mock_unit1.app = mock_relation.app
+    mock_unit2.app = mock_relation.app
     mock_relation.units = [mock_unit1, mock_unit2]
     mock_relation.data = {
         mock_unit1: {"unit-address": "10.0.0.1"},
@@ -756,14 +795,19 @@ def test_handle_replication_change():
     }
 
     relation = PostgreSQLAsyncReplication(mock_charm)
-    relation._relation = mock_relation
     relation._can_promote_cluster = MagicMock(return_value=True)
     relation.get_system_identifier = MagicMock(return_value=(12345, None))
     relation._get_highest_promoted_cluster_counter_value = MagicMock(return_value="1")
     relation._update_primary_cluster_data = MagicMock()
     relation._re_emit_async_relation_changed_event = MagicMock()
 
-    result = relation._handle_replication_change(mock_event)
+    with patch.object(
+        PostgreSQLAsyncReplication,
+        "_relation",
+        new_callable=PropertyMock,
+        return_value=mock_relation,
+    ):
+        result = relation._handle_replication_change(mock_event)
 
     assert result is True
     relation._can_promote_cluster.assert_called_once_with(mock_event)
@@ -774,6 +818,42 @@ def test_handle_replication_change():
     mock_event.fail.assert_not_called()
 
 
+def test_re_emit_async_relation_changed_event():
+    mock_charm = MagicMock()
+    relation = PostgreSQLAsyncReplication(mock_charm)
+    mock_relation = MagicMock()
+    mock_relation.name = "replication-offer"
+    mock_relation.app = MagicMock()
+    mock_relation.units = []
+
+    with patch.object(
+        PostgreSQLAsyncReplication,
+        "_relation",
+        new_callable=PropertyMock,
+        return_value=mock_relation,
+    ):
+        relation._re_emit_async_relation_changed_event()
+
+    mock_charm.on.replication_offer_relation_changed.emit.assert_not_called()
+
+    remote_unit = MagicMock()
+    remote_unit.app = mock_relation.app
+    mock_relation.units = [remote_unit]
+    with patch.object(
+        PostgreSQLAsyncReplication,
+        "_relation",
+        new_callable=PropertyMock,
+        return_value=mock_relation,
+    ):
+        relation._re_emit_async_relation_changed_event()
+
+    mock_charm.on.replication_offer_relation_changed.emit.assert_called_once_with(
+        mock_relation,
+        app=mock_relation.app,
+        unit=remote_unit,
+    )
+
+
 def test_handle_forceful_promotion():
     # 1.
     mock_charm = MagicMock()
diff --git a/tests/unit/test_backups.py b/tests/unit/test_backups.py
index 93aec4c95d4..b57cf7a0144 100644
--- a/tests/unit/test_backups.py
+++ b/tests/unit/test_backups.py
@@ -14,7 +14,13 @@
 from ops.testing import Harness
 from tenacity import RetryError, wait_fixed
 
-from backups import ListBackupsError, PostgreSQLBackups
+from backups import (
+    STANDBY_CLUSTER_CREATE_BACKUP_ERROR_MESSAGE,
+    STANDBY_CLUSTER_LIST_BACKUPS_ERROR_MESSAGE,
+    STANDBY_CLUSTER_RESTORE_ERROR_MESSAGE,
+    ListBackupsError,
+    PostgreSQLBackups,
+)
 from charm import PostgresqlOperatorCharm
 from constants import PEER
 
@@ -192,6 +198,7 @@ def test_can_initialise_stanza(harness):
 def test_can_unit_perform_backup(harness):
     with (
         patch("charm.PostgreSQLBackups._are_backup_settings_ok") as _are_backup_settings_ok,
+        patch("charm.PostgreSQLBackups._is_standby_cluster") as _is_standby_cluster,
         patch("charm.Patroni.member_started", new_callable=PropertyMock) as _member_started,
         patch("ops.model.Application.planned_units") as _planned_units,
         patch(
@@ -199,6 +206,14 @@ def test_can_unit_perform_backup(harness):
         ) as _is_primary,
     ):
         peer_rel_id = harness.model.get_relation(PEER).id
+        # Test when unit belongs to a standby cluster.
+        _is_standby_cluster.return_value = True
+        assert harness.charm.backup._can_unit_perform_backup() == (
+            False,
+            STANDBY_CLUSTER_CREATE_BACKUP_ERROR_MESSAGE,
+        )
+
+        _is_standby_cluster.return_value = False
         # Test when the charm fails to retrieve the primary.
         _is_primary.side_effect = RetryError(last_attempt=1)
         assert harness.charm.backup._can_unit_perform_backup() == (
@@ -272,6 +287,33 @@ def test_can_unit_perform_backup(harness):
     assert harness.charm.backup._can_unit_perform_backup() == (True, None)
 
 
+def test_is_standby_cluster(harness):
+    with (
+        patch.object(harness.charm.backup.model, "get_relation") as _get_relation,
+        patch.object(harness.charm.async_replication, "is_primary_cluster") as _is_primary_cluster,
+    ):
+        # Test when async replication relation is not present.
+        _get_relation.side_effect = [None, None]
+        assert not harness.charm.backup._is_standby_cluster()
+        _is_primary_cluster.assert_not_called()
+
+        # Test when relation is present and this is the primary cluster.
+        _get_relation.reset_mock()
+        _is_primary_cluster.reset_mock()
+        _get_relation.side_effect = [object()]
+        _is_primary_cluster.return_value = True
+        assert not harness.charm.backup._is_standby_cluster()
+        _is_primary_cluster.assert_called_once()
+
+        # Test when relation is present and this is the standby cluster.
+        _get_relation.reset_mock()
+        _is_primary_cluster.reset_mock()
+        _get_relation.side_effect = [object()]
+        _is_primary_cluster.return_value = False
+        assert harness.charm.backup._is_standby_cluster()
+        _is_primary_cluster.assert_called_once()
+
+
 def test_can_use_s3_repository(harness):
     with (
         patch("charm.Patroni.reload_patroni_configuration") as _reload_patroni_configuration,
@@ -1422,10 +1464,21 @@ def test_on_list_backups_action(harness):
         patch(
             "charm.PostgreSQLBackups._generate_backup_list_output"
         ) as _generate_backup_list_output,
+        patch("charm.PostgreSQLBackups._is_standby_cluster") as _is_standby_cluster,
         patch("charm.PostgreSQLBackups._are_backup_settings_ok") as _are_backup_settings_ok,
     ):
-        # Test when not all backup settings are ok.
+        # Test when unit belongs to a standby cluster.
         mock_event = MagicMock()
+        _is_standby_cluster.return_value = True
+        harness.charm.backup._on_list_backups_action(mock_event)
+        mock_event.fail.assert_called_once_with(STANDBY_CLUSTER_LIST_BACKUPS_ERROR_MESSAGE)
+        _are_backup_settings_ok.assert_not_called()
+        _generate_backup_list_output.assert_not_called()
+        mock_event.set_results.assert_not_called()
+
+        # Test when not all backup settings are ok.
+        mock_event.reset_mock()
+        _is_standby_cluster.return_value = False
         _are_backup_settings_ok.return_value = (False, "fake validation message")
         harness.charm.backup._on_list_backups_action(mock_event)
         mock_event.fail.assert_called_once()
@@ -1751,10 +1804,19 @@ def test_on_restore_action(harness):
 def test_pre_restore_checks(harness):
     with (
         patch("ops.model.Application.planned_units") as _planned_units,
+        patch("charm.PostgreSQLBackups._is_standby_cluster") as _is_standby_cluster,
         patch("charm.PostgreSQLBackups._are_backup_settings_ok") as _are_backup_settings_ok,
     ):
-        # Test when S3 parameters are not ok.
+        # Test when unit belongs to a standby cluster.
         mock_event = MagicMock(params={})
+        _is_standby_cluster.return_value = True
+        assert not harness.charm.backup._pre_restore_checks(mock_event)
+        mock_event.fail.assert_called_once_with(STANDBY_CLUSTER_RESTORE_ERROR_MESSAGE)
+        _are_backup_settings_ok.assert_not_called()
+
+        # Test when S3 parameters are not ok.
+        mock_event.reset_mock()
+        _is_standby_cluster.return_value = False
         _are_backup_settings_ok.return_value = (False, "fake error message")
         assert not harness.charm.backup._pre_restore_checks(mock_event)
         mock_event.fail.assert_called_once()
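
Reviewer note (not part of the patch): the new `_is_standby_cluster()` guard in src/backups.py reduces to a small truth table over "has an async-replication relation" and "is the primary cluster". A minimal, self-contained Python sketch of that decision logic, using hypothetical plain-boolean inputs in place of the charm's relation and role lookups:

    # Sketch only: mirrors the logic of PostgreSQLBackups._is_standby_cluster().
    def is_standby_cluster(has_async_relation: bool, is_primary_cluster: bool) -> bool:
        # No "replication"/"replication-offer" relation -> not a standby.
        if not has_async_relation:
            return False
        # Otherwise, a cluster that is not the primary is a standby.
        return not is_primary_cluster

    assert is_standby_cluster(False, False) is False  # standalone cluster: backups allowed
    assert is_standby_cluster(True, True) is False    # primary side: backups allowed
    assert is_standby_cluster(True, False) is True    # standby side: actions fail fast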
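
Reviewer note (not part of the patch): `_get_non_loopback_host_ip()` in tests/integration/conftest.py shells out to `ip -4 route get 1.1.1.1` and takes the first whitespace-separated token after the `src` keyword. A sketch of that parsing against sample output; the sample line is an assumed typical Linux format, not captured from CI:

    # Sketch only: the parsing step of _get_non_loopback_host_ip(), run on a
    # hypothetical "<dst> via <gw> dev <iface> src <addr> uid <uid>" line.
    sample_output = "1.1.1.1 via 192.168.1.1 dev eth0 src 192.168.1.42 uid 1000\n    cache\n"
    host_ip = sample_output.split("src", maxsplit=1)[1].split()[0]
    assert host_ip == "192.168.1.42"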