diff --git a/tests/integration/adapters.py b/tests/integration/adapters.py new file mode 100644 index 0000000000..1892a63814 --- /dev/null +++ b/tests/integration/adapters.py @@ -0,0 +1,603 @@ +# Copyright 2026 Canonical Ltd. +# See LICENSE file for licensing details. + +"""Adapters which use Jubilant to provide libjuju-compliant API.""" + +import json +import logging +import secrets +import subprocess +from collections.abc import Callable, Generator, Iterable, Mapping +from contextlib import contextmanager +from dataclasses import dataclass +from functools import cached_property +from typing import Any, TypedDict + +from jubilant import ( + ConfigValue, + Juju, + Status, + Task, + TaskError, + all_active, + all_agents_idle, + any_error, +) +from jubilant.statustypes import UnitStatus + +logger = logging.getLogger(__name__) + + +TConstraints = Any +TDevices = Any +ShowUnitOutput = dict + + +class TStorageInfo(TypedDict): + """JSON type of Storage returned by `juju list-storage`.""" + + key: str + attachments: dict[str, dict] + kind: str + life: str + persistent: bool + + +@dataclass +class Endpoint: + """Data model for endpoint info of a relation.""" + + name: str + + +@dataclass +class RequiresInfo: + """Data model for requires info of a relation.""" + + application_name: str + name: str + + +@dataclass +class RelationInfo: + """Data model for `juju show-unit`:`relation-info` section.""" + + app: str + endpoint: str + related_endpoint: str + raw: dict[str, Any] + + @property + def endpoints(self) -> list[Endpoint]: + """Relation endpoints.""" + return [Endpoint(self.endpoint), Endpoint(self.related_endpoint)] + + @property + def is_peer(self) -> bool: + """Is this a peer relation?""" + apps = {_unit_name_to_app(unit_name) for unit_name in self.raw["related-units"]} + return not bool(apps - {self.app}) + + @property + def requires(self) -> RequiresInfo: + """Return the requires side info of the relation.""" + name = self.raw.get("related-endpoint", "") + app = "" + if related_units := self.raw.get("related-units", {}): + app = _unit_name_to_app(next(iter(related_units))) + + return RequiresInfo(name=name, application_name=app) + + +def _unit_name_to_app(name: str) -> str: + """Convert unit name to app name.""" + return name.split("/")[0] + + +def all_statuses_are(expected: str, status: Status, apps: Iterable[str]) -> bool: + """Return True if all units and apps have the `expected` status.""" + if not apps: + apps = status.apps + + for app in apps: + app_info = status.apps.get(app) + if app_info is None: + return False + if app_info.app_status.current != expected: + return False + for unit_info in status.get_units(app).values(): + if unit_info.workload_status.current != expected: + return False + return True + + +def all_active_idle(status: Status, *apps: str): + """Return True if all units are active|idle.""" + return all_agents_idle(status, *apps) and all_active(status, *apps) + + +class ActionAdapter: + """Action model adapter for libjuju.""" + + def __init__(self, task: Task, failed: bool = False): + self.task = task + self.status = "failed" if failed else "succeeded" + self.results = task.results + + def wait(self): + """Mock wait, since jubilant actions are sync.""" + return self + + +class UnitAdapter: + """Unit model adapter for libjuju.""" + + def __init__(self, name: str, app: str, status: UnitStatus, juju: Juju): + self.app = app + self.name = name + self.status = status + self._juju = juju + + def _update_status(self) -> None: + """Update unit status.""" + self.status = self._juju.status().apps[self.app].units[self.name] + + def is_leader_from_status(self) -> bool: + """Check to see if this unit is the leader.""" + return self.status.leader + + def run_action(self, action_name: str, **params): + """Run an action on this unit.""" + failed = False + try: + task = self._juju.run(self.name, action=action_name, params=dict(params)) + except TaskError as e: + task = e.task + failed = True + return ActionAdapter(task, failed=failed) + + def show(self) -> ShowUnitOutput: + """Return the parsed `show-unit` command.""" + raw = self._juju.cli("show-unit", "--format", "json", self.name) + return json.loads(raw).get(self.name, {}) + + def relation_info(self) -> dict[int, RelationInfo]: + """Return the unit `relation-info` for `juju show-unit` output.""" + ret = {} + for item in self.show().get("relation-info", []): + if not (_id := item.get("relation-id")): + continue + + ret[_id] = RelationInfo( + app=self.app, + endpoint=item.get("endpoint", ""), + related_endpoint=item.get("related-endpoint", ""), + raw=dict(item), + ) + + return ret + + @property + def public_address(self) -> str: + """Unit public address.""" + return self.status.public_address + + @property + def workload_status(self) -> str: + """Return workload status.""" + self._update_status() + return self.status.workload_status.current + + @property + def workload_status_message(self) -> str: + """Return workload status message.""" + self._update_status() + return self.status.workload_status.message + + +class ApplicationAdapter: + """Application model adapter for libjuju.""" + + def __init__(self, name: str, juju: Juju): + self.name = name + self._juju = juju + + def add_unit( + self, + count: int = 1, + to: str | Iterable[str] | None = None, + attach_storage: Iterable[str] = [], + ): + """Add one or more units to this application.""" + _attach_storage = attach_storage if attach_storage else None + self._juju.add_unit(self.name, num_units=count, to=to, attach_storage=_attach_storage) + + add_units = add_unit + + def destroy_unit(self, *unit_names: str): + """Destroy units by name.""" + self._juju.remove_unit(*unit_names, destroy_storage=True) + + destroy_units = destroy_unit + + def remove_relation( + self, local_relation: str, remote_relation: str, block_until_done: bool = False + ): + """Remove a relation to another application.""" + self._juju.remove_relation(local_relation, remote_relation) + + def set_config(self, config: Mapping[str, ConfigValue]): + """Set configuration options for this application.""" + self._juju.config(self.name, values=config) + + @property + def relations(self) -> list[RelationInfo]: + """Application relations.""" + return ModelAdapter.get_relations(self.units).values() + + @property + def units(self) -> list[UnitAdapter]: + """Application units.""" + units = self._juju.status().apps[self.name].units + return [ + UnitAdapter(name=unit_name, app=self.name, status=unit_status, juju=self._juju) + for unit_name, unit_status in units.items() + ] + + +class ModelAdapter: + """Adapter for libjuju `Model` objects.""" + + def __init__(self, juju: Juju, wait_delay: float = 3.0): + self._juju = juju + self._delay = wait_delay + + def add_secret(self, name: str, data_args: Iterable[str], file: str = "", info: str = ""): + """Adds a secret with a list of key values. + + Equivalent to the cli command: + juju add-secret [options] [key[#base64|#file]=value...] + + :param name str: The name of the secret to be added. + :param data_args []str: The key value pairs to be added into the secret. + :param file str: A path to a yaml file containing secret key values. + :param info str: The secret description. + """ + pass + + def block_until( + self, *conditions: Callable, timeout: float | None = None, wait_period: float = 0.5 + ): + """Return only after all conditions are true.""" + # set a large enough timeout if no timeout is provided + _timeout = timeout if timeout else 1800 # 30 min. + # Adjust delay proportional to timeout to restrict `juju status` calls to 180. + # Min. delay will be 5s. + _delay = max(5, _timeout // 180) + self._juju.wait( + lambda status: all(c() for c in conditions), + timeout=_timeout, + successes=1, + delay=_delay, + ) + + def deploy( + self, + entity_url: str, + application_name: str | None = None, + bind: dict[str, str] = {}, # noqa + channel: str | None = None, + config: dict[str, ConfigValue] | None = None, + constraints: TDevices = None, + force: bool = False, + num_units: int = 1, + overlays: list[str] | None = None, + base: str | None = None, + resources: dict[str, str] | None = None, + series: str | None = None, + revision: str | int | None = None, + storage: Mapping[str, str] | None = None, + to: str | None = None, + devices: TDevices = None, + trust: bool = False, + attach_storage: list[str] | None = None, + ) -> None: + """Deploy a new service or bundle. + + :param str entity_url: Charm or bundle to deploy. Charm url or file path + :param str application_name: Name to give the service + :param dict bind: : pairs + :param str channel: Charm store channel from which to retrieve + the charm or bundle, e.g. 'edge' + :param dict config: Charm configuration dictionary + :param constraints: Service constraints + :type constraints: :class:`juju.Constraints` + :param bool force: Allow charm to be deployed to a machine running + an unsupported series + :param int num_units: Number of units to deploy + :param [] overlays: Bundles to overlay on the primary bundle, applied in order + :param str base: The base on which to deploy + :param dict resources: : pairs + :param str series: Series on which to deploy DEPRECATED: use --base (with Juju 3.1) + :param int revision: specifying a revision requires a channel for future upgrades for charms. + For bundles, revision and channel are mutually exclusive. + :param dict storage: optional storage constraints, in the form of `{label: constraint}`. + The label is a string specified by the charm, while the constraint is + a constraints.StorageConstraintsDict, or a string following + `the juju storage constraint directive format `_, + specifying the storage pool, number of volumes, and size of each volume. + :param to: Placement directive as a string. For example: + + '23' - place on machine 23 + 'lxd:7' - place in new lxd container on machine 7 + '24/lxd/3' - place in container 3 on machine 24 + + If None, a new machine is provisioned. + :param devices: charm device constraints + :param bool trust: Trust signifies that the charm should be deployed + with access to trusted credentials. Hooks run by the charm can access + cloud credentials and other trusted access credentials. + + :param str[] attach_storage: Existing storage to attach to the deployed unit + (not available on k8s models) + """ + _overlays = list(overlays) if overlays else [] + # For compatibility with libjuju num_units=0 for subordinate charms + kwargs = {} + if num_units > 0: + kwargs = {"num_units": num_units} + self._juju.deploy( + entity_url, + app=application_name, + attach_storage=attach_storage, + base=base, + bind=bind, + channel=channel, + config=config, + constraints=constraints, + force=force, + overlays=_overlays, + resources=resources, + revision=revision, + storage=storage, + to=to, + trust=trust, + **kwargs, + ) + + def destroy_unit( + self, + unit_id: str, + destroy_storage: bool = False, + dry_run: bool = False, + force: bool = False, + max_wait: float | None = None, + ): + """Destroy units by name.""" + self._juju.remove_unit(unit_id, destroy_storage=destroy_storage, force=force) + + def list_storage(self, filesystem: bool = False, volume: bool = False) -> list[TStorageInfo]: + """Lists storage details.""" + raw = self._juju.cli("list-storage", "--format", "json") + json_ = json.loads(raw) + ret = [] + for storage_key, storage_details in json_.get("storage", {}).items(): + ret.append({"key": storage_key, **storage_details}) + + return ret + + def relate(self, relation1: str, relation2: str): + """The relate function is deprecated in favor of integrate. + + The logic is the same. + """ + self._juju.integrate(relation1, relation2) + + add_relation = relate + integrate = relate + + def remove_application( + self, + app_name: str, + block_until_done: bool = False, + force: bool = False, + destroy_storage: bool = False, + no_wait: bool = False, + timeout: float | None = None, + ) -> None: + """Removes the given application from the model. + + :param str app_name: Name of the application + :param bool force: Completely remove an application and all its dependencies. (=false) + :param bool destroy_storage: Destroy storage attached to application unit. (=false) + :param bool no_wait: Rush through application removal without waiting for each individual step to complete (=false) + :param bool block_until_done: Ensure the app is removed from the + model when returned + :param int timeout: Raise asyncio.exceptions.TimeoutError if the application is not removed + within the timeout period. + """ + self._juju.remove_application(app_name, destroy_storage=destroy_storage, force=force) + if not block_until_done: + return + + self._juju.wait( + lambda status: app_name not in status.apps, + delay=self._delay, + timeout=timeout, + ) + + def set_config(self, config: Mapping[str, ConfigValue]): + """Set configuration options for this application.""" + self._juju.model_config(values=config) + + # TODO: add support for wait_for_... args + def wait_for_idle( + self, + apps: Iterable[str] | None = None, + raise_on_error: bool = True, + raise_on_blocked: bool = False, + wait_for_active: bool = False, + timeout: float | None = 10 * 60, + idle_period: float = 15, + check_freq: float = 0.5, + status: str | None = None, + wait_for_at_least_units: int | None = None, + wait_for_exact_units: int | None = None, + ) -> None: + """Wait for applications in the model to settle into an idle state. + + :param Iterable[str]|None apps: Optional list of specific app names to wait on. + If given, all apps must be present in the model and idle, while other + apps in the model can still be busy. If not given, all apps currently + in the model must be idle. + + :param bool raise_on_error: If True, then any unit or app going into + "error" status immediately raises either a JujuAppError or a JujuUnitError. + Note that machine or agent failures will always raise an exception (either + JujuMachineError or JujuAgentError), regardless of this param. The default + is True. + + :param bool raise_on_blocked: If True, then any unit or app going into + "blocked" status immediately raises either a JujuAppError or a JujuUnitError. + The default is False. + + :param bool wait_for_active: If True, then also wait for all unit workload + statuses to be "active" as well. The default is False. + + :param float timeout: How long to wait, in seconds, for the bundle settles + before raising an asyncio.TimeoutError. If None, will wait forever. + The default is 10 minutes. + + :param float idle_period: How long, in seconds, the agent statuses of all + units of all apps need to be `idle`. This delay is used to ensure that + any pending hooks have a chance to start to avoid false positives. + The default is 15 seconds. + Exact behaviour is undefined for very small values and 0. + + :param float check_freq: How frequently, in seconds, to check the model. + The default is every half-second. + + :param str status: The status to wait for. If None, not waiting. + The default is None (not waiting for any status). + + :param int wait_for_at_least_units: The least number of units to go into the idle + state. wait_for_idle will return after that many units are available (across all the + given applications). + The default is 1 unit. + + :param int wait_for_exact_units: The exact number of units to be expected before + going into the idle state. (e.g. useful for scaling down). + When set, takes precedence over the `wait_for_units` parameter. + """ + + def _all_idle_with_status(juju_status: Status, *apps: str): + return all_agents_idle(juju_status, *apps) and all_statuses_are( + status, juju_status, *apps + ) + + if status == "active" or wait_for_active: + wait_func = all_active_idle + else: + wait_func = _all_idle_with_status + + error_func = any_error if raise_on_error else None + delay = check_freq if check_freq else self._delay + _apps = apps if apps else list(self._juju.status().apps) + + self._juju.wait( + lambda juju_status: wait_func(juju_status, *_apps), + error=error_func, + delay=delay, + timeout=timeout, + successes=(idle_period) // delay, + ) + + @property + def applications(self) -> dict[str, ApplicationAdapter]: + """Return a mapping of application name: Application objects.""" + apps = self._juju.status().apps + return {app: ApplicationAdapter(app, self._juju) for app in apps} + + @property + def relations(self) -> list[RelationInfo]: + """Return a map of relation-id:Relation for all relations currently in the model.""" + return self.get_relations(self.units.values()).values() + + @property + def units(self) -> dict[str, UnitAdapter]: + ret = {} + for app in self.applications.values(): + for unit in app.units: + ret[unit.name] = unit + return ret + + @staticmethod + def get_relations(units: Iterable[UnitAdapter]) -> dict[int, RelationInfo]: + """Return a map of relation-id: RelationInfo for all relations currently in the model.""" + ret = {} + for unit in units: + for rel_id, rel_info in unit.relation_info().items(): + ret[rel_id] = rel_info + + return ret + + +class LibjujuExtensions: + """python-libjuju extensions for Jubilant.""" + + def __init__(self, juju: Juju): + self._juju = juju + + @contextmanager + def fast_forward(self, fast_interval: str = "10s", slow_interval: str | None = None): + self._juju.model_config({"update-status-hook-interval": fast_interval}) + yield + interval = slow_interval or "5m" + self._juju.model_config({"update-status-hook-interval": interval}) + + @property + def model(self) -> ModelAdapter: + """python-libjuju model adapter.""" + return ModelAdapter(self._juju) + + +class JujuFixture(Juju): + def __init__(self, *, model=None, wait_timeout=3 * 60, cli_binary=None): + super().__init__(model=model, wait_timeout=wait_timeout, cli_binary=cli_binary) + + @cached_property + def ext(self) -> LibjujuExtensions: + """python-libjuju extensions.""" + return LibjujuExtensions(self) + + +@contextmanager +def temp_model_fixture( + keep: bool = False, + controller: str | None = None, + cloud: str | None = None, + config: Mapping[str, ConfigValue] | None = None, + credential: str | None = None, +) -> Generator[JujuFixture]: + """Context manager to create a temporary model for running tests in.""" + juju = JujuFixture() + model = "jubilant-" + secrets.token_hex(4) # 4 bytes (8 hex digits) should be plenty + juju.add_model(model, cloud=cloud, controller=controller, config=config, credential=credential) + try: + yield juju + finally: + if not keep: + assert juju.model is not None + try: + # We're not using juju.destroy_model() here, as Juju doesn't provide a way + # to specify the timeout for the entire model destruction operation. + args = ["destroy-model", juju.model, "--no-prompt", "--destroy-storage", "--force"] + juju._cli(*args, include_model=False, timeout=10 * 60) + juju.model = None + except subprocess.TimeoutExpired as exc: + logger.error( + "timeout destroying model: %s\nStdout:\n%s\nStderr:\n%s", + exc, + exc.stdout, + exc.stderr, + ) diff --git a/tests/integration/backup_helpers.py b/tests/integration/backup_helpers.py index c5ee62b248..831ca0ffbf 100644 --- a/tests/integration/backup_helpers.py +++ b/tests/integration/backup_helpers.py @@ -3,11 +3,10 @@ # See LICENSE file for licensing details. import logging -from pytest_operator.plugin import OpsTest from tenacity import Retrying, stop_after_attempt, wait_exponential -from .helpers import ( - CHARM_BASE, +from .adapters import JujuFixture +from .jubilant_helpers import ( DATABASE_APP_NAME, db_connect, get_password, @@ -20,8 +19,8 @@ CANNOT_RESTORE_PITR = "cannot restore PITR, juju debug-log for details" -async def backup_deploy( - ops_test: OpsTest, +def backup_deploy( + juju: JujuFixture, s3_integrator_app_name: str, tls_certificates_app_name: str | None, tls_channel, @@ -32,51 +31,50 @@ async def backup_deploy( ) -> str: use_tls = all([tls_certificates_app_name, tls_channel]) # Deploy S3 Integrator and TLS Certificates Operator. - await ops_test.model.deploy(s3_integrator_app_name) + juju.ext.model.deploy(s3_integrator_app_name) if use_tls: - await ops_test.model.deploy(tls_certificates_app_name, channel=tls_channel) + juju.ext.model.deploy(tls_certificates_app_name, channel=tls_channel) # Deploy and relate PostgreSQL to S3 integrator (one database app for each cloud for now # as archive_mode is disabled after restoring the backup) and to TLS Certificates Operator # (to be able to create backups from replicas). database_app_name = f"{DATABASE_APP_NAME}-{cloud.lower()}" - await ops_test.model.deploy( + juju.ext.model.deploy( charm, application_name=database_app_name, num_units=2, - base=CHARM_BASE, config={"profile": "testing"}, ) if use_tls: - await ops_test.model.relate( + juju.ext.model.relate( f"{database_app_name}:client-certificates", f"{tls_certificates_app_name}:certificates" ) - await ops_test.model.relate( + juju.ext.model.relate( f"{database_app_name}:peer-certificates", f"{tls_certificates_app_name}:certificates" ) - async with ops_test.fast_forward(fast_interval="60s"): - await ops_test.model.wait_for_idle(apps=[database_app_name], status="active", timeout=1000) + with juju.ext.fast_forward(fast_interval="60s"): + juju.ext.model.wait_for_idle(apps=[database_app_name], status="active", timeout=1000) # Configure and set access and secret keys. logger.info(f"configuring S3 integrator for {cloud}") - await ops_test.model.applications[s3_integrator_app_name].set_config(config) - action = await ops_test.model.units.get(f"{s3_integrator_app_name}/0").run_action( + juju.ext.model.applications[s3_integrator_app_name].set_config(config) + action = juju.ext.model.units.get(f"{s3_integrator_app_name}/0").run_action( "sync-s3-credentials", **credentials, ) - await action.wait() + action.wait() - await ops_test.model.relate(database_app_name, s3_integrator_app_name) - async with ops_test.fast_forward(fast_interval="60s"): - await ops_test.model.wait_for_idle( + juju.ext.model.relate(database_app_name, s3_integrator_app_name) + with juju.ext.fast_forward(fast_interval="60s"): + juju.ext.model.wait_for_idle( apps=[database_app_name, s3_integrator_app_name], status="active", timeout=1500 ) return database_app_name -async def backup_operations( - ops_test: OpsTest, +def backup_operations( + juju: JujuFixture, s3_integrator_app_name: str, tls_certificates_app_name: str | None, tls_channel, @@ -86,8 +84,8 @@ async def backup_operations( charm, ) -> None: """Basic set of operations for backup testing in different cloud providers.""" - database_app_name = await backup_deploy( - ops_test, + database_app_name = backup_deploy( + juju, s3_integrator_app_name, tls_certificates_app_name, tls_channel, @@ -97,15 +95,15 @@ async def backup_operations( charm, ) - primary = await get_primary(ops_test, f"{database_app_name}/0") - for unit in ops_test.model.applications[database_app_name].units: + primary = get_primary(juju, f"{database_app_name}/0") + for unit in juju.ext.model.applications[database_app_name].units: if unit.name != primary: replica = unit.name break # Write some data. - password = await get_password(ops_test, database_app_name=database_app_name) - address = get_unit_address(ops_test, primary) + password = get_password(database_app_name=database_app_name) + address = get_unit_address(juju, primary) logger.info("creating a table in the database") with db_connect(host=address, password=password) as connection: connection.autocommit = True @@ -116,24 +114,24 @@ async def backup_operations( # Run the "create backup" action. logger.info("creating a backup") - action = await ops_test.model.units.get(replica).run_action("create-backup") - await action.wait() + action = juju.ext.model.units.get(replica).run_action("create-backup") + action.wait() backup_status = action.results.get("backup-status") assert backup_status, "backup hasn't succeeded" - await ops_test.model.wait_for_idle( + juju.ext.model.wait_for_idle( apps=[database_app_name, s3_integrator_app_name], status="active", timeout=1000 ) # With a stable cluster, Run the "create backup" action - async with ops_test.fast_forward(): - await ops_test.model.wait_for_idle(status="active", timeout=1000, idle_period=30) + with juju.ext.fast_forward(): + juju.ext.model.wait_for_idle(status="active", timeout=1000, idle_period=30) logger.info("listing the available backups") - action = await ops_test.model.units.get(replica).run_action("list-backups") - await action.wait() + action = juju.ext.model.units.get(replica).run_action("list-backups") + action.wait() backups = action.results.get("backups") # 5 lines for header output, 1 backup line ==> 6 total lines assert len(backups.split("\n")) == 6, "full backup is not outputted" - await ops_test.model.wait_for_idle(status="active", timeout=1000) + juju.ext.model.wait_for_idle(status="active", timeout=1000) # Write some data. logger.info("creating a second table in the database") @@ -144,23 +142,23 @@ async def backup_operations( # Run the "create backup" action. logger.info("creating a backup") - action = await ops_test.model.units.get(replica).run_action( + action = juju.ext.model.units.get(replica).run_action( "create-backup", **{"type": "differential"} ) - await action.wait() + action.wait() backup_status = action.results.get("backup-status") assert backup_status, "backup hasn't succeeded" - async with ops_test.fast_forward(): - await ops_test.model.wait_for_idle(status="active", timeout=1000) + with juju.ext.fast_forward(): + juju.ext.model.wait_for_idle(status="active", timeout=1000) # Run the "list backups" action. logger.info("listing the available backups") - action = await ops_test.model.units.get(replica).run_action("list-backups") - await action.wait() + action = juju.ext.model.units.get(replica).run_action("list-backups") + action.wait() backups = action.results.get("backups") # 5 lines for header output, 2 backup lines ==> 7 total lines assert len(backups.split("\n")) == 7, "differential backup is not outputted" - await ops_test.model.wait_for_idle(status="active", timeout=1000) + juju.ext.model.wait_for_idle(status="active", timeout=1000) # Write some data. logger.info("creating a second table in the database") @@ -169,13 +167,13 @@ async def backup_operations( connection.cursor().execute("CREATE TABLE backup_table_3 (test_collumn INT );") connection.close() # Scale down to be able to restore. - async with ops_test.fast_forward(): - await ops_test.model.destroy_unit(replica) - await ops_test.model.block_until( - lambda: len(ops_test.model.applications[database_app_name].units) == 1 + with juju.ext.fast_forward(): + juju.ext.model.destroy_unit(replica) + juju.ext.model.block_until( + lambda: len(juju.ext.model.applications[database_app_name].units) == 1 ) - for unit in ops_test.model.applications[database_app_name].units: + for unit in juju.ext.model.applications[database_app_name].units: remaining_unit = unit break @@ -187,19 +185,19 @@ async def backup_operations( logger.info("restoring the backup") last_diff_backup = backups.split("\n")[-1] backup_id = last_diff_backup.split()[0] - action = await remaining_unit.run_action("restore", **{"backup-id": backup_id}) - await action.wait() + action = remaining_unit.run_action("restore", **{"backup-id": backup_id}) + action.wait() restore_status = action.results.get("restore-status") assert restore_status, "restore hasn't succeeded" # Wait for the restore to complete. - async with ops_test.fast_forward(): - await ops_test.model.wait_for_idle(status="active", timeout=1000) + with juju.ext.fast_forward(): + juju.ext.model.wait_for_idle(status="active", timeout=1000) # Check that the backup was correctly restored by having only the first created table. logger.info("checking that the backup was correctly restored") - primary = await get_primary(ops_test, remaining_unit.name) - address = get_unit_address(ops_test, primary) + primary = get_primary(juju, remaining_unit.name) + address = get_unit_address(juju, primary) with db_connect(host=address, password=password) as connection, connection.cursor() as cursor: cursor.execute( "SELECT EXISTS (SELECT FROM information_schema.tables" @@ -232,18 +230,18 @@ async def backup_operations( logger.info("restoring the backup") last_full_backup = backups.split("\n")[-2] backup_id = last_full_backup.split()[0] - action = await remaining_unit.run_action("restore", **{"backup-id": backup_id}) - await action.wait() + action = remaining_unit.run_action("restore", **{"backup-id": backup_id}) + action.wait() restore_status = action.results.get("restore-status") assert restore_status, "restore hasn't succeeded" # Wait for the restore to complete. - async with ops_test.fast_forward(): - await ops_test.model.wait_for_idle(status="active", timeout=1000) + with juju.ext.fast_forward(): + juju.ext.model.wait_for_idle(status="active", timeout=1000) # Check that the backup was correctly restored by having only the first created table. - primary = await get_primary(ops_test, remaining_unit.name) - address = get_unit_address(ops_test, primary) + primary = get_primary(juju, remaining_unit.name) + address = get_unit_address(juju, primary) logger.info("checking that the backup was correctly restored") with db_connect(host=address, password=password) as connection, connection.cursor() as cursor: cursor.execute( @@ -270,8 +268,8 @@ async def backup_operations( connection.close() -async def pitr_backup_operations( - ops_test: OpsTest, +def pitr_backup_operations( + juju: JujuFixture, s3_integrator_app_name: str, tls_certificates_app_name: str | None, tls_channel, @@ -289,8 +287,8 @@ async def pitr_backup_operations( 4: check_td1 -> check_not_td2 -> check_td3 -> check_not_td4 """ use_tls = all([tls_certificates_app_name, tls_channel]) - database_app_name = await backup_deploy( - ops_test, + database_app_name = backup_deploy( + juju, s3_integrator_app_name, tls_certificates_app_name, tls_channel, @@ -300,24 +298,24 @@ async def pitr_backup_operations( charm, ) - primary = await get_primary(ops_test, f"{database_app_name}/0") - for unit in ops_test.model.applications[database_app_name].units: + primary = get_primary(juju, f"{database_app_name}/0") + for unit in juju.ext.model.applications[database_app_name].units: if unit.name != primary: replica = unit.name break - password = await get_password(ops_test, database_app_name=database_app_name) - address = get_unit_address(ops_test, primary) + password = get_password(database_app_name=database_app_name) + address = get_unit_address(juju, primary) logger.info("1: creating table") _create_table(address, password) logger.info("1: creating backup b1") - action = await ops_test.model.units.get(replica).run_action("create-backup") - await action.wait() + action = juju.ext.model.units.get(replica).run_action("create-backup") + action.wait() backup_status = action.results.get("backup-status") assert backup_status, "backup hasn't succeeded" - await ops_test.model.wait_for_idle(status="active", timeout=1000) - backup_b1 = await _get_most_recent_backup(ops_test, ops_test.model.units.get(replica)) + juju.ext.model.wait_for_idle(status="active", timeout=1000) + backup_b1 = _get_most_recent_backup(juju, juju.ext.model.units.get(replica)) logger.info("1: creating test data td1") _insert_test_data("test_data_td1", address, password) @@ -339,30 +337,30 @@ async def pitr_backup_operations( _switch_wal(address, password) logger.info("1: scaling down to do restore") - async with ops_test.fast_forward(): - await ops_test.model.destroy_unit(replica) - await ops_test.model.wait_for_idle(status="active", timeout=1000) - for unit in ops_test.model.applications[database_app_name].units: + with juju.ext.fast_forward(): + juju.ext.model.destroy_unit(replica) + juju.ext.model.wait_for_idle(status="active", timeout=1000) + for unit in juju.ext.model.applications[database_app_name].units: remaining_unit = unit break logger.info("1: restoring the backup b1 with bad restore-to-time parameter") - action = await remaining_unit.run_action( + action = remaining_unit.run_action( "restore", **{"backup-id": backup_b1, "restore-to-time": "bad data"} ) - await action.wait() + action.wait() assert action.status == "failed", ( "1: restore must fail with bad restore-to-time parameter, but that action succeeded" ) logger.info("1: restoring the backup b1 with unreachable restore-to-time parameter") - action = await remaining_unit.run_action( + action = remaining_unit.run_action( "restore", **{"backup-id": backup_b1, "restore-to-time": unreachable_timestamp_ts1} ) - await action.wait() + action.wait() logger.info("1: waiting for the database charm to become blocked after restore") - async with ops_test.fast_forward(): - await ops_test.model.block_until( + with juju.ext.fast_forward(): + juju.ext.model.block_until( lambda: remaining_unit.workload_status_message == CANNOT_RESTORE_PITR, timeout=1000, ) @@ -375,18 +373,16 @@ async def pitr_backup_operations( ): with attempt: logger.info("1: restoring to the timestamp ts1") - action = await remaining_unit.run_action( - "restore", **{"restore-to-time": timestamp_ts1} - ) - await action.wait() + action = remaining_unit.run_action("restore", **{"restore-to-time": timestamp_ts1}) + action.wait() restore_status = action.results.get("restore-status") assert restore_status, "1: restore to the timestamp ts1 hasn't succeeded" - await ops_test.model.wait_for_idle(status="active", timeout=1000, idle_period=30) + juju.ext.model.wait_for_idle(status="active", timeout=1000, idle_period=30) logger.info("2: successful restore") - primary = await get_primary(ops_test, remaining_unit.name) - address = get_unit_address(ops_test, primary) - timeline_t2 = await _get_most_recent_backup(ops_test, remaining_unit) + primary = get_primary(juju, remaining_unit.name) + address = get_unit_address(juju, primary) + timeline_t2 = _get_most_recent_backup(juju, remaining_unit) assert backup_b1 != timeline_t2, "2: timeline 2 do not exist in list-backups action or bad" logger.info("2: checking test data td1") @@ -417,18 +413,18 @@ async def pitr_backup_operations( ): with attempt: logger.info("2: restoring the backup b1 to the latest") - action = await remaining_unit.run_action( + action = remaining_unit.run_action( "restore", **{"backup-id": backup_b1, "restore-to-time": "latest"} ) - await action.wait() + action.wait() restore_status = action.results.get("restore-status") assert restore_status, "2: restore the backup b1 to the latest hasn't succeeded" - await ops_test.model.wait_for_idle(status="active", timeout=1000, idle_period=30) + juju.ext.model.wait_for_idle(status="active", timeout=1000, idle_period=30) logger.info("3: successful restore") - primary = await get_primary(ops_test, remaining_unit.name) - address = get_unit_address(ops_test, primary) - timeline_t3 = await _get_most_recent_backup(ops_test, remaining_unit) + primary = get_primary(juju, remaining_unit.name) + address = get_unit_address(juju, primary) + timeline_t3 = _get_most_recent_backup(juju, remaining_unit) assert backup_b1 != timeline_t3 and timeline_t2 != timeline_t3, ( "3: timeline 3 do not exist in list-backups action or bad" ) @@ -457,18 +453,18 @@ async def pitr_backup_operations( ): with attempt: logger.info("3: restoring the timeline 2 to the latest") - action = await remaining_unit.run_action( + action = remaining_unit.run_action( "restore", **{"backup-id": timeline_t2, "restore-to-time": "latest"} ) - await action.wait() + action.wait() restore_status = action.results.get("restore-status") assert restore_status, "3: restore the timeline 2 to the latest hasn't succeeded" - await ops_test.model.wait_for_idle(status="active", timeout=1000, idle_period=30) + juju.ext.model.wait_for_idle(status="active", timeout=1000, idle_period=30) logger.info("4: successful restore") - primary = await get_primary(ops_test, remaining_unit.name) - address = get_unit_address(ops_test, primary) - timeline_t4 = await _get_most_recent_backup(ops_test, remaining_unit) + primary = get_primary(juju, remaining_unit.name) + address = get_unit_address(juju, primary) + timeline_t4 = _get_most_recent_backup(juju, remaining_unit) assert ( backup_b1 != timeline_t4 and timeline_t2 != timeline_t4 and timeline_t3 != timeline_t4 ), "4: timeline 4 do not exist in list-backups action or bad" @@ -495,18 +491,16 @@ async def pitr_backup_operations( ): with attempt: logger.info("4: restoring to the timestamp ts2") - action = await remaining_unit.run_action( - "restore", **{"restore-to-time": timestamp_ts2} - ) - await action.wait() + action = remaining_unit.run_action("restore", **{"restore-to-time": timestamp_ts2}) + action.wait() restore_status = action.results.get("restore-status") assert restore_status, "4: restore to the timestamp ts2 hasn't succeeded" - await ops_test.model.wait_for_idle(status="active", timeout=1000, idle_period=30) + juju.ext.model.wait_for_idle(status="active", timeout=1000, idle_period=30) logger.info("5: successful restore") - primary = await get_primary(ops_test, remaining_unit.name) - address = get_unit_address(ops_test, primary) - timeline_t5 = await _get_most_recent_backup(ops_test, remaining_unit) + primary = get_primary(juju, remaining_unit.name) + address = get_unit_address(juju, primary) + timeline_t5 = _get_most_recent_backup(juju, remaining_unit) assert ( backup_b1 != timeline_t5 and timeline_t2 != timeline_t5 @@ -531,10 +525,10 @@ async def pitr_backup_operations( ) # Remove the database app. - await ops_test.model.remove_application(database_app_name, block_until_done=True) + juju.ext.model.remove_application(database_app_name, block_until_done=True) if use_tls: # Remove the TLS operator. - await ops_test.model.remove_application(tls_certificates_app_name, block_until_done=True) + juju.ext.model.remove_application(tls_certificates_app_name, block_until_done=True) def _create_table(host: str, password: str): @@ -572,12 +566,12 @@ def _switch_wal(host: str, password: str): connection.close() -async def _get_most_recent_backup(ops_test: OpsTest, unit: any) -> str: +def _get_most_recent_backup(juju: JujuFixture, unit: any) -> str: logger.info("listing the available backups") - action = await unit.run_action("list-backups") - await action.wait() + action = unit.run_action("list-backups") + action.wait() backups = action.results.get("backups") assert backups, "backups not outputted" - await ops_test.model.wait_for_idle(status="active", timeout=1000) + juju.ext.model.wait_for_idle(status="active", timeout=1000) most_recent_backup = backups.split("\n")[-1] return most_recent_backup.split()[0] diff --git a/tests/integration/conftest.py b/tests/integration/conftest.py index 3ed241fe5f..a2091022fc 100644 --- a/tests/integration/conftest.py +++ b/tests/integration/conftest.py @@ -9,11 +9,11 @@ import uuid import boto3 -import jubilant import pytest from pytest_operator.plugin import OpsTest from . import architecture +from .adapters import JujuFixture, temp_model_fixture from .helpers import construct_endpoint from .jubilant_helpers import RoleAttributeValue @@ -74,7 +74,7 @@ def cleanup_cloud(config: dict[str, str], credentials: dict[str, str]) -> None: @pytest.fixture(scope="module") -async def aws_cloud_configs(ops_test: OpsTest) -> None: +def aws_cloud_configs(ops_test: OpsTest): if ( not os.environ.get("AWS_ACCESS_KEY", "").strip() or not os.environ.get("AWS_SECRET_KEY", "").strip() @@ -89,7 +89,7 @@ async def aws_cloud_configs(ops_test: OpsTest) -> None: @pytest.fixture(scope="module") -async def gcp_cloud_configs(ops_test: OpsTest) -> None: +def gcp_cloud_configs(ops_test: OpsTest): if ( not os.environ.get("GCP_ACCESS_KEY", "").strip() or not os.environ.get("GCP_SECRET_KEY", "").strip() @@ -113,10 +113,10 @@ def juju(request: pytest.FixtureRequest): keep_models = bool(request.config.getoption("--keep-models")) if model: - juju = jubilant.Juju(model=model) + juju = JujuFixture(model=model) yield juju else: - with jubilant.temp_model(keep=keep_models) as juju: + with temp_model_fixture(keep=keep_models) as juju: yield juju diff --git a/tests/integration/jubilant_helpers.py b/tests/integration/jubilant_helpers.py index 6af8eef24a..51653ede63 100644 --- a/tests/integration/jubilant_helpers.py +++ b/tests/integration/jubilant_helpers.py @@ -1,17 +1,40 @@ # Copyright 2025 Canonical Ltd. # See LICENSE file for licensing details. +import itertools import json import logging +import os +import random import subprocess +import tempfile import time +import zipfile +from datetime import datetime from enum import Enum +from pathlib import Path +import botocore import jubilant import psycopg2 - -from constants import PEER - +import pytest +import requests +import yaml +from tenacity import ( + RetryError, + Retrying, + retry, + retry_if_result, + stop_after_attempt, + stop_after_delay, + wait_exponential, + wait_fixed, +) + +from constants import DATABASE_DEFAULT_NAME, PEER, SYSTEM_USERS_PASSWORD_CONFIG + +from .adapters import JujuFixture, ModelAdapter, UnitAdapter +from .ha_tests.helpers import ProcessError from .helpers import DATABASE_APP_NAME, SecretNotFoundError logger = logging.getLogger(__name__) @@ -431,3 +454,1255 @@ def check_for_fix_log_message(juju: jubilant.Juju, unit_name: str) -> bool: "This may indicate the code path was not triggered or permissions were already correct." ) return False + + +################################################# +# # +# Adapted Helpers # +# # +################################################# + + +CHARM_BASE = "ubuntu@22.04" +METADATA = yaml.safe_load(Path("./metadata.yaml").read_text()) +DATABASE_APP_NAME = METADATA["name"] +STORAGE_PATH = METADATA["storage"]["data"]["location"] +APPLICATION_NAME = "postgresql-test-app" +DATA_INTEGRATOR_APP_NAME = "data-integrator" + + +logger = logging.getLogger(__name__) + + +def build_connection_string( + juju: JujuFixture, + application_name: str, + relation_name: str, + read_only_endpoint: bool = False, + remote_unit_name: str | None = None, +) -> str | None: + """Returns a PostgreSQL connection string. + + Args: + juju: The Juju fixture + application_name: The name of the application + relation_name: name of the relation to get connection data from + read_only_endpoint: whether to choose the read-only endpoint + instead of the read/write endpoint + remote_unit_name: Optional remote unit name used to retrieve + unit data instead of application data + + Returns: + a PostgreSQL connection string + """ + unit_name = f"{application_name}/0" + raw_data = juju.cli("show-unit", unit_name) + if not raw_data: + raise ValueError(f"no unit info could be grabbed for {unit_name}") + data = yaml.safe_load(raw_data) + # Filter the data based on the relation name. + relation_data = [ + v for v in data[unit_name]["relation-info"] if v["related-endpoint"] == relation_name + ] + if len(relation_data) == 0: + raise ValueError( + f"no relation data could be grabbed on relation with endpoint {relation_name}" + ) + if remote_unit_name: + data = relation_data[0]["related-units"][remote_unit_name]["data"] + else: + data = relation_data[0]["application-data"] + if read_only_endpoint: + if data.get("standbys") is None: + return None + return data.get("standbys").split(",")[0] + else: + return data.get("master") + + +def change_primary_start_timeout( + juju: JujuFixture, unit_name: str, seconds: int | None, password: str +) -> None: + """Change primary start timeout configuration. + + Args: + juju: juju instance. + unit_name: the unit used to set the configuration. + seconds: number of seconds to set in primary_start_timeout configuration. + password: Patroni password. + """ + for attempt in Retrying(stop=stop_after_delay(30 * 2), wait=wait_fixed(3)): + with attempt: + unit_ip = get_unit_address(juju, unit_name) + requests.patch( + f"https://{unit_ip}:8008/config", + json={"primary_start_timeout": seconds}, + verify=False, + auth=requests.auth.HTTPBasicAuth("patroni", password), + ) + + +def get_patroni_cluster(unit_ip: str) -> dict[str, str]: + resp = requests.get(f"https://{unit_ip}:8008/cluster", verify=False) + return resp.json() + + +def assert_sync_standbys(unit_ip: str, standbys: int) -> None: + for attempt in Retrying(stop=stop_after_delay(60), wait=wait_fixed(3), reraise=True): + with attempt: + cluster = get_patroni_cluster(unit_ip) + cluster_standbys = 0 + for member in cluster["members"]: + if member["role"] == "sync_standby": + cluster_standbys += 1 + assert cluster_standbys >= standbys, "Less than expected standbys" + + +def check_database_users_existence( + juju: JujuFixture, + users_that_should_exist: list[str], + users_that_should_not_exist: list[str], +) -> None: + """Checks that applications users exist in the database. + + Args: + juju: The Juju fixture + users_that_should_exist: List of users that should exist in the database + users_that_should_not_exist: List of users that should not exist in the database + """ + unit = juju.ext.model.applications[DATABASE_APP_NAME].units[0] + unit_address = unit.get_public_address() + password = get_password() + + # Retrieve all users in the database. + users_in_db = execute_query_on_unit( + unit_address, + password, + "SELECT usename FROM pg_catalog.pg_user;", + ) + + # Assert users that should exist. + for user in users_that_should_exist: + assert user in users_in_db + + # Assert users that should not exist. + for user in users_that_should_not_exist: + assert user not in users_in_db + + +def check_databases_creation(juju: JujuFixture, databases: list[str]) -> None: + """Checks that database and tables are successfully created for the application. + + Args: + juju: The Juju fixture + databases: List of database names that should have been created + """ + password = get_password() + + for unit in juju.ext.model.applications[DATABASE_APP_NAME].units: + unit_address = unit.public_address + + for database in databases: + # Ensure database exists in PostgreSQL. + output = execute_query_on_unit( + unit_address, + password, + "SELECT datname FROM pg_database;", + ) + assert database in output + + # Ensure that application tables exist in the database + output = execute_query_on_unit( + unit_address, + password, + "SELECT table_name FROM information_schema.tables;", + database=database, + ) + assert len(output) + + +@retry( + retry=retry_if_result(lambda x: not x), + stop=stop_after_attempt(10), + wait=wait_exponential(multiplier=1, min=2, max=30), +) +def check_patroni(juju: JujuFixture, unit_name: str, restart_time: float) -> bool: + """Check if Patroni is running correctly on a specific unit. + + Args: + juju: The Juju fixture instance + unit_name: The name of the unit + restart_time: Point in time before the unit was restarted. + + Returns: + whether Patroni is running correctly. + """ + unit_ip = get_unit_address(juju, unit_name) + health_info = requests.get(f"https://{unit_ip}:8008/health", verify=False).json() + postmaster_start_time = datetime.strptime( + health_info["postmaster_start_time"], "%Y-%m-%d %H:%M:%S.%f%z" + ).timestamp() + return postmaster_start_time > restart_time and health_info["state"] == "running" + + +def check_cluster_members(juju: JujuFixture, application_name: str) -> None: + """Check that the correct members are part of the cluster. + + Args: + juju: The Juju fixture instance + application_name: The name of the application + """ + for attempt in Retrying( + stop=stop_after_attempt(10), wait=wait_exponential(multiplier=1, min=2, max=30) + ): + with attempt: + any_unit_name = juju.ext.model.applications[application_name].units[0].name + primary = get_primary(juju, any_unit_name) + address = get_unit_address(juju, primary) + + expected_members = get_application_units(juju, application_name) + expected_members_ips = get_application_units_ips(juju, application_name) + + r = requests.get(f"https://{address}:8008/cluster", verify=False) + assert [member["name"] for member in r.json()["members"]] == expected_members + assert [member["host"] for member in r.json()["members"]] == expected_members_ips + + +def construct_endpoint(endpoint: str, region: str) -> str: + """Construct the S3 service endpoint using the region. + + This is needed when the provided endpoint is from AWS, and it doesn't contain the region. + """ + # Load endpoints data. + loader = botocore.loaders.create_loader() + data = loader.load_data("endpoints") + + # Construct the endpoint using the region. + resolver = botocore.regions.EndpointResolver(data) + endpoint_data = resolver.construct_endpoint("s3", region) + + # Use the built endpoint if it is an AWS endpoint. + if endpoint_data and endpoint.endswith(endpoint_data["dnsSuffix"]): + endpoint = f"{endpoint.split('://')[0]}://{endpoint_data['hostname']}" + + return endpoint + + +def convert_records_to_dict(records: list[tuple]) -> dict: + """Converts psycopg2 records list to a dict.""" + records_dict = {} + for record in records: + # Add record tuple data to dict. + records_dict[record[0]] = record[1] + return records_dict + + +def count_switchovers(juju: JujuFixture, unit_name: str) -> int: + """Return the number of performed switchovers.""" + unit_address = get_unit_address(juju, unit_name) + switchover_history_info = requests.get(f"https://{unit_address}:8008/history", verify=False) + return len(switchover_history_info.json()) + + +def db_connect( + host: str, password: str, username: str = "operator", database: str = "postgres" +) -> psycopg2.extensions.connection: + """Returns psycopg2 connection object linked to postgres db in the given host. + + Args: + host: the IP of the postgres host + password: user password + username: username to connect with + database: database to connect to + + Returns: + psycopg2 connection object linked to postgres db, under "operator" user. + """ + return psycopg2.connect( + f"dbname='{database}' user='{username}' host='{host}' password='{password}' connect_timeout=10" + ) + + +def deploy_and_relate_application_with_postgresql( + juju: JujuFixture, + charm: str, + application_name: str, + number_of_units: int, + config: dict | None = None, + channel: str = "stable", + relation: str = "db", + series: str | None = None, +) -> int: + """Helper function to deploy and relate application with PostgreSQL. + + Args: + juju: The Juju fixture. + charm: Charm identifier. + application_name: The name of the application to deploy. + number_of_units: The number of units to deploy. + config: Extra config options for the application. + channel: The channel to use for the charm. + relation: Name of the PostgreSQL relation to relate + the application to. + series: Series of the charm to deploy. + + Returns: + the id of the created relation. + """ + # Deploy application. + juju.ext.model.deploy( + charm, + channel=channel, + application_name=application_name, + num_units=number_of_units, + config=config, + series=series, + ) + juju.ext.model.wait_for_idle( + apps=[application_name], + status="active", + raise_on_blocked=False, + timeout=1500, + ) + + # Relate application to PostgreSQL. + relation = juju.ext.model.relate(f"{application_name}", f"{DATABASE_APP_NAME}:{relation}") + juju.ext.model.wait_for_idle( + apps=[application_name], + status="active", + raise_on_blocked=False, # Application that needs a relation is blocked initially. + timeout=1500, + ) + + return relation.id + + +def deploy_and_relate_bundle_with_postgresql( + juju: JujuFixture, + bundle_name: str, + main_application_name: str, + main_application_num_units: int | None = None, + relation_name: str = "db", + status: str = "active", + status_message: str | None = None, + overlay: dict | None = None, + timeout: int = 2000, +) -> str: + """Helper function to deploy and relate a bundle with PostgreSQL. + + Args: + juju: The Juju fixture. + bundle_name: The name of the bundle to deploy. + main_application_name: The name of the application that should be + related to PostgreSQL. + main_application_num_units: Optional number of units for the main + application. + relation_name: The name of the relation to use in PostgreSQL + (db or db-admin). + status: Status to wait for in the application after relating + it to PostgreSQL. + status_message: Status message to wait for in the application after + relating it to PostgreSQL. + overlay: Optional overlay to be used when deploying the bundle. + timeout: Timeout to wait for the deployment to idle. + """ + # Deploy the bundle. + with tempfile.NamedTemporaryFile(dir=os.getcwd()) as original: + # Download the original bundle. + juju.cli("download", bundle_name, "--filepath", original.name, include_model=False) + + # Open the bundle compressed file and update the contents + # of the bundle.yaml file to deploy it. + with zipfile.ZipFile(original.name, "r") as archive: + bundle_yaml = archive.read("bundle.yaml") + data = yaml.load(bundle_yaml, Loader=yaml.FullLoader) + + if main_application_num_units is not None: + data["applications"][main_application_name]["num_units"] = ( + main_application_num_units + ) + + # Save the list of relations other than `db` and `db-admin`, + # so we can add them back later. + other_relations = [ + relation for relation in data["relations"] if "postgresql" in relation + ] + + # Remove PostgreSQL and relations with it from the bundle.yaml file. + config = data["applications"]["postgresql"]["options"] + if config.get("experimental_max_connections", 0) > 200: + config["experimental_max_connections"] = 200 + for key, val in config.items(): + config[key] = str(val) + logger.info(f"Bundle {bundle_name} needs configuration {config}") + juju.ext.model.applications[DATABASE_APP_NAME].set_config(config) + del data["applications"]["postgresql"] + data["relations"] = [ + relation + for relation in data["relations"] + if "postgresql" not in relation + and "postgresql:db" not in relation + and "postgresql:db-admin" not in relation + ] + + # Write the new bundle content to a temporary file and deploy it. + with tempfile.NamedTemporaryFile(dir=os.getcwd()) as patched: + patched.write(yaml.dump(data).encode("utf_8")) + patched.seek(0) + if overlay is not None: + with tempfile.NamedTemporaryFile() as overlay_file: + overlay_file.write(yaml.dump(overlay).encode("utf_8")) + overlay_file.seek(0) + juju.deploy(patched.name, overlays=[overlay_file.name]) + else: + juju.deploy(patched.name) + + with juju.ext.fast_forward(fast_interval="60s"): + # Relate application to PostgreSQL. + relation = juju.ext.model.relate( + main_application_name, f"{DATABASE_APP_NAME}:{relation_name}" + ) + + # Restore previous existing relations. + for other_relation in other_relations: + juju.ext.model.relate(other_relation[0], other_relation[1]) + + # Wait for the deployment to complete. + unit = juju.ext.model.units.get(f"{main_application_name}/0") + juju.ext.model.wait_for_idle( + apps=[DATABASE_APP_NAME], + status="active", + timeout=timeout, + ) + juju.ext.model.wait_for_idle( + apps=[main_application_name], + raise_on_blocked=False, + status=status, + timeout=timeout, + ) + if status_message: + juju.ext.model.block_until( + lambda: unit.workload_status_message == status_message, + timeout=timeout, + ) + + return relation.id # FIXME + + +def ensure_correct_relation_data( + juju: JujuFixture, database_units: int, app_name: str, relation_name: str +) -> None: + """Asserts that the correct database relation data is shared from the right unit to the app.""" + primary = get_primary(juju, f"{DATABASE_APP_NAME}/0") + for unit_number in range(database_units): + for attempt in Retrying( + stop=stop_after_attempt(10), wait=wait_exponential(multiplier=1, min=2, max=30) + ): + with attempt: + unit_name = f"{DATABASE_APP_NAME}/{unit_number}" + primary_connection_string = build_connection_string( + juju, app_name, relation_name, remote_unit_name=unit_name + ) + replica_connection_string = build_connection_string( + juju, + app_name, + relation_name, + read_only_endpoint=True, + remote_unit_name=unit_name, + ) + unit_ip = get_unit_address(juju, unit_name) + host_parameter = f"host={unit_ip} " + if unit_name == primary: + logger.info(f"Expected primary: {unit_ip}") + logger.info(f"Primary conn string: {primary_connection_string}") + logger.info(f"Replica conn string: {replica_connection_string}") + assert host_parameter in primary_connection_string, ( + f"{unit_name} is not the host of the primary connection string" + ) + assert host_parameter not in replica_connection_string, ( + f"{unit_name} is the host of the replica connection string" + ) + + +def execute_query_on_unit( + unit_address: str, + password: str, + query: str, + database: str = DATABASE_DEFAULT_NAME, + sslmode: str | None = None, +): + """Execute given PostgreSQL query on a unit. + + Args: + unit_address: The public IP address of the unit to execute the query on. + password: The PostgreSQL superuser password. + query: Query to execute. + database: Optional database to connect to (defaults to postgres database). + sslmode: Optional ssl mode to use (defaults to None). + + Returns: + A list of rows that were potentially returned from the query. + """ + extra_connection_parameters = f"sslmode={sslmode}" if sslmode else "" + with ( + psycopg2.connect( + f"dbname='{database}' user='operator' host='{unit_address}'" + f"password='{password}' connect_timeout=10 {extra_connection_parameters}" + ) as connection, + connection.cursor() as cursor, + ): + cursor.execute(query) + output = list(itertools.chain(*cursor.fetchall())) + return output + + +def find_unit(juju: JujuFixture, application: str, leader: bool) -> UnitAdapter: + """Helper function that retrieves a unit, based on need for leader or non-leader. + + Args: + juju: The Juju fixture. + application: The name of the application. + leader: Whether the unit is a leader or not. + + Returns: + A unit instance. + """ + ret_unit = None + for unit in juju.ext.model.applications[application].units: + if unit.is_leader_from_status() == leader: + ret_unit = unit + + return ret_unit + + +def get_application_units(juju: JujuFixture, application_name: str) -> list[str]: + """List the unit names of an application. + + Args: + juju: The Juju fixture + application_name: The name of the application + + Returns: + list of current unit names of the application + """ + return [ + unit.name.replace("/", "-") for unit in juju.ext.model.applications[application_name].units + ] + + +def get_application_units_ips(juju: JujuFixture, application_name: str) -> list[str]: + """List the unit IPs of an application. + + Args: + juju: The Juju fixture + application_name: The name of the application + + Returns: + list of current unit IPs of the application + """ + return [unit.public_address for unit in juju.ext.model.applications[application_name].units] + + +def get_landscape_api_credentials(juju: JujuFixture) -> list[str]: + """Returns the key and secret to be used in the Landscape API. + + Args: + juju: The Juju fixture + """ + unit = juju.ext.model.applications[DATABASE_APP_NAME].units[0] + password = get_password() + unit_address = unit.get_public_address() + + output = execute_query_on_unit( + unit_address, + password, + "SELECT encode(access_key_id,'escape'), encode(access_secret_key,'escape') FROM api_credentials;", + database="landscape-standalone-main", + ) + + return output + + +def get_leader_unit( + juju: JujuFixture, app: str, model: ModelAdapter | None = None +) -> UnitAdapter | None: + if model is None: + model = juju.ext.model + + leader_unit = None + for unit in model.applications[app].units: + if unit.is_leader_from_status(): + leader_unit = unit + break + + return leader_unit + + +def get_machine_from_unit(juju: JujuFixture, unit_name: str) -> str: + """Get the name of the machine from a specific unit. + + Args: + juju: The Juju fixture + unit_name: The name of the unit to get the machine + + Returns: + The name of the machine. + """ + raw_hostname = run_command_on_unit(juju, unit_name, "hostname") + return raw_hostname.strip() + + +def get_tls_ca(juju: JujuFixture, unit_name: str, relation: str = "client") -> str: + """Returns the TLS CA used by the unit. + + Args: + juju: The Juju fixture + unit_name: The name of the unit + relation: TLS relation to get the CA from + + Returns: + TLS CA or an empty string if there is no CA. + """ + raw_data = juju.cli("show-unit", unit_name) + endpoint = f"{relation}-certificates" + if not raw_data: + raise ValueError(f"no unit info could be grabbed for {unit_name}") + data = yaml.safe_load(raw_data) + # Filter the data based on the relation name. + relation_data = [v for v in data[unit_name]["relation-info"] if v["endpoint"] == endpoint] + if len(relation_data) == 0: + return "" + return json.loads(relation_data[0]["application-data"]["certificates"])[0].get("ca") + + +def check_connected_user( + cursor, session_user: str, current_user: str, primary: bool = True +) -> None: + cursor.execute("SELECT session_user,current_user;") + result = cursor.fetchone() + if result is not None: + instance = "primary" if primary else "replica" + assert result[0] == session_user, ( + f"The session user should be the {session_user} user in the {instance} (it's currently {result[0]})" + ) + assert result[1] == current_user, ( + f"The current user should be the {current_user} user in the {instance} (it's currently {result[1]})" + ) + else: + assert False, "No result returned from the query" + + +def check_roles_and_their_permissions( + juju: JujuFixture, relation_endpoint: str, database_name: str +) -> None: + action = juju.run(f"{DATA_INTEGRATOR_APP_NAME}/0", "get-credentials") + data_integrator_credentials = action.results + username = data_integrator_credentials[relation_endpoint]["username"] + uris = data_integrator_credentials[relation_endpoint]["uris"] + connection = None + try: + connection = psycopg2.connect(uris) + connection.autocommit = True + with connection.cursor() as cursor: + logger.info( + "Checking that the relation user is automatically escalated to the database owner user" + ) + check_connected_user(cursor, username, f"{database_name}_owner") + logger.info("Creating a test table and inserting data") + cursor.execute("CREATE TABLE test_table (id INTEGER);") + logger.info("Inserting data into the test table") + cursor.execute("INSERT INTO test_table(id) VALUES(1);") + logger.info("Reading data from the test table") + cursor.execute("SELECT * FROM test_table;") + result = cursor.fetchall() + assert len(result) == 1, "The database owner user should be able to read the data" + + logger.info("Checking that the database owner user can't create a database") + with pytest.raises(psycopg2.errors.InsufficientPrivilege): + cursor.execute(f"CREATE DATABASE {database_name}_2;") + + logger.info("Checking that the relation user can't create a table") + cursor.execute("RESET ROLE;") + check_connected_user(cursor, username, username) + with pytest.raises(psycopg2.errors.InsufficientPrivilege): + cursor.execute("CREATE TABLE test_table_2 (id INTEGER);") + + logger.info( + "Checking that the relation user can escalate back to the database owner user" + ) + cursor.execute(f"SET ROLE {database_name}_owner;") + check_connected_user(cursor, username, f"{database_name}_owner") + finally: + if connection is not None: + connection.close() + + connection_string = f"host={data_integrator_credentials[relation_endpoint]['read-only-endpoints'].split(':')[0]} dbname={data_integrator_credentials[relation_endpoint]['database']} user={username} password={data_integrator_credentials[relation_endpoint]['password']}" + connection = None + try: + connection = psycopg2.connect(connection_string) + with connection.cursor() as cursor: + logger.info("Checking that the relation user can read data from the database") + check_connected_user(cursor, username, username, primary=False) + logger.info("Reading data from the test table") + cursor.execute("SELECT * FROM test_table;") + result = cursor.fetchall() + assert len(result) == 1, "The relation user should be able to read the data" + finally: + if connection is not None: + connection.close() + + +def check_tls(juju: JujuFixture, unit_name: str, enabled: bool) -> bool: + """Returns whether TLS is enabled on the specific PostgreSQL instance. + + Args: + juju: The Juju fixture. + unit_name: The name of the unit of the PostgreSQL instance. + enabled: check if TLS is enabled/disabled + + Returns: + Whether TLS is enabled/disabled. + """ + unit_address = get_unit_address(juju, unit_name) + password = get_password() + # Get the IP addresses of the other units to check that they + # are connecting to the primary unit (if unit_name is the + # primary unit name) using encrypted connections. + app_name = unit_name.split("/")[0] + unit_addresses = [ + f"'{get_unit_address(juju, other_unit_name)}'" + for other_unit_name in juju.ext.model.units + if other_unit_name.split("/")[0] == app_name and other_unit_name != unit_name + ] + try: + for attempt in Retrying( + stop=stop_after_attempt(10), wait=wait_exponential(multiplier=1, min=2, max=30) + ): + with attempt: + output = execute_query_on_unit( + unit_address, + password, + "SHOW ssl;", + sslmode="require" if enabled else "disable", + ) + tls_enabled = "on" in output + + # Check for the number of bits in the encryption algorithm used + # on each connection. If a connection is not encrypted, None + # is returned instead of an integer. + connections_encryption_info = execute_query_on_unit( + unit_address, + password, + "SELECT bits FROM pg_stat_ssl INNER JOIN pg_stat_activity" + " ON pg_stat_ssl.pid = pg_stat_activity.pid" + " WHERE pg_stat_ssl.pid = pg_backend_pid()" + f" OR client_addr IN ({','.join(unit_addresses)});", + ) + + # This flag indicates whether all the connections are encrypted + # when checking for TLS enabled or all the connections are not + # encrypted when checking for TLS disabled. + connections_encrypted = ( + all(connections_encryption_info) + if enabled + else any(connections_encryption_info) + ) + + if enabled != tls_enabled or tls_enabled != connections_encrypted: + raise ValueError( + f"TLS is{' not' if not tls_enabled else ''} enabled on {unit_name}" + ) + return True + except RetryError: + return False + + +def check_tls_replication(juju: JujuFixture, unit_name: str, enabled: bool) -> bool: + """Returns whether TLS is enabled on the replica PostgreSQL instance. + + Args: + juju: The Juju fixture. + unit_name: The name of the replica of the PostgreSQL instance. + enabled: check if TLS is enabled/disabled + + Returns: + Whether TLS is enabled/disabled. + """ + unit_address = get_unit_address(juju, unit_name) + password = get_password() + + # Check for the all replicas using encrypted connection + output = execute_query_on_unit( + unit_address, + password, + "SELECT pg_ssl.ssl, pg_sa.client_addr FROM pg_stat_ssl pg_ssl" + " JOIN pg_stat_activity pg_sa ON pg_ssl.pid = pg_sa.pid" + " AND pg_sa.usename = 'replication';", + ) + return all(output[i] == enabled for i in range(0, len(output), 2)) + + +def check_tls_patroni_api(juju: JujuFixture, unit_name: str, enabled: bool) -> bool: + """Returns whether TLS is enabled on Patroni REST API. + + Args: + juju: The Juju fixture. + unit_name: The name of the unit where Patroni is running. + enabled: check if TLS is enabled/disabled + + Returns: + Whether TLS is enabled/disabled on Patroni REST API. + """ + unit_address = get_unit_address(juju, unit_name) + tls_ca = get_tls_ca(juju, unit_name, "peer") + + # If there is no TLS CA in the relation, something is wrong in + # the relation between the TLS Certificates Operator and PostgreSQL. + if enabled and not tls_ca: + return False + + try: + for attempt in Retrying( + stop=stop_after_attempt(10), wait=wait_exponential(multiplier=1, min=2, max=30) + ): + with attempt, tempfile.NamedTemporaryFile() as temp_ca_file: + # Write the TLS CA to a temporary file to use it in a request. + temp_ca_file.write(tls_ca.encode("utf-8")) + temp_ca_file.seek(0) + + # The CA bundle file is used to validate the server certificate when + # peer TLS is enabled, otherwise don't validate the internal cert. + health_info = requests.get( + f"https://{unit_address}:8008/health", + verify=temp_ca_file.name if enabled else False, + ) + return health_info.status_code == 200 + except RetryError: + return False + return False + + +def has_relation_exited( + juju: JujuFixture, endpoint_one: str, endpoint_two: str, model: ModelAdapter = None +) -> bool: + """Returns true if the relation between endpoint_one and endpoint_two has been removed.""" + relations = model.relations if model is not None else juju.ext.model.relations + for rel in relations: + endpoints = [endpoint.name for endpoint in rel.endpoints] + if endpoint_one in endpoints and endpoint_two in endpoints: + return False + return True + + +@retry( + retry=retry_if_result(lambda x: not x), + stop=stop_after_attempt(10), + wait=wait_exponential(multiplier=1, min=2, max=30), +) +def primary_changed(juju: JujuFixture, old_primary: str) -> bool: + """Checks whether the primary unit has changed. + + Args: + juju: The Juju fixture + old_primary: The name of the unit that was the primary before. + """ + other_unit = next( + unit.name + for unit in juju.ext.model.applications[DATABASE_APP_NAME].units + if unit.name != old_primary + ) + primary = get_primary(juju, other_unit) + return primary != old_primary + + +def restart_machine(juju: JujuFixture, unit_name: str) -> None: + """Restart the machine where a unit run on. + + Args: + juju: The Juju fixture + unit_name: The name of the unit to restart the machine + """ + raw_hostname = get_machine_from_unit(juju, unit_name) + restart_machine_command = f"lxc restart {raw_hostname}" + subprocess.check_call(restart_machine_command.split()) + + +def run_command_on_unit(juju: JujuFixture, unit_name: str, command: str) -> str: + """Run a command on a specific unit. + + Args: + juju: The Juju fixture + unit_name: The name of the unit to run the command on + command: The command to run + + Returns: + the command output if it succeeds, otherwise raises an exception. + """ + complete_command = ["juju", "exec", "--unit", unit_name, "--", *command.split()] + try: + stdout = subprocess.check_output( + " ".join(complete_command), shell=True, universal_newlines=True, stderr=subprocess.PIPE + ) + except subprocess.CalledProcessError as e: + logger.error(f"{e.stdout} {e.stderr}") + raise Exception(f"Expected command '{command}' to succeed instead it failed") + return stdout + + +def scale_application( + juju: JujuFixture, + application_name: str, + count: int, + model: ModelAdapter = None, + timeout=2000, + idle_period: int = 30, +) -> None: + """Scale a given application to a specific unit count. + + Args: + juju: The Juju fixture + application_name: The name of the application + count: The desired number of units to scale to + model: The model to scale the application in + timeout: timeout period + idle_period: idle period + """ + if model is None: + model = juju.ext.model + change = count - len(model.applications[application_name].units) + if change > 0: + model.applications[application_name].add_units(change) + elif change < 0: + units = [unit.name for unit in model.applications[application_name].units[0:-change]] + model.applications[application_name].destroy_units(*units) + model.wait_for_idle( + apps=[application_name], + status="active", + timeout=timeout, + idle_period=idle_period, + wait_for_exact_units=count, + ) + + +def restart_patroni(juju: JujuFixture, unit_name: str, password: str) -> None: + """Restart Patroni on a specific unit. + + Args: + juju: The Juju fixture + unit_name: The name of the unit + password: patroni password + """ + unit_ip = get_unit_address(juju, unit_name) + requests.post( + f"https://{unit_ip}:8008/restart", + auth=requests.auth.HTTPBasicAuth("patroni", password), + verify=False, + ) + + +def set_password( + juju: JujuFixture, + username: str = "operator", + password: str | None = None, + database_app_name: str = DATABASE_APP_NAME, +): + """Set a user password via secret. + + Args: + juju: juju instance. + username: the user to set the password. + password: optional password to use + instead of auto-generating + database_app_name: name of the app for the secret + + Returns: + the results from the action. + """ + secret_name = "system_users_secret" + + try: + secret_id = juju.add_secret(secret_name, content={username: password}) + juju.grant_secret(secret_id, database_app_name) + + # update the application config to include the secret + juju.ext.model.applications[database_app_name].set_config({ + SYSTEM_USERS_PASSWORD_CONFIG: secret_id + }) + except Exception: + juju.update_secret(secret_name, content={username: password}) + + +def start_machine(juju: JujuFixture, machine_name: str) -> None: + """Start the machine where a unit run on. + + Args: + juju: The Juju fixture + machine_name: The name of the machine to start + """ + start_machine_command = f"lxc start {machine_name}" + subprocess.check_call(start_machine_command.split()) + + +def stop_machine(juju: JujuFixture, machine_name: str) -> None: + """Stop the machine where a unit run on. + + Args: + juju: The Juju fixture + machine_name: The name of the machine to stop + """ + stop_machine_command = f"lxc stop {machine_name}" + subprocess.check_call(stop_machine_command.split()) + + +def switchover( + juju: JujuFixture, current_primary: str, password: str, candidate: str | None = None +) -> None: + """Trigger a switchover. + + Args: + juju: The Juju fixture. + current_primary: The current primary unit. + password: Patroni password. + candidate: The unit that should be elected the new primary. + """ + primary_ip = get_unit_address(juju, current_primary) + response = requests.post( + f"https://{primary_ip}:8008/switchover", + json={ + "leader": current_primary.replace("/", "-"), + "candidate": candidate.replace("/", "-") if candidate else None, + }, + auth=requests.auth.HTTPBasicAuth("patroni", password), + verify=False, + ) + assert response.status_code == 200 + app_name = current_primary.split("/")[0] + for attempt in Retrying(stop=stop_after_attempt(30), wait=wait_fixed(2), reraise=True): + with attempt: + response = requests.get(f"https://{primary_ip}:8008/cluster", verify=False) + assert response.status_code == 200 + standbys = len([ + member for member in response.json()["members"] if member["role"] == "sync_standby" + ]) + assert standbys == len(juju.ext.model.applications[app_name].units) - 1 + + +def wait_for_idle_on_blocked( + juju: JujuFixture, + database_app_name: str, + unit_number: int, + other_app_name: str, + status_message: str, +): + """Wait for specific applications becoming idle and blocked together.""" + unit = f"{database_app_name}/{unit_number}" + juju.ext.model.wait_for_idle(apps=[other_app_name], status="active") + juju.wait( + lambda status: ( + status.apps[database_app_name].units[unit].workload_status.current == "blocked" + and status.apps[database_app_name].units[unit].workload_status.message + == status_message + ) + ) + + +def wait_for_relation_removed_between( + juju: JujuFixture, endpoint_one: str, endpoint_two: str, model: ModelAdapter = None +) -> None: + """Wait for relation to be removed before checking if it's waiting or idle. + + Args: + juju: running OpsTest instance + endpoint_one: one endpoint of the relation. Doesn't matter if it's provider or requirer. + endpoint_two: the other endpoint of the relation. + model: optional model to check for the relation. + """ + try: + for attempt in Retrying(stop=stop_after_delay(3 * 60), wait=wait_fixed(3)): + with attempt: + if has_relation_exited(juju, endpoint_one, endpoint_two, model): + break + except RetryError: + assert False, "Relation failed to exit after 3 minutes." + + +### Ported Mysql jubilant helpers + + +def execute_queries_on_unit( + unit_address: str, username: str, password: str, queries: list[str], database: str +) -> list: + """Execute given PostgreSQL queries on a unit. + + Args: + unit_address: The public IP address of the unit to execute the queries on + username: The PostgreSQL username + password: The PostgreSQL password + queries: A list of queries to execute + database: Database to execute in + + Returns: + A list of rows that were potentially queried + """ + with ( + psycopg2.connect( + f"dbname='{database}' user='{username}' host='{unit_address}' password='{password}' connect_timeout=10" + ) as connection, + connection.cursor() as cursor, + ): + for query in queries: + cursor.execute(query) + output = list(itertools.chain(*cursor.fetchall())) + + return output + + +########################################## +# # +# Partially Ported HA Helpers # +# # +########################################## + + +def app_name( + juju: JujuFixture, application_name: str = "postgresql", model: ModelAdapter | None = None +) -> str | None: + """Returns the name of the cluster running PostgreSQL. + + This is important since not all deployments of the PostgreSQL charm have the application name + "postgresql". + + Note: if multiple clusters are running PostgreSQL this will return the one first found. + """ + if model is None: + model = juju.ext.model + status = juju.status() + for app in status.apps: + if ( + application_name in status.apps[app].charm + and APPLICATION_NAME not in status.apps[app].charm + ): + return app + + return None + + +def change_patroni_setting( + juju: JujuFixture, + setting: str, + value: int | bool, + password: str, + use_random_unit: bool = False, + tls: bool = False, +) -> None: + """Change the value of one of the Patroni settings. + + Args: + juju: Juju fixture. + setting: the name of the setting. + value: the value to assign to the setting. + password: Patroni password. + use_random_unit: whether to use a random unit (default is False, + so it uses the primary). + tls: if Patroni is serving using tls. + """ + for attempt in Retrying(stop=stop_after_delay(30 * 2), wait=wait_fixed(3)): + with attempt: + app = app_name(juju) + if use_random_unit: + unit = random.choice(juju.ext.model.applications[app].units).name + unit_ip = get_unit_address(juju, unit) + else: + primary_name = get_primary(juju, app) + unit_ip = get_unit_address(juju, primary_name) + requests.patch( + f"https://{unit_ip}:8008/config", + json={setting: value}, + verify=False, + auth=requests.auth.HTTPBasicAuth("patroni", password), + ) + + +def get_cluster_roles( + juju: JujuFixture, unit_name: str, use_ip_from_inside: bool = False +) -> dict[str, str | list[str] | None]: + """Returns whether the unit a replica in the cluster.""" + unit_ip = ( + get_ip_from_inside_the_unit(juju, unit_name) + if use_ip_from_inside + else get_unit_ip(juju, unit_name) + ) + + members = {"replicas": [], "primaries": [], "sync_standbys": []} + cluster_info = requests.get(f"https://{unit_ip}:8008/cluster", verify=False) + member_list = cluster_info.json()["members"] + logger.info(f"Cluster members are: {member_list}") + for member in member_list: + role = member["role"] + name = "/".join(member["name"].rsplit("-", 1)) + if role == "leader": + members["primaries"].append(name) + elif role == "sync_standby": + members["sync_standbys"].append(name) + else: + members["replicas"].append(name) + + return members + + +def get_ip_from_inside_the_unit(juju: JujuFixture, unit_name: str) -> str: + command = f"exec --unit {unit_name} -- hostname -I" + try: + stdout = juju.cli(*command.split()) + except jubilant.CLIError as e: + raise ProcessError( + "Expected command %s to succeed instead it failed: %s %s", + command, + e.returncode, + e.stderr, + ) + return stdout.splitlines()[0].strip() + + +def get_unit_ip(juju: JujuFixture, unit_name: str, model: ModelAdapter | None = None) -> str: + """Wrapper for getting unit ip. + + Args: + juju: Juju fixture. + unit_name: The name of the unit to get the address + model: Optional model instance to use + Returns: + The (str) ip of the unit + """ + if model is None: + application = unit_name.split("/")[0] + for unit in juju.ext.model.applications[application].units: + if unit.name == unit_name: + break + machine = next( + iter( + machine + for id_, machine in juju.status().machines.items() + if id_ == unit.status.machine + ) + ) + return instance_ip(juju, machine.hostname) + else: + return get_unit_address(juju, unit_name) + + +def instance_ip(juju: JujuFixture, instance: str) -> str: + """Translate juju instance name to IP. + + Args: + juju: Juju fixture. + instance: The name of the instance + + Returns: + The (str) IP address of the instance + """ + output = juju.cli("machines") + + for line in output.splitlines(): + if instance in line: + return line.split()[2] diff --git a/tests/integration/new_relations/jubilant_helpers.py b/tests/integration/new_relations/jubilant_helpers.py new file mode 100644 index 0000000000..5ab730cdb3 --- /dev/null +++ b/tests/integration/new_relations/jubilant_helpers.py @@ -0,0 +1,170 @@ +#!/usr/bin/env python3 +# Copyright 2022 Canonical Ltd. +# See LICENSE file for licensing details. +import json + +import yaml + +from ..adapters import JujuFixture + + +def get_juju_secret(juju: JujuFixture, secret_uri: str) -> dict[str, str]: + """Retrieve juju secret.""" + secret_unique_id = secret_uri.split("/")[-1] + complete_command = f"show-secret {secret_uri} --reveal --format json" + stdout = juju.cli(*complete_command.split()) + return json.loads(stdout)[secret_unique_id]["content"]["Data"] + + +def build_connection_string( + juju: JujuFixture, + application_name: str, + relation_name: str, + *, + relation_id: str | None = None, + relation_alias: str | None = None, + read_only_endpoint: bool = False, + database: str | None = None, +) -> str: + """Build a PostgreSQL connection string. + + Args: + juju: The ops test framework instance + application_name: The name of the application + relation_name: name of the relation to get connection data from + relation_id: id of the relation to get connection data from + relation_alias: alias of the relation (like a connection name) + to get connection data from + read_only_endpoint: whether to choose the read-only endpoint + instead of the read/write endpoint + database: optional database to be used in the connection string + + Returns: + a PostgreSQL connection string + """ + # Get the connection data exposed to the application through the relation. + if database is None: + database = f"{application_name.replace('-', '_')}_{relation_name.replace('-', '_')}" + + if secret_uri := get_application_relation_data( + juju, + application_name, + relation_name, + "secret-user", + relation_id, + relation_alias, + ): + secret_data = get_juju_secret(juju, secret_uri) + username = secret_data["username"] + password = secret_data["password"] + else: + username = get_application_relation_data( + juju, application_name, relation_name, "username", relation_id, relation_alias + ) + password = get_application_relation_data( + juju, application_name, relation_name, "password", relation_id, relation_alias + ) + + endpoints = get_application_relation_data( + juju, + application_name, + relation_name, + "read-only-endpoints" if read_only_endpoint else "endpoints", + relation_id, + relation_alias, + ) + host = endpoints.split(",")[0].split(":")[0] + + # Build the complete connection string to connect to the database. + return f"dbname='{database}' user='{username}' host='{host}' password='{password}' connect_timeout=10" + + +def get_alias_from_relation_data( + juju: JujuFixture, unit_name: str, related_unit_name: str +) -> str | None: + """Get the alias that the unit assigned to the related unit application/cluster. + + Args: + juju: The ops test framework instance + unit_name: The name of the unit + related_unit_name: name of the related unit + + Returns: + the alias for the application/cluster of + the related unit + + Raises: + ValueError if it's not possible to get unit data + or if there is no alias on that. + """ + raw_data = juju.cli("show-unit", related_unit_name) + if not raw_data: + raise ValueError(f"no unit info could be grabbed for {related_unit_name}") + data = yaml.safe_load(raw_data) + + # Retrieve the relation data from the unit. + relation_data = {} + for relation in data[related_unit_name]["relation-info"]: + for name, unit in relation["related-units"].items(): + if name == unit_name: + relation_data = unit["data"] + break + + # Check whether the unit has set an alias for the related unit application/cluster. + if "alias" not in relation_data: + raise ValueError(f"no alias could be grabbed for {related_unit_name} application/cluster") + + return relation_data["alias"] + + +def get_application_relation_data( + juju: JujuFixture, + application_name: str, + relation_name: str, + key: str, + relation_id: str | None = None, + relation_alias: str | None = None, +) -> str | None: + """Get relation data for an application. + + Args: + juju: The ops test framework instance + application_name: The name of the application + relation_name: name of the relation to get connection data from + key: key of data to be retrieved + relation_id: id of the relation to get connection data from + relation_alias: alias of the relation (like a connection name) + to get connection data from + + Returns: + the data that was requested or None + if no data in the relation + + Raises: + ValueError if it's not possible to get application data + or if there is no data for the particular relation endpoint + and/or alias. + """ + unit_name = f"{application_name}/0" + raw_data = juju.cli("show-unit", unit_name) + if not raw_data: + raise ValueError(f"no unit info could be grabbed for {unit_name}") + data = yaml.safe_load(raw_data) + # Filter the data based on the relation name. + relation_data = [v for v in data[unit_name]["relation-info"] if v["endpoint"] == relation_name] + if relation_id: + # Filter the data based on the relation id. + relation_data = [v for v in relation_data if v["relation-id"] == relation_id] + if relation_alias: + # Filter the data based on the cluster/relation alias. + relation_data = [ + v + for v in relation_data + if get_alias_from_relation_data(juju, unit_name, next(iter(v["related-units"]))) + == relation_alias + ] + if len(relation_data) == 0: + raise ValueError( + f"no relation data could be grabbed on relation with endpoint {relation_name} and alias {relation_alias}" + ) + return relation_data[0]["application-data"].get(key) diff --git a/tests/integration/test_audit.py b/tests/integration/test_audit.py index 80a1b0334d..829e8455d0 100644 --- a/tests/integration/test_audit.py +++ b/tests/integration/test_audit.py @@ -1,20 +1,19 @@ #!/usr/bin/env python3 # Copyright 2024 Canonical Ltd. # See LICENSE file for licensing details. -import asyncio import logging import psycopg2 as psycopg2 import pytest as pytest -from pytest_operator.plugin import OpsTest from tenacity import Retrying, stop_after_delay, wait_fixed -from .helpers import ( +from .adapters import JujuFixture +from .jubilant_helpers import ( APPLICATION_NAME, DATABASE_APP_NAME, run_command_on_unit, ) -from .new_relations.helpers import build_connection_string +from .new_relations.jubilant_helpers import build_connection_string logger = logging.getLogger(__name__) @@ -22,22 +21,16 @@ @pytest.mark.abort_on_fail -async def test_audit_plugin(ops_test: OpsTest, charm) -> None: +def test_audit_plugin(juju: JujuFixture, charm) -> None: """Test the audit plugin.""" - await asyncio.gather( - ops_test.model.deploy(charm, config={"profile": "testing"}), - ops_test.model.deploy(APPLICATION_NAME, channel="latest/edge", series="noble"), - ) - await ops_test.model.relate(f"{APPLICATION_NAME}:{RELATION_ENDPOINT}", DATABASE_APP_NAME) - async with ops_test.fast_forward(): - await ops_test.model.wait_for_idle( - apps=[APPLICATION_NAME, DATABASE_APP_NAME], status="active" - ) + juju.ext.model.deploy(charm, config={"profile": "testing"}) + juju.ext.model.deploy(APPLICATION_NAME, channel="latest/edge", series="noble") + juju.ext.model.relate(f"{APPLICATION_NAME}:{RELATION_ENDPOINT}", DATABASE_APP_NAME) + with juju.ext.fast_forward(): + juju.ext.model.wait_for_idle(apps=[APPLICATION_NAME, DATABASE_APP_NAME], status="active") logger.info("Checking that the audit plugin is enabled") - connection_string = await build_connection_string( - ops_test, APPLICATION_NAME, RELATION_ENDPOINT - ) + connection_string = build_connection_string(juju, APPLICATION_NAME, RELATION_ENDPOINT) connection = None try: connection = psycopg2.connect(connection_string) @@ -52,8 +45,8 @@ async def test_audit_plugin(ops_test: OpsTest, charm) -> None: for attempt in Retrying(stop=stop_after_delay(90), wait=wait_fixed(10), reraise=True): with attempt: try: - logs = await run_command_on_unit( - ops_test, + logs = run_command_on_unit( + juju, unit_name, "sudo grep AUDIT /var/snap/charmed-postgresql/common/var/log/postgresql/postgresql-*.log", ) @@ -67,14 +60,12 @@ async def test_audit_plugin(ops_test: OpsTest, charm) -> None: assert False, "Audit logs were not found when the plugin is enabled." logger.info("Disabling the audit plugin") - await ops_test.model.applications[DATABASE_APP_NAME].set_config({ - "plugin_audit_enable": "False" - }) - await ops_test.model.wait_for_idle(apps=[DATABASE_APP_NAME], status="active") + juju.ext.model.applications[DATABASE_APP_NAME].set_config({"plugin_audit_enable": "False"}) + juju.ext.model.wait_for_idle(apps=[DATABASE_APP_NAME], status="active") logger.info("Removing the previous logs") - await run_command_on_unit( - ops_test, + run_command_on_unit( + juju, unit_name, "rm /var/snap/charmed-postgresql/common/var/log/postgresql/postgresql-*.log", ) @@ -90,8 +81,8 @@ async def test_audit_plugin(ops_test: OpsTest, charm) -> None: if connection is not None: connection.close() try: - logs = await run_command_on_unit( - ops_test, + logs = run_command_on_unit( + juju, unit_name, "sudo grep AUDIT /var/snap/charmed-postgresql/common/var/log/postgresql/postgresql-*.log", ) diff --git a/tests/integration/test_backups_aws.py b/tests/integration/test_backups_aws.py index 895329bd02..39698dde91 100644 --- a/tests/integration/test_backups_aws.py +++ b/tests/integration/test_backups_aws.py @@ -4,12 +4,12 @@ import logging import pytest as pytest -from pytest_operator.plugin import OpsTest from tenacity import Retrying, stop_after_attempt, wait_exponential +from .adapters import JujuFixture from .backup_helpers import backup_operations from .conftest import AWS -from .helpers import ( +from .jubilant_helpers import ( DATABASE_APP_NAME, db_connect, get_password, @@ -31,13 +31,13 @@ @pytest.mark.abort_on_fail -async def test_backup_aws(ops_test: OpsTest, aws_cloud_configs: tuple[dict, dict], charm) -> None: +def test_backup_aws(juju: JujuFixture, aws_cloud_configs: tuple[dict, dict], charm) -> None: """Build and deploy two units of PostgreSQL in AWS, test backup and restore actions.""" config = aws_cloud_configs[0] credentials = aws_cloud_configs[1] - await backup_operations( - ops_test, + backup_operations( + juju, S3_INTEGRATOR_APP_NAME, tls_certificates_app_name, tls_channel, @@ -49,25 +49,23 @@ async def test_backup_aws(ops_test: OpsTest, aws_cloud_configs: tuple[dict, dict database_app_name = f"{DATABASE_APP_NAME}-aws" # Remove the relation to the TLS certificates operator. - await ops_test.model.applications[database_app_name].remove_relation( + juju.ext.model.applications[database_app_name].remove_relation( f"{database_app_name}:client-certificates", f"{tls_certificates_app_name}:certificates" ) - await ops_test.model.applications[database_app_name].remove_relation( + juju.ext.model.applications[database_app_name].remove_relation( f"{database_app_name}:peer-certificates", f"{tls_certificates_app_name}:certificates" ) new_unit_name = f"{database_app_name}/2" # Scale up to be able to test primary and leader being different. - async with ops_test.fast_forward(): - await scale_application(ops_test, database_app_name, 2) + with juju.ext.fast_forward(): + scale_application(juju, database_app_name, 2) # Ensure replication is working correctly. - address = get_unit_address(ops_test, new_unit_name) - password = await get_password(ops_test, database_app_name=database_app_name) - patroni_password = await get_password( - ops_test, username="patroni", database_app_name=database_app_name - ) + address = get_unit_address(juju, new_unit_name) + password = get_password(database_app_name=database_app_name) + patroni_password = get_password(username="patroni", database_app_name=database_app_name) with db_connect(host=address, password=password) as connection, connection.cursor() as cursor: cursor.execute( "SELECT EXISTS (SELECT FROM information_schema.tables" @@ -85,11 +83,11 @@ async def test_backup_aws(ops_test: OpsTest, aws_cloud_configs: tuple[dict, dict ) connection.close() - old_primary = await get_primary(ops_test, new_unit_name) - switchover(ops_test, old_primary, patroni_password, new_unit_name) + old_primary = get_primary(juju, new_unit_name) + switchover(juju, old_primary, patroni_password, new_unit_name) # Get the new primary unit. - primary = await get_primary(ops_test, new_unit_name) + primary = get_primary(juju, new_unit_name) # Check that the primary changed. for attempt in Retrying( stop=stop_after_attempt(3), wait=wait_exponential(multiplier=1, min=2, max=30) @@ -99,15 +97,15 @@ async def test_backup_aws(ops_test: OpsTest, aws_cloud_configs: tuple[dict, dict # Ensure stanza is working correctly. logger.info("listing the available backups") - action = await ops_test.model.units.get(new_unit_name).run_action("list-backups") - await action.wait() + action = juju.ext.model.units.get(new_unit_name).run_action("list-backups") + action.wait() backups = action.results.get("backups") assert backups, "backups not outputted" - await ops_test.model.wait_for_idle(status="active", timeout=1000) + juju.ext.model.wait_for_idle(status="active", timeout=1000) # Remove the database app. - await ops_test.model.remove_application(database_app_name, block_until_done=True) + juju.ext.model.remove_application(database_app_name, block_until_done=True) # Remove the TLS operator. - await ops_test.model.remove_application(tls_certificates_app_name, block_until_done=True) + juju.ext.model.remove_application(tls_certificates_app_name, block_until_done=True) diff --git a/tests/integration/test_backups_ceph.py b/tests/integration/test_backups_ceph.py index f12b38e21e..6fbd1fa958 100644 --- a/tests/integration/test_backups_ceph.py +++ b/tests/integration/test_backups_ceph.py @@ -4,8 +4,8 @@ import logging import pytest -from pytest_operator.plugin import OpsTest +from .adapters import JujuFixture from .backup_helpers import backup_operations from .conftest import ConnectionInformation @@ -37,10 +37,10 @@ def cloud_configs(microceph: ConnectionInformation): } -async def test_backup_ceph(ops_test: OpsTest, cloud_configs, cloud_credentials, charm) -> None: +def test_backup_ceph(juju: JujuFixture, cloud_configs, cloud_credentials, charm) -> None: """Build and deploy two units of PostgreSQL in microceph, test backup and restore actions.""" - await backup_operations( - ops_test, + backup_operations( + juju, S3_INTEGRATOR_APP_NAME, None, None, diff --git a/tests/integration/test_backups_gcp.py b/tests/integration/test_backups_gcp.py index 97e47320c5..cee4467bcb 100644 --- a/tests/integration/test_backups_gcp.py +++ b/tests/integration/test_backups_gcp.py @@ -5,13 +5,12 @@ import uuid import pytest as pytest -from pytest_operator.plugin import OpsTest from tenacity import Retrying, stop_after_attempt, wait_exponential +from .adapters import JujuFixture from .backup_helpers import backup_operations from .conftest import GCP -from .helpers import ( - CHARM_BASE, +from .jubilant_helpers import ( DATABASE_APP_NAME, db_connect, get_password, @@ -32,13 +31,13 @@ @pytest.mark.abort_on_fail -async def test_backup_gcp(ops_test: OpsTest, gcp_cloud_configs: tuple[dict, dict], charm) -> None: +def test_backup_gcp(juju: JujuFixture, gcp_cloud_configs: tuple[dict, dict], charm) -> None: """Build and deploy two units of PostgreSQL in GCP, test backup and restore actions.""" config = gcp_cloud_configs[0] credentials = gcp_cloud_configs[1] - await backup_operations( - ops_test, + backup_operations( + juju, S3_INTEGRATOR_APP_NAME, tls_certificates_app_name, tls_channel, @@ -50,38 +49,36 @@ async def test_backup_gcp(ops_test: OpsTest, gcp_cloud_configs: tuple[dict, dict database_app_name = f"{DATABASE_APP_NAME}-gcp" # Remove the database app. - await ops_test.model.remove_application(database_app_name, block_until_done=True) + juju.ext.model.remove_application(database_app_name, block_until_done=True) # Remove the TLS operator. - await ops_test.model.remove_application(tls_certificates_app_name, block_until_done=True) + juju.ext.model.remove_application(tls_certificates_app_name, block_until_done=True) -async def test_restore_on_new_cluster( - ops_test: OpsTest, charm, gcp_cloud_configs: tuple[dict, dict] +def test_restore_on_new_cluster( + juju: JujuFixture, charm, gcp_cloud_configs: tuple[dict, dict] ) -> None: """Test that is possible to restore a backup to another PostgreSQL cluster.""" previous_database_app_name = f"{DATABASE_APP_NAME}-gcp" database_app_name = f"new-{DATABASE_APP_NAME}" - await ops_test.model.deploy( + juju.ext.model.deploy( charm, application_name=previous_database_app_name, - base=CHARM_BASE, config={"profile": "testing"}, ) - await ops_test.model.deploy( + juju.ext.model.deploy( charm, application_name=database_app_name, - base=CHARM_BASE, config={"profile": "testing"}, ) - await ops_test.model.relate(previous_database_app_name, S3_INTEGRATOR_APP_NAME) - await ops_test.model.relate(database_app_name, S3_INTEGRATOR_APP_NAME) - async with ops_test.fast_forward(): + juju.ext.model.relate(previous_database_app_name, S3_INTEGRATOR_APP_NAME) + juju.ext.model.relate(database_app_name, S3_INTEGRATOR_APP_NAME) + with juju.ext.fast_forward(): logger.info( "waiting for the database charm to become blocked due to existing backups from another cluster in the repository" ) - await wait_for_idle_on_blocked( - ops_test, + wait_for_idle_on_blocked( + juju, previous_database_app_name, 2, S3_INTEGRATOR_APP_NAME, @@ -90,8 +87,8 @@ async def test_restore_on_new_cluster( logger.info( "waiting for the database charm to become blocked due to existing backups from another cluster in the repository" ) - await wait_for_idle_on_blocked( - ops_test, + wait_for_idle_on_blocked( + juju, database_app_name, 0, S3_INTEGRATOR_APP_NAME, @@ -100,17 +97,17 @@ async def test_restore_on_new_cluster( # Remove the database app with the same name as the previous one (that was used only to test # that the cluster becomes blocked). - await ops_test.model.remove_application(previous_database_app_name, block_until_done=True) + juju.ext.model.remove_application(previous_database_app_name, block_until_done=True) # Run the "list backups" action. unit_name = f"{database_app_name}/0" logger.info("listing the available backups") - action = await ops_test.model.units.get(unit_name).run_action("list-backups") - await action.wait() + action = juju.ext.model.units.get(unit_name).run_action("list-backups") + action.wait() backups = action.results.get("backups") assert backups, "backups not outputted" - await wait_for_idle_on_blocked( - ops_test, + wait_for_idle_on_blocked( + juju, database_app_name, 0, S3_INTEGRATOR_APP_NAME, @@ -126,24 +123,27 @@ async def test_restore_on_new_cluster( # Last two entries are 'action: restore', that cannot be used without restore-to-time parameter most_recent_real_backup = backups.split("\n")[-3] backup_id = most_recent_real_backup.split()[0] - action = await ops_test.model.units.get(unit_name).run_action( + action = juju.ext.model.units.get(unit_name).run_action( "restore", **{"backup-id": backup_id} ) - await action.wait() + action.wait() restore_status = action.results.get("restore-status") assert restore_status, "restore hasn't succeeded" # Wait for the restore to complete. - async with ops_test.fast_forward(): - unit = ops_test.model.units.get(f"{database_app_name}/0") - await ops_test.model.block_until( - lambda: unit.workload_status_message == ANOTHER_CLUSTER_REPOSITORY_ERROR_MESSAGE + with juju.ext.fast_forward(): + unit = f"{database_app_name}/0" + juju.wait( + lambda status: ( + status.apps[database_app_name].units[unit].workload_status.message + == ANOTHER_CLUSTER_REPOSITORY_ERROR_MESSAGE + ) ) # Check that the backup was correctly restored by having only the first created table. logger.info("checking that the backup was correctly restored") - password = await get_password(ops_test, database_app_name=database_app_name) - address = get_unit_address(ops_test, unit_name) + password = get_password(database_app_name=database_app_name) + address = get_unit_address(juju, unit_name) with db_connect(host=address, password=password) as connection, connection.cursor() as cursor: cursor.execute( "SELECT EXISTS (SELECT FROM information_schema.tables" @@ -155,47 +155,51 @@ async def test_restore_on_new_cluster( connection.close() -async def test_invalid_config_and_recovery_after_fixing_it( - ops_test: OpsTest, gcp_cloud_configs: tuple[dict, dict] +def test_invalid_config_and_recovery_after_fixing_it( + juju: JujuFixture, gcp_cloud_configs: tuple[dict, dict] ) -> None: """Test that the charm can handle invalid and valid backup configurations.""" database_app_name = f"new-{DATABASE_APP_NAME}" # Provide invalid backup configurations. logger.info("configuring S3 integrator for an invalid cloud") - await ops_test.model.applications[S3_INTEGRATOR_APP_NAME].set_config({ + juju.ext.model.applications[S3_INTEGRATOR_APP_NAME].set_config({ "endpoint": "endpoint", "bucket": "bucket", "path": "path", "region": "region", }) - action = await ops_test.model.units.get(f"{S3_INTEGRATOR_APP_NAME}/0").run_action( + action = juju.ext.model.units.get(f"{S3_INTEGRATOR_APP_NAME}/0").run_action( "sync-s3-credentials", **{ "access-key": "access-key", "secret-key": "secret-key", }, ) - await action.wait() + action.wait() logger.info("waiting for the database charm to become blocked") - unit = ops_test.model.units.get(f"{database_app_name}/0") - await ops_test.model.block_until( - lambda: unit.workload_status_message == FAILED_TO_ACCESS_CREATE_BUCKET_ERROR_MESSAGE + unit = f"{database_app_name}/0" + juju.wait( + lambda status: ( + status.apps[database_app_name].units[unit].workload_status.message + == FAILED_TO_ACCESS_CREATE_BUCKET_ERROR_MESSAGE + ) ) # Provide valid backup configurations, but from another cluster repository. logger.info( "configuring S3 integrator for a valid cloud, but with the path of another cluster repository" ) - await ops_test.model.applications[S3_INTEGRATOR_APP_NAME].set_config(gcp_cloud_configs[0]) - action = await ops_test.model.units.get(f"{S3_INTEGRATOR_APP_NAME}/0").run_action( + juju.ext.model.applications[S3_INTEGRATOR_APP_NAME].set_config(gcp_cloud_configs[0]) + action = juju.ext.model.units.get(f"{S3_INTEGRATOR_APP_NAME}/0").run_action( "sync-s3-credentials", **gcp_cloud_configs[1], ) - await action.wait() + action.wait() logger.info("waiting for the database charm to become blocked") - unit = ops_test.model.units.get(f"{database_app_name}/0") - await ops_test.model.block_until( + + unit = juju.ext.model.units.get(f"{database_app_name}/0") + juju.ext.model.block_until( lambda: unit.workload_status_message == ANOTHER_CLUSTER_REPOSITORY_ERROR_MESSAGE ) @@ -203,23 +207,23 @@ async def test_invalid_config_and_recovery_after_fixing_it( logger.info("configuring S3 integrator for a valid cloud") config = gcp_cloud_configs[0].copy() config["path"] = f"/postgresql/{uuid.uuid1()}" - await ops_test.model.applications[S3_INTEGRATOR_APP_NAME].set_config(config) + juju.ext.model.applications[S3_INTEGRATOR_APP_NAME].set_config(config) logger.info("waiting for the database charm to become active") - await ops_test.model.wait_for_idle( - apps=[database_app_name, S3_INTEGRATOR_APP_NAME], status="active" - ) + juju.ext.model.wait_for_idle(apps=[database_app_name, S3_INTEGRATOR_APP_NAME], status="active") -async def test_block_on_missing_region( - ops_test: OpsTest, gcp_cloud_configs: tuple[dict, dict] -) -> None: - await ops_test.model.applications[S3_INTEGRATOR_APP_NAME].set_config({ +def test_block_on_missing_region(juju: JujuFixture, gcp_cloud_configs: tuple[dict, dict]) -> None: + juju.ext.model.applications[S3_INTEGRATOR_APP_NAME].set_config({ **gcp_cloud_configs[0], "region": "", }) database_app_name = f"new-{DATABASE_APP_NAME}" logger.info("waiting for the database charm to become blocked") - unit = ops_test.model.units.get(f"{database_app_name}/0") - await ops_test.model.block_until( - lambda: unit.workload_status_message == FAILED_TO_INITIALIZE_STANZA_ERROR_MESSAGE + unit = juju.ext.model.units.get(f"{database_app_name}/0") + unit = f"{database_app_name}/0" + juju.wait( + lambda status: ( + status.apps[database_app_name].units[unit].workload_status.message + == FAILED_TO_INITIALIZE_STANZA_ERROR_MESSAGE + ) ) diff --git a/tests/integration/test_backups_pitr_aws.py b/tests/integration/test_backups_pitr_aws.py index 90f454123f..89c37dd431 100644 --- a/tests/integration/test_backups_pitr_aws.py +++ b/tests/integration/test_backups_pitr_aws.py @@ -4,8 +4,8 @@ import logging import pytest -from pytest_operator.plugin import OpsTest +from .adapters import JujuFixture from .backup_helpers import pitr_backup_operations from .conftest import AWS @@ -17,14 +17,12 @@ @pytest.mark.abort_on_fail -async def test_pitr_backup_aws( - ops_test: OpsTest, aws_cloud_configs: tuple[dict, dict], charm -) -> None: +def test_pitr_backup_aws(juju: JujuFixture, aws_cloud_configs: tuple[dict, dict], charm) -> None: """Build, deploy two units of PostgreSQL and do backup in AWS. Then, write new data into DB, switch WAL file and test point-in-time-recovery restore action.""" config, credentials = aws_cloud_configs - await pitr_backup_operations( - ops_test, + pitr_backup_operations( + juju, S3_INTEGRATOR_APP_NAME, TLS_CERTIFICATES_APP_NAME, TLS_CHANNEL, diff --git a/tests/integration/test_backups_pitr_ceph.py b/tests/integration/test_backups_pitr_ceph.py index ebb16a1c60..49a5d72b0f 100644 --- a/tests/integration/test_backups_pitr_ceph.py +++ b/tests/integration/test_backups_pitr_ceph.py @@ -4,8 +4,8 @@ import logging import pytest -from pytest_operator.plugin import OpsTest +from .adapters import JujuFixture from .backup_helpers import pitr_backup_operations from .conftest import ConnectionInformation @@ -38,12 +38,10 @@ def cloud_configs(microceph: ConnectionInformation): @pytest.mark.abort_on_fail -async def test_pitr_backup_ceph( - ops_test: OpsTest, cloud_configs, cloud_credentials, charm -) -> None: +def test_pitr_backup_ceph(juju: JujuFixture, cloud_configs, cloud_credentials, charm) -> None: """Build, deploy two units of PostgreSQL and do backup in AWS. Then, write new data into DB, switch WAL file and test point-in-time-recovery restore action.""" - await pitr_backup_operations( - ops_test, + pitr_backup_operations( + juju, S3_INTEGRATOR_APP_NAME, None, None, diff --git a/tests/integration/test_backups_pitr_gcp.py b/tests/integration/test_backups_pitr_gcp.py index 6640236e1a..da3504dd23 100644 --- a/tests/integration/test_backups_pitr_gcp.py +++ b/tests/integration/test_backups_pitr_gcp.py @@ -4,8 +4,8 @@ import logging import pytest -from pytest_operator.plugin import OpsTest +from .adapters import JujuFixture from .backup_helpers import pitr_backup_operations from .conftest import GCP @@ -17,14 +17,12 @@ @pytest.mark.abort_on_fail -async def test_pitr_backup_gcp( - ops_test: OpsTest, gcp_cloud_configs: tuple[dict, dict], charm -) -> None: +def test_pitr_backup_gcp(juju: JujuFixture, gcp_cloud_configs: tuple[dict, dict], charm) -> None: """Build, deploy two units of PostgreSQL and do backup in GCP. Then, write new data into DB, switch WAL file and test point-in-time-recovery restore action.""" config, credentials = gcp_cloud_configs - await pitr_backup_operations( - ops_test, + pitr_backup_operations( + juju, S3_INTEGRATOR_APP_NAME, TLS_CERTIFICATES_APP_NAME, TLS_CHANNEL, diff --git a/tests/integration/test_charm.py b/tests/integration/test_charm.py index f9a3af31f3..beb7a35679 100644 --- a/tests/integration/test_charm.py +++ b/tests/integration/test_charm.py @@ -11,20 +11,19 @@ import pytest import requests from psycopg2 import sql -from pytest_operator.plugin import OpsTest from tenacity import Retrying, stop_after_attempt, wait_exponential, wait_fixed from locales import SNAP_LOCALES -from .ha_tests.helpers import get_cluster_roles -from .helpers import ( - CHARM_BASE, +from .adapters import JujuFixture +from .jubilant_helpers import ( DATABASE_APP_NAME, STORAGE_PATH, check_cluster_members, convert_records_to_dict, db_connect, find_unit, + get_cluster_roles, get_password, get_primary, get_unit_address, @@ -40,42 +39,41 @@ @pytest.mark.abort_on_fail @pytest.mark.skip_if_deployed -async def test_deploy(ops_test: OpsTest, charm: str): +def test_deploy(juju: JujuFixture, charm: str): """Deploy the charm-under-test. Assert on the unit status before any relations/configurations take place. """ # Deploy the charm with Patroni resource. - await ops_test.model.deploy( + juju.ext.model.deploy( charm, application_name=DATABASE_APP_NAME, num_units=3, - base=CHARM_BASE, config={"profile": "testing"}, ) # Reducing the update status frequency to speed up the triggering of deferred events. - await ops_test.model.set_config({"update-status-hook-interval": "10s"}) + juju.ext.model.set_config({"update-status-hook-interval": "10s"}) - await ops_test.model.wait_for_idle(apps=[DATABASE_APP_NAME], status="active", timeout=1500) - assert ops_test.model.applications[DATABASE_APP_NAME].units[0].workload_status == "active" + juju.ext.model.wait_for_idle(apps=[DATABASE_APP_NAME], status="active", timeout=1500) + assert juju.ext.model.applications[DATABASE_APP_NAME].units[0].workload_status == "active" @pytest.mark.abort_on_fail @pytest.mark.parametrize("unit_id", UNIT_IDS) -async def test_database_is_up(ops_test: OpsTest, unit_id: int): +def test_database_is_up(juju: JujuFixture, unit_id: int): # Query Patroni REST API and check the status that indicates # both Patroni and PostgreSQL are up and running. - host = get_unit_address(ops_test, f"{DATABASE_APP_NAME}/{unit_id}") + host = get_unit_address(juju, f"{DATABASE_APP_NAME}/{unit_id}") result = requests.get(f"https://{host}:8008/health", verify=False) assert result.status_code == 200 @pytest.mark.parametrize("unit_id", UNIT_IDS) -async def test_exporter_is_up(ops_test: OpsTest, unit_id: int): +def test_exporter_is_up(juju: JujuFixture, unit_id: int): # Query Patroni REST API and check the status that indicates # both Patroni and PostgreSQL are up and running. - host = get_unit_address(ops_test, f"{DATABASE_APP_NAME}/{unit_id}") + host = get_unit_address(juju, f"{DATABASE_APP_NAME}/{unit_id}") result = requests.get(f"http://{host}:9187/metrics") assert result.status_code == 200 assert "pg_exporter_last_scrape_error 0" in result.content.decode("utf8"), ( @@ -84,13 +82,13 @@ async def test_exporter_is_up(ops_test: OpsTest, unit_id: int): @pytest.mark.parametrize("unit_id", UNIT_IDS) -async def test_settings_are_correct(ops_test: OpsTest, unit_id: int): +def test_settings_are_correct(juju: JujuFixture, unit_id: int): # Connect to the PostgreSQL instance. # Retrieving the operator user password using the action. - password = await get_password(ops_test) + password = get_password() # Connect to PostgreSQL. - host = get_unit_address(ops_test, f"{DATABASE_APP_NAME}/{unit_id}") + host = get_unit_address(juju, f"{DATABASE_APP_NAME}/{unit_id}") logger.info("connecting to the database host: %s", host) with db_connect(host, password) as connection: assert connection.status == psycopg2.extensions.STATUS_READY @@ -161,16 +159,15 @@ async def test_settings_are_correct(ops_test: OpsTest, unit_id: int): assert settings["maximum_lag_on_failover"] == 1048576 logger.warning("Asserting port ranges") - unit = ops_test.model.applications[DATABASE_APP_NAME].units[unit_id] - assert unit.data["port-ranges"][0]["from-port"] == 5432 - assert unit.data["port-ranges"][0]["to-port"] == 5432 - assert unit.data["port-ranges"][0]["protocol"] == "tcp" + unit = juju.ext.model.applications[DATABASE_APP_NAME].units[unit_id] + assert unit.status.open_ports + assert unit.status.open_ports[0] == "5432/tcp" -async def test_postgresql_locales(ops_test: OpsTest) -> None: - raw_locales = await run_command_on_unit( - ops_test, - ops_test.model.applications[DATABASE_APP_NAME].units[0].name, +def test_postgresql_locales(juju: JujuFixture) -> None: + raw_locales = run_command_on_unit( + juju, + juju.ext.model.applications[DATABASE_APP_NAME].units[0].name, "ls /snap/charmed-postgresql/current/usr/lib/locale", ) locales = raw_locales.splitlines() @@ -183,20 +180,20 @@ async def test_postgresql_locales(ops_test: OpsTest) -> None: assert locales == list(get_args(SNAP_LOCALES)) -async def test_postgresql_parameters_change(ops_test: OpsTest) -> None: +def test_postgresql_parameters_change(juju: JujuFixture) -> None: """Test that's possible to change PostgreSQL parameters.""" - await ops_test.model.applications[DATABASE_APP_NAME].set_config({ + juju.ext.model.applications[DATABASE_APP_NAME].set_config({ "memory_max_prepared_transactions": "100", "memory_shared_buffers": "32768", # 2 * 128MB. Patroni may refuse the config if < 128MB "response_lc_monetary": "en_GB.utf8", "experimental_max_connections": "200", }) - await ops_test.model.wait_for_idle(apps=[DATABASE_APP_NAME], status="active", idle_period=30) - password = await get_password(ops_test) + juju.ext.model.wait_for_idle(apps=[DATABASE_APP_NAME], status="active", idle_period=30) + password = get_password() # Connect to PostgreSQL. for unit_id in UNIT_IDS: - host = get_unit_address(ops_test, f"{DATABASE_APP_NAME}/{unit_id}") + host = get_unit_address(juju, f"{DATABASE_APP_NAME}/{unit_id}") logger.info("connecting to the database host: %s", host) try: with ( @@ -229,38 +226,38 @@ async def test_postgresql_parameters_change(ops_test: OpsTest) -> None: connection.close() -async def test_scale_down_and_up(ops_test: OpsTest): +def test_scale_down_and_up(juju: JujuFixture): """Test data is replicated to new units after a scale up.""" # Ensure the initial number of units in the application. initial_scale = len(UNIT_IDS) - await scale_application(ops_test, DATABASE_APP_NAME, initial_scale) + scale_application(juju, DATABASE_APP_NAME, initial_scale) # Scale down the application. - await scale_application(ops_test, DATABASE_APP_NAME, initial_scale - 1) + scale_application(juju, DATABASE_APP_NAME, initial_scale - 1) # Ensure the member was correctly removed from the cluster # (by comparing the cluster members and the current units). - await check_cluster_members(ops_test, DATABASE_APP_NAME) + check_cluster_members(juju, DATABASE_APP_NAME) # Scale up the application (2 more units than the current scale). - await scale_application(ops_test, DATABASE_APP_NAME, initial_scale + 1) + scale_application(juju, DATABASE_APP_NAME, initial_scale + 1) # Assert the correct members are part of the cluster. - await check_cluster_members(ops_test, DATABASE_APP_NAME) + check_cluster_members(juju, DATABASE_APP_NAME) # Test the deletion of the unit that is both the leader and the primary. - any_unit_name = ops_test.model.applications[DATABASE_APP_NAME].units[0].name - primary = await get_primary(ops_test, any_unit_name) - leader_unit = await find_unit(ops_test, leader=True, application=DATABASE_APP_NAME) + any_unit_name = juju.ext.model.applications[DATABASE_APP_NAME].units[0].name + primary = get_primary(juju, any_unit_name) + leader_unit = find_unit(juju, leader=True, application=DATABASE_APP_NAME) # Trigger a switchover if the primary and the leader are not the same unit. - patroni_password = await get_password(ops_test, "patroni") + patroni_password = get_password("patroni") if primary != leader_unit.name: - switchover(ops_test, primary, patroni_password, leader_unit.name) + switchover(juju, primary, patroni_password, leader_unit.name) # Get the new primary unit. - primary = await get_primary(ops_test, any_unit_name) + primary = get_primary(juju, any_unit_name) # Check that the primary changed. for attempt in Retrying( stop=stop_after_attempt(3), wait=wait_exponential(multiplier=1, min=2, max=30) @@ -268,28 +265,28 @@ async def test_scale_down_and_up(ops_test: OpsTest): with attempt: assert primary == leader_unit.name - await ops_test.model.applications[DATABASE_APP_NAME].destroy_units(leader_unit.name) - await ops_test.model.wait_for_idle( + juju.ext.model.applications[DATABASE_APP_NAME].destroy_units(leader_unit.name) + juju.ext.model.wait_for_idle( apps=[DATABASE_APP_NAME], status="active", timeout=1000, wait_for_exact_units=initial_scale ) # Assert the correct members are part of the cluster. - await check_cluster_members(ops_test, DATABASE_APP_NAME) + check_cluster_members(juju, DATABASE_APP_NAME) # Scale up the application (2 more units than the current scale). - await scale_application(ops_test, DATABASE_APP_NAME, initial_scale + 2) + scale_application(juju, DATABASE_APP_NAME, initial_scale + 2) # Test the deletion of both the unit that is the leader and the unit that is the primary. - any_unit_name = ops_test.model.applications[DATABASE_APP_NAME].units[0].name - primary = await get_primary(ops_test, any_unit_name) - leader_unit = await find_unit(ops_test, DATABASE_APP_NAME, True) + any_unit_name = juju.ext.model.applications[DATABASE_APP_NAME].units[0].name + primary = get_primary(juju, any_unit_name) + leader_unit = find_unit(juju, DATABASE_APP_NAME, True) # Trigger a switchover if the primary and the leader are the same unit. if primary == leader_unit.name: - switchover(ops_test, primary, patroni_password) + switchover(juju, primary, patroni_password) # Get the new primary unit. - primary = await get_primary(ops_test, any_unit_name) + primary = get_primary(juju, any_unit_name) # Check that the primary changed. for attempt in Retrying( stop=stop_after_attempt(3), wait=wait_exponential(multiplier=1, min=2, max=30) @@ -297,8 +294,8 @@ async def test_scale_down_and_up(ops_test: OpsTest): with attempt: assert primary != leader_unit.name - await ops_test.model.applications[DATABASE_APP_NAME].destroy_units(primary, leader_unit.name) - await ops_test.model.wait_for_idle( + juju.ext.model.applications[DATABASE_APP_NAME].destroy_units(primary, leader_unit.name) + juju.ext.model.wait_for_idle( apps=[DATABASE_APP_NAME], status="active", timeout=2000, @@ -310,40 +307,40 @@ async def test_scale_down_and_up(ops_test: OpsTest): sleep(30) # Assert the correct members are part of the cluster. - await check_cluster_members(ops_test, DATABASE_APP_NAME) + check_cluster_members(juju, DATABASE_APP_NAME) # End with the cluster having the initial number of units. - await scale_application(ops_test, DATABASE_APP_NAME, initial_scale) + scale_application(juju, DATABASE_APP_NAME, initial_scale) -async def test_switchover_sync_standby(ops_test: OpsTest): - original_roles = await get_cluster_roles( - ops_test, ops_test.model.applications[DATABASE_APP_NAME].units[0].name +def test_switchover_sync_standby(juju: JujuFixture): + original_roles = get_cluster_roles( + juju, juju.ext.model.applications[DATABASE_APP_NAME].units[0].name ) - run_action = await ops_test.model.units[original_roles["sync_standbys"][0]].run_action( + run_action = juju.ext.model.units[original_roles["sync_standbys"][0]].run_action( "promote-to-primary", scope="unit" ) - await run_action.wait() + run_action.wait() - await ops_test.model.wait_for_idle(status="active", timeout=200) + juju.ext.model.wait_for_idle(status="active", timeout=200) - new_roles = await get_cluster_roles( - ops_test, ops_test.model.applications[DATABASE_APP_NAME].units[0].name + new_roles = get_cluster_roles( + juju, juju.ext.model.applications[DATABASE_APP_NAME].units[0].name ) assert new_roles["primaries"][0] == original_roles["sync_standbys"][0] -async def test_persist_data_through_primary_deletion(ops_test: OpsTest): +def test_persist_data_through_primary_deletion(juju: JujuFixture): """Test data persists through a primary deletion.""" # Set a composite application name in order to test in more than one series at the same time. - any_unit_name = ops_test.model.applications[DATABASE_APP_NAME].units[0].name + any_unit_name = juju.ext.model.applications[DATABASE_APP_NAME].units[0].name for attempt in Retrying(stop=stop_after_attempt(3), wait=wait_fixed(5), reraise=True): with attempt: - primary = await get_primary(ops_test, any_unit_name) - password = await get_password(ops_test) + primary = get_primary(juju, any_unit_name) + password = get_password() # Write data to primary IP. - host = get_unit_address(ops_test, primary) + host = get_unit_address(juju, primary) logger.info(f"connecting to primary {primary} on {host}") with db_connect(host, password) as connection: connection.autocommit = True @@ -352,17 +349,15 @@ async def test_persist_data_through_primary_deletion(ops_test: OpsTest): connection.close() # Remove one unit. - await ops_test.model.destroy_units( - primary, - ) - await ops_test.model.wait_for_idle(apps=[DATABASE_APP_NAME], status="active", timeout=1500) + juju.ext.model.destroy_unit(primary) + juju.ext.model.wait_for_idle(apps=[DATABASE_APP_NAME], status="active", timeout=1500) # Add the unit again. - await ops_test.model.applications[DATABASE_APP_NAME].add_unit(count=1) - await ops_test.model.wait_for_idle(apps=[DATABASE_APP_NAME], status="active", timeout=2000) + juju.ext.model.applications[DATABASE_APP_NAME].add_unit(count=1) + juju.ext.model.wait_for_idle(apps=[DATABASE_APP_NAME], status="active", timeout=2000) # Testing write occurred to every postgres instance by reading from them - for unit in ops_test.model.applications[DATABASE_APP_NAME].units: + for unit in juju.ext.model.applications[DATABASE_APP_NAME].units: host = unit.public_address logger.info("connecting to the database host: %s", host) with db_connect(host, password) as connection, connection.cursor() as cursor: diff --git a/tests/integration/test_config.py b/tests/integration/test_config.py index d82cf24a71..f67ac68336 100644 --- a/tests/integration/test_config.py +++ b/tests/integration/test_config.py @@ -4,10 +4,9 @@ import logging import pytest as pytest -from pytest_operator.plugin import OpsTest -from .helpers import ( - CHARM_BASE, +from .adapters import JujuFixture +from .jubilant_helpers import ( DATABASE_APP_NAME, execute_query_on_unit, get_leader_unit, @@ -19,19 +18,18 @@ @pytest.mark.abort_on_fail -async def test_config_parameters(ops_test: OpsTest, charm) -> None: +def test_config_parameters(juju: JujuFixture, charm) -> None: """Build and deploy one unit of PostgreSQL and then test config with wrong parameters.""" # Build and deploy the PostgreSQL charm. - async with ops_test.fast_forward(): - await ops_test.model.deploy( + with juju.ext.fast_forward(): + juju.ext.model.deploy( charm, num_units=1, - base=CHARM_BASE, config={"profile": "testing"}, ) - await ops_test.model.wait_for_idle(apps=[DATABASE_APP_NAME], status="active", timeout=1500) + juju.ext.model.wait_for_idle(apps=[DATABASE_APP_NAME], status="active", timeout=1500) - leader_unit = await get_leader_unit(ops_test, DATABASE_APP_NAME) + leader_unit = get_leader_unit(juju, DATABASE_APP_NAME) test_string = "abcXYZ123" configs = [ @@ -268,30 +266,42 @@ async def test_config_parameters(ops_test: OpsTest, charm) -> None: for k, v in config.items(): logger.info(k) charm_config[k] = v[0] - await ops_test.model.applications[DATABASE_APP_NAME].set_config(charm_config) - await ops_test.model.block_until( - lambda: ( - ops_test.model.units[f"{DATABASE_APP_NAME}/0"].workload_status == "blocked" + juju.ext.model.applications[DATABASE_APP_NAME].set_config(charm_config) + juju.wait( + lambda status: ( + status + .apps[DATABASE_APP_NAME] + .units[f"{DATABASE_APP_NAME}/0"] + .workload_status.current + == "blocked" + and "Configuration Error" + in status + .apps[DATABASE_APP_NAME] + .units[leader_unit.name] + .workload_status.message ), - timeout=100, + timeout=600, + successes=1, ) - assert "Configuration Error" in leader_unit.workload_status_message charm_config[k] = v[1] - await ops_test.model.applications[DATABASE_APP_NAME].set_config(charm_config) - await ops_test.model.block_until( - lambda: ops_test.model.units[f"{DATABASE_APP_NAME}/0"].workload_status == "active", - timeout=100, + juju.ext.model.applications[DATABASE_APP_NAME].set_config(charm_config) + juju.wait( + lambda status: ( + status.apps[DATABASE_APP_NAME].units[f"{DATABASE_APP_NAME}/0"].workload_status.current + == "active" + ), + timeout=200, ) @pytest.mark.abort_on_fail -async def test_worker_process_configs(ops_test: OpsTest) -> None: +def test_worker_process_configs(juju: JujuFixture) -> None: """Test worker process configuration parameters are applied correctly.""" - leader_unit = await get_leader_unit(ops_test, DATABASE_APP_NAME) + leader_unit = get_leader_unit(juju, DATABASE_APP_NAME) leader_unit_name = leader_unit.name - password = await get_password(ops_test) - unit_address = get_unit_address(ops_test, leader_unit_name) + password = get_password() + unit_address = get_unit_address(juju, leader_unit_name) # Test setting explicit numeric values (all values must be >= 2 per validation) worker_configs = { @@ -303,8 +313,8 @@ async def test_worker_process_configs(ops_test: OpsTest) -> None: "cpu-max-parallel-apply-workers-per-subscription": "8", } - await ops_test.model.applications[DATABASE_APP_NAME].set_config(worker_configs) - await ops_test.model.wait_for_idle(apps=[DATABASE_APP_NAME], status="active", timeout=300) + juju.ext.model.applications[DATABASE_APP_NAME].set_config(worker_configs) + juju.ext.model.wait_for_idle(apps=[DATABASE_APP_NAME], status="active", timeout=300) # Verify the configs are applied in PostgreSQL # Map charm config names to PostgreSQL parameter names @@ -319,7 +329,7 @@ async def test_worker_process_configs(ops_test: OpsTest) -> None: for config_name, expected_value in worker_configs.items(): pg_param = config_to_pg_param.get(config_name, config_name.replace("-", "_")) - result = await execute_query_on_unit(unit_address, password, f"SHOW {pg_param}") + result = execute_query_on_unit(unit_address, password, f"SHOW {pg_param}") actual_value = str(result[0]) if result else "" assert actual_value == expected_value, ( f"{pg_param}: expected {expected_value}, got {actual_value}" @@ -335,33 +345,31 @@ async def test_worker_process_configs(ops_test: OpsTest) -> None: "cpu-max-parallel-apply-workers-per-subscription": "auto", } - await ops_test.model.applications[DATABASE_APP_NAME].set_config(auto_configs) - await ops_test.model.wait_for_idle(apps=[DATABASE_APP_NAME], status="active", timeout=300) + juju.ext.model.applications[DATABASE_APP_NAME].set_config(auto_configs) + juju.ext.model.wait_for_idle(apps=[DATABASE_APP_NAME], status="active", timeout=300) # Verify "auto" values are resolved to integers (not the string "auto") for config_name in auto_configs: pg_param = config_to_pg_param.get(config_name, config_name.replace("-", "_")) - result = await execute_query_on_unit(unit_address, password, f"SHOW {pg_param}") + result = execute_query_on_unit(unit_address, password, f"SHOW {pg_param}") actual_value = str(result[0]) if result else "" assert actual_value != "auto", f"{pg_param} should be resolved to a number, not 'auto'" assert actual_value.isdigit(), f"{pg_param} should be a number, got '{actual_value}'" @pytest.mark.abort_on_fail -async def test_wal_compression_config(ops_test: OpsTest) -> None: +def test_wal_compression_config(juju: JujuFixture) -> None: """Test wal_compression configuration parameter.""" - leader_unit = await get_leader_unit(ops_test, DATABASE_APP_NAME) + leader_unit = get_leader_unit(juju, DATABASE_APP_NAME) leader_unit_name = leader_unit.name - password = await get_password(ops_test) - unit_address = get_unit_address(ops_test, leader_unit_name) + password = get_password() + unit_address = get_unit_address(juju, leader_unit_name) # Test enabling WAL compression - await ops_test.model.applications[DATABASE_APP_NAME].set_config({ - "cpu-wal-compression": "true" - }) - await ops_test.model.wait_for_idle(apps=[DATABASE_APP_NAME], status="active", timeout=300) + juju.ext.model.applications[DATABASE_APP_NAME].set_config({"cpu-wal-compression": "true"}) + juju.ext.model.wait_for_idle(apps=[DATABASE_APP_NAME], status="active", timeout=300) - result = await execute_query_on_unit(unit_address, password, "SHOW wal_compression") + result = execute_query_on_unit(unit_address, password, "SHOW wal_compression") # Verify it's a known compression algorithm known_algorithms = ["pglz", "lz4", "zstd"] assert result[0] in known_algorithms, ( @@ -369,10 +377,8 @@ async def test_wal_compression_config(ops_test: OpsTest) -> None: ) # Test disabling WAL compression - await ops_test.model.applications[DATABASE_APP_NAME].set_config({ - "cpu-wal-compression": "false" - }) - await ops_test.model.wait_for_idle(apps=[DATABASE_APP_NAME], status="active", timeout=300) + juju.ext.model.applications[DATABASE_APP_NAME].set_config({"cpu-wal-compression": "false"}) + juju.ext.model.wait_for_idle(apps=[DATABASE_APP_NAME], status="active", timeout=300) - result = await execute_query_on_unit(unit_address, password, "SHOW wal_compression") + result = execute_query_on_unit(unit_address, password, "SHOW wal_compression") assert result[0] == "off" diff --git a/tests/integration/test_password_rotation.py b/tests/integration/test_password_rotation.py index f47622079f..578be70bdf 100644 --- a/tests/integration/test_password_rotation.py +++ b/tests/integration/test_password_rotation.py @@ -6,10 +6,9 @@ import psycopg2 import pytest -from pytest_operator.plugin import OpsTest -from .helpers import ( - CHARM_BASE, +from .adapters import JujuFixture +from .jubilant_helpers import ( METADATA, check_patroni, db_connect, @@ -26,96 +25,95 @@ @pytest.mark.abort_on_fail @pytest.mark.skip_if_deployed -async def test_deploy_active(ops_test: OpsTest, charm): +def test_deploy_active(juju: JujuFixture, charm): """Build the charm and deploy it.""" - async with ops_test.fast_forward(): - await ops_test.model.deploy( + with juju.ext.fast_forward(): + juju.ext.model.deploy( charm, application_name=APP_NAME, num_units=3, - base=CHARM_BASE, config={"profile": "testing"}, ) - await ops_test.model.wait_for_idle(apps=[APP_NAME], status="active", timeout=1500) + juju.ext.model.wait_for_idle(apps=[APP_NAME], status="active", timeout=1500) -async def test_password_rotation(ops_test: OpsTest): +def test_password_rotation(juju: JujuFixture): """Test password rotation action.""" # Get the initial passwords set for the system users. - superuser_password = await get_password(ops_test) - replication_password = await get_password(ops_test, "replication") - monitoring_password = await get_password(ops_test, "monitoring") - backup_password = await get_password(ops_test, "backup") - rewind_password = await get_password(ops_test, "rewind") - patroni_password = await get_password(ops_test, "patroni") + superuser_password = get_password() + replication_password = get_password("replication") + monitoring_password = get_password("monitoring") + backup_password = get_password("backup") + rewind_password = get_password("rewind") + patroni_password = get_password("patroni") # Change both passwords. - await set_password(ops_test, password="test-password") - await ops_test.model.wait_for_idle(apps=[APP_NAME], status="active", timeout=1000) - new_superuser_password = await get_password(ops_test) + set_password(juju, password="test-password") + juju.ext.model.wait_for_idle(apps=[APP_NAME], status="active", timeout=1000) + new_superuser_password = get_password() assert superuser_password != new_superuser_password # For replication, generate a specific password and pass it to the action. new_replication_password = "test-password" - await set_password(ops_test, username="replication", password=new_replication_password) - await ops_test.model.wait_for_idle(apps=[APP_NAME], status="active", timeout=1000) - assert new_replication_password == await get_password(ops_test, "replication") + set_password(juju, username="replication", password=new_replication_password) + juju.ext.model.wait_for_idle(apps=[APP_NAME], status="active", timeout=1000) + assert new_replication_password == get_password("replication") assert replication_password != new_replication_password # For monitoring, generate a specific password and pass it to the action. new_monitoring_password = "test-password" - await set_password(ops_test, username="monitoring", password=new_monitoring_password) - await ops_test.model.wait_for_idle(apps=[APP_NAME], status="active", timeout=1000) - assert new_monitoring_password == await get_password(ops_test, "monitoring") + set_password(juju, username="monitoring", password=new_monitoring_password) + juju.ext.model.wait_for_idle(apps=[APP_NAME], status="active", timeout=1000) + assert new_monitoring_password == get_password("monitoring") assert monitoring_password != new_monitoring_password # For backup, generate a specific password and pass it to the action. new_backup_password = "test-password" - await set_password(ops_test, username="backup", password=new_backup_password) - await ops_test.model.wait_for_idle(apps=[APP_NAME], status="active", timeout=1000) - assert new_backup_password == await get_password(ops_test, "backup") + set_password(juju, username="backup", password=new_backup_password) + juju.ext.model.wait_for_idle(apps=[APP_NAME], status="active", timeout=1000) + assert new_backup_password == get_password("backup") assert backup_password != new_backup_password # For rewind, generate a specific password and pass it to the action. new_rewind_password = "test-password" - await set_password(ops_test, username="rewind", password=new_rewind_password) - await ops_test.model.wait_for_idle(apps=[APP_NAME], status="active", timeout=1000) - assert new_rewind_password == await get_password(ops_test, "rewind") + set_password(juju, username="rewind", password=new_rewind_password) + juju.ext.model.wait_for_idle(apps=[APP_NAME], status="active", timeout=1000) + assert new_rewind_password == get_password("rewind") assert rewind_password != new_rewind_password # Restart Patroni on any non-leader unit and check that # Patroni and PostgreSQL continue to work. restart_time = time.time() - for unit in ops_test.model.applications[APP_NAME].units: - if not await unit.is_leader_from_status(): - restart_patroni(ops_test, unit.name, patroni_password) - assert check_patroni(ops_test, unit.name, restart_time) + for unit in juju.ext.model.applications[APP_NAME].units: + if not unit.is_leader_from_status(): + restart_patroni(juju, unit.name, patroni_password) + assert check_patroni(juju, unit.name, restart_time) -async def test_db_connection_with_empty_password(ops_test: OpsTest): +def test_db_connection_with_empty_password(juju: JujuFixture): """Test that user can't connect with empty password.""" - primary = await get_primary(ops_test, f"{APP_NAME}/0") - address = get_unit_address(ops_test, primary) + primary = get_primary(juju, f"{APP_NAME}/0") + address = get_unit_address(juju, primary) with pytest.raises(psycopg2.Error), db_connect(address, "") as connection: connection.close() -async def test_no_password_change_on_invalid_password(ops_test: OpsTest) -> None: +def test_no_password_change_on_invalid_password(juju: JujuFixture) -> None: """Test that in general, there is no change when password validation fails.""" - password1 = await get_password(ops_test, username="replication") + password1 = get_password(username="replication") # The password has to be minimum 3 characters - await set_password(ops_test, username="replication", password="1") - password2 = await get_password(ops_test, username="replication") + set_password(juju, username="replication", password="1") + password2 = get_password(username="replication") # The password didn't change assert password1 == password2 -async def test_no_password_exposed_on_logs(ops_test: OpsTest) -> None: +def test_no_password_exposed_on_logs(juju: JujuFixture) -> None: """Test that passwords don't get exposed on postgresql logs.""" - for unit in ops_test.model.applications[APP_NAME].units: + for unit in juju.ext.model.applications[APP_NAME].units: try: - logs = await run_command_on_unit( - ops_test, + logs = run_command_on_unit( + juju, unit.name, "grep PASSWORD /var/snap/charmed-postgresql/common/var/log/postgresql/postgresql-*.log", ) diff --git a/tests/integration/test_pg_hba.py b/tests/integration/test_pg_hba.py index dc955076af..8af3d2691d 100644 --- a/tests/integration/test_pg_hba.py +++ b/tests/integration/test_pg_hba.py @@ -7,10 +7,9 @@ import psycopg2 import pytest -from pytest_operator.plugin import OpsTest -from .helpers import ( - CHARM_BASE, +from .adapters import JujuFixture +from .jubilant_helpers import ( DATABASE_APP_NAME, db_connect, get_primary, @@ -28,18 +27,17 @@ @pytest.mark.abort_on_fail -async def test_pg_hba(ops_test: OpsTest, charm): - async with ops_test.fast_forward(): +def test_pg_hba(juju: JujuFixture, charm): + with juju.ext.fast_forward(): logger.info("Deploying charms") - if DATABASE_APP_NAME not in ops_test.model.applications: - await ops_test.model.deploy( + if DATABASE_APP_NAME not in juju.ext.model.applications: + juju.ext.model.deploy( charm, num_units=2, - base=CHARM_BASE, config={"profile": "testing"}, ) - if DATA_INTEGRATOR_APP_NAME not in ops_test.model.applications: - await ops_test.model.deploy( + if DATA_INTEGRATOR_APP_NAME not in juju.ext.model.applications: + juju.ext.model.deploy( DATA_INTEGRATOR_APP_NAME, config={"database-name": FIRST_DATABASE, "extra-user-roles": "SUPERUSER"}, ) @@ -47,23 +45,23 @@ async def test_pg_hba(ops_test: OpsTest, charm): logger.info("Adding relation between charms") relations = [ relation - for relation in ops_test.model.applications[DATABASE_APP_NAME].relations + for relation in juju.ext.model.applications[DATABASE_APP_NAME].relations if not relation.is_peer and f"{relation.requires.application_name}:{relation.requires.name}" == f"{DATA_INTEGRATOR_APP_NAME}:postgresql" ] if not relations: - await ops_test.model.add_relation(DATA_INTEGRATOR_APP_NAME, DATABASE_APP_NAME) + juju.ext.model.add_relation(DATA_INTEGRATOR_APP_NAME, DATABASE_APP_NAME) - await ops_test.model.wait_for_idle( + juju.ext.model.wait_for_idle( apps=[DATA_INTEGRATOR_APP_NAME, DATABASE_APP_NAME], status="active", timeout=1000 ) - primary = await get_primary(ops_test, f"{DATABASE_APP_NAME}/0") - address = get_unit_address(ops_test, primary) - data_integrator_unit = ops_test.model.applications[DATA_INTEGRATOR_APP_NAME].units[0] - action = await data_integrator_unit.run_action(action_name="get-credentials") - result = await action.wait() + primary = get_primary(juju, f"{DATABASE_APP_NAME}/0") + address = get_unit_address(juju, primary) + data_integrator_unit = juju.ext.model.applications[DATA_INTEGRATOR_APP_NAME].units[0] + action = data_integrator_unit.run_action(action_name="get-credentials") + result = action.wait() credentials = result.results connection = None try: @@ -107,9 +105,9 @@ async def test_pg_hba(ops_test: OpsTest, charm): sleep(90) - for unit in ops_test.model.applications[DATABASE_APP_NAME].units: + for unit in juju.ext.model.applications[DATABASE_APP_NAME].units: try: - address = get_unit_address(ops_test, unit.name) + address = get_unit_address(juju, unit.name) logger.info( f"Checking that the user {FIRST_RELATION_USER} can connect to the database {FIRST_DATABASE} on {unit.name}" diff --git a/tests/integration/test_plugins.py b/tests/integration/test_plugins.py index 88c6d50842..750a9cdbd4 100644 --- a/tests/integration/test_plugins.py +++ b/tests/integration/test_plugins.py @@ -5,10 +5,9 @@ import psycopg2 as psycopg2 import pytest as pytest -from pytest_operator.plugin import OpsTest -from .helpers import ( - CHARM_BASE, +from .adapters import JujuFixture +from .jubilant_helpers import ( DATABASE_APP_NAME, db_connect, get_password, @@ -94,17 +93,16 @@ @pytest.mark.abort_on_fail -async def test_plugins(ops_test: OpsTest, charm) -> None: +def test_plugins(juju: JujuFixture, charm) -> None: """Build and deploy one unit of PostgreSQL and then test the available plugins.""" # Build and deploy the PostgreSQL charm. - async with ops_test.fast_forward(): - await ops_test.model.deploy( + with juju.ext.fast_forward(): + juju.ext.model.deploy( charm, num_units=2, - base=CHARM_BASE, config={"profile": "testing"}, ) - await ops_test.model.wait_for_idle(apps=[DATABASE_APP_NAME], status="active", timeout=1500) + juju.ext.model.wait_for_idle(apps=[DATABASE_APP_NAME], status="active", timeout=1500) sql_tests = { "plugin_citext_enable": CITEXT_EXTENSION_STATEMENT, @@ -173,13 +171,13 @@ def enable_disable_config(enabled: False): return config # Check that the available plugins are disabled. - primary = await get_primary(ops_test, f"{DATABASE_APP_NAME}/0") - password = await get_password(ops_test) - address = get_unit_address(ops_test, primary) + primary = get_primary(juju, f"{DATABASE_APP_NAME}/0") + password = get_password() + address = get_unit_address(juju, primary) config = enable_disable_config(False) - await ops_test.model.applications[DATABASE_APP_NAME].set_config(config) - await ops_test.model.wait_for_idle(apps=[DATABASE_APP_NAME], status="active") + juju.ext.model.applications[DATABASE_APP_NAME].set_config(config) + juju.ext.model.wait_for_idle(apps=[DATABASE_APP_NAME], status="active") logger.info("checking that the plugins are disabled") with db_connect(host=address, password=password) as connection: @@ -198,8 +196,8 @@ def enable_disable_config(enabled: False): logger.info("enabling the plugins") config = enable_disable_config(True) - await ops_test.model.applications[DATABASE_APP_NAME].set_config(config) - await ops_test.model.wait_for_idle(apps=[DATABASE_APP_NAME], status="active") + juju.ext.model.applications[DATABASE_APP_NAME].set_config(config) + juju.ext.model.wait_for_idle(apps=[DATABASE_APP_NAME], status="active") # Check that the available plugins are enabled. logger.info("checking that the plugins are enabled") @@ -214,11 +212,11 @@ def enable_disable_config(enabled: False): connection.close() -async def test_plugin_objects(ops_test: OpsTest) -> None: +def test_plugin_objects(juju: JujuFixture) -> None: """Checks if charm gets blocked when trying to disable a plugin in use.""" - primary = await get_primary(ops_test, f"{DATABASE_APP_NAME}/0") - password = await get_password(ops_test) - address = get_unit_address(ops_test, primary) + primary = get_primary(juju, f"{DATABASE_APP_NAME}/0") + password = get_password() + address = get_unit_address(juju, primary) logger.info("Creating an index object which depends on the pg_trgm config") with db_connect(host=address, password=password) as connection: @@ -230,26 +228,20 @@ async def test_plugin_objects(ops_test: OpsTest) -> None: connection.close() logger.info("Disabling the plugin on charm config, waiting for blocked status") - await ops_test.model.applications[DATABASE_APP_NAME].set_config({ - "plugin_pg_trgm_enable": "False" - }) - await ops_test.model.block_until( - lambda: ops_test.model.units[primary].workload_status == "blocked", + juju.ext.model.applications[DATABASE_APP_NAME].set_config({"plugin_pg_trgm_enable": "False"}) + juju.ext.model.block_until( + lambda: juju.ext.model.units[primary].workload_status == "blocked", timeout=100, ) logger.info("Enabling the plugin back on charm config, status should resolve") - await ops_test.model.applications[DATABASE_APP_NAME].set_config({ - "plugin_pg_trgm_enable": "True" - }) - await ops_test.model.wait_for_idle(apps=[DATABASE_APP_NAME], status="active") + juju.ext.model.applications[DATABASE_APP_NAME].set_config({"plugin_pg_trgm_enable": "True"}) + juju.ext.model.wait_for_idle(apps=[DATABASE_APP_NAME], status="active") logger.info("Re-disabling plugin, waiting for blocked status to come back") - await ops_test.model.applications[DATABASE_APP_NAME].set_config({ - "plugin_pg_trgm_enable": "False" - }) - await ops_test.model.block_until( - lambda: ops_test.model.units[primary].workload_status == "blocked", + juju.ext.model.applications[DATABASE_APP_NAME].set_config({"plugin_pg_trgm_enable": "False"}) + juju.ext.model.block_until( + lambda: juju.ext.model.units[primary].workload_status == "blocked", timeout=100, ) @@ -260,5 +252,5 @@ async def test_plugin_objects(ops_test: OpsTest) -> None: connection.close() logger.info("Waiting for status to resolve again") - async with ops_test.fast_forward(fast_interval="60s"): - await ops_test.model.wait_for_idle(apps=[DATABASE_APP_NAME], status="active") + with juju.ext.fast_forward(fast_interval="60s"): + juju.ext.model.wait_for_idle(apps=[DATABASE_APP_NAME], status="active") diff --git a/tests/integration/test_storage.py b/tests/integration/test_storage.py index 5033a3e7a1..3cefedfccf 100644 --- a/tests/integration/test_storage.py +++ b/tests/integration/test_storage.py @@ -5,31 +5,29 @@ import logging import pytest -from pytest_operator.plugin import OpsTest -from .helpers import CHARM_BASE, DATABASE_APP_NAME +from .adapters import JujuFixture +from .jubilant_helpers import DATABASE_APP_NAME logger = logging.getLogger(__name__) @pytest.mark.abort_on_fail -async def test_storage(ops_test: OpsTest, charm): +def test_storage(juju: JujuFixture, charm): """Build and deploy the charm and check its storage list.""" - async with ops_test.fast_forward(): - await ops_test.model.deploy( + with juju.ext.fast_forward(): + juju.ext.model.deploy( charm, num_units=1, - base=CHARM_BASE, config={"profile": "testing"}, ) - await ops_test.model.wait_for_idle(apps=[DATABASE_APP_NAME], status="active") + juju.ext.model.wait_for_idle(apps=[DATABASE_APP_NAME], status="active") logger.info("Checking charm storages") expected_storages = ["archive", "data", "logs", "temp"] - storages = await ops_test.model.list_storage() + storages = juju.ext.model.list_storage() assert len(storages) == 4, f"Expected 4 storages, got: {len(storages)}" for index, storage in enumerate(storages): - assert ( - storage["attachments"]["unit-postgresql-0"].__dict__["storage_tag"] - == f"storage-{expected_storages[index]}-{index}" - ), f"Storage {expected_storages[index]} not found" + assert storage["key"] == f"{expected_storages[index]}/{index}", ( + f"Storage {expected_storages[index]} not found" + ) diff --git a/tests/integration/test_subordinates.py b/tests/integration/test_subordinates.py index c3caeb2d0f..bf05d6269d 100644 --- a/tests/integration/test_subordinates.py +++ b/tests/integration/test_subordinates.py @@ -4,13 +4,11 @@ import logging import os -from asyncio import gather import pytest -from pytest_operator.plugin import OpsTest -from .helpers import ( - CHARM_BASE, +from .adapters import JujuFixture +from .jubilant_helpers import ( scale_application, ) @@ -22,7 +20,7 @@ @pytest.fixture(scope="module") -async def check_subordinate_env_vars(ops_test: OpsTest) -> None: +def check_subordinate_env_vars(juju: JujuFixture) -> None: if ( not os.environ.get("UBUNTU_PRO_TOKEN", "").strip() or not os.environ.get("LANDSCAPE_ACCOUNT_NAME", "").strip() @@ -32,57 +30,51 @@ async def check_subordinate_env_vars(ops_test: OpsTest) -> None: @pytest.mark.abort_on_fail -async def test_deploy(ops_test: OpsTest, charm: str, check_subordinate_env_vars): - await gather( - ops_test.model.deploy( - charm, - application_name=DATABASE_APP_NAME, - num_units=3, - base=CHARM_BASE, - ), - ops_test.model.deploy( - UBUNTU_PRO_APP_NAME, - config={"token": os.environ["UBUNTU_PRO_TOKEN"]}, - channel="latest/edge", - num_units=0, - # TODO switch back to series when pylib juju can figure out the base: - # https://github.com/juju/python-libjuju/issues/1240 - series="noble", - ), - ops_test.model.deploy( - LS_CLIENT, - config={ - "account-name": os.environ["LANDSCAPE_ACCOUNT_NAME"], - "registration-key": os.environ["LANDSCAPE_REGISTRATION_KEY"], - "ppa": "ppa:landscape/self-hosted-beta", - }, - channel="latest/edge", - num_units=0, - base=CHARM_BASE, - ), +def test_deploy(juju: JujuFixture, charm: str, check_subordinate_env_vars): + juju.ext.model.deploy( + charm, + application_name=DATABASE_APP_NAME, + num_units=3, ) - - await ops_test.model.wait_for_idle(apps=[DATABASE_APP_NAME], status="active", timeout=2000) - await ops_test.model.relate(f"{DATABASE_APP_NAME}:juju-info", f"{LS_CLIENT}:container") - await ops_test.model.relate( - f"{DATABASE_APP_NAME}:juju-info", f"{UBUNTU_PRO_APP_NAME}:juju-info" + juju.ext.model.deploy( + UBUNTU_PRO_APP_NAME, + config={"token": os.environ["UBUNTU_PRO_TOKEN"]}, + channel="latest/edge", + num_units=0, + # TODO switch back to series when pylib juju can figure out the base: + # https://github.com/juju/python-libjuju/issues/1240 + series="noble", + ) + juju.ext.model.deploy( + LS_CLIENT, + config={ + "account-name": os.environ["LANDSCAPE_ACCOUNT_NAME"], + "registration-key": os.environ["LANDSCAPE_REGISTRATION_KEY"], + "ppa": "ppa:landscape/self-hosted-beta", + }, + channel="latest/edge", + num_units=0, ) - await ops_test.model.wait_for_idle( + + juju.ext.model.wait_for_idle(apps=[DATABASE_APP_NAME], status="active", timeout=2000) + juju.ext.model.relate(f"{DATABASE_APP_NAME}:juju-info", f"{LS_CLIENT}:container") + juju.ext.model.relate(f"{DATABASE_APP_NAME}:juju-info", f"{UBUNTU_PRO_APP_NAME}:juju-info") + juju.ext.model.wait_for_idle( apps=[LS_CLIENT, UBUNTU_PRO_APP_NAME, DATABASE_APP_NAME], status="active" ) -async def test_scale_up(ops_test: OpsTest, check_subordinate_env_vars): - await scale_application(ops_test, DATABASE_APP_NAME, 4) +def test_scale_up(juju: JujuFixture, check_subordinate_env_vars): + scale_application(juju, DATABASE_APP_NAME, 4) - await ops_test.model.wait_for_idle( + juju.ext.model.wait_for_idle( apps=[LS_CLIENT, UBUNTU_PRO_APP_NAME, DATABASE_APP_NAME], status="active", timeout=1500 ) -async def test_scale_down(ops_test: OpsTest, check_subordinate_env_vars): - await scale_application(ops_test, DATABASE_APP_NAME, 3) +def test_scale_down(juju: JujuFixture, check_subordinate_env_vars): + scale_application(juju, DATABASE_APP_NAME, 3) - await ops_test.model.wait_for_idle( + juju.ext.model.wait_for_idle( apps=[LS_CLIENT, UBUNTU_PRO_APP_NAME, DATABASE_APP_NAME], status="active", timeout=1500 ) diff --git a/tests/integration/test_tls.py b/tests/integration/test_tls.py index dc48f81c35..297d7bf19e 100644 --- a/tests/integration/test_tls.py +++ b/tests/integration/test_tls.py @@ -2,18 +2,16 @@ # Copyright 2022 Canonical Ltd. # See LICENSE file for licensing details. import logging +import time import pytest as pytest -from pytest_operator.plugin import OpsTest from tenacity import Retrying, stop_after_attempt, stop_after_delay, wait_exponential, wait_fixed -from .ha_tests.helpers import ( - change_patroni_setting, -) -from .helpers import ( - CHARM_BASE, +from .adapters import JujuFixture +from .jubilant_helpers import ( DATABASE_APP_NAME, METADATA, + change_patroni_setting, change_primary_start_timeout, check_tls, check_tls_patroni_api, @@ -35,14 +33,13 @@ @pytest.mark.abort_on_fail @pytest.mark.skip_if_deployed -async def test_deploy_active(ops_test: OpsTest, charm): +def test_deploy_active(juju: JujuFixture, charm): """Build the charm and deploy it.""" - async with ops_test.fast_forward(): - await ops_test.model.deploy( + with juju.ext.fast_forward(): + juju.ext.model.deploy( charm, application_name=APP_NAME, num_units=3, - base=CHARM_BASE, config={"profile": "testing"}, ) # No wait between deploying charms, since we can't guarantee users will wait. Furthermore, @@ -50,72 +47,70 @@ async def test_deploy_active(ops_test: OpsTest, charm): @pytest.mark.abort_on_fail -async def test_tls_enabled(ops_test: OpsTest) -> None: +def test_tls_enabled(juju: JujuFixture) -> None: """Test that TLS is enabled when relating to the TLS Certificates Operator.""" - async with ops_test.fast_forward(): + with juju.ext.fast_forward(): # Deploy TLS Certificates operator. - await ops_test.model.deploy(tls_certificates_app_name, channel=tls_channel) + juju.ext.model.deploy(tls_certificates_app_name, channel=tls_channel) # Relate it to the PostgreSQL to enable TLS. - await ops_test.model.relate( + juju.ext.model.relate( f"{DATABASE_APP_NAME}:client-certificates", f"{tls_certificates_app_name}:certificates" ) - await ops_test.model.relate( + juju.ext.model.relate( f"{DATABASE_APP_NAME}:peer-certificates", f"{tls_certificates_app_name}:certificates" ) - await ops_test.model.wait_for_idle(status="active", timeout=1500, raise_on_error=False) + juju.ext.model.wait_for_idle(status="active", timeout=1500, raise_on_error=False) # Wait for all units enabling TLS. - for unit in ops_test.model.applications[DATABASE_APP_NAME].units: - assert await check_tls(ops_test, unit.name, enabled=True) - assert await check_tls_patroni_api(ops_test, unit.name, enabled=True) + for unit in juju.ext.model.applications[DATABASE_APP_NAME].units: + assert check_tls(juju, unit.name, enabled=True) + assert check_tls_patroni_api(juju, unit.name, enabled=True) # Test TLS being used by pg_rewind. To accomplish that, get the primary unit # and a replica that will be promoted to primary (this should trigger a rewind # operation when the old primary is started again). - any_unit = ops_test.model.applications[DATABASE_APP_NAME].units[0].name - primary = await get_primary(ops_test, any_unit) + any_unit = juju.ext.model.applications[DATABASE_APP_NAME].units[0].name + primary = get_primary(juju, any_unit) replica = next( unit.name - for unit in ops_test.model.applications[DATABASE_APP_NAME].units + for unit in juju.ext.model.applications[DATABASE_APP_NAME].units if unit.name != primary ) # Check if TLS enabled for replication - assert await check_tls_replication(ops_test, primary, enabled=True) + assert check_tls_replication(juju, primary, enabled=True) - patroni_password = await get_password(ops_test, "patroni") + patroni_password = get_password("patroni") # Enable additional logs on the PostgreSQL instance to check TLS # being used in a later step and make the fail-over to happens faster. - await ops_test.model.applications[DATABASE_APP_NAME].set_config({ + juju.ext.model.applications[DATABASE_APP_NAME].set_config({ "logging_log_connections": "True" }) - await ops_test.model.wait_for_idle( - apps=[DATABASE_APP_NAME], status="active", idle_period=30 - ) - change_primary_start_timeout(ops_test, primary, 0, patroni_password) + juju.ext.model.wait_for_idle(apps=[DATABASE_APP_NAME], status="active", idle_period=30) + change_primary_start_timeout(juju, primary, 0, patroni_password) # Pause Patroni so it doesn't wipe the custom changes - await change_patroni_setting( - ops_test, "pause", True, patroni_password, use_random_unit=True, tls=True + change_patroni_setting( + juju, "pause", True, patroni_password, use_random_unit=True, tls=True ) - async with ops_test.fast_forward("24h"): + with juju.ext.fast_forward("24h"): for attempt in Retrying( stop=stop_after_delay(60 * 5), wait=wait_exponential(multiplier=1, min=2, max=30) ): with attempt: # Promote the replica to primary. - await run_command_on_unit( - ops_test, + run_command_on_unit( + juju, replica, "sudo charmed-postgresql.pg-ctl -D /var/snap/charmed-postgresql/common/var/lib/postgresql/ promote", ) # Check that the replica was promoted. - host = get_unit_address(ops_test, replica) - password = await get_password(ops_test) + host = get_unit_address(juju, replica) + password = get_password() with db_connect(host, password) as connection: connection.autocommit = True with connection.cursor() as cursor: @@ -127,8 +122,8 @@ async def test_tls_enabled(ops_test: OpsTest) -> None: # Write some data to the initial primary (this causes a divergence # in the instances' timelines). - host = get_unit_address(ops_test, primary) - password = await get_password(ops_test) + host = get_unit_address(juju, primary) + password = get_password() with db_connect(host, password) as connection: connection.autocommit = True with connection.cursor() as cursor: @@ -137,47 +132,51 @@ async def test_tls_enabled(ops_test: OpsTest) -> None: connection.close() # Stop the initial primary by killing both Patroni and PostgreSQL OS processes. - await run_command_on_unit( - ops_test, + run_command_on_unit( + juju, primary, "pkill --signal SIGKILL -f /snap/charmed-postgresql/current/usr/lib/postgresql/16/bin/postgres", ) - await run_command_on_unit( - ops_test, + run_command_on_unit( + juju, primary, "pkill --signal SIGKILL -f /snap/charmed-postgresql/[0-9]*/usr/bin/patroni", ) # Check that the primary changed. - assert await primary_changed(ops_test, primary), "primary not changed" - change_primary_start_timeout(ops_test, primary, 300, patroni_password) + assert primary_changed(juju, primary), "primary not changed" + change_primary_start_timeout(juju, primary, 300, patroni_password) # Check the logs to ensure TLS is being used by pg_rewind. - primary = await get_primary(ops_test, primary) + primary = get_primary(juju, primary) for attempt in Retrying(stop=stop_after_attempt(10), wait=wait_fixed(5), reraise=True): with attempt: logger.info("Trying to grep for rewind logs.") - await run_command_on_unit( - ops_test, + run_command_on_unit( + juju, primary, "grep 'connection authorized: user=rewind database=postgres SSL enabled' /var/snap/charmed-postgresql/common/var/log/postgresql/postgresql-*.log", ) - await change_patroni_setting( - ops_test, "pause", False, patroni_password, use_random_unit=True, tls=True + change_patroni_setting( + juju, "pause", False, patroni_password, use_random_unit=True, tls=True ) - async with ops_test.fast_forward(): + with juju.ext.fast_forward(): # Remove the relation. - await ops_test.model.applications[DATABASE_APP_NAME].remove_relation( + juju.ext.model.applications[DATABASE_APP_NAME].remove_relation( f"{DATABASE_APP_NAME}:client-certificates", f"{tls_certificates_app_name}:certificates" ) - await ops_test.model.applications[DATABASE_APP_NAME].remove_relation( + juju.ext.model.applications[DATABASE_APP_NAME].remove_relation( f"{DATABASE_APP_NAME}:peer-certificates", f"{tls_certificates_app_name}:certificates" ) - await ops_test.model.wait_for_idle(apps=[DATABASE_APP_NAME], status="active", timeout=1000) + # Add a sleep to avoid immediate passing of the idle check. + time.sleep(100) + juju.ext.model.wait_for_idle( + apps=[DATABASE_APP_NAME], status="active", timeout=1800, idle_period=30 + ) # Wait for all units disabling TLS. - for unit in ops_test.model.applications[DATABASE_APP_NAME].units: - assert await check_tls(ops_test, unit.name, enabled=False) - assert await check_tls_patroni_api(ops_test, unit.name, enabled=False) + for unit in juju.ext.model.applications[DATABASE_APP_NAME].units: + assert check_tls(juju, unit.name, enabled=False) + assert check_tls_patroni_api(juju, unit.name, enabled=False)