Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
104 changes: 80 additions & 24 deletions rollingops/src/charmlibs/rollingops/_rollingops_manager.py
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,9 @@

from ops import CharmBase, Object, Relation, RelationBrokenEvent
from ops.framework import EventBase
from pydantic import ValidationError

from charmlibs import pathops
from charmlibs.rollingops.common._exceptions import (
RollingOpsDecodingError,
RollingOpsInvalidLockRequestError,
Expand Down Expand Up @@ -66,11 +68,12 @@ class RollingOpsManager(Object):
def __init__(
self,
charm: CharmBase,
peer_relation_name: str,
etcd_relation_name: str,
cluster_id: str,
callback_targets: dict[str, Any],
peer_relation_name: str,
etcd_relation_name: str | None = None,
cluster_id: str | None = None,
sync_lock_targets: dict[str, type[SyncLockBackend]] | None = None,
base_dir: pathops.LocalPath | None = None,
):
"""Create a rolling operations manager with etcd and peer backends.

Expand All @@ -86,19 +89,24 @@ def __init__(

Args:
charm: The charm instance owning this manager.
callback_targets: Mapping of callback identifiers to callables
executed when queued operations are granted the lock.
peer_relation_name: Name of the peer relation used for fallback
state and operation mirroring.
etcd_relation_name: Name of the relation providing etcd access.
If not provided, only peer backend is used.
cluster_id: Identifier used to scope etcd-backed state for this
rolling-ops instance.
callback_targets: Mapping of callback identifiers to callables
executed when queued operations are granted the lock.
sync_lock_targets: Optional mapping of sync lock backend
identifiers to backend implementations used when acquiring
synchronous locks through the peer fallback path.
base_dir: base directory where all files related to rollingops will be written.
"""
super().__init__(charm, 'rolling-ops-manager')

if base_dir is None:
base_dir = pathops.LocalPath('/var/lib/rollingops')

self.charm = charm
self.peer_relation_name = peer_relation_name
self.etcd_relation_name = etcd_relation_name
Expand All @@ -110,17 +118,23 @@ def __init__(
charm=charm,
relation_name=peer_relation_name,
callback_targets=callback_targets,
base_dir=base_dir,
)
self.etcd_backend = EtcdRollingOpsBackend(
charm=charm,
peer_relation_name=peer_relation_name,
etcd_relation_name=etcd_relation_name,
cluster_id=cluster_id,
callback_targets=callback_targets,
)
self.framework.observe(
charm.on[self.etcd_relation_name].relation_broken, self._on_etcd_relation_broken
)
self.etcd_backend: EtcdRollingOpsBackend | None = None
if etcd_relation_name and cluster_id:
Comment thread
patriciareinoso marked this conversation as resolved.
self.etcd_backend = EtcdRollingOpsBackend(
charm=charm,
peer_relation_name=peer_relation_name,
etcd_relation_name=etcd_relation_name,
cluster_id=cluster_id,
callback_targets=callback_targets,
base_dir=base_dir,
)
self.framework.observe(
charm.on[etcd_relation_name].relation_broken,
self._on_etcd_relation_broken,
)

self.framework.observe(charm.on.rollingops_lock_granted, self._on_rollingops_lock_granted)
self.framework.observe(charm.on.rollingops_etcd_failed, self._on_rollingops_etcd_failed)
self.framework.observe(charm.on.update_status, self._on_update_status)
Expand Down Expand Up @@ -158,6 +172,10 @@ def _select_processing_backend(self) -> ProcessingBackend:
Returns:
The selected processing backend.
"""
if self.etcd_backend is None:
logger.info('etcd backend not configured; selecting peer backend.')
return ProcessingBackend.PEER

if not self.etcd_backend.is_available():
logger.info('etcd backend unavailable; selecting peer backend.')
return ProcessingBackend.PEER
Expand All @@ -182,8 +200,12 @@ def _fallback_current_unit_to_peer(self) -> None:
It is used when etcd becomes unavailable, unhealthy, or inconsistent,
so that queued operations can continue without being lost.
"""
if self._peer_relation is None:
logger.info('Peer relation does not exist. Cannot fallback.')
return
self._backend_state.fallback_to_peer()
self.etcd_backend.worker.stop()
if self.etcd_backend is not None:
self.etcd_backend.worker.stop()
self.peer_backend.ensure_processing()

def request_async_lock(
Expand Down Expand Up @@ -218,7 +240,7 @@ def request_async_lock(
if callback_id not in self.peer_backend.callback_targets:
raise RollingOpsInvalidLockRequestError(f'Unknown callback_id: {callback_id}')

if not self._peer_relation:
if self._peer_relation is None:
raise RollingOpsNoRelationError('No %s peer relation yet.', self.peer_relation_name)

if kwargs is None:
Expand All @@ -240,7 +262,7 @@ def request_async_lock(
'Failed to persist operation in peer backend.'
) from e

if backend == ProcessingBackend.ETCD:
if backend == ProcessingBackend.ETCD and self.etcd_backend is not None:
try:
self.etcd_backend.enqueue_operation(operation)
except Exception as e:
Expand All @@ -250,7 +272,7 @@ def request_async_lock(
)
backend = ProcessingBackend.PEER

if backend == ProcessingBackend.ETCD:
if backend == ProcessingBackend.ETCD and self.etcd_backend is not None:
self.etcd_backend.ensure_processing()
else:
self._fallback_current_unit_to_peer()
Expand All @@ -264,6 +286,9 @@ def _on_rollingops_lock_granted(self, event: RollingOpsLockGrantedEvent) -> None
If the current unit is etcd-managed, the operation is executed through
the etcd backend.
"""
if self._peer_relation is None:
logger.error('Peer relation does not exist. Cannot run lock granted.')
return
if self._backend_state.is_peer_managed():
logger.info('Executing rollingop on peer backend.')
self.peer_backend._on_rollingops_lock_granted(event)
Expand All @@ -280,6 +305,11 @@ def _run_etcd_and_mirror_or_fallback(self) -> None:
If etcd execution fails or mirrored state becomes inconsistent, the
manager falls back to the peer backend and resumes processing there.
"""
if self.etcd_backend is None:
logger.info('etcd backend not configured; using peer backend.')
self._fallback_current_unit_to_peer()
return

try:
logger.info('Executing rollingop on etcd backend.')
outcome = self.etcd_backend._on_run_with_lock()
Expand Down Expand Up @@ -309,6 +339,9 @@ def _run_etcd_and_mirror_or_fallback(self) -> None:
def _on_rollingops_etcd_failed(self, event: RollingOpsEtcdFailedEvent) -> None:
"""Fall back to peer when the etcd worker reports a fatal failure."""
logger.warning('Received %s.', ETCD_FAILED_HOOK_NAME)
if self._peer_relation is None:
logger.info('Peer relation does not exist. Cannot fallback.')
return
if self._backend_state.is_etcd_managed():
# No need to stop the background process. This hook means that it stopped.
self._backend_state.fallback_to_peer()
Expand Down Expand Up @@ -364,7 +397,7 @@ def acquire_sync_lock(self, backend_id: str, timeout: int):
times out.
RollingOpsSyncLockError: if there is an error when acquiring the lock.
"""
if self.etcd_backend.is_available():
if self.etcd_backend is not None and self.etcd_backend.is_available():
logger.info('Acquiring sync lock on etcd.')
try:
self.etcd_backend.acquire_sync_lock(timeout)
Expand Down Expand Up @@ -423,15 +456,15 @@ def state(self) -> RollingOpsState:
"""
if self._peer_relation is None:
return RollingOpsState(
status=RollingOpsStatus.UNAVAILABLE,
status=RollingOpsStatus.NOT_READY,
processing_backend=ProcessingBackend.PEER,
operations=OperationQueue(),
)

status = self.peer_backend.get_status()
if self._backend_state.is_etcd_managed():
if self.etcd_backend is not None and self._backend_state.is_etcd_managed():
status = self.etcd_backend.get_status()
if status == RollingOpsStatus.UNAVAILABLE:
if status == RollingOpsStatus.NOT_READY:
logger.info('etcd backend is not available. Falling back to peer backend.')
self._fallback_current_unit_to_peer()
status = self.peer_backend.get_status()
Expand All @@ -443,11 +476,34 @@ def state(self) -> RollingOpsState:
operations=operations.queue,
)

def is_waiting(self, callback_id: str, kwargs: dict[str, Any] | None = None) -> bool:
Comment thread
Gu1nness marked this conversation as resolved.
"""Return whether the current unit has a pending operation matching callback and kwargs."""
if self._peer_relation is None:
return False

operations = PeerUnitOperations(
self.model,
self.peer_relation_name,
self.model.unit,
).queue.operations

kwargs = kwargs or {}

try:
check_operation = Operation.create(callback_id=callback_id, kwargs=kwargs)
except ValidationError:
return False

return any(op == check_operation for op in operations)

def _on_update_status(self, event: EventBase) -> None:
"""Periodic reconciliation of rolling-ops state."""
logger.info('Received an update-status event.')
if self._peer_relation is None:
logger.info('Peer relation does not exist. Cannot update status.')
return
if self._backend_state.is_etcd_managed():
if not self.etcd_backend.is_available():
if self.etcd_backend is None or not self.etcd_backend.is_available():
logger.warning('etcd unavailable during update_status; falling back.')
self._fallback_current_unit_to_peer()
return
Expand Down
17 changes: 15 additions & 2 deletions rollingops/src/charmlibs/rollingops/common/_base_worker.py
Original file line number Diff line number Diff line change
Expand Up @@ -48,20 +48,28 @@ class BaseRollingOpsAsyncWorker(Object):
_pid_field: str
_log_filename: str

def __init__(self, charm: CharmBase, handle_name: str, peer_relation_name: str):
def __init__(
self,
charm: CharmBase,
handle_name: str,
peer_relation_name: str,
base_dir: pathops.LocalPath,
):
"""Initialize the base rolling-ops worker helper.

Args:
charm: The charm instance managing the worker process.
handle_name: Framework handle name used for this worker object.
peer_relation_name: Name of the peer relation used by subclasses
to store and retrieve worker state.
base_dir: base directory used for logs in the background process.
"""
super().__init__(charm, handle_name)
self._charm = charm
self._charm_dir = charm.charm_dir
self._peer_relation_name = peer_relation_name
self._handle_name = handle_name
self._base_dir = base_dir

@property
def _relation(self) -> Relation | None:
Expand Down Expand Up @@ -207,12 +215,17 @@ def start(self) -> None:
worker = self._worker_script_path()
env = self._build_env()

with open(f'{self._log_filename}', 'a') as log_out:
with_pebble_retry(lambda: self._base_dir.mkdir(parents=True, exist_ok=True))

log_file = self._base_dir / self._log_filename
with open(log_file, 'a') as log_out:
pid = subprocess.Popen(
[
'/usr/bin/python3',
'-u',
str(worker),
'--base-dir',
self._base_dir,
'--unit-name',
self.model.unit.name,
'--charm-dir',
Expand Down
27 changes: 11 additions & 16 deletions rollingops/src/charmlibs/rollingops/common/_models.py
Original file line number Diff line number Diff line change
Expand Up @@ -96,23 +96,20 @@ class RollingOpsStatus(StrEnum):

States:

- UNAVAILABLE:
- NOT_READY:
Rolling-ops cannot be used on this unit. This typically occurs when
required relations are missing or the selected backend is not reachable.
* peer backend: peer relation does not exist
* etcd backend: peer or etcd relation missing, or etcd not reachable
* peer backend: peer relation does not exist
* etcd backend: peer or etcd relation missing, or etcd not reachable

- WAITING:
The unit has pending operations but does not currently hold the lock.
- WAITING: The unit has pending operations but does not currently hold the lock.

- GRANTED:
The unit currently holds the lock and may execute operations.
- GRANTED: The unit currently holds the lock and may execute operations.

- IDLE:
The unit has no pending operations and is not holding the lock.
- IDLE: The unit has no pending operations and is not holding the lock.
"""

UNAVAILABLE = 'unavailable'
NOT_READY = 'not-ready'
WAITING = 'waiting'
GRANTED = 'granted'
IDLE = 'idle'
Expand Down Expand Up @@ -452,14 +449,12 @@ class RollingOpsState:
to peer).
The `operations` queue always reflects the peer-backed state, which
acts as the source of truth and fallback mechanism.
When `status` is UNAVAILABLE, the unit cannot currently participate
When `status` is NOT_READY, the unit cannot currently participate
in rolling operations due to missing relations or backend failures.

Attributes:
status: High-level rolling-ops status for the unit.
processing_backend: Backend currently responsible for executing
operations (e.g. ETCD or PEER).
operations: The unit's operation queue.
status: High-level rolling-ops status for the unit.
processing_backend: Backend currently responsible for executing operations (e.g. ETCD or PEER).
operations: The unit's operation queue.
"""

status: RollingOpsStatus
Expand Down
8 changes: 6 additions & 2 deletions rollingops/src/charmlibs/rollingops/common/_utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@
from ops import pebble
from tenacity import retry, retry_if_exception_type, stop_after_attempt, wait_fixed

from charmlibs import pathops
from charmlibs.pathops import PebbleConnectionError

logger = logging.getLogger(__name__)
Expand Down Expand Up @@ -61,7 +62,8 @@ def datetime_to_str(dt: datetime) -> str:


def setup_logging(
log_file: str,
base_dir: pathops.LocalPath,
log_filename: str,
*,
unit_name: str,
cluster_id: str | None = None,
Expand All @@ -76,11 +78,13 @@ def setup_logging(
This function is used in the context of the background process.

Args:
log_file: Path to the log file where logs should be written.
base_dir: base directory used to write the rollingops files.
log_filename: name of the file where logs should be written.
unit_name: Juju unit name associated with the background process.
cluster_id: Optional etcd cluster identifier.
owner: Optional worker owner identifier.
"""
log_file = base_dir / log_filename
handler = RotatingFileHandler(
log_file,
maxBytes=10 * 1024 * 1024, # 10 MB
Expand Down
Loading
Loading