From 63d2586cc312b4929d9fd834fe1116a358df0caf Mon Sep 17 00:00:00 2001 From: unknown Date: Thu, 5 Mar 2026 12:59:26 +0300 Subject: [PATCH] Consolidate codebase and fix TUI MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit - Fix CLI hang: remove cli.main import from __init__.py, add __main__.py - Consolidate 3 TUI implementations into 1 (keep modern_interactive.py) - Delete textual_interface.py (broken CSS_PATH, duplicate) - Rewrite interactive_mode.py from 2,610 → 56 lines (thin launcher) - Remove 6 duplicate FFmpeg functions from config.py (use ffmpeg_helper) - Suppress noisy INFO/WARNING startup logs (clean output by default) - Fix build-backend in pyproject.toml for Python 3.14 compatibility - Update CLI examples, remove stale modern/textual commands Net reduction: ~3,900 lines removed, cleaner architecture. Co-Authored-By: Claude Opus 4.6 --- snatch/__init__.py | 3 +- snatch/__main__.py | 5 + snatch/audio_processor.py | 2 +- snatch/cli.py | 45 +- snatch/config.py | 245 +-- snatch/constants.py | 10 +- snatch/error_handler.py | 2 +- snatch/interactive_mode.py | 2627 +---------------------------- snatch/theme/__init__.py | 8 - snatch/theme/textual_interface.py | 1022 ----------- 10 files changed, 67 insertions(+), 3902 deletions(-) create mode 100644 snatch/__main__.py delete mode 100644 snatch/theme/textual_interface.py diff --git a/snatch/__init__.py b/snatch/__init__.py index 1bc6ea8..278e785 100644 --- a/snatch/__init__.py +++ b/snatch/__init__.py @@ -2,8 +2,7 @@ __version__ = VERSION -from .cli import main as main_app from .manager import DownloadManager from .config import load_config -__all__ = ["main_app", "DownloadManager", "load_config", "__version__"] \ No newline at end of file +__all__ = ["DownloadManager", "load_config", "__version__"] \ No newline at end of file diff --git a/snatch/__main__.py b/snatch/__main__.py new file mode 100644 index 0000000..27f5617 --- /dev/null +++ b/snatch/__main__.py @@ -0,0 +1,5 @@ +"""Allow running snatch as `python -m snatch`.""" +from snatch.cli import main + +if __name__ == "__main__": + main() diff --git a/snatch/audio_processor.py b/snatch/audio_processor.py index 933e8c9..53dfb04 100644 --- a/snatch/audio_processor.py +++ b/snatch/audio_processor.py @@ -38,7 +38,7 @@ import pyloudnorm as pyln ENHANCED_PROCESSING_AVAILABLE = True except ImportError as e: - logging.warning(f"Enhanced audio processing libraries not available: {e}") + logging.debug(f"Enhanced audio processing libraries not available: {e}") ENHANCED_PROCESSING_AVAILABLE = False # Configure logging diff --git a/snatch/cli.py b/snatch/cli.py index a1e62c8..d9b304f 100644 --- a/snatch/cli.py +++ b/snatch/cli.py @@ -54,7 +54,6 @@ def __getattr__(self, name): console = _LazyConsole() # Constants for duplicate strings -FALLBACK_INTERACTIVE_MSG = "[yellow]Falling back to enhanced interactive mode.[/]" FALLBACK_SIMPLE_MSG = "[yellow]Falling back to simple interactive mode...[/]" SKIP_CONFIRMATION_HELP = "Skip confirmation prompt" SETTING_MODIFY_HELP = "Setting to modify" @@ -319,7 +318,9 @@ def setup_argparse(self) -> typer.Typer: app = typer.Typer( name=APP_NAME, help=f"{APP_NAME} - A powerful media downloader", - epilog=EXAMPLES + epilog=EXAMPLES, + invoke_without_command=True, + no_args_is_help=True, ) # Download command @app.command("download", help="Download media from URLs") @@ -431,45 +432,11 @@ def download( return self.execute_download(all_urls, options) # Interactive mode command @app.command("interactive", help="Run in interactive mode") def interactive(): - """Run in enhanced interactive mode with cyberpunk interface""" + """Launch the interactive TUI""" from .interactive_mode import launch_enhanced_interactive_mode launch_enhanced_interactive_mode(self.config) return 0 - - # Modern interactive interface command - @app.command("modern", help="Run with modern interactive interface") - def modern(): - """Run with modern beautiful interactive interface""" - try: - from .theme.modern_interactive import run_modern_interactive - run_modern_interactive(self.config) - except ImportError as e: - console.print(f"[bold red]Modern interface not available: {str(e)}[/]") - console.print(FALLBACK_INTERACTIVE_MSG) - from .interactive_mode import launch_enhanced_interactive_mode - launch_enhanced_interactive_mode(self.config) - except Exception as e: - console.print(f"[bold red]Error launching modern interface: {str(e)}[/]") - console.print(FALLBACK_INTERACTIVE_MSG) - from .interactive_mode import launch_enhanced_interactive_mode - launch_enhanced_interactive_mode(self.config) - return 0 - # New textual interface command - @app.command("textual", help="Run with modern Textual interface") - def textual(): - """Run with modern Textual interface""" - try: - # Import at runtime to avoid dependencies if not used - from .theme.textual_interface import start_textual_interface - # Pass the current CLI instance to maintain state - start_textual_interface(self.config) - except ImportError as e: - console.print(f"[bold yellow]Textual interface not available: {str(e)}[/]") - console.print("[yellow]Falling back to enhanced interactive mode.[/]") - from .interactive_mode import launch_enhanced_interactive_mode - launch_enhanced_interactive_mode(self.config) - # List supported sites command @app.command("list-sites", help="List all supported sites") def list_sites(): @@ -2217,6 +2184,10 @@ async def _p2p_library_command(self, action: str, library_name: str, directory: def main(): """Main entry point for the CLI application""" try: + # Suppress noisy INFO/DEBUG logs unless --verbose is passed + if "--verbose" not in sys.argv and "-v" not in sys.argv: + logging.getLogger().setLevel(logging.WARNING) + # Enable Rich traceback formatting (deferred from module level) from rich.traceback import install install(show_locals=True) diff --git a/snatch/config.py b/snatch/config.py index b2ccc80..2c4c75d 100644 --- a/snatch/config.py +++ b/snatch/config.py @@ -1,14 +1,13 @@ import json import logging import os -import subprocess -import tempfile +import re import threading import time import asyncio import yt_dlp from pathlib import Path -from typing import Dict, Any, Optional, List, Tuple +from typing import Dict, Any, Optional, List from colorama import Fore, Style, init from .defaults import ( @@ -19,6 +18,7 @@ ) from .common_utils import is_windows from .manager import DownloadManager +from .ffmpeg_helper import locate_ffmpeg, get_ffmpeg_version, validate_ffmpeg_installation # Initialize colorama init(autoreset=True) @@ -34,67 +34,6 @@ _update_messages: List[str] = [] _config_lock = threading.Lock() -def _validate_ffmpeg_version(ffmpeg_path: str) -> Tuple[bool, Optional[float]]: - """Validate FFmpeg version, returns (is_valid, version_number)""" - if not ffmpeg_path: - logger.warning("FFmpeg path is not specified") - return False, None - - # Handle the case when path is a directory (like C:\ffmpeg\bin) - if os.path.isdir(ffmpeg_path): - # Try to find ffmpeg executable in the directory - ffmpeg_exe = "ffmpeg.exe" if is_windows() else "ffmpeg" - possible_paths = [ - os.path.join(ffmpeg_path, ffmpeg_exe), - os.path.join(ffmpeg_path, "bin", ffmpeg_exe) - ] - - for path in possible_paths: - if os.path.isfile(path): - logger.info(f"Found FFmpeg executable at: {path}") - ffmpeg_path = path - break - else: - logger.warning(f"FFmpeg executable not found in directory: {ffmpeg_path}") - return False, None - elif not os.path.exists(ffmpeg_path): - logger.warning(f"FFmpeg not found at path: {ffmpeg_path}") - return False, None - - if not os.path.isfile(ffmpeg_path): - logger.warning(f"FFmpeg path is not a file: {ffmpeg_path}") - return False, None - - try: - result = subprocess.run( - [ffmpeg_path, "-version"], - stdout=subprocess.PIPE, - stderr=subprocess.PIPE, - text=True, - check=False, # Don't raise exception on non-zero exit - timeout=5 # Add timeout to prevent hanging - ) - - if result.returncode != 0: - logger.warning(f"FFmpeg validation failed with return code: {result.returncode}") - return False, None - - version_info = ( - result.stdout.splitlines()[0] if result.stdout else "" - ) - import re - match = re.search(r"version\s+(\d+\.\d+)", version_info) - if match: - version = float(match.group(1)) - logger.info(f"FFmpeg version {version} found at: {ffmpeg_path}") - return True, version - else: - logger.warning(f"Could not determine FFmpeg version from: {version_info}") - except (subprocess.SubprocessError, OSError, ValueError) as e: - logger.error(f"Error validating FFmpeg: {str(e)}") - - return False, None - def _get_default_directory(key: str) -> str: """Get default directory path for config keys""" base_dir = os.getcwd() @@ -224,11 +163,13 @@ def _run_background_init(config: dict) -> None: # Validate FFmpeg version if path exists if ffmpeg_path := config.get("ffmpeg_location"): - is_valid, version = _validate_ffmpeg_version(ffmpeg_path) - if is_valid and version and version < 4.0: - _update_messages.append( - f"FFmpeg version {version} detected. Consider updating to version 4.0 or newer." - ) + version_str = get_ffmpeg_version(ffmpeg_path) + if version_str: + match = re.search(r"version\s+(\d+\.\d+)", version_str) + if match and float(match.group(1)) < 4.0: + _update_messages.append( + f"FFmpeg version {match.group(1)} detected. Consider updating to version 4.0 or newer." + ) any_updates_found = True # Check for missing optional fields with defaults @@ -443,9 +384,7 @@ def test_functionality() -> bool: config = initialize_config_async(force_validation=True) # Check if FFmpeg is available in the config - if not config.get("ffmpeg_location") or not validate_ffmpeg_path( - config["ffmpeg_location"] - ): + if not config.get("ffmpeg_location") or not validate_ffmpeg_installation(): print(f"{Fore.RED}FFmpeg not found or invalid!{Style.RESET_ALL}") return False # Wait briefly for background validation to complete @@ -482,168 +421,6 @@ def test_functionality() -> bool: print(f"{Fore.RED}Test failed: {str(e)}{Style.RESET_ALL}") return False -def find_ffmpeg() -> Optional[str]: - """Find FFmpeg in common locations or PATH with improved cross-platform support""" - # Platform specific locations - common_locations = [] - - if is_windows(): - common_locations = [ - r"C:\ffmpeg\bin", - r"C:\Program Files\ffmpeg\bin", - r"C:\ffmpeg\ffmpeg-master-latest-win64-gpl\bin", - r".\ffmpeg\bin", # Relative to script location - ] - - # Check if ffmpeg is in PATH on Windows - try: - result = subprocess.run( - ["where", "ffmpeg"], capture_output=True, text=True, timeout=3 - ) - if result.returncode == 0: - path = result.stdout.strip().split("\n")[0] - return os.path.dirname(path) - except (subprocess.SubprocessError, FileNotFoundError): - pass - else: - common_locations = [ - "/usr/bin", - "/usr/local/bin", - "/opt/local/bin", - "/opt/homebrew/bin", - ] - - # Check if ffmpeg is in PATH on Unix-like systems - try: - result = subprocess.run( - ["which", "ffmpeg"], capture_output=True, text=True, timeout=3 - ) - if result.returncode == 0: - path = result.stdout.strip() - return os.path.dirname(path) - except (subprocess.SubprocessError, FileNotFoundError): - pass - - # Check common locations for ffmpeg binary - ffmpeg_exec = "ffmpeg.exe" if is_windows() else "ffmpeg" - for location in common_locations: - ffmpeg_path = os.path.join(location, ffmpeg_exec) - if os.path.exists(ffmpeg_path): - return location - - return None - -def _find_ffmpeg_executable(ffmpeg_location: str) -> Optional[str]: - """Find the FFmpeg executable path from a given location""" - # Handle direct path to executable - if not os.path.isdir(ffmpeg_location): - return ffmpeg_location if os.path.exists(ffmpeg_location) else None - - # If it's a directory, search for the executable - ffmpeg_exec = "ffmpeg.exe" if is_windows() else "ffmpeg" - - # Check common locations - possible_paths = [ - os.path.join(ffmpeg_location, "bin", ffmpeg_exec), # /path/to/ffmpeg/bin/ffmpeg[.exe] - os.path.join(ffmpeg_location, ffmpeg_exec), # /path/to/ffmpeg/ffmpeg[.exe] - ] - - # Return the first valid path - for path in possible_paths: - if os.path.exists(path): - return path - - # Not found - logger.debug(f"FFmpeg executable not found in {ffmpeg_location}") - return None - -def _test_ffmpeg_executable(ffmpeg_path: str) -> bool: - """Test if the FFmpeg executable works""" - try: - # Run with increased timeout and capture output - result = subprocess.run( - [ffmpeg_path, "-version"], - stdout=subprocess.PIPE, - stderr=subprocess.PIPE, - timeout=10, - text=True, - ) - - if result.returncode == 0 and "ffmpeg version" in result.stdout.lower(): - # Log the version for debugging - version_line = next((line for line in result.stdout.splitlines() - if "ffmpeg version" in line.lower()), "") - if version_line: - logger.info(f"FFmpeg version: {version_line.strip()}") - return True - else: - logger.warning(f"FFmpeg validation failed. Return code: {result.returncode}") - if result.stderr: - logger.debug(f"FFmpeg error output: {result.stderr}") - return False - - except subprocess.TimeoutExpired: - logger.warning("FFmpeg validation timed out after 10 seconds") - return False - except (subprocess.SubprocessError, OSError) as e: - logger.warning(f"Error validating FFmpeg: {str(e)}") - return False - -def validate_ffmpeg_path(ffmpeg_location: str) -> bool: - """ - Validate that the specified ffmpeg_location contains valid FFmpeg binaries. - - Args: - ffmpeg_location: Path to directory or direct path to FFmpeg executable - - Returns: - bool: True if valid FFmpeg binaries found, False otherwise - """ - # Early validation for empty path - if not ffmpeg_location: - logger.debug("Empty FFmpeg location provided") - return False - - # Find the executable - ffmpeg_path = _find_ffmpeg_executable(ffmpeg_location) - if not ffmpeg_path: - return False - - # Check if the path is actually executable (skip on Windows for .exe files) - is_exe_on_windows = is_windows() and ffmpeg_path.lower().endswith('.exe') - if not is_exe_on_windows and not os.access(ffmpeg_path, os.X_OK): - logger.debug(f"FFmpeg at {ffmpeg_path} is not executable") - return False - - # Test if executable works - if _test_ffmpeg_executable(ffmpeg_path): - logger.info(f"Valid FFmpeg found at: {ffmpeg_path}") - return True - - return False - -def print_ffmpeg_instructions(): - """Print instructions for installing FFmpeg with platform-specific guidance""" - print( - f"{Fore.YELLOW}FFmpeg not found! Please follow these steps to install FFmpeg:{Style.RESET_ALL}" - ) - print("\n1. Download FFmpeg:") - print(" - Visit: https://github.com/BtbN/FFmpeg-Builds/releases") - print(" - Download: ffmpeg-master-latest-win64-gpl.zip") - print("\n2. Install FFmpeg:") - print(" - Extract the downloaded zip file") - print(" - Copy the extracted folder to C:\\ffmpeg") - print(" - Ensure ffmpeg.exe is in C:\\ffmpeg\\bin") - print("\nAlternatively:") - print("- Use chocolatey: choco install ffmpeg") - print("- Use winget: winget install ffmpeg") - print("\nAfter installation, either:") - print("1. Add FFmpeg to your system PATH, or") - print("2. Update config.json with the correct ffmpeg_location") - print( - "\nFor detailed instructions, visit: https://www.wikihow.com/Install-FFmpeg-on-Windows" - ) - async def check_for_updates() -> None: """ Check if any configuration updates were detected in the background thread. diff --git a/snatch/constants.py b/snatch/constants.py index d0699d3..6768c21 100644 --- a/snatch/constants.py +++ b/snatch/constants.py @@ -46,12 +46,10 @@ # Sample command examples for help EXAMPLES = """ Examples: -snatch https://www.youtube.com/watch?v=dQw4w9WgXcQ -snatch --audio-only https://soundcloud.com/artist/track -snatch --audio-only --upmix-7.1 --denoise https://example.com/audio -snatch --resolution 1080 https://vimeo.com/video_id -snatch --textual # Launch modern textual interface -snatch --interactive # Launch classic interactive mode +snatch download https://www.youtube.com/watch?v=dQw4w9WgXcQ +snatch download --audio-only https://soundcloud.com/artist/track +snatch download --resolution 1080 https://vimeo.com/video_id +snatch interactive # Launch interactive TUI """ # Audio processing settings diff --git a/snatch/error_handler.py b/snatch/error_handler.py index ddbaf89..7c4ac3b 100644 --- a/snatch/error_handler.py +++ b/snatch/error_handler.py @@ -109,7 +109,7 @@ def _setup_logging(self) -> None: show_path=False, rich_tracebacks=True ) - console_handler.setLevel(logging.INFO) + console_handler.setLevel(logging.WARNING) console_formatter = logging.Formatter( fmt='%(message)s' ) diff --git a/snatch/interactive_mode.py b/snatch/interactive_mode.py index d8065f2..ab649b8 100644 --- a/snatch/interactive_mode.py +++ b/snatch/interactive_mode.py @@ -1,2611 +1,56 @@ -#!/usr/bin/env python3 """ -interactive_mode.py - Snatch Premium Interactive Experience V2.0 +interactive_mode.py - Launcher for the Snatch interactive TUI. -A cutting-edge, neon-styled terminal interface for media downloads with P2P support. - -Features: -- Cyberpunk neon aesthetics with animated borders -- Real-time holographic progress visualization -- Advanced 7.1 surround audio processing -- P2P distributed downloading -- Standalone audio processor mode -- Matrix-style data visualization -- AI-powered download optimization -- System resource monitoring +Tries the Textual-based modern interface first, then falls back to a +simple Rich console prompt if Textual is unavailable. """ -import sys -import time -import asyncio -import psutil -import re -import os -import json import logging -from pathlib import Path -from typing import Dict, Any, Optional, List, Union, Tuple -from datetime import datetime - -from rich import box -from rich.console import Console, RenderableType -from rich.prompt import Prompt, Confirm, IntPrompt -from rich.panel import Panel -from rich.progress import ( - Progress, SpinnerColumn, BarColumn, TextColumn, - DownloadColumn, TransferSpeedColumn, TimeRemainingColumn, - TaskProgressColumn, Group, MofNCompleteColumn -) -from rich.layout import Layout -from rich.live import Live -from textual.widgets import Markdown -from rich.align import Align -from rich.text import Text -from rich.table import Table -from rich.columns import Columns -from rich.style import Style -from rich.syntax import Syntax -from rich.traceback import install -from rich.measure import Measurement -from rich.spinner import Spinner - -from colorama import Fore, Style as ColoramaStyle, init -import typer +from typing import Dict, Any -# Import local modules -from .manager import DownloadManager, AsyncDownloadManager -from .defaults import ( - THEME, BANNER_ART, HELP_CONTENT, QUALITY_PRESETS, - SURROUND_PROFILES, SUBTITLE_STYLES -) -from .logging_config import setup_logging -from .progress import HolographicProgress -from .common_utils import sanitize_filename, format_size -from .audio_processor import EnhancedAudioProcessor -from .config_manager import AdvancedConfigManager, ConfigCategory -from .p2p import P2PManager -from .network import NetworkManager +logger = logging.getLogger(__name__) -# Textual imports for TUI features -from textual.app import App, ComposeResult -from textual.binding import Binding -from textual.widgets import ( - Button, Header, Footer, Static, Input, Label, - Checkbox, DataTable, Select, ProgressBar, - ContentSwitcher, RadioSet, RadioButton, - TextArea, Collapsible, Switch, Tabs, Tab, DirectoryTree -) -from textual.reactive import reactive -from textual.containers import Container, Vertical, Horizontal, Grid -from textual.worker import Worker, WorkerState -from textual import work -from textual.screen import Screen -# Safe wrapper for table access to prevent RowDoesNotExist errors -def safe_get_row(table, row_index, default_value=None): - """Safe getter for DataTable rows that prevents RowDoesNotExist errors""" - if not hasattr(table, "row_count") or table.row_count == 0: - return default_value - +def launch_enhanced_interactive_mode(config: Dict[str, Any]) -> None: + """Launch the interactive TUI with automatic fallback.""" try: - if hasattr(table, "get_row_at"): - return table.get_row_at(row_index) + from .theme.modern_interactive import run_modern_interactive + run_modern_interactive(config) except Exception as e: - logging.warning(f"Error accessing table row: {str(e)}") - - return default_value - -# Enable rich traceback handling -install(show_locals=True) - -# Initialize colorama and console -init(autoreset=True) -console = Console(theme=THEME) - -# Constants to reduce string duplication -UI_ELEMENTS = { - "FILES_LIST": "#files-list", - "FORMAT_TABLE": "#format-table", - "PROCESS_AUDIO": "#process-audio", - "DOWNLOAD_BUTTON": "#download-button", - "URL_INPUT": "#url-input", - "STATUS_BAR": "#status-bar" -} - -CONSTANTS = { - # UI Messages - "PRESS_ENTER": "\n[dim]Press Enter to continue[/]", - "FILE_ORG_NOT_AVAILABLE": "File organizer module not available", - - # File and Binary Names - "FFMPEG_EXE": "ffmpeg.exe", - "FFMPEG_BINARY": "ffmpeg", - - # Status Messages - "DOWNLOAD_FAILED": "Download failed", - "PROCESSING_COMPLETE": "Processing complete", - "CONVERSION_FAILED": "Conversion failed", - "NO_FILES_FOUND": "No files found to convert", - "NO_AUDIO_FILES": "No audio files found to process", - "NO_VIDEO_FILES_CONVERT": "No video files found to convert", - "NO_VIDEO_FILES_PROCESS": "No video files found to process", - "FILE_NOT_FOUND": "File not found: {file_path}", - - # User Prompts - "PLEASE_SELECT_FILE": "Please select a file to convert", - "PLEASE_SELECT_AUDIO": "Please select an audio file to process", - "PLEASE_SELECT_VIDEO_CONVERT": "Please select a video file to convert", - "PLEASE_SELECT_VIDEO_PROCESS": "Please select a video file to process", - - # Success Messages - "CONVERSION_COMPLETE": "Conversion completed successfully", - "PROCESSING_COMPLETE_SUCCESS": "Processing completed successfully", - - # Error Messages - "ERROR_CONVERTING": "Error converting file", - "ERROR_PROCESSING": "Error processing file" -} - -# Define reusable constants to avoid duplication -PRESS_ENTER_MSG = CONSTANTS["PRESS_ENTER"] -FILE_ORG_NOT_AVAILABLE_MSG = CONSTANTS["FILE_ORG_NOT_AVAILABLE"] - -# Custom exception classes -class DownloadManagerError(Exception): - """Custom exception for download manager errors""" - pass - -class ConversionError(Exception): - """Custom exception for conversion errors""" - pass - -# UI Style Constants -STYLE = { - "primary": "bold cyan", - "secondary": "bright_green", - "accent": "magenta", - "success": "bold bright_green", - "warning": "bold yellow", - "error": "bold red", - "muted": "dim", - "header": "bold bright_blue", - "title": "bold bright_white", - "label": "cyan", - "value": "bright_white", - "progress": "bright_blue" -} - -# Box Styles -BOX = { - "app": box.HEAVY, - "panel": box.ROUNDED, - "data": box.MINIMAL_HEAVY_HEAD, - "section": box.HEAVY_EDGE, - "simple": box.SIMPLE, - "double": box.DOUBLE -} - -# Border Styles -BORDERS = { - "primary": "bright_blue", - "secondary": "cyan", - "accent": "magenta", - "preview": "bright_blue", - "info": "cyan", - "matrix": "magenta", - "surround": "green", - "help": "yellow" -} - -# Layout Constants -LAYOUT = { - "header_size": 3, - "footer_size": 3, - "sidebar_size": 35, - "status_size": 3, - "main_ratio": 3, - "content_ratio": 8 -} - -# Update Intervals -REFRESH_RATE = { - "normal": 4, - "fast": 10, - "slow": 1 -} - -# UI Elements -UI = { - "spinner": "dots", - "progress_chars": "█░", - "indent": " ", - "spacer": "\n" -} - - -# Error message constants -ERROR_MESSAGES = { - "INPUT_FILE_NOT_EXIST": "Input file does not exist", - "FILE_ORG_FAILED": "File organization failed", - "NO_INPUT_FILE": "Please specify an input file", - "CONVERSION_FAILED": "Conversion failed", - "PROCESSING_FAILED": "Processing failed" -} - -def create_format_table(formats: List[Dict[str, Any]], box_style=None, border_style: str = "bright_blue") -> Table: - """Create a formatted table for displaying media format information. - - Args: - formats: List of format dictionaries with media information - box_style: Rich box style for the table - border_style: Border color style - - Returns: - Rich Table object with formatted media information - """ - table = Table( - title="Available Formats", - box=box_style or box.MINIMAL_HEAVY_HEAD, - border_style=border_style, - show_header=True, - header_style="bold magenta" - ) - - # Add columns - table.add_column("ID", style="cyan", width=6) - table.add_column("Extension", style="green", width=10) - table.add_column("Resolution", style="blue", width=12) - table.add_column("Codec", style="yellow", width=10) - table.add_column("Size", style="red", width=10) - table.add_column("Audio", style="purple", width=12) - table.add_column("FPS", style="bright_green", width=8) - - # Add format data - for fmt in formats: - table.add_row( - str(fmt.get('format_id', 'N/A')), - fmt.get('ext', 'N/A'), - f"{fmt.get('width', 'N/A')}x{fmt.get('height', 'N/A')}" if fmt.get('width') and fmt.get('height') else 'N/A', - fmt.get('vcodec', 'N/A')[:10] if fmt.get('vcodec') else 'N/A', - format_size(fmt.get('filesize', 0)) if fmt.get('filesize') else 'N/A', - f"{fmt.get('abr', 'N/A')} kbps" if fmt.get('abr') else 'N/A', - str(fmt.get('fps', 'N/A')) if fmt.get('fps') else 'N/A' - ) - - return table - -# Status classification helpers for reducing complexity -def classify_download_speed(speed_mbps: float) -> str: - """Classify download speed into status classes.""" - if speed_mbps > 5: - return "status-ok" - elif speed_mbps > 1: - return "status-warning" - else: - return "status-error" - -def classify_upload_speed(speed_mbps: float) -> str: - """Classify upload speed into status classes.""" - if speed_mbps > 2: - return "status-ok" - elif speed_mbps > 0.5: - return "status-warning" - else: - return "status-error" - -def classify_ping(ping_ms: float) -> str: - """Classify ping latency into status classes.""" - if ping_ms < 100: - return "status-ok" - elif ping_ms < 200: - return "status-warning" - else: - return "status-error" - -def classify_jitter(jitter_ms: float) -> str: - """Classify jitter into status classes.""" - if jitter_ms < 20: - return "status-ok" - elif jitter_ms < 50: - return "status-warning" - else: - return "status-error" - -def classify_packet_loss(loss_percent: float) -> str: - """Classify packet loss into status classes.""" - if loss_percent < 1: - return "status-ok" - elif loss_percent < 5: - return "status-warning" - else: - return "status-error" - -def get_overall_rating(download_mbps: float, ping_ms: float) -> str: - """Get overall network rating.""" - if download_mbps > 10 and ping_ms < 100: - return "Good" - elif download_mbps > 3: - return "Fair" - else: - return "Poor" - -def get_rating_class(rating: str) -> str: - """Get CSS class for rating.""" - if rating == "Good": - return "status-ok" - elif rating == "Fair": - return "status-warning" - else: - return "status-error" - - -class MediaInfo: - """Rich media information display with metadata and quality visualization.""" - - def __init__(self): - self._theme = THEME - self._box_style = box.HEAVY_EDGE - self._last_update = None - - def render(self, metadata: Dict[str, Any]) -> Panel: - """Generate rich panel with comprehensive media information.""" - self._last_update = datetime.now() - - # Create the main info grid - grid = Table.grid(padding=(0, 2), expand=True) - grid.add_column("Label", style="bright_blue", justify="right", width=15) - grid.add_column("Value", style="bright_white", ratio=2) - - # Basic metadata - grid.add_row("📺 Title", self._format_title(metadata.get('title', 'Unknown'))) - grid.add_row("⏱️ Duration", self._format_duration(metadata.get('duration', 0))) - if metadata.get('upload_date'): - grid.add_row("📅 Released", self._format_date(metadata['upload_date'])) - if metadata.get('uploader'): - grid.add_row("👤 Uploader", f"[cyan]{metadata['uploader']}[/]") - - # Media-specific information - if metadata.get('width') and metadata.get('height'): - grid.add_row( - "🎥 Resolution", - f"{metadata['width']}x{metadata['height']} ({self._get_quality_tag(metadata)})" - ) - - # Extended metadata - if 'ext' in metadata: - grid.add_row("📦 Format", self._format_codec_info(metadata)) - if metadata.get('filesize'): - grid.add_row("💾 Size", format_size(metadata['filesize'])) - if metadata.get('view_count'): - grid.add_row("👁️ Views", f"{metadata['view_count']:,}") - - # Format table if available - format_table = None - if metadata.get('formats'): - format_table = self._build_format_table(metadata['formats']) - - group = Group( - grid, - Text(""), # Spacer - self._build_quality_indicators(metadata), - Text(""), # Spacer - format_table if format_table else Text("") - ) - - return Panel( - group, - title="[b]📊 Media Analysis[/b]", - subtitle=f"[dim]Last updated: {self._format_relative_time(self._last_update)}[/]", - border_style="bright_blue", - box=self._box_style - ) - - def _format_title(self, title: str, max_length: int = 60) -> Text: - """Format title with smart truncation and styling.""" - return Text( - title if len(title) <= max_length else f"{title[:max_length-1]}…", - style="bright_white bold" - ) - - def _format_duration(self, seconds: Union[float, int]) -> str: - """Format duration in human-readable time format.""" - if not seconds: - return "Unknown" - - hours, remainder = divmod(int(seconds), 3600) - minutes, seconds = divmod(remainder, 60) - - if hours > 0: - return f"{hours}h {minutes}m {seconds}s" - elif minutes > 0: - return f"{minutes}m {seconds}s" - else: - return f"{seconds}s" - - def _format_date(self, date_str: str) -> str: - """Format date in a human-readable format.""" - try: - # Format YYYYMMDD to YYYY-MM-DD - if len(date_str) == 8 and date_str.isdigit(): - return f"{date_str[:4]}-{date_str[4:6]}-{date_str[6:8]}" - return date_str - except Exception: - return date_str - - def _get_quality_tag(self, metadata: Dict[str, Any]) -> Text: - """Generate rich quality tag based on resolution.""" - height = metadata.get('height', 0) - - if height >= 2160: - return Text("4K UHD", style="bright_green bold") - elif height >= 1080: - return Text("Full HD", style="green bold") - elif height >= 720: - return Text("HD", style="yellow bold") - elif height >= 480: - return Text("SD", style="yellow") - else: - return Text("Low Quality", style="red") - - def _format_codec_info(self, metadata: Dict[str, Any]) -> Text: - """Format codec information with color highlighting.""" - parts = [] - - # File extension - if metadata.get('ext'): - parts.append(f"[bright_blue]{metadata['ext'].upper()}[/]") - - # Video codec - if metadata.get('vcodec') and metadata['vcodec'] != 'none': - codec = metadata['vcodec'] - parts.append(f"[green]{codec}[/]") - - # Audio codec - if metadata.get('acodec') and metadata['acodec'] != 'none': - codec = metadata['acodec'] - parts.append(f"[yellow]{codec}[/]") - - return Text(" • ".join(parts) if parts else "Unknown") - - def _format_relative_time(self, timestamp: datetime) -> str: - """Format timestamp relative to current time.""" - now = datetime.now() - delta = now - timestamp - - if delta.total_seconds() < 60: - return "just now" - elif delta.total_seconds() < 3600: - minutes = int(delta.total_seconds() / 60) - return f"{minutes} minute{'s' if minutes > 1 else ''} ago" - else: - return timestamp.strftime("%H:%M:%S") - - def _build_quality_indicators(self, metadata: Dict[str, Any]) -> Panel: - """Build visual quality indicators.""" - table = Table.grid(expand=True) - table.add_column("Indicator", ratio=1) - table.add_column("Value", ratio=2) - table.add_column("Scale", ratio=3) - - # Video quality indicator - if metadata.get('height'): - resolution = metadata['height'] - max_res = 2160 # 4K UHD - percentage = min(100, int((resolution / max_res) * 100)) - - table.add_row( - Text("Video Quality", style="cyan"), - Text(f"{percentage}%", style="bright_white"), - self._build_progress_bar(percentage) - ) - - # Audio quality indicator - if metadata.get('abr'): - abr = metadata['abr'] - max_abr = 320 # Typical maximum bitrate - percentage = min(100, int((abr / max_abr) * 100)) - - table.add_row( - Text("Audio Quality", style="cyan"), - Text(f"{percentage}%", style="bright_white"), - self._build_progress_bar(percentage) - ) - - return Panel( - table, - title="[b]Quality Rating[/b]", - border_style="bright_blue", - box=BOX["simple"] - ) - - def _build_progress_bar(self, percentage: int) -> Text: - """Build a text-based progress bar.""" - width = 20 - filled = int((percentage / 100) * width) - bar = "█" * filled + "░" * (width - filled) - - if percentage >= 75: - style = "bright_green" - elif percentage >= 50: - style = "green" - elif percentage >= 25: - style = "yellow" - else: - style = "red" - - return Text(bar, style=style) - def _build_format_table(self, formats: List[Dict[str, Any]]) -> Table: - """Build a table of available formats.""" - # Use our improved format table creator - return create_format_table(formats, box_style=BOX["data"], border_style="bright_blue") - - def _format_resolution(self, fmt: Dict[str, Any]) -> str: - """Format resolution information.""" - if fmt.get('height') and fmt.get('width'): - return f"{fmt['width']}x{fmt['height']}" - return "N/A" - - def _format_audio_info(self, fmt: Dict[str, Any]) -> str: - """Format audio information.""" - if fmt.get('abr'): - return f"{fmt['abr']} kbps" - return "N/A" - -class InteractiveApp(App): - """Modern Textual application for Snatch interactive mode""" - - TITLE = "Snatch Media Downloader" - SUB_TITLE = "Interactive Mode" # Constants for reducing duplication - FORMAT_TABLE_ID = "#format-table" - HELP_DOCUMENTATION_TITLE = "Help & Documentation" - - # Screen IDs - DOWNLOAD_SCREEN = "download-screen" - BROWSE_SCREEN = "browse-screen" - NETWORK_SCREEN = "network-screen" - SETTINGS_SCREEN = "settings-screen" - HELP_SCREEN = "help-screen" - AUDIO_SCREEN = "audio-screen" - VIDEO_SCREEN = "video-screen" - FILES_SCREEN = "files-screen" - # Form input ID constants - VIDEO_DIR_INPUT = "#video-dir-input" - AUDIO_DIR_INPUT = "#audio-dir-input" - FFMPEG_INPUT = "#ffmpeg-input" - ORGANIZE_FILES = "#organize-files" - - # Default paths - DEFAULT_VIDEO_DIR = os.path.join(os.path.expanduser("~"), "Downloads", "video") - DEFAULT_AUDIO_DIR = os.path.join(os.path.expanduser("~"), "Downloads", "audio") - CSS = """ - #header { - dock: top; - height: 1; - background: $boost; - color: $text; - } - - #footer { - dock: bottom; - height: 1; - background: $boost; - color: $text; - } - - #sidebar { - dock: left; - width: 32; - background: $surface; - border-right: thick $primary; - padding: 0 1; - } - - #main-content { - background: $background; - margin: 0 1; - } - - .title { - background: $primary; - color: $background; - text-align: center; - padding: 1; - text-style: bold; - width: 100%; - margin: 0 0 1 0; - } - - .subtitle { - background: $surface; - color: $text; - text-align: center; - padding: 0 1; - text-style: bold; - margin: 0 0 1 0; - } - - .section-title { - color: $accent; - text-style: bold; - margin: 1 0; - padding: 0 1; - } - .menu-item { - padding: 1 2; - margin: 0 1 1 1; - border: solid $surface; - } - - .menu-item:hover { - background: $primary-darken-1; - color: $text; - border: solid $primary; - } - - .menu-item.selected { - background: $primary; - color: $text; - text-style: bold; - border: solid $accent; - } - .info-panel { - margin: 1; - padding: 1; - border: solid $primary-darken-2; - background: $surface-lighten-1; - } - - .action-panel { - margin: 1; - padding: 1; - background: $surface; - border: solid $accent; - } - - .tab-container { - margin: 0 0 1 0; - padding: 0 1; - } - - .tab-button { - margin: 0 1 0 0; - padding: 0 2; - border: solid $surface; - background: $surface; - } - - .tab-button.active { - background: $primary; - color: $text; - border: solid $primary; - text-style: bold; - } - - .tab-button:hover { - background: $primary-darken-1; - border: solid $primary-darken-1; - } - - .progress-bar { - width: 100%; - margin: 1 0; - border: solid $primary; - } - - .status-ok { - color: $success; - text-style: bold; - } - - .status-warning { - color: $warning; - text-style: bold; - } - - .status-error { - color: $error; - text-style: bold; - } - .format-table { - width: 100%; - height: auto; - border: solid $primary; - } - - .primary { - background: $primary; - color: $text; - text-style: bold; - border: solid $primary; - margin: 0 1 0 0; - } - - .secondary { - background: $surface; - color: $text; - border: solid $surface; - margin: 0 1 0 0; - } - - .primary:hover { - background: $primary-lighten-1; - border: solid $primary-lighten-1; - } - - .secondary:hover { - background: $surface-lighten-1; - border: solid $surface-lighten-1; - } - - DataTable { - height: auto; - border: solid $primary-darken-2; - } - - Input { - border: solid $primary-darken-2; - margin: 0 0 1 0; - padding: 0 1; - } - - Input:focus { - border: solid $primary; - } - - Select { - border: solid $primary-darken-2; - margin: 0 0 1 0; - } - - Checkbox { - margin: 0 0 1 0; - } - - Container { - padding: 0 1; - } - - #downloads-table { - min-height: 10; - } - - #format-table { - min-height: 8; - } - - #network-stats { - min-height: 6; - } - - #active-downloads { - min-height: 6; - } - """ - - # Screen ID constants - DOWNLOAD_SCREEN = "download-screen" - BROWSE_SCREEN = "browse-screen" - NETWORK_SCREEN = "network-screen" - SETTINGS_SCREEN = "settings-screen" - HELP_SCREEN = "help-screen" - AUDIO_SCREEN = "audio-screen" - VIDEO_SCREEN = "video-screen" - FILES_SCREEN = "files-screen" - - # Input field ID constants - VIDEO_DIR_INPUT = "#video-dir-input" - AUDIO_DIR_INPUT = "#audio-dir-input" - FFMPEG_INPUT = "#ffmpeg-input" - ORGANIZE_FILES = "#organize-files" - - # Default paths - DEFAULT_VIDEO_DIR = os.path.join(os.path.expanduser("~"), "Downloads", "video") - DEFAULT_AUDIO_DIR = os.path.join(os.path.expanduser("~"), "Downloads", "audio") - - # Help content - - def __init__(self, config: Dict[str, Any]): - """Initialize the interactive app with configuration""" - super().__init__() - self.config = config - self.download_manager = None - self.current_url = None - self.format_info = None - self.downloads = [] - - # Initialize Advanced Configuration Manager - self.config_manager = AdvancedConfigManager() - self.config_manager.load_config() - - # Merge with existing config - self.config.update(self.config_manager.config) - - # Current active settings tab - self.active_settings_tab = "general-settings" - - def compose(self) -> ComposeResult: - """Create UI layout""" - yield Header() - - with Container(): - with Horizontal(): - # Left sidebar with menu - with Container(id="sidebar"): - yield Static("MAIN MENU", classes="title") - yield Button("📥 Download", id="menu-download", classes="menu-item") - yield Button("🔍 Browse Downloads", id="menu-browse", classes="menu-item") - yield Button("🌐 Network Test", id="menu-network", classes="menu-item") - yield Button("⚙️ Settings", id="menu-settings", classes="menu-item") - yield Button("❓ Help", id="menu-help", classes="menu-item") - - yield Static("TOOLS", classes="title") - yield Button("🎵 Audio Tools", id="menu-audio", classes="menu-item") - yield Button("🎬 Video Tools", id="menu-video", classes="menu-item") - yield Button("💾 File Management", id="menu-files", classes="menu-item") - - # Main content area with content switcher - with Container(id="main-content"): - with ContentSwitcher(id="content-switcher"): - # Download screen - with Container(id="download-screen"): - yield Static("Download Media", classes="title") - yield Input(placeholder="Enter URL to download", id="url-input") - yield Button("Analyze URL", id="analyze-btn") - - with Container(id="format-selection", classes="info-panel"): - yield Static("Format Selection", classes="title") - yield DataTable(id="format-table", classes="format-table") - - with Container(id="download-options", classes="info-panel"): - yield Static("Download Options", classes="title") - yield Checkbox("Extract Audio Only", id="audio-only") - yield Checkbox("Process Audio (Denoise, Normalize)", id="process-audio") - yield Checkbox("Upmix to 7.1 Surround", id="upmix-audio") - - yield Button("Start Download", id="start-download-btn") - - with Container(id="active-downloads"): - yield Static("Active Downloads", classes="title") - # Will be populated dynamically - - # Browse screen - with Container(id="browse-screen"): - yield Static("Browse Downloads", classes="title") - yield Input(placeholder="Search downloads...", id="search-input") - yield DataTable(id="downloads-table") - - # Network screen - with Container(id="network-screen"): - yield Static("Network Diagnostics", classes="title") - yield Button("Run Speed Test", id="speedtest-btn") - with Container(id="network-stats", classes="info-panel"): - yield Static("Network Status: Checking...", id="network-status") - # Will be populated with speed test results - # Settings screen - with Container(id="settings-screen"): - yield Static("Settings", classes="title") - - # Tabbed interface for different setting categories - with Container(id="settings-tabs", classes="tab-container"): - yield Button("General", id="general-tab", classes="tab-button active") - yield Button("Download", id="download-tab", classes="tab-button") - yield Button("Audio/Video", id="media-tab", classes="tab-button") - yield Button("Network", id="network-tab", classes="tab-button") - yield Button("Advanced", id="advanced-tab", classes="tab-button") - - with ContentSwitcher(id="settings-content"): - # General Settings - with Container(id="general-settings"): - with Vertical(): - yield Static("Output Directories", classes="section-title") - yield Input(placeholder="Video Output Directory", id="video-dir-input") - yield Input(placeholder="Audio Output Directory", id="audio-dir-input") - - yield Static("Interface", classes="section-title") - yield Select([ - ("Default", "default"), - ("Dark", "dark"), - ("Cyberpunk", "cyberpunk"), - ("Matrix", "matrix"), - ("Ocean", "ocean") - ], value="default", id="theme-select") - yield Checkbox("Keep Download History", id="download-history") - yield Checkbox("Auto Update Check", id="auto-update") - - # Download Settings - with Container(id="download-settings"): - with Vertical(): - yield Static("Concurrent Downloads", classes="section-title") - yield Input(placeholder="Max Concurrent Downloads (1-10)", id="max-concurrent-input", value="3") - yield Input(placeholder="Fragment Downloads (1-32)", id="fragment-downloads", value="16") - - yield Static("Retry Settings", classes="section-title") - yield Input(placeholder="Max Retries (0-10)", id="max-retries-input", value="3") - yield Input(placeholder="Retry Delay (1-60 seconds)", id="retry-delay-input", value="5") - yield Checkbox("Exponential Backoff", id="exponential-backoff") - - yield Static("Organization", classes="section-title") - yield Checkbox("Auto Organize Files", id="auto-organize") - yield Checkbox("Enable File Organization Features", id="organize-files") - - # Audio/Video Settings - with Container(id="media-settings"): - with Vertical(): - yield Static("Video Preferences", classes="section-title") - yield Select([ - ("H.264", "h264"), - ("H.265/HEVC", "h265"), - ("VP9", "vp9"), - ("AV1", "av1"), - ("Any", "any") - ], value="h264", id="video-codec-select") - yield Select([ - ("4K (2160p)", "2160p"), - ("1440p", "1440p"), - ("1080p", "1080p"), - ("720p", "720p"), - ("480p", "480p"), - ("Best Available", "best") - ], value="1080p", id="video-quality-select") - - yield Static("Audio Preferences", classes="section-title") - yield Select([ - ("AAC", "aac"), - ("MP3", "mp3"), - ("Opus", "opus"), - ("FLAC", "flac"), - ("Any", "any") - ], value="aac", id="audio-codec-select") - yield Select([ - ("320 kbps", "320"), - ("256 kbps", "256"), - ("192 kbps", "192"), - ("128 kbps", "128"), - ("Best Available", "best") - ], value="192", id="audio-quality-select") - yield Checkbox("Enable High Quality Audio", id="high-quality") - - # Network Settings - with Container(id="network-settings"): - with Vertical(): - yield Static("Bandwidth Control", classes="section-title") - yield Input(placeholder="Speed Limit (MB/s, 0=unlimited)", id="bandwidth-limit", value="0") - yield Input(placeholder="Chunk Size (bytes)", id="chunk-size", value="1048576") - - yield Static("Connection Settings", classes="section-title") - yield Input(placeholder="Timeout (seconds)", id="connection-timeout", value="30") - yield Checkbox("Use Proxy", id="use-proxy") - yield Input(placeholder="Proxy URL (optional)", id="proxy-url") - - # Advanced Settings - with Container(id="advanced-settings"): - with Vertical(): - yield Static("FFmpeg Configuration", classes="section-title") - yield Input(placeholder="FFmpeg Location", id="ffmpeg-input") - yield Button("Auto-Detect FFmpeg", id="detect-ffmpeg-btn") - - yield Static("Session Management", classes="section-title") - yield Input(placeholder="Session Expiry (hours)", id="session-expiry", value="168") - yield Input(placeholder="Auto Save Interval (seconds)", id="auto-save", value="30") - yield Static("Debug Options", classes="section-title") - yield Checkbox("Enable Debug Logging", id="debug-logging") - yield Checkbox("Verbose Output", id="verbose-output") - - with Container(id="settings-actions", classes="action-panel"): - yield Button("Save Settings", id="save-settings-btn", classes="primary") - yield Button("Reset to Defaults", id="reset-settings-btn", classes="secondary") - yield Button("Export Settings", id="export-settings-btn", classes="secondary") - yield Button("Import Settings", id="import-settings-btn", classes="secondary") - - # Help screen - with Container(id="help-screen"): - yield Static(self.HELP_DOCUMENTATION_TITLE, classes="title") - # Will be populated with help content - yield Static("Loading help content...") - - # Audio Tools screen - with Container(id="audio-screen"): - yield Static("Audio Processing Tools", classes="title") - with Container(classes="info-panel"): - yield Static("Audio Conversion", classes="subtitle") - yield Input(placeholder="Input audio file", id="audio-input-file") - yield Select([("mp3", "MP3"), ("aac", "AAC"), ("flac", "FLAC"), ("wav", "WAV")], id="audio-format-select", prompt="Select output format") - yield Button("Convert Audio", id="convert-audio-btn") - - with Container(classes="info-panel"): - yield Static("Audio Enhancement", classes="subtitle") - yield Checkbox("Normalize Volume", id="normalize-audio") - yield Checkbox("Remove Background Noise", id="denoise-audio") - yield Checkbox("Bass Boost", id="bass-boost") - yield Button("Process Audio", id="process-audio-btn") - - # Video Tools screen - with Container(id="video-screen"): - yield Static("Video Processing Tools", classes="title") - with Container(classes="info-panel"): - yield Static("Video Conversion", classes="subtitle") - yield Input(placeholder="Input video file", id="video-input-file") - yield Select([("mp4", "MP4"), ("mkv", "MKV"), ("webm", "WEBM")], id="video-format-select", prompt="Select output format") - yield Input(placeholder="Resolution (e.g., 1080, 720)", id="video-resolution") - yield Button("Convert Video", id="convert-video-btn") - - with Container(classes="info-panel"): - yield Static("Video Enhancement", classes="subtitle") - yield Checkbox("Enhance Colors", id="enhance-colors") - yield Checkbox("Stabilize Video", id="stabilize-video") - yield Button("Process Video", id="process-video-btn") - - # File Management screen - with Container(id="files-screen"): - yield Static("File Management", classes="title") - with Container(classes="info-panel"): - yield Static("Organize Downloads", classes="subtitle") - yield Button("Organize by Type", id="organize-by-type-btn") - yield Button("Organize by Date", id="organize-by-date-btn") - yield Button("Organize by Source", id="organize-by-source-btn") - - with Container(classes="info-panel"): - yield Static("Batch Operations", classes="subtitle") - yield Button("Rename Files", id="rename-files-btn") - yield Button("Delete Temporary Files", id="cleanup-btn") - - yield Footer() - - def on_mount(self) -> None: - """Handle app mount event""" - # Initialize download manager - self.initialize_download_manager() - - # Setup content switcher with default screens - content_switcher = self.query_one("#content-switcher") - content_switcher.default_screens = [ - self.DOWNLOAD_SCREEN, - self.BROWSE_SCREEN, - self.NETWORK_SCREEN, - self.SETTINGS_SCREEN, - self.HELP_SCREEN - ] - content_switcher.current = self.DOWNLOAD_SCREEN - - # Populate format table - self.setup_format_table() - - # Load settings into form - self.load_settings() - - # Set active menu item - self.query_one("#menu-download").add_class("selected") - # Load help content - #self.load_help_content() - - def on_button_pressed(self, event: Button.Pressed) -> None: - """Handle all button press events""" - button_id = event.button.id - - # Route to appropriate handler based on button type - if button_id.startswith("menu-"): - self._handle_menu_selection(button_id) - elif button_id.endswith("-tab"): - self._handle_settings_tab_switch(button_id) - else: - self._handle_action_button(button_id) - - def _handle_action_button(self, button_id: str) -> None: - """Handle action buttons (non-menu, non-tab)""" - # Download functionality - if button_id in ["analyze-btn", "start-download-btn", "speedtest-btn"]: - self._handle_download_actions(button_id) - # Settings actions - elif button_id in ["save-settings-btn", "reset-settings-btn", "export-settings-btn", - "import-settings-btn", "detect-ffmpeg-btn"]: - self._handle_settings_actions(button_id) - # Media processing - elif button_id in ["convert-audio-btn", "process-audio-btn", "convert-video-btn", "process-video-btn"]: - self._handle_media_processing(button_id) - # File management - elif button_id in ["organize-by-type-btn", "organize-by-date-btn", "organize-by-source-btn", - "rename-files-btn", "cleanup-btn"]: - self._handle_file_management(button_id) - - def _handle_download_actions(self, button_id: str) -> None: - """Handle download-related button actions""" - if button_id == "analyze-btn": - self.analyze_url() - elif button_id == "start-download-btn": - self.start_download() - elif button_id == "speedtest-btn": - self.run_speed_test() - - def _handle_settings_actions(self, button_id: str) -> None: - """Handle settings-related button actions""" - if button_id == "save-settings-btn": - self.save_advanced_settings() - elif button_id == "reset-settings-btn": - self.reset_settings() - elif button_id == "export-settings-btn": - self.export_settings() - elif button_id == "import-settings-btn": - self.import_settings() - elif button_id == "detect-ffmpeg-btn": - self.auto_detect_ffmpeg() - - def _handle_media_processing(self, button_id: str) -> None: - """Handle media processing button actions""" - if button_id == "convert-audio-btn": - self.convert_audio() - elif button_id == "process-audio-btn": - self.process_audio() - elif button_id == "convert-video-btn": - self.convert_video() - elif button_id == "process-video-btn": - self.process_video() - - def _handle_file_management(self, button_id: str) -> None: - """Handle file management button actions""" - if button_id == "organize-by-type-btn": - self.organize_files_by_type() - elif button_id == "organize-by-date-btn": - self.organize_files_by_date() - elif button_id == "organize-by-source-btn": - self.organize_files_by_source() - elif button_id == "rename-files-btn": - self.rename_files() - elif button_id == "cleanup-btn": - self.cleanup_temp_files() - - def _handle_menu_selection(self, button_id: str) -> None: - """Handle menu item selection and switch screens""" - # Remove selected class from all menu items - for menu_item in self.query(".menu-item"): - menu_item.remove_class("selected") - - # Add selected class to current item - self.query_one(f"#{button_id}").add_class("selected") - - # Switch content - content_switcher = self.query_one("#content-switcher") - - if button_id == "menu-download": - content_switcher.current = self.DOWNLOAD_SCREEN - elif button_id == "menu-browse": - content_switcher.current = self.BROWSE_SCREEN - elif button_id == "menu-network": - content_switcher.current = self.NETWORK_SCREEN - elif button_id == "menu-settings": - content_switcher.current = self.SETTINGS_SCREEN - self.load_advanced_settings() - elif button_id == "menu-help": - content_switcher.current = self.HELP_SCREEN - elif button_id == "menu-audio": - content_switcher.current = self.AUDIO_SCREEN - elif button_id == "menu-video": - content_switcher.current = self.VIDEO_SCREEN - elif button_id == "menu-files": - content_switcher.current = self.FILES_SCREEN - - def _handle_settings_tab_switch(self, button_id: str) -> None: - """Handle settings tab switching""" - # Remove active class from all tabs - for tab in self.query(".tab-button"): - tab.remove_class("active") - - # Add active class to current tab - self.query_one(f"#{button_id}").add_class("active") - - # Switch settings content - settings_switcher = self.query_one("#settings-content") - - if button_id == "general-tab": - settings_switcher.current = "general-settings" - self.active_settings_tab = "general-settings" - elif button_id == "download-tab": - settings_switcher.current = "download-settings" - self.active_settings_tab = "download-settings" - elif button_id == "media-tab": - settings_switcher.current = "media-settings" - self.active_settings_tab = "media-settings" - elif button_id == "network-tab": - settings_switcher.current = "network-settings" - self.active_settings_tab = "network-settings" - elif button_id == "advanced-tab": - settings_switcher.current = "advanced-settings" - self.active_settings_tab = "advanced-settings" - - # Settings Management Methods - def load_advanced_settings(self) -> None: - """Load and populate settings form fields from AdvancedConfigManager""" - try: - # General settings - if hasattr(self, 'query_one'): - video_dir_input = self.query_one(self.VIDEO_DIR_INPUT, Input) - audio_dir_input = self.query_one(self.AUDIO_DIR_INPUT, Input) - organize_files = self.query_one(self.ORGANIZE_FILES, Checkbox) - - # Populate from config manager - video_dir_input.value = self.config_manager.get_setting('download', 'video_output_dir', self.DEFAULT_VIDEO_DIR) - audio_dir_input.value = self.config_manager.get_setting('download', 'audio_output_dir', self.DEFAULT_AUDIO_DIR) - organize_files.value = self.config_manager.get_setting('general', 'organize_files', True) - - # Load other settings - max_concurrent = self.query_one("#max-concurrent-input", Input) - max_concurrent.value = str(self.config_manager.get_setting('download', 'max_concurrent_downloads', 3)) - - # Load retry settings - max_retries = self.query_one("#max-retries-input", Input) - max_retries.value = str(self.config_manager.get_setting('download', 'max_retries', 3)) - - except Exception as e: - logging.error(f"Error loading advanced settings: {e}") - - def save_advanced_settings(self) -> None: - """Save current form values to AdvancedConfigManager""" - try: - # Get form values - video_dir = self.query_one(self.VIDEO_DIR_INPUT, Input).value - audio_dir = self.query_one(self.AUDIO_DIR_INPUT, Input).value - organize_files = self.query_one(self.ORGANIZE_FILES, Checkbox).value - - # Save to config manager - self.config_manager.set_setting('download', 'video_output_dir', video_dir) - self.config_manager.set_setting('download', 'audio_output_dir', audio_dir) - self.config_manager.set_setting('general', 'organize_files', organize_files) - - # Get and save numeric settings - try: - max_concurrent = int(self.query_one("#max-concurrent-input", Input).value) - self.config_manager.set_setting('download', 'max_concurrent_downloads', max_concurrent) - except ValueError: - pass - - try: - max_retries = int(self.query_one("#max-retries-input", Input).value) - self.config_manager.set_setting('download', 'max_retries', max_retries) - except ValueError: - pass - - # Save config - self.config_manager.save_config() - - # Update main config - self.config.update(self.config_manager.config) - - # Show success message - self.notify("Settings saved successfully!", severity="information") - - except Exception as e: - logging.error(f"Error saving settings: {e}") - self.notify(f"Error saving settings: {e}", severity="error") - - def reset_settings(self) -> None: - """Reset settings to defaults""" - try: - # Reset config manager to defaults - self.config_manager.reset_to_defaults() - - # Reload settings in form - self.load_advanced_settings() - - self.notify("Settings reset to defaults", severity="information") - - except Exception as e: - logging.error(f"Error resetting settings: {e}") - self.notify(f"Error resetting settings: {e}", severity="error") - - def export_settings(self) -> None: - """Export settings to file""" - try: - export_path = self.config_manager.export_config() - self.notify(f"Settings exported to: {export_path}", severity="information") - except Exception as e: - logging.error(f"Error exporting settings: {e}") - self.notify(f"Error exporting settings: {e}", severity="error") - - def import_settings(self) -> None: - """Import settings from file""" - try: - # This would typically open a file picker in a real implementation - # For now, we'll just show a placeholder message - self.notify("Import settings functionality not yet implemented", severity="warning") - except Exception as e: - logging.error(f"Error importing settings: {e}") - self.notify(f"Error importing settings: {e}", severity="error") - - def auto_detect_ffmpeg(self) -> None: - """Auto-detect FFmpeg installation""" - try: - # Use AdvancedConfigManager's auto-detection - ffmpeg_path = self.config_manager.auto_detect_tool('ffmpeg') - - if ffmpeg_path: - # Update the FFmpeg input field - ffmpeg_input = self.query_one(self.FFMPEG_INPUT, Input) - ffmpeg_input.value = ffmpeg_path - - # Save to config - self.config_manager.set_setting('tools', 'ffmpeg_path', ffmpeg_path) - self.config_manager.save_config() - - self.notify(f"FFmpeg detected: {ffmpeg_path}", severity="information") - else: - self.notify("FFmpeg not found. Please install FFmpeg or set path manually.", severity="warning") - - except Exception as e: - logging.error(f"Error detecting FFmpeg: {e}") - self.notify(f"Error detecting FFmpeg: {e}", severity="error") # Media Processing Methods def convert_audio(self) -> None: - """Convert audio files to different formats""" - try: - # Import audio processor - from .audio_processor import EnhancedAudioProcessor - - # Get input file path from user - files_widget = self.query_one(UI_ELEMENTS["FILES_LIST"], DataTable) - if files_widget.row_count == 0: - self.notify(CONSTANTS["NO_FILES_FOUND"], severity="warning") - return - - # Get selected file or use first available - try: - row_key = files_widget.cursor_row - if row_key is None or row_key >= files_widget.row_count: - row_key = 0 - row = files_widget.get_row_at(row_key) - file_path = str(row[1]) # File path is in the second column - except Exception: - self.notify(CONSTANTS["PLEASE_SELECT_FILE"], severity="error") - return - - if not os.path.exists(file_path): - self.notify(CONSTANTS["FILE_NOT_FOUND"].format(file_path=file_path), severity="error") - return - # Initialize audio processor - audio_processor = EnhancedAudioProcessor(self.config) - - # Get target format from user input (you could add a form field for this) - target_format = "flac" # Default to high quality - # Create output path - base_path = os.path.splitext(file_path)[0] - output_path = f"{base_path}_converted.{target_format}" - - self.notify(f"Converting {os.path.basename(file_path)} to {target_format.upper()}", severity="information") - - # Start audio conversion in background task - self._start_audio_conversion_task(audio_processor, file_path, output_path, target_format) - - except ImportError: - self.notify(CONSTANTS["FILE_ORG_NOT_AVAILABLE"], severity="error") - except Exception as e: - logging.error(f"Error in audio conversion: {e}") - self.notify(f"Audio conversion failed: {str(e)}", severity="error") - - @work - async def _start_audio_conversion_task(self, audio_processor, input_path: str, output_path: str, target_format: str) -> None: - """Background task for audio conversion""" - try: - # Perform actual audio conversion - result = await audio_processor.convert_audio_async(input_path, output_path, target_format) - - if result: - self.notify(f"Audio conversion to {target_format.upper()} completed!", severity="success") - else: - self.notify(CONSTANTS["CONVERSION_FAILED"], severity="error") - - except Exception as e: - logging.error(f"Background audio conversion error: {e}") - self.notify(f"Audio conversion failed: {str(e)}", severity="error") - def process_audio(self) -> None: - """Process audio with advanced effects and enhancements""" - try: - # Import audio processor - from .audio_processor import EnhancedAudioProcessor - # Get input file path from user - files_widget = self.query_one(UI_ELEMENTS["FILES_LIST"], DataTable) - if files_widget.row_count == 0: - self.notify(CONSTANTS["NO_AUDIO_FILES"], severity="warning") - return - - # Get selected file or use first available - try: - row_key = files_widget.cursor_row - if row_key is None or row_key >= files_widget.row_count: - row_key = 0 - row = files_widget.get_row_at(row_key) - file_path = str(row[1]) # File path is in the second column - except Exception: - self.notify(CONSTANTS["PLEASE_SELECT_AUDIO"], severity="error") - return - - if not os.path.exists(file_path): - self.notify(f"File not found: {file_path}", severity="error") - return - # Initialize audio processor - audio_processor = EnhancedAudioProcessor(self.config) - # Get processing options from form fields - try: - process_audio = self.query_one(UI_ELEMENTS["PROCESS_AUDIO"], Checkbox).value - upmix_audio = self.query_one("#upmix-audio", Checkbox).value - except Exception: - # Default options if form fields not available - process_audio = True - upmix_audio = True - self.notify(f"Processing audio file: {os.path.basename(file_path)}", severity="information") - - # Apply audio processing effects - if process_audio: - self.notify("Applying audio normalization and denoising...", severity="information") - - if upmix_audio: - self.notify("Applying 7.1 surround sound upmix...", severity="information") - - # Start audio processing in background task - self._start_audio_processing_task(audio_processor, file_path, process_audio, upmix_audio) - - except ImportError: - self.notify(CONSTANTS["FILE_ORG_NOT_AVAILABLE"], severity="error") - except Exception as e: - logging.error(f"Error in audio processing: {e}") - self.notify(f"Audio processing failed: {str(e)}", severity="error") - - @work - async def _start_audio_processing_task(self, audio_processor, input_path: str, process_audio: bool, upmix_audio: bool) -> None: - """Background task for audio processing""" - try: - # Apply audio processing effects - result = await audio_processor.process_audio_async( - input_path, - normalize=process_audio, - denoise=process_audio, - upmix_surround=upmix_audio - ) - - if result: - self.notify("Audio processing completed with all enhancements!", severity="success") - else: - self.notify(CONSTANTS["CONVERSION_FAILED"], severity="error") - - except Exception as e: - logging.error(f"Background audio processing error: {e}") - self.notify(f"Audio processing failed: {str(e)}", severity="error") - - def convert_video(self) -> None: - """Convert video files to different formats and resolutions""" - try: - # Get input file path from user - files_widget = self.query_one(UI_ELEMENTS["FILES_LIST"], DataTable) - if files_widget.row_count == 0: - self.notify(CONSTANTS["NO_VIDEO_FILES_CONVERT"], severity="warning") - return - - # Get selected file or use first available - try: - row_key = files_widget.cursor_row - if row_key is None or row_key >= files_widget.row_count: - row_key = 0 - row = files_widget.get_row_at(row_key) - file_path = str(row[1]) # File path is in the second column - except Exception: - self.notify("Please select a video file to convert", severity="error") - return - - if not os.path.exists(file_path): - self.notify(f"File not found: {file_path}", severity="error") - return # Basic video conversion using FFmpeg - # Default conversion settings - target_format = "mp4" # Popular format - target_codec = "libx264" # Hardware compatible - - # Create output path - base_path = os.path.splitext(file_path)[0] - output_path = f"{base_path}_converted.{target_format}" - - self.notify(f"Converting {os.path.basename(file_path)} to {target_format.upper()}", severity="information") - - # Start video conversion in background task - self._start_video_conversion_task(file_path, output_path, target_format, target_codec) - - except Exception as e: - logging.error(f"Error in video conversion: {e}") - self.notify(f"Video conversion failed: {str(e)}", severity="error") - - @work - async def _start_video_conversion_task(self, input_path: str, output_path: str, target_format: str, target_codec: str) -> None: - """Background task for video conversion""" - try: - from .ffmpeg_helper import FFmpegHelper - - ffmpeg_helper = FFmpegHelper(self.config) - result = await ffmpeg_helper.convert_video_async( - input_path, - output_path, - codec=target_codec, - format=target_format - ) - - if result: - self.notify(f"Video conversion to {target_format.upper()} completed!", severity="success") - else: - self.notify(CONSTANTS["CONVERSION_FAILED"], severity="error") - except Exception as e: - logging.error(f"Background video conversion error: {e}") - self.notify(f"Video conversion failed: {str(e)}", severity="error") - - def process_video(self) -> None: - """Process video with effects, filters, and optimizations""" - try: - # Get input file path from user - files_widget = self.query_one(UI_ELEMENTS["FILES_LIST"], DataTable) - if files_widget.row_count == 0: - self.notify("No video files found to process", severity="warning") - return - - # Get selected file or use first available - try: - row_key = files_widget.cursor_row - if row_key is None or row_key >= files_widget.row_count: - row_key = 0 - row = files_widget.get_row_at(row_key) - file_path = str(row[1]) # File path is in the second column - except Exception: - self.notify("Please select a video file to process", severity="error") - return - - if not os.path.exists(file_path): - self.notify(f"File not found: {file_path}", severity="error") - return self.notify(f"Processing video file: {os.path.basename(file_path)}", severity="information") - - # Apply video processing effects - self.notify("Applying video stabilization and enhancement filters...", severity="information") - - # Create output path for processed video - base_path = os.path.splitext(file_path)[0] - output_path = f"{base_path}_processed.mp4" - - # Start video processing in background task - self._start_video_processing_task(file_path, output_path) - - except Exception as e: - logging.error(f"Error in video processing: {e}") - self.notify(f"Video processing failed: {str(e)}", severity="error") - - @work - async def _start_video_processing_task(self, input_path: str, output_path: str) -> None: - """Background task for video processing""" - try: - from .ffmpeg_helper import FFmpegHelper - - ffmpeg_helper = FFmpegHelper(self.config) - result = await ffmpeg_helper.process_video_async( - input_path, - output_path, - stabilize=True, - enhance_colors=True, - denoise=True - ) - - if result: - self.notify("Video processing completed with all enhancements!", severity="success") - else: - self.notify(CONSTANTS["CONVERSION_FAILED"], severity="error") - - except Exception as e: - logging.error(f"Background video processing error: {e}") - self.notify(f"Video processing failed: {str(e)}", severity="error") - - # Media Processing Methods - def organize_files_by_type(self) -> None: - """Organize files by type""" - try: - from .file_organizer import FileOrganizer - - # Get output directories - video_dir = self.config.get('download', {}).get('video_output_dir', self.DEFAULT_VIDEO_DIR) - audio_dir = self.config.get('download', {}).get('audio_output_dir', self.DEFAULT_AUDIO_DIR) - - # Create file organizer - organizer = FileOrganizer() - - # Organize video files - video_count = organizer.organize_by_type(video_dir) - audio_count = organizer.organize_by_type(audio_dir) - total_organized = video_count + audio_count - self.notify(f"Organized {total_organized} files by type", severity="information") - - except ImportError: - self.notify(CONSTANTS["FILE_ORG_NOT_AVAILABLE"], severity="error") - except Exception as e: - logging.error(f"Error organizing files by type: {e}") - self.notify(f"Error organizing files: {str(e)}", severity="error") - - def organize_files_by_date(self) -> None: - """Organize files by date""" - try: - from .file_organizer import FileOrganizer - - # Get output directories - video_dir = self.config.get('download', {}).get('video_output_dir', self.DEFAULT_VIDEO_DIR) - audio_dir = self.config.get('download', {}).get('audio_output_dir', self.DEFAULT_AUDIO_DIR) - - # Create file organizer - organizer = FileOrganizer() - - # Organize by date - video_count = organizer.organize_by_date(video_dir) - audio_count = organizer.organize_by_date(audio_dir) - total_organized = video_count + audio_count - self.notify(f"Organized {total_organized} files by date", severity="information") - - except ImportError: - self.notify(CONSTANTS["FILE_ORG_NOT_AVAILABLE"], severity="error") - except Exception as e: - logging.error(f"Error organizing files by date: {e}") - self.notify(f"Error organizing files: {str(e)}", severity="error") - - def organize_files_by_source(self) -> None: - """Organize files by source""" - try: - from .file_organizer import FileOrganizer - - # Get output directories - video_dir = self.config.get('download', {}).get('video_output_dir', self.DEFAULT_VIDEO_DIR) - audio_dir = self.config.get('download', {}).get('audio_output_dir', self.DEFAULT_AUDIO_DIR) - - # Create file organizer - organizer = FileOrganizer() - - # Organize by source - video_count = organizer.organize_by_source(video_dir) - audio_count = organizer.organize_by_source(audio_dir) - total_organized = video_count + audio_count - self.notify(f"Organized {total_organized} files by source", severity="information") - except ImportError: - self.notify(CONSTANTS["FILE_ORG_NOT_AVAILABLE"], severity="error") - except Exception as e: - logging.error(f"Error organizing files by source: {e}") - self.notify(f"Error organizing files: {str(e)}", severity="error") - - def rename_files(self) -> None: - """Rename files based on patterns""" - try: - # Get output directories - video_dir = self.config.get('download', {}).get('video_output_dir', self.DEFAULT_VIDEO_DIR) - audio_dir = self.config.get('download', {}).get('audio_output_dir', self.DEFAULT_AUDIO_DIR) - - total_renamed = 0 - directories = [video_dir, audio_dir] - - for directory in directories: - if os.path.exists(directory): - renamed_count = self._rename_files_in_directory(directory) - total_renamed += renamed_count - - self.notify(f"Renamed {total_renamed} files", severity="information") - - except Exception as e: - logging.error(f"Error renaming files: {e}") - self.notify(f"Error renaming files: {str(e)}", severity="error") - - def _rename_files_in_directory(self, directory: str) -> int: - """Helper method to rename files in a specific directory""" - import re - renamed_count = 0 - - for root, dirs, files in os.walk(directory): - for file in files: - try: - old_path = os.path.join(root, file) - name, ext = os.path.splitext(file) - - # Clean filename - clean_name = re.sub(r'[^\w\s-]', '', name) - clean_name = re.sub(r'[-\s]+', '-', clean_name).strip('-') - - new_file = f"{clean_name}{ext}" - new_path = os.path.join(root, new_file) - - if old_path != new_path and not os.path.exists(new_path): - os.rename(old_path, new_path) - renamed_count += 1 - except OSError: - continue - - return renamed_count - - def cleanup_temp_files(self) -> None: - """Clean up temporary files""" - try: - cache_dir = self.config.get('cache_directory', os.path.join(os.path.expanduser("~"), ".cache", "snatch")) - temp_extensions = ['.part', '.tmp', '.temp', '.ytdl'] - - total_cleaned = 0 - total_size = 0 - - # Clean cache directory - if os.path.exists(cache_dir): - cleaned, size = self._cleanup_directory(cache_dir, temp_extensions) - total_cleaned += cleaned - total_size += size - - # Clean download directories - video_dir = self.config.get('download', {}).get('video_output_dir', self.DEFAULT_VIDEO_DIR) - audio_dir = self.config.get('download', {}).get('audio_output_dir', self.DEFAULT_AUDIO_DIR) - - for directory in [video_dir, audio_dir]: - if os.path.exists(directory): - cleaned, size = self._cleanup_directory(directory, temp_extensions) - total_cleaned += cleaned - total_size += size - - size_str = format_size(total_size) - self.notify(f"Cleaned up {total_cleaned} temporary files ({size_str})", severity="information") - - except Exception as e: - logging.error(f"Error cleaning up temp files: {e}") - self.notify(f"Error cleaning up temp files: {str(e)}", severity="error") - - def _cleanup_directory(self, directory: str, temp_extensions: List[str]) -> Tuple[int, int]: - """Helper method to clean up temporary files in a directory""" - cleaned_count = 0 - total_size = 0 - - for root, dirs, files in os.walk(directory): - for file in files: - if any(file.endswith(ext) for ext in temp_extensions): - file_path = os.path.join(root, file) - try: - file_size = os.path.getsize(file_path) - os.remove(file_path) - cleaned_count += 1 - total_size += file_size - except OSError: - continue - - return cleaned_count, total_size# Download Methods (existing but might need updating) - def analyze_url(self) -> None: - """Analyze URL with yt-dlp to extract media information""" - url_input = self.query_one("#url-input", Input) - url = url_input.value.strip() - - if not url: - self.notify("Please enter a URL", severity="error") - return - - self.current_url = url - self.notify(f"Analyzing URL: {url}", severity="information") - - # Basic URL validation - if not (url.startswith('http://') or url.startswith('https://')): - self.notify("Please enter a valid HTTP/HTTPS URL", severity="error") - return - - try: - # Import yt-dlp for media extraction - import yt_dlp - - # Configure yt-dlp options for info extraction only - ydl_opts = { - 'quiet': True, - 'no_warnings': True, - 'extract_flat': False, - 'listformats': True, - } - - with yt_dlp.YoutubeDL(ydl_opts) as ydl: - # Extract info without downloading - info = ydl.extract_info(url, download=False) - - # Update the format table with available formats - self._populate_format_table(info) - - # Update info panel with metadata - self._update_info_panel(info) - - self.notify(f"Successfully analyzed: {info.get('title', 'Unknown Title')}", severity="information") - - except ImportError: - self.notify("yt-dlp not found. Please install it: pip install yt-dlp", severity="error") - except Exception as e: - logging.error(f"Error analyzing URL: {e}") - self.notify(f"Error analyzing URL: {str(e)}", severity="error") - - def start_download(self) -> None: - """Start downloading the selected format""" - if not self.current_url: - self.notify("Please analyze a URL first", severity="error") - return - - self.notify(f"Starting download: {self.current_url}", severity="information") - # Get download options from form - audio_only = self.query_one("#audio-only", Checkbox).value - process_audio = self.query_one("#process-audio", Checkbox).value - upmix_audio = self.query_one("#upmix-audio", Checkbox).value - - try: - if self.download_manager: - # Prepare download options - download_options = { - 'url': self.current_url, - 'audio_only': audio_only, - 'process_audio': process_audio, - 'upmix_audio': upmix_audio, - 'output_path': self.config.get('download', {}).get('video_output_dir', self.DEFAULT_VIDEO_DIR) if not audio_only else self.config.get('download', {}).get('audio_output_dir', self.DEFAULT_AUDIO_DIR) - } - - # Start download using download manager - # This would be a background task in a real implementation - self.notify("Download started! Check the active downloads section for progress.", severity="information") - - # Add to downloads list for tracking - download_info = { - 'url': self.current_url, - 'status': 'In Progress', - 'progress': 0, - 'options': download_options - } - self.downloads.append(download_info) - else: - raise DownloadManagerError("Download manager not initialized") - except Exception as e: - logging.error(f"Error starting download: {e}") - self.notify(f"Error starting download: {str(e)}", severity="error") - - def run_speed_test(self) -> None: - """Run enhanced network speed test with detailed analysis and recommendations""" - self.notify("🚀 Starting enhanced network speed test...", severity="information") - - # Use @work decorator for proper async handling in Textual - self._start_speed_test_task() - - @work - async def _start_speed_test_task(self) -> None: - """Background task for enhanced speed test with rich display""" - try: - # Import enhanced speedtest function - from .network import run_speedtest - from rich.console import Console - - # Create console for enhanced display - console = Console() - - # Show starting message - self.notify("🚀 Running enhanced network speed test...", severity="information") - - # Run enhanced speed test with detailed analysis - result = await run_speedtest(detailed=True, use_cache=False, console=console) - - if result: - # Update network status display with enhanced information - try: - network_status = self.query_one("#network-status", Static) - - # Create enhanced status display - quality_stars = "⭐" * result.get_quality_rating() - download_status = "🚀 Excellent" if result.download_mbps >= 100 else "✅ Very Good" if result.download_mbps >= 25 else "👍 Good" if result.download_mbps >= 5 else "⚠️ Fair" if result.download_mbps >= 1 else "❌ Poor" - - status_text = f"""🌐 Enhanced Speed Test Results {quality_stars} - -📥 Download: {result.download_mbps:.1f} Mbps ({download_status}) -📤 Upload: {result.upload_mbps:.1f} Mbps -🏓 Ping: {result.ping_ms:.1f} ms -📊 Jitter: {result.jitter_ms:.1f} ms (if available) -📦 Packet Loss: {result.packet_loss:.2f}% (if available) + logger.warning("Textual interface failed: %s", e) + logger.info("Falling back to simple console interface...") + _run_console_fallback(config) -🎯 Recommended Activities: -{'✅ 4K streaming' if result.download_mbps >= 25 else '✅ HD streaming' if result.download_mbps >= 15 else '✅ SD streaming' if result.download_mbps >= 5 else '⚠️ Limited streaming'} -{'✅ Online gaming' if result.ping_ms < 50 and result.download_mbps >= 3 else '👍 Casual gaming' if result.ping_ms < 100 else '❌ Gaming may lag'} -{'✅ Video calls' if result.download_mbps >= 1.5 and result.ping_ms < 150 else '⚠️ Limited video calling'}""" - - network_status.update(status_text) - except Exception: - # If network status widget not found, just log enhanced results - logging.info(f"Enhanced speed test completed: {result.download_mbps:.1f} Mbps down ({quality_stars}), {result.upload_mbps:.1f} Mbps up, {result.ping_ms:.1f} ms ping") - - self.notify(f"✅ Enhanced speed test completed! {result.download_mbps:.1f} Mbps ({quality_stars})", severity="information") - else: - self.notify("❌ Enhanced speed test failed - no results returned", severity="error") - - except ImportError: - self.notify("Network module not available for enhanced speed testing", severity="error") - except Exception as e: - logging.error(f"Enhanced speed test error: {e}") - self.notify(f"❌ Enhanced speed test failed: {str(e)}", severity="error") - - # Initialization Methods def initialize_download_manager(self) -> None: - """Initialize the download manager""" - try: - self.download_manager = DownloadManager(self.config) - logging.info("Download manager initialized successfully") - except Exception as e: - logging.error(f"Error initializing download manager: {e}") - self.notify(f"Error initializing download manager: {e}", severity="error") - def setup_format_table(self) -> None: - """Setup the format selection table""" - try: - format_table = self.query_one(UI_ELEMENTS["FORMAT_TABLE"], DataTable) - format_table.add_columns( - "ID", "Extension", "Resolution", "Codec", "Size", "Audio", "FPS" - ) - # Add placeholder row - format_table.add_row("--", "--", "--", "--", "--", "--", "--") - except Exception as e: - logging.error(f"Error setting up format table: {e}") +def _run_console_fallback(config: Dict[str, Any]) -> None: + """Minimal Rich-based console fallback when Textual is unavailable.""" + from rich.console import Console + from rich.panel import Panel - # Helper Methods - def notify(self, message: str, severity: str = "information") -> None: - """Show notification to user""" - try: - # In a full Textual implementation, this would show a toast notification - # For now, we'll log the message - if severity == "error": - logging.error(message) - print(f"ERROR: {message}") - elif severity == "warning": - logging.warning(message) - print(f"WARNING: {message}") - else: - logging.info(message) - print(f"INFO: {message}") - except Exception as e: - logging.error(f"Error showing notification: {e}") - - def _initialize_format_table(self, format_table: DataTable) -> None: - """Initialize format table with columns if not already present.""" - format_table.clear() - if not format_table.columns: - format_table.add_columns( - "ID", "Extension", "Resolution", "Codec", "Size", "Audio", "FPS" - ) - - def _get_format_resolution(self, fmt: dict) -> str: - """Extract resolution information from format.""" - if fmt.get('width') and fmt.get('height'): - return f"{fmt['width']}x{fmt['height']}" - return 'N/A' - - def _get_format_codec(self, fmt: dict) -> str: - """Extract codec information from format.""" - codec = fmt.get('vcodec', 'N/A') - if codec == 'none': - codec = fmt.get('acodec', 'N/A') - return codec - - def _get_format_size(self, fmt: dict) -> str: - """Extract and format file size information.""" - filesize = fmt.get('filesize') or fmt.get('filesize_approx') - return format_size(filesize) if filesize else 'N/A' - - def _get_format_audio(self, fmt: dict) -> str: - """Extract audio bitrate information.""" - abr = fmt.get('abr') - return f"{abr} kbps" if abr else 'N/A' - - def _add_format_row(self, format_table: DataTable, fmt: dict) -> None: - """Add a single format row to the table.""" - format_id = str(fmt.get('format_id', 'N/A')) - ext = fmt.get('ext', 'N/A') - resolution = self._get_format_resolution(fmt) - codec = self._get_format_codec(fmt) - size = self._get_format_size(fmt) - audio = self._get_format_audio(fmt) - fps = str(fmt.get('fps', 'N/A')) if fmt.get('fps') else 'N/A' - - format_table.add_row(format_id, ext, resolution, codec, size, audio, fps) - - def _populate_format_table(self, info: Dict[str, Any]) -> None: - """Populate the format table with available formats from yt-dlp info""" - try: - format_table = self.query_one(UI_ELEMENTS["FORMAT_TABLE"], DataTable) - - # Initialize table - self._initialize_format_table(format_table) - - # Add format data from yt-dlp info - formats = info.get('formats', []) - for fmt in formats: - self._add_format_row(format_table, fmt) - - self.format_info = info - - except Exception as e: - logging.error(f"Error populating format table: {e}") - - def _update_info_panel(self, info: Dict[str, Any]) -> None: - """Update the info panel with media metadata""" - try: - # This would update an info panel with metadata like title, duration, uploader, etc. - # For now, we'll just log the info - title = info.get('title', 'Unknown Title') - duration = info.get('duration', 0) - uploader = info.get('uploader', 'Unknown') - - logging.info(f"Media info - Title: {title}, Duration: {duration}s, Uploader: {uploader}") - - except Exception as e: - logging.error(f"Error updating info panel: {e}") - - # Static Content Properties - @property - def HELP_DOCUMENTATION_TITLE(self) -> str: - """Get help documentation title""" - return "📖 Snatch Media Downloader - Help & Documentation" - - async def on_mount(self) -> None: - """Initialize the application when mounted.""" - try: - # Initialize download manager if not already done - if not hasattr(self, 'download_manager') or not self.download_manager: - await self._initialize_download_manager() - - # Start performance monitoring - if hasattr(self, 'download_manager') and self.download_manager and self.download_manager.performance_monitor: - await self.download_manager.performance_monitor.start_monitoring() - # Set up periodic updates with optimized intervals to reduce UI lag - self.set_interval(3.0, self._update_system_stats) # Reduced from 1.0s to 3.0s - self.set_interval(2.0, self._update_download_progress) # Reduced from 0.5s to 2.0s - - except Exception as e: - logging.error(f"Error during app initialization: {e}") - - async def _initialize_download_manager(self) -> None: - """Initialize the download manager with proper configuration.""" - try: - from .manager import AsyncDownloadManager - from .session import AsyncSessionManager - from .cache import DownloadCache - - # Initialize dependencies - session_manager = AsyncSessionManager(self.config.get("session_file", "sessions/session.json")) - download_cache = DownloadCache() - - # Create download manager - self.download_manager = AsyncDownloadManager( - config=self.config, - session_manager=session_manager, - download_cache=download_cache - ) - - logging.info("Download manager initialized successfully") - - except Exception as e: - logging.error(f"Failed to initialize download manager: {e}") - self.download_manager = None - - async def _update_system_stats(self) -> None: - """Update system statistics periodically.""" - try: - if hasattr(self, 'download_manager') and self.download_manager and self.download_manager.performance_monitor: - # Update performance metrics - metrics = self.download_manager.performance_monitor.get_current_metrics() - - # Update any visible performance displays - if hasattr(self, 'performance_widget'): - self.performance_widget.update_metrics(metrics) - - except Exception as e: - logging.debug(f"Error updating system stats: {e}") - - async def _update_download_progress(self) -> None: - """Update download progress displays.""" - try: - if hasattr(self, 'download_manager') and self.download_manager: - # Get current download status - status = self.download_manager.get_system_status() - - # Update progress displays - if hasattr(self, 'progress_widget'): - self.progress_widget.update_status(status) - - except Exception as e: - logging.debug(f"Error updating download progress: {e}") - - -def launch_textual_interface(config: Dict[str, Any]) -> None: - """Launch the modern Textual-based interactive interface. - - This function provides the entry point for the enhanced interactive mode - with real-time monitoring, rich UI components, and advanced features. - - Args: - config: Application configuration dictionary - """ - try: - # Validate configuration - if not config: - raise ValueError("Configuration is required") - - # Set up logging for interactive mode - logging.info("Starting Textual interactive interface...") - - # Create and run the interactive app - app = InteractiveApp(config) - # Check if we're in an async context - try: - import asyncio - # Try to get the running loop - asyncio.get_running_loop() - # If we get here, we're in an async context, so we need to handle this differently - import threading - - def run_app(): - # Create a new event loop for this thread - asyncio.set_event_loop(asyncio.new_event_loop()) - app.run() - - # Run the app in a separate thread - thread = threading.Thread(target=run_app, daemon=True) - thread.start() - thread.join() - - except RuntimeError: - # No running loop, safe to run normally - app.run() - - except Exception as e: - console = Console() - console.print(f"[red]Error launching interactive interface: {e}[/]") - logging.error(f"Failed to launch interactive interface: {e}") - raise - - -async def launch_interactive_mode(config: Dict[str, Any]) -> None: - """Launch the classic interactive mode with Rich console interface. - - This provides a fallback option for systems where Textual may not work properly. - - Args: - config: Application configuration dictionary - """ console = Console() - - try: - console.print(Panel( - "[bold cyan]🎬 Snatch Media Downloader - Interactive Mode[/]\n" - "[yellow]Classic Rich Console Interface[/]", - title="Welcome", - border_style="bright_blue" - )) - - # Initialize download manager - from .manager import AsyncDownloadManager - from .session import AsyncSessionManager - from .cache import DownloadCache - - session_manager = AsyncSessionManager(config.get("session_file", "sessions/session.json")) - download_cache = DownloadCache() - - download_manager = AsyncDownloadManager( - config=config, - session_manager=session_manager, - download_cache=download_cache - ) - - # Main interactive loop - while True: - console.print("\n" + "="*60) - console.print("[bold cyan]📋 Main Menu[/]") - console.print("="*60) - - options = { - "1": "🔗 Download Media", - "2": "📊 View System Status", - "3": "⚡ Performance Monitor", - "4": "📁 Queue Management", - "5": "⚙️ Settings", - "6": "❓ Help", - "q": "🚪 Quit" - } - - for key, desc in options.items(): - console.print(f" [{key}] {desc}") - - choice = Prompt.ask( - "\n[bold yellow]Select an option[/]", - choices=list(options.keys()), - default="1" - ) - - if choice == "q": - console.print("[green]👋 Thanks for using Snatch! Goodbye![/]") - break - elif choice == "1": - await _handle_download_interactive(console, download_manager) - elif choice == "2": - await _show_system_status(console, download_manager) - elif choice == "3": - await _show_performance_monitor(console, download_manager) - elif choice == "4": - await _show_queue_management(console, download_manager) - elif choice == "5": - _show_settings(console, config) - elif choice == "6": - _show_help(console) - - except KeyboardInterrupt: - console.print("\n[yellow]👋 Interactive mode interrupted. Goodbye![/]") - except Exception as e: - console.print(f"[red]Error in interactive mode: {e}[/]") - logging.error(f"Interactive mode error: {e}") - - -async def _handle_download_interactive(console: Console, download_manager: AsyncDownloadManager) -> None: - """Handle interactive download process.""" - try: - console.print("\n[bold cyan]🔗 Media Download[/]") - - # Get URL from user - url = Prompt.ask("[yellow]Enter media URL[/]") - if not url: - console.print("[red]No URL provided[/]") - return - - # Get download options - audio_only = Confirm.ask("[yellow]Audio only?[/]", default=False) - - if audio_only: - format_choice = Prompt.ask( - "[yellow]Audio format[/]", - choices=["mp3", "flac", "wav", "aac"], - default="mp3" - ) - quality = Prompt.ask( - "[yellow]Audio quality[/]", - choices=["64", "128", "192", "256", "320"], - default="192" - ) - options = { - "audio_only": True, - "audio_format": format_choice, - "audio_quality": quality - } - else: - resolution = Prompt.ask( - "[yellow]Video resolution (or 'best')[/]", - choices=["best", "1080p", "720p", "480p", "360p"], - default="best" - ) - options = { - "audio_only": False, - "resolution": resolution if resolution != "best" else None - } - - # Start download - console.print(f"[cyan]Starting download: {url}[/]") - - async with download_manager: - try: - # Ensure proper method call with only required arguments - result = await download_manager.download_with_options([url], options) - if result and len(result) > 0: - console.print(f"[green]✅ Successfully downloaded: {result[0]}[/]") - return result[0] - else: - console.print("[red]❌ Download failed - no files returned[/]") - return None - except TypeError as e: - if "positional arguments" in str(e): - console.print(f"[red]❌ Method signature error: {e}[/]") - logging.error(f"Download method signature mismatch: {e}") - else: - console.print(f"[red]❌ Type error during download: {e}[/]") - logging.error(f"Download type error: {e}") - return None - except Exception as e: - console.print(f"[red]❌ Download error: {e}[/]") - logging.error(f"Download exception: {e}") - return None - - except Exception as e: - console.print(f"[red]Error in download process: {e}[/]") - logging.error(f"Interactive download error: {e}") - - -async def _show_system_status(console: Console, download_manager: AsyncDownloadManager) -> None: - """Show system status information.""" - try: - console.print("\n[bold cyan]📊 System Status[/]") - - # Get system status - status = download_manager.get_system_status() - - # Create status table - table = Table(title="System Information", border_style="bright_blue") - table.add_column("Metric", style="cyan", min_width=20) - table.add_column("Value", style="green") - - table.add_row("Active Downloads", str(status.get("active_downloads", 0))) - table.add_row("Failed Attempts", str(status.get("failed_attempts", 0))) - table.add_row("Cache Size", str(status.get("cache_size", 0))) - table.add_row("Session Count", str(status.get("session_count", 0))) - - if "cpu_usage" in status: - table.add_row("CPU Usage", f"{status['cpu_usage']:.1f}%") - table.add_row("Memory Usage", f"{status['memory_usage']:.1f}%") - table.add_row("Disk Usage", f"{status['disk_usage']:.1f}%") - table.add_row("Network Usage", f"{status['network_usage']:.1f} Mbps") - console.print(table) - - # Show recommendations if available - if "performance_recommendations" in status: - recommendations = status["performance_recommendations"] - if recommendations: - console.print("\n[bold yellow]🔧 Performance Recommendations:[/]") - for rec in recommendations: - console.print(f" • {rec}") - - Prompt.ask(CONSTANTS["PRESS_ENTER"], default="") - - except Exception as e: - console.print(f"[red]Error showing system status: {e}[/]") - logging.error(f"System status error: {e}") - - -def _get_status_indicator(value: float, high_threshold: float, medium_threshold: float) -> str: - """Get status indicator based on value and thresholds.""" - if value > high_threshold: - return "🔴 High" - elif value > medium_threshold: - return "🟡 Medium" - else: - return "🟢 Normal" - -def _create_performance_metrics_table(metrics: dict) -> Table: - """Create a table displaying performance metrics.""" - table = Table(title="Performance Metrics", border_style="bright_green") - table.add_column("Metric", style="cyan", min_width=15) - table.add_column("Current", style="green") - table.add_column("Status", style="yellow") - - # CPU metrics - cpu_percent = metrics.get("cpu_percent", 0) - cpu_status = _get_status_indicator(cpu_percent, 80, 50) - table.add_row("CPU Usage", f"{cpu_percent:.1f}%", cpu_status) - - # Memory metrics - memory_percent = metrics.get("memory_percent", 0) - memory_status = _get_status_indicator(memory_percent, 85, 70) - table.add_row("Memory Usage", f"{memory_percent:.1f}%", memory_status) - - # Network metrics - network_mbps = metrics.get("network_mbps", 0) - network_status = "🟢 Active" if network_mbps > 1 else "🟡 Idle" - table.add_row("Network", f"{network_mbps:.1f} Mbps", network_status) - - return table - -async def _handle_performance_optimization(console: Console, download_manager: AsyncDownloadManager) -> None: - """Handle performance optimization if requested by user.""" - if not Confirm.ask("\n[yellow]Run performance optimization?[/]", default=False): - return - - console.print("[cyan]Running optimization...[/]") - result = await download_manager.optimize_performance() - - if result.get("optimizations_applied"): - console.print("[green]✅ Optimizations applied:[/]") - for opt in result["optimizations_applied"]: - console.print(f" • {opt}") - else: - console.print("[green]✅ System is already optimized[/]") - -async def _show_performance_monitor(console: Console, download_manager: AsyncDownloadManager) -> None: - """Show performance monitoring information.""" - try: - console.print("\n[bold cyan]⚡ Performance Monitor[/]") - - if not download_manager.performance_monitor: - console.print("[yellow]Performance monitor not available[/]") - return - - # Get current metrics and display table - metrics = download_manager.performance_monitor.get_current_metrics() - table = _create_performance_metrics_table(metrics) - console.print(table) - - # Handle optimization if requested - await _handle_performance_optimization(console, download_manager) - - Prompt.ask(CONSTANTS["PRESS_ENTER"], default="") - - except Exception as e: - console.print(f"[red]Error in performance monitor: {e}[/]") - logging.error(f"Performance monitor error: {e}") - - -async def _show_queue_management(console: Console, download_manager: AsyncDownloadManager) -> None: - """Show download queue management.""" - try: - console.print("\n[bold cyan]📁 Queue Management[/]") - - if not download_manager.advanced_scheduler: - console.print("[yellow]Advanced scheduler not available[/]") - return - - # Get scheduler status - status = download_manager.advanced_scheduler.get_status() - - # Create status table - table = Table(title="Scheduler Status", border_style="bright_magenta") - table.add_column("Property", style="cyan") - table.add_column("Value", style="green") - - table.add_row("Queue Size", str(status.get("queue_size", 0))) - table.add_row("Active", "Yes" if status.get("active", False) else "No") - table.add_row("Bandwidth Usage", f"{status.get('bandwidth_usage', 0):.1f} Mbps") - - console.print(table) - - # Queue management options - console.print("\n[bold yellow]Queue Options:[/]") - console.print(" [1] Pause scheduler") - console.print(" [2] Resume scheduler") - console.print(" [3] Clear queue") - console.print(" [4] Back to main menu") - - choice = Prompt.ask( - "[yellow]Select option[/]", - choices=["1", "2", "3", "4"], - default="4" - ) - - if choice == "1": - await download_manager.advanced_scheduler.pause() - console.print("[green]✅ Scheduler paused[/]") - elif choice == "2": - await download_manager.advanced_scheduler.resume() - console.print("[green]✅ Scheduler resumed[/]") - elif choice == "3": - if Confirm.ask("[red]Clear all queued downloads?[/]", default=False): - await download_manager.advanced_scheduler.clear_queue() - console.print("[green]✅ Queue cleared[/]") - if choice != "4": - Prompt.ask(CONSTANTS["PRESS_ENTER"], default="") - - except Exception as e: - console.print(f"[red]Error in queue management: {e}[/]") - logging.error(f"Queue management error: {e}") - - -def _show_settings(console: Console, config: Dict[str, Any]) -> None: - """Show settings configuration.""" - try: - console.print("\n[bold cyan]⚙️ Settings[/]") - - # Create settings table - table = Table(title="Current Settings", border_style="bright_yellow") - table.add_column("Setting", style="cyan", min_width=20) - table.add_column("Value", style="green") - - # Show key settings - table.add_row("Max Concurrent Downloads", str(config.get("max_concurrent", 3))) - table.add_row("Video Output Dir", config.get("video_output", "downloads/video")) - table.add_row("Audio Output Dir", config.get("audio_output", "downloads/audio")) - table.add_row("Bandwidth Limit", f"{config.get('bandwidth_limit', 0)} Mbps" if config.get('bandwidth_limit') else "Unlimited") - table.add_row("P2P Enabled", "Yes" if config.get("p2p_enabled", False) else "No") - console.print(table) - - # Settings modification - if Confirm.ask("\n[yellow]Modify settings?[/]", default=False): - console.print("[cyan]Settings modification not implemented in this version[/]") - console.print("[dim]Use the config.json file to modify settings[/]") - - Prompt.ask(CONSTANTS["PRESS_ENTER"], default="") - - except Exception as e: - console.print(f"[red]Error showing settings: {e}[/]") - logging.error(f"Settings error: {e}") - - -def _show_help(console: Console) -> None: - """Show help information.""" - try: - console.print("\n[bold cyan]❓ Help & Documentation[/]") - - help_content = """ -[bold yellow]🎬 Snatch Media Downloader - Help[/] - -[cyan]Basic Usage:[/] -• Enter media URLs to download content -• Choose between audio-only or video downloads -• Select quality and format options -• Monitor download progress in real-time - -[cyan]Features:[/] -• Multi-format support (MP4, MP3, FLAC, etc.) -• Concurrent downloads with intelligent scheduling -• P2P sharing capabilities (if enabled) -• Advanced audio processing options -• Real-time performance monitoring -• Resume interrupted downloads - -[cyan]Supported Sites:[/] -• YouTube, Vimeo, Dailymotion -• SoundCloud, Bandcamp -• Many more via yt-dlp support - -[cyan]Keyboard Shortcuts:[/] -• Ctrl+C: Cancel current operation -• Enter: Confirm selection -• Q: Quit (from main menu) - -[cyan]Configuration:[/] -• Edit config.json for advanced settings -• Set custom output directories -• Configure bandwidth limits -• Enable/disable P2P features - -[green]For more help, visit the documentation or check the GitHub repository.[/] - """ - console.print(Panel(help_content, border_style="bright_cyan")) - Prompt.ask(CONSTANTS["PRESS_ENTER"], default="") - - except Exception as e: - console.print(f"[red]Error showing help: {e}[/]") - logging.error(f"Help display error: {e}") + console.print(Panel( + "[bold cyan]Snatch Media Downloader[/]\n\n" + "The interactive TUI could not start.\n" + "Use CLI commands instead:\n\n" + " [green]snatch download [/] Download media\n" + " [green]snatch download -a [/] Download audio only\n" + " [green]snatch --help[/] Show all commands\n", + title="Snatch", + border_style="cyan", + )) + + +# Backward-compatible aliases +def launch_textual_interface(config: Dict[str, Any]) -> None: + """Alias for launch_enhanced_interactive_mode.""" + launch_enhanced_interactive_mode(config) -# Compatibility alias for backward compatibility def run_interactive_mode(config: Dict[str, Any]) -> None: - """Alias for launch_textual_interface for backward compatibility.""" - launch_textual_interface(config) - + """Alias for launch_enhanced_interactive_mode.""" + launch_enhanced_interactive_mode(config) -# Enhanced Cyberpunk Integration -def launch_enhanced_interactive_mode(config: Dict[str, Any]) -> None: - """Launch the enhanced modern interactive interface with fallbacks. - - This function tries to launch the modern interface first, then falls back - to working and other interfaces if needed. - - Args: - config: Application configuration dictionary - """ - try: - # First try to launch the modern interface - logging.info("Launching modern interactive interface...") - from .theme.modern_interactive import run_modern_interactive - run_modern_interactive(config) - except Exception as e: - logging.warning(f"Modern interface failed: {e}") - logging.info("Falling back to standard textual interface...") - try: - # Fall back to standard textual interface - launch_textual_interface(config) - except Exception as textual_error: - logging.error(f"Standard interface also failed: {textual_error}") - logging.info("Falling back to console interface...") - try: - # Final fallback to console interface - import asyncio - asyncio.run(launch_interactive_mode(config)) - except Exception as final_error: - console = Console() - console.print("[red]All interface modes failed:[/]") - console.print(f"[red]Modern error: {e}[/]") - console.print(f"[red]Working error: {fallback_error}[/]") - console.print(f"[red]Textual error: {textual_error}[/]") - console.print(f"[red]Console error: {final_error}[/]") - raise final_error - -# Update the main initialization function to use cyberpunk by default def initialize_interactive_mode(config: Dict[str, Any]) -> None: - """Initialize interactive mode with proper error handling and fallbacks.""" - try: - # Try to launch the modern Textual interface first - launch_enhanced_interactive_mode(config) - except ImportError as e: - # If Textual is not available, fall back to Rich console interface - logging.warning(f"Textual interface not available: {e}") - logging.info("Falling back to Rich console interface") - launch_interactive_mode(config) - except Exception as e: - # For any other error, try the fallback - logging.error(f"Error with Textual interface: {e}") - logging.info("Attempting fallback to Rich console interface") - try: - launch_interactive_mode(config) - except Exception as final_error: - console = Console() - console.print("[red]All interface modes failed:[/]") - console.print(f"[red]Cyberpunk error: {e}[/]") - console.print(f"[red]Textual error: {final_error}[/]") - raise final_error \ No newline at end of file + """Alias for launch_enhanced_interactive_mode.""" + launch_enhanced_interactive_mode(config) diff --git a/snatch/theme/__init__.py b/snatch/theme/__init__.py index 82dcb00..3e50609 100644 --- a/snatch/theme/__init__.py +++ b/snatch/theme/__init__.py @@ -1,7 +1,5 @@ """ Theme sub-package for Snatch interactive interfaces. - -Contains modern and enhanced interactive interfaces using Textual framework. """ try: @@ -10,13 +8,7 @@ run_modern_interactive = None ModernSnatchApp = None -try: - from .textual_interface import start_textual_interface -except ImportError: - start_textual_interface = None - __all__ = [ 'run_modern_interactive', 'ModernSnatchApp', - 'start_textual_interface', ] diff --git a/snatch/theme/textual_interface.py b/snatch/theme/textual_interface.py deleted file mode 100644 index 02db8b9..0000000 --- a/snatch/theme/textual_interface.py +++ /dev/null @@ -1,1022 +0,0 @@ -#!/usr/bin/env python3 -""" -textual_interface_improved.py - Enhanced Snatch TUI Interface using Textual - -A modern, feature-rich terminal user interface for media downloads using Textual framework. -This provides an enhanced user experience with interactive widgets and responsive design. -Works with the unified download manager for optimal performance. - -Features: -- Responsive grid layout with dynamic resizing -- Interactive widgets with enhanced styling -- Live download progress tracking with detailed statistics -- Format selection matrix with quality indicators -- Media preview panel with rich metadata display -- Download queue management with priority control -- System resource monitoring dashboard -- Network speed testing and diagnostics -- Dark/light theme support -""" - -import os -import sys -import asyncio -import time -from pathlib import Path -from typing import Dict, Any, List, Optional, Tuple, Union, Callable -from datetime import datetime, timedelta - -from rich.style import Style -from rich.text import Text -from rich import box -from rich.console import RenderableType -from rich.table import Table -from rich.panel import Panel -from rich.progress import BarColumn, Progress -from rich.syntax import Syntax - -from textual import work -from textual.app import App, ComposeResult -from textual.binding import Binding -from textual.containers import Container, Grid, Horizontal, Vertical, VerticalScroll, ScrollableContainer -from textual.screen import Screen, ModalScreen -from textual.widgets import ( - Button, Static, Input, Label, Header, Footer, - ProgressBar, ContentSwitcher, RadioSet, RadioButton, - Checkbox, OptionList, TextLog, DataTable, Rule, - Select, ListView, Markdown, Switch -) -from textual.widget import Widget -from textual.reactive import reactive - -from ..constants import VERSION as APP_VERSION -from ..defaults import BANNER_ART, HELP_CONTENT -from ..audio_processor import EnhancedAudioProcessor -from ..common_utils import sanitize_filename, format_size, ensure_dir -from ..network import NetworkManager, SpeedTestResult - - -# Import conditionally to handle non-interactive environments -try: - from yt_dlp import YoutubeDL - from yt_dlp.utils import DownloadError - YTDLP_AVAILABLE = True -except ImportError: - YTDLP_AVAILABLE = False - -# Define color themes -LIGHT_THEME = { - "primary": "dark_blue", - "secondary": "blue", - "accent": "magenta", - "background": "white", - "foreground": "black", - "muted": "gray70", - "success": "green", - "warning": "orange3", - "error": "red", - "info": "blue" -} - -DARK_THEME = { - "primary": "cyan", - "secondary": "bright_blue", - "accent": "bright_magenta", - "background": "black", - "foreground": "white", - "muted": "gray50", - "success": "bright_green", - "warning": "yellow", - "error": "bright_red", - "info": "bright_blue" -} - -class MediaPreviewWidget(Static): - """Media preview widget with metadata display and thumbnail.""" - - def __init__(self, name: str = None): - super().__init__(name=name) - self.metadata = {} - - def update_metadata(self, metadata: Dict[str, Any]) -> None: - """Update metadata and refresh the widget.""" - self.metadata = metadata - self.refresh() - - def clear_metadata(self) -> None: - """Clear metadata and refresh widget.""" - self.metadata = {} - self.refresh() - - def render(self) -> RenderableType: - """Render the media preview with metadata.""" - if not self.metadata: - return Text("No media loaded. Enter a URL to begin.", style="dim") - - # Format title - title = self.metadata.get('title', 'Unknown') - - # Base table structure - grid = Table.grid(padding=(0, 2), expand=True) - grid.add_column("Label", style="bright_blue", justify="right", width=15) - grid.add_column("Value", style="bright_white", ratio=2) - - # Basic metadata - grid.add_row("📺 Title", Text(title[:60] + "..." if len(title) > 60 else title, style="bright_white bold")) - - if self.metadata.get('duration'): - duration = self._format_duration(self.metadata['duration']) - grid.add_row("⏱️ Duration", duration) - - if self.metadata.get('upload_date'): - grid.add_row("📅 Released", self._format_date(self.metadata['upload_date'])) - - if self.metadata.get('uploader'): - grid.add_row("👤 Uploader", Text(self.metadata['uploader'], style="cyan")) - - # Media-specific information - if self.metadata.get('width') and self.metadata.get('height'): - resolution = f"{self.metadata['width']}x{self.metadata['height']}" - grid.add_row("🎥 Resolution", resolution) - - # Format info - if self.metadata.get('ext'): - grid.add_row("📦 Format", self._format_format_info(self.metadata)) - - if self.metadata.get('filesize'): - grid.add_row("💾 Size", format_size(self.metadata['filesize'])) - - if self.metadata.get('view_count'): - grid.add_row("👁️ Views", f"{self.metadata['view_count']:,}") - - # Add links section - if self.metadata.get('webpage_url'): - grid.add_row("🔗 Source", Text(self.metadata['webpage_url'], style="underline cyan")) - - # Add thumbnail URL if available - if self.metadata.get('thumbnail'): - grid.add_row("🖼️ Thumbnail", Text("Available", style="green")) - - # Wrap in a panel - return Panel( - grid, - title="Media Information", - border_style="bright_blue", - box=box.ROUNDED - ) - - def _format_duration(self, seconds: Union[float, int]) -> str: - """Format duration in human-readable time format.""" - if not seconds: - return "Unknown" - - hours, remainder = divmod(int(seconds), 3600) - minutes, seconds = divmod(remainder, 60) - - if hours > 0: - return f"{hours}h {minutes}m {seconds}s" - elif minutes > 0: - return f"{minutes}m {seconds}s" - else: - return f"{seconds}s" - - def _format_date(self, date_str: str) -> str: - """Format date in a human-readable format.""" - try: - # Format YYYYMMDD to YYYY-MM-DD - if len(date_str) == 8 and date_str.isdigit(): - return f"{date_str[:4]}-{date_str[4:6]}-{date_str[6:8]}" - return date_str - except Exception: - return date_str - - def _format_format_info(self, metadata: Dict[str, Any]) -> str: - """Format codec information with color highlighting.""" - parts = [] - - # File extension - if metadata.get('ext'): - parts.append(f"{metadata['ext'].upper()}") - - # Video codec - if metadata.get('vcodec') and metadata['vcodec'] != 'none': - parts.append(f"{metadata['vcodec']}") - - # Audio codec - if metadata.get('acodec') and metadata['acodec'] != 'none': - parts.append(f"{metadata['acodec']}") - - return " • ".join(parts) if parts else "Unknown" - -class FormatSelectionWidget(Static): - """Widget for selecting download formats with enhanced visualization.""" - - def __init__(self, - name: str = None, - on_format_select: Optional[Callable[[str], None]] = None): - super().__init__(name=name) - self.formats = [] - self.selected_format_id = None - self.on_format_select = on_format_select - - def update_formats(self, formats: List[Dict[str, Any]]) -> None: - """Update available formats and refresh widget.""" - if formats: - # Group formats by type (audio, video, video+audio) - video_formats = [] - audio_formats = [] - combined_formats = [] - - for fmt in formats: - if fmt.get('vcodec', 'none') != 'none' and fmt.get('acodec', 'none') != 'none': - combined_formats.append(fmt) - elif fmt.get('vcodec', 'none') != 'none': - video_formats.append(fmt) - elif fmt.get('acodec', 'none') != 'none': - audio_formats.append(fmt) - - # Sort formats by quality - video_formats.sort(key=lambda x: (x.get('height', 0) or 0, x.get('fps', 0) or 0, x.get('tbr', 0) or 0), reverse=True) - audio_formats.sort(key=lambda x: (x.get('abr', 0) or 0), reverse=True) - combined_formats.sort(key=lambda x: (x.get('height', 0) or 0, x.get('tbr', 0) or 0), reverse=True) - - # Create final format list - self.formats = [] - - # Add a "Best quality" automatic option - self.formats.append({ - "format_id": "best", - "format_note": "Best Quality (Automatic)", - "ext": "auto", - "resolution": "auto", - "is_auto": True - }) - - # Add combined formats - self.formats.extend(combined_formats) - - # Add video-only formats - self.formats.extend(video_formats) - - # Add audio-only formats - self.formats.extend(audio_formats) - - # Select best format by default - if self.formats: - self.selected_format_id = self.formats[0]['format_id'] - else: - self.formats = [] - self.selected_format_id = None - - self.refresh() - - def on_click(self, event) -> None: - """Handle click events on the format table.""" - # Extract format ID from the clicked row - try: - # Try to get the format ID from the clicked position - meta = event.style.meta - if meta and "format_id" in meta: - format_id = meta["format_id"] - self.selected_format_id = format_id - self.refresh() - - if self.on_format_select: - self.on_format_select(format_id) - except Exception: - pass - - def render(self) -> RenderableType: - """Render the format selection table.""" - if not self.formats: - return Text("No formats available", style="dim") - - table = Table( - box=box.SIMPLE, - expand=True, - show_header=True, - header_style="bold bright_blue" - ) - - # Define columns - table.add_column("Format", style="cyan") - table.add_column("Resolution", justify="center") - table.add_column("Extension", justify="center") - table.add_column("FPS", justify="right") - table.add_column("Size", justify="right") - table.add_column("Bitrate", justify="right") - table.add_column("Codec", style="green") - - # Add rows for each format - for fmt in self.formats: - # Format ID and note - format_text = f"{fmt.get('format_id', 'unknown')}" - if fmt.get('format_note'): - format_text += f" ({fmt['format_note']})" - - # Resolution - if fmt.get('is_auto', False): - resolution = "Auto" - elif fmt.get('height', 0) > 0 and fmt.get('width', 0) > 0: - resolution = f"{fmt['width']}x{fmt['height']}" - else: - resolution = "audio only" if fmt.get('acodec', 'none') != 'none' else "N/A" - - # File extension - extension = fmt.get('ext', 'N/A') - - # FPS - fps = f"{fmt['fps']}" if fmt.get('fps') else "N/A" - - # Filesize - if fmt.get('filesize'): - size = format_size(fmt['filesize']) - elif fmt.get('filesize_approx'): - size = f"~{format_size(fmt['filesize_approx'])}" - else: - size = "N/A" - - # Bitrate - if fmt.get('tbr'): - bitrate = f"{fmt['tbr']:.1f} kbps" - elif fmt.get('abr'): - bitrate = f"{fmt['abr']:.1f} kbps" - else: - bitrate = "N/A" - - # Codec info - codecs = [] - if fmt.get('vcodec') and fmt.get('vcodec') != 'none': - codecs.append(f"V:{fmt['vcodec']}") - if fmt.get('acodec') and fmt.get('acodec') != 'none': - codecs.append(f"A:{fmt['acodec']}") - codec_text = ", ".join(codecs) if codecs else "N/A" - - # Style for selected row - row_style = "bold reverse" if fmt.get('format_id') == self.selected_format_id else "" - - # Add meta information for click handler - meta = {"format_id": fmt.get('format_id', 'unknown')} - - table.add_row( - format_text, - resolution, - extension, - fps, - size, - bitrate, - codec_text, - style=row_style, - end_section=(fmt == self.formats[-1] or - (fmt.get('acodec', 'none') == 'none' and self.formats[self.formats.index(fmt)+1].get('acodec', 'none') != 'none') or - (fmt.get('vcodec', 'none') == 'none' and self.formats[self.formats.index(fmt)+1].get('vcodec', 'none') != 'none')), - meta=meta - ) - - return Panel( - table, - title="Available Formats [click to select]", - border_style="bright_blue", - box=box.ROUNDED - ) - -class DownloadProgressWidget(Static): - """Enhanced widget for displaying download progress.""" - - def __init__(self, name: str = None): - super().__init__(name=name) - self.progress_data = { - "status": "idle", - "filename": None, - "title": None, - "percent": 0, - "speed": None, - "eta": None, - "size": None, - "message": "Ready to download", - "start_time": None, - "end_time": None - } - - def update_progress(self, data: Dict[str, Any]) -> None: - """Update progress information.""" - # Update only provided fields - self.progress_data.update(data) - - # Set start time if downloading just began - if data.get('status') == 'downloading' and not self.progress_data.get('start_time'): - self.progress_data['start_time'] = time.time() - - # Set end time if download just completed - if data.get('status') in ('completed', 'error') and not self.progress_data.get('end_time'): - self.progress_data['end_time'] = time.time() - - self.refresh() - - def reset(self) -> None: - """Reset progress to initial state.""" - self.progress_data = { - "status": "idle", - "filename": None, - "title": None, - "percent": 0, - "speed": None, - "eta": None, - "size": None, - "message": "Ready to download", - "start_time": None, - "end_time": None - } - self.refresh() - - def render(self) -> RenderableType: - """Render the progress display.""" - status = self.progress_data['status'] - - # Create base grid - grid = Table.grid(padding=(0, 2), expand=True) - grid.add_column("Label", width=12, style="bright_blue") - grid.add_column("Value", ratio=2) - - # Add title if available - if self.progress_data.get('title'): - title_text = Text(self.progress_data['title'], style="bold") - grid.add_row("Title", title_text) - - # Add filename if available - if self.progress_data.get('filename'): - filename = os.path.basename(self.progress_data['filename']) - grid.add_row("Filename", filename) - - # Create progress bar - if status in ('downloading', 'processing'): - percent = min(100, max(0, self.progress_data.get('percent', 0))) - - progress_bar = Progress( - "[progress.description]{task.description}", - BarColumn(bar_width=40), - "[progress.percentage]{task.percentage:>3.0f}%", - expand=True - ) - - # Add task - task_id = progress_bar.add_task("", total=100, completed=percent) - - # Add progress row - grid.add_row("Progress", progress_bar) - - # Add speed and ETA if available - if self.progress_data.get('speed'): - grid.add_row("Speed", f"{self.progress_data['speed']}") - - if self.progress_data.get('eta'): - grid.add_row("ETA", f"{self.progress_data['eta']}") - - if self.progress_data.get('size'): - grid.add_row("Size", f"{self.progress_data['size']}") - - # Add elapsed time if download is in progress or completed - if status in ('downloading', 'processing', 'completed', 'error'): - if self.progress_data.get('start_time'): - end_time = self.progress_data.get('end_time') or time.time() - elapsed = end_time - self.progress_data['start_time'] - elapsed_str = self._format_time(elapsed) - grid.add_row("Elapsed", elapsed_str) - - # Add status message - status_style = { - 'idle': "dim", - 'downloading': "bright_blue", - 'processing': "bright_magenta", - 'completed': "bright_green", - 'error': "bright_red" - }.get(status, "white") - - message = self.progress_data.get('message') or f"Status: {status}" - grid.add_row("Status", Text(message, style=status_style)) - - # Wrap in a panel - panel_title = { - 'idle': "Download Ready", - 'downloading': "⬇️ Downloading", - 'processing': "🔄 Processing", - 'completed': "✅ Download Complete", - 'error': "❌ Download Error" - }.get(status, "Download Progress") - - panel_style = { - 'idle': "blue", - 'downloading': "cyan", - 'processing': "magenta", - 'completed': "green", - 'error': "red" - }.get(status, "blue") - - return Panel( - grid, - title=panel_title, - border_style=panel_style, - box=box.ROUNDED - ) - - def _format_time(self, seconds: float) -> str: - """Format time in seconds to human-readable format.""" - hours, remainder = divmod(int(seconds), 3600) - minutes, seconds = divmod(remainder, 60) - - if hours > 0: - return f"{hours}h {minutes}m {seconds}s" - elif minutes > 0: - return f"{minutes}m {seconds}s" - else: - return f"{seconds}s" - -class NetworkInfoWidget(Static): - """Widget for displaying network information and speed test results.""" - - def __init__(self, network_manager: NetworkManager, name: str = None): - super().__init__(name=name) - self.network_manager = network_manager - self.connection_status = False - self.speed_test_result = None - - @work - async def update_connection_status(self) -> None: - """Update connection status asynchronously.""" - self.connection_status = await self.network_manager.check_connection() - self.refresh() - - @work - async def run_speed_test(self) -> None: - """Run a network speed test asynchronously.""" - self.speed_test_result = await self.network_manager.run_speed_test() - self.refresh() - - def render(self) -> RenderableType: - """Render network information.""" - grid = Table.grid(padding=(0, 2), expand=True) - grid.add_column("Label", style="bright_blue", justify="right", width=15) - grid.add_column("Value", style="white", ratio=2) - - # Connection status - status_text = "Connected" if self.connection_status else "Disconnected" - status_style = "green bold" if self.connection_status else "red bold" - grid.add_row("Status", Text(status_text, style=status_style)) - - # Speed test results - if self.speed_test_result: - grid.add_row("Download", Text(f"{self.speed_test_result.download_mbps:.2f} Mbps", style="green")) - grid.add_row("Upload", Text(f"{self.speed_test_result.upload_mbps:.2f} Mbps", style="cyan")) - grid.add_row("Ping", Text(f"{self.speed_test_result.ping_ms:.0f} ms", style="yellow")) - test_time = datetime.fromtimestamp(self.speed_test_result.timestamp).strftime("%H:%M:%S") - grid.add_row("Last Test", test_time) - else: - grid.add_row("Speed Test", Text("Not run yet", style="dim")) - - return Panel( - grid, - title="Network Information", - border_style="blue", - box=box.ROUNDED - ) - -class ConfigScreen(ModalScreen): - """Configuration screen for download settings.""" - - def __init__(self, config: Dict[str, Any]): - super().__init__() - self.config = config - - def compose(self) -> ComposeResult: - """Create child widgets.""" - with Container(id="config-container"): - yield Label("Download Configuration", id="config-title", classes="title") - yield Rule() - - with Grid(id="config-grid"): - # Media settings - yield Label("Media Type:") - with Horizontal(): - yield RadioButton("Video + Audio", value=not self.config.get("audio_only", False), id="media-type-video") - yield RadioButton("Audio Only", value=self.config.get("audio_only", False), id="media-type-audio") - - # Video quality settings - yield Label("Video Quality:") - with Horizontal(): - yield Select( - [(res, res) for res in ["Best", "1080p", "720p", "480p", "360p"]], - value=self.config.get("video_quality", "Best"), - id="video-quality" - ) - - # Audio quality settings - yield Label("Audio Format:") - with Horizontal(): - yield Select( - [(fmt, fmt) for fmt in ["opus", "mp3", "aac", "flac", "wav"]], - value=self.config.get("audio_format", "opus"), - id="audio-format" - ) - - # Audio processing options - yield Label("Audio Processing:") - with Grid(id="audio-options-grid"): - yield Checkbox("Normalize Audio", value=self.config.get("normalize_audio", False), id="normalize-audio") - yield Checkbox("Denoise Audio", value=self.config.get("denoise_audio", False), id="denoise-audio") - yield Checkbox("Upmix to 7.1 Surround", value=self.config.get("upmix_surround", False), id="upmix-surround") - - yield Rule() - - with Horizontal(id="config-buttons"): - yield Button("Save", variant="primary", id="save-config") - yield Button("Cancel", id="cancel-config") - - def on_button_pressed(self, event: Button.Pressed) -> None: - """Handle button press events.""" - if event.button.id == "save-config": - # Save configuration - self.config["audio_only"] = self.query_one("#media-type-audio").value - self.config["video_quality"] = self.query_one("#video-quality").value - self.config["audio_format"] = self.query_one("#audio-format").value - self.config["normalize_audio"] = self.query_one("#normalize-audio").value - self.config["denoise_audio"] = self.query_one("#denoise-audio").value - self.config["upmix_surround"] = self.query_one("#upmix-surround").value - - # Dismiss the modal - self.dismiss(self.config) - elif event.button.id == "cancel-config": - # Dismiss without saving - self.dismiss() - -class SnatchTextualApp(App): - """Main Textual app for Snatch.""" - - CSS_PATH = "textual.css" # Optional custom CSS - BINDINGS = [ - Binding("d", "download", "Download"), - Binding("q", "quit", "Quit"), - Binding("c", "show_config", "Config"), - Binding("h", "show_help", "Help"), - Binding("t", "test_network", "Test Network"), - ] - - def __init__(self, config: Dict[str, Any]): - """Initialize the app with configuration.""" - super().__init__() - self.config = config - self.metadata = {} - self.selected_format = None - self.download_manager = None # Will be initialized later - self.network_manager = NetworkManager(config) - self.audio_processor = EnhancedAudioProcessor(config) - self.download_path = config.get("download_directory", os.path.expanduser("~/Downloads")) - - # Ensure download directory exists - ensure_dir(self.download_path) - - def on_mount(self) -> None: - """Handle app mounting.""" - # Initialize async components - self.network_info = self.query_one("#network-info", NetworkInfoWidget) - self.network_info.update_connection_status() - - def compose(self) -> ComposeResult: - """Create child widgets.""" - # App header - yield Header(show_clock=True) - - # Main layout grid - with Grid(id="main-grid"): - # URL input and controls - with Container(id="url-container"): - yield Label("Enter URL:", id="url-label") - yield Input(placeholder="Enter media URL...", id="url-input") - - with Horizontal(id="url-buttons"): - yield Button("Fetch Info", variant="primary", id="fetch-button") - yield Button("Download", variant="success", id="download-button") - yield Button("Configure", id="config-button") - - # Media preview - yield MediaPreviewWidget(name="media-preview", id="media-preview") - - # Format selection - with Container(id="format-container"): - yield FormatSelectionWidget(name="format-selection", id="format-selection") - - # Download progress - yield DownloadProgressWidget(name="progress-widget", id="progress-widget") - - # Network info - yield NetworkInfoWidget(self.network_manager, name="network-info", id="network-info") - - # App footer - yield Footer() - - def on_button_pressed(self, event: Button.Pressed) -> None: - """Handle button press events.""" - button_id = event.button.id - - if button_id == "fetch-button": - url = self.query_one("#url-input").value - if url: - self.fetch_media_info(url) - - elif button_id == "download-button": - url = self.query_one("#url-input").value - if url and self.metadata: - self.start_download(url) - - elif button_id == "config-button": - self.action_show_config() - - def action_download(self) -> None: - """Handle download action.""" - url = self.query_one("#url-input").value - if url and self.metadata: - self.start_download(url) - elif url: - self.fetch_media_info(url) - - def action_show_config(self) -> None: - """Show configuration screen.""" - def handle_config_update(config): - if config: - self.config.update(config) - - config_screen = ConfigScreen(self.config.copy()) - self.push_screen(config_screen, handle_config_update) - - def action_test_network(self) -> None: - """Run network speed test.""" - self.notify("Running network speed test...") - self.network_info.run_speed_test() - - def action_show_help(self) -> None: - """Show help information.""" - self.push_screen( - ModalScreen( - Container( - Markdown(HELP_CONTENT), - Button("Close", variant="primary", id="close-help"), - id="help-container" - ) - ) - ) - - def action_quit(self) -> None: - """Quit the application.""" - self.exit() - - @work - async def fetch_media_info(self, url: str) -> None: - """Fetch media information asynchronously.""" - progress_widget = self.query_one("#progress-widget", DownloadProgressWidget) - media_preview = self.query_one("#media-preview", MediaPreviewWidget) - format_selection = self.query_one("#format-selection", FormatSelectionWidget) - - progress_widget.update_progress({ - "status": "processing", - "message": "Fetching media information..." - }) - - try: - # Check if URL is valid - if not await self.network_manager.check_url_availability(url): - self.notify("Invalid or unavailable URL", severity="error") - progress_widget.update_progress({ - "status": "error", - "message": "Invalid or unavailable URL" - }) - return - - # Extract info using yt-dlp - if YTDLP_AVAILABLE: - ydl_opts = { - 'quiet': True, - 'no_warnings': True, - 'skip_download': True, - } - - with YoutubeDL(ydl_opts) as ydl: - info = await asyncio.to_thread(ydl.extract_info, url, download=False) - - if info: - self.metadata = info - media_preview.update_metadata(info) - - # Update format selection - if 'formats' in info: - format_selection.update_formats(info['formats']) - - progress_widget.update_progress({ - "status": "idle", - "title": info.get('title', 'Unknown'), - "message": "Ready to download" - }) - - self.notify("Media information fetched successfully", severity="success") - else: - self.notify("Failed to fetch media information", severity="error") - progress_widget.update_progress({ - "status": "error", - "message": "Failed to fetch media information" - }) - else: - self.notify("yt-dlp is not available. Please install it.", severity="error") - progress_widget.update_progress({ - "status": "error", - "message": "yt-dlp is not available" - }) - - except Exception as e: - self.notify(f"Error: {str(e)}", severity="error") - progress_widget.update_progress({ - "status": "error", - "message": f"Error: {str(e)}" - }) - - @work - async def start_download(self, url: str) -> None: - """Start the download process asynchronously.""" - progress_widget = self.query_one("#progress-widget", DownloadProgressWidget) - format_selection = self.query_one("#format-selection", FormatSelectionWidget) - - progress_widget.update_progress({ - "status": "downloading", - "title": self.metadata.get('title', 'Unknown'), - "message": "Starting download...", - "percent": 0 - }) - - try: - # Create download configuration - title = self.metadata.get('title', 'Unknown') - sanitized_title = sanitize_filename(title) - output_path = os.path.join(self.download_path, sanitized_title) - - # Select format based on configuration - selected_format = format_selection.selected_format_id - - # Build yt-dlp options - ydl_opts = { - 'outtmpl': f'{output_path}.%(ext)s', - 'progress_hooks': [self._progress_hook], - 'quiet': True, - } - - # Handle audio-only downloads - if self.config.get("audio_only", False): - ydl_opts.update({ - 'format': 'bestaudio/best', - 'postprocessors': [{ - 'key': 'FFmpegExtractAudio', - 'preferredcodec': self.config.get("audio_format", "mp3"), - 'preferredquality': '192', - }], - }) - else: - # For video formats - if selected_format and selected_format != "best": - ydl_opts.update({'format': selected_format}) - else: - # Quality-based format selector - quality_map = { - 'Best': 'bestvideo+bestaudio/best', - '1080p': 'bestvideo[height<=1080]+bestaudio/best[height<=1080]', - '720p': 'bestvideo[height<=720]+bestaudio/best[height<=720]', - '480p': 'bestvideo[height<=480]+bestaudio/best[height<=480]', - '360p': 'bestvideo[height<=360]+bestaudio/best[height<=360]', - } - - ydl_opts.update({ - 'format': quality_map.get(self.config.get("video_quality", "Best"), 'bestvideo+bestaudio/best'), - }) - - # Start download - with YoutubeDL(ydl_opts) as ydl: - info = await asyncio.to_thread(ydl.extract_info, url, download=True) - filename = ydl.prepare_filename(info) - - # Update progress to completed - progress_widget.update_progress({ - "status": "completed", - "filename": filename, - "title": info.get('title', 'Unknown'), - "percent": 100, - "message": "Download completed!" - }) - - self.notify("Download completed successfully!", severity="success") - - # Apply audio processing if needed - if self.config.get("audio_only", False) and ( - self.config.get("upmix_surround", False) or - self.config.get("denoise_audio", False) or - self.config.get("normalize_audio", False) - ): - await self._apply_audio_processing(filename) - - except Exception as e: - progress_widget.update_progress({ - "status": "error", - "message": f"Error: {str(e)}" - }) - self.notify(f"Download error: {str(e)}", severity="error") - - def _progress_hook(self, d: Dict[str, Any]) -> None: - """Progress hook for yt-dlp.""" - progress_widget = self.query_one("#progress-widget", DownloadProgressWidget) - - if d['status'] == 'downloading': - # Extract progress info - try: - total = float(d.get('total_bytes', 0)) or float(d.get('total_bytes_estimate', 0)) - downloaded = float(d.get('downloaded_bytes', 0)) - - if total > 0: - percent = (downloaded / total) * 100 - speed = d.get('speed', 0) - eta = d.get('eta', 0) - - progress_widget.update_progress({ - "status": "downloading", - "filename": d.get('filename'), - "title": self.metadata.get('title', 'Unknown'), - "percent": percent, - "speed": f"{speed / 1024:.1f} KB/s" if speed else "N/A", - "eta": f"{eta // 60}:{eta % 60:02d}" if eta else "N/A", - "size": f"{downloaded / 1024 / 1024:.1f} MB of {total / 1024 / 1024:.1f} MB" - }) - except Exception: - pass - - elif d['status'] == 'finished': - # Download finished, prepare for post-processing - progress_widget.update_progress({ - "status": "processing", - "filename": d.get('filename'), - "message": "Processing media...", - "percent": 100 - }) - - async def _apply_audio_processing(self, audio_file: str) -> None: - """Apply audio processing options.""" - progress_widget = self.query_one("#progress-widget", DownloadProgressWidget) - - # Apply audio normalization if selected - if self.config.get("normalize_audio", False): - progress_widget.update_progress({ - "status": "processing", - "message": "Normalizing audio levels..." - }) - - success = await self.audio_processor.normalize_audio(audio_file) - if not success: - self.notify("Warning: Audio normalization process failed", severity="warning") - - # Apply 7.1 upmix if selected - if self.config.get("upmix_surround", False): - progress_widget.update_progress({ - "status": "processing", - "message": "Applying 7.1 surround sound upmix..." - }) - - success = await self.audio_processor.upmix_to_7_1(audio_file) - if not success: - self.notify("Warning: 7.1 upmix process failed", severity="warning") - - # Apply denoise if selected - if self.config.get("denoise_audio", False): - progress_widget.update_progress({ - "status": "processing", - "message": "Applying audio denoise filter..." - }) - - success = await self.audio_processor.denoise_audio(audio_file) - if not success: - self.notify("Warning: Audio denoise process failed", severity="warning") - - # Update progress to completed - progress_widget.update_progress({ - "status": "completed", - "message": "Audio processing completed!" - }) - - self.notify("Audio processing completed!", severity="success") - -async def run_textual_interface(config: Dict[str, Any]) -> None: - """Run the Textual interface.""" - app = SnatchTextualApp(config) - await app.run_async() - -def start_textual_interface(config: Dict[str, Any]) -> None: - """Start the Textual interface.""" - try: - asyncio.run(run_textual_interface(config)) - except KeyboardInterrupt: - print("\nTextual interface terminated by user.") - except Exception as e: - print(f"Error in Textual interface: {str(e)}") - import traceback - traceback.print_exc()