diff --git a/poetry.lock b/poetry.lock index 8b2d8c3..24e7cd1 100644 --- a/poetry.lock +++ b/poetry.lock @@ -1611,6 +1611,22 @@ files = [ [package.extras] crypto-eth-addresses = ["eth-hash[pycryptodome] (>=0.7.0)"] +[[package]] +name = "valkey" +version = "6.1.1" +description = "Python client for Valkey forked from redis-py" +optional = false +python-versions = ">=3.9" +groups = ["main"] +files = [ + {file = "valkey-6.1.1-py3-none-any.whl", hash = "sha256:e2691541c6e1503b53c714ad9a35551ac9b7c0bbac93865f063dbc859a46de92"}, + {file = "valkey-6.1.1.tar.gz", hash = "sha256:5880792990c6c2b5eb604a5ed5f98f300880b6dd92d123819b66ed54bb259731"}, +] + +[package.extras] +libvalkey = ["libvalkey (>=4.0.1)"] +ocsp = ["cryptography (>=36.0.1)", "pyopenssl (==23.2.1)", "requests (>=2.31.0)"] + [[package]] name = "valkey-glide" version = "0.0.0" @@ -1673,4 +1689,4 @@ type = ["pytest-mypy"] [metadata] lock-version = "2.1" python-versions = "^3.12" -content-hash = "54a93e43d0c1bbb152b9f6f3ac3fdc4d75bd5916d2a77624e9b8ced5f7222495" +content-hash = "b8ff2b895655a7499b09645e24a9c5a883911dc1d4eaee66cb2d275d93522954" diff --git a/pyproject.toml b/pyproject.toml index 9f503c7..3836144 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -17,6 +17,7 @@ data-platform-helpers = ">=0.1.7" validators = ">=0.35.0" dpcharmlibs-interfaces = ">=1.0.2" lightkube = ">=0.19.0" +valkey = "^6.1.1" [tool.poetry.requires-plugins] poetry-plugin-export = ">=1.8" diff --git a/src/charm.py b/src/charm.py index bc2b23e..8e643e4 100755 --- a/src/charm.py +++ b/src/charm.py @@ -11,7 +11,7 @@ from core.cluster_state import ClusterState from events.base_events import BaseEvents -from events.external_clients import ExternalClientsEvents +from events.external_clients import ExternalClientsEvents, TopologyChangedCharmEvents from events.tls import TLSEvents from literals import CONTAINER, Substrate from managers.cluster import ClusterManager @@ -19,6 +19,7 @@ from managers.external_clients import ExternalClientsManager from managers.sentinel import SentinelManager from managers.tls import TLSManager +from managers.topology import TopologyManager from workload_k8s import ValkeyK8sWorkload from workload_vm import ValkeyVmWorkload @@ -28,6 +29,8 @@ class ValkeyCharm(ops.CharmBase): """Charmed Operator for Valkey.""" + on = TopologyChangedCharmEvents() + def __init__(self, *args) -> None: super().__init__(*args) try: @@ -50,6 +53,7 @@ def __init__(self, *args) -> None: self.sentinel_manager = SentinelManager(state=self.state, workload=self.workload) self.tls_manager = TLSManager(state=self.state, workload=self.workload) self.client_manager = ExternalClientsManager(state=self.state, workload=self.workload) + self.topology_manager = TopologyManager(state=self.state, workload=self.workload) # --- STATUS HANDLER --- self.status = StatusHandler( diff --git a/src/core/models.py b/src/core/models.py index 48715dd..3f43cbe 100644 --- a/src/core/models.py +++ b/src/core/models.py @@ -84,6 +84,7 @@ class PeerUnitModel(PeerModel): is_valkey_healthy: bool = Field(default=True) is_sentinel_healthy: bool = Field(default=True) client_user_epoch: float = Field(default=0) + topology_observer_pid: int = Field(default=0) class RelationState: diff --git a/src/events/base_events.py b/src/events/base_events.py index e2e1df0..6a5e67e 100644 --- a/src/events/base_events.py +++ b/src/events/base_events.py @@ -250,6 +250,14 @@ def _on_unit_fully_started(self, event: UnitFullyStarted) -> None: self.charm.unit.open_port("tcp", CLIENT_PORT) self.charm.unit.open_port("tcp", TLS_PORT) + if not self.charm.unit.is_leader(): + return + + try: + self.charm.topology_manager.start_observer() + except (ValkeyWorkloadCommandError, ValueError) as e: + logger.error("Failed to start topology observer: %s", e) + def _on_peer_relation_changed(self, _: ops.RelationChangedEvent) -> None: """Handle event received by all units when a unit's relation data changes.""" try: @@ -264,6 +272,15 @@ def _on_peer_relation_changed(self, _: ops.RelationChangedEvent) -> None: for lock in [StartLock(self.charm.state), RestartLock(self.charm.state)]: lock.process() + if not self.charm.state.unit_server.is_active: + return + + # need to pick up scaling operations, TLS switchover, CA rotation and so on + try: + self.charm.topology_manager.restart_observer() + except (ValkeyWorkloadCommandError, ValueError) as e: + logger.error("Failed to restart topology observer: %s", e) + def _on_peer_relation_departed(self, _: ops.RelationDepartedEvent) -> None: """Handle event received by all units when a unit departs.""" try: @@ -272,10 +289,27 @@ def _on_peer_relation_departed(self, _: ops.RelationDepartedEvent) -> None: logger.error(f"Failed to update sentinel quorum: {e}") # not critical to defer here, we can wait for the next relation change + if not self.charm.unit.is_leader() or not self.charm.state.unit_server.is_active: + return + + try: + self.charm.topology_manager.restart_observer() + except (ValkeyWorkloadCommandError, ValueError) as e: + logger.error("Failed to restart topology observer: %s", e) + def _on_update_status(self, event: ops.UpdateStatusEvent) -> None: """Handle the update-status event.""" if not self.charm.state.unit_server.is_started: logger.warning("Service not started") + return + + if not self.charm.unit.is_leader(): + return + + try: + self.charm.topology_manager.start_observer() + except (ValkeyWorkloadCommandError, ValueError) as e: + logger.error("Failed to start topology observer: %s", e) def _on_leader_elected(self, event: ops.LeaderElectedEvent) -> None: """Handle the leader-elected event.""" @@ -294,6 +328,12 @@ def _on_leader_elected(self, event: ops.LeaderElectedEvent) -> None: if not self.charm.unit.is_leader(): return + if self.charm.state.unit_server.is_active: + try: + self.charm.topology_manager.start_observer() + except (ValkeyWorkloadCommandError, ValueError) as e: + logger.error("Failed to start topology observer: %s", e) + if self.charm.state.cluster.internal_users_credentials: logger.debug("Internal user credentials already set") return @@ -367,6 +407,12 @@ def _on_config_changed(self, event: ops.ConfigChangedEvent) -> None: event.defer() return + # propagate updated credentials to topology observer + try: + self.charm.topology_manager.restart_observer() + except (ValkeyWorkloadCommandError, ValueError) as e: + logger.error("Failed to restart topology observer: %s", e) + def _on_secret_changed(self, event: ops.SecretChangedEvent) -> None: """Handle the secret_changed event.""" if not (admin_secret_id := self.charm.config.get(INTERNAL_USERS_PASSWORD_CONFIG)): @@ -384,6 +430,13 @@ def _on_secret_changed(self, event: ops.SecretChangedEvent) -> None: ): event.defer() return + + # propagate updated credentials to topology observer + try: + self.charm.topology_manager.restart_observer() + except (ValkeyWorkloadCommandError, ValueError) as e: + logger.error("Failed to restart topology observer: %s", e) + return # from here, code is only relevant for non-leader units @@ -391,8 +444,11 @@ def _on_secret_changed(self, event: ops.SecretChangedEvent) -> None: # leader unit processed the secret change from user, non-leader units can replicate try: self.charm.config_manager.set_acl_file() + self.charm.config_manager.set_sentinel_acl_file() if self.charm.state.unit_server.is_started: self.charm.cluster_manager.reload_acl_file() + # todo: request rolling restart + self.charm.sentinel_manager.restart_service() # update the local unit admin password to match the leader self.charm.config_manager.update_local_valkey_admin_password() if self.charm.state.unit_server.is_started: @@ -454,8 +510,11 @@ def _update_internal_users_password(self, secret_id: str) -> None: logger.info("Password(s) for internal users have changed") try: self.charm.config_manager.set_acl_file(passwords=new_passwords) + self.charm.config_manager.set_sentinel_acl_file(passwords=new_passwords) if self.charm.state.unit_server.is_started: self.charm.cluster_manager.reload_acl_file() + # todo: request rolling restart + self.charm.sentinel_manager.restart_service() self.charm.state.cluster.update( { f"{user.value.replace('-', '_')}_password": new_passwords[user.value] @@ -549,6 +608,9 @@ def _on_storage_detaching(self, event: ops.StorageDetachingEvent) -> None: primary_ip, ) + if self.charm.unit.is_leader(): + self.charm.topology_manager.stop_observer() + # stop valkey and sentinel processes self.charm.workload.stop() active_sentinels = [ diff --git a/src/events/external_clients.py b/src/events/external_clients.py index 453fd44..d5c757a 100644 --- a/src/events/external_clients.py +++ b/src/events/external_clients.py @@ -43,6 +43,16 @@ logger = logging.getLogger(__name__) +class TopologyChangedEvent(ops.EventBase): + """A custom event for topology changes.""" + + +class TopologyChangedCharmEvents(ops.CharmEvents): + """A CharmEvent extension to observe topology changes.""" + + topology_changed = ops.EventSource(TopologyChangedEvent) + + class ExternalClientsEvents(ops.Object): """Handle all events for external client relations.""" @@ -84,9 +94,12 @@ def __init__(self, charm: "ValkeyCharm"): self.framework.observe( self.certificate_transfer.on.certificates_removed, self._on_ca_removed ) + self.framework.observe(self.charm.on.topology_changed, self._on_topology_changed) def _on_bulk_resources_requested( - self, event: BulkResourcesRequestedEvent[RequirerCommonModel] | ResourceRequestedEvent + self, + event: BulkResourcesRequestedEvent[RequirerCommonModel] + | ResourceRequestedEvent[RequirerCommonModel], ) -> None: """Handle bulk resources requested event.""" if not self.charm.unit.is_leader(): @@ -195,6 +208,7 @@ def _on_peer_relation_changed(self, event: ops.RelationChangedEvent) -> None: if self.charm.unit.is_leader() and self.charm.state.substrate == Substrate.K8S: try: self.charm.sentinel_manager.reconcile_k8s_services() + self.charm.sentinel_manager.set_pod_labels() except (KubernetesClientError, ValkeyCannotGetPrimaryIPError) as e: logger.error("Error updating Kubernetes services: %s", e) event.defer() @@ -372,3 +386,27 @@ def _on_ca_removed(self, event: CertificatesRemovedEvent) -> None: logger.error("Error removing CA certificates for external clients: %s", e) event.defer() return + + def _on_topology_changed(self, event: TopologyChangedEvent) -> None: + """Handle custom events for topology changes.""" + if not self.charm.unit.is_leader(): + return + + logger.info("Received topology-changed event") + if self.charm.state.substrate == Substrate.K8S: + try: + self.charm.sentinel_manager.set_pod_labels() + except (KubernetesClientError, ValkeyCannotGetPrimaryIPError) as e: + logger.error("Error updating Kubernetes services: %s", e) + event.defer() + return + + if not self.charm.state.external_client_relations: + return + + try: + self._update_client_relations() + except (ValkeyCannotGetPrimaryIPError, ValkeyWorkloadCommandError) as e: + logger.error("Error updating client relations: %s", e) + event.defer() + return diff --git a/src/literals.py b/src/literals.py index 38046e5..7cc5269 100644 --- a/src/literals.py +++ b/src/literals.py @@ -26,6 +26,10 @@ ACL_FILE = "var/lib/valkey/users.acl" SENTINEL_ACL_FILE = "var/lib/valkey/sentinel-users.acl" +# todo: these paths require root access, should be moved to dedicated user directories +TOPOLOGY_OBSERVER_LOG_FILE = "/var/log/topology_observer.log" +TOPOLOGY_OBSERVER_TLS_CA_FILE = "/etc/ssl/certs/valkey_ca.pem" + PEER_RELATION = "valkey-peers" STATUS_PEERS_RELATION = "status-peers" CLIENT_TLS_RELATION_NAME = "client-certificates" diff --git a/src/managers/sentinel.py b/src/managers/sentinel.py index ff40412..892b7cc 100644 --- a/src/managers/sentinel.py +++ b/src/managers/sentinel.py @@ -381,12 +381,14 @@ def set_quorum(self, quorum: int) -> None: client.set(self.state.endpoint, PRIMARY_NAME, "quorum", str(quorum)) def reconcile_k8s_services(self) -> None: - """Create or update the services and pod labels in Kubernetes.""" + """Create or update the services in Kubernetes.""" valkey_port = TLS_PORT if self.state.unit_server.is_tls_enabled else CLIENT_PORT self.k8s_client.ensure_endpoint_service(role=K8sService.PRIMARY.value, port=valkey_port) self.k8s_client.ensure_endpoint_service(role=K8sService.REPLICAS.value, port=valkey_port) + def set_pod_labels(self) -> None: + """Set labels for primary and replica pods in Kubernetes.""" primary_endpoint = self.get_primary_ip() for unit in self.state.servers: if not unit.is_active: diff --git a/src/managers/topology.py b/src/managers/topology.py new file mode 100644 index 0000000..8dd5928 --- /dev/null +++ b/src/managers/topology.py @@ -0,0 +1,121 @@ +#!/usr/bin/env python3 +# Copyright 2026 Canonical Ltd. +# See LICENSE file for licensing details. + +"""Manager for Cluster Topology.""" + +import logging +import os +import signal +import subprocess +from pathlib import Path +from sys import version_info + +from core.base_workload import WorkloadBase +from core.cluster_state import ClusterState +from literals import ( + SENTINEL_PORT, + SENTINEL_TLS_PORT, + TOPOLOGY_OBSERVER_LOG_FILE, + TOPOLOGY_OBSERVER_TLS_CA_FILE, + CharmUsers, +) + +logger = logging.getLogger(__name__) + + +class TopologyManager: + """Observe the topology for Valkey Sentinel.""" + + name: str = "topology_observer" + state: ClusterState + + def __init__(self, state: ClusterState, workload: WorkloadBase): + self.state = state + self.workload = workload + + def start_observer(self) -> None: + """Start the topology observer as a subprocess.""" + if (observer_pid := self.state.unit_server.model.topology_observer_pid) != 0: + try: + # check if the process already runs + os.kill(int(observer_pid), 0) + return + except OSError: + logger.debug("Topology observer not running") + pass + + # Generate the venv path based on the existing lib path + env = os.environ.copy() + env.pop("JUJU_CONTEXT_ID", None) + for loc in env["PYTHONPATH"].split(":"): + path = Path(loc) + venv_path = ( + path + / ".." + / "venv" + / "lib" + / f"python{version_info.major}.{version_info.minor}" + / "site-packages" + ) + if path.stem == "lib": + env["PYTHONPATH"] = f"{venv_path.resolve()}:{env['PYTHONPATH']}" + break + + # Gather Valkey hosts for connection + started_servers = [ + unit.get_endpoint(self.state.substrate) + for unit in self.state.servers + if unit.is_active + ] + port = SENTINEL_TLS_PORT if self.state.unit_server.is_tls_enabled else SENTINEL_PORT + hosts = ",".join(sorted([f"{server}:{port}" for server in started_servers])) + + if self.state.unit_server.is_tls_enabled: + # Store current TLS CA cert on operator container + tls_ca_cert = self.workload.read_file(self.workload.tls_paths.client_ca) + path = Path(TOPOLOGY_OBSERVER_TLS_CA_FILE) + path.write_text(tls_ca_cert) + + logging.info("Starting topology observer") + pid = subprocess.Popen( # noqa: S603 + [ + "/usr/bin/python3", + "src/scripts/topology_observer.py", + hosts, + CharmUsers.SENTINEL_CHARM_ADMIN.value, # username + self.state.cluster.internal_users_credentials.get( + CharmUsers.SENTINEL_CHARM_ADMIN.value, "" + ), # password + str(self.state.unit_server.is_tls_enabled), + self.state.unit_server.unit_name, + self.state.charm.charm_dir, + ], + # File shouldn't close + stdout=open(TOPOLOGY_OBSERVER_LOG_FILE, "a"), # noqa: SIM115 + stderr=subprocess.STDOUT, + env=env, + ).pid + + self.state.unit_server.update({"topology_observer_pid": pid}) + logging.info(f"Started topology observer process with PID {pid}") + + def stop_observer(self) -> None: + """Stop the topology observer.""" + if (observer_pid := self.state.unit_server.model.topology_observer_pid) == 0: + logger.debug("Topology observer already stopped") + return + + logger.debug("Stopping topology observer") + try: + os.kill(int(observer_pid), signal.SIGTERM) + logger.info("Topology observer stopped") + except OSError: + pass + finally: + self.state.unit_server.update({"topology_observer_pid": ""}) + + def restart_observer(self) -> None: + """Stop and start the topology observer to pickup host changes.""" + self.stop_observer() + self.start_observer() diff --git a/src/scripts/topology_observer.py b/src/scripts/topology_observer.py new file mode 100644 index 0000000..db1dcaa --- /dev/null +++ b/src/scripts/topology_observer.py @@ -0,0 +1,101 @@ +# Copyright 2026 Canonical Ltd. +# See LICENSE file for licensing details. + +"""Topology observer class for checking changes in Primary/Replica topology.""" + +import logging +import signal +import subprocess +import sys +import time + +from valkey.sentinel import MasterNotFoundError, Sentinel + +from literals import PRIMARY_NAME, TOPOLOGY_OBSERVER_TLS_CA_FILE + +# use global variable for gracefully handling stop signals +continue_running = True + +logging.basicConfig( + format="%(asctime)s %(levelname)-8s %(message)s", + level=logging.DEBUG, + datefmt="%Y-%m-%d %H:%M:%S", +) + + +def dispatch(unit_name: str, charm_dir: str) -> None: + """Dispatch a Juju custom event.""" + custom_event = "topology_changed" + + juju_run_command = "/usr/bin/juju-exec" + dispatch_command = f"JUJU_DISPATCH_PATH=hooks/{custom_event} {charm_dir}/dispatch" + + subprocess.run([juju_run_command, "-u", unit_name, dispatch_command], check=True) + + +def handle_stop_signal(signum, frame) -> None: + """Stop the execution gracefully.""" + global continue_running + continue_running = False + + +def main() -> None: + """Start a Sentinel client and check changes to primary.""" + hosts, username, password, tls, unit_name, charm_dir = sys.argv[1:] + + # handle the stop signal for a graceful stop of the subscription client + signal.signal(signal.SIGTERM, handle_stop_signal) + + logging.info("Starting new observer for hosts %s with tls=%s", hosts, tls) + + host_list = hosts.split(",") + addresses = [ + (hostname, int(port)) for host in host_list for hostname, port in [host.split(":")] + ] + tls_enabled = True if tls == "True" else False + sentinel_kwargs = { + "username": username, + "password": password, + "decode_responses": True, + "ssl": tls_enabled, + "ssl_ca_certs": TOPOLOGY_OBSERVER_TLS_CA_FILE if tls_enabled else None, + } + + primary_name = "" + previous_primary = "" + + while continue_running: + time.sleep(1) + + if primary_name != "": + previous_primary = primary_name + + sentinel = Sentinel( + sentinels=addresses, + socket_timeout=0.1, + sentinel_kwargs=sentinel_kwargs, + ) + + try: + primary_name = sentinel.discover_master(PRIMARY_NAME)[0] + except MasterNotFoundError as e: + logging.error("Failed to discover primary: %s", e) + continue + + if previous_primary == "" or primary_name == previous_primary: + continue + + logging.info( + "Primary change detected: previously %s, now %s", previous_primary, primary_name + ) + try: + dispatch(unit_name, charm_dir) + except subprocess.CalledProcessError as e: + logging.error("Error when dispatching Juju event: %s", e) + + else: + logging.info("Gracefully stopping observer") + + +if __name__ == "__main__": + main() diff --git a/src/workload_vm.py b/src/workload_vm.py index 8673cab..dc36570 100644 --- a/src/workload_vm.py +++ b/src/workload_vm.py @@ -93,8 +93,8 @@ def install(self, revision: str | None = None, retry_and_raise: bool = True) -> revision = str(SNAP_REVISIONS[platform.machine()]) try: - # TODO revesit this logic after snapd update is released - # refresh snapd to use candidate to bypass risv check issue. + # TODO revisit this logic after snapd update is released + # refresh snapd to use candidate to bypass risc check issue. snap.add("snapd", channel="candidate") # as long as 26.04 is not stable, we need to install the core26 snap from beta snap.add("core26", channel="beta") diff --git a/tests/integration/clients/test_client_relation.py b/tests/integration/clients/test_client_relation.py index 2683834..780feee 100644 --- a/tests/integration/clients/test_client_relation.py +++ b/tests/integration/clients/test_client_relation.py @@ -16,13 +16,16 @@ import jubilant import pytest -from literals import Substrate +from literals import CharmUsers, Substrate from tests.integration.helpers import ( APP_NAME, IMAGE_RESOURCE, TLS_CHANNEL, TLS_NAME, are_agents_idle, + exec_valkey_cli, + get_cluster_addresses, + get_password, ) logger = logging.getLogger(__name__) @@ -166,6 +169,77 @@ def test_integrate_client_interface_v1(juju: jubilant.Juju) -> None: assert result == TEST_VALUE +def test_failover_topology_update(juju: jubilant.Juju) -> None: + """Trigger a failover and ensure clients can still access Valkey.""" + ip_address = get_cluster_addresses(juju, APP_NAME)[0] + logger.info("Initiate failover through Sentinel %s", ip_address) + + failover_result = exec_valkey_cli( + hostname=ip_address, + username=CharmUsers.SENTINEL_CHARM_ADMIN, + password=get_password(juju, CharmUsers.SENTINEL_CHARM_ADMIN), + command="sentinel failover primary", + tls_enabled=False, + sentinel=True, + ).stdout + assert failover_result == "OK", "Failover not successful" + juju.wait( + lambda status: are_agents_idle(status, APP_NAME, idle_period=60, unit_count=NUM_UNITS), + timeout=600, + ) + + logger.info("Ensure access after failover for v0 client") + requirer_unit = next(iter(juju.status().get_units(REQUIRER_V0_NAME))) + get_credentials_action = juju.run(requirer_unit, "get-credentials") + username = get_credentials_action.results["usernames"] + + logger.info("Write data to granted keyspace") + set_action = juju.run( + requirer_unit, + "set", + params={"key": f"requirer-charm:{TEST_KEY}", "value": TEST_VALUE, "user": username}, + ) + assert set_action.status == "completed", "Action should succeed" + + logger.info("Read data that was just written") + get_action = juju.run( + requirer_unit, + "get", + params={"key": f"requirer-charm:{TEST_KEY}", "user": username}, + ) + assert get_action.status == "completed", "Action should succeed" + result = get_action.results["result"] + assert result == TEST_VALUE + + logger.info("Ensure access after failover for v1 client") + requirer_unit = next(iter(juju.status().get_units(REQUIRER_V1_NAME))) + get_credentials_action = juju.run(requirer_unit, "get-credentials") + usernames = get_credentials_action.results["usernames"] + user_restricted_keyspace = usernames.split(",")[0] + + logger.info("Write data to granted keyspace") + set_action = juju.run( + requirer_unit, + "set", + params={ + "key": f"requirer-charm:{TEST_KEY}", + "value": TEST_VALUE, + "user": user_restricted_keyspace, + }, + ) + assert set_action.status == "completed", "Action should succeed" + + logger.info("Read data that was just written") + get_action = juju.run( + requirer_unit, + "get", + params={"key": f"requirer-charm:{TEST_KEY}", "user": user_restricted_keyspace}, + ) + assert get_action.status == "completed", "Action should succeed" + result = get_action.results["result"] + assert result == TEST_VALUE + + def test_enable_tls(juju: jubilant.Juju) -> None: """Enable TLS on Valkey and the clients and ensure they can still read and write.""" logger.info("Enabling client TLS") diff --git a/tests/integration/tls/test_certificate_options.py b/tests/integration/tls/test_certificate_options.py index b43e8f7..e812ca4 100644 --- a/tests/integration/tls/test_certificate_options.py +++ b/tests/integration/tls/test_certificate_options.py @@ -287,5 +287,5 @@ def test_certificate_denied(juju: jubilant.Juju) -> None: lambda status: are_apps_active_and_agents_idle( status, APP_NAME, idle_period=30, unit_count=NUM_UNITS ), - timeout=100, + timeout=600, ) diff --git a/tests/unit/conftest.py b/tests/unit/conftest.py index 515b691..0a67b40 100644 --- a/tests/unit/conftest.py +++ b/tests/unit/conftest.py @@ -32,6 +32,11 @@ def mock_k8s_client(mocker): mocker.patch("lightkube.core.client.GenericSyncClient") +@pytest.fixture(autouse=True) +def mock_start_topology_observer(mocker): + mocker.patch("managers.topology.TopologyManager.start_observer") + + @pytest.fixture(autouse=True) def tenacity_wait(mocker): mocker.patch("tenacity.nap.time") diff --git a/tests/unit/test_charm.py b/tests/unit/test_charm.py index 688542c..6215032 100644 --- a/tests/unit/test_charm.py +++ b/tests/unit/test_charm.py @@ -498,13 +498,17 @@ def test_config_changed_leader_unit(cloud_spec): ) with ( patch("managers.config.ConfigManager.set_acl_file") as mock_set_acl_file, + patch("managers.config.ConfigManager.set_sentinel_acl_file") as set_sentinel_acl_file, patch("common.client.ValkeyClient.acl_load") as mock_acl_load, patch("common.client.ValkeyClient.config_set") as mock_config_set, + patch("managers.sentinel.SentinelManager.restart_service") as restart_sentinel, ): state_out = ctx.run(ctx.on.config_changed(), state_in) mock_set_acl_file.assert_called_once() mock_acl_load.assert_called_once() mock_config_set.assert_called_once() + set_sentinel_acl_file.assert_called_once() + restart_sentinel.assert_called_once() secret_out = state_out.get_secret( label=f"{PEER_RELATION}.{APP_NAME}.app.{INTERNAL_USERS_SECRET_LABEL_SUFFIX}" ) @@ -620,15 +624,19 @@ def test_change_password_secret_changed_non_leader_unit(cloud_spec): "events.base_events.BaseEvents._update_internal_users_password" ) as mock_update_password, patch("managers.config.ConfigManager.set_acl_file") as mock_set_acl_file, + patch("managers.config.ConfigManager.set_sentinel_acl_file") as set_sentinel_acl_file, patch("common.client.ValkeyClient.acl_load") as mock_acl_load, patch("common.client.ValkeyClient.config_set") as mock_config_set, patch("managers.sentinel.SentinelManager.get_primary_ip", return_value="127.0.1.1"), + patch("managers.sentinel.SentinelManager.restart_service") as restart_sentinel, ): ctx.run(ctx.on.secret_changed(password_secret), state_in) mock_update_password.assert_not_called() mock_set_acl_file.assert_called_once() mock_acl_load.assert_called_once() mock_config_set.assert_called_once() + set_sentinel_acl_file.assert_called_once() + restart_sentinel.assert_called_once() def test_change_password_secret_changed_non_leader_unit_not_successful(cloud_spec): @@ -658,6 +666,7 @@ def test_change_password_secret_changed_non_leader_unit_not_successful(cloud_spe "events.base_events.BaseEvents._update_internal_users_password" ) as mock_update_password, patch("managers.config.ConfigManager.set_acl_file") as mock_set_acl_file, + patch("managers.config.ConfigManager.set_sentinel_acl_file") as set_sentinel_acl_file, patch( "common.client.ValkeyClient.exec_cli_command", side_effect=ValkeyWorkloadCommandError("Failed to execute command"), @@ -668,6 +677,7 @@ def test_change_password_secret_changed_non_leader_unit_not_successful(cloud_spe state_out = manager.run() mock_update_password.assert_not_called() mock_set_acl_file.assert_called_once() + set_sentinel_acl_file.assert_called_once() mock_exec_command.assert_called_once_with( ["acl", "load"], hostname="valkey-0.valkey-endpoints" )