diff --git a/rollingops/src/charmlibs/rollingops/_rollingops_manager.py b/rollingops/src/charmlibs/rollingops/_rollingops_manager.py index dbd34d1a7..02837b099 100644 --- a/rollingops/src/charmlibs/rollingops/_rollingops_manager.py +++ b/rollingops/src/charmlibs/rollingops/_rollingops_manager.py @@ -20,7 +20,9 @@ from ops import CharmBase, Object, Relation, RelationBrokenEvent from ops.framework import EventBase +from pydantic import ValidationError +from charmlibs import pathops from charmlibs.rollingops.common._exceptions import ( RollingOpsDecodingError, RollingOpsInvalidLockRequestError, @@ -66,11 +68,12 @@ class RollingOpsManager(Object): def __init__( self, charm: CharmBase, - peer_relation_name: str, - etcd_relation_name: str, - cluster_id: str, callback_targets: dict[str, Any], + peer_relation_name: str, + etcd_relation_name: str | None = None, + cluster_id: str | None = None, sync_lock_targets: dict[str, type[SyncLockBackend]] | None = None, + base_dir: pathops.LocalPath | None = None, ): """Create a rolling operations manager with etcd and peer backends. @@ -86,19 +89,24 @@ def __init__( Args: charm: The charm instance owning this manager. + callback_targets: Mapping of callback identifiers to callables + executed when queued operations are granted the lock. peer_relation_name: Name of the peer relation used for fallback state and operation mirroring. etcd_relation_name: Name of the relation providing etcd access. + If not provided, only peer backend is used. cluster_id: Identifier used to scope etcd-backed state for this rolling-ops instance. - callback_targets: Mapping of callback identifiers to callables - executed when queued operations are granted the lock. sync_lock_targets: Optional mapping of sync lock backend identifiers to backend implementations used when acquiring synchronous locks through the peer fallback path. + base_dir: base directory where all files related to rollingops will be written. 
""" super().__init__(charm, 'rolling-ops-manager') + if base_dir is None: + base_dir = pathops.LocalPath('/var/lib/rollingops') + self.charm = charm self.peer_relation_name = peer_relation_name self.etcd_relation_name = etcd_relation_name @@ -110,17 +118,23 @@ def __init__( charm=charm, relation_name=peer_relation_name, callback_targets=callback_targets, + base_dir=base_dir, ) - self.etcd_backend = EtcdRollingOpsBackend( - charm=charm, - peer_relation_name=peer_relation_name, - etcd_relation_name=etcd_relation_name, - cluster_id=cluster_id, - callback_targets=callback_targets, - ) - self.framework.observe( - charm.on[self.etcd_relation_name].relation_broken, self._on_etcd_relation_broken - ) + self.etcd_backend: EtcdRollingOpsBackend | None = None + if etcd_relation_name and cluster_id: + self.etcd_backend = EtcdRollingOpsBackend( + charm=charm, + peer_relation_name=peer_relation_name, + etcd_relation_name=etcd_relation_name, + cluster_id=cluster_id, + callback_targets=callback_targets, + base_dir=base_dir, + ) + self.framework.observe( + charm.on[etcd_relation_name].relation_broken, + self._on_etcd_relation_broken, + ) + self.framework.observe(charm.on.rollingops_lock_granted, self._on_rollingops_lock_granted) self.framework.observe(charm.on.rollingops_etcd_failed, self._on_rollingops_etcd_failed) self.framework.observe(charm.on.update_status, self._on_update_status) @@ -158,6 +172,10 @@ def _select_processing_backend(self) -> ProcessingBackend: Returns: The selected processing backend. """ + if self.etcd_backend is None: + logger.info('etcd backend not configured; selecting peer backend.') + return ProcessingBackend.PEER + if not self.etcd_backend.is_available(): logger.info('etcd backend unavailable; selecting peer backend.') return ProcessingBackend.PEER @@ -182,8 +200,12 @@ def _fallback_current_unit_to_peer(self) -> None: It is used when etcd becomes unavailable, unhealthy, or inconsistent, so that queued operations can continue without being lost. 
""" + if self._peer_relation is None: + logger.info('Peer relation does not exists. Cannot fallback.') + return self._backend_state.fallback_to_peer() - self.etcd_backend.worker.stop() + if self.etcd_backend is not None: + self.etcd_backend.worker.stop() self.peer_backend.ensure_processing() def request_async_lock( @@ -218,7 +240,7 @@ def request_async_lock( if callback_id not in self.peer_backend.callback_targets: raise RollingOpsInvalidLockRequestError(f'Unknown callback_id: {callback_id}') - if not self._peer_relation: + if self._peer_relation is None: raise RollingOpsNoRelationError('No %s peer relation yet.', self.peer_relation_name) if kwargs is None: @@ -240,7 +262,7 @@ def request_async_lock( 'Failed to persists operation in peer backend.' ) from e - if backend == ProcessingBackend.ETCD: + if backend == ProcessingBackend.ETCD and self.etcd_backend is not None: try: self.etcd_backend.enqueue_operation(operation) except Exception as e: @@ -250,7 +272,7 @@ def request_async_lock( ) backend = ProcessingBackend.PEER - if backend == ProcessingBackend.ETCD: + if backend == ProcessingBackend.ETCD and self.etcd_backend is not None: self.etcd_backend.ensure_processing() else: self._fallback_current_unit_to_peer() @@ -264,6 +286,9 @@ def _on_rollingops_lock_granted(self, event: RollingOpsLockGrantedEvent) -> None If the current unit is etcd-managed, the operation is executed through the etcd backend. """ + if self._peer_relation is None: + logger.error('Peer relation does not exists. Cannot run lock granted.') + return if self._backend_state.is_peer_managed(): logger.info('Executing rollingop on peer backend.') self.peer_backend._on_rollingops_lock_granted(event) @@ -280,6 +305,11 @@ def _run_etcd_and_mirror_or_fallback(self) -> None: If etcd execution fails or mirrored state becomes inconsistent, the manager falls back to the peer backend and resumes processing there. 
""" + if self.etcd_backend is None: + logger.info('etcd backend not configured; using peer backend.') + self._fallback_current_unit_to_peer() + return + try: logger.info('Executing rollingop on etcd backend.') outcome = self.etcd_backend._on_run_with_lock() @@ -309,6 +339,9 @@ def _run_etcd_and_mirror_or_fallback(self) -> None: def _on_rollingops_etcd_failed(self, event: RollingOpsEtcdFailedEvent) -> None: """Fall back to peer when the etcd worker reports a fatal failure.""" logger.warning('Received %s.', ETCD_FAILED_HOOK_NAME) + if self._peer_relation is None: + logger.info('Peer relation does not exists. Cannot fallback.') + return if self._backend_state.is_etcd_managed(): # No need to stop the background process. This hook means that it stopped. self._backend_state.fallback_to_peer() @@ -364,7 +397,7 @@ def acquire_sync_lock(self, backend_id: str, timeout: int): times out. RollingOpsSyncLockError: if there is an error when acquiring the lock. """ - if self.etcd_backend.is_available(): + if self.etcd_backend is not None and self.etcd_backend.is_available(): logger.info('Acquiring sync lock on etcd.') try: self.etcd_backend.acquire_sync_lock(timeout) @@ -423,15 +456,15 @@ def state(self) -> RollingOpsState: """ if self._peer_relation is None: return RollingOpsState( - status=RollingOpsStatus.UNAVAILABLE, + status=RollingOpsStatus.NOT_READY, processing_backend=ProcessingBackend.PEER, operations=OperationQueue(), ) status = self.peer_backend.get_status() - if self._backend_state.is_etcd_managed(): + if self.etcd_backend is not None and self._backend_state.is_etcd_managed(): status = self.etcd_backend.get_status() - if status == RollingOpsStatus.UNAVAILABLE: + if status == RollingOpsStatus.NOT_READY: logger.info('etcd backend is not available. 
Falling back to peer backend.') self._fallback_current_unit_to_peer() status = self.peer_backend.get_status() @@ -443,11 +476,34 @@ def state(self) -> RollingOpsState: operations=operations.queue, ) + def is_waiting(self, callback_id: str, kwargs: dict[str, Any] | None = None) -> bool: + """Return whether the current unit has a pending operation matching callback and kwargs.""" + if self._peer_relation is None: + return False + + operations = PeerUnitOperations( + self.model, + self.peer_relation_name, + self.model.unit, + ).queue.operations + + kwargs = kwargs or {} + + try: + check_operation = Operation.create(callback_id=callback_id, kwargs=kwargs) + except ValidationError: + return False + + return any(op == check_operation for op in operations) + def _on_update_status(self, event: EventBase) -> None: """Periodic reconciliation of rolling-ops state.""" logger.info('Received a update-status event.') + if self._peer_relation is None: + logger.info('Peer relation does not exists. Cannot update status.') + return if self._backend_state.is_etcd_managed(): - if not self.etcd_backend.is_available(): + if self.etcd_backend is None or not self.etcd_backend.is_available(): logger.warning('etcd unavailable during update_status; falling back.') self._fallback_current_unit_to_peer() return diff --git a/rollingops/src/charmlibs/rollingops/common/_base_worker.py b/rollingops/src/charmlibs/rollingops/common/_base_worker.py index c444b344d..ede0b1262 100644 --- a/rollingops/src/charmlibs/rollingops/common/_base_worker.py +++ b/rollingops/src/charmlibs/rollingops/common/_base_worker.py @@ -48,7 +48,13 @@ class BaseRollingOpsAsyncWorker(Object): _pid_field: str _log_filename: str - def __init__(self, charm: CharmBase, handle_name: str, peer_relation_name: str): + def __init__( + self, + charm: CharmBase, + handle_name: str, + peer_relation_name: str, + base_dir: pathops.LocalPath, + ): """Initialize the base rolling-ops worker helper. 
Args: @@ -56,12 +62,14 @@ def __init__(self, charm: CharmBase, handle_name: str, peer_relation_name: str): handle_name: Framework handle name used for this worker object. peer_relation_name: Name of the peer relation used by subclasses to store and retrieve worker state. + base_dir: base directory used for logs in the background process. """ super().__init__(charm, handle_name) self._charm = charm self._charm_dir = charm.charm_dir self._peer_relation_name = peer_relation_name self._handle_name = handle_name + self._base_dir = base_dir @property def _relation(self) -> Relation | None: @@ -207,12 +215,17 @@ def start(self) -> None: worker = self._worker_script_path() env = self._build_env() - with open(f'{self._log_filename}', 'a') as log_out: + with_pebble_retry(lambda: self._base_dir.mkdir(parents=True, exist_ok=True)) + + log_file = self._base_dir / self._log_filename + with open(log_file, 'a') as log_out: pid = subprocess.Popen( [ '/usr/bin/python3', '-u', str(worker), + '--base-dir', + self._base_dir, '--unit-name', self.model.unit.name, '--charm-dir', diff --git a/rollingops/src/charmlibs/rollingops/common/_models.py b/rollingops/src/charmlibs/rollingops/common/_models.py index 7b15a1fd0..cdb760d26 100644 --- a/rollingops/src/charmlibs/rollingops/common/_models.py +++ b/rollingops/src/charmlibs/rollingops/common/_models.py @@ -96,23 +96,20 @@ class RollingOpsStatus(StrEnum): States: - - UNAVAILABLE: + - NOT_READY: Rolling-ops cannot be used on this unit. This typically occurs when required relations are missing or the selected backend is not reachable. - * peer backend: peer relation does not exist - * etcd backend: peer or etcd relation missing, or etcd not reachable + * peer backend: peer relation does not exist + * etcd backend: peer or etcd relation missing, or etcd not reachable - - WAITING: - The unit has pending operations but does not currently hold the lock. + - WAITING: The unit has pending operations but does not currently hold the lock. 
- - GRANTED: - The unit currently holds the lock and may execute operations. + - GRANTED: The unit currently holds the lock and may execute operations. - - IDLE: - The unit has no pending operations and is not holding the lock. + - IDLE: The unit has no pending operations and is not holding the lock. """ - UNAVAILABLE = 'unavailable' + NOT_READY = 'not-ready' WAITING = 'waiting' GRANTED = 'granted' IDLE = 'idle' @@ -452,14 +449,12 @@ class RollingOpsState: to peer). The `operations` queue always reflects the peer-backed state, which acts as the source of truth and fallback mechanism. - When `status` is UNAVAILABLE, the unit cannot currently participate + When `status` is NOT_READY, the unit cannot currently participate in rolling operations due to missing relations or backend failures. - Attributes: - status: High-level rolling-ops status for the unit. - processing_backend: Backend currently responsible for executing - operations (e.g. ETCD or PEER). - operations: The unit's operation queue. + status: High-level rolling-ops status for the unit. + processing_backend: Backend currently responsible for executing operations (e.g. ETCD or PEER). + operations: The unit's operation queue. 
""" status: RollingOpsStatus diff --git a/rollingops/src/charmlibs/rollingops/common/_utils.py b/rollingops/src/charmlibs/rollingops/common/_utils.py index dfdddb221..79cb39b5f 100644 --- a/rollingops/src/charmlibs/rollingops/common/_utils.py +++ b/rollingops/src/charmlibs/rollingops/common/_utils.py @@ -24,6 +24,7 @@ from ops import pebble from tenacity import retry, retry_if_exception_type, stop_after_attempt, wait_fixed +from charmlibs import pathops from charmlibs.pathops import PebbleConnectionError logger = logging.getLogger(__name__) @@ -61,7 +62,8 @@ def datetime_to_str(dt: datetime) -> str: def setup_logging( - log_file: str, + base_dir: pathops.LocalPath, + log_filename: str, *, unit_name: str, cluster_id: str | None = None, @@ -76,11 +78,13 @@ def setup_logging( This functions is used in the context of the background process. Args: - log_file: Path to the log file where logs should be written. + base_dir: base directory used to write the rollingops files + log_filename: name of the file where logs should be written. unit_name: Juju unit name associated with the background process. cluster_id: Optional etcd cluster identifier. owner: Optional worker owner identifier. 
""" + log_file = base_dir / log_filename handler = RotatingFileHandler( log_file, maxBytes=10 * 1024 * 1024, # 10 MB diff --git a/rollingops/src/charmlibs/rollingops/etcd/_backend.py b/rollingops/src/charmlibs/rollingops/etcd/_backend.py index ebc616d97..ff267e8af 100644 --- a/rollingops/src/charmlibs/rollingops/etcd/_backend.py +++ b/rollingops/src/charmlibs/rollingops/etcd/_backend.py @@ -23,6 +23,7 @@ RelationDepartedEvent, ) +from charmlibs import pathops from charmlibs.rollingops.common._exceptions import ( RollingOpsInvalidLockRequestError, RollingOpsNoEtcdRelationError, @@ -36,8 +37,8 @@ RunWithLockStatus, UnitBackendState, ) -from charmlibs.rollingops.etcd import _etcdctl as etcdctl from charmlibs.rollingops.etcd._etcd import EtcdLease, EtcdLock, ManagerOperationStore +from charmlibs.rollingops.etcd._etcdctl import ETCDCTL_CMD, Etcdctl from charmlibs.rollingops.etcd._models import RollingOpsKeys from charmlibs.rollingops.etcd._relations import EtcdRequiresV1, SharedClientCertificateManager from charmlibs.rollingops.etcd._worker import EtcdRollingOpsAsyncWorker @@ -63,6 +64,7 @@ def __init__( etcd_relation_name: str, cluster_id: str, callback_targets: dict[str, Any], + base_dir: pathops.LocalPath, ): """Initialize the etcd-backed rolling-ops backend. @@ -75,22 +77,31 @@ def __init__( instance. callback_targets: Mapping from callback identifiers to callables executed when an operation is granted the asynchronous lock. + base_dir: base directory where all files related to rollingops will be written. 
""" super().__init__(charm, 'etcd-rolling-ops-manager') self._charm = charm self.peer_relation_name = peer_relation_name self.etcd_relation_name = etcd_relation_name self.callback_targets = callback_targets + self._base_dir = base_dir + + self.etcdctl = Etcdctl(self._base_dir) owner = f'{self.model.uuid}-{self.model.unit.name}'.replace('/', '-') self.worker = EtcdRollingOpsAsyncWorker( - charm, peer_relation_name=peer_relation_name, owner=owner, cluster_id=cluster_id + charm, + peer_relation_name=peer_relation_name, + owner=owner, + cluster_id=cluster_id, + base_dir=self._base_dir, ) self.keys = RollingOpsKeys.for_owner(cluster_id=cluster_id, owner=owner) self.shared_certificates = SharedClientCertificateManager( charm, peer_relation_name=peer_relation_name, + base_dir=self._base_dir, ) self.etcd = EtcdRequiresV1( @@ -98,11 +109,16 @@ def __init__( relation_name=etcd_relation_name, cluster_id=self.keys.cluster_prefix, shared_certificates=self.shared_certificates, + base_dir=self._base_dir, + ) + self._async_lock = EtcdLock( + lock_key=self.keys.lock_key, owner=owner, base_dir=self._base_dir + ) + self._sync_lock = EtcdLock( + lock_key=self.keys.lock_key, owner=f'{owner}:sync', base_dir=self._base_dir ) - self._async_lock = EtcdLock(lock_key=self.keys.lock_key, owner=owner) - self._sync_lock = EtcdLock(lock_key=self.keys.lock_key, owner=f'{owner}:sync') self._lease: EtcdLease | None = None - self.operations_store = ManagerOperationStore(self.keys, owner) + self.operations_store = ManagerOperationStore(self.keys, owner, base_dir=self._base_dir) self.framework.observe( charm.on[self.peer_relation_name].relation_departed, self._on_peer_relation_departed @@ -133,7 +149,7 @@ def is_available(self) -> bool: if self._etcd_relation is None: return False try: - etcdctl.ensure_initialized() + self.etcdctl.ensure_initialized() except Exception: return False return True @@ -159,7 +175,7 @@ def enqueue_operation(self, operation: Operation) -> None: if self._etcd_relation is 
None: raise RollingOpsNoEtcdRelationError - etcdctl.ensure_initialized() + self.etcdctl.ensure_initialized() backend_state = UnitBackendState(self.model, self.peer_relation_name, self.model.unit) if backend_state.cleanup_needed: @@ -186,8 +202,8 @@ def _on_etcd_relation_created(self, event: RelationCreatedEvent) -> None: Args: event: The relation-created event for the etcd relation. """ - if not etcdctl.is_etcdctl_installed(): - logger.error('%s is not installed.', etcdctl.ETCDCTL_CMD) + if not self.etcdctl.is_etcdctl_installed(): + logger.error('%s is not installed.', ETCDCTL_CMD) def _on_peer_relation_departed(self, event: RelationDepartedEvent) -> None: """Handle removal of a unit from the peer relation. @@ -240,7 +256,7 @@ def request_async_lock( if not self._etcd_relation: raise RollingOpsNoEtcdRelationError - etcdctl.ensure_initialized() + self.etcdctl.ensure_initialized() if kwargs is None: kwargs = {} @@ -337,7 +353,7 @@ def acquire_sync_lock(self, timeout: int | None) -> None: TimeoutError: If the lock could not be acquired before the timeout. RollingOpsSyncLockError: if there was an error obtaining the lock. """ - self._lease = EtcdLease() + self._lease = EtcdLease(self._base_dir) deadline = None if timeout is None else time.monotonic() + timeout @@ -380,7 +396,7 @@ def get_status(self) -> RollingOpsStatus: unit's queued operation state. Returned values: - - UNAVAILABLE: etcd backend is not available + - NOT_READY: etcd backend is not available - GRANTED: the async lock is currently held by this unit - WAITING: this unit has queued work but does not hold the lock - IDLE: this unit has no pending work @@ -389,7 +405,7 @@ def get_status(self) -> RollingOpsStatus: The current rolling-ops status for this unit. 
""" if self._peer_relation is None or self._etcd_relation is None or not self.is_available(): - return RollingOpsStatus.UNAVAILABLE + return RollingOpsStatus.NOT_READY if self._async_lock.is_held(): return RollingOpsStatus.GRANTED diff --git a/rollingops/src/charmlibs/rollingops/etcd/_certificates.py b/rollingops/src/charmlibs/rollingops/etcd/_certificates.py index 3c53bd939..2619d22b1 100644 --- a/rollingops/src/charmlibs/rollingops/etcd/_certificates.py +++ b/rollingops/src/charmlibs/rollingops/etcd/_certificates.py @@ -38,136 +38,138 @@ from charmlibs.rollingops.common._utils import with_pebble_retry from charmlibs.rollingops.etcd._models import SharedCertificate -BASE_DIR = pathops.LocalPath('/var/lib/rollingops/tls') -CA_CERT_PATH = BASE_DIR / 'client-ca.pem' -CLIENT_KEY_PATH = BASE_DIR / 'client.key' -CLIENT_CERT_PATH = BASE_DIR / 'client.pem' VALIDITY_DAYS = 365 * 50 KEY_SIZE = 4096 -def persist_client_cert_key_and_ca(shared: SharedCertificate) -> None: - """Persist the provided client certificate, key, and CA to disk. - - Raises: - PebbleConnectionError: if the remote container cannot be reached - RollingOpsFileSystemError: if there is a problem when writing the certificates - """ - if _has_client_cert_key_and_ca(shared): - return - try: - with_pebble_retry(lambda: BASE_DIR.mkdir(parents=True, exist_ok=True)) - shared.write_to_paths(CLIENT_CERT_PATH, CLIENT_KEY_PATH, CA_CERT_PATH) - - except (FileNotFoundError, LookupError, NotADirectoryError, PermissionError) as e: - raise RollingOpsFileSystemError('Failed to persist client certificates and key.') from e - - -def _has_client_cert_key_and_ca(shared: SharedCertificate) -> bool: - """Return whether the provided certificate material matches local files. 
- - Raises: - PebbleConnectionError: if the remote container cannot be reached - RollingOpsFileSystemError: if there is a problem when writing the certificates - """ - if not _exists(): - return False - try: - stored = SharedCertificate.from_paths( - CLIENT_CERT_PATH, - CLIENT_KEY_PATH, - CA_CERT_PATH, +class CertificateStore: + def __init__(self, base_dir: pathops.LocalPath): + self.base_dir = base_dir / 'tls' + self.cert_path = self.base_dir / 'client.pem' + self.key_path = self.base_dir / 'client.key' + self.ca_path = self.base_dir / 'client-ca.pem' + + def persist_client_cert_key_and_ca(self, shared: SharedCertificate) -> None: + """Persist the provided client certificate, key, and CA to disk. + + Raises: + PebbleConnectionError: if the remote container cannot be reached + RollingOpsFileSystemError: if there is a problem when writing the certificates + """ + if self._has_client_cert_key_and_ca(shared): + return + try: + with_pebble_retry(lambda: self.base_dir.mkdir(parents=True, exist_ok=True)) + shared.write_to_paths(self.cert_path, self.key_path, self.ca_path) + + except (FileNotFoundError, LookupError, NotADirectoryError, PermissionError) as e: + raise RollingOpsFileSystemError( + 'Failed to persist client certificates and key.' + ) from e + + def _has_client_cert_key_and_ca(self, shared: SharedCertificate) -> bool: + """Return whether the provided certificate material matches local files. 
+ + Raises: + PebbleConnectionError: if the remote container cannot be reached + RollingOpsFileSystemError: if there is a problem when writing the certificates + """ + if not self._exists(): + return False + try: + stored = SharedCertificate.from_paths( + self.cert_path, + self.key_path, + self.ca_path, + ) + return stored == shared + + except ( + FileNotFoundError, + IsADirectoryError, + PermissionError, + TLSCertificatesError, + ValueError, + ) as e: + raise RollingOpsFileSystemError('Failed to read certificates and key.') from e + + def generate(self, model_uuid: str, app_name: str) -> SharedCertificate: + """Generate a client CA and client certificate if they do not exist. + + This method creates: + 1. A CA private key and self-signed CA certificate. + 2. A client private key. + 3. A certificate signing request (CSR) using the provided common name. + 4. A client certificate signed by the generated CA. + + The generated files are written to disk and reused in future runs. + If the certificates already exist, this method does nothing. + + Args: + model_uuid: string used to build the common name. + app_name: string used to build the common name. 
+ + Raises: + PebbleConnectionError: if the remote container cannot be reached + RollingOpsFileSystemError: if there is a problem when writing the certificates + """ + if self._exists(): + return SharedCertificate.from_paths( + self.cert_path, + self.key_path, + self.ca_path, + ) + + # Produce a unique <=64-character string + raw = f'{model_uuid}-{app_name}' + common_name = shortuuid.uuid(name=raw) + ca_key = PrivateKey.generate(key_size=KEY_SIZE) + ca_attributes = CertificateRequestAttributes( + common_name=common_name, + is_ca=True, + add_unique_id_to_subject_name=False, ) - return stored == shared - - except ( - FileNotFoundError, - IsADirectoryError, - PermissionError, - TLSCertificatesError, - ValueError, - ) as e: - raise RollingOpsFileSystemError('Failed to read certificates and key.') from e - - -def generate(model_uuid: str, app_name: str) -> SharedCertificate: - """Generate a client CA and client certificate if they do not exist. - - This method creates: - 1. A CA private key and self-signed CA certificate. - 2. A client private key. - 3. A certificate signing request (CSR) using the provided common name. - 4. A client certificate signed by the generated CA. - - The generated files are written to disk and reused in future runs. - If the certificates already exist, this method does nothing. - - Args: - model_uuid: string used to build the common name. - app_name: string used to build the common name. 
- - Raises: - PebbleConnectionError: if the remote container cannot be reached - RollingOpsFileSystemError: if there is a problem when writing the certificates - """ - if _exists(): - return SharedCertificate.from_paths( - CLIENT_CERT_PATH, - CLIENT_KEY_PATH, - CA_CERT_PATH, + ca_crt = Certificate.generate_self_signed_ca( + attributes=ca_attributes, + private_key=ca_key, + validity=timedelta(days=VALIDITY_DAYS), ) - # Produce a unique <=64-character string - raw = f'{model_uuid}-{app_name}' - common_name = shortuuid.uuid(name=raw) - ca_key = PrivateKey.generate(key_size=KEY_SIZE) - ca_attributes = CertificateRequestAttributes( - common_name=common_name, - is_ca=True, - add_unique_id_to_subject_name=False, - ) - ca_crt = Certificate.generate_self_signed_ca( - attributes=ca_attributes, - private_key=ca_key, - validity=timedelta(days=VALIDITY_DAYS), - ) - - client_key = PrivateKey.generate(key_size=KEY_SIZE) - - csr_attributes = CertificateRequestAttributes( - common_name=common_name, add_unique_id_to_subject_name=False - ) - csr = CertificateSigningRequest.generate( - attributes=csr_attributes, - private_key=client_key, - ) - - client_crt = Certificate.generate( - csr=csr, - ca=ca_crt, - ca_private_key=ca_key, - validity=timedelta(days=VALIDITY_DAYS), - is_ca=False, - ) - - shared = SharedCertificate( - certificate=client_crt, - key=client_key, - ca=ca_crt, - ) - - persist_client_cert_key_and_ca(shared) - return shared - - -def _exists() -> bool: - """Check whether the client certificates and CA certificate already exist. 
- - Raises: - PebbleConnectionError: if the remote container cannot be reached - """ - return ( - with_pebble_retry(lambda: CA_CERT_PATH.exists()) - and with_pebble_retry(lambda: CLIENT_KEY_PATH.exists()) - and with_pebble_retry(lambda: CLIENT_CERT_PATH.exists()) - ) + client_key = PrivateKey.generate(key_size=KEY_SIZE) + + csr_attributes = CertificateRequestAttributes( + common_name=common_name, add_unique_id_to_subject_name=False + ) + csr = CertificateSigningRequest.generate( + attributes=csr_attributes, + private_key=client_key, + ) + + client_crt = Certificate.generate( + csr=csr, + ca=ca_crt, + ca_private_key=ca_key, + validity=timedelta(days=VALIDITY_DAYS), + is_ca=False, + ) + + shared = SharedCertificate( + certificate=client_crt, + key=client_key, + ca=ca_crt, + ) + + self.persist_client_cert_key_and_ca(shared) + return shared + + def _exists(self) -> bool: + """Check whether the client certificates and CA certificate already exist. + + Raises: + PebbleConnectionError: if the remote container cannot be reached + """ + return ( + with_pebble_retry(lambda: self.ca_path.exists()) + and with_pebble_retry(lambda: self.key_path.exists()) + and with_pebble_retry(lambda: self.cert_path.exists()) + ) diff --git a/rollingops/src/charmlibs/rollingops/etcd/_etcd.py b/rollingops/src/charmlibs/rollingops/etcd/_etcd.py index 5ce1aef7f..97dd308b7 100644 --- a/rollingops/src/charmlibs/rollingops/etcd/_etcd.py +++ b/rollingops/src/charmlibs/rollingops/etcd/_etcd.py @@ -19,13 +19,14 @@ import subprocess import time -import charmlibs.rollingops.etcd._etcdctl as etcdctl +from charmlibs import pathops from charmlibs.rollingops.common._exceptions import ( RollingOpsEtcdctlFatalError, RollingOpsEtcdctlParseError, RollingOpsEtcdTransactionError, ) from charmlibs.rollingops.common._models import Operation, OperationResult +from charmlibs.rollingops.etcd._etcdctl import ETCDCTL_CMD, Etcdctl from charmlibs.rollingops.etcd._models import RollingOpsKeys logger = 
logging.getLogger(__name__) @@ -36,14 +37,15 @@ class EtcdLease: """Manage the lifecycle of an etcd lease and its keep-alive process.""" - def __init__(self): + def __init__(self, base_dir: pathops.LocalPath): self.id: str | None = None self.keepalive_proc: subprocess.Popen[str] | None = None self._pipe_write_fd: int | None = None + self._etcdctl = Etcdctl(base_dir) def grant(self) -> None: """Create a new lease and start the keep-alive process.""" - res = etcdctl.run('lease', 'grant', LOCK_LEASE_TTL) + res = self._etcdctl.run('lease', 'grant', LOCK_LEASE_TTL) # parse: "lease 694d9c9aeca3422a granted with TTL(60s)" parts = res.split() try: @@ -61,7 +63,7 @@ def revoke(self) -> None: lease_id = self.id try: if self.id is not None: - etcdctl.run('lease', 'revoke', self.id) + self._etcdctl.run('lease', 'revoke', self.id) except Exception: logger.exception('Fail to revoke lease %s.', lease_id) raise @@ -79,19 +81,19 @@ def _start_lease_keepalive(self) -> None: if lease_id is None: logger.info('Lease ID is None. Keepalive for this lease cannot be started.') return - etcdctl.ensure_initialized() + self._etcdctl.ensure_initialized() pipe_read_fd, pipe_write_fd = os.pipe() self._pipe_write_fd = pipe_write_fd - keep_alive_cmd = f'{etcdctl.ETCDCTL_CMD} lease keep-alive {lease_id} /dev/null; wait' # noqa: E501 + keep_alive_cmd = f'{ETCDCTL_CMD} lease keep-alive {lease_id} /dev/null; wait' # noqa: E501 try: self.keepalive_proc = subprocess.Popen( ['bash', '-c', keep_alive_cmd], # The pipe read side becomes the child's stdin # so when the parent closes its write side, this stdin gets EOF stdin=pipe_read_fd, - env=etcdctl.load_env(), + env=self._etcdctl.load_env(), text=True, close_fds=True, preexec_fn=self._close_write_side_in_child, @@ -154,9 +156,10 @@ class EtcdLock: automatically released if the owner stops refreshing the lease. 
""" - def __init__(self, lock_key: str, owner: str): + def __init__(self, lock_key: str, owner: str, base_dir: pathops.LocalPath): self.lock_key = lock_key self.owner = owner + self._etcdctl = Etcdctl(base_dir) def try_acquire(self, lease_id: str) -> bool: """Attempt to acquire the lock. @@ -181,7 +184,7 @@ def try_acquire(self, lease_id: str) -> bool: """ - return etcdctl.txn(txn) + return self._etcdctl.txn(txn) def release(self) -> None: """Release the lock if it is currently held by this owner. @@ -200,13 +203,13 @@ def release(self) -> None: """ - etcdctl.txn(txn) + self._etcdctl.txn(txn) def is_held(self) -> bool: """Check whether the lock is currently held by the owner.""" if not self.lock_key or not self.owner: raise RollingOpsEtcdctlFatalError('Invalid input for check lock ownership operation.') - res = etcdctl.run('get', self.lock_key, '--print-value-only') + res = self._etcdctl.run('get', self.lock_key, '--print-value-only') return res == self.owner @@ -219,21 +222,22 @@ class EtcdOperationQueue: the value contains the serialized operation data. 
""" - def __init__(self, prefix: str, lock_key: str, owner: str): + def __init__(self, prefix: str, lock_key: str, owner: str, base_dir: pathops.LocalPath): self.prefix = prefix self.lock_key = lock_key self.owner = owner + self._etcdctl = Etcdctl(base_dir) def peek(self) -> Operation | None: """Return the first operation in the queue without removing it.""" - kv = etcdctl.get_first_key_value_pair(self.prefix) + kv = self._etcdctl.get_first_key_value_pair(self.prefix) if kv is None: return None return Operation.model_validate(kv.value) def _peek_last(self) -> Operation | None: """Return the last operation in the queue without removing it.""" - kv = etcdctl.get_last_key_value_pair(self.prefix) + kv = self._etcdctl.get_last_key_value_pair(self.prefix) if kv is None: return None return Operation.model_validate(kv.value) @@ -252,7 +256,7 @@ def move_head(self, to_queue_prefix: str) -> bool: Returns: True if the operation was moved successfully, otherwise False. """ - kv = etcdctl.get_first_key_value_pair(self.prefix) + kv = self._etcdctl.get_first_key_value_pair(self.prefix) if kv is None: return False @@ -270,7 +274,7 @@ def move_head(self, to_queue_prefix: str) -> bool: """ - return etcdctl.txn(txn) + return self._etcdctl.txn(txn) def move_operation(self, to_queue_prefix: str, operation: Operation) -> bool: """Move a specific operation from this queue to another queue. @@ -299,12 +303,12 @@ def move_operation(self, to_queue_prefix: str, operation: Operation) -> bool: """ - return etcdctl.txn(txn) + return self._etcdctl.txn(txn) def watch(self) -> Operation: """Block until at least one operation exists and return it.""" while True: - kv = etcdctl.get_first_key_value_pair(self.prefix) + kv = self._etcdctl.get_first_key_value_pair(self.prefix) if kv is not None: return Operation.model_validate(kv.value) time.sleep(10) @@ -318,7 +322,7 @@ def dequeue(self) -> bool: Returns: True if the operation was removed successfully, otherwise False. 
""" - kv = etcdctl.get_first_key_value_pair(self.prefix) + kv = self._etcdctl.get_first_key_value_pair(self.prefix) if kv is None: return False @@ -330,7 +334,7 @@ def dequeue(self) -> bool: """ - return etcdctl.txn(txn) + return self._etcdctl.txn(txn) def enqueue(self, operation: Operation) -> None: """Insert a new operation into the queue. @@ -353,11 +357,11 @@ def enqueue(self, operation: Operation) -> None: op_str = operation.to_string() key = f'{self.prefix}{operation.op_id}' - etcdctl.run('put', key, cmd_input=op_str) + self._etcdctl.run('put', key, cmd_input=op_str) logger.info('Operation %s added to the etcd queue.', operation.callback_id) def clear(self) -> None: - etcdctl.run('del', self.prefix, '--prefix') + self._etcdctl.run('del', self.prefix, '--prefix') class WorkerOperationStore: @@ -378,10 +382,14 @@ class WorkerOperationStore: - requeue or delete completed operations """ - def __init__(self, keys: RollingOpsKeys, owner: str): - self._pending = EtcdOperationQueue(keys.pending, keys.lock_key, owner) - self._inprogress = EtcdOperationQueue(keys.inprogress, keys.lock_key, owner) - self._completed = EtcdOperationQueue(keys.completed, keys.lock_key, owner) + def __init__(self, keys: RollingOpsKeys, owner: str, base_dir: pathops.LocalPath): + self._pending = EtcdOperationQueue(keys.pending, keys.lock_key, owner, base_dir=base_dir) + self._inprogress = EtcdOperationQueue( + keys.inprogress, keys.lock_key, owner, base_dir=base_dir + ) + self._completed = EtcdOperationQueue( + keys.completed, keys.lock_key, owner, base_dir=base_dir + ) def has_pending(self) -> bool: """Check whether there are pending operations. @@ -475,10 +483,14 @@ class ManagerOperationStore: Queue transitions and storage details remain encapsulated behind this API. 
""" - def __init__(self, keys: RollingOpsKeys, owner: str): - self._pending = EtcdOperationQueue(keys.pending, keys.lock_key, owner) - self._inprogress = EtcdOperationQueue(keys.inprogress, keys.lock_key, owner) - self._completed = EtcdOperationQueue(keys.completed, keys.lock_key, owner) + def __init__(self, keys: RollingOpsKeys, owner: str, base_dir: pathops.LocalPath): + self._pending = EtcdOperationQueue(keys.pending, keys.lock_key, owner, base_dir=base_dir) + self._inprogress = EtcdOperationQueue( + keys.inprogress, keys.lock_key, owner, base_dir=base_dir + ) + self._completed = EtcdOperationQueue( + keys.completed, keys.lock_key, owner, base_dir=base_dir + ) def request(self, operation: Operation) -> None: """Add a new operation to the pending queue. diff --git a/rollingops/src/charmlibs/rollingops/etcd/_etcdctl.py b/rollingops/src/charmlibs/rollingops/etcd/_etcdctl.py index e5eb4f1c3..375d7575e 100644 --- a/rollingops/src/charmlibs/rollingops/etcd/_etcdctl.py +++ b/rollingops/src/charmlibs/rollingops/etcd/_etcdctl.py @@ -25,7 +25,6 @@ import shutil import subprocess from dataclasses import asdict -from functools import lru_cache from tenacity import ( before_sleep_log, @@ -48,360 +47,358 @@ logger = logging.getLogger(__name__) -BASE_DIR = pathops.LocalPath('/var/lib/rollingops/etcd') -SERVER_CA_PATH = BASE_DIR / 'server-ca.pem' -CONFIG_FILE_PATH = BASE_DIR / 'etcdctl.json' ETCDCTL_CMD = 'etcdctl' ETCDCTL_TIMEOUT_SECONDS = 15 ETCDCTL_RETRY_ATTEMPTS = 12 ETCDCTL_RETRY_WAIT_SECONDS = 5 -@lru_cache(maxsize=1) -def is_etcdctl_installed() -> bool: - """Return whether the snap-provided etcdctl command is available.""" - return shutil.which(ETCDCTL_CMD) is not None - - -def write_trusted_server_ca(tls_ca_pem: str) -> None: - """Persist the etcd server CA certificate to disk. - - Args: - tls_ca_pem: PEM-encoded CA certificate. 
- - Raises: - PebbleConnectionError: if the remote container cannot be reached - RollingOpsFileSystemError: if there is a problem when writing the certificates - """ - try: - with_pebble_retry(lambda: BASE_DIR.mkdir(parents=True, exist_ok=True)) - with_pebble_retry(lambda: SERVER_CA_PATH.write_text(tls_ca_pem, mode=CERT_MODE)) - except (FileNotFoundError, LookupError, NotADirectoryError, PermissionError) as e: - raise RollingOpsFileSystemError('Failed to persist etcd trusted CA certificate.') from e - - -def write_config_file( - endpoints: str, - client_cert_path: pathops.LocalPath, - client_key_path: pathops.LocalPath, -) -> None: - """Create or update the etcdctl configuration JSON file. - - This function writes a JSON file containing the required ETCDCTL_* - variables used by etcdctl to connect to the etcd cluster. - - Args: - endpoints: Comma-separated list of etcd endpoints. - client_cert_path: Path to the client certificate. - client_key_path: Path to the client private key. - - Raises: - PebbleConnectionError: if the remote container cannot be reached - RollingOpsFileSystemError: if there is a problem when writing the certificates - """ - config = EtcdConfig( - endpoints=endpoints, - cacert_path=str(SERVER_CA_PATH), - cert_path=str(client_cert_path), - key_path=str(client_key_path), - ) - - try: - with_pebble_retry(lambda: BASE_DIR.mkdir(parents=True, exist_ok=True)) - with_pebble_retry( - lambda: CONFIG_FILE_PATH.write_text(json.dumps(asdict(config), indent=2), mode=0o600) +class Etcdctl: + def __init__(self, base_dir: pathops.LocalPath): + self.base_dir = base_dir / 'etcd' + self.server_ca_path = self.base_dir / 'server-ca.pem' + self.config_file_path = self.base_dir / 'etcdctl.json' + + def is_etcdctl_installed(self) -> bool: + """Return whether the snap-provided etcdctl command is available.""" + return shutil.which(ETCDCTL_CMD) is not None + + def write_trusted_server_ca(self, tls_ca_pem: str) -> None: + """Persist the etcd server CA certificate to 
disk. + + Args: + tls_ca_pem: PEM-encoded CA certificate. + + Raises: + PebbleConnectionError: if the remote container cannot be reached + RollingOpsFileSystemError: if there is a problem when writing the certificates + """ + try: + with_pebble_retry(lambda: self.base_dir.mkdir(parents=True, exist_ok=True)) + with_pebble_retry(lambda: self.server_ca_path.write_text(tls_ca_pem, mode=CERT_MODE)) + except (FileNotFoundError, LookupError, NotADirectoryError, PermissionError) as e: + raise RollingOpsFileSystemError( + 'Failed to persist etcd trusted CA certificate.' + ) from e + + def write_config_file( + self, + endpoints: str, + client_cert_path: pathops.LocalPath, + client_key_path: pathops.LocalPath, + ) -> None: + """Create or update the etcdctl configuration JSON file. + + This function writes a JSON file containing the required ETCDCTL_* + variables used by etcdctl to connect to the etcd cluster. + + Args: + endpoints: Comma-separated list of etcd endpoints. + client_cert_path: Path to the client certificate. + client_key_path: Path to the client private key. + + Raises: + PebbleConnectionError: if the remote container cannot be reached + RollingOpsFileSystemError: if there is a problem when writing the certificates + """ + config = EtcdConfig( + endpoints=endpoints, + cacert_path=str(self.server_ca_path), + cert_path=str(client_cert_path), + key_path=str(client_key_path), ) - except (FileNotFoundError, LookupError, NotADirectoryError, PermissionError) as e: - raise RollingOpsFileSystemError('Failed to persist etcd config file.') from e - - -def _load_config() -> EtcdConfig: - """Load etcd configuration from disk. - - Raises: - RollingOpsEtcdNotConfiguredError: If the config file does not exist. - RollingOpsFileSystemError: if we faile to read the etcd configuration file or - file cannot be deserialized. 
- PebbleConnectionError: if the remote container cannot be reached - """ - if not with_pebble_retry(lambda: CONFIG_FILE_PATH.exists()): - raise RollingOpsEtcdNotConfiguredError( - f'etcdctl config file does not exist: {CONFIG_FILE_PATH}' - ) - - try: - data = json.loads(CONFIG_FILE_PATH.read_text()) - return EtcdConfig(**data) - except FileNotFoundError as e: - raise RollingOpsEtcdNotConfiguredError('etcd configuration file not found.') from e - except (IsADirectoryError, PermissionError) as e: - raise RollingOpsFileSystemError('Failed to read the etcd config file.') from e - except (json.JSONDecodeError, TypeError) as e: - raise RollingOpsFileSystemError('Invalid etcd configuration file format.') from e - - -def load_env() -> dict[str, str]: - """Return environment variables for etcdctl. - - Returns: A dictionary containing environment variables to pass to subprocess calls. - - Raises: - RollingOpsEtcdNotConfiguredError: If the environment file does not exist. - RollingOpsFileSystemError: if we fail to read the etcd configuration file or - the file cannot be deserialized. - PebbleConnectionError: if the remote container cannot be reached - """ - config = _load_config() - - env = os.environ.copy() - env.update({ - 'ETCDCTL_API': '3', - 'ETCDCTL_ENDPOINTS': config.endpoints, - 'ETCDCTL_CACERT': config.cacert_path, - 'ETCDCTL_CERT': config.cert_path, - 'ETCDCTL_KEY': config.key_path, - }) - return env - - -def ensure_initialized(): - """Checks whether the etcd config file for etcdctl is setup. - - Raises: - RollingOpsEtcdNotConfiguredError: if the etcd config file does not exist, etcd - server CA does not exist or etcdctl is not installed. - PebbleConnectionError: if the remote container cannot be reached. 
- """ - if not with_pebble_retry(lambda: CONFIG_FILE_PATH.exists()): - raise RollingOpsEtcdNotConfiguredError( - f'etcdctl config file does not exist: {CONFIG_FILE_PATH}' - ) - if not with_pebble_retry(lambda: SERVER_CA_PATH.exists()): - raise RollingOpsEtcdNotConfiguredError( - f'etcdctl server CA file does not exist: {SERVER_CA_PATH}' - ) - if not is_etcdctl_installed(): - raise RollingOpsEtcdNotConfiguredError(f'{ETCDCTL_CMD} is not installed.') - - -def cleanup() -> None: - """Removes the etcdctl env file and the trusted etcd server CA. - - Raises: - RollingOpsFileSystemError: if there is a problem when deleting the files. - PebbleConnectionError: if the remote container cannot be reached. - """ - try: - with_pebble_retry(lambda: SERVER_CA_PATH.unlink(missing_ok=True)) - with_pebble_retry(lambda: CONFIG_FILE_PATH.unlink(missing_ok=True)) - except (IsADirectoryError, PermissionError) as e: - raise RollingOpsFileSystemError('Failed to remove etcd config file and CA.') from e - - -def _is_retryable_stderr(stderr: str) -> bool: - """Return whether stderr looks like a transient etcd/client failure.""" - text = stderr.lower() - retryable_markers = ( - 'connection refused', - 'context deadline exceeded', - 'deadline exceeded', - 'temporarily unavailable', - 'transport is closing', - 'connection reset', - 'broken pipe', - 'unavailable', - 'leader changed', - 'etcdserver: request timed out', - ) - return any(marker in text for marker in retryable_markers) + try: + with_pebble_retry(lambda: self.base_dir.mkdir(parents=True, exist_ok=True)) + with_pebble_retry( + lambda: self.config_file_path.write_text( + json.dumps(asdict(config), indent=2), mode=0o600 + ) + ) + except (FileNotFoundError, LookupError, NotADirectoryError, PermissionError) as e: + raise RollingOpsFileSystemError('Failed to persist etcd config file.') from e + + def _load_config(self) -> EtcdConfig: + """Load etcd configuration from disk. 
+ + Raises: + RollingOpsEtcdNotConfiguredError: If the config file does not exist. + RollingOpsFileSystemError: if we faile to read the etcd configuration file or + file cannot be deserialized. + PebbleConnectionError: if the remote container cannot be reached + """ + if not with_pebble_retry(lambda: self.config_file_path.exists()): + raise RollingOpsEtcdNotConfiguredError( + f'etcdctl config file does not exist: {self.config_file_path}' + ) -@retry( - retry=retry_if_exception_type(RollingOpsEtcdctlRetryableError), - stop=stop_after_attempt(ETCDCTL_RETRY_ATTEMPTS), - wait=wait_fixed(ETCDCTL_RETRY_WAIT_SECONDS), - before_sleep=before_sleep_log(logger, logging.WARNING), - reraise=True, -) -def _run_checked(*args: str, cmd_input: str | None = None) -> subprocess.CompletedProcess[str]: - """Execute etcdctl and return the completed process. - - Raises: - RollingOpsEtcdNotConfiguredError: if etcdctl is not configured. - PebbleConnectionError: if the remote container cannot be reached. - RollingOpsEtcdctlRetryableError: for transient command failures. - RollingOpsEtcdctlFatalError: for non-retryable command failures. 
- """ - ensure_initialized() - - cmd = [ETCDCTL_CMD, *args] - - try: - res = subprocess.run( - cmd, - env=load_env(), - input=cmd_input, - text=True, - capture_output=True, - check=False, - timeout=ETCDCTL_TIMEOUT_SECONDS, - ) - except subprocess.TimeoutExpired as e: - logger.warning( - 'Timed out running etcdctl: cmd=%r stdout=%r stderr=%r', cmd, e.stdout, e.stderr - ) - raise RollingOpsEtcdctlRetryableError(f'Timed out running etcdctl: {cmd!r}') from e - except FileNotFoundError as e: - logger.exception('etcdctl executable not found: %s', ETCDCTL_CMD) - raise RollingOpsEtcdctlFatalError(f'etcdctl executable not found: {ETCDCTL_CMD}') from e - except OSError as e: - logger.exception('Failed to execute etcdctl: cmd=%r', cmd) - raise RollingOpsEtcdctlFatalError(f'Failed to execute etcdctl: {cmd!r}') from e - - if res.returncode != 0: - logger.warning( - 'etcdctl command failed: cmd=%r returncode=%s stdout=%r stderr=%r', - cmd, - res.returncode, - res.stdout, - res.stderr, - ) - if _is_retryable_stderr(res.stderr): - raise RollingOpsEtcdctlRetryableError( - f'Retryable etcdctl failure (rc={res.returncode}): {res.stderr.strip()}' + try: + data = json.loads(self.config_file_path.read_text()) + return EtcdConfig(**data) + except FileNotFoundError as e: + raise RollingOpsEtcdNotConfiguredError('etcd configuration file not found.') from e + except (IsADirectoryError, PermissionError) as e: + raise RollingOpsFileSystemError('Failed to read the etcd config file.') from e + except (json.JSONDecodeError, TypeError) as e: + raise RollingOpsFileSystemError('Invalid etcd configuration file format.') from e + + def load_env(self) -> dict[str, str]: + """Return environment variables for etcdctl. + + Returns: A dictionary containing environment variables to pass to subprocess calls. + + Raises: + RollingOpsEtcdNotConfiguredError: If the environment file does not exist. + RollingOpsFileSystemError: if we fail to read the etcd configuration file or + the file cannot be deserialized. 
+ PebbleConnectionError: if the remote container cannot be reached + """ + config = self._load_config() + + env = os.environ.copy() + env.update({ + 'ETCDCTL_API': '3', + 'ETCDCTL_ENDPOINTS': config.endpoints, + 'ETCDCTL_CACERT': config.cacert_path, + 'ETCDCTL_CERT': config.cert_path, + 'ETCDCTL_KEY': config.key_path, + }) + return env + + def ensure_initialized(self): + """Checks whether the etcd config file for etcdctl is setup. + + Raises: + RollingOpsEtcdNotConfiguredError: if the etcd config file does not exist, etcd + server CA does not exist or etcdctl is not installed. + PebbleConnectionError: if the remote container cannot be reached. + """ + if not with_pebble_retry(lambda: self.config_file_path.exists()): + raise RollingOpsEtcdNotConfiguredError( + f'etcdctl config file does not exist: {self.config_file_path}' ) - raise RollingOpsEtcdctlFatalError( - f'etcdctl failed (rc={res.returncode}): {res.stderr.strip()}' + if not with_pebble_retry(lambda: self.server_ca_path.exists()): + raise RollingOpsEtcdNotConfiguredError( + f'etcdctl server CA file does not exist: {self.server_ca_path}' + ) + if not self.is_etcdctl_installed(): + raise RollingOpsEtcdNotConfiguredError(f'{ETCDCTL_CMD} is not installed.') + + def cleanup(self) -> None: + """Removes the etcdctl env file and the trusted etcd server CA. + + Raises: + RollingOpsFileSystemError: if there is a problem when deleting the files. + PebbleConnectionError: if the remote container cannot be reached. 
+ """ + try: + with_pebble_retry(lambda: self.server_ca_path.unlink(missing_ok=True)) + with_pebble_retry(lambda: self.config_file_path.unlink(missing_ok=True)) + except (IsADirectoryError, PermissionError) as e: + raise RollingOpsFileSystemError('Failed to remove etcd config file and CA.') from e + + def _is_retryable_stderr(self, stderr: str) -> bool: + """Return whether stderr looks like a transient etcd/client failure.""" + text = stderr.lower() + retryable_markers = ( + 'connection refused', + 'context deadline exceeded', + 'deadline exceeded', + 'temporarily unavailable', + 'transport is closing', + 'connection reset', + 'broken pipe', + 'unavailable', + 'leader changed', + 'etcdserver: request timed out', ) + return any(marker in text for marker in retryable_markers) + + @retry( + retry=retry_if_exception_type(RollingOpsEtcdctlRetryableError), + stop=stop_after_attempt(ETCDCTL_RETRY_ATTEMPTS), + wait=wait_fixed(ETCDCTL_RETRY_WAIT_SECONDS), + before_sleep=before_sleep_log(logger, logging.WARNING), + reraise=True, + ) + def _run_checked( + self, *args: str, cmd_input: str | None = None + ) -> subprocess.CompletedProcess[str]: + """Execute etcdctl and return the completed process. + + Raises: + RollingOpsEtcdNotConfiguredError: if etcdctl is not configured. + PebbleConnectionError: if the remote container cannot be reached. + RollingOpsEtcdctlRetryableError: for transient command failures. + RollingOpsEtcdctlFatalError: for non-retryable command failures. 
+ """ + self.ensure_initialized() + + cmd = [ETCDCTL_CMD, *args] + + try: + res = subprocess.run( + cmd, + env=self.load_env(), + input=cmd_input, + text=True, + capture_output=True, + check=False, + timeout=ETCDCTL_TIMEOUT_SECONDS, + ) + except subprocess.TimeoutExpired as e: + logger.warning( + 'Timed out running etcdctl: cmd=%r stdout=%r stderr=%r', cmd, e.stdout, e.stderr + ) + raise RollingOpsEtcdctlRetryableError(f'Timed out running etcdctl: {cmd!r}') from e + except FileNotFoundError as e: + logger.exception('etcdctl executable not found: %s', ETCDCTL_CMD) + raise RollingOpsEtcdctlFatalError( + f'etcdctl executable not found: {ETCDCTL_CMD}' + ) from e + except OSError as e: + logger.exception('Failed to execute etcdctl: cmd=%r', cmd) + raise RollingOpsEtcdctlFatalError(f'Failed to execute etcdctl: {cmd!r}') from e + + if res.returncode != 0: + logger.warning( + 'etcdctl command failed: cmd=%r returncode=%s stdout=%r stderr=%r', + cmd, + res.returncode, + res.stdout, + res.stderr, + ) + if self._is_retryable_stderr(res.stderr): + raise RollingOpsEtcdctlRetryableError( + f'Retryable etcdctl failure (rc={res.returncode}): {res.stderr.strip()}' + ) + raise RollingOpsEtcdctlFatalError( + f'etcdctl failed (rc={res.returncode}): {res.stderr.strip()}' + ) - logger.debug('etcdctl command succeeded: cmd=%r stdout=%r', cmd, res.stdout) - return res - - -def run(*args: str, cmd_input: str | None = None) -> str: - """Execute an etcdctl command. - - Args: - args: List of arguments to pass to etcdctl. - cmd_input: value to use as input when running the command. - - Returns: - The stdout of the command, stripped, or None if execution failed. - - Raises: - RollingOpsEtcdNotConfiguredError: if etcdctl is not configured. - RollingOpsFileSystemError: if configuration cannot be read. - PebbleConnectionError: if the remote container cannot be reached. - RollingOpsEtcdctlError: etcdctl command error. 
- """ - return _run_checked(*args, cmd_input=cmd_input).stdout.strip() + logger.debug('etcdctl command succeeded: cmd=%r stdout=%r', cmd, res.stdout) + return res + def run(self, *args: str, cmd_input: str | None = None) -> str: + """Execute an etcdctl command. -def _get_key_value_pair(key_prefix: str, *extra_args: str) -> EtcdKV | None: - """Retrieve the first key and value under a given prefix. + Args: + args: List of arguments to pass to etcdctl. + cmd_input: value to use as input when running the command. - Args: - key_prefix: Key prefix to search for. - extra_args: Arguments to the get command + Returns: + The stdout of the command, stripped, or None if execution failed. - Returns: - A EtcdKV containing: - - The key string - - The parsed JSON value as a dictionary + Raises: + RollingOpsEtcdNotConfiguredError: if etcdctl is not configured. + RollingOpsFileSystemError: if configuration cannot be read. + PebbleConnectionError: if the remote container cannot be reached. + RollingOpsEtcdctlError: etcdctl command error. + """ + return self._run_checked(*args, cmd_input=cmd_input).stdout.strip() - Returns None if no key exists. + def _get_key_value_pair(self, key_prefix: str, *extra_args: str) -> EtcdKV | None: + """Retrieve the first key and value under a given prefix. - Raises: - RollingOpsEtcdctlParseError: if the output is malformed + Args: + key_prefix: Key prefix to search for. + extra_args: Arguments to the get command - """ - res = run('get', key_prefix, '--prefix', *extra_args) - out = res.splitlines() - if len(out) < 2: - return None + Returns: + A EtcdKV containing: + - The key string + - The parsed JSON value as a dictionary - try: - value = json.loads(out[1]) - except json.JSONDecodeError as e: - raise RollingOpsEtcdctlParseError( - f'Failed to parse JSON value for key {out[0]}: {out[1]}' - ) from e + Returns None if no key exists. 
- return EtcdKV(key=out[0], value=value) + Raises: + RollingOpsEtcdctlParseError: if the output is malformed + """ + res = self.run('get', key_prefix, '--prefix', *extra_args) + out = res.splitlines() + if len(out) < 2: + return None -def get_first_key_value_pair(key_prefix: str) -> EtcdKV | None: - """Retrieve the first key and value under a given prefix. + try: + value = json.loads(out[1]) + except json.JSONDecodeError as e: + raise RollingOpsEtcdctlParseError( + f'Failed to parse JSON value for key {out[0]}: {out[1]}' + ) from e - Args: - key_prefix: Key prefix to search for. + return EtcdKV(key=out[0], value=value) - Returns: - A tuple containing: - - The key string - - The parsed JSON value as a dictionary + def get_first_key_value_pair(self, key_prefix: str) -> EtcdKV | None: + """Retrieve the first key and value under a given prefix. - Returns None if no key exists or the command fails. + Args: + key_prefix: Key prefix to search for. - Raises: - RollingOpsEtcdctlParseError: if the output is malformed - """ - return _get_key_value_pair(key_prefix, '--limit=1') + Returns: + A tuple containing: + - The key string + - The parsed JSON value as a dictionary + Returns None if no key exists or the command fails. -def get_last_key_value_pair(key_prefix: str) -> EtcdKV | None: - """Retrieve the last key and value under a given prefix. + Raises: + RollingOpsEtcdctlParseError: if the output is malformed + """ + return self._get_key_value_pair(key_prefix, '--limit=1') - Args: - key_prefix: Key prefix to search for. + def get_last_key_value_pair(self, key_prefix: str) -> EtcdKV | None: + """Retrieve the last key and value under a given prefix. - Returns: - A tuple containing: - - The key string - - The parsed JSON value as a dictionary + Args: + key_prefix: Key prefix to search for. - Returns None if no key exists or the command fails. 
+ Returns: + A tuple containing: + - The key string + - The parsed JSON value as a dictionary - Raises: - RollingOpsEtcdctlParseError: if the output is malformed - """ - return _get_key_value_pair( - key_prefix, - '--sort-by=KEY', - '--order=DESCEND', - '--limit=1', - ) + Returns None if no key exists or the command fails. + Raises: + RollingOpsEtcdctlParseError: if the output is malformed + """ + return self._get_key_value_pair( + key_prefix, + '--sort-by=KEY', + '--order=DESCEND', + '--limit=1', + ) -def txn(txn_input: str) -> bool: - """Execute an etcd transaction. + def txn(self, txn_input: str) -> bool: + """Execute an etcd transaction. - The transaction string should follow the etcdctl transaction format - where comparison statements are followed by operations. + The transaction string should follow the etcdctl transaction format + where comparison statements are followed by operations. - Args: - txn_input: The transaction specification passed to `etcdctl txn`. + Args: + txn_input: The transaction specification passed to `etcdctl txn`. - Returns: - True if the transaction succeeded, otherwise False. + Returns: + True if the transaction succeeded, otherwise False. - Raises: - RollingOpsEtcdNotConfiguredError: if etcdctl is not configured. - PebbleConnectionError: if the remote container cannot be reached. - RollingOpsEtcdctlError: etcdctl command error. - RollingOpsEtcdctlParseError: if invalid response is found - """ - res = _run_checked('txn', cmd_input=txn_input) + Raises: + RollingOpsEtcdNotConfiguredError: if etcdctl is not configured. + PebbleConnectionError: if the remote container cannot be reached. + RollingOpsEtcdctlError: etcdctl command error. 
+ RollingOpsEtcdctlParseError: if invalid response is found + """ + res = self._run_checked('txn', cmd_input=txn_input) - lines = res.stdout.splitlines() - if not lines: - raise RollingOpsEtcdctlParseError('Empty txn response') + lines = res.stdout.splitlines() + if not lines: + raise RollingOpsEtcdctlParseError('Empty txn response') - first_line = lines[0].strip() + first_line = lines[0].strip() - if first_line == 'SUCCESS': - return True - if first_line == 'FAILURE': - return False + if first_line == 'SUCCESS': + return True + if first_line == 'FAILURE': + return False - raise RollingOpsEtcdctlParseError(f'Unexpected txn response: {res.stdout}') + raise RollingOpsEtcdctlParseError(f'Unexpected txn response: {res.stdout}') diff --git a/rollingops/src/charmlibs/rollingops/etcd/_relations.py b/rollingops/src/charmlibs/rollingops/etcd/_relations.py index 8d37f92d8..a41edf92c 100644 --- a/rollingops/src/charmlibs/rollingops/etcd/_relations.py +++ b/rollingops/src/charmlibs/rollingops/etcd/_relations.py @@ -31,10 +31,11 @@ ) from ops.framework import Object +from charmlibs import pathops from charmlibs.interfaces.tls_certificates import Certificate, TLSCertificatesError from charmlibs.rollingops.common._exceptions import RollingOpsInvalidSecretContentError -from charmlibs.rollingops.etcd import _certificates as certificates -from charmlibs.rollingops.etcd import _etcdctl as etcdctl +from charmlibs.rollingops.etcd._certificates import CertificateStore +from charmlibs.rollingops.etcd._etcdctl import Etcdctl from charmlibs.rollingops.etcd._models import SharedCertificate logger = logging.getLogger(__name__) @@ -48,10 +49,13 @@ class SharedClientCertificateManager(Object): """Manage the shared rollingops client certificate via peer relation secret.""" - def __init__(self, charm: CharmBase, peer_relation_name: str) -> None: + def __init__( + self, charm: CharmBase, peer_relation_name: str, base_dir: pathops.LocalPath + ) -> None: super().__init__(charm, 
'shared-client-certificate') self.charm = charm self.peer_relation_name = peer_relation_name + self.certificates_store = CertificateStore(base_dir) self.framework.observe(charm.on.leader_elected, self._on_leader_elected) self.framework.observe( @@ -112,7 +116,7 @@ def create_and_share_certificate(self) -> None: ) return - shared = certificates.generate(self.model.uuid, self.model.app.name) + shared = self.certificates_store.generate(self.model.uuid, self.model.app.name) secret = self.model.app.add_secret( content={ @@ -176,7 +180,7 @@ def sync_to_local_files(self) -> None: logger.info('Shared rollingops etcd client certificate is not available yet.') return - certificates.persist_client_cert_key_and_ca(shared) + self.certificates_store.persist_client_cert_key_and_ca(shared) def get_local_request_cert(self) -> Certificate | None: """Return the cert to place in relation requests.""" @@ -193,11 +197,14 @@ def __init__( relation_name: str, cluster_id: str, shared_certificates: SharedClientCertificateManager, + base_dir: pathops.LocalPath, ) -> None: super().__init__(charm, f'requirer-{relation_name}') self.charm = charm self.cluster_id = cluster_id self.shared_certificates = shared_certificates + self.certificates_store = CertificateStore(base_dir) + self.etcdctl = Etcdctl(base_dir) self.etcd_interface = ResourceRequirerEventHandler( self.charm, @@ -220,7 +227,7 @@ def etcd_relation(self) -> Relation | None: def _on_relation_broken(self, event: RelationBrokenEvent) -> None: """Remove the stored information about the etcd server.""" - etcdctl.cleanup() + self.etcdctl.cleanup() def _on_endpoints_changed( self, event: ResourceEndpointsChangedEvent[ResourceProviderModel] @@ -240,10 +247,10 @@ def _on_endpoints_changed( logger.info('etcd endpoints changed: %s', response.endpoints) - etcdctl.write_config_file( + self.etcdctl.write_config_file( endpoints=response.endpoints, - client_cert_path=certificates.CLIENT_CERT_PATH, - client_key_path=certificates.CLIENT_KEY_PATH, + 
client_cert_path=self.certificates_store.cert_path, + client_key_path=self.certificates_store.key_path, ) def _on_resource_created(self, event: ResourceCreatedEvent[ResourceProviderModel]) -> None: @@ -260,16 +267,16 @@ def _on_resource_created(self, event: ResourceCreatedEvent[ResourceProviderModel ) return - etcdctl.write_trusted_server_ca(tls_ca_pem=response.tls_ca) + self.etcdctl.write_trusted_server_ca(tls_ca_pem=response.tls_ca) if not response.endpoints: logger.error('Received a resource created event but no etcd endpoints available.') return - etcdctl.write_config_file( + self.etcdctl.write_config_file( endpoints=response.endpoints, - client_cert_path=certificates.CLIENT_CERT_PATH, - client_key_path=certificates.CLIENT_KEY_PATH, + client_cert_path=self.certificates_store.cert_path, + client_key_path=self.certificates_store.key_path, ) def client_requests(self) -> list[RequirerCommonModel]: diff --git a/rollingops/src/charmlibs/rollingops/etcd/_rollingops.py b/rollingops/src/charmlibs/rollingops/etcd/_rollingops.py index 9d45b183a..8237e0a05 100644 --- a/rollingops/src/charmlibs/rollingops/etcd/_rollingops.py +++ b/rollingops/src/charmlibs/rollingops/etcd/_rollingops.py @@ -16,6 +16,7 @@ import logging import time +from charmlibs import pathops from charmlibs.rollingops.common._models import OperationResult from charmlibs.rollingops.common._utils import ( ETCD_FAILED_HOOK_NAME, @@ -64,6 +65,12 @@ def main(): before exiting. 
""" parser = argparse.ArgumentParser(description='RollingOps etcd worker') + parser.add_argument( + '--base-dir', + type=pathops.LocalPath, + required=True, + help='Base directory used to store all rollingops files.', + ) parser.add_argument( '--unit-name', type=str, @@ -91,17 +98,22 @@ def main(): ) args = parser.parse_args() + base_dir = args.base_dir setup_logging( - ETCD_LOG_FILENAME, unit_name=args.unit_name, owner=args.owner, cluster_id=args.cluster_id + base_dir=base_dir, + log_filename=ETCD_LOG_FILENAME, + unit_name=args.unit_name, + owner=args.owner, + cluster_id=args.cluster_id, ) logger.info('Starting worker.') time.sleep(INITIAL_SLEEP) keys = RollingOpsKeys.for_owner(args.cluster_id, args.owner) - lock = EtcdLock(keys.lock_key, args.owner) - lease = EtcdLease() - operations = WorkerOperationStore(keys, args.owner) + lock = EtcdLock(keys.lock_key, args.owner, base_dir) + lease = EtcdLease(base_dir) + operations = WorkerOperationStore(keys, args.owner, base_dir) try: while True: diff --git a/rollingops/src/charmlibs/rollingops/etcd/_worker.py b/rollingops/src/charmlibs/rollingops/etcd/_worker.py index 8240b4378..0ed52848b 100644 --- a/rollingops/src/charmlibs/rollingops/etcd/_worker.py +++ b/rollingops/src/charmlibs/rollingops/etcd/_worker.py @@ -23,7 +23,8 @@ logger = logging.getLogger(__name__) -ETCD_LOG_FILENAME = '/var/log/etcd_rollingops_worker.log' +ETCD_LOG_FILENAME = 'etcd_rollingops_worker.log' +WORKER_PID_FIELD = 'etcd-rollingops-worker-pid' class EtcdRollingOpsAsyncWorker(BaseRollingOpsAsyncWorker): @@ -35,11 +36,23 @@ class EtcdRollingOpsAsyncWorker(BaseRollingOpsAsyncWorker): manage its own worker lifecycle. 
""" - _pid_field = 'etcd-rollingops-worker-pid' + _pid_field = WORKER_PID_FIELD _log_filename = ETCD_LOG_FILENAME - def __init__(self, charm: CharmBase, peer_relation_name: str, owner: str, cluster_id: str): - super().__init__(charm, 'etcd-rollingops-async-worker', peer_relation_name) + def __init__( + self, + charm: CharmBase, + peer_relation_name: str, + owner: str, + cluster_id: str, + base_dir: pathops.LocalPath, + ): + super().__init__( + charm, + 'etcd-rollingops-async-worker', + peer_relation_name, + base_dir=base_dir, + ) self._owner = owner self._cluster_id = cluster_id diff --git a/rollingops/src/charmlibs/rollingops/peer/_backend.py b/rollingops/src/charmlibs/rollingops/peer/_backend.py index 310285c95..23f14def9 100644 --- a/rollingops/src/charmlibs/rollingops/peer/_backend.py +++ b/rollingops/src/charmlibs/rollingops/peer/_backend.py @@ -160,6 +160,7 @@ def _on_restart_action(self, event) -> None: ) from ops.framework import EventBase +from charmlibs import pathops from charmlibs.rollingops.common._exceptions import ( RollingOpsDecodingError, RollingOpsInvalidLockRequestError, @@ -197,7 +198,11 @@ class PeerRollingOpsBackend(Object): """ def __init__( - self, charm: CharmBase, relation_name: str, callback_targets: dict[str, Callable[..., Any]] + self, + charm: CharmBase, + relation_name: str, + callback_targets: dict[str, Callable[..., Any]], + base_dir: pathops.LocalPath, ): """Initialize the peer-backed rolling-ops backend. @@ -207,12 +212,15 @@ def __init__( operation state. callback_targets: Mapping from callback identifiers to callables executed when this unit is granted the lock. + base_dir: base directory where all files related to rollingops will be written. 
""" super().__init__(charm, 'peer-rolling-ops-manager') self._charm = charm self.relation_name = relation_name self.callback_targets = callback_targets - self.worker = PeerRollingOpsAsyncWorker(charm, relation_name=relation_name) + self.worker = PeerRollingOpsAsyncWorker( + charm, relation_name=relation_name, base_dir=base_dir + ) self.framework.observe( charm.on[self.relation_name].relation_changed, self._on_relation_changed @@ -571,7 +579,7 @@ def get_status(self) -> RollingOpsStatus: and from the shared peer lock state. Returned values: - - UNAVAILABLE: the peer relation does not exist + - NOT_READY: the peer relation does not exist - GRANTED: the current unit holds the peer lock - WAITING: the current unit has queued work but does not hold the lock - IDLE: the current unit has no pending work @@ -580,7 +588,7 @@ def get_status(self) -> RollingOpsStatus: The current rolling-ops status for this unit. """ if self._relation is None: - return RollingOpsStatus.UNAVAILABLE + return RollingOpsStatus.NOT_READY lock = self._lock() operations = self._operations(self.model.unit) diff --git a/rollingops/src/charmlibs/rollingops/peer/_rollingops.py b/rollingops/src/charmlibs/rollingops/peer/_rollingops.py index 8dae3c0f1..970fcf686 100644 --- a/rollingops/src/charmlibs/rollingops/peer/_rollingops.py +++ b/rollingops/src/charmlibs/rollingops/peer/_rollingops.py @@ -17,6 +17,7 @@ import argparse import time +from charmlibs import pathops from charmlibs.rollingops.common._utils import dispatch_lock_granted, setup_logging from charmlibs.rollingops.peer._worker import PEER_LOG_FILENAME @@ -24,6 +25,12 @@ def main(): """Juju hook event dispatcher.""" parser = argparse.ArgumentParser(description='RollingOps peer worker') + parser.add_argument( + '--base-dir', + type=pathops.LocalPath, + required=True, + help='Base directory used to store all rollingops files.', + ) parser.add_argument( '--unit-name', type=str, @@ -37,7 +44,8 @@ def main(): help='Path to the charm directory', ) args 
= parser.parse_args() - setup_logging(PEER_LOG_FILENAME, unit_name=args.unit_name) + base_dir = args.base_dir + setup_logging(base_dir=base_dir, log_filename=PEER_LOG_FILENAME, unit_name=args.unit_name) # Sleep so that the leader unit can properly leave the hook and start a new one time.sleep(10) diff --git a/rollingops/src/charmlibs/rollingops/peer/_worker.py b/rollingops/src/charmlibs/rollingops/peer/_worker.py index ca4da358c..430613627 100644 --- a/rollingops/src/charmlibs/rollingops/peer/_worker.py +++ b/rollingops/src/charmlibs/rollingops/peer/_worker.py @@ -26,7 +26,8 @@ logger = logging.getLogger(__name__) -PEER_LOG_FILENAME = '/var/log/peer_rollingops_worker.log' +PEER_LOG_FILENAME = 'peer_rollingops_worker.log' +WORKER_PID_FIELD = 'peer-rollingops-worker-pid' class PeerRollingOpsAsyncWorker(BaseRollingOpsAsyncWorker): @@ -38,11 +39,16 @@ class PeerRollingOpsAsyncWorker(BaseRollingOpsAsyncWorker): stop, or restart an existing worker process as needed. """ - _pid_field = 'peer-rollingops-worker-pid' + _pid_field = WORKER_PID_FIELD _log_filename = PEER_LOG_FILENAME - def __init__(self, charm: CharmBase, relation_name: str): - super().__init__(charm, 'peer-rollingops-async-worker', relation_name) + def __init__(self, charm: CharmBase, relation_name: str, base_dir: pathops.LocalPath): + super().__init__( + charm, + 'peer-rollingops-async-worker', + relation_name, + base_dir=base_dir, + ) @property def _app_data(self) -> RelationDataContent: diff --git a/rollingops/tests/integration/test_etcd_rolling_ops.py b/rollingops/tests/integration/test_etcd_rolling_ops.py index 4172d416e..9036ab4cf 100644 --- a/rollingops/tests/integration/test_etcd_rolling_ops.py +++ b/rollingops/tests/integration/test_etcd_rolling_ops.py @@ -31,8 +31,8 @@ logger = logging.getLogger(__name__) TIMEOUT = 15 * 60.0 -ETCD_PROCESS_LOGS = '/var/log/etcd_rollingops_worker.log' -PEER_PROCCES_LOGS = '/var/log/peer_rollingops_worker.log' +ETCD_PROCESS_LOGS = 
'/var/lib/rollingops/etcd_rollingops_worker.log' +PEER_PROCCES_LOGS = '/var/lib/rollingops/peer_rollingops_worker.log' ETCD_CONFIG_FILE = '/var/lib/rollingops/etcd/etcdctl.json' @@ -61,11 +61,7 @@ def test_charm_is_integrated_with_etcd(juju: jubilant.Juju, app_name: str): app='self-signed-certificates', channel='1/stable', ) - juju.deploy( - 'charmed-etcd', - app='etcd', - channel='3.6/stable', - ) + juju.deploy('charmed-etcd', app='etcd', channel='3.6/stable', num_units=3) juju.wait(jubilant.all_active, error=jubilant.any_error, timeout=TIMEOUT) juju.integrate( diff --git a/rollingops/tests/integration/test_peer_rolling_ops.py b/rollingops/tests/integration/test_peer_rolling_ops.py index dcd527b61..bc032b724 100644 --- a/rollingops/tests/integration/test_peer_rolling_ops.py +++ b/rollingops/tests/integration/test_peer_rolling_ops.py @@ -215,7 +215,7 @@ def test_retry_release_alternates_execution(juju: jubilant.Juju, app_name: str): juju.run(unit_a, 'failed-restart', {'delay': 10, 'max-retry': 2}, wait=TIMEOUT) juju.run(unit_b, 'failed-restart', {'delay': 1, 'max-retry': 2}, wait=TIMEOUT) - time.sleep(60) # wait for operation execution. TODO: in charm use lock state to clear status. + time.sleep(90) # wait for operation execution. TODO: in charm use lock state to clear status. 
all_events: list[dict[str, str]] = [] all_events.extend(get_unit_events(juju, unit_a)) diff --git a/rollingops/tests/unit/conftest.py b/rollingops/tests/unit/conftest.py index bb3c46f61..6be4dcfb5 100644 --- a/rollingops/tests/unit/conftest.py +++ b/rollingops/tests/unit/conftest.py @@ -14,7 +14,6 @@ """Fixtures for unit tests, typically mocking out parts of the external system.""" -import types from collections.abc import Generator from pathlib import Path from typing import Any @@ -27,11 +26,11 @@ import charmlibs.rollingops.etcd._certificates as certificates import charmlibs.rollingops.etcd._etcdctl as etcdctl +from charmlibs import pathops from charmlibs.interfaces.tls_certificates import ( Certificate, PrivateKey, ) -from charmlibs.pathops import LocalPath from charmlibs.rollingops import RollingOpsManager from charmlibs.rollingops.common._models import OperationResult from charmlibs.rollingops.etcd._models import SharedCertificate @@ -104,50 +103,30 @@ @pytest.fixture -def temp_certificates(tmp_path: Path, monkeypatch: pytest.MonkeyPatch) -> types.ModuleType: - base_dir = LocalPath(str(tmp_path)) / 'tls' - ca_cert = base_dir / 'client-ca.pem' - client_key = base_dir / 'client.key' - client_cert = base_dir / 'client.pem' - - monkeypatch.setattr(certificates, 'BASE_DIR', base_dir) - monkeypatch.setattr(certificates, 'CA_CERT_PATH', ca_cert) - monkeypatch.setattr(certificates, 'CLIENT_KEY_PATH', client_key) - monkeypatch.setattr(certificates, 'CLIENT_CERT_PATH', client_cert) - - base_dir.mkdir(parents=True, exist_ok=True) - return certificates - - -@pytest.fixture -def temp_etcdctl(tmp_path: Path, monkeypatch: pytest.MonkeyPatch) -> types.ModuleType: - base_dir = LocalPath(str(tmp_path)) / 'etcd' - server_ca = base_dir / 'server-ca.pem' - env_file = base_dir / 'etcdctl.json' - - monkeypatch.setattr(etcdctl, 'BASE_DIR', base_dir) - monkeypatch.setattr(etcdctl, 'SERVER_CA_PATH', server_ca) - monkeypatch.setattr(etcdctl, 'CONFIG_FILE_PATH', env_file) - - 
base_dir.mkdir(parents=True, exist_ok=True) - return etcdctl +def temp_certificates(tmp_path: Path) -> certificates.CertificateStore: + path = pathops.LocalPath(str(tmp_path)) + client = certificates.CertificateStore(path) + client.base_dir.mkdir(parents=True, exist_ok=True) + return client @pytest.fixture -def etcdctl_patch() -> Generator[MagicMock, None, None]: - with patch('charmlibs.rollingops.etcd._certificates') as mock_etcdctl: - yield mock_etcdctl +def temp_etcdctl(tmp_path: Path) -> etcdctl.Etcdctl: + path = pathops.LocalPath(str(tmp_path)) + client = etcdctl.Etcdctl(path) + client.base_dir.mkdir(parents=True, exist_ok=True) + return client @pytest.fixture def certificates_manager_patches() -> Generator[dict[str, MagicMock], None, None]: with ( patch( - 'charmlibs.rollingops.etcd._certificates._exists', + 'charmlibs.rollingops.etcd._certificates.CertificateStore._exists', return_value=False, ), patch( - 'charmlibs.rollingops.etcd._certificates.generate', + 'charmlibs.rollingops.etcd._certificates.CertificateStore.generate', return_value=SharedCertificate( certificate=Certificate.from_string(VALID_CLIENT_CERT_PEM), key=PrivateKey.from_string(VALID_CLIENT_KEY_PEM), @@ -155,13 +134,13 @@ def certificates_manager_patches() -> Generator[dict[str, MagicMock], None, None ), ) as mock_generate, patch( - 'charmlibs.rollingops.etcd._certificates.persist_client_cert_key_and_ca', + 'charmlibs.rollingops.etcd._certificates.CertificateStore.persist_client_cert_key_and_ca', return_value=None, - ) as mock_persit, + ) as mock_persist, ): yield { 'generate': mock_generate, - 'persist': mock_persit, + 'persist': mock_persist, } @@ -282,3 +261,26 @@ def charm_test() -> type[RollingOpsCharm]: @pytest.fixture def ctx(charm_test: type[RollingOpsCharm]) -> Context[RollingOpsCharm]: return Context(charm_test, meta=meta, actions=actions) + + +class StrictPeerRollingOpsCharm(ops.CharmBase): + def __init__(self, framework: ops.Framework): + super().__init__(framework) + + 
self.restart_manager = RollingOpsManager( + charm=self, + peer_relation_name='restart', + callback_targets={}, + ) + + +@pytest.fixture +def strict_peer_charm_test() -> type[StrictPeerRollingOpsCharm]: + return StrictPeerRollingOpsCharm + + +@pytest.fixture +def strict_peer_ctx( + strict_peer_charm_test: type[StrictPeerRollingOpsCharm], +) -> Context[StrictPeerRollingOpsCharm]: + return Context(strict_peer_charm_test, meta=meta, actions=actions) diff --git a/rollingops/tests/unit/test_etcd_certificates.py b/rollingops/tests/unit/test_etcd_certificates.py index 4984bb6db..e0218baa0 100644 --- a/rollingops/tests/unit/test_etcd_certificates.py +++ b/rollingops/tests/unit/test_etcd_certificates.py @@ -48,7 +48,7 @@ def test_certificates_manager_exists_returns_false_when_no_files( def test_certificates_manager_exists_returns_false_when_cert_does_not_exist( temp_certificates: Any, ) -> None: - temp_certificates.CLIENT_KEY_PATH.write_text('client-key') + temp_certificates.key_path.write_text('client-key') assert temp_certificates._exists() is False @@ -56,7 +56,7 @@ def test_certificates_manager_exists_returns_false_when_cert_does_not_exist( def test_certificates_manager_exists_returns_false_when_key_does_not_exist( temp_certificates: Any, ) -> None: - temp_certificates.CLIENT_CERT_PATH.write_text('client-cert') + temp_certificates.cert_path.write_text('client-cert') assert temp_certificates._exists() is False @@ -64,9 +64,9 @@ def test_certificates_manager_exists_returns_false_when_key_does_not_exist( def test_certificates_manager_exists_returns_true_when_all_files_exist( temp_certificates: Any, ) -> None: - temp_certificates.CLIENT_KEY_PATH.write_text('client-key') - temp_certificates.CLIENT_CERT_PATH.write_text('client-cert') - temp_certificates.CA_CERT_PATH.write_text('ca-cert') + temp_certificates.key_path.write_text('client-key') + temp_certificates.cert_path.write_text('client-cert') + temp_certificates.ca_path.write_text('ca-cert') assert temp_certificates._exists() is True @@ -77,9
+77,9 @@ def test_certificates_manager_persist_client_cert_and_key_writes_files( shared_certificate = make_shared_certificate() temp_certificates.persist_client_cert_key_and_ca(shared_certificate) - assert temp_certificates.CLIENT_CERT_PATH.read_text() == shared_certificate.certificate.raw - assert temp_certificates.CLIENT_KEY_PATH.read_text() == shared_certificate.key.raw - assert temp_certificates.CA_CERT_PATH.read_text() == shared_certificate.ca.raw + assert temp_certificates.cert_path.read_text() == shared_certificate.certificate.raw + assert temp_certificates.key_path.read_text() == shared_certificate.key.raw + assert temp_certificates.ca_path.read_text() == shared_certificate.ca.raw def test_certificates_manager_has_client_cert_and_key_returns_false_when_files_missing( @@ -92,9 +92,9 @@ def test_certificates_manager_has_client_cert_and_key_returns_false_when_files_m def test_certificates_manager_has_client_cert_and_key_returns_true_when_material_matches( temp_certificates: Any, ) -> None: - temp_certificates.CLIENT_CERT_PATH.write_text(VALID_CLIENT_CERT_PEM) - temp_certificates.CLIENT_KEY_PATH.write_text(VALID_CLIENT_KEY_PEM) - temp_certificates.CA_CERT_PATH.write_text(VALID_CA_CERT_PEM) + temp_certificates.cert_path.write_text(VALID_CLIENT_CERT_PEM) + temp_certificates.key_path.write_text(VALID_CLIENT_KEY_PEM) + temp_certificates.ca_path.write_text(VALID_CA_CERT_PEM) shared_certificate = make_shared_certificate() assert temp_certificates._has_client_cert_key_and_ca(shared_certificate) is True @@ -103,9 +103,9 @@ def test_certificates_manager_has_client_cert_and_key_returns_true_when_material def test_certificates_manager_has_client_cert_and_key_returns_false_when_material_differs( temp_certificates: Any, ) -> None: - temp_certificates.CLIENT_CERT_PATH.write_text(VALID_CLIENT_CERT_PEM) - temp_certificates.CLIENT_KEY_PATH.write_text(VALID_CLIENT_KEY_PEM) - temp_certificates.CA_CERT_PATH.write_text(VALID_CA_CERT_PEM) + 
temp_certificates.cert_path.write_text(VALID_CLIENT_CERT_PEM) + temp_certificates.key_path.write_text(VALID_CLIENT_KEY_PEM) + temp_certificates.ca_path.write_text(VALID_CA_CERT_PEM) other_shared_certificate = SharedCertificate( certificate=Certificate.from_string(VALID_CA_CERT_PEM), @@ -118,17 +118,17 @@ def test_certificates_manager_has_client_cert_and_key_returns_false_when_materia def test_certificates_manager_generate_does_nothing_when_files_already_exist( temp_certificates: Any, ) -> None: - temp_certificates.CLIENT_CERT_PATH.write_text(VALID_CLIENT_CERT_PEM) - temp_certificates.CLIENT_KEY_PATH.write_text(VALID_CLIENT_KEY_PEM) - temp_certificates.CA_CERT_PATH.write_text(VALID_CA_CERT_PEM) + temp_certificates.cert_path.write_text(VALID_CLIENT_CERT_PEM) + temp_certificates.key_path.write_text(VALID_CLIENT_KEY_PEM) + temp_certificates.ca_path.write_text(VALID_CA_CERT_PEM) old_certificates = make_shared_certificate() new_certificates = temp_certificates.generate(model_uuid='model', app_name='unit-1') written = SharedCertificate.from_strings( - certificate=temp_certificates.CLIENT_CERT_PATH.read_text(), - key=temp_certificates.CLIENT_KEY_PATH.read_text(), - ca=temp_certificates.CA_CERT_PATH.read_text(), + certificate=temp_certificates.cert_path.read_text(), + key=temp_certificates.key_path.read_text(), + ca=temp_certificates.ca_path.read_text(), ) assert written == old_certificates @@ -141,12 +141,10 @@ def test_certificates_manager_generate_creates_all_files( shared = temp_certificates.generate(model_uuid='model', app_name='unit-1') assert temp_certificates._exists() is True - assert temp_certificates.CA_CERT_PATH.read_text().startswith('-----BEGIN CERTIFICATE-----') - assert temp_certificates.CLIENT_KEY_PATH.read_text().startswith( - '-----BEGIN RSA PRIVATE KEY-----' - ) - assert temp_certificates.CLIENT_CERT_PATH.read_text().startswith('-----BEGIN CERTIFICATE-----') + assert temp_certificates.ca_path.read_text().startswith('-----BEGIN CERTIFICATE-----') + assert 
temp_certificates.key_path.read_text().startswith('-----BEGIN RSA PRIVATE KEY-----') + assert temp_certificates.cert_path.read_text().startswith('-----BEGIN CERTIFICATE-----') - assert temp_certificates.CA_CERT_PATH.read_text() == shared.ca.raw - assert temp_certificates.CLIENT_KEY_PATH.read_text() == shared.key.raw - assert temp_certificates.CLIENT_CERT_PATH.read_text() == shared.certificate.raw + assert temp_certificates.ca_path.read_text() == shared.ca.raw + assert temp_certificates.key_path.read_text() == shared.key.raw + assert temp_certificates.cert_path.read_text() == shared.certificate.raw diff --git a/rollingops/tests/unit/test_etcd_etcdctl.py b/rollingops/tests/unit/test_etcd_etcdctl.py index 26497fa1e..df8b85ca5 100644 --- a/rollingops/tests/unit/test_etcd_etcdctl.py +++ b/rollingops/tests/unit/test_etcd_etcdctl.py @@ -31,12 +31,12 @@ def test_etcdctl_write_env(temp_etcdctl: Any) -> None: client_key_path=LocalPath('PATH2'), ) - assert temp_etcdctl.BASE_DIR.exists() + assert temp_etcdctl.base_dir.exists() - config = json.loads(temp_etcdctl.CONFIG_FILE_PATH.read_text()) + config = json.loads(temp_etcdctl.config_file_path.read_text()) assert config == { 'endpoints': 'https://10.0.0.1:2379,https://10.0.0.2:2379', - 'cacert_path': str(temp_etcdctl.SERVER_CA_PATH), + 'cacert_path': str(temp_etcdctl.server_ca_path), 'cert_path': 'PATH1', 'key_path': 'PATH2', } @@ -48,33 +48,33 @@ def test_etcdctl_ensure_initialized_raises_when_env_missing(temp_etcdctl: Any) - def test_etcdctl_cleanup_removes_env_file_and_server_ca(temp_etcdctl: Any) -> None: - temp_etcdctl.BASE_DIR.mkdir(parents=True, exist_ok=True) - temp_etcdctl.CONFIG_FILE_PATH.write_text('env') - temp_etcdctl.SERVER_CA_PATH.write_text('ca') + temp_etcdctl.base_dir.mkdir(parents=True, exist_ok=True) + temp_etcdctl.config_file_path.write_text('env') + temp_etcdctl.server_ca_path.write_text('ca') - assert temp_etcdctl.CONFIG_FILE_PATH.exists() - assert temp_etcdctl.SERVER_CA_PATH.exists() + assert 
temp_etcdctl.config_file_path.exists() + assert temp_etcdctl.server_ca_path.exists() temp_etcdctl.cleanup() - assert not temp_etcdctl.CONFIG_FILE_PATH.exists() - assert not temp_etcdctl.SERVER_CA_PATH.exists() + assert not temp_etcdctl.config_file_path.exists() + assert not temp_etcdctl.server_ca_path.exists() def test_etcdctl_cleanup_is_noop_when_files_do_not_exist(temp_etcdctl: Any) -> None: - assert not temp_etcdctl.CONFIG_FILE_PATH.exists() - assert not temp_etcdctl.SERVER_CA_PATH.exists() + assert not temp_etcdctl.config_file_path.exists() + assert not temp_etcdctl.server_ca_path.exists() temp_etcdctl.cleanup() - assert not temp_etcdctl.CONFIG_FILE_PATH.exists() - assert not temp_etcdctl.SERVER_CA_PATH.exists() + assert not temp_etcdctl.config_file_path.exists() + assert not temp_etcdctl.server_ca_path.exists() def test_etcdctl_load_env_parses_exported_vars(temp_etcdctl: Any) -> None: - temp_etcdctl.BASE_DIR.mkdir(parents=True, exist_ok=True) - temp_etcdctl.SERVER_CA_PATH.write_text('SERVER CA') - temp_etcdctl.CONFIG_FILE_PATH.write_text( + temp_etcdctl.base_dir.mkdir(parents=True, exist_ok=True) + temp_etcdctl.server_ca_path.write_text('SERVER CA') + temp_etcdctl.config_file_path.write_text( json.dumps({ 'endpoints': 'https://10.0.0.1:2379', 'cacert_path': '/a-path/server-ca.pem', diff --git a/rollingops/tests/unit/test_etcd_rollingops_in_charm.py b/rollingops/tests/unit/test_etcd_rollingops_in_charm.py index e668412fd..e89d32d54 100644 --- a/rollingops/tests/unit/test_etcd_rollingops_in_charm.py +++ b/rollingops/tests/unit/test_etcd_rollingops_in_charm.py @@ -51,7 +51,6 @@ def _unit_databag(state: State, peer: PeerRelation) -> RawDataBagContents: def test_leader_elected_creates_shared_secret_and_stores_id( certificates_manager_patches: dict[str, MagicMock], - etcdctl_patch: MagicMock, ctx: Context[RollingOpsCharm], ): peer_relation = PeerRelation(endpoint='restart') @@ -68,7 +67,6 @@ def test_leader_elected_creates_shared_secret_and_stores_id( def 
test_leader_elected_does_not_regenerate_when_secret_already_exists( certificates_manager_patches: dict[str, MagicMock], - etcdctl_patch: MagicMock, ctx: Context[RollingOpsCharm], ): peer_relation = PeerRelation( @@ -95,7 +93,6 @@ def test_leader_elected_does_not_regenerate_when_secret_already_exists( def test_non_leader_does_not_create_shared_secret( certificates_manager_patches: dict[str, MagicMock], - etcdctl_patch: MagicMock, ctx: Context[RollingOpsCharm], ): peer_relation = PeerRelation(endpoint='restart') @@ -110,7 +107,6 @@ def test_non_leader_does_not_create_shared_secret( def test_relation_changed_syncs_local_certificate_from_secret( certificates_manager_patches: dict[str, MagicMock], - etcdctl_patch: MagicMock, ctx: Context[RollingOpsCharm], ): peer_relation = PeerRelation( @@ -138,7 +134,6 @@ def test_relation_changed_syncs_local_certificate_from_secret( def test_invalid_certificate_secret_content_raises( certificates_manager_patches: dict[str, MagicMock], - etcdctl_patch: MagicMock, ctx: Context[RollingOpsCharm], ): peer_relation = PeerRelation( @@ -192,7 +187,7 @@ def test_state_not_initialized(ctx: Context[RollingOpsCharm]): with ctx(ctx.on.start(), state) as mgr: rolling_state = mgr.charm.restart_manager.state - assert rolling_state.status == RollingOpsStatus.UNAVAILABLE + assert rolling_state.status == RollingOpsStatus.NOT_READY assert rolling_state.processing_backend == ProcessingBackend.PEER assert len(rolling_state.operations) == 0 @@ -334,10 +329,118 @@ def test_state_falls_back_to_peer_if_etcd_status_fails(ctx: Context[RollingOpsCh with patch( 'charmlibs.rollingops._rollingops_manager.EtcdRollingOpsBackend.get_status', - return_value=RollingOpsStatus.UNAVAILABLE, + return_value=RollingOpsStatus.NOT_READY, ): with ctx(ctx.on.update_status(), state) as mgr: rolling_state = mgr.charm.restart_manager.state assert rolling_state.status == RollingOpsStatus.WAITING assert rolling_state.processing_backend == ProcessingBackend.PEER assert 
len(rolling_state.operations) == 1 + + +def test_is_waiting_returns_true_when_matching_operation_exists(ctx: Context[RollingOpsCharm]): + peer_rel = PeerRelation( + endpoint='restart', + interface='rollingops', + local_app_data={}, + local_unit_data={ + 'state': 'request', + 'operations': OperationQueue([ + Operation.create('restart', {'delay': 1}), + Operation.create('restart', {'delay': 2}), + ]).to_string(), + 'executed_at': '', + 'processing_backend': 'peer', + 'etcd_cleanup_needed': 'false', + }, + ) + state = State(leader=False, relations={peer_rel}) + + with ctx(ctx.on.update_status(), state) as mgr: + assert mgr.charm.restart_manager.is_waiting('restart', {'delay': 1}) is True + + +def test_is_waiting_returns_false_when_callback_matches_but_kwargs_do_not( + ctx: Context[RollingOpsCharm], +): + peer_rel = PeerRelation( + endpoint='restart', + interface='rollingops', + local_app_data={}, + local_unit_data={ + 'state': 'request', + 'operations': OperationQueue([ + Operation.create('restart', {'delay': 1}), + ]).to_string(), + 'executed_at': '', + 'processing_backend': 'peer', + 'etcd_cleanup_needed': 'false', + }, + ) + state = State(leader=False, relations={peer_rel}) + + with ctx(ctx.on.update_status(), state) as mgr: + assert mgr.charm.restart_manager.is_waiting('restart', {'delay': 2}) is False + + +def test_is_waiting_returns_false_when_callback_does_not_match(ctx: Context[RollingOpsCharm]): + peer_rel = PeerRelation( + endpoint='restart', + interface='rollingops', + local_app_data={}, + local_unit_data={ + 'state': 'request', + 'operations': OperationQueue([ + Operation.create('restart', {'delay': 1}), + ]).to_string(), + 'executed_at': '', + 'processing_backend': 'peer', + 'etcd_cleanup_needed': 'false', + }, + ) + state = State(leader=False, relations={peer_rel}) + + with ctx(ctx.on.update_status(), state) as mgr: + assert mgr.charm.restart_manager.is_waiting('other-callback', {'delay': 1}) is False + + +def 
test_is_waiting_returns_true_when_kwargs_is_none_and_matching_operation_has_empty_kwargs( + ctx: Context[RollingOpsCharm], +): + peer_rel = PeerRelation( + endpoint='restart', + interface='rollingops', + local_app_data={}, + local_unit_data={ + 'state': 'request', + 'operations': OperationQueue([ + Operation.create('restart', {}), + ]).to_string(), + 'executed_at': '', + 'processing_backend': 'peer', + 'etcd_cleanup_needed': 'false', + }, + ) + state = State(leader=False, relations={peer_rel}) + + with ctx(ctx.on.update_status(), state) as mgr: + assert mgr.charm.restart_manager.is_waiting('restart') is True + + +def test_is_waiting_returns_false_when_operation_validation_fails(ctx: Context[RollingOpsCharm]): + peer_rel = PeerRelation( + endpoint='restart', + interface='rollingops', + local_app_data={}, + local_unit_data={ + 'state': 'request', + 'operations': OperationQueue([]).to_string(), + 'executed_at': '', + 'processing_backend': 'peer', + 'etcd_cleanup_needed': 'false', + }, + ) + state = State(leader=False, relations={peer_rel}) + + with ctx(ctx.on.update_status(), state) as mgr: + assert mgr.charm.restart_manager.is_waiting('restart', 'a') is False # type: ignore[reportArgumentType] diff --git a/rollingops/tests/unit/test_peer_rollingops_in_charm.py b/rollingops/tests/unit/test_peer_rollingops_in_charm.py index 11389de47..7ad6cae6f 100644 --- a/rollingops/tests/unit/test_peer_rollingops_in_charm.py +++ b/rollingops/tests/unit/test_peer_rollingops_in_charm.py @@ -21,8 +21,9 @@ import pytest from ops.testing import Context, PeerRelation, State from scenario import RawDataBagContents -from tests.unit.conftest import RollingOpsCharm +from tests.unit.conftest import RollingOpsCharm, StrictPeerRollingOpsCharm +from charmlibs.rollingops import ProcessingBackend, RollingOpsStatus from charmlibs.rollingops.common._exceptions import RollingOpsInvalidLockRequestError from charmlibs.rollingops.common._models import Operation, OperationQueue from 
charmlibs.rollingops.common._utils import now_timestamp @@ -354,7 +355,6 @@ def test_lock_retry_drops_when_max_retry_reached( def test_lock_grant_and_release( certificates_manager_patches: dict[str, MagicMock], - etcdctl_patch: MagicMock, ctx: Context[RollingOpsCharm], ): queue = _make_operation_queue(callback_id='_failed_restart', kwargs={}, max_retry=3) @@ -374,7 +374,6 @@ def test_lock_grant_and_release( def test_scheduling_does_nothing_if_lock_already_granted( certificates_manager_patches: dict[str, MagicMock], - etcdctl_patch: MagicMock, ctx: Context[RollingOpsCharm], ): queue = _make_operation_queue(callback_id='_failed_restart', kwargs={}, max_retry=3) @@ -399,7 +398,6 @@ def test_scheduling_does_nothing_if_lock_already_granted( def test_schedule_picks_retry_hold( certificates_manager_patches: dict[str, MagicMock], - etcdctl_patch: MagicMock, ctx: Context[RollingOpsCharm], ): old_operation = str(now_timestamp().timestamp()) @@ -437,7 +435,6 @@ def test_schedule_picks_retry_hold( def test_schedule_picks_oldest_requested_at_among_requests( certificates_manager_patches: dict[str, MagicMock], - etcdctl_patch: MagicMock, ctx: Context[RollingOpsCharm], ): old_queue = OperationQueue() @@ -465,7 +462,6 @@ def test_schedule_picks_oldest_requested_at_among_requests( def test_schedule_picks_oldest_executed_at_among_retries_when_no_requests( certificates_manager_patches: dict[str, MagicMock], - etcdctl_patch: MagicMock, ctx: Context[RollingOpsCharm], ): old_operation = str(now_timestamp().timestamp()) @@ -498,7 +494,6 @@ def test_schedule_picks_oldest_executed_at_among_retries_when_no_requests( def test_schedule_prioritizes_requests_over_retries( certificates_manager_patches: dict[str, MagicMock], - etcdctl_patch: MagicMock, ctx: Context[RollingOpsCharm], ): queue = _make_operation_queue(callback_id='_failed_restart', kwargs={}, max_retry=3) @@ -525,7 +520,6 @@ def test_schedule_prioritizes_requests_over_retries( def test_no_unit_is_granted_if_there_are_no_requests( 
certificates_manager_patches: dict[str, MagicMock], - etcdctl_patch: MagicMock, ctx: Context[RollingOpsCharm], ): peer = PeerRelation( @@ -539,3 +533,40 @@ def test_no_unit_is_granted_if_there_are_no_requests( databag = _app_databag(state_out, peer) assert databag.get('granted_unit', '') == '' assert databag.get('granted_at', '') == '' + + +def test_strict_peer_no_unit_is_granted_if_there_are_no_requests( + certificates_manager_patches: dict[str, MagicMock], + strict_peer_ctx: Context[StrictPeerRollingOpsCharm], +): + peer = PeerRelation( + endpoint='restart', + peers_data={1: {'state': LockIntent.IDLE}, 2: {'state': LockIntent.IDLE}}, + ) + state_in = State(leader=True, relations={peer}) + + state_out = strict_peer_ctx.run(strict_peer_ctx.on.leader_elected(), state_in) + + databag = _app_databag(state_out, peer) + assert databag.get('granted_unit', '') == '' + assert databag.get('granted_at', '') == '' + + +def test_state_peer_idle(strict_peer_ctx: Context[StrictPeerRollingOpsCharm]): + peer_rel = PeerRelation( + endpoint='restart', + local_unit_data={ + 'state': '', + 'operations': '', + 'executed_at': '', + 'processing_backend': 'peer', + 'etcd_cleanup_needed': 'false', + }, + ) + state = State(leader=False, relations={peer_rel}) + + with strict_peer_ctx(strict_peer_ctx.on.update_status(), state) as mgr: + rolling_state = mgr.charm.restart_manager.state + assert rolling_state.status == RollingOpsStatus.IDLE + assert rolling_state.processing_backend == ProcessingBackend.PEER + assert len(rolling_state.operations) == 0