Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
255 changes: 211 additions & 44 deletions snatch/cli.py
Original file line number Diff line number Diff line change
Expand Up @@ -153,20 +153,19 @@ async def run_download(self, urls: List[str], options: Dict[str, Any], non_inter
"""Run download(s) with proper context management"""
async with self.download_manager:
try:
console = Console()
console.print(f"[bold cyan]Processing {len(urls)} URLs[/]")
con = get_console()
con.print(f"[bold cyan]Processing {len(urls)} URLs[/]")
result = await self.download_manager.download_with_options(urls, options)
if result:
console.print(f"[bold green]Successfully downloaded {len(result)} files[/]")
con.print(f"[bold green]Successfully downloaded {len(result)} files[/]")
self.error_handler.log_error(
f"Successfully downloaded {len(result)} files",
ErrorCategory.DOWNLOAD,
ErrorSeverity.INFO,
context={"downloaded_files": len(result), "urls": urls}
)
except asyncio.CancelledError:
console = Console()
console.print("[bold yellow]Download was cancelled.[/]")
get_console().print("[bold yellow]Download was cancelled.[/]")
self.error_handler.log_error(
"Download process was cancelled",
ErrorCategory.DOWNLOAD,
Expand All @@ -181,8 +180,7 @@ async def run_download(self, urls: List[str], options: Dict[str, Any], non_inter
ErrorSeverity.ERROR,
context={"urls": urls, "options": options}
)
console = Console()
console.print(f"[bold red]Error: {str(e)}[/]")
get_console().print(f"[bold red]Error: {str(e)}[/]")
raise

def run_async(self, coro: Any) -> Any:
Expand Down Expand Up @@ -228,20 +226,33 @@ def execute_download(self, urls: List[str], options: Dict[str, Any]) -> int:
def _log_download_mode(self, options: Dict[str, Any]) -> None:
"""Log download mode information"""
if options.get("audio_only"):
console.print(f"[bold cyan]Audio mode:[/] Downloading as {options.get('audio_format', 'mp3')}")
fmt = options.get('audio_format', 'mp3')
quality = options.get('audio_quality', 'best')
console.print(f"[bold cyan]Audio mode:[/] {fmt} (quality: {quality})")
if options.get("upmix_71"):
console.print("[bold cyan]Audio processing:[/] Upmixing to 7.1 surround")
if options.get("denoise"):
console.print("[bold cyan]Audio processing:[/] Applying noise reduction")
if options.get("enhance_audio"):
preset = options.get("audio_enhancement_preset")
level = options.get("audio_enhancement_level", "medium")
label = f"preset={preset}" if preset else f"level={level}"
console.print(f"[bold cyan]Audio enhancement:[/] {label}")
elif options.get("resolution"):
resolution = options.get('resolution')
console.print(f"[bold cyan]Video mode:[/] Setting resolution to {resolution}")

codec = options.get('video_codec', 'auto')
codec_info = f" ({codec})" if codec != "auto" else ""
console.print(f"[bold cyan]Video mode:[/] {resolution}p{codec_info}")
else:
codec = options.get('video_codec', 'auto')
if codec != 'auto':
console.print(f"[bold cyan]Video mode:[/] best quality ({codec})")

# Log upscaling information
if options.get("upscale_video"):
method = options.get("upscale_method", "lanczos")
factor = options.get("upscale_factor", 2)
console.print(f"[bold cyan]Video upscaling:[/] {method} {factor}x enhancement enabled")
console.print(f"[bold cyan]Video upscaling:[/] {method} {factor}x")

@handle_errors(ErrorCategory.DOWNLOAD, ErrorSeverity.WARNING)
def _run_download_safely(self, urls: List[str], options: Dict[str, Any]) -> int:
Expand Down Expand Up @@ -353,7 +364,8 @@ def download(
target_lufs: float = typer.Option(-16.0, "--target-lufs", help="Target loudness in LUFS (-23.0 for broadcast)"),
declipping: bool = typer.Option(False, "--declipping", help="Remove audio clipping artifacts"),

# Video upscaling options
# Video codec and upscaling options
video_codec: str = typer.Option("auto", "--video-codec", help="Preferred video codec (h264, h265, vp9, av1, auto)"),
upscale_video: bool = typer.Option(False, "--upscale", "-u", help="Enable video upscaling"),
upscale_method: str = typer.Option("lanczos", "--upscale-method", help="Upscaling method (realesrgan, lanczos, bicubic)"),
upscale_factor: int = typer.Option(2, "--upscale-factor", help="Upscaling factor (2x, 4x)"),
Expand Down Expand Up @@ -415,7 +427,8 @@ def download(
"target_lufs": target_lufs,
"declipping": declipping,

# Video upscaling
# Video codec and upscaling
"video_codec": video_codec,
"upscale_video": upscale_video,
"upscale_method": upscale_method,
"upscale_factor": upscale_factor,
Expand Down Expand Up @@ -759,7 +772,7 @@ async def _show_active_downloads_async(self) -> None:
"""Show active downloads asynchronously"""
async with AsyncSessionManager(self.config["session_file"]) as sm:
sessions = sm.get_active_sessions()
console_obj = Console()
console_obj = get_console()
if sessions:
console_obj.print("[bold green]Active downloads:[/]")
for session in sessions:
Expand All @@ -769,7 +782,7 @@ async def _show_active_downloads_async(self) -> None:

async def interactive_mode(self) -> None:
"""Rich interactive mode with command history"""
console_obj = Console()
console_obj = get_console()
console_obj.print("[bold cyan]Starting interactive mode[/]")

# Try textual interface first, fallback to simple mode
Expand All @@ -791,57 +804,211 @@ async def _try_textual_interface(self, console_obj: Console) -> bool:

async def _run_simple_interactive_mode(self, console_obj: Console) -> None:
"""Run simple fallback interactive mode"""
from rich.panel import Panel

console_obj.print(Panel(
"[bold cyan]Snatch Interactive Mode[/]\n\n"
"Paste a URL to download. Add modifiers for format/quality:\n\n"
" [green]<URL>[/] Best quality video\n"
" [green]<URL> 1080[/] 1080p video\n"
" [green]<URL> flac[/] Audio as FLAC\n"
" [green]<URL> mp3[/] Audio as MP3\n"
" [green]<URL> upscale[/] Download + AI upscale\n\n"
"Type [green]help[/] for all commands, [green]q[/] to quit.",
title="Snatch",
border_style="cyan",
))

while True:
try:
command = console_obj.input("[bold cyan]Enter command (or 'help', 'exit'):[/] ")
command = command.strip().lower()

if await self._handle_command(command, console_obj):
break # Exit was requested

command = console_obj.input("\n[bold cyan]snatch>[/] ")
command = command.strip()
# Don't lowercase the whole thing — URLs are case-sensitive
command_lower = command.lower()

if await self._handle_command(command_lower if not command_lower.startswith(('http', 'www.')) else command, console_obj):
break

except KeyboardInterrupt:
console_obj.print(INTERRUPTED_MSG)
continue
except EOFError:
break
except Exception as error:
console_obj.print(f"[bold red]Command error:[/] {str(error)}")
continue

async def _handle_command(self, command: str, console_obj: Console) -> bool:
"""Handle interactive command. Returns True if exit was requested."""
if command in ['exit', 'quit']:
cmd_lower = command.strip().lower() if command else ""

if cmd_lower in ['exit', 'quit', 'q']:
console_obj.print("[bold cyan]Exiting interactive mode[/]")
return True
elif command == 'help':

elif cmd_lower in ['help', '?']:
self._show_help(console_obj)

elif command.startswith('download '):
await self._handle_download_command(command, console_obj)
elif command:

elif cmd_lower in ['clear', 'cls']:
console_obj.clear()

elif cmd_lower.startswith('download ') or cmd_lower.startswith('dl '):
parts = command.split(None, 1)
if len(parts) > 1:
await self._handle_smart_download(parts[1], console_obj)

elif cmd_lower.startswith(('audio ', 'flac ', 'mp3 ', 'opus ', 'wav ', 'm4a ', 'aac ')):
await self._handle_smart_download(command, console_obj)

elif command.startswith(('http://', 'https://', 'www.', 'HTTP://', 'HTTPS://')):
# Bare URL or "URL format/resolution" syntax — preserve original case for URL
await self._handle_smart_download(command, console_obj)

elif cmd_lower:
console_obj.print(f"{UNKNOWN_COMMAND_MSG} {command}")
console_obj.print(HELP_AVAILABLE_MSG)

return False

def _show_help(self, console_obj: Console) -> None:
"""Show help text for interactive mode"""
console_obj.print("[bold cyan]Available commands:[/]")
console_obj.print(" [green]download [URL][/] - Download media from URL")
console_obj.print(" [green]help[/] - Show this help")
console_obj.print(" [green]exit[/] or [green]quit[/] - Exit interactive mode")

async def _handle_download_command(self, command: str, console_obj: Console) -> None:
"""Handle download command in interactive mode"""
url = command[9:].strip()
from rich.table import Table
table = Table(title="Interactive Mode Commands", border_style="cyan", show_lines=False)
table.add_column("Command", style="green", min_width=28)
table.add_column("Description", style="white")

table.add_row("<URL>", "Download in best quality")
table.add_row("<URL> 1080", "Download at 1080p")
table.add_row("<URL> 720 / 4k / hd", "Download at 720p / 4K / 1080p")
table.add_row("<URL> flac", "Download audio as FLAC (lossless)")
table.add_row("<URL> mp3", "Download audio as MP3")
table.add_row("<URL> opus / wav / aac", "Download audio in other formats")
table.add_row("<URL> upscale", "Download + AI video upscaling (2x)")
table.add_row("<URL> upscale 4x", "Download + AI upscaling (4x)")
table.add_row("<URL> enhance", "Download + audio enhancement")
table.add_row("<URL> denoise", "Download + noise reduction")
table.add_row("audio <URL>", "Download audio (default format)")
table.add_row("flac <URL>", "Download audio as FLAC")
table.add_row("download <URL> / dl <URL>", "Download in best quality")
table.add_row("", "")
table.add_row("help / ?", "Show this help")
table.add_row("clear / cls", "Clear screen")
table.add_row("exit / quit / q", "Exit interactive mode")
console_obj.print(table)

def _parse_interactive_input(self, raw_input: str) -> tuple:
"""Parse freeform interactive input into (url, options).

Supports patterns like:
<URL> → best quality video
<URL> 1080 → 1080p video
<URL> flac → audio as FLAC
audio <URL> → audio default format
flac <URL> → audio as FLAC
<URL> upscale → download + AI upscale (2x)
<URL> upscale 4x → download + AI upscale (4x)
<URL> enhance → download + audio enhancement
<URL> denoise → download + noise reduction
"""
import re as _re

audio_formats = {'flac', 'mp3', 'opus', 'wav', 'm4a', 'aac'}
resolution_names = {'4k', '2k', 'hd', 'sd'}
resolution_pattern = _re.compile(r'^(\d{3,4})p?$')
upscale_factor_pattern = _re.compile(r'^(\d)x$')

parts = raw_input.strip().split()
url = None
options: Dict[str, Any] = {}

for part in parts:
lower = part.lower()

# Is this a URL?
if part.startswith(('http://', 'https://', 'www.')):
url = part
continue

# Audio format keyword
if lower in audio_formats:
options['audio_only'] = True
options['audio_format'] = lower
options['audio_quality'] = 'best'
continue

# "audio" keyword
if lower == 'audio':
options['audio_only'] = True
options['audio_format'] = options.get('audio_format', 'mp3')
options['audio_quality'] = 'best'
continue

# Resolution number (e.g., "1080", "720p")
m = resolution_pattern.match(lower)
if m:
options['resolution'] = m.group(1)
continue

# Named resolution
if lower in resolution_names:
options['resolution'] = lower
continue

# Upscale keyword
if lower == 'upscale':
options['upscale_video'] = True
options.setdefault('upscale_factor', 2)
options.setdefault('upscale_method', 'lanczos')
continue

# Upscale factor (e.g., "2x", "4x")
m = upscale_factor_pattern.match(lower)
if m:
options['upscale_video'] = True
options['upscale_factor'] = int(m.group(1))
continue

# Audio enhancement keywords
if lower == 'enhance':
options['enhance_audio'] = True
continue

if lower == 'denoise':
options['denoise'] = True
continue

return url, options

async def _handle_smart_download(self, raw_input: str, console_obj: Console) -> None:
"""Parse freeform input and start a download with detected options."""
url, options = self._parse_interactive_input(raw_input)

if not url:
console_obj.print("[yellow]Please provide a URL to download[/]")
console_obj.print(PROVIDE_URL_MSG)
return

console_obj.print(f"[cyan]Starting download of:[/] {url}")

# Show a concise summary of what we're about to do
mode_parts = []
if options.get('audio_only'):
mode_parts.append(f"audio/{options.get('audio_format', 'mp3')}")
if options.get('resolution'):
mode_parts.append(f"{options['resolution']}p")
if options.get('upscale_video'):
factor = options.get('upscale_factor', 2)
mode_parts.append(f"upscale {factor}x")
if options.get('enhance_audio'):
mode_parts.append("audio enhance")
if options.get('denoise'):
mode_parts.append("denoise")
mode = ", ".join(mode_parts) if mode_parts else "best quality"
console_obj.print(f"[cyan]Downloading:[/] {url} [dim]({mode})[/]")

try:
options = {} # Default options
await self.download_manager.download(url, **options)
console_obj.print(f"[bold green]Download complete:[/] {url}")
result = await self.download_manager.download_with_options([url], options)
if result:
console_obj.print(f"[bold green]Download complete[/]")
else:
console_obj.print(f"[bold yellow]Download finished with no output files[/]")
except Exception as e:
console_obj.print(f"[bold red]Download error:[/] {str(e)}")

Expand Down
16 changes: 14 additions & 2 deletions snatch/config.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@
import logging
import os
import re
import tempfile
import threading
import time
import asyncio
Expand Down Expand Up @@ -379,9 +380,20 @@ def test_functionality() -> bool:
"""Run basic tests to verify functionality"""
print(f"{Fore.CYAN}Running basic tests...{Style.RESET_ALL}")
try:
# Test configuration initialization
# Test configuration initialization (async function — must be run properly)
print(f"{Fore.CYAN}Testing configuration...{Style.RESET_ALL}")
config = initialize_config_async(force_validation=True)
try:
loop = asyncio.get_running_loop()
# Already in async context — can't use asyncio.run()
import concurrent.futures
with concurrent.futures.ThreadPoolExecutor() as pool:
config = pool.submit(lambda: asyncio.run(initialize_config_async(force_validation=True))).result()
except RuntimeError:
config = asyncio.run(initialize_config_async(force_validation=True))

if not config:
print(f"{Fore.RED}Failed to initialize configuration!{Style.RESET_ALL}")
return False

# Check if FFmpeg is available in the config
if not config.get("ffmpeg_location") or not validate_ffmpeg_installation():
Expand Down
Loading
Loading