From 6e49b0363f26bf05664ecfab6fe9bac92e6b9b2e Mon Sep 17 00:00:00 2001 From: Paul Calnon Date: Wed, 6 May 2026 00:14:04 -0500 Subject: [PATCH 1/2] audit-fixup: wire juniper_data_datasets_cached gauge to cache layer MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit The juniper_data_datasets_cached Gauge was defined in juniper_data/api/observability.py with a tested set_datasets_cached() helper, but had zero production callers -- the gauge would always read 0 in scrape output. juniper-ml#223 (post-METRICS-MON state report §15) flagged this as the only remaining test-vs-production gap. Wires the gauge into CachedDatasetStore (the canonical cache layer) via a private _emit_cached_count() helper that probes the cache backend for its current population and publishes via set_datasets_cached(). Failures are swallowed so observability never breaks the storage path -- mirrors the contextlib.suppress discipline already used everywhere else in the class. Cache mutation sites wired: - save() (write_through path) cached.py:88 - get_artifact_bytes() (read-through) cached.py:131-132 - delete() cached.py:162-163 - invalidate_cache() cached.py:216 - warm_cache() (post-batch) cached.py:248-249 update_meta() is intentionally NOT wired -- it mutates an existing entry, never changes cardinality. Tests: six new tests in test_cached_store.py cover insert / evict / invalidate / warm / read-through populate / no-write-through. Uses the _value.get() pattern + a registry-reset fixture matching test_observability.py. Local results: pytest juniper_data/tests/unit/ -x 813 passed pre-commit run --files cached.py test_cached_store.py all pass Refs: juniper-ml#223 §15 Tracker: juniper-ml notes/code-review/POST_METRICS_MON_TRACKER_2026-05-05.md Co-Authored-By: Claude Opus 4.7 (1M context) --- juniper_data/storage/cached.py | 40 ++++- juniper_data/tests/unit/test_cached_store.py | 157 +++++++++++++++++++ 2 files changed, 196 insertions(+), 1 deletion(-) diff --git a/juniper_data/storage/cached.py b/juniper_data/storage/cached.py index 9eb725f..d23e569 100644 --- a/juniper_data/storage/cached.py +++ b/juniper_data/storage/cached.py @@ -5,12 +5,21 @@ import numpy as np +from juniper_data.api.observability import set_datasets_cached from juniper_data.core.models import DatasetMeta from juniper_data.storage.constants import DEFAULT_LIST_LIMIT, DEFAULT_LIST_OFFSET logger = logging.getLogger(__name__) from .base import DatasetStore +# Probe limit used when sampling the cache backend for the +# ``juniper_data_datasets_cached`` gauge. Mirrors the limit used by +# :meth:`CachedDatasetStore.warm_cache` so the gauge reflects the same +# population that warm_cache would touch. Cache backends are expected +# to be in-memory (Redis / InMemoryDatasetStore) so a SCAN over 10k +# keys is cheap relative to a dataset save/load. +_CACHE_COUNT_PROBE_LIMIT: int = 10_000 + class CachedDatasetStore(DatasetStore): """Composable caching wrapper for dataset storage. @@ -42,6 +51,22 @@ def __init__( self._cache = cache self._write_through = write_through + def _emit_cached_count(self) -> None: + """Update the ``juniper_data_datasets_cached`` Prometheus gauge. + + Probes the cache backend for its current dataset population and + publishes the count via :func:`juniper_data.api.observability.set_datasets_cached`. + Failures (cache backend unavailable, metric registry not yet + initialised, etc.) are swallowed so observability never breaks + the storage path -- mirrors the ``contextlib.suppress(Exception)`` + discipline used everywhere else in this class. + """ + try: + count = len(self._cache.list_datasets(limit=_CACHE_COUNT_PROBE_LIMIT)) + set_datasets_cached(count) + except Exception: + logger.debug("Failed to update juniper_data_datasets_cached gauge", exc_info=True) + def save( self, dataset_id: str, @@ -60,6 +85,7 @@ def save( if self._write_through: with contextlib.suppress(Exception): self._cache.save(dataset_id, meta, arrays) + self._emit_cached_count() def get_meta(self, dataset_id: str) -> DatasetMeta | None: """Get metadata, checking cache first. @@ -92,6 +118,7 @@ def get_artifact_bytes(self, dataset_id: str) -> bytes | None: artifact = self._primary.get_artifact_bytes(dataset_id) if artifact is not None: + populated = False with contextlib.suppress(Exception): meta = self._primary.get_meta(dataset_id) if meta is not None: @@ -100,6 +127,9 @@ def get_artifact_bytes(self, dataset_id: str) -> bytes | None: with np.load(io.BytesIO(artifact)) as npz: arrays = {k: npz[k] for k in npz.files} self._cache.save(dataset_id, meta, arrays) + populated = True + if populated: + self._emit_cached_count() return artifact def exists(self, dataset_id: str) -> bool: @@ -125,8 +155,12 @@ def delete(self, dataset_id: str) -> bool: Returns: True if the dataset was deleted from primary, False otherwise. """ + cache_touched = False with contextlib.suppress(Exception): self._cache.delete(dataset_id) + cache_touched = True + if cache_touched: + self._emit_cached_count() return self._primary.delete(dataset_id) def list_datasets(self, limit: int = DEFAULT_LIST_LIMIT, offset: int = DEFAULT_LIST_OFFSET) -> list[str]: @@ -176,9 +210,11 @@ def invalidate_cache(self, dataset_id: str) -> bool: True if entry was removed from cache, False otherwise. """ try: - return self._cache.delete(dataset_id) + result = self._cache.delete(dataset_id) except Exception: return False + self._emit_cached_count() + return result def warm_cache(self, dataset_ids: list[str] | None = None) -> int: """Populate cache from primary store. @@ -209,4 +245,6 @@ def warm_cache(self, dataset_ids: list[str] | None = None) -> int: logger.warning("Failed to cache dataset %s", dataset_id, exc_info=True) continue + if cached_count > 0: + self._emit_cached_count() return cached_count diff --git a/juniper_data/tests/unit/test_cached_store.py b/juniper_data/tests/unit/test_cached_store.py index 0ca35c4..6821fc9 100644 --- a/juniper_data/tests/unit/test_cached_store.py +++ b/juniper_data/tests/unit/test_cached_store.py @@ -6,6 +6,7 @@ import numpy as np import pytest +import juniper_data.api.observability as obs from juniper_data.core.models import DatasetMeta from juniper_data.storage import CachedDatasetStore, InMemoryDatasetStore @@ -421,3 +422,159 @@ def test_exists_suppresses_cache_error( assert cached.exists("test-1") is True assert cached.exists("nonexistent") is False + + +@pytest.fixture +def _reset_dataset_metrics(): + """Reset the lazy-init dataset metrics + REGISTRY between tests. + + Mirrors the autouse fixture in ``test_observability.py``: re-using + ``CachedDatasetStore`` across tests would otherwise trip + ``ValueError: Duplicated timeseries in CollectorRegistry`` when the + gauge is re-registered against the global ``prometheus_client`` + REGISTRY. + """ + pytest.importorskip("prometheus_client") + from prometheus_client import REGISTRY + + obs._dataset_metrics = None + yield + collectors = list(getattr(REGISTRY, "_collector_to_names", {}).keys()) + for collector in collectors: + try: + REGISTRY.unregister(collector) + except KeyError: + pass + obs._dataset_metrics = None + + +def _read_datasets_cached_gauge() -> float: + """Return the current value of the ``juniper_data_datasets_cached`` gauge.""" + return obs._ensure_dataset_metrics()["datasets_cached"]._value.get() + + +@pytest.mark.unit +class TestDatasetsCachedGauge: + """Wire-up tests for the ``juniper_data_datasets_cached`` Prometheus gauge. + + Closes the production-caller gap surfaced by juniper-ml#223 + (post-METRICS-MON state report §15): the gauge was defined and + helper-tested but had no production updater. ``CachedDatasetStore`` + is the canonical cache layer, so its mutation paths now publish the + cache cardinality after every change. + """ + + def test_save_emits_cache_count( + self, + _reset_dataset_metrics, + primary_store: InMemoryDatasetStore, + cache_store: InMemoryDatasetStore, + sample_meta: DatasetMeta, + sample_arrays: dict[str, np.ndarray], + ) -> None: + """save() with write_through should publish the cache cardinality.""" + cached = CachedDatasetStore(primary_store, cache_store, write_through=True) + + cached.save("test-1", sample_meta, sample_arrays) + assert _read_datasets_cached_gauge() == 1 + + cached.save("test-2", sample_meta, sample_arrays) + assert _read_datasets_cached_gauge() == 2 + + cached.save("test-3", sample_meta, sample_arrays) + assert _read_datasets_cached_gauge() == 3 + + def test_delete_emits_decremented_cache_count( + self, + _reset_dataset_metrics, + primary_store: InMemoryDatasetStore, + cache_store: InMemoryDatasetStore, + sample_meta: DatasetMeta, + sample_arrays: dict[str, np.ndarray], + ) -> None: + """delete() should publish the post-eviction cache cardinality.""" + cached = CachedDatasetStore(primary_store, cache_store, write_through=True) + + cached.save("test-1", sample_meta, sample_arrays) + cached.save("test-2", sample_meta, sample_arrays) + cached.save("test-3", sample_meta, sample_arrays) + assert _read_datasets_cached_gauge() == 3 + + cached.delete("test-2") + assert _read_datasets_cached_gauge() == 2 + + cached.delete("test-1") + assert _read_datasets_cached_gauge() == 1 + + def test_invalidate_cache_emits_decremented_cache_count( + self, + _reset_dataset_metrics, + primary_store: InMemoryDatasetStore, + cache_store: InMemoryDatasetStore, + sample_meta: DatasetMeta, + sample_arrays: dict[str, np.ndarray], + ) -> None: + """invalidate_cache() should publish the post-eviction cache cardinality.""" + cached = CachedDatasetStore(primary_store, cache_store, write_through=True) + cached.save("test-1", sample_meta, sample_arrays) + cached.save("test-2", sample_meta, sample_arrays) + assert _read_datasets_cached_gauge() == 2 + + cached.invalidate_cache("test-1") + + assert _read_datasets_cached_gauge() == 1 + assert primary_store.exists("test-1") # primary untouched + + def test_warm_cache_emits_populated_count( + self, + _reset_dataset_metrics, + primary_store: InMemoryDatasetStore, + cache_store: InMemoryDatasetStore, + sample_meta: DatasetMeta, + sample_arrays: dict[str, np.ndarray], + ) -> None: + """warm_cache() should publish the post-warm cache cardinality.""" + cached = CachedDatasetStore(primary_store, cache_store, write_through=False) + primary_store.save("test-1", sample_meta, sample_arrays) + primary_store.save("test-2", sample_meta, sample_arrays) + # No write-through, so the cache is empty before warming. + assert _read_datasets_cached_gauge() == 0 + + count = cached.warm_cache() + + assert count == 2 + assert _read_datasets_cached_gauge() == 2 + + def test_get_artifact_bytes_read_through_emits_cache_count( + self, + _reset_dataset_metrics, + primary_store: InMemoryDatasetStore, + cache_store: InMemoryDatasetStore, + sample_meta: DatasetMeta, + sample_arrays: dict[str, np.ndarray], + ) -> None: + """Read-through cache population in get_artifact_bytes should publish.""" + cached = CachedDatasetStore(primary_store, cache_store, write_through=False) + primary_store.save("test-1", sample_meta, sample_arrays) + assert _read_datasets_cached_gauge() == 0 + + artifact = cached.get_artifact_bytes("test-1") + + assert artifact is not None + assert _read_datasets_cached_gauge() == 1 + + def test_save_without_write_through_does_not_emit( + self, + _reset_dataset_metrics, + primary_store: InMemoryDatasetStore, + cache_store: InMemoryDatasetStore, + sample_meta: DatasetMeta, + sample_arrays: dict[str, np.ndarray], + ) -> None: + """save() with write_through=False does not touch the cache, so no emit.""" + cached = CachedDatasetStore(primary_store, cache_store, write_through=False) + + cached.save("test-1", sample_meta, sample_arrays) + + # Cache is empty; gauge stays at 0 (default for a fresh Gauge). + assert _read_datasets_cached_gauge() == 0 From 5886a14035c8071155713a4c7e5ecdd4b604b951 Mon Sep 17 00:00:00 2001 From: Overtoad Date: Wed, 6 May 2026 00:29:12 -0500 Subject: [PATCH 2/2] Potential fix for pull request finding 'CodeQL / Empty except' Co-authored-by: Copilot Autofix powered by AI <62310815+github-advanced-security[bot]@users.noreply.github.com> --- juniper_data/tests/unit/test_cached_store.py | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/juniper_data/tests/unit/test_cached_store.py b/juniper_data/tests/unit/test_cached_store.py index 6821fc9..b5b5c8e 100644 --- a/juniper_data/tests/unit/test_cached_store.py +++ b/juniper_data/tests/unit/test_cached_store.py @@ -444,7 +444,8 @@ def _reset_dataset_metrics(): try: REGISTRY.unregister(collector) except KeyError: - pass + # Collector may already be absent; teardown is best-effort. + continue obs._dataset_metrics = None