From d163d30775b484e392d9372cb3f51d89d644e793 Mon Sep 17 00:00:00 2001 From: drondeseries Date: Tue, 3 Feb 2026 14:12:52 -0500 Subject: [PATCH] perf: major optimizations for scanning, concurrency, and notifications - Added parallel scan monitoring to prevent queue blocking - Implemented non-blocking async webhooks - Added smart notification grouping to reduce Discord spam - Implemented HTTP connection pooling for Plex/Jellyfin APIs - Added in-place ignore pruning for faster directory traversal - Enabled SQLite WAL mode and batch pruning for better DB performance - Added direct Plex Web links to Discord notifications - Added non-blocking background cache warmup - Optimized ignore pattern matching with pre-compiled regex - Improved path normalization and immediate cache synchronization - Added graceful shutdown handling for SIGTERM/SIGINT - Enabled global stuck file detection for watcher/webhooks --- omniscan_pkg/main.py | 25 +++- omniscan_pkg/models.py | 12 +- omniscan_pkg/scanner.py | 294 ++++++++++++++++++++++++++++++++++------ omniscan_pkg/watcher.py | 20 ++- omniscan_pkg/web.py | 4 +- 5 files changed, 307 insertions(+), 48 deletions(-) diff --git a/omniscan_pkg/main.py b/omniscan_pkg/main.py index e9a43b6..18572ca 100644 --- a/omniscan_pkg/main.py +++ b/omniscan_pkg/main.py @@ -142,7 +142,7 @@ def main(): for library_id, folder_path in sorted_folders: scanner.trigger_scan(library_id, folder_path) - asyncio.run(stats.send_discord_summary()) + scanner._run_async(stats.send_discord_summary()) else: logger.error(f"Path not found: {path}") @@ -166,7 +166,7 @@ def main(): # Default: Scheduled Mode logger.info(f"Will run every {BOLD}{config['RUN_INTERVAL']}{RESET} hours") - if config['RUN_ON_STARTUP']: + if config.get('RUN_ON_STARTUP'): scanner.run_scan() if config['START_TIME']: @@ -181,9 +181,26 @@ def main(): else: schedule.every(config['RUN_INTERVAL']).hours.do(scanner.run_scan) - while True: + # Graceful Shutdown Handling + import signal + stop_event = threading.Event() + + def signal_handler(signum, frame): + logger.info(f"🛑 Received signal {signum}, stopping...") + stop_event.set() + # Attempt to stop watcher if running + # (Watcher runs in main thread if enabled, but here we are in main thread too?) + # Actually start_watcher blocks if watch mode is on. + # But if we are in schedule mode, we are in the loop below. + + signal.signal(signal.SIGINT, signal_handler) + signal.signal(signal.SIGTERM, signal_handler) + + while not stop_event.is_set(): schedule.run_pending() - time.sleep(60) + time.sleep(1) + + logger.info("👋 Omniscan shutdown complete.") if __name__ == '__main__': main() diff --git a/omniscan_pkg/models.py b/omniscan_pkg/models.py index 378c38a..33a1765 100644 --- a/omniscan_pkg/models.py +++ b/omniscan_pkg/models.py @@ -18,9 +18,12 @@ def __init__(self, db_file='history.db'): self._init_db() def _init_db(self): + self.prune_counter = 0 with self.lock: try: conn = sqlite3.connect(self.db_file) + # Enable WAL mode for better concurrency + conn.execute('PRAGMA journal_mode=WAL;') cursor = conn.cursor() cursor.execute(''' CREATE TABLE IF NOT EXISTS stuck_files ( @@ -51,8 +54,13 @@ def add_event(self, event_type, details, status): conn = sqlite3.connect(self.db_file) cursor = conn.cursor() cursor.execute('INSERT INTO events (timestamp, event_type, details, status) VALUES (?, ?, ?, ?)', (timestamp, event_type, details, status)) - # Prune old events (keep last 20000) - cursor.execute('DELETE FROM events WHERE id NOT IN (SELECT id FROM events ORDER BY id DESC LIMIT 20000)') + + # Prune old events periodically (every 100 inserts) to reduce overhead + self.prune_counter += 1 + if self.prune_counter >= 100: + cursor.execute('DELETE FROM events WHERE id NOT IN (SELECT id FROM events ORDER BY id DESC LIMIT 20000)') + self.prune_counter = 0 + conn.commit() conn.close() except Exception as e: diff --git a/omniscan_pkg/scanner.py b/omniscan_pkg/scanner.py index bc2b896..36df304 100644 --- a/omniscan_pkg/scanner.py +++ b/omniscan_pkg/scanner.py @@ -28,24 +28,51 @@ logger = logging.getLogger(__name__) +import re + class PlexScanner: def __init__(self, config): self.config = config self.plex = None + + # Compile ignore patterns for performance + self.ignore_regex = None + if config.get('IGNORE_PATTERNS'): + # Convert glob patterns to regex + # fnmatch.translate converts glob to regex, we join them with OR + try: + patterns = [fnmatch.translate(p) for p in config['IGNORE_PATTERNS'] if p.strip()] + if patterns: + self.ignore_regex = re.compile('|'.join(patterns)) + except Exception as e: + logger.error(f"Failed to compile ignore patterns: {e}") + self.history = StuckFileTracker() self.library_ids = {} self.library_paths = {} self.library_sections_cache = [] self.library_files = {} # Changed to dict for easier clearing self.library_files_lock = threading.Lock() + self.loading_libraries = set() + self.loading_lock = threading.Lock() self.pending_scans = {} self.pending_scans_lock = threading.Lock() self.pending_notifications = defaultdict(lambda: {'added': [], 'deleted': [], 'library_title': ''}) self.last_health_results = [] # Store last 20 health check results self.is_scanning = False # Track if a full scan is currently running + # Persistent session for connection pooling + self.http_session = requests.Session() + self.http_session.headers.update({ + 'User-Agent': 'Omniscan/1.0', + 'Accept': 'application/json' + }) + # Executor for processing file events asynchronously self.event_executor = ThreadPoolExecutor(max_workers=config.get('SCAN_WORKERS', 4)) + + # Executor for monitoring Plex scans without blocking the queue + self.scan_monitor_executor = ThreadPoolExecutor(max_workers=4) # Start the background worker for debounced scans self.worker_thread = threading.Thread(target=self._process_scan_queue, daemon=True) @@ -56,17 +83,17 @@ def _send_discord_embed(self, embed): if not self.config['NOTIFICATIONS_ENABLED'] or not self.config.get('DISCORD_WEBHOOK_URL'): return - try: - async def _send(): + async def _send(): + try: import aiohttp async with aiohttp.ClientSession() as session: from discord import Webhook webhook = Webhook.from_url(self.config['DISCORD_WEBHOOK_URL'], session=session) await send_discord_webhook(webhook, embed, self.config) - - asyncio.run(_send()) - except Exception as e: - logger.error(f"Failed to send notification: {e}") + except Exception as e: + logger.error(f"Failed to send notification in coroutine: {e}") + + self._run_async(_send()) def send_single_notification(self, title, description, color): """Send a single-event notification to Discord.""" @@ -89,7 +116,7 @@ def connect_to_plex(self, retry=True): logger.error("PLEX_SERVER or PLEX_TOKEN not configured.") return None - self.plex = PlexServer(self.config['PLEX_URL'], self.config['TOKEN']) + self.plex = PlexServer(self.config['PLEX_URL'], self.config['TOKEN'], session=self.http_session) # Test connection logger.info(f"Connected to Plex: {self.plex.friendlyName} (v{self.plex.version})") return self.plex @@ -102,13 +129,17 @@ def connect_to_plex(self, retry=True): retry_delay = min(retry_delay * 2, max_delay) def is_ignored(self, file_path): - """Check if file matches any ignore pattern.""" - filename = os.path.basename(file_path) - for pattern in self.config['IGNORE_PATTERNS']: - if fnmatch.fnmatch(filename, pattern): - return True - if fnmatch.fnmatch(file_path, pattern): - return True + """Check if file matches any ignore pattern using compiled regex.""" + if not self.ignore_regex: + return False + + # Check both filename and full path to match standard behavior + # (Usually full path match is what users want for folders like /RecycleBin) + if self.ignore_regex.match(file_path): + return True + if self.ignore_regex.match(os.path.basename(file_path)): + return True + return False def get_library_ids(self): @@ -172,13 +203,14 @@ def get_library_id_for_path(self, file_path): best_match = None best_match_length = 0 + normalized_scan_path = os.path.normpath(file_path) + for section in self.library_sections_cache: section_id = section['id'] section_title = section['title'] section_type = section['type'] for location_path in section['locations']: - normalized_scan_path = os.path.normpath(file_path) normalized_location = os.path.normpath(location_path) if normalized_scan_path.startswith(normalized_location): @@ -214,7 +246,7 @@ def cache_library_files(self, library_id): for media in item.media: for part in media.parts: if part.file: - new_files.add(part.file) + new_files.add(os.path.normpath(part.file)) count += 1 # Clear items list immediately to free memory @@ -228,18 +260,47 @@ def cache_library_files(self, library_id): except Exception as e: logger.error(f"Error caching library {library_id}: {str(e)}") + def _trigger_cache_fill(self, library_id): + with self.loading_lock: + if library_id in self.loading_libraries: + return + self.loading_libraries.add(library_id) + + self.event_executor.submit(self._background_cache_fill, library_id) + + def _background_cache_fill(self, library_id): + try: + self.cache_library_files(library_id) + finally: + with self.loading_lock: + self.loading_libraries.discard(library_id) + def is_in_library(self, file_path): """Check if a file exists in the media server.""" server_type = self.config.get('SERVER_TYPE', 'plex') - # Check cache if it exists (usually during full scan) + # Check cache if it exists library_id, library_title, _ = self.get_library_id_for_path(file_path) if library_id: + norm_path = os.path.normpath(file_path) + + # Ensure cache is loaded + with self.library_files_lock: + cache_filled = library_id in self.library_files and self.library_files[library_id] + + if not cache_filled: + self._trigger_cache_fill(library_id) + # Fallback to direct API check while cache warms up + if server_type == 'plex': + return self._is_in_plex_api(file_path, library_id) + elif server_type in ['jellyfin', 'emby']: + return self._is_in_jellyfin_api(file_path, library_id) + with self.library_files_lock: if library_id in self.library_files and self.library_files[library_id]: - return file_path in self.library_files[library_id] + return norm_path in self.library_files[library_id] - # If cache is empty or missing, fallback to direct API check (memory efficient for watcher) + # If cache check failed or library not found in cache, fallback to direct API check if server_type == 'plex': return self._is_in_plex_api(file_path, library_id) elif server_type in ['jellyfin', 'emby']: @@ -256,9 +317,22 @@ def _is_in_plex_api(self, file_path, library_id=None): if not library_id: return False section = self.plex.library.sectionByID(int(library_id)) - # Search by filepath (Plex supports this filter) - results = section.search(filepath=file_path) - return len(results) > 0 + + # Use libtype based on section type for more accurate search + libtype = 'episode' if section.type == 'show' else 'movie' + + # Search by filename in title field as a fallback + filename = os.path.basename(file_path) + results = section.search(title=filename, libtype=libtype) + + norm_target = os.path.normpath(file_path) + for item in results: + if hasattr(item, 'media'): + for media in item.media: + for part in media.parts: + if os.path.normpath(part.file) == norm_target: + return True + return False except Exception as e: logger.debug(f"Direct Plex check failed for {file_path}: {e}") return False @@ -369,9 +443,25 @@ def trigger_scan(self, library_id, folder_path, force=False): return with self.pending_scans_lock: + is_new = (library_id, folder_path) not in self.pending_scans # Update the last event time for this (library, folder) self.pending_scans[(library_id, folder_path)] = time.time() - logger.info(f"⏳ Scan queued (debouncing): {BOLD}{folder_path}{RESET}") + if is_new: + logger.info(f"⏳ Scan queued (debouncing): {BOLD}{folder_path}{RESET}") + + def _run_async(self, coro): + """Safely run a coroutine from sync or async context.""" + try: + try: + loop = asyncio.get_running_loop() + if loop.is_running(): + loop.create_task(coro) + return + except RuntimeError: + pass + asyncio.run(coro) + except Exception as e: + logger.error(f"Async execution failed: {e}") def _process_scan_queue(self): """Background worker to process debounced scans and notifications.""" @@ -379,6 +469,7 @@ def _process_scan_queue(self): try: time.sleep(1) to_trigger = [] + ready_notifications = [] with self.pending_scans_lock: PENDING_SCANS.set(len(self.pending_scans)) @@ -390,20 +481,106 @@ def _process_scan_queue(self): library_id, folder_path = key to_trigger.append((library_id, folder_path)) - # Prepare notification + # Collect notification data notif_data = self.pending_notifications.get(folder_path) if notif_data: - self._send_grouped_notification(folder_path, notif_data) + ready_notifications.append((folder_path, notif_data)) del self.pending_notifications[folder_path] del self.pending_scans[key] + # Send a single grouped notification for all ready folders + if ready_notifications: + self._send_multi_grouped_notification(ready_notifications) + for library_id, folder_path in to_trigger: - self._do_trigger_scan(library_id, folder_path) + # Submit to monitor executor so we don't block the queue loop + self.scan_monitor_executor.submit(self._do_trigger_scan, library_id, folder_path) except Exception as e: logger.error(f"Error in scan queue worker: {e}") time.sleep(5) + def _send_multi_grouped_notification(self, notifications): + """Send a single Discord notification for multiple entities/folders.""" + if not notifications: + return + + # If only one folder, use the standard grouped notification logic + if len(notifications) == 1: + root, data = notifications[0] + self._send_grouped_notification(root, data) + return + + total_added = sum(len(d['added']) for _, d in notifications) + total_deleted = sum(len(d['deleted']) for _, d in notifications) + + color = Color.blue() + if total_added and total_deleted: color = Color.gold() + elif total_added: color = Color.green() + elif total_deleted: color = Color.red() + + embed = Embed( + title=f"📂 Bulk Update: {len(notifications)} folders", + description=f"Detected **{total_added}** additions and **{total_deleted}** deletions across multiple folders.", + color=color, + timestamp=datetime.now() + ) + + # Group by folder for fields + for root, data in notifications[:20]: # Limit to 20 folders to stay under Discord's 25 field limit + added = data['added'] + deleted = data['deleted'] + entity_name = os.path.basename(root) + + if entity_name.lower().startswith("season ") or entity_name.lower() in ["specials", "extras"]: + parent_name = os.path.basename(os.path.dirname(root)) + if parent_name: entity_name = f"{parent_name} - {entity_name}" + + msg = "" + if added: + msg += f"✅ +{len(added)}\n" + # Try to add a direct link for the first added item if possible + if len(added) == 1 and self.plex: + try: + # Best effort link generation + # We need to find the item in Plex first. + # Since we just added it, it might be in the cache or readable via API. + # We use the path to find the key. + fpath = added[0] + lid, _, _ = self.get_library_id_for_path(fpath) + if lid: + # Search by file path to get the key + # This is a bit expensive so we only do it for single item adds to be safe? + # Or we can construct a search URL. + # A direct link to the library filter is safer and faster. + + # Construct a deep link to the library filtered by folder + # This works even if the specific item ID isn't known yet + # URL format: https://app.plex.tv/desktop/#!/server/{machineIdentifier}/details?key=%2Flibrary%2Fsections%2F{lid}%2Ffolder%3Fparent%3D{quote(root)} + # Actually, linking to the folder view is more reliable for "added" events + + machine_id = self.plex.machineIdentifier + # Plex Web URL usually needs to know the specific server UUID + # We can try to generate a local link or app.plex.tv link + + # Let's link to the folder in Plex Web + # /library/sections/{id}/folder?parent={path} + encoded_root = quote(root) + link = f"https://app.plex.tv/desktop/#!/server/{machine_id}/details?key=%2Flibrary%2Fsections%2F{lid}%2Ffolder%3Fparent%3D{encoded_root}" + msg += f"[View in Plex]({link})\n" + except Exception: + pass + + if deleted: msg += f"🗑️ -{len(deleted)}\n" + + embed.add_field(name=f"📁 {entity_name}", value=msg or "No changes", inline=True) + + if len(notifications) > 20: + embed.add_field(name="...", value=f"and {len(notifications) - 20} more folders", inline=False) + + embed.set_footer(text="Omniscan Media Monitor") + self._send_discord_embed(embed) + def _send_grouped_notification(self, entity_root, data): """Send a single Discord notification for multiple file events.""" added = data['added'] @@ -426,9 +603,22 @@ def _send_grouped_notification(self, entity_root, data): elif deleted: color = Color.red() + desc = f"Changes detected in **{entity_name}**" + + # Add Plex Link if available + if self.plex and (added or deleted): + try: + lid, _, _ = self.get_library_id_for_path(entity_root) + if lid: + machine_id = self.plex.machineIdentifier + encoded_root = quote(entity_root) + link = f"https://app.plex.tv/desktop/#!/server/{machine_id}/details?key=%2Flibrary%2Fsections%2F{lid}%2Ffolder%3Fparent%3D{encoded_root}" + desc += f"\n[View in Plex]({link})" + except: pass + embed = Embed( title=f"📂 Update: {library}", - description=f"Changes detected in **{entity_name}**", + description=desc, color=color, timestamp=datetime.now() ) @@ -477,7 +667,7 @@ def _trigger_jellyfin_emby_scan(self, library_id, folder_path): } try: - response = requests.post(url, json=payload, headers=headers) + response = self.http_session.post(url, json=payload, headers=headers) response.raise_for_status() logger.info(f"🔎 {self.config['SERVER_TYPE'].capitalize()} scan triggered for: {BOLD}{folder_path}{RESET}") self.history.add_event("Scan Triggered", folder_path, self.config['SERVER_TYPE']) @@ -490,7 +680,7 @@ def _trigger_plex_scan(self, library_id, folder_path): url = f"{self.config['PLEX_URL']}/library/sections/{library_id}/refresh?path={encoded_path}&X-Plex-Token={self.config['TOKEN']}" try: - response = requests.get(url) + response = self.http_session.get(url) response.raise_for_status() logger.info(f"🔎 Plex scan triggered for: {BOLD}{folder_path}{RESET}") self.history.add_event("Scan Triggered", folder_path, "Plex") @@ -646,6 +836,10 @@ def submit_file_event(self, event_type, file_path): def scan_file(self, file_path, stats=None, tracker=None): """Scan a single file and trigger Plex refresh if missing.""" + # Fallback to global history if no specific tracker provided (e.g. for Watcher/Webhooks) + if not tracker: + tracker = self.history + if self.is_ignored(file_path): return @@ -710,6 +904,12 @@ def scan_file(self, file_path, stats=None, tracker=None): self.pending_notifications[target_path]['library_title'] = library_title self.pending_notifications[target_path]['added'].append(file_path) + # Add to cache immediately to prevent duplicate triggers for the same file + # before the Plex scan even finishes + with self.library_files_lock: + if library_id in self.library_files: + self.library_files[library_id].add(os.path.normpath(file_path)) + self.trigger_scan(library_id, target_path) else: if tracker: tracker.clear_entry(file_path) @@ -767,15 +967,28 @@ def handle_deletion(self, file_path): self.pending_notifications[target_path]['library_title'] = library_title or "Media" self.pending_notifications[target_path]['deleted'].append(file_path) + # Remove from cache immediately to prevent duplicate triggers + with self.library_files_lock: + if library_id in self.library_files: + self.library_files[library_id].discard(os.path.normpath(file_path)) + self.trigger_scan(library_id, target_path) def scan_directory(self, path, stats, tracker, folders_to_scan, folders_to_scan_lock): + # Pre-calculate cutoff time for incremental scan + cutoff_time = 0 + if self.config.get('INCREMENTAL_SCAN'): + cutoff_time = time.time() - (self.config['SCAN_SINCE_DAYS'] * 86400) + for root, dirs, files in os.walk(path, followlinks=True): + # Prune ignored directories in-place to avoid traversing them + # This is a significant optimization for large ignored trees (e.g. .git, extras) + dirs[:] = [d for d in dirs if not self.is_ignored(os.path.join(root, d))] + if self.config.get('INCREMENTAL_SCAN'): try: mtime = os.path.getmtime(root) - cutoff_time = time.time() - (self.config['SCAN_SINCE_DAYS'] * 86400) if mtime < cutoff_time: continue except OSError: @@ -940,20 +1153,25 @@ def run_scan(self): future.result() if stats.total_missing > 0: - asyncio.run(stats.send_discord_pending(len(folders_to_scan))) + self._run_async(stats.send_discord_pending(len(folders_to_scan))) sorted_folders = sorted(list(folders_to_scan), key=lambda x: x[1]) for library_id, folder_path in sorted_folders: self.trigger_scan(library_id, folder_path) tracker.save_history() - asyncio.run(stats.send_discord_summary()) + self._run_async(stats.send_discord_summary()) except Exception as e: logger.error(f"Error during scan: {e}") - finally: - self.is_scanning = False - # Clear cache after full scan to save memory - with self.library_files_lock: - self.library_files.clear() - gc.collect() # Trigger garbage collection to release memory immediately \ No newline at end of file + finally: + self.is_scanning = False + # Only clear cache if NOT in watch mode. + # If watching, we want to keep the cache hot to avoid re-fetching on every event. + if not self.config.get('WATCH_MODE'): + with self.library_files_lock: + self.library_files.clear() + gc.collect() # Trigger garbage collection to release memory + else: + logger.info("🧠 Retaining library cache for active watcher") + \ No newline at end of file diff --git a/omniscan_pkg/watcher.py b/omniscan_pkg/watcher.py index e7b31ad..e33b1d5 100644 --- a/omniscan_pkg/watcher.py +++ b/omniscan_pkg/watcher.py @@ -48,9 +48,25 @@ def start_watcher(scanner): logger.warning(f"Directory not found, cannot watch: {path}") observer.start() + + # Setup signal handling for graceful stop if running as main blocker + import signal + import threading + stop_event = threading.Event() + + def signal_handler(signum, frame): + logger.info("🛑 Watcher stopping...") + stop_event.set() + + signal.signal(signal.SIGINT, signal_handler) + signal.signal(signal.SIGTERM, signal_handler) + try: - while True: + while not stop_event.is_set(): time.sleep(1) except KeyboardInterrupt: + pass + finally: observer.stop() - observer.join() + observer.join() + logger.info("👋 Watcher stopped.") diff --git a/omniscan_pkg/web.py b/omniscan_pkg/web.py index 5303683..1754271 100644 --- a/omniscan_pkg/web.py +++ b/omniscan_pkg/web.py @@ -483,11 +483,11 @@ async def webhook_trigger(request: Request): if os.path.exists(p): exists = True break - time.sleep(1) + await asyncio.sleep(1) if exists: if os.path.isfile(p): - scanner_instance.scan_file(p) + scanner_instance.submit_file_event('created', p) triggered += 1 elif os.path.isdir(p): lid, _, _ = scanner_instance.get_library_id_for_path(p)