Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
120 changes: 120 additions & 0 deletions backend/agents/action_agent.py
Original file line number Diff line number Diff line change
Expand Up @@ -123,6 +123,8 @@ async def extract_and_save_actions(

# Step 3: Save actions to database
saved_count = 0
all_used_hashes = set() # Track all hashes used across all actions

for resolved in resolved_actions:
action_data = resolved["data"]
action_hashes = resolved["hashes"]
Expand All @@ -136,6 +138,12 @@ async def extract_and_save_actions(
image_indices, screenshot_records
)

# Persist screenshots to disk before saving action
self._persist_action_screenshots(action_hashes)

# Track used hashes
all_used_hashes.update(action_hashes)

# Save action to database
await self.db.actions.save(
action_id=action_id,
Expand All @@ -149,6 +157,9 @@ async def extract_and_save_actions(
saved_count += 1
self.stats["actions_saved"] += 1

# Step 4: Clean up unused screenshots from this batch (legacy flow)
self._cleanup_unused_batch_screenshots_legacy(screenshot_records, all_used_hashes)

logger.debug(f"ActionAgent: Saved {saved_count} actions to database")
return saved_count

Expand Down Expand Up @@ -384,6 +395,7 @@ async def extract_and_save_actions_from_scenes(

# Step 3: Save actions to database
saved_count = 0
all_used_hashes = set() # Track all hashes used across all actions

for resolved in resolved_actions:
action_data = resolved["data"]
Expand All @@ -396,6 +408,12 @@ async def extract_and_save_actions_from_scenes(
scene_indices, scenes
)

# Persist screenshots to disk before saving action
self._persist_action_screenshots(action_hashes)

# Track used hashes
all_used_hashes.update(action_hashes)

# Save action to database
await self.db.actions.save(
action_id=action_id,
Expand All @@ -409,13 +427,115 @@ async def extract_and_save_actions_from_scenes(
saved_count += 1
self.stats["actions_saved"] += 1

# Step 4: Clean up unused screenshots from this batch
self._cleanup_unused_batch_screenshots(scenes, all_used_hashes)

logger.debug(f"ActionAgent: Saved {saved_count} actions to database")
return saved_count

except Exception as e:
logger.error(f"ActionAgent: Failed to process actions from scenes: {e}", exc_info=True)
return 0

def _persist_action_screenshots(self, screenshot_hashes: list[str]) -> None:
"""Persist screenshots to disk when action is saved

This is the trigger point for memory-first persistence.

Args:
screenshot_hashes: List of screenshot hashes to persist
"""
try:
if not screenshot_hashes:
return

logger.debug(
f"Persisting {len(screenshot_hashes)} screenshots for action"
)

results = self.image_manager.persist_images_batch(screenshot_hashes)

# Log warnings for failed persists
failed = [h for h, success in results.items() if not success]
if failed:
logger.warning(
f"Failed to persist {len(failed)} screenshots (likely evicted from memory): "
f"{[h[:8] for h in failed]}"
)

except Exception as e:
logger.error(f"Failed to persist action screenshots: {e}", exc_info=True)

def _cleanup_unused_batch_screenshots(self, scenes: List[Dict[str, Any]], used_hashes: set[str]) -> None:
"""Clean up screenshots from this batch that were not used in any action

This is called after all actions are saved to immediately free memory.

Args:
scenes: All scenes from this batch (contains all screenshot hashes)
used_hashes: Set of screenshot hashes that were used in actions
"""
try:
# Collect all screenshot hashes from this batch
all_hashes = set()
for scene in scenes:
screenshot_hash = scene.get("screenshot_hash")
if screenshot_hash:
all_hashes.add(screenshot_hash)

# Calculate unused hashes
unused_hashes = all_hashes - used_hashes

if not unused_hashes:
logger.debug("No unused screenshots to clean up in this batch")
return

# Clean up unused screenshots from memory
cleaned_count = self.image_manager.cleanup_batch_screenshots(list(unused_hashes))

if cleaned_count > 0:
logger.info(
f"Batch cleanup: removed {cleaned_count}/{len(unused_hashes)} unused screenshots from memory "
f"(total batch: {len(all_hashes)}, used: {len(used_hashes)})"
)

except Exception as e:
logger.error(f"Failed to cleanup unused batch screenshots: {e}", exc_info=True)

def _cleanup_unused_batch_screenshots_legacy(self, screenshot_records: List[RawRecord], used_hashes: set[str]) -> None:
"""Clean up screenshots from this batch that were not used in any action (legacy flow)

Args:
screenshot_records: All screenshot records from this batch
used_hashes: Set of screenshot hashes that were used in actions
"""
try:
# Collect all screenshot hashes from this batch
all_hashes = set()
for record in screenshot_records:
screenshot_hash = record.data.get("hash")
if screenshot_hash:
all_hashes.add(screenshot_hash)

# Calculate unused hashes
unused_hashes = all_hashes - used_hashes

if not unused_hashes:
logger.debug("No unused screenshots to clean up in this batch (legacy)")
return

# Clean up unused screenshots from memory
cleaned_count = self.image_manager.cleanup_batch_screenshots(list(unused_hashes))

if cleaned_count > 0:
logger.info(
f"Batch cleanup (legacy): removed {cleaned_count}/{len(unused_hashes)} unused screenshots from memory "
f"(total batch: {len(all_hashes)}, used: {len(used_hashes)})"
)

except Exception as e:
logger.error(f"Failed to cleanup unused batch screenshots (legacy): {e}", exc_info=True)

async def _extract_actions_from_scenes(
self,
scenes: List[Dict[str, Any]],
Expand Down
6 changes: 6 additions & 0 deletions backend/config/config.toml
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,12 @@ max_width = 1920
max_height = 1080
enable_phash = true

# Memory-first storage configuration
enable_memory_first = true # Master switch
memory_ttl_multiplier = 2.5 # TTL = processing_interval * multiplier
memory_ttl_min = 60 # Minimum TTL (seconds)
memory_ttl_max = 120 # Maximum TTL (seconds)

# Screenshot configuration
[screenshot]
# Smart capture - only capture the active monitor
Expand Down
13 changes: 13 additions & 0 deletions backend/core/coordinator.py
Original file line number Diff line number Diff line change
Expand Up @@ -505,6 +505,7 @@ async def _processing_loop(self) -> None:
try:
# First iteration has shorter delay, then use normal interval
first_iteration = True
last_ttl_cleanup = datetime.now() # Track last TTL cleanup time

while self.is_running:
# First iteration starts quickly (100ms), then use configured interval
Expand All @@ -521,6 +522,18 @@ async def _processing_loop(self) -> None:
logger.debug("Coordinator paused, skipping processing cycle")
continue

# Periodic TTL cleanup for memory-only images
now = datetime.now()
if (now - last_ttl_cleanup).total_seconds() >= self.processing_interval:
try:
if self.processing_pipeline and self.processing_pipeline.image_manager:
evicted = self.processing_pipeline.image_manager.cleanup_expired_memory_images()
if evicted > 0:
logger.debug(f"TTL cleanup: evicted {evicted} expired memory-only images")
last_ttl_cleanup = now
except Exception as e:
logger.error(f"TTL cleanup failed: {e}")

if not self.perception_manager:
logger.error("Perception manager not initialized")
raise Exception("Perception manager not initialized")
Expand Down
Loading