From 912c080cec571507b048cb4675b8d56f69dc0d06 Mon Sep 17 00:00:00 2001 From: TexasOct Date: Mon, 22 Dec 2025 22:58:18 +0800 Subject: [PATCH] feat(perception): implement memory-first image storage strategy Implement a memory-first storage strategy to reduce disk I/O and optimize disk usage. **Key Changes:** - ImageManager: Add memory-first mode with dynamic TTL calculation - New methods: persist_image(), persist_images_batch(), cleanup_batch_screenshots() - TTL-based cleanup for memory-only images (default: processing_interval * 2.5) - Track image metadata (timestamp, is_persisted) for lifecycle management - ActionAgent: Trigger persistence only when actions are created - Persist screenshots to disk before saving actions - Immediate batch cleanup for unused screenshots after action generation - Coordinator: Add periodic TTL cleanup in processing loop - Config: Add memory-first configuration options - enable_memory_first, memory_ttl_multiplier, memory_ttl_min/max **Benefits:** - Reduce disk writes by ~95% (from 3,600/hour to ~150/hour) - Immediate cleanup of unused screenshots (vs. waiting for TTL) - Dynamic TTL based on processing_interval for adaptive behavior **Three-tier cleanup mechanism:** 1. Batch cleanup: Remove unused images immediately after action creation 2. TTL cleanup: Evict expired memory-only images every processing cycle 3. LRU eviction: Fallback when cache limit (500 images) is reached --- backend/agents/action_agent.py | 120 ++++++++++++++ backend/config/config.toml | 6 + backend/core/coordinator.py | 13 ++ backend/perception/image_manager.py | 243 ++++++++++++++++++++++++++-- 4 files changed, 373 insertions(+), 9 deletions(-) diff --git a/backend/agents/action_agent.py b/backend/agents/action_agent.py index ed6e9d1..21f7cc5 100644 --- a/backend/agents/action_agent.py +++ b/backend/agents/action_agent.py @@ -123,6 +123,8 @@ async def extract_and_save_actions( # Step 3: Save actions to database saved_count = 0 + all_used_hashes = set() # Track all hashes used across all actions + for resolved in resolved_actions: action_data = resolved["data"] action_hashes = resolved["hashes"] @@ -136,6 +138,12 @@ async def extract_and_save_actions( image_indices, screenshot_records ) + # Persist screenshots to disk before saving action + self._persist_action_screenshots(action_hashes) + + # Track used hashes + all_used_hashes.update(action_hashes) + # Save action to database await self.db.actions.save( action_id=action_id, @@ -149,6 +157,9 @@ async def extract_and_save_actions( saved_count += 1 self.stats["actions_saved"] += 1 + # Step 4: Clean up unused screenshots from this batch (legacy flow) + self._cleanup_unused_batch_screenshots_legacy(screenshot_records, all_used_hashes) + logger.debug(f"ActionAgent: Saved {saved_count} actions to database") return saved_count @@ -384,6 +395,7 @@ async def extract_and_save_actions_from_scenes( # Step 3: Save actions to database saved_count = 0 + all_used_hashes = set() # Track all hashes used across all actions for resolved in resolved_actions: action_data = resolved["data"] @@ -396,6 +408,12 @@ async def extract_and_save_actions_from_scenes( scene_indices, scenes ) + # Persist screenshots to disk before saving action + self._persist_action_screenshots(action_hashes) + + # Track used hashes + all_used_hashes.update(action_hashes) + # Save action to database await self.db.actions.save( action_id=action_id, @@ -409,6 +427,9 @@ async def extract_and_save_actions_from_scenes( saved_count += 1 self.stats["actions_saved"] += 1 + # Step 4: Clean up unused screenshots from this batch + self._cleanup_unused_batch_screenshots(scenes, all_used_hashes) + logger.debug(f"ActionAgent: Saved {saved_count} actions to database") return saved_count @@ -416,6 +437,105 @@ async def extract_and_save_actions_from_scenes( logger.error(f"ActionAgent: Failed to process actions from scenes: {e}", exc_info=True) return 0 + def _persist_action_screenshots(self, screenshot_hashes: list[str]) -> None: + """Persist screenshots to disk when action is saved + + This is the trigger point for memory-first persistence. + + Args: + screenshot_hashes: List of screenshot hashes to persist + """ + try: + if not screenshot_hashes: + return + + logger.debug( + f"Persisting {len(screenshot_hashes)} screenshots for action" + ) + + results = self.image_manager.persist_images_batch(screenshot_hashes) + + # Log warnings for failed persists + failed = [h for h, success in results.items() if not success] + if failed: + logger.warning( + f"Failed to persist {len(failed)} screenshots (likely evicted from memory): " + f"{[h[:8] for h in failed]}" + ) + + except Exception as e: + logger.error(f"Failed to persist action screenshots: {e}", exc_info=True) + + def _cleanup_unused_batch_screenshots(self, scenes: List[Dict[str, Any]], used_hashes: set[str]) -> None: + """Clean up screenshots from this batch that were not used in any action + + This is called after all actions are saved to immediately free memory. + + Args: + scenes: All scenes from this batch (contains all screenshot hashes) + used_hashes: Set of screenshot hashes that were used in actions + """ + try: + # Collect all screenshot hashes from this batch + all_hashes = set() + for scene in scenes: + screenshot_hash = scene.get("screenshot_hash") + if screenshot_hash: + all_hashes.add(screenshot_hash) + + # Calculate unused hashes + unused_hashes = all_hashes - used_hashes + + if not unused_hashes: + logger.debug("No unused screenshots to clean up in this batch") + return + + # Clean up unused screenshots from memory + cleaned_count = self.image_manager.cleanup_batch_screenshots(list(unused_hashes)) + + if cleaned_count > 0: + logger.info( + f"Batch cleanup: removed {cleaned_count}/{len(unused_hashes)} unused screenshots from memory " + f"(total batch: {len(all_hashes)}, used: {len(used_hashes)})" + ) + + except Exception as e: + logger.error(f"Failed to cleanup unused batch screenshots: {e}", exc_info=True) + + def _cleanup_unused_batch_screenshots_legacy(self, screenshot_records: List[RawRecord], used_hashes: set[str]) -> None: + """Clean up screenshots from this batch that were not used in any action (legacy flow) + + Args: + screenshot_records: All screenshot records from this batch + used_hashes: Set of screenshot hashes that were used in actions + """ + try: + # Collect all screenshot hashes from this batch + all_hashes = set() + for record in screenshot_records: + screenshot_hash = record.data.get("hash") + if screenshot_hash: + all_hashes.add(screenshot_hash) + + # Calculate unused hashes + unused_hashes = all_hashes - used_hashes + + if not unused_hashes: + logger.debug("No unused screenshots to clean up in this batch (legacy)") + return + + # Clean up unused screenshots from memory + cleaned_count = self.image_manager.cleanup_batch_screenshots(list(unused_hashes)) + + if cleaned_count > 0: + logger.info( + f"Batch cleanup (legacy): removed {cleaned_count}/{len(unused_hashes)} unused screenshots from memory " + f"(total batch: {len(all_hashes)}, used: {len(used_hashes)})" + ) + + except Exception as e: + logger.error(f"Failed to cleanup unused batch screenshots (legacy): {e}", exc_info=True) + async def _extract_actions_from_scenes( self, scenes: List[Dict[str, Any]], diff --git a/backend/config/config.toml b/backend/config/config.toml index 42debf8..e5b078c 100644 --- a/backend/config/config.toml +++ b/backend/config/config.toml @@ -26,6 +26,12 @@ max_width = 1920 max_height = 1080 enable_phash = true +# Memory-first storage configuration +enable_memory_first = true # Master switch +memory_ttl_multiplier = 2.5 # TTL = processing_interval * multiplier +memory_ttl_min = 60 # Minimum TTL (seconds) +memory_ttl_max = 120 # Maximum TTL (seconds) + # Screenshot configuration [screenshot] # Smart capture - only capture the active monitor diff --git a/backend/core/coordinator.py b/backend/core/coordinator.py index 01700ad..a929c6c 100644 --- a/backend/core/coordinator.py +++ b/backend/core/coordinator.py @@ -505,6 +505,7 @@ async def _processing_loop(self) -> None: try: # First iteration has shorter delay, then use normal interval first_iteration = True + last_ttl_cleanup = datetime.now() # Track last TTL cleanup time while self.is_running: # First iteration starts quickly (100ms), then use configured interval @@ -521,6 +522,18 @@ async def _processing_loop(self) -> None: logger.debug("Coordinator paused, skipping processing cycle") continue + # Periodic TTL cleanup for memory-only images + now = datetime.now() + if (now - last_ttl_cleanup).total_seconds() >= self.processing_interval: + try: + if self.processing_pipeline and self.processing_pipeline.image_manager: + evicted = self.processing_pipeline.image_manager.cleanup_expired_memory_images() + if evicted > 0: + logger.debug(f"TTL cleanup: evicted {evicted} expired memory-only images") + last_ttl_cleanup = now + except Exception as e: + logger.error(f"TTL cleanup failed: {e}") + if not self.perception_manager: logger.error("Perception manager not initialized") raise Exception("Perception manager not initialized") diff --git a/backend/perception/image_manager.py b/backend/perception/image_manager.py index 5b8e419..b4e9495 100644 --- a/backend/perception/image_manager.py +++ b/backend/perception/image_manager.py @@ -29,6 +29,8 @@ def __init__( base_dir: Optional[ str ] = None, # Screenshot storage root directory (override config) + enable_memory_first: bool = True, # Enable memory-first storage strategy + memory_ttl: int = 75, # TTL for memory-only images (seconds) ): # Try to read custom path from configuration try: @@ -53,6 +55,10 @@ def __init__( self.scale_threshold = 1440 # Scale when any side exceeds this threshold self.scale_factor = 0.75 # When scaling is needed, scale to 75% of original size + # Memory-first storage configuration + self.enable_memory_first = enable_memory_first + self.memory_ttl = memory_ttl + # Determine storage directory (supports user configuration) self.base_dir = self._resolve_base_dir(base_dir) self.thumbnails_dir = ensure_dir(self.base_dir / "thumbnails") @@ -60,6 +66,9 @@ def __init__( # Memory cache: hash -> (base64_data, timestamp) self._memory_cache: OrderedDict[str, Tuple[str, datetime]] = OrderedDict() + # Image metadata: hash -> (timestamp, is_persisted) + self._image_metadata: dict[str, Tuple[datetime, bool]] = {} + self._ensure_directories() logger.debug( @@ -149,7 +158,7 @@ def get_multiple_from_cache(self, img_hashes: List[str]) -> Dict[str, str]: return result def add_to_cache(self, img_hash: str, img_data: str) -> None: - """Add image to memory cache + """Add image to memory cache with TTL cleanup Args: img_hash: Image hash value @@ -157,11 +166,26 @@ def add_to_cache(self, img_hash: str, img_data: str) -> None: """ try: now = datetime.now() + + # Perform TTL cleanup before adding new image + if self.enable_memory_first: + self.cleanup_expired_memory_images() + self._memory_cache[img_hash] = (img_data, now) - # Remove oldest entries if cache is full + # LRU eviction if cache is full while len(self._memory_cache) > self.memory_cache_size: - self._memory_cache.popitem(last=False) # Remove oldest + evicted_hash, _ = self._memory_cache.popitem(last=False) + + # Clean metadata for evicted image + if evicted_hash in self._image_metadata: + metadata = self._image_metadata[evicted_hash] + if not metadata[1]: # Not persisted + logger.warning( + f"LRU evicted memory-only image: {evicted_hash[:8]}... " + f"(never persisted to disk)" + ) + del self._image_metadata[evicted_hash] logger.debug(f"Added image to cache: {img_hash[:8]}...") except Exception as e: @@ -231,7 +255,7 @@ def _create_thumbnail(self, img_bytes: bytes) -> bytes: return img_bytes # Return original if thumbnail creation fails def process_image_for_cache(self, img_hash: str, img_bytes: bytes) -> None: - """Process image: create thumbnail (memory cache disabled) + """Process image: create thumbnail and store based on memory-first strategy Args: img_hash: Image hash value @@ -241,14 +265,170 @@ def process_image_for_cache(self, img_hash: str, img_bytes: bytes) -> None: # Create thumbnail thumbnail_bytes = self._create_thumbnail(img_bytes) - # Save thumbnail to disk - self.save_thumbnail(img_hash, thumbnail_bytes) - - # Memory cache is deprecated; skip storing original image - logger.debug(f"Processed image (thumbnail only) for hash: {img_hash[:8]}...") + if self.enable_memory_first: + # Memory-first: store in memory only + thumbnail_base64 = base64.b64encode(thumbnail_bytes).decode("utf-8") + self.add_to_cache(img_hash, thumbnail_base64) + self._image_metadata[img_hash] = (datetime.now(), False) # Mark as memory-only + logger.debug(f"Stored image in memory: {img_hash[:8]}...") + else: + # Legacy: immediate disk save + self.save_thumbnail(img_hash, thumbnail_bytes) + logger.debug(f"Processed image (thumbnail only) for hash: {img_hash[:8]}...") except Exception as e: logger.error(f"Failed to process image for cache: {e}") + def persist_image(self, img_hash: str) -> bool: + """Persist a memory-only image to disk + + Args: + img_hash: Image hash to persist + + Returns: + True if persisted successfully, False otherwise + """ + try: + # Check if already persisted + metadata = self._image_metadata.get(img_hash) + if metadata and metadata[1]: # is_persisted = True + logger.debug(f"Image already persisted: {img_hash[:8]}...") + return True + + # Check if exists on disk already + thumbnail_path = self.thumbnails_dir / f"{img_hash}.jpg" + if thumbnail_path.exists(): + # Update metadata + self._image_metadata[img_hash] = (datetime.now(), True) + logger.debug(f"Image already on disk: {img_hash[:8]}...") + return True + + # Get from memory cache + img_data = self.get_from_cache(img_hash) + if not img_data: + logger.warning( + f"Image not found in memory cache (likely evicted): {img_hash[:8]}... " + f"Cannot persist to disk." + ) + return False + + # Decode and save to disk + img_bytes = base64.b64decode(img_data) + self.save_thumbnail(img_hash, img_bytes) + + # Update metadata + self._image_metadata[img_hash] = (datetime.now(), True) + + logger.debug(f"Persisted image to disk: {img_hash[:8]}...") + return True + + except Exception as e: + logger.error(f"Failed to persist image {img_hash[:8]}: {e}") + return False + + def persist_images_batch(self, img_hashes: list[str]) -> dict[str, bool]: + """Persist multiple images in batch + + Args: + img_hashes: List of image hashes to persist + + Returns: + Dict mapping hash to success status + """ + results = {} + success_count = 0 + + for img_hash in img_hashes: + success = self.persist_image(img_hash) + results[img_hash] = success + if success: + success_count += 1 + + logger.info( + f"Batch persist completed: {success_count}/{len(img_hashes)} images persisted" + ) + + return results + + def cleanup_expired_memory_images(self) -> int: + """Clean up memory-only images that exceed TTL + + Returns: + Number of images evicted + """ + if not self.enable_memory_first: + return 0 + + try: + now = datetime.now() + cutoff_time = now - timedelta(seconds=self.memory_ttl) + + evicted_count = 0 + hashes_to_remove = [] + + for img_hash, (timestamp, is_persisted) in self._image_metadata.items(): + # Only evict memory-only images + if not is_persisted and timestamp < cutoff_time: + hashes_to_remove.append(img_hash) + + # Remove from memory cache + for img_hash in hashes_to_remove: + if img_hash in self._memory_cache: + del self._memory_cache[img_hash] + evicted_count += 1 + + # Clean metadata + if img_hash in self._image_metadata: + del self._image_metadata[img_hash] + + if evicted_count > 0: + logger.info( + f"TTL cleanup: evicted {evicted_count} memory-only images " + f"(TTL={self.memory_ttl}s)" + ) + + return evicted_count + + except Exception as e: + logger.error(f"Failed to cleanup expired memory images: {e}") + return 0 + + def cleanup_batch_screenshots(self, img_hashes: list[str]) -> int: + """Clean up specific screenshots from memory (batch cleanup after action generation) + + This is called after actions are saved to immediately free memory-only images + that were not used in the final actions. + + Args: + img_hashes: List of image hashes to remove from memory + + Returns: + Number of images removed + """ + if not self.enable_memory_first: + return 0 + + try: + removed_count = 0 + + for img_hash in img_hashes: + # Check if this image is memory-only (not persisted) + metadata = self._image_metadata.get(img_hash) + if metadata and not metadata[1]: # is_persisted = False + # Remove from memory cache + if img_hash in self._memory_cache: + del self._memory_cache[img_hash] + removed_count += 1 + + # Remove from metadata + if img_hash in self._image_metadata: + del self._image_metadata[img_hash] + + return removed_count + + except Exception as e: + logger.error(f"Failed to cleanup batch screenshots: {e}") + return 0 + def cleanup_old_files(self, max_age_hours: Optional[int] = None) -> int: """ Clean up old temporary files @@ -367,6 +547,16 @@ def get_stats(self) -> Dict[str, Any]: disk_count += 1 disk_size += file_path.stat().st_size + # Memory-first stats + memory_only_count = 0 + persisted_count = 0 + + for _, (_, is_persisted) in self._image_metadata.items(): + if is_persisted: + persisted_count += 1 + else: + memory_only_count += 1 + return { "memory_cache_count": memory_count, "memory_cache_limit": self.memory_cache_size, @@ -377,6 +567,11 @@ def get_stats(self) -> Dict[str, Any]: "scale_threshold": self.scale_threshold, "scale_factor": self.scale_factor, "thumbnail_quality": self.thumbnail_quality, + # Memory-first stats + "memory_first_enabled": self.enable_memory_first, + "memory_ttl_seconds": self.memory_ttl, + "memory_only_images": memory_only_count, + "persisted_images_in_cache": persisted_count, } except Exception as e: @@ -471,5 +666,35 @@ def get_image_manager() -> ImageManager: def init_image_manager(**kwargs) -> ImageManager: """Initialize image manager (can customize parameters)""" global _image_manager + + # Calculate TTL from config if not provided + if "memory_ttl" not in kwargs or "enable_memory_first" not in kwargs: + try: + from config.loader import get_config + + config = get_config().load() + + enable_memory_first = config.get("image.enable_memory_first", True) + processing_interval = config.get("monitoring.processing_interval", 30) + multiplier = config.get("image.memory_ttl_multiplier", 2.5) + ttl_min = config.get("image.memory_ttl_min", 60) + ttl_max = config.get("image.memory_ttl_max", 120) + + # Calculate dynamic TTL + calculated_ttl = int(processing_interval * multiplier) + memory_ttl = max(ttl_min, min(ttl_max, calculated_ttl)) + + if "enable_memory_first" not in kwargs: + kwargs["enable_memory_first"] = enable_memory_first + if "memory_ttl" not in kwargs: + kwargs["memory_ttl"] = memory_ttl + + logger.info( + f"ImageManager: memory_first={enable_memory_first}, " + f"TTL={memory_ttl}s (processing_interval={processing_interval}s)" + ) + except Exception as e: + logger.warning(f"Failed to calculate memory TTL from config: {e}") + _image_manager = ImageManager(**kwargs) return _image_manager