Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
25 commits
Select commit Hold shift + click to select a range
c042adf
add K8s client class, create and update services and pod labels
reneradoi Apr 16, 2026
572e889
update rust to 1.94.0 to build `lightkube`
reneradoi Apr 17, 2026
48ba7d2
publish service endpoints in client relation on K8s
reneradoi Apr 17, 2026
f7816b0
add K8s services to SANs DNS in TLS certificate requests
reneradoi Apr 17, 2026
8d3cc01
Merge remote-tracking branch 'origin/9/edge' into add-k8s-services
reneradoi Apr 17, 2026
3c91b2b
WIP: add topology observer
reneradoi Apr 20, 2026
6c46244
Merge branch '9/edge' into add-k8s-services
Mehdi-Bendriss Apr 21, 2026
c7d98ff
add topology observer based on valkey-py
reneradoi Apr 21, 2026
fd44750
handle `topology-changed` event
reneradoi Apr 22, 2026
06d1bc6
workaround for core26 snap, remove pubsub channel permissions from ad…
reneradoi Apr 22, 2026
5460bf2
bugfix: passwords for sentinel users not updated on password-change
reneradoi Apr 22, 2026
49f748b
integration test coverage
reneradoi Apr 22, 2026
4c93025
adjust test steps
reneradoi Apr 22, 2026
3378430
Merge remote-tracking branch 'origin/add-k8s-services' into add-topol…
reneradoi Apr 22, 2026
7a85afb
beautify
reneradoi Apr 22, 2026
887b6ce
handle leadership changes
reneradoi Apr 22, 2026
438490f
Merge remote-tracking branch 'origin/9/edge' into add-k8s-services
reneradoi Apr 23, 2026
1474d53
additional error handling
reneradoi Apr 23, 2026
29fadf7
Merge branch 'add-k8s-services' into add-topology-observer
reneradoi Apr 23, 2026
613ef61
update file name
reneradoi Apr 23, 2026
36a4ba3
move substrate check to event handler, instantiate `k8s_client` as `N…
reneradoi Apr 23, 2026
ba70e90
Merge branch 'add-k8s-services' into add-topology-observer
reneradoi Apr 23, 2026
88b7ae0
minor code improvements, add error handling for failed dispatch commands
reneradoi Apr 23, 2026
55af2d0
Merge remote-tracking branch 'origin/9/edge' into add-topology-observer
reneradoi Apr 23, 2026
0c5040e
fix timeout for disabling TLS
reneradoi Apr 24, 2026
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
18 changes: 17 additions & 1 deletion poetry.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 1 addition & 0 deletions pyproject.toml
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ data-platform-helpers = ">=0.1.7"
validators = ">=0.35.0"
dpcharmlibs-interfaces = ">=1.0.2"
lightkube = ">=0.19.0"
valkey = "^6.1.1"

[tool.poetry.requires-plugins]
poetry-plugin-export = ">=1.8"
Expand Down
6 changes: 5 additions & 1 deletion src/charm.py
Original file line number Diff line number Diff line change
Expand Up @@ -11,14 +11,15 @@

from core.cluster_state import ClusterState
from events.base_events import BaseEvents
from events.external_clients import ExternalClientsEvents
from events.external_clients import ExternalClientsEvents, TopologyChangedCharmEvents
from events.tls import TLSEvents
from literals import CONTAINER, Substrate
from managers.cluster import ClusterManager
from managers.config import ConfigManager
from managers.external_clients import ExternalClientsManager
from managers.sentinel import SentinelManager
from managers.tls import TLSManager
from managers.topology import TopologyManager
from workload_k8s import ValkeyK8sWorkload
from workload_vm import ValkeyVmWorkload

Expand All @@ -28,6 +29,8 @@
class ValkeyCharm(ops.CharmBase):
"""Charmed Operator for Valkey."""

on = TopologyChangedCharmEvents()
Comment thread
reneradoi marked this conversation as resolved.

def __init__(self, *args) -> None:
super().__init__(*args)
try:
Expand All @@ -50,6 +53,7 @@ def __init__(self, *args) -> None:
self.sentinel_manager = SentinelManager(state=self.state, workload=self.workload)
self.tls_manager = TLSManager(state=self.state, workload=self.workload)
self.client_manager = ExternalClientsManager(state=self.state, workload=self.workload)
self.topology_manager = TopologyManager(state=self.state, workload=self.workload)

# --- STATUS HANDLER ---
self.status = StatusHandler(
Expand Down
1 change: 1 addition & 0 deletions src/core/models.py
Original file line number Diff line number Diff line change
Expand Up @@ -84,6 +84,7 @@ class PeerUnitModel(PeerModel):
is_valkey_healthy: bool = Field(default=True)
is_sentinel_healthy: bool = Field(default=True)
client_user_epoch: float = Field(default=0)
topology_observer_pid: int = Field(default=0)


class RelationState:
Expand Down
62 changes: 62 additions & 0 deletions src/events/base_events.py
Original file line number Diff line number Diff line change
Expand Up @@ -250,6 +250,14 @@ def _on_unit_fully_started(self, event: UnitFullyStarted) -> None:
self.charm.unit.open_port("tcp", CLIENT_PORT)
self.charm.unit.open_port("tcp", TLS_PORT)

if not self.charm.unit.is_leader():
return

try:
self.charm.topology_manager.start_observer()
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

why isn't this run at the beginning of _on_unit_fully_started? in general, why are we conditioning the start of this process on the health of the server?

Copy link
Copy Markdown
Contributor Author

@reneradoi reneradoi Apr 23, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Not necessarily on the health of the server, rather on the starting procedure to be completed on the leader unit.

Two things to consider here:

  • I think there is no need to start an observer if the unit has not fully started yet, given that this is only started on the leader, which usually starts up first -> the observer would only run into connection errors anyway.
  • There is also no need to have the observer in place that early, as there are a lot of peer-relation-changed events during the startup process. The services and pod labels will only be created anyway as soon as the leader unit is up (here). And in case of client relations: The relation data will not be published until the leader unit is up, too.

except (ValkeyWorkloadCommandError, ValueError) as e:
logger.error("Failed to start topology observer: %s", e)

def _on_peer_relation_changed(self, _: ops.RelationChangedEvent) -> None:
"""Handle event received by all units when a unit's relation data changes."""
try:
Expand All @@ -264,6 +272,15 @@ def _on_peer_relation_changed(self, _: ops.RelationChangedEvent) -> None:
for lock in [StartLock(self.charm.state), RestartLock(self.charm.state)]:
lock.process()

if not self.charm.state.unit_server.is_active:
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

similar question here, the health of the unit's service should not have an impact on the topology observer process start, or am I missing something?

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

No, the health doesn't impact the observer, only the starting procedure to be completed (and the unit not being in the process of being removed).

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

For context: The is_active flag means the unit started (started flag is set) and is not being scaled down.

return

# need to pick up scaling operations, TLS switchover, CA rotation and so on
try:
self.charm.topology_manager.restart_observer()
Comment thread
reneradoi marked this conversation as resolved.
except (ValkeyWorkloadCommandError, ValueError) as e:
logger.error("Failed to restart topology observer: %s", e)

def _on_peer_relation_departed(self, _: ops.RelationDepartedEvent) -> None:
"""Handle event received by all units when a unit departs."""
try:
Expand All @@ -272,10 +289,27 @@ def _on_peer_relation_departed(self, _: ops.RelationDepartedEvent) -> None:
logger.error(f"Failed to update sentinel quorum: {e}")
# not critical to defer here, we can wait for the next relation change

if not self.charm.unit.is_leader() or not self.charm.state.unit_server.is_active:
return

try:
self.charm.topology_manager.restart_observer()
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

why do we restart in this hook?

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

When a unit leaves, the connection parameters (endpoints) of the observer should be updated to not connect to this unit anymore.

except (ValkeyWorkloadCommandError, ValueError) as e:
logger.error("Failed to restart topology observer: %s", e)

def _on_update_status(self, event: ops.UpdateStatusEvent) -> None:
"""Handle the update-status event."""
if not self.charm.state.unit_server.is_started:
logger.warning("Service not started")
return

if not self.charm.unit.is_leader():
return

try:
self.charm.topology_manager.start_observer()
except (ValkeyWorkloadCommandError, ValueError) as e:
logger.error("Failed to start topology observer: %s", e)

def _on_leader_elected(self, event: ops.LeaderElectedEvent) -> None:
"""Handle the leader-elected event."""
Expand All @@ -294,6 +328,12 @@ def _on_leader_elected(self, event: ops.LeaderElectedEvent) -> None:
if not self.charm.unit.is_leader():
return

if self.charm.state.unit_server.is_active:
try:
self.charm.topology_manager.start_observer()
Comment thread
reneradoi marked this conversation as resolved.
except (ValkeyWorkloadCommandError, ValueError) as e:
logger.error("Failed to start topology observer: %s", e)

if self.charm.state.cluster.internal_users_credentials:
logger.debug("Internal user credentials already set")
return
Expand Down Expand Up @@ -367,6 +407,12 @@ def _on_config_changed(self, event: ops.ConfigChangedEvent) -> None:
event.defer()
return

# propagate updated credentials to topology observer
try:
self.charm.topology_manager.restart_observer()
except (ValkeyWorkloadCommandError, ValueError) as e:
logger.error("Failed to restart topology observer: %s", e)

def _on_secret_changed(self, event: ops.SecretChangedEvent) -> None:
"""Handle the secret_changed event."""
if not (admin_secret_id := self.charm.config.get(INTERNAL_USERS_PASSWORD_CONFIG)):
Expand All @@ -384,15 +430,25 @@ def _on_secret_changed(self, event: ops.SecretChangedEvent) -> None:
):
event.defer()
return

# propagate updated credentials to topology observer
try:
self.charm.topology_manager.restart_observer()
except (ValkeyWorkloadCommandError, ValueError) as e:
logger.error("Failed to restart topology observer: %s", e)

return

# from here, code is only relevant for non-leader units
if event.secret.label and event.secret.label.endswith(INTERNAL_USERS_SECRET_LABEL_SUFFIX):
# leader unit processed the secret change from user, non-leader units can replicate
try:
self.charm.config_manager.set_acl_file()
self.charm.config_manager.set_sentinel_acl_file()
if self.charm.state.unit_server.is_started:
self.charm.cluster_manager.reload_acl_file()
# todo: request rolling restart
self.charm.sentinel_manager.restart_service()
# update the local unit admin password to match the leader
self.charm.config_manager.update_local_valkey_admin_password()
if self.charm.state.unit_server.is_started:
Expand Down Expand Up @@ -454,8 +510,11 @@ def _update_internal_users_password(self, secret_id: str) -> None:
logger.info("Password(s) for internal users have changed")
try:
self.charm.config_manager.set_acl_file(passwords=new_passwords)
self.charm.config_manager.set_sentinel_acl_file(passwords=new_passwords)
if self.charm.state.unit_server.is_started:
self.charm.cluster_manager.reload_acl_file()
# todo: request rolling restart
self.charm.sentinel_manager.restart_service()
self.charm.state.cluster.update(
{
f"{user.value.replace('-', '_')}_password": new_passwords[user.value]
Expand Down Expand Up @@ -549,6 +608,9 @@ def _on_storage_detaching(self, event: ops.StorageDetachingEvent) -> None:
primary_ip,
)

if self.charm.unit.is_leader():
self.charm.topology_manager.stop_observer()

# stop valkey and sentinel processes
self.charm.workload.stop()
active_sentinels = [
Expand Down
40 changes: 39 additions & 1 deletion src/events/external_clients.py
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,16 @@
logger = logging.getLogger(__name__)


class TopologyChangedEvent(ops.EventBase):
    """A custom event for topology changes.

    Carries no payload beyond the base event; it is emitted through
    ``TopologyChangedCharmEvents.topology_changed`` when the cluster
    topology changes (presumably detected by the topology observer
    process — confirm against the emitting code, not visible here).
    """

class TopologyChangedCharmEvents(ops.CharmEvents):
    """A CharmEvent extension to observe topology changes.

    Installed on the charm class (``on = TopologyChangedCharmEvents()``)
    so handlers can observe ``charm.on.topology_changed``.
    """

    # Event source for custom topology-change notifications.
    topology_changed = ops.EventSource(TopologyChangedEvent)


class ExternalClientsEvents(ops.Object):
"""Handle all events for external client relations."""

Expand Down Expand Up @@ -84,9 +94,12 @@ def __init__(self, charm: "ValkeyCharm"):
self.framework.observe(
self.certificate_transfer.on.certificates_removed, self._on_ca_removed
)
self.framework.observe(self.charm.on.topology_changed, self._on_topology_changed)

def _on_bulk_resources_requested(
self, event: BulkResourcesRequestedEvent[RequirerCommonModel] | ResourceRequestedEvent
self,
event: BulkResourcesRequestedEvent[RequirerCommonModel]
| ResourceRequestedEvent[RequirerCommonModel],
) -> None:
"""Handle bulk resources requested event."""
if not self.charm.unit.is_leader():
Expand Down Expand Up @@ -195,6 +208,7 @@ def _on_peer_relation_changed(self, event: ops.RelationChangedEvent) -> None:
if self.charm.unit.is_leader() and self.charm.state.substrate == Substrate.K8S:
try:
self.charm.sentinel_manager.reconcile_k8s_services()
self.charm.sentinel_manager.set_pod_labels()
except (KubernetesClientError, ValkeyCannotGetPrimaryIPError) as e:
logger.error("Error updating Kubernetes services: %s", e)
event.defer()
Expand Down Expand Up @@ -372,3 +386,27 @@ def _on_ca_removed(self, event: CertificatesRemovedEvent) -> None:
logger.error("Error removing CA certificates for external clients: %s", e)
event.defer()
return

    def _on_topology_changed(self, event: TopologyChangedEvent) -> None:
        """Handle custom events for topology changes.

        Leader-only: on Kubernetes, refresh the primary/replica pod labels,
        then republish connection data to any external client relations.
        The event is deferred on failure of either step so it is retried
        on a later hook.
        """
        # Topology reconciliation is coordinated by the leader unit only.
        if not self.charm.unit.is_leader():
            return

        logger.info("Received topology-changed event")
        if self.charm.state.substrate == Substrate.K8S:
            try:
                # Re-label pods so the K8s primary/replica services route
                # to the correct units after the topology change.
                self.charm.sentinel_manager.set_pod_labels()
            except (KubernetesClientError, ValkeyCannotGetPrimaryIPError) as e:
                logger.error("Error updating Kubernetes services: %s", e)
                event.defer()
                return

        # Nothing further to do unless external clients are related.
        if not self.charm.state.external_client_relations:
            return

        try:
            # Republish endpoints so clients learn the new primary.
            self._update_client_relations()
        except (ValkeyCannotGetPrimaryIPError, ValkeyWorkloadCommandError) as e:
            logger.error("Error updating client relations: %s", e)
            event.defer()
            return
4 changes: 4 additions & 0 deletions src/literals.py
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,10 @@
ACL_FILE = "var/lib/valkey/users.acl"
SENTINEL_ACL_FILE = "var/lib/valkey/sentinel-users.acl"

# todo: these paths require root access, should be moved to dedicated user directories
TOPOLOGY_OBSERVER_LOG_FILE = "/var/log/topology_observer.log"
TOPOLOGY_OBSERVER_TLS_CA_FILE = "/etc/ssl/certs/valkey_ca.pem"

PEER_RELATION = "valkey-peers"
STATUS_PEERS_RELATION = "status-peers"
CLIENT_TLS_RELATION_NAME = "client-certificates"
Expand Down
4 changes: 3 additions & 1 deletion src/managers/sentinel.py
Original file line number Diff line number Diff line change
Expand Up @@ -381,12 +381,14 @@ def set_quorum(self, quorum: int) -> None:
client.set(self.state.endpoint, PRIMARY_NAME, "quorum", str(quorum))

def reconcile_k8s_services(self) -> None:
"""Create or update the services and pod labels in Kubernetes."""
"""Create or update the services in Kubernetes."""
valkey_port = TLS_PORT if self.state.unit_server.is_tls_enabled else CLIENT_PORT

self.k8s_client.ensure_endpoint_service(role=K8sService.PRIMARY.value, port=valkey_port)
self.k8s_client.ensure_endpoint_service(role=K8sService.REPLICAS.value, port=valkey_port)

def set_pod_labels(self) -> None:
"""Set labels for primary and replica pods in Kubernetes."""
primary_endpoint = self.get_primary_ip()
for unit in self.state.servers:
if not unit.is_active:
Expand Down
Loading
Loading