Skip to content
Draft
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
110 changes: 54 additions & 56 deletions tests/integration/test_charm.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,9 +7,11 @@
from time import sleep
from typing import get_args

import jubilant
import psycopg2
import pytest
import requests
from jubilant import Juju
from psycopg2 import sql
from pytest_operator.plugin import OpsTest
from tenacity import Retrying, stop_after_attempt, wait_exponential, wait_fixed
Expand All @@ -18,7 +20,6 @@

from .ha_tests.helpers import get_cluster_roles
from .helpers import (
CHARM_BASE,
DATABASE_APP_NAME,
STORAGE_PATH,
check_cluster_members,
Expand All @@ -28,54 +29,52 @@
get_password,
get_primary,
get_unit_address,
run_command_on_unit,
scale_application,
switchover,
)
from .high_availability.high_availability_helpers_new import (
get_unit_ip,
get_user_password,
wait_for_apps_status,
)

logger = logging.getLogger(__name__)

DB_APP_NAME = "postgresql"
MINUTE_SECS = 60
UNIT_IDS = [0, 1, 2]


@pytest.mark.abort_on_fail
@pytest.mark.skip_if_deployed
async def test_deploy(ops_test: OpsTest, charm: str):
"""Deploy the charm-under-test.

Assert on the unit status before any relations/configurations take place.
"""
# Deploy the charm with Patroni resource.
await ops_test.model.deploy(
charm,
application_name=DATABASE_APP_NAME,
num_units=3,
base=CHARM_BASE,
def test_deploy(juju: Juju, charm) -> None:
"""Simple test to ensure that the PostgreSQL and application charms get deployed."""
logging.info("Deploying PostgreSQL cluster")
juju.deploy(
charm=charm,
app=DB_APP_NAME,
base="ubuntu@24.04",
config={"profile": "testing"},
num_units=3,
)

# Reducing the update status frequency to speed up the triggering of deferred events.
await ops_test.model.set_config({"update-status-hook-interval": "10s"})

await ops_test.model.wait_for_idle(apps=[DATABASE_APP_NAME], status="active", timeout=1500)
assert ops_test.model.applications[DATABASE_APP_NAME].units[0].workload_status == "active"
logging.info("Wait for applications to become active")
juju.wait(
ready=wait_for_apps_status(jubilant.all_active, DB_APP_NAME),
timeout=20 * MINUTE_SECS,
)


@pytest.mark.abort_on_fail
@pytest.mark.parametrize("unit_id", UNIT_IDS)
async def test_database_is_up(ops_test: OpsTest, unit_id: int):
def test_database_is_up(juju: Juju, unit_id: int):
# Query Patroni REST API and check the status that indicates
# both Patroni and PostgreSQL are up and running.
host = get_unit_address(ops_test, f"{DATABASE_APP_NAME}/{unit_id}")
host = get_unit_ip(juju, DB_APP_NAME, f"{DB_APP_NAME}/{unit_id}")
result = requests.get(f"https://{host}:8008/health", verify=False)
assert result.status_code == 200


@pytest.mark.parametrize("unit_id", UNIT_IDS)
async def test_exporter_is_up(ops_test: OpsTest, unit_id: int):
def test_exporter_is_up(juju: Juju, unit_id: int):
# Query Patroni REST API and check the status that indicates
# both Patroni and PostgreSQL are up and running.
host = get_unit_address(ops_test, f"{DATABASE_APP_NAME}/{unit_id}")
host = get_unit_ip(juju, DB_APP_NAME, f"{DB_APP_NAME}/{unit_id}")
result = requests.get(f"http://{host}:9187/metrics")
assert result.status_code == 200
assert "pg_exporter_last_scrape_error 0" in result.content.decode("utf8"), (
Expand All @@ -84,14 +83,14 @@ async def test_exporter_is_up(ops_test: OpsTest, unit_id: int):


@pytest.mark.parametrize("unit_id", UNIT_IDS)
async def test_settings_are_correct(ops_test: OpsTest, unit_id: int):
def test_settings_are_correct(juju: Juju, unit_id: int):
# Connect to the PostgreSQL instance.
# Retrieving the operator user password using the action.
password = await get_password(ops_test)
password = get_user_password(juju, DB_APP_NAME, "operator")

# Connect to PostgreSQL.
host = get_unit_address(ops_test, f"{DATABASE_APP_NAME}/{unit_id}")
logger.info("connecting to the database host: %s", host)
host = get_unit_ip(juju, DB_APP_NAME, f"{DB_APP_NAME}/{unit_id}")
logging.info("connecting to the database host: %s", host)
with db_connect(host, password) as connection:
assert connection.status == psycopg2.extensions.STATUS_READY

Expand Down Expand Up @@ -160,20 +159,15 @@ async def test_settings_are_correct(ops_test: OpsTest, unit_id: int):
assert settings["retry_timeout"] == 10
assert settings["maximum_lag_on_failover"] == 1048576

logger.warning("Asserting port ranges")
unit = ops_test.model.applications[DATABASE_APP_NAME].units[unit_id]
assert unit.data["port-ranges"][0]["from-port"] == 5432
assert unit.data["port-ranges"][0]["to-port"] == 5432
assert unit.data["port-ranges"][0]["protocol"] == "tcp"
logging.warning("Asserting port ranges")
unit = juju.status().apps[DATABASE_APP_NAME].units[f"{DB_APP_NAME}/{unit_id}"]
assert unit.open_ports == ["5432/tcp"]


async def test_postgresql_locales(ops_test: OpsTest) -> None:
raw_locales = await run_command_on_unit(
ops_test,
ops_test.model.applications[DATABASE_APP_NAME].units[0].name,
"ls /snap/charmed-postgresql/current/usr/lib/locale",
)
locales = raw_locales.splitlines()
def test_postgresql_locales(juju: Juju) -> None:
task = juju.exec("ls /snap/charmed-postgresql/current/usr/lib/locale", unit=f"{DB_APP_NAME}/0")
task.raise_on_failure()
locales = task.stdout.splitlines()
locales.append("C")
locales.sort()

Expand All @@ -183,21 +177,25 @@ async def test_postgresql_locales(ops_test: OpsTest) -> None:
assert locales == list(get_args(SNAP_LOCALES))


async def test_postgresql_parameters_change(ops_test: OpsTest) -> None:
def test_postgresql_parameters_change(juju: Juju) -> None:
"""Test that's possible to change PostgreSQL parameters."""
await ops_test.model.applications[DATABASE_APP_NAME].set_config({
"memory_max_prepared_transactions": "100",
"memory_shared_buffers": "32768", # 2 * 128MB. Patroni may refuse the config if < 128MB
"response_lc_monetary": "en_GB.utf8",
"experimental_max_connections": "200",
})
await ops_test.model.wait_for_idle(apps=[DATABASE_APP_NAME], status="active", idle_period=30)
password = await get_password(ops_test)
juju.config(
app=DATABASE_APP_NAME,
values={
"memory_max_prepared_transactions": "100",
"memory_shared_buffers": "32768", # 2 * 128MB. Patroni may refuse the config if < 128MB
"response_lc_monetary": "en_GB.utf8",
"experimental_max_connections": "200",
},
)
sleep(5)
juju.wait(ready=wait_for_apps_status(jubilant.all_active, DB_APP_NAME))
password = get_user_password(juju, DB_APP_NAME, "operator")

# Connect to PostgreSQL.
for unit_id in UNIT_IDS:
host = get_unit_address(ops_test, f"{DATABASE_APP_NAME}/{unit_id}")
logger.info("connecting to the database host: %s", host)
host = get_unit_ip(juju, DB_APP_NAME, f"{DB_APP_NAME}/{unit_id}")
logging.info("connecting to the database host: %s", host)
try:
with (
psycopg2.connect(
Expand Down Expand Up @@ -344,7 +342,7 @@ async def test_persist_data_through_primary_deletion(ops_test: OpsTest):

# Write data to primary IP.
host = get_unit_address(ops_test, primary)
logger.info(f"connecting to primary {primary} on {host}")
logging.info(f"connecting to primary {primary} on {host}")
with db_connect(host, password) as connection:
connection.autocommit = True
with connection.cursor() as cursor:
Expand All @@ -364,7 +362,7 @@ async def test_persist_data_through_primary_deletion(ops_test: OpsTest):
# Testing write occurred to every postgres instance by reading from them
for unit in ops_test.model.applications[DATABASE_APP_NAME].units:
host = unit.public_address
logger.info("connecting to the database host: %s", host)
logging.info("connecting to the database host: %s", host)
with db_connect(host, password) as connection, connection.cursor() as cursor:
# Ensure we can read from "primarydeletiontest" table
cursor.execute("SELECT * FROM primarydeletiontest;")
Expand Down
Loading