Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
35 changes: 35 additions & 0 deletions src/backups.py
Original file line number Diff line number Diff line change
Expand Up @@ -60,6 +60,18 @@
FAILED_TO_ACCESS_CREATE_BUCKET_ERROR_MESSAGE,
FAILED_TO_INITIALIZE_STANZA_ERROR_MESSAGE,
]
# User-facing messages returned when a backup-related action is attempted on a
# standby cluster. In an async replication pair only the primary cluster may
# create, list, or restore backups, so each action gets a message pointing the
# operator at the primary. NOTE: integration tests assert on these exact
# strings — do not reword without updating them.
STANDBY_CLUSTER_CREATE_BACKUP_ERROR_MESSAGE = (
    "Backups are not supported on a standby cluster. "
    "Run create-backup on the primary cluster instead."
)
STANDBY_CLUSTER_LIST_BACKUPS_ERROR_MESSAGE = (
    "Backups are not supported on a standby cluster. "
    "Run list-backups on the primary cluster instead."
)
STANDBY_CLUSTER_RESTORE_ERROR_MESSAGE = (
    "Restoring backups is not supported on a standby cluster. "
    "Run restore on the primary cluster instead."
)


class ListBackupsError(Exception):
Expand Down Expand Up @@ -153,6 +165,9 @@ def _can_initialise_stanza(self) -> bool:

def _can_unit_perform_backup(self) -> tuple[bool, str | None]:
"""Validates whether this unit can perform a backup."""
if self._is_standby_cluster():
return False, STANDBY_CLUSTER_CREATE_BACKUP_ERROR_MESSAGE

if self.charm.is_blocked:
return False, "Unit is in a blocking state"

Expand Down Expand Up @@ -183,6 +198,16 @@ def _can_unit_perform_backup(self) -> tuple[bool, str | None]:

return self._are_backup_settings_ok()

def _is_standby_cluster(self) -> bool:
"""Return whether this unit belongs to a standby cluster."""
if (
self.model.get_relation("replication") is None
and self.model.get_relation("replication-offer") is None
):
return False

return not self.charm.async_replication.is_primary_cluster()

def can_use_s3_repository(self) -> tuple[bool, str | None]:
"""Returns whether the charm was configured to use another cluster repository."""
# Check model uuid
Expand Down Expand Up @@ -1060,6 +1085,11 @@ def _run_backup(

def _on_list_backups_action(self, event) -> None:
"""List the previously created backups."""
if self._is_standby_cluster():
logger.warning(STANDBY_CLUSTER_LIST_BACKUPS_ERROR_MESSAGE)
event.fail(STANDBY_CLUSTER_LIST_BACKUPS_ERROR_MESSAGE)
return

are_backup_settings_ok, validation_message = self._are_backup_settings_ok()
if not are_backup_settings_ok:
logger.warning(validation_message)
Expand Down Expand Up @@ -1239,6 +1269,11 @@ def _pre_restore_checks(self, event: ActionEvent) -> bool:
Returns:
a boolean indicating whether restore should be run.
"""
if self._is_standby_cluster():
logger.error(f"Restore failed: {STANDBY_CLUSTER_RESTORE_ERROR_MESSAGE}")
event.fail(STANDBY_CLUSTER_RESTORE_ERROR_MESSAGE)
return False

are_backup_settings_ok, validation_message = self._are_backup_settings_ok()
if not are_backup_settings_ok:
logger.error(f"Restore failed: {validation_message}")
Expand Down
35 changes: 27 additions & 8 deletions src/relations/async_replication.py
Original file line number Diff line number Diff line change
Expand Up @@ -434,11 +434,22 @@ def _handle_replication_change(self, event: ActionEvent) -> bool:
return False

relation = self._relation
if relation is None:
event.fail("Replication relation not found")
return False

# Check if all units from the other cluster published their pod IPs in the relation data.
# Ensure the relation has at least one remote unit before trying to process unit data.
remote_units = [unit for unit in relation.units if unit.app == relation.app]
if len(remote_units) == 0:
event.fail(
"All units from the other cluster must publish their pod addresses in the relation data."
)
return False

# Check if all units from the other cluster published their IPs in the relation data.
# If not, fail the action telling that all units must publish their pod addresses in the
# relation data.
for unit in relation.units:
for unit in remote_units:
if "unit-address" not in relation.data[unit]:
event.fail(
"All units from the other cluster must publish their pod addresses in the relation data."
Expand Down Expand Up @@ -641,12 +652,20 @@ def _primary_cluster_endpoint(self) -> str:

def _re_emit_async_relation_changed_event(self) -> None:
"""Re-emit the async relation changed event."""
relation = self._relation
getattr(self.charm.on, f"{relation.name.replace('-', '_')}_relation_changed").emit(
relation,
app=relation.app,
unit=next(unit for unit in relation.units if unit.app == relation.app),
)
if relation := self._relation:
relation_unit = next(
(unit for unit in relation.units if unit.app == relation.app), None
)
if relation_unit is None:
logger.debug(
"Skipping re-emitting relation-changed event: no related units found yet."
)
return
getattr(self.charm.on, f"{relation.name.replace('-', '_')}_relation_changed").emit(
relation,
app=relation.app,
unit=relation_unit,
)

def _reinitialise_pgdata(self) -> None:
"""Reinitialise the pgdata folder."""
Expand Down
16 changes: 13 additions & 3 deletions tests/integration/conftest.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,6 @@
import json
import logging
import os
import socket
import subprocess
import uuid

Expand All @@ -21,6 +20,17 @@
logger = logging.getLogger(__name__)


def _get_non_loopback_host_ip() -> str:
    """Return the host IP reachable by LXD containers.

    Asks the kernel which source address it would use to reach a public IP
    (``ip route get`` performs a route lookup only; no packet is sent). This
    yields the address of the outbound interface rather than a loopback entry
    from ``/etc/hosts``, which ``socket.gethostbyname`` may return.

    Returns:
        the IPv4 source address as a string.

    Raises:
        RuntimeError: if the routing output contains no ``src`` field.
        subprocess.CalledProcessError: if the ``ip`` command fails.
    """
    output = subprocess.run(
        ["ip", "-4", "route", "get", "1.1.1.1"],
        check=True,
        capture_output=True,
        text=True,
    ).stdout
    # Tokenize instead of substring-splitting on "src": a substring match
    # could hit "src" inside another token (e.g. a device name) and a missing
    # field would otherwise surface as a cryptic IndexError.
    tokens = output.split()
    try:
        # The source address immediately follows the "src" keyword.
        return tokens[tokens.index("src") + 1]
    except (ValueError, IndexError) as err:
        raise RuntimeError(
            f"Could not determine host IP from routing output: {output!r}"
        ) from err


@pytest.fixture(scope="session")
def charm():
# Return str instead of pathlib.Path since python-libjuju's model.deploy(), juju deploy, and
Expand Down Expand Up @@ -151,7 +161,7 @@ def microceph():
],
check=True,
)
host_ip = socket.gethostbyname(socket.gethostname())
host_ip = _get_non_loopback_host_ip()
subprocess.run(
f'echo "subjectAltName = IP:{host_ip}" > ./extfile.cnf',
shell=True,
Expand Down Expand Up @@ -209,7 +219,7 @@ def microceph():
key_id = key["access_key"]
secret_key = key["secret_key"]
logger.info("Set up microceph")
host_ip = socket.gethostbyname(socket.gethostname())
host_ip = _get_non_loopback_host_ip()
result = subprocess.run(
"base64 -w0 ./ca.crt", shell=True, check=True, stdout=subprocess.PIPE, text=True
)
Expand Down
176 changes: 176 additions & 0 deletions tests/integration/ha_tests/test_async_replication_backups_ceph.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,176 @@
#!/usr/bin/env python3
# Copyright 2026 Canonical Ltd.
# See LICENSE file for licensing details.
import logging
from asyncio import gather

import pytest
from juju.model import Model
from pytest_operator.plugin import OpsTest
from tenacity import Retrying, retry_if_exception_message, stop_after_delay, wait_fixed

from .. import markers
from ..conftest import ConnectionInformation
from ..helpers import DATABASE_APP_NAME, get_leader_unit, get_primary
from .conftest import fast_forward

logger = logging.getLogger(__name__)

CLUSTER_SIZE = 3
FAST_INTERVAL = "10s"
IDLE_PERIOD = 5
TIMEOUT = 2000

PRIMARY_S3_APP = "s3-primary"
STANDBY_S3_APP = "s3-standby"
EXPECTED_STANDBY_BACKUP_MESSAGE = (
"Backups are not supported on a standby cluster. "
"Run create-backup on the primary cluster instead."
)


async def _ensure_postgresql_deployed(model: Model, charm: str) -> None:
    """Deploy PostgreSQL into *model* unless the application already exists."""
    if DATABASE_APP_NAME in model.applications:
        # Already deployed (e.g. by an earlier test in the module) — idempotent no-op.
        return
    await model.deploy(charm, num_units=CLUSTER_SIZE, config={"profile": "testing"})


async def _configure_s3_integrator(
    model: Model,
    app_name_to_deploy: str,
    microceph: ConnectionInformation,
) -> None:
    """Deploy and configure one S3 integrator app against microceph RGW."""
    # Deploy only when absent so the helper stays idempotent across tests.
    needs_deploy = app_name_to_deploy not in model.applications
    if needs_deploy:
        await model.deploy(
            "s3-integrator", application_name=app_name_to_deploy, channel="1/stable"
        )
        await model.wait_for_idle(
            apps=[app_name_to_deploy], idle_period=IDLE_PERIOD, timeout=TIMEOUT
        )

    # Point the integrator at the microceph RGW endpoint; each app gets its
    # own bucket so primary and standby do not share a repository.
    s3_settings = {
        "endpoint": f"https://{microceph.host}",
        "bucket": f"{app_name_to_deploy}-bucket",
        "path": "/pg",
        "region": "",
        "s3-uri-style": "path",
        "tls-ca-chain": microceph.cert,
    }
    await model.applications[app_name_to_deploy].set_config(s3_settings)

    # Credentials go through the action API rather than config.
    credentials = {
        "access-key": microceph.access_key_id,
        "secret-key": microceph.secret_access_key,
    }
    integrator_unit = model.units.get(f"{app_name_to_deploy}/0")
    sync_action = await integrator_unit.run_action("sync-s3-credentials", **credentials)
    await sync_action.wait()

    await model.relate(DATABASE_APP_NAME, app_name_to_deploy)


@markers.juju3
@pytest.mark.abort_on_fail
async def test_standby_backup_rejected_with_clear_message(
    ops_test: OpsTest,
    first_model: Model,
    second_model: Model,
    microceph: ConnectionInformation,
    charm: str,
) -> None:
    """Validate backup behavior with async replication and Ceph-backed S3 configuration.

    This test mirrors the live scenario:
    1. Two PostgreSQL clusters in separate models.
    2. Ceph-backed s3-integrator configured in each model.
    3. Async replication created between clusters.
    4. Backup succeeds on primary cluster and fails with clear message on standby.
    """
    # Idempotent setup: deploy PostgreSQL and a dedicated s3-integrator in each model.
    await _ensure_postgresql_deployed(first_model, charm)
    await _ensure_postgresql_deployed(second_model, charm)

    await _configure_s3_integrator(first_model, PRIMARY_S3_APP, microceph)
    await _configure_s3_integrator(second_model, STANDBY_S3_APP, microceph)

    # Both models must settle before wiring the cross-model relation.
    async with ops_test.fast_forward(FAST_INTERVAL), fast_forward(second_model, FAST_INTERVAL):
        await gather(
            first_model.wait_for_idle(
                apps=[DATABASE_APP_NAME, PRIMARY_S3_APP],
                status="active",
                idle_period=IDLE_PERIOD,
                timeout=TIMEOUT,
            ),
            second_model.wait_for_idle(
                apps=[DATABASE_APP_NAME, STANDBY_S3_APP],
                status="active",
                idle_period=IDLE_PERIOD,
                timeout=TIMEOUT,
            ),
        )

    # Cross-model replication wiring.
    offer_command = f"offer {DATABASE_APP_NAME}:replication-offer replication-offer"
    offer_rc, _, offer_stderr = await ops_test.juju(*offer_command.split())
    assert offer_rc == 0, f"offer failed: {offer_stderr}"

    consume_command = (
        f"consume -m {second_model.info.name} admin/{first_model.info.name}.replication-offer"
    )
    consume_rc, consume_stdout, consume_stderr = await ops_test.juju(*consume_command.split())
    assert consume_rc == 0, f"consume failed: {consume_stderr or consume_stdout}"

    # Wait for both clusters to settle again after offer/consume.
    async with ops_test.fast_forward(FAST_INTERVAL), fast_forward(second_model, FAST_INTERVAL):
        await gather(
            first_model.wait_for_idle(
                apps=[DATABASE_APP_NAME, PRIMARY_S3_APP],
                status="active",
                idle_period=IDLE_PERIOD,
                timeout=TIMEOUT,
            ),
            second_model.wait_for_idle(
                apps=[DATABASE_APP_NAME, STANDBY_S3_APP],
                status="active",
                idle_period=IDLE_PERIOD,
                timeout=TIMEOUT,
            ),
        )

    # The consumed offer may not be visible in the second model immediately;
    # retry relate until the SAAS application appears (or 180s elapse).
    for attempt in Retrying(
        stop=stop_after_delay(180),
        wait=wait_fixed(5),
        retry=retry_if_exception_message(match='application "replication-offer" not found'),
        reraise=True,
    ):
        with attempt:
            await second_model.relate(DATABASE_APP_NAME, "replication-offer")

    # Promote first model as primary cluster in async replication.
    leader_unit = await get_leader_unit(ops_test, DATABASE_APP_NAME, model=first_model)
    assert leader_unit is not None, "No leader unit found in primary model"
    create_replication = await leader_unit.run_action("create-replication")
    await create_replication.wait()
    assert create_replication.results.get("return-code") == 0, "create-replication failed"

    # Primary backup should succeed. The backup is taken on a replica unit of
    # the primary cluster (not the PostgreSQL primary unit itself).
    primary_unit_name = await get_primary(ops_test, f"{DATABASE_APP_NAME}/0", model=first_model)
    replica_unit_name = next(
        unit.name
        for unit in first_model.applications[DATABASE_APP_NAME].units
        if unit.name != primary_unit_name
    )
    primary_backup_action = await first_model.units[replica_unit_name].run_action("create-backup")
    await primary_backup_action.wait()
    assert primary_backup_action.results.get("return-code") == 0, (
        "create-backup failed on primary cluster"
    )

    # Standby backup should fail with explicit unsupported-operation message.
    # Retried because the standby may briefly report a different state while
    # async replication finishes initialising.
    standby_unit = second_model.units[f"{DATABASE_APP_NAME}/0"]
    for attempt in Retrying(stop=stop_after_delay(180), wait=wait_fixed(5), reraise=True):
        with attempt:
            standby_backup_action = await standby_unit.run_action("create-backup")
            await standby_backup_action.wait()
            assert standby_backup_action.status == "failed"
            action_message = standby_backup_action.data.get("message", "")
            assert EXPECTED_STANDBY_BACKUP_MESSAGE in action_message
Original file line number Diff line number Diff line change
@@ -0,0 +1,9 @@
# Spread task: runs the Ceph-backed async-replication backup integration test
# module under tox, collecting Allure results as artifacts. Excluded from the
# juju29 variant.
summary: test_async_replication_backups_ceph.py
environment:
  TEST_MODULE: ha_tests/test_async_replication_backups_ceph.py
execute: |
  tox run -e integration -- "tests/integration/$TEST_MODULE" --model testing --alluredir="$SPREAD_TASK/allure-results"
artifacts:
  - allure-results
variants:
  - -juju29
Loading
Loading