diff --git a/BACKGROUND_INGESTION_PRD.md b/BACKGROUND_INGESTION_PRD.md new file mode 100644 index 00000000..30db7c11 --- /dev/null +++ b/BACKGROUND_INGESTION_PRD.md @@ -0,0 +1,46 @@ +# Background Document Ingestion + +## Overview + +This feature allows CLI users to specify a directory for automatic document ingestion. Instead of manually running `elroy ingest` each time, Elroy will automatically monitor and ingest documents while the application is open. + +## Core Requirements + +### Configuration +- **Storage**: Background ingestion configuration is stored per-user in the database +- **Directory Setting**: Users can specify a watch directory via CLI tool/slash command +- **Patterns**: Support include/exclude glob patterns (same as manual ingestion) +- **Recursion**: Support recursive directory watching (enabled by default) + +### Change Detection +- **File System Watching**: Use `watchdog` library for real-time file system events +- **Periodic Full Scans**: Full directory scan approximately once per day to catch any missed changes +- **Duplicate Handling**: Leverage existing MD5 hash-based duplicate detection +- **File Operations**: Detect new files, modified files, and moved files (deleted file handling via existing inactive marking) + +### Technical Implementation +- **Scheduler Integration**: Use existing APScheduler system for periodic tasks +- **Multi-instance**: Handle multiple CLI instances gracefully (duplicate processing is acceptable due to existing deduplication) +- **Error Handling**: Log errors when ingestion fails, watch directory becomes inaccessible, etc. +- **Resource Management**: Handle thousands of files efficiently with infrequent changes + +### User Experience +- **Silent Operation**: Run in background without interrupting user workflow +- **Status Tracking**: Track last scan time and status in database +- **Configuration**: Simple CLI tool/slash command to enable/disable and configure + +## Frequency Configuration +- **Watchdog**: Real-time file system event monitoring while application is open +- **Full Scan**: Daily comprehensive directory scan (hard-coded interval) +- **Configuration Variable**: Hard-coded frequency setting for full scans + +## Database Schema +New `BackgroundIngestionConfig` table with: +- `user_id`: Foreign key to user +- `watch_directory`: Path to monitor +- `is_active`: Enable/disable flag +- `recursive`: Recursive monitoring flag +- `include_patterns`: Comma-separated glob patterns +- `exclude_patterns`: Comma-separated glob patterns +- `last_full_scan`: Timestamp of last full scan +- `last_scan_status`: Status tracking ('success', 'error', 'pending') diff --git a/elroy/cli/main.py b/elroy/cli/main.py index 06b22771..96eb276f 100644 --- a/elroy/cli/main.py +++ b/elroy/cli/main.py @@ -393,10 +393,17 @@ def chat(typer_ctx: typer.Context): # Initialize the APScheduler from ..core.async_tasks import init_scheduler, shutdown_scheduler + from ..core.background_ingestion_scheduler import ( + start_background_ingestion_for_user, + stop_background_ingestion, + ) init_scheduler() with init_elroy_session(ctx, io, True, True): + # Start background ingestion if configured + start_background_ingestion_for_user(ctx) + try: handle_chat(io, params["enable_assistant_greeting"], ctx) except BdbQuit: @@ -411,6 +418,7 @@ def chat(typer_ctx: typer.Context): else: create_bug_report_from_exception_if_confirmed(io, ctx, e) finally: + stop_background_ingestion() shutdown_scheduler(wait=False) else: diff --git a/elroy/core/background_ingestion.py b/elroy/core/background_ingestion.py new file mode 100644 index 00000000..e922d82e --- /dev/null +++ b/elroy/core/background_ingestion.py @@ -0,0 +1,263 @@ +import time +from pathlib import Path +from threading import Lock +from typing import Dict, Optional + +from watchdog.events import FileSystemEventHandler +from watchdog.observers import Observer + +from ..core.ctx import ElroyContext +from ..core.logging import get_logger +from ..core.session import dbsession +from ..repository.background_ingestion.operations import ( + get_include_exclude_patterns, + mark_scan_completed, +) +from ..repository.background_ingestion.queries import ( + get_active_background_ingestion_config, +) +from ..repository.documents.operations import DocIngestStatus, do_ingest, do_ingest_dir +from ..utils.clock import utc_now + +logger = get_logger() + + +class BackgroundIngestionHandler(FileSystemEventHandler): + """File system event handler for background document ingestion.""" + + def __init__(self, ctx: ElroyContext): + super().__init__() + self.ctx = ctx + self.recent_files: Dict[str, float] = {} # Track recently processed files to avoid duplicates + self.lock = Lock() # Thread safety for recent_files + + def _should_process_file(self, file_path: str) -> bool: + """Check if a file should be processed based on timing and patterns.""" + + # Get current config + with dbsession(self.ctx): + config = get_active_background_ingestion_config(self.ctx) + if not config: + return False + + include_patterns, exclude_patterns = get_include_exclude_patterns(config) + + path = Path(file_path) + + # Check if file exists and is a file (not directory) + if not path.exists() or not path.is_file(): + return False + + # Import and use existing filtering logic + from ..repository.documents.operations import should_process_file + + if not should_process_file(path, include_patterns, exclude_patterns): + return False + + # Prevent duplicate processing of recently modified files + with self.lock: + current_time = time.time() + if file_path in self.recent_files: + if current_time - self.recent_files[file_path] < 5.0: # 5 second cooldown + return False + self.recent_files[file_path] = current_time + + # Clean up old entries (older than 1 minute) + cutoff_time = current_time - 60 + self.recent_files = {k: v for k, v in self.recent_files.items() if v > cutoff_time} + + return True + + def _process_file(self, file_path: str) -> None: + """Process a single file for ingestion.""" + try: + # Create a new context with fresh DB session for this thread + new_ctx = ElroyContext(**vars(self.ctx.params)) + with dbsession(new_ctx): + result = do_ingest(new_ctx, Path(file_path), force_refresh=False) + + if result in [DocIngestStatus.SUCCESS, DocIngestStatus.UPDATED]: + logger.info(f"Background ingestion: {result.name} for {file_path}") + elif result == DocIngestStatus.MOVED: + logger.info(f"Background ingestion: Document moved, updated address for {file_path}") + else: + logger.debug(f"Background ingestion: {result.name} for {file_path}") + + except Exception as e: + logger.error(f"Background ingestion failed for {file_path}: {str(e)}", exc_info=True) + + def on_created(self, event): + """Handle file creation events.""" + if not event.is_directory and self._should_process_file(event.src_path): + logger.debug(f"File created: {event.src_path}") + self._process_file(event.src_path) + + def on_modified(self, event): + """Handle file modification events.""" + if not event.is_directory and self._should_process_file(event.src_path): + logger.debug(f"File modified: {event.src_path}") + self._process_file(event.src_path) + + def on_moved(self, event): + """Handle file move events.""" + if not event.is_directory and self._should_process_file(event.dest_path): + logger.debug(f"File moved: {event.src_path} -> {event.dest_path}") + self._process_file(event.dest_path) + + +class BackgroundIngestionService: + """Service to manage background document ingestion.""" + + def __init__(self, ctx: ElroyContext): + self.ctx = ctx + self.observer: Optional[Observer] = None + self.handler: Optional[BackgroundIngestionHandler] = None + self.is_running = False + + def start(self) -> bool: + """Start the background ingestion service. + + Returns: + True if service started successfully, False otherwise + """ + if self.is_running: + logger.warning("Background ingestion service is already running") + return False + + if not self.ctx.background_ingestion_enabled: + logger.info("Background ingestion is globally disabled") + return False + + config = get_active_background_ingestion_config(self.ctx) + if not config: + logger.debug("No active background ingestion configuration found") + return False + + watch_directory = Path(config.watch_directory) + if not watch_directory.exists() or not watch_directory.is_dir(): + logger.error(f"Watch directory does not exist or is not a directory: {config.watch_directory}") + return False + + try: + self.handler = BackgroundIngestionHandler(self.ctx) + self.observer = Observer() + self.observer.schedule(self.handler, str(watch_directory), recursive=config.recursive) + self.observer.start() + self.is_running = True + logger.info(f"Started background ingestion service for directory: {watch_directory}") + return True + + except Exception as e: + logger.error(f"Failed to start background ingestion service: {str(e)}", exc_info=True) + return False + + def stop(self) -> bool: + """Stop the background ingestion service. + + Returns: + True if service stopped successfully, False otherwise + """ + if not self.is_running: + return False + + try: + if self.observer: + self.observer.stop() + self.observer.join(timeout=5.0) # Wait up to 5 seconds + self.observer = None + + self.handler = None + self.is_running = False + logger.info("Stopped background ingestion service") + return True + + except Exception as e: + logger.error(f"Error stopping background ingestion service: {str(e)}", exc_info=True) + return False + + def restart(self) -> bool: + """Restart the background ingestion service. + + Returns: + True if service restarted successfully, False otherwise + """ + self.stop() + return self.start() + + def is_service_running(self) -> bool: + """Check if the service is currently running.""" + return self.is_running and self.observer is not None + + +def perform_full_scan(ctx: ElroyContext) -> bool: + """Perform a full scan of the configured watch directory. + + Args: + ctx: The Elroy context + + Returns: + True if scan completed successfully, False otherwise + """ + config = get_active_background_ingestion_config(ctx) + if not config: + logger.debug("No active background ingestion configuration for full scan") + return False + + watch_directory = Path(config.watch_directory) + if not watch_directory.exists() or not watch_directory.is_dir(): + logger.error(f"Watch directory does not exist for full scan: {config.watch_directory}") + mark_scan_completed(ctx, success=False) + return False + + try: + logger.info(f"Starting full background ingestion scan of directory: {watch_directory}") + + include_patterns, exclude_patterns = get_include_exclude_patterns(config) + + # Use existing directory ingestion logic + total_processed = 0 + for status_update in do_ingest_dir( + ctx, watch_directory, force_refresh=False, recursive=config.recursive, include=include_patterns, exclude=exclude_patterns + ): + # Get final status counts + if DocIngestStatus.PENDING not in status_update or status_update[DocIngestStatus.PENDING] == 0: + total_processed = sum(status_update.values()) + success_count = status_update.get(DocIngestStatus.SUCCESS, 0) + updated_count = status_update.get(DocIngestStatus.UPDATED, 0) + unchanged_count = status_update.get(DocIngestStatus.UNCHANGED, 0) + moved_count = status_update.get(DocIngestStatus.MOVED, 0) + + logger.info( + f"Full scan completed: {total_processed} files processed " + f"({success_count} new, {updated_count} updated, {moved_count} moved, {unchanged_count} unchanged)" + ) + break + + mark_scan_completed(ctx, success=True) + return True + + except Exception as e: + logger.error(f"Full background ingestion scan failed: {str(e)}", exc_info=True) + mark_scan_completed(ctx, success=False) + return False + + +def should_run_full_scan(ctx: ElroyContext) -> bool: + """Check if a full scan should be run based on configuration and last scan time. + + Args: + ctx: The Elroy context + + Returns: + True if a full scan should be run, False otherwise + """ + config = get_active_background_ingestion_config(ctx) + if not config: + return False + + if not config.last_full_scan: + return True # Never run before + + # Check if enough time has passed since last scan + hours_since_scan = (utc_now() - config.last_full_scan).total_seconds() / 3600 + return hours_since_scan >= ctx.background_ingestion_full_scan_interval_hours diff --git a/elroy/core/background_ingestion_scheduler.py b/elroy/core/background_ingestion_scheduler.py new file mode 100644 index 00000000..731aba89 --- /dev/null +++ b/elroy/core/background_ingestion_scheduler.py @@ -0,0 +1,154 @@ +from datetime import datetime, timedelta +from typing import Optional + +from ..core.async_tasks import get_scheduler, schedule_task +from ..core.background_ingestion import ( + BackgroundIngestionService, + perform_full_scan, + should_run_full_scan, +) +from ..core.ctx import ElroyContext +from ..core.logging import get_logger +from ..repository.background_ingestion.queries import ( + get_active_background_ingestion_config, +) + +logger = get_logger() + +# Global service instance +_background_service: Optional[BackgroundIngestionService] = None + + +def start_background_ingestion_for_user(ctx: ElroyContext) -> bool: + """Start background ingestion service and schedule periodic tasks for a user. + + Args: + ctx: The Elroy context + + Returns: + True if started successfully, False otherwise + """ + global _background_service + + if not ctx.background_ingestion_enabled: + logger.debug("Background ingestion is globally disabled") + return False + + # Check if user has active config + config = get_active_background_ingestion_config(ctx) + if not config: + logger.debug("No active background ingestion config found for user") + return False + + # Start file system watching service + if _background_service is None: + _background_service = BackgroundIngestionService(ctx) + + if not _background_service.is_service_running(): + success = _background_service.start() + if not success: + return False + + # Schedule periodic full scans + schedule_periodic_full_scan(ctx) + + # Run initial full scan if needed + if should_run_full_scan(ctx): + schedule_task( + perform_full_scan, + ctx, + delay_seconds=10, # Run after 10 seconds to allow app to fully start + ) + + logger.info("Background ingestion started for user") + return True + + +def stop_background_ingestion() -> None: + """Stop the background ingestion service.""" + global _background_service + + if _background_service: + _background_service.stop() + _background_service = None + logger.info("Background ingestion service stopped") + + +def schedule_periodic_full_scan(ctx: ElroyContext) -> None: + """Schedule periodic full directory scans. + + Args: + ctx: The Elroy context + """ + + def full_scan_task(scan_ctx: ElroyContext) -> None: + """Task function for scheduled full scans.""" + try: + if should_run_full_scan(scan_ctx): + perform_full_scan(scan_ctx) + + # Reschedule next scan + next_scan_time = datetime.now() + timedelta(hours=scan_ctx.background_ingestion_full_scan_interval_hours) + scheduler = get_scheduler() + scheduler.add_job( + lambda: full_scan_task(scan_ctx), + "date", + run_date=next_scan_time, + id=f"background_full_scan_{scan_ctx.user_id}", + replace_existing=True, + ) + + except Exception as e: + logger.error(f"Error in scheduled full scan: {str(e)}", exc_info=True) + + # Schedule the first scan + next_scan_time = datetime.now() + timedelta(hours=ctx.background_ingestion_full_scan_interval_hours) + scheduler = get_scheduler() + scheduler.add_job( + lambda: full_scan_task(ctx), "date", run_date=next_scan_time, id=f"background_full_scan_{ctx.user_id}", replace_existing=True + ) + + logger.debug(f"Scheduled periodic full scan for user {ctx.user_id}") + + +def restart_background_ingestion_for_user(ctx: ElroyContext) -> bool: + """Restart background ingestion service for a user. + + This is useful when configuration changes require a restart. + + Args: + ctx: The Elroy context + + Returns: + True if restarted successfully, False otherwise + """ + global _background_service + + # Stop existing service + if _background_service: + _background_service.stop() + _background_service = None + + # Remove existing scheduled tasks for this user + scheduler = get_scheduler() + try: + scheduler.remove_job(f"background_full_scan_{ctx.user_id}") + except: + pass # Job might not exist + + # Start fresh + return start_background_ingestion_for_user(ctx) + + +def get_background_ingestion_status() -> dict: + """Get the current status of the background ingestion service. + + Returns: + Dictionary with service status information + """ + global _background_service + + return { + "service_running": _background_service is not None and _background_service.is_service_running(), + "service_exists": _background_service is not None, + } diff --git a/elroy/core/ctx.py b/elroy/core/ctx.py index 2195b7ef..d0fe67e9 100644 --- a/elroy/core/ctx.py +++ b/elroy/core/ctx.py @@ -81,9 +81,14 @@ def __init__( reflect: bool, shell_commands: bool, allowed_shell_command_prefixes: List[str], + # Background Document Ingestion + background_ingestion_enabled: bool, + background_ingestion_full_scan_interval_hours: int, ): self.allowed_shell_command_prefixes = [re.compile(f"^{p}") for p in allowed_shell_command_prefixes] self.shell_commands = shell_commands + self.background_ingestion_enabled = background_ingestion_enabled + self.background_ingestion_full_scan_interval_hours = background_ingestion_full_scan_interval_hours self.params = SimpleNamespace(**{k: v for k, v in locals().items() if k != "self"}) diff --git a/elroy/db/db_models.py b/elroy/db/db_models.py index 707028aa..03635245 100644 --- a/elroy/db/db_models.py +++ b/elroy/db/db_models.py @@ -249,6 +249,21 @@ class WaitlistSignup(SQLModel, table=True): platform: Optional[str] = Field(default=None, description="Platform preference (iOS/Android)") +class BackgroundIngestionConfig(SQLModel, table=True): + __table_args__ = (UniqueConstraint("user_id"), {"extend_existing": True}) + id: Optional[int] = Field(default=None, primary_key=True) + created_at: datetime = Field(default_factory=utc_now, nullable=False) + updated_at: datetime = Field(default_factory=utc_now, nullable=False) + user_id: int = Field(..., description="Elroy user for context") + watch_directory: str = Field(..., description="Directory to watch for new/changed documents") + is_active: bool = Field(default=True, description="Whether background ingestion is enabled") + recursive: bool = Field(default=True, description="Whether to recursively watch subdirectories") + include_patterns: str = Field(default="", description="Comma-separated glob patterns to include") + exclude_patterns: str = Field(default="", description="Comma-separated glob patterns to exclude") + last_full_scan: Optional[datetime] = Field(default=None, description="When the last full directory scan was completed") + last_scan_status: str = Field(default="pending", description="Status of the last scan: 'success', 'error', or 'pending'") + + def get_mem_source_options() -> Dict[str, Type[MemorySource]]: # Note, this is brittle! Should be replaced in the future with a registration process. from ..repository.context_messages.transforms import ContextMessageSetWithMessages diff --git a/elroy/db/postgres/alembic/versions/add_background_ingestion_config.py b/elroy/db/postgres/alembic/versions/add_background_ingestion_config.py new file mode 100644 index 00000000..8d11f828 --- /dev/null +++ b/elroy/db/postgres/alembic/versions/add_background_ingestion_config.py @@ -0,0 +1,46 @@ +"""add background ingestion config + +Revision ID: add_background_ingestion_config +Revises: 33a33b6314d5 +Create Date: 2025-08-25 12:00:00.000000 + +""" + +from typing import Sequence, Union + +import sqlalchemy as sa +from alembic import op +from sqlmodel.sql.sqltypes import AutoString + +# revision identifiers, used by Alembic. +revision: str = "add_background_ingestion_config" +down_revision: Union[str, None] = "33a33b6314d5" +branch_labels: Union[str, Sequence[str], None] = None +depends_on: Union[str, Sequence[str], None] = None + + +def upgrade() -> None: + # ### commands auto generated by Alembic - please adjust! ### + op.create_table( + "backgroundingestionconfig", + sa.Column("id", sa.Integer(), nullable=False), + sa.Column("created_at", sa.DateTime(), nullable=False), + sa.Column("updated_at", sa.DateTime(), nullable=False), + sa.Column("user_id", sa.Integer(), nullable=False), + sa.Column("watch_directory", AutoString(), nullable=False), + sa.Column("is_active", sa.Boolean(), nullable=False), + sa.Column("recursive", sa.Boolean(), nullable=False), + sa.Column("include_patterns", AutoString(), nullable=False), + sa.Column("exclude_patterns", AutoString(), nullable=False), + sa.Column("last_full_scan", sa.DateTime(), nullable=True), + sa.Column("last_scan_status", AutoString(), nullable=False), + sa.PrimaryKeyConstraint("id"), + sa.UniqueConstraint("user_id"), + ) + # ### end Alembic commands ### + + +def downgrade() -> None: + # ### commands auto generated by Alembic - please adjust! ### + op.drop_table("backgroundingestionconfig") + # ### end Alembic commands ### diff --git a/elroy/db/sqlite/alembic/versions/add_background_ingestion_config.py b/elroy/db/sqlite/alembic/versions/add_background_ingestion_config.py new file mode 100644 index 00000000..63131987 --- /dev/null +++ b/elroy/db/sqlite/alembic/versions/add_background_ingestion_config.py @@ -0,0 +1,46 @@ +"""add background ingestion config + +Revision ID: add_background_ingestion_config +Revises: 1c74c74a43e2 +Create Date: 2025-08-25 12:00:00.000000 + +""" + +from typing import Sequence, Union + +import sqlalchemy as sa +from alembic import op +from sqlmodel.sql.sqltypes import AutoString + +# revision identifiers, used by Alembic. +revision: str = "add_background_ingestion_config" +down_revision: Union[str, None] = "1c74c74a43e2" +branch_labels: Union[str, Sequence[str], None] = None +depends_on: Union[str, Sequence[str], None] = None + + +def upgrade() -> None: + # ### commands auto generated by Alembic - please adjust! ### + op.create_table( + "backgroundingestionconfig", + sa.Column("id", sa.Integer(), nullable=False), + sa.Column("created_at", sa.DateTime(), nullable=False), + sa.Column("updated_at", sa.DateTime(), nullable=False), + sa.Column("user_id", sa.Integer(), nullable=False), + sa.Column("watch_directory", AutoString(), nullable=False), + sa.Column("is_active", sa.Boolean(), nullable=False), + sa.Column("recursive", sa.Boolean(), nullable=False), + sa.Column("include_patterns", AutoString(), nullable=False), + sa.Column("exclude_patterns", AutoString(), nullable=False), + sa.Column("last_full_scan", sa.DateTime(), nullable=True), + sa.Column("last_scan_status", AutoString(), nullable=False), + sa.PrimaryKeyConstraint("id"), + sa.UniqueConstraint("user_id"), + ) + # ### end Alembic commands ### + + +def downgrade() -> None: + # ### commands auto generated by Alembic - please adjust! ### + op.drop_table("backgroundingestionconfig") + # ### end Alembic commands ### diff --git a/elroy/defaults.yml b/elroy/defaults.yml index c8a9741f..6309c13c 100644 --- a/elroy/defaults.yml +++ b/elroy/defaults.yml @@ -55,3 +55,7 @@ show_memory_panel: true # Whether to display the memory panel shell_commands: false # Whether to enable shell commands allowed_shell_command_prefixes: - .* + +# Background Document Ingestion +background_ingestion_full_scan_interval_hours: 24 # How often to run a full directory scan (in hours) +background_ingestion_enabled: true # Whether background ingestion is globally enabled diff --git a/elroy/repository/background_ingestion/__init__.py b/elroy/repository/background_ingestion/__init__.py new file mode 100644 index 00000000..e69de29b diff --git a/elroy/repository/background_ingestion/operations.py b/elroy/repository/background_ingestion/operations.py new file mode 100644 index 00000000..1790e991 --- /dev/null +++ b/elroy/repository/background_ingestion/operations.py @@ -0,0 +1,161 @@ +from pathlib import Path +from typing import List, Optional + +from ...core.ctx import ElroyContext +from ...core.logging import get_logger +from ...db.db_models import BackgroundIngestionConfig +from ...utils.clock import utc_now +from .queries import get_background_ingestion_config + +logger = get_logger() + + +def create_background_ingestion_config( + ctx: ElroyContext, + watch_directory: str, + recursive: bool = True, + include_patterns: Optional[List[str]] = None, + exclude_patterns: Optional[List[str]] = None, +) -> BackgroundIngestionConfig: + """Create a new background ingestion configuration for the user. + + Args: + ctx: The Elroy context + watch_directory: Directory to watch for changes + recursive: Whether to watch subdirectories recursively + include_patterns: List of glob patterns to include (empty means include all) + exclude_patterns: List of glob patterns to exclude + + Returns: + The created background ingestion configuration + + Raises: + ValueError: If watch_directory doesn't exist or isn't a directory + """ + # Validate directory exists + dir_path = Path(watch_directory).expanduser().resolve() + if not dir_path.exists(): + raise ValueError(f"Directory does not exist: {watch_directory}") + if not dir_path.is_dir(): + raise ValueError(f"Path is not a directory: {watch_directory}") + + # Check if config already exists + existing_config = get_background_ingestion_config(ctx) + if existing_config: + raise ValueError("Background ingestion configuration already exists. Use update_background_ingestion_config to modify.") + + # Create config + config = BackgroundIngestionConfig( + user_id=ctx.user_id, + watch_directory=str(dir_path), + is_active=True, + recursive=recursive, + include_patterns=",".join(include_patterns or []), + exclude_patterns=",".join(exclude_patterns or []), + last_scan_status="pending", + ) + + config = ctx.db.persist(config) + logger.info(f"Created background ingestion config for directory: {watch_directory}") + return config + + +def update_background_ingestion_config( + ctx: ElroyContext, + watch_directory: Optional[str] = None, + is_active: Optional[bool] = None, + recursive: Optional[bool] = None, + include_patterns: Optional[List[str]] = None, + exclude_patterns: Optional[List[str]] = None, +) -> BackgroundIngestionConfig: + """Update an existing background ingestion configuration. + + Args: + ctx: The Elroy context + watch_directory: New directory to watch (optional) + is_active: Enable/disable background ingestion (optional) + recursive: Whether to watch subdirectories recursively (optional) + include_patterns: List of glob patterns to include (optional) + exclude_patterns: List of glob patterns to exclude (optional) + + Returns: + The updated background ingestion configuration + + Raises: + ValueError: If no configuration exists or watch_directory is invalid + """ + config = get_background_ingestion_config(ctx) + if not config: + raise ValueError("No background ingestion configuration exists. Use create_background_ingestion_config first.") + + # Validate new directory if provided + if watch_directory is not None: + dir_path = Path(watch_directory).expanduser().resolve() + if not dir_path.exists(): + raise ValueError(f"Directory does not exist: {watch_directory}") + if not dir_path.is_dir(): + raise ValueError(f"Path is not a directory: {watch_directory}") + config.watch_directory = str(dir_path) + + # Update other fields + if is_active is not None: + config.is_active = is_active + if recursive is not None: + config.recursive = recursive + if include_patterns is not None: + config.include_patterns = ",".join(include_patterns) + if exclude_patterns is not None: + config.exclude_patterns = ",".join(exclude_patterns) + + config.updated_at = utc_now() + config = ctx.db.persist(config) + logger.info(f"Updated background ingestion config") + return config + + +def delete_background_ingestion_config(ctx: ElroyContext) -> bool: + """Delete the background ingestion configuration for the user. + + Args: + ctx: The Elroy context + + Returns: + True if configuration was deleted, False if no configuration existed + """ + config = get_background_ingestion_config(ctx) + if not config: + return False + + ctx.db.delete(config) + ctx.db.commit() + logger.info("Deleted background ingestion config") + return True + + +def mark_scan_completed(ctx: ElroyContext, success: bool = True) -> None: + """Mark a background ingestion scan as completed. + + Args: + ctx: The Elroy context + success: Whether the scan completed successfully + """ + config = get_background_ingestion_config(ctx) + if config: + config.last_full_scan = utc_now() + config.last_scan_status = "success" if success else "error" + config.updated_at = utc_now() + ctx.db.persist(config) + + +def get_include_exclude_patterns(config: BackgroundIngestionConfig) -> tuple[List[str], List[str]]: + """Parse include and exclude patterns from configuration. + + Args: + config: The background ingestion configuration + + Returns: + Tuple of (include_patterns, exclude_patterns) + """ + include_patterns = [p.strip() for p in config.include_patterns.split(",") if p.strip()] + exclude_patterns = [p.strip() for p in config.exclude_patterns.split(",") if p.strip()] + return include_patterns, exclude_patterns diff --git a/elroy/repository/background_ingestion/queries.py b/elroy/repository/background_ingestion/queries.py new file mode 100644 index 00000000..093117d4 --- /dev/null +++ b/elroy/repository/background_ingestion/queries.py @@ -0,0 +1,34 @@ +from typing import Optional + +from sqlmodel import select + +from ...core.ctx import ElroyContext +from ...db.db_models import BackgroundIngestionConfig + + +def get_background_ingestion_config(ctx: ElroyContext) -> Optional[BackgroundIngestionConfig]: + """Get the background ingestion configuration for the current user. + + Args: + ctx: The Elroy context + + Returns: + The background ingestion configuration, or None if not configured + """ + return ctx.db.exec(select(BackgroundIngestionConfig).where(BackgroundIngestionConfig.user_id == ctx.user_id)).first() + + +def get_active_background_ingestion_config(ctx: ElroyContext) -> Optional[BackgroundIngestionConfig]: + """Get the active background ingestion configuration for the current user. + + Args: + ctx: The Elroy context + + Returns: + The active background ingestion configuration, or None if not configured or inactive + """ + return ctx.db.exec( + select(BackgroundIngestionConfig).where( + BackgroundIngestionConfig.user_id == ctx.user_id, BackgroundIngestionConfig.is_active == True + ) + ).first() diff --git a/elroy/repository/background_ingestion/tools.py b/elroy/repository/background_ingestion/tools.py new file mode 100644 index 00000000..f7fceafd --- /dev/null +++ b/elroy/repository/background_ingestion/tools.py @@ -0,0 +1,177 @@ +from pathlib import Path +from typing import Optional + +from ...core.constants import tool +from ...core.ctx import ElroyContext +from .operations import ( + create_background_ingestion_config, + delete_background_ingestion_config, + get_include_exclude_patterns, + update_background_ingestion_config, +) +from .queries import get_background_ingestion_config + + +@tool +def setup_background_ingestion( + ctx: ElroyContext, + directory: str, + recursive: bool = True, + include_patterns: Optional[str] = None, + exclude_patterns: Optional[str] = None, +) -> str: + """Set up background document ingestion for a directory. + + This enables automatic monitoring and ingestion of documents in the specified directory. + Files will be ingested automatically when they are created or modified. + + Args: + directory: Path to the directory to monitor for documents + recursive: Whether to also monitor subdirectories (default: True) + include_patterns: Comma-separated glob patterns for files to include (e.g. "*.md,*.txt") + exclude_patterns: Comma-separated glob patterns for files to exclude (e.g. "*.log,*.tmp") + + Returns: + Confirmation message about the setup + """ + try: + # Convert directory to absolute path + dir_path = Path(directory).expanduser().resolve() + + # Parse patterns + include_list = [p.strip() for p in include_patterns.split(",")] if include_patterns else [] + exclude_list = [p.strip() for p in exclude_patterns.split(",")] if exclude_patterns else [] + + # Check if config already exists + existing_config = get_background_ingestion_config(ctx) + if existing_config: + return f"Background ingestion is already configured for directory: {existing_config.watch_directory}. Use update_background_ingestion to modify settings." + + # Create new configuration + config = create_background_ingestion_config( + ctx=ctx, + watch_directory=str(dir_path), + recursive=recursive, + include_patterns=include_list, + exclude_patterns=exclude_list, + ) + + pattern_info = [] + if include_list: + pattern_info.append(f"including patterns: {', '.join(include_list)}") + if exclude_list: + pattern_info.append(f"excluding patterns: {', '.join(exclude_list)}") + + pattern_str = f" ({', '.join(pattern_info)})" if pattern_info else "" + recursive_str = " (recursive)" if recursive else " (non-recursive)" + + return f"Background ingestion enabled for directory: {dir_path}{recursive_str}{pattern_str}. Documents will be automatically ingested when created or modified." + + except ValueError as e: + return f"Error setting up background ingestion: {str(e)}" + + +@tool +def update_background_ingestion( + ctx: ElroyContext, + directory: Optional[str] = None, + enable: Optional[bool] = None, + recursive: Optional[bool] = None, + include_patterns: Optional[str] = None, + exclude_patterns: Optional[str] = None, +) -> str: + """Update background document ingestion settings. + + Args: + directory: New directory to monitor (optional) + enable: Enable or disable background ingestion (optional) + recursive: Whether to monitor subdirectories recursively (optional) + include_patterns: Comma-separated glob patterns for files to include (optional) + exclude_patterns: Comma-separated glob patterns for files to exclude (optional) + + Returns: + Confirmation message about the update + """ + try: + # Parse patterns if provided + include_list = [p.strip() for p in include_patterns.split(",")] if include_patterns else None + exclude_list = [p.strip() for p in exclude_patterns.split(",")] if exclude_patterns else None + + # Update configuration + config = update_background_ingestion_config( + ctx=ctx, + watch_directory=directory, + is_active=enable, + recursive=recursive, + include_patterns=include_list, + exclude_patterns=exclude_list, + ) + + return f"Background ingestion settings updated. Currently monitoring: {config.watch_directory} ({'enabled' if config.is_active else 'disabled'})" + + except ValueError as e: + return f"Error updating background ingestion: {str(e)}" + + +@tool +def get_background_ingestion_status(ctx: ElroyContext) -> str: + """Get the current status of background document ingestion. + + Returns: + Current background ingestion configuration and status + """ + config = get_background_ingestion_config(ctx) + + if not config: + return "Background document ingestion is not configured. Use setup_background_ingestion to enable it." + + status = "enabled" if config.is_active else "disabled" + recursive_str = "recursive" if config.recursive else "non-recursive" + + include_patterns, exclude_patterns = get_include_exclude_patterns(config) + + status_parts = [ + f"Background document ingestion is {status}", + f"Monitoring directory: {config.watch_directory} ({recursive_str})", + ] + + if include_patterns: + status_parts.append(f"Including patterns: {', '.join(include_patterns)}") + if exclude_patterns: + status_parts.append(f"Excluding patterns: {', '.join(exclude_patterns)}") + + if config.last_full_scan: + status_parts.append(f"Last full scan: {config.last_full_scan} (status: {config.last_scan_status})") + else: + status_parts.append("No full scan completed yet") + + return "\n".join(status_parts) + + +@tool +def disable_background_ingestion(ctx: ElroyContext) -> str: + """Disable background document ingestion without removing the configuration. + + Returns: + Confirmation message + """ + try: + config = update_background_ingestion_config(ctx=ctx, is_active=False) + return f"Background ingestion disabled for directory: {config.watch_directory}. Use update_background_ingestion to re-enable." + except ValueError as e: + return f"Error disabling background ingestion: {str(e)}" + + +@tool +def remove_background_ingestion(ctx: ElroyContext) -> str: + """Completely remove background document ingestion configuration. + + Returns: + Confirmation message + """ + success = delete_background_ingestion_config(ctx) + + if success: + return "Background document ingestion configuration removed completely." + else: + return "No background ingestion configuration found to remove." diff --git a/elroy/tools/tools_and_commands.py b/elroy/tools/tools_and_commands.py index 53fa664d..09d28202 100644 --- a/elroy/tools/tools_and_commands.py +++ b/elroy/tools/tools_and_commands.py @@ -13,6 +13,13 @@ ) from ..core.constants import IS_ENABLED, user_only_tool from ..core.ctx import ElroyContext +from ..repository.background_ingestion.tools import ( + disable_background_ingestion, + get_background_ingestion_status, + remove_background_ingestion, + setup_background_ingestion, + update_background_ingestion, +) from ..repository.context_messages.operations import ( pop, refresh_system_instructions, @@ -96,6 +103,11 @@ get_source_documents, get_user_preferred_name, set_user_preferred_name, + setup_background_ingestion, + update_background_ingestion, + get_background_ingestion_status, + disable_background_ingestion, + remove_background_ingestion, } USER_ONLY_COMMANDS = { tail_elroy_logs, diff --git a/pyproject.toml b/pyproject.toml index 8c2abd50..a618faee 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -35,6 +35,7 @@ dependencies = [ "apscheduler>=3.11.0", "fastapi>=0.104.0", "uvicorn>=0.24.0", + "watchdog>=4.0.0", ] [project.optional-dependencies] diff --git a/tests/repository/background_ingestion/__init__.py b/tests/repository/background_ingestion/__init__.py new file mode 100644 index 00000000..e69de29b diff --git a/tests/repository/background_ingestion/test_background_ingestion.py b/tests/repository/background_ingestion/test_background_ingestion.py new file mode 100644 index 00000000..9efdc6e4 --- /dev/null +++ b/tests/repository/background_ingestion/test_background_ingestion.py @@ -0,0 +1,259 @@ +import time +from pathlib import Path + +import pytest + +from elroy.core.ctx import ElroyContext +from elroy.repository.background_ingestion.operations import ( + create_background_ingestion_config, + delete_background_ingestion_config, + get_include_exclude_patterns, + mark_scan_completed, + update_background_ingestion_config, +) +from elroy.repository.background_ingestion.queries import ( + get_active_background_ingestion_config, + get_background_ingestion_config, +) +from elroy.repository.background_ingestion.tools import ( + get_background_ingestion_status, + remove_background_ingestion, + setup_background_ingestion, +) + + +def test_create_background_ingestion_config(ctx: ElroyContext, tmpdir: str): + """Test creating a background ingestion configuration.""" + watch_dir = Path(tmpdir) / "watch_dir" + watch_dir.mkdir() + + config = create_background_ingestion_config( + ctx=ctx, + watch_directory=str(watch_dir), + recursive=True, + include_patterns=["*.md", "*.txt"], + exclude_patterns=["*.log"], + ) + + assert config.user_id == ctx.user_id + assert config.watch_directory == str(watch_dir.resolve()) + assert config.is_active == True + assert config.recursive == True + assert config.include_patterns == "*.md,*.txt" + assert config.exclude_patterns == "*.log" + assert config.last_scan_status == "pending" + + +def test_create_config_invalid_directory(ctx: ElroyContext): + """Test creating config with invalid directory raises error.""" + with pytest.raises(ValueError, match="Directory does not exist"): + create_background_ingestion_config(ctx=ctx, watch_directory="/non/existent/path") + + +def test_create_config_duplicate_raises_error(ctx: ElroyContext, tmpdir: str): + """Test creating duplicate config raises error.""" + watch_dir = Path(tmpdir) / "watch_dir" + watch_dir.mkdir() + + # Create first config + create_background_ingestion_config(ctx=ctx, watch_directory=str(watch_dir)) + + # Attempt to create second config should fail + with pytest.raises(ValueError, match="Background ingestion configuration already exists"): + create_background_ingestion_config(ctx=ctx, watch_directory=str(watch_dir)) + + +def test_update_background_ingestion_config(ctx: ElroyContext, tmpdir: str): + """Test updating background ingestion configuration.""" + watch_dir = Path(tmpdir) / "watch_dir" + watch_dir.mkdir() + + # Create initial config + config = create_background_ingestion_config(ctx=ctx, watch_directory=str(watch_dir)) + original_updated_at = config.updated_at + + # Wait a bit to ensure updated_at changes + time.sleep(0.1) + + # Update config + updated_config = update_background_ingestion_config( + ctx=ctx, + is_active=False, + include_patterns=["*.py"], + ) + + assert updated_config.id == config.id + assert updated_config.is_active == False + assert updated_config.include_patterns == "*.py" + assert updated_config.recursive == True # Should remain unchanged + assert updated_config.updated_at > original_updated_at + + +def test_update_nonexistent_config_raises_error(ctx: ElroyContext): + """Test updating nonexistent config raises error.""" + with pytest.raises(ValueError, match="No background ingestion configuration exists"): + update_background_ingestion_config(ctx=ctx, is_active=False) + + +def test_get_include_exclude_patterns(): + """Test parsing include/exclude patterns.""" + from elroy.db.db_models import BackgroundIngestionConfig + + config = BackgroundIngestionConfig( + user_id=1, + watch_directory="/test", + is_active=True, + recursive=True, + include_patterns="*.md, *.txt,*.py ", + exclude_patterns=" *.log,*.tmp , *.bak", + last_scan_status="pending", + ) + + include_patterns, exclude_patterns = get_include_exclude_patterns(config) + + assert include_patterns == ["*.md", "*.txt", "*.py"] + assert exclude_patterns == ["*.log", "*.tmp", "*.bak"] + + +def test_get_include_exclude_patterns_empty(): + """Test parsing empty patterns.""" + from elroy.db.db_models import BackgroundIngestionConfig + + config = BackgroundIngestionConfig( + user_id=1, + watch_directory="/test", + is_active=True, + recursive=True, + include_patterns="", + exclude_patterns="", + last_scan_status="pending", + ) + + include_patterns, exclude_patterns = get_include_exclude_patterns(config) + + assert include_patterns == [] + assert exclude_patterns == [] + + +def test_mark_scan_completed(ctx: ElroyContext, tmpdir: str): + """Test marking scan as completed.""" + watch_dir = Path(tmpdir) / "watch_dir" + watch_dir.mkdir() + + config = create_background_ingestion_config(ctx=ctx, watch_directory=str(watch_dir)) + assert config.last_full_scan is None + assert config.last_scan_status == "pending" + + mark_scan_completed(ctx, success=True) + + updated_config = get_background_ingestion_config(ctx) + assert updated_config.last_full_scan is not None + assert updated_config.last_scan_status == "success" + + mark_scan_completed(ctx, success=False) + + updated_config = get_background_ingestion_config(ctx) + assert updated_config.last_scan_status == "error" + + +def test_delete_background_ingestion_config(ctx: ElroyContext, tmpdir: str): + """Test deleting background ingestion configuration.""" + watch_dir = Path(tmpdir) / "watch_dir" + watch_dir.mkdir() + + # Create config + create_background_ingestion_config(ctx=ctx, watch_directory=str(watch_dir)) + assert get_background_ingestion_config(ctx) is not None + + # Delete config + success = delete_background_ingestion_config(ctx) + assert success == True + assert get_background_ingestion_config(ctx) is None + + # Delete nonexistent config + success = delete_background_ingestion_config(ctx) + assert success == False + + +def test_get_active_background_ingestion_config(ctx: ElroyContext, tmpdir: str): + """Test getting active background ingestion configuration.""" + watch_dir = Path(tmpdir) / "watch_dir" + watch_dir.mkdir() + + # No config initially + assert get_active_background_ingestion_config(ctx) is None + + # Create active config + create_background_ingestion_config(ctx=ctx, watch_directory=str(watch_dir)) + active_config = get_active_background_ingestion_config(ctx) + assert active_config is not None + assert active_config.is_active == True + + # Deactivate config + update_background_ingestion_config(ctx=ctx, is_active=False) + assert get_active_background_ingestion_config(ctx) is None + + +def test_setup_background_ingestion_tool(ctx: ElroyContext, tmpdir: str): + """Test setup_background_ingestion tool.""" + watch_dir = Path(tmpdir) / "watch_dir" + watch_dir.mkdir() + + result = setup_background_ingestion( + ctx=ctx, directory=str(watch_dir), recursive=True, include_patterns="*.md,*.txt", exclude_patterns="*.log" + ) + + assert "Background ingestion enabled" in result + assert str(watch_dir) in result + + # Verify config was created + config = get_background_ingestion_config(ctx) + assert config is not None + assert config.include_patterns == "*.md,*.txt" + assert config.exclude_patterns == "*.log" + + +def test_setup_background_ingestion_duplicate(ctx: ElroyContext, tmpdir: str): + """Test setup tool with existing config.""" + watch_dir = Path(tmpdir) / "watch_dir" + watch_dir.mkdir() + + # Create first config + setup_background_ingestion(ctx=ctx, directory=str(watch_dir)) + + # Try to create second config + result = setup_background_ingestion(ctx=ctx, directory=str(watch_dir)) + assert "already configured" in result + + +def test_get_background_ingestion_status_tool(ctx: ElroyContext, tmpdir: str): + """Test get_background_ingestion_status tool.""" + # No config initially + result = get_background_ingestion_status(ctx) + assert "not configured" in result + + # Create config + watch_dir = Path(tmpdir) / "watch_dir" + watch_dir.mkdir() + setup_background_ingestion(ctx=ctx, directory=str(watch_dir), include_patterns="*.md") + + result = get_background_ingestion_status(ctx) + assert "enabled" in result + assert str(watch_dir) in result + assert "*.md" in result + + +def test_remove_background_ingestion_tool(ctx: ElroyContext, tmpdir: str): + """Test remove_background_ingestion tool.""" + # Remove nonexistent config + result = remove_background_ingestion(ctx) + assert "No background ingestion configuration found" in result + + # Create and remove config + watch_dir = Path(tmpdir) / "watch_dir" + watch_dir.mkdir() + setup_background_ingestion(ctx=ctx, directory=str(watch_dir)) + + result = remove_background_ingestion(ctx) + assert "removed completely" in result + assert get_background_ingestion_config(ctx) is None diff --git a/uv.lock b/uv.lock index 34464c76..bc868a5f 100644 --- a/uv.lock +++ b/uv.lock @@ -682,6 +682,7 @@ dependencies = [ { name = "toolz" }, { name = "typer" }, { name = "uvicorn" }, + { name = "watchdog" }, ] [package.optional-dependencies] @@ -777,6 +778,7 @@ requires-dist = [ { name = "typer", specifier = "==0.12.5" }, { name = "uvicorn", specifier = ">=0.24.0" }, { name = "vulture", marker = "extra == 'dev'", specifier = ">=2.11" }, + { name = "watchdog", specifier = ">=4.0.0" }, ] provides-extras = ["dev", "docs", "tracing"]