From c042adf16a369714ce489bc9194d2d6d99031b77 Mon Sep 17 00:00:00 2001 From: reneradoi Date: Thu, 16 Apr 2026 17:28:22 +0200 Subject: [PATCH 01/18] add K8s client class, create and update services and pod labels --- poetry.lock | 144 +++++++++++++++++++++++++++++++-- pyproject.toml | 1 + src/common/exceptions.py | 4 + src/common/k8s_client.py | 102 +++++++++++++++++++++++ src/events/external_clients.py | 24 ++++-- src/literals.py | 8 ++ src/managers/sentinel.py | 39 +++++++++ tests/unit/conftest.py | 5 ++ 8 files changed, 315 insertions(+), 12 deletions(-) create mode 100644 src/common/k8s_client.py diff --git a/poetry.lock b/poetry.lock index 2f17dc38..8b2d8c36 100644 --- a/poetry.lock +++ b/poetry.lock @@ -1,4 +1,4 @@ -# This file is automatically @generated by Poetry 2.2.1 and should not be changed by hand. +# This file is automatically @generated by Poetry 2.3.2 and should not be changed by hand. [[package]] name = "allure-pytest" @@ -66,7 +66,7 @@ version = "4.13.0" description = "High-level concurrency and networking framework on top of asyncio or Trio" optional = false python-versions = ">=3.10" -groups = ["integration"] +groups = ["main", "integration"] files = [ {file = "anyio-4.13.0-py3-none-any.whl", hash = "sha256:08b310f9e24a9594186fd75b4f73f4a4152069e3853f1ed8bfbf58369f4ad708"}, {file = "anyio-4.13.0.tar.gz", hash = "sha256:334b70e641fd2221c1505b3890c69882fe4a2df910cba14d97019b90b24439dc"}, @@ -97,7 +97,7 @@ version = "2026.2.25" description = "Python package for providing Mozilla's CA Bundle." optional = false python-versions = ">=3.7" -groups = ["integration"] +groups = ["main", "integration"] files = [ {file = "certifi-2026.2.25-py3-none-any.whl", hash = "sha256:027692e4402ad994f1c42e52a4997a9763c646b73e4096e4d5d6db8af1d6f0fa"}, {file = "certifi-2026.2.25.tar.gz", hash = "sha256:e887ab5cee78ea814d3472169153c2d12cd43b14bd03329a39a9c6e2e80bfba7"}, @@ -674,13 +674,113 @@ files = [ {file = "durationpy-0.10.tar.gz", hash = "sha256:1fa6893409a6e739c9c72334fc65cca1f355dbdd93405d30f726deb5bde42fba"}, ] +[[package]] +name = "h11" +version = "0.16.0" +description = "A pure-Python, bring-your-own-I/O implementation of HTTP/1.1" +optional = false +python-versions = ">=3.8" +groups = ["main"] +files = [ + {file = "h11-0.16.0-py3-none-any.whl", hash = "sha256:63cf8bbe7522de3bf65932fda1d9c2772064ffb3dae62d55932da54b31cb6c86"}, + {file = "h11-0.16.0.tar.gz", hash = "sha256:4e35b956cf45792e4caa5885e69fba00bdbc6ffafbfa020300e549b208ee5ff1"}, +] + +[[package]] +name = "h2" +version = "4.3.0" +description = "Pure-Python HTTP/2 protocol implementation" +optional = false +python-versions = ">=3.9" +groups = ["main"] +files = [ + {file = "h2-4.3.0-py3-none-any.whl", hash = "sha256:c438f029a25f7945c69e0ccf0fb951dc3f73a5f6412981daee861431b70e2bdd"}, + {file = "h2-4.3.0.tar.gz", hash = "sha256:6c59efe4323fa18b47a632221a1888bd7fde6249819beda254aeca909f221bf1"}, +] + +[package.dependencies] +hpack = ">=4.1,<5" +hyperframe = ">=6.1,<7" + +[[package]] +name = "hpack" +version = "4.1.0" +description = "Pure-Python HPACK header encoding" +optional = false +python-versions = ">=3.9" +groups = ["main"] +files = [ + {file = "hpack-4.1.0-py3-none-any.whl", hash = "sha256:157ac792668d995c657d93111f46b4535ed114f0c9c8d672271bbec7eae1b496"}, + {file = "hpack-4.1.0.tar.gz", hash = "sha256:ec5eca154f7056aa06f196a557655c5b009b382873ac8d1e66e79e87535f1dca"}, +] + +[[package]] +name = "httpcore" +version = "1.0.9" +description = "A minimal low-level HTTP client." 
+optional = false +python-versions = ">=3.8" +groups = ["main"] +files = [ + {file = "httpcore-1.0.9-py3-none-any.whl", hash = "sha256:2d400746a40668fc9dec9810239072b40b4484b640a8c38fd654a024c7a1bf55"}, + {file = "httpcore-1.0.9.tar.gz", hash = "sha256:6e34463af53fd2ab5d807f399a9b45ea31c3dfa2276f15a2c3f00afff6e176e8"}, +] + +[package.dependencies] +certifi = "*" +h11 = ">=0.16" + +[package.extras] +asyncio = ["anyio (>=4.0,<5.0)"] +http2 = ["h2 (>=3,<5)"] +socks = ["socksio (==1.*)"] +trio = ["trio (>=0.22.0,<1.0)"] + +[[package]] +name = "httpx" +version = "0.28.1" +description = "The next generation HTTP client." +optional = false +python-versions = ">=3.8" +groups = ["main"] +files = [ + {file = "httpx-0.28.1-py3-none-any.whl", hash = "sha256:d909fcccc110f8c7faf814ca82a9a4d816bc5a6dbfea25d6591d6985b8ba59ad"}, + {file = "httpx-0.28.1.tar.gz", hash = "sha256:75e98c5f16b0f35b567856f597f06ff2270a374470a5c2392242528e3e3e42fc"}, +] + +[package.dependencies] +anyio = "*" +certifi = "*" +h2 = {version = ">=3,<5", optional = true, markers = "extra == \"http2\""} +httpcore = "==1.*" +idna = "*" + +[package.extras] +brotli = ["brotli ; platform_python_implementation == \"CPython\"", "brotlicffi ; platform_python_implementation != \"CPython\""] +cli = ["click (==8.*)", "pygments (==2.*)", "rich (>=10,<14)"] +http2 = ["h2 (>=3,<5)"] +socks = ["socksio (==1.*)"] +zstd = ["zstandard (>=0.18.0)"] + +[[package]] +name = "hyperframe" +version = "6.1.0" +description = "Pure-Python HTTP/2 framing" +optional = false +python-versions = ">=3.9" +groups = ["main"] +files = [ + {file = "hyperframe-6.1.0-py3-none-any.whl", hash = "sha256:b03380493a519fce58ea5af42e4a42317bf9bd425596f7a0835ffce80f1a42e5"}, + {file = "hyperframe-6.1.0.tar.gz", hash = "sha256:f630908a00854a7adeabd6382b43923a4c4cd4b821fcb527e6ab9e15382a3b08"}, +] + [[package]] name = "idna" version = "3.11" description = "Internationalized Domain Names in Applications (IDNA)" optional = false python-versions = ">=3.8" -groups = ["integration"] +groups = ["main", "integration"] files = [ {file = "idna-3.11-py3-none-any.whl", hash = "sha256:771a87f49d9defaf64091e6e6fe9c18d4833f140bd19464795bc32d966ca37ea"}, {file = "idna-3.11.tar.gz", hash = "sha256:795dafcc9c04ed0c1fb032c2aa73654d8e8c5023a7df64a53f39190ada629902"}, @@ -753,7 +853,7 @@ files = [ ] [package.dependencies] -certifi = ">=14.05.14" +certifi = ">=14.5.14" durationpy = ">=0.7" python-dateutil = ">=2.5.3" pyyaml = ">=5.4.1" @@ -767,6 +867,38 @@ websocket-client = ">=0.32.0,<0.40.0 || >0.40.0,<0.41.dev0 || >=0.43.dev0" adal = ["adal (>=1.0.2)"] google-auth = ["google-auth (>=1.0.1)"] +[[package]] +name = "lightkube" +version = "0.19.1" +description = "Lightweight kubernetes client library" +optional = false +python-versions = ">=3.8" +groups = ["main"] +files = [ + {file = "lightkube-0.19.1-py3-none-any.whl", hash = "sha256:49fef08a1c7aa42082820111ffd5dbbaf78f54c99385810690fc9d94eef5c80d"}, + {file = "lightkube-0.19.1.tar.gz", hash = "sha256:4c8526068024c194c02fbc0ca6021922feb4b1b9d741d330129f873b27e0fe97"}, +] + +[package.dependencies] +httpx = {version = ">=0.28.1,<1.0.0", extras = ["http2"]} +lightkube-models = ">=1.15.12.0" +pyyaml = "*" + +[package.extras] +jinja-templates = ["jinja2"] + +[[package]] +name = "lightkube-models" +version = "1.35.0.8" +description = "Models and Resources for lightkube module" +optional = false +python-versions = ">=3.9" +groups = ["main"] +files = [ + {file = "lightkube_models-1.35.0.8-py3-none-any.whl", hash = 
"sha256:d01fce42f96baf47a77a571bff59d6a513e96ae043fc03cfaaaaf79c609c4441"}, + {file = "lightkube_models-1.35.0.8.tar.gz", hash = "sha256:dbc624596a7d94e6c43c5deda972be964202e0e8f26e2ab8e61d589d710b5e22"}, +] + [[package]] name = "markdown-it-py" version = "4.0.0" @@ -1541,4 +1673,4 @@ type = ["pytest-mypy"] [metadata] lock-version = "2.1" python-versions = "^3.12" -content-hash = "1bf4d9f6216b7c7cd55932889297619563057999e263c02090a73c7e903ccc6d" +content-hash = "54a93e43d0c1bbb152b9f6f3ac3fdc4d75bd5916d2a77624e9b8ced5f7222495" diff --git a/pyproject.toml b/pyproject.toml index 7ffa319c..1ff49048 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -16,6 +16,7 @@ tenacity = "*" data-platform-helpers = ">=0.1.7" validators = ">=0.35.0" dpcharmlibs-interfaces = ">=1.0.2" +lightkube = ">=0.19.0" [tool.poetry.requires-plugins] poetry-plugin-export = ">=1.8" diff --git a/src/common/exceptions.py b/src/common/exceptions.py index 26ced3d0..e3e19222 100644 --- a/src/common/exceptions.py +++ b/src/common/exceptions.py @@ -66,3 +66,7 @@ class RequestingLockTimedOutError(Exception): class ValkeyCertificatesNotReadyError(Exception): """Custom Exception if not all units have stored the TLS certificates.""" + + +class KubernetesClientError(Exception): + """Custom Exception if a connection to the Kubernetes Cluster API fails.""" diff --git a/src/common/k8s_client.py b/src/common/k8s_client.py new file mode 100644 index 00000000..e206503f --- /dev/null +++ b/src/common/k8s_client.py @@ -0,0 +1,102 @@ +# Copyright 2026 Canonical Ltd. +# See LICENSE file for licensing details. + +"""K8sClient utility class to connect to the Kubernetes API server.""" + +import logging + +from lightkube.core.client import Client +from lightkube.core.exceptions import ApiError +from lightkube.models.core_v1 import ServicePort, ServiceSpec +from lightkube.models.meta_v1 import ObjectMeta +from lightkube.resources.core_v1 import Pod, Service + +from common.exceptions import KubernetesClientError + +logger = logging.getLogger(__name__) + + +class K8sClient: + """Expose Kubernetes API commands to the charm.""" + + def __init__(self, namespace: str, app_name: str): + self.namespace = namespace + self.app_name = app_name + self.client = Client() + + def ensure_endpoint_service(self, role: str, port: int) -> None: + """Create or update a K8s service. 
+
+        Args:
+            role (str): name of the role to create the service for, e.g. "primary" or "replicas"
+            port (int): the port number to set for the service
+        """
+        service_name = f"{self.app_name}-{role}"
+        service_port = ServicePort(port=port, targetPort=port)
+
+        try:
+            service = self.client.get(res=Service, name=service_name, namespace=self.namespace)
+            if service.spec.ports != [service_port]:
+                service.spec.ports = [service_port]
+                self.client.patch(Service, service_name, service)
+                logger.info("Updated Kubernetes service %s to port %s", service_name, port)
+            return
+        except ApiError as e:
+            # 404 will be raised if service does not exist yet
+            if e.status.code != 404:
+                raise KubernetesClientError from e
+
+        pod0 = self.client.get(
+            res=Pod,
+            name=self.app_name + "-0",
+            namespace=self.namespace,
+        )
+
+        service = Service(
+            apiVersion="v1",
+            kind="Service",
+            metadata=ObjectMeta(
+                namespace=self.namespace,
+                name=service_name,
+                ownerReferences=pod0.metadata.ownerReferences,
+            ),
+            spec=ServiceSpec(
+                selector={"application-name": self.app_name, "role": role},
+                ports=[service_port],
+                type="ClusterIP",
+            ),
+        )
+
+        try:
+            self.client.create(service)
+            logger.info("Created Kubernetes service %s for port %s", service_name, port)
+        except ApiError as e:
+            logger.error("Kubernetes service creation failed: %s", e)
+            raise KubernetesClientError from e
+
+    def update_pod_label(self, pod_name: str, role: str) -> None:
+        """Create or update a label for a pod.
+
+        Args:
+            pod_name (str): the name of the pod
+            role (str): name of the role to create the label for, e.g. "primary" or "replicas"
+        """
+        try:
+            pod = self.client.get(Pod, pod_name, namespace=self.namespace)
+        except ApiError as e:
+            raise KubernetesClientError from e
+
+        if not pod.metadata.labels:
+            pod.metadata.labels = {}
+
+        if pod.metadata.labels.get("role") == role:
+            return
+
+        logger.info("Updating pod %s to role %s", pod_name, role)
+        pod.metadata.labels["application-name"] = self.app_name
+        pod.metadata.labels["role"] = role
+        try:
+            self.client.patch(Pod, pod_name, pod)
+        except ApiError as e:
+            logger.error("Failed to update Kubernetes pod labels: %s", e)
+            raise KubernetesClientError from e
diff --git a/src/events/external_clients.py b/src/events/external_clients.py
index c23bd2b5..4e80b8b4 100644
--- a/src/events/external_clients.py
+++ b/src/events/external_clients.py
@@ -23,6 +23,7 @@
 )
 
 from common.exceptions import (
+    KubernetesClientError,
     ValkeyACLLoadError,
     ValkeyCannotGetPrimaryIPError,
     ValkeyServicesFailedToStartError,
@@ -178,15 +179,26 @@ def _on_bulk_resources_requested(
         self.charm.state.cluster.update({"client_user_epoch": time.time()})
 
     def _on_peer_relation_changed(self, event: ops.RelationChangedEvent) -> None:
-        """Handle peer relation changes in regard to external client relations."""
-        if (
-            not self.charm.state.unit_server.is_started
-            or not self.charm.state.external_client_relations
-        ):
+        """Handle peer relation changes in regard to external client relations.
+
+        This handler catches all changes from scaling operations, TLS switchover, TLS CA rotation,
+        IP changes, etc.
+ """ + if not self.charm.state.unit_server.is_started: + return + + if self.charm.unit.is_leader(): + try: + self.charm.sentinel_manager.reconcile_k8s_services() + except (KubernetesClientError, ValkeyCannotGetPrimaryIPError) as e: + logger.error("Error updating Kubernetes services: %s", e) + event.defer() + return + + if not self.charm.state.external_client_relations: return if self.charm.unit.is_leader(): - # this catches all changes from scaling operations, TLS switchover, IP changes, etc. try: self._update_client_relations() except (ValkeyCannotGetPrimaryIPError, ValkeyWorkloadCommandError) as e: diff --git a/src/literals.py b/src/literals.py index abb04f45..9168e365 100644 --- a/src/literals.py +++ b/src/literals.py @@ -122,3 +122,11 @@ class TLSCARotationState(StrEnum): NEW_CA_DETECTED = "new-ca-detected" NEW_CA_ADDED = "new-ca-added" CA_UPDATED = "ca-updated" + + +class K8sService(StrEnum): + """Services managed by the charm in Kubernetes.""" + + PRIMARY = "primary" + REPLICAS = "replicas" + SENTINELS = "sentinels" diff --git a/src/managers/sentinel.py b/src/managers/sentinel.py index 00140b9d..7a235479 100644 --- a/src/managers/sentinel.py +++ b/src/managers/sentinel.py @@ -20,6 +20,7 @@ ValkeyCannotGetPrimaryIPError, ValkeyWorkloadCommandError, ) +from common.k8s_client import K8sClient from core.base_workload import WorkloadBase from core.cluster_state import ClusterState from literals import ( @@ -29,6 +30,8 @@ SENTINEL_TLS_PORT, TLS_PORT, CharmUsers, + K8sService, + Substrate, ) from statuses import CharmStatuses @@ -46,6 +49,12 @@ def __init__(self, state: ClusterState, workload: WorkloadBase): self.workload = workload self.admin_user = CharmUsers.SENTINEL_CHARM_ADMIN.value + if self.state.substrate == Substrate.K8S: + self.k8s_client = K8sClient( + namespace=self.state.model.name, + app_name=self.state.model.app.name, + ) + @property def admin_password(self) -> str: """Get the password of the admin user for the sentinel service.""" @@ -359,3 +368,33 @@ def set_quorum(self, quorum: int) -> None: """Set the quorum for the sentinel cluster.""" client = self._get_sentinel_client() client.set(self.state.endpoint, PRIMARY_NAME, "quorum", str(quorum)) + + def reconcile_k8s_services(self) -> None: + """Create or update the services and pod labels in Kubernetes.""" + if self.state.substrate == Substrate.VM: + return + + valkey_port = TLS_PORT if self.state.unit_server.is_tls_enabled else CLIENT_PORT + sentinel_port = ( + SENTINEL_TLS_PORT if self.state.unit_server.is_tls_enabled else SENTINEL_PORT + ) + + self.k8s_client.ensure_endpoint_service(role=K8sService.PRIMARY.value, port=valkey_port) + self.k8s_client.ensure_endpoint_service(role=K8sService.REPLICAS.value, port=valkey_port) + self.k8s_client.ensure_endpoint_service( + role=K8sService.SENTINELS.value, port=sentinel_port + ) + + primary_endpoint = self.get_primary_ip() + for unit in self.state.servers: + if not unit.is_active: + continue + + pod_name = unit.unit_name.replace("/", "-") + self.k8s_client.update_pod_label( + pod_name=pod_name, + role=K8sService.PRIMARY.value + if unit.get_endpoint(Substrate.K8S) == primary_endpoint + else K8sService.REPLICAS.value, + ) + self.k8s_client.update_pod_label(pod_name=pod_name, role=K8sService.SENTINELS.value) diff --git a/tests/unit/conftest.py b/tests/unit/conftest.py index 41a6a18c..515b6912 100644 --- a/tests/unit/conftest.py +++ b/tests/unit/conftest.py @@ -27,6 +27,11 @@ def mock_bind_address(mocker): ) +@pytest.fixture(autouse=True) +def mock_k8s_client(mocker): + 
mocker.patch("lightkube.core.client.GenericSyncClient") + + @pytest.fixture(autouse=True) def tenacity_wait(mocker): mocker.patch("tenacity.nap.time") From 572e889f914d8219cf2b085e31a85e27b98ec9c6 Mon Sep 17 00:00:00 2001 From: reneradoi Date: Fri, 17 Apr 2026 09:01:41 +0200 Subject: [PATCH 02/18] update rust to 1.94.0 to build `lightkube` --- charmcraft.yaml | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/charmcraft.yaml b/charmcraft.yaml index 795caf07..c309e121 100644 --- a/charmcraft.yaml +++ b/charmcraft.yaml @@ -75,7 +75,7 @@ parts: # rpds-py (Python package) >=0.19.0 requires rustc >=1.76, which is not available in the # Ubuntu 22.04 archive. Install rustc and cargo using rustup instead of the Ubuntu archive rustup set profile minimal - rustup default 1.90.0 # renovate: charmcraft-rust-latest + rustup default 1.94.0 # renovate: charmcraft-rust-latest craftctl default # Include requirements.txt in *.charm artifact for easier debugging From 48ba7d20e9da0fd0387294f668a2b58a7e0f3dd1 Mon Sep 17 00:00:00 2001 From: reneradoi Date: Fri, 17 Apr 2026 11:06:33 +0200 Subject: [PATCH 03/18] publish service endpoints in client relation on K8s --- src/common/k8s_client.py | 3 ++- src/literals.py | 1 - src/managers/sentinel.py | 26 +++++++++++++++++--------- tests/unit/test_client_relation.py | 8 ++++---- 4 files changed, 23 insertions(+), 15 deletions(-) diff --git a/src/common/k8s_client.py b/src/common/k8s_client.py index e206503f..3e0ad1d0 100644 --- a/src/common/k8s_client.py +++ b/src/common/k8s_client.py @@ -9,6 +9,7 @@ from lightkube.core.exceptions import ApiError from lightkube.models.core_v1 import ServicePort, ServiceSpec from lightkube.models.meta_v1 import ObjectMeta +from lightkube.types import PatchType from lightkube.resources.core_v1 import Pod, Service from common.exceptions import KubernetesClientError @@ -38,7 +39,7 @@ def ensure_endpoint_service(self, role: str, port: int) -> None: service = self.client.get(res=Service, name=service_name, namespace=self.namespace) if service.spec.ports != [service_port]: service.spec.ports = [service_port] - self.client.patch(Service, service_name, service) + self.client.patch(Service, service_name, service, patch_type=PatchType.MERGE) logger.info("Updated Kubernetes service %s to port %s", service_name, port) return except ApiError as e: diff --git a/src/literals.py b/src/literals.py index 9168e365..063b5b33 100644 --- a/src/literals.py +++ b/src/literals.py @@ -129,4 +129,3 @@ class K8sService(StrEnum): PRIMARY = "primary" REPLICAS = "replicas" - SENTINELS = "sentinels" diff --git a/src/managers/sentinel.py b/src/managers/sentinel.py index 7a235479..57116917 100644 --- a/src/managers/sentinel.py +++ b/src/managers/sentinel.py @@ -5,6 +5,7 @@ """Manager for all sentinel related tasks.""" import logging +import socket import tenacity from data_platform_helpers.advanced_statuses.models import StatusObject @@ -141,16 +142,30 @@ def get_primary_ip(self) -> str: def get_primary_endpoint(self) -> str: """Get the endpoint of the primary node, consisting of address and port.""" - primary_address = self.get_primary_ip() port = TLS_PORT if self.state.unit_server.is_tls_enabled else CLIENT_PORT + if self.state.substrate == Substrate.K8S: + # get the DNS name of the K8s service + primary_address = socket.getfqdn( + f"{self.state.model.app.name}-{K8sService.PRIMARY.value}" + ) + return f"{primary_address}:{port}" + + primary_address = self.get_primary_ip() return f"{primary_address}:{port}" def get_replica_endpoints(self) -> str: """Get 
the endpoints of all replica nodes, consisting of address and port.""" port = TLS_PORT if self.state.unit_server.is_tls_enabled else CLIENT_PORT - client = self._get_sentinel_client() + if self.state.substrate == Substrate.K8S: + # get the DNS name of the K8s service + replicas_address = socket.getfqdn( + f"{self.state.model.app.name}-{K8sService.REPLICAS.value}" + ) + return f"{replicas_address}:{port}" + + client = self._get_sentinel_client() replica_list = client.replicas_primary(hostname=self.state.endpoint) return ",".join(sorted([f"{replica['ip']}:{port}" for replica in replica_list])) @@ -375,15 +390,9 @@ def reconcile_k8s_services(self) -> None: return valkey_port = TLS_PORT if self.state.unit_server.is_tls_enabled else CLIENT_PORT - sentinel_port = ( - SENTINEL_TLS_PORT if self.state.unit_server.is_tls_enabled else SENTINEL_PORT - ) self.k8s_client.ensure_endpoint_service(role=K8sService.PRIMARY.value, port=valkey_port) self.k8s_client.ensure_endpoint_service(role=K8sService.REPLICAS.value, port=valkey_port) - self.k8s_client.ensure_endpoint_service( - role=K8sService.SENTINELS.value, port=sentinel_port - ) primary_endpoint = self.get_primary_ip() for unit in self.state.servers: @@ -397,4 +406,3 @@ def reconcile_k8s_services(self) -> None: if unit.get_endpoint(Substrate.K8S) == primary_endpoint else K8sService.REPLICAS.value, ) - self.k8s_client.update_pod_label(pod_name=pod_name, role=K8sService.SENTINELS.value) diff --git a/tests/unit/test_client_relation.py b/tests/unit/test_client_relation.py index ebf8cff7..d2310fe0 100644 --- a/tests/unit/test_client_relation.py +++ b/tests/unit/test_client_relation.py @@ -93,8 +93,8 @@ def test_add_new_client_user(cloud_spec): assert response["resource"] == key_prefix assert response["request-id"] == request_id assert response["salt"] == salt - assert response["endpoints"] == f"{primary_endpoint}:{CLIENT_PORT}" - assert response["read-only-endpoints"] == f"{replica_endpoint}:{CLIENT_PORT}" + assert response["endpoints"] == f"valkey-primary:{CLIENT_PORT}" + assert response["read-only-endpoints"] == f"valkey-replicas:{CLIENT_PORT}" assert response["sentinel-endpoints"] == f"{primary_endpoint}:{SENTINEL_PORT}" assert response["version"] == valkey_version assert response["mode"] == "sentinel" @@ -161,8 +161,8 @@ def test_add_new_client_user_v0(cloud_spec): secret_tls = state_out.get_secret(id=secret_tls_id) assert response["database"] == key_prefix - assert response["endpoints"] == f"{primary_endpoint}:{CLIENT_PORT}" - assert response["read-only-endpoints"] == f"{replica_endpoint}:{CLIENT_PORT}" + assert response["endpoints"] == f"valkey-primary:{CLIENT_PORT}" + assert response["read-only-endpoints"] == f"valkey-replicas:{CLIENT_PORT}" assert response["sentinel_endpoints"] == f"{primary_endpoint}:{SENTINEL_PORT}" assert response["version"] == valkey_version assert response["mode"] == "sentinel" From f7816b0c6258b5fc4ca01bfd39e3fdc98ddf926a Mon Sep 17 00:00:00 2001 From: reneradoi Date: Fri, 17 Apr 2026 12:37:25 +0200 Subject: [PATCH 04/18] add K8s services to SANs DNS in TLS certificate requests --- src/common/k8s_client.py | 2 +- src/managers/sentinel.py | 9 ++------- src/managers/tls.py | 14 ++++++++++++-- tests/unit/test_tls.py | 9 ++++++++- 4 files changed, 23 insertions(+), 11 deletions(-) diff --git a/src/common/k8s_client.py b/src/common/k8s_client.py index 3e0ad1d0..d4858e2d 100644 --- a/src/common/k8s_client.py +++ b/src/common/k8s_client.py @@ -9,8 +9,8 @@ from lightkube.core.exceptions import ApiError from lightkube.models.core_v1 
import ServicePort, ServiceSpec from lightkube.models.meta_v1 import ObjectMeta -from lightkube.types import PatchType from lightkube.resources.core_v1 import Pod, Service +from lightkube.types import PatchType from common.exceptions import KubernetesClientError diff --git a/src/managers/sentinel.py b/src/managers/sentinel.py index 57116917..431b7f05 100644 --- a/src/managers/sentinel.py +++ b/src/managers/sentinel.py @@ -5,7 +5,6 @@ """Manager for all sentinel related tasks.""" import logging -import socket import tenacity from data_platform_helpers.advanced_statuses.models import StatusObject @@ -146,9 +145,7 @@ def get_primary_endpoint(self) -> str: if self.state.substrate == Substrate.K8S: # get the DNS name of the K8s service - primary_address = socket.getfqdn( - f"{self.state.model.app.name}-{K8sService.PRIMARY.value}" - ) + primary_address = f"{self.state.model.app.name}-{K8sService.PRIMARY.value}" return f"{primary_address}:{port}" primary_address = self.get_primary_ip() @@ -160,9 +157,7 @@ def get_replica_endpoints(self) -> str: if self.state.substrate == Substrate.K8S: # get the DNS name of the K8s service - replicas_address = socket.getfqdn( - f"{self.state.model.app.name}-{K8sService.REPLICAS.value}" - ) + replicas_address = f"{self.state.model.app.name}-{K8sService.REPLICAS.value}" return f"{replicas_address}:{port}" client = self._get_sentinel_client() diff --git a/src/managers/tls.py b/src/managers/tls.py index 59b0512e..51202424 100644 --- a/src/managers/tls.py +++ b/src/managers/tls.py @@ -27,7 +27,13 @@ from common.exceptions import ValkeyWorkloadCommandError from core.base_workload import WorkloadBase from core.cluster_state import ClusterState -from literals import TLS_CLIENT_PRIVATE_KEY_CONFIG, TLSCARotationState, TLSState +from literals import ( + TLS_CLIENT_PRIVATE_KEY_CONFIG, + K8sService, + Substrate, + TLSCARotationState, + TLSState, +) from statuses import CharmStatuses, TLSStatuses logger = logging.getLogger(__name__) @@ -107,7 +113,7 @@ def build_sans_ip(self) -> frozenset[str]: extra_sans = [san.strip() for san in extra_sans_config.split(",")] sans_ip = {san for san in extra_sans if self._is_ip_address(san)} - if self.state.substrate == "k8s": + if self.state.substrate == Substrate.K8S: return frozenset(sans_ip) sans_ip.add(self.state.bind_address) @@ -141,6 +147,10 @@ def build_sans_dns(self) -> frozenset[str]: sans_dns.add(self.state.unit_server.unit_name.replace("/", "")) sans_dns.add(self.state.hostname) + if self.state.substrate == Substrate.K8S: + sans_dns.add(f"{self.state.model.app.name}-{K8sService.PRIMARY.value}") + sans_dns.add(f"{self.state.model.app.name}-{K8sService.REPLICAS.value}") + return frozenset(sans_dns) def read_and_validate_private_key(self, private_key_secret_id: str) -> PrivateKey | None: diff --git a/tests/unit/test_tls.py b/tests/unit/test_tls.py index 8fa8aefd..8314d641 100644 --- a/tests/unit/test_tls.py +++ b/tests/unit/test_tls.py @@ -1117,6 +1117,7 @@ def test_set_extra_sans_config_option(cloud_spec): current_sans_value = ( "X509v3 Subject Alternative Name: \n " "DNS:valkey0, DNS:valkey-0.valkey-endpoints, " + "DNS:valkey-primary, DNS:valkey-replicas, " "IP Address:127.1.1.1, IP Address:192.0.2.0" ) with ( @@ -1151,6 +1152,7 @@ def test_set_extra_sans_config_option_unit_placeholder(cloud_spec): current_sans_value = ( "X509v3 Subject Alternative Name: \n " "DNS:myhostname, DNS:valkey0, DNS:valkey-0.valkey-endpoints, " + "DNS:valkey-primary, DNS:valkey-replicas, " "IP Address:127.1.1.1, IP Address:192.168.1.100, IP 
Address:192.0.2.0" ) with ( @@ -1258,7 +1260,12 @@ def test_set_extra_sans_config_option_no_update(cloud_spec): model=testing.Model(name="my-vm-model", type="lxd", cloud_spec=cloud_spec), ) - current_sans_value = "X509v3 Subject Alternative Name: \n DNS:myhostname, DNS:valkey0, DNS:valkey-0.valkey-endpoints, IP Address:192.168.1.100" + current_sans_value = ( + "X509v3 Subject Alternative Name: \n " + "DNS:myhostname, DNS:valkey0, DNS:valkey-0.valkey-endpoints, " + "DNS:valkey-primary, DNS:valkey-replicas, " + "IP Address:192.168.1.100" + ) with ( patch("workload_k8s.ValkeyK8sWorkload.exec", return_value=[current_sans_value]), patch("workload_k8s.ValkeyK8sWorkload.exec", return_value=[current_sans_value]), From 3c91b2b9bcfc6e7f45d6dd1a939f1a9096801a96 Mon Sep 17 00:00:00 2001 From: reneradoi Date: Mon, 20 Apr 2026 17:17:10 +0200 Subject: [PATCH 05/18] WIP: add topology observer --- poetry.lock | 10 +-- pyproject.toml | 5 +- src/charm.py | 2 + src/common/topology_observer.py | 28 ++++++++ src/core/models.py | 1 + src/managers/topology.py | 122 ++++++++++++++++++++++++++++++++ 6 files changed, 161 insertions(+), 7 deletions(-) create mode 100644 src/common/topology_observer.py create mode 100644 src/managers/topology.py diff --git a/poetry.lock b/poetry.lock index 8b2d8c36..947e77b9 100644 --- a/poetry.lock +++ b/poetry.lock @@ -66,7 +66,7 @@ version = "4.13.0" description = "High-level concurrency and networking framework on top of asyncio or Trio" optional = false python-versions = ">=3.10" -groups = ["main", "integration"] +groups = ["main"] files = [ {file = "anyio-4.13.0-py3-none-any.whl", hash = "sha256:08b310f9e24a9594186fd75b4f73f4a4152069e3853f1ed8bfbf58369f4ad708"}, {file = "anyio-4.13.0.tar.gz", hash = "sha256:334b70e641fd2221c1505b3890c69882fe4a2df910cba14d97019b90b24439dc"}, @@ -1055,7 +1055,7 @@ version = "7.34.1" description = "" optional = false python-versions = ">=3.10" -groups = ["integration"] +groups = ["main"] files = [ {file = "protobuf-7.34.1-cp310-abi3-macosx_10_9_universal2.whl", hash = "sha256:d8b2cc79c4d8f62b293ad9b11ec3aebce9af481fa73e64556969f7345ebf9fc7"}, {file = "protobuf-7.34.1-cp310-abi3-manylinux2014_aarch64.whl", hash = "sha256:5185e0e948d07abe94bb76ec9b8416b604cfe5da6f871d67aad30cbf24c3110b"}, @@ -1529,7 +1529,7 @@ version = "1.3.1" description = "Sniff out which async library your code is running under" optional = false python-versions = ">=3.7" -groups = ["integration"] +groups = ["main"] files = [ {file = "sniffio-1.3.1-py3-none-any.whl", hash = "sha256:2f6da418d1f1e0fddd844478f41680e794e6051915791a034ff65e5f100525a2"}, {file = "sniffio-1.3.1.tar.gz", hash = "sha256:f4324edc670a0f49750a81b895f35c3adb843cca46f0530f79fc1babb23789dc"}, @@ -1617,7 +1617,7 @@ version = "0.0.0" description = "Valkey GLIDE Async client. Supports Valkey and Redis OSS." 
optional = false python-versions = ">=3.9" -groups = ["integration"] +groups = ["main"] files = [] develop = false @@ -1673,4 +1673,4 @@ type = ["pytest-mypy"] [metadata] lock-version = "2.1" python-versions = "^3.12" -content-hash = "54a93e43d0c1bbb152b9f6f3ac3fdc4d75bd5916d2a77624e9b8ced5f7222495" +content-hash = "c1b7c7d55f52c76a312ad5a3324f9166b9389adeb3f12923ef322c05ce43cd6a" diff --git a/pyproject.toml b/pyproject.toml index 9f503c79..f3a8730c 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -17,6 +17,9 @@ data-platform-helpers = ">=0.1.7" validators = ">=0.35.0" dpcharmlibs-interfaces = ">=1.0.2" lightkube = ">=0.19.0" +# TODO replace with official release once build from source is possible +# https://github.com/valkey-io/valkey-glide/pull/5202 +valkey-glide = { git = "https://github.com/skourta/valkey-glide", subdirectory = "python/glide-async", branch = "add-build-rs-to-async-client" } [tool.poetry.requires-plugins] poetry-plugin-export = ">=1.8" @@ -57,8 +60,6 @@ data-platform-helpers = ">=0.1.7" jubilant = "^1.6.0" python-dateutil = "*" tenacity = "^9.1.2" -# https://github.com/valkey-io/valkey-glide/pull/5124 not yet released -valkey-glide = { git = "https://github.com/skourta/valkey-glide", subdirectory = "python/glide-async", branch = "add-build-rs-to-async-client" } kubernetes = "^35.0.0" [tool.coverage.run] diff --git a/src/charm.py b/src/charm.py index bc2b23e2..3c7dcce9 100755 --- a/src/charm.py +++ b/src/charm.py @@ -19,6 +19,7 @@ from managers.external_clients import ExternalClientsManager from managers.sentinel import SentinelManager from managers.tls import TLSManager +from managers.topology import TopologyManager from workload_k8s import ValkeyK8sWorkload from workload_vm import ValkeyVmWorkload @@ -50,6 +51,7 @@ def __init__(self, *args) -> None: self.sentinel_manager = SentinelManager(state=self.state, workload=self.workload) self.tls_manager = TLSManager(state=self.state, workload=self.workload) self.client_manager = ExternalClientsManager(state=self.state, workload=self.workload) + self.topology_manager = TopologyManager(state=self.state, workload=self.workload) # --- STATUS HANDLER --- self.status = StatusHandler( diff --git a/src/common/topology_observer.py b/src/common/topology_observer.py new file mode 100644 index 00000000..7dd26fd8 --- /dev/null +++ b/src/common/topology_observer.py @@ -0,0 +1,28 @@ +# Copyright 2026 Canonical Ltd. +# See LICENSE file for licensing details. 
+ +"""Topology observer class for checking changes in Primary/Replica topology.""" + +import subprocess +import sys + +def dispatch(unit, charm_dir, custom_event) -> None: + """Dispatch a Juju custom event.""" + juju_run_command = "/usr/bin/juju-exec" + dispatch_command = f"JUJU_DISPATCH_PATH=hooks/{custom_event} {charm_dir}/dispatch" + + subprocess.run([juju_run_command, "-u", unit, dispatch_command]) + + +def callback() -> None: + """Handle received event messages and trigger a Juju event.""" + pass + + +def main() -> None: + """Start a Valkey client and subscribe to Sentinel event messages.""" + valkey_hosts, username, password, tls, tls_ca_cert_file, unit_name, charm_dir = sys.argv[1:] + + +if __name__ == "__main__": + main() diff --git a/src/core/models.py b/src/core/models.py index 48715dd0..f1565f48 100644 --- a/src/core/models.py +++ b/src/core/models.py @@ -84,6 +84,7 @@ class PeerUnitModel(PeerModel): is_valkey_healthy: bool = Field(default=True) is_sentinel_healthy: bool = Field(default=True) client_user_epoch: float = Field(default=0) + topology_observer_pid: str = Field(default="") class RelationState: diff --git a/src/managers/topology.py b/src/managers/topology.py new file mode 100644 index 00000000..286b9844 --- /dev/null +++ b/src/managers/topology.py @@ -0,0 +1,122 @@ +#!/usr/bin/env python3 +# Copyright 2026 Canonical Ltd. +# See LICENSE file for licensing details. + +"""Manager for Cluster Topology.""" + +import logging +import os +import signal +import subprocess +from pathlib import Path +from sys import version_info + +from core.base_workload import WorkloadBase +from core.cluster_state import ClusterState +from literals import ( + CLIENT_PORT, + TLS_PORT, + TOPOLOGY_OBSERVER_LOGFILE, + SNAP_TOPOLOGY_OBSERVER_LOGFILE, + CharmUsers, + Substrate, +) + +logger = logging.getLogger(__name__) + +LOG_FILE_PATH = "/var/log/topology_observer.log" + + +class TopologyManager: + """Observe the topology for Valkey Sentinel.""" + + name: str = "topology_observer" + state: ClusterState + + def __init__(self, state: ClusterState, workload: WorkloadBase): + self.state = state + self.workload = workload + + def start_observer(self) -> None: + """Start the topology observer as a subprocess.""" + if observer_pid := self.state.unit_server.model.topology_observer_pid: + try: + # check if the process already runs + os.kill(int(observer_pid), 0) + return + except OSError: + logger.debug("Topology observer not running") + pass + + # Generate the venv path based on the existing lib path + env = os.environ.copy() + env.pop('JUJU_CONTEXT_ID', None) + for loc in env["PYTHONPATH"].split(":"): + path = Path(loc) + venv_path = ( + path + / ".." 
+ / "venv" + / "lib" + / f"python{version_info.major}.{version_info.minor}" + / "site-packages" + ) + if path.stem == "lib": + env["PYTHONPATH"] = f"{venv_path.resolve()}:{env['PYTHONPATH']}" + break + + # Gather Valkey hosts for connection + started_servers = [ + unit.get_endpoint(self.state.substrate) + for unit in self.state.servers + if unit.is_active + ] + port = TLS_PORT if self.state.unit_server.is_tls_enabled else CLIENT_PORT + valkey_hosts = ",".join(sorted([f"{server}:{port}" for server in started_servers])) + + # Store current TLS CA cert on operator container + tls_ca_cert = self.workload.read_file(self.workload.tls_paths.client_ca) + tls_ca_cert_file = "/etc/ssl/certs/Valkey_CA.pem" + path = Path(tls_ca_cert_file) + path.write_text(tls_ca_cert) + + logging.info("Starting topology observer") + pid = subprocess.Popen( # noqa: S603 + [ + "/usr/bin/python3", + "scripts/cluster_topology_observer.py", + valkey_hosts, + CharmUsers.VALKEY_ADMIN.value, # username + self.state.unit_server.valkey_admin_password, # password + str(self.state.unit_server.is_tls_enabled), + tls_ca_cert_file, + self.state.unit_server.unit_name, + self.state.charm.charm_dir, + ], + # File shouldn't close + stdout=open(LOG_FILE_PATH, "a"), # noqa: SIM115 + stderr=subprocess.STDOUT, + env=env, + ).pid + + self.state.unit_server.update({"topology_observer_pid": pid}) + logging.info(f"Started topology observer process with PID {pid}") + + def stop_observer(self) -> None: + """Stop the topology observer.""" + if not (observer_pid := self.state.unit_server.model.topology_observer_pid): + logger.debug("Topology observer already stopped") + return + + logger.debug("Stopping topology observer") + try: + os.kill(int(observer_pid), signal.SIGTERM) + logger.info("Topology observer stopped") + self.state.unit_server.update({"topology_observer_pid": ""}) + except OSError: + pass + + def restart_observer(self) -> None: + """Stop and start the topology observer to pickup host changes.""" + self.stop_observer() + self.start_observer() From c7d98ff5fca8b81b9eaffbe79d3ec8b8c0550efd Mon Sep 17 00:00:00 2001 From: reneradoi Date: Tue, 21 Apr 2026 21:49:18 +0200 Subject: [PATCH 06/18] add topology observer based on valkey-py --- poetry.lock | 26 +++++++-- pyproject.toml | 6 +- src/common/topology_observer.py | 28 --------- src/core/models.py | 2 +- src/events/base_events.py | 37 ++++++++++++ src/events/external_clients.py | 32 +++++++++++ src/literals.py | 6 +- src/managers/sentinel.py | 7 ++- src/managers/topology.py | 37 ++++++------ src/scripts/topology_observer.py | 97 ++++++++++++++++++++++++++++++++ tests/unit/test_tls.py | 5 ++ 11 files changed, 224 insertions(+), 59 deletions(-) delete mode 100644 src/common/topology_observer.py create mode 100644 src/scripts/topology_observer.py diff --git a/poetry.lock b/poetry.lock index 947e77b9..24e7cd1b 100644 --- a/poetry.lock +++ b/poetry.lock @@ -66,7 +66,7 @@ version = "4.13.0" description = "High-level concurrency and networking framework on top of asyncio or Trio" optional = false python-versions = ">=3.10" -groups = ["main"] +groups = ["main", "integration"] files = [ {file = "anyio-4.13.0-py3-none-any.whl", hash = "sha256:08b310f9e24a9594186fd75b4f73f4a4152069e3853f1ed8bfbf58369f4ad708"}, {file = "anyio-4.13.0.tar.gz", hash = "sha256:334b70e641fd2221c1505b3890c69882fe4a2df910cba14d97019b90b24439dc"}, @@ -1055,7 +1055,7 @@ version = "7.34.1" description = "" optional = false python-versions = ">=3.10" -groups = ["main"] +groups = ["integration"] files = [ {file = 
"protobuf-7.34.1-cp310-abi3-macosx_10_9_universal2.whl", hash = "sha256:d8b2cc79c4d8f62b293ad9b11ec3aebce9af481fa73e64556969f7345ebf9fc7"}, {file = "protobuf-7.34.1-cp310-abi3-manylinux2014_aarch64.whl", hash = "sha256:5185e0e948d07abe94bb76ec9b8416b604cfe5da6f871d67aad30cbf24c3110b"}, @@ -1529,7 +1529,7 @@ version = "1.3.1" description = "Sniff out which async library your code is running under" optional = false python-versions = ">=3.7" -groups = ["main"] +groups = ["integration"] files = [ {file = "sniffio-1.3.1-py3-none-any.whl", hash = "sha256:2f6da418d1f1e0fddd844478f41680e794e6051915791a034ff65e5f100525a2"}, {file = "sniffio-1.3.1.tar.gz", hash = "sha256:f4324edc670a0f49750a81b895f35c3adb843cca46f0530f79fc1babb23789dc"}, @@ -1611,13 +1611,29 @@ files = [ [package.extras] crypto-eth-addresses = ["eth-hash[pycryptodome] (>=0.7.0)"] +[[package]] +name = "valkey" +version = "6.1.1" +description = "Python client for Valkey forked from redis-py" +optional = false +python-versions = ">=3.9" +groups = ["main"] +files = [ + {file = "valkey-6.1.1-py3-none-any.whl", hash = "sha256:e2691541c6e1503b53c714ad9a35551ac9b7c0bbac93865f063dbc859a46de92"}, + {file = "valkey-6.1.1.tar.gz", hash = "sha256:5880792990c6c2b5eb604a5ed5f98f300880b6dd92d123819b66ed54bb259731"}, +] + +[package.extras] +libvalkey = ["libvalkey (>=4.0.1)"] +ocsp = ["cryptography (>=36.0.1)", "pyopenssl (==23.2.1)", "requests (>=2.31.0)"] + [[package]] name = "valkey-glide" version = "0.0.0" description = "Valkey GLIDE Async client. Supports Valkey and Redis OSS." optional = false python-versions = ">=3.9" -groups = ["main"] +groups = ["integration"] files = [] develop = false @@ -1673,4 +1689,4 @@ type = ["pytest-mypy"] [metadata] lock-version = "2.1" python-versions = "^3.12" -content-hash = "c1b7c7d55f52c76a312ad5a3324f9166b9389adeb3f12923ef322c05ce43cd6a" +content-hash = "b8ff2b895655a7499b09645e24a9c5a883911dc1d4eaee66cb2d275d93522954" diff --git a/pyproject.toml b/pyproject.toml index f3a8730c..38361447 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -17,9 +17,7 @@ data-platform-helpers = ">=0.1.7" validators = ">=0.35.0" dpcharmlibs-interfaces = ">=1.0.2" lightkube = ">=0.19.0" -# TODO replace with official release once build from source is possible -# https://github.com/valkey-io/valkey-glide/pull/5202 -valkey-glide = { git = "https://github.com/skourta/valkey-glide", subdirectory = "python/glide-async", branch = "add-build-rs-to-async-client" } +valkey = "^6.1.1" [tool.poetry.requires-plugins] poetry-plugin-export = ">=1.8" @@ -60,6 +58,8 @@ data-platform-helpers = ">=0.1.7" jubilant = "^1.6.0" python-dateutil = "*" tenacity = "^9.1.2" +# https://github.com/valkey-io/valkey-glide/pull/5124 not yet released +valkey-glide = { git = "https://github.com/skourta/valkey-glide", subdirectory = "python/glide-async", branch = "add-build-rs-to-async-client" } kubernetes = "^35.0.0" [tool.coverage.run] diff --git a/src/common/topology_observer.py b/src/common/topology_observer.py deleted file mode 100644 index 7dd26fd8..00000000 --- a/src/common/topology_observer.py +++ /dev/null @@ -1,28 +0,0 @@ -# Copyright 2026 Canonical Ltd. -# See LICENSE file for licensing details. 
- -"""Topology observer class for checking changes in Primary/Replica topology.""" - -import subprocess -import sys - -def dispatch(unit, charm_dir, custom_event) -> None: - """Dispatch a Juju custom event.""" - juju_run_command = "/usr/bin/juju-exec" - dispatch_command = f"JUJU_DISPATCH_PATH=hooks/{custom_event} {charm_dir}/dispatch" - - subprocess.run([juju_run_command, "-u", unit, dispatch_command]) - - -def callback() -> None: - """Handle received event messages and trigger a Juju event.""" - pass - - -def main() -> None: - """Start a Valkey client and subscribe to Sentinel event messages.""" - valkey_hosts, username, password, tls, tls_ca_cert_file, unit_name, charm_dir = sys.argv[1:] - - -if __name__ == "__main__": - main() diff --git a/src/core/models.py b/src/core/models.py index f1565f48..3f43cbe5 100644 --- a/src/core/models.py +++ b/src/core/models.py @@ -84,7 +84,7 @@ class PeerUnitModel(PeerModel): is_valkey_healthy: bool = Field(default=True) is_sentinel_healthy: bool = Field(default=True) client_user_epoch: float = Field(default=0) - topology_observer_pid: str = Field(default="") + topology_observer_pid: int = Field(default=0) class RelationState: diff --git a/src/events/base_events.py b/src/events/base_events.py index 8020ba4a..a5d55883 100644 --- a/src/events/base_events.py +++ b/src/events/base_events.py @@ -253,6 +253,14 @@ def _on_unit_fully_started(self, event: UnitFullyStarted) -> None: self.charm.unit.open_port("tcp", CLIENT_PORT) self.charm.unit.open_port("tcp", TLS_PORT) + if not self.charm.unit.is_leader(): + return + + try: + self.charm.topology_manager.start_observer() + except (ValkeyWorkloadCommandError, ValueError) as e: + logger.error("Failed to start topology observer: %s", e) + def _on_peer_relation_changed(self, _: ops.RelationChangedEvent) -> None: """Handle event received by all units when a unit's relation data changes.""" try: @@ -267,6 +275,15 @@ def _on_peer_relation_changed(self, _: ops.RelationChangedEvent) -> None: for lock in [StartLock(self.charm.state), RestartLock(self.charm.state)]: lock.process() + if not self.charm.state.unit_server.is_active: + return + + # need to pick up scaling operations, TLS switchover, CA rotation and so on + try: + self.charm.topology_manager.restart_observer() + except (ValkeyWorkloadCommandError, ValueError) as e: + logger.error("Failed to restart topology observer: %s", e) + def _on_peer_relation_departed(self, _: ops.RelationDepartedEvent) -> None: """Handle event received by all units when a unit departs.""" try: @@ -275,10 +292,27 @@ def _on_peer_relation_departed(self, _: ops.RelationDepartedEvent) -> None: logger.error(f"Failed to update sentinel quorum: {e}") # not critical to defer here, we can wait for the next relation change + if not self.charm.state.unit_server.is_active: + return + + try: + self.charm.topology_manager.restart_observer() + except (ValkeyWorkloadCommandError, ValueError) as e: + logger.error("Failed to restart topology observer: %s", e) + def _on_update_status(self, event: ops.UpdateStatusEvent) -> None: """Handle the update-status event.""" if not self.charm.state.unit_server.is_started: logger.warning("Service not started") + return + + if not self.charm.unit.is_leader(): + return + + try: + self.charm.topology_manager.start_observer() + except (ValkeyWorkloadCommandError, ValueError) as e: + logger.error("Failed to start topology observer: %s", e) def _on_leader_elected(self, event: ops.LeaderElectedEvent) -> None: """Handle the leader-elected event.""" @@ -552,6 +586,9 @@ def 
_on_storage_detaching(self, event: ops.StorageDetachingEvent) -> None: primary_ip, ) + if self.charm.unit.is_leader(): + self.charm.topology_manager.stop_observer() + # stop valkey and sentinel processes self.charm.workload.stop() active_sentinels = [ diff --git a/src/events/external_clients.py b/src/events/external_clients.py index 4e80b8b4..64d63aa4 100644 --- a/src/events/external_clients.py +++ b/src/events/external_clients.py @@ -38,9 +38,15 @@ logger = logging.getLogger(__name__) +class TopologyChangedEvent(ops.EventBase): + """A custom event for topology changes.""" + + class ExternalClientsEvents(ops.Object): """Handle all events for external client relations.""" + topology_changed = ops.EventSource(TopologyChangedEvent) + def __init__(self, charm: "ValkeyCharm"): super().__init__(charm, key="client_events") self.charm = charm @@ -79,6 +85,7 @@ def __init__(self, charm: "ValkeyCharm"): self.framework.observe( self.certificate_transfer.on.certificates_removed, self._on_ca_removed ) + self.framework.observe(self.topology_changed, self._on_topology_changed) def _on_bulk_resources_requested( self, event: BulkResourcesRequestedEvent[RequirerCommonModel] | ResourceRequestedEvent @@ -190,6 +197,7 @@ def _on_peer_relation_changed(self, event: ops.RelationChangedEvent) -> None: if self.charm.unit.is_leader(): try: self.charm.sentinel_manager.reconcile_k8s_services() + self.charm.sentinel_manager.set_pod_labels() except (KubernetesClientError, ValkeyCannotGetPrimaryIPError) as e: logger.error("Error updating Kubernetes services: %s", e) event.defer() @@ -367,3 +375,27 @@ def _on_ca_removed(self, event: CertificatesRemovedEvent) -> None: logger.error("Error removing CA certificates for external clients: %s", e) event.defer() return + + def _on_topology_changed(self, event: TopologyChangedEvent) -> None: + """Handle custom events for topology changes.""" + if not self.charm.unit.is_leader(): + return + + logger.info("Received topology-changed event") + try: + self.charm.sentinel_manager.reconcile_k8s_services() + self.charm.sentinel_manager.set_pod_labels() + except (KubernetesClientError, ValkeyCannotGetPrimaryIPError) as e: + logger.error("Error updating Kubernetes services: %s", e) + event.defer() + return + + if not self.charm.state.external_client_relations: + return + + try: + self._update_client_relations() + except (ValkeyCannotGetPrimaryIPError, ValkeyWorkloadCommandError) as e: + logger.error("Error updating client relations: %s", e) + event.defer() + return diff --git a/src/literals.py b/src/literals.py index 063b5b33..5928aefd 100644 --- a/src/literals.py +++ b/src/literals.py @@ -26,6 +26,10 @@ ACL_FILE = "var/lib/valkey/users.acl" SENTINEL_ACL_FILE = "var/lib/valkey/sentinel-users.acl" +# todo: these paths require root access, should be moved to dedicated user directories +TOPOLOGY_OBSERVER_LOG_FILE = "/var/log/topology_observer.log" +TOPOLOGY_OBSERVER_TLS_CA_FILE = "/etc/ssl/certs/Valkey_CA.pem" + PEER_RELATION = "valkey-peers" STATUS_PEERS_RELATION = "status-peers" CLIENT_TLS_RELATION_NAME = "client-certificates" @@ -64,7 +68,7 @@ class CharmUsers(StrEnum): CHARM_USERS_ROLE_MAP = { - CharmUsers.VALKEY_ADMIN: "~* +@all", + CharmUsers.VALKEY_ADMIN: "~* &* +@all", CharmUsers.VALKEY_SENTINEL: "+subscribe +publish +failover +script|kill +ping +info +multi +slaveof +config +client +exec &__sentinel__:hello", CharmUsers.VALKEY_REPLICA: "+psync +replconf +ping", CharmUsers.VALKEY_MONITORING: "-@all +@connection +memory -readonly +strlen +config|get +xinfo +pfcount -quit +zcard +type 
+xlen -readwrite -command +client -wait +scard +llen +hlen +get +eval +slowlog +cluster|info +cluster|slots +cluster|nodes -hello -echo +info +latency +scan -reset -auth -asking", diff --git a/src/managers/sentinel.py b/src/managers/sentinel.py index 431b7f05..214ab102 100644 --- a/src/managers/sentinel.py +++ b/src/managers/sentinel.py @@ -380,7 +380,7 @@ def set_quorum(self, quorum: int) -> None: client.set(self.state.endpoint, PRIMARY_NAME, "quorum", str(quorum)) def reconcile_k8s_services(self) -> None: - """Create or update the services and pod labels in Kubernetes.""" + """Create or update the services in Kubernetes.""" if self.state.substrate == Substrate.VM: return @@ -389,6 +389,11 @@ def reconcile_k8s_services(self) -> None: self.k8s_client.ensure_endpoint_service(role=K8sService.PRIMARY.value, port=valkey_port) self.k8s_client.ensure_endpoint_service(role=K8sService.REPLICAS.value, port=valkey_port) + def set_pod_labels(self) -> None: + """Set labels for primary and replica pods in Kubernetes.""" + if self.state.substrate == Substrate.VM: + return + primary_endpoint = self.get_primary_ip() for unit in self.state.servers: if not unit.is_active: diff --git a/src/managers/topology.py b/src/managers/topology.py index 286b9844..c3a7ed41 100644 --- a/src/managers/topology.py +++ b/src/managers/topology.py @@ -14,18 +14,15 @@ from core.base_workload import WorkloadBase from core.cluster_state import ClusterState from literals import ( - CLIENT_PORT, - TLS_PORT, - TOPOLOGY_OBSERVER_LOGFILE, - SNAP_TOPOLOGY_OBSERVER_LOGFILE, + SENTINEL_PORT, + SENTINEL_TLS_PORT, + TOPOLOGY_OBSERVER_LOG_FILE, + TOPOLOGY_OBSERVER_TLS_CA_FILE, CharmUsers, - Substrate, ) logger = logging.getLogger(__name__) -LOG_FILE_PATH = "/var/log/topology_observer.log" - class TopologyManager: """Observe the topology for Valkey Sentinel.""" @@ -39,7 +36,7 @@ def __init__(self, state: ClusterState, workload: WorkloadBase): def start_observer(self) -> None: """Start the topology observer as a subprocess.""" - if observer_pid := self.state.unit_server.model.topology_observer_pid: + if (observer_pid := self.state.unit_server.model.topology_observer_pid) != 0: try: # check if the process already runs os.kill(int(observer_pid), 0) @@ -50,7 +47,7 @@ def start_observer(self) -> None: # Generate the venv path based on the existing lib path env = os.environ.copy() - env.pop('JUJU_CONTEXT_ID', None) + env.pop("JUJU_CONTEXT_ID", None) for loc in env["PYTHONPATH"].split(":"): path = Path(loc) venv_path = ( @@ -71,30 +68,30 @@ def start_observer(self) -> None: for unit in self.state.servers if unit.is_active ] - port = TLS_PORT if self.state.unit_server.is_tls_enabled else CLIENT_PORT - valkey_hosts = ",".join(sorted([f"{server}:{port}" for server in started_servers])) + port = SENTINEL_TLS_PORT if self.state.unit_server.is_tls_enabled else SENTINEL_PORT + hosts = ",".join(sorted([f"{server}:{port}" for server in started_servers])) # Store current TLS CA cert on operator container tls_ca_cert = self.workload.read_file(self.workload.tls_paths.client_ca) - tls_ca_cert_file = "/etc/ssl/certs/Valkey_CA.pem" - path = Path(tls_ca_cert_file) + path = Path(TOPOLOGY_OBSERVER_TLS_CA_FILE) path.write_text(tls_ca_cert) logging.info("Starting topology observer") pid = subprocess.Popen( # noqa: S603 [ "/usr/bin/python3", - "scripts/cluster_topology_observer.py", - valkey_hosts, - CharmUsers.VALKEY_ADMIN.value, # username - self.state.unit_server.valkey_admin_password, # password + "src/scripts/topology_observer.py", + hosts, + 
CharmUsers.SENTINEL_CHARM_ADMIN.value,  # username
+                self.state.cluster.internal_users_credentials.get(
+                    CharmUsers.SENTINEL_CHARM_ADMIN.value, ""
+                ),  # password
                 str(self.state.unit_server.is_tls_enabled),
-                tls_ca_cert_file,
                 self.state.unit_server.unit_name,
                 self.state.charm.charm_dir,
             ],
             # File shouldn't close
-            stdout=open(LOG_FILE_PATH, "a"),  # noqa: SIM115
+            stdout=open(TOPOLOGY_OBSERVER_LOG_FILE, "a"),  # noqa: SIM115
             stderr=subprocess.STDOUT,
             env=env,
         ).pid
@@ -104,7 +101,7 @@ def stop_observer(self) -> None:
         """Stop the topology observer."""
-        if not (observer_pid := self.state.unit_server.model.topology_observer_pid):
+        if (observer_pid := self.state.unit_server.model.topology_observer_pid) == 0:
             logger.debug("Topology observer already stopped")
             return
diff --git a/src/scripts/topology_observer.py b/src/scripts/topology_observer.py
new file mode 100644
index 00000000..af69bcb0
--- /dev/null
+++ b/src/scripts/topology_observer.py
@@ -0,0 +1,97 @@
+# Copyright 2026 Canonical Ltd.
+# See LICENSE file for licensing details.
+
+"""Topology observer script for checking changes in Primary/Replica topology."""
+
+import signal
+import subprocess
+import sys
+import time
+
+from valkey.sentinel import Sentinel, MasterNotFoundError
+
+from literals import PRIMARY_NAME, TOPOLOGY_OBSERVER_LOG_FILE, TOPOLOGY_OBSERVER_TLS_CA_FILE
+
+# use global variable for gracefully handling stop signals
+continue_running = True
+
+
+def dispatch(unit_name: str, charm_dir: str) -> None:
+    """Dispatch a Juju custom event."""
+    custom_event = "topology_changed"
+
+    juju_run_command = "/usr/bin/juju-exec"
+    dispatch_command = f"JUJU_DISPATCH_PATH=hooks/{custom_event} {charm_dir}/dispatch"
+
+    subprocess.run([juju_run_command, "-u", unit_name, dispatch_command])
+
+
+def handle_stop_signal(signum, frame) -> None:
+    """Stop the execution gracefully."""
+    global continue_running
+    continue_running = False
+
+
+def main() -> None:
+    """Start a Sentinel client and check changes to primary."""
+    hosts, username, password, tls, unit_name, charm_dir = sys.argv[1:]
+
+    # handle the stop signal for a graceful stop of the observer loop
+    signal.signal(signal.SIGTERM, handle_stop_signal)
+
+    with open(TOPOLOGY_OBSERVER_LOG_FILE, "a") as log_file:
+        log_file.write(f"Starting new observer for hosts {hosts} with tls={tls}\n")
+
+    host_list = hosts.split(",")
+    addresses = [
+        (hostname, int(port))
+        for host in host_list
+        for hostname, port in [host.split(":")]
+    ]
+    tls_enabled = True if tls == "True" else False
+    sentinel_kwargs = {
+        "username": username,
+        "password": password,
+        "decode_responses": True,
+        "ssl": tls_enabled,
+        "ssl_ca_certs": TOPOLOGY_OBSERVER_TLS_CA_FILE if tls_enabled else None,
+    }
+
+    primary_name = ""
+    previous_primary = ""
+
+    while continue_running:
+        time.sleep(1)
+
+        if primary_name != "":
+            previous_primary = primary_name
+
+        sentinel = Sentinel(
+            sentinels=addresses,
+            socket_timeout=0.1,
+            sentinel_kwargs=sentinel_kwargs,
+        )
+
+        try:
+            primary_name = sentinel.discover_master(PRIMARY_NAME)[0]
+        except MasterNotFoundError as e:
+            with open(TOPOLOGY_OBSERVER_LOG_FILE, "a") as log_file:
+                log_file.write(f"Failed to discover primary: {e}\n")
+            continue
+
+        if previous_primary == "" or primary_name == previous_primary:
+            continue
+
+        with open(TOPOLOGY_OBSERVER_LOG_FILE, "a") as log_file:
+            log_file.write(
+                f"Primary change detected: previously {previous_primary}, now {primary_name}\n"
+            )
+        dispatch(unit_name, charm_dir)
+
+    else:
+        with 
open(TOPOLOGY_OBSERVER_LOG_FILE, "a") as log_file: + log_file.write("Gracefully stopping observer\n") + + +if __name__ == "__main__": + main() diff --git a/tests/unit/test_tls.py b/tests/unit/test_tls.py index 8314d641..fcbc8e25 100644 --- a/tests/unit/test_tls.py +++ b/tests/unit/test_tls.py @@ -553,6 +553,7 @@ def test_client_certificate_renewed(cloud_spec): patch("workload_k8s.ValkeyK8sWorkload.write_file") as write_certs, patch("managers.tls.TLSManager.rehash_ca_certificates"), patch("managers.cluster.ClusterManager.reload_tls_settings") as reload_tls, + patch("managers.topology.TopologyManager.start_observer"), ): event.certificate = certificate.certificate charm.tls_events._on_certificate_available(event) @@ -611,6 +612,7 @@ def test_new_client_ca_single_unit(cloud_spec): patch("workload_k8s.ValkeyK8sWorkload.write_file") as write_certs, patch("managers.tls.TLSManager.rehash_ca_certificates"), patch("managers.cluster.ClusterManager.reload_tls_settings") as reload_tls, + patch("managers.topology.TopologyManager.start_observer"), ): event.certificate = certificate.certificate charm.tls_events._on_certificate_available(event) @@ -674,6 +676,7 @@ def test_new_client_ca_rotation_started(cloud_spec): patch("workload_k8s.ValkeyK8sWorkload.write_file") as write_certs, patch("managers.tls.TLSManager.rehash_ca_certificates"), patch("managers.cluster.ClusterManager.reload_tls_settings") as reload_tls, + patch("managers.topology.TopologyManager.start_observer"), ): event.certificate = certificate.certificate charm.tls_events._on_certificate_available(event) @@ -718,6 +721,7 @@ def test_internal_peer_ca_rotation_single_unit(cloud_spec): patch("managers.cluster.ClusterManager.reload_tls_settings") as reload_tls, patch("managers.sentinel.SentinelManager.restart_service"), patch("common.client.SentinelClient.primary", return_value={"quorum": "1"}), + patch("managers.topology.TopologyManager.restart_observer"), ): state_out = ctx.run(ctx.on.relation_changed(peer_relation, remote_unit=1), state_in) @@ -760,6 +764,7 @@ def test_internal_peer_ca_rotation_started(cloud_spec): patch("managers.cluster.ClusterManager.reload_tls_settings") as reload_tls, patch("managers.sentinel.SentinelManager.restart_service"), patch("common.client.SentinelClient.primary", return_value={"quorum": "1"}), + patch("managers.topology.TopologyManager.restart_observer"), ): state_out = ctx.run(ctx.on.relation_changed(peer_relation, remote_unit=1), state_in) From fd44750c229c25914f06da167e2bc3f670c4374a Mon Sep 17 00:00:00 2001 From: reneradoi Date: Wed, 22 Apr 2026 09:20:43 +0200 Subject: [PATCH 07/18] handle `topology-changed` event --- src/charm.py | 4 +++- src/events/base_events.py | 15 ++++++++++++++- src/events/external_clients.py | 10 +++++++--- src/scripts/topology_observer.py | 6 ++---- 4 files changed, 26 insertions(+), 9 deletions(-) diff --git a/src/charm.py b/src/charm.py index 3c7dcce9..8e643e40 100755 --- a/src/charm.py +++ b/src/charm.py @@ -11,7 +11,7 @@ from core.cluster_state import ClusterState from events.base_events import BaseEvents -from events.external_clients import ExternalClientsEvents +from events.external_clients import ExternalClientsEvents, TopologyChangedCharmEvents from events.tls import TLSEvents from literals import CONTAINER, Substrate from managers.cluster import ClusterManager @@ -29,6 +29,8 @@ class ValkeyCharm(ops.CharmBase): """Charmed Operator for Valkey.""" + on = TopologyChangedCharmEvents() + def __init__(self, *args) -> None: super().__init__(*args) try: diff --git 
From fd44750c229c25914f06da167e2bc3f670c4374a Mon Sep 17 00:00:00 2001
From: reneradoi
Date: Wed, 22 Apr 2026 09:20:43 +0200
Subject: [PATCH 07/18] handle `topology-changed` event

---
 src/charm.py                     |  4 +++-
 src/events/base_events.py        | 15 ++++++++++++++-
 src/events/external_clients.py   | 10 +++++++---
 src/scripts/topology_observer.py |  6 ++----
 4 files changed, 26 insertions(+), 9 deletions(-)

diff --git a/src/charm.py b/src/charm.py
index 3c7dcce9..8e643e40 100755
--- a/src/charm.py
+++ b/src/charm.py
@@ -11,7 +11,7 @@
 
 from core.cluster_state import ClusterState
 from events.base_events import BaseEvents
-from events.external_clients import ExternalClientsEvents
+from events.external_clients import ExternalClientsEvents, TopologyChangedCharmEvents
 from events.tls import TLSEvents
 from literals import CONTAINER, Substrate
 from managers.cluster import ClusterManager
@@ -29,6 +29,8 @@
 class ValkeyCharm(ops.CharmBase):
     """Charmed Operator for Valkey."""
 
+    on = TopologyChangedCharmEvents()
+
     def __init__(self, *args) -> None:
         super().__init__(*args)
         try:
diff --git a/src/events/base_events.py b/src/events/base_events.py
index a5d55883..0e1dfa52 100644
--- a/src/events/base_events.py
+++ b/src/events/base_events.py
@@ -404,6 +404,12 @@ def _on_config_changed(self, event: ops.ConfigChangedEvent) -> None:
             event.defer()
             return
 
+        # propagate updated credentials to topology observer
+        try:
+            self.charm.topology_manager.restart_observer()
+        except (ValkeyWorkloadCommandError, ValueError) as e:
+            logger.error("Failed to restart topology observer: %s", e)
+
     def _on_secret_changed(self, event: ops.SecretChangedEvent) -> None:
         """Handle the secret_changed event."""
         if not (admin_secret_id := self.charm.config.get(INTERNAL_USERS_PASSWORD_CONFIG)):
@@ -421,7 +427,14 @@ def _on_secret_changed(self, event: ops.SecretChangedEvent) -> None:
         ):
             event.defer()
             return
-        return
+
+        # propagate updated credentials to topology observer
+        try:
+            self.charm.topology_manager.restart_observer()
+        except (ValkeyWorkloadCommandError, ValueError) as e:
+            logger.error("Failed to restart topology observer: %s", e)
+        finally:
+            return
 
         # from here, code is only relevant for non-leader units
         if event.secret.label and event.secret.label.endswith(INTERNAL_USERS_SECRET_LABEL_SUFFIX):
diff --git a/src/events/external_clients.py b/src/events/external_clients.py
index 64d63aa4..9e15098e 100644
--- a/src/events/external_clients.py
+++ b/src/events/external_clients.py
@@ -42,11 +42,15 @@
 class TopologyChangedEvent(ops.EventBase):
     """A custom event for topology changes."""
 
 
-class ExternalClientsEvents(ops.Object):
-    """Handle all events for external client relations."""
+class TopologyChangedCharmEvents(ops.CharmEvents):
+    """A CharmEvent extension to observe topology changes."""
 
     topology_changed = ops.EventSource(TopologyChangedEvent)
 
+
+class ExternalClientsEvents(ops.Object):
+    """Handle all events for external client relations."""
+
     def __init__(self, charm: "ValkeyCharm"):
         super().__init__(charm, key="client_events")
         self.charm = charm
@@ -85,7 +89,7 @@ def __init__(self, charm: "ValkeyCharm"):
         self.framework.observe(
             self.certificate_transfer.on.certificates_removed, self._on_ca_removed
         )
-        self.framework.observe(self.topology_changed, self._on_topology_changed)
+        self.framework.observe(self.charm.on.topology_changed, self._on_topology_changed)
diff --git a/src/scripts/topology_observer.py b/src/scripts/topology_observer.py
index af69bcb0..94c0f67e 100644
--- a/src/scripts/topology_observer.py
+++ b/src/scripts/topology_observer.py
@@ -8,7 +8,7 @@
 import sys
 import time
 
-from valkey.sentinel import Sentinel, MasterNotFoundError
+from valkey.sentinel import MasterNotFoundError, Sentinel
 
 from literals import PRIMARY_NAME, TOPOLOGY_OBSERVER_LOG_FILE, TOPOLOGY_OBSERVER_TLS_CA_FILE
 
@@ -44,9 +44,7 @@ def main() -> None:
 
     host_list = hosts.split(",")
     addresses = [
-        (hostname, int(port))
-        for host in host_list
-        for hostname, port in [host.split(":")]
+        (hostname, int(port)) for host in host_list for hostname, port in [host.split(":")]
     ]
     tls_enabled = tls == "True"
     sentinel_kwargs = {
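The three pieces of this patch only make sense together: the event class, the `CharmEvents` subclass that registers it as an `EventSource`, and the `on = ...` override on the charm. ops only routes a dispatched hook name to a handler if an event source with that name exists on the charm's `on` namespace. Condensed into one self-contained sketch (the handler body is illustrative):

```python
import ops


class TopologyChangedEvent(ops.EventBase):
    """A custom event for topology changes."""


class TopologyChangedCharmEvents(ops.CharmEvents):
    """Extends the default charm event namespace with topology_changed."""

    topology_changed = ops.EventSource(TopologyChangedEvent)


class DemoCharm(ops.CharmBase):
    # Overriding `on` is what makes `hooks/topology_changed` dispatchable.
    on = TopologyChangedCharmEvents()

    def __init__(self, *args):
        super().__init__(*args)
        self.framework.observe(self.on.topology_changed, self._on_topology_changed)

    def _on_topology_changed(self, event: TopologyChangedEvent) -> None:
        self.unit.status = ops.ActiveStatus("topology refreshed")
```

With the event registered on `charm.on`, the observer's `JUJU_DISPATCH_PATH=hooks/topology_changed {charm_dir}/dispatch` invocation from patch 06 is routed by ops to the handler like any native hook.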
From 06d1bc6492f2bccc9800d9ce713eae289da16a0a Mon Sep 17 00:00:00 2001
From: reneradoi
Date: Wed, 22 Apr 2026 09:29:23 +0200
Subject: [PATCH 08/18] workaround for core26 snap, remove pubsub channel
 permissions from admin user acl

---
 src/literals.py    | 2 +-
 src/workload_vm.py | 3 +++
 2 files changed, 4 insertions(+), 1 deletion(-)

diff --git a/src/literals.py b/src/literals.py
index 5928aefd..bd743122 100644
--- a/src/literals.py
+++ b/src/literals.py
@@ -68,7 +68,7 @@ class CharmUsers(StrEnum):
 
 
 CHARM_USERS_ROLE_MAP = {
-    CharmUsers.VALKEY_ADMIN: "~* &* +@all",
+    CharmUsers.VALKEY_ADMIN: "~* +@all",
     CharmUsers.VALKEY_SENTINEL: "+subscribe +publish +failover +script|kill +ping +info +multi +slaveof +config +client +exec &__sentinel__:hello",
     CharmUsers.VALKEY_REPLICA: "+psync +replconf +ping",
     CharmUsers.VALKEY_MONITORING: "-@all +@connection +memory -readonly +strlen +config|get +xinfo +pfcount -quit +zcard +type +xlen -readwrite -command +client -wait +scard +llen +hlen +get +eval +slowlog +cluster|info +cluster|slots +cluster|nodes -hello -echo +info +latency +scan -reset -auth -asking",
diff --git a/src/workload_vm.py b/src/workload_vm.py
index d7bb80d4..c951043c 100644
--- a/src/workload_vm.py
+++ b/src/workload_vm.py
@@ -92,6 +92,9 @@ def install(self, revision: str | None = None, retry_and_raise: bool = True) ->
             revision = str(SNAP_REVISION)
 
         try:
+            # TODO revisit this logic after snapd update is released
+            # refresh snapd to use candidate to bypass risk check issue.
+            snap.add("snapd", channel="candidate")
            # as long as 26.04 is not stable, we need to install the core26 snap from edge
            snap.add("core26", channel="edge")
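For readers not fluent in Valkey ACL selectors: `~*` grants all key patterns, `&*` grants all Pub/Sub channel patterns, and `+@all` enables every command category, so dropping `&*` removes only the channel grant from the admin role. A sketch of the same rule applied at runtime; connection details and the password are placeholders, not the charm's actual setup:

```python
import valkey

client = valkey.Valkey(host="localhost", port=6379)
# Equivalent of the new "~* +@all" role: every key, every command,
# but no Pub/Sub channel patterns.
client.execute_command(
    "ACL", "SETUSER", "valkey-admin", "on", ">placeholder-password", "~*", "+@all"
)
print(client.execute_command("ACL", "GETUSER", "valkey-admin"))
```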
From 5460bf22b0b6680bc58ca8c4b22a9f22a4c6c93b Mon Sep 17 00:00:00 2001
From: reneradoi
Date: Wed, 22 Apr 2026 10:53:26 +0200
Subject: [PATCH 09/18] bugfix: passwords for sentinel users not updated on
 password-change

---
 src/events/base_events.py |  6 ++++++
 tests/unit/test_charm.py  | 10 ++++++++++
 2 files changed, 16 insertions(+)

diff --git a/src/events/base_events.py b/src/events/base_events.py
index 0e1dfa52..0f744ad3 100644
--- a/src/events/base_events.py
+++ b/src/events/base_events.py
@@ -441,8 +441,11 @@ def _on_secret_changed(self, event: ops.SecretChangedEvent) -> None:
         # leader unit processed the secret change from user, non-leader units can replicate
         try:
             self.charm.config_manager.set_acl_file()
+            self.charm.config_manager.set_sentinel_acl_file()
             if self.charm.state.unit_server.is_started:
                 self.charm.cluster_manager.reload_acl_file()
+                # todo: request rolling restart
+                self.charm.sentinel_manager.restart_service()
             # update the local unit admin password to match the leader
             self.charm.config_manager.update_local_valkey_admin_password()
             if self.charm.state.unit_server.is_started:
@@ -504,8 +507,11 @@ def _update_internal_users_password(self, secret_id: str) -> None:
         logger.info("Password(s) for internal users have changed")
         try:
             self.charm.config_manager.set_acl_file(passwords=new_passwords)
+            self.charm.config_manager.set_sentinel_acl_file(passwords=new_passwords)
             if self.charm.state.unit_server.is_started:
                 self.charm.cluster_manager.reload_acl_file()
+                # todo: request rolling restart
+                self.charm.sentinel_manager.restart_service()
             self.charm.state.cluster.update(
                 {
                     f"{user.value.replace('-', '_')}_password": new_passwords[user.value]
diff --git a/tests/unit/test_charm.py b/tests/unit/test_charm.py
index 688542cc..6215032c 100644
--- a/tests/unit/test_charm.py
+++ b/tests/unit/test_charm.py
@@ -498,13 +498,17 @@ def test_config_changed_leader_unit(cloud_spec):
     )
     with (
         patch("managers.config.ConfigManager.set_acl_file") as mock_set_acl_file,
+        patch("managers.config.ConfigManager.set_sentinel_acl_file") as set_sentinel_acl_file,
         patch("common.client.ValkeyClient.acl_load") as mock_acl_load,
         patch("common.client.ValkeyClient.config_set") as mock_config_set,
+        patch("managers.sentinel.SentinelManager.restart_service") as restart_sentinel,
     ):
         state_out = ctx.run(ctx.on.config_changed(), state_in)
         mock_set_acl_file.assert_called_once()
         mock_acl_load.assert_called_once()
         mock_config_set.assert_called_once()
+        set_sentinel_acl_file.assert_called_once()
+        restart_sentinel.assert_called_once()
     secret_out = state_out.get_secret(
         label=f"{PEER_RELATION}.{APP_NAME}.app.{INTERNAL_USERS_SECRET_LABEL_SUFFIX}"
     )
@@ -620,15 +624,19 @@ def test_change_password_secret_changed_non_leader_unit(cloud_spec):
             "events.base_events.BaseEvents._update_internal_users_password"
         ) as mock_update_password,
         patch("managers.config.ConfigManager.set_acl_file") as mock_set_acl_file,
+        patch("managers.config.ConfigManager.set_sentinel_acl_file") as set_sentinel_acl_file,
         patch("common.client.ValkeyClient.acl_load") as mock_acl_load,
         patch("common.client.ValkeyClient.config_set") as mock_config_set,
         patch("managers.sentinel.SentinelManager.get_primary_ip", return_value="127.0.1.1"),
+        patch("managers.sentinel.SentinelManager.restart_service") as restart_sentinel,
     ):
         ctx.run(ctx.on.secret_changed(password_secret), state_in)
         mock_update_password.assert_not_called()
         mock_set_acl_file.assert_called_once()
         mock_acl_load.assert_called_once()
         mock_config_set.assert_called_once()
+        set_sentinel_acl_file.assert_called_once()
+        restart_sentinel.assert_called_once()
 
 
 def test_change_password_secret_changed_non_leader_unit_not_successful(cloud_spec):
@@ -658,6 +666,7 @@ def test_change_password_secret_changed_non_leader_unit_not_successful(cloud_spe
             "events.base_events.BaseEvents._update_internal_users_password"
         ) as mock_update_password,
         patch("managers.config.ConfigManager.set_acl_file") as mock_set_acl_file,
+        patch("managers.config.ConfigManager.set_sentinel_acl_file") as set_sentinel_acl_file,
         patch(
             "common.client.ValkeyClient.exec_cli_command",
             side_effect=ValkeyWorkloadCommandError("Failed to execute command"),
         ) as mock_exec_command,
@@ -668,6 +677,7 @@ def test_change_password_secret_changed_non_leader_unit_not_successful(cloud_spe
         state_out = manager.run()
         mock_update_password.assert_not_called()
         mock_set_acl_file.assert_called_once()
+        set_sentinel_acl_file.assert_called_once()
         mock_exec_command.assert_called_once_with(
             ["acl", "load"], hostname="valkey-0.valkey-endpoints"
         )
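The `["acl", "load"]` assertion is the crux of the fix: Valkey only re-reads its configured aclfile on an explicit `ACL LOAD`, and the sentinel service keeps a separate ACL file, which is why it gets restarted here rather than reloaded. The unit tests themselves all follow the same ops-testing (Scenario) shape: wrap the charm class in a `Context`, describe the Juju snapshot as a `State`, fire exactly one event, then assert on the mocks. Stripped down to the pattern (assumes `ops[testing]` is installed; the state setup of the real tests is much richer):

```python
from unittest.mock import patch

from ops import testing

from charm import ValkeyCharm

ctx = testing.Context(ValkeyCharm)
state_in = testing.State(leader=True)

# Patch a manager boundary, fire one event, assert on the side effect.
with patch("managers.sentinel.SentinelManager.restart_service") as restart_sentinel:
    state_out = ctx.run(ctx.on.config_changed(), state_in)
```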
From 49f748b3d0ae173661fdc8d1fcf12cb4a98fad82 Mon Sep 17 00:00:00 2001
From: reneradoi
Date: Wed, 22 Apr 2026 11:22:48 +0200
Subject: [PATCH 10/18] integration test coverage

---
 src/scripts/topology_observer.py    |  25 +--
 .../clients/test_client_relation.py | 147 +++++++++++++++++-
 2 files changed, 160 insertions(+), 12 deletions(-)

diff --git a/src/scripts/topology_observer.py b/src/scripts/topology_observer.py
index 94c0f67e..6849c541 100644
--- a/src/scripts/topology_observer.py
+++ b/src/scripts/topology_observer.py
@@ -3,6 +3,7 @@
 
 """Topology observer class for checking changes in Primary/Replica topology."""
 
+import logging
 import signal
 import subprocess
 import sys
@@ -10,11 +11,17 @@
 import time
 
 from valkey.sentinel import MasterNotFoundError, Sentinel
 
-from literals import PRIMARY_NAME, TOPOLOGY_OBSERVER_LOG_FILE, TOPOLOGY_OBSERVER_TLS_CA_FILE
+from literals import PRIMARY_NAME, TOPOLOGY_OBSERVER_TLS_CA_FILE
 
 # use global variable for gracefully handling stop signals
 continue_running = True
 
+logging.basicConfig(
+    format="%(asctime)s %(levelname)-8s %(message)s",
+    level=logging.DEBUG,
+    datefmt="%Y-%m-%d %H:%M:%S",
+)
+
 
 def dispatch(unit_name: str, charm_dir: str) -> None:
     """Dispatch a Juju custom event."""
@@ -39,8 +46,7 @@ def main() -> None:
     # handle the stop signal for a graceful stop of the observer
     signal.signal(signal.SIGTERM, handle_stop_signal)
 
-    with open(TOPOLOGY_OBSERVER_LOG_FILE, "a") as log_file:
-        log_file.write(f"Starting new observer for hosts {hosts} with tls={tls}\n")
+    logging.info("Starting new observer for hosts %s with tls=%s", hosts, tls)
 
     host_list = hosts.split(",")
     addresses = [
@@ -73,22 +79,19 @@ def main() -> None:
 
         try:
             primary_name = sentinel.discover_master(PRIMARY_NAME)[0]
         except MasterNotFoundError as e:
-            with open(TOPOLOGY_OBSERVER_LOG_FILE, "a") as log_file:
-                log_file.write(f"Failed to discover primary: {e}\n")
+            logging.error("Failed to discover primary: %s", e)
             continue
 
         if previous_primary == "" or primary_name == previous_primary:
             continue
 
-        with open(TOPOLOGY_OBSERVER_LOG_FILE, "a") as log_file:
-            log_file.write(
-                f"Primary change detected: previously {previous_primary}, now {primary_name}\n"
-            )
+        logging.info(
+            "Primary change detected: previously %s, now %s", previous_primary, primary_name
+        )
         dispatch(unit_name, charm_dir)
 
     else:
-        with open(TOPOLOGY_OBSERVER_LOG_FILE, "a") as log_file:
-            log_file.write("Gracefully stopping observer\n")
+        logging.info("Gracefully stopping observer")
 
 
 if __name__ == "__main__":
diff --git a/tests/integration/clients/test_client_relation.py b/tests/integration/clients/test_client_relation.py
index 26838340..681aa34a 100644
--- a/tests/integration/clients/test_client_relation.py
+++ b/tests/integration/clients/test_client_relation.py
@@ -16,13 +16,16 @@
 import jubilant
 import pytest
 
-from literals import Substrate
+from literals import CharmUsers, Substrate
 from tests.integration.helpers import (
     APP_NAME,
     IMAGE_RESOURCE,
     TLS_CHANNEL,
     TLS_NAME,
     are_agents_idle,
+    exec_valkey_cli,
+    get_cluster_addresses,
+    get_password,
 )
 
 logger = logging.getLogger(__name__)
@@ -166,6 +169,77 @@ def test_integrate_client_interface_v1(juju: jubilant.Juju) -> None:
     assert result == TEST_VALUE
 
 
+def test_failover_topology_update(juju: jubilant.Juju) -> None:
+    """Trigger a failover and ensure clients can still access Valkey."""
+    ip_address = get_cluster_addresses(juju, APP_NAME)[0]
+    logger.info("Initiate failover through Sentinel %s", ip_address)
+
+    failover_result = exec_valkey_cli(
+        hostname=ip_address,
+        username=CharmUsers.SENTINEL_CHARM_ADMIN,
+        password=get_password(juju, CharmUsers.SENTINEL_CHARM_ADMIN),
+        command="sentinel failover primary",
+        tls_enabled=False,
+        sentinel=True,
+    ).stdout
+    assert failover_result == "OK", "Failover not successful"
+    juju.wait(
+        lambda status: are_agents_idle(status, APP_NAME, idle_period=30, unit_count=NUM_UNITS),
+        timeout=600,
+    )
+
+    logger.info("Ensure access after failover for v0 client")
+    requirer_unit = next(iter(juju.status().get_units(REQUIRER_V0_NAME)))
+    get_credentials_action = juju.run(requirer_unit, "get-credentials")
+    username = get_credentials_action.results["usernames"]
+
+    logger.info("Write data to granted keyspace")
+    set_action = juju.run(
+        requirer_unit,
+        "set",
+        params={"key": f"requirer-charm:{TEST_KEY}", "value": TEST_VALUE, "user": username},
+    )
+    assert set_action.status == "completed", "Action should succeed"
+
+    logger.info("Read data that was just written")
+    get_action = juju.run(
+        requirer_unit,
+        "get",
+        params={"key": f"requirer-charm:{TEST_KEY}", "user": username},
+    )
+    assert get_action.status == "completed", "Action should succeed"
+    result = get_action.results["result"]
+    assert result == TEST_VALUE
+
+    logger.info("Ensure access after failover for v1 client")
+    requirer_unit = next(iter(juju.status().get_units(REQUIRER_V1_NAME)))
+    get_credentials_action = juju.run(requirer_unit, "get-credentials")
+    usernames = get_credentials_action.results["usernames"]
+    user_restricted_keyspace = usernames.split(",")[0]
+
+    logger.info("Write data to granted keyspace")
+    set_action = juju.run(
+        requirer_unit,
+        "set",
+        params={
+            "key": f"requirer-charm:{TEST_KEY}",
+            "value": TEST_VALUE,
+            "user": user_restricted_keyspace,
+        },
+    )
+    assert set_action.status == "completed", "Action should succeed"
+
+    logger.info("Read data that was just written")
+    get_action = juju.run(
+        requirer_unit,
+        "get",
+        params={"key": f"requirer-charm:{TEST_KEY}", "user": user_restricted_keyspace},
+    )
+    assert get_action.status == "completed", "Action should succeed"
+    result = get_action.results["result"]
+    assert result == TEST_VALUE
+
+
 def test_enable_tls(juju: jubilant.Juju) -> None:
     """Enable TLS on Valkey and the clients and ensure they can still read and write."""
     logger.info("Enabling client TLS")
@@ -247,6 +321,77 @@ def test_enable_tls(juju: jubilant.Juju) -> None:
     assert result == TEST_VALUE
 
 
+def test_failover_topology_update_with_tls(juju: jubilant.Juju) -> None:
+    """Trigger a failover with TLS enabled and ensure clients can still access Valkey."""
+    ip_address = get_cluster_addresses(juju, APP_NAME)[0]
+    logger.info("Initiate failover through Sentinel %s", ip_address)
+
+    failover_result = exec_valkey_cli(
+        hostname=ip_address,
+        username=CharmUsers.SENTINEL_CHARM_ADMIN,
+        password=get_password(juju, CharmUsers.SENTINEL_CHARM_ADMIN),
+        command="sentinel failover primary",
+        tls_enabled=True,
+        sentinel=True,
+    ).stdout
+    assert failover_result == "OK", "Failover not successful"
+    juju.wait(
+        lambda status: are_agents_idle(status, APP_NAME, idle_period=30, unit_count=NUM_UNITS),
+        timeout=600,
+    )
+
+    logger.info("Ensure access after failover for v0 client")
+    requirer_unit = next(iter(juju.status().get_units(REQUIRER_V0_NAME)))
+    get_credentials_action = juju.run(requirer_unit, "get-credentials")
+    username = get_credentials_action.results["usernames"]
+
+    logger.info("Write data to granted keyspace")
+    set_action = juju.run(
+        requirer_unit,
+        "set",
+        params={"key": f"requirer-charm:{TEST_KEY}", "value": TEST_VALUE, "user": username},
+    )
+    assert set_action.status == "completed", "Action should succeed"
+
+    logger.info("Read data that was just written")
+    get_action = juju.run(
+        requirer_unit,
+        "get",
+        params={"key": f"requirer-charm:{TEST_KEY}", "user": username},
+    )
+    assert get_action.status == "completed", "Action should succeed"
+    result = get_action.results["result"]
+    assert result == TEST_VALUE
+
+    logger.info("Ensure access after failover for v1 client")
+    requirer_unit = next(iter(juju.status().get_units(REQUIRER_V1_NAME)))
+    get_credentials_action = juju.run(requirer_unit, "get-credentials")
+    usernames = get_credentials_action.results["usernames"]
+    user_restricted_keyspace = usernames.split(",")[0]
+
+    logger.info("Write data to granted keyspace")
+    set_action = juju.run(
+        requirer_unit,
+        "set",
+        params={
+            "key": f"requirer-charm:{TEST_KEY}",
+            "value": TEST_VALUE,
+            "user": user_restricted_keyspace,
+        },
+    )
+    assert set_action.status == "completed", "Action should succeed"
+
+    logger.info("Read data that was just written")
+    get_action = juju.run(
+        requirer_unit,
+        "get",
+        params={"key": f"requirer-charm:{TEST_KEY}", "user": user_restricted_keyspace},
+    )
+    assert get_action.status == "completed", "Action should succeed"
get_action.results["result"] + assert result == TEST_VALUE + + def test_certificate_transfer(juju: jubilant.Juju) -> None: """Relate Requirer charms to separate TLS providers and ensure functionality.""" logger.info("Enable certificate transfer to Valkey") From 4c930251f4e3a35472da9daf14e9f50c0be8b69d Mon Sep 17 00:00:00 2001 From: reneradoi Date: Wed, 22 Apr 2026 11:41:18 +0200 Subject: [PATCH 11/18] adjust test steps --- .../clients/test_client_relation.py | 73 +------------------ 1 file changed, 1 insertion(+), 72 deletions(-) diff --git a/tests/integration/clients/test_client_relation.py b/tests/integration/clients/test_client_relation.py index 681aa34a..780feeec 100644 --- a/tests/integration/clients/test_client_relation.py +++ b/tests/integration/clients/test_client_relation.py @@ -184,7 +184,7 @@ def test_failover_topology_update(juju: jubilant.Juju) -> None: ).stdout assert failover_result == "OK", "Failover not successful" juju.wait( - lambda status: are_agents_idle(status, APP_NAME, idle_period=30, unit_count=NUM_UNITS), + lambda status: are_agents_idle(status, APP_NAME, idle_period=60, unit_count=NUM_UNITS), timeout=600, ) @@ -321,77 +321,6 @@ def test_enable_tls(juju: jubilant.Juju) -> None: assert result == TEST_VALUE -def test_failover_topology_update_with_tls(juju: jubilant.Juju) -> None: - """Trigger a failover with TLS enabled and ensure clients can still access Valkey.""" - ip_address = get_cluster_addresses(juju, APP_NAME)[0] - logger.info("Initiate failover through Sentinel %s", ip_address) - - failover_result = exec_valkey_cli( - hostname=ip_address, - username=CharmUsers.SENTINEL_CHARM_ADMIN, - password=get_password(juju, CharmUsers.SENTINEL_CHARM_ADMIN), - command="sentinel failover primary", - tls_enabled=True, - sentinel=True, - ).stdout - assert failover_result == "OK", "Failover not successful" - juju.wait( - lambda status: are_agents_idle(status, APP_NAME, idle_period=30, unit_count=NUM_UNITS), - timeout=600, - ) - - logger.info("Ensure access after failover for v0 client") - requirer_unit = next(iter(juju.status().get_units(REQUIRER_V0_NAME))) - get_credentials_action = juju.run(requirer_unit, "get-credentials") - username = get_credentials_action.results["usernames"] - - logger.info("Write data to granted keyspace") - set_action = juju.run( - requirer_unit, - "set", - params={"key": f"requirer-charm:{TEST_KEY}", "value": TEST_VALUE, "user": username}, - ) - assert set_action.status == "completed", "Action should succeed" - - logger.info("Read data that was just written") - get_action = juju.run( - requirer_unit, - "get", - params={"key": f"requirer-charm:{TEST_KEY}", "user": username}, - ) - assert get_action.status == "completed", "Action should succeed" - result = get_action.results["result"] - assert result == TEST_VALUE - - logger.info("Ensure access after failover for v1 client") - requirer_unit = next(iter(juju.status().get_units(REQUIRER_V1_NAME))) - get_credentials_action = juju.run(requirer_unit, "get-credentials") - usernames = get_credentials_action.results["usernames"] - user_restricted_keyspace = usernames.split(",")[0] - - logger.info("Write data to granted keyspace") - set_action = juju.run( - requirer_unit, - "set", - params={ - "key": f"requirer-charm:{TEST_KEY}", - "value": TEST_VALUE, - "user": user_restricted_keyspace, - }, - ) - assert set_action.status == "completed", "Action should succeed" - - logger.info("Read data that was just written") - get_action = juju.run( - requirer_unit, - "get", - params={"key": f"requirer-charm:{TEST_KEY}", 
"user": user_restricted_keyspace}, - ) - assert get_action.status == "completed", "Action should succeed" - result = get_action.results["result"] - assert result == TEST_VALUE - - def test_certificate_transfer(juju: jubilant.Juju) -> None: """Relate Requirer charms to separate TLS providers and ensure functionality.""" logger.info("Enable certificate transfer to Valkey") From 7a85afb583e578f27e6da913ddf509fad1bf441a Mon Sep 17 00:00:00 2001 From: reneradoi Date: Wed, 22 Apr 2026 12:37:13 +0200 Subject: [PATCH 12/18] beautify --- src/events/base_events.py | 2 +- src/events/external_clients.py | 1 - src/managers/topology.py | 9 +++++---- tests/unit/conftest.py | 5 +++++ tests/unit/test_tls.py | 5 ----- 5 files changed, 11 insertions(+), 11 deletions(-) diff --git a/src/events/base_events.py b/src/events/base_events.py index 0f744ad3..40955f7c 100644 --- a/src/events/base_events.py +++ b/src/events/base_events.py @@ -292,7 +292,7 @@ def _on_peer_relation_departed(self, _: ops.RelationDepartedEvent) -> None: logger.error(f"Failed to update sentinel quorum: {e}") # not critical to defer here, we can wait for the next relation change - if not self.charm.state.unit_server.is_active: + if not self.charm.unit.is_leader() or not self.charm.state.unit_server.is_active: return try: diff --git a/src/events/external_clients.py b/src/events/external_clients.py index 9e15098e..0ba1c3b4 100644 --- a/src/events/external_clients.py +++ b/src/events/external_clients.py @@ -387,7 +387,6 @@ def _on_topology_changed(self, event: TopologyChangedEvent) -> None: logger.info("Received topology-changed event") try: - self.charm.sentinel_manager.reconcile_k8s_services() self.charm.sentinel_manager.set_pod_labels() except (KubernetesClientError, ValkeyCannotGetPrimaryIPError) as e: logger.error("Error updating Kubernetes services: %s", e) diff --git a/src/managers/topology.py b/src/managers/topology.py index c3a7ed41..e5ac7a85 100644 --- a/src/managers/topology.py +++ b/src/managers/topology.py @@ -71,10 +71,11 @@ def start_observer(self) -> None: port = SENTINEL_TLS_PORT if self.state.unit_server.is_tls_enabled else SENTINEL_PORT hosts = ",".join(sorted([f"{server}:{port}" for server in started_servers])) - # Store current TLS CA cert on operator container - tls_ca_cert = self.workload.read_file(self.workload.tls_paths.client_ca) - path = Path(TOPOLOGY_OBSERVER_TLS_CA_FILE) - path.write_text(tls_ca_cert) + if self.state.unit_server.is_tls_enabled: + # Store current TLS CA cert on operator container + tls_ca_cert = self.workload.read_file(self.workload.tls_paths.client_ca) + path = Path(TOPOLOGY_OBSERVER_TLS_CA_FILE) + path.write_text(tls_ca_cert) logging.info("Starting topology observer") pid = subprocess.Popen( # noqa: S603 diff --git a/tests/unit/conftest.py b/tests/unit/conftest.py index 515b6912..0a67b405 100644 --- a/tests/unit/conftest.py +++ b/tests/unit/conftest.py @@ -32,6 +32,11 @@ def mock_k8s_client(mocker): mocker.patch("lightkube.core.client.GenericSyncClient") +@pytest.fixture(autouse=True) +def mock_start_topology_observer(mocker): + mocker.patch("managers.topology.TopologyManager.start_observer") + + @pytest.fixture(autouse=True) def tenacity_wait(mocker): mocker.patch("tenacity.nap.time") diff --git a/tests/unit/test_tls.py b/tests/unit/test_tls.py index fcbc8e25..8314d641 100644 --- a/tests/unit/test_tls.py +++ b/tests/unit/test_tls.py @@ -553,7 +553,6 @@ def test_client_certificate_renewed(cloud_spec): patch("workload_k8s.ValkeyK8sWorkload.write_file") as write_certs, 
patch("managers.tls.TLSManager.rehash_ca_certificates"), patch("managers.cluster.ClusterManager.reload_tls_settings") as reload_tls, - patch("managers.topology.TopologyManager.start_observer"), ): event.certificate = certificate.certificate charm.tls_events._on_certificate_available(event) @@ -612,7 +611,6 @@ def test_new_client_ca_single_unit(cloud_spec): patch("workload_k8s.ValkeyK8sWorkload.write_file") as write_certs, patch("managers.tls.TLSManager.rehash_ca_certificates"), patch("managers.cluster.ClusterManager.reload_tls_settings") as reload_tls, - patch("managers.topology.TopologyManager.start_observer"), ): event.certificate = certificate.certificate charm.tls_events._on_certificate_available(event) @@ -676,7 +674,6 @@ def test_new_client_ca_rotation_started(cloud_spec): patch("workload_k8s.ValkeyK8sWorkload.write_file") as write_certs, patch("managers.tls.TLSManager.rehash_ca_certificates"), patch("managers.cluster.ClusterManager.reload_tls_settings") as reload_tls, - patch("managers.topology.TopologyManager.start_observer"), ): event.certificate = certificate.certificate charm.tls_events._on_certificate_available(event) @@ -721,7 +718,6 @@ def test_internal_peer_ca_rotation_single_unit(cloud_spec): patch("managers.cluster.ClusterManager.reload_tls_settings") as reload_tls, patch("managers.sentinel.SentinelManager.restart_service"), patch("common.client.SentinelClient.primary", return_value={"quorum": "1"}), - patch("managers.topology.TopologyManager.restart_observer"), ): state_out = ctx.run(ctx.on.relation_changed(peer_relation, remote_unit=1), state_in) @@ -764,7 +760,6 @@ def test_internal_peer_ca_rotation_started(cloud_spec): patch("managers.cluster.ClusterManager.reload_tls_settings") as reload_tls, patch("managers.sentinel.SentinelManager.restart_service"), patch("common.client.SentinelClient.primary", return_value={"quorum": "1"}), - patch("managers.topology.TopologyManager.restart_observer"), ): state_out = ctx.run(ctx.on.relation_changed(peer_relation, remote_unit=1), state_in) From 887b6cef595457d94e5ef34d07ea50a717ad1e15 Mon Sep 17 00:00:00 2001 From: reneradoi Date: Wed, 22 Apr 2026 14:35:44 +0200 Subject: [PATCH 13/18] handle leadership changes --- src/events/base_events.py | 6 ++++++ 1 file changed, 6 insertions(+) diff --git a/src/events/base_events.py b/src/events/base_events.py index 40955f7c..283ce116 100644 --- a/src/events/base_events.py +++ b/src/events/base_events.py @@ -331,6 +331,12 @@ def _on_leader_elected(self, event: ops.LeaderElectedEvent) -> None: if not self.charm.unit.is_leader(): return + if self.charm.state.unit_server.is_active: + try: + self.charm.topology_manager.start_observer() + except (ValkeyWorkloadCommandError, ValueError) as e: + logger.error("Failed to start topology observer: %s", e) + if self.charm.state.cluster.internal_users_credentials: logger.debug("Internal user credentials already set") return From 1474d534fd3dcfb8e27c7f8768dd3f5cb6269846 Mon Sep 17 00:00:00 2001 From: reneradoi Date: Thu, 23 Apr 2026 08:36:04 +0200 Subject: [PATCH 14/18] additional error handling --- src/common/k8s_client.py | 13 ++++++++----- 1 file changed, 8 insertions(+), 5 deletions(-) diff --git a/src/common/k8s_client.py b/src/common/k8s_client.py index d4858e2d..a7eb3e02 100644 --- a/src/common/k8s_client.py +++ b/src/common/k8s_client.py @@ -47,11 +47,14 @@ def ensure_endpoint_service(self, role: str, port: int) -> None: if e.status.code != 404: raise KubernetesClientError from e - pod0 = self.client.get( - res=Pod, - name=self.app_name + "-0", - 
+                namespace=self.namespace,
+            )
+        except ApiError as e:
+            raise KubernetesClientError from e
 
         service = Service(
             apiVersion="v1",

From 613ef618e7723da5f1348ce6232cb6ed0bf68013 Mon Sep 17 00:00:00 2001
From: reneradoi
Date: Thu, 23 Apr 2026 10:24:19 +0200
Subject: [PATCH 15/18] update file name

---
 src/literals.py | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/src/literals.py b/src/literals.py
index 31237dab..7cc52697 100644
--- a/src/literals.py
+++ b/src/literals.py
@@ -28,7 +28,7 @@
 
 # todo: these paths require root access, should be moved to dedicated user directories
 TOPOLOGY_OBSERVER_LOG_FILE = "/var/log/topology_observer.log"
-TOPOLOGY_OBSERVER_TLS_CA_FILE = "/etc/ssl/certs/Valkey_CA.pem"
+TOPOLOGY_OBSERVER_TLS_CA_FILE = "/etc/ssl/certs/valkey_ca.pem"
 
 PEER_RELATION = "valkey-peers"
 STATUS_PEERS_RELATION = "status-peers"

From 36a4ba32356f68f47f2dc49f3b44f3c0e416fbc2 Mon Sep 17 00:00:00 2001
From: reneradoi
Date: Thu, 23 Apr 2026 12:20:29 +0200
Subject: [PATCH 16/18] move substrate check to event handler, instantiate
 `k8s_client` as `None` in VM

---
 src/events/external_clients.py | 9 +++++++--
 src/managers/sentinel.py       | 4 +---
 2 files changed, 8 insertions(+), 5 deletions(-)

diff --git a/src/events/external_clients.py b/src/events/external_clients.py
index 4e80b8b4..453fd44e 100644
--- a/src/events/external_clients.py
+++ b/src/events/external_clients.py
@@ -30,7 +30,12 @@
     ValkeyTLSLoadError,
     ValkeyWorkloadCommandError,
 )
-from literals import CERTIFICATE_TRANSFER_RELATION, EXTERNAL_CLIENTS_RELATION, PEER_RELATION
+from literals import (
+    CERTIFICATE_TRANSFER_RELATION,
+    EXTERNAL_CLIENTS_RELATION,
+    PEER_RELATION,
+    Substrate,
+)
 
 if TYPE_CHECKING:
     from charm import ValkeyCharm
@@ -187,7 +192,7 @@ def _on_peer_relation_changed(self, event: ops.RelationChangedEvent) -> None:
         if not self.charm.state.unit_server.is_started:
             return
 
-        if self.charm.unit.is_leader():
+        if self.charm.unit.is_leader() and self.charm.state.substrate == Substrate.K8S:
             try:
                 self.charm.sentinel_manager.reconcile_k8s_services()
             except (KubernetesClientError, ValkeyCannotGetPrimaryIPError) as e:
diff --git a/src/managers/sentinel.py b/src/managers/sentinel.py
index 431b7f05..ff404126 100644
--- a/src/managers/sentinel.py
+++ b/src/managers/sentinel.py
@@ -49,6 +49,7 @@ def __init__(self, state: ClusterState, workload: WorkloadBase):
         self.workload = workload
         self.admin_user = CharmUsers.SENTINEL_CHARM_ADMIN.value
 
+        self.k8s_client: K8sClient | None = None
         if self.state.substrate == Substrate.K8S:
             self.k8s_client = K8sClient(
                 namespace=self.state.model.name,
@@ -381,9 +382,7 @@ def set_quorum(self, quorum: int) -> None:
 
     def reconcile_k8s_services(self) -> None:
         """Create or update the services and pod labels in Kubernetes."""
-        if self.state.substrate == Substrate.VM:
-            return
-
         valkey_port = TLS_PORT if self.state.unit_server.is_tls_enabled else CLIENT_PORT
 
         self.k8s_client.ensure_endpoint_service(role=K8sService.PRIMARY.value, port=valkey_port)
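Both `ensure_endpoint_service` and `set_pod_labels` ultimately reduce to a couple of lightkube calls, and keeping `k8s_client` as `None` on VM guarantees those calls are only reachable behind the substrate guard. Roughly what the pod-label side looks like in lightkube; pod name, namespace and label are placeholders, the real logic lives in `K8sClient`:

```python
from lightkube import Client
from lightkube.resources.core_v1 import Pod

client = Client()
# Strategic-merge patch: only the given labels change, everything else is kept.
client.patch(
    Pod,
    name="valkey-0",        # placeholder pod name
    namespace="my-model",   # placeholder namespace
    obj={"metadata": {"labels": {"role": "primary"}}},
)
```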
From 88b7ae04452301e68adda06c63d87b81fb5ef3bc Mon Sep 17 00:00:00 2001
From: reneradoi
Date: Thu, 23 Apr 2026 13:21:30 +0200
Subject: [PATCH 17/18] minor code improvements, add error handling for failed
 dispatch commands

---
 src/events/base_events.py        |  4 ++--
 src/events/external_clients.py   | 17 ++++++++++-------
 src/managers/topology.py         |  3 ++-
 src/scripts/topology_observer.py |  7 +++++--
 4 files changed, 19 insertions(+), 12 deletions(-)

diff --git a/src/events/base_events.py b/src/events/base_events.py
index 960c69fb..6a5e67ec 100644
--- a/src/events/base_events.py
+++ b/src/events/base_events.py
@@ -436,8 +436,8 @@ def _on_secret_changed(self, event: ops.SecretChangedEvent) -> None:
             self.charm.topology_manager.restart_observer()
         except (ValkeyWorkloadCommandError, ValueError) as e:
             logger.error("Failed to restart topology observer: %s", e)
-        finally:
-            return
+
+        return
 
         # from here, code is only relevant for non-leader units
         if event.secret.label and event.secret.label.endswith(INTERNAL_USERS_SECRET_LABEL_SUFFIX):
diff --git a/src/events/external_clients.py b/src/events/external_clients.py
index a5a3685d..d5c757a4 100644
--- a/src/events/external_clients.py
+++ b/src/events/external_clients.py
@@ -97,7 +97,9 @@ def __init__(self, charm: "ValkeyCharm"):
         self.framework.observe(self.charm.on.topology_changed, self._on_topology_changed)
 
     def _on_bulk_resources_requested(
-        self, event: BulkResourcesRequestedEvent[RequirerCommonModel] | ResourceRequestedEvent
+        self,
+        event: BulkResourcesRequestedEvent[RequirerCommonModel]
+        | ResourceRequestedEvent[RequirerCommonModel],
     ) -> None:
         """Handle bulk resources requested event."""
         if not self.charm.unit.is_leader():
@@ -391,12 +393,13 @@ def _on_topology_changed(self, event: TopologyChangedEvent) -> None:
             return
 
         logger.info("Received topology-changed event")
-        try:
-            self.charm.sentinel_manager.set_pod_labels()
-        except (KubernetesClientError, ValkeyCannotGetPrimaryIPError) as e:
-            logger.error("Error updating Kubernetes services: %s", e)
-            event.defer()
-            return
+        if self.charm.state.substrate == Substrate.K8S:
+            try:
+                self.charm.sentinel_manager.set_pod_labels()
+            except (KubernetesClientError, ValkeyCannotGetPrimaryIPError) as e:
+                logger.error("Error updating Kubernetes services: %s", e)
+                event.defer()
+                return
 
         if not self.charm.state.external_client_relations:
             return
diff --git a/src/managers/topology.py b/src/managers/topology.py
index e5ac7a85..8dd59287 100644
--- a/src/managers/topology.py
+++ b/src/managers/topology.py
@@ -110,9 +110,10 @@ def stop_observer(self) -> None:
         try:
             os.kill(int(observer_pid), signal.SIGTERM)
             logger.info("Topology observer stopped")
-            self.state.unit_server.update({"topology_observer_pid": ""})
         except OSError:
             pass
+        finally:
+            self.state.unit_server.update({"topology_observer_pid": ""})
 
     def restart_observer(self) -> None:
         """Stop and start the topology observer to pick up host changes."""
diff --git a/src/scripts/topology_observer.py b/src/scripts/topology_observer.py
index 6849c541..db1dcaa5 100644
--- a/src/scripts/topology_observer.py
+++ b/src/scripts/topology_observer.py
@@ -30,7 +30,7 @@ def dispatch(unit_name: str, charm_dir: str) -> None:
     juju_run_command = "/usr/bin/juju-exec"
     dispatch_command = f"JUJU_DISPATCH_PATH=hooks/{custom_event} {charm_dir}/dispatch"
 
-    subprocess.run([juju_run_command, "-u", unit_name, dispatch_command])
+    subprocess.run([juju_run_command, "-u", unit_name, dispatch_command], check=True)
 
 
 def handle_stop_signal(signum, frame) -> None:
@@ -88,7 +88,10 @@ def main() -> None:
         logging.info(
             "Primary change detected: previously %s, now %s", previous_primary, primary_name
         )
-        dispatch(unit_name, charm_dir)
+        try:
+            dispatch(unit_name, charm_dir)
+        except subprocess.CalledProcessError as e:
+            logging.error("Error when dispatching Juju event: %s", e)
 
     else:
         logging.info("Gracefully stopping observer")

From 0c5040e0ac360057353e17dae03467845b6eaba6 Mon Sep 17 00:00:00 2001
From: reneradoi
Date: Fri, 24 Apr 2026 12:56:56 +0200
Subject: [PATCH 18/18] fix timeout for disabling TLS

---
 tests/integration/tls/test_certificate_options.py | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/tests/integration/tls/test_certificate_options.py b/tests/integration/tls/test_certificate_options.py
index b43e8f7d..e812ca40 100644
--- a/tests/integration/tls/test_certificate_options.py
+++ b/tests/integration/tls/test_certificate_options.py
@@ -287,5 +287,5 @@ def test_certificate_denied(juju: jubilant.Juju) -> None:
         lambda status: are_apps_active_and_agents_idle(
             status, APP_NAME, idle_period=30, unit_count=NUM_UNITS
         ),
-        timeout=100,
+        timeout=600,
     )